From a1a7f9aa9313430033cc4cd0da6f1f091df864a1 Mon Sep 17 00:00:00 2001 From: Yijun Xie <48257664+YijunXieMS@users.noreply.github.com> Date: Tue, 15 Jun 2021 23:52:15 -0700 Subject: [PATCH 1/4] Cache UpdateDispositionWorkItem Mono --- .../implementation/ServiceBusReactorReceiver.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiver.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiver.java index ca77bbf32802c..f7b8e6eca00b9 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiver.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiver.java @@ -250,7 +250,7 @@ private Mono updateDispositionInternal(String lockToken, DeliveryState del sink.error(new AmqpException(false, "updateDisposition failed while dispatching to Reactor.", error, handler.getErrorContext(receiver))); } - }); + }).cache().then(Mono.empty()); // cache because closeAsync use `when` to subscribe this Mono again. workItem.setMono(result); @@ -362,7 +362,7 @@ private void cleanupWorkItems() { }); } - private void completeWorkItem(String lockToken, Delivery delivery, MonoSink sink, Throwable error) { + private void completeWorkItem(String lockToken, Delivery delivery, MonoSink sink, Throwable error) { final boolean isSettled = delivery != null && delivery.remotelySettled(); if (isSettled) { delivery.settle(); @@ -392,7 +392,7 @@ private static final class UpdateDispositionWorkItem { private Mono mono; private Instant expirationTime; - private MonoSink sink; + private MonoSink sink; private Throwable throwable; private UpdateDispositionWorkItem(String lockToken, DeliveryState state, Duration timeout) { @@ -429,11 +429,11 @@ private Mono getMono() { return mono; } - private MonoSink getSink() { + private MonoSink getSink() { return sink; } - private void start(MonoSink sink) { + private void start(MonoSink sink) { Objects.requireNonNull(sink, "'sink' cannot be null."); this.sink = sink; this.sink.onDispose(() -> isDisposed.set(true)); From 626bd69452f0023f212579b9a3ecaf07d7c690eb Mon Sep 17 00:00:00 2001 From: Yijun Xie <48257664+YijunXieMS@users.noreply.github.com> Date: Mon, 21 Jun 2021 21:37:21 -0700 Subject: [PATCH 2/4] Remove unnecessary Mono.empty() in then() --- .../servicebus/implementation/ServiceBusReactorReceiver.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiver.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiver.java index f7b8e6eca00b9..2b17ecd75b291 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiver.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiver.java @@ -250,7 +250,7 @@ private Mono updateDispositionInternal(String lockToken, DeliveryState del sink.error(new AmqpException(false, "updateDisposition failed while dispatching to Reactor.", error, handler.getErrorContext(receiver))); } - }).cache().then(Mono.empty()); // cache because closeAsync use `when` to subscribe this Mono again. + }).cache().then(); // cache because closeAsync use `when` to subscribe this Mono again. workItem.setMono(result); From cde33ffa59dd00abe61e5dd0ff2ce35cdf104ce1 Mon Sep 17 00:00:00 2001 From: Yijun Xie <48257664+YijunXieMS@users.noreply.github.com> Date: Tue, 22 Jun 2021 12:54:42 -0700 Subject: [PATCH 3/4] Use Mono instead of Mono --- .../implementation/ServiceBusReactorReceiver.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiver.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiver.java index 2b17ecd75b291..5d1c47fa63d5a 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiver.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiver.java @@ -239,7 +239,7 @@ private Mono updateDispositionInternal(String lockToken, DeliveryState del } final UpdateDispositionWorkItem workItem = new UpdateDispositionWorkItem(lockToken, deliveryState, timeout); - final Mono result = Mono.create(sink -> { + final Mono result = Mono.create(sink -> { workItem.start(sink); try { provider.getReactorDispatcher().invoke(() -> { @@ -362,7 +362,7 @@ private void cleanupWorkItems() { }); } - private void completeWorkItem(String lockToken, Delivery delivery, MonoSink sink, Throwable error) { + private void completeWorkItem(String lockToken, Delivery delivery, MonoSink sink, Throwable error) { final boolean isSettled = delivery != null && delivery.remotelySettled(); if (isSettled) { delivery.settle(); @@ -392,7 +392,7 @@ private static final class UpdateDispositionWorkItem { private Mono mono; private Instant expirationTime; - private MonoSink sink; + private MonoSink sink; private Throwable throwable; private UpdateDispositionWorkItem(String lockToken, DeliveryState state, Duration timeout) { @@ -429,11 +429,11 @@ private Mono getMono() { return mono; } - private MonoSink getSink() { + private MonoSink getSink() { return sink; } - private void start(MonoSink sink) { + private void start(MonoSink sink) { Objects.requireNonNull(sink, "'sink' cannot be null."); this.sink = sink; this.sink.onDispose(() -> isDisposed.set(true)); From c8cc502605b5a87d5b230a4bb4946ed2e13af70a Mon Sep 17 00:00:00 2001 From: Yijun Xie <48257664+YijunXieMS@users.noreply.github.com> Date: Tue, 22 Jun 2021 13:00:35 -0700 Subject: [PATCH 4/4] Remove .then() --- .../servicebus/implementation/ServiceBusReactorReceiver.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiver.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiver.java index 5d1c47fa63d5a..c9a29d5e15bc9 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiver.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiver.java @@ -250,7 +250,7 @@ private Mono updateDispositionInternal(String lockToken, DeliveryState del sink.error(new AmqpException(false, "updateDisposition failed while dispatching to Reactor.", error, handler.getErrorContext(receiver))); } - }).cache().then(); // cache because closeAsync use `when` to subscribe this Mono again. + }).cache(); // cache because closeAsync use `when` to subscribe this Mono again. workItem.setMono(result);