
Durable Workflows

Sayiir uses continuation-based execution. After each task completes, it saves a checkpoint (snapshot) of the workflow state, including the current position in the task graph. If the process crashes or restarts, you call resume_workflow to continue from the last checkpointed continuation. There is no replay: tasks that already completed are never re-executed.

This checkpoint-resume mechanism ensures that long-running workflows can survive process crashes, deployments, or infrastructure failures without losing progress.
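The checkpoint-resume model can be illustrated with a toy, self-contained sketch in plain Python. It does not use Sayiir. The functions `start`, `resume`, and `run_from_checkpoint`, the dict used as a store, and the `{"position", "value"}` snapshot layout are all hypothetical stand-ins. The sketch shows why a crash between two tasks does not cause completed tasks to run again: each checkpoint records the next position in a linear task list, so resuming starts exactly there.

```python
from typing import Any, Callable, Optional

# Hypothetical toy model; not Sayiir's API or snapshot format.
Task = tuple[str, Callable[[Any], Any]]


class SimulatedCrash(Exception):
    """Stands in for the process dying between two tasks."""


def run_from_checkpoint(tasks: list[Task], store: dict, instance_id: str,
                        log: list[str], crash_before: Optional[str] = None) -> Any:
    # Load the last persisted continuation: where to go next, and the latest output.
    snapshot = store[instance_id]
    position, value = snapshot["position"], snapshot["value"]
    while position < len(tasks):
        name, fn = tasks[position]
        if name == crash_before:
            raise SimulatedCrash(name)
        value = fn(value)
        log.append(name)  # record every task that actually executes
        position += 1
        # Checkpoint after each completed task.
        store[instance_id] = {"position": position, "value": value}
    return value


def start(tasks, store, instance_id, input_value, log, crash_before=None):
    store[instance_id] = {"position": 0, "value": input_value}
    return run_from_checkpoint(tasks, store, instance_id, log, crash_before)


def resume(tasks, store, instance_id, log):
    # No replay: execution continues from the stored position.
    return run_from_checkpoint(tasks, store, instance_id, log)


tasks: list[Task] = [
    ("double", lambda x: x * 2),
    ("add_one", lambda x: x + 1),
    ("square", lambda x: x * x),
]
store: dict = {}
log: list[str] = []

try:
    start(tasks, store, "job-1", 5, log, crash_before="square")
except SimulatedCrash:
    pass  # the "process restarts" here; the store still holds the checkpoint

result = resume(tasks, store, "job-1", log)
print(result, log)  # 121 ['double', 'add_one', 'square']
```

Each task appears in the log exactly once, even though the run was interrupted. In a real deployment the store would be a persistent backend and each snapshot would be serialized; the dict only keeps the sketch short.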

Sayiir supports two backend types:

  • InMemoryBackend: Fast, ephemeral storage for development and testing. State is lost when the process stops.
  • PostgresBackend: Durable, persistent storage for production. State survives process restarts.
```python
from sayiir import InMemoryBackend, PostgresBackend

# Development/testing
backend = InMemoryBackend()

# Production
backend = PostgresBackend(
    host="localhost",
    port=5432,
    user="postgres",
    password="password",
    database="sayiir",
)
```
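The practical difference between the two backend types is whether snapshots outlive the process. The sketch below models this with two hypothetical stores built only on the standard library. A dict-backed store stands in for an in-memory backend, and a `sqlite3` file stands in for a database such as PostgreSQL. The `SnapshotStore` protocol and its `save`/`load` methods are illustrative assumptions, not Sayiir's backend interface. A "restart" is simulated by creating a fresh store object.

```python
import json
import os
import sqlite3
import tempfile
from typing import Optional, Protocol


class SnapshotStore(Protocol):
    """Hypothetical minimal backend interface (not Sayiir's)."""

    def save(self, instance_id: str, snapshot: dict) -> None: ...
    def load(self, instance_id: str) -> Optional[dict]: ...


class DictStore:
    """Ephemeral: snapshots live only as long as this Python object."""

    def __init__(self) -> None:
        self._data: dict = {}

    def save(self, instance_id: str, snapshot: dict) -> None:
        self._data[instance_id] = json.dumps(snapshot)

    def load(self, instance_id: str) -> Optional[dict]:
        raw = self._data.get(instance_id)
        return None if raw is None else json.loads(raw)


class SqliteStore:
    """Durable: snapshots are committed to a database file on disk."""

    def __init__(self, path: str) -> None:
        self._conn = sqlite3.connect(path)
        self._conn.execute(
            "CREATE TABLE IF NOT EXISTS snapshots "
            "(instance_id TEXT PRIMARY KEY, body TEXT NOT NULL)"
        )
        self._conn.commit()

    def save(self, instance_id: str, snapshot: dict) -> None:
        # Upsert so each new checkpoint replaces the previous one.
        self._conn.execute(
            "INSERT OR REPLACE INTO snapshots (instance_id, body) VALUES (?, ?)",
            (instance_id, json.dumps(snapshot)),
        )
        self._conn.commit()

    def load(self, instance_id: str) -> Optional[dict]:
        row = self._conn.execute(
            "SELECT body FROM snapshots WHERE instance_id = ?", (instance_id,)
        ).fetchone()
        return None if row is None else json.loads(row[0])

    def close(self) -> None:
        self._conn.close()


snapshot = {"position": 2, "value": 11}
db_path = os.path.join(tempfile.mkdtemp(), "snapshots.db")

memory: SnapshotStore = DictStore()
memory.save("job-1", snapshot)
memory_after_restart = DictStore()  # a new process starts with an empty dict
print(memory_after_restart.load("job-1"))  # None

durable = SqliteStore(db_path)
durable.save("job-1", snapshot)
durable.close()
durable_after_restart = SqliteStore(db_path)  # reopen the same file
durable_loaded = durable_after_restart.load("job-1")
print(durable_loaded)  # {'position': 2, 'value': 11}
durable_after_restart.close()
```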

Sayiir provides full control over workflow execution with pause, unpause, cancel, and resume operations. These operations are all durable: the workflow state is persisted, so a workflow can be resumed even after a process restart.

```python
from sayiir import cancel_workflow

# Cancel a running or paused workflow
cancel_workflow("job-123", backend=backend)
```

```python
from sayiir import pause_workflow, unpause_workflow

# Pause a running workflow
pause_workflow("job-123", backend=backend, reason="Maintenance window")

# Unpause to allow resumption
unpause_workflow("job-123", backend=backend)
```

```python
from sayiir import resume_workflow

# Resume from the last checkpoint, passing the workflow definition
# that instance "job-123" was started with
status = resume_workflow(workflow, "job-123", backend=backend)
print(f"Workflow status: {status}")
```
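The control operations can be thought of as transitions on a persisted lifecycle state. The sketch below is a hypothetical model inferred from the descriptions above, not Sayiir's internal state machine. The `State` values, the `TRANSITIONS` table, and the `apply`/`can_resume` helpers are illustrative names. The model encodes three rules from the text: pause applies to a running workflow, unpause makes a paused workflow resumable again (resuming is a separate step), and cancel applies to a running or paused workflow.

```python
from enum import Enum


class State(Enum):
    ACTIVE = "active"
    PAUSED = "paused"
    CANCELLED = "cancelled"


# Hypothetical transition table: operation -> {current state: next state}.
TRANSITIONS = {
    "pause": {State.ACTIVE: State.PAUSED},
    "unpause": {State.PAUSED: State.ACTIVE},
    "cancel": {State.ACTIVE: State.CANCELLED, State.PAUSED: State.CANCELLED},
}


class InvalidTransition(Exception):
    pass


def apply(store: dict, instance_id: str, op: str) -> State:
    current = store[instance_id]
    next_state = TRANSITIONS.get(op, {}).get(current)
    if next_state is None:
        raise InvalidTransition(f"cannot {op} a workflow that is {current.value}")
    store[instance_id] = next_state  # persisted, so it survives a restart
    return next_state


def can_resume(state: State) -> bool:
    # In this model, only an active (not paused, not cancelled) workflow resumes.
    return state is State.ACTIVE


store = {"job-123": State.ACTIVE}
apply(store, "job-123", "pause")
paused_resumable = can_resume(store["job-123"])    # False
apply(store, "job-123", "unpause")
unpaused_resumable = can_resume(store["job-123"])  # True
apply(store, "job-123", "cancel")

try:
    apply(store, "job-123", "unpause")
except InvalidTransition as err:
    print(err)  # cannot unpause a workflow that is cancelled
```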

Delays are checkpointed just like task completions. If a workflow is waiting on a delay when the process crashes, the delay will be resumed from the correct point after recovery.

```python
from datetime import timedelta
from sayiir import task, Flow, run_durable_workflow

@task
def send_email(user_id: int) -> str:
    return f"Email sent to user {user_id}"

@task
def send_reminder(user_id: int) -> str:
    return f"Reminder sent to user {user_id}"

# Wait 24 hours between email and reminder
workflow = (
    Flow("email_campaign")
    .then(send_email)
    .delay(timedelta(hours=24))
    .then(send_reminder)
    .build()
)

status = run_durable_workflow(workflow, "campaign-1", 42, backend=backend)
```

If the process crashes during the 24-hour delay, calling resume_workflow continues the delay from where it left off rather than restarting it from zero.
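One common way to get this behavior is to checkpoint an absolute wake-up time rather than a duration. The sketch below assumes that design; Sayiir's actual snapshot format is not described here, and `checkpoint_delay`/`remaining_delay` are hypothetical helpers. The clock is passed in explicitly so the crash-and-restart timeline is deterministic.

```python
from datetime import datetime, timedelta, timezone


def checkpoint_delay(store: dict, instance_id: str, delay: timedelta, now: datetime) -> None:
    # Persist the absolute deadline when the delay starts, not the duration.
    store[instance_id] = {"wake_at": (now + delay).isoformat()}


def remaining_delay(store: dict, instance_id: str, now: datetime) -> timedelta:
    # On resume, wait only for whatever is left until the stored deadline.
    wake_at = datetime.fromisoformat(store[instance_id]["wake_at"])
    return max(wake_at - now, timedelta(0))


store: dict = {}
started = datetime(2024, 1, 1, 9, 0, tzinfo=timezone.utc)
checkpoint_delay(store, "campaign-1", timedelta(hours=24), now=started)

# The process crashes and comes back 10 hours later.
remaining_after_restart = remaining_delay(store, "campaign-1", now=started + timedelta(hours=10))
print(remaining_after_restart)  # 14:00:00

# If recovery happens after the deadline, there is nothing left to wait.
remaining_late = remaining_delay(store, "campaign-1", now=started + timedelta(hours=30))
print(remaining_late)  # 0:00:00
```

Storing the duration instead would restart the full 24-hour wait after every crash, which is exactly what checkpointing the delay avoids.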

Every durable workflow run requires an instance_id. By default, calling run_durable_workflow with an instance_id that already has a snapshot fails. This prevents accidental overwrites of in-flight state. You can change this behavior with a conflict policy:

| Policy | Behavior |
|---|---|
| fail (default) | Raises an error if a snapshot already exists for this instance_id. |
| use_existing | Returns the existing workflow status without re-running. Useful for at-least-once submission patterns where the caller may retry. |
| terminate_existing | Deletes the existing snapshot and starts a fresh execution. Use when you want to force-restart a stuck or stale workflow. |
```python
from sayiir import run_durable_workflow, InMemoryBackend

backend = InMemoryBackend()

# Default: fail if instance already exists
status = run_durable_workflow(workflow, "order-42", input_data, backend=backend)

# Idempotent submission: safe to call multiple times
status = run_durable_workflow(
    workflow, "order-42", input_data,
    backend=backend,
    conflict_policy="use_existing",
)

# Force restart
status = run_durable_workflow(
    workflow, "order-42", input_data,
    backend=backend,
    conflict_policy="terminate_existing",
)
```
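The three policies in the table reduce to a small decision made when a snapshot already exists. The sketch below is a toy model of those semantics using a dict as the store. The `submit` function, the `InstanceConflict` exception, and the `start` callback are hypothetical and only mirror the table, not Sayiir's implementation. It also counts how many times a fresh run actually starts.

```python
from typing import Callable

POLICIES = ("fail", "use_existing", "terminate_existing")


class InstanceConflict(Exception):
    """Hypothetical stand-in for an 'instance already exists' error."""


def submit(store: dict, instance_id: str, start: Callable[[], str],
           policy: str = "fail") -> str:
    if policy not in POLICIES:
        raise ValueError(f"unknown conflict policy: {policy!r}")
    if instance_id in store:
        if policy == "fail":
            raise InstanceConflict(instance_id)
        if policy == "use_existing":
            return store[instance_id]  # report the existing status, run nothing
        del store[instance_id]  # terminate_existing: drop the old snapshot
    store[instance_id] = start()  # fresh execution
    return store[instance_id]


runs: list[str] = []


def start_run() -> str:
    runs.append("run")
    return f"completed (run #{len(runs)})"


store: dict = {}
first = submit(store, "order-42", start_run)
retry = submit(store, "order-42", start_run, policy="use_existing")
forced = submit(store, "order-42", start_run, policy="terminate_existing")

conflict = False
try:
    submit(store, "order-42", start_run)  # default policy is "fail"
except InstanceConflict:
    conflict = True

print(first, retry, forced, conflict, len(runs))
# completed (run #1) completed (run #1) completed (run #2) True 2
```

The retry with use_existing returns the first run's status without starting anything, which is what makes repeated submissions safe.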

The convenience function run_workflow also accepts conflict_policy when used with instance_id:

```python
from sayiir import run_workflow

result = run_workflow(
    workflow, input_data,
    instance_id="order-42",
    backend=backend,
    conflict_policy="use_existing",
)
```

When using the default fail policy, you can catch the specific error:

```python
from sayiir import run_durable_workflow, InstanceAlreadyExistsError

try:
    run_durable_workflow(workflow, "order-42", input_data, backend=backend)
except InstanceAlreadyExistsError:
    print("A workflow already exists for this instance ID")
```