Python API Reference
Decorators
@task

Annotate a function as a workflow task.

```python
from sayiir import task

@task
def my_task(input: dict) -> dict:
    return {"result": input["value"] * 2}

@task("custom_name")  # Positional name argument
def my_task(input: dict) -> dict:
    return {"result": input["value"] * 2}

@task(
    name="custom_name",
    timeout="30s",
    retries=3,
    tags=["io", "external"],
    description="Processes data from external API",
)
def my_task(input: dict) -> dict:
    return {"result": input["value"] * 2}
```

Parameters:

- `name` (str, optional) — Override the task ID. Can also be passed as the first positional argument: `@task("task_name")`
- `timeout` (str | float, optional) — Task timeout as a duration string (`"30s"`, `"5m"`, `"1h"`) or seconds
- `timeout_secs` (float, optional) — Legacy timeout parameter (use `timeout` instead)
- `retries` (int | RetryPolicy, optional) — Retry count (int shorthand with 1s/2x defaults) or a full `RetryPolicy`
- `tags` (list[str], optional) — Categorization tags
- `description` (str, optional) — Human-readable description
- `priority` (int, optional) — Execution priority (1–5, lower = higher priority, default: 3)
The decorator attaches metadata to the function without wrapping it. The function can still be called directly for testing.
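The no-wrapping pattern can be illustrated with a hypothetical stand-in decorator (a sketch, not Sayiir's actual implementation): metadata goes into an attribute and the original function object is returned unchanged, so tests can call it directly.

```python
def task(fn=None, **meta):
    """Hypothetical stand-in for sayiir.task: attach metadata, don't wrap."""
    def attach(f):
        meta.setdefault("name", f.__name__)
        f._task_meta = meta          # metadata lives on an attribute
        return f                     # same function object, no wrapper
    return attach(fn) if fn is not None else attach

@task(name="double", timeout="30s")
def my_task(input: dict) -> dict:
    return {"result": input["value"] * 2}

# Callable directly in unit tests, no runtime needed:
print(my_task({"value": 21}))         # {'result': 42}
print(my_task._task_meta["timeout"])  # 30s
```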
Task Execution Context
get_task_context()

Get the current task execution context from within a running task. Returns None if called outside of a task execution.

```python
from sayiir import get_task_context

@task
def my_task(data: dict) -> dict:
    ctx = get_task_context()
    if ctx is not None:
        print(f"Running task {ctx.task_id} in workflow {ctx.workflow_id}")
        print(f"Instance: {ctx.instance_id}")
    return process(data)
```

Returns: TaskExecutionContext | None
TaskExecutionContext
Read-only context available from within a running task. Provides access to workflow and task metadata.

Properties:

- `workflow_id` (str) — The workflow definition identifier
- `instance_id` (str) — The workflow instance identifier (for durable workflows) or the workflow ID (for in-memory runs)
- `task_id` (str) — The current task identifier
- `metadata` (TaskMetadata) — Task metadata (timeout, retries, tags, version, etc.)
- `workflow_metadata` (dict | None) — Workflow-level metadata passed via `Flow("name", metadata={...})`
TaskMetadata
Metadata attached to a task.

Properties:

- `display_name` (str | None) — Human-readable name
- `description` (str | None) — Task description
- `timeout_secs` (float | None) — Task timeout in seconds
- `retries` (RetryPolicy | None) — Retry configuration
- `tags` (list[str] | None) — Categorization tags
- `version` (str | None) — Task version string
- `priority` (int | None) — Execution priority (1–5, lower = higher priority)

```python
@task(timeout="30s", retries=3, tags=["io"])
def fetch_data(url: str) -> dict:
    ctx = get_task_context()
    if ctx:
        print(f"Timeout: {ctx.metadata.timeout_secs}s")
        print(f"Tags: {ctx.metadata.tags}")
    return do_fetch(url)
```

Flow Builder
Flow(name)

Create a new workflow builder.

```python
from sayiir import Flow

workflow = Flow("my-workflow")
```

Methods:
.then(task_func, *, name=None)
Add a sequential task to the pipeline.

```python
workflow = (
    Flow("pipeline")
    .then(fetch_user)
    .then(send_email)
    .build()
)

# With a custom name
workflow = (
    Flow("pipeline")
    .then(lambda x: x * 2, name="double")
    .build()
)
```

Parameters:

- `task_func` — Any callable: a `@task`-decorated function, plain function, or lambda
- `name` (str, optional) — Override the task ID (useful for lambdas)
Returns: Flow instance for chaining
.delay(name, duration)
Add a durable delay. No workers are held during the delay.

```python
from datetime import timedelta

workflow = (
    Flow("delayed")
    .then(send_email)
    .delay("wait_24h", timedelta(hours=24))
    .then(send_reminder)
    .build()
)

# Also accepts float seconds
workflow.delay("wait_5s", 5.0)
```

Parameters:

- `name` (str) — Unique identifier for this delay step
- `duration` (str | float | timedelta) — Delay duration as a string (`"30s"`, `"5m"`, `"1h"`), seconds, or a timedelta
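Sayiir's own duration parser is not shown in this reference; as a rough mental model for the accepted formats (`"30s"`/`"5m"`/`"1h"` strings, plain seconds, or a timedelta), normalizing to seconds might look like this sketch (`to_seconds` is a hypothetical helper, not part of the API):

```python
from datetime import timedelta

def to_seconds(spec) -> float:
    """Hypothetical helper: normalize the duration formats the API accepts."""
    if isinstance(spec, timedelta):
        return spec.total_seconds()
    if isinstance(spec, (int, float)):
        return float(spec)
    units = {"s": 1, "m": 60, "h": 3600}
    return float(spec[:-1]) * units[spec[-1]]

print(to_seconds("5m"))                 # 300.0
print(to_seconds(timedelta(hours=24)))  # 86400.0
print(to_seconds(5.0))                  # 5.0
```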
Returns: Flow instance for chaining
.wait_for_signal(signal_name, *, name=None, timeout=None)
Wait for an external signal before continuing. The workflow parks and releases the worker until the signal arrives.

```python
workflow = (
    Flow("approval")
    .then(submit_request)
    .wait_for_signal("manager_approval", timeout=timedelta(hours=48))
    .then(process_approved)
    .build()
)
```

Parameters:

- `signal_name` (str) — The named signal to wait for
- `name` (str, optional) — Node ID for this step (defaults to the signal name)
- `timeout` (str | float | timedelta | None) — Optional timeout as a duration string (`"30s"`), seconds, or a timedelta
Returns: Flow instance for chaining
.loop(task_func, *, max_iterations=10, on_max="fail", name=None)

Add a loop whose body repeats until the task returns LoopResult.done().

```python
from sayiir import Flow, LoopResult, task

@task
def refine(draft: str) -> dict:
    improved = improve(draft)
    if is_good_enough(improved):
        return LoopResult.done(improved).to_dict()
    return LoopResult.again(improved).to_dict()

workflow = (
    Flow("iterative")
    .then(initial_attempt)
    .loop(refine, max_iterations=5)
    .then(finalize)
    .build()
)
```

Parameters:

- `task_func` — Callable that returns a `LoopResult`. Use `LoopResult.again(value)` to continue or `LoopResult.done(value)` to exit.
- `max_iterations` (int, optional) — Maximum iterations before `on_max` applies (default: 10, must be ≥ 1)
- `on_max` (str, optional) — `"fail"` (default) raises an error; `"exit_with_last"` exits with the last iteration's output
- `name` (str, optional) — Override the task ID
Returns: Flow instance for chaining
LoopResult
Control flow type for loop body tasks.

```python
from sayiir import LoopResult

# Continue the loop with a new value
LoopResult.again(new_value)

# Exit the loop with the final value
LoopResult.done(final_value)
```

Class methods:

- `LoopResult.again(value)` — Continue iterating with `value`
- `LoopResult.done(value)` — Exit the loop with `value`

Properties:

- `.is_again` (bool) — True if this is an `again` result
- `.is_done` (bool) — True if this is a `done` result
- `.value` — The wrapped value

Wire format: Serializes as `{"_loop": "again"|"done", "value": ...}`. Call `.to_dict()` to get the serialized form.
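For intuition, a minimal stand-in with the same surface (again/done constructors, is_again/is_done, to_dict) behaves as follows. This is a sketch of the documented contract, not Sayiir's class:

```python
class LoopResultSketch:
    """Minimal stand-in mirroring the documented LoopResult surface."""
    def __init__(self, kind, value):
        self._kind, self.value = kind, value

    @classmethod
    def again(cls, value):
        return cls("again", value)

    @classmethod
    def done(cls, value):
        return cls("done", value)

    @property
    def is_again(self):
        return self._kind == "again"

    @property
    def is_done(self):
        return self._kind == "done"

    def to_dict(self):
        # The documented wire format
        return {"_loop": self._kind, "value": self.value}

r = LoopResultSketch.again("draft-2")
print(r.is_again, r.to_dict())  # True {'_loop': 'again', 'value': 'draft-2'}
```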
.fork()
Start a fork for parallel execution.

```python
workflow = (
    Flow("parallel")
    .then(prepare_data)
    .fork()
    .branch(process_a)
    .branch(process_b, process_c)  # Multi-step branch
    .join(combine_results)
    .build()
)
```

Returns: ForkBuilder instance
.then_flow(workflow)
Inline a child workflow. The child's entire pipeline runs as a single composition step — its output becomes the input to the next step. Task registries are merged automatically.

```python
from sayiir import task, Flow, run_workflow

@task
def double(x: int) -> int:
    return x * 2

@task
def add_ten(x: int) -> int:
    return x + 10

# Child workflow
child = Flow("child").then(double).build()

# Parent inlines the child
parent = (
    Flow("parent")
    .then(add_ten)
    .then_flow(child)
    .build()
)

result = run_workflow(parent, 5)  # (5 + 10) * 2 = 30
```

Parameters:

- `workflow` (Workflow) — A built workflow whose input matches the current output type
Returns: Flow instance with the child’s output type
.route(key_fn, *, keys)
Start conditional branching based on a routing key. The keys parameter declares all valid branch keys upfront — Sayiir validates .branch() calls against this set immediately and checks exhaustiveness at .done().

```python
@task
def classify(ticket: dict) -> str:
    return "billing" if ticket["type"] == "invoice" else "tech"

workflow = (
    Flow("router")
    .route(classify, keys=["billing", "tech"])
    .branch("billing", handle_billing)
    .branch("tech", handle_tech)
    .default_branch(fallback)
    .done()
    .then(send_summary)
    .build()
)
```

Parameters:

- `key_fn` — Callable that returns a string routing key
- `keys` (list[str], keyword-only) — All valid routing keys. `.branch()` calls validate against this set, and `.done()` checks exhaustiveness.
Returns: BranchBuilder instance
.build()
Compile the workflow.

```python
workflow = Flow("example").then(my_task).build()
```

Returns: Workflow instance
BranchBuilder
Builder for conditional branches. Created by calling .route() on a Flow.
.branch(key, *task_funcs, name=None)
Add a named branch with one or more chained tasks. The key is validated against the keys declared in .route() — a ValueError is raised immediately if the key is not in the declared set. The branch runs when the key function returns this key.

```python
.branch("billing", handle_billing)
.branch("express", validate, ship_express, notify)
```

Parameters:

- `key` (str) — The routing key that selects this branch (must be one of the declared keys)
- `*task_funcs` — One or more task callables
- `name` (str, optional) — Override the task ID of the first task
Returns: BranchBuilder instance for chaining
.default_branch(*task_funcs, name=None)
Set the fallback branch for unmatched keys.

```python
.default_branch(fallback_handler)
```

Parameters:

- `*task_funcs` — One or more task callables
- `name` (str, optional) — Override the task ID of the first task
Returns: BranchBuilder instance for chaining
.done()
Finish the branching and return to the Flow builder. Raises ValueError if any declared key is missing a .branch() call, or if any .branch() key was not in the declared keys (orphan branch). The output type becomes a BranchEnvelope dict with branch (the matched key) and result (the branch output).

```python
.route(classify, keys=["a", "b"])
    .branch("a", handle_a)
    .branch("b", handle_b)
.done()
.then(next_step)  # receives {"branch": "a", "result": ...}
```

Returns: Flow instance for further chaining
ForkBuilder
Builder for parallel workflow branches. Created by calling .fork() on a Flow.
.branch(*task_funcs, name=None)
Add a branch with one or more chained tasks.

```python
# Single-task branch
.branch(step_a)

# Multi-step branch (tasks run sequentially within the branch)
.branch(step_a, step_b, step_c)

# Lambda with custom name
.branch(lambda x: x + 1, name="increment")
```

Parameters:

- `*task_funcs` — One or more task callables
- `name` (str, optional) — Override the task ID of the first task in the branch
Returns: ForkBuilder instance for chaining
.join(task_func, *, name=None)
Join branches with a combining task that receives a dict of branch results.

```python
def combine(results: dict) -> dict:
    # results = {"branch_task_id_1": output1, "branch_task_id_2": output2}
    return {"combined": results}

workflow = (
    Flow("parallel")
    .fork()
    .branch(task_a)
    .branch(task_b)
    .join(combine)
    .build()
)
```

Parameters:

- `task_func` — Join callable that receives a dict of branch results
- `name` (str, optional) — Override the task ID
Returns: Flow instance for further chaining
Workflow
Compiled workflow with task registry. Produced by Flow.build().

```python
workflow = Flow("example").then(my_task).build()

print(workflow.workflow_id)      # "example"
print(workflow.definition_hash)  # "abc123..."
```

Properties:

- `workflow_id` (str) — The workflow's unique identifier
- `definition_hash` (str) — Hash of the workflow definition
Execution Functions
run_workflow(workflow, input_data, *, instance_id=None, backend=None, conflict_policy=None)

Run a workflow to completion. Without instance_id, runs entirely in-memory with no persistence (the fastest path for prototyping). With instance_id and backend, runs with full checkpointing — but still returns the output directly. If the workflow doesn't complete (e.g., parks on a delay or signal), raises WorkflowError.

```python
from sayiir import run_workflow, PostgresBackend

# Prototype — no persistence
result = run_workflow(workflow, 21)

# Production — same function, just add params
backend = PostgresBackend("postgresql://localhost/sayiir")
result = run_workflow(workflow, 21, instance_id="run-001", backend=backend)

# Idempotent — safe to call multiple times
result = run_workflow(workflow, 21, instance_id="run-001", backend=backend, conflict_policy="use_existing")
```

Parameters:

- `workflow` (Workflow) — The workflow to run
- `input_data` (Any) — Input to the first task
- `instance_id` (str, optional) — Unique identifier for durable execution
- `backend` (InMemoryBackend | PostgresBackend, optional) — Persistence backend (required with `instance_id`)
- `conflict_policy` (str, optional) — What happens when `instance_id` already exists: `"fail"` (default), `"use_existing"`, or `"terminate_existing"`
Returns: The workflow result (Any)
Raises:
- `WorkflowError` if a durable workflow did not complete. Use `run_durable_workflow()` when you need the full `WorkflowStatus` object.
- `InstanceAlreadyExistsError` if `conflict_policy` is `"fail"` (default) and the instance already exists.
run_durable_workflow(workflow, instance_id, input_data, backend=None, conflict_policy=None)
Run a workflow with checkpointing and durability. Returns WorkflowStatus.

```python
from sayiir import run_durable_workflow, InMemoryBackend

backend = InMemoryBackend()
workflow = Flow("test").then(double).build()

status = run_durable_workflow(
    workflow,
    "run-001",
    21,
    backend=backend,
)

if status.is_completed():
    print(status.output)  # 42
```

Parameters:

- `workflow` (Workflow) — The workflow to run
- `instance_id` (str) — Unique identifier for this execution instance
- `input_data` (Any) — Input to the first task
- `backend` (InMemoryBackend | PostgresBackend, optional) — Persistence backend (defaults to InMemoryBackend)
- `conflict_policy` (str, optional) — What happens when `instance_id` already exists: `"fail"` (default), `"use_existing"`, or `"terminate_existing"`
Returns: WorkflowStatus
resume_workflow(workflow, instance_id, backend)
Resume a durable workflow from a saved checkpoint.

```python
from sayiir import resume_workflow

status = resume_workflow(workflow, "run-001", backend)
```

Parameters:

- `workflow` (Workflow) — The workflow definition
- `instance_id` (str) — The instance ID used when the workflow was started
- `backend` (InMemoryBackend | PostgresBackend) — Persistence backend
Returns: WorkflowStatus
cancel_workflow(instance_id, backend, reason=None, cancelled_by=None)
Request cancellation of a running durable workflow.

```python
from sayiir import cancel_workflow

cancel_workflow(
    "run-001",
    backend,
    reason="User requested cancellation",
    cancelled_by="admin@example.com",
)
```

Parameters:

- `instance_id` (str) — The instance ID of the workflow to cancel
- `backend` (InMemoryBackend | PostgresBackend) — Persistence backend
- `reason` (str, optional) — Reason for cancellation
- `cancelled_by` (str, optional) — Identifier of who requested cancellation
Returns: None
pause_workflow(instance_id, backend, reason=None, paused_by=None)
Pause a running workflow at the next checkpoint.

```python
from sayiir import pause_workflow

pause_workflow(
    "run-001",
    backend,
    reason="Maintenance window",
    paused_by="ops@example.com",
)
```

Parameters:

- `instance_id` (str) — The instance ID of the workflow to pause
- `backend` (InMemoryBackend | PostgresBackend) — Persistence backend
- `reason` (str, optional) — Reason for the pause
- `paused_by` (str, optional) — Identifier of who requested the pause
Returns: None
unpause_workflow(instance_id, backend)
Allow a paused workflow to resume.

```python
from sayiir import unpause_workflow

unpause_workflow("run-001", backend)
```

Parameters:

- `instance_id` (str) — The instance ID of the workflow to unpause
- `backend` (InMemoryBackend | PostgresBackend) — Persistence backend
Returns: None
send_signal(instance_id, signal_name, payload, backend)
Send an external signal to a workflow waiting on wait_for_signal.

```python
from sayiir import send_signal

send_signal(
    "run-001",
    "manager_approval",
    {"approved": True, "comments": "Looks good"},
    backend,
)
```

Parameters:

- `instance_id` (str) — The instance ID of the target workflow
- `signal_name` (str) — The signal name to send
- `payload` (Any) — The payload data (will be serialized)
- `backend` (InMemoryBackend | PostgresBackend) — Persistence backend
Returns: None
The payload is buffered in FIFO order per (instance_id, signal_name). When the workflow resumes and reaches the matching wait_for_signal node, it consumes the oldest buffered event.
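The buffering semantics can be pictured as a per-key queue. The sketch below models the documented behavior only; it is not Sayiir's storage layer:

```python
from collections import defaultdict, deque

# One FIFO buffer per (instance_id, signal_name), as documented.
buffers = defaultdict(deque)

def send(instance_id, signal_name, payload):
    buffers[(instance_id, signal_name)].append(payload)

def consume(instance_id, signal_name):
    # The workflow consumes the oldest buffered event first.
    return buffers[(instance_id, signal_name)].popleft()

send("run-001", "approval", {"approved": False})
send("run-001", "approval", {"approved": True})
print(consume("run-001", "approval"))  # {'approved': False}  (oldest first)
```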
Backends
InMemoryBackend()

In-memory storage for development and testing. State is lost when the process stops.

```python
from sayiir import InMemoryBackend

backend = InMemoryBackend()
```

Fast and requires no external dependencies. Ideal for unit tests and local development.
PostgresBackend(url)
PostgreSQL storage for production. State survives process restarts.

```python
from sayiir import PostgresBackend

backend = PostgresBackend("postgresql://user:pass@localhost:5432/sayiir")
```

Parameters:

- `url` (str) — PostgreSQL connection URL
Requirements:
- PostgreSQL 13 or higher
- Auto-migration: tables are created on first connection
- Recommended: dedicated database for Sayiir
WorkflowStatus
Status object returned by durable execution functions.

```python
status = run_durable_workflow(workflow, "run-001", input_data, backend)

print(status.status)        # "completed", "failed", "cancelled", etc.
print(status.output)        # Final result if completed
print(status.error)         # Error message if failed
print(status.reason)        # Reason if cancelled/paused
print(status.cancelled_by)  # Who cancelled (if applicable)

# Convenience methods
if status.is_completed():
    print(status.output)
elif status.is_failed():
    print(status.error)
elif status.is_cancelled():
    print(f"Cancelled: {status.reason}")
elif status.is_paused():
    print("Workflow is paused")
elif status.is_in_progress():
    print("Still running")
```

Properties:

- `status` (str) — Current status
- `output` (Any | None) — Final result if completed
- `error` (str | None) — Error message if failed
- `reason` (str | None) — Reason for cancellation/pause
- `cancelled_by` (str | None) — Identifier of who cancelled
Methods:
- `is_completed()` → bool
- `is_failed()` → bool
- `is_cancelled()` → bool
- `is_in_progress()` → bool
- `is_paused()` → bool
RetryPolicy
Configuration for task retry behavior.

```python
from sayiir import RetryPolicy

policy = RetryPolicy(
    max_retries=3,
    initial_delay_secs=1.0,
    backoff_multiplier=2.0,
)

@task(retries=policy)
def flaky_task(data: dict) -> dict:
    # May fail and retry with exponential backoff
    return process(data)
```

Parameters:

- `max_retries` (int) — Maximum number of retries (default: 2)
- `initial_delay_secs` (float) — Initial delay between retries (default: 1.0)
- `backoff_multiplier` (float) — Exponential backoff multiplier (default: 2.0)
Delay Formula:
delay = initial_delay_secs * (backoff_multiplier ** attempt)

Examples:
- Attempt 1: 1.0s
- Attempt 2: 2.0s
- Attempt 3: 4.0s
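These delays follow directly from the formula (attempt counted from zero); a few lines of Python reproduce the schedule:

```python
def backoff_schedule(max_retries: int,
                     initial_delay_secs: float = 1.0,
                     backoff_multiplier: float = 2.0) -> list[float]:
    """Delay before each retry attempt, per the documented formula."""
    return [initial_delay_secs * backoff_multiplier ** attempt
            for attempt in range(max_retries)]

print(backoff_schedule(3))  # [1.0, 2.0, 4.0]
```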
Distributed Worker
Worker(worker_id, backend, *, poll_interval=5.0, claim_ttl=300.0)

Create a distributed workflow worker that polls the backend for available tasks.

```python
from sayiir import PostgresBackend
from sayiir.worker import Worker

backend = PostgresBackend("postgresql://localhost/sayiir")
worker = Worker("worker-1", backend, poll_interval=2.0, claim_ttl=600.0)
```

Parameters:

- `worker_id` (str) — Unique identifier for this worker instance
- `backend` (InMemoryBackend | PostgresBackend) — Shared persistence backend
- `poll_interval` (float, optional) — Seconds between polls for available tasks (default: 5.0)
- `claim_ttl` (float, optional) — Task claim time-to-live in seconds (default: 300.0)
.start(workflows)
Start the worker and return a handle for lifecycle control. Spawns a background thread that polls for tasks.

```python
workflow = Flow("pipeline").then(my_task).build()
handle = worker.start([workflow])
```

Parameters:

- `workflows` (list[Workflow]) — List of compiled workflows whose tasks this worker can execute
Returns: WorkerHandle
WorkerHandle
Handle for controlling a running worker. Obtained from Worker.start().
.shutdown()
Request a graceful shutdown. Non-blocking — the worker finishes in-flight tasks before stopping.

```python
handle.shutdown()
```

.join()

Block until the worker has fully stopped. Releases the GIL while waiting.

```python
handle.shutdown()
handle.join()  # blocks until shutdown completes
```

WorkflowClient
Client for submitting and controlling workflow instances. Unlike run_durable_workflow(), the client does not execute tasks — it only creates initial snapshots and stores lifecycle signals. A Worker picks up and executes the work.
WorkflowClient(backend, *, conflict_policy=None)
Create a new workflow client.

```python
from sayiir import WorkflowClient, PostgresBackend

backend = PostgresBackend("postgresql://localhost/sayiir")
client = WorkflowClient(backend, conflict_policy="use_existing")
```

Parameters:

- `backend` (InMemoryBackend | PostgresBackend) — Shared persistence backend
- `conflict_policy` (str, optional) — What to do when an `instance_id` already exists: `"fail"` (default), `"use_existing"`, or `"terminate_existing"`
.submit(workflow, instance_id, input)
Submit a workflow for execution. Creates an initial snapshot so a Worker can pick it up.

```python
status = client.submit(workflow, "order-42", {"items": [1, 2, 3]})
```

Returns: WorkflowStatus
.cancel(instance_id, *, reason=None, cancelled_by=None)
Request cancellation of a workflow instance.

```python
client.cancel("order-42", reason="Out of stock", cancelled_by="admin@example.com")
```

.pause(instance_id, *, reason=None, paused_by=None)
Pause a workflow instance at the next checkpoint.

```python
client.pause("order-42", reason="Maintenance window")
```

.unpause(instance_id)
Unpause a paused workflow instance.

```python
client.unpause("order-42")
```

.send_signal(instance_id, signal_name, payload)
Send an external signal to a workflow instance.

```python
client.send_signal("order-42", "manager_approval", {"approved": True})
```

.status(instance_id)
Get the current status of a workflow instance.

```python
status = client.status("order-42")
if status.is_completed():
    print(status.output)
```

Returns: WorkflowStatus
.get_task_result(instance_id, task_id)
Get a single task result from a workflow instance.
Returns the decoded task output, or None if the task was never executed. For completed or failed workflows, the result is recovered from the backend’s history or cache.
```python
result = client.get_task_result("order-42", "validate_order")
if result is not None:
    print(result)
```

Parameters:

- `instance_id` (str) — The workflow instance ID
- `task_id` (str) — The task ID to retrieve the result for
Returns: Any | None
Exceptions
WorkflowError

Base exception for all Sayiir workflow errors.

```python
from sayiir import WorkflowError

try:
    run_workflow(workflow, input_data)
except WorkflowError as e:
    print(f"Workflow error: {e}")
```

TaskError
Section titled “TaskError”Raised when a task fails during execution.
from sayiir import TaskError
try: run_workflow(workflow, input_data)except TaskError as e: print(f"Task failed: {e}")BackendError
Section titled “BackendError”Raised when a persistence backend operation fails.
from sayiir import BackendError
try: backend = PostgresBackend("postgresql://invalid:url")except BackendError as e: print(f"Backend error: {e}")InstanceAlreadyExistsError
Raised when running a durable workflow with an instance_id that already has a snapshot and conflict_policy is "fail" (the default).

```python
from sayiir import InstanceAlreadyExistsError

try:
    run_durable_workflow(workflow, "run-001", input_data, backend=backend)
except InstanceAlreadyExistsError:
    print("Instance already exists — use conflict_policy='use_existing' for idempotent runs")
```

Type Annotations
Sayiir supports Pydantic models for automatic validation and serialization:

```python
from pydantic import BaseModel
from sayiir import task

class Order(BaseModel):
    order_id: str
    amount: float

class Receipt(BaseModel):
    receipt_id: str
    status: str

@task
def process_order(order: Order) -> Receipt:
    # Input is automatically validated
    # Output is automatically serialized
    return Receipt(receipt_id="R123", status="paid")
```

If Pydantic is not installed or the annotations are not models, validation/serialization is skipped.
Sayiir ships with inline type stubs and is PEP 561 compliant — editors and type checkers (mypy, pyright) get full autocompletion and type checking out of the box.
Async tasks
Tasks can be async functions. Sayiir detects coroutines automatically and awaits them:

```python
import aiohttp

from sayiir import task

@task
async def fetch_data(url: str) -> dict:
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return await resp.json()
```