Skip to content

Commit

Permalink
record heartbeat in a separate task (#59)
Browse files Browse the repository at this point in the history
This fixes an issue where we were:

1. Not using an interval and therefore only waiting one time to send a
   heartbeat,
2. Cancelling the other select futures as a side effect of incorporating
   the heartbeat.

So to address these issues, we use a separate task entirely to record
heartbeats.
  • Loading branch information
maxcountryman authored Nov 11, 2024
1 parent 18b65cc commit 9ed6ca0
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 8 deletions.
2 changes: 1 addition & 1 deletion src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -978,7 +978,7 @@ impl<T: Task> Queue<T> {
/// manage state transitions as they process the task. In fact, all valid state
/// transitions are encapsulated by it and this is the only interface through
/// which task state should be altered.
#[derive(Debug, sqlx::FromRow)]
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct InProgressTask {
pub(crate) id: TaskId,
pub(crate) queue_name: String,
Expand Down
28 changes: 21 additions & 7 deletions src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -728,6 +728,7 @@ impl<T: Task + Sync> Worker<T> {
let heartbeat = pg_interval_to_span(&in_progress_task.heartbeat)
.try_into()
.expect("Task heartbeat should be compatible with std::time");
let mut heartbeat_interval = tokio::time::interval(heartbeat);

let timeout = pg_interval_to_span(&in_progress_task.timeout)
.try_into()
Expand All @@ -736,6 +737,26 @@ impl<T: Task + Sync> Worker<T> {
// Execute savepoint, available directly to the task execute method.
let execute_tx = tx.begin().await?;

// Spawn a task to send heartbeats alongside task processing.
tokio::spawn({
let pool = self.queue.pool.clone();
let in_progress_task = in_progress_task.clone();
async move {
heartbeat_interval.tick().await;
loop {
tracing::trace!("Recording task heartbeat");

// N.B.: Heartbeats are recorded outside of the transaction to ensure
// visibility.
if let Err(err) = in_progress_task.record_heartbeat(&pool).await {
tracing::error!(err = %err, "Failed to record task heartbeat");
};

heartbeat_interval.tick().await;
}
}
});

tokio::select! {
result = self.task.execute(execute_tx, input) => {
match result {
Expand All @@ -751,13 +772,6 @@ impl<T: Task + Sync> Worker<T> {
}
}

// Ensure that in-progress task rows report liveness at the provided interval.
_ = tokio::time::sleep(heartbeat) => {
tracing::trace!("Sending task heartbeat");
// N.B.: Heartbeats are recorded outside of the transaction to ensure visibility.
in_progress_task.record_heartbeat(&self.queue.pool).await?
},

// Handle timed-out task execution.
_ = tokio::time::sleep(timeout) => {
tracing::error!("Task execution timed out");
Expand Down

0 comments on commit 9ed6ca0

Please sign in to comment.