This version is in beta. Some features may change before release.

Tasks plugin

Database-backed background task queue for running work outside the request cycle.

umbral-tasks runs background work outside the request/response cycle. It uses your application's existing database as the broker - no Redis, no RabbitMQ, no separate infrastructure. A fresh umbral project gets background jobs for the cost of one .plugin(TasksPlugin::default()) line.

What you get from TasksPlugin::default()

Registering the plugin creates one migration: a task_row table that tracks every enqueued job - its name, JSON payload, status, attempt count, schedule time, and error message on failure. The plugin also contributes a tasks-worker management subcommand reachable via cargo run -- tasks-worker (your project's binary dispatches it through umbral_cli::dispatch(app)).

Code
rust
App::builder()
    .plugin(TasksPlugin::default())
    .build()?;

Declaring a task with #[task]

The recommended way to declare a task is with the #[task] attribute macro. Define a typed payload struct that implements serde::Deserialize, annotate the handler, and call the generated registration function at boot time.

Code
rust
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
pub struct WelcomeEmailPayload {
    pub user_id: i64,
    pub locale: String,
}

#[umbral::task]
async fn send_welcome(payload: WelcomeEmailPayload) -> Result<(), String> {
    // ... send the email
    Ok(())
}

The macro emits two things:

  1. The original async fn send_welcome unchanged, so you can call it directly in tests or other code.
  2. A companion pub fn register_send_welcome() that calls umbral_tasks::register_handler("send_welcome", ...) with a generated wrapper that JSON-deserialises the payload and forwards to your function.

Call the companion from Plugin::on_ready or your main function before the worker starts:

Code
rust
// in Plugin::on_ready or main.rs, before App::build()
register_send_welcome();

Then enqueue work anywhere in your request handlers:

Code
rust
use umbral_tasks::{EnqueueOptions, enqueue};
 
let _task_id = enqueue(
    "send_welcome",
    WelcomeEmailPayload { user_id: 42, locale: "en".to_string() },
    EnqueueOptions::default(),
).await?;

Overriding the task name

By default the task name equals the Rust function identifier. Override it with name = "..." when you need a dotted or namespaced key:

Code
rust
#[umbral::task(name = "email.send_welcome")]
async fn send_welcome(payload: WelcomeEmailPayload) -> Result<(), String> {
    Ok(())
}
 
// The generated companion is still named after the Rust identifier:
register_send_welcome();
 
// But enqueue uses the custom name:
enqueue("email.send_welcome", payload, EnqueueOptions::default()).await?;

Constraints the macro enforces

The macro emits a compile_error! with a targeted message if any of these rules are violated:

| Rule | Error |
| --- | --- |
| Must be async fn | #[task] requires an 'async fn' |
| Exactly one parameter | found N parameter(s) |
| Return type must be Result<(), String> | requires -> Result<(), String> |

Dynamic registration (advanced)

register_handler is available directly when you need to register a closure dynamically - for example, when the handler name is determined at runtime or when you need to skip the macro for a generic handler.

Code
rust
use umbral_tasks::{EnqueueOptions, enqueue, register_handler};
use serde::{Deserialize, Serialize};
 
#[derive(Serialize, Deserialize)]
struct WelcomePayload { user_id: i64 }
 
// At startup - before the worker runs.
register_handler("send_welcome_email", |payload: &str| async move {
    let p: WelcomePayload = serde_json::from_str(payload).map_err(|e| e.to_string())?;
    send_email(p.user_id).await.map_err(|e| e.to_string())
});

// In a handler, after a user signs up.
let _task_id = enqueue(
    "send_welcome_email",
    WelcomePayload { user_id: 42 },
    EnqueueOptions::default(),
).await?;

The #[task] macro expands to exactly this pattern. Call register_handler directly only when the macro's single-payload constraint does not fit your case.

Running the worker

Code
bash
# Run continuously - polls every second, handles Ctrl-C gracefully.
cargo run -- tasks-worker
 
# Run exactly one claim/dispatch iteration and exit (useful for tests or cron).
cargo run -- tasks-worker --once

The worker claims one pending row at a time inside a transaction, dispatches the handler, and writes back succeeded or failed. Panicking handlers are caught via tokio::task::spawn so one bad job cannot take the worker down.

EnqueueOptions lets you set max_attempts (default 3), scheduled_for (default Utc::now()), and the scheduling/timeout knobs below.

Delayed and scheduled execution (eta / delay)

Every task carries a run_at instant: the earliest moment the worker is allowed to claim it. The dequeue query only claims rows whose run_at <= now(), so a task with a future run_at stays invisible until its time comes. By default run_at is set to enqueue time, so tasks run as soon as a worker is free.

Two EnqueueOptions fields push a task into the future:

  • delay: Option<Duration> - run after a relative delay (run_at = now + delay).
  • eta: Option<DateTime<Utc>> - run at an absolute instant. eta takes precedence over delay if both are set.

Code
rust
use std::time::Duration;
use chrono::Utc;
use umbral_tasks::{EnqueueOptions, enqueue};
 
// Run in ~60 seconds.
enqueue("send_welcome", payload.clone(), EnqueueOptions {
    delay: Some(Duration::from_secs(60)),
    ..EnqueueOptions::default()
}).await?;

// Run at a specific time.
enqueue("send_welcome", payload, EnqueueOptions {
    eta: Some(Utc::now() + chrono::Duration::hours(2)),
    ..EnqueueOptions::default()
}).await?;
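The precedence rule described above (eta over delay, falling back to enqueue time) can be sketched as a small pure function. This is only an illustration under stated assumptions: resolve_run_at and is_claimable are hypothetical names, not part of umbral-tasks, and std::time::SystemTime stands in for the DateTime<Utc> the real options use.

```rust
use std::time::{Duration, SystemTime};

/// Hypothetical helper: picks the earliest instant a task may be claimed.
fn resolve_run_at(
    now: SystemTime,
    delay: Option<Duration>,
    eta: Option<SystemTime>,
) -> SystemTime {
    match (eta, delay) {
        (Some(at), _) => at,        // an absolute eta wins over any delay
        (None, Some(d)) => now + d, // relative delay from enqueue time
        (None, None) => now,        // default: eligible immediately
    }
}

/// Mirrors the dequeue filter: only rows with run_at <= now are claimable.
fn is_claimable(run_at: SystemTime, now: SystemTime) -> bool {
    run_at <= now
}
```

A task whose resolved run_at lies in the future simply fails the is_claimable check until the clock catches up, which is why no separate timer is needed.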

Priority

Each task carries a priority (an i32). Higher number = claimed first. The worker orders the queue by priority descending before anything else, so a high-priority task jumps ahead of normal work even if it was enqueued later. The default is 0 (normal); a positive value runs sooner, a negative value drains behind the defaults.

Code
rust
use umbral_tasks::{EnqueueOptions, enqueue};
 
// Urgent - claimed before any priority-0 work waiting in the queue.
enqueue("send_password_reset", payload, EnqueueOptions {
    priority: Some(9),
    ..EnqueueOptions::default()
}).await?;

FIFO within a priority. Ties on priority break by scheduled_for then id, so two tasks at the same priority are claimed in the order they were enqueued - priority changes which band runs first, never the fairness inside a band.

Nullable column, additive migration
`priority` is a nullable `Option` so that adding it to an existing `task_row` table is a clean `ADD COLUMN priority INTEGER` (the migration engine can't emit `ADD COLUMN ... NOT NULL DEFAULT` against populated tables). `enqueue` always writes `Some(0)` when you don't set a priority, so new rows are never NULL; a NULL is only ever a legacy row that predates the column, and it drains at the **lowest** priority. (One backend caveat: under `priority DESC`, SQLite sorts NULLs last as intended, while Postgres sorts NULLs first - since enqueue never writes NULL, this only affects rare pre-column rows until they drain.)
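To make the claim order concrete, here is a minimal in-memory sketch of the ordering described above: priority descending with NULLs last, then scheduled_for, then id. QueuedTask and sort_into_claim_order are hypothetical names for illustration; the real worker expresses this ordering in SQL, and the sketch models the intended NULLs-last behaviour rather than any particular backend's default.

```rust
use std::cmp::Reverse;

/// Hypothetical stand-in for the columns the claim query orders by.
#[derive(Debug, Clone, PartialEq)]
struct QueuedTask {
    id: i64,
    priority: Option<i32>, // None models a legacy row that predates the column
    scheduled_for: u64,    // seconds since an arbitrary epoch, illustration only
}

/// Sorts tasks into claim order: priority descending (NULL last),
/// then scheduled_for ascending, then id ascending.
fn sort_into_claim_order(tasks: &mut [QueuedTask]) {
    tasks.sort_by_key(|t| {
        (
            t.priority.is_none(), // false < true, so NULL priorities sort last
            Reverse(t.priority),  // higher priority first
            t.scheduled_for,      // FIFO within a priority band
            t.id,                 // final tie-break
        )
    });
}
```

With this key, a priority-9 task enqueued late still sorts ahead of priority-0 work, while two priority-0 tasks keep their enqueue order.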

Exponential-backoff retries

When a handler fails retriably (returns Err, panics, or times out - anything other than a missing handler), the worker does not re-queue it immediately. Instead it pushes run_at into the future by an exponential backoff:

Code
txt
run_at = now + min(retry_backoff_base * 2^(attempts - 1), retry_backoff_max)

So the first retry waits retry_backoff_base, the next twice that, and so on, capped at retry_backoff_max. Once attempts reaches max_attempts the task is abandoned (failed) rather than retried. A missing handler is non-retriable and fails on the first attempt regardless of max_attempts.
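The formula is easy to check with a few lines of standalone Rust. The function below is a sketch of the arithmetic only, assuming attempts counts the attempts already made (1 after the first failure); retry_delay is a hypothetical name and not the worker's actual implementation.

```rust
use std::time::Duration;

/// Delay before the next retry: min(base * 2^(attempts - 1), max).
fn retry_delay(base: Duration, max: Duration, attempts: u32) -> Duration {
    // Clamp the exponent so the shift cannot overflow a u32;
    // the cap makes larger exponents irrelevant anyway.
    let exponent = attempts.saturating_sub(1).min(31);
    let factor = 1u32 << exponent;
    // If the multiplication itself overflows, fall back to the cap.
    base.checked_mul(factor).map_or(max, |d| d.min(max))
}
```

With the defaults shown in this section (base 2s, cap 5 minutes), the waits are 2s, 4s, 8s and so on, reaching 256s after the eighth attempt and staying at the 300s cap from the ninth onward.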

The backoff knobs are worker-level defaults on WorkerOptions:

Code
rust
use std::time::Duration;
use umbral_tasks::{run_worker, WorkerOptions};
 
run_worker(WorkerOptions {
    retry_backoff_base: Duration::from_secs(2), // default
    retry_backoff_max: Duration::from_secs(5 * 60), // default
    ..WorkerOptions::default()
}).await;

The orphan-reclaim path applies the same backoff: a task that is reclaimed with attempts remaining also has its run_at pushed forward, so a job that keeps crashing its worker doesn't get retried in a tight loop.

Info

Per-task backoff overrides (and a per-task timeout, see below) are accepted on EnqueueOptions as a forward-compatible surface but are not yet persisted to the row in v1 - the worker applies its worker-level defaults to every task. Persisting per-task overrides as columns is a tracked follow-up (planning/features.md #82).

Per-task timeout

The worker wraps each handler invocation in tokio::time::timeout. A handler that overruns WorkerOptions::task_timeout is cancelled and recorded as a retriable failure (backed off via run_at, or abandoned if max_attempts is exhausted) - it never holds its claim until the visibility timeout fires.

Code
rust
use std::time::Duration;
use umbral_tasks::{run_worker, WorkerOptions};
 
run_worker(WorkerOptions {
    task_timeout: Some(Duration::from_secs(30)), // default: 5 minutes; None disables
    ..WorkerOptions::default()
}).await;

This is distinct from the visibility timeout below: task_timeout fails a running handler promptly, whereas the visibility timeout only reclaims a row after a crashed worker stops renewing its lease.

Visibility timeout and at-least-once delivery

When a worker claims a task it stamps started_at = now(). Without further handling, a worker process that crashes mid-handler would leave that row in running forever - silent data loss. The visibility timeout closes this gap.

On every poll the worker first calls reclaim_orphaned_tasks(visibility_timeout): any row whose status = 'running' and started_at < now - visibility_timeout is considered abandoned. The reclaim pass:

  • resets the row to pending (clearing started_at) so the next worker poll picks it up again, as long as attempts < max_attempts.
  • marks the row failed if attempts >= max_attempts - no infinite retry loop.
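The reclaim decision can be sketched as a pure function over a single row. This is an in-memory illustration, not the plugin's SQL: Status, Row, and reclaim are hypothetical names, SystemTime stands in for the database timestamp, and the backoff applied to reclaimed rows is left out for brevity.

```rust
use std::time::{Duration, SystemTime};

#[derive(Debug, Clone, Copy, PartialEq)]
enum Status {
    Pending,
    Running,
    Succeeded,
    Failed,
}

/// Hypothetical stand-in for the fields the reclaim pass inspects.
struct Row {
    status: Status,
    started_at: Option<SystemTime>,
    attempts: u32,
    max_attempts: u32,
}

/// Returns the status a row should have after one reclaim pass.
fn reclaim(row: &Row, now: SystemTime, visibility_timeout: Duration) -> Status {
    // started_at < now - timeout, written as an addition to avoid underflow.
    let orphaned = row.status == Status::Running
        && row.started_at.map_or(false, |t| t + visibility_timeout < now);
    if !orphaned {
        return row.status; // not running, or still inside its visibility window
    }
    if row.attempts < row.max_attempts {
        Status::Pending // attempts remain: hand the row back to the queue
    } else {
        Status::Failed // budget exhausted: no infinite retry loop
    }
}
```

Only rows that are both running and older than the timeout change state; everything else passes through untouched.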

The default timeout is 5 minutes (DEFAULT_VISIBILITY_TIMEOUT). Override it when building the worker:

Code
rust
use std::time::Duration;
use umbral_tasks::{run_worker, WorkerOptions};
 
run_worker(WorkerOptions {
    visibility_timeout: Duration::from_secs(30 * 60), // 30 minutes
    ..WorkerOptions::default()
}).await;

Info

Set visibility_timeout to comfortably exceed the longest handler your application runs. A timeout shorter than the handler runtime will reclaim the task while it is still executing, resulting in duplicate runs (at-least-once, not exactly-once). Handlers must therefore be idempotent when max_attempts > 1.

You can also call reclaim_orphaned_tasks directly - for example, in a scheduled job or a one-shot management command - with any timeout you choose:

Code
rust
use std::time::Duration;
use umbral_tasks::reclaim_orphaned_tasks;
 
let reclaimed = reclaim_orphaned_tasks(Duration::from_secs(60)).await?;
println!("{reclaimed} orphaned task(s) reclaimed");

Periodic tasks (beat)

For recurring work - a nightly cleanup, an hourly digest, a job every five minutes - register a periodic schedule and run the beat process. Beat is the periodic-task scheduler; it does not run handlers itself. Each time a schedule is due, beat enqueues a normal task row; the worker drains it. Run them as two separate processes: the worker that executes handlers, and beat that schedules them.

Register schedules on the plugin builder with .periodic(name, schedule, task, payload):

Code
rust
use std::time::Duration;
use umbral_tasks::{Schedule, TasksPlugin};
 
App::builder()
    .plugin(
        TasksPlugin::default()
            // Cron: midnight every day.
            .periodic(
                "cleanup",
                Schedule::cron("0 0 * * *"),
                "cleanup_task",
                serde_json::json!({ "older_than_days": 30 }),
            )
            // Fixed interval: every hour.
            .periodic(
                "heartbeat",
                Schedule::every(Duration::from_secs(3600)),
                "heartbeat_task",
                serde_json::json!({}),
            ),
    )
    .build()?;

name is the schedule's stable key - one periodic_task row per name. Re-registering with the same name updates the existing row (its task, payload, and schedule) instead of duplicating it; next_run is only recomputed when the schedule string actually changes, so an unchanged redeploy never starves the task.

Schedule format

| Variant | Constructor | Meaning |
| --- | --- | --- |
| Cron | Schedule::cron("min hour dom mon dow") | Standard 5-field cron ("0 0 * * *" = midnight daily, "*/5 * * * *" = every 5 minutes). A 6-field form with a leading seconds column is also accepted. |
| Interval | Schedule::every(Duration) | Fire every fixed duration after the previous run. |

The schedule persists as a single string column ("cron:0 0 * * *" / "every:3600"). A schedule that yields no further fire time disables the row (enabled = false) rather than spinning.
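As an illustration of the single-string encoding, the sketch below round-trips the two forms quoted above. StoredSchedule, encode, and decode are hypothetical names, not the plugin's types, and the sketch assumes the number after "every:" is whole seconds (consistent with "every:3600" for an hourly interval). It does not validate the cron expression itself.

```rust
use std::time::Duration;

/// Hypothetical mirror of the two schedule variants.
#[derive(Debug, PartialEq)]
enum StoredSchedule {
    Cron(String),
    Every(Duration),
}

/// Encodes a schedule into the single-column string form.
fn encode(schedule: &StoredSchedule) -> String {
    match schedule {
        StoredSchedule::Cron(expr) => format!("cron:{expr}"),
        StoredSchedule::Every(d) => format!("every:{}", d.as_secs()),
    }
}

/// Decodes the string form; returns None for anything unrecognised.
fn decode(raw: &str) -> Option<StoredSchedule> {
    if let Some(expr) = raw.strip_prefix("cron:") {
        Some(StoredSchedule::Cron(expr.to_string()))
    } else if let Some(secs) = raw.strip_prefix("every:") {
        // Assumption: the interval is stored as whole seconds.
        secs.parse::<u64>()
            .ok()
            .map(|s| StoredSchedule::Every(Duration::from_secs(s)))
    } else {
        None
    }
}
```

Keeping the schedule in one string column is also what makes the "recompute next_run only when the schedule string changes" rule a plain string comparison.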

Running beat

Code
bash
# Run the scheduler continuously alongside the worker.
cargo run -- tasks-beat
 
# Sync schedules, run one tick, and exit (tests / cron-driven beats).
cargo run -- tasks-beat --once

On startup beat syncs the registered schedules to periodic_task rows, then each tick (default every 5s) enqueues every due schedule.

Info

Multi-instance safe. You can run more than one tasks-beat process without double-firing. Each due row is claimed with an optimistic conditional UPDATE - `SET next_run = <new value> WHERE id = ? AND next_run = <value previously read>` - and the task is enqueued only if that UPDATE won the race (affected exactly one row). A second beat instance reading the same row finds `next_run` already advanced, its UPDATE matches nothing, and it enqueues nothing. This is the same conditional-claim guard the worker uses.
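The conditional claim is a compare-and-swap on next_run. The sketch below emulates it against an in-memory map so the race outcome is easy to see; PeriodicTable and claim are hypothetical names, and a real deployment relies on the database's UPDATE row count rather than a HashMap.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory table: schedule id -> next_run (seconds).
struct PeriodicTable {
    next_run: HashMap<i64, u64>,
}

impl PeriodicTable {
    /// Emulates a conditional UPDATE that only matches while next_run still
    /// holds the value the caller observed. Returns the number of affected
    /// rows (0 or 1), like a database driver would.
    fn claim(&mut self, id: i64, observed: u64, new: u64) -> usize {
        match self.next_run.get_mut(&id) {
            Some(current) if *current == observed => {
                *current = new; // this caller won the race
                1
            }
            _ => 0, // row missing, or another instance already advanced it
        }
    }
}
```

Two beat instances that both read next_run = 100 will both attempt the claim, but only the first sees one affected row and enqueues; the second sees zero and does nothing.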

Task results & status

A handler can return any serializable value, not just Ok(()). On success the worker serializes the return value to JSON and stores it in the task row's result column - the result backend payload. Query a task's outcome by the id enqueue returned:

Code
rust
use umbral_tasks::{enqueue, register_handler, task_status, await_result, TaskState, EnqueueOptions};
use std::time::Duration;
 
// A handler that returns a value (any `R: Serialize`):
register_handler("sum", |payload: &str| async move {
    let nums: Vec<i64> = serde_json::from_str(payload).map_err(|e| e.to_string())?;
    Ok::<i64, String>(nums.iter().sum())
});
 
let id = enqueue("sum", vec![1, 2, 3], EnqueueOptions::default()).await?;
 
// Non-blocking: snapshot the current state (None if the row was drained).
let status = task_status(id).await?.expect("task exists");
assert_eq!(status.state, TaskState::Pending); // not run yet
 
// Blocking with a timeout: poll until terminal.
let status = await_result(id, Duration::from_secs(5)).await?;
assert_eq!(status.state, TaskState::Success);
assert_eq!(status.result, Some(serde_json::json!(6)));

task_status returns a TaskStatus { id, name, state, result, error, attempts, max_attempts, run_at, started_at, completed_at, created_at }. state is a TaskState - Pending, Running, Success, Failed, or Retrying (a row that failed an attempt but still has retries left). await_result polls every 50ms until the state is terminal (Success/Failed) or timeout elapses, in which case it returns TaskError::Timeout carrying the last observed status.

Info

Unit-returning handlers stay source-compatible: () implements Serialize (it serializes to JSON null), so an existing Ok(()) handler compiles unchanged and its result reads back as Some(Value::Null). A failing handler leaves result empty and records the message in error.

See arch.md and plugins/umbral-tasks/src/lib.rs for the design rationale.

Admin visibility

Operators browse and inspect the task queue in the admin. umbral_tasks::admin_model() returns a read-only AdminModel for the task_row table - register it on the admin alongside your own models:

Code
rust
use umbral_admin::AdminPlugin;
 
App::builder()
    .plugin(TasksPlugin::default())
    .plugin(AdminPlugin::default().register(umbral_tasks::admin_model()))
    .build()?;

The list shows the columns an operator wants at a glance - id, name, status, priority, attempts / max_attempts, run_at, completed_at, created_at - with status / priority filters and a name / status search box. Every column is read-only: rows are written by enqueue and the worker, never hand-authored.

Retrying a failed task

The admin model attaches a "Retry selected" bulk action. Select one or more failed rows and run it to re-queue them: each failed task is reset to pending with run_at = now (immediately eligible), its error cleared, and its attempts counter reset to 0 so it gets a fresh max_attempts budget. The action only touches rows currently in the failed state - a pending, running, or succeeded task is left untouched, so you can't disturb a task a worker is mid-flight on.

The same logic is available programmatically:

Code
rust
// Re-queue task #42 if (and only if) it is currently failed.
let requeued: bool = umbral_tasks::retry_task(42).await?;
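The reset that the retry performs can be sketched on an in-memory row. TaskRow and retry_in_place are hypothetical names used only to illustrate the rules listed above (failed rows only, attempts back to 0, error cleared, run_at = now); the real function operates on the task_row table.

```rust
/// Hypothetical stand-in for the columns a retry touches.
#[derive(Debug, Clone, PartialEq)]
struct TaskRow {
    status: String,
    attempts: u32,
    error: Option<String>,
    run_at: u64, // seconds since an arbitrary epoch, illustration only
}

/// Re-queues the row if, and only if, it is currently failed.
/// Returns true when the row was changed.
fn retry_in_place(row: &mut TaskRow, now: u64) -> bool {
    if row.status != "failed" {
        return false; // pending, running, and succeeded rows are left alone
    }
    row.status = "pending".to_string();
    row.attempts = 0; // fresh max_attempts budget
    row.error = None; // clear the previous failure message
    row.run_at = now; // immediately eligible
    true
}
```

The boolean result corresponds to the requeued flag in the call above: false means the task was not in the failed state and nothing was modified.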

The admin_model() function and the umbral-admin dependency are gated behind the admin feature (on by default). A tasks-only app that doesn't run the admin builds with default-features = false and never pulls the admin into its dependency graph:

Code
toml
umbral-tasks = { version = "0.0.1", default-features = false }

Combining with signals

Info

For fire-and-forget work that must survive the request process - like sending a welcome email after user signup - enqueue a task from your signal handler rather than running the work inline. That way the task persists to the database and the worker picks it up even if the web process restarts before the handler returns.

Code
rust
// in your signup signal handler; user_id and locale are assumed to be in scope
enqueue("email.send_welcome", WelcomeEmailPayload { user_id, locale }, EnqueueOptions::default()).await?;

The signals docs page is at /docs/v0.0.1/plugins/signals and covers how to connect signals to tasks end-to-end.

What is NOT in v1

  • FOR UPDATE SKIP LOCKED. The worker uses a conditional UPDATE claim that is correct on both SQLite and Postgres (a concurrent worker whose UPDATE matches zero rows loses the race cleanly). SELECT … FOR UPDATE SKIP LOCKED is a future optimization that avoids the wasted candidate SELECT, not a correctness fix.
  • Per-task backoff / timeout persistence. EnqueueOptions accepts the fields but the worker applies worker-level defaults in v1 (see the callout above). #82.
  • Status enum. Status is stored as a plain string (pending, running, succeeded, failed) because the M3 derive doesn't support enum SqlType yet.

The spec outline lives in docs/specs/outlines/. Source: plugins/umbral-tasks/src/lib.rs.