diff --git a/ProtoActor.sln b/ProtoActor.sln
index 1b596dd6a4..8cae27ed26 100644
--- a/ProtoActor.sln
+++ b/ProtoActor.sln
@@ -203,7 +203,7 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Server", "examples\remotech
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Client", "examples\remotechannels\Client\Client.csproj", "{5EACFB26-C757-452C-A536-E633BC352A69}"
EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Messages", "examples\remotechannels\Messages\Messages.csproj", "{D33722CB-96B8-4B9C-AB59-B18B7FF28539}"
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Common", "examples\remotechannels\Common\Common.csproj", "{D33722CB-96B8-4B9C-AB59-B18B7FF28539}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Proto.Cluster.AmazonECS", "src\Proto.Cluster.AmazonECS\Proto.Cluster.AmazonECS.csproj", "{F5EB4AD3-5BCB-48E9-B974-A93EB19CE43C}"
EndProject
diff --git a/examples/remotechannels/Client/Client.csproj b/examples/remotechannels/Client/Client.csproj
index 443e995ac7..82a0006e30 100644
--- a/examples/remotechannels/Client/Client.csproj
+++ b/examples/remotechannels/Client/Client.csproj
@@ -8,7 +8,7 @@
-
+
diff --git a/examples/remotechannels/Client/Program.cs b/examples/remotechannels/Client/Program.cs
index c07f4e09a8..60e8bef71d 100644
--- a/examples/remotechannels/Client/Program.cs
+++ b/examples/remotechannels/Client/Program.cs
@@ -1,7 +1,6 @@
using System;
-using Messages;
+using Common;
using Proto;
-using Proto.Channels;
using Proto.Remote;
using Proto.Remote.GrpcNet;
using static System.Threading.Channels.Channel;
diff --git a/src/Proto.Actor/Channels/ChannelPublisherActor.cs b/examples/remotechannels/Common/ChannelPublisherActor.cs
similarity index 95%
rename from src/Proto.Actor/Channels/ChannelPublisherActor.cs
rename to examples/remotechannels/Common/ChannelPublisherActor.cs
index b6f905c9ad..48c3a46ec9 100644
--- a/src/Proto.Actor/Channels/ChannelPublisherActor.cs
+++ b/examples/remotechannels/Common/ChannelPublisherActor.cs
@@ -6,11 +6,10 @@
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;
-using JetBrains.Annotations;
+using Proto;
-namespace Proto.Channels;
+namespace Common;
-[PublicAPI]
public static class ChannelPublisher
{
///
@@ -40,7 +39,6 @@ public static PID StartNew(IRootContext context, Channel channel, string n
}
}
-[PublicAPI]
public class ChannelPublisherActor : IActor
{
private readonly HashSet _subscribers = new();
@@ -56,13 +54,17 @@ public Task ReceiveAsync(IContext context)
}
break;
+
case PID subscriber:
_subscribers.Add(subscriber);
context.Watch(subscriber);
+ context.Respond(new Subscribed());
break;
+
case Terminated terminated:
_subscribers.Remove(terminated.Who);
break;
+
case T typed:
foreach (var sub in _subscribers)
{
diff --git a/src/Proto.Actor/Channels/ChannelSubscriberActor.cs b/examples/remotechannels/Common/ChannelSubscriberActor.cs
similarity index 78%
rename from src/Proto.Actor/Channels/ChannelSubscriberActor.cs
rename to examples/remotechannels/Common/ChannelSubscriberActor.cs
index ebb75a517f..164fd47602 100644
--- a/src/Proto.Actor/Channels/ChannelSubscriberActor.cs
+++ b/examples/remotechannels/Common/ChannelSubscriberActor.cs
@@ -5,11 +5,10 @@
// -----------------------------------------------------------------------
using System.Threading.Channels;
using System.Threading.Tasks;
-using JetBrains.Annotations;
+using Proto;
-namespace Proto.Channels;
+namespace Common;
-[PublicAPI]
public static class ChannelSubscriber
{
///
@@ -21,24 +20,29 @@ public static class ChannelSubscriber
/// The channel to write messages to
/// The Type of channel elements
///
- public static PID StartNew(IRootContext context, PID publisher, Channel channel)
+ public static async Task StartNew(IRootContext context, PID publisher, Channel channel)
{
- var props = Props.FromProducer(() => new ChannelSubscriberActor(publisher, channel));
+ var tcs = new TaskCompletionSource();
+ var props = Props.FromProducer(() => new ChannelSubscriberActor(publisher, channel, tcs));
var pid = context.Spawn(props);
+
+ await tcs.Task;
return pid;
}
}
-[PublicAPI]
public class ChannelSubscriberActor : IActor
{
private readonly Channel _channel;
+ private readonly TaskCompletionSource _subscribed;
private readonly PID _publisher;
- public ChannelSubscriberActor(PID publisher, Channel channel)
+ public ChannelSubscriberActor(PID publisher, Channel channel, TaskCompletionSource subscribed)
+
{
_publisher = publisher;
_channel = channel;
+ _subscribed = subscribed;
}
public async Task ReceiveAsync(IContext context)
@@ -49,15 +53,24 @@ public async Task ReceiveAsync(IContext context)
context.Watch(_publisher);
context.Request(_publisher, context.Self);
break;
+
+ case Subscribed:
+ _subscribed.SetResult();
+ break;
+
case Stopping:
_channel.Writer.Complete();
break;
+
case Terminated t when t.Who.Equals(_publisher):
_channel.Writer.Complete();
break;
+
case T typed:
await _channel.Writer.WriteAsync(typed);
break;
}
}
-}
\ No newline at end of file
+}
+
+public record Subscribed;
\ No newline at end of file
diff --git a/examples/remotechannels/Messages/Messages.csproj b/examples/remotechannels/Common/Common.csproj
similarity index 60%
rename from examples/remotechannels/Messages/Messages.csproj
rename to examples/remotechannels/Common/Common.csproj
index b6022727d9..bbb26c647a 100644
--- a/examples/remotechannels/Messages/Messages.csproj
+++ b/examples/remotechannels/Common/Common.csproj
@@ -5,4 +5,8 @@
10
+
+
+
+
diff --git a/examples/remotechannels/Common/Messages.cs b/examples/remotechannels/Common/Messages.cs
new file mode 100644
index 0000000000..b3ca6e408d
--- /dev/null
+++ b/examples/remotechannels/Common/Messages.cs
@@ -0,0 +1,3 @@
+namespace Common;
+
+public record MyMessage(int Value){}
diff --git a/examples/remotechannels/Messages/Messages.cs b/examples/remotechannels/Messages/Messages.cs
deleted file mode 100644
index 09ec42e0dd..0000000000
--- a/examples/remotechannels/Messages/Messages.cs
+++ /dev/null
@@ -1,6 +0,0 @@
-namespace Messages;
-
-public record MyMessage(int Value){}
-
-public record Subscribe();
-public record Subscribed();
\ No newline at end of file
diff --git a/examples/remotechannels/Server/Program.cs b/examples/remotechannels/Server/Program.cs
index d6abd3509e..5b47f992ba 100644
--- a/examples/remotechannels/Server/Program.cs
+++ b/examples/remotechannels/Server/Program.cs
@@ -1,8 +1,7 @@
using System;
using System.Threading.Tasks;
-using Messages;
+using Common;
using Proto;
-using Proto.Channels;
using Proto.Remote;
using Proto.Remote.GrpcNet;
using static Proto.Remote.GrpcNet.GrpcNetRemoteConfig;
diff --git a/examples/remotechannels/Server/Server.csproj b/examples/remotechannels/Server/Server.csproj
index 443e995ac7..82a0006e30 100644
--- a/examples/remotechannels/Server/Server.csproj
+++ b/examples/remotechannels/Server/Server.csproj
@@ -8,7 +8,7 @@
-
+
diff --git a/src/Proto.Actor/ActorSystem.cs b/src/Proto.Actor/ActorSystem.cs
index d52e3f3112..9bf2ff8251 100644
--- a/src/Proto.Actor/ActorSystem.cs
+++ b/src/Proto.Actor/ActorSystem.cs
@@ -78,7 +78,12 @@ public ActorSystem(ActorSystemConfig config)
/// Allows to access the stop cancellation token and stop reason.
/// Use to stop the actor system.
///
- public Stopper Stopper { get; }
+ internal Stopper Stopper { get; }
+
+ ///
+ /// For stopped , returns the reason for the shutdown.
+ ///
+ public string StoppedReason => Stopper.StoppedReason;
///
/// Manages all the guardians in the actor system.
@@ -117,13 +122,13 @@ public ActorSystem(ActorSystemConfig config)
private void RunThreadPoolStats()
{
- var metricTags = new KeyValuePair[]{ new("id", Id), new("address", Address)};
+ var metricTags = new KeyValuePair[] {new("id", Id), new("address", Address)};
var logger = Log.CreateLogger(nameof(ThreadPoolStats));
_ = ThreadPoolStats.Run(TimeSpan.FromSeconds(5),
t => {
//collect the latency metrics
- if(Metrics.Enabled)
+ if (Metrics.Enabled)
ActorMetrics.ThreadPoolLatency.Record(t.TotalSeconds, metricTags);
//does it take longer than 1 sec for a task to start executing?
@@ -144,7 +149,7 @@ private void RunThreadPoolStats()
///
/// Shutdown reason
///
- public Task ShutdownAsync(string reason="")
+ public Task ShutdownAsync(string reason = "")
{
try
{
@@ -195,15 +200,15 @@ public RootContext NewRoot(MessageHeader? headers = null, params Func
///
///
- public Props ConfigureProps(Props props) => Config.ConfigureProps(props);
-
+ internal Props ConfigureProps(Props props) => Config.ConfigureProps(props);
+
///
/// Applies props configuration delegate for system actors.
///
///
///
///
- public Props ConfigureSystemProps(string name, Props props) => Config.ConfigureSystemProps(name, props);
+ internal Props ConfigureSystemProps(string name, Props props) => Config.ConfigureSystemProps(name, props);
///
/// Stops the actor system with reason = "Disposed"
diff --git a/src/Proto.Actor/Context/ActorContext.cs b/src/Proto.Actor/Context/ActorContext.cs
index 0231965697..f1c74dbe53 100644
--- a/src/Proto.Actor/Context/ActorContext.cs
+++ b/src/Proto.Actor/Context/ActorContext.cs
@@ -71,11 +71,6 @@ public ActorContext(ActorSystem system, Props props, PID? parent, PID self, IMai
public TimeSpan ReceiveTimeout { get; private set; }
- public void Stash()
- {
- if (_messageOrEnvelope is not null) EnsureExtras().Stash.Push(_messageOrEnvelope);
- }
-
public void Respond(object message)
{
if (Sender is not null)
@@ -714,19 +709,6 @@ private async ValueTask RestartAsync()
Self.SendSystemMessage(System, ResumeMailbox.Instance);
await InvokeUserMessageAsync(Started.Instance);
-
- if (_extras?.Stash is not null)
- {
- var currentStash = new Stack
/// Type of the key
+/// Message to extract from
/// The key should be returned in this variable
/// Returns true if the key was successfully extracted, false otherwise
public delegate bool TryGetDeduplicationKey(MessageEnvelope envelope, out T? key);
diff --git a/src/Proto.Actor/Messages/MessageBatch.cs b/src/Proto.Actor/Messages/MessageBatch.cs
deleted file mode 100644
index 9ed1b86570..0000000000
--- a/src/Proto.Actor/Messages/MessageBatch.cs
+++ /dev/null
@@ -1,10 +0,0 @@
-// -----------------------------------------------------------------------
-//
-// Copyright (C) 2015-2022 Asynkron AB All rights reserved
-//
-// -----------------------------------------------------------------------
-using System.Collections.Generic;
-
-namespace Proto;
-
-public record MessageBatch(IReadOnlyCollection Messages);
\ No newline at end of file
diff --git a/src/Proto.Actor/Messages/MessageEnvelope.cs b/src/Proto.Actor/Messages/MessageEnvelope.cs
index a4afa85269..a79ef524ff 100644
--- a/src/Proto.Actor/Messages/MessageEnvelope.cs
+++ b/src/Proto.Actor/Messages/MessageEnvelope.cs
@@ -114,7 +114,7 @@ public MessageEnvelope WithHeader(string key, string value)
///
/// Extends the message envelope with additional headers.
///
- ///
+ ///
/// New envelope
public MessageEnvelope WithHeaders(IEnumerable> items)
{
diff --git a/src/Proto.Actor/PID.cs b/src/Proto.Actor/PID.cs
index b259617598..af94bc152d 100644
--- a/src/Proto.Actor/PID.cs
+++ b/src/Proto.Actor/PID.cs
@@ -61,7 +61,7 @@ internal void SendUserMessage(ActorSystem system, object message)
reff.SendUserMessage(this, message);
}
- public void SendSystemMessage(ActorSystem system, SystemMessage sys)
+ internal void SendSystemMessage(ActorSystem system, SystemMessage sys)
{
var reff = Ref(system) ?? system.ProcessRegistry.Get(this);
reff.SendSystemMessage(this, sys);
diff --git a/src/Proto.Actor/Props/Props.cs b/src/Proto.Actor/Props/Props.cs
index 400d8b5d12..c2c830852b 100644
--- a/src/Proto.Actor/Props/Props.cs
+++ b/src/Proto.Actor/Props/Props.cs
@@ -60,8 +60,8 @@ public sealed record Props
public ImmutableList> SenderMiddleware { get; init; } =
ImmutableList>.Empty;
- public Receiver? ReceiverMiddlewareChain { get; init; }
- public Sender? SenderMiddlewareChain { get; init; }
+ internal Receiver? ReceiverMiddlewareChain { get; init; }
+ internal Sender? SenderMiddlewareChain { get; init; }
///
/// List of decorators for the actor context
@@ -69,7 +69,7 @@ public sealed record Props
public ImmutableList> ContextDecorator { get; init; } =
ImmutableList>.Empty;
- public Func? ContextDecoratorChain { get; init; }
+ internal Func? ContextDecoratorChain { get; init; }
///
/// Delegate that creates the actor and wires it with context and mailbox.
diff --git a/src/Proto.Actor/Stashing/CapturedContext.cs b/src/Proto.Actor/Stashing/CapturedContext.cs
index 35907997dd..4f7164d3b0 100644
--- a/src/Proto.Actor/Stashing/CapturedContext.cs
+++ b/src/Proto.Actor/Stashing/CapturedContext.cs
@@ -12,7 +12,12 @@ namespace Proto;
///
/// Message to store
/// Context to store
-public record CapturedContext(MessageEnvelope MessageEnvelope, IContext Context){
+public record CapturedContext(MessageEnvelope MessageEnvelope, IContext Context)
+{
+ ///
+ /// Reprocesses the captured message on the captured context.
+ /// It captures current context before processing and restores it after processing.
+ ///
public async Task Receive()
{
var current = Context.Capture();
diff --git a/src/Proto.Actor/Stopper.cs b/src/Proto.Actor/Stopper.cs
index 5e3035ed31..f49bd8b3f8 100644
--- a/src/Proto.Actor/Stopper.cs
+++ b/src/Proto.Actor/Stopper.cs
@@ -7,7 +7,7 @@
namespace Proto;
-public class Stopper
+class Stopper
{
private readonly CancellationTokenSource _cts = new();
diff --git a/src/Proto.Actor/Utils/TypedDictionary.cs b/src/Proto.Actor/Utils/TypedDictionary.cs
index 5ce277df83..64229fb6d1 100644
--- a/src/Proto.Actor/Utils/TypedDictionary.cs
+++ b/src/Proto.Actor/Utils/TypedDictionary.cs
@@ -9,7 +9,7 @@
namespace Proto.Utils;
// ReSharper disable once UnusedTypeParameter
-public class TypeDictionary
+class TypeDictionary
{
private readonly double _growthFactor;
diff --git a/src/Proto.Cluster.CodeGen/Template.cs b/src/Proto.Cluster.CodeGen/Template.cs
index 8ea167597b..31a50396ed 100644
--- a/src/Proto.Cluster.CodeGen/Template.cs
+++ b/src/Proto.Cluster.CodeGen/Template.cs
@@ -150,11 +150,6 @@ public async Task ReceiveAsync(IContext context)
await _inner.OnStarted();
break;
}
-#pragma warning disable 618
- case ClusterInit _:
-#pragma warning restore 618
- //Ignored
- break;
case Stopping _:
{
await _inner!.OnStopping();
diff --git a/src/Proto.Cluster/Cluster.cs b/src/Proto.Cluster/Cluster.cs
index 6b66579247..eebcfd48b4 100644
--- a/src/Proto.Cluster/Cluster.cs
+++ b/src/Proto.Cluster/Cluster.cs
@@ -63,9 +63,9 @@ public Cluster(ActorSystem system, ClusterConfig config)
SubscribeToTopologyEvents();
}
- public static ILogger Logger { get; } = Log.CreateLogger();
+ internal static ILogger Logger { get; } = Log.CreateLogger();
- public IClusterContext ClusterContext { get; private set; } = null!;
+ internal IClusterContext ClusterContext { get; private set; } = null!;
public Gossiper Gossip { get; }
@@ -79,6 +79,9 @@ public Cluster(ActorSystem system, ClusterConfig config)
///
public ActorSystem System { get; }
+ ///
+ /// IRemote implementation the cluster is using
+ ///
public IRemote Remote { get; private set; } = null!;
///
@@ -90,7 +93,7 @@ public Cluster(ActorSystem system, ClusterConfig config)
internal IClusterProvider Provider { get; set; } = null!;
- public PidCache PidCache { get; }
+ internal PidCache PidCache { get; }
private void SubscribeToTopologyEvents() =>
System.EventStream.Subscribe(e => {
@@ -101,6 +104,10 @@ private void SubscribeToTopologyEvents() =>
}
);
+ ///
+ /// Gets cluster kinds registered on this cluster member
+ ///
+ ///
public string[] GetClusterKinds() => _clusterKinds.Keys.ToArray();
///
@@ -270,7 +277,7 @@ public ActivatedClusterKind GetClusterKind(string kind)
return clusterKind;
}
- public ActivatedClusterKind? TryGetClusterKind(string kind)
+ internal ActivatedClusterKind? TryGetClusterKind(string kind)
{
_clusterKinds.TryGetValue(kind, out var clusterKind);
diff --git a/src/Proto.Cluster/ClusterConfig.cs b/src/Proto.Cluster/ClusterConfig.cs
index 09f3bb933c..4af2f0c46c 100644
--- a/src/Proto.Cluster/ClusterConfig.cs
+++ b/src/Proto.Cluster/ClusterConfig.cs
@@ -29,9 +29,8 @@ private ClusterConfig(string clusterName, IClusterProvider clusterProvider, IIde
{
ClusterName = clusterName ?? throw new ArgumentNullException(nameof(clusterName));
ClusterProvider = clusterProvider ?? throw new ArgumentNullException(nameof(clusterProvider));
- TimeoutTimespan = TimeSpan.FromSeconds(5);
ActorRequestTimeout = TimeSpan.FromSeconds(5);
- ActorSpawnTimeout = TimeSpan.FromSeconds(5);
+ ActorSpawnVerificationTimeout = TimeSpan.FromSeconds(5);
ActorActivationTimeout = TimeSpan.FromSeconds(5);
MaxNumberOfEventsInRequestLogThrottlePeriod = 3;
RequestLogThrottlePeriod = TimeSpan.FromSeconds(2);
@@ -88,7 +87,8 @@ private ClusterConfig(string clusterName, IClusterProvider clusterProvider, IIde
///
/// Timeout for spawning an actor in the Partition Identity Lookup. Default is 5s.
///
- public TimeSpan TimeoutTimespan { get; init; }
+ [Obsolete("Use ActorActivationTimeout instead")]
+ public TimeSpan TimeoutTimespan => ActorActivationTimeout;
///
/// Timeout for single retry of actor request. Default is 5s.
@@ -99,10 +99,10 @@ private ClusterConfig(string clusterName, IClusterProvider clusterProvider, IIde
///
/// Timeout for running the check. Default is 5s.
///
- public TimeSpan ActorSpawnTimeout { get; init; }
+ public TimeSpan ActorSpawnVerificationTimeout { get; init; }
///
- /// Timeout for DB Identity Lookup operations. Default is 5s.
+ /// Timeout for activating an actor. Exact usage varies depending on used. Default is 5s.
///
public TimeSpan ActorActivationTimeout { get; init; }
@@ -165,10 +165,10 @@ private ClusterConfig(string clusterName, IClusterProvider clusterProvider, IIde
///
/// Timeout for spawning an actor in the Partition Identity Lookup. Default is 5s.
///
- ///
+ ///
///
- public ClusterConfig WithTimeout(TimeSpan timeSpan) =>
- this with {TimeoutTimespan = timeSpan};
+ [Obsolete("Use ActorActivationTimeout instead")]
+ public ClusterConfig WithTimeout(TimeSpan timeout) => WithActorActivationTimeout(timeout);
///
/// Timeout for single retry of actor request. Default is 5s.
@@ -184,8 +184,8 @@ public ClusterConfig WithActorRequestTimeout(TimeSpan timeSpan) =>
///
///
///
- public ClusterConfig WithActorSpawnTimeout(TimeSpan timeSpan) =>
- this with {ActorSpawnTimeout = timeSpan};
+ public ClusterConfig WithActorSpawnVerificationTimeout(TimeSpan timeSpan) =>
+ this with {ActorSpawnVerificationTimeout = timeSpan};
///
/// Timeout for DB Identity Lookup operations. Default is 5s.
diff --git a/src/Proto.Cluster/ClusterExtension.cs b/src/Proto.Cluster/ClusterExtension.cs
index 57ed2ba020..1f85d7efb3 100644
--- a/src/Proto.Cluster/ClusterExtension.cs
+++ b/src/Proto.Cluster/ClusterExtension.cs
@@ -137,14 +137,6 @@ MessageEnvelope startEnvelope
{
clusterKind.Inc();
await baseReceive(ctx, startEnvelope);
-
- var identity = ctx.Get();
- var cluster = ctx.System.Cluster();
-#pragma warning disable 618
- var grainInit = new ClusterInit(identity!, cluster);
-#pragma warning restore 618
- var grainInitEnvelope = new MessageEnvelope(grainInit, null);
- await baseReceive(ctx, grainInitEnvelope);
}
async Task HandleRestarting(
diff --git a/src/Proto.Cluster/Gossip/Gossiper.cs b/src/Proto.Cluster/Gossip/Gossiper.cs
index a7d9adec46..966e1c4f27 100644
--- a/src/Proto.Cluster/Gossip/Gossiper.cs
+++ b/src/Proto.Cluster/Gossip/Gossiper.cs
@@ -58,8 +58,18 @@ public Gossiper(Cluster cluster)
_context = _cluster.System.Root;
}
+ ///
+ /// Gets the current full gossip state as seen by current member
+ ///
+ ///
public Task GetStateSnapshot() => _context.RequestAsync(_pid, new GetGossipStateSnapshot());
+ ///
+ /// Gets gossip state entry by key, for each member represented in the gossip state, as seen by current member
+ ///
+ ///
+ /// Dictionary where member id is the key and gossip state value is the value
+ ///
public async Task> GetState(string key) where T : IMessage, new()
{
_context.System.Logger()?.LogDebug("Gossiper getting state from {Pid}", _pid);
@@ -87,6 +97,11 @@ public Gossiper(Cluster cluster)
return ImmutableDictionary.Empty;
}
+ ///
+ /// Gets the gossip state entry by key, for each member represented in the gossip state, as seen by current member
+ ///
+ /// Dictionary where member id is the key and gossip state value is the value, wrapped in
+ ///
public async Task> GetStateEntry(string key)
{
_context.System.Logger()?.LogDebug("Gossiper getting state from {Pid}", _pid);
@@ -105,8 +120,11 @@ public async Task> GetStateEntry(str
return ImmutableDictionary.Empty;
}
- // Send message to update member state
- // Will not wait for completed state update
+ ///
+ /// Sets a gossip state key to provided value. This will not wait for the state to be actually updated in current member's gossip state.
+ ///
+ ///
+ ///
public void SetState(string key, IMessage value)
{
if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug("Gossiper setting state to {Pid}", _pid);
@@ -117,6 +135,11 @@ public void SetState(string key, IMessage value)
_context.Send(_pid, new SetGossipStateKey(key, value));
}
+ ///
+ /// Sets a gossip state key to provided value. Waits for the state to be updated in current member's gossip state.
+ ///
+ ///
+ ///
public async Task SetStateAsync(string key, IMessage value)
{
if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug("Gossiper setting state to {Pid}", _pid);
diff --git a/src/Proto.Cluster/Grain/ClusterInit.cs b/src/Proto.Cluster/Grain/ClusterInit.cs
index 9c17549115..e8c07ffb7f 100644
--- a/src/Proto.Cluster/Grain/ClusterInit.cs
+++ b/src/Proto.Cluster/Grain/ClusterInit.cs
@@ -7,6 +7,7 @@
namespace Proto.Cluster;
+// TODO: remove once new version of code gen nuget is built and used in the examples
[Obsolete("Replace with 'Started' lifecycle message. '.ClusterIdentity()' and '.Cluster()' is available on IContext")]
public class ClusterInit
{
diff --git a/src/Proto.Cluster/Identity/IdentityStoragePlacementActor.cs b/src/Proto.Cluster/Identity/IdentityStoragePlacementActor.cs
index 3654912b69..21e646b1f5 100644
--- a/src/Proto.Cluster/Identity/IdentityStoragePlacementActor.cs
+++ b/src/Proto.Cluster/Identity/IdentityStoragePlacementActor.cs
@@ -139,7 +139,7 @@ private void VerifyAndSpawn(ActivationRequest msg, IContext context, ActivatedCl
);
}
- var canSpawn = clusterKind.CanSpawnIdentity!(msg.Identity, CancellationTokens.FromSeconds(_cluster.Config.ActorSpawnTimeout));
+ var canSpawn = clusterKind.CanSpawnIdentity!(msg.Identity, CancellationTokens.FromSeconds(_cluster.Config.ActorSpawnVerificationTimeout));
if (canSpawn.IsCompleted)
{
diff --git a/src/Proto.Cluster/Member/MemberList.cs b/src/Proto.Cluster/Member/MemberList.cs
index 699b64b330..eadfeede3f 100644
--- a/src/Proto.Cluster/Member/MemberList.cs
+++ b/src/Proto.Cluster/Member/MemberList.cs
@@ -112,10 +112,10 @@ public MemberList(Cluster cluster)
internal void InitializeTopologyConsensus() => _topologyConsensus =
_cluster.Gossip.RegisterConsensusCheck(GossipKeys.Topology, topology => topology.TopologyHash);
- public Task<(bool consensus, ulong topologyHash)> TopologyConsensus(CancellationToken ct)
+ internal Task<(bool consensus, ulong topologyHash)> TopologyConsensus(CancellationToken ct)
=> _topologyConsensus?.TryGetConsensus(ct) ?? Task.FromResult<(bool consensus, ulong topologyHash)>(default);
- public Member? GetActivator(string kind, string requestSourceAddress)
+ internal Member? GetActivator(string kind, string requestSourceAddress)
{
//immutable, don't lock
if (_memberStrategyByKind.TryGetValue(kind, out var memberStrategy))
@@ -265,7 +265,7 @@ private void SelfBlocked()
_ = _cluster.ShutdownAsync(reason: "Blocked by MemberList");
}
- public MetaMember? GetMetaMember(string memberId)
+ internal MetaMember? GetMetaMember(string memberId)
{
_metaMembers.TryGetValue(memberId, out var meta);
return meta;
@@ -344,9 +344,9 @@ public void BroadcastEvent(object message, bool includeSelf = true)
public bool TryGetMember(string memberId, out Member? value) => _activeMembers.Lookup.TryGetValue(memberId, out value);
- public bool TryGetMemberIndexByAddress(string address, out int value) => _indexByAddress.TryGetValue(address, out value);
+ internal bool TryGetMemberIndexByAddress(string address, out int value) => _indexByAddress.TryGetValue(address, out value);
- public bool TryGetMemberByIndex(int memberIndex, out Member? value) => _membersByIndex.TryGetValue(memberIndex, out value);
+ internal bool TryGetMemberByIndex(int memberIndex, out Member? value) => _membersByIndex.TryGetValue(memberIndex, out value);
///
/// Gets a list of active
diff --git a/src/Proto.Cluster/Messages/Messages.cs b/src/Proto.Cluster/Messages/Messages.cs
index 687d2c5a6c..8ea590398e 100644
--- a/src/Proto.Cluster/Messages/Messages.cs
+++ b/src/Proto.Cluster/Messages/Messages.cs
@@ -113,8 +113,6 @@ private IEnumerable UnpackKind(Types.Kind kind)
);
}
-public record Tick;
-
public partial class ClusterTopology
{
//this ignores joined and left members, only the actual members are relevant
diff --git a/src/Proto.Cluster/Partition/PartitionIdentityActor.cs b/src/Proto.Cluster/Partition/PartitionIdentityActor.cs
index c7a8247166..15f28ec4dc 100644
--- a/src/Proto.Cluster/Partition/PartitionIdentityActor.cs
+++ b/src/Proto.Cluster/Partition/PartitionIdentityActor.cs
@@ -621,7 +621,7 @@ private async Task SpawnRemoteActor(ActivationRequest req, s
Logger.LogTrace("[PartitionIdentity] Spawning Remote Actor {Activator} {Identity} {Kind}",
activatorAddress, req.Identity, req.Kind);
}
- var timeout = _cluster.Config.TimeoutTimespan;
+ var timeout = _cluster.Config.ActorActivationTimeout;
var activatorPid = PartitionManager.RemotePartitionPlacementActor(activatorAddress);
var res = await _cluster.System.Root.RequestAsync(activatorPid, req, timeout);
diff --git a/src/Proto.Cluster/Partition/PartitionPlacementActor.cs b/src/Proto.Cluster/Partition/PartitionPlacementActor.cs
index d955d66327..ceaed00995 100644
--- a/src/Proto.Cluster/Partition/PartitionPlacementActor.cs
+++ b/src/Proto.Cluster/Partition/PartitionPlacementActor.cs
@@ -360,7 +360,7 @@ private void VerifyAndSpawn(ActivationRequest msg, IContext context, ActivatedCl
);
}
- var canSpawn = clusterKind.CanSpawnIdentity!(msg.Identity, CancellationTokens.FromSeconds(_cluster.Config.ActorSpawnTimeout));
+ var canSpawn = clusterKind.CanSpawnIdentity!(msg.Identity, CancellationTokens.FromSeconds(_cluster.Config.ActorSpawnVerificationTimeout));
if (canSpawn.IsCompleted)
{
diff --git a/src/Proto.Cluster/PartitionActivator/PartitionActivatorActor.cs b/src/Proto.Cluster/PartitionActivator/PartitionActivatorActor.cs
index 5688133421..41e7f7e276 100644
--- a/src/Proto.Cluster/PartitionActivator/PartitionActivatorActor.cs
+++ b/src/Proto.Cluster/PartitionActivator/PartitionActivatorActor.cs
@@ -190,7 +190,7 @@ private void VerifyAndSpawn(ActivationRequest msg, IContext context, ActivatedCl
);
}
- var canSpawn = clusterKind.CanSpawnIdentity!(msg.Identity, CancellationTokens.FromSeconds(_cluster.Config.ActorSpawnTimeout));
+ var canSpawn = clusterKind.CanSpawnIdentity!(msg.Identity, CancellationTokens.FromSeconds(_cluster.Config.ActorSpawnVerificationTimeout));
if (canSpawn.IsCompleted)
{
diff --git a/src/Proto.Cluster/PidCache.cs b/src/Proto.Cluster/PidCache.cs
index 7a108a12c7..fe6f5c6a70 100644
--- a/src/Proto.Cluster/PidCache.cs
+++ b/src/Proto.Cluster/PidCache.cs
@@ -13,7 +13,7 @@
namespace Proto.Cluster;
-public class PidCache
+class PidCache
{
private readonly ICollection> _cacheCollection;
private readonly ConcurrentDictionary _cacheDict;
diff --git a/src/Proto.Cluster/SingleNode/SingleNodeActivatorActor.cs b/src/Proto.Cluster/SingleNode/SingleNodeActivatorActor.cs
index 628c94bdeb..fce8bc5912 100644
--- a/src/Proto.Cluster/SingleNode/SingleNodeActivatorActor.cs
+++ b/src/Proto.Cluster/SingleNode/SingleNodeActivatorActor.cs
@@ -144,7 +144,7 @@ private void VerifyAndSpawn(ActivationRequest msg, IContext context, ActivatedCl
);
}
- var canSpawn = clusterKind.CanSpawnIdentity!(msg.Identity, CancellationTokens.FromSeconds(_cluster.Config.ActorSpawnTimeout));
+ var canSpawn = clusterKind.CanSpawnIdentity!(msg.Identity, CancellationTokens.FromSeconds(_cluster.Config.ActorSpawnVerificationTimeout));
if (canSpawn.IsCompleted)
{
diff --git a/src/Proto.Remote/BlockList.cs b/src/Proto.Remote/BlockList.cs
index 3a2e613e72..4548f6465d 100644
--- a/src/Proto.Remote/BlockList.cs
+++ b/src/Proto.Remote/BlockList.cs
@@ -35,7 +35,7 @@ public BlockList(ActorSystem system)
private ImmutableDictionary _blockedMembers = ImmutableDictionary.Empty;
- public void Block(IEnumerable memberIds)
+ internal void Block(IEnumerable memberIds)
{
lock (_lock)
{
diff --git a/src/Proto.Remote/GrpcNet/GrpcNetExtensions.cs b/src/Proto.Remote/GrpcNet/GrpcNetExtensions.cs
index 60cc894420..edfba35048 100644
--- a/src/Proto.Remote/GrpcNet/GrpcNetExtensions.cs
+++ b/src/Proto.Remote/GrpcNet/GrpcNetExtensions.cs
@@ -54,14 +54,14 @@ public static ActorSystem WithClientRemote(this ActorSystem system, GrpcNetRemot
return system;
}
- public static IServiceCollection AddRemote(this IServiceCollection services, Func configure)
+ internal static IServiceCollection AddRemote(this IServiceCollection services, Func configure)
{
services.AddSingleton(configure);
AddAllServices(services);
return services;
}
- public static IServiceCollection AddRemote(
+ internal static IServiceCollection AddRemote(
this IServiceCollection services,
GrpcNetRemoteConfig config
)
@@ -71,7 +71,7 @@ GrpcNetRemoteConfig config
return services;
}
- public static IServiceCollection AddClientRemote(
+ internal static IServiceCollection AddClientRemote(
this IServiceCollection services,
GrpcNetRemoteConfig config
)
@@ -103,7 +103,7 @@ private static GrpcServiceEndpointConventionBuilder AddProtoRemoteEndpoint(IEndp
return endpoints.MapGrpcService();
}
- public static void UseProtoRemote(this IApplicationBuilder applicationBuilder)
+ internal static void UseProtoRemote(this IApplicationBuilder applicationBuilder)
{
var hostedRemote = applicationBuilder.ApplicationServices.GetRequiredService();
hostedRemote.ServerAddressesFeature = applicationBuilder.ServerFeatures.Get();
@@ -111,7 +111,7 @@ public static void UseProtoRemote(this IApplicationBuilder applicationBuilder)
applicationBuilder.UseEndpoints(c => AddProtoRemoteEndpoint(c));
}
- public static void UseProtoRemote(this IApplicationBuilder applicationBuilder, Action configure)
+ internal static void UseProtoRemote(this IApplicationBuilder applicationBuilder, Action configure)
{
var hostedRemote = applicationBuilder.ApplicationServices.GetRequiredService();
hostedRemote.ServerAddressesFeature = applicationBuilder.ServerFeatures.Get();
diff --git a/src/Proto.Remote/IRemoteExtensions.cs b/src/Proto.Remote/IRemoteExtensions.cs
index 611e2d646f..49c1232a16 100644
--- a/src/Proto.Remote/IRemoteExtensions.cs
+++ b/src/Proto.Remote/IRemoteExtensions.cs
@@ -17,6 +17,7 @@ public static class IRemoteExtensions
///
/// Spawn a remote actor with auto-generated name
///
+ ///
/// Remote node address
/// Actor kind, must be known on the remote node
/// Timeout for the confirmation to be received from the remote node
@@ -27,6 +28,7 @@ public static Task SpawnAsync(this IRemote remote, string addr
///
/// Spawn a remote actor with a name
///
+ ///
/// Remote node address
/// Remote actor name
/// Actor kind, must be known on the remote node
diff --git a/src/Proto.Remote/InternalsVisibleTo.cs b/src/Proto.Remote/InternalsVisibleTo.cs
index cddcde5a78..1132b556fb 100644
--- a/src/Proto.Remote/InternalsVisibleTo.cs
+++ b/src/Proto.Remote/InternalsVisibleTo.cs
@@ -5,4 +5,5 @@
// -----------------------------------------------------------------------
using System.Runtime.CompilerServices;
-[assembly: InternalsVisibleTo("Proto.Cluster")]
\ No newline at end of file
+[assembly: InternalsVisibleTo("Proto.Cluster")]
+[assembly: InternalsVisibleTo("Proto.Remote.Tests")]
\ No newline at end of file
diff --git a/src/Proto.Remote/RemoteConfigExtensions.cs b/src/Proto.Remote/RemoteConfigExtensions.cs
index 3d12cf4067..5ddd2aec74 100644
--- a/src/Proto.Remote/RemoteConfigExtensions.cs
+++ b/src/Proto.Remote/RemoteConfigExtensions.cs
@@ -66,6 +66,7 @@ public static TRemoteConfig WithAdvertisedHost(this TRemoteConfig
/// the external port in order for other systems to be able to connect to it.
/// Advertised port can be different from the bound port, e.g. in container scenarios
///
+ ///
///
///
public static TRemoteConfig WithAdvertisedPort(this TRemoteConfig remoteConfig, int? advertisedPort)
@@ -197,7 +198,7 @@ public static TRemoteConfig WithSerializer(this TRemoteConfig rem
///
///
///
- public static TRemoteConfig WithJsonSerializerOptions(this TRemoteConfig remoteConfig, System.Text.Json.JsonSerializerOptions options)
+ public static TRemoteConfig WithJsonSerializerOptions(this TRemoteConfig remoteConfig, JsonSerializerOptions options)
where TRemoteConfig : RemoteConfigBase
{
remoteConfig.Serialization.JsonSerializerOptions = options;
diff --git a/tests/Proto.Actor.Tests/ActorTests.cs b/tests/Proto.Actor.Tests/ActorTests.cs
index edf4e65012..cc5521487a 100644
--- a/tests/Proto.Actor.Tests/ActorTests.cs
+++ b/tests/Proto.Actor.Tests/ActorTests.cs
@@ -138,19 +138,26 @@ public async Task ActorLifeCycleWhenExceptionIsThrown()
var messages = new Queue();
var i = 0;
+ CapturedContext? capturedContext = null;
+
async Task HandleMessage(IContext ctx)
{
if (ctx.Message is string && i++ == 0)
{
- ctx.Stash();
+ capturedContext = ctx.Capture();
throw new Exception("Test");
}
-
+
messages.Enqueue(ctx.Message!);
+
+ if (ctx.Message is Started && capturedContext != null)
+ {
+ await capturedContext.Receive();
+ }
+
await Task.Yield();
}
- ;
var pid = context.Spawn(
Props.FromFunc(ctx => ctx.Message switch
{
diff --git a/tests/Proto.Cluster.CodeGen.Tests/ExpectedOutput.cs b/tests/Proto.Cluster.CodeGen.Tests/ExpectedOutput.cs
index 0db2916582..7d9a82b00b 100644
--- a/tests/Proto.Cluster.CodeGen.Tests/ExpectedOutput.cs
+++ b/tests/Proto.Cluster.CodeGen.Tests/ExpectedOutput.cs
@@ -288,11 +288,6 @@ public async Task ReceiveAsync(IContext context)
await _inner.OnStarted();
break;
}
-#pragma warning disable 618
- case ClusterInit _:
-#pragma warning restore 618
- //Ignored
- break;
case Stopping _:
{
await _inner!.OnStopping();
diff --git a/tests/Proto.Cluster.CodeGen.Tests/ExpectedOutputPackageless.cs b/tests/Proto.Cluster.CodeGen.Tests/ExpectedOutputPackageless.cs
index 3025d17033..9d2f5562fa 100644
--- a/tests/Proto.Cluster.CodeGen.Tests/ExpectedOutputPackageless.cs
+++ b/tests/Proto.Cluster.CodeGen.Tests/ExpectedOutputPackageless.cs
@@ -288,11 +288,6 @@ public async Task ReceiveAsync(IContext context)
await _inner.OnStarted();
break;
}
-#pragma warning disable 618
- case ClusterInit _:
-#pragma warning restore 618
- //Ignored
- break;
case Stopping _:
{
await _inner!.OnStopping();
diff --git a/tests/Proto.Cluster.PartitionIdentity.Tests/Proto.Cluster.PartitionIdentity.Tests.csproj b/tests/Proto.Cluster.PartitionIdentity.Tests/Proto.Cluster.PartitionIdentity.Tests.csproj
index 46ff1cb19f..3b6079f430 100644
--- a/tests/Proto.Cluster.PartitionIdentity.Tests/Proto.Cluster.PartitionIdentity.Tests.csproj
+++ b/tests/Proto.Cluster.PartitionIdentity.Tests/Proto.Cluster.PartitionIdentity.Tests.csproj
@@ -12,8 +12,6 @@
all
runtime; build; native; contentfiles; analyzers; buildtransitive
-
-
runtime; build; native; contentfiles; analyzers; buildtransitive
all
diff --git a/tests/Proto.Cluster.PubSub.Tests/InMemorySubscribersStore.cs b/tests/Proto.Cluster.PubSub.Tests/InMemorySubscribersStore.cs
index 65b66eb506..74ff3a660f 100644
--- a/tests/Proto.Cluster.PubSub.Tests/InMemorySubscribersStore.cs
+++ b/tests/Proto.Cluster.PubSub.Tests/InMemorySubscribersStore.cs
@@ -12,7 +12,7 @@ public class InMemorySubscribersStore : IKeyValueStore
{
private readonly ConcurrentDictionary _store = new();
- public Task GetAsync(string id, CancellationToken ct)
+ public Task GetAsync(string id, CancellationToken ct)
{
_store.TryGetValue(id, out var subscribers);
return subscribers == null ? Task.FromResult(new Subscribers()) : Task.FromResult(subscribers);
diff --git a/tests/Proto.Cluster.Tests/Extensions.cs b/tests/Proto.Cluster.Tests/Extensions.cs
index bb6a87a6ea..a8b7ff999e 100644
--- a/tests/Proto.Cluster.Tests/Extensions.cs
+++ b/tests/Proto.Cluster.Tests/Extensions.cs
@@ -29,7 +29,7 @@ public static async Task DumpClusterState(this IEnumerable memb
if (c.System.Shutdown.IsCancellationRequested)
{
- sb.AppendLine("\tStopped, reason: " + c.System.Stopper.StoppedReason);
+ sb.AppendLine("\tStopped, reason: " + c.System.StoppedReason);
continue;
}