Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for RxJava 0.18.+ #468

Closed
wants to merge 14 commits into from
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
<android.platform>16</android.platform>
<gson.version>2.2.4</gson.version>
<okhttp.version>1.3.0</okhttp.version>
<rxjava.version>0.17.1</rxjava.version>
<rxjava.version>0.18.1</rxjava.version>
<appengine.version>1.8.9</appengine.version>

<!-- Converter Dependencies -->
Expand Down
37 changes: 21 additions & 16 deletions retrofit-mock/src/main/java/retrofit/MockRestAdapter.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,12 @@
import java.lang.reflect.Proxy;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import retrofit.client.Request;
import retrofit.client.Response;
import rx.Observable;
import rx.Scheduler;
import rx.Subscriber;
import rx.schedulers.Schedulers;

import static retrofit.RestAdapter.LogLevel;
import static retrofit.RetrofitError.unexpectedError;
Expand Down Expand Up @@ -525,30 +524,36 @@ private static long uptimeMillis() {

/** Indirection to avoid VerifyError if RxJava isn't present. */
private static class MockRxSupport {
private final Scheduler scheduler;
private final Executor httpExecutor;
private final ErrorHandler errorHandler;

MockRxSupport(RestAdapter restAdapter) {
scheduler = Schedulers.executor(restAdapter.httpExecutor);
httpExecutor = restAdapter.httpExecutor;
errorHandler = restAdapter.errorHandler;
}

Observable createMockObservable(final MockHandler mockHandler, final RestMethodInfo methodInfo,
final RequestInterceptor interceptor, final Object[] args) {
return Observable.create(new Observable.OnSubscribe<Object>() {
@Override public void call(Subscriber<? super Object> subscriber) {
try {
Observable observable =
(Observable) mockHandler.invokeSync(methodInfo, interceptor, args);
//noinspection unchecked
observable.subscribe(subscriber);
} catch (RetrofitError e) {
subscriber.onError(errorHandler.handleError(e));
} catch (Throwable e) {
subscriber.onError(e);
}
@Override public void call(final Subscriber<? super Object> subscriber) {
if (subscriber.isUnsubscribed()) return;
httpExecutor.execute(new Runnable() {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you want to support cancelation, you can capture the Future from this and register it with the Subscriber.

For example:

subscriber.add(Subscriptions.from(future));

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, this is the MockRestAdapter so it probably doesn't matter ...

@Override public void run() {
try {
if (subscriber.isUnsubscribed()) return;
Observable observable =
(Observable) mockHandler.invokeSync(methodInfo, interceptor, args);
//noinspection unchecked
observable.subscribe(subscriber);
} catch (RetrofitError e) {
subscriber.onError(errorHandler.handleError(e));
} catch (Throwable e) {
subscriber.onError(e);
}
}
});
}
}).subscribeOn(scheduler);
});
}
}
}
39 changes: 1 addition & 38 deletions retrofit/src/main/java/retrofit/RestAdapter.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

import retrofit.Profiler.RequestInformation;
import retrofit.client.Client;
import retrofit.client.Header;
Expand All @@ -38,10 +39,6 @@
import retrofit.mime.TypedByteArray;
import retrofit.mime.TypedInput;
import retrofit.mime.TypedOutput;
import rx.Observable;
import rx.Scheduler;
import rx.Subscriber;
import rx.schedulers.Schedulers;

/**
* Adapts a Java interface to a REST API.
Expand Down Expand Up @@ -227,40 +224,6 @@ static RestMethodInfo getMethodInfo(Map<Method, RestMethodInfo> cache, Method me
}
}

/** Indirection to avoid VerifyError if RxJava isn't present. */
private static final class RxSupport {
private final Scheduler scheduler;
private final ErrorHandler errorHandler;

RxSupport(Executor executor, ErrorHandler errorHandler) {
this.scheduler = Schedulers.executor(executor);
this.errorHandler = errorHandler;
}

Observable createRequestObservable(final Callable<ResponseWrapper> request) {
return Observable.create(new Observable.OnSubscribe<Object>() {
@Override public void call(Subscriber<? super Object> subscriber) {
if (subscriber.isUnsubscribed()) {
return;
}
try {
ResponseWrapper wrapper = request.call();
if (subscriber.isUnsubscribed()) {
return;
}
subscriber.onNext(wrapper.responseBody);
subscriber.onCompleted();
} catch (RetrofitError e) {
subscriber.onError(errorHandler.handleError(e));
} catch (Exception e) {
// This is from the Callable. It shouldn't actually throw.
throw new RuntimeException(e);
}
}
}).subscribeOn(scheduler);
}
}

private class RestHandler implements InvocationHandler {
private final Map<Method, RestMethodInfo> methodDetailsCache;

Expand Down
65 changes: 65 additions & 0 deletions retrofit/src/main/java/retrofit/RxSupport.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package retrofit;

import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;

import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
import rx.subscriptions.Subscriptions;

/**
* Utilities for supporting RxJava Observables.
* Used primarily by {@link retrofit.RestAdapter}.
*
* Remember RxJava might not be on the classpath, check its included before calling, use
* {@link Platform#HAS_RX_JAVA}
*/
final class RxSupport {
private final Executor executor;
private final ErrorHandler errorHandler;

RxSupport(Executor executor, ErrorHandler errorHandler) {
this.executor = executor;
this.errorHandler = errorHandler;
}

Observable createRequestObservable(final Callable<ResponseWrapper> request) {
return Observable.create(new Observable.OnSubscribe<Object>() {
@Override public void call(Subscriber<? super Object> subscriber) {
if (subscriber.isUnsubscribed()) {
return;
}
final FutureTask<Void> task = new FutureTask<Void>(getRunnable(subscriber, request), null);
final Subscription s = Subscriptions.from(task);
// We add our subscription to the current subscriber so the future task can be
// unSubscribed from.
subscriber.add(s);
executor.execute(task);
}
});
}

private Runnable getRunnable(final Subscriber<? super Object> subscriber,
final Callable<ResponseWrapper> request) {
return new Runnable() {
@Override public void run() {
try {
if (subscriber.isUnsubscribed()) {
return;
}
ResponseWrapper wrapper = request.call();
subscriber.onNext(wrapper.responseBody);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should change this method so that onNext is called outside of the same try/catch as that which invokes the request. I can do that in a follow-up.

subscriber.onCompleted();
} catch (RetrofitError e) {
subscriber.onError(errorHandler.handleError(e));
} catch (Exception e) {
// This is from the Callable. It shouldn't actually throw.
throw new RuntimeException(e);
}
}
};
}

}
186 changes: 186 additions & 0 deletions retrofit/src/test/java/retrofit/RxSupportTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
package retrofit;

import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

import retrofit.client.Header;
import retrofit.client.Response;
import retrofit.mime.TypedInput;
import rx.Observer;
import rx.Subscription;
import rx.schedulers.Schedulers;
import rx.schedulers.TestScheduler;

import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

public class RxSupportTest {

private Object response;
private ResponseWrapper responseWrapper;
private Callable<ResponseWrapper> callable = spy(new Callable<ResponseWrapper>() {
@Override public ResponseWrapper call() throws Exception {
return responseWrapper;
}
});

private QueuedSynchronousExecutor executor;
private ErrorHandler errorHandler;
private RxSupport rxSupport;

@Before
public void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
response = new Object();
responseWrapper = new ResponseWrapper(
new Response(
"http://example.com", 200, "Success",
Collections.<Header>emptyList(), mock(TypedInput.class)
), response
);
executor = spy(new QueuedSynchronousExecutor());
errorHandler = ErrorHandler.DEFAULT;
rxSupport = new RxSupport(executor, errorHandler);
}

@Mock
Observer<Object> subscriber;

@Test
public void testObservableCallsOnNextOnHttpExecutor() throws Exception {
rxSupport.createRequestObservable(callable).subscribe(subscriber);
executor.executeNextInQueue();
verify(subscriber, times(1)).onNext(response);
}

@Test
public void testObservableCallsOnNextOnHttpExecutorWithSubscriber() throws Exception {
TestScheduler test = Schedulers.test();
rxSupport.createRequestObservable(callable).subscribeOn(test).subscribe(subscriber);
// Subscription is handled via the Scheduler.
test.triggerActions();
// This will only execute up to the executor in OnSubscribe.
verify(subscriber, never()).onNext(any());
// Upon continuing the executor we then run the retrofit request.
executor.executeNextInQueue();
verify(subscriber, times(1)).onNext(response);
}

@Test
public void testObservableUnSubscribesDoesNotExecuteCallable() throws Exception {
Subscription subscription = rxSupport.createRequestObservable(callable).subscribe(subscriber);
verify(subscriber, never()).onNext(any());

// UnSubscribe here should cancel the queued runnable.
subscription.unsubscribe();

executor.executeNextInQueue();
verify(callable, never()).call();
verify(subscriber, never()).onNext(response);
}

@Test
public void testObservableCallsOperatorsOffHttpExecutor() throws Exception {
TestScheduler test = Schedulers.test();
rxSupport.createRequestObservable(callable)
.delaySubscription(1000, TimeUnit.MILLISECONDS, test)
.subscribe(subscriber);

verify(subscriber, never()).onNext(any());
test.advanceTimeBy(1000, TimeUnit.MILLISECONDS);
// Upon continuing the executor we then run the retrofit request.
executor.executeNextInQueue();
verify(subscriber, times(1)).onNext(response);
}

@Test
public void testObservableDoesNotLockExecutor() throws Exception {
TestScheduler test = Schedulers.test();
Subscription subscription1 = rxSupport.createRequestObservable(callable)
.delay(1000, TimeUnit.MILLISECONDS, test)
.subscribe(subscriber);

Subscription subscription2 = rxSupport.createRequestObservable(callable)
.delay(2000, TimeUnit.MILLISECONDS, test)
.subscribe(subscriber);

// Nothing fired yet
verify(subscriber, never()).onNext(any());
// Subscriptions should of been queued up and executed even tho we delayed on the Subscriber.
executor.executeNextInQueue();
executor.executeNextInQueue();

verify(subscriber, never()).onNext(response);

test.advanceTimeBy(1000, TimeUnit.MILLISECONDS);
verify(subscriber, times(1)).onNext(response);

test.advanceTimeBy(1000, TimeUnit.MILLISECONDS);
verify(subscriber, times(2)).onNext(response);
}

@Test
public void testObservableRespectsObserveOn() throws Exception {
TestScheduler observe = Schedulers.test();
rxSupport.createRequestObservable(callable)
.observeOn(observe)
.subscribe(subscriber);

verify(subscriber, never()).onNext(any());
executor.executeNextInQueue();

// Should have no response yet, but callback should of been executed.
verify(subscriber, never()).onNext(any());
verify(callable, times(1)).call();

// Forward the Observable Scheduler
observe.triggerActions();
verify(subscriber, times(1)).onNext(response);
}

/**
* Test Executor to iterate through Executions to aid in checking
* that the Observable implementation is correct.
*/
static class QueuedSynchronousExecutor implements Executor {
Deque<Runnable> runnableQueue = new ArrayDeque<Runnable>();

@Override public void execute(Runnable runnable) {
runnableQueue.add(runnable);
}

/**
* Will throw exception if you are expecting something to be added to the Executor
* and it hasn't.
*/
void executeNextInQueue() {
runnableQueue.removeFirst().run();
}

/**
* Executes any queued executions on the executor.
*/
void executeAll() {
Iterator<Runnable> iterator = runnableQueue.iterator();
while (iterator.hasNext()) {
Runnable next = iterator.next();
next.run();
iterator.remove();
}
}
}
}