From 62cb64b72a17c28e10d0dbb7ffedf6be6e88e196 Mon Sep 17 00:00:00 2001 From: David van Geest Date: Fri, 27 Oct 2023 23:57:28 -0400 Subject: [PATCH 1/5] WIP working. --- .../scala/ca/dvgi/periodic/Periodic.scala | 14 +++ .../periodic/jdk/FutureJdkAutoUpdater.scala | 4 +- .../periodic/jdk/IdentityJdkAutoUpdater.scala | 4 +- .../ca/dvgi/periodic/jdk/JdkAutoUpdater.scala | 108 +++-------------- .../ca/dvgi/periodic/jdk/JdkPeriodic.scala | 114 ++++++++++++++++++ 5 files changed, 152 insertions(+), 92 deletions(-) create mode 100644 periodic-core/src/main/scala/ca/dvgi/periodic/Periodic.scala create mode 100644 periodic-core/src/main/scala/ca/dvgi/periodic/jdk/JdkPeriodic.scala diff --git a/periodic-core/src/main/scala/ca/dvgi/periodic/Periodic.scala b/periodic-core/src/main/scala/ca/dvgi/periodic/Periodic.scala new file mode 100644 index 0000000..c38b918 --- /dev/null +++ b/periodic-core/src/main/scala/ca/dvgi/periodic/Periodic.scala @@ -0,0 +1,14 @@ +package ca.dvgi.periodic + +import scala.concurrent.duration.FiniteDuration +import org.slf4j.Logger + +trait Periodic[F[_], T] extends AutoCloseable { + def start( + log: Logger, + initialDelay: FiniteDuration, + fn: () => F[T], + interval: F[T] => F[FiniteDuration], + attemptStrategy: UpdateAttemptStrategy + ): Unit +} diff --git a/periodic-core/src/main/scala/ca/dvgi/periodic/jdk/FutureJdkAutoUpdater.scala b/periodic-core/src/main/scala/ca/dvgi/periodic/jdk/FutureJdkAutoUpdater.scala index 77b38d0..f156e10 100644 --- a/periodic-core/src/main/scala/ca/dvgi/periodic/jdk/FutureJdkAutoUpdater.scala +++ b/periodic-core/src/main/scala/ca/dvgi/periodic/jdk/FutureJdkAutoUpdater.scala @@ -9,7 +9,9 @@ class FutureJdkAutoUpdater[T]( blockUntilReadyTimeout: Option[Duration] = None, executorOverride: Option[ScheduledExecutorService] = None ) extends JdkAutoUpdater[Future, T](blockUntilReadyTimeout, executorOverride) { - override protected def evalUpdate(ut: Future[T]): T = Await.result(ut, Duration.Inf) + override protected def evalUpdate[A](ua: Future[A]): A = Await.result(ua, Duration.Inf) + + override protected def pureUpdate[A](a: A): Future[A] = Future.successful(a) } object FutureJdkAutoUpdater { diff --git a/periodic-core/src/main/scala/ca/dvgi/periodic/jdk/IdentityJdkAutoUpdater.scala b/periodic-core/src/main/scala/ca/dvgi/periodic/jdk/IdentityJdkAutoUpdater.scala index 9df8131..8c37f28 100644 --- a/periodic-core/src/main/scala/ca/dvgi/periodic/jdk/IdentityJdkAutoUpdater.scala +++ b/periodic-core/src/main/scala/ca/dvgi/periodic/jdk/IdentityJdkAutoUpdater.scala @@ -7,7 +7,9 @@ class IdentityJdkAutoUpdater[T]( blockUntilReadyTimeout: Option[Duration] = None, executorOverride: Option[ScheduledExecutorService] = None ) extends JdkAutoUpdater[Identity, T](blockUntilReadyTimeout, executorOverride) { - override protected def evalUpdate(ut: Identity[T]): T = ut + override protected def evalUpdate[A](ua: Identity[A]): A = ua + + override protected def pureUpdate[A](a: A): Identity[A] = a } object IdentityJdkAutoUpdater { diff --git a/periodic-core/src/main/scala/ca/dvgi/periodic/jdk/JdkAutoUpdater.scala b/periodic-core/src/main/scala/ca/dvgi/periodic/jdk/JdkAutoUpdater.scala index d7cba4e..85e6cf4 100644 --- a/periodic-core/src/main/scala/ca/dvgi/periodic/jdk/JdkAutoUpdater.scala +++ b/periodic-core/src/main/scala/ca/dvgi/periodic/jdk/JdkAutoUpdater.scala @@ -13,7 +13,6 @@ import org.slf4j.Logger import scala.util.Success import scala.util.Failure import scala.concurrent.Await -import java.util.concurrent.ScheduledFuture /** An AutoUpdater based on the JDK's ScheduledExecutorService. * @@ -31,15 +30,15 @@ abstract class JdkAutoUpdater[U[T], T]( executorOverride: Option[ScheduledExecutorService] = None ) extends AutoUpdater[U, Future, T] { - private val executor = executorOverride.getOrElse(Executors.newScheduledThreadPool(1)) + protected def evalUpdate[A](ua: U[A]): A - private case object CloseLock + protected def pureUpdate[A](a: A): U[A] - @volatile private var closed = false + private val executor = executorOverride.getOrElse(Executors.newScheduledThreadPool(1)) - @volatile private var variable: Option[T] = None + private val periodic = new JdkPeriodic[U, T]("var update", evalUpdate(_), executor) - @volatile private var nextTask: Option[ScheduledFuture[_]] = None + @volatile private var variable: Option[T] = None override def start( log: Logger, @@ -70,10 +69,19 @@ abstract class JdkAutoUpdater[U[T], T]( variable = Some(value) ready.complete(Success(())) log.info("Successfully initialized") - scheduleUpdate(updateInterval.duration(value))( + periodic.start( log, - updateVar, - updateInterval, + updateInterval.duration(value), + () => { + val ut = updateVar() + val t = evalUpdate(ut) + variable = Some(t) + ut + }, + ut => { + val t = evalUpdate(ut) + pureUpdate(updateInterval.duration(t)) + }, updateAttemptStrategy ) case Failure(e) => @@ -98,89 +106,9 @@ abstract class JdkAutoUpdater[U[T], T]( override def latest: Option[T] = variable override def close(): Unit = { - CloseLock.synchronized { - closed = true - nextTask.foreach(_.cancel(true)) - } + periodic.close() if (executorOverride.isEmpty) { val _ = executor.shutdownNow() } - () - } - - protected def evalUpdate(ut: U[T]): T - - private def scheduleUpdate(nextUpdate: FiniteDuration)(implicit - log: Logger, - updateVar: () => U[T], - updateInterval: UpdateInterval[T], - updateAttemptStrategy: UpdateAttemptStrategy - ): Unit = { - log.info(s"Scheduling update of var in: $nextUpdate") - - CloseLock.synchronized { - if (!closed) - nextTask = Some( - executor.schedule( - new UpdateVar(1), - nextUpdate.length, - nextUpdate.unit - ) - ) - } - () - } - - private class UpdateVar(attempt: Int)(implicit - log: Logger, - updateVar: () => U[T], - updateInterval: UpdateInterval[T], - updateAttemptStrategy: UpdateAttemptStrategy - ) extends Runnable { - def run(): Unit = { - log.info("Attempting var update...") - try { - val newV = evalUpdate(updateVar()) - variable = Some(newV) - log.info("Successfully updated") - scheduleUpdate(updateInterval.duration(newV)) - } catch { - case NonFatal(e) => - updateAttemptStrategy match { - case UpdateAttemptStrategy.Infinite(attemptInterval) => - reattempt(e, attemptInterval) - case UpdateAttemptStrategy.Finite(attemptInterval, maxAttempts, _) - if attempt < maxAttempts => - reattempt(e, attemptInterval) - case UpdateAttemptStrategy.Finite(_, _, attemptExhaustionBehavior) => - log.error("Var update attempts exhausted! Final attempt exception", e) - attemptExhaustionBehavior.run(log) - } - } - } - - private def reattempt(e: Throwable, delay: FiniteDuration)(implicit - log: Logger, - updateVar: () => U[T], - updateInterval: UpdateInterval[T], - updateAttemptStrategy: UpdateAttemptStrategy - ): Unit = { - log.warn( - s"Unhandled exception when trying to update var, retrying in $delay", - e - ) - - CloseLock.synchronized { - if (!closed) - nextTask = Some( - executor.schedule( - new UpdateVar(attempt + 1), - delay.length, - delay.unit - ) - ) - } - () - } } } diff --git a/periodic-core/src/main/scala/ca/dvgi/periodic/jdk/JdkPeriodic.scala b/periodic-core/src/main/scala/ca/dvgi/periodic/jdk/JdkPeriodic.scala new file mode 100644 index 0000000..8e270ed --- /dev/null +++ b/periodic-core/src/main/scala/ca/dvgi/periodic/jdk/JdkPeriodic.scala @@ -0,0 +1,114 @@ +package ca.dvgi.periodic.jdk + +import scala.concurrent.duration.FiniteDuration +import ca.dvgi.periodic.UpdateAttemptStrategy +import ca.dvgi.periodic.Periodic +import java.util.concurrent.ScheduledFuture +import java.util.concurrent.ScheduledExecutorService +import scala.util.control.NonFatal +import org.slf4j.Logger + +class JdkPeriodic[F[_], T]( + operationName: String, + evalF: F[FiniteDuration] => FiniteDuration, + executor: ScheduledExecutorService +) extends Periodic[F, T] { + + private case object CloseLock + + @volatile private var closed = false + + @volatile private var nextTask: Option[ScheduledFuture[_]] = None + + override def start( + log: Logger, + initialDelay: FiniteDuration, + fn: () => F[T], + interval: F[T] => F[FiniteDuration], + attemptStrategy: UpdateAttemptStrategy + ): Unit = { + scheduleUpdate(initialDelay)(log, fn, interval, attemptStrategy) + } + + private def scheduleUpdate(nextUpdate: FiniteDuration)(implicit + log: Logger, + fn: () => F[T], + interval: F[T] => F[FiniteDuration], + attemptStrategy: UpdateAttemptStrategy + ): Unit = { + CloseLock.synchronized { + if (!closed) + nextTask = Some( + executor.schedule( + new FnRunnable(1), + nextUpdate.length, + nextUpdate.unit + ) + ) + } + () + } + + private class FnRunnable(attempt: Int)(implicit + log: Logger, + fn: () => F[T], + interval: F[T] => F[FiniteDuration], + attemptStrategy: UpdateAttemptStrategy + ) extends Runnable { + def run(): Unit = { + try { + log.info(s"Attempting $operationName...") + val result = fn() + log.info(s"Successfully executed $operationName") + scheduleUpdate(evalF(interval(result))) + } catch { + case NonFatal(e) => + attemptStrategy match { + case UpdateAttemptStrategy.Infinite(attemptInterval) => + reattempt(e, attemptInterval) + case UpdateAttemptStrategy.Finite(attemptInterval, maxAttempts, _) + if attempt < maxAttempts => + reattempt(e, attemptInterval) + case UpdateAttemptStrategy.Finite(_, _, attemptExhaustionBehavior) => + log.error( + s"${operationName.capitalize} attempts exhausted! Final attempt exception", + e + ) + attemptExhaustionBehavior.run(log) + } + } + } + + private def reattempt(e: Throwable, delay: FiniteDuration)(implicit + log: Logger, + fn: () => F[T], + interval: F[T] => F[FiniteDuration], + attemptStrategy: UpdateAttemptStrategy + ): Unit = { + log.warn( + s"Unhandled exception when trying to $operationName, retrying in $delay", + e + ) + + CloseLock.synchronized { + if (!closed) + nextTask = Some( + executor.schedule( + new FnRunnable(attempt + 1), + delay.length, + delay.unit + ) + ) + } + () + } + } + + override def close(): Unit = { + CloseLock.synchronized { + closed = true + nextTask.foreach(_.cancel(true)) + } + () + } +} From bfc9237dcb726ce3e628c0acfb2723f32f9180da Mon Sep 17 00:00:00 2001 From: David van Geest Date: Sun, 29 Oct 2023 22:30:03 -0400 Subject: [PATCH 2/5] Progress on core. --- .../scala/ca/dvgi/periodic/AutoUpdater.scala | 34 ------ .../ca/dvgi/periodic/AutoUpdatingVar.scala | 47 +++++--- .../scala/ca/dvgi/periodic/Periodic.scala | 18 ++- .../ca/dvgi/periodic/jdk/Evaluator.scala | 19 +++ .../periodic/jdk/FutureJdkAutoUpdater.scala | 23 ---- .../periodic/jdk/IdentityJdkAutoUpdater.scala | 21 ---- .../ca/dvgi/periodic/jdk/JdkAutoUpdater.scala | 114 ------------------ .../ca/dvgi/periodic/jdk/JdkPeriodic.scala | 107 ++++++++++++++-- .../periodic/AutoUpdaterTestsFuture.scala | 50 ++++---- ...ala => FutureJdkAutoUpdatingVarTest.scala} | 11 +- ...a => IdentityJdkAutoUpdatingVarTest.scala} | 13 +- 11 files changed, 202 insertions(+), 255 deletions(-) delete mode 100644 periodic-core/src/main/scala/ca/dvgi/periodic/AutoUpdater.scala create mode 100644 periodic-core/src/main/scala/ca/dvgi/periodic/jdk/Evaluator.scala delete mode 100644 periodic-core/src/main/scala/ca/dvgi/periodic/jdk/FutureJdkAutoUpdater.scala delete mode 100644 periodic-core/src/main/scala/ca/dvgi/periodic/jdk/IdentityJdkAutoUpdater.scala delete mode 100644 periodic-core/src/main/scala/ca/dvgi/periodic/jdk/JdkAutoUpdater.scala rename periodic-core/src/test/scala/ca/dvgi/periodic/jdk/{FutureJdkAutoUpdaterTest.scala => FutureJdkAutoUpdatingVarTest.scala} (77%) rename periodic-core/src/test/scala/ca/dvgi/periodic/jdk/{IdentityJdkAutoUpdaterTest.scala => IdentityJdkAutoUpdatingVarTest.scala} (71%) diff --git a/periodic-core/src/main/scala/ca/dvgi/periodic/AutoUpdater.scala b/periodic-core/src/main/scala/ca/dvgi/periodic/AutoUpdater.scala deleted file mode 100644 index defea6c..0000000 --- a/periodic-core/src/main/scala/ca/dvgi/periodic/AutoUpdater.scala +++ /dev/null @@ -1,34 +0,0 @@ -package ca.dvgi.periodic - -import org.slf4j.Logger - -/** AutoUpdatingVar delegates most functionality to an AutoUpdater, which may have many - * implementations. - */ -trait AutoUpdater[U[_], R[_], T] extends AutoCloseable { - - /** Initializes the var for the first time, handling errors as specified. If successful, schedules - * the next update. - * - * @param log - * Implementations should use this logger for consistency. - * - * @return - * An effect, which, if successfully completed, signifies that a value is available. If - * initialization failed, the effect should also be failed. - */ - def start( - log: Logger, - updateVar: () => U[T], - updateInterval: UpdateInterval[T], - updateAttemptStrategy: UpdateAttemptStrategy, - handleInitializationError: PartialFunction[Throwable, U[T]] - ): R[Unit] - - /** The latest in-memory value of the variable. - * - * @return - * Some[T] if the variable has been initialized successfully, otherwise None. - */ - def latest: Option[T] -} diff --git a/periodic-core/src/main/scala/ca/dvgi/periodic/AutoUpdatingVar.scala b/periodic-core/src/main/scala/ca/dvgi/periodic/AutoUpdatingVar.scala index 8277e85..2fc5328 100644 --- a/periodic-core/src/main/scala/ca/dvgi/periodic/AutoUpdatingVar.scala +++ b/periodic-core/src/main/scala/ca/dvgi/periodic/AutoUpdatingVar.scala @@ -2,9 +2,9 @@ package ca.dvgi.periodic import scala.reflect.ClassTag import org.slf4j.LoggerFactory -import ca.dvgi.periodic.jdk._ import scala.concurrent.duration.Duration import scala.concurrent.Future +import ca.dvgi.periodic.jdk._ import java.util.concurrent.ScheduledExecutorService /** A variable that updates itself. `latest` can be called from multiple threads, which are all @@ -34,10 +34,11 @@ import java.util.concurrent.ScheduledExecutorService * A name for this variable, used in logging. If unspecified, the simple class name of T will be * used. */ -class AutoUpdatingVar[U[_], R[_], T](autoUpdater: AutoUpdater[U, R, T])( +class AutoUpdatingVar[U[_], R[_], T](periodic: Periodic[U, R, T])( updateVar: => U[T], updateInterval: UpdateInterval[T], updateAttemptStrategy: UpdateAttemptStrategy, + blockUntilReadyTimeout: Option[Duration] = None, handleInitializationError: PartialFunction[Throwable, U[T]] = PartialFunction.empty, varNameOverride: Option[String] = None )(implicit ct: ClassTag[T]) @@ -52,12 +53,26 @@ class AutoUpdatingVar[U[_], R[_], T](autoUpdater: AutoUpdater[U, R, T])( log.info(s"Starting. ${updateAttemptStrategy.description}") - private val _ready = autoUpdater.start( + @volatile private var variable: Option[T] = None + + private val _ready = periodic.scheduleNow( log, + "initialize var", () => updateVar, - updateInterval, - updateAttemptStrategy, - handleInitializationError + newV => { + variable = Some(newV) + periodic.scheduleRecurring( + log, + "update var", + updateInterval.duration(newV), + () => updateVar, + v => variable = Some(v), + v => updateInterval.duration(v), + updateAttemptStrategy + ) + }, + handleInitializationError, + blockUntilReadyTimeout ) /** @return @@ -75,10 +90,10 @@ class AutoUpdatingVar[U[_], R[_], T](autoUpdater: AutoUpdater[U, R, T])( * @throws UnreadyAutoUpdatingVarException * if there is not yet a value to return */ - def latest: T = autoUpdater.latest.getOrElse(throw UnreadyAutoUpdatingVarException) + def latest: T = variable.getOrElse(throw UnreadyAutoUpdatingVarException) override def close(): Unit = { - autoUpdater.close() + periodic.close() log.info(s"Shut down sucessfully") } } @@ -88,17 +103,19 @@ object AutoUpdatingVar { /** @see * [[ca.dvgi.periodic.AutoUpdatingVar]] */ - def apply[U[_], R[_], T](autoUpdater: AutoUpdater[U, R, T])( + def apply[U[_], R[_], T](periodic: Periodic[U, R, T])( updateVar: => U[T], updateInterval: UpdateInterval[T], updateAttemptStrategy: UpdateAttemptStrategy, + blockUntilReadyTimeout: Option[Duration] = None, handleInitializationError: PartialFunction[Throwable, U[T]] = PartialFunction.empty, varNameOverride: Option[String] = None )(implicit ct: ClassTag[T]): AutoUpdatingVar[U, R, T] = { - new AutoUpdatingVar(autoUpdater)( + new AutoUpdatingVar(periodic)( updateVar, updateInterval, updateAttemptStrategy, + blockUntilReadyTimeout, handleInitializationError, varNameOverride ) @@ -115,17 +132,18 @@ object AutoUpdatingVar { updateVar: => T, updateInterval: UpdateInterval[T], updateAttemptStrategy: UpdateAttemptStrategy, + blockUntilReadyTimeout: Option[Duration] = None, handleInitializationError: PartialFunction[Throwable, T] = PartialFunction.empty, varNameOverride: Option[String] = None, - blockUntilReadyTimeout: Option[Duration] = None, executorOverride: Option[ScheduledExecutorService] = None )(implicit ct: ClassTag[T]): AutoUpdatingVar[Identity, Future, T] = { new AutoUpdatingVar( - new IdentityJdkAutoUpdater[T](blockUntilReadyTimeout, executorOverride) + new JdkPeriodic[Identity, T](executorOverride) )( updateVar, updateInterval, updateAttemptStrategy, + blockUntilReadyTimeout, handleInitializationError, varNameOverride ) @@ -142,17 +160,18 @@ object AutoUpdatingVar { updateVar: => Future[T], updateInterval: UpdateInterval[T], updateAttemptStrategy: UpdateAttemptStrategy, + blockUntilReadyTimeout: Option[Duration] = None, handleInitializationError: PartialFunction[Throwable, Future[T]] = PartialFunction.empty, varNameOverride: Option[String] = None, - blockUntilReadyTimeout: Option[Duration] = None, executorOverride: Option[ScheduledExecutorService] = None )(implicit ct: ClassTag[T]): AutoUpdatingVar[Future, Future, T] = { new AutoUpdatingVar( - new FutureJdkAutoUpdater[T](blockUntilReadyTimeout, executorOverride) + new JdkPeriodic[Future, T](executorOverride) )( updateVar, updateInterval, updateAttemptStrategy, + blockUntilReadyTimeout, handleInitializationError, varNameOverride ) diff --git a/periodic-core/src/main/scala/ca/dvgi/periodic/Periodic.scala b/periodic-core/src/main/scala/ca/dvgi/periodic/Periodic.scala index c38b918..3a6e27e 100644 --- a/periodic-core/src/main/scala/ca/dvgi/periodic/Periodic.scala +++ b/periodic-core/src/main/scala/ca/dvgi/periodic/Periodic.scala @@ -2,13 +2,25 @@ package ca.dvgi.periodic import scala.concurrent.duration.FiniteDuration import org.slf4j.Logger +import scala.concurrent.duration.Duration -trait Periodic[F[_], T] extends AutoCloseable { - def start( +trait Periodic[F[_], R[_], T] extends AutoCloseable { + def scheduleNow( log: Logger, + operationName: String, + fn: () => F[T], + onSuccess: T => Unit, + handleError: PartialFunction[Throwable, F[T]], + blockUntilCompleteTimeout: Option[Duration] = None + ): R[Unit] + + def scheduleRecurring( + log: Logger, + operationName: String, initialDelay: FiniteDuration, fn: () => F[T], - interval: F[T] => F[FiniteDuration], + onSuccess: T => Unit, + interval: T => FiniteDuration, attemptStrategy: UpdateAttemptStrategy ): Unit } diff --git a/periodic-core/src/main/scala/ca/dvgi/periodic/jdk/Evaluator.scala b/periodic-core/src/main/scala/ca/dvgi/periodic/jdk/Evaluator.scala new file mode 100644 index 0000000..f5fe817 --- /dev/null +++ b/periodic-core/src/main/scala/ca/dvgi/periodic/jdk/Evaluator.scala @@ -0,0 +1,19 @@ +package ca.dvgi.periodic.jdk + +import scala.concurrent.Future +import scala.concurrent.Await +import scala.concurrent.duration.Duration + +trait Evaluator[F[_]] { + def apply[A](fa: F[A]): A +} + +object Evaluator { + implicit val futureEvaluator: Evaluator[Future] = new Evaluator[Future] { + def apply[A](fa: Future[A]): A = Await.result(fa, Duration.Inf) + } + + implicit val identityEvaluator: Evaluator[Identity] = new Evaluator[Identity] { + def apply[A](fa: Identity[A]): A = fa + } +} diff --git a/periodic-core/src/main/scala/ca/dvgi/periodic/jdk/FutureJdkAutoUpdater.scala b/periodic-core/src/main/scala/ca/dvgi/periodic/jdk/FutureJdkAutoUpdater.scala deleted file mode 100644 index f156e10..0000000 --- a/periodic-core/src/main/scala/ca/dvgi/periodic/jdk/FutureJdkAutoUpdater.scala +++ /dev/null @@ -1,23 +0,0 @@ -package ca.dvgi.periodic.jdk - -import scala.concurrent.duration.Duration -import java.util.concurrent.ScheduledExecutorService -import scala.concurrent.Future -import scala.concurrent.Await - -class FutureJdkAutoUpdater[T]( - blockUntilReadyTimeout: Option[Duration] = None, - executorOverride: Option[ScheduledExecutorService] = None -) extends JdkAutoUpdater[Future, T](blockUntilReadyTimeout, executorOverride) { - override protected def evalUpdate[A](ua: Future[A]): A = Await.result(ua, Duration.Inf) - - override protected def pureUpdate[A](a: A): Future[A] = Future.successful(a) -} - -object FutureJdkAutoUpdater { - def apply[T]( - blockUntilReadyTimeout: Option[Duration] = None, - executorOverride: Option[ScheduledExecutorService] = None - ): FutureJdkAutoUpdater[T] = - new FutureJdkAutoUpdater(blockUntilReadyTimeout, executorOverride) -} diff --git a/periodic-core/src/main/scala/ca/dvgi/periodic/jdk/IdentityJdkAutoUpdater.scala b/periodic-core/src/main/scala/ca/dvgi/periodic/jdk/IdentityJdkAutoUpdater.scala deleted file mode 100644 index 8c37f28..0000000 --- a/periodic-core/src/main/scala/ca/dvgi/periodic/jdk/IdentityJdkAutoUpdater.scala +++ /dev/null @@ -1,21 +0,0 @@ -package ca.dvgi.periodic.jdk - -import scala.concurrent.duration.Duration -import java.util.concurrent.ScheduledExecutorService - -class IdentityJdkAutoUpdater[T]( - blockUntilReadyTimeout: Option[Duration] = None, - executorOverride: Option[ScheduledExecutorService] = None -) extends JdkAutoUpdater[Identity, T](blockUntilReadyTimeout, executorOverride) { - override protected def evalUpdate[A](ua: Identity[A]): A = ua - - override protected def pureUpdate[A](a: A): Identity[A] = a -} - -object IdentityJdkAutoUpdater { - def apply[T]( - blockUntilReadyTimeout: Option[Duration] = None, - executorOverride: Option[ScheduledExecutorService] = None - ): IdentityJdkAutoUpdater[T] = - new IdentityJdkAutoUpdater(blockUntilReadyTimeout, executorOverride) -} diff --git a/periodic-core/src/main/scala/ca/dvgi/periodic/jdk/JdkAutoUpdater.scala b/periodic-core/src/main/scala/ca/dvgi/periodic/jdk/JdkAutoUpdater.scala deleted file mode 100644 index 85e6cf4..0000000 --- a/periodic-core/src/main/scala/ca/dvgi/periodic/jdk/JdkAutoUpdater.scala +++ /dev/null @@ -1,114 +0,0 @@ -package ca.dvgi.periodic.jdk - -import ca.dvgi.periodic._ -import scala.concurrent.duration._ -import scala.util.control.NonFatal -import java.util.concurrent.Executors -import java.util.concurrent.ScheduledExecutorService -import java.util.concurrent.TimeUnit -import scala.concurrent.Future -import scala.concurrent.Promise -import scala.util.Try -import org.slf4j.Logger -import scala.util.Success -import scala.util.Failure -import scala.concurrent.Await - -/** An AutoUpdater based on the JDK's ScheduledExecutorService. - * - * By default, a JdkAutoUpdater starts a new thread to handle its updates. If you are running many - * JdkAutoUpdaters, you may want to consider providing a shared ScheduledExecutorService to them. - * - * @param blockUntilReadyTimeout - * If specified, will block the calling AutoUpdatingVar instantiation until it succeeds, fails, - * or the timeout is reached. - * @param executorOverride - * If present, will be used instead of starting a new thread. - */ -abstract class JdkAutoUpdater[U[T], T]( - blockUntilReadyTimeout: Option[Duration] = None, - executorOverride: Option[ScheduledExecutorService] = None -) extends AutoUpdater[U, Future, T] { - - protected def evalUpdate[A](ua: U[A]): A - - protected def pureUpdate[A](a: A): U[A] - - private val executor = executorOverride.getOrElse(Executors.newScheduledThreadPool(1)) - - private val periodic = new JdkPeriodic[U, T]("var update", evalUpdate(_), executor) - - @volatile private var variable: Option[T] = None - - override def start( - log: Logger, - updateVar: () => U[T], - updateInterval: UpdateInterval[T], - updateAttemptStrategy: UpdateAttemptStrategy, - handleInitializationError: PartialFunction[Throwable, U[T]] - ): Future[Unit] = { - val ready = Promise[Unit]() - - executor.schedule( - new Runnable { - def run(): Unit = { - val tryV = - Try(try { - try { - log.info("Attempting initialization...") - evalUpdate(updateVar()) - } catch { - case NonFatal(e) => - log.error("Failed to initialize var", e) - throw e - } - } catch (handleInitializationError.andThen(evalUpdate _))) - - tryV match { - case Success(value) => - variable = Some(value) - ready.complete(Success(())) - log.info("Successfully initialized") - periodic.start( - log, - updateInterval.duration(value), - () => { - val ut = updateVar() - val t = evalUpdate(ut) - variable = Some(t) - ut - }, - ut => { - val t = evalUpdate(ut) - pureUpdate(updateInterval.duration(t)) - }, - updateAttemptStrategy - ) - case Failure(e) => - ready.complete(Failure(e)) - } - } - }, - 0, - TimeUnit.NANOSECONDS - ) - - blockUntilReadyTimeout match { - case Some(timeout) => - Try(Await.result(ready.future, timeout)) match { - case Success(_) => Future.successful(()) - case Failure(exception) => throw exception - } - case None => ready.future - } - } - - override def latest: Option[T] = variable - - override def close(): Unit = { - periodic.close() - if (executorOverride.isEmpty) { - val _ = executor.shutdownNow() - } - } -} diff --git a/periodic-core/src/main/scala/ca/dvgi/periodic/jdk/JdkPeriodic.scala b/periodic-core/src/main/scala/ca/dvgi/periodic/jdk/JdkPeriodic.scala index 8e270ed..04fc978 100644 --- a/periodic-core/src/main/scala/ca/dvgi/periodic/jdk/JdkPeriodic.scala +++ b/periodic-core/src/main/scala/ca/dvgi/periodic/jdk/JdkPeriodic.scala @@ -7,12 +7,22 @@ import java.util.concurrent.ScheduledFuture import java.util.concurrent.ScheduledExecutorService import scala.util.control.NonFatal import org.slf4j.Logger +import scala.concurrent.Future +import scala.concurrent.Promise +import scala.util.Success +import scala.util.Failure +import java.util.concurrent.TimeUnit +import scala.util.Try +import java.util.concurrent.Executors +import scala.concurrent.duration.Duration +import scala.concurrent.Await class JdkPeriodic[F[_], T]( - operationName: String, - evalF: F[FiniteDuration] => FiniteDuration, - executor: ScheduledExecutorService -) extends Periodic[F, T] { + executorOverride: Option[ScheduledExecutorService] = None +)(implicit evalF: Evaluator[F]) + extends Periodic[F, Future, T] { + + private val executor = executorOverride.getOrElse(Executors.newScheduledThreadPool(1)) private case object CloseLock @@ -20,20 +30,76 @@ class JdkPeriodic[F[_], T]( @volatile private var nextTask: Option[ScheduledFuture[_]] = None - override def start( + def scheduleNow( + log: Logger, + operationName: String, + fn: () => F[T], + onSuccess: T => Unit, + handleError: PartialFunction[Throwable, F[T]], + blockUntilCompleteTimeout: Option[Duration] = None + ): Future[Unit] = { + val ready = Promise[Unit]() + + executor.schedule( + new Runnable { + def run(): Unit = { + val tryFn = + Try(try { + try { + log.info(s"Attempting to $operationName...") + evalF(fn()) + } catch { + case NonFatal(e) => + log.error(s"Failed to $operationName", e) + throw e + } + } catch { + case NonFatal(t) => + evalF(handleError.applyOrElse(t, (t: Throwable) => throw t)) + }) + + tryFn match { + case Success(value) => + onSuccess(value) + ready.complete(Success(())) + log.info(s"Successfully completed $operationName") + case Failure(e) => + ready.complete(Failure(e)) + } + } + }, + 0, + TimeUnit.NANOSECONDS + ) + + blockUntilCompleteTimeout match { + case Some(timeout) => + Try(Await.result(ready.future, timeout)) match { + case Success(_) => Future.successful(()) + case Failure(exception) => throw exception + } + case None => ready.future + } + } + + override def scheduleRecurring( log: Logger, + operationName: String, initialDelay: FiniteDuration, fn: () => F[T], - interval: F[T] => F[FiniteDuration], + onSuccess: T => Unit, + interval: T => FiniteDuration, attemptStrategy: UpdateAttemptStrategy ): Unit = { - scheduleUpdate(initialDelay)(log, fn, interval, attemptStrategy) + scheduleNext(initialDelay)(log, operationName, fn, onSuccess, interval, attemptStrategy) } - private def scheduleUpdate(nextUpdate: FiniteDuration)(implicit + private def scheduleNext(nextUpdate: FiniteDuration)(implicit log: Logger, + operationName: String, fn: () => F[T], - interval: F[T] => F[FiniteDuration], + onSuccess: T => Unit, + interval: T => FiniteDuration, attemptStrategy: UpdateAttemptStrategy ): Unit = { CloseLock.synchronized { @@ -51,16 +117,19 @@ class JdkPeriodic[F[_], T]( private class FnRunnable(attempt: Int)(implicit log: Logger, + operationName: String, fn: () => F[T], - interval: F[T] => F[FiniteDuration], + onSuccess: T => Unit, + interval: T => FiniteDuration, attemptStrategy: UpdateAttemptStrategy ) extends Runnable { def run(): Unit = { try { log.info(s"Attempting $operationName...") - val result = fn() + val result = evalF(fn()) log.info(s"Successfully executed $operationName") - scheduleUpdate(evalF(interval(result))) + onSuccess(result) + scheduleNext(interval(result)) } catch { case NonFatal(e) => attemptStrategy match { @@ -82,7 +151,8 @@ class JdkPeriodic[F[_], T]( private def reattempt(e: Throwable, delay: FiniteDuration)(implicit log: Logger, fn: () => F[T], - interval: F[T] => F[FiniteDuration], + onSuccess: T => Unit, + interval: T => FiniteDuration, attemptStrategy: UpdateAttemptStrategy ): Unit = { log.warn( @@ -108,7 +178,18 @@ class JdkPeriodic[F[_], T]( CloseLock.synchronized { closed = true nextTask.foreach(_.cancel(true)) + if (executorOverride.isEmpty) { + val _ = executor.shutdownNow() + } } () } } + +object JdkPeriodic { + def apply[F[_], T]( + executorOverride: Option[ScheduledExecutorService] = None + )(implicit evalF: Evaluator[F]): JdkPeriodic[F, T] = { + new JdkPeriodic[F, T](executorOverride) + } +} diff --git a/periodic-core/src/test/scala/ca/dvgi/periodic/AutoUpdaterTestsFuture.scala b/periodic-core/src/test/scala/ca/dvgi/periodic/AutoUpdaterTestsFuture.scala index 755587e..74cd8bd 100644 --- a/periodic-core/src/test/scala/ca/dvgi/periodic/AutoUpdaterTestsFuture.scala +++ b/periodic-core/src/test/scala/ca/dvgi/periodic/AutoUpdaterTestsFuture.scala @@ -6,7 +6,7 @@ import scala.concurrent.Future import scala.util.Success import scala.concurrent.Await -trait AutoUpdaterTestsFuture[U[_]] extends FunSuite { +trait AutoUpdatingVarTestsFuture[U[_]] extends FunSuite { case object TestException extends RuntimeException @@ -31,10 +31,10 @@ trait AutoUpdaterTestsFuture[U[_]] extends FunSuite { def evalU[T](ut: U[T]): T - def testAll(autoUpdater: Option[Duration] => AutoUpdater[U, Future, Int])(implicit + def testAll(periodic: () => Periodic[U, Future, Int])(implicit loc: munit.Location ): Unit = { - implicit val au = autoUpdater + implicit val per = periodic testBasicsWithBlocking() testAdjustsUpdateInterval() @@ -55,15 +55,16 @@ trait AutoUpdaterTestsFuture[U[_]] extends FunSuite { def testBasicsWithBlocking( )(implicit loc: munit.Location, - autoUpdater: Option[Duration] => AutoUpdater[U, Future, Int] + periodic: () => Periodic[U, Future, Int] ): Unit = { FunFixture( _ => { val holder = new VarHolder - val v = new AutoUpdatingVar(autoUpdater(Some(1.second)))( + val v = new AutoUpdatingVar(periodic())( holder.get, UpdateInterval.Static(1.seconds), - UpdateAttemptStrategy.Infinite(1.second) + UpdateAttemptStrategy.Infinite(1.second), + Some(1.second) ) (v, holder) }, @@ -96,16 +97,17 @@ trait AutoUpdaterTestsFuture[U[_]] extends FunSuite { def testAdjustsUpdateInterval( )(implicit loc: munit.Location, - autoUpdater: Option[Duration] => AutoUpdater[U, Future, Int] + periodic: () => Periodic[U, Future, Int] ): Unit = { FunFixture( _ => { val holder = new VarHolder() - val v = new AutoUpdatingVar(autoUpdater(Some(1.second)))( + val v = new AutoUpdatingVar(periodic())( holder.get, UpdateInterval.Dynamic((i: Int) => i * 1.second), - UpdateAttemptStrategy.Infinite(1.second) + UpdateAttemptStrategy.Infinite(1.second), + Some(1.second) ) (v, holder) }, @@ -136,12 +138,12 @@ trait AutoUpdaterTestsFuture[U[_]] extends FunSuite { def testReturnsFailedReady( )(implicit loc: munit.Location, - autoUpdater: Option[Duration] => AutoUpdater[U, Future, Int] + periodic: () => Periodic[U, Future, Int] ): Unit = { FunFixture( _ => { - new AutoUpdatingVar(autoUpdater(None))( + new AutoUpdatingVar(periodic())( pureU(throw TestException), UpdateInterval.Static(1.seconds), UpdateAttemptStrategy.Infinite(1.second) @@ -156,12 +158,12 @@ trait AutoUpdaterTestsFuture[U[_]] extends FunSuite { def testThrowsFromLatest( )(implicit loc: munit.Location, - autoUpdater: Option[Duration] => AutoUpdater[U, Future, Int] + periodic: () => Periodic[U, Future, Int] ): Unit = { FunFixture( _ => { - new AutoUpdatingVar(autoUpdater(None))( + new AutoUpdatingVar(periodic())( pureU { Thread.sleep(1000) 1 @@ -179,17 +181,18 @@ trait AutoUpdaterTestsFuture[U[_]] extends FunSuite { def testThrowsFromConstructor( )(implicit loc: munit.Location, - autoUpdater: Option[Duration] => AutoUpdater[U, Future, Int] + periodic: () => Periodic[U, Future, Int] ): Unit = { test( "returns a failed future from constructor if the first update fails and instructed to block" ) { intercept[TestException.type] { - new AutoUpdatingVar(autoUpdater(Some(1.second)))( + new AutoUpdatingVar(periodic())( pureU(throw TestException), UpdateInterval.Static(1.seconds), - UpdateAttemptStrategy.Infinite(1.second) + UpdateAttemptStrategy.Infinite(1.second), + Some(1.second) ) } } @@ -198,15 +201,16 @@ trait AutoUpdaterTestsFuture[U[_]] extends FunSuite { def testHandlesInititializationErrors( )(implicit loc: munit.Location, - autoUpdater: Option[Duration] => AutoUpdater[U, Future, Int] + periodic: () => Periodic[U, Future, Int] ): Unit = { FunFixture( _ => { - new AutoUpdatingVar(autoUpdater(Some(1.second)))( + new AutoUpdatingVar(periodic())( pureU(throw TestException), UpdateInterval.Static(1.seconds), UpdateAttemptStrategy.Infinite(1.second), + Some(1.second), { case _ => pureU(42) } @@ -223,17 +227,18 @@ trait AutoUpdaterTestsFuture[U[_]] extends FunSuite { def testInfiniteReattempts( )(implicit loc: munit.Location, - autoUpdater: Option[Duration] => AutoUpdater[U, Future, Int] + periodic: () => Periodic[U, Future, Int] ): Unit = { FunFixture( _ => { val holder = new VarErrorHolder val v = - new AutoUpdatingVar(autoUpdater(Some(1.second)))( + new AutoUpdatingVar(periodic())( holder.get, UpdateInterval.Static(1.second), UpdateAttemptStrategy.Infinite(1.second), + Some(1.second), { case _ => pureU(42) } @@ -260,7 +265,7 @@ trait AutoUpdaterTestsFuture[U[_]] extends FunSuite { def testFiniteReattempts( )(implicit loc: munit.Location, - autoUpdater: Option[Duration] => AutoUpdater[U, Future, Int] + periodic: () => Periodic[U, Future, Int] ): Unit = { var terminated = false @@ -268,11 +273,12 @@ trait AutoUpdaterTestsFuture[U[_]] extends FunSuite { _ => { val holder = new VarErrorHolder val v = - new AutoUpdatingVar(autoUpdater(Some(1.second)))( + new AutoUpdatingVar(periodic())( holder.get, UpdateInterval.Static(1.second), UpdateAttemptStrategy .Finite(1.second, 2, UpdateAttemptExhaustionBehavior.Custom(_ => terminated = true)), + Some(1.second), { case _ => pureU(42) } diff --git a/periodic-core/src/test/scala/ca/dvgi/periodic/jdk/FutureJdkAutoUpdaterTest.scala b/periodic-core/src/test/scala/ca/dvgi/periodic/jdk/FutureJdkAutoUpdatingVarTest.scala similarity index 77% rename from periodic-core/src/test/scala/ca/dvgi/periodic/jdk/FutureJdkAutoUpdaterTest.scala rename to periodic-core/src/test/scala/ca/dvgi/periodic/jdk/FutureJdkAutoUpdatingVarTest.scala index ed1bd3d..c06ba6a 100644 --- a/periodic-core/src/test/scala/ca/dvgi/periodic/jdk/FutureJdkAutoUpdaterTest.scala +++ b/periodic-core/src/test/scala/ca/dvgi/periodic/jdk/FutureJdkAutoUpdatingVarTest.scala @@ -7,16 +7,16 @@ import java.util.concurrent.ScheduledExecutorService import scala.concurrent.Await import scala.concurrent.Future -class FutureJdkAutoUpdaterTest extends AutoUpdaterTestsFuture[Future] { +class FutureJdkAutoUpdatingVarTest extends AutoUpdatingVarTestsFuture[Future] { implicit val ec: scala.concurrent.ExecutionContext = scala.concurrent.ExecutionContext.global def evalU[T](ut: Future[T]): T = Await.result(ut, Duration.Inf) def pureU(thunk: => Int): Future[Int] = Future(thunk) - def autoUpdaterBuilder() = new FutureJdkAutoUpdater[Int](_, None) + def periodicBuilder() = new JdkPeriodic[Future, Int]() - testAll(autoUpdaterBuilder()) + testAll(periodicBuilder) FunFixture( _ => { @@ -24,11 +24,12 @@ class FutureJdkAutoUpdaterTest extends AutoUpdaterTestsFuture[Future] { val ses = Executors.newScheduledThreadPool(1) val v = new AutoUpdatingVar( - new FutureJdkAutoUpdater[Int](Some(1.second), executorOverride = Some(ses)) + JdkPeriodic[Future, Int](Some(ses)) )( holder.get, UpdateInterval.Static(2.seconds), - UpdateAttemptStrategy.Infinite(1.second) + UpdateAttemptStrategy.Infinite(1.second), + Some(1.second) ) (v, holder, ses) }, diff --git a/periodic-core/src/test/scala/ca/dvgi/periodic/jdk/IdentityJdkAutoUpdaterTest.scala b/periodic-core/src/test/scala/ca/dvgi/periodic/jdk/IdentityJdkAutoUpdatingVarTest.scala similarity index 71% rename from periodic-core/src/test/scala/ca/dvgi/periodic/jdk/IdentityJdkAutoUpdaterTest.scala rename to periodic-core/src/test/scala/ca/dvgi/periodic/jdk/IdentityJdkAutoUpdatingVarTest.scala index f8ea123..dda71d9 100644 --- a/periodic-core/src/test/scala/ca/dvgi/periodic/jdk/IdentityJdkAutoUpdaterTest.scala +++ b/periodic-core/src/test/scala/ca/dvgi/periodic/jdk/IdentityJdkAutoUpdatingVarTest.scala @@ -5,27 +5,28 @@ import scala.concurrent.duration._ import java.util.concurrent.Executors import java.util.concurrent.ScheduledExecutorService -class IdentityJdkAutoUpdaterTest extends AutoUpdaterTestsFuture[Identity] { +class IdentityJdkAutoUpdatingVarTest extends AutoUpdatingVarTestsFuture[Identity] { def evalU[T](ut: Identity[T]): T = ut def pureU(thunk: => Int): Identity[Int] = thunk - def autoUpdaterBuilder() = new IdentityJdkAutoUpdater[Int](_, None) + def periodicBuilder() = new JdkPeriodic[Identity, Int] - testAll(autoUpdaterBuilder()) + testAll(periodicBuilder) FunFixture( _ => { val holder = new VarHolder val ses = Executors.newScheduledThreadPool(1) val v = - new AutoUpdatingVar( - new IdentityJdkAutoUpdater[Int](Some(1.second), executorOverride = Some(ses)) + AutoUpdatingVar( + JdkPeriodic[Identity, Int](Some(ses)) )( holder.get, UpdateInterval.Static(2.seconds), - UpdateAttemptStrategy.Infinite(1.second) + UpdateAttemptStrategy.Infinite(1.second), + Some(1.second) ) (v, holder, ses) }, From 1b7d6daf622439f2e7a45d17abe5ba6c67698895 Mon Sep 17 00:00:00 2001 From: David van Geest Date: Mon, 30 Oct 2023 21:24:43 -0400 Subject: [PATCH 3/5] Convert Pekko Streams impl. --- .../ca/dvgi/periodic/jdk/JdkPeriodic.scala | 112 +++++++----- .../stream/PekkoStreamsAutoUpdater.scala | 162 ----------------- .../pekko/stream/PekkoStreamsPeriodic.scala | 171 ++++++++++++++++++ ... => PekkoStreamsAutoUpdatingVarTest.scala} | 8 +- 4 files changed, 238 insertions(+), 215 deletions(-) delete mode 100644 periodic-pekko-stream/src/main/scala/ca/dvgi/periodic/pekko/stream/PekkoStreamsAutoUpdater.scala create mode 100644 periodic-pekko-stream/src/main/scala/ca/dvgi/periodic/pekko/stream/PekkoStreamsPeriodic.scala rename periodic-pekko-stream/src/test/scala/ca/dvgi/periodic/pekko/stream/{PekkoStreamsAutoUpdaterTest.scala => PekkoStreamsAutoUpdatingVarTest.scala} (72%) diff --git a/periodic-core/src/main/scala/ca/dvgi/periodic/jdk/JdkPeriodic.scala b/periodic-core/src/main/scala/ca/dvgi/periodic/jdk/JdkPeriodic.scala index 04fc978..1ad3586 100644 --- a/periodic-core/src/main/scala/ca/dvgi/periodic/jdk/JdkPeriodic.scala +++ b/periodic-core/src/main/scala/ca/dvgi/periodic/jdk/JdkPeriodic.scala @@ -28,9 +28,11 @@ class JdkPeriodic[F[_], T]( @volatile private var closed = false - @volatile private var nextTask: Option[ScheduledFuture[_]] = None + @volatile private var nowTask: Option[ScheduledFuture[_]] = None - def scheduleNow( + @volatile private var recurringTask: Option[ScheduledFuture[_]] = None + + override def scheduleNow( log: Logger, operationName: String, fn: () => F[T], @@ -40,37 +42,45 @@ class JdkPeriodic[F[_], T]( ): Future[Unit] = { val ready = Promise[Unit]() - executor.schedule( - new Runnable { - def run(): Unit = { - val tryFn = - Try(try { - try { - log.info(s"Attempting to $operationName...") - evalF(fn()) - } catch { - case NonFatal(e) => - log.error(s"Failed to $operationName", e) - throw e + CloseLock.synchronized { + if (!closed) { + nowTask = Some( + executor.schedule( + new Runnable { + def run(): Unit = { + val tryFn = + Try(try { + try { + log.info(s"Attempting to $operationName...") + evalF(fn()) + } catch { + case NonFatal(e) => + log.warn(s"Failed to $operationName", e) + throw e + } + } catch { + case NonFatal(t) => + evalF(handleError.applyOrElse(t, (t: Throwable) => throw t)) + }) + + tryFn match { + case Success(value) => + onSuccess(value) + ready.complete(Success(())) + log.info(s"Successfully completed $operationName") + case Failure(e) => + ready.complete(Failure(e)) + } } - } catch { - case NonFatal(t) => - evalF(handleError.applyOrElse(t, (t: Throwable) => throw t)) - }) - - tryFn match { - case Success(value) => - onSuccess(value) - ready.complete(Success(())) - log.info(s"Successfully completed $operationName") - case Failure(e) => - ready.complete(Failure(e)) - } - } - }, - 0, - TimeUnit.NANOSECONDS - ) + }, + 0, + TimeUnit.NANOSECONDS + ) + ) + } else { + log.warn("Can't scheduleNow because JdkPeriodic is closing") + } + } blockUntilCompleteTimeout match { case Some(timeout) => @@ -94,7 +104,19 @@ class JdkPeriodic[F[_], T]( scheduleNext(initialDelay)(log, operationName, fn, onSuccess, interval, attemptStrategy) } - private def scheduleNext(nextUpdate: FiniteDuration)(implicit + override def close(): Unit = { + CloseLock.synchronized { + closed = true + nowTask.foreach(_.cancel(true)) + recurringTask.foreach(_.cancel(true)) + if (executorOverride.isEmpty) { + val _ = executor.shutdownNow() + } + } + () + } + + private def scheduleNext(delay: FiniteDuration)(implicit log: Logger, operationName: String, fn: () => F[T], @@ -103,14 +125,17 @@ class JdkPeriodic[F[_], T]( attemptStrategy: UpdateAttemptStrategy ): Unit = { CloseLock.synchronized { - if (!closed) - nextTask = Some( + if (!closed) { + log.info(s"Scheduling next $operationName in: $delay") + + recurringTask = Some( executor.schedule( new FnRunnable(1), - nextUpdate.length, - nextUpdate.unit + delay.length, + delay.unit ) ) + } } () } @@ -162,7 +187,7 @@ class JdkPeriodic[F[_], T]( CloseLock.synchronized { if (!closed) - nextTask = Some( + recurringTask = Some( executor.schedule( new FnRunnable(attempt + 1), delay.length, @@ -173,17 +198,6 @@ class JdkPeriodic[F[_], T]( () } } - - override def close(): Unit = { - CloseLock.synchronized { - closed = true - nextTask.foreach(_.cancel(true)) - if (executorOverride.isEmpty) { - val _ = executor.shutdownNow() - } - } - () - } } object JdkPeriodic { diff --git a/periodic-pekko-stream/src/main/scala/ca/dvgi/periodic/pekko/stream/PekkoStreamsAutoUpdater.scala b/periodic-pekko-stream/src/main/scala/ca/dvgi/periodic/pekko/stream/PekkoStreamsAutoUpdater.scala deleted file mode 100644 index 2a9f7be..0000000 --- a/periodic-pekko-stream/src/main/scala/ca/dvgi/periodic/pekko/stream/PekkoStreamsAutoUpdater.scala +++ /dev/null @@ -1,162 +0,0 @@ -package ca.dvgi.periodic.pekko.stream - -import ca.dvgi.periodic._ -import org.slf4j.Logger -import org.apache.pekko.NotUsed -import org.apache.pekko.actor.ActorSystem -import org.apache.pekko.stream.KillSwitches -import org.apache.pekko.stream.scaladsl.Source -import scala.concurrent.Await -import scala.concurrent.duration._ -import scala.concurrent.Future -import scala.util.control.NonFatal -import scala.concurrent.ExecutionContext -import scala.util.Success -import scala.util.Failure -import scala.util.Try - -/** An AutoUpdater based on Pekko Streams. - * - * Recommended when Pekko is already in use, since it is completely non-blocking, does not require - * additional resources, and will scale to many AutoUpdatingVars without any tuning. - * - * @param blockUntilReadyTimeout - * If specified, will block the calling AutoUpdatingVar instantiation until it succeeds, fails, - * or the timeout is reached. - * @param actorSystem - * An ActorSystem used to update the var. - */ -class PekkoStreamsAutoUpdater[T](blockUntilReadyTimeout: Option[Duration] = None)(implicit - actorSystem: ActorSystem -) extends AutoUpdater[Future, Future, T] { - import PekkoStreamsAutoUpdater._ - - implicit private val ec: ExecutionContext = actorSystem.dispatcher - - private val killSwitch = KillSwitches.shared("close") - - @volatile private var variable: Option[T] = None - - def start( - log: Logger, - updateVar: () => Future[T], - updateInterval: UpdateInterval[T], - updateAttemptStrategy: UpdateAttemptStrategy, - handleInitializationError: PartialFunction[Throwable, Future[T]] - ): Future[Unit] = { - log.info("Attempting initialization...") - - val ready = updateVar() - .recover { case NonFatal(e) => - log.error(s"Failed to initialize var", e) - throw e - } - .recoverWith(handleInitializationError) - .map { v => - log.info(s"Successfully initialized") - variable = Some(v) - scheduleUpdate(updateInterval.duration(v))( - log, - updateVar, - updateInterval, - updateAttemptStrategy - ) - () - } - - blockUntilReadyTimeout match { - case Some(timeout) => - Try(Await.result(ready, timeout)) match { - case Success(_) => Future.successful(()) - case Failure(exception) => throw exception - } - case None => ready - } - } - - def latest: Option[T] = variable - - def close(): Unit = { - killSwitch.shutdown() - } - - private def scheduleUpdate(nextUpdate: FiniteDuration)(implicit - log: Logger, - updateVar: () => Future[T], - updateInterval: UpdateInterval[T], - updateAttemptStrategy: UpdateAttemptStrategy - ): Unit = { - log.info(s"Scheduling update of var in: $nextUpdate") - - val maxUpdateAttempts = updateAttemptStrategy match { - case UpdateAttemptStrategy.Infinite(_) => - -1 // signifies infinite attempts to recoverWithRetries - case s: UpdateAttemptStrategy.Finite => - // maxAttempts required to be > 0 - s.maxAttempts - 1 - } - - val varUpdate = buildVarSource(nextUpdate) - .recoverWithRetries( - attempts = maxUpdateAttempts, - { case e: UpdateException => - log.warn( - s"Unhandled exception when trying to update var, retrying in ${updateAttemptStrategy.attemptInterval}", - e.cause - ) - buildVarSource(updateAttemptStrategy.attemptInterval) - } - ) - - varUpdate - .via(killSwitch.flow) - .runForeach { newVar => - variable = Some(newVar) - scheduleUpdate(updateInterval.duration(newVar)) - } - .failed - .foreach { - case e: UpdateException => - log.error(s"Var update retries exhausted! Final attempt exception", e.cause) - updateAttemptStrategy match { - case s: UpdateAttemptStrategy.Finite => s.attemptExhaustionBehavior.run(log) - case _ => - // should never happen - log.error( - "Somehow exhausted infinite attempts! Something is very wrong. Attempting to exit..." - ) - UpdateAttemptExhaustionBehavior.Terminate().run(log) - } - case e => - log.error(s"Unhandled library exception, attempting to exit...", e) - UpdateAttemptExhaustionBehavior.Terminate().run(log) - } - } - - private def buildVarSource(nextUpdate: FiniteDuration)(implicit - log: Logger, - updateVar: () => Future[T] - ): Source[T, NotUsed] = { - Source - .single(()) - .delay(nextUpdate) - .mapAsync(1) { _ => - updateVar() - .map { v => - log.info(s"Successfully updated") - v - } - .recover { case NonFatal(e) => - throw new UpdateException(e) - } - } - } -} - -object PekkoStreamsAutoUpdater { - def apply[T](blockUntilReadyTimeout: Option[Duration] = None)(implicit - actorSystem: ActorSystem - ): PekkoStreamsAutoUpdater[T] = new PekkoStreamsAutoUpdater(blockUntilReadyTimeout) - - private case class UpdateException(cause: Throwable) extends RuntimeException -} diff --git a/periodic-pekko-stream/src/main/scala/ca/dvgi/periodic/pekko/stream/PekkoStreamsPeriodic.scala b/periodic-pekko-stream/src/main/scala/ca/dvgi/periodic/pekko/stream/PekkoStreamsPeriodic.scala new file mode 100644 index 0000000..11af3f3 --- /dev/null +++ b/periodic-pekko-stream/src/main/scala/ca/dvgi/periodic/pekko/stream/PekkoStreamsPeriodic.scala @@ -0,0 +1,171 @@ +package ca.dvgi.periodic.pekko.stream + +import ca.dvgi.periodic._ +import org.slf4j.Logger +import org.apache.pekko.NotUsed +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.stream.KillSwitches +import org.apache.pekko.stream.scaladsl.Source +import scala.concurrent.Await +import scala.concurrent.duration._ +import scala.concurrent.Future +import scala.util.control.NonFatal +import scala.concurrent.ExecutionContext +import scala.util.Success +import scala.util.Failure +import scala.util.Try + +/** A Periodic based on Pekko Streams. + * + * Recommended when Pekko is already in use, since it is completely non-blocking, does not require + * additional resources, and will scale to many usages without any tuning. + * + * @param actorSystem + * An ActorSystem used to execute periodic actions. + */ +class PekkoStreamsPeriodic[T](implicit + actorSystem: ActorSystem +) extends Periodic[Future, Future, T] { + import PekkoStreamsPeriodic._ + + implicit private val ec: ExecutionContext = actorSystem.dispatcher + + private val killSwitch = KillSwitches.shared("close") + + override def scheduleNow( + log: Logger, + operationName: String, + fn: () => Future[T], + onSuccess: T => Unit, + handleError: PartialFunction[Throwable, Future[T]], + blockUntilCompleteTimeout: Option[Duration] = None + ): Future[Unit] = { + log.info(s"Attempting to $operationName...") + + val result = fn() + .recover { case NonFatal(e) => + log.warn(s"Failed to $operationName", e) + throw e + } + .recoverWith { case NonFatal(t) => + handleError.applyOrElse(t, (t: Throwable) => throw t) + } + .map { v => + onSuccess(v) + log.info(s"Successfully completed $operationName") + () + } + + blockUntilCompleteTimeout match { + case Some(timeout) => + Try(Await.result(result, timeout)) match { + case Success(_) => Future.successful(()) + case Failure(exception) => throw exception + } + case None => result + } + } + + override def scheduleRecurring( + log: Logger, + operationName: String, + initialDelay: FiniteDuration, + fn: () => Future[T], + onSuccess: T => Unit, + interval: T => FiniteDuration, + attemptStrategy: UpdateAttemptStrategy + ): Unit = { + scheduleNext(initialDelay)(log, operationName, fn, onSuccess, interval, attemptStrategy) + } + + override def close(): Unit = { + killSwitch.shutdown() + } + + private def scheduleNext(delay: FiniteDuration)(implicit + log: Logger, + operationName: String, + fn: () => Future[T], + onSuccess: T => Unit, + interval: T => FiniteDuration, + attemptStrategy: UpdateAttemptStrategy + ): Unit = { + log.info(s"Scheduling next $operationName in: $delay") + + val maxAttempts = attemptStrategy match { + case UpdateAttemptStrategy.Infinite(_) => + -1 // signifies infinite attempts to recoverWithRetries + case s: UpdateAttemptStrategy.Finite => + // maxAttempts required to be > 0 + s.maxAttempts - 1 + } + + val runFn = buildRunFnSource(delay) + .recoverWithRetries( + attempts = maxAttempts, + { case e: RunFnException => + log.warn( + s"Unhandled exception when trying to $operationName, retrying in ${attemptStrategy.attemptInterval}", + e.cause + ) + buildRunFnSource(attemptStrategy.attemptInterval) + } + ) + + runFn + .via(killSwitch.flow) + .runForeach { result => + log.info(s"Successfully executed $operationName") + onSuccess(result) + scheduleNext(interval(result)) + } + .failed + .foreach { + case e: RunFnException => + log.error( + s"${operationName.capitalize} retries exhausted! Final attempt exception", + e.cause + ) + attemptStrategy match { + case s: UpdateAttemptStrategy.Finite => s.attemptExhaustionBehavior.run(log) + case _ => + // should never happen + log.error( + "Somehow exhausted infinite attempts! Something is very wrong. Attempting to exit..." + ) + UpdateAttemptExhaustionBehavior.Terminate().run(log) + } + case e => + log.error(s"Unhandled library exception, attempting to exit...", e) + UpdateAttemptExhaustionBehavior.Terminate().run(log) + } + } + + private def buildRunFnSource(delay: FiniteDuration)(implicit + log: Logger, + operationName: String, + fn: () => Future[T] + ): Source[T, NotUsed] = { + Source + .single(()) + .delay(delay) + .mapAsync(1) { _ => + log.info(s"Attempting $operationName...") + fn() + .map { v => + v + } + .recover { case NonFatal(e) => + throw new RunFnException(e) + } + } + } +} + +object PekkoStreamsPeriodic { + def apply[T]()(implicit + actorSystem: ActorSystem + ): PekkoStreamsPeriodic[T] = new PekkoStreamsPeriodic[T] + + private case class RunFnException(cause: Throwable) extends RuntimeException +} diff --git a/periodic-pekko-stream/src/test/scala/ca/dvgi/periodic/pekko/stream/PekkoStreamsAutoUpdaterTest.scala b/periodic-pekko-stream/src/test/scala/ca/dvgi/periodic/pekko/stream/PekkoStreamsAutoUpdatingVarTest.scala similarity index 72% rename from periodic-pekko-stream/src/test/scala/ca/dvgi/periodic/pekko/stream/PekkoStreamsAutoUpdaterTest.scala rename to periodic-pekko-stream/src/test/scala/ca/dvgi/periodic/pekko/stream/PekkoStreamsAutoUpdatingVarTest.scala index fa74b02..0c43c3d 100644 --- a/periodic-pekko-stream/src/test/scala/ca/dvgi/periodic/pekko/stream/PekkoStreamsAutoUpdaterTest.scala +++ b/periodic-pekko-stream/src/test/scala/ca/dvgi/periodic/pekko/stream/PekkoStreamsAutoUpdatingVarTest.scala @@ -7,7 +7,7 @@ import org.apache.pekko.actor.ActorSystem import scala.concurrent.Await import scala.concurrent.ExecutionContext -class PekkoStreamsAutoUpdaterTest extends AutoUpdaterTestsFuture[Future] { +class PekkoStreamsAutoUpdatingVarTest extends AutoUpdatingVarTestsFuture[Future] { implicit var actorSystem: ActorSystem = _ implicit var ec: ExecutionContext = _ @@ -25,8 +25,8 @@ class PekkoStreamsAutoUpdaterTest extends AutoUpdaterTestsFuture[Future] { def pureU(thunk: => Int): Future[Int] = Future(thunk) - def autoUpdaterBuilder(): Option[Duration] => PekkoStreamsAutoUpdater[Int] = - new PekkoStreamsAutoUpdater[Int](_)(actorSystem) + def periodicBuilder(): () => Periodic[Future, Future, Int] = + () => PekkoStreamsPeriodic[Int]() - testAll(autoUpdaterBuilder()) + testAll(periodicBuilder()) } From cab168efe30cb3f3a617aef2d1cc477e2228c185 Mon Sep 17 00:00:00 2001 From: David van Geest Date: Mon, 30 Oct 2023 21:34:22 -0400 Subject: [PATCH 4/5] Cleanup. --- README.md | 4 ++-- ....scala => AttemptExhaustionBehavior.scala} | 10 ++++---- ...ptStrategy.scala => AttemptStrategy.scala} | 11 ++++----- .../ca/dvgi/periodic/AutoUpdatingVar.scala | 16 ++++++++----- .../scala/ca/dvgi/periodic/Periodic.scala | 2 +- .../jdk/{Evaluator.scala => Eval.scala} | 8 +++---- .../ca/dvgi/periodic/jdk/JdkPeriodic.scala | 23 +++++++++---------- .../periodic/AutoUpdaterTestsFuture.scala | 18 +++++++-------- .../jdk/FutureJdkAutoUpdatingVarTest.scala | 2 +- .../jdk/IdentityJdkAutoUpdatingVarTest.scala | 2 +- .../pekko/stream/PekkoStreamsPeriodic.scala | 18 +++++++-------- 11 files changed, 58 insertions(+), 56 deletions(-) rename periodic-core/src/main/scala/ca/dvgi/periodic/{UpdateAttemptExhaustionBehavior.scala => AttemptExhaustionBehavior.scala} (57%) rename periodic-core/src/main/scala/ca/dvgi/periodic/{UpdateAttemptStrategy.scala => AttemptStrategy.scala} (62%) rename periodic-core/src/main/scala/ca/dvgi/periodic/jdk/{Evaluator.scala => Eval.scala} (58%) diff --git a/README.md b/README.md index 0cd875b..050b489 100644 --- a/README.md +++ b/README.md @@ -54,7 +54,7 @@ val data = AutoUpdatingVar.jdk( // or AutoUpdatingVar.jdkFuture if updateData re // can also be dynamic based on the last data UpdateInterval.Static(1.second), // can also be finite with configurable behavior for attempt exhaustion - UpdateAttemptStrategy.Infinite(5.seconds), + AttemptStrategy.Infinite(5.seconds), ) // `ready` returns a `Future[Unit]` which completes when the initial data initialization is complete @@ -106,7 +106,7 @@ val data = AutoUpdatingVar( )( updateData(), UpdateInterval.Static(1.second), - UpdateAttemptStrategy.Infinite(5.seconds) + AttemptStrategy.Infinite(5.seconds) ) ``` diff --git a/periodic-core/src/main/scala/ca/dvgi/periodic/UpdateAttemptExhaustionBehavior.scala b/periodic-core/src/main/scala/ca/dvgi/periodic/AttemptExhaustionBehavior.scala similarity index 57% rename from periodic-core/src/main/scala/ca/dvgi/periodic/UpdateAttemptExhaustionBehavior.scala rename to periodic-core/src/main/scala/ca/dvgi/periodic/AttemptExhaustionBehavior.scala index 32470f5..629a928 100644 --- a/periodic-core/src/main/scala/ca/dvgi/periodic/UpdateAttemptExhaustionBehavior.scala +++ b/periodic-core/src/main/scala/ca/dvgi/periodic/AttemptExhaustionBehavior.scala @@ -2,16 +2,16 @@ package ca.dvgi.periodic import org.slf4j.Logger -sealed trait UpdateAttemptExhaustionBehavior { +sealed trait AttemptExhaustionBehavior { def run: Logger => Unit def description: String } -object UpdateAttemptExhaustionBehavior { - case class Terminate(exitCode: Int = 1) extends UpdateAttemptExhaustionBehavior { +object AttemptExhaustionBehavior { + case class Terminate(exitCode: Int = 1) extends AttemptExhaustionBehavior { def run: Logger => Unit = log => { log.error( - s"Var update attempts exhausted, will now attempt to exit the process with exit code: $exitCode..." + s"Attempts exhausted, will now attempt to exit the process with exit code: $exitCode..." ) sys.exit(exitCode) } @@ -20,7 +20,7 @@ object UpdateAttemptExhaustionBehavior { } case class Custom(run: Logger => Unit, descriptionOverride: Option[String] = None) - extends UpdateAttemptExhaustionBehavior { + extends AttemptExhaustionBehavior { val description: String = descriptionOverride.getOrElse("Run custom logic") } } diff --git a/periodic-core/src/main/scala/ca/dvgi/periodic/UpdateAttemptStrategy.scala b/periodic-core/src/main/scala/ca/dvgi/periodic/AttemptStrategy.scala similarity index 62% rename from periodic-core/src/main/scala/ca/dvgi/periodic/UpdateAttemptStrategy.scala rename to periodic-core/src/main/scala/ca/dvgi/periodic/AttemptStrategy.scala index a971423..72e8f7c 100644 --- a/periodic-core/src/main/scala/ca/dvgi/periodic/UpdateAttemptStrategy.scala +++ b/periodic-core/src/main/scala/ca/dvgi/periodic/AttemptStrategy.scala @@ -2,22 +2,21 @@ package ca.dvgi.periodic import scala.concurrent.duration.FiniteDuration -sealed trait UpdateAttemptStrategy { +sealed trait AttemptStrategy { def attemptInterval: FiniteDuration def description: String } -object UpdateAttemptStrategy { - case class Infinite(attemptInterval: FiniteDuration) extends UpdateAttemptStrategy { +object AttemptStrategy { + case class Infinite(attemptInterval: FiniteDuration) extends AttemptStrategy { val description = s"Attempt update indefinitely every $attemptInterval" } case class Finite( attemptInterval: FiniteDuration, maxAttempts: Int, - attemptExhaustionBehavior: UpdateAttemptExhaustionBehavior = - UpdateAttemptExhaustionBehavior.Terminate() - ) extends UpdateAttemptStrategy { + attemptExhaustionBehavior: AttemptExhaustionBehavior = AttemptExhaustionBehavior.Terminate() + ) extends AttemptStrategy { require(maxAttempts > 0) val description = diff --git a/periodic-core/src/main/scala/ca/dvgi/periodic/AutoUpdatingVar.scala b/periodic-core/src/main/scala/ca/dvgi/periodic/AutoUpdatingVar.scala index 2fc5328..9bc266d 100644 --- a/periodic-core/src/main/scala/ca/dvgi/periodic/AutoUpdatingVar.scala +++ b/periodic-core/src/main/scala/ca/dvgi/periodic/AutoUpdatingVar.scala @@ -27,6 +27,10 @@ import java.util.concurrent.ScheduledExecutorService * Configuration for the update interval * @param updateAttemptStrategy * Configuration for retrying updates on failure + * @param blockUntilReadyTimeout + * If specified, will cause the AutoUpdatingVar constructor to block until an initial value is + * computed, or there is a timeout or failure. This means that the effect returned by `ready` + * will always be complete. * @param handleInitializationError * A PartialFunction used to recover from exceptions in the var initialization. If unspecified, * the exception will fail the effect returned by `ready`. @@ -37,7 +41,7 @@ import java.util.concurrent.ScheduledExecutorService class AutoUpdatingVar[U[_], R[_], T](periodic: Periodic[U, R, T])( updateVar: => U[T], updateInterval: UpdateInterval[T], - updateAttemptStrategy: UpdateAttemptStrategy, + updateAttemptStrategy: AttemptStrategy, blockUntilReadyTimeout: Option[Duration] = None, handleInitializationError: PartialFunction[Throwable, U[T]] = PartialFunction.empty, varNameOverride: Option[String] = None @@ -106,7 +110,7 @@ object AutoUpdatingVar { def apply[U[_], R[_], T](periodic: Periodic[U, R, T])( updateVar: => U[T], updateInterval: UpdateInterval[T], - updateAttemptStrategy: UpdateAttemptStrategy, + updateAttemptStrategy: AttemptStrategy, blockUntilReadyTimeout: Option[Duration] = None, handleInitializationError: PartialFunction[Throwable, U[T]] = PartialFunction.empty, varNameOverride: Option[String] = None @@ -124,14 +128,14 @@ object AutoUpdatingVar { /** An AutoUpdatingVar based on only the JDK. * * @see - * [[ca.dvgi.periodic.jdk.JdkAutoUpdater]] + * [[ca.dvgi.periodic.jdk.JdkPeriodic]] * @see * [[ca.dvgi.periodic.AutoUpdatingVar]] */ def jdk[T]( updateVar: => T, updateInterval: UpdateInterval[T], - updateAttemptStrategy: UpdateAttemptStrategy, + updateAttemptStrategy: AttemptStrategy, blockUntilReadyTimeout: Option[Duration] = None, handleInitializationError: PartialFunction[Throwable, T] = PartialFunction.empty, varNameOverride: Option[String] = None, @@ -152,14 +156,14 @@ object AutoUpdatingVar { /** An AutoUpdatingVar based on only the JDK, for use when `updateVar` returns a `Future`. * * @see - * [[ca.dvgi.periodic.jdk.JdkAutoUpdater]] + * [[ca.dvgi.periodic.jdk.JdkPeriodic]] * @see * [[ca.dvgi.periodic.AutoUpdatingVar]] */ def jdkFuture[T]( updateVar: => Future[T], updateInterval: UpdateInterval[T], - updateAttemptStrategy: UpdateAttemptStrategy, + updateAttemptStrategy: AttemptStrategy, blockUntilReadyTimeout: Option[Duration] = None, handleInitializationError: PartialFunction[Throwable, Future[T]] = PartialFunction.empty, varNameOverride: Option[String] = None, diff --git a/periodic-core/src/main/scala/ca/dvgi/periodic/Periodic.scala b/periodic-core/src/main/scala/ca/dvgi/periodic/Periodic.scala index 3a6e27e..df0460b 100644 --- a/periodic-core/src/main/scala/ca/dvgi/periodic/Periodic.scala +++ b/periodic-core/src/main/scala/ca/dvgi/periodic/Periodic.scala @@ -21,6 +21,6 @@ trait Periodic[F[_], R[_], T] extends AutoCloseable { fn: () => F[T], onSuccess: T => Unit, interval: T => FiniteDuration, - attemptStrategy: UpdateAttemptStrategy + attemptStrategy: AttemptStrategy ): Unit } diff --git a/periodic-core/src/main/scala/ca/dvgi/periodic/jdk/Evaluator.scala b/periodic-core/src/main/scala/ca/dvgi/periodic/jdk/Eval.scala similarity index 58% rename from periodic-core/src/main/scala/ca/dvgi/periodic/jdk/Evaluator.scala rename to periodic-core/src/main/scala/ca/dvgi/periodic/jdk/Eval.scala index f5fe817..2529a2a 100644 --- a/periodic-core/src/main/scala/ca/dvgi/periodic/jdk/Evaluator.scala +++ b/periodic-core/src/main/scala/ca/dvgi/periodic/jdk/Eval.scala @@ -4,16 +4,16 @@ import scala.concurrent.Future import scala.concurrent.Await import scala.concurrent.duration.Duration -trait Evaluator[F[_]] { +trait Eval[F[_]] { def apply[A](fa: F[A]): A } -object Evaluator { - implicit val futureEvaluator: Evaluator[Future] = new Evaluator[Future] { +object Eval { + implicit val futureEval: Eval[Future] = new Eval[Future] { def apply[A](fa: Future[A]): A = Await.result(fa, Duration.Inf) } - implicit val identityEvaluator: Evaluator[Identity] = new Evaluator[Identity] { + implicit val identityEval: Eval[Identity] = new Eval[Identity] { def apply[A](fa: Identity[A]): A = fa } } diff --git a/periodic-core/src/main/scala/ca/dvgi/periodic/jdk/JdkPeriodic.scala b/periodic-core/src/main/scala/ca/dvgi/periodic/jdk/JdkPeriodic.scala index 1ad3586..45bcfd1 100644 --- a/periodic-core/src/main/scala/ca/dvgi/periodic/jdk/JdkPeriodic.scala +++ b/periodic-core/src/main/scala/ca/dvgi/periodic/jdk/JdkPeriodic.scala @@ -1,7 +1,7 @@ package ca.dvgi.periodic.jdk import scala.concurrent.duration.FiniteDuration -import ca.dvgi.periodic.UpdateAttemptStrategy +import ca.dvgi.periodic.AttemptStrategy import ca.dvgi.periodic.Periodic import java.util.concurrent.ScheduledFuture import java.util.concurrent.ScheduledExecutorService @@ -19,7 +19,7 @@ import scala.concurrent.Await class JdkPeriodic[F[_], T]( executorOverride: Option[ScheduledExecutorService] = None -)(implicit evalF: Evaluator[F]) +)(implicit evalF: Eval[F]) extends Periodic[F, Future, T] { private val executor = executorOverride.getOrElse(Executors.newScheduledThreadPool(1)) @@ -99,7 +99,7 @@ class JdkPeriodic[F[_], T]( fn: () => F[T], onSuccess: T => Unit, interval: T => FiniteDuration, - attemptStrategy: UpdateAttemptStrategy + attemptStrategy: AttemptStrategy ): Unit = { scheduleNext(initialDelay)(log, operationName, fn, onSuccess, interval, attemptStrategy) } @@ -122,7 +122,7 @@ class JdkPeriodic[F[_], T]( fn: () => F[T], onSuccess: T => Unit, interval: T => FiniteDuration, - attemptStrategy: UpdateAttemptStrategy + attemptStrategy: AttemptStrategy ): Unit = { CloseLock.synchronized { if (!closed) { @@ -146,7 +146,7 @@ class JdkPeriodic[F[_], T]( fn: () => F[T], onSuccess: T => Unit, interval: T => FiniteDuration, - attemptStrategy: UpdateAttemptStrategy + attemptStrategy: AttemptStrategy ) extends Runnable { def run(): Unit = { try { @@ -158,12 +158,11 @@ class JdkPeriodic[F[_], T]( } catch { case NonFatal(e) => attemptStrategy match { - case UpdateAttemptStrategy.Infinite(attemptInterval) => + case AttemptStrategy.Infinite(attemptInterval) => reattempt(e, attemptInterval) - case UpdateAttemptStrategy.Finite(attemptInterval, maxAttempts, _) - if attempt < maxAttempts => + case AttemptStrategy.Finite(attemptInterval, maxAttempts, _) if attempt < maxAttempts => reattempt(e, attemptInterval) - case UpdateAttemptStrategy.Finite(_, _, attemptExhaustionBehavior) => + case AttemptStrategy.Finite(_, _, attemptExhaustionBehavior) => log.error( s"${operationName.capitalize} attempts exhausted! Final attempt exception", e @@ -178,10 +177,10 @@ class JdkPeriodic[F[_], T]( fn: () => F[T], onSuccess: T => Unit, interval: T => FiniteDuration, - attemptStrategy: UpdateAttemptStrategy + attemptStrategy: AttemptStrategy ): Unit = { log.warn( - s"Unhandled exception when trying to $operationName, retrying in $delay", + s"Unhandled exception during $operationName, retrying in $delay", e ) @@ -203,7 +202,7 @@ class JdkPeriodic[F[_], T]( object JdkPeriodic { def apply[F[_], T]( executorOverride: Option[ScheduledExecutorService] = None - )(implicit evalF: Evaluator[F]): JdkPeriodic[F, T] = { + )(implicit evalF: Eval[F]): JdkPeriodic[F, T] = { new JdkPeriodic[F, T](executorOverride) } } diff --git a/periodic-core/src/test/scala/ca/dvgi/periodic/AutoUpdaterTestsFuture.scala b/periodic-core/src/test/scala/ca/dvgi/periodic/AutoUpdaterTestsFuture.scala index 74cd8bd..a0984c3 100644 --- a/periodic-core/src/test/scala/ca/dvgi/periodic/AutoUpdaterTestsFuture.scala +++ b/periodic-core/src/test/scala/ca/dvgi/periodic/AutoUpdaterTestsFuture.scala @@ -63,7 +63,7 @@ trait AutoUpdatingVarTestsFuture[U[_]] extends FunSuite { val v = new AutoUpdatingVar(periodic())( holder.get, UpdateInterval.Static(1.seconds), - UpdateAttemptStrategy.Infinite(1.second), + AttemptStrategy.Infinite(1.second), Some(1.second) ) (v, holder) @@ -106,7 +106,7 @@ trait AutoUpdatingVarTestsFuture[U[_]] extends FunSuite { val v = new AutoUpdatingVar(periodic())( holder.get, UpdateInterval.Dynamic((i: Int) => i * 1.second), - UpdateAttemptStrategy.Infinite(1.second), + AttemptStrategy.Infinite(1.second), Some(1.second) ) (v, holder) @@ -146,7 +146,7 @@ trait AutoUpdatingVarTestsFuture[U[_]] extends FunSuite { new AutoUpdatingVar(periodic())( pureU(throw TestException), UpdateInterval.Static(1.seconds), - UpdateAttemptStrategy.Infinite(1.second) + AttemptStrategy.Infinite(1.second) ) }, (f: AutoCloseable) => f.close() @@ -169,7 +169,7 @@ trait AutoUpdatingVarTestsFuture[U[_]] extends FunSuite { 1 }, UpdateInterval.Static(1.seconds), - UpdateAttemptStrategy.Infinite(1.second) + AttemptStrategy.Infinite(1.second) ) }, (f: AutoCloseable) => f.close() @@ -191,7 +191,7 @@ trait AutoUpdatingVarTestsFuture[U[_]] extends FunSuite { new AutoUpdatingVar(periodic())( pureU(throw TestException), UpdateInterval.Static(1.seconds), - UpdateAttemptStrategy.Infinite(1.second), + AttemptStrategy.Infinite(1.second), Some(1.second) ) } @@ -209,7 +209,7 @@ trait AutoUpdatingVarTestsFuture[U[_]] extends FunSuite { new AutoUpdatingVar(periodic())( pureU(throw TestException), UpdateInterval.Static(1.seconds), - UpdateAttemptStrategy.Infinite(1.second), + AttemptStrategy.Infinite(1.second), Some(1.second), { case _ => pureU(42) @@ -237,7 +237,7 @@ trait AutoUpdatingVarTestsFuture[U[_]] extends FunSuite { new AutoUpdatingVar(periodic())( holder.get, UpdateInterval.Static(1.second), - UpdateAttemptStrategy.Infinite(1.second), + AttemptStrategy.Infinite(1.second), Some(1.second), { case _ => pureU(42) @@ -276,8 +276,8 @@ trait AutoUpdatingVarTestsFuture[U[_]] extends FunSuite { new AutoUpdatingVar(periodic())( holder.get, UpdateInterval.Static(1.second), - UpdateAttemptStrategy - .Finite(1.second, 2, UpdateAttemptExhaustionBehavior.Custom(_ => terminated = true)), + AttemptStrategy + .Finite(1.second, 2, AttemptExhaustionBehavior.Custom(_ => terminated = true)), Some(1.second), { case _ => pureU(42) diff --git a/periodic-core/src/test/scala/ca/dvgi/periodic/jdk/FutureJdkAutoUpdatingVarTest.scala b/periodic-core/src/test/scala/ca/dvgi/periodic/jdk/FutureJdkAutoUpdatingVarTest.scala index c06ba6a..f3a68e3 100644 --- a/periodic-core/src/test/scala/ca/dvgi/periodic/jdk/FutureJdkAutoUpdatingVarTest.scala +++ b/periodic-core/src/test/scala/ca/dvgi/periodic/jdk/FutureJdkAutoUpdatingVarTest.scala @@ -28,7 +28,7 @@ class FutureJdkAutoUpdatingVarTest extends AutoUpdatingVarTestsFuture[Future] { )( holder.get, UpdateInterval.Static(2.seconds), - UpdateAttemptStrategy.Infinite(1.second), + AttemptStrategy.Infinite(1.second), Some(1.second) ) (v, holder, ses) diff --git a/periodic-core/src/test/scala/ca/dvgi/periodic/jdk/IdentityJdkAutoUpdatingVarTest.scala b/periodic-core/src/test/scala/ca/dvgi/periodic/jdk/IdentityJdkAutoUpdatingVarTest.scala index dda71d9..aed35b6 100644 --- a/periodic-core/src/test/scala/ca/dvgi/periodic/jdk/IdentityJdkAutoUpdatingVarTest.scala +++ b/periodic-core/src/test/scala/ca/dvgi/periodic/jdk/IdentityJdkAutoUpdatingVarTest.scala @@ -25,7 +25,7 @@ class IdentityJdkAutoUpdatingVarTest extends AutoUpdatingVarTestsFuture[Identity )( holder.get, UpdateInterval.Static(2.seconds), - UpdateAttemptStrategy.Infinite(1.second), + AttemptStrategy.Infinite(1.second), Some(1.second) ) (v, holder, ses) diff --git a/periodic-pekko-stream/src/main/scala/ca/dvgi/periodic/pekko/stream/PekkoStreamsPeriodic.scala b/periodic-pekko-stream/src/main/scala/ca/dvgi/periodic/pekko/stream/PekkoStreamsPeriodic.scala index 11af3f3..71f4e04 100644 --- a/periodic-pekko-stream/src/main/scala/ca/dvgi/periodic/pekko/stream/PekkoStreamsPeriodic.scala +++ b/periodic-pekko-stream/src/main/scala/ca/dvgi/periodic/pekko/stream/PekkoStreamsPeriodic.scala @@ -73,7 +73,7 @@ class PekkoStreamsPeriodic[T](implicit fn: () => Future[T], onSuccess: T => Unit, interval: T => FiniteDuration, - attemptStrategy: UpdateAttemptStrategy + attemptStrategy: AttemptStrategy ): Unit = { scheduleNext(initialDelay)(log, operationName, fn, onSuccess, interval, attemptStrategy) } @@ -88,14 +88,14 @@ class PekkoStreamsPeriodic[T](implicit fn: () => Future[T], onSuccess: T => Unit, interval: T => FiniteDuration, - attemptStrategy: UpdateAttemptStrategy + attemptStrategy: AttemptStrategy ): Unit = { log.info(s"Scheduling next $operationName in: $delay") val maxAttempts = attemptStrategy match { - case UpdateAttemptStrategy.Infinite(_) => + case AttemptStrategy.Infinite(_) => -1 // signifies infinite attempts to recoverWithRetries - case s: UpdateAttemptStrategy.Finite => + case s: AttemptStrategy.Finite => // maxAttempts required to be > 0 s.maxAttempts - 1 } @@ -105,7 +105,7 @@ class PekkoStreamsPeriodic[T](implicit attempts = maxAttempts, { case e: RunFnException => log.warn( - s"Unhandled exception when trying to $operationName, retrying in ${attemptStrategy.attemptInterval}", + s"Unhandled exception during $operationName, retrying in ${attemptStrategy.attemptInterval}", e.cause ) buildRunFnSource(attemptStrategy.attemptInterval) @@ -127,17 +127,17 @@ class PekkoStreamsPeriodic[T](implicit e.cause ) attemptStrategy match { - case s: UpdateAttemptStrategy.Finite => s.attemptExhaustionBehavior.run(log) - case _ => + case s: AttemptStrategy.Finite => s.attemptExhaustionBehavior.run(log) + case _ => // should never happen log.error( "Somehow exhausted infinite attempts! Something is very wrong. Attempting to exit..." ) - UpdateAttemptExhaustionBehavior.Terminate().run(log) + AttemptExhaustionBehavior.Terminate().run(log) } case e => log.error(s"Unhandled library exception, attempting to exit...", e) - UpdateAttemptExhaustionBehavior.Terminate().run(log) + AttemptExhaustionBehavior.Terminate().run(log) } } From 86074e494191996a9bb334cc11154f26b9947015 Mon Sep 17 00:00:00 2001 From: David van Geest Date: Mon, 30 Oct 2023 21:58:28 -0400 Subject: [PATCH 5/5] Docs. --- README.md | 46 +++++++++++++++++++++++++++++++++------------- 1 file changed, 33 insertions(+), 13 deletions(-) diff --git a/README.md b/README.md index 050b489..eb2e726 100644 --- a/README.md +++ b/README.md @@ -1,16 +1,23 @@ # Periodic [![Maven](https://img.shields.io/maven-central/v/ca.dvgi/periodic-core_2.13?color=blue)](https://search.maven.org/search?q=g:ca.dvgi%20periodic) [![CI](https://img.shields.io/github/actions/workflow/status/dvgica/periodic/ci.yml?branch=main)](https://github.com/dvgica/periodic/actions) -Periodic is a low-dependency Scala library providing an in-memory cached variable (`AutoUpdatingVar`) that self-updates on a periodic basis. +Periodic is a low-dependency Scala library providing: + +- an in-memory cached variable (`AutoUpdatingVar`) that self-updates on a periodic basis + +It attempts to provide an effect- and runtime-agnostic API which can abstract various implementations as needed. - [Motivation](#motivation) - [Installation](#installation) -- [Usage Example](#usage-example) +- [Usage](#usage-example) - [Contributing](#contributing) ## Motivation -This library is useful for caching semi-static data in memory and having that data be automatically and periodically updated. The source of the data is typically another process that can be queried to get a new data value. If the cached data becomes stale at a predictable interval, the cached data can be updated before this occurs. If the cached data becomes stale at unpredictable times, the stale data must still be usable. Concrete use cases include: +It is fairly common to need to do something periodically, while a process is running. + +### `AutoUpdatingVar` +`AutoUpdatingVar` is useful for caching semi-static data in memory and having that data be automatically and periodically updated. The source of the data is typically another process that can be queried to get a new data value. If the cached data becomes stale at a predictable interval, the cached data can be updated before this occurs. If the cached data becomes stale at unpredictable times, the stale data must still be usable. Concrete use cases include: - caching a time-limited key or token, and replacing it with a new one before it expires (e.g. an OAuth access token) - caching data that changes irregularly and occasionally, such as a list of a country's airports and their codes @@ -37,9 +44,27 @@ For the **Pekko Streams-based implementation**, use this dependency: - `periodic-core` depends only on `slf4j-api` - `periodic-pekko-stream` depends on `pekko-stream` and `periodic-core` -## Usage Example +## Usage + +### Periodic + +All library functionality is based on implementations of `Periodic`. Therefore all classes require an instance of `Periodic` in their constructor. + +#### JDK Implementation + +`JdkPeriodic` is the default implementation provided in `periodic-core`. It is suitable for most usages, although users with many `AutoUpdatingVar`s or `Runner`s may wish to provide a shared `ScheduledExecutorService` to them, to avoid starting many threads. The number of threads in this shared `ScheduledExecutorService` will need to be tuned based on workload. Threads in the `ScheduledExecutorService` will be blocked. -Using the default JDK-based implementation: +The JDK implementation works out of the box with sync (`Identity`) or async (`scala.concurrent.Future`) update code. If usage with another effect is desired, provide a typeclass implementation of `ca.dvgi.periodic.jdk.Eval`. + +#### Pekko Streams Implementation + +The Pekko Streams implementation is completely non-blocking, does not need additional resources besides an `ActorSystem`, and will scale to many `AutoUpdatingVar`s and `Runner`s without requiring tuning. It is recommended if you are already using Pekko or don't mind the extra dependency. + +The Pekko Streams implementation only works with `scala.concurrent.Future`. + +### `AutoUpdatingVar` + +#### Default JDK-based Implementation ``` scala import ca.dvgi.periodic._ @@ -82,16 +107,11 @@ New cached data is 2023-10-19T02:35:23.474155Z For handling errors during update, and other options, see the Scaladocs. -### Alternate Implementations - -#### Pekko Streams - -The Pekko Streams implementation is completely non-blocking, does not need additional resources besides an `ActorSystem`, and will scale to many `AutoUpdatingVar`s without requiring tuning. -It is recommended if you are already using Pekko. +#### Pekko Streams Implementation ``` scala import org.apache.pekko.actor.ActorSystem -import ca.dvgi.periodic.pekko.stream.PekkoStreamsAutoUpdater +import ca.dvgi.periodic.pekko.stream.PekkoStreamsPeriodic import ca.dvgi.periodic._ import scala.concurrent.duration._ import scala.concurrent.Future @@ -102,7 +122,7 @@ def updateData(): Future[String] = Future.successful(Instant.now.toString) implicit val actorSystem = ActorSystem() // generally you should have an ActorSystem in your process already val data = AutoUpdatingVar( - PekkoStreamsAutoUpdater[String]() // T must be explicitly provided, it can't be inferred + PekkoStreamsPeriodic[String]() // T must be explicitly provided, it can't be inferred )( updateData(), UpdateInterval.Static(1.second),