Python API Reference

@task

Annotate a function as a workflow task.

from sayiir import task, RetryPolicy

@task
def my_task(input: dict) -> dict:
    return {"result": input["value"] * 2}

@task(
    name="custom_name",
    timeout_secs=30,
    retries=RetryPolicy(max_retries=3),
    tags=["io", "external"],
    description="Processes data from external API"
)
def my_task(input: dict) -> dict:
    return {"result": input["value"] * 2}

Parameters:

  • name (str, optional) — Override task ID (default: function name)
  • timeout_secs (float, optional) — Task timeout in seconds
  • retries (RetryPolicy, optional) — Retry configuration
  • tags (list[str], optional) — Categorization tags
  • description (str, optional) — Human-readable description

The decorator attaches metadata to the function without wrapping it. The function can still be called directly for testing.
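For example, a decorated task can be invoked like a plain function in a unit test (a minimal sketch; the task and test names are illustrative):

from sayiir import task

@task
def double(input: dict) -> dict:
    return {"result": input["value"] * 2}

def test_double_directly():
    # No workflow engine involved: the decorated function is still a normal callable
    assert double({"value": 21}) == {"result": 42}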

Flow

Create a new workflow builder.

from sayiir import Flow
workflow = Flow("my-workflow")

Methods:

.then(task_func, name=None)

Add a sequential task to the pipeline.

workflow = (
    Flow("pipeline")
    .then(fetch_user)
    .then(send_email)
    .build()
)

# With custom name
workflow = (
    Flow("pipeline")
    .then(lambda x: x * 2, name="double")
    .build()
)

Parameters:

  • task_func — Any callable: @task-decorated function, plain function, or lambda
  • name (str, optional) — Override the task ID (useful for lambdas)

Returns: Flow instance for chaining

.delay(name, duration)

Add a durable delay. No workers are held during the delay.

from datetime import timedelta

workflow = (
    Flow("delayed")
    .then(send_email)
    .delay("wait_24h", timedelta(hours=24))
    .then(send_reminder)
    .build()
)

# Also accepts float seconds
.delay("wait_5s", 5.0)

Parameters:

  • name (str) — Unique identifier for this delay step
  • duration (float | timedelta) — Delay duration in seconds or timedelta

Returns: Flow instance for chaining

.wait_for_signal(signal_name, *, name=None, timeout=None)


Wait for an external signal before continuing. The workflow parks and releases the worker until the signal arrives.

workflow = (
    Flow("approval")
    .then(submit_request)
    .wait_for_signal("manager_approval", timeout=timedelta(hours=48))
    .then(process_approved)
    .build()
)

Parameters:

  • signal_name (str) — The named signal to wait for
  • name (str, optional) — Node ID for this step (defaults to signal_name)
  • timeout (float | timedelta | None) — Optional timeout

Returns: Flow instance for chaining

.fork()

Start a fork for parallel execution.

workflow = (
    Flow("parallel")
    .then(prepare_data)
    .fork()
    .branch(process_a)
    .branch(process_b, process_c)  # Multi-step branch
    .join(combine_results)
    .build()
)

Returns: ForkBuilder instance

.build()

Compile the workflow.

workflow = Flow("example").then(my_task).build()

Returns: Workflow instance

ForkBuilder

Builder for parallel workflow branches. Created by calling .fork() on a Flow.

.branch(*task_funcs, name=None)

Add a branch with one or more chained tasks.

# Single-task branch
.branch(step_a)
# Multi-step branch (tasks run sequentially within the branch)
.branch(step_a, step_b, step_c)
# Lambda with custom name
.branch(lambda x: x + 1, name="increment")

Parameters:

  • *task_funcs — One or more task callables
  • name (str, optional) — Override the task ID of the first task in the branch

Returns: ForkBuilder instance for chaining

.join(task_func, name=None)

Join branches with a combining task that receives a dict of branch results.

def combine(results: dict) -> dict:
    # results = {"branch_task_id_1": output1, "branch_task_id_2": output2}
    return {"combined": results}

workflow = (
    Flow("parallel")
    .fork()
    .branch(task_a)
    .branch(task_b)
    .join(combine)
    .build()
)

Parameters:

  • task_func — Join callable that receives dict of branch results
  • name (str, optional) — Override the task ID

Returns: Flow instance for further chaining

Workflow

Compiled workflow with task registry. Produced by Flow.build().

workflow = Flow("example").then(my_task).build()
print(workflow.workflow_id) # "example"
print(workflow.definition_hash) # "abc123..."

Properties:

  • workflow_id (str) — The workflow’s unique identifier
  • definition_hash (str) — Hash of the workflow definition

run_workflow(workflow, input_data)

Run a workflow to completion without persistence. Returns the result directly.

from sayiir import run_workflow
workflow = Flow("test").then(double).build()
result = run_workflow(workflow, 21)
print(result) # 42

Parameters:

  • workflow (Workflow) — The workflow to run
  • input_data (Any) — Input to the first task

Returns: The workflow result (Any)

run_durable_workflow(workflow, instance_id, input_data, backend=None)


Run a workflow with checkpointing and durability. Returns WorkflowStatus.

from sayiir import run_durable_workflow, InMemoryBackend
backend = InMemoryBackend()
workflow = Flow("test").then(double).build()
status = run_durable_workflow(
    workflow,
    "run-001",
    21,
    backend=backend
)

if status.is_completed():
    print(status.output)  # 42

Parameters:

  • workflow (Workflow) — The workflow to run
  • instance_id (str) — Unique identifier for this execution instance
  • input_data (Any) — Input to the first task
  • backend (InMemoryBackend | PostgresBackend, optional) — Persistence backend (defaults to InMemoryBackend)

Returns: WorkflowStatus

resume_workflow(workflow, instance_id, backend)


Resume a durable workflow from a saved checkpoint.

from sayiir import resume_workflow
status = resume_workflow(workflow, "run-001", backend)

Parameters:

  • workflow (Workflow) — The workflow definition
  • instance_id (str) — The instance ID used when the workflow was started
  • backend (InMemoryBackend | PostgresBackend) — Persistence backend

Returns: WorkflowStatus

cancel_workflow(instance_id, backend, reason=None, cancelled_by=None)


Request cancellation of a running durable workflow.

from sayiir import cancel_workflow

cancel_workflow(
    "run-001",
    backend,
    reason="User requested cancellation",
    cancelled_by="admin@example.com"
)

Parameters:

  • instance_id (str) — The instance ID of the workflow to cancel
  • backend (InMemoryBackend | PostgresBackend) — Persistence backend
  • reason (str, optional) — Reason for cancellation
  • cancelled_by (str, optional) — Identifier of who requested cancellation

Returns: None

pause_workflow(instance_id, backend, reason=None, paused_by=None)


Pause a running workflow at the next checkpoint.

from sayiir import pause_workflow

pause_workflow(
    "run-001",
    backend,
    reason="Maintenance window",
    paused_by="ops@example.com"
)

Parameters:

  • instance_id (str) — The instance ID of the workflow to pause
  • backend (InMemoryBackend | PostgresBackend) — Persistence backend
  • reason (str, optional) — Reason for pause
  • paused_by (str, optional) — Identifier of who requested pause

Returns: None

unpause_workflow(instance_id, backend)

Allow a paused workflow to resume.

from sayiir import unpause_workflow
unpause_workflow("run-001", backend)

Parameters:

  • instance_id (str) — The instance ID of the workflow to unpause
  • backend (InMemoryBackend | PostgresBackend) — Persistence backend

Returns: None

send_signal(instance_id, signal_name, payload, backend)


Send an external signal to a workflow waiting on wait_for_signal.

from sayiir import send_signal

send_signal(
    "run-001",
    "manager_approval",
    {"approved": True, "comments": "Looks good"},
    backend
)

Parameters:

  • instance_id (str) — The instance ID of the target workflow
  • signal_name (str) — The signal name to send
  • payload (Any) — The payload data (will be serialized)
  • backend (InMemoryBackend | PostgresBackend) — Persistence backend

Returns: None

The payload is buffered in FIFO order per (instance_id, signal_name). When the workflow resumes and reaches the matching wait_for_signal node, it consumes the oldest buffered event.
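A sketch of that buffering behavior, assuming a workflow with two wait_for_signal steps on the same signal name and assuming run_durable_workflow returns once the workflow parks at the first signal (task, instance, and payload names are illustrative):

from sayiir import Flow, InMemoryBackend, run_durable_workflow, resume_workflow, send_signal, task

@task
def start(data: dict) -> dict:
    return data

workflow = (
    Flow("double-approval")
    .then(start)
    .wait_for_signal("approval", name="first_approval")
    .wait_for_signal("approval", name="second_approval")
    .build()
)

backend = InMemoryBackend()
run_durable_workflow(workflow, "run-002", {}, backend=backend)  # parks at first_approval

# Both signals are buffered in FIFO order while the workflow is parked
send_signal("run-002", "approval", {"decision": 1}, backend)
send_signal("run-002", "approval", {"decision": 2}, backend)

# On resume, first_approval consumes {"decision": 1}, then second_approval consumes {"decision": 2}
status = resume_workflow(workflow, "run-002", backend)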

InMemoryBackend

In-memory storage for development and testing. State is lost when the process stops.

from sayiir import InMemoryBackend
backend = InMemoryBackend()

Fast and requires no external dependencies. Ideal for unit tests and local development.
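For instance, a self-contained unit test might look like this (a minimal sketch; the task and test names are illustrative):

from sayiir import Flow, InMemoryBackend, run_durable_workflow, task

@task
def double(x: int) -> int:
    return x * 2

def test_double_workflow():
    backend = InMemoryBackend()
    workflow = Flow("test-double").then(double).build()
    status = run_durable_workflow(workflow, "test-run-1", 21, backend=backend)
    assert status.is_completed()
    assert status.output == 42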

PostgresBackend(url)

PostgreSQL storage for production. State survives process restarts.

from sayiir import PostgresBackend
backend = PostgresBackend("postgresql://user:pass@localhost:5432/sayiir")

Parameters:

  • url (str) — PostgreSQL connection URL

Requirements:

  • PostgreSQL 13 or higher
  • Auto-migration: tables are created on first connection
  • Recommended: dedicated database for Sayiir
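Because state lives in PostgreSQL, an interrupted run can be picked up from another process using resume_workflow (a sketch; the connection URL, task, and instance ID are illustrative):

from sayiir import Flow, PostgresBackend, resume_workflow, task

@task
def my_task(input: dict) -> dict:
    return {"result": input["value"] * 2}

backend = PostgresBackend("postgresql://user:pass@localhost:5432/sayiir")
workflow = Flow("example").then(my_task).build()

# After a crash or restart, continue instance "run-001" from its last checkpoint
status = resume_workflow(workflow, "run-001", backend)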

WorkflowStatus

Status object returned by durable execution functions.

status = run_durable_workflow(workflow, "run-001", input_data, backend)
print(status.status) # "completed", "failed", "cancelled", etc.
print(status.output) # Final result if completed
print(status.error) # Error message if failed
print(status.reason) # Reason if cancelled/paused
print(status.cancelled_by) # Who cancelled (if applicable)
# Convenience methods
if status.is_completed():
    print(status.output)
elif status.is_failed():
    print(status.error)
elif status.is_cancelled():
    print(f"Cancelled: {status.reason}")
elif status.is_paused():
    print("Workflow is paused")
elif status.is_in_progress():
    print("Still running")

Properties:

  • status (str) — Current status
  • output (Any | None) — Final result if completed
  • error (str | None) — Error message if failed
  • reason (str | None) — Reason for cancellation/pause
  • cancelled_by (str | None) — Identifier of who cancelled

Methods:

  • is_completed() → bool
  • is_failed() → bool
  • is_cancelled() → bool
  • is_in_progress() → bool
  • is_paused() → bool

RetryPolicy

Configuration for task retry behavior.

from sayiir import task, RetryPolicy

policy = RetryPolicy(
    max_retries=3,
    initial_delay_secs=1.0,
    backoff_multiplier=2.0
)

@task(retries=policy)
def flaky_task(data: dict) -> dict:
    # May fail and retry with exponential backoff
    return process(data)

Parameters:

  • max_retries (int) — Maximum number of retries (default: 2)
  • initial_delay_secs (float) — Initial delay between retries (default: 1.0)
  • backoff_multiplier (float) — Exponential backoff multiplier (default: 2.0)

Delay Formula:

delay = initial_delay_secs * (backoff_multiplier ** attempt)

Examples (attempt counted from 0 in the formula):

  • Attempt 1: 1.0s
  • Attempt 2: 2.0s
  • Attempt 3: 4.0s

WorkflowError

Base exception for all Sayiir workflow errors.

from sayiir import WorkflowError
try:
    run_workflow(workflow, input_data)
except WorkflowError as e:
    print(f"Workflow error: {e}")

TaskError

Raised when a task fails during execution.

from sayiir import TaskError
try:
    run_workflow(workflow, input_data)
except TaskError as e:
    print(f"Task failed: {e}")

BackendError

Raised when a persistence backend operation fails.

from sayiir import BackendError
try:
    backend = PostgresBackend("postgresql://invalid:url")
except BackendError as e:
    print(f"Backend error: {e}")

Sayiir supports Pydantic models for automatic validation and serialization:

from pydantic import BaseModel
from sayiir import task

class Order(BaseModel):
    order_id: str
    amount: float

class Receipt(BaseModel):
    receipt_id: str
    status: str

@task
def process_order(order: Order) -> Receipt:
    # Input is automatically validated
    # Output is automatically serialized
    return Receipt(receipt_id="R123", status="paid")

If Pydantic is not installed or annotations are not models, validation/serialization is skipped.

Sayiir ships with inline type stubs and is PEP 561 compliant — editors and type checkers (mypy, pyright) get full autocompletion and type checking out of the box.

Tasks can be async functions. Sayiir detects coroutines automatically and awaits them:

import aiohttp

from sayiir import task

@task
async def fetch_data(url: str) -> dict:
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return await resp.json()
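An async task is then chained like any other task (a sketch; the workflow name and URL are illustrative, and it assumes run_workflow drives the coroutine for you as described above):

from sayiir import Flow, run_workflow

workflow = Flow("fetch").then(fetch_data).build()
result = run_workflow(workflow, "https://example.com/api/data")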