Skip to content

Commit

Permalink
Merge pull request #3491 from akarnokd/ScanUnboundedRequestFix1x
Browse files Browse the repository at this point in the history
1.x: make scan's delayed Producer independent of event serialization
  • Loading branch information
Aaron Tull committed Dec 2, 2015
2 parents 16e55c3 + 91d3d3a commit b6d59a4
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 105 deletions.
141 changes: 55 additions & 86 deletions src/main/java/rx/internal/operators/OperatorScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package rx.internal.operators;

import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;

import rx.*;
import rx.Observable.Operator;
Expand Down Expand Up @@ -175,12 +176,10 @@ static final class InitialProducer<R> implements Producer, Observer<R> {
boolean missed;
/** Missed a request. */
long missedRequested;
/** Missed a producer. */
Producer missedProducer;
/** The current requested amount. */
long requested;
final AtomicLong requested;
/** The current producer. */
Producer producer;
volatile Producer producer;

volatile boolean done;
Throwable error;
Expand All @@ -196,41 +195,7 @@ public InitialProducer(R initialValue, Subscriber<? super R> child) {
}
this.queue = q;
q.offer(NotificationLite.instance().next(initialValue));
}

@Override
public void request(long n) {
if (n < 0L) {
throw new IllegalArgumentException("n >= required but it was " + n);
} else
if (n != 0L) {
synchronized (this) {
if (emitting) {
long mr = missedRequested;
long mu = mr + n;
if (mu < 0L) {
mu = Long.MAX_VALUE;
}
missedRequested = mu;
return;
}
emitting = true;
}

long r = requested;
long u = r + n;
if (u < 0L) {
u = Long.MAX_VALUE;
}
requested = u;

Producer p = producer;
if (p != null) {
p.request(n);
}

emitLoop();
}
this.requested = new AtomicLong();
}

@Override
Expand Down Expand Up @@ -270,23 +235,51 @@ public void onCompleted() {
emit();
}

@Override
public void request(long n) {
if (n < 0L) {
throw new IllegalArgumentException("n >= required but it was " + n);
} else
if (n != 0L) {
BackpressureUtils.getAndAddRequest(requested, n);
Producer p = producer;
if (p == null) {
// not synchronizing on this to avoid clash with emit()
synchronized (requested) {
p = producer;
if (p == null) {
long mr = missedRequested;
missedRequested = BackpressureUtils.addCap(mr, n);
}
}
}
if (p != null) {
p.request(n);
}
emit();
}
}

public void setProducer(Producer p) {
if (p == null) {
throw new NullPointerException();
}
synchronized (this) {
if (emitting) {
missedProducer = p;
return;
long mr;
// not synchronizing on this to avoid clash with emit()
synchronized (requested) {
if (producer != null) {
throw new IllegalStateException("Can't set more than one Producer!");
}
emitting = true;
// request one less because of the initial value, this happens once
mr = missedRequested - 1;
missedRequested = 0L;
producer = p;
}
producer = p;
long r = requested;
if (r != 0L) {
p.request(r);

if (mr > 0L) {
p.request(mr);
}
emitLoop();
emit();
}

void emit() {
Expand All @@ -304,14 +297,17 @@ void emitLoop() {
final Subscriber<? super R> child = this.child;
final Queue<Object> queue = this.queue;
final NotificationLite<R> nl = NotificationLite.instance();
long r = requested;
AtomicLong requested = this.requested;

long r = requested.get();
for (;;) {
boolean max = r == Long.MAX_VALUE;
boolean d = done;
boolean empty = queue.isEmpty();
if (checkTerminated(d, empty, child)) {
return;
}
long e = 0L;
while (r != 0L) {
d = done;
Object o = queue.poll();
Expand All @@ -325,52 +321,25 @@ void emitLoop() {
R v = nl.getValue(o);
try {
child.onNext(v);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
child.onError(OnErrorThrowable.addValueAsLastCause(e, v));
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
child.onError(OnErrorThrowable.addValueAsLastCause(ex, v));
return;
}
if (!max) {
r--;
}
r--;
e--;
}
if (!max) {
requested = r;

if (e != 0 && !max) {
r = requested.addAndGet(e);
}

Producer p;
long mr;
synchronized (this) {
p = missedProducer;
mr = missedRequested;
if (!missed && p == null && mr == 0L) {
if (!missed) {
emitting = false;
return;
}
missed = false;
missedProducer = null;
missedRequested = 0L;
}

if (mr != 0L && !max) {
long u = r + mr;
if (u < 0L) {
u = Long.MAX_VALUE;
}
requested = u;
r = u;
}

if (p != null) {
producer = p;
if (r != 0L) {
p.request(r);
}
} else {
p = producer;
if (p != null && mr != 0L) {
p.request(mr);
}
}
}
}
Expand Down
50 changes: 32 additions & 18 deletions src/main/java/rx/internal/producers/ProducerObserverArbiter.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import rx.*;
import rx.Observer;
import rx.exceptions.*;
import rx.internal.operators.BackpressureUtils;

/**
* Producer that serializes any event emission with requesting and producer changes.
Expand Down Expand Up @@ -135,6 +136,7 @@ public void request(long n) {
}
emitting = true;
}
Producer p = currentProducer;
boolean skipFinal = false;
try {
long r = requested;
Expand All @@ -143,12 +145,7 @@ public void request(long n) {
u = Long.MAX_VALUE;
}
requested = u;

Producer p = currentProducer;
if (p != null) {
p.request(n);
}


emitLoop();
skipFinal = true;
} finally {
Expand All @@ -158,6 +155,9 @@ public void request(long n) {
}
}
}
if (p != null) {
p.request(n);
}
}

public void setProducer(Producer p) {
Expand All @@ -169,12 +169,9 @@ public void setProducer(Producer p) {
emitting = true;
}
boolean skipFinal = false;
currentProducer = p;
long r = requested;
try {
currentProducer = p;
long r = requested;
if (p != null && r != 0) {
p.request(r);
}
emitLoop();
skipFinal = true;
} finally {
Expand All @@ -184,17 +181,24 @@ public void setProducer(Producer p) {
}
}
}
if (p != null && r != 0) {
p.request(r);
}
}

void emitLoop() {
final Subscriber<? super T> c = child;

long toRequest = 0L;
Producer requestFrom = null;

outer:
for (;;) {
long localRequested;
Producer localProducer;
Object localTerminal;
List<T> q;
boolean quit = false;
synchronized (this) {
localRequested = missedRequested;
localProducer = missedProducer;
Expand All @@ -203,13 +207,21 @@ void emitLoop() {
if (localRequested == 0L && localProducer == null && q == null
&& localTerminal == null) {
emitting = false;
return;
quit = true;
} else {
missedRequested = 0L;
missedProducer = null;
queue = null;
missedTerminal = null;
}
missedRequested = 0L;
missedProducer = null;
queue = null;
missedTerminal = null;
}
if (quit) {
if (toRequest != 0L && requestFrom != null) {
requestFrom.request(toRequest);
}
return;
}

boolean empty = q == null || q.isEmpty();
if (localTerminal != null) {
if (localTerminal != Boolean.TRUE) {
Expand Down Expand Up @@ -266,13 +278,15 @@ void emitLoop() {
} else {
currentProducer = localProducer;
if (r != 0L) {
localProducer.request(r);
toRequest = BackpressureUtils.addCap(toRequest, r);
requestFrom = localProducer;
}
}
} else {
Producer p = currentProducer;
if (p != null && localRequested != 0L) {
p.request(localRequested);
toRequest = BackpressureUtils.addCap(toRequest, localRequested);
requestFrom = p;
}
}
}
Expand Down
20 changes: 20 additions & 0 deletions src/test/java/rx/internal/operators/OperatorScanTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -426,4 +426,24 @@ public Integer call(Integer t1, Integer t2) {
ts.assertNoErrors();
ts.assertCompleted();
}

@Test(timeout = 1000)
public void testUnboundedSource() {
Observable.range(0, Integer.MAX_VALUE)
.scan(0, new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer a, Integer b) {
return 0;
}
})
.subscribe(new TestSubscriber<Integer>() {
int count;
@Override
public void onNext(Integer t) {
if (++count == 2) {
unsubscribe();
}
}
});
}
}
Loading

0 comments on commit b6d59a4

Please sign in to comment.