# Rust API Reference

## Overview

Sayiir is split into several focused crates following a layered architecture, from the foundation up:

- `sayiir-core` — types & traits (runtime-agnostic)
- `sayiir-persistence` — storage traits & implementations
- `sayiir-runtime` — execution engines & workers
- `sayiir-macros` — proc macros for convenience

## Crates

### sayiir-core
Core domain types and traits. Runtime-agnostic — no persistence, no execution strategy.

Key Types:

- `WorkflowDefinition` — The continuation tree describing what to execute
- `SerializableWorkflow` — Type-safe workflow with input/output types
- `WorkflowBuilder` — Fluent builder for assembling pipelines
- `TaskMetadata` — Metadata (timeout, retries, tags, description)
- `RetryPolicy` — Retry configuration with exponential backoff
- `WorkflowContext` — Per-execution context (codec, workflow ID, user metadata)
- `TaskExecutionContext` — Read-only context available to running tasks (workflow ID, instance ID, task ID, metadata)
- `WorkflowSnapshot` — Checkpoint of in-flight execution state
- `TaskRegistry` — Name → factory map for task deserialization

Key Traits:

- `CoreTask` — Task execution trait (input → output)
- `Codec` — Pluggable serialization (JSON, rkyv, etc.)
- `BranchKey` — Typed routing key for conditional branching
### sayiir-persistence

Storage abstractions and implementations.

Key Types:

- `InMemoryBackend` — Fast, ephemeral storage for dev/test
- `WorkflowSnapshot` — Serializable checkpoint state
- `SignalEvent` — External signal with payload

Key Traits:

- `PersistentBackend` — Storage trait for snapshots, signals, and claims
### sayiir-postgres

PostgreSQL implementation of `PersistentBackend`.

Key Types:

- `PostgresBackend` — Durable storage with auto-migration; requires PostgreSQL 13+
### sayiir-runtime

Execution engines and worker orchestration.

Key Types:

- `CheckpointingRunner` — Single-process, durable execution
- `InProcessRunner` — Simple in-memory execution (no durability)
- `PooledWorker` — Distributed, multi-machine execution
- `PooledWorkerBuilder` — Builder for `PooledWorker` with auto-generated worker IDs
- `WorkerHandle` — Handle to a spawned worker
- `WorkflowClient` — Client for submitting and controlling workflow instances
- `JsonCodec` — JSON serialization codec (feature: `json`)
- `RkyvCodec` — Binary serialization codec (feature: `rkyv`)
- `RuntimeError` — Execution errors

Key Traits:

- `WorkflowRunner` — Trait for execution engines
- `WorkflowRunExt` — Convenience `run_once` method on `Workflow`
### sayiir-macros

Procedural macros that eliminate boilerplate.

Macros:

- `#[task]` — Transform an async function into a `CoreTask` implementation
- `#[derive(BranchKey)]` — Derive `BranchKey` for fieldless enums
- `workflow!` — Build workflows with a concise DSL

## Prelude

For convenience, import the prelude to get the most commonly used types:

```rust
use sayiir_runtime::prelude::*;
```

Re-exports:

- `WorkflowBuilder`, `Workflow`, `WorkflowStatus`, `WorkflowContext`
- `TaskRegistry`
- `CheckpointingRunner`, `InProcessRunner`, `PooledWorker`, `PooledWorkerBuilder`, `WorkerHandle`, `WorkflowClient`, `WorkflowRunExt`
- `InMemoryBackend`, `PersistentBackend`
- `JsonCodec` (if `json` feature enabled)
- `RkyvCodec` (if `rkyv` feature enabled)
- `BranchKey` trait
- `task`, `workflow`, `BranchKey` derive (if `macros` feature enabled)
- `RuntimeError`
## #[task] Macro

Transform an async function into a `CoreTask` implementation with automatic registration and dependency injection.

### Basic Usage

```rust
use sayiir_runtime::prelude::*;
use sayiir_core::error::BoxError;

#[task(id = "greet")]
async fn greet(name: String) -> Result<String, BoxError> {
    Ok(format!("Hello, {}!", name))
}

// Generated: GreetTask struct
let task = GreetTask::new();
assert_eq!(GreetTask::task_id(), "greet");
```

### With Metadata

```rust
#[task(
    id = "charge_card",
    timeout = "30s",
    retries = 3,
    backoff = "100ms",
    backoff_multiplier = 2.0,
    tags = "io",
    tags = "external"
)]
async fn charge(order: Order) -> Result<Receipt, BoxError> {
    // Implementation
    Ok(Receipt { /* ... */ })
}
// Generated: ChargeTask struct, task_id() → "charge_card"
```

### With Dependency Injection

```rust
use std::sync::Arc;

#[task(id = "charge_card")]
async fn charge(
    order: Order,
    #[inject] stripe: Arc<StripeClient>
) -> Result<Receipt, BoxError> {
    stripe.charge(&order).await
}

// Usage
let stripe = Arc::new(StripeClient::new("sk_test_..."));
let charge_task = ChargeTask::new(stripe);
```

### Attribute Options
| Option | Type | Example | Description |
|---|---|---|---|
| `id` | String | `id = "charge_card"` | Recommended. Explicit task ID (default: function name) |
| `timeout` | Duration | `timeout = "30s"` | Task timeout (supports `ms`, `s`, `m`, `h`) |
| `retries` | Integer | `retries = 3` | Maximum retry count |
| `backoff` | Duration | `backoff = "100ms"` | Initial retry delay |
| `backoff_multiplier` | Float | `backoff_multiplier = 2.0` | Exponential multiplier (default: 2.0) |
| `tags` | String | `tags = "io"` | Categorization tag (repeatable) |
| `display_name` | String | `display_name = "Charge Card"` | Human-readable name |
| `description` | String | `description = "..."` | Task description |
| `priority` | Integer (1–5) | `priority = 1` | Execution priority (1 = Critical, 5 = Minimal) |
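The duration format accepted by `timeout` and `backoff` is a number followed by a unit suffix. As a rough illustration of that format (the macro's actual parser is not shown here, so treat this as a sketch, not the real implementation), a minimal parser might look like:

```rust
use std::time::Duration;

/// Parse duration strings in the documented attribute format:
/// a number followed by a unit suffix, e.g. "100ms", "30s", "5m", "2h".
fn parse_duration(s: &str) -> Option<Duration> {
    // Find where the numeric prefix ends and the unit suffix begins.
    let split = s.find(|c: char| !c.is_ascii_digit())?;
    let (num, unit) = s.split_at(split);
    let n: u64 = num.parse().ok()?;
    match unit {
        "ms" => Some(Duration::from_millis(n)),
        "s" => Some(Duration::from_secs(n)),
        "m" => Some(Duration::from_secs(n * 60)),
        "h" => Some(Duration::from_secs(n * 3600)),
        _ => None, // unknown suffix
    }
}

fn main() {
    assert_eq!(parse_duration("30s"), Some(Duration::from_secs(30)));
    assert_eq!(parse_duration("100ms"), Some(Duration::from_millis(100)));
    assert_eq!(parse_duration("2h"), Some(Duration::from_secs(7200)));
    assert_eq!(parse_duration("10x"), None);
}
```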
### Parameters

- Exactly one non-`#[inject]` parameter: the task input type
- Zero or more `#[inject]` parameters: dependency-injected fields

### Return Types

- `Result<T, E>` — Fallible; `E` is converted via `Into<BoxError>`
- `T` — Infallible; automatically wrapped in `Ok(...)`
### Generated Code

The macro generates a PascalCase struct with a `Task` suffix from the function name:

| Function | Generated struct |
|---|---|
| `fn charge(...)` | `ChargeTask` |
| `fn send_email(...)` | `SendEmailTask` |
| `fn process_order(...)` | `ProcessOrderTask` |
The struct includes:

- `new()` constructor with positional args for injected dependencies
- `task_id()` — returns the task ID (`id` attribute or function name)
- `metadata()` — returns `TaskMetadata` from attributes
- `register()` method for `TaskRegistry` integration
- `CoreTask` trait implementation
The original function is preserved for direct use/testing.
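The naming rule in the table above can be sketched as a plain snake_case-to-PascalCase conversion plus the `Task` suffix (the proc macro's actual implementation may differ in details):

```rust
/// Derive the generated struct name from a snake_case function name:
/// capitalize each underscore-separated segment and append "Task".
fn generated_struct_name(fn_name: &str) -> String {
    let pascal: String = fn_name
        .split('_')
        .filter(|seg| !seg.is_empty())
        .map(|seg| {
            // Uppercase the first character of each segment.
            let mut chars = seg.chars();
            match chars.next() {
                Some(first) => first.to_ascii_uppercase().to_string() + chars.as_str(),
                None => String::new(),
            }
        })
        .collect();
    pascal + "Task"
}

fn main() {
    assert_eq!(generated_struct_name("charge"), "ChargeTask");
    assert_eq!(generated_struct_name("send_email"), "SendEmailTask");
    assert_eq!(generated_struct_name("process_order"), "ProcessOrderTask");
}
```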
## workflow! Macro

Build a workflow pipeline with a concise DSL. The macro automatically registers `#[task]`-annotated functions — no manual `registry.register(...)` calls needed. Uses named fields: `name` and `steps` (with optional `codec` and `registry`):

### Basic Syntax

```rust
let workflow = workflow! {
    name: "workflow-id",
    steps: [task_a, task_b, task_c]
}.unwrap();
```

### Step Types
| Syntax | Description | Example |
|---|---|---|
| `task_name` | Reference to a `#[task]`-generated struct | `validate`, `charge` |
| `name(param: Type) { expr }` | Inline task | `double(x: i32) { Ok(x * 2) }` |
| `a \|\| b` | Parallel fork | `email \|\| inventory, finalize` |
| `delay "5s"` | Durable delay (auto ID) | `delay "5s"` |
| `delay "id" "duration"` | Durable delay (custom ID) | `delay "wait_24h" "24h"` |
| `signal "name"` | Wait for external signal | `signal "approval"` |
| `loop task N` | Loop body task up to N iterations | `loop refine 10` |
| `loop task N exit_with_last` | Loop with exit-on-max policy | `loop poll 100 exit_with_last` |
| `flow expr` | Inline a child workflow | `flow child_workflow` |
| `route key_fn { ... }` | Conditional branch | See below |
| `,` | Sequential separator | `a, b, c` |
### Examples

Sequential pipeline:

```rust
let workflow = workflow! {
    name: "order",
    registry: registry,
    steps: [validate, charge, send_email]
}.unwrap();
```

Parallel execution:

```rust
let workflow = workflow! {
    name: "parallel",
    registry: registry,
    steps: [prepare, (send_email || update_inventory), finalize]
}.unwrap();
```

Inline tasks:

```rust
let workflow = workflow! {
    name: "inline",
    registry: registry,
    steps: [double(x: i32) { Ok(x * 2) }, triple(x: i32) { Ok(x * 3) }]
}.unwrap();
```

Durable delay:

```rust
let workflow = workflow! {
    name: "delayed",
    registry: registry,
    steps: [send_email, delay "24h", send_reminder]
}.unwrap();
```

Wait for signal:

```rust
let workflow = workflow! {
    name: "approval",
    registry: registry,
    steps: [submit, signal "manager_approval", process]
}.unwrap();
```

Conditional branching with typed keys (`-> EnumType` triggers compile-time exhaustiveness checking):

```rust
#[derive(BranchKey)]
enum Intent { Billing, Tech }

let workflow = workflow! {
    name: "support",
    registry: registry,
    steps: [
        classify_ticket,
        route extract_intent -> Intent {
            Billing => [handle_billing],
            Tech => [run_diagnostics, escalate],
            _ => [fallback_handler],
        },
        send_summary
    ]
}.unwrap();
```

String-based keys still work (without `-> Type`):

```rust
let workflow = workflow! {
    name: "support",
    registry: registry,
    steps: [
        classify_ticket,
        route extract_intent {
            "billing" => [handle_billing],
            "tech" => [run_diagnostics, escalate],
            _ => [fallback_handler],
        },
        send_summary
    ]
}.unwrap();
```

Each branch arm uses `[task, task]` syntax for task chains. The `_` arm is the optional default.
### Returns

A `Result<SerializableWorkflow<C, Input, ()>, BuildError>` expression.
## TaskRegistry

Name → factory map for registering task implementations. Used by workers and for workflow deserialization — only task IDs and structure are serialized; implementations are looked up from the registry at runtime.

### Basic Usage

```rust
use sayiir_runtime::prelude::*;
use std::sync::Arc;

let codec = Arc::new(JsonCodec);
let mut registry = TaskRegistry::new();

// Register a struct implementing CoreTask
registry.register("double", codec.clone(), DoubleTask);

// Register a closure
registry.register_fn("add_ten", codec.clone(), |i: u32| async move { Ok(i + 10) });
```

### Builder Pattern
```rust
let registry = TaskRegistry::with_codec(codec)
    .register_fn("double", |i: u32| async move { Ok(i * 2) })
    .register_fn("add_ten", |i: u32| async move { Ok(i + 10) })
    .build();
```

### merge(other)

Merge all entries from another registry. Existing entries are not overwritten — the parent’s registrations take precedence on ID collision.
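That precedence rule can be illustrated on a plain map (a sketch of the documented behavior, not the registry's actual internals):

```rust
use std::collections::HashMap;

/// Sketch of the documented merge rule: entries already present in
/// `parent` win; only IDs missing from `parent` are copied from `other`.
fn merge(parent: &mut HashMap<String, &'static str>, other: HashMap<String, &'static str>) {
    for (id, factory) in other {
        parent.entry(id).or_insert(factory);
    }
}

fn main() {
    let mut parent = HashMap::from([("charge".to_string(), "parent_charge")]);
    let other = HashMap::from([
        ("charge".to_string(), "lib_charge"), // collision: parent wins
        ("refund".to_string(), "lib_refund"), // new ID: copied over
    ]);
    merge(&mut parent, other);
    assert_eq!(parent["charge"], "parent_charge");
    assert_eq!(parent["refund"], "lib_refund");
}
```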
The #[task] macro generates a register() method on each struct, so building reusable libraries is straightforward:
```rust
// billing_tasks crate — each #[task] gets a register() method
pub fn billing_tasks(codec: Arc<JsonCodec>, stripe: Arc<StripeClient>) -> TaskRegistry {
    let mut reg = TaskRegistry::new();
    ChargeTask::register(&mut reg, codec.clone(), ChargeTask::new(stripe.clone()));
    RefundTask::register(&mut reg, codec.clone(), RefundTask::new(stripe));
    reg
}

// application — merge libraries into a single registry
let mut registry = TaskRegistry::new();
registry.merge(billing_tasks(codec.clone(), stripe));
registry.merge(notification_tasks(codec.clone(), mailer));
```

### Key Methods
| Method | Description |
|---|---|
| `new()` | Create an empty registry |
| `register(id, codec, task)` | Register a `CoreTask` implementation |
| `register_with_metadata(id, codec, task, metadata)` | Register with metadata |
| `register_fn(id, codec, closure)` | Register a closure as a task |
| `register_fn_with_metadata(id, codec, closure, metadata)` | Register closure with metadata |
| `register_join(id, codec, closure)` | Register a fork/join combiner |
| `merge(other)` | Merge another registry (parent wins on collision) |
| `with_codec(codec)` → `RegistryBuilder` | Start a builder with shared codec |
| `get(id)` → `Option<UntypedCoreTask>` | Look up a task by ID |
| `get_metadata(id)` → `Option<&TaskMetadata>` | Get task metadata |
| `contains(id)` → `bool` | Check if a task ID is registered |
| `len()` / `is_empty()` | Registry size |
| `task_ids()` → `impl Iterator<Item = &str>` | Iterate registered IDs |
## WorkflowBuilder

Fluent builder for constructing workflows programmatically (alternative to the `workflow!` macro).

### Basic Usage

```rust
use sayiir_runtime::prelude::*;

let workflow = WorkflowBuilder::new(context)
    .then("task_a", |x: i32| async move { Ok(x * 2) })
    .then("task_b", |x: i32| async move { Ok(x + 1) })
    .build()?;
```

### Key Methods

#### Composing from #[task] structs (recommended)
Section titled “Composing from #[task] structs (recommended)”then_task::<T>()
Section titled “then_task::<T>()”Add a #[task] struct to the workflow. The task ID, output type, timeout, and retry
policy are all derived from the type — no strings needed.
// Given: #[task] async fn add_ten(input: u32) -> Result<u32, BoxError> { … }// Given: #[task(timeout = "5s")] async fn double(input: u32) -> Result<u32, BoxError> { … }
let workflow = WorkflowBuilder::new(ctx) .with_registry() .then_task::<AddTenTask>() .then_task::<DoubleTask>() .build()?;then_task_with(instance)
Section titled “then_task_with(instance)”Same as then_task, but for tasks with #[inject] dependencies that don’t implement Default.
.then_task_with(FetchTask::new(http_client.clone()))Inline closures
Section titled “Inline closures”then(name, closure)
Section titled “then(name, closure)”Add a sequential task with an inline closure.
.then("double", |x: i32| async move { Ok(x * 2) })Dynamic workflows
Section titled “Dynamic workflows”then_registered::<T>(name)
Section titled “then_registered::<T>(name)”Reference a pre-registered task by its string ID. Use this when task IDs are determined at runtime (e.g. loaded from config or a database).
Prefer .then_task::<T>() when the task is a #[task] struct known at compile time.
.then_registered::<u32>("validate")branches(closure) / fork()
Start parallel branches. When branches are `#[task]` structs, use `add_task` for type safety:

```rust
// Using #[task] structs (recommended)
.branches(|b| {
    b.add_task::<SendEmailTask>();
    b.add_task::<UpdateInventoryTask>();
})
.join("finalize", |outputs: BranchOutputs<_>| async move {
    let email = outputs.get::<SendEmailTask>()?;
    let inv = outputs.get::<UpdateInventoryTask>()?;
    Ok(Summary { email, inv })
})
```

Or with inline closures:

```rust
.branches(|b| {
    b.add("email", |input: Order| async move { Ok(send_email(input).await?) });
    b.add("inventory", |input: Order| async move { Ok(update_inventory(input).await?) });
})
.join("finalize", |outputs: BranchOutputs<_>| async move {
    let email: EmailResult = outputs.get("email")?;
    let inv: InventoryResult = outputs.get("inventory")?;
    Ok(Summary { email, inv })
})
```

#### join(name, closure)

Join parallel branches with a combining task. Uses `BranchOutputs` for type-safe named access to each branch result.

```rust
.join("combine", |outputs: BranchOutputs<_>| async move {
    let count: u32 = outputs.get("count")?;
    let name: String = outputs.get("name")?;
    Ok(format!("{}: {}", name, count))
})
```

#### route(key_fn)
Start conditional branching with static type checking. The key function returns a `BranchKey` enum variant — the compiler ensures every variant has a matching `.branch()` call and that no orphan branches exist. Typos and missing cases are caught at compile time, not at runtime. The branch node ID is derived automatically from the enum type name.

```rust
#[derive(BranchKey)]
enum Route {
    Billing,
    Tech,
}

let wf = WorkflowBuilder::new(ctx)
    .with_registry()
    .then_task::<ClassifyTask>()
    .route::<_, Route, _, _>(|input: ClassifyOutput| async move {
        Ok(match input.intent {
            Intent::Billing => Route::Billing,
            Intent::Tech => Route::Tech,
        })
    })
    .branch(Route::Billing, |sub| sub.then_task::<HandleBillingTask>())
    .branch(Route::Tech, |sub| sub.then_task::<HandleTechTask>())
    .default_branch(|sub| sub.then_task::<FallbackTask>())
    .done()
    .build()?;
```

After `.done()`, the output type is `BranchEnvelope<BranchOut>` containing `branch` (the key) and `result`.
#### loop_task(id, closure, max_iterations)

Add a loop whose body repeats until it returns `LoopResult::Done`.

```rust
use sayiir_core::LoopResult;

let wf = WorkflowBuilder::new(ctx)
    .then("setup", setup_fn)
    .loop_task("refine", |draft: String| async move {
        let improved = improve(&draft);
        if is_good_enough(&improved) {
            Ok(LoopResult::Done(improved))
        } else {
            Ok(LoopResult::Again(improved))
        }
    }, 5)
    .then("finalize", finalize_fn)
    .build()?;
```

Parameters:
- `id` (`&str`) — Body task identifier. The loop node gets an auto-generated ID (`loop_0`, `loop_1`, …).
- `func` — Closure returning `Result<LoopResult<NewOutput>, BoxError>`
- `max_iterations` (`u32`) — Maximum iterations before failing (must be ≥ 1)

Use `loop_task_with_policy` for custom `MaxIterationsPolicy`:

```rust
use sayiir_core::MaxIterationsPolicy;

.loop_task_with_policy("poll", poll_fn, 100, MaxIterationsPolicy::ExitWithLast)
```

`LoopResult<T>`:

- `LoopResult::Again(value)` — Continue iterating with `value`
- `LoopResult::Done(value)` — Exit the loop with `value`
- Serde format: `{"_loop": "again"|"done", "value": ...}`

`MaxIterationsPolicy`:

- `Fail` (default) — Return an error when max iterations is reached
- `ExitWithLast` — Exit with the last iteration’s output
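The loop semantics described above can be sketched with local stand-in types (these are not the real `sayiir_core` definitions, just a minimal model of the documented behavior):

```rust
/// Local stand-ins mirroring the documented API shapes.
enum LoopResult<T> {
    Again(T),
    Done(T),
}

enum MaxIterationsPolicy {
    Fail,
    ExitWithLast,
}

/// Run `body` until it returns Done or `max_iterations` is exhausted,
/// then apply the policy.
fn drive_loop<T>(
    mut value: T,
    body: impl Fn(T) -> LoopResult<T>,
    max_iterations: u32,
    policy: MaxIterationsPolicy,
) -> Result<T, String> {
    for _ in 0..max_iterations {
        match body(value) {
            LoopResult::Done(v) => return Ok(v), // body finished early
            LoopResult::Again(v) => value = v,   // keep iterating
        }
    }
    // Max iterations reached without Done: the policy decides.
    match policy {
        MaxIterationsPolicy::Fail => Err("max iterations reached".into()),
        MaxIterationsPolicy::ExitWithLast => Ok(value),
    }
}

fn main() {
    // Doubles until the value exceeds 100.
    let body = |x: u32| if x > 100 { LoopResult::Done(x) } else { LoopResult::Again(x * 2) };
    assert_eq!(drive_loop(1, body, 10, MaxIterationsPolicy::Fail), Ok(128));
    // Too few iterations: Fail errors, ExitWithLast returns the last output.
    assert!(drive_loop(1, body, 3, MaxIterationsPolicy::Fail).is_err());
    assert_eq!(drive_loop(1, body, 3, MaxIterationsPolicy::ExitWithLast), Ok(8));
}
```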
#### delay(name, duration)

Add a durable delay.

```rust
use std::time::Duration;

.delay("wait_24h", Duration::from_secs(86400))
```

#### wait_for_signal(id, signal_name, timeout)

Wait for an external signal.

```rust
.wait_for_signal("approval_step", "manager_approval", Some(Duration::from_secs(3600)))
```

#### with_registry() / with_existing_registry(registry)

Configure task registry.

```rust
let builder = WorkflowBuilder::new(context)
    .with_existing_registry(registry);
```

#### then_flow(child) / then_serializable_flow(child)

Inline a child workflow. The child’s entire continuation tree is embedded as a single step. Use `then_flow` for non-serializable workflows and `then_serializable_flow` when both parent and child have registries (merges the child’s task registry into the parent’s).

```rust
let child = WorkflowBuilder::new(child_ctx)
    .with_registry()
    .then("double", |x: u32| async move { Ok(x * 2) })
    .build()?;

let parent = WorkflowBuilder::new(parent_ctx)
    .with_registry()
    .then("add_ten", |x: u32| async move { Ok(x + 10) })
    .then_serializable_flow(child)
    .build()?;
```

In the `workflow!` macro, use the `flow` keyword:

```rust
let child = workflow! { name: "child", steps: [double] }.unwrap();
let parent = workflow! { name: "parent", steps: [add_ten, flow child] }.unwrap();
```

#### build()

Build the workflow.

```rust
let workflow = builder.build()?;
```

## Runners

### CheckpointingRunner
Single-process, durable execution with automatic checkpointing.

```rust
use sayiir_runtime::prelude::*;

let backend = InMemoryBackend::new();
let runner = CheckpointingRunner::new(backend);

// Start a new workflow
let status = runner
    .run(workflow, "instance-001", input_data)
    .await?;

// Resume after crash
let status = runner
    .resume(workflow, "instance-001")
    .await?;

// Cancel
runner.cancel("instance-001", Some("user cancelled"), Some("admin")).await?;

// Pause
runner.pause("instance-001", Some("maintenance"), Some("ops")).await?;

// Unpause
runner.unpause("instance-001").await?;
```

Methods:

- `new(backend)` — Create with any `PersistentBackend`
- `with_conflict_policy(policy)` — Set the conflict policy (builder pattern). See `ConflictPolicy` below.
- `run(workflow, instance_id, input)` — Run to completion
- `resume(workflow, instance_id)` — Resume from checkpoint
- `cancel(instance_id, reason, by)` — Cancel workflow
- `pause(instance_id, reason, by)` — Pause workflow
- `unpause(instance_id)` — Unpause workflow
### ConflictPolicy

Controls what happens when `run()` is called with an `instance_id` that already has a snapshot.

```rust
use sayiir_core::workflow::ConflictPolicy;

// Default: fail if instance already exists
let runner = CheckpointingRunner::new(backend.clone());

// Idempotent: return existing status
let runner = CheckpointingRunner::new(backend.clone())
    .with_conflict_policy(ConflictPolicy::UseExisting);

// Force restart: delete old snapshot, run fresh
let runner = CheckpointingRunner::new(backend.clone())
    .with_conflict_policy(ConflictPolicy::TerminateExisting);
```

Variants:
| Variant | Behavior |
|---|---|
| `Fail` (default) | Returns `RuntimeError::InstanceAlreadyExists` if the instance exists. |
| `UseExisting` | Returns the existing workflow status without re-running. |
| `TerminateExisting` | Deletes the existing snapshot and starts a fresh execution. |
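The table's decision logic amounts to a single match on the policy once an existing snapshot is detected. A local sketch (stand-in enums, not the runner's actual code):

```rust
/// Stand-in types modeling the documented conflict check.
#[derive(Debug, PartialEq)]
enum ConflictPolicy {
    Fail,
    UseExisting,
    TerminateExisting,
}

#[derive(Debug, PartialEq)]
enum Action {
    Error(String),
    ReturnExistingStatus,
    DeleteSnapshotAndRun,
    RunFresh,
}

fn on_run(instance_exists: bool, policy: &ConflictPolicy, instance_id: &str) -> Action {
    if !instance_exists {
        return Action::RunFresh; // no snapshot: always start normally
    }
    match policy {
        ConflictPolicy::Fail => Action::Error(format!("instance already exists: {instance_id}")),
        ConflictPolicy::UseExisting => Action::ReturnExistingStatus,
        ConflictPolicy::TerminateExisting => Action::DeleteSnapshotAndRun,
    }
}

fn main() {
    assert_eq!(on_run(false, &ConflictPolicy::Fail, "a"), Action::RunFresh);
    assert_eq!(on_run(true, &ConflictPolicy::UseExisting, "a"), Action::ReturnExistingStatus);
    assert_eq!(on_run(true, &ConflictPolicy::TerminateExisting, "a"), Action::DeleteSnapshotAndRun);
}
```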
### PooledWorker

Distributed, multi-machine execution with work claiming.

```rust
use sayiir_runtime::prelude::*;
use std::time::Duration;

let backend = PostgresBackend::new("postgresql://...").await?;
let registry = TaskRegistry::new();

let worker = PooledWorker::new("worker-1", backend, registry)
    .with_claim_ttl(Some(Duration::from_secs(60)));

let handle = worker.spawn(
    Duration::from_secs(5), // Poll interval
    vec![workflow1, workflow2]
);

// Later: stop the worker
handle.shutdown();
handle.join().await?;
```

Or use the builder for auto-generated worker IDs:

```rust
// Auto-generated worker ID from hostname + PID
let worker = PooledWorker::builder(backend, registry).build();

// Or override with explicit ID and custom settings
let worker = PooledWorker::builder(backend, registry)
    .worker_id("custom-worker-1")
    .claim_ttl(Some(Duration::from_secs(600)))
    .batch_size(NonZeroUsize::new(4).unwrap())
    .build();
```

Methods:

- `new(worker_id, backend, registry)` — Create worker with explicit ID
- `builder(backend, registry)` → `PooledWorkerBuilder` — Create builder with auto-generated ID
- `with_claim_ttl(Option<Duration>)` — Set claim timeout
- `with_batch_size(NonZeroUsize)` — Set tasks per poll (default: 1)
- `with_aging_interval(Duration)` — Set priority aging interval (default: 300s, must be > 0)
- `spawn(poll_interval, workflows)` → `WorkerHandle` — Start polling
### InProcessRunner

Simple in-memory execution without durability.

```rust
use sayiir_runtime::prelude::*;

let runner = InProcessRunner::new();
let result = runner.run(workflow, input_data).await?;
```

No persistence, no resume. Useful for testing or short-lived workflows.
### WorkflowClient

Client for submitting and controlling workflow instances without executing tasks. Use this with `PooledWorker` for the distributed model.

```rust
use sayiir_runtime::WorkflowClient;
use sayiir_core::workflow::ConflictPolicy;

let backend = PostgresBackend::<JsonCodec>::connect(url).await?;
let client = WorkflowClient::new(backend)
    .with_conflict_policy(ConflictPolicy::UseExisting);

// Submit a workflow
let (status, output) = client.submit(&workflow, "order-42", input).await?;

// Lifecycle operations
client.cancel("order-42", Some("Out of stock".into()), None).await?;
client.pause("order-42", Some("Maintenance".into()), None).await?;
client.unpause("order-42").await?;
client.send_event("order-42", "signal_name", payload).await?;
let status = client.status("order-42").await?;

// Retrieve a single task result (requires TaskResultStore)
let result = client.get_task_result("order-42", "validate_order").await?;

// Type-safe variant using #[task]-generated struct
let result = client.get_task_result_of::<ValidateOrderTask>("order-42").await?;
```

Methods:

- `new(backend)` — Create client (wraps backend in `Arc`)
- `from_shared(Arc<B>)` — Create from shared backend reference
- `with_conflict_policy(ConflictPolicy)` — Set duplicate instance policy
- `backend()` → `&Arc<B>` — Access the backend
- `submit(workflow, instance_id, input)` → `(WorkflowStatus, Option<Bytes>)`
- `cancel(instance_id, reason, cancelled_by)` — Store a cancel signal
- `pause(instance_id, reason, paused_by)` — Store a pause signal
- `unpause(instance_id)` — Unpause a paused workflow
- `send_event(instance_id, signal_name, payload)` — Send an external event
- `status(instance_id)` → `WorkflowStatus` — Query current status
- `get_task_result(instance_id, task_id)` → `Option<Bytes>` — Get a single task result (requires `B: TaskResultStore`)
- `get_task_result_of::<T>(instance_id)` → `Option<Bytes>` — Type-safe variant that derives task ID from a `TaskIdentifier` impl (e.g. `#[task]`-generated struct)
### WorkflowRunExt (run_once)

Extension trait that adds a `run_once` convenience method directly on `Workflow`. Uses `InProcessRunner` internally — no backend setup, no instance ID.

```rust
use sayiir_runtime::prelude::*;

let status = workflow.run_once(input).await?;
```

Ideal for scripts, tests, and quick experiments. Included in the prelude.
## Key Traits

### Codec

Serialization trait for task inputs and outputs.

```rust
pub trait Codec: Send + Sync + 'static {
    fn encode<T: Serialize>(&self, value: &T) -> Result<Vec<u8>, SerializationError>;
    fn decode<T: for<'de> Deserialize<'de>>(&self, bytes: &[u8]) -> Result<T, SerializationError>;
}
```

Built-in implementations:

- `JsonCodec` — JSON serialization (feature: `json`)
- `RkyvCodec` — Zero-copy binary serialization (feature: `rkyv`)
### PersistentBackend

Storage trait for workflow state.

```rust
pub trait PersistentBackend: Send + Sync + 'static {
    async fn save_snapshot(&self, snapshot: &WorkflowSnapshot) -> Result<(), BackendError>;
    async fn load_snapshot(&self, instance_id: &str) -> Result<Option<WorkflowSnapshot>, BackendError>;
    async fn claim_task(&self, claim: TaskClaim) -> Result<bool, BackendError>;
    async fn release_claim(&self, instance_id: &str, task_id: &str) -> Result<(), BackendError>;
    async fn save_signal(&self, instance_id: &str, signal: SignalEvent) -> Result<(), BackendError>;
    async fn load_signals(&self, instance_id: &str, signal_name: &str) -> Result<Vec<SignalEvent>, BackendError>;
    async fn delete_signal(&self, signal_id: &str) -> Result<(), BackendError>;
}
```

Implementations:

- `InMemoryBackend` — In-memory storage
- `PostgresBackend` — PostgreSQL storage
### CoreTask

Task execution trait.

```rust
#[async_trait]
pub trait CoreTask: Send + Sync + 'static {
    async fn run(&self, input: &[u8], ctx: &WorkflowContext) -> Result<Vec<u8>, BoxError>;
    fn metadata(&self) -> &TaskMetadata;
    fn task_id(&self) -> &str;
}
```

Usually implemented via the `#[task]` macro, but can be implemented manually if needed.
### TaskIdentifier

Lightweight trait that only provides the task’s unique string identifier. Unlike `RegisterableTask`, it does not require `CoreTask` — making it usable in client-only contexts that refer to tasks by ID without pulling in the task implementation.

```rust
pub trait TaskIdentifier {
    fn task_id() -> &'static str;
}
```

Automatically implemented for all `RegisterableTask` types (including `#[task]`-generated structs). Used by `WorkflowClient::get_task_result_of::<T>()` for type-safe task result retrieval.
### BranchKey

Typed routing key for conditional branching. Instead of passing raw strings as branch keys (which are only validated at runtime), `BranchKey` enums give you static type checking — the compiler rejects typos, missing branches, and orphan keys at build time.

```rust
pub trait BranchKey: Send + Sync + 'static {
    fn as_key(&self) -> &str;
    fn all_keys() -> &'static [&'static str];
    fn from_key(key: &str) -> Option<Self> where Self: Sized;
}
```

Use `#[derive(BranchKey)]` on a fieldless enum to auto-implement the trait. Variant names are converted to snake_case keys by default, with `#[branch_key("custom")]` for overrides:

```rust
use sayiir_runtime::prelude::*;

#[derive(BranchKey)]
enum TicketRoute {
    Billing,              // key: "billing"
    TechSupport,          // key: "tech_support"
    #[branch_key("other")]
    Fallback,             // key: "other"
}
```

The key function returns a `BranchKey` variant, and `.branch()` accepts variants — not strings. The compiler ensures every variant is handled, eliminating an entire class of runtime routing errors. The derive macro is re-exported in the prelude when the `macros` feature is enabled.
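A hand-written impl of the trait (as printed above, redeclared locally) shows roughly what the derive produces; the actual macro expansion may differ in details:

```rust
/// The trait as documented, redeclared locally for a standalone sketch.
trait BranchKey: Send + Sync + 'static {
    fn as_key(&self) -> &str;
    fn all_keys() -> &'static [&'static str];
    fn from_key(key: &str) -> Option<Self> where Self: Sized;
}

enum TicketRoute {
    Billing,
    TechSupport,
}

impl BranchKey for TicketRoute {
    fn as_key(&self) -> &str {
        match self {
            TicketRoute::Billing => "billing",          // snake_case of Billing
            TicketRoute::TechSupport => "tech_support", // snake_case of TechSupport
        }
    }
    fn all_keys() -> &'static [&'static str] {
        &["billing", "tech_support"]
    }
    fn from_key(key: &str) -> Option<Self> {
        match key {
            "billing" => Some(TicketRoute::Billing),
            "tech_support" => Some(TicketRoute::TechSupport),
            _ => None, // unknown key: rejected
        }
    }
}

fn main() {
    assert_eq!(TicketRoute::Billing.as_key(), "billing");
    assert_eq!(TicketRoute::all_keys().len(), 2);
    assert!(TicketRoute::from_key("tech_support").is_some());
    assert!(TicketRoute::from_key("typo").is_none());
}
```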
## Configuration Types

### RetryPolicy

```rust
use std::time::Duration;
use sayiir_core::task::RetryPolicy;

let policy = RetryPolicy {
    max_retries: 3,
    initial_delay: Duration::from_secs(1),
    backoff_multiplier: 2.0,
};
```

Delay calculation: `delay = initial_delay * multiplier^attempt`
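The formula can be computed directly (assuming a 0-based `attempt` counter, which the formula above does not pin down):

```rust
use std::time::Duration;

/// delay = initial_delay * multiplier^attempt (attempt assumed 0-based).
fn retry_delay(initial_delay: Duration, multiplier: f64, attempt: u32) -> Duration {
    initial_delay.mul_f64(multiplier.powi(attempt as i32))
}

fn main() {
    let initial = Duration::from_secs(1);
    assert_eq!(retry_delay(initial, 2.0, 0), Duration::from_secs(1)); // first attempt
    assert_eq!(retry_delay(initial, 2.0, 1), Duration::from_secs(2));
    assert_eq!(retry_delay(initial, 2.0, 2), Duration::from_secs(4));
}
```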
### TaskMetadata

```rust
use sayiir_core::task::TaskMetadata;
use std::time::Duration;

let metadata = TaskMetadata {
    display_name: Some("Process Order".to_string()),
    description: Some("Validates and processes customer orders".to_string()),
    timeout: Some(Duration::from_secs(30)),
    retry_policy: Some(policy),
    tags: vec!["orders".to_string(), "payment".to_string()],
};
```

### WorkflowContext

Per-execution context carrying codec and metadata.

```rust
use sayiir_core::context::WorkflowContext;
use std::sync::Arc;

let ctx = WorkflowContext::new(
    "order-pipeline",
    Arc::new(JsonCodec),
    Arc::new(()), // User metadata (can be any type)
);
```

## Task Execution Context
### TaskExecutionContext

Read-only context available from within a running task. Provides access to the workflow ID, instance ID, task ID, and task metadata. Set automatically by the runtime before each task executes.

```rust
use sayiir_core::context::TaskExecutionContext;

// Access via the task_context! macro (works in both async and sync contexts)
if let Some(ctx) = task_context!() {
    println!("Workflow: {}", ctx.workflow_id);
    println!("Instance: {}", ctx.instance_id);
    println!("Task: {}", ctx.task_id);
    if let Some(timeout) = ctx.metadata.timeout {
        println!("Timeout: {:?}", timeout);
    }
}
```

Fields:

- `workflow_id` (`Arc<str>`) — The workflow definition identifier
- `instance_id` (`Arc<str>`) — The workflow instance identifier (durable) or workflow ID (in-memory)
- `task_id` (`Arc<str>`) — The current task identifier
- `metadata` (`TaskMetadata`) — Task metadata (timeout, retry policy, version, tags, etc.)
- `workflow_metadata_json` (`Option<Arc<str>>`) — Optional JSON-encoded workflow-level metadata
### task_context!()

Macro to access the task execution context from within a running task. Returns `Option<TaskExecutionContext>`.

Tries tokio task-local storage first (async executor paths), then falls back to thread-local storage (sync paths used by Python GIL and Node.js main thread).

```rust
use sayiir_runtime::task_context;

#[task(id = "process")]
async fn process(input: Data) -> Result<Output, BoxError> {
    let ctx = task_context!();
    if let Some(ctx) = ctx {
        tracing::info!(
            workflow = %ctx.workflow_id,
            instance = %ctx.instance_id,
            task = %ctx.task_id,
            "Processing"
        );
    }
    Ok(do_process(input))
}
```

The `task_context!` macro is re-exported from `sayiir_runtime` (included in the prelude).
## Error Types

Sayiir separates errors into two domains: build-time (`BuildError`) and runtime (`WorkflowError`), with `RuntimeError` wrapping both at the top level.

### BuildError

Errors from workflow construction and hydration (build-time).

```rust
use sayiir_core::error::BuildError;

match builder.build() {
    Err(BuildError::DuplicateTaskId(id)) => eprintln!("Duplicate: {id}"),
    Err(BuildError::TaskNotFound(id)) => eprintln!("Missing task: {id}"),
    Err(BuildError::EmptyBranch) => eprintln!("Branch needs at least one step"),
    Err(BuildError::EmptyFork) => eprintln!("Fork has no branches"),
    Err(BuildError::MissingBranches { branch_id, missing_keys }) => {
        eprintln!("Branch '{branch_id}' is missing handlers for: {missing_keys:?}");
    }
    Err(BuildError::OrphanBranches { branch_id, orphan_keys }) => {
        eprintln!("Branch '{branch_id}' has handlers for unknown keys: {orphan_keys:?}");
    }
    Err(BuildError::DefinitionMismatch { expected, found }) => {
        eprintln!("Hash mismatch: expected {expected}, got {found}");
    }
    Ok(workflow) => { /* ... */ }
}
```

### WorkflowError
Errors from workflow execution (runtime).

Variants include `TaskNotFound`, `TaskNotImplemented`, `DefinitionMismatch`, `Cancelled`, `Paused`, `TaskPanicked`, `TaskTimedOut`, `BranchKeyNotFound`, `Waiting`, `AwaitingSignal`, and others.

### RuntimeError

Top-level error from the runtime layer, wrapping all error domains.

```rust
use sayiir_runtime::RuntimeError;

match result {
    Err(RuntimeError::Workflow(e)) => eprintln!("Workflow error: {e}"),
    Err(RuntimeError::Build(e)) => eprintln!("Build error: {e}"),
    Err(RuntimeError::Backend(e)) => eprintln!("Storage error: {e}"),
    Err(RuntimeError::Task(e)) => eprintln!("Task error: {e}"),
    Err(RuntimeError::InstanceAlreadyExists(id)) => {
        eprintln!("Instance already exists: {id}");
    }
    _ => {}
}
```

### BoxError
Type alias for `Box<dyn std::error::Error + Send + Sync>`.

```rust
use sayiir_core::error::BoxError;

async fn my_task(input: Data) -> Result<Output, BoxError> {
    // Any error implementing Error + Send + Sync can be returned
    Ok(process(input)?)
}
```

## Full Documentation

For complete API documentation with all types, traits, and methods, see: