From 598ec37cab2f3c5d0c63fa86b2c7bfd22b79886f Mon Sep 17 00:00:00 2001 From: derar Date: Wed, 20 May 2020 17:16:16 +0300 Subject: [PATCH 1/3] no upstream.cancel() in FlowablePublishMulticast when the sequence is finished normally via onComplete/onError from upstream; minor code cleanup - unnecessary Disposable implementation to avoid method name clash --- .../flowable/FlowablePublishMulticast.java | 17 +++-- .../flowable/FlowablePublishFunctionTest.java | 65 +++++++++++++++++++ 2 files changed, 73 insertions(+), 9 deletions(-) diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowablePublishMulticast.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowablePublishMulticast.java index 05e0e7a907..e579243826 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowablePublishMulticast.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowablePublishMulticast.java @@ -22,7 +22,6 @@ import org.reactivestreams.*; import io.reactivex.rxjava3.core.*; -import io.reactivex.rxjava3.disposables.Disposable; import io.reactivex.rxjava3.exceptions.*; import io.reactivex.rxjava3.functions.Function; import io.reactivex.rxjava3.internal.fuseable.*; @@ -124,7 +123,7 @@ public void cancel() { } } - static final class MulticastProcessor extends Flowable implements FlowableSubscriber, Disposable { + static final class MulticastProcessor extends Flowable implements FlowableSubscriber { @SuppressWarnings("rawtypes") static final MulticastSubscription[] EMPTY = new MulticastSubscription[0]; @@ -192,18 +191,18 @@ public void onSubscribe(Subscription s) { } } - @Override public void dispose() { - SubscriptionHelper.cancel(upstream); - if (wip.getAndIncrement() == 0) { - SimpleQueue q = queue; - if (q != null) { - q.clear(); + if (!done) { + SubscriptionHelper.cancel(upstream); + if (wip.getAndIncrement() == 0) { + SimpleQueue q = queue; + if (q != null) { + q.clear(); + } } } } - @Override public boolean isDisposed() { return upstream.get() == SubscriptionHelper.CANCELLED; } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowablePublishFunctionTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowablePublishFunctionTest.java index 69ed3a4aed..17fde63388 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowablePublishFunctionTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowablePublishFunctionTest.java @@ -19,6 +19,7 @@ import static org.junit.Assert.*; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.junit.*; @@ -538,4 +539,68 @@ public boolean test(Integer w) throws Exception { .test() .assertResult(1000000); } + + @Test + public void noUpstreamCancelOnCasualChainClose() { + AtomicBoolean parentUpstreamCancelled = new AtomicBoolean(false); + Flowable.range(1, 10) + .doOnCancel(new Action() { + @Override + public void run() throws Throwable { + parentUpstreamCancelled.set(true); + } + }) + .publish(new Function, Publisher>() { + @Override + public Publisher apply(Flowable v) throws Exception { + return v; + } + }) + .test() + .awaitDone(1, TimeUnit.SECONDS); + assertFalse("Unnecessary upstream .cancel() call in FlowablePublishMulticast", parentUpstreamCancelled.get()); + } + + @Test + public void noUpstreamCancelOnCasualChainCloseWithInnerCancels() { + AtomicBoolean parentUpstreamCancelled = new AtomicBoolean(false); + Flowable.range(1, 10) + .doOnCancel(new Action() { + @Override + public void run() throws Throwable { + parentUpstreamCancelled.set(true); + } + }) + .publish(new Function, Publisher>() { + @Override + public Publisher apply(Flowable v) throws Exception { + return Flowable.concat(v.take(1), v.skip(5)); + } + }) + .test() + .awaitDone(1, TimeUnit.SECONDS); + assertFalse("Unnecessary upstream .cancel() call in FlowablePublishMulticast", parentUpstreamCancelled.get()); + } + + @Test + public void upstreamCancelOnDownstreamCancel() { + AtomicBoolean parentUpstreamCancelled = new AtomicBoolean(false); + Flowable.range(1, 10) + .doOnCancel(new Action() { + @Override + public void run() throws Throwable { + parentUpstreamCancelled.set(true); + } + }) + .publish(new Function, Publisher>() { + @Override + public Publisher apply(Flowable v) throws Exception { + return v; + } + }) + .take(1) + .test() + .awaitDone(1, TimeUnit.SECONDS); + assertTrue("Upstream .cancel() not called in FlowablePublishMulticast", parentUpstreamCancelled.get()); + } } From da2399b560f0d8e399336c6178ba730e4773d0a6 Mon Sep 17 00:00:00 2001 From: derar Date: Wed, 20 May 2020 20:50:13 +0300 Subject: [PATCH 2/3] cleanup in FlowablePublishFunctionTest: refactored anonymous classes to lambdas --- .../flowable/FlowablePublishFunctionTest.java | 261 ++++-------------- 1 file changed, 54 insertions(+), 207 deletions(-) diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowablePublishFunctionTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowablePublishFunctionTest.java index 17fde63388..62998eb543 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowablePublishFunctionTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowablePublishFunctionTest.java @@ -27,7 +27,6 @@ import io.reactivex.rxjava3.core.*; import io.reactivex.rxjava3.exceptions.*; -import io.reactivex.rxjava3.functions.*; import io.reactivex.rxjava3.internal.functions.Functions; import io.reactivex.rxjava3.internal.subscriptions.BooleanSubscription; import io.reactivex.rxjava3.processors.PublishProcessor; @@ -40,12 +39,9 @@ public class FlowablePublishFunctionTest extends RxJavaTest { public void concatTakeFirstLastCompletes() { TestSubscriber ts = new TestSubscriber<>(); - Flowable.range(1, 3).publish(new Function, Flowable>() { - @Override - public Flowable apply(Flowable f) { - return Flowable.concat(f.take(5), f.takeLast(5)); - } - }).subscribe(ts); + Flowable.range(1, 3) + .publish(f -> Flowable.concat(f.take(5), f.takeLast(5))) + .subscribe(ts); ts.assertValues(1, 2, 3); ts.assertNoErrors(); @@ -56,12 +52,9 @@ public Flowable apply(Flowable f) { public void concatTakeFirstLastBackpressureCompletes() { TestSubscriber ts = TestSubscriber.create(0L); - Flowable.range(1, 6).publish(new Function, Flowable>() { - @Override - public Flowable apply(Flowable f) { - return Flowable.concat(f.take(5), f.takeLast(5)); - } - }).subscribe(ts); + Flowable.range(1, 6) + .publish(f -> Flowable.concat(f.take(5), f.takeLast(5))) + .subscribe(ts); ts.assertNoValues(); ts.assertNoErrors(); @@ -87,12 +80,7 @@ public void canBeCancelled() { PublishProcessor pp = PublishProcessor.create(); - pp.publish(new Function, Flowable>() { - @Override - public Flowable apply(Flowable f) { - return Flowable.concat(f.take(5), f.takeLast(5)); - } - }).subscribe(ts); + pp.publish(f -> Flowable.concat(f.take(5), f.takeLast(5))).subscribe(ts); pp.onNext(1); pp.onNext(2); @@ -109,8 +97,7 @@ public Flowable apply(Flowable f) { @Test public void invalidPrefetch() { try { - Flowable.never().publish( - Functions.>identity(), -99); + Flowable.never().publish(Functions.identity(), -99); fail("Didn't throw IllegalArgumentException"); } catch (IllegalArgumentException ex) { Assert.assertEquals("prefetch > 0 required but it was -99", ex.getMessage()); @@ -123,12 +110,7 @@ public void takeCompletes() { PublishProcessor pp = PublishProcessor.create(); - pp.publish(new Function, Flowable>() { - @Override - public Flowable apply(Flowable f) { - return f.take(1); - } - }).subscribe(ts); + pp.publish(f -> f.take(1)).subscribe(ts); pp.onNext(1); @@ -154,12 +136,7 @@ public void onStart() { PublishProcessor pp = PublishProcessor.create(); - pp.publish(new Function, Flowable>() { - @Override - public Flowable apply(Flowable f) { - return f.take(1); - } - }).subscribe(ts); + pp.publish(f -> f.take(1)).subscribe(ts); Assert.assertEquals(1, startCount.get()); } @@ -170,12 +147,7 @@ public void takeCompletesUnsafe() { PublishProcessor pp = PublishProcessor.create(); - pp.publish(new Function, Flowable>() { - @Override - public Flowable apply(Flowable f) { - return f.take(1); - } - }).subscribe(ts); + pp.publish(f -> f.take(1)).subscribe(ts); pp.onNext(1); @@ -192,12 +164,7 @@ public void directCompletesUnsafe() { PublishProcessor pp = PublishProcessor.create(); - pp.publish(new Function, Flowable>() { - @Override - public Flowable apply(Flowable f) { - return f; - } - }).subscribe(ts); + pp.publish(Functions.identity()).subscribe(ts); pp.onNext(1); pp.onComplete(); @@ -215,12 +182,7 @@ public void overflowMissingBackpressureException() { PublishProcessor pp = PublishProcessor.create(); - pp.publish(new Function, Flowable>() { - @Override - public Flowable apply(Flowable f) { - return f; - } - }).subscribe(ts); + pp.publish(Functions.identity()).subscribe(ts); for (int i = 0; i < Flowable.bufferSize() * 2; i++) { pp.onNext(i); @@ -241,12 +203,7 @@ public void overflowMissingBackpressureExceptionDelayed() { PublishProcessor pp = PublishProcessor.create(); - new FlowablePublishMulticast<>(pp, new Function, Flowable>() { - @Override - public Flowable apply(Flowable f) { - return f; - } - }, Flowable.bufferSize(), true).subscribe(ts); + new FlowablePublishMulticast<>(pp, Functions.identity(), Flowable.bufferSize(), true).subscribe(ts); for (int i = 0; i < Flowable.bufferSize() * 2; i++) { pp.onNext(i); @@ -265,22 +222,16 @@ public Flowable apply(Flowable f) { @Test public void emptyIdentityMapped() { Flowable.empty() - .publish(Functions.>identity()) + .publish(Functions.identity()) .test() - .assertResult() - ; + .assertResult(); } @Test public void independentlyMapped() { PublishProcessor pp = PublishProcessor.create(); - TestSubscriber ts = pp.publish(new Function, Publisher>() { - @Override - public Publisher apply(Flowable v) throws Exception { - return Flowable.range(1, 5); - } - }).test(0); + TestSubscriber ts = pp.publish(v -> Flowable.range(1, 5)).test(0); assertTrue("pp has no Subscribers?!", pp.hasSubscribers()); @@ -297,12 +248,7 @@ public Publisher apply(Flowable v) throws Exception { @Test public void badSource() { - TestHelper.checkBadSourceFlowable(new Function, Object>() { - @Override - public Object apply(Flowable f) throws Exception { - return f.publish(Functions.>identity()); - } - }, false, 1, 1, 1); + TestHelper.checkBadSourceFlowable(f -> f.publish(Functions.identity()), false, 1, 1, 1); } @Test @@ -316,7 +262,7 @@ protected void subscribeActual(Subscriber s) { } } } - .publish(Functions.>identity(), 8) + .publish(Functions.identity(), 8) .test(0) .assertFailure(MissingBackpressureException.class); } @@ -324,12 +270,7 @@ protected void subscribeActual(Subscriber s) { @Test public void errorResubscribe() { Flowable.error(new TestException()) - .publish(new Function, Publisher>() { - @Override - public Publisher apply(Flowable f) throws Exception { - return f.onErrorResumeWith(f); - } - }) + .publish(f -> f.onErrorResumeWith(f)) .test() .assertFailure(TestException.class); } @@ -337,21 +278,18 @@ public Publisher apply(Flowable f) throws Exception { @Test public void fusedInputCrash() { Flowable.just(1) - .map(new Function() { - @Override - public Integer apply(Integer v) throws Exception { - throw new TestException(); - } + .map(v -> { + throw new TestException(); }) - .publish(Functions.>identity()) + .publish(Functions.identity()) .test() .assertFailure(TestException.class); } @Test public void error() { - new FlowablePublishMulticast<>(Flowable.just(1).concatWith(Flowable.error(new TestException())), - Functions.>identity(), 16, true) + new FlowablePublishMulticast<>(Flowable.just(1).concatWith(Flowable.error(new TestException())), + Functions.identity(), 16, true) .test() .assertFailure(TestException.class, 1); } @@ -359,7 +297,7 @@ public void error() { @Test public void backpressuredEmpty() { Flowable.empty() - .publish(Functions.>identity()) + .publish(Functions.identity()) .test(0L) .assertResult(); } @@ -367,7 +305,7 @@ public void backpressuredEmpty() { @Test public void oneByOne() { Flowable.range(1, 10) - .publish(Functions.>identity()) + .publish(Functions.identity()) .rebatchRequests(1) .test() .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); @@ -388,7 +326,7 @@ public void onNext(Integer t) { } }; - pp.publish(Functions.>identity()).subscribe(ts); + pp.publish(Functions.identity()).subscribe(ts); pp.onNext(1); @@ -400,12 +338,7 @@ public void onNext(Integer t) { @Test public void inputOutputSubscribeRace() { Flowable source = Flowable.just(1) - .publish(new Function, Publisher>() { - @Override - public Publisher apply(Flowable f) throws Exception { - return f.subscribeOn(Schedulers.single()); - } - }); + .publish(f -> f.subscribeOn(Schedulers.single())); for (int i = 0; i < 500; i++) { source.test() @@ -417,7 +350,7 @@ public Publisher apply(Flowable f) throws Exception { @Test public void inputOutputSubscribeRace2() { Flowable source = Flowable.just(1).subscribeOn(Schedulers.single()) - .publish(Functions.>identity()); + .publish(Functions.identity()); for (int i = 0; i < 500; i++) { source.test() @@ -432,30 +365,19 @@ public void sourceSubscriptionDelayed() { final TestSubscriber ts1 = new TestSubscriber<>(0L); Flowable.just(1) - .publish(new Function, Publisher>() { - @Override - public Publisher apply(final Flowable f) throws Exception { - Runnable r1 = new Runnable() { - @Override - public void run() { - f.subscribe(ts1); - } - }; - - Runnable r2 = new Runnable() { - @Override - public void run() { + .publish(f -> { + Runnable r1 = () -> f.subscribe(ts1); + + Runnable r2 = () -> { for (int j = 0; j < 100; j++) { ts1.request(1); } - } - }; + }; - TestHelper.race(r1, r2); - return f; - } - }).test() - .assertResult(1); + TestHelper.race(r1, r2); + return f; + }).test() + .assertResult(1); ts1.assertResult(1); } @@ -464,24 +386,9 @@ public void run() { @Test public void longFlow() { Flowable.range(1, 1000000) - .publish(new Function, Publisher>() { - @Override - public Publisher apply(Flowable v) throws Exception { - return Flowable.mergeArray( - v.filter(new Predicate() { - @Override - public boolean test(Integer w) throws Exception { - return w % 2 == 0; - } - }), - v.filter(new Predicate() { - @Override - public boolean test(Integer w) throws Exception { - return w % 2 != 0; - } - })); - } - }) + .publish(v -> Flowable.mergeArray( + v.filter(w -> w % 2 == 0), + v.filter(w -> w % 2 != 0))) .takeLast(1) .test() .assertResult(1000000); @@ -490,24 +397,9 @@ public boolean test(Integer w) throws Exception { @Test public void longFlow2() { Flowable.range(1, 100000) - .publish(new Function, Publisher>() { - @Override - public Publisher apply(Flowable v) throws Exception { - return Flowable.mergeArray( - v.filter(new Predicate() { - @Override - public boolean test(Integer w) throws Exception { - return w % 2 == 0; - } - }), - v.filter(new Predicate() { - @Override - public boolean test(Integer w) throws Exception { - return w % 2 != 0; - } - })); - } - }) + .publish(v -> Flowable.mergeArray( + v.filter(w -> w % 2 == 0), + v.filter(w -> w % 2 != 0))) .test() .assertValueCount(100000) .assertNoErrors() @@ -517,24 +409,9 @@ public boolean test(Integer w) throws Exception { @Test public void longFlowHidden() { Flowable.range(1, 1000000).hide() - .publish(new Function, Publisher>() { - @Override - public Publisher apply(Flowable v) throws Exception { - return Flowable.mergeArray( - v.filter(new Predicate() { - @Override - public boolean test(Integer w) throws Exception { - return w % 2 == 0; - } - }), - v.filter(new Predicate() { - @Override - public boolean test(Integer w) throws Exception { - return w % 2 != 0; - } - })); - } - }) + .publish(v -> Flowable.mergeArray( + v.filter(w -> w % 2 == 0), + v.filter(w -> w % 2 != 0))) .takeLast(1) .test() .assertResult(1000000); @@ -544,18 +421,8 @@ public boolean test(Integer w) throws Exception { public void noUpstreamCancelOnCasualChainClose() { AtomicBoolean parentUpstreamCancelled = new AtomicBoolean(false); Flowable.range(1, 10) - .doOnCancel(new Action() { - @Override - public void run() throws Throwable { - parentUpstreamCancelled.set(true); - } - }) - .publish(new Function, Publisher>() { - @Override - public Publisher apply(Flowable v) throws Exception { - return v; - } - }) + .doOnCancel(() -> parentUpstreamCancelled.set(true)) + .publish(Functions.identity()) .test() .awaitDone(1, TimeUnit.SECONDS); assertFalse("Unnecessary upstream .cancel() call in FlowablePublishMulticast", parentUpstreamCancelled.get()); @@ -565,18 +432,8 @@ public Publisher apply(Flowable v) throws Exception { public void noUpstreamCancelOnCasualChainCloseWithInnerCancels() { AtomicBoolean parentUpstreamCancelled = new AtomicBoolean(false); Flowable.range(1, 10) - .doOnCancel(new Action() { - @Override - public void run() throws Throwable { - parentUpstreamCancelled.set(true); - } - }) - .publish(new Function, Publisher>() { - @Override - public Publisher apply(Flowable v) throws Exception { - return Flowable.concat(v.take(1), v.skip(5)); - } - }) + .doOnCancel(() -> parentUpstreamCancelled.set(true)) + .publish(v -> Flowable.concat(v.take(1), v.skip(5))) .test() .awaitDone(1, TimeUnit.SECONDS); assertFalse("Unnecessary upstream .cancel() call in FlowablePublishMulticast", parentUpstreamCancelled.get()); @@ -586,18 +443,8 @@ public Publisher apply(Flowable v) throws Exception { public void upstreamCancelOnDownstreamCancel() { AtomicBoolean parentUpstreamCancelled = new AtomicBoolean(false); Flowable.range(1, 10) - .doOnCancel(new Action() { - @Override - public void run() throws Throwable { - parentUpstreamCancelled.set(true); - } - }) - .publish(new Function, Publisher>() { - @Override - public Publisher apply(Flowable v) throws Exception { - return v; - } - }) + .doOnCancel(() -> parentUpstreamCancelled.set(true)) + .publish(Functions.identity()) .take(1) .test() .awaitDone(1, TimeUnit.SECONDS); From 39c83219be8e6f6d05fda06d99511d21d62fded3 Mon Sep 17 00:00:00 2001 From: derar Date: Wed, 20 May 2020 20:57:12 +0300 Subject: [PATCH 3/3] reduced visibility for dispose() and isDisposed() in the inner MulticastProcessor --- .../internal/operators/flowable/FlowablePublishMulticast.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowablePublishMulticast.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowablePublishMulticast.java index e579243826..26dbaa7f4b 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowablePublishMulticast.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowablePublishMulticast.java @@ -191,7 +191,7 @@ public void onSubscribe(Subscription s) { } } - public void dispose() { + void dispose() { if (!done) { SubscriptionHelper.cancel(upstream); if (wip.getAndIncrement() == 0) { @@ -203,7 +203,7 @@ public void dispose() { } } - public boolean isDisposed() { + boolean isDisposed() { return upstream.get() == SubscriptionHelper.CANCELLED; }