# Configuration
## Retry Policy

Configure automatic retry behavior for tasks that may fail transiently.
```python
from sayiir import RetryPolicy, task

policy = RetryPolicy(
    max_retries=3,
    initial_delay_secs=1.0,
    backoff_multiplier=2.0,
)

@task(retries=policy)
def flaky_task(data: dict) -> dict:
    # May fail and retry with exponential backoff
    return process(data)
```

```rust
use std::time::Duration;
use sayiir_core::task::RetryPolicy;

let policy = RetryPolicy {
    max_retries: 3,
    initial_delay: Duration::from_secs(1),
    backoff_multiplier: 2.0,
};

#[task(retries = 3, backoff = "1s", backoff_multiplier = 2.0)]
async fn flaky_task(data: Data) -> Result<Output, BoxError> {
    // May fail and retry with exponential backoff
    Ok(process(data))
}
```

### Parameters
- `max_retries` — Maximum number of retry attempts
  - Default: `2`
  - Set to `0` to disable retries
- `initial_delay_secs` / `initial_delay` — Initial delay before the first retry
  - Default: `1.0` seconds
  - Subsequent delays grow exponentially
- `backoff_multiplier` — Exponential backoff multiplier
  - Default: `2.0`
  - Each retry waits `initial_delay * multiplier^attempt`
### Delay Calculation

The delay before each retry attempt is calculated as:

```
delay = initial_delay * (backoff_multiplier ^ attempt_number)
```

where `attempt_number` starts at 0 for the first retry attempt.

Example with default settings:
| Attempt | Delay |
|---|---|
| 1 | 1.0s |
| 2 | 2.0s |
| 3 | 4.0s |
Example with custom settings (initial: 0.5s, multiplier: 3.0):
| Attempt | Delay |
|---|---|
| 1 | 0.5s |
| 2 | 1.5s |
| 3 | 4.5s |
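To make the schedule concrete, here is a minimal Python sketch (independent of sayiir's internals) that reproduces both tables from the formula above:

```python
def retry_delays(max_retries: int, initial_delay: float, multiplier: float) -> list[float]:
    """Delay before each retry, per delay = initial_delay * (multiplier ^ attempt_number)."""
    return [initial_delay * multiplier ** attempt for attempt in range(max_retries)]

print(retry_delays(3, initial_delay=1.0, multiplier=2.0))  # [1.0, 2.0, 4.0]
print(retry_delays(3, initial_delay=0.5, multiplier=3.0))  # [0.5, 1.5, 4.5]
```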
### When to Use Retries

Good candidates for retries:
- Network requests to external APIs
- Database operations during high load
- File system operations (temporary locks)
- Third-party service calls
Not suitable for retries:
- Validation errors (will fail every time)
- Authorization failures (won’t change on retry)
- Idempotency-sensitive operations without proper safeguards
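As an illustration of how these guidelines map onto configuration, here is a hedged sketch; the task bodies and the `call_external_api` helper are placeholders, while the `RetryPolicy` usage follows the examples above:

```python
from sayiir import RetryPolicy, task

# Transient network failure: retry with exponential backoff
@task(retries=RetryPolicy(max_retries=3, initial_delay_secs=1.0, backoff_multiplier=2.0))
def fetch_exchange_rates(currency: str) -> dict:
    return call_external_api(currency)  # placeholder for a real HTTP call

# Validation error: fails every time, so disable retries
@task(retries=RetryPolicy(max_retries=0))
def validate_order(order: dict) -> dict:
    if "customer_id" not in order:
        raise ValueError("order is missing customer_id")
    return order
```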
## Task Metadata

Configure individual task behavior and documentation.
```python
from sayiir import task, RetryPolicy

@task(
    name="process_payment",
    timeout_secs=30.0,
    retries=RetryPolicy(max_retries=3, initial_delay_secs=2.0),
    tags=["payment", "external", "critical"],
    description="Processes customer payment via Stripe API",
)
def charge_card(order: dict) -> dict:
    return {"receipt_id": "R123", "status": "paid"}
```

```rust
#[task(
    id = "process_payment",
    timeout = "30s",
    retries = 3,
    backoff = "2s",
    tags = "payment",
    tags = "external",
    tags = "critical",
    description = "Processes customer payment via Stripe API"
)]
async fn charge_card(order: Order) -> Result<Receipt, BoxError> {
    Ok(Receipt { receipt_id: "R123", status: "paid" })
}
```

```rust
// Or construct TaskMetadata manually
use sayiir_core::task::{TaskMetadata, RetryPolicy};
use std::time::Duration;

let metadata = TaskMetadata {
    display_name: Some("Process Payment".to_string()),
    description: Some("Processes customer payment via Stripe API".to_string()),
    timeout: Some(Duration::from_secs(30)),
    retry_policy: Some(RetryPolicy {
        max_retries: 3,
        initial_delay: Duration::from_secs(2),
        backoff_multiplier: 2.0,
    }),
    tags: vec![
        "payment".to_string(),
        "external".to_string(),
        "critical".to_string(),
    ],
};
```

### Configuration Options
| Option | Type | Description |
|---|---|---|
| `name` / `id` | String | Override task ID (default: function name) |
| `timeout_secs` / `timeout` | Float / Duration | Maximum execution time |
| `retries` | RetryPolicy | Retry behavior configuration |
| `tags` | List[String] | Categorization tags for filtering/monitoring |
| `description` | String | Human-readable task description |
### Timeout Behavior

When a task exceeds its timeout:
- The task is cancelled (if possible)
- If retries are configured, the task may be retried
- If all retries are exhausted, the workflow fails
- The timeout includes retry delays
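A short sketch of what this looks like in the Python decorator; the values and the `pull_inventory` helper are illustrative only:

```python
from sayiir import RetryPolicy, task

@task(
    timeout_secs=30.0,  # cancel the attempt if it runs longer than 30 seconds
    retries=RetryPolicy(max_retries=2, initial_delay_secs=1.0),  # then retry up to twice
)
def sync_inventory(warehouse_id: str) -> dict:
    return pull_inventory(warehouse_id)  # placeholder for a call that may hang
```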
### Tags

Tags help organize and monitor tasks:

```python
# Common tag patterns
@task(tags=["io", "external"])        # Network I/O
@task(tags=["db", "read"])            # Database read
@task(tags=["db", "write"])           # Database write
@task(tags=["cpu-intensive"])         # CPU-bound work
@task(tags=["critical", "payment"])   # Business classification
```

Use tags for:
- Filtering logs and metrics
- Setting up alerts
- Capacity planning
- Cost allocation
## PostgreSQL Backend

Durable storage for production workflows.
```python
from sayiir import PostgresBackend

# Connection URL format
backend = PostgresBackend(
    "postgresql://username:password@host:port/database"
)

# Example URLs
backend = PostgresBackend("postgresql://postgres:password@localhost:5432/sayiir")
backend = PostgresBackend("postgresql://user:pass@db.example.com/workflows")
```

```rust
use sayiir_postgres::PostgresBackend;

// Connection URL format
let backend = PostgresBackend::new(
    "postgresql://username:password@host:port/database"
).await?;

// Example URLs
let backend = PostgresBackend::new(
    "postgresql://postgres:password@localhost:5432/sayiir"
).await?;

let backend = PostgresBackend::new(
    "postgresql://user:pass@db.example.com/workflows"
).await?;
```

### Connection URL Format
```
postgresql://[user[:password]@][host][:port][/database][?parameters]
```

Components:

- `user` — PostgreSQL username
- `password` — User password (URL-encode special characters)
- `host` — Database host (default: localhost)
- `port` — Port number (default: 5432)
- `database` — Database name
- `parameters` — Optional connection parameters
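If the password contains characters such as `@` or `:`, URL-encode it before building the connection string. A minimal Python sketch using the standard library (credentials are placeholders):

```python
from urllib.parse import quote_plus

from sayiir import PostgresBackend

password = quote_plus("p@ss:word!")  # -> "p%40ss%3Aword%21"
backend = PostgresBackend(
    f"postgresql://sayiir_user:{password}@db.example.com:5432/sayiir"
)
```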
Examples:
```
# Local development
postgresql://postgres:password@localhost/sayiir

# With custom port
postgresql://user:pass@db.example.com:5433/workflows

# Connection parameters
postgresql://user:pass@host/db?sslmode=require&connect_timeout=10
```

### Requirements
- PostgreSQL Version: 13 or higher
- Permissions: The user must be able to:
  - Create tables (for initial migration)
  - Insert, update, delete rows
  - Create indexes
### Database Schema

The backend automatically creates these tables on first connection:

- `workflow_snapshots` — Workflow checkpoints and state
- `task_claims` — Distributed task claiming for workers
- `signals` — External signals buffered for workflows
Migration is automatic — no manual setup required.
### Recommended Setup

1. Create a dedicated database:

```sql
CREATE DATABASE sayiir;
CREATE USER sayiir_user WITH PASSWORD 'secure_password';
GRANT ALL PRIVILEGES ON DATABASE sayiir TO sayiir_user;
```

2. Configure connection pooling (if using many workers):

```python
# Python: connection pooling is built into the Rust backend
backend = PostgresBackend("postgresql://user:pass@host/sayiir")
```

```rust
// Rust: connection pool is managed internally
let backend = PostgresBackend::new("postgresql://user:pass@host/sayiir").await?;
```

3. Set up monitoring:
Monitor these metrics:
- Table sizes (`workflow_snapshots`, `task_claims`, `signals`)
- Query performance
- Connection pool usage
- Lock contention on the `task_claims` table
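One way to watch table sizes from Python is to query PostgreSQL directly; this sketch uses the psycopg2 driver (not part of sayiir) and placeholder credentials:

```python
import psycopg2

conn = psycopg2.connect("postgresql://sayiir_user:secure_password@localhost/sayiir")
with conn, conn.cursor() as cur:
    for table in ("workflow_snapshots", "task_claims", "signals"):
        cur.execute("SELECT pg_size_pretty(pg_total_relation_size(%s))", (table,))
        print(table, cur.fetchone()[0])
conn.close()
```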
### Performance Tuning

For high-throughput workloads:

```
-- Increase shared buffers (in postgresql.conf)
shared_buffers = 256MB

-- Tune checkpoint behavior
checkpoint_timeout = 10min
max_wal_size = 2GB

-- Monitor table bloat
VACUUM ANALYZE workflow_snapshots;
VACUUM ANALYZE task_claims;
```

Index optimization:

The backend creates appropriate indexes automatically, but monitor query performance:

```sql
-- Check index usage
SELECT schemaname, tablename, indexname, idx_scan
FROM pg_stat_user_indexes
WHERE schemaname = 'public'
ORDER BY idx_scan;
```

### Backup and Recovery
Regular backups:

```bash
# Full database backup
pg_dump -U sayiir_user sayiir > sayiir_backup.sql

# Restore
psql -U sayiir_user sayiir < sayiir_backup.sql
```

Point-in-time recovery:

Enable WAL archiving for production:

```
wal_level = replica
archive_mode = on
archive_command = 'cp %p /path/to/archive/%f'
```

## Worker Configuration
Configure distributed workers for horizontal scaling (Rust only).
```rust
use sayiir_runtime::prelude::*;
use std::time::Duration;

let backend = PostgresBackend::new("postgresql://...").await?;
let registry = TaskRegistry::new();

let worker = PooledWorker::new("worker-1", backend, registry)
    .with_claim_ttl(Duration::from_secs(60));

let handle = worker.spawn(
    Duration::from_secs(5), // poll_interval
    vec![workflow1, workflow2],
);
```

### Configuration Parameters
#### worker_id

Unique identifier for this worker process.

```rust
let worker = PooledWorker::new("worker-1", backend, registry);
```

Requirements:
- Must be unique across all running workers
- Used for debugging and monitoring
- Appears in logs and task claims
Naming conventions:
```rust
// By hostname
format!("worker-{}", hostname)

// By hostname and process ID
format!("worker-{}-{}", hostname, process_id)

// By role and instance
format!("email-worker-{}", instance_number)
```

#### claim_ttl
How long a worker holds a task claim before it expires.

```rust
.with_claim_ttl(Duration::from_secs(60))
```

Default: Varies by implementation (typically 60 seconds)
Considerations:
- Too short: Workers may lose claims before completing tasks
- Too long: Failed workers block tasks for longer
- Should be > typical task execution time
- Should account for clock skew between machines
Recommended values:
- Fast tasks (< 5s): 30 seconds
- Medium tasks (5s-30s): 60 seconds
- Slow tasks (> 30s): 120 seconds
#### poll_interval

How often to check for new work.

```rust
let handle = worker.spawn(
    Duration::from_secs(5), // Check every 5 seconds
    workflows,
);
```

Considerations:
- Too short: Increased database load
- Too long: Higher latency to pick up new work
- Adjust based on workflow urgency requirements
Recommended values:
- High-priority workflows: 1-5 seconds
- Normal workflows: 5-10 seconds
- Background jobs: 30-60 seconds
### Multi-Worker Setup

Run multiple workers for horizontal scaling:

```rust
// Worker 1
let worker1 = PooledWorker::new("worker-1", backend.clone(), registry.clone())
    .with_claim_ttl(Duration::from_secs(60));
let handle1 = worker1.spawn(Duration::from_secs(5), workflows.clone());

// Worker 2
let worker2 = PooledWorker::new("worker-2", backend.clone(), registry.clone())
    .with_claim_ttl(Duration::from_secs(60));
let handle2 = worker2.spawn(Duration::from_secs(5), workflows.clone());

// Workers coordinate via PostgreSQL
```

Best practices:
- All workers must share the same PostgreSQL backend
- All workers must have the same TaskRegistry configuration
- Use unique worker_ids
- Monitor claim expiration rates
- Set up health checks for each worker
### Graceful Shutdown

```rust
// Spawn worker
let handle = worker.spawn(poll_interval, workflows);

// Later: stop gracefully
tokio::select! {
    _ = tokio::signal::ctrl_c() => {
        println!("Shutting down worker...");
        handle.stop().await?;
    }
}
```

The worker will:
- Stop polling for new work
- Release any held task claims
- Allow in-flight tasks to complete (or timeout)
### Monitoring Workers

Track these metrics per worker:
- Tasks claimed — How many tasks this worker has claimed
- Tasks completed — Successfully finished tasks
- Tasks failed — Failed tasks (before retries exhausted)
- Claim expirations — Claims lost due to TTL timeout
- Poll cycles — Number of times worker checked for work
- Idle polls — Polls that found no work
- Average task duration — Performance tracking
## InMemory Backend

Fast, ephemeral storage for development and testing.
```python
from sayiir import InMemoryBackend

backend = InMemoryBackend()
```

```rust
use sayiir_persistence::InMemoryBackend;

let backend = InMemoryBackend::new();
```

### Characteristics
- No persistence — All state is lost when the process stops
- Fast — No I/O overhead
- Thread-safe — Can be shared between threads
- Requires no setup — No external dependencies
### Use Cases

Development:

```python
# Quick local testing
backend = InMemoryBackend()
status = run_durable_workflow(workflow, "test-1", data, backend)
```

Unit Tests:

```python
def test_workflow():
    backend = InMemoryBackend()
    workflow = Flow("test").then(double).build()
    status = run_durable_workflow(workflow, "test-1", 21, backend)
    assert status.is_completed()
    assert status.output == 42
```

Integration Tests:

```rust
#[tokio::test]
async fn test_workflow() -> Result<(), BoxError> {
    let backend = InMemoryBackend::new();
    let runner = CheckpointingRunner::new(backend);
    let status = runner.run(workflow, "test-1", input).await?;
    assert!(status.is_completed());
    Ok(())
}
```

### Limitations
- State is lost on process restart
- Not suitable for production
- No distributed worker support
- Limited to single process
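Because of these limitations, a common pattern is to pick the backend per environment. A hedged sketch; the `SAYIIR_DATABASE_URL` variable name is an assumption, not a documented convention:

```python
import os

from sayiir import InMemoryBackend, PostgresBackend

def make_backend():
    # SAYIIR_DATABASE_URL is a hypothetical deployment convention
    url = os.environ.get("SAYIIR_DATABASE_URL")
    if url:
        return PostgresBackend(url)  # durable storage for production
    return InMemoryBackend()         # ephemeral storage for development and tests
```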
## Codec Configuration

Control how task inputs and outputs are serialized (Rust only).

### JsonCodec

JSON serialization using serde_json.

```rust
use sayiir_runtime::prelude::*;

let workflow = workflow!("example", JsonCodec, registry, task_a => task_b).unwrap();
```

Advantages:
- Human-readable
- Language-agnostic
- Easy debugging
Disadvantages:
- Larger payload size
- Slower serialization
- Limited type support
### RkyvCodec

Zero-copy binary serialization using rkyv.

```rust
use sayiir_runtime::serialization::RkyvCodec;

let workflow = workflow!("example", RkyvCodec, registry, task_a => task_b).unwrap();
```

Advantages:
- Fast serialization/deserialization
- Smaller payload size
- Zero-copy deserialization
Disadvantages:
- Binary format (not human-readable)
- Rust-specific
- More complex debugging
### Custom Codec

Implement the Codec trait for custom serialization:

```rust
use sayiir_core::codec::Codec;
use serde::{Serialize, Deserialize};

pub struct MyCodec;

impl Codec for MyCodec {
    fn encode<T: Serialize>(&self, value: &T) -> Result<Vec<u8>, SerializationError> {
        // Custom encoding logic
    }

    fn decode<T: for<'de> Deserialize<'de>>(&self, bytes: &[u8]) -> Result<T, SerializationError> {
        // Custom decoding logic
    }
}
```

Use cases for custom codecs:
- Compression (gzip, zstd)
- Encryption
- Custom binary formats
- Protocol Buffers integration