-
Notifications
You must be signed in to change notification settings - Fork 7.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix for RxJava 0.18.+ #468
Changes from 11 commits
9488e86
7e3bebc
966254b
a048035
ee29804
53fee10
37ef1a3
f584287
a350444
17360ca
e009ea8
2765e40
2c19f42
97db5c9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,137 @@ | ||
package retrofit; | ||
|
||
import java.util.concurrent.Callable; | ||
import java.util.concurrent.Executor; | ||
import java.util.concurrent.FutureTask; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.atomic.AtomicReference; | ||
|
||
import rx.Observable; | ||
import rx.Scheduler; | ||
import rx.Subscriber; | ||
import rx.Subscription; | ||
import rx.functions.Action0; | ||
import rx.subscriptions.CompositeSubscription; | ||
import rx.subscriptions.Subscriptions; | ||
|
||
/** | ||
* Utilities for supporting RxJava Observables. | ||
* Used primarily by {@link retrofit.RestAdapter}. | ||
* | ||
* Remember RxJava might not be on the classpath, check its included before calling, use | ||
* {@link Platform#HAS_RX_JAVA} | ||
*/ | ||
final class RxSupport { | ||
private final Scheduler scheduler; | ||
private final ErrorHandler errorHandler; | ||
|
||
RxSupport(Executor executor, ErrorHandler errorHandler) { | ||
this.scheduler = new RetrofitScheduler(executor); | ||
this.errorHandler = errorHandler; | ||
} | ||
|
||
Observable createRequestObservable(final Callable<ResponseWrapper> request) { | ||
return Observable.create(new Observable.OnSubscribe<Object>() { | ||
@Override public void call(Subscriber<? super Object> subscriber) { | ||
if (subscriber.isUnsubscribed()) { | ||
return; | ||
} | ||
try { | ||
ResponseWrapper wrapper = request.call(); | ||
if (subscriber.isUnsubscribed()) { | ||
return; | ||
} | ||
subscriber.onNext(wrapper.responseBody); | ||
subscriber.onCompleted(); | ||
} catch (RetrofitError e) { | ||
subscriber.onError(errorHandler.handleError(e)); | ||
} catch (Exception e) { | ||
// This is from the Callable. It shouldn't actually throw. | ||
throw new RuntimeException(e); | ||
} | ||
} | ||
}).subscribeOn(scheduler); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If this is the only use of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yep. We internalize scheduling for now since RxJava support was kind-of shoved in. In a future world, Retrofit will have a leaner core that an RxJava shim can be built upon where the library doesn't own scheduling at all. |
||
} | ||
|
||
|
||
/** | ||
* RetrofitScheduler, similar to the {@link rx.schedulers.EventLoopsScheduler} in the same way | ||
* it dumps requests onto a Executor, but we can pass in the Executor. | ||
* | ||
* This does not support Scheduled execution, which may cause issues with peoples implementations. | ||
* If they are doing, wait() or debouncing() on this scheduler. Future implementations, should | ||
* either add {@code schedule()} support, or let the user provide the {@link rx.Scheduler} to | ||
* RestAdapter builder. | ||
*/ | ||
static class RetrofitScheduler extends Scheduler { | ||
private final Executor executorService; | ||
|
||
/*package*/ RetrofitScheduler(Executor executorService) { | ||
this.executorService = executorService; | ||
} | ||
|
||
@Override | ||
public Worker createWorker() { | ||
return new EventLoopScheduler(executorService); | ||
} | ||
|
||
static class EventLoopScheduler extends Worker implements Subscription { | ||
private final CompositeSubscription innerSubscription = new CompositeSubscription(); | ||
private final Executor executor; | ||
|
||
EventLoopScheduler(Executor executor) { | ||
this.executor = executor; | ||
} | ||
|
||
@Override | ||
public Subscription schedule(final Action0 action) { | ||
if (innerSubscription.isUnsubscribed()) { | ||
// Don't schedule, we are un-subscribed. | ||
return Subscriptions.empty(); | ||
} | ||
|
||
final AtomicReference<Subscription> sf = new AtomicReference<Subscription>(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Keep in mind my comment here: #468 (comment) If this will only ever allow 1 If it ever gets used with something like There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Another option is that you don't use For example: Observable.create(new Observable.OnSubscribe<Object>() {
@Override
public void call(Subscriber<? super Object> subscriber) {
return Subscriptions.from(executor.execute(new Runnable() {
try {
ResponseWrapper wrapper = request.call();
if (subscriber.isUnsubscribed()) {
return;
}
subscriber.onNext(wrapper.responseBody);
subscriber.onCompleted();
} catch (Throwable e) {
subscriber.onError(errorHandler.handleError(e));
}
}));
}
}); Since Retrofit uses There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That seems a better solution thanks for the feedback. Much appreciated!
|
||
final FutureTask<Void> futureTask = | ||
new FutureTask<Void>(getActionRunnable(action, sf), null); | ||
final Subscription s = Subscriptions.from(futureTask); | ||
executor.execute(futureTask); | ||
|
||
sf.set(s); | ||
innerSubscription.add(s); | ||
return s; | ||
} | ||
|
||
@Override | ||
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) { | ||
throw new UnsupportedOperationException("This Scheduler does not support timed Actions"); | ||
} | ||
|
||
@Override | ||
public void unsubscribe() { | ||
innerSubscription.unsubscribe(); | ||
} | ||
|
||
@Override | ||
public boolean isUnsubscribed() { | ||
return innerSubscription.isUnsubscribed(); | ||
} | ||
|
||
private Runnable getActionRunnable(final Action0 action, | ||
final AtomicReference<Subscription> sf) { | ||
return new Runnable() { | ||
@Override | ||
public void run() { | ||
try { | ||
if (innerSubscription.isUnsubscribed()) return; | ||
action.call(); | ||
} finally { | ||
// Remove the subscription now that we've completed. | ||
Subscription s = sf.get(); | ||
if (s != null) innerSubscription.remove(s); | ||
} | ||
} | ||
}; | ||
} | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should change this method so that
onNext
is called outside of the same try/catch as that which invokes the request. I can do that in a follow-up.