Composing Workflows
Overview
Section titled “Overview”Sayiir supports first-class workflow composition via then_flow (Python), thenFlow (Node.js), and then_flow / flow (Rust). A child workflow’s entire pipeline is inlined as a single step in the parent — its output becomes the input to the next step. Task registries are merged automatically.
Basic Composition
Section titled “Basic Composition”Define a child workflow, then inline it in a parent with a single call:
from sayiir import task, Flow, run_workflow
@taskdef validate(order: dict) -> dict: return {**order, "validated": True}
@taskdef charge(order: dict) -> dict: return {"charge_id": f"ch_{order['id']}", "amount": order["amount"]}
@taskdef prepare(order: dict) -> dict: return {**order, "prepared": True}
@taskdef confirm(charge: dict) -> str: return f"Order complete: {charge['charge_id']}"
# Child workflow: handles billingbilling = ( Flow("billing") .then(validate) .then(charge) .build())
# Parent inlines the childorder_pipeline = ( Flow("order-pipeline") .then(prepare) .then_flow(billing) .then(confirm) .build())
result = run_workflow(order_pipeline, {"id": "abc", "amount": 49.99})# "Order complete: ch_abc"import { task, flow, runWorkflow } from "sayiir";
const validate = task("validate", (order: { id: string; amount: number }) => ({ ...order, validated: true,}));
const charge = task("charge", (order: { id: string; amount: number }) => ({ chargeId: `ch_${order.id}`, amount: order.amount,}));
// Child workflow: handles billingconst billing = flow<{ id: string; amount: number }>("billing") .then(validate) .then(charge) .build();
// Parent inlines the childconst orderPipeline = flow<{ id: string; amount: number }>("order-pipeline") .then("prepare", (order) => ({ ...order, prepared: true })) .thenFlow(billing) .then("confirm", (charge) => `Order complete: ${charge.chargeId}`) .build();
const result = await runWorkflow(orderPipeline, { id: "abc", amount: 49.99,});// "Order complete: ch_abc"use sayiir_macros::{task, workflow};
#[task]async fn validate(order: Order) -> Result<Order, BoxError> { Ok(Order { validated: true, ..order })}
#[task]async fn charge(order: Order) -> Result<Charge, BoxError> { Ok(Charge { charge_id: format!("ch_{}", order.id), amount: order.amount })}
#[task]async fn prepare(order: Order) -> Result<Order, BoxError> { Ok(Order { prepared: true, ..order })}
#[task]async fn confirm(charge: Charge) -> Result<String, BoxError> { Ok(format!("Order complete: {}", charge.charge_id))}
// Child workflowlet billing = workflow! { name: "billing", steps: [validate, charge]}.unwrap();
// Parent inlines the childlet order_pipeline = workflow! { name: "order-pipeline", steps: [prepare, flow billing, confirm]}.unwrap();How It Works
Section titled “How It Works”When you call thenFlow / then_flow / flow:
- The child workflow’s continuation tree is embedded as a
ChildWorkflownode in the parent - The child’s task registry is merged into the parent’s (tasks are deduplicated by ID)
- At execution time, the engine walks into the child’s steps, runs them in sequence, and passes the final output to the next parent step
The child workflow is not a separate execution — it runs inline within the parent’s execution context. This means:
- No extra instance IDs or lifecycle management needed
- Checkpointing covers both parent and child steps
- Errors propagate naturally to the parent
Multiple Children
Section titled “Multiple Children”You can inline multiple child workflows in a single parent:
enrichment = Flow("enrich").then(geocode).then(score).build()billing = Flow("billing").then(validate).then(charge).build()
pipeline = ( Flow("full-pipeline") .then(ingest) .then_flow(enrichment) .then_flow(billing) .then(notify) .build())const enrichment = flow<Lead>("enrich") .then(geocode) .then(score) .build();
const billing = flow<ScoredLead>("billing") .then(validate) .then(charge) .build();
const pipeline = flow<Lead>("full-pipeline") .then(ingest) .thenFlow(enrichment) .thenFlow(billing) .then(notify) .build();let enrichment = workflow! { name: "enrich", steps: [geocode, score]}.unwrap();
let billing = workflow! { name: "billing", steps: [validate, charge]}.unwrap();
let pipeline = workflow! { name: "full-pipeline", steps: [ingest, flow enrichment, flow billing, notify]}.unwrap();Sharing dependencies across parent and child
Section titled “Sharing dependencies across parent and child”When child workflows contain #[task] functions with #[inject] parameters, build all the workflows with the same Deps container so the shared services flow through automatically:
use sayiir_runtime::prelude::*;
let deps = Deps::builder() .insert(Arc::new(StripeClient::new("sk_test_..."))) .insert(Arc::new(Mailer::new(/* … */))) .build();
let billing = workflow! { name: "billing", deps: &deps, steps: [validate, charge]}.unwrap();
let pipeline = workflow! { name: "full-pipeline", deps: &deps, // same container — child + parent agree steps: [ingest, flow billing, notify]}.unwrap();You only construct the services once — from_deps resolves them by type into each task instance, and verify_deps fails the build immediately if either workflow forgets to register a dependency.
When to Compose
Section titled “When to Compose”Workflow composition is useful when:
- Reuse — The same sequence of tasks appears in multiple parent workflows (e.g., billing, notification, approval)
- Separation of concerns — Teams own different sub-workflows independently
- Testing — Sub-workflows can be tested and run in isolation
- Readability — Breaking a large workflow into named sub-workflows clarifies intent
Reusable Task Libraries
Section titled “Reusable Task Libraries”Beyond inlining child workflows, you can build standalone task libraries — reusable collections of tasks that you merge into any workflow or worker. This is useful when multiple workflows share the same tasks but don’t share the same pipeline structure.
TaskRegistry::register_from_deps::<T, C>(codec, &deps) builds and registers a #[task] in one call. Pair it with a shared Deps container so library functions take services by type rather than threading each service through positional arguments:
use sayiir_runtime::prelude::*;use std::sync::Arc;
// billing_tasks crate — define tasks with the macro#[task(id = "charge", retries = 2, timeout = "30s")]async fn charge( order: Order, #[inject] stripe: Arc<StripeClient>,) -> Result<Receipt, BoxError> { stripe.charge(&order).await}
#[task(id = "refund", retries = 2, timeout = "30s")]async fn refund( order: Order, #[inject] stripe: Arc<StripeClient>,) -> Result<Refund, BoxError> { stripe.refund(&order).await}
/// Build a registry containing all billing tasks.pub fn billing_tasks( codec: Arc<JsonCodec>, deps: &Deps,) -> Result<TaskRegistry, Vec<MissingDep>> { let mut reg = TaskRegistry::new(); reg.register_from_deps::<ChargeTask, _>(codec.clone(), deps)?; reg.register_from_deps::<RefundTask, _>(codec, deps)?; Ok(reg)}// notification_tasks crate#[task(id = "send_email", retries = 3, timeout = "10s")]async fn send_email( data: EmailPayload, #[inject] mailer: Arc<Mailer>,) -> Result<EmailResult, BoxError> { mailer.send(&data).await}
pub fn notification_tasks( codec: Arc<JsonCodec>, deps: &Deps,) -> Result<TaskRegistry, Vec<MissingDep>> { let mut reg = TaskRegistry::new(); reg.register_from_deps::<SendEmailTask, _>(codec, deps)?; Ok(reg)}register_from_deps calls DepsInjectable::verify_deps first, so a missing Arc<StripeClient> or Arc<Mailer> is reported as Err(Vec<MissingDep>) before any task is added — the registry is never left partially populated.
Then in your application, build one shared Deps and merge the library registries:
// application — combine libraries for a workerlet codec = Arc::new(JsonCodec);let deps = Deps::builder() .insert(Arc::new(StripeClient::new("sk_live_..."))) .insert(Arc::new(Mailer::new(/* … */))) .build();
let mut registry = TaskRegistry::new();registry.merge(billing_tasks(codec.clone(), &deps)?);registry.merge(notification_tasks(codec, &deps)?);
let worker = PooledWorker::new("worker-1", backend, registry);On ID collisions, the parent registry wins — existing entries are not overwritten. This lets you layer overrides on top of library defaults.