Skip to content

Commit

Permalink
3.x: Improve JavaDocs of Observable and fix similar issues elsewhere (#…
Browse files Browse the repository at this point in the history
…6831)

* 3.x: Javadoc cleanup of Observable

* Another set of cleanups (too many things to fix at once)

* 3.x: Improve JavaDocs of Observable and fix similar issues elsewhere
  • Loading branch information
akarnokd authored Jan 10, 2020
1 parent 6030d83 commit 1e4dbcf
Show file tree
Hide file tree
Showing 19 changed files with 3,846 additions and 3,237 deletions.
3 changes: 2 additions & 1 deletion src/main/java/io/reactivex/rxjava3/core/Completable.java
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,7 @@ public static Completable create(@NonNull CompletableOnSubscribe source) {
* when the {@code Completable} is subscribed to.
* @return the created {@code Completable} instance
* @throws NullPointerException if {@code source} is {@code null}
* @throws IllegalArgumentException if {@code source} is a {@code Completable}
*/
@CheckReturnValue
@NonNull
Expand Down Expand Up @@ -364,7 +365,7 @@ public static Completable unsafeCreate(@NonNull CompletableSource source) {
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static Completable defer(@NonNull Supplier<? extends CompletableSource> completableSupplier) {
Objects.requireNonNull(completableSupplier, "completableSupplier");
Objects.requireNonNull(completableSupplier, "completableSupplier is null");
return RxJavaPlugins.onAssembly(new CompletableDefer(completableSupplier));
}

Expand Down
228 changes: 123 additions & 105 deletions src/main/java/io/reactivex/rxjava3/core/Flowable.java

Large diffs are not rendered by default.

9 changes: 8 additions & 1 deletion src/main/java/io/reactivex/rxjava3/core/Maybe.java
Original file line number Diff line number Diff line change
Expand Up @@ -147,12 +147,14 @@ public static <T> Maybe<T> amb(@NonNull Iterable<? extends MaybeSource<? extends
* @param sources the array of sources. A subscription to each source will
* occur in the same order as in the array.
* @return the new {@code Maybe} instance
* @throws NullPointerException if {@code sources} is {@code null}
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
@SafeVarargs
public static <T> Maybe<T> ambArray(@NonNull MaybeSource<? extends T>... sources) {
Objects.requireNonNull(sources, "sources is null");
if (sources.length == 0) {
return empty();
}
Expand Down Expand Up @@ -402,6 +404,7 @@ public static <T> Flowable<T> concatArray(@NonNull MaybeSource<? extends T>... s
@SafeVarargs
@NonNull
public static <T> Flowable<T> concatArrayDelayError(@NonNull MaybeSource<? extends T>... sources) {
Objects.requireNonNull(sources, "sources is null");
if (sources.length == 0) {
return Flowable.empty();
} else
Expand Down Expand Up @@ -1364,6 +1367,7 @@ public static <T> Flowable<T> mergeArray(MaybeSource<? extends T>... sources) {
@SafeVarargs
@NonNull
public static <T> Flowable<T> mergeArrayDelayError(@NonNull MaybeSource<? extends T>... sources) {
Objects.requireNonNull(sources, "sources is null");
if (sources.length == 0) {
return Flowable.empty();
}
Expand Down Expand Up @@ -1764,6 +1768,8 @@ public static Maybe<Long> timer(long delay, @NonNull TimeUnit unit, @NonNull Sch
* @param <T> the value type
* @param onSubscribe the function that is called with the subscribing {@code MaybeObserver}
* @return the new {@code Maybe} instance
* @throws IllegalArgumentException if {@code onSubscribe} is a {@code Maybe}
* @throws NullPointerException if {@code onSubscribe} is {@code null}
*/
@CheckReturnValue
@NonNull
Expand Down Expand Up @@ -1858,6 +1864,7 @@ public static <T, D> Maybe<T> using(@NonNull Supplier<? extends D> resourceSuppl
* @param <T> the value type
* @param source the source to wrap
* @return the {@code Maybe} wrapper or the source cast to {@code Maybe} (if possible)
* @throws NullPointerException if {@code source} is {@code null}
*/
@CheckReturnValue
@NonNull
Expand Down Expand Up @@ -3522,7 +3529,7 @@ public final Single<Boolean> isEmpty() {
* if (str.length() &lt; 2) {
* downstream.onSuccess(str);
* } else {
* // Maybe i {@code Maybe} ly expected to produce one of the onXXX events
* // Maybe is expected to produce one of the onXXX events only
* downstream.onComplete();
* }
* }
Expand Down
6,690 changes: 3,593 additions & 3,097 deletions src/main/java/io/reactivex/rxjava3/core/Observable.java

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions src/main/java/io/reactivex/rxjava3/core/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -152,13 +152,15 @@ public static <T> Single<T> amb(@NonNull Iterable<? extends SingleSource<? exten
* @param sources the array of sources. A subscription to each source will
* occur in the same order as in this array.
* @return the new {@code Single} instance
* @throws NullPointerException if {@code sources} is {@code null}
* @since 2.0
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@SafeVarargs
@NonNull
public static <T> Single<T> ambArray(@NonNull SingleSource<? extends T>... sources) {
Objects.requireNonNull(sources, "sources is null");
if (sources.length == 0) {
return error(SingleInternalHelper.emptyThrower());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,17 @@ private FlowableBlockingSubscribe() {
/**
* Subscribes to the source and calls the Subscriber methods on the current thread.
* <p>
* @param o the source publisher
* @param source the source publisher
* The cancellation and backpressure is composed through.
* @param subscriber the subscriber to forward events and calls to in the current thread
* @param <T> the value type
*/
public static <T> void subscribe(Publisher<? extends T> o, Subscriber<? super T> subscriber) {
public static <T> void subscribe(Publisher<? extends T> source, Subscriber<? super T> subscriber) {
final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();

BlockingSubscriber<T> bs = new BlockingSubscriber<>(queue);

o.subscribe(bs);
source.subscribe(bs);

try {
for (;;) {
Expand Down Expand Up @@ -77,15 +77,15 @@ public static <T> void subscribe(Publisher<? extends T> o, Subscriber<? super T>

/**
* Runs the source observable to a terminal event, ignoring any values and rethrowing any exception.
* @param o the source publisher
* @param source the source to await
* @param <T> the value type
*/
public static <T> void subscribe(Publisher<? extends T> o) {
public static <T> void subscribe(Publisher<? extends T> source) {
BlockingIgnoringReceiver callback = new BlockingIgnoringReceiver();
LambdaSubscriber<T> ls = new LambdaSubscriber<>(Functions.emptyConsumer(),
callback, callback, Functions.REQUEST_MAX);

o.subscribe(ls);
source.subscribe(ls);

BlockingHelper.awaitForComplete(callback, ls);
Throwable e = callback.error;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ private ObservableBlockingSubscribe() {
* The call to dispose() is composed through.
* @param observer the subscriber to forward events and calls to in the current thread
* @param <T> the value type
* @throws NullPointerException if {@code observer} is {@code null}
*/
public static <T> void subscribe(ObservableSource<? extends T> o, Observer<? super T> observer) {
final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
Expand Down
33 changes: 19 additions & 14 deletions src/main/java/io/reactivex/rxjava3/parallel/ParallelFlowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,11 @@ public abstract class ParallelFlowable<@NonNull T> {
*
* @param subscribers the array of Subscribers
* @return true if the number of subscribers equals to the parallelism level
* @throws NullPointerException if {@code subscribers} is {@code null}
* @throws IllegalArgumentException if {@code subscribers.length} is different from {@link #parallelism()}
*/
protected final boolean validate(@NonNull Subscriber<?>[] subscribers) {
Objects.requireNonNull(subscribers, "subscribers is null");
int p = parallelism();
if (subscribers.length != p) {
Throwable iae = new IllegalArgumentException("parallelism = " + p + ", subscribers = " + subscribers.length);
Expand Down Expand Up @@ -156,7 +159,7 @@ public static <T> ParallelFlowable<T> from(@NonNull Publisher<? extends T> sourc
@BackpressureSupport(BackpressureKind.FULL)
public static <@NonNull T> ParallelFlowable<T> from(@NonNull Publisher<? extends T> source,
int parallelism, int prefetch) {
Objects.requireNonNull(source, "source");
Objects.requireNonNull(source, "source is null");
ObjectHelper.verifyPositive(parallelism, "parallelism");
ObjectHelper.verifyPositive(prefetch, "prefetch");

Expand All @@ -183,7 +186,7 @@ public static <T> ParallelFlowable<T> from(@NonNull Publisher<? extends T> sourc
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
public final <R> ParallelFlowable<R> map(@NonNull Function<? super T, ? extends R> mapper) {
Objects.requireNonNull(mapper, "mapper");
Objects.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ParallelMap<>(this, mapper));
}

Expand Down Expand Up @@ -212,7 +215,7 @@ public final <R> ParallelFlowable<R> map(@NonNull Function<? super T, ? extends
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
public final <R> ParallelFlowable<R> map(@NonNull Function<? super T, ? extends R> mapper, @NonNull ParallelFailureHandling errorHandler) {
Objects.requireNonNull(mapper, "mapper");
Objects.requireNonNull(mapper, "mapper is null");
Objects.requireNonNull(errorHandler, "errorHandler is null");
return RxJavaPlugins.onAssembly(new ParallelMapTry<>(this, mapper, errorHandler));
}
Expand Down Expand Up @@ -243,7 +246,7 @@ public final <R> ParallelFlowable<R> map(@NonNull Function<? super T, ? extends
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
public final <R> ParallelFlowable<R> map(@NonNull Function<? super T, ? extends R> mapper, @NonNull BiFunction<? super Long, ? super Throwable, ParallelFailureHandling> errorHandler) {
Objects.requireNonNull(mapper, "mapper");
Objects.requireNonNull(mapper, "mapper is null");
Objects.requireNonNull(errorHandler, "errorHandler is null");
return RxJavaPlugins.onAssembly(new ParallelMapTry<>(this, mapper, errorHandler));
}
Expand All @@ -267,7 +270,7 @@ public final <R> ParallelFlowable<R> map(@NonNull Function<? super T, ? extends
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
public final ParallelFlowable<T> filter(@NonNull Predicate<? super T> predicate) {
Objects.requireNonNull(predicate, "predicate");
Objects.requireNonNull(predicate, "predicate is null");
return RxJavaPlugins.onAssembly(new ParallelFilter<>(this, predicate));
}

Expand Down Expand Up @@ -295,7 +298,7 @@ public final ParallelFlowable<T> filter(@NonNull Predicate<? super T> predicate)
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
public final ParallelFlowable<T> filter(@NonNull Predicate<? super T> predicate, @NonNull ParallelFailureHandling errorHandler) {
Objects.requireNonNull(predicate, "predicate");
Objects.requireNonNull(predicate, "predicate is null");
Objects.requireNonNull(errorHandler, "errorHandler is null");
return RxJavaPlugins.onAssembly(new ParallelFilterTry<>(this, predicate, errorHandler));
}
Expand Down Expand Up @@ -325,7 +328,7 @@ public final ParallelFlowable<T> filter(@NonNull Predicate<? super T> predicate,
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
public final ParallelFlowable<T> filter(@NonNull Predicate<? super T> predicate, @NonNull BiFunction<? super Long, ? super Throwable, ParallelFailureHandling> errorHandler) {
Objects.requireNonNull(predicate, "predicate");
Objects.requireNonNull(predicate, "predicate is null");
Objects.requireNonNull(errorHandler, "errorHandler is null");
return RxJavaPlugins.onAssembly(new ParallelFilterTry<>(this, predicate, errorHandler));
}
Expand Down Expand Up @@ -401,7 +404,7 @@ public final ParallelFlowable<T> runOn(@NonNull Scheduler scheduler) {
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final ParallelFlowable<T> runOn(@NonNull Scheduler scheduler, int prefetch) {
Objects.requireNonNull(scheduler, "scheduler");
Objects.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(prefetch, "prefetch");
return RxJavaPlugins.onAssembly(new ParallelRunOn<>(this, scheduler, prefetch));
}
Expand All @@ -426,7 +429,7 @@ public final ParallelFlowable<T> runOn(@NonNull Scheduler scheduler, int prefetc
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
public final Flowable<T> reduce(@NonNull BiFunction<T, T, T> reducer) {
Objects.requireNonNull(reducer, "reducer");
Objects.requireNonNull(reducer, "reducer is null");
return RxJavaPlugins.onAssembly(new ParallelReduceFull<>(this, reducer));
}

Expand All @@ -453,8 +456,8 @@ public final Flowable<T> reduce(@NonNull BiFunction<T, T, T> reducer) {
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> ParallelFlowable<R> reduce(@NonNull Supplier<R> initialSupplier, @NonNull BiFunction<R, ? super T, R> reducer) {
Objects.requireNonNull(initialSupplier, "initialSupplier");
Objects.requireNonNull(reducer, "reducer");
Objects.requireNonNull(initialSupplier, "initialSupplier is null");
Objects.requireNonNull(reducer, "reducer is null");
return RxJavaPlugins.onAssembly(new ParallelReduce<>(this, initialSupplier, reducer));
}

Expand Down Expand Up @@ -1024,13 +1027,15 @@ public final <C> ParallelFlowable<C> collect(@NonNull Supplier<? extends C> coll
* @param <T> the value type
* @param publishers the array of publishers
* @return the new ParallelFlowable instance
* @throws IllegalArgumentException if {@code publishers} is an empty array
*/
@CheckReturnValue
@NonNull
@SafeVarargs
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@SchedulerSupport(SchedulerSupport.NONE)
public static <@NonNull T> ParallelFlowable<T> fromArray(@NonNull Publisher<T>... publishers) {
Objects.requireNonNull(publishers, "publishers is null");
if (publishers.length == 0) {
throw new IllegalArgumentException("Zero publishers not supported");
}
Expand Down Expand Up @@ -1427,7 +1432,7 @@ public final <U> ParallelFlowable<U> flatMapIterable(@NonNull Function<? super T
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
public final <R> ParallelFlowable<R> mapOptional(@NonNull Function<? super T, Optional<? extends R>> mapper) {
Objects.requireNonNull(mapper, "mapper");
Objects.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ParallelMapOptional<>(this, mapper));
}

Expand Down Expand Up @@ -1456,7 +1461,7 @@ public final <R> ParallelFlowable<R> mapOptional(@NonNull Function<? super T, Op
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
public final <R> ParallelFlowable<R> mapOptional(@NonNull Function<? super T, Optional<? extends R>> mapper, @NonNull ParallelFailureHandling errorHandler) {
Objects.requireNonNull(mapper, "mapper");
Objects.requireNonNull(mapper, "mapper is null");
Objects.requireNonNull(errorHandler, "errorHandler is null");
return RxJavaPlugins.onAssembly(new ParallelMapTryOptional<>(this, mapper, errorHandler));
}
Expand Down Expand Up @@ -1487,7 +1492,7 @@ public final <R> ParallelFlowable<R> mapOptional(@NonNull Function<? super T, Op
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
public final <R> ParallelFlowable<R> mapOptional(@NonNull Function<? super T, Optional<? extends R>> mapper, @NonNull BiFunction<? super Long, ? super Throwable, ParallelFailureHandling> errorHandler) {
Objects.requireNonNull(mapper, "mapper");
Objects.requireNonNull(mapper, "mapper is null");
Objects.requireNonNull(errorHandler, "errorHandler is null");
return RxJavaPlugins.onAssembly(new ParallelMapTryOptional<>(this, mapper, errorHandler));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public void skipLastWithBackpressure() {

}

@Test(expected = IndexOutOfBoundsException.class)
@Test(expected = IllegalArgumentException.class)
public void skipLastWithNegativeCount() {
Flowable.just("one").skipLast(-1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public void takeLastWithZeroCount() {
verify(subscriber, times(1)).onComplete();
}

@Test(expected = IndexOutOfBoundsException.class)
@Test(expected = IllegalArgumentException.class)
public void takeLastWithNegativeCount() {
Flowable.just("one").takeLast(-1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@

public class FlowableTakeLastTimedTest extends RxJavaTest {

@Test(expected = IndexOutOfBoundsException.class)
@Test(expected = IllegalArgumentException.class)
public void takeLastTimedWithNegativeCount() {
Flowable.just("one").takeLast(-1, 1, TimeUnit.SECONDS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ public void badCapacityHint() throws Exception {
try {
Observable.concatEager(Arrays.asList(source, source, source), 1, -99);
} catch (IllegalArgumentException ex) {
assertEquals("prefetch > 0 required but it was -99", ex.getMessage());
assertEquals("bufferSize > 0 required but it was -99", ex.getMessage());
}

}
Expand All @@ -547,7 +547,7 @@ public void mappingBadCapacityHint() throws Exception {
try {
Observable.just(source, source, source).concatMapEager((Function)Functions.identity(), 10, -99);
} catch (IllegalArgumentException ex) {
assertEquals("prefetch > 0 required but it was -99", ex.getMessage());
assertEquals("bufferSize > 0 required but it was -99", ex.getMessage());
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public void skipLastWithBackpressure() {

}

@Test(expected = IndexOutOfBoundsException.class)
@Test(expected = IllegalArgumentException.class)
public void skipLastWithNegativeCount() {
Observable.just("one").skipLast(-1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public void takeLastWithZeroCount() {
verify(observer, times(1)).onComplete();
}

@Test(expected = IndexOutOfBoundsException.class)
@Test(expected = IllegalArgumentException.class)
public void takeLastWithNegativeCount() {
Observable.just("one").takeLast(-1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

public class ObservableTakeLastTimedTest extends RxJavaTest {

@Test(expected = IndexOutOfBoundsException.class)
@Test(expected = IllegalArgumentException.class)
public void takeLastTimedWithNegativeCount() {
Observable.just("one").takeLast(-1, 1, TimeUnit.SECONDS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,13 @@ public void checkMaybe() throws Exception {
checkSource("Maybe", "io.reactivex.rxjava3.core");
}

@Test
public void checkObservable() throws Exception {
checkSource("Observable", "io.reactivex.rxjava3.core");
}

static void checkSource(String baseClassName, String packageName) throws Exception {
File f = TestHelper.findSource(baseClassName);
File f = TestHelper.findSource(baseClassName, packageName);
if (f == null) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -990,5 +990,5 @@ static void backpressureMentionedWithoutAnnotation(StringBuilder e, RxMethod m,
}
}

static final String[] AT_RETURN_WORDS = { "@return a ", "@return the new ", "@return a new " };
static final String[] AT_RETURN_WORDS = { "@return a ", "@return an ", "@return the new ", "@return a new " };
}
Original file line number Diff line number Diff line change
Expand Up @@ -890,6 +890,12 @@ void checkClass(Class<?> clazz) {
error = ex;
}

if (!success && error.getCause() instanceof NullPointerException) {
if (!error.getCause().toString().contains("is null")) {
fail++;
b.append("\r\nNPEs should indicate which argument failed: " + m + " # " + i + " = " + p + ", tag = " + tag + ", params = " + Arrays.toString(callParams2));
}
}
if (success != shouldSucceed) {
fail++;
if (shouldSucceed) {
Expand Down
Loading

0 comments on commit 1e4dbcf

Please sign in to comment.