-
Notifications
You must be signed in to change notification settings - Fork 25
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Reimplement split() with backpressure #47
Conversation
@Override | ||
public void onError(Throwable e) { | ||
if (done) { | ||
// RxJavaHooks.onError(e); RxJava 1.2+ required |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this comment valid? I see that RxJava dependency was bumped to v. 1.2.3 in this PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right. Updated
* limitations under the License. | ||
*/ | ||
|
||
package rx.observables; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could you move this to rx.internal.operators
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
@akarnokd to the rescue. |
thanks @akarnokd, all it needed was 5 or 6 minutes of your time... |
This PR reimplements the
split()
operator to support backpressure.The algorithm uses the stable-prefetch approach and queues the entire split array of strings instead of enqueueing them one by one - this allows a bounded queue instead of an unbounded one. On the drain side, we know there are only strings of length 2 or more, and the emission simply ignores the last element - no need to copy items into a new array when enqueueing. Stripping off the trailing empty strings was a bit tricky but got inspiration from the current implementation. When the current array is traversed, empty strings are counted only and while this happens, the emission count doesn't move forward. Once a non-empty array entry was found, all previously counted empty elements are emitted just then - considering the request amount in the process. Once all empties have been emitted, the actual non-empty element is emitted.
Also this PR bumps RxJava version to 1.2.3 to have access to better operators such as
rebatchRequests
which allows requesting one by one.