Skip to content

Commit

Permalink
Scheduler.inner() for Language and Contrib Modules
Browse files Browse the repository at this point in the history
  • Loading branch information
benjchristensen committed Apr 18, 2014
1 parent 22a5c46 commit 6b25274
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 24 deletions.
83 changes: 61 additions & 22 deletions src/main/java/rx/operators/OperationConditionals.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@
import java.util.Map;

import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Func0;
import rx.subscriptions.MultipleAssignmentSubscription;
import rx.subscriptions.SerialSubscription;
import rx.subscriptions.Subscriptions;

/**
Expand Down Expand Up @@ -92,7 +95,7 @@ public static <R> OnSubscribeFunc<R> ifThen(
* the post condition after the source completes
* @return a subscription function.
*/
public static <T> OnSubscribeFunc<T> doWhile(Observable<? extends T> source, Func0<Boolean> postCondition) {
public static <T> OnSubscribe<T> doWhile(Observable<? extends T> source, Func0<Boolean> postCondition) {
return new WhileDoWhile<T>(source, TRUE, postCondition);
}

Expand All @@ -109,7 +112,7 @@ public static <T> OnSubscribeFunc<T> doWhile(Observable<? extends T> source, Fun
* and subscribe to source if it returns {@code true}
* @return a subscription function.
*/
public static <T> OnSubscribeFunc<T> whileDo(Observable<? extends T> source, Func0<Boolean> preCondition) {
public static <T> OnSubscribe<T> whileDo(Observable<? extends T> source, Func0<Boolean> preCondition) {
return new WhileDoWhile<T>(source, preCondition, preCondition);
}

Expand Down Expand Up @@ -209,7 +212,7 @@ public Subscription onSubscribe(Observer<? super R> t1) {
* @param <T>
* the result value type
*/
private static final class WhileDoWhile<T> implements OnSubscribeFunc<T> {
private static final class WhileDoWhile<T> implements OnSubscribe<T> {
final Func0<Boolean> preCondition;
final Func0<Boolean> postCondition;
final Observable<? extends T> source;
Expand All @@ -222,45 +225,62 @@ public WhileDoWhile(Observable<? extends T> source,
}

@Override
public Subscription onSubscribe(Observer<? super T> t1) {
public void call(Subscriber<? super T> child) {
boolean first;
try {
first = preCondition.call();
} catch (Throwable t) {
t1.onError(t);
return Subscriptions.empty();
child.onError(t);
return;
}

SerialSubscription cancel = new SerialSubscription();
final WhileDoWhile<T>.SourceObserver sourceObserver = new SourceObserver(child, cancel);

if (first) {
MultipleAssignmentSubscription ssub = new MultipleAssignmentSubscription();
Subscriber<T> firstSubscription = new Subscriber<T>() {

@Override
public void onCompleted() {
sourceObserver.onCompleted();
}

@Override
public void onError(Throwable e) {
sourceObserver.onError(e);
}

ssub.set(source.subscribe(new SourceObserver(t1, ssub)));
@Override
public void onNext(T t) {
sourceObserver.onNext(t);
}

return ssub;
};
cancel.set(firstSubscription);
source.unsafeSubscribe(firstSubscription);
} else {
t1.onCompleted();
child.onCompleted();
}
return Subscriptions.empty();
}

/** Observe the source. */
final class SourceObserver implements Observer<T> {
final MultipleAssignmentSubscription cancel;
final Observer<? super T> observer;
final Subscriber<? super T> actual;
final SerialSubscription cancel;

public SourceObserver(Observer<? super T> observer, MultipleAssignmentSubscription cancel) {
this.observer = observer;
public SourceObserver(Subscriber<? super T> actual, SerialSubscription cancel) {
this.actual = actual;
this.cancel = cancel;
}

@Override
public void onNext(T args) {
observer.onNext(args);
actual.onNext(args);
}

@Override
public void onError(Throwable e) {
observer.onError(e);
cancel.unsubscribe();
actual.onError(e);
}

@Override
Expand All @@ -269,14 +289,33 @@ public void onCompleted() {
try {
next = postCondition.call();
} catch (Throwable t) {
observer.onError(t);
actual.onError(t);
return;
}
if (next) {
cancel.set(source.subscribe(this));
Subscriber<T> newSubscription = new Subscriber<T>() {

@Override
public void onCompleted() {
SourceObserver.this.onCompleted();
}

@Override
public void onError(Throwable e) {
SourceObserver.this.onError(e);
}

@Override
public void onNext(T t) {
SourceObserver.this.onNext(t);
}

};
cancel.set(newSubscription);
source.unsafeSubscribe(newSubscription);

} else {
observer.onCompleted();
cancel.unsubscribe();
actual.onCompleted();
}
}

Expand Down
5 changes: 3 additions & 2 deletions src/test/java/rx/operators/OperationConditionalsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ <T> void observe(Observable<? extends T> source, T... values) {
<T> void observeSequence(Observable<? extends T> source, Iterable<? extends T> values) {
Observer<T> o = mock(Observer.class);

Subscription s = source.subscribe(new TestObserver<T>(o));
TestObserver<T> testObserver = new TestObserver<T>(o);
Subscription s = source.subscribe(testObserver);

InOrder inOrder = inOrder(o);

Expand Down Expand Up @@ -401,7 +402,7 @@ public Boolean call() {

@Test
public void testDoWhileManyTimes() {
Observable<Integer> source1 = Observable.from(1, 2, 3).subscribeOn(Schedulers.currentThread());
Observable<Integer> source1 = Observable.from(1, 2, 3).subscribeOn(Schedulers.trampoline());

List<Integer> expected = new ArrayList<Integer>(numRecursion * 3);
for (int i = 0; i < numRecursion; i++) {
Expand Down

0 comments on commit 6b25274

Please sign in to comment.