From bc202f40b820c0804ddbf6feaaee4a5d33602826 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Tue, 29 Apr 2014 10:59:29 +0200 Subject: [PATCH] Operator When --- ...atterns.java => OperatorJoinPatterns.java} | 20 ++++++------- .../java/rx/observables/JoinObservable.java | 28 +++++++++---------- ...nJoinsTest.java => OperatorJoinsTest.java} | 2 +- 3 files changed, 25 insertions(+), 25 deletions(-) rename rxjava-contrib/rxjava-joins/src/main/java/rx/joins/operators/{OperationJoinPatterns.java => OperatorJoinPatterns.java} (88%) rename rxjava-contrib/rxjava-joins/src/test/java/rx/joins/operators/{OperationJoinsTest.java => OperatorJoinsTest.java} (99%) diff --git a/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/operators/OperationJoinPatterns.java b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/operators/OperatorJoinPatterns.java similarity index 88% rename from rxjava-contrib/rxjava-joins/src/main/java/rx/joins/operators/OperationJoinPatterns.java rename to rxjava-contrib/rxjava-joins/src/main/java/rx/joins/operators/OperatorJoinPatterns.java index 81ea84aee1..1079796f03 100644 --- a/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/operators/OperationJoinPatterns.java +++ b/rxjava-contrib/rxjava-joins/src/main/java/rx/joins/operators/OperatorJoinPatterns.java @@ -22,9 +22,9 @@ import java.util.Map; import rx.Observable; -import rx.Observable.OnSubscribeFunc; +import rx.Observable.OnSubscribe; import rx.Observer; -import rx.Subscription; +import rx.Subscriber; import rx.functions.Action1; import rx.functions.Func1; import rx.joins.ActivePlan0; @@ -32,13 +32,12 @@ import rx.joins.Pattern1; import rx.joins.Pattern2; import rx.joins.Plan0; -import rx.observers.Subscribers; import rx.subscriptions.CompositeSubscription; /** * Join patterns: And, Then, When. */ -public class OperationJoinPatterns { +public class OperatorJoinPatterns { /** * Creates a pattern that matches when both observable sequences have an available element. */ @@ -68,7 +67,7 @@ public static Plan0 then(/* this */Observable source, Func1 OnSubscribeFunc when(Plan0... plans) { + public static OnSubscribe when(Plan0... plans) { if (plans == null) { throw new NullPointerException("plans"); } @@ -78,13 +77,13 @@ public static OnSubscribeFunc when(Plan0... plans) { /** * Joins together the results from several patterns. */ - public static OnSubscribeFunc when(final Iterable> plans) { + public static OnSubscribe when(final Iterable> plans) { if (plans == null) { throw new NullPointerException("plans"); } - return new OnSubscribeFunc() { + return new OnSubscribe() { @Override - public Subscription onSubscribe(final Observer t1) { + public void call(final Subscriber t1) { final Map externalSubscriptions = new HashMap(); final Object gate = new Object(); final List activePlans = new ArrayList(); @@ -122,14 +121,15 @@ public void call(ActivePlan0 activePlan) { })); } } catch (Throwable t) { - return Observable. error(t).unsafeSubscribe(Subscribers.from(t1)); + Observable. error(t).unsafeSubscribe(t1); + return; } CompositeSubscription group = new CompositeSubscription(); + t1.add(group); for (JoinObserver jo : externalSubscriptions.values()) { jo.subscribe(gate); group.add(jo); } - return group; } }; } diff --git a/rxjava-contrib/rxjava-joins/src/main/java/rx/observables/JoinObservable.java b/rxjava-contrib/rxjava-joins/src/main/java/rx/observables/JoinObservable.java index d79b1f2204..2faefd2d4c 100644 --- a/rxjava-contrib/rxjava-joins/src/main/java/rx/observables/JoinObservable.java +++ b/rxjava-contrib/rxjava-joins/src/main/java/rx/observables/JoinObservable.java @@ -4,7 +4,7 @@ import rx.functions.Func1; import rx.joins.Pattern2; import rx.joins.Plan0; -import rx.joins.operators.OperationJoinPatterns; +import rx.joins.operators.OperatorJoinPatterns; public class JoinObservable { @@ -32,7 +32,7 @@ public static JoinObservable from(Observable o) { * @see MSDN: Observable.And */ public final Pattern2 and(Observable right) { - return OperationJoinPatterns.and(o, right); + return OperatorJoinPatterns.and(o, right); } /** @@ -52,7 +52,7 @@ public final static JoinObservable when(Iterable> plan if (plans == null) { throw new NullPointerException("plans"); } - return from(Observable.create(OperationJoinPatterns.when(plans))); + return from(Observable.create(OperatorJoinPatterns.when(plans))); } /** @@ -69,7 +69,7 @@ public final static JoinObservable when(Iterable> plan * @see MSDN: Observable.When */ public final static JoinObservable when(Plan0... plans) { - return from(Observable.create(OperationJoinPatterns.when(plans))); + return from(Observable.create(OperatorJoinPatterns.when(plans))); } /** @@ -85,7 +85,7 @@ public final static JoinObservable when(Plan0... plans) { */ @SuppressWarnings("unchecked") public final static JoinObservable when(Plan0 p1) { - return from(Observable.create(OperationJoinPatterns.when(p1))); + return from(Observable.create(OperatorJoinPatterns.when(p1))); } /** @@ -103,7 +103,7 @@ public final static JoinObservable when(Plan0 p1) { */ @SuppressWarnings("unchecked") public final static JoinObservable when(Plan0 p1, Plan0 p2) { - return from(Observable.create(OperationJoinPatterns.when(p1, p2))); + return from(Observable.create(OperatorJoinPatterns.when(p1, p2))); } /** @@ -123,7 +123,7 @@ public final static JoinObservable when(Plan0 p1, Plan0 p2) { */ @SuppressWarnings("unchecked") public final static JoinObservable when(Plan0 p1, Plan0 p2, Plan0 p3) { - return from(Observable.create(OperationJoinPatterns.when(p1, p2, p3))); + return from(Observable.create(OperatorJoinPatterns.when(p1, p2, p3))); } /** @@ -145,7 +145,7 @@ public final static JoinObservable when(Plan0 p1, Plan0 p2, Plan0 JoinObservable when(Plan0 p1, Plan0 p2, Plan0 p3, Plan0 p4) { - return from(Observable.create(OperationJoinPatterns.when(p1, p2, p3, p4))); + return from(Observable.create(OperatorJoinPatterns.when(p1, p2, p3, p4))); } /** @@ -169,7 +169,7 @@ public final static JoinObservable when(Plan0 p1, Plan0 p2, Plan0 JoinObservable when(Plan0 p1, Plan0 p2, Plan0 p3, Plan0 p4, Plan0 p5) { - return from(Observable.create(OperationJoinPatterns.when(p1, p2, p3, p4, p5))); + return from(Observable.create(OperatorJoinPatterns.when(p1, p2, p3, p4, p5))); } /** @@ -195,7 +195,7 @@ public final static JoinObservable when(Plan0 p1, Plan0 p2, Plan0 JoinObservable when(Plan0 p1, Plan0 p2, Plan0 p3, Plan0 p4, Plan0 p5, Plan0 p6) { - return from(Observable.create(OperationJoinPatterns.when(p1, p2, p3, p4, p5, p6))); + return from(Observable.create(OperatorJoinPatterns.when(p1, p2, p3, p4, p5, p6))); } /** @@ -223,7 +223,7 @@ public final static JoinObservable when(Plan0 p1, Plan0 p2, Plan0 JoinObservable when(Plan0 p1, Plan0 p2, Plan0 p3, Plan0 p4, Plan0 p5, Plan0 p6, Plan0 p7) { - return from(Observable.create(OperationJoinPatterns.when(p1, p2, p3, p4, p5, p6, p7))); + return from(Observable.create(OperatorJoinPatterns.when(p1, p2, p3, p4, p5, p6, p7))); } /** @@ -253,7 +253,7 @@ public final static JoinObservable when(Plan0 p1, Plan0 p2, Plan0 JoinObservable when(Plan0 p1, Plan0 p2, Plan0 p3, Plan0 p4, Plan0 p5, Plan0 p6, Plan0 p7, Plan0 p8) { - return from(Observable.create(OperationJoinPatterns.when(p1, p2, p3, p4, p5, p6, p7, p8))); + return from(Observable.create(OperatorJoinPatterns.when(p1, p2, p3, p4, p5, p6, p7, p8))); } /** @@ -285,7 +285,7 @@ public final static JoinObservable when(Plan0 p1, Plan0 p2, Plan0 JoinObservable when(Plan0 p1, Plan0 p2, Plan0 p3, Plan0 p4, Plan0 p5, Plan0 p6, Plan0 p7, Plan0 p8, Plan0 p9) { - return from(Observable.create(OperationJoinPatterns.when(p1, p2, p3, p4, p5, p6, p7, p8, p9))); + return from(Observable.create(OperatorJoinPatterns.when(p1, p2, p3, p4, p5, p6, p7, p8, p9))); } /** @@ -303,7 +303,7 @@ public final static JoinObservable when(Plan0 p1, Plan0 p2, Plan0MSDN: Observable.Then */ public final Plan0 then(Func1 selector) { - return OperationJoinPatterns.then(o, selector); + return OperatorJoinPatterns.then(o, selector); } public Observable toObservable() { diff --git a/rxjava-contrib/rxjava-joins/src/test/java/rx/joins/operators/OperationJoinsTest.java b/rxjava-contrib/rxjava-joins/src/test/java/rx/joins/operators/OperatorJoinsTest.java similarity index 99% rename from rxjava-contrib/rxjava-joins/src/test/java/rx/joins/operators/OperationJoinsTest.java rename to rxjava-contrib/rxjava-joins/src/test/java/rx/joins/operators/OperatorJoinsTest.java index 3b5fac3050..b6b5dfe5a3 100644 --- a/rxjava-contrib/rxjava-joins/src/test/java/rx/joins/operators/OperationJoinsTest.java +++ b/rxjava-contrib/rxjava-joins/src/test/java/rx/joins/operators/OperatorJoinsTest.java @@ -40,7 +40,7 @@ import rx.observers.TestSubscriber; import rx.subjects.PublishSubject; -public class OperationJoinsTest { +public class OperatorJoinsTest { @Mock Observer observer;