Skip to content

Commit

Permalink
Add methods to register a set of dependencies
Browse files Browse the repository at this point in the history
Six methods of the following form are added:
Cell.when**: (Cell, Callback)* => Unit

This should make the use of CellCompleter.sequential by the client obsolete.
  • Loading branch information
JanKoelzer committed Jul 2, 2018
1 parent 50829e9 commit 2ab5660
Show file tree
Hide file tree
Showing 4 changed files with 642 additions and 50 deletions.
187 changes: 140 additions & 47 deletions core/src/main/scala/com/phaller/rasync/Cell.scala
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ trait Cell[K <: Key[V], V] {
*/
def whenComplete(other: Cell[K, V], valueCallback: V => Outcome[V]): Unit
def whenCompleteSequential(other: Cell[K, V], valueCallback: V => Outcome[V]): Unit
def whenComplete(deps: (Cell[K, V], V => Outcome[V])*): Unit
def whenCompleteSequential(deps: (Cell[K, V], V => Outcome[V])*): Unit

/**
* Adds a dependency on some `other` cell.
Expand All @@ -64,6 +66,8 @@ trait Cell[K <: Key[V], V] {
*/
def whenNext(other: Cell[K, V], valueCallback: V => Outcome[V]): Unit
def whenNextSequential(other: Cell[K, V], valueCallback: V => Outcome[V]): Unit
def whenNext(deps: (Cell[K, V], V => Outcome[V])*): Unit
def whenNextSequential(deps: (Cell[K, V], V => Outcome[V])*): Unit

/**
* Adds a dependency on some `other` cell.
Expand All @@ -82,6 +86,8 @@ trait Cell[K <: Key[V], V] {
*/
def when(other: Cell[K, V], valueCallback: (V, Boolean) => Outcome[V]): Unit
def whenSequential(other: Cell[K, V], valueCallback: (V, Boolean) => Outcome[V]): Unit
def when(deps: (Cell[K, V], (V, Boolean) => Outcome[V])*): Unit
def whenSequential(deps: (Cell[K, V], (V, Boolean) => Outcome[V])*): Unit

def zipFinal(that: Cell[K, V]): Cell[DefaultKey[(V, V)], (V, V)]

Expand Down Expand Up @@ -444,14 +450,14 @@ private class CellImpl[K <: Key[V], V](pool: HandlerPool, val key: K, updater: U
}

override def when(other: Cell[K, V], valueCallback: (V, Boolean) => Outcome[V]): Unit = {
this.when(other, valueCallback, sequential = false)
this.when(other, new CombinedConcurrentCallbackRunnable(pool, this, other, valueCallback))
}

override def whenSequential(other: Cell[K, V], valueCallback: (V, Boolean) => Outcome[V]): Unit = {
this.when(other, valueCallback, sequential = true)
this.when(other, new CombinedSequentialCallbackRunnable(pool, this, other, valueCallback))
}

private def when(other: Cell[K, V], valueCallback: (V, Boolean) => Outcome[V], sequential: Boolean): Unit = {
private def when(other: Cell[K, V], callbackRunnable: CombinedCallbackRunnable[K, V]): Unit = {
var success = false
while (!success) { // repeat until compareAndSet succeeded (or the dependency is outdated)
state.get() match {
Expand All @@ -462,18 +468,9 @@ private class CellImpl[K <: Key[V], V](pool: HandlerPool, val key: K, updater: U

case raw: IntermediateState[_, _] => // not completed
val current = raw.asInstanceOf[IntermediateState[K, V]]
val depRegistered =
if (current.combinedCallbacks.contains(other))
true // another combined dependency has been registered already. Ignore the new (duplicate) one. // TODO maybe remove this case distinction. By only using the else part, "old" callbacks get replaced.
else {
val newCallback: CombinedCallbackRunnable[K, V] =
if (sequential) new CombinedSequentialCallbackRunnable(pool, this, other, valueCallback)
else new CombinedConcurrentCallbackRunnable(pool, this, other, valueCallback)

val newState = new IntermediateState(current.res, current.tasksActive, current.completeDependentCells, current.completeCallbacks, current.nextDependentCells, current.nextCallbacks, current.combinedCallbacks + (other -> newCallback))
state.compareAndSet(current, newState)
}
if (depRegistered) {
val newState = new IntermediateState(current.res, current.tasksActive, current.completeDependentCells, current.completeCallbacks, current.nextDependentCells, current.nextCallbacks, current.combinedCallbacks + (other -> callbackRunnable))

if (state.compareAndSet(current, newState)) {
success = true
// Inform `other` that this cell depends on its updates.
other.addCombinedDependentCell(this)
Expand All @@ -484,15 +481,53 @@ private class CellImpl[K <: Key[V], V](pool: HandlerPool, val key: K, updater: U
}
}

override def when(deps: (Cell[K, V], (V, Boolean) => Outcome[V])*): Unit = {
this.whenMulti(deps.map({
case (other, callback) => (other, new CombinedConcurrentCallbackRunnable(pool, this, other, callback))
}))
}

override def whenSequential(deps: (Cell[K, V], (V, Boolean) => Outcome[V])*): Unit = {
this.whenMulti(deps.map({
case (other, callback) => (other, new CombinedSequentialCallbackRunnable(pool, this, other, callback))
}))
}

private def whenMulti(callbackRunnables: Seq[(Cell[K, V], CombinedCallbackRunnable[K, V])]): Unit = {
var success = false
while (!success) { // repeat until compareAndSet succeeded (or the dependency is outdated)
state.get() match {
case finalRes: FinalState[K, V] => // completed with final result
// do not add dependency
// in fact, do nothing
success = true

case raw: IntermediateState[_, _] => // not completed
val current = raw.asInstanceOf[IntermediateState[K, V]]
val newState = new IntermediateState(current.res, current.tasksActive, current.completeDependentCells, current.completeCallbacks, current.nextDependentCells, current.nextCallbacks, current.combinedCallbacks ++ callbackRunnables)

if (state.compareAndSet(current, newState)) {
success = true
callbackRunnables.foreach(r => {
// Inform `other` that this cell depends on its updates.
r._1.addCombinedDependentCell(this)
// start calculations on `other` so that we eventually get its updates.
pool.triggerExecution(r._1)
})
}
}
}
}

override def whenNext(other: Cell[K, V], valueCallback: V => Outcome[V]): Unit = {
this.whenNext(other, valueCallback, sequential = false)
this.whenNext(other, new NextConcurrentCallbackRunnable(pool, this, other, valueCallback))
}

override def whenNextSequential(other: Cell[K, V], valueCallback: V => Outcome[V]): Unit = {
this.whenNext(other, valueCallback, sequential = true)
this.whenNext(other, new NextSequentialCallbackRunnable(pool, this, other, valueCallback))
}

private def whenNext(other: Cell[K, V], valueCallback: V => Outcome[V], sequential: Boolean): Unit = {
private def whenNext(other: Cell[K, V], callbackRunnable: NextCallbackRunnable[K, V]): Unit = {
var success = false
while (!success) { // repeat until compareAndSet succeeded (or the dependency is outdated)
state.get() match {
Expand All @@ -502,19 +537,10 @@ private class CellImpl[K <: Key[V], V](pool: HandlerPool, val key: K, updater: U
success = true

case raw: IntermediateState[_, _] => // not completed
val current = raw.asInstanceOf[IntermediateState[K, V]]
val depRegistered =
if (current.nextCallbacks.contains(other))
true // another combined dependency has been registered already. Ignore the new (duplicate) one. // TODO maybe remove this case distinction. By only using the else part, "old" callbacks get replaced.
else {
val newCallback: NextCallbackRunnable[K, V] =
if (sequential) new NextSequentialCallbackRunnable(pool, this, other, valueCallback)
else new NextConcurrentCallbackRunnable(pool, this, other, valueCallback)

val newState = new IntermediateState(current.res, current.tasksActive, current.completeDependentCells, current.completeCallbacks, current.nextDependentCells, current.nextCallbacks + (other -> newCallback), current.combinedCallbacks)
state.compareAndSet(current, newState)
}
if (depRegistered) {
val currentState = raw.asInstanceOf[IntermediateState[K, V]]
val newState = new IntermediateState(currentState.res, currentState.tasksActive, currentState.completeDependentCells, currentState.completeCallbacks, currentState.nextDependentCells, currentState.nextCallbacks + (other -> callbackRunnable), currentState.combinedCallbacks)

if (state.compareAndSet(currentState, newState)) {
success = true
// Inform `other` that this cell depends on its updates.
other.addNextDependentCell(this)
Expand All @@ -525,15 +551,53 @@ private class CellImpl[K <: Key[V], V](pool: HandlerPool, val key: K, updater: U
}
}

override def whenNext(deps: (Cell[K, V], V => Outcome[V])*): Unit = {
this.whenNextMulti(deps.map({
case (other, callback) => (other, new NextConcurrentCallbackRunnable(pool, this, other, callback))
}))
}

override def whenNextSequential(deps: (Cell[K, V], V => Outcome[V])*): Unit = {
this.whenNextMulti(deps.map({
case (other, callback) => (other, new NextSequentialCallbackRunnable(pool, this, other, callback))
}))
}

private def whenNextMulti(callbackRunnables: Seq[(Cell[K, V], NextCallbackRunnable[K, V])]): Unit = {
var success = false
while (!success) { // repeat until compareAndSet succeeded (or the dependency is outdated)
state.get() match {
case finalRes: FinalState[K, V] => // completed with final result
// do not add dependency
// in fact, do nothing
success = true

case raw: IntermediateState[_, _] => // not completed
val current = raw.asInstanceOf[IntermediateState[K, V]]
val newState = new IntermediateState(current.res, current.tasksActive, current.completeDependentCells, current.completeCallbacks, current.nextDependentCells, current.nextCallbacks ++ callbackRunnables, current.combinedCallbacks)

if (state.compareAndSet(current, newState)) {
success = true
callbackRunnables.foreach(r => {
// Inform `other` that this cell depends on its updates.
r._1.addNextDependentCell(this)
// start calculations on `other` so that we eventually get its updates.
pool.triggerExecution(r._1)
})
}
}
}
}

override def whenComplete(other: Cell[K, V], valueCallback: V => Outcome[V]): Unit = {
this.whenComplete(other, valueCallback, false)
this.whenComplete(other, new CompleteConcurrentCallbackRunnable(pool, this, other, valueCallback))
}

override def whenCompleteSequential(other: Cell[K, V], valueCallback: V => Outcome[V]): Unit = {
this.whenComplete(other, valueCallback, true)
this.whenComplete(other, new CompleteSequentialCallbackRunnable(pool, this, other, valueCallback))
}

private def whenComplete(other: Cell[K, V], valueCallback: V => Outcome[V], sequential: Boolean): Unit = {
private def whenComplete(other: Cell[K, V], callbackRunnable: CompleteCallbackRunnable[K, V]): Unit = {
var success = false
while (!success) {
state.get() match {
Expand All @@ -543,19 +607,10 @@ private class CellImpl[K <: Key[V], V](pool: HandlerPool, val key: K, updater: U
success = true

case raw: IntermediateState[_, _] => // not completed
val current = raw.asInstanceOf[IntermediateState[K, V]]
val depRegistered =
if (current.completeCallbacks.contains(other)) true
else {
val newCallback: CompleteCallbackRunnable[K, V] =
if (sequential) new CompleteSequentialCallbackRunnable(pool, this, other, valueCallback)
else new CompleteConcurrentCallbackRunnable(pool, this, other, valueCallback)

val newState = new IntermediateState(current.res, current.tasksActive, current.completeDependentCells, current.completeCallbacks + (other -> newCallback), current.nextDependentCells, current.nextCallbacks, current.combinedCallbacks)
// TODO see whenNext
state.compareAndSet(current, newState)
}
if (depRegistered) {
val currentState = raw.asInstanceOf[IntermediateState[K, V]]
val newState = new IntermediateState(currentState.res, currentState.tasksActive, currentState.completeDependentCells, currentState.completeCallbacks + (other -> callbackRunnable), currentState.nextDependentCells, currentState.nextCallbacks, currentState.combinedCallbacks)

if (state.compareAndSet(currentState, newState)) {
success = true
// Inform `other` that this cell depends on its updates.
other.addCompleteDependentCell(this)
Expand All @@ -566,6 +621,44 @@ private class CellImpl[K <: Key[V], V](pool: HandlerPool, val key: K, updater: U
}
}

override def whenComplete(deps: (Cell[K, V], V => Outcome[V])*): Unit = {
this.whenCompleteMulti(deps.map({
case (other, callback) => (other, new CompleteConcurrentCallbackRunnable(pool, this, other, callback))
}))
}

override def whenCompleteSequential(deps: (Cell[K, V], V => Outcome[V])*): Unit = {
this.whenCompleteMulti(deps.map({
case (other, callback) => (other, new CompleteSequentialCallbackRunnable(pool, this, other, callback))
}))
}

private def whenCompleteMulti(callbackRunnables: Seq[(Cell[K, V], CompleteCallbackRunnable[K, V])]): Unit = {
var success = false
while (!success) { // repeat until compareAndSet succeeded (or the dependency is outdated)
state.get() match {
case finalRes: FinalState[K, V] => // completed with final result
// do not add dependency
// in fact, do nothing
success = true

case raw: IntermediateState[_, _] => // not completed
val current = raw.asInstanceOf[IntermediateState[K, V]]
val newState = new IntermediateState(current.res, current.tasksActive, current.completeDependentCells, current.completeCallbacks ++ callbackRunnables, current.nextDependentCells, current.nextCallbacks, current.combinedCallbacks)

if (state.compareAndSet(current, newState)) {
success = true
callbackRunnables.foreach(r => {
// Inform `other` that this cell depends on its updates.
r._1.addCompleteDependentCell(this)
// start calculations on `other` so that we eventually get its updates.
pool.triggerExecution(r._1)
})
}
}
}
}

override private[rasync] def addCompleteDependentCell(dependentCell: Cell[K, V]): Unit = {
triggerOrAddCompleteDependentCell(dependentCell)
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/com/phaller/rasync/CellCompleter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ trait CellCompleter[K <: Key[V], V] {
* Run code for `this` cell sequentially.
* @return The result of `f`.
*/
def sequential[T](f: => T): T
private[rasync] def sequential[T](f: => T): T
}

object CellCompleter {
Expand Down
Loading

0 comments on commit 2ab5660

Please sign in to comment.