Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: add parallel hooks to RxJavaPlugins, add missing params validation #5043

Merged
merged 1 commit into from
Feb 2, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 50 additions & 27 deletions src/main/java/io/reactivex/parallel/ParallelFlowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public static <T> ParallelFlowable<T> from(Publisher<? extends T> source,
ObjectHelper.verifyPositive(parallelism, "parallelism");
ObjectHelper.verifyPositive(prefetch, "prefetch");

return new ParallelFromPublisher<T>(source, parallelism, prefetch);
return RxJavaPlugins.onAssembly(new ParallelFromPublisher<T>(source, parallelism, prefetch));
}

/**
Expand All @@ -132,7 +132,7 @@ public static <T> ParallelFlowable<T> from(Publisher<? extends T> source,
@CheckReturnValue
public final <R> ParallelFlowable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper");
return new ParallelMap<T, R>(this, mapper);
return RxJavaPlugins.onAssembly(new ParallelMap<T, R>(this, mapper));
}

/**
Expand All @@ -145,7 +145,7 @@ public final <R> ParallelFlowable<R> map(Function<? super T, ? extends R> mapper
@CheckReturnValue
public final ParallelFlowable<T> filter(Predicate<? super T> predicate) {
ObjectHelper.requireNonNull(predicate, "predicate");
return new ParallelFilter<T>(this, predicate);
return RxJavaPlugins.onAssembly(new ParallelFilter<T>(this, predicate));
}

/**
Expand Down Expand Up @@ -197,7 +197,7 @@ public final ParallelFlowable<T> runOn(Scheduler scheduler) {
public final ParallelFlowable<T> runOn(Scheduler scheduler, int prefetch) {
ObjectHelper.requireNonNull(scheduler, "scheduler");
ObjectHelper.verifyPositive(prefetch, "prefetch");
return new ParallelRunOn<T>(this, scheduler, prefetch);
return RxJavaPlugins.onAssembly(new ParallelRunOn<T>(this, scheduler, prefetch));
}

/**
Expand Down Expand Up @@ -229,7 +229,7 @@ public final Flowable<T> reduce(BiFunction<T, T, T> reducer) {
public final <R> ParallelFlowable<R> reduce(Callable<R> initialSupplier, BiFunction<R, ? super T, R> reducer) {
ObjectHelper.requireNonNull(initialSupplier, "initialSupplier");
ObjectHelper.requireNonNull(reducer, "reducer");
return new ParallelReduce<T, R>(this, initialSupplier, reducer);
return RxJavaPlugins.onAssembly(new ParallelReduce<T, R>(this, initialSupplier, reducer));
}

/**
Expand Down Expand Up @@ -304,6 +304,8 @@ public final Flowable<T> sorted(Comparator<? super T> comparator) {
*/
@CheckReturnValue
public final Flowable<T> sorted(Comparator<? super T> comparator, int capacityHint) {
ObjectHelper.requireNonNull(comparator, "comparator is null");
ObjectHelper.verifyPositive(capacityHint, "capacityHint");
int ch = capacityHint / parallelism() + 1;
ParallelFlowable<List<T>> railReduced = reduce(Functions.<T>createArrayList(ch), ListAddBiConsumer.<T>instance());
ParallelFlowable<List<T>> railSorted = railReduced.map(new SorterFunction<T>(comparator));
Expand Down Expand Up @@ -334,6 +336,9 @@ public final Flowable<List<T>> toSortedList(Comparator<? super T> comparator) {
*/
@CheckReturnValue
public final Flowable<List<T>> toSortedList(Comparator<? super T> comparator, int capacityHint) {
ObjectHelper.requireNonNull(comparator, "comparator is null");
ObjectHelper.verifyPositive(capacityHint, "capacityHint");

int ch = capacityHint / parallelism() + 1;
ParallelFlowable<List<T>> railReduced = reduce(Functions.<T>createArrayList(ch), ListAddBiConsumer.<T>instance());
ParallelFlowable<List<T>> railSorted = railReduced.map(new SorterFunction<T>(comparator));
Expand All @@ -351,7 +356,8 @@ public final Flowable<List<T>> toSortedList(Comparator<? super T> comparator, in
*/
@CheckReturnValue
public final ParallelFlowable<T> doOnNext(Consumer<? super T> onNext) {
return new ParallelPeek<T>(this,
ObjectHelper.requireNonNull(onNext, "onNext is null");
return RxJavaPlugins.onAssembly(new ParallelPeek<T>(this,
onNext,
Functions.emptyConsumer(),
Functions.emptyConsumer(),
Expand All @@ -360,7 +366,7 @@ public final ParallelFlowable<T> doOnNext(Consumer<? super T> onNext) {
Functions.emptyConsumer(),
Functions.EMPTY_LONG_CONSUMER,
Functions.EMPTY_ACTION
);
));
}

/**
Expand All @@ -372,7 +378,8 @@ public final ParallelFlowable<T> doOnNext(Consumer<? super T> onNext) {
*/
@CheckReturnValue
public final ParallelFlowable<T> doAfterNext(Consumer<? super T> onAfterNext) {
return new ParallelPeek<T>(this,
ObjectHelper.requireNonNull(onAfterNext, "onAfterNext is null");
return RxJavaPlugins.onAssembly(new ParallelPeek<T>(this,
Functions.emptyConsumer(),
onAfterNext,
Functions.emptyConsumer(),
Expand All @@ -381,7 +388,7 @@ public final ParallelFlowable<T> doAfterNext(Consumer<? super T> onAfterNext) {
Functions.emptyConsumer(),
Functions.EMPTY_LONG_CONSUMER,
Functions.EMPTY_ACTION
);
));
}

/**
Expand All @@ -392,7 +399,8 @@ public final ParallelFlowable<T> doAfterNext(Consumer<? super T> onAfterNext) {
*/
@CheckReturnValue
public final ParallelFlowable<T> doOnError(Consumer<Throwable> onError) {
return new ParallelPeek<T>(this,
ObjectHelper.requireNonNull(onError, "onError is null");
return RxJavaPlugins.onAssembly(new ParallelPeek<T>(this,
Functions.emptyConsumer(),
Functions.emptyConsumer(),
onError,
Expand All @@ -401,7 +409,7 @@ public final ParallelFlowable<T> doOnError(Consumer<Throwable> onError) {
Functions.emptyConsumer(),
Functions.EMPTY_LONG_CONSUMER,
Functions.EMPTY_ACTION
);
));
}

/**
Expand All @@ -412,7 +420,8 @@ public final ParallelFlowable<T> doOnError(Consumer<Throwable> onError) {
*/
@CheckReturnValue
public final ParallelFlowable<T> doOnComplete(Action onComplete) {
return new ParallelPeek<T>(this,
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
return RxJavaPlugins.onAssembly(new ParallelPeek<T>(this,
Functions.emptyConsumer(),
Functions.emptyConsumer(),
Functions.emptyConsumer(),
Expand All @@ -421,7 +430,7 @@ public final ParallelFlowable<T> doOnComplete(Action onComplete) {
Functions.emptyConsumer(),
Functions.EMPTY_LONG_CONSUMER,
Functions.EMPTY_ACTION
);
));
}

/**
Expand All @@ -432,7 +441,8 @@ public final ParallelFlowable<T> doOnComplete(Action onComplete) {
*/
@CheckReturnValue
public final ParallelFlowable<T> doAfterTerminated(Action onAfterTerminate) {
return new ParallelPeek<T>(this,
ObjectHelper.requireNonNull(onAfterTerminate, "onAfterTerminate is null");
return RxJavaPlugins.onAssembly(new ParallelPeek<T>(this,
Functions.emptyConsumer(),
Functions.emptyConsumer(),
Functions.emptyConsumer(),
Expand All @@ -441,7 +451,7 @@ public final ParallelFlowable<T> doAfterTerminated(Action onAfterTerminate) {
Functions.emptyConsumer(),
Functions.EMPTY_LONG_CONSUMER,
Functions.EMPTY_ACTION
);
));
}

/**
Expand All @@ -452,7 +462,8 @@ public final ParallelFlowable<T> doAfterTerminated(Action onAfterTerminate) {
*/
@CheckReturnValue
public final ParallelFlowable<T> doOnSubscribe(Consumer<? super Subscription> onSubscribe) {
return new ParallelPeek<T>(this,
ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
return RxJavaPlugins.onAssembly(new ParallelPeek<T>(this,
Functions.emptyConsumer(),
Functions.emptyConsumer(),
Functions.emptyConsumer(),
Expand All @@ -461,7 +472,7 @@ public final ParallelFlowable<T> doOnSubscribe(Consumer<? super Subscription> on
onSubscribe,
Functions.EMPTY_LONG_CONSUMER,
Functions.EMPTY_ACTION
);
));
}

/**
Expand All @@ -472,7 +483,8 @@ public final ParallelFlowable<T> doOnSubscribe(Consumer<? super Subscription> on
*/
@CheckReturnValue
public final ParallelFlowable<T> doOnRequest(LongConsumer onRequest) {
return new ParallelPeek<T>(this,
ObjectHelper.requireNonNull(onRequest, "onRequest is null");
return RxJavaPlugins.onAssembly(new ParallelPeek<T>(this,
Functions.emptyConsumer(),
Functions.emptyConsumer(),
Functions.emptyConsumer(),
Expand All @@ -481,7 +493,7 @@ public final ParallelFlowable<T> doOnRequest(LongConsumer onRequest) {
Functions.emptyConsumer(),
onRequest,
Functions.EMPTY_ACTION
);
));
}

/**
Expand All @@ -492,7 +504,8 @@ public final ParallelFlowable<T> doOnRequest(LongConsumer onRequest) {
*/
@CheckReturnValue
public final ParallelFlowable<T> doOnCancel(Action onCancel) {
return new ParallelPeek<T>(this,
ObjectHelper.requireNonNull(onCancel, "onCancel is null");
return RxJavaPlugins.onAssembly(new ParallelPeek<T>(this,
Functions.emptyConsumer(),
Functions.emptyConsumer(),
Functions.emptyConsumer(),
Expand All @@ -501,7 +514,7 @@ public final ParallelFlowable<T> doOnCancel(Action onCancel) {
Functions.emptyConsumer(),
Functions.EMPTY_LONG_CONSUMER,
onCancel
);
));
}

/**
Expand All @@ -515,7 +528,9 @@ public final ParallelFlowable<T> doOnCancel(Action onCancel) {
*/
@CheckReturnValue
public final <C> ParallelFlowable<C> collect(Callable<? extends C> collectionSupplier, BiConsumer<? super C, ? super T> collector) {
return new ParallelCollect<T, C>(this, collectionSupplier, collector);
ObjectHelper.requireNonNull(collectionSupplier, "collectionSupplier is null");
ObjectHelper.requireNonNull(collector, "collector is null");
return RxJavaPlugins.onAssembly(new ParallelCollect<T, C>(this, collectionSupplier, collector));
}

/**
Expand All @@ -531,7 +546,7 @@ public static <T> ParallelFlowable<T> fromArray(Publisher<T>... publishers) {
if (publishers.length == 0) {
throw new IllegalArgumentException("Zero publishers not supported");
}
return new ParallelFromArray<T>(publishers);
return RxJavaPlugins.onAssembly(new ParallelFromArray<T>(publishers));
}

/**
Expand Down Expand Up @@ -562,7 +577,7 @@ public final <U> U to(Function<? super ParallelFlowable<T>, U> converter) {
*/
@CheckReturnValue
public final <U> ParallelFlowable<U> compose(Function<? super ParallelFlowable<T>, ParallelFlowable<U>> composer) {
return to(composer);
return RxJavaPlugins.onAssembly(to(composer));
}

/**
Expand Down Expand Up @@ -629,7 +644,10 @@ public final <R> ParallelFlowable<R> flatMap(
public final <R> ParallelFlowable<R> flatMap(
Function<? super T, ? extends Publisher<? extends R>> mapper,
boolean delayError, int maxConcurrency, int prefetch) {
return new ParallelFlatMap<T, R>(this, mapper, delayError, maxConcurrency, prefetch);
ObjectHelper.requireNonNull(mapper, "mapper is null");
ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
ObjectHelper.verifyPositive(prefetch, "prefetch");
return RxJavaPlugins.onAssembly(new ParallelFlatMap<T, R>(this, mapper, delayError, maxConcurrency, prefetch));
}

/**
Expand Down Expand Up @@ -661,7 +679,9 @@ public final <R> ParallelFlowable<R> concatMap(
public final <R> ParallelFlowable<R> concatMap(
Function<? super T, ? extends Publisher<? extends R>> mapper,
int prefetch) {
return new ParallelConcatMap<T, R>(this, mapper, prefetch, ErrorMode.IMMEDIATE);
ObjectHelper.requireNonNull(mapper, "mapper is null");
ObjectHelper.verifyPositive(prefetch, "prefetch");
return RxJavaPlugins.onAssembly(new ParallelConcatMap<T, R>(this, mapper, prefetch, ErrorMode.IMMEDIATE));
}

/**
Expand Down Expand Up @@ -697,6 +717,9 @@ public final <R> ParallelFlowable<R> concatMapDelayError(
public final <R> ParallelFlowable<R> concatMapDelayError(
Function<? super T, ? extends Publisher<? extends R>> mapper,
int prefetch, boolean tillTheEnd) {
return new ParallelConcatMap<T, R>(this, mapper, prefetch, tillTheEnd ? ErrorMode.END : ErrorMode.BOUNDARY);
ObjectHelper.requireNonNull(mapper, "mapper is null");
ObjectHelper.verifyPositive(prefetch, "prefetch");
return RxJavaPlugins.onAssembly(new ParallelConcatMap<T, R>(
this, mapper, prefetch, tillTheEnd ? ErrorMode.END : ErrorMode.BOUNDARY));
}
}
59 changes: 54 additions & 5 deletions src/main/java/io/reactivex/plugins/RxJavaPlugins.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,21 @@
*/
package io.reactivex.plugins;

import java.lang.Thread.UncaughtExceptionHandler;
import java.util.concurrent.*;

import org.reactivestreams.Subscriber;

import io.reactivex.*;
import io.reactivex.annotations.*;
import io.reactivex.annotations.Experimental;
import io.reactivex.flowables.ConnectableFlowable;
import io.reactivex.functions.*;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.schedulers.*;
import io.reactivex.internal.util.ExceptionHelper;
import io.reactivex.observables.ConnectableObservable;
import io.reactivex.parallel.ParallelFlowable;
import io.reactivex.schedulers.Schedulers;
import org.reactivestreams.Subscriber;

import java.lang.Thread.UncaughtExceptionHandler;
import java.util.concurrent.*;

/**
* Utility class to inject handlers to certain standard RxJava operations.
Expand Down Expand Up @@ -71,6 +73,9 @@ public final class RxJavaPlugins {

static volatile Function<Completable, Completable> onCompletableAssembly;

@SuppressWarnings("rawtypes")
static volatile Function<ParallelFlowable, ParallelFlowable> onParallelAssembly;

@SuppressWarnings("rawtypes")
static volatile BiFunction<Flowable, Subscriber, Subscriber> onFlowableSubscribe;

Expand Down Expand Up @@ -414,6 +419,8 @@ public static void reset() {
setOnMaybeAssembly(null);
setOnMaybeSubscribe(null);

setOnParallelAssembly(null);

setFailOnNonBlockingScheduler(false);
setOnBeforeBlocking(null);
}
Expand Down Expand Up @@ -966,6 +973,48 @@ public static Completable onAssembly(Completable source) {
return source;
}

/**
* Sets the specific hook function.
* @param handler the hook function to set, null allowed
* @since 2.0.6 - experimental
*/
@Experimental
@SuppressWarnings("rawtypes")
public static void setOnParallelAssembly(Function<ParallelFlowable, ParallelFlowable> handler) {
if (lockdown) {
throw new IllegalStateException("Plugins can't be changed anymore");
}
onParallelAssembly = handler;
}

/**
* Returns the current hook function.
* @return the hook function, may be null
* @since 2.0.6 - experimental
*/
@Experimental
@SuppressWarnings("rawtypes")
public static Function<ParallelFlowable, ParallelFlowable> getOnParallelAssembly() {
return onParallelAssembly;
}

/**
* Calls the associated hook function.
* @param <T> the value type of the source
* @param source the hook's input value
* @return the value returned by the hook
* @since 2.0.6 - experimental
*/
@Experimental
@SuppressWarnings({ "rawtypes", "unchecked" })
public static <T> ParallelFlowable<T> onAssembly(ParallelFlowable<T> source) {
Function<ParallelFlowable, ParallelFlowable> f = onParallelAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}

/**
* Called before an operator attempts a blocking operation
* such as awaiting a condition or signal
Expand Down
Loading