# Node.js API Reference
## Task Definition

### task(id, fn, opts?)

Create a named task with optional configuration.

```ts
import { task } from "sayiir";

const myTask = task("my-task", (input: { value: number }) => {
  return { result: input.value * 2 };
});

const configured = task(
  "configured-task",
  (input: string) => {
    return processData(input);
  },
  {
    timeout: "30s",
    retries: 3,
    tags: ["io", "external"],
    description: "Processes input data",
  },
);
```

Parameters:

- id (string) — Unique task identifier
- fn ((input: TIn) => TOut | Promise<TOut>) — The task function (sync or async)
- opts (TaskOptions, optional) — Configuration options

TaskOptions:

- timeout (Duration, optional) — Task timeout ("30s", "5m", 1000)
- retries (number, optional) — Simple retry count with default backoff
- retry (RetryPolicy, optional) — Full retry configuration
- tags (string[], optional) — Categorization tags
- description (string, optional) — Human-readable description
- input (ZodLike, optional) — Zod schema for input validation
- output (ZodLike, optional) — Zod schema for output validation
The task() function attaches metadata to the function without wrapping it. The function can still be called directly for testing.
Returns: TaskFn<TIn, TOut> — A branded function with metadata properties.
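For example, a task configured with a full retry policy can still be invoked as a plain function in tests. A minimal sketch, assuming the retry option takes the RetryPolicy shape documented below; the chargeCard helper is hypothetical:

```ts
import { task } from "sayiir";

// Hypothetical payment helper, used for illustration only.
const chargeCard = (amount: number) => ({ chargeId: `ch_${amount}` });

const charge = task(
  "charge",
  (input: { amount: number }) => {
    return chargeCard(input.amount);
  },
  {
    timeout: "30s",
    // Full retry configuration via the retry option (see RetryPolicy below).
    retry: { maxAttempts: 5, initialDelay: "1s", backoffMultiplier: 2, maxDelay: "1m" },
  },
);

// Because task() only attaches metadata, the function can be called directly in tests.
const out = charge({ amount: 25 }); // { chargeId: "ch_25" }
```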
## Flow Builder

### flow<TInput>(name)

Create a new type-safe workflow builder. The generic parameter TInput sets the workflow’s input type.

```ts
import { flow } from "sayiir";

const builder = flow<number>("my-workflow");
```

Returns: Flow<TInput, TInput> — A flow builder for chaining.
### .then(fn) / .then(id, fn, opts?)

Add a sequential task step. Two overloads:

```ts
// With a task() function — uses the task's ID and metadata
const workflow = flow<number>("pipeline")
  .then(double)
  .then(addTen)
  .build();
```

```ts
// With an inline function — provide an ID
const workflow = flow<number>("pipeline")
  .then("double", (x) => x * 2)
  .then("format", (x) => `Result: ${x}`, { timeout: "10s" })
  .build();
```

```ts
// Auto-generated ID (lambda_0, lambda_1, ...)
const workflow = flow<number>("pipeline")
  .then((x) => x * 2)
  .build();
```

Parameters (overload 1):

- fn (TaskFn<TLast, TOut>) — A task() function

Parameters (overload 2):

- id (string) — Step identifier
- fn ((input: TLast) => TOut | Promise<TOut>) — Task function (sync or async)
- opts (StepOptions, optional) — Step-level timeout, retries, or retry
Returns: Flow<TInput, Awaited<TOut>> — Updated flow with new output type.
### .fork(branches)

Start parallel execution with an array of branch definitions.

```ts
import { flow, branch, task } from "sayiir";

const sendEmail = task("send-email", (order) => ({ emailSent: true }));
const shipOrder = task("ship-order", (order) => ({ shipped: true }));

const workflow = flow<Order>("process")
  .then(chargePayment)
  .fork([
    branch("email", sendEmail),
    branch("shipping", shipOrder),
  ])
  .join("finalize", ([email, shipping]) => ({ ...email, ...shipping }))
  .build();
```

Parameters:

- branches (BranchDef[]) — Array of branch() definitions
Returns: ForkBuilder<TInput, TLast, TBranches> — Builder requiring .join().
### branch(name, fn)

Create a branch definition for use with .fork().

```ts
import { branch, task } from "sayiir";

const myBranch = branch("name", myTask);
const inlineBranch = branch("compute", (x: number) => x * 2);
```

Parameters:

- name (string) — Branch name
- fn (TaskFn | Function) — Branch task function
Returns: BranchDef<TIn, TOut>
### .join(id, fn)

Join forked branches with a combining function. The function receives a tuple of branch outputs in the same order as the branches array.

```ts
.fork([
  branch("a", taskA), // returns string
  branch("b", taskB), // returns number
])
.join("merge", ([a, b]) => {
  // a: string, b: number — fully typed tuple
  return { text: a, count: b };
})
```

Parameters:

- id (string) — Join step identifier
- fn ((branches: InferBranchOutputs<TBranches>) => TOut) — Combining function
Returns: Flow<TInput, Awaited<TOut>> — Flow continues with the join output type.
### .delay(id, duration)

Add a durable delay. No workers are held during the delay.

```ts
const workflow = flow<number>("delayed")
  .then("send-email", sendEmail)
  .delay("wait-24h", "24h")
  .then("send-reminder", sendReminder)
  .build();

// Also accepts milliseconds
flow("quick").delay("short-wait", 5000);
```

Parameters:

- id (string) — Unique identifier for this delay step
- duration (Duration) — Delay as a string ("30s", "5m", "1h") or milliseconds (number)
Returns: Flow<TInput, TLast> — Same output type (delay is pass-through).
### .waitForSignal<TSignal>(id, signalName, opts?)

Wait for an external signal before continuing. The workflow parks and releases the worker until the signal arrives.

```ts
const workflow = flow<number>("approval")
  .then("submit", submitRequest)
  .waitForSignal<{ approved: boolean }>("wait-approval", "manager_approval", {
    timeout: "48h",
  })
  .then("process", (signal) => {
    // signal: { approved: boolean }
    return signal.approved ? "approved" : "rejected";
  })
  .build();
```

Parameters:

- id (string) — Step identifier
- signalName (string) — The named signal to wait for
- opts.timeout (Duration, optional) — Timeout duration
Returns: Flow<TInput, TSignal> — Output type becomes the signal payload type.
### .build()

Compile the workflow definition.

```ts
const workflow = flow<number>("example").then(myTask).build();

console.log(workflow.workflowId); // "example"
console.log(workflow.definitionHash); // "abc123..."
```

Returns: Workflow<TInput, TLast>
## Workflow

Compiled workflow with task registry. Produced by Flow.build().

Properties:

- workflowId (string) — The workflow’s unique identifier
- definitionHash (string) — SHA256 hash of the workflow definition
## Execution Functions

### runWorkflow(workflow, input)

Run a workflow to completion without persistence. Supports both sync and async task functions.

```ts
import { runWorkflow } from "sayiir";

const result = await runWorkflow(workflow, 42);
console.log(result); // The final output
```

Parameters:

- workflow (Workflow<TIn, TOut>) — The workflow to run
- input (TIn) — Input to the first task
Returns: Promise<TOut> — The workflow result
### runWorkflowSync(workflow, input)

Run a workflow synchronously. All tasks must return plain values, not Promises. For async tasks, use runWorkflow() instead.

```ts
import { runWorkflowSync } from "sayiir";

const result = runWorkflowSync(workflow, 42);
```

Parameters:

- workflow (Workflow<TIn, TOut>) — The workflow to run
- input (TIn) — Input to the first task
Returns: TOut — The workflow result
### runDurableWorkflow(workflow, instanceId, input, backend)

Run a workflow with checkpointing and durability. Returns WorkflowStatus.

```ts
import { runDurableWorkflow, InMemoryBackend } from "sayiir";

const backend = new InMemoryBackend();
const status = runDurableWorkflow(workflow, "run-001", 42, backend);

if (status.status === "completed") {
  console.log(status.output); // 42
}
```

Parameters:

- workflow (Workflow<TIn, TOut>) — The workflow to run
- instanceId (string) — Unique identifier for this execution
- input (TIn) — Input to the first task
- backend (Backend) — Persistence backend
Returns: WorkflowStatus<TOut>
### resumeWorkflow(workflow, instanceId, backend)

Resume a durable workflow from a saved checkpoint.

```ts
import { resumeWorkflow } from "sayiir";

const status = resumeWorkflow(workflow, "run-001", backend);
```

Parameters:

- workflow (Workflow<TIn, TOut>) — The workflow definition
- instanceId (string) — The instance ID used when the workflow was started
- backend (Backend) — Persistence backend
Returns: WorkflowStatus<TOut>
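A common pattern is to start a durable run, observe the "waiting" status produced by a .delay() step, and resume the same instance once the wake time has passed. A minimal sketch, assuming an external scheduler (cron, queue, etc., not shown) calls resumeWorkflow after wakeAt:

```ts
import { runDurableWorkflow, resumeWorkflow, InMemoryBackend } from "sayiir";

const backend = new InMemoryBackend();

// The first run checkpoints up to the delay step and reports "waiting".
let status = runDurableWorkflow(workflow, "run-001", 42, backend);

if (status.status === "waiting") {
  console.log(status.wakeAt);  // ISO timestamp to resume after
  console.log(status.delayId); // which delay step is pending
}

// Later, after wakeAt, pick the instance back up from its checkpoint.
status = resumeWorkflow(workflow, "run-001", backend);
```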
### cancelWorkflow(instanceId, backend, opts?)

Request cancellation of a running durable workflow.

```ts
import { cancelWorkflow } from "sayiir";

cancelWorkflow("run-001", backend, {
  reason: "User requested cancellation",
  cancelledBy: "admin@example.com",
});
```

Parameters:

- instanceId (string) — The instance ID of the workflow to cancel
- backend (Backend) — Persistence backend
- opts.reason (string, optional) — Reason for cancellation
- opts.cancelledBy (string, optional) — Identifier of who requested cancellation
### pauseWorkflow(instanceId, backend, opts?)

Pause a running workflow at the next checkpoint.

```ts
import { pauseWorkflow } from "sayiir";

pauseWorkflow("run-001", backend, {
  reason: "Maintenance window",
  pausedBy: "ops@example.com",
});
```

Parameters:

- instanceId (string) — The instance ID of the workflow to pause
- backend (Backend) — Persistence backend
- opts.reason (string, optional) — Reason for pause
- opts.pausedBy (string, optional) — Identifier of who requested pause
### unpauseWorkflow(instanceId, backend)

Allow a paused workflow to resume.

```ts
import { unpauseWorkflow } from "sayiir";

unpauseWorkflow("run-001", backend);
```

Parameters:

- instanceId (string) — The instance ID of the workflow to unpause
- backend (Backend) — Persistence backend
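Putting pause and unpause together, a maintenance window might look like the sketch below. It assumes that after unpausing, the instance is continued by calling resumeWorkflow; how and when that call happens is up to your worker loop.

```ts
import { pauseWorkflow, unpauseWorkflow, resumeWorkflow } from "sayiir";

// Stop the instance at its next checkpoint.
pauseWorkflow("run-001", backend, {
  reason: "Maintenance window",
  pausedBy: "ops@example.com",
});

// ...maintenance happens...

// Clear the pause, then continue the instance from its checkpoint.
unpauseWorkflow("run-001", backend);
const status = resumeWorkflow(workflow, "run-001", backend);
```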
### sendSignal(instanceId, signalName, payload, backend)

Send an external signal to a workflow waiting on waitForSignal.

```ts
import { sendSignal } from "sayiir";

sendSignal(
  "run-001",
  "manager_approval",
  { approved: true, comments: "Looks good" },
  backend,
);
```

Parameters:

- instanceId (string) — The instance ID of the target workflow
- signalName (string) — The signal name to send
- payload (unknown) — The payload data (will be serialized)
- backend (Backend) — Persistence backend
The payload is buffered in FIFO order per (instanceId, signalName). When the workflow resumes and reaches the matching waitForSignal node, it consumes the oldest buffered event.
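For instance, two signals sent before the workflow reaches its waitForSignal step are both buffered, and the first wait consumes the earlier one. A sketch, assuming the parked instance is picked back up with resumeWorkflow:

```ts
import { sendSignal, resumeWorkflow } from "sayiir";

// Both events are buffered for ("run-001", "manager_approval") in FIFO order.
sendSignal("run-001", "manager_approval", { approved: false }, backend);
sendSignal("run-001", "manager_approval", { approved: true }, backend);

// When the instance resumes and reaches the matching waitForSignal node,
// it consumes the oldest buffered event first ({ approved: false }).
const status = resumeWorkflow(workflow, "run-001", backend);
```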
## Backends

### InMemoryBackend

In-memory storage for development and testing. State is lost when the process stops.

```ts
import { InMemoryBackend } from "sayiir";

const backend = new InMemoryBackend();
```

It is fast and requires no external dependencies, making it ideal for unit tests and local development.
### PostgresBackend

PostgreSQL storage for production. State survives process restarts.

```ts
import { PostgresBackend } from "sayiir";

const backend = PostgresBackend.connect("postgresql://user:pass@localhost:5432/sayiir");
```

Parameters:

- url (string) — PostgreSQL connection URL
Requirements:
- PostgreSQL 13 or higher
- Auto-migration: tables are created on first connection
- Recommended: dedicated database for Sayiir
## WorkflowStatus<TOut>

Discriminated union returned by durable execution functions. Use TypeScript narrowing to access variant-specific fields.

```ts
const status = runDurableWorkflow(workflow, "run-001", input, backend);

switch (status.status) {
  case "completed":
    console.log(status.output); // TOut
    break;
  case "failed":
    console.log(status.error); // string
    break;
  case "cancelled":
    console.log(status.reason); // string | undefined
    console.log(status.cancelledBy); // string | undefined
    break;
  case "paused":
    console.log(status.reason); // string | undefined
    console.log(status.pausedBy); // string | undefined
    break;
  case "waiting":
    console.log(status.wakeAt); // string (ISO timestamp)
    console.log(status.delayId); // string
    break;
  case "awaiting_signal":
    console.log(status.signalId); // string
    console.log(status.signalName); // string
    console.log(status.wakeAt); // string | undefined
    break;
  case "in_progress":
    break;
}
```

Variants:

| Status | Fields |
|---|---|
| completed | output: TOut |
| in_progress | — |
| failed | error: string |
| cancelled | reason?: string, cancelledBy?: string |
| paused | reason?: string, pausedBy?: string |
| waiting | wakeAt: string, delayId: string |
| awaiting_signal | signalId: string, signalName: string, wakeAt?: string |
## Duration

A number (milliseconds) or a human-readable string parsed by ms:

```ts
type Duration = string | number;

// Examples:
"30s"; // 30 seconds
"5m";  // 5 minutes
"1h";  // 1 hour
"2d";  // 2 days
1000;  // 1000 milliseconds
```
## RetryPolicy

```ts
interface RetryPolicy {
  maxAttempts: number;
  initialDelay: Duration;
  backoffMultiplier?: number; // default: 2.0
  maxDelay?: Duration;
}
```

Delay Formula:

```
delay = initialDelay * (backoffMultiplier ** attempt)
```
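As a worked example of the formula, a policy of maxAttempts: 4 with initialDelay of 1s and the default multiplier of 2 waits roughly 1s, 2s, and 4s between the four attempts. The helper below is hypothetical (not part of the sayiir API) and assumes a 0-based attempt counter with delays capped at maxDelay:

```ts
// Hypothetical helper illustrating the delay formula; durations are in milliseconds.
function retryDelaysMs(policy: {
  maxAttempts: number;
  initialDelayMs: number;
  backoffMultiplier?: number;
  maxDelayMs?: number;
}): number[] {
  const mult = policy.backoffMultiplier ?? 2.0;
  const delays: number[] = [];
  // One delay before each retry, i.e. maxAttempts - 1 waits in total.
  for (let attempt = 0; attempt < policy.maxAttempts - 1; attempt++) {
    let delay = policy.initialDelayMs * mult ** attempt;
    if (policy.maxDelayMs !== undefined) delay = Math.min(delay, policy.maxDelayMs);
    delays.push(delay);
  }
  return delays;
}

retryDelaysMs({ maxAttempts: 4, initialDelayMs: 1000 }); // [1000, 2000, 4000]
```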
## StepOptions

Options for inline .then() steps:

```ts
interface StepOptions {
  timeout?: Duration;
  retries?: number;
  retry?: RetryPolicy;
}
```
## Exceptions

### WorkflowError

Base error for all Sayiir workflow errors.

```ts
import { WorkflowError } from "sayiir";

try {
  await runWorkflow(workflow, input);
} catch (e) {
  if (e instanceof WorkflowError) {
    console.error(`Workflow error: ${e.message}`);
  }
}
```
### TaskError

Raised when a task fails during execution. Extends WorkflowError.

### BackendError

Raised when a persistence backend operation fails. Extends WorkflowError.
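Because both subclasses extend WorkflowError, instanceof checks can distinguish a failing task from a failing backend operation. A minimal sketch, assuming TaskError and BackendError are exported from the package root like WorkflowError:

```ts
import { runWorkflow, WorkflowError, TaskError, BackendError } from "sayiir";

try {
  await runWorkflow(workflow, input);
} catch (e) {
  if (e instanceof TaskError) {
    console.error(`Task failed: ${e.message}`);
  } else if (e instanceof BackendError) {
    console.error(`Backend failure: ${e.message}`);
  } else if (e instanceof WorkflowError) {
    console.error(`Workflow error: ${e.message}`);
  } else {
    throw e; // not a Sayiir error
  }
}
```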
## Zod Integration

Sayiir accepts Zod schemas for runtime input/output validation; zod is an optional peer dependency. Any object with a .parse(data): T method works.

```ts
import { z } from "zod";
import { task } from "sayiir";

const InputSchema = z.object({ id: z.string(), amount: z.number() });
const OutputSchema = z.object({ status: z.string() });

const myTask = task(
  "validate",
  (input) => {
    // input is validated and typed
    return { status: "ok" };
  },
  {
    input: InputSchema,
    output: OutputSchema,
  },
);
```

If Zod is not installed or schemas are not provided, validation is skipped.
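Because only a .parse(data) method is required, a hand-rolled validator works without installing Zod. A minimal sketch; the PositiveNumber validator is hypothetical, not part of the sayiir API:

```ts
import { task } from "sayiir";

// Any ZodLike object: parse() returns the typed value or throws on invalid input.
const PositiveNumber = {
  parse(data: unknown): number {
    if (typeof data !== "number" || data <= 0) {
      throw new Error("expected a positive number");
    }
    return data;
  },
};

const double = task("double", (n: number) => n * 2, { input: PositiveNumber });
```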
## Async Tasks

runWorkflow() supports both sync and async task functions. Tasks can use fetch, timers, file I/O, or any async operation:

```ts
const fetchData = task("fetch", async (url: string) => {
  const res = await fetch(url);
  return res.json();
});

const result = await runWorkflow(workflow, "https://api.example.com/data");
```

For sync-only workflows, runWorkflowSync() provides a faster synchronous path. It throws if any task returns a Promise.