diff --git a/src/Proto.Actor/EventStream/EventStream.cs b/src/Proto.Actor/EventStream/EventStream.cs index d4b8553445..d8158c773e 100644 --- a/src/Proto.Actor/EventStream/EventStream.cs +++ b/src/Proto.Actor/EventStream/EventStream.cs @@ -119,6 +119,31 @@ public EventStreamSubscription Subscribe(Channel channel, IDispatcher? dis return sub; } + + /// + /// Subscribe to messages and yields the result onto a Channel + /// + /// a Channel which receives the event + /// Optional: the dispatcher, will use by default + /// A new subscription that can be used to unsubscribe + public EventStreamSubscription Subscribe(Channel channel, IDispatcher? dispatcher = null) where TMsg:T + { + var sub = new EventStreamSubscription( + this, + dispatcher ?? Dispatchers.SynchronousDispatcher, + async x => + { + if (x is TMsg tc) + { + await channel.Writer.WriteAsync(tc).ConfigureAwait(false); + } + } + ); + + _subscriptions.TryAdd(sub.Id, sub); + + return sub; + } /// /// Subscribe to messages with an asynchronous handler diff --git a/tests/Proto.Actor.Tests/EventStreamTests.cs b/tests/Proto.Actor.Tests/EventStreamTests.cs index 6daa2913fe..adcd1622a1 100644 --- a/tests/Proto.Actor.Tests/EventStreamTests.cs +++ b/tests/Proto.Actor.Tests/EventStreamTests.cs @@ -1,4 +1,5 @@ using System.Collections.Generic; +using System.Threading.Channels; using System.Threading.Tasks; using Proto.Mailbox; using Xunit; @@ -80,4 +81,21 @@ public async Task EventStream_CanSubscribeToSpecificEventTypes_Async() eventStream.Publish("hello"); } + + [Fact] + public async Task EventStream_CanSubscribeUsingChannel() + { + var system = new ActorSystem(); + await using var _ = system; + var eventStream = system.EventStream; + + var channel = Channel.CreateUnbounded(); + eventStream.Subscribe(channel); + eventStream.Publish(123); + eventStream.Publish(false); + eventStream.Publish("hello"); + + var res = await channel.Reader.ReadAsync(); + Assert.Equal("hello",res); + } } \ No newline at end of file