
Python Quick Start

pip install sayiir

Define tasks with @task and chain them with Flow:

from sayiir import task, Flow, run_workflow

@task
def fetch_user(user_id: int) -> dict:
    return {"id": user_id, "name": "Alice"}

@task
def send_email(user: dict) -> str:
    return f"Sent welcome to {user['name']}"

workflow = Flow("welcome").then(fetch_user).then(send_email).build()
result = run_workflow(workflow, 42)
print(result)  # "Sent welcome to Alice"

Use run_durable_workflow to checkpoint after each task. If the process crashes, call resume_workflow to continue from the last checkpoint.

from sayiir import task, Flow, run_durable_workflow, InMemoryBackend

@task(timeout="30s")
def process_order(order_id: int) -> dict:
    return {"order_id": order_id, "status": "processed"}

@task
def send_confirmation(order: dict) -> str:
    return f"Confirmed order {order['order_id']}"

workflow = Flow("order").then(process_order).then(send_confirmation).build()

# Checkpoints after each task — resume from last checkpoint on crash
status = run_durable_workflow(workflow, "order-123", 42)
print(status.output)
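The resume path mentioned above can be sketched as follows. This assumes `resume_workflow` takes the workflow and the same run ID used when the durable run was started; the exact signature is an assumption, not confirmed API:

```python
from sayiir import resume_workflow

# After a crash, restart the process and resume the run by its ID.
# Tasks that already checkpointed are skipped; execution continues
# from the last checkpoint. (Signature assumed from the run ID usage above.)
status = resume_workflow(workflow, "order-123")
print(status.output)
```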

Swap to PostgresBackend for production — everything else stays the same:

from sayiir import PostgresBackend

# Connects and runs migrations automatically
backend = PostgresBackend("postgresql://localhost/sayiir")
status = run_durable_workflow(workflow, "order-123", 42, backend=backend)

Sayiir automatically validates inputs and outputs when you use Pydantic models:

from pydantic import BaseModel
from sayiir import task, Flow, run_workflow

class OrderInput(BaseModel):
    order_id: int
    amount: float

class OrderResult(BaseModel):
    status: str
    message: str

@task
def process(order: OrderInput) -> OrderResult:
    return OrderResult(status="ok", message=f"Processed ${order.amount}")

workflow = Flow("typed").then(process).build()
result = run_workflow(workflow, {"order_id": 1, "amount": 99.99})

Pydantic models are serialized and deserialized automatically at each checkpoint boundary.
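To see that serialization in action, the typed workflow can also be run durably; a sketch combining the two earlier examples, using only the calls already shown (the run ID `"typed-run-1"` is illustrative):

```python
from sayiir import run_durable_workflow

# The dict input is validated into OrderInput; the OrderResult output
# is serialized at the checkpoint and deserialized on resume.
status = run_durable_workflow(workflow, "typed-run-1", {"order_id": 1, "amount": 99.99})
print(status.output)
```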

You don’t need @task for every step — pass inline functions directly:

from sayiir import Flow, run_workflow

workflow = (
    Flow("quick")
    .then("double", lambda x: x * 2)
    .then("format", lambda x: f"Result: {x}")
    .build()
)
result = run_workflow(workflow, 21)
# "Result: 42"

Use @task when you need metadata (retries, timeouts, tags) or want to reuse a task across workflows.
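The only task option shown in this guide is `timeout`. A hedged sketch of attaching the other metadata mentioned above, assuming the decorator also accepts `retries` and `tags` keyword arguments (those parameter names are assumptions, not confirmed API):

```python
from sayiir import task

# retries= and tags= are assumed parameter names, by analogy with timeout=
@task(retries=3, timeout="30s", tags=["billing"])
def charge_card(order: dict) -> dict:
    return {"charged": True}
```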

Use .fork() and .join() to run branches in parallel:

from sayiir import task, Flow, branch, run_workflow

@task
def validate_payment(order: dict) -> dict:
    return {"payment": "valid"}

@task
def check_inventory(order: dict) -> dict:
    return {"stock": "available"}

workflow = (
    Flow("checkout")
    .fork([
        branch("payment", validate_payment),
        branch("inventory", check_inventory),
    ])
    .join("finalize", lambda results: {**results[0], **results[1]})
    .build()
)
result = run_workflow(workflow, {"id": 1})