Skip to content

Commit

Permalink
Merge pull request #10 from dvgica/add-pekko-implementation
Browse files Browse the repository at this point in the history
Add Pekko Streams implementation
  • Loading branch information
dvgica authored Oct 24, 2023
2 parents 6009030 + 52e631a commit 569a1e8
Show file tree
Hide file tree
Showing 15 changed files with 665 additions and 272 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ jobs:
run: sbt '++ ${{ matrix.scala }}' test

- name: Compress target directories
run: tar cf targets.tar target periodic-core/target project/target
run: tar cf targets.tar target periodic-core/target periodic-pekko-stream/target project/target

- name: Upload target directories
uses: actions/upload-artifact@v3
Expand Down
25 changes: 22 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,13 @@ For the default JDK-based implementation, add the following dependency:

`"ca.dvgi" %% "periodic-core" % "<latest>"`

For the Pekko Streams-based implementation, use this dependency:

`"ca.dvgi" %% "periodic-pekko-stream" % "<latest>"`

### Dependencies
- `periodic-core` depends only on `slf4j-api`
- `periodic-pekko-stream` depends on `pekko-stream` and `periodic-core`

## Usage Example

Expand Down Expand Up @@ -75,11 +80,25 @@ For handling errors during update, and other options, see the Scaladocs.

### Alternate Implementations

Alternate implementations are used by passing an `AutoUpdater` to `AutoUpdatingVar.apply`:
#### Pekko Streams

The Pekko Streams implementation is completely non-blocking, does not need additional resources besides an `ActorSystem`, and will scale to many `AutoUpdatingVar`s without requiring tuning.
It is recommended if you are already using Pekko.

``` scala
AutoUpdatingVar(
SomeOtherAutoUpdater[String]() // T must be explicitly provided, it can't be inferred
import org.apache.pekko.actor.ActorSystem
import ca.dvgi.periodic.pekko.stream.PekkoStreamsAutoUpdater
import ca.dvgi.periodic._
import scala.concurrent.duration._
import scala.concurrent.Future
import java.time.Instant

def updateData(): Future[String] = Future.successful(Instant.now.toString)

implicit val actorSystem = ActorSystem() // generally you should have an ActorSystem in your process already

val data = AutoUpdatingVar(
PekkoStreamsAutoUpdater[String]() // T must be explicitly provided, it can't be inferred
)(
updateData(),
UpdateInterval.Static(1.second),
Expand Down
20 changes: 15 additions & 5 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ def subproject(name: String) = {
).settings(
scalaVersion := scala213Version,
crossScalaVersions := scalaVersions,
libraryDependencies += "org.scalameta" %% "munit" % "0.7.29" % Test,
libraryDependencies ++= Seq(
"org.scalameta" %% "munit" % Versions.Munit % Test,
"org.slf4j" % "slf4j-simple" % Versions.Slf4j % Test
),
sonatypeCredentialHost := "s01.oss.sonatype.org",
sonatypeRepository := "https://s01.oss.sonatype.org/service/local"
)
Expand All @@ -42,16 +45,23 @@ def subproject(name: String) = {
lazy val core = subproject("core")
.settings(
libraryDependencies ++= Seq(
"org.slf4j" % "slf4j-api" % "2.0.9",
"org.scalameta" %% "munit" % "0.7.29" % Test,
"org.slf4j" % "slf4j-simple" % "2.0.9" % Test
"org.slf4j" % "slf4j-api" % Versions.Slf4j
)
)

lazy val pekkoStream = subproject("pekko-stream")
.dependsOn(core % "test->test;compile->compile")
.settings(
libraryDependencies ++= Seq(
"org.apache.pekko" %% "pekko-stream" % Versions.Pekko
)
)

lazy val root = project
.in(file("."))
.aggregate(
core
core,
pekkoStream
)
.settings(
publish / skip := true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ object UpdateAttemptStrategy {
attemptExhaustionBehavior: UpdateAttemptExhaustionBehavior =
UpdateAttemptExhaustionBehavior.Terminate()
) extends UpdateAttemptStrategy {
require(maxAttempts > 0)

val description =
s"Attempt update a maximum of $maxAttempts times every $attemptInterval; when attempts are exhausted: ${attemptExhaustionBehavior.description}"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,11 @@ class FutureJdkAutoUpdater[T](
) extends JdkAutoUpdater[Future, T](blockUntilReadyTimeout, executorOverride) {
override protected def evalUpdate(ut: Future[T]): T = Await.result(ut, Duration.Inf)
}

object FutureJdkAutoUpdater {
def apply[T](
blockUntilReadyTimeout: Option[Duration] = None,
executorOverride: Option[ScheduledExecutorService] = None
): FutureJdkAutoUpdater[T] =
new FutureJdkAutoUpdater(blockUntilReadyTimeout, executorOverride)
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,11 @@ class IdentityJdkAutoUpdater[T](
) extends JdkAutoUpdater[Identity, T](blockUntilReadyTimeout, executorOverride) {
override protected def evalUpdate(ut: Identity[T]): T = ut
}

object IdentityJdkAutoUpdater {
def apply[T](
blockUntilReadyTimeout: Option[Duration] = None,
executorOverride: Option[ScheduledExecutorService] = None
): IdentityJdkAutoUpdater[T] =
new IdentityJdkAutoUpdater(blockUntilReadyTimeout, executorOverride)
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ import java.util.concurrent.ScheduledFuture
* If present, will be used instead of starting a new thread.
*/
abstract class JdkAutoUpdater[U[T], T](
blockUntilReadyTimeout: Option[Duration],
executorOverride: Option[ScheduledExecutorService]
blockUntilReadyTimeout: Option[Duration] = None,
executorOverride: Option[ScheduledExecutorService] = None
) extends AutoUpdater[U, Future, T] {

private val executor = executorOverride.getOrElse(Executors.newScheduledThreadPool(1))
Expand All @@ -39,8 +39,6 @@ abstract class JdkAutoUpdater[U[T], T](

@volatile private var variable: Option[T] = None

private val _ready = Promise[Unit]()

@volatile private var nextTask: Option[ScheduledFuture[_]] = None

override def start(
Expand All @@ -50,6 +48,8 @@ abstract class JdkAutoUpdater[U[T], T](
updateAttemptStrategy: UpdateAttemptStrategy,
handleInitializationError: PartialFunction[Throwable, U[T]]
): Future[Unit] = {
val ready = Promise[Unit]()

executor.schedule(
new Runnable {
def run(): Unit = {
Expand All @@ -68,7 +68,7 @@ abstract class JdkAutoUpdater[U[T], T](
tryV match {
case Success(value) =>
variable = Some(value)
_ready.complete(Success(()))
ready.complete(Success(()))
log.info("Successfully initialized")
scheduleUpdate(updateInterval.duration(value))(
log,
Expand All @@ -77,7 +77,7 @@ abstract class JdkAutoUpdater[U[T], T](
updateAttemptStrategy
)
case Failure(e) =>
_ready.complete(Failure(e))
ready.complete(Failure(e))
}
}
},
Expand All @@ -87,11 +87,11 @@ abstract class JdkAutoUpdater[U[T], T](

blockUntilReadyTimeout match {
case Some(timeout) =>
Try(Await.result(_ready.future, timeout)) match {
Try(Await.result(ready.future, timeout)) match {
case Success(_) => Future.successful(())
case Failure(exception) => throw exception
}
case None => _ready.future
case None => ready.future
}
}

Expand Down
Loading

0 comments on commit 569a1e8

Please sign in to comment.