
Distributed Workers

Single-process workflow execution is sufficient for most workloads. Consider distributed workers when you need:

  • Horizontal scaling — Handle high throughput by adding more worker processes
  • High availability — Ensure workflows continue if a worker crashes
  • Resource isolation — Separate CPU-intensive or memory-heavy tasks across machines

A distributed worker polls a shared backend (PostgreSQL) for claimable tasks, executes them using locally registered task functions, and checkpoints the result. Multiple workers can run across machines or processes, all polling the same backend.
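The poll–claim–execute–checkpoint loop can be sketched in plain Python. Everything here (`FakeBackend`, `claim_task`, `checkpoint`, `poll_loop`) is illustrative stand-in code, not the Sayiir API:

```python
import time

class FakeBackend:
    """Hypothetical in-memory stand-in for the shared PostgreSQL backend."""
    def __init__(self, tasks):
        self.tasks = list(tasks)   # claimable task IDs
        self.results = {}          # checkpointed results

    def claim_task(self, worker_id):
        # Hand one task to this worker; a real backend would claim
        # atomically (e.g. SELECT ... FOR UPDATE SKIP LOCKED).
        return self.tasks.pop(0) if self.tasks else None

    def checkpoint(self, task_id, result):
        self.results[task_id] = result

def poll_loop(worker_id, backend, registry, poll_interval=0.0, max_polls=3):
    """Claim a task, run the locally registered function, checkpoint."""
    for _ in range(max_polls):
        task_id = backend.claim_task(worker_id)
        if task_id is None:
            time.sleep(poll_interval)  # nothing claimable; wait and re-poll
            continue
        result = registry[task_id]()   # execute the local task function
        backend.checkpoint(task_id, result)

backend = FakeBackend(["fetch_order", "charge_payment"])
registry = {"fetch_order": lambda: {"total": 99.99},
            "charge_payment": lambda: "charged"}
poll_loop("worker-1", backend, registry)
```

Because every worker runs this same loop against the same backend, adding workers scales throughput without any coordination beyond the claim step.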

Sayiir needs two pieces of information per worker:

  • Workflows — The structural definitions (DAG shape, retry policies, timeouts). Workers use the definition hash to ensure all workers agree on the workflow version.
  • Task functions — The executable code for each task node. In Python and Node.js, these are automatically extracted from the compiled Workflow objects. In Rust, tasks annotated with #[task] are registered automatically by the workflow! macro — no manual registry setup needed.
import signal

from sayiir import task, Flow, PostgresBackend
from sayiir.worker import Worker

@task
def fetch_order(order_id: int) -> dict:
    return {"order_id": order_id, "total": 99.99}

@task
def charge_payment(order: dict) -> str:
    return f"charged ${order['total']}"

@task
def send_confirmation(receipt: str) -> str:
    return f"confirmed: {receipt}"

workflow = (
    Flow("order-pipeline")
    .then(fetch_order)
    .then(charge_payment)
    .then(send_confirmation)
    .build()
)

backend = PostgresBackend("postgresql://localhost/sayiir")
worker = Worker("worker-1", backend, poll_interval=2.0)
handle = worker.start([workflow])

# Graceful shutdown on SIGTERM
signal.signal(signal.SIGTERM, lambda *_: handle.shutdown())
handle.join()  # blocks until shutdown completes

Key components:

  • Worker ID — Unique identifier for this worker instance (e.g. hostname or container ID)
  • Backend — Shared PostgreSQL backend for cross-process coordination (InMemoryBackend works for testing)
  • Workflows — One or more compiled workflow definitions the worker can process
  • Poll interval — How often the worker checks for new tasks (default: 5s in Python and Node.js; configurable in Rust)
  • Claim TTL — How long a worker holds a task claim before it expires (default: 300s)

Workers only execute tasks — workflows are submitted separately using a WorkflowClient:

from sayiir import WorkflowClient, PostgresBackend
backend = PostgresBackend("postgresql://localhost/sayiir")
client = WorkflowClient(backend)
status = client.submit(workflow, "order-42", 123)
# The workflow is persisted, and a worker will pick up tasks

The WorkflowClient supports conflict policies for handling duplicate instance_ids:

# "fail" (default) — raise InstanceAlreadyExistsError
client = WorkflowClient(backend)
# "use_existing" — return the existing workflow's status
client = WorkflowClient(backend, conflict_policy="use_existing")
# "terminate_existing" — cancel and restart
client = WorkflowClient(backend, conflict_policy="terminate_existing")
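The decision logic behind the three policies can be illustrated with a small sketch. This is not Sayiir's implementation, just the branching the policies describe, with a dict standing in for the backend:

```python
class InstanceAlreadyExistsError(Exception):
    pass

def submit(store, instance_id, policy="fail"):
    """Sketch of conflict-policy handling over a dict of instance statuses."""
    if instance_id in store:
        if policy == "fail":
            raise InstanceAlreadyExistsError(instance_id)
        if policy == "use_existing":
            return store[instance_id]          # return existing status as-is
        if policy == "terminate_existing":
            store[instance_id] = "cancelled"   # cancel, then fall through to restart
    store[instance_id] = "running"
    return store[instance_id]

store = {"order-42": "running"}
assert submit(store, "order-42", policy="use_existing") == "running"
assert submit(store, "order-42", policy="terminate_existing") == "running"
try:
    submit(store, "order-42")  # default policy="fail"
except InstanceAlreadyExistsError:
    pass
```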

The WorkflowClient provides methods to control workflows — cancel, pause, unpause, send signals, and check status:

client = WorkflowClient(backend)
# Cancel a workflow
client.cancel("order-42", reason="Out of stock", cancelled_by="system")
# Pause / unpause
client.pause("order-42", reason="Maintenance window")
client.unpause("order-42")
# Send a signal
client.send_signal("order-42", "manager_approval", {"approved": True})
# Check status
status = client.status("order-42")

Workers acquire a claim on a task before executing it. The claim prevents other workers from executing the same task concurrently.

The claim includes a time-to-live (TTL):

  • If a worker crashes or becomes unresponsive, the claim expires after the TTL
  • Another worker can then pick up the abandoned task
  • Default TTL is 5 minutes (300 seconds)

Set the TTL based on your longest expected task duration:

worker = Worker("worker-1", backend, claim_ttl=600.0) # 10 minutes

If a task takes longer than the TTL, another worker may claim and execute it, leading to duplicate execution. Always set the TTL higher than your maximum task duration.
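The expiry rule itself is simple arithmetic; a minimal sketch (the function name is illustrative, not part of the Sayiir API):

```python
import time

def claim_is_expired(claimed_at, claim_ttl, now=None):
    """A claim expires once more than claim_ttl seconds have elapsed."""
    now = time.time() if now is None else now
    return (now - claimed_at) > claim_ttl

# A task claimed 400s ago under the default 300s TTL is reclaimable:
claimed_at = 1_000.0
assert claim_is_expired(claimed_at, claim_ttl=300.0, now=claimed_at + 400)
# Raising claim_ttl to 600.0 keeps the same claim held:
assert not claim_is_expired(claimed_at, claim_ttl=600.0, now=claimed_at + 400)
```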

A single worker can process tasks from multiple workflows:

order_wf = Flow("orders").then(process_order).build()
email_wf = Flow("emails").then(send_email).build()
handle = worker.start([order_wf, email_wf])

All task functions from all workflows are merged into a single registry. Ensure task IDs are unique across workflows.
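A duplicate-ID check at merge time catches collisions early. This sketch is our illustration of that constraint, not Sayiir's actual merge code:

```python
def merge_registries(*registries):
    """Merge task registries, rejecting duplicate task IDs."""
    merged = {}
    for registry in registries:
        for task_id, fn in registry.items():
            if task_id in merged:
                raise ValueError(f"duplicate task ID: {task_id!r}")
            merged[task_id] = fn
    return merged

orders = {"process_order": lambda o: o}
emails = {"send_email": lambda e: e}
registry = merge_registries(orders, emails)  # OK: IDs are unique across workflows
```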

Tasks with lower priority values are picked up first by workers. Priority ranges from 1 (Critical) to 5 (Minimal), with 3 (Normal) as the default.

@task(priority=1)
def charge_payment(order: dict) -> dict:
    return process_payment(order)

@task(priority=4)
def generate_report(data: dict) -> dict:
    return build_report(data)

To prevent starvation, Sayiir applies aging: the longer a task waits, the more its effective priority improves. A priority-5 task waiting long enough will eventually overtake a freshly submitted priority-1 task.

The formula is: effective_priority = base_priority - (seconds_waiting / aging_interval). The default aging interval is 300 seconds — a priority-5 task waiting 10 minutes has an effective priority of 3.
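The formula translates directly into code. A quick check of the numbers (remember that lower effective priority wins):

```python
def effective_priority(base_priority, seconds_waiting, aging_interval=300.0):
    """effective_priority = base_priority - (seconds_waiting / aging_interval)"""
    return base_priority - (seconds_waiting / aging_interval)

# A priority-5 task waiting 10 minutes drops to an effective priority of 3:
assert effective_priority(5, 600) == 3.0
# After 25 minutes it outranks a freshly submitted priority-1 task:
assert effective_priority(5, 1500) < effective_priority(1, 0)
```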

In Rust, you can configure the aging interval on the worker:

let worker = PooledWorker::new("worker-1", backend, registry)
.with_aging_interval(Duration::from_secs(600)); // slower aging

When a task fails and is retried, Sayiir prefers to route the retry to a different worker. This “soft worker affinity” mechanism improves resilience:

  • Fault isolation — If a worker has a corrupted cache or bad state, the retry goes elsewhere
  • Resource contention — A worker struggling with resource limits won’t keep retrying the same failing task
  • Automatic recovery — Transient worker issues resolve themselves through task migration

The mechanism works through claim metadata:

  1. When a task fails, the system records which worker attempted it
  2. On retry, workers check if they previously attempted this task
  3. If a worker sees its own ID in the failure history, it deprioritizes claiming that task
  4. Another worker is more likely to pick up the retry

This is a “soft” preference, not a hard rule. If only one worker is available, it will retry its own failed tasks. In a multi-worker setup, retries naturally distribute across healthy workers.
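The deprioritize-but-don't-refuse behavior can be sketched as a scoring function over claimable tasks. All names here are illustrative, not Sayiir internals:

```python
def claim_score(worker_id, failed_by):
    """Lower score = claim sooner; a worker's own past failure pushes the task back."""
    return 1 if worker_id in failed_by else 0

def pick_task(worker_id, tasks):
    """tasks: list of (task_id, set of worker IDs that previously failed it)."""
    # Stable sort: equal scores keep queue order, so this is a soft preference.
    return sorted(tasks, key=lambda t: claim_score(worker_id, t[1]))[0][0]

tasks = [("retry-task", {"worker-1"}), ("fresh-task", set())]
# worker-1 failed "retry-task" before, so it prefers the fresh task...
assert pick_task("worker-1", tasks) == "fresh-task"
# ...but worker-2 claims the retry first, in queue order:
assert pick_task("worker-2", tasks) == "retry-task"
```

If `tasks` contained only `retry-task`, `pick_task("worker-1", ...)` would still return it, which is exactly the single-worker fallback described above.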

  • Ensure all workers register the same workflows — Inconsistent task implementations lead to unpredictable behavior
  • Use the same workflow definitions across workers — The definition hash ensures version agreement; mismatched hashes prevent task execution
  • Monitor claim TTL expirations — Frequent expirations indicate worker crashes or tasks exceeding TTL
  • Scale workers based on task throughput — Add more workers when task queue depth increases
  • Graceful shutdown — Always call shutdown() (and join() in Python/Rust) to finish in-flight tasks before terminating
  • Use unique worker IDs — Include hostname or container ID to identify workers in logs and claim metadata