diff --git a/pom.xml b/pom.xml index e71473b183..e64f858714 100644 --- a/pom.xml +++ b/pom.xml @@ -51,7 +51,7 @@ 16 2.2.4 1.3.0 - 0.17.1 + 0.18.1 1.8.9 diff --git a/retrofit-mock/src/main/java/retrofit/MockRestAdapter.java b/retrofit-mock/src/main/java/retrofit/MockRestAdapter.java index 7b1c1ff91e..1746c13d98 100644 --- a/retrofit-mock/src/main/java/retrofit/MockRestAdapter.java +++ b/retrofit-mock/src/main/java/retrofit/MockRestAdapter.java @@ -8,13 +8,12 @@ import java.lang.reflect.Proxy; import java.util.Map; import java.util.Random; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import retrofit.client.Request; import retrofit.client.Response; import rx.Observable; -import rx.Scheduler; import rx.Subscriber; -import rx.schedulers.Schedulers; import static retrofit.RestAdapter.LogLevel; import static retrofit.RetrofitError.unexpectedError; @@ -525,30 +524,36 @@ private static long uptimeMillis() { /** Indirection to avoid VerifyError if RxJava isn't present. */ private static class MockRxSupport { - private final Scheduler scheduler; + private final Executor httpExecutor; private final ErrorHandler errorHandler; MockRxSupport(RestAdapter restAdapter) { - scheduler = Schedulers.executor(restAdapter.httpExecutor); + httpExecutor = restAdapter.httpExecutor; errorHandler = restAdapter.errorHandler; } Observable createMockObservable(final MockHandler mockHandler, final RestMethodInfo methodInfo, final RequestInterceptor interceptor, final Object[] args) { return Observable.create(new Observable.OnSubscribe() { - @Override public void call(Subscriber subscriber) { - try { - Observable observable = - (Observable) mockHandler.invokeSync(methodInfo, interceptor, args); - //noinspection unchecked - observable.subscribe(subscriber); - } catch (RetrofitError e) { - subscriber.onError(errorHandler.handleError(e)); - } catch (Throwable e) { - subscriber.onError(e); - } + @Override public void call(final Subscriber subscriber) { + if (subscriber.isUnsubscribed()) return; + httpExecutor.execute(new Runnable() { + @Override public void run() { + try { + if (subscriber.isUnsubscribed()) return; + Observable observable = + (Observable) mockHandler.invokeSync(methodInfo, interceptor, args); + //noinspection unchecked + observable.subscribe(subscriber); + } catch (RetrofitError e) { + subscriber.onError(errorHandler.handleError(e)); + } catch (Throwable e) { + subscriber.onError(e); + } + } + }); } - }).subscribeOn(scheduler); + }); } } } diff --git a/retrofit/src/main/java/retrofit/RestAdapter.java b/retrofit/src/main/java/retrofit/RestAdapter.java index 18174734fd..ba2941f3d4 100644 --- a/retrofit/src/main/java/retrofit/RestAdapter.java +++ b/retrofit/src/main/java/retrofit/RestAdapter.java @@ -27,6 +27,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; + import retrofit.Profiler.RequestInformation; import retrofit.client.Client; import retrofit.client.Header; @@ -38,10 +39,6 @@ import retrofit.mime.TypedByteArray; import retrofit.mime.TypedInput; import retrofit.mime.TypedOutput; -import rx.Observable; -import rx.Scheduler; -import rx.Subscriber; -import rx.schedulers.Schedulers; /** * Adapts a Java interface to a REST API. @@ -227,40 +224,6 @@ static RestMethodInfo getMethodInfo(Map cache, Method me } } - /** Indirection to avoid VerifyError if RxJava isn't present. */ - private static final class RxSupport { - private final Scheduler scheduler; - private final ErrorHandler errorHandler; - - RxSupport(Executor executor, ErrorHandler errorHandler) { - this.scheduler = Schedulers.executor(executor); - this.errorHandler = errorHandler; - } - - Observable createRequestObservable(final Callable request) { - return Observable.create(new Observable.OnSubscribe() { - @Override public void call(Subscriber subscriber) { - if (subscriber.isUnsubscribed()) { - return; - } - try { - ResponseWrapper wrapper = request.call(); - if (subscriber.isUnsubscribed()) { - return; - } - subscriber.onNext(wrapper.responseBody); - subscriber.onCompleted(); - } catch (RetrofitError e) { - subscriber.onError(errorHandler.handleError(e)); - } catch (Exception e) { - // This is from the Callable. It shouldn't actually throw. - throw new RuntimeException(e); - } - } - }).subscribeOn(scheduler); - } - } - private class RestHandler implements InvocationHandler { private final Map methodDetailsCache; diff --git a/retrofit/src/main/java/retrofit/RxSupport.java b/retrofit/src/main/java/retrofit/RxSupport.java new file mode 100644 index 0000000000..b728205e72 --- /dev/null +++ b/retrofit/src/main/java/retrofit/RxSupport.java @@ -0,0 +1,65 @@ +package retrofit; + +import java.util.concurrent.Callable; +import java.util.concurrent.Executor; +import java.util.concurrent.FutureTask; + +import rx.Observable; +import rx.Subscriber; +import rx.Subscription; +import rx.subscriptions.Subscriptions; + +/** + * Utilities for supporting RxJava Observables. + * Used primarily by {@link retrofit.RestAdapter}. + * + * Remember RxJava might not be on the classpath, check its included before calling, use + * {@link Platform#HAS_RX_JAVA} + */ +final class RxSupport { + private final Executor executor; + private final ErrorHandler errorHandler; + + RxSupport(Executor executor, ErrorHandler errorHandler) { + this.executor = executor; + this.errorHandler = errorHandler; + } + + Observable createRequestObservable(final Callable request) { + return Observable.create(new Observable.OnSubscribe() { + @Override public void call(Subscriber subscriber) { + if (subscriber.isUnsubscribed()) { + return; + } + final FutureTask task = new FutureTask(getRunnable(subscriber, request), null); + final Subscription s = Subscriptions.from(task); + // We add our subscription to the current subscriber so the future task can be + // unSubscribed from. + subscriber.add(s); + executor.execute(task); + } + }); + } + + private Runnable getRunnable(final Subscriber subscriber, + final Callable request) { + return new Runnable() { + @Override public void run() { + try { + if (subscriber.isUnsubscribed()) { + return; + } + ResponseWrapper wrapper = request.call(); + subscriber.onNext(wrapper.responseBody); + subscriber.onCompleted(); + } catch (RetrofitError e) { + subscriber.onError(errorHandler.handleError(e)); + } catch (Exception e) { + // This is from the Callable. It shouldn't actually throw. + throw new RuntimeException(e); + } + } + }; + } + +} diff --git a/retrofit/src/test/java/retrofit/RxSupportTest.java b/retrofit/src/test/java/retrofit/RxSupportTest.java new file mode 100644 index 0000000000..e37da9c513 --- /dev/null +++ b/retrofit/src/test/java/retrofit/RxSupportTest.java @@ -0,0 +1,186 @@ +package retrofit; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import java.util.ArrayDeque; +import java.util.Collections; +import java.util.Deque; +import java.util.Iterator; +import java.util.concurrent.Callable; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; + +import retrofit.client.Header; +import retrofit.client.Response; +import retrofit.mime.TypedInput; +import rx.Observer; +import rx.Subscription; +import rx.schedulers.Schedulers; +import rx.schedulers.TestScheduler; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public class RxSupportTest { + + private Object response; + private ResponseWrapper responseWrapper; + private Callable callable = spy(new Callable() { + @Override public ResponseWrapper call() throws Exception { + return responseWrapper; + } + }); + + private QueuedSynchronousExecutor executor; + private ErrorHandler errorHandler; + private RxSupport rxSupport; + + @Before + public void setUp() throws Exception { + MockitoAnnotations.initMocks(this); + response = new Object(); + responseWrapper = new ResponseWrapper( + new Response( + "http://example.com", 200, "Success", + Collections.
emptyList(), mock(TypedInput.class) + ), response + ); + executor = spy(new QueuedSynchronousExecutor()); + errorHandler = ErrorHandler.DEFAULT; + rxSupport = new RxSupport(executor, errorHandler); + } + + @Mock + Observer subscriber; + + @Test + public void testObservableCallsOnNextOnHttpExecutor() throws Exception { + rxSupport.createRequestObservable(callable).subscribe(subscriber); + executor.executeNextInQueue(); + verify(subscriber, times(1)).onNext(response); + } + + @Test + public void testObservableCallsOnNextOnHttpExecutorWithSubscriber() throws Exception { + TestScheduler test = Schedulers.test(); + rxSupport.createRequestObservable(callable).subscribeOn(test).subscribe(subscriber); + // Subscription is handled via the Scheduler. + test.triggerActions(); + // This will only execute up to the executor in OnSubscribe. + verify(subscriber, never()).onNext(any()); + // Upon continuing the executor we then run the retrofit request. + executor.executeNextInQueue(); + verify(subscriber, times(1)).onNext(response); + } + + @Test + public void testObservableUnSubscribesDoesNotExecuteCallable() throws Exception { + Subscription subscription = rxSupport.createRequestObservable(callable).subscribe(subscriber); + verify(subscriber, never()).onNext(any()); + + // UnSubscribe here should cancel the queued runnable. + subscription.unsubscribe(); + + executor.executeNextInQueue(); + verify(callable, never()).call(); + verify(subscriber, never()).onNext(response); + } + + @Test + public void testObservableCallsOperatorsOffHttpExecutor() throws Exception { + TestScheduler test = Schedulers.test(); + rxSupport.createRequestObservable(callable) + .delaySubscription(1000, TimeUnit.MILLISECONDS, test) + .subscribe(subscriber); + + verify(subscriber, never()).onNext(any()); + test.advanceTimeBy(1000, TimeUnit.MILLISECONDS); + // Upon continuing the executor we then run the retrofit request. + executor.executeNextInQueue(); + verify(subscriber, times(1)).onNext(response); + } + + @Test + public void testObservableDoesNotLockExecutor() throws Exception { + TestScheduler test = Schedulers.test(); + Subscription subscription1 = rxSupport.createRequestObservable(callable) + .delay(1000, TimeUnit.MILLISECONDS, test) + .subscribe(subscriber); + + Subscription subscription2 = rxSupport.createRequestObservable(callable) + .delay(2000, TimeUnit.MILLISECONDS, test) + .subscribe(subscriber); + + // Nothing fired yet + verify(subscriber, never()).onNext(any()); + // Subscriptions should of been queued up and executed even tho we delayed on the Subscriber. + executor.executeNextInQueue(); + executor.executeNextInQueue(); + + verify(subscriber, never()).onNext(response); + + test.advanceTimeBy(1000, TimeUnit.MILLISECONDS); + verify(subscriber, times(1)).onNext(response); + + test.advanceTimeBy(1000, TimeUnit.MILLISECONDS); + verify(subscriber, times(2)).onNext(response); + } + + @Test + public void testObservableRespectsObserveOn() throws Exception { + TestScheduler observe = Schedulers.test(); + rxSupport.createRequestObservable(callable) + .observeOn(observe) + .subscribe(subscriber); + + verify(subscriber, never()).onNext(any()); + executor.executeNextInQueue(); + + // Should have no response yet, but callback should of been executed. + verify(subscriber, never()).onNext(any()); + verify(callable, times(1)).call(); + + // Forward the Observable Scheduler + observe.triggerActions(); + verify(subscriber, times(1)).onNext(response); + } + + /** + * Test Executor to iterate through Executions to aid in checking + * that the Observable implementation is correct. + */ + static class QueuedSynchronousExecutor implements Executor { + Deque runnableQueue = new ArrayDeque(); + + @Override public void execute(Runnable runnable) { + runnableQueue.add(runnable); + } + + /** + * Will throw exception if you are expecting something to be added to the Executor + * and it hasn't. + */ + void executeNextInQueue() { + runnableQueue.removeFirst().run(); + } + + /** + * Executes any queued executions on the executor. + */ + void executeAll() { + Iterator iterator = runnableQueue.iterator(); + while (iterator.hasNext()) { + Runnable next = iterator.next(); + next.run(); + iterator.remove(); + } + } + } +} \ No newline at end of file