Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Eventstream channels #2101

Merged
merged 3 commits into from
Feb 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions src/Proto.Actor/EventStream/EventStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,31 @@ public EventStreamSubscription<T> Subscribe(Channel<T> channel, IDispatcher? dis

return sub;
}

/// <summary>
/// Subscribe to messages and yields the result onto a Channel
/// </summary>
/// <param name="channel">a Channel which receives the event</param>
/// <param name="dispatcher">Optional: the dispatcher, will use <see cref="Dispatchers.SynchronousDispatcher" /> by default</param>
/// <returns>A new subscription that can be used to unsubscribe</returns>
public EventStreamSubscription<T> Subscribe<TMsg>(Channel<TMsg> channel, IDispatcher? dispatcher = null) where TMsg:T
{
var sub = new EventStreamSubscription<T>(
this,
dispatcher ?? Dispatchers.SynchronousDispatcher,
async x =>
{
if (x is TMsg tc)
{
await channel.Writer.WriteAsync(tc).ConfigureAwait(false);
}
}
);

_subscriptions.TryAdd(sub.Id, sub);

return sub;
}

/// <summary>
/// Subscribe to messages with an asynchronous handler
Expand Down
18 changes: 18 additions & 0 deletions tests/Proto.Actor.Tests/EventStreamTests.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;
using Proto.Mailbox;
using Xunit;
Expand Down Expand Up @@ -80,4 +81,21 @@ public async Task EventStream_CanSubscribeToSpecificEventTypes_Async()

eventStream.Publish("hello");
}

[Fact]
public async Task EventStream_CanSubscribeUsingChannel()
{
var system = new ActorSystem();
await using var _ = system;
var eventStream = system.EventStream;

var channel = Channel.CreateUnbounded<string>();
eventStream.Subscribe(channel);
eventStream.Publish(123);
eventStream.Publish(false);
eventStream.Publish("hello");

var res = await channel.Reader.ReadAsync();
Assert.Equal("hello",res);
}
}
Loading