diff --git a/src/main/java/io/reactivex/Observable.java b/src/main/java/io/reactivex/Observable.java index 5553074d74..161530ffa5 100644 --- a/src/main/java/io/reactivex/Observable.java +++ b/src/main/java/io/reactivex/Observable.java @@ -3034,7 +3034,7 @@ public static Observable rangeLong(long start, long count) { } /** - * Returns an Observable that emits a Boolean value that indicates whether two ObservableSource sequences are the + * Returns a Single that emits a Boolean value that indicates whether two ObservableSource sequences are the * same by comparing the items emitted by each ObservableSource pairwise. *

* @@ -3049,16 +3049,16 @@ public static Observable rangeLong(long start, long count) { * the second ObservableSource to compare * @param * the type of items emitted by each ObservableSource - * @return an Observable that emits a Boolean value that indicates whether the two sequences are the same + * @return a Single that emits a Boolean value that indicates whether the two sequences are the same * @see ReactiveX operators documentation: SequenceEqual */ @SchedulerSupport(SchedulerSupport.NONE) - public static Observable sequenceEqual(ObservableSource source1, ObservableSource source2) { + public static Single sequenceEqual(ObservableSource source1, ObservableSource source2) { return sequenceEqual(source1, source2, ObjectHelper.equalsPredicate(), bufferSize()); } /** - * Returns an Observable that emits a Boolean value that indicates whether two ObservableSource sequences are the + * Returns a Single that emits a Boolean value that indicates whether two ObservableSource sequences are the * same by comparing the items emitted by each ObservableSource pairwise based on the results of a specified * equality function. *

@@ -3076,18 +3076,18 @@ public static Observable sequenceEqual(ObservableSource * the type of items emitted by each ObservableSource - * @return an Observable that emits a Boolean value that indicates whether the two ObservableSource two sequences + * @return a Single that emits a Boolean value that indicates whether the two ObservableSource two sequences * are the same according to the specified function * @see ReactiveX operators documentation: SequenceEqual */ @SchedulerSupport(SchedulerSupport.NONE) - public static Observable sequenceEqual(ObservableSource source1, ObservableSource source2, + public static Single sequenceEqual(ObservableSource source1, ObservableSource source2, BiPredicate isEqual) { return sequenceEqual(source1, source2, isEqual, bufferSize()); } /** - * Returns an Observable that emits a Boolean value that indicates whether two ObservableSource sequences are the + * Returns a Single that emits a Boolean value that indicates whether two ObservableSource sequences are the * same by comparing the items emitted by each ObservableSource pairwise based on the results of a specified * equality function. *

@@ -3112,17 +3112,17 @@ public static Observable sequenceEqual(ObservableSourceReactiveX operators documentation: SequenceEqual */ @SchedulerSupport(SchedulerSupport.NONE) - public static Observable sequenceEqual(ObservableSource source1, ObservableSource source2, + public static Single sequenceEqual(ObservableSource source1, ObservableSource source2, BiPredicate isEqual, int bufferSize) { ObjectHelper.requireNonNull(source1, "source1 is null"); ObjectHelper.requireNonNull(source2, "source2 is null"); ObjectHelper.requireNonNull(isEqual, "isEqual is null"); ObjectHelper.verifyPositive(bufferSize, "bufferSize"); - return RxJavaPlugins.onAssembly(new ObservableSequenceEqual(source1, source2, isEqual, bufferSize)); + return RxJavaPlugins.onAssembly(new ObservableSequenceEqualSingle(source1, source2, isEqual, bufferSize)); } /** - * Returns an Observable that emits a Boolean value that indicates whether two ObservableSource sequences are the + * Returns a Single that emits a Boolean value that indicates whether two ObservableSource sequences are the * same by comparing the items emitted by each ObservableSource pairwise. *

* @@ -3139,11 +3139,11 @@ public static Observable sequenceEqual(ObservableSource * the type of items emitted by each ObservableSource - * @return an Observable that emits a Boolean value that indicates whether the two sequences are the same + * @return a Single that emits a Boolean value that indicates whether the two sequences are the same * @see ReactiveX operators documentation: SequenceEqual */ @SchedulerSupport(SchedulerSupport.NONE) - public static Observable sequenceEqual(ObservableSource source1, ObservableSource source2, + public static Single sequenceEqual(ObservableSource source1, ObservableSource source2, int bufferSize) { return sequenceEqual(source1, source2, ObjectHelper.equalsPredicate(), bufferSize); } @@ -6319,7 +6319,7 @@ public final Observable distinct(Function keySelector) { public final Observable distinct(Function keySelector, Callable> collectionSupplier) { ObjectHelper.requireNonNull(keySelector, "keySelector is null"); ObjectHelper.requireNonNull(collectionSupplier, "collectionSupplier is null"); - return ObservableDistinct.withCollection(this, keySelector, collectionSupplier); + return new ObservableDistinct(this, keySelector, collectionSupplier); } /** @@ -6338,7 +6338,7 @@ public final Observable distinct(Function keySelector, Call */ @SchedulerSupport(SchedulerSupport.NONE) public final Observable distinctUntilChanged() { - return ObservableDistinct.untilChanged(this); + return new ObservableDistinctUntilChanged(this, Functions.equalsPredicate()); } /** @@ -6362,7 +6362,7 @@ public final Observable distinctUntilChanged() { @SchedulerSupport(SchedulerSupport.NONE) public final Observable distinctUntilChanged(Function keySelector) { ObjectHelper.requireNonNull(keySelector, "keySelector is null"); - return ObservableDistinct.untilChanged(this, keySelector); + return new ObservableDistinctUntilChanged(this, Functions.equalsPredicate(keySelector)); } /** diff --git a/src/main/java/io/reactivex/internal/disposables/ObserverFullArbiter.java b/src/main/java/io/reactivex/internal/disposables/ObserverFullArbiter.java index a2716d4c3c..864038e4da 100644 --- a/src/main/java/io/reactivex/internal/disposables/ObserverFullArbiter.java +++ b/src/main/java/io/reactivex/internal/disposables/ObserverFullArbiter.java @@ -54,7 +54,8 @@ public void dispose() { @Override public boolean isDisposed() { - return cancelled; + Disposable d = resource; + return d != null ? d.isDisposed() : cancelled; } void disposeResource() { diff --git a/src/main/java/io/reactivex/internal/functions/Functions.java b/src/main/java/io/reactivex/internal/functions/Functions.java index 60fe109c92..2c8d6bb7f2 100644 --- a/src/main/java/io/reactivex/internal/functions/Functions.java +++ b/src/main/java/io/reactivex/internal/functions/Functions.java @@ -637,4 +637,29 @@ public static Function, List> listSorter(final Comparator(comparator); } + static final BiPredicate DEFAULT_EQUALS_PREDICATE = equalsPredicate(Functions.identity()); + + @SuppressWarnings("unchecked") + public static BiPredicate equalsPredicate() { + return (BiPredicate)DEFAULT_EQUALS_PREDICATE; + } + + static final class KeyedEqualsPredicate implements BiPredicate { + final Function keySelector; + + KeyedEqualsPredicate(Function keySelector) { + this.keySelector = keySelector; + } + + @Override + public boolean test(T t1, T t2) throws Exception { + K k1 = ObjectHelper.requireNonNull(keySelector.apply(t1), "The keySelector returned a null key"); + K k2 = ObjectHelper.requireNonNull(keySelector.apply(t2), "The keySelector returned a null key"); + return ObjectHelper.equals(k1, k2); + } + } + + public static BiPredicate equalsPredicate(Function keySelector) { + return new KeyedEqualsPredicate(keySelector); + } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableCombineLatest.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableCombineLatest.java index 441a68531f..bb61777cd3 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableCombineLatest.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableCombineLatest.java @@ -13,16 +13,17 @@ package io.reactivex.internal.operators.observable; -import io.reactivex.internal.functions.ObjectHelper; import java.util.Arrays; import java.util.concurrent.atomic.*; import io.reactivex.*; import io.reactivex.disposables.Disposable; -import io.reactivex.exceptions.*; +import io.reactivex.exceptions.Exceptions; import io.reactivex.functions.Function; import io.reactivex.internal.disposables.*; +import io.reactivex.internal.functions.ObjectHelper; import io.reactivex.internal.queue.SpscLinkedArrayQueue; +import io.reactivex.internal.util.AtomicThrowable; import io.reactivex.plugins.RxJavaPlugins; public final class ObservableCombineLatest extends Observable { @@ -86,7 +87,7 @@ static final class LatestCoordinator extends AtomicInteger implements Disp volatile boolean done; - final AtomicReference error = new AtomicReference(); + final AtomicThrowable errors = new AtomicThrowable(); int active; int complete; @@ -181,7 +182,7 @@ void combine(T value, int index) { if (value != null && f) { queue.offer(cs, latest.clone()); } else - if (value == null && error.get() != null) { + if (value == null && errors.get() != null) { done = true; // if this source completed without a value } } else { @@ -227,13 +228,6 @@ void drain() { @SuppressWarnings("unchecked") T[] array = (T[])q.poll(); - if (array == null) { - cancelled = true; - cancel(q); - a.onError(new IllegalStateException("Broken queue?! Sender received but not the array.")); - return; - } - R v; try { v = ObjectHelper.requireNonNull(combiner.apply(array), "The combiner returned a null"); @@ -265,7 +259,7 @@ boolean checkTerminated(boolean d, boolean empty, Observer a, SpscLinkedArray if (delayError) { if (empty) { clear(queue); - Throwable e = error.get(); + Throwable e = errors.terminate(); if (e != null) { a.onError(e); } else { @@ -274,10 +268,10 @@ boolean checkTerminated(boolean d, boolean empty, Observer a, SpscLinkedArray return true; } } else { - Throwable e = error.get(); + Throwable e = errors.get(); if (e != null) { cancel(q); - a.onError(e); + a.onError(errors.terminate()); return true; } else if (empty) { @@ -291,26 +285,16 @@ boolean checkTerminated(boolean d, boolean empty, Observer a, SpscLinkedArray } void onError(Throwable e) { - for (;;) { - Throwable curr = error.get(); - if (curr != null) { - CompositeException ce = new CompositeException(curr, e); - e = ce; - } - Throwable next = e; - if (error.compareAndSet(curr, next)) { - return; - } + if (!errors.addThrowable(e)) { + RxJavaPlugins.onError(e); } } } - static final class CombinerObserver implements Observer, Disposable { + static final class CombinerObserver implements Observer { final LatestCoordinator parent; final int index; - boolean done; - final AtomicReference s = new AtomicReference(); CombinerObserver(LatestCoordinator parent, int index) { @@ -325,40 +309,22 @@ public void onSubscribe(Disposable s) { @Override public void onNext(T t) { - if (done) { - return; - } parent.combine(t, index); } @Override public void onError(Throwable t) { - if (done) { - RxJavaPlugins.onError(t); - return; - } parent.onError(t); - done = true; parent.combine(null, index); } @Override public void onComplete() { - if (done) { - return; - } - done = true; parent.combine(null, index); } - @Override public void dispose() { DisposableHelper.dispose(s); } - - @Override - public boolean isDisposed() { - return s.get() == DisposableHelper.DISPOSED; - } } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableDistinct.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableDistinct.java index d3f0b72886..b9d9fcf7ba 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableDistinct.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableDistinct.java @@ -18,186 +18,122 @@ import io.reactivex.*; import io.reactivex.disposables.Disposable; -import io.reactivex.exceptions.*; -import io.reactivex.functions.*; -import io.reactivex.internal.disposables.*; -import io.reactivex.internal.functions.*; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.Function; +import io.reactivex.internal.disposables.EmptyDisposable; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.internal.fuseable.SimpleQueue; +import io.reactivex.internal.observers.BasicFuseableObserver; +import io.reactivex.plugins.RxJavaPlugins; public final class ObservableDistinct extends AbstractObservableWithUpstream { + final Function keySelector; - final Callable> predicateSupplier; - public ObservableDistinct(ObservableSource source, Function keySelector, Callable> predicateSupplier) { + final Callable> collectionSupplier; + + public ObservableDistinct(ObservableSource source, Function keySelector, Callable> collectionSupplier) { super(source); - this.predicateSupplier = predicateSupplier; this.keySelector = keySelector; + this.collectionSupplier = collectionSupplier; } - public static ObservableDistinct withCollection(ObservableSource source, final Function keySelector, final Callable> collectionSupplier) { - Callable> p = new Callable>() { - @Override - public Predicate call() throws Exception { - final Collection coll = collectionSupplier.call(); - - return new Predicate() { - @Override - public boolean test(K t) { - if (t == null) { - coll.clear(); - return true; - } - return coll.add(t); - } - }; - } - }; - - return new ObservableDistinct(source, keySelector, p); - } - - public static ObservableDistinct untilChanged(ObservableSource source) { - Callable> p = new Callable>() { - @Override - public Predicate call() { - final Object[] last = { null }; - - return new Predicate() { - @Override - public boolean test(T t) { - if (t == null) { - last[0] = null; - return true; - } - Object o = last[0]; - last[0] = t; - return !ObjectHelper.equals(o, t); - } - }; - } - }; - return new ObservableDistinct(source, Functions.identity(), p); - } - - public static ObservableDistinct untilChanged(ObservableSource source, Function keySelector) { - Callable> p = new Callable>() { - @Override - public Predicate call() { - final Object[] last = { null }; - - return new Predicate() { - @Override - public boolean test(K t) { - if (t == null) { - last[0] = null; - return true; - } - Object o = last[0]; - last[0] = t; - return !ObjectHelper.equals(o, t); - } - }; - } - }; - return new ObservableDistinct(source, keySelector, p); - } - - @Override - public void subscribeActual(Observer t) { - Predicate coll; + protected void subscribeActual(Observer observer) { + Collection collection; + try { - coll = ObjectHelper.requireNonNull(predicateSupplier.call(), "predicateSupplier returned null"); - } catch (Throwable e) { - Exceptions.throwIfFatal(e); - EmptyDisposable.error(e, t); + collection = collectionSupplier.call(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + EmptyDisposable.error(ex, observer); return; } - source.subscribe(new DistinctObserver(t, keySelector, coll)); + source.subscribe(new DistinctObserver(observer, keySelector, collection)); } - static final class DistinctObserver implements Observer, Disposable { - final Observer actual; - final Predicate predicate; + static final class DistinctObserver extends BasicFuseableObserver { + + final Collection collection; + final Function keySelector; - Disposable s; + Disposable d; + + SimpleQueue queue; - DistinctObserver(Observer actual, Function keySelector, Predicate predicate) { - this.actual = actual; + DistinctObserver(Observer actual, Function keySelector, Collection collection) { + super(actual); this.keySelector = keySelector; - this.predicate = predicate; + this.collection = collection; } @Override - public void onSubscribe(Disposable s) { - if (DisposableHelper.validate(this.s, s)) { - this.s = s; - actual.onSubscribe(this); + public void onNext(T value) { + if (done) { + return; + } + if (sourceMode == NONE) { + K key; + boolean b; + + try { + key = ObjectHelper.requireNonNull(keySelector.apply(value), "The keySelector returned a null key"); + b = collection.add(key); + } catch (Throwable ex) { + fail(ex); + return; + } + + if (b) { + actual.onNext(value); + } + } else { + actual.onNext(null); } } - @Override - public void dispose() { - s.dispose(); + public void onError(Throwable e) { + if (done) { + RxJavaPlugins.onError(e); + } else { + done = true; + collection.clear(); + actual.onError(e); + } } @Override - public boolean isDisposed() { - return s.isDisposed(); + public void onComplete() { + if (!done) { + done = true; + collection.clear(); + actual.onComplete(); + } } @Override - public void onNext(T t) { - K key; - - try { - key = ObjectHelper.requireNonNull(keySelector.apply(t), "Null key supplied"); - } catch (Throwable e) { - Exceptions.throwIfFatal(e); - s.dispose(); - actual.onError(e); - return; - } - - boolean b; - try { - b = predicate.test(key); - } catch (Throwable e) { - Exceptions.throwIfFatal(e); - s.dispose(); - actual.onError(e); - return; - } - - if (b) { - actual.onNext(t); - } + public int requestFusion(int mode) { + return transitiveBoundaryFusion(mode); } @Override - public void onError(Throwable t) { - try { - predicate.test(null); // special case: poison pill - } catch (Throwable e) { - Exceptions.throwIfFatal(e); - actual.onError(new CompositeException(t, e)); - return; + public T poll() throws Exception { + for (;;) { + T v = qs.poll(); + + if (v == null || collection.add(ObjectHelper.requireNonNull(keySelector.apply(v), "The keySelector returned a null key"))) { + return v; + } } - actual.onError(t); } @Override - public void onComplete() { - try { - predicate.test(null); // special case: poison pill - } catch (Throwable e) { - Exceptions.throwIfFatal(e); - actual.onError(e); - return; - } - actual.onComplete(); + public void clear() { + collection.clear(); + super.clear(); } } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMap.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMap.java index c12a421d9e..3682ad3a41 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMap.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMap.java @@ -23,6 +23,7 @@ import io.reactivex.exceptions.*; import io.reactivex.functions.Function; import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.functions.ObjectHelper; import io.reactivex.internal.fuseable.*; import io.reactivex.internal.queue.*; import io.reactivex.internal.util.*; @@ -109,7 +110,6 @@ public void onSubscribe(Disposable s) { } } - @SuppressWarnings("unchecked") @Override public void onNext(T t) { // safeguard against misbehaving sources @@ -118,34 +118,51 @@ public void onNext(T t) { } ObservableSource p; try { - p = mapper.apply(t); + p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource"); } catch (Throwable e) { Exceptions.throwIfFatal(e); + s.dispose(); onError(e); return; } - if (p instanceof Callable) { - tryEmitScalar(((Callable)p)); - } else { - if (maxConcurrency == Integer.MAX_VALUE) { - subscribeInner(p); - } else { - synchronized (this) { - if (wip == maxConcurrency) { - sources.offer(p); - return; - } - wip++; + + if (maxConcurrency != Integer.MAX_VALUE) { + synchronized (this) { + if (wip == maxConcurrency) { + sources.offer(p); + return; } - subscribeInner(p); + wip++; } } + + subscribeInner(p); } + @SuppressWarnings("unchecked") void subscribeInner(ObservableSource p) { - InnerObserver inner = new InnerObserver(this, uniqueId++); - addInner(inner); - p.subscribe(inner); + for (;;) { + if (p instanceof Callable) { + tryEmitScalar(((Callable)p)); + + if (maxConcurrency != Integer.MAX_VALUE) { + synchronized (this) { + p = sources.poll(); + if (p == null) { + wip--; + break; + } + } + } else { + break; + } + } else { + InnerObserver inner = new InnerObserver(this, uniqueId++); + addInner(inner); + p.subscribe(inner); + break; + } + } } void addInner(InnerObserver inner) { @@ -246,7 +263,7 @@ void tryEmitScalar(Callable value) { SimpleQueue getInnerQueue(InnerObserver inner) { SimpleQueue q = inner.queue; if (q == null) { - q = new SpscArrayQueue(bufferSize); + q = new SpscLinkedArrayQueue(bufferSize); inner.queue = q; } return q; @@ -264,10 +281,7 @@ void tryEmit(U value, InnerObserver inner) { q = new SpscLinkedArrayQueue(bufferSize); inner.queue = q; } - if (!q.offer(value)) { - onError(new MissingBackpressureException("Inner queue full?!")); - return; - } + q.offer(value); if (getAndIncrement() != 0) { return; } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableInternalHelper.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableInternalHelper.java index 6788a85f5f..eb9d7a6f98 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableInternalHelper.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableInternalHelper.java @@ -23,8 +23,11 @@ /** * Helper utility class to support Observable with inner classes. */ -public enum ObservableInternalHelper { - ; +public final class ObservableInternalHelper { + + private ObservableInternalHelper() { + throw new IllegalStateException("No instances!"); + } static final class SimpleGenerator implements BiFunction, S> { final Consumer> consumer; diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableSequenceEqual.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableSequenceEqual.java index d6a4c9bf31..ef227d6f47 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableSequenceEqual.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableSequenceEqual.java @@ -236,10 +236,7 @@ public void onSubscribe(Disposable s) { @Override public void onNext(T t) { - if (!queue.offer(t)) { - onError(new IllegalStateException("Queue full?!")); - return; - } + queue.offer(t); parent.drain(); } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableSequenceEqualSingle.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableSequenceEqualSingle.java new file mode 100644 index 0000000000..256beadbbc --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableSequenceEqualSingle.java @@ -0,0 +1,260 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.observable; + +import java.util.concurrent.atomic.AtomicInteger; + +import io.reactivex.*; +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.BiPredicate; +import io.reactivex.internal.disposables.ArrayCompositeDisposable; +import io.reactivex.internal.fuseable.FuseToObservable; +import io.reactivex.internal.queue.SpscLinkedArrayQueue; +import io.reactivex.plugins.RxJavaPlugins; + +public final class ObservableSequenceEqualSingle extends Single implements FuseToObservable { + final ObservableSource first; + final ObservableSource second; + final BiPredicate comparer; + final int bufferSize; + + public ObservableSequenceEqualSingle(ObservableSource first, ObservableSource second, + BiPredicate comparer, int bufferSize) { + this.first = first; + this.second = second; + this.comparer = comparer; + this.bufferSize = bufferSize; + } + + @Override + public void subscribeActual(SingleObserver s) { + EqualCoordinator ec = new EqualCoordinator(s, bufferSize, first, second, comparer); + s.onSubscribe(ec); + ec.subscribe(); + } + + @Override + public Observable fuseToObservable() { + return RxJavaPlugins.onAssembly(new ObservableSequenceEqual(first, second, comparer, bufferSize)); + } + + static final class EqualCoordinator extends AtomicInteger implements Disposable { + + private static final long serialVersionUID = -6178010334400373240L; + final SingleObserver actual; + final BiPredicate comparer; + final ArrayCompositeDisposable resources; + final ObservableSource first; + final ObservableSource second; + final EqualObserver[] observers; + + volatile boolean cancelled; + + T v1; + + T v2; + + EqualCoordinator(SingleObserver actual, int bufferSize, + ObservableSource first, ObservableSource second, + BiPredicate comparer) { + this.actual = actual; + this.first = first; + this.second = second; + this.comparer = comparer; + @SuppressWarnings("unchecked") + EqualObserver[] as = new EqualObserver[2]; + this.observers = as; + as[0] = new EqualObserver(this, 0, bufferSize); + as[1] = new EqualObserver(this, 1, bufferSize); + this.resources = new ArrayCompositeDisposable(2); + } + + boolean setDisposable(Disposable s, int index) { + return resources.setResource(index, s); + } + + void subscribe() { + EqualObserver[] as = observers; + first.subscribe(as[0]); + second.subscribe(as[1]); + } + + @Override + public void dispose() { + if (!cancelled) { + cancelled = true; + resources.dispose(); + + if (getAndIncrement() == 0) { + EqualObserver[] as = observers; + as[0].queue.clear(); + as[1].queue.clear(); + } + } + } + + @Override + public boolean isDisposed() { + return cancelled; + } + + void cancel(SpscLinkedArrayQueue q1, SpscLinkedArrayQueue q2) { + cancelled = true; + q1.clear(); + q2.clear(); + } + + void drain() { + if (getAndIncrement() != 0) { + return; + } + + int missed = 1; + EqualObserver[] as = observers; + + final EqualObserver s1 = as[0]; + final SpscLinkedArrayQueue q1 = s1.queue; + final EqualObserver s2 = as[1]; + final SpscLinkedArrayQueue q2 = s2.queue; + + for (;;) { + + for (;;) { + if (cancelled) { + q1.clear(); + q2.clear(); + return; + } + + boolean d1 = s1.done; + + if (d1) { + Throwable e = s1.error; + if (e != null) { + cancel(q1, q2); + + actual.onError(e); + return; + } + } + + boolean d2 = s2.done; + if (d2) { + Throwable e = s2.error; + if (e != null) { + cancel(q1, q2); + + actual.onError(e); + return; + } + } + + if (v1 == null) { + v1 = q1.poll(); + } + boolean e1 = v1 == null; + + if (v2 == null) { + v2 = q2.poll(); + } + boolean e2 = v2 == null; + + if (d1 && d2 && e1 && e2) { + actual.onSuccess(true); + return; + } + if ((d1 && d2) && (e1 != e2)) { + cancel(q1, q2); + + actual.onSuccess(false); + return; + } + + if (!e1 && !e2) { + boolean c; + + try { + c = comparer.test(v1, v2); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + cancel(q1, q2); + + actual.onError(ex); + return; + } + + if (!c) { + cancel(q1, q2); + + actual.onSuccess(false); + return; + } + + v1 = null; + v2 = null; + } + + if (e1 || e2) { + break; + } + } + + missed = addAndGet(-missed); + if (missed == 0) { + break; + } + } + } + } + + static final class EqualObserver implements Observer { + final EqualCoordinator parent; + final SpscLinkedArrayQueue queue; + final int index; + + volatile boolean done; + Throwable error; + + EqualObserver(EqualCoordinator parent, int index, int bufferSize) { + this.parent = parent; + this.index = index; + this.queue = new SpscLinkedArrayQueue(bufferSize); + } + + @Override + public void onSubscribe(Disposable s) { + parent.setDisposable(s, index); + } + + @Override + public void onNext(T t) { + queue.offer(t); + parent.drain(); + } + + @Override + public void onError(Throwable t) { + error = t; + done = true; + parent.drain(); + } + + @Override + public void onComplete() { + done = true; + parent.drain(); + } + } +} diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableSwitchMap.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableSwitchMap.java index c1f34feba3..af6798c817 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableSwitchMap.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableSwitchMap.java @@ -13,7 +13,6 @@ package io.reactivex.internal.operators.observable; -import io.reactivex.internal.functions.ObjectHelper; import java.util.concurrent.atomic.*; import io.reactivex.*; @@ -21,7 +20,8 @@ import io.reactivex.exceptions.Exceptions; import io.reactivex.functions.Function; import io.reactivex.internal.disposables.DisposableHelper; -import io.reactivex.internal.queue.SpscArrayQueue; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.internal.queue.*; import io.reactivex.internal.util.AtomicThrowable; import io.reactivex.plugins.RxJavaPlugins; @@ -144,11 +144,10 @@ public void onError(Throwable t) { @Override public void onComplete() { - if (done) { - return; + if (!done) { + done = true; + drain(); } - done = true; - drain(); } @Override @@ -219,7 +218,7 @@ void drain() { SwitchMapInnerObserver inner = active.get(); if (inner != null) { - SpscArrayQueue q = inner.queue; + SpscLinkedArrayQueue q = inner.queue; if (inner.done) { boolean empty = q.isEmpty(); @@ -252,17 +251,7 @@ void drain() { break; } - boolean d = inner.done; - R v = q.poll(); - boolean empty = v == null; - - if (d) { - if (delayErrors || empty) { - active.compareAndSet(inner, null); - retry = true; - break; - } - + if (!delayErrors) { Throwable ex = errors.get(); if (ex != null) { a.onError(errors.terminate()); @@ -270,6 +259,16 @@ void drain() { } } + boolean d = inner.done; + R v = q.poll(); + boolean empty = v == null; + + if (d && empty) { + active.compareAndSet(inner, null); + retry = true; + break; + } + if (empty) { break; } @@ -307,32 +306,25 @@ static final class SwitchMapInnerObserver extends AtomicReference parent; final long index; - final SpscArrayQueue queue; + final SpscLinkedArrayQueue queue; volatile boolean done; SwitchMapInnerObserver(SwitchMapObserver parent, long index, int bufferSize) { this.parent = parent; this.index = index; - this.queue = new SpscArrayQueue(bufferSize); + this.queue = new SpscLinkedArrayQueue(bufferSize); } @Override public void onSubscribe(Disposable s) { - if (index == parent.unique) { - DisposableHelper.setOnce(this, s); - } else { - s.dispose(); - } + DisposableHelper.setOnce(this, s); } @Override public void onNext(R t) { if (index == parent.unique) { - if (!queue.offer(t)) { - onError(new IllegalStateException("Queue full?!")); - return; - } + queue.offer(t); parent.drain(); } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableTimeInterval.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableTimeInterval.java index 3a29ef0382..0f9976bb1d 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableTimeInterval.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableTimeInterval.java @@ -53,6 +53,7 @@ static final class TimeIntervalObserver implements Observer, Disposable { @Override public void onSubscribe(Disposable s) { if (DisposableHelper.validate(this.s, s)) { + this.s = s; lastTime = scheduler.now(unit); actual.onSubscribe(this); } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableTimeoutTimed.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableTimeoutTimed.java index a0db7da851..e5a4b6b5f6 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableTimeoutTimed.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableTimeoutTimed.java @@ -30,6 +30,17 @@ public final class ObservableTimeoutTimed extends AbstractObservableWithUpstr final Scheduler scheduler; final ObservableSource other; + + static final Disposable NEW_TIMER = new Disposable() { + @Override + public void dispose() { } + + @Override + public boolean isDisposed() { + return true; + } + }; + public ObservableTimeoutTimed(ObservableSource source, long timeout, TimeUnit unit, Scheduler scheduler, ObservableSource other) { super(source); @@ -52,7 +63,10 @@ public void subscribeActual(Observer t) { } } - static final class TimeoutTimedOtherObserver implements Observer, Disposable { + static final class TimeoutTimedOtherObserver + extends AtomicReference implements Observer, Disposable { + private static final long serialVersionUID = -4619702551964128179L; + final Observer actual; final long timeout; final TimeUnit unit; @@ -63,18 +77,6 @@ static final class TimeoutTimedOtherObserver implements Observer, Disposab final ObserverFullArbiter arbiter; - final AtomicReference timer = new AtomicReference(); - - static final Disposable NEW_TIMER = new Disposable() { - @Override - public void dispose() { } - - @Override - public boolean isDisposed() { - return true; - } - }; - volatile long index; volatile boolean done; @@ -116,33 +118,28 @@ public void onNext(T t) { } void scheduleTimeout(final long idx) { - Disposable d = timer.get(); + Disposable d = get(); if (d != null) { d.dispose(); } - if (timer.compareAndSet(d, NEW_TIMER)) { + if (compareAndSet(d, NEW_TIMER)) { d = worker.schedule(new Runnable() { @Override public void run() { if (idx == index) { done = true; s.dispose(); - DisposableHelper.dispose(timer); - worker.dispose(); + DisposableHelper.dispose(TimeoutTimedOtherObserver.this); - if (other == null) { - actual.onError(new TimeoutException()); - } else { - subscribeNext(); - } + subscribeNext(); + + worker.dispose(); } } }, timeout, unit); - if (!timer.compareAndSet(NEW_TIMER, d)) { - d.dispose(); - } + DisposableHelper.replace(this, d); } } @@ -158,7 +155,7 @@ public void onError(Throwable t) { } done = true; worker.dispose(); - DisposableHelper.dispose(timer); + DisposableHelper.dispose(this); arbiter.onError(t, s); } @@ -169,23 +166,27 @@ public void onComplete() { } done = true; worker.dispose(); - DisposableHelper.dispose(timer); + DisposableHelper.dispose(this); arbiter.onComplete(s); } @Override public void dispose() { worker.dispose(); - DisposableHelper.dispose(timer); + DisposableHelper.dispose(this); } @Override public boolean isDisposed() { - return worker.isDisposed(); + return DisposableHelper.isDisposed(get()); } } - static final class TimeoutTimedObserver implements Observer, Disposable { + static final class TimeoutTimedObserver + extends AtomicReference + implements Observer, Disposable { + private static final long serialVersionUID = -8387234228317808253L; + final Observer actual; final long timeout; final TimeUnit unit; @@ -193,18 +194,6 @@ static final class TimeoutTimedObserver implements Observer, Disposable { Disposable s; - final AtomicReference timer = new AtomicReference(); - - static final Disposable NEW_TIMER = new Disposable() { - @Override - public void dispose() { } - - @Override - public boolean isDisposed() { - return true; - } - }; - volatile long index; volatile boolean done; @@ -240,27 +229,28 @@ public void onNext(T t) { } void scheduleTimeout(final long idx) { - Disposable d = timer.get(); + Disposable d = get(); if (d != null) { d.dispose(); } - if (timer.compareAndSet(d, NEW_TIMER)) { + if (compareAndSet(d, NEW_TIMER)) { d = worker.schedule(new Runnable() { @Override public void run() { if (idx == index) { done = true; - dispose(); + DisposableHelper.dispose(TimeoutTimedObserver.this); + s.dispose(); actual.onError(new TimeoutException()); + + worker.dispose(); } } }, timeout, unit); - if (!timer.compareAndSet(NEW_TIMER, d)) { - d.dispose(); - } + DisposableHelper.replace(this, d); } } @@ -290,13 +280,13 @@ public void onComplete() { @Override public void dispose() { worker.dispose(); - DisposableHelper.dispose(timer); + DisposableHelper.dispose(this); s.dispose(); } @Override public boolean isDisposed() { - return worker.isDisposed(); + return DisposableHelper.isDisposed(get()); } } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableToListSingle.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableToListSingle.java index 9c4d5180a1..4a38df5729 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableToListSingle.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableToListSingle.java @@ -22,6 +22,7 @@ import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.Exceptions; import io.reactivex.internal.disposables.*; +import io.reactivex.internal.functions.Functions; import io.reactivex.internal.fuseable.FuseToObservable; import io.reactivex.plugins.RxJavaPlugins; @@ -32,15 +33,10 @@ public final class ObservableToListSingle> final Callable collectionSupplier; + @SuppressWarnings({ "unchecked", "rawtypes" }) public ObservableToListSingle(ObservableSource source, final int defaultCapacityHint) { this.source = source; - this.collectionSupplier = new Callable() { - @Override - @SuppressWarnings("unchecked") - public U call() throws Exception { - return (U)new ArrayList(defaultCapacityHint); - } - }; + this.collectionSupplier = (Callable)Functions.createArrayList(defaultCapacityHint); } public ObservableToListSingle(ObservableSource source, Callable collectionSupplier) { diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeConcatArrayTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeConcatArrayTest.java index b79711e279..2d9e1aeaf8 100644 --- a/src/test/java/io/reactivex/internal/operators/maybe/MaybeConcatArrayTest.java +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeConcatArrayTest.java @@ -1,9 +1,16 @@ /** - * Concatenate values of each MaybeSource provided in an array and delays - * any errors till the very end. + * Copyright 2016 Netflix, Inc. * - * @param the value type + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. */ + package io.reactivex.internal.operators.maybe; import java.io.IOException; diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableCombineLatestTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableCombineLatestTest.java index ba7f612cb5..acb32970ef 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableCombineLatestTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableCombineLatestTest.java @@ -14,6 +14,7 @@ package io.reactivex.internal.operators.observable; import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; import java.util.*; @@ -26,10 +27,11 @@ import io.reactivex.*; import io.reactivex.Observable; import io.reactivex.Observer; -import io.reactivex.exceptions.TestException; +import io.reactivex.exceptions.*; import io.reactivex.functions.*; import io.reactivex.internal.functions.Functions; import io.reactivex.observers.*; +import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.schedulers.Schedulers; import io.reactivex.subjects.PublishSubject; @@ -860,4 +862,158 @@ public void combineLatestEmpty() { public void combineLatestDelayErrorEmpty() { assertSame(Observable.empty(), Observable.combineLatestDelayError(new ObservableSource[0], Functions.identity(), 16)); } + + @Test + public void disposed() { + TestHelper.checkDisposed(Observable.combineLatest(Observable.never(), Observable.never(), new BiFunction() { + @Override + public Object apply(Object a, Object b) throws Exception { + return a; + } + })); + } + + @Test + public void cancelWhileSubscribing() { + final TestObserver to = new TestObserver(); + + Observable.combineLatest( + Observable.just(1) + .doOnNext(new Consumer() { + @Override + public void accept(Integer v) throws Exception { + to.cancel(); + } + }), + Observable.never(), + new BiFunction() { + @Override + public Object apply(Object a, Object b) throws Exception { + return a; + } + }) + .subscribe(to); + } + + @Test + public void combineAsync() { + Observable source = Observable.range(1, 1000).subscribeOn(Schedulers.computation()); + + Observable.combineLatest(source, source, new BiFunction() { + @Override + public Object apply(Object a, Object b) throws Exception { + return a; + } + }) + .take(500) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void error() { + Observable.combineLatest(Observable.never(), Observable.error(new TestException()), new BiFunction() { + @Override + public Object apply(Object a, Object b) throws Exception { + return a; + } + }) + .test() + .assertFailure(TestException.class); + } + + @SuppressWarnings("unchecked") + @Test + public void errorDelayed() { + Observable.combineLatestDelayError( + new Function() { + @Override + public Object apply(Object[] a) throws Exception { + return a; + } + }, + 128, + Observable.error(new TestException()), + Observable.just(1) + ) + .test() + .assertFailure(TestException.class); + } + + @SuppressWarnings("unchecked") + @Test + public void errorDelayed2() { + Observable.combineLatestDelayError( + new Function() { + @Override + public Object apply(Object[] a) throws Exception { + return a; + } + }, + 128, + Observable.error(new TestException()).startWith(1), + Observable.empty() + ) + .test() + .assertFailure(TestException.class); + } + + @Test + public void onErrorRace() { + for (int i = 0; i < 500; i++) { + List errors = TestHelper.trackPluginErrors(); + try { + final PublishSubject ps1 = PublishSubject.create(); + final PublishSubject ps2 = PublishSubject.create(); + + TestObserver to = Observable.combineLatest(ps1, ps2, new BiFunction() { + @Override + public Integer apply(Integer a, Integer b) throws Exception { + return a; + } + }).test(); + + final TestException ex1 = new TestException(); + final TestException ex2 = new TestException(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ps1.onError(ex1); + } + }; + Runnable r2 = new Runnable() { + @Override + public void run() { + ps2.onError(ex2); + } + }; + + TestHelper.race(r1, r2); + + if (to.errorCount() != 0) { + if (to.errors().get(0) instanceof CompositeException) { + to.assertSubscribed() + .assertNotComplete() + .assertNoValues(); + + for (Throwable e : TestHelper.errorList(to)) { + assertTrue(e.toString(), e instanceof TestException); + } + + } else { + to.assertFailure(TestException.class); + } + } + + for (Throwable e : errors) { + assertTrue(e.toString(), e instanceof TestException); + } + } finally { + RxJavaPlugins.reset(); + } + } + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableCountTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableCountTest.java new file mode 100644 index 0000000000..348a2a610b --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableCountTest.java @@ -0,0 +1,46 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.observable; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.functions.Function; + +public class ObservableCountTest { + + @Test + public void dispose() { + TestHelper.checkDisposed(Observable.just(1).count()); + + TestHelper.checkDisposed(Observable.just(1).count().toObservable()); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeObservable(new Function, ObservableSource>() { + @Override + public ObservableSource apply(Observable o) throws Exception { + return o.count().toObservable(); + } + }); + + TestHelper.checkDoubleOnSubscribeObservableToSingle(new Function, SingleSource>() { + @Override + public SingleSource apply(Observable o) throws Exception { + return o.count(); + } + }); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableDistinctTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableDistinctTest.java index b1c991668c..2f57dc97aa 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableDistinctTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableDistinctTest.java @@ -13,13 +13,27 @@ package io.reactivex.internal.operators.observable; +import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; +import java.util.*; +import java.util.concurrent.Callable; + import org.junit.*; import org.mockito.InOrder; -import io.reactivex.*; +import io.reactivex.Observable; +import io.reactivex.Observer; +import io.reactivex.TestHelper; +import io.reactivex.disposables.*; +import io.reactivex.exceptions.TestException; import io.reactivex.functions.Function; +import io.reactivex.internal.functions.Functions; +import io.reactivex.internal.fuseable.QueueDisposable; +import io.reactivex.observers.*; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.subjects.UnicastSubject; public class ObservableDistinctTest { @@ -119,4 +133,109 @@ public void testDistinctOfSourceWithExceptionsFromKeySelector() { inOrder.verify(w, never()).onNext(anyString()); inOrder.verify(w, never()).onComplete(); } + + @Test + public void error() { + Observable.error(new TestException()) + .distinct() + .test() + .assertFailure(TestException.class); + } + + @Test + public void fusedSync() { + TestObserver to = ObserverFusion.newTest(QueueDisposable.ANY); + + Observable.just(1, 1, 2, 1, 3, 2, 4, 5, 4) + .distinct() + .subscribe(to); + + ObserverFusion.assertFusion(to, QueueDisposable.SYNC) + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void fusedAsync() { + TestObserver to = ObserverFusion.newTest(QueueDisposable.ANY); + + UnicastSubject us = UnicastSubject.create(); + + us + .distinct() + .subscribe(to); + + TestHelper.emit(us, 1, 1, 2, 1, 3, 2, 4, 5, 4); + + ObserverFusion.assertFusion(to, QueueDisposable.ASYNC) + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void fusedClear() { + Observable.just(1, 1, 2, 1, 3, 2, 4, 5, 4) + .distinct() + .subscribe(new Observer() { + @Override + public void onSubscribe(Disposable d) { + QueueDisposable qd = (QueueDisposable)d; + + assertFalse(qd.isEmpty()); + + qd.clear(); + + assertTrue(qd.isEmpty()); + } + + @Override + public void onNext(Integer value) { + } + + @Override + public void onError(Throwable e) { + } + + @Override + public void onComplete() { + } + }); + } + + @Test + public void collectionSupplierThrows() { + Observable.just(1) + .distinct(Functions.identity(), new Callable>() { + @Override + public Collection call() throws Exception { + throw new TestException(); + } + }) + .test() + .assertFailure(TestException.class); + } + + @Test + public void badSource() { + List errors = TestHelper.trackPluginErrors(); + try { + new Observable() { + @Override + protected void subscribeActual(Observer observer) { + observer.onSubscribe(Disposables.empty()); + + observer.onNext(1); + observer.onComplete(); + observer.onNext(2); + observer.onError(new TestException()); + observer.onComplete(); + } + } + .distinct() + .test() + .assertResult(1); + + TestHelper.assertError(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapTest.java index ccfd2cdfff..5d39b0c6ed 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapTest.java @@ -30,6 +30,7 @@ import io.reactivex.functions.*; import io.reactivex.observers.TestObserver; import io.reactivex.schedulers.Schedulers; +import io.reactivex.subjects.PublishSubject; public class ObservableFlatMapTest { @Test @@ -608,4 +609,108 @@ public ObservableSource apply(Object v) throws Exception { } })); } + + @Test + public void mergeScalar() { + Observable.merge(Observable.just(Observable.just(1))) + .test() + .assertResult(1); + } + + @Test + public void mergeScalar2() { + Observable.merge(Observable.just(Observable.just(1)).hide()) + .test() + .assertResult(1); + } + + @Test + public void mergeScalarEmpty() { + Observable.merge(Observable.just(Observable.empty()).hide()) + .test() + .assertResult(); + } + + @Test + public void mergeScalarError() { + Observable.merge(Observable.just(Observable.fromCallable(new Callable() { + @Override + public Object call() throws Exception { + throw new TestException(); + } + })).hide()) + .test() + .assertFailure(TestException.class); + } + + @Test + public void scalarReentrant() { + final PublishSubject> ps = PublishSubject.create(); + + TestObserver to = new TestObserver() { + @Override + public void onNext(Integer t) { + super.onNext(t); + if (t == 1) { + ps.onNext(Observable.just(2)); + } + } + }; + + Observable.merge(ps) + .subscribe(to); + + ps.onNext(Observable.just(1)); + ps.onComplete(); + + to.assertResult(1, 2); + } + + @Test + public void scalarReentrant2() { + final PublishSubject> ps = PublishSubject.create(); + + TestObserver to = new TestObserver() { + @Override + public void onNext(Integer t) { + super.onNext(t); + if (t == 1) { + ps.onNext(Observable.just(2)); + } + } + }; + + Observable.merge(ps, 2) + .subscribe(to); + + ps.onNext(Observable.just(1)); + ps.onComplete(); + + to.assertResult(1, 2); + } + + @Test + public void innerCompleteCancelRace() { + for (int i = 0; i < 500; i++) { + final PublishSubject ps = PublishSubject.create(); + + final TestObserver to = Observable.merge(Observable.just(ps)).test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ps.onComplete(); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + to.cancel(); + } + }; + + TestHelper.race(r1, r2); + } + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableFromIterableTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableFromIterableTest.java index 7a78076e40..90bcff729a 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableFromIterableTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableFromIterableTest.java @@ -13,7 +13,8 @@ package io.reactivex.internal.operators.observable; -import static org.junit.Assert.assertFalse; +import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; import java.util.*; @@ -25,7 +26,11 @@ import io.reactivex.*; import io.reactivex.Observable; import io.reactivex.Observer; +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.TestException; import io.reactivex.functions.Function; +import io.reactivex.internal.fuseable.QueueDisposable; +import io.reactivex.internal.util.CrashingIterable; import io.reactivex.observers.*; public class ObservableFromIterableTest { @@ -243,4 +248,105 @@ public ObservableSource apply(Integer v) { to.assertNoErrors(); to.assertComplete(); } + + @Test + public void iteratorThrows() { + Observable.fromIterable(new CrashingIterable(1, 100, 100)) + .test() + .assertFailureAndMessage(TestException.class, "iterator()"); + } + + @Test + public void hasNext2Throws() { + Observable.fromIterable(new CrashingIterable(100, 2, 100)) + .test() + .assertFailureAndMessage(TestException.class, "hasNext()", 0); + } + + @Test + public void hasNextCancels() { + final TestObserver to = new TestObserver(); + + Observable.fromIterable(new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + int count; + + @Override + public boolean hasNext() { + if (++count == 2) { + to.cancel(); + } + return true; + } + + @Override + public Integer next() { + return 1; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }) + .subscribe(to); + + to.assertValue(1) + .assertNoErrors() + .assertNotComplete(); + } + + @Test + public void fusionRejected() { + TestObserver to = ObserverFusion.newTest(QueueDisposable.ASYNC); + + Observable.fromIterable(Arrays.asList(1, 2, 3)) + .subscribe(to); + + ObserverFusion.assertFusion(to, QueueDisposable.NONE) + .assertResult(1, 2, 3); + } + + @Test + public void fusionClear() { + Observable.fromIterable(Arrays.asList(1, 2, 3)) + .subscribe(new Observer() { + @Override + public void onSubscribe(Disposable d) { + @SuppressWarnings("unchecked") + QueueDisposable qd = (QueueDisposable)d; + + qd.requestFusion(QueueDisposable.ANY); + + try { + assertEquals(1, qd.poll().intValue()); + } catch (Throwable ex) { + fail(ex.toString()); + } + + qd.clear(); + try { + assertNull(qd.poll()); + } catch (Throwable ex) { + fail(ex.toString()); + } + } + + @Override + public void onNext(Integer value) { + } + + @Override + public void onError(Throwable e) { + } + + @Override + public void onComplete() { + } + }); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableInternalHelperTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableInternalHelperTest.java new file mode 100644 index 0000000000..1c769bcbba --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableInternalHelperTest.java @@ -0,0 +1,40 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ +package io.reactivex.internal.operators.observable; + +import static org.junit.Assert.*; +import org.junit.Test; + +import io.reactivex.TestHelper; + +public class ObservableInternalHelperTest { + + @Test + public void utilityClass() { + TestHelper.checkUtilityClass(ObservableInternalHelper.class); + } + + @Test + public void enums() { + assertNotNull(ObservableInternalHelper.MapToInt.values()[0]); + assertNotNull(ObservableInternalHelper.MapToInt.valueOf("INSTANCE")); + + assertNotNull(ObservableInternalHelper.ErrorMapperFilter.values()[0]); + assertNotNull(ObservableInternalHelper.ErrorMapperFilter.valueOf("INSTANCE")); + } + + @Test + public void mapToInt() throws Exception { + assertEquals(0, ObservableInternalHelper.MapToInt.INSTANCE.apply(null)); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableLastTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableLastTest.java index 9dfa0abbec..ed20ed08b2 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableLastTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableLastTest.java @@ -299,7 +299,14 @@ public void lastOrErrorError() { @Test public void dispose() { + TestHelper.checkDisposed(Observable.never().lastElement().toObservable()); TestHelper.checkDisposed(Observable.never().lastElement()); + + TestHelper.checkDisposed(Observable.just(1).lastOrError().toObservable()); + TestHelper.checkDisposed(Observable.just(1).lastOrError()); + + TestHelper.checkDisposed(Observable.just(1).last(2).toObservable()); + TestHelper.checkDisposed(Observable.just(1).last(2)); } @Test @@ -310,6 +317,38 @@ public MaybeSource apply(Observable o) throws Exception { return o.lastElement(); } }); + TestHelper.checkDoubleOnSubscribeObservable(new Function, ObservableSource>() { + @Override + public ObservableSource apply(Observable o) throws Exception { + return o.lastElement().toObservable(); + } + }); + + TestHelper.checkDoubleOnSubscribeObservableToSingle(new Function, SingleSource>() { + @Override + public SingleSource apply(Observable o) throws Exception { + return o.lastOrError(); + } + }); + TestHelper.checkDoubleOnSubscribeObservable(new Function, ObservableSource>() { + @Override + public ObservableSource apply(Observable o) throws Exception { + return o.lastOrError().toObservable(); + } + }); + + TestHelper.checkDoubleOnSubscribeObservableToSingle(new Function, SingleSource>() { + @Override + public SingleSource apply(Observable o) throws Exception { + return o.last(2); + } + }); + TestHelper.checkDoubleOnSubscribeObservable(new Function, ObservableSource>() { + @Override + public ObservableSource apply(Observable o) throws Exception { + return o.last(2).toObservable(); + } + }); } @Test diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableSequenceEqualTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableSequenceEqualTest.java index 452d9bf6b8..ca2af6dc14 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableSequenceEqualTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableSequenceEqualTest.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.observable; +import static org.mockito.ArgumentMatchers.isA; import static org.mockito.Mockito.*; import org.junit.*; @@ -21,12 +22,159 @@ import io.reactivex.*; import io.reactivex.exceptions.TestException; import io.reactivex.functions.BiPredicate; +import io.reactivex.observers.TestObserver; +import io.reactivex.subjects.PublishSubject; public class ObservableSequenceEqualTest { @Test - public void test1() { + public void test1Observable() { + Observable o = Observable.sequenceEqual( + Observable.just("one", "two", "three"), + Observable.just("one", "two", "three")).toObservable(); + verifyResult(o, true); + } + + @Test + public void test2Observable() { + Observable o = Observable.sequenceEqual( + Observable.just("one", "two", "three"), + Observable.just("one", "two", "three", "four")).toObservable(); + verifyResult(o, false); + } + + @Test + public void test3Observable() { + Observable o = Observable.sequenceEqual( + Observable.just("one", "two", "three", "four"), + Observable.just("one", "two", "three")).toObservable(); + verifyResult(o, false); + } + + @Test + public void testWithError1Observable() { + Observable o = Observable.sequenceEqual( + Observable.concat(Observable.just("one"), + Observable. error(new TestException())), + Observable.just("one", "two", "three")).toObservable(); + verifyError(o); + } + + @Test + public void testWithError2Observable() { + Observable o = Observable.sequenceEqual( + Observable.just("one", "two", "three"), + Observable.concat(Observable.just("one"), + Observable. error(new TestException()))).toObservable(); + verifyError(o); + } + + @Test + public void testWithError3Observable() { + Observable o = Observable.sequenceEqual( + Observable.concat(Observable.just("one"), + Observable. error(new TestException())), + Observable.concat(Observable.just("one"), + Observable. error(new TestException()))).toObservable(); + verifyError(o); + } + + @Test + public void testWithEmpty1Observable() { + Observable o = Observable.sequenceEqual( + Observable. empty(), + Observable.just("one", "two", "three")).toObservable(); + verifyResult(o, false); + } + + @Test + public void testWithEmpty2Observable() { + Observable o = Observable.sequenceEqual( + Observable.just("one", "two", "three"), + Observable. empty()).toObservable(); + verifyResult(o, false); + } + + @Test + public void testWithEmpty3Observable() { + Observable o = Observable.sequenceEqual( + Observable. empty(), Observable. empty()).toObservable(); + verifyResult(o, true); + } + + @Test + @Ignore("Null values not allowed") + public void testWithNull1Observable() { + Observable o = Observable.sequenceEqual( + Observable.just((String) null), Observable.just("one")).toObservable(); + verifyResult(o, false); + } + + @Test + @Ignore("Null values not allowed") + public void testWithNull2Observable() { + Observable o = Observable.sequenceEqual( + Observable.just((String) null), Observable.just((String) null)).toObservable(); + verifyResult(o, true); + } + + @Test + public void testWithEqualityErrorObservable() { Observable o = Observable.sequenceEqual( + Observable.just("one"), Observable.just("one"), + new BiPredicate() { + @Override + public boolean test(String t1, String t2) { + throw new TestException(); + } + }).toObservable(); + verifyError(o); + } + + private void verifyResult(Single o, boolean result) { + SingleObserver observer = TestHelper.mockSingleObserver(); + + o.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onSuccess(result); + inOrder.verifyNoMoreInteractions(); + } + + private void verifyError(Observable observable) { + Observer observer = TestHelper.mockObserver(); + observable.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onError(isA(TestException.class)); + inOrder.verifyNoMoreInteractions(); + } + + private void verifyError(Single observable) { + SingleObserver observer = TestHelper.mockSingleObserver(); + observable.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onError(isA(TestException.class)); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void prefetchObservable() { + Observable.sequenceEqual(Observable.range(1, 20), Observable.range(1, 20), 2) + .toObservable() + .test() + .assertResult(true); + } + + @Test + public void disposedObservable() { + TestHelper.checkDisposed(Observable.sequenceEqual(Observable.just(1), Observable.just(2)).toObservable()); + } + + @Test + public void test1() { + Single o = Observable.sequenceEqual( Observable.just("one", "two", "three"), Observable.just("one", "two", "three")); verifyResult(o, true); @@ -34,7 +182,7 @@ public void test1() { @Test public void test2() { - Observable o = Observable.sequenceEqual( + Single o = Observable.sequenceEqual( Observable.just("one", "two", "three"), Observable.just("one", "two", "three", "four")); verifyResult(o, false); @@ -42,7 +190,7 @@ public void test2() { @Test public void test3() { - Observable o = Observable.sequenceEqual( + Single o = Observable.sequenceEqual( Observable.just("one", "two", "three", "four"), Observable.just("one", "two", "three")); verifyResult(o, false); @@ -50,7 +198,7 @@ public void test3() { @Test public void testWithError1() { - Observable o = Observable.sequenceEqual( + Single o = Observable.sequenceEqual( Observable.concat(Observable.just("one"), Observable. error(new TestException())), Observable.just("one", "two", "three")); @@ -59,7 +207,7 @@ Observable. error(new TestException())), @Test public void testWithError2() { - Observable o = Observable.sequenceEqual( + Single o = Observable.sequenceEqual( Observable.just("one", "two", "three"), Observable.concat(Observable.just("one"), Observable. error(new TestException()))); @@ -68,7 +216,7 @@ public void testWithError2() { @Test public void testWithError3() { - Observable o = Observable.sequenceEqual( + Single o = Observable.sequenceEqual( Observable.concat(Observable.just("one"), Observable. error(new TestException())), Observable.concat(Observable.just("one"), @@ -78,7 +226,7 @@ Observable. error(new TestException())), @Test public void testWithEmpty1() { - Observable o = Observable.sequenceEqual( + Single o = Observable.sequenceEqual( Observable. empty(), Observable.just("one", "two", "three")); verifyResult(o, false); @@ -86,7 +234,7 @@ Observable. empty(), @Test public void testWithEmpty2() { - Observable o = Observable.sequenceEqual( + Single o = Observable.sequenceEqual( Observable.just("one", "two", "three"), Observable. empty()); verifyResult(o, false); @@ -94,7 +242,7 @@ public void testWithEmpty2() { @Test public void testWithEmpty3() { - Observable o = Observable.sequenceEqual( + Single o = Observable.sequenceEqual( Observable. empty(), Observable. empty()); verifyResult(o, true); } @@ -102,7 +250,7 @@ public void testWithEmpty3() { @Test @Ignore("Null values not allowed") public void testWithNull1() { - Observable o = Observable.sequenceEqual( + Single o = Observable.sequenceEqual( Observable.just((String) null), Observable.just("one")); verifyResult(o, false); } @@ -110,14 +258,14 @@ public void testWithNull1() { @Test @Ignore("Null values not allowed") public void testWithNull2() { - Observable o = Observable.sequenceEqual( + Single o = Observable.sequenceEqual( Observable.just((String) null), Observable.just((String) null)); verifyResult(o, true); } @Test public void testWithEqualityError() { - Observable o = Observable.sequenceEqual( + Single o = Observable.sequenceEqual( Observable.just("one"), Observable.just("one"), new BiPredicate() { @Override @@ -139,19 +287,84 @@ private void verifyResult(Observable o, boolean result) { inOrder.verifyNoMoreInteractions(); } - private void verifyError(Observable observable) { - Observer observer = TestHelper.mockObserver(); - observable.subscribe(observer); - - InOrder inOrder = inOrder(observer); - inOrder.verify(observer, times(1)).onError(isA(TestException.class)); - inOrder.verifyNoMoreInteractions(); - } - @Test public void prefetch() { Observable.sequenceEqual(Observable.range(1, 20), Observable.range(1, 20), 2) .test() .assertResult(true); } + + @Test + public void disposed() { + TestHelper.checkDisposed(Observable.sequenceEqual(Observable.just(1), Observable.just(2))); + } + + @Test + public void simpleInequal() { + Observable.sequenceEqual(Observable.just(1), Observable.just(2)) + .test() + .assertResult(false); + } + + @Test + public void simpleInequalObservable() { + Observable.sequenceEqual(Observable.just(1), Observable.just(2)) + .toObservable() + .test() + .assertResult(false); + } + + @Test + public void onNextCancelRace() { + for (int i = 0; i < 500; i++) { + final PublishSubject ps = PublishSubject.create(); + + final TestObserver to = Observable.sequenceEqual(Observable.never(), ps).test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + to.cancel(); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + ps.onNext(1); + } + }; + + TestHelper.race(r1, r2); + + to.assertEmpty(); + } + } + + @Test + public void onNextCancelRaceObservable() { + for (int i = 0; i < 500; i++) { + final PublishSubject ps = PublishSubject.create(); + + final TestObserver to = Observable.sequenceEqual(Observable.never(), ps).toObservable().test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + to.cancel(); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + ps.onNext(1); + } + }; + + TestHelper.race(r1, r2); + + to.assertEmpty(); + } + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableSwitchTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableSwitchTest.java index 744d107a56..287bfd325f 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableSwitchTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableSwitchTest.java @@ -30,6 +30,7 @@ import io.reactivex.internal.functions.Functions; import io.reactivex.internal.util.ExceptionHelper; import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.schedulers.TestScheduler; import io.reactivex.subjects.PublishSubject; @@ -622,4 +623,268 @@ public void switchMapInnerCancelled() { assertFalse(pp.hasObservers()); } + @Test + public void scalarMap() { + Observable.switchOnNext(Observable.just(Observable.just(1))) + .test() + .assertResult(1); + } + + @Test + public void scalarMapDelayError() { + Observable.switchOnNextDelayError(Observable.just(Observable.just(1))) + .test() + .assertResult(1); + } + + @Test + public void dispose() { + TestHelper.checkDisposed(Observable.switchOnNext( + Observable.just(Observable.just(1)).hide())); + } + + @Test + public void nextSourceErrorRace() { + for (int i = 0; i < 500; i++) { + List errors = TestHelper.trackPluginErrors(); + try { + + final PublishSubject ps1 = PublishSubject.create(); + final PublishSubject ps2 = PublishSubject.create(); + + ps1.switchMap(new Function>() { + @Override + public ObservableSource apply(Integer v) throws Exception { + if (v == 1) { + return ps2; + } + return Observable.never(); + } + }) + .test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ps1.onNext(2); + } + }; + + final TestException ex = new TestException(); + + Runnable r2 = new Runnable() { + @Override + public void run() { + ps2.onError(ex); + } + }; + + TestHelper.race(r1, r2); + + for (Throwable e : errors) { + assertTrue(e.toString(), e instanceof TestException); + } + } finally { + RxJavaPlugins.reset(); + } + } + } + + @Test + public void outerInnerErrorRace() { + for (int i = 0; i < 500; i++) { + List errors = TestHelper.trackPluginErrors(); + try { + + final PublishSubject ps1 = PublishSubject.create(); + final PublishSubject ps2 = PublishSubject.create(); + + ps1.switchMap(new Function>() { + @Override + public ObservableSource apply(Integer v) throws Exception { + if (v == 1) { + return ps2; + } + return Observable.never(); + } + }) + .test(); + + final TestException ex1 = new TestException(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ps1.onError(ex1); + } + }; + + final TestException ex2 = new TestException(); + + Runnable r2 = new Runnable() { + @Override + public void run() { + ps2.onError(ex2); + } + }; + + TestHelper.race(r1, r2); + + for (Throwable e : errors) { + assertTrue(e.toString(), e instanceof TestException); + } + } finally { + RxJavaPlugins.reset(); + } + } + } + + @Test + public void nextCancelRace() { + for (int i = 0; i < 500; i++) { + final PublishSubject ps1 = PublishSubject.create(); + + final TestObserver to = ps1.switchMap(new Function>() { + @Override + public ObservableSource apply(Integer v) throws Exception { + return Observable.never(); + } + }) + .test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ps1.onNext(2); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + to.cancel(); + } + }; + + TestHelper.race(r1, r2); + } + } + + @Test + public void mapperThrows() { + Observable.just(1).hide() + .switchMap(new Function>() { + @Override + public ObservableSource apply(Integer v) throws Exception { + throw new TestException(); + } + }) + .test() + .assertFailure(TestException.class); + } + + @Test + public void badMainSource() { + List errors = TestHelper.trackPluginErrors(); + try { + new Observable() { + @Override + protected void subscribeActual(Observer observer) { + observer.onSubscribe(Disposables.empty()); + observer.onComplete(); + observer.onError(new TestException()); + observer.onComplete(); + } + } + .switchMap(Functions.justFunction(Observable.never())) + .test() + .assertResult(); + + TestHelper.assertError(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void emptyInner() { + Observable.range(1, 5) + .switchMap(Functions.justFunction(Observable.empty())) + .test() + .assertResult(); + } + + @Test + public void justInner() { + Observable.range(1, 5) + .switchMap(Functions.justFunction(Observable.just(1))) + .test() + .assertResult(1, 1, 1, 1, 1); + } + + @Test + public void badInnerSource() { + List errors = TestHelper.trackPluginErrors(); + try { + Observable.just(1).hide() + .switchMap(Functions.justFunction(new Observable() { + @Override + protected void subscribeActual(Observer observer) { + observer.onSubscribe(Disposables.empty()); + observer.onError(new TestException()); + observer.onComplete(); + observer.onError(new TestException()); + observer.onComplete(); + } + })) + .test() + .assertFailure(TestException.class); + + TestHelper.assertError(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void innerCompletesReentrant() { + final PublishSubject ps = PublishSubject.create(); + + TestObserver to = new TestObserver() { + @Override + public void onNext(Integer t) { + super.onNext(t); + ps.onComplete(); + } + }; + + Observable.just(1).hide() + .switchMap(Functions.justFunction(ps)) + .subscribe(to); + + ps.onNext(1); + + to.assertResult(1); + } + + @Test + public void innerErrorsReentrant() { + final PublishSubject ps = PublishSubject.create(); + + TestObserver to = new TestObserver() { + @Override + public void onNext(Integer t) { + super.onNext(t); + ps.onError(new TestException()); + } + }; + + Observable.just(1).hide() + .switchMap(Functions.justFunction(ps)) + .subscribe(to); + + ps.onNext(1); + + to.assertFailure(TestException.class, 1); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableTimeIntervalTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableTimeIntervalTest.java index 170b7ff192..37da8fc54c 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableTimeIntervalTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableTimeIntervalTest.java @@ -21,6 +21,7 @@ import org.mockito.InOrder; import io.reactivex.*; +import io.reactivex.exceptions.TestException; import io.reactivex.functions.Function; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.schedulers.*; @@ -121,4 +122,17 @@ public Long apply(Timed v) throws Exception { } } + @Test + public void dispose() { + TestHelper.checkDisposed(Observable.just(1).timeInterval()); + } + + @SuppressWarnings("unchecked") + @Test + public void error() { + Observable.error(new TestException()) + .timeInterval() + .test() + .assertFailure(TestException.class); + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableTimeoutTests.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableTimeoutTests.java index 88b1984fa0..20fd73d63b 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableTimeoutTests.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableTimeoutTests.java @@ -13,11 +13,12 @@ package io.reactivex.internal.operators.observable; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; import java.io.IOException; +import java.util.List; import java.util.concurrent.*; import org.junit.*; @@ -25,7 +26,9 @@ import io.reactivex.*; import io.reactivex.disposables.*; +import io.reactivex.exceptions.TestException; import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.schedulers.TestScheduler; import io.reactivex.subjects.PublishSubject; @@ -381,4 +384,101 @@ public void timedAndOther() { .awaitDone(5, TimeUnit.SECONDS) .assertResult(1); } + + @Test + public void disposed() { + TestHelper.checkDisposed(PublishSubject.create().timeout(1, TimeUnit.DAYS)); + + TestHelper.checkDisposed(PublishSubject.create().timeout(1, TimeUnit.DAYS, Observable.just(1))); + } + + @Test + public void timedErrorOther() { + Observable.error(new TestException()) + .timeout(1, TimeUnit.DAYS, Observable.just(1)) + .test() + .assertFailure(TestException.class); + } + + @Test + public void timedError() { + Observable.error(new TestException()) + .timeout(1, TimeUnit.DAYS) + .test() + .assertFailure(TestException.class); + } + + @Test + public void timedEmptyOther() { + Observable.empty() + .timeout(1, TimeUnit.DAYS, Observable.just(1)) + .test() + .assertResult(); + } + + @Test + public void timedEmpty() { + Observable.empty() + .timeout(1, TimeUnit.DAYS) + .test() + .assertResult(); + } + + @Test + public void newTimer() { + ObservableTimeoutTimed.NEW_TIMER.dispose(); + assertTrue(ObservableTimeoutTimed.NEW_TIMER.isDisposed()); + } + + @Test + public void badSource() { + List errors = TestHelper.trackPluginErrors(); + try { + new Observable() { + @Override + protected void subscribeActual(Observer observer) { + observer.onSubscribe(Disposables.empty()); + + observer.onNext(1); + observer.onComplete(); + observer.onNext(2); + observer.onError(new TestException()); + observer.onComplete(); + } + } + .timeout(1, TimeUnit.DAYS) + .test() + .assertResult(1); + + TestHelper.assertError(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void badSourceOther() { + List errors = TestHelper.trackPluginErrors(); + try { + new Observable() { + @Override + protected void subscribeActual(Observer observer) { + observer.onSubscribe(Disposables.empty()); + + observer.onNext(1); + observer.onComplete(); + observer.onNext(2); + observer.onError(new TestException()); + observer.onComplete(); + } + } + .timeout(1, TimeUnit.DAYS, Observable.just(3)) + .test() + .assertResult(1); + + TestHelper.assertError(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableToListTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableToListTest.java index ee626aa2fc..8dce479ca0 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableToListTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableToListTest.java @@ -188,6 +188,8 @@ public void capacityHint() { @Test public void dispose() { TestHelper.checkDisposed(Observable.just(1).toList().toObservable()); + + TestHelper.checkDisposed(Observable.just(1).toList()); } @SuppressWarnings("unchecked") @@ -200,6 +202,15 @@ public void error() { .assertFailure(TestException.class); } + @SuppressWarnings("unchecked") + @Test + public void errorSingle() { + Observable.error(new TestException()) + .toList() + .test() + .assertFailure(TestException.class); + } + @SuppressWarnings("unchecked") @Test public void collectionSupplierThrows() { @@ -214,4 +225,18 @@ public Collection call() throws Exception { .test() .assertFailure(TestException.class); } + + @SuppressWarnings("unchecked") + @Test + public void singleCollectionSupplierThrows() { + Observable.just(1) + .toList(new Callable>() { + @Override + public Collection call() throws Exception { + throw new TestException(); + } + }) + .test() + .assertFailure(TestException.class); + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/observable/ObservableNullTests.java b/src/test/java/io/reactivex/observable/ObservableNullTests.java index 2dea75f9db..8aedbb83a8 100644 --- a/src/test/java/io/reactivex/observable/ObservableNullTests.java +++ b/src/test/java/io/reactivex/observable/ObservableNullTests.java @@ -1259,7 +1259,7 @@ public void distinctUntilChangedBiPredicateNull() { @Test(expected = NullPointerException.class) public void distinctUntilChangedFunctionReturnsNull() { - just1.distinctUntilChanged(new Function() { + Observable.range(1, 2).distinctUntilChanged(new Function() { @Override public Object apply(Integer v) { return null; diff --git a/src/test/java/io/reactivex/observers/SerializedObserverTest.java b/src/test/java/io/reactivex/observers/SerializedObserverTest.java index 36adb73211..1290298482 100644 --- a/src/test/java/io/reactivex/observers/SerializedObserverTest.java +++ b/src/test/java/io/reactivex/observers/SerializedObserverTest.java @@ -1181,38 +1181,48 @@ public void startOnce() { @Test public void onCompleteOnErrorRace() { for (int i = 0; i < 500; i++) { - TestObserver ts = new TestObserver(); - final SerializedObserver so = new SerializedObserver(ts); + List errors = TestHelper.trackPluginErrors(); + try { + TestObserver ts = new TestObserver(); - Disposable d = Disposables.empty(); + final SerializedObserver so = new SerializedObserver(ts); - so.onSubscribe(d); + Disposable d = Disposables.empty(); - final Throwable ex = new TestException(); + so.onSubscribe(d); - Runnable r1 = new Runnable() { - @Override - public void run() { - so.onError(ex); - } - }; + final Throwable ex = new TestException(); - Runnable r2 = new Runnable() { - @Override - public void run() { - so.onComplete(); - } - }; + Runnable r1 = new Runnable() { + @Override + public void run() { + so.onError(ex); + } + }; - TestHelper.race(r1, r2, Schedulers.single()); + Runnable r2 = new Runnable() { + @Override + public void run() { + so.onComplete(); + } + }; - ts.awaitDone(5, TimeUnit.SECONDS); + TestHelper.race(r1, r2, Schedulers.single()); - if (ts.completions() != 0) { - ts.assertResult(); - } else { - ts.assertFailure(TestException.class).assertError(ex); + ts.awaitDone(5, TimeUnit.SECONDS); + + if (ts.completions() != 0) { + ts.assertResult(); + } else { + ts.assertFailure(TestException.class).assertError(ex); + } + + for (Throwable e : errors) { + assertTrue(e.toString(), e instanceof TestException); + } + } finally { + RxJavaPlugins.reset(); } }