From 9ed6ca0ccbd582d615cd64a30dab3c2e571ce204 Mon Sep 17 00:00:00 2001 From: Max Countryman Date: Mon, 11 Nov 2024 14:04:11 -0800 Subject: [PATCH] record heartbeat in a separate task (#59) This fixes an issue where we were: 1. Not using an interval and therefore only waiting one time to send a heartbeat, 2. Cancelling the other select futures as a side effect of incorporating the heartbeat. So to address these issues, we use a separate task entirely to record heartbeats. --- src/queue.rs | 2 +- src/worker.rs | 28 +++++++++++++++++++++------- 2 files changed, 22 insertions(+), 8 deletions(-) diff --git a/src/queue.rs b/src/queue.rs index 3e7523c..3f63a22 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -978,7 +978,7 @@ impl Queue { /// manage state transitions as they process the task. In fact, all valid state /// transitions are encapsulated by it and this is the only interface through /// which task state should be altered. -#[derive(Debug, sqlx::FromRow)] +#[derive(Debug, Clone, sqlx::FromRow)] pub struct InProgressTask { pub(crate) id: TaskId, pub(crate) queue_name: String, diff --git a/src/worker.rs b/src/worker.rs index 14bd1d4..894757f 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -728,6 +728,7 @@ impl Worker { let heartbeat = pg_interval_to_span(&in_progress_task.heartbeat) .try_into() .expect("Task heartbeat should be compatible with std::time"); + let mut heartbeat_interval = tokio::time::interval(heartbeat); let timeout = pg_interval_to_span(&in_progress_task.timeout) .try_into() @@ -736,6 +737,26 @@ impl Worker { // Execute savepoint, available directly to the task execute method. let execute_tx = tx.begin().await?; + // Spawn a task to send heartbeats alongside task processing. + tokio::spawn({ + let pool = self.queue.pool.clone(); + let in_progress_task = in_progress_task.clone(); + async move { + heartbeat_interval.tick().await; + loop { + tracing::trace!("Recording task heartbeat"); + + // N.B.: Heartbeats are recorded outside of the transaction to ensure + // visibility. + if let Err(err) = in_progress_task.record_heartbeat(&pool).await { + tracing::error!(err = %err, "Failed to record task heartbeat"); + }; + + heartbeat_interval.tick().await; + } + } + }); + tokio::select! { result = self.task.execute(execute_tx, input) => { match result { @@ -751,13 +772,6 @@ impl Worker { } } - // Ensure that in-progress task rows report liveness at the provided interval. - _ = tokio::time::sleep(heartbeat) => { - tracing::trace!("Sending task heartbeat"); - // N.B.: Heartbeats are recorded outside of the transaction to ensure visibility. - in_progress_task.record_heartbeat(&self.queue.pool).await? - }, - // Handle timed-out task execution. _ = tokio::time::sleep(timeout) => { tracing::error!("Task execution timed out");