Skip to content

Commit

Permalink
Fix forwarding null when CancellableMonoSink is cancelled (#11221)
Browse files Browse the repository at this point in the history
Might fix #11209, will see.
  • Loading branch information
yawkat authored Sep 30, 2024
1 parent d78a88c commit e0e378b
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ final class CancellableMonoSink<T> implements Publisher<T>, Sinks.One<T>, Subscr
private T value;
private Throwable failure;
private boolean complete = false;
private boolean cancelled = false;
private Subscriber<? super T> subscriber = null;
private boolean subscriberWaiting = false;

Expand Down Expand Up @@ -72,7 +73,7 @@ public void subscribe(Subscriber<? super T> s) {
}

private void tryForward() {
if (subscriberWaiting && complete) {
if (subscriberWaiting && complete && !cancelled) {
if (failure == null) {
if (value != EMPTY) {
subscriber.onNext(value);
Expand Down Expand Up @@ -181,6 +182,7 @@ public void cancel() {
lock.lock();
try {
complete = true;
cancelled = true;
} finally {
lock.unlock();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package io.micronaut.http.client.netty

import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription
import reactor.core.publisher.Mono
import spock.lang.Specification

class CancellableMonoSinkSpec extends Specification {
def "cancel before request"() {
given:
def sink = new CancellableMonoSink<String>(null)
def result = "unset"
Subscription subscription = null
sink.subscribe(new Subscriber<String>() {
@Override
void onSubscribe(Subscription s) {
subscription = s
}

@Override
void onNext(String s) {
result = s
}

@Override
void onError(Throwable t) {
}

@Override
void onComplete() {
}
})

when:
sink.cancel()
subscription.request(1)

then:
result == "unset"
}
}

0 comments on commit e0e378b

Please sign in to comment.