From a6e43dee9b4b95c7a488473ab411b146bd09e2ed Mon Sep 17 00:00:00 2001 From: Max Countryman Date: Tue, 12 Nov 2024 09:20:40 -0800 Subject: [PATCH] refactor scheduler iterator (#62) This refactors the scheduler iterator into its own type which manages the upcoming schedule directly. At the same time, this iterator now produces `Zoned` values instead of calculating relative durations. It also provides a function, `wait_until`, which consumes `Zoned` and checks if the current time indicates we're done waiting relative to the provided time. When the provided time is in the future, we wait until that instant and then make the same check again. --- Cargo.toml | 1 + src/scheduler.rs | 156 ++++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 141 insertions(+), 16 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 88c056f..8327f30 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,3 +32,4 @@ tokio-util = "0.7.12" [dev-dependencies] futures = "0.3.30" +tokio = { version = "1.40.0", features = ["test-util"] } diff --git a/src/scheduler.rs b/src/scheduler.rs index 39feb40..b53df93 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -1,8 +1,9 @@ use std::{result::Result as StdResult, str::FromStr, sync::Arc, time::Duration as StdDuration}; use jiff::{tz::TimeZone, Zoned}; -use jiff_cron::Schedule; +use jiff_cron::{Schedule, ScheduleIterator}; use sqlx::postgres::{PgAdvisoryLock, PgListener}; +use tokio::time::Instant; use tokio_util::sync::CancellationToken; use tracing::instrument; @@ -272,7 +273,9 @@ impl Scheduler { // TODO: Handle updates to schedules? - for until_next in zoned_schedule.into_iter() { + for next in zoned_schedule.iter() { + tracing::debug!(?next, "Waiting until next scheduled task enqueue"); + tokio::select! { notify_shutdown = shutdown_listener.recv() => { match notify_shutdown { @@ -289,8 +292,7 @@ impl Scheduler { break } - _ = tokio::time::sleep(until_next) => { - tracing::debug!(?until_next, "Sleeping until next scheduled task enqueue"); + _ = wait_until(&next) => { self.process_next_schedule(&input).await? } } @@ -324,6 +326,23 @@ fn queue_scheduler_lock(queue_name: &str) -> PgAdvisoryLock { PgAdvisoryLock::new(format!("{queue_name}-scheduler")) } +async fn wait_until(next: &Zoned) { + let tz = next.time_zone(); + loop { + let now = Zoned::now().with_time_zone(tz.to_owned()); + if now >= *next { + break; + } + + let until_next = next.duration_until(&now).unsigned_abs(); + if until_next == StdDuration::ZERO { + break; + } + + tokio::time::sleep_until(Instant::now() + until_next).await; + } +} + /// Schedule paired with its time zone. #[derive(Debug, PartialEq)] pub struct ZonedSchedule { @@ -355,24 +374,24 @@ impl ZonedSchedule { .expect("iana_name should always be Some because new ensures valid time zone") } - fn tz(&self) -> TimeZone { - self.timezone.to_owned() + /// Returns an iterator of `Zoned` where each is a time at which the + /// schedule should fire. + pub fn iter(&self) -> ZonedScheduleIterator { + ZonedScheduleIterator { + upcoming: self.schedule.upcoming(self.timezone.clone()), + } } +} - fn now_with_tz(&self) -> Zoned { - Zoned::now().with_time_zone(self.tz()) - } +pub struct ZonedScheduleIterator<'a> { + upcoming: ScheduleIterator<'a>, } -impl Iterator for ZonedSchedule { - type Item = StdDuration; +impl Iterator for ZonedScheduleIterator<'_> { + type Item = Zoned; fn next(&mut self) -> Option { - self.schedule.upcoming(self.tz()).next().map(|next_zoned| { - self.now_with_tz() - .duration_until(&next_zoned) - .unsigned_abs() - }) + self.upcoming.next() } } @@ -409,3 +428,108 @@ impl FromStr for ZonedSchedule { ZonedSchedule::new(cron_expr, time_zone_name) } } + +#[cfg(test)] +mod tests { + use std::time::SystemTime; + + use jiff::ToSpan; + + use super::*; + + #[test] + fn zoned_schedule_creation_valid() { + let cron_expr = "0 0 * * * * *"; // Every hour at minute 0 + let time_zone_name = "UTC"; + let schedule = ZonedSchedule::new(cron_expr, time_zone_name); + + assert!( + schedule.is_ok(), + "Expected ZonedSchedule to be created successfully" + ); + } + + #[test] + fn zoned_schedule_creation_invalid_cron() { + let cron_expr = "invalid cron"; + let time_zone_name = "UTC"; + let schedule = ZonedSchedule::new(cron_expr, time_zone_name); + + assert!( + schedule.is_err(), + "Expected error due to invalid cron expression" + ); + } + + #[test] + fn zoned_schedule_creation_invalid_time_zone() { + let cron_expr = "0 0 * * * * *"; + let time_zone_name = "Invalid/TimeZone"; + let schedule = ZonedSchedule::new(cron_expr, time_zone_name); + + assert!(schedule.is_err(), "Expected error due to invalid time zone"); + } + + #[test] + fn zoned_schedule_parses() { + "0 0 * * * *[America/Los_Angeles]" + .parse::() + .expect("A schedule should be parsed"); + } + + #[tokio::test] + async fn wait_until_past_time() { + let tz = TimeZone::UTC; + let next = Zoned::now() + .with_time_zone(tz.to_owned()) + .saturating_sub(10.seconds()); + + let start = SystemTime::now(); + wait_until(&next).await; + let elapsed = start.elapsed().unwrap(); + assert!( + elapsed < StdDuration::from_millis(10), + "Expected immediate return" + ); + } + + #[tokio::test] + async fn wait_until_future_time() { + let tz = TimeZone::UTC; + let next = Zoned::now() + .with_time_zone(tz.to_owned()) + .saturating_add(5.seconds()); + + // Pause and control tokio's time + tokio::time::pause(); + + let handle = tokio::spawn({ + let next = next.clone(); + async move { wait_until(&next).await } + }); + tokio::time::advance(StdDuration::from_secs(5)).await; + + handle.await.expect("Failed to run wait_until"); + let elapsed: StdDuration = (&Zoned::now().with_time_zone(tz.to_owned()) - &next) + .try_into() + .unwrap(); + assert!( + elapsed < StdDuration::from_millis(10), + "Expected precise completion" + ); + } + + #[tokio::test] + async fn wait_until_exact_time() { + let tz = TimeZone::UTC; + let next = Zoned::now().with_time_zone(tz.to_owned()); + + let start = SystemTime::now(); + wait_until(&next).await; + let elapsed = start.elapsed().unwrap(); + assert!( + elapsed < StdDuration::from_millis(10), + "Expected immediate return" + ); + } +}