diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java index 489e1df858..a630688a7a 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java @@ -947,7 +947,7 @@ public final void replay(InnerSubscription output) { Node node = output.index(); if (node == null) { - node = get(); + node = getHead(); output.index = node; BackpressureHelper.add(output.totalRequested, node.index); @@ -1033,7 +1033,7 @@ void truncateFinal() { } /* test */ final void collect(Collection output) { - Node n = get(); + Node n = getHead(); for (;;) { Node next = n.get(); if (next != null) { @@ -1055,6 +1055,10 @@ void truncateFinal() { /* test */ boolean hasCompleted() { return tail.value != null && NotificationLite.isComplete(leaveTransform(tail.value)); } + + Node getHead() { + return get(); + } } /** @@ -1172,5 +1176,28 @@ void truncateFinal() { setFirst(prev); } } + + @Override + Node getHead() { + long timeLimit = scheduler.now(unit) - maxAge; + Node prev = get(); + Node next = prev.get(); + for (;;) { + if (next == null) { + break; + } + Timed v = (Timed)next.value; + if (NotificationLite.isComplete(v.value()) || NotificationLite.isError(v.value())) { + break; + } + if (v.time() <= timeLimit) { + prev = next; + next = next.get(); + } else { + break; + } + } + return prev; + } } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java index 14884030a2..1234bc0f7e 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java @@ -757,7 +757,7 @@ public final void replay(InnerDisposable output) { for (;;) { Node node = output.index(); if (node == null) { - node = get(); + node = getHead(); output.index = node; } @@ -821,7 +821,7 @@ void truncateFinal() { } /* test */ final void collect(Collection output) { - Node n = get(); + Node n = getHead(); for (;;) { Node next = n.get(); if (next != null) { @@ -843,6 +843,10 @@ void truncateFinal() { /* test */ boolean hasCompleted() { return tail.value != null && NotificationLite.isComplete(leaveTransform(tail.value)); } + + Node getHead() { + return get(); + } } /** @@ -960,5 +964,28 @@ void truncateFinal() { setFirst(prev); } } + + @Override + Node getHead() { + long timeLimit = scheduler.now(unit) - maxAge; + Node prev = get(); + Node next = prev.get(); + for (;;) { + if (next == null) { + break; + } + Timed v = (Timed)next.value; + if (NotificationLite.isComplete(v.value()) || NotificationLite.isError(v.value())) { + break; + } + if (v.time() <= timeLimit) { + prev = next; + next = next.get(); + } else { + break; + } + } + return prev; + } } } diff --git a/src/main/java/io/reactivex/processors/ReplayProcessor.java b/src/main/java/io/reactivex/processors/ReplayProcessor.java index 4fd3a9d41b..cc1297f073 100644 --- a/src/main/java/io/reactivex/processors/ReplayProcessor.java +++ b/src/main/java/io/reactivex/processors/ReplayProcessor.java @@ -1066,8 +1066,8 @@ public T getValue() { @Override @SuppressWarnings("unchecked") public T[] getValues(T[] array) { - TimedNode h = head; - int s = size(); + TimedNode h = getHead(); + int s = size(h); if (s == 0) { if (array.length != 0) { @@ -1093,6 +1093,22 @@ public T[] getValues(T[] array) { return array; } + TimedNode getHead() { + TimedNode index = head; + // skip old entries + long limit = scheduler.now(unit) - maxAge; + TimedNode next = index.get(); + while (next != null) { + long ts = next.time; + if (ts > limit) { + break; + } + index = next; + next = index.get(); + } + return index; + } + @Override @SuppressWarnings("unchecked") public void replay(ReplaySubscription rs) { @@ -1105,20 +1121,7 @@ public void replay(ReplaySubscription rs) { TimedNode index = (TimedNode)rs.index; if (index == null) { - index = head; - if (!done) { - // skip old entries - long limit = scheduler.now(unit) - maxAge; - TimedNode next = index.get(); - while (next != null) { - long ts = next.time; - if (ts > limit) { - break; - } - index = next; - next = index.get(); - } - } + index = getHead(); } for (;;) { @@ -1185,8 +1188,11 @@ public void replay(ReplaySubscription rs) { @Override public int size() { + return size(getHead()); + } + + int size(TimedNode h) { int s = 0; - TimedNode h = head; while (s != Integer.MAX_VALUE) { TimedNode next = h.get(); if (next == null) { diff --git a/src/main/java/io/reactivex/subjects/ReplaySubject.java b/src/main/java/io/reactivex/subjects/ReplaySubject.java index 286125d7eb..c56618d17a 100644 --- a/src/main/java/io/reactivex/subjects/ReplaySubject.java +++ b/src/main/java/io/reactivex/subjects/ReplaySubject.java @@ -1029,11 +1029,27 @@ public T getValue() { return (T)v; } + TimedNode getHead() { + TimedNode index = head; + // skip old entries + long limit = scheduler.now(unit) - maxAge; + TimedNode next = index.get(); + while (next != null) { + long ts = next.time; + if (ts > limit) { + break; + } + index = next; + next = index.get(); + } + return index; + } + @Override @SuppressWarnings("unchecked") public T[] getValues(T[] array) { - TimedNode h = head; - int s = size(); + TimedNode h = getHead(); + int s = size(h); if (s == 0) { if (array.length != 0) { @@ -1071,20 +1087,7 @@ public void replay(ReplayDisposable rs) { TimedNode index = (TimedNode)rs.index; if (index == null) { - index = head; - if (!done) { - // skip old entries - long limit = scheduler.now(unit) - maxAge; - TimedNode next = index.get(); - while (next != null) { - long ts = next.time; - if (ts > limit) { - break; - } - index = next; - next = index.get(); - } - } + index = getHead(); } for (;;) { @@ -1142,8 +1145,11 @@ public void replay(ReplayDisposable rs) { @Override public int size() { + return size(getHead()); + } + + int size(TimedNode h) { int s = 0; - TimedNode h = head; while (s != Integer.MAX_VALUE) { TimedNode next = h.get(); if (next == null) { diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableReplayTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableReplayTest.java index e6752e4c99..a66a1651f9 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableReplayTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableReplayTest.java @@ -175,7 +175,7 @@ public void testWindowedReplay() { InOrder inOrder = inOrder(observer1); co.subscribe(observer1); - inOrder.verify(observer1, times(1)).onNext(3); + inOrder.verify(observer1, never()).onNext(3); inOrder.verify(observer1, times(1)).onComplete(); inOrder.verifyNoMoreInteractions(); @@ -451,7 +451,7 @@ public void testWindowedReplayError() { InOrder inOrder = inOrder(observer1); co.subscribe(observer1); - inOrder.verify(observer1, times(1)).onNext(3); + inOrder.verify(observer1, never()).onNext(3); inOrder.verify(observer1, times(1)).onError(any(RuntimeException.class)); inOrder.verifyNoMoreInteractions(); @@ -775,7 +775,7 @@ public void testTimedAndSizedTruncation() { buf.next(2); test.advanceTimeBy(1, TimeUnit.SECONDS); buf.collect(values); - Assert.assertEquals(Arrays.asList(1, 2), values); + Assert.assertEquals(Arrays.asList(2), values); buf.next(3); buf.next(4); @@ -1648,7 +1648,7 @@ public void testTimedAndSizedTruncationError() { buf.next(2); test.advanceTimeBy(1, TimeUnit.SECONDS); buf.collect(values); - Assert.assertEquals(Arrays.asList(1, 2), values); + Assert.assertEquals(Arrays.asList(2), values); buf.next(3); buf.next(4); @@ -1731,4 +1731,21 @@ protected void subscribeActual(Subscriber s) { assertTrue(bs.isCancelled()); } + + @Test + public void timedNoOutdatedData() { + TestScheduler scheduler = new TestScheduler(); + + Flowable source = Flowable.just(1) + .replay(2, TimeUnit.SECONDS, scheduler) + .autoConnect(); + + source.test().assertResult(1); + + source.test().assertResult(1); + + scheduler.advanceTimeBy(3, TimeUnit.SECONDS); + + source.test().assertResult(); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayTest.java index b0f3bf103d..2057e33692 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayTest.java @@ -175,7 +175,7 @@ public void testWindowedReplay() { InOrder inOrder = inOrder(observer1); co.subscribe(observer1); - inOrder.verify(observer1, times(1)).onNext(3); + inOrder.verify(observer1, never()).onNext(3); inOrder.verify(observer1, times(1)).onComplete(); inOrder.verifyNoMoreInteractions(); @@ -451,7 +451,7 @@ public void testWindowedReplayError() { InOrder inOrder = inOrder(observer1); co.subscribe(observer1); - inOrder.verify(observer1, times(1)).onNext(3); + inOrder.verify(observer1, never()).onNext(3); inOrder.verify(observer1, times(1)).onError(any(RuntimeException.class)); inOrder.verifyNoMoreInteractions(); @@ -762,7 +762,7 @@ public void testTimedAndSizedTruncation() { buf.next(2); test.advanceTimeBy(1, TimeUnit.SECONDS); buf.collect(values); - Assert.assertEquals(Arrays.asList(1, 2), values); + Assert.assertEquals(Arrays.asList(2), values); buf.next(3); buf.next(4); @@ -805,7 +805,7 @@ public void testTimedAndSizedTruncationError() { buf.next(2); test.advanceTimeBy(1, TimeUnit.SECONDS); buf.collect(values); - Assert.assertEquals(Arrays.asList(1, 2), values); + Assert.assertEquals(Arrays.asList(2), values); buf.next(3); buf.next(4); @@ -1511,4 +1511,21 @@ protected void subscribeActual(Observer s) { assertTrue(bs.isDisposed()); } + + @Test + public void timedNoOutdatedData() { + TestScheduler scheduler = new TestScheduler(); + + Observable source = Observable.just(1) + .replay(2, TimeUnit.SECONDS, scheduler) + .autoConnect(); + + source.test().assertResult(1); + + source.test().assertResult(1); + + scheduler.advanceTimeBy(3, TimeUnit.SECONDS); + + source.test().assertResult(); + } } diff --git a/src/test/java/io/reactivex/processors/ReplayProcessorTest.java b/src/test/java/io/reactivex/processors/ReplayProcessorTest.java index 71afd720d9..14fb6869a1 100644 --- a/src/test/java/io/reactivex/processors/ReplayProcessorTest.java +++ b/src/test/java/io/reactivex/processors/ReplayProcessorTest.java @@ -503,7 +503,7 @@ public void testReplayTimestampedAfterTermination() { verify(o, never()).onNext(1); verify(o, never()).onNext(2); - verify(o).onNext(3); + verify(o, never()).onNext(3); verify(o).onComplete(); verify(o, never()).onError(any(Throwable.class)); } @@ -793,9 +793,11 @@ public void testSizeAndHasAnyValueTimeBounded() { for (int i = 0; i < 1000; i++) { rs.onNext(i); - ts.advanceTimeBy(2, TimeUnit.SECONDS); assertEquals(1, rs.size()); assertTrue(rs.hasValue()); + ts.advanceTimeBy(2, TimeUnit.SECONDS); + assertEquals(0, rs.size()); + assertFalse(rs.hasValue()); } rs.onComplete(); @@ -1279,4 +1281,21 @@ public void onNext(Integer t) { ts.assertResult(1, 2); } + + @Test + public void timedNoOutdatedData() { + TestScheduler scheduler = new TestScheduler(); + + ReplayProcessor source = ReplayProcessor.createWithTime(2, TimeUnit.SECONDS, scheduler); + source.onNext(1); + source.onComplete(); + + source.test().assertResult(1); + + source.test().assertResult(1); + + scheduler.advanceTimeBy(3, TimeUnit.SECONDS); + + source.test().assertResult(); + } } diff --git a/src/test/java/io/reactivex/subjects/ReplaySubjectTest.java b/src/test/java/io/reactivex/subjects/ReplaySubjectTest.java index 4e6877230f..6280788ad7 100644 --- a/src/test/java/io/reactivex/subjects/ReplaySubjectTest.java +++ b/src/test/java/io/reactivex/subjects/ReplaySubjectTest.java @@ -492,7 +492,7 @@ public void testReplayTimestampedAfterTermination() { verify(o, never()).onNext(1); verify(o, never()).onNext(2); - verify(o).onNext(3); + verify(o, never()).onNext(3); verify(o).onComplete(); verify(o, never()).onError(any(Throwable.class)); } @@ -782,9 +782,11 @@ public void testSizeAndHasAnyValueTimeBounded() { for (int i = 0; i < 1000; i++) { rs.onNext(i); - ts.advanceTimeBy(2, TimeUnit.SECONDS); assertEquals(1, rs.size()); assertTrue(rs.hasValue()); + ts.advanceTimeBy(2, TimeUnit.SECONDS); + assertEquals(0, rs.size()); + assertFalse(rs.hasValue()); } rs.onComplete(); @@ -1164,4 +1166,21 @@ public void dispose() { TestHelper.checkDisposed(ReplaySubject.createWithTimeAndSize(1, TimeUnit.SECONDS, Schedulers.single(), 10)); } + + @Test + public void timedNoOutdatedData() { + TestScheduler scheduler = new TestScheduler(); + + ReplaySubject source = ReplaySubject.createWithTime(2, TimeUnit.SECONDS, scheduler); + source.onNext(1); + source.onComplete(); + + source.test().assertResult(1); + + source.test().assertResult(1); + + scheduler.advanceTimeBy(3, TimeUnit.SECONDS); + + source.test().assertResult(); + } }