Skip to content

Commit

Permalink
copy updates
Browse files Browse the repository at this point in the history
  • Loading branch information
maxcountryman committed Oct 10, 2024
1 parent 90030cc commit 9ea78c0
Show file tree
Hide file tree
Showing 5 changed files with 183 additions and 79 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "underway"
description = "Distributed task queue over Postgres"
description = "PostgreSQL-backed job queue for reliable background task processing"
version = "0.0.0"
edition = "2021"
license = "MIT"
Expand Down
15 changes: 10 additions & 5 deletions src/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,20 @@
//! # }
//! ```
//!
//! ## Transactional enqueue
//!
//! Sometimes a job should only be enqueued when other conditions are met.
//!
//! For example, perhaps we're implementing user registration. We'd like to send
//! a welcome email upon completing the flow. However, if something goes wrong
//! and we need to reset the flow, we don't want to send an email.
//! For example, perhaps we're implementing user registration and we'd like to
//! send a welcome email upon completing the flow. However, if something goes
//! wrong and we need to reset the flow, we'd like to avoid sending such an
//! email.
//!
//! To accomodate use cases like this, we can also make use of
//! To accomodate use cases like this, we can make use of
//! [`Job::enqueue_using`], which allows us to specify a transaction. Should the
//! transaction be rolled back, then our job won't be enqueued.
//! transaction be rolled back, then our job won't be enqueued. (An ID will
//! still be returned by this method, so it's up to our application to recognize
//! when a failure has occurred and ignore any such IDs.)
//!
//! ```rust
//! # use sqlx::PgPool;
Expand Down
44 changes: 11 additions & 33 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@
//!
//! Let's say we wanted to send welcome emails upon the successful registration
//! of a new user. If we're building a web application, we want to defer work
//! like this such that we aren't slowing down the response. We can use Underway
//! to create a background job for sending emails. For instance, this basic
//! example illustrates:
//! like this so we can return a response quickly back to the browser. We can
//! use Underway to create a background job for sending emails without blocking
//! the response:
//!
//! ```rust,no_run
//! use std::env;
Expand Down Expand Up @@ -100,7 +100,8 @@
//!
//! # Concepts
//!
//! Underway has been designed around several core concepts:
//! Underway has been designed around several core concepts, which build on one
//! another to deliver a robust background-job framework:
//!
//! - [Tasks](#tasks) represent a well-structure unit of work.
//! - [Jobs](#jobs) are a higher-level abstraction over the [`Task`] trait.
Expand All @@ -113,7 +114,7 @@
//! input.
//!
//! This is the lowest-level concept in our design, with everything else being
//! built around this idea.
//! built on top of or around this idea.
//!
//! See [`task`] for more details about tasks.
//!
Expand All @@ -123,7 +124,7 @@
//! and operating tasks.
//!
//! In most cases, applications will use jobs to define tasks instead of using
//! the task trait directly.
//! the `Task` trait directly.
//!
//! See [`job`] for more details about jobs.
//!
Expand All @@ -136,11 +137,8 @@
//!
//! ## Workers
//!
//! Workers are derived from tasks and use their queue to find new work to
//! execute.
//!
//! Besides this polling style of work execution, workers are also used to
//! manage cron-like schedules.
//! Workers are responsible for executing tasks. They poll the queue for new
//! tasks, and when found, try to invoke the task's execute routine.
//!
//! See [`worker`] for more details about workers.
Expand All @@ -163,12 +161,12 @@ mod scheduler;
pub mod task;
pub mod worker;

/// SQLx [`Migrator`] which provides `underway`'s schema migrations.
/// A SQLx [`Migrator`] which provides Underway's schema migrations.
///
/// These migrations must be applied before queues, tasks, and workers can be
/// run.
///
/// **Note: Changes are managed within a dedicated schema, called "underway".**
/// **Note**: Changes are managed within a dedicated schema, called "underway".
///
/// # Example
///
Expand All @@ -191,23 +189,3 @@ pub mod worker;
/// # });
/// # }
pub static MIGRATOR: Migrator = sqlx::migrate!();

/// Error enum which provides all `underway` error types.
#[derive(Debug, thiserror::Error)]
enum Error {
/// Job-related errors.
#[error(transparent)]
Job(#[from] job::Error),

/// Queue-relayed errors.
#[error(transparent)]
Queue(#[from] queue::Error),

/// Task-relayed errors.
#[error(transparent)]
Task(#[from] task::Error),

/// Worker-relayed errors.
#[error(transparent)]
Worker(#[from] worker::Error),
}
51 changes: 31 additions & 20 deletions src/queue.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
//! Queues provide an interface for managing task execution.
//! Queues provide an interface for managing task lifecycle.
//!
//! Tasks are enqueued onto the queue, using the [`Queue::enqueue`] method, and
//! later dequeued, using the [`Queue::dequeue`] method, when they're executed.
//!
//! The semantics for retrieving a task from the queue are defined by the order
//! of insertion (first-in, first-out) or the priority of the task defines. If a
//! priority is defined, then it's considered before the order the
//! task was inserted.
//! of insertion, first-in, first-out (FIFO), or the priority the task defines.
//! If a priority is defined, then it's considered before the order the task was
//! inserted.
//!
//! # Dead-letter queues
//!
//! When a dead-letter queue name is provided, a secondary queue is created with
//! this name. This is a queue of "dead letters". In other words, it's a queue
//! of tasks that have failed on the queue and can't be retried. This can be
//! useful for identifying patterns of failures or reprocessing failed tasks
//! when they're likely to succeed again.
//! of tasks that have failed and can't be retried.
//!
//! Dead-letter queues can be useful for identifying patterns of failures or
//! reprocessing failed tasks at later date.
//!
//! To enable dead-letter queues, simply provide its name:
//!
//! ```rust
//! # use tokio::runtime::Runtime;
Expand All @@ -39,6 +42,7 @@
//! # let pool = PgPool::connect("postgres://user:password@localhost/database").await?;
//! let queue = Queue::builder()
//! .name("example_queue")
//! // Enable the dead-letter queue.
//! .dead_letter_queue("example_dlq")
//! .pool(pool.clone())
//! .build()
Expand All @@ -58,10 +62,12 @@
//! For example, `Job` provides an [`enqueue`](crate::Job::enqueue) method,
//! which wraps its queue's enqueue method. Likewise, when a job spins up a
//! worker via its [`run`](crate::Job::run) method, that worker uses its queue's
//! dequeue method.
//! dequeue method and in both cases there's no need to use the queue methods
//! directly.
//!
//! With that said, a queue may be interfaced with directly and operated
//! manually if desired:
//!
//! With that said, a queue may be interfaced with directly. As an example, we
//! may enqueue and dequeue like so:
//! ```rust
//! # use tokio::runtime::Runtime;
//! # use underway::Task;
Expand Down Expand Up @@ -117,8 +123,9 @@
//!
//! Of course it's also possible to interface directly with the queue to achieve
//! the same if desired. Schedules can be set with the
//! [`schedule`](Queue::schedule) method. Once set, a worker can be used to run
//! the schedule via the [`run_scheduler`](crate::Job::run_scheduler) method:
//! [`Queue::schedule`](Queue::schedule) method. Once set, a scheduler can be
//! used to run the schedule via the [`Scheduler::run`](crate::Scheduler::run)
//! method:
//!
//! ```rust
//! # use tokio::runtime::Runtime;
Expand Down Expand Up @@ -147,7 +154,7 @@
//! .build()
//! .await?;
//!
//! // Set a quarter-hour schedule.
//! // Set a quarter-hour schedule; IANA timezones are mandatory.
//! let quarter_hour = "0 */15 * * * *[America/Los_Angeles]".parse()?;
//! queue.schedule(&pool, quarter_hour, ()).await?;
//!
Expand All @@ -160,6 +167,8 @@
//! // Run a scheduler based on our configured schedule.
//! scheduler.run().await?;
//!
//! // Don't forget that there's no workers running, so even if we schedule work, nothing will
//! // happen!
//! # Ok::<(), Box<dyn std::error::Error>>(())
//! # });
//! # }
Expand All @@ -173,6 +182,9 @@
//! [`Queue::run_deletion_every`] should be called to start the deletion
//! routine.
//!
//! **Note**: Tasks will not be deleted from the queue if this routine is not
//! running!
//!
//! ```rust
//! # use tokio::runtime::Runtime;
//! # use underway::Task;
Expand Down Expand Up @@ -200,6 +212,7 @@
//! .build()
//! .await?;
//!
//! // Ensure we remove tasks that have an expired TTL.
//! queue.run_deletion().await?;
//!
//! # Ok::<(), Box<dyn std::error::Error>>(())
Expand Down Expand Up @@ -235,7 +248,7 @@ pub enum Error {
#[error(transparent)]
Json(#[from] serde_json::Error),

/// Error returned by the `cron` crate.
/// Error returned by the `jiff_cron` crate.
#[error(transparent)]
Cron(#[from] jiff_cron::error::Error),

Expand All @@ -257,9 +270,7 @@ pub enum Error {

/// Task queue.
///
/// Queues are responsible for managing tasks that implement the `Task` trait.
/// They are generic over task types, meaning that each queue contains only a
/// specific type of task, ensuring type safety and correctness at compile time.
/// Queues are responsible for managing task lifecycle.
#[derive(Debug)]
pub struct Queue<T: Task> {
name: String,
Expand Down Expand Up @@ -491,7 +502,7 @@ impl<T: Task> Queue<T> {
/// Runs deletion clean up of expired tasks in a loop, sleeping between
/// deletions for the specified period.
///
/// Note that tasks are only deleted when this routine or `run_deletion` is
/// **Note:** Tasks are only deleted when this routine or `run_deletion` is
/// running.
///
/// # Errors
Expand All @@ -510,8 +521,8 @@ impl<T: Task> Queue<T> {

/// Runs deletion clean up of expired tasks every hour.
///
/// Note that tasks are only deleted when this routine or
/// `run_deletion_every` is running.
/// **Note:** Tasks are only deleted when this routine or `run_deletion` is
/// running.
///
/// # Errors
///
Expand Down
Loading

0 comments on commit 9ea78c0

Please sign in to comment.