Parallel & Conditional Workflows
Fork/Join Model
Sayiir supports fork/join parallelism, allowing you to run multiple branches in parallel and collect their results in a join task. This pattern is ideal for independent operations that can execute concurrently, such as validating payment while checking inventory.
Basic Fork/Join
The fork/join pattern splits execution into multiple parallel branches, then merges the results:
Python:

from sayiir import task, Flow, run_workflow

@task
def validate_payment(order: dict) -> dict:
    return {"payment": "valid"}

@task
def check_inventory(order: dict) -> dict:
    return {"stock": "available"}

@task
def finalize(results: dict) -> str:
    return f"Order complete: {results}"

workflow = (
    Flow("checkout")
    .fork()
    .branch(validate_payment)
    .branch(check_inventory)
    .join(finalize)
    .build()
)

result = run_workflow(workflow, {"order_id": 1})

Node.js:

import { task, flow, branch, runWorkflow } from "sayiir";

const validatePayment = task("validate_payment", (order: Record<string, unknown>) => {
  return { payment: "valid" };
});

const checkInventory = task("check_inventory", (order: Record<string, unknown>) => {
  return { stock: "available" };
});

const workflow = flow<Record<string, unknown>>("checkout")
  .fork([
    branch("validate_payment", validatePayment),
    branch("check_inventory", checkInventory),
  ])
  .join("finalize", ([payment, inventory]) => {
    return `Order complete: payment=${JSON.stringify(payment)}, stock=${JSON.stringify(inventory)}`;
  })
  .build();

const result = await runWorkflow(workflow, { order_id: 1 });

Rust:

let workflow = WorkflowBuilder::new(ctx)
    .with_registry()
    .then_task::<FetchOrderTask>()
    .fork()
    .add_task::<ValidatePaymentTask>()
    .add_task::<CheckInventoryTask>()
    .add_task::<CalculateShippingTask>()
    .join("finalize_order", |results: BranchOutputs<JsonCodec>| async move {
        let payment = results.get::<ValidatePaymentTask>()?;
        let inventory = results.get::<CheckInventoryTask>()?;
        let shipping = results.get::<CalculateShippingTask>()?;
        Ok(Order::finalize(payment, inventory, shipping))
    })
    .build()?;

Workflow Macro Syntax
Rust also supports a concise macro syntax for fork/join:

let wf = workflow! {
    name: "checkout",
    registry: registry,
    steps: [validate_payment || check_inventory, finalize]
};

The || operator denotes parallel execution, and a comma (,) separates sequential steps, including the join handler.
Multi-Step Branches
In Python, you can chain multiple steps within a single branch using .branch(step_a, step_b):

workflow = (
    Flow("complex_checkout")
    .fork()
    .branch(validate_payment, charge_card)
    .branch(check_inventory, reserve_items)
    .branch(calculate_shipping, book_courier)
    .join(finalize_order)
    .build()
)

Each branch executes its steps sequentially, but branches run in parallel with each other.
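To make that execution order concrete, here is a minimal plain-Python sketch of the semantics. It does not use Sayiir and is not how Sayiir is implemented: run_branch and run_fork are hypothetical helpers, a thread pool stands in for the runtime's scheduler, and the sketch assumes each step in a branch receives the previous step's output.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import reduce


def run_branch(steps, value):
    # Steps inside one branch run one after another; each step is fed
    # the output of the step before it (an assumption of this sketch).
    return reduce(lambda acc, step: step(acc), steps, value)


def run_fork(branches, value):
    # All branches are submitted up front so they can run concurrently.
    with ThreadPoolExecutor(max_workers=len(branches)) as pool:
        futures = {
            name: pool.submit(run_branch, steps, value)
            for name, steps in branches.items()
        }
        # Block until every branch has produced its result.
        return {name: future.result() for name, future in futures.items()}


# Hypothetical stand-ins for the tasks named in the example above.
def validate_payment(order):
    return {**order, "payment": "valid"}


def charge_card(order):
    return {**order, "charged": True}


def check_inventory(order):
    return {**order, "stock": "available"}


def reserve_items(order):
    return {**order, "reserved": True}


results = run_fork(
    {
        "payment": [validate_payment, charge_card],
        "inventory": [check_inventory, reserve_items],
    },
    {"order_id": 1},
)
```

In this sketch, charge_card never starts before validate_payment has returned, while the payment and inventory chains are free to overlap.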
Join Handler
The join task receives a dictionary (Python), a tuple of branch results (Node.js), or a map (Rust) containing the results from all branches:

Python:

@task
def finalize(results: dict) -> str:
    payment = results["validate_payment"]
    inventory = results["check_inventory"]
    shipping = results["calculate_shipping"]

    return f"Order processed: payment={payment}, stock={inventory}, shipping={shipping}"

Node.js:

// Join receives a tuple of branch results in declaration order
.join("finalize", ([payment, inventory, shipping]) => {
  return `Order processed: payment=${JSON.stringify(payment)}, stock=${JSON.stringify(inventory)}, shipping=${JSON.stringify(shipping)}`;
})

Rust:

.join("finalize_order", |results: BranchOutputs<JsonCodec>| async move {
    let payment: PaymentResult = results.get_by_id(ValidatePaymentTask::task_id())?;
    let inventory: InventoryResult = results.get_by_id(CheckInventoryTask::task_id())?;
    let shipping: ShippingResult = results.get_by_id(CalculateShippingTask::task_id())?;

    Ok(format!(
        "Order processed: payment={:?}, stock={:?}, shipping={:?}",
        payment, inventory, shipping
    ))
})

The join task executes only after all branches complete successfully. If any branch fails, the entire fork/join fails and triggers the workflow's retry logic.
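The all-or-nothing behavior of the join can be illustrated with a small plain-Python sketch. It is not Sayiir code: fork_join and ForkJoinError are hypothetical names used only here, retries are left out, and a thread pool stands in for the runtime.

```python
from concurrent.futures import ThreadPoolExecutor


class ForkJoinError(Exception):
    """Hypothetical error type for this sketch; not part of Sayiir."""


def fork_join(branches, join, value):
    with ThreadPoolExecutor(max_workers=len(branches)) as pool:
        futures = {name: pool.submit(fn, value) for name, fn in branches.items()}
        results = {}
        for name, future in futures.items():
            try:
                results[name] = future.result()
            except Exception as exc:
                # One failing branch fails the whole fork/join;
                # the join handler below is never reached.
                raise ForkJoinError(f"branch {name!r} failed") from exc
    # Reached only when every branch succeeded.
    return join(results)


join_calls = []


def finalize(results):
    join_calls.append(results)
    return f"Order complete: {sorted(results)}"


def validate_payment(order):
    return {"payment": "valid"}


def check_inventory(order):
    return {"stock": "available"}


def out_of_stock(order):
    raise RuntimeError("no stock")


ok = fork_join(
    {"validate_payment": validate_payment, "check_inventory": check_inventory},
    finalize,
    {"order_id": 1},
)

try:
    fork_join(
        {"validate_payment": validate_payment, "check_inventory": out_of_stock},
        finalize,
        {"order_id": 2},
    )
    failed = False
except ForkJoinError:
    failed = True
```

After both calls, finalize has run exactly once: the second fork/join raised before the join handler was invoked.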
Conditional Branching
While fork/join runs all branches in parallel, conditional branching routes execution to exactly one branch based on a runtime value. A key function extracts a routing key from the previous step’s output, and the matching branch runs. All languages now support declaring the set of valid keys upfront, enabling exhaustiveness checks at build time.
Python:

from sayiir import task, Flow, run_workflow

@task
def classify(ticket: dict) -> str:
    return "billing" if ticket["type"] == "invoice" else "tech"

@task
def handle_billing(ticket: dict) -> str:
    return f"Billing resolved: {ticket['id']}"

@task
def handle_tech(ticket: dict) -> str:
    return f"Tech resolved: {ticket['id']}"

@task
def fallback(ticket: dict) -> str:
    return f"General: {ticket['id']}"

workflow = (
    Flow("support-router")
    .route(classify, keys=["billing", "tech"])
    .branch("billing", handle_billing)
    .branch("tech", handle_tech)
    .default_branch(fallback)
    .done()
    .build()
)

result = run_workflow(workflow, {"id": 1, "type": "invoice"})
# {"branch": "billing", "result": "Billing resolved: 1"}

Node.js:

import { task, flow, runWorkflow } from "sayiir";

const classify = task("classify", (ticket: { id: number; type: string }) => {
  return ticket.type === "invoice" ? "billing" : "tech";
});

const handleBilling = task("handle-billing", (ticket: { id: number }) => {
  return `Billing resolved: ${ticket.id}`;
});

const handleTech = task("handle-tech", (ticket: { id: number }) => {
  return `Tech resolved: ${ticket.id}`;
});

const workflow = flow<{ id: number; type: string }>("support-router")
  .route(
    (ticket) => ticket.type === "invoice" ? "billing" : "tech",
    ["billing", "tech"] as const
  )
  .branch("billing", handleBilling)
  .branch("tech", handleTech)
  .defaultBranch("fallback", (ticket) => `General: ${ticket.id}`)
  .done()
  .build();

const result = await runWorkflow(workflow, { id: 1, type: "invoice" });
// { branch: "billing", result: "Billing resolved: 1" }

Rust:

// Builder API: #[derive(BranchKey)] gives static type checking:
// the compiler rejects typos, missing branches, and orphan keys.
#[derive(BranchKey)]
enum Route {
    Billing,
    Tech,
}

let wf = WorkflowBuilder::new(ctx)
    .with_registry()
    .then_task::<ClassifyTask>()
    .route::<_, Route, _, _>(
        |input: ClassifyOutput| async move {
            Ok(if input.intent == "billing" { Route::Billing } else { Route::Tech })
        })
    .branch(Route::Billing, |sub| sub.then_task::<HandleBillingTask>())
    .branch(Route::Tech, |sub| sub.then_task::<HandleTechTask>())
    .default_branch(|sub| sub.then_task::<FallbackTask>())
    .done()
    .build()?;

// Or using the workflow! macro with typed keys
let wf = workflow! {
    name: "support-router",
    registry: registry,
    steps: [
        classify_ticket,
        route extract_intent -> Route {
            Billing => [handle_billing],
            Tech => [handle_tech],
            _ => [fallback],
        }
    ]
};

// String-based keys also work (without `-> Type`)
let wf = workflow! {
    name: "support-router",
    registry: registry,
    steps: [
        classify_ticket,
        route extract_intent {
            "billing" => [handle_billing],
            "tech" => [handle_tech],
            _ => [fallback],
        }
    ]
};

How It Works
- Declare your keys upfront: keys= in Python, an as const array in Node.js, a #[derive(BranchKey)] enum in Rust. This is the recommended approach across all three languages: it catches typos and missing branches at build time
- The key function receives the current step’s output and returns one of the declared keys
- Sayiir matches the key against named branches; only the matching branch executes
- If no branch matches and a default is set, the default runs; otherwise the workflow fails with BranchKeyNotFound
- .done() performs exhaustiveness checks: missing or orphan branches are caught at build time. In Rust, #[derive(BranchKey)] enums give you compile-time safety: the type system rejects typos and missing cases before the code runs. The workflow! macro supports typed keys via route key_fn -> EnumType { Variant => [...] }, which generates a compile-time match exhaustiveness check
- The output is a BranchEnvelope containing branch (the matched key) and result (the branch output)
- The branch result is checkpointed; on resume, the key function is not re-evaluated
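The steps above can be sketched in plain Python. This is an illustration of the described behavior under stated assumptions, not Sayiir's implementation: build_router is a hypothetical helper, BranchKeyNotFound here is a local stand-in named after the error mentioned above, the envelope is modeled as a plain dict, and checkpointing is omitted.

```python
class BranchKeyNotFound(Exception):
    """Local stand-in named after the error above; not Sayiir's class."""


def build_router(key_fn, keys, branches, default=None):
    # Build-time exhaustiveness check: every declared key needs a branch
    # (no missing branches) and every branch needs a declared key (no orphans).
    missing = set(keys) - set(branches)
    orphans = set(branches) - set(keys)
    if missing or orphans:
        raise ValueError(
            f"missing branches: {sorted(missing)}, orphan branches: {sorted(orphans)}"
        )

    def run(value):
        key = key_fn(value)
        handler = branches.get(key, default)
        if handler is None:
            # No matching branch and no default configured.
            raise BranchKeyNotFound(key)
        # Assumption: what Sayiir records as the branch when the default
        # runs is not specified above; this sketch records the returned key.
        return {"branch": key, "result": handler(value)}

    return run


router = build_router(
    key_fn=lambda ticket: "billing" if ticket["type"] == "invoice" else "tech",
    keys=["billing", "tech"],
    branches={
        "billing": lambda ticket: f"Billing resolved: {ticket['id']}",
        "tech": lambda ticket: f"Tech resolved: {ticket['id']}",
    },
)

envelope = router({"id": 1, "type": "invoice"})
```

Running the check inside build_router rather than inside run mirrors the idea that a misconfigured router is rejected when the workflow is built, before any input is routed.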
Multi-Step Branches
Each branch can contain a chain of tasks:

workflow = (
    Flow("order-router")
    .route(classify_order, keys=["express", "standard"])
    .branch("express", validate_express, ship_express, notify_customer)
    .branch("standard", validate_standard, ship_standard)
    .done()
    .then(send_confirmation)
    .build()
)

Combining with Fork/Join
Conditional branching composes naturally with other workflow primitives. You can branch after a fork/join, fork within a branch, or chain additional steps after a branch: