Tutorial: Order Processing (Python)

Learn how to build a production-ready order processing pipeline with Sayiir. We’ll start with a simple sequential flow and progressively add durability, retries, and parallelism.

In this tutorial, you’ll build an order processing pipeline that:

  • Validates incoming orders
  • Charges payment (with automatic retries)
  • Checks inventory
  • Sends confirmation emails

By the end, you’ll have a durable workflow that survives crashes and scales to handle thousands of orders.

First, create a new Python project and install Sayiir:

mkdir order-processor && cd order-processor
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
pip install sayiir

Tasks are the building blocks of workflows. Let’s define our order processing tasks with proper error handling and retry policies:

from sayiir import task, RetryPolicy

@task
def validate_order(order: dict) -> dict:
    """Validate order data and business rules."""
    if order.get("amount", 0) <= 0:
        raise ValueError("Invalid amount: must be positive")
    if not order.get("customer_email"):
        raise ValueError("Customer email is required")
    return {**order, "validated": True}

@task(
    timeout_secs=30,
    retries=RetryPolicy(
        max_retries=3,
        initial_delay_secs=1.0,
        backoff_multiplier=2.0
    )
)
def charge_payment(order: dict) -> dict:
    """Charge payment with automatic retries."""
    # In production: integrate with Stripe, Square, etc.
    # payment_intent = stripe.PaymentIntent.create(
    #     amount=int(order["amount"] * 100),
    #     currency="usd",
    #     customer=order["customer_id"]
    # )
    return {
        **order,
        "payment_id": "pay_123abc",
        "charged": True
    }

@task
def send_confirmation(order: dict) -> str:
    """Send order confirmation email."""
    # In production: integrate with SendGrid, Postmark, etc.
    # send_email(
    #     to=order["customer_email"],
    #     subject=f"Order {order['order_id']} Confirmed",
    #     body=f"Payment ID: {order['payment_id']}"
    # )
    return f"Order {order['order_id']} confirmed (payment: {order['payment_id']})"

Notice that the @task decorator configures:

  • Timeout: Fails if charge_payment takes longer than 30 seconds
  • Retries: Automatically retries failed payments up to 3 times with exponential backoff (1s, 2s, 4s)
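
The stated delays follow directly from the policy: each retry waits initial_delay_secs multiplied by backoff_multiplier raised to the number of prior attempts. A quick arithmetic check:

# Retry schedule implied by the policy configured above
delays = [1.0 * 2.0 ** attempt for attempt in range(3)]
print(delays)  # [1.0, 2.0, 4.0] -> retry after 1s, then 2s, then 4s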

Let’s compose these tasks into a sequential workflow:

from sayiir import Flow, run_workflow

# Create a sequential workflow
workflow = (
    Flow("order-processing")
    .then(validate_order)
    .then(charge_payment)
    .then(send_confirmation)
    .build()
)

# Execute the workflow
order_data = {
    "order_id": 1,
    "customer_email": "alice@example.com",
    "amount": 99.99
}

result = run_workflow(workflow, order_data)
print(result)  # "Order 1 confirmed (payment: pay_123abc)"

This works great for simple cases, but what happens if your process crashes during payment? The order might be charged twice on restart.

Switch to run_durable_workflow to get automatic checkpointing and crash recovery:

from sayiir import run_durable_workflow, InMemoryBackend

backend = InMemoryBackend()

status = run_durable_workflow(
    workflow=workflow,
    workflow_id="order-1",
    initial_input=order_data,
    backend=backend
)

print(status.is_completed())  # True
print(status.output())        # "Order 1 confirmed (payment: pay_123abc)"

Now if your process crashes:

  • Tasks are checkpointed after each step
  • On restart, the workflow resumes from the last checkpoint
  • Already-completed tasks are never re-executed

Testing durability: Try running this twice with the same workflow_id. The second run will immediately return the cached result without re-executing tasks.
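
For example, here is a minimal sketch of that check, re-using the workflow, backend, and order_data defined above:

# First run executes the tasks and checkpoints each completed step.
first = run_durable_workflow(
    workflow=workflow,
    workflow_id="order-1",
    initial_input=order_data,
    backend=backend
)

# A second run with the same workflow_id finds the completed checkpoints
# and returns the stored result without re-executing any task.
second = run_durable_workflow(
    workflow=workflow,
    workflow_id="order-1",
    initial_input=order_data,
    backend=backend
)

assert first.output() == second.output()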

Real order processing needs to do multiple things at once. Let’s check inventory while charging payment:

@task
def check_inventory(order: dict) -> dict:
    """Verify items are in stock."""
    # In production: query inventory database
    # inventory = db.query("SELECT stock FROM products WHERE id = ?", order["product_id"])
    return {**order, "in_stock": True}

@task
def finalize(results: dict) -> str:
    """Combine results from parallel branches."""
    payment = results["charge_payment"]
    inventory = results["check_inventory"]
    return f"Order complete: paid={payment['charged']}, stock={inventory['in_stock']}"

# Build workflow with parallel branches
workflow = (
    Flow("order-v2")
    .then(validate_order)
    .fork()
    .branch(charge_payment)
    .branch(check_inventory)
    .join(finalize)
    .build()
)

status = run_durable_workflow(
    workflow=workflow,
    workflow_id="order-2",
    initial_input=order_data,
    backend=backend
)

print(status.output())  # "Order complete: paid=True, stock=True"

The fork()/join() pattern:

  1. Executes charge_payment and check_inventory in parallel
  2. Waits for both to complete
  3. Passes results to finalize as a dict keyed by task name
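
For illustration, the dict handed to finalize looks roughly like this for the sample order; the contents simply mirror what each branch task returns:

results = {
    "charge_payment": {
        "order_id": 1,
        "customer_email": "alice@example.com",
        "amount": 99.99,
        "validated": True,
        "payment_id": "pay_123abc",
        "charged": True,
    },
    "check_inventory": {
        "order_id": 1,
        "customer_email": "alice@example.com",
        "amount": 99.99,
        "validated": True,
        "in_stock": True,
    },
}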

For production, use PostgresBackend for true durability across server restarts:

from sayiir import PostgresBackend

# Create database: createdb sayiir_orders
backend = PostgresBackend("postgresql://localhost/sayiir_orders")

status = run_durable_workflow(
    workflow=workflow,
    workflow_id="order-3",
    initial_input=order_data,
    backend=backend
)

Now your workflow state survives:

  • Process crashes
  • Server restarts
  • Deployments

You can query workflow status from anywhere:

from sayiir import get_workflow_status

status = get_workflow_status("order-3", backend)

if status.is_running():
    print("Still processing...")
elif status.is_completed():
    print(f"Done: {status.output()}")
elif status.is_failed():
    print(f"Failed: {status.error()}")

Here’s the complete working example:

from sayiir import (
    task,
    Flow,
    RetryPolicy,
    run_durable_workflow,
    PostgresBackend
)

# Task definitions
@task
def validate_order(order: dict) -> dict:
    if order.get("amount", 0) <= 0:
        raise ValueError("Invalid amount: must be positive")
    if not order.get("customer_email"):
        raise ValueError("Customer email is required")
    return {**order, "validated": True}

@task(
    timeout_secs=30,
    retries=RetryPolicy(
        max_retries=3,
        initial_delay_secs=1.0,
        backoff_multiplier=2.0
    )
)
def charge_payment(order: dict) -> dict:
    # Integrate with payment provider
    return {**order, "payment_id": "pay_123abc", "charged": True}

@task
def check_inventory(order: dict) -> dict:
    # Query inventory database
    return {**order, "in_stock": True}

@task
def finalize(results: dict) -> str:
    payment = results["charge_payment"]
    inventory = results["check_inventory"]
    return f"Order complete: paid={payment['charged']}, stock={inventory['in_stock']}"

# Workflow definition
workflow = (
    Flow("order-processing")
    .then(validate_order)
    .fork()
    .branch(charge_payment)
    .branch(check_inventory)
    .join(finalize)
    .build()
)

# Execution
if __name__ == "__main__":
    backend = PostgresBackend("postgresql://localhost/sayiir_orders")

    order = {
        "order_id": 1001,
        "customer_email": "alice@example.com",
        "product_id": "widget-42",
        "amount": 99.99
    }

    status = run_durable_workflow(
        workflow=workflow,
        workflow_id=f"order-{order['order_id']}",
        initial_input=order,
        backend=backend
    )

    print(status.output())

Now that you have a working order processing pipeline, try extending it with more features:

  • Send notification to warehouse system after payment
  • Add fraud detection as a parallel branch (see the sketch after this list)
  • Implement order cancellation with signals
  • Add webhook notifications for order status updates
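
As a starting point for the fraud-detection idea, here is a minimal sketch. It assumes fork() accepts a third branch; check_fraud is a hypothetical task you would back with a real fraud-scoring service:

@task
def check_fraud(order: dict) -> dict:
    """Hypothetical fraud check; replace with a real fraud-scoring call."""
    # e.g. score = fraud_client.score(order["customer_email"], order["amount"])
    return {**order, "fraud_ok": True}

workflow = (
    Flow("order-v3")
    .then(validate_order)
    .fork()
    .branch(charge_payment)
    .branch(check_inventory)
    .branch(check_fraud)
    .join(finalize)
    .build()
)

You would also extend finalize to read results["check_fraud"] and decide how to handle a rejected order.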