Node.js API Reference

task(id, fn, opts?)

Create a named task with optional configuration.

import { task } from "sayiir";

const myTask = task("my-task", (input: { value: number }) => {
  return { result: input.value * 2 };
});

const configured = task("configured-task", (input: string) => {
  return processData(input);
}, {
  timeout: "30s",
  retries: 3,
  tags: ["io", "external"],
  description: "Processes input data",
});

Parameters:

  • id (string) — Unique task identifier
  • fn ((input: TIn) => TOut | Promise<TOut>) — The task function (sync or async)
  • opts (TaskOptions, optional) — Configuration options

TaskOptions:

  • timeout (Duration, optional) — Task timeout ("30s", "5m", 1000)
  • retries (number, optional) — Simple retry count with default backoff
  • retry (RetryPolicy, optional) — Full retry configuration
  • tags (string[], optional) — Categorization tags
  • description (string, optional) — Human-readable description
  • priority (number, optional) — Execution priority (1–5, lower = higher priority, default: 3)
  • input (ZodLike, optional) — Zod schema for input validation
  • output (ZodLike, optional) — Zod schema for output validation

The task() function attaches metadata to the function without wrapping it. The function can still be called directly for testing.

Returns: TaskFn<TIn, TOut> — A branded function with metadata properties.
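Because task() only attaches metadata, the function remains directly callable in tests. A minimal model of that behavior in plain TypeScript (makeTask and its names are illustrative, not the library's internals):

```typescript
// Simplified model of task(): metadata is attached to the function object
// itself rather than wrapping it, so direct calls behave unchanged.
type TaskMeta = { id: string; description?: string };

function makeTask<TIn, TOut>(
  id: string,
  fn: (input: TIn) => TOut,
  meta: { description?: string } = {},
): ((input: TIn) => TOut) & TaskMeta {
  return Object.assign(fn, { id, ...meta });
}

const double = makeTask("double", (x: { value: number }) => ({ result: x.value * 2 }));

double({ value: 21 }).result; // 42 — plain direct call, no runner needed
double.id; // "double"
```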

getTaskContext()

Get the current task execution context from within a running task. Returns null if called outside of a task execution.

import { task, getTaskContext } from "sayiir";

const myTask = task("my-task", (data: { value: number }) => {
  const ctx = getTaskContext();
  if (ctx) {
    console.log(`Running task ${ctx.taskId} in workflow ${ctx.workflowId}`);
    console.log(`Instance: ${ctx.instanceId}`);
  }
  return { result: data.value * 2 };
});

Returns: TaskExecutionContext | null

TaskExecutionContext

Read-only context available from within a running task. Provides access to workflow and task metadata.

interface TaskExecutionContext {
  workflowId: string;
  instanceId: string;
  taskId: string;
  metadata: TaskMetadata;
  workflowMetadata?: Record<string, unknown> | null;
}

interface TaskMetadata {
  displayName?: string;
  description?: string;
  timeoutSecs?: number;
  retries?: {
    maxRetries: number;
    initialDelaySecs: number;
    backoffMultiplier: number;
    maxDelaySecs?: number;
  };
  tags?: string[];
  version?: string;
  priority?: number;
}

Properties:

  • workflowId (string) — The workflow definition identifier
  • instanceId (string) — The workflow instance identifier (for durable workflows) or workflow ID (for in-memory runs)
  • taskId (string) — The current task identifier
  • metadata (TaskMetadata) — Task metadata (timeout, retries, tags, version, etc.)
  • workflowMetadata (Record<string, unknown> | null, optional) — Metadata attached to the workflow run, when provided

const fetchData = task("fetch-data", async (url: string) => {
  const ctx = getTaskContext();
  if (ctx) {
    console.log(`Timeout: ${ctx.metadata.timeoutSecs}s`);
    console.log(`Tags: ${ctx.metadata.tags}`);
  }
  return doFetch(url);
}, { timeout: "30s", retries: 3, tags: ["io"] });

flow<TInput>(workflowId)

Create a new type-safe workflow builder. The generic parameter TInput sets the workflow’s input type.

import { flow } from "sayiir";
const builder = flow<number>("my-workflow");

Returns: Flow<TInput, TInput> — A flow builder for chaining.

.then(fn) / .then(id, fn, opts?)

Add a sequential task step. Two overloads:

// With a task() function — uses the task's ID and metadata
const workflow = flow<number>("pipeline")
  .then(double)
  .then(addTen)
  .build();

// With an inline function — provide an ID
const workflow2 = flow<number>("pipeline")
  .then("double", (x) => x * 2)
  .then("format", (x) => `Result: ${x}`, { timeout: "10s" })
  .build();

// Auto-generated ID (lambda_0, lambda_1, ...)
const workflow3 = flow<number>("pipeline")
  .then((x) => x * 2)
  .build();

Parameters (overload 1):

  • fn (TaskFn<TLast, TOut>) — A task() function

Parameters (overload 2):

  • id (string) — Step identifier
  • fn ((input: TLast) => TOut | Promise<TOut>) — Task function (sync or async)
  • opts (StepOptions, optional) — Step-level timeout, retries, or retry

Returns: Flow<TInput, Awaited<TOut>> — Updated flow with new output type.

.loop(fn, opts?) / .loop(id, fn, opts?)

Add a loop whose body repeats until it returns LoopResult.done().

import { flow, task, LoopResult } from "sayiir";

const refine = task("refine", (draft: string) => {
  const improved = improve(draft);
  return isGoodEnough(improved)
    ? LoopResult.done(improved)
    : LoopResult.again(improved);
});

const workflow = flow<string>("iterative")
  .then(initialAttempt)
  .loop(refine, { maxIterations: 5 })
  .then(finalize)
  .build();

Parameters (overload 1):

  • fn (TaskFn<TLast, LoopResult<TOut>>) — A task() function returning LoopResult
  • opts (LoopOptions, optional) — maxIterations (default: 10, must be ≥ 1), onMax ("fail" | "exit_with_last")

Parameters (overload 2):

  • id (string) — Step identifier
  • fn ((input: TLast) => LoopResult<TOut>) — Loop body function
  • opts (LoopOptions, optional)

Returns: Flow<TInput, TOut> — The output type is the inner type of LoopResult.

LoopResult

Control flow type for loop body tasks.

import { LoopResult } from "sayiir";
// Continue the loop with a new value
LoopResult.again(newValue); // { _loop: "again", value: newValue }
// Exit the loop with the final value
LoopResult.done(finalValue); // { _loop: "done", value: finalValue }

Type: LoopResult<T> = { _loop: "again"; value: T } | { _loop: "done"; value: T }

interface LoopOptions {
  maxIterations?: number; // default: 10
  onMax?: "fail" | "exit_with_last"; // default: "fail"
}
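The again/done protocol and the maxIterations guard can be modeled in a few lines of plain TypeScript — an illustrative sketch of the semantics described above, not the engine's actual implementation:

```typescript
// Local model of LoopResult and the loop driver.
type LoopResult<T> = { _loop: "again"; value: T } | { _loop: "done"; value: T };

const again = <T>(value: T): LoopResult<T> => ({ _loop: "again", value });
const done = <T>(value: T): LoopResult<T> => ({ _loop: "done", value });

function runLoop<T>(
  body: (input: T) => LoopResult<T>,
  input: T,
  opts: { maxIterations?: number; onMax?: "fail" | "exit_with_last" } = {},
): T {
  const max = opts.maxIterations ?? 10; // default: 10
  let current = input;
  for (let i = 0; i < max; i++) {
    const r = body(current);
    current = r.value;
    if (r._loop === "done") return current; // exit with the final value
  }
  // Iteration budget exhausted: fail (default) or exit with the last value.
  if (opts.onMax === "exit_with_last") return current;
  throw new Error(`loop exceeded ${max} iterations`);
}

// Doubles until the value reaches at least 100
runLoop((x: number) => (x >= 100 ? done(x) : again(x * 2)), 3); // 192
```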

.fork(branches)

Start parallel execution with an array of branch definitions.

import { flow, branch, task } from "sayiir";

const sendEmail = task("send-email", (order) => ({ emailSent: true }));
const shipOrder = task("ship-order", (order) => ({ shipped: true }));

const workflow = flow<Order>("process")
  .then(chargePayment)
  .fork([
    branch("email", sendEmail),
    branch("shipping", shipOrder),
  ])
  .join("finalize", ([email, shipping]) => ({ ...email, ...shipping }))
  .build();

Parameters:

  • branches (BranchDef[]) — Array of branch() definitions

Returns: ForkBuilder<TInput, TLast, TBranches> — Builder requiring .join().

branch(name, fn)

Create a branch definition for use with .fork().

import { branch, task } from "sayiir";
const myBranch = branch("name", myTask);
const inlineBranch = branch("compute", (x: number) => x * 2);

Parameters:

  • name (string) — Branch name
  • fn (TaskFn | Function) — Branch task function

Returns: BranchDef<TIn, TOut>

.join(id, fn)

Join forked branches with a combining function. The function receives a tuple of branch outputs in the same order as the branches array.

.fork([
  branch("a", taskA), // returns string
  branch("b", taskB), // returns number
])
.join("merge", ([a, b]) => {
  // a: string, b: number — fully typed tuple
  return { text: a, count: b };
})

Parameters:

  • id (string) — Join step identifier
  • fn ((branches: InferBranchOutputs<TBranches>) => TOut) — Combining function

Returns: Flow<TInput, Awaited<TOut>> — Flow continues with the join output type.

.delay(id, duration)

Add a durable delay. No workers are held during the delay.

const workflow = flow<number>("delayed")
  .then("send-email", sendEmail)
  .delay("wait-24h", "24h")
  .then("send-reminder", sendReminder)
  .build();

// Also accepts milliseconds
flow("quick").delay("short-wait", 5000);

Parameters:

  • id (string) — Unique identifier for this delay step
  • duration (Duration) — Delay as a string ("30s", "5m", "1h") or milliseconds (number)

Returns: Flow<TInput, TLast> — Same output type (delay is pass-through).

.waitForSignal<TSignal>(id, signalName, opts?)

Wait for an external signal before continuing. The workflow parks and releases the worker until the signal arrives.

const workflow = flow<number>("approval")
  .then("submit", submitRequest)
  .waitForSignal<{ approved: boolean }>("wait-approval", "manager_approval", {
    timeout: "48h",
  })
  .then("process", (signal) => {
    // signal: { approved: boolean }
    return signal.approved ? "approved" : "rejected";
  })
  .build();

Parameters:

  • id (string) — Step identifier
  • signalName (string) — The named signal to wait for
  • opts.timeout (Duration, optional) — Timeout duration

Returns: Flow<TInput, TSignal> — Output type becomes the signal payload type.

.thenFlow(workflow)

Inline a child workflow. The child’s entire pipeline runs as a single composition step — its output becomes the input to the next step. Task registries are merged automatically.

import { flow, task } from "sayiir";

const double = task("double", (x: number) => x * 2);
const addTen = task("add-ten", (x: number) => x + 10);

// Child workflow
const childWorkflow = flow<number>("child")
  .then(double)
  .build();

// Parent inlines the child
const parent = flow<number>("parent")
  .then(addTen)
  .thenFlow(childWorkflow)
  .then("format", (x) => `Result: ${x}`)
  .build();

Parameters:

  • workflow (Workflow<TLast, TOut>) — A built workflow whose input type matches the current output type

Returns: Flow<TInput, Awaited<TOut>> — Flow continues with the child’s output type.

.route(keyFn, keys)

Start conditional branching based on a routing key. The key function receives the current output and returns one of the declared keys. The keys array declares all valid branch keys upfront — TypeScript narrows the key type so .branch() only accepts declared values, and .done() checks exhaustiveness.

const workflow = flow<Ticket>("router")
  .route(
    (ticket) => ticket.type === "invoice" ? "billing" : "tech",
    ["billing", "tech"] as const
  )
  .branch("billing", handleBilling)
  .branch("tech", handleTech)
  .defaultBranch("fallback", fallbackHandler)
  .done()
  .then("summarize", (envelope) => `Handled via ${envelope.branch}`)
  .build();

Parameters:

  • keyFn (TaskFn<TLast, Keys[number]> | ((input: TLast) => Keys[number] | Promise<Keys[number]>)) — Function that returns one of the declared keys.
  • keys (readonly string[]) — All valid branch keys. Use as const for type-level key constraint.

Returns: RouteBuilder<TInput, TLast, Keys>

RouteBuilder

Builder for conditional branches. Created by calling .route() on a Flow.

.branch(key, fn) / .branch(key, id, fn)

Add a named branch. The key parameter is type-constrained to the keys declared in .route(). The branch runs when the key function returns this key.

.branch("billing", handleBilling)
.branch("express", "express-handler", expressHandler)

Parameters (overload 1):

  • key (Keys[number]) — The routing key that selects this branch (must be one of the declared keys)
  • fn (TaskFn | Function) — Branch task function

Parameters (overload 2):

  • key (Keys[number]) — The routing key that selects this branch (must be one of the declared keys)
  • id (string) — Custom task ID
  • fn (Function) — Branch task function

Returns: RouteBuilder for chaining

.defaultBranch(fn) / .defaultBranch(id, fn)

Set the fallback branch for unmatched keys.

.defaultBranch("fallback", (input) => ({ status: "unmatched" }))

Returns: RouteBuilder for chaining

.done()

Finish the branching and return to the Flow builder. Throws an error if any declared key is missing a .branch() call, or if any .branch() key was not in the declared keys array (orphan branch). The output type becomes BranchEnvelope<T> with branch (the matched key) and result (the branch output).

.route(keyFn, ["a", "b"] as const)
  .branch("a", handleA)
  .branch("b", handleB)
  .done()
  .then("next", (envelope) => {
    // envelope: { branch: "a", result: ... }
  })

Returns: Flow<TInput, BranchEnvelope<TBranchOut>>
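A downstream step can narrow on envelope.branch like any discriminated union. A self-contained sketch (the BranchEnvelope shape is written out locally for illustration):

```typescript
// Local stand-in for the envelope produced by .done().
type BranchEnvelope<K extends string, R> = { branch: K; result: R };

function summarize(env: BranchEnvelope<"billing" | "tech", string>): string {
  switch (env.branch) {
    case "billing":
      return `Billing handled: ${env.result}`;
    case "tech":
      return `Tech handled: ${env.result}`;
    default: {
      // Exhaustiveness check: compiles only if every key is handled above.
      const unreachable: never = env.branch;
      throw new Error(`unhandled branch: ${unreachable}`);
    }
  }
}

summarize({ branch: "billing", result: "refund issued" }); // "Billing handled: refund issued"
```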

.build()

Compile the workflow definition.

const workflow = flow<number>("example").then(myTask).build();
console.log(workflow.workflowId); // "example"
console.log(workflow.definitionHash); // "abc123..."

Returns: Workflow<TInput, TLast>

Workflow

Compiled workflow with task registry. Produced by Flow.build().

Properties:

  • workflowId (string) — The workflow’s unique identifier
  • definitionHash (string) — SHA256 hash of the workflow definition

runWorkflow(workflow, input, opts?)

Run a workflow to completion. Without options, runs entirely in-memory with no persistence (fastest path for prototyping). With opts, runs with full checkpointing — but still returns the output directly. If the workflow doesn’t complete (e.g., parks on a delay or signal), throws WorkflowError.

import { runWorkflow, PostgresBackend } from "sayiir";

// Prototype — no persistence
const quick = await runWorkflow(workflow, 42);

// Production — same function, just add options
const backend = PostgresBackend.connect(process.env.DATABASE_URL!);
const durable = await runWorkflow(workflow, 42, {
  instanceId: "run-001",
  backend,
});

// Idempotent — safe to call multiple times
const rerun = await runWorkflow(workflow, 42, {
  instanceId: "run-001",
  backend,
  conflictPolicy: "useExisting",
});

Parameters:

  • workflow (Workflow<TIn, TOut>) — The workflow to run
  • input (TIn) — Input to the first task
  • opts (DurableRunOptions, optional) — Durable execution options

DurableRunOptions:

  • instanceId (string) — Unique execution instance ID
  • backend (Backend) — Persistence backend
  • conflictPolicy (ConflictPolicy, optional) — What to do when instanceId already has a snapshot: "fail" (default), "useExisting", or "terminateExisting"

Returns: Promise<TOut> — The workflow result

Throws: WorkflowError if a durable run does not complete (for example, it parks on a delay or signal). Use runDurableWorkflow() when you need the full WorkflowStatus object.

runWorkflowSync(workflow, input)

Run a workflow synchronously. All tasks must return plain values, not Promises. For async tasks, use runWorkflow() instead.

import { runWorkflowSync } from "sayiir";
const result = runWorkflowSync(workflow, 42);

Parameters:

  • workflow (Workflow<TIn, TOut>) — The workflow to run
  • input (TIn) — Input to the first task

Returns: TOut — The workflow result

runDurableWorkflow(workflow, instanceId, input, backend, conflictPolicy?)

Run a workflow with checkpointing and durability. Returns WorkflowStatus.

import { runDurableWorkflow, InMemoryBackend } from "sayiir";

const backend = new InMemoryBackend();
const status = runDurableWorkflow(workflow, "run-001", 42, backend);
if (status.status === "completed") {
  console.log(status.output); // 42
}

// Idempotent — returns existing status if already run
const existing = runDurableWorkflow(workflow, "run-001", 42, backend, "useExisting");

Parameters:

  • workflow (Workflow<TIn, TOut>) — The workflow to run
  • instanceId (string) — Unique identifier for this execution
  • input (TIn) — Input to the first task
  • backend (Backend) — Persistence backend
  • conflictPolicy (ConflictPolicy, optional) — "fail" (default), "useExisting", or "terminateExisting"

Returns: WorkflowStatus<TOut>

resumeWorkflow(workflow, instanceId, backend)

Resume a durable workflow from a saved checkpoint.

import { resumeWorkflow } from "sayiir";
const status = resumeWorkflow(workflow, "run-001", backend);

Parameters:

  • workflow (Workflow<TIn, TOut>) — The workflow definition
  • instanceId (string) — The instance ID used when the workflow was started
  • backend (Backend) — Persistence backend

Returns: WorkflowStatus<TOut>

cancelWorkflow(instanceId, backend, opts?)

Request cancellation of a running durable workflow.

import { cancelWorkflow } from "sayiir";

cancelWorkflow("run-001", backend, {
  reason: "User requested cancellation",
  cancelledBy: "admin@example.com",
});

Parameters:

  • instanceId (string) — The instance ID of the workflow to cancel
  • backend (Backend) — Persistence backend
  • opts.reason (string, optional) — Reason for cancellation
  • opts.cancelledBy (string, optional) — Identifier of who requested cancellation

pauseWorkflow(instanceId, backend, opts?)

Pause a running workflow at the next checkpoint.

import { pauseWorkflow } from "sayiir";

pauseWorkflow("run-001", backend, {
  reason: "Maintenance window",
  pausedBy: "ops@example.com",
});

Parameters:

  • instanceId (string) — The instance ID of the workflow to pause
  • backend (Backend) — Persistence backend
  • opts.reason (string, optional) — Reason for pause
  • opts.pausedBy (string, optional) — Identifier of who requested pause

unpauseWorkflow(instanceId, backend)

Allow a paused workflow to resume.

import { unpauseWorkflow } from "sayiir";
unpauseWorkflow("run-001", backend);

Parameters:

  • instanceId (string) — The instance ID of the workflow to unpause
  • backend (Backend) — Persistence backend

sendSignal(instanceId, signalName, payload, backend)

Send an external signal to a workflow waiting on waitForSignal.

import { sendSignal } from "sayiir";

sendSignal(
  "run-001",
  "manager_approval",
  { approved: true, comments: "Looks good" },
  backend,
);

Parameters:

  • instanceId (string) — The instance ID of the target workflow
  • signalName (string) — The signal name to send
  • payload (unknown) — The payload data (will be serialized)
  • backend (Backend) — Persistence backend

The payload is buffered in FIFO order per (instanceId, signalName). When the workflow resumes and reaches the matching waitForSignal node, it consumes the oldest buffered event.
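That buffering rule can be modeled with a map of queues keyed by (instanceId, signalName) — a sketch of the described FIFO semantics, not the backend's actual storage format:

```typescript
// One FIFO queue per (instanceId, signalName) pair.
const buffers = new Map<string, unknown[]>();

function bufferSignal(instanceId: string, signalName: string, payload: unknown): void {
  const key = `${instanceId}:${signalName}`;
  const queue = buffers.get(key) ?? [];
  queue.push(payload);
  buffers.set(key, queue);
}

function consumeSignal(instanceId: string, signalName: string): unknown {
  // The matching waitForSignal node takes the oldest buffered event.
  return buffers.get(`${instanceId}:${signalName}`)?.shift();
}

bufferSignal("run-001", "manager_approval", { approved: false });
bufferSignal("run-001", "manager_approval", { approved: true });
consumeSignal("run-001", "manager_approval"); // { approved: false } — oldest first
```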

InMemoryBackend

In-memory storage for development and testing. State is lost when the process stops.

import { InMemoryBackend } from "sayiir";
const backend = new InMemoryBackend();

Fast and requires no external dependencies. Ideal for unit tests and local development.

PostgresBackend.connect(url)

PostgreSQL storage for production. State survives process restarts.

import { PostgresBackend } from "sayiir";
const backend = PostgresBackend.connect("postgresql://user:pass@localhost:5432/sayiir");

Parameters:

  • url (string) — PostgreSQL connection URL

Requirements:

  • PostgreSQL 13 or higher
  • Auto-migration: tables are created on first connection
  • Recommended: dedicated database for Sayiir

WorkflowStatus<TOut>

Discriminated union returned by durable execution functions. Use TypeScript narrowing to access variant-specific fields.

const status = runDurableWorkflow(workflow, "run-001", input, backend);

switch (status.status) {
  case "completed":
    console.log(status.output); // TOut
    break;
  case "failed":
    console.log(status.error); // string
    break;
  case "cancelled":
    console.log(status.reason); // string | undefined
    console.log(status.cancelledBy); // string | undefined
    break;
  case "paused":
    console.log(status.reason); // string | undefined
    console.log(status.pausedBy); // string | undefined
    break;
  case "waiting":
    console.log(status.wakeAt); // string (ISO timestamp)
    console.log(status.delayId); // string
    break;
  case "awaiting_signal":
    console.log(status.signalId); // string
    console.log(status.signalName); // string
    console.log(status.wakeAt); // string | undefined
    break;
  case "in_progress":
    break;
}

Variants:

  • completed — output: TOut
  • in_progress — (no additional fields)
  • failed — error: string
  • cancelled — reason?: string, cancelledBy?: string
  • paused — reason?: string, pausedBy?: string
  • waiting — wakeAt: string, delayId: string
  • awaiting_signal — signalId: string, signalName: string, wakeAt?: string

Duration

A number (milliseconds) or a human-readable string parsed by the ms package:

type Duration = string | number;
// Examples:
"30s" // 30 seconds
"5m" // 5 minutes
"1h" // 1 hour
"2d" // 2 days
1000 // 1000 milliseconds
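How such strings resolve to milliseconds can be sketched with a minimal parser — this covers only the forms shown here; the library delegates to the ms package, which accepts more formats:

```typescript
type Duration = string | number;

// Minimal Duration -> milliseconds conversion (illustrative subset of ms).
function toMillis(d: Duration): number {
  if (typeof d === "number") return d; // numbers are already milliseconds
  const m = /^(\d+(?:\.\d+)?)\s*(ms|s|m|h|d)$/.exec(d);
  if (!m) throw new Error(`unparseable duration: ${d}`);
  const units: Record<string, number> = { ms: 1, s: 1000, m: 60_000, h: 3_600_000, d: 86_400_000 };
  return Number(m[1]) * units[m[2]];
}

toMillis("30s"); // 30000
toMillis("5m"); // 300000
```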
RetryPolicy:

interface RetryPolicy {
  maxAttempts: number;
  initialDelay: Duration;
  backoffMultiplier?: number; // default: 2.0
  maxDelay?: Duration;
}

Delay Formula:

delay = initialDelay * (backoffMultiplier ** attempt), capped at maxDelay when provided
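Worked through in code with an initialDelay of 1000 ms, the default multiplier of 2.0, and a 5 s maxDelay cap (an illustrative calculation; attempt is taken as 0-based):

```typescript
// Retry delay per attempt, capped at maxDelay when provided.
function retryDelay(
  attempt: number, // 0-based
  initialDelayMs: number,
  backoffMultiplier = 2.0,
  maxDelayMs = Infinity,
): number {
  return Math.min(initialDelayMs * backoffMultiplier ** attempt, maxDelayMs);
}

[0, 1, 2, 3].map((a) => retryDelay(a, 1000, 2.0, 5000)); // [1000, 2000, 4000, 5000]
```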

StepOptions

Options for inline .then() steps:

interface StepOptions {
  timeout?: Duration;
  retries?: number;
  retry?: RetryPolicy;
}

new Worker(workerId, backend, workflows, opts?)

Create a distributed workflow worker that polls the backend for available tasks.

import { Worker, PostgresBackend } from "sayiir";

const backend = PostgresBackend.connect(process.env.DATABASE_URL!);
const worker = new Worker("worker-1", backend, [workflow], {
  pollInterval: "2s",
  claimTtl: "10m",
});

Constructor Parameters:

  • workerId (string) — Unique identifier for this worker instance
  • backend (Backend) — Shared persistence backend (InMemoryBackend or PostgresBackend)
  • workflows (Workflow[]) — Compiled workflows whose tasks this worker can execute
  • opts (WorkerOptions, optional) — Configuration options

WorkerOptions:

  • pollInterval (Duration, optional) — How often to poll for tasks ("5s", "1m", 2000)
  • claimTtl (Duration, optional) — Task claim time-to-live ("5m", "10m", 300000)

Properties:

  • workerId (string) — The worker’s unique identifier
  • backend (Backend) — The persistence backend
  • workflows (Workflow[]) — The registered workflows
  • options (WorkerOptions) — The configuration options

.start()

Start the worker and return a handle for lifecycle control. Spawns a background thread that polls for tasks, claims them, and dispatches them to registered task functions.

const handle = worker.start();
process.on("SIGTERM", () => handle.shutdown());

Returns: WorkerHandle

WorkerHandle

Handle for controlling a running worker. Obtained from Worker.start().

.shutdown()

Request a graceful shutdown. The worker finishes in-flight tasks before stopping.

handle.shutdown();

WorkflowClient

Client for submitting and controlling workflow instances. Unlike runDurableWorkflow(), the client does not execute tasks — it only creates initial snapshots and stores lifecycle signals. A Worker picks up and executes the work.

new WorkflowClient(backend, opts?)

Create a new workflow client.

import { WorkflowClient, PostgresBackend } from "sayiir";
const backend = PostgresBackend.connect("postgresql://localhost/sayiir");
const client = new WorkflowClient(backend, { conflictPolicy: "useExisting" });

WorkflowClientOptions:

  • conflictPolicy (ConflictPolicy, optional) — What to do when an instanceId already exists: "fail" (default), "useExisting", or "terminateExisting"

.submit(workflow, instanceId, input)

Submit a workflow for execution. Creates an initial snapshot so a Worker can pick it up.

const status = client.submit(workflow, "order-42", { items: [1, 2, 3] });

Returns: WorkflowStatus<TOut>

.cancel(instanceId, opts?)

Request cancellation of a workflow instance.

client.cancel("order-42", {
  reason: "Out of stock",
  cancelledBy: "admin@example.com",
});

Parameters:

  • instanceId (string) — The workflow instance ID
  • opts.reason (string, optional) — Reason for cancellation
  • opts.cancelledBy (string, optional) — Identifier of who requested cancellation

.pause(instanceId, opts?)

Pause a workflow instance at the next checkpoint.

client.pause("order-42", { reason: "Maintenance window" });

Parameters:

  • instanceId (string) — The workflow instance ID
  • opts.reason (string, optional) — Reason for pause
  • opts.pausedBy (string, optional) — Identifier of who requested pause

.unpause(instanceId)

Unpause a paused workflow instance.

client.unpause("order-42");

.sendSignal(instanceId, signalName, payload)

Send an external signal to a workflow instance.

client.sendSignal("order-42", "manager_approval", { approved: true });

Parameters:

  • instanceId (string) — The workflow instance ID
  • signalName (string) — The signal name
  • payload (unknown) — The signal payload (serialized to JSON)

.status<TOut>(instanceId)

Get the current status of a workflow instance.

const status = client.status<string>("order-42");
if (status.status === "completed") {
  console.log(status.output);
}

Returns: WorkflowStatus<TOut>

.getTaskResult(instanceId, taskId)

Get a single task result from a workflow instance.

Returns the JSON-encoded task output, or null if the task was never executed. For completed or failed workflows, the result is recovered from the backend’s history or cache.

const result = client.getTaskResult("order-42", "validate_order");
if (result !== null) {
  const parsed = JSON.parse(result);
  console.log(parsed);
}

Parameters:

  • instanceId (string) — The workflow instance ID
  • taskId (string) — The task ID to retrieve the result for

Returns: string | null

WorkflowError

Base error for all Sayiir workflow errors.

import { WorkflowError } from "sayiir";

try {
  await runWorkflow(workflow, input);
} catch (e) {
  if (e instanceof WorkflowError) {
    console.error(`Workflow error: ${e.message}`);
  }
}

Raised when a task fails during execution. Extends WorkflowError.

Raised when a persistence backend operation fails. Extends WorkflowError.

Sayiir accepts Zod schemas for runtime validation; Zod itself is an optional peer dependency. Any object with a .parse(data): T method works.

import { z } from "zod";
import { task } from "sayiir";

const InputSchema = z.object({ id: z.string(), amount: z.number() });
const OutputSchema = z.object({ status: z.string() });

const myTask = task("validate", (input) => {
  // input is validated and typed
  return { status: "ok" };
}, {
  input: InputSchema,
  output: OutputSchema,
});

If Zod is not installed or schemas are not provided, validation is skipped.

runWorkflow() supports both sync and async task functions. Tasks can use fetch, timers, file I/O, or any async operation:

const fetchData = task("fetch", async (url: string) => {
  const res = await fetch(url);
  return res.json();
});

const result = await runWorkflow(workflow, "https://api.example.com/data");

For sync-only workflows, runWorkflowSync() provides a faster synchronous path. It throws if any task returns a Promise.
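That guard amounts to a thenable check on each task's return value — a sketch of the idea, not the library's actual code:

```typescript
// Model of the sync-path guard: reject any thenable task result.
function assertSyncResult<T>(value: T): T {
  const v: unknown = value;
  if (typeof v === "object" && v !== null && typeof (v as { then?: unknown }).then === "function") {
    throw new Error("runWorkflowSync received a Promise; use runWorkflow instead");
  }
  return value;
}

assertSyncResult(42); // passes through
// assertSyncResult(fetchSomething()); // an async task's Promise would throw here
```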