Configuration

Configure automatic retry behavior for tasks that may fail transiently.

from sayiir import RetryPolicy, task

policy = RetryPolicy(
    max_retries=3,
    initial_delay_secs=1.0,
    backoff_multiplier=2.0
)

@task(retries=policy)
def flaky_task(data: dict) -> dict:
    # May fail and retry with exponential backoff
    return process(data)

  • max_retries — Maximum number of retry attempts

    • Default: 2
    • Set to 0 to disable retries
  • initial_delay_secs / initial_delay — Initial delay before first retry

    • Default: 1.0 seconds
    • Subsequent delays grow exponentially
  • backoff_multiplier — Exponential backoff multiplier

    • Default: 2.0
    • Each retry waits initial_delay * multiplier^attempt

The delay before each retry attempt is calculated as:

delay = initial_delay * (backoff_multiplier ^ attempt_number)

where attempt_number starts at 0, so the first retry waits exactly initial_delay.

Example with the default delay settings (initial: 1.0s, multiplier: 2.0):

Attempt   Delay
1         1.0s
2         2.0s
3         4.0s

Example with custom settings (initial: 0.5s, multiplier: 3.0):

Attempt   Delay
1         0.5s
2         1.5s
3         4.5s
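
The same schedules can be reproduced with a few lines of plain Python. This is an illustrative sketch, not library code; it assumes the exponent starts at 0 for the first retry, which matches the tables above.

def retry_delays(max_retries: int, initial_delay: float, multiplier: float) -> list[float]:
    # Delay before retry N (1-based) is initial_delay * multiplier ** (N - 1)
    return [initial_delay * multiplier ** attempt for attempt in range(max_retries)]

print(retry_delays(3, 1.0, 2.0))  # [1.0, 2.0, 4.0], as in the first table
print(retry_delays(3, 0.5, 3.0))  # [0.5, 1.5, 4.5], as in the second table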

Good candidates for retries:

  • Network requests to external APIs
  • Database operations during high load
  • File system operations (temporary locks)
  • Third-party service calls

Not suitable for retries:

  • Validation errors (will fail every time)
  • Authorization failures (won’t change on retry)
  • Idempotency-sensitive operations without proper safeguards
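
For example, the decorator shown above can express both cases; call_external_api is a hypothetical helper, not part of sayiir.

from sayiir import RetryPolicy, task

# Good candidate: an external API call that can fail transiently
@task(retries=RetryPolicy(max_retries=3, initial_delay_secs=1.0))
def fetch_exchange_rates(currency: str) -> dict:
    return call_external_api(currency)  # hypothetical network call

# Poor candidate: validation fails the same way on every attempt, so disable retries
@task(retries=RetryPolicy(max_retries=0))
def validate_order(order: dict) -> dict:
    if "customer_id" not in order:
        raise ValueError("order is missing customer_id")
    return order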

Configure individual task behavior and metadata.

from sayiir import task, RetryPolicy

@task(
    name="process_payment",
    timeout_secs=30.0,
    retries=RetryPolicy(max_retries=3, initial_delay_secs=2.0),
    tags=["payment", "external", "critical"],
    description="Processes customer payment via Stripe API"
)
def charge_card(order: dict) -> dict:
    return {"receipt_id": "R123", "status": "paid"}

Option                   Type               Description
name / id                String             Override task ID (default: function name)
timeout_secs / timeout   Float / Duration   Maximum execution time
retries                  RetryPolicy        Retry behavior configuration
tags                     List[String]       Categorization tags for filtering/monitoring
description              String             Human-readable task description

When a task exceeds its timeout:

  1. The task is cancelled (if possible)
  2. If retries are configured, the task may be retried
  3. If all retries are exhausted, the workflow fails
  4. The timeout includes retry delays
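
A sketch of how the timeout and retry options interact; check_job_status is a hypothetical helper, not part of sayiir.

from sayiir import RetryPolicy, task

@task(
    timeout_secs=15.0,  # hard ceiling on execution time
    retries=RetryPolicy(max_retries=2, initial_delay_secs=1.0)
)
def poll_slow_service(job_id: str) -> dict:
    # Because the timeout budget also covers retry delays (point 4 above),
    # leave headroom for the backoff when choosing timeout_secs.
    return check_job_status(job_id)  # hypothetical external call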

Tags help organize and monitor tasks:

# Common tag patterns
@task(tags=["io", "external"]) # Network I/O
@task(tags=["db", "read"]) # Database read
@task(tags=["db", "write"]) # Database write
@task(tags=["cpu-intensive"]) # CPU-bound work
@task(tags=["critical", "payment"]) # Business classification

Use tags for:

  • Filtering logs and metrics
  • Setting up alerts
  • Capacity planning
  • Cost allocation
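
A minimal user-side sketch of tag-driven alert routing: it reuses the same tag lists passed to @task(tags=...), relies on no undocumented sayiir API, and the channel names are made up.

ALERT_CHANNELS = {
    "critical": "#oncall",
    "payment": "#payments",
    "external": "#integrations",
}

def alert_channels_for(tags: list[str]) -> set[str]:
    # Map a task's tags to the alert channels that should be notified
    return {ALERT_CHANNELS[tag] for tag in tags if tag in ALERT_CHANNELS}

assert alert_channels_for(["critical", "payment"]) == {"#oncall", "#payments"}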

Durable storage for production workflows.

from sayiir import PostgresBackend

# Connection URL format
backend = PostgresBackend(
    "postgresql://username:password@host:port/database"
)

# Example URLs
backend = PostgresBackend("postgresql://postgres:password@localhost:5432/sayiir")
backend = PostgresBackend("postgresql://user:pass@db.example.com/workflows")

The general connection URL syntax is:

postgresql://[user[:password]@][host][:port][/database][?parameters]

Components:

  • user — PostgreSQL username
  • password — User password (URL-encode special characters)
  • host — Database host (default: localhost)
  • port — Port number (default: 5432)
  • database — Database name
  • parameters — Optional connection parameters

Examples:

# Local development
postgresql://postgres:password@localhost/sayiir
# With custom port
postgresql://user:pass@db.example.com:5433/workflows
# Connection parameters
postgresql://user:pass@host/db?sslmode=require&connect_timeout=10
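
Passwords containing special characters must be URL-encoded (see the password note above); a small sketch using the Python standard library:

from urllib.parse import quote_plus

from sayiir import PostgresBackend

password = quote_plus("p@ss:w/rd!")  # becomes "p%40ss%3Aw%2Frd%21"
backend = PostgresBackend(f"postgresql://sayiir_user:{password}@db.example.com:5432/sayiir")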
Requirements:

  • PostgreSQL Version: 13 or higher
  • Permissions: User must have permissions to:
    • Create tables (for initial migration)
    • Insert, update, delete rows
    • Create indexes

The backend automatically creates these tables on first connection:

  • workflow_snapshots — Workflow checkpoints and state
  • task_claims — Distributed task claiming for workers
  • signals — External signals buffered for workflows

Migration is automatic — no manual setup required.

1. Create a dedicated database:

CREATE DATABASE sayiir;
CREATE USER sayiir_user WITH PASSWORD 'secure_password';
GRANT ALL PRIVILEGES ON DATABASE sayiir TO sayiir_user;

2. Configure connection pooling (if using many workers):

# Python: connection pooling is built into the Rust backend
backend = PostgresBackend("postgresql://user:pass@host/sayiir")

// Rust: connection pool is managed internally
let backend = PostgresBackend::new("postgresql://user:pass@host/sayiir").await?;

3. Set up monitoring:

Monitor these metrics:

  • Table sizes (workflow_snapshots, task_claims, signals)
  • Query performance
  • Connection pool usage
  • Lock contention on task_claims table

For high-throughput workloads:

-- Increase shared buffers (in postgresql.conf)
shared_buffers = 256MB
-- Tune checkpoint behavior
checkpoint_timeout = 10min
max_wal_size = 2GB
-- Monitor table bloat
VACUUM ANALYZE workflow_snapshots;
VACUUM ANALYZE task_claims;

Index optimization:

The backend creates appropriate indexes automatically, but monitor query performance:

-- Check index usage
SELECT schemaname, tablename, indexname, idx_scan
FROM pg_stat_user_indexes
WHERE schemaname = 'public'
ORDER BY idx_scan;

Regular backups:

# Full database backup
pg_dump -U sayiir_user sayiir > sayiir_backup.sql
# Restore
psql -U sayiir_user sayiir < sayiir_backup.sql

Point-in-time recovery:

Enable WAL archiving for production:

wal_level = replica
archive_mode = on
archive_command = 'cp %p /path/to/archive/%f'

Configure distributed workers for horizontal scaling (Rust only).

use sayiir_runtime::prelude::*;
use std::time::Duration;

let backend = PostgresBackend::new("postgresql://...").await?;
let registry = TaskRegistry::new();

let worker = PooledWorker::new("worker-1", backend, registry)
    .with_claim_ttl(Duration::from_secs(60));

let handle = worker.spawn(
    Duration::from_secs(5), // poll_interval
    vec![workflow1, workflow2]
);

worker_id: Unique identifier for this worker process.

let worker = PooledWorker::new("worker-1", backend, registry);

Requirements:

  • Must be unique across all running workers
  • Used for debugging and monitoring
  • Appears in logs and task claims

Naming conventions:

// By hostname
format!("worker-{}", hostname)
// By hostname and process ID
format!("worker-{}-{}", hostname, process_id)
// By role and instance
format!("email-worker-{}", instance_number)

claim_ttl: How long a worker holds a task claim before it expires.

.with_claim_ttl(Duration::from_secs(60))

Default: Varies by implementation (typically 60 seconds)

Considerations:

  • Too short: Workers may lose claims before completing tasks
  • Too long: Failed workers block tasks for longer
  • Should be > typical task execution time
  • Should account for clock skew between machines

Recommended values:

  • Fast tasks (< 5s): 30 seconds
  • Medium tasks (5s-30s): 60 seconds
  • Slow tasks (> 30s): 120 seconds

poll_interval: How often the worker checks for new work.

let handle = worker.spawn(
    Duration::from_secs(5), // Check every 5 seconds
    workflows
);

Considerations:

  • Too short: Increased database load
  • Too long: Higher latency to pick up new work
  • Adjust based on workflow urgency requirements

Recommended values:

  • High-priority workflows: 1-5 seconds
  • Normal workflows: 5-10 seconds
  • Background jobs: 30-60 seconds

Run multiple workers for horizontal scaling:

// Worker 1
let worker1 = PooledWorker::new("worker-1", backend.clone(), registry.clone())
    .with_claim_ttl(Duration::from_secs(60));
let handle1 = worker1.spawn(Duration::from_secs(5), workflows.clone());

// Worker 2
let worker2 = PooledWorker::new("worker-2", backend.clone(), registry.clone())
    .with_claim_ttl(Duration::from_secs(60));
let handle2 = worker2.spawn(Duration::from_secs(5), workflows.clone());

// Workers coordinate via PostgreSQL

Best practices:

  • All workers must share the same PostgreSQL backend
  • All workers must have the same TaskRegistry configuration
  • Use unique worker_ids
  • Monitor claim expiration rates
  • Set up health checks for each worker

Graceful shutdown:

// Spawn worker
let handle = worker.spawn(poll_interval, workflows);

// Later: stop gracefully
tokio::select! {
    _ = tokio::signal::ctrl_c() => {
        println!("Shutting down worker...");
        handle.stop().await?;
    }
}

The worker will:

  1. Stop polling for new work
  2. Release any held task claims
  3. Allow in-flight tasks to complete (or timeout)

Track these metrics per worker:

  • Tasks claimed — How many tasks this worker has claimed
  • Tasks completed — Successfully finished tasks
  • Tasks failed — Failed tasks (before retries exhausted)
  • Claim expirations — Claims lost due to TTL timeout
  • Poll cycles — Number of times worker checked for work
  • Idle polls — Polls that found no work
  • Average task duration — Performance tracking

Fast, ephemeral storage for development and testing.

from sayiir import InMemoryBackend

backend = InMemoryBackend()

  • No persistence — All state is lost when process stops
  • Fast — No I/O overhead
  • Thread-safe — Can be shared between threads
  • Requires no setup — No external dependencies

Development:

# Quick local testing
backend = InMemoryBackend()
status = run_durable_workflow(workflow, "test-1", data, backend)

Unit Tests:

from sayiir import Flow, InMemoryBackend, run_durable_workflow

def test_workflow():
    backend = InMemoryBackend()
    workflow = Flow("test").then(double).build()  # `double` is a task defined elsewhere
    status = run_durable_workflow(workflow, "test-1", 21, backend)
    assert status.is_completed()
    assert status.output == 42

Integration Tests:

#[tokio::test]
async fn test_workflow() {
    // `workflow` and `input` are assumed to be defined elsewhere in the test module
    let backend = InMemoryBackend::new();
    let runner = CheckpointingRunner::new(backend);
    let status = runner.run(workflow, "test-1", input).await.unwrap();
    assert!(status.is_completed());
}

  • State is lost on process restart
  • Not suitable for production
  • No distributed worker support
  • Limited to single process
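
A common pattern (a sketch, not a sayiir requirement) is to choose the backend from configuration, so tests and local runs stay in memory while production uses PostgreSQL. The SAYIIR_DATABASE_URL variable name is hypothetical.

import os

from sayiir import InMemoryBackend, PostgresBackend

def make_backend():
    # Use PostgreSQL when a connection URL is configured, otherwise fall back to memory
    url = os.environ.get("SAYIIR_DATABASE_URL")  # hypothetical variable name
    return PostgresBackend(url) if url else InMemoryBackend()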

Control how task inputs and outputs are serialized (Rust only).

JsonCodec: JSON serialization using serde_json.

use sayiir_runtime::prelude::*;

let workflow = workflow!("example", JsonCodec, registry,
    task_a => task_b
).unwrap();

Advantages:

  • Human-readable
  • Language-agnostic
  • Easy debugging

Disadvantages:

  • Larger payload size
  • Slower serialization
  • Limited type support

RkyvCodec: Zero-copy binary serialization using rkyv.

use sayiir_runtime::serialization::RkyvCodec;

let workflow = workflow!("example", RkyvCodec, registry,
    task_a => task_b
).unwrap();

Advantages:

  • Fast serialization/deserialization
  • Smaller payload size
  • Zero-copy deserialization

Disadvantages:

  • Binary format (not human-readable)
  • Rust-specific
  • More complex debugging

Implement the Codec trait for custom serialization:

use sayiir_core::codec::Codec;
use serde::{Serialize, Deserialize};

pub struct MyCodec;

impl Codec for MyCodec {
    fn encode<T: Serialize>(&self, value: &T) -> Result<Vec<u8>, SerializationError> {
        // Custom encoding logic
        todo!()
    }

    fn decode<T: for<'de> Deserialize<'de>>(&self, bytes: &[u8]) -> Result<T, SerializationError> {
        // Custom decoding logic
        todo!()
    }
}

Use cases for custom codecs:

  • Compression (gzip, zstd)
  • Encryption
  • Custom binary formats
  • Protocol Buffers integration