Composing Workflows

Sayiir supports first-class workflow composition via then_flow (Python), thenFlow (Node.js), and then_flow / flow (Rust). A child workflow’s entire pipeline is inlined as a single step in the parent — its output becomes the input to the next step. Task registries are merged automatically.

Define a child workflow, then inline it in a parent with a single call:

from sayiir import task, Flow, run_workflow


@task
def validate(order: dict) -> dict:
    return {**order, "validated": True}


@task
def charge(order: dict) -> dict:
    return {"charge_id": f"ch_{order['id']}", "amount": order["amount"]}


@task
def prepare(order: dict) -> dict:
    return {**order, "prepared": True}


@task
def confirm(charge: dict) -> str:
    return f"Order complete: {charge['charge_id']}"


# Child workflow: handles billing
billing = (
    Flow("billing")
    .then(validate)
    .then(charge)
    .build()
)

# Parent inlines the child
order_pipeline = (
    Flow("order-pipeline")
    .then(prepare)
    .then_flow(billing)
    .then(confirm)
    .build()
)

result = run_workflow(order_pipeline, {"id": "abc", "amount": 49.99})
# "Order complete: ch_abc"

When you call thenFlow / then_flow / flow:

  1. The child workflow’s continuation tree is embedded as a ChildWorkflow node in the parent
  2. The child’s task registry is merged into the parent’s (tasks are deduplicated by ID)
  3. At execution time, the engine walks into the child’s steps, runs them in sequence, and passes the final output to the next parent step
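
The three steps above can be pictured with a small, self-contained model. This is a sketch only and not Sayiir's implementation: the `Workflow` class, its `nodes` and `registry` fields, and the `run` function are hypothetical stand-ins written in plain Python so the example runs without the library.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional, Union

TaskFn = Callable[[Any], Any]


@dataclass
class Workflow:
    """Toy stand-in for a built workflow: ordered nodes plus a task registry."""
    name: str
    nodes: List[Union[str, "Workflow"]] = field(default_factory=list)
    registry: Dict[str, TaskFn] = field(default_factory=dict)

    def then(self, task_id: str, fn: TaskFn) -> "Workflow":
        self.registry.setdefault(task_id, fn)
        self.nodes.append(task_id)
        return self

    def then_flow(self, child: "Workflow") -> "Workflow":
        # Step 1: embed the whole child as a single node in the parent.
        self.nodes.append(child)
        # Step 2: merge the child's registry, deduplicating by task ID.
        for task_id, fn in child.registry.items():
            self.registry.setdefault(task_id, fn)
        return self


def run(workflow: Workflow, value: Any,
        registry: Optional[Dict[str, TaskFn]] = None) -> Any:
    """Step 3: walk the nodes in order, descending into child workflows."""
    registry = workflow.registry if registry is None else registry
    for node in workflow.nodes:
        if isinstance(node, Workflow):
            value = run(node, value, registry)  # child output feeds the next step
        else:
            value = registry[node](value)
    return value


billing = (
    Workflow("billing")
    .then("validate", lambda o: {**o, "validated": True})
    .then("charge", lambda o: f"ch_{o['id']}")
)
parent = (
    Workflow("order-pipeline")
    .then("prepare", lambda o: {**o, "prepared": True})
    .then_flow(billing)
    .then("confirm", lambda charge_id: f"Order complete: {charge_id}")
)
result = run(parent, {"id": "abc"})
```

In this model the parent ends up with three nodes (two tasks and one embedded child) and a registry holding all four task IDs, which mirrors the description above at a very small scale.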

The child workflow is not a separate execution — it runs inline within the parent’s execution context. This means:

  • No extra instance IDs or lifecycle management needed
  • Checkpointing covers both parent and child steps
  • Errors propagate naturally to the parent
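
A rough way to see what inline execution implies is the toy runner below. It is an illustrative model under stated assumptions, not Sayiir's engine: `run_inline`, the list-based child representation, and the `log` checkpoint list are all hypothetical. It shows one shared progress log for parent and child steps, and an exception raised in a child step surfacing from the parent's run.

```python
from typing import Any, Callable, List, Tuple, Union

Step = Tuple[str, Callable[[Any], Any]]
Node = Union[Step, List[Step]]  # a list models an inlined child workflow


def run_inline(nodes: List[Node], value: Any, checkpoints: List[str]) -> Any:
    """Run parent steps and inlined child steps in one execution context."""
    for node in nodes:
        steps = node if isinstance(node, list) else [node]
        for task_id, fn in steps:
            value = fn(value)            # an exception here unwinds through the parent
            checkpoints.append(task_id)  # one shared log for parent and child steps
    return value


def charge(order: dict) -> dict:
    if order["amount"] <= 0:
        raise ValueError("amount must be positive")
    return {"charge_id": f"ch_{order['id']}"}


billing: List[Step] = [
    ("validate", lambda o: {**o, "validated": True}),
    ("charge", charge),
]
pipeline: List[Node] = [
    ("prepare", lambda o: {**o, "prepared": True}),
    billing,
    ("confirm", lambda c: c["charge_id"]),
]

log: List[str] = []
failure = None
try:
    run_inline(pipeline, {"id": "abc", "amount": 0}, log)
except ValueError as exc:
    failure = str(exc)
```

Here the failing `charge` step stops the whole run, and the log records the parent step and the child step that completed before it, without any separate child instance to track.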

You can inline multiple child workflows in a single parent:

enrichment = Flow("enrich").then(geocode).then(score).build()
billing = Flow("billing").then(validate).then(charge).build()
pipeline = (
    Flow("full-pipeline")
    .then(ingest)
    .then_flow(enrichment)
    .then_flow(billing)
    .then(notify)
    .build()
)

Workflow composition is useful when:

  • Reuse — The same sequence of tasks appears in multiple parent workflows (e.g., billing, notification, approval)
  • Separation of concerns — Teams own different sub-workflows independently
  • Testing — Sub-workflows can be tested and run in isolation
  • Readability — Breaking a large workflow into named sub-workflows clarifies intent

Beyond inlining child workflows, you can build standalone task libraries — reusable collections of tasks that you merge into any workflow or worker. This is useful when multiple workflows share the same tasks but don’t share the same pipeline structure.

TaskRegistry::register_from_deps::<T, C>(codec, &deps) builds and registers a #[task] in one call. Pair it with a shared Deps container so library functions take services by type rather than threading each service through positional arguments:

use sayiir_runtime::prelude::*;
use std::sync::Arc;

// billing_tasks crate: define tasks with the macro
#[task(id = "charge", retries = 2, timeout = "30s")]
async fn charge(
    order: Order,
    #[inject] stripe: Arc<StripeClient>,
) -> Result<Receipt, BoxError> {
    stripe.charge(&order).await
}

#[task(id = "refund", retries = 2, timeout = "30s")]
async fn refund(
    order: Order,
    #[inject] stripe: Arc<StripeClient>,
) -> Result<Refund, BoxError> {
    stripe.refund(&order).await
}

/// Build a registry containing all billing tasks.
pub fn billing_tasks(
    codec: Arc<JsonCodec>,
    deps: &Deps,
) -> Result<TaskRegistry, Vec<MissingDep>> {
    let mut reg = TaskRegistry::new();
    reg.register_from_deps::<ChargeTask, _>(codec.clone(), deps)?;
    reg.register_from_deps::<RefundTask, _>(codec, deps)?;
    Ok(reg)
}

// notification_tasks crate
#[task(id = "send_email", retries = 3, timeout = "10s")]
async fn send_email(
    data: EmailPayload,
    #[inject] mailer: Arc<Mailer>,
) -> Result<EmailResult, BoxError> {
    mailer.send(&data).await
}

/// Build a registry containing all notification tasks.
pub fn notification_tasks(
    codec: Arc<JsonCodec>,
    deps: &Deps,
) -> Result<TaskRegistry, Vec<MissingDep>> {
    let mut reg = TaskRegistry::new();
    reg.register_from_deps::<SendEmailTask, _>(codec, deps)?;
    Ok(reg)
}

register_from_deps calls DepsInjectable::verify_deps first, so a missing Arc<StripeClient> or Arc<Mailer> is reported as Err(Vec<MissingDep>) before any task is added — the registry is never left partially populated.
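
The verify-first behaviour can be modelled in a few lines of plain Python. This is a hedged sketch of the idea only, not the Rust API: `register_checked`, `MissingDeps`, and the string-keyed `deps` dictionary are hypothetical names chosen for illustration.

```python
from typing import Any, Callable, Dict, List


class MissingDeps(Exception):
    """Raised when one or more required dependencies are absent."""

    def __init__(self, missing: List[str]) -> None:
        super().__init__(f"missing dependencies: {', '.join(missing)}")
        self.missing = missing


def register_checked(
    registry: Dict[str, Any],
    task_id: str,
    required: List[str],
    deps: Dict[str, Any],
    build: Callable[[Dict[str, Any]], Any],
) -> None:
    """Check every required dependency before touching the registry."""
    missing = [name for name in required if name not in deps]
    if missing:
        raise MissingDeps(missing)  # nothing has been added at this point
    registry[task_id] = build(deps)


deps = {"stripe": object()}  # no "mailer" provided
registry: Dict[str, Any] = {}
missing: List[str] = []
try:
    register_checked(
        registry,
        "charge_and_notify",
        ["stripe", "mailer"],
        deps,
        lambda d: (d["stripe"], d["mailer"]),
    )
except MissingDeps as exc:
    missing = exc.missing
```

Because the check runs before the task is built or stored, the failed call leaves the registry empty and reports exactly which dependency was absent.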

Then in your application, build one shared Deps and merge the library registries:

// application: combine libraries for a worker
let codec = Arc::new(JsonCodec);
let deps = Deps::builder()
    .insert(Arc::new(StripeClient::new("sk_live_...")))
    .insert(Arc::new(Mailer::new(/* … */)))
    .build();

let mut registry = TaskRegistry::new();
registry.merge(billing_tasks(codec.clone(), &deps)?);
registry.merge(notification_tasks(codec, &deps)?);

let worker = PooledWorker::new("worker-1", backend, registry);

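On ID collisions, the parent registry (the one being merged into) wins: its existing entries are not overwritten. This lets you layer overrides on top of library defaults by registering your own task under an ID before merging the library registry that also defines it.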
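
The collision rule can be illustrated with a dictionary-based model. This is a sketch under the assumption that merging behaves like "insert only if the ID is absent", as described above; the `merge` function and the sample task bodies are hypothetical and do not call Sayiir.

```python
from typing import Callable, Dict

Registry = Dict[str, Callable[[dict], str]]


def merge(target: Registry, incoming: Registry) -> None:
    """Copy entries from `incoming` into `target`, keeping existing IDs."""
    for task_id, fn in incoming.items():
        target.setdefault(task_id, fn)  # existing entries are not overwritten


# Library defaults, as a shared task library might provide them.
library: Registry = {
    "charge": lambda order: "charged via live gateway",
    "refund": lambda order: "refunded via live gateway",
}

# The application registers its override first, then merges the library.
registry: Registry = {"charge": lambda order: "charged via sandbox"}
merge(registry, library)

charge_result = registry["charge"]({})
refund_result = registry["refund"]({})
```

The override for `charge` survives the merge, while `refund` falls back to the library default because the application registry had no entry for it.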