From 92d3b10025a4a3598faf951741492675652fac83 Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Tue, 4 Jan 2022 17:26:01 +0100 Subject: [PATCH] Round of deprecation removals and misc cleanups ahead of 1.3.0 --- implementation/revapi.json | 250 ++++++++++++------ .../main/java/io/smallrye/mutiny/Multi.java | 46 ---- .../src/main/java/io/smallrye/mutiny/Uni.java | 13 - .../smallrye/mutiny/groups/MultiOverflow.java | 16 -- .../mutiny/groups/MultiTransform.java | 142 ---------- .../mutiny/groups/UniOnTerminate.java | 2 +- .../mutiny/helpers/UniCallbackSubscriber.java | 4 +- .../mutiny/helpers/test/AssertSubscriber.java | 2 +- .../helpers/test/UniAssertSubscriber.java | 2 +- .../mutiny/operators/AbstractMulti.java | 5 - .../mutiny/operators/AbstractUni.java | 3 +- .../multi/MultiOnCompletionCall.java | 2 +- .../multi/multicast/MultiPublishOp.java | 2 +- .../mutiny/operators/uni/UniRetryAtMost.java | 2 +- .../mutiny/groups/UniMemoizeTest.java | 4 +- .../mutiny/operators/MultiCollectTest.java | 6 +- .../mutiny/operators/MultiDistinctTest.java | 24 +- .../mutiny/operators/MultiGroupTest.java | 3 +- .../mutiny/operators/MultiOnOverflowTest.java | 9 +- .../operators/MultiSelectFirstOrLastTest.java | 18 -- .../MultiSelectWhereAndWhenTest.java | 29 +- .../mutiny/operators/MultiSkipTest.java | 19 -- .../operators/MultiSkipWhereAndWhenTest.java | 5 +- .../mutiny/operators/MultiTakeTest.java | 58 ++-- .../MultiTransformByMergingTest.java | 47 +--- .../UniSubscribeAsCompletionStageTest.java | 3 +- .../mutiny/operators/UniToPublisherTest.java | 3 +- .../operators/multi/MultiToHotStreamTest.java | 2 +- 28 files changed, 241 insertions(+), 480 deletions(-) delete mode 100644 implementation/src/main/java/io/smallrye/mutiny/groups/MultiTransform.java diff --git a/implementation/revapi.json b/implementation/revapi.json index 2eff61a66..9ec032716 100644 --- a/implementation/revapi.json +++ b/implementation/revapi.json @@ -1,79 +1,177 @@ -[ { - "extension" : "revapi.java", - "id" : "java", - "configuration" : { - "missing-classes" : { - "behavior" : "report", - "ignoreMissingAnnotations" : false - }, - "filter" : { - "packages" : { - "regex" : true, - "include" : [ "io\\.smallrye\\.mutiny(\\..+)?" ], - "exclude" : [ "io\\.smallrye\\.mutiny\\.operators(\\..+)?" ] - } - } - } -}, { - "extension" : "revapi.differences", - "id" : "breaking-changes", - "configuration" : { - "criticality" : "highlight", - "minSeverity" : "POTENTIALLY_BREAKING", - "minCriticality" : "documented", - "differences" : [ - { - "ignore": true, - "code": "java.method.numberOfParametersChanged", - "old": "method void io.smallrye.mutiny.groups.UniAwait::(io.smallrye.mutiny.Uni)", - "new": "method void io.smallrye.mutiny.groups.UniAwait::(io.smallrye.mutiny.Uni, io.smallrye.mutiny.Context)", - "justification": "Private API impacted by the new Mutiny context support" - }, - { - "ignore": true, - "code": "java.method.numberOfParametersChanged", - "old": "method void io.smallrye.mutiny.groups.UniAwaitOptional::(io.smallrye.mutiny.Uni)", - "new": "method void io.smallrye.mutiny.groups.UniAwaitOptional::(io.smallrye.mutiny.Uni, io.smallrye.mutiny.Context)", - "justification": "Private API impacted by the new Mutiny context support" - }, - { - "ignore": true, - "code": "java.method.numberOfParametersChanged", - "old": "method void io.smallrye.mutiny.helpers.BlockingIterable::(org.reactivestreams.Publisher, int, java.util.function.Supplier>)", - "new": "method void io.smallrye.mutiny.helpers.BlockingIterable::(io.smallrye.mutiny.Multi, int, java.util.function.Supplier>, java.util.function.Supplier)", - "justification": "Private API impacted by the new Mutiny context support" - }, - { - "ignore": true, - "code": "java.method.numberOfParametersChanged", - "old": "method void io.smallrye.mutiny.helpers.UniCallbackSubscriber::(java.util.function.Consumer, java.util.function.Consumer)", - "new": "method void io.smallrye.mutiny.helpers.UniCallbackSubscriber::(java.util.function.Consumer, java.util.function.Consumer, io.smallrye.mutiny.Context)", - "justification": "Private API impacted by the new Mutiny context support" +[ + { + "extension": "revapi.java", + "id": "java", + "configuration": { + "missing-classes": { + "behavior": "report", + "ignoreMissingAnnotations": false }, - { - "ignore": true, - "code": "java.method.numberOfParametersChanged", - "old": "method void io.smallrye.mutiny.subscription.Subscribers.CallbackBasedSubscriber::(java.util.function.Consumer, java.util.function.Consumer, java.lang.Runnable, java.util.function.Consumer)", - "new": "method void io.smallrye.mutiny.subscription.Subscribers.CallbackBasedSubscriber::(io.smallrye.mutiny.Context, java.util.function.Consumer, java.util.function.Consumer, java.lang.Runnable, java.util.function.Consumer)", - "justification": "Private API impacted by the new Mutiny context support" - }, - { - "ignore": true, - "code": "java.method.numberOfParametersChanged", - "old": "method io.smallrye.mutiny.subscription.CancellableSubscriber io.smallrye.mutiny.subscription.Subscribers::from(java.util.function.Consumer, java.util.function.Consumer, java.lang.Runnable, java.util.function.Consumer)", - "new": "method io.smallrye.mutiny.subscription.CancellableSubscriber io.smallrye.mutiny.subscription.Subscribers::from(io.smallrye.mutiny.Context, java.util.function.Consumer, java.util.function.Consumer, java.lang.Runnable, java.util.function.Consumer)", - "justification": "Private API impacted by the new Mutiny context support" + "filter": { + "packages": { + "regex": true, + "include": [ + "io\\.smallrye\\.mutiny(\\..+)?" + ], + "exclude": [ + "io\\.smallrye\\.mutiny\\.operators(\\..+)?" + ] + } } - ] - } -}, { - "extension" : "revapi.reporter.json", - "configuration" : { - "minSeverity" : "POTENTIALLY_BREAKING", - "minCriticality" : "documented", - "output" : "target/compatibility.json", - "indent" : true, - "append" : false, - "keepEmptyFile" : true + } + }, + { + "extension": "revapi.differences", + "id": "breaking-changes", + "configuration": { + "criticality": "highlight", + "minSeverity": "POTENTIALLY_BREAKING", + "minCriticality": "documented", + "differences": [ + { + "ignore": true, + "code": "java.method.numberOfParametersChanged", + "old": "method void io.smallrye.mutiny.groups.UniAwait::(io.smallrye.mutiny.Uni)", + "new": "method void io.smallrye.mutiny.groups.UniAwait::(io.smallrye.mutiny.Uni, io.smallrye.mutiny.Context)", + "justification": "Private API impacted by the new Mutiny context support" + }, + { + "ignore": true, + "code": "java.method.numberOfParametersChanged", + "old": "method void io.smallrye.mutiny.groups.UniAwaitOptional::(io.smallrye.mutiny.Uni)", + "new": "method void io.smallrye.mutiny.groups.UniAwaitOptional::(io.smallrye.mutiny.Uni, io.smallrye.mutiny.Context)", + "justification": "Private API impacted by the new Mutiny context support" + }, + { + "ignore": true, + "code": "java.method.numberOfParametersChanged", + "old": "method void io.smallrye.mutiny.helpers.BlockingIterable::(org.reactivestreams.Publisher, int, java.util.function.Supplier>)", + "new": "method void io.smallrye.mutiny.helpers.BlockingIterable::(io.smallrye.mutiny.Multi, int, java.util.function.Supplier>, java.util.function.Supplier)", + "justification": "Private API impacted by the new Mutiny context support" + }, + { + "ignore": true, + "code": "java.method.numberOfParametersChanged", + "old": "method void io.smallrye.mutiny.helpers.UniCallbackSubscriber::(java.util.function.Consumer, java.util.function.Consumer)", + "new": "method void io.smallrye.mutiny.helpers.UniCallbackSubscriber::(java.util.function.Consumer, java.util.function.Consumer, io.smallrye.mutiny.Context)", + "justification": "Private API impacted by the new Mutiny context support" + }, + { + "ignore": true, + "code": "java.method.numberOfParametersChanged", + "old": "method void io.smallrye.mutiny.subscription.Subscribers.CallbackBasedSubscriber::(java.util.function.Consumer, java.util.function.Consumer, java.lang.Runnable, java.util.function.Consumer)", + "new": "method void io.smallrye.mutiny.subscription.Subscribers.CallbackBasedSubscriber::(io.smallrye.mutiny.Context, java.util.function.Consumer, java.util.function.Consumer, java.lang.Runnable, java.util.function.Consumer)", + "justification": "Private API impacted by the new Mutiny context support" + }, + { + "ignore": true, + "code": "java.method.numberOfParametersChanged", + "old": "method io.smallrye.mutiny.subscription.CancellableSubscriber io.smallrye.mutiny.subscription.Subscribers::from(java.util.function.Consumer, java.util.function.Consumer, java.lang.Runnable, java.util.function.Consumer)", + "new": "method io.smallrye.mutiny.subscription.CancellableSubscriber io.smallrye.mutiny.subscription.Subscribers::from(io.smallrye.mutiny.Context, java.util.function.Consumer, java.util.function.Consumer, java.lang.Runnable, java.util.function.Consumer)", + "justification": "Private API impacted by the new Mutiny context support" + }, + { + "ignore": true, + "code": "java.method.removed", + "old": "method io.smallrye.mutiny.groups.MultiCollect io.smallrye.mutiny.Multi::collectItems()", + "justification": "Deprecated API removal (~1 year old)" + }, + { + "ignore": true, + "code": "java.method.removed", + "old": "method io.smallrye.mutiny.groups.MultiGroup io.smallrye.mutiny.Multi::groupItems()", + "justification": "Deprecated API removal (~1 year old)" + }, + { + "ignore": true, + "code": "java.method.removed", + "old": "method io.smallrye.mutiny.groups.MultiTransform io.smallrye.mutiny.Multi::transform()", + "justification": "Deprecated API removal (~1 year old)" + }, + { + "ignore": true, + "code": "java.method.removed", + "old": "method io.smallrye.mutiny.Uni io.smallrye.mutiny.Uni::cache()", + "justification": "Deprecated API removal (~1 year old)" + }, + { + "ignore": true, + "code": "java.method.removed", + "old": "method io.smallrye.mutiny.Multi io.smallrye.mutiny.groups.MultiOverflow::drop(java.util.function.Consumer)", + "justification": "Deprecated API removal (~1 year old)" + }, + { + "ignore": true, + "code": "java.class.removed", + "old": "class io.smallrye.mutiny.groups.MultiTransform", + "justification": "Deprecated API removal (~1 year old)" + }, + { + "ignore": true, + "code": "java.method.removed", + "old": "method io.smallrye.mutiny.groups.MultiTransform io.smallrye.mutiny.operators.AbstractMulti::transform() @ io.smallrye.mutiny.helpers.spies.MultiGlobalSpy", + "justification": "Deprecated API removal (~1 year old)" + }, + { + "ignore": true, + "code": "java.method.removed", + "old": "method io.smallrye.mutiny.groups.MultiTransform io.smallrye.mutiny.operators.AbstractMulti::transform() @ io.smallrye.mutiny.helpers.spies.MultiOnCancellationSpy", + "justification": "Deprecated API removal (~1 year old)" + }, + { + "ignore": true, + "code": "java.method.removed", + "old": "method io.smallrye.mutiny.groups.MultiTransform io.smallrye.mutiny.operators.AbstractMulti::transform() @ io.smallrye.mutiny.helpers.spies.MultiOnCompletionSpy", + "justification": "Deprecated API removal (~1 year old)" + }, + { + "ignore": true, + "code": "java.method.removed", + "old": "method io.smallrye.mutiny.groups.MultiTransform io.smallrye.mutiny.operators.AbstractMulti::transform() @ io.smallrye.mutiny.helpers.spies.MultiOnFailureSpy", + "justification": "Deprecated API removal (~1 year old)" + }, + { + "ignore": true, + "code": "java.method.removed", + "old": "method io.smallrye.mutiny.groups.MultiTransform io.smallrye.mutiny.operators.AbstractMulti::transform() @ io.smallrye.mutiny.helpers.spies.MultiOnItemSpy", + "justification": "Deprecated API removal (~1 year old)" + }, + { + "ignore": true, + "code": "java.method.removed", + "old": "method io.smallrye.mutiny.groups.MultiTransform io.smallrye.mutiny.operators.AbstractMulti::transform() @ io.smallrye.mutiny.helpers.spies.MultiOnOverflowSpy", + "justification": "Deprecated API removal (~1 year old)" + }, + { + "ignore": true, + "code": "java.method.removed", + "old": "method io.smallrye.mutiny.groups.MultiTransform io.smallrye.mutiny.operators.AbstractMulti::transform() @ io.smallrye.mutiny.helpers.spies.MultiOnRequestSpy", + "justification": "Deprecated API removal (~1 year old)" + }, + { + "ignore": true, + "code": "java.method.removed", + "old": "method io.smallrye.mutiny.groups.MultiTransform io.smallrye.mutiny.operators.AbstractMulti::transform() @ io.smallrye.mutiny.helpers.spies.MultiOnSubscribeSpy", + "justification": "Deprecated API removal (~1 year old)" + }, + { + "ignore": true, + "code": "java.method.removed", + "old": "method io.smallrye.mutiny.groups.MultiTransform io.smallrye.mutiny.operators.AbstractMulti::transform() @ io.smallrye.mutiny.helpers.spies.MultiOnTerminationSpy", + "justification": "Deprecated API removal (~1 year old)" + } + ] + } + }, + { + "extension": "revapi.reporter.json", + "configuration": { + "minSeverity": "POTENTIALLY_BREAKING", + "minCriticality": "documented", + "output": "target/compatibility.json", + "indent": true, + "append": false, + "keepEmptyFile": true + } } -} ] \ No newline at end of file +] \ No newline at end of file diff --git a/implementation/src/main/java/io/smallrye/mutiny/Multi.java b/implementation/src/main/java/io/smallrye/mutiny/Multi.java index 40cd81c6d..f04e3f6e0 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/Multi.java +++ b/implementation/src/main/java/io/smallrye/mutiny/Multi.java @@ -203,28 +203,6 @@ default O stage(Function, O> stage) { @CheckReturnValue Multi cache(); - /** - * Produces {@link Uni} collecting/aggregating items from this {@link Multi}. - * It allows accumulating the items emitted by this {@code multi} into a structure such as a into a - * {@link java.util.List} ({@link MultiCollect#asList()}), a {@link java.util.Map} - * ({@link MultiCollect#asMap(Function)}, or a custom collector. - * When this {@code multi} sends the completion signal, the structure is emitted by the returned {@link Uni}. - *

- * If this {@link Multi} emits a failure, the produced {@link Uni} produces the same failure and the aggregated items - * are discarded. - *

- * You can also retrieve the first and last items using {@link MultiCollect#first()} and {@link MultiCollect#last()}. - * Be aware to not used method collecting items on unbounded / infinite {@link Multi}. - * - * @return the object to configure the collection process. - * @deprecated Use {@link #collect()} instead - */ - @Deprecated - @CheckReturnValue - default MultiCollect collectItems() { - return collect(); - } - /** * Produces {@link Uni} collecting/aggregating items from this {@link Multi}. * It allows accumulating the items emitted by this {@code multi} into a structure such as a into a @@ -253,20 +231,6 @@ default MultiCollect collectItems() { @CheckReturnValue MultiGroup group(); - /** - * Produces {@link Multi} grouping items from this {@link Multi} into various "form of chunks" (list, {@link Multi}). - * The grouping can be done linearly ({@link MultiGroup#intoLists()} and {@link MultiGroup#intoMultis()}, or based - * on a grouping function ({@link MultiGroup#by(Function)}) - * - * @return the object to configure the grouping. - * @deprecated Use {@link #group()} instead - */ - @Deprecated - @CheckReturnValue - default MultiGroup groupItems() { - return group(); - } - /** * Produces a new {@link Multi} invoking the {@code onItem}, {@code onFailure} and {@code onCompletion} methods * on the supplied {@link Executor}. @@ -301,16 +265,6 @@ default MultiGroup groupItems() { @CheckReturnValue MultiOnCompletion onCompletion(); - /** - * Transforms the streams by skipping, selecting, or merging. - * - * @return the object to configure the transformation. - * @deprecated Use {@link #select()} and {@link #skip()}instead - */ - @Deprecated - @CheckReturnValue - MultiTransform transform(); - /** * Selects items from this {@link Multi}. * diff --git a/implementation/src/main/java/io/smallrye/mutiny/Uni.java b/implementation/src/main/java/io/smallrye/mutiny/Uni.java index cd7388fe5..1da0693a6 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/Uni.java +++ b/implementation/src/main/java/io/smallrye/mutiny/Uni.java @@ -371,19 +371,6 @@ default UniAwait awaitUsing(Context context) { @CheckReturnValue UniMemoize memoize(); - /** - * Caches the events (item or failure) of this {@link Uni} and replays it for all further {@link UniSubscriber}. - * - * @return the new {@link Uni}. Unlike regular {@link Uni}, re-subscribing to this {@link Uni} does not re-compute - * the outcome but replayed the cached events. - * @deprecated Use {@link UniMemoize#indefinitely()} instead - */ - @Deprecated - @CheckReturnValue - default Uni cache() { - return memoize().indefinitely(); - } - /** * Transforms the item (potentially null) emitted by this {@link Uni} by applying a (synchronous) function to it. * This method is equivalent to {@code uni.onItem().transform(x -> ...)} diff --git a/implementation/src/main/java/io/smallrye/mutiny/groups/MultiOverflow.java b/implementation/src/main/java/io/smallrye/mutiny/groups/MultiOverflow.java index 3e46f16c9..c895c5ed1 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/groups/MultiOverflow.java +++ b/implementation/src/main/java/io/smallrye/mutiny/groups/MultiOverflow.java @@ -53,22 +53,6 @@ public Multi drop() { return new MultiOverflowStrategy<>(upstream, null, null).drop(); } - /** - * When the downstream cannot keep up with the upstream emissions, instruct to drop the item. - * - * @param callback a callback invoked when an item is dropped. The callback receives the item. Must not be - * {@code null} - * @return the new multi - * @deprecated Use {@link Multi#invoke(Consumer)} and {@link MultiOverflowStrategy#drop()} as in - * {@code multi.onOverflow().invoke(consumer).drop()}. - */ - @Deprecated - @CheckReturnValue - public Multi drop(Consumer callback) { - Consumer actual = Infrastructure.decorate(nonNull(callback, "callback")); - return new MultiOverflowStrategy<>(upstream, actual, null).drop(); - } - /** * When the downstream cannot keep up with the upstream emissions, instruct to drop all previously buffered items. * diff --git a/implementation/src/main/java/io/smallrye/mutiny/groups/MultiTransform.java b/implementation/src/main/java/io/smallrye/mutiny/groups/MultiTransform.java deleted file mode 100644 index c7736d824..000000000 --- a/implementation/src/main/java/io/smallrye/mutiny/groups/MultiTransform.java +++ /dev/null @@ -1,142 +0,0 @@ -package io.smallrye.mutiny.groups; - -import static io.smallrye.mutiny.helpers.ParameterValidation.nonNull; - -import java.time.Duration; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.function.Function; -import java.util.function.Predicate; - -import org.reactivestreams.Publisher; - -import io.smallrye.common.annotation.CheckReturnValue; -import io.smallrye.mutiny.Multi; -import io.smallrye.mutiny.Uni; -import io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor; - -public class MultiTransform { - - private final Multi upstream; - - public MultiTransform(Multi upstream) { - this.upstream = upstream; - } - - @CheckReturnValue - public Multi bySkippingFirstItems(long number) { - return upstream.skip().first(number); - } - - @CheckReturnValue - public Multi bySkippingLastItems(int number) { - return upstream.skip().last(number); - } - - @CheckReturnValue - public Multi bySkippingItemsWhile(Predicate predicate) { - return upstream.skip().first(predicate); - } - - @CheckReturnValue - public Multi bySkippingItemsFor(Duration duration) { - return upstream.skip().first(duration); - } - - @CheckReturnValue - public Multi byTakingFirstItems(long number) { - return upstream.select().first(number); - } - - @CheckReturnValue - public Multi byTakingLastItems(int number) { - return upstream.select().last(number); - } - - @CheckReturnValue - public Multi byTakingItemsFor(Duration duration) { - return upstream.select().first(duration); - } - - @CheckReturnValue - public Multi byTakingItemsWhile(Predicate predicate) { - return upstream.select().first(predicate); - } - - @CheckReturnValue - public Multi byDroppingDuplicates() { - return upstream.select().distinct(); - } - - @CheckReturnValue - public Multi byDroppingRepetitions() { - return upstream.skip().repetitions(); - } - - @SafeVarargs - @CheckReturnValue - public final Multi byMergingWith(Publisher... publishers) { - List> list = new ArrayList<>(); - list.add(upstream); - list.addAll(Arrays.asList(nonNull(publishers, "publishers"))); - return Multi.createBy().merging().streams(list); - } - - @CheckReturnValue - public Multi byMergingWith(Iterable> iterable) { - List> list = new ArrayList<>(); - list.add(upstream); - nonNull(iterable, "iterable").forEach(list::add); - return Multi.createBy().merging().streams(list); - } - - /** - * Produces a {@link Multi} containing the items from this {@link Multi} passing the {@code predicate} test. - * - * @param predicate the predicate, must not be {@code null} - * @return the produced {@link Multi} - */ - @CheckReturnValue - public Multi byFilteringItemsWith(Predicate predicate) { - return upstream.select().where(predicate); - } - - /** - * Produces a {@link Multi} containing the items from this {@link Multi} passing the {@code predicate} - * asynchronous test. Unlike {@link #byFilteringItemsWith(Predicate)}, the test is asynchronous. Note that this method - * preserves ordering of the items, even if the test is asynchronous. - * - * @param predicate the predicate, must not be {@code null}, must not produce {@code null} - * @return the produced {@link Multi} - */ - @CheckReturnValue - public Multi byTestingItemsWith(Function> predicate) { - // Decoration happens in `when` - return upstream.select().when(predicate); - } - - /** - * Produces a new {@link Multi} transforming the upstream into a hot stream. - * With a hot stream, when no subscribers are present, emitted items are dropped. - * Late subscribers would only receive items emitted after their subscription. - * If the upstream has already been terminated, the termination event (failure or completion) is forwarded to the - * subscribers. - *

- * Note that this operator consumes the upstream stream without back-pressure. - * It still enforces downstream back-pressure. - * If the subscriber is not ready to receive an item when the upstream emits an item, the subscriber gets a - * {@link io.smallrye.mutiny.subscription.BackPressureFailure} failure. - * - * @return the new multi. - * @deprecated Use {@link Multi#toHotStream()} instead - */ - @Deprecated - @CheckReturnValue - public Multi toHotStream() { - BroadcastProcessor processor = BroadcastProcessor.create(); - upstream.subscribe(processor); - return processor; - } - -} diff --git a/implementation/src/main/java/io/smallrye/mutiny/groups/UniOnTerminate.java b/implementation/src/main/java/io/smallrye/mutiny/groups/UniOnTerminate.java index 32643c562..71b6bb323 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/groups/UniOnTerminate.java +++ b/implementation/src/main/java/io/smallrye/mutiny/groups/UniOnTerminate.java @@ -72,7 +72,7 @@ public Uni call(Functions.Function3> ma /** * Attaches an action that is executed when the {@link Uni} emits an item or a failure or when the subscriber - * cancels the subscription. Unlike {@link #call(Supplier)} (Functions.Function3)} the supplier does not receive the + * cancels the subscription. Unlike {@link #call(Functions.Function3)} the supplier does not receive the * item, failure or cancellation. * * @param supplier must return a non-{@code null} {@link Uni}. diff --git a/implementation/src/main/java/io/smallrye/mutiny/helpers/UniCallbackSubscriber.java b/implementation/src/main/java/io/smallrye/mutiny/helpers/UniCallbackSubscriber.java index 4ccade449..f892ef85e 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/helpers/UniCallbackSubscriber.java +++ b/implementation/src/main/java/io/smallrye/mutiny/helpers/UniCallbackSubscriber.java @@ -36,14 +36,14 @@ public class UniCallbackSubscriber implements UniSubscriber, UniSubscripti * * @param onResultCallback callback invoked on item event, must not be {@code null} * @param onFailureCallback callback invoked on failure event, must not be {@code null} - * @param context + * @param context the subscriber context, must not be {@code null} */ public UniCallbackSubscriber(Consumer onResultCallback, Consumer onFailureCallback, Context context) { this.onResultCallback = nonNull(onResultCallback, "onResultCallback"); this.onFailureCallback = nonNull(onFailureCallback, "onFailureCallback"); - this.context = context; + this.context = nonNull(context, "context"); } @Override diff --git a/implementation/src/main/java/io/smallrye/mutiny/helpers/test/AssertSubscriber.java b/implementation/src/main/java/io/smallrye/mutiny/helpers/test/AssertSubscriber.java index e16824732..367ebef36 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/helpers/test/AssertSubscriber.java +++ b/implementation/src/main/java/io/smallrye/mutiny/helpers/test/AssertSubscriber.java @@ -675,7 +675,7 @@ private void awaitItemEvents(int expected, Duration duration) { "Expected " + expected + " items, but received a failure event while waiting: " + getFailure() + ". Only " + items.size() + " items have been received."); } else { - System.out.println(e); + e.printStackTrace(); throw new AssertionError( "Expected " + expected + " items. Only " + items.size() + " items have been received."); } diff --git a/implementation/src/main/java/io/smallrye/mutiny/helpers/test/UniAssertSubscriber.java b/implementation/src/main/java/io/smallrye/mutiny/helpers/test/UniAssertSubscriber.java index f43496d32..ed9511b0c 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/helpers/test/UniAssertSubscriber.java +++ b/implementation/src/main/java/io/smallrye/mutiny/helpers/test/UniAssertSubscriber.java @@ -121,7 +121,7 @@ public synchronized void onSubscribe(UniSubscription subscription) { @Override public synchronized void onItem(T item) { - signals.add(new OnItemUniSignal(item)); + signals.add(new OnItemUniSignal<>(item)); this.completion.complete(item); } diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/AbstractMulti.java b/implementation/src/main/java/io/smallrye/mutiny/operators/AbstractMulti.java index a2fd5c824..ed68be7df 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/AbstractMulti.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/AbstractMulti.java @@ -94,11 +94,6 @@ public MultiOnCompletion onCompletion() { return new MultiOnCompletion<>(this); } - @Override - public MultiTransform transform() { - return new MultiTransform<>(this); - } - @Override public MultiSelect select() { return new MultiSelect<>(this); diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/AbstractUni.java b/implementation/src/main/java/io/smallrye/mutiny/operators/AbstractUni.java index 52fa97c1b..5839543e1 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/AbstractUni.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/AbstractUni.java @@ -108,10 +108,9 @@ public Uni runSubscriptionOn(Executor executor) { @Override public UniMemoize memoize() { - return new UniMemoize(this); + return new UniMemoize<>(this); } - @Override public Uni cache() { return Infrastructure.onUniCreation(new UniMemoizeOp<>(this)); } diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiOnCompletionCall.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiOnCompletionCall.java index 9c8f7573c..78cd06789 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiOnCompletionCall.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiOnCompletionCall.java @@ -37,7 +37,7 @@ public MultiOnCompletionCallProcessor(MultiSubscriber downstream) { public void onCompletion() { cancellable = execute().subscribe().with( ignored -> super.onCompletion(), - err -> super.onFailure(err)); + super::onFailure); } @Override diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/multicast/MultiPublishOp.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/multicast/MultiPublishOp.java index e29a40044..ea7ea1f10 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/multicast/MultiPublishOp.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/multicast/MultiPublishOp.java @@ -82,7 +82,7 @@ public void connect(ConnectableMultiConnection connection) { } else { context = Context.empty(); } - PublishSubscriber u = new PublishSubscriber(current, bufferSize, context); + PublishSubscriber u = new PublishSubscriber<>(current, bufferSize, context); // try setting it as the current subscriber-to-source if (!current.compareAndSet(ps, u)) { // did not work, perhaps a new subscriber arrived diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniRetryAtMost.java b/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniRetryAtMost.java index cf3a567fb..1b101beea 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniRetryAtMost.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniRetryAtMost.java @@ -25,7 +25,7 @@ public UniRetryAtMost(Uni upstream, Predicate predicate, l @Override public void subscribe(UniSubscriber subscriber) { - AbstractUni.subscribe(upstream(), new UniRetryAtMostProcessor(this, subscriber)); + AbstractUni.subscribe(upstream(), new UniRetryAtMostProcessor<>(this, subscriber)); } private static class UniRetryAtMostProcessor extends UniOperatorProcessor { diff --git a/implementation/src/test/java/io/smallrye/mutiny/groups/UniMemoizeTest.java b/implementation/src/test/java/io/smallrye/mutiny/groups/UniMemoizeTest.java index 80634679d..d5e2c0e65 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/groups/UniMemoizeTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/groups/UniMemoizeTest.java @@ -129,7 +129,7 @@ void testAtLeastNegative() { @DisplayName("Test the deprecated uni.cache() method") void testDeprecatedUniCache() { AtomicInteger counter = new AtomicInteger(); - Uni cache = Uni.createFrom().item(counter.incrementAndGet()).cache(); + Uni cache = Uni.createFrom().item(counter.incrementAndGet()).memoize().indefinitely(); UniAssertSubscriber sub1 = UniAssertSubscriber.create(); UniAssertSubscriber sub2 = UniAssertSubscriber.create(); @@ -280,7 +280,7 @@ void assertCachingTheValueEmittedByAProcessor() { @DisplayName("Test that uni.cache() caches values emitted by a processor") void assertCachingTheValueEmittedByAProcessorUsingDeprecatedUniCache() { UnicastProcessor processor = UnicastProcessor.create(); - Uni cached = Uni.createFrom().publisher(processor).cache(); + Uni cached = Uni.createFrom().publisher(processor).memoize().indefinitely(); UniAssertSubscriber sub1 = new UniAssertSubscriber<>(); UniAssertSubscriber sub2 = new UniAssertSubscriber<>(); diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiCollectTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiCollectTest.java index b001b8d2c..1c66cbd5a 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiCollectTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiCollectTest.java @@ -54,14 +54,12 @@ public void testCollectFirstAndLast() { @Test public void testCollectFirstAndLastDeprecated() { Multi items = Multi.createFrom().items(1, 2, 3); - items - .collectItems().first() + items.collect().first() .subscribe().withSubscriber(UniAssertSubscriber.create()) .awaitItem() .assertItem(1); - items - .collectItems().last() + items.collect().last() .subscribe().withSubscriber(UniAssertSubscriber.create()) .awaitItem() .assertItem(3); diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiDistinctTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiDistinctTest.java index a91b29f0b..cb24a36ab 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiDistinctTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiDistinctTest.java @@ -76,16 +76,6 @@ public void testDistinctWithComparatorReturningAlways1() { .assertItems(1, 2, 3, 4, 2, 4, 2, 4); } - @SuppressWarnings("deprecation") - @Test - public void testDistinctDeprecated() { - Multi.createFrom().items(1, 2, 3, 4, 2, 4, 2, 4) - .transform().byDroppingDuplicates() - .subscribe().withSubscriber(AssertSubscriber.create(10)) - .assertCompleted() - .assertItems(1, 2, 3, 4); - } - @Test public void testDistinctWithUpstreamFailure() { Multi.createFrom(). failure(new IOException("boom")) @@ -102,15 +92,6 @@ public void testDistinctWithComparatorWithUpstreamFailure() { .assertFailedWith(IOException.class, "boom"); } - @SuppressWarnings("deprecation") - @Test - public void testDistinctWithUpstreamFailureDeprecated() { - Multi.createFrom(). failure(new IOException("boom")) - .transform().byDroppingDuplicates() - .subscribe().withSubscriber(AssertSubscriber.create(10)) - .assertFailedWith(IOException.class, "boom"); - } - @SuppressWarnings("ConstantConditions") @Test public void testThatNullSubscriberAreRejectedDistinct() { @@ -181,11 +162,10 @@ public void testSkipRepetitionsWithComparatorAlwaysReturning1() { .assertItems(1, 2, 3, 4, 4, 2, 2, 4, 1, 1, 2, 4); } - @SuppressWarnings("deprecation") @Test - public void testDroppedRepetitionsDeprecated() { + public void testDroppedRepetitions() { Multi.createFrom().items(1, 2, 3, 4, 4, 2, 2, 4, 1, 1, 2, 4) - .transform().byDroppingRepetitions() + .skip().repetitions() .subscribe().withSubscriber(AssertSubscriber.create(10)) .assertCompleted() .assertItems(1, 2, 3, 4, 2, 4, 1, 2, 4); diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiGroupTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiGroupTest.java index aa6ab13bb..39a6a343b 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiGroupTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiGroupTest.java @@ -76,8 +76,7 @@ public void testGroupIntoListsOfTwoElements() { @SuppressWarnings("deprecation") @Test public void testGroupIntoListsOfTwoElementsDeprecated() { - AssertSubscriber> subscriber = Multi.createFrom().range(1, 10) - .groupItems().intoLists().of(2) + AssertSubscriber> subscriber = Multi.createFrom().range(1, 10).group().intoLists().of(2) .subscribe().withSubscriber(AssertSubscriber.create(100)); subscriber.assertCompleted(); assertThat(subscriber.getItems()).containsExactly( diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiOnOverflowTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiOnOverflowTest.java index eca8b213a..346a32a3a 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiOnOverflowTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiOnOverflowTest.java @@ -36,7 +36,8 @@ public class MultiOnOverflowTest { @Test public void testThatDropCallbackCannotBeNull() { - assertThrows(IllegalArgumentException.class, () -> Multi.createFrom().item(1).onOverflow().drop(null)); + assertThrows(IllegalArgumentException.class, + () -> Multi.createFrom().item(1).onOverflow().invoke((Consumer) null).drop()); } @Test @@ -86,7 +87,7 @@ public void testDropStrategyWithEmitter() { AtomicReference> emitter = new AtomicReference<>(); List list = new CopyOnWriteArrayList<>(); Multi multi = Multi.createFrom().emitter((Consumer>) emitter::set) - .onOverflow().drop(list::add); + .onOverflow().invoke(list::add).drop(); multi.subscribe(sub); emitter.get().emit(1); sub.request(2); @@ -119,9 +120,9 @@ public void testDropStrategyWithEmitterWithoutCallback() { @Test public void testDropStrategyWithCallbackThrowingAnException() { Multi.createFrom().items(2, 3, 4) - .onOverflow().drop(i -> { + .onOverflow().invoke(i -> { throw new IllegalStateException("boom"); - }) + }).drop() .subscribe().withSubscriber(AssertSubscriber.create(0)) .assertFailedWith(IllegalStateException.class, "boom"); diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiSelectFirstOrLastTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiSelectFirstOrLastTest.java index 5e4df9ed6..7af302289 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiSelectFirstOrLastTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiSelectFirstOrLastTest.java @@ -61,15 +61,6 @@ public void testSelectFirst() { assertThat(counter).hasValue(1); } - @SuppressWarnings("deprecation") - @Test - public void testSelectFirstDeprecated() { - List list = Multi.createFrom().range(1, 5).transform().byTakingFirstItems(2) - .collect().asList().await().indefinitely(); - - assertThat(list).containsExactly(1, 2); - } - @Test public void testSelectFirst0() { List list = Multi.createFrom().range(1, 5).select().first(0) @@ -97,15 +88,6 @@ public void testSelectLast() { assertThat(list).containsExactly(4); } - @SuppressWarnings("deprecation") - @Test - public void testSelectLastDeprecated() { - List list = Multi.createFrom().range(1, 5).transform().byTakingLastItems(2) - .collect().asList().await().indefinitely(); - - assertThat(list).containsExactly(3, 4); - } - @Test public void testSelectLastWith0() { List list = Multi.createFrom().range(1, 5) diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiSelectWhereAndWhenTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiSelectWhereAndWhenTest.java index 0c0a7a2af..496a076ff 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiSelectWhereAndWhenTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiSelectWhereAndWhenTest.java @@ -65,8 +65,7 @@ public void cannotRequestZeroItems() { public void testFilteringWithPredicate() { Predicate test = x -> x % 2 != 0; assertThat(Multi.createFrom().range(1, 4) - .select().where(test) - .collectItems().asList() + .select().where(test).collect().asList() .await().indefinitely()).containsExactly(1, 3); } @@ -76,8 +75,7 @@ public void testFilteringWithPredicateAndLimit() { MultiOnCancellationSpy spy = Spy .onCancellation(Multi.createFrom().range(1, 10)); assertThat(spy - .select().where(test, 2) - .collectItems().asList() + .select().where(test, 2).collect().asList() .await().indefinitely()).containsExactly(1, 3); assertThat(spy.isCancelled()).isTrue(); @@ -87,36 +85,15 @@ public void testFilteringWithPredicateAndLimit() { public void testFilteringWithPredicateAndZeroAsLimit() { Predicate test = x -> x % 2 != 0; assertThat(Multi.createFrom().range(1, 10) - .select().where(test, 0) - .collectItems().asList() + .select().where(test, 0).collect().asList() .await().indefinitely()).isEmpty(); } - @Test - @SuppressWarnings("deprecation") - public void testFilteringWithPredicateDeprecated() { - Predicate test = x -> x % 2 != 0; - assertThat(Multi.createFrom().range(1, 4) - .transform().byFilteringItemsWith(test) - .collect().asList() - .await().indefinitely()).containsExactly(1, 3); - } - @Test public void testFilteringWithUni() { assertThat(Multi.createFrom().range(1, 4) .select().when( x -> Uni.createFrom().completionStage(() -> CompletableFuture.supplyAsync(() -> x % 2 != 0))) - .collectItems().asList() - .await().indefinitely()).containsExactly(1, 3); - } - - @SuppressWarnings("deprecation") - @Test - public void testFilteringWithUniDeprecated() { - assertThat(Multi.createFrom().range(1, 4) - .transform().byTestingItemsWith( - x -> Uni.createFrom().completionStage(() -> CompletableFuture.supplyAsync(() -> x % 2 != 0))) .collect().asList() .await().indefinitely()).containsExactly(1, 3); } diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiSkipTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiSkipTest.java index ceb140d66..0531a73e8 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiSkipTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiSkipTest.java @@ -51,15 +51,6 @@ public void testSkipFirst() { assertThat(list).containsExactly(2, 3, 4); } - @SuppressWarnings("deprecation") - @Test - public void testSimpleSkipDeprecated() { - List list = Multi.createFrom().range(1, 5).transform().bySkippingFirstItems(1) - .collect().asList().await().indefinitely(); - - assertThat(list).containsExactly(2, 3, 4); - } - @Test public void testSkipFirstZero() { List list = Multi.createFrom().range(1, 5) @@ -78,16 +69,6 @@ public void testSimpleSkipLast() { assertThat(list).containsExactly(1, 2, 3); } - @SuppressWarnings("deprecation") - @Test - public void testSimpleSkipLastDeprecated() { - List list = Multi.createFrom().range(1, 5) - .transform().bySkippingLastItems(1) - .collectItems().asList().await().indefinitely(); - - assertThat(list).containsExactly(1, 2, 3); - } - @Test public void testSkipLast() { List list = Multi.createFrom().range(1, 5) diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiSkipWhereAndWhenTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiSkipWhereAndWhenTest.java index ce0fa5d6b..82bba77c8 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiSkipWhereAndWhenTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiSkipWhereAndWhenTest.java @@ -41,8 +41,7 @@ public void cannotRequestZeroItems() { public void testFilteringWithPredicate() { Predicate test = x -> x % 2 != 0; assertThat(Multi.createFrom().range(1, 4) - .skip().where(test) - .collectItems().asList() + .skip().where(test).collect().asList() .await().indefinitely()).containsExactly(2); } @@ -51,7 +50,7 @@ public void testFilteringWithUni() { assertThat(Multi.createFrom().range(1, 4) .skip().when( x -> Uni.createFrom().completionStage(() -> CompletableFuture.supplyAsync(() -> x % 2 != 0))) - .collectItems().asList() + .collect().asList() .await().indefinitely()).containsExactly(2); } diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiTakeTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiTakeTest.java index 35a12d835..10bab67f6 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiTakeTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiTakeTest.java @@ -27,7 +27,7 @@ public class MultiTakeTest { @Test public void testSimpleTake() { - List list = Multi.createFrom().range(1, 5).transform().byTakingFirstItems(1) + List list = Multi.createFrom().range(1, 5).select().first(1) .collect().asList().await().indefinitely(); assertThat(list).containsExactly(1); @@ -35,7 +35,7 @@ public void testSimpleTake() { @Test public void testTakeZero() { - List list = Multi.createFrom().range(1, 5).transform().byTakingFirstItems(0) + List list = Multi.createFrom().range(1, 5).select().first(0) .collect().asList().await().indefinitely(); assertThat(list).isEmpty(); @@ -43,7 +43,7 @@ public void testTakeZero() { @Test public void testSimpleTakeLast() { - List list = Multi.createFrom().range(1, 5).transform().byTakingLastItems(1) + List list = Multi.createFrom().range(1, 5).select().last(1) .collect().asList().await().indefinitely(); assertThat(list).containsExactly(4); @@ -51,7 +51,7 @@ public void testSimpleTakeLast() { @Test public void testSimpleTakeZeroLast() { - List list = Multi.createFrom().range(1, 5).transform().byTakingLastItems(0) + List list = Multi.createFrom().range(1, 5).select().last(0) .collect().asList().await().indefinitely(); assertThat(list).isEmpty(); @@ -59,7 +59,7 @@ public void testSimpleTakeZeroLast() { @Test public void testTakeOnUpstreamFailure() { - Multi.createFrom(). failure(new IOException("boom")).transform().byTakingFirstItems(1) + Multi.createFrom(). failure(new IOException("boom")).select().first(1) .subscribe().withSubscriber(AssertSubscriber.create(10)) .assertFailedWith(IOException.class, "boom") .assertHasNotReceivedAnyItem(); @@ -67,7 +67,7 @@ public void testTakeOnUpstreamFailure() { @Test public void testTakeLastOnUpstreamFailure() { - Multi.createFrom(). failure(new IOException("boom")).transform().byTakingLastItems(1) + Multi.createFrom(). failure(new IOException("boom")).select().last(1) .subscribe().withSubscriber(AssertSubscriber.create(10)) .assertFailedWith(IOException.class, "boom") .assertHasNotReceivedAnyItem(); @@ -75,7 +75,7 @@ public void testTakeLastOnUpstreamFailure() { @Test public void testTakeAll() { - Multi.createFrom().range(1, 5).transform().byTakingFirstItems(4) + Multi.createFrom().range(1, 5).select().first(4) .subscribe().withSubscriber(AssertSubscriber.create(10)) .assertCompleted() .assertItems(1, 2, 3, 4); @@ -83,7 +83,7 @@ public void testTakeAll() { @Test public void testTakeLastAll() { - Multi.createFrom().range(1, 5).transform().byTakingLastItems(4) + Multi.createFrom().range(1, 5).select().last(4) .subscribe().withSubscriber(AssertSubscriber.create(10)) .assertCompleted() .assertItems(1, 2, 3, 4); @@ -92,10 +92,10 @@ public void testTakeLastAll() { @Test public void testInvalidTakeNumber() { assertThatExceptionOfType(IllegalArgumentException.class) - .isThrownBy(() -> Multi.createFrom().items(1, 2, 3).transform().byTakingFirstItems(-1)); + .isThrownBy(() -> Multi.createFrom().items(1, 2, 3).select().first(-1)); assertThatExceptionOfType(IllegalArgumentException.class) - .isThrownBy(() -> Multi.createFrom().items(1, 2, 3).transform().byTakingLastItems(-1)); + .isThrownBy(() -> Multi.createFrom().items(1, 2, 3).select().last(-1)); } @Test @@ -104,7 +104,7 @@ public void testTakeLastWithBackPressure() { AtomicReference> emitter = new AtomicReference<>(); Multi.createFrom(). emitter(emitter::set) - .transform().byTakingLastItems(3) + .select().last(3) .subscribe(subscriber); subscriber.assertNotTerminated() @@ -134,7 +134,7 @@ public void testTakeSomeLastItems() { AssertSubscriber subscriber = AssertSubscriber.create(Long.MAX_VALUE); Multi.createFrom().range(1, 11) - .transform().byTakingLastItems(3) + .select().last(3) .subscribe(subscriber); subscriber.assertCompleted() @@ -143,7 +143,7 @@ public void testTakeSomeLastItems() { @Test public void testTakeWhileWithMethodThrowingException() { - Multi.createFrom().range(1, 10).transform().byTakingItemsWhile(i -> { + Multi.createFrom().range(1, 10).select().where(i -> { throw new IllegalStateException("boom"); }).subscribe().withSubscriber(AssertSubscriber.create(10)) .assertFailedWith(IllegalStateException.class, "boom"); @@ -152,7 +152,7 @@ public void testTakeWhileWithMethodThrowingException() { @Test public void testTakeWhileWithUpstreamFailure() { Multi.createFrom(). failure(new IOException("boom")) - .transform().byTakingItemsWhile(i -> i < 5) + .select().where(i -> i < 5) .subscribe().withSubscriber(AssertSubscriber.create(10)) .assertFailedWith(IOException.class, "boom"); } @@ -160,12 +160,12 @@ public void testTakeWhileWithUpstreamFailure() { @Test public void testTakeWhileWithNullMethod() { assertThrows(IllegalArgumentException.class, - () -> Multi.createFrom().nothing().transform().byTakingItemsWhile(null)); + () -> Multi.createFrom().nothing().select().where(null)); } @Test public void testTakeWhile() { - Multi.createFrom().range(1, 10).transform().byTakingItemsWhile(i -> i < 5) + Multi.createFrom().range(1, 10).select().where(i -> i < 5) .subscribe().withSubscriber(AssertSubscriber.create(10)) .assertCompleted() .assertItems(1, 2, 3, 4); @@ -173,7 +173,7 @@ public void testTakeWhile() { @Test public void testTakeWhileNone() { - Multi.createFrom().items(1, 2, 3, 4).transform().byTakingItemsWhile(i -> false) + Multi.createFrom().items(1, 2, 3, 4).select().where(i -> false) .subscribe().withSubscriber(AssertSubscriber.create(10)) .assertCompleted() .assertHasNotReceivedAnyItem(); @@ -181,7 +181,7 @@ public void testTakeWhileNone() { @Test public void testTakeWhileAll() { - Multi.createFrom().items(1, 2, 3, 4).transform().byTakingItemsWhile(i -> true) + Multi.createFrom().items(1, 2, 3, 4).select().where(i -> true) .subscribe().withSubscriber(AssertSubscriber.create(10)) .assertCompleted() .assertItems(1, 2, 3, 4); @@ -189,8 +189,8 @@ public void testTakeWhileAll() { @Test public void testTakeWhileSomeWithBackPressure() { - AssertSubscriber subscriber = Multi.createFrom().items(1, 2, 3, 4).transform() - .byTakingItemsWhile(i -> i < 3) + AssertSubscriber subscriber = Multi.createFrom().items(1, 2, 3, 4).select() + .where(i -> i < 3) .subscribe().withSubscriber(AssertSubscriber.create(0)); subscriber.assertNotTerminated() @@ -210,7 +210,7 @@ public void testTakeWhileSomeWithBackPressure() { @Test public void testLimitingInfiniteStream() { Multi.createFrom().ticks().every(Duration.ofMillis(2)) - .transform().byTakingFirstItems(5) + .select().first(5) .subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE)) .awaitCompletion() .assertItems(0L, 1L, 2L, 3L, 4L); @@ -218,8 +218,8 @@ public void testLimitingInfiniteStream() { @Test public void testTakeByTime() { - AssertSubscriber subscriber = Multi.createFrom().range(1, 100).transform() - .byTakingItemsFor(Duration.ofMillis(1000)) + AssertSubscriber subscriber = Multi.createFrom().range(1, 100).select() + .first(Duration.ofMillis(1000)) .subscribe().withSubscriber(AssertSubscriber.create(10)) .awaitCompletion(); @@ -233,7 +233,7 @@ public void testTakeByTimeWithFailure() { Multi.createFrom().failure(new TestException("boom")), Multi.createFrom().range(5, 10)); AssertSubscriber subscriber = multi - .transform().byTakingItemsFor(Duration.ofMillis(1000)) + .select().first(Duration.ofMillis(1000)) .subscribe().withSubscriber(AssertSubscriber.create(100)) .awaitFailure() .assertFailedWith(TestException.class, "boom"); @@ -247,7 +247,7 @@ public void testTakeByTimeWithCancellation() { Multi.createFrom().range(1, 5), Multi.createFrom().range(5, 10)); AssertSubscriber subscriber = multi - .transform().byTakingItemsFor(Duration.ofMillis(1000)) + .select().first(Duration.ofMillis(1000)) .subscribe().withSubscriber(AssertSubscriber.create(4)); await().until(() -> subscriber.getItems().size() == 4); @@ -262,7 +262,7 @@ public void testTakeByTimeWithImmediateCancellation() { Multi.createFrom().range(1, 5), Multi.createFrom().range(5, 10)); AssertSubscriber subscriber = multi - .transform().byTakingItemsFor(Duration.ofMillis(1000)) + .select().first(Duration.ofMillis(1000)) .subscribe().withSubscriber(new AssertSubscriber<>(4, true)); subscriber.assertSubscribed() @@ -283,7 +283,7 @@ public void subscribe(MultiSubscriber subscriber) { } }; AssertSubscriber subscriber = rogue - .transform().byTakingItemsFor(Duration.ofMillis(1000)) + .select().first(Duration.ofMillis(1000)) .subscribe().withSubscriber(AssertSubscriber.create(100)) .awaitCompletion(); @@ -303,7 +303,7 @@ public void subscribe(MultiSubscriber subscriber) { } }; AssertSubscriber subscriber = rogue - .transform().byTakingItemsFor(Duration.ofMillis(1000)) + .select().first(Duration.ofMillis(1000)) .subscribe().withSubscriber(AssertSubscriber.create(100)) .awaitFailure() .assertFailedWith(IOException.class, "boom"); @@ -314,7 +314,7 @@ public void subscribe(MultiSubscriber subscriber) { @Test public void testSkipByTimeWithInvalidDuration() { assertThrows(IllegalArgumentException.class, - () -> Multi.createFrom().item(1).transform().byTakingItemsFor(Duration.ofMillis(-1))); + () -> Multi.createFrom().item(1).select().first(Duration.ofMillis(-1))); } } diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiTransformByMergingTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiTransformByMergingTest.java index 657dcd4b4..630f26039 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiTransformByMergingTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiTransformByMergingTest.java @@ -3,18 +3,11 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertThrows; -import java.util.Arrays; -import java.util.Collections; import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; -import org.reactivestreams.Publisher; import io.smallrye.mutiny.Multi; -import io.smallrye.mutiny.helpers.test.AssertSubscriber; public class MultiTransformByMergingTest { @@ -23,7 +16,7 @@ public void testMerging() { Multi m1 = Multi.createFrom().range(1, 10); Multi m2 = Multi.createFrom().range(10, 12); - List list = m1.transform().byMergingWith(m2).collect().asList().await().indefinitely(); + List list = Multi.createBy().merging().streams(m1, m2).collect().asList().await().indefinitely(); assertThat(list).hasSize(11); } @@ -33,43 +26,17 @@ public void testMergingIterable() { Multi m2 = Multi.createFrom().range(10, 12); Multi m3 = Multi.createFrom().range(12, 14); - List list = m1.transform().byMergingWith(Arrays.asList(m2, m3)).collect().asList().await() + List list = Multi.createBy().merging().streams(m1, m2, m3).collect().asList().await() .indefinitely(); assertThat(list).hasSize(13); } @Test public void testMergingWithNull() { - assertThrows(IllegalArgumentException.class, () -> Multi.createFrom().item(1).transform() - .byMergingWith(Multi.createFrom().item(2), null, Multi.createFrom().item(3))); - } - - @Test - public void testMergingWithIterableContainingNull() { - assertThrows(IllegalArgumentException.class, () -> Multi.createFrom().item(1).transform() - .byMergingWith(Arrays.asList(Multi.createFrom().item(2), null, Multi.createFrom().item(3)))); - } - - @Test - public void testMergingWithNullIterable() { - assertThrows(IllegalArgumentException.class, () -> Multi.createFrom().item(1).transform() - .byMergingWith((Iterable>) null)); - } - - @Test - @Disabled("this test is failing on CI - must be investigated") - public void testConcurrentEmissionWithMerge() { - ExecutorService service = Executors.newFixedThreadPool(10); - Multi m1 = Multi.createFrom().range(1, 100).emitOn(service); - Multi m2 = Multi.createFrom().range(100, 150).emitOn(service).emitOn(service).emitOn(service); - Multi m3 = Multi.createFrom().range(150, 200).emitOn(service).emitOn(service); - - Multi merged = m1.transform().byMergingWith(m2, m3); - AssertSubscriber subscriber = merged.subscribe() - .withSubscriber(AssertSubscriber.create(1000)); - - subscriber.awaitCompletion(); - List items = subscriber.getItems(); - assertThat(Collections.singleton(items)).noneSatisfy(list -> assertThat(list).isSorted()); + assertThrows(IllegalArgumentException.class, () -> Multi.createBy().merging().streams( + Multi.createFrom().item(1), + Multi.createFrom().item(2), + null, + Multi.createFrom().item(3))); } } diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/UniSubscribeAsCompletionStageTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/UniSubscribeAsCompletionStageTest.java index 5c9b32900..7854d63b1 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/operators/UniSubscribeAsCompletionStageTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/UniSubscribeAsCompletionStageTest.java @@ -90,7 +90,8 @@ public void testThatSubscriptionsAreNotShared() { @Test public void testThatTwoSubscribersWithCache() { AtomicInteger count = new AtomicInteger(1); - Uni cached = Uni.createFrom().deferred(() -> Uni.createFrom().item(count.getAndIncrement())).cache(); + Uni cached = Uni.createFrom().deferred(() -> Uni.createFrom().item(count.getAndIncrement())).memoize() + .indefinitely(); CompletionStage cs1 = cached.subscribe().asCompletionStage(); CompletionStage cs2 = cached.subscribe().asCompletionStage(); assertThat(cs1).isNotNull(); diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/UniToPublisherTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/UniToPublisherTest.java index 0872256bf..9aec51cfa 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/operators/UniToPublisherTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/UniToPublisherTest.java @@ -123,7 +123,8 @@ public void testThatTwoSubscribersHaveTwoSubscriptions() { public void testThatTwoSubscribersWithCache() { AtomicInteger count = new AtomicInteger(1); Publisher publisher = Uni.createFrom() - .deferred(() -> Uni.createFrom().item(count.getAndIncrement())).cache().convert().toPublisher(); + .deferred(() -> Uni.createFrom().item(count.getAndIncrement())).memoize().indefinitely().convert() + .toPublisher(); assertThat(publisher).isNotNull(); Flowable flow = Flowable.fromPublisher(publisher); int first = flow.blockingFirst(); diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/multi/MultiToHotStreamTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/multi/MultiToHotStreamTest.java index 3cff27f80..2a63bbc4a 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/operators/multi/MultiToHotStreamTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/multi/MultiToHotStreamTest.java @@ -52,7 +52,7 @@ public void testWithTwoSubscribers() { public void testWithTwoSubscribersDeprecated() { UnicastProcessor processor = UnicastProcessor.create(); - Multi multi = processor.map(s -> s).transform().toHotStream(); + Multi multi = processor.map(s -> s).toHotStream(); AssertSubscriber subscriber1 = multi.subscribe() .withSubscriber(AssertSubscriber.create(10));