Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
0d7a04e
Add reproduction unit test
Arkatufus Mar 24, 2025
428195e
Port https://github.com/akka/akka/pull/23970
Arkatufus Mar 25, 2025
aaefe69
Update API Approval list
Arkatufus Mar 25, 2025
3772f15
Update API Approval list
Arkatufus Mar 25, 2025
59070d0
Add async callback unit tests
Arkatufus Mar 25, 2025
59ab4ad
skip non-implemented feature
Arkatufus Mar 26, 2025
7a9e8f6
Fix TaskCompletionSource allocation problem
Arkatufus Mar 26, 2025
647a481
Code cleanup
Arkatufus Mar 26, 2025
2a1ded1
Implement early callback from https://github.com/akka/akka/pull/23185
Arkatufus Mar 27, 2025
5712140
Make sure early callbacks also get cancelled on stop
Arkatufus Mar 27, 2025
6d70d58
Fix naming and copyright header
Arkatufus Mar 28, 2025
6ccadf1
Update API Approval list
Arkatufus Mar 28, 2025
adb41f7
Fix missing feedback callback
Arkatufus Mar 31, 2025
f779ce3
rerun tests
Arkatufus Mar 31, 2025
a6499c0
Implement locks
Arkatufus Mar 31, 2025
2a7ffe0
cleanup lock code
Arkatufus Mar 31, 2025
f680ea7
extend locking to `_callbackWaitingForInterpreter`
Arkatufus Mar 31, 2025
0a1d84f
Fix unit test
Arkatufus Mar 31, 2025
bf53eeb
rerun tests
Arkatufus Mar 31, 2025
1dd0fe8
Merge branch 'dev' into #7578-Fix-BroadcastHub-block-bug
Aaronontheweb Apr 3, 2025
aa91f10
Merge branch 'dev' into #7578-Fix-BroadcastHub-block-bug
Arkatufus Apr 4, 2025
2f08067
Merge branch 'dev' into #7578-Fix-BroadcastHub-block-bug
Aaronontheweb Apr 18, 2025
ffc1c8e
Merge branch 'dev' into #7578-Fix-BroadcastHub-block-bug
Aaronontheweb Apr 18, 2025
543df2f
Merge branch 'dev' into #7578-Fix-BroadcastHub-block-bug
Aaronontheweb Apr 25, 2025
71e9866
xml-doc and TBD cleanup
Aaronontheweb May 1, 2025
dfbd894
more TBD and XML-DOC cleanup
Aaronontheweb May 1, 2025
67a7200
defined `ConcurrentAsyncCallback`
Aaronontheweb May 1, 2025
d926ea5
introduced `ConcurrentAsyncCallback` into `GraphStage`
Aaronontheweb May 1, 2025
795ef78
fixed all compilation errors
Aaronontheweb May 1, 2025
58ebbb1
Merge branch 'dev' into fix-7578
Aaronontheweb May 1, 2025
78b383c
cleanup hub code
Aaronontheweb May 1, 2025
249084f
added API approvals
Aaronontheweb May 1, 2025
be3b51a
Merge branch 'fix-7578' of https://github.com/Aaronontheweb/akka.net …
Aaronontheweb May 1, 2025
4f8dbd6
cleaned up reverse
Aaronontheweb May 1, 2025
02f860e
Merge branch 'dev' into fix-7578
Aaronontheweb May 2, 2025
047e483
Merge branch 'dev' into fix-7578
Arkatufus May 2, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3967,7 +3967,8 @@ namespace Akka.Streams.Implementation.Fusing
public readonly object Event;
public readonly System.Action<object> Handler;
public readonly Akka.Streams.Stage.GraphStageLogic Logic;
public AsyncInput(Akka.Streams.Implementation.Fusing.GraphInterpreterShell shell, Akka.Streams.Stage.GraphStageLogic logic, object @event, System.Action<object> handler) { }
public readonly System.Threading.Tasks.TaskCompletionSource<Akka.Done> Promise;
public AsyncInput(Akka.Streams.Implementation.Fusing.GraphInterpreterShell shell, Akka.Streams.Stage.GraphStageLogic logic, object @event, System.Threading.Tasks.TaskCompletionSource<Akka.Done> promise, System.Action<object> handler) { }
public Akka.Streams.Implementation.Fusing.GraphInterpreterShell Shell { get; }
}
public class BatchingActorInputBoundary : Akka.Streams.Implementation.Fusing.GraphInterpreter.UpstreamBoundaryStageLogic
Expand Down Expand Up @@ -4202,7 +4203,7 @@ namespace Akka.Streams.Implementation.Fusing
public readonly Akka.Streams.Stage.GraphStageLogic[] Logics;
public readonly Akka.Streams.IMaterializer Materializer;
public const Akka.Streams.Implementation.Fusing.GraphInterpreter.Connection NoEvent = null;
public readonly System.Action<Akka.Streams.Stage.GraphStageLogic, object, System.Action<object>> OnAsyncInput;
public readonly System.Action<Akka.Streams.Stage.GraphStageLogic, object, System.Threading.Tasks.TaskCompletionSource<Akka.Done>, System.Action<object>> OnAsyncInput;
public const int OutClosed = 32;
public const int OutReady = 8;
public const int PullEndFlip = 10;
Expand All @@ -4213,7 +4214,7 @@ namespace Akka.Streams.Implementation.Fusing
public const int Pushing = 4;
public int RunningStagesCount;
public static readonly Akka.Streams.Attributes[] SingleNoAttribute;
public GraphInterpreter(Akka.Streams.Implementation.Fusing.GraphAssembly assembly, Akka.Streams.IMaterializer materializer, Akka.Event.ILoggingAdapter log, Akka.Streams.Stage.GraphStageLogic[] logics, Connection[] connections, System.Action<Akka.Streams.Stage.GraphStageLogic, object, System.Action<object>> onAsyncInput, bool fuzzingMode, Akka.Actor.IActorRef context) { }
public GraphInterpreter(Akka.Streams.Implementation.Fusing.GraphAssembly assembly, Akka.Streams.IMaterializer materializer, Akka.Event.ILoggingAdapter log, Akka.Streams.Stage.GraphStageLogic[] logics, Connection[] connections, System.Action<Akka.Streams.Stage.GraphStageLogic, object, System.Threading.Tasks.TaskCompletionSource<Akka.Done>, System.Action<object>> onAsyncInput, bool fuzzingMode, Akka.Actor.IActorRef context) { }
public Akka.Actor.IActorRef Context { get; }
public static Akka.Streams.Implementation.Fusing.GraphInterpreter Current { get; }
public static Akka.Streams.Implementation.Fusing.GraphInterpreter CurrentInterpreterOrNull { get; }
Expand All @@ -4228,7 +4229,7 @@ namespace Akka.Streams.Implementation.Fusing
public int Execute(int eventLimit) { }
public void Finish() { }
public void Init(Akka.Streams.IMaterializer subMaterializer) { }
public void RunAsyncInput(Akka.Streams.Stage.GraphStageLogic logic, object evt, System.Action<object> handler) { }
public void RunAsyncInput(Akka.Streams.Stage.GraphStageLogic logic, object evt, System.Threading.Tasks.TaskCompletionSource<Akka.Done> promise, System.Action<object> handler) { }
public void SetHandler(Akka.Streams.Implementation.Fusing.GraphInterpreter.Connection connection, Akka.Streams.Stage.IInHandler handler) { }
public void SetHandler(Akka.Streams.Implementation.Fusing.GraphInterpreter.Connection connection, Akka.Streams.Stage.IOutHandler handler) { }
public override string ToString() { }
Expand Down Expand Up @@ -4878,12 +4879,13 @@ namespace Akka.Streams.Stage
}
public abstract class GraphStageLogic : Akka.Streams.Stage.IStageLogging
{
public static System.Action DoNothing;
public static readonly System.Action DoNothing;
public static readonly Akka.Streams.Stage.InHandler EagerTerminateInput;
public static readonly Akka.Streams.Stage.OutHandler EagerTerminateOutput;
public static readonly Akka.Streams.Stage.InHandler IgnoreTerminateInput;
public static readonly Akka.Streams.Stage.OutHandler IgnoreTerminateOutput;
public readonly int InCount;
public static readonly System.Threading.Tasks.TaskCompletionSource<Akka.Done> NoPromise;
public readonly int OutCount;
public static readonly Akka.Streams.Stage.InHandler TotallyIgnorantInput;
protected GraphStageLogic(int inCount, int outCount) { }
Expand Down Expand Up @@ -4922,6 +4924,7 @@ namespace Akka.Streams.Stage
protected Akka.Streams.Stage.IOutHandler GetHandler<T>(Akka.Streams.Outlet<T> outlet) { }
[Akka.Annotations.ApiMayChangeAttribute()]
protected Akka.Streams.Stage.StageActor GetStageActor(Akka.Streams.Stage.StageActorRef.Receive receive) { }
protected Akka.Streams.Stage.IAsyncCallback<T> GetTypedAsyncCallback<T>(System.Action<T> handler) { }
protected T Grab<T>(Akka.Streams.Inlet<T> inlet) { }
protected bool HasBeenPulled<T>(Akka.Streams.Inlet<T> inlet) { }
[Akka.Annotations.InternalApiAttribute()]
Expand Down Expand Up @@ -5010,6 +5013,11 @@ namespace Akka.Streams.Stage
protected abstract Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes);
public virtual Akka.Streams.Stage.ILogicAndMaterializedValue<Akka.NotUsed> CreateLogicAndMaterializedValue(Akka.Streams.Attributes inheritedAttributes) { }
}
public interface IAsyncCallback<in T>
{
void Invoke(T input);
System.Threading.Tasks.Task<Akka.Done> InvokeWithFeedback(T input);
}
public interface IAsyncContext : Akka.Streams.Stage.IContext, Akka.Streams.Stage.IDetachedContext, Akka.Streams.Stage.ILifecycleContext
{
Akka.Streams.Stage.AsyncCallback GetAsyncCallback();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3949,7 +3949,8 @@ namespace Akka.Streams.Implementation.Fusing
public readonly object Event;
public readonly System.Action<object> Handler;
public readonly Akka.Streams.Stage.GraphStageLogic Logic;
public AsyncInput(Akka.Streams.Implementation.Fusing.GraphInterpreterShell shell, Akka.Streams.Stage.GraphStageLogic logic, object @event, System.Action<object> handler) { }
public readonly System.Threading.Tasks.TaskCompletionSource<Akka.Done> Promise;
public AsyncInput(Akka.Streams.Implementation.Fusing.GraphInterpreterShell shell, Akka.Streams.Stage.GraphStageLogic logic, object @event, System.Threading.Tasks.TaskCompletionSource<Akka.Done> promise, System.Action<object> handler) { }
public Akka.Streams.Implementation.Fusing.GraphInterpreterShell Shell { get; }
}
public class BatchingActorInputBoundary : Akka.Streams.Implementation.Fusing.GraphInterpreter.UpstreamBoundaryStageLogic
Expand Down Expand Up @@ -4175,7 +4176,7 @@ namespace Akka.Streams.Implementation.Fusing
public readonly Akka.Streams.Stage.GraphStageLogic[] Logics;
public readonly Akka.Streams.IMaterializer Materializer;
public const Akka.Streams.Implementation.Fusing.GraphInterpreter.Connection NoEvent = null;
public readonly System.Action<Akka.Streams.Stage.GraphStageLogic, object, System.Action<object>> OnAsyncInput;
public readonly System.Action<Akka.Streams.Stage.GraphStageLogic, object, System.Threading.Tasks.TaskCompletionSource<Akka.Done>, System.Action<object>> OnAsyncInput;
public const int OutClosed = 32;
public const int OutReady = 8;
public const int PullEndFlip = 10;
Expand All @@ -4186,7 +4187,7 @@ namespace Akka.Streams.Implementation.Fusing
public const int Pushing = 4;
public int RunningStagesCount;
public static readonly Akka.Streams.Attributes[] SingleNoAttribute;
public GraphInterpreter(Akka.Streams.Implementation.Fusing.GraphAssembly assembly, Akka.Streams.IMaterializer materializer, Akka.Event.ILoggingAdapter log, Akka.Streams.Stage.GraphStageLogic[] logics, Connection[] connections, System.Action<Akka.Streams.Stage.GraphStageLogic, object, System.Action<object>> onAsyncInput, bool fuzzingMode, Akka.Actor.IActorRef context) { }
public GraphInterpreter(Akka.Streams.Implementation.Fusing.GraphAssembly assembly, Akka.Streams.IMaterializer materializer, Akka.Event.ILoggingAdapter log, Akka.Streams.Stage.GraphStageLogic[] logics, Connection[] connections, System.Action<Akka.Streams.Stage.GraphStageLogic, object, System.Threading.Tasks.TaskCompletionSource<Akka.Done>, System.Action<object>> onAsyncInput, bool fuzzingMode, Akka.Actor.IActorRef context) { }
public Akka.Actor.IActorRef Context { get; }
public static Akka.Streams.Implementation.Fusing.GraphInterpreter Current { get; }
public static Akka.Streams.Implementation.Fusing.GraphInterpreter CurrentInterpreterOrNull { get; }
Expand All @@ -4201,7 +4202,7 @@ namespace Akka.Streams.Implementation.Fusing
public int Execute(int eventLimit) { }
public void Finish() { }
public void Init(Akka.Streams.IMaterializer subMaterializer) { }
public void RunAsyncInput(Akka.Streams.Stage.GraphStageLogic logic, object evt, System.Action<object> handler) { }
public void RunAsyncInput(Akka.Streams.Stage.GraphStageLogic logic, object evt, System.Threading.Tasks.TaskCompletionSource<Akka.Done> promise, System.Action<object> handler) { }
public void SetHandler(Akka.Streams.Implementation.Fusing.GraphInterpreter.Connection connection, Akka.Streams.Stage.IInHandler handler) { }
public void SetHandler(Akka.Streams.Implementation.Fusing.GraphInterpreter.Connection connection, Akka.Streams.Stage.IOutHandler handler) { }
public override string ToString() { }
Expand Down Expand Up @@ -4851,12 +4852,13 @@ namespace Akka.Streams.Stage
}
public abstract class GraphStageLogic : Akka.Streams.Stage.IStageLogging
{
public static System.Action DoNothing;
public static readonly System.Action DoNothing;
public static readonly Akka.Streams.Stage.InHandler EagerTerminateInput;
public static readonly Akka.Streams.Stage.OutHandler EagerTerminateOutput;
public static readonly Akka.Streams.Stage.InHandler IgnoreTerminateInput;
public static readonly Akka.Streams.Stage.OutHandler IgnoreTerminateOutput;
public readonly int InCount;
public static readonly System.Threading.Tasks.TaskCompletionSource<Akka.Done> NoPromise;
public readonly int OutCount;
public static readonly Akka.Streams.Stage.InHandler TotallyIgnorantInput;
protected GraphStageLogic(int inCount, int outCount) { }
Expand Down Expand Up @@ -4895,6 +4897,7 @@ namespace Akka.Streams.Stage
protected Akka.Streams.Stage.IOutHandler GetHandler<T>(Akka.Streams.Outlet<T> outlet) { }
[Akka.Annotations.ApiMayChangeAttribute()]
protected Akka.Streams.Stage.StageActor GetStageActor(Akka.Streams.Stage.StageActorRef.Receive receive) { }
protected Akka.Streams.Stage.IAsyncCallback<T> GetTypedAsyncCallback<T>(System.Action<T> handler) { }
protected T Grab<T>(Akka.Streams.Inlet<T> inlet) { }
protected bool HasBeenPulled<T>(Akka.Streams.Inlet<T> inlet) { }
[Akka.Annotations.InternalApiAttribute()]
Expand Down Expand Up @@ -4983,6 +4986,11 @@ namespace Akka.Streams.Stage
protected abstract Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes);
public virtual Akka.Streams.Stage.ILogicAndMaterializedValue<Akka.NotUsed> CreateLogicAndMaterializedValue(Akka.Streams.Attributes inheritedAttributes) { }
}
public interface IAsyncCallback<in T>
{
void Invoke(T input);
System.Threading.Tasks.Task<Akka.Done> InvokeWithFeedback(T input);
}
public interface IAsyncContext : Akka.Streams.Stage.IContext, Akka.Streams.Stage.IDetachedContext, Akka.Streams.Stage.ILifecycleContext
{
Akka.Streams.Stage.AsyncCallback GetAsyncCallback();
Expand Down
32 changes: 32 additions & 0 deletions src/core/Akka.Streams.Tests/Dsl/HubSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,38 @@ await Awaiting(async () =>
}, Materializer);
}

[Fact]
public async Task BroadcastHub_must_handle_cancelled_Sink()
{
await this.AssertAllStagesStoppedAsync(async () =>
{
var upstream = this.CreatePublisherProbe<int>();
var hubSource = Source.FromPublisher(upstream).RunWith(BroadcastHub.Sink<int>(4), Materializer);
var downstream = this.CreateSubscriberProbe<int>();

hubSource.RunWith(Sink.Cancelled<int>(), Materializer);
hubSource.RunWith(Sink.FromSubscriber(downstream), Materializer);

await downstream.EnsureSubscriptionAsync();

await downstream.RequestAsync(10);
await upstream.ExpectRequestAsync();
await upstream.SendNextAsync(1);
await downstream.ExpectNextAsync(1);
await upstream.SendNextAsync(2);
await downstream.ExpectNextAsync(2);
await upstream.SendNextAsync(3);
await downstream.ExpectNextAsync(3);
await upstream.SendNextAsync(4);
await downstream.ExpectNextAsync(4);
await upstream.SendNextAsync(5);
await downstream.ExpectNextAsync(5);

await upstream.SendCompleteAsync();
await downstream.ExpectCompleteAsync();
}, Materializer);
}

[Fact]
public async Task PartitionHub_must_work_in_the_happy_case_with_one_stream()
{
Expand Down
Loading