Tutorial: Order Processing (Python)
Learn how to build a production-ready order processing pipeline with Sayiir. We’ll start with a simple sequential flow and progressively add durability, retries, and parallelism.
What We’re Building
Section titled “What We’re Building”An order processing pipeline that:
- Validates incoming orders
- Charges payment (with automatic retries)
- Checks inventory
- Sends confirmation emails
By the end, you’ll have a durable workflow that survives crashes and scales to handle thousands of orders.
Project Setup
Section titled “Project Setup”First, create a new Python project and install Sayiir:
mkdir order-processor && cd order-processorpython -m venv venvsource venv/bin/activate # On Windows: venv\Scripts\activatepip install sayiirDefine Tasks
Section titled “Define Tasks”Tasks are the building blocks of workflows. Let’s define our order processing tasks with proper error handling and retry policies:
from sayiir import task, RetryPolicy
@taskdef validate_order(order: dict) -> dict: """Validate order data and business rules.""" if order.get("amount", 0) <= 0: raise ValueError("Invalid amount: must be positive")
if not order.get("customer_email"): raise ValueError("Customer email is required")
return {**order, "validated": True}
@task( timeout_secs=30, retries=RetryPolicy( max_retries=3, initial_delay_secs=1.0, backoff_multiplier=2.0 ))def charge_payment(order: dict) -> dict: """Charge payment with automatic retries.""" # In production: integrate with Stripe, Square, etc. # payment_intent = stripe.PaymentIntent.create( # amount=int(order["amount"] * 100), # currency="usd", # customer=order["customer_id"] # )
return { **order, "payment_id": "pay_123abc", "charged": True }
@taskdef send_confirmation(order: dict) -> str: """Send order confirmation email.""" # In production: integrate with SendGrid, Postmark, etc. # send_email( # to=order["customer_email"], # subject=f"Order {order['order_id']} Confirmed", # body=f"Payment ID: {order['payment_id']}" # )
return f"Order {order['order_id']} confirmed (payment: {order['payment_id']})"Notice the @task decorator configures:
- Timeout: Fails if charge_payment takes longer than 30 seconds
- Retries: Automatically retries failed payments up to 3 times with exponential backoff (1s, 2s, 4s)
Build a Simple Workflow
Section titled “Build a Simple Workflow”Let’s compose these tasks into a sequential workflow:
from sayiir import Flow, run_workflow
# Create a sequential workflowworkflow = ( Flow("order-processing") .then(validate_order) .then(charge_payment) .then(send_confirmation) .build())
# Execute the workfloworder_data = { "order_id": 1, "customer_email": "alice@example.com", "amount": 99.99}
result = run_workflow(workflow, order_data)print(result) # "Order 1 confirmed (payment: pay_123abc)"This works great for simple cases, but what happens if your process crashes during payment? The order might be charged twice on restart.
Make It Durable
Section titled “Make It Durable”Switch to run_durable_workflow to get automatic checkpointing and crash recovery:
from sayiir import run_durable_workflow, InMemoryBackend
backend = InMemoryBackend()
status = run_durable_workflow( workflow=workflow, workflow_id="order-1", initial_input=order_data, backend=backend)
print(status.is_completed()) # Trueprint(status.output()) # "Order 1 confirmed (payment: pay_123abc)"Now if your process crashes:
- Tasks are checkpointed after each step
- On restart, the workflow resumes from the last checkpoint
- Already-completed tasks are never re-executed
Testing durability: Try running this twice with the same workflow_id. The second run will immediately return the cached result without re-executing tasks.
Add Parallel Branches
Section titled “Add Parallel Branches”Real order processing needs to do multiple things at once. Let’s check inventory while charging payment:
@taskdef check_inventory(order: dict) -> dict: """Verify items are in stock.""" # In production: query inventory database # inventory = db.query("SELECT stock FROM products WHERE id = ?", order["product_id"])
return {**order, "in_stock": True}
@taskdef finalize(results: dict) -> str: """Combine results from parallel branches.""" payment = results["charge_payment"] inventory = results["check_inventory"]
return f"Order complete: paid={payment['charged']}, stock={inventory['in_stock']}"
# Build workflow with parallel branchesworkflow = ( Flow("order-v2") .then(validate_order) .fork() .branch(charge_payment) .branch(check_inventory) .join(finalize) .build())
status = run_durable_workflow( workflow=workflow, workflow_id="order-2", initial_input=order_data, backend=backend)
print(status.output()) # "Order complete: paid=True, stock=True"The fork()/join() pattern:
- Executes
charge_paymentandcheck_inventoryin parallel - Waits for both to complete
- Passes results to
finalizeas a dict keyed by task name
Switch to Postgres
Section titled “Switch to Postgres”For production, use PostgresBackend for true durability across server restarts:
from sayiir import PostgresBackend
# Create database: createdb sayiir_ordersbackend = PostgresBackend("postgresql://localhost/sayiir_orders")
status = run_durable_workflow( workflow=workflow, workflow_id="order-3", initial_input=order_data, backend=backend)Now your workflow state survives:
- Process crashes
- Server restarts
- Deployments
You can query workflow status from anywhere:
from sayiir import get_workflow_status
status = get_workflow_status("order-3", backend)if status.is_running(): print("Still processing...")elif status.is_completed(): print(f"Done: {status.output()}")elif status.is_failed(): print(f"Failed: {status.error()}")Full Code Listing
Section titled “Full Code Listing”Here’s the complete working example:
from sayiir import ( task, Flow, RetryPolicy, run_durable_workflow, PostgresBackend)
# Task definitions@taskdef validate_order(order: dict) -> dict: if order.get("amount", 0) <= 0: raise ValueError("Invalid amount: must be positive") if not order.get("customer_email"): raise ValueError("Customer email is required") return {**order, "validated": True}
@task( timeout_secs=30, retries=RetryPolicy( max_retries=3, initial_delay_secs=1.0, backoff_multiplier=2.0 ))def charge_payment(order: dict) -> dict: # Integrate with payment provider return {**order, "payment_id": "pay_123abc", "charged": True}
@taskdef check_inventory(order: dict) -> dict: # Query inventory database return {**order, "in_stock": True}
@taskdef finalize(results: dict) -> str: payment = results["charge_payment"] inventory = results["check_inventory"] return f"Order complete: paid={payment['charged']}, stock={inventory['in_stock']}"
# Workflow definitionworkflow = ( Flow("order-processing") .then(validate_order) .fork() .branch(charge_payment) .branch(check_inventory) .join(finalize) .build())
# Executionif __name__ == "__main__": backend = PostgresBackend("postgresql://localhost/sayiir_orders")
order = { "order_id": 1001, "customer_email": "alice@example.com", "product_id": "widget-42", "amount": 99.99 }
status = run_durable_workflow( workflow=workflow, workflow_id=f"order-{order['order_id']}", initial_input=order, backend=backend )
print(status.output())Next Steps
Section titled “Next Steps”Now that you have a working order processing pipeline, explore:
- Retry Policies — Fine-tune retry behavior for different failure scenarios
- Distributed Workers — Scale to multiple worker processes
- Signals — Add manual approval steps or cancellation
- Monitoring — Track workflow metrics and debug failures
Try adding more features:
- Send notification to warehouse system after payment
- Add fraud detection as a parallel branch
- Implement order cancellation with signals
- Add webhook notifications for order status updates