diff --git a/rxjava-contrib/rxjava-async-util/build.gradle b/rxjava-contrib/rxjava-async-util/build.gradle new file mode 100644 index 0000000000..09d9aae655 --- /dev/null +++ b/rxjava-contrib/rxjava-async-util/build.gradle @@ -0,0 +1,20 @@ +apply plugin: 'osgi' + +sourceCompatibility = JavaVersion.VERSION_1_6 +targetCompatibility = JavaVersion.VERSION_1_6 + +dependencies { + compile project(':rxjava-core') + testCompile project(":rxjava-core").sourceSets.test.output + provided 'junit:junit-dep:4.10' + provided 'org.mockito:mockito-core:1.8.5' +} + +jar { + manifest { + name = 'rxjava-async-util' + instruction 'Bundle-Vendor', 'Netflix' + instruction 'Bundle-DocURL', 'https://github.com/Netflix/RxJava' + instruction 'Import-Package', '!org.junit,!junit.framework,!org.mockito.*,*' + } +} diff --git a/rxjava-core/src/main/java/rx/util/functions/Async.java b/rxjava-contrib/rxjava-async-util/src/main/java/rx/util/async/Async.java similarity index 75% rename from rxjava-core/src/main/java/rx/util/functions/Async.java rename to rxjava-contrib/rxjava-async-util/src/main/java/rx/util/async/Async.java index 51a1787715..5505d9a91e 100644 --- a/rxjava-core/src/main/java/rx/util/functions/Async.java +++ b/rxjava-contrib/rxjava-async-util/src/main/java/rx/util/async/Async.java @@ -1,374 +1,482 @@ - /** - * Copyright 2013 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package rx.util.functions; +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rx.util.async; import rx.Observable; import rx.Scheduler; -import rx.schedulers.ExecutorScheduler; import rx.schedulers.Schedulers; import rx.subjects.AsyncSubject; +import rx.util.functions.Action0; +import rx.util.functions.Action1; +import rx.util.functions.Action2; +import rx.util.functions.Action3; +import rx.util.functions.Action4; +import rx.util.functions.Action5; +import rx.util.functions.Action6; +import rx.util.functions.Action7; +import rx.util.functions.Action8; +import rx.util.functions.Action9; +import rx.util.functions.ActionN; +import rx.util.functions.Actions; +import rx.util.functions.Func0; +import rx.util.functions.Func1; +import rx.util.functions.Func2; +import rx.util.functions.Func3; +import rx.util.functions.Func4; +import rx.util.functions.Func5; +import rx.util.functions.Func6; +import rx.util.functions.Func7; +import rx.util.functions.Func8; +import rx.util.functions.Func9; +import rx.util.functions.FuncN; /** * Utility methods to convert functions and actions into asynchronous * operations through the Observable/Observer pattern. */ public final class Async { - private Async() { throw new IllegalStateException("No instances!"); } + private Async() { + throw new IllegalStateException("No instances!"); + } + /** - * {@link Scheduler} intended for asynchronous conversions. + * Invokes the specified function asynchronously and returns an Observable + * that emits the result. + *

+ * Note: The function is called immediately and once, not whenever an + * observer subscribes to the resulting Observable. Multiple subscriptions + * to this Observable observe the same return value. *

- * Defaults to {@link #threadPoolForComputation()}. - * - * @return {@link ExecutorScheduler} for asynchronous conversion work. + * + * + * @param func + * function to run asynchronously + * @return an Observable that emits the function's result value, or notifies + * observers of an exception + * @see RxJava Wiki: start() + * @see MSDN: Observable.Start */ - public static Scheduler threadPoolForAsyncConversions() { - return Schedulers.threadPoolForComputation(); + public static Observable start(Func0 func) { + return Async.toAsync(func).call(); } + + /** + * Invokes the specified function asynchronously on the specified scheduler + * and returns an Observable that emits the result. + *

+ * Note: The function is called immediately and once, not whenever an + * observer subscribes to the resulting Observable. Multiple subscriptions + * to this Observable observe the same return value. + *

+ * + * + * @param func + * function to run asynchronously + * @param scheduler + * scheduler to run the function on + * @return an Observable that emits the function's result value, or notifies + * observers of an exception + * @see RxJava Wiki: start() + * @see MSDN: Observable.Start + */ + public static Observable start(Func0 func, Scheduler scheduler) { + return Async.toAsync(func, scheduler).call(); + } + /** * Convert a synchronous action call into an asynchronous function * call through an Observable sequence. - * - * @param action the action to convert - * + * + * @param action + * the action to convert + * * @return a function which returns an observable sequence which * executes the {@code action} and emits {@code null}. - * + * * @see MSDN: Observable.ToAsync */ public static Func0> toAsync(Action0 action) { - return toAsync(action, threadPoolForAsyncConversions()); + return toAsync(action, Schedulers.threadPoolForComputation()); } + /** * Convert a synchronous function call into an asynchronous function * call through an Observable sequence. - * - * @param func the function to convert - * + * + * @param func + * the function to convert + * * @return a function which returns an observable sequence which * executes the {@code func} and emits its returned value. - * + * * @see MSDN: Observable.ToAsync */ public static Func0> toAsync(Func0 func) { - return toAsync(func, threadPoolForAsyncConversions()); + return toAsync(func, Schedulers.threadPoolForComputation()); } + /** * Convert a synchronous action call into an asynchronous function * call through an Observable sequence. - * - * @param action the action to convert - * + * + * @param action + * the action to convert + * * @return a function which returns an observable sequence which * executes the {@code action} and emits {@code null}. - * + * * @see MSDN: Observable.ToAsync */ public static Func1> toAsync(Action1 action) { - return toAsync(action, threadPoolForAsyncConversions()); + return toAsync(action, Schedulers.threadPoolForComputation()); } + /** * Convert a synchronous function call into an asynchronous function * call through an Observable sequence. - * - * @param func the function to convert - * + * + * @param func + * the function to convert + * * @return a function which returns an observable sequence which * executes the {@code func} and emits its returned value. - * + * * @see MSDN: Observable.ToAsync */ public static Func1> toAsync(Func1 func) { - return toAsync(func, threadPoolForAsyncConversions()); + return toAsync(func, Schedulers.threadPoolForComputation()); } + /** * Convert a synchronous action call into an asynchronous function * call through an Observable sequence. - * - * @param action the action to convert - * + * + * @param action + * the action to convert + * * @return a function which returns an observable sequence which * executes the {@code action} and emits {@code null}. - * + * * @see MSDN: Observable.ToAsync */ public static Func2> toAsync(Action2 action) { - return toAsync(action, threadPoolForAsyncConversions()); + return toAsync(action, Schedulers.threadPoolForComputation()); } + /** * Convert a synchronous function call into an asynchronous function * call through an Observable sequence. - * - * @param func the function to convert - * + * + * @param func + * the function to convert + * * @return a function which returns an observable sequence which * executes the {@code func} and emits its returned value. - * + * * @see MSDN: Observable.ToAsync */ public static Func2> toAsync(Func2 func) { - return toAsync(func, threadPoolForAsyncConversions()); + return toAsync(func, Schedulers.threadPoolForComputation()); } + /** * Convert a synchronous action call into an asynchronous function * call through an Observable sequence. - * - * @param action the action to convert - * + * + * @param action + * the action to convert + * * @return a function which returns an observable sequence which * executes the {@code action} and emits {@code null}. - * + * * @see MSDN: Observable.ToAsync */ public static Func3> toAsync(Action3 action) { - return toAsync(action, threadPoolForAsyncConversions()); + return toAsync(action, Schedulers.threadPoolForComputation()); } + /** * Convert a synchronous function call into an asynchronous function * call through an Observable sequence. - * - * @param func the function to convert - * + * + * @param func + * the function to convert + * * @return a function which returns an observable sequence which * executes the {@code func} and emits its returned value. - * + * * @see MSDN: Observable.ToAsync */ public static Func3> toAsync(Func3 func) { - return toAsync(func, threadPoolForAsyncConversions()); + return toAsync(func, Schedulers.threadPoolForComputation()); } + /** * Convert a synchronous action call into an asynchronous function * call through an Observable sequence. - * - * @param action the action to convert - * + * + * @param action + * the action to convert + * * @return a function which returns an observable sequence which * executes the {@code action} and emits {@code null}. - * + * * @see MSDN: Observable.ToAsync */ - public static Func4> toAsync(Action4 action) { - return toAsync(action, threadPoolForAsyncConversions()); + public static Func4> toAsync(Action4 action) { + return toAsync(action, Schedulers.threadPoolForComputation()); } + /** * Convert a synchronous function call into an asynchronous function * call through an Observable sequence. - * - * @param func the function to convert - * + * + * @param func + * the function to convert + * * @return a function which returns an observable sequence which * executes the {@code func} and emits its returned value. - * + * * @see MSDN: Observable.ToAsync */ public static Func4> toAsync(Func4 func) { - return toAsync(func, threadPoolForAsyncConversions()); + return toAsync(func, Schedulers.threadPoolForComputation()); } + /** * Convert a synchronous action call into an asynchronous function * call through an Observable sequence. - * - * @param action the action to convert - * + * + * @param action + * the action to convert + * * @return a function which returns an observable sequence which * executes the {@code action} and emits {@code null}. - * + * * @see MSDN: Observable.ToAsync */ public static Func5> toAsync(Action5 action) { - return toAsync(action, threadPoolForAsyncConversions()); + return toAsync(action, Schedulers.threadPoolForComputation()); } + /** * Convert a synchronous function call into an asynchronous function * call through an Observable sequence. - * - * @param func the function to convert - * + * + * @param func + * the function to convert + * * @return a function which returns an observable sequence which * executes the {@code func} and emits its returned value. - * + * * @see MSDN: Observable.ToAsync */ public static Func5> toAsync(Func5 func) { - return toAsync(func, threadPoolForAsyncConversions()); + return toAsync(func, Schedulers.threadPoolForComputation()); } + /** * Convert a synchronous action call into an asynchronous function * call through an Observable sequence. - * - * @param action the action to convert - * + * + * @param action + * the action to convert + * * @return a function which returns an observable sequence which * executes the {@code action} and emits {@code null}. - * + * * @see MSDN: Observable.ToAsync */ public static Func6> toAsync(Action6 action) { - return toAsync(action, threadPoolForAsyncConversions()); + return toAsync(action, Schedulers.threadPoolForComputation()); } + /** * Convert a synchronous function call into an asynchronous function * call through an Observable sequence. - * - * @param func the function to convert - * + * + * @param func + * the function to convert + * * @return a function which returns an observable sequence which * executes the {@code func} and emits its returned value. - * + * * @see MSDN: Observable.ToAsync */ public static Func6> toAsync(Func6 func) { - return toAsync(func, threadPoolForAsyncConversions()); + return toAsync(func, Schedulers.threadPoolForComputation()); } + /** * Convert a synchronous action call into an asynchronous function * call through an Observable sequence. - * - * @param action the action to convert - * + * + * @param action + * the action to convert + * * @return a function which returns an observable sequence which * executes the {@code action} and emits {@code null}. - * + * * @see MSDN: Observable.ToAsync */ public static Func7> toAsync(Action7 action) { - return toAsync(action, threadPoolForAsyncConversions()); + return toAsync(action, Schedulers.threadPoolForComputation()); } + /** * Convert a synchronous function call into an asynchronous function * call through an Observable sequence. - * - * @param func the function to convert - * + * + * @param func + * the function to convert + * * @return a function which returns an observable sequence which * executes the {@code func} and emits its returned value. - * + * * @see MSDN: Observable.ToAsync */ public static Func7> toAsync(Func7 func) { - return toAsync(func, threadPoolForAsyncConversions()); + return toAsync(func, Schedulers.threadPoolForComputation()); } + /** * Convert a synchronous action call into an asynchronous function * call through an Observable sequence. - * - * @param action the action to convert - * + * + * @param action + * the action to convert + * * @return a function which returns an observable sequence which * executes the {@code action} and emits {@code null}. - * + * * @see MSDN: Observable.ToAsync */ public static Func8> toAsync(Action8 action) { - return toAsync(action, threadPoolForAsyncConversions()); + return toAsync(action, Schedulers.threadPoolForComputation()); } + /** * Convert a synchronous function call into an asynchronous function * call through an Observable sequence. - * - * @param func the function to convert - * + * + * @param func + * the function to convert + * * @return a function which returns an observable sequence which * executes the {@code func} and emits its returned value. - * + * * @see MSDN: Observable.ToAsync */ public static Func8> toAsync(Func8 func) { - return toAsync(func, threadPoolForAsyncConversions()); + return toAsync(func, Schedulers.threadPoolForComputation()); } + /** * Convert a synchronous action call into an asynchronous function * call through an Observable sequence. - * - * @param action the action to convert - * + * + * @param action + * the action to convert + * * @return a function which returns an observable sequence which * executes the {@code action} and emits {@code null}. - * + * * @see MSDN: Observable.ToAsync */ public static Func9> toAsync(Action9 action) { - return toAsync(action, threadPoolForAsyncConversions()); + return toAsync(action, Schedulers.threadPoolForComputation()); } + /** * Convert a synchronous function call into an asynchronous function * call through an Observable sequence. - * - * @param func the function to convert - * + * + * @param func + * the function to convert + * * @return a function which returns an observable sequence which * executes the {@code func} and emits its returned value. - * + * * @see MSDN: Observable.ToAsync */ public static Func9> toAsync(Func9 func) { - return toAsync(func, threadPoolForAsyncConversions()); + return toAsync(func, Schedulers.threadPoolForComputation()); } + /** * Convert a synchronous action call into an asynchronous function * call through an Observable sequence. - * - * @param action the action to convert - * + * + * @param action + * the action to convert + * * @return a function which returns an observable sequence which * executes the {@code action} and emits {@code null}. - * + * */ public static FuncN> toAsync(ActionN action) { - return toAsync(action, threadPoolForAsyncConversions()); + return toAsync(action, Schedulers.threadPoolForComputation()); } + /** * Convert a synchronous function call into an asynchronous function * call through an Observable sequence. - * - * @param func the function to convert - * + * + * @param func + * the function to convert + * * @return a function which returns an observable sequence which * executes the {@code func} and emits its returned value. - * + * */ public static FuncN> toAsync(FuncN func) { - return toAsync(func, threadPoolForAsyncConversions()); + return toAsync(func, Schedulers.threadPoolForComputation()); } + /** * Convert a synchronous action call into an asynchronous function * call through an Observable sequence. - * - * @param action the action to convert - * @param scheduler the scheduler used to execute the {@code action} - * + * + * @param action + * the action to convert + * @param scheduler + * the scheduler used to execute the {@code action} + * * @return a function which returns an observable sequence which * executes the {@code action} and emits {@code null}. - * + * * @see MSDN: Observable.ToAsync */ public static Func0> toAsync(final Action0 action, final Scheduler scheduler) { return toAsync(Actions.toFunc(action), scheduler); } + /** * Convert a synchronous function call into an asynchronous function * call through an Observable sequence. - * - * @param func the function to convert - * @param scheduler the scheduler used to call the {@code func} - * + * + * @param func + * the function to convert + * @param scheduler + * the scheduler used to call the {@code func} + * * @return a function which returns an observable sequence which * executes the {@code func} and emits its returned value. - * + * * @see MSDN: Observable.ToAsync */ public static Func0> toAsync(final Func0 func, final Scheduler scheduler) { @@ -394,31 +502,37 @@ public void call() { } }; } + /** * Convert a synchronous action call into an asynchronous function * call through an Observable sequence. - * - * @param action the action to convert - * @param scheduler the scheduler used to execute the {@code action} - * + * + * @param action + * the action to convert + * @param scheduler + * the scheduler used to execute the {@code action} + * * @return a function which returns an observable sequence which * executes the {@code action} and emits {@code null}. - * + * * @see MSDN: Observable.ToAsync */ public static Func1> toAsync(final Action1 action, final Scheduler scheduler) { return toAsync(Actions.toFunc(action), scheduler); } + /** * Convert a synchronous function call into an asynchronous function * call through an Observable sequence. - * - * @param func the function to convert - * @param scheduler the scheduler used to call the {@code func} - * + * + * @param func + * the function to convert + * @param scheduler + * the scheduler used to call the {@code func} + * * @return a function which returns an observable sequence which * executes the {@code func} and emits its returned value. - * + * * @see MSDN: Observable.ToAsync */ public static Func1> toAsync(final Func1 func, final Scheduler scheduler) { @@ -444,31 +558,37 @@ public void call() { } }; } + /** * Convert a synchronous action call into an asynchronous function * call through an Observable sequence. - * - * @param action the action to convert - * @param scheduler the scheduler used to execute the {@code action} - * + * + * @param action + * the action to convert + * @param scheduler + * the scheduler used to execute the {@code action} + * * @return a function which returns an observable sequence which * executes the {@code action} and emits {@code null}. - * + * * @see MSDN: Observable.ToAsync */ public static Func2> toAsync(final Action2 action, final Scheduler scheduler) { return toAsync(Actions.toFunc(action), scheduler); } + /** * Convert a synchronous function call into an asynchronous function * call through an Observable sequence. - * - * @param func the function to convert - * @param scheduler the scheduler used to call the {@code func} - * + * + * @param func + * the function to convert + * @param scheduler + * the scheduler used to call the {@code func} + * * @return a function which returns an observable sequence which * executes the {@code func} and emits its returned value. - * + * * @see MSDN: Observable.ToAsync */ public static Func2> toAsync(final Func2 func, final Scheduler scheduler) { @@ -494,31 +614,37 @@ public void call() { } }; } + /** * Convert a synchronous action call into an asynchronous function * call through an Observable sequence. - * - * @param action the action to convert - * @param scheduler the scheduler used to execute the {@code action} - * + * + * @param action + * the action to convert + * @param scheduler + * the scheduler used to execute the {@code action} + * * @return a function which returns an observable sequence which * executes the {@code action} and emits {@code null}. - * + * * @see MSDN: Observable.ToAsync */ public static Func3> toAsync(final Action3 action, final Scheduler scheduler) { return toAsync(Actions.toFunc(action), scheduler); } + /** * Convert a synchronous function call into an asynchronous function * call through an Observable sequence. - * - * @param func the function to convert - * @param scheduler the scheduler used to call the {@code func} - * + * + * @param func + * the function to convert + * @param scheduler + * the scheduler used to call the {@code func} + * * @return a function which returns an observable sequence which * executes the {@code func} and emits its returned value. - * + * * @see MSDN: Observable.ToAsync */ public static Func3> toAsync(final Func3 func, final Scheduler scheduler) { @@ -536,7 +662,7 @@ public void call() { subject.onError(t); return; } - subject.onNext(result); + subject.onNext(result); subject.onCompleted(); } }); @@ -544,31 +670,37 @@ public void call() { } }; } + /** * Convert a synchronous action call into an asynchronous function * call through an Observable sequence. - * - * @param action the action to convert - * @param scheduler the scheduler used to execute the {@code action} - * + * + * @param action + * the action to convert + * @param scheduler + * the scheduler used to execute the {@code action} + * * @return a function which returns an observable sequence which * executes the {@code action} and emits {@code null}. - * + * * @see MSDN: Observable.ToAsync */ public static Func4> toAsync(final Action4 action, final Scheduler scheduler) { return toAsync(Actions.toFunc(action), scheduler); } + /** * Convert a synchronous function call into an asynchronous function * call through an Observable sequence. - * - * @param func the function to convert - * @param scheduler the scheduler used to call the {@code func} - * + * + * @param func + * the function to convert + * @param scheduler + * the scheduler used to call the {@code func} + * * @return a function which returns an observable sequence which * executes the {@code func} and emits its returned value. - * + * * @see MSDN: Observable.ToAsync */ public static Func4> toAsync(final Func4 func, final Scheduler scheduler) { @@ -586,7 +718,7 @@ public void call() { subject.onError(t); return; } - subject.onNext(result); + subject.onNext(result); subject.onCompleted(); } }); @@ -594,31 +726,37 @@ public void call() { } }; } + /** * Convert a synchronous action call into an asynchronous function * call through an Observable sequence. - * - * @param action the action to convert - * @param scheduler the scheduler used to execute the {@code action} - * + * + * @param action + * the action to convert + * @param scheduler + * the scheduler used to execute the {@code action} + * * @return a function which returns an observable sequence which * executes the {@code action} and emits {@code null}. - * + * * @see MSDN: Observable.ToAsync */ public static Func5> toAsync(final Action5 action, final Scheduler scheduler) { return toAsync(Actions.toFunc(action), scheduler); } + /** * Convert a synchronous function call into an asynchronous function * call through an Observable sequence. - * - * @param func the function to convert - * @param scheduler the scheduler used to call the {@code func} - * + * + * @param func + * the function to convert + * @param scheduler + * the scheduler used to call the {@code func} + * * @return a function which returns an observable sequence which * executes the {@code func} and emits its returned value. - * + * * @see MSDN: Observable.ToAsync */ public static Func5> toAsync(final Func5 func, final Scheduler scheduler) { @@ -636,7 +774,7 @@ public void call() { subject.onError(t); return; } - subject.onNext(result); + subject.onNext(result); subject.onCompleted(); } }); @@ -644,31 +782,37 @@ public void call() { } }; } + /** * Convert a synchronous action call into an asynchronous function * call through an Observable sequence. - * - * @param action the action to convert - * @param scheduler the scheduler used to execute the {@code action} - * + * + * @param action + * the action to convert + * @param scheduler + * the scheduler used to execute the {@code action} + * * @return a function which returns an observable sequence which * executes the {@code action} and emits {@code null}. - * + * * @see MSDN: Observable.ToAsync */ public static Func6> toAsync(final Action6 action, final Scheduler scheduler) { return toAsync(Actions.toFunc(action), scheduler); } + /** * Convert a synchronous function call into an asynchronous function * call through an Observable sequence. - * - * @param func the function to convert - * @param scheduler the scheduler used to call the {@code func} - * + * + * @param func + * the function to convert + * @param scheduler + * the scheduler used to call the {@code func} + * * @return a function which returns an observable sequence which * executes the {@code func} and emits its returned value. - * + * * @see MSDN: Observable.ToAsync */ public static Func6> toAsync(final Func6 func, final Scheduler scheduler) { @@ -686,7 +830,7 @@ public void call() { subject.onError(t); return; } - subject.onNext(result); + subject.onNext(result); subject.onCompleted(); } }); @@ -694,31 +838,37 @@ public void call() { } }; } + /** * Convert a synchronous action call into an asynchronous function * call through an Observable sequence. - * - * @param action the action to convert - * @param scheduler the scheduler used to execute the {@code action} - * + * + * @param action + * the action to convert + * @param scheduler + * the scheduler used to execute the {@code action} + * * @return a function which returns an observable sequence which * executes the {@code action} and emits {@code null}. - * + * * @see MSDN: Observable.ToAsync */ public static Func7> toAsync(final Action7 action, final Scheduler scheduler) { return toAsync(Actions.toFunc(action), scheduler); } + /** * Convert a synchronous function call into an asynchronous function * call through an Observable sequence. - * - * @param func the function to convert - * @param scheduler the scheduler used to call the {@code func} - * + * + * @param func + * the function to convert + * @param scheduler + * the scheduler used to call the {@code func} + * * @return a function which returns an observable sequence which * executes the {@code func} and emits its returned value. - * + * * @see MSDN: Observable.ToAsync */ public static Func7> toAsync(final Func7 func, final Scheduler scheduler) { @@ -736,7 +886,7 @@ public void call() { subject.onError(t); return; } - subject.onNext(result); + subject.onNext(result); subject.onCompleted(); } }); @@ -744,31 +894,37 @@ public void call() { } }; } + /** * Convert a synchronous action call into an asynchronous function * call through an Observable sequence. - * - * @param action the action to convert - * @param scheduler the scheduler used to execute the {@code action} - * + * + * @param action + * the action to convert + * @param scheduler + * the scheduler used to execute the {@code action} + * * @return a function which returns an observable sequence which * executes the {@code action} and emits {@code null}. - * + * * @see MSDN: Observable.ToAsync */ public static Func8> toAsync(final Action8 action, final Scheduler scheduler) { return toAsync(Actions.toFunc(action), scheduler); } + /** * Convert a synchronous function call into an asynchronous function * call through an Observable sequence. - * - * @param func the function to convert - * @param scheduler the scheduler used to call the {@code func} - * + * + * @param func + * the function to convert + * @param scheduler + * the scheduler used to call the {@code func} + * * @return a function which returns an observable sequence which * executes the {@code func} and emits its returned value. - * + * * @see MSDN: Observable.ToAsync */ public static Func8> toAsync(final Func8 func, final Scheduler scheduler) { @@ -786,7 +942,7 @@ public void call() { subject.onError(t); return; } - subject.onNext(result); + subject.onNext(result); subject.onCompleted(); } }); @@ -794,31 +950,37 @@ public void call() { } }; } + /** * Convert a synchronous action call into an asynchronous function * call through an Observable sequence. - * - * @param action the action to convert - * @param scheduler the scheduler used to execute the {@code action} - * + * + * @param action + * the action to convert + * @param scheduler + * the scheduler used to execute the {@code action} + * * @return a function which returns an observable sequence which * executes the {@code action} and emits {@code null}. - * + * * @see MSDN: Observable.ToAsync */ public static Func9> toAsync(final Action9 action, final Scheduler scheduler) { return toAsync(Actions.toFunc(action), scheduler); } + /** * Convert a synchronous function call into an asynchronous function * call through an Observable sequence. - * - * @param func the function to convert - * @param scheduler the scheduler used to call the {@code func} - * + * + * @param func + * the function to convert + * @param scheduler + * the scheduler used to call the {@code func} + * * @return a function which returns an observable sequence which * executes the {@code func} and emits its returned value. - * + * * @see MSDN: Observable.ToAsync */ public static Func9> toAsync(final Func9 func, final Scheduler scheduler) { @@ -836,7 +998,7 @@ public void call() { subject.onError(t); return; } - subject.onNext(result); + subject.onNext(result); subject.onCompleted(); } }); @@ -844,30 +1006,36 @@ public void call() { } }; } + /** * Convert a synchronous action call into an asynchronous function * call through an Observable sequence. - * - * @param action the action to convert - * @param scheduler the scheduler used to execute the {@code action} - * + * + * @param action + * the action to convert + * @param scheduler + * the scheduler used to execute the {@code action} + * * @return a function which returns an observable sequence which * executes the {@code action} and emits {@code null}. - * + * */ public static FuncN> toAsync(final ActionN action, final Scheduler scheduler) { return toAsync(Actions.toFunc(action), scheduler); } + /** * Convert a synchronous function call into an asynchronous function * call through an Observable sequence. - * - * @param func the function to convert - * @param scheduler the scheduler used to call the {@code func} - * + * + * @param func + * the function to convert + * @param scheduler + * the scheduler used to call the {@code func} + * * @return a function which returns an observable sequence which * executes the {@code func} and emits its returned value. - * + * */ public static FuncN> toAsync(final FuncN func, final Scheduler scheduler) { return new FuncN>() { @@ -892,64 +1060,74 @@ public void call() { } }; } + /** * Convert a synchronous action call into an asynchronous function * call through an Observable sequence. *

* Alias for toAsync(ActionN) intended for dynamic languages. * - * @param action the action to convert - * + * @param action + * the action to convert + * * @return a function which returns an observable sequence which * executes the {@code action} and emits {@code null}. - * + * */ public static FuncN> asyncAction(final ActionN action) { return toAsync(action); } + /** * Convert a synchronous action call into an asynchronous function * call through an Observable sequence. *

* Alias for toAsync(ActionN, Scheduler) intended for dynamic languages. * - * @param action the action to convert - * @param scheduler the scheduler used to execute the {@code action} - * + * @param action + * the action to convert + * @param scheduler + * the scheduler used to execute the {@code action} + * * @return a function which returns an observable sequence which * executes the {@code action} and emits {@code null}. - * + * */ public static FuncN> asyncAction(final ActionN action, final Scheduler scheduler) { return toAsync(action, scheduler); } + /** * Convert a synchronous function call into an asynchronous function * call through an Observable sequence. *

* Alias for toAsync(FuncN) intended for dynamic languages. * - * @param func the function to convert - * + * @param func + * the function to convert + * * @return a function which returns an observable sequence which * executes the {@code func} and emits its returned value. - * + * */ public static FuncN> asyncFunc(final FuncN func) { return toAsync(func); } + /** * Convert a synchronous function call into an asynchronous function * call through an Observable sequence. *

* Alias for toAsync(FuncN, Scheduler) intended for dynamic languages. * - * @param func the function to convert - * @param scheduler the scheduler used to call the {@code func} - * + * @param func + * the function to convert + * @param scheduler + * the scheduler used to call the {@code func} + * * @return a function which returns an observable sequence which * executes the {@code func} and emits its returned value. - * + * */ public static FuncN> asyncFunc(final FuncN func, final Scheduler scheduler) { return toAsync(func, scheduler); diff --git a/rxjava-core/src/test/java/rx/util/functions/AsyncTest.java b/rxjava-contrib/rxjava-async-util/src/test/java/rx/util/async/AsyncTest.java similarity index 84% rename from rxjava-core/src/test/java/rx/util/functions/AsyncTest.java rename to rxjava-contrib/rxjava-async-util/src/test/java/rx/util/async/AsyncTest.java index 05a75c4f85..943e8ff898 100644 --- a/rxjava-core/src/test/java/rx/util/functions/AsyncTest.java +++ b/rxjava-contrib/rxjava-async-util/src/test/java/rx/util/async/AsyncTest.java @@ -14,20 +14,29 @@ * limitations under the License. */ -package rx.util.functions; +package rx.util.async; +import static org.junit.Assert.*; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; + +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; + import junit.framework.Assert; + import org.junit.Before; import org.junit.Test; -import static org.mockito.Matchers.any; +import org.mockito.InOrder; import org.mockito.Mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; import org.mockito.MockitoAnnotations; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import rx.Observable; import rx.Observer; import rx.schedulers.Schedulers; +import rx.schedulers.TestScheduler; import rx.util.functions.Action0; import rx.util.functions.Action1; import rx.util.functions.Action2; @@ -54,10 +63,12 @@ public class AsyncTest { @Mock Observer observer; + @Before public void before() { MockitoAnnotations.initMocks(this); } + @Test public void testAction0() { final AtomicInteger value = new AtomicInteger(); @@ -67,17 +78,18 @@ public void call() { value.incrementAndGet(); } }; - + Async.toAsync(action, Schedulers.immediate()) .call() .subscribe(observer); - + verify(observer, never()).onError(any(Throwable.class)); verify(observer, times(1)).onNext(null); verify(observer, times(1)).onCompleted(); - + Assert.assertEquals(1, value.get()); } + @Test public void testAction0Error() { Action0 action = new Action0() { @@ -86,15 +98,16 @@ public void call() { throw new RuntimeException("Forced failure"); } }; - + Async.toAsync(action, Schedulers.immediate()) .call() .subscribe(observer); - + verify(observer, times(1)).onError(any(Throwable.class)); verify(observer, never()).onNext(null); verify(observer, never()).onCompleted(); } + @Test public void testAction1() { final AtomicInteger value = new AtomicInteger(); @@ -104,17 +117,18 @@ public void call(Integer t1) { value.set(t1); } }; - + Async.toAsync(action, Schedulers.immediate()) .call(1) .subscribe(observer); - + verify(observer, never()).onError(any(Throwable.class)); verify(observer, times(1)).onNext(null); verify(observer, times(1)).onCompleted(); - + Assert.assertEquals(1, value.get()); } + @Test public void testAction1Error() { Action1 action = new Action1() { @@ -123,15 +137,16 @@ public void call(Integer t1) { throw new RuntimeException("Forced failure"); } }; - + Async.toAsync(action, Schedulers.immediate()) .call(1) .subscribe(observer); - + verify(observer, times(1)).onError(any(Throwable.class)); verify(observer, never()).onNext(null); verify(observer, never()).onCompleted(); } + @Test public void testAction2() { final AtomicInteger value = new AtomicInteger(); @@ -141,17 +156,18 @@ public void call(Integer t1, Integer t2) { value.set(t1 | t2); } }; - + Async.toAsync(action, Schedulers.immediate()) .call(1, 2) .subscribe(observer); - + verify(observer, never()).onError(any(Throwable.class)); verify(observer, times(1)).onNext(null); verify(observer, times(1)).onCompleted(); - + Assert.assertEquals(3, value.get()); } + @Test public void testAction2Error() { Action2 action = new Action2() { @@ -160,15 +176,16 @@ public void call(Integer t1, Integer t2) { throw new RuntimeException("Forced failure"); } }; - + Async.toAsync(action, Schedulers.immediate()) .call(1, 2) .subscribe(observer); - + verify(observer, times(1)).onError(any(Throwable.class)); verify(observer, never()).onNext(null); verify(observer, never()).onCompleted(); } + @Test public void testAction3() { final AtomicInteger value = new AtomicInteger(); @@ -178,17 +195,18 @@ public void call(Integer t1, Integer t2, Integer t3) { value.set(t1 | t2 | t3); } }; - + Async.toAsync(action, Schedulers.immediate()) .call(1, 2, 4) .subscribe(observer); - + verify(observer, never()).onError(any(Throwable.class)); verify(observer, times(1)).onNext(null); verify(observer, times(1)).onCompleted(); - + Assert.assertEquals(7, value.get()); } + @Test public void testAction3Error() { Action3 action = new Action3() { @@ -197,15 +215,16 @@ public void call(Integer t1, Integer t2, Integer t3) { throw new RuntimeException("Forced failure"); } }; - + Async.toAsync(action, Schedulers.immediate()) .call(1, 2, 4) .subscribe(observer); - + verify(observer, times(1)).onError(any(Throwable.class)); verify(observer, never()).onNext(null); verify(observer, never()).onCompleted(); } + @Test public void testAction4() { final AtomicInteger value = new AtomicInteger(); @@ -215,17 +234,18 @@ public void call(Integer t1, Integer t2, Integer t3, Integer t4) { value.set(t1 | t2 | t3 | t4); } }; - + Async.toAsync(action, Schedulers.immediate()) .call(1, 2, 4, 8) .subscribe(observer); - + verify(observer, never()).onError(any(Throwable.class)); verify(observer, times(1)).onNext(null); verify(observer, times(1)).onCompleted(); - + Assert.assertEquals(15, value.get()); } + @Test public void testAction4Error() { Action4 action = new Action4() { @@ -234,15 +254,16 @@ public void call(Integer t1, Integer t2, Integer t3, Integer t4) { throw new RuntimeException("Forced failure"); } }; - + Async.toAsync(action, Schedulers.immediate()) .call(1, 2, 4, 8) .subscribe(observer); - + verify(observer, times(1)).onError(any(Throwable.class)); verify(observer, never()).onNext(null); verify(observer, never()).onCompleted(); } + @Test public void testAction5() { final AtomicInteger value = new AtomicInteger(); @@ -252,17 +273,18 @@ public void call(Integer t1, Integer t2, Integer t3, Integer t4, Integer t5) { value.set(t1 | t2 | t3 | t4 | t5); } }; - + Async.toAsync(action, Schedulers.immediate()) .call(1, 2, 4, 8, 16) .subscribe(observer); - + verify(observer, never()).onError(any(Throwable.class)); verify(observer, times(1)).onNext(null); verify(observer, times(1)).onCompleted(); - + Assert.assertEquals(31, value.get()); } + @Test public void testAction5Error() { Action5 action = new Action5() { @@ -271,15 +293,16 @@ public void call(Integer t1, Integer t2, Integer t3, Integer t4, Integer t5) { throw new RuntimeException("Forced failure"); } }; - + Async.toAsync(action, Schedulers.immediate()) .call(1, 2, 4, 8, 16) .subscribe(observer); - + verify(observer, times(1)).onError(any(Throwable.class)); verify(observer, never()).onNext(null); verify(observer, never()).onCompleted(); } + @Test public void testAction6() { final AtomicInteger value = new AtomicInteger(); @@ -289,17 +312,18 @@ public void call(Integer t1, Integer t2, Integer t3, Integer t4, Integer t5, Int value.set(t1 | t2 | t3 | t4 | t5 | t6); } }; - + Async.toAsync(action, Schedulers.immediate()) .call(1, 2, 4, 8, 16, 32) .subscribe(observer); - + verify(observer, never()).onError(any(Throwable.class)); verify(observer, times(1)).onNext(null); verify(observer, times(1)).onCompleted(); - + Assert.assertEquals(63, value.get()); } + @Test public void testAction6Error() { Action6 action = new Action6() { @@ -308,15 +332,16 @@ public void call(Integer t1, Integer t2, Integer t3, Integer t4, Integer t5, Int throw new RuntimeException("Forced failure"); } }; - + Async.toAsync(action, Schedulers.immediate()) .call(1, 2, 4, 8, 16, 32) .subscribe(observer); - + verify(observer, times(1)).onError(any(Throwable.class)); verify(observer, never()).onNext(null); verify(observer, never()).onCompleted(); } + @Test public void testAction7() { final AtomicInteger value = new AtomicInteger(); @@ -326,17 +351,18 @@ public void call(Integer t1, Integer t2, Integer t3, Integer t4, Integer t5, Int value.set(t1 | t2 | t3 | t4 | t5 | t6 | t7); } }; - + Async.toAsync(action, Schedulers.immediate()) .call(1, 2, 4, 8, 16, 32, 64) .subscribe(observer); - + verify(observer, never()).onError(any(Throwable.class)); verify(observer, times(1)).onNext(null); verify(observer, times(1)).onCompleted(); - + Assert.assertEquals(127, value.get()); } + @Test public void testAction7Error() { Action7 action = new Action7() { @@ -345,15 +371,16 @@ public void call(Integer t1, Integer t2, Integer t3, Integer t4, Integer t5, Int throw new RuntimeException("Forced failure"); } }; - + Async.toAsync(action, Schedulers.immediate()) .call(1, 2, 4, 8, 16, 32, 64) .subscribe(observer); - + verify(observer, times(1)).onError(any(Throwable.class)); verify(observer, never()).onNext(null); verify(observer, never()).onCompleted(); } + @Test public void testAction8() { final AtomicInteger value = new AtomicInteger(); @@ -363,17 +390,18 @@ public void call(Integer t1, Integer t2, Integer t3, Integer t4, Integer t5, Int value.set(t1 | t2 | t3 | t4 | t5 | t6 | t7 | t8); } }; - + Async.toAsync(action, Schedulers.immediate()) .call(1, 2, 4, 8, 16, 32, 64, 128) .subscribe(observer); - + verify(observer, never()).onError(any(Throwable.class)); verify(observer, times(1)).onNext(null); verify(observer, times(1)).onCompleted(); - + Assert.assertEquals(255, value.get()); } + @Test public void testAction8Error() { Action8 action = new Action8() { @@ -382,15 +410,16 @@ public void call(Integer t1, Integer t2, Integer t3, Integer t4, Integer t5, Int throw new RuntimeException("Forced failure"); } }; - + Async.toAsync(action, Schedulers.immediate()) .call(1, 2, 4, 8, 16, 32, 64, 128) .subscribe(observer); - + verify(observer, times(1)).onError(any(Throwable.class)); verify(observer, never()).onNext(null); verify(observer, never()).onCompleted(); } + @Test public void testAction9() { final AtomicInteger value = new AtomicInteger(); @@ -400,17 +429,18 @@ public void call(Integer t1, Integer t2, Integer t3, Integer t4, Integer t5, Int value.set(t1 | t2 | t3 | t4 | t5 | t6 | t7 | t8 | t9); } }; - + Async.toAsync(action, Schedulers.immediate()) .call(1, 2, 4, 8, 16, 32, 64, 128, 256) .subscribe(observer); - + verify(observer, never()).onError(any(Throwable.class)); verify(observer, times(1)).onNext(null); verify(observer, times(1)).onCompleted(); - + Assert.assertEquals(511, value.get()); } + @Test public void testAction9Error() { Action9 action = new Action9() { @@ -419,15 +449,16 @@ public void call(Integer t1, Integer t2, Integer t3, Integer t4, Integer t5, Int throw new RuntimeException("Forced failure"); } }; - + Async.toAsync(action, Schedulers.immediate()) .call(1, 2, 4, 8, 16, 32, 64, 128, 256) .subscribe(observer); - + verify(observer, times(1)).onError(any(Throwable.class)); verify(observer, never()).onNext(null); verify(observer, never()).onCompleted(); } + @Test public void testActionN() { final AtomicInteger value = new AtomicInteger(); @@ -436,22 +467,23 @@ public void testActionN() { public void call(Object... args) { int i = 0; for (Object o : args) { - i = i | (Integer)o; + i = i | (Integer) o; } value.set(i); } }; - + Async.toAsync(action, Schedulers.immediate()) .call(1, 2, 4, 8, 16, 32, 64, 128, 256, 512) .subscribe(observer); - + verify(observer, never()).onError(any(Throwable.class)); verify(observer, times(1)).onNext(null); verify(observer, times(1)).onCompleted(); - + Assert.assertEquals(1023, value.get()); } + @Test public void testActionNError() { ActionN action = new ActionN() { @@ -460,15 +492,16 @@ public void call(Object... args) { throw new RuntimeException("Forced failure"); } }; - + Async.toAsync(action, Schedulers.immediate()) .call(1, 2, 4, 8, 16, 32, 64, 128, 256, 512) .subscribe(observer); - + verify(observer, times(1)).onError(any(Throwable.class)); verify(observer, never()).onNext(null); verify(observer, never()).onCompleted(); } + @Test public void testFunc0() { Func0 func = new Func0() { @@ -480,12 +513,13 @@ public Integer call() { Async.toAsync(func, Schedulers.immediate()) .call() .subscribe(observer); - + verify(observer, never()).onError(any(Throwable.class)); verify(observer, times(1)).onNext(0); verify(observer, times(1)).onCompleted(); - + } + @Test public void testFunc1() { Func1 func = new Func1() { @@ -497,11 +531,12 @@ public Integer call(Integer t1) { Async.toAsync(func, Schedulers.immediate()) .call(1) .subscribe(observer); - + verify(observer, never()).onError(any(Throwable.class)); verify(observer, times(1)).onNext(1); verify(observer, times(1)).onCompleted(); } + @Test public void testFunc2() { Func2 func = new Func2() { @@ -513,11 +548,12 @@ public Integer call(Integer t1, Integer t2) { Async.toAsync(func, Schedulers.immediate()) .call(1, 2) .subscribe(observer); - + verify(observer, never()).onError(any(Throwable.class)); verify(observer, times(1)).onNext(3); verify(observer, times(1)).onCompleted(); } + @Test public void testFunc3() { Func3 func = new Func3() { @@ -529,11 +565,12 @@ public Integer call(Integer t1, Integer t2, Integer t3) { Async.toAsync(func, Schedulers.immediate()) .call(1, 2, 4) .subscribe(observer); - + verify(observer, never()).onError(any(Throwable.class)); verify(observer, times(1)).onNext(7); verify(observer, times(1)).onCompleted(); } + @Test public void testFunc4() { Func4 func = new Func4() { @@ -545,11 +582,12 @@ public Integer call(Integer t1, Integer t2, Integer t3, Integer t4) { Async.toAsync(func, Schedulers.immediate()) .call(1, 2, 4, 8) .subscribe(observer); - + verify(observer, never()).onError(any(Throwable.class)); verify(observer, times(1)).onNext(15); verify(observer, times(1)).onCompleted(); } + @Test public void testFunc5() { Func5 func = new Func5() { @@ -561,11 +599,12 @@ public Integer call(Integer t1, Integer t2, Integer t3, Integer t4, Integer t5) Async.toAsync(func, Schedulers.immediate()) .call(1, 2, 4, 8, 16) .subscribe(observer); - + verify(observer, never()).onError(any(Throwable.class)); verify(observer, times(1)).onNext(31); verify(observer, times(1)).onCompleted(); } + @Test public void testFunc6() { Func6 func = new Func6() { @@ -577,11 +616,12 @@ public Integer call(Integer t1, Integer t2, Integer t3, Integer t4, Integer t5, Async.toAsync(func, Schedulers.immediate()) .call(1, 2, 4, 8, 16, 32) .subscribe(observer); - + verify(observer, never()).onError(any(Throwable.class)); verify(observer, times(1)).onNext(63); verify(observer, times(1)).onCompleted(); } + @Test public void testFunc7() { Func7 func = new Func7() { @@ -593,11 +633,12 @@ public Integer call(Integer t1, Integer t2, Integer t3, Integer t4, Integer t5, Async.toAsync(func, Schedulers.immediate()) .call(1, 2, 4, 8, 16, 32, 64) .subscribe(observer); - + verify(observer, never()).onError(any(Throwable.class)); verify(observer, times(1)).onNext(127); verify(observer, times(1)).onCompleted(); } + @Test public void testFunc8() { Func8 func = new Func8() { @@ -609,11 +650,12 @@ public Integer call(Integer t1, Integer t2, Integer t3, Integer t4, Integer t5, Async.toAsync(func, Schedulers.immediate()) .call(1, 2, 4, 8, 16, 32, 64, 128) .subscribe(observer); - + verify(observer, never()).onError(any(Throwable.class)); verify(observer, times(1)).onNext(255); verify(observer, times(1)).onCompleted(); } + @Test public void testFunc9() { Func9 func = new Func9() { @@ -625,11 +667,12 @@ public Integer call(Integer t1, Integer t2, Integer t3, Integer t4, Integer t5, Async.toAsync(func, Schedulers.immediate()) .call(1, 2, 4, 8, 16, 32, 64, 128, 256) .subscribe(observer); - + verify(observer, never()).onError(any(Throwable.class)); verify(observer, times(1)).onNext(511); verify(observer, times(1)).onCompleted(); } + @Test public void testFuncN() { FuncN func = new FuncN() { @@ -637,7 +680,7 @@ public void testFuncN() { public Integer call(Object... args) { int i = 0; for (Object o : args) { - i = i | (Integer)o; + i = i | (Integer) o; } return i; } @@ -645,9 +688,133 @@ public Integer call(Object... args) { Async.toAsync(func, Schedulers.immediate()) .call(1, 2, 4, 8, 16, 32, 64, 128, 256, 512) .subscribe(observer); - + verify(observer, never()).onError(any(Throwable.class)); verify(observer, times(1)).onNext(1023); verify(observer, times(1)).onCompleted(); } + + @Test + public void testStartWithFunc() { + Func0 func = new Func0() { + @Override + public String call() { + return "one"; + } + }; + assertEquals("one", Async.start(func).toBlockingObservable().single()); + } + + @Test(expected = RuntimeException.class) + public void testStartWithFuncError() { + Func0 func = new Func0() { + @Override + public String call() { + throw new RuntimeException("Some error"); + } + }; + Async.start(func).toBlockingObservable().single(); + } + + @Test + public void testStartWhenSubscribeRunBeforeFunc() { + TestScheduler scheduler = new TestScheduler(); + + Func0 func = new Func0() { + @Override + public String call() { + return "one"; + } + }; + + Observable observable = Async.start(func, scheduler); + + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + observable.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verifyNoMoreInteractions(); + + // Run func + scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); + + inOrder.verify(observer, times(1)).onNext("one"); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testStartWhenSubscribeRunAfterFunc() { + TestScheduler scheduler = new TestScheduler(); + + Func0 func = new Func0() { + @Override + public String call() { + return "one"; + } + }; + + Observable observable = Async.start(func, scheduler); + + // Run func + scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); + + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + observable.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext("one"); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testStartWithFuncAndMultipleObservers() { + TestScheduler scheduler = new TestScheduler(); + + @SuppressWarnings("unchecked") + Func0 func = (Func0) mock(Func0.class); + doAnswer(new Answer() { + @Override + public String answer(InvocationOnMock invocation) throws Throwable { + return "one"; + } + }).when(func).call(); + + Observable observable = Async.start(func, scheduler); + + scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); + + @SuppressWarnings("unchecked") + Observer observer1 = mock(Observer.class); + @SuppressWarnings("unchecked") + Observer observer2 = mock(Observer.class); + @SuppressWarnings("unchecked") + Observer observer3 = mock(Observer.class); + + observable.subscribe(observer1); + observable.subscribe(observer2); + observable.subscribe(observer3); + + InOrder inOrder; + inOrder = inOrder(observer1); + inOrder.verify(observer1, times(1)).onNext("one"); + inOrder.verify(observer1, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + + inOrder = inOrder(observer2); + inOrder.verify(observer2, times(1)).onNext("one"); + inOrder.verify(observer2, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + + inOrder = inOrder(observer3); + inOrder.verify(observer3, times(1)).onNext("one"); + inOrder.verify(observer3, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + + verify(func, times(1)).call(); + } + } diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index ebf9362123..e311414af8 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -125,7 +125,6 @@ import rx.util.functions.Action0; import rx.util.functions.Action1; import rx.util.functions.Action2; -import rx.util.functions.Async; import rx.util.functions.Func0; import rx.util.functions.Func1; import rx.util.functions.Func2; @@ -7711,45 +7710,4 @@ public Observable> gro return create(new OperationGroupByUntil(this, keySelector, valueSelector, durationSelector)); } - /** - * Invokes the specified function asynchronously and returns an Observable - * that emits the result. - *

- * Note: The function is called immediately and once, not whenever an - * observer subscribes to the resulting Observable. Multiple subscriptions - * to this Observable observe the same return value. - *

- * - * - * @param func function to run asynchronously - * @return an Observable that emits the function's result value, or notifies - * observers of an exception - * @see RxJava Wiki: start() - * @see MSDN: Observable.Start - */ - public static Observable start(Func0 func) { - return Async.toAsync(func).call(); - } - - /** - * Invokes the specified function asynchronously on the specified scheduler - * and returns an Observable that emits the result. - *

- * Note: The function is called immediately and once, not whenever an - * observer subscribes to the resulting Observable. Multiple subscriptions - * to this Observable observe the same return value. - *

- * - * - * @param func function to run asynchronously - * @param scheduler scheduler to run the function on - * @return an Observable that emits the function's result value, or notifies - * observers of an exception - * @see RxJava Wiki: start() - * @see MSDN: Observable.Start - */ - public static Observable start(Func0 func, Scheduler scheduler) { - return Async.toAsync(func, scheduler).call(); - } - } diff --git a/rxjava-core/src/test/java/rx/ObservableTests.java b/rxjava-core/src/test/java/rx/ObservableTests.java index 22e0f9cd60..fb3b95e7db 100644 --- a/rxjava-core/src/test/java/rx/ObservableTests.java +++ b/rxjava-core/src/test/java/rx/ObservableTests.java @@ -969,129 +969,6 @@ public void testRangeWithScheduler() { inOrder.verify(aObserver, times(1)).onCompleted(); inOrder.verifyNoMoreInteractions(); } - - @Test - public void testStartWithFunc() { - Func0 func = new Func0() { - @Override - public String call() { - return "one"; - } - }; - assertEquals("one", Observable.start(func).toBlockingObservable().single()); - } - - @Test(expected = RuntimeException.class) - public void testStartWithFuncError() { - Func0 func = new Func0() { - @Override - public String call() { - throw new RuntimeException("Some error"); - } - }; - Observable.start(func).toBlockingObservable().single(); - } - - @Test - public void testStartWhenSubscribeRunBeforeFunc() { - TestScheduler scheduler = new TestScheduler(); - - Func0 func = new Func0() { - @Override - public String call() { - return "one"; - } - }; - - Observable observable = Observable.start(func, scheduler); - - @SuppressWarnings("unchecked") - Observer observer = mock(Observer.class); - observable.subscribe(observer); - - InOrder inOrder = inOrder(observer); - inOrder.verifyNoMoreInteractions(); - - // Run func - scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); - - inOrder.verify(observer, times(1)).onNext("one"); - inOrder.verify(observer, times(1)).onCompleted(); - inOrder.verifyNoMoreInteractions(); - } - - @Test - public void testStartWhenSubscribeRunAfterFunc() { - TestScheduler scheduler = new TestScheduler(); - - Func0 func = new Func0() { - @Override - public String call() { - return "one"; - } - }; - - Observable observable = Observable.start(func, scheduler); - - // Run func - scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); - - @SuppressWarnings("unchecked") - Observer observer = mock(Observer.class); - observable.subscribe(observer); - - InOrder inOrder = inOrder(observer); - inOrder.verify(observer, times(1)).onNext("one"); - inOrder.verify(observer, times(1)).onCompleted(); - inOrder.verifyNoMoreInteractions(); - } - - @Test - public void testStartWithFuncAndMultipleObservers() { - TestScheduler scheduler = new TestScheduler(); - - @SuppressWarnings("unchecked") - Func0 func = (Func0) mock(Func0.class); - doAnswer(new Answer() { - @Override - public String answer(InvocationOnMock invocation) throws Throwable { - return "one"; - } - }).when(func).call(); - - Observable observable = Observable.start(func, scheduler); - - scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); - - @SuppressWarnings("unchecked") - Observer observer1 = mock(Observer.class); - @SuppressWarnings("unchecked") - Observer observer2 = mock(Observer.class); - @SuppressWarnings("unchecked") - Observer observer3 = mock(Observer.class); - - observable.subscribe(observer1); - observable.subscribe(observer2); - observable.subscribe(observer3); - - InOrder inOrder; - inOrder = inOrder(observer1); - inOrder.verify(observer1, times(1)).onNext("one"); - inOrder.verify(observer1, times(1)).onCompleted(); - inOrder.verifyNoMoreInteractions(); - - inOrder = inOrder(observer2); - inOrder.verify(observer2, times(1)).onNext("one"); - inOrder.verify(observer2, times(1)).onCompleted(); - inOrder.verifyNoMoreInteractions(); - - inOrder = inOrder(observer3); - inOrder.verify(observer3, times(1)).onNext("one"); - inOrder.verify(observer3, times(1)).onCompleted(); - inOrder.verifyNoMoreInteractions(); - - verify(func, times(1)).call(); - } @Test public void testCollectToList() { diff --git a/settings.gradle b/settings.gradle index 176e9150c5..c40f29adb6 100644 --- a/settings.gradle +++ b/settings.gradle @@ -8,4 +8,5 @@ include 'rxjava-core', \ 'rxjava-contrib:rxjava-swing', \ 'rxjava-contrib:rxjava-android', \ 'rxjava-contrib:rxjava-apache-http', \ -'rxjava-contrib:rxjava-string' +'rxjava-contrib:rxjava-string', \ +'rxjava-contrib:rxjava-async-util'