Skip to content

Tutorial: Background Jobs (Rust)

Learn how to build a production-ready background job system with Sayiir’s Rust runtime. We’ll create an email campaign pipeline that handles retries, timeouts, and scales to distributed workers.

An email campaign system that:

  • Fetches recipient lists from a database
  • Renders email templates with personalization
  • Sends emails via an external service (with retries)
  • Scales to distributed worker processes

This tutorial demonstrates Sayiir’s Rust API, which gives you maximum performance and type safety.

Create a new Rust project and add Sayiir dependencies:

Terminal window
cargo new email-pipeline
cd email-pipeline

Add to Cargo.toml:

[dependencies]
sayiir-runtime = "0.1"
sayiir-persistence = "0.1"
sayiir-postgres = "0.1"
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
anyhow = "1"

Create src/tasks.rs with your task definitions:

use sayiir_runtime::prelude::*;
use sayiir_core::error::BoxError;
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Campaign {
pub id: String,
pub template: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Recipients {
pub emails: Vec<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RenderedEmail {
pub subject: String,
pub body: String,
pub recipients: Vec<String>,
}
/// Fetch recipient list for campaign
#[task(timeout = "10s")]
pub async fn fetch_recipients(campaign: Campaign) -> Result<Recipients, BoxError> {
// In production: query your database
// let rows = sqlx::query!("SELECT email FROM subscribers WHERE active = true")
// .fetch_all(&pool)
// .await?;
println!("Fetching recipients for campaign: {}", campaign.id);
Ok(Recipients {
emails: vec![
"alice@example.com".into(),
"bob@example.com".into(),
"carol@example.com".into(),
],
})
}
/// Render email template with campaign data
#[task(timeout = "5s")]
pub async fn render_template(input: (Campaign, Recipients)) -> Result<RenderedEmail, BoxError> {
let (campaign, recipients) = input;
// In production: use a template engine like Handlebars or Tera
let subject = format!("Campaign: {}", campaign.id);
let body = campaign.template.replace("{{campaign_id}}", &campaign.id);
println!("Rendered template for {} recipients", recipients.emails.len());
Ok(RenderedEmail {
subject,
body,
recipients: recipients.emails,
})
}
/// Send emails via external service with automatic retries
#[task(
timeout = "30s",
retries = 3,
backoff = "1s"
)]
pub async fn send_emails(email: RenderedEmail) -> Result<String, BoxError> {
// In production: integrate with SendGrid, Postmark, AWS SES, etc.
// let response = reqwest::Client::new()
// .post("https://api.sendgrid.com/v3/mail/send")
// .json(&payload)
// .send()
// .await?;
println!(
"Sending email '{}' to {} recipients",
email.subject,
email.recipients.len()
);
// Simulate occasional failure for retry demonstration
// if rand::random::<f32>() < 0.2 {
// return Err("Transient API error".into());
// }
Ok(format!("Sent to {} recipients", email.recipients.len()))
}

The #[task] macro configures:

  • timeout: Maximum execution time before task fails
  • retries: Number of retry attempts on failure
  • backoff: Initial delay between retries (grows exponentially)

Create src/main.rs to compose tasks into a workflow:

mod tasks;
use sayiir_runtime::prelude::*;
use sayiir_persistence::InMemoryBackend;
use tasks::{Campaign, fetch_recipients, render_template, send_emails};
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Build workflow using the workflow! macro
let registry = TaskRegistry::new();
let workflow = workflow!(
"email-pipeline",
JsonCodec,
registry,
fetch_recipients => render_template => send_emails
)?;
// Create backend and runner
let backend = InMemoryBackend::new();
let runner = CheckpointingRunner::new(backend);
// Execute campaign
let campaign = Campaign {
id: "summer-sale-2026".into(),
template: "Check out our summer sale! Campaign: {{campaign_id}}".into(),
};
let status = runner
.run(
workflow.workflow(),
"campaign-001",
campaign
)
.await?;
match status {
WorkflowStatus::Completed(output) => {
println!("Campaign completed: {}", output);
}
WorkflowStatus::Failed(error) => {
eprintln!("Campaign failed: {}", error);
}
_ => {
println!("Campaign status: {:?}", status);
}
}
Ok(())
}

Run it:

Terminal window
cargo run

Output:

Fetching recipients for campaign: summer-sale-2026
Rendered template for 3 recipients
Sending email 'Campaign: summer-sale-2026' to 3 recipients
Campaign completed: Sent to 3 recipients

The CheckpointingRunner provides automatic crash recovery. Let’s see it in action:

use sayiir_persistence::InMemoryBackend;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let backend = InMemoryBackend::new();
let runner = CheckpointingRunner::new(backend);
let workflow_id = "campaign-001";
let campaign = Campaign {
id: "summer-sale".into(),
template: "Sale alert: {{campaign_id}}".into(),
};
// First run - completes successfully
let status = runner.run(&workflow, workflow_id, campaign.clone()).await?;
println!("First run: {:?}", status);
// Second run with same ID - returns cached result immediately
let status = runner.run(&workflow, workflow_id, campaign).await?;
println!("Second run (cached): {:?}", status);
Ok(())
}

Key benefits:

  • Tasks are checkpointed after completion
  • Crashed workflows resume from last checkpoint
  • Completed workflows return cached results
  • No duplicate work or side effects

For production, scale to multiple worker processes pulling from a shared queue:

use sayiir_runtime::prelude::*;
use std::sync::Arc;
pub fn build_task_registry() -> TaskRegistry {
let mut registry = TaskRegistry::new();
registry.register("fetch_recipients", Arc::new(fetch_recipients));
registry.register("render_template", Arc::new(render_template));
registry.register("send_emails", Arc::new(send_emails));
registry
}
use sayiir_runtime::WorkflowRegistry;
pub fn build_workflow_registry() -> WorkflowRegistry {
let mut registry = WorkflowRegistry::new();
let task_registry = build_task_registry();
let workflow = workflow!(
"email-pipeline",
JsonCodec,
task_registry,
fetch_recipients => render_template => send_emails
).unwrap();
registry.register("email-pipeline", workflow);
registry
}
use sayiir_postgres::PostgresBackend;
use sayiir_runtime::PooledWorker;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Shared backend across all workers
let backend = PostgresBackend::connect("postgresql://localhost/sayiir_jobs").await?;
// Registry of available workflows
let workflow_registry = Arc::new(build_workflow_registry());
// Spawn worker pool (4 workers)
let workers: Vec<_> = (0..4)
.map(|id| {
let backend = backend.clone();
let registry = workflow_registry.clone();
tokio::spawn(async move {
let worker = PooledWorker::new(
format!("worker-{}", id),
backend,
registry
);
println!("Worker {} started", id);
worker.run().await
})
})
.collect();
// Wait for all workers
for worker in workers {
worker.await??;
}
Ok(())
}

From anywhere in your application:

use sayiir_persistence::PostgresBackend;
async fn submit_campaign(campaign_id: &str) -> anyhow::Result<()> {
let backend = PostgresBackend::connect("postgresql://localhost/sayiir_jobs").await?;
let campaign = Campaign {
id: campaign_id.into(),
template: "Newsletter: {{campaign_id}}".into(),
};
// Submit workflow - workers will pick it up automatically
let workflow_id = format!("campaign-{}", campaign_id);
backend.enqueue_workflow("email-pipeline", &workflow_id, campaign).await?;
println!("Campaign {} queued", workflow_id);
Ok(())
}

Workers automatically:

  • Poll the backend for pending workflows
  • Execute tasks with proper checkpointing
  • Retry failed tasks with exponential backoff
  • Handle crashes and resume interrupted workflows

Complete src/main.rs with all features:

mod tasks;
use sayiir_runtime::prelude::*;
use sayiir_postgres::PostgresBackend;
use std::sync::Arc;
use tasks::{Campaign, fetch_recipients, render_template, send_emails};
fn build_task_registry() -> TaskRegistry {
let mut registry = TaskRegistry::new();
registry.register("fetch_recipients", Arc::new(fetch_recipients));
registry.register("render_template", Arc::new(render_template));
registry.register("send_emails", Arc::new(send_emails));
registry
}
fn build_workflow_registry() -> WorkflowRegistry {
let mut registry = WorkflowRegistry::new();
let task_registry = build_task_registry();
let workflow = workflow!(
"email-pipeline",
JsonCodec,
task_registry,
fetch_recipients => render_template => send_emails
).unwrap();
registry.register("email-pipeline", workflow);
registry
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// For development: use InMemoryBackend
// let backend = InMemoryBackend::new();
// For production: use PostgresBackend
let backend = PostgresBackend::connect("postgresql://localhost/sayiir_jobs").await?;
let workflow_registry = Arc::new(build_workflow_registry());
// Launch worker pool
let workers: Vec<_> = (0..4)
.map(|id| {
let backend = backend.clone();
let registry = workflow_registry.clone();
tokio::spawn(async move {
let worker = PooledWorker::new(
format!("worker-{}", id),
backend,
registry
);
worker.run().await
})
})
.collect();
// Submit a test campaign
let campaign = Campaign {
id: "spring-2026".into(),
template: "Spring collection: {{campaign_id}}".into(),
};
backend.enqueue_workflow("email-pipeline", "campaign-spring", campaign).await?;
println!("Campaign queued - workers processing...");
// Wait for workers (in production, run workers as separate processes)
for worker in workers {
worker.await??;
}
Ok(())
}

You now have a production-ready background job system. Explore more:

Try extending the pipeline:

  • Add parallel branches for SMS and push notifications
  • Implement rate limiting with task delays
  • Add A/B testing by splitting recipients
  • Send delivery reports via signals