From e0e378bc3739fa78cbde2aa4b74b67c8b90c2a22 Mon Sep 17 00:00:00 2001 From: Jonas Konrad Date: Mon, 30 Sep 2024 15:48:09 +0200 Subject: [PATCH] Fix forwarding null when CancellableMonoSink is cancelled (#11221) Might fix #11209, will see. --- .../client/netty/CancellableMonoSink.java | 4 +- .../netty/CancellableMonoSinkSpec.groovy | 41 +++++++++++++++++++ 2 files changed, 44 insertions(+), 1 deletion(-) create mode 100644 http-client/src/test/groovy/io/micronaut/http/client/netty/CancellableMonoSinkSpec.groovy diff --git a/http-client/src/main/java/io/micronaut/http/client/netty/CancellableMonoSink.java b/http-client/src/main/java/io/micronaut/http/client/netty/CancellableMonoSink.java index c86796a3f19..b7cbeda3f05 100644 --- a/http-client/src/main/java/io/micronaut/http/client/netty/CancellableMonoSink.java +++ b/http-client/src/main/java/io/micronaut/http/client/netty/CancellableMonoSink.java @@ -44,6 +44,7 @@ final class CancellableMonoSink implements Publisher, Sinks.One, Subscr private T value; private Throwable failure; private boolean complete = false; + private boolean cancelled = false; private Subscriber subscriber = null; private boolean subscriberWaiting = false; @@ -72,7 +73,7 @@ public void subscribe(Subscriber s) { } private void tryForward() { - if (subscriberWaiting && complete) { + if (subscriberWaiting && complete && !cancelled) { if (failure == null) { if (value != EMPTY) { subscriber.onNext(value); @@ -181,6 +182,7 @@ public void cancel() { lock.lock(); try { complete = true; + cancelled = true; } finally { lock.unlock(); } diff --git a/http-client/src/test/groovy/io/micronaut/http/client/netty/CancellableMonoSinkSpec.groovy b/http-client/src/test/groovy/io/micronaut/http/client/netty/CancellableMonoSinkSpec.groovy new file mode 100644 index 00000000000..b15f36e27f2 --- /dev/null +++ b/http-client/src/test/groovy/io/micronaut/http/client/netty/CancellableMonoSinkSpec.groovy @@ -0,0 +1,41 @@ +package io.micronaut.http.client.netty + +import org.reactivestreams.Subscriber +import org.reactivestreams.Subscription +import reactor.core.publisher.Mono +import spock.lang.Specification + +class CancellableMonoSinkSpec extends Specification { + def "cancel before request"() { + given: + def sink = new CancellableMonoSink(null) + def result = "unset" + Subscription subscription = null + sink.subscribe(new Subscriber() { + @Override + void onSubscribe(Subscription s) { + subscription = s + } + + @Override + void onNext(String s) { + result = s + } + + @Override + void onError(Throwable t) { + } + + @Override + void onComplete() { + } + }) + + when: + sink.cancel() + subscription.request(1) + + then: + result == "unset" + } +}