From edff27426ca6a155c5f07fd5b71460ddfb1ec228 Mon Sep 17 00:00:00 2001 From: Daniel Kec Date: Tue, 24 Nov 2020 09:21:30 +0100 Subject: [PATCH 1/3] Concat array enhancement Signed-off-by: Daniel Kec --- .../common/reactive/MultiConcatArray.java | 242 ++++++++++++++++-- 1 file changed, 216 insertions(+), 26 deletions(-) diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/MultiConcatArray.java b/common/reactive/src/main/java/io/helidon/common/reactive/MultiConcatArray.java index 0e3bc61ffe8..28881e8a18b 100644 --- a/common/reactive/src/main/java/io/helidon/common/reactive/MultiConcatArray.java +++ b/common/reactive/src/main/java/io/helidon/common/reactive/MultiConcatArray.java @@ -17,8 +17,9 @@ package io.helidon.common.reactive; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.VarHandle; import java.util.concurrent.Flow; -import java.util.concurrent.atomic.AtomicInteger; /** * Relay items in order from subsequent Flow.Publishers as a single Multi source. @@ -35,31 +36,89 @@ final class MultiConcatArray implements Multi { public void subscribe(Flow.Subscriber subscriber) { ConcatArraySubscriber parent = new ConcatArraySubscriber<>(subscriber, sources); subscriber.onSubscribe(parent); - parent.nextSource(); + parent.nextSource(parent.produced); } - static final class ConcatArraySubscriber extends SubscriptionArbiter - implements Flow.Subscriber { + protected static final class ConcatArraySubscriber + implements Flow.Subscriber, Flow.Subscription { private final Flow.Subscriber downstream; private final Flow.Publisher[] sources; - private final AtomicInteger wip; + private Flow.Subscription subscription; private int index; - private long produced; + private long produced = INIT; + + private volatile long requested = SEE_OTHER; + private volatile long pending = INIT; + private volatile Thread lastThreadCompleting; + private boolean redo; + + static final long BAD = Long.MIN_VALUE; + static final long CANCEL = Long.MIN_VALUE + 1; + static final long SEE_OTHER = Long.MIN_VALUE + 2; + static final long INIT = Long.MIN_VALUE + 3; + + static final VarHandle REQUESTED; + static final VarHandle PENDING; + static final VarHandle LASTTHREADCOMPLETING; + + static { + try { + MethodHandles.Lookup lookup = MethodHandles.lookup(); + REQUESTED = lookup.findVarHandle(ConcatArraySubscriber.class, "requested", long.class); + PENDING = lookup.findVarHandle(ConcatArraySubscriber.class, "pending", long.class); + LASTTHREADCOMPLETING = lookup + .findVarHandle(ConcatArraySubscriber.class, "lastThreadCompleting", Thread.class); + } catch (Exception e) { + throw new Error("Expected lookup to succeed", e); + } + } ConcatArraySubscriber(Flow.Subscriber downstream, Flow.Publisher[] sources) { this.downstream = downstream; this.sources = sources; - this.wip = new AtomicInteger(); } @Override public void onSubscribe(Flow.Subscription subscription) { - super.setSubscription(subscription); + long p0 = pending; + if (p0 == CANCEL) { + subscription.cancel(); + return; + } + + produced++; // assert: matching request(1) has been done by nextSource() + this.subscription = subscription; + // assert: requested == SEE_OTHER + REQUESTED.setOpaque(this, p0); // assert: p0 is guaranteed to be a value of requested never seen before + // or is a terminal value (when concurrent good requests do not matter) + long p = (long) PENDING.getAndSet(this, SEE_OTHER); + + if (p == CANCEL) { + cancel(); + return; + } + + if (p == produced) { + return; + } + + // assert: p > produced, unless p == BAD - there were request() between nextSource() + // and this onSubscribe(); invoke request() on their behalf + long req = unconsumed(p, produced); + if (req < 0) { + updateRequest(req); + } else if (p != p0) { + // assert: p != BAD, because req > 0 (because p > produced) + // assert: p0 != BAD, because pending cannot be updated to p > produced after p0 = BAD + // assert: requested is at least p0; add the remainder that got added to pending + updateRequest(p - p0); + } + subscription.request(req); } @Override @@ -70,37 +129,168 @@ public void onNext(T item) { @Override public void onError(Throwable throwable) { + REQUESTED.setOpaque(this, CANCEL); downstream.onError(throwable); } @Override public void onComplete() { - long produced = this.produced; - if (produced != 0L) { - this.produced = 0L; - super.produced(produced); + Thread current = Thread.currentThread(); + if (LASTTHREADCOMPLETING.getOpaque(this) == current) { + redo = true; + return; + } + + LASTTHREADCOMPLETING.setOpaque(this, current); + VarHandle.storeStoreFence(); + boolean sameThread; + boolean again; + do { + redo = false; + // assert: pending == SEE_OTHER + PENDING.setOpaque(this, produced); + long r = (long) REQUESTED.getAndSet(this, SEE_OTHER); + subscription = null; + + nextSource(r); + again = redo; + VarHandle.loadLoadFence(); + sameThread = LASTTHREADCOMPLETING.getOpaque(this) == current; + } while (again && sameThread); + + if (sameThread) { + LASTTHREADCOMPLETING.compareAndSet(this, current, null); } - nextSource(); } - public void nextSource() { - if (wip.getAndIncrement() == 0) { - do { - if (index == sources.length) { - downstream.onComplete(); - } else { - sources[index++].subscribe(this); - } - } while (wip.decrementAndGet() != 0); + protected void nextSource(long r) { + // assert: requested == SEE_OTHER + if (r == CANCEL) { + return; } + + if (index == sources.length) { + downstream.onComplete(); + return; + } + + Flow.Publisher nextPub = sources[index++]; + + // assert: r >= produced, unless r == BAD - because produced + // gets incremented only in response to a preceding request + r = unconsumed(r, produced - 1); // assert: same as unconsumed(r+1, produced) for + // r representing a request count (not a terminal state); one request for the future onSubscribe; + // for other values of r the value of produced is ignored; + + // assert: this will update pending + updateRequest(r); + // assert: requested is guaranteed to change between the subscriptions + // so request() concurrent with onSubscribe cannot + // miss the update of subscription - they will + // always see requested change + + nextPub.subscribe(this); + } + + protected static long unconsumed(long req, long produced) { + // assert: all invocations of unconsumed ensure req > produced, or + // req represents a final state, where produced does not matter - + // MAX_VALUE, BAD, CANCEL + + if (req >= INIT && req < Long.MAX_VALUE) { + if (produced < 0 && Long.MAX_VALUE + produced < req) { + req = Long.MAX_VALUE; + } else { + req -= produced; + } + + // assert: req > 0 + } + + return req; } @Override public void request(long n) { - if (n <= 0) { - downstream.onError(new IllegalArgumentException("Rule ยง3.9 violated: non-positive requests are forbidden")); - } else { - super.request(n); + Flow.Subscription sub = updateRequest(n <= 0 ? BAD : n); + if (sub != null) { + sub.request(n); + } + } + + private boolean updatePending(long n) { + long req; + long nextReq; + do { + req = pending; + if (req == CANCEL) { + return true; + } + + if (req == SEE_OTHER) { + return false; + } + nextReq = n < INIT ? n + // assert: n >= 0 + : Long.MAX_VALUE - n <= req ? Long.MAX_VALUE + : req + n; + } while (!PENDING.compareAndSet(this, req, nextReq)); + + return true; + } + + private Flow.Subscription updateRequest(long n) { + Flow.Subscription sub; + long req; + long nextReq; + do { + req = requested; + while (req < INIT) { + if (req != SEE_OTHER || updatePending(n)) { + return null; + } + req = requested; + } + + sub = subscription; + nextReq = n < INIT ? n + // assert: n >= 0 + : Long.MAX_VALUE - n <= req ? Long.MAX_VALUE + : req + n; + + } while (!REQUESTED.compareAndSet(this, req, nextReq)); + + if (nextReq < INIT) { + // assert: good requests should be delivered once and only once to ensure + // no double-accounting happens - so we only + // attempt delivering to subscription seen before updating requested, and + // mutual exclusion between accesses to subscription.request() from + // request(), nextSource() and onSubscribe() is enforced. + // When MAX_VALUE is reached, good requests do not need delivering: concurrent + // request() may attempt to deliver to an old subscription, as it will not be + // able to observe new subscriptions (new values of requested), but good requests + // do not need delivering + + // assert: cancellations and bad requests can be delivered more than once - no + // double accounting + // occurs, and only one onError will be delivered by upstream Publisher. For + // this reason can read subscription as it appears after updating requested - + // this may result in both onSubscribe() and concurrent request() to call + // subscription.request, but this is ok for a bad request + // What we do not want to happen, is for bad request to be delivered to an old + // subscription, the update of which concurrent request() cannot detect after + // requested reaches MAX_VALUE - so, should read subscription after updating + // requested + return subscription; + } + return sub; + } + + @Override + public void cancel() { + Flow.Subscription sub = updateRequest(CANCEL); + if (sub != null) { + sub.cancel(); } } } From 67f59cbce720744bf1057da1e9c734ae26e9a4d1 Mon Sep 17 00:00:00 2001 From: Daniel Kec Date: Wed, 6 Jan 2021 18:18:51 +0100 Subject: [PATCH 2/3] Assumes some Publishers are not fully conformant and may issue onNext/onComplete concurrently with onSubscribe in response to concurrent request(). Signed-off-by: Daniel Kec --- .../common/reactive/MultiConcatArray.java | 246 +++++++++++------- 1 file changed, 152 insertions(+), 94 deletions(-) diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/MultiConcatArray.java b/common/reactive/src/main/java/io/helidon/common/reactive/MultiConcatArray.java index 28881e8a18b..46e7ba01b39 100644 --- a/common/reactive/src/main/java/io/helidon/common/reactive/MultiConcatArray.java +++ b/common/reactive/src/main/java/io/helidon/common/reactive/MultiConcatArray.java @@ -40,7 +40,7 @@ public void subscribe(Flow.Subscriber subscriber) { } protected static final class ConcatArraySubscriber - implements Flow.Subscriber, Flow.Subscription { + implements Flow.Subscriber, Flow.Subscription { private final Flow.Subscriber downstream; @@ -53,14 +53,15 @@ protected static final class ConcatArraySubscriber private long produced = INIT; private volatile long requested = SEE_OTHER; - private volatile long pending = INIT; + private volatile long pending = 0; + private volatile long oldRequested = 0; private volatile Thread lastThreadCompleting; private boolean redo; - static final long BAD = Long.MIN_VALUE; - static final long CANCEL = Long.MIN_VALUE + 1; + static final long BAD = Long.MIN_VALUE; + static final long CANCEL = Long.MIN_VALUE + 1; static final long SEE_OTHER = Long.MIN_VALUE + 2; - static final long INIT = Long.MIN_VALUE + 3; + static final long INIT = Long.MIN_VALUE + 3; static final VarHandle REQUESTED; static final VarHandle PENDING; @@ -85,40 +86,51 @@ protected static final class ConcatArraySubscriber @Override public void onSubscribe(Flow.Subscription subscription) { + produced++; // assert: matching request(1) has been done by nextSource() + this.subscription = subscription; + long oldProduced = produced; + long oldR = oldRequested; + long p0 = pending; - if (p0 == CANCEL) { - subscription.cancel(); - return; + if (p0 < 0 && oldR != CANCEL) { + // not entirely necessary, since BAD and CANCEL must be observed only eventually, but + // the least surprising behaviour is: + // if pending is known to be BAD or CANCEL, make sure requested does not + // appear good even temporarily + oldR = p0; } - produced++; // assert: matching request(1) has been done by nextSource() - this.subscription = subscription; // assert: requested == SEE_OTHER - REQUESTED.setOpaque(this, p0); // assert: p0 is guaranteed to be a value of requested never seen before - // or is a terminal value (when concurrent good requests do not matter) - long p = (long) PENDING.getAndSet(this, SEE_OTHER); + requested = oldR; // assume non-conforming upstream Publisher may start delivering onNext or + // onComplete concurrently upon observing a concurrent request: only use + // values read before this assignment, or + // method-locals, or atomic updates competing with request() or cancel() - if (p == CANCEL) { - cancel(); - return; + if (oldR == CANCEL) { + subscription.cancel(); + return; } - if (p == produced) { - return; + if (oldR != oldProduced) { + long req = unconsumed(oldR, oldProduced); + // assert: req != CANCEL + subscription.request(req); // assert: requesting necessarily from this subscription; + // if a concurrent onComplete is fired by a non-conforming + // Publisher before this request, the request is no-op, and onComplete + // will carry over req to the next Publisher - no double accounting + // occurs; + // but if there is no concurrent onComplete, need to request + // from this subscription + // (plus trivial arithmetical proof based on commutativity of + // addition - produced may change concurrently, too, but only by + // no more than concurrent requests, and the req can be seen to be + // preserved) } - // assert: p > produced, unless p == BAD - there were request() between nextSource() - // and this onSubscribe(); invoke request() on their behalf - long req = unconsumed(p, produced); - if (req < 0) { - updateRequest(req); - } else if (p != p0) { - // assert: p != BAD, because req > 0 (because p > produced) - // assert: p0 != BAD, because pending cannot be updated to p > produced after p0 = BAD - // assert: requested is at least p0; add the remainder that got added to pending - updateRequest(p - p0); + long p = claimPending(); + if (p != 0) { // all concurrent onSubscribe and requests that observe requested >= INIT attempt this + updateRequest(p); } - subscription.request(req); } @Override @@ -146,20 +158,18 @@ public void onComplete() { boolean sameThread; boolean again; do { - redo = false; - // assert: pending == SEE_OTHER - PENDING.setOpaque(this, produced); - long r = (long) REQUESTED.getAndSet(this, SEE_OTHER); - subscription = null; - - nextSource(r); - again = redo; - VarHandle.loadLoadFence(); - sameThread = LASTTHREADCOMPLETING.getOpaque(this) == current; + redo = false; + long r = (long) REQUESTED.getAndSet(this, SEE_OTHER); + subscription = null; + + nextSource(r); + again = redo; + VarHandle.loadLoadFence(); + sameThread = LASTTHREADCOMPLETING.getOpaque(this) == current; } while (again && sameThread); if (sameThread) { - LASTTHREADCOMPLETING.compareAndSet(this, current, null); + LASTTHREADCOMPLETING.compareAndSet(this, current, null); } } @@ -170,24 +180,14 @@ protected void nextSource(long r) { } if (index == sources.length) { + // assert: no onSubscribe in the future, so no need to preserve oldRequested downstream.onComplete(); return; } Flow.Publisher nextPub = sources[index++]; - // assert: r >= produced, unless r == BAD - because produced - // gets incremented only in response to a preceding request - r = unconsumed(r, produced - 1); // assert: same as unconsumed(r+1, produced) for - // r representing a request count (not a terminal state); one request for the future onSubscribe; - // for other values of r the value of produced is ignored; - - // assert: this will update pending - updateRequest(r); - // assert: requested is guaranteed to change between the subscriptions - // so request() concurrent with onSubscribe cannot - // miss the update of subscription - they will - // always see requested change + oldRequested = r < INIT || r == Long.MAX_VALUE ? r : r + 1; nextPub.subscribe(this); } @@ -198,13 +198,15 @@ protected static long unconsumed(long req, long produced) { // MAX_VALUE, BAD, CANCEL if (req >= INIT && req < Long.MAX_VALUE) { - if (produced < 0 && Long.MAX_VALUE + produced < req) { - req = Long.MAX_VALUE; - } else { - req -= produced; - } + if (produced < 0 && Long.MAX_VALUE + produced < req) { + // assert: if produced < 0, then MAX_VALUE + produced does not overflow + req = Long.MAX_VALUE; + } else { + // assert: if produced >= 0, then req-produced does not overflow (req > produced) + req -= produced; + } - // assert: req > 0 + // assert: req > 0 } return req; @@ -212,52 +214,91 @@ protected static long unconsumed(long req, long produced) { @Override public void request(long n) { - Flow.Subscription sub = updateRequest(n <= 0 ? BAD : n); - if (sub != null) { - sub.request(n); - } + updateRequest(n <= 0 ? BAD : n); } - private boolean updatePending(long n) { + /* + * If requested is in a state where update is possible, and there is anything accumulated in + * the pending counter, try to claim it. If the caller observes a non-zero return value, they + * must updateRequest with that value. + */ + private long claimPending() { + long p; + do { + p = pending; + if (p == 0) { + return 0; + + } + + long r = requested; + if (r < INIT && !(r == BAD && p == CANCEL)) { + // assert: updating requested is needed: + // + // BAD | if p == CANCEL + // CANCEL | no + // SEE_OTHER | no + // >= INIT | if p != 0 + return 0; + } + } while (!PENDING.compareAndSet(this, p, p < 0 ? p : 0)); + + return p; + } + + private long updatePending(long n) { long req; long nextReq; do { req = pending; - if (req == CANCEL) { - return true; + if (req < 0 && !(req == BAD && n == CANCEL)) { + // assert: updating pending is needed: + // + // BAD | if n == CANCEL + // CANCEL | no + // >= 0 | yes + break; } - if (req == SEE_OTHER) { - return false; - } - nextReq = n < INIT ? n + nextReq = n < 0 ? n // assert: n >= 0 - : Long.MAX_VALUE - n <= req ? Long.MAX_VALUE - : req + n; + : Long.MAX_VALUE - n <= req ? Long.MAX_VALUE : req + n; } while (!PENDING.compareAndSet(this, req, nextReq)); - return true; + return claimPending(); } - private Flow.Subscription updateRequest(long n) { + private void updateRequest(long n) { Flow.Subscription sub; long req; long nextReq; do { - req = requested; - while (req < INIT) { - if (req != SEE_OTHER || updatePending(n)) { - return null; - } - req = requested; - } - - sub = subscription; + req = requested; + while (req < INIT && !(req == BAD && n == CANCEL)) { + // assert: updating requested is needed: + // + // BAD | if n == CANCEL + // CANCEL | no - terminal state + // SEE_OTHER | no - keep track of n in pending + // >= INIT | yes + + if (req != SEE_OTHER) { + return; + } + n = updatePending(n); + if (n == 0) { // assert: requested is in a terminal state, or there is a claimPending() + // now or in the future that will propagate pending to requested and + // the actual Publisher + return; + } + + req = requested; + } + + sub = subscription; nextReq = n < INIT ? n // assert: n >= 0 - : Long.MAX_VALUE - n <= req ? Long.MAX_VALUE - : req + n; - + : Long.MAX_VALUE - n <= req ? Long.MAX_VALUE : req + n; } while (!REQUESTED.compareAndSet(this, req, nextReq)); if (nextReq < INIT) { @@ -265,11 +306,12 @@ private Flow.Subscription updateRequest(long n) { // no double-accounting happens - so we only // attempt delivering to subscription seen before updating requested, and // mutual exclusion between accesses to subscription.request() from - // request(), nextSource() and onSubscribe() is enforced. + // request() and onSubscribe() is enforced. // When MAX_VALUE is reached, good requests do not need delivering: concurrent - // request() may attempt to deliver to an old subscription, as it will not be + // request() may miss an update to subscription, and attempt to deliver to an + // old subscription, as it will not be // able to observe new subscriptions (new values of requested), but good requests - // do not need delivering + // do not need delivering in this case // assert: cancellations and bad requests can be delivered more than once - no // double accounting @@ -281,17 +323,33 @@ private Flow.Subscription updateRequest(long n) { // subscription, the update of which concurrent request() cannot detect after // requested reaches MAX_VALUE - so, should read subscription after updating // requested - return subscription; + sub = subscription; + + // assert: subscription may be null, if requested was updated before it was set + // to SEE_OTHER by onComplete, but before subscription is set again by + // onSubscribe; consequently, if it is null, then there is onSubscribe in the + // future that will observe the update of requested and signal appropriately + if (sub != null) { + if (nextReq == CANCEL) { + sub.cancel(); + } else { + sub.request(BAD); + } + } + return; + } + + // assert: nextReq == req, if req == MAX_VALUE - nothing needs doing + if (nextReq != req) { + // assert: sub is not null, because when req != MAX_VALUE the change of subscription + // cannot be missed + sub.request(nextReq - req); } - return sub; } @Override public void cancel() { - Flow.Subscription sub = updateRequest(CANCEL); - if (sub != null) { - sub.cancel(); - } + updateRequest(CANCEL); } } } From 4b36aeacac8d97355b89b67016cfae4ced655846 Mon Sep 17 00:00:00 2001 From: Daniel Kec Date: Wed, 6 Jan 2021 21:17:55 +0100 Subject: [PATCH 3/3] Copyright Signed-off-by: Daniel Kec --- .../main/java/io/helidon/common/reactive/MultiConcatArray.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/MultiConcatArray.java b/common/reactive/src/main/java/io/helidon/common/reactive/MultiConcatArray.java index 46e7ba01b39..eec852e3f97 100644 --- a/common/reactive/src/main/java/io/helidon/common/reactive/MultiConcatArray.java +++ b/common/reactive/src/main/java/io/helidon/common/reactive/MultiConcatArray.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2021 Oracle and/or its affiliates. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License.