Skip to content

Tutorial: Approval Workflow (Signals)

Learn how to build human-in-the-loop workflows using signals. We’ll create an expense approval system that waits for manager approval, handles timeouts, and auto-escalates to upper management.

In this tutorial you'll build an expense approval workflow that:

  • Submits expense for approval
  • Waits for manager decision (with 48-hour timeout)
  • Processes approved expenses
  • Auto-escalates to director on timeout

Signals let workflows pause and wait for external events like:

  • User approvals
  • Webhook callbacks
  • Manual interventions
  • Cancellation requests

Traditional workflows execute start-to-finish without human input. With signals:

  1. Workflow reaches a wait_for_signal() step and parks
  2. External system sends a signal with approval/rejection data
  3. Workflow resumes and continues with the signal payload
  4. An optional timeout triggers escalation if no response arrives
from datetime import timedelta
from sayiir import (
task,
Flow,
run_durable_workflow,
send_signal,
resume_workflow,
InMemoryBackend
)
@task
def submit_expense(expense: dict) -> dict:
    """Mark a newly submitted expense as awaiting approval.

    Returns a new dict holding every field of ``expense`` plus a ``status``
    of ``"pending_approval"`` and a ``submitted_at`` timestamp. The input
    dict itself is left untouched.
    """
    submission = dict(expense)
    submission["status"] = "pending_approval"
    # Hard-coded placeholder timestamp used by the tutorial.
    submission["submitted_at"] = "2026-02-15T10:00:00Z"
    return submission
@task
def process_approved(approval: dict) -> str:
    """Build the reimbursement confirmation for an approved expense.

    Reads ``approver`` (default ``"unknown"``) and ``notes`` (default ``""``)
    from ``approval`` and returns a human-readable confirmation message.
    """
    # In production: trigger payment system
    # stripe.Transfer.create(amount=..., destination=...)
    who = approval.get("approver", "unknown")
    remarks = approval.get("notes", "")
    return f"Expense approved by {who} — processing reimbursement. Notes: {remarks}"
# Assemble the flow: submit -> park on the manager signal (48 h limit) -> process
workflow = (
    Flow("expense-approval")
    .then(submit_expense)
    .wait_for_signal("manager_approval", timeout=timedelta(hours=48))
    .then(process_approved)
    .build()
)

backend = InMemoryBackend()

# Step 1: start the workflow with the expense as its initial input
expense_data = {
    "employee": "Alice Smith",
    "amount": 250.00,
    "category": "travel",
    "description": "Client meeting transportation",
}
status = run_durable_workflow(
    workflow=workflow,
    workflow_id="exp-001",
    initial_input=expense_data,
    backend=backend,
)
print(f"Workflow status: {status.state()}")  # "waiting_for_signal"
print("Expense submitted — awaiting manager approval...")

# Step 2: the manager's decision arrives (could be minutes or days later)
send_signal(
    workflow_id="exp-001",
    signal_name="manager_approval",
    payload={
        "approver": "Bob Johnson",
        "decision": "approved",
        "notes": "Valid expense, approved",
    },
    backend=backend,
)
print("Signal sent — resuming workflow...")

# Step 3: resume so the parked workflow consumes the signal payload
status = resume_workflow(
    workflow=workflow,
    workflow_id="exp-001",
    backend=backend,
)
print(f"Final status: {status.state()}")  # "completed"
print(f"Result: {status.output()}")
# "Expense approved by Bob Johnson — processing reimbursement. Notes: Valid expense, approved"

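Handle rejected expenses by adding a routing step after the signal. In the workflow below, `route_decision` raises on a rejection, so `process_approved` never runs for a rejected expense; `process_rejected` is defined for use in your own failure handling and is not wired into the flow shown here: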

@task
def process_rejected(rejection: dict) -> str:
    """Build the notification message for a rejected expense.

    Reads ``approver`` (default ``"unknown"``) and ``reason`` (default
    ``"No reason provided"``) from ``rejection``.
    """
    who = rejection.get("approver", "unknown")
    why = rejection.get("reason", "No reason provided")
    return f"Expense rejected by {who}. Reason: {why}"
@task
def route_decision(approval: dict) -> dict:
    """Pass approved decisions through; raise on anything else.

    Args:
        approval: Signal payload. ``decision`` must equal ``"approved"`` for
            the payload to be forwarded to the next step.

    Returns:
        The unchanged ``approval`` dict when the decision is ``"approved"``.

    Raises:
        ValueError: If the decision is missing or not ``"approved"``. The
            message carries the payload's ``reason``, falling back to
            ``"No reason provided"`` (the same default ``process_rejected``
            uses) instead of the literal text ``None``.
    """
    if approval.get("decision") == "approved":
        return approval

    reason = approval.get("reason", "No reason provided")
    raise ValueError(f"Rejected: {reason}")
# Version 2 of the flow: route_decision sits between the signal wait and
# process_approved, so only approved payloads reach the processing step.
workflow = (
    Flow("expense-approval-v2")
    .then(submit_expense)
    .wait_for_signal("manager_approval", timeout=timedelta(hours=48))
    .then(route_decision)
    .then(process_approved)
    .build()
)

What happens when the 48-hour timeout expires?

from sayiir import get_workflow_status, WorkflowState
# Submit expense
status = run_durable_workflow(
    workflow=workflow,
    workflow_id="exp-002",
    initial_input=expense_data,
    backend=backend,
)

# Simulate 48 hours passing with no approval...
# (In production, a background job would check timeouts)

# After timeout, workflow fails with timeout error
status = get_workflow_status("exp-002", backend)
if status.state() == WorkflowState.FAILED:
    # str() keeps the check safe if error() returns None or a non-string
    # object; the other timeout checks in this tutorial wrap it the same way.
    error = str(status.error())
    if "timeout" in error.lower():
        print("Approval timeout — escalating to director")
        # Trigger escalation workflow

Implement automatic escalation by starting a second, director-level workflow when the manager workflow times out:

@task
def escalate_to_director(expense: dict) -> dict:
    """Flag an expense as escalated to the director.

    Returns a copy of ``expense`` with ``escalated`` set to ``True`` and
    ``escalated_to`` set to ``"director"``.
    """
    escalated = dict(expense)
    escalated["escalated"] = True
    escalated["escalated_to"] = "director"
    return escalated
# Director-level flow: flag the expense as escalated, then park on the
# director's signal with a longer (72 h) limit before processing.
escalation_workflow = (
    Flow("expense-escalation")
    .then(escalate_to_director)
    .wait_for_signal("director_approval", timeout=timedelta(hours=72))
    .then(process_approved)
    .build()
)
# Main workflow with fallback
def submit_with_escalation(expense: dict) -> str:
    """Submit an expense for manager approval and escalate on timeout.

    Args:
        expense: Expense data; must contain an ``id`` key, which is used to
            derive the workflow id ``exp-<id>``.

    Returns:
        ``status.output()`` of the last workflow that was run or resumed:
        the manager workflow, or the escalation workflow when the manager
        step failed with a timeout error.
        NOTE(review): a freshly started escalation workflow is presumably
        still waiting for its signal, so the output may not be a string
        yet; confirm against the library.

    Raises:
        KeyError: If ``expense`` has no ``id`` key.
    """
    # Derive the id once so all three calls are guaranteed to agree.
    workflow_id = f"exp-{expense['id']}"

    # Try manager approval first
    run_durable_workflow(
        workflow=workflow,
        workflow_id=workflow_id,
        initial_input=expense,
        backend=backend,
    )

    # Wait for signal (this would be handled by your API/webhook)
    status = resume_workflow(
        workflow=workflow,
        workflow_id=workflow_id,
        backend=backend,
    )

    # If manager timeout, escalate. The match is case-insensitive so it
    # agrees with the standalone timeout check shown earlier.
    if status.is_failed() and "timeout" in str(status.error()).lower():
        print("Manager timeout — escalating to director")
        status = run_durable_workflow(
            workflow=escalation_workflow,
            workflow_id=f"{workflow_id}-escalated",
            initial_input=expense,
            backend=backend,
        )

    return status.output()
use sayiir_runtime::prelude::*;
use sayiir_persistence::InMemoryBackend;
use serde::{Deserialize, Serialize};
use std::time::Duration;
/// Expense as submitted by an employee; the input type of the
/// `submit_expense` step in `main`.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct Expense {
/// Name of the submitting employee (e.g. "Alice Smith").
employee: String,
/// Amount claimed; currency is not specified here.
amount: f64,
/// Expense category (e.g. "travel"); not copied into `PendingExpense`.
category: String,
/// Free-text description; not copied into `PendingExpense`.
description: String,
}
/// Output of the `submit_expense` step: the expense while it awaits a decision.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct PendingExpense {
/// Copied from `Expense::employee`.
employee: String,
/// Copied from `Expense::amount`.
amount: f64,
/// Set to "pending_approval" by the `submit_expense` step.
status: String,
}
/// Payload of the `manager_approval` signal; serialized to JSON when sent
/// and received as the input of the `process_approved` step.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct Approval {
/// Name of the person who made the decision.
approver: String,
/// Decision text (the example sends "approved"); not read by `process_approved`.
decision: String,
/// Free-text notes included in the confirmation message.
notes: String,
}
/// Runs the expense-approval example end to end against an in-memory backend:
/// start the workflow, send the manager's approval signal, then resume.
///
/// Returns an error if running the workflow, serializing the approval,
/// sending the signal, or resuming fails.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let backend = InMemoryBackend::new();
    let ctx = WorkflowContext::new();

    // Build workflow: submit -> wait for manager signal -> process approval
    let workflow = WorkflowBuilder::new(ctx)
        .then("submit_expense", |expense: Expense| async move {
            Ok(PendingExpense {
                employee: expense.employee,
                amount: expense.amount,
                status: "pending_approval".into(),
            })
        })
        // NOTE(review): the first argument looks like the step name and the
        // second the signal name — confirm against the library docs.
        .wait_for_signal(
            "approval_wait",
            "manager_approval",
            Some(Duration::from_secs(48 * 3600)), // 48 hours
        )
        .then("process_approved", |approval: Approval| async move {
            Ok(format!(
                "Expense approved by {} — processing reimbursement. Notes: {}",
                approval.approver,
                approval.notes
            ))
        })
        .build();

    let runner = CheckpointingRunner::new(backend.clone());

    // Step 1: Submit expense
    let expense = Expense {
        employee: "Alice Smith".into(),
        amount: 250.00,
        category: "travel".into(),
        description: "Client meeting transportation".into(),
    };
    // The status returned here is not inspected; the underscore prefix avoids
    // an unused-variable warning while keeping the `?` error propagation.
    let _parked_status = runner.run(&workflow, "exp-001", expense).await?;
    println!("Workflow parked, awaiting signal...");

    // Step 2: Manager approves
    let approval = Approval {
        approver: "Bob Johnson".into(),
        decision: "approved".into(),
        notes: "Valid expense, approved".into(),
    };
    backend
        .send_event("exp-001", "manager_approval", serde_json::to_value(approval)?)
        .await?;
    println!("Signal sent!");

    // Step 3: Resume workflow
    let status = runner.resume(&workflow, "exp-001").await?;
    match status {
        WorkflowStatus::Completed(output) => {
            println!("Completed: {}", output);
        }
        // Bind the non-completed status directly rather than re-reading the
        // matched value.
        other => {
            println!("Status: {:?}", other);
        }
    }

    Ok(())
}

Use InMemoryBackend for fast testing without database setup:

import time

from sayiir import (
    InMemoryBackend,
    get_workflow_status,
    resume_workflow,
    run_durable_workflow,
    send_signal,
)

# A fresh in-memory backend keeps the test isolated from any database.
backend = InMemoryBackend()

# Submit workflow
status = run_durable_workflow(workflow, "test-001", expense_data, backend=backend)

# Simulate delay
time.sleep(0.1)

# Send signal
send_signal("test-001", "manager_approval", {"decision": "approved"}, backend=backend)

# Resume
status = resume_workflow(workflow, "test-001", backend=backend)
assert status.is_completed()
assert "approved" in status.output()

Benefits for testing:

  • No Postgres setup required
  • Instant execution
  • Perfect for unit tests
  • Same API as PostgresBackend

Here’s how to integrate with a web API:

from fastapi import FastAPI, HTTPException
from sayiir import (
    PostgresBackend,
    get_workflow_status,
    resume_workflow,  # used by the approve endpoint below
    run_durable_workflow,
    send_signal,
)

app = FastAPI()
# Shared backend used by every endpoint in this module.
backend = PostgresBackend("postgresql://localhost/expenses")
# NOTE(review): this handler shares its name with the `submit_expense` task
# used when building the workflow; if both live in one module this definition
# rebinds the name — confirm they are kept in separate modules.
@app.post("/expenses")
async def submit_expense(expense: dict):
    """Submit new expense for approval.

    Starts a workflow with id ``exp-<expense['id']>`` and returns that id
    together with the fixed status ``"pending_approval"``.

    Raises:
        KeyError: If ``expense`` has no ``id`` key.
    """
    workflow_id = f"exp-{expense['id']}"
    # The returned status is not needed for the response, so it is not bound.
    run_durable_workflow(
        workflow=workflow,
        workflow_id=workflow_id,
        initial_input=expense,
        backend=backend,
    )
    return {
        "workflow_id": workflow_id,
        "status": "pending_approval",
    }
@app.post("/expenses/{workflow_id}/approve")
async def approve_expense(workflow_id: str, approval: dict):
    """Manager approves expense.

    Forwards ``approval`` as the ``manager_approval`` signal payload, resumes
    the workflow, and returns its id, state, and output. ``result`` is
    ``None`` unless the workflow completed.
    """
    send_signal(
        workflow_id=workflow_id,
        signal_name="manager_approval",
        payload=approval,
        backend=backend,
    )

    # Resume workflow; keyword arguments match the other resume_workflow calls.
    status = resume_workflow(
        workflow=workflow,
        workflow_id=workflow_id,
        backend=backend,
    )
    return {
        "workflow_id": workflow_id,
        "status": status.state(),
        "result": status.output() if status.is_completed() else None,
    }
@app.get("/expenses/{workflow_id}")
async def get_expense_status(workflow_id: str):
    """Report the current state of an expense workflow.

    ``result`` is populated only for completed workflows and ``error`` only
    for failed ones; otherwise each is ``None``.
    """
    status = get_workflow_status(workflow_id, backend)

    state = status.state()
    result = None
    if status.is_completed():
        result = status.output()
    error = None
    if status.is_failed():
        error = status.error()

    return {
        "workflow_id": workflow_id,
        "status": state,
        "result": result,
        "error": error,
    }
from datetime import timedelta
from sayiir import (
task,
Flow,
run_durable_workflow,
send_signal,
resume_workflow,
InMemoryBackend,
get_workflow_status,
WorkflowState
)
@task
def submit_expense(expense: dict) -> dict:
    """Return a copy of ``expense`` with ``status`` set to ``"pending_approval"``."""
    pending = dict(expense)
    pending["status"] = "pending_approval"
    return pending
@task
def process_approved(approval: dict) -> str:
    """Build the reimbursement confirmation message.

    Requires an ``approver`` key in ``approval``; a missing key raises
    ``KeyError``.
    """
    approver = approval['approver']
    return f"Expense approved by {approver} — processing reimbursement"
@task
def escalate_to_director(expense: dict) -> dict:
    """Return a copy of ``expense`` with ``escalated`` set to ``True``."""
    flagged = dict(expense)
    flagged["escalated"] = True
    return flagged
# First-level flow: submit, park on the manager's signal (48 h limit), process.
manager_workflow = (
    Flow("expense-approval")
    .then(submit_expense)
    .wait_for_signal("manager_approval", timeout=timedelta(hours=48))
    .then(process_approved)
    .build()
)

# Second-level flow: flag as escalated, park on the director's signal
# (72 h limit), process.
director_workflow = (
    Flow("expense-escalation")
    .then(escalate_to_director)
    .wait_for_signal("director_approval", timeout=timedelta(hours=72))
    .then(process_approved)
    .build()
)
def process_expense(expense: dict, backend: InMemoryBackend) -> str:
    """Run the manager approval flow for ``expense``, escalating on timeout.

    Args:
        expense: Expense data; must contain an ``id`` key, used to derive
            the workflow id ``exp-<id>``.
        backend: Backend passed to every workflow call.

    Returns:
        The workflow output when the last workflow run completed, otherwise
        the placeholder string ``"Processing..."``.

    Raises:
        KeyError: If ``expense`` has no ``id`` key.
    """
    workflow_id = f"exp-{expense['id']}"

    # Try manager approval
    run_durable_workflow(manager_workflow, workflow_id, expense, backend=backend)

    # In production: API endpoint would send signal
    # For demo, send signal immediately
    send_signal(
        workflow_id,
        "manager_approval",
        {"approver": "Bob", "decision": "approved"},
        backend=backend,
    )
    status = resume_workflow(manager_workflow, workflow_id, backend=backend)

    # Handle timeout. The match is case-insensitive so it agrees with the
    # standalone timeout check shown earlier in this tutorial.
    if status.is_failed() and "timeout" in str(status.error()).lower():
        print("Escalating to director...")
        escalation_id = f"{workflow_id}-escalated"
        status = run_durable_workflow(
            director_workflow, escalation_id, expense, backend=backend
        )

    return status.output() if status.is_completed() else "Processing..."
if __name__ == "__main__":
    # Demo run: one sample expense against a throwaway in-memory backend.
    backend = InMemoryBackend()
    expense = {
        "id": 1001,
        "employee": "Alice Smith",
        "amount": 250.00,
        "category": "travel",
    }
    result = process_expense(expense, backend)
    print(result)

You now have a working approval workflow with signals. Explore more:

Try extending the workflow:

  • Add multiple approval levels (manager → director → CFO)
  • Implement partial approvals (approve portion of expense)
  • Add expense modification requests
  • Track approval history and audit trails