Rust API Reference

Sayiir is split into several focused crates following a layered architecture:

sayiir-core (types & traits — runtime-agnostic)
sayiir-persistence (storage traits & implementations)
sayiir-runtime (execution engines & workers)
sayiir-macros (proc macros for convenience)

sayiir-core

Core domain types and traits. Runtime-agnostic — no persistence, no execution strategy.

Key Types:

  • WorkflowDefinition — The continuation tree describing what to execute
  • SerializableWorkflow — Type-safe workflow with input/output types
  • WorkflowBuilder — Fluent builder for assembling pipelines
  • TaskMetadata — Metadata (timeout, retries, tags, description)
  • RetryPolicy — Retry configuration with exponential backoff
  • WorkflowContext — Per-execution context (codec, workflow ID, user metadata)
  • TaskExecutionContext — Read-only context available to running tasks (workflow ID, instance ID, task ID, metadata)
  • WorkflowSnapshot — Checkpoint of in-flight execution state
  • TaskRegistry — Name → factory map for task deserialization

Key Traits:

  • CoreTask — Task execution trait (input → output)
  • Codec — Pluggable serialization (JSON, rkyv, etc.)
  • BranchKey — Typed routing key for conditional branching

sayiir-persistence

Storage abstractions and implementations.

Key Types:

  • InMemoryBackend — Fast, ephemeral storage for dev/test
  • WorkflowSnapshot — Serializable checkpoint state
  • SignalEvent — External signal with payload

Key Traits:

  • PersistentBackend — Storage trait for snapshots, signals, and claims

PostgreSQL implementation of PersistentBackend.

Key Types:

  • PostgresBackend — Durable storage with auto-migration (requires PostgreSQL 13+)

sayiir-runtime

Execution engines and worker orchestration.

Key Types:

  • CheckpointingRunner — Single-process, durable execution
  • InProcessRunner — Simple in-memory execution (no durability)
  • PooledWorker — Distributed, multi-machine execution
  • PooledWorkerBuilder — Builder for PooledWorker with auto-generated worker IDs
  • WorkerHandle — Handle to a spawned worker
  • WorkflowClient — Client for submitting and controlling workflow instances
  • JsonCodec — JSON serialization codec (feature: json)
  • RkyvCodec — Binary serialization codec (feature: rkyv)
  • RuntimeError — Execution errors

Key Traits:

  • WorkflowRunner — Trait for execution engines
  • WorkflowRunExt — Convenience run_once method on Workflow

sayiir-macros

Procedural macros that eliminate boilerplate.

Macros:

  • #[task] — Transform an async function into a CoreTask implementation
  • #[derive(BranchKey)] — Derive BranchKey for fieldless enums
  • workflow! — Build workflows with a concise DSL

Prelude

For convenience, import the prelude to get the most commonly used types:

use sayiir_runtime::prelude::*;

Re-exports:

  • WorkflowBuilder, Workflow, WorkflowStatus, WorkflowContext
  • TaskRegistry
  • CheckpointingRunner, InProcessRunner, PooledWorker, PooledWorkerBuilder, WorkerHandle, WorkflowClient, WorkflowRunExt
  • InMemoryBackend, PersistentBackend
  • JsonCodec (if json feature enabled)
  • RkyvCodec (if rkyv feature enabled)
  • BranchKey trait
  • task, workflow, BranchKey derive (if macros feature enabled)
  • RuntimeError

#[task]

Transform an async function into a CoreTask implementation with automatic registration and dependency injection.

use sayiir_runtime::prelude::*;
use sayiir_core::error::BoxError;

#[task(id = "greet")]
async fn greet(name: String) -> Result<String, BoxError> {
    Ok(format!("Hello, {}!", name))
}

// Generated: GreetTask struct
let task = GreetTask::new();
assert_eq!(GreetTask::task_id(), "greet");

#[task(
    id = "charge_card",
    timeout = "30s",
    retries = 3,
    backoff = "100ms",
    backoff_multiplier = 2.0,
    tags = "io",
    tags = "external"
)]
async fn charge(order: Order) -> Result<Receipt, BoxError> {
    // Implementation
    Ok(Receipt { /* ... */ })
}

// Generated: ChargeTask struct, task_id() → "charge_card"

use std::sync::Arc;

#[task(id = "charge_card")]
async fn charge(
    order: Order,
    #[inject] stripe: Arc<StripeClient>,
) -> Result<Receipt, BoxError> {
    stripe.charge(&order).await
}

// Usage
let stripe = Arc::new(StripeClient::new("sk_test_..."));
let charge_task = ChargeTask::new(stripe);
Options:

  • id (String) — e.g. id = "charge_card". Recommended. Explicit task ID (default: function name)
  • timeout (Duration) — e.g. timeout = "30s". Task timeout (supports ms, s, m, h)
  • retries (Integer) — e.g. retries = 3. Maximum retry count
  • backoff (Duration) — e.g. backoff = "100ms". Initial retry delay
  • backoff_multiplier (Float) — e.g. backoff_multiplier = 2.0. Exponential multiplier (default: 2.0)
  • tags (String) — e.g. tags = "io". Categorization tag (repeatable)
  • display_name (String) — e.g. display_name = "Charge Card". Human-readable name
  • description (String) — e.g. description = "...". Task description
  • priority (Integer, 1–5) — e.g. priority = 1. Execution priority (1 = Critical, 5 = Minimal)
Parameters:

  • Exactly one non-#[inject] parameter: the task input type
  • Zero or more #[inject] parameters: dependency-injected fields

Return types:

  • Result<T, E> — Fallible; E is converted via Into<BoxError>
  • T — Infallible; automatically wrapped in Ok(...)

The macro generates a PascalCase struct with a Task suffix from the function name:

  • fn charge(...) → ChargeTask
  • fn send_email(...) → SendEmailTask
  • fn process_order(...) → ProcessOrderTask
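The conversion is ordinary snake_case to PascalCase plus a Task suffix. A sketch of the convention in plain Rust (to_task_struct_name is an illustrative helper, not part of the crate; the real conversion happens inside the proc macro):

```rust
// Illustrates the naming rule only: snake_case fn name -> PascalCase + "Task".
fn to_task_struct_name(fn_name: &str) -> String {
    let pascal: String = fn_name
        .split('_')
        .map(|word| {
            let mut chars = word.chars();
            match chars.next() {
                Some(first) => first.to_uppercase().collect::<String>() + chars.as_str(),
                None => String::new(),
            }
        })
        .collect();
    pascal + "Task"
}

fn main() {
    assert_eq!(to_task_struct_name("charge"), "ChargeTask");
    assert_eq!(to_task_struct_name("send_email"), "SendEmailTask");
    assert_eq!(to_task_struct_name("process_order"), "ProcessOrderTask");
}
```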

The struct includes:

  • new() constructor with positional args for injected dependencies
  • task_id() — returns the task ID (id attribute or function name)
  • metadata() — returns TaskMetadata from attributes
  • register() method for TaskRegistry integration
  • CoreTask trait implementation

The original function is preserved for direct use/testing.
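Because the annotated fn survives as-is, it can be unit-tested with no workflow machinery. A sketch reusing the greet example above; the mini executor is purely a test helper for an immediately-ready future (in practice one would reach for #[tokio::test] instead):

```rust
use std::future::Future;
use std::pin::pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

type BoxError = Box<dyn std::error::Error + Send + Sync>;

// Stand-in for the #[task]-annotated `greet` shown earlier: the macro leaves
// the original async fn in place, so tests can call it directly.
async fn greet(name: String) -> Result<String, BoxError> {
    Ok(format!("Hello, {}!", name))
}

// Minimal no-op waker + single poll, sufficient for a future that is ready
// on its first poll. Test helper only.
fn noop_raw_waker() -> RawWaker {
    fn clone(_: *const ()) -> RawWaker {
        noop_raw_waker()
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    RawWaker::new(std::ptr::null(), &VTABLE)
}

fn block_on_ready<F: Future>(fut: F) -> F::Output {
    let waker = unsafe { Waker::from_raw(noop_raw_waker()) };
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    match fut.as_mut().poll(&mut cx) {
        Poll::Ready(v) => v,
        Poll::Pending => panic!("future was not immediately ready"),
    }
}

fn main() {
    let out = block_on_ready(greet("World".to_string())).unwrap();
    assert_eq!(out, "Hello, World!");
}
```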

workflow!

Build a workflow pipeline with a concise DSL. The macro automatically registers #[task]-annotated functions — no manual registry.register(...) calls needed. It takes named fields: name and steps, with optional codec and registry:

let workflow = workflow! {
    name: "workflow-id",
    steps: [task_a, task_b, task_c]
}.unwrap();
Syntax:

  • task_name — Reference to a #[task]-generated struct (e.g. validate, charge)
  • name(param: Type) { expr } — Inline task (e.g. double(x: i32) { Ok(x * 2) })
  • a || b — Parallel fork (e.g. email || inventory, finalize)
  • delay "5s" — Durable delay with an auto-generated ID
  • delay "wait_24h" "24h" — Durable delay with a custom ID
  • signal "name" — Wait for an external signal (e.g. signal "approval")
  • loop task N — Loop the body task up to N iterations (e.g. loop refine 10)
  • loop task N exit_with_last — Loop with exit-on-max policy (e.g. loop poll 100 exit_with_last)
  • flow expr — Inline a child workflow (e.g. flow child_workflow)
  • route key_fn { ... } — Conditional branch (see below)
  • , — Sequential separator (e.g. a, b, c)

Sequential pipeline:

let workflow = workflow! {
    name: "order",
    registry: registry,
    steps: [validate, charge, send_email]
}.unwrap();

Parallel execution:

let workflow = workflow! {
    name: "parallel",
    registry: registry,
    steps: [prepare, (send_email || update_inventory), finalize]
}.unwrap();

Inline tasks:

let workflow = workflow! {
    name: "inline",
    registry: registry,
    steps: [double(x: i32) { Ok(x * 2) }, triple(x: i32) { Ok(x * 3) }]
}.unwrap();

Durable delay:

let workflow = workflow! {
    name: "delayed",
    registry: registry,
    steps: [send_email, delay "24h", send_reminder]
}.unwrap();

Wait for signal:

let workflow = workflow! {
    name: "approval",
    registry: registry,
    steps: [submit, signal "manager_approval", process]
}.unwrap();

Conditional branching with typed keys (-> EnumType triggers compile-time exhaustiveness checking):

#[derive(BranchKey)]
enum Intent { Billing, Tech }

let workflow = workflow! {
    name: "support",
    registry: registry,
    steps: [
        classify_ticket,
        route extract_intent -> Intent {
            Billing => [handle_billing],
            Tech => [run_diagnostics, escalate],
            _ => [fallback_handler],
        },
        send_summary
    ]
}.unwrap();

String-based keys still work (without -> Type):

let workflow = workflow! {
    name: "support",
    registry: registry,
    steps: [
        classify_ticket,
        route extract_intent {
            "billing" => [handle_billing],
            "tech" => [run_diagnostics, escalate],
            _ => [fallback_handler],
        },
        send_summary
    ]
}.unwrap();

Each branch arm uses [task, task] syntax for task chains. The _ arm is the optional default.

Return value: a Result<SerializableWorkflow<C, Input, ()>, BuildError> expression.

TaskRegistry

Name → factory map for registering task implementations. Used by workers and for workflow deserialization — only task IDs and structure are serialized; implementations are looked up from the registry at runtime.

use sayiir_runtime::prelude::*;
use std::sync::Arc;

let codec = Arc::new(JsonCodec);
let mut registry = TaskRegistry::new();

// Register a struct implementing CoreTask
registry.register("double", codec.clone(), DoubleTask);

// Register a closure
registry.register_fn("add_ten", codec.clone(), |i: u32| async move { Ok(i + 10) });

// Or start a builder with a shared codec
let registry = TaskRegistry::with_codec(codec)
    .register_fn("double", |i: u32| async move { Ok(i * 2) })
    .register_fn("add_ten", |i: u32| async move { Ok(i + 10) })
    .build();

merge(other)

Merge all entries from another registry. Existing entries are not overwritten — the parent's registrations take precedence on ID collision.

The #[task] macro generates a register() method on each struct, so building reusable libraries is straightforward:

// billing_tasks crate — each #[task] gets a register() method
pub fn billing_tasks(codec: Arc<JsonCodec>, stripe: Arc<StripeClient>) -> TaskRegistry {
    let mut reg = TaskRegistry::new();
    ChargeTask::register(&mut reg, codec.clone(), ChargeTask::new(stripe.clone()));
    RefundTask::register(&mut reg, codec.clone(), RefundTask::new(stripe));
    reg
}

// application — merge libraries into a single registry
let mut registry = TaskRegistry::new();
registry.merge(billing_tasks(codec.clone(), stripe));
registry.merge(notification_tasks(codec.clone(), mailer));
Methods:

  • new() — Create an empty registry
  • register(id, codec, task) — Register a CoreTask implementation
  • register_with_metadata(id, codec, task, metadata) — Register with metadata
  • register_fn(id, codec, closure) — Register a closure as a task
  • register_fn_with_metadata(id, codec, closure, metadata) — Register a closure with metadata
  • register_join(id, codec, closure) — Register a fork/join combiner
  • merge(other) — Merge another registry (parent wins on collision)
  • with_codec(codec) → RegistryBuilder — Start a builder with a shared codec
  • get(id) → Option<UntypedCoreTask> — Look up a task by ID
  • get_metadata(id) → Option<&TaskMetadata> — Get task metadata
  • contains(id) → bool — Check if a task ID is registered
  • len() / is_empty() — Registry size
  • task_ids() → impl Iterator<Item = &str> — Iterate registered IDs

WorkflowBuilder

Fluent builder for constructing workflows programmatically (an alternative to the workflow! macro).

use sayiir_runtime::prelude::*;

let workflow = WorkflowBuilder::new(context)
    .then("task_a", |x: i32| async move { Ok(x * 2) })
    .then("task_b", |x: i32| async move { Ok(x + 1) })
    .build()?;
Composing from #[task] structs (recommended)

then_task::<T>()

Add a #[task] struct to the workflow. The task ID, output type, timeout, and retry policy are all derived from the type — no strings needed.

// Given: #[task] async fn add_ten(input: u32) -> Result<u32, BoxError> { … }
// Given: #[task(timeout = "5s")] async fn double(input: u32) -> Result<u32, BoxError> { … }
let workflow = WorkflowBuilder::new(ctx)
    .with_registry()
    .then_task::<AddTenTask>()
    .then_task::<DoubleTask>()
    .build()?;

then_task_with(task)

Same as then_task, but for tasks with #[inject] dependencies that don't implement Default.

.then_task_with(FetchTask::new(http_client.clone()))

then(id, closure)

Add a sequential task with an inline closure.

.then("double", |x: i32| async move { Ok(x * 2) })

then_registered::<T>(id)

Reference a pre-registered task by its string ID. Use this when task IDs are determined at runtime (e.g. loaded from config or a database).

Prefer .then_task::<T>() when the task is a #[task] struct known at compile time.

.then_registered::<u32>("validate")

branches(closure)

Start parallel branches. When the branches are #[task] structs, use add_task for type safety:

// Using #[task] structs (recommended)
.branches(|b| {
    b.add_task::<SendEmailTask>();
    b.add_task::<UpdateInventoryTask>();
})
.join("finalize", |outputs: BranchOutputs<_>| async move {
    let email = outputs.get::<SendEmailTask>()?;
    let inv = outputs.get::<UpdateInventoryTask>()?;
    Ok(Summary { email, inv })
})

Or with inline closures:

.branches(|b| {
    b.add("email", |input: Order| async move { Ok(send_email(input).await?) });
    b.add("inventory", |input: Order| async move { Ok(update_inventory(input).await?) });
})
.join("finalize", |outputs: BranchOutputs<_>| async move {
    let email: EmailResult = outputs.get("email")?;
    let inv: InventoryResult = outputs.get("inventory")?;
    Ok(Summary { email, inv })
})

join(id, closure)

Join parallel branches with a combining task. Uses BranchOutputs for type-safe named access to each branch result.

.join("combine", |outputs: BranchOutputs<_>| async move {
    let count: u32 = outputs.get("count")?;
    let name: String = outputs.get("name")?;
    Ok(format!("{}: {}", name, count))
})

route(key_fn)

Start conditional branching with static type checking. The key function returns a BranchKey enum variant — the compiler ensures every variant has a matching .branch() call and that no orphan branches exist. Typos and missing cases are caught at compile time, not at runtime. The branch node ID is derived automatically from the enum type name.

#[derive(BranchKey)]
enum Route {
    Billing,
    Tech,
}

let wf = WorkflowBuilder::new(ctx)
    .with_registry()
    .then_task::<ClassifyTask>()
    .route::<_, Route, _, _>(|input: ClassifyOutput| async move {
        Ok(match input.intent {
            Intent::Billing => Route::Billing,
            Intent::Tech => Route::Tech,
        })
    })
    .branch(Route::Billing, |sub| sub.then_task::<HandleBillingTask>())
    .branch(Route::Tech, |sub| sub.then_task::<HandleTechTask>())
    .default_branch(|sub| sub.then_task::<FallbackTask>())
    .done()
    .build()?;

After .done(), the output type is BranchEnvelope<BranchOut> containing branch (the key) and result.

loop_task(id, closure, max_iterations)

Add a loop whose body repeats until it returns LoopResult::Done.

use sayiir_core::LoopResult;

let wf = WorkflowBuilder::new(ctx)
    .then("setup", setup_fn)
    .loop_task("refine", |draft: String| async move {
        let improved = improve(&draft);
        if is_good_enough(&improved) {
            Ok(LoopResult::Done(improved))
        } else {
            Ok(LoopResult::Again(improved))
        }
    }, 5)
    .then("finalize", finalize_fn)
    .build()?;

Parameters:

  • id (&str) — Body task identifier. The loop node gets an auto-generated ID (loop_0, loop_1, …).
  • func — Closure returning Result<LoopResult<NewOutput>, BoxError>
  • max_iterations (u32) — Maximum iterations before failing (must be ≥ 1)

Use loop_task_with_policy for custom MaxIterationsPolicy:

use sayiir_core::MaxIterationsPolicy;
.loop_task_with_policy("poll", poll_fn, 100, MaxIterationsPolicy::ExitWithLast)

LoopResult<T>:

  • LoopResult::Again(value) — Continue iterating with value
  • LoopResult::Done(value) — Exit the loop with value
  • Serde format: {"_loop": "again"|"done", "value": ...}

MaxIterationsPolicy:

  • Fail (default) — Return an error when max iterations is reached
  • ExitWithLast — Exit with the last iteration’s output

delay(id, duration)

Add a durable delay.

use std::time::Duration;
.delay("wait_24h", Duration::from_secs(86400))

wait_for_signal(id, signal_name, timeout)

Wait for an external signal.

.wait_for_signal("approval_step", "manager_approval", Some(Duration::from_secs(3600)))

with_registry() / with_existing_registry(registry)


Configure task registry.

let builder = WorkflowBuilder::new(context)
    .with_existing_registry(registry);

then_flow(child) / then_serializable_flow(child)


Inline a child workflow. The child’s entire continuation tree is embedded as a single step. Use then_flow for non-serializable workflows and then_serializable_flow when both parent and child have registries (merges the child’s task registry into the parent’s).

let child = WorkflowBuilder::new(child_ctx)
    .with_registry()
    .then("double", |x: u32| async move { Ok(x * 2) })
    .build()?;

let parent = WorkflowBuilder::new(parent_ctx)
    .with_registry()
    .then("add_ten", |x: u32| async move { Ok(x + 10) })
    .then_serializable_flow(child)
    .build()?;

In the workflow! macro, use the flow keyword:

let child = workflow! { name: "child", steps: [double] }.unwrap();
let parent = workflow! { name: "parent", steps: [add_ten, flow child] }.unwrap();

build()

Build the workflow.

let workflow = builder.build()?;

CheckpointingRunner

Single-process, durable execution with automatic checkpointing.

use sayiir_runtime::prelude::*;

let backend = InMemoryBackend::new();
let runner = CheckpointingRunner::new(backend);

// Start a new workflow
let status = runner
    .run(workflow, "instance-001", input_data)
    .await?;

// Resume after crash
let status = runner
    .resume(workflow, "instance-001")
    .await?;

// Cancel
runner.cancel("instance-001", Some("user cancelled"), Some("admin")).await?;

// Pause
runner.pause("instance-001", Some("maintenance"), Some("ops")).await?;

// Unpause
runner.unpause("instance-001").await?;

Methods:

  • new(backend) — Create with any PersistentBackend
  • with_conflict_policy(policy) — Set the conflict policy (builder pattern). See ConflictPolicy below.
  • run(workflow, instance_id, input) — Run to completion
  • resume(workflow, instance_id) — Resume from checkpoint
  • cancel(instance_id, reason, by) — Cancel workflow
  • pause(instance_id, reason, by) — Pause workflow
  • unpause(instance_id) — Unpause workflow

ConflictPolicy

Controls what happens when run() is called with an instance_id that already has a snapshot.

use sayiir_core::workflow::ConflictPolicy;

// Default: fail if the instance already exists
let runner = CheckpointingRunner::new(backend.clone());

// Idempotent: return the existing status
let runner = CheckpointingRunner::new(backend.clone())
    .with_conflict_policy(ConflictPolicy::UseExisting);

// Force restart: delete the old snapshot, run fresh
let runner = CheckpointingRunner::new(backend.clone())
    .with_conflict_policy(ConflictPolicy::TerminateExisting);

Variants:

  • Fail (default) — Returns RuntimeError::InstanceAlreadyExists if the instance exists
  • UseExisting — Returns the existing workflow status without re-running
  • TerminateExisting — Deletes the existing snapshot and starts a fresh execution

PooledWorker

Distributed, multi-machine execution with work claiming.

use sayiir_runtime::prelude::*;
use std::time::Duration;

let backend = PostgresBackend::new("postgresql://...").await?;
let registry = TaskRegistry::new();

let worker = PooledWorker::new("worker-1", backend, registry)
    .with_claim_ttl(Some(Duration::from_secs(60)));

let handle = worker.spawn(
    Duration::from_secs(5), // Poll interval
    vec![workflow1, workflow2],
);

// Later: stop the worker
handle.shutdown();
handle.join().await?;

Or use the builder for auto-generated worker IDs:

use std::num::NonZeroUsize;

// Auto-generated worker ID from hostname + PID
let worker = PooledWorker::builder(backend, registry).build();

// Or override with an explicit ID and custom settings
let worker = PooledWorker::builder(backend, registry)
    .worker_id("custom-worker-1")
    .claim_ttl(Some(Duration::from_secs(600)))
    .batch_size(NonZeroUsize::new(4).unwrap())
    .build();

Methods:

  • new(worker_id, backend, registry) — Create worker with explicit ID
  • builder(backend, registry)PooledWorkerBuilder — Create builder with auto-generated ID
  • with_claim_ttl(Option<Duration>) — Set claim timeout
  • with_batch_size(NonZeroUsize) — Set tasks per poll (default: 1)
  • with_aging_interval(Duration) — Set priority aging interval (default: 300s, must be > 0)
  • spawn(poll_interval, workflows)WorkerHandle — Start polling

InProcessRunner

Simple in-memory execution without durability.

use sayiir_runtime::prelude::*;

let runner = InProcessRunner::new();
let result = runner.run(workflow, input_data).await?;

No persistence, no resume. Useful for testing or short-lived workflows.

WorkflowClient

Client for submitting and controlling workflow instances without executing tasks. Use this with PooledWorker for the distributed model.

use sayiir_runtime::WorkflowClient;
use sayiir_core::workflow::ConflictPolicy;

let backend = PostgresBackend::<JsonCodec>::connect(url).await?;
let client = WorkflowClient::new(backend)
    .with_conflict_policy(ConflictPolicy::UseExisting);

// Submit a workflow
let (status, output) = client.submit(&workflow, "order-42", input).await?;

// Lifecycle operations
client.cancel("order-42", Some("Out of stock".into()), None).await?;
client.pause("order-42", Some("Maintenance".into()), None).await?;
client.unpause("order-42").await?;
client.send_event("order-42", "signal_name", payload).await?;
let status = client.status("order-42").await?;

// Retrieve a single task result (requires TaskResultStore)
let result = client.get_task_result("order-42", "validate_order").await?;

// Type-safe variant using a #[task]-generated struct
let result = client.get_task_result_of::<ValidateOrderTask>("order-42").await?;

Methods:

  • new(backend) — Create client (wraps backend in Arc)
  • from_shared(Arc<B>) — Create from shared backend reference
  • with_conflict_policy(ConflictPolicy) — Set duplicate instance policy
  • backend()&Arc<B> — Access the backend
  • submit(workflow, instance_id, input)(WorkflowStatus, Option<Bytes>)
  • cancel(instance_id, reason, cancelled_by) — Store a cancel signal
  • pause(instance_id, reason, paused_by) — Store a pause signal
  • unpause(instance_id) — Unpause a paused workflow
  • send_event(instance_id, signal_name, payload) — Send an external event
  • status(instance_id)WorkflowStatus — Query current status
  • get_task_result(instance_id, task_id)Option<Bytes> — Get a single task result (requires B: TaskResultStore)
  • get_task_result_of::<T>(instance_id)Option<Bytes> — Type-safe variant that derives task ID from a TaskIdentifier impl (e.g. #[task]-generated struct)

WorkflowRunExt

Extension trait that adds a run_once convenience method directly on Workflow. Uses InProcessRunner internally — no backend setup, no instance ID.

use sayiir_runtime::prelude::*;
let status = workflow.run_once(input).await?;

Ideal for scripts, tests, and quick experiments. Included in the prelude.

Codec

Serialization trait for task inputs and outputs.

pub trait Codec: Send + Sync + 'static {
    fn encode<T: Serialize>(&self, value: &T) -> Result<Vec<u8>, SerializationError>;
    fn decode<T: for<'de> Deserialize<'de>>(&self, bytes: &[u8]) -> Result<T, SerializationError>;
}

Built-in implementations:

  • JsonCodec — JSON serialization (feature: json)
  • RkyvCodec — Zero-copy binary serialization (feature: rkyv)

PersistentBackend

Storage trait for workflow state.

pub trait PersistentBackend: Send + Sync + 'static {
    async fn save_snapshot(&self, snapshot: &WorkflowSnapshot) -> Result<(), BackendError>;
    async fn load_snapshot(&self, instance_id: &str) -> Result<Option<WorkflowSnapshot>, BackendError>;
    async fn claim_task(&self, claim: TaskClaim) -> Result<bool, BackendError>;
    async fn release_claim(&self, instance_id: &str, task_id: &str) -> Result<(), BackendError>;
    async fn save_signal(&self, instance_id: &str, signal: SignalEvent) -> Result<(), BackendError>;
    async fn load_signals(&self, instance_id: &str, signal_name: &str) -> Result<Vec<SignalEvent>, BackendError>;
    async fn delete_signal(&self, signal_id: &str) -> Result<(), BackendError>;
}

Implementations:

  • InMemoryBackend — In-memory storage
  • PostgresBackend — PostgreSQL storage

CoreTask

Task execution trait.

#[async_trait]
pub trait CoreTask: Send + Sync + 'static {
    async fn run(&self, input: &[u8], ctx: &WorkflowContext) -> Result<Vec<u8>, BoxError>;
    fn metadata(&self) -> &TaskMetadata;
    fn task_id(&self) -> &str;
}

Usually implemented via the #[task] macro, but can be implemented manually if needed.

TaskIdentifier

Lightweight trait that only provides the task's unique string identifier. Unlike RegisterableTask, it does not require CoreTask — making it usable in client-only contexts that refer to tasks by ID without pulling in the task implementation.

pub trait TaskIdentifier {
    fn task_id() -> &'static str;
}

Automatically implemented for all RegisterableTask types (including #[task]-generated structs). Used by WorkflowClient::get_task_result_of::<T>() for type-safe task result retrieval.
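The mechanism can be sketched in a few lines of plain Rust. ValidateOrderTask here is a hypothetical hand-written stand-in for a #[task]-generated struct, and task_id_of is an illustrative helper showing how a method like get_task_result_of::<T>() can recover the string ID from the type alone:

```rust
pub trait TaskIdentifier {
    fn task_id() -> &'static str;
}

// Hypothetical stand-in for a #[task]-generated struct.
struct ValidateOrderTask;

impl TaskIdentifier for ValidateOrderTask {
    fn task_id() -> &'static str {
        "validate_order"
    }
}

// Only the T: TaskIdentifier bound is needed to recover the ID;
// no CoreTask implementation is required in client-only code.
fn task_id_of<T: TaskIdentifier>() -> &'static str {
    T::task_id()
}

fn main() {
    assert_eq!(task_id_of::<ValidateOrderTask>(), "validate_order");
}
```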

BranchKey

Typed routing key for conditional branching. Instead of passing raw strings as branch keys (which are only validated at runtime), BranchKey enums give you static type checking — the compiler rejects typos, missing branches, and orphan keys at build time.

pub trait BranchKey: Send + Sync + 'static {
    fn as_key(&self) -> &str;
    fn all_keys() -> &'static [&'static str];
    fn from_key(key: &str) -> Option<Self> where Self: Sized;
}

Use #[derive(BranchKey)] on a fieldless enum to auto-implement the trait. Variant names are converted to snake_case keys by default, with #[branch_key("custom")] for overrides:

use sayiir_runtime::prelude::*;

#[derive(BranchKey)]
enum TicketRoute {
    Billing,      // key: "billing"
    TechSupport,  // key: "tech_support"
    #[branch_key("other")]
    Fallback,     // key: "other"
}

The key function returns a BranchKey variant, and .branch() accepts variants — not strings. The compiler ensures every variant is handled, eliminating an entire class of runtime routing errors. The derive macro is re-exported in the prelude when the macros feature is enabled.
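For intuition, here is a hand-written sketch of roughly what the derive produces for the TicketRoute enum above (an illustration of the trait contract, not the macro's actual expansion):

```rust
pub trait BranchKey: Send + Sync + 'static {
    fn as_key(&self) -> &str;
    fn all_keys() -> &'static [&'static str];
    fn from_key(key: &str) -> Option<Self>
    where
        Self: Sized;
}

enum TicketRoute {
    Billing,
    TechSupport,
    Fallback, // carries #[branch_key("other")] in the derive version
}

impl BranchKey for TicketRoute {
    fn as_key(&self) -> &str {
        match self {
            TicketRoute::Billing => "billing",
            TicketRoute::TechSupport => "tech_support", // snake_case by default
            TicketRoute::Fallback => "other",           // explicit override
        }
    }

    fn all_keys() -> &'static [&'static str] {
        &["billing", "tech_support", "other"]
    }

    fn from_key(key: &str) -> Option<Self> {
        match key {
            "billing" => Some(TicketRoute::Billing),
            "tech_support" => Some(TicketRoute::TechSupport),
            "other" => Some(TicketRoute::Fallback),
            _ => None,
        }
    }
}

fn main() {
    assert_eq!(TicketRoute::TechSupport.as_key(), "tech_support");
    assert!(TicketRoute::from_key("nope").is_none());
    assert_eq!(TicketRoute::all_keys().len(), 3);
}
```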

RetryPolicy

use std::time::Duration;
use sayiir_core::task::RetryPolicy;

let policy = RetryPolicy {
    max_retries: 3,
    initial_delay: Duration::from_secs(1),
    backoff_multiplier: 2.0,
};

Delay calculation: delay = initial_delay * multiplier^attempt
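The schedule this formula produces can be checked with a few lines of std-only Rust (retry_delay is an illustrative helper, not a library API):

```rust
use std::time::Duration;

// delay = initial_delay * multiplier^attempt
fn retry_delay(initial: Duration, multiplier: f64, attempt: u32) -> Duration {
    initial.mul_f64(multiplier.powi(attempt as i32))
}

fn main() {
    let initial = Duration::from_secs(1);
    for attempt in 0..4 {
        // attempts 0..4 with multiplier 2.0 give 1s, 2s, 4s, 8s
        println!("attempt {attempt}: {:?}", retry_delay(initial, 2.0, attempt));
    }
}
```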

TaskMetadata

use sayiir_core::task::TaskMetadata;
use std::time::Duration;

let metadata = TaskMetadata {
    display_name: Some("Process Order".to_string()),
    description: Some("Validates and processes customer orders".to_string()),
    timeout: Some(Duration::from_secs(30)),
    retry_policy: Some(policy),
    tags: vec!["orders".to_string(), "payment".to_string()],
};

WorkflowContext

Per-execution context carrying codec and metadata.

use sayiir_core::context::WorkflowContext;
use std::sync::Arc;

let ctx = WorkflowContext::new(
    "order-pipeline",
    Arc::new(JsonCodec),
    Arc::new(()), // User metadata (can be any type)
);

TaskExecutionContext

Read-only context available from within a running task. Provides access to the workflow ID, instance ID, task ID, and task metadata. Set automatically by the runtime before each task executes.

use sayiir_core::context::TaskExecutionContext;
use sayiir_runtime::task_context;

// Access via the task_context! macro (works in both async and sync contexts)
if let Some(ctx) = task_context!() {
    println!("Workflow: {}", ctx.workflow_id);
    println!("Instance: {}", ctx.instance_id);
    println!("Task: {}", ctx.task_id);
    if let Some(timeout) = ctx.metadata.timeout {
        println!("Timeout: {:?}", timeout);
    }
}

Fields:

  • workflow_id (Arc<str>) — The workflow definition identifier
  • instance_id (Arc<str>) — The workflow instance identifier (durable) or workflow ID (in-memory)
  • task_id (Arc<str>) — The current task identifier
  • metadata (TaskMetadata) — Task metadata (timeout, retry policy, version, tags, etc.)
  • workflow_metadata_json (Option<Arc<str>>) — Optional JSON-encoded workflow-level metadata

task_context!

Macro to access the task execution context from within a running task. Returns Option<TaskExecutionContext>.

Tries tokio task-local storage first (async executor paths), then falls back to thread-local storage (sync paths used by Python GIL and Node.js main thread).
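The thread-local half of that fallback can be sketched as follows (an illustrative pattern in plain std Rust, not the library's actual implementation; with_task_context and current_task_id are hypothetical helpers):

```rust
use std::cell::RefCell;

thread_local! {
    // The runtime sets this before running a task on a sync path.
    static CURRENT_TASK_ID: RefCell<Option<String>> = RefCell::new(None);
}

// An executor (or FFI host) installs the context, runs the task body,
// then clears it again.
fn with_task_context<R>(task_id: &str, f: impl FnOnce() -> R) -> R {
    CURRENT_TASK_ID.with(|c| *c.borrow_mut() = Some(task_id.to_string()));
    let out = f();
    CURRENT_TASK_ID.with(|c| *c.borrow_mut() = None);
    out
}

// The task reads the context back, getting None outside any task.
fn current_task_id() -> Option<String> {
    CURRENT_TASK_ID.with(|c| c.borrow().clone())
}

fn main() {
    assert_eq!(current_task_id(), None);
    let seen = with_task_context("charge_card", current_task_id);
    assert_eq!(seen, Some("charge_card".to_string()));
    assert_eq!(current_task_id(), None); // cleared afterwards
}
```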

use sayiir_runtime::task_context;

#[task(id = "process")]
async fn process(input: Data) -> Result<Output, BoxError> {
    let ctx = task_context!();
    if let Some(ctx) = ctx {
        tracing::info!(
            workflow = %ctx.workflow_id,
            instance = %ctx.instance_id,
            task = %ctx.task_id,
            "Processing"
        );
    }
    Ok(do_process(input))
}

The task_context! macro is re-exported from sayiir_runtime (included in the prelude).

Sayiir separates errors into two domains:

Errors from workflow construction and hydration (build-time).

use sayiir_core::error::BuildError;

match builder.build() {
    Err(BuildError::DuplicateTaskId(id)) => eprintln!("Duplicate: {id}"),
    Err(BuildError::TaskNotFound(id)) => eprintln!("Missing task: {id}"),
    Err(BuildError::EmptyBranch) => eprintln!("Branch needs at least one step"),
    Err(BuildError::EmptyFork) => eprintln!("Fork has no branches"),
    Err(BuildError::MissingBranches { branch_id, missing_keys }) => {
        eprintln!("Branch '{branch_id}' is missing handlers for: {missing_keys:?}");
    }
    Err(BuildError::OrphanBranches { branch_id, orphan_keys }) => {
        eprintln!("Branch '{branch_id}' has handlers for unknown keys: {orphan_keys:?}");
    }
    Err(BuildError::DefinitionMismatch { expected, found }) => {
        eprintln!("Hash mismatch: expected {expected}, got {found}");
    }
    Ok(workflow) => { /* ... */ }
}

Errors from workflow execution (runtime).

Variants include TaskNotFound, TaskNotImplemented, DefinitionMismatch, Cancelled, Paused, TaskPanicked, TaskTimedOut, BranchKeyNotFound, Waiting, AwaitingSignal, and others.

RuntimeError

Top-level error from the runtime layer, wrapping all error domains.

use sayiir_runtime::RuntimeError;

match result {
    Err(RuntimeError::Workflow(e)) => eprintln!("Workflow error: {e}"),
    Err(RuntimeError::Build(e)) => eprintln!("Build error: {e}"),
    Err(RuntimeError::Backend(e)) => eprintln!("Storage error: {e}"),
    Err(RuntimeError::Task(e)) => eprintln!("Task error: {e}"),
    Err(RuntimeError::InstanceAlreadyExists(id)) => {
        eprintln!("Instance already exists: {id}");
    }
    _ => {}
}

BoxError

Type alias for Box<dyn std::error::Error + Send + Sync>.

use sayiir_core::error::BoxError;

async fn my_task(input: Data) -> Result<Output, BoxError> {
    // Any error implementing Error + Send + Sync can be returned
    Ok(process(input)?)
}
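The conversion `?` relies on is the standard From impl for boxed errors. A self-contained sketch (PaymentDeclined is a hypothetical error type for illustration):

```rust
use std::fmt;

// Same alias as sayiir_core::error::BoxError.
type BoxError = Box<dyn std::error::Error + Send + Sync>;

// A custom error type; anything implementing Error + Send + Sync qualifies.
#[derive(Debug)]
struct PaymentDeclined;

impl fmt::Display for PaymentDeclined {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "payment declined")
    }
}

impl std::error::Error for PaymentDeclined {}

// The Into/From conversion to BoxError happens automatically,
// which is exactly what `?` does inside task bodies.
fn charge(approved: bool) -> Result<u32, BoxError> {
    if approved {
        Ok(42)
    } else {
        Err(PaymentDeclined.into())
    }
}

fn main() {
    assert_eq!(charge(true).unwrap(), 42);
    assert_eq!(charge(false).unwrap_err().to_string(), "payment declined");
}
```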

For complete API documentation with all types, traits, and methods, see: