Skip to content

Commit

Permalink
Request data in batches.
Browse files Browse the repository at this point in the history
  • Loading branch information
vqvu committed Sep 16, 2015
1 parent f54252f commit d1a8739
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import rx.Observable;
import rx.Subscriber;
import rx.exceptions.Exceptions;
import rx.internal.util.RxRingBuffer;

/**
* Returns an Iterator that iterates over all items emitted by a specified Observable.
Expand Down Expand Up @@ -56,17 +57,19 @@ public static <T> Iterator<T> toIterator(Observable<? extends T> source) {
public static final class SubscriberIterator<T>
extends Subscriber<Notification<? extends T>> implements Iterator<T> {

static final int LIMIT = 3 * RxRingBuffer.SIZE / 4;

private final BlockingQueue<Notification<? extends T>> notifications;
private Notification<? extends T> buf;
private int received;

public SubscriberIterator() {
this.notifications = new LinkedBlockingQueue<Notification<? extends T>>();
this.buf = null;
}

@Override
public void onStart() {
request(0);
request(RxRingBuffer.SIZE);
}

@Override
Expand All @@ -87,8 +90,12 @@ public void onNext(Notification<? extends T> args) {
@Override
public boolean hasNext() {
if (buf == null) {
request(1);
buf = take();
received++;
if (received >= LIMIT) {
request(received);
received = 0;
}
}
if (buf.isOnError()) {
throw Exceptions.propagate(buf.getThrowable());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import rx.Observable.OnSubscribe;
import rx.Subscriber;
import rx.exceptions.TestException;
import rx.internal.operators.BlockingOperatorToIterator.SubscriberIterator;
import rx.internal.util.RxRingBuffer;

public class BlockingOperatorToIteratorTest {

Expand Down Expand Up @@ -96,26 +98,28 @@ public Iterator<Integer> iterator() {
Iterator<Integer> it = toIterator(obs);
while (it.hasNext()) {
// Correct backpressure should cause this interleaved behavior.
// We first request RxRingBuffer.SIZE. Then in increments of
// SubscriberIterator.LIMIT.
int i = it.next();
assertEquals(i + 1, src.count);
int expected = i - (i % SubscriberIterator.LIMIT) + RxRingBuffer.SIZE;
expected = Math.min(expected, Counter.MAX);

assertEquals(expected, src.count);
}
}

public static final class Counter implements Iterator<Integer> {
static final int MAX = 5 * RxRingBuffer.SIZE;
public int count;

public Counter() {
this.count = 0;
}

@Override
public boolean hasNext() {
return count < 5;
return count < MAX;
}

@Override
public Integer next() {
return count++;
return ++count;
}

@Override
Expand Down

0 comments on commit d1a8739

Please sign in to comment.