Signals & Events
What are signals
Signals let workflows pause and wait for external events (such as user approvals, webhook callbacks, or manual triggers) before continuing execution. When a workflow waits for a signal, it releases the worker thread, making it available for other work.
Signals are durable: they are persisted to the backend and survive process restarts. Once a signal is received, the workflow can be resumed from where it left off.
Waiting for signals
Use wait_for_signal to pause a workflow until an external event arrives. The workflow will checkpoint its state and stop execution at that point.
from datetime import timedelta
from sayiir import task, Flow

@task
def create_order(order_id: int) -> dict:
    return {"order_id": order_id, "status": "pending"}

@task
def fulfill(approval: dict) -> str:
    return f"Fulfilled order (approved by {approval.get('by', 'unknown')})"

workflow = (
    Flow("approval")
    .then(create_order)
    .wait_for_signal("manager_approval", timeout=timedelta(hours=24))
    .then(fulfill)
    .build()
)

The workflow will:
- Execute create_order
- Checkpoint and wait for the “manager_approval” signal
- Once received, execute fulfill with the signal payload
use std::time::Duration;
use sayiir_core::prelude::*;
use serde_json::json;

let workflow = WorkflowBuilder::new(ctx)
    .then("create_order", |id: u64| async move {
        Ok(json!({ "order_id": id, "status": "pending" }))
    })
    .wait_for_signal("approval_wait", "manager_approval", Some(Duration::from_secs(86400)))
    .then("fulfill", |approval: serde_json::Value| async move {
        Ok(format!("Fulfilled (approved by {})", approval["by"]))
    })
    .build()?;

The first string argument to wait_for_signal is the node name; the second is the signal name.
Sending signals
External systems send signals using send_signal (Python) or backend.send_event (Rust). You must provide:
- The workflow instance ID
- The signal name (must match the name in wait_for_signal)
- A payload (any JSON-serializable data)
from sayiir import send_signal, InMemoryBackend

backend = InMemoryBackend()

# External system sends the signal
send_signal(
    "order-42",
    "manager_approval",
    {"by": "alice", "approved_at": "2026-02-15T10:30:00Z"},
    backend=backend
)

use serde_json::json;

let payload = json!({
    "by": "alice",
    "approved_at": "2026-02-15T10:30:00Z"
});

backend.send_event("order-42", "manager_approval", payload).await?;

Buffering semantics
Signals are durably buffered in the backend. This means:
- If a signal is sent before the workflow reaches wait_for_signal, it will be consumed immediately when the workflow gets there.
- If a signal is sent after the workflow reaches wait_for_signal, the workflow will resume as soon as resume is called.
- There are no race conditions: signals are never lost.
This buffering ensures that external systems can send signals at any time without worrying about timing.
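The two orderings described above can be illustrated with a toy model. This is only a sketch under stated assumptions: ToySignalBuffer and its methods are hypothetical names, it keeps everything in memory, and it is not Sayiir's backend or API. It shows why a send-then-wait and a wait-then-send sequence both end with the payload delivered.

```python
from collections import defaultdict, deque


class ToySignalBuffer:
    """Hypothetical in-memory stand-in for a signal buffer (not Sayiir's API)."""

    def __init__(self):
        # (instance_id, signal_name) -> queue of payloads not yet consumed
        self._pending = defaultdict(deque)

    def send(self, instance_id, signal_name, payload):
        # Sending never requires a waiting workflow; the payload is just stored.
        self._pending[(instance_id, signal_name)].append(payload)

    def try_consume(self, instance_id, signal_name):
        # Called when the workflow reaches (or resumes at) its wait point.
        queue = self._pending[(instance_id, signal_name)]
        return queue.popleft() if queue else None


buffer = ToySignalBuffer()

# Case 1: the signal arrives before the workflow reaches its wait point.
buffer.send("order-42", "manager_approval", {"by": "alice"})
early = buffer.try_consume("order-42", "manager_approval")

# Case 2: the workflow reaches its wait point first and finds nothing yet.
nothing_yet = buffer.try_consume("order-43", "manager_approval")
buffer.send("order-43", "manager_approval", {"by": "bob"})
late = buffer.try_consume("order-43", "manager_approval")  # checked again on resume
```

In both cases the payload is stored independently of the workflow's progress, so neither ordering drops it. A durable backend would persist the queue instead of holding it in a dictionary.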
from sayiir import run_durable_workflow, send_signal, resume_workflow

# Start the workflow
status = run_durable_workflow(workflow, "order-42", 42, backend=backend)
print(status)  # WaitingForSignal

# Signal can be sent immediately or hours later
send_signal("order-42", "manager_approval", {"by": "alice"}, backend=backend)

# Resume to continue execution
status = resume_workflow(workflow, "order-42", backend=backend)
print(status)  # Completed

use sayiir_runtime::CheckpointingRunner;

let runner = CheckpointingRunner::new(backend.clone());

// Start the workflow
let status = runner.run(&workflow, "order-42", 42u64).await?;
println!("{:?}", status); // WaitingForSignal

// Signal can be sent immediately or hours later
let payload = json!({"by": "alice"});
backend.send_event("order-42", "manager_approval", payload).await?;

// Resume to continue execution
let status = runner.resume(&workflow, "order-42").await?;
println!("{:?}", status); // Completed

Timeout patterns
You can specify an optional timeout when waiting for a signal. If the timeout expires before the signal is received, the workflow will fail with a timeout error.
from datetime import timedelta

# Wait up to 24 hours for approval
workflow = (
    Flow("approval")
    .then(create_order)
    .wait_for_signal("manager_approval", timeout=timedelta(hours=24))
    .then(fulfill)
    .build()
)

# If no signal arrives within 24 hours, the workflow fails

use std::time::Duration;

// Wait up to 24 hours for approval
let workflow = WorkflowBuilder::new(ctx)
    .then("create_order", |id: u64| async move {
        Ok(json!({ "order_id": id, "status": "pending" }))
    })
    .wait_for_signal(
        "approval_wait",
        "manager_approval",
        Some(Duration::from_secs(86400)) // 24 hours
    )
    .then("fulfill", |approval: serde_json::Value| async move {
        Ok(format!("Fulfilled (approved by {})", approval["by"]))
    })
    .build()?;

// If no signal arrives within 24 hours, the workflow fails

Timeouts are also durable. If the process crashes during a timeout period, the remaining time will be calculated correctly when the workflow is resumed.
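One common way to get a timeout that survives restarts is to persist an absolute deadline rather than a running countdown, and to recompute the remaining time from it on resume. The sketch below assumes that approach for illustration; it is not necessarily how Sayiir stores timeouts, and make_deadline and remaining are hypothetical helper names.

```python
from datetime import datetime, timedelta, timezone


def make_deadline(started_at: datetime, timeout: timedelta) -> str:
    # Persist an absolute deadline (ISO 8601 string) instead of a countdown.
    return (started_at + timeout).isoformat()


def remaining(deadline_iso: str, now: datetime) -> timedelta:
    # After a restart, the remaining time is derived from the stored deadline.
    left = datetime.fromisoformat(deadline_iso) - now
    return max(left, timedelta(0))


started = datetime(2026, 2, 15, 10, 0, tzinfo=timezone.utc)
stored = make_deadline(started, timedelta(hours=24))

# The process crashes and comes back 5 hours later: 19 hours are left.
left = remaining(stored, started + timedelta(hours=5))

# Coming back after the deadline leaves nothing: the wait has timed out.
expired = remaining(stored, started + timedelta(hours=30))
```

Because only the deadline is stored, time spent while the process was down counts against the timeout automatically.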
Complete example
Here’s a full example showing the lifecycle of a workflow with signals:
from datetime import timedelta
from sayiir import (
    task,
    Flow,
    run_durable_workflow,
    send_signal,
    resume_workflow,
    InMemoryBackend
)

@task
def create_order(order_id: int) -> dict:
    print(f"Creating order {order_id}")
    return {"order_id": order_id, "status": "pending"}

@task
def fulfill(approval: dict) -> str:
    approver = approval.get("by", "unknown")
    print(f"Fulfilling order (approved by {approver})")
    return f"Fulfilled order (approved by {approver})"

workflow = (
    Flow("approval")
    .then(create_order)
    .wait_for_signal("manager_approval", timeout=timedelta(hours=24))
    .then(fulfill)
    .build()
)

backend = InMemoryBackend()

# Start workflow
status = run_durable_workflow(workflow, "order-42", 42, backend=backend)
print(f"Status: {status}")  # WaitingForSignal

# External system sends approval
send_signal("order-42", "manager_approval", {"by": "alice"}, backend=backend)

# Resume workflow
status = resume_workflow(workflow, "order-42", backend=backend)
print(f"Final status: {status}")  # Completed

use sayiir_core::prelude::*;
use sayiir_persistence::InMemoryBackend;
use sayiir_runtime::CheckpointingRunner;
use serde_json::json;
use std::time::Duration;
#[task]
async fn create_order(id: u64) -> Result<serde_json::Value, BoxError> {
    println!("Creating order {}", id);
    Ok(json!({ "order_id": id, "status": "pending" }))
}

#[task]
async fn fulfill(approval: serde_json::Value) -> Result<String, BoxError> {
    let approver = approval["by"].as_str().unwrap_or("unknown");
    println!("Fulfilling order (approved by {})", approver);
    Ok(format!("Fulfilled order (approved by {})", approver))
}

#[tokio::main]
async fn main() -> Result<(), BoxError> {
    let ctx = WorkflowContext::default();
    let backend = InMemoryBackend::new();
    let runner = CheckpointingRunner::new(backend.clone());

    let workflow = WorkflowBuilder::new(ctx)
        .with_registry()
        .then_fn(create_order)
        .wait_for_signal(
            "approval_wait",
            "manager_approval",
            Some(Duration::from_secs(86400))
        )
        .then_fn(fulfill)
        .build()?;

    // Start workflow
    let status = runner.run(&workflow, "order-42", 42u64).await?;
    println!("Status: {:?}", status); // WaitingForSignal

    // External system sends approval
    let payload = json!({"by": "alice"});
    backend.send_event("order-42", "manager_approval", payload).await?;

    // Resume workflow
    let status = runner.resume(&workflow, "order-42").await?;
    println!("Final status: {:?}", status); // Completed

    Ok(())
}
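In the example above the signal is sent from the same script. In practice the sender is often a separate service, such as a webhook receiver. The following is a rough sketch of that side only. The function forward_approval and the request body layout are hypothetical. The send argument is any callable taking (instance_id, signal_name, payload), which in a real service might wrap send_signal with a configured backend.

```python
import json


def forward_approval(raw_body: bytes, send) -> bool:
    """Parse a webhook body and forward it as a "manager_approval" signal.

    The body layout ({"order": ..., "approver": ...}) is an assumption
    made for this sketch, not a format defined by Sayiir.
    """
    try:
        body = json.loads(raw_body)
        instance_id = f"order-{body['order']}"
        payload = {"by": body["approver"]}
    except (ValueError, KeyError, TypeError):
        return False  # malformed request: nothing is sent
    send(instance_id, "manager_approval", payload)
    return True


# Stand-in sender that records calls instead of contacting a backend.
sent = []
ok = forward_approval(
    b'{"order": 42, "approver": "alice"}',
    lambda *args: sent.append(args),
)
bad = forward_approval(b"not json", lambda *args: sent.append(args))
```

Passing the sender in as a parameter keeps the parsing logic testable without a running backend. Per the buffering semantics, the handler does not need to check whether the workflow has reached its wait point yet.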