diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index 9287e105de..534342b24e 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -2186,6 +2186,65 @@ public final static Observable mergeDelayError(Observableinstance(true, maxConcurrent)); } + /** + * Flattens an Iterable of Observables into one Observable, in a way that allows an Observer to receive all + * successfully emitted items from each of the source Observables without being interrupted by an error + * notification from one of them. + *

+ * This behaves like {@link #merge(Observable)} except that if any of the merged Observables notify of an + * error via {@link Observer#onError onError}, {@code mergeDelayError} will refrain from propagating that + * error notification until all of the merged Observables have finished emitting items. + *

+ * + *

+ * Even if multiple merged Observables send {@code onError} notifications, {@code mergeDelayError} will only + * invoke the {@code onError} method of its Observers once. + *

+ *
Scheduler:
+ *
{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param sequences + * the Iterable of Observables + * @return an Observable that emits items that are the result of flattening the items emitted by the + * Observables in the Iterable + * @see ReactiveX operators documentation: Merge + */ + public static Observable mergeDelayError(Iterable> sequences) { + return mergeDelayError(from(sequences)); + } + + /** + * Flattens an Iterable of Observables into one Observable, in a way that allows an Observer to receive all + * successfully emitted items from each of the source Observables without being interrupted by an error + * notification from one of them, while limiting the number of concurrent subscriptions to these Observables. + *

+ * This behaves like {@link #merge(Observable)} except that if any of the merged Observables notify of an + * error via {@link Observer#onError onError}, {@code mergeDelayError} will refrain from propagating that + * error notification until all of the merged Observables have finished emitting items. + *

+ * + *

+ * Even if multiple merged Observables send {@code onError} notifications, {@code mergeDelayError} will only + * invoke the {@code onError} method of its Observers once. + *

+ *
Scheduler:
+ *
{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param sequences + * the Iterable of Observables + * @param maxConcurrent + * the maximum number of Observables that may be subscribed to concurrently + * @return an Observable that emits items that are the result of flattening the items emitted by the + * Observables in the Iterable + * @see ReactiveX operators documentation: Merge + */ + public static Observable mergeDelayError(Iterable> sequences, int maxConcurrent) { + return mergeDelayError(from(sequences), maxConcurrent); + } + + /** * Flattens two Observables into one Observable, in a way that allows an Observer to receive all * successfully emitted items from each of the source Observables without being interrupted by an error diff --git a/src/test/java/rx/internal/operators/OperatorMergeDelayErrorTest.java b/src/test/java/rx/internal/operators/OperatorMergeDelayErrorTest.java index db086d6632..e5088a0a4a 100644 --- a/src/test/java/rx/internal/operators/OperatorMergeDelayErrorTest.java +++ b/src/test/java/rx/internal/operators/OperatorMergeDelayErrorTest.java @@ -272,6 +272,23 @@ public void testMergeList() { verify(stringObserver, times(2)).onNext("hello"); } + // This is pretty much a clone of testMergeList but with the overloaded MergeDelayError for Iterables + @Test + public void mergeIterable() { + final Observable o1 = Observable.create(new TestSynchronousObservable()); + final Observable o2 = Observable.create(new TestSynchronousObservable()); + List> listOfObservables = new ArrayList>(); + listOfObservables.add(o1); + listOfObservables.add(o2); + + Observable m = Observable.mergeDelayError(listOfObservables); + m.subscribe(stringObserver); + + verify(stringObserver, never()).onError(any(Throwable.class)); + verify(stringObserver, times(1)).onCompleted(); + verify(stringObserver, times(2)).onNext("hello"); + } + @Test public void testMergeArrayWithThreading() { final TestASynchronousObservable o1 = new TestASynchronousObservable(); @@ -577,4 +594,4 @@ public void call(Long t1) { assertTrue(ts.getOnErrorEvents().get(0) instanceof TestException); assertEquals(Arrays.asList(1L, 1L, 1L), requests); } -} \ No newline at end of file +}