Skip to content

Commit

Permalink
Merge pull request #16 from dvgica/add-runner
Browse files Browse the repository at this point in the history
Add runner and docs
  • Loading branch information
dvgica authored Oct 31, 2023
2 parents 1a1a076 + c859d65 commit 29daf48
Show file tree
Hide file tree
Showing 11 changed files with 158 additions and 46 deletions.
33 changes: 29 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
Periodic is a low-dependency Scala library providing:

- an in-memory cached variable (`AutoUpdatingVar`) that self-updates on a periodic basis
- a periodic runner for a side-effecting function (`FnRunner`)

It attempts to provide an effect- and runtime-agnostic API which can abstract various implementations as needed.

Expand All @@ -24,6 +25,12 @@ It is fairly common to need to do something periodically, while a process is run

For data that changes irregularly but must be up-to-date, you likely want to be subscribing to some kind of change event instead.

### `FnRunner`
`FnRunner` is useful when you want to do something periodically, but don't need to make any data available. Concrete use cases include:

- deleting old records in a database
- triggering calls to an external service

## Installation

Periodic is available on Maven Central for Scala 2.12, 2.13, and 3. Java 11+ is required.
Expand Down Expand Up @@ -52,13 +59,15 @@ All library functionality is based on implementations of `Periodic`. Therefore a

#### JDK Implementation

`JdkPeriodic` is the default implementation provided in `periodic-core`. It is suitable for most usages, although users with many `AutoUpdatingVar`s or `Runner`s may wish to provide a shared `ScheduledExecutorService` to them, to avoid starting many threads. The number of threads in this shared `ScheduledExecutorService` will need to be tuned based on workload. Threads in the `ScheduledExecutorService` will be blocked.
`JdkPeriodic` is the default implementation provided in `periodic-core` which is suitable for many use cases. Usages of the `jdk` and `jdkFuture` methods on the `AutoUpdatingVar` and `FnRunner` companion objects create a new, non-shared `JdkPeriodic` (and thus a new thread) for each invocation. This will work well as long as the number of created threads is not problematic for your application.

Users with many `AutoUpdatingVar`s or `FnRunner`s may wish to share a `JdkPeriodic` between them to decrease the total number of threads used. In this case, the shared `JdkPeriodic` may need to be tuned based on workload. Specifically, users may need to provide a `ScheduledExecutorService` to the shared `JdkPeriodic` with an increased thread count (the default number of threads used by a `JdkPeriodic` is one). Threads in the `ScheduledExecutorService` will be blocked.

The JDK implementation works out of the box with sync (`Identity`) or async (`scala.concurrent.Future`) update code. If usage with another effect is desired, provide a typeclass implementation of `ca.dvgi.periodic.jdk.Eval`.
The JDK implementation works out of the box with sync (`Identity`) or async (`scala.concurrent.Future`) functions. If usage with another effect is desired, provide a typeclass implementation of `ca.dvgi.periodic.jdk.Eval`.

#### Pekko Streams Implementation

The Pekko Streams implementation is completely non-blocking, does not need additional resources besides an `ActorSystem`, and will scale to many `AutoUpdatingVar`s and `Runner`s without requiring tuning. It is recommended if you are already using Pekko or don't mind the extra dependency.
The Pekko Streams implementation is completely non-blocking and does not need additional resources besides an `ActorSystem`. A single `PekkoStreamsPeriodic` can be shared by many `AutoUpdatingVar`s and `FnRunner`s without requiring tuning. It is recommended if you are already using Pekko or don't mind the extra dependency. As usual with Pekko-based code, user-provided functions should not block.

The Pekko Streams implementation only works with `scala.concurrent.Future`.

Expand Down Expand Up @@ -122,14 +131,30 @@ def updateData(): Future[String] = Future.successful(Instant.now.toString)
implicit val actorSystem = ActorSystem() // generally you should have an ActorSystem in your process already

val data = AutoUpdatingVar(
PekkoStreamsPeriodic[String]() // T must be explicitly provided, it can't be inferred
PekkoStreamsPeriodic() // can also be shared by many AutoUpdatingVars or FnRunners
)(
updateData(),
UpdateInterval.Static(1.second),
AttemptStrategy.Infinite(5.seconds)
)

```

### `FnRunner`

``` scala
import ca.dvgi.periodic._
import scala.concurrent.duration._
import java.time.Instant

def doSomething(): FiniteDuration = {
println(s"the time is: ${Instant.now.toString}")
10.seconds
}

// alternately use FnRunner.jdkFuture or FnRunner.apply(somePeriodic)
val runner = FnRunner.jdk(doSomething, AttemptStrategy.Infinite(1.second), "time printer")
```
## Contributing

Contributions in the form of Issues and PRs are welcome.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ sealed trait AttemptStrategy {

object AttemptStrategy {
case class Infinite(attemptInterval: FiniteDuration) extends AttemptStrategy {
val description = s"Attempt update indefinitely every $attemptInterval"
val description = s"Attempt retries indefinitely every $attemptInterval"
}

case class Finite(
Expand All @@ -20,6 +20,6 @@ object AttemptStrategy {
require(maxAttempts > 0)

val description =
s"Attempt update a maximum of $maxAttempts times every $attemptInterval; when attempts are exhausted: ${attemptExhaustionBehavior.description}"
s"Attempt a maximum of $maxAttempts times every $attemptInterval; when attempts are exhausted: ${attemptExhaustionBehavior.description}"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import java.util.concurrent.ScheduledExecutorService
* A successful update schedules the next update, with an interval that can vary based on the
* just-updated var.
*
* @param periodic
* A Periodic instance used to update the var
* @param updateVar
* A thunk to initialize and update the var
* @param updateInterval
Expand All @@ -38,7 +40,7 @@ import java.util.concurrent.ScheduledExecutorService
* A name for this variable, used in logging. If unspecified, the simple class name of T will be
* used.
*/
class AutoUpdatingVar[U[_], R[_], T](periodic: Periodic[U, R, T])(
class AutoUpdatingVar[U[_], R[_], T](periodic: Periodic[U, R])(
updateVar: => U[T],
updateInterval: UpdateInterval[T],
updateAttemptStrategy: AttemptStrategy,
Expand All @@ -59,13 +61,13 @@ class AutoUpdatingVar[U[_], R[_], T](periodic: Periodic[U, R, T])(

@volatile private var variable: Option[T] = None

private val _ready = periodic.scheduleNow(
private val _ready = periodic.scheduleNow[T](
log,
"initialize var",
() => updateVar,
newV => {
variable = Some(newV)
periodic.scheduleRecurring(
periodic.scheduleRecurring[T](
log,
"update var",
updateInterval.duration(newV),
Expand Down Expand Up @@ -107,7 +109,7 @@ object AutoUpdatingVar {
/** @see
* [[ca.dvgi.periodic.AutoUpdatingVar]]
*/
def apply[U[_], R[_], T](periodic: Periodic[U, R, T])(
def apply[U[_], R[_], T](periodic: Periodic[U, R])(
updateVar: => U[T],
updateInterval: UpdateInterval[T],
updateAttemptStrategy: AttemptStrategy,
Expand Down Expand Up @@ -142,7 +144,7 @@ object AutoUpdatingVar {
executorOverride: Option[ScheduledExecutorService] = None
)(implicit ct: ClassTag[T]): AutoUpdatingVar[Identity, Future, T] = {
new AutoUpdatingVar(
new JdkPeriodic[Identity, T](executorOverride)
new JdkPeriodic[Identity](executorOverride)
)(
updateVar,
updateInterval,
Expand Down Expand Up @@ -170,7 +172,7 @@ object AutoUpdatingVar {
executorOverride: Option[ScheduledExecutorService] = None
)(implicit ct: ClassTag[T]): AutoUpdatingVar[Future, Future, T] = {
new AutoUpdatingVar(
new JdkPeriodic[Future, T](executorOverride)
new JdkPeriodic[Future](executorOverride)
)(
updateVar,
updateInterval,
Expand Down
85 changes: 85 additions & 0 deletions periodic-core/src/main/scala/ca/dvgi/periodic/FnRunner.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package ca.dvgi.periodic

import ca.dvgi.periodic.jdk._
import scala.concurrent.duration.FiniteDuration
import org.slf4j.LoggerFactory
import scala.concurrent.duration._
import scala.concurrent.Future

/** A FnRunner executes a side-effecting function periodically. The `FiniteDuration` returned from
* the function determines the delay before the next run.
*
* Failed runs may be retried with various configurations.
*
* @param periodic
* A Periodic instance used to run the function
* @param fn
* The side-effecting function to run periodically
* @param fnAttemptStrategy
* Configuration for retrying runs on failure
* @param fnName
* A human-friendly description of the function, used in logging
* @param initialDelay
* If specified, the first run of the function will be delayed this much
*/
class FnRunner[F[_], R[_]](periodic: Periodic[F, R])(
fn: => F[FiniteDuration],
fnAttemptStrategy: AttemptStrategy,
fnName: String,
initialDelay: FiniteDuration = 0.seconds
) extends AutoCloseable {

private val log = LoggerFactory.getLogger(s"FnRunner[$fnName]")

log.info(s"Starting. ${fnAttemptStrategy.description}")

periodic.scheduleRecurring[FiniteDuration](
log,
fnName,
initialDelay,
() => fn,
_ => (),
identity,
fnAttemptStrategy
)

override def close(): Unit = {
periodic.close()
log.info(s"Shut down sucessfully")
}
}

object FnRunner {
def apply[F[_], R[_]](periodic: Periodic[F, R])(
fn: => F[FiniteDuration],
fnAttemptStrategy: AttemptStrategy,
fnName: String,
initialDelay: FiniteDuration = 0.seconds
): FnRunner[F, R] = new FnRunner(periodic)(fn, fnAttemptStrategy, fnName, initialDelay)

def jdk[F[_]](
fn: => FiniteDuration,
fnAttemptStrategy: AttemptStrategy,
fnName: String,
initialDelay: FiniteDuration = 0.seconds
): FnRunner[Identity, Future] =
new FnRunner(JdkPeriodic[Identity]())(
fn,
fnAttemptStrategy,
fnName,
initialDelay
)

def jdkFuture[F[_]](
fn: => Future[FiniteDuration],
fnAttemptStrategy: AttemptStrategy,
fnName: String,
initialDelay: FiniteDuration = 0.seconds
): FnRunner[Future, Future] =
new FnRunner(JdkPeriodic[Future]())(
fn,
fnAttemptStrategy,
fnName,
initialDelay
)
}
6 changes: 3 additions & 3 deletions periodic-core/src/main/scala/ca/dvgi/periodic/Periodic.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import scala.concurrent.duration.FiniteDuration
import org.slf4j.Logger
import scala.concurrent.duration.Duration

trait Periodic[F[_], R[_], T] extends AutoCloseable {
def scheduleNow(
trait Periodic[F[_], R[_]] extends AutoCloseable {
def scheduleNow[T](
log: Logger,
operationName: String,
fn: () => F[T],
Expand All @@ -14,7 +14,7 @@ trait Periodic[F[_], R[_], T] extends AutoCloseable {
blockUntilCompleteTimeout: Option[Duration] = None
): R[Unit]

def scheduleRecurring(
def scheduleRecurring[T](
log: Logger,
operationName: String,
initialDelay: FiniteDuration,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ import java.util.concurrent.Executors
import scala.concurrent.duration.Duration
import scala.concurrent.Await

class JdkPeriodic[F[_], T](
class JdkPeriodic[F[_]](
executorOverride: Option[ScheduledExecutorService] = None
)(implicit evalF: Eval[F])
extends Periodic[F, Future, T] {
extends Periodic[F, Future] {

private val executor = executorOverride.getOrElse(Executors.newScheduledThreadPool(1))

Expand All @@ -32,7 +32,7 @@ class JdkPeriodic[F[_], T](

@volatile private var recurringTask: Option[ScheduledFuture[_]] = None

override def scheduleNow(
override def scheduleNow[T](
log: Logger,
operationName: String,
fn: () => F[T],
Expand Down Expand Up @@ -92,7 +92,7 @@ class JdkPeriodic[F[_], T](
}
}

override def scheduleRecurring(
override def scheduleRecurring[T](
log: Logger,
operationName: String,
initialDelay: FiniteDuration,
Expand All @@ -116,7 +116,7 @@ class JdkPeriodic[F[_], T](
()
}

private def scheduleNext(delay: FiniteDuration)(implicit
private def scheduleNext[T](delay: FiniteDuration)(implicit
log: Logger,
operationName: String,
fn: () => F[T],
Expand All @@ -140,7 +140,7 @@ class JdkPeriodic[F[_], T](
()
}

private class FnRunnable(attempt: Int)(implicit
private class FnRunnable[T](attempt: Int)(implicit
log: Logger,
operationName: String,
fn: () => F[T],
Expand Down Expand Up @@ -200,9 +200,9 @@ class JdkPeriodic[F[_], T](
}

object JdkPeriodic {
def apply[F[_], T](
def apply[F[_]](
executorOverride: Option[ScheduledExecutorService] = None
)(implicit evalF: Eval[F]): JdkPeriodic[F, T] = {
new JdkPeriodic[F, T](executorOverride)
)(implicit evalF: Eval[F]): JdkPeriodic[F] = {
new JdkPeriodic[F](executorOverride)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ trait AutoUpdatingVarTestsFuture[U[_]] extends FunSuite {

def evalU[T](ut: U[T]): T

def testAll(periodic: () => Periodic[U, Future, Int])(implicit
def testAll(periodic: () => Periodic[U, Future])(implicit
loc: munit.Location
): Unit = {
implicit val per = periodic
Expand All @@ -55,7 +55,7 @@ trait AutoUpdatingVarTestsFuture[U[_]] extends FunSuite {
def testBasicsWithBlocking(
)(implicit
loc: munit.Location,
periodic: () => Periodic[U, Future, Int]
periodic: () => Periodic[U, Future]
): Unit = {
FunFixture(
_ => {
Expand Down Expand Up @@ -97,7 +97,7 @@ trait AutoUpdatingVarTestsFuture[U[_]] extends FunSuite {
def testAdjustsUpdateInterval(
)(implicit
loc: munit.Location,
periodic: () => Periodic[U, Future, Int]
periodic: () => Periodic[U, Future]
): Unit = {

FunFixture(
Expand Down Expand Up @@ -138,7 +138,7 @@ trait AutoUpdatingVarTestsFuture[U[_]] extends FunSuite {
def testReturnsFailedReady(
)(implicit
loc: munit.Location,
periodic: () => Periodic[U, Future, Int]
periodic: () => Periodic[U, Future]
): Unit = {

FunFixture(
Expand All @@ -158,7 +158,7 @@ trait AutoUpdatingVarTestsFuture[U[_]] extends FunSuite {
def testThrowsFromLatest(
)(implicit
loc: munit.Location,
periodic: () => Periodic[U, Future, Int]
periodic: () => Periodic[U, Future]
): Unit = {

FunFixture(
Expand All @@ -181,7 +181,7 @@ trait AutoUpdatingVarTestsFuture[U[_]] extends FunSuite {
def testThrowsFromConstructor(
)(implicit
loc: munit.Location,
periodic: () => Periodic[U, Future, Int]
periodic: () => Periodic[U, Future]
): Unit = {

test(
Expand All @@ -201,7 +201,7 @@ trait AutoUpdatingVarTestsFuture[U[_]] extends FunSuite {
def testHandlesInititializationErrors(
)(implicit
loc: munit.Location,
periodic: () => Periodic[U, Future, Int]
periodic: () => Periodic[U, Future]
): Unit = {

FunFixture(
Expand All @@ -227,7 +227,7 @@ trait AutoUpdatingVarTestsFuture[U[_]] extends FunSuite {
def testInfiniteReattempts(
)(implicit
loc: munit.Location,
periodic: () => Periodic[U, Future, Int]
periodic: () => Periodic[U, Future]
): Unit = {

FunFixture(
Expand Down Expand Up @@ -265,7 +265,7 @@ trait AutoUpdatingVarTestsFuture[U[_]] extends FunSuite {
def testFiniteReattempts(
)(implicit
loc: munit.Location,
periodic: () => Periodic[U, Future, Int]
periodic: () => Periodic[U, Future]
): Unit = {

var terminated = false
Expand Down
Loading

0 comments on commit 29daf48

Please sign in to comment.