Skip to content

Commit

Permalink
Make the subscriber in awaitOne less permissive
Browse files Browse the repository at this point in the history
The implementation of Reactive Streams' Subscriber used for
`await*` operations was assuming that the publisher is correct.
Now, the implementation detects some instances of problematic
behavior for publishers and reports them.

Fixes #2079
  • Loading branch information
dkhalanskyjb committed Mar 16, 2021
1 parent 187f0aa commit af692aa
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 8 deletions.
67 changes: 59 additions & 8 deletions reactive/kotlinx-coroutines-reactive/src/Await.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import kotlinx.coroutines.suspendCancellableCoroutine
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription
import java.lang.IllegalStateException
import java.util.*
import kotlin.coroutines.*

Expand Down Expand Up @@ -134,29 +135,47 @@ private suspend fun <T> Publisher<T>.awaitOne(
mode: Mode,
default: T? = null
): T = suspendCancellableCoroutine { cont ->
/* This implementation must obey
https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#2-subscriber-code
The numbers of rules are taken from there. */
injectCoroutineContext(cont.context).subscribe(object : Subscriber<T> {
private lateinit var subscription: Subscription
// It is unclear whether 2.13 implies (T: Any), but if so, it seems that we don't break anything by not adhering
private var subscription: Subscription? = null
private var value: T? = null
private var seenValue = false
private var inTerminalState = false

override fun onSubscribe(sub: Subscription) {
/** cancelling the existing subscription due to rule 2.5, though the publisher would either have to
* subscribe more than once, which would break 2.12, or leak this [Subscriber]. */
subscription?.let {
value = null
seenValue = false
inTerminalState = false
it.cancel()
}
subscription = sub
cont.invokeOnCancellation { sub.cancel() }
sub.request(if (mode == Mode.FIRST) 1 else Long.MAX_VALUE)
sub.request(if (mode == Mode.FIRST || mode == Mode.FIRST_OR_DEFAULT) 1 else Long.MAX_VALUE)
}

override fun onNext(t: T) {
val sub = subscription.checkInitialized("onNext")
subscription.checkInitialized("onNext")
if (inTerminalState)
gotSignalInTerminalStateException("onNext")
when (mode) {
Mode.FIRST, Mode.FIRST_OR_DEFAULT -> {
if (!seenValue) {
seenValue = true
subscription.cancel()
cont.resume(t)
}
if (seenValue)
// TODO: decide if we want to be lenient here: after all, nothing breaks if this isn't true
moreThanOneValueProvidedException(mode)
seenValue = true
sub.cancel()
cont.resume(t)
}
Mode.LAST, Mode.SINGLE, Mode.SINGLE_OR_DEFAULT -> {
if ((mode == Mode.SINGLE || mode == Mode.SINGLE_OR_DEFAULT) && seenValue) {
subscription.cancel()
sub.cancel()
if (cont.isActive)
cont.resumeWithException(IllegalArgumentException("More than one onNext value for $mode"))
} else {
Expand All @@ -169,6 +188,8 @@ private suspend fun <T> Publisher<T>.awaitOne(

@Suppress("UNCHECKED_CAST")
override fun onComplete() {
subscription.checkInitialized("onComplete") // TODO: maybe don't enforce?
enterTerminalState("onComplete")
if (seenValue) {
if (cont.isActive) cont.resume(value as T)
return
Expand All @@ -184,8 +205,38 @@ private suspend fun <T> Publisher<T>.awaitOne(
}

override fun onError(e: Throwable) {
subscription.checkInitialized("onError") // TODO: maybe don't enforce?
enterTerminalState("onError")
cont.resumeWithException(e)
}

/**
* Enforce rule 2.4: assume that the [Publisher] is in a terminal state after [onError] or [onComplete].
*/
private fun enterTerminalState(signalName: String) {
if (inTerminalState)
gotSignalInTerminalStateException(signalName)
inTerminalState = true
}
})
}

/**
* Enforce rule 2.4 (detect publishers that don't respect rule 1.7): don't process anything after a terminal
* state was reached.
*/
private fun gotSignalInTerminalStateException(signalName: String): Nothing =
throw IllegalStateException(
"'$signalName' was called after the publisher already signalled being in a terminal state")

/**
* Enforce rule 1.1: it is invalid for a publisher to provide more values than requested.
*/
private fun moreThanOneValueProvidedException(mode: Mode): Nothing =
throw IllegalStateException("Only a single value were requested in $mode, but the publisher provided more")

/**
* Enforce rule 1.9: expect [Subscriber.onSubscribe] before any other signals.
*/
private fun Subscription?.checkInitialized(signalName: String): Subscription =
this ?: throw IllegalStateException("'$signalName' was called before 'onSubscribe'")
69 changes: 69 additions & 0 deletions reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import org.junit.Test
import org.junit.runner.*
import org.junit.runners.*
import org.reactivestreams.*
import java.lang.IllegalStateException
import java.lang.RuntimeException
import kotlin.coroutines.*
import kotlin.test.*

Expand Down Expand Up @@ -130,6 +132,73 @@ class IntegrationTest(
finish(3)
}

/**
* Test the behavior of [awaitOne] on unconforming publishers.
*/
@Test
fun testAwaitOnNonconformingPublishers() = runTest {
fun<T> publisher(block: Subscriber<in T>.(n: Long) -> Unit) =
Publisher<T> { subscriber ->
subscriber.onSubscribe(object: Subscription {
override fun request(n: Long) {
subscriber.block(n)
}

override fun cancel() {
}
})
}
suspend fun<T> assertDetectsBadPublisher(operation: suspend Publisher<T>.() -> T,
block: Subscriber<in T>.(n: Long) -> Unit) =
assertFailsWith<IllegalStateException> { publisher(block).operation() }

// Rule 1.1 broken: the publisher produces more values than requested.
assertDetectsBadPublisher<Int>({ awaitFirst() }) {
onNext(1)
onNext(2)
}

// Rule 1.7 broken: the publisher calls a method on a subscriber after reaching the terminal state.
// Using awaitSingle to check that bad publishers have priority over the lack of a value.
assertDetectsBadPublisher<Int>({ awaitSingle() }) {
onError(RuntimeException(""))
onComplete()
}
assertDetectsBadPublisher<Int>({ awaitSingle() }) {
onComplete()
onError(RuntimeException(""))
}
assertDetectsBadPublisher<Int>({ awaitSingle() }) {
onComplete()
onComplete()
}
assertDetectsBadPublisher<Int>({ awaitSingle() }) {
onComplete()
onNext(3)
}
assertDetectsBadPublisher<Int>({ awaitSingle() }) {
onError(RuntimeException(""))
onNext(3)
}

// Rule 1.9 broken (the first signal to the subscriber was not 'onSubscribe')
assertFailsWith<IllegalStateException> {
Publisher<Int> { subscriber ->
subscriber.onNext(3)
}.awaitFirst()
}
assertFailsWith<IllegalStateException> {
Publisher<Int> { subscriber ->
subscriber.onComplete()
}.awaitFirst()
}
assertFailsWith<IllegalStateException> {
Publisher<Int> { subscriber ->
subscriber.onError(RuntimeException(""))
}.awaitFirst()
}
}

private suspend fun checkNumbers(n: Int, pub: Publisher<Int>) {
var last = 0
pub.collect {
Expand Down

0 comments on commit af692aa

Please sign in to comment.