Python API Reference
Decorators
@task

Annotate a function as a workflow task.
```python
from sayiir import task

@task
def my_task(input: dict) -> dict:
    return {"result": input["value"] * 2}
```

```python
from sayiir import RetryPolicy, task

@task(
    name="custom_name",
    timeout_secs=30,
    retries=RetryPolicy(max_retries=3),
    tags=["io", "external"],
    description="Processes data from external API",
)
def my_task(input: dict) -> dict:
    return {"result": input["value"] * 2}
```

Parameters:
- `name` (str, optional) — Override task ID (default: function name)
- `timeout_secs` (float, optional) — Task timeout in seconds
- `retries` (RetryPolicy, optional) — Retry configuration
- `tags` (list[str], optional) — Categorization tags
- `description` (str, optional) — Human-readable description
The decorator attaches metadata to the function without wrapping it. The function can still be called directly for testing.
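Because nothing is wrapped, the decorated function can be called like any plain Python function in a unit test. A minimal sketch using the `my_task` defined above:

```python
def test_my_task():
    # No workflow engine involved: the @task decorator only attaches
    # metadata, so the function behaves like plain Python.
    assert my_task({"value": 21}) == {"result": 42}
```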
Flow Builder
Flow(name)
Create a new workflow builder.
```python
from sayiir import Flow

workflow = Flow("my-workflow")
```

Methods:
.then(task_func, *, name=None)
Add a sequential task to the pipeline.
```python
workflow = (
    Flow("pipeline")
    .then(fetch_user)
    .then(send_email)
    .build()
)

# With custom name
workflow = (
    Flow("pipeline")
    .then(lambda x: x * 2, name="double")
    .build()
)
```

Parameters:
- `task_func` — Any callable: a `@task`-decorated function, plain function, or lambda
- `name` (str, optional) — Override the task ID (useful for lambdas)
Returns: Flow instance for chaining
.delay(name, duration)
Add a durable delay. No workers are held during the delay.
```python
from datetime import timedelta

workflow = (
    Flow("delayed")
    .then(send_email)
    .delay("wait_24h", timedelta(hours=24))
    .then(send_reminder)
    .build()
)

# Also accepts float seconds
Flow("delayed").delay("wait_5s", 5.0)
```

Parameters:
- `name` (str) — Unique identifier for this delay step
- `duration` (float | timedelta) — Delay duration in seconds or as a timedelta
Returns: Flow instance for chaining
.wait_for_signal(signal_name, *, name=None, timeout=None)
Wait for an external signal before continuing. The workflow parks and releases the worker until the signal arrives.
```python
from datetime import timedelta

workflow = (
    Flow("approval")
    .then(submit_request)
    .wait_for_signal("manager_approval", timeout=timedelta(hours=48))
    .then(process_approved)
    .build()
)
```

Parameters:
- `signal_name` (str) — The named signal to wait for
- `name` (str, optional) — Node ID for this step (defaults to signal_name)
- `timeout` (float | timedelta | None) — Optional timeout
Returns: Flow instance for chaining
.fork()
Start a fork for parallel execution.
```python
workflow = (
    Flow("parallel")
    .then(prepare_data)
    .fork()
    .branch(process_a)
    .branch(process_b, process_c)  # Multi-step branch
    .join(combine_results)
    .build()
)
```

Returns: ForkBuilder instance
.build()
Compile the workflow.
```python
workflow = Flow("example").then(my_task).build()
```

Returns: Workflow instance
ForkBuilder
Builder for parallel workflow branches. Created by calling .fork() on a Flow.
.branch(*task_funcs, name=None)
Add a branch with one or more chained tasks.
```python
# Single-task branch
.branch(step_a)

# Multi-step branch (tasks run sequentially within the branch)
.branch(step_a, step_b, step_c)

# Lambda with custom name
.branch(lambda x: x + 1, name="increment")
```

Parameters:
- `*task_funcs` — One or more task callables
- `name` (str, optional) — Override the task ID of the first task in the branch
Returns: ForkBuilder instance for chaining
.join(task_func, *, name=None)
Join branches with a combining task that receives a dict of branch results.
```python
def combine(results: dict) -> dict:
    # results = {"branch_task_id_1": output1, "branch_task_id_2": output2}
    return {"combined": results}

workflow = (
    Flow("parallel")
    .fork()
    .branch(task_a)
    .branch(task_b)
    .join(combine)
    .build()
)
```

Parameters:
- `task_func` — Join callable that receives a dict of branch results
- `name` (str, optional) — Override the task ID
Returns: Flow instance for further chaining
Workflow
Compiled workflow with task registry. Produced by Flow.build().
```python
workflow = Flow("example").then(my_task).build()

print(workflow.workflow_id)      # "example"
print(workflow.definition_hash)  # "abc123..."
```

Properties:
- `workflow_id` (str) — The workflow’s unique identifier
- `definition_hash` (str) — Hash of the workflow definition
Execution Functions
run_workflow(workflow, input_data)
Run a workflow to completion without persistence. Returns the result directly.
```python
from sayiir import run_workflow

# `double` is any @task that multiplies its input by two
workflow = Flow("test").then(double).build()
result = run_workflow(workflow, 21)
print(result)  # 42
```

Parameters:
- `workflow` (Workflow) — The workflow to run
- `input_data` (Any) — Input to the first task
Returns: The workflow result (Any)
run_durable_workflow(workflow, instance_id, input_data, backend=None)
Run a workflow with checkpointing and durability. Returns WorkflowStatus.
```python
from sayiir import run_durable_workflow, InMemoryBackend

backend = InMemoryBackend()
workflow = Flow("test").then(double).build()

status = run_durable_workflow(
    workflow,
    "run-001",
    21,
    backend=backend,
)

if status.is_completed():
    print(status.output)  # 42
```

Parameters:
- `workflow` (Workflow) — The workflow to run
- `instance_id` (str) — Unique identifier for this execution instance
- `input_data` (Any) — Input to the first task
- `backend` (InMemoryBackend | PostgresBackend, optional) — Persistence backend (defaults to InMemoryBackend)
Returns: WorkflowStatus
resume_workflow(workflow, instance_id, backend)
Resume a durable workflow from a saved checkpoint.
```python
from sayiir import resume_workflow

status = resume_workflow(workflow, "run-001", backend)
```

Parameters:
- `workflow` (Workflow) — The workflow definition
- `instance_id` (str) — The instance ID used when the workflow was started
- `backend` (InMemoryBackend | PostgresBackend) — Persistence backend
Returns: WorkflowStatus
cancel_workflow(instance_id, backend, reason=None, cancelled_by=None)
Request cancellation of a running durable workflow.
```python
from sayiir import cancel_workflow

cancel_workflow(
    "run-001",
    backend,
    reason="User requested cancellation",
    cancelled_by="admin@example.com",
)
```

Parameters:
- `instance_id` (str) — The instance ID of the workflow to cancel
- `backend` (InMemoryBackend | PostgresBackend) — Persistence backend
- `reason` (str, optional) — Reason for cancellation
- `cancelled_by` (str, optional) — Identifier of who requested cancellation
Returns: None
pause_workflow(instance_id, backend, reason=None, paused_by=None)
Pause a running workflow at the next checkpoint.
```python
from sayiir import pause_workflow

pause_workflow(
    "run-001",
    backend,
    reason="Maintenance window",
    paused_by="ops@example.com",
)
```

Parameters:
- `instance_id` (str) — The instance ID of the workflow to pause
- `backend` (InMemoryBackend | PostgresBackend) — Persistence backend
- `reason` (str, optional) — Reason for pause
- `paused_by` (str, optional) — Identifier of who requested pause
Returns: None
unpause_workflow(instance_id, backend)
Allow a paused workflow to resume.
```python
from sayiir import unpause_workflow

unpause_workflow("run-001", backend)
```

Parameters:
- `instance_id` (str) — The instance ID of the workflow to unpause
- `backend` (InMemoryBackend | PostgresBackend) — Persistence backend
Returns: None
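A typical maintenance flow might look like the sketch below. The explicit `resume_workflow` call after unpausing is an assumption; depending on how instances are driven in your deployment, a paused run may be picked up differently.

```python
from sayiir import pause_workflow, unpause_workflow, resume_workflow

# Pause at the next checkpoint for a maintenance window
pause_workflow("run-001", backend, reason="Maintenance window", paused_by="ops@example.com")

# ... maintenance happens ...

# Mark the instance runnable again, then drive it forward
unpause_workflow("run-001", backend)
status = resume_workflow(workflow, "run-001", backend)  # assumption: explicit resume
```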
send_signal(instance_id, signal_name, payload, backend)
Send an external signal to a workflow waiting on wait_for_signal.
```python
from sayiir import send_signal

send_signal(
    "run-001",
    "manager_approval",
    {"approved": True, "comments": "Looks good"},
    backend,
)
```

Parameters:
- `instance_id` (str) — The instance ID of the target workflow
- `signal_name` (str) — The signal name to send
- `payload` (Any) — The payload data (will be serialized)
- `backend` (InMemoryBackend | PostgresBackend) — Persistence backend
Returns: None
The payload is buffered in FIFO order per (instance_id, signal_name). When the workflow resumes and reaches the matching wait_for_signal node, it consumes the oldest buffered event.
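For example, two signals sent before the workflow reaches its wait node are consumed oldest-first. A sketch reusing the "approval" workflow from .wait_for_signal() above; driving the parked instance forward with `resume_workflow` is an assumption:

```python
# Both events are buffered under ("run-001", "manager_approval")
send_signal("run-001", "manager_approval", {"approved": False, "comments": "Missing budget"}, backend)
send_signal("run-001", "manager_approval", {"approved": True, "comments": "Looks good"}, backend)

# The wait_for_signal("manager_approval") node consumes the oldest
# buffered event first: {"approved": False, ...}
status = resume_workflow(workflow, "run-001", backend)
```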
Backends
InMemoryBackend()
In-memory storage for development and testing. State is lost when the process stops.
```python
from sayiir import InMemoryBackend

backend = InMemoryBackend()
```

Fast and requires no external dependencies. Ideal for unit tests and local development.
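For example, a unit test can exercise a durable workflow end to end against a fresh in-memory backend. A sketch, with `double` standing in for any @task:

```python
def test_durable_run_completes():
    backend = InMemoryBackend()  # fresh, isolated state per test
    workflow = Flow("test").then(double).build()

    status = run_durable_workflow(workflow, "run-001", 21, backend=backend)

    assert status.is_completed()
    assert status.output == 42
```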
PostgresBackend(url)
PostgreSQL storage for production. State survives process restarts.
```python
from sayiir import PostgresBackend

backend = PostgresBackend("postgresql://user:pass@localhost:5432/sayiir")
```

Parameters:
- `url` (str) — PostgreSQL connection URL
Requirements:
- PostgreSQL 13 or higher
- Auto-migration: tables are created on first connection
- Recommended: dedicated database for Sayiir
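Because state lives in PostgreSQL, an interrupted run can be picked up later, even by a different process, using the same instance ID. A sketch; the connection URL, instance ID, and workflow are placeholders:

```python
# Process A: starts the run, then crashes or is redeployed
backend = PostgresBackend("postgresql://user:pass@localhost:5432/sayiir")
run_durable_workflow(workflow, "order-123", input_data, backend=backend)

# Process B: reconnects and continues from the last checkpoint
backend = PostgresBackend("postgresql://user:pass@localhost:5432/sayiir")
status = resume_workflow(workflow, "order-123", backend)
```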
WorkflowStatus
Status object returned by durable execution functions.
```python
status = run_durable_workflow(workflow, "run-001", input_data, backend)

print(status.status)        # "completed", "failed", "cancelled", etc.
print(status.output)        # Final result if completed
print(status.error)         # Error message if failed
print(status.reason)        # Reason if cancelled/paused
print(status.cancelled_by)  # Who cancelled (if applicable)

# Convenience methods
if status.is_completed():
    print(status.output)
elif status.is_failed():
    print(status.error)
elif status.is_cancelled():
    print(f"Cancelled: {status.reason}")
elif status.is_paused():
    print("Workflow is paused")
elif status.is_in_progress():
    print("Still running")
```

Properties:
- `status` (str) — Current status
- `output` (Any | None) — Final result if completed
- `error` (str | None) — Error message if failed
- `reason` (str | None) — Reason for cancellation/pause
- `cancelled_by` (str | None) — Identifier of who cancelled
Methods:
- `is_completed()` → bool
- `is_failed()` → bool
- `is_cancelled()` → bool
- `is_in_progress()` → bool
- `is_paused()` → bool
RetryPolicy
Configuration for task retry behavior.
```python
from sayiir import RetryPolicy, task

policy = RetryPolicy(
    max_retries=3,
    initial_delay_secs=1.0,
    backoff_multiplier=2.0,
)

@task(retries=policy)
def flaky_task(data: dict) -> dict:
    # May fail and retry with exponential backoff
    return process(data)
```

Parameters:
- `max_retries` (int) — Maximum number of retries (default: 2)
- `initial_delay_secs` (float) — Initial delay between retries (default: 1.0)
- `backoff_multiplier` (float) — Exponential backoff multiplier (default: 2.0)
Delay Formula:
```python
delay = initial_delay_secs * (backoff_multiplier ** attempt)
```

Examples:
- Attempt 1: 1.0s
- Attempt 2: 2.0s
- Attempt 3: 4.0s
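The schedule above follows directly from the formula. A small sketch; the 0-based `attempt` index for the first retry is an assumption inferred from the listed values:

```python
initial_delay_secs = 1.0
backoff_multiplier = 2.0

for attempt in range(3):  # retries 1..3 map to attempt indices 0..2 (assumption)
    delay = initial_delay_secs * (backoff_multiplier ** attempt)
    print(f"Attempt {attempt + 1}: {delay}s")
# Attempt 1: 1.0s
# Attempt 2: 2.0s
# Attempt 3: 4.0s
```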
Exceptions
WorkflowError
Base exception for all Sayiir workflow errors.
```python
from sayiir import WorkflowError

try:
    run_workflow(workflow, input_data)
except WorkflowError as e:
    print(f"Workflow error: {e}")
```

TaskError
Raised when a task fails during execution.
```python
from sayiir import TaskError

try:
    run_workflow(workflow, input_data)
except TaskError as e:
    print(f"Task failed: {e}")
```

BackendError
Raised when a persistence backend operation fails.
```python
from sayiir import BackendError

try:
    backend = PostgresBackend("postgresql://invalid:url")
except BackendError as e:
    print(f"Backend error: {e}")
```

Type Annotations
Sayiir supports Pydantic models for automatic validation and serialization:
```python
from pydantic import BaseModel
from sayiir import task

class Order(BaseModel):
    order_id: str
    amount: float

class Receipt(BaseModel):
    receipt_id: str
    status: str

@task
def process_order(order: Order) -> Receipt:
    # Input is automatically validated
    # Output is automatically serialized
    return Receipt(receipt_id="R123", status="paid")
```

If Pydantic is not installed or annotations are not models, validation/serialization is skipped.
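Such a task slots into a workflow like any other. A sketch; whether a plain dict matching the schema would also be accepted and coerced into `Order` is an assumption, so a model instance is passed here:

```python
workflow = Flow("orders").then(process_order).build()

# Assumption: the non-durable runner returns the task's output directly,
# here the Receipt model produced by process_order.
receipt = run_workflow(workflow, Order(order_id="A-42", amount=19.99))
print(receipt.status)  # "paid"
```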
Sayiir ships with inline type stubs and is PEP 561 compliant — editors and type checkers (mypy, pyright) get full autocompletion and type checking out of the box.
Async tasks
Tasks can be async functions. Sayiir detects coroutines automatically and awaits them:
```python
import aiohttp
from sayiir import task

@task
async def fetch_data(url: str) -> dict:
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return await resp.json()
```
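An async task slots into a workflow like any synchronous one. The sketch below assumes the runner drives the event loop itself when it encounters a coroutine, so no explicit asyncio.run() is needed:

```python
workflow = Flow("fetch").then(fetch_data).build()

# The runner detects that fetch_data returns a coroutine and awaits it
# internally (per the note above); example.com is a placeholder URL.
data = run_workflow(workflow, "https://example.com/api/items")
```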