Skip to content

Commit

Permalink
ensure iterator.hasNext is not called unnecessarily as per #3006
Browse files Browse the repository at this point in the history
  • Loading branch information
davidmoten committed Jun 8, 2015
1 parent 2bf39a7 commit dfa1db1
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 17 deletions.
44 changes: 27 additions & 17 deletions src/main/java/rx/internal/operators/OnSubscribeFromIterable.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,42 +71,52 @@ public void request(long n) {
}
if (n == Long.MAX_VALUE && REQUESTED_UPDATER.compareAndSet(this, 0, Long.MAX_VALUE)) {
// fast-path without backpressure
while (it.hasNext()) {

while (true) {
if (o.isUnsubscribed()) {
return;
} else if (it.hasNext()) {
o.onNext(it.next());
} else if (!o.isUnsubscribed()) {
o.onCompleted();
return;
} else {
// is unsubscribed
return;
}
o.onNext(it.next());
}
if (!o.isUnsubscribed()) {
o.onCompleted();
}
} else if (n > 0) {
// backpressure is requested
long _c = BackpressureUtils.getAndAddRequest(REQUESTED_UPDATER, this, n);
if (_c == 0) {
while (true) {
/*
* This complicated logic is done to avoid touching the volatile `requested` value
* during the loop itself. If it is touched during the loop the performance is impacted significantly.
* This complicated logic is done to avoid touching the
* volatile `requested` value during the loop itself. If
* it is touched during the loop the performance is
* impacted significantly.
*/
long r = requested;
long numToEmit = r;
while (it.hasNext() && --numToEmit >= 0) {
while (true) {
if (o.isUnsubscribed()) {
return;
}
o.onNext(it.next());

}

if (!it.hasNext()) {
if (!o.isUnsubscribed()) {
} else if (it.hasNext()) {
if (--numToEmit >= 0) {
o.onNext(it.next());
} else
break;
} else if (!o.isUnsubscribed()) {
o.onCompleted();
return;
} else {
// is unsubscribed
return;
}
return;
}
if (REQUESTED_UPDATER.addAndGet(this, -r) == 0) {
// we're done emitting the number requested so return
// we're done emitting the number requested so
// return
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package rx.internal.operators;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
Expand All @@ -29,6 +30,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

Expand Down Expand Up @@ -221,4 +223,85 @@ public void onNext(Object t) {
assertTrue(completed.get());
}

@Test
public void testDoesNotCallIteratorHasNextMoreThanRequiredWithBackpressure() {
final AtomicBoolean called = new AtomicBoolean(false);
Iterable<Integer> iterable = new Iterable<Integer>() {

@Override
public Iterator<Integer> iterator() {
return new Iterator<Integer>() {

int count = 1;

@Override
public boolean hasNext() {
if (count > 1) {
called.set(true);
return false;
} else
return true;
}

@Override
public Integer next() {
return count++;
}

};
}
};
Observable.from(iterable).take(1).subscribe();
assertFalse(called.get());
}

@Test
public void testDoesNotCallIteratorHasNextMoreThanRequiredFastPath() {
final AtomicBoolean called = new AtomicBoolean(false);
Iterable<Integer> iterable = new Iterable<Integer>() {

@Override
public Iterator<Integer> iterator() {
return new Iterator<Integer>() {

int count = 1;

@Override
public boolean hasNext() {
if (count > 1) {
called.set(true);
return false;
} else
return true;
}

@Override
public Integer next() {
return count++;
}

};
}
};
Observable.from(iterable).subscribe(new Subscriber<Integer>() {

@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(Integer t) {
// unsubscribe on first emission
unsubscribe();
}
});
assertFalse(called.get());
}

}

0 comments on commit dfa1db1

Please sign in to comment.