From 807cbf02eeb94988165814b97599fbf1e75d7d8c Mon Sep 17 00:00:00 2001 From: Ross Lawley Date: Wed, 3 Jan 2024 15:24:05 +0000 Subject: [PATCH] Backport: Fix `onErrorDropped` logged message (#1281) Caused by using `doOnError` rather than using `subscribe` and passing the handler in there. JAVA-5284 JAVA-5266 --- .../client/internal/BatchCursorFlux.java | 45 +++++++++---------- 1 file changed, 22 insertions(+), 23 deletions(-) diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/BatchCursorFlux.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/BatchCursorFlux.java index 9e28af9236..90bbe9ed0a 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/BatchCursorFlux.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/BatchCursorFlux.java @@ -87,29 +87,28 @@ private void recurseCursor(){ batchCursor.setBatchSize(calculateBatchSize(sink.requestedFromDownstream())); Mono.from(batchCursor.next(() -> sink.isCancelled())) .doOnCancel(this::closeCursor) - .doOnError((e) -> { - try { - closeCursor(); - } finally { - sink.error(e); - } - }) - .doOnSuccess(results -> { - if (!results.isEmpty()) { - results - .stream() - .filter(Objects::nonNull) - .forEach(sink::next); - calculateDemand(-results.size()); - } - if (batchCursor.isClosed()) { - sink.complete(); - } else { - inProgress.set(false); - recurseCursor(); - } - }) - .subscribe(); + .subscribe(results -> { + if (!results.isEmpty()) { + results + .stream() + .filter(Objects::nonNull) + .forEach(sink::next); + calculateDemand(-results.size()); + } + if (batchCursor.isClosed()) { + sink.complete(); + } else { + inProgress.set(false); + recurseCursor(); + } + }, + e -> { + try { + closeCursor(); + } finally { + sink.error(e); + } + }); } } }