From d912922a69478187d3bbf0f38fdc5611d16ae4dd Mon Sep 17 00:00:00 2001 From: Dave Moten Date: Thu, 7 May 2015 12:01:20 +1000 Subject: [PATCH] fix decrement of consumerCapacity so doesn't decrement Long.MAX_VALUE, remove c==0 check in request method because of race condition --- .../internal/operators/OnSubscribeRedo.java | 23 +++++++++++++++---- .../internal/operators/OperatorRetryTest.java | 9 +++++--- 2 files changed, 24 insertions(+), 8 deletions(-) diff --git a/src/main/java/rx/internal/operators/OnSubscribeRedo.java b/src/main/java/rx/internal/operators/OnSubscribeRedo.java index 1ba5d1f281..f0a52d0423 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeRedo.java +++ b/src/main/java/rx/internal/operators/OnSubscribeRedo.java @@ -235,10 +235,23 @@ public void onError(Throwable e) { @Override public void onNext(T v) { if (!done) { - if (consumerCapacity.get() != Long.MAX_VALUE) { - consumerCapacity.decrementAndGet(); - } child.onNext(v); + decrementConsumerCapacity(); + } + } + + private void decrementConsumerCapacity() { + // use a CAS loop because we don't want to decrement the value + // if it is Long.MAX_VALUE + while (true) { + long cc = consumerCapacity.get(); + if (cc != Long.MAX_VALUE) { + if (consumerCapacity.compareAndSet(cc, cc - 1)) { + break; + } + } else { + break; + } } } @@ -334,12 +347,12 @@ public void setProducer(Producer producer) { @Override public void request(final long n) { - long c = BackpressureUtils.getAndAddRequest(consumerCapacity, n); + BackpressureUtils.getAndAddRequest(consumerCapacity, n); Producer producer = currentProducer.get(); if (producer != null) { producer.request(n); } else - if (c == 0 && resumeBoundary.compareAndSet(true, false)) { + if (resumeBoundary.compareAndSet(true, false)) { worker.schedule(subscribeToSource); } } diff --git a/src/test/java/rx/internal/operators/OperatorRetryTest.java b/src/test/java/rx/internal/operators/OperatorRetryTest.java index a5aa9f1c31..a354159372 100644 --- a/src/test/java/rx/internal/operators/OperatorRetryTest.java +++ b/src/test/java/rx/internal/operators/OperatorRetryTest.java @@ -783,10 +783,10 @@ static StringBuilder allSequenceFrequency(Map> its) { } static StringBuilder sequenceFrequency(Iterable it) { StringBuilder sb = new StringBuilder(); - + Object prev = null; int cnt = 0; - + for (Object curr : it) { if (sb.length() > 0) { if (!curr.equals(prev)) { @@ -805,7 +805,10 @@ static StringBuilder sequenceFrequency(Iterable it) { } prev = curr; } - + if (cnt > 1) { + sb.append(" x ").append(cnt); + } + return sb; } @Test(timeout = 3000)