Calling awaitOne on nonconforming Publishers gives nondescriptive errors #2079

Closed
vemilyus opened this issue Jun 3, 2020 · 5 comments

vemilyus commented Jun 3, 2020

Hi, I've noticed that Publisher.awaitOne allows further calls to onComplete after receiving an onError call. This can hide the exception that was supplied to onError; the following exception is thrown instead:

java.lang.IllegalStateException: Already resumed, but proposed with update null
	at kotlinx.coroutines.CancellableContinuationImpl.alreadyResumedError(CancellableContinuationImpl.kt:335)
	at kotlinx.coroutines.CancellableContinuationImpl.resumeImpl(CancellableContinuationImpl.kt:330)
	at kotlinx.coroutines.CancellableContinuationImpl.resumeWith(CancellableContinuationImpl.kt:250)
	at kotlinx.coroutines.reactive.AwaitKt$awaitOne$$inlined$suspendCancellableCoroutine$lambda$1.onComplete(Await.kt:138)
	at org.jooq.impl.AbstractResultQuery$1.doComplete(AbstractResultQuery.java:415)
	at org.jooq.impl.AbstractResultQuery$1.request(AbstractResultQuery.java:409)
	at kotlinx.coroutines.reactive.AwaitKt$awaitOne$$inlined$suspendCancellableCoroutine$lambda$1.onSubscribe(Await.kt:105)
	at org.jooq.impl.AbstractResultQuery.subscribe(AbstractResultQuery.java:381)
	at org.jooq.impl.SelectImpl.subscribe(SelectImpl.java:2716)
	at kotlinx.coroutines.reactive.AwaitKt.awaitOne(Await.kt:97)
	at kotlinx.coroutines.reactive.AwaitKt.awaitOne$default(Await.kt:95)
	at kotlinx.coroutines.reactive.AwaitKt.awaitFirstOrNull(Await.kt:46)
	[REDACTED]
	at kotlinx.coroutines.stream.StreamFlow.collect(Stream.kt:27)
	at kotlinx.coroutines.stream.StreamFlow$collect$1.invokeSuspend(Stream.kt)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:56)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute$$$capture(AbstractEventExecutor.java:164)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:834)

The problem is that the call to onError doesn't mark the subscriber as done. Marking it as done would ensure that no further calls to onComplete are accepted and processed, as specified in the reactive-streams specification.

override fun onComplete() {
    if (seenValue) {
        if (cont.isActive) cont.resume(value as T)
        return
    }
    when {
        mode == Mode.FIRST_OR_DEFAULT -> {
            cont.resume(default as T)
        }
        cont.isActive -> {
            cont.resumeWithException(NoSuchElementException("No value received via onNext for $mode"))
        }
    }
}

override fun onError(e: Throwable) {
    cont.resumeWithException(e)
}

I noticed this when using jOOQ and Publisher.awaitFirstOrNull() to get a result from the database. jOOQ always calls onComplete, even if it has already called onError. I've also mentioned this in an issue for jOOQ: jOOQ/jOOQ#10245

An easy fix for this would be to set seenValue = true in onError or add an extra field which is then checked in onComplete.
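
A minimal sketch of the "extra field" variant of this fix, written without kotlinx.coroutines or reactive-streams on the classpath. OneShot and GuardedSubscriber are hypothetical stand-ins for the continuation and for the subscriber inside awaitOne; they are not the library's classes, and only the two terminal callbacks are modeled.

```kotlin
// Hypothetical stand-in for a continuation that may be resumed only once.
class OneShot<T> {
    var result: Result<T>? = null
        private set

    fun resumeWith(r: Result<T>) {
        check(result == null) { "Already resumed, but proposed with update $r" }
        result = r
    }
}

// Simplified subscriber that models only the terminal callbacks.
class GuardedSubscriber<T>(private val cont: OneShot<T>) {
    // The extra field: set as soon as any terminal signal arrives.
    private var done = false

    fun onError(e: Throwable) {
        if (done) return
        done = true
        cont.resumeWith(Result.failure(e))
    }

    fun onComplete() {
        // A late onComplete no longer reaches the continuation.
        if (done) return
        done = true
        cont.resumeWith(Result.failure(NoSuchElementException("No value received via onNext")))
    }
}

fun main() {
    val cont = OneShot<String>()
    val subscriber = GuardedSubscriber(cont)
    subscriber.onError(IllegalArgumentException("query failed"))
    subscriber.onComplete() // the extra call described above; ignored by the guard
    println(cont.result?.exceptionOrNull()?.message) // prints "query failed"
}
```

With the guard in place, the exception passed to onError is the one the caller sees. Without it, the second resumeWith in this sketch would fail its check, which mirrors the "Already resumed" error in the stack trace.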

@sdeleuze
Contributor

👍 for such a fix.

@dkhalanskyjb
Collaborator

dkhalanskyjb commented Mar 16, 2021

Hi @vemilyus!

Thanks for the report! It does look like this could be a useful change to make, but could you please clarify in which way the existing behavior diverges from the specifications? I looked through the spec and the only relevant point I found was 1.7:

Once a terminal state has been signaled (onError, onComplete) it is REQUIRED that no further signals occur.

By the definition of a signal, this means that a correct publisher must not call anything after onError and onComplete. Am I missing something here? It looks like the existing behavior simply relies on the guarantees given by the specification, and it's jOOQ that doesn't follow the spec.
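
To make rule 1.7 concrete, here is a small sketch that checks a recorded trace of publisher signals for anything emitted after a terminal signal. The Signal enum and firstSignalAfterTerminal are names invented for this illustration; they are not part of reactive-streams or kotlinx.coroutines.

```kotlin
// Publisher-to-subscriber signals; onError and onComplete are the terminal ones.
enum class Signal(val terminal: Boolean) {
    ON_SUBSCRIBE(false),
    ON_NEXT(false),
    ON_ERROR(true),
    ON_COMPLETE(true),
}

// Returns the index of the first signal emitted after a terminal signal,
// or null if the trace respects rule 1.7.
fun firstSignalAfterTerminal(trace: List<Signal>): Int? {
    val terminalIndex = trace.indexOfFirst { it.terminal }
    if (terminalIndex == -1 || terminalIndex == trace.lastIndex) return null
    return terminalIndex + 1
}

fun main() {
    val conforming = listOf(Signal.ON_SUBSCRIBE, Signal.ON_NEXT, Signal.ON_COMPLETE)
    // The sequence described in this issue: onComplete follows onError.
    val reported = listOf(Signal.ON_SUBSCRIBE, Signal.ON_ERROR, Signal.ON_COMPLETE)

    println(firstSignalAfterTerminal(conforming)) // prints "null"
    println(firstSignalAfterTerminal(reported))   // prints "2"
}
```

Under this reading, the violation lies in the publisher's trace itself, regardless of how the subscriber reacts to the extra signal.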

@vemilyus
Author

Hi @dkhalanskyjb!

Technically you are correct in your interpretation of the specification.

However, simply ignoring the current state and allowing an illegal call anyway doesn't seem like the right approach to me. I read the specification as explicitly prohibiting further signals after a terminal state. So rather than risk an undefined state by allowing such signals, I would throw an appropriate exception indicating that no further signals were expected. Making illegal calls explicitly apparent would also help others who are writing Publishers to implement correct behavior.

I think this is such a case where it would be helpful to follow the spirit of the specification, not just the letter of it. Please let me know if you have further thoughts on this.

@dkhalanskyjb
Collaborator

We are going to do something, the question is what exactly. The initial comment states specifically that in order to conform to the spec, we have to ignore calls after a terminal state was reached. If this were true, then just throwing a more descriptive error would be plainly incorrect and we would need to ignore additional terminal calls, which is why I asked for clarifications.

So, are we in agreement here that our implementation is conforming in this regard, the error is on jOOQ's side, and the best course of action for us will be to detect non-conforming publishers and throw a more informative error?

@vemilyus
Author

Yes, the implementation is correct, but a more informative error would be much appreciated.
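
One way the agreed direction could look, as a sketch only: the subscriber remembers which terminal signal it saw first and reports a descriptive error when another one arrives. ReportingSubscriber, its callbacks, and the message wording are assumptions made for this illustration, not the code that was eventually committed. The sketch hands the error to a callback instead of throwing it, and leaves open where a real implementation should deliver it.

```kotlin
// Hypothetical subscriber that models only the terminal callbacks.
class ReportingSubscriber<T>(
    private val onResult: (Result<T>) -> Unit,
    private val onViolation: (IllegalStateException) -> Unit,
) {
    // Name of the first terminal signal received, if any.
    private var terminalSignal: String? = null

    // Returns true for the first terminal signal; reports a violation for any later one.
    private fun tryEnterTerminalState(signalName: String): Boolean {
        val previous = terminalSignal
        if (previous != null) {
            onViolation(
                IllegalStateException(
                    "'$signalName' was called after '$previous'; " +
                        "the publisher violates rule 1.7 of Reactive Streams"
                )
            )
            return false
        }
        terminalSignal = signalName
        return true
    }

    fun onError(e: Throwable) {
        if (tryEnterTerminalState("onError")) onResult(Result.failure(e))
    }

    fun onComplete() {
        if (tryEnterTerminalState("onComplete")) {
            onResult(Result.failure(NoSuchElementException("No value received via onNext")))
        }
    }
}

fun main() {
    val violations = mutableListOf<String?>()
    var outcome: Result<String>? = null
    val subscriber = ReportingSubscriber<String>(
        onResult = { outcome = it },
        onViolation = { violations += it.message },
    )
    subscriber.onError(IllegalArgumentException("query failed"))
    subscriber.onComplete() // the nonconforming second terminal signal
    println(outcome?.exceptionOrNull()?.message) // prints "query failed"
    println(violations.single())
}
```

The original exception still reaches the caller, and the extra onComplete produces a message that names both signals instead of "Already resumed".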

@dkhalanskyjb changed the title from "Publisher.awaitOne doesn't properly follow the reactive-streams specification" to "Calling awaitOne on nonconforming Publishers gives nondescriptive errors" on Mar 16, 2021
@qwwdfsad removed their assignment on Mar 16, 2021
dkhalanskyjb added a commit that referenced this issue Mar 16, 2021
The implementation of Reactive Streams' Subscriber used for
`await*` operations was assuming that the publisher is correct.
Now, the implementation detects some instances of problematic
behavior for publishers and reports them.

Fixes #2079
dkhalanskyjb added a commit that referenced this issue Apr 6, 2021
pablobaxter pushed a commit to pablobaxter/kotlinx.coroutines that referenced this issue Sep 14, 2022