Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve type safety for custom graph stages #3237

Merged
merged 12 commits into from
Jan 16, 2018
39 changes: 18 additions & 21 deletions src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt
Original file line number Diff line number Diff line change
Expand Up @@ -643,7 +643,7 @@ namespace Akka.Streams
{
public static Akka.Streams.SharedKillSwitch Shared(string name) { }
public static Akka.Streams.IGraph<Akka.Streams.FlowShape<T, T>, Akka.Streams.UniqueKillSwitch> Single<T>() { }
public static Akka.Streams.IGraph<Akka.Streams.BidiShape<TIn1, TIn1, TOut1, TOut1>, Akka.Streams.UniqueKillSwitch> SingleBidi<TIn1, TOut1>() { }
public static Akka.Streams.IGraph<Akka.Streams.BidiShape<T1, T1, T2, T2>, Akka.Streams.UniqueKillSwitch> SingleBidi<T1, T2>() { }
}
public struct MaterializationContext
{
Expand Down Expand Up @@ -1425,7 +1425,7 @@ namespace Akka.Streams.Dsl
public Partition(int outputPorts, System.Func<T, int> partitioner) { }
public override Akka.Streams.UniformFanOutShape<T, T> Shape { get; }
protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { }
public Akka.Streams.Outlet Out(int id) { }
public Akka.Streams.Outlet<T> Out(int id) { }
public override string ToString() { }
}
public sealed class PartitionOutOfBoundsException : System.Exception
Expand Down Expand Up @@ -4118,8 +4118,8 @@ namespace Akka.Streams.Stage
protected void AbortReading<T>(Akka.Streams.Inlet<T> inlet) { }
protected internal void AfterPostStop() { }
protected internal void BeforePreStart() { }
protected void Cancel(Akka.Streams.Inlet inlet) { }
protected void Complete(Akka.Streams.Outlet outlet) { }
protected void Cancel<T>(Akka.Streams.Inlet<T> inlet) { }
protected void Complete<T>(Akka.Streams.Outlet<T> outlet) { }
public void CompleteStage() { }
public static Akka.Streams.Stage.InHandler ConditionalTerminateInput(System.Func<bool> predicate) { }
public static Akka.Streams.Stage.OutHandler ConditionalTerminateOutput(System.Func<bool> predicate) { }
Expand All @@ -4130,37 +4130,34 @@ namespace Akka.Streams.Stage
protected internal void EmitMultiple<T>(Akka.Streams.Outlet<T> outlet, System.Collections.Generic.IEnumerable<T> elements) { }
protected internal void EmitMultiple<T>(Akka.Streams.Outlet<T> outlet, System.Collections.Generic.IEnumerator<T> enumerator, System.Action andThen) { }
protected internal void EmitMultiple<T>(Akka.Streams.Outlet<T> outlet, System.Collections.Generic.IEnumerator<T> enumerator) { }
protected void Fail(Akka.Streams.Outlet outlet, System.Exception reason) { }
protected void Fail<T>(Akka.Streams.Outlet<T> outlet, System.Exception reason) { }
public void FailStage(System.Exception reason) { }
protected System.Action<T> GetAsyncCallback<T>(System.Action<T> handler) { }
protected System.Action GetAsyncCallback(System.Action handler) { }
protected Akka.Streams.Stage.IInHandler GetHandler(Akka.Streams.Inlet inlet) { }
protected Akka.Streams.Stage.IOutHandler GetHandler(Akka.Streams.Outlet outlet) { }
protected Akka.Streams.Stage.IInHandler GetHandler<T>(Akka.Streams.Inlet<T> inlet) { }
protected Akka.Streams.Stage.IOutHandler GetHandler<T>(Akka.Streams.Outlet<T> outlet) { }
[Akka.Annotations.ApiMayChangeAttribute()]
protected Akka.Streams.Stage.StageActorRef GetStageActorRef(Akka.Streams.Stage.StageActorRef.Receive receive) { }
protected internal T Grab<T>(Akka.Streams.Inlet inlet) { }
protected internal T Grab<T>(Akka.Streams.Inlet<T> inlet) { }
protected bool HasBeenPulled(Akka.Streams.Inlet inlet) { }
protected internal bool IsAvailable(Akka.Streams.Inlet inlet) { }
protected internal bool IsAvailable(Akka.Streams.Outlet outlet) { }
protected bool IsClosed(Akka.Streams.Inlet inlet) { }
protected bool IsClosed(Akka.Streams.Outlet outlet) { }
protected bool HasBeenPulled<T>(Akka.Streams.Inlet<T> inlet) { }
protected internal bool IsAvailable<T>(Akka.Streams.Inlet<T> inlet) { }
protected internal bool IsAvailable<T>(Akka.Streams.Outlet<T> outlet) { }
protected bool IsClosed<T>(Akka.Streams.Inlet<T> inlet) { }
protected bool IsClosed<T>(Akka.Streams.Outlet<T> outlet) { }
protected void PassAlong<TOut, TIn>(Akka.Streams.Inlet<TIn> from, Akka.Streams.Outlet<TOut> to, bool doFinish = True, bool doFail = True, bool doPull = False)
where TIn : TOut { }
public virtual void PostStop() { }
public virtual void PreStart() { }
protected internal void Pull(Akka.Streams.Inlet inlet) { }
protected internal void Pull<T>(Akka.Streams.Inlet<T> inlet) { }
protected internal void Push<T>(Akka.Streams.Outlet outlet, T element) { }
protected internal void Push<T>(Akka.Streams.Outlet<T> outlet, T element) { }
protected void Read<T>(Akka.Streams.Inlet<T> inlet, System.Action<T> andThen, System.Action onClose) { }
protected void ReadMany<T>(Akka.Streams.Inlet<T> inlet, int n, System.Action<System.Collections.Generic.IEnumerable<T>> andThen, System.Action<System.Collections.Generic.IEnumerable<T>> onComplete) { }
protected internal void SetHandler(Akka.Streams.Inlet inlet, Akka.Streams.Stage.IInHandler handler) { }
protected internal void SetHandler(Akka.Streams.Inlet inlet, System.Action onPush, System.Action onUpstreamFinish = null, System.Action<System.Exception> onUpstreamFailure = null) { }
protected internal void SetHandler(Akka.Streams.Outlet outlet, Akka.Streams.Stage.IOutHandler handler) { }
protected internal void SetHandler(Akka.Streams.Outlet outlet, System.Action onPull, System.Action onDownstreamFinish = null) { }
protected internal void SetHandler(Akka.Streams.Inlet inlet, Akka.Streams.Outlet outlet, Akka.Streams.Stage.InAndOutGraphStageLogic handler) { }
protected internal void SetHandler<T>(Akka.Streams.Inlet<T> inlet, Akka.Streams.Stage.IInHandler handler) { }
protected internal void SetHandler<T>(Akka.Streams.Inlet<T> inlet, System.Action onPush, System.Action onUpstreamFinish = null, System.Action<System.Exception> onUpstreamFailure = null) { }
protected internal void SetHandler<T>(Akka.Streams.Outlet<T> outlet, Akka.Streams.Stage.IOutHandler handler) { }
protected internal void SetHandler<T>(Akka.Streams.Outlet<T> outlet, System.Action onPull, System.Action onDownstreamFinish = null) { }
protected internal void SetHandler<TIn, TOut>(Akka.Streams.Inlet<TIn> inlet, Akka.Streams.Outlet<TOut> outlet, Akka.Streams.Stage.InAndOutGraphStageLogic handler) { }
protected void SetKeepGoing(bool enabled) { }
protected internal void TryPull(Akka.Streams.Inlet inlet) { }
protected internal void TryPull<T>(Akka.Streams.Inlet<T> inlet) { }
protected sealed class LambdaInHandler : Akka.Streams.Stage.InHandler
{
Expand Down
40 changes: 12 additions & 28 deletions src/core/Akka.Streams.Tests.Performance/InterpreterBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
using System;
using System.Linq;
using System.Reflection;
using Akka.Actor;
using Akka.Event;
using Akka.Streams.Implementation.Fusing;
using Akka.Streams.Stage;
using Akka.Streams.Tests.Implementation.Fusing;
Expand Down Expand Up @@ -71,7 +69,7 @@ private static void Execute(int numberOfIdentities)
setup.Interpreter.Execute(int.MaxValue);
});
}

private sealed class GraphDataSource<T> : GraphInterpreter.UpstreamBoundaryStageLogic
{
private int _index;
Expand All @@ -81,23 +79,26 @@ public GraphDataSource(string toString, T[] data)
{
_toString = toString;

var outlet = new Outlet<T>("out");
Out = outlet;

// ReSharper disable once PossibleNullReferenceException
typeof(OutPort).GetField("Id", BindingFlags.NonPublic | BindingFlags.Instance).SetValue(Out, 0);
typeof(OutPort).GetField("Id", BindingFlags.NonPublic | BindingFlags.Instance).SetValue(outlet, 0);

SetHandler(Out, onPull: () =>
SetHandler(outlet, onPull: () =>
{
if (_index < data.Length)
if (_index < data.Length)
{
Push(Out, data[_index]);
Push(outlet, data[_index]);
_index++;
}
else
CompleteStage();
}, onDownstreamFinish: CompleteStage);
Console.WriteLine("Handler Set");
}
public override Outlet Out { get; } = new Outlet<T>("out");

public override Outlet Out { get; }

public override string ToString() => _toString;
}
Expand All @@ -114,10 +115,10 @@ public GraphDataSink(string toString, int expected)
typeof(InPort).GetField("Id", BindingFlags.NonPublic | BindingFlags.Instance).SetValue(_inlet, 0);
In = _inlet;

SetHandler(In, onPush: () =>
SetHandler(_inlet, onPush: () =>
{
expected--;
if(expected > 0)
if (expected > 0)
Pull(_inlet);
// Otherwise do nothing, it will exit the interpreter
});
Expand All @@ -129,22 +130,5 @@ public GraphDataSink(string toString, int expected)

public override string ToString() => _toString;
}

private sealed class NoobBus : LoggingBus
{
public static readonly NoobBus Instance = new NoobBus();

private NoobBus() { }

public override bool Subscribe(IActorRef subscriber, Type classifier) => true;

public override void Publish(object @event)
{
}

public override bool Unsubscribe(IActorRef subscriber) => true;

public override bool Unsubscribe(IActorRef subscriber, Type classifier) => true;
}
}
}
Loading