From 7fdd26cd689229487dce2f2d14984a19c9dd5761 Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Tue, 16 Mar 2021 16:19:29 +0300 Subject: [PATCH] Make the subscriber in awaitOne less permissive The implementation of Reactive Streams' Subscriber used for `await*` operations was assuming that the publisher is correct. Now, the implementation detects some instances of problematic behavior for publishers and reports them. Fixes https://github.com/Kotlin/kotlinx.coroutines/issues/2079 --- .../kotlinx-coroutines-reactive/src/Await.kt | 67 +++++++++++++++--- .../test/IntegrationTest.kt | 69 +++++++++++++++++++ 2 files changed, 128 insertions(+), 8 deletions(-) diff --git a/reactive/kotlinx-coroutines-reactive/src/Await.kt b/reactive/kotlinx-coroutines-reactive/src/Await.kt index e9f6955085..ab1b43a7eb 100644 --- a/reactive/kotlinx-coroutines-reactive/src/Await.kt +++ b/reactive/kotlinx-coroutines-reactive/src/Await.kt @@ -10,6 +10,7 @@ import kotlinx.coroutines.suspendCancellableCoroutine import org.reactivestreams.Publisher import org.reactivestreams.Subscriber import org.reactivestreams.Subscription +import java.lang.IllegalStateException import java.util.* import kotlin.coroutines.* @@ -134,29 +135,47 @@ private suspend fun Publisher.awaitOne( mode: Mode, default: T? = null ): T = suspendCancellableCoroutine { cont -> + /* This implementation must obey + https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#2-subscriber-code + The numbers of rules are taken from there. */ injectCoroutineContext(cont.context).subscribe(object : Subscriber { - private lateinit var subscription: Subscription + // It is unclear whether 2.13 implies (T: Any), but if so, it seems that we don't break anything by not adhering + private var subscription: Subscription? = null private var value: T? = null private var seenValue = false + private var inTerminalState = false override fun onSubscribe(sub: Subscription) { + /** cancelling the existing subscription due to rule 2.5, though the publisher would either have to + * subscribe more than once, which would break 2.12, or leak this [Subscriber]. */ + subscription?.let { + value = null + seenValue = false + inTerminalState = false + it.cancel() + } subscription = sub cont.invokeOnCancellation { sub.cancel() } - sub.request(if (mode == Mode.FIRST) 1 else Long.MAX_VALUE) + sub.request(if (mode == Mode.FIRST || mode == Mode.FIRST_OR_DEFAULT) 1 else Long.MAX_VALUE) } override fun onNext(t: T) { + val sub = subscription.checkInitialized("onNext") + subscription.checkInitialized("onNext") + if (inTerminalState) + gotSignalInTerminalStateException("onNext") when (mode) { Mode.FIRST, Mode.FIRST_OR_DEFAULT -> { - if (!seenValue) { - seenValue = true - subscription.cancel() - cont.resume(t) - } + if (seenValue) + // TODO: decide if we want to be lenient here: after all, nothing breaks if this isn't true + moreThanOneValueProvidedException(mode) + seenValue = true + sub.cancel() + cont.resume(t) } Mode.LAST, Mode.SINGLE, Mode.SINGLE_OR_DEFAULT -> { if ((mode == Mode.SINGLE || mode == Mode.SINGLE_OR_DEFAULT) && seenValue) { - subscription.cancel() + sub.cancel() if (cont.isActive) cont.resumeWithException(IllegalArgumentException("More than one onNext value for $mode")) } else { @@ -169,6 +188,8 @@ private suspend fun Publisher.awaitOne( @Suppress("UNCHECKED_CAST") override fun onComplete() { + subscription.checkInitialized("onComplete") // TODO: maybe don't enforce? + enterTerminalState("onComplete") if (seenValue) { if (cont.isActive) cont.resume(value as T) return @@ -184,8 +205,38 @@ private suspend fun Publisher.awaitOne( } override fun onError(e: Throwable) { + subscription.checkInitialized("onError") // TODO: maybe don't enforce? + enterTerminalState("onError") cont.resumeWithException(e) } + + /** + * Enforce rule 2.4: assume that the [Publisher] is in a terminal state after [onError] or [onComplete]. + */ + private fun enterTerminalState(signalName: String) { + if (inTerminalState) + gotSignalInTerminalStateException(signalName) + inTerminalState = true + } }) } +/** + * Enforce rule 2.4 (detect publishers that don't respect rule 1.7): don't process anything after a terminal + * state was reached. + */ +private fun gotSignalInTerminalStateException(signalName: String): Nothing = + throw IllegalStateException( + "'$signalName' was called after the publisher already signalled being in a terminal state") + +/** + * Enforce rule 1.1: it is invalid for a publisher to provide more values than requested. + */ +private fun moreThanOneValueProvidedException(mode: Mode): Nothing = + throw IllegalStateException("Only a single value were requested in $mode, but the publisher provided more") + +/** + * Enforce rule 1.9: expect [Subscriber.onSubscribe] before any other signals. + */ +private fun Subscription?.checkInitialized(signalName: String): Subscription = + this ?: throw IllegalStateException("'$signalName' was called before 'onSubscribe'") diff --git a/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt b/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt index 18cd012d16..78a73f4d09 100644 --- a/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt @@ -9,6 +9,8 @@ import org.junit.Test import org.junit.runner.* import org.junit.runners.* import org.reactivestreams.* +import java.lang.IllegalStateException +import java.lang.RuntimeException import kotlin.coroutines.* import kotlin.test.* @@ -130,6 +132,73 @@ class IntegrationTest( finish(3) } + /** + * Test the behavior of [awaitOne] on unconforming publishers. + */ + @Test + fun testAwaitOnNonconformingPublishers() = runTest { + fun publisher(block: Subscriber.(n: Long) -> Unit) = + Publisher { subscriber -> + subscriber.onSubscribe(object: Subscription { + override fun request(n: Long) { + subscriber.block(n) + } + + override fun cancel() { + } + }) + } + suspend fun assertDetectsBadPublisher(operation: suspend Publisher.() -> T, + block: Subscriber.(n: Long) -> Unit) = + assertFailsWith { publisher(block).operation() } + + // Rule 1.1 broken: the publisher produces more values than requested. + assertDetectsBadPublisher({ awaitFirst() }) { + onNext(1) + onNext(2) + } + + // Rule 1.7 broken: the publisher calls a method on a subscriber after reaching the terminal state. + // Using awaitSingle to check that bad publishers have priority over the lack of a value. + assertDetectsBadPublisher({ awaitSingle() }) { + onError(RuntimeException("")) + onComplete() + } + assertDetectsBadPublisher({ awaitSingle() }) { + onComplete() + onError(RuntimeException("")) + } + assertDetectsBadPublisher({ awaitSingle() }) { + onComplete() + onComplete() + } + assertDetectsBadPublisher({ awaitSingle() }) { + onComplete() + onNext(3) + } + assertDetectsBadPublisher({ awaitSingle() }) { + onError(RuntimeException("")) + onNext(3) + } + + // Rule 1.9 broken (the first signal to the subscriber was not 'onSubscribe') + assertFailsWith { + Publisher { subscriber -> + subscriber.onNext(3) + }.awaitFirst() + } + assertFailsWith { + Publisher { subscriber -> + subscriber.onComplete() + }.awaitFirst() + } + assertFailsWith { + Publisher { subscriber -> + subscriber.onError(RuntimeException("")) + }.awaitFirst() + } + } + private suspend fun checkNumbers(n: Int, pub: Publisher) { var last = 0 pub.collect {