Skip to content

Commit

Permalink
Operator When
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Apr 29, 2014
1 parent 95e0636 commit bc202f4
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,22 @@
import java.util.Map;

import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Observable.OnSubscribe;
import rx.Observer;
import rx.Subscription;
import rx.Subscriber;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.joins.ActivePlan0;
import rx.joins.JoinObserver;
import rx.joins.Pattern1;
import rx.joins.Pattern2;
import rx.joins.Plan0;
import rx.observers.Subscribers;
import rx.subscriptions.CompositeSubscription;

/**
* Join patterns: And, Then, When.
*/
public class OperationJoinPatterns {
public class OperatorJoinPatterns {
/**
* Creates a pattern that matches when both observable sequences have an available element.
*/
Expand Down Expand Up @@ -68,7 +67,7 @@ public static <T1, R> Plan0<R> then(/* this */Observable<T1> source, Func1<T1, R
/**
* Joins together the results from several patterns.
*/
public static <R> OnSubscribeFunc<R> when(Plan0<R>... plans) {
public static <R> OnSubscribe<R> when(Plan0<R>... plans) {
if (plans == null) {
throw new NullPointerException("plans");
}
Expand All @@ -78,13 +77,13 @@ public static <R> OnSubscribeFunc<R> when(Plan0<R>... plans) {
/**
* Joins together the results from several patterns.
*/
public static <R> OnSubscribeFunc<R> when(final Iterable<? extends Plan0<R>> plans) {
public static <R> OnSubscribe<R> when(final Iterable<? extends Plan0<R>> plans) {
if (plans == null) {
throw new NullPointerException("plans");
}
return new OnSubscribeFunc<R>() {
return new OnSubscribe<R>() {
@Override
public Subscription onSubscribe(final Observer<? super R> t1) {
public void call(final Subscriber<? super R> t1) {
final Map<Object, JoinObserver> externalSubscriptions = new HashMap<Object, JoinObserver>();
final Object gate = new Object();
final List<ActivePlan0> activePlans = new ArrayList<ActivePlan0>();
Expand Down Expand Up @@ -122,14 +121,15 @@ public void call(ActivePlan0 activePlan) {
}));
}
} catch (Throwable t) {
return Observable.<R> error(t).unsafeSubscribe(Subscribers.from(t1));
Observable.<R> error(t).unsafeSubscribe(t1);
return;
}
CompositeSubscription group = new CompositeSubscription();
t1.add(group);
for (JoinObserver jo : externalSubscriptions.values()) {
jo.subscribe(gate);
group.add(jo);
}
return group;
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import rx.functions.Func1;
import rx.joins.Pattern2;
import rx.joins.Plan0;
import rx.joins.operators.OperationJoinPatterns;
import rx.joins.operators.OperatorJoinPatterns;

public class JoinObservable<T> {

Expand Down Expand Up @@ -32,7 +32,7 @@ public static <T> JoinObservable<T> from(Observable<T> o) {
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229153.aspx">MSDN: Observable.And</a>
*/
public final <T2> Pattern2<T, T2> and(Observable<T2> right) {
return OperationJoinPatterns.and(o, right);
return OperatorJoinPatterns.and(o, right);
}

/**
Expand All @@ -52,7 +52,7 @@ public final static <R> JoinObservable<R> when(Iterable<? extends Plan0<R>> plan
if (plans == null) {
throw new NullPointerException("plans");
}
return from(Observable.create(OperationJoinPatterns.when(plans)));
return from(Observable.create(OperatorJoinPatterns.when(plans)));
}

/**
Expand All @@ -69,7 +69,7 @@ public final static <R> JoinObservable<R> when(Iterable<? extends Plan0<R>> plan
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229889.aspx">MSDN: Observable.When</a>
*/
public final static <R> JoinObservable<R> when(Plan0<R>... plans) {
return from(Observable.create(OperationJoinPatterns.when(plans)));
return from(Observable.create(OperatorJoinPatterns.when(plans)));
}

/**
Expand All @@ -85,7 +85,7 @@ public final static <R> JoinObservable<R> when(Plan0<R>... plans) {
*/
@SuppressWarnings("unchecked")
public final static <R> JoinObservable<R> when(Plan0<R> p1) {
return from(Observable.create(OperationJoinPatterns.when(p1)));
return from(Observable.create(OperatorJoinPatterns.when(p1)));
}

/**
Expand All @@ -103,7 +103,7 @@ public final static <R> JoinObservable<R> when(Plan0<R> p1) {
*/
@SuppressWarnings("unchecked")
public final static <R> JoinObservable<R> when(Plan0<R> p1, Plan0<R> p2) {
return from(Observable.create(OperationJoinPatterns.when(p1, p2)));
return from(Observable.create(OperatorJoinPatterns.when(p1, p2)));
}

/**
Expand All @@ -123,7 +123,7 @@ public final static <R> JoinObservable<R> when(Plan0<R> p1, Plan0<R> p2) {
*/
@SuppressWarnings("unchecked")
public final static <R> JoinObservable<R> when(Plan0<R> p1, Plan0<R> p2, Plan0<R> p3) {
return from(Observable.create(OperationJoinPatterns.when(p1, p2, p3)));
return from(Observable.create(OperatorJoinPatterns.when(p1, p2, p3)));
}

/**
Expand All @@ -145,7 +145,7 @@ public final static <R> JoinObservable<R> when(Plan0<R> p1, Plan0<R> p2, Plan0<R
*/
@SuppressWarnings("unchecked")
public final static <R> JoinObservable<R> when(Plan0<R> p1, Plan0<R> p2, Plan0<R> p3, Plan0<R> p4) {
return from(Observable.create(OperationJoinPatterns.when(p1, p2, p3, p4)));
return from(Observable.create(OperatorJoinPatterns.when(p1, p2, p3, p4)));
}

/**
Expand All @@ -169,7 +169,7 @@ public final static <R> JoinObservable<R> when(Plan0<R> p1, Plan0<R> p2, Plan0<R
*/
@SuppressWarnings("unchecked")
public final static <R> JoinObservable<R> when(Plan0<R> p1, Plan0<R> p2, Plan0<R> p3, Plan0<R> p4, Plan0<R> p5) {
return from(Observable.create(OperationJoinPatterns.when(p1, p2, p3, p4, p5)));
return from(Observable.create(OperatorJoinPatterns.when(p1, p2, p3, p4, p5)));
}

/**
Expand All @@ -195,7 +195,7 @@ public final static <R> JoinObservable<R> when(Plan0<R> p1, Plan0<R> p2, Plan0<R
*/
@SuppressWarnings("unchecked")
public final static <R> JoinObservable<R> when(Plan0<R> p1, Plan0<R> p2, Plan0<R> p3, Plan0<R> p4, Plan0<R> p5, Plan0<R> p6) {
return from(Observable.create(OperationJoinPatterns.when(p1, p2, p3, p4, p5, p6)));
return from(Observable.create(OperatorJoinPatterns.when(p1, p2, p3, p4, p5, p6)));
}

/**
Expand Down Expand Up @@ -223,7 +223,7 @@ public final static <R> JoinObservable<R> when(Plan0<R> p1, Plan0<R> p2, Plan0<R
*/
@SuppressWarnings("unchecked")
public final static <R> JoinObservable<R> when(Plan0<R> p1, Plan0<R> p2, Plan0<R> p3, Plan0<R> p4, Plan0<R> p5, Plan0<R> p6, Plan0<R> p7) {
return from(Observable.create(OperationJoinPatterns.when(p1, p2, p3, p4, p5, p6, p7)));
return from(Observable.create(OperatorJoinPatterns.when(p1, p2, p3, p4, p5, p6, p7)));
}

/**
Expand Down Expand Up @@ -253,7 +253,7 @@ public final static <R> JoinObservable<R> when(Plan0<R> p1, Plan0<R> p2, Plan0<R
*/
@SuppressWarnings("unchecked")
public final static <R> JoinObservable<R> when(Plan0<R> p1, Plan0<R> p2, Plan0<R> p3, Plan0<R> p4, Plan0<R> p5, Plan0<R> p6, Plan0<R> p7, Plan0<R> p8) {
return from(Observable.create(OperationJoinPatterns.when(p1, p2, p3, p4, p5, p6, p7, p8)));
return from(Observable.create(OperatorJoinPatterns.when(p1, p2, p3, p4, p5, p6, p7, p8)));
}

/**
Expand Down Expand Up @@ -285,7 +285,7 @@ public final static <R> JoinObservable<R> when(Plan0<R> p1, Plan0<R> p2, Plan0<R
*/
@SuppressWarnings("unchecked")
public final static <R> JoinObservable<R> when(Plan0<R> p1, Plan0<R> p2, Plan0<R> p3, Plan0<R> p4, Plan0<R> p5, Plan0<R> p6, Plan0<R> p7, Plan0<R> p8, Plan0<R> p9) {
return from(Observable.create(OperationJoinPatterns.when(p1, p2, p3, p4, p5, p6, p7, p8, p9)));
return from(Observable.create(OperatorJoinPatterns.when(p1, p2, p3, p4, p5, p6, p7, p8, p9)));
}

/**
Expand All @@ -303,7 +303,7 @@ public final static <R> JoinObservable<R> when(Plan0<R> p1, Plan0<R> p2, Plan0<R
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211662.aspx">MSDN: Observable.Then</a>
*/
public final <R> Plan0<R> then(Func1<T, R> selector) {
return OperationJoinPatterns.then(o, selector);
return OperatorJoinPatterns.then(o, selector);
}

public Observable<T> toObservable() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import rx.observers.TestSubscriber;
import rx.subjects.PublishSubject;

public class OperationJoinsTest {
public class OperatorJoinsTest {
@Mock
Observer<Integer> observer;

Expand Down

0 comments on commit bc202f4

Please sign in to comment.