Tutorial: Background Jobs (Rust)
Learn how to build a production-ready background job system with Sayiir’s Rust runtime. We’ll create an email campaign pipeline that handles retries and timeouts and scales out to distributed workers.
What We’re Building
An email campaign system that:
- Fetches recipient lists from a database
- Renders email templates with personalization
- Sends emails via an external service (with retries)
- Scales to distributed worker processes
This tutorial demonstrates Sayiir’s Rust API, which gives you maximum performance and type safety.
Cargo Setup
Create a new Rust project and add the Sayiir dependencies:
```sh
cargo new email-pipeline
cd email-pipeline
```

Add to Cargo.toml:

```toml
[dependencies]
sayiir-runtime = "0.1"
sayiir-persistence = "0.1"
sayiir-postgres = "0.1"
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
anyhow = "1"
```

Define Tasks with #[task]
Create src/tasks.rs with your task definitions:
```rust
use sayiir_runtime::prelude::*;
use sayiir_core::error::BoxError;
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Campaign {
    pub id: String,
    pub template: String,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Recipients {
    pub emails: Vec<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RenderedEmail {
    pub subject: String,
    pub body: String,
    pub recipients: Vec<String>,
}

/// Fetch recipient list for campaign
#[task(timeout = "10s")]
pub async fn fetch_recipients(campaign: Campaign) -> Result<Recipients, BoxError> {
    // In production: query your database
    // let rows = sqlx::query!("SELECT email FROM subscribers WHERE active = true")
    //     .fetch_all(&pool)
    //     .await?;

    println!("Fetching recipients for campaign: {}", campaign.id);

    Ok(Recipients {
        emails: vec![
            "alice@example.com".into(),
            "bob@example.com".into(),
            "carol@example.com".into(),
        ],
    })
}

/// Render email template with campaign data
#[task(timeout = "5s")]
pub async fn render_template(input: (Campaign, Recipients)) -> Result<RenderedEmail, BoxError> {
    let (campaign, recipients) = input;

    // In production: use a template engine like Handlebars or Tera
    let subject = format!("Campaign: {}", campaign.id);
    let body = campaign.template.replace("{{campaign_id}}", &campaign.id);

    println!("Rendered template for {} recipients", recipients.emails.len());

    Ok(RenderedEmail {
        subject,
        body,
        recipients: recipients.emails,
    })
}

/// Send emails via external service with automatic retries
#[task(
    timeout = "30s",
    retries = 3,
    backoff = "1s"
)]
pub async fn send_emails(email: RenderedEmail) -> Result<String, BoxError> {
    // In production: integrate with SendGrid, Postmark, AWS SES, etc.
    // let response = reqwest::Client::new()
    //     .post("https://api.sendgrid.com/v3/mail/send")
    //     .json(&payload)
    //     .send()
    //     .await?;

    println!(
        "Sending email '{}' to {} recipients",
        email.subject,
        email.recipients.len()
    );

    // Simulate occasional failure for retry demonstration
    // if rand::random::<f32>() < 0.2 {
    //     return Err("Transient API error".into());
    // }

    Ok(format!("Sent to {} recipients", email.recipients.len()))
}
```

The #[task] macro configures:
- timeout: Maximum execution time before the task fails
- retries: Number of retry attempts on failure
- backoff: Initial delay between retries, growing exponentially (see the sketch below)
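As a rough illustration of that schedule: with backoff = "1s" and retries = 3, and assuming the delay doubles on each attempt (the doubling factor is an assumption here, not something the macro exposes), the waits would be 1s, 2s, and 4s:

```rust
use std::time::Duration;

/// Delay before retry `attempt` (0-indexed), assuming the initial
/// backoff doubles on each attempt.
fn retry_delay(initial: Duration, attempt: u32) -> Duration {
    initial * 2u32.pow(attempt)
}

fn main() {
    let initial = Duration::from_secs(1);
    for attempt in 0..3 {
        // Prints: retry 1: 1s, retry 2: 2s, retry 3: 4s
        println!("retry {}: wait {:?}", attempt + 1, retry_delay(initial, attempt));
    }
}
```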
Compose with workflow!
Create src/main.rs to compose tasks into a workflow:
```rust
mod tasks;

use sayiir_runtime::prelude::*;
use sayiir_persistence::InMemoryBackend;
use tasks::{Campaign, fetch_recipients, render_template, send_emails};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Build workflow using the workflow! macro
    let registry = TaskRegistry::new();

    let workflow = workflow!(
        "email-pipeline",
        JsonCodec,
        registry,
        fetch_recipients => render_template => send_emails
    )?;

    // Create backend and runner
    let backend = InMemoryBackend::new();
    let runner = CheckpointingRunner::new(backend);

    // Execute campaign
    let campaign = Campaign {
        id: "summer-sale-2026".into(),
        template: "Check out our summer sale! Campaign: {{campaign_id}}".into(),
    };

    let status = runner
        .run(workflow.workflow(), "campaign-001", campaign)
        .await?;

    match status {
        WorkflowStatus::Completed(output) => {
            println!("Campaign completed: {}", output);
        }
        WorkflowStatus::Failed(error) => {
            eprintln!("Campaign failed: {}", error);
        }
        _ => {
            println!("Campaign status: {:?}", status);
        }
    }

    Ok(())
}
```

Run it:

```sh
cargo run
```

Output:

```
Fetching recipients for campaign: summer-sale-2026
Rendered template for 3 recipients
Sending email 'Campaign: summer-sale-2026' to 3 recipients
Campaign completed: Sent to 3 recipients
```

Run with CheckpointingRunner
The CheckpointingRunner provides automatic crash recovery. Let’s see it in action:
```rust
use sayiir_persistence::InMemoryBackend;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let backend = InMemoryBackend::new();
    let runner = CheckpointingRunner::new(backend);

    // `workflow` is built with the workflow! macro as in the previous section
    let workflow_id = "campaign-001";
    let campaign = Campaign {
        id: "summer-sale".into(),
        template: "Sale alert: {{campaign_id}}".into(),
    };

    // First run - completes successfully
    let status = runner.run(&workflow, workflow_id, campaign.clone()).await?;
    println!("First run: {:?}", status);

    // Second run with same ID - returns cached result immediately
    let status = runner.run(&workflow, workflow_id, campaign).await?;
    println!("Second run (cached): {:?}", status);

    Ok(())
}
```

Key benefits:
- Tasks are checkpointed after completion
- Crashed workflows resume from the last checkpoint (see the sketch after this list)
- Completed workflows return cached results
- No duplicate work or side effects
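To see the crash-recovery behavior concretely, here is a minimal sketch of the restart path. It assumes backend, workflow, and campaign are built as above, and that the backend still holds checkpoints from an interrupted run of "campaign-002"; note that InMemoryBackend only keeps checkpoints within one process, so surviving a real crash requires a persistent backend such as Postgres:

```rust
// Suppose the process died partway through running "campaign-002":
// some tasks checkpointed, but the workflow never finished. On
// restart, a fresh runner over the same backend picks up where the
// crashed run left off.
let runner = CheckpointingRunner::new(backend);
let status = runner.run(&workflow, "campaign-002", campaign).await?;

// Only the tasks that had not yet checkpointed are executed again.
println!("After restart: {:?}", status);
```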
Scale with Distributed Workers
For production, scale to multiple worker processes pulling from a shared queue:
Step 1: Create Task Registry
```rust
use sayiir_runtime::prelude::*;
use std::sync::Arc;

pub fn build_task_registry() -> TaskRegistry {
    let mut registry = TaskRegistry::new();

    registry.register("fetch_recipients", Arc::new(fetch_recipients));
    registry.register("render_template", Arc::new(render_template));
    registry.register("send_emails", Arc::new(send_emails));

    registry
}
```

Step 2: Register Workflow
```rust
use sayiir_runtime::WorkflowRegistry;

pub fn build_workflow_registry() -> WorkflowRegistry {
    let mut registry = WorkflowRegistry::new();

    let task_registry = build_task_registry();
    let workflow = workflow!(
        "email-pipeline",
        JsonCodec,
        task_registry,
        fetch_recipients => render_template => send_emails
    ).unwrap();

    registry.register("email-pipeline", workflow);
    registry
}
```

Step 3: Launch Worker Pool
Section titled “Step 3: Launch Worker Pool”use sayiir_postgres::PostgresBackend;use sayiir_runtime::PooledWorker;
#[tokio::main]async fn main() -> anyhow::Result<()> { // Shared backend across all workers let backend = PostgresBackend::connect("postgresql://localhost/sayiir_jobs").await?;
// Registry of available workflows let workflow_registry = Arc::new(build_workflow_registry());
// Spawn worker pool (4 workers) let workers: Vec<_> = (0..4) .map(|id| { let backend = backend.clone(); let registry = workflow_registry.clone();
tokio::spawn(async move { let worker = PooledWorker::new( format!("worker-{}", id), backend, registry );
println!("Worker {} started", id); worker.run().await }) }) .collect();
// Wait for all workers for worker in workers { worker.await??; }
Ok(())}Step 4: Submit Jobs
From anywhere in your application:
```rust
use sayiir_postgres::PostgresBackend;

async fn submit_campaign(campaign_id: &str) -> anyhow::Result<()> {
    let backend = PostgresBackend::connect("postgresql://localhost/sayiir_jobs").await?;

    let campaign = Campaign {
        id: campaign_id.into(),
        template: "Newsletter: {{campaign_id}}".into(),
    };

    // Submit workflow - workers will pick it up automatically
    let workflow_id = format!("campaign-{}", campaign_id);
    backend.enqueue_workflow("email-pipeline", &workflow_id, campaign).await?;

    println!("Campaign {} queued", workflow_id);
    Ok(())
}
```

Workers automatically:
- Poll the backend for pending workflows
- Execute tasks with proper checkpointing
- Retry failed tasks with exponential backoff
- Handle crashes and resume interrupted workflows
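A practical addition to Step 3 is a clean shutdown path for deploys. Here is one hedged sketch, with spawn_worker_pool standing in as a hypothetical helper wrapping the worker-spawning loop from Step 3. Aborting worker tasks is acceptable here only because checkpointing lets interrupted workflows resume the next time a worker starts:

```rust
use tokio::signal;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Hypothetical helper: runs the Step 3 spawning loop and returns
    // the JoinHandles of the worker tasks.
    let workers = spawn_worker_pool().await?;

    // Block until Ctrl-C, then stop the workers. Interrupted
    // workflows resume from their last checkpoint on the next start.
    signal::ctrl_c().await?;
    println!("Shutting down workers...");
    for worker in &workers {
        worker.abort();
    }

    Ok(())
}
```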
Full Code Listing
Complete src/main.rs with all features:
```rust
mod tasks;

use sayiir_runtime::prelude::*;
use sayiir_postgres::PostgresBackend;
use std::sync::Arc;
use tasks::{Campaign, fetch_recipients, render_template, send_emails};

fn build_task_registry() -> TaskRegistry {
    let mut registry = TaskRegistry::new();
    registry.register("fetch_recipients", Arc::new(fetch_recipients));
    registry.register("render_template", Arc::new(render_template));
    registry.register("send_emails", Arc::new(send_emails));
    registry
}

fn build_workflow_registry() -> WorkflowRegistry {
    let mut registry = WorkflowRegistry::new();
    let task_registry = build_task_registry();

    let workflow = workflow!(
        "email-pipeline",
        JsonCodec,
        task_registry,
        fetch_recipients => render_template => send_emails
    ).unwrap();

    registry.register("email-pipeline", workflow);
    registry
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // For development: use InMemoryBackend
    // let backend = InMemoryBackend::new();

    // For production: use PostgresBackend
    let backend = PostgresBackend::connect("postgresql://localhost/sayiir_jobs").await?;

    let workflow_registry = Arc::new(build_workflow_registry());

    // Launch worker pool
    let workers: Vec<_> = (0..4)
        .map(|id| {
            let backend = backend.clone();
            let registry = workflow_registry.clone();

            tokio::spawn(async move {
                let worker = PooledWorker::new(
                    format!("worker-{}", id),
                    backend,
                    registry,
                );
                worker.run().await
            })
        })
        .collect();

    // Submit a test campaign
    let campaign = Campaign {
        id: "spring-2026".into(),
        template: "Spring collection: {{campaign_id}}".into(),
    };

    backend.enqueue_workflow("email-pipeline", "campaign-spring", campaign).await?;

    println!("Campaign queued - workers processing...");

    // Wait for workers (in production, run workers as separate processes)
    for worker in workers {
        worker.await??;
    }

    Ok(())
}
```

Next Steps
You now have a production-ready background job system. Explore more:
- Distributed Workers Guide — Best practices for worker pools
- Postgres Production Guide — Connection pooling, migrations, monitoring
- Error Handling — Custom retry policies and dead-letter queues
- Monitoring — Track job metrics and debug failures
Try extending the pipeline:
- Add parallel branches for SMS and push notifications (a starting sketch follows this list)
- Implement rate limiting with task delays
- Add A/B testing by splitting recipients
- Send delivery reports via signals
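For the first of those extensions, a starting point might be an SMS task with the same shape as send_emails, using only the #[task] options this tutorial has shown. How parallel branches are expressed in workflow! is left to your Sayiir version’s docs, and the provider call here is hypothetical:

```rust
use sayiir_runtime::prelude::*;
use sayiir_core::error::BoxError;

use crate::tasks::RenderedEmail;

/// Hypothetical SMS counterpart to send_emails; wire it into the
/// workflow once you know how your Sayiir version expresses branches.
#[task(timeout = "30s", retries = 3, backoff = "1s")]
pub async fn send_sms(message: RenderedEmail) -> Result<String, BoxError> {
    // In production: call your SMS provider (Twilio, SNS, etc.) here
    println!("Sending SMS notifications to {} recipients", message.recipients.len());
    Ok(format!("SMS sent to {} recipients", message.recipients.len()))
}
```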