Skip to content

Commit

Permalink
Preliminary Scala.js and ScalaNative support
Browse files Browse the repository at this point in the history
  • Loading branch information
darkfrog26 committed Dec 12, 2024
1 parent dd2c5db commit 0f3c6c1
Show file tree
Hide file tree
Showing 16 changed files with 173 additions and 43 deletions.
7 changes: 7 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ val catsVersion: String = "3.5.7"

val fs2Version: String = "3.11.0"

val scalaJsMacrotaskVersion: String = "1.1.1"

val scalaTestVersion: String = "3.2.19"

lazy val root = project.in(file("."))
Expand All @@ -67,6 +69,11 @@ lazy val core = crossProject(JVMPlatform, JSPlatform, NativePlatform)
"org.scalatest" %%% "scalatest" % scalaTestVersion % Test
)
)
.jsSettings(
libraryDependencies ++= Seq(
"org.scala-js" %%% "scala-js-macrotask-executor" % scalaJsMacrotaskVersion,
)
)

lazy val cats = crossProject(JVMPlatform, JSPlatform)
.crossType(CrossType.Full)
Expand Down
9 changes: 9 additions & 0 deletions core/js/src/main/scala/rapid/Platform.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package rapid

import scala.concurrent.ExecutionContext

object Platform extends RapidPlatform {
override def executionContext: ExecutionContext = org.scalajs.macrotaskexecutor.MacrotaskExecutor.Implicits.global

override def createFiber[Return](task: Task[Return]): Fiber[Return] = new FutureFiber[Return](task)
}
4 changes: 4 additions & 0 deletions core/js/src/main/scala/rapid/package.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
import scala.language.implicitConversions

package object rapid extends RapidPackage {
}
23 changes: 23 additions & 0 deletions core/jvm-native/src/main/scala/rapid/FutureBlockableFiber.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package rapid

import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
import scala.util.{Failure, Success, Try}

class FutureBlockableFiber[Return](val task: Task[Return]) extends BlockableFiber[Return] {
private val future: Future[Return] = Future(task.sync())(Platform.executionContext)

override protected def invoke(): Return = await()

override def attempt(): Try[Return] = future.value match {
case Some(value) => value
case None => Try(Await.result(future, Duration.Inf))
}

override def await(): Return = Await.result(future, Duration.Inf)

override def await(duration: Duration): Option[Return] = Try(Await.result(future, duration)) match {
case Success(value) => Some(value)
case Failure(_) => None
}
}
20 changes: 20 additions & 0 deletions core/jvm-native/src/test/scala/spec/FiberSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package spec

import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
import rapid.Task

class FiberSpec extends AnyWordSpec with Matchers {
"Fiber" should {
"start and await a task" in {
val task = Task { 5 * 5 }
val fiber = task.start()
fiber.await() shouldEqual 25
}
"handle task failures in fibers" in {
val task = Task { throw new RuntimeException("Failure") }
val fiber = task.start()
an[RuntimeException] should be thrownBy fiber.await()
}
}
}
Original file line number Diff line number Diff line change
@@ -1,24 +1,17 @@
package spec

import org.scalatest.wordspec.AnyWordSpec
import org.scalatest.matchers.should.Matchers
import rapid.{Fiber, Task}
import scala.concurrent.duration._
import org.scalatest.wordspec.AnyWordSpec
import rapid.Task

class FiberSpec extends AnyWordSpec with Matchers {
"Fiber" should {
"start and await a task" in {
val task = Task { 5 * 5 }
val fiber = task.start()
fiber.await() shouldEqual 25
}
"handle task failures in fibers" in {
val task = Task { throw new RuntimeException("Failure") }
val fiber = task.start()
an[RuntimeException] should be thrownBy fiber.await()
}
import scala.concurrent.duration.DurationInt

class FiberWaitingSpec extends AnyWordSpec with Matchers {
"Fiber waiting" should {
"await with a timeout" in {
val task = Task { Thread.sleep(1000); 42 }
val task = Task {
Thread.sleep(1000); 42
}
val fiber = task.start()
fiber.await(500.millis) shouldEqual None
fiber.await(1500.millis) shouldEqual Some(42)
Expand All @@ -34,4 +27,4 @@ class FiberSpec extends AnyWordSpec with Matchers {
elapsed should be >= 750L
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package spec

import org.scalatest.wordspec.AnyWordSpec
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
import rapid.Task

import scala.concurrent.duration._

class TaskSpec extends AnyWordSpec with Matchers {
Expand Down
7 changes: 5 additions & 2 deletions core/jvm/src/main/scala/rapid/Platform.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package rapid

import scala.concurrent.ExecutionContext

object Platform extends RapidPlatform {
// override def createFiber[Return](task: Task[Return]): Fiber[Return] = new VirtualThreadFiber[Return](task)
override def createFiber[Return](task: Task[Return]): Fiber[Return] = new FutureFiber[Return](task)
override def executionContext: ExecutionContext = scala.concurrent.ExecutionContext.Implicits.global

override def createFiber[Return](task: Task[Return]): Fiber[Return] = new VirtualThreadFiber[Return](task)
}
9 changes: 9 additions & 0 deletions core/native/src/main/scala/rapid/Platform.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package rapid

import scala.concurrent.ExecutionContext

object Platform extends RapidPlatform {
override def executionContext: ExecutionContext = scala.concurrent.ExecutionContext.Implicits.global

override def createFiber[Return](task: Task[Return]): Fiber[Return] = new FutureBlockableFiber[Return](task)
}
6 changes: 6 additions & 0 deletions core/native/src/main/scala/rapid/package.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import scala.language.implicitConversions

package object rapid extends RapidPackage {
implicit def fiber2Blockable[Return](fiber: Fiber[Return]): BlockableFiber[Return] =
fiber.asInstanceOf[BlockableFiber[Return]]
}
17 changes: 9 additions & 8 deletions core/shared/src/main/scala/rapid/FutureFiber.scala
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
package rapid

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.util.Try
import scala.concurrent.Future
import scala.util.{Failure, Try}

class FutureFiber[Return](val task: Task[Return]) extends BlockableFiber[Return] {
private val future: Future[Return] = Future(task.sync())
class FutureFiber[Return](val task: Task[Return]) extends Fiber[Return] {
private val future: Future[Return] = Future(task.sync())(Platform.executionContext)

override protected def invoke(): Return = await()

override def await(): Return = await(Duration.Inf).get
override def attempt(): Try[Return] = future.value match {
case Some(value) => value
case None => Failure(new RuntimeException("Cannot wait"))
}

override def await(duration: Duration): Option[Return] = Try(Await.result(future, duration)).toOption
override def await(): Return = attempt().get
}
33 changes: 18 additions & 15 deletions core/shared/src/main/scala/rapid/ParallelStreamProcessor.scala
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
package rapid

import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger
import scala.annotation.tailrec


case class ParallelStreamProcessor[T, R](stream: ParallelStream[T, R],
handle: R => Unit,
complete: Int => Unit) {
private val iterator = Stream.task(stream.stream)
private val iteratorTask: Task[Iterator[T]] = Stream.task(stream.stream)

private val ready = new AtomicIndexedQueue[R](stream.maxBuffer)
private val processing = new ConcurrentLinkedQueue[(Int, Task[R])]
// Push iterator into processing queue
private val queuingFiber: Fiber[Unit] = Stream.task(stream.stream).map { iterator =>
@volatile private var _total = -1

// Start a fiber that consumes the stream and queues tasks
private val queuingFiber: Fiber[Unit] = iteratorTask.map { iterator =>
var total = 0
iterator.zipWithIndex.foreach {
case (t, index) =>
Expand All @@ -20,46 +23,46 @@ case class ParallelStreamProcessor[T, R](stream: ParallelStream[T, R],
}
_total = total
}.start()
@volatile private var _total = -1

def total: Option[Int] = if (_total == -1) None else Some(_total)

// Create maxThreads threads to execute from processing and put into ready
// Spawn worker fibers to process tasks
(0 until stream.maxThreads).foreach { _ =>
Task(processRecursive()).start()
}

@tailrec
private def processRecursive(): Unit = {
// Grab next processing and execute
val next = processing.poll
val next = processing.poll()
if (next != null) {
val (index, task) = next
val result = task.sync()
// Put into ready
ready.add(index, result)
} else {
Thread.sleep(1)
// No next task
Thread.sleep(1) // Consider a better signaling mechanism
}
// If total known and no more tasks, stop recursion
if (_total != -1 && processing.isEmpty) {
// Stop
// Done processing
} else {
processRecursive()
}
}

// Monitor position and push results
// Fiber to consume results from 'ready' and handle them
Task {
var count = 0
while (_total == -1 || count < _total) {
ready.blockingPoll() match {
case Some(r) =>
handle(r)
count += 1
case None => Thread.sleep(1)
case None =>
// No result ready yet
Thread.sleep(1) // Again, consider using proper synchronization
}
}
complete(_total)
}.start()
}

}
4 changes: 4 additions & 0 deletions core/shared/src/main/scala/rapid/RapidPlatform.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package rapid

import scala.concurrent.ExecutionContext

trait RapidPlatform {
def executionContext: ExecutionContext

def createFiber[Return](task: Task[Return]): Fiber[Return]
}
47 changes: 47 additions & 0 deletions core/shared/src/test/scala/spec/BasicsSyncSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package spec

import org.scalatest.matchers.should.Matchers
import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
import org.scalatest.wordspec.AnyWordSpec
import rapid._

class BasicsSyncSpec extends AnyWordSpec with Matchers {
"Basics sync" should {
"handle a simple task" in {
val i = Task {
5 * 5
}
i.sync() should be(25)
}
"handle a simple task mapping" in {
val i = Task {
5 * 5
}
val s = i.map { v =>
s"Value: $v"
}
s.sync() should be("Value: 25")
}
"handle flat mapping" in {
val task = (1 to 10).foldLeft(Task(0))((t, i) => t.flatMap { total =>
Task(total + i)
})
val result = task.sync()
result should be(55)
}
"throw an error and recover" in {
val result = Task[String](throw new RuntimeException("Die Die Die"))
.handleError { _ =>
Task.pure("Recovered")
}
.sync()
result should be("Recovered")
}
"process a list of tasks to a task with a list" in {
val list = List(
Task("One"), Task("Two"), Task("Three")
)
list.tasks.sync() should be(List("One", "Two", "Three"))
}
}
}

0 comments on commit 0f3c6c1

Please sign in to comment.