diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index aff51453e8..4afefabc8e 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -6255,16 +6255,7 @@ public final Subscription subscribe(Subscriber subscriber) { subscriber = new SafeSubscriber(subscriber); } onSubscribeFunction.call(subscriber); - final Subscription returnSubscription = hook.onSubscribeReturn(subscriber); - // we return it inside a Subscription so it can't be cast back to Subscriber - return Subscriptions.create(new Action0() { - - @Override - public void call() { - returnSubscription.unsubscribe(); - } - - }); + return hook.onSubscribeReturn(subscriber); } catch (Throwable e) { // special handling for certain Throwable/Error/Exception types Exceptions.throwIfFatal(e); diff --git a/rxjava-core/src/perf/java/rx/usecases/PerfObserveOn.java b/rxjava-core/src/perf/java/rx/usecases/PerfObserveOn.java new file mode 100644 index 0000000000..3b6aa1d2ea --- /dev/null +++ b/rxjava-core/src/perf/java/rx/usecases/PerfObserveOn.java @@ -0,0 +1,30 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.usecases; + +import org.openjdk.jmh.annotations.GenerateMicroBenchmark; + +import rx.schedulers.Schedulers; + +public class PerfObserveOn { + + @GenerateMicroBenchmark + public void observeOn(UseCaseInput input) throws InterruptedException { + input.observable.observeOn(Schedulers.computation()).subscribe(input.observer); + input.awaitCompletion(); + } + +} diff --git a/rxjava-core/src/perf/java/rx/usecases/PerfTransforms.java b/rxjava-core/src/perf/java/rx/usecases/PerfTransforms.java new file mode 100644 index 0000000000..d67ccd3a0e --- /dev/null +++ b/rxjava-core/src/perf/java/rx/usecases/PerfTransforms.java @@ -0,0 +1,71 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.usecases; + +import org.openjdk.jmh.annotations.GenerateMicroBenchmark; + +import rx.Observable; +import rx.functions.Func1; + +public class PerfTransforms { + + @GenerateMicroBenchmark + public void mapTransformation(UseCaseInput input) throws InterruptedException { + input.observable.map(new Func1() { + + @Override + public String call(Integer i) { + return String.valueOf(i); + } + + }).map(new Func1() { + + @Override + public Integer call(String i) { + return Integer.parseInt(i); + } + + }).subscribe(input.observer); + input.awaitCompletion(); + } + + @GenerateMicroBenchmark + public void flatMapTransformsUsingFrom(UseCaseInput input) throws InterruptedException { + input.observable.flatMap(new Func1>() { + + @Override + public Observable call(Integer i) { + return Observable.from(i); + } + + }).subscribe(input.observer); + input.awaitCompletion(); + } + + @GenerateMicroBenchmark + public void flatMapTransformsUsingJust(UseCaseInput input) throws InterruptedException { + input.observable.flatMap(new Func1>() { + + @Override + public Observable call(Integer i) { + return Observable.just(i); + } + + }).subscribe(input.observer); + input.awaitCompletion(); + } + +} diff --git a/rxjava-core/src/perf/java/rx/usecases/UseCaseInput.java b/rxjava-core/src/perf/java/rx/usecases/UseCaseInput.java new file mode 100644 index 0000000000..c0463d9190 --- /dev/null +++ b/rxjava-core/src/perf/java/rx/usecases/UseCaseInput.java @@ -0,0 +1,81 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.usecases; + +import java.util.concurrent.CountDownLatch; + +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.logic.BlackHole; + +import rx.Observable; +import rx.Observable.OnSubscribe; +import rx.Observer; +import rx.Subscriber; + +/** + * Exposes an Observable and Observer that increments n Integers and consumes them in a Blackhole. + */ +@State(Scope.Thread) +public class UseCaseInput { + @Param({ "1", "1024" }) + public int size; + + public Observable observable; + public Observer observer; + + private CountDownLatch latch; + + @Setup + public void setup() { + observable = Observable.create(new OnSubscribe() { + @Override + public void call(Subscriber o) { + for (int value = 0; value < size; value++) { + o.onNext(value); + } + o.onCompleted(); + } + }); + + final BlackHole bh = new BlackHole(); + latch = new CountDownLatch(1); + + observer = new Observer() { + @Override + public void onCompleted() { + latch.countDown(); + } + + @Override + public void onError(Throwable e) { + throw new RuntimeException(e); + } + + @Override + public void onNext(Integer value) { + bh.consume(value); + } + }; + + } + + public void awaitCompletion() throws InterruptedException { + latch.await(); + } +}