Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes Sink.Ignore signature from Task to Task<Done> #5973

Merged
merged 2 commits into from
Jun 3, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1935,11 +1935,11 @@ namespace Akka.Streams.Dsl
public static Akka.Streams.Dsl.Sink<TIn, Reactive.Streams.IPublisher<TIn>> FanoutPublisher<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<TIn>> First<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<TIn>> FirstOrDefault<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task> ForEach<TIn>(System.Action<TIn> action) { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task> ForEachParallel<TIn>(int parallelism, System.Action<TIn> action) { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Done>> ForEach<TIn>(System.Action<TIn> action) { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Done>> ForEachParallel<TIn>(int parallelism, System.Action<TIn> action) { }
public static Akka.Streams.Dsl.Sink<TIn, TMat> FromGraph<TIn, TMat>(Akka.Streams.IGraph<Akka.Streams.SinkShape<TIn>, TMat> graph) { }
public static Akka.Streams.Dsl.Sink<TIn, Akka.NotUsed> FromSubscriber<TIn>(Reactive.Streams.ISubscriber<TIn> subscriber) { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task> Ignore<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Done>> Ignore<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<TIn>> Last<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<TIn>> LastOrDefault<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Util.Option<TMat>>> LazyInitAsync<TIn, TMat>(System.Func<System.Threading.Tasks.Task<Akka.Streams.Dsl.Sink<TIn, TMat>>> sinkFactory) { }
Expand Down Expand Up @@ -4209,13 +4209,13 @@ namespace Akka.Streams.Implementation.Fusing
protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { }
}
[Akka.Annotations.InternalApiAttribute()]
public sealed class IgnoreSink<T> : Akka.Streams.Stage.GraphStageWithMaterializedValue<Akka.Streams.SinkShape<T>, System.Threading.Tasks.Task>
public sealed class IgnoreSink<T> : Akka.Streams.Stage.GraphStageWithMaterializedValue<Akka.Streams.SinkShape<T>, System.Threading.Tasks.Task<Akka.Done>>
{
public IgnoreSink() { }
protected override Akka.Streams.Attributes InitialAttributes { get; }
public Akka.Streams.Inlet<T> Inlet { get; }
public override Akka.Streams.SinkShape<T> Shape { get; }
public override Akka.Streams.Stage.ILogicAndMaterializedValue<System.Threading.Tasks.Task> CreateLogicAndMaterializedValue(Akka.Streams.Attributes inheritedAttributes) { }
public override Akka.Streams.Stage.ILogicAndMaterializedValue<System.Threading.Tasks.Task<Akka.Done>> CreateLogicAndMaterializedValue(Akka.Streams.Attributes inheritedAttributes) { }
public override string ToString() { }
}
[Akka.Annotations.InternalApiAttribute()]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1935,11 +1935,11 @@ namespace Akka.Streams.Dsl
public static Akka.Streams.Dsl.Sink<TIn, Reactive.Streams.IPublisher<TIn>> FanoutPublisher<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<TIn>> First<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<TIn>> FirstOrDefault<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task> ForEach<TIn>(System.Action<TIn> action) { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task> ForEachParallel<TIn>(int parallelism, System.Action<TIn> action) { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Done>> ForEach<TIn>(System.Action<TIn> action) { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Done>> ForEachParallel<TIn>(int parallelism, System.Action<TIn> action) { }
public static Akka.Streams.Dsl.Sink<TIn, TMat> FromGraph<TIn, TMat>(Akka.Streams.IGraph<Akka.Streams.SinkShape<TIn>, TMat> graph) { }
public static Akka.Streams.Dsl.Sink<TIn, Akka.NotUsed> FromSubscriber<TIn>(Reactive.Streams.ISubscriber<TIn> subscriber) { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task> Ignore<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Done>> Ignore<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<TIn>> Last<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<TIn>> LastOrDefault<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Util.Option<TMat>>> LazyInitAsync<TIn, TMat>(System.Func<System.Threading.Tasks.Task<Akka.Streams.Dsl.Sink<TIn, TMat>>> sinkFactory) { }
Expand Down Expand Up @@ -4221,13 +4221,13 @@ namespace Akka.Streams.Implementation.Fusing
protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { }
}
[Akka.Annotations.InternalApiAttribute()]
public sealed class IgnoreSink<T> : Akka.Streams.Stage.GraphStageWithMaterializedValue<Akka.Streams.SinkShape<T>, System.Threading.Tasks.Task>
public sealed class IgnoreSink<T> : Akka.Streams.Stage.GraphStageWithMaterializedValue<Akka.Streams.SinkShape<T>, System.Threading.Tasks.Task<Akka.Done>>
{
public IgnoreSink() { }
protected override Akka.Streams.Attributes InitialAttributes { get; }
public Akka.Streams.Inlet<T> Inlet { get; }
public override Akka.Streams.SinkShape<T> Shape { get; }
public override Akka.Streams.Stage.ILogicAndMaterializedValue<System.Threading.Tasks.Task> CreateLogicAndMaterializedValue(Akka.Streams.Attributes inheritedAttributes) { }
public override Akka.Streams.Stage.ILogicAndMaterializedValue<System.Threading.Tasks.Task<Akka.Done>> CreateLogicAndMaterializedValue(Akka.Streams.Attributes inheritedAttributes) { }
public override string ToString() { }
}
[Akka.Annotations.InternalApiAttribute()]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1935,11 +1935,11 @@ namespace Akka.Streams.Dsl
public static Akka.Streams.Dsl.Sink<TIn, Reactive.Streams.IPublisher<TIn>> FanoutPublisher<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<TIn>> First<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<TIn>> FirstOrDefault<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task> ForEach<TIn>(System.Action<TIn> action) { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task> ForEachParallel<TIn>(int parallelism, System.Action<TIn> action) { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Done>> ForEach<TIn>(System.Action<TIn> action) { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Done>> ForEachParallel<TIn>(int parallelism, System.Action<TIn> action) { }
public static Akka.Streams.Dsl.Sink<TIn, TMat> FromGraph<TIn, TMat>(Akka.Streams.IGraph<Akka.Streams.SinkShape<TIn>, TMat> graph) { }
public static Akka.Streams.Dsl.Sink<TIn, Akka.NotUsed> FromSubscriber<TIn>(Reactive.Streams.ISubscriber<TIn> subscriber) { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task> Ignore<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Done>> Ignore<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<TIn>> Last<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<TIn>> LastOrDefault<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Util.Option<TMat>>> LazyInitAsync<TIn, TMat>(System.Func<System.Threading.Tasks.Task<Akka.Streams.Dsl.Sink<TIn, TMat>>> sinkFactory) { }
Expand Down Expand Up @@ -4209,13 +4209,13 @@ namespace Akka.Streams.Implementation.Fusing
protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { }
}
[Akka.Annotations.InternalApiAttribute()]
public sealed class IgnoreSink<T> : Akka.Streams.Stage.GraphStageWithMaterializedValue<Akka.Streams.SinkShape<T>, System.Threading.Tasks.Task>
public sealed class IgnoreSink<T> : Akka.Streams.Stage.GraphStageWithMaterializedValue<Akka.Streams.SinkShape<T>, System.Threading.Tasks.Task<Akka.Done>>
{
public IgnoreSink() { }
protected override Akka.Streams.Attributes InitialAttributes { get; }
public Akka.Streams.Inlet<T> Inlet { get; }
public override Akka.Streams.SinkShape<T> Shape { get; }
public override Akka.Streams.Stage.ILogicAndMaterializedValue<System.Threading.Tasks.Task> CreateLogicAndMaterializedValue(Akka.Streams.Attributes inheritedAttributes) { }
public override Akka.Streams.Stage.ILogicAndMaterializedValue<System.Threading.Tasks.Task<Akka.Done>> CreateLogicAndMaterializedValue(Akka.Streams.Attributes inheritedAttributes) { }
public override string ToString() { }
}
[Akka.Annotations.InternalApiAttribute()]
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Docs.Tests/Streams/HubsDocTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public void Hubs_must_demonstrate_creating_a_dynamic_merge()

#region merge-hub
// A simple consumer that will print to the console for now
Sink<string, Task> consumer = Sink.ForEach<string>(WriteLine);
Sink<string, Task<Done>> consumer = Sink.ForEach<string>(WriteLine);

// Attach a MergeHub Source to the consumer. This will materialize to a
// corresponding Sink.
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Docs.Tests/Streams/StreamRefsDocTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public DataReceiver()
});
}

private Sink<string, Task> LogsSinksFor(string id) =>
private Sink<string, Task<Done>> LogsSinksFor(string id) =>
Sink.ForEach<string>(Console.WriteLine);
}
#endregion
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Streams.Tests/Dsl/GraphMatValueSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ public void A_Graph_with_materialized_value_must_perform_side_effecting_transfor
});
var r = RunnableGraph.FromGraph(GraphDsl.Create(Sink.Ignore<int>(), (b, sink) =>
{
var source = Source.From(Enumerable.Range(1, 10)).MapMaterializedValue(_ => Task.FromResult(0));
var source = Source.From(Enumerable.Range(1, 10)).MapMaterializedValue(_ => Task.FromResult(Done.Instance));
b.Add(g);
b.From(source).To(sink);
return ClosedShape.Instance;
Expand Down
4 changes: 2 additions & 2 deletions src/core/Akka.Streams.Tests/Dsl/GraphZipSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ public void Zip_must_complete_if_one_side_is_available_but_other_already_complet
var completed = RunnableGraph.FromGraph(GraphDsl.Create(Sink.Ignore<(int, string)>(), (b, sink) =>
{
var zip = b.Add(new Zip<int, string>());
var source1 = Source.FromPublisher(upstream1).MapMaterializedValue<Task>(_ => null);
var source2 = Source.FromPublisher(upstream2).MapMaterializedValue<Task>(_ => null);
var source1 = Source.FromPublisher(upstream1).MapMaterializedValue(_ => Task.FromResult(Done.Instance));
var source2 = Source.FromPublisher(upstream2).MapMaterializedValue(_ => Task.FromResult(Done.Instance));

b.From(source1).To(zip.In0);
b.From(source2).To(zip.In1);
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Streams.Tests/IO/TcpSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,7 @@ private async Task ValidateServerClientCommunicationAsync(ByteString testData, S
(await serverConnection.WaitReadAsync()).Should().BeEquivalentTo(testData);
}

private Sink<Tcp.IncomingConnection, Task> EchoHandler() =>
private Sink<Tcp.IncomingConnection, Task<Done>> EchoHandler() =>
Sink.ForEach<Tcp.IncomingConnection>(c => c.Flow.Join(Flow.Create<ByteString>()).Run(Materializer));

[Fact]
Expand Down
22 changes: 11 additions & 11 deletions src/core/Akka.Streams/Dsl/Internal/InternalFlowOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1325,17 +1325,17 @@ public static SubFlow<T, TMat, TClosed> GroupBy<T, TMat, TKey, TClosed>(
this IFlow<T, TMat> flow,
int maxSubstreams,
Func<T, TKey> groupingFunc,
Func<IFlow<Source<T, NotUsed>, TMat>, Sink<Source<T, NotUsed>, Task>, TClosed> toFunc,
Func<IFlow<Source<T, NotUsed>, TMat>, Sink<Source<T, NotUsed>, Task<Done>>, TClosed> toFunc,
bool allowClosedSubstreamRecreation)
{
var merge = new GroupByMergeBack<T, TMat, TKey>(flow, maxSubstreams, groupingFunc, allowClosedSubstreamRecreation);

Func<Sink<T, TMat>, TClosed> finish = s =>
TClosed finish(Sink<T, TMat> s)
{
return toFunc(
flow.Via(new Fusing.GroupBy<T, TKey>(maxSubstreams, groupingFunc, allowClosedSubstreamRecreation)),
Sink.ForEach<Source<T, NotUsed>>(e => e.RunWith(s, Fusing.GraphInterpreter.Current.Materializer)));
};
}

return new SubFlowImpl<T, T, TMat, TClosed>(Flow.Create<T, TMat>(), merge, finish);
}
Expand All @@ -1349,7 +1349,7 @@ public static SubFlow<T, TMat, TClosed> GroupBy<T, TMat, TKey, TClosed>(
/// infinite number of keys this can cause memory issues. Elements belonging to those keys are drained directly
/// and not send to the substream.
/// </para>
/// See also <seealso cref="GroupBy{T, TMat, TKey, TClosed}(IFlow{T, TMat}, int, Func{T, TKey}, Func{IFlow{Source{T, NotUsed}, TMat}, Sink{Source{T, NotUsed}, Task}, TClosed}, bool)"/>
/// See also <seealso cref="GroupBy{T, TMat, TKey, TClosed}(IFlow{T, TMat}, int, Func{T, TKey}, Func{IFlow{Source{T, NotUsed}, TMat}, Sink{Source{T, NotUsed}, Task{Done}}, TClosed}, bool)"/>
/// </summary>
/// <typeparam name="T"></typeparam>
/// <typeparam name="TMat"></typeparam>
Expand All @@ -1364,7 +1364,7 @@ public static SubFlow<T, TMat, TClosed> GroupBy<T, TMat, TKey, TClosed>(
this IFlow<T, TMat> flow,
int maxSubstreams,
Func<T, TKey> groupingFunc,
Func<IFlow<Source<T, NotUsed>, TMat>, Sink<Source<T, NotUsed>, Task>, TClosed> toFunc) => GroupBy(flow, maxSubstreams, groupingFunc, toFunc, allowClosedSubstreamRecreation: false);
Func<IFlow<Source<T, NotUsed>, TMat>, Sink<Source<T, NotUsed>, Task<Done>>, TClosed> toFunc) => GroupBy(flow, maxSubstreams, groupingFunc, toFunc, allowClosedSubstreamRecreation: false);

/// <summary>
/// TBD
Expand Down Expand Up @@ -1472,15 +1472,15 @@ public IFlow<T, TMat> Apply<T>(Flow<TOut, T, TMat> flow, int breadth)
/// <returns>TBD</returns>
public static SubFlow<T, TMat, TClosed> SplitWhen<T, TMat, TClosed>(this IFlow<T, TMat> flow,
SubstreamCancelStrategy substreamCancelStrategy, Func<T, bool> predicate,
Func<IFlow<Source<T, NotUsed>, TMat>, Sink<Source<T, NotUsed>, Task>, TClosed> toFunc)
Func<IFlow<Source<T, NotUsed>, TMat>, Sink<Source<T, NotUsed>, Task<Done>>, TClosed> toFunc)
{
var merge = new SplitWhenMergeBack<T, TMat>(flow, predicate, substreamCancelStrategy);

Func<Sink<T, TMat>, TClosed> finish = s =>
TClosed finish(Sink<T, TMat> s)
{
return toFunc(flow.Via(Fusing.Split.When(predicate, substreamCancelStrategy)),
Sink.ForEach<Source<T, NotUsed>>(e => e.RunWith(s, Fusing.GraphInterpreter.Current.Materializer)));
};
}

return new SubFlowImpl<T, T, TMat, TClosed>(Flow.Create<T, TMat>(), merge, finish);
}
Expand Down Expand Up @@ -1577,15 +1577,15 @@ public IFlow<T, TMat> Apply<T>(Flow<TOut, T, TMat> flow, int breadth)
/// <returns>TBD</returns>
public static SubFlow<T, TMat, TClosed> SplitAfter<T, TMat, TClosed>(this IFlow<T, TMat> flow,
SubstreamCancelStrategy substreamCancelStrategy, Func<T, bool> predicate,
Func<IFlow<Source<T, NotUsed>, TMat>, Sink<Source<T, NotUsed>, Task>, TClosed> toFunc)
Func<IFlow<Source<T, NotUsed>, TMat>, Sink<Source<T, NotUsed>, Task<Done>>, TClosed> toFunc)
{
var merge = new SplitAfterMergeBack<T, TMat>(flow, predicate, substreamCancelStrategy);

Func<Sink<T, TMat>, TClosed> finish = s =>
TClosed finish(Sink<T, TMat> s)
{
return toFunc(flow.Via(Fusing.Split.After(predicate, substreamCancelStrategy)),
Sink.ForEach<Source<T, NotUsed>>(e => e.RunWith(s, Fusing.GraphInterpreter.Current.Materializer)));
};
}

return new SubFlowImpl<T, T, TMat, TClosed>(Flow.Create<T, TMat>(), merge, finish);
}
Expand Down
6 changes: 3 additions & 3 deletions src/core/Akka.Streams/Dsl/Sink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ public static Sink<TIn, IPublisher<TIn>> DistinctRetainingFanOutPublisher<TIn>(A
/// </summary>
/// <typeparam name="TIn">TBD</typeparam>
/// <returns>TBD</returns>
public static Sink<TIn, Task> Ignore<TIn>() => FromGraph(new IgnoreSink<TIn>());
public static Sink<TIn, Task<Done>> Ignore<TIn>() => FromGraph(new IgnoreSink<TIn>());

/// <summary>
/// A <see cref="Sink{TIn,TMat}"/> that will invoke the given <paramref name="action"/> for each received element.
Expand All @@ -306,7 +306,7 @@ public static Sink<TIn, IPublisher<TIn>> DistinctRetainingFanOutPublisher<TIn>(A
/// <typeparam name="TIn">TBD</typeparam>
/// <param name="action">TBD</param>
/// <returns>TBD</returns>
public static Sink<TIn, Task> ForEach<TIn>(Action<TIn> action) => Flow.Create<TIn>()
public static Sink<TIn, Task<Done>> ForEach<TIn>(Action<TIn> action) => Flow.Create<TIn>()
.Select(input =>
{
action(input);
Expand Down Expand Up @@ -357,7 +357,7 @@ public static Sink<TIn, NotUsed> Combine<TIn, TOut, TMat>(Func<int, IGraph<Unifo
/// <param name="parallelism">TBD</param>
/// <param name="action">TBD</param>
/// <returns>TBD</returns>
public static Sink<TIn, Task> ForEachParallel<TIn>(int parallelism, Action<TIn> action) => Flow.Create<TIn>()
public static Sink<TIn, Task<Done>> ForEachParallel<TIn>(int parallelism, Action<TIn> action) => Flow.Create<TIn>()
.SelectAsyncUnordered(parallelism, input => Task.Run(() =>
{
action(input);
Expand Down
Loading