diff --git a/ProtoActor.sln b/ProtoActor.sln index 1b596dd6a4..8cae27ed26 100644 --- a/ProtoActor.sln +++ b/ProtoActor.sln @@ -203,7 +203,7 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Server", "examples\remotech EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Client", "examples\remotechannels\Client\Client.csproj", "{5EACFB26-C757-452C-A536-E633BC352A69}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Messages", "examples\remotechannels\Messages\Messages.csproj", "{D33722CB-96B8-4B9C-AB59-B18B7FF28539}" +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Common", "examples\remotechannels\Common\Common.csproj", "{D33722CB-96B8-4B9C-AB59-B18B7FF28539}" EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Proto.Cluster.AmazonECS", "src\Proto.Cluster.AmazonECS\Proto.Cluster.AmazonECS.csproj", "{F5EB4AD3-5BCB-48E9-B974-A93EB19CE43C}" EndProject diff --git a/examples/remotechannels/Client/Client.csproj b/examples/remotechannels/Client/Client.csproj index 443e995ac7..82a0006e30 100644 --- a/examples/remotechannels/Client/Client.csproj +++ b/examples/remotechannels/Client/Client.csproj @@ -8,7 +8,7 @@ - + diff --git a/examples/remotechannels/Client/Program.cs b/examples/remotechannels/Client/Program.cs index c07f4e09a8..60e8bef71d 100644 --- a/examples/remotechannels/Client/Program.cs +++ b/examples/remotechannels/Client/Program.cs @@ -1,7 +1,6 @@ using System; -using Messages; +using Common; using Proto; -using Proto.Channels; using Proto.Remote; using Proto.Remote.GrpcNet; using static System.Threading.Channels.Channel; diff --git a/src/Proto.Actor/Channels/ChannelPublisherActor.cs b/examples/remotechannels/Common/ChannelPublisherActor.cs similarity index 95% rename from src/Proto.Actor/Channels/ChannelPublisherActor.cs rename to examples/remotechannels/Common/ChannelPublisherActor.cs index b6f905c9ad..48c3a46ec9 100644 --- a/src/Proto.Actor/Channels/ChannelPublisherActor.cs +++ b/examples/remotechannels/Common/ChannelPublisherActor.cs @@ -6,11 +6,10 @@ using System.Collections.Generic; using System.Threading.Channels; using System.Threading.Tasks; -using JetBrains.Annotations; +using Proto; -namespace Proto.Channels; +namespace Common; -[PublicAPI] public static class ChannelPublisher { /// @@ -40,7 +39,6 @@ public static PID StartNew(IRootContext context, Channel channel, string n } } -[PublicAPI] public class ChannelPublisherActor : IActor { private readonly HashSet _subscribers = new(); @@ -56,13 +54,17 @@ public Task ReceiveAsync(IContext context) } break; + case PID subscriber: _subscribers.Add(subscriber); context.Watch(subscriber); + context.Respond(new Subscribed()); break; + case Terminated terminated: _subscribers.Remove(terminated.Who); break; + case T typed: foreach (var sub in _subscribers) { diff --git a/src/Proto.Actor/Channels/ChannelSubscriberActor.cs b/examples/remotechannels/Common/ChannelSubscriberActor.cs similarity index 78% rename from src/Proto.Actor/Channels/ChannelSubscriberActor.cs rename to examples/remotechannels/Common/ChannelSubscriberActor.cs index ebb75a517f..164fd47602 100644 --- a/src/Proto.Actor/Channels/ChannelSubscriberActor.cs +++ b/examples/remotechannels/Common/ChannelSubscriberActor.cs @@ -5,11 +5,10 @@ // ----------------------------------------------------------------------- using System.Threading.Channels; using System.Threading.Tasks; -using JetBrains.Annotations; +using Proto; -namespace Proto.Channels; +namespace Common; -[PublicAPI] public static class ChannelSubscriber { /// @@ -21,24 +20,29 @@ public static class ChannelSubscriber /// The channel to write messages to /// The Type of channel elements /// - public static PID StartNew(IRootContext context, PID publisher, Channel channel) + public static async Task StartNew(IRootContext context, PID publisher, Channel channel) { - var props = Props.FromProducer(() => new ChannelSubscriberActor(publisher, channel)); + var tcs = new TaskCompletionSource(); + var props = Props.FromProducer(() => new ChannelSubscriberActor(publisher, channel, tcs)); var pid = context.Spawn(props); + + await tcs.Task; return pid; } } -[PublicAPI] public class ChannelSubscriberActor : IActor { private readonly Channel _channel; + private readonly TaskCompletionSource _subscribed; private readonly PID _publisher; - public ChannelSubscriberActor(PID publisher, Channel channel) + public ChannelSubscriberActor(PID publisher, Channel channel, TaskCompletionSource subscribed) + { _publisher = publisher; _channel = channel; + _subscribed = subscribed; } public async Task ReceiveAsync(IContext context) @@ -49,15 +53,24 @@ public async Task ReceiveAsync(IContext context) context.Watch(_publisher); context.Request(_publisher, context.Self); break; + + case Subscribed: + _subscribed.SetResult(); + break; + case Stopping: _channel.Writer.Complete(); break; + case Terminated t when t.Who.Equals(_publisher): _channel.Writer.Complete(); break; + case T typed: await _channel.Writer.WriteAsync(typed); break; } } -} \ No newline at end of file +} + +public record Subscribed; \ No newline at end of file diff --git a/examples/remotechannels/Messages/Messages.csproj b/examples/remotechannels/Common/Common.csproj similarity index 60% rename from examples/remotechannels/Messages/Messages.csproj rename to examples/remotechannels/Common/Common.csproj index b6022727d9..bbb26c647a 100644 --- a/examples/remotechannels/Messages/Messages.csproj +++ b/examples/remotechannels/Common/Common.csproj @@ -5,4 +5,8 @@ 10 + + + + diff --git a/examples/remotechannels/Common/Messages.cs b/examples/remotechannels/Common/Messages.cs new file mode 100644 index 0000000000..b3ca6e408d --- /dev/null +++ b/examples/remotechannels/Common/Messages.cs @@ -0,0 +1,3 @@ +namespace Common; + +public record MyMessage(int Value){} diff --git a/examples/remotechannels/Messages/Messages.cs b/examples/remotechannels/Messages/Messages.cs deleted file mode 100644 index 09ec42e0dd..0000000000 --- a/examples/remotechannels/Messages/Messages.cs +++ /dev/null @@ -1,6 +0,0 @@ -namespace Messages; - -public record MyMessage(int Value){} - -public record Subscribe(); -public record Subscribed(); \ No newline at end of file diff --git a/examples/remotechannels/Server/Program.cs b/examples/remotechannels/Server/Program.cs index d6abd3509e..5b47f992ba 100644 --- a/examples/remotechannels/Server/Program.cs +++ b/examples/remotechannels/Server/Program.cs @@ -1,8 +1,7 @@ using System; using System.Threading.Tasks; -using Messages; +using Common; using Proto; -using Proto.Channels; using Proto.Remote; using Proto.Remote.GrpcNet; using static Proto.Remote.GrpcNet.GrpcNetRemoteConfig; diff --git a/examples/remotechannels/Server/Server.csproj b/examples/remotechannels/Server/Server.csproj index 443e995ac7..82a0006e30 100644 --- a/examples/remotechannels/Server/Server.csproj +++ b/examples/remotechannels/Server/Server.csproj @@ -8,7 +8,7 @@ - + diff --git a/src/Proto.Actor/ActorSystem.cs b/src/Proto.Actor/ActorSystem.cs index d52e3f3112..9bf2ff8251 100644 --- a/src/Proto.Actor/ActorSystem.cs +++ b/src/Proto.Actor/ActorSystem.cs @@ -78,7 +78,12 @@ public ActorSystem(ActorSystemConfig config) /// Allows to access the stop cancellation token and stop reason. /// Use to stop the actor system. /// - public Stopper Stopper { get; } + internal Stopper Stopper { get; } + + /// + /// For stopped , returns the reason for the shutdown. + /// + public string StoppedReason => Stopper.StoppedReason; /// /// Manages all the guardians in the actor system. @@ -117,13 +122,13 @@ public ActorSystem(ActorSystemConfig config) private void RunThreadPoolStats() { - var metricTags = new KeyValuePair[]{ new("id", Id), new("address", Address)}; + var metricTags = new KeyValuePair[] {new("id", Id), new("address", Address)}; var logger = Log.CreateLogger(nameof(ThreadPoolStats)); _ = ThreadPoolStats.Run(TimeSpan.FromSeconds(5), t => { //collect the latency metrics - if(Metrics.Enabled) + if (Metrics.Enabled) ActorMetrics.ThreadPoolLatency.Record(t.TotalSeconds, metricTags); //does it take longer than 1 sec for a task to start executing? @@ -144,7 +149,7 @@ private void RunThreadPoolStats() /// /// Shutdown reason /// - public Task ShutdownAsync(string reason="") + public Task ShutdownAsync(string reason = "") { try { @@ -195,15 +200,15 @@ public RootContext NewRoot(MessageHeader? headers = null, params Func /// /// - public Props ConfigureProps(Props props) => Config.ConfigureProps(props); - + internal Props ConfigureProps(Props props) => Config.ConfigureProps(props); + /// /// Applies props configuration delegate for system actors. /// /// /// /// - public Props ConfigureSystemProps(string name, Props props) => Config.ConfigureSystemProps(name, props); + internal Props ConfigureSystemProps(string name, Props props) => Config.ConfigureSystemProps(name, props); /// /// Stops the actor system with reason = "Disposed" diff --git a/src/Proto.Actor/Context/ActorContext.cs b/src/Proto.Actor/Context/ActorContext.cs index 0231965697..f1c74dbe53 100644 --- a/src/Proto.Actor/Context/ActorContext.cs +++ b/src/Proto.Actor/Context/ActorContext.cs @@ -71,11 +71,6 @@ public ActorContext(ActorSystem system, Props props, PID? parent, PID self, IMai public TimeSpan ReceiveTimeout { get; private set; } - public void Stash() - { - if (_messageOrEnvelope is not null) EnsureExtras().Stash.Push(_messageOrEnvelope); - } - public void Respond(object message) { if (Sender is not null) @@ -714,19 +709,6 @@ private async ValueTask RestartAsync() Self.SendSystemMessage(System, ResumeMailbox.Instance); await InvokeUserMessageAsync(Started.Instance); - - if (_extras?.Stash is not null) - { - var currentStash = new Stack(_extras.Stash); - _extras.Stash.Clear(); - - //TODO: what happens if we hit a failure here? - while (currentStash.Any()) - { - var msg = currentStash.Pop(); - await InvokeUserMessageAsync(msg); - } - } } private ValueTask DisposeActorIfDisposable() diff --git a/src/Proto.Actor/Context/ActorContextDecorator.cs b/src/Proto.Actor/Context/ActorContextDecorator.cs index 7ce0af1141..f33d1d7481 100644 --- a/src/Proto.Actor/Context/ActorContextDecorator.cs +++ b/src/Proto.Actor/Context/ActorContextDecorator.cs @@ -48,9 +48,6 @@ public virtual Task Receive(MessageEnvelope envelope) => public virtual void Respond(object message) => _context.Respond(message); - public virtual void Stash() => - _context.Stash(); - public virtual PID SpawnNamed(Props props, string name, Action? callback=null) => _context.SpawnNamed(props, name, callback); diff --git a/src/Proto.Actor/Context/ActorContextExtras.cs b/src/Proto.Actor/Context/ActorContextExtras.cs index ba24111916..b1d1d055ae 100644 --- a/src/Proto.Actor/Context/ActorContextExtras.cs +++ b/src/Proto.Actor/Context/ActorContextExtras.cs @@ -4,7 +4,6 @@ // // ----------------------------------------------------------------------- using System; -using System.Collections.Generic; using System.Collections.Immutable; using System.Threading; using Proto.Utils; @@ -25,12 +24,11 @@ public sealed class ActorContextExtras: IDisposable public ImmutableHashSet Children { get; private set; } = ImmutableHashSet.Empty; public Timer? ReceiveTimeoutTimer { get; private set; } public RestartStatistics RestartStatistics { get; } = new(0, null); - public Stack Stash { get; } = new(); public ImmutableHashSet Watchers { get; private set; } = ImmutableHashSet.Empty; public IContext Context { get; } public CancellationTokenSource CancellationTokenSource { get; } = new(); - public TypeDictionary Store { get; } = new(5, 1); + internal TypeDictionary Store { get; } = new(5, 1); public void InitReceiveTimeoutTimer(Timer timer) => ReceiveTimeoutTimer = timer; diff --git a/src/Proto.Actor/Context/IContext.cs b/src/Proto.Actor/Context/IContext.cs index 7f69714ef6..fce16ba6ad 100644 --- a/src/Proto.Actor/Context/IContext.cs +++ b/src/Proto.Actor/Context/IContext.cs @@ -44,11 +44,6 @@ public interface IContext : ISenderContext, IReceiverContext, ISpawnerContext, I /// void Respond(object message, MessageHeader header) => Respond(new MessageEnvelope(message, null, header)); - /// - /// Stashes the current message on a stack for re-processing when the actor restarts. - /// - void Stash(); - /// /// Registers the actor as a watcher for the specified PID. When the PID terminates the watcher is notified with message. /// diff --git a/src/Proto.Actor/Context/ISpawnerContext.cs b/src/Proto.Actor/Context/ISpawnerContext.cs index d913549fef..5a94742d43 100644 --- a/src/Proto.Actor/Context/ISpawnerContext.cs +++ b/src/Proto.Actor/Context/ISpawnerContext.cs @@ -26,6 +26,7 @@ public static class SpawnerContextExtensions /// /// Spawns a new child actor based on props and named with a unique ID. /// + /// /// The Props used to spawn the actor /// The PID of the child actor public static PID Spawn(this ISpawnerContext self, Props props) @@ -36,6 +37,7 @@ public static PID Spawn(this ISpawnerContext self, Props props) /// /// Spawns a new child actor based on props and named with a unique ID. /// + /// /// The Props used to spawn the actor /// /// The PID of the child actor diff --git a/src/Proto.Actor/Deduplication/DeduplicationContext.cs b/src/Proto.Actor/Deduplication/DeduplicationContext.cs index 938f42fa55..a02e5f0af7 100644 --- a/src/Proto.Actor/Deduplication/DeduplicationContext.cs +++ b/src/Proto.Actor/Deduplication/DeduplicationContext.cs @@ -16,6 +16,7 @@ namespace Proto.Deduplication; /// Extracts the deduplication key from the message. /// /// Type of the key +/// Message to extract from /// The key should be returned in this variable /// Returns true if the key was successfully extracted, false otherwise public delegate bool TryGetDeduplicationKey(MessageEnvelope envelope, out T? key); diff --git a/src/Proto.Actor/Messages/MessageBatch.cs b/src/Proto.Actor/Messages/MessageBatch.cs deleted file mode 100644 index 9ed1b86570..0000000000 --- a/src/Proto.Actor/Messages/MessageBatch.cs +++ /dev/null @@ -1,10 +0,0 @@ -// ----------------------------------------------------------------------- -// -// Copyright (C) 2015-2022 Asynkron AB All rights reserved -// -// ----------------------------------------------------------------------- -using System.Collections.Generic; - -namespace Proto; - -public record MessageBatch(IReadOnlyCollection Messages); \ No newline at end of file diff --git a/src/Proto.Actor/Messages/MessageEnvelope.cs b/src/Proto.Actor/Messages/MessageEnvelope.cs index a4afa85269..a79ef524ff 100644 --- a/src/Proto.Actor/Messages/MessageEnvelope.cs +++ b/src/Proto.Actor/Messages/MessageEnvelope.cs @@ -114,7 +114,7 @@ public MessageEnvelope WithHeader(string key, string value) /// /// Extends the message envelope with additional headers. /// - /// + /// /// New envelope public MessageEnvelope WithHeaders(IEnumerable> items) { diff --git a/src/Proto.Actor/PID.cs b/src/Proto.Actor/PID.cs index b259617598..af94bc152d 100644 --- a/src/Proto.Actor/PID.cs +++ b/src/Proto.Actor/PID.cs @@ -61,7 +61,7 @@ internal void SendUserMessage(ActorSystem system, object message) reff.SendUserMessage(this, message); } - public void SendSystemMessage(ActorSystem system, SystemMessage sys) + internal void SendSystemMessage(ActorSystem system, SystemMessage sys) { var reff = Ref(system) ?? system.ProcessRegistry.Get(this); reff.SendSystemMessage(this, sys); diff --git a/src/Proto.Actor/Props/Props.cs b/src/Proto.Actor/Props/Props.cs index 400d8b5d12..c2c830852b 100644 --- a/src/Proto.Actor/Props/Props.cs +++ b/src/Proto.Actor/Props/Props.cs @@ -60,8 +60,8 @@ public sealed record Props public ImmutableList> SenderMiddleware { get; init; } = ImmutableList>.Empty; - public Receiver? ReceiverMiddlewareChain { get; init; } - public Sender? SenderMiddlewareChain { get; init; } + internal Receiver? ReceiverMiddlewareChain { get; init; } + internal Sender? SenderMiddlewareChain { get; init; } /// /// List of decorators for the actor context @@ -69,7 +69,7 @@ public sealed record Props public ImmutableList> ContextDecorator { get; init; } = ImmutableList>.Empty; - public Func? ContextDecoratorChain { get; init; } + internal Func? ContextDecoratorChain { get; init; } /// /// Delegate that creates the actor and wires it with context and mailbox. diff --git a/src/Proto.Actor/Stashing/CapturedContext.cs b/src/Proto.Actor/Stashing/CapturedContext.cs index 35907997dd..4f7164d3b0 100644 --- a/src/Proto.Actor/Stashing/CapturedContext.cs +++ b/src/Proto.Actor/Stashing/CapturedContext.cs @@ -12,7 +12,12 @@ namespace Proto; /// /// Message to store /// Context to store -public record CapturedContext(MessageEnvelope MessageEnvelope, IContext Context){ +public record CapturedContext(MessageEnvelope MessageEnvelope, IContext Context) +{ + /// + /// Reprocesses the captured message on the captured context. + /// It captures current context before processing and restores it after processing. + /// public async Task Receive() { var current = Context.Capture(); diff --git a/src/Proto.Actor/Stopper.cs b/src/Proto.Actor/Stopper.cs index 5e3035ed31..f49bd8b3f8 100644 --- a/src/Proto.Actor/Stopper.cs +++ b/src/Proto.Actor/Stopper.cs @@ -7,7 +7,7 @@ namespace Proto; -public class Stopper +class Stopper { private readonly CancellationTokenSource _cts = new(); diff --git a/src/Proto.Actor/Utils/TypedDictionary.cs b/src/Proto.Actor/Utils/TypedDictionary.cs index 5ce277df83..64229fb6d1 100644 --- a/src/Proto.Actor/Utils/TypedDictionary.cs +++ b/src/Proto.Actor/Utils/TypedDictionary.cs @@ -9,7 +9,7 @@ namespace Proto.Utils; // ReSharper disable once UnusedTypeParameter -public class TypeDictionary +class TypeDictionary { private readonly double _growthFactor; diff --git a/src/Proto.Cluster.CodeGen/Template.cs b/src/Proto.Cluster.CodeGen/Template.cs index 8ea167597b..31a50396ed 100644 --- a/src/Proto.Cluster.CodeGen/Template.cs +++ b/src/Proto.Cluster.CodeGen/Template.cs @@ -150,11 +150,6 @@ public async Task ReceiveAsync(IContext context) await _inner.OnStarted(); break; } -#pragma warning disable 618 - case ClusterInit _: -#pragma warning restore 618 - //Ignored - break; case Stopping _: { await _inner!.OnStopping(); diff --git a/src/Proto.Cluster/Cluster.cs b/src/Proto.Cluster/Cluster.cs index 6b66579247..eebcfd48b4 100644 --- a/src/Proto.Cluster/Cluster.cs +++ b/src/Proto.Cluster/Cluster.cs @@ -63,9 +63,9 @@ public Cluster(ActorSystem system, ClusterConfig config) SubscribeToTopologyEvents(); } - public static ILogger Logger { get; } = Log.CreateLogger(); + internal static ILogger Logger { get; } = Log.CreateLogger(); - public IClusterContext ClusterContext { get; private set; } = null!; + internal IClusterContext ClusterContext { get; private set; } = null!; public Gossiper Gossip { get; } @@ -79,6 +79,9 @@ public Cluster(ActorSystem system, ClusterConfig config) /// public ActorSystem System { get; } + /// + /// IRemote implementation the cluster is using + /// public IRemote Remote { get; private set; } = null!; /// @@ -90,7 +93,7 @@ public Cluster(ActorSystem system, ClusterConfig config) internal IClusterProvider Provider { get; set; } = null!; - public PidCache PidCache { get; } + internal PidCache PidCache { get; } private void SubscribeToTopologyEvents() => System.EventStream.Subscribe(e => { @@ -101,6 +104,10 @@ private void SubscribeToTopologyEvents() => } ); + /// + /// Gets cluster kinds registered on this cluster member + /// + /// public string[] GetClusterKinds() => _clusterKinds.Keys.ToArray(); /// @@ -270,7 +277,7 @@ public ActivatedClusterKind GetClusterKind(string kind) return clusterKind; } - public ActivatedClusterKind? TryGetClusterKind(string kind) + internal ActivatedClusterKind? TryGetClusterKind(string kind) { _clusterKinds.TryGetValue(kind, out var clusterKind); diff --git a/src/Proto.Cluster/ClusterConfig.cs b/src/Proto.Cluster/ClusterConfig.cs index 09f3bb933c..4af2f0c46c 100644 --- a/src/Proto.Cluster/ClusterConfig.cs +++ b/src/Proto.Cluster/ClusterConfig.cs @@ -29,9 +29,8 @@ private ClusterConfig(string clusterName, IClusterProvider clusterProvider, IIde { ClusterName = clusterName ?? throw new ArgumentNullException(nameof(clusterName)); ClusterProvider = clusterProvider ?? throw new ArgumentNullException(nameof(clusterProvider)); - TimeoutTimespan = TimeSpan.FromSeconds(5); ActorRequestTimeout = TimeSpan.FromSeconds(5); - ActorSpawnTimeout = TimeSpan.FromSeconds(5); + ActorSpawnVerificationTimeout = TimeSpan.FromSeconds(5); ActorActivationTimeout = TimeSpan.FromSeconds(5); MaxNumberOfEventsInRequestLogThrottlePeriod = 3; RequestLogThrottlePeriod = TimeSpan.FromSeconds(2); @@ -88,7 +87,8 @@ private ClusterConfig(string clusterName, IClusterProvider clusterProvider, IIde /// /// Timeout for spawning an actor in the Partition Identity Lookup. Default is 5s. /// - public TimeSpan TimeoutTimespan { get; init; } + [Obsolete("Use ActorActivationTimeout instead")] + public TimeSpan TimeoutTimespan => ActorActivationTimeout; /// /// Timeout for single retry of actor request. Default is 5s. @@ -99,10 +99,10 @@ private ClusterConfig(string clusterName, IClusterProvider clusterProvider, IIde /// /// Timeout for running the check. Default is 5s. /// - public TimeSpan ActorSpawnTimeout { get; init; } + public TimeSpan ActorSpawnVerificationTimeout { get; init; } /// - /// Timeout for DB Identity Lookup operations. Default is 5s. + /// Timeout for activating an actor. Exact usage varies depending on used. Default is 5s. /// public TimeSpan ActorActivationTimeout { get; init; } @@ -165,10 +165,10 @@ private ClusterConfig(string clusterName, IClusterProvider clusterProvider, IIde /// /// Timeout for spawning an actor in the Partition Identity Lookup. Default is 5s. /// - /// + /// /// - public ClusterConfig WithTimeout(TimeSpan timeSpan) => - this with {TimeoutTimespan = timeSpan}; + [Obsolete("Use ActorActivationTimeout instead")] + public ClusterConfig WithTimeout(TimeSpan timeout) => WithActorActivationTimeout(timeout); /// /// Timeout for single retry of actor request. Default is 5s. @@ -184,8 +184,8 @@ public ClusterConfig WithActorRequestTimeout(TimeSpan timeSpan) => /// /// /// - public ClusterConfig WithActorSpawnTimeout(TimeSpan timeSpan) => - this with {ActorSpawnTimeout = timeSpan}; + public ClusterConfig WithActorSpawnVerificationTimeout(TimeSpan timeSpan) => + this with {ActorSpawnVerificationTimeout = timeSpan}; /// /// Timeout for DB Identity Lookup operations. Default is 5s. diff --git a/src/Proto.Cluster/ClusterExtension.cs b/src/Proto.Cluster/ClusterExtension.cs index 57ed2ba020..1f85d7efb3 100644 --- a/src/Proto.Cluster/ClusterExtension.cs +++ b/src/Proto.Cluster/ClusterExtension.cs @@ -137,14 +137,6 @@ MessageEnvelope startEnvelope { clusterKind.Inc(); await baseReceive(ctx, startEnvelope); - - var identity = ctx.Get(); - var cluster = ctx.System.Cluster(); -#pragma warning disable 618 - var grainInit = new ClusterInit(identity!, cluster); -#pragma warning restore 618 - var grainInitEnvelope = new MessageEnvelope(grainInit, null); - await baseReceive(ctx, grainInitEnvelope); } async Task HandleRestarting( diff --git a/src/Proto.Cluster/Gossip/Gossiper.cs b/src/Proto.Cluster/Gossip/Gossiper.cs index a7d9adec46..966e1c4f27 100644 --- a/src/Proto.Cluster/Gossip/Gossiper.cs +++ b/src/Proto.Cluster/Gossip/Gossiper.cs @@ -58,8 +58,18 @@ public Gossiper(Cluster cluster) _context = _cluster.System.Root; } + /// + /// Gets the current full gossip state as seen by current member + /// + /// public Task GetStateSnapshot() => _context.RequestAsync(_pid, new GetGossipStateSnapshot()); + /// + /// Gets gossip state entry by key, for each member represented in the gossip state, as seen by current member + /// + /// + /// Dictionary where member id is the key and gossip state value is the value + /// public async Task> GetState(string key) where T : IMessage, new() { _context.System.Logger()?.LogDebug("Gossiper getting state from {Pid}", _pid); @@ -87,6 +97,11 @@ public Gossiper(Cluster cluster) return ImmutableDictionary.Empty; } + /// + /// Gets the gossip state entry by key, for each member represented in the gossip state, as seen by current member + /// + /// Dictionary where member id is the key and gossip state value is the value, wrapped in + /// public async Task> GetStateEntry(string key) { _context.System.Logger()?.LogDebug("Gossiper getting state from {Pid}", _pid); @@ -105,8 +120,11 @@ public async Task> GetStateEntry(str return ImmutableDictionary.Empty; } - // Send message to update member state - // Will not wait for completed state update + /// + /// Sets a gossip state key to provided value. This will not wait for the state to be actually updated in current member's gossip state. + /// + /// + /// public void SetState(string key, IMessage value) { if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug("Gossiper setting state to {Pid}", _pid); @@ -117,6 +135,11 @@ public void SetState(string key, IMessage value) _context.Send(_pid, new SetGossipStateKey(key, value)); } + /// + /// Sets a gossip state key to provided value. Waits for the state to be updated in current member's gossip state. + /// + /// + /// public async Task SetStateAsync(string key, IMessage value) { if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug("Gossiper setting state to {Pid}", _pid); diff --git a/src/Proto.Cluster/Grain/ClusterInit.cs b/src/Proto.Cluster/Grain/ClusterInit.cs index 9c17549115..e8c07ffb7f 100644 --- a/src/Proto.Cluster/Grain/ClusterInit.cs +++ b/src/Proto.Cluster/Grain/ClusterInit.cs @@ -7,6 +7,7 @@ namespace Proto.Cluster; +// TODO: remove once new version of code gen nuget is built and used in the examples [Obsolete("Replace with 'Started' lifecycle message. '.ClusterIdentity()' and '.Cluster()' is available on IContext")] public class ClusterInit { diff --git a/src/Proto.Cluster/Identity/IdentityStoragePlacementActor.cs b/src/Proto.Cluster/Identity/IdentityStoragePlacementActor.cs index 3654912b69..21e646b1f5 100644 --- a/src/Proto.Cluster/Identity/IdentityStoragePlacementActor.cs +++ b/src/Proto.Cluster/Identity/IdentityStoragePlacementActor.cs @@ -139,7 +139,7 @@ private void VerifyAndSpawn(ActivationRequest msg, IContext context, ActivatedCl ); } - var canSpawn = clusterKind.CanSpawnIdentity!(msg.Identity, CancellationTokens.FromSeconds(_cluster.Config.ActorSpawnTimeout)); + var canSpawn = clusterKind.CanSpawnIdentity!(msg.Identity, CancellationTokens.FromSeconds(_cluster.Config.ActorSpawnVerificationTimeout)); if (canSpawn.IsCompleted) { diff --git a/src/Proto.Cluster/Member/MemberList.cs b/src/Proto.Cluster/Member/MemberList.cs index 699b64b330..eadfeede3f 100644 --- a/src/Proto.Cluster/Member/MemberList.cs +++ b/src/Proto.Cluster/Member/MemberList.cs @@ -112,10 +112,10 @@ public MemberList(Cluster cluster) internal void InitializeTopologyConsensus() => _topologyConsensus = _cluster.Gossip.RegisterConsensusCheck(GossipKeys.Topology, topology => topology.TopologyHash); - public Task<(bool consensus, ulong topologyHash)> TopologyConsensus(CancellationToken ct) + internal Task<(bool consensus, ulong topologyHash)> TopologyConsensus(CancellationToken ct) => _topologyConsensus?.TryGetConsensus(ct) ?? Task.FromResult<(bool consensus, ulong topologyHash)>(default); - public Member? GetActivator(string kind, string requestSourceAddress) + internal Member? GetActivator(string kind, string requestSourceAddress) { //immutable, don't lock if (_memberStrategyByKind.TryGetValue(kind, out var memberStrategy)) @@ -265,7 +265,7 @@ private void SelfBlocked() _ = _cluster.ShutdownAsync(reason: "Blocked by MemberList"); } - public MetaMember? GetMetaMember(string memberId) + internal MetaMember? GetMetaMember(string memberId) { _metaMembers.TryGetValue(memberId, out var meta); return meta; @@ -344,9 +344,9 @@ public void BroadcastEvent(object message, bool includeSelf = true) public bool TryGetMember(string memberId, out Member? value) => _activeMembers.Lookup.TryGetValue(memberId, out value); - public bool TryGetMemberIndexByAddress(string address, out int value) => _indexByAddress.TryGetValue(address, out value); + internal bool TryGetMemberIndexByAddress(string address, out int value) => _indexByAddress.TryGetValue(address, out value); - public bool TryGetMemberByIndex(int memberIndex, out Member? value) => _membersByIndex.TryGetValue(memberIndex, out value); + internal bool TryGetMemberByIndex(int memberIndex, out Member? value) => _membersByIndex.TryGetValue(memberIndex, out value); /// /// Gets a list of active diff --git a/src/Proto.Cluster/Messages/Messages.cs b/src/Proto.Cluster/Messages/Messages.cs index 687d2c5a6c..8ea590398e 100644 --- a/src/Proto.Cluster/Messages/Messages.cs +++ b/src/Proto.Cluster/Messages/Messages.cs @@ -113,8 +113,6 @@ private IEnumerable UnpackKind(Types.Kind kind) ); } -public record Tick; - public partial class ClusterTopology { //this ignores joined and left members, only the actual members are relevant diff --git a/src/Proto.Cluster/Partition/PartitionIdentityActor.cs b/src/Proto.Cluster/Partition/PartitionIdentityActor.cs index c7a8247166..15f28ec4dc 100644 --- a/src/Proto.Cluster/Partition/PartitionIdentityActor.cs +++ b/src/Proto.Cluster/Partition/PartitionIdentityActor.cs @@ -621,7 +621,7 @@ private async Task SpawnRemoteActor(ActivationRequest req, s Logger.LogTrace("[PartitionIdentity] Spawning Remote Actor {Activator} {Identity} {Kind}", activatorAddress, req.Identity, req.Kind); } - var timeout = _cluster.Config.TimeoutTimespan; + var timeout = _cluster.Config.ActorActivationTimeout; var activatorPid = PartitionManager.RemotePartitionPlacementActor(activatorAddress); var res = await _cluster.System.Root.RequestAsync(activatorPid, req, timeout); diff --git a/src/Proto.Cluster/Partition/PartitionPlacementActor.cs b/src/Proto.Cluster/Partition/PartitionPlacementActor.cs index d955d66327..ceaed00995 100644 --- a/src/Proto.Cluster/Partition/PartitionPlacementActor.cs +++ b/src/Proto.Cluster/Partition/PartitionPlacementActor.cs @@ -360,7 +360,7 @@ private void VerifyAndSpawn(ActivationRequest msg, IContext context, ActivatedCl ); } - var canSpawn = clusterKind.CanSpawnIdentity!(msg.Identity, CancellationTokens.FromSeconds(_cluster.Config.ActorSpawnTimeout)); + var canSpawn = clusterKind.CanSpawnIdentity!(msg.Identity, CancellationTokens.FromSeconds(_cluster.Config.ActorSpawnVerificationTimeout)); if (canSpawn.IsCompleted) { diff --git a/src/Proto.Cluster/PartitionActivator/PartitionActivatorActor.cs b/src/Proto.Cluster/PartitionActivator/PartitionActivatorActor.cs index 5688133421..41e7f7e276 100644 --- a/src/Proto.Cluster/PartitionActivator/PartitionActivatorActor.cs +++ b/src/Proto.Cluster/PartitionActivator/PartitionActivatorActor.cs @@ -190,7 +190,7 @@ private void VerifyAndSpawn(ActivationRequest msg, IContext context, ActivatedCl ); } - var canSpawn = clusterKind.CanSpawnIdentity!(msg.Identity, CancellationTokens.FromSeconds(_cluster.Config.ActorSpawnTimeout)); + var canSpawn = clusterKind.CanSpawnIdentity!(msg.Identity, CancellationTokens.FromSeconds(_cluster.Config.ActorSpawnVerificationTimeout)); if (canSpawn.IsCompleted) { diff --git a/src/Proto.Cluster/PidCache.cs b/src/Proto.Cluster/PidCache.cs index 7a108a12c7..fe6f5c6a70 100644 --- a/src/Proto.Cluster/PidCache.cs +++ b/src/Proto.Cluster/PidCache.cs @@ -13,7 +13,7 @@ namespace Proto.Cluster; -public class PidCache +class PidCache { private readonly ICollection> _cacheCollection; private readonly ConcurrentDictionary _cacheDict; diff --git a/src/Proto.Cluster/SingleNode/SingleNodeActivatorActor.cs b/src/Proto.Cluster/SingleNode/SingleNodeActivatorActor.cs index 628c94bdeb..fce8bc5912 100644 --- a/src/Proto.Cluster/SingleNode/SingleNodeActivatorActor.cs +++ b/src/Proto.Cluster/SingleNode/SingleNodeActivatorActor.cs @@ -144,7 +144,7 @@ private void VerifyAndSpawn(ActivationRequest msg, IContext context, ActivatedCl ); } - var canSpawn = clusterKind.CanSpawnIdentity!(msg.Identity, CancellationTokens.FromSeconds(_cluster.Config.ActorSpawnTimeout)); + var canSpawn = clusterKind.CanSpawnIdentity!(msg.Identity, CancellationTokens.FromSeconds(_cluster.Config.ActorSpawnVerificationTimeout)); if (canSpawn.IsCompleted) { diff --git a/src/Proto.Remote/BlockList.cs b/src/Proto.Remote/BlockList.cs index 3a2e613e72..4548f6465d 100644 --- a/src/Proto.Remote/BlockList.cs +++ b/src/Proto.Remote/BlockList.cs @@ -35,7 +35,7 @@ public BlockList(ActorSystem system) private ImmutableDictionary _blockedMembers = ImmutableDictionary.Empty; - public void Block(IEnumerable memberIds) + internal void Block(IEnumerable memberIds) { lock (_lock) { diff --git a/src/Proto.Remote/GrpcNet/GrpcNetExtensions.cs b/src/Proto.Remote/GrpcNet/GrpcNetExtensions.cs index 60cc894420..edfba35048 100644 --- a/src/Proto.Remote/GrpcNet/GrpcNetExtensions.cs +++ b/src/Proto.Remote/GrpcNet/GrpcNetExtensions.cs @@ -54,14 +54,14 @@ public static ActorSystem WithClientRemote(this ActorSystem system, GrpcNetRemot return system; } - public static IServiceCollection AddRemote(this IServiceCollection services, Func configure) + internal static IServiceCollection AddRemote(this IServiceCollection services, Func configure) { services.AddSingleton(configure); AddAllServices(services); return services; } - public static IServiceCollection AddRemote( + internal static IServiceCollection AddRemote( this IServiceCollection services, GrpcNetRemoteConfig config ) @@ -71,7 +71,7 @@ GrpcNetRemoteConfig config return services; } - public static IServiceCollection AddClientRemote( + internal static IServiceCollection AddClientRemote( this IServiceCollection services, GrpcNetRemoteConfig config ) @@ -103,7 +103,7 @@ private static GrpcServiceEndpointConventionBuilder AddProtoRemoteEndpoint(IEndp return endpoints.MapGrpcService(); } - public static void UseProtoRemote(this IApplicationBuilder applicationBuilder) + internal static void UseProtoRemote(this IApplicationBuilder applicationBuilder) { var hostedRemote = applicationBuilder.ApplicationServices.GetRequiredService(); hostedRemote.ServerAddressesFeature = applicationBuilder.ServerFeatures.Get(); @@ -111,7 +111,7 @@ public static void UseProtoRemote(this IApplicationBuilder applicationBuilder) applicationBuilder.UseEndpoints(c => AddProtoRemoteEndpoint(c)); } - public static void UseProtoRemote(this IApplicationBuilder applicationBuilder, Action configure) + internal static void UseProtoRemote(this IApplicationBuilder applicationBuilder, Action configure) { var hostedRemote = applicationBuilder.ApplicationServices.GetRequiredService(); hostedRemote.ServerAddressesFeature = applicationBuilder.ServerFeatures.Get(); diff --git a/src/Proto.Remote/IRemoteExtensions.cs b/src/Proto.Remote/IRemoteExtensions.cs index 611e2d646f..49c1232a16 100644 --- a/src/Proto.Remote/IRemoteExtensions.cs +++ b/src/Proto.Remote/IRemoteExtensions.cs @@ -17,6 +17,7 @@ public static class IRemoteExtensions /// /// Spawn a remote actor with auto-generated name /// + /// /// Remote node address /// Actor kind, must be known on the remote node /// Timeout for the confirmation to be received from the remote node @@ -27,6 +28,7 @@ public static Task SpawnAsync(this IRemote remote, string addr /// /// Spawn a remote actor with a name /// + /// /// Remote node address /// Remote actor name /// Actor kind, must be known on the remote node diff --git a/src/Proto.Remote/InternalsVisibleTo.cs b/src/Proto.Remote/InternalsVisibleTo.cs index cddcde5a78..1132b556fb 100644 --- a/src/Proto.Remote/InternalsVisibleTo.cs +++ b/src/Proto.Remote/InternalsVisibleTo.cs @@ -5,4 +5,5 @@ // ----------------------------------------------------------------------- using System.Runtime.CompilerServices; -[assembly: InternalsVisibleTo("Proto.Cluster")] \ No newline at end of file +[assembly: InternalsVisibleTo("Proto.Cluster")] +[assembly: InternalsVisibleTo("Proto.Remote.Tests")] \ No newline at end of file diff --git a/src/Proto.Remote/RemoteConfigExtensions.cs b/src/Proto.Remote/RemoteConfigExtensions.cs index 3d12cf4067..5ddd2aec74 100644 --- a/src/Proto.Remote/RemoteConfigExtensions.cs +++ b/src/Proto.Remote/RemoteConfigExtensions.cs @@ -66,6 +66,7 @@ public static TRemoteConfig WithAdvertisedHost(this TRemoteConfig /// the external port in order for other systems to be able to connect to it. /// Advertised port can be different from the bound port, e.g. in container scenarios /// + /// /// /// public static TRemoteConfig WithAdvertisedPort(this TRemoteConfig remoteConfig, int? advertisedPort) @@ -197,7 +198,7 @@ public static TRemoteConfig WithSerializer(this TRemoteConfig rem /// /// /// - public static TRemoteConfig WithJsonSerializerOptions(this TRemoteConfig remoteConfig, System.Text.Json.JsonSerializerOptions options) + public static TRemoteConfig WithJsonSerializerOptions(this TRemoteConfig remoteConfig, JsonSerializerOptions options) where TRemoteConfig : RemoteConfigBase { remoteConfig.Serialization.JsonSerializerOptions = options; diff --git a/tests/Proto.Actor.Tests/ActorTests.cs b/tests/Proto.Actor.Tests/ActorTests.cs index edf4e65012..cc5521487a 100644 --- a/tests/Proto.Actor.Tests/ActorTests.cs +++ b/tests/Proto.Actor.Tests/ActorTests.cs @@ -138,19 +138,26 @@ public async Task ActorLifeCycleWhenExceptionIsThrown() var messages = new Queue(); var i = 0; + CapturedContext? capturedContext = null; + async Task HandleMessage(IContext ctx) { if (ctx.Message is string && i++ == 0) { - ctx.Stash(); + capturedContext = ctx.Capture(); throw new Exception("Test"); } - + messages.Enqueue(ctx.Message!); + + if (ctx.Message is Started && capturedContext != null) + { + await capturedContext.Receive(); + } + await Task.Yield(); } - ; var pid = context.Spawn( Props.FromFunc(ctx => ctx.Message switch { diff --git a/tests/Proto.Cluster.CodeGen.Tests/ExpectedOutput.cs b/tests/Proto.Cluster.CodeGen.Tests/ExpectedOutput.cs index 0db2916582..7d9a82b00b 100644 --- a/tests/Proto.Cluster.CodeGen.Tests/ExpectedOutput.cs +++ b/tests/Proto.Cluster.CodeGen.Tests/ExpectedOutput.cs @@ -288,11 +288,6 @@ public async Task ReceiveAsync(IContext context) await _inner.OnStarted(); break; } -#pragma warning disable 618 - case ClusterInit _: -#pragma warning restore 618 - //Ignored - break; case Stopping _: { await _inner!.OnStopping(); diff --git a/tests/Proto.Cluster.CodeGen.Tests/ExpectedOutputPackageless.cs b/tests/Proto.Cluster.CodeGen.Tests/ExpectedOutputPackageless.cs index 3025d17033..9d2f5562fa 100644 --- a/tests/Proto.Cluster.CodeGen.Tests/ExpectedOutputPackageless.cs +++ b/tests/Proto.Cluster.CodeGen.Tests/ExpectedOutputPackageless.cs @@ -288,11 +288,6 @@ public async Task ReceiveAsync(IContext context) await _inner.OnStarted(); break; } -#pragma warning disable 618 - case ClusterInit _: -#pragma warning restore 618 - //Ignored - break; case Stopping _: { await _inner!.OnStopping(); diff --git a/tests/Proto.Cluster.PartitionIdentity.Tests/Proto.Cluster.PartitionIdentity.Tests.csproj b/tests/Proto.Cluster.PartitionIdentity.Tests/Proto.Cluster.PartitionIdentity.Tests.csproj index 46ff1cb19f..3b6079f430 100644 --- a/tests/Proto.Cluster.PartitionIdentity.Tests/Proto.Cluster.PartitionIdentity.Tests.csproj +++ b/tests/Proto.Cluster.PartitionIdentity.Tests/Proto.Cluster.PartitionIdentity.Tests.csproj @@ -12,8 +12,6 @@ all runtime; build; native; contentfiles; analyzers; buildtransitive - - runtime; build; native; contentfiles; analyzers; buildtransitive all diff --git a/tests/Proto.Cluster.PubSub.Tests/InMemorySubscribersStore.cs b/tests/Proto.Cluster.PubSub.Tests/InMemorySubscribersStore.cs index 65b66eb506..74ff3a660f 100644 --- a/tests/Proto.Cluster.PubSub.Tests/InMemorySubscribersStore.cs +++ b/tests/Proto.Cluster.PubSub.Tests/InMemorySubscribersStore.cs @@ -12,7 +12,7 @@ public class InMemorySubscribersStore : IKeyValueStore { private readonly ConcurrentDictionary _store = new(); - public Task GetAsync(string id, CancellationToken ct) + public Task GetAsync(string id, CancellationToken ct) { _store.TryGetValue(id, out var subscribers); return subscribers == null ? Task.FromResult(new Subscribers()) : Task.FromResult(subscribers); diff --git a/tests/Proto.Cluster.Tests/Extensions.cs b/tests/Proto.Cluster.Tests/Extensions.cs index bb6a87a6ea..a8b7ff999e 100644 --- a/tests/Proto.Cluster.Tests/Extensions.cs +++ b/tests/Proto.Cluster.Tests/Extensions.cs @@ -29,7 +29,7 @@ public static async Task DumpClusterState(this IEnumerable memb if (c.System.Shutdown.IsCancellationRequested) { - sb.AppendLine("\tStopped, reason: " + c.System.Stopper.StoppedReason); + sb.AppendLine("\tStopped, reason: " + c.System.StoppedReason); continue; }