From 20430945276d8522dc3fe3e16463330b1a311c1c Mon Sep 17 00:00:00 2001 From: Dave Moten Date: Tue, 24 Feb 2015 09:06:43 +1100 Subject: [PATCH] add request overflow check for combineLatest --- .../operators/OnSubscribeCombineLatest.java | 2 +- .../OnSubscribeCombineLatestTest.java | 35 +++++++++++++++++++ 2 files changed, 36 insertions(+), 1 deletion(-) diff --git a/src/main/java/rx/internal/operators/OnSubscribeCombineLatest.java b/src/main/java/rx/internal/operators/OnSubscribeCombineLatest.java index c5ea0d1291..1537f91aaf 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeCombineLatest.java +++ b/src/main/java/rx/internal/operators/OnSubscribeCombineLatest.java @@ -110,7 +110,7 @@ public MultiSourceProducer(final Subscriber child, final List> sources = Arrays.asList(Observable.from(Arrays.asList(1,2,3,4)), Observable.from(Arrays.asList(5,6,7,8))); + Observable o = Observable.combineLatest(sources,new FuncN() { + @Override + public Integer call(Object... args) { + return (Integer) args[0]; + }}); + //should get at least 4 + final CountDownLatch latch = new CountDownLatch(4); + o.subscribeOn(Schedulers.computation()).subscribe(new Subscriber() { + + @Override + public void onStart() { + request(2); + } + + @Override + public void onCompleted() { + //ignore + } + + @Override + public void onError(Throwable e) { + throw new RuntimeException(e); + } + + @Override + public void onNext(Integer t) { + latch.countDown(); + request(Long.MAX_VALUE-1); + }}); + assertTrue(latch.await(10, TimeUnit.SECONDS)); + } }