From 6862afcb410a2dfcca344a052e618abaaeb63859 Mon Sep 17 00:00:00 2001 From: Roman Elizarov Date: Wed, 4 Mar 2020 16:17:31 +0300 Subject: [PATCH] Simpler and faster lock-free linked list (#1565) Lock-free list implementation is considerably simplified, taking into account a limited number of operations that it needs to support. * prev pointers in the list are not marked for removal, since we don't need to support linearizable backwards iteration. * helpDelete method is completely removed. All "delete-helping" is performed only by correctPrev method. * correctPrev method bails out when the node it works on is removed to reduce contention during concurrent removals. * Special open methods "isRemoved" and "nextIfRemoved" are introduced and are overridden in list head class (which is never removed). This ensures that on long list "removeFist" operation (touching head) does not interfere with "addLast" (touch tail). There is still sharing of cache-lines in this case, but no helping between them. All in all, this improvement reduces the size of implementation code and makes it considerably faster. Operations on LinkedListChannel are now much faster (see timings of ChannelSendReceiveStressTest). --- .../common/src/channels/ConflatedChannel.kt | 4 +- .../jvm/src/internal/LockFreeLinkedList.kt | 328 ++++++++---------- .../channels/ChannelSendReceiveStressTest.kt | 15 +- 3 files changed, 150 insertions(+), 197 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/channels/ConflatedChannel.kt b/kotlinx-coroutines-core/common/src/channels/ConflatedChannel.kt index ea7c1e5567..399019c3ee 100644 --- a/kotlinx-coroutines-core/common/src/channels/ConflatedChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/ConflatedChannel.kt @@ -7,12 +7,12 @@ package kotlinx.coroutines.channels import kotlinx.coroutines.* import kotlinx.coroutines.internal.* import kotlinx.coroutines.selects.* -import kotlin.native.concurrent.SharedImmutable +import kotlin.native.concurrent.* /** * Channel that buffers at most one element and conflates all subsequent `send` and `offer` invocations, * so that the receiver always gets the most recently sent element. - * Back-to-send sent elements are _conflated_ -- only the the most recently sent element is received, + * Back-to-send sent elements are _conflated_ -- only the most recently sent element is received, * while previously sent elements **are lost**. * Sender to this channel never suspends and [offer] always returns `true`. * diff --git a/kotlinx-coroutines-core/jvm/src/internal/LockFreeLinkedList.kt b/kotlinx-coroutines-core/jvm/src/internal/LockFreeLinkedList.kt index 19cb159b6b..26fd169da3 100644 --- a/kotlinx-coroutines-core/jvm/src/internal/LockFreeLinkedList.kt +++ b/kotlinx-coroutines-core/jvm/src/internal/LockFreeLinkedList.kt @@ -40,14 +40,18 @@ public actual typealias PrepareOp = LockFreeLinkedListNode.PrepareOp * Doubly-linked concurrent list node with remove support. * Based on paper * ["Lock-Free and Practical Doubly Linked List-Based Deques Using Single-Word Compare-and-Swap"](https://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.140.4693&rep=rep1&type=pdf) - * by Sundell and Tsigas. + * by Sundell and Tsigas with considerable changes. + * + * The core idea of the algorithm is to maintain a doubly-linked list with an ever-present sentinel node (it is + * never removed) that serves both as a list head and tail and to linearize all operations (both insert and remove) on + * the update of the next pointer. Removed nodes have their next pointer marked with [Removed] class. * * Important notes: - * * The instance of this class serves both as list head/tail sentinel and as the list item. - * Sentinel node should be never removed. * * There are no operations to add items to left side of the list, only to the end (right side), because we cannot * efficiently linearize them with atomic multi-step head-removal operations. In short, * support for [describeRemoveFirst] operation precludes ability to add items at the beginning. + * * Previous pointers are not marked for removal. We don't support linearizable backwards traversal. + * * Remove-helping logic is simplified and consolidated in [correctPrev] method. * * @suppress **This is unstable API and it is subject to change.** */ @@ -55,7 +59,7 @@ public actual typealias PrepareOp = LockFreeLinkedListNode.PrepareOp @InternalCoroutinesApi public actual open class LockFreeLinkedListNode { private val _next = atomic(this) // Node | Removed | OpDescriptor - private val _prev = atomic(this) // Node | Removed + private val _prev = atomic(this) // Node to the left (cannot be marked as removed) private val _removedRef = atomic(null) // lazily cached removed ref to this private fun removed(): Removed = @@ -83,7 +87,7 @@ public actual open class LockFreeLinkedListNode { override fun prepare(affected: Node): Any? = if (condition()) null else CONDITION_FALSE } - public actual val isRemoved: Boolean get() = next is Removed + public actual open val isRemoved: Boolean get() = next is Removed // LINEARIZABLE. Returns Node | Removed public val next: Any get() { @@ -96,19 +100,19 @@ public actual open class LockFreeLinkedListNode { // LINEARIZABLE. Returns next non-removed Node public actual val nextNode: Node get() = next.unwrap() - // LINEARIZABLE. Returns Node | Removed - public val prev: Any get() { - _prev.loop { prev -> - if (prev is Removed) return prev - prev as Node // otherwise, it can be only node - if (prev.next === this) return prev - correctPrev(prev, null) - } + // LINEARIZABLE WHEN THIS NODE IS NOT REMOVED: + // Returns prev non-removed Node, makes sure prev is correct (prev.next === this) + // NOTE: if this node is removed, then returns non-removed previous node without applying + // prev.next correction, which does not provide linearizable backwards iteration, but can be used to + // resume forward iteration when current node was removed. + public actual val prevNode: Node + get() = correctPrev(null) ?: findPrevNonRemoved(_prev.value) + + private tailrec fun findPrevNonRemoved(current: Node): Node { + if (!current.isRemoved) return current + return findPrevNonRemoved(current._prev.value) } - // LINEARIZABLE. Returns prev non-removed Node - public actual val prevNode: Node get() = prev.unwrap() - // ------ addOneIfEmpty ------ public actual fun addOneIfEmpty(node: Node): Boolean { @@ -132,8 +136,7 @@ public actual open class LockFreeLinkedListNode { */ public actual fun addLast(node: Node) { while (true) { // lock-free loop on prev.next - val prev = prev as Node // sentinel node is never removed, so prev is always defined - if (prev.addNext(node, this)) return + if (prevNode.addNext(node, this)) return } } @@ -145,7 +148,7 @@ public actual open class LockFreeLinkedListNode { public actual inline fun addLastIf(node: Node, crossinline condition: () -> Boolean): Boolean { val condAdd = makeCondAddOp(node, condition) while (true) { // lock-free loop on prev.next - val prev = prev as Node // sentinel node is never removed, so prev is always defined + val prev = prevNode // sentinel node is never removed, so prev is always defined when (prev.tryCondAddNext(node, this, condAdd)) { SUCCESS -> return true FAILURE -> return false @@ -155,7 +158,7 @@ public actual open class LockFreeLinkedListNode { public actual inline fun addLastIfPrev(node: Node, predicate: (Node) -> Boolean): Boolean { while (true) { // lock-free loop on prev.next - val prev = prev as Node // sentinel node is never removed, so prev is always defined + val prev = prevNode // sentinel node is never removed, so prev is always defined if (!predicate(prev)) return false if (prev.addNext(node, this)) return true } @@ -168,7 +171,7 @@ public actual open class LockFreeLinkedListNode { ): Boolean { val condAdd = makeCondAddOp(node, condition) while (true) { // lock-free loop on prev.next - val prev = prev as Node // sentinel node is never removed, so prev is always defined + val prev = prevNode // sentinel node is never removed, so prev is always defined if (!predicate(prev)) return false when (prev.tryCondAddNext(node, this, condAdd)) { SUCCESS -> return true @@ -233,55 +236,71 @@ public actual open class LockFreeLinkedListNode { * In particular, invoking [nextNode].[prevNode] might still return this node even though it is "already removed". * Invoke [helpRemove] to make sure that remove was completed. */ - public actual open fun remove(): Boolean { + public actual open fun remove(): Boolean = + removeOrNext() == null + + // returns null if removed successfully or next node if this node is already removed + @PublishedApi + internal fun removeOrNext(): Node? { while (true) { // lock-free loop on next val next = this.next - if (next is Removed) return false // was already removed -- don't try to help (original thread will take care) - if (next === this) return false // was not even added + if (next is Removed) return next.ref // was already removed -- don't try to help (original thread will take care) + if (next === this) return next // was not even added val removed = (next as Node).removed() if (_next.compareAndSet(next, removed)) { // was removed successfully (linearized remove) -- fixup the list - finishRemove(next) - return true + next.correctPrev(null) + return null } } } + // Helps with removal of this node public actual fun helpRemove() { - val removed = this.next as? Removed ?: error("Must be invoked on a removed node") - finishRemove(removed.ref) + // Note: this node must be already removed + (next as Removed).ref.correctPrev(null) } - public actual fun removeFirstOrNull(): Node? { - while (true) { // try to linearize - val first = next as Node - if (first === this) return null - if (first.remove()) return first - first.helpDelete() // must help delete, or loose lock-freedom + // Helps with removal of nodes that are previous to this + @PublishedApi + internal fun helpRemovePrev() { + // We need to call correctPrev on a non-removed node to ensure progress, since correctPrev bails out when + // called on a removed node. There's always at least one non-removed node (list head). + var node = this + while (true) { + val next = node.next + if (next !is Removed) break + node = next.ref } + // Found a non-removed node + node.correctPrev(null) } - public fun describeRemoveFirst(): RemoveFirstDesc = RemoveFirstDesc(this) - - public inline fun removeFirstIfIsInstanceOf(): T? { + public actual fun removeFirstOrNull(): Node? { while (true) { // try to linearize val first = next as Node if (first === this) return null - if (first !is T) return null if (first.remove()) return first - first.helpDelete() // must help delete, or loose lock-freedom + first.helpRemove() // must help remove to ensure lock-freedom } } + public fun describeRemoveFirst(): RemoveFirstDesc = RemoveFirstDesc(this) + // just peek at item when predicate is true public actual inline fun removeFirstIfIsInstanceOfOrPeekIf(predicate: (T) -> Boolean): T? { - while (true) { // try to linearize - val first = next as Node - if (first === this) return null + while (true) { + val first = this.next as Node + if (first === this) return null // got list head -- nothing to remove if (first !is T) return null - if (predicate(first)) return first // just peek when predicate is true - if (first.remove()) return first - first.helpDelete() // must help delete, or loose lock-freedom + if (predicate(first)) { + // check for removal of the current node to make sure "peek" operation is linearizable + if (!first.isRemoved) return first + } + val next = first.removeOrNext() + if (next === null) return first // removed successfully -- return it + // help and start from the beginning + next.helpRemovePrev() } } @@ -296,25 +315,9 @@ public actual open class LockFreeLinkedListNode { assert { node._next.value === node && node._prev.value === node } } - // Returns null when atomic op got into deadlock trying to help operation that started later - final override fun takeAffectedNode(op: OpDescriptor): Node? { - while (true) { - val prev = queue._prev.value as Node // this sentinel node is never removed - val next = prev._next.value - if (next === queue) return prev // all is good -> linked properly - if (next === op) return prev // all is good -> our operation descriptor is already there - if (next is OpDescriptor) { // some other operation descriptor -> help & retry - if (op.isEarlierThan(next)) - return null // RETRY_ATOMIC - next.perform(prev) - continue - } - // linked improperly -- help insert - val affected = queue.correctPrev(prev, op) - // we can find node which this operation is already affecting while trying to correct prev - if (affected != null) return affected - } - } + // Returns null when atomic op got into deadlock trying to help operation that started later (RETRY_ATOMIC) + final override fun takeAffectedNode(op: OpDescriptor): Node? = + queue.correctPrev(op) // queue head is never removed, so null result can only mean RETRY_ATOMIC private val _affectedNode = atomic(null) final override val affectedNode: Node? get() = _affectedNode.value @@ -368,11 +371,11 @@ public actual open class LockFreeLinkedListNode { // check node predicates here, must signal failure if affect is not of type T protected override fun failure(affected: Node): Any? = - if (affected === queue) LIST_EMPTY else null + if (affected === queue) LIST_EMPTY else null final override fun retry(affected: Node, next: Any): Boolean { if (next !is Removed) return false - affected.helpDelete() // must help delete, or loose lock-freedom + next.ref.helpRemovePrev() // must help delete to ensure lock-freedom return true } @@ -385,7 +388,12 @@ public actual open class LockFreeLinkedListNode { } final override fun updatedNext(affected: Node, next: Node): Any = next.removed() - final override fun finishOnSuccess(affected: Node, next: Node) = affected.finishRemove(next) + + final override fun finishOnSuccess(affected: Node, next: Node) { + // Complete removal operation here. It bails out if next node is also removed and it becomes + // responsibility of the next's removes to call correctPrev which would help fix all the links. + next.correctPrev(null) + } } // This is Harris's RDCSS (Restricted Double-Compare Single Swap) operation @@ -404,9 +412,12 @@ public actual open class LockFreeLinkedListNode { val decision = desc.onPrepare(this) if (decision === REMOVE_PREPARED) { // remove element on failure -- do not mark as decided, will try another one + val next = this.next val removed = next.removed() if (affected._next.compareAndSet(this, removed)) { - affected.helpDelete() + // Complete removal operation here. It bails out if next node is also removed and it becomes + // responsibility of the next's removes to call correctPrev which would help fix all the links. + next.correctPrev(null) } return REMOVE_PREPARED } @@ -519,139 +530,68 @@ public actual open class LockFreeLinkedListNode { */ private fun finishAdd(next: Node) { next._prev.loop { nextPrev -> - if (nextPrev is Removed || this.next !== next) return // next was removed, remover fixes up links - if (next._prev.compareAndSet(nextPrev, this)) { - if (this.next is Removed) { - // already removed - next.correctPrev(nextPrev as Node, null) - } - return - } + if (this.next !== next) return // this or next was removed or another node added, remover/adder fixes up links + if (next._prev.compareAndSet(nextPrev, this)) return } } - private fun finishRemove(next: Node) { - helpDelete() - next.correctPrev(_prev.value.unwrap(), null) - } - - private fun markPrev(): Node { - _prev.loop { prev -> - if (prev is Removed) return prev.ref - // See detailed comment in findHead on why `prev === this` is a special case for which we know that - // the prev should have being pointing to the head of list but finishAdd that was supposed - // to do that is not complete yet. - val removedPrev = (if (prev === this) findHead() else (prev as Node)).removed() - if (_prev.compareAndSet(prev, removedPrev)) return prev - } - } + protected open fun nextIfRemoved(): Node? = (next as? Removed)?.ref /** - * Finds the head of the list (implementing [LockFreeLinkedListHead]) by following [next] pointers. + * Returns the corrected value of the previous node while also correcting the `prev` pointer + * (so that `this.prev.next === this`) and helps complete node removals to the left ot this node. * - * The code in [kotlinx.coroutines.JobSupport] performs upgrade of a single node to a list. - * It uses [addOneIfEmpty] to add the list head to "empty list of a single node" once. - * During upgrade a transient state of the list looks like this: + * It returns `null` in two special cases: * - * ``` - * +-----------------+ - * | | - * node V head | - * +---+---+ +---+---+ | - * +-> | P | N | --> | P | N |-+ - * | +---+---+ +---+---+ - * | | ^ | - * +---- + +---------+ - * ``` - * - * The [prev] pointer in `node` still points to itself when [finishAdd] (invoked inside [addOneIfEmpty]) - * has not completed yet. If this state is observed, then we know that [prev] should have been pointing - * to the list head. This function is looking up the head by following consistent chain of [next] pointers. + * * When this node is removed. In this case there is no need to waste time on corrections, because + * remover of this node will ultimately call [correctPrev] on the next node and that will fix all + * the links from this node, too. + * * When [op] descriptor is not `null` and and operation descriptor that is [OpDescriptor.isEarlierThan] + * that current [op] is found while traversing the list. This `null` result will be translated + * by callers to [RETRY_ATOMIC]. */ - private fun findHead(): Node { - var cur = this - while (true) { - if (cur is LockFreeLinkedListHead) return cur - cur = cur.nextNode - assert { cur !== this } // "Cannot loop to this while looking for list head" - } - } - - // fixes next links to the left of this node - @PublishedApi - internal fun helpDelete() { - var last: Node? = null // will set to the node left of prev when found - var prev: Node = markPrev() - var next: Node = (this._next.value as Removed).ref - while (true) { - // move to the right until first non-removed node - val nextNext = next.next - if (nextNext is Removed) { - next.markPrev() - next = nextNext.ref - continue - } - // move the the left until first non-removed node - val prevNext = prev.next - if (prevNext is Removed) { - if (last != null) { - prev.markPrev() - last._next.compareAndSet(prev, prevNext.ref) - prev = last - last = null - } else { - prev = prev._prev.value.unwrap() - } - continue - } - if (prevNext !== this) { - // skipped over some removed nodes to the left -- setup to fixup the next links - last = prev - prev = prevNext as Node - if (prev === next) return // already done!!! - continue - } - // Now prev & next are Ok - if (prev._next.compareAndSet(this, next)) return // success! - } - } - - // fixes prev links from this node - // returns affected node by this operation when this op is in progress (and nothing can be corrected) - // returns null otherwise (prev was corrected) - private fun correctPrev(_prev: Node, op: OpDescriptor?): Node? { - var prev: Node = _prev + private tailrec fun correctPrev(op: OpDescriptor?): Node? { + val oldPrev = _prev.value + var prev: Node = oldPrev var last: Node? = null // will be set so that last.next === prev - while (true) { - // move the the left until first non-removed node - val prevNext = prev._next.value - if (prevNext === op) return prev // part of the same op -- don't recurse, didn't correct prev - if (prevNext is OpDescriptor) { // help & retry - prevNext.perform(prev) - continue - } - if (prevNext is Removed) { - if (last !== null) { - prev.markPrev() - last._next.compareAndSet(prev, prevNext.ref) - prev = last - last = null - } else { - prev = prev._prev.value.unwrap() + while (true) { // move the the left until first non-removed node + val prevNext: Any = prev._next.value + when { + // fast path to find quickly find prev node when everything is properly linked + prevNext === this -> { + if (oldPrev === prev) return prev // nothing to update -- all is fine, prev found + // otherwise need to update prev + if (!this._prev.compareAndSet(oldPrev, prev)) { + // Note: retry from scratch on failure to update prev + return correctPrev(op) + } + return prev // return a correct prev + } + // slow path when we need to help remove operations + this.isRemoved -> return null // nothing to do, this node was removed, bail out asap to save time + prevNext === op -> return prev // part of the same op -- don't recurse, didn't correct prev + prevNext is OpDescriptor -> { // help & retry + if (op != null && op.isEarlierThan(prevNext)) + return null // RETRY_ATOMIC + prevNext.perform(prev) + return correctPrev(op) // retry from scratch + } + prevNext is Removed -> { + if (last !== null) { + // newly added (prev) node is already removed, correct last.next around it + if (!last._next.compareAndSet(prev, prevNext.ref)) { + return correctPrev(op) // retry from scratch on failure to update next + } + prev = last + last = null + } else { + prev = prev._prev.value + } + } + else -> { // prevNext is a regular node, but not this -- help delete + last = prev + prev = prevNext as Node } - continue - } - val oldPrev = this._prev.value - if (oldPrev is Removed) return null // this node was removed, too -- its remover will take care - if (prevNext !== this) { - // need to fixup next - last = prev - prev = prevNext as Node - continue - } - if (oldPrev === prev) return null // it is already linked as needed - if (this._prev.compareAndSet(oldPrev, prev)) { - if (prev._prev.value !is Removed) return null // finish only if prev was not concurrently removed } } } @@ -691,7 +631,11 @@ public actual open class LockFreeLinkedListHead : LockFreeLinkedListNode() { } // just a defensive programming -- makes sure that list head sentinel is never removed - public actual final override fun remove(): Boolean = throw UnsupportedOperationException() + public actual final override fun remove(): Boolean = error("head cannot be removed") + + // optimization: because head is never removed, we don't have to read _next.value to check these: + override val isRemoved: Boolean get() = false + override fun nextIfRemoved(): Node? = null internal fun validate() { var prev: Node = this diff --git a/kotlinx-coroutines-core/jvm/test/channels/ChannelSendReceiveStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ChannelSendReceiveStressTest.kt index 5fc87abfd6..00c5a6090f 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/ChannelSendReceiveStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/ChannelSendReceiveStressTest.kt @@ -1,11 +1,12 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.channels import kotlinx.coroutines.* import kotlinx.coroutines.selects.* +import org.junit.* import org.junit.Test import org.junit.runner.* import org.junit.runners.* @@ -43,12 +44,20 @@ class ChannelSendReceiveStressTest( private val receivedTotal = AtomicInteger() private val receivedBy = IntArray(nReceivers) + private val pool = + newFixedThreadPoolContext(nSenders + nReceivers, "ChannelSendReceiveStressTest") + + @After + fun tearDown() { + pool.close() + } + @Test fun testSendReceiveStress() = runBlocking { println("--- ChannelSendReceiveStressTest $kind with nSenders=$nSenders, nReceivers=$nReceivers") val receivers = List(nReceivers) { receiverIndex -> // different event receivers use different code - launch(Dispatchers.Default + CoroutineName("receiver$receiverIndex")) { + launch(pool + CoroutineName("receiver$receiverIndex")) { when (receiverIndex % 5) { 0 -> doReceive(receiverIndex) 1 -> doReceiveOrNull(receiverIndex) @@ -60,7 +69,7 @@ class ChannelSendReceiveStressTest( } } val senders = List(nSenders) { senderIndex -> - launch(Dispatchers.Default + CoroutineName("sender$senderIndex")) { + launch(pool + CoroutineName("sender$senderIndex")) { when (senderIndex % 2) { 0 -> doSend(senderIndex) 1 -> doSendSelect(senderIndex)