diff --git a/build.sbt b/build.sbt index 045bee1..f87e4c0 100644 --- a/build.sbt +++ b/build.sbt @@ -17,7 +17,7 @@ val developerURL: String = "https://matthicks.com" name := projectName ThisBuild / organization := org -ThisBuild / version := "0.5.0" +ThisBuild / version := "0.5.1-SNAPSHOT" ThisBuild / scalaVersion := scala213 ThisBuild / crossScalaVersions := allScalaVersions ThisBuild / scalacOptions ++= Seq("-unchecked", "-deprecation", "-feature") diff --git a/core/jvm-native/src/test/scala/spec/TaskSpec.scala b/core/jvm-native/src/test/scala/spec/TaskSpec.scala index 5e003fc..6e66070 100644 --- a/core/jvm-native/src/test/scala/spec/TaskSpec.scala +++ b/core/jvm-native/src/test/scala/spec/TaskSpec.scala @@ -34,7 +34,7 @@ class TaskSpec extends AnyWordSpec with Matchers { } "utilize completable" in { val start = System.currentTimeMillis() - val c = Task.completable[String].sync() + val c = Task.completable[String] Task.sleep(500.millis).map { _ => c.success("Success!") }.start() diff --git a/core/shared/src/main/scala/rapid/Fiber.scala b/core/shared/src/main/scala/rapid/Fiber.scala index dfc9edc..fd8fea4 100644 --- a/core/shared/src/main/scala/rapid/Fiber.scala +++ b/core/shared/src/main/scala/rapid/Fiber.scala @@ -5,6 +5,8 @@ import scala.concurrent.duration._ import scala.concurrent.{Await, Future} trait Fiber[Return] extends Task[Return] { + Task.Monitor.foreach(_.fiberCreated(this)) + override def start(): Fiber[Return] = this /** @@ -12,7 +14,9 @@ trait Fiber[Return] extends Task[Return] { */ def cancel(): Task[Boolean] = Task.pure(false) - override def await(): Return = invoke() + override def await(): Return = invokeInternal() + + override def toString: String = "Fiber" } object Fiber { @@ -20,7 +24,7 @@ object Fiber { () => Await.result(future, 24.hours) def fromFuture[Return](future: CompletableFuture[Return]): Fiber[Return] = { - val completable = new Task.Completable[Return] + val completable = Task.completable[Return] future.whenComplete { case (_, error) if error != null => completable.failure(error) case (r, _) => completable.success(r) diff --git a/core/shared/src/main/scala/rapid/ParallelStream.scala b/core/shared/src/main/scala/rapid/ParallelStream.scala index 25caa48..d5a51a4 100644 --- a/core/shared/src/main/scala/rapid/ParallelStream.scala +++ b/core/shared/src/main/scala/rapid/ParallelStream.scala @@ -7,20 +7,20 @@ case class ParallelStream[T, R](stream: Stream[T], maxThreads: Int, maxBuffer: Int) { def drain: Task[Unit] = Task.flatMap { _ => - val completable = new Task.Completable[Unit] + val completable = Task.completable[Unit] compile(_ => (), _ => completable.success(())) completable } def count: Task[Int] = Task.flatMap { _ => - val completable = new Task.Completable[Int] + val completable = Task.completable[Int] compile(_ => (), completable.success) completable } def toList: Task[List[R]] = Task.flatMap { _ => val list = ListBuffer.empty[R] - val completable = new Task.Completable[List[R]] + val completable = Task.completable[List[R]] compile(list.addOne, _ => completable.success(list.toList)) completable } diff --git a/core/shared/src/main/scala/rapid/Task.scala b/core/shared/src/main/scala/rapid/Task.scala index beca9f2..0830765 100644 --- a/core/shared/src/main/scala/rapid/Task.scala +++ b/core/shared/src/main/scala/rapid/Task.scala @@ -1,5 +1,7 @@ package rapid +import rapid.monitor.TaskMonitor + import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} import scala.collection.BuildFrom import scala.concurrent.duration.FiniteDuration @@ -12,6 +14,21 @@ import scala.util.{Failure, Success, Try} * @tparam Return the type of the result produced by this task */ trait Task[Return] extends Any { + protected def invokeInternal(): Return = Task.Monitor match { + case Opt.Value(d) => + d.start(this) + try { + val r = invoke() + d.success(this, r) + r + } catch { + case t: Throwable => + d.error(this, t) + throw t + } + case Opt.Empty => invoke() + } + protected def invoke(): Return /** @@ -26,7 +43,7 @@ trait Task[Return] extends Any { * * @return the result of the task */ - def sync(): Return = invoke() + def sync(): Return = invokeInternal() /** * Starts the task and returns a `Fiber` representing the running task. @@ -48,7 +65,7 @@ trait Task[Return] extends Any { * @return either the result of the task or an exception */ def attempt: Task[Try[Return]] = Task { - Try(invoke()) + Try(invokeInternal()) } /** @@ -57,7 +74,7 @@ trait Task[Return] extends Any { * @param throwable the exception to raise * @return a new Error task */ - def error[T](throwable: Throwable): Task[T] = flatMap(_ => Task.Error[T](throwable)) + def error[T](throwable: Throwable): Task[T] = flatMap(_ => Task.nt(Task.Error[T](throwable))) /** * Handles error in task execution. @@ -89,7 +106,7 @@ trait Task[Return] extends Any { * @tparam T the type of the transformed result * @return a new task with the transformed result */ - def map[T](f: Return => T): Task[T] = Task(f(invoke())) + def map[T](f: Return => T): Task[T] = Task(f(invokeInternal())) /** * Transforms this task to a pure result. @@ -98,7 +115,7 @@ trait Task[Return] extends Any { * @tparam T the type of the result produced by the task * @return a new task */ - def pure[T](value: T): Task[T] = flatMap(_ => Task.Pure(value)) + def pure[T](value: T): Task[T] = flatMap(_ => Task.nt(Task.Pure(value))) /** * Transforms the result of this task to the result of the supplied function. @@ -107,7 +124,7 @@ trait Task[Return] extends Any { * @tparam T the type of the result produced by the task * @return a new task */ - def apply[T](f: => T): Task[T] = Task.Single(() => f) + def apply[T](f: => T): Task[T] = Task.nt(Task.Single(() => f)) /** * Similar to map, but does not change the value. @@ -127,10 +144,10 @@ trait Task[Return] extends Any { * @tparam T the type of the result of the new task * @return a new task with the transformed result */ - def flatMap[T](f: Return => Task[T]): Task[T] = Task.Chained(List( + def flatMap[T](f: Return => Task[T]): Task[T] = Task.nt(Task.Chained(List( v => f(v.asInstanceOf[Return]).asInstanceOf[Task[Any]], (_: Any) => this.asInstanceOf[Task[Any]], - )) + ))) /** * Similar to flatMap, but ignores the return propagating the current return value on. @@ -199,7 +216,7 @@ trait Task[Return] extends Any { */ def singleton: Task[Return] = { val triggered = new AtomicBoolean(false) - val completable = Task.completable[Return].sync() + val completable = Task.withCompletable[Return].sync() val actualTask = map { r => completable.success(r) } @@ -237,7 +254,7 @@ trait Task[Return] extends Any { def parSequence[T: ClassTag, C[_]](tasks: C[Task[T]]) (implicit bf: BuildFrom[C[Task[T]], T, C[T]], asIterable: C[Task[T]] => Iterable[Task[T]]): Task[C[T]] = flatMap { _ => - val completable = Task.completable[C[T]].sync() + val completable = Task.withCompletable[C[T]].sync() val total = asIterable(tasks).size val array = new Array[T](total) val completed = new AtomicInteger(0) @@ -265,7 +282,7 @@ trait Task[Return] extends Any { * @tparam T the type of the result produced by the task * @return a new Completable task */ - def completable[T]: Task[Task.Completable[T]] = map(_ => new Task.Completable) + def withCompletable[T]: Task[Task.Completable[T]] = map(_ => Task.completable) /** * Provides convenience functionality to execute this Task as a scala.concurrent.Future. @@ -275,37 +292,62 @@ trait Task[Return] extends Any { } object Task extends Task[Unit] { + var Monitor: Opt[TaskMonitor] = Opt.Empty + + private def nt[Return](task: Task[Return]): Task[Return] = Monitor match { + case Opt.Value(d) => + task match { + case t: Pure[Return] => d.pureCreated(t) + case t: Single[Return] => d.singleCreated(t) + case t: Chained[Return] => d.chainedCreated(t) + case t: Error[Return] => d.errorCreated(t) + case t: Completable[Return] => d.completableCreated(t) + } + task + case Opt.Empty => task + } + override protected def invoke(): Unit = () case class Pure[Return](value: Return) extends AnyVal with Task[Return] { override protected def invoke(): Return = value + + override def toString: String = s"Pure($value)" } case class Single[Return](f: () => Return) extends AnyVal with Task[Return] { override protected def invoke(): Return = f() + + override def toString: String = "Single" } case class Chained[Return](list: List[Any => Task[Any]]) extends AnyVal with Task[Return] { override protected def invoke(): Return = list.reverse.foldLeft((): Any)((value, f) => f(value).sync()).asInstanceOf[Return] override def flatMap[T](f: Return => Task[T]): Task[T] = copy(f.asInstanceOf[Any => Task[Any]] :: list) + + override def toString: String = s"Chained(${list.mkString(", ")})" } case class Error[Return](throwable: Throwable) extends AnyVal with Task[Return] { override protected def invoke(): Return = throw throwable override def flatMap[T](f: Return => Task[T]): Task[T] = this.asInstanceOf[Task[T]] + + override def toString: String = s"Error(${throwable.getMessage})" } class Completable[Return] extends Task[Return] { @volatile private var result: Option[Try[Return]] = None def success(result: Return): Unit = synchronized { + Monitor.foreach(_.completableSuccess(this, result)) this.result = Some(Success(result)) notifyAll() } def failure(throwable: Throwable): Unit = synchronized { + Monitor.foreach(_.completableFailure(this, throwable)) this.result = Some(Failure(throwable)) notifyAll() } @@ -316,10 +358,16 @@ object Task extends Task[Unit] { } result.get.get } + + override def toString: String = "Completable" } /** * A task that returns `Unit`. */ override def unit: Task[Unit] = this + + def completable[Return]: Completable[Return] = new Completable[Return] + + override def toString: String = "Unit" } \ No newline at end of file diff --git a/core/shared/src/main/scala/rapid/monitor/StatsTaskMonitor.scala b/core/shared/src/main/scala/rapid/monitor/StatsTaskMonitor.scala new file mode 100644 index 0000000..efaee49 --- /dev/null +++ b/core/shared/src/main/scala/rapid/monitor/StatsTaskMonitor.scala @@ -0,0 +1,58 @@ +package rapid.monitor + +import rapid.{Fiber, Task} + +class StatsTaskMonitor extends TaskMonitor { + protected var map = Map.empty[String, Int] + protected var tasks = Set.empty[Task[_]] + + private def mod(name: String, mod: Int): Unit = synchronized { + val c = map.getOrElse(name, 0) + map += name -> (c + mod) + } + + private def created(name: String, task: Task[_]): Unit = { + mod(name, 1) + mod("Tasks", 1) + synchronized { + tasks += task + } + } + + override def pureCreated[T](task: Task.Pure[T]): Unit = created("Pure", task) + override def singleCreated[T](task: Task.Single[T]): Unit = created("Single", task) + override def chainedCreated[T](task: Task.Chained[T]): Unit = created("Chained", task) + override def errorCreated[T](task: Task.Error[T]): Unit = created("Error", task) + override def completableCreated[T](task: Task.Completable[T]): Unit = created("Completable", task) + override def completableSuccess[T](task: Task.Completable[T], result: T): Unit = mod("CompletableCompleted", 1) + override def completableFailure[T](task: Task.Completable[T], throwable: Throwable): Unit = mod("CompletableFailure", 1) + override def fiberCreated[T](fiber: Fiber[T]): Unit = { + mod("Fibers", 1) + mod("FibersActive", 1) + } + override def start[T](task: Task[T]): Unit = { + mod("TaskStarted", 1) + mod("TasksRunning", 1) + } + override def success[T](task: Task[T], result: T): Unit = { + mod("TaskSuccess", 1) + mod("TasksRunning", -1) + synchronized { + tasks -= task + } + } + override def error[T](task: Task[T], throwable: Throwable): Unit = { + mod("TaskError", 1) + mod("TasksRunning", -1) + synchronized { + tasks -= task + } + } + + def report(): String = { + val entries = map.map { + case (key, value) => s"\t$key: $value" + }.mkString("\n") + s"Task Report:\n$entries\n\tNot Consumed: ${tasks.size}" + } +} diff --git a/core/shared/src/main/scala/rapid/monitor/SwingTaskMonitor.scala b/core/shared/src/main/scala/rapid/monitor/SwingTaskMonitor.scala new file mode 100644 index 0000000..428d37a --- /dev/null +++ b/core/shared/src/main/scala/rapid/monitor/SwingTaskMonitor.scala @@ -0,0 +1,99 @@ +package rapid.monitor + +import rapid.{Fiber, Task} + +import javax.swing._ +import java.awt._ + +class SwingTaskMonitor extends StatsTaskMonitor { + private val frame = new JFrame("Task Monitor") + private val statsArea = new JTextArea() + + statsArea.setForeground(Color.white) + statsArea.setBackground(Color.black) + statsArea.setFont(new Font("Arial", Font.BOLD, 18)) + statsArea.setEditable(false) + statsArea.setLineWrap(true) + statsArea.setWrapStyleWord(true) + + frame.setLayout(new BorderLayout()) + frame.add(new JScrollPane(statsArea, ScrollPaneConstants.VERTICAL_SCROLLBAR_AS_NEEDED, ScrollPaneConstants.HORIZONTAL_SCROLLBAR_NEVER), BorderLayout.CENTER) + frame.setSize(800, 600) + frame.setDefaultCloseOperation(WindowConstants.DISPOSE_ON_CLOSE) + frame.setVisible(true) + + // Method to update the stats in the JTextArea + private def updateStats(): Unit = SwingUtilities.invokeLater(() => { + statsArea.setText(report()) + }) + + // Start a background task to refresh the UI periodically + private def startUpdater(): Unit = { + Task { + while (true) { + Thread.sleep(1000) + updateStats() + } + }.start() + } + + // Override methods to update stats and refresh the UI + override def pureCreated[T](task: Task.Pure[T]): Unit = { + super.pureCreated(task) + updateStats() + } + + override def singleCreated[T](task: Task.Single[T]): Unit = { + super.singleCreated(task) + updateStats() + } + + override def chainedCreated[T](task: Task.Chained[T]): Unit = { + super.chainedCreated(task) + updateStats() + } + + override def errorCreated[T](task: Task.Error[T]): Unit = { + super.errorCreated(task) + updateStats() + } + + override def completableCreated[T](task: Task.Completable[T]): Unit = { + super.completableCreated(task) + updateStats() + } + + override def completableSuccess[T](task: Task.Completable[T], result: T): Unit = { + super.completableSuccess(task, result) + updateStats() + } + + override def completableFailure[T](task: Task.Completable[T], throwable: Throwable): Unit = { + super.completableFailure(task, throwable) + updateStats() + } + + override def fiberCreated[T](fiber: Fiber[T]): Unit = { + super.fiberCreated(fiber) + updateStats() + } + + override def start[T](task: Task[T]): Unit = { + super.start(task) + updateStats() + } + + override def success[T](task: Task[T], result: T): Unit = { + super.success(task, result) + updateStats() + } + + override def error[T](task: Task[T], throwable: Throwable): Unit = { + super.error(task, throwable) + updateStats() + } + + // Start the updater thread + startUpdater() +} + diff --git a/core/shared/src/main/scala/rapid/monitor/TaskMonitor.scala b/core/shared/src/main/scala/rapid/monitor/TaskMonitor.scala new file mode 100644 index 0000000..4a562c9 --- /dev/null +++ b/core/shared/src/main/scala/rapid/monitor/TaskMonitor.scala @@ -0,0 +1,17 @@ +package rapid.monitor + +import rapid.{Fiber, Task} + +trait TaskMonitor { + def pureCreated[T](task: Task.Pure[T]): Unit + def singleCreated[T](task: Task.Single[T]): Unit + def chainedCreated[T](task: Task.Chained[T]): Unit + def errorCreated[T](task: Task.Error[T]): Unit + def completableCreated[T](task: Task.Completable[T]): Unit + def completableSuccess[T](task: Task.Completable[T], result: T): Unit + def completableFailure[T](task: Task.Completable[T], throwable: Throwable): Unit + def fiberCreated[T](fiber: Fiber[T]): Unit + def start[T](task: Task[T]): Unit + def success[T](task: Task[T], result: T): Unit + def error[T](task: Task[T], throwable: Throwable): Unit +} \ No newline at end of file diff --git a/test/jvm/src/test/scala/spec/BlockableSpec.scala b/test/jvm/src/test/scala/spec/BlockableSpec.scala index 3397d80..5367491 100644 --- a/test/jvm/src/test/scala/spec/BlockableSpec.scala +++ b/test/jvm/src/test/scala/spec/BlockableSpec.scala @@ -10,7 +10,7 @@ import scala.concurrent.duration.DurationInt class BlockableSpec extends AsyncWordSpec with AsyncTaskSpec with Matchers { "Blockable" should { "handle a completable partway through a chain" in { - Task.sleep(250.millis).completable[String].flatMap { c => + Task.sleep(250.millis).withCompletable[String].flatMap { c => Task.sleep(100.millis).foreach(_ => c.success("Finished!")).start() c }.map(_ should be("Finished!")) diff --git a/test/shared/src/test/scala/spec/BasicsAsyncSpec.scala b/test/shared/src/test/scala/spec/BasicsAsyncSpec.scala index 24150f0..11a889f 100644 --- a/test/shared/src/test/scala/spec/BasicsAsyncSpec.scala +++ b/test/shared/src/test/scala/spec/BasicsAsyncSpec.scala @@ -1,11 +1,9 @@ package spec import org.scalatest.matchers.should.Matchers -import org.scalatest.wordspec.{AnyWordSpec, AsyncWordSpec} +import org.scalatest.wordspec.AsyncWordSpec import rapid._ -import scala.concurrent.duration.DurationInt - class BasicsAsyncSpec extends AsyncWordSpec with AsyncTaskSpec with Matchers { "Basics sync" should { "handle a simple task" in { @@ -57,5 +55,16 @@ class BasicsAsyncSpec extends AsyncWordSpec with AsyncTaskSpec with Matchers { s.length should be(32) } } + /*"flatMap millions of times without overflowing" in { + val max = 10_000_000 + def count(i: Int): Task[Int] = if (i >= max) { + Task.pure(i) + } else { + Task(i + 1).flatMap(count) + } + count(0).map { result => + result should be(max) + } + }*/ } } \ No newline at end of file