diff --git a/rxjava-core/src/main/java/rx/operators/OperationTake.java b/rxjava-core/src/main/java/rx/operators/OperationTake.java index 68dc06e639..e86263e562 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTake.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTake.java @@ -29,7 +29,7 @@ import rx.util.AtomicObservableSubscription; import rx.util.functions.Func1; import rx.util.functions.Func2; - +import rx.subjects.Subject; /** * Returns a specified number of contiguous values from the start of an observable sequence. */ @@ -147,6 +147,7 @@ public void onNext(T args) { if (predicate.call(args, counter.getAndIncrement())) { observer.onNext(args); } else { + observer.onCompleted(); // this will work if the sequence is asynchronous, it will have no effect on a synchronous observable subscription.unsubscribe(); } @@ -178,6 +179,37 @@ public Boolean call(Integer input) { verify(aObserver, times(1)).onCompleted(); } + @Test + public void testTakeWhileOnSubject1() { + Subject s = Subject.create(); + Observable w = (Observable)s; + Observable take = Observable.create(takeWhile(w, new Func1() { + @Override + public Boolean call(Integer input) { + return input < 3; + } + })); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + take.subscribe(aObserver); + + s.onNext(1); + s.onNext(2); + s.onNext(3); + s.onNext(4); + s.onNext(5); + s.onCompleted(); + + verify(aObserver, times(1)).onNext(1); + verify(aObserver, times(1)).onNext(2); + verify(aObserver, never()).onNext(3); + verify(aObserver, never()).onNext(4); + verify(aObserver, never()).onNext(5); + verify(aObserver, never()).onError(any(Exception.class)); + verify(aObserver, times(1)).onCompleted(); + } + @Test public void testTakeWhile2() { Observable w = Observable.toObservable("one", "two", "three"); @@ -293,4 +325,4 @@ public void run() { } } -} \ No newline at end of file +}