Skip to content

Commit b0c4b28

Browse files
Merge pull request ReactiveX#151 from mairbek/TakeUntiFunctional
Implemented TakeUntil operation
2 parents 6b3c2d8 + 8fd3d5c commit b0c4b28

File tree

2 files changed

+309
-23
lines changed

2 files changed

+309
-23
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 28 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -29,27 +29,7 @@
2929
import org.mockito.Mockito;
3030
import org.mockito.MockitoAnnotations;
3131

32-
import rx.operators.OperationConcat;
33-
import rx.operators.OperationFilter;
34-
import rx.operators.OperationLast;
35-
import rx.operators.OperationMap;
36-
import rx.operators.OperationMaterialize;
37-
import rx.operators.OperationMerge;
38-
import rx.operators.OperationMergeDelayError;
39-
import rx.operators.OperationNext;
40-
import rx.operators.OperationOnErrorResumeNextViaFunction;
41-
import rx.operators.OperationOnErrorResumeNextViaObservable;
42-
import rx.operators.OperationOnErrorReturn;
43-
import rx.operators.OperationScan;
44-
import rx.operators.OperationSkip;
45-
import rx.operators.OperationSynchronize;
46-
import rx.operators.OperationTake;
47-
import rx.operators.OperationTakeLast;
48-
import rx.operators.OperationToObservableFuture;
49-
import rx.operators.OperationToObservableIterable;
50-
import rx.operators.OperationToObservableList;
51-
import rx.operators.OperationToObservableSortedList;
52-
import rx.operators.OperationZip;
32+
import rx.operators.*;
5333
import rx.plugins.RxJavaErrorHandler;
5434
import rx.plugins.RxJavaPlugins;
5535
import rx.util.AtomicObservableSubscription;
@@ -978,6 +958,20 @@ public static <T> Observable<T> merge(Observable<T>... source) {
978958
return _create(OperationMerge.merge(source));
979959
}
980960

961+
/**
962+
* Returns the values from the source observable sequence until the other observable sequence produces a value.
963+
*
964+
* @param source the source sequence to propagate elements for.
965+
* @param other the observable sequence that terminates propagation of elements of the source sequence.
966+
* @param <T> the type of source.
967+
* @param <E> the other type.
968+
* @return An observable sequence containing the elements of the source sequence up to the point the other sequence interrupted further propagation.
969+
*/
970+
public static <T, E> Observable<T> takeUntil(final Observable<T> source, final Observable<E> other) {
971+
return OperatorTakeUntil.takeUntil(source, other);
972+
}
973+
974+
981975
/**
982976
* Combines the objects emitted by two or more Observables, and emits the result as a single Observable,
983977
* by using the <code>concat</code> method.
@@ -2838,6 +2832,17 @@ public Observable<T> takeLast(final int count) {
28382832
return takeLast(this, count);
28392833
}
28402834

2835+
/**
2836+
* Returns the values from the source observable sequence until the other observable sequence produces a value.
2837+
*
2838+
* @param other the observable sequence that terminates propagation of elements of the source sequence.
2839+
* @param <E> the other type.
2840+
* @return An observable sequence containing the elements of the source sequence up to the point the other sequence interrupted further propagation.
2841+
*/
2842+
public <E> Observable<T> takeUntil(Observable<E> other) {
2843+
return takeUntil(this, other);
2844+
}
2845+
28412846
/**
28422847
* Returns an Observable that emits a single item, a list composed of all the items emitted by
28432848
* the source Observable.
@@ -2992,7 +2997,6 @@ public void testSequenceEqual() {
29922997
verify(result, times(1)).onNext(false);
29932998
}
29942999

2995-
29963000
@Test
29973001
public void testToIterable() {
29983002
Observable<String> obs = toObservable("one", "two", "three");
@@ -3082,7 +3086,7 @@ public Boolean call(Integer args) {
30823086

30833087
assertEquals(-1, last);
30843088
}
3085-
3089+
30863090
public void testSingle() {
30873091
Observable<String> observable = toObservable("one");
30883092
assertEquals("one", observable.single());
@@ -3151,6 +3155,7 @@ public Boolean call(String args) {
31513155
});
31523156
}
31533157

3158+
31543159
private static class TestException extends RuntimeException {
31553160

31563161
}
Lines changed: 281 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,281 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import org.junit.Test;
19+
import rx.Observable;
20+
import rx.Observer;
21+
import rx.Subscription;
22+
import rx.util.functions.Func1;
23+
24+
import static org.mockito.Mockito.*;
25+
26+
public class OperatorTakeUntil {
27+
28+
/**
29+
* Returns the values from the source observable sequence until the other observable sequence produces a value.
30+
*
31+
* @param source the source sequence to propagate elements for.
32+
* @param other the observable sequence that terminates propagation of elements of the source sequence.
33+
* @param <T> the type of source.
34+
* @param <E> the other type.
35+
* @return An observable sequence containing the elements of the source sequence up to the point the other sequence interrupted further propagation.
36+
*/
37+
public static <T, E> Observable<T> takeUntil(final Observable<T> source, final Observable<E> other) {
38+
Observable<Notification<T>> s = Observable.create(new SourceObservable<T>(source));
39+
Observable<Notification<T>> o = Observable.create(new OtherObservable<T, E>(other));
40+
Observable<Notification<T>> result = Observable.merge(s, o);
41+
42+
return result.takeWhile(new Func1<Notification<T>, Boolean>() {
43+
@Override
44+
public Boolean call(Notification<T> notification) {
45+
return !notification.halt;
46+
}
47+
}).map(new Func1<Notification<T>, T>() {
48+
@Override
49+
public T call(Notification<T> notification) {
50+
return notification.value;
51+
}
52+
});
53+
}
54+
55+
private static class Notification<T> {
56+
private final boolean halt;
57+
private final T value;
58+
59+
public static <T> Notification<T> value(T value) {
60+
return new Notification<T>(false, value);
61+
}
62+
63+
public static <T> Notification<T> halt() {
64+
return new Notification<T>(true, null);
65+
}
66+
67+
private Notification(boolean halt, T value) {
68+
this.halt = halt;
69+
this.value = value;
70+
}
71+
72+
}
73+
74+
private static class SourceObservable<T> implements Func1<Observer<Notification<T>>, Subscription> {
75+
private final Observable<T> sequence;
76+
77+
private SourceObservable(Observable<T> sequence) {
78+
this.sequence = sequence;
79+
}
80+
81+
@Override
82+
public Subscription call(final Observer<Notification<T>> notificationObserver) {
83+
return sequence.subscribe(new Observer<T>() {
84+
@Override
85+
public void onCompleted() {
86+
notificationObserver.onNext(Notification.<T>halt());
87+
}
88+
89+
@Override
90+
public void onError(Exception e) {
91+
notificationObserver.onError(e);
92+
}
93+
94+
@Override
95+
public void onNext(T args) {
96+
notificationObserver.onNext(Notification.value(args));
97+
}
98+
});
99+
}
100+
}
101+
102+
private static class OtherObservable<T, E> implements Func1<Observer<Notification<T>>, Subscription> {
103+
private final Observable<E> sequence;
104+
105+
private OtherObservable(Observable<E> sequence) {
106+
this.sequence = sequence;
107+
}
108+
109+
@Override
110+
public Subscription call(final Observer<Notification<T>> notificationObserver) {
111+
return sequence.subscribe(new Observer<E>() {
112+
@Override
113+
public void onCompleted() {
114+
// Ignore
115+
}
116+
117+
@Override
118+
public void onError(Exception e) {
119+
notificationObserver.onError(e);
120+
}
121+
122+
@Override
123+
public void onNext(E args) {
124+
notificationObserver.onNext(Notification.<T>halt());
125+
}
126+
});
127+
}
128+
}
129+
130+
public static class UnitTest {
131+
132+
@Test
133+
public void testTakeUntil() {
134+
Subscription sSource = mock(Subscription.class);
135+
Subscription sOther = mock(Subscription.class);
136+
TestObservable source = new TestObservable(sSource);
137+
TestObservable other = new TestObservable(sOther);
138+
139+
Observer<String> result = mock(Observer.class);
140+
Observable<String> stringObservable = takeUntil(source, other);
141+
stringObservable.subscribe(result);
142+
source.sendOnNext("one");
143+
source.sendOnNext("two");
144+
other.sendOnNext("three");
145+
source.sendOnNext("four");
146+
source.sendOnCompleted();
147+
other.sendOnCompleted();
148+
149+
verify(result, times(1)).onNext("one");
150+
verify(result, times(1)).onNext("two");
151+
verify(result, times(0)).onNext("three");
152+
verify(result, times(0)).onNext("four");
153+
verify(sSource, times(1)).unsubscribe();
154+
verify(sOther, times(1)).unsubscribe();
155+
156+
}
157+
158+
@Test
159+
public void testTakeUntilSourceCompleted() {
160+
Subscription sSource = mock(Subscription.class);
161+
Subscription sOther = mock(Subscription.class);
162+
TestObservable source = new TestObservable(sSource);
163+
TestObservable other = new TestObservable(sOther);
164+
165+
Observer<String> result = mock(Observer.class);
166+
Observable<String> stringObservable = takeUntil(source, other);
167+
stringObservable.subscribe(result);
168+
source.sendOnNext("one");
169+
source.sendOnNext("two");
170+
source.sendOnCompleted();
171+
172+
verify(result, times(1)).onNext("one");
173+
verify(result, times(1)).onNext("two");
174+
verify(sSource, times(1)).unsubscribe();
175+
verify(sOther, times(1)).unsubscribe();
176+
177+
}
178+
179+
@Test
180+
public void testTakeUntilSourceError() {
181+
Subscription sSource = mock(Subscription.class);
182+
Subscription sOther = mock(Subscription.class);
183+
TestObservable source = new TestObservable(sSource);
184+
TestObservable other = new TestObservable(sOther);
185+
Exception error = new Exception();
186+
187+
Observer<String> result = mock(Observer.class);
188+
Observable<String> stringObservable = takeUntil(source, other);
189+
stringObservable.subscribe(result);
190+
source.sendOnNext("one");
191+
source.sendOnNext("two");
192+
source.sendOnError(error);
193+
194+
verify(result, times(1)).onNext("one");
195+
verify(result, times(1)).onNext("two");
196+
verify(result, times(1)).onError(error);
197+
verify(sSource, times(1)).unsubscribe();
198+
verify(sOther, times(1)).unsubscribe();
199+
200+
}
201+
202+
@Test
203+
public void testTakeUntilOtherError() {
204+
Subscription sSource = mock(Subscription.class);
205+
Subscription sOther = mock(Subscription.class);
206+
TestObservable source = new TestObservable(sSource);
207+
TestObservable other = new TestObservable(sOther);
208+
Exception error = new Exception();
209+
210+
Observer<String> result = mock(Observer.class);
211+
Observable<String> stringObservable = takeUntil(source, other);
212+
stringObservable.subscribe(result);
213+
source.sendOnNext("one");
214+
source.sendOnNext("two");
215+
other.sendOnError(error);
216+
217+
verify(result, times(1)).onNext("one");
218+
verify(result, times(1)).onNext("two");
219+
verify(result, times(1)).onError(error);
220+
verify(result, times(0)).onCompleted();
221+
verify(sSource, times(1)).unsubscribe();
222+
verify(sOther, times(1)).unsubscribe();
223+
224+
}
225+
226+
@Test
227+
public void testTakeUntilOtherCompleted() {
228+
Subscription sSource = mock(Subscription.class);
229+
Subscription sOther = mock(Subscription.class);
230+
TestObservable source = new TestObservable(sSource);
231+
TestObservable other = new TestObservable(sOther);
232+
233+
Observer<String> result = mock(Observer.class);
234+
Observable<String> stringObservable = takeUntil(source, other);
235+
stringObservable.subscribe(result);
236+
source.sendOnNext("one");
237+
source.sendOnNext("two");
238+
other.sendOnCompleted();
239+
240+
verify(result, times(1)).onNext("one");
241+
verify(result, times(1)).onNext("two");
242+
verify(result, times(0)).onCompleted();
243+
verify(sSource, times(0)).unsubscribe();
244+
verify(sOther, times(0)).unsubscribe();
245+
246+
}
247+
248+
private static class TestObservable extends Observable<String> {
249+
250+
Observer<String> observer = null;
251+
Subscription s;
252+
253+
public TestObservable(Subscription s) {
254+
this.s = s;
255+
}
256+
257+
/* used to simulate subscription */
258+
public void sendOnCompleted() {
259+
observer.onCompleted();
260+
}
261+
262+
/* used to simulate subscription */
263+
public void sendOnNext(String value) {
264+
observer.onNext(value);
265+
}
266+
267+
/* used to simulate subscription */
268+
@SuppressWarnings("unused")
269+
public void sendOnError(Exception e) {
270+
observer.onError(e);
271+
}
272+
273+
@Override
274+
public Subscription subscribe(final Observer<String> observer) {
275+
this.observer = observer;
276+
return s;
277+
}
278+
}
279+
280+
}
281+
}

0 commit comments

Comments
 (0)