From 11167edfda6130e6781aa0fcc27de62b3d999565 Mon Sep 17 00:00:00 2001 From: Marcin Budny Date: Mon, 5 Sep 2022 14:33:32 +0200 Subject: [PATCH 01/12] actor system, context and pid --- src/Proto.Actor/ActorSystem.cs | 19 ++++++++++++------- src/Proto.Actor/Context/ActorContext.cs | 18 ------------------ .../Context/ActorContextDecorator.cs | 3 --- src/Proto.Actor/Context/ActorContextExtras.cs | 1 - src/Proto.Actor/Context/IContext.cs | 5 ----- src/Proto.Actor/PID.cs | 2 +- src/Proto.Actor/Stopper.cs | 2 +- tests/Proto.Actor.Tests/ActorTests.cs | 13 ++++++++++--- tests/Proto.Cluster.Tests/Extensions.cs | 2 +- 9 files changed, 25 insertions(+), 40 deletions(-) diff --git a/src/Proto.Actor/ActorSystem.cs b/src/Proto.Actor/ActorSystem.cs index d52e3f3112..9bf2ff8251 100644 --- a/src/Proto.Actor/ActorSystem.cs +++ b/src/Proto.Actor/ActorSystem.cs @@ -78,7 +78,12 @@ public ActorSystem(ActorSystemConfig config) /// Allows to access the stop cancellation token and stop reason. /// Use to stop the actor system. /// - public Stopper Stopper { get; } + internal Stopper Stopper { get; } + + /// + /// For stopped , returns the reason for the shutdown. + /// + public string StoppedReason => Stopper.StoppedReason; /// /// Manages all the guardians in the actor system. @@ -117,13 +122,13 @@ public ActorSystem(ActorSystemConfig config) private void RunThreadPoolStats() { - var metricTags = new KeyValuePair[]{ new("id", Id), new("address", Address)}; + var metricTags = new KeyValuePair[] {new("id", Id), new("address", Address)}; var logger = Log.CreateLogger(nameof(ThreadPoolStats)); _ = ThreadPoolStats.Run(TimeSpan.FromSeconds(5), t => { //collect the latency metrics - if(Metrics.Enabled) + if (Metrics.Enabled) ActorMetrics.ThreadPoolLatency.Record(t.TotalSeconds, metricTags); //does it take longer than 1 sec for a task to start executing? @@ -144,7 +149,7 @@ private void RunThreadPoolStats() /// /// Shutdown reason /// - public Task ShutdownAsync(string reason="") + public Task ShutdownAsync(string reason = "") { try { @@ -195,15 +200,15 @@ public RootContext NewRoot(MessageHeader? headers = null, params Func /// /// - public Props ConfigureProps(Props props) => Config.ConfigureProps(props); - + internal Props ConfigureProps(Props props) => Config.ConfigureProps(props); + /// /// Applies props configuration delegate for system actors. /// /// /// /// - public Props ConfigureSystemProps(string name, Props props) => Config.ConfigureSystemProps(name, props); + internal Props ConfigureSystemProps(string name, Props props) => Config.ConfigureSystemProps(name, props); /// /// Stops the actor system with reason = "Disposed" diff --git a/src/Proto.Actor/Context/ActorContext.cs b/src/Proto.Actor/Context/ActorContext.cs index 0231965697..f1c74dbe53 100644 --- a/src/Proto.Actor/Context/ActorContext.cs +++ b/src/Proto.Actor/Context/ActorContext.cs @@ -71,11 +71,6 @@ public ActorContext(ActorSystem system, Props props, PID? parent, PID self, IMai public TimeSpan ReceiveTimeout { get; private set; } - public void Stash() - { - if (_messageOrEnvelope is not null) EnsureExtras().Stash.Push(_messageOrEnvelope); - } - public void Respond(object message) { if (Sender is not null) @@ -714,19 +709,6 @@ private async ValueTask RestartAsync() Self.SendSystemMessage(System, ResumeMailbox.Instance); await InvokeUserMessageAsync(Started.Instance); - - if (_extras?.Stash is not null) - { - var currentStash = new Stack(_extras.Stash); - _extras.Stash.Clear(); - - //TODO: what happens if we hit a failure here? - while (currentStash.Any()) - { - var msg = currentStash.Pop(); - await InvokeUserMessageAsync(msg); - } - } } private ValueTask DisposeActorIfDisposable() diff --git a/src/Proto.Actor/Context/ActorContextDecorator.cs b/src/Proto.Actor/Context/ActorContextDecorator.cs index 7ce0af1141..f33d1d7481 100644 --- a/src/Proto.Actor/Context/ActorContextDecorator.cs +++ b/src/Proto.Actor/Context/ActorContextDecorator.cs @@ -48,9 +48,6 @@ public virtual Task Receive(MessageEnvelope envelope) => public virtual void Respond(object message) => _context.Respond(message); - public virtual void Stash() => - _context.Stash(); - public virtual PID SpawnNamed(Props props, string name, Action? callback=null) => _context.SpawnNamed(props, name, callback); diff --git a/src/Proto.Actor/Context/ActorContextExtras.cs b/src/Proto.Actor/Context/ActorContextExtras.cs index ba24111916..3869d313a0 100644 --- a/src/Proto.Actor/Context/ActorContextExtras.cs +++ b/src/Proto.Actor/Context/ActorContextExtras.cs @@ -25,7 +25,6 @@ public sealed class ActorContextExtras: IDisposable public ImmutableHashSet Children { get; private set; } = ImmutableHashSet.Empty; public Timer? ReceiveTimeoutTimer { get; private set; } public RestartStatistics RestartStatistics { get; } = new(0, null); - public Stack Stash { get; } = new(); public ImmutableHashSet Watchers { get; private set; } = ImmutableHashSet.Empty; public IContext Context { get; } public CancellationTokenSource CancellationTokenSource { get; } = new(); diff --git a/src/Proto.Actor/Context/IContext.cs b/src/Proto.Actor/Context/IContext.cs index 7f69714ef6..fce16ba6ad 100644 --- a/src/Proto.Actor/Context/IContext.cs +++ b/src/Proto.Actor/Context/IContext.cs @@ -44,11 +44,6 @@ public interface IContext : ISenderContext, IReceiverContext, ISpawnerContext, I /// void Respond(object message, MessageHeader header) => Respond(new MessageEnvelope(message, null, header)); - /// - /// Stashes the current message on a stack for re-processing when the actor restarts. - /// - void Stash(); - /// /// Registers the actor as a watcher for the specified PID. When the PID terminates the watcher is notified with message. /// diff --git a/src/Proto.Actor/PID.cs b/src/Proto.Actor/PID.cs index b259617598..af94bc152d 100644 --- a/src/Proto.Actor/PID.cs +++ b/src/Proto.Actor/PID.cs @@ -61,7 +61,7 @@ internal void SendUserMessage(ActorSystem system, object message) reff.SendUserMessage(this, message); } - public void SendSystemMessage(ActorSystem system, SystemMessage sys) + internal void SendSystemMessage(ActorSystem system, SystemMessage sys) { var reff = Ref(system) ?? system.ProcessRegistry.Get(this); reff.SendSystemMessage(this, sys); diff --git a/src/Proto.Actor/Stopper.cs b/src/Proto.Actor/Stopper.cs index 5e3035ed31..f49bd8b3f8 100644 --- a/src/Proto.Actor/Stopper.cs +++ b/src/Proto.Actor/Stopper.cs @@ -7,7 +7,7 @@ namespace Proto; -public class Stopper +class Stopper { private readonly CancellationTokenSource _cts = new(); diff --git a/tests/Proto.Actor.Tests/ActorTests.cs b/tests/Proto.Actor.Tests/ActorTests.cs index edf4e65012..cc5521487a 100644 --- a/tests/Proto.Actor.Tests/ActorTests.cs +++ b/tests/Proto.Actor.Tests/ActorTests.cs @@ -138,19 +138,26 @@ public async Task ActorLifeCycleWhenExceptionIsThrown() var messages = new Queue(); var i = 0; + CapturedContext? capturedContext = null; + async Task HandleMessage(IContext ctx) { if (ctx.Message is string && i++ == 0) { - ctx.Stash(); + capturedContext = ctx.Capture(); throw new Exception("Test"); } - + messages.Enqueue(ctx.Message!); + + if (ctx.Message is Started && capturedContext != null) + { + await capturedContext.Receive(); + } + await Task.Yield(); } - ; var pid = context.Spawn( Props.FromFunc(ctx => ctx.Message switch { diff --git a/tests/Proto.Cluster.Tests/Extensions.cs b/tests/Proto.Cluster.Tests/Extensions.cs index bb6a87a6ea..a8b7ff999e 100644 --- a/tests/Proto.Cluster.Tests/Extensions.cs +++ b/tests/Proto.Cluster.Tests/Extensions.cs @@ -29,7 +29,7 @@ public static async Task DumpClusterState(this IEnumerable memb if (c.System.Shutdown.IsCancellationRequested) { - sb.AppendLine("\tStopped, reason: " + c.System.Stopper.StoppedReason); + sb.AppendLine("\tStopped, reason: " + c.System.StoppedReason); continue; } From a27e0f389b67432a3683665099962a132a009ec2 Mon Sep 17 00:00:00 2001 From: Marcin Budny Date: Tue, 6 Sep 2022 08:34:30 +0200 Subject: [PATCH 02/12] converted channel publisher/subscriber to an example --- ProtoActor.sln | 18 ++++++++ .../Channels/ChannelPublisherActor.cs | 12 +++--- .../Channels/ChannelSubscriberActor.cs | 30 ++++++++++---- examples/Channels/Channels.csproj | 14 +++++++ examples/Channels/Program.cs | 41 +++++++++++++++++++ 5 files changed, 100 insertions(+), 15 deletions(-) rename {src/Proto.Actor => examples}/Channels/ChannelPublisherActor.cs (94%) rename {src/Proto.Actor => examples}/Channels/ChannelSubscriberActor.cs (77%) create mode 100644 examples/Channels/Channels.csproj create mode 100644 examples/Channels/Program.cs diff --git a/ProtoActor.sln b/ProtoActor.sln index 1b596dd6a4..8073fe54fc 100644 --- a/ProtoActor.sln +++ b/ProtoActor.sln @@ -259,6 +259,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SkyriseMiniClient", "benchm EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SkyriseMiniServer", "benchmarks\SkyriseMini\Server\SkyriseMiniServer.csproj", "{7C08B7D5-90BD-45E9-BADF-1B6CEB1A3E97}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Channels", "examples\Channels\Channels.csproj", "{038B6BB0-3B3A-4073-B78F-2999D7F15978}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Channels", "Channels", "{E194E380-E6DE-4137-83D9-EADB54436B56}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -1289,6 +1293,18 @@ Global {7C08B7D5-90BD-45E9-BADF-1B6CEB1A3E97}.Release|x64.Build.0 = Release|Any CPU {7C08B7D5-90BD-45E9-BADF-1B6CEB1A3E97}.Release|x86.ActiveCfg = Release|Any CPU {7C08B7D5-90BD-45E9-BADF-1B6CEB1A3E97}.Release|x86.Build.0 = Release|Any CPU + {038B6BB0-3B3A-4073-B78F-2999D7F15978}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {038B6BB0-3B3A-4073-B78F-2999D7F15978}.Debug|Any CPU.Build.0 = Debug|Any CPU + {038B6BB0-3B3A-4073-B78F-2999D7F15978}.Debug|x64.ActiveCfg = Debug|Any CPU + {038B6BB0-3B3A-4073-B78F-2999D7F15978}.Debug|x64.Build.0 = Debug|Any CPU + {038B6BB0-3B3A-4073-B78F-2999D7F15978}.Debug|x86.ActiveCfg = Debug|Any CPU + {038B6BB0-3B3A-4073-B78F-2999D7F15978}.Debug|x86.Build.0 = Debug|Any CPU + {038B6BB0-3B3A-4073-B78F-2999D7F15978}.Release|Any CPU.ActiveCfg = Release|Any CPU + {038B6BB0-3B3A-4073-B78F-2999D7F15978}.Release|Any CPU.Build.0 = Release|Any CPU + {038B6BB0-3B3A-4073-B78F-2999D7F15978}.Release|x64.ActiveCfg = Release|Any CPU + {038B6BB0-3B3A-4073-B78F-2999D7F15978}.Release|x64.Build.0 = Release|Any CPU + {038B6BB0-3B3A-4073-B78F-2999D7F15978}.Release|x86.ActiveCfg = Release|Any CPU + {038B6BB0-3B3A-4073-B78F-2999D7F15978}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -1408,6 +1424,8 @@ Global {DF09798D-20CC-418C-901B-DEFEEFF02E1B} = {0F3AB331-C042-4371-A2F0-0AFDFA13DC9F} {8B0466D8-C57E-4D29-98B3-CB2D087765DC} = {DF09798D-20CC-418C-901B-DEFEEFF02E1B} {7C08B7D5-90BD-45E9-BADF-1B6CEB1A3E97} = {DF09798D-20CC-418C-901B-DEFEEFF02E1B} + {E194E380-E6DE-4137-83D9-EADB54436B56} = {59DCCC96-DDAF-469F-9E8E-9BC733285082} + {038B6BB0-3B3A-4073-B78F-2999D7F15978} = {E194E380-E6DE-4137-83D9-EADB54436B56} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {CD0D1E44-8118-4682-8793-6B20ABFA824C} diff --git a/src/Proto.Actor/Channels/ChannelPublisherActor.cs b/examples/Channels/ChannelPublisherActor.cs similarity index 94% rename from src/Proto.Actor/Channels/ChannelPublisherActor.cs rename to examples/Channels/ChannelPublisherActor.cs index b6f905c9ad..6ed9eedd2c 100644 --- a/src/Proto.Actor/Channels/ChannelPublisherActor.cs +++ b/examples/Channels/ChannelPublisherActor.cs @@ -3,14 +3,11 @@ // Copyright (C) 2015-2022 Asynkron AB All rights reserved // // ----------------------------------------------------------------------- -using System.Collections.Generic; using System.Threading.Channels; -using System.Threading.Tasks; -using JetBrains.Annotations; +using Proto; -namespace Proto.Channels; +namespace Channels; -[PublicAPI] public static class ChannelPublisher { /// @@ -40,7 +37,6 @@ public static PID StartNew(IRootContext context, Channel channel, string n } } -[PublicAPI] public class ChannelPublisherActor : IActor { private readonly HashSet _subscribers = new(); @@ -56,13 +52,17 @@ public Task ReceiveAsync(IContext context) } break; + case PID subscriber: _subscribers.Add(subscriber); context.Watch(subscriber); + context.Respond(new Subscribed()); break; + case Terminated terminated: _subscribers.Remove(terminated.Who); break; + case T typed: foreach (var sub in _subscribers) { diff --git a/src/Proto.Actor/Channels/ChannelSubscriberActor.cs b/examples/Channels/ChannelSubscriberActor.cs similarity index 77% rename from src/Proto.Actor/Channels/ChannelSubscriberActor.cs rename to examples/Channels/ChannelSubscriberActor.cs index ebb75a517f..73fd41bc36 100644 --- a/src/Proto.Actor/Channels/ChannelSubscriberActor.cs +++ b/examples/Channels/ChannelSubscriberActor.cs @@ -4,12 +4,10 @@ // // ----------------------------------------------------------------------- using System.Threading.Channels; -using System.Threading.Tasks; -using JetBrains.Annotations; +using Proto; -namespace Proto.Channels; +namespace Channels; -[PublicAPI] public static class ChannelSubscriber { /// @@ -21,24 +19,29 @@ public static class ChannelSubscriber /// The channel to write messages to /// The Type of channel elements /// - public static PID StartNew(IRootContext context, PID publisher, Channel channel) + public static async Task StartNew(IRootContext context, PID publisher, Channel channel) { - var props = Props.FromProducer(() => new ChannelSubscriberActor(publisher, channel)); + var tcs = new TaskCompletionSource(); + var props = Props.FromProducer(() => new ChannelSubscriberActor(publisher, channel, tcs)); var pid = context.Spawn(props); + + await tcs.Task; return pid; } } -[PublicAPI] public class ChannelSubscriberActor : IActor { private readonly Channel _channel; + private readonly TaskCompletionSource _subscribed; private readonly PID _publisher; - public ChannelSubscriberActor(PID publisher, Channel channel) + public ChannelSubscriberActor(PID publisher, Channel channel, TaskCompletionSource subscribed) + { _publisher = publisher; _channel = channel; + _subscribed = subscribed; } public async Task ReceiveAsync(IContext context) @@ -49,15 +52,24 @@ public async Task ReceiveAsync(IContext context) context.Watch(_publisher); context.Request(_publisher, context.Self); break; + + case Subscribed: + _subscribed.SetResult(); + break; + case Stopping: _channel.Writer.Complete(); break; + case Terminated t when t.Who.Equals(_publisher): _channel.Writer.Complete(); break; + case T typed: await _channel.Writer.WriteAsync(typed); break; } } -} \ No newline at end of file +} + +public record Subscribed; \ No newline at end of file diff --git a/examples/Channels/Channels.csproj b/examples/Channels/Channels.csproj new file mode 100644 index 0000000000..117d647b82 --- /dev/null +++ b/examples/Channels/Channels.csproj @@ -0,0 +1,14 @@ + + + + net6.0 + enable + enable + Exe + + + + + + + diff --git a/examples/Channels/Program.cs b/examples/Channels/Program.cs new file mode 100644 index 0000000000..2c2aa94939 --- /dev/null +++ b/examples/Channels/Program.cs @@ -0,0 +1,41 @@ +using System.Threading.Channels; +using Channels; +using Proto; + +// This example shows how actors can be used to implement a simple pub-sub system. +// Messages sent to outChannel will be broadcast and received in inChannel1 and inChannel2 + +var outChannel = Channel.CreateUnbounded(); +var inChannel1 = Channel.CreateUnbounded(); +var inChannel2 = Channel.CreateUnbounded(); + +var actorSystem = new ActorSystem(ActorSystemConfig.Setup()); + +var publisher = ChannelPublisher.StartNew(actorSystem.Root, outChannel, "publisher"); +await ChannelSubscriber.StartNew(actorSystem.Root, publisher, inChannel1); +await ChannelSubscriber.StartNew(actorSystem.Root, publisher, inChannel2); + +var messages = new[] { "Hello", "World", "!" }; + +foreach (var message in messages) +{ + await outChannel.Writer.WriteAsync(message); +} +outChannel.Writer.Complete(); + +var receivedMessages1 = new List(); +await foreach (var msg in inChannel1.Reader.ReadAllAsync()) +{ + receivedMessages1.Add(msg); +} + +var receivedMessages2 = new List(); +await foreach (var msg in inChannel2.Reader.ReadAllAsync()) +{ + receivedMessages2.Add(msg); +} + +Console.WriteLine("Received 1: " + string.Join(",", receivedMessages1)); +Console.WriteLine("Received 2: " + string.Join(",", receivedMessages2)); + +await actorSystem.ShutdownAsync(); \ No newline at end of file From ab30d64573d81d09192d29b5ce06468c2e23e7d6 Mon Sep 17 00:00:00 2001 From: Marcin Budny Date: Tue, 6 Sep 2022 10:03:55 +0200 Subject: [PATCH 03/12] re-used remote channels example instead of the local one, further public api changes --- ProtoActor.sln | 20 +-------- examples/Channels/Channels.csproj | 14 ------- examples/Channels/Program.cs | 41 ------------------- examples/remotechannels/Client/Client.csproj | 2 +- examples/remotechannels/Client/Program.cs | 3 +- .../Common}/ChannelPublisherActor.cs | 4 +- .../Common}/ChannelSubscriberActor.cs | 3 +- .../Messages.csproj => Common/Common.csproj} | 4 ++ examples/remotechannels/Common/Messages.cs | 3 ++ examples/remotechannels/Messages/Messages.cs | 6 --- examples/remotechannels/Server/Program.cs | 3 +- examples/remotechannels/Server/Server.csproj | 2 +- src/Proto.Actor/Context/ActorContextExtras.cs | 3 +- src/Proto.Actor/Messages/MessageBatch.cs | 10 ----- src/Proto.Actor/Props/Props.cs | 6 +-- src/Proto.Actor/Stashing/CapturedContext.cs | 7 +++- src/Proto.Actor/Utils/TypedDictionary.cs | 2 +- src/Proto.Remote/GrpcNet/GrpcNetExtensions.cs | 10 ++--- src/Proto.Remote/InternalsVisibleTo.cs | 3 +- 19 files changed, 35 insertions(+), 111 deletions(-) delete mode 100644 examples/Channels/Channels.csproj delete mode 100644 examples/Channels/Program.cs rename examples/{Channels => remotechannels/Common}/ChannelPublisherActor.cs (96%) rename examples/{Channels => remotechannels/Common}/ChannelSubscriberActor.cs (98%) rename examples/remotechannels/{Messages/Messages.csproj => Common/Common.csproj} (60%) create mode 100644 examples/remotechannels/Common/Messages.cs delete mode 100644 examples/remotechannels/Messages/Messages.cs delete mode 100644 src/Proto.Actor/Messages/MessageBatch.cs diff --git a/ProtoActor.sln b/ProtoActor.sln index 8073fe54fc..8cae27ed26 100644 --- a/ProtoActor.sln +++ b/ProtoActor.sln @@ -203,7 +203,7 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Server", "examples\remotech EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Client", "examples\remotechannels\Client\Client.csproj", "{5EACFB26-C757-452C-A536-E633BC352A69}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Messages", "examples\remotechannels\Messages\Messages.csproj", "{D33722CB-96B8-4B9C-AB59-B18B7FF28539}" +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Common", "examples\remotechannels\Common\Common.csproj", "{D33722CB-96B8-4B9C-AB59-B18B7FF28539}" EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Proto.Cluster.AmazonECS", "src\Proto.Cluster.AmazonECS\Proto.Cluster.AmazonECS.csproj", "{F5EB4AD3-5BCB-48E9-B974-A93EB19CE43C}" EndProject @@ -259,10 +259,6 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SkyriseMiniClient", "benchm EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SkyriseMiniServer", "benchmarks\SkyriseMini\Server\SkyriseMiniServer.csproj", "{7C08B7D5-90BD-45E9-BADF-1B6CEB1A3E97}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Channels", "examples\Channels\Channels.csproj", "{038B6BB0-3B3A-4073-B78F-2999D7F15978}" -EndProject -Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Channels", "Channels", "{E194E380-E6DE-4137-83D9-EADB54436B56}" -EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -1293,18 +1289,6 @@ Global {7C08B7D5-90BD-45E9-BADF-1B6CEB1A3E97}.Release|x64.Build.0 = Release|Any CPU {7C08B7D5-90BD-45E9-BADF-1B6CEB1A3E97}.Release|x86.ActiveCfg = Release|Any CPU {7C08B7D5-90BD-45E9-BADF-1B6CEB1A3E97}.Release|x86.Build.0 = Release|Any CPU - {038B6BB0-3B3A-4073-B78F-2999D7F15978}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {038B6BB0-3B3A-4073-B78F-2999D7F15978}.Debug|Any CPU.Build.0 = Debug|Any CPU - {038B6BB0-3B3A-4073-B78F-2999D7F15978}.Debug|x64.ActiveCfg = Debug|Any CPU - {038B6BB0-3B3A-4073-B78F-2999D7F15978}.Debug|x64.Build.0 = Debug|Any CPU - {038B6BB0-3B3A-4073-B78F-2999D7F15978}.Debug|x86.ActiveCfg = Debug|Any CPU - {038B6BB0-3B3A-4073-B78F-2999D7F15978}.Debug|x86.Build.0 = Debug|Any CPU - {038B6BB0-3B3A-4073-B78F-2999D7F15978}.Release|Any CPU.ActiveCfg = Release|Any CPU - {038B6BB0-3B3A-4073-B78F-2999D7F15978}.Release|Any CPU.Build.0 = Release|Any CPU - {038B6BB0-3B3A-4073-B78F-2999D7F15978}.Release|x64.ActiveCfg = Release|Any CPU - {038B6BB0-3B3A-4073-B78F-2999D7F15978}.Release|x64.Build.0 = Release|Any CPU - {038B6BB0-3B3A-4073-B78F-2999D7F15978}.Release|x86.ActiveCfg = Release|Any CPU - {038B6BB0-3B3A-4073-B78F-2999D7F15978}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -1424,8 +1408,6 @@ Global {DF09798D-20CC-418C-901B-DEFEEFF02E1B} = {0F3AB331-C042-4371-A2F0-0AFDFA13DC9F} {8B0466D8-C57E-4D29-98B3-CB2D087765DC} = {DF09798D-20CC-418C-901B-DEFEEFF02E1B} {7C08B7D5-90BD-45E9-BADF-1B6CEB1A3E97} = {DF09798D-20CC-418C-901B-DEFEEFF02E1B} - {E194E380-E6DE-4137-83D9-EADB54436B56} = {59DCCC96-DDAF-469F-9E8E-9BC733285082} - {038B6BB0-3B3A-4073-B78F-2999D7F15978} = {E194E380-E6DE-4137-83D9-EADB54436B56} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {CD0D1E44-8118-4682-8793-6B20ABFA824C} diff --git a/examples/Channels/Channels.csproj b/examples/Channels/Channels.csproj deleted file mode 100644 index 117d647b82..0000000000 --- a/examples/Channels/Channels.csproj +++ /dev/null @@ -1,14 +0,0 @@ - - - - net6.0 - enable - enable - Exe - - - - - - - diff --git a/examples/Channels/Program.cs b/examples/Channels/Program.cs deleted file mode 100644 index 2c2aa94939..0000000000 --- a/examples/Channels/Program.cs +++ /dev/null @@ -1,41 +0,0 @@ -using System.Threading.Channels; -using Channels; -using Proto; - -// This example shows how actors can be used to implement a simple pub-sub system. -// Messages sent to outChannel will be broadcast and received in inChannel1 and inChannel2 - -var outChannel = Channel.CreateUnbounded(); -var inChannel1 = Channel.CreateUnbounded(); -var inChannel2 = Channel.CreateUnbounded(); - -var actorSystem = new ActorSystem(ActorSystemConfig.Setup()); - -var publisher = ChannelPublisher.StartNew(actorSystem.Root, outChannel, "publisher"); -await ChannelSubscriber.StartNew(actorSystem.Root, publisher, inChannel1); -await ChannelSubscriber.StartNew(actorSystem.Root, publisher, inChannel2); - -var messages = new[] { "Hello", "World", "!" }; - -foreach (var message in messages) -{ - await outChannel.Writer.WriteAsync(message); -} -outChannel.Writer.Complete(); - -var receivedMessages1 = new List(); -await foreach (var msg in inChannel1.Reader.ReadAllAsync()) -{ - receivedMessages1.Add(msg); -} - -var receivedMessages2 = new List(); -await foreach (var msg in inChannel2.Reader.ReadAllAsync()) -{ - receivedMessages2.Add(msg); -} - -Console.WriteLine("Received 1: " + string.Join(",", receivedMessages1)); -Console.WriteLine("Received 2: " + string.Join(",", receivedMessages2)); - -await actorSystem.ShutdownAsync(); \ No newline at end of file diff --git a/examples/remotechannels/Client/Client.csproj b/examples/remotechannels/Client/Client.csproj index 443e995ac7..82a0006e30 100644 --- a/examples/remotechannels/Client/Client.csproj +++ b/examples/remotechannels/Client/Client.csproj @@ -8,7 +8,7 @@ - + diff --git a/examples/remotechannels/Client/Program.cs b/examples/remotechannels/Client/Program.cs index c07f4e09a8..60e8bef71d 100644 --- a/examples/remotechannels/Client/Program.cs +++ b/examples/remotechannels/Client/Program.cs @@ -1,7 +1,6 @@ using System; -using Messages; +using Common; using Proto; -using Proto.Channels; using Proto.Remote; using Proto.Remote.GrpcNet; using static System.Threading.Channels.Channel; diff --git a/examples/Channels/ChannelPublisherActor.cs b/examples/remotechannels/Common/ChannelPublisherActor.cs similarity index 96% rename from examples/Channels/ChannelPublisherActor.cs rename to examples/remotechannels/Common/ChannelPublisherActor.cs index 6ed9eedd2c..48c3a46ec9 100644 --- a/examples/Channels/ChannelPublisherActor.cs +++ b/examples/remotechannels/Common/ChannelPublisherActor.cs @@ -3,10 +3,12 @@ // Copyright (C) 2015-2022 Asynkron AB All rights reserved // // ----------------------------------------------------------------------- +using System.Collections.Generic; using System.Threading.Channels; +using System.Threading.Tasks; using Proto; -namespace Channels; +namespace Common; public static class ChannelPublisher { diff --git a/examples/Channels/ChannelSubscriberActor.cs b/examples/remotechannels/Common/ChannelSubscriberActor.cs similarity index 98% rename from examples/Channels/ChannelSubscriberActor.cs rename to examples/remotechannels/Common/ChannelSubscriberActor.cs index 73fd41bc36..164fd47602 100644 --- a/examples/Channels/ChannelSubscriberActor.cs +++ b/examples/remotechannels/Common/ChannelSubscriberActor.cs @@ -4,9 +4,10 @@ // // ----------------------------------------------------------------------- using System.Threading.Channels; +using System.Threading.Tasks; using Proto; -namespace Channels; +namespace Common; public static class ChannelSubscriber { diff --git a/examples/remotechannels/Messages/Messages.csproj b/examples/remotechannels/Common/Common.csproj similarity index 60% rename from examples/remotechannels/Messages/Messages.csproj rename to examples/remotechannels/Common/Common.csproj index b6022727d9..bbb26c647a 100644 --- a/examples/remotechannels/Messages/Messages.csproj +++ b/examples/remotechannels/Common/Common.csproj @@ -5,4 +5,8 @@ 10 + + + + diff --git a/examples/remotechannels/Common/Messages.cs b/examples/remotechannels/Common/Messages.cs new file mode 100644 index 0000000000..b3ca6e408d --- /dev/null +++ b/examples/remotechannels/Common/Messages.cs @@ -0,0 +1,3 @@ +namespace Common; + +public record MyMessage(int Value){} diff --git a/examples/remotechannels/Messages/Messages.cs b/examples/remotechannels/Messages/Messages.cs deleted file mode 100644 index 09ec42e0dd..0000000000 --- a/examples/remotechannels/Messages/Messages.cs +++ /dev/null @@ -1,6 +0,0 @@ -namespace Messages; - -public record MyMessage(int Value){} - -public record Subscribe(); -public record Subscribed(); \ No newline at end of file diff --git a/examples/remotechannels/Server/Program.cs b/examples/remotechannels/Server/Program.cs index d6abd3509e..5b47f992ba 100644 --- a/examples/remotechannels/Server/Program.cs +++ b/examples/remotechannels/Server/Program.cs @@ -1,8 +1,7 @@ using System; using System.Threading.Tasks; -using Messages; +using Common; using Proto; -using Proto.Channels; using Proto.Remote; using Proto.Remote.GrpcNet; using static Proto.Remote.GrpcNet.GrpcNetRemoteConfig; diff --git a/examples/remotechannels/Server/Server.csproj b/examples/remotechannels/Server/Server.csproj index 443e995ac7..82a0006e30 100644 --- a/examples/remotechannels/Server/Server.csproj +++ b/examples/remotechannels/Server/Server.csproj @@ -8,7 +8,7 @@ - + diff --git a/src/Proto.Actor/Context/ActorContextExtras.cs b/src/Proto.Actor/Context/ActorContextExtras.cs index 3869d313a0..b1d1d055ae 100644 --- a/src/Proto.Actor/Context/ActorContextExtras.cs +++ b/src/Proto.Actor/Context/ActorContextExtras.cs @@ -4,7 +4,6 @@ // // ----------------------------------------------------------------------- using System; -using System.Collections.Generic; using System.Collections.Immutable; using System.Threading; using Proto.Utils; @@ -29,7 +28,7 @@ public sealed class ActorContextExtras: IDisposable public IContext Context { get; } public CancellationTokenSource CancellationTokenSource { get; } = new(); - public TypeDictionary Store { get; } = new(5, 1); + internal TypeDictionary Store { get; } = new(5, 1); public void InitReceiveTimeoutTimer(Timer timer) => ReceiveTimeoutTimer = timer; diff --git a/src/Proto.Actor/Messages/MessageBatch.cs b/src/Proto.Actor/Messages/MessageBatch.cs deleted file mode 100644 index 9ed1b86570..0000000000 --- a/src/Proto.Actor/Messages/MessageBatch.cs +++ /dev/null @@ -1,10 +0,0 @@ -// ----------------------------------------------------------------------- -// -// Copyright (C) 2015-2022 Asynkron AB All rights reserved -// -// ----------------------------------------------------------------------- -using System.Collections.Generic; - -namespace Proto; - -public record MessageBatch(IReadOnlyCollection Messages); \ No newline at end of file diff --git a/src/Proto.Actor/Props/Props.cs b/src/Proto.Actor/Props/Props.cs index 400d8b5d12..c2c830852b 100644 --- a/src/Proto.Actor/Props/Props.cs +++ b/src/Proto.Actor/Props/Props.cs @@ -60,8 +60,8 @@ public sealed record Props public ImmutableList> SenderMiddleware { get; init; } = ImmutableList>.Empty; - public Receiver? ReceiverMiddlewareChain { get; init; } - public Sender? SenderMiddlewareChain { get; init; } + internal Receiver? ReceiverMiddlewareChain { get; init; } + internal Sender? SenderMiddlewareChain { get; init; } /// /// List of decorators for the actor context @@ -69,7 +69,7 @@ public sealed record Props public ImmutableList> ContextDecorator { get; init; } = ImmutableList>.Empty; - public Func? ContextDecoratorChain { get; init; } + internal Func? ContextDecoratorChain { get; init; } /// /// Delegate that creates the actor and wires it with context and mailbox. diff --git a/src/Proto.Actor/Stashing/CapturedContext.cs b/src/Proto.Actor/Stashing/CapturedContext.cs index 35907997dd..4f7164d3b0 100644 --- a/src/Proto.Actor/Stashing/CapturedContext.cs +++ b/src/Proto.Actor/Stashing/CapturedContext.cs @@ -12,7 +12,12 @@ namespace Proto; /// /// Message to store /// Context to store -public record CapturedContext(MessageEnvelope MessageEnvelope, IContext Context){ +public record CapturedContext(MessageEnvelope MessageEnvelope, IContext Context) +{ + /// + /// Reprocesses the captured message on the captured context. + /// It captures current context before processing and restores it after processing. + /// public async Task Receive() { var current = Context.Capture(); diff --git a/src/Proto.Actor/Utils/TypedDictionary.cs b/src/Proto.Actor/Utils/TypedDictionary.cs index 5ce277df83..64229fb6d1 100644 --- a/src/Proto.Actor/Utils/TypedDictionary.cs +++ b/src/Proto.Actor/Utils/TypedDictionary.cs @@ -9,7 +9,7 @@ namespace Proto.Utils; // ReSharper disable once UnusedTypeParameter -public class TypeDictionary +class TypeDictionary { private readonly double _growthFactor; diff --git a/src/Proto.Remote/GrpcNet/GrpcNetExtensions.cs b/src/Proto.Remote/GrpcNet/GrpcNetExtensions.cs index 60cc894420..edfba35048 100644 --- a/src/Proto.Remote/GrpcNet/GrpcNetExtensions.cs +++ b/src/Proto.Remote/GrpcNet/GrpcNetExtensions.cs @@ -54,14 +54,14 @@ public static ActorSystem WithClientRemote(this ActorSystem system, GrpcNetRemot return system; } - public static IServiceCollection AddRemote(this IServiceCollection services, Func configure) + internal static IServiceCollection AddRemote(this IServiceCollection services, Func configure) { services.AddSingleton(configure); AddAllServices(services); return services; } - public static IServiceCollection AddRemote( + internal static IServiceCollection AddRemote( this IServiceCollection services, GrpcNetRemoteConfig config ) @@ -71,7 +71,7 @@ GrpcNetRemoteConfig config return services; } - public static IServiceCollection AddClientRemote( + internal static IServiceCollection AddClientRemote( this IServiceCollection services, GrpcNetRemoteConfig config ) @@ -103,7 +103,7 @@ private static GrpcServiceEndpointConventionBuilder AddProtoRemoteEndpoint(IEndp return endpoints.MapGrpcService(); } - public static void UseProtoRemote(this IApplicationBuilder applicationBuilder) + internal static void UseProtoRemote(this IApplicationBuilder applicationBuilder) { var hostedRemote = applicationBuilder.ApplicationServices.GetRequiredService(); hostedRemote.ServerAddressesFeature = applicationBuilder.ServerFeatures.Get(); @@ -111,7 +111,7 @@ public static void UseProtoRemote(this IApplicationBuilder applicationBuilder) applicationBuilder.UseEndpoints(c => AddProtoRemoteEndpoint(c)); } - public static void UseProtoRemote(this IApplicationBuilder applicationBuilder, Action configure) + internal static void UseProtoRemote(this IApplicationBuilder applicationBuilder, Action configure) { var hostedRemote = applicationBuilder.ApplicationServices.GetRequiredService(); hostedRemote.ServerAddressesFeature = applicationBuilder.ServerFeatures.Get(); diff --git a/src/Proto.Remote/InternalsVisibleTo.cs b/src/Proto.Remote/InternalsVisibleTo.cs index cddcde5a78..1132b556fb 100644 --- a/src/Proto.Remote/InternalsVisibleTo.cs +++ b/src/Proto.Remote/InternalsVisibleTo.cs @@ -5,4 +5,5 @@ // ----------------------------------------------------------------------- using System.Runtime.CompilerServices; -[assembly: InternalsVisibleTo("Proto.Cluster")] \ No newline at end of file +[assembly: InternalsVisibleTo("Proto.Cluster")] +[assembly: InternalsVisibleTo("Proto.Remote.Tests")] \ No newline at end of file From 934e25cecd260a7f199e9eaa250685fdea89b78c Mon Sep 17 00:00:00 2001 From: Marcin Budny Date: Tue, 6 Sep 2022 13:17:47 +0200 Subject: [PATCH 04/12] block list --- src/Proto.Remote/BlockList.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Proto.Remote/BlockList.cs b/src/Proto.Remote/BlockList.cs index 3a2e613e72..4548f6465d 100644 --- a/src/Proto.Remote/BlockList.cs +++ b/src/Proto.Remote/BlockList.cs @@ -35,7 +35,7 @@ public BlockList(ActorSystem system) private ImmutableDictionary _blockedMembers = ImmutableDictionary.Empty; - public void Block(IEnumerable memberIds) + internal void Block(IEnumerable memberIds) { lock (_lock) { From 21bf8a4afb9cd6a980b1608e9408f979daf43f12 Mon Sep 17 00:00:00 2001 From: Marcin Budny Date: Tue, 6 Sep 2022 13:41:49 +0200 Subject: [PATCH 05/12] timeout names cleanup in cluster config --- src/Proto.Cluster/ClusterConfig.cs | 20 +++++++++---------- .../Identity/IdentityStoragePlacementActor.cs | 2 +- .../Partition/PartitionIdentityActor.cs | 2 +- .../Partition/PartitionPlacementActor.cs | 2 +- .../PartitionActivatorActor.cs | 2 +- ...oto.Cluster.PartitionIdentity.Tests.csproj | 1 - 6 files changed, 14 insertions(+), 15 deletions(-) diff --git a/src/Proto.Cluster/ClusterConfig.cs b/src/Proto.Cluster/ClusterConfig.cs index 09f3bb933c..4af2f0c46c 100644 --- a/src/Proto.Cluster/ClusterConfig.cs +++ b/src/Proto.Cluster/ClusterConfig.cs @@ -29,9 +29,8 @@ private ClusterConfig(string clusterName, IClusterProvider clusterProvider, IIde { ClusterName = clusterName ?? throw new ArgumentNullException(nameof(clusterName)); ClusterProvider = clusterProvider ?? throw new ArgumentNullException(nameof(clusterProvider)); - TimeoutTimespan = TimeSpan.FromSeconds(5); ActorRequestTimeout = TimeSpan.FromSeconds(5); - ActorSpawnTimeout = TimeSpan.FromSeconds(5); + ActorSpawnVerificationTimeout = TimeSpan.FromSeconds(5); ActorActivationTimeout = TimeSpan.FromSeconds(5); MaxNumberOfEventsInRequestLogThrottlePeriod = 3; RequestLogThrottlePeriod = TimeSpan.FromSeconds(2); @@ -88,7 +87,8 @@ private ClusterConfig(string clusterName, IClusterProvider clusterProvider, IIde /// /// Timeout for spawning an actor in the Partition Identity Lookup. Default is 5s. /// - public TimeSpan TimeoutTimespan { get; init; } + [Obsolete("Use ActorActivationTimeout instead")] + public TimeSpan TimeoutTimespan => ActorActivationTimeout; /// /// Timeout for single retry of actor request. Default is 5s. @@ -99,10 +99,10 @@ private ClusterConfig(string clusterName, IClusterProvider clusterProvider, IIde /// /// Timeout for running the check. Default is 5s. /// - public TimeSpan ActorSpawnTimeout { get; init; } + public TimeSpan ActorSpawnVerificationTimeout { get; init; } /// - /// Timeout for DB Identity Lookup operations. Default is 5s. + /// Timeout for activating an actor. Exact usage varies depending on used. Default is 5s. /// public TimeSpan ActorActivationTimeout { get; init; } @@ -165,10 +165,10 @@ private ClusterConfig(string clusterName, IClusterProvider clusterProvider, IIde /// /// Timeout for spawning an actor in the Partition Identity Lookup. Default is 5s. /// - /// + /// /// - public ClusterConfig WithTimeout(TimeSpan timeSpan) => - this with {TimeoutTimespan = timeSpan}; + [Obsolete("Use ActorActivationTimeout instead")] + public ClusterConfig WithTimeout(TimeSpan timeout) => WithActorActivationTimeout(timeout); /// /// Timeout for single retry of actor request. Default is 5s. @@ -184,8 +184,8 @@ public ClusterConfig WithActorRequestTimeout(TimeSpan timeSpan) => /// /// /// - public ClusterConfig WithActorSpawnTimeout(TimeSpan timeSpan) => - this with {ActorSpawnTimeout = timeSpan}; + public ClusterConfig WithActorSpawnVerificationTimeout(TimeSpan timeSpan) => + this with {ActorSpawnVerificationTimeout = timeSpan}; /// /// Timeout for DB Identity Lookup operations. Default is 5s. diff --git a/src/Proto.Cluster/Identity/IdentityStoragePlacementActor.cs b/src/Proto.Cluster/Identity/IdentityStoragePlacementActor.cs index 3654912b69..21e646b1f5 100644 --- a/src/Proto.Cluster/Identity/IdentityStoragePlacementActor.cs +++ b/src/Proto.Cluster/Identity/IdentityStoragePlacementActor.cs @@ -139,7 +139,7 @@ private void VerifyAndSpawn(ActivationRequest msg, IContext context, ActivatedCl ); } - var canSpawn = clusterKind.CanSpawnIdentity!(msg.Identity, CancellationTokens.FromSeconds(_cluster.Config.ActorSpawnTimeout)); + var canSpawn = clusterKind.CanSpawnIdentity!(msg.Identity, CancellationTokens.FromSeconds(_cluster.Config.ActorSpawnVerificationTimeout)); if (canSpawn.IsCompleted) { diff --git a/src/Proto.Cluster/Partition/PartitionIdentityActor.cs b/src/Proto.Cluster/Partition/PartitionIdentityActor.cs index e41d8e92a4..e57637301f 100644 --- a/src/Proto.Cluster/Partition/PartitionIdentityActor.cs +++ b/src/Proto.Cluster/Partition/PartitionIdentityActor.cs @@ -621,7 +621,7 @@ private async Task SpawnRemoteActor(ActivationRequest req, s Logger.LogTrace("[PartitionIdentity] Spawning Remote Actor {Activator} {Identity} {Kind}", activatorAddress, req.Identity, req.Kind); } - var timeout = _cluster.Config.TimeoutTimespan; + var timeout = _cluster.Config.ActorActivationTimeout; var activatorPid = PartitionManager.RemotePartitionPlacementActor(activatorAddress); var res = await _cluster.System.Root.RequestAsync(activatorPid, req, timeout); diff --git a/src/Proto.Cluster/Partition/PartitionPlacementActor.cs b/src/Proto.Cluster/Partition/PartitionPlacementActor.cs index d955d66327..ceaed00995 100644 --- a/src/Proto.Cluster/Partition/PartitionPlacementActor.cs +++ b/src/Proto.Cluster/Partition/PartitionPlacementActor.cs @@ -360,7 +360,7 @@ private void VerifyAndSpawn(ActivationRequest msg, IContext context, ActivatedCl ); } - var canSpawn = clusterKind.CanSpawnIdentity!(msg.Identity, CancellationTokens.FromSeconds(_cluster.Config.ActorSpawnTimeout)); + var canSpawn = clusterKind.CanSpawnIdentity!(msg.Identity, CancellationTokens.FromSeconds(_cluster.Config.ActorSpawnVerificationTimeout)); if (canSpawn.IsCompleted) { diff --git a/src/Proto.Cluster/PartitionActivator/PartitionActivatorActor.cs b/src/Proto.Cluster/PartitionActivator/PartitionActivatorActor.cs index 3dc9c5a600..ba3a6424b6 100644 --- a/src/Proto.Cluster/PartitionActivator/PartitionActivatorActor.cs +++ b/src/Proto.Cluster/PartitionActivator/PartitionActivatorActor.cs @@ -190,7 +190,7 @@ private void VerifyAndSpawn(ActivationRequest msg, IContext context, ActivatedCl ); } - var canSpawn = clusterKind.CanSpawnIdentity!(msg.Identity, CancellationTokens.FromSeconds(_cluster.Config.ActorSpawnTimeout)); + var canSpawn = clusterKind.CanSpawnIdentity!(msg.Identity, CancellationTokens.FromSeconds(_cluster.Config.ActorSpawnVerificationTimeout)); if (canSpawn.IsCompleted) { diff --git a/tests/Proto.Cluster.PartitionIdentity.Tests/Proto.Cluster.PartitionIdentity.Tests.csproj b/tests/Proto.Cluster.PartitionIdentity.Tests/Proto.Cluster.PartitionIdentity.Tests.csproj index 46ff1cb19f..b7860b0a65 100644 --- a/tests/Proto.Cluster.PartitionIdentity.Tests/Proto.Cluster.PartitionIdentity.Tests.csproj +++ b/tests/Proto.Cluster.PartitionIdentity.Tests/Proto.Cluster.PartitionIdentity.Tests.csproj @@ -12,7 +12,6 @@ all runtime; build; native; contentfiles; analyzers; buildtransitive - runtime; build; native; contentfiles; analyzers; buildtransitive From 0c0011c947a256a7550d6ae332e352f32974fa7a Mon Sep 17 00:00:00 2001 From: Marcin Budny Date: Tue, 6 Sep 2022 15:45:55 +0200 Subject: [PATCH 06/12] cluster changes --- src/Proto.Cluster/Cluster.cs | 14 ++++++---- src/Proto.Cluster/Gossip/Gossiper.cs | 27 +++++++++++++++++-- ...oto.Cluster.PartitionIdentity.Tests.csproj | 1 - .../InMemorySubscribersStore.cs | 2 +- 4 files changed, 35 insertions(+), 9 deletions(-) diff --git a/src/Proto.Cluster/Cluster.cs b/src/Proto.Cluster/Cluster.cs index 6b66579247..7f0a06a992 100644 --- a/src/Proto.Cluster/Cluster.cs +++ b/src/Proto.Cluster/Cluster.cs @@ -63,9 +63,9 @@ public Cluster(ActorSystem system, ClusterConfig config) SubscribeToTopologyEvents(); } - public static ILogger Logger { get; } = Log.CreateLogger(); + internal static ILogger Logger { get; } = Log.CreateLogger(); - public IClusterContext ClusterContext { get; private set; } = null!; + internal IClusterContext ClusterContext { get; private set; } = null!; public Gossiper Gossip { get; } @@ -79,7 +79,7 @@ public Cluster(ActorSystem system, ClusterConfig config) /// public ActorSystem System { get; } - public IRemote Remote { get; private set; } = null!; + internal IRemote Remote { get; private set; } = null!; /// /// A list of known cluster members. See for details @@ -90,7 +90,7 @@ public Cluster(ActorSystem system, ClusterConfig config) internal IClusterProvider Provider { get; set; } = null!; - public PidCache PidCache { get; } + internal PidCache PidCache { get; } private void SubscribeToTopologyEvents() => System.EventStream.Subscribe(e => { @@ -101,6 +101,10 @@ private void SubscribeToTopologyEvents() => } ); + /// + /// Gets cluster kinds registered on this cluster member + /// + /// public string[] GetClusterKinds() => _clusterKinds.Keys.ToArray(); /// @@ -270,7 +274,7 @@ public ActivatedClusterKind GetClusterKind(string kind) return clusterKind; } - public ActivatedClusterKind? TryGetClusterKind(string kind) + internal ActivatedClusterKind? TryGetClusterKind(string kind) { _clusterKinds.TryGetValue(kind, out var clusterKind); diff --git a/src/Proto.Cluster/Gossip/Gossiper.cs b/src/Proto.Cluster/Gossip/Gossiper.cs index a7d9adec46..966e1c4f27 100644 --- a/src/Proto.Cluster/Gossip/Gossiper.cs +++ b/src/Proto.Cluster/Gossip/Gossiper.cs @@ -58,8 +58,18 @@ public Gossiper(Cluster cluster) _context = _cluster.System.Root; } + /// + /// Gets the current full gossip state as seen by current member + /// + /// public Task GetStateSnapshot() => _context.RequestAsync(_pid, new GetGossipStateSnapshot()); + /// + /// Gets gossip state entry by key, for each member represented in the gossip state, as seen by current member + /// + /// + /// Dictionary where member id is the key and gossip state value is the value + /// public async Task> GetState(string key) where T : IMessage, new() { _context.System.Logger()?.LogDebug("Gossiper getting state from {Pid}", _pid); @@ -87,6 +97,11 @@ public Gossiper(Cluster cluster) return ImmutableDictionary.Empty; } + /// + /// Gets the gossip state entry by key, for each member represented in the gossip state, as seen by current member + /// + /// Dictionary where member id is the key and gossip state value is the value, wrapped in + /// public async Task> GetStateEntry(string key) { _context.System.Logger()?.LogDebug("Gossiper getting state from {Pid}", _pid); @@ -105,8 +120,11 @@ public async Task> GetStateEntry(str return ImmutableDictionary.Empty; } - // Send message to update member state - // Will not wait for completed state update + /// + /// Sets a gossip state key to provided value. This will not wait for the state to be actually updated in current member's gossip state. + /// + /// + /// public void SetState(string key, IMessage value) { if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug("Gossiper setting state to {Pid}", _pid); @@ -117,6 +135,11 @@ public void SetState(string key, IMessage value) _context.Send(_pid, new SetGossipStateKey(key, value)); } + /// + /// Sets a gossip state key to provided value. Waits for the state to be updated in current member's gossip state. + /// + /// + /// public async Task SetStateAsync(string key, IMessage value) { if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug("Gossiper setting state to {Pid}", _pid); diff --git a/tests/Proto.Cluster.PartitionIdentity.Tests/Proto.Cluster.PartitionIdentity.Tests.csproj b/tests/Proto.Cluster.PartitionIdentity.Tests/Proto.Cluster.PartitionIdentity.Tests.csproj index b7860b0a65..3b6079f430 100644 --- a/tests/Proto.Cluster.PartitionIdentity.Tests/Proto.Cluster.PartitionIdentity.Tests.csproj +++ b/tests/Proto.Cluster.PartitionIdentity.Tests/Proto.Cluster.PartitionIdentity.Tests.csproj @@ -12,7 +12,6 @@ all runtime; build; native; contentfiles; analyzers; buildtransitive - runtime; build; native; contentfiles; analyzers; buildtransitive all diff --git a/tests/Proto.Cluster.PubSub.Tests/InMemorySubscribersStore.cs b/tests/Proto.Cluster.PubSub.Tests/InMemorySubscribersStore.cs index 65b66eb506..74ff3a660f 100644 --- a/tests/Proto.Cluster.PubSub.Tests/InMemorySubscribersStore.cs +++ b/tests/Proto.Cluster.PubSub.Tests/InMemorySubscribersStore.cs @@ -12,7 +12,7 @@ public class InMemorySubscribersStore : IKeyValueStore { private readonly ConcurrentDictionary _store = new(); - public Task GetAsync(string id, CancellationToken ct) + public Task GetAsync(string id, CancellationToken ct) { _store.TryGetValue(id, out var subscribers); return subscribers == null ? Task.FromResult(new Subscribers()) : Task.FromResult(subscribers); From 4927b62f73db4de8f6460b79c27cfa9b5a543c43 Mon Sep 17 00:00:00 2001 From: Marcin Budny Date: Wed, 7 Sep 2022 08:09:51 +0200 Subject: [PATCH 07/12] cluster changes --- src/Proto.Cluster/Cluster.cs | 5 ++++- src/Proto.Cluster/Member/MemberList.cs | 10 +++++----- src/Proto.Cluster/PidCache.cs | 2 +- 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/src/Proto.Cluster/Cluster.cs b/src/Proto.Cluster/Cluster.cs index 7f0a06a992..eebcfd48b4 100644 --- a/src/Proto.Cluster/Cluster.cs +++ b/src/Proto.Cluster/Cluster.cs @@ -79,7 +79,10 @@ public Cluster(ActorSystem system, ClusterConfig config) /// public ActorSystem System { get; } - internal IRemote Remote { get; private set; } = null!; + /// + /// IRemote implementation the cluster is using + /// + public IRemote Remote { get; private set; } = null!; /// /// A list of known cluster members. See for details diff --git a/src/Proto.Cluster/Member/MemberList.cs b/src/Proto.Cluster/Member/MemberList.cs index 699b64b330..eadfeede3f 100644 --- a/src/Proto.Cluster/Member/MemberList.cs +++ b/src/Proto.Cluster/Member/MemberList.cs @@ -112,10 +112,10 @@ public MemberList(Cluster cluster) internal void InitializeTopologyConsensus() => _topologyConsensus = _cluster.Gossip.RegisterConsensusCheck(GossipKeys.Topology, topology => topology.TopologyHash); - public Task<(bool consensus, ulong topologyHash)> TopologyConsensus(CancellationToken ct) + internal Task<(bool consensus, ulong topologyHash)> TopologyConsensus(CancellationToken ct) => _topologyConsensus?.TryGetConsensus(ct) ?? Task.FromResult<(bool consensus, ulong topologyHash)>(default); - public Member? GetActivator(string kind, string requestSourceAddress) + internal Member? GetActivator(string kind, string requestSourceAddress) { //immutable, don't lock if (_memberStrategyByKind.TryGetValue(kind, out var memberStrategy)) @@ -265,7 +265,7 @@ private void SelfBlocked() _ = _cluster.ShutdownAsync(reason: "Blocked by MemberList"); } - public MetaMember? GetMetaMember(string memberId) + internal MetaMember? GetMetaMember(string memberId) { _metaMembers.TryGetValue(memberId, out var meta); return meta; @@ -344,9 +344,9 @@ public void BroadcastEvent(object message, bool includeSelf = true) public bool TryGetMember(string memberId, out Member? value) => _activeMembers.Lookup.TryGetValue(memberId, out value); - public bool TryGetMemberIndexByAddress(string address, out int value) => _indexByAddress.TryGetValue(address, out value); + internal bool TryGetMemberIndexByAddress(string address, out int value) => _indexByAddress.TryGetValue(address, out value); - public bool TryGetMemberByIndex(int memberIndex, out Member? value) => _membersByIndex.TryGetValue(memberIndex, out value); + internal bool TryGetMemberByIndex(int memberIndex, out Member? value) => _membersByIndex.TryGetValue(memberIndex, out value); /// /// Gets a list of active diff --git a/src/Proto.Cluster/PidCache.cs b/src/Proto.Cluster/PidCache.cs index 7a108a12c7..fe6f5c6a70 100644 --- a/src/Proto.Cluster/PidCache.cs +++ b/src/Proto.Cluster/PidCache.cs @@ -13,7 +13,7 @@ namespace Proto.Cluster; -public class PidCache +class PidCache { private readonly ICollection> _cacheCollection; private readonly ConcurrentDictionary _cacheDict; From 4c9444504be1c857344bfb25293e7da2223cca57 Mon Sep 17 00:00:00 2001 From: Marcin Budny Date: Wed, 7 Sep 2022 09:51:13 +0200 Subject: [PATCH 08/12] removed usage of ClusterInit, the msg itself stays for now --- src/Proto.Cluster.CodeGen/Template.cs | 5 ----- src/Proto.Cluster/ClusterExtension.cs | 8 -------- src/Proto.Cluster/Grain/ClusterInit.cs | 1 + tests/Proto.Cluster.CodeGen.Tests/ExpectedOutput.cs | 5 ----- .../ExpectedOutputPackageless.cs | 5 ----- 5 files changed, 1 insertion(+), 23 deletions(-) diff --git a/src/Proto.Cluster.CodeGen/Template.cs b/src/Proto.Cluster.CodeGen/Template.cs index 8ea167597b..31a50396ed 100644 --- a/src/Proto.Cluster.CodeGen/Template.cs +++ b/src/Proto.Cluster.CodeGen/Template.cs @@ -150,11 +150,6 @@ public async Task ReceiveAsync(IContext context) await _inner.OnStarted(); break; } -#pragma warning disable 618 - case ClusterInit _: -#pragma warning restore 618 - //Ignored - break; case Stopping _: { await _inner!.OnStopping(); diff --git a/src/Proto.Cluster/ClusterExtension.cs b/src/Proto.Cluster/ClusterExtension.cs index 57ed2ba020..1f85d7efb3 100644 --- a/src/Proto.Cluster/ClusterExtension.cs +++ b/src/Proto.Cluster/ClusterExtension.cs @@ -137,14 +137,6 @@ MessageEnvelope startEnvelope { clusterKind.Inc(); await baseReceive(ctx, startEnvelope); - - var identity = ctx.Get(); - var cluster = ctx.System.Cluster(); -#pragma warning disable 618 - var grainInit = new ClusterInit(identity!, cluster); -#pragma warning restore 618 - var grainInitEnvelope = new MessageEnvelope(grainInit, null); - await baseReceive(ctx, grainInitEnvelope); } async Task HandleRestarting( diff --git a/src/Proto.Cluster/Grain/ClusterInit.cs b/src/Proto.Cluster/Grain/ClusterInit.cs index 9c17549115..e8c07ffb7f 100644 --- a/src/Proto.Cluster/Grain/ClusterInit.cs +++ b/src/Proto.Cluster/Grain/ClusterInit.cs @@ -7,6 +7,7 @@ namespace Proto.Cluster; +// TODO: remove once new version of code gen nuget is built and used in the examples [Obsolete("Replace with 'Started' lifecycle message. '.ClusterIdentity()' and '.Cluster()' is available on IContext")] public class ClusterInit { diff --git a/tests/Proto.Cluster.CodeGen.Tests/ExpectedOutput.cs b/tests/Proto.Cluster.CodeGen.Tests/ExpectedOutput.cs index 0db2916582..7d9a82b00b 100644 --- a/tests/Proto.Cluster.CodeGen.Tests/ExpectedOutput.cs +++ b/tests/Proto.Cluster.CodeGen.Tests/ExpectedOutput.cs @@ -288,11 +288,6 @@ public async Task ReceiveAsync(IContext context) await _inner.OnStarted(); break; } -#pragma warning disable 618 - case ClusterInit _: -#pragma warning restore 618 - //Ignored - break; case Stopping _: { await _inner!.OnStopping(); diff --git a/tests/Proto.Cluster.CodeGen.Tests/ExpectedOutputPackageless.cs b/tests/Proto.Cluster.CodeGen.Tests/ExpectedOutputPackageless.cs index 3025d17033..9d2f5562fa 100644 --- a/tests/Proto.Cluster.CodeGen.Tests/ExpectedOutputPackageless.cs +++ b/tests/Proto.Cluster.CodeGen.Tests/ExpectedOutputPackageless.cs @@ -288,11 +288,6 @@ public async Task ReceiveAsync(IContext context) await _inner.OnStarted(); break; } -#pragma warning disable 618 - case ClusterInit _: -#pragma warning restore 618 - //Ignored - break; case Stopping _: { await _inner!.OnStopping(); From 864d8629008e9eb06c91d11ddde33837dfa1b940 Mon Sep 17 00:00:00 2001 From: Marcin Budny Date: Wed, 7 Sep 2022 09:53:35 +0200 Subject: [PATCH 09/12] removed unused Tick message --- src/Proto.Cluster/Messages/Messages.cs | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/Proto.Cluster/Messages/Messages.cs b/src/Proto.Cluster/Messages/Messages.cs index 687d2c5a6c..8ea590398e 100644 --- a/src/Proto.Cluster/Messages/Messages.cs +++ b/src/Proto.Cluster/Messages/Messages.cs @@ -113,8 +113,6 @@ private IEnumerable UnpackKind(Types.Kind kind) ); } -public record Tick; - public partial class ClusterTopology { //this ignores joined and left members, only the actual members are relevant From 5fca5811d4cbf214b28f3174d58d5afe2a2531bb Mon Sep 17 00:00:00 2001 From: Marcin Budny Date: Wed, 7 Sep 2022 09:54:40 +0200 Subject: [PATCH 10/12] test CI --- .github/workflows/build-dev.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build-dev.yml b/.github/workflows/build-dev.yml index 49a991e0bc..0a5b3165c0 100644 --- a/.github/workflows/build-dev.yml +++ b/.github/workflows/build-dev.yml @@ -3,7 +3,7 @@ name: Build and deploy to Nuget on: push: branches: - - dev + - "*" tags: - "*" env: From ba7f0417cac63f224a0ce6a7a70ebb2c8045d6da Mon Sep 17 00:00:00 2001 From: Marcin Budny Date: Wed, 7 Sep 2022 10:06:40 +0200 Subject: [PATCH 11/12] restore CI config, clean up xml docs related warnings --- .github/workflows/build-dev.yml | 2 +- src/Proto.Actor/Context/ISpawnerContext.cs | 2 ++ src/Proto.Actor/Deduplication/DeduplicationContext.cs | 1 + src/Proto.Actor/Messages/MessageEnvelope.cs | 2 +- src/Proto.Remote/IRemoteExtensions.cs | 2 ++ src/Proto.Remote/RemoteConfigExtensions.cs | 3 ++- 6 files changed, 9 insertions(+), 3 deletions(-) diff --git a/.github/workflows/build-dev.yml b/.github/workflows/build-dev.yml index 0a5b3165c0..49a991e0bc 100644 --- a/.github/workflows/build-dev.yml +++ b/.github/workflows/build-dev.yml @@ -3,7 +3,7 @@ name: Build and deploy to Nuget on: push: branches: - - "*" + - dev tags: - "*" env: diff --git a/src/Proto.Actor/Context/ISpawnerContext.cs b/src/Proto.Actor/Context/ISpawnerContext.cs index d913549fef..5a94742d43 100644 --- a/src/Proto.Actor/Context/ISpawnerContext.cs +++ b/src/Proto.Actor/Context/ISpawnerContext.cs @@ -26,6 +26,7 @@ public static class SpawnerContextExtensions /// /// Spawns a new child actor based on props and named with a unique ID. /// + /// /// The Props used to spawn the actor /// The PID of the child actor public static PID Spawn(this ISpawnerContext self, Props props) @@ -36,6 +37,7 @@ public static PID Spawn(this ISpawnerContext self, Props props) /// /// Spawns a new child actor based on props and named with a unique ID. /// + /// /// The Props used to spawn the actor /// /// The PID of the child actor diff --git a/src/Proto.Actor/Deduplication/DeduplicationContext.cs b/src/Proto.Actor/Deduplication/DeduplicationContext.cs index 938f42fa55..a02e5f0af7 100644 --- a/src/Proto.Actor/Deduplication/DeduplicationContext.cs +++ b/src/Proto.Actor/Deduplication/DeduplicationContext.cs @@ -16,6 +16,7 @@ namespace Proto.Deduplication; /// Extracts the deduplication key from the message. /// /// Type of the key +/// Message to extract from /// The key should be returned in this variable /// Returns true if the key was successfully extracted, false otherwise public delegate bool TryGetDeduplicationKey(MessageEnvelope envelope, out T? key); diff --git a/src/Proto.Actor/Messages/MessageEnvelope.cs b/src/Proto.Actor/Messages/MessageEnvelope.cs index a4afa85269..a79ef524ff 100644 --- a/src/Proto.Actor/Messages/MessageEnvelope.cs +++ b/src/Proto.Actor/Messages/MessageEnvelope.cs @@ -114,7 +114,7 @@ public MessageEnvelope WithHeader(string key, string value) /// /// Extends the message envelope with additional headers. /// - /// + /// /// New envelope public MessageEnvelope WithHeaders(IEnumerable> items) { diff --git a/src/Proto.Remote/IRemoteExtensions.cs b/src/Proto.Remote/IRemoteExtensions.cs index 611e2d646f..49c1232a16 100644 --- a/src/Proto.Remote/IRemoteExtensions.cs +++ b/src/Proto.Remote/IRemoteExtensions.cs @@ -17,6 +17,7 @@ public static class IRemoteExtensions /// /// Spawn a remote actor with auto-generated name /// + /// /// Remote node address /// Actor kind, must be known on the remote node /// Timeout for the confirmation to be received from the remote node @@ -27,6 +28,7 @@ public static Task SpawnAsync(this IRemote remote, string addr /// /// Spawn a remote actor with a name /// + /// /// Remote node address /// Remote actor name /// Actor kind, must be known on the remote node diff --git a/src/Proto.Remote/RemoteConfigExtensions.cs b/src/Proto.Remote/RemoteConfigExtensions.cs index 3d12cf4067..5ddd2aec74 100644 --- a/src/Proto.Remote/RemoteConfigExtensions.cs +++ b/src/Proto.Remote/RemoteConfigExtensions.cs @@ -66,6 +66,7 @@ public static TRemoteConfig WithAdvertisedHost(this TRemoteConfig /// the external port in order for other systems to be able to connect to it. /// Advertised port can be different from the bound port, e.g. in container scenarios /// + /// /// /// public static TRemoteConfig WithAdvertisedPort(this TRemoteConfig remoteConfig, int? advertisedPort) @@ -197,7 +198,7 @@ public static TRemoteConfig WithSerializer(this TRemoteConfig rem /// /// /// - public static TRemoteConfig WithJsonSerializerOptions(this TRemoteConfig remoteConfig, System.Text.Json.JsonSerializerOptions options) + public static TRemoteConfig WithJsonSerializerOptions(this TRemoteConfig remoteConfig, JsonSerializerOptions options) where TRemoteConfig : RemoteConfigBase { remoteConfig.Serialization.JsonSerializerOptions = options; From 892d8d8ce1ceb2dff0f2a57e44ac880a9379ac50 Mon Sep 17 00:00:00 2001 From: Marcin Budny Date: Thu, 8 Sep 2022 08:07:31 +0200 Subject: [PATCH 12/12] adjustments after merge --- src/Proto.Cluster/SingleNode/SingleNodeActivatorActor.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Proto.Cluster/SingleNode/SingleNodeActivatorActor.cs b/src/Proto.Cluster/SingleNode/SingleNodeActivatorActor.cs index 628c94bdeb..fce8bc5912 100644 --- a/src/Proto.Cluster/SingleNode/SingleNodeActivatorActor.cs +++ b/src/Proto.Cluster/SingleNode/SingleNodeActivatorActor.cs @@ -144,7 +144,7 @@ private void VerifyAndSpawn(ActivationRequest msg, IContext context, ActivatedCl ); } - var canSpawn = clusterKind.CanSpawnIdentity!(msg.Identity, CancellationTokens.FromSeconds(_cluster.Config.ActorSpawnTimeout)); + var canSpawn = clusterKind.CanSpawnIdentity!(msg.Identity, CancellationTokens.FromSeconds(_cluster.Config.ActorSpawnVerificationTimeout)); if (canSpawn.IsCompleted) {