diff --git a/src/main/java/rx/internal/operators/OperatorReplay.java b/src/main/java/rx/internal/operators/OperatorReplay.java index 7cec48cc25..84338b96f9 100644 --- a/src/main/java/rx/internal/operators/OperatorReplay.java +++ b/src/main/java/rx/internal/operators/OperatorReplay.java @@ -1005,6 +1005,16 @@ final void setFirst(Node n) { set(n); } + /** + * Returns the current head for initializing the replay location + * for a new subscriber. + * Override it to consider linked but outdated elements. + * @return the current head + */ + Node getInitialHead() { + return get(); + } + @Override public final void next(T value) { Object o = enterTransform(nl.next(value)); @@ -1049,7 +1059,7 @@ public final void replay(InnerProducer output) { Node node = output.index(); if (node == null) { - node = get(); + node = getInitialHead(); output.index = node; /* @@ -1143,7 +1153,7 @@ void truncateFinal() { } /* test */ final void collect(Collection output) { - Node n = get(); + Node n = getInitialHead(); for (;;) { Node next = n.get(); if (next != null) { @@ -1219,6 +1229,20 @@ Object leaveTransform(Object value) { return ((Timestamped)value).getValue(); } + @Override + Node getInitialHead() { + long timeLimit = scheduler.now() - maxAgeInMillis; + Node prev = get(); + + Node next = prev.get(); + while (next != null && ((Timestamped)next.value).getTimestampMillis() <= timeLimit) { + prev = next; + next = next.get(); + } + + return prev; + } + @Override void truncate() { long timeLimit = scheduler.now() - maxAgeInMillis; diff --git a/src/test/java/rx/internal/operators/OperatorReplayTest.java b/src/test/java/rx/internal/operators/OperatorReplayTest.java index f8653f10bd..4d7f184951 100644 --- a/src/test/java/rx/internal/operators/OperatorReplayTest.java +++ b/src/test/java/rx/internal/operators/OperatorReplayTest.java @@ -193,7 +193,8 @@ public void testWindowedReplay() { InOrder inOrder = inOrder(observer1); co.subscribe(observer1); - inOrder.verify(observer1, times(1)).onNext(3); + // since onComplete is also delayed, value 3 becomes too old for replay. + inOrder.verify(observer1, never()).onNext(3); inOrder.verify(observer1, times(1)).onCompleted(); inOrder.verifyNoMoreInteractions(); @@ -479,7 +480,8 @@ public void testWindowedReplayError() { InOrder inOrder = inOrder(observer1); co.subscribe(observer1); - inOrder.verify(observer1, times(1)).onNext(3); + // since onError is also delayed, value 3 becomes too old for replay. + inOrder.verify(observer1, never()).onNext(3); inOrder.verify(observer1, times(1)).onError(any(RuntimeException.class)); inOrder.verifyNoMoreInteractions(); @@ -788,10 +790,16 @@ public void testTimedAndSizedTruncation() { buf.next(1); test.advanceTimeBy(1, TimeUnit.SECONDS); buf.next(2); - test.advanceTimeBy(1, TimeUnit.SECONDS); + // exact 1 second makes value 1 too old + test.advanceTimeBy(900, TimeUnit.MILLISECONDS); buf.collect(values); Assert.assertEquals(Arrays.asList(1, 2), values); + values.clear(); + test.advanceTimeBy(100, TimeUnit.MILLISECONDS); + buf.collect(values); + Assert.assertEquals(Arrays.asList(2), values); + buf.next(3); buf.next(4); values.clear(); @@ -1257,4 +1265,36 @@ public void testSubscribersComeAndGoAtRequestBoundaries2() { ts3.assertValues(2, 3, 4, 5, 6, 7, 8, 9, 10); ts3.assertCompleted(); } + + @Test + public void dontReplayOldValues() { + + PublishSubject ps = PublishSubject.create(); + + TestScheduler scheduler = new TestScheduler(); + + ConnectableObservable co = ps.replay(1, TimeUnit.SECONDS, scheduler); + + co.subscribe(); // make sure replay runs in unbounded mode + + co.connect(); + + ps.onNext(1); + + scheduler.advanceTimeBy(1, TimeUnit.SECONDS); + + ps.onNext(2); + + scheduler.advanceTimeBy(500, TimeUnit.MILLISECONDS); + + ps.onNext(3); + + scheduler.advanceTimeBy(500, TimeUnit.MILLISECONDS); + + TestSubscriber ts = TestSubscriber.create(); + + co.subscribe(ts); + + ts.assertValue(3); + } } \ No newline at end of file