Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#3821] Support Set for exact buffers in Flux.buffer #3822

Merged
merged 11 commits into from
Aug 22, 2024
19 changes: 19 additions & 0 deletions reactor-core/src/main/java/reactor/core/publisher/Flux.java
Original file line number Diff line number Diff line change
Expand Up @@ -2882,6 +2882,9 @@ public final Flux<List<T>> buffer(int maxSize) {
* will be emitted by the returned {@link Flux} each time the given max size is reached
* or once this Flux completes.
* <p>
* Note that if buffers provided by the bufferSupplier return {@literal false} upon invocation
chemicL marked this conversation as resolved.
Show resolved Hide resolved
* of {@link Collection#add(Object)} for a given element, that element will be discarded.
* <p>
* <img class="marble" src="doc-files/marbles/bufferWithMaxSize.svg" alt="">
*
* <p><strong>Discard Support:</strong> This operator discards the currently open buffer upon cancellation or error triggered by a data signal,
Expand Down Expand Up @@ -3123,6 +3126,10 @@ public final Flux<List<T>> bufferTimeout(int maxSize, Duration maxTime) {
* will be emitted by the returned {@link Flux} each time the buffer reaches a maximum
* size OR the maxTime {@link Duration} elapses.
* <p>
* Note that if buffers provided by bufferSupplier may return {@literal false} upon invocation
* of {@link Collection#add(Object)}, buffer emission may be triggered when the buffer size is
* less than the specified max size. The element will be discarded in such a case.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is false.

* <p>
* <img class="marble" src="doc-files/marbles/bufferTimeoutWithMaxSizeAndTimespan.svg" alt="">
*
* <p><strong>Discard Support:</strong> This operator discards the currently open buffer upon cancellation or error triggered by a data signal.
Expand Down Expand Up @@ -3163,6 +3170,10 @@ public final Flux<List<T>> bufferTimeout(int maxSize, Duration maxTime, Schedule
* will be emitted by the returned {@link Flux} each time the buffer reaches a maximum
* size OR the maxTime {@link Duration} elapses, as measured on the provided {@link Scheduler}.
* <p>
* Note that if buffers provided by bufferSupplier may return {@literal false} upon invocation
* of {@link Collection#add(Object)}, buffer emission may be triggered when the buffer size is
* less than the specified max size. The element will be discarded in such a case.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is false as well.

* <p>
* <img class="marble" src="doc-files/marbles/bufferTimeoutWithMaxSizeAndTimespan.svg" alt="">
*
* <p><strong>Discard Support:</strong> This operator discards the currently open buffer upon cancellation or error triggered by a data signal.
Expand Down Expand Up @@ -3230,6 +3241,10 @@ public final Flux<List<T>> bufferTimeout(int maxSize, Duration maxTime,
* will be emitted by the returned {@link Flux} each time the buffer reaches a maximum
* size OR the maxTime {@link Duration} elapses.
* <p>
* Note that if buffers provided by bufferSupplier may return {@literal false} upon invocation
* of {@link Collection#add(Object)}, buffer emission may be triggered when the buffer size is
* less than the specified max size. The element will be discarded in such a case.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And this one is also false. Discard support is not present in bufferTimeout for buffer.add returning false.

* <p>
* <img class="marble" src="doc-files/marbles/bufferTimeoutWithMaxSizeAndTimespan.svg" alt="">
*
* <p><strong>Discard Support:</strong> This operator discards the currently open buffer upon cancellation or error triggered by a data signal.
Expand All @@ -3254,6 +3269,10 @@ public final <C extends Collection<? super T>> Flux<C> bufferTimeout(int maxSiz
* will be emitted by the returned {@link Flux} each time the buffer reaches a maximum
* size OR the maxTime {@link Duration} elapses, as measured on the provided {@link Scheduler}.
* <p>
* Note that if buffers provided by bufferSupplier may return {@literal false} upon invocation
* of {@link Collection#add(Object)}, buffer emission may be triggered when the buffer size is
* less than the specified max size. The element will be discarded in such a case.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And also false here.

* <p>
* <img class="marble" src="doc-files/marbles/bufferTimeoutWithMaxSizeAndTimespan.svg" alt="">
*
* <p><strong>Discard Support:</strong> This operator discards the currently open buffer upon cancellation or error triggered by a data signal.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -153,11 +153,14 @@ public void onNext(T t) {
buffer = b;
}

b.add(t);

if (b.size() == size) {
buffer = null;
actual.onNext(b);
if (b.add(t)) {
if (b.size() == size) {
buffer = null;
actual.onNext(b);
}
} else {
Operators.onDiscard(t, actual.currentContext());
s.request(1);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,17 +19,24 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.provider.CsvSource;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Scannable;
import reactor.test.ParameterizedTestWithName;
import reactor.test.StepVerifier;
import reactor.test.publisher.FluxOperatorTest;
import reactor.test.subscriber.AssertSubscriber;
import reactor.util.annotation.Nullable;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
Expand Down Expand Up @@ -661,4 +668,33 @@ public void discardOnErrorOverlap() {
.verifyThenAssertThat()
.hasDiscardedExactly(1, 2, 3, 3); //we already opened a 2nd buffer
}

@ParameterizedTestWithName
@CsvSource({
"1|2, 1|2, ",
"1|1|1, 1, 1|1",
"1|1|2, 1|2, 1",
"1|2|1, 1|2;1, ",
"1|2|1|3, 1|2;1|3, ",
"1|1|2|3, 1|2;3, 1",
"2|1|1|3, 2|1;1|3, "
})
public void bufferExactSupplierUsesSet(String input, String output, @Nullable String discard) {
List<Set<Object>> outputs = Arrays.stream(output.split(";"))
.map(it -> Arrays.<Object>stream(it.split("\\|")).collect(Collectors.toSet()))
.collect(Collectors.toList());

StepVerifier.Assertions assertions = Flux.just(input.split("\\|"))
.<Collection<Object>>buffer(2, HashSet::new)
.as(it -> StepVerifier.create(it, outputs.size()))
.expectNextSequence(outputs)
.expectComplete()
.verifyThenAssertThat(Duration.ofSeconds(2));

if (discard == null) {
assertions.hasNotDiscardedElements();
} else {
assertions.hasDiscardedExactly((Object[]) discard.split("\\|"));
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,7 @@

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -421,4 +422,14 @@ public void discardOnError() {
.verifyThenAssertThat()
.hasDiscardedExactly(1, 2, 3);
}

@Test
public void bufferSupplierUsesSet() {
Flux.just(1, 1, 1, 1, 1, 1, 1)
.<Set<Object>>bufferTimeout(3, Duration.ofSeconds(2), HashSet::new)
.as(it -> StepVerifier.create(it, 3))
.expectNext(Collections.singleton(1), Collections.singleton(1), Collections.singleton(1))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find it of little practical use in the end -> the result is that a buffer is emitted every time maxSize items are consumed from the source, not when the buffer size is at the maxSize limit.

.expectComplete()
.verify(Duration.ofSeconds(2));
}
}