# Rust API Reference
## Overview

Sayiir is split into several focused crates following a layered architecture:

```text
sayiir-core         (types & traits — runtime-agnostic)
        ↑
sayiir-persistence  (storage traits & implementations)
        ↑
sayiir-runtime      (execution engines & workers)
        ↑
sayiir-macros       (proc macros for convenience)
```

## Crates

### sayiir-core

Core domain types and traits. Runtime-agnostic — no persistence, no execution strategy.
Key Types:
- `WorkflowDefinition` — The continuation tree describing what to execute
- `SerializableWorkflow` — Type-safe workflow with input/output types
- `WorkflowBuilder` — Fluent builder for assembling pipelines
- `TaskMetadata` — Metadata (timeout, retries, tags, description)
- `RetryPolicy` — Retry configuration with exponential backoff
- `WorkflowContext` — Per-execution context (codec, workflow ID, user metadata)
- `WorkflowSnapshot` — Checkpoint of in-flight execution state
- `TaskRegistry` — Name → factory map for task deserialization
Key Traits:
- `CoreTask` — Task execution trait (input → output)
- `Codec` — Pluggable serialization (JSON, rkyv, etc.)
### sayiir-persistence

Storage abstractions and implementations.
Key Types:
- `InMemoryBackend` — Fast, ephemeral storage for dev/test
- `WorkflowSnapshot` — Serializable checkpoint state
- `SignalEvent` — External signal with payload
Key Traits:
- `PersistentBackend` — Storage trait for snapshots, signals, and claims
### sayiir-postgres

PostgreSQL implementation of `PersistentBackend`.
Key Types:
- `PostgresBackend` — Durable storage with auto-migration; requires PostgreSQL 13+
### sayiir-runtime

Execution engines and worker orchestration.
Key Types:
- `CheckpointingRunner` — Single-process, durable execution
- `InProcessRunner` — Simple in-memory execution (no durability)
- `PooledWorker` — Distributed, multi-machine execution
- `WorkerHandle` — Handle to a spawned worker
- `JsonCodec` — JSON serialization codec (feature: `json`)
- `RkyvCodec` — Binary serialization codec (feature: `rkyv`)
- `RuntimeError` — Execution errors
Key Traits:
- `WorkflowRunner` — Trait for execution engines
### sayiir-macros

Procedural macros that eliminate boilerplate.
Macros:
- `#[task]` — Transform an async function into a `CoreTask` implementation
- `workflow!` — Build workflows with a concise DSL
## Prelude

For convenience, import the prelude to get the most commonly used types:

```rust
use sayiir_runtime::prelude::*;
```

Re-exports:

- `WorkflowBuilder`, `Workflow`, `WorkflowStatus`, `WorkflowContext`
- `TaskRegistry`
- `CheckpointingRunner`, `InProcessRunner`, `PooledWorker`, `WorkerHandle`
- `InMemoryBackend`, `PersistentBackend`
- `JsonCodec` (if `json` feature enabled)
- `RkyvCodec` (if `rkyv` feature enabled)
- `task`, `workflow` (if `macros` feature enabled)
- `RuntimeError`
## `#[task]` Macro

Transform an async function into a `CoreTask` implementation with automatic registration and dependency injection.
### Basic Usage

```rust
use sayiir_runtime::prelude::*;
use sayiir_core::error::BoxError;

#[task]
async fn greet(name: String) -> Result<String, BoxError> {
    Ok(format!("Hello, {}!", name))
}
```

### With Metadata

```rust
#[task(
    id = "custom_name",
    timeout = "30s",
    retries = 3,
    backoff = "100ms",
    backoff_multiplier = 2.0,
    tags = "io",
    tags = "external"
)]
async fn charge(order: Order) -> Result<Receipt, BoxError> {
    // Implementation
    Ok(Receipt { /* ... */ })
}
```

### With Dependency Injection

```rust
use std::sync::Arc;

#[task]
async fn charge(
    order: Order,
    #[inject] stripe: Arc<StripeClient>,
) -> Result<Receipt, BoxError> {
    stripe.charge(&order).await
}

// Usage
let stripe = Arc::new(StripeClient::new("sk_test_..."));
let charge_task = Charge::new(stripe);
```

### Attribute Options
Section titled “Attribute Options”| Option | Type | Example | Description |
|---|---|---|---|
id | String | id = "custom_name" | Override task ID (default: function name) |
timeout | Duration | timeout = "30s" | Task timeout (supports ms, s, m, h) |
retries | Integer | retries = 3 | Maximum retry count |
backoff | Duration | backoff = "100ms" | Initial retry delay |
backoff_multiplier | Float | backoff_multiplier = 2.0 | Exponential multiplier (default: 2.0) |
tags | String | tags = "io" | Categorization tag (repeatable) |
display_name | String | display_name = "Charge Card" | Human-readable name |
description | String | description = "..." | Task description |
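The duration-valued options above take strings with `ms`, `s`, `m`, or `h` suffixes. As an illustration of that grammar only — this parser is hypothetical, not the macro's actual implementation — a minimal sketch:

```rust
use std::time::Duration;

/// Hypothetical parser illustrating the `ms`/`s`/`m`/`h` duration
/// suffixes described in the attribute table; not sayiir's actual code.
fn parse_duration(s: &str) -> Option<Duration> {
    // Split "100ms" into its numeric prefix and unit suffix.
    let split = s.find(|c: char| c.is_ascii_alphabetic())?;
    let (num, unit) = s.split_at(split);
    let n: u64 = num.parse().ok()?;
    match unit {
        "ms" => Some(Duration::from_millis(n)),
        "s" => Some(Duration::from_secs(n)),
        "m" => Some(Duration::from_secs(n * 60)),
        "h" => Some(Duration::from_secs(n * 3600)),
        _ => None,
    }
}

fn main() {
    println!("{:?}", parse_duration("30s"));   // 30 seconds
    println!("{:?}", parse_duration("100ms")); // 100 milliseconds
}
```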
### Parameters

- Exactly one non-`#[inject]` parameter: the task input type
- Zero or more `#[inject]` parameters: dependency-injected fields
### Return Types

- `Result<T, E>` — Fallible; `E` is converted via `Into<BoxError>`
- `T` — Infallible; automatically wrapped in `Ok(...)`
### Generated Code

The macro generates:

- A PascalCase struct (e.g., `fn charge` → `struct Charge`)
- A `new()` constructor with positional args for injected dependencies
- `task_id()` and `metadata()` helper methods
- A `register()` method for `TaskRegistry` integration
- A `CoreTask` trait implementation
- The original function is preserved for direct use/testing
## `workflow!` Macro

Build a workflow pipeline with a concise DSL.

### Basic Syntax

```rust
let workflow = workflow!(
    "workflow-id", JsonCodec, registry,
    task_a => task_b => task_c
).unwrap();
```

### Step Types
Section titled “Step Types”| Syntax | Description | Example |
|---|---|---|
task_name | Reference to a #[task]-generated struct | validate => charge |
name(param: Type) { expr } | Inline task | double(x: i32) { Ok(x * 2) } |
a || b | Parallel fork | email || inventory => finalize |
delay "5s" | Durable delay (auto ID) | delay "5s" |
delay "wait_24h" "5s" | Durable delay (custom ID) | delay "wait_24h" "24h" |
signal "name" | Wait for external signal | signal "approval" |
=> | Sequential chain or join | a => b => c |
### Examples

Sequential pipeline:

```rust
let workflow = workflow!(
    "order", JsonCodec, registry,
    validate => charge => send_email
).unwrap();
```

Parallel execution:

```rust
let workflow = workflow!(
    "parallel", JsonCodec, registry,
    prepare => (send_email || update_inventory) => finalize
).unwrap();
```

Inline tasks:

```rust
let workflow = workflow!(
    "inline", JsonCodec, registry,
    double(x: i32) { Ok(x * 2) } => triple(x: i32) { Ok(x * 3) }
).unwrap();
```

Durable delay:

```rust
let workflow = workflow!(
    "delayed", JsonCodec, registry,
    send_email => delay "24h" => send_reminder
).unwrap();
```

Wait for signal:

```rust
let workflow = workflow!(
    "approval", JsonCodec, registry,
    submit => signal "manager_approval" => process
).unwrap();
```

### Returns

A `Result<SerializableWorkflow<C, Input, ()>, WorkflowError>` expression.
## WorkflowBuilder

Fluent builder for constructing workflows programmatically (alternative to the `workflow!` macro).

### Basic Usage

```rust
use sayiir_runtime::prelude::*;

let workflow = WorkflowBuilder::new(context)
    .then("task_a", |x: i32| async move { Ok(x * 2) })
    .then("task_b", |x: i32| async move { Ok(x + 1) })
    .build()?;
```

### Key Methods
#### `then(name, closure)`

Add a sequential task with an inline closure.

```rust
.then("double", |x: i32| async move { Ok(x * 2) })
```

#### `then_registered::<T>(name)`

Use a task from the registry.

```rust
.then_registered::<Validate>("validate")
```

#### `fork(|fork| { ... })`

Start parallel branches.

```rust
.fork(|fork| {
    fork.branch("email", send_email_task);
    fork.branch("inventory", update_inventory_task);
})?
.join("finalize", finalize_task)
```

#### `join(name, closure)`

Join parallel branches with a combining task.

```rust
.join("combine", |results: BranchResults| async move {
    // Combine branch outputs
    Ok(combined)
})
```

#### `delay(name, duration)`

Add a durable delay.

```rust
use std::time::Duration;

.delay("wait_24h", Duration::from_secs(86400))
```

#### `wait_for_signal(id, signal_name, timeout)`

Wait for an external signal.

```rust
.wait_for_signal("approval_step", "manager_approval", Some(Duration::from_secs(3600)))
```

#### `with_registry()` / `with_existing_registry(registry)`

Configure the task registry.

```rust
let builder = WorkflowBuilder::new(context)
    .with_existing_registry(registry);
```

#### `build()`

Build the workflow.

```rust
let workflow = builder.build()?;
```

## Runners
### CheckpointingRunner

Single-process, durable execution with automatic checkpointing.

```rust
use sayiir_runtime::prelude::*;

let backend = InMemoryBackend::new();
let runner = CheckpointingRunner::new(backend);

// Start a new workflow
let status = runner
    .run(workflow, "instance-001", input_data)
    .await?;

// Resume after crash
let status = runner
    .resume(workflow, "instance-001")
    .await?;

// Cancel
runner.cancel("instance-001", Some("user cancelled"), Some("admin")).await?;

// Pause
runner.pause("instance-001", Some("maintenance"), Some("ops")).await?;

// Unpause
runner.unpause("instance-001").await?;
```

Methods:

- `new(backend)` — Create with any `PersistentBackend`
- `run(workflow, instance_id, input)` — Run to completion
- `resume(workflow, instance_id)` — Resume from checkpoint
- `cancel(instance_id, reason, by)` — Cancel workflow
- `pause(instance_id, reason, by)` — Pause workflow
- `unpause(instance_id)` — Unpause workflow
### PooledWorker

Distributed, multi-machine execution with work claiming.

```rust
use sayiir_runtime::prelude::*;
use std::time::Duration;

let backend = PostgresBackend::new("postgresql://...").await?;
let registry = TaskRegistry::new();

let worker = PooledWorker::new("worker-1", backend, registry)
    .with_claim_ttl(Duration::from_secs(60));

let handle = worker.spawn(
    Duration::from_secs(5), // Poll interval
    vec![workflow1, workflow2],
);

// Later: stop the worker
handle.stop().await?;
```

Methods:

- `new(worker_id, backend, registry)` — Create worker
- `with_claim_ttl(duration)` — Set claim timeout
- `spawn(poll_interval, workflows)` → `WorkerHandle` — Start polling
### InProcessRunner

Simple in-memory execution without durability.

```rust
use sayiir_runtime::prelude::*;

let runner = InProcessRunner::new();
let result = runner.run(workflow, input_data).await?;
```

No persistence, no resume. Useful for testing or short-lived workflows.
## Key Traits

### Codec

Serialization trait for task inputs and outputs.

```rust
pub trait Codec: Send + Sync + 'static {
    fn encode<T: Serialize>(&self, value: &T) -> Result<Vec<u8>, SerializationError>;
    fn decode<T: for<'de> Deserialize<'de>>(&self, bytes: &[u8]) -> Result<T, SerializationError>;
}
```

Built-in implementations:

- `JsonCodec` — JSON serialization (feature: `json`)
- `RkyvCodec` — Zero-copy binary serialization (feature: `rkyv`)
### PersistentBackend

Storage trait for workflow state.

```rust
pub trait PersistentBackend: Send + Sync + 'static {
    async fn save_snapshot(&self, snapshot: &WorkflowSnapshot) -> Result<(), BackendError>;
    async fn load_snapshot(&self, instance_id: &str) -> Result<Option<WorkflowSnapshot>, BackendError>;
    async fn claim_task(&self, claim: TaskClaim) -> Result<bool, BackendError>;
    async fn release_claim(&self, instance_id: &str, task_id: &str) -> Result<(), BackendError>;
    async fn save_signal(&self, instance_id: &str, signal: SignalEvent) -> Result<(), BackendError>;
    async fn load_signals(&self, instance_id: &str, signal_name: &str) -> Result<Vec<SignalEvent>, BackendError>;
    async fn delete_signal(&self, signal_id: &str) -> Result<(), BackendError>;
}
```

Implementations:

- `InMemoryBackend` — In-memory storage
- `PostgresBackend` — PostgreSQL storage
### CoreTask

Task execution trait.

```rust
#[async_trait]
pub trait CoreTask: Send + Sync + 'static {
    async fn run(&self, input: &[u8], ctx: &WorkflowContext) -> Result<Vec<u8>, BoxError>;
    fn metadata(&self) -> &TaskMetadata;
    fn task_id(&self) -> &str;
}
```

Usually implemented via the `#[task]` macro, but can be implemented manually if needed.
## Configuration Types

### RetryPolicy

```rust
use std::time::Duration;
use sayiir_core::task::RetryPolicy;

let policy = RetryPolicy {
    max_retries: 3,
    initial_delay: Duration::from_secs(1),
    backoff_multiplier: 2.0,
};
```

Delay calculation: `delay = initial_delay * multiplier^attempt`
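The schedule implied by that formula can be computed with plain `std::time` arithmetic; the `retry_delay` helper below is illustrative, not part of the sayiir API:

```rust
use std::time::Duration;

// Illustrative helper mirroring `delay = initial_delay * multiplier^attempt`;
// not part of the sayiir API.
fn retry_delay(initial_delay: Duration, multiplier: f64, attempt: u32) -> Duration {
    initial_delay.mul_f64(multiplier.powi(attempt as i32))
}

fn main() {
    // With initial_delay = 1s and multiplier = 2.0:
    // attempt 0 -> 1s, attempt 1 -> 2s, attempt 2 -> 4s.
    for attempt in 0..3 {
        println!(
            "attempt {}: {:?}",
            attempt,
            retry_delay(Duration::from_secs(1), 2.0, attempt)
        );
    }
}
```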
### TaskMetadata

```rust
use sayiir_core::task::TaskMetadata;
use std::time::Duration;

let metadata = TaskMetadata {
    display_name: Some("Process Order".to_string()),
    description: Some("Validates and processes customer orders".to_string()),
    timeout: Some(Duration::from_secs(30)),
    retry_policy: Some(policy),
    tags: vec!["orders".to_string(), "payment".to_string()],
};
```

### WorkflowContext

Per-execution context carrying codec and metadata.

```rust
use sayiir_core::context::WorkflowContext;
use std::sync::Arc;

let ctx = WorkflowContext::new(
    "order-pipeline",
    Arc::new(JsonCodec),
    Arc::new(()), // User metadata (can be any type)
);
```

## Error Types
Section titled “Error Types”RuntimeError
Section titled “RuntimeError”Errors from the runtime layer.
use sayiir_runtime::RuntimeError;
match result { Err(RuntimeError::TaskFailed { task_id, message, .. }) => { eprintln!("Task {} failed: {}", task_id, message); } Err(RuntimeError::BackendError(e)) => { eprintln!("Storage error: {}", e); } _ => {}}BoxError
Section titled “BoxError”Type alias for Box<dyn std::error::Error + Send + Sync>.
use sayiir_core::error::BoxError;
async fn my_task(input: Data) -> Result<Output, BoxError> { // Any error implementing Error + Send + Sync can be returned Ok(process(input)?)}Full Documentation
For complete API documentation with all types, traits, and methods, see: