diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableScanSeed.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableScanSeed.java index b5b52275b3..60dc2f2ab7 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableScanSeed.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableScanSeed.java @@ -13,14 +13,17 @@ package io.reactivex.internal.operators.flowable; import java.util.concurrent.Callable; +import java.util.concurrent.atomic.*; import org.reactivestreams.*; import io.reactivex.exceptions.Exceptions; import io.reactivex.functions.BiFunction; import io.reactivex.internal.functions.ObjectHelper; -import io.reactivex.internal.subscribers.SinglePostCompleteSubscriber; -import io.reactivex.internal.subscriptions.EmptySubscription; +import io.reactivex.internal.fuseable.SimplePlainQueue; +import io.reactivex.internal.queue.SpscArrayQueue; +import io.reactivex.internal.subscriptions.*; +import io.reactivex.internal.util.BackpressureHelper; import io.reactivex.plugins.RxJavaPlugins; public final class FlowableScanSeed extends AbstractFlowableWithUpstream { @@ -45,20 +48,57 @@ protected void subscribeActual(Subscriber s) { return; } - source.subscribe(new ScanSeedSubscriber(s, accumulator, r)); + source.subscribe(new ScanSeedSubscriber(s, accumulator, r, bufferSize())); } - static final class ScanSeedSubscriber extends SinglePostCompleteSubscriber { + static final class ScanSeedSubscriber + extends AtomicInteger + implements Subscriber, Subscription { private static final long serialVersionUID = -1776795561228106469L; + final Subscriber actual; + final BiFunction accumulator; - boolean done; + final SimplePlainQueue queue; + + final AtomicLong requested; + + final int prefetch; + + final int limit; + + volatile boolean cancelled; + + volatile boolean done; + Throwable error; + + Subscription s; + + R value; - ScanSeedSubscriber(Subscriber actual, BiFunction accumulator, R value) { - super(actual); + int consumed; + + ScanSeedSubscriber(Subscriber actual, BiFunction accumulator, R value, int prefetch) { + this.actual = actual; this.accumulator = accumulator; this.value = value; + this.prefetch = prefetch; + this.limit = prefetch - (prefetch >> 2); + this.queue = new SpscArrayQueue(prefetch); + this.queue.offer(value); + this.requested = new AtomicLong(); + } + + @Override + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.validate(this.s, s)) { + this.s = s; + + actual.onSubscribe(this); + + s.request(prefetch - 1); + } } @Override @@ -68,21 +108,18 @@ public void onNext(T t) { } R v = value; - - R u; - try { - u = ObjectHelper.requireNonNull(accumulator.apply(v, t), "The accumulator returned a null value"); - } catch (Throwable e) { - Exceptions.throwIfFatal(e); + v = ObjectHelper.requireNonNull(accumulator.apply(v, t), "The accumulator returned a null value"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); s.cancel(); - onError(e); + onError(ex); return; } - value = u; - produced++; - actual.onNext(v); + value = v; + queue.offer(v); + drain(); } @Override @@ -91,9 +128,9 @@ public void onError(Throwable t) { RxJavaPlugins.onError(t); return; } + error = t; done = true; - value = null; - actual.onError(t); + drain(); } @Override @@ -102,7 +139,104 @@ public void onComplete() { return; } done = true; - complete(value); + drain(); + } + + @Override + public void cancel() { + cancelled = true; + s.cancel(); + if (getAndIncrement() == 0) { + queue.clear(); + } + } + + @Override + public void request(long n) { + if (SubscriptionHelper.validate(n)) { + BackpressureHelper.add(requested, n); + drain(); + } + } + + void drain() { + if (getAndIncrement() != 0) { + return; + } + + int missed = 1; + Subscriber a = actual; + SimplePlainQueue q = queue; + int lim = limit; + int c = consumed; + + for (;;) { + + long r = requested.get(); + long e = 0L; + + while (e != r) { + if (cancelled) { + q.clear(); + return; + } + boolean d = done; + + if (d) { + Throwable ex = error; + if (ex != null) { + q.clear(); + a.onError(ex); + return; + } + } + + R v = q.poll(); + boolean empty = v == null; + + if (d && empty) { + a.onComplete(); + return; + } + + if (empty) { + break; + } + + a.onNext(v); + + e++; + if (++c == lim) { + c = 0; + s.request(lim); + } + } + + if (e == r) { + if (done) { + Throwable ex = error; + if (ex != null) { + q.clear(); + a.onError(ex); + return; + } + if (q.isEmpty()) { + a.onComplete(); + return; + } + } + } + + if (e != 0L) { + BackpressureHelper.produced(requested, e); + } + + consumed = c; + missed = addAndGet(-missed); + if (missed == 0) { + break; + } + } } } } diff --git a/src/test/java/io/reactivex/flowable/FlowableScanTests.java b/src/test/java/io/reactivex/flowable/FlowableScanTests.java deleted file mode 100644 index 47285cc403..0000000000 --- a/src/test/java/io/reactivex/flowable/FlowableScanTests.java +++ /dev/null @@ -1,210 +0,0 @@ -/** - * Copyright (c) 2016-present, RxJava Contributors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is - * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See - * the License for the specific language governing permissions and limitations under the License. - */ - -package io.reactivex.flowable; - -import static org.junit.Assert.*; - -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicInteger; - -import org.junit.Test; - -import io.reactivex.*; -import io.reactivex.exceptions.UndeliverableException; -import io.reactivex.flowable.FlowableEventStream.Event; -import io.reactivex.functions.*; -import io.reactivex.plugins.RxJavaPlugins; - -public class FlowableScanTests { - - @Test - public void testUnsubscribeScan() { - - FlowableEventStream.getEventStream("HTTP-ClusterB", 20) - .scan(new HashMap(), new BiFunction, Event, HashMap>() { - @Override - public HashMap apply(HashMap accum, Event perInstanceEvent) { - accum.put("instance", perInstanceEvent.instanceId); - return accum; - } - }) - .take(10) - .blockingForEach(new Consumer>() { - @Override - public void accept(HashMap v) { - System.out.println(v); - } - }); - } - - @Test - public void testScanWithSeedDoesNotEmitErrorTwiceIfScanFunctionThrows() { - final List list = new CopyOnWriteArrayList(); - Consumer errorConsumer = new Consumer() { - @Override - public void accept(Throwable t) throws Exception { - list.add(t); - }}; - try { - RxJavaPlugins.setErrorHandler(errorConsumer); - final RuntimeException e = new RuntimeException(); - final RuntimeException e2 = new RuntimeException(); - Burst.items(1).error(e2) - .scan(0, throwingBiFunction(e)) - .test() - .assertNoValues() - .assertError(e); - - assertEquals("" + list, 1, list.size()); - assertTrue("" + list, list.get(0) instanceof UndeliverableException); - assertEquals(e2, list.get(0).getCause()); - } finally { - RxJavaPlugins.reset(); - } - } - - @Test - public void testScanWithSeedDoesNotEmitTerminalEventTwiceIfScanFunctionThrows() { - final RuntimeException e = new RuntimeException(); - Burst.item(1).create() - .scan(0, throwingBiFunction(e)) - .test() - .assertNoValues() - .assertError(e); - } - - @Test - public void testScanWithSeedDoesNotProcessOnNextAfterTerminalEventIfScanFunctionThrows() { - final RuntimeException e = new RuntimeException(); - final AtomicInteger count = new AtomicInteger(); - Burst.items(1, 2).create().scan(0, new BiFunction() { - - @Override - public Integer apply(Integer n1, Integer n2) throws Exception { - count.incrementAndGet(); - throw e; - }}) - .test() - .assertNoValues() - .assertError(e); - assertEquals(1, count.get()); - } - - @Test - public void testScanWithSeedCompletesNormally() { - Flowable.just(1,2,3).scan(0, SUM) - .test() - .assertValues(0, 1, 3, 6) - .assertComplete(); - } - - @Test - public void testScanWithSeedWhenScanSeedProviderThrows() { - final RuntimeException e = new RuntimeException(); - Flowable.just(1,2,3).scanWith(throwingCallable(e), - SUM) - .test() - .assertError(e) - .assertNoValues(); - } - - @Test - public void testScanNoSeed() { - Flowable.just(1, 2, 3) - .scan(SUM) - .test() - .assertValues(1, 3, 6) - .assertComplete(); - } - - @Test - public void testScanNoSeedDoesNotEmitErrorTwiceIfScanFunctionThrows() { - final List list = new CopyOnWriteArrayList(); - Consumer errorConsumer = new Consumer() { - @Override - public void accept(Throwable t) throws Exception { - list.add(t); - }}; - try { - RxJavaPlugins.setErrorHandler(errorConsumer); - final RuntimeException e = new RuntimeException(); - final RuntimeException e2 = new RuntimeException(); - Burst.items(1, 2).error(e2) - .scan(throwingBiFunction(e)) - .test() - .assertValue(1) - .assertError(e); - - assertEquals("" + list, 1, list.size()); - assertTrue("" + list, list.get(0) instanceof UndeliverableException); - assertEquals(e2, list.get(0).getCause()); - } finally { - RxJavaPlugins.reset(); - } - } - - @Test - public void testScanNoSeedDoesNotEmitTerminalEventTwiceIfScanFunctionThrows() { - final RuntimeException e = new RuntimeException(); - Burst.items(1, 2).create() - .scan(throwingBiFunction(e)) - .test() - .assertValue(1) - .assertError(e); - } - - @Test - public void testScanNoSeedDoesNotProcessOnNextAfterTerminalEventIfScanFunctionThrows() { - final RuntimeException e = new RuntimeException(); - final AtomicInteger count = new AtomicInteger(); - Burst.items(1, 2, 3).create().scan(new BiFunction() { - - @Override - public Integer apply(Integer n1, Integer n2) throws Exception { - count.incrementAndGet(); - throw e; - }}) - .test() - .assertValue(1) - .assertError(e); - assertEquals(1, count.get()); - } - - private static BiFunction throwingBiFunction(final RuntimeException e) { - return new BiFunction() { - @Override - public Integer apply(Integer n1, Integer n2) throws Exception { - throw e; - } - }; - } - - private static final BiFunction SUM = new BiFunction() { - - @Override - public Integer apply(Integer t1, Integer t2) throws Exception { - return t1 + t2; - } - }; - - private static Callable throwingCallable(final RuntimeException e) { - return new Callable() { - @Override - public Integer call() throws Exception { - throw e; - } - }; - } -} diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableScanTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableScanTest.java index 291084809f..a01495295e 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableScanTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableScanTest.java @@ -13,11 +13,11 @@ package io.reactivex.internal.operators.flowable; -import static org.junit.Assert.assertEquals; +import static org.junit.Assert.*; import static org.mockito.Mockito.*; import java.util.*; -import java.util.concurrent.Callable; +import java.util.concurrent.*; import java.util.concurrent.atomic.*; import org.junit.*; @@ -25,8 +25,11 @@ import io.reactivex.*; import io.reactivex.Flowable; -import io.reactivex.exceptions.TestException; +import io.reactivex.exceptions.*; +import io.reactivex.flowable.*; +import io.reactivex.flowable.FlowableEventStream.Event; import io.reactivex.functions.*; +import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.processors.PublishProcessor; import io.reactivex.subscribers.*; @@ -362,7 +365,7 @@ public void onNext(Integer integer) { }); verify(producer.get(), never()).request(0); - verify(producer.get(), times(2)).request(1); + verify(producer.get(), times(1)).request(Flowable.bufferSize() - 1); } @Test @@ -459,4 +462,255 @@ public Object apply(Object a, Object b) throws Exception { .test() .assertFailure(TestException.class); } + + @Test + public void neverSource() { + Flowable.never() + .scan(0, new BiFunction() { + @Override + public Integer apply(Integer a, Integer b) throws Exception { + return a + b; + } + }) + .test() + .assertValue(0) + .assertNoErrors() + .assertNotComplete(); + } + + @Test + public void testUnsubscribeScan() { + + FlowableEventStream.getEventStream("HTTP-ClusterB", 20) + .scan(new HashMap(), new BiFunction, Event, HashMap>() { + @Override + public HashMap apply(HashMap accum, Event perInstanceEvent) { + accum.put("instance", perInstanceEvent.instanceId); + return accum; + } + }) + .take(10) + .blockingForEach(new Consumer>() { + @Override + public void accept(HashMap v) { + System.out.println(v); + } + }); + } + + @Test + public void testScanWithSeedDoesNotEmitErrorTwiceIfScanFunctionThrows() { + final List list = new CopyOnWriteArrayList(); + Consumer errorConsumer = new Consumer() { + @Override + public void accept(Throwable t) throws Exception { + list.add(t); + }}; + try { + RxJavaPlugins.setErrorHandler(errorConsumer); + final RuntimeException e = new RuntimeException(); + final RuntimeException e2 = new RuntimeException(); + Burst.items(1).error(e2) + .scan(0, throwingBiFunction(e)) + .test() + .assertValues(0) + .assertError(e); + + assertEquals("" + list, 1, list.size()); + assertTrue("" + list, list.get(0) instanceof UndeliverableException); + assertEquals(e2, list.get(0).getCause()); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void testScanWithSeedDoesNotEmitTerminalEventTwiceIfScanFunctionThrows() { + final RuntimeException e = new RuntimeException(); + Burst.item(1).create() + .scan(0, throwingBiFunction(e)) + .test() + .assertValue(0) + .assertError(e); + } + + @Test + public void testScanWithSeedDoesNotProcessOnNextAfterTerminalEventIfScanFunctionThrows() { + final RuntimeException e = new RuntimeException(); + final AtomicInteger count = new AtomicInteger(); + Burst.items(1, 2).create().scan(0, new BiFunction() { + + @Override + public Integer apply(Integer n1, Integer n2) throws Exception { + count.incrementAndGet(); + throw e; + }}) + .test() + .assertValues(0) + .assertError(e); + assertEquals(1, count.get()); + } + + @Test + public void testScanWithSeedCompletesNormally() { + Flowable.just(1,2,3).scan(0, SUM) + .test() + .assertValues(0, 1, 3, 6) + .assertComplete(); + } + + @Test + public void testScanWithSeedWhenScanSeedProviderThrows() { + final RuntimeException e = new RuntimeException(); + Flowable.just(1,2,3).scanWith(throwingCallable(e), + SUM) + .test() + .assertError(e) + .assertNoValues(); + } + + @Test + public void testScanNoSeed() { + Flowable.just(1, 2, 3) + .scan(SUM) + .test() + .assertValues(1, 3, 6) + .assertComplete(); + } + + @Test + public void testScanNoSeedDoesNotEmitErrorTwiceIfScanFunctionThrows() { + final List list = new CopyOnWriteArrayList(); + Consumer errorConsumer = new Consumer() { + @Override + public void accept(Throwable t) throws Exception { + list.add(t); + }}; + try { + RxJavaPlugins.setErrorHandler(errorConsumer); + final RuntimeException e = new RuntimeException(); + final RuntimeException e2 = new RuntimeException(); + Burst.items(1, 2).error(e2) + .scan(throwingBiFunction(e)) + .test() + .assertValue(1) + .assertError(e); + + assertEquals("" + list, 1, list.size()); + assertTrue("" + list, list.get(0) instanceof UndeliverableException); + assertEquals(e2, list.get(0).getCause()); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void testScanNoSeedDoesNotEmitTerminalEventTwiceIfScanFunctionThrows() { + final RuntimeException e = new RuntimeException(); + Burst.items(1, 2).create() + .scan(throwingBiFunction(e)) + .test() + .assertValue(1) + .assertError(e); + } + + @Test + public void testScanNoSeedDoesNotProcessOnNextAfterTerminalEventIfScanFunctionThrows() { + final RuntimeException e = new RuntimeException(); + final AtomicInteger count = new AtomicInteger(); + Burst.items(1, 2, 3).create().scan(new BiFunction() { + + @Override + public Integer apply(Integer n1, Integer n2) throws Exception { + count.incrementAndGet(); + throw e; + }}) + .test() + .assertValue(1) + .assertError(e); + assertEquals(1, count.get()); + } + + private static BiFunction throwingBiFunction(final RuntimeException e) { + return new BiFunction() { + @Override + public Integer apply(Integer n1, Integer n2) throws Exception { + throw e; + } + }; + } + + private static final BiFunction SUM = new BiFunction() { + + @Override + public Integer apply(Integer t1, Integer t2) throws Exception { + return t1 + t2; + } + }; + + private static Callable throwingCallable(final RuntimeException e) { + return new Callable() { + @Override + public Integer call() throws Exception { + throw e; + } + }; + } + + @Test + public void scanEmptyBackpressured() { + Flowable.empty() + .scan(0, SUM) + .test(1) + .assertResult(0); + } + + @Test + public void scanErrorBackpressured() { + Flowable.error(new TestException()) + .scan(0, SUM) + .test(0) + .assertFailure(TestException.class); + } + + @Test + public void scanTake() { + TestSubscriber ts = new TestSubscriber() { + @Override + public void onNext(Integer t) { + super.onNext(t); + onComplete(); + cancel(); + } + }; + + Flowable.range(1, 10) + .scan(0, SUM) + .subscribe(ts) + ; + + ts.assertResult(0); + } + + @Test + public void scanLong() { + int n = 2 * Flowable.bufferSize(); + + for (int b = 1; b <= n; b *= 2) { + List list = Flowable.range(1, n) + .scan(0, new BiFunction() { + @Override + public Integer apply(Integer a, Integer b) throws Exception { + return b; + } + }) + .rebatchRequests(b) + .toList() + .blockingGet(); + + for (int i = 0; i <= n; i++) { + assertEquals(i, list.get(i).intValue()); + } + } + } }