refactor dequeue transaction handling
This refactor encapsulates pending task dequeue operations within their
own transactions, updating the task row state to prevent duplicate
processing by other callers. By doing so, we ensure that state changes
are immediately visible, accurately reflecting task ownership throughout
its processing lifecycle.
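
As a rough illustration of why flipping the row state during dequeue prevents duplicate processing, here is a minimal in-memory sketch. The `State`, `TaskRow`, and `claim_next` names are hypothetical stand-ins, not the crate's types; the state change models an update committed inside the dequeue transaction.

```rust
/// Hypothetical task states for illustration; not the crate's actual types.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
    Pending,
    InProgress,
}

/// Hypothetical stand-in for a row in the task table.
#[derive(Debug)]
struct TaskRow {
    id: u64,
    state: State,
}

/// Claims the first pending row by flipping its state before returning its ID.
/// This models an update committed within the dequeue transaction, so the
/// change is visible to the next caller.
fn claim_next(rows: &mut [TaskRow]) -> Option<u64> {
    let row = rows.iter_mut().find(|r| r.state == State::Pending)?;
    row.state = State::InProgress;
    Some(row.id)
}

fn main() {
    let mut rows = vec![
        TaskRow { id: 1, state: State::Pending },
        TaskRow { id: 2, state: State::Pending },
    ];
    let first = claim_next(&mut rows);
    let second = claim_next(&mut rows);
    let third = claim_next(&mut rows);
    // Each task is handed out at most once; the third call finds nothing.
    println!("{first:?} {second:?} {third:?}");
}
```

Because the state change is applied before the ID is returned, a later caller never sees the same row as pending.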

Additionally, tasks now include a configurable heartbeat interval and a
record of the last heartbeat. Workers periodically update the task row
to indicate ongoing liveness. Should a task’s heartbeat become stale,
the dequeue method can select it for reassignment.
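
The staleness check can be sketched as follows. The exact rule is an assumption: a heartbeat is treated as stale once more than the configured interval has elapsed since the last recorded heartbeat, and a task with no recorded heartbeat is treated as not stale. `is_stale` is a hypothetical helper, not part of the crate.

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Assumption: stale means more than `heartbeat` has elapsed since
/// `last_heartbeat_at`. A missing heartbeat is treated as not stale here.
fn is_stale(last_heartbeat_at: Option<SystemTime>, heartbeat: Duration, now: SystemTime) -> bool {
    match last_heartbeat_at {
        // `duration_since` returns an error if `last` is later than `now`
        // (for example under clock skew); that case is treated as fresh.
        Some(last) => now
            .duration_since(last)
            .map_or(false, |elapsed| elapsed > heartbeat),
        None => false,
    }
}

fn main() {
    // 30 seconds matches the column default added by the migration.
    let heartbeat = Duration::from_secs(30);
    let last = UNIX_EPOCH + Duration::from_secs(1_000);

    let after_10s = is_stale(Some(last), heartbeat, last + Duration::from_secs(10));
    let after_31s = is_stale(Some(last), heartbeat, last + Duration::from_secs(31));
    println!("stale after 10s: {after_10s}, stale after 31s: {after_31s}");
}
```

In the database this comparison would presumably be expressed in the dequeue query itself; the function above only shows the arithmetic.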

A missed heartbeat alone does not prove that a task was abandoned,
because a worker might resume processing after a temporary delay. To
guard against this type of
partial failure, workers also acquire a transaction-level advisory
lock on the task ID. As long as a worker's transaction remains active,
this lock prevents other workers from processing the same task,
ensuring exclusive ownership and consistent processing even across
intermittent failures.
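
The ownership guarantee can be modeled in memory as a try-lock keyed by task ID that is released when its guard is dropped, much as a transaction-level advisory lock is released when the transaction ends. This is only an analogy: `LockTable` and `TxLock` are hypothetical names, and non-blocking acquisition is an assumption (Postgres provides `pg_try_advisory_xact_lock` for that behavior, but the call used by this commit is not shown here).

```rust
use std::collections::HashSet;
use std::sync::{Arc, Mutex};

/// In-memory stand-in for a lock table keyed by task ID (hypothetical).
#[derive(Clone, Default)]
struct LockTable(Arc<Mutex<HashSet<u64>>>);

/// Held for as long as the owning "transaction" is alive. Dropping it
/// releases the lock, mirroring release at transaction end.
struct TxLock {
    table: LockTable,
    task_id: u64,
}

impl LockTable {
    /// Non-blocking acquire: returns `None` if the lock is already held.
    fn try_lock(&self, task_id: u64) -> Option<TxLock> {
        let mut held = self.0.lock().unwrap();
        // `insert` returns false when the ID is already present.
        held.insert(task_id).then(|| TxLock {
            table: self.clone(),
            task_id,
        })
    }
}

impl Drop for TxLock {
    fn drop(&mut self) {
        self.table.0.lock().unwrap().remove(&self.task_id);
    }
}

fn main() {
    let locks = LockTable::default();
    let worker_a = locks.try_lock(42);
    let worker_b = locks.try_lock(42);
    println!("a acquired: {}, b acquired: {}", worker_a.is_some(), worker_b.is_some());

    // Ending worker A's "transaction" frees the task for another worker.
    drop(worker_a);
    println!("b retry acquired: {}", locks.try_lock(42).is_some());
}
```

The key property is that a slow worker with a late heartbeat still holds its lock while its transaction is open, so a second worker cannot take over the task in the meantime.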

A notable benefit of these changes is that task progress states are fully
utilized and in-progress tasks are visible globally. Furthermore,
transaction overhead is reduced as a dequeue's transaction is only held
for the duration of obtaining an available task. That said, a second
transaction is still held for the duration of execution, so
long-running tasks still benefit from decomposition into, e.g., multiple
job steps.
maxcountryman committed Nov 10, 2024
1 parent eff837e commit 45b571f
Showing 8 changed files with 182 additions and 99 deletions.

8 changes: 8 additions & 0 deletions migrations/20241110164319_3.sql
@@ -0,0 +1,8 @@
+-- Force anything running this migration to use the right search path.
+set local search_path to underway;
+
+alter table underway.task
+  add column if not exists heartbeat interval not null default interval '30 seconds';
+
+alter table underway.task
+  add column if not exists last_heartbeat_at timestamp with time zone;
34 changes: 12 additions & 22 deletions src/job.rs
@@ -901,6 +901,7 @@ impl<T: Task> EnqueuedJob<T> {
       input,
       retry_policy as "retry_policy: RetryPolicy",
       timeout,
+      heartbeat,
       concurrency_key
   from underway.task
   where input->>'job_id' = $1
@@ -2375,9 +2376,8 @@ mod tests {
     };
     job.enqueue(&input).await?;

-    let mut conn = pool.acquire().await?;
     let pending_task = queue
-        .dequeue(&mut conn)
+        .dequeue()
         .await?
         .expect("There should be an enqueued task");

@@ -2415,9 +2415,8 @@ mod tests {
     };
     job.enqueue(&input).await?;

-    let mut conn = pool.acquire().await?;
     let pending_task = queue
-        .dequeue(&mut conn)
+        .dequeue()
         .await?
         .expect("There should be an enqueued task");

@@ -2467,9 +2466,8 @@ mod tests {
     };
     job.enqueue(&input).await?;

-    let mut conn = pool.acquire().await?;
     let pending_task = queue
-        .dequeue(&mut conn)
+        .dequeue()
         .await?
         .expect("There should be an enqueued task");

@@ -2565,9 +2563,8 @@ mod tests {
     };
     job.enqueue(&input).await?;

-    let mut conn = pool.acquire().await?;
     let pending_task = queue
-        .dequeue(&mut conn)
+        .dequeue()
         .await?
         .expect("There should be an enqueued task");

@@ -2604,9 +2601,8 @@ mod tests {
     };
     job.enqueue(&input).await?;

-    let mut conn = pool.acquire().await?;
     let pending_task = queue
-        .dequeue(&mut conn)
+        .dequeue()
         .await?
         .expect("There should be an enqueued task");

@@ -2694,9 +2690,8 @@ mod tests {
     worker.process_next_task().await?;

     // Inspect the second task.
-    let mut conn = pool.acquire().await?;
     let pending_task = queue
-        .dequeue(&mut conn)
+        .dequeue()
         .await?
         .expect("There should be an enqueued task");

@@ -2758,8 +2753,7 @@ mod tests {
     let enqueued_job = job.enqueue(&input).await?;

     // Dequeue the first task.
-    let mut conn = pool.acquire().await?;
-    let Some(dequeued_task) = queue.dequeue(&mut conn).await? else {
+    let Some(dequeued_task) = queue.dequeue().await? else {
         panic!("Task should exist");
     };

@@ -2793,8 +2787,7 @@ mod tests {
     worker.process_next_task().await?;

     // Dequeue the second task.
-    let mut conn = pool.acquire().await?;
-    let Some(dequeued_task) = queue.dequeue(&mut conn).await? else {
+    let Some(dequeued_task) = queue.dequeue().await? else {
         panic!("Next task should exist");
     };

@@ -2861,9 +2854,8 @@ mod tests {
     worker.process_next_task().await?;

     // Inspect the second task.
-    let mut conn = pool.acquire().await?;
     let pending_task = queue
-        .dequeue(&mut conn)
+        .dequeue()
         .await?
         .expect("There should be an enqueued task");

@@ -2917,8 +2909,7 @@ mod tests {
     let enqueued_job = job.enqueue(&input).await?;

     // Dequeue the first task.
-    let mut conn = pool.acquire().await?;
-    let Some(dequeued_task) = queue.dequeue(&mut conn).await? else {
+    let Some(dequeued_task) = queue.dequeue().await? else {
         panic!("Task should exist");
     };

@@ -2961,8 +2952,7 @@ mod tests {
     worker.process_next_task().await?;

     // Dequeue the second task.
-    let mut conn = pool.acquire().await?;
-    let Some(dequeued_task) = queue.dequeue(&mut conn).await? else {
+    let Some(dequeued_task) = queue.dequeue().await? else {
         panic!("Next task should exist");
     };
