Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for RxJava 0.18.+ #468

Closed
wants to merge 14 commits into from
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
<android.platform>16</android.platform>
<gson.version>2.2.4</gson.version>
<okhttp.version>1.3.0</okhttp.version>
<rxjava.version>0.17.1</rxjava.version>
<rxjava.version>0.18.1</rxjava.version>
<appengine.version>1.8.9</appengine.version>

<!-- Converter Dependencies -->
Expand Down
3 changes: 1 addition & 2 deletions retrofit-mock/src/main/java/retrofit/MockRestAdapter.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import rx.Observable;
import rx.Scheduler;
import rx.Subscriber;
import rx.schedulers.Schedulers;

import static retrofit.RestAdapter.LogLevel;
import static retrofit.RetrofitError.unexpectedError;
Expand Down Expand Up @@ -529,7 +528,7 @@ private static class MockRxSupport {
private final ErrorHandler errorHandler;

MockRxSupport(RestAdapter restAdapter) {
scheduler = Schedulers.executor(restAdapter.httpExecutor);
scheduler = new Schedulers.RetrofitScheduler(restAdapter.httpExecutor);
errorHandler = restAdapter.errorHandler;
}

Expand Down
4 changes: 2 additions & 2 deletions retrofit/src/main/java/retrofit/RestAdapter.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

import retrofit.Profiler.RequestInformation;
import retrofit.client.Client;
import retrofit.client.Header;
Expand All @@ -41,7 +42,6 @@
import rx.Observable;
import rx.Scheduler;
import rx.Subscriber;
import rx.schedulers.Schedulers;

/**
* Adapts a Java interface to a REST API.
Expand Down Expand Up @@ -233,7 +233,7 @@ private static final class RxSupport {
private final ErrorHandler errorHandler;

RxSupport(Executor executor, ErrorHandler errorHandler) {
this.scheduler = Schedulers.executor(executor);
this.scheduler = new Schedulers.RetrofitScheduler(executor);
this.errorHandler = errorHandler;
}

Expand Down
110 changes: 110 additions & 0 deletions retrofit/src/main/java/retrofit/Schedulers.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package retrofit;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import rx.Scheduler;
import rx.Subscription;
import rx.functions.Action0;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.Subscriptions;

/**
* Indirect access to Scheduler API for
*/
/*package*/ final class Schedulers {

/**
* RetrofitScheduler, similar to the {@link rx.schedulers.EventLoopsScheduler} in the same way
* it dumps requests onto a Executor, but we can pass in the Executor.
* <p/>
* This does not support Scheduled execution, which may cause issues with peoples implementations.
* If they are doing, wait() or debouncing() on this scheduler. Future implementations, should
* either add {@code schedule()} support, or let the user provide the {@link rx.Scheduler} to
* RestAdapter builder.
*/
static class RetrofitScheduler extends Scheduler {
private final Executor executorService;

/*package*/ RetrofitScheduler(Executor executorService) {
this.executorService = executorService;
}

@Override
public Worker createWorker() {
return new EventLoopScheduler(executorService);
}

static class EventLoopScheduler extends Scheduler.Worker implements Subscription {
private final CompositeSubscription innerSubscription = new CompositeSubscription();
private final Executor executor;

/* package */ EventLoopScheduler(Executor executor) {
this.executor = executor;
}

@Override
public Subscription schedule(final Action0 action) {
if (innerSubscription.isUnsubscribed()) {
// don't schedule, we are unsubscribed
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Capitalize + end with a period. Comments should be sentences (or at least punctuated like them).

return Subscriptions.empty();
}

final AtomicReference<Subscription> sf = new AtomicReference<Subscription>();
final Subscription s;
if (executor instanceof ExecutorService) {
s = Subscriptions.from(((ExecutorService) executor).submit(
getActionRunnable(action, sf)));
} else {
/*
This is not ideal, we should use a ExecutorService, that way we can pass future
back to the subscription, so if the user un-subscribe from the parent we can
request the Future to cancel. This will always execute, meaning we could
lock of the retrofit threads if a request is active for a long time.
I would potentially force an API change to make sure this is always an ExecutorService
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you switch this to a //-prefixed comment block. Also delete this last sentence. It should be an issue instead.

*/
s = Subscriptions.empty();
executor.execute(getActionRunnable(action, sf));
}

sf.set(s);
innerSubscription.add(s);
return s;
}

private Runnable getActionRunnable(final Action0 action,
final AtomicReference<Subscription> sf) {
return new Runnable() {
@Override
public void run() {
try {
if (innerSubscription.isUnsubscribed()) return;
action.call();
} finally {
// remove the subscription now that we're completed
Subscription s = sf.get();
if (s != null) innerSubscription.remove(s);
}
}
};
}

@Override
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
throw new UnsupportedOperationException("This Scheduler does not support timed requests");
}

@Override
public void unsubscribe() {
innerSubscription.unsubscribe();
}

@Override
public boolean isUnsubscribed() {
return innerSubscription.isUnsubscribed();
}
}
}
}