Skip to content

Commit

Permalink
Reduce starvation of ConflatedChannel receiver with many senders
Browse files Browse the repository at this point in the history
  • Loading branch information
elizarov committed Oct 3, 2019
1 parent 4b62d3b commit d381216
Showing 1 changed file with 37 additions and 11 deletions.
48 changes: 37 additions & 11 deletions kotlinx-coroutines-core/jvm/src/internal/LockFreeLinkedList.kt
Original file line number Diff line number Diff line change
Expand Up @@ -236,16 +236,21 @@ public actual open class LockFreeLinkedListNode {
* In particular, invoking [nextNode].[prevNode] might still return this node even though it is "already removed".
* Invoke [helpRemove] to make sure that remove was completed.
*/
public actual open fun remove(): Boolean {
public actual open fun remove(): Boolean =
removeOrNext() == null

// returns null if removed successfully or next node if this node is already removed
@PublishedApi
internal fun removeOrNext(): Node? {
while (true) { // lock-free loop on next
val next = this.next
if (next is Removed) return false // was already removed -- don't try to help (original thread will take care)
if (next === this) return false // was not even added
if (next is Removed) return next.ref // was already removed -- don't try to help (original thread will take care)
if (next === this) return next // was not even added
val removed = (next as Node).removed()
if (_next.compareAndSet(next, removed)) {
// was removed successfully (linearized remove) -- fixup the list
next.correctPrev(null, false)
return true
return null
}
}
}
Expand Down Expand Up @@ -283,13 +288,34 @@ public actual open class LockFreeLinkedListNode {

// just peek at item when predicate is true
public actual inline fun <reified T> removeFirstIfIsInstanceOfOrPeekIf(predicate: (T) -> Boolean): T? {
while (true) { // try to linearize
val first = next as Node
if (first === this) return null
if (first !is T) return null
if (predicate(first)) return first // just peek when predicate is true
if (first.remove()) return first
first.helpRemove() // must help remove to ensure lock-freedom
while(true) { // start with the first node of the list
var first = this.next as Node
var moveNextAttempts = 0
while (true) { // scan a sequence of next nodes to find the first non-removed node
if (first === this) return null // got list head -- nothing to remove
if (first !is T) return null
if (predicate(first)) {
// check for removal of the current node to make sure "peek" operation is linearizable
if (first.isRemoved) break // help remove it and start from the beginning
return first // just peek when predicate is true
}
val next = first.removeOrNext()
if (next === null) return first // removed successfully -- return it
/*
* This code is needed to reduce starvation in ConflatedChannel receiver that ties to remove a node
* from the head of its queue while sender threads are constantly removing and adding new nodes.
* So, upon discovery of a removed node we don't immediately rush to help with its removal.
* We try to quickly move to the next node to see if we can remove it instead.
* However, we limit the number of attempts to move to the next node and help with removal at the end
* to avoid repeatedly scanning very long lists in LinkedListChannel.
*/
if (moveNextAttempts++ < 32) {
first = next
} else {
break // help and start from the beginning
}
}
first.helpRemove() // help remove this one and retry from the beginning of the list
}
}

Expand Down

0 comments on commit d381216

Please sign in to comment.