Skip to content

Commit

Permalink
Merge pull request phaller#133 from phaller/issue/79
Browse files Browse the repository at this point in the history
Use proper packages
  • Loading branch information
phaller authored Mar 30, 2018
2 parents 4f05e92 + c7b3188 commit 227c93d
Show file tree
Hide file tree
Showing 26 changed files with 96 additions and 83 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
package com.phaller.rasync
package bench

import scala.concurrent.Promise
import scala.annotation.tailrec
import org.scalameter.api._
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
package cell
package com.phaller.rasync
package bench

import lattice.{ Lattice, NaturalNumberLattice, NaturalNumberKey }

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package cell
package com.phaller.rasync

// Exponential backoff
// Taken, and adapted from ChemistrySet, see: https://github.com/aturon/ChemistrySet
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package cell
package com.phaller.rasync

import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.{ CountDownLatch, ExecutionException }
Expand All @@ -8,7 +8,7 @@ import scala.util.{ Failure, Success, Try }
import lattice.{ DefaultKey, Key, Updater, NotMonotonicException, PartialOrderingWithBottom }

trait Cell[K <: Key[V], V] {
private[cell] val completer: CellCompleter[K, V]
private[rasync] val completer: CellCompleter[K, V]

def key: K

Expand Down Expand Up @@ -82,40 +82,40 @@ trait Cell[K <: Key[V], V] {
// internal API

// Schedules execution of `callback` when next intermediate result is available.
private[cell] def onNext[U](callback: Try[V] => U): Unit //(implicit context: ExecutionContext): Unit
private[rasync] def onNext[U](callback: Try[V] => U): Unit //(implicit context: ExecutionContext): Unit

// Schedules execution of `callback` when completed with final result.
private[cell] def onComplete[U](callback: Try[V] => U): Unit
private[rasync] def onComplete[U](callback: Try[V] => U): Unit

// Only used in tests.
private[cell] def waitUntilNoDeps(): Unit
private[rasync] def waitUntilNoDeps(): Unit

// Only used in tests.
private[cell] def waitUntilNoNextDeps(): Unit
private[rasync] def waitUntilNoNextDeps(): Unit

private[cell] def tasksActive(): Boolean
private[cell] def setTasksActive(): Boolean
private[rasync] def tasksActive(): Boolean
private[rasync] def setTasksActive(): Boolean

private[cell] def numTotalDependencies: Int
private[cell] def numNextDependencies: Int
private[cell] def numCompleteDependencies: Int
private[rasync] def numTotalDependencies: Int
private[rasync] def numNextDependencies: Int
private[rasync] def numCompleteDependencies: Int

private[cell] def numNextCallbacks: Int
private[cell] def numCompleteCallbacks: Int
private[rasync] def numNextCallbacks: Int
private[rasync] def numCompleteCallbacks: Int

private[cell] def addCompleteCallback(callback: CompleteCallbackRunnable[K, V], cell: Cell[K, V]): Unit
private[cell] def addNextCallback(callback: NextCallbackRunnable[K, V], cell: Cell[K, V]): Unit
private[rasync] def addCompleteCallback(callback: CompleteCallbackRunnable[K, V], cell: Cell[K, V]): Unit
private[rasync] def addNextCallback(callback: NextCallbackRunnable[K, V], cell: Cell[K, V]): Unit

private[cell] def resolveWithValue(value: V): Unit
private[rasync] def resolveWithValue(value: V): Unit
def cellDependencies: Seq[Cell[K, V]]
def totalCellDependencies: Seq[Cell[K, V]]
def isIndependent(): Boolean

def removeCompleteCallbacks(cell: Cell[K, V]): Unit
def removeNextCallbacks(cell: Cell[K, V]): Unit

private[cell] def removeAllCallbacks(cell: Cell[K, V]): Unit
private[cell] def removeAllCallbacks(cells: Seq[Cell[K, V]]): Unit
private[rasync] def removeAllCallbacks(cell: Cell[K, V]): Unit
private[rasync] def removeAllCallbacks(cells: Seq[Cell[K, V]]): Unit

def isADependee(): Boolean
}
Expand Down Expand Up @@ -261,19 +261,19 @@ private class CellImpl[K <: Key[V], V](pool: HandlerPool, val key: K, updater: U
pre.asInstanceOf[State[K, V]]
}

override private[cell] def numCompleteDependencies: Int = {
override private[rasync] def numCompleteDependencies: Int = {
val current = currentState()
if (current == null) 0
else current.completeDeps.size
}

override private[cell] def numNextDependencies: Int = {
override private[rasync] def numNextDependencies: Int = {
val current = currentState()
if (current == null) 0
else current.nextDeps.size
}

override private[cell] def numTotalDependencies: Int = {
override private[rasync] def numTotalDependencies: Int = {
val current = currentState()
if (current == null) 0
else (current.completeDeps ++ current.nextDeps).size
Expand Down Expand Up @@ -329,7 +329,7 @@ private class CellImpl[K <: Key[V], V](pool: HandlerPool, val key: K, updater: U
}
}

override private[cell] def resolveWithValue(value: V): Unit = {
override private[rasync] def resolveWithValue(value: V): Unit = {
this.putFinal(value)
}

Expand Down Expand Up @@ -419,11 +419,11 @@ private class CellImpl[K <: Key[V], V](pool: HandlerPool, val key: K, updater: U
}
}

override private[cell] def addCompleteCallback(callback: CompleteCallbackRunnable[K, V], cell: Cell[K, V]): Unit = {
override private[rasync] def addCompleteCallback(callback: CompleteCallbackRunnable[K, V], cell: Cell[K, V]): Unit = {
dispatchOrAddCallback(callback)
}

override private[cell] def addNextCallback(callback: NextCallbackRunnable[K, V], cell: Cell[K, V]): Unit = {
override private[rasync] def addNextCallback(callback: NextCallbackRunnable[K, V], cell: Cell[K, V]): Unit = {
dispatchOrAddNextCallback(callback)
}

Expand All @@ -442,7 +442,7 @@ private class CellImpl[K <: Key[V], V](pool: HandlerPool, val key: K, updater: U
* if it fails.
*/
@tailrec
private[cell] final def tryNewState(value: V): Boolean = {
private[rasync] final def tryNewState(value: V): Boolean = {
state.get() match {
case finalRes: Try[_] => // completed with final result already
try {
Expand Down Expand Up @@ -532,7 +532,7 @@ private class CellImpl[K <: Key[V], V](pool: HandlerPool, val key: K, updater: U
}

@tailrec
override private[cell] final def removeDep(cell: Cell[K, V]): Unit = {
override private[rasync] final def removeDep(cell: Cell[K, V]): Unit = {
state.get() match {
case pre: State[_, _] =>
val current = pre.asInstanceOf[State[K, V]]
Expand All @@ -549,7 +549,7 @@ private class CellImpl[K <: Key[V], V](pool: HandlerPool, val key: K, updater: U
}

@tailrec
override private[cell] final def removeNextDep(cell: Cell[K, V]): Unit = {
override private[rasync] final def removeNextDep(cell: Cell[K, V]): Unit = {
state.get() match {
case pre: State[_, _] =>
val current = pre.asInstanceOf[State[K, V]]
Expand Down Expand Up @@ -594,7 +594,7 @@ private class CellImpl[K <: Key[V], V](pool: HandlerPool, val key: K, updater: U
}

@tailrec
override private[cell] final def removeAllCallbacks(cell: Cell[K, V]): Unit = {
override private[rasync] final def removeAllCallbacks(cell: Cell[K, V]): Unit = {
state.get() match {
case pre: State[_, _] =>
val current = pre.asInstanceOf[State[K, V]]
Expand All @@ -609,7 +609,7 @@ private class CellImpl[K <: Key[V], V](pool: HandlerPool, val key: K, updater: U
}

@tailrec
override private[cell] final def removeAllCallbacks(cells: Seq[Cell[K, V]]): Unit = {
override private[rasync] final def removeAllCallbacks(cells: Seq[Cell[K, V]]): Unit = {
state.get() match {
case pre: State[_, _] =>
val current = pre.asInstanceOf[State[K, V]]
Expand All @@ -623,15 +623,15 @@ private class CellImpl[K <: Key[V], V](pool: HandlerPool, val key: K, updater: U
}
}

override private[cell] def waitUntilNoDeps(): Unit = {
override private[rasync] def waitUntilNoDeps(): Unit = {
nodepslatch.await()
}

override private[cell] def waitUntilNoNextDeps(): Unit = {
override private[rasync] def waitUntilNoNextDeps(): Unit = {
nonextdepslatch.await()
}

override private[cell] def tasksActive() = state.get() match {
override private[rasync] def tasksActive() = state.get() match {
case _: Try[_] => false
case s: State[_, _] => s.tasksActive
}
Expand All @@ -642,7 +642,7 @@ private class CellImpl[K <: Key[V], V](pool: HandlerPool, val key: K, updater: U
* @return Returns true, iff the cell's status changed (i.e. it had not been running before).
*/
@tailrec
override private[cell] final def setTasksActive(): Boolean = state.get() match {
override private[rasync] final def setTasksActive(): Boolean = state.get() match {
case pre: State[_, _] =>
if (pre.tasksActive)
false
Expand All @@ -656,7 +656,7 @@ private class CellImpl[K <: Key[V], V](pool: HandlerPool, val key: K, updater: U
}

// Schedules execution of `callback` when next intermediate result is available.
override private[cell] def onNext[U](callback: Try[V] => U): Unit = {
override private[rasync] def onNext[U](callback: Try[V] => U): Unit = {
val runnable = new NextConcurrentCallbackRunnable[K, V](pool, null, this, callback) // NULL indicates that no cell is waiting for this callback.
dispatchOrAddNextCallback(runnable)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package cell
package com.phaller.rasync

import scala.util.Try
import lattice.{ DefaultKey, Key, Updater }
Expand All @@ -13,17 +13,17 @@ trait CellCompleter[K <: Key[V], V] {
*/
def cell: Cell[K, V]

private[cell] def init: (Cell[K, V]) => Outcome[V]
private[rasync] def init: (Cell[K, V]) => Outcome[V]

def putFinal(x: V): Unit
def putNext(x: V): Unit
def put(x: V, isFinal: Boolean): Unit

private[cell] def tryNewState(value: V): Boolean
private[rasync] def tryNewState(value: V): Boolean
def tryComplete(value: Try[V]): Boolean

private[cell] def removeDep(cell: Cell[K, V]): Unit
private[cell] def removeNextDep(cell: Cell[K, V]): Unit
private[rasync] def removeDep(cell: Cell[K, V]): Unit
private[rasync] def removeNextDep(cell: Cell[K, V]): Unit
}

object CellCompleter {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package cell
package com.phaller.rasync

import java.util.concurrent.ForkJoinPool
import java.util.concurrent.atomic.AtomicReference
Expand Down Expand Up @@ -332,7 +332,7 @@ class HandlerPool(parallelism: Int = 8, unhandledExceptionHandler: Throwable =>
*
* @param callback The callback that should be run sequentially to all other sequential callbacks for the dependent cell.
*/
private[cell] def scheduleSequentialCallback[K <: Key[V], V](callback: SequentialCallbackRunnable[K, V]): Unit = {
private[rasync] def scheduleSequentialCallback[K <: Key[V], V](callback: SequentialCallbackRunnable[K, V]): Unit = {
val dependentCell = callback.dependentCell
var success = false
var startCallback = false
Expand Down Expand Up @@ -416,7 +416,7 @@ class HandlerPool(parallelism: Int = 8, unhandledExceptionHandler: Throwable =>
*
* @param cell The cell that is triggered.
*/
private[cell] def triggerExecution[K <: Key[V], V](cell: Cell[K, V]): Unit = {
private[rasync] def triggerExecution[K <: Key[V], V](cell: Cell[K, V]): Unit = {
if (cell.setTasksActive())
execute(() => {
val completer = cell.completer
Expand Down
Loading

0 comments on commit 227c93d

Please sign in to comment.