Composing Workflows

Sayiir supports first-class workflow composition via then_flow (Python), thenFlow (Node.js), and then_flow / flow (Rust). A child workflow’s entire pipeline is inlined as a single step in the parent — its output becomes the input to the next step. Task registries are merged automatically.

Define a child workflow, then inline it in a parent with a single call:

from sayiir import task, Flow, run_workflow

@task
def validate(order: dict) -> dict:
    return {**order, "validated": True}

@task
def charge(order: dict) -> dict:
    return {"charge_id": f"ch_{order['id']}", "amount": order["amount"]}

@task
def prepare(order: dict) -> dict:
    return {**order, "prepared": True}

@task
def confirm(charge: dict) -> str:
    return f"Order complete: {charge['charge_id']}"

# Child workflow: handles billing
billing = (
    Flow("billing")
    .then(validate)
    .then(charge)
    .build()
)

# Parent inlines the child
order_pipeline = (
    Flow("order-pipeline")
    .then(prepare)
    .then_flow(billing)
    .then(confirm)
    .build()
)

result = run_workflow(order_pipeline, {"id": "abc", "amount": 49.99})
# "Order complete: ch_abc"

When you call thenFlow / then_flow / flow:

  1. The child workflow’s continuation tree is embedded as a ChildWorkflow node in the parent
  2. The child’s task registry is merged into the parent’s (tasks are deduplicated by ID)
  3. At execution time, the engine walks into the child’s steps, runs them in sequence, and passes the final output to the next parent step
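The three steps above can be modelled in a few lines of plain Python. This is a toy sketch, not Sayiir's implementation: `ToyWorkflow`, `TaskNode`, `ChildWorkflowNode`, and `run` are hypothetical names invented for illustration, and the real engine's data structures may differ.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Union


@dataclass
class TaskNode:
    task_id: str  # resolved against the registry at execution time


@dataclass
class ChildWorkflowNode:
    name: str
    steps: List["Node"]  # the child's whole pipeline, held as one parent step


Node = Union[TaskNode, ChildWorkflowNode]


@dataclass
class ToyWorkflow:
    """Hypothetical stand-in for a built workflow (not the sayiir API)."""
    name: str
    steps: List[Node] = field(default_factory=list)
    registry: Dict[str, Callable[[Any], Any]] = field(default_factory=dict)

    def then(self, task_id: str, fn: Callable[[Any], Any]) -> "ToyWorkflow":
        self.steps.append(TaskNode(task_id))
        self.registry.setdefault(task_id, fn)
        return self

    def then_flow(self, child: "ToyWorkflow") -> "ToyWorkflow":
        # Step 1: embed the child's steps as a single node in the parent.
        self.steps.append(ChildWorkflowNode(child.name, list(child.steps)))
        # Step 2: merge the child's registry, deduplicating by task ID.
        for task_id, fn in child.registry.items():
            self.registry.setdefault(task_id, fn)
        return self


def run(workflow: ToyWorkflow, value: Any) -> Any:
    # Step 3: walk into child nodes and thread each output to the next step.
    def walk(steps: List[Node], current: Any) -> Any:
        for node in steps:
            if isinstance(node, ChildWorkflowNode):
                current = walk(node.steps, current)
            else:
                current = workflow.registry[node.task_id](current)
        return current

    return walk(workflow.steps, value)


billing = (
    ToyWorkflow("billing")
    .then("validate", lambda o: {**o, "validated": True})
    .then("charge", lambda o: {"charge_id": f"ch_{o['id']}"})
)
parent = (
    ToyWorkflow("order-pipeline")
    .then("prepare", lambda o: {**o, "prepared": True})
    .then_flow(billing)
    .then("confirm", lambda c: f"Order complete: {c['charge_id']}")
)
result = run(parent, {"id": "abc", "amount": 49.99})
```

In this model the parent ends up with three top-level steps (the child counts as one), while its registry holds all four task IDs.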

The child workflow is not a separate execution — it runs inline within the parent’s execution context. This means:

  • No extra instance IDs or lifecycle management needed
  • Checkpointing covers both parent and child steps
  • Errors propagate naturally to the parent
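To make the "inline" behaviour concrete, here is a small plain-Python sketch in which one checkpoint log and one call stack serve both parent and child steps. It is only an illustration under stated assumptions: `run_inline` and the list-of-tuples step format are hypothetical, and Sayiir's actual checkpoint granularity and storage are not shown here.

```python
def run_inline(steps, value, checkpoints):
    """Run steps in order; a nested list stands for an inlined child workflow."""
    for step in steps:
        if isinstance(step, list):
            # Child steps share the parent's checkpoint log and call stack.
            value = run_inline(step, value, checkpoints)
        else:
            task_id, fn = step
            value = fn(value)  # an exception here unwinds straight to the caller
            checkpoints.append((task_id, value))
    return value


def validate(order):
    if order["amount"] <= 0:
        raise ValueError("amount must be positive")
    return {**order, "validated": True}


def charge(order):
    return {"charge_id": f"ch_{order['id']}"}


billing = [("validate", validate), ("charge", charge)]
pipeline = [
    ("prepare", lambda o: {**o, "prepared": True}),
    billing,  # inlined child
    ("confirm", lambda c: f"Order complete: {c['charge_id']}"),
]

# Successful run: one log records parent and child steps alike.
ok_log = []
result = run_inline(pipeline, {"id": "abc", "amount": 49.99}, ok_log)

# Failing run: the child's error surfaces from the parent's run call.
failed_log = []
try:
    run_inline(pipeline, {"id": "bad", "amount": 0}, failed_log)
    error = None
except ValueError as exc:
    error = exc
```

After the failing run, `failed_log` holds only the `prepare` checkpoint, which is the point a resumed execution would restart from in this toy model.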

You can inline multiple child workflows in a single parent:

enrichment = Flow("enrich").then(geocode).then(score).build()
billing = Flow("billing").then(validate).then(charge).build()
pipeline = (
    Flow("full-pipeline")
    .then(ingest)
    .then_flow(enrichment)
    .then_flow(billing)
    .then(notify)
    .build()
)

Workflow composition is useful when:

  • Reuse — The same sequence of tasks appears in multiple parent workflows (e.g., billing, notification, approval)
  • Separation of concerns — Teams own different sub-workflows independently
  • Testing — Sub-workflows can be tested and run in isolation
  • Readability — Breaking a large workflow into named sub-workflows clarifies intent

Beyond inlining child workflows, you can build standalone task libraries — reusable collections of tasks that you merge into any workflow or worker. This is useful when multiple workflows share the same tasks but don’t share the same pipeline structure.

The #[task] macro generates a register() method on each task struct, so building a library is just a function that populates a TaskRegistry:

use sayiir_runtime::prelude::*;
use std::sync::Arc;

// billing_tasks crate: define tasks with the macro
#[task(id = "charge", retries = 2, timeout = "30s")]
async fn charge(
    order: Order,
    #[inject] stripe: Arc<StripeClient>,
) -> Result<Receipt, BoxError> {
    stripe.charge(&order).await
}

#[task(id = "refund", retries = 2, timeout = "30s")]
async fn refund(
    order: Order,
    #[inject] stripe: Arc<StripeClient>,
) -> Result<Refund, BoxError> {
    stripe.refund(&order).await
}

/// Build a registry containing all billing tasks.
pub fn billing_tasks(codec: Arc<JsonCodec>, stripe: Arc<StripeClient>) -> TaskRegistry {
    let mut reg = TaskRegistry::new();
    ChargeTask::register(&mut reg, codec.clone(), ChargeTask::new(stripe.clone()));
    RefundTask::register(&mut reg, codec.clone(), RefundTask::new(stripe));
    reg
}

// notification_tasks crate
#[task(id = "send_email", retries = 3, timeout = "10s")]
async fn send_email(
    data: EmailPayload,
    #[inject] mailer: Arc<Mailer>,
) -> Result<EmailResult, BoxError> {
    mailer.send(&data).await
}

/// Build a registry containing all notification tasks.
pub fn notification_tasks(codec: Arc<JsonCodec>, mailer: Arc<Mailer>) -> TaskRegistry {
    let mut reg = TaskRegistry::new();
    SendEmailTask::register(&mut reg, codec.clone(), SendEmailTask::new(mailer));
    reg
}

Then in your application, merge libraries into a single registry:

// application — combine libraries for a worker
let codec = Arc::new(JsonCodec);
let mut registry = TaskRegistry::new();
registry.merge(billing_tasks(codec.clone(), stripe));
registry.merge(notification_tasks(codec.clone(), mailer));
let worker = PooledWorker::new("worker-1", backend, registry);

On ID collisions, the registry being merged into wins: its existing entries are not overwritten. This lets you layer overrides on top of library defaults by registering the override before merging the library.
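The collision rule can be illustrated with a language-neutral model in plain Python, where a registry is just a dict keyed by task ID. This is a sketch of the semantics described above, not the `TaskRegistry` API: the `merge` function and the task bodies below are hypothetical.

```python
from typing import Callable, Dict

Registry = Dict[str, Callable[[dict], dict]]


def merge(target: Registry, incoming: Registry) -> Registry:
    """Copy entries from `incoming` into `target`, keeping existing IDs as-is."""
    for task_id, fn in incoming.items():
        target.setdefault(task_id, fn)  # existing entry wins on collision
    return target


def library_charge(order: dict) -> dict:
    return {"charge_id": f"ch_{order['id']}", "mode": "live"}


def library_refund(order: dict) -> dict:
    return {"refund_id": f"re_{order['id']}"}


def sandbox_charge(order: dict) -> dict:
    return {"charge_id": f"ch_{order['id']}", "mode": "sandbox"}


# Library defaults, as a billing library might ship them.
billing_defaults: Registry = {"charge": library_charge, "refund": library_refund}

# Application: register the override first, then merge the library on top.
registry: Registry = {"charge": sandbox_charge}
merge(registry, billing_defaults)

charge_result = registry["charge"]({"id": "abc"})
```

Here `registry["charge"]` still points at the sandbox override, while `refund` is filled in from the library. Merging in the opposite order would keep the library's `charge` instead.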