Skip to content

Commit

Permalink
Merge pull request #2995 from davidmoten/switch-overflow
Browse files Browse the repository at this point in the history
switchOnNext - ensure initial requests additive and fix request overflow
  • Loading branch information
akarnokd committed Jun 17, 2015
2 parents 0caeea8 + d32a1b0 commit 488fe1c
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 8 deletions.
27 changes: 20 additions & 7 deletions src/main/java/rx/internal/operators/OperatorSwitch.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,15 +94,26 @@ public void request(long n) {
synchronized (guard) {
localSubscriber = currentSubscriber;
if (currentSubscriber == null) {
initialRequested = n;
long r = initialRequested + n;
if (r < 0) {
infinite = true;
} else {
initialRequested = r;
}
} else {
// If n == Long.MAX_VALUE, infinite will become true. Then currentSubscriber.requested won't be used.
// Therefore we don't need to worry about overflow.
currentSubscriber.requested += n;
long r = currentSubscriber.requested + n;
if (r < 0) {
infinite = true;
} else {
currentSubscriber.requested = r;
}
}
}
if (localSubscriber != null) {
localSubscriber.requestMore(n);
if (infinite)
localSubscriber.requestMore(Long.MAX_VALUE);
else
localSubscriber.requestMore(n);
}
}
});
Expand Down Expand Up @@ -167,7 +178,8 @@ void emit(T value, int id, InnerSubscriber innerSubscriber) {
if (queue == null) {
queue = new ArrayList<Object>();
}
innerSubscriber.requested--;
if (innerSubscriber.requested != Long.MAX_VALUE)
innerSubscriber.requested--;
queue.add(value);
return;
}
Expand All @@ -183,7 +195,8 @@ void emit(T value, int id, InnerSubscriber innerSubscriber) {
if (once) {
once = false;
synchronized (guard) {
innerSubscriber.requested--;
if (innerSubscriber.requested != Long.MAX_VALUE)
innerSubscriber.requested--;
}
s.onNext(value);
}
Expand Down
98 changes: 97 additions & 1 deletion src/test/java/rx/internal/operators/OperatorSwitchTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,20 @@
*/
package rx.internal.operators;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.*;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

Expand All @@ -36,6 +44,7 @@
import rx.Subscriber;
import rx.exceptions.TestException;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.observers.TestSubscriber;
import rx.schedulers.TestScheduler;
Expand Down Expand Up @@ -574,4 +583,91 @@ public void onNext(String t) {

Assert.assertEquals(250, ts.getOnNextEvents().size());
}

@Test(timeout = 10000)
public void testInitialRequestsAreAdditive() {
TestSubscriber<Long> ts = new TestSubscriber<Long>(0);
Observable.switchOnNext(
Observable.interval(100, TimeUnit.MILLISECONDS)
.map(
new Func1<Long, Observable<Long>>() {
@Override
public Observable<Long> call(Long t) {
return Observable.just(1L, 2L, 3L);
}
}
).take(3))
.subscribe(ts);
ts.requestMore(Long.MAX_VALUE - 100);
ts.requestMore(1);
ts.awaitTerminalEvent();
}

@Test(timeout = 10000)
public void testInitialRequestsDontOverflow() {
TestSubscriber<Long> ts = new TestSubscriber<Long>(0);
Observable.switchOnNext(
Observable.interval(100, TimeUnit.MILLISECONDS)
.map(new Func1<Long, Observable<Long>>() {
@Override
public Observable<Long> call(Long t) {
return Observable.from(Arrays.asList(1L, 2L, 3L));
}
}).take(3)).subscribe(ts);
ts.requestMore(Long.MAX_VALUE - 1);
ts.requestMore(2);
ts.awaitTerminalEvent();
assertTrue(ts.getOnNextEvents().size() > 0);
}


@Test(timeout = 10000)
public void testSecondaryRequestsDontOverflow() throws InterruptedException {
TestSubscriber<Long> ts = new TestSubscriber<Long>(0);
Observable.switchOnNext(
Observable.interval(100, TimeUnit.MILLISECONDS)
.map(new Func1<Long, Observable<Long>>() {
@Override
public Observable<Long> call(Long t) {
return Observable.from(Arrays.asList(1L, 2L, 3L));
}
}).take(3)).subscribe(ts);
ts.requestMore(1);
//we will miss two of the first observable
Thread.sleep(250);
ts.requestMore(Long.MAX_VALUE - 1);
ts.requestMore(Long.MAX_VALUE - 1);
ts.awaitTerminalEvent();
ts.assertValueCount(7);
}

@Test(timeout = 10000)
public void testSecondaryRequestsAdditivelyAreMoreThanLongMaxValueInducesMaxValueRequestFromUpstream() throws InterruptedException {
final List<Long> requests = new CopyOnWriteArrayList<Long>();
final Action1<Long> addRequest = new Action1<Long>() {

@Override
public void call(Long n) {
requests.add(n);
}};
TestSubscriber<Long> ts = new TestSubscriber<Long>(0);
Observable.switchOnNext(
Observable.interval(100, TimeUnit.MILLISECONDS)
.map(new Func1<Long, Observable<Long>>() {
@Override
public Observable<Long> call(Long t) {
return Observable.from(Arrays.asList(1L, 2L, 3L)).doOnRequest(addRequest);
}
}).take(3)).subscribe(ts);
ts.requestMore(1);
//we will miss two of the first observable
Thread.sleep(250);
ts.requestMore(Long.MAX_VALUE - 1);
ts.requestMore(Long.MAX_VALUE - 1);
ts.awaitTerminalEvent();
assertTrue(ts.getOnNextEvents().size() > 0);
assertEquals(5, (int) requests.size());
assertEquals(Long.MAX_VALUE, (long) requests.get(3));
assertEquals(Long.MAX_VALUE, (long) requests.get(4));
}
}

0 comments on commit 488fe1c

Please sign in to comment.