Durable Workflows
What makes a workflow durable
Sayiir uses continuation-based execution: after each task completes, it saves a checkpoint (snapshot) of the workflow state, including the current position in the task graph. If the process crashes or restarts, you call resume to continue from the last checkpointed continuation. There is no replay: tasks that already completed are never re-executed.
This checkpoint-resume mechanism ensures that long-running workflows can survive process crashes, deployments, or infrastructure failures without losing progress.
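To make the checkpoint-resume idea concrete, here is a minimal standalone sketch in plain Python. It does not use Sayiir and is not how Sayiir is implemented: the `STEPS` pipeline, the JSON file layout, and `run_from_checkpoint` are all hypothetical names chosen for illustration. It only shows the general shape of saving a position after each step and resuming from it without replaying completed steps.

```python
import json
import os
import tempfile

# Hypothetical three-step pipeline; each step receives the previous output.
STEPS = [
    ("double", lambda x: x * 2),
    ("add_ten", lambda x: x + 10),
    ("square", lambda x: x * x),
]


def run_from_checkpoint(path, initial_input, calls, crash_before=None):
    """Run STEPS in order, persisting position and value after each step."""
    if os.path.exists(path):
        with open(path) as f:
            state = json.load(f)  # resume: pick up the saved position
    else:
        state = {"position": 0, "value": initial_input}

    while state["position"] < len(STEPS):
        name, fn = STEPS[state["position"]]
        if name == crash_before:
            raise RuntimeError(f"simulated crash before {name}")
        calls.append(name)  # record which steps actually executed
        state = {"position": state["position"] + 1, "value": fn(state["value"])}
        with open(path, "w") as f:
            json.dump(state, f)  # checkpoint after every completed step
    return state["value"]


def demo():
    path = os.path.join(tempfile.mkdtemp(), "job-123.json")
    calls = []
    try:
        run_from_checkpoint(path, 3, calls, crash_before="square")
    except RuntimeError:
        pass  # the "process" died after two completed steps
    result = run_from_checkpoint(path, 3, calls)  # resume from the checkpoint
    return result, calls
```

Calling `demo()` returns `(256, ["double", "add_ten", "square"])`: the first two steps run exactly once, and the resumed run executes only the step that had not yet completed.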
InMemory vs PostgreSQL
Sayiir supports two backend types:
- InMemoryBackend: Fast, ephemeral storage for development and testing. State is lost when the process stops.
- PostgresBackend: Durable, persistent storage for production. State survives process restarts.
```python
from sayiir import InMemoryBackend, PostgresBackend

# Development/testing
backend = InMemoryBackend()

# Production
backend = PostgresBackend(
    host="localhost",
    port=5432,
    user="postgres",
    password="password",
    database="sayiir",
)
```

```typescript
import { InMemoryBackend, PostgresBackend } from "sayiir";

// Development/testing
const backend = new InMemoryBackend();

// Production
const backend = PostgresBackend.connect(
  "postgresql://postgres:password@localhost:5432/sayiir"
);
```

```rust
use sayiir_persistence::InMemoryBackend;
use sayiir_postgres::PostgresBackend;
use sayiir_runtime::prelude::*;

// Development/testing
let backend = InMemoryBackend::new();

// Production
let backend = PostgresBackend::<JsonCodec>::connect(
    "postgresql://localhost/sayiir"
).await?;
```

Lifecycle operations
Sayiir provides full control over workflow execution with pause, unpause, cancel, and resume operations. These operations are all durable: the workflow state is persisted and can be resumed even after a process restart.
Cancel a workflow
```python
from sayiir import cancel_workflow

# Cancel a running or paused workflow
cancel_workflow("job-123", backend=backend)
```

```typescript
import { cancelWorkflow } from "sayiir";

// Cancel a running or paused workflow
cancelWorkflow("job-123", backend, {
  reason: "No longer needed",
  cancelledBy: "admin",
});
```

```rust
use sayiir_runtime::WorkflowClient;

let client = WorkflowClient::from_shared(runner.backend().clone());
client
    .cancel("job-123", Some("No longer needed".into()), Some("admin".into()))
    .await?;
```

Pause and unpause
```python
from sayiir import pause_workflow, unpause_workflow

# Pause a running workflow
pause_workflow("job-123", backend=backend, reason="Maintenance window")

# Unpause to allow resumption
unpause_workflow("job-123", backend=backend)
```

```typescript
import { pauseWorkflow, unpauseWorkflow } from "sayiir";

// Pause a running workflow
pauseWorkflow("job-123", backend, {
  reason: "Maintenance window",
  pausedBy: "ops-team",
});

// Unpause to allow resumption
unpauseWorkflow("job-123", backend);
```

```rust
use sayiir_runtime::WorkflowClient;

let client = WorkflowClient::from_shared(runner.backend().clone());

// Pause a running workflow
client
    .pause("job-123", Some("Maintenance window".into()), Some("ops-team".into()))
    .await?;

// Unpause to allow resumption
client.unpause("job-123").await?;
```

Resume after crash or pause
```python
from sayiir import resume_workflow

# Resume from last checkpoint
status = resume_workflow(workflow, "job-123", backend=backend)
print(f"Workflow status: {status}")
```

```typescript
import { resumeWorkflow } from "sayiir";

// Resume from last checkpoint
const status = resumeWorkflow(workflow, "job-123", backend);
console.log(`Workflow status: ${status.status}`);
```

```rust
// Resume from last checkpoint
let status = runner.resume(&workflow, "job-123").await?;
println!("Workflow status: {:?}", status);
```

Durable delays
Delays are checkpointed just like task completions. If a workflow is waiting on a delay when the process crashes, the delay will be resumed from the correct point after recovery.
```python
from datetime import timedelta
from sayiir import task, Flow, run_durable_workflow

@task
def send_email(user_id: int) -> str:
    return f"Email sent to user {user_id}"

@task
def send_reminder(user_id: int) -> str:
    return f"Reminder sent to user {user_id}"

# Wait 24 hours between email and reminder
workflow = (
    Flow("email_campaign")
    .then(send_email)
    .delay(timedelta(hours=24))
    .then(send_reminder)
    .build()
)

status = run_durable_workflow(workflow, "campaign-1", 42, backend=backend)
```

```typescript
import { task, flow, runDurableWorkflow } from "sayiir";

const sendEmail = task("send_email", (userId: number) => {
  return `Email sent to user ${userId}`;
});

const sendReminder = task("send_reminder", (userId: number) => {
  return `Reminder sent to user ${userId}`;
});

// Wait 24 hours between email and reminder
const workflow = flow<number>("email_campaign")
  .then(sendEmail)
  .delay("wait_24h", "24h")
  .then(sendReminder)
  .build();

const status = runDurableWorkflow(workflow, "campaign-1", 42, backend);
```

```rust
use std::time::Duration;
use sayiir_core::prelude::*;

let workflow = WorkflowBuilder::new(ctx)
    .then("send_email", |user_id: u64| async move {
        Ok(format!("Email sent to user {}", user_id))
    })
    .delay("wait_24h", Duration::from_secs(86400))
    .then("send_reminder", |user_id: u64| async move {
        Ok(format!("Reminder sent to user {}", user_id))
    })
    .build()?;

let status = runner.run(&workflow, "campaign-1", 42u64).await?;
```

If the process crashes during the 24-hour delay, calling resume will continue the delay from where it left off, not restart it from zero.
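One common way to get this "continue, don't restart" behavior is to persist an absolute wake-up time when the delay is first entered, then compute the remaining wait from it on resume. The sketch below illustrates that idea in plain Python. Whether Sayiir stores delays this way is an assumption, and `start_delay`, `remaining_delay`, and the `wake_at` key are hypothetical names.

```python
from datetime import datetime, timedelta, timezone


def start_delay(checkpoint, duration, now):
    """Record the absolute wake-up time once, when the delay is first entered."""
    if "wake_at" not in checkpoint:
        checkpoint["wake_at"] = (now + duration).isoformat()
    return checkpoint


def remaining_delay(checkpoint, now):
    """Time still to wait, computed from the persisted deadline."""
    wake_at = datetime.fromisoformat(checkpoint["wake_at"])
    return max(wake_at - now, timedelta(0))


started = datetime(2024, 1, 1, 9, 0, tzinfo=timezone.utc)
checkpoint = start_delay({}, timedelta(hours=24), started)

# The process crashes and is resumed 10 hours later.
resumed = started + timedelta(hours=10)
checkpoint = start_delay(checkpoint, timedelta(hours=24), resumed)  # deadline unchanged
print(remaining_delay(checkpoint, resumed))  # 14:00:00
```

Because the deadline is stored rather than the duration, re-entering the delay after a crash does not reset the clock, and a resume that happens after the deadline waits zero time.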
Idempotency and conflict policies
Every durable workflow run requires an instance_id. By default, calling run with an instance_id that already has a snapshot will fail, which prevents accidental overwrites of in-flight state. You can change this behavior with a conflict policy:
| Policy | Behavior |
|---|---|
| fail (default) | Raises an error if a snapshot already exists for this instance_id. |
| use_existing | Returns the existing workflow status without re-running. Useful for at-least-once submission patterns where the caller may retry. |
| terminate_existing | Deletes the existing snapshot and starts a fresh execution. Use when you want to force-restart a stuck or stale workflow. |
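The three policies in the table can be modeled with a small dictionary-backed sketch. This is a conceptual illustration only, not Sayiir's implementation: `submit`, `InstanceExists`, and the `snapshots` dict are hypothetical stand-ins for the real runner, error type, and backend.

```python
class InstanceExists(Exception):
    """Hypothetical stand-in for a duplicate-instance error."""


def submit(snapshots, instance_id, run, policy="fail"):
    """Start run() under instance_id, resolving an existing snapshot per policy."""
    if instance_id in snapshots:
        if policy == "fail":
            raise InstanceExists(instance_id)
        elif policy == "use_existing":
            return snapshots[instance_id]  # return stored status, do not re-run
        elif policy == "terminate_existing":
            del snapshots[instance_id]  # drop the old snapshot, then start fresh
        else:
            raise ValueError(f"unknown policy: {policy}")
    snapshots[instance_id] = run()
    return snapshots[instance_id]


snapshots = {}
executions = []


def run():
    executions.append("ran")
    return f"completed (execution {len(executions)})"


first = submit(snapshots, "order-42", run)
retry = submit(snapshots, "order-42", run, policy="use_existing")
forced = submit(snapshots, "order-42", run, policy="terminate_existing")
```

Here `retry` equals `first` because nothing was re-run, while `forced` reflects a second execution. A further call with the default policy would raise `InstanceExists`.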
```python
from sayiir import run_durable_workflow, InMemoryBackend

backend = InMemoryBackend()

# Default: fail if instance already exists
status = run_durable_workflow(workflow, "order-42", input_data, backend=backend)

# Idempotent submission: safe to call multiple times
status = run_durable_workflow(
    workflow, "order-42", input_data,
    backend=backend,
    conflict_policy="use_existing",
)

# Force restart
status = run_durable_workflow(
    workflow, "order-42", input_data,
    backend=backend,
    conflict_policy="terminate_existing",
)
```

The convenience function run_workflow also accepts conflict_policy when used with instance_id:
```python
from sayiir import run_workflow

result = run_workflow(
    workflow, input_data,
    instance_id="order-42",
    backend=backend,
    conflict_policy="use_existing",
)
```

```typescript
import { runDurableWorkflow, runWorkflow, InMemoryBackend } from "sayiir";

const backend = new InMemoryBackend();

// Default: fail if instance already exists
const status = runDurableWorkflow(workflow, "order-42", input, backend);

// Idempotent submission: safe to call multiple times
const status = runDurableWorkflow(workflow, "order-42", input, backend, "useExisting");

// Force restart
const status = runDurableWorkflow(workflow, "order-42", input, backend, "terminateExisting");

// Or via runWorkflow options
const result = await runWorkflow(workflow, input, {
  instanceId: "order-42",
  backend,
  conflictPolicy: "useExisting",
});
```

```rust
use sayiir_runtime::prelude::*;
use sayiir_core::workflow::ConflictPolicy;

let backend = InMemoryBackend::new();

// Default: Fail
let runner = CheckpointingRunner::new(backend.clone());
let status = runner.run(&workflow, "order-42", input).await?;

// Idempotent submission
let runner = CheckpointingRunner::new(backend.clone())
    .with_conflict_policy(ConflictPolicy::UseExisting);
let status = runner.run(&workflow, "order-42", input).await?;

// Force restart
let runner = CheckpointingRunner::new(backend.clone())
    .with_conflict_policy(ConflictPolicy::TerminateExisting);
let status = runner.run(&workflow, "order-42", input).await?;
```

Catching duplicate instance errors
When using the default fail policy, you can catch the specific error:
```python
from sayiir import run_durable_workflow, InstanceAlreadyExistsError

try:
    run_durable_workflow(workflow, "order-42", input_data, backend=backend)
except InstanceAlreadyExistsError:
    print("Workflow already running for this instance ID")
```

```typescript
import { runDurableWorkflow, WorkflowError } from "sayiir";

try {
  runDurableWorkflow(workflow, "order-42", input, backend);
} catch (e) {
  if (e instanceof WorkflowError && e.message.includes("already exists")) {
    console.log("Workflow already running for this instance ID");
  }
}
```

```rust
use sayiir_runtime::RuntimeError;

match runner.run(&workflow, "order-42", input).await {
    Err(RuntimeError::InstanceAlreadyExists(id)) => {
        println!("Workflow already running: {id}");
    }
    other => { /* handle normally */ }
}
```