From 8ede9ebc9c8a24d1e0ba2d19c82338e8e2865310 Mon Sep 17 00:00:00 2001 From: OlegDokuka Date: Wed, 5 Apr 2023 17:13:47 +0300 Subject: [PATCH 1/2] ensures clear is called in all terminal cases this fix ensures that in case of async fusion, the queue.clear() method is called regardless it is cancellation or terminal (via onComplete/onError) Signed-off-by: Oleh Dokuka Signed-off-by: OlegDokuka --- .../reactor/netty/channel/MonoSendMany.java | 7 +- .../netty/channel/MonoSendManyTest.java | 85 ++++++++++++++++++- 2 files changed, 90 insertions(+), 2 deletions(-) diff --git a/reactor-netty-core/src/main/java/reactor/netty/channel/MonoSendMany.java b/reactor-netty-core/src/main/java/reactor/netty/channel/MonoSendMany.java index 96453aa27e..23245303c6 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/channel/MonoSendMany.java +++ b/reactor-netty-core/src/main/java/reactor/netty/channel/MonoSendMany.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2022 VMware, Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2019-2023 VMware, Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -351,6 +351,11 @@ public void run() { else { actual.onError(t); } + + if (sourceMode == ASYNC) { + // notify that consumption has been terminated + queue.clear(); + } return; } diff --git a/reactor-netty-core/src/test/java/reactor/netty/channel/MonoSendManyTest.java b/reactor-netty-core/src/test/java/reactor/netty/channel/MonoSendManyTest.java index 0a9cafe4fb..a4261ff549 100644 --- a/reactor-netty-core/src/test/java/reactor/netty/channel/MonoSendManyTest.java +++ b/reactor-netty-core/src/test/java/reactor/netty/channel/MonoSendManyTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2021 VMware, Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2019-2023 VMware, Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,6 +22,7 @@ import java.util.List; import java.util.Queue; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.stream.Collectors; @@ -32,14 +33,18 @@ import io.netty.handler.timeout.WriteTimeoutHandler; import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCounted; +import org.assertj.core.api.Assertions; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import org.reactivestreams.Subscription; +import reactor.core.CoreSubscriber; import reactor.core.Exceptions; +import reactor.core.Fuseable; import reactor.core.publisher.BaseSubscriber; import reactor.core.publisher.Flux; import reactor.core.publisher.Hooks; import reactor.core.publisher.Mono; +import reactor.core.publisher.Operators; import reactor.core.publisher.Sinks; import reactor.test.StepVerifier; import reactor.test.publisher.TestPublisher; @@ -259,6 +264,84 @@ void shouldNotLeakIfFusedOnRacingCancelAndOnNext(boolean flushOnEach) { } } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void shouldCallQueueClearToNotifyTermination(boolean flushOnEach) { + //use an extra handler + EmbeddedChannel channel = new EmbeddedChannel(true, true, new ChannelHandlerAdapter() {}); + AtomicBoolean cleared = new AtomicBoolean(); + + Sinks.Many source = Sinks.many().unicast().onBackpressureBuffer(); + MonoSendMany m = + MonoSendMany.byteBufSource(source.asFlux().transform(Operators.lift((__, + downstream) -> new CoreSubscriber() { + @Override + public void onSubscribe(Subscription s) { + downstream.onSubscribe(new Fuseable.QueueSubscription() { + @Override + public void request(long n) { + s.request(n); + } + + @Override + public void cancel() { + s.cancel(); + } + + @Override + public int size() { + return ((Fuseable.QueueSubscription) s).size(); + } + + @Override + public boolean isEmpty() { + return ((Fuseable.QueueSubscription) s).isEmpty(); + } + + @Override + public void clear() { + cleared.set(true); + ((Fuseable.QueueSubscription) s).clear(); + } + + @Override + public ByteBuf poll() { + return ((Fuseable.QueueSubscription) s).poll(); + } + + @Override + public int requestFusion(int requestedMode) { + return ((Fuseable.QueueSubscription) s).requestFusion(requestedMode); + } + }); + } + + @Override + public void onNext(ByteBuf buf) { + downstream.onNext(buf); + } + + @Override + public void onError(Throwable t) { + downstream.onError(t); + } + + @Override + public void onComplete() { + downstream.onComplete(); + } + })), channel, b -> flushOnEach); + m.subscribe(); + Queue messages = channel.outboundMessages(); + + source.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST); + + channel.flush(); + messages.forEach(ReferenceCountUtil::release); + Assertions.assertThat(cleared).isTrue(); + } + static void wait(WeakReference ref) { int duration = 5_000; int spins = duration / 100; From 85c4495225d7d569bc0993ebda0a259a1958caa1 Mon Sep 17 00:00:00 2001 From: OlegDokuka Date: Thu, 6 Apr 2023 20:33:31 +0300 Subject: [PATCH 2/2] adds comments Signed-off-by: Oleh Dokuka Signed-off-by: OlegDokuka --- .../src/main/java/reactor/netty/channel/MonoSendMany.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/reactor-netty-core/src/main/java/reactor/netty/channel/MonoSendMany.java b/reactor-netty-core/src/main/java/reactor/netty/channel/MonoSendMany.java index 23245303c6..c07120bc9f 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/channel/MonoSendMany.java +++ b/reactor-netty-core/src/main/java/reactor/netty/channel/MonoSendMany.java @@ -353,7 +353,13 @@ public void run() { } if (sourceMode == ASYNC) { - // notify that consumption has been terminated + // notify that queue draining is done and no more interactions are expected for here + // + // This is needed due to ASYNC fusion contract. + // This call notifies upstream that the interaction with the queue is done on the + // downstream side. + // Some upstreams implementations may have `onClose()` mechanism to send notification + // about its termination and finalization of the offered events consumption queue.clear(); } return;