From d9588ffd6f5f0626eaa1276b8cfa3c604a7d75cc Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 23 Apr 2015 08:33:32 +0200 Subject: [PATCH] OperatorPublish full rewrite with comments. --- .../internal/operators/OperatorPublish.java | 945 ++++++++++++------ .../operators/OperatorPublishTest.java | 106 ++ 2 files changed, 763 insertions(+), 288 deletions(-) diff --git a/src/main/java/rx/internal/operators/OperatorPublish.java b/src/main/java/rx/internal/operators/OperatorPublish.java index 193bdba6c4..0fca3d6c64 100644 --- a/src/main/java/rx/internal/operators/OperatorPublish.java +++ b/src/main/java/rx/internal/operators/OperatorPublish.java @@ -15,367 +15,736 @@ */ package rx.internal.operators; -import java.util.*; +import java.util.Queue; import java.util.concurrent.atomic.*; import rx.*; -import rx.Observable; -import rx.exceptions.*; +import rx.exceptions.MissingBackpressureException; import rx.functions.*; -import rx.internal.util.RxRingBuffer; +import rx.internal.util.*; +import rx.internal.util.unsafe.*; import rx.observables.ConnectableObservable; import rx.subscriptions.Subscriptions; -public class OperatorPublish extends ConnectableObservable { +/** + * A connectable observable which shares an underlying source and dispatches source values to subscribers in a backpressure-aware + * manner. + * @param the value type + */ +public final class OperatorPublish extends ConnectableObservable { + /** The source observable. */ final Observable source; - private final RequestHandler requestHandler; + /** Holds the current subscriber that is, will be or just was subscribed to the source observable. */ + final AtomicReference> current; + /** + * Creates a OperatorPublish instance to publish values of the given source observable. + * @param source the source observable + * @return the connectable observable + */ public static ConnectableObservable create(Observable source) { - return new OperatorPublish(source); + // the current connection to source needs to be shared between the operator and its onSubscribe call + final AtomicReference> curr = new AtomicReference>(); + OnSubscribe onSubscribe = new OnSubscribe() { + @Override + public void call(Subscriber child) { + // concurrent connection/disconnection may change the state, + // we loop to be atomic while the child subscribes + for (;;) { + // get the current subscriber-to-source + PublishSubscriber r = curr.get(); + // if there isn't one or it is unsubscribed + if (r == null || r.isUnsubscribed()) { + // create a new subscriber to source + PublishSubscriber u = new PublishSubscriber(curr); + // perform extra initialization to avoid 'this' to escape during construction + u.init(); + // let's try setting it as the current subscriber-to-source + if (!curr.compareAndSet(r, u)) { + // didn't work, maybe someone else did it or the current subscriber + // to source has just finished + continue; + } + // we won, let's use it going onwards + r = u; + } + + // create the backpressure-managing producer for this child + InnerProducer inner = new InnerProducer(r, child); + /* + * Try adding it to the current subscriber-to-source, add is atomic in respect + * to other adds and the termination of the subscriber-to-source. + */ + if (!r.add(inner)) { + /* + * The current PublishSubscriber has been terminated, try with a newer one. + */ + continue; + /* + * Note: although technically corrent, concurrent disconnects can cause + * unexpected behavior such as child subscribers never receiving anything + * (unless connected again). An alternative approach, similar to + * PublishSubject would be to immediately terminate such child + * subscribers as well: + * + * Object term = r.terminalEvent; + * if (r.nl.isCompleted(term)) { + * child.onCompleted(); + * } else { + * child.onError(r.nl.getError(term)); + * } + * return; + * + * The original concurrent behavior was non-deterministic in this regard as well. + * Allowing this behavior, however, may introduce another unexpected behavior: + * after disconnecting a previous connection, one might not be able to prepare + * a new connection right after a previous termination by subscribing new child + * subscribers asynchronously before a connect call. + */ + } + // the producer has been registered with the current subscriber-to-source so + // at least it will receive the next terminal event + child.add(inner); + // setting the producer will trigger the first request to be considered by + // the subscriber-to-source. + child.setProducer(inner); + break; + } + } + }; + return new OperatorPublish(onSubscribe, source, curr); } - public static Observable create(final Observable source, final Func1, ? extends Observable> selector) { - return Observable.create(new OnSubscribe() { - + public static Observable create(final Observable source, + final Func1, ? extends Observable> selector) { + return create(new OnSubscribe() { @Override public void call(final Subscriber child) { - OperatorPublish op = new OperatorPublish(source); + ConnectableObservable op = create(source); + selector.call(op).unsafeSubscribe(child); + op.connect(new Action1() { - @Override - public void call(Subscription sub) { - child.add(sub); + public void call(Subscription t1) { + child.add(t1); } - }); } - }); } - private OperatorPublish(Observable source) { - this(source, new Object(), new RequestHandler()); - } - - private OperatorPublish(Observable source, final Object guard, final RequestHandler requestHandler) { - super(new OnSubscribe() { - @Override - public void call(final Subscriber subscriber) { - subscriber.setProducer(new Producer() { - - @Override - public void request(long n) { - requestHandler.requestFromChildSubscriber(subscriber, n); - } - - }); - subscriber.add(Subscriptions.create(new Action0() { - - @Override - public void call() { - requestHandler.state.removeSubscriber(subscriber); - } - - })); - } - }); + private OperatorPublish(OnSubscribe onSubscribe, Observable source, + final AtomicReference> current) { + super(onSubscribe); this.source = source; - this.requestHandler = requestHandler; + this.current = current; } @Override public void connect(Action1 connection) { - // each time we connect we create a new Subscription - boolean shouldSubscribe = false; + boolean doConnect = false; + PublishSubscriber ps; + // we loop because concurrent connect/disconnect and termination may change the state + for (;;) { + // retrieve the current subscriber-to-source instance + ps = current.get(); + // if there is none yet or the current has unsubscribed + if (ps == null || ps.isUnsubscribed()) { + // create a new subscriber-to-source + PublishSubscriber u = new PublishSubscriber(current); + // initialize out the constructor to avoid 'this' to escape + u.init(); + // try setting it as the current subscriber-to-source + if (!current.compareAndSet(ps, u)) { + // did not work, perhaps a new subscriber arrived + // and created a new subscriber-to-source as well, retry + continue; + } + ps = u; + } + // if connect() was called concurrently, only one of them should actually + // connect to the source + doConnect = !ps.shouldConnect.get() && ps.shouldConnect.compareAndSet(false, true); + break; + } + /* + * Notify the callback that we have a (new) connection which it can unsubscribe + * but since ps is unique to a connection, multiple calls to connect() will return the + * same Subscription and even if there was a connect-disconnect-connect pair, the older + * references won't disconnect the newer connection. + * Synchronous source consumers have the opportunity to disconnect via unsubscribe on the + * Subscription as unsafeSubscribe may never return in its own. + * + * Note however, that asynchronously disconnecting a running source might leave + * child-subscribers without any terminal event; PublishSubject does not have this + * issue because the unsubscription was always triggered by the child-subscribers + * themselves. + */ + connection.call(ps); + if (doConnect) { + source.unsafeSubscribe(ps); + } + } + + @SuppressWarnings("rawtypes") + static final class PublishSubscriber extends Subscriber implements Subscription { + /** Holds notifications from upstream. */ + final Queue queue; + /** The notification-lite factory. */ + final NotificationLite nl; + /** Holds onto the current connected PublishSubscriber. */ + final AtomicReference> current; + /** Contains either an onCompleted or an onError token from upstream. */ + volatile Object terminalEvent; + + /** Indicates an empty array of inner producers. */ + static final InnerProducer[] EMPTY = new InnerProducer[0]; + /** Indicates a terminated PublishSubscriber. */ + static final InnerProducer[] TERMINATED = new InnerProducer[0]; + + /** Tracks the subscribed producers. */ + final AtomicReference producers; + /** + * Atomically changed from false to true by connect to make sure the + * connection is only performed by one thread. + */ + final AtomicBoolean shouldConnect; + + /** Guarded by this. */ + boolean emitting; + /** Guarded by this. */ + boolean missed; - // subscription is the state of whether we are connected or not - OriginSubscriber origin = requestHandler.state.getOrigin(); - if (origin == null) { - shouldSubscribe = true; - requestHandler.state.setOrigin(new OriginSubscriber(requestHandler)); + public PublishSubscriber(AtomicReference> current) { + this.queue = UnsafeAccess.isUnsafeAvailable() + ? new SpscArrayQueue(RxRingBuffer.SIZE) + : new SynchronizedQueue(RxRingBuffer.SIZE); + + this.nl = NotificationLite.instance(); + this.producers = new AtomicReference(EMPTY); + this.current = current; + this.shouldConnect = new AtomicBoolean(); } - - // in the lock above we determined we should subscribe, do it now outside the lock - if (shouldSubscribe) { - // register a subscription that will shut this down - connection.call(Subscriptions.create(new Action0() { + + /** Should be called after the constructor finished to setup nulling-out the current reference. */ + void init() { + add(Subscriptions.create(new Action0() { @Override public void call() { - OriginSubscriber s = requestHandler.state.getOrigin(); - requestHandler.state.setOrigin(null); - if (s != null) { - s.unsubscribe(); - } + PublishSubscriber.this.producers.getAndSet(TERMINATED); + current.compareAndSet(PublishSubscriber.this, null); + // we don't care if it fails because it means the current has + // been replaced in the meantime } })); - - // now that everything is hooked up let's subscribe - // as long as the subscription is not null (which can happen if already unsubscribed) - OriginSubscriber os = requestHandler.state.getOrigin(); - if (os != null) { - source.unsafeSubscribe(os); - } } - } - - private static class OriginSubscriber extends Subscriber { - - private final RequestHandler requestHandler; - private final AtomicLong originOutstanding = new AtomicLong(); - private final long THRESHOLD = RxRingBuffer.SIZE / 4; - private final RxRingBuffer buffer = RxRingBuffer.getSpmcInstance(); - - OriginSubscriber(RequestHandler requestHandler) { - this.requestHandler = requestHandler; - add(buffer); - } - + @Override public void onStart() { - requestMore(RxRingBuffer.SIZE); + // since subscribers may have different amount of requests, we try to + // optimize by buffering values up-front and replaying it on individual demand + request(RxRingBuffer.SIZE); } - - private void requestMore(long r) { - originOutstanding.addAndGet(r); - request(r); - } - @Override - public void onCompleted() { - try { - requestHandler.emit(requestHandler.notifier.completed()); - } catch (MissingBackpressureException e) { - onError(e); + public void onNext(T t) { + // we expect upstream to honor backpressure requests + // nl is required because JCTools queue doesn't accept nulls. + if (!queue.offer(nl.next(t))) { + onError(new MissingBackpressureException()); + } else { + // since many things can happen concurrently, we have a common dispatch + // loop to act on the current state serially + dispatch(); } } - @Override public void onError(Throwable e) { - List errors = null; - for (Subscriber subscriber : requestHandler.state.getSubscribers()) { - try { - subscriber.onError(e); - } catch (Throwable e2) { - if (errors == null) { - errors = new ArrayList(); - } - errors.add(e2); - } + // The observer front is accessed serially as required by spec so + // no need to CAS in the terminal value + if (terminalEvent == null) { + terminalEvent = nl.error(e); + // since many things can happen concurrently, we have a common dispatch + // loop to act on the current state serially + dispatch(); } - Exceptions.throwIfAny(errors); } - @Override - public void onNext(T t) { - try { - requestHandler.emit(requestHandler.notifier.next(t)); - } catch (MissingBackpressureException e) { - onError(e); + public void onCompleted() { + // The observer front is accessed serially as required by spec so + // no need to CAS in the terminal value + if (terminalEvent == null) { + terminalEvent = nl.completed(); + // since many things can happen concurrently, we have a common dispatch loop + // to act on the current state serially + dispatch(); } } - - } - - /** - * Synchronized mutable state. - * - * benjchristensen => I have not figured out a non-blocking approach to this that doesn't involve massive object allocation overhead - * with a complicated state machine so I'm sticking with mutex locks and just trying to make sure the work done while holding the - * lock is small (such as never emitting data). - * - * This does however mean we can't rely on a reference to State being consistent. For example, it can end up with a null OriginSubscriber. - * - * @param - */ - private static class State { - private long outstandingRequests = -1; - private OriginSubscriber origin; - // using AtomicLong to simplify mutating it, not for thread-safety since we're synchronizing access to this class - // using LinkedHashMap so the order of Subscribers having onNext invoked is deterministic (same each time the code is run) - private final Map, AtomicLong> ss = new LinkedHashMap, AtomicLong>(); - @SuppressWarnings("unchecked") - private Subscriber[] subscribers = new Subscriber[0]; - - public synchronized OriginSubscriber getOrigin() { - return origin; - } - - public synchronized void setOrigin(OriginSubscriber o) { - this.origin = o; - } - - public synchronized boolean canEmitWithDecrement() { - if (outstandingRequests > 0) { - outstandingRequests--; - return true; + + /** + * Atomically try adding a new InnerProducer to this Subscriber or return false if this + * Subscriber was terminated. + * @param producer the producer to add + * @return true if succeeded, false otherwise + */ + boolean add(InnerProducer producer) { + if (producer == null) { + throw new NullPointerException(); + } + // the state can change so we do a CAS loop to achieve atomicity + for (;;) { + // get the current producer array + InnerProducer[] c = producers.get(); + // if this subscriber-to-source reached a terminal state by receiving + // an onError or onCompleted, just refuse to add the new producer + if (c == TERMINATED) { + return false; + } + // we perform a copy-on-write logic + int len = c.length; + InnerProducer[] u = new InnerProducer[len + 1]; + System.arraycopy(c, 0, u, 0, len); + u[len] = producer; + // try setting the producers array + if (producers.compareAndSet(c, u)) { + return true; + } + // if failed, some other operation succeded (another add, remove or termination) + // so retry } - return false; - } - - public synchronized boolean hasNoSubscriber() { - return subscribers.length == 0; - } - - public synchronized void incrementOutstandingAfterFailedEmit() { - outstandingRequests++; - } - - public synchronized Subscriber[] getSubscribers() { - return subscribers; } - + /** - * @return long outstandingRequests + * Atomically removes the given producer from the producers array. + * @param producer the producer to remove */ - public synchronized long requestFromSubscriber(Subscriber subscriber, long request) { - Map, AtomicLong> subs = ss; - AtomicLong r = subs.get(subscriber); - if (r == null) { - subs.put(subscriber, new AtomicLong(request)); - } else { - do { - long current = r.get(); - if (current == Long.MAX_VALUE) { + void remove(InnerProducer producer) { + // the state can change so we do a CAS loop to achieve atomicity + for (;;) { + // let's read the current producers array + InnerProducer[] c = producers.get(); + // if it is either empty or terminated, there is nothing to remove so we quit + if (c == EMPTY || c == TERMINATED) { + return; + } + // let's find the supplied producer in the array + // although this is O(n), we don't expect too many child subscribers in general + int j = -1; + int len = c.length; + for (int i = 0; i < len; i++) { + if (c[i].equals(producer)) { + j = i; break; } - long u = current + request; - if (u < 0) { - u = Long.MAX_VALUE; + } + // we didn't find it so just quit + if (j < 0) { + return; + } + // we do copy-on-write logic here + InnerProducer[] u; + // we don't create a new empty array if producer was the single inhabitant + // but rather reuse an empty array + if (len == 1) { + u = EMPTY; + } else { + // otherwise, create a new array one less in size + u = new InnerProducer[len - 1]; + // copy elements being before the given producer + System.arraycopy(c, 0, u, 0, j); + // copy elements being after the given producer + System.arraycopy(c, j + 1, u, j, len - j - 1); + } + // try setting this new array as + if (producers.compareAndSet(c, u)) { + return; + } + // if we failed, it means something else happened + // (a concurrent add/remove or termination), we need to retry + } + } + + /** + * Perform termination actions in case the source has terminated in some way and + * the queue has also become empty. + * @param term the terminal event (a NotificationLite.error or completed) + * @param empty set to true if the queue is empty + * @return true if there is indeed a terminal condition + */ + boolean checkTerminated(Object term, boolean empty) { + // first of all, check if there is actually a terminal event + if (term != null) { + // is it a completion event (impl. note, this is much cheaper than checking for isError) + if (nl.isCompleted(term)) { + // but we also need to have an empty queue + if (empty) { + // this will prevent OnSubscribe spinning on a terminated but + // not yet unsubscribed PublishSubscriber + current.compareAndSet(this, null); + try { + /* + * This will swap in a terminated array so add() in OnSubscribe will reject + * child subscribers to associate themselves with a terminated and thus + * never again emitting chain. + * + * Since we atomically change the contents of 'producers' only one + * operation wins at a time. If an add() wins before this getAndSet, + * its value will be part of the returned array by getAndSet and thus + * will receive the terminal notification. Otherwise, if getAndSet wins, + * add() will refuse to add the child producer and will trigger the + * creation of subscriber-to-source. + */ + for (InnerProducer ip : producers.getAndSet(TERMINATED)) { + ip.child.onCompleted(); + } + } finally { + // we explicitely unsubscribe/disconnect from the upstream + // after we sent out the terminal event to child subscribers + unsubscribe(); + } + // indicate we reached the terminal state + return true; } - if (r.compareAndSet(current, u)) { - break; + } else { + Throwable t = nl.getError(term); + // this will prevent OnSubscribe spinning on a terminated + // but not yet unsubscribed PublishSubscriber + current.compareAndSet(this, null); + try { + // this will swap in a terminated array so add() in OnSubscribe will reject + // child subscribers to associate themselves with a terminated and thus + // never again emitting chain + for (InnerProducer ip : producers.getAndSet(TERMINATED)) { + ip.child.onError(t); + } + } finally { + // we explicitely unsubscribe/disconnect from the upstream + // after we sent out the terminal event to child subscribers + unsubscribe(); } - } while (true); + // indicate we reached the terminal state + return true; + } } - - return resetAfterSubscriberUpdate(subs); - } - - public synchronized void removeSubscriber(Subscriber subscriber) { - Map, AtomicLong> subs = ss; - subs.remove(subscriber); - resetAfterSubscriberUpdate(subs); + // there is still work to be done + return false; } - - @SuppressWarnings("unchecked") - private long resetAfterSubscriberUpdate(Map, AtomicLong> subs) { - Subscriber[] subscriberArray = new Subscriber[subs.size()]; - int i = 0; - long lowest = -1; - for (Map.Entry, AtomicLong> e : subs.entrySet()) { - subscriberArray[i++] = e.getKey(); - AtomicLong l = e.getValue(); - long c = l.get(); - if (lowest == -1 || c < lowest) { - lowest = c; + + /** + * The common serialization point of events arriving from upstream and child-subscribers + * requesting more. + */ + void dispatch() { + // standard construct of emitter loop (blocking) + // if there is an emission going on, indicate that more work needs to be done + // the exact nature of this work needs to be determined from other data structures + synchronized (this) { + if (emitting) { + missed = true; + return; } + // there was no emission going on, we won and will start emitting + emitting = true; + missed = false; } - this.subscribers = subscriberArray; /* - * when receiving a request from a subscriber we reset 'outstanding' to the lowest of all subscribers + * In case an exception is thrown in the loop, we need to set emitting back to false + * on the way out (the exception will propagate up) so if it bounces back and + * onError is called, its dispatch() call will have the opportunity to emit it. + * However, if we want to exit regularly, we will set the emitting to false (+ other operations) + * atomically so we want to prevent the finally part to accidentally unlock some other + * emissions happening between the two synchronized blocks. */ - outstandingRequests = lowest; - return lowest; + boolean skipFinal = false; + try { + for (;;) { + /* + * See if the queue is empty; since we need this information multiple + * times later on, we read it one. + * Although the queue can become non-empty in the mean time, we will + * detect it through the missing flag and will do another iteration. + */ + boolean empty = queue.isEmpty(); + // if the queue is empty and the terminal event was received, quit + // and don't bother restoring emitting to false: no further activity is + // possible at this point + if (checkTerminated(terminalEvent, empty)) { + skipFinal = true; + return; + } + + // We have elements queued. Note that due to the serialization nature of dispatch() + // this loop is the only one which can turn a non-empty queue into an empty one + // and as such, no need to ask the queue itself again for that. + if (!empty) { + // We take a snapshot of the current child-subscribers. + // Concurrent subscribers may miss this iteration, but it is to be expected + @SuppressWarnings("unchecked") + InnerProducer[] ps = producers.get(); + + int len = ps.length; + // Let's assume everyone requested the maximum value. + long maxRequested = Long.MAX_VALUE; + // count how many have triggered unsubscription + int unsubscribed = 0; + + // Now find the minimum amount each child-subscriber requested + // since we can only emit that much to all of them without violating + // backpressure constraints + for (InnerProducer ip : ps) { + long r = ip.get(); + // if there is one child subscriber that hasn't requested yet + // we can't emit anything to anyone + if (r >= 0L) { + maxRequested = Math.min(maxRequested, r); + } else + // unsubscription is indicated by a special value + if (r == InnerProducer.UNSUBSCRIBED) { + unsubscribed++; + } + // we ignore those with NOT_REQUESTED as if they aren't even there + } + + // it may happen everyone has unsubscribed between here and producers.get() + // or we have no subscribers at all to begin with + if (len == unsubscribed) { + // so let's consume a value from the queue + Object v = queue.poll(); + // or terminate if there was a terminal event and the queue is empty + if (checkTerminated(terminalEvent, v == null)) { + skipFinal = true; + return; + } + // otherwise, just ask for a new value + request(1); + // and retry emitting to potential new child-subscribers + continue; + } + // if we get here, it means there are non-unsubscribed child-subscribers + // and we count the number of emitted values because the queue + // may contain less than requested + int d = 0; + while (d < maxRequested) { + Object v = queue.poll(); + empty = v == null; + // let's check if there is a terminal event and the queue became empty just now + if (checkTerminated(terminalEvent, empty)) { + skipFinal = true; + return; + } + // the queue is empty but we aren't terminated yet, finish this emission loop + if (empty) { + break; + } + // we need to unwrap potential nulls + T value = nl.getValue(v); + // let's emit this value to all child subscribers + for (InnerProducer ip : ps) { + // if ip.get() is negative, the child has either unsubscribed in the + // meantime or hasn't requested anything yet + // this eager behavior will skip unsubscribed children in case + // multiple values are available in the queue + if (ip.get() > 0L) { + try { + ip.child.onNext(value); + } catch (Throwable t) { + // we bounce back exceptions and kick out the child subscriber + ip.unsubscribe(); + ip.child.onError(t); + continue; + } + // indicate this child has received 1 element + ip.produced(1); + } + } + // indicate we emitted one element + d++; + } + + // if we did emit at least one element, request more to replenish the queue + if (d > 0) { + request(d); + } + // if we have requests but not an empty queue after emission + // let's try again to see if more requests/child-subscribers are + // ready to receive more + if (maxRequested != 0L && !empty) { + continue; + } + } + + // we did what we could: either the queue is empty or child subscribers + // haven't requested more (or both), let's try to finish dispatching + synchronized (this) { + // since missed is changed atomically, if we see it as true + // it means some state has changed and we need to loop again + // and handle that case + if (!missed) { + // but if no missed dispatch happened, let's stop emitting + emitting = false; + // and skip the emitting = false in the finally block as well + skipFinal = true; + return; + } + // we acknowledge the missed changes so far + missed = false; + } + } + } finally { + // unless returned cleanly (i.e., some method above threw) + if (!skipFinal) { + // we stop emitting so the error can propagate back down through onError + synchronized (this) { + emitting = false; + } + } + } } } - - private static class RequestHandler { - private final NotificationLite notifier = NotificationLite.instance(); + /** + * A Producer and Subscription that manages the request and unsubscription state of a + * child subscriber in thread-safe manner. + * We use AtomicLong as a base class to save on extra allocation of an AtomicLong and also + * save the overhead of the AtomicIntegerFieldUpdater. + * @param the value type + */ + static final class InnerProducer extends AtomicLong implements Producer, Subscription { + /** */ + private static final long serialVersionUID = -4453897557930727610L; + /** + * The parent subscriber-to-source used to allow removing the child in case of + * child unsubscription. + */ + final PublishSubscriber parent; + /** The actual child subscriber. */ + final Subscriber child; + /** + * Indicates this child has been unsubscribed: the state is swapped in atomically and + * will prevent the dispatch() to emit (too many) values to a terminated child subscriber. + */ + static final long UNSUBSCRIBED = Long.MIN_VALUE; + /** + * Indicates this child has not yet requested any value. We pretend we don't + * see such child subscribers in dispatch() to allow other child subscribers who + * have requested to make progress. In a concurrent subscription scennario, + * one can't be sure when a subscription happens exactly so this virtual shift + * should not cause any problems. + */ + static final long NOT_REQUESTED = Long.MIN_VALUE / 2; - private final State state = new State(); - @SuppressWarnings("unused") - volatile long wip; - @SuppressWarnings("rawtypes") - static final AtomicLongFieldUpdater WIP = AtomicLongFieldUpdater.newUpdater(RequestHandler.class, "wip"); - - public void requestFromChildSubscriber(Subscriber subscriber, long request) { - state.requestFromSubscriber(subscriber, request); - OriginSubscriber originSubscriber = state.getOrigin(); - if(originSubscriber != null) { - drainQueue(originSubscriber); - } + public InnerProducer(PublishSubscriber parent, Subscriber child) { + this.parent = parent; + this.child = child; + this.lazySet(NOT_REQUESTED); } - - public void emit(Object t) throws MissingBackpressureException { - OriginSubscriber originSubscriber = state.getOrigin(); - if(originSubscriber == null) { - // unsubscribed so break ... we are done + + @Override + public void request(long n) { + // ignore negative requests + if (n < 0) { return; } - if (notifier.isCompleted(t)) { - originSubscriber.buffer.onCompleted(); - } else { - originSubscriber.buffer.onNext(notifier.getValue(t)); + // In general, RxJava doesn't prevent concurrent requests (with each other or with + // an unsubscribe) so we need a CAS-loop, but we need to handle + // request overflow and unsubscribed/not requested state as well. + for (;;) { + // get the current request amount + long r = get(); + // if child called unsubscribe() do nothing + if (r == UNSUBSCRIBED) { + return; + } + // ignore zero requests except any first that sets in zero + if (r >= 0L && n == 0) { + return; + } + long u; + // if this child has not requested yet + if (r == NOT_REQUESTED) { + // let the new request value this (no overflow check needed) + u = n; + } else { + // otherwise, increase the request count + u = r + n; + // and check for long overflow + if (u < 0) { + // cap at max value, which is essentially unlimited + u = Long.MAX_VALUE; + } + } + // try setting the new request value + if (compareAndSet(r, u)) { + // if successful, notify the parent dispacher this child can receive more + // elements + parent.dispatch(); + return; + } + // otherwise, someone else changed the state (perhaps a concurrent + // request or unsubscription so retry } - drainQueue(originSubscriber); } - - private void requestMoreAfterEmission(int emitted) { - if (emitted > 0) { - OriginSubscriber origin = state.getOrigin(); - if (origin != null) { - long r = origin.originOutstanding.addAndGet(-emitted); - if (r <= origin.THRESHOLD) { - origin.requestMore(RxRingBuffer.SIZE - origin.THRESHOLD); - } + + /** + * Indicate that values have been emitted to this child subscriber by the dispatch() method. + * @param n the number of items emitted + * @return the updated request value (may indicate how much can be produced or a terminal state) + */ + public long produced(long n) { + // we don't allow producing zero or less: it would be a bug in the operator + if (n <= 0) { + throw new IllegalArgumentException("Cant produce zero or less"); + } + for (;;) { + // get the current request value + long r = get(); + // if no request has been made yet, we shouldn't have emitted to this child + // subscriber so there is a bug in this operator + if (r == NOT_REQUESTED) { + throw new IllegalStateException("Produced without request"); } + // if the child has unsubscribed, simply return and indicate this + if (r == UNSUBSCRIBED) { + return UNSUBSCRIBED; + } + // reduce the requested amount + long u = r - n; + // if the new amount is less than zero, we have a bug in this operator + if (u < 0) { + throw new IllegalStateException("More produced (" + n + ") than requested (" + r + ")"); + } + // try updating the request value + if (compareAndSet(r, u)) { + // and return the udpated value + return u; + } + // otherwise, some concurrent activity happened and we need to retry } } - - public void drainQueue(OriginSubscriber originSubscriber) { - if (WIP.getAndIncrement(this) == 0) { - State localState = state; - Map, AtomicLong> localMap = localState.ss; - RxRingBuffer localBuffer = originSubscriber.buffer; - NotificationLite nl = notifier; - - int emitted = 0; - do { - /* - * Set to 1 otherwise it could have grown very large while in the last poll loop - * and then we can end up looping all those times again here before exiting even once we've drained - */ - WIP.set(this, 1); - /** - * This is done in the most inefficient possible way right now and we can revisit the approach. - * If we want to batch this then we need to account for new subscribers arriving with a lower request count - * concurrently while iterating the batch ... or accept that they won't - */ - while (true) { - if (localState.hasNoSubscriber()) { - // Drop items due to no subscriber - if (localBuffer.poll() == null) { - // Exit due to no more item - break; - } else { - // Keep dropping cached items. - continue; - } - } - - boolean shouldEmit = localState.canEmitWithDecrement(); - if (!shouldEmit) { - break; - } - Object o = localBuffer.poll(); - if (o == null) { - // nothing in buffer so increment outstanding back again - localState.incrementOutstandingAfterFailedEmit(); - break; - } - - for (Subscriber s : localState.getSubscribers()) { - AtomicLong req = localMap.get(s); - if (req != null) { // null req indicates a concurrent unsubscription happened - nl.accept(s, o); - req.decrementAndGet(); - } - } - emitted++; - } - } while (WIP.decrementAndGet(this) > 0); - requestMoreAfterEmission(emitted); + + @Override + public boolean isUnsubscribed() { + return get() == UNSUBSCRIBED; + } + @Override + public void unsubscribe() { + long r = get(); + // let's see if we are unsubscribed + if (r != UNSUBSCRIBED) { + // if not, swap in the terminal state, this is idempotent + // because other methods using CAS won't overwrite this value, + // concurrent calls to unsubscribe will atomically swap in the same + // terminal value + r = getAndSet(UNSUBSCRIBED); + // and only one of them will see a non-terminated value before the swap + if (r != UNSUBSCRIBED) { + // remove this from the parent + parent.remove(this); + // After removal, we might have unblocked the other child subscribers: + // let's assume this child had 0 requested before the unsubscription while + // the others had non-zero. By removing this 'blocking' child, the others + // are now free to receive events + parent.dispatch(); + } } } } diff --git a/src/test/java/rx/internal/operators/OperatorPublishTest.java b/src/test/java/rx/internal/operators/OperatorPublishTest.java index 3d8481a676..8ffc265f9b 100644 --- a/src/test/java/rx/internal/operators/OperatorPublishTest.java +++ b/src/test/java/rx/internal/operators/OperatorPublishTest.java @@ -259,4 +259,110 @@ public void testConnectWithNoSubscriber() { subscriber.assertNoErrors(); subscriber.assertTerminalEvent(); } + + @Test + public void testSubscribeAfterDisconnectThenConnect() { + ConnectableObservable source = Observable.just(1).publish(); + + TestSubscriber ts1 = new TestSubscriber(); + + source.subscribe(ts1); + + Subscription s = source.connect(); + + ts1.assertReceivedOnNext(Arrays.asList(1)); + ts1.assertNoErrors(); + ts1.assertTerminalEvent(); + + TestSubscriber ts2 = new TestSubscriber(); + + source.subscribe(ts2); + + Subscription s2 = source.connect(); + + ts2.assertReceivedOnNext(Arrays.asList(1)); + ts2.assertNoErrors(); + ts2.assertTerminalEvent(); + + System.out.println(s); + System.out.println(s2); + } + + @Test + public void testNoSubscriberRetentionOnCompleted() { + OperatorPublish source = (OperatorPublish)Observable.just(1).publish(); + + TestSubscriber ts1 = new TestSubscriber(); + + source.unsafeSubscribe(ts1); + + ts1.assertReceivedOnNext(Arrays.asList()); + ts1.assertNoErrors(); + assertTrue(ts1.getOnCompletedEvents().isEmpty()); + + source.connect(); + + ts1.assertReceivedOnNext(Arrays.asList(1)); + ts1.assertNoErrors(); + ts1.assertTerminalEvent(); + + assertNull(source.current.get()); + } + + @Test + public void testNonNullConnection() { + ConnectableObservable source = Observable.never().publish(); + + assertNotNull(source.connect()); + assertNotNull(source.connect()); + } + + @Test + public void testNoDisconnectSomeoneElse() { + ConnectableObservable source = Observable.never().publish(); + + Subscription s1 = source.connect(); + Subscription s2 = source.connect(); + + s1.unsubscribe(); + + Subscription s3 = source.connect(); + + s2.unsubscribe(); + + assertTrue(s1.isUnsubscribed()); + assertTrue(s2.isUnsubscribed()); + assertFalse(s3.isUnsubscribed()); + } + + @Test + public void testZeroRequested() { + ConnectableObservable source = Observable.just(1).publish(); + + TestSubscriber ts = new TestSubscriber() { + @Override + public void onStart() { + request(0); + } + }; + + source.subscribe(ts); + + ts.assertReceivedOnNext(Arrays.asList()); + ts.assertNoErrors(); + assertTrue(ts.getOnCompletedEvents().isEmpty()); + + source.connect(); + + ts.assertReceivedOnNext(Arrays.asList()); + ts.assertNoErrors(); + assertTrue(ts.getOnCompletedEvents().isEmpty()); + + ts.requestMore(5); + + ts.assertReceivedOnNext(Arrays.asList(1)); + ts.assertNoErrors(); + ts.assertTerminalEvent(); + + } }