Skip to content

Commit

Permalink
OperatorMergeMap
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Apr 24, 2014
1 parent 4e77f8a commit e4cb23c
Show file tree
Hide file tree
Showing 5 changed files with 258 additions and 400 deletions.
11 changes: 6 additions & 5 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
import rx.operators.OperationDistinct;
import rx.operators.OperationDistinctUntilChanged;
import rx.operators.OperationFinally;
import rx.operators.OperationFlatMap;
import rx.operators.OperationGroupByUntil;
import rx.operators.OperationGroupJoin;
import rx.operators.OperationInterval;
Expand Down Expand Up @@ -102,6 +101,8 @@
import rx.operators.OperatorMap;
import rx.operators.OperatorMaterialize;
import rx.operators.OperatorMerge;
import rx.operators.OperatorMergeMapPair;
import rx.operators.OperatorMergeMapTransform;
import rx.operators.OperatorObserveOn;
import rx.operators.OperatorOnErrorFlatMap;
import rx.operators.OperatorOnErrorResumeNextViaFunction;
Expand Down Expand Up @@ -4322,7 +4323,7 @@ public final <R> Observable<R> mergeMap(
Func1<? super T, ? extends Observable<? extends R>> onNext,
Func1<? super Throwable, ? extends Observable<? extends R>> onError,
Func0<? extends Observable<? extends R>> onCompleted) {
return create(OperationFlatMap.flatMap(this, onNext, onError, onCompleted));
return lift(new OperatorMergeMapTransform<T, R>(onNext, onError, onCompleted));
}

/**
Expand All @@ -4345,7 +4346,7 @@ public final <R> Observable<R> mergeMap(
*/
public final <U, R> Observable<R> mergeMap(Func1<? super T, ? extends Observable<? extends U>> collectionSelector,
Func2<? super T, ? super U, ? extends R> resultSelector) {
return create(OperationFlatMap.flatMap(this, collectionSelector, resultSelector));
return lift(new OperatorMergeMapPair<T, U, R>(collectionSelector, resultSelector));
}

/**
Expand All @@ -4363,7 +4364,7 @@ public final <U, R> Observable<R> mergeMap(Func1<? super T, ? extends Observable
* the values in the Iterables corresponding to those items, as generated by {@code collectionSelector}
*/
public final <R> Observable<R> mergeMapIterable(Func1<? super T, ? extends Iterable<? extends R>> collectionSelector) {
return merge(map(OperationFlatMap.flatMapIterableFunc(collectionSelector)));
return merge(map(OperatorMergeMapPair.convertSelector(collectionSelector)));
}

/**
Expand All @@ -4387,7 +4388,7 @@ public final <R> Observable<R> mergeMapIterable(Func1<? super T, ? extends Itera
*/
public final <U, R> Observable<R> mergeMapIterable(Func1<? super T, ? extends Iterable<? extends U>> collectionSelector,
Func2<? super T, ? super U, ? extends R> resultSelector) {
return mergeMap(OperationFlatMap.flatMapIterableFunc(collectionSelector), resultSelector);
return mergeMap(OperatorMergeMapPair.convertSelector(collectionSelector), resultSelector);
}

/**
Expand Down
Loading

0 comments on commit e4cb23c

Please sign in to comment.