Parallel & Conditional Workflows

Sayiir supports fork/join parallelism, allowing you to run multiple branches in parallel and collect their results in a join task. This pattern is ideal for independent operations that can execute concurrently, such as validating payment while checking inventory.

The fork/join pattern splits execution into multiple parallel branches, then merges the results:

from sayiir import task, Flow, run_workflow

@task
def validate_payment(order: dict) -> dict:
    return {"payment": "valid"}

@task
def check_inventory(order: dict) -> dict:
    return {"stock": "available"}

@task
def finalize(results: dict) -> str:
    return f"Order complete: {results}"

workflow = (
    Flow("checkout")
    .fork()
    .branch(validate_payment)
    .branch(check_inventory)
    .join(finalize)
    .build()
)
result = run_workflow(workflow, {"order_id": 1})

Rust also supports a concise macro syntax for fork/join:

let wf = workflow! {
    name: "checkout",
    registry: registry,
    steps: [validate_payment || check_inventory, finalize]
};

The || operator denotes parallel execution, and , separates sequential steps including the join handler.

In Python, you can chain multiple steps within a single branch using .branch(step_a, step_b):

workflow = (
    Flow("complex_checkout")
    .fork()
    .branch(validate_payment, charge_card)
    .branch(check_inventory, reserve_items)
    .branch(calculate_shipping, book_courier)
    .join(finalize_order)
    .build()
)

Each branch executes its steps sequentially, but branches run in parallel with each other.

The join task receives a dictionary (Python), tuple of branch results (Node.js), or map (Rust) containing the results from all branches:

@task
def finalize(results: dict) -> str:
    payment = results["validate_payment"]
    inventory = results["check_inventory"]
    shipping = results["calculate_shipping"]
    return f"Order processed: payment={payment}, stock={inventory}, shipping={shipping}"

The join task only executes after all branches complete successfully. If any branch fails, the entire fork/join fails and triggers workflow retry logic.
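The semantics described so far (steps within a branch run in sequence, branches run concurrently, and the join runs only if every branch succeeds) can be illustrated without Sayiir at all. The following is a minimal plain-Python sketch built on `concurrent.futures`; it is not how Sayiir is implemented. The helpers `run_chain` and `fork_join`, the sample task `out_of_stock`, and the choice to key results by the name of each branch's first step are assumptions made for illustration, the last one mirroring the keys used in the `finalize` example above.

```python
from concurrent.futures import ThreadPoolExecutor


def run_chain(steps, value):
    """Run one branch: each step receives the previous step's output."""
    for step in steps:
        value = step(value)
    return value


def fork_join(branches, join, value):
    """Run every branch concurrently, then call join with all results."""
    with ThreadPoolExecutor(max_workers=len(branches)) as pool:
        # Assumption: results are keyed by the name of a branch's first step.
        futures = {
            steps[0].__name__: pool.submit(run_chain, steps, value)
            for steps in branches
        }
        # future.result() re-raises a branch's exception, so the join
        # below is never reached if any branch failed.
        results = {name: future.result() for name, future in futures.items()}
    return join(results)


def validate_payment(order):
    return {"payment": "valid"}


def charge_card(payment):
    return {**payment, "charged": True}


def check_inventory(order):
    return {"stock": "available"}


def out_of_stock(order):  # hypothetical failing task
    raise RuntimeError("out of stock")


def finalize(results):
    return f"Order complete: {sorted(results)}"


summary = fork_join(
    [[validate_payment, charge_card], [check_inventory]],
    finalize,
    {"order_id": 1},
)
print(summary)

try:
    fork_join([[validate_payment], [out_of_stock]], finalize, {"order_id": 1})
except RuntimeError as exc:
    print(f"fork/join failed: {exc}")
```

In this sketch a failing branch simply propagates its exception to the caller; in Sayiir the failure would instead feed into the workflow retry logic mentioned above.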

While fork/join runs all branches in parallel, conditional branching routes execution to exactly one branch based on a runtime value. A key function extracts a routing key from the previous step’s output, and the matching branch runs. All languages now support declaring the set of valid keys upfront, enabling exhaustiveness checks at build time.

from sayiir import task, Flow, run_workflow

@task
def classify(ticket: dict) -> str:
    return "billing" if ticket["type"] == "invoice" else "tech"

@task
def handle_billing(ticket: dict) -> str:
    return f"Billing resolved: {ticket['id']}"

@task
def handle_tech(ticket: dict) -> str:
    return f"Tech resolved: {ticket['id']}"

@task
def fallback(ticket: dict) -> str:
    return f"General: {ticket['id']}"

workflow = (
    Flow("support-router")
    .route(classify, keys=["billing", "tech"])
    .branch("billing", handle_billing)
    .branch("tech", handle_tech)
    .default_branch(fallback)
    .done()
    .build()
)
result = run_workflow(workflow, {"id": 1, "type": "invoice"})
# {"branch": "billing", "result": "Billing resolved: 1"}

  1. Declare your keys upfront: keys= in Python, an as const array in Node.js, or a #[derive(BranchKey)] enum in Rust. This is the recommended approach across all three languages because it catches typos and missing branches at build time
  2. The key function receives the current step’s output and returns one of the declared keys
  3. Sayiir matches the key against named branches — only the matching branch executes
  4. If no branch matches and a default is set, the default runs; otherwise the workflow fails with BranchKeyNotFound
  5. .done() performs exhaustiveness checks — missing or orphan branches are caught at build time. In Rust, #[derive(BranchKey)] enums give you compile-time safety: the type system rejects typos and missing cases before the code runs. The workflow! macro supports typed keys via route key_fn -> EnumType { Variant => [...] }, which generates a compile-time match exhaustiveness check
  6. The output is a BranchEnvelope containing branch (the matched key) and result (the branch output)
  7. The branch result is checkpointed — on resume, the key function is not re-evaluated
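The routing rules in the list above can be modelled in a few lines of plain Python. This is a sketch of the described behaviour under stated assumptions, not Sayiir's code: `build_router` is a hypothetical helper, `BranchKeyNotFound` here is a local stand-in for the error named above, the `checkpoint` argument is an invented way to model resume, and two details are guesses (a default branch does not excuse a missing named branch, and the envelope records the returned key even when the default handles it).

```python
class BranchKeyNotFound(Exception):
    """Local stand-in for the error named above; not Sayiir's class."""


def build_router(key_fn, keys, branches, default=None):
    """Check branches against the declared keys, then return a router."""
    declared, named = set(keys), set(branches)
    missing = declared - named  # declared key that has no branch
    orphans = named - declared  # branch whose key was never declared
    if missing or orphans:
        raise ValueError(f"missing={sorted(missing)} orphans={sorted(orphans)}")

    def route(value, checkpoint=None):
        if checkpoint is not None:
            # Resume: reuse the stored envelope and skip key_fn entirely.
            return checkpoint
        key = key_fn(value)
        handler = branches.get(key, default)
        if handler is None:
            raise BranchKeyNotFound(key)
        return {"branch": key, "result": handler(value)}

    return route


def classify(ticket):
    return "billing" if ticket["type"] == "invoice" else "tech"


router = build_router(
    classify,
    keys=["billing", "tech"],
    branches={
        "billing": lambda t: f"Billing resolved: {t['id']}",
        "tech": lambda t: f"Tech resolved: {t['id']}",
    },
)
envelope = router({"id": 1, "type": "invoice"})
print(envelope)  # {'branch': 'billing', 'result': 'Billing resolved: 1'}

# On resume the stored envelope is returned and classify is not called again.
resumed = router({"id": 1, "type": "bug"}, checkpoint=envelope)

try:
    # The typo "biling" is rejected when the router is built, not at run time.
    build_router(classify, keys=["billing", "tech"], branches={"biling": print})
except ValueError as exc:
    print(exc)
```

Performing the set comparison once at build time is what lets a misspelled or forgotten branch surface before any ticket is processed, rather than as a run-time BranchKeyNotFound.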

Each branch can contain a chain of tasks:

workflow = (
    Flow("order-router")
    .route(classify_order, keys=["express", "standard"])
    .branch("express", validate_express, ship_express, notify_customer)
    .branch("standard", validate_standard, ship_standard)
    .done()
    .then(send_confirmation)
    .build()
)
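Because the routed section produces an envelope rather than a bare result, a step chained after `.done()` presumably needs to unpack it. The sketch below shows one way `send_confirmation` might be written, assuming the envelope reaches it as a mapping with `branch` and `result` entries, shaped like the output comment in the support-router example. That shape, and the sample input, are assumptions; in a real workflow the function would also carry the `@task` decorator, omitted here so the snippet runs on its own.

```python
def send_confirmation(envelope: dict) -> str:
    # Assumption: the envelope is a mapping with "branch" and "result" keys.
    branch = envelope["branch"]
    result = envelope["result"]
    return f"[{branch}] {result}"


# Hypothetical envelope, as the "express" branch might produce it.
message = send_confirmation({"branch": "express", "result": "Shipped order 7"})
print(message)  # [express] Shipped order 7
```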

Conditional branching composes naturally with other workflow primitives. You can branch after a fork/join, fork within a branch, or chain additional steps after a branch: