Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework in preparation for Runner #15

Merged
merged 5 commits into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 35 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
# Periodic
[![Maven](https://img.shields.io/maven-central/v/ca.dvgi/periodic-core_2.13?color=blue)](https://search.maven.org/search?q=g:ca.dvgi%20periodic) [![CI](https://img.shields.io/github/actions/workflow/status/dvgica/periodic/ci.yml?branch=main)](https://github.com/dvgica/periodic/actions)

Periodic is a low-dependency Scala library providing an in-memory cached variable (`AutoUpdatingVar`) that self-updates on a periodic basis.
Periodic is a low-dependency Scala library providing:

- an in-memory cached variable (`AutoUpdatingVar`) that self-updates on a periodic basis

It attempts to provide an effect- and runtime-agnostic API which can abstract various implementations as needed.

- [Motivation](#motivation)
- [Installation](#installation)
- [Usage Example](#usage-example)
- [Usage](#usage-example)
- [Contributing](#contributing)

## Motivation

This library is useful for caching semi-static data in memory and having that data be automatically and periodically updated. The source of the data is typically another process that can be queried to get a new data value. If the cached data becomes stale at a predictable interval, the cached data can be updated before this occurs. If the cached data becomes stale at unpredictable times, the stale data must still be usable. Concrete use cases include:
It is fairly common to need to do something periodically, while a process is running.

### `AutoUpdatingVar`
`AutoUpdatingVar` is useful for caching semi-static data in memory and having that data be automatically and periodically updated. The source of the data is typically another process that can be queried to get a new data value. If the cached data becomes stale at a predictable interval, the cached data can be updated before this occurs. If the cached data becomes stale at unpredictable times, the stale data must still be usable. Concrete use cases include:

- caching a time-limited key or token, and replacing it with a new one before it expires (e.g. an OAuth access token)
- caching data that changes irregularly and occasionally, such as a list of a country's airports and their codes
Expand All @@ -37,9 +44,27 @@ For the **Pekko Streams-based implementation**, use this dependency:
- `periodic-core` depends only on `slf4j-api`
- `periodic-pekko-stream` depends on `pekko-stream` and `periodic-core`

## Usage Example
## Usage

### Periodic

All library functionality is based on implementations of `Periodic`. Therefore all classes require an instance of `Periodic` in their constructor.

#### JDK Implementation

`JdkPeriodic` is the default implementation provided in `periodic-core`. It is suitable for most usages, although users with many `AutoUpdatingVar`s or `Runner`s may wish to provide a shared `ScheduledExecutorService` to them, to avoid starting many threads. The number of threads in this shared `ScheduledExecutorService` will need to be tuned based on workload. Threads in the `ScheduledExecutorService` will be blocked.

Using the default JDK-based implementation:
The JDK implementation works out of the box with sync (`Identity`) or async (`scala.concurrent.Future`) update code. If usage with another effect is desired, provide a typeclass implementation of `ca.dvgi.periodic.jdk.Eval`.

#### Pekko Streams Implementation

The Pekko Streams implementation is completely non-blocking, does not need additional resources besides an `ActorSystem`, and will scale to many `AutoUpdatingVar`s and `Runner`s without requiring tuning. It is recommended if you are already using Pekko or don't mind the extra dependency.

The Pekko Streams implementation only works with `scala.concurrent.Future`.

### `AutoUpdatingVar`

#### Default JDK-based Implementation

``` scala
import ca.dvgi.periodic._
Expand All @@ -54,7 +79,7 @@ val data = AutoUpdatingVar.jdk( // or AutoUpdatingVar.jdkFuture if updateData re
// can also be dynamic based on the last data
UpdateInterval.Static(1.second),
// can also be finite with configurable behavior for attempt exhaustion
UpdateAttemptStrategy.Infinite(5.seconds),
AttemptStrategy.Infinite(5.seconds),
)

// `ready` returns a `Future[Unit]` which completes when the initial data initialization is complete
Expand Down Expand Up @@ -82,16 +107,11 @@ New cached data is 2023-10-19T02:35:23.474155Z

For handling errors during update, and other options, see the Scaladocs.

### Alternate Implementations

#### Pekko Streams

The Pekko Streams implementation is completely non-blocking, does not need additional resources besides an `ActorSystem`, and will scale to many `AutoUpdatingVar`s without requiring tuning.
It is recommended if you are already using Pekko.
#### Pekko Streams Implementation

``` scala
import org.apache.pekko.actor.ActorSystem
import ca.dvgi.periodic.pekko.stream.PekkoStreamsAutoUpdater
import ca.dvgi.periodic.pekko.stream.PekkoStreamsPeriodic
import ca.dvgi.periodic._
import scala.concurrent.duration._
import scala.concurrent.Future
Expand All @@ -102,11 +122,11 @@ def updateData(): Future[String] = Future.successful(Instant.now.toString)
implicit val actorSystem = ActorSystem() // generally you should have an ActorSystem in your process already

val data = AutoUpdatingVar(
PekkoStreamsAutoUpdater[String]() // T must be explicitly provided, it can't be inferred
PekkoStreamsPeriodic[String]() // T must be explicitly provided, it can't be inferred
)(
updateData(),
UpdateInterval.Static(1.second),
UpdateAttemptStrategy.Infinite(5.seconds)
AttemptStrategy.Infinite(5.seconds)
)
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@ package ca.dvgi.periodic

import org.slf4j.Logger

sealed trait UpdateAttemptExhaustionBehavior {
sealed trait AttemptExhaustionBehavior {
def run: Logger => Unit
def description: String
}

object UpdateAttemptExhaustionBehavior {
case class Terminate(exitCode: Int = 1) extends UpdateAttemptExhaustionBehavior {
object AttemptExhaustionBehavior {
case class Terminate(exitCode: Int = 1) extends AttemptExhaustionBehavior {
def run: Logger => Unit = log => {
log.error(
s"Var update attempts exhausted, will now attempt to exit the process with exit code: $exitCode..."
s"Attempts exhausted, will now attempt to exit the process with exit code: $exitCode..."
)
sys.exit(exitCode)
}
Expand All @@ -20,7 +20,7 @@ object UpdateAttemptExhaustionBehavior {
}

case class Custom(run: Logger => Unit, descriptionOverride: Option[String] = None)
extends UpdateAttemptExhaustionBehavior {
extends AttemptExhaustionBehavior {
val description: String = descriptionOverride.getOrElse("Run custom logic")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,21 @@ package ca.dvgi.periodic

import scala.concurrent.duration.FiniteDuration

sealed trait UpdateAttemptStrategy {
sealed trait AttemptStrategy {
def attemptInterval: FiniteDuration
def description: String
}

object UpdateAttemptStrategy {
case class Infinite(attemptInterval: FiniteDuration) extends UpdateAttemptStrategy {
object AttemptStrategy {
case class Infinite(attemptInterval: FiniteDuration) extends AttemptStrategy {
val description = s"Attempt update indefinitely every $attemptInterval"
}

case class Finite(
attemptInterval: FiniteDuration,
maxAttempts: Int,
attemptExhaustionBehavior: UpdateAttemptExhaustionBehavior =
UpdateAttemptExhaustionBehavior.Terminate()
) extends UpdateAttemptStrategy {
attemptExhaustionBehavior: AttemptExhaustionBehavior = AttemptExhaustionBehavior.Terminate()
) extends AttemptStrategy {
require(maxAttempts > 0)

val description =
Expand Down
34 changes: 0 additions & 34 deletions periodic-core/src/main/scala/ca/dvgi/periodic/AutoUpdater.scala

This file was deleted.

63 changes: 43 additions & 20 deletions periodic-core/src/main/scala/ca/dvgi/periodic/AutoUpdatingVar.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package ca.dvgi.periodic

import scala.reflect.ClassTag
import org.slf4j.LoggerFactory
import ca.dvgi.periodic.jdk._
import scala.concurrent.duration.Duration
import scala.concurrent.Future
import ca.dvgi.periodic.jdk._
import java.util.concurrent.ScheduledExecutorService

/** A variable that updates itself. `latest` can be called from multiple threads, which are all
Expand All @@ -27,17 +27,22 @@ import java.util.concurrent.ScheduledExecutorService
* Configuration for the update interval
* @param updateAttemptStrategy
* Configuration for retrying updates on failure
* @param blockUntilReadyTimeout
* If specified, will cause the AutoUpdatingVar constructor to block until an initial value is
* computed, or there is a timeout or failure. This means that the effect returned by `ready`
* will always be complete.
* @param handleInitializationError
* A PartialFunction used to recover from exceptions in the var initialization. If unspecified,
* the exception will fail the effect returned by `ready`.
* @param varNameOverride
* A name for this variable, used in logging. If unspecified, the simple class name of T will be
* used.
*/
class AutoUpdatingVar[U[_], R[_], T](autoUpdater: AutoUpdater[U, R, T])(
class AutoUpdatingVar[U[_], R[_], T](periodic: Periodic[U, R, T])(
updateVar: => U[T],
updateInterval: UpdateInterval[T],
updateAttemptStrategy: UpdateAttemptStrategy,
updateAttemptStrategy: AttemptStrategy,
blockUntilReadyTimeout: Option[Duration] = None,
handleInitializationError: PartialFunction[Throwable, U[T]] = PartialFunction.empty,
varNameOverride: Option[String] = None
)(implicit ct: ClassTag[T])
Expand All @@ -52,12 +57,26 @@ class AutoUpdatingVar[U[_], R[_], T](autoUpdater: AutoUpdater[U, R, T])(

log.info(s"Starting. ${updateAttemptStrategy.description}")

private val _ready = autoUpdater.start(
@volatile private var variable: Option[T] = None

private val _ready = periodic.scheduleNow(
log,
"initialize var",
() => updateVar,
updateInterval,
updateAttemptStrategy,
handleInitializationError
newV => {
variable = Some(newV)
periodic.scheduleRecurring(
log,
"update var",
updateInterval.duration(newV),
() => updateVar,
v => variable = Some(v),
v => updateInterval.duration(v),
updateAttemptStrategy
)
},
handleInitializationError,
blockUntilReadyTimeout
)

/** @return
Expand All @@ -75,10 +94,10 @@ class AutoUpdatingVar[U[_], R[_], T](autoUpdater: AutoUpdater[U, R, T])(
* @throws UnreadyAutoUpdatingVarException
* if there is not yet a value to return
*/
def latest: T = autoUpdater.latest.getOrElse(throw UnreadyAutoUpdatingVarException)
def latest: T = variable.getOrElse(throw UnreadyAutoUpdatingVarException)

override def close(): Unit = {
autoUpdater.close()
periodic.close()
log.info(s"Shut down sucessfully")
}
}
Expand All @@ -88,17 +107,19 @@ object AutoUpdatingVar {
/** @see
* [[ca.dvgi.periodic.AutoUpdatingVar]]
*/
def apply[U[_], R[_], T](autoUpdater: AutoUpdater[U, R, T])(
def apply[U[_], R[_], T](periodic: Periodic[U, R, T])(
updateVar: => U[T],
updateInterval: UpdateInterval[T],
updateAttemptStrategy: UpdateAttemptStrategy,
updateAttemptStrategy: AttemptStrategy,
blockUntilReadyTimeout: Option[Duration] = None,
handleInitializationError: PartialFunction[Throwable, U[T]] = PartialFunction.empty,
varNameOverride: Option[String] = None
)(implicit ct: ClassTag[T]): AutoUpdatingVar[U, R, T] = {
new AutoUpdatingVar(autoUpdater)(
new AutoUpdatingVar(periodic)(
updateVar,
updateInterval,
updateAttemptStrategy,
blockUntilReadyTimeout,
handleInitializationError,
varNameOverride
)
Expand All @@ -107,25 +128,26 @@ object AutoUpdatingVar {
/** An AutoUpdatingVar based on only the JDK.
*
* @see
* [[ca.dvgi.periodic.jdk.JdkAutoUpdater]]
* [[ca.dvgi.periodic.jdk.JdkPeriodic]]
* @see
* [[ca.dvgi.periodic.AutoUpdatingVar]]
*/
def jdk[T](
updateVar: => T,
updateInterval: UpdateInterval[T],
updateAttemptStrategy: UpdateAttemptStrategy,
updateAttemptStrategy: AttemptStrategy,
blockUntilReadyTimeout: Option[Duration] = None,
handleInitializationError: PartialFunction[Throwable, T] = PartialFunction.empty,
varNameOverride: Option[String] = None,
blockUntilReadyTimeout: Option[Duration] = None,
executorOverride: Option[ScheduledExecutorService] = None
)(implicit ct: ClassTag[T]): AutoUpdatingVar[Identity, Future, T] = {
new AutoUpdatingVar(
new IdentityJdkAutoUpdater[T](blockUntilReadyTimeout, executorOverride)
new JdkPeriodic[Identity, T](executorOverride)
)(
updateVar,
updateInterval,
updateAttemptStrategy,
blockUntilReadyTimeout,
handleInitializationError,
varNameOverride
)
Expand All @@ -134,25 +156,26 @@ object AutoUpdatingVar {
/** An AutoUpdatingVar based on only the JDK, for use when `updateVar` returns a `Future`.
*
* @see
* [[ca.dvgi.periodic.jdk.JdkAutoUpdater]]
* [[ca.dvgi.periodic.jdk.JdkPeriodic]]
* @see
* [[ca.dvgi.periodic.AutoUpdatingVar]]
*/
def jdkFuture[T](
updateVar: => Future[T],
updateInterval: UpdateInterval[T],
updateAttemptStrategy: UpdateAttemptStrategy,
updateAttemptStrategy: AttemptStrategy,
blockUntilReadyTimeout: Option[Duration] = None,
handleInitializationError: PartialFunction[Throwable, Future[T]] = PartialFunction.empty,
varNameOverride: Option[String] = None,
blockUntilReadyTimeout: Option[Duration] = None,
executorOverride: Option[ScheduledExecutorService] = None
)(implicit ct: ClassTag[T]): AutoUpdatingVar[Future, Future, T] = {
new AutoUpdatingVar(
new FutureJdkAutoUpdater[T](blockUntilReadyTimeout, executorOverride)
new JdkPeriodic[Future, T](executorOverride)
)(
updateVar,
updateInterval,
updateAttemptStrategy,
blockUntilReadyTimeout,
handleInitializationError,
varNameOverride
)
Expand Down
26 changes: 26 additions & 0 deletions periodic-core/src/main/scala/ca/dvgi/periodic/Periodic.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package ca.dvgi.periodic

import scala.concurrent.duration.FiniteDuration
import org.slf4j.Logger
import scala.concurrent.duration.Duration

trait Periodic[F[_], R[_], T] extends AutoCloseable {
def scheduleNow(
log: Logger,
operationName: String,
fn: () => F[T],
onSuccess: T => Unit,
handleError: PartialFunction[Throwable, F[T]],
blockUntilCompleteTimeout: Option[Duration] = None
): R[Unit]

def scheduleRecurring(
log: Logger,
operationName: String,
initialDelay: FiniteDuration,
fn: () => F[T],
onSuccess: T => Unit,
interval: T => FiniteDuration,
attemptStrategy: AttemptStrategy
): Unit
}
Loading