From 089adba63d79e295c0c1a7cdf67f2094f3d7e146 Mon Sep 17 00:00:00 2001 From: spierce Date: Sat, 1 Jun 2024 11:00:41 -0500 Subject: [PATCH 01/11] [#3821] Fix `FluxBuffer` to request 1 when buffer is not modified --- .../java/reactor/core/publisher/FluxBuffer.java | 6 +++--- .../reactor/core/publisher/FluxBufferTest.java | 15 +++++++++++++++ 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxBuffer.java b/reactor-core/src/main/java/reactor/core/publisher/FluxBuffer.java index e3dd4695ca..c03414af0e 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxBuffer.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxBuffer.java @@ -153,9 +153,9 @@ public void onNext(T t) { buffer = b; } - b.add(t); - - if (b.size() == size) { + if (!b.add(t)) { + s.request(1); + } else if (b.size() == size) { buffer = null; actual.onNext(b); } diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTest.java index 54617c2673..d1a3a936c5 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTest.java @@ -19,9 +19,13 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.function.Function; import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.junit.jupiter.api.Test; import org.reactivestreams.Subscription; @@ -409,6 +413,17 @@ public void supplierThrows() { .assertNotComplete(); } + @Test + public void supplierUsesSet() { + Flux.just(1, 1, 2) + .>buffer(2, HashSet::new) + .take(1, true) + .as(StepVerifier::create) + .expectNext(Stream.of(1, 2).collect(Collectors.toSet())) + .expectComplete() + .verify(Duration.ofSeconds(2)); + } + @Test public void bufferWillSubdivideAnInputFlux() { Flux numbers = Flux.just(1, 2, 3, 4, 5, 6, 7, 8); From d601552f98290aafdddd3056997ea319d4a73aa5 Mon Sep 17 00:00:00 2001 From: spierce Date: Wed, 19 Jun 2024 13:06:44 -0500 Subject: [PATCH 02/11] [#3821] Fully handle discard when buffer is unmodified --- .../src/main/java/reactor/core/publisher/FluxBuffer.java | 1 + .../src/test/java/reactor/core/publisher/FluxBufferTest.java | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxBuffer.java b/reactor-core/src/main/java/reactor/core/publisher/FluxBuffer.java index c03414af0e..4c548b7386 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxBuffer.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxBuffer.java @@ -154,6 +154,7 @@ public void onNext(T t) { } if (!b.add(t)) { + Operators.onDiscard(t, actual.currentContext()); s.request(1); } else if (b.size() == size) { buffer = null; diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTest.java index d1a3a936c5..0d4ef7e74c 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTest.java @@ -421,7 +421,8 @@ public void supplierUsesSet() { .as(StepVerifier::create) .expectNext(Stream.of(1, 2).collect(Collectors.toSet())) .expectComplete() - .verify(Duration.ofSeconds(2)); + .verifyThenAssertThat(Duration.ofSeconds(2)) + .hasDiscardedExactly(1); } @Test From b464df391b3f8e6be52ae2fdee4c743d93f94d0b Mon Sep 17 00:00:00 2001 From: spierce Date: Sat, 22 Jun 2024 11:20:45 -0500 Subject: [PATCH 03/11] [#3821] Update behavior of FluxBufferSkip and FluxBufferOverlap - Make `Collection` behavior consistent among all FluxBuffer* operators - Added several more tests for all FluxBuffer* operators covering usage of `Set` --- .../reactor/core/publisher/FluxBuffer.java | 35 +++--- .../core/publisher/FluxBufferTest.java | 107 +++++++++++++++--- 2 files changed, 115 insertions(+), 27 deletions(-) diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxBuffer.java b/reactor-core/src/main/java/reactor/core/publisher/FluxBuffer.java index 4c548b7386..5c47065ff9 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxBuffer.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxBuffer.java @@ -307,8 +307,11 @@ public void onNext(T t) { } if (b != null) { - b.add(t); - if (b.size() == size) { + if (!b.add(t)) { + Operators.onDiscard(t, this.ctx); + s.request(1); + return; + } else if (b.size() == size) { buffer = null; actual.onNext(b); } @@ -478,6 +481,19 @@ public void onNext(T t) { return; } + C b0 = peek(); + + for (C b : this) { + // It should never be the case that an element can be added to the first open + // buffer and not all of them. Otherwise, the buffer behavior is non-deterministic, + // and this operator's behavior is undefined. + if (!b.add(t) && b == b0) { + Operators.onDiscard(t, actual.currentContext()); + s.request(1); + return; + } + } + long i = index; if (i % skip == 0L) { @@ -494,25 +510,16 @@ public void onNext(T t) { return; } + b.add(t); offer(b); } - C b = peek(); - - if (b != null && b.size() + 1 == size) { + if (b0 != null && b0.size() == size) { poll(); - - b.add(t); - - actual.onNext(b); - + actual.onNext(b0); produced++; } - for (C b0 : this) { - b0.add(t); - } - index = i + 1; } diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTest.java index 0d4ef7e74c..53dea65657 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTest.java @@ -19,21 +19,24 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; -import java.util.stream.Stream; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.provider.CsvSource; import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; import reactor.core.Scannable; +import reactor.test.ParameterizedTestWithName; import reactor.test.StepVerifier; import reactor.test.publisher.FluxOperatorTest; import reactor.test.subscriber.AssertSubscriber; +import reactor.util.annotation.Nullable; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; @@ -413,18 +416,6 @@ public void supplierThrows() { .assertNotComplete(); } - @Test - public void supplierUsesSet() { - Flux.just(1, 1, 2) - .>buffer(2, HashSet::new) - .take(1, true) - .as(StepVerifier::create) - .expectNext(Stream.of(1, 2).collect(Collectors.toSet())) - .expectComplete() - .verifyThenAssertThat(Duration.ofSeconds(2)) - .hasDiscardedExactly(1); - } - @Test public void bufferWillSubdivideAnInputFlux() { Flux numbers = Flux.just(1, 2, 3, 4, 5, 6, 7, 8); @@ -677,4 +668,94 @@ public void discardOnErrorOverlap() { .verifyThenAssertThat() .hasDiscardedExactly(1, 2, 3, 3); //we already opened a 2nd buffer } + + @ParameterizedTestWithName + @CsvSource({ + "1|2, 1|2, ", + "1|1|1, 1, 1|1", + "1|1|2, 1|2, 1", + "1|2|1, 1|2;1, ", + "1|2|1|3, 1|2;1|3, ", + "1|1|2|3, 1|2;3, 1", + "2|1|1|3, 2|1;1|3, " + }) + public void bufferExactSupplierUsesSet(String input, String output, @Nullable String discard) { + List> outputs = Arrays.stream(output.split(";")) + .map(it -> Arrays.stream(it.split("\\|")).collect(Collectors.toSet())) + .collect(Collectors.toList()); + + StepVerifier.Assertions assertions = Flux.just(input.split("\\|")) + .>buffer(2, HashSet::new) + .as(it -> StepVerifier.create(it, outputs.size())) + .expectNextSequence(outputs) + .expectComplete() + .verifyThenAssertThat(Duration.ofSeconds(2)); + + if (discard == null) { + assertions.hasNotDiscardedElements(); + } else { + assertions.hasDiscardedExactly((Object[]) discard.split("\\|")); + } + } + + @ParameterizedTestWithName + @CsvSource({ + "1|2, 1|2, ", + "1|1|1, 1, 1|1", + "1|1|2, 1|2, 1", + "1|2|1, 1|2, ", + "1|2|1|3, 1|2;3, 1", + "1|2|1|1|3, 1|2;1|3, 1", + "1|1|2|3, 1|2, 1", + "2|1|1|3, 2|1;3, 1" + }) + public void bufferSkipWithMax2Skip3SupplierUsesSet(String input, String output, @Nullable String discard) { + List> outputs = Arrays.stream(output.split(";")) + .map(it -> Arrays.stream(it.split("\\|")).collect(Collectors.toSet())) + .collect(Collectors.toList()); + + StepVerifier.Assertions assertions = Flux.just(input.split("\\|")) + .>buffer(2, 3, HashSet::new) + .as(it -> StepVerifier.create(it, outputs.size())) + .expectNextSequence(outputs) + .thenCancel() + .verifyThenAssertThat(Duration.ofSeconds(2)); + + if (discard == null) { + assertions.hasNotDiscardedElements(); + } else { + assertions.hasDiscardedExactly((Object[]) discard.split("\\|")); + } + } + + @ParameterizedTestWithName + @CsvSource({ + "1|2, 1|2, ", + "1|1|1, 1, 1|1", + "1|1|2, 1|2, 1", + "1|2|1, 1|2, 1", + "1|2|1|3, 1|2|3;3, 1", + "1|2|1|3|3|4, 1|2|3|4;3|4, 1|3", + "1|1|2|3, 1|2|3;3, 1", + "2|1|1|3, 2|1|3;3, 1", + "1|2|1|2|3, 1|2|3;3, 1|2" + }) + public void bufferOverlapWithMax4Skip2SupplierUsesSet(String input, String output, @Nullable String discard) { + List> outputs = Arrays.stream(output.split(";")) + .map(it -> Arrays.stream(it.split("\\|")).collect(Collectors.toSet())) + .collect(Collectors.toList()); + + StepVerifier.Assertions assertions = Flux.just(input.split("\\|")) + .>buffer(4, 2, HashSet::new) + .as(it -> StepVerifier.create(it, outputs.size())) + .expectNextSequence(outputs) + .expectComplete() + .verifyThenAssertThat(Duration.ofSeconds(2)); + + if (discard == null) { + assertions.hasNotDiscardedElements(); + } else { + assertions.hasDiscardedExactly((Object[]) discard.split("\\|")); + } + } } From 9d69ae1fc602b2e6cccdce8f1165b3ced96bac8d Mon Sep 17 00:00:00 2001 From: spierce Date: Sun, 23 Jun 2024 21:49:40 -0500 Subject: [PATCH 04/11] [#3821] Add test to `bufferTimeout` to verify `Set` bufferSupplier --- .../core/publisher/FluxBufferTest.java | 32 +++++++++---------- .../core/publisher/FluxBufferTimeoutTest.java | 11 +++++++ 2 files changed, 27 insertions(+), 16 deletions(-) diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTest.java index 53dea65657..818ba207a4 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTest.java @@ -692,8 +692,8 @@ public void bufferExactSupplierUsesSet(String input, String output, @Nullable St .verifyThenAssertThat(Duration.ofSeconds(2)); if (discard == null) { - assertions.hasNotDiscardedElements(); - } else { + assertions.hasNotDiscardedElements(); + } else { assertions.hasDiscardedExactly((Object[]) discard.split("\\|")); } } @@ -716,16 +716,16 @@ public void bufferSkipWithMax2Skip3SupplierUsesSet(String input, String output, StepVerifier.Assertions assertions = Flux.just(input.split("\\|")) .>buffer(2, 3, HashSet::new) - .as(it -> StepVerifier.create(it, outputs.size())) + .as(it -> StepVerifier.create(it, outputs.size())) .expectNextSequence(outputs) - .thenCancel() + .thenCancel() .verifyThenAssertThat(Duration.ofSeconds(2)); - if (discard == null) { - assertions.hasNotDiscardedElements(); - } else { - assertions.hasDiscardedExactly((Object[]) discard.split("\\|")); - } + if (discard == null) { + assertions.hasNotDiscardedElements(); + } else { + assertions.hasDiscardedExactly((Object[]) discard.split("\\|")); + } } @ParameterizedTestWithName @@ -738,7 +738,7 @@ public void bufferSkipWithMax2Skip3SupplierUsesSet(String input, String output, "1|2|1|3|3|4, 1|2|3|4;3|4, 1|3", "1|1|2|3, 1|2|3;3, 1", "2|1|1|3, 2|1|3;3, 1", - "1|2|1|2|3, 1|2|3;3, 1|2" + "1|2|1|2|3, 1|2|3;3, 1|2" }) public void bufferOverlapWithMax4Skip2SupplierUsesSet(String input, String output, @Nullable String discard) { List> outputs = Arrays.stream(output.split(";")) @@ -747,15 +747,15 @@ public void bufferOverlapWithMax4Skip2SupplierUsesSet(String input, String outpu StepVerifier.Assertions assertions = Flux.just(input.split("\\|")) .>buffer(4, 2, HashSet::new) - .as(it -> StepVerifier.create(it, outputs.size())) + .as(it -> StepVerifier.create(it, outputs.size())) .expectNextSequence(outputs) .expectComplete() .verifyThenAssertThat(Duration.ofSeconds(2)); - if (discard == null) { - assertions.hasNotDiscardedElements(); - } else { - assertions.hasDiscardedExactly((Object[]) discard.split("\\|")); - } + if (discard == null) { + assertions.hasNotDiscardedElements(); + } else { + assertions.hasDiscardedExactly((Object[]) discard.split("\\|")); + } } } diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTimeoutTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTimeoutTest.java index e750a5df96..3cc9006e6c 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTimeoutTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTimeoutTest.java @@ -18,6 +18,7 @@ import java.time.Duration; import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -421,4 +422,14 @@ public void discardOnError() { .verifyThenAssertThat() .hasDiscardedExactly(1, 2, 3); } + + @Test + public void bufferSupplierUsesSet() { + Flux.just(1, 1, 1, 1, 1, 1, 1) + .>bufferTimeout(3, Duration.ofSeconds(2), HashSet::new) + .as(it -> StepVerifier.create(it, 3)) + .expectNext(Collections.singleton(1), Collections.singleton(1), Collections.singleton(1)) + .expectComplete() + .verify(Duration.ofSeconds(2)); + } } From aceebef3a598f068a521a1f42a8b953f5f0b345b Mon Sep 17 00:00:00 2001 From: spierce Date: Mon, 24 Jun 2024 12:50:20 -0500 Subject: [PATCH 05/11] [#3821] Add note about Collection::add to `bufferTimeout` operators that take a `bufferSupplier` --- .../java/reactor/core/publisher/Flux.java | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/reactor-core/src/main/java/reactor/core/publisher/Flux.java b/reactor-core/src/main/java/reactor/core/publisher/Flux.java index d28f3fccce..0f67a3bf3a 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/Flux.java +++ b/reactor-core/src/main/java/reactor/core/publisher/Flux.java @@ -2882,6 +2882,9 @@ public final Flux> buffer(int maxSize) { * will be emitted by the returned {@link Flux} each time the given max size is reached * or once this Flux completes. *

+ * Note that if buffers provided by the bufferSupplier return {@literal false} upon invocation + * of {@link Collection#add(Object)} for a given element, that element will be discarded. + *

* * *

Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal, @@ -2945,6 +2948,9 @@ public final Flux> buffer(int maxSize, int skip) { *

* When maxSize == skip : exact buffers *

+ * Note that if buffers provided by the bufferSupplier return {@literal false} upon invocation + * of {@link Collection#add(Object)} for a given element, that element will be discarded. + *

* * *

Discard Support: This operator discards elements in between buffers (in the case of @@ -3123,6 +3129,10 @@ public final Flux> bufferTimeout(int maxSize, Duration maxTime) { * will be emitted by the returned {@link Flux} each time the buffer reaches a maximum * size OR the maxTime {@link Duration} elapses. *

+ * Note that if buffers provided by bufferSupplier may return {@literal false} upon invocation + * of {@link Collection#add(Object)}, buffer emission may be triggered when the buffer size is + * less than the specified max size. + *

* * *

Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal. @@ -3163,6 +3173,10 @@ public final Flux> bufferTimeout(int maxSize, Duration maxTime, Schedule * will be emitted by the returned {@link Flux} each time the buffer reaches a maximum * size OR the maxTime {@link Duration} elapses, as measured on the provided {@link Scheduler}. *

+ * Note that if buffers provided by bufferSupplier may return {@literal false} upon invocation + * of {@link Collection#add(Object)}, buffer emission may be triggered when the buffer size is + * less than the specified max size. + *

* * *

Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal. @@ -3230,6 +3244,10 @@ public final Flux> bufferTimeout(int maxSize, Duration maxTime, * will be emitted by the returned {@link Flux} each time the buffer reaches a maximum * size OR the maxTime {@link Duration} elapses. *

+ * Note that if buffers provided by bufferSupplier may return {@literal false} upon invocation + * of {@link Collection#add(Object)}, buffer emission may be triggered when the buffer size is + * less than the specified max size. + *

* * *

Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal. @@ -3254,6 +3272,10 @@ public final > Flux bufferTimeout(int maxSiz * will be emitted by the returned {@link Flux} each time the buffer reaches a maximum * size OR the maxTime {@link Duration} elapses, as measured on the provided {@link Scheduler}. *

+ * Note that if buffers provided by bufferSupplier may return {@literal false} upon invocation + * of {@link Collection#add(Object)}, buffer emission may be triggered when the buffer size is + * less than the specified max size. + *

* * *

Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal. From fe59d5da5f8a1c7a3e3ad1b89b10eb6854926ac8 Mon Sep 17 00:00:00 2001 From: spierce Date: Sat, 20 Jul 2024 12:49:48 -0500 Subject: [PATCH 06/11] [#3821] PR feedback --- .../main/java/reactor/core/publisher/Flux.java | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/reactor-core/src/main/java/reactor/core/publisher/Flux.java b/reactor-core/src/main/java/reactor/core/publisher/Flux.java index 0f67a3bf3a..bd4a762b0e 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/Flux.java +++ b/reactor-core/src/main/java/reactor/core/publisher/Flux.java @@ -2903,8 +2903,7 @@ public final > Flux buffer(int maxSize, Suppl /** * Collect incoming values into multiple {@link List} buffers that will be emitted * by the returned {@link Flux} each time the given max size is reached or once this - * Flux completes. Buffers can be created with gaps, as a new buffer will be created - * every time {@code skip} values have been emitted by the source. + * Flux completes. *

* When maxSize < skip : dropping buffers *

@@ -2918,6 +2917,13 @@ public final > Flux buffer(int maxSize, Suppl *

* * + *

There are nuances to consider if a supplied buffer may return {@code false} when elements + * are added. For "dropping" buffers, if an item is not added to an in-flight buffer, it is + * discarded, and therefore a buffer may not be created every {@code skip} elements. For + * "overlapping" buffers, an item will be discarded if it cannot be added to any + * in-flight buffer. For "exact" buffers, an item will be discarded if cannot be added to the + * in-flight buffer. + * *

Discard Support: This operator discards elements in between buffers (in the case of * dropping buffers). It also discards the currently open buffer upon cancellation or error triggered by a data signal. * Note however that overlapping buffer variant DOES NOT discard, as this might result in an element @@ -3131,7 +3137,7 @@ public final Flux> bufferTimeout(int maxSize, Duration maxTime) { *

* Note that if buffers provided by bufferSupplier may return {@literal false} upon invocation * of {@link Collection#add(Object)}, buffer emission may be triggered when the buffer size is - * less than the specified max size. + * less than the specified max size. The element will be discarded in such a case. *

* * @@ -3175,7 +3181,7 @@ public final Flux> bufferTimeout(int maxSize, Duration maxTime, Schedule *

* Note that if buffers provided by bufferSupplier may return {@literal false} upon invocation * of {@link Collection#add(Object)}, buffer emission may be triggered when the buffer size is - * less than the specified max size. + * less than the specified max size. The element will be discarded in such a case. *

* * @@ -3246,7 +3252,7 @@ public final Flux> bufferTimeout(int maxSize, Duration maxTime, *

* Note that if buffers provided by bufferSupplier may return {@literal false} upon invocation * of {@link Collection#add(Object)}, buffer emission may be triggered when the buffer size is - * less than the specified max size. + * less than the specified max size. The element will be discarded in such a case. *

* * @@ -3274,7 +3280,7 @@ public final > Flux bufferTimeout(int maxSiz *

* Note that if buffers provided by bufferSupplier may return {@literal false} upon invocation * of {@link Collection#add(Object)}, buffer emission may be triggered when the buffer size is - * less than the specified max size. + * less than the specified max size. The element will be discarded in such a case. *

* * From 1e44a216f8e7d04c266d3997f8842c1f9f6370db Mon Sep 17 00:00:00 2001 From: spierce Date: Sat, 20 Jul 2024 13:39:56 -0500 Subject: [PATCH 07/11] [#3821] Spotless --- .../src/main/java/reactor/core/publisher/FluxBuffer.java | 2 +- .../src/test/java/reactor/core/publisher/FluxBufferTest.java | 2 +- .../test/java/reactor/core/publisher/FluxBufferTimeoutTest.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxBuffer.java b/reactor-core/src/main/java/reactor/core/publisher/FluxBuffer.java index 5c47065ff9..2f9692fb4c 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxBuffer.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxBuffer.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2024 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTest.java index 818ba207a4..7c24f9d371 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2024 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTimeoutTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTimeoutTest.java index 3cc9006e6c..5b58ac06f3 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTimeoutTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTimeoutTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2024 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. From ce82b3324fdeadc13b90af3418aa1d18f3872dd2 Mon Sep 17 00:00:00 2001 From: spierce Date: Sat, 20 Jul 2024 13:40:30 -0500 Subject: [PATCH 08/11] [#3821] Fix BufferOverlap to only discard if not added to _any_ in-flight buffer --- .../reactor/core/publisher/FluxBuffer.java | 31 ++++++++++--------- .../core/publisher/FluxBufferTest.java | 20 ++++++------ 2 files changed, 28 insertions(+), 23 deletions(-) diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxBuffer.java b/reactor-core/src/main/java/reactor/core/publisher/FluxBuffer.java index 2f9692fb4c..e59a7ced8c 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxBuffer.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxBuffer.java @@ -481,17 +481,15 @@ public void onNext(T t) { return; } - C b0 = peek(); - + boolean added = isEmpty(); for (C b : this) { - // It should never be the case that an element can be added to the first open - // buffer and not all of them. Otherwise, the buffer behavior is non-deterministic, - // and this operator's behavior is undefined. - if (!b.add(t) && b == b0) { - Operators.onDiscard(t, actual.currentContext()); - s.request(1); - return; - } + added |= b.add(t); + } + + if (!added) { + Operators.onDiscard(t, actual.currentContext()); + s.request(1); + return; } long i = index; @@ -514,10 +512,15 @@ public void onNext(T t) { offer(b); } - if (b0 != null && b0.size() == size) { - poll(); - actual.onNext(b0); - produced++; + for (C b : this) { + if (b.size() == size) { + poll(); + actual.onNext(b); + produced++; + } else { + // Safe to break as soon as we find a buffer that's not yet at size + break; + } } index = i + 1; diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTest.java index 7c24f9d371..0601092bd6 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTest.java @@ -730,15 +730,17 @@ public void bufferSkipWithMax2Skip3SupplierUsesSet(String input, String output, @ParameterizedTestWithName @CsvSource({ - "1|2, 1|2, ", - "1|1|1, 1, 1|1", - "1|1|2, 1|2, 1", - "1|2|1, 1|2, 1", - "1|2|1|3, 1|2|3;3, 1", - "1|2|1|3|3|4, 1|2|3|4;3|4, 1|3", - "1|1|2|3, 1|2|3;3, 1", - "2|1|1|3, 2|1|3;3, 1", - "1|2|1|2|3, 1|2|3;3, 1|2" + "1|2, 1|2, ", + "1|1|1, 1, 1|1", + "1|1|2, 1|2, 1", + "1|2|1, 1|2, 1", + "1|2|1|3, 1|2|3;3, 1", + "1|2|1|3|3|4, 1|2|3|4;3|4, 1|3", + "1|2|3|1|3|4, 1|2|3|4;3|1|4;4, 3", + "3|2|1|2|3|2|3|4, 3|2|1|4;1|2|3|4;3|2|4;4, 3", + "1|1|2|3, 1|2|3;3, 1", + "2|1|1|3, 2|1|3;3, 1", + "1|2|1|2|3, 1|2|3;3, 1|2" }) public void bufferOverlapWithMax4Skip2SupplierUsesSet(String input, String output, @Nullable String discard) { List> outputs = Arrays.stream(output.split(";")) From d7b4d04c9c194de43edc59b64d68f4d6d9c1a4e6 Mon Sep 17 00:00:00 2001 From: spierce Date: Mon, 5 Aug 2024 15:24:12 -0500 Subject: [PATCH 09/11] [#3821] PR feedback --- .../java/reactor/core/publisher/FluxBuffer.java | 10 ++++++---- .../reactor/core/publisher/FluxBufferTest.java | 16 ++++++++-------- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxBuffer.java b/reactor-core/src/main/java/reactor/core/publisher/FluxBuffer.java index e59a7ced8c..4aca02ec7d 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxBuffer.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxBuffer.java @@ -153,12 +153,14 @@ public void onNext(T t) { buffer = b; } - if (!b.add(t)) { + if (b.add(t)) { + if (b.size() == size) { + buffer = null; + actual.onNext(b); + } + } else { Operators.onDiscard(t, actual.currentContext()); s.request(1); - } else if (b.size() == size) { - buffer = null; - actual.onNext(b); } } diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTest.java index 0601092bd6..cf0f15c335 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTest.java @@ -700,14 +700,14 @@ public void bufferExactSupplierUsesSet(String input, String output, @Nullable St @ParameterizedTestWithName @CsvSource({ - "1|2, 1|2, ", - "1|1|1, 1, 1|1", - "1|1|2, 1|2, 1", - "1|2|1, 1|2, ", - "1|2|1|3, 1|2;3, 1", - "1|2|1|1|3, 1|2;1|3, 1", - "1|1|2|3, 1|2, 1", - "2|1|1|3, 2|1;3, 1" + "1|2, 1|2, ", + "1|1|1, 1, 1|1", + "1|1|2, 1|2, 1", + "1|2|1, 1|2, ", + "1|2|1|3, 1|2;3, 1", + "1|2|1|1|3, 1|2;1|3, 1", + "1|1|2|3|3, 1|2;3, 1|3", + "2|1|1|3, 2|1;3, 1" }) public void bufferSkipWithMax2Skip3SupplierUsesSet(String input, String output, @Nullable String discard) { List> outputs = Arrays.stream(output.split(";")) From e50b51a34f2a9982586c6bba645c3fc06c66eaaf Mon Sep 17 00:00:00 2001 From: spierce Date: Mon, 5 Aug 2024 15:57:47 -0500 Subject: [PATCH 10/11] [#3821] Revert changes to overlap and skip buffer ops --- .../java/reactor/core/publisher/Flux.java | 13 +--- .../reactor/core/publisher/FluxBuffer.java | 42 +++++-------- .../core/publisher/FluxBufferTest.java | 63 ------------------- 3 files changed, 18 insertions(+), 100 deletions(-) diff --git a/reactor-core/src/main/java/reactor/core/publisher/Flux.java b/reactor-core/src/main/java/reactor/core/publisher/Flux.java index bd4a762b0e..b1e0455790 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/Flux.java +++ b/reactor-core/src/main/java/reactor/core/publisher/Flux.java @@ -2903,7 +2903,8 @@ public final > Flux buffer(int maxSize, Suppl /** * Collect incoming values into multiple {@link List} buffers that will be emitted * by the returned {@link Flux} each time the given max size is reached or once this - * Flux completes. + * Flux completes. Buffers can be created with gaps, as a new buffer will be created + * every time {@code skip} values have been emitted by the source. *

* When maxSize < skip : dropping buffers *

@@ -2917,13 +2918,6 @@ public final > Flux buffer(int maxSize, Suppl *

* * - *

There are nuances to consider if a supplied buffer may return {@code false} when elements - * are added. For "dropping" buffers, if an item is not added to an in-flight buffer, it is - * discarded, and therefore a buffer may not be created every {@code skip} elements. For - * "overlapping" buffers, an item will be discarded if it cannot be added to any - * in-flight buffer. For "exact" buffers, an item will be discarded if cannot be added to the - * in-flight buffer. - * *

Discard Support: This operator discards elements in between buffers (in the case of * dropping buffers). It also discards the currently open buffer upon cancellation or error triggered by a data signal. * Note however that overlapping buffer variant DOES NOT discard, as this might result in an element @@ -2954,9 +2948,6 @@ public final Flux> buffer(int maxSize, int skip) { *

* When maxSize == skip : exact buffers *

- * Note that if buffers provided by the bufferSupplier return {@literal false} upon invocation - * of {@link Collection#add(Object)} for a given element, that element will be discarded. - *

* * *

Discard Support: This operator discards elements in between buffers (in the case of diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxBuffer.java b/reactor-core/src/main/java/reactor/core/publisher/FluxBuffer.java index 4aca02ec7d..a979f85d3e 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxBuffer.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxBuffer.java @@ -309,11 +309,8 @@ public void onNext(T t) { } if (b != null) { - if (!b.add(t)) { - Operators.onDiscard(t, this.ctx); - s.request(1); - return; - } else if (b.size() == size) { + b.add(t); + if (b.size() == size) { buffer = null; actual.onNext(b); } @@ -483,17 +480,6 @@ public void onNext(T t) { return; } - boolean added = isEmpty(); - for (C b : this) { - added |= b.add(t); - } - - if (!added) { - Operators.onDiscard(t, actual.currentContext()); - s.request(1); - return; - } - long i = index; if (i % skip == 0L) { @@ -510,19 +496,23 @@ public void onNext(T t) { return; } - b.add(t); offer(b); } - for (C b : this) { - if (b.size() == size) { - poll(); - actual.onNext(b); - produced++; - } else { - // Safe to break as soon as we find a buffer that's not yet at size - break; - } + C b = peek(); + + if (b != null && b.size() + 1 == size) { + poll(); + + b.add(t); + + actual.onNext(b); + + produced++; + } + + for (C b0 : this) { + b0.add(t); } index = i + 1; diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTest.java index cf0f15c335..0e02134af1 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTest.java @@ -697,67 +697,4 @@ public void bufferExactSupplierUsesSet(String input, String output, @Nullable St assertions.hasDiscardedExactly((Object[]) discard.split("\\|")); } } - - @ParameterizedTestWithName - @CsvSource({ - "1|2, 1|2, ", - "1|1|1, 1, 1|1", - "1|1|2, 1|2, 1", - "1|2|1, 1|2, ", - "1|2|1|3, 1|2;3, 1", - "1|2|1|1|3, 1|2;1|3, 1", - "1|1|2|3|3, 1|2;3, 1|3", - "2|1|1|3, 2|1;3, 1" - }) - public void bufferSkipWithMax2Skip3SupplierUsesSet(String input, String output, @Nullable String discard) { - List> outputs = Arrays.stream(output.split(";")) - .map(it -> Arrays.stream(it.split("\\|")).collect(Collectors.toSet())) - .collect(Collectors.toList()); - - StepVerifier.Assertions assertions = Flux.just(input.split("\\|")) - .>buffer(2, 3, HashSet::new) - .as(it -> StepVerifier.create(it, outputs.size())) - .expectNextSequence(outputs) - .thenCancel() - .verifyThenAssertThat(Duration.ofSeconds(2)); - - if (discard == null) { - assertions.hasNotDiscardedElements(); - } else { - assertions.hasDiscardedExactly((Object[]) discard.split("\\|")); - } - } - - @ParameterizedTestWithName - @CsvSource({ - "1|2, 1|2, ", - "1|1|1, 1, 1|1", - "1|1|2, 1|2, 1", - "1|2|1, 1|2, 1", - "1|2|1|3, 1|2|3;3, 1", - "1|2|1|3|3|4, 1|2|3|4;3|4, 1|3", - "1|2|3|1|3|4, 1|2|3|4;3|1|4;4, 3", - "3|2|1|2|3|2|3|4, 3|2|1|4;1|2|3|4;3|2|4;4, 3", - "1|1|2|3, 1|2|3;3, 1", - "2|1|1|3, 2|1|3;3, 1", - "1|2|1|2|3, 1|2|3;3, 1|2" - }) - public void bufferOverlapWithMax4Skip2SupplierUsesSet(String input, String output, @Nullable String discard) { - List> outputs = Arrays.stream(output.split(";")) - .map(it -> Arrays.stream(it.split("\\|")).collect(Collectors.toSet())) - .collect(Collectors.toList()); - - StepVerifier.Assertions assertions = Flux.just(input.split("\\|")) - .>buffer(4, 2, HashSet::new) - .as(it -> StepVerifier.create(it, outputs.size())) - .expectNextSequence(outputs) - .expectComplete() - .verifyThenAssertThat(Duration.ofSeconds(2)); - - if (discard == null) { - assertions.hasNotDiscardedElements(); - } else { - assertions.hasDiscardedExactly((Object[]) discard.split("\\|")); - } - } } From aac0e9e793a51546304d79722525eb2985c9d084 Mon Sep 17 00:00:00 2001 From: spierce Date: Tue, 20 Aug 2024 15:57:18 -0500 Subject: [PATCH 11/11] [#3821] PR feedback --- .../java/reactor/core/publisher/Flux.java | 24 +++++-------------- .../core/publisher/FluxBufferTimeoutTest.java | 13 +--------- 2 files changed, 7 insertions(+), 30 deletions(-) diff --git a/reactor-core/src/main/java/reactor/core/publisher/Flux.java b/reactor-core/src/main/java/reactor/core/publisher/Flux.java index b1e0455790..d9c8d3da07 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/Flux.java +++ b/reactor-core/src/main/java/reactor/core/publisher/Flux.java @@ -2882,10 +2882,10 @@ public final Flux> buffer(int maxSize) { * will be emitted by the returned {@link Flux} each time the given max size is reached * or once this Flux completes. *

+ * + *

* Note that if buffers provided by the bufferSupplier return {@literal false} upon invocation * of {@link Collection#add(Object)} for a given element, that element will be discarded. - *

- * * *

Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal, * as well as latest unbuffered element if the bufferSupplier fails. @@ -2950,6 +2950,10 @@ public final Flux> buffer(int maxSize, int skip) { *

* * + *

+ * Note for exact buffers: If buffers provided by the bufferSupplier return {@literal false} upon invocation + * of {@link Collection#add(Object)} for a given element, that element will be discarded. + * *

Discard Support: This operator discards elements in between buffers (in the case of * dropping buffers). It also discards the currently open buffer upon cancellation or error triggered by a data signal. * Note however that overlapping buffer variant DOES NOT discard, as this might result in an element @@ -3126,10 +3130,6 @@ public final Flux> bufferTimeout(int maxSize, Duration maxTime) { * will be emitted by the returned {@link Flux} each time the buffer reaches a maximum * size OR the maxTime {@link Duration} elapses. *

- * Note that if buffers provided by bufferSupplier may return {@literal false} upon invocation - * of {@link Collection#add(Object)}, buffer emission may be triggered when the buffer size is - * less than the specified max size. The element will be discarded in such a case. - *

* * *

Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal. @@ -3170,10 +3170,6 @@ public final Flux> bufferTimeout(int maxSize, Duration maxTime, Schedule * will be emitted by the returned {@link Flux} each time the buffer reaches a maximum * size OR the maxTime {@link Duration} elapses, as measured on the provided {@link Scheduler}. *

- * Note that if buffers provided by bufferSupplier may return {@literal false} upon invocation - * of {@link Collection#add(Object)}, buffer emission may be triggered when the buffer size is - * less than the specified max size. The element will be discarded in such a case. - *

* * *

Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal. @@ -3241,10 +3237,6 @@ public final Flux> bufferTimeout(int maxSize, Duration maxTime, * will be emitted by the returned {@link Flux} each time the buffer reaches a maximum * size OR the maxTime {@link Duration} elapses. *

- * Note that if buffers provided by bufferSupplier may return {@literal false} upon invocation - * of {@link Collection#add(Object)}, buffer emission may be triggered when the buffer size is - * less than the specified max size. The element will be discarded in such a case. - *

* * *

Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal. @@ -3269,10 +3261,6 @@ public final > Flux bufferTimeout(int maxSiz * will be emitted by the returned {@link Flux} each time the buffer reaches a maximum * size OR the maxTime {@link Duration} elapses, as measured on the provided {@link Scheduler}. *

- * Note that if buffers provided by bufferSupplier may return {@literal false} upon invocation - * of {@link Collection#add(Object)}, buffer emission may be triggered when the buffer size is - * less than the specified max size. The element will be discarded in such a case. - *

* * *

Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal. diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTimeoutTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTimeoutTest.java index 5b58ac06f3..e750a5df96 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTimeoutTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTimeoutTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2024 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,7 +18,6 @@ import java.time.Duration; import java.util.ArrayList; -import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -422,14 +421,4 @@ public void discardOnError() { .verifyThenAssertThat() .hasDiscardedExactly(1, 2, 3); } - - @Test - public void bufferSupplierUsesSet() { - Flux.just(1, 1, 1, 1, 1, 1, 1) - .>bufferTimeout(3, Duration.ofSeconds(2), HashSet::new) - .as(it -> StepVerifier.create(it, 3)) - .expectNext(Collections.singleton(1), Collections.singleton(1), Collections.singleton(1)) - .expectComplete() - .verify(Duration.ofSeconds(2)); - } }