Tasks plugin
Database-backed background task queue for running work outside the request cycle.
umbral-tasks runs background work outside the request/response cycle. It uses
your application's existing database as the broker - no Redis, no RabbitMQ, no
separate infrastructure. A fresh umbral project gets background jobs for the cost
of one .plugin(TasksPlugin) line.
What you get from TasksPlugin::default()
Registering the plugin creates one migration: a task_row table that tracks
every enqueued job - its name, JSON payload, status, attempt count, schedule
time, and error message on failure. The plugin also contributes a tasks-worker
management subcommand reachable via cargo run -- tasks-worker (your project's
binary dispatches it through umbral_cli::dispatch(app)).

    App::builder()
        .plugin(TasksPlugin::default())
        .build()?;

Declaring a task with #[task]
The recommended way to declare a task is with the #[task] attribute macro.
Define a typed payload struct that implements serde::Deserialize, annotate the
handler, and call the generated registration function at boot time.

    use serde::{Deserialize, Serialize};

    #[derive(Serialize, Deserialize)]
    pub struct WelcomeEmailPayload {
        pub user_id: i64,
        pub locale: String,
    }

    #[umbral::task]
    async fn send_welcome(payload: WelcomeEmailPayload) -> Result<(), String> {
        // ... send the email
        Ok(())
    }

The macro emits two things:
- The original async fn send_welcome unchanged, so you can call it directly in tests or other code.
- A companion pub fn register_send_welcome() that calls umbral_tasks::register_handler("send_welcome", ...) with a generated wrapper that JSON-deserialises the payload and forwards to your function.

Call the companion from Plugin::on_ready or your main function before the
worker starts:

    // in Plugin::on_ready or main.rs, before App::build()
    register_send_welcome();

Then enqueue work anywhere in your request handlers:

    use umbral_tasks::{EnqueueOptions, enqueue};

    let _task_id = enqueue(
        "send_welcome",
        WelcomeEmailPayload { user_id: 42, locale: "en".to_string() },
        EnqueueOptions::default(),
    ).await?;

Overriding the task name
By default the task name equals the Rust function identifier. Override it with
name = "..." when you need a dotted or namespaced key:

    #[umbral::task(name = "email.send_welcome")]
    async fn send_welcome(payload: WelcomeEmailPayload) -> Result<(), String> {
        Ok(())
    }

    // The generated companion is still named after the Rust identifier:
    register_send_welcome();

    // But enqueue uses the custom name:
    enqueue("email.send_welcome", payload, EnqueueOptions::default()).await?;

Constraints the macro enforces
The macro emits a compile_error! with a targeted message if any of these rules
are violated:
| Rule | Error |
|---|---|
| Must be async fn | #[task] requires an 'async fn' |
| Exactly one parameter | found N parameter(s) |
| Return type must be Result<(), String> | requires -> Result<(), String> |
Dynamic registration (advanced)
register_handler is available directly when you need to register a closure
dynamically - for example, when the handler name is determined at runtime or when
you need to skip the macro for a generic handler.

    use umbral_tasks::{EnqueueOptions, enqueue, register_handler};
    use serde::{Deserialize, Serialize};

    #[derive(Serialize, Deserialize)]
    struct WelcomePayload { user_id: i64 }

    // At startup - before the worker runs.
    register_handler("send_welcome_email", |payload: &str| async move {
        let p: WelcomePayload = serde_json::from_str(payload).map_err(|e| e.to_string())?;
        send_email(p.user_id).await.map_err(|e| e.to_string())
    });

    // In a handler, after a user signs up.
    let _task_id = enqueue(
        "send_welcome_email",
        WelcomePayload { user_id: 42 },
        EnqueueOptions::default(),
    ).await?;

The #[task] macro expands to exactly this pattern. Call register_handler directly only when
the macro's single-payload constraint does not fit your case.
Running the worker

    # Run continuously - polls every second, handles Ctrl-C gracefully.
    cargo run -- tasks-worker

    # Run exactly one claim/dispatch iteration and exit (useful for tests or cron).
    cargo run -- tasks-worker --once

The worker claims one pending row at a time inside a transaction, dispatches the
handler, and writes back succeeded or failed. Panicking handlers are caught
via tokio::task::spawn so one bad job cannot take the worker down.
EnqueueOptions lets you set max_attempts (default 3), scheduled_for
(default Utc::now()), and the scheduling/timeout knobs below.
Delayed and scheduled execution (eta / delay)
Every task carries a run_at instant: the earliest moment the worker is
allowed to claim it. The dequeue query only claims rows whose run_at <= now(), so a task with a future run_at stays invisible until its time
comes. By default run_at is set to enqueue time, so tasks run as soon as
a worker is free.
Two EnqueueOptions fields push a task into the future:
- delay: Option<Duration> - run after a relative delay (run_at = now + delay).
- eta: Option<DateTime<Utc>> - run at an absolute instant. eta takes precedence over delay if both are set.
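
The precedence rule above can be sketched in a few lines of standalone Rust. This is an illustration only, not the plugin's code: ScheduleOpts, resolve_run_at, and is_claimable are hypothetical names, and std::time::SystemTime stands in for the DateTime<Utc> the real options use.

```rust
use std::time::{Duration, SystemTime};

/// Hypothetical stand-in for the scheduling fields of `EnqueueOptions`.
#[derive(Default)]
pub struct ScheduleOpts {
    pub delay: Option<Duration>,
    pub eta: Option<SystemTime>,
}

/// Resolve the earliest instant a worker may claim the task.
/// `eta` wins over `delay`; with neither set the task is due immediately.
pub fn resolve_run_at(now: SystemTime, opts: &ScheduleOpts) -> SystemTime {
    match (opts.eta, opts.delay) {
        (Some(eta), _) => eta,
        (None, Some(delay)) => now + delay,
        (None, None) => now,
    }
}

/// Mirrors the dequeue filter `run_at <= now()`.
pub fn is_claimable(run_at: SystemTime, now: SystemTime) -> bool {
    run_at <= now
}
```

A task whose resolved run_at lies in the future fails the is_claimable check and so stays invisible to the worker until its time comes.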

    use std::time::Duration;
    use chrono::Utc;
    use umbral_tasks::{EnqueueOptions, enqueue};

    // Run in ~60 seconds.
    enqueue("send_welcome", payload.clone(), EnqueueOptions {
        delay: Some(Duration::from_secs(60)),
        ..EnqueueOptions::default()
    }).await?;

    // Run at a specific time.
    enqueue("send_welcome", payload, EnqueueOptions {
        eta: Some(Utc::now() + chrono::Duration::hours(2)),
        ..EnqueueOptions::default()
    }).await?;

Priority
Each task carries a priority (an i32). Higher number = claimed first.
The worker orders the queue by priority descending before anything else, so a
high-priority task jumps ahead of normal work even if it was enqueued later. The
default is 0 (normal); a positive value runs sooner, a negative value drains
behind the defaults.

    use umbral_tasks::{EnqueueOptions, enqueue};

    // Urgent - claimed before any priority-0 work waiting in the queue.
    enqueue("send_password_reset", payload, EnqueueOptions {
        priority: Some(9),
        ..EnqueueOptions::default()
    }).await?;

FIFO within a priority. Ties on priority break by scheduled_for then
id, so two tasks at the same priority are claimed in the order they were
enqueued - priority changes which band runs first, never the fairness inside a
band.
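
As a rough illustration of that ordering, the sketch below sorts an in-memory queue by priority descending, then scheduled_for, then id. QueuedTask and claim_order_ids are hypothetical names, and the real worker presumably expresses the same ordering in its dequeue query rather than in Rust.

```rust
use std::cmp::Reverse;

/// Hypothetical in-memory view of the columns that drive claim order.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct QueuedTask {
    pub id: i64,
    pub priority: i32,
    /// Seconds since the epoch; a stand-in for the timestamp column.
    pub scheduled_for: i64,
}

/// Return task ids in the order a worker would claim them:
/// highest priority first, then earliest `scheduled_for`, then lowest `id`.
pub fn claim_order_ids(mut tasks: Vec<QueuedTask>) -> Vec<i64> {
    tasks.sort_by_key(|t| (Reverse(t.priority), t.scheduled_for, t.id));
    tasks.into_iter().map(|t| t.id).collect()
}
```

Wrapping only the priority in Reverse keeps the tie-breakers ascending, which is what preserves FIFO order inside a priority band.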
Exponential-backoff retries
When a handler fails retriably (returns Err, panics, or times out - anything
other than a missing handler), the worker no longer re-queues it immediately.
Instead it pushes run_at into the future by an exponential backoff:

    run_at = now + min(retry_backoff_base * 2^(attempts - 1), retry_backoff_max)

So the first retry waits retry_backoff_base, the next twice that, and so on,
capped at retry_backoff_max. Once attempts reaches max_attempts the task is
abandoned (failed) rather than retried. A missing handler is non-retriable and
fails on the first attempt regardless of max_attempts.
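
The formula and the abandon rule can be worked through in a small standalone sketch. The names backoff_delay, NextStep, and next_step are hypothetical, not the plugin's API, and the overflow clamping is an assumption added so that large attempt counts stay well defined.

```rust
use std::time::Duration;

/// delay = min(base * 2^(attempts - 1), max), clamped so large attempt
/// counts cannot overflow.
pub fn backoff_delay(attempts: u32, base: Duration, max: Duration) -> Duration {
    let exponent = attempts.saturating_sub(1).min(31);
    let factor = 1u32 << exponent;
    base.checked_mul(factor).map_or(max, |d| d.min(max))
}

#[derive(Debug, PartialEq, Eq)]
pub enum NextStep {
    /// Push run_at forward by this delay and try again.
    RetryAfter(Duration),
    /// Mark the task failed; no further attempts.
    Abandon,
}

/// Decide what happens after a failed attempt. `attempts` counts the
/// attempts made so far, including the one that just failed.
pub fn next_step(
    attempts: u32,
    max_attempts: u32,
    handler_missing: bool,
    base: Duration,
    max: Duration,
) -> NextStep {
    if handler_missing || attempts >= max_attempts {
        NextStep::Abandon
    } else {
        NextStep::RetryAfter(backoff_delay(attempts, base, max))
    }
}
```

With the documented defaults (base 2 seconds, cap 5 minutes) the delays run 2s, 4s, 8s, and so on up to 256s after the eighth attempt; from the ninth attempt onward the cap of 300s applies.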
The backoff knobs are worker-level defaults on WorkerOptions:

    use std::time::Duration;
    use umbral_tasks::{run_worker, WorkerOptions};

    run_worker(WorkerOptions {
        retry_backoff_base: Duration::from_secs(2),      // default
        retry_backoff_max: Duration::from_secs(5 * 60),  // default
        ..WorkerOptions::default()
    }).await;

The orphan-reclaim path applies the same backoff: a reclaimed-but-not-exhausted
task is pushed forward by run_at too, so a job that keeps crashing its worker
doesn't get retried in a tight loop.
Per-task backoff overrides (and a per-task timeout, see below) are accepted on
EnqueueOptions as a forward-compatible surface but are not yet persisted to the
row in v1 - the worker applies its worker-level defaults to every task. Persisting
per-task overrides as columns is a tracked follow-up (planning/features.md #82).
Per-task timeout
The worker wraps each handler invocation in tokio::time::timeout. A handler
that overruns WorkerOptions::task_timeout is cancelled and recorded as a
retriable failure (backed off via run_at, or abandoned if max_attempts is
exhausted) - it never holds its claim until the visibility timeout fires.

    use std::time::Duration;
    use umbral_tasks::{run_worker, WorkerOptions};

    run_worker(WorkerOptions {
        task_timeout: Some(Duration::from_secs(30)), // default: 5 minutes; None disables
        ..WorkerOptions::default()
    }).await;

This is distinct from the visibility timeout below: task_timeout fails a
running handler promptly, whereas the visibility timeout only reclaims a row
after a crashed worker stops renewing its lease.
Visibility timeout and at-least-once delivery
When a worker claims a task it stamps started_at = now(). If the worker process
crashes mid-handler, that row would otherwise stay in running forever, which
amounts to silent data loss. The visibility timeout closes this gap.
On every poll the worker first calls reclaim_orphaned_tasks(visibility_timeout):
any row whose status = 'running' and started_at < now - visibility_timeout is
considered abandoned. The reclaim pass:
- resets the row to pending (clearing started_at) so the next worker poll picks it up again, as long as attempts < max_attempts.
- marks the row failed if attempts >= max_attempts - no infinite retry loop.
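
The reclaim pass can be pictured as the following in-memory sketch. It is an assumption-laden illustration: Row, Status, and reclaim_orphans are hypothetical names, timestamps are plain seconds, the returned count includes rows marked failed (the real return value may differ), and the backoff that the real reclaim path applies to run_at is left out.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Status {
    Pending,
    Running,
    Succeeded,
    Failed,
}

/// Hypothetical in-memory view of the columns the reclaim pass looks at.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Row {
    pub status: Status,
    /// Seconds since the epoch when a worker claimed the row.
    pub started_at: Option<u64>,
    pub attempts: u32,
    pub max_attempts: u32,
}

/// Reclaim rows stuck in `Running` for longer than `visibility_timeout`
/// seconds. Returns how many rows were touched.
pub fn reclaim_orphans(rows: &mut [Row], now: u64, visibility_timeout: u64) -> usize {
    let cutoff = now.saturating_sub(visibility_timeout);
    let mut reclaimed = 0;
    for row in rows.iter_mut() {
        let orphaned = row.status == Status::Running
            && row.started_at.map_or(false, |t| t < cutoff);
        if !orphaned {
            continue;
        }
        if row.attempts < row.max_attempts {
            // Retries remain: make the row claimable again.
            row.status = Status::Pending;
            row.started_at = None;
        } else {
            // Budget exhausted: give up instead of looping forever.
            row.status = Status::Failed;
        }
        reclaimed += 1;
    }
    reclaimed
}
```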
The default timeout is 5 minutes (DEFAULT_VISIBILITY_TIMEOUT). Override it
when building the worker:

    use std::time::Duration;
    use umbral_tasks::{run_worker, WorkerOptions};

    run_worker(WorkerOptions {
        visibility_timeout: Duration::from_secs(30 * 60), // 30 minutes
        ..WorkerOptions::default()
    }).await;

Set visibility_timeout to comfortably exceed the longest handler your application
runs. A timeout shorter than the handler runtime will reclaim the task while it is
still executing, resulting in duplicate runs (at-least-once, not exactly-once).
Handlers must therefore be idempotent when max_attempts > 1.
You can also call reclaim_orphaned_tasks directly - for example, in a scheduled
job or a one-shot management command - with any timeout you choose:

    use std::time::Duration;
    use umbral_tasks::reclaim_orphaned_tasks;

    let reclaimed = reclaim_orphaned_tasks(Duration::from_secs(60)).await?;
    println!("{reclaimed} orphaned task(s) reclaimed");

Periodic tasks (beat)
For recurring work - a nightly cleanup, an hourly digest, a job every five minutes - register a periodic schedule and run the beat process. Beat is the periodic-task scheduler; it does not run handlers itself. Each time a schedule is due, beat enqueues a normal task row; the worker drains it. Run them as two separate processes: the worker that executes handlers, and beat that schedules them.
Register schedules on the plugin builder with .periodic(name, schedule, task, payload):

    use std::time::Duration;
    use umbral_tasks::{Schedule, TasksPlugin};

    App::builder()
        .plugin(
            TasksPlugin::default()
                // Cron: midnight every day.
                .periodic(
                    "cleanup",
                    Schedule::cron("0 0 * * *"),
                    "cleanup_task",
                    serde_json::json!({ "older_than_days": 30 }),
                )
                // Fixed interval: every hour.
                .periodic(
                    "heartbeat",
                    Schedule::every(Duration::from_secs(3600)),
                    "heartbeat_task",
                    serde_json::json!({}),
                ),
        )
        .build()?;

name is the schedule's stable key - one periodic_task row per name.
Re-registering with the same name updates the existing row (its task, payload,
and schedule) instead of duplicating it; next_run is only recomputed when the
schedule string actually changes, so an unchanged redeploy never starves the task.
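
The upsert behaviour described above can be sketched with a map keyed by schedule name. PeriodicRow and sync_schedule are hypothetical names, the payload is kept as a plain string, and fresh_next_run stands in for whatever the real scheduler would compute from the new schedule.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory stand-in for a periodic_task row.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PeriodicRow {
    pub task: String,
    pub payload: String,
    pub schedule: String,
    pub next_run: u64,
}

/// Insert or update the row keyed by `name`. `next_run` is replaced with
/// `fresh_next_run` only for a new row or when the schedule string changed.
pub fn sync_schedule(
    rows: &mut HashMap<String, PeriodicRow>,
    name: &str,
    task: &str,
    payload: &str,
    schedule: &str,
    fresh_next_run: u64,
) {
    match rows.get_mut(name) {
        Some(row) => {
            if row.schedule != schedule {
                row.schedule = schedule.to_string();
                row.next_run = fresh_next_run;
            }
            row.task = task.to_string();
            row.payload = payload.to_string();
        }
        None => {
            rows.insert(
                name.to_string(),
                PeriodicRow {
                    task: task.to_string(),
                    payload: payload.to_string(),
                    schedule: schedule.to_string(),
                    next_run: fresh_next_run,
                },
            );
        }
    }
}
```

Leaving next_run alone on an unchanged schedule is the point of the design: if every deploy reset it, a schedule whose interval is longer than the gap between deploys would never fire.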
Schedule format
| Variant | Constructor | Meaning |
|---|---|---|
| Cron | Schedule::cron("min hour dom mon dow") | Standard 5-field cron ("0 0 * * *" = midnight daily, "*/5 * * * *" = every 5 minutes). A 6-field form with a leading seconds column is also accepted. |
| Interval | Schedule::every(Duration) | Fire every fixed duration after the previous run. |
The schedule persists as a single string column ("cron:0 0 * * *" /
"every:3600"). A schedule that yields no further fire time disables the row
(enabled = false) rather than spinning.
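
To make the single-column encoding concrete, here is a minimal round-trip sketch. ScheduleSketch, encode, and decode are hypothetical names rather than the plugin's Schedule type, and the sketch assumes intervals are stored as whole seconds, as the "every:3600" example suggests. It does not validate the cron expression.

```rust
use std::time::Duration;

/// Hypothetical mirror of the two schedule variants.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ScheduleSketch {
    Cron(String),
    Every(Duration),
}

impl ScheduleSketch {
    /// Encode as the single string stored in the schedule column.
    pub fn encode(&self) -> String {
        match self {
            ScheduleSketch::Cron(expr) => format!("cron:{expr}"),
            ScheduleSketch::Every(d) => format!("every:{}", d.as_secs()),
        }
    }

    /// Decode the stored string; returns None for an unknown prefix or a
    /// non-numeric interval.
    pub fn decode(s: &str) -> Option<ScheduleSketch> {
        if let Some(expr) = s.strip_prefix("cron:") {
            Some(ScheduleSketch::Cron(expr.to_string()))
        } else if let Some(secs) = s.strip_prefix("every:") {
            secs.parse()
                .ok()
                .map(|n| ScheduleSketch::Every(Duration::from_secs(n)))
        } else {
            None
        }
    }
}
```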
Running beat

    # Run the scheduler continuously alongside the worker.
    cargo run -- tasks-beat

    # Sync schedules, run one tick, and exit (tests / cron-driven beats).
    cargo run -- tasks-beat --once

On startup beat syncs the registered schedules to periodic_task rows, then each
tick (default every 5s) enqueues every due schedule.
Multi-instance safe. You can run more than one tasks-beat process without
double-firing. Each due row is claimed with an optimistic conditional UPDATE -
`SET next_run =
Task results & status
A handler can return any serializable value, not just Ok(()). On success the
worker serializes the return value to JSON and stores it in the task row's
result column - the result backend payload. Query a task's outcome by the
id enqueue returned:

    use umbral_tasks::{enqueue, register_handler, task_status, await_result, TaskState, EnqueueOptions};
    use std::time::Duration;

    // A handler that returns a value (any `R: Serialize`):
    register_handler("sum", |payload: &str| async move {
        let nums: Vec<i64> = serde_json::from_str(payload).map_err(|e| e.to_string())?;
        Ok::<i64, String>(nums.iter().sum())
    });

    let id = enqueue("sum", vec![1, 2, 3], EnqueueOptions::default()).await?;

    // Non-blocking: snapshot the current state (None if the row was drained).
    let status = task_status(id).await?.expect("task exists");
    assert_eq!(status.state, TaskState::Pending); // not run yet

    // Blocking with a timeout: poll until terminal.
    let status = await_result(id, Duration::from_secs(5)).await?;
    assert_eq!(status.state, TaskState::Success);
    assert_eq!(status.result, Some(serde_json::json!(6)));

task_status returns a TaskStatus { id, name, state, result, error, attempts, max_attempts, run_at, started_at, completed_at, created_at }. state is a
TaskState - Pending, Running, Success, Failed, or Retrying (a row
that failed an attempt but still has retries left). await_result polls every
50ms until the state is terminal (Success/Failed) or timeout elapses, in
which case it returns TaskError::Timeout carrying the last observed status.
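
The polling contract can be illustrated with a synchronous stand-in. This is a sketch only: the real await_result is async and queries the database, whereas poll_until_terminal here is a hypothetical blocking helper that takes a closure in place of the status query and returns the last observed state in Err on timeout.

```rust
use std::time::{Duration, Instant};

/// Local copy of the states named above, for illustration.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TaskState {
    Pending,
    Running,
    Success,
    Failed,
    Retrying,
}

impl TaskState {
    /// Only Success and Failed end the polling loop.
    pub fn is_terminal(self) -> bool {
        matches!(self, TaskState::Success | TaskState::Failed)
    }
}

/// Call `fetch` every `interval` until it reports a terminal state (Ok) or
/// `timeout` elapses (Err carrying the last observed state).
pub fn poll_until_terminal(
    mut fetch: impl FnMut() -> TaskState,
    interval: Duration,
    timeout: Duration,
) -> Result<TaskState, TaskState> {
    let deadline = Instant::now() + timeout;
    loop {
        let state = fetch();
        if state.is_terminal() {
            return Ok(state);
        }
        if Instant::now() >= deadline {
            return Err(state);
        }
        std::thread::sleep(interval);
    }
}
```

Retrying is deliberately non-terminal: a task between attempts may still succeed, so a caller waiting on the result keeps polling.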
Unit-returning handlers stay source-compatible: () implements Serialize (it
serializes to JSON null), so an existing Ok(()) handler compiles unchanged
and its result reads back as Some(Value::Null). A failing handler leaves
result empty and records the message in error.
See arch.md and plugins/umbral-tasks/src/lib.rs for the design rationale.
Admin visibility
Operators browse and inspect the task queue in the admin. umbral_tasks::admin_model() returns a read-only AdminModel for the task_row table - register it on the admin alongside your own models:

    use umbral_admin::AdminPlugin;

    App::builder()
        .plugin(TasksPlugin::default())
        .plugin(AdminPlugin::default().register(umbral_tasks::admin_model()))
        .build()?;

The list shows the columns an operator wants at a glance - id, name, status, priority, attempts / max_attempts, run_at, completed_at, created_at - with status / priority filters and a name / status search box. Every column is read-only: rows are written by enqueue and the worker, never hand-authored.
Retrying a failed task
The admin model attaches a "Retry selected" bulk action. Select one or more failed rows and run it to re-queue them: each failed task is reset to pending with run_at = now (immediately eligible), its error cleared, and its attempts counter reset to 0 so it gets a fresh max_attempts budget. The action only touches rows currently in the failed state - a pending, running, or succeeded task is left untouched, so you can't disturb a task a worker is mid-flight on.
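
The reset that the action performs on each row can be sketched as a guarded state transition. TaskRow and retry_if_failed are hypothetical in-memory stand-ins; the real action runs against the task_row table.

```rust
/// Hypothetical in-memory view of the columns the retry action touches.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TaskRow {
    pub status: String,
    /// Seconds since the epoch.
    pub run_at: u64,
    pub error: Option<String>,
    pub attempts: u32,
}

/// Re-queue the row only if it is currently failed. Returns whether the
/// row was changed.
pub fn retry_if_failed(row: &mut TaskRow, now: u64) -> bool {
    if row.status != "failed" {
        // Pending, running, and succeeded rows are left untouched.
        return false;
    }
    row.status = "pending".to_string();
    row.run_at = now; // immediately eligible
    row.error = None;
    row.attempts = 0; // fresh max_attempts budget
    true
}
```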
The same logic is available programmatically:

    // Re-queue task #42 if (and only if) it is currently failed.
    let requeued: bool = umbral_tasks::retry_task(42).await?;

The admin_model() function and the umbral-admin dependency are gated behind the admin feature (on by default). A tasks-only app that doesn't run the admin builds with default-features = false and never pulls the admin into its dependency graph:

    umbral-tasks = { version = "0.0.1", default-features = false }

Combining with signals
For fire-and-forget work that must survive the request process - like sending a welcome email after user signup - enqueue a task from your signal handler rather than running the work inline. That way the task persists to the database and the worker picks it up even if the web process restarts before the handler returns.

    // in your signup signal handler
    enqueue("email.send_welcome", WelcomeEmailPayload { user_id, locale }, EnqueueOptions::default()).await?;

The signals docs page is at /docs/v0.0.1/plugins/signals and covers how to connect signals to tasks end-to-end.
What is NOT in v1
- FOR UPDATE SKIP LOCKED. The worker uses a conditional UPDATE claim that is correct on both SQLite and Postgres (a concurrent worker whose UPDATE matches zero rows loses the race cleanly). SELECT … FOR UPDATE SKIP LOCKED is a future optimization that avoids the wasted candidate SELECT, not a correctness fix.
- Per-task backoff / timeout persistence. EnqueueOptions accepts the fields but the worker applies worker-level defaults in v1 (see the callout above). #82.
- Status enum. Status is stored as a plain string (pending, running, succeeded, failed) because the M3 derive doesn't support enum SqlType yet.
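
The conditional-claim idea from the first item can be imitated in memory with a compare-and-swap. This is an analogy, not the worker's SQL: Slot, try_claim, and race_for_claim are hypothetical names, and the atomic swap plays the role of an UPDATE that only matches a row still in pending.

```rust
use std::sync::atomic::{AtomicU8, Ordering};

const PENDING: u8 = 0;
const RUNNING: u8 = 1;

/// Hypothetical stand-in for one task row's status column.
pub struct Slot {
    status: AtomicU8,
}

impl Slot {
    pub fn new_pending() -> Self {
        Slot { status: AtomicU8::new(PENDING) }
    }

    /// Succeeds only if the slot is still pending, like an UPDATE whose
    /// WHERE clause matches zero rows for every worker but the first.
    pub fn try_claim(&self) -> bool {
        self.status
            .compare_exchange(PENDING, RUNNING, Ordering::AcqRel, Ordering::Acquire)
            .is_ok()
    }
}

/// Let `workers` threads race for one slot; returns how many won.
pub fn race_for_claim(workers: usize) -> usize {
    let slot = Slot::new_pending();
    std::thread::scope(|s| {
        let mut handles = Vec::new();
        for _ in 0..workers {
            handles.push(s.spawn(|| slot.try_claim()));
        }
        handles
            .into_iter()
            .map(|h| h.join().unwrap())
            .filter(|won| *won)
            .count()
    })
}
```

However many workers race, exactly one claim succeeds; the losers simply see a failed swap and move on, which is why the approach is correct without row locks.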
The spec outline lives in docs/specs/outlines/. Source: plugins/umbral-tasks/src/lib.rs.