diff --git a/src/queue.rs b/src/queue.rs index 3e7523c..3f63a22 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -978,7 +978,7 @@ impl Queue { /// manage state transitions as they process the task. In fact, all valid state /// transitions are encapsulated by it and this is the only interface through /// which task state should be altered. -#[derive(Debug, sqlx::FromRow)] +#[derive(Debug, Clone, sqlx::FromRow)] pub struct InProgressTask { pub(crate) id: TaskId, pub(crate) queue_name: String, diff --git a/src/worker.rs b/src/worker.rs index 14bd1d4..894757f 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -728,6 +728,7 @@ impl Worker { let heartbeat = pg_interval_to_span(&in_progress_task.heartbeat) .try_into() .expect("Task heartbeat should be compatible with std::time"); + let mut heartbeat_interval = tokio::time::interval(heartbeat); let timeout = pg_interval_to_span(&in_progress_task.timeout) .try_into() @@ -736,6 +737,26 @@ impl Worker { // Execute savepoint, available directly to the task execute method. let execute_tx = tx.begin().await?; + // Spawn a task to send heartbeats alongside task processing. + tokio::spawn({ + let pool = self.queue.pool.clone(); + let in_progress_task = in_progress_task.clone(); + async move { + heartbeat_interval.tick().await; + loop { + tracing::trace!("Recording task heartbeat"); + + // N.B.: Heartbeats are recorded outside of the transaction to ensure + // visibility. + if let Err(err) = in_progress_task.record_heartbeat(&pool).await { + tracing::error!(err = %err, "Failed to record task heartbeat"); + }; + + heartbeat_interval.tick().await; + } + } + }); + tokio::select! { result = self.task.execute(execute_tx, input) => { match result { @@ -751,13 +772,6 @@ impl Worker { } } - // Ensure that in-progress task rows report liveness at the provided interval. - _ = tokio::time::sleep(heartbeat) => { - tracing::trace!("Sending task heartbeat"); - // N.B.: Heartbeats are recorded outside of the transaction to ensure visibility. - in_progress_task.record_heartbeat(&self.queue.pool).await? - }, - // Handle timed-out task execution. _ = tokio::time::sleep(timeout) => { tracing::error!("Task execution timed out");