Skip to content

Commit

Permalink
2.x: fix timed replay-like components replaying outdated items (#5140)
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd authored Feb 27, 2017
1 parent 3356444 commit 19ba993
Show file tree
Hide file tree
Showing 8 changed files with 188 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -947,7 +947,7 @@ public final void replay(InnerSubscription<T> output) {

Node node = output.index();
if (node == null) {
node = get();
node = getHead();
output.index = node;

BackpressureHelper.add(output.totalRequested, node.index);
Expand Down Expand Up @@ -1033,7 +1033,7 @@ void truncateFinal() {

}
/* test */ final void collect(Collection<? super T> output) {
Node n = get();
Node n = getHead();
for (;;) {
Node next = n.get();
if (next != null) {
Expand All @@ -1055,6 +1055,10 @@ void truncateFinal() {
/* test */ boolean hasCompleted() {
return tail.value != null && NotificationLite.isComplete(leaveTransform(tail.value));
}

Node getHead() {
return get();
}
}

/**
Expand Down Expand Up @@ -1172,5 +1176,28 @@ void truncateFinal() {
setFirst(prev);
}
}

@Override
Node getHead() {
long timeLimit = scheduler.now(unit) - maxAge;
Node prev = get();
Node next = prev.get();
for (;;) {
if (next == null) {
break;
}
Timed<?> v = (Timed<?>)next.value;
if (NotificationLite.isComplete(v.value()) || NotificationLite.isError(v.value())) {
break;
}
if (v.time() <= timeLimit) {
prev = next;
next = next.get();
} else {
break;
}
}
return prev;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -757,7 +757,7 @@ public final void replay(InnerDisposable<T> output) {
for (;;) {
Node node = output.index();
if (node == null) {
node = get();
node = getHead();
output.index = node;
}

Expand Down Expand Up @@ -821,7 +821,7 @@ void truncateFinal() {

}
/* test */ final void collect(Collection<? super T> output) {
Node n = get();
Node n = getHead();
for (;;) {
Node next = n.get();
if (next != null) {
Expand All @@ -843,6 +843,10 @@ void truncateFinal() {
/* test */ boolean hasCompleted() {
return tail.value != null && NotificationLite.isComplete(leaveTransform(tail.value));
}

Node getHead() {
return get();
}
}

/**
Expand Down Expand Up @@ -960,5 +964,28 @@ void truncateFinal() {
setFirst(prev);
}
}

@Override
Node getHead() {
long timeLimit = scheduler.now(unit) - maxAge;
Node prev = get();
Node next = prev.get();
for (;;) {
if (next == null) {
break;
}
Timed<?> v = (Timed<?>)next.value;
if (NotificationLite.isComplete(v.value()) || NotificationLite.isError(v.value())) {
break;
}
if (v.time() <= timeLimit) {
prev = next;
next = next.get();
} else {
break;
}
}
return prev;
}
}
}
40 changes: 23 additions & 17 deletions src/main/java/io/reactivex/processors/ReplayProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -1066,8 +1066,8 @@ public T getValue() {
@Override
@SuppressWarnings("unchecked")
public T[] getValues(T[] array) {
TimedNode<Object> h = head;
int s = size();
TimedNode<Object> h = getHead();
int s = size(h);

if (s == 0) {
if (array.length != 0) {
Expand All @@ -1093,6 +1093,22 @@ public T[] getValues(T[] array) {
return array;
}

TimedNode<Object> getHead() {
TimedNode<Object> index = head;
// skip old entries
long limit = scheduler.now(unit) - maxAge;
TimedNode<Object> next = index.get();
while (next != null) {
long ts = next.time;
if (ts > limit) {
break;
}
index = next;
next = index.get();
}
return index;
}

@Override
@SuppressWarnings("unchecked")
public void replay(ReplaySubscription<T> rs) {
Expand All @@ -1105,20 +1121,7 @@ public void replay(ReplaySubscription<T> rs) {

TimedNode<Object> index = (TimedNode<Object>)rs.index;
if (index == null) {
index = head;
if (!done) {
// skip old entries
long limit = scheduler.now(unit) - maxAge;
TimedNode<Object> next = index.get();
while (next != null) {
long ts = next.time;
if (ts > limit) {
break;
}
index = next;
next = index.get();
}
}
index = getHead();
}

for (;;) {
Expand Down Expand Up @@ -1185,8 +1188,11 @@ public void replay(ReplaySubscription<T> rs) {

@Override
public int size() {
return size(getHead());
}

int size(TimedNode<Object> h) {
int s = 0;
TimedNode<Object> h = head;
while (s != Integer.MAX_VALUE) {
TimedNode<Object> next = h.get();
if (next == null) {
Expand Down
40 changes: 23 additions & 17 deletions src/main/java/io/reactivex/subjects/ReplaySubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -1029,11 +1029,27 @@ public T getValue() {
return (T)v;
}

TimedNode<Object> getHead() {
TimedNode<Object> index = head;
// skip old entries
long limit = scheduler.now(unit) - maxAge;
TimedNode<Object> next = index.get();
while (next != null) {
long ts = next.time;
if (ts > limit) {
break;
}
index = next;
next = index.get();
}
return index;
}

@Override
@SuppressWarnings("unchecked")
public T[] getValues(T[] array) {
TimedNode<Object> h = head;
int s = size();
TimedNode<Object> h = getHead();
int s = size(h);

if (s == 0) {
if (array.length != 0) {
Expand Down Expand Up @@ -1071,20 +1087,7 @@ public void replay(ReplayDisposable<T> rs) {

TimedNode<Object> index = (TimedNode<Object>)rs.index;
if (index == null) {
index = head;
if (!done) {
// skip old entries
long limit = scheduler.now(unit) - maxAge;
TimedNode<Object> next = index.get();
while (next != null) {
long ts = next.time;
if (ts > limit) {
break;
}
index = next;
next = index.get();
}
}
index = getHead();
}

for (;;) {
Expand Down Expand Up @@ -1142,8 +1145,11 @@ public void replay(ReplayDisposable<T> rs) {

@Override
public int size() {
return size(getHead());
}

int size(TimedNode<Object> h) {
int s = 0;
TimedNode<Object> h = head;
while (s != Integer.MAX_VALUE) {
TimedNode<Object> next = h.get();
if (next == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ public void testWindowedReplay() {
InOrder inOrder = inOrder(observer1);

co.subscribe(observer1);
inOrder.verify(observer1, times(1)).onNext(3);
inOrder.verify(observer1, never()).onNext(3);

inOrder.verify(observer1, times(1)).onComplete();
inOrder.verifyNoMoreInteractions();
Expand Down Expand Up @@ -451,7 +451,7 @@ public void testWindowedReplayError() {
InOrder inOrder = inOrder(observer1);

co.subscribe(observer1);
inOrder.verify(observer1, times(1)).onNext(3);
inOrder.verify(observer1, never()).onNext(3);

inOrder.verify(observer1, times(1)).onError(any(RuntimeException.class));
inOrder.verifyNoMoreInteractions();
Expand Down Expand Up @@ -775,7 +775,7 @@ public void testTimedAndSizedTruncation() {
buf.next(2);
test.advanceTimeBy(1, TimeUnit.SECONDS);
buf.collect(values);
Assert.assertEquals(Arrays.asList(1, 2), values);
Assert.assertEquals(Arrays.asList(2), values);

buf.next(3);
buf.next(4);
Expand Down Expand Up @@ -1648,7 +1648,7 @@ public void testTimedAndSizedTruncationError() {
buf.next(2);
test.advanceTimeBy(1, TimeUnit.SECONDS);
buf.collect(values);
Assert.assertEquals(Arrays.asList(1, 2), values);
Assert.assertEquals(Arrays.asList(2), values);

buf.next(3);
buf.next(4);
Expand Down Expand Up @@ -1731,4 +1731,21 @@ protected void subscribeActual(Subscriber<? super Integer> s) {

assertTrue(bs.isCancelled());
}

@Test
public void timedNoOutdatedData() {
TestScheduler scheduler = new TestScheduler();

Flowable<Integer> source = Flowable.just(1)
.replay(2, TimeUnit.SECONDS, scheduler)
.autoConnect();

source.test().assertResult(1);

source.test().assertResult(1);

scheduler.advanceTimeBy(3, TimeUnit.SECONDS);

source.test().assertResult();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ public void testWindowedReplay() {
InOrder inOrder = inOrder(observer1);

co.subscribe(observer1);
inOrder.verify(observer1, times(1)).onNext(3);
inOrder.verify(observer1, never()).onNext(3);

inOrder.verify(observer1, times(1)).onComplete();
inOrder.verifyNoMoreInteractions();
Expand Down Expand Up @@ -451,7 +451,7 @@ public void testWindowedReplayError() {
InOrder inOrder = inOrder(observer1);

co.subscribe(observer1);
inOrder.verify(observer1, times(1)).onNext(3);
inOrder.verify(observer1, never()).onNext(3);

inOrder.verify(observer1, times(1)).onError(any(RuntimeException.class));
inOrder.verifyNoMoreInteractions();
Expand Down Expand Up @@ -762,7 +762,7 @@ public void testTimedAndSizedTruncation() {
buf.next(2);
test.advanceTimeBy(1, TimeUnit.SECONDS);
buf.collect(values);
Assert.assertEquals(Arrays.asList(1, 2), values);
Assert.assertEquals(Arrays.asList(2), values);

buf.next(3);
buf.next(4);
Expand Down Expand Up @@ -805,7 +805,7 @@ public void testTimedAndSizedTruncationError() {
buf.next(2);
test.advanceTimeBy(1, TimeUnit.SECONDS);
buf.collect(values);
Assert.assertEquals(Arrays.asList(1, 2), values);
Assert.assertEquals(Arrays.asList(2), values);

buf.next(3);
buf.next(4);
Expand Down Expand Up @@ -1511,4 +1511,21 @@ protected void subscribeActual(Observer<? super Integer> s) {

assertTrue(bs.isDisposed());
}

@Test
public void timedNoOutdatedData() {
TestScheduler scheduler = new TestScheduler();

Observable<Integer> source = Observable.just(1)
.replay(2, TimeUnit.SECONDS, scheduler)
.autoConnect();

source.test().assertResult(1);

source.test().assertResult(1);

scheduler.advanceTimeBy(3, TimeUnit.SECONDS);

source.test().assertResult();
}
}
Loading

0 comments on commit 19ba993

Please sign in to comment.