From 84309b65799ad69ee3b24957e4acfc3e1877dbe3 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Sat, 25 Jan 2014 22:18:45 -0800 Subject: [PATCH 1/4] Reimplement Zip Operator Using Lift - Use new lift operator implement and non-blocking synchronization approach. - I have had the concurrency model reviewed by some colleagues and all unit tests are passing but further review is justified and welcome. --- rxjava-core/src/main/java/rx/Observable.java | 44 ++-- .../main/java/rx/observers/TestObserver.java | 9 + .../main/java/rx/operators/OperatorZip.java | 236 ++++++++++++++++++ .../rx/operators/OperatorZipPerformance.java | 34 ++- ...ationZipTest.java => OperatorZipTest.java} | 196 ++++++++++++++- 5 files changed, 479 insertions(+), 40 deletions(-) create mode 100644 rxjava-core/src/main/java/rx/operators/OperatorZip.java rename rxjava-core/src/test/java/rx/operators/{OperationZipTest.java => OperatorZipTest.java} (83%) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 872460b43d..101c412340 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -97,7 +97,7 @@ import rx.operators.OperationToObservableFuture; import rx.operators.OperationUsing; import rx.operators.OperationWindow; -import rx.operators.OperationZip; +import rx.operators.OperatorZip; import rx.operators.OperatorCast; import rx.operators.OperatorFromIterable; import rx.operators.OperatorGroupBy; @@ -1645,11 +1645,9 @@ public final static Observable interval(long interval, TimeUnit unit, Sche * the type of that item * @return an Observable that emits {@code value} as a single item and then completes * @see RxJava Wiki: just() - * @deprecated Use {@link #from(T)} */ - @Deprecated public final static Observable just(T value) { - return from(Arrays.asList((value))); + return from(Arrays.asList(value)); } /** @@ -3058,7 +3056,11 @@ public final static Observable when(Plan0 p1, Plan0 p2, Plan0 p3 * @see RxJava Wiki: zip() */ public final static Observable zip(Iterable> ws, FuncN zipFunction) { - return create(OperationZip.zip(ws, zipFunction)); + List> os = new ArrayList>(); + for (Observable o : ws) { + os.add(o); + } + return Observable.just(os.toArray(new Observable[os.size()])).lift(new OperatorZip(zipFunction)); } /** @@ -3087,12 +3089,14 @@ public final static Observable zip(Iterable> ws, * @see RxJava Wiki: zip() */ public final static Observable zip(Observable> ws, final FuncN zipFunction) { - return ws.toList().mergeMap(new Func1>, Observable>() { + return ws.toList().map(new Func1>, Observable[]>() { + @Override - public final Observable call(List> wsList) { - return create(OperationZip.zip(wsList, zipFunction)); + public Observable[] call(List> o) { + return o.toArray(new Observable[o.size()]); } - }); + + }).lift(new OperatorZip(zipFunction)); } /** @@ -3118,8 +3122,8 @@ public final Observable call(List> wsList) { * @return an Observable that emits the zipped results * @see RxJava Wiki: zip() */ - public final static Observable zip(Observable o1, Observable o2, Func2 zipFunction) { - return create(OperationZip.zip(o1, o2, zipFunction)); + public final static Observable zip(Observable o1, Observable o2, final Func2 zipFunction) { + return just(new Observable[] { o1, o2 }).lift(new OperatorZip(zipFunction)); } /** @@ -3149,7 +3153,7 @@ public final static Observable zip(Observable o1, O * @see RxJava Wiki: zip() */ public final static Observable zip(Observable o1, Observable o2, Observable o3, Func3 zipFunction) { - return create(OperationZip.zip(o1, o2, o3, zipFunction)); + return just(new Observable[] { o1, o2, o3 }).lift(new OperatorZip(zipFunction)); } /** @@ -3181,7 +3185,7 @@ public final static Observable zip(Observable o * @see RxJava Wiki: zip() */ public final static Observable zip(Observable o1, Observable o2, Observable o3, Observable o4, Func4 zipFunction) { - return create(OperationZip.zip(o1, o2, o3, o4, zipFunction)); + return just(new Observable[] { o1, o2, o3, o4 }).lift(new OperatorZip(zipFunction)); } /** @@ -3215,7 +3219,7 @@ public final static Observable zip(ObservableRxJava Wiki: zip() */ public final static Observable zip(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, Func5 zipFunction) { - return create(OperationZip.zip(o1, o2, o3, o4, o5, zipFunction)); + return just(new Observable[] { o1, o2, o3, o4, o5 }).lift(new OperatorZip(zipFunction)); } /** @@ -3251,7 +3255,7 @@ public final static Observable zip(Observable Observable zip(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, Observable o6, Func6 zipFunction) { - return create(OperationZip.zip(o1, o2, o3, o4, o5, o6, zipFunction)); + return just(new Observable[] { o1, o2, o3, o4, o5, o6 }).lift(new OperatorZip(zipFunction)); } /** @@ -3289,7 +3293,7 @@ public final static Observable zip(Observable Observable zip(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, Observable o6, Observable o7, Func7 zipFunction) { - return create(OperationZip.zip(o1, o2, o3, o4, o5, o6, o7, zipFunction)); + return just(new Observable[] { o1, o2, o3, o4, o5, o6, o7 }).lift(new OperatorZip(zipFunction)); } /** @@ -3329,7 +3333,7 @@ public final static Observable zip(Observable */ public final static Observable zip(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, Observable o6, Observable o7, Observable o8, Func8 zipFunction) { - return create(OperationZip.zip(o1, o2, o3, o4, o5, o6, o7, o8, zipFunction)); + return just(new Observable[] { o1, o2, o3, o4, o5, o6, o7, o8 }).lift(new OperatorZip(zipFunction)); } /** @@ -3371,7 +3375,7 @@ public final static Observable zip(Observ */ public final static Observable zip(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, Observable o6, Observable o7, Observable o8, Observable o9, Func9 zipFunction) { - return create(OperationZip.zip(o1, o2, o3, o4, o5, o6, o7, o8, o9, zipFunction)); + return just(new Observable[] { o1, o2, o3, o4, o5, o6, o7, o8, o9 }).lift(new OperatorZip(zipFunction)); } /** @@ -8403,7 +8407,9 @@ public final Observable> window(Observable boundary) { * @return an Observable that pairs up values from the source Observable and the {@code other} Iterable sequence and emits the results of {@code zipFunction} applied to these pairs */ public final Observable zip(Iterable other, Func2 zipFunction) { - return create(OperationZip.zipIterable(this, other, zipFunction)); +// return create(OperatorZip.zipIterable(this, other, zipFunction)); + // TODO fix this + return null; } /** diff --git a/rxjava-core/src/main/java/rx/observers/TestObserver.java b/rxjava-core/src/main/java/rx/observers/TestObserver.java index bbc667a546..a6198ada5b 100644 --- a/rxjava-core/src/main/java/rx/observers/TestObserver.java +++ b/rxjava-core/src/main/java/rx/observers/TestObserver.java @@ -16,6 +16,7 @@ package rx.observers; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -73,6 +74,14 @@ public List getOnNextEvents() { return Collections.unmodifiableList(onNextEvents); } + public List getEvents() { + ArrayList events = new ArrayList(); + events.add(onNextEvents); + events.add(onErrorEvents); + events.add(onCompletedEvents); + return Collections.unmodifiableList(events); + } + public void assertReceivedOnNext(List items) { if (onNextEvents.size() != items.size()) { throw new AssertionError("Number of items does not match. Provided: " + items.size() + " Actual: " + onNextEvents.size()); diff --git a/rxjava-core/src/main/java/rx/operators/OperatorZip.java b/rxjava-core/src/main/java/rx/operators/OperatorZip.java new file mode 100644 index 0000000000..00af87f6c4 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperatorZip.java @@ -0,0 +1,236 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicLong; + +import rx.Notification; +import rx.Observable; +import rx.Observer; +import rx.Subscriber; +import rx.subscriptions.CompositeSubscription; +import rx.util.functions.Func2; +import rx.util.functions.Func3; +import rx.util.functions.Func4; +import rx.util.functions.Func5; +import rx.util.functions.Func6; +import rx.util.functions.Func7; +import rx.util.functions.Func8; +import rx.util.functions.Func9; +import rx.util.functions.FuncN; +import rx.util.functions.Functions; + +/** + * Returns an Observable that emits the results of a function applied to sets of items emitted, in + * sequence, by two or more other Observables. + *

+ * + *

+ * The zip operation applies this function in strict sequence, so the first item emitted by the new + * Observable will be the result of the function applied to the first item emitted by each zipped + * Observable; the second item emitted by the new Observable will be the result of the function + * applied to the second item emitted by each zipped Observable; and so forth. + *

+ * The resulting Observable returned from zip will invoke onNext as many times as the + * number of onNext invocations of the source Observable that emits the fewest items. + */ +public final class OperatorZip implements Operator[]> { + /* + * Raw types are used so we can use a single implementation for all arities such as zip(t1, t2) and zip(t1, t2, t3) etc. + * The types will be cast on the edges so usage will be the type-safe but the internals are not. + */ + + final FuncN zipFunction; + + public OperatorZip(FuncN f) { + this.zipFunction = f; + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + public OperatorZip(Func2 f) { + this.zipFunction = Functions.fromFunc(f); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + public OperatorZip(Func3 f) { + this.zipFunction = Functions.fromFunc(f); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + public OperatorZip(Func4 f) { + this.zipFunction = Functions.fromFunc(f); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + public OperatorZip(Func5 f) { + this.zipFunction = Functions.fromFunc(f); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + public OperatorZip(Func6 f) { + this.zipFunction = Functions.fromFunc(f); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + public OperatorZip(Func7 f) { + this.zipFunction = Functions.fromFunc(f); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + public OperatorZip(Func8 f) { + this.zipFunction = Functions.fromFunc(f); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + public OperatorZip(Func9 f) { + this.zipFunction = Functions.fromFunc(f); + } + + @SuppressWarnings("rawtypes") + @Override + public Subscriber call(final Subscriber observer) { + return new Subscriber(observer) { + + @Override + public void onCompleted() { + // we only complete once a child Observable completes or errors + } + + @Override + public void onError(Throwable e) { + observer.onError(e); + } + + @Override + public void onNext(Observable[] observables) { + new Zip(observables, observer, zipFunction).zip(); + } + + }; + } + + private static class Zip { + @SuppressWarnings("rawtypes") + final Observable[] os; + final Object[] observers; + final Observer observer; + final FuncN zipFunction; + final CompositeSubscription childSubscription = new CompositeSubscription(); + + @SuppressWarnings("rawtypes") + public Zip(Observable[] os, final Subscriber observer, FuncN zipFunction) { + this.os = os; + this.observer = observer; + this.zipFunction = zipFunction; + observers = new Object[os.length]; + for (int i = 0; i < os.length; i++) { + InnerObserver io = new InnerObserver(); + observers[i] = io; + childSubscription.add(io); + } + + observer.add(childSubscription); + } + + @SuppressWarnings("unchecked") + public void zip() { + for (int i = 0; i < os.length; i++) { + os[i].subscribe((InnerObserver) observers[i]); + } + } + + final AtomicLong counter = new AtomicLong(0); + + /** + * check if we have values for each and emit if we do + * + * This will only allow one thread at a time to do the work, but ensures via `counter` increment/decrement + * that there is always once who acts on each `tick`. Same concept as used in OperationObserveOn. + * + */ + @SuppressWarnings("unchecked") + void tick() { + if (counter.getAndIncrement() == 0) { + do { + Object[] vs = new Object[observers.length]; + boolean allHaveValues = true; + for (int i = 0; i < observers.length; i++) { + vs[i] = ((InnerObserver) observers[i]).items.peek(); + if (vs[i] instanceof Notification) { + observer.onCompleted(); + // we need to unsubscribe from all children since children are independently subscribed + childSubscription.unsubscribe(); + return; + } + if (vs[i] == null) { + allHaveValues = false; + // we continue as there may be an onCompleted on one of the others + continue; + } + } + if (allHaveValues) { + // all have something so emit + observer.onNext(zipFunction.call(vs)); + // now remove them + for (int i = 0; i < observers.length; i++) { + ((InnerObserver) observers[i]).items.poll(); + // eagerly check if the next item on this queue is an onComplete + if (((InnerObserver) observers[i]).items.peek() instanceof Notification) { + // it is an onComplete so shut down + observer.onCompleted(); + // we need to unsubscribe from all children since children are independently subscribed + childSubscription.unsubscribe(); + return; + } + } + } + } while (counter.decrementAndGet() > 0); + } + + } + + // used to observe each Observable we are zipping together + // it collects all items in an internal queue + @SuppressWarnings("rawtypes") + final class InnerObserver extends Subscriber { + // Concurrent* since we need to read it from across threads + final ConcurrentLinkedQueue items = new ConcurrentLinkedQueue(); + + @SuppressWarnings("unchecked") + @Override + public void onCompleted() { + items.add(Notification.createOnCompleted()); + tick(); + } + + @Override + public void onError(Throwable e) { + // emit error and shut down + observer.onError(e); + } + + @SuppressWarnings("unchecked") + @Override + public void onNext(Object t) { + // TODO use a placeholder for NULL, such as Notification(null) + items.add(t); + tick(); + } + }; + } + +} diff --git a/rxjava-core/src/perf/java/rx/operators/OperatorZipPerformance.java b/rxjava-core/src/perf/java/rx/operators/OperatorZipPerformance.java index 3974bb0005..2f22b81303 100644 --- a/rxjava-core/src/perf/java/rx/operators/OperatorZipPerformance.java +++ b/rxjava-core/src/perf/java/rx/operators/OperatorZipPerformance.java @@ -3,6 +3,7 @@ import rx.Observable; import rx.perf.AbstractPerformanceTester; import rx.perf.IntegerSumObserver; +import rx.perf.LongSumObserver; import rx.util.functions.Action0; import rx.util.functions.Func2; @@ -17,7 +18,7 @@ public static void main(String args[]) { @Override public void call() { spt.timeZipAandBwithSingleItems(); - // spt.timeZipAandBwith100Items(); + // spt.timeZipAandBwith100Items(); } }); } catch (Exception e) { @@ -42,6 +43,14 @@ public void call() { * Run: 12 - 1,196,630 ops/sec * Run: 13 - 1,206,332 ops/sec * Run: 14 - 1,206,169 ops/sec + * + * ... after v0.17 work (new implementation): + * + * Run: 10 - 1,668,248 ops/sec + * Run: 11 - 1,673,052 ops/sec + * Run: 12 - 1,672,479 ops/sec + * Run: 13 - 1,675,018 ops/sec + * Run: 14 - 1,668,830 ops/sec */ public long timeZipAandBwithSingleItems() { @@ -56,12 +65,13 @@ public Integer call(Integer t1, Integer t2) { }); - IntegerSumObserver o = new IntegerSumObserver(); - + int sum = 0; for (long l = 0; l < REPETITIONS; l++) { - s.subscribe(o); + IntegerSumObserver so = new IntegerSumObserver(); + s.subscribe(so); + sum += so.sum; } - return o.sum; + return sum; } /** @@ -77,6 +87,11 @@ public Integer call(Integer t1, Integer t2) { * Run: 0 - 40,048 ops/sec * Run: 1 - 40,165 ops/sec * + * ... after v0.17 work (new implementation): + * + * Run: 0 - 63,079 ops/sec + * Run: 1 - 63,505 ops/sec + * */ public long timeZipAandBwith100Items() { @@ -91,11 +106,12 @@ public Integer call(Integer t1, Integer t2) { }); - IntegerSumObserver o = new IntegerSumObserver(); - + int sum = 0; for (long l = 0; l < REPETITIONS; l++) { - s.subscribe(o); + IntegerSumObserver so = new IntegerSumObserver(); + s.subscribe(so); + sum += so.sum; } - return o.sum; + return sum; } } \ No newline at end of file diff --git a/rxjava-core/src/test/java/rx/operators/OperationZipTest.java b/rxjava-core/src/test/java/rx/operators/OperatorZipTest.java similarity index 83% rename from rxjava-core/src/test/java/rx/operators/OperationZipTest.java rename to rxjava-core/src/test/java/rx/operators/OperatorZipTest.java index 97cb87870a..bb40b6df9e 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationZipTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorZipTest.java @@ -15,31 +15,38 @@ */ package rx.operators; +import static org.junit.Assert.*; import static org.mockito.Matchers.*; import static org.mockito.Mockito.*; -import static rx.operators.OperationZip.*; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Iterator; import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.Before; import org.junit.Test; import org.mockito.InOrder; import rx.Observable; +import rx.Observable.OnSubscribe; import rx.Observer; +import rx.Subscriber; import rx.Subscription; import rx.operators.OperationReduceTest.CustomException; import rx.subjects.PublishSubject; import rx.subscriptions.Subscriptions; +import rx.util.functions.Action1; import rx.util.functions.Func2; import rx.util.functions.Func3; import rx.util.functions.FuncN; import rx.util.functions.Functions; -public class OperationZipTest { +public class OperatorZipTest { Func2 concat2Strings; PublishSubject s1; PublishSubject s2; @@ -79,7 +86,7 @@ public void testCollectionSizeDifferentThanFunction() { @SuppressWarnings("rawtypes") Collection ws = java.util.Collections.singleton(Observable.from("one", "two")); - Observable w = Observable.create(zip(ws, zipr)); + Observable w = Observable.zip(ws, zipr); w.subscribe(observer); verify(observer, times(1)).onError(any(Throwable.class)); @@ -97,7 +104,7 @@ public void testZippingDifferentLengthObservableSequences1() { TestObservable w2 = new TestObservable(); TestObservable w3 = new TestObservable(); - Observable zipW = Observable.create(zip(Observable.create(w1), Observable.create(w2), Observable.create(w3), getConcat3StringsZipr())); + Observable zipW = Observable.zip(Observable.create(w1), Observable.create(w2), Observable.create(w3), getConcat3StringsZipr()); zipW.subscribe(w); /* simulate sending data */ @@ -131,7 +138,7 @@ public void testZippingDifferentLengthObservableSequences2() { TestObservable w2 = new TestObservable(); TestObservable w3 = new TestObservable(); - Observable zipW = Observable.create(zip(Observable.create(w1), Observable.create(w2), Observable.create(w3), getConcat3StringsZipr())); + Observable zipW = Observable.zip(Observable.create(w1), Observable.create(w2), Observable.create(w3), getConcat3StringsZipr()); zipW.subscribe(w); /* simulate sending data */ @@ -222,7 +229,6 @@ public void testAggregatorDifferentSizedResultsWithOnComplete() { PublishSubject r2 = PublishSubject.create(); /* define a Observer to receive aggregated events */ Observer observer = mock(Observer.class); - Observable.zip(r1, r2, zipr2).subscribe(observer); /* simulate the Observables pushing data into the aggregator */ @@ -430,7 +436,7 @@ public void testZip2Types() { /* define a Observer to receive aggregated events */ Observer observer = mock(Observer.class); - Observable w = Observable.create(zip(Observable.from("one", "two"), Observable.from(2, 3, 4), zipr)); + Observable w = Observable.zip(Observable.from("one", "two"), Observable.from(2, 3, 4), zipr); w.subscribe(observer); verify(observer, never()).onError(any(Throwable.class)); @@ -449,7 +455,7 @@ public void testZip3Types() { /* define a Observer to receive aggregated events */ Observer observer = mock(Observer.class); - Observable w = Observable.create(zip(Observable.from("one", "two"), Observable.from(2), Observable.from(new int[] { 4, 5, 6 }), zipr)); + Observable w = Observable.zip(Observable.from("one", "two"), Observable.from(2), Observable.from(new int[] { 4, 5, 6 }), zipr); w.subscribe(observer); verify(observer, never()).onError(any(Throwable.class)); @@ -465,7 +471,7 @@ public void testOnNextExceptionInvokesOnError() { @SuppressWarnings("unchecked") Observer observer = mock(Observer.class); - Observable w = Observable.create(zip(Observable.from(10, 20, 30), Observable.from(0, 1, 2), zipr)); + Observable w = Observable.zip(Observable.from(10, 20, 30), Observable.from(0, 1, 2), zipr); w.subscribe(observer); verify(observer, times(1)).onError(any(Throwable.class)); @@ -479,7 +485,7 @@ public void testOnFirstCompletion() { @SuppressWarnings("unchecked") Observer obs = mock(Observer.class); - Observable o = Observable.create(zip(oA, oB, getConcat2Strings())); + Observable o = Observable.zip(oA, oB, getConcat2Strings()); o.subscribe(obs); InOrder io = inOrder(obs); @@ -530,7 +536,7 @@ public void testOnErrorTermination() { @SuppressWarnings("unchecked") Observer obs = mock(Observer.class); - Observable o = Observable.create(zip(oA, oB, getConcat2Strings())); + Observable o = Observable.zip(oA, oB, getConcat2Strings()); o.subscribe(obs); InOrder io = inOrder(obs); @@ -1018,7 +1024,7 @@ public void remove() { verify(o, never()).onCompleted(); } - + @Test public void testZipWithOnCompletedTwice() { // issue: https://groups.google.com/forum/#!topic/rxjava/79cWTv3TFp0 @@ -1063,4 +1069,170 @@ public void onNext(Integer args) { inOrder.verify(observer, times(1)).onCompleted(); inOrder.verifyNoMoreInteractions(); } + + @Test + public void testZip() { + Observable os = OBSERVABLE_OF_5_INTEGERS + .zip(OBSERVABLE_OF_5_INTEGERS, new Func2() { + + @Override + public String call(Integer a, Integer b) { + return a + "-" + b; + } + }); + + final ArrayList list = new ArrayList(); + os.subscribe(new Action1() { + + @Override + public void call(String s) { + System.out.println(s); + list.add(s); + } + }); + + assertEquals(5, list.size()); + assertEquals("1-1", list.get(0)); + assertEquals("2-2", list.get(1)); + assertEquals("5-5", list.get(4)); + } + + @Test + public void testZipAsync() throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + final CountDownLatch infiniteObservables = new CountDownLatch(2); + Observable os = ASYNC_OBSERVABLE_OF_INFINITE_INTEGERS(infiniteObservables) + .zip(ASYNC_OBSERVABLE_OF_INFINITE_INTEGERS(infiniteObservables), new Func2() { + + @Override + public String call(Integer a, Integer b) { + return a + "-" + b; + } + }).take(5); + + final ArrayList list = new ArrayList(); + os.subscribe(new Observer() { + + @Override + public void onCompleted() { + latch.countDown(); + } + + @Override + public void onError(Throwable e) { + e.printStackTrace(); + latch.countDown(); + } + + @Override + public void onNext(String s) { + System.out.println(s); + list.add(s); + } + }); + + latch.await(2000, TimeUnit.MILLISECONDS); + if (!infiniteObservables.await(2000, TimeUnit.MILLISECONDS)) { + throw new RuntimeException("didn't unsubscribe"); + } + + assertEquals(5, list.size()); + assertEquals("1-1", list.get(0)); + assertEquals("2-2", list.get(1)); + assertEquals("5-5", list.get(4)); + } + + @Test + public void testZipInfiniteAndFinite() throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + final CountDownLatch infiniteObservable = new CountDownLatch(1); + Observable os = OBSERVABLE_OF_5_INTEGERS + .zip(ASYNC_OBSERVABLE_OF_INFINITE_INTEGERS(infiniteObservable), new Func2() { + + @Override + public String call(Integer a, Integer b) { + return a + "-" + b; + } + }); + + final ArrayList list = new ArrayList(); + os.subscribe(new Observer() { + + @Override + public void onCompleted() { + latch.countDown(); + } + + @Override + public void onError(Throwable e) { + e.printStackTrace(); + latch.countDown(); + } + + @Override + public void onNext(String s) { + System.out.println(s); + list.add(s); + } + }); + + latch.await(1000, TimeUnit.MILLISECONDS); + if (!infiniteObservable.await(2000, TimeUnit.MILLISECONDS)) { + throw new RuntimeException("didn't unsubscribe"); + } + + assertEquals(5, list.size()); + assertEquals("1-1", list.get(0)); + assertEquals("2-2", list.get(1)); + assertEquals("5-5", list.get(4)); + } + + Observable OBSERVABLE_OF_5_INTEGERS = OBSERVABLE_OF_5_INTEGERS(new AtomicInteger()); + + Observable OBSERVABLE_OF_5_INTEGERS(final AtomicInteger numEmitted) { + return Observable.create(new OnSubscribe() { + + @Override + public void call(final Subscriber o) { + for (int i = 1; i <= 5; i++) { + if (o.isUnsubscribed()) { + break; + } + numEmitted.incrementAndGet(); + o.onNext(i); + Thread.yield(); + } + o.onCompleted(); + } + + }); + } + + Observable ASYNC_OBSERVABLE_OF_INFINITE_INTEGERS(final CountDownLatch latch) { + return Observable.create(new OnSubscribe() { + + @Override + public void call(final Subscriber o) { + Thread t = new Thread(new Runnable() { + + @Override + public void run() { + System.out.println("-------> subscribe to infinite sequence"); + System.out.println("Starting thread: " + Thread.currentThread()); + int i = 1; + while (!o.isUnsubscribed()) { + o.onNext(i++); + Thread.yield(); + } + o.onCompleted(); + latch.countDown(); + System.out.println("Ending thread: " + Thread.currentThread()); + } + }); + t.start(); + + } + + }); + } } From 9b3fca18c1ff2720b6ea7dfe841fccf11613623c Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Tue, 4 Feb 2014 16:25:02 -0800 Subject: [PATCH 2/4] Subscribers for common factory methods Similar to Observers. --- rxjava-core/src/main/java/rx/Subscriber.java | 21 --- .../main/java/rx/observers/Subscribers.java | 151 ++++++++++++++++++ .../java/rx/observers/TestSubscriber.java | 2 +- 3 files changed, 152 insertions(+), 22 deletions(-) create mode 100644 rxjava-core/src/main/java/rx/observers/Subscribers.java diff --git a/rxjava-core/src/main/java/rx/Subscriber.java b/rxjava-core/src/main/java/rx/Subscriber.java index cfce1c60c5..3ad986813c 100644 --- a/rxjava-core/src/main/java/rx/Subscriber.java +++ b/rxjava-core/src/main/java/rx/Subscriber.java @@ -47,27 +47,6 @@ protected Subscriber(Subscriber op) { this(op.cs); } - public static Subscriber from(final Observer o) { - return new Subscriber() { - - @Override - public void onCompleted() { - o.onCompleted(); - } - - @Override - public void onError(Throwable e) { - o.onError(e); - } - - @Override - public void onNext(T t) { - o.onNext(t); - } - - }; - } - /** * Used to register an unsubscribe callback. */ diff --git a/rxjava-core/src/main/java/rx/observers/Subscribers.java b/rxjava-core/src/main/java/rx/observers/Subscribers.java new file mode 100644 index 0000000000..4cf5e48d14 --- /dev/null +++ b/rxjava-core/src/main/java/rx/observers/Subscribers.java @@ -0,0 +1,151 @@ +package rx.observers; + +import rx.Observer; +import rx.Subscriber; +import rx.util.OnErrorNotImplementedException; +import rx.util.functions.Action0; +import rx.util.functions.Action1; + +public class Subscribers { + + public static Subscriber from(final Observer o) { + return new Subscriber() { + + @Override + public void onCompleted() { + o.onCompleted(); + } + + @Override + public void onError(Throwable e) { + o.onError(e); + } + + @Override + public void onNext(T t) { + o.onNext(t); + } + + }; + } + + /** + * Create an empty Subscriber that ignores all events. + */ + public static final Subscriber create() { + return new Subscriber() { + + @Override + public final void onCompleted() { + // do nothing + } + + @Override + public final void onError(Throwable e) { + throw new OnErrorNotImplementedException(e); + } + + @Override + public final void onNext(T args) { + // do nothing + } + + }; + } + + /** + * Create an Subscriber that receives `onNext` and ignores `onError` and `onCompleted`. + */ + public static final Subscriber create(final Action1 onNext) { + if (onNext == null) { + throw new IllegalArgumentException("onNext can not be null"); + } + + return new Subscriber() { + + @Override + public final void onCompleted() { + // do nothing + } + + @Override + public final void onError(Throwable e) { + throw new OnErrorNotImplementedException(e); + } + + @Override + public final void onNext(T args) { + onNext.call(args); + } + + }; + } + + /** + * Create an Subscriber that receives `onNext` and `onError` and ignores `onCompleted`. + * + */ + public static final Subscriber create(final Action1 onNext, final Action1 onError) { + if (onNext == null) { + throw new IllegalArgumentException("onNext can not be null"); + } + if (onError == null) { + throw new IllegalArgumentException("onError can not be null"); + } + + return new Subscriber() { + + @Override + public final void onCompleted() { + // do nothing + } + + @Override + public final void onError(Throwable e) { + onError.call(e); + } + + @Override + public final void onNext(T args) { + onNext.call(args); + } + + }; + } + + /** + * Create an Subscriber that receives `onNext`, `onError` and `onCompleted`. + * + */ + public static final Subscriber create(final Action1 onNext, final Action1 onError, final Action0 onComplete) { + if (onNext == null) { + throw new IllegalArgumentException("onNext can not be null"); + } + if (onError == null) { + throw new IllegalArgumentException("onError can not be null"); + } + if (onComplete == null) { + throw new IllegalArgumentException("onComplete can not be null"); + } + + return new Subscriber() { + + @Override + public final void onCompleted() { + onComplete.call(); + } + + @Override + public final void onError(Throwable e) { + onError.call(e); + } + + @Override + public final void onNext(T args) { + onNext.call(args); + } + + }; + } + +} diff --git a/rxjava-core/src/main/java/rx/observers/TestSubscriber.java b/rxjava-core/src/main/java/rx/observers/TestSubscriber.java index 677c9dcb94..097c6e8e24 100644 --- a/rxjava-core/src/main/java/rx/observers/TestSubscriber.java +++ b/rxjava-core/src/main/java/rx/observers/TestSubscriber.java @@ -26,7 +26,7 @@ */ public class TestSubscriber extends Subscriber { - private final Subscriber EMPTY = Subscriber.from(new EmptyObserver()); + private final Subscriber EMPTY = Subscribers.create(); private final TestObserver testObserver; From cf28bce52ea15dba3e6be587ba5b4d43b1029ea9 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Tue, 4 Feb 2014 16:25:55 -0800 Subject: [PATCH 3/4] OperatorZipIterable --- rxjava-core/src/main/java/rx/Observable.java | 5 +- .../rx/operators/OperatorZipIterable.java | 72 ++++ .../rx/operators/OperatorZipIterableTest.java | 339 ++++++++++++++++++ .../java/rx/operators/OperatorZipTest.java | 259 ------------- 4 files changed, 413 insertions(+), 262 deletions(-) create mode 100644 rxjava-core/src/main/java/rx/operators/OperatorZipIterable.java create mode 100644 rxjava-core/src/test/java/rx/operators/OperatorZipIterableTest.java diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 101c412340..5337bf3123 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -108,6 +108,7 @@ import rx.operators.OperatorTimestamp; import rx.operators.OperatorToObservableList; import rx.operators.OperatorToObservableSortedList; +import rx.operators.OperatorZipIterable; import rx.plugins.RxJavaObservableExecutionHook; import rx.plugins.RxJavaPlugins; import rx.schedulers.Schedulers; @@ -8407,9 +8408,7 @@ public final Observable> window(Observable boundary) { * @return an Observable that pairs up values from the source Observable and the {@code other} Iterable sequence and emits the results of {@code zipFunction} applied to these pairs */ public final Observable zip(Iterable other, Func2 zipFunction) { -// return create(OperatorZip.zipIterable(this, other, zipFunction)); - // TODO fix this - return null; + return lift(new OperatorZipIterable(other, zipFunction)); } /** diff --git a/rxjava-core/src/main/java/rx/operators/OperatorZipIterable.java b/rxjava-core/src/main/java/rx/operators/OperatorZipIterable.java new file mode 100644 index 0000000000..eac5f80809 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperatorZipIterable.java @@ -0,0 +1,72 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import java.util.Iterator; + +import rx.Subscriber; +import rx.observers.Subscribers; +import rx.util.functions.Func2; + +public final class OperatorZipIterable implements Operator { + + final Iterable iterable; + final Func2 zipFunction; + + public OperatorZipIterable(Iterable iterable, Func2 zipFunction) { + this.iterable = iterable; + this.zipFunction = zipFunction; + } + + @Override + public Subscriber call(final Subscriber subscriber) { + final Iterator iterator = iterable.iterator(); + try { + if (!iterator.hasNext()) { + subscriber.onCompleted(); + return Subscribers.create(); + } + } catch (Throwable e) { + subscriber.onError(e); + } + return new Subscriber() { + + @Override + public void onCompleted() { + subscriber.onCompleted(); + } + + @Override + public void onError(Throwable e) { + subscriber.onError(e); + } + + @Override + public void onNext(T1 t) { + try { + subscriber.onNext(zipFunction.call(t, iterator.next())); + if (!iterator.hasNext()) { + onCompleted(); + } + } catch (Throwable e) { + onError(e); + } + } + + }; + } + +} diff --git a/rxjava-core/src/test/java/rx/operators/OperatorZipIterableTest.java b/rxjava-core/src/test/java/rx/operators/OperatorZipIterableTest.java new file mode 100644 index 0000000000..99e8296539 --- /dev/null +++ b/rxjava-core/src/test/java/rx/operators/OperatorZipIterableTest.java @@ -0,0 +1,339 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; + +import java.util.Arrays; +import java.util.Iterator; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.InOrder; + +import rx.Observable; +import rx.Observer; +import rx.operators.OperationReduceTest.CustomException; +import rx.subjects.PublishSubject; +import rx.util.functions.Func2; +import rx.util.functions.Func3; + +public class OperatorZipIterableTest { + Func2 concat2Strings; + PublishSubject s1; + PublishSubject s2; + Observable zipped; + + Observer observer; + InOrder inOrder; + + @Before + @SuppressWarnings("unchecked") + public void setUp() { + concat2Strings = new Func2() { + @Override + public String call(String t1, String t2) { + return t1 + "-" + t2; + } + }; + + s1 = PublishSubject.create(); + s2 = PublishSubject.create(); + zipped = Observable.zip(s1, s2, concat2Strings); + + observer = mock(Observer.class); + inOrder = inOrder(observer); + + zipped.subscribe(observer); + } + + Func2 zipr2 = new Func2() { + + @Override + public String call(Object t1, Object t2) { + return "" + t1 + t2; + } + + }; + Func3 zipr3 = new Func3() { + + @Override + public String call(Object t1, Object t2, Object t3) { + return "" + t1 + t2 + t3; + } + + }; + + @Test + public void testZipIterableSameSize() { + PublishSubject r1 = PublishSubject.create(); + /* define a Observer to receive aggregated events */ + Observer o = mock(Observer.class); + InOrder io = inOrder(o); + + Iterable r2 = Arrays.asList("1", "2", "3"); + + r1.zip(r2, zipr2).subscribe(o); + + r1.onNext("one-"); + r1.onNext("two-"); + r1.onNext("three-"); + r1.onCompleted(); + + io.verify(o).onNext("one-1"); + io.verify(o).onNext("two-2"); + io.verify(o).onNext("three-3"); + io.verify(o).onCompleted(); + + verify(o, never()).onError(any(Throwable.class)); + + } + + @Test + public void testZipIterableEmptyFirstSize() { + PublishSubject r1 = PublishSubject.create(); + /* define a Observer to receive aggregated events */ + Observer o = mock(Observer.class); + InOrder io = inOrder(o); + + Iterable r2 = Arrays.asList("1", "2", "3"); + + r1.zip(r2, zipr2).subscribe(o); + + r1.onCompleted(); + + io.verify(o).onCompleted(); + + verify(o, never()).onNext(any(String.class)); + verify(o, never()).onError(any(Throwable.class)); + + } + + @Test + public void testZipIterableEmptySecond() { + PublishSubject r1 = PublishSubject.create(); + /* define a Observer to receive aggregated events */ + Observer o = mock(Observer.class); + InOrder io = inOrder(o); + + Iterable r2 = Arrays.asList(); + + r1.zip(r2, zipr2).subscribe(o); + + r1.onNext("one-"); + r1.onNext("two-"); + r1.onNext("three-"); + r1.onCompleted(); + + io.verify(o).onCompleted(); + + verify(o, never()).onNext(any(String.class)); + verify(o, never()).onError(any(Throwable.class)); + } + + @Test + public void testZipIterableFirstShorter() { + PublishSubject r1 = PublishSubject.create(); + /* define a Observer to receive aggregated events */ + Observer o = mock(Observer.class); + InOrder io = inOrder(o); + + Iterable r2 = Arrays.asList("1", "2", "3"); + + r1.zip(r2, zipr2).subscribe(o); + + r1.onNext("one-"); + r1.onNext("two-"); + r1.onCompleted(); + + io.verify(o).onNext("one-1"); + io.verify(o).onNext("two-2"); + io.verify(o).onCompleted(); + + verify(o, never()).onError(any(Throwable.class)); + + } + + @Test + public void testZipIterableSecondShorter() { + PublishSubject r1 = PublishSubject.create(); + /* define a Observer to receive aggregated events */ + Observer o = mock(Observer.class); + InOrder io = inOrder(o); + + Iterable r2 = Arrays.asList("1", "2"); + + r1.zip(r2, zipr2).subscribe(o); + + r1.onNext("one-"); + r1.onNext("two-"); + r1.onNext("three-"); + r1.onCompleted(); + + io.verify(o).onNext("one-1"); + io.verify(o).onNext("two-2"); + io.verify(o).onCompleted(); + + verify(o, never()).onError(any(Throwable.class)); + + } + + @Test + public void testZipIterableFirstThrows() { + PublishSubject r1 = PublishSubject.create(); + /* define a Observer to receive aggregated events */ + Observer o = mock(Observer.class); + InOrder io = inOrder(o); + + Iterable r2 = Arrays.asList("1", "2", "3"); + + r1.zip(r2, zipr2).subscribe(o); + + r1.onNext("one-"); + r1.onNext("two-"); + r1.onError(new OperationReduceTest.CustomException()); + + io.verify(o).onNext("one-1"); + io.verify(o).onNext("two-2"); + io.verify(o).onError(any(OperationReduceTest.CustomException.class)); + + verify(o, never()).onCompleted(); + + } + + @Test + public void testZipIterableIteratorThrows() { + PublishSubject r1 = PublishSubject.create(); + /* define a Observer to receive aggregated events */ + Observer o = mock(Observer.class); + InOrder io = inOrder(o); + + Iterable r2 = new Iterable() { + @Override + public Iterator iterator() { + throw new OperationReduceTest.CustomException(); + } + }; + + r1.zip(r2, zipr2).subscribe(o); + + r1.onNext("one-"); + r1.onNext("two-"); + r1.onError(new OperationReduceTest.CustomException()); + + io.verify(o).onError(any(OperationReduceTest.CustomException.class)); + + verify(o, never()).onCompleted(); + verify(o, never()).onNext(any(String.class)); + + } + + @Test + public void testZipIterableHasNextThrows() { + PublishSubject r1 = PublishSubject.create(); + /* define a Observer to receive aggregated events */ + Observer o = mock(Observer.class); + InOrder io = inOrder(o); + + Iterable r2 = new Iterable() { + + @Override + public Iterator iterator() { + return new Iterator() { + int count; + + @Override + public boolean hasNext() { + if (count == 0) { + return true; + } + throw new CustomException(); + } + + @Override + public String next() { + count++; + return "1"; + } + + @Override + public void remove() { + throw new UnsupportedOperationException("Not supported yet."); + } + + }; + } + + }; + + r1.zip(r2, zipr2).subscribe(o); + + r1.onNext("one-"); + r1.onError(new OperationReduceTest.CustomException()); + + io.verify(o).onNext("one-1"); + io.verify(o).onError(any(OperationReduceTest.CustomException.class)); + + verify(o, never()).onCompleted(); + + } + + @Test + public void testZipIterableNextThrows() { + PublishSubject r1 = PublishSubject.create(); + /* define a Observer to receive aggregated events */ + Observer o = mock(Observer.class); + InOrder io = inOrder(o); + + Iterable r2 = new Iterable() { + + @Override + public Iterator iterator() { + return new Iterator() { + int count; + + @Override + public boolean hasNext() { + return true; + } + + @Override + public String next() { + throw new CustomException(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException("Not supported yet."); + } + + }; + } + + }; + + r1.zip(r2, zipr2).subscribe(o); + + r1.onError(new OperationReduceTest.CustomException()); + + io.verify(o).onError(any(OperationReduceTest.CustomException.class)); + + verify(o, never()).onNext(any(String.class)); + verify(o, never()).onCompleted(); + + } +} diff --git a/rxjava-core/src/test/java/rx/operators/OperatorZipTest.java b/rxjava-core/src/test/java/rx/operators/OperatorZipTest.java index bb40b6df9e..ad3b187687 100644 --- a/rxjava-core/src/test/java/rx/operators/OperatorZipTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorZipTest.java @@ -766,265 +766,6 @@ public void testSecondFails() { inOrder.verifyNoMoreInteractions(); } - @Test - public void testZipIterableSameSize() { - PublishSubject r1 = PublishSubject.create(); - /* define a Observer to receive aggregated events */ - Observer o = mock(Observer.class); - InOrder io = inOrder(o); - - Iterable r2 = Arrays.asList("1", "2", "3"); - - r1.zip(r2, zipr2).subscribe(o); - - r1.onNext("one-"); - r1.onNext("two-"); - r1.onNext("three-"); - r1.onCompleted(); - - io.verify(o).onNext("one-1"); - io.verify(o).onNext("two-2"); - io.verify(o).onNext("three-3"); - io.verify(o).onCompleted(); - - verify(o, never()).onError(any(Throwable.class)); - - } - - @Test - public void testZipIterableEmptyFirstSize() { - PublishSubject r1 = PublishSubject.create(); - /* define a Observer to receive aggregated events */ - Observer o = mock(Observer.class); - InOrder io = inOrder(o); - - Iterable r2 = Arrays.asList("1", "2", "3"); - - r1.zip(r2, zipr2).subscribe(o); - - r1.onCompleted(); - - io.verify(o).onCompleted(); - - verify(o, never()).onNext(any(String.class)); - verify(o, never()).onError(any(Throwable.class)); - - } - - @Test - public void testZipIterableEmptySecond() { - PublishSubject r1 = PublishSubject.create(); - /* define a Observer to receive aggregated events */ - Observer o = mock(Observer.class); - InOrder io = inOrder(o); - - Iterable r2 = Arrays.asList(); - - r1.zip(r2, zipr2).subscribe(o); - - r1.onNext("one-"); - r1.onNext("two-"); - r1.onNext("three-"); - r1.onCompleted(); - - io.verify(o).onCompleted(); - - verify(o, never()).onNext(any(String.class)); - verify(o, never()).onError(any(Throwable.class)); - } - - @Test - public void testZipIterableFirstShorter() { - PublishSubject r1 = PublishSubject.create(); - /* define a Observer to receive aggregated events */ - Observer o = mock(Observer.class); - InOrder io = inOrder(o); - - Iterable r2 = Arrays.asList("1", "2", "3"); - - r1.zip(r2, zipr2).subscribe(o); - - r1.onNext("one-"); - r1.onNext("two-"); - r1.onCompleted(); - - io.verify(o).onNext("one-1"); - io.verify(o).onNext("two-2"); - io.verify(o).onCompleted(); - - verify(o, never()).onError(any(Throwable.class)); - - } - - @Test - public void testZipIterableSecondShorter() { - PublishSubject r1 = PublishSubject.create(); - /* define a Observer to receive aggregated events */ - Observer o = mock(Observer.class); - InOrder io = inOrder(o); - - Iterable r2 = Arrays.asList("1", "2"); - - r1.zip(r2, zipr2).subscribe(o); - - r1.onNext("one-"); - r1.onNext("two-"); - r1.onNext("three-"); - r1.onCompleted(); - - io.verify(o).onNext("one-1"); - io.verify(o).onNext("two-2"); - io.verify(o).onCompleted(); - - verify(o, never()).onError(any(Throwable.class)); - - } - - @Test - public void testZipIterableFirstThrows() { - PublishSubject r1 = PublishSubject.create(); - /* define a Observer to receive aggregated events */ - Observer o = mock(Observer.class); - InOrder io = inOrder(o); - - Iterable r2 = Arrays.asList("1", "2", "3"); - - r1.zip(r2, zipr2).subscribe(o); - - r1.onNext("one-"); - r1.onNext("two-"); - r1.onError(new OperationReduceTest.CustomException()); - - io.verify(o).onNext("one-1"); - io.verify(o).onNext("two-2"); - io.verify(o).onError(any(OperationReduceTest.CustomException.class)); - - verify(o, never()).onCompleted(); - - } - - @Test - public void testZipIterableIteratorThrows() { - PublishSubject r1 = PublishSubject.create(); - /* define a Observer to receive aggregated events */ - Observer o = mock(Observer.class); - InOrder io = inOrder(o); - - Iterable r2 = new Iterable() { - @Override - public Iterator iterator() { - throw new OperationReduceTest.CustomException(); - } - }; - - r1.zip(r2, zipr2).subscribe(o); - - r1.onNext("one-"); - r1.onNext("two-"); - r1.onError(new OperationReduceTest.CustomException()); - - io.verify(o).onError(any(OperationReduceTest.CustomException.class)); - - verify(o, never()).onCompleted(); - verify(o, never()).onNext(any(String.class)); - - } - - @Test - public void testZipIterableHasNextThrows() { - PublishSubject r1 = PublishSubject.create(); - /* define a Observer to receive aggregated events */ - Observer o = mock(Observer.class); - InOrder io = inOrder(o); - - Iterable r2 = new Iterable() { - - @Override - public Iterator iterator() { - return new Iterator() { - int count; - - @Override - public boolean hasNext() { - if (count == 0) { - return true; - } - throw new CustomException(); - } - - @Override - public String next() { - count++; - return "1"; - } - - @Override - public void remove() { - throw new UnsupportedOperationException("Not supported yet."); - } - - }; - } - - }; - - r1.zip(r2, zipr2).subscribe(o); - - r1.onNext("one-"); - r1.onError(new OperationReduceTest.CustomException()); - - io.verify(o).onNext("one-1"); - io.verify(o).onError(any(OperationReduceTest.CustomException.class)); - - verify(o, never()).onCompleted(); - - } - - @Test - public void testZipIterableNextThrows() { - PublishSubject r1 = PublishSubject.create(); - /* define a Observer to receive aggregated events */ - Observer o = mock(Observer.class); - InOrder io = inOrder(o); - - Iterable r2 = new Iterable() { - - @Override - public Iterator iterator() { - return new Iterator() { - int count; - - @Override - public boolean hasNext() { - return true; - } - - @Override - public String next() { - throw new CustomException(); - } - - @Override - public void remove() { - throw new UnsupportedOperationException("Not supported yet."); - } - - }; - } - - }; - - r1.zip(r2, zipr2).subscribe(o); - - r1.onError(new OperationReduceTest.CustomException()); - - io.verify(o).onError(any(OperationReduceTest.CustomException.class)); - - verify(o, never()).onNext(any(String.class)); - verify(o, never()).onCompleted(); - - } - @Test public void testZipWithOnCompletedTwice() { // issue: https://groups.google.com/forum/#!topic/rxjava/79cWTv3TFp0 From 3d5474ff4bf1612cf585c478b7492a42683b3d93 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Tue, 4 Feb 2014 19:51:38 -0800 Subject: [PATCH 4/4] Zip NULL and COMPLETE Sentinels --- .../main/java/rx/operators/OperatorZip.java | 24 ++++--- .../java/rx/operators/OperatorZipTest.java | 62 ++++++++++++++++++- 2 files changed, 76 insertions(+), 10 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperatorZip.java b/rxjava-core/src/main/java/rx/operators/OperatorZip.java index 00af87f6c4..802de494b5 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorZip.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorZip.java @@ -18,7 +18,6 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicLong; -import rx.Notification; import rx.Observable; import rx.Observer; import rx.Subscriber; @@ -131,6 +130,9 @@ private static class Zip { final FuncN zipFunction; final CompositeSubscription childSubscription = new CompositeSubscription(); + static Object NULL_SENTINEL = new Object(); + static Object COMPLETE_SENTINEL = new Object(); + @SuppressWarnings("rawtypes") public Zip(Observable[] os, final Subscriber observer, FuncN zipFunction) { this.os = os; @@ -170,13 +172,16 @@ void tick() { boolean allHaveValues = true; for (int i = 0; i < observers.length; i++) { vs[i] = ((InnerObserver) observers[i]).items.peek(); - if (vs[i] instanceof Notification) { + if (vs[i] == NULL_SENTINEL) { + // special handling for null + vs[i] = null; + } else if (vs[i] == COMPLETE_SENTINEL) { + // special handling for onComplete observer.onCompleted(); // we need to unsubscribe from all children since children are independently subscribed childSubscription.unsubscribe(); return; - } - if (vs[i] == null) { + } else if (vs[i] == null) { allHaveValues = false; // we continue as there may be an onCompleted on one of the others continue; @@ -189,7 +194,7 @@ void tick() { for (int i = 0; i < observers.length; i++) { ((InnerObserver) observers[i]).items.poll(); // eagerly check if the next item on this queue is an onComplete - if (((InnerObserver) observers[i]).items.peek() instanceof Notification) { + if (((InnerObserver) observers[i]).items.peek() == COMPLETE_SENTINEL) { // it is an onComplete so shut down observer.onCompleted(); // we need to unsubscribe from all children since children are independently subscribed @@ -213,7 +218,7 @@ final class InnerObserver extends Subscriber { @SuppressWarnings("unchecked") @Override public void onCompleted() { - items.add(Notification.createOnCompleted()); + items.add(COMPLETE_SENTINEL); tick(); } @@ -226,8 +231,11 @@ public void onError(Throwable e) { @SuppressWarnings("unchecked") @Override public void onNext(Object t) { - // TODO use a placeholder for NULL, such as Notification(null) - items.add(t); + if (t == null) { + items.add(NULL_SENTINEL); + } else { + items.add(t); + } tick(); } }; diff --git a/rxjava-core/src/test/java/rx/operators/OperatorZipTest.java b/rxjava-core/src/test/java/rx/operators/OperatorZipTest.java index ad3b187687..f20b5caab2 100644 --- a/rxjava-core/src/test/java/rx/operators/OperatorZipTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorZipTest.java @@ -22,7 +22,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.Iterator; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -32,12 +31,12 @@ import org.junit.Test; import org.mockito.InOrder; +import rx.Notification; import rx.Observable; import rx.Observable.OnSubscribe; import rx.Observer; import rx.Subscriber; import rx.Subscription; -import rx.operators.OperationReduceTest.CustomException; import rx.subjects.PublishSubject; import rx.subscriptions.Subscriptions; import rx.util.functions.Action1; @@ -928,6 +927,65 @@ public void onNext(String s) { assertEquals("5-5", list.get(4)); } + @Test + public void testEmitNull() { + Observable oi = Observable.from(1, null, 3); + Observable os = Observable.from("a", "b", null); + Observable o = Observable.zip(oi, os, new Func2() { + + @Override + public String call(Integer t1, String t2) { + return t1 + "-" + t2; + } + + }); + + final ArrayList list = new ArrayList(); + o.subscribe(new Action1() { + + @Override + public void call(String s) { + System.out.println(s); + list.add(s); + } + }); + + assertEquals(3, list.size()); + assertEquals("1-a", list.get(0)); + assertEquals("null-b", list.get(1)); + assertEquals("3-null", list.get(2)); + } + + @Test + public void testEmitMaterializedNotifications() { + Observable> oi = Observable.from(1, 2, 3).materialize(); + Observable> os = Observable.from("a", "b", "c").materialize(); + Observable o = Observable.zip(oi, os, new Func2, Notification, String>() { + + @Override + public String call(Notification t1, Notification t2) { + return t1.getKind() + "_" + t1.getValue() + "-" + t2.getKind() + "_" + t2.getValue(); + } + + }); + + final ArrayList list = new ArrayList(); + o.subscribe(new Action1() { + + @Override + public void call(String s) { + System.out.println(s); + list.add(s); + } + }); + + assertEquals(4, list.size()); + assertEquals("OnNext_1-OnNext_a", list.get(0)); + assertEquals("OnNext_2-OnNext_b", list.get(1)); + assertEquals("OnNext_3-OnNext_c", list.get(2)); + assertEquals("OnCompleted_null-OnCompleted_null", list.get(3)); + } + Observable OBSERVABLE_OF_5_INTEGERS = OBSERVABLE_OF_5_INTEGERS(new AtomicInteger()); Observable OBSERVABLE_OF_5_INTEGERS(final AtomicInteger numEmitted) {