From 19ab29b465d75cbd2ee093e1eb62048284bf5164 Mon Sep 17 00:00:00 2001 From: Marc Piechura Date: Sun, 31 Dec 2017 01:07:01 +0100 Subject: [PATCH 1/6] use generic type parameter for all public methods of GraphStageLogic which are dealing with Inlets or Outlets --- .../CoreAPISpec.ApproveStreams.approved.txt | 35 +- .../Fusing/GraphInterpreterSpecKit.cs | 413 +++++------------- src/core/Akka.Streams/Dsl/Graph.cs | 117 +++-- src/core/Akka.Streams/Dsl/Restart.cs | 137 ++---- .../Fusing/ActorGraphInterpreter.cs | 77 ++-- .../Fusing/EnumeratorInterpreter.cs | 31 +- src/core/Akka.Streams/KillSwitch.cs | 45 +- src/core/Akka.Streams/Shape.cs | 49 +-- src/core/Akka.Streams/Stage/GraphStage.cs | 147 ++++--- 9 files changed, 392 insertions(+), 659 deletions(-) diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt index fbbdb22082d..b44ea579eb2 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt @@ -1421,7 +1421,7 @@ namespace Akka.Streams.Dsl public Partition(int outputPorts, System.Func partitioner) { } public override Akka.Streams.UniformFanOutShape Shape { get; } protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { } - public Akka.Streams.Outlet Out(int id) { } + public Akka.Streams.Outlet Out(int id) { } public override string ToString() { } } public sealed class PartitionOutOfBoundsException : System.Exception @@ -4097,8 +4097,8 @@ namespace Akka.Streams.Stage protected void AbortReading(Akka.Streams.Inlet inlet) { } protected internal void AfterPostStop() { } protected internal void BeforePreStart() { } - protected void Cancel(Akka.Streams.Inlet inlet) { } - protected void Complete(Akka.Streams.Outlet outlet) { } + protected void Cancel(Akka.Streams.Inlet inlet) { } + protected void Complete(Akka.Streams.Outlet outlet) { } public void CompleteStage() { } public static Akka.Streams.Stage.InHandler ConditionalTerminateInput(System.Func predicate) { } public static Akka.Streams.Stage.OutHandler ConditionalTerminateOutput(System.Func predicate) { } @@ -4109,36 +4109,33 @@ namespace Akka.Streams.Stage protected internal void EmitMultiple(Akka.Streams.Outlet outlet, System.Collections.Generic.IEnumerable elements) { } protected internal void EmitMultiple(Akka.Streams.Outlet outlet, System.Collections.Generic.IEnumerator enumerator, System.Action andThen) { } protected internal void EmitMultiple(Akka.Streams.Outlet outlet, System.Collections.Generic.IEnumerator enumerator) { } - protected void Fail(Akka.Streams.Outlet outlet, System.Exception reason) { } + protected void Fail(Akka.Streams.Outlet outlet, System.Exception reason) { } public void FailStage(System.Exception reason) { } protected System.Action GetAsyncCallback(System.Action handler) { } protected System.Action GetAsyncCallback(System.Action handler) { } - protected Akka.Streams.Stage.IInHandler GetHandler(Akka.Streams.Inlet inlet) { } - protected Akka.Streams.Stage.IOutHandler GetHandler(Akka.Streams.Outlet outlet) { } + protected Akka.Streams.Stage.IInHandler GetHandler(Akka.Streams.Inlet inlet) { } + protected Akka.Streams.Stage.IOutHandler GetHandler(Akka.Streams.Outlet outlet) { } [Akka.Annotations.ApiMayChangeAttribute()] protected Akka.Streams.Stage.StageActorRef GetStageActorRef(Akka.Streams.Stage.StageActorRef.Receive receive) { } - protected internal T Grab(Akka.Streams.Inlet inlet) { } protected internal T Grab(Akka.Streams.Inlet inlet) { } - protected bool HasBeenPulled(Akka.Streams.Inlet inlet) { } - protected internal bool IsAvailable(Akka.Streams.Inlet inlet) { } - protected internal bool IsAvailable(Akka.Streams.Outlet outlet) { } - protected bool IsClosed(Akka.Streams.Inlet inlet) { } - protected bool IsClosed(Akka.Streams.Outlet outlet) { } + protected bool HasBeenPulled(Akka.Streams.Inlet inlet) { } + protected internal bool IsAvailable(Akka.Streams.Inlet inlet) { } + protected internal bool IsAvailable(Akka.Streams.Outlet outlet) { } + protected bool IsClosed(Akka.Streams.Inlet inlet) { } + protected bool IsClosed(Akka.Streams.Outlet outlet) { } protected void PassAlong(Akka.Streams.Inlet from, Akka.Streams.Outlet to, bool doFinish = True, bool doFail = True, bool doPull = False) where TIn : TOut { } public virtual void PostStop() { } public virtual void PreStart() { } - protected internal void Pull(Akka.Streams.Inlet inlet) { } protected internal void Pull(Akka.Streams.Inlet inlet) { } - protected internal void Push(Akka.Streams.Outlet outlet, T element) { } + protected internal void Push(Akka.Streams.Outlet outlet, T element) { } protected void Read(Akka.Streams.Inlet inlet, System.Action andThen, System.Action onClose) { } protected void ReadMany(Akka.Streams.Inlet inlet, int n, System.Action> andThen, System.Action> onComplete) { } - protected internal void SetHandler(Akka.Streams.Inlet inlet, Akka.Streams.Stage.IInHandler handler) { } - protected internal void SetHandler(Akka.Streams.Inlet inlet, System.Action onPush, System.Action onUpstreamFinish = null, System.Action onUpstreamFailure = null) { } - protected internal void SetHandler(Akka.Streams.Outlet outlet, Akka.Streams.Stage.IOutHandler handler) { } - protected internal void SetHandler(Akka.Streams.Outlet outlet, System.Action onPull, System.Action onDownstreamFinish = null) { } + protected internal void SetHandler(Akka.Streams.Inlet inlet, Akka.Streams.Stage.IInHandler handler) { } + protected internal void SetHandler(Akka.Streams.Inlet inlet, System.Action onPush, System.Action onUpstreamFinish = null, System.Action onUpstreamFailure = null) { } + protected internal void SetHandler(Akka.Streams.Outlet outlet, Akka.Streams.Stage.IOutHandler handler) { } + protected internal void SetHandler(Akka.Streams.Outlet outlet, System.Action onPull, System.Action onDownstreamFinish = null) { } protected void SetKeepGoing(bool enabled) { } - protected internal void TryPull(Akka.Streams.Inlet inlet) { } protected internal void TryPull(Akka.Streams.Inlet inlet) { } protected sealed class LambdaInHandler : Akka.Streams.Stage.InHandler { diff --git a/src/core/Akka.Streams.Tests/Implementation/Fusing/GraphInterpreterSpecKit.cs b/src/core/Akka.Streams.Tests/Implementation/Fusing/GraphInterpreterSpecKit.cs index ed7713d984b..522b643187a 100644 --- a/src/core/Akka.Streams.Tests/Implementation/Fusing/GraphInterpreterSpecKit.cs +++ b/src/core/Akka.Streams.Tests/Implementation/Fusing/GraphInterpreterSpecKit.cs @@ -32,45 +32,26 @@ public abstract class BaseBuilder private GraphInterpreter _interpreter; private readonly ILoggingAdapter _logger; - protected BaseBuilder(ActorSystem system) - { - _logger = Logging.GetLogger(system, "InterpreterSpecKit"); - } + protected BaseBuilder(ActorSystem system) => _logger = Logging.GetLogger(system, "InterpreterSpecKit"); public GraphInterpreter Interpreter => _interpreter; - public void StepAll() - { - Interpreter.Execute(int.MaxValue); - } + public void StepAll() => Interpreter.Execute(int.MaxValue); - public virtual void Step() - { - Interpreter.Execute(1); - } + public virtual void Step() => Interpreter.Execute(1); public class Upstream : GraphInterpreter.UpstreamBoundaryStageLogic { - private readonly Outlet _out; + public Upstream() => Out = new Outlet("up") { Id = 0 }; - public Upstream() - { - _out = new Outlet("up") { Id = 0 }; - } - - public override Outlet Out => _out; + public override Outlet Out { get; } } public class Downstream : GraphInterpreter.DownstreamBoundaryStageLogic { - private readonly Inlet _in; + public Downstream() => In = new Inlet("up") { Id = 0 }; - public Downstream() - { - _in = new Inlet("up") { Id = 0 }; - } - - public override Inlet In => _in; + public override Inlet In { get; } } public class AssemblyBuilder @@ -206,15 +187,9 @@ public class OnComplete : ITestEvent { public GraphStageLogic Source { get; } - public OnComplete(GraphStageLogic source) - { - Source = source; - } + public OnComplete(GraphStageLogic source) => Source = source; - protected bool Equals(OnComplete other) - { - return Equals(Source, other.Source); - } + protected bool Equals(OnComplete other) => Equals(Source, other.Source); public override bool Equals(object obj) { @@ -224,25 +199,16 @@ public override bool Equals(object obj) return Equals((OnComplete) obj); } - public override int GetHashCode() - { - return Source?.GetHashCode() ?? 0; - } + public override int GetHashCode() => Source?.GetHashCode() ?? 0; } public class Cancel : ITestEvent { public GraphStageLogic Source { get; } - public Cancel(GraphStageLogic source) - { - Source = source; - } + public Cancel(GraphStageLogic source) => Source = source; - protected bool Equals(Cancel other) - { - return Equals(Source, other.Source); - } + protected bool Equals(Cancel other) => Equals(Source, other.Source); public override bool Equals(object obj) { @@ -252,10 +218,7 @@ public override bool Equals(object obj) return Equals((Cancel) obj); } - public override int GetHashCode() - { - return Source?.GetHashCode() ?? 0; - } + public override int GetHashCode() => Source?.GetHashCode() ?? 0; } public class OnError : ITestEvent @@ -269,10 +232,7 @@ public OnError(GraphStageLogic source, Exception cause) Cause = cause; } - protected bool Equals(OnError other) - { - return Equals(Source, other.Source) && Equals(Cause, other.Cause); - } + protected bool Equals(OnError other) => Equals(Source, other.Source) && Equals(Cause, other.Cause); public override bool Equals(object obj) { @@ -302,10 +262,7 @@ public OnNext(GraphStageLogic source, object element) Element = element; } - protected bool Equals(OnNext other) - { - return Equals(Source, other.Source) && Equals(Element, other.Element); - } + protected bool Equals(OnNext other) => Equals(Source, other.Source) && Equals(Element, other.Element); public override bool Equals(object obj) { @@ -328,15 +285,9 @@ public class RequestOne : ITestEvent { public GraphStageLogic Source { get; } - public RequestOne(GraphStageLogic source) - { - Source = source; - } + public RequestOne(GraphStageLogic source) => Source = source; - protected bool Equals(RequestOne other) - { - return Equals(Source, other.Source); - } + protected bool Equals(RequestOne other) => Equals(Source, other.Source); public override bool Equals(object obj) { @@ -346,25 +297,16 @@ public override bool Equals(object obj) return Equals((RequestOne) obj); } - public override int GetHashCode() - { - return Source?.GetHashCode() ?? 0; - } + public override int GetHashCode() => Source?.GetHashCode() ?? 0; } public class RequestAnother : ITestEvent { public GraphStageLogic Source { get; } - public RequestAnother(GraphStageLogic source) - { - Source = source; - } + public RequestAnother(GraphStageLogic source) => Source = source; - protected bool Equals(RequestAnother other) - { - return Equals(Source, other.Source); - } + protected bool Equals(RequestAnother other) => Equals(Source, other.Source); public override bool Equals(object obj) { @@ -374,25 +316,16 @@ public override bool Equals(object obj) return Equals((RequestAnother) obj); } - public override int GetHashCode() - { - return Source?.GetHashCode() ?? 0; - } + public override int GetHashCode() => Source?.GetHashCode() ?? 0; } public class PreStart : ITestEvent { public GraphStageLogic Source { get; } - public PreStart(GraphStageLogic source) - { - Source = source; - } + public PreStart(GraphStageLogic source) => Source = source; - protected bool Equals(PreStart other) - { - return Equals(Source, other.Source); - } + protected bool Equals(PreStart other) => Equals(Source, other.Source); public override bool Equals(object obj) { @@ -402,25 +335,16 @@ public override bool Equals(object obj) return Equals((PreStart) obj); } - public override int GetHashCode() - { - return Source?.GetHashCode() ?? 0; - } + public override int GetHashCode() => Source?.GetHashCode() ?? 0; } public class PostStop : ITestEvent { public GraphStageLogic Source { get; } - public PostStop(GraphStageLogic source) - { - Source = source; - } + public PostStop(GraphStageLogic source) => Source = source; - protected bool Equals(PostStop other) - { - return Equals(Source, other.Source); - } + protected bool Equals(PostStop other) => Equals(Source, other.Source); public override bool Equals(object obj) { @@ -430,10 +354,7 @@ public override bool Equals(object obj) return Equals((PostStop) obj); } - public override int GetHashCode() - { - return Source?.GetHashCode() ?? 0; - } + public override int GetHashCode() => Source?.GetHashCode() ?? 0; } #endregion @@ -450,20 +371,11 @@ public ISet LastEvents() return result; } - public void ClearEvents() - { - LastEvent = new HashSet(); - } + public void ClearEvents() => LastEvent = new HashSet(); - public UpstreamProbe NewUpstreamProbe(string name) - { - return new UpstreamProbe(this, name); - } + public UpstreamProbe NewUpstreamProbe(string name) => new UpstreamProbe(this, name); - public DownstreamProbe NewDownstreamProbe(string name) - { - return new DownstreamProbe(this, name); - } + public DownstreamProbe NewDownstreamProbe(string name) => new DownstreamProbe(this, name); public class UpstreamProbe : GraphInterpreter.UpstreamBoundaryStageLogic { @@ -472,19 +384,21 @@ public class UpstreamProbe : GraphInterpreter.UpstreamBoundaryStageLogic public UpstreamProbe(TestSetup setup, string name) { _name = name; - Out = new Outlet("out") {Id = 0}; + Outlet = new Outlet("out") {Id = 0}; var probe = this; - SetHandler(Out, () => setup.LastEvent.Add(new RequestOne(probe)), () => setup.LastEvent.Add(new Cancel(probe))); + SetHandler(Outlet, () => setup.LastEvent.Add(new RequestOne(probe)), () => setup.LastEvent.Add(new Cancel(probe))); } - public sealed override Outlet Out { get; } + public sealed override Outlet Out => Outlet; + + public Outlet Outlet { get; } public void OnNext(T element, int eventLimit = int.MaxValue) { if (GraphInterpreter.IsDebug) Console.WriteLine($"----- NEXT: {this} {element}"); - Push(Out, element); + Push(Outlet, element); Interpreter.Execute(eventLimit); } @@ -492,7 +406,7 @@ public void OnComplete(int eventLimit = int.MaxValue) { if (GraphInterpreter.IsDebug) Console.WriteLine($"----- COMPLETE: {this}"); - Complete(Out); + Complete(Outlet); Interpreter.Execute(eventLimit); } @@ -500,14 +414,11 @@ public void OnFailure(int eventLimit = int.MaxValue, Exception ex = null) { if (GraphInterpreter.IsDebug) Console.WriteLine($"----- FAIL: {this}"); - Fail(Out, ex); + Fail(Outlet, ex); Interpreter.Execute(eventLimit); } - public override string ToString() - { - return _name; - } + public override string ToString() => _name; } public class DownstreamProbe : GraphInterpreter.DownstreamBoundaryStageLogic @@ -517,21 +428,23 @@ public class DownstreamProbe : GraphInterpreter.DownstreamBoundaryStageLogic public DownstreamProbe(TestSetup setup, string name) { _name = name; - In = new Inlet("in") {Id = 0}; + Inlet = new Inlet("in") {Id = 0}; var probe = this; - SetHandler(In, () => setup.LastEvent.Add(new OnNext(probe, Grab(In))), + SetHandler(Inlet, () => setup.LastEvent.Add(new OnNext(probe, Grab(Inlet))), () => setup.LastEvent.Add(new OnComplete(probe)), ex => setup.LastEvent.Add(new OnError(probe, ex))); } - public sealed override Inlet In { get; } + public sealed override Inlet In => Inlet; + + public Inlet Inlet { get; } public void RequestOne(int eventLimit = int.MaxValue) { if (GraphInterpreter.IsDebug) Console.WriteLine($"----- REQ: {this}"); - Pull(In); + Pull(Inlet); Interpreter.Execute(eventLimit); } @@ -539,14 +452,11 @@ public void Cancel(int eventLimit = int.MaxValue) { if (GraphInterpreter.IsDebug) Console.WriteLine($"----- CANCEL: {this}"); - Cancel(In); + Cancel(Inlet); Interpreter.Execute(eventLimit); } - public override string ToString() - { - return _name; - } + public override string ToString() => _name; } } @@ -556,14 +466,12 @@ public class PortTestSetup : TestSetup public UpstreamPortProbe Out { get; } public DownstreamPortProbe In { get; } - private readonly GraphAssembly _assembly; - public PortTestSetup(ActorSystem system, bool chasing = false) : base(system) { _chasing = chasing; var propagateStage = new EventPropagateStage(); - _assembly = !chasing + var assembly = !chasing ? new GraphAssembly(new IGraphStageWithMaterializedValue[0], new Attributes[0], new Inlet[] {null}, new[] {-1}, new Outlet[] {null}, new[] {-1}) : new GraphAssembly(new[] {propagateStage}, new[] {Attributes.None}, @@ -573,7 +481,7 @@ public PortTestSetup(ActorSystem system, bool chasing = false) : base(system) Out = new UpstreamPortProbe(this); In = new DownstreamPortProbe(this); - ManualInit(_assembly); + ManualInit(assembly); Interpreter.AttachDownstreamBoundary(Interpreter.Connections[chasing ? 1 : 0], In); Interpreter.AttachUpstreamBoundary(Interpreter.Connections[0], Out); Interpreter.Init(null); @@ -604,10 +512,7 @@ public Logic(EventPropagateStage stage) :base(stage.Shape) public void OnDownstreamFinish() => Cancel(_stage.In); } - public EventPropagateStage() - { - Shape = new FlowShape(In, Out); - } + public EventPropagateStage() => Shape = new FlowShape(In, Out); public Inlet In { get; } = new Inlet("Propagate.in"); @@ -627,15 +532,15 @@ public UpstreamPortProbe(TestSetup setup) : base(setup, "upstreamPort") { } - public bool IsAvailable() => IsAvailable(Out); + public bool IsAvailable() => IsAvailable(Outlet); - public bool IsClosed() => IsClosed(Out); + public bool IsClosed() => IsClosed(Outlet); - public void Push(T element) => Push(Out, element); + public void Push(T element) => Push(Outlet, element); - public void Complete() => Complete(Out); + public void Complete() => Complete(Outlet); - public void Fail(Exception ex) => Fail(Out, ex); + public void Fail(Exception ex) => Fail(Outlet, ex); } public class DownstreamPortProbe : DownstreamProbe @@ -643,33 +548,33 @@ public class DownstreamPortProbe : DownstreamProbe public DownstreamPortProbe(TestSetup setup) : base(setup, "downstreamPort") { var probe = this; - SetHandler(In, () => - { - // Modified onPush that does not Grab() automatically the element. This access some internals. - var internalEvent = PortToConn[In.Id].Slot; + SetHandler(Inlet, () => + { + // Modified onPush that does not Grab() automatically the element. This access some internals. + var internalEvent = PortToConn[In.Id].Slot; - if (internalEvent is GraphInterpreter.Failed) - ((PortTestSetup) setup).LastEvent.Add(new OnNext(probe, - ((GraphInterpreter.Failed) internalEvent).PreviousElement)); - else - ((PortTestSetup) setup).LastEvent.Add(new OnNext(probe, internalEvent)); - }, - () => ((PortTestSetup) setup).LastEvent.Add(new OnComplete(probe)), - ex => ((PortTestSetup) setup).LastEvent.Add(new OnError(probe, ex)) - ); + if (internalEvent is GraphInterpreter.Failed failed) + ((PortTestSetup)setup).LastEvent.Add(new OnNext(probe, + failed.PreviousElement)); + else + ((PortTestSetup)setup).LastEvent.Add(new OnNext(probe, internalEvent)); + }, + () => ((PortTestSetup)setup).LastEvent.Add(new OnComplete(probe)), + ex => ((PortTestSetup)setup).LastEvent.Add(new OnError(probe, ex)) + ); } - public bool IsAvailable() => IsAvailable(In); + public bool IsAvailable() => IsAvailable(Inlet); - public bool HasBeenPulled() => HasBeenPulled(In); + public bool HasBeenPulled() => HasBeenPulled(Inlet); - public bool IsClosed() => IsClosed(In); + public bool IsClosed() => IsClosed(Inlet); - public void Pull() => Pull(In); + public void Pull() => Pull(Inlet); - public void Cancel() => Cancel(In); + public void Cancel() => Cancel(Inlet); - public T Grab() => Grab(In); + public T Grab() => Grab(Inlet); } } @@ -710,20 +615,11 @@ public FailingStageSetup(ActorSystem system, bool initFailOnNextEvent = false) : .Init(); } - public void FailOnNextEvent() - { - _failOnNextEvent = true; - } + public void FailOnNextEvent() => _failOnNextEvent = true; - public void FailOnPostStop() - { - _failOnPostStop = true; - } + public void FailOnPostStop() => _failOnPostStop = true; - public Exception TestException() - { - return new TestException("test"); - } + public Exception TestException() => new TestException("test"); public class FailingGraphStageLogic : GraphStageLogic { @@ -754,10 +650,7 @@ private void MayFail(Action task) } } - public override void PreStart() - { - MayFail(() => _setup.LastEvent.Add(new PreStart(this))); - } + public override void PreStart() => MayFail(() => _setup.LastEvent.Add(new PreStart(this))); public override void PostStop() { @@ -766,32 +659,20 @@ public override void PostStop() else throw _setup.TestException(); } - public override string ToString() - { - return "stage"; - } + public override string ToString() => "stage"; } public class SandwitchStage : GraphStage> { private readonly FailingStageSetup _setup; - public SandwitchStage(FailingStageSetup setup) - { - _setup = setup; - } + public SandwitchStage(FailingStageSetup setup) => _setup = setup; public override FlowShape Shape => _setup._stageShape; - protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) - { - return _setup.Stage.Value; - } + protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) => _setup.Stage.Value; - public override string ToString() - { - return "stage"; - } + public override string ToString() => "stage"; } public class UpstreamPortProbe : UpstreamProbe @@ -800,20 +681,11 @@ public UpstreamPortProbe(TestSetup setup) : base(setup, "upstreamPort") { } - public void Push(T element) - { - Push(Out, element); - } + public void Push(T element) => Push(Outlet, element); - public void Complete() - { - Complete(Out); - } + public void Complete() => Complete(Outlet); - public void Fail(Exception ex) - { - Fail(Out, ex); - } + public void Fail(Exception ex) => Fail(Outlet, ex); } public class DownstreamPortProbe : DownstreamProbe @@ -822,15 +694,9 @@ public DownstreamPortProbe(TestSetup setup) : base(setup, "downstreamPort") { } - public void Pull() - { - Pull(In); - } + public void Pull() => Pull(Inlet); - public void Cancel() - { - Cancel(In); - } + public void Cancel() => Cancel(Inlet); } } @@ -843,10 +709,7 @@ public interface ITestEvent public class OnComplete : ITestEvent { - protected bool Equals(OnComplete other) - { - return true; - } + protected bool Equals(OnComplete other) => true; public override bool Equals(object obj) { @@ -856,18 +719,12 @@ public override bool Equals(object obj) return Equals((OnComplete) obj); } - public override int GetHashCode() - { - return 0; - } + public override int GetHashCode() => 0; } public class Cancel : ITestEvent { - protected bool Equals(Cancel other) - { - return true; - } + protected bool Equals(Cancel other) => true; public override bool Equals(object obj) { @@ -877,25 +734,16 @@ public override bool Equals(object obj) return Equals((Cancel) obj); } - public override int GetHashCode() - { - return 0; - } + public override int GetHashCode() => 0; } public class OnError : ITestEvent { public Exception Cause { get; } - public OnError(Exception cause) - { - Cause = cause; - } + public OnError(Exception cause) => Cause = cause; - protected bool Equals(OnError other) - { - return Equals(Cause, other.Cause); - } + protected bool Equals(OnError other) => Equals(Cause, other.Cause); public override bool Equals(object obj) { @@ -905,25 +753,19 @@ public override bool Equals(object obj) return Equals((OnError) obj); } - public override int GetHashCode() - { - return Cause?.GetHashCode() ?? 0; - } + public override int GetHashCode() => Cause?.GetHashCode() ?? 0; } public class OnNext : ITestEvent { public object Element { get; } - public OnNext(object element) - { - Element = element; - } + public OnNext(object element) => Element = element; protected bool Equals(OnNext other) { - return Element is IEnumerable - ? ((IEnumerable) Element).Cast().SequenceEqual(((IEnumerable) other.Element).Cast()) + return Element is IEnumerable enumerable + ? enumerable.Cast().SequenceEqual(((IEnumerable) other.Element).Cast()) : Equals(Element, other.Element); } @@ -935,18 +777,12 @@ public override bool Equals(object obj) return Equals((OnNext) obj); } - public override int GetHashCode() - { - return Element?.GetHashCode() ?? 0; - } + public override int GetHashCode() => Element?.GetHashCode() ?? 0; } public class RequestOne : ITestEvent { - protected bool Equals(RequestOne other) - { - return true; - } + protected bool Equals(RequestOne other) => true; public override bool Equals(object obj) { @@ -956,18 +792,12 @@ public override bool Equals(object obj) return Equals((RequestOne) obj); } - public override int GetHashCode() - { - return 0; - } + public override int GetHashCode() => 0; } public class RequestAnother : ITestEvent { - protected bool Equals(RequestAnother other) - { - return true; - } + protected bool Equals(RequestAnother other) => true; public override bool Equals(object obj) { @@ -977,10 +807,7 @@ public override bool Equals(object obj) return Equals((RequestAnother) obj); } - public override int GetHashCode() - { - return 0; - } + public override int GetHashCode() => 0; } #endregion @@ -1002,13 +829,14 @@ public ISet LastEvents() public class UpstreamOneBoundedProbe : GraphInterpreter.UpstreamBoundaryStageLogic { private readonly OneBoundedSetup _setup; + private readonly Outlet _outlet; public UpstreamOneBoundedProbe(OneBoundedSetup setup) { _setup = setup; - Out = new Outlet("out") {Id = 0}; + _outlet = new Outlet("out") {Id = 0}; - SetHandler(Out, () => + SetHandler(_outlet, () => { if (setup.LastEvent.OfType().Any()) setup.LastEvent.Add(new RequestAnother()); @@ -1017,30 +845,30 @@ public UpstreamOneBoundedProbe(OneBoundedSetup setup) }, () => setup.LastEvent.Add(new Cancel())); } - public override Outlet Out { get; } + public override Outlet Out => _outlet; public void OnNext(T element) { - Push(Out, element); + Push(_outlet, element); _setup.Run(); } public void OnComplete() { - Complete(Out); + Complete(_outlet); _setup.Run(); } public void OnNextAndComplete(T element) { - Push(Out, element); - Complete(Out); + Push(_outlet, element); + Complete(_outlet); _setup.Run(); } public void OnError(Exception ex) { - Fail(Out, ex); + Fail(_outlet, ex); _setup.Run(); } } @@ -1048,31 +876,29 @@ public void OnError(Exception ex) public class DownstreamOneBoundedPortProbe : GraphInterpreter.DownstreamBoundaryStageLogic { private readonly OneBoundedSetup _setup; + private readonly Inlet _inlet; public DownstreamOneBoundedPortProbe(OneBoundedSetup setup) { _setup = setup; - In = new Inlet("in") {Id = 0}; + _inlet = new Inlet("in") {Id = 0}; - SetHandler(In, () => - { - setup.LastEvent.Add(new OnNext(Grab(In))); - }, + SetHandler(_inlet, () => setup.LastEvent.Add(new OnNext(Grab(_inlet))), () => setup.LastEvent.Add(new OnComplete()), ex => setup.LastEvent.Add(new OnError(ex))); } - public override Inlet In { get; } + public override Inlet In => _inlet; public void RequestOne() { - Pull(In); + Pull(_inlet); _setup.Run(); } public void Cancel() { - Cancel(In); + Cancel(_inlet); _setup.Run(); } } @@ -1094,10 +920,7 @@ public OneBoundedSetup(ActorSystem system, params IGraphStageWithMaterializedVal public new UpstreamOneBoundedProbe Upstream { get; } public new DownstreamOneBoundedPortProbe Downstream { get; } - protected sealed override void Run() - { - Interpreter.Execute(int.MaxValue); - } + protected sealed override void Run() => Interpreter.Execute(int.MaxValue); private void Initialize() { diff --git a/src/core/Akka.Streams/Dsl/Graph.cs b/src/core/Akka.Streams/Dsl/Graph.cs index 547724ac662..f1a16af40e0 100644 --- a/src/core/Akka.Streams/Dsl/Graph.cs +++ b/src/core/Akka.Streams/Dsl/Graph.cs @@ -261,10 +261,7 @@ public MergePreferredShape(int secondaryPorts, IInit init) : base(secondaryPorts /// /// TBD /// TBD - protected override FanInShape Construct(IInit init) - { - return new MergePreferredShape(_secondaryPorts, init); - } + protected override FanInShape Construct(IInit init) => new MergePreferredShape(_secondaryPorts, init); /// /// TBD @@ -447,25 +444,24 @@ public sealed class MergePrioritized : GraphStage> internal sealed class MergePrioritizedLogic : OutGraphStageLogic { private readonly MergePrioritized _stage; - private List>> allBuffers; - private int runningUpstreams; - private Random randomGen = new Random(); + private readonly List>> _allBuffers; + private int _runningUpstreams; + private readonly Random _randomGen = new Random(); public MergePrioritizedLogic(MergePrioritized stage) : base(stage.Shape) { _stage = stage; - allBuffers = new List>>(stage.Priorities.Count); + _allBuffers = new List>>(stage.Priorities.Count); + foreach (int priority in stage.Priorities) - { - allBuffers.Add(FixedSizeBuffer.Create>(priority)); - } + _allBuffers.Add(FixedSizeBuffer.Create>(priority)); - runningUpstreams = stage.InputPorts; + _runningUpstreams = stage.InputPorts; for (int i = 0; i < stage.In.Count; i++) { var inlet = stage.In[i]; - var buffer = allBuffers[i]; + var buffer = _allBuffers[i]; SetHandler(inlet, onPush: () => { @@ -475,20 +471,18 @@ public MergePrioritizedLogic(MergePrioritized stage) : base(stage.Shape) TryPull(inlet); } else - { buffer.Enqueue(inlet); - } }, onUpstreamFinish: () => { if (_stage.EagerComplete) { _stage.In.ForEach(Cancel); - runningUpstreams = 0; + _runningUpstreams = 0; if (!HasPending) CompleteStage(); } else { - runningUpstreams -= 1; + _runningUpstreams -= 1; if (UpstreamsClosed && !HasPending) CompleteStage(); } }); @@ -505,9 +499,9 @@ public override void OnPull() DequeueAndDispatch(); } - public bool HasPending => allBuffers.Any(c => c.NonEmpty); + public bool HasPending => _allBuffers.Any(c => c.NonEmpty); - public bool UpstreamsClosed => runningUpstreams == 0; + public bool UpstreamsClosed => _runningUpstreams == 0; private void DequeueAndDispatch() { @@ -526,24 +520,22 @@ private Inlet SelectNextElement() while (ix < _stage.In.Count) { - if (allBuffers[ix].NonEmpty) - { + if (_allBuffers[ix].NonEmpty) tp += _stage.Priorities[ix]; - } ix += 1; } - int r = randomGen.Next(tp); + int r = _randomGen.Next(tp); Inlet next = null; ix = 0; while (ix < _stage.In.Count && next == null) { - if (allBuffers[ix].NonEmpty) + if (_allBuffers[ix].NonEmpty) { r -= _stage.Priorities[ix]; if (r < 0) - next = allBuffers[ix].Dequeue(); + next = _allBuffers[ix].Dequeue(); } ix += 1; } @@ -575,9 +567,7 @@ public MergePrioritized(IEnumerable priorities, bool eagerComplete = false) var input = new List>(); for (int i = 1; i <= InputPorts; i++) - { input.Add(new Inlet("MergePrioritized.in" + i)); - } In = input; Out = new Outlet("MergePrioritized.out"); @@ -800,30 +790,30 @@ private sealed class Logic : GraphStageLogic private readonly MergeSorted _stage; private T _other; - private readonly Action _dispatchRight; - private readonly Action _dispatchLeft; - private readonly Action _passRight; - private readonly Action _passLeft; private readonly Action _readRight; private readonly Action _readLeft; public Logic(MergeSorted stage) : base(stage.Shape) { _stage = stage; - _dispatchRight = right => Dispatch(_other, right); - _dispatchLeft = left => Dispatch(left, _other); - _passRight = () => Emit(_stage.Out, _other, () => + + void DispatchRight(T right) => Dispatch(_other, right); + void DispatchLeft(T left) => Dispatch(left, _other); + + void PassRight() => Emit(_stage.Out, _other, () => { NullOut(); PassAlong(_stage.Right, _stage.Out, doPull: true); }); - _passLeft = () => Emit(_stage.Out, _other, () => + + void PassLeft() => Emit(_stage.Out, _other, () => { NullOut(); PassAlong(_stage.Left, _stage.Out, doPull: true); }); - _readRight = () => Read(_stage.Right, _dispatchRight, _passLeft); - _readLeft = () => Read(_stage.Left, _dispatchLeft, _passRight); + + _readRight = () => Read(_stage.Right, DispatchRight, PassLeft); + _readLeft = () => Read(_stage.Left, DispatchLeft, PassRight); SetHandler(_stage.Left, IgnoreTerminateInput); SetHandler(_stage.Right, IgnoreTerminateInput); @@ -842,10 +832,7 @@ public override void PreStart() () => PassAlong(_stage.Right, _stage.Out)); } - private void NullOut() - { - _other = default(T); - } + private void NullOut() => _other = default(T); private void Dispatch(T left, T right) { @@ -1076,18 +1063,17 @@ private sealed class Logic : InGraphStageLogic private readonly Partition _stage; private object _outPendingElement; private int _outPendingIndex; - private int _downstreamRunning; public Logic(Partition stage) : base(stage.Shape) { _stage = stage; - _downstreamRunning = stage._outputPorts; + var downstreamRunning = stage._outputPorts; SetHandler(stage.In, this); for (var i = 0; i < stage._outputPorts; i++) { - var output = stage.Shape.Outlets[i]; + var output = stage.Out(i); var index = i; SetHandler(output, onPull: () => @@ -1112,8 +1098,8 @@ public Logic(Partition stage) : base(stage.Shape) Pull(stage.In); }, onDownstreamFinish: () => { - _downstreamRunning--; - if(_downstreamRunning == 0) + downstreamRunning--; + if(downstreamRunning == 0) CompleteStage(); else if (_outPendingElement != null) { @@ -1141,7 +1127,7 @@ public override void OnPush() if (IsAvailable(_stage.Out(index))) { Push(_stage.Out(index), element); - if (_stage.Shape.Outlets.Any(IsAvailable)) + if (_stage._outlets.Any(IsAvailable)) Pull(_stage.In); } else @@ -1150,7 +1136,7 @@ public override void OnPush() _outPendingIndex = index; } } - else if (_stage.Shape.Outlets.Any(IsAvailable)) + else if (_stage._outlets.Any(IsAvailable)) Pull(_stage.In); } @@ -1165,6 +1151,7 @@ public override void OnUpstreamFinish() private readonly int _outputPorts; private readonly Func _partitioner; + private readonly Outlet[] _outlets; /// /// Initializes a new instance of the class. @@ -1175,8 +1162,8 @@ public Partition(int outputPorts, Func partitioner) { _outputPorts = outputPorts; _partitioner = partitioner; - var outlets = Enumerable.Range(0, outputPorts).Select(i => new Outlet("Partition.out" + i)).ToArray(); - Shape = new UniformFanOutShape(In, outlets); + _outlets = Enumerable.Range(0, outputPorts).Select(i => new Outlet("Partition.out" + i)).ToArray(); + Shape = new UniformFanOutShape(In, _outlets); } /// @@ -1189,7 +1176,7 @@ public Partition(int outputPorts, Func partitioner) /// /// TBD /// TBD - public Outlet Out(int id) => Shape.Out(id); + public Outlet Out(int id) => Shape.Out(id); /// /// TBD @@ -1249,15 +1236,14 @@ private sealed class Logic : InGraphStageLogic { private readonly Balance _stage; private readonly FixedSizeBuffer> _pendingQueue; - private int _needDownstreamPulls; - private int _downstreamsRunning; + public Logic(Shape shape, Balance stage) : base(shape) { _stage = stage; _pendingQueue = FixedSizeBuffer.Create>(_stage._outputPorts); - _downstreamsRunning = _stage._outputPorts; + var downstreamsRunning = _stage._outputPorts; - _needDownstreamPulls = _stage._waitForAllDownstreams ? _stage._outputPorts : 0; + var needDownstreamPulls = _stage._waitForAllDownstreams ? _stage._outputPorts : 0; SetHandler(_stage.In, this); @@ -1269,10 +1255,10 @@ public Logic(Shape shape, Balance stage) : base(shape) if (!hasPulled) { hasPulled = true; - if (_needDownstreamPulls > 0) _needDownstreamPulls--; + if (needDownstreamPulls > 0) needDownstreamPulls--; } - if (_needDownstreamPulls == 0) + if (needDownstreamPulls == 0) { if (IsAvailable(_stage.In)) { @@ -1288,12 +1274,12 @@ public Logic(Shape shape, Balance stage) : base(shape) }, onDownstreamFinish: () => { - _downstreamsRunning--; - if (_downstreamsRunning == 0) CompleteStage(); - else if (!hasPulled && _needDownstreamPulls > 0) + downstreamsRunning--; + if (downstreamsRunning == 0) CompleteStage(); + else if (!hasPulled && needDownstreamPulls > 0) { - _needDownstreamPulls--; - if (_needDownstreamPulls == 0 && !HasBeenPulled(_stage.In)) Pull(_stage.In); + needDownstreamPulls--; + if (needDownstreamPulls == 0 && !HasBeenPulled(_stage.In)) Pull(_stage.In); } }); } @@ -1719,7 +1705,7 @@ public class Concat : GraphStage> where private sealed class Logic : OutGraphStageLogic { private readonly Concat _stage; - private int _activeStream = 0; + private int _activeStream; public Logic(Concat stage) : base(stage.Shape) { @@ -1899,10 +1885,7 @@ public override void OnUpstreamFinish() /// /// Initializes a new instance of the class. /// - public OrElse() - { - Shape = new UniformFanInShape(Out, Primary, Secondary); - } + public OrElse() => Shape = new UniformFanInShape(Out, Primary, Secondary); /// /// TBD diff --git a/src/core/Akka.Streams/Dsl/Restart.cs b/src/core/Akka.Streams/Dsl/Restart.cs index b84526daa9a..14000d40bd8 100644 --- a/src/core/Akka.Streams/Dsl/Restart.cs +++ b/src/core/Akka.Streams/Dsl/Restart.cs @@ -6,10 +6,6 @@ //----------------------------------------------------------------------- using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; using Akka.Pattern; using Akka.Streams.Stage; @@ -62,19 +58,17 @@ public RestartWithBackoffSource( } public Outlet Out { get; } = new Outlet("RestartWithBackoffSource.out"); + public override SourceShape Shape { get; } - protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) - { - return new Logic(this, "Source"); - } + protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) => new Logic(this, "Source"); - private class Logic : RestartWithBackoffLogic> + private class Logic : RestartWithBackoffLogic, T, T> { private readonly RestartWithBackoffSource _stage; public Logic(RestartWithBackoffSource stage, string name) - : base(name, stage.Shape, stage.MinBackoff, stage.MaxBackoff, stage.RandomFactor) + : base(name, stage.Shape, null, stage.Out, stage.MinBackoff, stage.MaxBackoff, stage.RandomFactor) { _stage = stage; Backoff(); @@ -85,18 +79,13 @@ protected override void StartGraph() var sinkIn = CreateSubInlet(_stage.Out); _stage.SourceFactory().RunWith(sinkIn.Sink, SubFusingMaterializer); if (IsAvailable(_stage.Out)) - { sinkIn.Pull(); - } } - protected override void Backoff() + protected override void Backoff() => SetHandler(_stage.Out, () => { - SetHandler(_stage.Out, () => - { - // do nothing - }); - } + // do nothing + }); } } @@ -150,19 +139,17 @@ public RestartWithBackoffSink( } public Inlet In { get; } = new Inlet("RestartWithBackoffSink.in"); + public override SinkShape Shape { get; } - protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) - { - return new Logic(this, "Sink"); - } + protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) => new Logic(this, "Sink"); - private class Logic : RestartWithBackoffLogic> + private class Logic : RestartWithBackoffLogic, T, T> { private readonly RestartWithBackoffSink _stage; public Logic(RestartWithBackoffSink stage, string name) - : base(name, stage.Shape, stage.MinBackoff, stage.MaxBackoff, stage.RandomFactor) + : base(name, stage.Shape, stage.In, null, stage.MinBackoff, stage.MaxBackoff, stage.RandomFactor) { _stage = stage; Backoff(); @@ -174,13 +161,10 @@ protected override void StartGraph() Source.FromGraph(sourceOut.Source).RunWith(_stage.SinkFactory(), SubFusingMaterializer); } - protected override void Backoff() + protected override void Backoff() => SetHandler(_stage.In, () => { - SetHandler(_stage.In, () => - { - // do nothing - }); - } + // do nothing + }); } } @@ -233,22 +217,20 @@ public RestartWithBackoffFlow( } public Inlet In { get; } = new Inlet("RestartWithBackoffFlow.in"); + public Outlet Out { get; } = new Outlet("RestartWithBackoffFlow.out"); public override FlowShape Shape { get; } - protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) - { - return new Logic(this, "Flow"); - } + protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) => new Logic(this, "Flow"); - private class Logic : RestartWithBackoffLogic> + private class Logic : RestartWithBackoffLogic, TIn, TOut> { private readonly RestartWithBackoffFlow _stage; - private Tuple, SubSinkInlet> activeOutIn = null; + private Tuple, SubSinkInlet> _activeOutIn; public Logic(RestartWithBackoffFlow stage, string name) - : base(name, stage.Shape, stage.MinBackoff, stage.MaxBackoff, stage.RandomFactor) + : base(name, stage.Shape, stage.In, stage.Out, stage.MinBackoff, stage.MaxBackoff, stage.RandomFactor) { _stage = stage; Backoff(); @@ -260,10 +242,9 @@ protected override void StartGraph() var sinkIn = CreateSubInlet(_stage.Out); Source.FromGraph(sourceOut.Source).Via(_stage.FlowFactory()).RunWith(sinkIn.Sink, SubFusingMaterializer); if (IsAvailable(_stage.Out)) - { sinkIn.Pull(); - } - activeOutIn = Tuple.Create(sourceOut, sinkIn); + + _activeOutIn = Tuple.Create(sourceOut, sinkIn); } protected override void Backoff() @@ -279,19 +260,16 @@ protected override void Backoff() // We need to ensure that the other end of the sub flow is also completed, so that we don't // receive any callbacks from it. - if (activeOutIn != null) + if (_activeOutIn != null) { - var sourceOut = activeOutIn.Item1; - var sinkIn = activeOutIn.Item2; + var sourceOut = _activeOutIn.Item1; + var sinkIn = _activeOutIn.Item2; if (!sourceOut.IsClosed) - { sourceOut.Complete(); - } + if (!sinkIn.IsClosed) - { sinkIn.Cancel(); - } - activeOutIn = null; + _activeOutIn = null; } } } @@ -300,60 +278,56 @@ protected override void Backoff() /// /// Shared logic for all restart with backoff logics. /// - internal abstract class RestartWithBackoffLogic : TimerGraphStageLogic where S : Shape + internal abstract class RestartWithBackoffLogic : TimerGraphStageLogic where TShape : Shape { private readonly string _name; - private readonly S _shape; private readonly TimeSpan _minBackoff; private readonly TimeSpan _maxBackoff; private readonly double _randomFactor; - protected Inlet In { get; } - protected Outlet Out { get; } + protected Inlet In { get; } + protected Outlet Out { get; } - private int _restartCount = 0; + private int _restartCount; private Deadline _resetDeadline; // This is effectively only used for flows, if either the main inlet or outlet of this stage finishes, then we // don't want to restart the sub inlet when it finishes, we just finish normally. - private bool _finishing = false; + private bool _finishing; protected RestartWithBackoffLogic( string name, - S shape, + TShape shape, + Inlet inlet, + Outlet outlet, TimeSpan minBackoff, TimeSpan maxBackoff, double randomFactor) : base(shape) { _name = name; - _shape = shape; _minBackoff = minBackoff; _maxBackoff = maxBackoff; _randomFactor = randomFactor; _resetDeadline = minBackoff.FromNow(); - In = shape.Inlets.FirstOrDefault(); - Out = shape.Outlets.FirstOrDefault(); + In = inlet; + Out = outlet; } protected abstract void StartGraph(); + protected abstract void Backoff(); - protected SubSinkInlet CreateSubInlet(Outlet outlet) + protected SubSinkInlet CreateSubInlet(Outlet outlet) { - var sinkIn = new SubSinkInlet(this, $"RestartWithBackoff{_name}.subIn"); + var sinkIn = new SubSinkInlet(this, $"RestartWithBackoff{_name}.subIn"); sinkIn.SetHandler(new LambdaInHandler( - onPush: () => - { - Push(Out, sinkIn.Grab()); - }, + onPush: () => Push(Out, sinkIn.Grab()), onUpstreamFinish: () => { if (_finishing) - { Complete(Out); - } else { Log.Debug("Graph out finished"); @@ -363,9 +337,7 @@ protected SubSinkInlet CreateSubInlet(Outlet outlet) onUpstreamFailure: ex => { if (_finishing) - { - Fail(_shape.Outlets.First(), ex); - } + Fail(Out, ex); else { Log.Error(ex, "Restarting graph due to failure"); @@ -384,16 +356,14 @@ protected SubSinkInlet CreateSubInlet(Outlet outlet) return sinkIn; } - protected SubSourceOutlet CreateSubOutlet(Inlet inlet) + protected SubSourceOutlet CreateSubOutlet(Inlet inlet) { - var sourceOut = new SubSourceOutlet(this, $"RestartWithBackoff{_name}.subOut"); + var sourceOut = new SubSourceOutlet(this, $"RestartWithBackoff{_name}.subOut"); sourceOut.SetHandler(new LambdaOutHandler( onPull: () => { if (IsAvailable(In)) - { - sourceOut.Push(Grab(In)); - } + sourceOut.Push(Grab(In)); else { if (!HasBeenPulled(In)) @@ -403,9 +373,7 @@ protected SubSourceOutlet CreateSubOutlet(Inlet inlet) onDownstreamFinish: () => { if (_finishing) - { Cancel(In); - } else { Log.Debug("Graph in finished"); @@ -418,7 +386,7 @@ protected SubSourceOutlet CreateSubOutlet(Inlet inlet) onPush: () => { if (sourceOut.IsAvailable) - sourceOut.Push(Grab(In)); + sourceOut.Push(Grab(In)); }, onUpstreamFinish: () => { @@ -463,10 +431,7 @@ protected internal override void OnTimer(object timerKey) internal sealed class Deadline { - public Deadline(TimeSpan time) - { - Time = time; - } + public Deadline(TimeSpan time) => Time = time; public TimeSpan Time { get; } @@ -474,17 +439,11 @@ public Deadline(TimeSpan time) public static Deadline Now => new Deadline(new TimeSpan(DateTime.UtcNow.Ticks)); - public static Deadline operator +(Deadline deadline, TimeSpan duration) - { - return new Deadline(deadline.Time.Add(duration)); - } + public static Deadline operator +(Deadline deadline, TimeSpan duration) => new Deadline(deadline.Time.Add(duration)); } internal static class DeadlineExtensions { - public static Deadline FromNow(this TimeSpan timespan) - { - return Deadline.Now + timespan; - } + public static Deadline FromNow(this TimeSpan timespan) => Deadline.Now + timespan; } } diff --git a/src/core/Akka.Streams/Implementation/Fusing/ActorGraphInterpreter.cs b/src/core/Akka.Streams/Implementation/Fusing/ActorGraphInterpreter.cs index 0886173e5cb..7f8a16fa8cb 100644 --- a/src/core/Akka.Streams/Implementation/Fusing/ActorGraphInterpreter.cs +++ b/src/core/Akka.Streams/Implementation/Fusing/ActorGraphInterpreter.cs @@ -66,31 +66,21 @@ public GraphModule(GraphAssembly assembly, Shape shape, Attributes attributes, I /// /// TBD /// TBD - public override IModule WithAttributes(Attributes attributes) - { - return new GraphModule(Assembly, Shape, attributes, MaterializedValueIds); - } + public override IModule WithAttributes(Attributes attributes) => new GraphModule(Assembly, Shape, attributes, MaterializedValueIds); /// /// TBD /// /// TBD - public override IModule CarbonCopy() - { - return new CopiedModule(Shape.DeepCopy(), Attributes.None, this); - } + public override IModule CarbonCopy() => new CopiedModule(Shape.DeepCopy(), Attributes.None, this); /// /// TBD /// /// TBD /// TBD - public override IModule ReplaceShape(Shape newShape) - { - if (!newShape.Equals(Shape)) - return CompositeModule.Create(this, newShape); - return this; - } + public override IModule ReplaceShape(Shape newShape) => + !newShape.Equals(Shape) ? (IModule)CompositeModule.Create(this, newShape) : this; /// /// TBD @@ -745,10 +735,7 @@ public struct Resume : IBoundaryEvent /// TBD /// /// TBD - public Resume(GraphInterpreterShell shell) - { - Shell = shell; - } + public Resume(GraphInterpreterShell shell) => Shell = shell; /// /// TBD @@ -765,10 +752,7 @@ public struct Abort : IBoundaryEvent /// TBD /// /// TBD - public Abort(GraphInterpreterShell shell) - { - Shell = shell; - } + public Abort(GraphInterpreterShell shell) => Shell = shell; /// /// TBD @@ -920,26 +904,23 @@ private sealed class OutHandler : Stage.OutHandler { private readonly BatchingActorInputBoundary _that; - public OutHandler(BatchingActorInputBoundary that) - { - _that = that; - } + public OutHandler(BatchingActorInputBoundary that) => _that = that; public override void OnPull() { var elementsCount = _that._inputBufferElements; var upstreamCompleted = _that._upstreamCompleted; - if (elementsCount > 1) _that.Push(_that.Out, _that.Dequeue()); + if (elementsCount > 1) _that.Push(_that._outlet, _that.Dequeue()); else if (elementsCount == 1) { if (upstreamCompleted) { - _that.Push(_that.Out, _that.Dequeue()); - _that.Complete(_that.Out); + _that.Push(_that._outlet, _that.Dequeue()); + _that.Complete(_that._outlet); } - else _that.Push(_that.Out, _that.Dequeue()); + else _that.Push(_that._outlet, _that.Dequeue()); } - else if (upstreamCompleted) _that.Complete(_that.Out); + else if (upstreamCompleted) _that.Complete(_that._outlet); } public override void OnDownstreamFinish() => _that.Cancel(); @@ -961,7 +942,7 @@ public override void OnPull() private bool _downstreamCanceled; private readonly int _requestBatchSize; private int _batchRemaining; - private readonly Outlet _outlet; + private readonly Outlet _outlet; /// /// TBD @@ -1001,7 +982,7 @@ public void OnInternalError(Exception reason) if (!(_upstreamCompleted || _downstreamCanceled) && !ReferenceEquals(_upstream, null)) _upstream.Cancel(); - if (!IsClosed(Out)) + if (!IsClosed(_outlet)) OnError(reason); } @@ -1015,7 +996,7 @@ public void OnError(Exception reason) { _upstreamCompleted = true; Clear(); - Fail(Out, reason); + Fail(_outlet, reason); } } @@ -1028,7 +1009,7 @@ public void OnComplete() { _upstreamCompleted = true; if (_inputBufferElements == 0) - Complete(Out); + Complete(_outlet); } } @@ -1067,8 +1048,8 @@ public void OnNext(object element) throw new IllegalStateException("Input buffer overrun"); _inputBuffer[(_nextInputElementCursor + _inputBufferElements) & _indexMask] = element; _inputBufferElements++; - if (IsAvailable(Out)) - Push(Out, Dequeue()); + if (IsAvailable(_outlet)) + Push(_outlet, Dequeue()); } } @@ -1160,18 +1141,15 @@ private sealed class InHandler : Stage.InHandler { private readonly ActorOutputBoundary _that; - public InHandler(ActorOutputBoundary that) - { - _that = that; - } + public InHandler(ActorOutputBoundary that) => _that = that; public override void OnPush() { - _that.OnNext(_that.Grab(_that.In)); + _that.OnNext(_that.Grab(_that._inlet)); if (_that._downstreamCompleted) - _that.Cancel(_that.In); + _that.Cancel(_that._inlet); else if (_that._downstreamDemand > 0) - _that.Pull(_that.In); + _that.Pull(_that._inlet); } public override void OnUpstreamFinish() => _that.Complete(); @@ -1235,8 +1213,8 @@ public void RequestMore(long elements) _downstreamDemand += elements; if (_downstreamDemand < 0) _downstreamDemand = long.MaxValue; // Long overflow, Reactive Streams Spec 3:17: effectively unbounded - if (!HasBeenPulled(In) && !IsClosed(In)) - Pull(In); + if (!HasBeenPulled(_inlet) && !IsClosed(_inlet)) + Pull(_inlet); } } @@ -1284,7 +1262,7 @@ public void Cancel() _downstreamCompleted = true; _subscriber = null; _exposedPublisher.Shutdown(new NormalShutdownException("UpstreamBoundary")); - Cancel(In); + Cancel(_inlet); } /// @@ -1446,8 +1424,7 @@ private void ShortCircuitBatch() while (_shortCircuitBuffer.Count != 0 && _currentLimit > 0 && _activeInterpreters.Count != 0) { var element = _shortCircuitBuffer.Dequeue(); - var boundary = element as IBoundaryEvent; - if (boundary != null) + if (element is IBoundaryEvent boundary) ProcessEvent(boundary); else if (element is ShellRegistered) FinishShellRegistration(); @@ -1500,7 +1477,7 @@ protected override bool Receive(object message) if (_shortCircuitBuffer != null) ShortCircuitBatch(); return true; - case StreamSupervisor.PrintDebugDump print: + case StreamSupervisor.PrintDebugDump _: var builder = new StringBuilder($"activeShells (actor: {Self}):\n"); foreach (var shell in _activeInterpreters) diff --git a/src/core/Akka.Streams/Implementation/Fusing/EnumeratorInterpreter.cs b/src/core/Akka.Streams/Implementation/Fusing/EnumeratorInterpreter.cs index 7fa89e8d66c..4eaf9b9f058 100644 --- a/src/core/Akka.Streams/Implementation/Fusing/EnumeratorInterpreter.cs +++ b/src/core/Akka.Streams/Implementation/Fusing/EnumeratorInterpreter.cs @@ -29,14 +29,18 @@ public sealed class EnumeratorUpstream : GraphInterpreter.UpstreamBoundaryS /// TBD /// public bool HasNext; + + private readonly Outlet _outlet; + /// /// TBD /// /// TBD public EnumeratorUpstream(IEnumerator input) { - Out = new Outlet("IteratorUpstream.out") { Id = 0 }; - SetHandler(Out, onPull: () => + _outlet = new Outlet("IteratorUpstream.out") { Id = 0 }; + + SetHandler(_outlet, onPull: () => { if (!HasNext) CompleteStage(); else @@ -45,10 +49,10 @@ public EnumeratorUpstream(IEnumerator input) HasNext = input.MoveNext(); if (!HasNext) { - Push(Out, element); - Complete(Out); + Push(_outlet, element); + Complete(_outlet); } - else Push(Out, element); + else Push(_outlet, element); } }, onDownstreamFinish: CompleteStage); @@ -57,7 +61,7 @@ public EnumeratorUpstream(IEnumerator input) /// /// TBD /// - public override Outlet Out { get; } + public override Outlet Out => _outlet; } /// @@ -83,15 +87,18 @@ public sealed class EnumeratorDownstream : GraphInterpreter.DownstreamBoun /// internal Exception LastFailure; + private readonly Inlet _inlet; + /// /// TBD /// public EnumeratorDownstream() { - In = new Inlet("IteratorDownstream.in") { Id = 0 }; - SetHandler(In, onPush: () => + _inlet = new Inlet("IteratorDownstream.in") { Id = 0 }; + + SetHandler(_inlet, onPush: () => { - NextElement = Grab(In); + NextElement = Grab(_inlet); NeedsPull = false; }, onUpstreamFinish: () => @@ -110,7 +117,7 @@ public EnumeratorDownstream() /// /// TBD /// - public override Inlet In { get; } + public override Inlet In => _inlet; /// /// TBD @@ -170,7 +177,7 @@ private void PullIfNeeded() { if (NeedsPull) { - Pull(In); + Pull(_inlet); Interpreter.Execute(int.MaxValue); } } @@ -242,7 +249,7 @@ private void Init() log: NoLogger.Instance, connections: connections, logics: logics, - onAsyncInput: (_1, _2, _3) => { throw new NotSupportedException("IteratorInterpreter does not support asynchronous events.");}, + onAsyncInput: (_1, _2, _3) => throw new NotSupportedException("IteratorInterpreter does not support asynchronous events."), fuzzingMode: false, context: null); interpreter.AttachUpstreamBoundary(0, _upstream); diff --git a/src/core/Akka.Streams/KillSwitch.cs b/src/core/Akka.Streams/KillSwitch.cs index 52a315e0b15..b00789082fb 100644 --- a/src/core/Akka.Streams/KillSwitch.cs +++ b/src/core/Akka.Streams/KillSwitch.cs @@ -13,7 +13,7 @@ namespace Akka.Streams { /// /// Creates shared or single kill switches which can be used to control completion of graphs from the outside. - /// - The factory returns a which provides a + /// - The factory returns a which provides a /// of that can be /// used in arbitrary number of graphs and materializations. The switch simultaneously /// controls completion in all of those graphs. @@ -50,7 +50,7 @@ public static class KillSwitches /// TBD /// TBD public static IGraph, UniqueKillSwitch> SingleBidi - () => UniqueBidiKillSwitchStage.Instance; + () => UniqueBidiKillSwitchStage.Instance; /// /// TBD @@ -115,10 +115,7 @@ public Logic(Task terminationSignal, UniqueKillSwitchStage stage) : base(term public static UniqueKillSwitchStage Instance { get; } = new UniqueKillSwitchStage(); - private UniqueKillSwitchStage() - { - Shape = new FlowShape(In, Out); - } + private UniqueKillSwitchStage() => Shape = new FlowShape(In, Out); protected override Attributes InitialAttributes { get; } = Attributes.CreateName("breaker"); @@ -139,16 +136,16 @@ public override ILogicAndMaterializedValue CreateLogicAndMater public override string ToString() => "UniqueKillSwitchFlow"; } - private sealed class UniqueBidiKillSwitchStage : - GraphStageWithMaterializedValue, UniqueKillSwitch> + private sealed class UniqueBidiKillSwitchStage : + GraphStageWithMaterializedValue, UniqueKillSwitch> { #region Logic private sealed class Logic : KillableGraphStageLogic { - private readonly UniqueBidiKillSwitchStage _killSwitch; + private readonly UniqueBidiKillSwitchStage _killSwitch; - public Logic(Task terminationSignal, UniqueBidiKillSwitchStage killSwitch) + public Logic(Task terminationSignal, UniqueBidiKillSwitchStage killSwitch) : base(terminationSignal, killSwitch.Shape) { _killSwitch = killSwitch; @@ -179,25 +176,21 @@ public Logic(Task terminationSignal, UniqueBidiKillSwitchStage Instance { get; } = - new UniqueBidiKillSwitchStage(); + public static UniqueBidiKillSwitchStage Instance { get; } = new UniqueBidiKillSwitchStage(); - private UniqueBidiKillSwitchStage() - { - Shape = new BidiShape(In1, Out1, In2, Out2); - } + private UniqueBidiKillSwitchStage() => Shape = new BidiShape(In1, Out1, In2, Out2); protected override Attributes InitialAttributes { get; } = Attributes.CreateName("breaker"); - private Inlet In1 { get; } = new Inlet("KillSwitchBidi.in1"); + private Inlet In1 { get; } = new Inlet("KillSwitchBidi.in1"); - private Outlet Out1 { get; } = new Outlet("KillSwitchBidi.out1"); + private Outlet Out1 { get; } = new Outlet("KillSwitchBidi.out1"); - private Inlet In2 { get; } = new Inlet("KillSwitchBidi.in2"); + private Inlet In2 { get; } = new Inlet("KillSwitchBidi.in2"); - private Outlet Out2 { get; } = new Outlet("KillSwitchBidi.out2"); + private Outlet Out2 { get; } = new Outlet("KillSwitchBidi.out2"); - public override BidiShape Shape { get; } + public override BidiShape Shape { get; } public override ILogicAndMaterializedValue CreateLogicAndMaterializedValue(Attributes inheritedAttributes) { @@ -255,10 +248,7 @@ public sealed class UniqueKillSwitch : IKillSwitch /// TBD /// /// TBD - internal UniqueKillSwitch(TaskCompletionSource promise) - { - _promise = promise; - } + internal UniqueKillSwitch(TaskCompletionSource promise) => _promise = promise; /// /// After calling the running instance of the of that materialized to the @@ -365,10 +355,7 @@ public override ILogicAndMaterializedValue CreateLogicAndMater /// TBD /// /// TBD - internal SharedKillSwitch(string name) - { - _name = name; - } + internal SharedKillSwitch(string name) => _name = name; /// /// After calling all materialized, running instances of all s provided by the diff --git a/src/core/Akka.Streams/Shape.cs b/src/core/Akka.Streams/Shape.cs index 8e1ae78740e..600e127b7a1 100644 --- a/src/core/Akka.Streams/Shape.cs +++ b/src/core/Akka.Streams/Shape.cs @@ -63,11 +63,7 @@ public abstract class Inlet : InPort /// /// This exception is thrown when the specified is undefined. /// - protected Inlet(string name) - { - if (name == null) throw new ArgumentException("Inlet name must be defined"); - Name = name; - } + protected Inlet(string name) => Name = name ?? throw new ArgumentException("Inlet name must be defined"); /// /// TBD @@ -129,10 +125,7 @@ public abstract class Outlet : OutPort /// TBD /// /// TBD - protected Outlet(string name) - { - Name = name; - } + protected Outlet(string name) => Name = name; /// /// TBD @@ -351,10 +344,7 @@ public sealed class SourceShape : Shape /// TBD public SourceShape(Outlet outlet) { - if (outlet == null) - throw new ArgumentNullException(nameof(outlet)); - - Outlet = outlet; + Outlet = outlet ?? throw new ArgumentNullException(nameof(outlet)); Outlets = ImmutableArray.Create(outlet); } @@ -407,7 +397,7 @@ public override bool Equals(object obj) if (ReferenceEquals(this, obj)) return true; - return obj is SourceShape && Equals((SourceShape) obj); + return obj is SourceShape shape && Equals(shape); } /// @@ -450,13 +440,8 @@ public sealed class FlowShape : Shape, IFlowShape /// public FlowShape(Inlet inlet, Outlet outlet) { - if (inlet == null) - throw new ArgumentNullException(nameof(inlet), "FlowShape expected non-null inlet"); - if (outlet == null) - throw new ArgumentNullException(nameof(outlet), "FlowShape expected non-null outlet"); - - Inlet = inlet; - Outlet = outlet; + Inlet = inlet ?? throw new ArgumentNullException(nameof(inlet), "FlowShape expected non-null inlet"); + Outlet = outlet ?? throw new ArgumentNullException(nameof(outlet), "FlowShape expected non-null outlet"); Inlets = ImmutableArray.Create(inlet); Outlets = ImmutableArray.Create(outlet); } @@ -533,8 +518,7 @@ public sealed class SinkShape : Shape /// public SinkShape(Inlet inlet) { - if (inlet == null) throw new ArgumentNullException(nameof(inlet), "SinkShape expected non-null inlet"); - Inlet = inlet; + Inlet = inlet ?? throw new ArgumentNullException(nameof(inlet), "SinkShape expected non-null inlet"); Inlets = ImmutableArray.Create(inlet); } @@ -582,7 +566,7 @@ public override bool Equals(object obj) if (ReferenceEquals(this, obj)) return true; - return obj is SinkShape && Equals((SinkShape) obj); + return obj is SinkShape shape && Equals(shape); } private bool Equals(SinkShape other) => Equals(Inlet, other.Inlet); @@ -630,19 +614,10 @@ public sealed class BidiShape : Shape /// public BidiShape(Inlet in1, Outlet out1, Inlet in2, Outlet out2) { - if (in1 == null) - throw new ArgumentNullException(nameof(in1)); - if (out1 == null) - throw new ArgumentNullException(nameof(out1)); - if (in2 == null) - throw new ArgumentNullException(nameof(in2)); - if (out2 == null) - throw new ArgumentNullException(nameof(out2)); - - Inlet1 = in1; - Inlet2 = in2; - Outlet1 = out1; - Outlet2 = out2; + Inlet1 = in1 ?? throw new ArgumentNullException(nameof(in1)); + Inlet2 = in2 ?? throw new ArgumentNullException(nameof(in2)); + Outlet1 = out1 ?? throw new ArgumentNullException(nameof(out1)); + Outlet2 = out2 ?? throw new ArgumentNullException(nameof(out2)); Inlets = ImmutableArray.Create(Inlet1, Inlet2); Outlets = ImmutableArray.Create(Outlet1, Outlet2); diff --git a/src/core/Akka.Streams/Stage/GraphStage.cs b/src/core/Akka.Streams/Stage/GraphStage.cs index 0e7721226b4..0625a34bf1e 100644 --- a/src/core/Akka.Streams/Stage/GraphStage.cs +++ b/src/core/Akka.Streams/Stage/GraphStage.cs @@ -371,10 +371,7 @@ public sealed class Scheduled : IDeadLetterSuppression /// public Scheduled(object timerKey, int timerId, bool isRepeating) { - if (timerKey == null) - throw new ArgumentNullException(nameof(timerKey), "Timer key cannot be null"); - - TimerKey = timerKey; + TimerKey = timerKey ?? throw new ArgumentNullException(nameof(timerKey), "Timer key cannot be null"); TimerId = timerId; IsRepeating = isRepeating; } @@ -869,8 +866,7 @@ public ILoggingAdapter Log // only used in StageLogic, i.e. thread safe if (_log == null) { - var provider = Materializer as IMaterializerLoggingProvider; - if (provider != null) + if (Materializer is IMaterializerLoggingProvider provider) _log = provider.MakeLogger(LogSource); else _log = NoLogger.Instance; @@ -885,7 +881,7 @@ public ILoggingAdapter Log /// /// TBD /// TBD - protected internal void SetHandler(Inlet inlet, IInHandler handler) + protected internal void SetHandler(Inlet inlet, IInHandler handler) { Handlers[inlet.Id] = handler; _interpreter?.SetHandler(GetConnection(inlet), handler); @@ -899,9 +895,9 @@ protected internal void SetHandler(Inlet inlet, IInHandler handler) /// TBD /// TBD /// - /// This exception is thrown when the specified is undefined. + /// This exception is thrown when the specified is undefined. /// - protected internal void SetHandler(Inlet inlet, Action onPush, Action onUpstreamFinish = null, Action onUpstreamFailure = null) + protected internal void SetHandler(Inlet inlet, Action onPush, Action onUpstreamFinish = null, Action onUpstreamFailure = null) { if (onPush == null) throw new ArgumentNullException(nameof(onPush), "GraphStageLogic onPush handler must be provided"); @@ -914,19 +910,26 @@ protected internal void SetHandler(Inlet inlet, Action onPush, Action onUpstream /// /// TBD /// TBD - protected IInHandler GetHandler(Inlet inlet) => (IInHandler)Handlers[inlet.Id]; + protected IInHandler GetHandler(Inlet inlet) => (IInHandler)Handlers[inlet.Id]; /// /// Assigns callbacks for the events for an . /// /// TBD /// TBD - protected internal void SetHandler(Outlet outlet, IOutHandler handler) + private void SetHandler(Outlet outlet, IOutHandler handler) { Handlers[outlet.Id + InCount] = handler; _interpreter?.SetHandler(GetConnection(outlet), handler); } + /// + /// Assigns callbacks for the events for an . + /// + /// TBD + /// TBD + protected internal void SetHandler(Outlet outlet, IOutHandler handler) => SetHandler((Outlet)outlet, handler); + /// /// Assigns callbacks for the events for an . /// @@ -936,7 +939,7 @@ protected internal void SetHandler(Outlet outlet, IOutHandler handler) /// /// This exception is thrown when the specified is undefined. /// - protected internal void SetHandler(Outlet outlet, Action onPull, Action onDownstreamFinish = null) + protected internal void SetHandler(Outlet outlet, Action onPull, Action onDownstreamFinish = null) { if (onPull == null) throw new ArgumentNullException(nameof(onPull), "GraphStageLogic onPull handler must be provided"); @@ -948,7 +951,14 @@ protected internal void SetHandler(Outlet outlet, Action onPull, Action onDownst /// /// TBD /// TBD - protected IOutHandler GetHandler(Outlet outlet) => (IOutHandler)Handlers[outlet.Id + InCount]; + private IOutHandler GetHandler(Outlet outlet) => (IOutHandler)Handlers[outlet.Id + InCount]; + + /// + /// Retrieves the current callback for the events on the given + /// + /// TBD + /// TBD + protected IOutHandler GetHandler(Outlet outlet) => GetHandler((Outlet)outlet); private Connection GetConnection(Inlet inlet) => PortToConn[inlet.Id]; @@ -969,7 +979,7 @@ private IOutHandler GetNonEmittingHandler(Outlet outlet) /// /// This exception is thrown when either the specified is closed or already pulled. /// - protected internal void Pull(Inlet inlet) + private void Pull(Inlet inlet) { var connection = GetConnection(inlet); var portState = connection.PortState; @@ -1008,28 +1018,19 @@ protected internal void Pull(Inlet inlet) /// There can only be one outstanding request at any given time.The method can be used /// query whether pull is allowed to be called or not. /// + /// TBD /// TBD - protected internal void TryPull(Inlet inlet) + protected internal void TryPull(Inlet inlet) { if (!IsClosed(inlet)) Pull(inlet); } - /// - /// Requests an element on the given port unless the port is already closed. - /// Calling this method twice before an element arrived will fail. - /// There can only be one outstanding request at any given time.The method can be used - /// query whether pull is allowed to be called or not. - /// - /// TBD - /// TBD - protected internal void TryPull(Inlet inlet) => TryPull((Inlet)inlet); - /// /// Requests to stop receiving events from a given input port. Cancelling clears any ungrabbed elements from the port. /// /// TBD - protected void Cancel(Inlet inlet) => Interpreter.Cancel(GetConnection(inlet)); + protected void Cancel(Inlet inlet) => Interpreter.Cancel(GetConnection(inlet)); /// /// Once the callback for an input port has been invoked, the element that has been pushed @@ -1044,7 +1045,7 @@ protected internal void TryPull(Inlet inlet) /// This exception is thrown when the specified is empty. /// /// TBD - protected internal T Grab(Inlet inlet) + private T Grab(Inlet inlet) { var connection = GetConnection(inlet); var element = connection.Slot; @@ -1086,9 +1087,17 @@ protected internal T Grab(Inlet inlet) /// /// TBD /// TBD - protected bool HasBeenPulled(Inlet inlet) + private bool HasBeenPulled(Inlet inlet) => (GetConnection(inlet).PortState & (InReady | InClosed)) == 0; + /// + /// Indicates whether there is already a pending pull for the given input port. If this method returns true + /// then must return false for that same port. + /// + /// TBD + /// TBD + protected bool HasBeenPulled(Inlet inlet) => HasBeenPulled((Inlet)inlet); + /// /// Indicates whether there is an element waiting at the given input port. can be used to retrieve the /// element. After calling this method will return false. @@ -1097,7 +1106,7 @@ protected bool HasBeenPulled(Inlet inlet) /// /// TBD /// TBD - protected internal bool IsAvailable(Inlet inlet) + private bool IsAvailable(Inlet inlet) { var connection = GetConnection(inlet); var normalArrived = (connection.PortState & (InReady | InFailed)) == InReady; @@ -1109,23 +1118,38 @@ protected internal bool IsAvailable(Inlet inlet) } // slow path on failure - if ((connection.PortState & (InReady | InFailed)) == - (InReady | InFailed)) + if ((connection.PortState & (InReady | InFailed)) == (InReady | InFailed)) { - var failed = connection.Slot as GraphInterpreter.Failed; // This can only be Empty actually (if a cancel was concurrent with a failure) - return failed != null && !ReferenceEquals(failed.PreviousElement, Empty.Instance); + return connection.Slot is GraphInterpreter.Failed failed && !ReferenceEquals(failed.PreviousElement, Empty.Instance); } return false; } + /// + /// Indicates whether there is an element waiting at the given input port. can be used to retrieve the + /// element. After calling this method will return false. + /// + /// If this method returns true then will return false for that same port. + /// + /// TBD + /// TBD + protected internal bool IsAvailable(Inlet inlet) => IsAvailable((Inlet)inlet); + + /// + /// Indicates whether the port has been closed. A closed port cannot be pulled. + /// + /// TBD + /// TBD + private bool IsClosed(Inlet inlet) => (GetConnection(inlet).PortState & InClosed) != 0; + /// /// Indicates whether the port has been closed. A closed port cannot be pulled. /// /// TBD /// TBD - protected bool IsClosed(Inlet inlet) => (GetConnection(inlet).PortState & InClosed) != 0; + protected bool IsClosed(Inlet inlet) => IsClosed((Inlet)inlet); /// /// Emits an element through the given output port. Calling this method twice before a has been arrived @@ -1138,7 +1162,7 @@ protected internal bool IsAvailable(Inlet inlet) /// /// This exception is thrown when either the specified is closed or already pulled. /// - protected internal void Push(Outlet outlet, T element) + protected internal void Push(Outlet outlet, T element) { var connection = GetConnection(outlet); var portState = connection.PortState; @@ -1173,12 +1197,12 @@ protected internal void Push(Outlet outlet, T element) /// /// TBD protected void SetKeepGoing(bool enabled) => Interpreter.SetKeepGoing(this, enabled); - + /// /// Signals that there will be no more elements emitted on the given port. /// /// TBD - protected void Complete(Outlet outlet) + private void Complete(Outlet outlet) { if (GetHandler(outlet) is Emitting e) e.AddFollowUp(new EmittingCompletion(e.Out, e.Previous, this)); @@ -1186,12 +1210,18 @@ protected void Complete(Outlet outlet) Interpreter.Complete(GetConnection(outlet)); } + /// + /// Signals that there will be no more elements emitted on the given port. + /// + /// TBD + protected void Complete(Outlet outlet) => Complete((Outlet)outlet); + /// /// Signals failure through the given port. /// /// TBD /// TBD - protected void Fail(Outlet outlet, Exception reason) => Interpreter.Fail(GetConnection(outlet), reason); + protected void Fail(Outlet outlet, Exception reason) => Interpreter.Fail(GetConnection(outlet), reason); /// /// Automatically invokes or on all the input or output ports that have been called, @@ -1216,7 +1246,7 @@ public void CompleteStage() } /// - /// Automatically invokes or on all the input or output ports that have been called, + /// Automatically invokes or on all the input or output ports that have been called, /// then stops the stage, then is called. /// /// TBD @@ -1238,7 +1268,7 @@ public void FailStage(Exception reason) /// /// TBD /// TBD - protected internal bool IsAvailable(Outlet outlet) + protected internal bool IsAvailable(Outlet outlet) => (GetConnection(outlet).PortState & (OutReady | OutClosed)) == OutReady; /// @@ -1246,7 +1276,7 @@ protected internal bool IsAvailable(Outlet outlet) /// /// TBD /// TBD - protected bool IsClosed(Outlet outlet) + protected bool IsClosed(Outlet outlet) => (GetConnection(outlet).PortState & OutClosed) != 0; /// @@ -2134,7 +2164,9 @@ public sealed class EagerTerminateInput : InHandler /// TBD /// public static readonly EagerTerminateInput Instance = new EagerTerminateInput(); + private EagerTerminateInput() { } + /// /// TBD /// @@ -2150,7 +2182,9 @@ public sealed class IgnoreTerminateInput : InHandler /// TBD /// public static readonly IgnoreTerminateInput Instance = new IgnoreTerminateInput(); + private IgnoreTerminateInput() { } + /// /// TBD /// @@ -2173,10 +2207,7 @@ public class ConditionalTerminateInput : InHandler /// TBD /// /// TBD - public ConditionalTerminateInput(Func predicate) - { - _predicate = predicate; - } + public ConditionalTerminateInput(Func predicate) => _predicate = predicate; /// /// TBD @@ -2201,7 +2232,9 @@ public sealed class TotallyIgnorantInput : InHandler /// TBD /// public static readonly TotallyIgnorantInput Instance = new TotallyIgnorantInput(); + private TotallyIgnorantInput() { } + /// /// TBD /// @@ -2226,6 +2259,7 @@ public sealed class EagerTerminateOutput : OutHandler /// TBD /// public static readonly EagerTerminateOutput Instance = new EagerTerminateOutput(); + private EagerTerminateOutput() { } /// /// TBD @@ -2242,7 +2276,9 @@ public sealed class IgnoreTerminateOutput : OutHandler /// TBD /// public static readonly IgnoreTerminateOutput Instance = new IgnoreTerminateOutput(); + private IgnoreTerminateOutput() { } + /// /// TBD /// @@ -2265,10 +2301,7 @@ public class ConditionalTerminateOutput : OutHandler /// TBD /// /// TBD - public ConditionalTerminateOutput(Func predicate) - { - _predicate = predicate; - } + public ConditionalTerminateOutput(Func predicate) => _predicate = predicate; /// /// TBD @@ -2294,6 +2327,7 @@ public sealed class StageActorRef : MinimalActorRef /// /// TBD public delegate void Receive(Tuple args); + /// /// TBD /// @@ -2525,30 +2559,21 @@ private sealed class NotInitialized : ICallbackState { public IList Args { get; } - public NotInitialized(IList args) - { - Args = args; - } + public NotInitialized(IList args) => Args = args; } private sealed class Initialized : ICallbackState { public Action Callback { get; } - public Initialized(Action callback) - { - Callback = callback; - } + public Initialized(Action callback) => Callback = callback; } private sealed class Stopped : ICallbackState { public Action Callback { get; } - public Stopped(Action callback) - { - Callback = callback; - } + public Stopped(Action callback) => Callback = callback; } private readonly AtomicReference _callbackState = From d8aabed863a149a1e0956d9a8e8590d13e2a655c Mon Sep 17 00:00:00 2001 From: Marc Piechura Date: Sun, 31 Dec 2017 21:00:53 +0100 Subject: [PATCH 2/6] fix performance project and rename type parmeters in kill switch --- .../InterpreterBenchmark.cs | 38 ++++++------------- src/core/Akka.Streams/KillSwitch.cs | 30 +++++++-------- 2 files changed, 26 insertions(+), 42 deletions(-) diff --git a/src/core/Akka.Streams.Tests.Performance/InterpreterBenchmark.cs b/src/core/Akka.Streams.Tests.Performance/InterpreterBenchmark.cs index 1f289b3e40f..689284efa34 100644 --- a/src/core/Akka.Streams.Tests.Performance/InterpreterBenchmark.cs +++ b/src/core/Akka.Streams.Tests.Performance/InterpreterBenchmark.cs @@ -8,8 +8,6 @@ using System; using System.Linq; using System.Reflection; -using Akka.Actor; -using Akka.Event; using Akka.Streams.Implementation.Fusing; using Akka.Streams.Stage; using Akka.Streams.Tests.Implementation.Fusing; @@ -71,7 +69,7 @@ private static void Execute(int numberOfIdentities) setup.Interpreter.Execute(int.MaxValue); }); } - + private sealed class GraphDataSource : GraphInterpreter.UpstreamBoundaryStageLogic { private int _index; @@ -84,11 +82,14 @@ public GraphDataSource(string toString, T[] data) // ReSharper disable once PossibleNullReferenceException typeof(OutPort).GetField("Id", BindingFlags.NonPublic | BindingFlags.Instance).SetValue(Out, 0); - SetHandler(Out, onPull: () => + var outlet = new Outlet("out"); + Out = outlet; + + SetHandler(outlet, onPull: () => { - if (_index < data.Length) + if (_index < data.Length) { - Push(Out, data[_index]); + Push(outlet, data[_index]); _index++; } else @@ -96,8 +97,8 @@ public GraphDataSource(string toString, T[] data) }, onDownstreamFinish: CompleteStage); Console.WriteLine("Handler Set"); } - - public override Outlet Out { get; } = new Outlet("out"); + + public override Outlet Out { get; } public override string ToString() => _toString; } @@ -114,10 +115,10 @@ public GraphDataSink(string toString, int expected) typeof(InPort).GetField("Id", BindingFlags.NonPublic | BindingFlags.Instance).SetValue(_inlet, 0); In = _inlet; - SetHandler(In, onPush: () => + SetHandler(_inlet, onPush: () => { expected--; - if(expected > 0) + if (expected > 0) Pull(_inlet); // Otherwise do nothing, it will exit the interpreter }); @@ -129,22 +130,5 @@ public GraphDataSink(string toString, int expected) public override string ToString() => _toString; } - - private sealed class NoobBus : LoggingBus - { - public static readonly NoobBus Instance = new NoobBus(); - - private NoobBus() { } - - public override bool Subscribe(IActorRef subscriber, Type classifier) => true; - - public override void Publish(object @event) - { - } - - public override bool Unsubscribe(IActorRef subscriber) => true; - - public override bool Unsubscribe(IActorRef subscriber, Type classifier) => true; - } } } diff --git a/src/core/Akka.Streams/KillSwitch.cs b/src/core/Akka.Streams/KillSwitch.cs index b00789082fb..b06b865c2aa 100644 --- a/src/core/Akka.Streams/KillSwitch.cs +++ b/src/core/Akka.Streams/KillSwitch.cs @@ -46,11 +46,11 @@ public static class KillSwitches /// /// For a Flow version see /// - /// TBD - /// TBD + /// TBD + /// TBD /// TBD - public static IGraph, UniqueKillSwitch> SingleBidi - () => UniqueBidiKillSwitchStage.Instance; + public static IGraph, UniqueKillSwitch> SingleBidi + () => UniqueBidiKillSwitchStage.Instance; /// /// TBD @@ -136,16 +136,16 @@ public override ILogicAndMaterializedValue CreateLogicAndMater public override string ToString() => "UniqueKillSwitchFlow"; } - private sealed class UniqueBidiKillSwitchStage : - GraphStageWithMaterializedValue, UniqueKillSwitch> + private sealed class UniqueBidiKillSwitchStage : + GraphStageWithMaterializedValue, UniqueKillSwitch> { #region Logic private sealed class Logic : KillableGraphStageLogic { - private readonly UniqueBidiKillSwitchStage _killSwitch; + private readonly UniqueBidiKillSwitchStage _killSwitch; - public Logic(Task terminationSignal, UniqueBidiKillSwitchStage killSwitch) + public Logic(Task terminationSignal, UniqueBidiKillSwitchStage killSwitch) : base(terminationSignal, killSwitch.Shape) { _killSwitch = killSwitch; @@ -176,21 +176,21 @@ public Logic(Task terminationSignal, UniqueBidiKillSwitchStage killSw #endregion - public static UniqueBidiKillSwitchStage Instance { get; } = new UniqueBidiKillSwitchStage(); + public static UniqueBidiKillSwitchStage Instance { get; } = new UniqueBidiKillSwitchStage(); - private UniqueBidiKillSwitchStage() => Shape = new BidiShape(In1, Out1, In2, Out2); + private UniqueBidiKillSwitchStage() => Shape = new BidiShape(In1, Out1, In2, Out2); protected override Attributes InitialAttributes { get; } = Attributes.CreateName("breaker"); - private Inlet In1 { get; } = new Inlet("KillSwitchBidi.in1"); + private Inlet In1 { get; } = new Inlet("KillSwitchBidi.in1"); - private Outlet Out1 { get; } = new Outlet("KillSwitchBidi.out1"); + private Outlet Out1 { get; } = new Outlet("KillSwitchBidi.out1"); - private Inlet In2 { get; } = new Inlet("KillSwitchBidi.in2"); + private Inlet In2 { get; } = new Inlet("KillSwitchBidi.in2"); - private Outlet Out2 { get; } = new Outlet("KillSwitchBidi.out2"); + private Outlet Out2 { get; } = new Outlet("KillSwitchBidi.out2"); - public override BidiShape Shape { get; } + public override BidiShape Shape { get; } public override ILogicAndMaterializedValue CreateLogicAndMaterializedValue(Attributes inheritedAttributes) { From 349cff49db09b71f931cb1b8b7ff750c67a6f58e Mon Sep 17 00:00:00 2001 From: Marc Piechura Date: Sun, 31 Dec 2017 23:25:33 +0100 Subject: [PATCH 3/6] api approval --- src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt index b44ea579eb2..b0ac9b448a0 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt @@ -639,7 +639,7 @@ namespace Akka.Streams { public static Akka.Streams.SharedKillSwitch Shared(string name) { } public static Akka.Streams.IGraph, Akka.Streams.UniqueKillSwitch> Single() { } - public static Akka.Streams.IGraph, Akka.Streams.UniqueKillSwitch> SingleBidi() { } + public static Akka.Streams.IGraph, Akka.Streams.UniqueKillSwitch> SingleBidi() { } } public struct MaterializationContext { From b93c221303c7c56bba924de3e1a90befc6c24e85 Mon Sep 17 00:00:00 2001 From: Marc Piechura Date: Thu, 4 Jan 2018 20:59:59 +0100 Subject: [PATCH 4/6] fix merge changes --- .../CoreAPISpec.ApproveStreams.approved.txt | 23 +++++++++++++++---- src/core/Akka.Streams/Stage/GraphStage.cs | 2 +- 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt index b0ac9b448a0..34cff5e4e70 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt @@ -5,6 +5,10 @@ [assembly: System.Runtime.Versioning.TargetFrameworkAttribute(".NETFramework,Version=v4.5", FrameworkDisplayName=".NET Framework 4.5")] namespace Akka.Streams { + public sealed class AbruptStageTerminationException : System.Exception + { + public AbruptStageTerminationException(Akka.Streams.Stage.GraphStageLogic logic) { } + } public class AbruptTerminationException : System.Exception { public readonly Akka.Actor.IActorRef Actor; @@ -2359,6 +2363,15 @@ namespace Akka.Streams.Implementation public void Subscribe(Reactive.Streams.ISubscriber subscriber) { } public override string ToString() { } } + public sealed class EmptySource : Akka.Streams.Stage.GraphStage> + { + public EmptySource() { } + protected override Akka.Streams.Attributes InitialAttributes { get; } + public Akka.Streams.Outlet Out { get; } + public override Akka.Streams.SourceShape Shape { get; } + protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { } + public override string ToString() { } + } public abstract class EnumerableActorName { protected EnumerableActorName() { } @@ -3633,12 +3646,13 @@ namespace Akka.Streams.Implementation.Fusing public override string ToString() { } } [Akka.Annotations.InternalApiAttribute()] - public sealed class OnCompleted : Akka.Streams.Stage.PushStage + public sealed class OnCompleted : Akka.Streams.Stage.GraphStage> { public OnCompleted(System.Action success, System.Action failure) { } - public override Akka.Streams.Stage.ISyncDirective OnPush(TIn element, Akka.Streams.Stage.IContext context) { } - public override Akka.Streams.Stage.ITerminationDirective OnUpstreamFailure(System.Exception cause, Akka.Streams.Stage.IContext context) { } - public override Akka.Streams.Stage.ITerminationDirective OnUpstreamFinish(Akka.Streams.Stage.IContext context) { } + public Akka.Streams.Inlet In { get; } + public Akka.Streams.Outlet Out { get; } + public override Akka.Streams.FlowShape Shape { get; } + protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { } } [Akka.Annotations.InternalApiAttribute()] public sealed class Recover : Akka.Streams.Implementation.Fusing.SimpleLinearGraphStage @@ -4135,6 +4149,7 @@ namespace Akka.Streams.Stage protected internal void SetHandler(Akka.Streams.Inlet inlet, System.Action onPush, System.Action onUpstreamFinish = null, System.Action onUpstreamFailure = null) { } protected internal void SetHandler(Akka.Streams.Outlet outlet, Akka.Streams.Stage.IOutHandler handler) { } protected internal void SetHandler(Akka.Streams.Outlet outlet, System.Action onPull, System.Action onDownstreamFinish = null) { } + protected internal void SetHandler(Akka.Streams.Inlet inlet, Akka.Streams.Outlet outlet, Akka.Streams.Stage.InAndOutGraphStageLogic handler) { } protected void SetKeepGoing(bool enabled) { } protected internal void TryPull(Akka.Streams.Inlet inlet) { } protected sealed class LambdaInHandler : Akka.Streams.Stage.InHandler diff --git a/src/core/Akka.Streams/Stage/GraphStage.cs b/src/core/Akka.Streams/Stage/GraphStage.cs index a6c6a69eca3..b10d236da1b 100644 --- a/src/core/Akka.Streams/Stage/GraphStage.cs +++ b/src/core/Akka.Streams/Stage/GraphStage.cs @@ -971,7 +971,7 @@ protected internal void SetHandler(Outlet outlet, Action onPull, Action on /// /// Assigns callbacks for the events for an and . /// - protected internal void SetHandler(Inlet inlet, Outlet outlet, InAndOutGraphStageLogic handler) + protected internal void SetHandler(Inlet inlet, Outlet outlet, InAndOutGraphStageLogic handler) { SetHandler(inlet, handler); SetHandler(outlet, handler); From d3f8195868f68d537f7bad200bd22491cf43285d Mon Sep 17 00:00:00 2001 From: Marc Piechura Date: Sat, 13 Jan 2018 20:36:12 +0100 Subject: [PATCH 5/6] fix interpreter benchmark --- .../Akka.Streams.Tests.Performance/InterpreterBenchmark.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/core/Akka.Streams.Tests.Performance/InterpreterBenchmark.cs b/src/core/Akka.Streams.Tests.Performance/InterpreterBenchmark.cs index 689284efa34..3c474d1decf 100644 --- a/src/core/Akka.Streams.Tests.Performance/InterpreterBenchmark.cs +++ b/src/core/Akka.Streams.Tests.Performance/InterpreterBenchmark.cs @@ -79,12 +79,12 @@ public GraphDataSource(string toString, T[] data) { _toString = toString; - // ReSharper disable once PossibleNullReferenceException - typeof(OutPort).GetField("Id", BindingFlags.NonPublic | BindingFlags.Instance).SetValue(Out, 0); - var outlet = new Outlet("out"); Out = outlet; + // ReSharper disable once PossibleNullReferenceException + typeof(OutPort).GetField("Id", BindingFlags.NonPublic | BindingFlags.Instance).SetValue(outlet, 0); + SetHandler(outlet, onPull: () => { if (_index < data.Length) From ad71f52d03188d3699e126456ff890638d5e2ad3 Mon Sep 17 00:00:00 2001 From: Marc Piechura Date: Mon, 15 Jan 2018 22:34:08 +0100 Subject: [PATCH 6/6] fix retry stage --- src/core/Akka.Streams/Dsl/Retry.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/Akka.Streams/Dsl/Retry.cs b/src/core/Akka.Streams/Dsl/Retry.cs index e983b44574e..66d971340c7 100644 --- a/src/core/Akka.Streams/Dsl/Retry.cs +++ b/src/core/Akka.Streams/Dsl/Retry.cs @@ -152,7 +152,7 @@ public Logic(RetryCoordinator retry) : base(retry.Shape) Pull(retry.In2); if (IsAvailable(retry.Out2)) { - Push(retry.Out2, r.Item2); + Push(retry.Out2, r); _elementInCycle = true; } else