Skip to content

Commit

Permalink
New ParallelUnorderedStreamProcessor
Browse files Browse the repository at this point in the history
  • Loading branch information
darkfrog26 committed Dec 15, 2024
1 parent 5d0dee0 commit 2568e06
Show file tree
Hide file tree
Showing 7 changed files with 150 additions and 32 deletions.
54 changes: 27 additions & 27 deletions benchmark/src/main/scala/benchmark/StreamBenchmark.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import java.util.concurrent.TimeUnit
@OutputTimeUnit(TimeUnit.SECONDS)
@State(Scope.Thread)
class StreamBenchmark {
@Param(Array("1000")) //, "10000", "100000"))
@Param(Array("1000", "10000", "100000"))
var size: Int = _

lazy val rapidStream: rapid.Stream[Int] = rapid.Stream.emits(1 to size)
Expand All @@ -31,48 +31,48 @@ class StreamBenchmark {
list
}

// @Benchmark
// def rapidStreamToList(): List[Int] = {
// verify(rapidStream.toList.sync())
// }
@Benchmark
def rapidStreamToList(): List[Int] = {
verify(rapidStream.toList.sync())
}

@Benchmark
def rapidParallelStreamToList(): List[Int] = {
verify(rapidStream.par()(Task.pure).toList.sync())
}

// @Benchmark
// def fs2StreamToList(): List[Int] = {
// verify(fs2Stream.compile.toList.unsafeRunSync())
// }
//
// @Benchmark
// def rapidStreamFilter(): List[Int] = {
// verify(rapidStream.filter(_ % 2 == 0).toList.sync(), size / 2)
// }
@Benchmark
def fs2StreamToList(): List[Int] = {
verify(fs2Stream.compile.toList.unsafeRunSync())
}

@Benchmark
def rapidStreamFilter(): List[Int] = {
verify(rapidStream.filter(_ % 2 == 0).toList.sync(), size / 2)
}

@Benchmark
def rapidParallelStreamFilter(): List[Int] = {
verify(rapidStream.filter(_ % 2 == 0).par()(Task.pure).toList.sync(), size / 2)
}

// @Benchmark
// def fs2StreamFilter(): List[Int] = {
// verify(fs2Stream.filter(_ % 2 == 0).compile.toList.unsafeRunSync(), size / 2)
// }
//
// @Benchmark
// def rapidStreamMap(): List[Int] = {
// verify(rapidStream.map(_ * 2).toList.sync())
// }
@Benchmark
def fs2StreamFilter(): List[Int] = {
verify(fs2Stream.filter(_ % 2 == 0).compile.toList.unsafeRunSync(), size / 2)
}

@Benchmark
def rapidStreamMap(): List[Int] = {
verify(rapidStream.map(_ * 2).toList.sync())
}

@Benchmark
def rapidParallelStreamMap(): List[Int] = {
verify(rapidStream.par()(i => Task(i * 2)).toList.sync())
}

// @Benchmark
// def fs2StreamMap(): List[Int] = {
// verify(fs2Stream.map(_ * 2).compile.toList.unsafeRunSync())
// }
@Benchmark
def fs2StreamMap(): List[Int] = {
verify(fs2Stream.map(_ * 2).compile.toList.unsafeRunSync())
}
}
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ val developerURL: String = "https://matthicks.com"

name := projectName
ThisBuild / organization := org
ThisBuild / version := "0.2.2"
ThisBuild / version := "0.3.0-SNAPSHOT"
ThisBuild / scalaVersion := scala213
ThisBuild / crossScalaVersions := allScalaVersions
ThisBuild / scalacOptions ++= Seq("-unchecked", "-deprecation", "-feature")
Expand Down
2 changes: 1 addition & 1 deletion core/jvm/src/test/scala/spec/ParallelStreamSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,4 @@ class ParallelStreamSpec extends AnyWordSpec with Matchers {
task.sync().sum should be(1_409_965_408)
}
}
}
}
21 changes: 21 additions & 0 deletions core/shared/src/main/scala/rapid/Opt.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package rapid

sealed trait Opt[+A] extends Any {
def isEmpty: Boolean
def isNonEmpty: Boolean = !isEmpty
}

object Opt {
case class Value[+A](value: A) extends AnyVal with Opt[A] {
override def isEmpty: Boolean = false
}
case object Empty extends Opt[Nothing] {
override def isEmpty: Boolean = true
}

def apply[A](value: A): Opt[A] = if (value == null) {
Empty
} else {
Value(value)
}
}
6 changes: 3 additions & 3 deletions core/shared/src/main/scala/rapid/ParallelStream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ case class ParallelStream[T, R](stream: Stream[T],
maxBuffer: Int) {
def drain: Task[Unit] = Task.unit.flatMap { _ =>
val completable = Task.completable[Unit]
ParallelStreamProcessor(
ParallelUnorderedStreamProcessor(
stream = this,
handle = (_: R) => (),
complete = (_: Int) => completable.success(())
Expand All @@ -22,7 +22,7 @@ case class ParallelStream[T, R](stream: Stream[T],

def count: Task[Int] = Task.unit.flatMap { _ =>
val completable = Task.completable[Int]
ParallelStreamProcessor(
ParallelUnorderedStreamProcessor(
stream = this,
handle = (_: R) => (),
complete = completable.success
Expand All @@ -33,7 +33,7 @@ case class ParallelStream[T, R](stream: Stream[T],
def toList: Task[List[R]] = Task.unit.flatMap { _ =>
val list = ListBuffer.empty[R]
val completable = Task.completable[List[R]]
ParallelStreamProcessor(
ParallelUnorderedStreamProcessor(
stream = this,
handle = list.addOne,
complete = (_: Int) => completable.success(list.toList)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package rapid

import scala.annotation.tailrec

case class ParallelUnorderedStreamProcessor[T, R](stream: ParallelStream[T, R],
handle: R => Unit,
complete: Int => Unit) {
private val iteratorTask: Task[Iterator[T]] = Stream.task(stream.stream)
private val ready = Queue[R](stream.maxBuffer)
private val processing = Queue[Task[Unit]](stream.maxThreads)
@volatile var counter = -1

// Processes the iterator feeding through processing and finally into ready
iteratorTask.map { iterator =>
var size = 0
iterator.foreach { t =>
var task: Task[Unit] = null
task = stream.f(t).map { r =>
ready.add(r)
processing.remove(task)
}
processing.add(task)
task.start()
size += 1
}
counter = size
}.start()

// Processes through the ready queue feeding to handle and finally complete
Task(handleNext(0)).start()

@tailrec
private def handleNext(counter: Int): Unit = {
val next = ready.poll()
if (this.counter == counter) {
complete(counter)
} else {
val c = next match {
case Opt.Value(value) =>
handle(value)
counter + 1
case Opt.Empty => counter
}
handleNext(c)
}
}
}
50 changes: 50 additions & 0 deletions core/shared/src/main/scala/rapid/Queue.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package rapid

import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger

case class Queue[T](maxSize: Int) {
private val q = new ConcurrentLinkedQueue[T]
private val s = new AtomicInteger(0)

def size: Int = s.get()

def isEmpty: Boolean = size == 0

def tryAdd(value: T): Boolean = {
var incremented = false
s.updateAndGet((operand: Int) => {
if (operand < maxSize) {
incremented = true
operand + 1
} else {
incremented = false
operand
}
})
if (incremented) {
q.add(value)
}
incremented
}

def add(value: T): Unit = while (!tryAdd(value)) {
Thread.`yield`()
}

def poll(): Opt[T] = {
val o = Opt(q.poll())
if (o.isNonEmpty) {
s.decrementAndGet()
}
o
}

def remove(value: T): Boolean = {
val removed = q.remove(value)
if (removed) {
s.decrementAndGet()
}
removed
}
}

0 comments on commit 2568e06

Please sign in to comment.