Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for RxJava 0.18.+ #468

Closed
wants to merge 14 commits into from
Closed

Conversation

chrisjenx
Copy link
Contributor

Needs review.

This adds a RetrofitScheduler which is loosely based on the NewThreadScheduler but alows us to provide the Executor.

I was not sure the direction the Square guys want to go with this. So apologies if it does not follow your guidelines, I'll happily change/move stuff.

I moved the RetrofitScheduler into package safe Schedulers class to hide and share the API in the RestAdapter and MockRestAdapter.

Few questions still outstanding:

  • Is Schedulers the right place for the RetrofitScheduler? Not sure if there would ever be more than one?
  • The RetrofitScheduler does a type check for ExecutorService but thats not guaranteed. Should we force ExecutorService?
  • MockRetrofitScheduler seems to do waiting of its own, Is there a reason we need to use the httpExecutor or could we just use rx.Schedulers.newThread()?
  • Should we implement the schedule method in the RetrofitScheduler, I feel that any timing based observable work should not be passed to the Retrofit ThreadPool.

I was toying with a way to allow users to pass through a Scheduler to Retrofit, but I think that would be unnecessary as we can just call retrofit service synchronously.

@Override
public Subscription schedule(final Action0 action) {
if (innerSubscription.isUnsubscribed()) {
// don't schedule, we are unsubscribed
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Capitalize + end with a period. Comments should be sentences (or at least punctuated like them).

@JakeWharton
Copy link
Collaborator

The RetrofitScheduler does a type check for ExecutorService but thats not guaranteed. Should we force ExecutorService?

We can't force, but we can add an API for it and deprecate the old one.

MockRetrofitScheduler seems to do waiting of its own, Is there a reason we need to use the httpExecutor or could we just use rx.Schedulers.newThread()?

I'd prefer not to do this. If you have a pool of 3 threads and make 10 requests you want to see the queuing behavior that the normal app would exhibit.

Is Schedulers the right place for the RetrofitScheduler? Not sure if there would ever be more than one?

There's an RxJavaSupport class already inside RestAdapter. We should probably promote it to top-level and have this scheduler live inside of it.

Should we implement the schedule method in the RetrofitScheduler, I feel that any timing based observable work should not be passed to the Retrofit ThreadPool.

Agree.

@chrisjenx
Copy link
Contributor Author

We can't force, but we can add an API for it and deprecate the old one.

Unless you feel it's necessary for this issue? I will leave that for another PR.

There's an RxJavaSupport class already inside RestAdapter. We should probably promote it to top-level and have this scheduler live inside of it.

Yeah I started adding to RxSupport, then doubled back. I shall promote RxSupport class then and add it as an inner class.

I'd prefer not to do this. If you have a pool of 3 threads and make 10 requests you want to see the queuing behavior that the normal app would exhibit.

I thought that might be the case, I'll leave as-is then.

@codefromthecrypt
Copy link

@jhump @dragonsinth you might be interested in this.

@chrisjenx
Copy link
Contributor Author

Made changes as per feedback.

// I would potentially force an API change to make sure this is always an
// ExecutorService.
s = Subscriptions.empty();
executor.execute(getActionRunnable(action, sf));
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could just use new FutureTask<Void>(getActionRunnable(action, sf), null).

@chrisjenx
Copy link
Contributor Author

Made changes as per @jhump. Didn't Realise I could do that (guess I should of checked what submit actually did, my bad!).

@JakeWharton latest change would negate the need to move towards needing an ExecutorService to be provided.

Might be worth @benjchristensen checking it over, to see if I have missed anything.

import rx.subscriptions.Subscriptions;

/**
* Indirection to avoid VerifyError if RxJava isn't present.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment is no longer useful. Either drop it altogether or change it to something like "Utilities for supporting RxJava Observables".

@benjchristensen
Copy link

The reason why ExecutorScheduler was removed is because it does not meet the guarantees of sequential execution expected of an Observable. See ReactiveX/RxJava#713 and ReactiveX/RxJava#711.

I considered trying to put event loops on top of an Executor but it's rather tricky, as the Executor queue can't be used, each loop (a Scheduler.Worker) must keep the enqueued work external from the Executor and only allow work to be done sequentially.

Keep this in mind while adding support for Executor here. If the responses only ever emit a single item, or it's only ever subscribeOn that gets used, then it's no big deal. If observeOn or other similar mechanism occur however where multiple onNext are scheduled, an Executor will allow concurrent emission and break the Rx contract.

If someone wants to tackle an ExecutorScheduler implementation that has Worker event loops on top of a multi-threaded Executor and maintains the contract, I'll happily accept it back into RxJava.

Hope this provides some context on this breaking change.

As for how many more breaking changes to expect ... theoretically there should not be any more significant ones as we are near the end of the roadmap to 1.0: ReactiveX/RxJava#1001

throw new RuntimeException(e);
}
}
}).subscribeOn(scheduler);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is the only use of Scheduler within Retrofit, then a multi-threaded Executor is fine, since it will only ever schedule a single action.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep. We internalize scheduling for now since RxJava support was kind-of shoved in. In a future world, Retrofit will have a leaner core that an RxJava shim can be built upon where the library doesn't own scheduling at all.

@chrisjenx
Copy link
Contributor Author

So based off of @benjchristensen comments; Implementing the executor into OnSubscribe makes more sense, we never want to expose the RetrofitScheduler to the outside world anyway, it was a workaround for the removal of Schedulers.executor().

This also allows the user to define the subscribeOn Scheduler without having to worry if they will lock up Retrofit Threadpool.
As we don't define our Scheduler, by default Rx uses the computation Scheduler for operators. @benjchristensen if you could confirm?
Which means we don't need to then worry about the user then running operators on our thread pool, potentially locking up requests.

We do ignore the passed in callbackExecutor via the RestAdapter.Builder but Retrofit did that before anyway, so that does not change from the current implementation.
I would expect the user to be defining their own observeOn anyway, the Rx Android plugin provides Activity/Fragment binding.

From what I have understood, I hope that makes sense?

@benjchristensen
Copy link

default Rx uses the computation Scheduler for operators.

When an operator requires concurrency (such as window or interval) then yes, Schedulers.computation() is the default.

The callback via onNext from your Observable would run on your thread pool though, as Rx does not inject new concurrency unless asked with a particular operator.

This should not be a problem though, as the use of Rx is supposed to be non-blocking. If someone does IO in a callback (such as in a map function) and blocks the threads, they are "doing the wrong thing" :-) That problem exists whether you use subscribeOn or your own executor inside OnSubscribe.

If you do not want the callbacks running on your thread pool, then you would use executor within OnSubscribe and then observable.observeOn(Schedulers.computation())' to move theonNextfrom your thread to thecomputation` event loops.

@chrisjenx
Copy link
Contributor Author

Yeah that co-1insides with what I understood.

I'll amend this pr and add some tests so anyone looking at retrofit will
see how this is intended to work.
On 30 Apr 2014 15:59, "Ben Christensen" notifications@github.com wrote:

default Rx uses the computation Scheduler for operators.

When an operator requires concurrency (such as window or interval) then
yes, Schedulers.computation() is the default.

The callback via onNext from your Observable would run on your thread
pool though, as Rx does not inject new concurrency unless asked with a
particular operator.

This should not be a problem though, as the use of Rx is supposed to be
non-blocking. If someone does IO in a callback (such as in a mapfunction) and blocks the threads, they are "doing the wrong thing" :-) That
problem exists whether you use subscribeOn or your own executor inside
OnSubscribe.

If you do not want the callbacks running on your thread pool, then you
would use executor within OnSubscribe and then observable.observeOn(Schedulers.computation())'
to move theonNextfrom your thread to thecomputation` event loops.


Reply to this email directly or view it on GitHubhttps://github.com//pull/468#issuecomment-41806796
.

@benjchristensen
Copy link

Great. And sorry about the volatility during the 0.x versions! We are very close to stabilizing on 1.0.

@chrisjenx
Copy link
Contributor Author

Done a bit of a round trip here, but I'm much happier with this implementation now.

Hope you guys agree? I might of over engineered the Observer.OnSubscribe. Do i need to add the child subscription?

Other than that, removal of the Subscriber made sense, we shouldn't need to worry about that. We only care about executing the Request and passing it off to something else as quick as possible.

}
@Override public void call(final Subscriber<? super Object> subscriber) {
if (subscriber.isUnsubscribed()) return;
httpExecutor.execute(new Runnable() {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you want to support cancelation, you can capture the Future from this and register it with the Subscriber.

For example:

subscriber.add(Subscriptions.from(future));

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, this is the MockRestAdapter so it probably doesn't matter ...

@benjchristensen
Copy link

The RxSupport class looks good.

@codefromthecrypt
Copy link

Thanks for the advice, Ben!

@chrisjenx
Copy link
Contributor Author

Are you guys happy with this? Let me know if not.

@JakeWharton
Copy link
Collaborator

@benjchristensen Thanks for all the help!

@chrisjenx I'll take a closer look tomorrow morning but it's looking good.

@loganj loganj mentioned this pull request May 7, 2014
@JakeWharton
Copy link
Collaborator

Squashed and merged to master.

@JakeWharton JakeWharton closed this May 8, 2014
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants