Skip to content

Commit

Permalink
stale dequeues consider retries (#60)
Browse files Browse the repository at this point in the history
This fixes an issue where dequeuing a stall retry would not consider if
the related task had available retries or not.

To address this we rework the dequeue query to check the retry record
directly as part of the condition for selecting a stale task.

We also provide an update over attempt rows when a task row is selected
to ensure that prior in-progress attempts are marked as failed.
  • Loading branch information
maxcountryman authored Nov 12, 2024
1 parent 9ed6ca0 commit eff6ea5
Show file tree
Hide file tree
Showing 4 changed files with 167 additions and 11 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

82 changes: 73 additions & 9 deletions src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -644,18 +644,26 @@ impl<T: Task> Queue<T> {
let in_progress_task = sqlx::query_as!(
InProgressTask,
r#"
update underway.task
set state = $3,
last_attempt_at = now(),
last_heartbeat_at = now()
where id = (
with selected_task as (
select id
from underway.task
where task_queue_name = $1
and (
state = $2
or (state = $3 and last_heartbeat_at < now() - heartbeat)
)
-- See if there are any available stalled tasks.
or (
state = $3
-- Has heartbeat stalled?
and last_heartbeat_at < now() - heartbeat
-- Are there remaining retries?
and (retry_policy).max_attempts > (
select count(*)
from underway.task_attempt
where task_id = id
and task_queue_name = $1
)
)
)
and created_at + delay <= now()
order by
priority desc,
Expand All @@ -664,8 +672,14 @@ impl<T: Task> Queue<T> {
limit 1
for update skip locked
)
update underway.task
set state = $3,
last_attempt_at = now(),
last_heartbeat_at = now()
from selected_task
where underway.task.id = selected_task.id
returning
id as "id: TaskId",
underway.task.id as "id: TaskId",
task_queue_name as "queue_name",
input,
timeout,
Expand All @@ -675,7 +689,7 @@ impl<T: Task> Queue<T> {
"#,
self.name,
TaskState::Pending as TaskState,
TaskState::InProgress as TaskState
TaskState::InProgress as TaskState,
)
.fetch_optional(&mut *tx)
.await?;
Expand All @@ -684,6 +698,26 @@ impl<T: Task> Queue<T> {
let task_id = in_progress_task.id;
tracing::Span::current().record("task.id", task_id.as_hyphenated().to_string());

// Update previous in-progress task attempts to mark them as failed.
//
// This ensures that if a stuck task was selected, we also update its attempt
// row.
sqlx::query!(
r#"
update underway.task_attempt
set state = $3
where task_id = $1
and task_queue_name = $2
and state = $4
"#,
task_id as TaskId,
self.name,
TaskState::Failed as TaskState,
TaskState::InProgress as TaskState
)
.execute(&mut *tx)
.await?;

// Insert a new task attempt row
sqlx::query!(
r#"
Expand Down Expand Up @@ -2688,6 +2722,21 @@ mod tests {
"No tasks should be dequeued since task is in-progress and not stale"
);

// Check attempt row is in-progress.
let attempt_rows = sqlx::query!(
r#"
select state as "state: TaskState"
from underway.task_attempt
where task_id = $1 and state = $2
"#,
task_id as TaskId,
TaskState::InProgress as TaskState
)
.fetch_all(&pool)
.await?;

assert_eq!(attempt_rows.len(), 1);

// Set a stale heartbeat.
sqlx::query!(
r#"
Expand All @@ -2705,6 +2754,21 @@ mod tests {
"A stale task should be dequeued"
);

// Check attempt row is failed.
let attempt_rows = sqlx::query!(
r#"
select state as "state: TaskState"
from underway.task_attempt
where task_id = $1 and state = $2
"#,
task_id as TaskId,
TaskState::Failed as TaskState
)
.fetch_all(&pool)
.await?;

assert_eq!(attempt_rows.len(), 1);

Ok(())
}
}

0 comments on commit eff6ea5

Please sign in to comment.