# Distributed Workers

## When to Go Distributed

Single-process workflow execution is sufficient for most workloads. Consider distributed workers when you need:
- Horizontal scaling — Handle high throughput by adding more worker processes
- High availability — Ensure workflows continue if a worker crashes
- Resource isolation — Separate CPU-intensive or memory-heavy tasks across machines
## How It Works

A distributed worker polls a shared backend (PostgreSQL) for claimable tasks, executes them using locally registered task functions, and checkpoints the result. Multiple workers can run across machines or processes, all polling the same backend.
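The poll, claim, execute, checkpoint cycle can be sketched in plain Python. This is an illustrative simulation, not Sayiir's implementation: a dict stands in for the PostgreSQL backend, and `poll_once`, the row fields, and the single-worker atomicity are assumptions made for the sketch.

```python
# Illustrative sketch of the worker loop: poll for a pending task,
# claim it, run the locally registered function, checkpoint the result.
backend = {  # task_id -> row; stands in for the shared PostgreSQL backend
    "fetch-order": {"status": "pending", "claimed_by": None, "result": None},
}
task_functions = {"fetch-order": lambda: {"order_id": 42, "total": 99.99}}

def poll_once(worker_id: str) -> bool:
    """Claim and run one pending task; return True if work was done."""
    for task_id, row in backend.items():
        if row["status"] == "pending" and row["claimed_by"] is None:
            row["claimed_by"] = worker_id           # claim (atomic in a real backend)
            result = task_functions[task_id]()      # execute the local task function
            row.update(status="done", result=result)  # checkpoint the result
            return True
    return False

poll_once("worker-1")
print(backend["fetch-order"]["status"])  # done
```

A real worker repeats this on its poll interval; atomicity of the claim is what the shared backend provides.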
Sayiir needs two pieces of information per worker:
- Workflows — The structural definitions (DAG shape, retry policies, timeouts). Workers use the definition hash to ensure all workers agree on the workflow version.
- Task functions — The executable code for each task node. In Python and Node.js, these are automatically extracted from the compiled `Workflow` objects. In Rust, tasks annotated with `#[task]` are registered automatically by the `workflow!` macro — no manual registry setup needed.
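To see why a definition hash can enforce version agreement across workers, here is a hypothetical scheme that hashes the canonical workflow structure (names, step order, policies) rather than the code. Sayiir's actual hashing is not specified here and may differ.

```python
# Hypothetical definition-hash sketch: two workers computing the same
# hash are guaranteed to agree on the workflow's structure.
import hashlib
import json

def definition_hash(name: str, steps: list, retry_policy: dict) -> str:
    # Canonical JSON (sorted keys, no whitespace) makes the hash
    # independent of dict ordering and formatting.
    canonical = json.dumps(
        {"name": name, "steps": steps, "retry": retry_policy},
        sort_keys=True, separators=(",", ":"),
    )
    return hashlib.sha256(canonical.encode()).hexdigest()

h1 = definition_hash("order-pipeline", ["fetch", "charge"], {"max": 3})
h2 = definition_hash("order-pipeline", ["fetch", "charge"], {"max": 3})
h3 = definition_hash("order-pipeline", ["fetch", "charge", "email"], {"max": 3})
assert h1 == h2  # identical structure -> same hash
assert h1 != h3  # any structural change -> different hash
```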
## Setting Up a Worker

**Python**

```python
import signal

from sayiir import task, Flow, PostgresBackend
from sayiir.worker import Worker

@task
def fetch_order(order_id: int) -> dict:
    return {"order_id": order_id, "total": 99.99}

@task
def charge_payment(order: dict) -> str:
    return f"charged ${order['total']}"

@task
def send_confirmation(receipt: str) -> str:
    return f"confirmed: {receipt}"

workflow = (
    Flow("order-pipeline")
    .then(fetch_order)
    .then(charge_payment)
    .then(send_confirmation)
    .build()
)

backend = PostgresBackend("postgresql://localhost/sayiir")
worker = Worker("worker-1", backend, poll_interval=2.0)
handle = worker.start([workflow])

# Graceful shutdown on SIGTERM
signal.signal(signal.SIGTERM, lambda *_: handle.shutdown())
handle.join()  # blocks until shutdown completes
```

**Node.js**

```typescript
import { task, flow, PostgresBackend, Worker } from "sayiir";

const fetchOrder = task("fetch-order", (orderId: number) => {
  return { orderId, total: 99.99 };
});

const chargePayment = task("charge-payment", (order: { total: number }) => {
  return `charged $${order.total}`;
});

const sendConfirmation = task("send-confirmation", (receipt: string) => {
  return `confirmed: ${receipt}`;
});

const workflow = flow<number>("order-pipeline")
  .then(fetchOrder)
  .then(chargePayment)
  .then(sendConfirmation)
  .build();

const backend = PostgresBackend.connect(process.env.DATABASE_URL!);
const worker = new Worker("worker-1", backend, [workflow], {
  pollInterval: "2s",
});
const handle = worker.start();

// Graceful shutdown on SIGTERM
process.on("SIGTERM", () => handle.shutdown());
```

**Rust**

```rust
use sayiir_runtime::prelude::*;
use sayiir_runtime::WorkflowRegistry;
use sayiir_postgres::PostgresBackend;
use std::sync::Arc;
use std::time::Duration;

let url = "postgresql://localhost/sayiir";

// With the #[task] macro + workflow! macro, tasks are registered
// automatically — no manual TaskRegistry needed.
#[task(timeout = "10s")]
async fn step_1(input: String) -> Result<String, BoxError> {
    Ok(format!("processed: {}", input))
}

#[task]
async fn step_2(data: String) -> Result<String, BoxError> {
    Ok(data.to_uppercase())
}

let workflow = workflow! {
    name: "my-pipeline",
    steps: [step_1, step_2]
}?;

// Build workflow registry
let workflows: WorkflowRegistry<_, _, _> = vec![
    (workflow.definition_hash().to_string(), Arc::new(workflow)),
];

// Create backend and worker
let backend = PostgresBackend::<JsonCodec>::connect(url).await?;

// Option A: explicit worker ID
let worker = PooledWorker::new("worker-1", backend, TaskRegistry::new())
    .with_claim_ttl(Some(Duration::from_secs(300)));

// Option B: auto-generated ID from hostname + PID (e.g. "myhost-12345")
// let worker = PooledWorker::builder(backend, TaskRegistry::new())
//     .claim_ttl(Some(Duration::from_secs(300)))
//     .build();

// Spawn worker thread
let handle = worker.spawn(Duration::from_secs(1), workflows);

// Graceful shutdown
handle.shutdown();
handle.join().await?;
```

Note: When using `#[task]` and `workflow!`, task registration is automatic. You only need to build a `TaskRegistry` manually if you're using `WorkflowBuilder` directly with closures or `register_fn`.
Key components:

- Worker ID — Unique identifier for this worker instance (e.g. hostname or container ID)
- Backend — Shared PostgreSQL backend for cross-process coordination (`InMemoryBackend` works for testing)
- Workflows — One or more compiled workflow definitions the worker can process
- Poll interval — How often the worker checks for new tasks (default: 5s in Python and Node.js; configurable in Rust)
- Claim TTL — How long a worker holds a task claim before it expires (default: 300s)
## Submitting Workflows

Workers only execute tasks — workflows are submitted separately using a `WorkflowClient`:

**Python**

```python
from sayiir import WorkflowClient, PostgresBackend

backend = PostgresBackend("postgresql://localhost/sayiir")
client = WorkflowClient(backend)
status = client.submit(workflow, "order-42", 123)
# The workflow is persisted, and a worker will pick up tasks
```

**Node.js**

```typescript
import { WorkflowClient, PostgresBackend } from "sayiir";

const backend = PostgresBackend.connect(process.env.DATABASE_URL!);
const client = new WorkflowClient(backend);
const status = client.submit(workflow, "order-42", 123);
// The workflow is persisted, and a worker will pick up tasks
```

**Rust**

```rust
use sayiir_runtime::WorkflowClient;

let backend = PostgresBackend::<JsonCodec>::connect(url).await?;
let client = WorkflowClient::new(backend);
let (status, output) = client.submit(&workflow, "order-42", input).await?;
```
## Idempotent Submission

The `WorkflowClient` supports conflict policies for handling duplicate `instance_id`s:

**Python**

```python
# "fail" (default) — raise InstanceAlreadyExistsError
client = WorkflowClient(backend)

# "use_existing" — return the existing workflow's status
client = WorkflowClient(backend, conflict_policy="use_existing")

# "terminate_existing" — cancel and restart
client = WorkflowClient(backend, conflict_policy="terminate_existing")
```

**Node.js**

```typescript
// "fail" (default) — throw an error
const client = new WorkflowClient(backend);

// "useExisting" — return the existing workflow's status
const client = new WorkflowClient(backend, { conflictPolicy: "useExisting" });

// "terminateExisting" — cancel and restart
const client = new WorkflowClient(backend, { conflictPolicy: "terminateExisting" });
```

**Rust**

```rust
use sayiir_core::workflow::ConflictPolicy;

let client = WorkflowClient::new(backend)
    .with_conflict_policy(ConflictPolicy::UseExisting);
```
## Lifecycle Operations via WorkflowClient

The `WorkflowClient` provides methods to control workflows — cancel, pause, unpause, send signals, and check status:

**Python**

```python
client = WorkflowClient(backend)

# Cancel a workflow
client.cancel("order-42", reason="Out of stock", cancelled_by="system")

# Pause / unpause
client.pause("order-42", reason="Maintenance window")
client.unpause("order-42")

# Send a signal
client.send_signal("order-42", "manager_approval", {"approved": True})

# Check status
status = client.status("order-42")
```

**Node.js**

```typescript
const client = new WorkflowClient(backend);

// Cancel a workflow
client.cancel("order-42", {
  reason: "Out of stock",
  cancelledBy: "system",
});

// Pause / unpause
client.pause("order-42", { reason: "Maintenance window" });
client.unpause("order-42");

// Send a signal
client.sendSignal("order-42", "manager_approval", { approved: true });

// Check status
const status = client.status("order-42");
```

**Rust**

```rust
let client = WorkflowClient::new(backend);

// Cancel a workflow
client.cancel("order-42", Some("Out of stock".into()), Some("system".into())).await?;

// Pause / unpause
client.pause("order-42", Some("Maintenance window".into()), None).await?;
client.unpause("order-42").await?;

// Send a signal
client.send_event("order-42", "manager_approval", payload).await?;

// Check status
let status = client.status("order-42").await?;
```
## Claim TTL

Workers acquire a claim on a task before executing it. The claim prevents other workers from executing the same task concurrently.

The claim includes a time-to-live (TTL):

- If a worker crashes or becomes unresponsive, the claim expires after the TTL
- Another worker can then pick up the abandoned task
- Default TTL is 5 minutes (300 seconds)

Set the TTL based on your longest expected task duration:

**Python**

```python
worker = Worker("worker-1", backend, claim_ttl=600.0)  # 10 minutes
```

**Node.js**

```typescript
const worker = new Worker("worker-1", backend, [workflow], {
  claimTtl: "10m",
});
```

**Rust**

```rust
let worker = PooledWorker::new("worker-1", backend, registry)
    .with_claim_ttl(Some(Duration::from_secs(600))); // 10 minutes
```

If a task takes longer than the TTL, another worker may claim and execute it, leading to duplicate execution. Always set the TTL higher than your maximum task duration.
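The expiry semantics can be sketched as a small claim function. `try_claim` and the row fields are hypothetical names for this sketch, but the behavior matches the description: a claim is valid only until `claimed_at + ttl`, after which another worker may take over.

```python
# Illustrative sketch of claim-TTL semantics (not Sayiir's implementation).
def try_claim(row: dict, worker_id: str, ttl: float, now: float) -> bool:
    """Claim the task if it is unclaimed, or if the previous claim expired."""
    claimed_at = row.get("claimed_at")
    expired = claimed_at is not None and now - claimed_at > ttl
    if row.get("claimed_by") is None or expired:
        row["claimed_by"] = worker_id
        row["claimed_at"] = now
        return True
    return False

row = {}
assert try_claim(row, "worker-1", ttl=300, now=0.0)        # fresh claim
assert not try_claim(row, "worker-2", ttl=300, now=100.0)  # still held
assert try_claim(row, "worker-2", ttl=300, now=301.0)      # expired -> takeover
```

The takeover at `now=301.0` is exactly the duplicate-execution hazard the warning above describes: if worker-1 were merely slow rather than dead, both workers would now run the task.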
## Multiple Workflows

A single worker can process tasks from multiple workflows:

**Python**

```python
order_wf = Flow("orders").then(process_order).build()
email_wf = Flow("emails").then(send_email).build()

handle = worker.start([order_wf, email_wf])
```

**Node.js**

```typescript
const orderWf = flow<Order>("orders").then(processOrder).build();
const emailWf = flow<Email>("emails").then(sendEmail).build();

const worker = new Worker("worker-1", backend, [orderWf, emailWf]);
const handle = worker.start();
```

**Rust**

```rust
let order_wf = WorkflowBuilder::new(order_ctx)
    .then("process_order", process_order)
    .build()?;
let email_wf = WorkflowBuilder::new(email_ctx)
    .then("send_email", send_email)
    .build()?;

let workflows: WorkflowRegistry<_, _, _> = vec![
    (order_wf.definition_hash().to_string(), Arc::new(order_wf)),
    (email_wf.definition_hash().to_string(), Arc::new(email_wf)),
];

let handle = worker.spawn(Duration::from_secs(1), workflows);
```

All task functions from all workflows are merged into a single registry. Ensure task IDs are unique across workflows.
## Task Priority

Tasks with lower priority values are picked up first by workers. Priority ranges from 1 (Critical) to 5 (Minimal), with 3 (Normal) as the default.

**Python**

```python
@task(priority=1)
def charge_payment(order: dict) -> dict:
    return process_payment(order)

@task(priority=4)
def generate_report(data: dict) -> dict:
    return build_report(data)
```

**Node.js**

```typescript
const chargePayment = task("charge-payment", processPayment, { priority: 1 });
const generateReport = task("generate-report", buildReport, { priority: 4 });
```

**Rust**

```rust
#[task(id = "charge_payment", priority = 1)]
async fn charge_payment(order: Order) -> Result<Receipt, BoxError> {
    process_payment(order).await
}

#[task(id = "generate_report", priority = 4)]
async fn generate_report(data: ReportData) -> Result<Report, BoxError> {
    build_report(data).await
}
```
## Priority Aging

To prevent starvation, Sayiir applies aging: the longer a task waits, the more its effective priority improves. A priority-5 task waiting long enough will eventually overtake a freshly submitted priority-1 task.

The formula is: `effective_priority = base_priority - (seconds_waiting / aging_interval)`. The default aging interval is 300 seconds — a priority-5 task waiting 10 minutes has an effective priority of 3.
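The formula is plain arithmetic and can be checked directly (assuming no clamping or flooring, which Sayiir may apply):

```python
# The aging formula: lower effective priority wins, so waiting improves rank.
def effective_priority(base: int, seconds_waiting: float,
                       aging_interval: float = 300.0) -> float:
    return base - seconds_waiting / aging_interval

assert effective_priority(5, 0) == 5.0       # fresh priority-5 task
assert effective_priority(5, 600) == 3.0     # after 10 minutes: the example above
assert effective_priority(5, 1500) < effective_priority(1, 0)  # overtakes priority 1
```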
In Rust, you can configure the aging interval on the worker:

```rust
let worker = PooledWorker::new("worker-1", backend, registry)
    .with_aging_interval(Duration::from_secs(600)); // slower aging
```
## Soft Worker Affinity

When a task fails and is retried, Sayiir prefers to route the retry to a different worker. This "soft worker affinity" mechanism improves resilience:
- Fault isolation — If a worker has a corrupted cache or bad state, the retry goes elsewhere
- Resource contention — A worker struggling with resource limits won’t keep retrying the same failing task
- Automatic recovery — Transient worker issues resolve themselves through task migration
The mechanism works through claim metadata:
- When a task fails, the system records which worker attempted it
- On retry, workers check if they previously attempted this task
- If a worker sees its own ID in the failure history, it deprioritizes claiming that task
- Another worker is more likely to pick up the retry
This is a “soft” preference, not a hard rule. If only one worker is available, it will retry its own failed tasks. In a multi-worker setup, retries naturally distribute across healthy workers.
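The soft preference can be sketched as a ranking step over claimable tasks. The field name `failed_on` and the function are hypothetical; the point is that a worker sorts tasks it previously failed to the back rather than refusing them.

```python
# Illustrative sketch of soft worker affinity: deprioritize, don't exclude.
def order_candidates(tasks: list, worker_id: str) -> list:
    # Tasks this worker already failed sort last (False < True).
    return sorted(tasks, key=lambda t: worker_id in t.get("failed_on", []))

tasks = [
    {"id": "t1", "failed_on": ["worker-1"]},  # this worker already failed it
    {"id": "t2", "failed_on": []},
]
ranked = order_candidates(tasks, "worker-1")
assert ranked[0]["id"] == "t2"  # prefer a task we haven't failed
assert ranked[1]["id"] == "t1"  # but still eligible if it's all that's left
```

Because the failed task stays in the list, a single-worker deployment still makes progress, which is what makes the affinity "soft".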
## Operational Tips

- Ensure all workers register the same workflows — Inconsistent task implementations lead to unpredictable behavior
- Use the same workflow definitions across workers — The definition hash ensures version agreement; mismatched hashes prevent task execution
- Monitor claim TTL expirations — Frequent expirations indicate worker crashes or tasks exceeding TTL
- Scale workers based on task throughput — Add more workers when task queue depth increases
- Graceful shutdown — Always call `shutdown()` (and `join()` in Python/Rust) to finish in-flight tasks before terminating
- Use unique worker IDs — Include hostname or container ID to identify workers in logs and claim metadata