Python API Reference

@task(name=None, *, timeout=None, timeout_secs=None, retries=None, tags=None, description=None, priority=None)

Annotate a function as a workflow task.

from sayiir import task

@task
def my_task(input: dict) -> dict:
    return {"result": input["value"] * 2}

@task("custom_name")  # Positional name argument
def my_task(input: dict) -> dict:
    return {"result": input["value"] * 2}

@task(
    name="custom_name",
    timeout="30s",
    retries=3,
    tags=["io", "external"],
    description="Processes data from external API",
)
def my_task(input: dict) -> dict:
    return {"result": input["value"] * 2}

Parameters:

  • name (str, optional) — Override task ID. Can also be passed as first positional argument: @task("task_name")
  • timeout (str | float, optional) — Task timeout as duration string ("30s", "5m", "1h") or seconds
  • timeout_secs (float, optional) — Legacy timeout parameter (use timeout instead)
  • retries (int | RetryPolicy, optional) — Retry count (int shorthand with 1s/2x defaults) or full RetryPolicy
  • tags (list[str], optional) — Categorization tags
  • description (str, optional) — Human-readable description
  • priority (int, optional) — Execution priority (1–5, lower = higher priority, default: 3)

The decorator attaches metadata to the function without wrapping it. The function can still be called directly for testing.
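Because the decorator attaches metadata without wrapping, the pattern can be sketched in plain Python. This is an illustration of the mechanism, not sayiir's implementation; the `_task_meta` attribute is a hypothetical stand-in:

```python
def task(func=None, *, name=None, timeout=None):
    """Attach metadata to a function without wrapping it (illustrative sketch)."""
    def attach(f):
        f._task_meta = {"name": name or f.__name__, "timeout": timeout}
        return f  # the original function is returned unchanged
    if callable(func):            # bare @task usage
        return attach(func)
    if isinstance(func, str):     # positional name: @task("custom_name")
        name = func
    return attach                 # @task(...) usage with arguments

@task(name="double", timeout="30s")
def my_task(input: dict) -> dict:
    return {"result": input["value"] * 2}

# Still a plain function, so it can be called directly in unit tests:
print(my_task({"value": 21}))      # {'result': 42}
print(my_task._task_meta["name"])  # double
```

Because `attach` returns the original function, stack traces and `functools`-style introspection see the undecorated callable.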

get_task_context()

Get the current task execution context from within a running task. Returns None if called outside of a task execution.

from sayiir import get_task_context

@task
def my_task(data: dict) -> dict:
    ctx = get_task_context()
    if ctx is not None:
        print(f"Running task {ctx.task_id} in workflow {ctx.workflow_id}")
        print(f"Instance: {ctx.instance_id}")
    return process(data)

Returns: TaskExecutionContext | None

TaskExecutionContext

Read-only context available from within a running task. Provides access to workflow and task metadata.

Properties:

  • workflow_id (str) — The workflow definition identifier
  • instance_id (str) — The workflow instance identifier (for durable workflows) or workflow ID (for in-memory runs)
  • task_id (str) — The current task identifier
  • metadata (TaskMetadata) — Task metadata (timeout, retries, tags, version, etc.)
  • workflow_metadata (dict | None) — Workflow-level metadata passed via Flow("name", metadata={...})

TaskMetadata

Metadata attached to a task.

Properties:

  • display_name (str | None) — Human-readable name
  • description (str | None) — Task description
  • timeout_secs (float | None) — Task timeout in seconds
  • retries (RetryPolicy | None) — Retry configuration
  • tags (list[str] | None) — Categorization tags
  • version (str | None) — Task version string
  • priority (int | None) — Execution priority (1–5, lower = higher priority)

@task(timeout="30s", retries=3, tags=["io"])
def fetch_data(url: str) -> dict:
    ctx = get_task_context()
    if ctx:
        print(f"Timeout: {ctx.metadata.timeout_secs}s")
        print(f"Tags: {ctx.metadata.tags}")
    return do_fetch(url)

Flow(name, metadata=None)

Create a new workflow builder.

from sayiir import Flow
workflow = Flow("my-workflow")

Methods:

.then(task_func, *, name=None)

Add a sequential task to the pipeline.

workflow = (
    Flow("pipeline")
    .then(fetch_user)
    .then(send_email)
    .build()
)

# With custom name
workflow = (
    Flow("pipeline")
    .then(lambda x: x * 2, name="double")
    .build()
)

Parameters:

  • task_func — Any callable: @task-decorated function, plain function, or lambda
  • name (str, optional) — Override the task ID (useful for lambdas)

Returns: Flow instance for chaining

.delay(name, duration)

Add a durable delay. No workers are held during the delay.

from datetime import timedelta

workflow = (
    Flow("delayed")
    .then(send_email)
    .delay("wait_24h", timedelta(hours=24))
    .then(send_reminder)
    .build()
)

# Also accepts float seconds
workflow.delay("wait_5s", 5.0)

Parameters:

  • name (str) — Unique identifier for this delay step
  • duration (str | float | timedelta) — Delay duration as string ("30s", "5m", "1h"), seconds, or timedelta

Returns: Flow instance for chaining
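Duration strings follow a unit-suffix convention ("30s", "5m", "1h"). A minimal parser sketch of that convention, using a hypothetical helper that is not part of the sayiir API:

```python
def parse_duration(value):
    """Convert '30s' / '5m' / '1h' (or a bare number) to seconds. Illustrative only."""
    if isinstance(value, (int, float)):
        return float(value)
    units = {"s": 1, "m": 60, "h": 3600}
    return float(value[:-1]) * units[value[-1]]

print(parse_duration("30s"))  # 30.0
print(parse_duration("5m"))   # 300.0
print(parse_duration("1h"))   # 3600.0
```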

.wait_for_signal(signal_name, *, name=None, timeout=None)

Wait for an external signal before continuing. The workflow parks and releases the worker until the signal arrives.

workflow = (
    Flow("approval")
    .then(submit_request)
    .wait_for_signal("manager_approval", timeout=timedelta(hours=48))
    .then(process_approved)
    .build()
)

Parameters:

  • signal_name (str) — The named signal to wait for
  • name (str, optional) — Node ID for this step (defaults to signal_name)
  • timeout (str | float | timedelta | None) — Optional timeout as duration string ("30s"), seconds, or timedelta

Returns: Flow instance for chaining

.loop(task_func, *, max_iterations=10, on_max="fail", name=None)

Add a loop whose body repeats until the task returns LoopResult.done().

from sayiir import Flow, LoopResult, task

@task
def refine(draft: str) -> dict:
    improved = improve(draft)
    if is_good_enough(improved):
        return LoopResult.done(improved).to_dict()
    return LoopResult.again(improved).to_dict()

workflow = (
    Flow("iterative")
    .then(initial_attempt)
    .loop(refine, max_iterations=5)
    .then(finalize)
    .build()
)

Parameters:

  • task_func — Callable that returns a LoopResult. Use LoopResult.again(value) to continue or LoopResult.done(value) to exit.
  • max_iterations (int, optional) — Maximum iterations before on_max applies (default: 10, must be ≥ 1)
  • on_max (str, optional) — "fail" (default) raises an error; "exit_with_last" exits with the last iteration’s output
  • name (str, optional) — Override the task ID

Returns: Flow instance for chaining

LoopResult

Control flow type for loop body tasks.

from sayiir import LoopResult
# Continue the loop with a new value
LoopResult.again(new_value)
# Exit the loop with the final value
LoopResult.done(final_value)

Class methods:

  • LoopResult.again(value) — Continue iterating with value
  • LoopResult.done(value) — Exit the loop with value

Properties:

  • .is_again (bool) — True if this is an again result
  • .is_done (bool) — True if this is a done result
  • .value — The wrapped value

Wire format: Serializes as {"_loop": "again"|"done", "value": ...}. Call .to_dict() to get the serialized form.
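The documented API and wire format can be mimicked with a small stand-in class, which is handy for unit-testing loop bodies without the engine. A sketch, not sayiir's implementation:

```python
class LoopResultSketch:
    """Stand-in mirroring the documented LoopResult API (illustrative only)."""
    def __init__(self, kind, value):
        self._kind, self.value = kind, value

    @classmethod
    def again(cls, value):
        """Continue iterating with value."""
        return cls("again", value)

    @classmethod
    def done(cls, value):
        """Exit the loop with value."""
        return cls("done", value)

    @property
    def is_again(self):
        return self._kind == "again"

    @property
    def is_done(self):
        return self._kind == "done"

    def to_dict(self):
        """Serialize to the documented wire format."""
        return {"_loop": self._kind, "value": self.value}

print(LoopResultSketch.again(5).to_dict())  # {'_loop': 'again', 'value': 5}
print(LoopResultSketch.done("x").is_done)   # True
```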

.fork()

Start a fork for parallel execution.

workflow = (
    Flow("parallel")
    .then(prepare_data)
    .fork()
    .branch(process_a)
    .branch(process_b, process_c)  # Multi-step branch
    .join(combine_results)
    .build()
)

Returns: ForkBuilder instance

.then_flow(workflow)

Inline a child workflow. The child’s entire pipeline runs as a single composition step — its output becomes the input to the next step. Task registries are merged automatically.

from sayiir import task, Flow, run_workflow

@task
def double(x: int) -> int:
    return x * 2

@task
def add_ten(x: int) -> int:
    return x + 10

# Child workflow
child = Flow("child").then(double).build()

# Parent inlines the child
parent = (
    Flow("parent")
    .then(add_ten)
    .then_flow(child)
    .build()
)

result = run_workflow(parent, 5)  # (5 + 10) * 2 = 30

Parameters:

  • workflow (Workflow) — A built workflow whose input matches the current output type

Returns: Flow instance with the child’s output type

.route(key_fn, *, keys)

Start conditional branching based on a routing key. The keys parameter declares all valid branch keys upfront — Sayiir validates .branch() calls against this set immediately and checks exhaustiveness at .done().

@task
def classify(ticket: dict) -> str:
    return "billing" if ticket["type"] == "invoice" else "tech"

workflow = (
    Flow("router")
    .route(classify, keys=["billing", "tech"])
    .branch("billing", handle_billing)
    .branch("tech", handle_tech)
    .default_branch(fallback)
    .done()
    .then(send_summary)
    .build()
)

Parameters:

  • key_fn — Callable that returns a string routing key.
  • keys (list[str], keyword-only) — All valid routing keys. .branch() calls validate against this set, and .done() checks exhaustiveness.

Returns: BranchBuilder instance

.build()

Compile the workflow.

workflow = Flow("example").then(my_task).build()

Returns: Workflow instance

BranchBuilder

Builder for conditional branches. Created by calling .route() on a Flow.

.branch(key, *task_funcs, name=None)

Add a named branch with one or more chained tasks. The key is validated against the keys declared in .route() — a ValueError is raised immediately if the key is not in the declared set. The branch runs when the key function returns this key.

.branch("billing", handle_billing)
.branch("express", validate, ship_express, notify)

Parameters:

  • key (str) — The routing key that selects this branch (must be one of the declared keys)
  • *task_funcs — One or more task callables
  • name (str, optional) — Override the task ID of the first task

Returns: BranchBuilder instance for chaining

.default_branch(*task_funcs, name=None)

Set the fallback branch for unmatched keys.

.default_branch(fallback_handler)

Parameters:

  • *task_funcs — One or more task callables
  • name (str, optional) — Override the task ID of the first task

Returns: BranchBuilder instance for chaining

.done()

Finish the branching and return to the Flow builder. Raises ValueError if any declared key is missing a .branch() call, or if any .branch() key was not in the declared keys (orphan branch). The output type becomes a BranchEnvelope dict with branch (the matched key) and result (the branch output).

.route(classify, keys=["a", "b"])
.branch("a", handle_a)
.branch("b", handle_b)
.done()
.then(next_step) # receives {"branch": "a", "result": ...}

Returns: Flow instance for further chaining
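The exhaustiveness rule that .done() enforces can be illustrated in plain Python. A hypothetical sketch of the validation, not sayiir's code:

```python
def check_exhaustive(declared, provided):
    """Raise if declared keys lack branches, or branches use undeclared keys."""
    missing = set(declared) - set(provided)
    orphans = set(provided) - set(declared)
    if missing:
        raise ValueError(f"missing branches for keys: {sorted(missing)}")
    if orphans:
        raise ValueError(f"branches for undeclared keys: {sorted(orphans)}")

check_exhaustive(["billing", "tech"], ["billing", "tech"])  # OK, all keys covered
try:
    check_exhaustive(["billing", "tech"], ["billing"])
except ValueError as e:
    print(e)  # missing branches for keys: ['tech']
```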

ForkBuilder

Builder for parallel workflow branches. Created by calling .fork() on a Flow.

.branch(*task_funcs, name=None)

Add a branch with one or more chained tasks.

# Single-task branch
.branch(step_a)
# Multi-step branch (tasks run sequentially within the branch)
.branch(step_a, step_b, step_c)
# Lambda with custom name
.branch(lambda x: x + 1, name="increment")

Parameters:

  • *task_funcs — One or more task callables
  • name (str, optional) — Override the task ID of the first task in the branch

Returns: ForkBuilder instance for chaining

.join(task_func, *, name=None)

Join branches with a combining task that receives a dict of branch results.

def combine(results: dict) -> dict:
    # results = {"branch_task_id_1": output1, "branch_task_id_2": output2}
    return {"combined": results}

workflow = (
    Flow("parallel")
    .fork()
    .branch(task_a)
    .branch(task_b)
    .join(combine)
    .build()
)

Parameters:

  • task_func — Join callable that receives dict of branch results
  • name (str, optional) — Override the task ID

Returns: Flow instance for further chaining

Workflow

Compiled workflow with task registry. Produced by Flow.build().

workflow = Flow("example").then(my_task).build()
print(workflow.workflow_id) # "example"
print(workflow.definition_hash) # "abc123..."

Properties:

  • workflow_id (str) — The workflow’s unique identifier
  • definition_hash (str) — Hash of the workflow definition

run_workflow(workflow, input_data, *, instance_id=None, backend=None, conflict_policy=None)

Run a workflow to completion. Without instance_id, runs entirely in-memory with no persistence (fastest path for prototyping). With instance_id and backend, runs with full checkpointing — but still returns the output directly. If the workflow doesn’t complete (e.g., parks on a delay or signal), raises WorkflowError.

from sayiir import run_workflow, PostgresBackend

# Prototype — no persistence
result = run_workflow(workflow, 21)

# Production — same function, just add params
backend = PostgresBackend("postgresql://localhost/sayiir")
result = run_workflow(workflow, 21, instance_id="run-001", backend=backend)

# Idempotent — safe to call multiple times
result = run_workflow(
    workflow, 21,
    instance_id="run-001",
    backend=backend,
    conflict_policy="use_existing",
)

Parameters:

  • workflow (Workflow) — The workflow to run
  • input_data (Any) — Input to the first task
  • instance_id (str, optional) — Unique identifier for durable execution
  • backend (InMemoryBackend | PostgresBackend, optional) — Persistence backend (required with instance_id)
  • conflict_policy (str, optional) — What happens when instance_id already exists: "fail" (default), "use_existing", or "terminate_existing"

Returns: The workflow result (Any)

Raises:

  • WorkflowError if the durable workflow did not complete (e.g., it parked on a delay or signal). Use run_durable_workflow() when you need the full WorkflowStatus object.
  • InstanceAlreadyExistsError if conflict_policy is "fail" (default) and the instance already exists.

run_durable_workflow(workflow, instance_id, input_data, backend=None, conflict_policy=None)

Run a workflow with checkpointing and durability. Returns WorkflowStatus.

from sayiir import run_durable_workflow, InMemoryBackend

backend = InMemoryBackend()
workflow = Flow("test").then(double).build()

status = run_durable_workflow(
    workflow,
    "run-001",
    21,
    backend=backend,
)
if status.is_completed():
    print(status.output)  # 42

Parameters:

  • workflow (Workflow) — The workflow to run
  • instance_id (str) — Unique identifier for this execution instance
  • input_data (Any) — Input to the first task
  • backend (InMemoryBackend | PostgresBackend, optional) — Persistence backend (defaults to InMemoryBackend)
  • conflict_policy (str, optional) — What happens when instance_id already exists: "fail" (default), "use_existing", or "terminate_existing"

Returns: WorkflowStatus

resume_workflow(workflow, instance_id, backend)

Resume a durable workflow from a saved checkpoint.

from sayiir import resume_workflow
status = resume_workflow(workflow, "run-001", backend)

Parameters:

  • workflow (Workflow) — The workflow definition
  • instance_id (str) — The instance ID used when the workflow was started
  • backend (InMemoryBackend | PostgresBackend) — Persistence backend

Returns: WorkflowStatus

cancel_workflow(instance_id, backend, reason=None, cancelled_by=None)

Request cancellation of a running durable workflow.

from sayiir import cancel_workflow
cancel_workflow(
    "run-001",
    backend,
    reason="User requested cancellation",
    cancelled_by="admin@example.com",
)

Parameters:

  • instance_id (str) — The instance ID of the workflow to cancel
  • backend (InMemoryBackend | PostgresBackend) — Persistence backend
  • reason (str, optional) — Reason for cancellation
  • cancelled_by (str, optional) — Identifier of who requested cancellation

Returns: None

pause_workflow(instance_id, backend, reason=None, paused_by=None)

Pause a running workflow at the next checkpoint.

from sayiir import pause_workflow
pause_workflow(
    "run-001",
    backend,
    reason="Maintenance window",
    paused_by="ops@example.com",
)

Parameters:

  • instance_id (str) — The instance ID of the workflow to pause
  • backend (InMemoryBackend | PostgresBackend) — Persistence backend
  • reason (str, optional) — Reason for pause
  • paused_by (str, optional) — Identifier of who requested pause

Returns: None

unpause_workflow(instance_id, backend)

Allow a paused workflow to resume.

from sayiir import unpause_workflow
unpause_workflow("run-001", backend)

Parameters:

  • instance_id (str) — The instance ID of the workflow to unpause
  • backend (InMemoryBackend | PostgresBackend) — Persistence backend

Returns: None

send_signal(instance_id, signal_name, payload, backend)

Send an external signal to a workflow waiting on wait_for_signal.

from sayiir import send_signal
send_signal(
    "run-001",
    "manager_approval",
    {"approved": True, "comments": "Looks good"},
    backend,
)

Parameters:

  • instance_id (str) — The instance ID of the target workflow
  • signal_name (str) — The signal name to send
  • payload (Any) — The payload data (will be serialized)
  • backend (InMemoryBackend | PostgresBackend) — Persistence backend

Returns: None

The payload is buffered in FIFO order per (instance_id, signal_name). When the workflow resumes and reaches the matching wait_for_signal node, it consumes the oldest buffered event.
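The per-(instance_id, signal_name) FIFO semantics can be sketched with a dict of deques. This illustrates the documented buffering behavior, not sayiir's storage layer:

```python
from collections import deque

buffers = {}  # (instance_id, signal_name) -> deque of payloads

def buffer_signal(instance_id, signal_name, payload):
    """Append a payload to the buffer for this instance/signal pair."""
    buffers.setdefault((instance_id, signal_name), deque()).append(payload)

def consume_signal(instance_id, signal_name):
    """Return the oldest buffered payload, or None if none is pending."""
    q = buffers.get((instance_id, signal_name))
    return q.popleft() if q else None

buffer_signal("run-001", "manager_approval", {"approved": False})
buffer_signal("run-001", "manager_approval", {"approved": True})
print(consume_signal("run-001", "manager_approval"))  # {'approved': False}
print(consume_signal("run-001", "manager_approval"))  # {'approved': True}
```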

InMemoryBackend()

In-memory storage for development and testing. State is lost when the process stops.

from sayiir import InMemoryBackend
backend = InMemoryBackend()

Fast and requires no external dependencies. Ideal for unit tests and local development.

PostgresBackend(url)

PostgreSQL storage for production. State survives process restarts.

from sayiir import PostgresBackend
backend = PostgresBackend("postgresql://user:pass@localhost:5432/sayiir")

Parameters:

  • url (str) — PostgreSQL connection URL

Requirements:

  • PostgreSQL 13 or higher
  • Auto-migration: tables are created on first connection
  • Recommended: dedicated database for Sayiir

WorkflowStatus

Status object returned by durable execution functions.

status = run_durable_workflow(workflow, "run-001", input_data, backend=backend)
print(status.status)        # "completed", "failed", "cancelled", etc.
print(status.output)        # Final result if completed
print(status.error)         # Error message if failed
print(status.reason)        # Reason if cancelled/paused
print(status.cancelled_by)  # Who cancelled (if applicable)

# Convenience methods
if status.is_completed():
    print(status.output)
elif status.is_failed():
    print(status.error)
elif status.is_cancelled():
    print(f"Cancelled: {status.reason}")
elif status.is_paused():
    print("Workflow is paused")
elif status.is_in_progress():
    print("Still running")

Properties:

  • status (str) — Current status
  • output (Any | None) — Final result if completed
  • error (str | None) — Error message if failed
  • reason (str | None) — Reason for cancellation/pause
  • cancelled_by (str | None) — Identifier of who cancelled

Methods:

  • is_completed() → bool
  • is_failed() → bool
  • is_cancelled() → bool
  • is_in_progress() → bool
  • is_paused() → bool

RetryPolicy(max_retries=2, initial_delay_secs=1.0, backoff_multiplier=2.0)

Configuration for task retry behavior.

from sayiir import RetryPolicy
policy = RetryPolicy(
    max_retries=3,
    initial_delay_secs=1.0,
    backoff_multiplier=2.0,
)

@task(retries=policy)
def flaky_task(data: dict) -> dict:
    # May fail and retry with exponential backoff
    return process(data)

Parameters:

  • max_retries (int) — Maximum number of retries (default: 2)
  • initial_delay_secs (float) — Initial delay between retries (default: 1.0)
  • backoff_multiplier (float) — Exponential backoff multiplier (default: 2.0)

Delay Formula:

delay = initial_delay_secs * (backoff_multiplier ** attempt)

Examples (initial_delay_secs=1.0, backoff_multiplier=2.0; attempt is zero-based in the formula):

  • Retry 1 (attempt 0): 1.0s
  • Retry 2 (attempt 1): 2.0s
  • Retry 3 (attempt 2): 4.0s
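The schedule can be verified with a few lines of plain Python, assuming a zero-based attempt index in the formula:

```python
def retry_delay(attempt, initial_delay_secs=1.0, backoff_multiplier=2.0):
    # attempt is zero-based: the first retry waits initial_delay_secs
    return initial_delay_secs * backoff_multiplier ** attempt

print([retry_delay(a) for a in range(3)])  # [1.0, 2.0, 4.0]
```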

Worker(worker_id, backend, *, poll_interval=5.0, claim_ttl=300.0)

Create a distributed workflow worker that polls the backend for available tasks.

from sayiir import PostgresBackend
from sayiir.worker import Worker
backend = PostgresBackend("postgresql://localhost/sayiir")
worker = Worker("worker-1", backend, poll_interval=2.0, claim_ttl=600.0)

Parameters:

  • worker_id (str) — Unique identifier for this worker instance
  • backend (InMemoryBackend | PostgresBackend) — Shared persistence backend
  • poll_interval (float, optional) — Seconds between polls for available tasks (default: 5.0)
  • claim_ttl (float, optional) — Task claim time-to-live in seconds (default: 300.0)

.start(workflows)

Start the worker and return a handle for lifecycle control. Spawns a background thread that polls for tasks.

workflow = Flow("pipeline").then(my_task).build()
handle = worker.start([workflow])

Parameters:

  • workflows (list[Workflow]) — List of compiled workflows whose tasks this worker can execute

Returns: WorkerHandle

WorkerHandle

Handle for controlling a running worker. Obtained from Worker.start().

.shutdown()

Request a graceful shutdown. Non-blocking — the worker finishes in-flight tasks before stopping.

handle.shutdown()

.join()

Block until the worker has fully stopped. Releases the GIL while waiting.

handle.shutdown()
handle.join() # blocks until shutdown completes

WorkflowClient

Client for submitting and controlling workflow instances. Unlike run_durable_workflow(), the client does not execute tasks — it only creates initial snapshots and stores lifecycle signals. A Worker picks up and executes the work.

WorkflowClient(backend, *, conflict_policy=None)

Create a new workflow client.

from sayiir import WorkflowClient, PostgresBackend
backend = PostgresBackend("postgresql://localhost/sayiir")
client = WorkflowClient(backend, conflict_policy="use_existing")

Parameters:

  • backend (InMemoryBackend | PostgresBackend) — Shared persistence backend
  • conflict_policy (str, optional) — What to do when an instance_id already exists: "fail" (default), "use_existing", or "terminate_existing"

.submit(workflow, instance_id, input_data)

Submit a workflow for execution. Creates an initial snapshot so a Worker can pick it up.

status = client.submit(workflow, "order-42", {"items": [1, 2, 3]})

Returns: WorkflowStatus

.cancel(instance_id, *, reason=None, cancelled_by=None)

Request cancellation of a workflow instance.

client.cancel("order-42", reason="Out of stock", cancelled_by="admin@example.com")

.pause(instance_id, *, reason=None, paused_by=None)

Pause a workflow instance at the next checkpoint.

client.pause("order-42", reason="Maintenance window")

.unpause(instance_id)

Unpause a paused workflow instance.

client.unpause("order-42")

.send_signal(instance_id, signal_name, payload)

Send an external signal to a workflow instance.

client.send_signal("order-42", "manager_approval", {"approved": True})

.status(instance_id)

Get the current status of a workflow instance.

status = client.status("order-42")
if status.is_completed():
    print(status.output)

Returns: WorkflowStatus

.get_task_result(instance_id, task_id)

Get a single task result from a workflow instance.

Returns the decoded task output, or None if the task was never executed. For completed or failed workflows, the result is recovered from the backend’s history or cache.

result = client.get_task_result("order-42", "validate_order")
if result is not None:
    print(result)

Parameters:

  • instance_id (str) — The workflow instance ID
  • task_id (str) — The task ID to retrieve the result for

Returns: Any | None

WorkflowError

Base exception for all Sayiir workflow errors.

from sayiir import WorkflowError

try:
    run_workflow(workflow, input_data)
except WorkflowError as e:
    print(f"Workflow error: {e}")

TaskError

Raised when a task fails during execution.

from sayiir import TaskError

try:
    run_workflow(workflow, input_data)
except TaskError as e:
    print(f"Task failed: {e}")

BackendError

Raised when a persistence backend operation fails.

from sayiir import BackendError

try:
    backend = PostgresBackend("postgresql://invalid:url")
except BackendError as e:
    print(f"Backend error: {e}")

InstanceAlreadyExistsError

Raised when running a durable workflow with an instance_id that already has a snapshot and conflict_policy is "fail" (the default).

from sayiir import InstanceAlreadyExistsError

try:
    run_durable_workflow(workflow, "run-001", input_data, backend=backend)
except InstanceAlreadyExistsError:
    print("Instance already exists — use conflict_policy='use_existing' for idempotent runs")

Pydantic Support

Sayiir supports Pydantic models for automatic validation and serialization:

from pydantic import BaseModel
from sayiir import task

class Order(BaseModel):
    order_id: str
    amount: float

class Receipt(BaseModel):
    receipt_id: str
    status: str

@task
def process_order(order: Order) -> Receipt:
    # Input is automatically validated
    # Output is automatically serialized
    return Receipt(receipt_id="R123", status="paid")

If Pydantic is not installed or annotations are not models, validation/serialization is skipped.

Type Checking

Sayiir ships with inline type stubs and is PEP 561 compliant — editors and type checkers (mypy, pyright) get full autocompletion and type checking out of the box.

Async Tasks

Tasks can be async functions. Sayiir detects coroutines automatically and awaits them:

import aiohttp

@task
async def fetch_data(url: str) -> dict:
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return await resp.json()
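Coroutine auto-detection can be sketched with the standard library. This is an illustration of the technique, not sayiir's runtime:

```python
import asyncio
import inspect

def call_task(func, *args):
    """Run a task, awaiting it if it is a coroutine function (sketch)."""
    if inspect.iscoroutinefunction(func):
        return asyncio.run(func(*args))
    return func(*args)

async def async_double(x: int) -> int:
    return x * 2

def sync_double(x: int) -> int:
    return x * 2

# Both calling conventions go through the same entry point:
print(call_task(async_double, 21))  # 42
print(call_task(sync_double, 21))   # 42
```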