Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: coverage, cleanup, fixes 10/15-2 #4712

Merged
merged 2 commits into from
Oct 16, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 15 additions & 15 deletions src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -3034,7 +3034,7 @@ public static Observable<Long> rangeLong(long start, long count) {
}

/**
* Returns an Observable that emits a Boolean value that indicates whether two ObservableSource sequences are the
* Returns a Single that emits a Boolean value that indicates whether two ObservableSource sequences are the
* same by comparing the items emitted by each ObservableSource pairwise.
* <p>
* <img width="640" height="385" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/sequenceEqual.png" alt="">
Expand All @@ -3049,16 +3049,16 @@ public static Observable<Long> rangeLong(long start, long count) {
* the second ObservableSource to compare
* @param <T>
* the type of items emitted by each ObservableSource
* @return an Observable that emits a Boolean value that indicates whether the two sequences are the same
* @return a Single that emits a Boolean value that indicates whether the two sequences are the same
* @see <a href="http://reactivex.io/documentation/operators/sequenceequal.html">ReactiveX operators documentation: SequenceEqual</a>
*/
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<Boolean> sequenceEqual(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2) {
public static <T> Single<Boolean> sequenceEqual(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2) {
return sequenceEqual(source1, source2, ObjectHelper.equalsPredicate(), bufferSize());
}

/**
* Returns an Observable that emits a Boolean value that indicates whether two ObservableSource sequences are the
* Returns a Single that emits a Boolean value that indicates whether two ObservableSource sequences are the
* same by comparing the items emitted by each ObservableSource pairwise based on the results of a specified
* equality function.
* <p>
Expand All @@ -3076,18 +3076,18 @@ public static <T> Observable<Boolean> sequenceEqual(ObservableSource<? extends T
* a function used to compare items emitted by each ObservableSource
* @param <T>
* the type of items emitted by each ObservableSource
* @return an Observable that emits a Boolean value that indicates whether the two ObservableSource two sequences
* @return a Single that emits a Boolean value that indicates whether the two ObservableSource two sequences
* are the same according to the specified function
* @see <a href="http://reactivex.io/documentation/operators/sequenceequal.html">ReactiveX operators documentation: SequenceEqual</a>
*/
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<Boolean> sequenceEqual(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2,
public static <T> Single<Boolean> sequenceEqual(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2,
BiPredicate<? super T, ? super T> isEqual) {
return sequenceEqual(source1, source2, isEqual, bufferSize());
}

/**
* Returns an Observable that emits a Boolean value that indicates whether two ObservableSource sequences are the
* Returns a Single that emits a Boolean value that indicates whether two ObservableSource sequences are the
* same by comparing the items emitted by each ObservableSource pairwise based on the results of a specified
* equality function.
* <p>
Expand All @@ -3112,17 +3112,17 @@ public static <T> Observable<Boolean> sequenceEqual(ObservableSource<? extends T
* @see <a href="http://reactivex.io/documentation/operators/sequenceequal.html">ReactiveX operators documentation: SequenceEqual</a>
*/
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<Boolean> sequenceEqual(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2,
public static <T> Single<Boolean> sequenceEqual(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2,
BiPredicate<? super T, ? super T> isEqual, int bufferSize) {
ObjectHelper.requireNonNull(source1, "source1 is null");
ObjectHelper.requireNonNull(source2, "source2 is null");
ObjectHelper.requireNonNull(isEqual, "isEqual is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableSequenceEqual<T>(source1, source2, isEqual, bufferSize));
return RxJavaPlugins.onAssembly(new ObservableSequenceEqualSingle<T>(source1, source2, isEqual, bufferSize));
}

/**
* Returns an Observable that emits a Boolean value that indicates whether two ObservableSource sequences are the
* Returns a Single that emits a Boolean value that indicates whether two ObservableSource sequences are the
* same by comparing the items emitted by each ObservableSource pairwise.
* <p>
* <img width="640" height="385" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/sequenceEqual.png" alt="">
Expand All @@ -3139,11 +3139,11 @@ public static <T> Observable<Boolean> sequenceEqual(ObservableSource<? extends T
* the number of items to prefetch from the first and second source ObservableSource
* @param <T>
* the type of items emitted by each ObservableSource
* @return an Observable that emits a Boolean value that indicates whether the two sequences are the same
* @return a Single that emits a Boolean value that indicates whether the two sequences are the same
* @see <a href="http://reactivex.io/documentation/operators/sequenceequal.html">ReactiveX operators documentation: SequenceEqual</a>
*/
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<Boolean> sequenceEqual(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2,
public static <T> Single<Boolean> sequenceEqual(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2,
int bufferSize) {
return sequenceEqual(source1, source2, ObjectHelper.equalsPredicate(), bufferSize);
}
Expand Down Expand Up @@ -6319,7 +6319,7 @@ public final <K> Observable<T> distinct(Function<? super T, K> keySelector) {
public final <K> Observable<T> distinct(Function<? super T, K> keySelector, Callable<? extends Collection<? super K>> collectionSupplier) {
ObjectHelper.requireNonNull(keySelector, "keySelector is null");
ObjectHelper.requireNonNull(collectionSupplier, "collectionSupplier is null");
return ObservableDistinct.withCollection(this, keySelector, collectionSupplier);
return new ObservableDistinct<T, K>(this, keySelector, collectionSupplier);
}

/**
Expand All @@ -6338,7 +6338,7 @@ public final <K> Observable<T> distinct(Function<? super T, K> keySelector, Call
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<T> distinctUntilChanged() {
return ObservableDistinct.<T>untilChanged(this);
return new ObservableDistinctUntilChanged<T>(this, Functions.equalsPredicate());
}

/**
Expand All @@ -6362,7 +6362,7 @@ public final Observable<T> distinctUntilChanged() {
@SchedulerSupport(SchedulerSupport.NONE)
public final <K> Observable<T> distinctUntilChanged(Function<? super T, K> keySelector) {
ObjectHelper.requireNonNull(keySelector, "keySelector is null");
return ObservableDistinct.untilChanged(this, keySelector);
return new ObservableDistinctUntilChanged<T>(this, Functions.equalsPredicate(keySelector));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ public void dispose() {

@Override
public boolean isDisposed() {
return cancelled;
Disposable d = resource;
return d != null ? d.isDisposed() : cancelled;
}

void disposeResource() {
Expand Down
25 changes: 25 additions & 0 deletions src/main/java/io/reactivex/internal/functions/Functions.java
Original file line number Diff line number Diff line change
Expand Up @@ -637,4 +637,29 @@ public static <T> Function<List<T>, List<T>> listSorter(final Comparator<? super
return new ListSorter<T>(comparator);
}

static final BiPredicate<Object, Object> DEFAULT_EQUALS_PREDICATE = equalsPredicate(Functions.identity());

@SuppressWarnings("unchecked")
public static <T> BiPredicate<T, T> equalsPredicate() {
return (BiPredicate<T, T>)DEFAULT_EQUALS_PREDICATE;
}

static final class KeyedEqualsPredicate<T, K> implements BiPredicate<T, T> {
final Function<? super T, K> keySelector;

KeyedEqualsPredicate(Function<? super T, K> keySelector) {
this.keySelector = keySelector;
}

@Override
public boolean test(T t1, T t2) throws Exception {
K k1 = ObjectHelper.requireNonNull(keySelector.apply(t1), "The keySelector returned a null key");
K k2 = ObjectHelper.requireNonNull(keySelector.apply(t2), "The keySelector returned a null key");
return ObjectHelper.equals(k1, k2);
}
}

public static <T, K> BiPredicate<T, T> equalsPredicate(Function<? super T, K> keySelector) {
return new KeyedEqualsPredicate<T, K>(keySelector);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,17 @@

package io.reactivex.internal.operators.observable;

import io.reactivex.internal.functions.ObjectHelper;
import java.util.Arrays;
import java.util.concurrent.atomic.*;

import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.*;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.Function;
import io.reactivex.internal.disposables.*;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.queue.SpscLinkedArrayQueue;
import io.reactivex.internal.util.AtomicThrowable;
import io.reactivex.plugins.RxJavaPlugins;

public final class ObservableCombineLatest<T, R> extends Observable<R> {
Expand Down Expand Up @@ -86,7 +87,7 @@ static final class LatestCoordinator<T, R> extends AtomicInteger implements Disp

volatile boolean done;

final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
final AtomicThrowable errors = new AtomicThrowable();

int active;
int complete;
Expand Down Expand Up @@ -181,7 +182,7 @@ void combine(T value, int index) {
if (value != null && f) {
queue.offer(cs, latest.clone());
} else
if (value == null && error.get() != null) {
if (value == null && errors.get() != null) {
done = true; // if this source completed without a value
}
} else {
Expand Down Expand Up @@ -227,13 +228,6 @@ void drain() {
@SuppressWarnings("unchecked")
T[] array = (T[])q.poll();

if (array == null) {
cancelled = true;
cancel(q);
a.onError(new IllegalStateException("Broken queue?! Sender received but not the array."));
return;
}

R v;
try {
v = ObjectHelper.requireNonNull(combiner.apply(array), "The combiner returned a null");
Expand Down Expand Up @@ -265,7 +259,7 @@ boolean checkTerminated(boolean d, boolean empty, Observer<?> a, SpscLinkedArray
if (delayError) {
if (empty) {
clear(queue);
Throwable e = error.get();
Throwable e = errors.terminate();
if (e != null) {
a.onError(e);
} else {
Expand All @@ -274,10 +268,10 @@ boolean checkTerminated(boolean d, boolean empty, Observer<?> a, SpscLinkedArray
return true;
}
} else {
Throwable e = error.get();
Throwable e = errors.get();
if (e != null) {
cancel(q);
a.onError(e);
a.onError(errors.terminate());
return true;
} else
if (empty) {
Expand All @@ -291,26 +285,16 @@ boolean checkTerminated(boolean d, boolean empty, Observer<?> a, SpscLinkedArray
}

void onError(Throwable e) {
for (;;) {
Throwable curr = error.get();
if (curr != null) {
CompositeException ce = new CompositeException(curr, e);
e = ce;
}
Throwable next = e;
if (error.compareAndSet(curr, next)) {
return;
}
if (!errors.addThrowable(e)) {
RxJavaPlugins.onError(e);
}
}
}

static final class CombinerObserver<T, R> implements Observer<T>, Disposable {
static final class CombinerObserver<T, R> implements Observer<T> {
final LatestCoordinator<T, R> parent;
final int index;

boolean done;

final AtomicReference<Disposable> s = new AtomicReference<Disposable>();

CombinerObserver(LatestCoordinator<T, R> parent, int index) {
Expand All @@ -325,40 +309,22 @@ public void onSubscribe(Disposable s) {

@Override
public void onNext(T t) {
if (done) {
return;
}
parent.combine(t, index);
}

@Override
public void onError(Throwable t) {
if (done) {
RxJavaPlugins.onError(t);
return;
}
parent.onError(t);
done = true;
parent.combine(null, index);
}

@Override
public void onComplete() {
if (done) {
return;
}
done = true;
parent.combine(null, index);
}

@Override
public void dispose() {
DisposableHelper.dispose(s);
}

@Override
public boolean isDisposed() {
return s.get() == DisposableHelper.DISPOSED;
}
}
}
Loading