Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for RxJava 0.18.+ #468

Closed
wants to merge 14 commits into from
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
<android.platform>16</android.platform>
<gson.version>2.2.4</gson.version>
<okhttp.version>1.3.0</okhttp.version>
<rxjava.version>0.17.1</rxjava.version>
<rxjava.version>0.18.1</rxjava.version>
<appengine.version>1.8.9</appengine.version>

<!-- Converter Dependencies -->
Expand Down
3 changes: 1 addition & 2 deletions retrofit-mock/src/main/java/retrofit/MockRestAdapter.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import rx.Observable;
import rx.Scheduler;
import rx.Subscriber;
import rx.schedulers.Schedulers;

import static retrofit.RestAdapter.LogLevel;
import static retrofit.RetrofitError.unexpectedError;
Expand Down Expand Up @@ -529,7 +528,7 @@ private static class MockRxSupport {
private final ErrorHandler errorHandler;

MockRxSupport(RestAdapter restAdapter) {
scheduler = Schedulers.executor(restAdapter.httpExecutor);
scheduler = new RxSupport.RetrofitScheduler(restAdapter.httpExecutor);
errorHandler = restAdapter.errorHandler;
}

Expand Down
39 changes: 1 addition & 38 deletions retrofit/src/main/java/retrofit/RestAdapter.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

import retrofit.Profiler.RequestInformation;
import retrofit.client.Client;
import retrofit.client.Header;
Expand All @@ -38,10 +39,6 @@
import retrofit.mime.TypedByteArray;
import retrofit.mime.TypedInput;
import retrofit.mime.TypedOutput;
import rx.Observable;
import rx.Scheduler;
import rx.Subscriber;
import rx.schedulers.Schedulers;

/**
* Adapts a Java interface to a REST API.
Expand Down Expand Up @@ -227,40 +224,6 @@ static RestMethodInfo getMethodInfo(Map<Method, RestMethodInfo> cache, Method me
}
}

/** Indirection to avoid VerifyError if RxJava isn't present. */
private static final class RxSupport {
private final Scheduler scheduler;
private final ErrorHandler errorHandler;

RxSupport(Executor executor, ErrorHandler errorHandler) {
this.scheduler = Schedulers.executor(executor);
this.errorHandler = errorHandler;
}

Observable createRequestObservable(final Callable<ResponseWrapper> request) {
return Observable.create(new Observable.OnSubscribe<Object>() {
@Override public void call(Subscriber<? super Object> subscriber) {
if (subscriber.isUnsubscribed()) {
return;
}
try {
ResponseWrapper wrapper = request.call();
if (subscriber.isUnsubscribed()) {
return;
}
subscriber.onNext(wrapper.responseBody);
subscriber.onCompleted();
} catch (RetrofitError e) {
subscriber.onError(errorHandler.handleError(e));
} catch (Exception e) {
// This is from the Callable. It shouldn't actually throw.
throw new RuntimeException(e);
}
}
}).subscribeOn(scheduler);
}
}

private class RestHandler implements InvocationHandler {
private final Map<Method, RestMethodInfo> methodDetailsCache;

Expand Down
133 changes: 133 additions & 0 deletions retrofit/src/main/java/retrofit/RxSupport.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package retrofit;

import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import rx.Observable;
import rx.Scheduler;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action0;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.Subscriptions;

/**
* Indirection to avoid VerifyError if RxJava isn't present.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment is no longer useful. Either drop it altogether or change it to something like "Utilities for supporting RxJava Observables".

*/
final class RxSupport {
private final Scheduler scheduler;
private final ErrorHandler errorHandler;

RxSupport(Executor executor, ErrorHandler errorHandler) {
this.scheduler = new RetrofitScheduler(executor);
this.errorHandler = errorHandler;
}

Observable createRequestObservable(final Callable<ResponseWrapper> request) {
return Observable.create(new Observable.OnSubscribe<Object>() {
@Override public void call(Subscriber<? super Object> subscriber) {
if (subscriber.isUnsubscribed()) {
return;
}
try {
ResponseWrapper wrapper = request.call();
if (subscriber.isUnsubscribed()) {
return;
}
subscriber.onNext(wrapper.responseBody);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should change this method so that onNext is called outside of the same try/catch as that which invokes the request. I can do that in a follow-up.

subscriber.onCompleted();
} catch (RetrofitError e) {
subscriber.onError(errorHandler.handleError(e));
} catch (Exception e) {
// This is from the Callable. It shouldn't actually throw.
throw new RuntimeException(e);
}
}
}).subscribeOn(scheduler);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is the only use of Scheduler within Retrofit, then a multi-threaded Executor is fine, since it will only ever schedule a single action.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep. We internalize scheduling for now since RxJava support was kind-of shoved in. In a future world, Retrofit will have a leaner core that an RxJava shim can be built upon where the library doesn't own scheduling at all.

}


/**
* RetrofitScheduler, similar to the {@link rx.schedulers.EventLoopsScheduler} in the same way
* it dumps requests onto a Executor, but we can pass in the Executor.
* <p/>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No invalid self-closing Javadoc tags or Java 8's doclint will fail spectacularly. Just use <p>.

* This does not support Scheduled execution, which may cause issues with peoples implementations.
* If they are doing, wait() or debouncing() on this scheduler. Future implementations, should
* either add {@code schedule()} support, or let the user provide the {@link rx.Scheduler} to
* RestAdapter builder.
*/
static class RetrofitScheduler extends Scheduler {
private final Executor executorService;

/*package*/ RetrofitScheduler(Executor executorService) {
this.executorService = executorService;
}

@Override
public Worker createWorker() {
return new EventLoopScheduler(executorService);
}

static class EventLoopScheduler extends Worker implements Subscription {
private final CompositeSubscription innerSubscription = new CompositeSubscription();
private final Executor executor;

EventLoopScheduler(Executor executor) {
this.executor = executor;
}

@Override
public Subscription schedule(final Action0 action) {
if (innerSubscription.isUnsubscribed()) {
// Don't schedule, we are un-subscribed.
return Subscriptions.empty();
}

final AtomicReference<Subscription> sf = new AtomicReference<Subscription>();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keep in mind my comment here: #468 (comment)

If this will only ever allow 1 Action0 scheduled per Worker or at a time, then it will be okay and conform to the Rx contract.

If it ever gets used with something like observeOn though it could break the contract and allow concurrent invocation of onNext. For this reason, you may want to safe-guard this implementation against concurrent scheduling or execution, similar to how you prevent use of schedule(final Action0 action, long delayTime, TimeUnit unit)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another option is that you don't use Scheduler and just use the Executor directly inside the Observable.OnSubscribe you create above. An Observable can do it's own concurrency/scheduling without the use of a Scheduler in a case like this where the intent appears to always be async.

For example:

        Observable.create(new Observable.OnSubscribe<Object>() {

            @Override
            public void call(Subscriber<? super Object> subscriber) {
                return Subscriptions.from(executor.execute(new Runnable() {
                    try {
                    ResponseWrapper wrapper = request.call();
                    if (subscriber.isUnsubscribed()) {
                      return;
                    }
                    subscriber.onNext(wrapper.responseBody);
                    subscriber.onCompleted();
                  } catch (Throwable e) {
                    subscriber.onError(errorHandler.handleError(e));
                  }
                }));
            }

        });

Since Retrofit uses Executor, this may be cleaner and less confusing than having 2 different abstractions, unless you truly need the ability to have a Retrofit Scheduler uses externally and for things such as observeOn.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That seems a better solution thanks for the feedback. Much appreciated!
On 29 Apr 2014 19:56, "Ben Christensen" notifications@github.com wrote:

In retrofit/src/main/java/retrofit/RxSupport.java:

  • static class EventLoopScheduler extends Worker implements Subscription {
  •  private final CompositeSubscription innerSubscription = new CompositeSubscription();
    
  •  private final Executor executor;
    
  •  EventLoopScheduler(Executor executor) {
    
  •    this.executor = executor;
    
  •  }
    
  •  @Override
    
  •  public Subscription schedule(final Action0 action) {
    
  •    if (innerSubscription.isUnsubscribed()) {
    
  •      // Don't schedule, we are un-subscribed.
    
  •      return Subscriptions.empty();
    
  •    }
    
  •    final AtomicReference<Subscription> sf = new AtomicReference<Subscription>();
    

Another option is that you don't use Scheduler and just use the Executordirectly inside the
Observable.OnSubscribe you create above. An Observable can do it's own
concurrency/scheduling without the use of a Scheduler in a case like this
where the intent appears to always be async.

For example:

    Observable.create(new Observable.OnSubscribe<Object>() {

        @Override
        public void call(Subscriber<? super Object> subscriber) {
            return Subscriptions.from(executor.execute(new Runnable() {
                try {
                ResponseWrapper wrapper = request.call();
                if (subscriber.isUnsubscribed()) {
                  return;
                }
                subscriber.onNext(wrapper.responseBody);
                subscriber.onCompleted();
              } catch (Throwable e) {
                subscriber.onError(errorHandler.handleError(e));
              }
            }));
        }

    });

Since Retrofit uses Executor, this may be cleaner and less confusing than
having 2 different abstractions, unless you truly need the ability to have
a Retrofit Scheduler uses externally and for things such as observeOn.


Reply to this email directly or view it on GitHubhttps://github.com//pull/468/files#r12110642
.

final FutureTask<Void> futureTask =
new FutureTask<Void>(getActionRunnable(action, sf), null);
final Subscription s = Subscriptions.from(futureTask);
executor.execute(futureTask);

sf.set(s);
innerSubscription.add(s);
return s;
}

@Override
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
throw new UnsupportedOperationException("This Scheduler does not support timed Actions");
}

@Override
public void unsubscribe() {
innerSubscription.unsubscribe();
}

@Override
public boolean isUnsubscribed() {
return innerSubscription.isUnsubscribed();
}

private Runnable getActionRunnable(final Action0 action,
final AtomicReference<Subscription> sf) {
return new Runnable() {
@Override
public void run() {
try {
if (innerSubscription.isUnsubscribed()) return;
action.call();
} finally {
// Remove the subscription now that we've completed.
Subscription s = sf.get();
if (s != null) innerSubscription.remove(s);
}
}
};
}
}
}
}