Skip to content

Commit

Permalink
Fixes Sink.Ignore signature from Task to Task<Done>
Browse files Browse the repository at this point in the history
  • Loading branch information
ismaelhamed committed May 28, 2022
1 parent 34c830a commit c46969a
Show file tree
Hide file tree
Showing 9 changed files with 41 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1935,11 +1935,11 @@ namespace Akka.Streams.Dsl
public static Akka.Streams.Dsl.Sink<TIn, Reactive.Streams.IPublisher<TIn>> FanoutPublisher<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<TIn>> First<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<TIn>> FirstOrDefault<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task> ForEach<TIn>(System.Action<TIn> action) { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task> ForEachParallel<TIn>(int parallelism, System.Action<TIn> action) { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Done>> ForEach<TIn>(System.Action<TIn> action) { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Done>> ForEachParallel<TIn>(int parallelism, System.Action<TIn> action) { }
public static Akka.Streams.Dsl.Sink<TIn, TMat> FromGraph<TIn, TMat>(Akka.Streams.IGraph<Akka.Streams.SinkShape<TIn>, TMat> graph) { }
public static Akka.Streams.Dsl.Sink<TIn, Akka.NotUsed> FromSubscriber<TIn>(Reactive.Streams.ISubscriber<TIn> subscriber) { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task> Ignore<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Done>> Ignore<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<TIn>> Last<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<TIn>> LastOrDefault<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Util.Option<TMat>>> LazyInitAsync<TIn, TMat>(System.Func<System.Threading.Tasks.Task<Akka.Streams.Dsl.Sink<TIn, TMat>>> sinkFactory) { }
Expand Down Expand Up @@ -4209,13 +4209,13 @@ namespace Akka.Streams.Implementation.Fusing
protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { }
}
[Akka.Annotations.InternalApiAttribute()]
public sealed class IgnoreSink<T> : Akka.Streams.Stage.GraphStageWithMaterializedValue<Akka.Streams.SinkShape<T>, System.Threading.Tasks.Task>
public sealed class IgnoreSink<T> : Akka.Streams.Stage.GraphStageWithMaterializedValue<Akka.Streams.SinkShape<T>, System.Threading.Tasks.Task<Akka.Done>>
{
public IgnoreSink() { }
protected override Akka.Streams.Attributes InitialAttributes { get; }
public Akka.Streams.Inlet<T> Inlet { get; }
public override Akka.Streams.SinkShape<T> Shape { get; }
public override Akka.Streams.Stage.ILogicAndMaterializedValue<System.Threading.Tasks.Task> CreateLogicAndMaterializedValue(Akka.Streams.Attributes inheritedAttributes) { }
public override Akka.Streams.Stage.ILogicAndMaterializedValue<System.Threading.Tasks.Task<Akka.Done>> CreateLogicAndMaterializedValue(Akka.Streams.Attributes inheritedAttributes) { }
public override string ToString() { }
}
[Akka.Annotations.InternalApiAttribute()]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1935,11 +1935,11 @@ namespace Akka.Streams.Dsl
public static Akka.Streams.Dsl.Sink<TIn, Reactive.Streams.IPublisher<TIn>> FanoutPublisher<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<TIn>> First<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<TIn>> FirstOrDefault<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task> ForEach<TIn>(System.Action<TIn> action) { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task> ForEachParallel<TIn>(int parallelism, System.Action<TIn> action) { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Done>> ForEach<TIn>(System.Action<TIn> action) { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Done>> ForEachParallel<TIn>(int parallelism, System.Action<TIn> action) { }
public static Akka.Streams.Dsl.Sink<TIn, TMat> FromGraph<TIn, TMat>(Akka.Streams.IGraph<Akka.Streams.SinkShape<TIn>, TMat> graph) { }
public static Akka.Streams.Dsl.Sink<TIn, Akka.NotUsed> FromSubscriber<TIn>(Reactive.Streams.ISubscriber<TIn> subscriber) { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task> Ignore<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Done>> Ignore<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<TIn>> Last<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<TIn>> LastOrDefault<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Util.Option<TMat>>> LazyInitAsync<TIn, TMat>(System.Func<System.Threading.Tasks.Task<Akka.Streams.Dsl.Sink<TIn, TMat>>> sinkFactory) { }
Expand Down Expand Up @@ -4221,13 +4221,13 @@ namespace Akka.Streams.Implementation.Fusing
protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { }
}
[Akka.Annotations.InternalApiAttribute()]
public sealed class IgnoreSink<T> : Akka.Streams.Stage.GraphStageWithMaterializedValue<Akka.Streams.SinkShape<T>, System.Threading.Tasks.Task>
public sealed class IgnoreSink<T> : Akka.Streams.Stage.GraphStageWithMaterializedValue<Akka.Streams.SinkShape<T>, System.Threading.Tasks.Task<Akka.Done>>
{
public IgnoreSink() { }
protected override Akka.Streams.Attributes InitialAttributes { get; }
public Akka.Streams.Inlet<T> Inlet { get; }
public override Akka.Streams.SinkShape<T> Shape { get; }
public override Akka.Streams.Stage.ILogicAndMaterializedValue<System.Threading.Tasks.Task> CreateLogicAndMaterializedValue(Akka.Streams.Attributes inheritedAttributes) { }
public override Akka.Streams.Stage.ILogicAndMaterializedValue<System.Threading.Tasks.Task<Akka.Done>> CreateLogicAndMaterializedValue(Akka.Streams.Attributes inheritedAttributes) { }
public override string ToString() { }
}
[Akka.Annotations.InternalApiAttribute()]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1935,11 +1935,11 @@ namespace Akka.Streams.Dsl
public static Akka.Streams.Dsl.Sink<TIn, Reactive.Streams.IPublisher<TIn>> FanoutPublisher<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<TIn>> First<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<TIn>> FirstOrDefault<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task> ForEach<TIn>(System.Action<TIn> action) { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task> ForEachParallel<TIn>(int parallelism, System.Action<TIn> action) { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Done>> ForEach<TIn>(System.Action<TIn> action) { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Done>> ForEachParallel<TIn>(int parallelism, System.Action<TIn> action) { }
public static Akka.Streams.Dsl.Sink<TIn, TMat> FromGraph<TIn, TMat>(Akka.Streams.IGraph<Akka.Streams.SinkShape<TIn>, TMat> graph) { }
public static Akka.Streams.Dsl.Sink<TIn, Akka.NotUsed> FromSubscriber<TIn>(Reactive.Streams.ISubscriber<TIn> subscriber) { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task> Ignore<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Done>> Ignore<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<TIn>> Last<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<TIn>> LastOrDefault<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Util.Option<TMat>>> LazyInitAsync<TIn, TMat>(System.Func<System.Threading.Tasks.Task<Akka.Streams.Dsl.Sink<TIn, TMat>>> sinkFactory) { }
Expand Down Expand Up @@ -4209,13 +4209,13 @@ namespace Akka.Streams.Implementation.Fusing
protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { }
}
[Akka.Annotations.InternalApiAttribute()]
public sealed class IgnoreSink<T> : Akka.Streams.Stage.GraphStageWithMaterializedValue<Akka.Streams.SinkShape<T>, System.Threading.Tasks.Task>
public sealed class IgnoreSink<T> : Akka.Streams.Stage.GraphStageWithMaterializedValue<Akka.Streams.SinkShape<T>, System.Threading.Tasks.Task<Akka.Done>>
{
public IgnoreSink() { }
protected override Akka.Streams.Attributes InitialAttributes { get; }
public Akka.Streams.Inlet<T> Inlet { get; }
public override Akka.Streams.SinkShape<T> Shape { get; }
public override Akka.Streams.Stage.ILogicAndMaterializedValue<System.Threading.Tasks.Task> CreateLogicAndMaterializedValue(Akka.Streams.Attributes inheritedAttributes) { }
public override Akka.Streams.Stage.ILogicAndMaterializedValue<System.Threading.Tasks.Task<Akka.Done>> CreateLogicAndMaterializedValue(Akka.Streams.Attributes inheritedAttributes) { }
public override string ToString() { }
}
[Akka.Annotations.InternalApiAttribute()]
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Streams.Tests/Dsl/GraphMatValueSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ public void A_Graph_with_materialized_value_must_perform_side_effecting_transfor
});
var r = RunnableGraph.FromGraph(GraphDsl.Create(Sink.Ignore<int>(), (b, sink) =>
{
var source = Source.From(Enumerable.Range(1, 10)).MapMaterializedValue(_ => Task.FromResult(0));
var source = Source.From(Enumerable.Range(1, 10)).MapMaterializedValue(_ => Task.FromResult(Done.Instance));
b.Add(g);
b.From(source).To(sink);
return ClosedShape.Instance;
Expand Down
4 changes: 2 additions & 2 deletions src/core/Akka.Streams.Tests/Dsl/GraphZipSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ public void Zip_must_complete_if_one_side_is_available_but_other_already_complet
var completed = RunnableGraph.FromGraph(GraphDsl.Create(Sink.Ignore<(int, string)>(), (b, sink) =>
{
var zip = b.Add(new Zip<int, string>());
var source1 = Source.FromPublisher(upstream1).MapMaterializedValue<Task>(_ => null);
var source2 = Source.FromPublisher(upstream2).MapMaterializedValue<Task>(_ => null);
var source1 = Source.FromPublisher(upstream1).MapMaterializedValue(_ => Task.FromResult(Done.Instance));
var source2 = Source.FromPublisher(upstream2).MapMaterializedValue(_ => Task.FromResult(Done.Instance));
b.From(source1).To(zip.In0);
b.From(source2).To(zip.In1);
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Streams.Tests/IO/TcpSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,7 @@ private async Task ValidateServerClientCommunicationAsync(ByteString testData, S
(await serverConnection.WaitReadAsync()).Should().BeEquivalentTo(testData);
}

private Sink<Tcp.IncomingConnection, Task> EchoHandler() =>
private Sink<Tcp.IncomingConnection, Task<Done>> EchoHandler() =>
Sink.ForEach<Tcp.IncomingConnection>(c => c.Flow.Join(Flow.Create<ByteString>()).Run(Materializer));

[Fact]
Expand Down
22 changes: 11 additions & 11 deletions src/core/Akka.Streams/Dsl/Internal/InternalFlowOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1325,17 +1325,17 @@ public static SubFlow<T, TMat, TClosed> GroupBy<T, TMat, TKey, TClosed>(
this IFlow<T, TMat> flow,
int maxSubstreams,
Func<T, TKey> groupingFunc,
Func<IFlow<Source<T, NotUsed>, TMat>, Sink<Source<T, NotUsed>, Task>, TClosed> toFunc,
Func<IFlow<Source<T, NotUsed>, TMat>, Sink<Source<T, NotUsed>, Task<Done>>, TClosed> toFunc,
bool allowClosedSubstreamRecreation)
{
var merge = new GroupByMergeBack<T, TMat, TKey>(flow, maxSubstreams, groupingFunc, allowClosedSubstreamRecreation);

Func<Sink<T, TMat>, TClosed> finish = s =>
TClosed finish(Sink<T, TMat> s)
{
return toFunc(
flow.Via(new Fusing.GroupBy<T, TKey>(maxSubstreams, groupingFunc, allowClosedSubstreamRecreation)),
Sink.ForEach<Source<T, NotUsed>>(e => e.RunWith(s, Fusing.GraphInterpreter.Current.Materializer)));
};
}

return new SubFlowImpl<T, T, TMat, TClosed>(Flow.Create<T, TMat>(), merge, finish);
}
Expand All @@ -1349,7 +1349,7 @@ public static SubFlow<T, TMat, TClosed> GroupBy<T, TMat, TKey, TClosed>(
/// infinite number of keys this can cause memory issues. Elements belonging to those keys are drained directly
/// and not send to the substream.
/// </para>
/// See also <seealso cref="GroupBy{T, TMat, TKey, TClosed}(IFlow{T, TMat}, int, Func{T, TKey}, Func{IFlow{Source{T, NotUsed}, TMat}, Sink{Source{T, NotUsed}, Task}, TClosed}, bool)"/>
/// See also <seealso cref="GroupBy{T, TMat, TKey, TClosed}(IFlow{T, TMat}, int, Func{T, TKey}, Func{IFlow{Source{T, NotUsed}, TMat}, Sink{Source{T, NotUsed}, Task{Done}}, TClosed}, bool)"/>
/// </summary>
/// <typeparam name="T"></typeparam>
/// <typeparam name="TMat"></typeparam>
Expand All @@ -1364,7 +1364,7 @@ public static SubFlow<T, TMat, TClosed> GroupBy<T, TMat, TKey, TClosed>(
this IFlow<T, TMat> flow,
int maxSubstreams,
Func<T, TKey> groupingFunc,
Func<IFlow<Source<T, NotUsed>, TMat>, Sink<Source<T, NotUsed>, Task>, TClosed> toFunc) => GroupBy(flow, maxSubstreams, groupingFunc, toFunc, allowClosedSubstreamRecreation: false);
Func<IFlow<Source<T, NotUsed>, TMat>, Sink<Source<T, NotUsed>, Task<Done>>, TClosed> toFunc) => GroupBy(flow, maxSubstreams, groupingFunc, toFunc, allowClosedSubstreamRecreation: false);

/// <summary>
/// TBD
Expand Down Expand Up @@ -1472,15 +1472,15 @@ public IFlow<T, TMat> Apply<T>(Flow<TOut, T, TMat> flow, int breadth)
/// <returns>TBD</returns>
public static SubFlow<T, TMat, TClosed> SplitWhen<T, TMat, TClosed>(this IFlow<T, TMat> flow,
SubstreamCancelStrategy substreamCancelStrategy, Func<T, bool> predicate,
Func<IFlow<Source<T, NotUsed>, TMat>, Sink<Source<T, NotUsed>, Task>, TClosed> toFunc)
Func<IFlow<Source<T, NotUsed>, TMat>, Sink<Source<T, NotUsed>, Task<Done>>, TClosed> toFunc)
{
var merge = new SplitWhenMergeBack<T, TMat>(flow, predicate, substreamCancelStrategy);

Func<Sink<T, TMat>, TClosed> finish = s =>
TClosed finish(Sink<T, TMat> s)
{
return toFunc(flow.Via(Fusing.Split.When(predicate, substreamCancelStrategy)),
Sink.ForEach<Source<T, NotUsed>>(e => e.RunWith(s, Fusing.GraphInterpreter.Current.Materializer)));
};
}

return new SubFlowImpl<T, T, TMat, TClosed>(Flow.Create<T, TMat>(), merge, finish);
}
Expand Down Expand Up @@ -1577,15 +1577,15 @@ public IFlow<T, TMat> Apply<T>(Flow<TOut, T, TMat> flow, int breadth)
/// <returns>TBD</returns>
public static SubFlow<T, TMat, TClosed> SplitAfter<T, TMat, TClosed>(this IFlow<T, TMat> flow,
SubstreamCancelStrategy substreamCancelStrategy, Func<T, bool> predicate,
Func<IFlow<Source<T, NotUsed>, TMat>, Sink<Source<T, NotUsed>, Task>, TClosed> toFunc)
Func<IFlow<Source<T, NotUsed>, TMat>, Sink<Source<T, NotUsed>, Task<Done>>, TClosed> toFunc)
{
var merge = new SplitAfterMergeBack<T, TMat>(flow, predicate, substreamCancelStrategy);

Func<Sink<T, TMat>, TClosed> finish = s =>
TClosed finish(Sink<T, TMat> s)
{
return toFunc(flow.Via(Fusing.Split.After(predicate, substreamCancelStrategy)),
Sink.ForEach<Source<T, NotUsed>>(e => e.RunWith(s, Fusing.GraphInterpreter.Current.Materializer)));
};
}

return new SubFlowImpl<T, T, TMat, TClosed>(Flow.Create<T, TMat>(), merge, finish);
}
Expand Down
6 changes: 3 additions & 3 deletions src/core/Akka.Streams/Dsl/Sink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ public static Sink<TIn, IPublisher<TIn>> DistinctRetainingFanOutPublisher<TIn>(A
/// </summary>
/// <typeparam name="TIn">TBD</typeparam>
/// <returns>TBD</returns>
public static Sink<TIn, Task> Ignore<TIn>() => FromGraph(new IgnoreSink<TIn>());
public static Sink<TIn, Task<Done>> Ignore<TIn>() => FromGraph(new IgnoreSink<TIn>());

/// <summary>
/// A <see cref="Sink{TIn,TMat}"/> that will invoke the given <paramref name="action"/> for each received element.
Expand All @@ -306,7 +306,7 @@ public static Sink<TIn, IPublisher<TIn>> DistinctRetainingFanOutPublisher<TIn>(A
/// <typeparam name="TIn">TBD</typeparam>
/// <param name="action">TBD</param>
/// <returns>TBD</returns>
public static Sink<TIn, Task> ForEach<TIn>(Action<TIn> action) => Flow.Create<TIn>()
public static Sink<TIn, Task<Done>> ForEach<TIn>(Action<TIn> action) => Flow.Create<TIn>()
.Select(input =>
{
action(input);
Expand Down Expand Up @@ -357,7 +357,7 @@ public static Sink<TIn, NotUsed> Combine<TIn, TOut, TMat>(Func<int, IGraph<Unifo
/// <param name="parallelism">TBD</param>
/// <param name="action">TBD</param>
/// <returns>TBD</returns>
public static Sink<TIn, Task> ForEachParallel<TIn>(int parallelism, Action<TIn> action) => Flow.Create<TIn>()
public static Sink<TIn, Task<Done>> ForEachParallel<TIn>(int parallelism, Action<TIn> action) => Flow.Create<TIn>()
.SelectAsyncUnordered(parallelism, input => Task.Run(() =>
{
action(input);
Expand Down
19 changes: 8 additions & 11 deletions src/core/Akka.Streams/Implementation/Fusing/GraphStages.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1017,16 +1017,16 @@ public TaskSource(Task<T> task)
/// Discards all received elements.
/// </summary>
[InternalApi]
public sealed class IgnoreSink<T> : GraphStageWithMaterializedValue<SinkShape<T>, Task>
public sealed class IgnoreSink<T> : GraphStageWithMaterializedValue<SinkShape<T>, Task<Done>>
{
#region Internal classes

private sealed class Logic : InGraphStageLogic
{
private readonly IgnoreSink<T> _stage;
private readonly TaskCompletionSource<int> _completion;
private readonly TaskCompletionSource<Done> _completion;

public Logic(IgnoreSink<T> stage, TaskCompletionSource<int> completion) : base(stage.Shape)
public Logic(IgnoreSink<T> stage, TaskCompletionSource<Done> completion) : base(stage.Shape)
{
_stage = stage;
_completion = completion;
Expand All @@ -1041,7 +1041,7 @@ public Logic(IgnoreSink<T> stage, TaskCompletionSource<int> completion) : base(s
public override void OnUpstreamFinish()
{
base.OnUpstreamFinish();
_completion.TrySetResult(0);
_completion.TrySetResult(Done.Instance);
}

public override void OnUpstreamFailure(Exception e)
Expand All @@ -1053,22 +1053,19 @@ public override void OnUpstreamFailure(Exception e)

#endregion

public IgnoreSink()
{
Shape = new SinkShape<T>(Inlet);
}
public IgnoreSink() => Shape = new SinkShape<T>(Inlet);

protected override Attributes InitialAttributes { get; } = DefaultAttributes.IgnoreSink;

public Inlet<T> Inlet { get; } = new Inlet<T>("Ignore.in");

public override SinkShape<T> Shape { get; }

public override ILogicAndMaterializedValue<Task> CreateLogicAndMaterializedValue(Attributes inheritedAttributes)
public override ILogicAndMaterializedValue<Task<Done>> CreateLogicAndMaterializedValue(Attributes inheritedAttributes)
{
var completion = new TaskCompletionSource<int>();
var completion = new TaskCompletionSource<Done>();
var logic = new Logic(this, completion);
return new LogicAndMaterializedValue<Task>(logic, completion.Task);
return new LogicAndMaterializedValue<Task<Done>>(logic, completion.Task);
}

public override string ToString() => "IgnoreSink";
Expand Down

0 comments on commit c46969a

Please sign in to comment.