Skip to content

Commit

Permalink
TaskMonitor
Browse files Browse the repository at this point in the history
  • Loading branch information
darkfrog26 committed Jan 2, 2025
1 parent 00ab377 commit be8e493
Show file tree
Hide file tree
Showing 10 changed files with 257 additions and 22 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ val developerURL: String = "https://matthicks.com"

name := projectName
ThisBuild / organization := org
ThisBuild / version := "0.5.0"
ThisBuild / version := "0.5.1-SNAPSHOT"
ThisBuild / scalaVersion := scala213
ThisBuild / crossScalaVersions := allScalaVersions
ThisBuild / scalacOptions ++= Seq("-unchecked", "-deprecation", "-feature")
Expand Down
2 changes: 1 addition & 1 deletion core/jvm-native/src/test/scala/spec/TaskSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class TaskSpec extends AnyWordSpec with Matchers {
}
"utilize completable" in {
val start = System.currentTimeMillis()
val c = Task.completable[String].sync()
val c = Task.completable[String]
Task.sleep(500.millis).map { _ =>
c.success("Success!")
}.start()
Expand Down
8 changes: 6 additions & 2 deletions core/shared/src/main/scala/rapid/Fiber.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,26 @@ import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

trait Fiber[Return] extends Task[Return] {
Task.Monitor.foreach(_.fiberCreated(this))

override def start(): Fiber[Return] = this

/**
* Attempts to cancel the Fiber. Returns true if successful.
*/
def cancel(): Task[Boolean] = Task.pure(false)

override def await(): Return = invoke()
override def await(): Return = invokeInternal()

override def toString: String = "Fiber"
}

object Fiber {
def fromFuture[Return](future: Future[Return]): Fiber[Return] =
() => Await.result(future, 24.hours)

def fromFuture[Return](future: CompletableFuture[Return]): Fiber[Return] = {
val completable = new Task.Completable[Return]
val completable = Task.completable[Return]
future.whenComplete {
case (_, error) if error != null => completable.failure(error)
case (r, _) => completable.success(r)
Expand Down
6 changes: 3 additions & 3 deletions core/shared/src/main/scala/rapid/ParallelStream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,20 @@ case class ParallelStream[T, R](stream: Stream[T],
maxThreads: Int,
maxBuffer: Int) {
def drain: Task[Unit] = Task.flatMap { _ =>
val completable = new Task.Completable[Unit]
val completable = Task.completable[Unit]
compile(_ => (), _ => completable.success(()))
completable
}

def count: Task[Int] = Task.flatMap { _ =>
val completable = new Task.Completable[Int]
val completable = Task.completable[Int]
compile(_ => (), completable.success)
completable
}

def toList: Task[List[R]] = Task.flatMap { _ =>
val list = ListBuffer.empty[R]
val completable = new Task.Completable[List[R]]
val completable = Task.completable[List[R]]
compile(list.addOne, _ => completable.success(list.toList))
completable
}
Expand Down
70 changes: 59 additions & 11 deletions core/shared/src/main/scala/rapid/Task.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package rapid

import rapid.monitor.TaskMonitor

import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import scala.collection.BuildFrom
import scala.concurrent.duration.FiniteDuration
Expand All @@ -12,6 +14,21 @@ import scala.util.{Failure, Success, Try}
* @tparam Return the type of the result produced by this task
*/
trait Task[Return] extends Any {
protected def invokeInternal(): Return = Task.Monitor match {
case Opt.Value(d) =>
d.start(this)
try {
val r = invoke()
d.success(this, r)
r
} catch {
case t: Throwable =>
d.error(this, t)
throw t
}
case Opt.Empty => invoke()
}

protected def invoke(): Return

/**
Expand All @@ -26,7 +43,7 @@ trait Task[Return] extends Any {
*
* @return the result of the task
*/
def sync(): Return = invoke()
def sync(): Return = invokeInternal()

/**
* Starts the task and returns a `Fiber` representing the running task.
Expand All @@ -48,7 +65,7 @@ trait Task[Return] extends Any {
* @return either the result of the task or an exception
*/
def attempt: Task[Try[Return]] = Task {
Try(invoke())
Try(invokeInternal())
}

/**
Expand All @@ -57,7 +74,7 @@ trait Task[Return] extends Any {
* @param throwable the exception to raise
* @return a new Error task
*/
def error[T](throwable: Throwable): Task[T] = flatMap(_ => Task.Error[T](throwable))
def error[T](throwable: Throwable): Task[T] = flatMap(_ => Task.nt(Task.Error[T](throwable)))

/**
* Handles error in task execution.
Expand Down Expand Up @@ -89,7 +106,7 @@ trait Task[Return] extends Any {
* @tparam T the type of the transformed result
* @return a new task with the transformed result
*/
def map[T](f: Return => T): Task[T] = Task(f(invoke()))
def map[T](f: Return => T): Task[T] = Task(f(invokeInternal()))

/**
* Transforms this task to a pure result.
Expand All @@ -98,7 +115,7 @@ trait Task[Return] extends Any {
* @tparam T the type of the result produced by the task
* @return a new task
*/
def pure[T](value: T): Task[T] = flatMap(_ => Task.Pure(value))
def pure[T](value: T): Task[T] = flatMap(_ => Task.nt(Task.Pure(value)))

/**
* Transforms the result of this task to the result of the supplied function.
Expand All @@ -107,7 +124,7 @@ trait Task[Return] extends Any {
* @tparam T the type of the result produced by the task
* @return a new task
*/
def apply[T](f: => T): Task[T] = Task.Single(() => f)
def apply[T](f: => T): Task[T] = Task.nt(Task.Single(() => f))

/**
* Similar to map, but does not change the value.
Expand All @@ -127,10 +144,10 @@ trait Task[Return] extends Any {
* @tparam T the type of the result of the new task
* @return a new task with the transformed result
*/
def flatMap[T](f: Return => Task[T]): Task[T] = Task.Chained(List(
def flatMap[T](f: Return => Task[T]): Task[T] = Task.nt(Task.Chained(List(
v => f(v.asInstanceOf[Return]).asInstanceOf[Task[Any]],
(_: Any) => this.asInstanceOf[Task[Any]],
))
)))

/**
* Similar to flatMap, but ignores the return propagating the current return value on.
Expand Down Expand Up @@ -199,7 +216,7 @@ trait Task[Return] extends Any {
*/
def singleton: Task[Return] = {
val triggered = new AtomicBoolean(false)
val completable = Task.completable[Return].sync()
val completable = Task.withCompletable[Return].sync()
val actualTask = map { r =>
completable.success(r)
}
Expand Down Expand Up @@ -237,7 +254,7 @@ trait Task[Return] extends Any {
def parSequence[T: ClassTag, C[_]](tasks: C[Task[T]])
(implicit bf: BuildFrom[C[Task[T]], T, C[T]],
asIterable: C[Task[T]] => Iterable[Task[T]]): Task[C[T]] = flatMap { _ =>
val completable = Task.completable[C[T]].sync()
val completable = Task.withCompletable[C[T]].sync()
val total = asIterable(tasks).size
val array = new Array[T](total)
val completed = new AtomicInteger(0)
Expand Down Expand Up @@ -265,7 +282,7 @@ trait Task[Return] extends Any {
* @tparam T the type of the result produced by the task
* @return a new Completable task
*/
def completable[T]: Task[Task.Completable[T]] = map(_ => new Task.Completable)
def withCompletable[T]: Task[Task.Completable[T]] = map(_ => Task.completable)

/**
* Provides convenience functionality to execute this Task as a scala.concurrent.Future.
Expand All @@ -275,37 +292,62 @@ trait Task[Return] extends Any {
}

object Task extends Task[Unit] {
var Monitor: Opt[TaskMonitor] = Opt.Empty

private def nt[Return](task: Task[Return]): Task[Return] = Monitor match {
case Opt.Value(d) =>
task match {
case t: Pure[Return] => d.pureCreated(t)
case t: Single[Return] => d.singleCreated(t)
case t: Chained[Return] => d.chainedCreated(t)
case t: Error[Return] => d.errorCreated(t)
case t: Completable[Return] => d.completableCreated(t)
}
task
case Opt.Empty => task
}

override protected def invoke(): Unit = ()

case class Pure[Return](value: Return) extends AnyVal with Task[Return] {
override protected def invoke(): Return = value

override def toString: String = s"Pure($value)"
}

case class Single[Return](f: () => Return) extends AnyVal with Task[Return] {
override protected def invoke(): Return = f()

override def toString: String = "Single"
}

case class Chained[Return](list: List[Any => Task[Any]]) extends AnyVal with Task[Return] {
override protected def invoke(): Return = list.reverse.foldLeft((): Any)((value, f) => f(value).sync()).asInstanceOf[Return]

override def flatMap[T](f: Return => Task[T]): Task[T] = copy(f.asInstanceOf[Any => Task[Any]] :: list)

override def toString: String = s"Chained(${list.mkString(", ")})"
}

case class Error[Return](throwable: Throwable) extends AnyVal with Task[Return] {
override protected def invoke(): Return = throw throwable

override def flatMap[T](f: Return => Task[T]): Task[T] = this.asInstanceOf[Task[T]]

override def toString: String = s"Error(${throwable.getMessage})"
}

class Completable[Return] extends Task[Return] {
@volatile private var result: Option[Try[Return]] = None

def success(result: Return): Unit = synchronized {
Monitor.foreach(_.completableSuccess(this, result))
this.result = Some(Success(result))
notifyAll()
}

def failure(throwable: Throwable): Unit = synchronized {
Monitor.foreach(_.completableFailure(this, throwable))
this.result = Some(Failure(throwable))
notifyAll()
}
Expand All @@ -316,10 +358,16 @@ object Task extends Task[Unit] {
}
result.get.get
}

override def toString: String = "Completable"
}

/**
* A task that returns `Unit`.
*/
override def unit: Task[Unit] = this

def completable[Return]: Completable[Return] = new Completable[Return]

override def toString: String = "Unit"
}
58 changes: 58 additions & 0 deletions core/shared/src/main/scala/rapid/monitor/StatsTaskMonitor.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package rapid.monitor

import rapid.{Fiber, Task}

class StatsTaskMonitor extends TaskMonitor {
protected var map = Map.empty[String, Int]
protected var tasks = Set.empty[Task[_]]

private def mod(name: String, mod: Int): Unit = synchronized {
val c = map.getOrElse(name, 0)
map += name -> (c + mod)
}

private def created(name: String, task: Task[_]): Unit = {
mod(name, 1)
mod("Tasks", 1)
synchronized {
tasks += task
}
}

override def pureCreated[T](task: Task.Pure[T]): Unit = created("Pure", task)
override def singleCreated[T](task: Task.Single[T]): Unit = created("Single", task)
override def chainedCreated[T](task: Task.Chained[T]): Unit = created("Chained", task)
override def errorCreated[T](task: Task.Error[T]): Unit = created("Error", task)
override def completableCreated[T](task: Task.Completable[T]): Unit = created("Completable", task)
override def completableSuccess[T](task: Task.Completable[T], result: T): Unit = mod("CompletableCompleted", 1)
override def completableFailure[T](task: Task.Completable[T], throwable: Throwable): Unit = mod("CompletableFailure", 1)
override def fiberCreated[T](fiber: Fiber[T]): Unit = {
mod("Fibers", 1)
mod("FibersActive", 1)
}
override def start[T](task: Task[T]): Unit = {
mod("TaskStarted", 1)
mod("TasksRunning", 1)
}
override def success[T](task: Task[T], result: T): Unit = {
mod("TaskSuccess", 1)
mod("TasksRunning", -1)
synchronized {
tasks -= task
}
}
override def error[T](task: Task[T], throwable: Throwable): Unit = {
mod("TaskError", 1)
mod("TasksRunning", -1)
synchronized {
tasks -= task
}
}

def report(): String = {
val entries = map.map {
case (key, value) => s"\t$key: $value"
}.mkString("\n")
s"Task Report:\n$entries\n\tNot Consumed: ${tasks.size}"
}
}
Loading

0 comments on commit be8e493

Please sign in to comment.