Skip to content

Commit

Permalink
provide graceful shutdown interface (#8)
Browse files Browse the repository at this point in the history
This introduces a mechanism for politely asking Underway to shutdown. To
do so, a new function, `graceful_shutdown` is provided. Calling this
function will send a notification to a Postgres channel. Workers listen
on this channel and when a message is received will stop processing new
tasks. If they're already processing a task, then they wait until that
task is done or the task timeout has elapsed, whichever is first.

In order to cleanly stop the queue, this function should be used. If
stopping in-progress tasks is safe for your use case, then this can be
ignored and the queue can be stopped without any delay.

Closes #5
  • Loading branch information
maxcountryman authored Oct 13, 2024
1 parent 47e30c0 commit 03e956b
Show file tree
Hide file tree
Showing 5 changed files with 310 additions and 14 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ tokio = { version = "1.40.0", features = [
tracing = { version = "0.1.40", features = ["log"] }
ulid = { version = "1.1.3", features = ["uuid"] }
uuid = { version = "1.10.0", features = ["v4"] }
num_cpus = "1.16.0"

[dev-dependencies]
futures = "0.3.30"
22 changes: 22 additions & 0 deletions src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -959,6 +959,28 @@ impl<T: Task> QueueBuilder<T, PoolSet> {
}
}

pub(crate) const SHUTDOWN_CHANNEL: &str = "underway_shutdown";

/// Initiates a graceful shutdown by sending a `NOTIFY` to the
/// `underway_shutdown` channel via the `pg_notify` function.
///
/// Workers listen on this channel and when a message is received will stop
/// processing further tasks and wait for in-progress tasks to finish or
/// timeout.
///
/// This can be useful when combined with [`tokio::signal`] to ensure queues are
/// stopped cleanly when stopping your application.
pub async fn graceful_shutdown<'a, E>(executor: E) -> Result
where
E: PgExecutor<'a>,
{
sqlx::query!("select pg_notify($1, $2)", SHUTDOWN_CHANNEL, "")
.execute(executor)
.await?;

Ok(())
}

#[cfg(test)]
mod tests {
use std::collections::HashSet;
Expand Down
Loading

0 comments on commit 03e956b

Please sign in to comment.