Skip to content

Commit

Permalink
Merge pull request #2769 from davidmoten/combineLatest-request-overflow
Browse files Browse the repository at this point in the history
OperatorCombineLatest request overflow check
  • Loading branch information
akarnokd committed Feb 23, 2015
2 parents 14ed8d4 + 2043094 commit 3a6ce5a
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public MultiSourceProducer(final Subscriber<? super R> child, final List<? exten

@Override
public void request(long n) {
requested.getAndAdd(n);
BackpressureUtils.getAndAddRequest(requested, n);
if (!started.get() && started.compareAndSet(false, true)) {
/*
* NOTE: this logic will ONLY work if we don't have more sources than the size of the buffer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -851,5 +851,40 @@ public Long call(Long t1, Integer t2) {

assertEquals(SIZE, count.get());
}

@Test(timeout=10000)
public void testCombineLatestRequestOverflow() throws InterruptedException {
List<Observable<Integer>> sources = Arrays.asList(Observable.from(Arrays.asList(1,2,3,4)), Observable.from(Arrays.asList(5,6,7,8)));
Observable<Integer> o = Observable.combineLatest(sources,new FuncN<Integer>() {
@Override
public Integer call(Object... args) {
return (Integer) args[0];
}});
//should get at least 4
final CountDownLatch latch = new CountDownLatch(4);
o.subscribeOn(Schedulers.computation()).subscribe(new Subscriber<Integer>() {

@Override
public void onStart() {
request(2);
}

@Override
public void onCompleted() {
//ignore
}

@Override
public void onError(Throwable e) {
throw new RuntimeException(e);
}

@Override
public void onNext(Integer t) {
latch.countDown();
request(Long.MAX_VALUE-1);
}});
assertTrue(latch.await(10, TimeUnit.SECONDS));
}

}

0 comments on commit 3a6ce5a

Please sign in to comment.