diff --git a/README.md b/README.md index 560654d..6ecc078 100644 --- a/README.md +++ b/README.md @@ -40,7 +40,7 @@ import java.time.Instant def updateData(): String = Instant.now.toString -val data = AutoUpdatingVar.jdk( +val data = AutoUpdatingVar.jdk( // or `AutoUpdatingVar.jdkFuture` if `updateData` returns a `scala.concurrent.Future` updateData(), // can also be dynamic based on the last data UpdateInterval.Static(1.second), diff --git a/periodic-core/src/main/scala/ca/dvgi/periodic/AutoUpdatingVar.scala b/periodic-core/src/main/scala/ca/dvgi/periodic/AutoUpdatingVar.scala index e889511..8277e85 100644 --- a/periodic-core/src/main/scala/ca/dvgi/periodic/AutoUpdatingVar.scala +++ b/periodic-core/src/main/scala/ca/dvgi/periodic/AutoUpdatingVar.scala @@ -2,8 +2,7 @@ package ca.dvgi.periodic import scala.reflect.ClassTag import org.slf4j.LoggerFactory -import ca.dvgi.periodic.jdk.Identity -import ca.dvgi.periodic.jdk.JdkAutoUpdater +import ca.dvgi.periodic.jdk._ import scala.concurrent.duration.Duration import scala.concurrent.Future import java.util.concurrent.ScheduledExecutorService @@ -122,7 +121,34 @@ object AutoUpdatingVar { executorOverride: Option[ScheduledExecutorService] = None )(implicit ct: ClassTag[T]): AutoUpdatingVar[Identity, Future, T] = { new AutoUpdatingVar( - new JdkAutoUpdater[T](blockUntilReadyTimeout, executorOverride) + new IdentityJdkAutoUpdater[T](blockUntilReadyTimeout, executorOverride) + )( + updateVar, + updateInterval, + updateAttemptStrategy, + handleInitializationError, + varNameOverride + ) + } + + /** An AutoUpdatingVar based on only the JDK, for use when `updateVar` returns a `Future`. + * + * @see + * [[ca.dvgi.periodic.jdk.JdkAutoUpdater]] + * @see + * [[ca.dvgi.periodic.AutoUpdatingVar]] + */ + def jdkFuture[T]( + updateVar: => Future[T], + updateInterval: UpdateInterval[T], + updateAttemptStrategy: UpdateAttemptStrategy, + handleInitializationError: PartialFunction[Throwable, Future[T]] = PartialFunction.empty, + varNameOverride: Option[String] = None, + blockUntilReadyTimeout: Option[Duration] = None, + executorOverride: Option[ScheduledExecutorService] = None + )(implicit ct: ClassTag[T]): AutoUpdatingVar[Future, Future, T] = { + new AutoUpdatingVar( + new FutureJdkAutoUpdater[T](blockUntilReadyTimeout, executorOverride) )( updateVar, updateInterval, diff --git a/periodic-core/src/main/scala/ca/dvgi/periodic/jdk/FutureJdkAutoUpdater.scala b/periodic-core/src/main/scala/ca/dvgi/periodic/jdk/FutureJdkAutoUpdater.scala new file mode 100644 index 0000000..813c80d --- /dev/null +++ b/periodic-core/src/main/scala/ca/dvgi/periodic/jdk/FutureJdkAutoUpdater.scala @@ -0,0 +1,13 @@ +package ca.dvgi.periodic.jdk + +import scala.concurrent.duration.Duration +import java.util.concurrent.ScheduledExecutorService +import scala.concurrent.Future +import scala.concurrent.Await + +class FutureJdkAutoUpdater[T]( + blockUntilReadyTimeout: Option[Duration] = None, + executorOverride: Option[ScheduledExecutorService] = None +) extends JdkAutoUpdater[Future, T](blockUntilReadyTimeout, executorOverride) { + override protected def evalUpdate(ut: Future[T]): T = Await.result(ut, Duration.Inf) +} diff --git a/periodic-core/src/main/scala/ca/dvgi/periodic/jdk/IdentityJdkAutoUpdater.scala b/periodic-core/src/main/scala/ca/dvgi/periodic/jdk/IdentityJdkAutoUpdater.scala new file mode 100644 index 0000000..604218c --- /dev/null +++ b/periodic-core/src/main/scala/ca/dvgi/periodic/jdk/IdentityJdkAutoUpdater.scala @@ -0,0 +1,11 @@ +package ca.dvgi.periodic.jdk + +import scala.concurrent.duration.Duration +import java.util.concurrent.ScheduledExecutorService + +class IdentityJdkAutoUpdater[T]( + blockUntilReadyTimeout: Option[Duration] = None, + executorOverride: Option[ScheduledExecutorService] = None +) extends JdkAutoUpdater[Identity, T](blockUntilReadyTimeout, executorOverride) { + override protected def evalUpdate(ut: Identity[T]): T = ut +} diff --git a/periodic-core/src/main/scala/ca/dvgi/periodic/jdk/JdkAutoUpdater.scala b/periodic-core/src/main/scala/ca/dvgi/periodic/jdk/JdkAutoUpdater.scala index 9149021..09ae852 100644 --- a/periodic-core/src/main/scala/ca/dvgi/periodic/jdk/JdkAutoUpdater.scala +++ b/periodic-core/src/main/scala/ca/dvgi/periodic/jdk/JdkAutoUpdater.scala @@ -26,10 +26,10 @@ import java.util.concurrent.ScheduledFuture * @param executorOverride * If present, will be used instead of starting a new thread. */ -class JdkAutoUpdater[T]( - blockUntilReadyTimeout: Option[Duration] = None, - executorOverride: Option[ScheduledExecutorService] = None -) extends AutoUpdater[Identity, Future, T] { +abstract class JdkAutoUpdater[U[T], T]( + blockUntilReadyTimeout: Option[Duration], + executorOverride: Option[ScheduledExecutorService] +) extends AutoUpdater[U, Future, T] { private val executor = executorOverride.getOrElse(Executors.newScheduledThreadPool(1)) @@ -45,10 +45,10 @@ class JdkAutoUpdater[T]( override def start( log: Logger, - updateVar: () => T, + updateVar: () => U[T], updateInterval: UpdateInterval[T], updateAttemptStrategy: UpdateAttemptStrategy, - handleInitializationError: PartialFunction[Throwable, T] + handleInitializationError: PartialFunction[Throwable, U[T]] ): Future[Unit] = { executor.schedule( new Runnable { @@ -57,13 +57,13 @@ class JdkAutoUpdater[T]( Try(try { try { log.info("Attempting initialization...") - updateVar() + evalUpdate(updateVar()) } catch { case NonFatal(e) => log.error("Failed to initialize var", e) throw e } - } catch (handleInitializationError)) + } catch (handleInitializationError.andThen(evalUpdate _))) tryV match { case Success(value) => @@ -108,9 +108,11 @@ class JdkAutoUpdater[T]( () } + protected def evalUpdate(ut: U[T]): T + private def scheduleUpdate(nextUpdate: FiniteDuration)(implicit log: Logger, - updateVar: () => T, + updateVar: () => U[T], updateInterval: UpdateInterval[T], updateAttemptStrategy: UpdateAttemptStrategy ): Unit = { @@ -131,14 +133,14 @@ class JdkAutoUpdater[T]( private class UpdateVar(attempt: Int)(implicit log: Logger, - updateVar: () => T, + updateVar: () => U[T], updateInterval: UpdateInterval[T], updateAttemptStrategy: UpdateAttemptStrategy ) extends Runnable { def run(): Unit = { log.info("Attempting var update...") try { - val newV = updateVar() + val newV = evalUpdate(updateVar()) variable = Some(newV) log.info("Successfully updated") scheduleUpdate(updateInterval.duration(newV)) @@ -159,7 +161,7 @@ class JdkAutoUpdater[T]( private def reattempt(e: Throwable, delay: FiniteDuration)(implicit log: Logger, - updateVar: () => T, + updateVar: () => U[T], updateInterval: UpdateInterval[T], updateAttemptStrategy: UpdateAttemptStrategy ): Unit = { diff --git a/periodic-core/src/test/scala/ca/dvgi/periodic/jdk/JdkAutoUpdaterTest.scala b/periodic-core/src/test/scala/ca/dvgi/periodic/jdk/JdkAutoUpdaterTest.scala index 8eaf51f..db52814 100644 --- a/periodic-core/src/test/scala/ca/dvgi/periodic/jdk/JdkAutoUpdaterTest.scala +++ b/periodic-core/src/test/scala/ca/dvgi/periodic/jdk/JdkAutoUpdaterTest.scala @@ -31,7 +31,7 @@ class JdkAutoUpdaterTest extends munit.FunSuite { FunFixture( _ => { val holder = new VarHolder - val v = new AutoUpdatingVar(new JdkAutoUpdater[Int](Some(5.seconds)))( + val v = new AutoUpdatingVar(new IdentityJdkAutoUpdater[Int](Some(5.seconds)))( holder.get, UpdateInterval.Static(1.seconds), UpdateAttemptStrategy.Infinite(1.second) @@ -66,7 +66,7 @@ class JdkAutoUpdaterTest extends munit.FunSuite { FunFixture( _ => { val holder = new VarHolder - val v = new AutoUpdatingVar(new JdkAutoUpdater[Int](Some(1.second)))( + val v = new AutoUpdatingVar(new IdentityJdkAutoUpdater[Int](Some(1.second)))( holder.get, UpdateInterval.Dynamic((i: Int) => i * 1.second), UpdateAttemptStrategy.Infinite(1.second) @@ -98,7 +98,7 @@ class JdkAutoUpdaterTest extends munit.FunSuite { FunFixture( _ => { - new AutoUpdatingVar(new JdkAutoUpdater[Int]())( + new AutoUpdatingVar(new IdentityJdkAutoUpdater[Int]())( throw TestException, UpdateInterval.Static(1.seconds), UpdateAttemptStrategy.Infinite(1.second) @@ -111,7 +111,7 @@ class JdkAutoUpdaterTest extends munit.FunSuite { FunFixture( _ => { - new AutoUpdatingVar(new JdkAutoUpdater[Int]())( + new AutoUpdatingVar(new IdentityJdkAutoUpdater[Int]())( { Thread.sleep(1000) 1 @@ -129,7 +129,7 @@ class JdkAutoUpdaterTest extends munit.FunSuite { "returns a failed future from constructor if the first update fails and instructed to block" ) { intercept[TestException.type] { - new AutoUpdatingVar(new JdkAutoUpdater[Int](Some(1.second)))( + new AutoUpdatingVar(new IdentityJdkAutoUpdater[Int](Some(1.second)))( throw TestException, UpdateInterval.Static(1.seconds), UpdateAttemptStrategy.Infinite(1.second) @@ -139,7 +139,7 @@ class JdkAutoUpdaterTest extends munit.FunSuite { FunFixture( _ => { - new AutoUpdatingVar(new JdkAutoUpdater[Int](Some(1.second)))( + new AutoUpdatingVar(new IdentityJdkAutoUpdater[Int](Some(1.second)))( throw TestException, UpdateInterval.Static(1.seconds), UpdateAttemptStrategy.Infinite(1.second), @@ -159,7 +159,7 @@ class JdkAutoUpdaterTest extends munit.FunSuite { _ => { val holder = new VarErrorHolder val v = - new AutoUpdatingVar(new JdkAutoUpdater[Int](Some(1.second)))( + new AutoUpdatingVar(new IdentityJdkAutoUpdater[Int](Some(1.second)))( holder.get, UpdateInterval.Static(1.second), UpdateAttemptStrategy.Infinite(1.second), @@ -190,7 +190,7 @@ class JdkAutoUpdaterTest extends munit.FunSuite { _ => { val holder = new VarErrorHolder val v = - new AutoUpdatingVar(new JdkAutoUpdater[Int](Some(1.second)))( + new AutoUpdatingVar(new IdentityJdkAutoUpdater[Int](Some(1.second)))( holder.get, UpdateInterval.Static(1.second), UpdateAttemptStrategy @@ -229,7 +229,9 @@ class JdkAutoUpdaterTest extends munit.FunSuite { val holder = new VarHolder val ses = Executors.newScheduledThreadPool(1) val v = - new AutoUpdatingVar(new JdkAutoUpdater[Int](Some(1.second), executorOverride = Some(ses)))( + new AutoUpdatingVar( + new IdentityJdkAutoUpdater[Int](Some(1.second), executorOverride = Some(ses)) + )( holder.get, UpdateInterval.Static(2.seconds), UpdateAttemptStrategy.Infinite(1.second)