From d172fd90a9eb2361ea421aa8a955aa4b45d402e8 Mon Sep 17 00:00:00 2001 From: thegeez Date: Sun, 17 Mar 2013 21:15:25 +0100 Subject: [PATCH 1/3] Failing test case to show bug in takeWhile --- .../main/java/rx/operators/OperationTake.java | 33 ++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationTake.java b/rxjava-core/src/main/java/rx/operators/OperationTake.java index 68dc06e639..b3969a12de 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTake.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTake.java @@ -29,7 +29,7 @@ import rx.util.AtomicObservableSubscription; import rx.util.functions.Func1; import rx.util.functions.Func2; - +import rx.subjects.Subject; /** * Returns a specified number of contiguous values from the start of an observable sequence. */ @@ -178,6 +178,37 @@ public Boolean call(Integer input) { verify(aObserver, times(1)).onCompleted(); } + @Test + public void testTakeWhileOnSubject1() { + Subject s = Subject.create(); + Observable w = (Observable)s; + Observable take = Observable.create(takeWhile(w, new Func1() { + @Override + public Boolean call(Integer input) { + return input < 3; + } + })); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + take.subscribe(aObserver); + + s.onNext(1); + s.onNext(2); + s.onNext(3); + s.onNext(4); + s.onNext(5); + s.onCompleted(); + + verify(aObserver, times(1)).onNext(1); + verify(aObserver, times(1)).onNext(2); + verify(aObserver, never()).onNext(3); + verify(aObserver, never()).onNext(4); + verify(aObserver, never()).onNext(5); + verify(aObserver, never()).onError(any(Exception.class)); + verify(aObserver, times(1)).onCompleted(); + } + @Test public void testTakeWhile2() { Observable w = Observable.toObservable("one", "two", "three"); From 2672b74cd8ee62d127ef130bfdfae1dcddaf7c72 Mon Sep 17 00:00:00 2001 From: thegeez Date: Sun, 17 Mar 2013 22:21:51 +0100 Subject: [PATCH 2/3] Call complete on take/takeWhile observables once the predicate is true. --- rxjava-core/src/main/java/rx/operators/OperationTake.java | 1 + 1 file changed, 1 insertion(+) diff --git a/rxjava-core/src/main/java/rx/operators/OperationTake.java b/rxjava-core/src/main/java/rx/operators/OperationTake.java index b3969a12de..943856b5ab 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTake.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTake.java @@ -147,6 +147,7 @@ public void onNext(T args) { if (predicate.call(args, counter.getAndIncrement())) { observer.onNext(args); } else { + observer.onCompleted(); // this will work if the sequence is asynchronous, it will have no effect on a synchronous observable subscription.unsubscribe(); } From 414dbc410c0a7fc3bee81dc5f782a6433af82593 Mon Sep 17 00:00:00 2001 From: thegeez Date: Tue, 19 Mar 2013 21:43:41 +0100 Subject: [PATCH 3/3] Whitespace --- rxjava-core/src/main/java/rx/operators/OperationTake.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationTake.java b/rxjava-core/src/main/java/rx/operators/OperationTake.java index 943856b5ab..e86263e562 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTake.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTake.java @@ -325,4 +325,4 @@ public void run() { } } -} \ No newline at end of file +}