diff --git a/README.md b/README.md index b7b3022..a5a8b83 100644 --- a/README.md +++ b/README.md @@ -54,6 +54,15 @@ compared to systemd: ```sbt libraryDependencies += "com.github.eikek" %% "calev-core" % "0.6.0" ``` +- The *fs2* module contains utilities to work with + [FS2](https://github.com/functional-streams-for-scala/fs2) streams. + These were taken, thankfully and slightly modified to exchange cron expressions + for calendar events, from the + [fs2-cron](https://github.com/fthomas/fs2-cron) library. It is also published + for ScalaJS. With sbt, use + ```sbt + libraryDependencies += "com.github.eikek" %% "calev-fs2" % "0.6.0" + ``` - The *doobie* module contains `Meta`, `Read` and `Write` instances for `CalEvent` to use with [doobie](https://github.com/tpolecat/doobie). @@ -76,10 +85,8 @@ compared to systemd: libraryDependencies += "com.github.eikek" %% "calev-akka" % "0.6.0" ``` -Note that the fs2 module has been removed. The functionality is now -available for fs2 3.x from the -[fs2-cron](https://github.com/fthomas/fs2-cron) library. If calev-fs2 -is required for fs2 2.x, calev version 0.5.4 can be used. +Note that the fs2 module is also available via +[fs2-cron](https://github.com/fthomas/fs2-cron) library. ## Examples @@ -162,16 +169,16 @@ import java.time._ ce.asString // res4: String = "*-*-* 00/2:00:00" val now = LocalDateTime.now -// now: LocalDateTime = 2021-08-20T19:33:01.114 +// now: LocalDateTime = 2022-03-06T21:35:40.693 ce.nextElapse(now) -// res5: Option[LocalDateTime] = Some(value = 2021-08-20T20:00) +// res5: Option[LocalDateTime] = Some(value = 2022-03-06T22:00) ce.nextElapses(now, 5) // res6: List[LocalDateTime] = List( -// 2021-08-20T20:00, -// 2021-08-20T22:00, -// 2021-08-21T00:00, -// 2021-08-21T02:00, -// 2021-08-21T04:00 +// 2022-03-06T22:00, +// 2022-03-07T00:00, +// 2022-03-07T02:00, +// 2022-03-07T04:00, +// 2022-03-07T06:00 // ) ``` @@ -183,6 +190,47 @@ CalEvent.unsafe("1900-01-* 12,14:0:0").nextElapse(LocalDateTime.now) ``` +### FS2 + +The fs2 utilities allow to schedule things based on calendar events. +This is the same as [fs2-cron](https://github.com/fthomas/fs2-cron) +provides, only adopted to use calendar events instead of cron +expressions. The example is also from there. + +```scala +import cats.effect.IO +import _root_.fs2.Stream +import com.github.eikek.calev.fs2.Scheduler +import java.time.LocalTime + +val everyTwoSeconds = CalEvent.unsafe("*-*-* *:*:0/2") +// everyTwoSeconds: CalEvent = CalEvent( +// weekday = All, +// date = DateEvent(year = All, month = All, day = All), +// time = TimeEvent( +// hour = All, +// minute = All, +// seconds = List(values = Vector(Single(value = 0, rep = Some(value = 2)))) +// ), +// zone = None +// ) +val scheduler = Scheduler.systemDefault[IO] +// scheduler: Scheduler[IO] = com.github.eikek.calev.fs2.Scheduler$$anon$1@54c15e + +val printTime = Stream.eval(IO(println(LocalTime.now))) +// printTime: Stream[IO, Unit] = Stream(..) + +val task = scheduler.awakeEvery(everyTwoSeconds) >> printTime +// task: Stream[IO[x], Unit] = Stream(..) + +import cats.effect.unsafe.implicits._ +task.take(3).compile.drain.unsafeRunSync() +// 21:35:42.017 +// 21:35:44.001 +// 21:35:46.001 +``` + + ### Doobie When using doobie, this module contains instances to write and read @@ -213,24 +261,16 @@ val r = Record(CalEvent.unsafe("Mon *-*-* 0/2:15")) val insert = sql"INSERT INTO mytable (event) VALUES (${r.event})".update.run // insert: ConnectionIO[Int] = Suspend( -// a = BracketCase( -// acquire = Suspend( -// a = PrepareStatement(a = "INSERT INTO mytable (event) VALUES (?)") -// ), -// use = doobie.hi.connection$$$Lambda$44869/1756545421@1c20c20a, -// release = cats.effect.Bracket$$Lambda$44871/900537340@6afd9f65 +// a = Uncancelable( +// body = cats.effect.kernel.MonadCancel$$Lambda$2069/1459756682@24c4370a // ) // ) val select = sql"SELECT event FROM mytable WHERE id = 1".query[Record].unique // select: ConnectionIO[Record] = Suspend( -// a = BracketCase( -// acquire = Suspend( -// a = PrepareStatement(a = "SELECT event FROM mytable WHERE id = 1") -// ), -// use = doobie.hi.connection$$$Lambda$44869/1756545421@27babfba, -// release = cats.effect.Bracket$$Lambda$44871/900537340@e492da6 +// a = Uncancelable( +// body = cats.effect.kernel.MonadCancel$$Lambda$2069/1459756682@1b6daf20 // ) // ) ``` @@ -320,7 +360,7 @@ val jackson = JsonMapper .builder() .addModule(new CalevModule()) .build() -// jackson: JsonMapper = com.fasterxml.jackson.databind.json.JsonMapper@12633da9 +// jackson: JsonMapper = com.fasterxml.jackson.databind.json.JsonMapper@41a3656f val myEvent = CalEvent.unsafe("Mon *-*-* 05:00/10:00") // myEvent: CalEvent = CalEvent( @@ -388,7 +428,7 @@ CalevBehaviors.withCalevTimers[Message]() { scheduler => same } } -// res8: Behavior[Message] = Deferred(TimerSchedulerImpl.scala:29) +// res9: Behavior[Message] = Deferred(TimerSchedulerImpl.scala:29) ``` Use ```CalevBehaviors.withCalendarEvent``` to schedule messages according @@ -408,7 +448,7 @@ CalevBehaviors.withCalendarEvent(calEvent)( same } ) -// res9: Behavior[Message] = Deferred(InterceptorImpl.scala:29-30) +// res10: Behavior[Message] = Deferred(InterceptorImpl.scala:29-30) ``` #### Testing @@ -441,8 +481,8 @@ calevScheduler().scheduleOnceWithCalendarEvent(calEvent, () => { s"Called at: ${LocalTime.now}" ) }) -// res10: Option[..akka.actor.Cancellable] = Some( -// value = akka.actor.LightArrayRevolverScheduler$TaskHolder@393105f1 +// res11: Option[..akka.actor.Cancellable] = Some( +// value = akka.actor.LightArrayRevolverScheduler$TaskHolder@322f6283 // ) system.terminate() ``` diff --git a/build.sbt b/build.sbt index 5f1e468..cf4af8a 100644 --- a/build.sbt +++ b/build.sbt @@ -83,7 +83,7 @@ lazy val noPublish = Seq( val testSettings = Seq( libraryDependencies ++= (Dependencies.munit ++ Dependencies.logback).map(_ % Test), - testFrameworks += new TestFramework("munit.Framework") + testFrameworks += TestFrameworks.MUnit ) val buildInfoSettings = Seq( @@ -104,11 +104,12 @@ val buildInfoSettings = Seq( val scalafixSettings = Seq( semanticdbEnabled := true, // enable SemanticDB semanticdbVersion := scalafixSemanticdb.revision, // use Scalafix compatible version - ThisBuild / scalafixDependencies += "com.github.liancheng" %% "organize-imports" % "0.5.0" + ThisBuild / scalafixDependencies ++= Dependencies.organizeImports ) lazy val core = crossProject(JSPlatform, JVMPlatform) .crossType(CrossType.Pure) + .withoutSuffixFor(JVMPlatform) .in(file("modules/core")) .settings(sharedSettings) .settings(testSettings) @@ -119,10 +120,22 @@ lazy val core = crossProject(JSPlatform, JVMPlatform) Dependencies.fs2.map(_ % Test) ++ Dependencies.fs2io.map(_ % Test) ) -lazy val coreJVM = core.jvm -lazy val coreJS = core.js -lazy val doobieJVM = project +lazy val fs2 = crossProject(JSPlatform, JVMPlatform) + .crossType(CrossType.Pure) + .withoutSuffixFor(JVMPlatform) + .in(file("modules/fs2")) + .settings(sharedSettings) + .settings(testSettings) + .settings(scalafixSettings) + .settings( + name := "calev-fs2", + libraryDependencies ++= + Dependencies.fs2 + ) + .dependsOn(core) + +lazy val doobie = project .in(file("modules/doobie")) .settings(sharedSettings) .settings(testSettings) @@ -133,10 +146,11 @@ lazy val doobieJVM = project Dependencies.doobie ++ Dependencies.h2.map(_ % Test) ) - .dependsOn(coreJVM) + .dependsOn(core.jvm) lazy val circe = crossProject(JSPlatform, JVMPlatform) .crossType(CrossType.Pure) + .withoutSuffixFor(JVMPlatform) .in(file("modules/circe")) .settings(sharedSettings) .settings(testSettings) @@ -148,12 +162,10 @@ lazy val circe = crossProject(JSPlatform, JVMPlatform) Dependencies.circeAll.map(_ % Test) ) .dependsOn(core) -lazy val circeJVM = circe.jvm -lazy val circeJS = circe.js -lazy val jacksonJVM = project +lazy val jackson = project .in(file("modules/jackson")) - .dependsOn(coreJVM) + .dependsOn(core.jvm) .settings(sharedSettings) .settings(testSettings) .settings(scalafixSettings) @@ -170,9 +182,9 @@ lazy val jacksonJVM = project Dependencies.jacksonAll ) -lazy val akkaJVM = project +lazy val akka = project .in(file("modules/akka")) - .dependsOn(coreJVM) + .dependsOn(core.jvm) .settings(sharedSettings) .settings(scalafixSettings) .settings( @@ -195,23 +207,23 @@ lazy val readme = project .settings(noPublish) .settings( name := "calev-readme", + crossScalaVersions := Seq(scala212, scala213), libraryDependencies ++= Dependencies.circeAll, - scalacOptions := Seq(), + scalacOptions -= "-Xfatal-warnings", + scalacOptions -= "-Werror", mdocVariables := Map( "VERSION" -> latestRelease.value ), + mdocIn := (LocalRootProject / baseDirectory).value / "docs" / "readme.md", + mdocOut := (LocalRootProject / baseDirectory).value / "README.md", + fork := true, updateReadme := { mdoc.evaluated - val out = mdocOut.value / "readme.md" - val target = (LocalRootProject / baseDirectory).value / "README.md" - val logger = streams.value.log - logger.info(s"Updating readme: $out -> $target") - IO.copyFile(out, target) () } ) - .dependsOn(coreJVM, doobieJVM, circeJVM, jacksonJVM, akkaJVM) + .dependsOn(core.jvm, fs2.jvm, doobie, circe.jvm, jackson, akka) val root = project .in(file(".")) @@ -222,11 +234,13 @@ val root = project crossScalaVersions := Nil ) .aggregate( - coreJVM, - coreJS, - doobieJVM, - circeJVM, - circeJS, - jacksonJVM, - akkaJVM + core.jvm, + core.js, + fs2.jvm, + fs2.js, + doobie, + circe.jvm, + circe.js, + jackson, + akka ) diff --git a/docs/readme.md b/docs/readme.md index f57589a..06c55b4 100644 --- a/docs/readme.md +++ b/docs/readme.md @@ -54,6 +54,15 @@ compared to systemd: ```sbt libraryDependencies += "com.github.eikek" %% "calev-core" % "@VERSION@" ``` +- The *fs2* module contains utilities to work with + [FS2](https://github.com/functional-streams-for-scala/fs2) streams. + These were taken, thankfully and slightly modified to exchange cron expressions + for calendar events, from the + [fs2-cron](https://github.com/fthomas/fs2-cron) library. It is also published + for ScalaJS. With sbt, use + ```sbt + libraryDependencies += "com.github.eikek" %% "calev-fs2" % "@VERSION@" + ``` - The *doobie* module contains `Meta`, `Read` and `Write` instances for `CalEvent` to use with [doobie](https://github.com/tpolecat/doobie). @@ -76,10 +85,8 @@ compared to systemd: libraryDependencies += "com.github.eikek" %% "calev-akka" % "@VERSION@" ``` -Note that the fs2 module has been removed. The functionality is now -available for fs2 3.x from the -[fs2-cron](https://github.com/fthomas/fs2-cron) library. If calev-fs2 -is required for fs2 2.x, calev version 0.5.4 can be used. +Note that the fs2 module is also available via +[fs2-cron](https://github.com/fthomas/fs2-cron) library. ## Examples @@ -129,6 +136,31 @@ CalEvent.unsafe("1900-01-* 12,14:0:0").nextElapse(LocalDateTime.now) ``` +### FS2 + +The fs2 utilities allow to schedule things based on calendar events. +This is the same as [fs2-cron](https://github.com/fthomas/fs2-cron) +provides, only adopted to use calendar events instead of cron +expressions. The example is also from there. + +```scala mdoc +import cats.effect.IO +import _root_.fs2.Stream +import com.github.eikek.calev.fs2.Scheduler +import java.time.LocalTime + +val everyTwoSeconds = CalEvent.unsafe("*-*-* *:*:0/2") +val scheduler = Scheduler.systemDefault[IO] + +val printTime = Stream.eval(IO(println(LocalTime.now))) + +val task = scheduler.awakeEvery(everyTwoSeconds) >> printTime + +import cats.effect.unsafe.implicits._ +task.take(3).compile.drain.unsafeRunSync() +``` + + ### Doobie When using doobie, this module contains instances to write and read diff --git a/modules/fs2/src/main/scala/com/github/eikek/calev/fs2/Scheduler.scala b/modules/fs2/src/main/scala/com/github/eikek/calev/fs2/Scheduler.scala new file mode 100644 index 0000000..5bd7383 --- /dev/null +++ b/modules/fs2/src/main/scala/com/github/eikek/calev/fs2/Scheduler.scala @@ -0,0 +1,61 @@ +package com.github.eikek.calev.fs2 + +import java.time._ +import java.time.temporal.ChronoUnit +import java.util.concurrent.TimeUnit + +import scala.concurrent.duration.FiniteDuration + +import cats.effect._ +import cats.implicits._ +import com.github.eikek.calev.CalEvent +import fs2.Stream + +trait Scheduler[F[_]] { + def fromNowUntilNext(schedule: CalEvent): F[FiniteDuration] + + def sleepUntilNext(schedule: CalEvent): F[Unit] + + def sleep(schedule: CalEvent): Stream[F, Unit] + + def awakeEvery(schedule: CalEvent): Stream[F, Unit] +} + +object Scheduler { + def systemDefault[F[_]](implicit + temporal: Temporal[F], + F: Sync[F] + ): Scheduler[F] = + from(F.delay(ZoneId.systemDefault())) + + def utc[F[_]](implicit F: Temporal[F]): Scheduler[F] = + from(F.pure(ZoneOffset.UTC)) + + def from[F[_]](zoneId: F[ZoneId])(implicit F: Temporal[F]): Scheduler[F] = + new Scheduler[F] { + override def fromNowUntilNext(schedule: CalEvent): F[FiniteDuration] = + now.flatMap { from => + schedule.nextElapse(from) match { + case Some(next) => + val durationInMillis = from.until(next, ChronoUnit.MILLIS) + F.pure(FiniteDuration(durationInMillis, TimeUnit.MILLISECONDS)) + case None => + val msg = s"Could not calculate the next date-time from $from " + + s"given the calendar event expression '${schedule.asString}'. This should never happen." + F.raiseError(new Throwable(msg)) + } + } + + def sleepUntilNext(schedule: CalEvent): F[Unit] = + Temporal[F].flatMap(fromNowUntilNext(schedule))(Temporal[F].sleep) + + def sleep(schedule: CalEvent): Stream[F, Unit] = + Stream.eval(sleepUntilNext(schedule)) + + def awakeEvery(schedule: CalEvent): Stream[F, Unit] = + sleep(schedule).repeat + + private val now: F[ZonedDateTime] = + (F.realTimeInstant, zoneId).mapN(_.atZone(_)) + } +} diff --git a/modules/fs2/src/test/scala/com/github/eikek/calev/fs2/SchedulerTest.scala b/modules/fs2/src/test/scala/com/github/eikek/calev/fs2/SchedulerTest.scala new file mode 100644 index 0000000..bedb477 --- /dev/null +++ b/modules/fs2/src/test/scala/com/github/eikek/calev/fs2/SchedulerTest.scala @@ -0,0 +1,40 @@ +package com.github.eikek.calev.fs2 + +import java.time.{Instant, ZoneId, ZoneOffset} + +import cats.effect.IO +import cats.effect.unsafe.implicits.global +import com.github.eikek.calev.CalEvent +import fs2.Stream +import munit.FunSuite + +class SchedulerTest extends FunSuite { + private val evenSeconds: CalEvent = CalEvent.unsafe("*-*-* *:*:0/2") + private def isEven(i: Long): Boolean = i % 2 == 0 + private def instantSeconds(i: Instant): Long = i.getEpochSecond + private val evalInstantNow: Stream[IO, Instant] = Stream.eval(IO(Instant.now())) + + private val schedulerSys = Scheduler.systemDefault[IO] + private val schedulerUtc = Scheduler.utc[IO] + + test("awakeEvery") { + val s1 = schedulerSys.awakeEvery(evenSeconds) >> evalInstantNow + val s2 = s1.map(instantSeconds).take(2).forall(isEven) + s2.compile.last.map(a => assertEquals(a, Option(true))).unsafeRunSync() + } + + test("sleep") { + val s1 = schedulerUtc.sleep(evenSeconds) >> evalInstantNow + val s2 = s1.map(instantSeconds).forall(isEven) + s2.compile.last.map(a => assertEquals(a, Option(true))).unsafeRunSync() + } + + test("timezones") { + val zoneId: ZoneId = ZoneOffset.ofTotalSeconds(1) + val scheduler = Scheduler.from(IO.pure(zoneId)) + + val s1 = scheduler.awakeEvery(evenSeconds) >> evalInstantNow + val s2 = s1.map(instantSeconds).take(2).forall(!isEven(_)) + s2.compile.last.map(a => assertEquals(a, Option(true))).unsafeRunSync() + } +} diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 90a5d8e..6330e57 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -12,6 +12,11 @@ object Dependencies { val logbackVersion = "1.2.11" val munitVersion = "0.7.29" val scalaTestVersion = "3.2.11" + val organizeImportsVersion = "0.6.0" + + val organizeImports = Seq( + "com.github.liancheng" %% "organize-imports" % organizeImportsVersion + ) val scalaTest = Seq( "org.scalatest" %% "scalatest" % scalaTestVersion % Test diff --git a/project/plugins.sbt b/project/plugins.sbt index 3cf263a..ea0cb07 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -4,5 +4,5 @@ addSbtPlugin("com.eed3si9n" % "sbt-buildinfo" % "0.11.0") addSbtPlugin("com.github.sbt" % "sbt-ci-release" % "1.5.10") addSbtPlugin("org.portable-scala" % "sbt-scalajs-crossproject" % "1.1.0") addSbtPlugin("org.scala-js" % "sbt-scalajs" % "1.9.0") -addSbtPlugin("org.scalameta" % "sbt-mdoc" % "2.3.1") +addSbtPlugin("org.scalameta" % "sbt-mdoc" % "2.2.24") addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.6")