diff --git a/docs/articles/clustering/cluster-sharding.md b/docs/articles/clustering/cluster-sharding.md index 4217a54da34..a25c73aed6b 100644 --- a/docs/articles/clustering/cluster-sharding.md +++ b/docs/articles/clustering/cluster-sharding.md @@ -55,6 +55,8 @@ In this example, we first specify way to resolve our message recipients in conte Second part of an example is registering custom actor type as sharded entity using `ClusterSharding.Start` or `ClusterSharding.StartAsync` methods. Result is the `IActorRef` to shard region used to communicate between current actor system and target entities. Shard region must be specified once per each type on each node, that is expected to participate in sharding entities of that type. Keep in mind, that it's recommended to wait for the current node to first fully join the cluster before initializing a shard regions in order to avoid potential timeouts. +In some cases, the actor may need to know the `entityId` associated with it. This can be achieved using the `entityPropsFactory` parameter to `ClusterSharding.Start` or `ClusterSharding.StartAsync`. The entity ID will be passed to the factory as a parameter, which can then be used in the creation of the actor. + In case when you want to send message to entities from specific node, but you don't want that node to participate in sharding itself, you can use `ShardRegionProxy` for that. Example: diff --git a/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/ClusterShardingFailureSpec.cs b/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/ClusterShardingFailureSpec.cs index c08c3bd3faa..9e85c5ef50f 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/ClusterShardingFailureSpec.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/ClusterShardingFailureSpec.cs @@ -41,7 +41,7 @@ protected ClusterShardingFailureSpecConfig(string mode) serialization-bindings {{ ""System.Object"" = hyperion }} - }} + }} akka.loglevel = INFO akka.actor.provider = cluster akka.remote.log-remote-lifecycle-events = off @@ -176,7 +176,7 @@ public Entity() private readonly ClusterShardingFailureSpecConfig _config; private readonly List _storageLocations; - + protected ClusterShardingFailureSpec(ClusterShardingFailureSpecConfig config, Type type) : base(config, type) { @@ -194,7 +194,7 @@ protected ClusterShardingFailureSpec(ClusterShardingFailureSpecConfig config, Ty } protected bool IsDDataMode { get; } - + protected override void AfterTermination() { base.AfterTermination(); @@ -364,6 +364,13 @@ public void ClusterSharding_with_flaky_journal_network_should_recover_after_jour //Test the Shard passivate works during a journal failure shard2.Tell(new Passivate(PoisonPill.Instance), entity21); + + AwaitAssert(() => + { + region.Tell(new Get("21")); + ExpectMsg(v => v.Id == "21" && v.N == 0, hint: "Passivating did not reset Value down to 0"); + }); + region.Tell(new Add("21", 1)); region.Tell(new Get("21")); diff --git a/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/ClusterShardingSpec.cs b/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/ClusterShardingSpec.cs index d822a77e65a..4003b139acf 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/ClusterShardingSpec.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/ClusterShardingSpec.cs @@ -220,9 +220,15 @@ private Stop() public const int NumberOfShards = 12; private int _count = 0; + private readonly string id; - public Counter() + public static Props Props(string id) => Actor.Props.Create(() => new Counter(id)); + + public static string ShardingTypeName => "Counter"; + + public Counter(string id) { + this.id = id; Context.SetReceiveTimeout(TimeSpan.FromMinutes(2)); } @@ -233,7 +239,7 @@ protected override void PostStop() Thread.Sleep(500); } - public override string PersistenceId { get { return "Counter-" + Self.Path.Name; } } + public override string PersistenceId { get { return $"Counter.{ShardingTypeName}-{id}"; } } protected override bool ReceiveRecover(object message) { @@ -277,16 +283,17 @@ private void UpdateState(CounterChanged e) internal class QualifiedCounter : Counter { - public static Props Props(string typeName) + public static Props Props(string typeName, string id) { - return Actor.Props.Create(() => new QualifiedCounter(typeName)); + return Actor.Props.Create(() => new QualifiedCounter(typeName, id)); } public readonly string TypeName; public override string PersistenceId { get { return TypeName + "-" + Self.Path.Name; } } - public QualifiedCounter(string typeName) + public QualifiedCounter(string typeName, string id) + : base(id) { TypeName = typeName; } @@ -294,19 +301,34 @@ public QualifiedCounter(string typeName) internal class AnotherCounter : QualifiedCounter { - public AnotherCounter() - : base("AnotherCounter") + public static new Props Props(string id) + { + return Actor.Props.Create(() => new AnotherCounter(id)); + } + public static new string ShardingTypeName => nameof(AnotherCounter); + + public AnotherCounter(string id) + : base(AnotherCounter.ShardingTypeName, id) { } } internal class CounterSupervisor : ActorBase { - public readonly IActorRef Counter; + public static string ShardingTypeName => nameof(CounterSupervisor); - public CounterSupervisor() + public static Props Props(string id) { - Counter = Context.ActorOf(Props.Create(), "theCounter"); + return Actor.Props.Create(() => new CounterSupervisor(id)); + } + + public readonly string entityId; + public readonly IActorRef counter; + + public CounterSupervisor(string entityId) + { + this.entityId = entityId; + counter = Context.ActorOf(Counter.Props(entityId), "theCounter"); } protected override SupervisorStrategy SupervisorStrategy() @@ -328,7 +350,7 @@ protected override SupervisorStrategy SupervisorStrategy() protected override bool Receive(object message) { - Counter.Forward(message); + counter.Forward(message); return true; } } @@ -355,6 +377,9 @@ protected DDataClusterShardingWithEntityRecoverySpec(DDataClusterShardingWithEnt } public abstract class ClusterShardingSpec : MultiNodeClusterSpec { + // must use different unique name for some tests than the one used in API tests + public static string TestCounterShardingTypeName => $"Test{Counter.ShardingTypeName}"; + #region Setup private readonly Lazy _region; @@ -373,7 +398,7 @@ protected ClusterShardingSpec(ClusterShardingSpecConfig config, Type type) { _config = config; - _region = new Lazy(() => CreateRegion("counter", false)); + _region = new Lazy(() => CreateRegion(TestCounterShardingTypeName, false)); _rebalancingRegion = new Lazy(() => CreateRegion("rebalancingCounter", false)); _persistentEntitiesRegion = new Lazy(() => CreateRegion("RememberCounterEntities", true)); @@ -397,7 +422,7 @@ protected ClusterShardingSpec(ClusterShardingSpecConfig config, Type type) EnterBarrier("startup"); } protected bool IsDDataMode { get; } - + protected override void AfterTermination() { base.AfterTermination(); @@ -430,7 +455,7 @@ private void CreateCoordinator() { var typeNames = new[] { - "counter", "rebalancingCounter", "RememberCounterEntities", "AnotherRememberCounter", + TestCounterShardingTypeName, "rebalancingCounter", "RememberCounterEntities", "AnotherRememberCounter", "RememberCounter", "RebalancingRememberCounter", "AutoMigrateRememberRegionTest" }; @@ -479,7 +504,7 @@ private IActorRef CreateRegion(string typeName, bool rememberEntities) return Sys.ActorOf(Props.Create(() => new ShardRegion( typeName, - QualifiedCounter.Props(typeName), + entityId => QualifiedCounter.Props(typeName, entityId), settings, "/user/" + typeName + "Coordinator/singleton/coordinator", Counter.ExtractEntityId, @@ -609,7 +634,7 @@ public void ClusterSharding_should_use_second_node() r.Tell(new Counter.EntityEnvelope(2, Counter.Increment.Instance)); r.Tell(new Counter.Get(2)); ExpectMsg(3); - LastSender.Path.Should().Be(Node(_config.Second) / "user" / "counterRegion" / "2" / "2"); + LastSender.Path.Should().Be(Node(_config.Second) / "user" / $"{TestCounterShardingTypeName}Region" / "2" / "2"); r.Tell(new Counter.Get(11)); ExpectMsg(1); @@ -669,9 +694,9 @@ public void ClusterSharding_should_support_proxy_only_mode() var settings = ClusterShardingSettings.Create(cfg, Sys.Settings.Config.GetConfig("akka.cluster.singleton")); var proxy = Sys.ActorOf(ShardRegion.ProxyProps( - typeName: "counter", + typeName: TestCounterShardingTypeName, settings: settings, - coordinatorPath: "/user/counterCoordinator/singleton/coordinator", + coordinatorPath: $"/user/{TestCounterShardingTypeName}Coordinator/singleton/coordinator", extractEntityId: Counter.ExtractEntityId, extractShardId: Counter.ExtractShardId, replicator: Sys.DeadLetters, @@ -770,12 +795,12 @@ public void ClusterSharding_should_use_third_and_fourth_node() r.Tell(new Counter.EntityEnvelope(3, Counter.Increment.Instance)); r.Tell(new Counter.Get(3)); ExpectMsg(11); - LastSender.Path.Should().Be(Node(_config.Third) / "user" / "counterRegion" / "3" / "3"); + LastSender.Path.Should().Be(Node(_config.Third) / "user" / $"{TestCounterShardingTypeName}Region" / "3" / "3"); r.Tell(new Counter.EntityEnvelope(4, Counter.Increment.Instance)); r.Tell(new Counter.Get(4)); ExpectMsg(21); - LastSender.Path.Should().Be(Node(_config.Fourth) / "user" / "counterRegion" / "4" / "4"); + LastSender.Path.Should().Be(Node(_config.Fourth) / "user" / $"{TestCounterShardingTypeName}Region" / "4" / "4"); }, _config.First); EnterBarrier("first-update"); @@ -818,7 +843,7 @@ public void ClusterSharding_should_recover_coordinator_state_after_coordinator_c { _region.Value.Tell(new Counter.Get(3), probe3.Ref); probe3.ExpectMsg(11); - probe3.LastSender.Path.Should().Be(Node(_config.Third) / "user" / "counterRegion" / "3" / "3"); + probe3.LastSender.Path.Should().Be(Node(_config.Third) / "user" / $"{TestCounterShardingTypeName}Region" / "3" / "3"); }); }); @@ -829,7 +854,7 @@ public void ClusterSharding_should_recover_coordinator_state_after_coordinator_c { _region.Value.Tell(new Counter.Get(4), probe4.Ref); probe4.ExpectMsg(21); - probe4.LastSender.Path.Should().Be(Node(_config.Fourth) / "user" / "counterRegion" / "4" / "4"); + probe4.LastSender.Path.Should().Be(Node(_config.Fourth) / "user" / $"{TestCounterShardingTypeName}Region" / "4" / "4"); }); }); }, _config.Fifth); @@ -888,24 +913,24 @@ public void ClusterSharding_should_be_easy_to_use_with_extensions() { //#counter-start ClusterSharding.Get(Sys).Start( - typeName: "Counter", - entityProps: Props.Create(), + typeName: Counter.ShardingTypeName, + entityPropsFactory: entityId => Counter.Props(entityId), settings: ClusterShardingSettings.Create(Sys), extractEntityId: Counter.ExtractEntityId, extractShardId: Counter.ExtractShardId); //#counter-start ClusterSharding.Get(Sys).Start( - typeName: "AnotherCounter", - entityProps: Props.Create(), + typeName: AnotherCounter.ShardingTypeName, + entityPropsFactory: entityId => AnotherCounter.Props(entityId), settings: ClusterShardingSettings.Create(Sys), extractEntityId: Counter.ExtractEntityId, extractShardId: Counter.ExtractShardId); //#counter-supervisor-start ClusterSharding.Get(Sys).Start( - typeName: "SupervisedCounter", - entityProps: Props.Create(), + typeName: CounterSupervisor.ShardingTypeName, + entityPropsFactory: entityId => CounterSupervisor.Props(entityId), settings: ClusterShardingSettings.Create(Sys), extractEntityId: Counter.ExtractEntityId, extractShardId: Counter.ExtractShardId); @@ -915,18 +940,19 @@ public void ClusterSharding_should_be_easy_to_use_with_extensions() RunOn(() => { //#counter-usage - var counterRegion = ClusterSharding.Get(Sys).ShardRegion("Counter"); - counterRegion.Tell(new Counter.Get(123)); + var counterRegion = ClusterSharding.Get(Sys).ShardRegion(Counter.ShardingTypeName); + var entityId = 999; + counterRegion.Tell(new Counter.Get(entityId)); ExpectMsg(0); - counterRegion.Tell(new Counter.EntityEnvelope(123, Counter.Increment.Instance)); - counterRegion.Tell(new Counter.Get(123)); + counterRegion.Tell(new Counter.EntityEnvelope(entityId, Counter.Increment.Instance)); + counterRegion.Tell(new Counter.Get(entityId)); ExpectMsg(1); //#counter-usage - var anotherCounterRegion = ClusterSharding.Get(Sys).ShardRegion("AnotherCounter"); - anotherCounterRegion.Tell(new Counter.EntityEnvelope(123, Counter.Decrement.Instance)); - anotherCounterRegion.Tell(new Counter.Get(123)); + var anotherCounterRegion = ClusterSharding.Get(Sys).ShardRegion(AnotherCounter.ShardingTypeName); + anotherCounterRegion.Tell(new Counter.EntityEnvelope(entityId, Counter.Decrement.Instance)); + anotherCounterRegion.Tell(new Counter.Get(entityId)); ExpectMsg(-1); }, _config.Fifth); EnterBarrier("extension-used"); @@ -936,8 +962,8 @@ public void ClusterSharding_should_be_easy_to_use_with_extensions() { for (int i = 1000; i <= 1010; i++) { - ClusterSharding.Get(Sys).ShardRegion("Counter").Tell(new Counter.EntityEnvelope(i, Counter.Increment.Instance)); - ClusterSharding.Get(Sys).ShardRegion("Counter").Tell(new Counter.Get(i)); + ClusterSharding.Get(Sys).ShardRegion(Counter.ShardingTypeName).Tell(new Counter.EntityEnvelope(i, Counter.Increment.Instance)); + ClusterSharding.Get(Sys).ShardRegion(Counter.ShardingTypeName).Tell(new Counter.Get(i)); ExpectMsg(1); LastSender.Path.Address.Should().NotBe(Cluster.SelfAddress); } @@ -954,7 +980,7 @@ public void ClusterSharding_should_be_easy_API_for_starting() { var counterRegionViaStart = ClusterSharding.Get(Sys).Start( typeName: "ApiTest", - entityProps: Props.Create(), + entityPropsFactory: Counter.Props, settings: ClusterShardingSettings.Create(Sys), extractEntityId: Counter.ExtractEntityId, extractShardId: Counter.ExtractShardId); diff --git a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ClusterShardingInternalsSpec.cs b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ClusterShardingInternalsSpec.cs new file mode 100644 index 00000000000..bee6a932e0d --- /dev/null +++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ClusterShardingInternalsSpec.cs @@ -0,0 +1,82 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2018 Lightbend Inc. +// Copyright (C) 2013-2018 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; +using Akka.Actor; +using Akka.Cluster.Tools.Singleton; +using Akka.Configuration; +using Akka.TestKit.TestActors; +using FluentAssertions; +using Xunit; + +namespace Akka.Cluster.Sharding.Tests +{ + public class ClusterShardingInternalsSpec : Akka.TestKit.Xunit2.TestKit + { + ClusterSharding clusterSharding; + + public ClusterShardingInternalsSpec() : base(GetConfig()) + { + clusterSharding = ClusterSharding.Get(Sys); + } + + private Tuple ExtractEntityId(object message) + { + switch (message) + { + case int i: + return new Tuple(i.ToString(), message); + } + throw new NotSupportedException(); + } + + private string ExtractShardId(object message) + { + switch (message) + { + case int i: + return (i % 10).ToString(); + } + throw new NotSupportedException(); + } + + + public static Config GetConfig() + { + return ConfigurationFactory.ParseString("akka.actor.provider = cluster") + + .WithFallback(Sharding.ClusterSharding.DefaultConfig()) + .WithFallback(DistributedData.DistributedData.DefaultConfig()) + .WithFallback(ClusterSingletonManager.DefaultConfig()); + } + + [Fact] + public void ClusterSharding_must_start_a_region_in_proxy_mode_in_case_of_node_role_mismatch() + { + var settingsWithRole = ClusterShardingSettings.Create(Sys).WithRole("nonExistingRole"); + var typeName = "typeName"; + + var region = clusterSharding.Start( + typeName: typeName, + entityProps: Props.Empty, + settings: settingsWithRole, + extractEntityId: ExtractEntityId, + extractShardId: ExtractShardId, + allocationStrategy: new LeastShardAllocationStrategy(0, 0), + handOffStopMessage: PoisonPill.Instance); + + var proxy = clusterSharding.StartProxy( + typeName: typeName, + role: settingsWithRole.Role, + extractEntityId: ExtractEntityId, + extractShardId: ExtractShardId + ); + + region.Should().BeSameAs(proxy); + } + } +} diff --git a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/GetShardTypeNamesSpec.cs b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/GetShardTypeNamesSpec.cs new file mode 100644 index 00000000000..d86e7f5eb1f --- /dev/null +++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/GetShardTypeNamesSpec.cs @@ -0,0 +1,69 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2018 Lightbend Inc. +// Copyright (C) 2013-2018 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; +using Akka.Cluster.Tools.Singleton; +using Akka.Configuration; +using Akka.TestKit.TestActors; +using FluentAssertions; +using Xunit; + +namespace Akka.Cluster.Sharding.Tests +{ + public class GetShardTypeNamesSpec : Akka.TestKit.Xunit2.TestKit + { + public GetShardTypeNamesSpec() : base(GetConfig()) + { + } + + public static Config GetConfig() + { + return ConfigurationFactory.ParseString("akka.actor.provider = cluster") + + .WithFallback(Sharding.ClusterSharding.DefaultConfig()) + .WithFallback(DistributedData.DistributedData.DefaultConfig()) + .WithFallback(ClusterSingletonManager.DefaultConfig()); + } + + [Fact] + public void GetShardTypeNames_must_contain_empty_when_join_cluster_without_shards() + { + ClusterSharding.Get(Sys).ShardTypeNames.Should().BeEmpty(); + } + + [Fact] + public void GetShardTypeNames_must_contain_started_shards_when_started_2_shards() + { + Cluster.Get(Sys).Join(Cluster.Get(Sys).SelfAddress); + var settings = ClusterShardingSettings.Create(Sys); + ClusterSharding.Get(Sys).Start("type1", EchoActor.Props(this), settings, ExtractEntityId, ExtractShardId); + ClusterSharding.Get(Sys).Start("type2", EchoActor.Props(this), settings, ExtractEntityId, ExtractShardId); + + ClusterSharding.Get(Sys).ShardTypeNames.ShouldBeEquivalentTo(new string[] { "type1", "type2" }); + } + + private Tuple ExtractEntityId(object message) + { + switch (message) + { + case int i: + return new Tuple(i.ToString(), message); + } + throw new NotSupportedException(); + } + + private string ExtractShardId(object message) + { + switch (message) + { + case int i: + return (i % 10).ToString(); + } + throw new NotSupportedException(); + } + } +} diff --git a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ProxyShardingSpec.cs b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ProxyShardingSpec.cs new file mode 100644 index 00000000000..c4c7c971458 --- /dev/null +++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ProxyShardingSpec.cs @@ -0,0 +1,105 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2018 Lightbend Inc. +// Copyright (C) 2013-2018 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; +using Akka.Actor; +using Akka.Cluster.Tools.Singleton; +using Akka.Configuration; +using Akka.TestKit.TestActors; +using FluentAssertions; +using Xunit; + +namespace Akka.Cluster.Sharding.Tests +{ + public class ProxyShardingSpec : Akka.TestKit.Xunit2.TestKit + { + ClusterSharding clusterSharding; + ClusterShardingSettings shardingSettings; + private MessageExtractor messageExtractor = new MessageExtractor(10); + + public ProxyShardingSpec() : base(GetConfig()) + { + var role = "Shard"; + clusterSharding = ClusterSharding.Get(Sys); + shardingSettings = ClusterShardingSettings.Create(Sys); + clusterSharding.StartProxy("myType", role, IdExtractor, ShardResolver); + } + + private class MessageExtractor : HashCodeMessageExtractor + { + public MessageExtractor(int maxNumberOfShards) : base(maxNumberOfShards) + { + } + + public override string EntityId(object message) + { + return "dummyId"; + } + } + + private Tuple IdExtractor(object message) + { + switch (message) + { + case int i: + return new Tuple(i.ToString(), message); + } + throw new NotSupportedException(); + } + + private string ShardResolver(object message) + { + switch (message) + { + case int i: + return (i % 10).ToString(); + } + throw new NotSupportedException(); + } + + + public static Config GetConfig() + { + return ConfigurationFactory.ParseString("akka.actor.provider = cluster") + + .WithFallback(Sharding.ClusterSharding.DefaultConfig()) + .WithFallback(DistributedData.DistributedData.DefaultConfig()) + .WithFallback(ClusterSingletonManager.DefaultConfig()); + } + + [Fact] + public void ProxyShardingSpec_Proxy_should_be_found() + { + IActorRef proxyActor = Sys.ActorSelection("akka://test/system/sharding/myTypeProxy") + .ResolveOne(TimeSpan.FromSeconds(5)).Result; + + proxyActor.Path.Should().NotBeNull(); + proxyActor.Path.ToString().Should().EndWith("Proxy"); + } + + [Fact] + public void ProxyShardingSpec_Shard_region_should_be_found() + { + var shardRegion = clusterSharding.Start("myType", EchoActor.Props(this), shardingSettings, messageExtractor); + + shardRegion.Path.Should().NotBeNull(); + shardRegion.Path.ToString().Should().EndWith("myType"); + } + + [Fact] + public void ProxyShardingSpec_Shard_coordinator_should_be_found() + { + var shardRegion = clusterSharding.Start("myType", EchoActor.Props(this), shardingSettings, messageExtractor); + + IActorRef shardCoordinator = Sys.ActorSelection("akka://test/system/sharding/myTypeCoordinator") + .ResolveOne(TimeSpan.FromSeconds(5)).Result; + + shardCoordinator.Path.Should().NotBeNull(); + shardCoordinator.Path.ToString().Should().EndWith("Coordinator"); + } + } +} diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/ClusterSharding.cs b/src/contrib/cluster/Akka.Cluster.Sharding/ClusterSharding.cs index 935dc8e814e..c9cda07a2c5 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/ClusterSharding.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/ClusterSharding.cs @@ -8,6 +8,7 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; +using System.Collections.Immutable; using System.Runtime.ExceptionServices; using System.Threading.Tasks; using Akka.Actor; @@ -228,6 +229,7 @@ public class ClusterSharding : IExtension { private readonly Lazy _guardian; private readonly ConcurrentDictionary _regions = new ConcurrentDictionary(); + private readonly ConcurrentDictionary _proxies = new ConcurrentDictionary(); private readonly ExtendedActorSystem _system; private readonly Cluster _cluster; @@ -281,6 +283,10 @@ public static Config DefaultConfig() /// Register a named entity type by defining the of the entity actor and /// functions to extract entity and shard identifier from messages. The /// actor for this type can later be retrieved with the method. + /// + /// This method will start a in proxy mode in case if there is no match between the roles of + /// the current cluster node and the role specified in passed to this method. + /// /// /// The name of the entity type /// @@ -313,25 +319,31 @@ public IActorRef Start( IShardAllocationStrategy allocationStrategy, object handOffStopMessage) { - RequireClusterRole(settings.Role); - - var timeout = _system.Settings.CreationTimeout; - var startMsg = new ClusterShardingGuardian.Start(typeName, entityProps, settings, extractEntityId, extractShardId, allocationStrategy, handOffStopMessage); - - var reply = _guardian.Value.Ask(startMsg, timeout).Result; - switch (reply) + if (settings.ShouldHostShard(_cluster)) { - case ClusterShardingGuardian.Started started: - var shardRegion = started.ShardRegion; - _regions.TryAdd(typeName, shardRegion); - return shardRegion; - - case Status.Failure failure: - ExceptionDispatchInfo.Capture(failure.Cause).Throw(); - return ActorRefs.Nobody; - - default: - throw new ActorInitializationException($"Unsupported guardian response: {reply}"); + var timeout = _system.Settings.CreationTimeout; + var startMsg = new ClusterShardingGuardian.Start(typeName, _ => entityProps, settings, extractEntityId, extractShardId, allocationStrategy, handOffStopMessage); + + var reply = _guardian.Value.Ask(startMsg, timeout).Result; + switch (reply) + { + case ClusterShardingGuardian.Started started: + var shardRegion = started.ShardRegion; + _regions.TryAdd(typeName, shardRegion); + return shardRegion; + + case Status.Failure failure: + ExceptionDispatchInfo.Capture(failure.Cause).Throw(); + return ActorRefs.Nobody; + + default: + throw new ActorInitializationException($"Unsupported guardian response: {reply}"); + } + } + else + { + _cluster.System.Log.Debug("Starting Shard Region Proxy [{0}] (no actors will be hosted on this node)...", typeName); + return StartProxy(typeName, settings.Role, extractEntityId, extractShardId); } } @@ -339,6 +351,10 @@ public IActorRef Start( /// Register a named entity type by defining the of the entity actor and /// functions to extract entity and shard identifier from messages. The /// actor for this type can later be retrieved with the method. + /// + /// This method will start a in proxy mode in case if there is no match between the roles of + /// the current cluster node and the role specified in passed to this method. + /// /// /// The name of the entity type /// @@ -371,25 +387,31 @@ public async Task StartAsync( IShardAllocationStrategy allocationStrategy, object handOffStopMessage) { - RequireClusterRole(settings.Role); - - var timeout = _system.Settings.CreationTimeout; - var startMsg = new ClusterShardingGuardian.Start(typeName, entityProps, settings, extractEntityId, extractShardId, allocationStrategy, handOffStopMessage); - - var reply = await _guardian.Value.Ask(startMsg, timeout); - switch (reply) + if (settings.ShouldHostShard(_cluster)) { - case ClusterShardingGuardian.Started started: - var shardRegion = started.ShardRegion; - _regions.TryAdd(typeName, shardRegion); - return shardRegion; - - case Status.Failure failure: - ExceptionDispatchInfo.Capture(failure.Cause).Throw(); - return ActorRefs.Nobody; - - default: - throw new ActorInitializationException($"Unsupported guardian response: {reply}"); + var timeout = _system.Settings.CreationTimeout; + var startMsg = new ClusterShardingGuardian.Start(typeName, _ => entityProps, settings, extractEntityId, extractShardId, allocationStrategy, handOffStopMessage); + + var reply = await _guardian.Value.Ask(startMsg, timeout); + switch (reply) + { + case ClusterShardingGuardian.Started started: + var shardRegion = started.ShardRegion; + _regions.TryAdd(typeName, shardRegion); + return shardRegion; + + case Status.Failure failure: + ExceptionDispatchInfo.Capture(failure.Cause).Throw(); + return ActorRefs.Nobody; + + default: + throw new ActorInitializationException($"Unsupported guardian response: {reply}"); + } + } + else + { + _cluster.System.Log.Debug("Starting Shard Region Proxy [{0}] (no actors will be hosted on this node)...", typeName); + return await StartProxyAsync(typeName, settings.Role, extractEntityId, extractShardId); } } @@ -397,6 +419,10 @@ public async Task StartAsync( /// Register a named entity type by defining the of the entity actor and /// functions to extract entity and shard identifier from messages. The /// actor for this type can later be retrieved with the method. + /// + /// This method will start a in proxy mode in case if there is no match between the roles of + /// the current cluster node and the role specified in passed to this method. + /// /// /// The name of the entity type /// @@ -419,9 +445,7 @@ public IActorRef Start( ExtractEntityId extractEntityId, ExtractShardId extractShardId) { - var allocationStrategy = new LeastShardAllocationStrategy( - Settings.TunningParameters.LeastShardAllocationRebalanceThreshold, - Settings.TunningParameters.LeastShardAllocationMaxSimultaneousRebalance); + var allocationStrategy = DefaultShardAllocationStrategy(settings); return Start(typeName, entityProps, settings, extractEntityId, extractShardId, allocationStrategy, PoisonPill.Instance); } @@ -429,6 +453,10 @@ public IActorRef Start( /// Register a named entity type by defining the of the entity actor and /// functions to extract entity and shard identifier from messages. The /// actor for this type can later be retrieved with the method. + /// + /// This method will start a in proxy mode in case if there is no match between the roles of + /// the current cluster node and the role specified in passed to this method. + /// /// /// The name of the entity type /// @@ -451,9 +479,7 @@ public Task StartAsync( ExtractEntityId extractEntityId, ExtractShardId extractShardId) { - var allocationStrategy = new LeastShardAllocationStrategy( - Settings.TunningParameters.LeastShardAllocationRebalanceThreshold, - Settings.TunningParameters.LeastShardAllocationMaxSimultaneousRebalance); + var allocationStrategy = DefaultShardAllocationStrategy(settings); return StartAsync(typeName, entityProps, settings, extractEntityId, extractShardId, allocationStrategy, PoisonPill.Instance); } @@ -461,6 +487,10 @@ public Task StartAsync( /// Register a named entity type by defining the of the entity actor and /// functions to extract entity and shard identifier from messages. The /// actor for this type can later be retrieved with the method. + /// + /// This method will start a in proxy mode in case if there is no match between the roles of + /// the current cluster node and the role specified in passed to this method. + /// /// /// The name of the entity type /// @@ -489,6 +519,10 @@ public IActorRef Start(string typeName, Props entityProps, ClusterShardingSettin /// Register a named entity type by defining the of the entity actor and /// functions to extract entity and shard identifier from messages. The /// actor for this type can later be retrieved with the method. + /// + /// This method will start a in proxy mode in case if there is no match between the roles of + /// the current cluster node and the role specified in passed to this method. + /// /// /// The name of the entity type /// @@ -517,6 +551,10 @@ public Task StartAsync(string typeName, Props entityProps, ClusterSha /// Register a named entity type by defining the of the entity actor and /// functions to extract entity and shard identifier from messages. The /// actor for this type can later be retrieved with the method. + /// + /// This method will start a in proxy mode in case if there is no match between the roles of + /// the current cluster node and the role specified in passed to this method. + /// /// /// The name of the entity type /// @@ -534,9 +572,7 @@ public IActorRef Start(string typeName, Props entityProps, ClusterShardingSettin entityProps, settings, messageExtractor, - new LeastShardAllocationStrategy( - Settings.TunningParameters.LeastShardAllocationRebalanceThreshold, - Settings.TunningParameters.LeastShardAllocationMaxSimultaneousRebalance), + DefaultShardAllocationStrategy(settings), PoisonPill.Instance); } @@ -544,6 +580,10 @@ public IActorRef Start(string typeName, Props entityProps, ClusterShardingSettin /// Register a named entity type by defining the of the entity actor and /// functions to extract entity and shard identifier from messages. The /// actor for this type can later be retrieved with the method. + /// + /// This method will start a in proxy mode in case if there is no match between the roles of + /// the current cluster node and the role specified in passed to this method. + /// /// /// The name of the entity type /// @@ -561,9 +601,334 @@ public Task StartAsync(string typeName, Props entityProps, ClusterSha entityProps, settings, messageExtractor, - new LeastShardAllocationStrategy( - Settings.TunningParameters.LeastShardAllocationRebalanceThreshold, - Settings.TunningParameters.LeastShardAllocationMaxSimultaneousRebalance), + DefaultShardAllocationStrategy(settings), + PoisonPill.Instance); + } + + + /// + /// Register a named entity type by defining the of the entity actor and + /// functions to extract entity and shard identifier from messages. The + /// actor for this type can later be retrieved with the method. + /// + /// This method will start a in proxy mode in case if there is no match between the roles of + /// the current cluster node and the role specified in passed to this method. + /// + /// + /// The name of the entity type + /// + /// Function that, given an entity id, returns the of the entity actors that will be created by the + /// + /// Configuration settings, see + /// + /// Partial function to extract the entity id and the message to send to the entity from the incoming message, + /// if the partial function does not match the message will be `unhandled`, + /// i.e.posted as `Unhandled` messages on the event stream + /// + /// + /// Function to determine the shard id for an incoming message, only messages that passed the `extractEntityId` will be used + /// + /// Possibility to use a custom shard allocation and rebalancing logic + /// + /// The message that will be sent to entities when they are to be stopped for a rebalance or + /// graceful shutdown of a , e.g. . + /// + /// + /// This exception is thrown when the cluster member doesn't have the role specified in . + /// + /// The actor ref of the that is to be responsible for the shard. + public IActorRef Start( + string typeName, + Func entityPropsFactory, + ClusterShardingSettings settings, + ExtractEntityId extractEntityId, + ExtractShardId extractShardId, + IShardAllocationStrategy allocationStrategy, + object handOffStopMessage) + { + if (settings.ShouldHostShard(_cluster)) + { + var timeout = _system.Settings.CreationTimeout; + var startMsg = new ClusterShardingGuardian.Start(typeName, entityPropsFactory, settings, extractEntityId, extractShardId, allocationStrategy, handOffStopMessage); + + var reply = _guardian.Value.Ask(startMsg, timeout).Result; + switch (reply) + { + case ClusterShardingGuardian.Started started: + var shardRegion = started.ShardRegion; + _regions.TryAdd(typeName, shardRegion); + return shardRegion; + + case Status.Failure failure: + ExceptionDispatchInfo.Capture(failure.Cause).Throw(); + return ActorRefs.Nobody; + + default: + throw new ActorInitializationException($"Unsupported guardian response: {reply}"); + } + } + else + { + _cluster.System.Log.Debug("Starting Shard Region Proxy [{0}] (no actors will be hosted on this node)...", typeName); + return StartProxy(typeName, settings.Role, extractEntityId, extractShardId); + } + } + + /// + /// Register a named entity type by defining the of the entity actor and + /// functions to extract entity and shard identifier from messages. The + /// actor for this type can later be retrieved with the method. + /// + /// This method will start a in proxy mode in case if there is no match between the roles of + /// the current cluster node and the role specified in passed to this method. + /// + /// + /// The name of the entity type + /// + /// Function that, given an entity id, returns the of the entity actors that will be created by the + /// + /// Configuration settings, see + /// + /// Partial function to extract the entity id and the message to send to the entity from the incoming message, + /// if the partial function does not match the message will be `unhandled`, + /// i.e.posted as `Unhandled` messages on the event stream + /// + /// + /// Function to determine the shard id for an incoming message, only messages that passed the `extractEntityId` will be used + /// + /// Possibility to use a custom shard allocation and rebalancing logic + /// + /// The message that will be sent to entities when they are to be stopped for a rebalance or + /// graceful shutdown of a , e.g. . + /// + /// + /// This exception is thrown when the cluster member doesn't have the role specified in . + /// + /// The actor ref of the that is to be responsible for the shard. + public async Task StartAsync( + string typeName, + Func entityPropsFactory, + ClusterShardingSettings settings, + ExtractEntityId extractEntityId, + ExtractShardId extractShardId, + IShardAllocationStrategy allocationStrategy, + object handOffStopMessage) + { + if (settings.ShouldHostShard(_cluster)) + { + var timeout = _system.Settings.CreationTimeout; + var startMsg = new ClusterShardingGuardian.Start(typeName, entityPropsFactory, settings, extractEntityId, extractShardId, allocationStrategy, handOffStopMessage); + + var reply = await _guardian.Value.Ask(startMsg, timeout); + switch (reply) + { + case ClusterShardingGuardian.Started started: + var shardRegion = started.ShardRegion; + _regions.TryAdd(typeName, shardRegion); + return shardRegion; + + case Status.Failure failure: + ExceptionDispatchInfo.Capture(failure.Cause).Throw(); + return ActorRefs.Nobody; + + default: + throw new ActorInitializationException($"Unsupported guardian response: {reply}"); + } + } + else + { + _cluster.System.Log.Debug("Starting Shard Region Proxy [{0}] (no actors will be hosted on this node)...", typeName); + return StartProxy(typeName, settings.Role, extractEntityId, extractShardId); + } + } + + /// + /// Register a named entity type by defining the of the entity actor and + /// functions to extract entity and shard identifier from messages. The + /// actor for this type can later be retrieved with the method. + /// + /// This method will start a in proxy mode in case if there is no match between the roles of + /// the current cluster node and the role specified in passed to this method. + /// + /// + /// The name of the entity type + /// + /// Function that, given an entity id, returns the of the entity actors that will be created by the + /// + /// Configuration settings, see + /// + /// Partial function to extract the entity id and the message to send to the entity from the incoming message, + /// if the partial function does not match the message will be `unhandled`, + /// i.e.posted as `Unhandled` messages on the event stream + /// + /// + /// Function to determine the shard id for an incoming message, only messages that passed the `extractEntityId` will be used + /// + /// The actor ref of the that is to be responsible for the shard. + public IActorRef Start( + string typeName, + Func entityPropsFactory, + ClusterShardingSettings settings, + ExtractEntityId extractEntityId, + ExtractShardId extractShardId) + { + var allocationStrategy = DefaultShardAllocationStrategy(settings); + return Start(typeName, entityPropsFactory, settings, extractEntityId, extractShardId, allocationStrategy, PoisonPill.Instance); + } + + /// + /// Register a named entity type by defining the of the entity actor and + /// functions to extract entity and shard identifier from messages. The + /// actor for this type can later be retrieved with the method. + /// + /// This method will start a in proxy mode in case if there is no match between the roles of + /// the current cluster node and the role specified in passed to this method. + /// + /// + /// The name of the entity type + /// + /// Function that, given an entity id, returns the of the entity actors that will be created by the + /// + /// Configuration settings, see + /// + /// Partial function to extract the entity id and the message to send to the entity from the incoming message, + /// if the partial function does not match the message will be `unhandled`, + /// i.e.posted as `Unhandled` messages on the event stream + /// + /// + /// Function to determine the shard id for an incoming message, only messages that passed the `extractEntityId` will be used + /// + /// The actor ref of the that is to be responsible for the shard. + public Task StartAsync( + string typeName, + Func entityPropsFactory, + ClusterShardingSettings settings, + ExtractEntityId extractEntityId, + ExtractShardId extractShardId) + { + var allocationStrategy = DefaultShardAllocationStrategy(settings); + return StartAsync(typeName, entityPropsFactory, settings, extractEntityId, extractShardId, allocationStrategy, PoisonPill.Instance); + } + + /// + /// Register a named entity type by defining the of the entity actor and + /// functions to extract entity and shard identifier from messages. The + /// actor for this type can later be retrieved with the method. + /// + /// This method will start a in proxy mode in case if there is no match between the roles of + /// the current cluster node and the role specified in passed to this method. + /// + /// + /// The name of the entity type + /// + /// Function that, given an entity id, returns the of the entity actors that will be created by the + /// + /// Configuration settings, see + /// + /// Functions to extract the entity id, shard id, and the message to send to the entity from the incoming message. + /// + /// Possibility to use a custom shard allocation and rebalancing logic + /// + /// The message that will be sent to entities when they are to be stopped for a rebalance or + /// graceful shutdown of a , e.g. . + /// + /// The actor ref of the that is to be responsible for the shard. + public IActorRef Start(string typeName, Func entityPropsFactory, ClusterShardingSettings settings, + IMessageExtractor messageExtractor, IShardAllocationStrategy allocationStrategy, object handOffMessage) + { + ExtractEntityId extractEntityId = messageExtractor.ToExtractEntityId(); + ExtractShardId extractShardId = messageExtractor.ShardId; + + return Start(typeName, entityPropsFactory, settings, extractEntityId, extractShardId, allocationStrategy, handOffMessage); + } + + /// + /// Register a named entity type by defining the of the entity actor and + /// functions to extract entity and shard identifier from messages. The + /// actor for this type can later be retrieved with the method. + /// + /// This method will start a in proxy mode in case if there is no match between the roles of + /// the current cluster node and the role specified in passed to this method. + /// + /// + /// The name of the entity type + /// + /// Function that, given an entity id, returns the of the entity actors that will be created by the + /// + /// Configuration settings, see + /// + /// Functions to extract the entity id, shard id, and the message to send to the entity from the incoming message. + /// + /// Possibility to use a custom shard allocation and rebalancing logic + /// + /// The message that will be sent to entities when they are to be stopped for a rebalance or + /// graceful shutdown of a , e.g. . + /// + /// The actor ref of the that is to be responsible for the shard. + public Task StartAsync(string typeName, Func entityPropsFactory, ClusterShardingSettings settings, + IMessageExtractor messageExtractor, IShardAllocationStrategy allocationStrategy, object handOffMessage) + { + ExtractEntityId extractEntityId = messageExtractor.ToExtractEntityId(); + ExtractShardId extractShardId = messageExtractor.ShardId; + + return StartAsync(typeName, entityPropsFactory, settings, extractEntityId, extractShardId, allocationStrategy, handOffMessage); + } + + /// + /// Register a named entity type by defining the of the entity actor and + /// functions to extract entity and shard identifier from messages. The + /// actor for this type can later be retrieved with the method. + /// + /// This method will start a in proxy mode in case if there is no match between the roles of + /// the current cluster node and the role specified in passed to this method. + /// + /// + /// The name of the entity type + /// + /// Function that, given an entity id, returns the of the entity actors that will be created by the + /// + /// Configuration settings, see + /// + /// Functions to extract the entity id, shard id, and the message to send to the entity from the incoming message. + /// + /// The actor ref of the that is to be responsible for the shard. + public IActorRef Start(string typeName, Func entityPropsFactory, ClusterShardingSettings settings, + IMessageExtractor messageExtractor) + { + return Start(typeName, + entityPropsFactory, + settings, + messageExtractor, + DefaultShardAllocationStrategy(settings), + PoisonPill.Instance); + } + + /// + /// Register a named entity type by defining the of the entity actor and + /// functions to extract entity and shard identifier from messages. The + /// actor for this type can later be retrieved with the method. + /// + /// This method will start a in proxy mode in case if there is no match between the roles of + /// the current cluster node and the role specified in passed to this method. + /// + /// + /// The name of the entity type + /// + /// Function that, given an entity id, returns the of the entity actors that will be created by the + /// + /// Configuration settings, see + /// + /// Functions to extract the entity id, shard id, and the message to send to the entity from the incoming message. + /// + /// The actor ref of the that is to be responsible for the shard. + public Task StartAsync(string typeName, Func entityPropsFactory, ClusterShardingSettings settings, + IMessageExtractor messageExtractor) + { + return StartAsync(typeName, + entityPropsFactory, + settings, + messageExtractor, + DefaultShardAllocationStrategy(settings), PoisonPill.Instance); } @@ -596,7 +961,7 @@ public IActorRef StartProxy(string typeName, string role, ExtractEntityId extrac switch (reply) { case ClusterShardingGuardian.Started started: - _regions.TryAdd(typeName, started.ShardRegion); + _proxies.TryAdd(typeName, started.ShardRegion); return started.ShardRegion; case Status.Failure failure: @@ -637,7 +1002,7 @@ public async Task StartProxyAsync(string typeName, string role, Extra switch (reply) { case ClusterShardingGuardian.Started started: - _regions.TryAdd(typeName, started.ShardRegion); + _proxies.TryAdd(typeName, started.ShardRegion); return started.ShardRegion; case Status.Failure failure: @@ -703,6 +1068,11 @@ Tuple extractEntityId(Msg msg) return StartProxyAsync(typeName, role, extractEntityId, messageExtractor.ShardId); } + /// + /// get all currently defined sharding type names. + /// + public ImmutableHashSet ShardTypeNames => _regions.Keys.ToImmutableHashSet(); + /// /// Retrieve the actor reference of the actor responsible for the named entity type. /// The entity type must be registered with the method before it can be used here. @@ -717,18 +1087,35 @@ public IActorRef ShardRegion(string typeName) { if (_regions.TryGetValue(typeName, out var region)) return region; + if (_proxies.TryGetValue(typeName, out region)) + return region; throw new ArgumentException($"Shard type [{typeName}] must be started first"); } - private void RequireClusterRole(string role) + /// + /// Retrieve the actor reference of the actor that will act as a proxy to the + /// named entity type running in another data center. A proxy within the same data center can be accessed + /// with instead of this method. The entity type must be registered with the + /// method before it can be used here. Messages to the entity is always sent + /// via the . + /// + /// + /// + public IActorRef ShardRegionProxy(string typeName) { - if (!(string.IsNullOrEmpty(role) || _cluster.SelfRoles.Contains(role))) - { - throw new IllegalStateException( - $"This cluster member [{_cluster.SelfAddress}] doesn't have the role [{role}]"); - } + if (_proxies.TryGetValue(typeName, out var proxy)) + return proxy; + throw new ArgumentException($"Shard type [{typeName}] must be started first"); + } + + private IShardAllocationStrategy DefaultShardAllocationStrategy(ClusterShardingSettings settings) + { + return new LeastShardAllocationStrategy( + Settings.TunningParameters.LeastShardAllocationRebalanceThreshold, + Settings.TunningParameters.LeastShardAllocationMaxSimultaneousRebalance); } + } /// @@ -777,7 +1164,7 @@ public interface IMessageExtractor object EntityMessage(object message); /// - /// Extract the entity id from an incoming . Only messages that + /// Extract the shard id from an incoming . Only messages that /// passed the method will be used as input to this method. /// /// TBD diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/ClusterShardingGuardian.cs b/src/contrib/cluster/Akka.Cluster.Sharding/ClusterShardingGuardian.cs index ba2140ed1d7..b775f66d607 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/ClusterShardingGuardian.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/ClusterShardingGuardian.cs @@ -55,7 +55,7 @@ public sealed class Start : INoSerializationVerificationNeeded /// /// TBD /// - public readonly Props EntityProps; + public readonly Func EntityProps; /// /// TBD /// @@ -90,7 +90,7 @@ public sealed class Start : INoSerializationVerificationNeeded /// /// This exception is thrown when the specified or is undefined. /// - public Start(string typeName, Props entityProps, ClusterShardingSettings settings, + public Start(string typeName, Func entityProps, ClusterShardingSettings settings, ExtractEntityId extractEntityId, ExtractShardId extractShardId, IShardAllocationStrategy allocationStrategy, object handOffStopMessage) { if (string.IsNullOrEmpty(typeName)) throw new ArgumentNullException(nameof(typeName), "ClusterSharding start requires type name to be provided"); @@ -218,9 +218,8 @@ public ClusterShardingGuardian() try { var settings = startProxy.Settings; - var encName = Uri.EscapeDataString(startProxy.TypeName); - var coordinatorSingletonManagerName = CoordinatorSingletonManagerName(encName); - var coordinatorPath = CoordinatorPath(encName); + var encName = Uri.EscapeDataString(startProxy.TypeName + "Proxy"); + var coordinatorPath = CoordinatorPath(Uri.EscapeDataString(startProxy.TypeName)); var shardRegion = Context.Child(encName); if (Equals(shardRegion, ActorRefs.Nobody)) diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/ClusterShardingSettings.cs b/src/contrib/cluster/Akka.Cluster.Sharding/ClusterShardingSettings.cs index d3483d3d970..d6606483b55 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/ClusterShardingSettings.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/ClusterShardingSettings.cs @@ -278,6 +278,16 @@ public ClusterShardingSettings( CoordinatorSingletonSettings = coordinatorSingletonSettings; } + /// + /// If true, this node should run the shard region, otherwise just a shard proxy should started on this node. + /// + /// + /// + internal bool ShouldHostShard(Cluster cluster) + { + return string.IsNullOrEmpty(Role) || cluster.SelfRoles.Contains(Role); + } + /// /// TBD /// diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/DDataShard.cs b/src/contrib/cluster/Akka.Cluster.Sharding/DDataShard.cs index e8d9738f39f..46659feccd7 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/DDataShard.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/DDataShard.cs @@ -35,7 +35,7 @@ internal sealed class DDataShard : ActorBase, IShard, IWithUnboundedStash public string TypeName { get; } public string ShardId { get; } - public Props EntityProps { get; } + public Func EntityProps { get; } public ClusterShardingSettings Settings { get; } public ExtractEntityId ExtractEntityId { get; } public ExtractShardId ExtractShardId { get; } @@ -72,7 +72,7 @@ internal sealed class DDataShard : ActorBase, IShard, IWithUnboundedStash public DDataShard( string typeName, ShardId shardId, - Props entityProps, + Func entityProps, ClusterShardingSettings settings, ExtractEntityId extractEntityId, ExtractShardId extractShardId, @@ -100,7 +100,7 @@ public DDataShard( _readConsistency = new ReadMajority(settings.TunningParameters.WaitingForStateTimeout, majorityCap); _writeConsistency = new WriteMajority(settings.TunningParameters.UpdatingStateTimeout, majorityCap); _stateKeys = Enumerable.Range(0, NrOfKeys).Select(i => new ORSetKey($"shard-{typeName}-{shardId}-{i}")).ToImmutableArray(); - + GetState(); } diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/PersistentShard.cs b/src/contrib/cluster/Akka.Cluster.Sharding/PersistentShard.cs index 626187c7cdd..932bbc63b73 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/PersistentShard.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/PersistentShard.cs @@ -33,7 +33,7 @@ internal sealed class PersistentShard : PersistentActor, IShard public string TypeName { get; } public string ShardId { get; } - public Props EntityProps { get; } + public Func EntityProps { get; } public ClusterShardingSettings Settings { get; } public ExtractEntityId ExtractEntityId { get; } public ExtractShardId ExtractShardId { get; } @@ -45,12 +45,12 @@ internal sealed class PersistentShard : PersistentActor, IShard public ImmutableHashSet Passivating { get; set; } = ImmutableHashSet.Empty; public ImmutableDictionary>> MessageBuffers { get; set; } = ImmutableDictionary>>.Empty; - private EntityRecoveryStrategy RememberedEntitiesRecoveryStrategy { get; } + private EntityRecoveryStrategy RememberedEntitiesRecoveryStrategy { get; } public PersistentShard( string typeName, string shardId, - Props entityProps, + Func entityProps, ClusterShardingSettings settings, ExtractEntityId extractEntityId, ExtractShardId extractShardId, diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs b/src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs index 489cc8eed58..194b28aa84c 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs @@ -18,7 +18,7 @@ namespace Akka.Cluster.Sharding using ShardId = String; using EntityId = String; using Msg = Object; - + internal interface IShard { IActorContext Context { get; } @@ -26,7 +26,7 @@ internal interface IShard IActorRef Sender { get; } string TypeName { get; } ShardId ShardId { get; } - Props EntityProps { get; } + Func EntityProps { get; } ClusterShardingSettings Settings { get; } ExtractEntityId ExtractEntityId { get; } ExtractShardId ExtractShardId { get; } @@ -360,7 +360,7 @@ public override int GetHashCode() public ILoggingAdapter Log { get; } = Context.GetLogger(); public string TypeName { get; } public string ShardId { get; } - public Props EntityProps { get; } + public Func EntityProps { get; } public ClusterShardingSettings Settings { get; } public ExtractEntityId ExtractEntityId { get; } public ExtractShardId ExtractShardId { get; } @@ -377,7 +377,7 @@ public override int GetHashCode() public Shard( string typeName, string shardId, - Props entityProps, + Func entityProps, ClusterShardingSettings settings, ExtractEntityId extractEntityId, ExtractShardId extractShardId, @@ -417,14 +417,14 @@ public static void Initialized(this TShard shard) where TShard : IShard { shard.Context.Parent.Tell(new ShardInitialized(shard.ShardId)); } - + public static void BaseProcessChange(this TShard shard, T evt, Action handler) where TShard : IShard where T : Shard.StateChange { handler(evt); } - + public static bool HandleCommand(this TShard shard, object message) where TShard : IShard { switch (message) @@ -471,7 +471,7 @@ private static void HandleShardRegionQuery(this TShard shard, Shard.ISha break; } } - + public static void BaseEntityTerminated(this TShard shard, IActorRef tref) where TShard : IShard { if (shard.IdByRef.TryGetValue(tref, out var id)) @@ -616,14 +616,14 @@ private static void Passivate(this TShard shard, IActorRef entity, objec shard.Log.Debug("Unknown entity {0}. Not sending stopMessage back to entity.", entity); } } - + public static void PassivateCompleted(this TShard shard, Shard.EntityStopped evt) where TShard: IShard { shard.Log.Debug("Entity stopped after passivation [{0}]", evt.EntityId); shard.State = new Shard.ShardState(shard.State.Entries.Remove(evt.EntityId)); shard.MessageBuffers = shard.MessageBuffers.Remove(evt.EntityId); } - + public static void SendMessageBuffer(this TShard shard, Shard.EntityStarted message) where TShard: IShard { var id = message.EntityId; @@ -677,7 +677,7 @@ private static void DeliverMessage(this TShard shard, object message, IA shard.DeliverTo(id, message, payload, sender); } } - + internal static void BaseDeliverTo(this TShard shard, string id, object message, object payload, IActorRef sender) where TShard : IShard { var name = Uri.EscapeDataString(id); @@ -688,7 +688,7 @@ internal static void BaseDeliverTo(this TShard shard, string id, object else child.Tell(payload, sender); } - + internal static IActorRef GetEntity(this TShard shard, string id) where TShard: IShard { var name = Uri.EscapeDataString(id); @@ -697,7 +697,7 @@ internal static IActorRef GetEntity(this TShard shard, string id) where { shard.Log.Debug("Starting entity [{0}] in shard [{1}]", id, shard.ShardId); - child = shard.Context.Watch(shard.Context.ActorOf(shard.EntityProps, name)); + child = shard.Context.Watch(shard.Context.ActorOf(shard.EntityProps(id), name)); shard.IdByRef = shard.IdByRef.SetItem(child, id); shard.RefById = shard.RefById.SetItem(id, child); shard.State = new Shard.ShardState(shard.State.Entries.Add(id)); @@ -706,12 +706,12 @@ internal static IActorRef GetEntity(this TShard shard, string id) where return child; } - internal static int TotalBufferSize(this TShard shard) where TShard : IShard => + internal static int TotalBufferSize(this TShard shard) where TShard : IShard => shard.MessageBuffers.Aggregate(0, (sum, entity) => sum + entity.Value.Count); #endregion - public static Props Props(string typeName, ShardId shardId, Props entityProps, ClusterShardingSettings settings, ExtractEntityId extractEntityId, ExtractShardId extractShardId, object handOffStopMessage, IActorRef replicator, int majorityMinCap) + public static Props Props(string typeName, ShardId shardId, Func entityProps, ClusterShardingSettings settings, ExtractEntityId extractEntityId, ExtractShardId extractShardId, object handOffStopMessage, IActorRef replicator, int majorityMinCap) { switch (settings.StateStoreMode) { @@ -724,7 +724,7 @@ public static Props Props(string typeName, ShardId shardId, Props entityProps, C } } } - + class RememberEntityStarter : ActorBase { private class Tick : INoSerializationVerificationNeeded diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs b/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs index 5ed456eb95a..f3c4b3d6069 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs @@ -129,7 +129,7 @@ public sealed class StartEntityAck : IClusterShardingSerializable public readonly ShardId ShardId; /// - /// Creates a new instance of a class, used to confirm that + /// Creates a new instance of a class, used to confirm that /// request has succeed. /// /// An identifier of a newly started entity. @@ -243,7 +243,7 @@ public int Compare(Member x, Member y) /// /// /// TBD - internal static Props Props(string typeName, Props entityProps, ClusterShardingSettings settings, string coordinatorPath, ExtractEntityId extractEntityId, ExtractShardId extractShardId, object handOffStopMessage, IActorRef replicator, int majorityMinCap) + internal static Props Props(string typeName, Func entityProps, ClusterShardingSettings settings, string coordinatorPath, ExtractEntityId extractEntityId, ExtractShardId extractShardId, object handOffStopMessage, IActorRef replicator, int majorityMinCap) { return Actor.Props.Create(() => new ShardRegion(typeName, entityProps, settings, coordinatorPath, extractEntityId, extractShardId, handOffStopMessage, replicator, majorityMinCap)).WithDeploy(Deploy.Local); } @@ -271,7 +271,7 @@ internal static Props ProxyProps(string typeName, ClusterShardingSettings settin /// /// TBD /// - public readonly Props EntityProps; + public readonly Func EntityProps; /// /// TBD /// @@ -358,7 +358,7 @@ internal static Props ProxyProps(string typeName, ClusterShardingSettings settin /// TBD /// /// - public ShardRegion(string typeName, Props entityProps, ClusterShardingSettings settings, string coordinatorPath, ExtractEntityId extractEntityId, ExtractShardId extractShardId, object handOffStopMessage, IActorRef replicator, int majorityMinCap) + public ShardRegion(string typeName, Func entityProps, ClusterShardingSettings settings, string coordinatorPath, ExtractEntityId extractEntityId, ExtractShardId extractShardId, object handOffStopMessage, IActorRef replicator, int majorityMinCap) { TypeName = typeName; EntityProps = entityProps; @@ -417,7 +417,7 @@ protected object RegistrationMessage { get { - if (EntityProps != null && !EntityProps.Equals(Actor.Props.None)) + if (EntityProps != null) return new PersistentShardCoordinator.Register(Self); return new PersistentShardCoordinator.RegisterProxy(Self); } @@ -518,10 +518,21 @@ private void Register() { var coordinator = CoordinatorSelection; coordinator?.Tell(RegistrationMessage); - if (ShardBuffers.Count != 0 && _retryCount >= RetryCountThreshold) - Log.Warning("Trying to register to coordinator at [{0}], but no acknowledgement. Total [{1}] buffered messages.", - coordinator != null ? coordinator.PathString : string.Empty, TotalBufferSize); + { + if (coordinator != null) + { + var coordinatorMessage = Cluster.State.Unreachable.Contains(MembersByAge.First()) ? $"Coordinator [{MembersByAge.First()}] is unreachable." : $"Coordinator [{MembersByAge.First()}] is reachable."; + + Log.Warning("Trying to register to coordinator at [{0}], but no acknowledgement. Total [{1}] buffered messages. [{2}]", + coordinator != null ? coordinator.PathString : string.Empty, TotalBufferSize, coordinatorMessage); + } + else + { + Log.Warning("No coordinator found to register. Probably, no seed-nodes configured and manual cluster join not performed? Total [{0}] buffered messages.", + TotalBufferSize); + } + } } private void DeliverStartEntity(object message, IActorRef sender) @@ -902,7 +913,7 @@ private IActorRef GetShard(ShardId id) //TODO: change on ConcurrentDictionary.GetOrAdd? if (!Shards.TryGetValue(id, out var region)) { - if (EntityProps == null || EntityProps.Equals(Actor.Props.Empty)) + if (EntityProps == null) throw new IllegalStateException("Shard must not be allocated to a proxy only ShardRegion"); if (ShardsByRef.Values.All(shardId => shardId != id))