Skip to content

Commit

Permalink
refactor scheduler iterator (#62)
Browse files Browse the repository at this point in the history
This refactors the scheduler iterator into its own type which manages
the upcoming schedule directly. At the same time, this iterator now
produces `Zoned` values instead of calculating relative durations.

It also provides a function, `wait_until`, which consumes `Zoned` and
checks if the current time indicates we're done waiting relative to the
provided time.

When the provided time is in the future, we wait until that instant and
then make the same check again.
  • Loading branch information
maxcountryman authored Nov 12, 2024
1 parent eff6ea5 commit a6e43de
Show file tree
Hide file tree
Showing 2 changed files with 141 additions and 16 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,4 @@ tokio-util = "0.7.12"

[dev-dependencies]
futures = "0.3.30"
tokio = { version = "1.40.0", features = ["test-util"] }
156 changes: 140 additions & 16 deletions src/scheduler.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::{result::Result as StdResult, str::FromStr, sync::Arc, time::Duration as StdDuration};

use jiff::{tz::TimeZone, Zoned};
use jiff_cron::Schedule;
use jiff_cron::{Schedule, ScheduleIterator};
use sqlx::postgres::{PgAdvisoryLock, PgListener};
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use tracing::instrument;

Expand Down Expand Up @@ -272,7 +273,9 @@ impl<T: Task> Scheduler<T> {

// TODO: Handle updates to schedules?

for until_next in zoned_schedule.into_iter() {
for next in zoned_schedule.iter() {
tracing::debug!(?next, "Waiting until next scheduled task enqueue");

tokio::select! {
notify_shutdown = shutdown_listener.recv() => {
match notify_shutdown {
Expand All @@ -289,8 +292,7 @@ impl<T: Task> Scheduler<T> {
break
}

_ = tokio::time::sleep(until_next) => {
tracing::debug!(?until_next, "Sleeping until next scheduled task enqueue");
_ = wait_until(&next) => {
self.process_next_schedule(&input).await?
}
}
Expand Down Expand Up @@ -324,6 +326,23 @@ fn queue_scheduler_lock(queue_name: &str) -> PgAdvisoryLock {
PgAdvisoryLock::new(format!("{queue_name}-scheduler"))
}

async fn wait_until(next: &Zoned) {
let tz = next.time_zone();
loop {
let now = Zoned::now().with_time_zone(tz.to_owned());
if now >= *next {
break;
}

let until_next = next.duration_until(&now).unsigned_abs();
if until_next == StdDuration::ZERO {
break;
}

tokio::time::sleep_until(Instant::now() + until_next).await;
}
}

/// Schedule paired with its time zone.
#[derive(Debug, PartialEq)]
pub struct ZonedSchedule {
Expand Down Expand Up @@ -355,24 +374,24 @@ impl ZonedSchedule {
.expect("iana_name should always be Some because new ensures valid time zone")
}

fn tz(&self) -> TimeZone {
self.timezone.to_owned()
/// Returns an iterator of `Zoned` where each is a time at which the
/// schedule should fire.
pub fn iter(&self) -> ZonedScheduleIterator {
ZonedScheduleIterator {
upcoming: self.schedule.upcoming(self.timezone.clone()),
}
}
}

fn now_with_tz(&self) -> Zoned {
Zoned::now().with_time_zone(self.tz())
}
pub struct ZonedScheduleIterator<'a> {
upcoming: ScheduleIterator<'a>,
}

impl Iterator for ZonedSchedule {
type Item = StdDuration;
impl Iterator for ZonedScheduleIterator<'_> {
type Item = Zoned;

fn next(&mut self) -> Option<Self::Item> {
self.schedule.upcoming(self.tz()).next().map(|next_zoned| {
self.now_with_tz()
.duration_until(&next_zoned)
.unsigned_abs()
})
self.upcoming.next()
}
}

Expand Down Expand Up @@ -409,3 +428,108 @@ impl FromStr for ZonedSchedule {
ZonedSchedule::new(cron_expr, time_zone_name)
}
}

#[cfg(test)]
mod tests {
use std::time::SystemTime;

use jiff::ToSpan;

use super::*;

#[test]
fn zoned_schedule_creation_valid() {
let cron_expr = "0 0 * * * * *"; // Every hour at minute 0
let time_zone_name = "UTC";
let schedule = ZonedSchedule::new(cron_expr, time_zone_name);

assert!(
schedule.is_ok(),
"Expected ZonedSchedule to be created successfully"
);
}

#[test]
fn zoned_schedule_creation_invalid_cron() {
let cron_expr = "invalid cron";
let time_zone_name = "UTC";
let schedule = ZonedSchedule::new(cron_expr, time_zone_name);

assert!(
schedule.is_err(),
"Expected error due to invalid cron expression"
);
}

#[test]
fn zoned_schedule_creation_invalid_time_zone() {
let cron_expr = "0 0 * * * * *";
let time_zone_name = "Invalid/TimeZone";
let schedule = ZonedSchedule::new(cron_expr, time_zone_name);

assert!(schedule.is_err(), "Expected error due to invalid time zone");
}

#[test]
fn zoned_schedule_parses() {
"0 0 * * * *[America/Los_Angeles]"
.parse::<ZonedSchedule>()
.expect("A schedule should be parsed");
}

#[tokio::test]
async fn wait_until_past_time() {
let tz = TimeZone::UTC;
let next = Zoned::now()
.with_time_zone(tz.to_owned())
.saturating_sub(10.seconds());

let start = SystemTime::now();
wait_until(&next).await;
let elapsed = start.elapsed().unwrap();
assert!(
elapsed < StdDuration::from_millis(10),
"Expected immediate return"
);
}

#[tokio::test]
async fn wait_until_future_time() {
let tz = TimeZone::UTC;
let next = Zoned::now()
.with_time_zone(tz.to_owned())
.saturating_add(5.seconds());

// Pause and control tokio's time
tokio::time::pause();

let handle = tokio::spawn({
let next = next.clone();
async move { wait_until(&next).await }
});
tokio::time::advance(StdDuration::from_secs(5)).await;

handle.await.expect("Failed to run wait_until");
let elapsed: StdDuration = (&Zoned::now().with_time_zone(tz.to_owned()) - &next)
.try_into()
.unwrap();
assert!(
elapsed < StdDuration::from_millis(10),
"Expected precise completion"
);
}

#[tokio::test]
async fn wait_until_exact_time() {
let tz = TimeZone::UTC;
let next = Zoned::now().with_time_zone(tz.to_owned());

let start = SystemTime::now();
wait_until(&next).await;
let elapsed = start.elapsed().unwrap();
assert!(
elapsed < StdDuration::from_millis(10),
"Expected immediate return"
);
}
}

0 comments on commit a6e43de

Please sign in to comment.