diff --git a/src/main/java/io/reactivex/parallel/ParallelFlowable.java b/src/main/java/io/reactivex/parallel/ParallelFlowable.java index b3fba79201..71664eb8c3 100644 --- a/src/main/java/io/reactivex/parallel/ParallelFlowable.java +++ b/src/main/java/io/reactivex/parallel/ParallelFlowable.java @@ -118,7 +118,7 @@ public static ParallelFlowable from(Publisher source, ObjectHelper.verifyPositive(parallelism, "parallelism"); ObjectHelper.verifyPositive(prefetch, "prefetch"); - return new ParallelFromPublisher(source, parallelism, prefetch); + return RxJavaPlugins.onAssembly(new ParallelFromPublisher(source, parallelism, prefetch)); } /** @@ -132,7 +132,7 @@ public static ParallelFlowable from(Publisher source, @CheckReturnValue public final ParallelFlowable map(Function mapper) { ObjectHelper.requireNonNull(mapper, "mapper"); - return new ParallelMap(this, mapper); + return RxJavaPlugins.onAssembly(new ParallelMap(this, mapper)); } /** @@ -145,7 +145,7 @@ public final ParallelFlowable map(Function mapper @CheckReturnValue public final ParallelFlowable filter(Predicate predicate) { ObjectHelper.requireNonNull(predicate, "predicate"); - return new ParallelFilter(this, predicate); + return RxJavaPlugins.onAssembly(new ParallelFilter(this, predicate)); } /** @@ -197,7 +197,7 @@ public final ParallelFlowable runOn(Scheduler scheduler) { public final ParallelFlowable runOn(Scheduler scheduler, int prefetch) { ObjectHelper.requireNonNull(scheduler, "scheduler"); ObjectHelper.verifyPositive(prefetch, "prefetch"); - return new ParallelRunOn(this, scheduler, prefetch); + return RxJavaPlugins.onAssembly(new ParallelRunOn(this, scheduler, prefetch)); } /** @@ -229,7 +229,7 @@ public final Flowable reduce(BiFunction reducer) { public final ParallelFlowable reduce(Callable initialSupplier, BiFunction reducer) { ObjectHelper.requireNonNull(initialSupplier, "initialSupplier"); ObjectHelper.requireNonNull(reducer, "reducer"); - return new ParallelReduce(this, initialSupplier, reducer); + return RxJavaPlugins.onAssembly(new ParallelReduce(this, initialSupplier, reducer)); } /** @@ -304,6 +304,8 @@ public final Flowable sorted(Comparator comparator) { */ @CheckReturnValue public final Flowable sorted(Comparator comparator, int capacityHint) { + ObjectHelper.requireNonNull(comparator, "comparator is null"); + ObjectHelper.verifyPositive(capacityHint, "capacityHint"); int ch = capacityHint / parallelism() + 1; ParallelFlowable> railReduced = reduce(Functions.createArrayList(ch), ListAddBiConsumer.instance()); ParallelFlowable> railSorted = railReduced.map(new SorterFunction(comparator)); @@ -334,6 +336,9 @@ public final Flowable> toSortedList(Comparator comparator) { */ @CheckReturnValue public final Flowable> toSortedList(Comparator comparator, int capacityHint) { + ObjectHelper.requireNonNull(comparator, "comparator is null"); + ObjectHelper.verifyPositive(capacityHint, "capacityHint"); + int ch = capacityHint / parallelism() + 1; ParallelFlowable> railReduced = reduce(Functions.createArrayList(ch), ListAddBiConsumer.instance()); ParallelFlowable> railSorted = railReduced.map(new SorterFunction(comparator)); @@ -351,7 +356,8 @@ public final Flowable> toSortedList(Comparator comparator, in */ @CheckReturnValue public final ParallelFlowable doOnNext(Consumer onNext) { - return new ParallelPeek(this, + ObjectHelper.requireNonNull(onNext, "onNext is null"); + return RxJavaPlugins.onAssembly(new ParallelPeek(this, onNext, Functions.emptyConsumer(), Functions.emptyConsumer(), @@ -360,7 +366,7 @@ public final ParallelFlowable doOnNext(Consumer onNext) { Functions.emptyConsumer(), Functions.EMPTY_LONG_CONSUMER, Functions.EMPTY_ACTION - ); + )); } /** @@ -372,7 +378,8 @@ public final ParallelFlowable doOnNext(Consumer onNext) { */ @CheckReturnValue public final ParallelFlowable doAfterNext(Consumer onAfterNext) { - return new ParallelPeek(this, + ObjectHelper.requireNonNull(onAfterNext, "onAfterNext is null"); + return RxJavaPlugins.onAssembly(new ParallelPeek(this, Functions.emptyConsumer(), onAfterNext, Functions.emptyConsumer(), @@ -381,7 +388,7 @@ public final ParallelFlowable doAfterNext(Consumer onAfterNext) { Functions.emptyConsumer(), Functions.EMPTY_LONG_CONSUMER, Functions.EMPTY_ACTION - ); + )); } /** @@ -392,7 +399,8 @@ public final ParallelFlowable doAfterNext(Consumer onAfterNext) { */ @CheckReturnValue public final ParallelFlowable doOnError(Consumer onError) { - return new ParallelPeek(this, + ObjectHelper.requireNonNull(onError, "onError is null"); + return RxJavaPlugins.onAssembly(new ParallelPeek(this, Functions.emptyConsumer(), Functions.emptyConsumer(), onError, @@ -401,7 +409,7 @@ public final ParallelFlowable doOnError(Consumer onError) { Functions.emptyConsumer(), Functions.EMPTY_LONG_CONSUMER, Functions.EMPTY_ACTION - ); + )); } /** @@ -412,7 +420,8 @@ public final ParallelFlowable doOnError(Consumer onError) { */ @CheckReturnValue public final ParallelFlowable doOnComplete(Action onComplete) { - return new ParallelPeek(this, + ObjectHelper.requireNonNull(onComplete, "onComplete is null"); + return RxJavaPlugins.onAssembly(new ParallelPeek(this, Functions.emptyConsumer(), Functions.emptyConsumer(), Functions.emptyConsumer(), @@ -421,7 +430,7 @@ public final ParallelFlowable doOnComplete(Action onComplete) { Functions.emptyConsumer(), Functions.EMPTY_LONG_CONSUMER, Functions.EMPTY_ACTION - ); + )); } /** @@ -432,7 +441,8 @@ public final ParallelFlowable doOnComplete(Action onComplete) { */ @CheckReturnValue public final ParallelFlowable doAfterTerminated(Action onAfterTerminate) { - return new ParallelPeek(this, + ObjectHelper.requireNonNull(onAfterTerminate, "onAfterTerminate is null"); + return RxJavaPlugins.onAssembly(new ParallelPeek(this, Functions.emptyConsumer(), Functions.emptyConsumer(), Functions.emptyConsumer(), @@ -441,7 +451,7 @@ public final ParallelFlowable doAfterTerminated(Action onAfterTerminate) { Functions.emptyConsumer(), Functions.EMPTY_LONG_CONSUMER, Functions.EMPTY_ACTION - ); + )); } /** @@ -452,7 +462,8 @@ public final ParallelFlowable doAfterTerminated(Action onAfterTerminate) { */ @CheckReturnValue public final ParallelFlowable doOnSubscribe(Consumer onSubscribe) { - return new ParallelPeek(this, + ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null"); + return RxJavaPlugins.onAssembly(new ParallelPeek(this, Functions.emptyConsumer(), Functions.emptyConsumer(), Functions.emptyConsumer(), @@ -461,7 +472,7 @@ public final ParallelFlowable doOnSubscribe(Consumer on onSubscribe, Functions.EMPTY_LONG_CONSUMER, Functions.EMPTY_ACTION - ); + )); } /** @@ -472,7 +483,8 @@ public final ParallelFlowable doOnSubscribe(Consumer on */ @CheckReturnValue public final ParallelFlowable doOnRequest(LongConsumer onRequest) { - return new ParallelPeek(this, + ObjectHelper.requireNonNull(onRequest, "onRequest is null"); + return RxJavaPlugins.onAssembly(new ParallelPeek(this, Functions.emptyConsumer(), Functions.emptyConsumer(), Functions.emptyConsumer(), @@ -481,7 +493,7 @@ public final ParallelFlowable doOnRequest(LongConsumer onRequest) { Functions.emptyConsumer(), onRequest, Functions.EMPTY_ACTION - ); + )); } /** @@ -492,7 +504,8 @@ public final ParallelFlowable doOnRequest(LongConsumer onRequest) { */ @CheckReturnValue public final ParallelFlowable doOnCancel(Action onCancel) { - return new ParallelPeek(this, + ObjectHelper.requireNonNull(onCancel, "onCancel is null"); + return RxJavaPlugins.onAssembly(new ParallelPeek(this, Functions.emptyConsumer(), Functions.emptyConsumer(), Functions.emptyConsumer(), @@ -501,7 +514,7 @@ public final ParallelFlowable doOnCancel(Action onCancel) { Functions.emptyConsumer(), Functions.EMPTY_LONG_CONSUMER, onCancel - ); + )); } /** @@ -515,7 +528,9 @@ public final ParallelFlowable doOnCancel(Action onCancel) { */ @CheckReturnValue public final ParallelFlowable collect(Callable collectionSupplier, BiConsumer collector) { - return new ParallelCollect(this, collectionSupplier, collector); + ObjectHelper.requireNonNull(collectionSupplier, "collectionSupplier is null"); + ObjectHelper.requireNonNull(collector, "collector is null"); + return RxJavaPlugins.onAssembly(new ParallelCollect(this, collectionSupplier, collector)); } /** @@ -531,7 +546,7 @@ public static ParallelFlowable fromArray(Publisher... publishers) { if (publishers.length == 0) { throw new IllegalArgumentException("Zero publishers not supported"); } - return new ParallelFromArray(publishers); + return RxJavaPlugins.onAssembly(new ParallelFromArray(publishers)); } /** @@ -562,7 +577,7 @@ public final U to(Function, U> converter) { */ @CheckReturnValue public final ParallelFlowable compose(Function, ParallelFlowable> composer) { - return to(composer); + return RxJavaPlugins.onAssembly(to(composer)); } /** @@ -629,7 +644,10 @@ public final ParallelFlowable flatMap( public final ParallelFlowable flatMap( Function> mapper, boolean delayError, int maxConcurrency, int prefetch) { - return new ParallelFlatMap(this, mapper, delayError, maxConcurrency, prefetch); + ObjectHelper.requireNonNull(mapper, "mapper is null"); + ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency"); + ObjectHelper.verifyPositive(prefetch, "prefetch"); + return RxJavaPlugins.onAssembly(new ParallelFlatMap(this, mapper, delayError, maxConcurrency, prefetch)); } /** @@ -661,7 +679,9 @@ public final ParallelFlowable concatMap( public final ParallelFlowable concatMap( Function> mapper, int prefetch) { - return new ParallelConcatMap(this, mapper, prefetch, ErrorMode.IMMEDIATE); + ObjectHelper.requireNonNull(mapper, "mapper is null"); + ObjectHelper.verifyPositive(prefetch, "prefetch"); + return RxJavaPlugins.onAssembly(new ParallelConcatMap(this, mapper, prefetch, ErrorMode.IMMEDIATE)); } /** @@ -697,6 +717,9 @@ public final ParallelFlowable concatMapDelayError( public final ParallelFlowable concatMapDelayError( Function> mapper, int prefetch, boolean tillTheEnd) { - return new ParallelConcatMap(this, mapper, prefetch, tillTheEnd ? ErrorMode.END : ErrorMode.BOUNDARY); + ObjectHelper.requireNonNull(mapper, "mapper is null"); + ObjectHelper.verifyPositive(prefetch, "prefetch"); + return RxJavaPlugins.onAssembly(new ParallelConcatMap( + this, mapper, prefetch, tillTheEnd ? ErrorMode.END : ErrorMode.BOUNDARY)); } } diff --git a/src/main/java/io/reactivex/plugins/RxJavaPlugins.java b/src/main/java/io/reactivex/plugins/RxJavaPlugins.java index 5840536e8c..9fd575dd8a 100644 --- a/src/main/java/io/reactivex/plugins/RxJavaPlugins.java +++ b/src/main/java/io/reactivex/plugins/RxJavaPlugins.java @@ -12,19 +12,21 @@ */ package io.reactivex.plugins; +import java.lang.Thread.UncaughtExceptionHandler; +import java.util.concurrent.*; + +import org.reactivestreams.Subscriber; + import io.reactivex.*; -import io.reactivex.annotations.*; +import io.reactivex.annotations.Experimental; import io.reactivex.flowables.ConnectableFlowable; import io.reactivex.functions.*; import io.reactivex.internal.functions.ObjectHelper; import io.reactivex.internal.schedulers.*; import io.reactivex.internal.util.ExceptionHelper; import io.reactivex.observables.ConnectableObservable; +import io.reactivex.parallel.ParallelFlowable; import io.reactivex.schedulers.Schedulers; -import org.reactivestreams.Subscriber; - -import java.lang.Thread.UncaughtExceptionHandler; -import java.util.concurrent.*; /** * Utility class to inject handlers to certain standard RxJava operations. @@ -71,6 +73,9 @@ public final class RxJavaPlugins { static volatile Function onCompletableAssembly; + @SuppressWarnings("rawtypes") + static volatile Function onParallelAssembly; + @SuppressWarnings("rawtypes") static volatile BiFunction onFlowableSubscribe; @@ -414,6 +419,8 @@ public static void reset() { setOnMaybeAssembly(null); setOnMaybeSubscribe(null); + setOnParallelAssembly(null); + setFailOnNonBlockingScheduler(false); setOnBeforeBlocking(null); } @@ -966,6 +973,48 @@ public static Completable onAssembly(Completable source) { return source; } + /** + * Sets the specific hook function. + * @param handler the hook function to set, null allowed + * @since 2.0.6 - experimental + */ + @Experimental + @SuppressWarnings("rawtypes") + public static void setOnParallelAssembly(Function handler) { + if (lockdown) { + throw new IllegalStateException("Plugins can't be changed anymore"); + } + onParallelAssembly = handler; + } + + /** + * Returns the current hook function. + * @return the hook function, may be null + * @since 2.0.6 - experimental + */ + @Experimental + @SuppressWarnings("rawtypes") + public static Function getOnParallelAssembly() { + return onParallelAssembly; + } + + /** + * Calls the associated hook function. + * @param the value type of the source + * @param source the hook's input value + * @return the value returned by the hook + * @since 2.0.6 - experimental + */ + @Experimental + @SuppressWarnings({ "rawtypes", "unchecked" }) + public static ParallelFlowable onAssembly(ParallelFlowable source) { + Function f = onParallelAssembly; + if (f != null) { + return apply(f, source); + } + return source; + } + /** * Called before an operator attempts a blocking operation * such as awaiting a condition or signal diff --git a/src/test/java/io/reactivex/plugins/RxJavaPluginsTest.java b/src/test/java/io/reactivex/plugins/RxJavaPluginsTest.java index 19d2f5a308..06bd6ec6e3 100644 --- a/src/test/java/io/reactivex/plugins/RxJavaPluginsTest.java +++ b/src/test/java/io/reactivex/plugins/RxJavaPluginsTest.java @@ -29,10 +29,12 @@ import io.reactivex.internal.operators.flowable.FlowableRange; import io.reactivex.internal.operators.maybe.MaybeError; import io.reactivex.internal.operators.observable.ObservableRange; +import io.reactivex.internal.operators.parallel.ParallelFromPublisher; import io.reactivex.internal.operators.single.SingleJust; import io.reactivex.internal.schedulers.ImmediateThinScheduler; import io.reactivex.internal.subscriptions.ScalarSubscription; import io.reactivex.observables.ConnectableObservable; +import io.reactivex.parallel.ParallelFlowable; import io.reactivex.schedulers.Schedulers; import org.junit.*; import org.reactivestreams.*; @@ -2046,4 +2048,52 @@ public Scheduler apply(Scheduler scheduler) throws Exception { RxJavaPlugins.reset(); } } + + @Test + public void onBeforeBlocking() { + try { + RxJavaPlugins.setOnBeforeBlocking(new BooleanSupplier() { + @Override + public boolean getAsBoolean() throws Exception { + throw new IllegalArgumentException(); + } + }); + + try { + RxJavaPlugins.onBeforeBlocking(); + fail("Should have thrown"); + } catch (IllegalArgumentException ex) { + // expected + } + } finally { + RxJavaPlugins.reset(); + } + } + + @SuppressWarnings("rawtypes") + @Test + public void onParallelAssembly() { + try { + RxJavaPlugins.setOnParallelAssembly(new Function() { + @Override + public ParallelFlowable apply(ParallelFlowable pf) throws Exception { + return new ParallelFromPublisher(Flowable.just(2), 2, 2); + } + }); + + Flowable.just(1) + .parallel() + .sequential() + .test() + .assertResult(2); + } finally { + RxJavaPlugins.reset(); + } + + Flowable.just(1) + .parallel() + .sequential() + .test() + .assertResult(1); + } }