diff --git a/core/src/main/scala/com/phaller/rasync/HandlerPool.scala b/core/src/main/scala/com/phaller/rasync/HandlerPool.scala index 595f279..4a1a24f 100644 --- a/core/src/main/scala/com/phaller/rasync/HandlerPool.scala +++ b/core/src/main/scala/com/phaller/rasync/HandlerPool.scala @@ -12,10 +12,17 @@ import lattice.{ DefaultKey, Key, Updater } import org.opalj.graphs._ import scala.collection.immutable.Queue +import scala.util.{ Success, Try } /* Need to have reference equality for CAS. + * + * quiescenceCellHandlers use NEXTCallbackRunnable, because (a) pool might reach quiescence + * repeatedly and (b) cell might not be completed, when quiescence is reached. */ -private class PoolState(val handlers: List[() => Unit] = List(), val submittedTasks: Int = 0) { +private class PoolState( + val quiescenceHandlers: List[() => Unit] = List(), + val quiescenceCellHandlers: List[NextCallbackRunnable[_, _]] = List(), + val submittedTasks: Int = 0) { def isQuiescent(): Boolean = submittedTasks == 0 } @@ -63,13 +70,27 @@ class HandlerPool(parallelism: Int = 8, unhandledExceptionHandler: Throwable => if (state.isQuiescent) { execute(new Runnable { def run(): Unit = handler() }) } else { - val newState = new PoolState(handler :: state.handlers, state.submittedTasks) + val newState = new PoolState(handler :: state.quiescenceHandlers, state.quiescenceCellHandlers, state.submittedTasks) val success = poolState.compareAndSet(state, newState) if (!success) onQuiescent(handler) } } + @tailrec + final def onQuiescent[K <: Key[V], V](cell: Cell[K, V])(handler: Try[V] => Unit): Unit = { + val state = poolState.get() + if (state.isQuiescent) { + execute(new Runnable { def run(): Unit = handler(Success(cell.getResult())) }) + } else { + val runnable = new NextConcurrentCallbackRunnable(this, null, cell, handler) + val newState = new PoolState(state.quiescenceHandlers, runnable :: state.quiescenceCellHandlers, state.submittedTasks) + val success = poolState.compareAndSet(state, newState) + if (!success) + onQuiescent(cell)(handler) + } + } + /** * Register a cell with this HandlerPool. * @@ -265,7 +286,7 @@ class HandlerPool(parallelism: Int = 8, unhandledExceptionHandler: Throwable => var submitSuccess = false while (!submitSuccess) { val state = poolState.get() - val newState = new PoolState(state.handlers, state.submittedTasks + 1) + val newState = new PoolState(state.quiescenceHandlers, state.quiescenceCellHandlers, state.submittedTasks + 1) submitSuccess = poolState.compareAndSet(state, newState) } } @@ -281,10 +302,10 @@ class HandlerPool(parallelism: Int = 8, unhandledExceptionHandler: Throwable => val state = poolState.get() if (state.submittedTasks > i) { handlersToRun = None - val newState = new PoolState(state.handlers, state.submittedTasks - i) + val newState = new PoolState(state.quiescenceHandlers, state.quiescenceCellHandlers, state.submittedTasks - i) success = poolState.compareAndSet(state, newState) } else if (state.submittedTasks == 1) { - handlersToRun = Some(state.handlers) + handlersToRun = Some(state.quiescenceHandlers) val newState = new PoolState() success = poolState.compareAndSet(state, newState) } else { diff --git a/core/src/test/scala/com/phaller/rasync/test/pool.scala b/core/src/test/scala/com/phaller/rasync/test/pool.scala index 741f6ab..f8faea6 100644 --- a/core/src/test/scala/com/phaller/rasync/test/pool.scala +++ b/core/src/test/scala/com/phaller/rasync/test/pool.scala @@ -1,13 +1,15 @@ package com.phaller.rasync package test -import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.{ ConcurrentHashMap, CountDownLatch } import org.scalatest.FunSuite import scala.concurrent.{ Await, Promise } import scala.concurrent.duration._ -import lattice.{ Lattice, StringIntKey, StringIntUpdater, Updater } + +import scala.util.{ Failure, Success } +import lattice.{ StringIntKey, StringIntUpdater, Updater } class PoolSuite extends FunSuite { test("onQuiescent") { @@ -69,4 +71,70 @@ class PoolSuite extends FunSuite { assert(regCells.size === 1000) } + test("onQuiescent(cell): incomplete cell") { + val latch = new CountDownLatch(1) + + val pool = new HandlerPool + val completer1 = CellCompleter[StringIntKey, Int]("somekey")(new StringIntUpdater, pool) + + var i = 0 + while (i < 10000) { + val p1 = Promise[Boolean]() + val p2 = Promise[Boolean]() + pool.execute { () => { p1.success(true); () } } + pool.onQuiescent { () => p2.success(true) } + try { + Await.result(p2.future, 1.seconds) + } catch { + case t: Throwable => + assert(false, s"failure after $i iterations") + } + i += 1 + } + + pool.onQuiescent(completer1.cell) { + case Success(x) => + assert(x === 0) + latch.countDown() + case Failure(_) => assert(false) + } + + latch.await() + + pool.shutdown() + } + + test("onQuiescent(cell): completed cell") { + val latch = new CountDownLatch(1) + + val pool = new HandlerPool + val completer1 = CellCompleter.completed[Int](10)(new StringIntUpdater, pool) + + var i = 0 + while (i < 10000) { + val p1 = Promise[Boolean]() + val p2 = Promise[Boolean]() + pool.execute { () => { p1.success(true); () } } + pool.onQuiescent { () => p2.success(true) } + try { + Await.result(p2.future, 1.seconds) + } catch { + case t: Throwable => + assert(false, s"failure after $i iterations") + } + i += 1 + } + + pool.onQuiescent(completer1.cell) { + case Success(x) => + assert(x === 10) + latch.countDown() + case Failure(_) => assert(false) + } + + latch.await() + + pool.shutdown() + } + }