diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxBufferTimeout.java b/reactor-core/src/main/java/reactor/core/publisher/FluxBufferTimeout.java index 9b2508255a..dcc6f7228f 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxBufferTimeout.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxBufferTimeout.java @@ -694,9 +694,11 @@ public Object scanUnsafe(Attr key) { @Override public void onNext(final T value) { int index; + boolean flush; for(;;){ index = this.index + 1; - if(INDEX.compareAndSet(this, index - 1, index)){ + flush = index % batchSize == 0; + if(INDEX.compareAndSet(this, index - 1, flush ? 0 : index)){ break; } } @@ -715,8 +717,7 @@ public void onNext(final T value) { nextCallback(value); - if (this.index % batchSize == 0) { - this.index = 0; + if (flush) { if (timespanRegistration != null) { timespanRegistration.dispose(); timespanRegistration = null; diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTimeoutTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTimeoutTest.java index 9d703b4daa..e750a5df96 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTimeoutTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTimeoutTest.java @@ -24,11 +24,13 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import org.assertj.core.api.Assertions; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.reactivestreams.Subscription; @@ -310,31 +312,54 @@ public void scanSubscriberCancelled() { @Test public void flushShouldNotRaceWithNext() { - Set seen = new HashSet<>(); - Consumer> consumer = integers -> { - for (Integer i : integers) { - if (!seen.add(i)) { - throw new IllegalStateException("Duplicate! " + i); + for (int i = 0; i < 100; i++) { + AtomicInteger caller = new AtomicInteger(); + AtomicBoolean stop = new AtomicBoolean(); + Set seen = new HashSet<>(); + Consumer> consumer = integers -> { + RuntimeException ex = new RuntimeException(integers.toString()); + if (caller.getAndIncrement() == 0) { + for (Integer value : integers) { + if (!seen.add(value)) { + throw new IllegalStateException("Duplicate! " + value); + } + } + + if (caller.decrementAndGet() != 0) { + stop.set(true); + throw ex; + } } + else { + stop.set(true); + throw ex; + } + }; + CoreSubscriber> actual = + new LambdaSubscriber<>(consumer, null, null, null); + + FluxBufferTimeout.BufferTimeoutSubscriber> test = + new FluxBufferTimeout.BufferTimeoutSubscriber>( + actual, + 3, + 1000, + TimeUnit.MILLISECONDS, + Schedulers.boundedElastic() + .createWorker(), + ArrayList::new); + test.onSubscribe(Operators.emptySubscription()); + + AtomicInteger counter = new AtomicInteger(); + for (int j = 0; j < 500; j++) { + RaceTestUtils.race(() -> test.onNext(counter.getAndIncrement()), test.flushTask); + Assertions.assertThat(stop).isFalse(); } - }; - CoreSubscriber> actual = new LambdaSubscriber<>(consumer, null, null, null); - - FluxBufferTimeout.BufferTimeoutSubscriber> test = new FluxBufferTimeout.BufferTimeoutSubscriber>( - actual, 3, 1000, TimeUnit.MILLISECONDS, Schedulers.boundedElastic().createWorker(), ArrayList::new); - test.onSubscribe(Operators.emptySubscription()); - - AtomicInteger counter = new AtomicInteger(); - for (int i = 0; i < 500; i++) { - RaceTestUtils.race( - () -> test.onNext(counter.getAndIncrement()), - () -> test.flushCallback(null) - ); - } - test.onComplete(); + test.onComplete(); - assertThat(seen.size()).isEqualTo(500); + assertThat(seen.size()).as(() -> seen.size() + " " + seen.toString()) + .isEqualTo(500); + } } //see https://github.com/reactor/reactor-core/issues/1247