# Node.js API Reference

## Task Definition

### `task(id, fn, opts?)`

Create a named task with optional configuration.
```ts
import { task } from "sayiir";

const myTask = task("my-task", (input: { value: number }) => {
  return { result: input.value * 2 };
});

const configured = task(
  "configured-task",
  (input: string) => {
    return processData(input);
  },
  {
    timeout: "30s",
    retries: 3,
    tags: ["io", "external"],
    description: "Processes input data",
  },
);
```

**Parameters:**

- `id` (`string`) — Unique task identifier
- `fn` (`(input: TIn) => TOut | Promise<TOut>`) — The task function (sync or async)
- `opts` (`TaskOptions`, optional) — Configuration options
**TaskOptions:**

- `timeout` (`Duration`, optional) — Task timeout (`"30s"`, `"5m"`, `1000`)
- `retries` (`number`, optional) — Simple retry count with default backoff
- `retry` (`RetryPolicy`, optional) — Full retry configuration
- `tags` (`string[]`, optional) — Categorization tags
- `description` (`string`, optional) — Human-readable description
- `priority` (`number`, optional) — Execution priority (1–5, lower = higher priority; default: 3)
- `input` (`ZodLike`, optional) — Zod schema for input validation
- `output` (`ZodLike`, optional) — Zod schema for output validation
The `task()` function attaches metadata to the function without wrapping it. The function can still be called directly for testing.

**Returns:** `TaskFn<TIn, TOut>` — A branded function with metadata properties.
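Because the function is branded rather than wrapped, the pattern can be approximated in a few lines. The sketch below is illustrative only: `makeTask`, `SketchOptions`, and the property names are hypothetical, not sayiir's internals.

```typescript
// A minimal sketch of "metadata without wrapping": the original function object
// is returned as-is with extra properties attached, so tests can invoke it
// directly. All names here are illustrative, not sayiir's actual internals.
type SketchOptions = { timeout?: string | number; retries?: number; tags?: string[] };
type SketchTaskFn<TIn, TOut> = ((input: TIn) => TOut) & {
  taskId: string;
  taskOptions: SketchOptions;
};

function makeTask<TIn, TOut>(
  id: string,
  fn: (input: TIn) => TOut,
  opts: SketchOptions = {},
): SketchTaskFn<TIn, TOut> {
  // Object.assign adds the metadata properties to fn itself; no wrapper closure
  // is created, so calling the result behaves exactly like calling fn.
  return Object.assign(fn, { taskId: id, taskOptions: opts });
}

const doubleTask = makeTask("double", (x: number) => x * 2, { tags: ["math"] });
const result = doubleTask(21); // callable directly, behavior unchanged: 42
```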
## Task Execution Context

### `getTaskContext()`

Get the current task execution context from within a running task. Returns `null` if called outside of a task execution.
```ts
import { task, getTaskContext } from "sayiir";

const myTask = task("my-task", (data: { value: number }) => {
  const ctx = getTaskContext();
  if (ctx) {
    console.log(`Running task ${ctx.taskId} in workflow ${ctx.workflowId}`);
    console.log(`Instance: ${ctx.instanceId}`);
  }
  return { result: data.value * 2 };
});
```

**Returns:** `TaskExecutionContext | null`
### `TaskExecutionContext`

Read-only context available from within a running task. Provides access to workflow and task metadata.
```ts
interface TaskExecutionContext {
  workflowId: string;
  instanceId: string;
  taskId: string;
  metadata: TaskMetadata;
  workflowMetadata?: Record<string, unknown> | null;
}

interface TaskMetadata {
  displayName?: string;
  description?: string;
  timeoutSecs?: number;
  retries?: {
    maxRetries: number;
    initialDelaySecs: number;
    backoffMultiplier: number;
    maxDelaySecs?: number;
  };
  tags?: string[];
  version?: string;
  priority?: number;
}
```

**Properties:**

- `workflowId` (`string`) — The workflow definition identifier
- `instanceId` (`string`) — The workflow instance identifier (for durable workflows) or workflow ID (for in-memory runs)
- `taskId` (`string`) — The current task identifier
- `metadata` (`TaskMetadata`) — Task metadata (timeout, retries, tags, version, etc.)
```ts
const fetchData = task(
  "fetch-data",
  async (url: string) => {
    const ctx = getTaskContext();
    if (ctx) {
      console.log(`Timeout: ${ctx.metadata.timeoutSecs}s`);
      console.log(`Tags: ${ctx.metadata.tags}`);
    }
    return doFetch(url);
  },
  { timeout: "30s", retries: 3, tags: ["io"] },
);
```

## Flow Builder

### `flow<TInput>(name)`

Create a new type-safe workflow builder. The generic parameter `TInput` sets the workflow's input type.

```ts
import { flow } from "sayiir";

const builder = flow<number>("my-workflow");
```

**Returns:** `Flow<TInput, TInput>` — A flow builder for chaining.
### `.then(fn)` / `.then(id, fn, opts?)`

Add a sequential task step. Two overloads:

```ts
// With a task() function — uses the task's ID and metadata
const workflow = flow<number>("pipeline")
  .then(double)
  .then(addTen)
  .build();
```

```ts
// With an inline function — provide an ID
const workflow = flow<number>("pipeline")
  .then("double", (x) => x * 2)
  .then("format", (x) => `Result: ${x}`, { timeout: "10s" })
  .build();
```

```ts
// Auto-generated ID (lambda_0, lambda_1, ...)
const workflow = flow<number>("pipeline")
  .then((x) => x * 2)
  .build();
```

**Parameters (overload 1):**

- `fn` (`TaskFn<TLast, TOut>`) — A `task()` function

**Parameters (overload 2):**

- `id` (`string`) — Step identifier
- `fn` (`(input: TLast) => TOut | Promise<TOut>`) — Task function (sync or async)
- `opts` (`StepOptions`, optional) — Step-level `timeout`, `retries`, or `retry`

**Returns:** `Flow<TInput, Awaited<TOut>>` — Updated flow with new output type.
### `.loop(fn, opts?)` / `.loop(id, fn, opts?)`

Add a loop whose body repeats until it returns `LoopResult.done()`.

```ts
import { flow, task, LoopResult } from "sayiir";

const refine = task("refine", (draft: string) => {
  const improved = improve(draft);
  return isGoodEnough(improved)
    ? LoopResult.done(improved)
    : LoopResult.again(improved);
});

const workflow = flow<string>("iterative")
  .then(initialAttempt)
  .loop(refine, { maxIterations: 5 })
  .then(finalize)
  .build();
```

**Parameters (overload 1):**

- `fn` (`TaskFn<TLast, LoopResult<TOut>>`) — A `task()` function returning `LoopResult`
- `opts` (`LoopOptions`, optional) — `maxIterations` (default: 10, must be ≥ 1), `onMax` (`"fail"` | `"exit_with_last"`)

**Parameters (overload 2):**

- `id` (`string`) — Step identifier
- `fn` (`(input: TLast) => LoopResult<TOut>`) — Loop body function
- `opts` (`LoopOptions`, optional)

**Returns:** `Flow<TInput, TOut>` — The output type is the inner type of `LoopResult`.
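The loop semantics can be sketched as a plain driver function. Everything below is illustrative: `runLoop` is a hypothetical stand-in for sayiir's engine, which also checkpoints each iteration (omitted here); only the `LoopResult` shape and the `maxIterations`/`onMax` options mirror the documented API.

```typescript
// Hypothetical loop driver: repeat the body until it returns "done", and let
// onMax decide what happens when maxIterations is exhausted.
type LoopResult<T> = { _loop: "again"; value: T } | { _loop: "done"; value: T };

interface LoopOptions {
  maxIterations?: number;            // default: 10
  onMax?: "fail" | "exit_with_last"; // default: "fail"
}

function runLoop<T>(
  body: (input: T) => LoopResult<T>,
  input: T,
  opts: LoopOptions = {},
): T {
  const max = opts.maxIterations ?? 10;
  let current = input;
  for (let i = 0; i < max; i++) {
    const step = body(current);
    current = step.value;
    if (step._loop === "done") return current;
  }
  if ((opts.onMax ?? "fail") === "fail") {
    throw new Error(`loop exceeded ${max} iterations`);
  }
  return current; // "exit_with_last": return the last "again" value
}

// Doubles until the value reaches at least 100: 3 → 6 → 12 → 24 → 48 → 96 → 192
const out = runLoop(
  (x: number) =>
    x >= 100 ? { _loop: "done", value: x } : { _loop: "again", value: x * 2 },
  3,
  { maxIterations: 10 },
);
```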
### `LoopResult`

Control flow type for loop body tasks.

```ts
import { LoopResult } from "sayiir";

// Continue the loop with a new value
LoopResult.again(newValue); // { _loop: "again", value: newValue }

// Exit the loop with the final value
LoopResult.done(finalValue); // { _loop: "done", value: finalValue }
```

**Type:** `LoopResult<T> = { _loop: "again"; value: T } | { _loop: "done"; value: T }`
### `LoopOptions`

```ts
interface LoopOptions {
  maxIterations?: number;            // default: 10
  onMax?: "fail" | "exit_with_last"; // default: "fail"
}
```

### `.fork(branches)`

Start parallel execution with an array of branch definitions.

```ts
import { flow, branch, task } from "sayiir";

const sendEmail = task("send-email", (order) => ({ emailSent: true }));
const shipOrder = task("ship-order", (order) => ({ shipped: true }));

const workflow = flow<Order>("process")
  .then(chargePayment)
  .fork([
    branch("email", sendEmail),
    branch("shipping", shipOrder),
  ])
  .join("finalize", ([email, shipping]) => ({ ...email, ...shipping }))
  .build();
```

**Parameters:**

- `branches` (`BranchDef[]`) — Array of `branch()` definitions

**Returns:** `ForkBuilder<TInput, TLast, TBranches>` — Builder requiring `.join()`.
### `branch(name, fn)`

Create a branch definition for use with `.fork()`.

```ts
import { branch, task } from "sayiir";

const myBranch = branch("name", myTask);
const inlineBranch = branch("compute", (x: number) => x * 2);
```

**Parameters:**

- `name` (`string`) — Branch name
- `fn` (`TaskFn | Function`) — Branch task function

**Returns:** `BranchDef<TIn, TOut>`
### `.join(id, fn)`

Join forked branches with a combining function. The function receives a tuple of branch outputs in the same order as the branches array.

```ts
.fork([
  branch("a", taskA), // returns string
  branch("b", taskB), // returns number
])
.join("merge", ([a, b]) => {
  // a: string, b: number — fully typed tuple
  return { text: a, count: b };
});
```

**Parameters:**

- `id` (`string`) — Join step identifier
- `fn` (`(branches: InferBranchOutputs<TBranches>) => TOut`) — Combining function

**Returns:** `Flow<TInput, Awaited<TOut>>` — Flow continues with the join output type.
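The declaration-order guarantee can be demonstrated with plain `Promise.all`, which resolves to results in positional order regardless of completion order. `runBranches` and both branch functions below are hypothetical, not part of sayiir's API.

```typescript
// Illustrative sketch: the "text" branch is deliberately slower, yet the join
// still receives [text, count] in declaration order, because Promise.all
// preserves positional order regardless of which promise settles first.
async function runBranches(input: number): Promise<{ text: string; count: number }> {
  const slowTextBranch = (x: number) =>
    new Promise<string>((resolve) => setTimeout(() => resolve(`order-${x}`), 20));
  const fastCountBranch = async (x: number) => x * 2;

  const [text, count] = await Promise.all([
    slowTextBranch(input),
    fastCountBranch(input),
  ]);
  return { text, count };
}
```

Calling `runBranches(5)` resolves to `{ text: "order-5", count: 10 }` even though the count branch finishes first.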
### `.delay(id, duration)`

Add a durable delay. No workers are held during the delay.

```ts
const workflow = flow<number>("delayed")
  .then("send-email", sendEmail)
  .delay("wait-24h", "24h")
  .then("send-reminder", sendReminder)
  .build();

// Also accepts milliseconds
flow("quick").delay("short-wait", 5000);
```

**Parameters:**

- `id` (`string`) — Unique identifier for this delay step
- `duration` (`Duration`) — Delay as a string (`"30s"`, `"5m"`, `"1h"`) or milliseconds (number)

**Returns:** `Flow<TInput, TLast>` — Same output type (delay is pass-through).
### `.waitForSignal<TSignal>(id, signalName, opts?)`

Wait for an external signal before continuing. The workflow parks and releases the worker until the signal arrives.

```ts
const workflow = flow<number>("approval")
  .then("submit", submitRequest)
  .waitForSignal<{ approved: boolean }>("wait-approval", "manager_approval", {
    timeout: "48h",
  })
  .then("process", (signal) => {
    // signal: { approved: boolean }
    return signal.approved ? "approved" : "rejected";
  })
  .build();
```

**Parameters:**

- `id` (`string`) — Step identifier
- `signalName` (`string`) — The named signal to wait for
- `opts.timeout` (`Duration`, optional) — Timeout duration

**Returns:** `Flow<TInput, TSignal>` — Output type becomes the signal payload type.
### `.thenFlow(workflow)`

Inline a child workflow. The child's entire pipeline runs as a single composition step, and its output becomes the input to the next step. Task registries are merged automatically.

```ts
import { flow, task } from "sayiir";

const double = task("double", (x: number) => x * 2);
const addTen = task("add-ten", (x: number) => x + 10);

// Child workflow
const childWorkflow = flow<number>("child")
  .then(double)
  .build();

// Parent inlines the child
const parent = flow<number>("parent")
  .then(addTen)
  .thenFlow(childWorkflow)
  .then("format", (x) => `Result: ${x}`)
  .build();
```

**Parameters:**

- `workflow` (`Workflow<TLast, TOut>`) — A built workflow whose input type matches the current output type

**Returns:** `Flow<TInput, Awaited<TOut>>` — Flow continues with the child's output type.
### `.route(keyFn, keys)`

Start conditional branching based on a routing key. The key function receives the current output and returns one of the declared keys. The `keys` array declares all valid branch keys upfront: TypeScript narrows the key type so `.branch()` only accepts declared values, and `.done()` checks exhaustiveness.

```ts
const workflow = flow<Ticket>("router")
  .route(
    (ticket) => ticket.type === "invoice" ? "billing" : "tech",
    ["billing", "tech"] as const,
  )
  .branch("billing", handleBilling)
  .branch("tech", handleTech)
  .defaultBranch("fallback", fallbackHandler)
  .done()
  .then("summarize", (envelope) => `Handled via ${envelope.branch}`)
  .build();
```

**Parameters:**

- `keyFn` (`TaskFn<TLast, Keys[number]>` or `(input: TLast) => Keys[number] | Promise<Keys[number]>`) — Function that returns one of the declared keys
- `keys` (`readonly string[]`) — All valid branch keys. Use `as const` for a type-level key constraint.

**Returns:** `RouteBuilder<TInput, TLast, Keys>`
### `RouteBuilder`

Builder for conditional branches. Created by calling `.route()` on a `Flow`.

### `.branch(key, fn)` / `.branch(key, id, fn)`

Add a named branch. The `key` parameter is type-constrained to the keys declared in `.route()`. The branch runs when the key function returns this key.

```ts
.branch("billing", handleBilling)
.branch("express", "express-handler", expressHandler)
```

**Parameters (overload 1):**

- `key` (`Keys[number]`) — The routing key that selects this branch (must be one of the declared keys)
- `fn` (`TaskFn | Function`) — Branch task function

**Parameters (overload 2):**

- `key` (`Keys[number]`) — The routing key that selects this branch (must be one of the declared keys)
- `id` (`string`) — Custom task ID
- `fn` (`Function`) — Branch task function

**Returns:** `RouteBuilder` for chaining
### `.defaultBranch(fn)` / `.defaultBranch(id, fn)`

Set the fallback branch for unmatched keys.

```ts
.defaultBranch("fallback", (input) => ({ status: "unmatched" }))
```

**Returns:** `RouteBuilder` for chaining
### `.done()`

Finish the branching and return to the `Flow` builder. Throws an error if any declared key is missing a `.branch()` call, or if any `.branch()` key was not in the declared keys array (orphan branch). The output type becomes `BranchEnvelope<T>` with `branch` (the matched key) and `result` (the branch output).

```ts
.route(keyFn, ["a", "b"] as const)
  .branch("a", handleA)
  .branch("b", handleB)
  .done()
  .then("next", (envelope) => {
    // envelope: { branch: "a", result: ... }
  });
```

**Returns:** `Flow<TInput, BranchEnvelope<TBranchOut>>`
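The envelope semantics can be illustrated with a standalone dispatch sketch. `routeAndRun` is hypothetical; only the `BranchEnvelope` shape (`branch` plus `result`) comes from the docs above, and the runtime exhaustiveness check that `.done()` performs is omitted here.

```typescript
// Illustrative dispatch: pick a handler by routing key, fall back to the
// default branch, and wrap the output in a { branch, result } envelope.
type BranchEnvelope<T> = { branch: string; result: T };

function routeAndRun<TIn, TOut>(
  input: TIn,
  keyFn: (input: TIn) => string,
  branches: Record<string, (input: TIn) => TOut>,
  defaultBranch?: (input: TIn) => TOut,
): BranchEnvelope<TOut> {
  const key = keyFn(input);
  const handler = branches[key] ?? defaultBranch;
  if (!handler) throw new Error(`no branch for key "${key}" and no default branch`);
  return { branch: key, result: handler(input) };
}

const envelope = routeAndRun(
  { type: "invoice", amount: 100 },
  (t) => (t.type === "invoice" ? "billing" : "tech"),
  {
    billing: (t) => `billed ${t.amount}`,
    tech: () => "routed to tech",
  },
);
// envelope: { branch: "billing", result: "billed 100" }
```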
### `.build()`

Compile the workflow definition.

```ts
const workflow = flow<number>("example").then(myTask).build();
console.log(workflow.workflowId);     // "example"
console.log(workflow.definitionHash); // "abc123..."
```

**Returns:** `Workflow<TInput, TLast>`
### `Workflow`

Compiled workflow with task registry. Produced by `Flow.build()`.

**Properties:**

- `workflowId` (`string`) — The workflow's unique identifier
- `definitionHash` (`string`) — SHA-256 hash of the workflow definition
## Execution Functions

### `runWorkflow(workflow, input, opts?)`

Run a workflow to completion. Without options, it runs entirely in memory with no persistence (the fastest path for prototyping). With `opts`, it runs with full checkpointing but still returns the output directly. If the workflow doesn't complete (e.g., it parks on a delay or signal), it throws `WorkflowError`.

```ts
import { runWorkflow, PostgresBackend } from "sayiir";

// Prototype — no persistence
const quickResult = await runWorkflow(workflow, 42);

// Production — same function, just add options
const backend = PostgresBackend.connect(process.env.DATABASE_URL!);
const durableResult = await runWorkflow(workflow, 42, {
  instanceId: "run-001",
  backend,
});

// Idempotent — safe to call multiple times
const idempotentResult = await runWorkflow(workflow, 42, {
  instanceId: "run-001",
  backend,
  conflictPolicy: "useExisting",
});
```

**Parameters:**

- `workflow` (`Workflow<TIn, TOut>`) — The workflow to run
- `input` (`TIn`) — Input to the first task
- `opts` (`DurableRunOptions`, optional) — Durable execution options

**DurableRunOptions:**

- `instanceId` (`string`) — Unique execution instance ID
- `backend` (`Backend`) — Persistence backend
- `conflictPolicy` (`ConflictPolicy`, optional) — What to do when `instanceId` already has a snapshot: `"fail"` (default), `"useExisting"`, or `"terminateExisting"`

**Returns:** `Promise<TOut>` — The workflow result

**Throws:** `WorkflowError` if a durable workflow did not complete. Use `runDurableWorkflow()` when you need the full `WorkflowStatus` object.
### `runWorkflowSync(workflow, input)`

Run a workflow synchronously. All tasks must return plain values, not Promises. For async tasks, use `runWorkflow()` instead.

```ts
import { runWorkflowSync } from "sayiir";

const result = runWorkflowSync(workflow, 42);
```

**Parameters:**

- `workflow` (`Workflow<TIn, TOut>`) — The workflow to run
- `input` (`TIn`) — Input to the first task

**Returns:** `TOut` — The workflow result
### `runDurableWorkflow(workflow, instanceId, input, backend, conflictPolicy?)`

Run a workflow with checkpointing and durability. Returns a `WorkflowStatus`.

```ts
import { runDurableWorkflow, InMemoryBackend } from "sayiir";

const backend = new InMemoryBackend();
const status = runDurableWorkflow(workflow, "run-001", 42, backend);

if (status.status === "completed") {
  console.log(status.output); // 42
}

// Idempotent — returns the existing status if already run
const existing = runDurableWorkflow(workflow, "run-001", 42, backend, "useExisting");
```

**Parameters:**

- `workflow` (`Workflow<TIn, TOut>`) — The workflow to run
- `instanceId` (`string`) — Unique identifier for this execution
- `input` (`TIn`) — Input to the first task
- `backend` (`Backend`) — Persistence backend
- `conflictPolicy` (`ConflictPolicy`, optional) — `"fail"` (default), `"useExisting"`, or `"terminateExisting"`

**Returns:** `WorkflowStatus<TOut>`
### `resumeWorkflow(workflow, instanceId, backend)`

Resume a durable workflow from a saved checkpoint.

```ts
import { resumeWorkflow } from "sayiir";

const status = resumeWorkflow(workflow, "run-001", backend);
```

**Parameters:**

- `workflow` (`Workflow<TIn, TOut>`) — The workflow definition
- `instanceId` (`string`) — The instance ID used when the workflow was started
- `backend` (`Backend`) — Persistence backend

**Returns:** `WorkflowStatus<TOut>`
### `cancelWorkflow(instanceId, backend, opts?)`

Request cancellation of a running durable workflow.

```ts
import { cancelWorkflow } from "sayiir";

cancelWorkflow("run-001", backend, {
  reason: "User requested cancellation",
  cancelledBy: "admin@example.com",
});
```

**Parameters:**

- `instanceId` (`string`) — The instance ID of the workflow to cancel
- `backend` (`Backend`) — Persistence backend
- `opts.reason` (`string`, optional) — Reason for cancellation
- `opts.cancelledBy` (`string`, optional) — Identifier of who requested cancellation
### `pauseWorkflow(instanceId, backend, opts?)`

Pause a running workflow at the next checkpoint.

```ts
import { pauseWorkflow } from "sayiir";

pauseWorkflow("run-001", backend, {
  reason: "Maintenance window",
  pausedBy: "ops@example.com",
});
```

**Parameters:**

- `instanceId` (`string`) — The instance ID of the workflow to pause
- `backend` (`Backend`) — Persistence backend
- `opts.reason` (`string`, optional) — Reason for pause
- `opts.pausedBy` (`string`, optional) — Identifier of who requested pause
### `unpauseWorkflow(instanceId, backend)`

Allow a paused workflow to resume.

```ts
import { unpauseWorkflow } from "sayiir";

unpauseWorkflow("run-001", backend);
```

**Parameters:**

- `instanceId` (`string`) — The instance ID of the workflow to unpause
- `backend` (`Backend`) — Persistence backend
### `sendSignal(instanceId, signalName, payload, backend)`

Send an external signal to a workflow waiting on `waitForSignal`.

```ts
import { sendSignal } from "sayiir";

sendSignal(
  "run-001",
  "manager_approval",
  { approved: true, comments: "Looks good" },
  backend,
);
```

**Parameters:**

- `instanceId` (`string`) — The instance ID of the target workflow
- `signalName` (`string`) — The signal name to send
- `payload` (`unknown`) — The payload data (will be serialized)
- `backend` (`Backend`) — Persistence backend

The payload is buffered in FIFO order per `(instanceId, signalName)` pair. When the workflow resumes and reaches the matching `waitForSignal` node, it consumes the oldest buffered event.
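The FIFO buffering behavior can be sketched with a simple in-memory queue keyed by instance and signal name. `SignalBuffer` is illustrative, not sayiir's storage layer:

```typescript
// Illustrative sketch: payloads queue per (instanceId, signalName) and are
// consumed oldest-first, mirroring the documented FIFO behavior.
class SignalBuffer {
  private queues = new Map<string, unknown[]>();

  private key(instanceId: string, signalName: string): string {
    // NUL separator avoids collisions between e.g. ("a", "bc") and ("ab", "c").
    return `${instanceId}\u0000${signalName}`;
  }

  send(instanceId: string, signalName: string, payload: unknown): void {
    const k = this.key(instanceId, signalName);
    const q = this.queues.get(k) ?? [];
    q.push(payload);
    this.queues.set(k, q);
  }

  // Returns the oldest buffered payload, or undefined if none is waiting.
  consume(instanceId: string, signalName: string): unknown {
    return this.queues.get(this.key(instanceId, signalName))?.shift();
  }
}

const buffer = new SignalBuffer();
buffer.send("run-001", "manager_approval", { approved: false });
buffer.send("run-001", "manager_approval", { approved: true });
// Oldest event is consumed first:
const first = buffer.consume("run-001", "manager_approval"); // { approved: false }
```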
## Backends

### `InMemoryBackend`

In-memory storage for development and testing. State is lost when the process stops.

```ts
import { InMemoryBackend } from "sayiir";

const backend = new InMemoryBackend();
```

Fast and requires no external dependencies. Ideal for unit tests and local development.
### `PostgresBackend`

PostgreSQL storage for production. State survives process restarts.

```ts
import { PostgresBackend } from "sayiir";

const backend = PostgresBackend.connect("postgresql://user:pass@localhost:5432/sayiir");
```

**Parameters:**

- `url` (`string`) — PostgreSQL connection URL
Requirements:
- PostgreSQL 13 or higher
- Auto-migration: tables are created on first connection
- Recommended: dedicated database for Sayiir
### `WorkflowStatus<TOut>`

Discriminated union returned by durable execution functions. Use TypeScript narrowing to access variant-specific fields.

```ts
const status = runDurableWorkflow(workflow, "run-001", input, backend);

switch (status.status) {
  case "completed":
    console.log(status.output); // TOut
    break;
  case "failed":
    console.log(status.error); // string
    break;
  case "cancelled":
    console.log(status.reason);      // string | undefined
    console.log(status.cancelledBy); // string | undefined
    break;
  case "paused":
    console.log(status.reason);   // string | undefined
    console.log(status.pausedBy); // string | undefined
    break;
  case "waiting":
    console.log(status.wakeAt);  // string (ISO timestamp)
    console.log(status.delayId); // string
    break;
  case "awaiting_signal":
    console.log(status.signalId);   // string
    console.log(status.signalName); // string
    console.log(status.wakeAt);     // string | undefined
    break;
  case "in_progress":
    break;
}
```

**Variants:**
| Status | Fields |
|---|---|
| `completed` | `output: TOut` |
| `in_progress` | — |
| `failed` | `error: string` |
| `cancelled` | `reason?: string`, `cancelledBy?: string` |
| `paused` | `reason?: string`, `pausedBy?: string` |
| `waiting` | `wakeAt: string`, `delayId: string` |
| `awaiting_signal` | `signalId: string`, `signalName: string`, `wakeAt?: string` |
### `Duration`

A number (milliseconds) or a human-readable string parsed by `ms`:

```ts
type Duration = string | number;

// Examples:
"30s" // 30 seconds
"5m"  // 5 minutes
"1h"  // 1 hour
"2d"  // 2 days
1000  // 1000 milliseconds
```
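As a rough sketch of how such strings normalize to milliseconds: `toMillis` below is hypothetical and covers only the forms shown above, while the real `ms` package supports more units and spelled-out forms like `"2 days"`.

```typescript
// Minimal duration normalization: numbers pass through as milliseconds,
// strings like "30s" / "5m" / "1h" / "2d" are converted.
function toMillis(d: string | number): number {
  if (typeof d === "number") return d; // numbers are already milliseconds
  const match = /^(\d+(?:\.\d+)?)\s*(s|m|h|d)$/.exec(d);
  if (!match) throw new Error(`unparseable duration: "${d}"`);
  const unitMs: Record<string, number> = {
    s: 1_000,
    m: 60_000,
    h: 3_600_000,
    d: 86_400_000,
  };
  return Number(match[1]) * unitMs[match[2]];
}

const delay = toMillis("5m"); // 300000
```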
### `RetryPolicy`

```ts
interface RetryPolicy {
  maxAttempts: number;
  initialDelay: Duration;
  backoffMultiplier?: number; // default: 2.0
  maxDelay?: Duration;
}
```

**Delay formula:**

```
delay = initialDelay * (backoffMultiplier ** attempt)
```
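Applied concretely, the formula looks like this. Field names below use explicit `Ms` suffixes for clarity, and capping at `maxDelay` is an assumption, since the formula above does not mention it:

```typescript
// Exponential backoff per the documented formula, with attempt counted from 0
// and an optional cap at maxDelayMs (the cap is an assumption of this sketch).
interface RetryPolicyMs {
  maxAttempts: number;
  initialDelayMs: number;
  backoffMultiplier?: number; // default: 2.0
  maxDelayMs?: number;
}

function retryDelayMs(policy: RetryPolicyMs, attempt: number): number {
  const multiplier = policy.backoffMultiplier ?? 2.0;
  const delay = policy.initialDelayMs * multiplier ** attempt;
  return policy.maxDelayMs !== undefined ? Math.min(delay, policy.maxDelayMs) : delay;
}

// initialDelay 1s, default multiplier 2: attempts 0..3 give 1000, 2000, 4000,
// then 8000 capped to 5000.
const capped = retryDelayMs({ maxAttempts: 5, initialDelayMs: 1000, maxDelayMs: 5000 }, 3);
```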
### `StepOptions`

Options for inline `.then()` steps:

```ts
interface StepOptions {
  timeout?: Duration;
  retries?: number;
  retry?: RetryPolicy;
}
```

## Distributed Worker
Section titled “Distributed Worker”Worker
Section titled “Worker”Create a distributed workflow worker that polls the backend for available tasks.
import { Worker, PostgresBackend } from "sayiir";
const backend = PostgresBackend.connect(process.env.DATABASE_URL!);const worker = new Worker("worker-1", backend, [workflow], { pollInterval: "2s", claimTtl: "10m",});Constructor Parameters:
workerId(string) — Unique identifier for this worker instancebackend(Backend) — Shared persistence backend (InMemoryBackendorPostgresBackend)workflows(Workflow[]) — Compiled workflows whose tasks this worker can executeopts(WorkerOptions, optional) — Configuration options
WorkerOptions:
pollInterval(Duration, optional) — How often to poll for tasks ("5s","1m",2000)claimTtl(Duration, optional) — Task claim time-to-live ("5m","10m",300000)
Properties:
workerId(string) — The worker’s unique identifierbackend(Backend) — The persistence backendworkflows(Workflow[]) — The registered workflowsoptions(WorkerOptions) — The configuration options
### `.start()`

Start the worker and return a handle for lifecycle control. Spawns a background thread that polls for tasks, claims them, and dispatches them to registered task functions.

```ts
const handle = worker.start();
process.on("SIGTERM", () => handle.shutdown());
```

**Returns:** `WorkerHandle`
### `WorkerHandle`

Handle for controlling a running worker. Obtained from `Worker.start()`.

### `.shutdown()`

Request a graceful shutdown. The worker finishes in-flight tasks before stopping.

```ts
handle.shutdown();
```

## WorkflowClient
Section titled “WorkflowClient”Client for submitting and controlling workflow instances. Unlike runDurableWorkflow(), the client does not execute tasks — it only creates initial snapshots and stores lifecycle signals. A Worker picks up and executes the work.
new WorkflowClient(backend, opts?)
Section titled “new WorkflowClient(backend, opts?)”Create a new workflow client.
import { WorkflowClient, PostgresBackend } from "sayiir";
const backend = PostgresBackend.connect("postgresql://localhost/sayiir");const client = new WorkflowClient(backend, { conflictPolicy: "useExisting" });WorkflowClientOptions:
conflictPolicy(ConflictPolicy, optional) — What to do when aninstanceIdalready exists:"fail"(default),"useExisting", or"terminateExisting"
### `.submit(workflow, instanceId, input)`

Submit a workflow for execution. Creates an initial snapshot so a `Worker` can pick it up.

```ts
const status = client.submit(workflow, "order-42", { items: [1, 2, 3] });
```

**Returns:** `WorkflowStatus<TOut>`
### `.cancel(instanceId, opts?)`

Request cancellation of a workflow instance.

```ts
client.cancel("order-42", {
  reason: "Out of stock",
  cancelledBy: "admin@example.com",
});
```

**Parameters:**

- `instanceId` (`string`) — The workflow instance ID
- `opts.reason` (`string`, optional) — Reason for cancellation
- `opts.cancelledBy` (`string`, optional) — Identifier of who requested cancellation
### `.pause(instanceId, opts?)`

Pause a workflow instance at the next checkpoint.

```ts
client.pause("order-42", { reason: "Maintenance window" });
```

**Parameters:**

- `instanceId` (`string`) — The workflow instance ID
- `opts.reason` (`string`, optional) — Reason for pause
- `opts.pausedBy` (`string`, optional) — Identifier of who requested pause
### `.unpause(instanceId)`

Unpause a paused workflow instance.

```ts
client.unpause("order-42");
```

### `.sendSignal(instanceId, signalName, payload)`

Send an external signal to a workflow instance.

```ts
client.sendSignal("order-42", "manager_approval", { approved: true });
```

**Parameters:**

- `instanceId` (`string`) — The workflow instance ID
- `signalName` (`string`) — The signal name
- `payload` (`unknown`) — The signal payload (serialized to JSON)
### `.status(instanceId)`

Get the current status of a workflow instance.

```ts
const status = client.status<string>("order-42");
if (status.status === "completed") {
  console.log(status.output);
}
```

**Returns:** `WorkflowStatus<TOut>`
### `.getTaskResult(instanceId, taskId)`

Get a single task result from a workflow instance.

Returns the JSON-encoded task output, or `null` if the task was never executed. For completed or failed workflows, the result is recovered from the backend's history or cache.

```ts
const result = client.getTaskResult("order-42", "validate_order");
if (result !== null) {
  const parsed = JSON.parse(result);
  console.log(parsed);
}
```

**Parameters:**

- `instanceId` (`string`) — The workflow instance ID
- `taskId` (`string`) — The task ID to retrieve the result for

**Returns:** `string | null`
## Exceptions

### `WorkflowError`

Base error for all Sayiir workflow errors.

```ts
import { WorkflowError } from "sayiir";

try {
  await runWorkflow(workflow, input);
} catch (e) {
  if (e instanceof WorkflowError) {
    console.error(`Workflow error: ${e.message}`);
  }
}
```

### `TaskError`

Raised when a task fails during execution. Extends `WorkflowError`.

### `BackendError`

Raised when a persistence backend operation fails. Extends `WorkflowError`.
## Zod Integration

Sayiir supports Zod schemas for runtime validation, with Zod as an optional peer dependency. Any object with a `.parse(data): T` method works.

```ts
import { z } from "zod";
import { task } from "sayiir";

const InputSchema = z.object({ id: z.string(), amount: z.number() });
const OutputSchema = z.object({ status: z.string() });

const myTask = task("validate", (input) => {
  // input is validated and typed
  return { status: "ok" };
}, {
  input: InputSchema,
  output: OutputSchema,
});
```

If Zod is not installed or schemas are not provided, validation is skipped.
## Async Tasks

`runWorkflow()` supports both sync and async task functions. Tasks can use `fetch`, timers, file I/O, or any async operation:

```ts
const fetchData = task("fetch", async (url: string) => {
  const res = await fetch(url);
  return res.json();
});

const result = await runWorkflow(workflow, "https://api.example.com/data");
```

For sync-only workflows, `runWorkflowSync()` provides a faster synchronous path. It throws if any task returns a Promise.