Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1388,7 +1388,9 @@ namespace Akka.Streams.Dsl
public static Akka.Streams.Dsl.Flow<TIn, TOut, TMat3> DivertToMaterialized<TIn, TOut, TMat, TMat2, TMat3>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, Akka.Streams.IGraph<Akka.Streams.SinkShape<TOut>, TMat2> that, System.Func<TOut, bool> when, System.Func<TMat, TMat2, TMat3> materializerFunction) { }
public static Akka.Streams.Dsl.Flow<TIn, TOut2, TMat> Expand<TIn, TOut1, TOut2, TMat>(this Akka.Streams.Dsl.Flow<TIn, TOut1, TMat> flow, System.Func<TOut1, System.Collections.Generic.IEnumerator<TOut2>> extrapolate) { }
public static Akka.Streams.Dsl.SubFlow<TOut, TMat, Akka.Streams.Dsl.Sink<TIn, TMat>> GroupBy<TIn, TOut, TMat, TKey>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, int maxSubstreams, System.Func<TOut, TKey> groupingFunc, bool allowClosedSubstreamRecreation) { }
public static Akka.Streams.Dsl.SubFlow<TOut, TMat, Akka.Streams.Dsl.Sink<TIn, TMat>> GroupBy<TIn, TOut, TMat, TKey>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, System.Func<TOut, TKey> groupingFunc, bool allowClosedSubstreamRecreation) { }
public static Akka.Streams.Dsl.SubFlow<TOut, TMat, Akka.Streams.Dsl.Sink<TIn, TMat>> GroupBy<TIn, TOut, TMat, TKey>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, int maxSubstreams, System.Func<TOut, TKey> groupingFunc) { }
public static Akka.Streams.Dsl.SubFlow<TOut, TMat, Akka.Streams.Dsl.Sink<TIn, TMat>> GroupBy<TIn, TOut, TMat, TKey>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, System.Func<TOut, TKey> groupingFunc) { }
public static Akka.Streams.Dsl.Flow<TIn, System.Collections.Generic.IEnumerable<TOut>, TMat> Grouped<TIn, TOut, TMat>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, int n) { }
public static Akka.Streams.Dsl.Flow<TIn, System.Collections.Generic.IEnumerable<TOut>, TMat> GroupedWeightedWithin<TIn, TOut, TMat>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, long maxWeight, int maxNumber, System.TimeSpan interval, System.Func<TOut, long> costFn) { }
public static Akka.Streams.Dsl.Flow<TIn, System.Collections.Generic.IEnumerable<TOut>, TMat> GroupedWithin<TIn, TOut, TMat>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, int n, System.TimeSpan timeout) { }
Expand Down Expand Up @@ -2089,6 +2091,7 @@ namespace Akka.Streams.Dsl
public static Akka.Streams.Dsl.Source<TOut, TMat3> DivertToMaterialized<TOut, TMat, TMat2, TMat3>(this Akka.Streams.Dsl.Source<TOut, TMat> flow, Akka.Streams.IGraph<Akka.Streams.SinkShape<TOut>, TMat2> that, System.Func<TOut, bool> when, System.Func<TMat, TMat2, TMat3> materializerFunction) { }
public static Akka.Streams.Dsl.Source<TOut2, TMat> Expand<TOut1, TOut2, TMat>(this Akka.Streams.Dsl.Source<TOut1, TMat> flow, System.Func<TOut1, System.Collections.Generic.IEnumerator<TOut2>> extrapolate) { }
public static Akka.Streams.Dsl.SubFlow<TOut, TMat, Akka.Streams.Dsl.IRunnableGraph<TMat>> GroupBy<TOut, TMat, TKey>(this Akka.Streams.Dsl.Source<TOut, TMat> flow, int maxSubstreams, System.Func<TOut, TKey> groupingFunc) { }
public static Akka.Streams.Dsl.SubFlow<TOut, TMat, Akka.Streams.Dsl.IRunnableGraph<TMat>> GroupBy<TOut, TMat, TKey>(this Akka.Streams.Dsl.Source<TOut, TMat> flow, System.Func<TOut, TKey> groupingFunc) { }
public static Akka.Streams.Dsl.Source<System.Collections.Generic.IEnumerable<TOut>, TMat> Grouped<TOut, TMat>(this Akka.Streams.Dsl.Source<TOut, TMat> flow, int n) { }
public static Akka.Streams.Dsl.Source<System.Collections.Generic.IEnumerable<TOut>, TMat> GroupedWeightedWithin<TOut, TMat>(this Akka.Streams.Dsl.Source<TOut, TMat> flow, long maxWeight, System.TimeSpan interval, System.Func<TOut, long> costFn) { }
public static Akka.Streams.Dsl.Source<System.Collections.Generic.IEnumerable<TOut>, TMat> GroupedWeightedWithin<TOut, TMat>(this Akka.Streams.Dsl.Source<TOut, TMat> flow, long maxWeight, int maxNumber, System.TimeSpan interval, System.Func<TOut, long> costFn) { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1386,7 +1386,9 @@ namespace Akka.Streams.Dsl
public static Akka.Streams.Dsl.Flow<TIn, TOut, TMat3> DivertToMaterialized<TIn, TOut, TMat, TMat2, TMat3>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, Akka.Streams.IGraph<Akka.Streams.SinkShape<TOut>, TMat2> that, System.Func<TOut, bool> when, System.Func<TMat, TMat2, TMat3> materializerFunction) { }
public static Akka.Streams.Dsl.Flow<TIn, TOut2, TMat> Expand<TIn, TOut1, TOut2, TMat>(this Akka.Streams.Dsl.Flow<TIn, TOut1, TMat> flow, System.Func<TOut1, System.Collections.Generic.IEnumerator<TOut2>> extrapolate) { }
public static Akka.Streams.Dsl.SubFlow<TOut, TMat, Akka.Streams.Dsl.Sink<TIn, TMat>> GroupBy<TIn, TOut, TMat, TKey>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, int maxSubstreams, System.Func<TOut, TKey> groupingFunc, bool allowClosedSubstreamRecreation) { }
public static Akka.Streams.Dsl.SubFlow<TOut, TMat, Akka.Streams.Dsl.Sink<TIn, TMat>> GroupBy<TIn, TOut, TMat, TKey>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, System.Func<TOut, TKey> groupingFunc, bool allowClosedSubstreamRecreation) { }
public static Akka.Streams.Dsl.SubFlow<TOut, TMat, Akka.Streams.Dsl.Sink<TIn, TMat>> GroupBy<TIn, TOut, TMat, TKey>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, int maxSubstreams, System.Func<TOut, TKey> groupingFunc) { }
public static Akka.Streams.Dsl.SubFlow<TOut, TMat, Akka.Streams.Dsl.Sink<TIn, TMat>> GroupBy<TIn, TOut, TMat, TKey>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, System.Func<TOut, TKey> groupingFunc) { }
public static Akka.Streams.Dsl.Flow<TIn, System.Collections.Generic.IEnumerable<TOut>, TMat> Grouped<TIn, TOut, TMat>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, int n) { }
public static Akka.Streams.Dsl.Flow<TIn, System.Collections.Generic.IEnumerable<TOut>, TMat> GroupedWeightedWithin<TIn, TOut, TMat>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, long maxWeight, int maxNumber, System.TimeSpan interval, System.Func<TOut, long> costFn) { }
public static Akka.Streams.Dsl.Flow<TIn, System.Collections.Generic.IEnumerable<TOut>, TMat> GroupedWithin<TIn, TOut, TMat>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, int n, System.TimeSpan timeout) { }
Expand Down Expand Up @@ -2087,6 +2089,7 @@ namespace Akka.Streams.Dsl
public static Akka.Streams.Dsl.Source<TOut, TMat3> DivertToMaterialized<TOut, TMat, TMat2, TMat3>(this Akka.Streams.Dsl.Source<TOut, TMat> flow, Akka.Streams.IGraph<Akka.Streams.SinkShape<TOut>, TMat2> that, System.Func<TOut, bool> when, System.Func<TMat, TMat2, TMat3> materializerFunction) { }
public static Akka.Streams.Dsl.Source<TOut2, TMat> Expand<TOut1, TOut2, TMat>(this Akka.Streams.Dsl.Source<TOut1, TMat> flow, System.Func<TOut1, System.Collections.Generic.IEnumerator<TOut2>> extrapolate) { }
public static Akka.Streams.Dsl.SubFlow<TOut, TMat, Akka.Streams.Dsl.IRunnableGraph<TMat>> GroupBy<TOut, TMat, TKey>(this Akka.Streams.Dsl.Source<TOut, TMat> flow, int maxSubstreams, System.Func<TOut, TKey> groupingFunc) { }
public static Akka.Streams.Dsl.SubFlow<TOut, TMat, Akka.Streams.Dsl.IRunnableGraph<TMat>> GroupBy<TOut, TMat, TKey>(this Akka.Streams.Dsl.Source<TOut, TMat> flow, System.Func<TOut, TKey> groupingFunc) { }
public static Akka.Streams.Dsl.Source<System.Collections.Generic.IEnumerable<TOut>, TMat> Grouped<TOut, TMat>(this Akka.Streams.Dsl.Source<TOut, TMat> flow, int n) { }
public static Akka.Streams.Dsl.Source<System.Collections.Generic.IEnumerable<TOut>, TMat> GroupedWeightedWithin<TOut, TMat>(this Akka.Streams.Dsl.Source<TOut, TMat> flow, long maxWeight, System.TimeSpan interval, System.Func<TOut, long> costFn) { }
public static Akka.Streams.Dsl.Source<System.Collections.Generic.IEnumerable<TOut>, TMat> GroupedWeightedWithin<TOut, TMat>(this Akka.Streams.Dsl.Source<TOut, TMat> flow, long maxWeight, int maxNumber, System.TimeSpan interval, System.Func<TOut, long> costFn) { }
Expand Down
23 changes: 23 additions & 0 deletions src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,29 @@ await this.AssertAllStagesStoppedAsync(async () =>
}, Materializer);
}

[Fact(DisplayName = "GroupBy must not have substream limit when maxSubStream is set to negative numbers")]
public async Task GroupBy_UnlimitedSubstreamTest()
{
await this.AssertAllStagesStoppedAsync(async () =>
{
var f = Flow.Create<int>().GroupBy(-1, x => x).PrefixAndTail(0).MergeSubstreams();
var (up, down) = ((Flow<int, (IImmutableList<int>, Source<int, NotUsed>), NotUsed>)f)
.RunWith(this.SourceProbe<int>(), this.SinkProbe<(IImmutableList<int>, Source<int, NotUsed>)>(), Materializer);

await down.RequestAsync(100);

foreach (var i in Enumerable.Range(0, 100))
{
await up.SendNextAsync(i);
var (_, source) = await down.ExpectNextAsync();
var (sub, probe) = await StreamPuppet(source.RunWith(Sink.AsPublisher<int>(false), Materializer), this);

sub.Request(1);
await probe.ExpectNextAsync(i);
}
}, Materializer);
}

[Fact]
public async Task GroupBy_must_resume_when_exceeding_maxSubStreams()
{
Expand Down
88 changes: 87 additions & 1 deletion src/core/Akka.Streams/Dsl/FlowOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1346,13 +1346,70 @@ public static Flow<TIn, TOut2, TMat> Transform<TIn, TOut1, TOut2, TMat>(this Flo
/// <typeparam name="TMat">TBD</typeparam>
/// <typeparam name="TKey">TBD</typeparam>
/// <param name="flow">TBD</param>
/// <param name="maxSubstreams">Configures the maximum number of substreams (keys) that are supported; if more distinct keys are encountered then the stream fails</param>
/// <param name="maxSubstreams">Configures the maximum number of substreams (keys) that are supported; if more distinct keys are encountered then the stream fails. Set to -1 for infinite substreams.</param>
/// <param name="groupingFunc">Computes the key for each element</param>
/// <param name="allowClosedSubstreamRecreation">Enables recreation of already closed substreams if elements with their corresponding keys arrive after completion</param>
/// <returns>TBD</returns>
public static SubFlow<TOut, TMat, Sink<TIn, TMat>> GroupBy<TIn, TOut, TMat, TKey>(this Flow<TIn, TOut, TMat> flow, int maxSubstreams, Func<TOut, TKey> groupingFunc, bool allowClosedSubstreamRecreation) =>
flow.GroupBy(maxSubstreams, groupingFunc, (f, s) => ((Flow<TIn, Source<TOut, NotUsed>, TMat>)f).To(s), allowClosedSubstreamRecreation);

/// <summary>
/// This operation demultiplexes the incoming stream into separate output
/// streams, one for each element key. The key is computed for each element
/// using the given function. When a new key is encountered for the first time
/// a new substream is opened and subsequently fed with all elements belonging to
/// that key.
/// <para>
/// WARNING: If <paramref name="allowClosedSubstreamRecreation"/> is set to false (default behavior) the operator
/// keeps track of all keys of streams that have already been closed. If you expect an infinite number of keys this
/// can cause memory issues. Elements belonging to those keys are drained directly and not send to the substream.
/// </para>
/// <para>
/// Note: If <paramref name="allowClosedSubstreamRecreation"/> is set to true substream completion and incoming
/// elements are subject to race-conditions. If elements arrive for a stream that is in the process of closing
/// these elements might get lost.
/// </para>
/// <para>
/// The object returned from this method is not a normal <see cref="Flow"/>, it is a
/// <see cref="SubFlow{TOut, TMat, TClosed}"/>. This means that after this operator
/// all transformations are applied to all encountered substreams in the same fashion.
/// Substream mode is exited either by closing the substream (i.e. connecting it to a <see cref="Sink"/>)
/// or by merging the substreams back together; see the <c>To</c> and <c>MergeBack</c> methods
/// on <see cref="SubFlow{TOut, TMat, TClosed}"/> for more information.
/// </para>
/// <para>
/// It is important to note that the substreams also propagate back-pressure as any other stream, which means
/// that blocking one substream will block the <c>GroupBy</c> operator itself —and thereby all substreams— once all
/// internal or explicit buffers are filled.
/// </para>
/// <para>
/// If the group by function <paramref name="groupingFunc"/> throws an exception and the supervision decision
/// is <see cref="Supervision.Directive.Stop"/> the stream and substreams will be completed with failure.
/// </para>
/// <para>
/// If the group by <paramref name="groupingFunc"/> throws an exception and the supervision decision
/// is <see cref="Supervision.Directive.Resume"/> or <see cref="Supervision.Directive.Restart"/>
/// the element is dropped and the stream and substreams continue.
/// </para>
/// <para>
/// Function <paramref name="groupingFunc"/> MUST NOT return <c>null</c>. This will throw exception and trigger supervision decision mechanism.
/// </para>
/// <para>**Emits when** an element for which the grouping function returns a group that has not yet been created. Emits the new group.</para>
/// <para>**Backpressures when** there is an element pending for a group whose substream backpressures</para>
/// <para>**Completes when** upstream completes</para>
/// <para>**Cancels when** downstream cancels and all substreams cancel</para>
/// </summary>
/// <typeparam name="TIn">TBD</typeparam>
/// <typeparam name="TOut">TBD</typeparam>
/// <typeparam name="TMat">TBD</typeparam>
/// <typeparam name="TKey">TBD</typeparam>
/// <param name="flow">TBD</param>
/// <param name="groupingFunc">Computes the key for each element</param>
/// <param name="allowClosedSubstreamRecreation">Enables recreation of already closed substreams if elements with their corresponding keys arrive after completion</param>
/// <returns>TBD</returns>
public static SubFlow<TOut, TMat, Sink<TIn, TMat>> GroupBy<TIn, TOut, TMat, TKey>(this Flow<TIn, TOut, TMat> flow, Func<TOut, TKey> groupingFunc, bool allowClosedSubstreamRecreation) =>
flow.GroupBy(-1, groupingFunc, (f, s) => ((Flow<TIn, Source<TOut, NotUsed>, TMat>)f).To(s), allowClosedSubstreamRecreation);

/// <summary>
/// This operation demultiplexes the incoming stream into separate output
/// streams, one for each element key. The key is computed for each element
Expand All @@ -1366,9 +1423,38 @@ public static SubFlow<TOut, TMat, Sink<TIn, TMat>> GroupBy<TIn, TOut, TMat, TKey
/// </para>
/// See <seealso cref="GroupBy{TIn, TOut, TMat, TKey}(Flow{TIn, TOut, TMat}, int, Func{TOut, TKey}, bool)"/>
/// </summary>
/// <typeparam name="TIn">TBD</typeparam>
/// <typeparam name="TOut">TBD</typeparam>
/// <typeparam name="TMat">TBD</typeparam>
/// <typeparam name="TKey">TBD</typeparam>
/// <param name="flow">TBD</param>
/// <param name="maxSubstreams">Configures the maximum number of substreams (keys) that are supported; if more distinct keys are encountered then the stream fails. Set to -1 for infinite substreams.</param>
/// <param name="groupingFunc">Computes the key for each element</param>
public static SubFlow<TOut, TMat, Sink<TIn, TMat>> GroupBy<TIn, TOut, TMat, TKey>(this Flow<TIn, TOut, TMat> flow, int maxSubstreams, Func<TOut, TKey> groupingFunc) =>
flow.GroupBy(maxSubstreams, groupingFunc, (f, s) => ((Flow<TIn, Source<TOut, NotUsed>, TMat>)f).To(s), false);

/// <summary>
/// This operation demultiplexes the incoming stream into separate output
/// streams, one for each element key. The key is computed for each element
/// using the given function. When a new key is encountered for the first time
/// a new substream is opened and subsequently fed with all elements belonging to
/// that key.
/// <para>
/// WARNING: The stage keeps track of all keys of streams that have already been closed.
/// If you expect an infinite number of keys this can cause memory issues. Elements belonging
/// to those keys are drained directly and not send to the substream.
/// </para>
/// See <seealso cref="GroupBy{TIn, TOut, TMat, TKey}(Flow{TIn, TOut, TMat}, int, Func{TOut, TKey}, bool)"/>
/// </summary>
/// <typeparam name="TIn">TBD</typeparam>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Arkatufus just a general note going forward, just don't have a comment instead of a TBD.

/// <typeparam name="TOut">TBD</typeparam>
/// <typeparam name="TMat">TBD</typeparam>
/// <typeparam name="TKey">TBD</typeparam>
/// <param name="flow">TBD</param>
/// <param name="groupingFunc">Computes the key for each element</param>
public static SubFlow<TOut, TMat, Sink<TIn, TMat>> GroupBy<TIn, TOut, TMat, TKey>(this Flow<TIn, TOut, TMat> flow, Func<TOut, TKey> groupingFunc) =>
flow.GroupBy(-1, groupingFunc, (f, s) => ((Flow<TIn, Source<TOut, NotUsed>, TMat>)f).To(s), false);

/// <summary>
/// This operation applies the given predicate to all incoming elements and
/// emits them to a stream of output streams, always beginning a new one with
Expand Down
Loading