diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Core.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Core.verified.txt index 9f6c9951b53..ceceed21077 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Core.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Core.verified.txt @@ -3820,12 +3820,14 @@ namespace Akka.Streams.Implementation [Akka.Annotations.InternalApiAttribute()] public sealed class VirtualProcessor : Akka.Util.AtomicReference, Reactive.Streams.IProcessor, Reactive.Streams.IPublisher, Reactive.Streams.ISubscriber { + public const bool IsDebug = true; public VirtualProcessor() { } public void OnComplete() { } public void OnError(System.Exception cause) { } public void OnNext(T element) { } public void OnSubscribe(Reactive.Streams.ISubscription subscription) { } public void Subscribe(Reactive.Streams.ISubscriber subscriber) { } + public override string ToString() { } } } namespace Akka.Streams.Implementation.Fusing diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt index a1a6a7f942b..a7535dbedfc 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt @@ -3820,12 +3820,14 @@ namespace Akka.Streams.Implementation [Akka.Annotations.InternalApiAttribute()] public sealed class VirtualProcessor : Akka.Util.AtomicReference, Reactive.Streams.IProcessor, Reactive.Streams.IPublisher, Reactive.Streams.ISubscriber { + public const bool IsDebug = true; public VirtualProcessor() { } public void OnComplete() { } public void OnError(System.Exception cause) { } public void OnNext(T element) { } public void OnSubscribe(Reactive.Streams.ISubscription subscription) { } public void Subscribe(Reactive.Streams.ISubscriber subscriber) { } + public override string ToString() { } } } namespace Akka.Streams.Implementation.Fusing diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt index 9f6c9951b53..ceceed21077 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt @@ -3820,12 +3820,14 @@ namespace Akka.Streams.Implementation [Akka.Annotations.InternalApiAttribute()] public sealed class VirtualProcessor : Akka.Util.AtomicReference, Reactive.Streams.IProcessor, Reactive.Streams.IPublisher, Reactive.Streams.ISubscriber { + public const bool IsDebug = true; public VirtualProcessor() { } public void OnComplete() { } public void OnError(System.Exception cause) { } public void OnNext(T element) { } public void OnSubscribe(Reactive.Streams.ISubscription subscription) { } public void Subscribe(Reactive.Streams.ISubscriber subscriber) { } + public override string ToString() { } } } namespace Akka.Streams.Implementation.Fusing diff --git a/src/core/Akka.Streams/Implementation/StreamLayout.cs b/src/core/Akka.Streams/Implementation/StreamLayout.cs index 8f98d38dcd9..d0920de484b 100644 --- a/src/core/Akka.Streams/Implementation/StreamLayout.cs +++ b/src/core/Akka.Streams/Implementation/StreamLayout.cs @@ -8,9 +8,11 @@ using System; using System.Collections.Generic; using System.Collections.Immutable; +using System.Diagnostics; using System.Linq; using System.Runtime.Serialization; using Akka.Annotations; +using Akka.Event; using Akka.Pattern; using Akka.Streams.Dsl; using Akka.Streams.Implementation.Fusing; @@ -1490,21 +1492,27 @@ public abstract class AtomicModule : Module /// little like this: /// /// | Subscriber | - /// +--------------+ +------------+ - /// | | - /// (1) | | (1) - /// \|/ \|/ - /// +--------------+ (2) +------------+ --\ - /// | Subscription | ----------> | Both | | (4) - /// +--------------+ +------------+ <-/ - /// | | - /// (3) | | (3) - /// \|/ \|/ - /// +--------------+ (2) +------------+ --\ - /// | Publisher | ----------> | Inert | | (4, *) - /// +--------------+ +------------+ <-/ + /// +--------+ (2) +---------------+ + /// | null +------------>+ Subscriber | + /// +---+----+ +-----+---------+ + /// | | + /// (1)| | (1) + /// v v + /// +---+----------+ (2) +-----+---------+ + /// | Subscription +------>+ Establishing | + /// +---+----------+ +-----+---------+ + /// | | + /// | | (4) + /// | v + /// | +-----+---------+ --- + /// | (3) | Both | | (5) + /// | +-----+---------+ <-- + /// | | + /// | | + /// v v + /// +---+----------+ (2) +-----+---------+ --- + /// | Publisher +-----> | Inert | | (5, *) + /// +--------------+ +---------------+ <-- /// ]]> /// The idea is to keep the major state in only one atomic reference. The actions /// that can happen are: @@ -1512,7 +1520,8 @@ public abstract class AtomicModule : Module /// (1) onSubscribe /// (2) subscribe /// (3) onError / onComplete - /// (4) onNext + /// (4) establishing subscription completes + /// (5) onNext /// (*) Inert can be reached also by cancellation after which onNext is still fine /// so we just silently ignore possible spec violations here /// @@ -1535,10 +1544,17 @@ public abstract class AtomicModule : Module [InternalApi] public sealed class VirtualProcessor : AtomicReference, IProcessor { + public const bool IsDebug = true; private const string NoDemand = "spec violation: OnNext was signaled from upstream without demand"; + private readonly string _hashCode; #region internal classes + private interface IHasActualSubscriber + { + ISubscriber Subscriber { get; } + } + private sealed class Inert { public static readonly ISubscriber Subscriber = new CancellingSubscriber(); @@ -1550,7 +1566,7 @@ private Inert() } } - private sealed class Both + private sealed class Both: IHasActualSubscriber { public Both(ISubscriber subscriber) { @@ -1561,8 +1577,46 @@ public Both(ISubscriber subscriber) } + private sealed class Establishing: IHasActualSubscriber + { + public Establishing(ISubscriber subscriber, bool onCompleteBuffered = false, Exception onErrorBuffered = null) + { + Subscriber = subscriber; + OnCompleteBuffered = onCompleteBuffered; + OnErrorBuffered = onErrorBuffered; + } + + public ISubscriber Subscriber { get; } + public bool OnCompleteBuffered { get; } + public Exception OnErrorBuffered { get; } + + public Establishing Copy( + ISubscriber subscriber = null, + bool? onCompleteBuffered = null, + Exception onErrorBuffered = null) + => new Establishing( + subscriber ?? Subscriber, + onCompleteBuffered ?? OnCompleteBuffered, + onErrorBuffered ?? OnErrorBuffered); + } + #endregion + public VirtualProcessor() + { + _hashCode = GetHashCode().ToString(); + PrintDebug($"Created: {this}"); + } + + public override string ToString() => $"VirtualProcessor({_hashCode})"; + + [Conditional("DEBUG")] + private static void PrintDebug(string msg) + { + if(IsDebug) + Console.WriteLine(msg); + } + /// /// TBD /// @@ -1570,55 +1624,54 @@ public Both(ISubscriber subscriber) /// TBD public void Subscribe(ISubscriber subscriber) { + void Rec(ISubscriber sub) + { + switch (Value) + { + case null: + PrintDebug($"VirtualProcessor#{_hashCode}(null).Subscribe.Rec({subscriber}) -> sub"); + if (!CompareAndSet(null, subscriber)) + Rec(sub); + return; + + case ISubscription subscription: + PrintDebug($"VirtualProcessor#{_hashCode}({subscription}).Subscribe.Rec({subscriber}) -> Establishing(sub)"); + var establishing = new Establishing(sub); + if(CompareAndSet(subscription, establishing)) + EstablishSubscription(establishing, subscription); + else + Rec(sub); + return; + + case IPublisher publisher: + PrintDebug($"VirtualProcessor#{_hashCode}({publisher}).Subscribe.Rec({subscriber}) -> Inert"); + if (CompareAndSet(publisher, Inert.Instance)) + publisher.Subscribe(sub); + else + Rec(sub); + return; + + case var other: + PrintDebug($"VirtualProcessor#{_hashCode}({other}).Subscribe.Rec({subscriber}): RejectAdditionalSubscriber"); + ReactiveStreamsCompliance.RejectAdditionalSubscriber(sub, "VirtualProcessor"); + break; + } + } + if (subscriber == null) { - var ex = ReactiveStreamsCompliance.SubscriberMustNotBeNullException; try { - TrySubscribe(Inert.Subscriber); + Rec(Inert.Subscriber); } finally { // must throw ArgumentNullEx, rule 2:13 - throw ex; + throw ReactiveStreamsCompliance.SubscriberMustNotBeNullException; } } - TrySubscribe(subscriber); - } - - private void TrySubscribe(ISubscriber subscriber) - { - if (Value == null) - { - if (!CompareAndSet(null, subscriber)) - TrySubscribe(subscriber); - return; - } - - var subscription = Value as ISubscription; - if (subscription != null) - { - if (CompareAndSet(subscription, new Both(subscriber))) - EstablishSubscription(subscriber, subscription); - else - TrySubscribe(subscriber); - - return; - } - - var publisher = Value as IPublisher; - if (publisher != null) - { - if (CompareAndSet(publisher, Inert.Instance)) - publisher.Subscribe(subscriber); - else - TrySubscribe(subscriber); - - return; - } - - ReactiveStreamsCompliance.RejectAdditionalSubscriber(subscriber, "VirtualProcessor"); + Rec(subscriber); } /// @@ -1628,12 +1681,54 @@ private void TrySubscribe(ISubscriber subscriber) /// TBD public void OnSubscribe(ISubscription subscription) { + void Rec(object obj) + { + while (true) + { + switch (Value) + { + case null: + PrintDebug($"VirtualProcessor#{_hashCode}({null}).OnSubscribe.Rec({obj}) -> {obj.GetType()}"); + if (!CompareAndSet(null, obj)) + continue; + return; + case ISubscriber subscriber: + switch (obj) + { + case ISubscription sub: + PrintDebug($"VirtualProcessor#{_hashCode}({subscriber}).OnSubscribe.Rec({obj}) -> Establishing"); + var establishing = new Establishing(subscriber); + if (CompareAndSet(subscriber, establishing)) + EstablishSubscription(establishing, sub); + else + continue; + return; + + case IPublisher publisher: + PrintDebug($"VirtualProcessor#{_hashCode}({publisher}).OnSubscribe.Rec({obj}) -> Inert"); + var inert = GetAndSet(Inert.Instance); + if (inert != Inert.Instance) publisher.Subscribe(subscriber); + return; + + case var other: + throw new IllegalStateException($"Unexpected state in VirtualProcessor: {other}"); + } + + case var state: + PrintDebug($"VirtualProcessor#{_hashCode}({state}).OnSubscribe.Rec({obj}): Spec violation."); + // spec violation + ReactiveStreamsCompliance.TryCancel(subscription, new IllegalStateException($"Spec violation: VirtualProcessor in wrong state [{state.GetType()}].")); + return; + } + } + } + if (subscription == null) { var ex = ReactiveStreamsCompliance.SubscriptionMustNotBeNullException; try { - TryOnSubscribe(new ErrorPublisher(ex, "failed-VirtualProcessor"), subscription); + Rec(new ErrorPublisher(ex, "failed-VirtualProcessor")); } finally { @@ -1641,60 +1736,62 @@ public void OnSubscribe(ISubscription subscription) throw ex; } } - - TryOnSubscribe(subscription, subscription); - } - - private void TryOnSubscribe(object obj, ISubscription s) - { - while (true) - { - switch (Value) - { - case null: - if (!CompareAndSet(null, obj)) - continue; - return; - case ISubscriber subscriber: - switch (obj) - { - case ISubscription subscription: - if (CompareAndSet(subscriber, new Both(subscriber))) - EstablishSubscription(subscriber, subscription); - else - continue; - return; - case IPublisher publisher: - var inert = GetAndSet(Inert.Instance); - if (inert != Inert.Instance) - publisher.Subscribe(subscriber); - return; - case var other: - throw new IllegalStateException($"Unexpected state in VirtualProcessor: {other.GetType()}"); - } - case var state: - // spec violation - ReactiveStreamsCompliance.TryCancel(s, new IllegalStateException($"Spec violation: VirtualProcessor in wrong state [{state.GetType()}].")); - return; - } - } + + Rec(subscription); } - private void EstablishSubscription(ISubscriber subscriber, ISubscription subscription) + private void EstablishSubscription(Establishing establishing, ISubscription subscription) { var wrapped = new WrappedSubscription(subscription, this); try { - subscriber.OnSubscribe(wrapped); - // Requests will be only allowed once onSubscribe has returned to avoid reentering on an onNext before - // onSubscribe completed - wrapped.UngateDemandAndRequestBuffered(); + PrintDebug($"VirtualProcessor#{_hashCode}.EstablishSubscription({wrapped})"); + establishing.Subscriber.OnSubscribe(wrapped); + + // while we were establishing some stuff could have happened + // most likely case, nobody changed it while we where establishing + PrintDebug($"VirtualProcessor#{_hashCode}.EstablishSubscription.Rec({establishing}) -> Both"); + if (CompareAndSet(establishing, new Both(establishing.Subscriber))) + { + // cas won - life is good + // Requests will be only allowed once onSubscribe has returned to avoid reentering on an onNext before + // onSubscribe completed + wrapped.UngateDemandAndRequestBuffered(); + } + else + { + // changed by someone else + switch (Value) + { + case Establishing est when est.OnErrorBuffered != null: + // there was an onError while establishing + PrintDebug($"VirtualProcessor#{_hashCode}.EstablishSubscription.Rec(Establishing(OnErrorBuffered)) -> Inert"); + ReactiveStreamsCompliance.TryOnError(est.Subscriber, est.OnErrorBuffered); + Value = Inert.Instance; + break; + + case Establishing est when est.OnCompleteBuffered: + // there was on onComplete while we were establishing + PrintDebug($"VirtualProcessor#{_hashCode}.EstablishSubscription.Rec(Establishing(OnCompleteBuffered)) -> Inert"); + ReactiveStreamsCompliance.TryOnComplete(est.Subscriber); + Value = Inert.Instance; + break; + + case Inert _: + ReactiveStreamsCompliance.TryCancel(subscription, new IllegalStateException("VirtualProcessor was already subscribed to.")); + break; + + default: + throw new IllegalStateException( + $"Unexpected state while establishing: [{Value}], if this ever happens it is a bug."); + } + } } catch (Exception ex) { Value = Inert.Instance; ReactiveStreamsCompliance.TryCancel(subscription, ex); - ReactiveStreamsCompliance.TryOnError(subscriber, ex); + ReactiveStreamsCompliance.TryOnError(establishing.Subscriber, ex); } } @@ -1710,60 +1807,61 @@ public void OnError(Exception cause) * but if `t` was `null` then the spec requires us to throw an NPE (which `ex` * will be in this case). */ - var ex = cause ?? ReactiveStreamsCompliance.ExceptionMustNotBeNullException; - - while (true) + void Rec(Exception ex) { - if (Value == null) - { - if (!CompareAndSet(null, new ErrorPublisher(ex, "failed-VirtualProcessor"))) - continue; - if (cause == null) - throw ex; - return; - } - - var subscription = Value as ISubscription; - if (subscription != null) - { - if (!CompareAndSet(subscription, new ErrorPublisher(ex, "failed-VirtualProcessor"))) - continue; - if (cause == null) - throw ex; - return; - } - - var both = Value as Both; - if (both != null) + while (true) { - Value = Inert.Instance; - try + switch (Value) { - ReactiveStreamsCompliance.TryOnError(both.Subscriber, ex); - } - finally - { - // must throw ArgumentNullEx, rule 2:13 - if (cause == null) - throw ex; + case null: + PrintDebug($"VirtualProcessor#{_hashCode}(null).OnError({ex.Message}) -> ErrorPublisher"); + if (!CompareAndSet(null, new ErrorPublisher(ex, "failed-VirtualProcessor"))) + continue; + return; + + case ISubscription s: + PrintDebug($"VirtualProcessor#{_hashCode}({s}).OnError({ex.Message}) -> ErrorPublisher"); + if (!CompareAndSet(s, new ErrorPublisher(ex, "failed-VirtualProcessor"))) + continue; + return; + + case Both both: + PrintDebug($"VirtualProcessor#{_hashCode}(Both({both.Subscriber})).OnError({ex.Message}) -> ErrorPublisher"); + Value = Inert.Instance; + ReactiveStreamsCompliance.TryOnError(both.Subscriber, ex); + return; + + case ISubscriber s: + // spec violation + PrintDebug($"VirtualProcessor#{_hashCode}({s}).OnError({ex.Message}) -> Inert: Spec violation."); + var inert = GetAndSet(Inert.Instance); + if (inert != Inert.Instance) new ErrorPublisher(ex, "failed-VirtualProcessor").Subscribe(s); + return; + + case Establishing { OnCompleteBuffered: false, OnErrorBuffered: null } est: + PrintDebug($"VirtualProcessor#{_hashCode}({est}).OnError({ex.Message}) -> loop"); + if (!CompareAndSet(est, est.Copy(onErrorBuffered: ex))) + continue; + return; + + case var other: + // spec violation or cancellation race, but nothing we can do + PrintDebug($"VirtualProcessor#{_hashCode}({other}).OnError({ex.Message}): Spec violation or cancellation race"); + return; } - - return; - } - - var subscriber = Value as ISubscriber; - if (subscriber != null) - { - // spec violation - var inert = GetAndSet(Inert.Instance); - if (inert != Inert.Instance) - new ErrorPublisher(ex, "failed-VirtualProcessor").Subscribe(subscriber); - - return; } + } - // spec violation or cancellation race, but nothing we can do - return; + var ex = cause ?? ReactiveStreamsCompliance.ExceptionMustNotBeNullException; + try + { + Rec(ex); + } + finally + { + // must throw NPE, rule 2.13 + if (cause == null) + throw ex; } } @@ -1774,42 +1872,39 @@ public void OnComplete() { while (true) { - if (Value == null) - { - if (!CompareAndSet(null, EmptyPublisher.Instance)) - continue; - - return; - } - - var subscription = Value as ISubscription; - if (subscription != null) - { - if (!CompareAndSet(subscription, EmptyPublisher.Instance)) - continue; - - return; - } - - var both = Value as Both; - if (both != null) - { - Value = Inert.Instance; - ReactiveStreamsCompliance.TryOnComplete(both.Subscriber); - return; - } - - var subscriber = Value as ISubscriber; - if (subscriber != null) + switch (Value) { - // spec violation - Value = Inert.Instance; - EmptyPublisher.Instance.Subscribe(subscriber); - return; + case null: + PrintDebug($"VirtualProcessor#{_hashCode}(null).OnComplete() -> EmptyPublisher"); + if(!CompareAndSet(null, EmptyPublisher.Instance)) + continue; + return; + case ISubscription subscription: + PrintDebug($"VirtualProcessor#{_hashCode}({subscription}).OnComplete() -> EmptyPublisher"); + if(!CompareAndSet(subscription, EmptyPublisher.Instance)) + continue; + return; + case Both both: + PrintDebug($"VirtualProcessor#{_hashCode}(Both({both.Subscriber})).OnComplete() -> Inert"); + Value = Inert.Instance; + ReactiveStreamsCompliance.TryOnComplete(both.Subscriber); + return; + case ISubscriber subscriber: + // spec violation + PrintDebug($"VirtualProcessor#{_hashCode}({subscriber}).OnComplete() -> Inert: Spec Violation"); + Value = Inert.Instance; + EmptyPublisher.Instance.Subscribe(subscriber); + return; + case Establishing { OnCompleteBuffered: false, OnErrorBuffered: null } est: + PrintDebug($"VirtualProcessor#{_hashCode}({est}).OnComplete() -> Establishing(OnCompleteBuffered)"); + if(!est.OnCompleteBuffered && !CompareAndSet(est, est.Copy(onCompleteBuffered: true))) + continue; + return; + case var other: + // spec violation or cancellation race, but nothing we can do + PrintDebug($"VirtualProcessor#{_hashCode}({other}).OnComplete(): Spec violation"); + return; } - - // spec violation or cancellation race, but nothing we can do - return; } } @@ -1824,91 +1919,106 @@ public void OnNext(T element) if (element == null) { var ex = ReactiveStreamsCompliance.ElementMustNotBeNullException; + PrintDebug($"VirtualProcessor#{_hashCode}.OnNext(null)"); - while (true) + void Rec() { - if (Value == null || Value is ISubscription) + while (true) { - if (!CompareAndSet(Value, new ErrorPublisher(ex, "failed-VirtualProcessor"))) - continue; + switch (Value) + { + case null: + case ISubscription _: + if (!CompareAndSet(Value, new ErrorPublisher(ex, "failed-VirtualProcessor"))) continue; + return; - break; - } + case ISubscriber subscriber: + try + { + subscriber.OnError(ex); + } + finally + { + Value = Inert.Instance; + } + return; - var subscriber = Value as ISubscriber; - if (subscriber != null) - { - try - { - subscriber.OnError(ex); - } - finally - { - Value = Inert.Instance; - } - break; - } + case Both both: + try + { + both.Subscriber.OnError(ex); + } + finally + { + Value = Inert.Instance; + } + return; - var both = Value as Both; - if (both != null) - { - try - { - both.Subscriber.OnError(ex); - } - finally - { - Value = Inert.Instance; + default: + // spec violation or cancellation race, but nothing we can do + return; } } - - // spec violation or cancellation race, but nothing we can do - break; } - // must throw ArgumentNullEx, rule 2:13 - throw ex; - } - - while (true) - { - var both = Value as Both; - if (both != null) + try { - try - { - both.Subscriber.OnNext(element); - return; - } - catch (Exception e) - { - Value = Inert.Instance; - throw new IllegalStateException( - "Subscriber threw exception, this is in violation of rule 2:13", e); - } + Rec(); } - - var subscriber = Value as ISubscriber; - if (subscriber != null) + finally { - // spec violation - var ex = new IllegalStateException(NoDemand); - var inert = GetAndSet(Inert.Instance); - if (inert != Inert.Instance) - new ErrorPublisher(ex, "failed-VirtualProcessor").Subscribe(subscriber); + // must throw ArgumentNullEx, rule 2:13 throw ex; } - - if (Value == Inert.Instance || Value is IPublisher) + } + else + { + void Rec() { - // nothing to be done - return; + while (true) + { + switch (Value) + { + case IHasActualSubscriber h: + var s = h.Subscriber; + try + { + PrintDebug($"VirtualProcessor#{_hashCode}({h.GetType()}({s})).OnNext({element}).Rec()"); + s.OnNext(element); + } + catch (Exception e) + { + PrintDebug($"VirtualProcessor#{_hashCode}({h.GetType()}({s})).OnNext({element}) threw {e.Message} -> Inert. Spec violation"); + Value = Inert.Instance; + throw new IllegalStateException("Subscriber threw exception, this is in violation of rule 2:13", e); + } + return; + + case ISubscriber subscriber: + // spec violation + PrintDebug($"VirtualProcessor#{_hashCode}({subscriber}).OnNext({element}).Rec() -> Inert: Spec violation"); + var ex = new IllegalStateException(NoDemand); + var inert = GetAndSet(Inert.Instance); + if (inert != Inert.Instance) new ErrorPublisher(ex, "failed-VirtualProcessor").Subscribe(subscriber); + throw ex; + + case Inert _: + case IPublisher _: + // nothing to be done + PrintDebug($"VirtualProcessor#{_hashCode}(Inert|Publisher).OnNext({element}).Rec(): noop"); + return; + + case var other: + PrintDebug($"VirtualProcessor#{_hashCode}({other}).OnNext({element}).Rec() -> ErrorPublisher"); + var publisher = new ErrorPublisher(new IllegalStateException(NoDemand), "failed-VirtualPublisher"); + if (!CompareAndSet(other, publisher)) + continue; + throw publisher.Cause; + } + } } - var publisher = new ErrorPublisher(new IllegalStateException(NoDemand), "failed-VirtualPublisher"); - if (!CompareAndSet(Value, publisher)) - continue; - throw publisher.Cause; + Rec(); } } @@ -1952,18 +2062,23 @@ public void Request(long n) { if (n < 1) { + PrintDebug($"VirtualProcessor#{_processor._hashCode}.WrappedSubscription({_real}.Request({n})"); ReactiveStreamsCompliance.TryCancel(_real, new IllegalStateException($"Demand must not be < 1. Was: {n}")); var value = _processor.GetAndSet(Inert.Instance); - var both = value as Both; - if (both != null) - ReactiveStreamsCompliance.RejectDueToNonPositiveDemand(both.Subscriber); - else if (value == Inert.Instance) + switch (value) { - // another failure has won the race - } - else - { - // this cannot possibly happen, but signaling errors is impossible at this point + case Both both: + ReactiveStreamsCompliance.RejectDueToNonPositiveDemand(both.Subscriber); + break; + case Establishing est: + ReactiveStreamsCompliance.RejectDueToNonPositiveDemand(est.Subscriber); + break; + case Inert _: + // another failure has won the race + break; + default: + // this cannot possibly happen, but signaling errors is impossible at this point + break; } } else @@ -1979,11 +2094,17 @@ public void Request(long n) var current = Value; if (current == PassThrough.Instance) { + PrintDebug($"VirtualProcessor#{_processor._hashCode}.WrappedSubscription({_real}.BufferDemand({n}) PassThrough"); _real.Request(n); - break; + return; } + if (!CompareAndSet(current, new Buffering(current.Demand + n))) + { + PrintDebug($"VirtualProcessor#{_processor._hashCode}.WrappedSubscription({_real}.BufferDemand({n}) buffering"); continue; + } + break; } } @@ -1991,6 +2112,7 @@ public void Request(long n) public void Cancel() { + PrintDebug($"VirtualProcessor#{_processor._hashCode}.WrappedSubscription({_real}.Cancel() -> Inert"); _processor.Value = Inert.Instance; _real.Cancel(); } @@ -1998,6 +2120,7 @@ public void Cancel() public void UngateDemandAndRequestBuffered() { // Ungate demand + PrintDebug($"VirtualProcessor#{_processor._hashCode}.WrappedSubscription({_real}.UngateDemandAndRequestBuffered()"); var requests = GetAndSet(PassThrough.Instance).Demand; // And request buffered demand if(requests > 0) @@ -2091,6 +2214,8 @@ public void RegisterPublisher(IUntypedPublisher publisher) /// TBD public void RegisterPublisher(IPublisher publisher) { + if(VirtualProcessor.IsDebug) + Console.WriteLine($"{this}.RegisterPublisher({publisher})"); while (true) { if (Value == null)