Skip to content

Commit

Permalink
Multithreaded support in select expression
Browse files Browse the repository at this point in the history
Fixes #1764
  • Loading branch information
elizarov authored and h0tk3y committed Apr 17, 2020
1 parent 34c15b0 commit 4e9a346
Show file tree
Hide file tree
Showing 15 changed files with 269 additions and 75 deletions.
12 changes: 8 additions & 4 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -1073,8 +1073,8 @@ public final class kotlinx/coroutines/selects/SelectBuilderImpl : kotlinx/corout
public fun resumeSelectWithException (Ljava/lang/Throwable;)V
public fun resumeWith (Ljava/lang/Object;)V
public fun toString ()Ljava/lang/String;
public fun trySelect ()Z
public fun trySelectOther (Lkotlinx/coroutines/internal/LockFreeLinkedListNode$PrepareOp;)Ljava/lang/Object;
public fun trySelect (Lkotlin/jvm/functions/Function0;)Z
public fun trySelectOther (Lkotlinx/coroutines/internal/LockFreeLinkedListNode$PrepareOp;Lkotlin/jvm/functions/Function0;)Ljava/lang/Object;
}

public abstract interface class kotlinx/coroutines/selects/SelectClause0 {
Expand All @@ -1095,8 +1095,12 @@ public abstract interface class kotlinx/coroutines/selects/SelectInstance {
public abstract fun isSelected ()Z
public abstract fun performAtomicTrySelect (Lkotlinx/coroutines/internal/AtomicDesc;)Ljava/lang/Object;
public abstract fun resumeSelectWithException (Ljava/lang/Throwable;)V
public abstract fun trySelect ()Z
public abstract fun trySelectOther (Lkotlinx/coroutines/internal/LockFreeLinkedListNode$PrepareOp;)Ljava/lang/Object;
public abstract fun trySelect (Lkotlin/jvm/functions/Function0;)Z
public abstract fun trySelectOther (Lkotlinx/coroutines/internal/LockFreeLinkedListNode$PrepareOp;Lkotlin/jvm/functions/Function0;)Ljava/lang/Object;
}

public final class kotlinx/coroutines/selects/SelectInstance$DefaultImpls {
public static synthetic fun trySelect$default (Lkotlinx/coroutines/selects/SelectInstance;Lkotlin/jvm/functions/Function0;ILjava/lang/Object;)Z
}

public final class kotlinx/coroutines/selects/SelectKt {
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/AbstractCoroutine.kt
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,6 @@ public abstract class AbstractCoroutine<in T>(
*/
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
initParentJob()
startCoroutine(start, this, receiver, block)
startCoroutine(start, receiver, this, block)
}
}
10 changes: 5 additions & 5 deletions kotlinx-coroutines-core/common/src/Builders.common.kt
Original file line number Diff line number Diff line change
Expand Up @@ -174,13 +174,13 @@ public suspend inline operator fun <T> CoroutineDispatcher.invoke(

internal fun <T, R> startCoroutineImpl(
start: CoroutineStart,
coroutine: AbstractCoroutine<T>,
receiver: R,
completion: Continuation<T>,
block: suspend R.() -> T
) = when (start) {
CoroutineStart.DEFAULT -> block.startCoroutineCancellable(receiver, coroutine)
CoroutineStart.ATOMIC -> block.startCoroutine(receiver, coroutine)
CoroutineStart.UNDISPATCHED -> block.startCoroutineUndispatched(receiver, coroutine)
CoroutineStart.DEFAULT -> block.startCoroutineCancellable(receiver, completion)
CoroutineStart.ATOMIC -> block.startCoroutine(receiver, completion)
CoroutineStart.UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
CoroutineStart.LAZY -> Unit // will start lazily
}

Expand All @@ -189,8 +189,8 @@ internal fun <T, R> startCoroutineImpl(
// todo: impl a separate startCoroutineCancellable as a fast-path for startCoroutine(DEFAULT, ...)
internal expect fun <T, R> startCoroutine(
start: CoroutineStart,
coroutine: AbstractCoroutine<T>,
receiver: R,
completion: Continuation<T>,
block: suspend R.() -> T
)

Expand Down
97 changes: 67 additions & 30 deletions kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,10 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
select.disposeOnSelect(node)
return
}
enqueueResult is Closed<*> -> throw recoverStackTrace(helpCloseAndGetSendException(enqueueResult))
enqueueResult is Closed<*> -> {
node.dispose()
throw recoverStackTrace(helpCloseAndGetSendException(enqueueResult))
}
enqueueResult === ENQUEUE_FAILED -> {} // try to offer
enqueueResult is Receive<*> -> {} // try to offer
else -> error("enqueueSend returned $enqueueResult ")
Expand Down Expand Up @@ -448,16 +451,18 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
override val pollResult: Any?,
@JvmField val channel: AbstractSendChannel<E>,
@JvmField val select: SelectInstance<R>,
@JvmField val block: suspend (SendChannel<E>) -> R
block: suspend (SendChannel<E>) -> R
) : Send(), DisposableHandle {
@JvmField val block: suspend (SendChannel<E>) -> R = block.asShareable()

override fun tryResumeSend(otherOp: PrepareOp?): Symbol? =
select.trySelectOther(otherOp) as Symbol? // must return symbol
select.trySelectOther(otherOp, onSelect = block::shareableWillBeUsed) as Symbol? // must return symbol

override fun completeResumeSend() {
block.startCoroutine(receiver = channel, completion = select.completion)
}
override fun completeResumeSend() =
startCoroutine(CoroutineStart.ATOMIC, channel, select.completion, block)

override fun dispose() { // invoked on select completion
block.shareableDispose(useIt = false)
remove()
}

Expand Down Expand Up @@ -773,7 +778,11 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
): Boolean {
val node = ReceiveSelect(this, select, block, receiveMode)
val result = enqueueReceive(node)
if (result) select.disposeOnSelect(node)
if (result) {
select.disposeOnSelect(node)
} else {
node.dispose()
}
return result
}

Expand Down Expand Up @@ -871,41 +880,53 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
}

private class ReceiveElement<in E>(
@JvmField val cont: CancellableContinuation<Any?>,
cont: CancellableContinuation<Any?>,
@JvmField val receiveMode: Int
) : Receive<E>() {
private val _cont = atomic<CancellableContinuation<Any?>?>(cont)
private fun useCont() = _cont.getAndSet(null)

fun resumeValue(value: E): Any? = when (receiveMode) {
RECEIVE_RESULT -> ValueOrClosed.value(value)
else -> value
}

@Suppress("IMPLICIT_CAST_TO_ANY")
override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol? {
val token = cont.tryResume(resumeValue(value), otherOp?.desc) ?: return null
val token = _cont.value?.tryResume(resumeValue(value), otherOp?.desc) ?: run {
_cont.value = null
return null
}
assert { token === RESUME_TOKEN } // the only other possible result
// We can call finishPrepare only after successful tryResume, so that only good affected node is saved
otherOp?.finishPrepare()
return RESUME_TOKEN
}

override fun completeResumeReceive(value: E) = cont.completeResume(RESUME_TOKEN)
override fun completeResumeReceive(value: E) { useCont()?.completeResume(RESUME_TOKEN) }

override fun resumeReceiveClosed(closed: Closed<*>) {
when {
receiveMode == RECEIVE_NULL_ON_CLOSE && closed.closeCause == null -> cont.resume(null)
receiveMode == RECEIVE_RESULT -> cont.resume(closed.toResult<Any>())
else -> cont.resumeWithException(closed.receiveException)
receiveMode == RECEIVE_NULL_ON_CLOSE && closed.closeCause == null -> useCont()?.resume(null)
receiveMode == RECEIVE_RESULT -> useCont()?.resume(closed.toResult<Any>())
else -> useCont()?.resumeWithException(closed.receiveException)
}
}
override fun toString(): String = "ReceiveElement@$hexAddress[receiveMode=$receiveMode]"
}

private class ReceiveHasNext<E>(
@JvmField val iterator: Itr<E>,
@JvmField val cont: CancellableContinuation<Boolean>
cont: CancellableContinuation<Boolean>
) : Receive<E>() {
private val _cont = atomic<CancellableContinuation<Boolean>?>(cont)
private fun useCont() = _cont.getAndSet(null)

override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol? {
val token = cont.tryResume(true, otherOp?.desc) ?: return null
val token = _cont.value?.tryResume(true, otherOp?.desc) ?: run{
_cont.value = null
return null
}
assert { token === RESUME_TOKEN } // the only other possible result
// We can call finishPrepare only after successful tryResume, so that only good affected node is saved
otherOp?.finishPrepare()
Expand All @@ -918,51 +939,61 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
but completeResumeReceive is called once so we set iterator result here.
*/
iterator.result = value
cont.completeResume(RESUME_TOKEN)
useCont()?.completeResume(RESUME_TOKEN)
}

override fun resumeReceiveClosed(closed: Closed<*>) {
val token = if (closed.closeCause == null) {
cont.tryResume(false)
_cont.value?.tryResume(false)
} else {
cont.tryResumeWithException(recoverStackTrace(closed.receiveException, cont))
_cont.value?.let { cont ->
cont.tryResumeWithException(recoverStackTrace(closed.receiveException, cont))
}
}
if (token != null) {
iterator.result = closed
cont.completeResume(token)
_cont.value?.completeResume(token)
}
_cont.value = null
}
override fun toString(): String = "ReceiveHasNext@$hexAddress"
}

private class ReceiveSelect<R, E>(
@JvmField val channel: AbstractChannel<E>,
@JvmField val select: SelectInstance<R>,
@JvmField val block: suspend (Any?) -> R,
block: suspend (Any?) -> R,
@JvmField val receiveMode: Int
) : Receive<E>(), DisposableHandle {
@JvmField val block: suspend (Any?) -> R = block.asShareable() // captured variables in this block need screening

override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol? =
select.trySelectOther(otherOp) as Symbol?
select.trySelectOther(otherOp, onSelect = block::shareableWillBeUsed) as Symbol?

@Suppress("UNCHECKED_CAST")
override fun completeResumeReceive(value: E) {
block.startCoroutine(if (receiveMode == RECEIVE_RESULT) ValueOrClosed.value(value) else value, select.completion)
startCoroutine(CoroutineStart.ATOMIC, if (receiveMode == RECEIVE_RESULT) ValueOrClosed.value(value) else value, select.completion, block)
}

override fun resumeReceiveClosed(closed: Closed<*>) {
if (!select.trySelect()) return
if (!select.trySelect(onSelect = block::shareableWillBeUsed)) return
when (receiveMode) {
RECEIVE_THROWS_ON_CLOSE -> select.resumeSelectWithException(closed.receiveException)
RECEIVE_RESULT -> block.startCoroutine(ValueOrClosed.closed<R>(closed.closeCause), select.completion)
RECEIVE_THROWS_ON_CLOSE -> {
block.shareableDispose(useIt = true)
select.resumeSelectWithException(closed.receiveException)
}
RECEIVE_RESULT -> startCoroutine(CoroutineStart.ATOMIC, ValueOrClosed.closed<R>(closed.closeCause), select.completion, block)
RECEIVE_NULL_ON_CLOSE -> if (closed.closeCause == null) {
block.startCoroutine(null, select.completion)
startCoroutine(CoroutineStart.ATOMIC, null, select.completion, block)
} else {
block.shareableDispose(useIt = true)
select.resumeSelectWithException(closed.receiveException)
}
}
}

override fun dispose() { // invoked on select completion
block.shareableDispose(useIt = false)
if (remove())
channel.onReceiveDequeued() // notify cancellation of receive
}
Expand Down Expand Up @@ -1031,17 +1062,23 @@ internal interface ReceiveOrClosed<in E> {
@Suppress("UNCHECKED_CAST")
internal class SendElement(
override val pollResult: Any?,
@JvmField val cont: CancellableContinuation<Unit>
cont: CancellableContinuation<Unit>
) : Send() {
private val _cont = atomic<CancellableContinuation<Unit>?>(cont)
private fun useCont() = _cont.getAndSet(null)

override fun tryResumeSend(otherOp: PrepareOp?): Symbol? {
val token = cont.tryResume(Unit, otherOp?.desc) ?: return null
val token = _cont.value?.tryResume(Unit, otherOp?.desc) ?: run {
_cont.value = null
return null
}
assert { token === RESUME_TOKEN } // the only other possible result
// We can call finishPrepare only after successful tryResume, so that only good affected node is saved
otherOp?.finishPrepare() // finish preparations
return RESUME_TOKEN
}
override fun completeResumeSend() = cont.completeResume(RESUME_TOKEN)
override fun resumeSendClosed(closed: Closed<*>) = cont.resumeWithException(closed.sendException)
override fun completeResumeSend() { useCont()?.completeResume(RESUME_TOKEN) }
override fun resumeSendClosed(closed: Closed<*>) { useCont()?.resumeWithException(closed.sendException) }
override fun toString(): String = "SendElement@$hexAddress($pollResult)"
}

Expand Down
7 changes: 7 additions & 0 deletions kotlinx-coroutines-core/common/src/internal/Sharing.common.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,15 @@ internal expect fun <T> Continuation<T>.asLocalOrNull() : Continuation<T>?
internal expect fun <T> Continuation<T>.asLocalOrNullIfNotUsed() : Continuation<T>?
internal expect fun <T> Continuation<T>.useLocal() : Continuation<T>
internal expect fun <T> Continuation<T>.shareableInterceptedResumeCancellableWith(result: Result<T>)
internal expect fun <T> Continuation<T>.shareableInterceptedResumeWith(result: Result<T>)
internal expect fun <T> Continuation<T>.shareableDispose()
internal expect fun disposeContinuation(cont: () -> Continuation<*>)
internal expect fun <T> CancellableContinuationImpl<T>.shareableResume(delegate: Continuation<T>, useMode: Int)

internal expect fun <T, R> (suspend (T) -> R).asShareable(): suspend (T) -> R
internal expect fun <T, R> (suspend (T) -> R).shareableDispose(useIt: Boolean)
internal expect fun <T, R> (suspend (T) -> R).shareableWillBeUsed()

internal expect fun isReuseSupportedInPlatform(): Boolean
internal expect fun <T> ArrayList<T>.addOrUpdate(element: T, update: (ArrayList<T>) -> Unit)
internal expect fun <T> ArrayList<T>.addOrUpdate(index: Int, element: T, update: (ArrayList<T>) -> Unit)
Expand Down
Loading

0 comments on commit 4e9a346

Please sign in to comment.