Distributed Workers
When to Go Distributed
Single-process workflow execution is sufficient for most workloads. Consider distributed workers when you need:
- Horizontal scaling — Handle high throughput by adding more worker processes
- High availability — Ensure workflows continue if a worker crashes
- Resource isolation — Separate CPU-intensive or memory-heavy tasks across machines
Two Registries
Sayiir uses two separate registries for distributed execution:
TaskRegistry
Maps task IDs to their implementations. Each worker needs a TaskRegistry containing all tasks it can execute. When a worker claims a task, it looks up the implementation by task ID and executes it.
WorkflowRegistry
Maps workflow definition hashes to workflow definitions. Workers need this to understand the workflow structure when processing tasks. The definition hash ensures that all workers are executing the same workflow version.
Why both? The workflow definition describes the DAG structure and task order, while the task registry provides the actual executable code for each task node.
Since TaskRegistry is not Clone (it holds boxed trait objects), the typical pattern is a builder function that both the workflow definition and the worker call independently.
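In condensed form (taken from the full worker-setup example in the next section), the pattern looks like this:

```rust
// Both the workflow builder and the worker call this function to get their
// own TaskRegistry instance, since the registry cannot be cloned.
fn build_task_registry(codec: Arc<JsonCodec>) -> TaskRegistry {
    let mut r = TaskRegistry::new();
    r.register_fn("step_1", codec, |input: String| async move {
        Ok(format!("processed: {}", input))
    });
    r
}
```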
Setting Up a Worker
Here’s a complete example of setting up a distributed worker pool:
```rust
use sayiir_runtime::prelude::*;
use sayiir_runtime::WorkflowRegistry;
use sayiir_postgres::PostgresBackend;
use std::sync::Arc;
use std::time::Duration;

let url = "postgresql://localhost/sayiir";
let codec = Arc::new(JsonCodec);

// Build a task registry with all tasks this worker can execute
fn build_task_registry(codec: Arc<JsonCodec>) -> TaskRegistry {
    let mut r = TaskRegistry::new();
    r.register_fn("step_1", codec, |input: String| async move {
        Ok(format!("processed: {}", input))
    });
    // Register additional tasks here
    r
}

// Create a workflow that uses registered tasks
let workflow = WorkflowBuilder::new(ctx)
    .with_existing_registry(build_task_registry(codec.clone()))
    .then_registered::<String>("step_1")
    .then("step_2", |data: String| async move { Ok(data.to_uppercase()) })
    .build()?;

// Build workflow registry
let workflows: WorkflowRegistry<_, _, _> = vec![
    (workflow.definition_hash().to_string(), Arc::new(workflow)),
];

// Create backend and worker
let backend = PostgresBackend::<JsonCodec>::connect(url).await?;
let worker = PooledWorker::new("worker-1", backend, build_task_registry(codec))
    .with_claim_ttl(Some(Duration::from_secs(300)));

// Spawn worker thread
let handle = worker.spawn(Duration::from_secs(1), workflows);

// Start a workflow from another process/thread
let backend = PostgresBackend::<JsonCodec>::connect(url).await?;
let runner = CheckpointingRunner::new(backend);
runner.run(&workflow, "order-42", "hello".to_string()).await?;

// Graceful shutdown
handle.shutdown();
handle.join().await?;
```

Key components:
- Worker ID — Unique identifier for this worker instance
- Claim TTL — How long a worker holds a task claim before it expires
- Poll interval — spawn(Duration::from_secs(1)) sets how often workers check for new tasks
- Task registry — Must contain all tasks this worker can execute
Claim TTL
Workers acquire a claim on a task before executing it. The claim prevents other workers from executing the same task concurrently.
The claim includes a time-to-live (TTL):
- If a worker crashes or becomes unresponsive, the claim expires after the TTL
- Another worker can then pick up the abandoned task
- Default TTL is 5 minutes (300 seconds)
Set the TTL based on your longest expected task duration:
```rust
let worker = PooledWorker::new("worker-1", backend, registry)
    .with_claim_ttl(Some(Duration::from_secs(600))); // 10 minutes
```

If a task takes longer than the TTL, another worker may claim and execute it, leading to duplicate execution. Always set the TTL higher than your maximum task duration.
Soft Worker Affinity
When a task fails and is retried, Sayiir prefers to route the retry to a different worker. This “soft worker affinity” mechanism improves resilience:
- Fault isolation — If a worker has a corrupted cache or bad state, the retry goes elsewhere
- Resource contention — A worker struggling with resource limits won’t keep retrying the same failing task
- Automatic recovery — Transient worker issues resolve themselves through task migration
The mechanism works through claim metadata:
- When a task fails, the system records which worker attempted it
- On retry, workers check if they previously attempted this task
- If a worker sees its own ID in the failure history, it deprioritizes claiming that task
- Another worker is more likely to pick up the retry
This is a “soft” preference, not a hard rule. If only one worker is available, it will retry its own failed tasks. In a multi-worker setup, retries naturally distribute across healthy workers.
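To picture the deprioritization, here is an illustrative sketch only; the ClaimCandidate type and order_claims function are hypothetical and not Sayiir's internal API:

```rust
/// Hypothetical view of a claimable task, carrying the failure history
/// recorded in claim metadata.
struct ClaimCandidate {
    task_id: String,
    failed_worker_ids: Vec<String>,
}

/// Soft preference: tasks this worker has not failed before sort first;
/// tasks it has failed stay claimable, just ranked last.
fn order_claims(worker_id: &str, mut candidates: Vec<ClaimCandidate>) -> Vec<ClaimCandidate> {
    candidates.sort_by_key(|c| c.failed_worker_ids.iter().any(|id| id.as_str() == worker_id));
    candidates
}
```

With a single worker, every candidate ranks the same and the worker still retries its own failures, matching the “soft” behavior described above.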
Operational Tips
- Ensure all workers have identical task registries — Inconsistent implementations lead to unpredictable behavior
- Use the same workflow definition hash across workers — Mismatched definitions prevent task execution
- Monitor claim TTL expirations — Frequent expirations indicate worker crashes or tasks exceeding TTL
- Scale workers based on task throughput — Add more workers when task queue depth increases (see the sketch after this list)
- Graceful shutdown — Always call handle.shutdown() and handle.join() to finish in-flight tasks
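A rough sketch of scaling out, reusing the names from the worker-setup example above. Whether the workflow registry can be cloned per worker like this is an assumption, not something the example confirms:

```rust
// Spawn several workers, each with a unique ID, its own backend connection,
// and its own TaskRegistry built from the shared builder function.
let mut handles = Vec::new();
for id in ["worker-1", "worker-2", "worker-3"] {
    let backend = PostgresBackend::<JsonCodec>::connect(url).await?;
    let worker = PooledWorker::new(id, backend, build_task_registry(codec.clone()))
        .with_claim_ttl(Some(Duration::from_secs(300)));
    // Assumes the (hash, Arc<workflow>) registry built earlier is cloneable per worker.
    handles.push(worker.spawn(Duration::from_secs(1), workflows.clone()));
}

// ...later, drain the pool gracefully.
for handle in handles {
    handle.shutdown();
    handle.join().await?;
}
```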