Skip to content

Commit

Permalink
Add Smart Progagation Prevention
Browse files Browse the repository at this point in the history
This fixes phaller#82: Do not inform dependent cells about updates, if another update is in sight.
  • Loading branch information
JanKoelzer committed Jun 29, 2018
1 parent a5361a8 commit 8d77bba
Showing 1 changed file with 52 additions and 8 deletions.
60 changes: 52 additions & 8 deletions core/src/main/scala/com/phaller/rasync/Cell.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.phaller.rasync

import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.atomic.{ AtomicInteger, AtomicReference }
import java.util.concurrent.{ CountDownLatch, ExecutionException }

import scala.annotation.tailrec
Expand Down Expand Up @@ -295,6 +295,10 @@ private class CellImpl[K <: Key[V], V](pool: HandlerPool, val key: K, updater: U
* (b) `IntermediateState[K,V]` for an incomplete state.
*/
private val state = new AtomicReference[State[V]](IntermediateState.empty[K, V](updater))
/* first element is the number of incoming callbacks,
* second element is the value that has been propagated to dependent cells (or initially bottom)
*/
private val numIncomingCallbacks = new AtomicInteger(0) //new AtomicReference[(Int, V)]((0, updater.initial))

// A list of callbacks to call, when `this` cell is completed/updated.
// Note that this is not sync'ed with the state in any way, so calling `onComplete` or `onNext`
Expand Down Expand Up @@ -618,7 +622,13 @@ private class CellImpl[K <: Key[V], V](pool: HandlerPool, val key: K, updater: U
// CAS was successful, so there was a point in time where `newVal` was in the cell
// every dependent cell should pull the new value
onNextHandler.foreach(_.apply(Success(newVal)))
current.nextDependentCells.foreach(_._1.updateDeps(this))

// If we came here via a direct putNext (instead of Outcome of a whenNextCallback)
// this incoming change has not been counted. So we need to manually start outgoing callbacks.
// (This might lead to duplicate invocation.)
if (numIncomingCallbacks.get() <= 0)
triggerDependentCells()

true
}
} else true
Expand Down Expand Up @@ -666,6 +676,8 @@ private class CellImpl[K <: Key[V], V](pool: HandlerPool, val key: K, updater: U

case (pre: IntermediateState[K, V], finalValue: FinalState[K, V]) =>
// Inform dependent cells about the update.
// We do not need to take the number of incoming callbacks into account,
// because a final update always needs to be propagated.
val dependentCells = pre.nextDependentCells.keys ++ pre.completeDependentCells
dontCall match {
case Some(cells) =>
Expand Down Expand Up @@ -1019,11 +1031,32 @@ private class CellImpl[K <: Key[V], V](pool: HandlerPool, val key: K, updater: U
case t => Failure(t)
}

/** Called, when a CallbackRunnable r with r.dependentCell == this has been started. */
private def incIncomingCallbacks(): Unit = {
numIncomingCallbacks.incrementAndGet()
}

/**
* Called, when a CallbackRunnable r with r.dependentCell == this has been completed.
* Triggers all dependent cells, if no more incoming callbacks are running.
*/
private def decIncomingCallbacks(): Unit = {
val newValue = numIncomingCallbacks.decrementAndGet()
if (newValue == 0) triggerDependentCells()
}

def triggerDependentCells(): Unit = state.get match {
case pre: FinalState[_, _] =>
val cur = pre.asInstanceOf[FinalState[K, V]]
cur.nextDependentCells.keys.foreach(_.updateDeps(this))
case pre: IntermediateState[_, _] =>
val cur = pre.asInstanceOf[IntermediateState[K, V]]
cur.nextDependentCells.keys.foreach(_.updateDeps(this))
}

override private[rasync] def updateDeps(otherCell: Cell[K, V]): Unit = state.get() match {
case pre: IntermediateState[_, _] =>

// Store snapshots of the callbacks, as the Cell's callbacks might change
// before the update (see below) task gets executed
val current = pre.asInstanceOf[IntermediateState[K, V]]
val completeCallbacks = current.completeCallbacks.get(otherCell)
val nextCallbacks = current.nextCallbacks.get(otherCell)
Expand All @@ -1033,11 +1066,22 @@ private class CellImpl[K <: Key[V], V](pool: HandlerPool, val key: K, updater: U
pool.execute(() => {
state.get() match {
case _: IntermediateState[_, _] =>
completeCallbacks.foreach { _.run() }
nextCallbacks.foreach { _.run() }
combinedCallbacks.foreach { _.run() }
// Indicate an intent to run a callback, then run it.
// This prevents premature updates in dependent cells.
completeCallbacks.foreach { incIncomingCallbacks(); _.run() }
nextCallbacks.foreach { incIncomingCallbacks(); _.run() }
combinedCallbacks.foreach { incIncomingCallbacks(); _.run() }

// Indicate, that callbacks have been run.
// If the counter drops to zero, inform depdent cells.
completeCallbacks.foreach(_ => decIncomingCallbacks())
nextCallbacks.foreach(_ => decIncomingCallbacks())
combinedCallbacks.foreach(_ => decIncomingCallbacks())
case _: FinalState[K, V] =>
/* We are final already, so we ignore all incoming information. */
/* We are final already, so we ignore all incoming information.
* Dependent cells do not need to be informed any more, as this
* has been done at the time, the cell has been completed.
*/
}
})
case _: FinalState[K, V] => /* We are final already, so we ignore all incoming information. */
Expand Down

0 comments on commit 8d77bba

Please sign in to comment.