diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiver.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiver.java index ca77bbf32802c..c9a29d5e15bc9 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiver.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiver.java @@ -239,7 +239,7 @@ private Mono updateDispositionInternal(String lockToken, DeliveryState del } final UpdateDispositionWorkItem workItem = new UpdateDispositionWorkItem(lockToken, deliveryState, timeout); - final Mono result = Mono.create(sink -> { + final Mono result = Mono.create(sink -> { workItem.start(sink); try { provider.getReactorDispatcher().invoke(() -> { @@ -250,7 +250,7 @@ private Mono updateDispositionInternal(String lockToken, DeliveryState del sink.error(new AmqpException(false, "updateDisposition failed while dispatching to Reactor.", error, handler.getErrorContext(receiver))); } - }); + }).cache(); // cache because closeAsync use `when` to subscribe this Mono again. workItem.setMono(result);