Skip to content

Commit

Permalink
Add pool.onQuiescent(cell)
Browse files Browse the repository at this point in the history
  • Loading branch information
JanKoelzer committed Mar 30, 2018
1 parent 227c93d commit 367f4bb
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 7 deletions.
31 changes: 26 additions & 5 deletions core/src/main/scala/com/phaller/rasync/HandlerPool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,17 @@ import lattice.{ DefaultKey, Key, Updater }
import org.opalj.graphs._

import scala.collection.immutable.Queue
import scala.util.{ Success, Try }

/* Need to have reference equality for CAS.
*
* quiescenceCellHandlers use NEXTCallbackRunnable, because (a) pool might reach quiescence
* repeatedly and (b) cell might not be completed, when quiescence is reached.
*/
private class PoolState(val handlers: List[() => Unit] = List(), val submittedTasks: Int = 0) {
private class PoolState(
val quiescenceHandlers: List[() => Unit] = List(),
val quiescenceCellHandlers: List[NextCallbackRunnable[_, _]] = List(),
val submittedTasks: Int = 0) {
def isQuiescent(): Boolean =
submittedTasks == 0
}
Expand Down Expand Up @@ -63,13 +70,27 @@ class HandlerPool(parallelism: Int = 8, unhandledExceptionHandler: Throwable =>
if (state.isQuiescent) {
execute(new Runnable { def run(): Unit = handler() })
} else {
val newState = new PoolState(handler :: state.handlers, state.submittedTasks)
val newState = new PoolState(handler :: state.quiescenceHandlers, state.quiescenceCellHandlers, state.submittedTasks)
val success = poolState.compareAndSet(state, newState)
if (!success)
onQuiescent(handler)
}
}

@tailrec
final def onQuiescent[K <: Key[V], V](cell: Cell[K, V])(handler: Try[V] => Unit): Unit = {
val state = poolState.get()
if (state.isQuiescent) {
execute(new Runnable { def run(): Unit = handler(Success(cell.getResult())) })
} else {
val runnable = new NextConcurrentCallbackRunnable(this, null, cell, handler)
val newState = new PoolState(state.quiescenceHandlers, runnable :: state.quiescenceCellHandlers, state.submittedTasks)
val success = poolState.compareAndSet(state, newState)
if (!success)
onQuiescent(cell)(handler)
}
}

/**
* Register a cell with this HandlerPool.
*
Expand Down Expand Up @@ -265,7 +286,7 @@ class HandlerPool(parallelism: Int = 8, unhandledExceptionHandler: Throwable =>
var submitSuccess = false
while (!submitSuccess) {
val state = poolState.get()
val newState = new PoolState(state.handlers, state.submittedTasks + 1)
val newState = new PoolState(state.quiescenceHandlers, state.quiescenceCellHandlers, state.submittedTasks + 1)
submitSuccess = poolState.compareAndSet(state, newState)
}
}
Expand All @@ -281,10 +302,10 @@ class HandlerPool(parallelism: Int = 8, unhandledExceptionHandler: Throwable =>
val state = poolState.get()
if (state.submittedTasks > i) {
handlersToRun = None
val newState = new PoolState(state.handlers, state.submittedTasks - i)
val newState = new PoolState(state.quiescenceHandlers, state.quiescenceCellHandlers, state.submittedTasks - i)
success = poolState.compareAndSet(state, newState)
} else if (state.submittedTasks == 1) {
handlersToRun = Some(state.handlers)
handlersToRun = Some(state.quiescenceHandlers)
val newState = new PoolState()
success = poolState.compareAndSet(state, newState)
} else {
Expand Down
72 changes: 70 additions & 2 deletions core/src/test/scala/com/phaller/rasync/test/pool.scala
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package com.phaller.rasync
package test

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.{ ConcurrentHashMap, CountDownLatch }

import org.scalatest.FunSuite

import scala.concurrent.{ Await, Promise }
import scala.concurrent.duration._
import lattice.{ Lattice, StringIntKey, StringIntUpdater, Updater }

import scala.util.{ Failure, Success }
import lattice.{ StringIntKey, StringIntUpdater, Updater }

class PoolSuite extends FunSuite {
test("onQuiescent") {
Expand Down Expand Up @@ -69,4 +71,70 @@ class PoolSuite extends FunSuite {
assert(regCells.size === 1000)
}

test("onQuiescent(cell): incomplete cell") {
val latch = new CountDownLatch(1)

val pool = new HandlerPool
val completer1 = CellCompleter[StringIntKey, Int]("somekey")(new StringIntUpdater, pool)

var i = 0
while (i < 10000) {
val p1 = Promise[Boolean]()
val p2 = Promise[Boolean]()
pool.execute { () => { p1.success(true); () } }
pool.onQuiescent { () => p2.success(true) }
try {
Await.result(p2.future, 1.seconds)
} catch {
case t: Throwable =>
assert(false, s"failure after $i iterations")
}
i += 1
}

pool.onQuiescent(completer1.cell) {
case Success(x) =>
assert(x === 0)
latch.countDown()
case Failure(_) => assert(false)
}

latch.await()

pool.shutdown()
}

test("onQuiescent(cell): completed cell") {
val latch = new CountDownLatch(1)

val pool = new HandlerPool
val completer1 = CellCompleter.completed[Int](10)(new StringIntUpdater, pool)

var i = 0
while (i < 10000) {
val p1 = Promise[Boolean]()
val p2 = Promise[Boolean]()
pool.execute { () => { p1.success(true); () } }
pool.onQuiescent { () => p2.success(true) }
try {
Await.result(p2.future, 1.seconds)
} catch {
case t: Throwable =>
assert(false, s"failure after $i iterations")
}
i += 1
}

pool.onQuiescent(completer1.cell) {
case Success(x) =>
assert(x === 10)
latch.countDown()
case Failure(_) => assert(false)
}

latch.await()

pool.shutdown()
}

}

0 comments on commit 367f4bb

Please sign in to comment.