Skip to content

Commit

Permalink
ensure heartbeat task is cancelled after execution
Browse files Browse the repository at this point in the history
This fixes an issue where the spawned heartbeat task would continue to
run indefinitely, even after execution had completed. We address
this by adding the task's join handle as a select branch, ensuring that
the future is cancelled when either the execution finishes or it times
out.
  • Loading branch information
maxcountryman committed Nov 13, 2024
1 parent 016eae8 commit f5f60d7
Showing 1 changed file with 14 additions and 8 deletions.
22 changes: 14 additions & 8 deletions src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -725,23 +725,20 @@ impl<T: Task + Sync> Worker<T> {

let input: T::Input = serde_json::from_value(in_progress_task.input.clone())?;

let heartbeat = pg_interval_to_span(&in_progress_task.heartbeat)
.try_into()
.expect("Task heartbeat should be compatible with std::time");
let mut heartbeat_interval = tokio::time::interval(heartbeat);

let timeout = pg_interval_to_span(&in_progress_task.timeout)
.try_into()
.expect("Task timeout should be compatible with std::time");

// Execute savepoint, available directly to the task execute method.
let execute_tx = tx.begin().await?;
let heartbeat = pg_interval_to_span(&in_progress_task.heartbeat)
.try_into()
.expect("Task heartbeat should be compatible with std::time");

// Spawn a task to send heartbeats alongside task processing.
tokio::spawn({
let heartbeat_task = tokio::spawn({
let pool = self.queue.pool.clone();
let in_progress_task = in_progress_task.clone();
async move {
let mut heartbeat_interval = tokio::time::interval(heartbeat);
heartbeat_interval.tick().await;
loop {
tracing::trace!("Recording task heartbeat");
Expand All @@ -757,6 +754,9 @@ impl<T: Task + Sync> Worker<T> {
}
});

// Execute savepoint, available directly to the task execute method.
let execute_tx = tx.begin().await?;

tokio::select! {
result = self.task.execute(execute_tx, input) => {
match result {
Expand All @@ -778,6 +778,12 @@ impl<T: Task + Sync> Worker<T> {
let retry_policy = &in_progress_task.retry_policy;
self.handle_task_timeout(&mut tx, &in_progress_task, retry_policy, timeout).await?;
}

// Select the heartbeat task so that it'll be cancelled after execution or timeout.
_ = heartbeat_task => {
tracing::error!("Heartbeat task failed unexpectedly (this is a critical error!)");
}

}

tx.commit().await?;
Expand Down

0 comments on commit f5f60d7

Please sign in to comment.