Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[wip] Add methods to register a set of dependencies #150

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 42 additions & 1 deletion core/src/main/scala/com/phaller/rasync/Cell.scala
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ trait Cell[K <: Key[V], V] {
def when(other: Cell[K, V], valueCallback: (V, Boolean) => Outcome[V]): Unit
def whenSequential(other: Cell[K, V], valueCallback: (V, Boolean) => Outcome[V]): Unit

def whenSequentialMulti(other: List[Cell[K, V]], valueCallback: () => Outcome[V]): Unit
def whenMulti(other: List[Cell[K, V]], valueCallback: () => Outcome[V]): Unit

def zipFinal(that: Cell[K, V]): Cell[DefaultKey[(V, V)], (V, V)]

// internal API
Expand Down Expand Up @@ -257,7 +260,7 @@ private class IntermediateState[K <: Key[V], V](
/** A list of cells that `this` cell depends on mapped to the callbacks to call, if those cells change. */
val nextCallbacks: Map[Cell[K, V], NextCallbackRunnable[K, V]],
/** A list of cells that `this` cell depends on mapped to the callbacks to call, if those cells change. */
val combinedCallbacks: Map[Cell[K, V], CombinedCallbackRunnable[K, V]]) extends State[V]
val combinedCallbacks: Map[Cell[K, V], CallbackRunnable[K, V]]) extends State[V]

private object IntermediateState {
def empty[K <: Key[V], V](updater: Updater[V]): IntermediateState[K, V] =
Expand Down Expand Up @@ -481,6 +484,44 @@ private class CellImpl[K <: Key[V], V](pool: HandlerPool, val key: K, updater: U
}
}
}
override def whenSequentialMulti(other: List[Cell[K, V]], valueCallback: () => Outcome[V]): Unit =
whenMulti(other, valueCallback, true)

override def whenMulti(other: List[Cell[K, V]], valueCallback: () => Outcome[V]): Unit =
whenMulti(other, valueCallback, false)


private def whenMulti(other: List[Cell[K, V]], valueCallback: () => Outcome[V], sequential: Boolean): Unit = {
var success = false
while (!success) { // repeat until compareAndSet succeeded (or the dependency is outdated)
state.get() match {
case _: FinalState[K, V] => // completed with final result
// do not add dependency
// in fact, do nothing
success = true

case raw: IntermediateState[_, _] => // not completed
val current = raw.asInstanceOf[IntermediateState[K, V]]
val toRegister = other.diff[Cell[K, V]](current.combinedCallbacks.keys.toSeq)
if (toRegister.isEmpty)
success = true // another combined dependency has been registered already. Ignore the new (duplicate) one.
else {
val newCallback: MultiCallbackRunnable[K, V] =
if (sequential) new MultiSequentialCallbackRunnable(pool, this, other, valueCallback)
else new MultiConcurrentCallbackRunnable(pool, this, other, valueCallback)

val newState = new IntermediateState(current.res, current.tasksActive, current.completeDependentCells, current.completeCallbacks, current.nextDependentCells, current.nextCallbacks, current.combinedCallbacks ++ other.map((_, newCallback)))
if (state.compareAndSet(current, newState)) {
success = true
// Inform `other` that this cell depends on its updates.
other.foreach(_.addCombinedDependentCell(this))
// start calculations on `other` so that we eventually get its updates.
other.foreach(pool.triggerExecution)
}
}
}
}
}

override def whenNext(other: Cell[K, V], valueCallback: V => Outcome[V]): Unit = {
this.whenNext(other, valueCallback, sequential = false)
Expand Down
79 changes: 73 additions & 6 deletions core/src/main/scala/com/phaller/rasync/callbackRunnable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,34 @@ private[rasync] trait CallbackRunnable[K <: Key[V], V] extends Runnable with OnC
/** The cell that awaits this callback. */
val dependentCompleter: CellCompleter[K, V]

/** The cell that triggers the callback. */
val otherCell: Cell[K, V]

protected val completeDep: Boolean
protected val sequential: Boolean

/** The callback to be called. It retrieves an updated value of otherCell and returns an Outcome for dependentCompleter. */
val callback: Any // TODO Is there a better supertype for (a) (V, Bool)=>Outcome[V] and (b) V=>Outcome[V]. Nothing=>Outcome[V] does not work.


/** Call the callback and use update dependentCompleter according to the callback's result. */
override def run(): Unit

}

private[rasync] trait SingleDepCallbackRunnable[K <: Key[V], V] extends CallbackRunnable[K, V] { //extends Runnable with OnCompleteRunnable {
/** The handler pool that runs the callback function. */
override val pool: HandlerPool

/** The cell that awaits this callback. */
override val dependentCompleter: CellCompleter[K, V]

/** The cell that triggers the callback. */
val otherCell: Cell[K, V]

override protected val completeDep: Boolean
override protected val sequential: Boolean

/** The callback to be called. It retrieves an updated value of otherCell and returns an Outcome for dependentCompleter. */
override val callback: Any // TODO Is there a better supertype for (a) (V, Bool)=>Outcome[V] and (b) V=>Outcome[V]. Nothing=>Outcome[V] does not work.

/** Add this CallbackRunnable to its handler pool. */
// This method is not needed currently, as all callbackRunnables for one update are run in the same thread.
// Also, this is the only use of the peekFor method, so if tests show that the current implementation is
Expand Down Expand Up @@ -74,7 +93,7 @@ private[rasync] abstract class CompleteCallbackRunnable[K <: Key[V], V](
override val dependentCompleter: CellCompleter[K, V], // needed to not call whenNext callback, if whenComplete callback exists.
override val otherCell: Cell[K, V],
override val callback: V => Outcome[V])
extends CallbackRunnable[K, V] {
extends SingleDepCallbackRunnable[K, V] {

override protected final val completeDep = true
// must be filled in before running it
Expand Down Expand Up @@ -135,7 +154,7 @@ private[rasync] abstract class NextCallbackRunnable[K <: Key[V], V](
override val dependentCompleter: CellCompleter[K, V], // needed to not call whenNext callback, if whenComplete callback exists.
override val otherCell: Cell[K, V],
override val callback: V => Outcome[V])
extends CallbackRunnable[K, V] {
extends SingleDepCallbackRunnable[K, V] {

override protected final val completeDep = false

Expand Down Expand Up @@ -190,7 +209,7 @@ private[rasync] abstract class CombinedCallbackRunnable[K <: Key[V], V](
override val dependentCompleter: CellCompleter[K, V], // needed to not call whenNext callback, if whenComplete callback exists.
override val otherCell: Cell[K, V],
override val callback: (V, Boolean) => Outcome[V])
extends CallbackRunnable[K, V] {
extends SingleDepCallbackRunnable[K, V] {

override protected final val completeDep = false

Expand Down Expand Up @@ -232,3 +251,51 @@ private[rasync] class CombinedConcurrentCallbackRunnable[K <: Key[V], V](overrid

private[rasync] class CombinedSequentialCallbackRunnable[K <: Key[V], V](override val pool: HandlerPool, override val dependentCompleter: CellCompleter[K, V], override val otherCell: Cell[K, V], override val callback: (V, Boolean) => Outcome[V])
extends CombinedCallbackRunnable[K, V](pool, dependentCompleter, otherCell, callback) with SequentialCallbackRunnable[K, V]

/* To be run when `otherCell` gets an update.
* @param pool The handler pool that runs the callback function
* @param dependentCompleter The cell, that depends on `otherCell`.
* @param otherCell Cell that triggers this callback.
* @param callback Callback function that is triggered on an onNext event
*/
private[rasync] abstract class MultiCallbackRunnable[K <: Key[V], V](
override val pool: HandlerPool,
override val dependentCompleter: CellCompleter[K, V], // needed to not call whenNext callback, if whenComplete callback exists.
val otherCells: List[Cell[K, V]],
override val callback: () => Outcome[V])
extends CallbackRunnable[K, V] {

override protected final val completeDep = false

def run(): Unit = {
if (sequential) {
dependentCompleter.sequential {
callCallback()
}
} else {
callCallback()
}
}

protected def callCallback(): Unit = {
if (dependentCompleter.cell.isComplete) {
return ;
}

callback() match {
case NextOutcome(v) =>
dependentCompleter.putNext(v)
case FinalOutcome(v) =>
dependentCompleter.putFinal(v)
case _ => /* do nothing, the value of */
}

dependentCompleter.cell.removeAllCallbacks(otherCells.filter(_.isComplete))
}
}

private[rasync] class MultiConcurrentCallbackRunnable[K <: Key[V], V](override val pool: HandlerPool, override val dependentCompleter: CellCompleter[K, V], override val otherCells: List[Cell[K, V]], override val callback: () => Outcome[V])
extends MultiCallbackRunnable[K, V](pool, dependentCompleter, otherCells, callback) with ConcurrentCallbackRunnable[K, V]

private[rasync] class MultiSequentialCallbackRunnable[K <: Key[V], V](override val pool: HandlerPool, override val dependentCompleter: CellCompleter[K, V], override val otherCells: List[Cell[K, V]], override val callback: () => Outcome[V])
extends MultiCallbackRunnable[K, V](pool, dependentCompleter, otherCells, callback) with SequentialCallbackRunnable[K, V]
Loading