Tutorial: Approval Workflow (Signals)
Learn how to build human-in-the-loop workflows using signals. We’ll create an expense approval system that waits for manager approval, handles timeouts, and auto-escalates to upper management.
What We’re Building
Section titled “What We’re Building”An expense approval workflow that:
- Submits expense for approval
- Waits for manager decision (with 48-hour timeout)
- Processes approved expenses
- Auto-escalates to director on timeout
Signals let workflows pause and wait for external events like:
- User approvals
- Webhook callbacks
- Manual interventions
- Cancellation requests
The Signal Pattern
Section titled “The Signal Pattern”Traditional workflows execute start-to-finish without human input. With signals:
- Workflow reaches a
wait_for_signal()step and parks - External system sends a signal with approval/rejection data
- Workflow resumes and continues with the signal payload
- Optional timeout triggers escalation if no response
Python Implementation
Section titled “Python Implementation”Basic Approval Flow
Section titled “Basic Approval Flow”from datetime import timedeltafrom sayiir import ( task, Flow, run_durable_workflow, send_signal, resume_workflow, InMemoryBackend)
@taskdef submit_expense(expense: dict) -> dict: """Initial expense submission.""" return { **expense, "status": "pending_approval", "submitted_at": "2026-02-15T10:00:00Z" }
@taskdef process_approved(approval: dict) -> str: """Process approved expense for reimbursement.""" approver = approval.get("approver", "unknown") notes = approval.get("notes", "")
# In production: trigger payment system # stripe.Transfer.create(amount=..., destination=...)
return f"Expense approved by {approver} — processing reimbursement. Notes: {notes}"
# Build workflow with signal waitworkflow = ( Flow("expense-approval") .then(submit_expense) .wait_for_signal("manager_approval", timeout=timedelta(hours=48)) .then(process_approved) .build())
backend = InMemoryBackend()
# Step 1: Submit expenseexpense_data = { "employee": "Alice Smith", "amount": 250.00, "category": "travel", "description": "Client meeting transportation"}
status = run_durable_workflow( workflow=workflow, workflow_id="exp-001", initial_input=expense_data, backend=backend)
print(f"Workflow status: {status.state()}") # "waiting_for_signal"print("Expense submitted — awaiting manager approval...")
# Step 2: Manager approves (could happen minutes or days later)send_signal( workflow_id="exp-001", signal_name="manager_approval", payload={ "approver": "Bob Johnson", "decision": "approved", "notes": "Valid expense, approved" }, backend=backend)
print("Signal sent — resuming workflow...")
# Step 3: Resume workflow to process approvalstatus = resume_workflow( workflow=workflow, workflow_id="exp-001", backend=backend)
print(f"Final status: {status.state()}") # "completed"print(f"Result: {status.output()}")# "Expense approved by Bob Johnson — processing reimbursement. Notes: Valid expense, approved"Handling Rejections
Section titled “Handling Rejections”Add a branch to handle rejected expenses:
@taskdef process_rejected(rejection: dict) -> str: """Handle rejected expense.""" approver = rejection.get("approver", "unknown") reason = rejection.get("reason", "No reason provided")
return f"Expense rejected by {approver}. Reason: {reason}"
@taskdef route_decision(approval: dict) -> dict: """Route based on approval decision.""" if approval.get("decision") == "approved": return approval else: raise ValueError(f"Rejected: {approval.get('reason')}")
# Workflow with rejection handlingworkflow = ( Flow("expense-approval-v2") .then(submit_expense) .wait_for_signal("manager_approval", timeout=timedelta(hours=48)) .then(route_decision) .then(process_approved) .build())Timeout Handling
Section titled “Timeout Handling”What happens when the 48-hour timeout expires?
from sayiir import get_workflow_status, WorkflowState
# Submit expensestatus = run_durable_workflow( workflow=workflow, workflow_id="exp-002", initial_input=expense_data, backend=backend)
# Simulate 48 hours passing with no approval...# (In production, a background job would check timeouts)
# After timeout, workflow fails with timeout errorstatus = get_workflow_status("exp-002", backend)
if status.state() == WorkflowState.FAILED: error = status.error() if "timeout" in error.lower(): print("Approval timeout — escalating to director") # Trigger escalation workflowAuto-Escalation Pattern
Section titled “Auto-Escalation Pattern”Implement automatic escalation with nested workflows:
@taskdef escalate_to_director(expense: dict) -> dict: """Escalate to director with higher timeout.""" return {**expense, "escalated": True, "escalated_to": "director"}
# Escalation workflowescalation_workflow = ( Flow("expense-escalation") .then(escalate_to_director) .wait_for_signal("director_approval", timeout=timedelta(hours=72)) .then(process_approved) .build())
# Main workflow with fallbackdef submit_with_escalation(expense: dict) -> str: """Submit expense with automatic escalation.""" # Try manager approval first status = run_durable_workflow( workflow=workflow, workflow_id=f"exp-{expense['id']}", initial_input=expense, backend=backend )
# Wait for signal (this would be handled by your API/webhook) status = resume_workflow(workflow, f"exp-{expense['id']}", backend)
# If manager timeout, escalate if status.is_failed() and "timeout" in str(status.error()): print("Manager timeout — escalating to director") status = run_durable_workflow( workflow=escalation_workflow, workflow_id=f"exp-{expense['id']}-escalated", initial_input=expense, backend=backend )
return status.output()Rust Implementation
Section titled “Rust Implementation”use sayiir_runtime::prelude::*;use sayiir_persistence::InMemoryBackend;use serde::{Deserialize, Serialize};use std::time::Duration;
#[derive(Debug, Clone, Serialize, Deserialize)]struct Expense { employee: String, amount: f64, category: String, description: String,}
#[derive(Debug, Clone, Serialize, Deserialize)]struct PendingExpense { employee: String, amount: f64, status: String,}
#[derive(Debug, Clone, Serialize, Deserialize)]struct Approval { approver: String, decision: String, notes: String,}
#[tokio::main]async fn main() -> anyhow::Result<()> { let backend = InMemoryBackend::new(); let ctx = WorkflowContext::new();
// Build workflow let workflow = WorkflowBuilder::new(ctx) .then("submit_expense", |expense: Expense| async move { Ok(PendingExpense { employee: expense.employee, amount: expense.amount, status: "pending_approval".into(), }) }) .wait_for_signal( "approval_wait", "manager_approval", Some(Duration::from_secs(48 * 3600)) // 48 hours ) .then("process_approved", |approval: Approval| async move { Ok(format!( "Expense approved by {} — processing reimbursement. Notes: {}", approval.approver, approval.notes )) }) .build();
let runner = CheckpointingRunner::new(backend.clone());
// Step 1: Submit expense let expense = Expense { employee: "Alice Smith".into(), amount: 250.00, category: "travel".into(), description: "Client meeting transportation".into(), };
let status = runner.run(&workflow, "exp-001", expense).await?; println!("Workflow parked, awaiting signal...");
// Step 2: Manager approves let approval = Approval { approver: "Bob Johnson".into(), decision: "approved".into(), notes: "Valid expense, approved".into(), };
backend.send_event("exp-001", "manager_approval", serde_json::to_value(approval)?).await?; println!("Signal sent!");
// Step 3: Resume workflow let status = runner.resume(&workflow, "exp-001").await?;
match status { WorkflowStatus::Completed(output) => { println!("Completed: {}", output); } _ => { println!("Status: {:?}", status); } }
Ok(())}use sayiir_runtime::prelude::*;use sayiir_persistence::InMemoryBackend;use serde::{Deserialize, Serialize};use std::time::Duration;
#[derive(Debug, Clone, Serialize, Deserialize)]struct Expense { employee: String, amount: f64, escalated: bool,}
#[derive(Debug, Clone, Serialize, Deserialize)]struct Approval { approver: String, decision: String,}
async fn run_with_escalation( expense: Expense, workflow_id: &str, backend: InMemoryBackend) -> anyhow::Result<String> { let ctx = WorkflowContext::new(); let runner = CheckpointingRunner::new(backend.clone());
// Manager approval workflow (48h timeout) let manager_workflow = WorkflowBuilder::new(ctx.clone()) .then("submit", |e: Expense| async move { Ok(e) }) .wait_for_signal("wait", "manager_approval", Some(Duration::from_secs(48 * 3600))) .then("process", |a: Approval| async move { Ok(format!("Approved by manager: {}", a.approver)) }) .build();
// Try manager approval let status = runner.run(&manager_workflow, workflow_id, expense.clone()).await?;
// Wait for signal... let status = runner.resume(&manager_workflow, workflow_id).await?;
match status { WorkflowStatus::Completed(output) => Ok(output), WorkflowStatus::Failed(err) if err.contains("timeout") => { println!("Manager timeout — escalating to director");
// Director approval workflow (72h timeout) let director_workflow = WorkflowBuilder::new(ctx) .then("escalate", |mut e: Expense| async move { e.escalated = true; Ok(e) }) .wait_for_signal("wait", "director_approval", Some(Duration::from_secs(72 * 3600))) .then("process", |a: Approval| async move { Ok(format!("Approved by director: {}", a.approver)) }) .build();
let escalation_id = format!("{}-escalated", workflow_id); let status = runner.run(&director_workflow, &escalation_id, expense).await?;
// In production, wait for director signal... Ok("Escalated to director".into()) } _ => Err(anyhow::anyhow!("Unexpected workflow state")) }}
#[tokio::main]async fn main() -> anyhow::Result<()> { let backend = InMemoryBackend::new();
let expense = Expense { employee: "Alice Smith".into(), amount: 1500.00, // High amount requiring director approval escalated: false, };
let result = run_with_escalation(expense, "exp-high-001", backend).await?; println!("Result: {}", result);
Ok(())}Testing with InMemory Backend
Section titled “Testing with InMemory Backend”Use InMemoryBackend for fast testing without database setup:
from sayiir import InMemoryBackend, get_workflow_status
backend = InMemoryBackend()
# Submit workflowstatus = run_durable_workflow(workflow, "test-001", expense_data, backend=backend)
# Simulate delayimport timetime.sleep(0.1)
# Send signalsend_signal("test-001", "manager_approval", {"decision": "approved"}, backend=backend)
# Resumestatus = resume_workflow(workflow, "test-001", backend=backend)
assert status.is_completed()assert "approved" in status.output()Benefits for testing:
- No Postgres setup required
- Instant execution
- Perfect for unit tests
- Same API as PostgresBackend
Integration Example
Section titled “Integration Example”Here’s how to integrate with a web API:
from fastapi import FastAPI, HTTPExceptionfrom sayiir import PostgresBackend, run_durable_workflow, send_signal, get_workflow_status
app = FastAPI()backend = PostgresBackend("postgresql://localhost/expenses")
@app.post("/expenses")async def submit_expense(expense: dict): """Submit new expense for approval.""" workflow_id = f"exp-{expense['id']}"
status = run_durable_workflow( workflow=workflow, workflow_id=workflow_id, initial_input=expense, backend=backend )
return { "workflow_id": workflow_id, "status": "pending_approval" }
@app.post("/expenses/{workflow_id}/approve")async def approve_expense(workflow_id: str, approval: dict): """Manager approves expense.""" send_signal( workflow_id=workflow_id, signal_name="manager_approval", payload=approval, backend=backend )
# Resume workflow status = resume_workflow(workflow, workflow_id, backend)
return { "workflow_id": workflow_id, "status": status.state(), "result": status.output() if status.is_completed() else None }
@app.get("/expenses/{workflow_id}")async def get_expense_status(workflow_id: str): """Check expense approval status.""" status = get_workflow_status(workflow_id, backend)
return { "workflow_id": workflow_id, "status": status.state(), "result": status.output() if status.is_completed() else None, "error": status.error() if status.is_failed() else None }Full Code Listing
Section titled “Full Code Listing”from datetime import timedeltafrom sayiir import ( task, Flow, run_durable_workflow, send_signal, resume_workflow, InMemoryBackend, get_workflow_status, WorkflowState)
@taskdef submit_expense(expense: dict) -> dict: return {**expense, "status": "pending_approval"}
@taskdef process_approved(approval: dict) -> str: return f"Expense approved by {approval['approver']} — processing reimbursement"
@taskdef escalate_to_director(expense: dict) -> dict: return {**expense, "escalated": True}
# Manager approval workflowmanager_workflow = ( Flow("expense-approval") .then(submit_expense) .wait_for_signal("manager_approval", timeout=timedelta(hours=48)) .then(process_approved) .build())
# Director escalation workflowdirector_workflow = ( Flow("expense-escalation") .then(escalate_to_director) .wait_for_signal("director_approval", timeout=timedelta(hours=72)) .then(process_approved) .build())
def process_expense(expense: dict, backend: InMemoryBackend) -> str: workflow_id = f"exp-{expense['id']}"
# Try manager approval status = run_durable_workflow(manager_workflow, workflow_id, expense, backend=backend)
# In production: API endpoint would send signal # For demo, send signal immediately send_signal(workflow_id, "manager_approval", {"approver": "Bob", "decision": "approved"}, backend)
status = resume_workflow(manager_workflow, workflow_id, backend)
# Handle timeout if status.is_failed() and "timeout" in str(status.error()): print("Escalating to director...") escalation_id = f"{workflow_id}-escalated" status = run_durable_workflow(director_workflow, escalation_id, expense, backend=backend)
return status.output() if status.is_completed() else "Processing..."
if __name__ == "__main__": backend = InMemoryBackend()
expense = { "id": 1001, "employee": "Alice Smith", "amount": 250.00, "category": "travel" }
result = process_expense(expense, backend) print(result)use sayiir_runtime::prelude::*;use sayiir_persistence::InMemoryBackend;use serde::{Deserialize, Serialize};use std::time::Duration;
#[derive(Debug, Clone, Serialize, Deserialize)]struct Expense { id: u32, employee: String, amount: f64,}
#[derive(Debug, Clone, Serialize, Deserialize)]struct Approval { approver: String, decision: String,}
#[tokio::main]async fn main() -> anyhow::Result<()> { let backend = InMemoryBackend::new(); let runner = CheckpointingRunner::new(backend.clone());
// Build workflow let ctx = WorkflowContext::new(); let workflow = WorkflowBuilder::new(ctx) .then("submit", |expense: Expense| async move { println!("Submitted expense {} by {}", expense.id, expense.employee); Ok(expense) }) .wait_for_signal("wait", "manager_approval", Some(Duration::from_secs(48 * 3600))) .then("process", |approval: Approval| async move { Ok(format!("Approved by {}", approval.approver)) }) .build();
let expense = Expense { id: 1001, employee: "Alice Smith".into(), amount: 250.00, };
// Submit let status = runner.run(&workflow, "exp-1001", expense).await?; println!("Workflow waiting for approval...");
// Approve let approval = Approval { approver: "Bob Johnson".into(), decision: "approved".into(), }; backend.send_event("exp-1001", "manager_approval", serde_json::to_value(approval)?).await?;
// Resume let status = runner.resume(&workflow, "exp-1001").await?;
if let WorkflowStatus::Completed(output) = status { println!("Result: {}", output); }
Ok(())}Next Steps
Section titled “Next Steps”You now have a working approval workflow with signals. Explore more:
- Signals Guide — Advanced signal patterns and best practices
- Webhooks Integration — Connect external systems via webhooks
- Timeout Strategies — Handle timeouts gracefully
- Multi-Step Approvals — Chain multiple approval stages
Try extending the workflow:
- Add multiple approval levels (manager → director → CFO)
- Implement partial approvals (approve portion of expense)
- Add expense modification requests
- Track approval history and audit trails