Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Future support to JDK implementation #9

Merged
merged 3 commits into from
Oct 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import java.time.Instant

def updateData(): String = Instant.now.toString

val data = AutoUpdatingVar.jdk(
val data = AutoUpdatingVar.jdk( // or `AutoUpdatingVar.jdkFuture` if `updateData` returns a `scala.concurrent.Future`
updateData(),
// can also be dynamic based on the last data
UpdateInterval.Static(1.second),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ package ca.dvgi.periodic

import scala.reflect.ClassTag
import org.slf4j.LoggerFactory
import ca.dvgi.periodic.jdk.Identity
import ca.dvgi.periodic.jdk.JdkAutoUpdater
import ca.dvgi.periodic.jdk._
import scala.concurrent.duration.Duration
import scala.concurrent.Future
import java.util.concurrent.ScheduledExecutorService
Expand Down Expand Up @@ -122,7 +121,34 @@ object AutoUpdatingVar {
executorOverride: Option[ScheduledExecutorService] = None
)(implicit ct: ClassTag[T]): AutoUpdatingVar[Identity, Future, T] = {
new AutoUpdatingVar(
new JdkAutoUpdater[T](blockUntilReadyTimeout, executorOverride)
new IdentityJdkAutoUpdater[T](blockUntilReadyTimeout, executorOverride)
)(
updateVar,
updateInterval,
updateAttemptStrategy,
handleInitializationError,
varNameOverride
)
}

/** An AutoUpdatingVar based on only the JDK, for use when `updateVar` returns a `Future`.
*
* @see
* [[ca.dvgi.periodic.jdk.JdkAutoUpdater]]
* @see
* [[ca.dvgi.periodic.AutoUpdatingVar]]
*/
def jdkFuture[T](
updateVar: => Future[T],
updateInterval: UpdateInterval[T],
updateAttemptStrategy: UpdateAttemptStrategy,
handleInitializationError: PartialFunction[Throwable, Future[T]] = PartialFunction.empty,
varNameOverride: Option[String] = None,
blockUntilReadyTimeout: Option[Duration] = None,
executorOverride: Option[ScheduledExecutorService] = None
)(implicit ct: ClassTag[T]): AutoUpdatingVar[Future, Future, T] = {
new AutoUpdatingVar(
new FutureJdkAutoUpdater[T](blockUntilReadyTimeout, executorOverride)
)(
updateVar,
updateInterval,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package ca.dvgi.periodic.jdk

import scala.concurrent.duration.Duration
import java.util.concurrent.ScheduledExecutorService
import scala.concurrent.Future
import scala.concurrent.Await

class FutureJdkAutoUpdater[T](
blockUntilReadyTimeout: Option[Duration] = None,
executorOverride: Option[ScheduledExecutorService] = None
) extends JdkAutoUpdater[Future, T](blockUntilReadyTimeout, executorOverride) {
override protected def evalUpdate(ut: Future[T]): T = Await.result(ut, Duration.Inf)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package ca.dvgi.periodic.jdk

import scala.concurrent.duration.Duration
import java.util.concurrent.ScheduledExecutorService

class IdentityJdkAutoUpdater[T](
blockUntilReadyTimeout: Option[Duration] = None,
executorOverride: Option[ScheduledExecutorService] = None
) extends JdkAutoUpdater[Identity, T](blockUntilReadyTimeout, executorOverride) {
override protected def evalUpdate(ut: Identity[T]): T = ut
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ import java.util.concurrent.ScheduledFuture
* @param executorOverride
* If present, will be used instead of starting a new thread.
*/
class JdkAutoUpdater[T](
blockUntilReadyTimeout: Option[Duration] = None,
executorOverride: Option[ScheduledExecutorService] = None
) extends AutoUpdater[Identity, Future, T] {
abstract class JdkAutoUpdater[U[T], T](
blockUntilReadyTimeout: Option[Duration],
executorOverride: Option[ScheduledExecutorService]
) extends AutoUpdater[U, Future, T] {

private val executor = executorOverride.getOrElse(Executors.newScheduledThreadPool(1))

Expand All @@ -45,10 +45,10 @@ class JdkAutoUpdater[T](

override def start(
log: Logger,
updateVar: () => T,
updateVar: () => U[T],
updateInterval: UpdateInterval[T],
updateAttemptStrategy: UpdateAttemptStrategy,
handleInitializationError: PartialFunction[Throwable, T]
handleInitializationError: PartialFunction[Throwable, U[T]]
): Future[Unit] = {
executor.schedule(
new Runnable {
Expand All @@ -57,13 +57,13 @@ class JdkAutoUpdater[T](
Try(try {
try {
log.info("Attempting initialization...")
updateVar()
evalUpdate(updateVar())
} catch {
case NonFatal(e) =>
log.error("Failed to initialize var", e)
throw e
}
} catch (handleInitializationError))
} catch (handleInitializationError.andThen(evalUpdate _)))

tryV match {
case Success(value) =>
Expand Down Expand Up @@ -108,9 +108,11 @@ class JdkAutoUpdater[T](
()
}

protected def evalUpdate(ut: U[T]): T

private def scheduleUpdate(nextUpdate: FiniteDuration)(implicit
log: Logger,
updateVar: () => T,
updateVar: () => U[T],
updateInterval: UpdateInterval[T],
updateAttemptStrategy: UpdateAttemptStrategy
): Unit = {
Expand All @@ -131,14 +133,14 @@ class JdkAutoUpdater[T](

private class UpdateVar(attempt: Int)(implicit
log: Logger,
updateVar: () => T,
updateVar: () => U[T],
updateInterval: UpdateInterval[T],
updateAttemptStrategy: UpdateAttemptStrategy
) extends Runnable {
def run(): Unit = {
log.info("Attempting var update...")
try {
val newV = updateVar()
val newV = evalUpdate(updateVar())
variable = Some(newV)
log.info("Successfully updated")
scheduleUpdate(updateInterval.duration(newV))
Expand All @@ -159,7 +161,7 @@ class JdkAutoUpdater[T](

private def reattempt(e: Throwable, delay: FiniteDuration)(implicit
log: Logger,
updateVar: () => T,
updateVar: () => U[T],
updateInterval: UpdateInterval[T],
updateAttemptStrategy: UpdateAttemptStrategy
): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class JdkAutoUpdaterTest extends munit.FunSuite {
FunFixture(
_ => {
val holder = new VarHolder
val v = new AutoUpdatingVar(new JdkAutoUpdater[Int](Some(5.seconds)))(
val v = new AutoUpdatingVar(new IdentityJdkAutoUpdater[Int](Some(5.seconds)))(
holder.get,
UpdateInterval.Static(1.seconds),
UpdateAttemptStrategy.Infinite(1.second)
Expand Down Expand Up @@ -66,7 +66,7 @@ class JdkAutoUpdaterTest extends munit.FunSuite {
FunFixture(
_ => {
val holder = new VarHolder
val v = new AutoUpdatingVar(new JdkAutoUpdater[Int](Some(1.second)))(
val v = new AutoUpdatingVar(new IdentityJdkAutoUpdater[Int](Some(1.second)))(
holder.get,
UpdateInterval.Dynamic((i: Int) => i * 1.second),
UpdateAttemptStrategy.Infinite(1.second)
Expand Down Expand Up @@ -98,7 +98,7 @@ class JdkAutoUpdaterTest extends munit.FunSuite {

FunFixture(
_ => {
new AutoUpdatingVar(new JdkAutoUpdater[Int]())(
new AutoUpdatingVar(new IdentityJdkAutoUpdater[Int]())(
throw TestException,
UpdateInterval.Static(1.seconds),
UpdateAttemptStrategy.Infinite(1.second)
Expand All @@ -111,7 +111,7 @@ class JdkAutoUpdaterTest extends munit.FunSuite {

FunFixture(
_ => {
new AutoUpdatingVar(new JdkAutoUpdater[Int]())(
new AutoUpdatingVar(new IdentityJdkAutoUpdater[Int]())(
{
Thread.sleep(1000)
1
Expand All @@ -129,7 +129,7 @@ class JdkAutoUpdaterTest extends munit.FunSuite {
"returns a failed future from constructor if the first update fails and instructed to block"
) {
intercept[TestException.type] {
new AutoUpdatingVar(new JdkAutoUpdater[Int](Some(1.second)))(
new AutoUpdatingVar(new IdentityJdkAutoUpdater[Int](Some(1.second)))(
throw TestException,
UpdateInterval.Static(1.seconds),
UpdateAttemptStrategy.Infinite(1.second)
Expand All @@ -139,7 +139,7 @@ class JdkAutoUpdaterTest extends munit.FunSuite {

FunFixture(
_ => {
new AutoUpdatingVar(new JdkAutoUpdater[Int](Some(1.second)))(
new AutoUpdatingVar(new IdentityJdkAutoUpdater[Int](Some(1.second)))(
throw TestException,
UpdateInterval.Static(1.seconds),
UpdateAttemptStrategy.Infinite(1.second),
Expand All @@ -159,7 +159,7 @@ class JdkAutoUpdaterTest extends munit.FunSuite {
_ => {
val holder = new VarErrorHolder
val v =
new AutoUpdatingVar(new JdkAutoUpdater[Int](Some(1.second)))(
new AutoUpdatingVar(new IdentityJdkAutoUpdater[Int](Some(1.second)))(
holder.get,
UpdateInterval.Static(1.second),
UpdateAttemptStrategy.Infinite(1.second),
Expand Down Expand Up @@ -190,7 +190,7 @@ class JdkAutoUpdaterTest extends munit.FunSuite {
_ => {
val holder = new VarErrorHolder
val v =
new AutoUpdatingVar(new JdkAutoUpdater[Int](Some(1.second)))(
new AutoUpdatingVar(new IdentityJdkAutoUpdater[Int](Some(1.second)))(
holder.get,
UpdateInterval.Static(1.second),
UpdateAttemptStrategy
Expand Down Expand Up @@ -229,7 +229,9 @@ class JdkAutoUpdaterTest extends munit.FunSuite {
val holder = new VarHolder
val ses = Executors.newScheduledThreadPool(1)
val v =
new AutoUpdatingVar(new JdkAutoUpdater[Int](Some(1.second), executorOverride = Some(ses)))(
new AutoUpdatingVar(
new IdentityJdkAutoUpdater[Int](Some(1.second), executorOverride = Some(ses))
)(
holder.get,
UpdateInterval.Static(2.seconds),
UpdateAttemptStrategy.Infinite(1.second)
Expand Down