diff --git a/.sqlx/query-fa13a4875b569ce138438632ab68c60ec62e001cbe007b9a75776cf54e21f9cc.json b/.sqlx/query-24e7b0c00cda5634d04fe27cb833f12803108d0b4235d39fe372acd4b364d99d.json similarity index 82% rename from .sqlx/query-fa13a4875b569ce138438632ab68c60ec62e001cbe007b9a75776cf54e21f9cc.json rename to .sqlx/query-24e7b0c00cda5634d04fe27cb833f12803108d0b4235d39fe372acd4b364d99d.json index 2b8a66b..8a2f8ec 100644 --- a/.sqlx/query-fa13a4875b569ce138438632ab68c60ec62e001cbe007b9a75776cf54e21f9cc.json +++ b/.sqlx/query-24e7b0c00cda5634d04fe27cb833f12803108d0b4235d39fe372acd4b364d99d.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n select\n id as \"id: TaskId\",\n task_queue_name as \"queue_name\",\n input,\n retry_policy as \"retry_policy: RetryPolicy\",\n timeout,\n concurrency_key\n from underway.task\n where input->>'job_id' = $1\n and state = $2\n ", + "query": "\n select\n id as \"id: TaskId\",\n task_queue_name as \"queue_name\",\n input,\n retry_policy as \"retry_policy: RetryPolicy\",\n timeout,\n heartbeat,\n concurrency_key\n from underway.task\n where input->>'job_id' = $1\n and state = $2\n ", "describe": { "columns": [ { @@ -54,6 +54,11 @@ }, { "ordinal": 5, + "name": "heartbeat", + "type_info": "Interval" + }, + { + "ordinal": 6, "name": "concurrency_key", "type_info": "Text" } @@ -83,8 +88,9 @@ false, false, false, + false, true ] }, - "hash": "fa13a4875b569ce138438632ab68c60ec62e001cbe007b9a75776cf54e21f9cc" + "hash": "24e7b0c00cda5634d04fe27cb833f12803108d0b4235d39fe372acd4b364d99d" } diff --git a/.sqlx/query-9731fc6ba31c2cc1bd00b8e177c44ca85285a2606f9485a9f38a3585ce48b00b.json b/.sqlx/query-9731fc6ba31c2cc1bd00b8e177c44ca85285a2606f9485a9f38a3585ce48b00b.json new file mode 100644 index 0000000..e683a16 --- /dev/null +++ b/.sqlx/query-9731fc6ba31c2cc1bd00b8e177c44ca85285a2606f9485a9f38a3585ce48b00b.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "\n update underway.task\n set updated_at = now(),\n last_heartbeat_at = now()\n where id = $1\n and task_queue_name = $2\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid", + "Text" + ] + }, + "nullable": [] + }, + "hash": "9731fc6ba31c2cc1bd00b8e177c44ca85285a2606f9485a9f38a3585ce48b00b" +} diff --git a/.sqlx/query-705e4322b77ede1818f4cda7d50141b623908a7c980a09dbe8a3ad40c84fae7e.json b/.sqlx/query-d726027de5fc4369abc0bc76f0ce0ec74f97447850cebdebba1c0748059ce17e.json similarity index 72% rename from .sqlx/query-705e4322b77ede1818f4cda7d50141b623908a7c980a09dbe8a3ad40c84fae7e.json rename to .sqlx/query-d726027de5fc4369abc0bc76f0ce0ec74f97447850cebdebba1c0748059ce17e.json index d539d57..874cc6f 100644 --- a/.sqlx/query-705e4322b77ede1818f4cda7d50141b623908a7c980a09dbe8a3ad40c84fae7e.json +++ b/.sqlx/query-d726027de5fc4369abc0bc76f0ce0ec74f97447850cebdebba1c0748059ce17e.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n update underway.task\n set state = $3, last_attempt_at = now()\n where id = (\n select id\n from underway.task\n where task_queue_name = $1\n and state = $2\n and created_at + delay <= now()\n order by priority desc, created_at, id\n limit 1\n for update skip locked\n )\n returning\n id as \"id: TaskId\",\n task_queue_name as \"queue_name\",\n input,\n timeout,\n retry_policy as \"retry_policy: RetryPolicy\",\n concurrency_key\n ", + "query": "\n update underway.task\n set state = $3, last_attempt_at = now()\n where id = (\n select id\n from underway.task\n where task_queue_name = $1\n and (\n state = $2\n or (state = $3 and last_heartbeat_at < now() - 
heartbeat)\n )\n and created_at + delay <= now()\n order by\n priority desc,\n created_at,\n id\n limit 1\n for update skip locked\n )\n returning\n id as \"id: TaskId\",\n task_queue_name as \"queue_name\",\n input,\n timeout,\n heartbeat,\n retry_policy as \"retry_policy: RetryPolicy\",\n concurrency_key\n ", "describe": { "columns": [ { @@ -25,6 +25,11 @@ }, { "ordinal": 4, + "name": "heartbeat", + "type_info": "Interval" + }, + { + "ordinal": 5, "name": "retry_policy: RetryPolicy", "type_info": { "Custom": { @@ -53,7 +58,7 @@ } }, { - "ordinal": 5, + "ordinal": 6, "name": "concurrency_key", "type_info": "Text" } @@ -97,8 +102,9 @@ false, false, false, + false, true ] }, - "hash": "705e4322b77ede1818f4cda7d50141b623908a7c980a09dbe8a3ad40c84fae7e" + "hash": "d726027de5fc4369abc0bc76f0ce0ec74f97447850cebdebba1c0748059ce17e" } diff --git a/migrations/20241110164319_3.sql b/migrations/20241110164319_3.sql new file mode 100644 index 0000000..f19f629 --- /dev/null +++ b/migrations/20241110164319_3.sql @@ -0,0 +1,8 @@ +-- Force anything running this migration to use the right search path. +set local search_path to underway; + +alter table underway.task +add column if not exists heartbeat interval not null default interval '30 seconds'; + +alter table underway.task +add column if not exists last_heartbeat_at timestamp with time zone; diff --git a/src/job.rs b/src/job.rs index 09ae464..6f97238 100644 --- a/src/job.rs +++ b/src/job.rs @@ -901,6 +901,7 @@ impl EnqueuedJob { input, retry_policy as "retry_policy: RetryPolicy", timeout, + heartbeat, concurrency_key from underway.task where input->>'job_id' = $1 @@ -2375,9 +2376,8 @@ mod tests { }; job.enqueue(&input).await?; - let mut conn = pool.acquire().await?; let pending_task = queue - .dequeue(&mut conn) + .dequeue() .await? .expect("There should be an enqueued task"); @@ -2415,9 +2415,8 @@ mod tests { }; job.enqueue(&input).await?; - let mut conn = pool.acquire().await?; let pending_task = queue - .dequeue(&mut conn) + .dequeue() .await? .expect("There should be an enqueued task"); @@ -2467,9 +2466,8 @@ mod tests { }; job.enqueue(&input).await?; - let mut conn = pool.acquire().await?; let pending_task = queue - .dequeue(&mut conn) + .dequeue() .await? .expect("There should be an enqueued task"); @@ -2565,9 +2563,8 @@ mod tests { }; job.enqueue(&input).await?; - let mut conn = pool.acquire().await?; let pending_task = queue - .dequeue(&mut conn) + .dequeue() .await? .expect("There should be an enqueued task"); @@ -2604,9 +2601,8 @@ mod tests { }; job.enqueue(&input).await?; - let mut conn = pool.acquire().await?; let pending_task = queue - .dequeue(&mut conn) + .dequeue() .await? .expect("There should be an enqueued task"); @@ -2694,9 +2690,8 @@ mod tests { worker.process_next_task().await?; // Inspect the second task. - let mut conn = pool.acquire().await?; let pending_task = queue - .dequeue(&mut conn) + .dequeue() .await? .expect("There should be an enqueued task"); @@ -2758,8 +2753,7 @@ mod tests { let enqueued_job = job.enqueue(&input).await?; // Dequeue the first task. - let mut conn = pool.acquire().await?; - let Some(dequeued_task) = queue.dequeue(&mut conn).await? else { + let Some(dequeued_task) = queue.dequeue().await? else { panic!("Task should exist"); }; @@ -2793,8 +2787,7 @@ mod tests { worker.process_next_task().await?; // Dequeue the second task. - let mut conn = pool.acquire().await?; - let Some(dequeued_task) = queue.dequeue(&mut conn).await? else { + let Some(dequeued_task) = queue.dequeue().await? 
else { panic!("Next task should exist"); }; @@ -2861,9 +2854,8 @@ mod tests { worker.process_next_task().await?; // Inspect the second task. - let mut conn = pool.acquire().await?; let pending_task = queue - .dequeue(&mut conn) + .dequeue() .await? .expect("There should be an enqueued task"); @@ -2917,8 +2909,7 @@ mod tests { let enqueued_job = job.enqueue(&input).await?; // Dequeue the first task. - let mut conn = pool.acquire().await?; - let Some(dequeued_task) = queue.dequeue(&mut conn).await? else { + let Some(dequeued_task) = queue.dequeue().await? else { panic!("Task should exist"); }; @@ -2961,8 +2952,7 @@ mod tests { worker.process_next_task().await?; // Dequeue the second task. - let mut conn = pool.acquire().await?; - let Some(dequeued_task) = queue.dequeue(&mut conn).await? else { + let Some(dequeued_task) = queue.dequeue().await? else { panic!("Next task should exist"); }; diff --git a/src/queue.rs b/src/queue.rs index 4f842e2..eabef4d 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -107,7 +107,7 @@ //! queue.enqueue(&pool, &task, &()).await?; //! //! let mut tx = pool.begin().await?; -//! if let Some(task) = queue.dequeue(&mut *tx).await? { +//! if let Some(task) = queue.dequeue().await? { //! // Process the task here //! } //! # Ok::<(), Box>(()) @@ -624,7 +624,7 @@ impl Queue { /// // Dequeue the enqueued task. /// let mut tx = pool.begin().await?; /// let pending_task = queue - /// .dequeue(&mut *tx) + /// .dequeue() /// .await? /// .expect("There should be a pending task."); /// # Ok::<(), Box>(()) @@ -632,11 +632,15 @@ impl Queue { /// # } /// ``` #[instrument( - skip(self, conn), + skip(self), fields(queue.name = self.name, task.id = tracing::field::Empty), err )] - pub async fn dequeue(&self, conn: &mut PgConnection) -> Result> { + pub async fn dequeue(&self) -> Result> { + // Transaction scoped to finding the next task and setting its state to + // "in-progress". + let mut tx = self.pool.begin().await?; + let in_progress_task = sqlx::query_as!( InProgressTask, r#" @@ -646,9 +650,15 @@ impl Queue { select id from underway.task where task_queue_name = $1 - and state = $2 + and ( + state = $2 + or (state = $3 and last_heartbeat_at < now() - heartbeat) + ) and created_at + delay <= now() - order by priority desc, created_at, id + order by + priority desc, + created_at, + id limit 1 for update skip locked ) @@ -657,6 +667,7 @@ impl Queue { task_queue_name as "queue_name", input, timeout, + heartbeat, retry_policy as "retry_policy: RetryPolicy", concurrency_key "#, @@ -664,7 +675,7 @@ impl Queue { TaskState::Pending as TaskState, TaskState::InProgress as TaskState ) - .fetch_optional(&mut *conn) + .fetch_optional(&mut *tx) .await?; if let Some(in_progress_task) = &in_progress_task { @@ -697,10 +708,12 @@ impl Queue { self.name, TaskState::InProgress as TaskState ) - .execute(&mut *conn) + .execute(&mut *tx) .await?; } + tx.commit().await?; + Ok(in_progress_task) } @@ -969,6 +982,7 @@ pub struct InProgressTask { pub(crate) queue_name: String, pub(crate) input: serde_json::Value, pub(crate) timeout: sqlx::postgres::types::PgInterval, + pub(crate) heartbeat: sqlx::postgres::types::PgInterval, pub(crate) retry_policy: RetryPolicy, pub(crate) concurrency_key: Option, } @@ -1260,6 +1274,27 @@ impl InProgressTask { .fetch_one(executor) .await?) 
} + + pub(crate) async fn record_heartbeat<'a, E>(&self, executor: E) -> Result + where + E: PgExecutor<'a>, + { + sqlx::query!( + r#" + update underway.task + set updated_at = now(), + last_heartbeat_at = now() + where id = $1 + and task_queue_name = $2 + "#, + self.id as TaskId, + self.queue_name + ) + .execute(executor) + .await?; + + Ok(()) + } } #[instrument(skip(executor), err)] @@ -1753,9 +1788,8 @@ mod tests { let task_id = queue.enqueue(&pool, &task, &input).await?; // Dequeue the task - let mut conn = pool.acquire().await?; let in_progress_task = queue - .dequeue(&mut conn) + .dequeue() .await? .expect("There should be a task to dequeue"); @@ -1820,13 +1854,7 @@ mod tests { let handles: Vec<_> = (0..5) .map(|_| { let queue = queue.clone(); - let pool = pool.clone(); - tokio::spawn(async move { - let mut tx = pool.begin().await?; - let ret = queue.dequeue(&mut tx).await; - tx.commit().await?; - ret - }) + tokio::spawn(async move { queue.dequeue().await }) }) .collect(); @@ -1859,8 +1887,7 @@ mod tests { .await?; // Attempt to dequeue without enqueuing any tasks - let mut conn = pool.acquire().await?; - let dequeued_task = queue.dequeue(&mut conn).await?; + let dequeued_task = queue.dequeue().await?; assert!(dequeued_task.is_none()); @@ -1884,10 +1911,7 @@ mod tests { let mut tx = pool.begin().await?; // N.B. Task must be dequeued to ensure an attempt row is created. - queue - .dequeue(&mut tx) - .await? - .expect("A task should be dequeued"); + queue.dequeue().await?.expect("A task should be dequeued"); // Verify the task state let task_row = sqlx::query!( @@ -1939,10 +1963,7 @@ mod tests { let delay = 1.minute(); let mut conn = pool.acquire().await?; - let in_progress_task = queue - .dequeue(&mut conn) - .await? - .expect("A task should be dequeued"); + let in_progress_task = queue.dequeue().await?.expect("A task should be dequeued"); in_progress_task.retry_after(&mut conn, delay).await?; @@ -1991,10 +2012,7 @@ mod tests { let task_id = queue.enqueue(&pool, &task, &input).await?; let mut conn = pool.acquire().await?; - let in_progress_task = queue - .dequeue(&mut conn) - .await? - .expect("A task should be dequeued"); + let in_progress_task = queue.dequeue().await?.expect("A task should be dequeued"); // Cancel the task in_progress_task.mark_cancelled(&mut conn).await?; @@ -2031,10 +2049,7 @@ mod tests { let mut tx = pool.begin().await?; // N.B. Task must be dequeued to ensure attempt row is created. - let in_progress_task = queue - .dequeue(&mut tx) - .await? - .expect("A task should be dequeued"); + let in_progress_task = queue.dequeue().await?.expect("A task should be dequeued"); // Mark the task as succeeded in_progress_task.mark_succeeded(&mut tx).await?; @@ -2071,10 +2086,7 @@ mod tests { let mut tx = pool.begin().await?; // N.B. We can't mark a task failed if it hasn't been dequeued. - let in_progress_task = queue - .dequeue(&mut tx) - .await? - .expect("A task should be dequeued"); + let in_progress_task = queue.dequeue().await?.expect("A task should be dequeued"); // Mark the task as failed in_progress_task.mark_failed(&mut tx).await?; @@ -2115,10 +2127,7 @@ mod tests { let mut tx = pool.begin().await?; // N.B. Task must be dequeued to ensure attempt row is created. - let in_progress_task = queue - .dequeue(&mut tx) - .await? - .expect("A task should be dequeued"); + let in_progress_task = queue.dequeue().await?.expect("A task should be dequeued"); // Update task failure. 
in_progress_task.record_failure(&mut tx, error).await?; @@ -2344,10 +2353,7 @@ mod tests { // Dequeue the task to ensure the attempt row is created. let mut conn = pool.acquire().await?; - let in_progress_task = queue - .dequeue(&mut conn) - .await? - .expect("A task should be dequeued"); + let in_progress_task = queue.dequeue().await?.expect("A task should be dequeued"); let attempt_rows = sqlx::query!( r#" @@ -2396,7 +2402,7 @@ mod tests { } assert!( - queue.dequeue(&mut conn).await?.is_none(), + queue.dequeue().await?.is_none(), "The task succeeded so nothing else should be queued" ); @@ -2415,10 +2421,7 @@ mod tests { // Dequeue the task to ensure the attempt row is created. let mut conn = pool.acquire().await?; - let in_progress_task = queue - .dequeue(&mut conn) - .await? - .expect("A task should be dequeued"); + let in_progress_task = queue.dequeue().await?.expect("A task should be dequeued"); let attempt_rows = sqlx::query!( r#" @@ -2469,7 +2472,7 @@ mod tests { } assert!( - queue.dequeue(&mut conn).await?.is_none(), + queue.dequeue().await?.is_none(), "The task failed so nothing else should be queued" ); @@ -2488,10 +2491,7 @@ mod tests { // Dequeue the task to ensure the attempt row is created. let mut conn = pool.acquire().await?; - let in_progress_task = queue - .dequeue(&mut conn) - .await? - .expect("A task should be dequeued"); + let in_progress_task = queue.dequeue().await?.expect("A task should be dequeued"); let attempt_rows = sqlx::query!( r#" @@ -2542,7 +2542,7 @@ mod tests { } assert!( - queue.dequeue(&mut conn).await?.is_none(), + queue.dequeue().await?.is_none(), "The task is cancelled so nothing else should be queued" ); @@ -2560,10 +2560,7 @@ mod tests { let task_id = queue.enqueue(&pool, &TestTask, &json!("{}")).await?; let mut conn = pool.acquire().await?; - let mut in_progress_task = queue - .dequeue(&mut conn) - .await? - .expect("A task should be dequeued"); + let mut in_progress_task = queue.dequeue().await?.expect("A task should be dequeued"); async fn fetch_and_verify_attempts( pool: &PgPool, @@ -2620,7 +2617,7 @@ mod tests { in_progress_task.retry_after(&mut conn, Span::new()).await?; in_progress_task = queue - .dequeue(&mut conn) + .dequeue() .await? .expect("Task should be dequeued again"); @@ -2635,7 +2632,7 @@ mod tests { // Simulate the task being rescheduled again. in_progress_task.retry_after(&mut conn, Span::new()).await?; - assert!(queue.dequeue(&mut conn).await?.is_some()); + assert!(queue.dequeue().await?.is_some()); // Third verification fetch_and_verify_attempts( diff --git a/src/task.rs b/src/task.rs index 394a8be..9e1e880 100644 --- a/src/task.rs +++ b/src/task.rs @@ -432,6 +432,47 @@ pub trait Task: Send + 'static { Span::new() } + /// Specifies interval on which the task's heartbeat timestamp should be + /// updated. + /// + /// This is used by workers to update the task row with the current + /// timestamp. Doing so makes it possible to detect tasks that may have + /// become stuck, for example because a worker crashed or otherwise + /// didn't exit cleanly. + /// + /// The default is 30 seconds. 
+ /// + /// # Example + /// + /// ```rust + /// use jiff::{Span, ToSpan}; + /// use sqlx::{Postgres, Transaction}; + /// use underway::{task::Result as TaskResult, Task}; + /// + /// struct LivelyTask; + /// + /// impl Task for LivelyTask { + /// type Input = (); + /// type Output = (); + /// + /// async fn execute( + /// &self, + /// _tx: Transaction<'_, Postgres>, + /// _input: Self::Input, + /// ) -> TaskResult { + /// Ok(()) + /// } + /// + /// // Set the heartbeat interval to 1 second. + /// fn heartbeat(&self) -> Span { + /// 1.second() + /// } + /// } + /// ``` + fn heartbeat(&self) -> Span { + 30.seconds() + } + /// Provides an optional concurrency key for the task. /// /// Concurrency keys are used to limit how many tasks of a specific type are diff --git a/src/worker.rs b/src/worker.rs index 4f6140f..437d23c 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -630,6 +630,7 @@ impl Worker { } Ok(None) => { // We tried to process a task but found none so we'll stop trying. + tracing::trace!("No task found"); break; } } @@ -709,28 +710,40 @@ impl Worker { err )] pub async fn process_next_task(&self) -> Result> { - let mut tx = self.queue.pool.begin().await?; - - let Some(in_progress_task) = self.queue.dequeue(&mut tx).await? else { + let Some(in_progress_task) = self.queue.dequeue().await? else { return Ok(None); }; let task_id = in_progress_task.id; tracing::Span::current().record("task.id", task_id.as_hyphenated().to_string()); - // Ensure that only one worker may process a task of a given concurrency key at - // a time. + // Transaction scoped to the task execution. + let mut tx = self.queue.pool.begin().await?; + if let Some(concurrency_key) = &in_progress_task.concurrency_key { + // Acquire an advisory lock on the concurrency key to ensure that only one + // worker can process tasks with the same concurrency key at a time. acquire_advisory_xact_lock(&mut *tx, concurrency_key).await?; + } else { + // Acquire an advisory lock on the task ID to prevent multiple workers from + // processing the same task simultaneously. This lock ensures that only one + // worker can process this specific task while the transaction is active. + acquire_advisory_xact_lock(&mut *tx, &task_id.to_string()).await?; } let input: T::Input = serde_json::from_value(in_progress_task.input.clone())?; + let heartbeat = pg_interval_to_span(&in_progress_task.heartbeat) + .try_into() + .expect("Task heartbeat should be compatible with std::time"); + let timeout = pg_interval_to_span(&in_progress_task.timeout) .try_into() .expect("Task timeout should be compatible with std::time"); + // Execute savepoint, available directly to the task execute method. let execute_tx = tx.begin().await?; + tokio::select! { result = self.task.execute(execute_tx, input) => { match result { @@ -746,6 +759,14 @@ impl Worker { } } + // Ensure that in-progress task rows report liveness at the provided interval. + _ = tokio::time::sleep(heartbeat) => { + tracing::trace!("Sending task heartbeat"); + // N.B.: Heartbeats are recorded outside of the transaction to ensure visibility. + in_progress_task.record_heartbeat(&self.queue.pool).await? + }, + + // Handle timed-out task execution. _ = tokio::time::sleep(timeout) => { tracing::error!("Task execution timed out"); let retry_policy = &in_progress_task.retry_policy; @@ -925,15 +946,14 @@ mod tests { // Enqueue a task. 
let task = TestTask; queue.enqueue(&pool, &task, &()).await?; - let mut conn = pool.acquire().await?; - assert!(queue.dequeue(&mut conn).await?.is_some()); + assert!(queue.dequeue().await?.is_some()); // Process the task. let worker = Worker::new(queue.clone(), task); worker.process_next_task().await?; // Ensure the task is no longer available on the queue. - assert!(queue.dequeue(&mut conn).await?.is_none()); + assert!(queue.dequeue().await?.is_none()); Ok(()) }
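
---

Taken together, the new `Task::heartbeat` knob and the widened dequeue predicate work like this: while a worker executes a task it refreshes `last_heartbeat_at` via `record_heartbeat`, and `Queue::dequeue` will now also claim an `in_progress` row whose `last_heartbeat_at` has fallen more than one heartbeat interval behind `now()`, so tasks orphaned by a crashed worker become claimable again. Below is a minimal sketch of tuning the interval from a downstream crate; `SlowExport` is hypothetical, and the imports and trait shape are assumed to match the `LivelyTask` doc example added in `src/task.rs`.

```rust
use jiff::{Span, ToSpan};
use sqlx::{Postgres, Transaction};
use underway::{task::Result as TaskResult, Task};

// Hypothetical long-running task, used only for illustration.
struct SlowExport;

impl Task for SlowExport {
    type Input = ();
    type Output = ();

    async fn execute(
        &self,
        _tx: Transaction<'_, Postgres>,
        _input: Self::Input,
    ) -> TaskResult<Self::Output> {
        // Imagine a multi-minute export running here.
        Ok(())
    }

    // Report liveness every five seconds; if the worker processing this task
    // dies, the row becomes eligible for re-dequeue once `last_heartbeat_at`
    // is more than five seconds old.
    fn heartbeat(&self) -> Span {
        5.seconds()
    }
}
```

Note that `last_heartbeat_at` is added as a nullable column and is not set by `dequeue` itself, so an in-progress row only matches the reclamation branch of the query after at least one heartbeat has been recorded for it.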