
Node.js API Reference

task(id, fn, opts?)

Create a named task with optional configuration.

import { task } from "sayiir";

const myTask = task("my-task", (input: { value: number }) => {
  return { result: input.value * 2 };
});

const configured = task("configured-task", (input: string) => {
  return processData(input);
}, {
  timeout: "30s",
  retries: 3,
  tags: ["io", "external"],
  description: "Processes input data",
});

Parameters:

  • id (string) — Unique task identifier
  • fn ((input: TIn) => TOut | Promise<TOut>) — The task function (sync or async)
  • opts (TaskOptions, optional) — Configuration options

TaskOptions:

  • timeout (Duration, optional) — Task timeout ("30s", "5m", 1000)
  • retries (number, optional) — Simple retry count with default backoff
  • retry (RetryPolicy, optional) — Full retry configuration
  • tags (string[], optional) — Categorization tags
  • description (string, optional) — Human-readable description
  • input (ZodLike, optional) — Zod schema for input validation
  • output (ZodLike, optional) — Zod schema for output validation

The task() function attaches metadata to the function without wrapping it. The function can still be called directly for testing.
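
The "attach without wrapping" behavior can be sketched with a minimal stand-in (illustrative only, not the library's source; the `taskId` and `options` property names are assumptions):

```typescript
// Illustrative stand-in for sayiir's task(): metadata is assigned onto
// the function object itself, so the original function stays callable.
// The `taskId` and `options` property names are assumptions.
interface SketchTaskOptions {
  timeout?: string | number;
  retries?: number;
  tags?: string[];
  description?: string;
}

function sketchTask<TIn, TOut>(
  id: string,
  fn: (input: TIn) => TOut,
  opts: SketchTaskOptions = {},
) {
  return Object.assign(fn, { taskId: id, options: opts });
}

const double = sketchTask("double", (input: { value: number }) => ({
  result: input.value * 2,
}));

// Callable directly in a unit test, no workflow engine involved:
console.log(double({ value: 21 }).result); // 42
console.log(double.taskId); // "double"
```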

Returns: TaskFn<TIn, TOut> — A branded function with metadata properties.

flow<TInput>(id)

Create a new type-safe workflow builder. The generic parameter TInput sets the workflow’s input type.

import { flow } from "sayiir";
const builder = flow<number>("my-workflow");

Returns: Flow<TInput, TInput> — A flow builder for chaining.

.then(fn) / .then(id, fn, opts?)

Add a sequential task step. There are two overloads:

// With a task() function — uses the task's ID and metadata
const workflow = flow<number>("pipeline")
  .then(double)
  .then(addTen)
  .build();

// With an inline function — provide an ID
const workflow = flow<number>("pipeline")
  .then("double", (x) => x * 2)
  .then("format", (x) => `Result: ${x}`, { timeout: "10s" })
  .build();

// Auto-generated ID (lambda_0, lambda_1, ...)
const workflow = flow<number>("pipeline")
  .then((x) => x * 2)
  .build();

Parameters (overload 1):

  • fn (TaskFn<TLast, TOut>) — A task() function

Parameters (overload 2):

  • id (string) — Step identifier
  • fn ((input: TLast) => TOut | Promise<TOut>) — Task function (sync or async)
  • opts (StepOptions, optional) — Step-level timeout, retries, or retry

Returns: Flow<TInput, Awaited<TOut>> — Updated flow with new output type.

.fork(branches)

Start parallel execution with an array of branch definitions.

import { flow, branch, task } from "sayiir";

const sendEmail = task("send-email", (order) => ({ emailSent: true }));
const shipOrder = task("ship-order", (order) => ({ shipped: true }));

const workflow = flow<Order>("process")
  .then(chargePayment)
  .fork([
    branch("email", sendEmail),
    branch("shipping", shipOrder),
  ])
  .join("finalize", ([email, shipping]) => ({ ...email, ...shipping }))
  .build();

Parameters:

  • branches (BranchDef[]) — Array of branch() definitions

Returns: ForkBuilder<TInput, TLast, TBranches> — Builder requiring .join().

branch(name, fn)

Create a branch definition for use with .fork().

import { branch, task } from "sayiir";
const myBranch = branch("name", myTask);
const inlineBranch = branch("compute", (x: number) => x * 2);

Parameters:

  • name (string) — Branch name
  • fn (TaskFn | Function) — Branch task function

Returns: BranchDef<TIn, TOut>

.join(id, fn)

Join forked branches with a combining function. The function receives a tuple of branch outputs in the same order as the branches array.

.fork([
  branch("a", taskA), // returns string
  branch("b", taskB), // returns number
])
.join("merge", ([a, b]) => {
  // a: string, b: number — fully typed tuple
  return { text: a, count: b };
})

Parameters:

  • id (string) — Join step identifier
  • fn ((branches: InferBranchOutputs<TBranches>) => TOut) — Combining function

Returns: Flow<TInput, Awaited<TOut>> — Flow continues with the join output type.

.delay(id, duration)

Add a durable delay. No workers are held during the delay.

const workflow = flow<number>("delayed")
  .then("send-email", sendEmail)
  .delay("wait-24h", "24h")
  .then("send-reminder", sendReminder)
  .build();

// Also accepts milliseconds
flow("quick").delay("short-wait", 5000);

Parameters:

  • id (string) — Unique identifier for this delay step
  • duration (Duration) — Delay as a string ("30s", "5m", "1h") or milliseconds (number)

Returns: Flow<TInput, TLast> — Same output type (delay is pass-through).

.waitForSignal<TSignal>(id, signalName, opts?)

Wait for an external signal before continuing. The workflow parks and releases the worker until the signal arrives.

const workflow = flow<number>("approval")
  .then("submit", submitRequest)
  .waitForSignal<{ approved: boolean }>("wait-approval", "manager_approval", {
    timeout: "48h",
  })
  .then("process", (signal) => {
    // signal: { approved: boolean }
    return signal.approved ? "approved" : "rejected";
  })
  .build();

Parameters:

  • id (string) — Step identifier
  • signalName (string) — The named signal to wait for
  • opts.timeout (Duration, optional) — Timeout duration

Returns: Flow<TInput, TSignal> — Output type becomes the signal payload type.

.build()

Compile the workflow definition.

const workflow = flow<number>("example").then(myTask).build();
console.log(workflow.workflowId); // "example"
console.log(workflow.definitionHash); // "abc123..."

Returns: Workflow<TInput, TLast>

Workflow<TIn, TOut>

Compiled workflow with task registry. Produced by Flow.build().

Properties:

  • workflowId (string) — The workflow’s unique identifier
  • definitionHash (string) — SHA256 hash of the workflow definition
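
One way such a hash can be derived (illustrative; the library's exact canonicalization is not documented here, and `hashDefinition` is a hypothetical helper):

```typescript
import { createHash } from "node:crypto";

// Illustrative only: derive a SHA256 hex digest from a serialized
// workflow definition. Sayiir's actual canonicalization may differ.
function hashDefinition(definition: unknown): string {
  return createHash("sha256").update(JSON.stringify(definition)).digest("hex");
}

const hash = hashDefinition({ workflowId: "example", steps: ["my-task"] });
// hash is a 64-character hex string, stable for identical definitions
```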

runWorkflow(workflow, input)

Run a workflow to completion without persistence. Supports both sync and async task functions.

import { runWorkflow } from "sayiir";
const result = await runWorkflow(workflow, 42);
console.log(result); // The final output

Parameters:

  • workflow (Workflow<TIn, TOut>) — The workflow to run
  • input (TIn) — Input to the first task

Returns: Promise<TOut> — The workflow result

runWorkflowSync(workflow, input)

Run a workflow synchronously. All tasks must return plain values, not Promises. For async tasks, use runWorkflow() instead.

import { runWorkflowSync } from "sayiir";
const result = runWorkflowSync(workflow, 42);

Parameters:

  • workflow (Workflow<TIn, TOut>) — The workflow to run
  • input (TIn) — Input to the first task

Returns: TOut — The workflow result

runDurableWorkflow(workflow, instanceId, input, backend)

Run a workflow with checkpointing and durability. Returns WorkflowStatus.

import { runDurableWorkflow, InMemoryBackend } from "sayiir";

const backend = new InMemoryBackend();
const status = runDurableWorkflow(workflow, "run-001", 42, backend);
if (status.status === "completed") {
  console.log(status.output);
}

Parameters:

  • workflow (Workflow<TIn, TOut>) — The workflow to run
  • instanceId (string) — Unique identifier for this execution
  • input (TIn) — Input to the first task
  • backend (Backend) — Persistence backend

Returns: WorkflowStatus<TOut>

resumeWorkflow(workflow, instanceId, backend)

Resume a durable workflow from a saved checkpoint.

import { resumeWorkflow } from "sayiir";
const status = resumeWorkflow(workflow, "run-001", backend);

Parameters:

  • workflow (Workflow<TIn, TOut>) — The workflow definition
  • instanceId (string) — The instance ID used when the workflow was started
  • backend (Backend) — Persistence backend

Returns: WorkflowStatus<TOut>

cancelWorkflow(instanceId, backend, opts?)

Request cancellation of a running durable workflow.

import { cancelWorkflow } from "sayiir";
cancelWorkflow("run-001", backend, {
  reason: "User requested cancellation",
  cancelledBy: "admin@example.com",
});

Parameters:

  • instanceId (string) — The instance ID of the workflow to cancel
  • backend (Backend) — Persistence backend
  • opts.reason (string, optional) — Reason for cancellation
  • opts.cancelledBy (string, optional) — Identifier of who requested cancellation

pauseWorkflow(instanceId, backend, opts?)

Pause a running workflow at the next checkpoint.

import { pauseWorkflow } from "sayiir";
pauseWorkflow("run-001", backend, {
  reason: "Maintenance window",
  pausedBy: "ops@example.com",
});

Parameters:

  • instanceId (string) — The instance ID of the workflow to pause
  • backend (Backend) — Persistence backend
  • opts.reason (string, optional) — Reason for pause
  • opts.pausedBy (string, optional) — Identifier of who requested pause

unpauseWorkflow(instanceId, backend)

Allow a paused workflow to resume.

import { unpauseWorkflow } from "sayiir";
unpauseWorkflow("run-001", backend);

Parameters:

  • instanceId (string) — The instance ID of the workflow to unpause
  • backend (Backend) — Persistence backend

sendSignal(instanceId, signalName, payload, backend)

Send an external signal to a workflow waiting on waitForSignal.

import { sendSignal } from "sayiir";
sendSignal(
  "run-001",
  "manager_approval",
  { approved: true, comments: "Looks good" },
  backend,
);

Parameters:

  • instanceId (string) — The instance ID of the target workflow
  • signalName (string) — The signal name to send
  • payload (unknown) — The payload data (will be serialized)
  • backend (Backend) — Persistence backend

The payload is buffered in FIFO order per (instanceId, signalName). When the workflow resumes and reaches the matching waitForSignal node, it consumes the oldest buffered event.
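
The buffering semantics can be pictured as a per-key FIFO queue (an illustrative model, not sayiir's implementation):

```typescript
// Illustrative model of the FIFO signal buffer: events are queued per
// (instanceId, signalName) key and consumed oldest-first.
class SignalBuffer {
  private buffers = new Map<string, unknown[]>();

  send(instanceId: string, signalName: string, payload: unknown): void {
    const key = `${instanceId}:${signalName}`;
    const queue = this.buffers.get(key) ?? [];
    queue.push(payload);
    this.buffers.set(key, queue);
  }

  // Returns the oldest buffered payload, or undefined if none is queued.
  consume(instanceId: string, signalName: string): unknown {
    return this.buffers.get(`${instanceId}:${signalName}`)?.shift();
  }
}
```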

InMemoryBackend

In-memory storage for development and testing. State is lost when the process stops.

import { InMemoryBackend } from "sayiir";
const backend = new InMemoryBackend();

Fast and requires no external dependencies. Ideal for unit tests and local development.

PostgresBackend.connect(url)

PostgreSQL storage for production. State survives process restarts.

import { PostgresBackend } from "sayiir";
const backend = PostgresBackend.connect("postgresql://user:pass@localhost:5432/sayiir");

Parameters:

  • url (string) — PostgreSQL connection URL

Requirements:

  • PostgreSQL 13 or higher
  • Auto-migration: tables are created on first connection
  • Recommended: dedicated database for Sayiir

WorkflowStatus<TOut>

Discriminated union returned by durable execution functions. Use TypeScript narrowing to access variant-specific fields.

const status = runDurableWorkflow(workflow, "run-001", input, backend);

switch (status.status) {
  case "completed":
    console.log(status.output); // TOut
    break;
  case "failed":
    console.log(status.error); // string
    break;
  case "cancelled":
    console.log(status.reason); // string | undefined
    console.log(status.cancelledBy); // string | undefined
    break;
  case "paused":
    console.log(status.reason); // string | undefined
    console.log(status.pausedBy); // string | undefined
    break;
  case "waiting":
    console.log(status.wakeAt); // string (ISO timestamp)
    console.log(status.delayId); // string
    break;
  case "awaiting_signal":
    console.log(status.signalId); // string
    console.log(status.signalName); // string
    console.log(status.wakeAt); // string | undefined
    break;
  case "in_progress":
    break;
}

Variants:

  • completed — output: TOut
  • in_progress — (no additional fields)
  • failed — error: string
  • cancelled — reason?: string, cancelledBy?: string
  • paused — reason?: string, pausedBy?: string
  • waiting — wakeAt: string, delayId: string
  • awaiting_signal — signalId: string, signalName: string, wakeAt?: string

Duration

A number (milliseconds) or a human-readable string parsed by ms:

type Duration = string | number;

// Examples:
"30s" // 30 seconds
"5m"  // 5 minutes
"1h"  // 1 hour
"2d"  // 2 days
1000  // 1000 milliseconds
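
A minimal parser covering the formats shown above (illustrative; the library delegates to the `ms` package, which accepts additional forms):

```typescript
// Illustrative duration parser for the formats shown above; sayiir
// actually uses the `ms` package, which accepts more spellings.
function toMillis(d: string | number): number {
  if (typeof d === "number") return d;
  const match = /^(\d+(?:\.\d+)?)\s*(ms|s|m|h|d)$/.exec(d.trim());
  if (!match) throw new Error(`Unparseable duration: ${d}`);
  const unit: Record<string, number> = {
    ms: 1, s: 1_000, m: 60_000, h: 3_600_000, d: 86_400_000,
  };
  return Number(match[1]) * unit[match[2]];
}

console.log(toMillis("30s")); // 30000
console.log(toMillis("2d")); // 172800000
```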
RetryPolicy

interface RetryPolicy {
  maxAttempts: number;
  initialDelay: Duration;
  backoffMultiplier?: number; // default: 2.0
  maxDelay?: Duration;
}

Delay Formula:

delay = initialDelay * (backoffMultiplier ** attempt)

When maxDelay is set, the computed delay is capped at that value.
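
Sketched as code (the maxDelay cap is an assumption based on the RetryPolicy field, and `retryDelay` is a hypothetical helper, not sayiir API):

```typescript
// Worked example of the backoff formula above. The maxDelay cap is an
// assumption drawn from the RetryPolicy field, not documented behavior.
function retryDelay(
  initialDelayMs: number,
  backoffMultiplier: number,
  attempt: number,
  maxDelayMs = Infinity,
): number {
  return Math.min(initialDelayMs * backoffMultiplier ** attempt, maxDelayMs);
}

// initialDelay = 1000ms, multiplier = 2:
console.log([0, 1, 2, 3].map((a) => retryDelay(1000, 2, a))); // [1000, 2000, 4000, 8000]
```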

StepOptions

Options for inline .then() steps:

interface StepOptions {
  timeout?: Duration;
  retries?: number;
  retry?: RetryPolicy;
}

WorkflowError

Base error for all Sayiir workflow errors.

import { WorkflowError } from "sayiir";

try {
  await runWorkflow(workflow, input);
} catch (e) {
  if (e instanceof WorkflowError) {
    console.error(`Workflow error: ${e.message}`);
  }
}

TaskError

Thrown when a task fails during execution. Extends WorkflowError.

BackendError

Thrown when a persistence backend operation fails. Extends WorkflowError.

Zod Integration

Sayiir accepts Zod schemas for runtime validation, with Zod as an optional peer dependency. Any object with a .parse(data): T method works.

import { z } from "zod";
import { task } from "sayiir";

const InputSchema = z.object({ id: z.string(), amount: z.number() });
const OutputSchema = z.object({ status: z.string() });

const myTask = task("validate", (input) => {
  // input is validated and typed
  return { status: "ok" };
}, {
  input: InputSchema,
  output: OutputSchema,
});

If Zod is not installed or schemas are not provided, validation is skipped.
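
Because the contract is duck-typed, a hand-rolled validator works too (illustrative; `PositiveAmount` is not part of sayiir):

```typescript
// Any object exposing parse(data): T satisfies the ZodLike contract,
// so a hand-written validator can stand in for a Zod schema.
const PositiveAmount = {
  parse(data: unknown): number {
    if (typeof data !== "number" || !(data > 0)) {
      throw new Error("expected a positive number");
    }
    return data;
  },
};

console.log(PositiveAmount.parse(19.99)); // 19.99
```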

Async Support

runWorkflow() supports both sync and async task functions. Tasks can use fetch, timers, file I/O, or any async operation:

const fetchData = task("fetch", async (url: string) => {
  const res = await fetch(url);
  return res.json();
});

const result = await runWorkflow(workflow, "https://api.example.com/data");

For sync-only workflows, runWorkflowSync() provides a faster synchronous path. It throws if any task returns a Promise.
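
The Promise check can be pictured like this (illustrative, not sayiir's internals; `assertSyncResult` is a hypothetical helper):

```typescript
// Illustrative: how a sync runner can reject a step that returned a
// Promise. assertSyncResult is a hypothetical name, not sayiir API.
function assertSyncResult<T>(value: T | Promise<T>, stepId: string): T {
  if (value instanceof Promise) {
    throw new Error(`Step "${stepId}" returned a Promise; use runWorkflow() instead`);
  }
  return value;
}

console.log(assertSyncResult(42, "double")); // 42
```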