Composing Workflows
Overview
Sayiir supports first-class workflow composition via then_flow (Python), thenFlow (Node.js), and then_flow / flow (Rust; flow is the keyword used inside the workflow! macro). A child workflow’s entire pipeline is inlined as a single step in the parent, and its output becomes the input to the next parent step. Task registries are merged automatically.
Basic Composition
Define a child workflow, then inline it in a parent with a single call:
Python:

    from sayiir import task, Flow, run_workflow

    @task
    def validate(order: dict) -> dict:
        return {**order, "validated": True}

    @task
    def charge(order: dict) -> dict:
        return {"charge_id": f"ch_{order['id']}", "amount": order["amount"]}

    @task
    def prepare(order: dict) -> dict:
        return {**order, "prepared": True}

    @task
    def confirm(charge: dict) -> str:
        return f"Order complete: {charge['charge_id']}"

    # Child workflow: handles billing
    billing = (
        Flow("billing")
        .then(validate)
        .then(charge)
        .build()
    )

    # Parent inlines the child
    order_pipeline = (
        Flow("order-pipeline")
        .then(prepare)
        .then_flow(billing)
        .then(confirm)
        .build()
    )

    result = run_workflow(order_pipeline, {"id": "abc", "amount": 49.99})
    # "Order complete: ch_abc"

Node.js:

    import { task, flow, runWorkflow } from "sayiir";
    const validate = task("validate", (order: { id: string; amount: number }) => ({
      ...order,
      validated: true,
    }));

    const charge = task("charge", (order: { id: string; amount: number }) => ({
      chargeId: `ch_${order.id}`,
      amount: order.amount,
    }));

    // Child workflow: handles billing
    const billing = flow<{ id: string; amount: number }>("billing")
      .then(validate)
      .then(charge)
      .build();

    // Parent inlines the child
    const orderPipeline = flow<{ id: string; amount: number }>("order-pipeline")
      .then("prepare", (order) => ({ ...order, prepared: true }))
      .thenFlow(billing)
      .then("confirm", (charge) => `Order complete: ${charge.chargeId}`)
      .build();

    const result = await runWorkflow(orderPipeline, {
      id: "abc",
      amount: 49.99,
    });
    // "Order complete: ch_abc"

Rust:

    use sayiir_macros::{task, workflow};
    #[task]
    async fn validate(order: Order) -> Result<Order, BoxError> {
        Ok(Order { validated: true, ..order })
    }

    #[task]
    async fn charge(order: Order) -> Result<Charge, BoxError> {
        Ok(Charge { charge_id: format!("ch_{}", order.id), amount: order.amount })
    }

    #[task]
    async fn prepare(order: Order) -> Result<Order, BoxError> {
        Ok(Order { prepared: true, ..order })
    }

    #[task]
    async fn confirm(charge: Charge) -> Result<String, BoxError> {
        Ok(format!("Order complete: {}", charge.charge_id))
    }

    // Child workflow
    let billing = workflow! {
        name: "billing",
        steps: [validate, charge]
    }.unwrap();

    // Parent inlines the child
    let order_pipeline = workflow! {
        name: "order-pipeline",
        steps: [prepare, flow billing, confirm]
    }.unwrap();

How It Works
When you call thenFlow / then_flow / flow:
- The child workflow’s continuation tree is embedded as a ChildWorkflow node in the parent
- The child’s task registry is merged into the parent’s (tasks are deduplicated by ID)
- At execution time, the engine walks into the child’s steps, runs them in sequence, and passes the final output to the next parent step
The child workflow is not a separate execution — it runs inline within the parent’s execution context. This means:
- No extra instance IDs or lifecycle management needed
- Checkpointing covers both parent and child steps
- Errors propagate naturally to the parent
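The three points above can be sketched with a toy model in plain Python. This is not Sayiir's implementation: Step, ChildWorkflow, Workflow, and run are hypothetical names chosen for illustration, and the sketch ignores checkpointing, retries, and async execution. It only shows a child embedded as one node, registries merged by task ID, and the runner walking into the child's steps.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional, Union


@dataclass
class Step:
    """Leaf node: refers to a task by ID (hypothetical, for illustration)."""
    task_id: str


@dataclass
class ChildWorkflow:
    """Composite node: the child's whole step list embedded in the parent."""
    name: str
    steps: List["Node"]


Node = Union[Step, ChildWorkflow]


@dataclass
class Workflow:
    name: str
    steps: List[Node] = field(default_factory=list)
    registry: Dict[str, Callable[[Any], Any]] = field(default_factory=dict)

    def then(self, task_id: str, fn: Callable[[Any], Any]) -> "Workflow":
        self.steps.append(Step(task_id))
        self.registry.setdefault(task_id, fn)  # keep the first entry per ID
        return self

    def then_flow(self, child: "Workflow") -> "Workflow":
        # Embed the child's steps as a single node in the parent.
        self.steps.append(ChildWorkflow(child.name, list(child.steps)))
        # Merge the child's tasks; IDs already present are left untouched.
        for task_id, fn in child.registry.items():
            self.registry.setdefault(task_id, fn)
        return self


def run(workflow: Workflow, value: Any, trace: Optional[List[str]] = None) -> Any:
    """Run every step in order, descending into embedded child nodes."""
    def walk(nodes: List[Node], current: Any) -> Any:
        for node in nodes:
            if isinstance(node, ChildWorkflow):
                current = walk(node.steps, current)  # same execution, no new instance
            else:
                current = workflow.registry[node.task_id](current)
                if trace is not None:
                    trace.append(node.task_id)
        return current

    return walk(workflow.steps, value)


billing = (
    Workflow("billing")
    .then("validate", lambda o: {**o, "validated": True})
    .then("charge", lambda o: {"charge_id": f"ch_{o['id']}", "amount": o["amount"]})
)

order_pipeline = (
    Workflow("order-pipeline")
    .then("prepare", lambda o: {**o, "prepared": True})
    .then_flow(billing)
    .then("confirm", lambda c: f"Order complete: {c['charge_id']}")
)

trace: List[str] = []
result = run(order_pipeline, {"id": "abc", "amount": 49.99}, trace)
print(result)  # Order complete: ch_abc
print(trace)   # ['prepare', 'validate', 'charge', 'confirm']
```

In this model the parent has three top-level nodes but four tasks in its registry, and the trace shows the child's steps running between prepare and confirm within a single run.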
Multiple Children
You can inline multiple child workflows in a single parent:
Python:

    enrichment = Flow("enrich").then(geocode).then(score).build()
    billing = Flow("billing").then(validate).then(charge).build()

    pipeline = (
        Flow("full-pipeline")
        .then(ingest)
        .then_flow(enrichment)
        .then_flow(billing)
        .then(notify)
        .build()
    )

Node.js:

    const enrichment = flow<Lead>("enrich")
      .then(geocode)
      .then(score)
      .build();

    const billing = flow<ScoredLead>("billing")
      .then(validate)
      .then(charge)
      .build();

    const pipeline = flow<Lead>("full-pipeline")
      .then(ingest)
      .thenFlow(enrichment)
      .thenFlow(billing)
      .then(notify)
      .build();

Rust:

    let enrichment = workflow! {
        name: "enrich",
        steps: [geocode, score]
    }.unwrap();

    let billing = workflow! {
        name: "billing",
        steps: [validate, charge]
    }.unwrap();

    let pipeline = workflow! {
        name: "full-pipeline",
        steps: [ingest, flow enrichment, flow billing, notify]
    }.unwrap();

When to Compose
Workflow composition is useful when:
- Reuse — The same sequence of tasks appears in multiple parent workflows (e.g., billing, notification, approval)
- Separation of concerns — Teams own different sub-workflows independently
- Testing — Sub-workflows can be tested and run in isolation
- Readability — Breaking a large workflow into named sub-workflows clarifies intent
Reusable Task Libraries
Beyond inlining child workflows, you can build standalone task libraries: reusable collections of tasks that you merge into any workflow or worker. This is useful when multiple workflows share the same tasks but don’t share the same pipeline structure.
The #[task] macro generates a register() method on each task struct, so building a library is just a function that populates a TaskRegistry:
    use sayiir_runtime::prelude::*;
    use std::sync::Arc;

    // billing_tasks crate: define tasks with the macro
    #[task(id = "charge", retries = 2, timeout = "30s")]
    async fn charge(
        order: Order,
        #[inject] stripe: Arc<StripeClient>,
    ) -> Result<Receipt, BoxError> {
        stripe.charge(&order).await
    }

    #[task(id = "refund", retries = 2, timeout = "30s")]
    async fn refund(
        order: Order,
        #[inject] stripe: Arc<StripeClient>,
    ) -> Result<Refund, BoxError> {
        stripe.refund(&order).await
    }

    /// Build a registry containing all billing tasks.
    pub fn billing_tasks(codec: Arc<JsonCodec>, stripe: Arc<StripeClient>) -> TaskRegistry {
        let mut reg = TaskRegistry::new();
        ChargeTask::register(&mut reg, codec.clone(), ChargeTask::new(stripe.clone()));
        RefundTask::register(&mut reg, codec.clone(), RefundTask::new(stripe));
        reg
    }

    // notification_tasks crate
    #[task(id = "send_email", retries = 3, timeout = "10s")]
    async fn send_email(
        data: EmailPayload,
        #[inject] mailer: Arc<Mailer>,
    ) -> Result<EmailResult, BoxError> {
        mailer.send(&data).await
    }

    pub fn notification_tasks(codec: Arc<JsonCodec>, mailer: Arc<Mailer>) -> TaskRegistry {
        let mut reg = TaskRegistry::new();
        SendEmailTask::register(&mut reg, codec.clone(), SendEmailTask::new(mailer));
        reg
    }

Then in your application, merge libraries into a single registry:
    // application: combine libraries for a worker
    let codec = Arc::new(JsonCodec);
    let mut registry = TaskRegistry::new();
    registry.merge(billing_tasks(codec.clone(), stripe));
    registry.merge(notification_tasks(codec.clone(), mailer));

    let worker = PooledWorker::new("worker-1", backend, registry);

On ID collisions, the parent registry (the one merge is called on) wins: existing entries are not overwritten. This lets you layer overrides on top of library defaults by registering the override before merging the library, so the library’s task with the same ID is skipped.
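The collision rule can be illustrated with a small stand-in written in plain Python. ToyRegistry and its register, merge, and get methods are hypothetical and only mimic the "existing entries are not overwritten" behaviour described above; they are not Sayiir's TaskRegistry API.

```python
from typing import Callable, Dict


class ToyRegistry:
    """Hypothetical stand-in for a task registry, keyed by task ID."""

    def __init__(self) -> None:
        self._tasks: Dict[str, Callable[[str], str]] = {}

    def register(self, task_id: str, fn: Callable[[str], str]) -> None:
        self._tasks[task_id] = fn

    def merge(self, other: "ToyRegistry") -> None:
        # Entries already present win; only missing IDs are copied over.
        for task_id, fn in other._tasks.items():
            self._tasks.setdefault(task_id, fn)

    def get(self, task_id: str) -> Callable[[str], str]:
        return self._tasks[task_id]


def real_charge(order_id: str) -> str:
    return f"charged {order_id}"


def real_refund(order_id: str) -> str:
    return f"refunded {order_id}"


def dry_run_charge(order_id: str) -> str:
    return f"dry-run {order_id}"


# Library defaults, as a billing library might provide them.
library = ToyRegistry()
library.register("charge", real_charge)
library.register("refund", real_refund)

# Application: register the override first, then merge the library.
app = ToyRegistry()
app.register("charge", dry_run_charge)
app.merge(library)

print(app.get("charge")("o1"))  # dry-run o1
print(app.get("refund")("o1"))  # refunded o1
```

Order matters under this rule: merging the library first and registering the override afterwards would rely on how direct registration treats duplicates, which the text above does not specify.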