Skip to content

Commit

Permalink
fix decrement of consumerCapacity so doesn't decrement Long.MAX_VALUE…
Browse files Browse the repository at this point in the history
…, remove c==0 check in request method because of race condition
  • Loading branch information
davidmoten committed May 7, 2015
1 parent 2532484 commit d912922
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 8 deletions.
23 changes: 18 additions & 5 deletions src/main/java/rx/internal/operators/OnSubscribeRedo.java
Original file line number Diff line number Diff line change
Expand Up @@ -235,10 +235,23 @@ public void onError(Throwable e) {
@Override
public void onNext(T v) {
if (!done) {
if (consumerCapacity.get() != Long.MAX_VALUE) {
consumerCapacity.decrementAndGet();
}
child.onNext(v);
decrementConsumerCapacity();
}
}

private void decrementConsumerCapacity() {
// use a CAS loop because we don't want to decrement the value
// if it is Long.MAX_VALUE
while (true) {
long cc = consumerCapacity.get();
if (cc != Long.MAX_VALUE) {
if (consumerCapacity.compareAndSet(cc, cc - 1)) {
break;
}
} else {
break;
}
}
}

Expand Down Expand Up @@ -334,12 +347,12 @@ public void setProducer(Producer producer) {

@Override
public void request(final long n) {
long c = BackpressureUtils.getAndAddRequest(consumerCapacity, n);
BackpressureUtils.getAndAddRequest(consumerCapacity, n);
Producer producer = currentProducer.get();
if (producer != null) {
producer.request(n);
} else
if (c == 0 && resumeBoundary.compareAndSet(true, false)) {
if (resumeBoundary.compareAndSet(true, false)) {
worker.schedule(subscribeToSource);
}
}
Expand Down
9 changes: 6 additions & 3 deletions src/test/java/rx/internal/operators/OperatorRetryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -783,10 +783,10 @@ static <T> StringBuilder allSequenceFrequency(Map<Integer, List<T>> its) {
}
static <T> StringBuilder sequenceFrequency(Iterable<T> it) {
StringBuilder sb = new StringBuilder();

Object prev = null;
int cnt = 0;

for (Object curr : it) {
if (sb.length() > 0) {
if (!curr.equals(prev)) {
Expand All @@ -805,7 +805,10 @@ static <T> StringBuilder sequenceFrequency(Iterable<T> it) {
}
prev = curr;
}

if (cnt > 1) {
sb.append(" x ").append(cnt);
}

return sb;
}
@Test(timeout = 3000)
Expand Down

0 comments on commit d912922

Please sign in to comment.