Skip to content

Commit

Permalink
ensures that proper index is used during onNext check (#3614)
Browse files Browse the repository at this point in the history
* ensures that proper index is used during onNext check

Signed-off-by: Oleh Dokuka <odokuka@vmware.com>

* adds proper index zeroing

Signed-off-by: Oleh Dokuka <odokuka@vmware.com>

---------

Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
  • Loading branch information
OlegDokuka authored Oct 9, 2023
1 parent a887af8 commit 759375e
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -694,9 +694,11 @@ public Object scanUnsafe(Attr key) {
@Override
public void onNext(final T value) {
int index;
boolean flush;
for(;;){
index = this.index + 1;
if(INDEX.compareAndSet(this, index - 1, index)){
flush = index % batchSize == 0;
if(INDEX.compareAndSet(this, index - 1, flush ? 0 : index)){
break;
}
}
Expand All @@ -715,8 +717,7 @@ public void onNext(final T value) {

nextCallback(value);

if (this.index % batchSize == 0) {
this.index = 0;
if (flush) {
if (timespanRegistration != null) {
timespanRegistration.dispose();
timespanRegistration = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Subscription;
Expand Down Expand Up @@ -310,31 +312,54 @@ public void scanSubscriberCancelled() {

@Test
public void flushShouldNotRaceWithNext() {
Set<Integer> seen = new HashSet<>();
Consumer<List<Integer>> consumer = integers -> {
for (Integer i : integers) {
if (!seen.add(i)) {
throw new IllegalStateException("Duplicate! " + i);
for (int i = 0; i < 100; i++) {
AtomicInteger caller = new AtomicInteger();
AtomicBoolean stop = new AtomicBoolean();
Set<Integer> seen = new HashSet<>();
Consumer<List<Integer>> consumer = integers -> {
RuntimeException ex = new RuntimeException(integers.toString());
if (caller.getAndIncrement() == 0) {
for (Integer value : integers) {
if (!seen.add(value)) {
throw new IllegalStateException("Duplicate! " + value);
}
}

if (caller.decrementAndGet() != 0) {
stop.set(true);
throw ex;
}
}
else {
stop.set(true);
throw ex;
}
};
CoreSubscriber<List<Integer>> actual =
new LambdaSubscriber<>(consumer, null, null, null);

FluxBufferTimeout.BufferTimeoutSubscriber<Integer, List<Integer>> test =
new FluxBufferTimeout.BufferTimeoutSubscriber<Integer, List<Integer>>(
actual,
3,
1000,
TimeUnit.MILLISECONDS,
Schedulers.boundedElastic()
.createWorker(),
ArrayList::new);
test.onSubscribe(Operators.emptySubscription());

AtomicInteger counter = new AtomicInteger();
for (int j = 0; j < 500; j++) {
RaceTestUtils.race(() -> test.onNext(counter.getAndIncrement()), test.flushTask);
Assertions.assertThat(stop).isFalse();
}
};
CoreSubscriber<List<Integer>> actual = new LambdaSubscriber<>(consumer, null, null, null);

FluxBufferTimeout.BufferTimeoutSubscriber<Integer, List<Integer>> test = new FluxBufferTimeout.BufferTimeoutSubscriber<Integer, List<Integer>>(
actual, 3, 1000, TimeUnit.MILLISECONDS, Schedulers.boundedElastic().createWorker(), ArrayList::new);
test.onSubscribe(Operators.emptySubscription());

AtomicInteger counter = new AtomicInteger();
for (int i = 0; i < 500; i++) {
RaceTestUtils.race(
() -> test.onNext(counter.getAndIncrement()),
() -> test.flushCallback(null)
);
}

test.onComplete();
test.onComplete();

assertThat(seen.size()).isEqualTo(500);
assertThat(seen.size()).as(() -> seen.size() + " " + seen.toString())
.isEqualTo(500);
}
}

//see https://github.com/reactor/reactor-core/issues/1247
Expand Down

0 comments on commit 759375e

Please sign in to comment.