diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 7fea0a1fb1..985e3e8c02 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -59,7 +59,6 @@ import rx.operators.OperationDistinct; import rx.operators.OperationDistinctUntilChanged; import rx.operators.OperationFinally; -import rx.operators.OperationFlatMap; import rx.operators.OperationGroupByUntil; import rx.operators.OperationGroupJoin; import rx.operators.OperationInterval; @@ -102,6 +101,8 @@ import rx.operators.OperatorMap; import rx.operators.OperatorMaterialize; import rx.operators.OperatorMerge; +import rx.operators.OperatorMergeMapPair; +import rx.operators.OperatorMergeMapTransform; import rx.operators.OperatorObserveOn; import rx.operators.OperatorOnErrorFlatMap; import rx.operators.OperatorOnErrorResumeNextViaFunction; @@ -4322,7 +4323,7 @@ public final Observable mergeMap( Func1> onNext, Func1> onError, Func0> onCompleted) { - return create(OperationFlatMap.flatMap(this, onNext, onError, onCompleted)); + return lift(new OperatorMergeMapTransform(onNext, onError, onCompleted)); } /** @@ -4345,7 +4346,7 @@ public final Observable mergeMap( */ public final Observable mergeMap(Func1> collectionSelector, Func2 resultSelector) { - return create(OperationFlatMap.flatMap(this, collectionSelector, resultSelector)); + return lift(new OperatorMergeMapPair(collectionSelector, resultSelector)); } /** @@ -4363,7 +4364,7 @@ public final Observable mergeMap(Func1 Observable mergeMapIterable(Func1> collectionSelector) { - return merge(map(OperationFlatMap.flatMapIterableFunc(collectionSelector))); + return merge(map(OperatorMergeMapPair.convertSelector(collectionSelector))); } /** @@ -4387,7 +4388,7 @@ public final Observable mergeMapIterable(Func1 Observable mergeMapIterable(Func1> collectionSelector, Func2 resultSelector) { - return mergeMap(OperationFlatMap.flatMapIterableFunc(collectionSelector), resultSelector); + return mergeMap(OperatorMergeMapPair.convertSelector(collectionSelector), resultSelector); } /** diff --git a/rxjava-core/src/main/java/rx/operators/OperationFlatMap.java b/rxjava-core/src/main/java/rx/operators/OperationFlatMap.java deleted file mode 100644 index 6d2606b2f9..0000000000 --- a/rxjava-core/src/main/java/rx/operators/OperationFlatMap.java +++ /dev/null @@ -1,394 +0,0 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.operators; - -import java.util.concurrent.atomic.AtomicInteger; - -import rx.Observable; -import rx.Observable.OnSubscribeFunc; -import rx.Observer; -import rx.Subscriber; -import rx.Subscription; -import rx.functions.Func0; -import rx.functions.Func1; -import rx.functions.Func2; -import rx.subscriptions.CompositeSubscription; -import rx.subscriptions.SerialSubscription; - -/** - * Additional flatMap operators. - */ -public final class OperationFlatMap { - /** Utility class. */ - private OperationFlatMap() { - throw new IllegalStateException("No instances!"); - } - - /** - * Observable that pairs up the source values and all the derived collection - * values and projects them via the selector. - */ - public static OnSubscribeFunc flatMap(Observable source, - Func1> collectionSelector, - Func2 resultSelector - ) { - return new FlatMapPairSelector(source, collectionSelector, resultSelector); - } - - /** - * Converts the result Iterable of a function into an Observable. - */ - public static Func1> flatMapIterableFunc( - Func1> collectionSelector) { - return new IterableToObservableFunc(collectionSelector); - } - - /** - * Converts the result Iterable of a function into an Observable. - * - * @param - * the parameter type - * @param - * the result type - */ - private static final class IterableToObservableFunc implements Func1> { - final Func1> func; - - public IterableToObservableFunc(Func1> func) { - this.func = func; - } - - @Override - public Observable call(T t1) { - return Observable.from(func.call(t1)); - } - } - - /** - * Pairs up the source value with each of the associated observable values - * and uses a selector function to calculate the result sequence. - * - * @param - * the source value type - * @param - * the collection value type - * @param - * the result type - */ - private static final class FlatMapPairSelector implements OnSubscribeFunc { - final Observable source; - final Func1> collectionSelector; - final Func2 resultSelector; - - public FlatMapPairSelector(Observable source, Func1> collectionSelector, Func2 resultSelector) { - this.source = source; - this.collectionSelector = collectionSelector; - this.resultSelector = resultSelector; - } - - @Override - public Subscription onSubscribe(Observer t1) { - CompositeSubscription csub = new CompositeSubscription(); - - csub.add(source.unsafeSubscribe(new SourceObserver(t1, collectionSelector, resultSelector, csub))); - - return csub; - } - - /** Observes the source, starts the collections and projects the result. */ - private static final class SourceObserver extends Subscriber { - final Observer observer; - final Func1> collectionSelector; - final Func2 resultSelector; - final CompositeSubscription csub; - final AtomicInteger wip; - /** Don't let various events run at the same time. */ - final Object guard; - boolean done; - - public SourceObserver(Observer observer, Func1> collectionSelector, Func2 resultSelector, CompositeSubscription csub) { - this.observer = observer; - this.collectionSelector = collectionSelector; - this.resultSelector = resultSelector; - this.csub = csub; - this.wip = new AtomicInteger(1); - this.guard = new Object(); - } - - @Override - public void onNext(T args) { - Observable coll; - try { - coll = collectionSelector.call(args); - } catch (Throwable e) { - onError(e); - return; - } - - SerialSubscription ssub = new SerialSubscription(); - csub.add(ssub); - wip.incrementAndGet(); - - ssub.set(coll.unsafeSubscribe(new CollectionObserver(this, args, ssub))); - } - - @Override - public void onError(Throwable e) { - synchronized (guard) { - if (done) { - return; - } - done = true; - observer.onError(e); - } - csub.unsubscribe(); - } - - @Override - public void onCompleted() { - if (wip.decrementAndGet() == 0) { - synchronized (guard) { - if (done) { - return; - } - done = true; - observer.onCompleted(); - } - csub.unsubscribe(); - } - } - - void complete(Subscription s) { - csub.remove(s); - onCompleted(); - } - - void emit(T t, U u) { - R r; - try { - r = resultSelector.call(t, u); - } catch (Throwable e) { - onError(e); - return; - } - synchronized (guard) { - if (done) { - return; - } - observer.onNext(r); - } - } - } - - /** Observe a collection and call emit with the pair of the key and the value. */ - private static final class CollectionObserver extends Subscriber { - final SourceObserver so; - final Subscription cancel; - final T value; - - public CollectionObserver(SourceObserver so, T value, Subscription cancel) { - this.so = so; - this.value = value; - this.cancel = cancel; - } - - @Override - public void onNext(U args) { - so.emit(value, args); - } - - @Override - public void onError(Throwable e) { - so.onError(e); - } - - @Override - public void onCompleted() { - so.complete(cancel); - } - }; - } - - /** - * Projects the notification of an observable sequence to an observable - * sequence and merges the results into one. - */ - public static OnSubscribeFunc flatMap(Observable source, - Func1> onNext, - Func1> onError, - Func0> onCompleted) { - return new FlatMapTransform(source, onNext, onError, onCompleted); - } - - /** - * Projects the notification of an observable sequence to an observable - * sequence and merges the results into one. - * - * @param - * the source value type - * @param - * the result value type - */ - private static final class FlatMapTransform implements OnSubscribeFunc { - final Observable source; - final Func1> onNext; - final Func1> onError; - final Func0> onCompleted; - - public FlatMapTransform(Observable source, Func1> onNext, Func1> onError, Func0> onCompleted) { - this.source = source; - this.onNext = onNext; - this.onError = onError; - this.onCompleted = onCompleted; - } - - @Override - public Subscription onSubscribe(Observer t1) { - CompositeSubscription csub = new CompositeSubscription(); - - csub.add(source.unsafeSubscribe(new SourceObserver(t1, onNext, onError, onCompleted, csub))); - - return csub; - } - - /** - * Observe the source and merge the values. - * - * @param - * the source value type - * @param - * the result value type - */ - private static final class SourceObserver extends Subscriber { - final Observer observer; - final Func1> onNext; - final Func1> onError; - final Func0> onCompleted; - final CompositeSubscription csub; - final AtomicInteger wip; - volatile boolean done; - final Object guard; - - public SourceObserver(Observer observer, Func1> onNext, Func1> onError, Func0> onCompleted, CompositeSubscription csub) { - this.observer = observer; - this.onNext = onNext; - this.onError = onError; - this.onCompleted = onCompleted; - this.csub = csub; - this.guard = new Object(); - this.wip = new AtomicInteger(1); - } - - @Override - public void onNext(T args) { - Observable o; - try { - o = onNext.call(args); - } catch (Throwable t) { - synchronized (guard) { - observer.onError(t); - } - csub.unsubscribe(); - return; - } - subscribeInner(o); - } - - @Override - public void onError(Throwable e) { - Observable o; - try { - o = onError.call(e); - } catch (Throwable t) { - synchronized (guard) { - observer.onError(t); - } - csub.unsubscribe(); - return; - } - subscribeInner(o); - done = true; - finish(); - } - - @Override - public void onCompleted() { - Observable o; - try { - o = onCompleted.call(); - } catch (Throwable t) { - synchronized (guard) { - observer.onError(t); - } - csub.unsubscribe(); - return; - } - subscribeInner(o); - done = true; - finish(); - } - - void subscribeInner(Observable o) { - SerialSubscription ssub = new SerialSubscription(); - wip.incrementAndGet(); - csub.add(ssub); - - ssub.set(o.unsafeSubscribe(new CollectionObserver(this, ssub))); - } - - void finish() { - if (wip.decrementAndGet() == 0) { - synchronized (guard) { - observer.onCompleted(); - } - csub.unsubscribe(); - } - } - } - - /** Observes the collections. */ - private static final class CollectionObserver extends Subscriber { - final SourceObserver parent; - final Subscription cancel; - - public CollectionObserver(SourceObserver parent, Subscription cancel) { - this.parent = parent; - this.cancel = cancel; - } - - @Override - public void onNext(R args) { - synchronized (parent.guard) { - parent.observer.onNext(args); - } - } - - @Override - public void onError(Throwable e) { - synchronized (parent.guard) { - parent.observer.onError(e); - } - parent.csub.unsubscribe(); - } - - @Override - public void onCompleted() { - parent.csub.remove(cancel); - parent.finish(); - } - } - } -} diff --git a/rxjava-core/src/main/java/rx/operators/OperatorMergeMapPair.java b/rxjava-core/src/main/java/rx/operators/OperatorMergeMapPair.java new file mode 100644 index 0000000000..947347e882 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperatorMergeMapPair.java @@ -0,0 +1,120 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.operators; + +import java.util.concurrent.atomic.AtomicInteger; +import rx.Observable; +import rx.Observable.Operator; +import rx.Subscriber; +import rx.functions.Func1; +import rx.functions.Func2; +import rx.observers.SerializedSubscriber; +import rx.subscriptions.CompositeSubscription; + +/** + * Observable that pairs up the source values and all the derived collection + * values and projects them via the selector. + * + * @param the input value type + * @param the derived collection value type + * @param the result type + */ +public final class OperatorMergeMapPair implements Operator { + + public static Func1> convertSelector(final Func1> selector) { + return new Func1>() { + @Override + public Observable call(T t1) { + return Observable.from(selector.call(t1)); + } + }; + } + + final Func1> collectionSelector; + final Func2 resultSelector; + + public OperatorMergeMapPair(Func1> collectionSelector, + Func2 resultSelector) { + this.collectionSelector = collectionSelector; + this.resultSelector = resultSelector; + } + + @Override + public Subscriber call(Subscriber child) { + final SerializedSubscriber s = new SerializedSubscriber(child); + final CompositeSubscription csub = new CompositeSubscription(); + child.add(csub); + + return new Subscriber(child) { + final AtomicInteger wip = new AtomicInteger(1); + final Subscriber self = this; + @Override + public void onNext(final T t) { + Observable collection; + try { + collection = collectionSelector.call(t); + } catch (Throwable e) { + onError(e); + return; + } + + Subscriber collectionSub = new Subscriber() { + + @Override + public void onNext(U u) { + try { + s.onNext(resultSelector.call(t, u)); + } catch (Throwable e) { + onError(e); + } + } + + @Override + public void onError(Throwable e) { + self.onError(e); + } + + @Override + public void onCompleted() { + try { + self.onCompleted(); + } finally { + csub.remove(this); + } + } + }; + csub.add(collectionSub); + wip.incrementAndGet(); + + collection.unsafeSubscribe(collectionSub); + } + + @Override + public void onError(Throwable e) { + s.onError(e); + unsubscribe(); + } + + @Override + public void onCompleted() { + if (wip.decrementAndGet() == 0) { + s.onCompleted(); + } + } + + }; + } +} diff --git a/rxjava-core/src/main/java/rx/operators/OperatorMergeMapTransform.java b/rxjava-core/src/main/java/rx/operators/OperatorMergeMapTransform.java new file mode 100644 index 0000000000..88d49cfb57 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperatorMergeMapTransform.java @@ -0,0 +1,131 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.operators; + +import java.util.concurrent.atomic.AtomicInteger; +import rx.Observable; +import rx.Observable.Operator; +import rx.Subscriber; +import rx.functions.Func0; +import rx.functions.Func1; +import rx.observers.SerializedSubscriber; +import rx.subscriptions.CompositeSubscription; + +/** + * Projects the notification of an observable sequence to an observable + * sequence and merges the results into one. + * + * @param the input value type + * @param the output value type + */ +public final class OperatorMergeMapTransform implements Operator { + final Func1> onNext; + final Func1> onError; + final Func0> onCompleted; + + public OperatorMergeMapTransform(Func1> onNext, + Func1> onError, + Func0> onCompleted) { + this.onNext = onNext; + this.onError = onError; + this.onCompleted = onCompleted; + } + + @Override + public Subscriber call(Subscriber child) { + final SerializedSubscriber s = new SerializedSubscriber(child); + final CompositeSubscription csub = new CompositeSubscription(); + child.add(csub); + + return new Subscriber(child) { + final AtomicInteger wip = new AtomicInteger(1); + @Override + public void onNext(T t) { + Observable o; + try { + o = onNext.call(t); + } catch (Throwable e) { + error(e); + return; + } + subscribeTo(o); + } + + @Override + public void onError(Throwable e) { + Observable o; + try { + o = onError.call(e); + } catch (Throwable t) { + error(t); + return; + } + subscribeTo(o); + finish(); + } + + @Override + public void onCompleted() { + Observable o; + try { + o = onCompleted.call(); + } catch (Throwable e) { + error(e); + return; + } + subscribeTo(o); + finish(); + } + void finish() { + if (wip.decrementAndGet() == 0) { + s.onCompleted(); + } + } + void error(Throwable t) { + s.onError(t); + unsubscribe(); + } + void subscribeTo(Observable o) { + Subscriber oSub = new Subscriber() { + + @Override + public void onNext(R t) { + s.onNext(t); + } + + @Override + public void onError(Throwable e) { + error(e); + } + + @Override + public void onCompleted() { + try { + finish(); + } finally { + csub.remove(this); + } + } + }; + csub.add(oSub); + wip.incrementAndGet(); + + o.unsafeSubscribe(oSub); + } + }; + } + +} diff --git a/rxjava-core/src/test/java/rx/operators/OperationFlatMapTest.java b/rxjava-core/src/test/java/rx/operators/OperatorMergeMapTest.java similarity index 99% rename from rxjava-core/src/test/java/rx/operators/OperationFlatMapTest.java rename to rxjava-core/src/test/java/rx/operators/OperatorMergeMapTest.java index 8eb9ec7960..604096b446 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationFlatMapTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorMergeMapTest.java @@ -32,7 +32,7 @@ import rx.functions.Func1; import rx.functions.Func2; -public class OperationFlatMapTest { +public class OperatorMergeMapTest { @Test public void testNormal() { @SuppressWarnings("unchecked")