Rust API Reference

Sayiir is split into several focused crates following a layered architecture:

  • sayiir-core — types & traits (runtime-agnostic)
  • sayiir-persistence — storage traits & implementations
  • sayiir-runtime — execution engines & workers
  • sayiir-macros — proc macros for convenience

sayiir-core

Core domain types and traits. Runtime-agnostic — no persistence, no execution strategy.

Key Types:

  • WorkflowDefinition — The continuation tree describing what to execute
  • SerializableWorkflow — Type-safe workflow with input/output types
  • WorkflowBuilder — Fluent builder for assembling pipelines
  • TaskMetadata — Metadata (timeout, retries, tags, description)
  • RetryPolicy — Retry configuration with exponential backoff
  • WorkflowContext — Per-execution context (codec, workflow ID, user metadata)
  • WorkflowSnapshot — Checkpoint of in-flight execution state
  • TaskRegistry — Name → factory map for task deserialization

Key Traits:

  • CoreTask — Task execution trait (input → output)
  • Codec — Pluggable serialization (JSON, rkyv, etc.)

sayiir-persistence

Storage abstractions and implementations.

Key Types:

  • InMemoryBackend — Fast, ephemeral storage for dev/test
  • WorkflowSnapshot — Serializable checkpoint state
  • SignalEvent — External signal with payload

Key Traits:

  • PersistentBackend — Storage trait for snapshots, signals, and claims

PostgreSQL backend

PostgreSQL implementation of PersistentBackend. Requires PostgreSQL 13+.

Key Types:

  • PostgresBackend — Durable storage with auto-migration
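A minimal connection sketch, assuming the import comes via the runtime prelude as in the PooledWorker example later on this page (the connection string is a placeholder):

use sayiir_runtime::prelude::*;

// PostgresBackend::new connects and runs its schema migrations
// automatically (the "auto-migration" noted above).
let backend = PostgresBackend::new("postgresql://user:pass@localhost/sayiir").await?;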

sayiir-runtime

Execution engines and worker orchestration.

Key Types:

  • CheckpointingRunner — Single-process, durable execution
  • InProcessRunner — Simple in-memory execution (no durability)
  • PooledWorker — Distributed, multi-machine execution
  • WorkerHandle — Handle to a spawned worker
  • JsonCodec — JSON serialization codec (feature: json)
  • RkyvCodec — Binary serialization codec (feature: rkyv)
  • RuntimeError — Execution errors

Key Traits:

  • WorkflowRunner — Trait for execution engines

sayiir-macros

Procedural macros that eliminate boilerplate.

Macros:

  • #[task] — Transform an async function into a CoreTask implementation
  • workflow! — Build workflows with a concise DSL

Prelude

For convenience, import the prelude to get the most commonly used types:

use sayiir_runtime::prelude::*;

Re-exports:

  • WorkflowBuilder, Workflow, WorkflowStatus, WorkflowContext
  • TaskRegistry
  • CheckpointingRunner, InProcessRunner, PooledWorker, WorkerHandle
  • InMemoryBackend, PersistentBackend
  • JsonCodec (if json feature enabled)
  • RkyvCodec (if rkyv feature enabled)
  • task, workflow (if macros feature enabled)
  • RuntimeError

#[task]

Transform an async function into a CoreTask implementation with automatic registration and dependency injection.

use sayiir_runtime::prelude::*;
use sayiir_core::error::BoxError;
#[task]
async fn greet(name: String) -> Result<String, BoxError> {
    Ok(format!("Hello, {}!", name))
}
#[task(
    id = "custom_name",
    timeout = "30s",
    retries = 3,
    backoff = "100ms",
    backoff_multiplier = 2.0,
    tags = "io",
    tags = "external"
)]
async fn charge(order: Order) -> Result<Receipt, BoxError> {
    // Implementation
    Ok(Receipt { /* ... */ })
}
use std::sync::Arc;

#[task]
async fn charge(
    order: Order,
    #[inject] stripe: Arc<StripeClient>,
) -> Result<Receipt, BoxError> {
    stripe.charge(&order).await
}
// Usage
let stripe = Arc::new(StripeClient::new("sk_test_..."));
let charge_task = Charge::new(stripe);
Options:

  • id (String) — Override task ID (default: function name). Example: id = "custom_name"
  • timeout (Duration) — Task timeout; supports ms, s, m, h. Example: timeout = "30s"
  • retries (Integer) — Maximum retry count. Example: retries = 3
  • backoff (Duration) — Initial retry delay. Example: backoff = "100ms"
  • backoff_multiplier (Float) — Exponential multiplier (default: 2.0). Example: backoff_multiplier = 2.0
  • tags (String, repeatable) — Categorization tag. Example: tags = "io"
  • display_name (String) — Human-readable name. Example: display_name = "Charge Card"
  • description (String) — Task description. Example: description = "..."
Parameters:

  • Exactly one non-#[inject] parameter: the task input type
  • Zero or more #[inject] parameters: dependency-injected fields

Return types (both shapes are sketched below):

  • Result<T, E> — Fallible; E is converted via Into<BoxError>
  • T — Infallible; automatically wrapped in Ok(...)
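A short sketch of both return shapes (parse_port and shout are illustrative names):

use sayiir_runtime::prelude::*;
use sayiir_core::error::BoxError;

// Fallible: the parse error converts into BoxError via `?`.
#[task]
async fn parse_port(s: String) -> Result<u16, BoxError> {
    Ok(s.parse::<u16>()?)
}

// Infallible: the macro wraps the returned value in Ok(...) automatically.
#[task]
async fn shout(s: String) -> String {
    s.to_uppercase()
}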

The macro generates:

  • A PascalCase struct (e.g., fn charge → struct Charge)
  • new() constructor with positional args for injected dependencies
  • task_id() and metadata() helper methods
  • register() method for TaskRegistry integration
  • CoreTask trait implementation
  • The original function is preserved for direct use/testing
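For the greet function above, a hedged sketch of what the expansion provides:

// The generated struct is PascalCase; with no #[inject] parameters,
// new() takes no arguments.
let task = Greet::new();
assert_eq!(task.task_id(), "greet"); // default ID is the function name
// The original function stays callable for direct use in tests:
// assert_eq!(greet("Ada".into()).await?, "Hello, Ada!");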

workflow!

Build a workflow pipeline with a concise DSL.

let workflow = workflow!("workflow-id", JsonCodec, registry,
    task_a => task_b => task_c
).unwrap();
Syntax:

  • task_name — Reference to a #[task]-generated struct. Example: validate => charge
  • name(param: Type) { expr } — Inline task. Example: double(x: i32) { Ok(x * 2) }
  • a || b — Parallel fork. Example: email || inventory => finalize
  • delay "duration" — Durable delay (auto ID). Example: delay "5s"
  • delay "id" "duration" — Durable delay (custom ID). Example: delay "wait_24h" "24h"
  • signal "name" — Wait for external signal. Example: signal "approval"
  • => — Sequential chain or join. Example: a => b => c

Sequential pipeline:

let workflow = workflow!("order", JsonCodec, registry,
    validate => charge => send_email
).unwrap();

Parallel execution:

let workflow = workflow!("parallel", JsonCodec, registry,
    prepare => (send_email || update_inventory) => finalize
).unwrap();

Inline tasks:

let workflow = workflow!("inline", JsonCodec, registry,
    double(x: i32) { Ok(x * 2) } => triple(x: i32) { Ok(x * 3) }
).unwrap();

Durable delay:

let workflow = workflow!("delayed", JsonCodec, registry,
    send_email => delay "24h" => send_reminder
).unwrap();

Wait for signal:

let workflow = workflow!("approval", JsonCodec, registry,
    submit => signal "manager_approval" => process
).unwrap();

The macro expands to a Result<SerializableWorkflow<C, Input, ()>, WorkflowError> expression.

WorkflowBuilder

Fluent builder for constructing workflows programmatically (an alternative to the workflow! macro).

use sayiir_runtime::prelude::*;
let workflow = WorkflowBuilder::new(context)
.then("task_a", |x: i32| async move { Ok(x * 2) })
.then("task_b", |x: i32| async move { Ok(x + 1) })
.build()?;

then(id, task)

Add a sequential task with an inline closure.

.then("double", |x: i32| async move { Ok(x * 2) })

then_registered::<T>(id)

Use a task from the registry.

.then_registered::<Validate>("validate")

fork(f)

Start parallel branches.

.fork(|fork| {
fork.branch("email", send_email_task);
fork.branch("inventory", update_inventory_task);
})?
.join("finalize", finalize_task)

join(id, task)

Join parallel branches with a combining task.

.join("combine", |results: BranchResults| async move {
    // Combine branch outputs
    Ok(combined)
})

delay(id, duration)

Add a durable delay.

use std::time::Duration;
.delay("wait_24h", Duration::from_secs(86400))

wait_for_signal(id, signal_name, timeout)

Wait for an external signal.

.wait_for_signal("approval_step", "manager_approval", Some(Duration::from_secs(3600)))

with_registry() / with_existing_registry(registry)

Configure the task registry.

let builder = WorkflowBuilder::new(context)
    .with_existing_registry(registry);

build()

Build the workflow.

let workflow = builder.build()?;

CheckpointingRunner

Single-process, durable execution with automatic checkpointing.

use sayiir_runtime::prelude::*;

let backend = InMemoryBackend::new();
let runner = CheckpointingRunner::new(backend);

// Start a new workflow
let status = runner
    .run(workflow, "instance-001", input_data)
    .await?;

// Resume after crash
let status = runner
    .resume(workflow, "instance-001")
    .await?;

// Cancel
runner.cancel("instance-001", Some("user cancelled"), Some("admin")).await?;

// Pause
runner.pause("instance-001", Some("maintenance"), Some("ops")).await?;

// Unpause
runner.unpause("instance-001").await?;

Methods:

  • new(backend) — Create with any PersistentBackend
  • run(workflow, instance_id, input) — Run to completion
  • resume(workflow, instance_id) — Resume from checkpoint
  • cancel(instance_id, reason, by) — Cancel workflow
  • pause(instance_id, reason, by) — Pause workflow
  • unpause(instance_id) — Unpause workflow

PooledWorker

Distributed, multi-machine execution with work claiming.

use sayiir_runtime::prelude::*;
use std::time::Duration;

let backend = PostgresBackend::new("postgresql://...").await?;
let registry = TaskRegistry::new();

let worker = PooledWorker::new("worker-1", backend, registry)
    .with_claim_ttl(Duration::from_secs(60));

let handle = worker.spawn(
    Duration::from_secs(5), // Poll interval
    vec![workflow1, workflow2],
);

// Later: stop the worker
handle.stop().await?;

Methods:

  • new(worker_id, backend, registry) — Create worker
  • with_claim_ttl(duration) — Set claim timeout
  • spawn(poll_interval, workflows) → WorkerHandle — Start polling

InProcessRunner

Simple in-memory execution without durability.

use sayiir_runtime::prelude::*;
let runner = InProcessRunner::new();
let result = runner.run(workflow, input_data).await?;

No persistence, no resume. Useful for testing or short-lived workflows.

Codec

Serialization trait for task inputs and outputs.

pub trait Codec: Send + Sync + 'static {
    fn encode<T: Serialize>(&self, value: &T) -> Result<Vec<u8>, SerializationError>;
    fn decode<T: for<'de> Deserialize<'de>>(&self, bytes: &[u8]) -> Result<T, SerializationError>;
}

Built-in implementations:

  • JsonCodec — JSON serialization (feature: json)
  • RkyvCodec — Zero-copy binary serialization (feature: rkyv)
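A round-trip using only the trait methods above (requires the json feature for JsonCodec):

use sayiir_runtime::prelude::*;

let codec = JsonCodec;
// Encode any Serialize value to bytes, then decode it back.
let bytes = codec.encode(&vec![1u32, 2, 3])?;
let nums: Vec<u32> = codec.decode(&bytes)?;
assert_eq!(nums, vec![1, 2, 3]);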

PersistentBackend

Storage trait for workflow state.

pub trait PersistentBackend: Send + Sync + 'static {
    async fn save_snapshot(&self, snapshot: &WorkflowSnapshot) -> Result<(), BackendError>;
    async fn load_snapshot(&self, instance_id: &str) -> Result<Option<WorkflowSnapshot>, BackendError>;
    async fn claim_task(&self, claim: TaskClaim) -> Result<bool, BackendError>;
    async fn release_claim(&self, instance_id: &str, task_id: &str) -> Result<(), BackendError>;
    async fn save_signal(&self, instance_id: &str, signal: SignalEvent) -> Result<(), BackendError>;
    async fn load_signals(&self, instance_id: &str, signal_name: &str) -> Result<Vec<SignalEvent>, BackendError>;
    async fn delete_signal(&self, signal_id: &str) -> Result<(), BackendError>;
}

Implementations:

  • InMemoryBackend — In-memory storage
  • PostgresBackend — PostgreSQL storage
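Because CheckpointingRunner::new accepts any PersistentBackend, swapping storage is a one-line change:

use sayiir_runtime::prelude::*;

// Ephemeral storage for tests:
let runner = CheckpointingRunner::new(InMemoryBackend::new());

// Durable storage in production (connection string is a placeholder):
// let runner = CheckpointingRunner::new(PostgresBackend::new("postgresql://...").await?);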

CoreTask

Task execution trait.

#[async_trait]
pub trait CoreTask: Send + Sync + 'static {
    async fn run(&self, input: &[u8], ctx: &WorkflowContext) -> Result<Vec<u8>, BoxError>;
    fn metadata(&self) -> &TaskMetadata;
    fn task_id(&self) -> &str;
}

Usually implemented via the #[task] macro, but can be implemented manually if needed.
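A minimal manual implementation sketch. The CoreTask import path and the use of JSON for the byte payloads are assumptions here; a real task would typically round-trip through the workflow's configured codec:

use async_trait::async_trait;
use sayiir_core::context::WorkflowContext;
use sayiir_core::error::BoxError;
use sayiir_core::task::{CoreTask, TaskMetadata};

struct Uppercase {
    metadata: TaskMetadata,
}

#[async_trait]
impl CoreTask for Uppercase {
    async fn run(&self, input: &[u8], _ctx: &WorkflowContext) -> Result<Vec<u8>, BoxError> {
        // Decode the raw input bytes, transform, and re-encode.
        let s: String = serde_json::from_slice(input)?;
        Ok(serde_json::to_vec(&s.to_uppercase())?)
    }

    fn metadata(&self) -> &TaskMetadata {
        &self.metadata
    }

    fn task_id(&self) -> &str {
        "uppercase"
    }
}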

RetryPolicy

Retry configuration with exponential backoff.

use std::time::Duration;
use sayiir_core::task::RetryPolicy;
let policy = RetryPolicy {
    max_retries: 3,
    initial_delay: Duration::from_secs(1),
    backoff_multiplier: 2.0,
};

Delay calculation: delay = initial_delay * multiplier^attempt. With the policy above (initial delay 1s, multiplier 2.0), successive retries wait 1s, 2s, and 4s.

TaskMetadata

Per-task metadata: timeout, retries, tags, description.

use sayiir_core::task::TaskMetadata;
use std::time::Duration;
let metadata = TaskMetadata {
    display_name: Some("Process Order".to_string()),
    description: Some("Validates and processes customer orders".to_string()),
    timeout: Some(Duration::from_secs(30)),
    retry_policy: Some(policy),
    tags: vec!["orders".to_string(), "payment".to_string()],
};

WorkflowContext

Per-execution context carrying codec and metadata.

use sayiir_core::context::WorkflowContext;
use sayiir_runtime::prelude::*; // for JsonCodec
use std::sync::Arc;

let ctx = WorkflowContext::new(
    "order-pipeline",
    Arc::new(JsonCodec),
    Arc::new(()), // User metadata (can be any type)
);

RuntimeError

Errors from the runtime layer.

use sayiir_runtime::RuntimeError;
match result {
    Err(RuntimeError::TaskFailed { task_id, message, .. }) => {
        eprintln!("Task {} failed: {}", task_id, message);
    }
    Err(RuntimeError::BackendError(e)) => {
        eprintln!("Storage error: {}", e);
    }
    _ => {}
}

BoxError

Type alias for Box<dyn std::error::Error + Send + Sync>.

use sayiir_core::error::BoxError;
async fn my_task(input: Data) -> Result<Output, BoxError> {
    // Any error implementing Error + Send + Sync can be returned
    Ok(process(input)?)
}

For complete API documentation with all types, traits, and methods, see: