
Distributed Workers

Single-process workflow execution is sufficient for most workloads. Consider distributed workers when you need:

  • Horizontal scaling — Handle high throughput by adding more worker processes
  • High availability — Ensure workflows continue if a worker crashes
  • Resource isolation — Separate CPU-intensive or memory-heavy tasks across machines

Sayiir uses two separate registries for distributed execution:

  • TaskRegistry — Maps task IDs to their implementations. Each worker needs a TaskRegistry containing all tasks it can execute. When a worker claims a task, it looks up the implementation by task ID and executes it.
  • WorkflowRegistry — Maps workflow definition hashes to workflow definitions. Workers need this to understand the workflow structure when processing tasks. The definition hash ensures that all workers are executing the same workflow version.

Why both? The workflow definition describes the DAG structure and task order, while the task registry provides the actual executable code for each task node.
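
Conceptually, a worker consults both registries for every task it claims: the workflow registry (by definition hash) for structure, and the task registry (by task ID) for code. A rough sketch of that lookup, using simplified stand-in types rather than Sayiir's actual API:

use std::collections::HashMap;

// Simplified stand-ins for illustration only (not Sayiir's real types).
type TaskFn = Box<dyn Fn(String) -> Result<String, String>>;
struct TaskRegistry {
    tasks: HashMap<String, TaskFn>, // task ID -> executable code
}
struct WorkflowDefinition; // DAG structure and task order
type WorkflowRegistry = HashMap<String, WorkflowDefinition>; // definition hash -> definition

// What a worker conceptually does for each task it has claimed.
fn execute_claimed_task(
    definition_hash: &str,
    task_id: &str,
    input: String,
    workflows: &WorkflowRegistry,
    tasks: &TaskRegistry,
) -> Result<String, String> {
    // 1. Resolve the workflow structure by its definition hash.
    let _definition = workflows
        .get(definition_hash)
        .ok_or_else(|| format!("unknown workflow definition: {definition_hash}"))?;

    // 2. Resolve the executable implementation by task ID.
    let task = tasks
        .tasks
        .get(task_id)
        .ok_or_else(|| format!("task not registered on this worker: {task_id}"))?;

    // 3. Execute the task with its input.
    task(input)
}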

Since TaskRegistry is not Clone (it holds boxed trait objects), the typical pattern is a builder function that both the workflow definition and the worker call independently to construct their own copy of the registry.

Here’s a complete example of setting up a distributed worker pool:

use sayiir_runtime::prelude::*;
use sayiir_runtime::WorkflowRegistry;
use sayiir_postgres::PostgresBackend;
use std::sync::Arc;
use std::time::Duration;

// Build a task registry with all tasks this worker can execute.
// TaskRegistry is not Clone, so both the workflow definition and the
// worker call this function to get their own copy.
fn build_task_registry(codec: Arc<JsonCodec>) -> TaskRegistry {
    let mut r = TaskRegistry::new();
    r.register_fn("step_1", codec, |input: String| async move {
        Ok(format!("processed: {}", input))
    });
    // Register additional tasks here
    r
}

let url = "postgresql://localhost/sayiir";
let codec = Arc::new(JsonCodec);

// Create a workflow that uses registered tasks
// (`ctx` comes from earlier setup not shown in this snippet)
let workflow = WorkflowBuilder::new(ctx)
    .with_existing_registry(build_task_registry(codec.clone()))
    .then_registered::<String>("step_1")
    .then("step_2", |data: String| async move {
        Ok(data.to_uppercase())
    })
    .build()?;

// Build the workflow registry, keyed by definition hash
let workflow = Arc::new(workflow);
let workflows: WorkflowRegistry<_, _, _> = vec![
    (workflow.definition_hash().to_string(), Arc::clone(&workflow)),
];

// Create backend and worker
let backend = PostgresBackend::<JsonCodec>::connect(url).await?;
let worker = PooledWorker::new("worker-1", backend, build_task_registry(codec))
    .with_claim_ttl(Some(Duration::from_secs(300)));

// Spawn the worker; it polls for claimable tasks every second
let handle = worker.spawn(Duration::from_secs(1), workflows);

// Start a workflow from another process/thread
let backend = PostgresBackend::<JsonCodec>::connect(url).await?;
let runner = CheckpointingRunner::new(backend);
runner.run(&workflow, "order-42", "hello".to_string()).await?;

// Graceful shutdown: finish in-flight tasks, then stop
handle.shutdown();
handle.join().await?;

Key components:

  • Worker ID — Unique identifier for this worker instance
  • Claim TTL — How long a worker holds a task claim before it expires
  • Poll interval — spawn(Duration::from_secs(1)) sets how often workers check for new tasks
  • Task registry — Must contain all tasks this worker can execute

Workers acquire a claim on a task before executing it. The claim prevents other workers from executing the same task concurrently.

The claim includes a time-to-live (TTL):

  • If a worker crashes or becomes unresponsive, the claim expires after the TTL
  • Another worker can then pick up the abandoned task
  • Default TTL is 5 minutes (300 seconds)

Set the TTL based on your longest expected task duration:

let worker = PooledWorker::new("worker-1", backend, registry)
    .with_claim_ttl(Some(Duration::from_secs(600))); // 10 minutes

If a task takes longer than the TTL, another worker may claim and execute it, leading to duplicate execution. Always set the TTL higher than your maximum task duration.

When a task fails and is retried, Sayiir prefers to route the retry to a different worker. This “soft worker affinity” mechanism improves resilience:

  • Fault isolation — If a worker has a corrupted cache or bad state, the retry goes elsewhere
  • Resource contention — A worker struggling with resource limits won’t keep retrying the same failing task
  • Automatic recovery — Transient worker issues resolve themselves through task migration

The mechanism works through claim metadata:

  1. When a task fails, the system records which worker attempted it
  2. On retry, workers check if they previously attempted this task
  3. If a worker sees its own ID in the failure history, it deprioritizes claiming that task
  4. Another worker is more likely to pick up the retry

This is a “soft” preference, not a hard rule. If only one worker is available, it will retry its own failed tasks. In a multi-worker setup, retries naturally distribute across healthy workers.
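
For intuition, here is a rough sketch of how a worker might deprioritize its own previously failed tasks; the types and field names are hypothetical stand-ins for the claim metadata, not Sayiir's actual API:

// Hypothetical view of a claimable task as seen by a polling worker.
struct ClaimCandidate {
    task_id: String,
    failed_by: Vec<String>, // worker IDs recorded for previous failed attempts
}

// Order claimable tasks so that tasks this worker has not failed before
// come first; its own previously failed tasks sort to the back.
fn prioritize_claims(worker_id: &str, mut candidates: Vec<ClaimCandidate>) -> Vec<ClaimCandidate> {
    // `false` sorts before `true`, so "not previously failed by me" wins.
    candidates.sort_by_key(|c| c.failed_by.iter().any(|id| id == worker_id));
    candidates
}

Because this is only an ordering preference, a lone worker still claims its own failed tasks eventually; they simply sort last, which matches the soft behavior described above.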

Best practices:

  • Ensure all workers have identical task registries — Inconsistent implementations lead to unpredictable behavior
  • Use the same workflow definition hash across workers — Mismatched definitions prevent task execution
  • Monitor claim TTL expirations — Frequent expirations indicate worker crashes or tasks exceeding TTL
  • Scale workers based on task throughput — Add more workers when task queue depth increases
  • Graceful shutdown — Always call handle.shutdown() and handle.join() to finish in-flight tasks