fs2(构建于cats与cats-effect之上)是一个用于创建基于流的异步、非阻塞、高度可组合程序的库。在该库中,我们可以使用fs2.concurrent包中的Scheduler来执行定期任务。Scheduler有一个scheduleAtFixedRate方法来执行定期任务,但是没有scheduleWithFixedDelay方法。因此,我们需要自己实现一个类似于scheduleWithFixedDelay方法的功能。
下面是一个示例代码,该代码基于cats.effect.IO、Timer以及fs2,实现了类似于ScheduledExecutorService.scheduleWithFixedDelay的效果:在上一次执行结束之后等待固定的间隔,再开始下一次执行。
import java.util.concurrent.TimeUnit
import cats.effect.{IO, Timer}
import fs2.Stream
import fs2.concurrent.{Scheduler, SignallingRef}
import scala.concurrent.duration.FiniteDuration
object ScheduleWithFixedDelayExample extends App {
// Scheduler value built for IO with the argument 10.
// NOTE(review): presumably 10 is the scheduler's thread-pool size — confirm against the fs2 version in use.
val scheduler: Scheduler = Scheduler[IO](10)
// Timer for IO created from the global ExecutionContext; referenced by scheduleWithFixedDelay.
val timer: Timer[IO] = IO.timer(scala.concurrent.ExecutionContext.global)
// Create a Stream that emits a message every 5 seconds
// with a delay of 2 seconds on the first run
/** Builds an endless stream that prints a timestamped greeting on every repetition.
  *
  * `delayBy(initialDelay)` is applied to the already-repeated stream, so the initial
  * delay is waited once, before the first message. With `delayBy` placed before
  * `repeat`, every repetition would sleep for `initialDelay` again.
  *
  * @param initialDelay one-off wait before the stream starts producing
  * @param interval     pacing handed to `metered` between successive messages
  * @return a stream that keeps printing until its consumer stops it (it uses `repeat`)
  */
def task(initialDelay: FiniteDuration, interval: FiniteDuration): Stream[IO, Unit] =
  Stream.emit(())
    .covary[IO]
    // The message is built inside the lambda so each repetition gets a fresh timestamp.
    .evalMap(_ => IO.println(s"Hello, world! ${System.currentTimeMillis()}"))
    .repeat
    .metered(interval)
    .delayBy(initialDelay)
/** Runs `f` repeatedly, pausing for `interval` between the end of one run and the start
  * of the next, in the manner of `ScheduledExecutorService.scheduleWithFixedDelay`.
  *
  * Sequence when the returned `IO` is executed:
  *   1. sleep `initialDelay` once;
  *   2. run `f`;
  *   3. sleep the full `interval` (the run time of `f` is NOT subtracted, which is what
  *      distinguishes fixed delay from fixed rate);
  *   4. go back to step 2.
  *
  * @param initialDelay wait applied a single time, before the first run of `f`
  * @param interval     pause inserted after each run of `f` completes
  * @param f            side-effecting task; it is wrapped in `IO(...)`, so an exception
  *                     thrown by `f` fails the returned `IO` and stops further runs
  * @return an `IO` describing the loop; nothing happens until it is run, and it does not
  *         complete on its own
  */
def scheduleWithFixedDelay(initialDelay: FiniteDuration, interval: FiniteDuration)(
    f: () => Unit): IO[Unit] = {
  // Recurse through flatMap rather than a for/yield: a trailing `map` on every
  // iteration would accumulate in an unbounded loop.
  def loop: IO[Unit] =
    IO(f())
      .flatMap(_ => timer.sleep(interval))
      .flatMap(_ => loop)

  // The initial delay lives outside the loop so it is applied only once.
  timer.sleep(initialDelay).flatMap(_ => loop)
}
// Schedule a task that will run once every 5 seconds with a delay of 2 seconds on the first run
scheduleWithFixedDelay(FiniteDuration(2, TimeUnit.SECONDS), FiniteDuration(5, TimeUnit.SECONDS)) { () =>
println(s"Exec