diff --git a/docs/articles/clustering/cluster-sharding.md b/docs/articles/clustering/cluster-sharding.md index 36a58dc8f92..759e6cbe43f 100644 --- a/docs/articles/clustering/cluster-sharding.md +++ b/docs/articles/clustering/cluster-sharding.md @@ -165,13 +165,6 @@ Possible reasons for disabling remember entity storage are: For supporting remembered entities in an environment without disk storage but with access to a database, use persistence mode instead. -> [!NOTE] -> Currently, Lightning.NET library, the storage solution used to store DData in disk, is having problem -> deploying native library files in [Linux operating system operating in x64 and ARM platforms] -> (). -> -> You will need to install LightningDB in your Linux distribution manually if you wanted to use the durable DData feature. - ### Terminating Remembered Entities One complication that `akka.cluster.sharding.remember-entities = true` introduces is that your sharded entity actors can no longer be terminated through the normal Akka.NET channels, i.e. `Context.Stop(Self)`, `PoisonPill.Instance`, and the like. This is because as part of the `remember-entities` contract - the sharding system is going to insist on keeping all remembered entities alive until explicitly told to stop. @@ -217,6 +210,19 @@ You can inspect current sharding stats by using following messages: * On `GetShardRegionState` shard region will reply with `ShardRegionState` containing data about shards living in the current actor system and what entities are alive on each one of them. * On `GetClusterShardingStats` shard region will reply with `ClusterShardingStats` having information about shards living in the whole cluster and how many entities alive in each one of them. +### Querying for the Location of Specific Entities + +It's possible to query a `ShardRegion` or a `ShardRegionProxy` using a `GetEntityLocation` query: + +[!code-csharp[ShardedDaemonProcessSpec.cs](../../../src/contrib/cluster/Akka.Cluster.Sharding.Tests/ShardRegionQueriesSpecs.cs?name=GetEntityLocationQuery)] + +A `GetEntityLocation` query will always return an `EntityLocation` response - even if the query could not be executed. + +> [!IMPORTANT] +> One major caveat is that in order for the `GetEntityLocation` to execute your `IMessageExtractor` or `ShardExtractor` delegate will need to support the `ShardRegion.StartEntity` message - just like you'd have to use in order to support `remember-entities=on`: + +[!code-csharp[ShardedDaemonProcessSpec.cs](../../../src/contrib/cluster/Akka.Cluster.Sharding.Tests/ShardRegionQueriesSpecs.cs?name=GetEntityLocationExtractor)] + ## Integrating Cluster Sharding with Persistent Actors One of the most common scenarios, where cluster sharding is used, is to combine them with event-sourced persistent actors from [Akka.Persistence](xref:persistence-architecture) module. diff --git a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ShardRegionQueriesSpecs.cs b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ShardRegionQueriesSpecs.cs new file mode 100644 index 00000000000..89ad40adab0 --- /dev/null +++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ShardRegionQueriesSpecs.cs @@ -0,0 +1,183 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2022 Lightbend Inc. +// Copyright (C) 2013-2022 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; +using System.Threading.Tasks; +using Akka.Actor; +using Akka.Cluster.Tools.Singleton; +using Akka.Configuration; +using Akka.TestKit; +using Akka.TestKit.TestActors; +using Akka.Util; +using Xunit; +using Xunit.Abstractions; +using FluentAssertions; + +namespace Akka.Cluster.Sharding.Tests +{ + public class ShardRegionQueriesSpecs : AkkaSpec + { + private readonly Cluster _cluster; + private readonly ClusterSharding _clusterSharding; + private readonly IActorRef _shardRegion; + + private readonly ActorSystem _proxySys; + + public ShardRegionQueriesSpecs(ITestOutputHelper outputHelper) : base(GetConfig(), outputHelper) + { + _clusterSharding = ClusterSharding.Get(Sys); + _cluster = Cluster.Get(Sys); + _shardRegion = _clusterSharding.Start("entity", s => EchoActor.Props(this, true), + ClusterShardingSettings.Create(Sys).WithRole("shard"), ExtractEntityId, ExtractShardId); + + var proxySysConfig = ConfigurationFactory.ParseString("akka.cluster.roles = [proxy]") + .WithFallback(Sys.Settings.Config); + _proxySys = ActorSystem.Create(Sys.Name, proxySysConfig); + + _cluster.Join(_cluster.SelfAddress); + AwaitAssert(() => { _cluster.SelfMember.Status.ShouldBe(MemberStatus.Up); }); + + // form a 2-node cluster + var proxyCluster = Cluster.Get(_proxySys); + proxyCluster.Join(_cluster.SelfAddress); + AwaitAssert(() => { proxyCluster.SelfMember.Status.ShouldBe(MemberStatus.Up); }); + } + + protected override async Task AfterAllAsync() + { + await ShutdownAsync(_proxySys); + await base.AfterAllAsync(); + } + + private Option<(string, object)> ExtractEntityId(object message) + { + switch (message) + { + case int i: + return (i.ToString(), message); + } + + throw new NotSupportedException(); + } + + // + private string ExtractShardId(object message) + { + switch (message) + { + case int i: + return (i % 10).ToString(); + // must support ShardRegion.StartEntity in order for + // GetEntityLocation to work properly + case ShardRegion.StartEntity se: + return se.EntityId; + } + + throw new NotSupportedException(); + } + // + + private static Config GetConfig() + { + return ConfigurationFactory.ParseString(@" + akka.loglevel = WARNING + akka.actor.provider = cluster + akka.remote.dot-netty.tcp.port = 0 + akka.cluster.roles = [shard]") + .WithFallback(Sharding.ClusterSharding.DefaultConfig()) + .WithFallback(DistributedData.DistributedData.DefaultConfig()) + .WithFallback(ClusterSingletonManager.DefaultConfig()); + } + + /// + /// DocFx material for demonstrating how this query type works + /// + [Fact] + public async Task ShardRegion_GetEntityLocation_DocumentationSpec() + { + // + // creates an entity with entityId="1" + await _shardRegion.Ask(1, TimeSpan.FromSeconds(3)); + + // determine where entity with "entityId=1" is located in cluster + var q1 = await _shardRegion.Ask(new GetEntityLocation("1", TimeSpan.FromSeconds(1))); + + q1.EntityId.Should().Be("1"); + + // have a valid ShardId + q1.ShardId.Should().NotBeEmpty(); + + // have valid address for node that will / would host entity + q1.ShardRegion.Should().NotBe(Address.AllSystems); // has real address + + // if entity actor is alive, will retrieve a reference to it + q1.EntityRef.HasValue.Should().BeTrue(); + // + } + + [Fact(DisplayName = "ShardRegion should support GetEntityLocation queries locally")] + public async Task ShardRegion_should_support_GetEntityLocation_query_locally() + { + // arrange + await _shardRegion.Ask(1, TimeSpan.FromSeconds(3)); + await _shardRegion.Ask(2, TimeSpan.FromSeconds(3)); + + // act + var q1 = await _shardRegion.Ask(new GetEntityLocation("1", TimeSpan.FromSeconds(1))); + var q2 = await _shardRegion.Ask(new GetEntityLocation("2", TimeSpan.FromSeconds(1))); + var q3 = await _shardRegion.Ask(new GetEntityLocation("3", TimeSpan.FromSeconds(1))); + + // assert + void AssertValidEntityLocation(EntityLocation e, string entityId) + { + e.EntityId.Should().Be(entityId); + e.EntityRef.Should().NotBe(Option.None); + e.ShardId.Should().NotBeNullOrEmpty(); + e.ShardRegion.Should().Be(_cluster.SelfAddress); + } + + AssertValidEntityLocation(q1, "1"); + AssertValidEntityLocation(q2, "2"); + + q3.EntityRef.Should().Be(Option.None); + q3.ShardId.Should().NotBeNullOrEmpty(); // should still have computed a valid shard? + q3.ShardRegion.Should().Be(Address.AllSystems); + } + + [Fact(DisplayName = "ShardRegion should support GetEntityLocation queries remotely")] + public async Task ShardRegion_should_support_GetEntityLocation_query_remotely() + { + // arrange + var sharding2 = ClusterSharding.Get(_proxySys); + var shardRegionProxy = await sharding2.StartProxyAsync("entity", "shard", ExtractEntityId, ExtractShardId); + + await shardRegionProxy.Ask(1, TimeSpan.FromSeconds(3)); + await shardRegionProxy.Ask(2, TimeSpan.FromSeconds(3)); + + // act + var q1 = await shardRegionProxy.Ask(new GetEntityLocation("1", TimeSpan.FromSeconds(1))); + var q2 = await shardRegionProxy.Ask(new GetEntityLocation("2", TimeSpan.FromSeconds(1))); + var q3 = await shardRegionProxy.Ask(new GetEntityLocation("3", TimeSpan.FromSeconds(1))); + + // assert + void AssertValidEntityLocation(EntityLocation e, string entityId) + { + e.EntityId.Should().Be(entityId); + e.EntityRef.Should().NotBe(Option.None); + e.ShardId.Should().NotBeNullOrEmpty(); + e.ShardRegion.Should().Be(_cluster.SelfAddress); + } + + AssertValidEntityLocation(q1, "1"); + AssertValidEntityLocation(q2, "2"); + + q3.EntityRef.Should().Be(Option.None); + q3.ShardId.Should().NotBeNullOrEmpty(); // should still have computed a valid shard? + q3.ShardRegion.Should().Be(Address.AllSystems); + } + } +} \ No newline at end of file diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs b/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs index 35b30a671aa..6b2bdea2d08 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs @@ -16,6 +16,7 @@ using Akka.Pattern; using Akka.Util; using Akka.Util.Internal; +using Get = Akka.DistributedData.Get; namespace Akka.Cluster.Sharding { @@ -925,12 +926,92 @@ private void HandleShardRegionQuery(IShardRegionQuery query) case GetShardRegionStatus _: Sender.Tell(new ShardRegionStatus(_typeName, _coordinator != null)); break; + case GetEntityLocation g: + ReplyToGetEntityLocationQuery(g, Sender); + break; default: Unhandled(query); break; } } + private void ReplyToGetEntityLocationQuery(GetEntityLocation getEntityLocation, IActorRef sender) + { + // Get the Address of the remote IActorRef, or return our Cluster.SelfAddress is the shard / entity + // is hosted locally. + Address GetNodeAddress(IActorRef shardOrRegionRef) + { + return shardOrRegionRef.Path.Address.HasGlobalScope + ? shardOrRegionRef.Path.Address + : _cluster.SelfAddress; + } + + try + { + var shardId = _extractShardId(new StartEntity(getEntityLocation.EntityId)); + if (string.IsNullOrEmpty(shardId)) + { + // unsupported entityId - could only happen in highly customized extractors + sender.Tell(new EntityLocation(getEntityLocation.EntityId, shardId, Address.AllSystems, + Option.None)); + return; + } + + async Task ResolveEntityRef(Address destinationAddress, ActorPath entityPath) + { + // now we just need to check to see if an entity ref exists + try + { + var entityRef = await Context.ActorSelection(entityPath).ResolveOne(getEntityLocation.Timeout); + sender.Tell(new EntityLocation(getEntityLocation.EntityId, shardId, destinationAddress, + new Option(entityRef))); + } + catch (ActorNotFoundException ex) + { + // entity does not exist + sender.Tell(new EntityLocation(getEntityLocation.EntityId, shardId, destinationAddress, + Option.None)); + } + } + + if (!_shards.TryGetValue(shardId, out var shardActorRef)) + { + // shard is not homed yet, so try looking up the ShardRegion + if (!_regionByShard.TryGetValue(shardId, out var shardRegionRef)) + { + // shardRegion isn't allocated either + sender.Tell(new EntityLocation(getEntityLocation.EntityId, shardId, Address.AllSystems, + Option.None)); + } + else + { + // ShardRegion exists, but shard is not homed + // NOTE: in the event that we're querying a shard's location from a ShardRegionProxy + // the shard may not be technically "homed" inside the proxy, but it does exist. +#pragma warning disable CS4014 + ResolveEntityRef(GetNodeAddress(shardRegionRef), shardRegionRef.Path / shardId / shardId); // needs to run as a detached task +#pragma warning restore CS4014 + } + + return; + } + +#pragma warning disable CS4014 + ResolveEntityRef(GetNodeAddress(shardActorRef), shardActorRef.Path / shardId); // needs to run as a detached task +#pragma warning restore CS4014 + } + catch (Exception ex) + { + _log.Error(ex, "Error while trying to resolve GetEntityLocation query for entityId [{0}]. " + + "Does MessageExtractor support `ShardRegion.StartEntity`? " + + "If not, that's why you might be receiving this error.", + getEntityLocation.EntityId); + // unsupported entityId - could only happen in highly customized extractors + sender.Tell(new EntityLocation(getEntityLocation.EntityId, string.Empty, Address.AllSystems, + Option.None)); + } + } + private void ReplyToRegionStateQuery(IActorRef sender) { QueryShardsAsync(Shard.GetCurrentShardState.Instance) diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/ShardingMessages.cs b/src/contrib/cluster/Akka.Cluster.Sharding/ShardingMessages.cs index 8ba62438177..f88490b8dfd 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/ShardingMessages.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/ShardingMessages.cs @@ -10,15 +10,24 @@ using System.Linq; using Akka.Actor; using Akka.Event; +using Akka.Util; namespace Akka.Cluster.Sharding { using ShardId = String; /// - /// Marker interface for all commands. + /// Marker interface for commands that can be sent to a . /// public interface IShardRegionCommand { } + + /// + /// Marker interface for read-only queries that can be sent to a . + /// + /// + /// These have no side-effects on the state of the sharding system. + /// + public interface IShardRegionQuery { } /// /// If the state of the entities are persistent you may stop entities that are not used to @@ -130,11 +139,6 @@ public override int GetHashCode() #endregion } - /// - /// TBD - /// - public interface IShardRegionQuery { } - /// /// Send this message to the actor to request for , /// which contains the addresses of all registered regions. @@ -156,6 +160,77 @@ private GetCurrentRegions() public override string ToString() => $"GetCurrentRegions"; } + /// + /// Send this message to a actor to determine the location and liveness + /// of a specific entity actor in the region. + /// + /// Creates a message in response. + /// + /// + /// This is used primarily for testing and telemetry purposes. + /// + /// In order for this query to work, the must support , + /// which is also used when remember-entities=on. + /// + public sealed class GetEntityLocation : IShardRegionQuery + { + public GetEntityLocation(string entityId, TimeSpan timeout) + { + EntityId = entityId; + Timeout = timeout; + } + + /// + /// The id of the entity we're searching for. + /// + public string EntityId { get; } + + /// + /// Used to timeout the Ask{T} operation used to identify whether or not + /// this entity actor currently exists. + /// + public TimeSpan Timeout { get; } + } + + /// + /// Response to a query. + /// + /// + /// In the event that no ShardId can be extracted for the given , we will return + /// and for the shard and shard region respectively. + /// + public sealed class EntityLocation + { + public EntityLocation(string entityId, string shardId, Address shardRegion, Option entityRef) + { + EntityId = entityId; + ShardId = shardId; + ShardRegion = shardRegion ?? Address.AllSystems; + EntityRef = entityRef; + } + + /// + /// The Id of the entity. + /// + public string EntityId { get; } + + /// + /// The shard Id that would host this entity. + /// + public string ShardId { get; } + + /// + /// The in the cluster that would host + /// this particular entity. + /// + public Address ShardRegion { get; } + + /// + /// Optional - a reference to this entity actor, if it's alive. + /// + public Option EntityRef { get; } + } + /// /// Reply to . /// diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterSharding.Core.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterSharding.Core.verified.txt index 6da8c3ae198..0420132d415 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterSharding.Core.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterSharding.Core.verified.txt @@ -100,6 +100,14 @@ namespace Akka.Cluster.Sharding public override int GetHashCode() { } public override string ToString() { } } + public sealed class EntityLocation + { + public EntityLocation(string entityId, string shardId, Akka.Actor.Address shardRegion, Akka.Util.Option entityRef) { } + public string EntityId { get; } + public Akka.Util.Option EntityRef { get; } + public string ShardId { get; } + public Akka.Actor.Address ShardRegion { get; } + } public class static EnumerableExtensions { public static System.Collections.Generic.IEnumerable> Grouped(this System.Collections.Generic.IEnumerable items, int size) { } @@ -120,6 +128,12 @@ namespace Akka.Cluster.Sharding public static readonly Akka.Cluster.Sharding.GetCurrentRegions Instance; public override string ToString() { } } + public sealed class GetEntityLocation : Akka.Cluster.Sharding.IShardRegionQuery + { + public GetEntityLocation(string entityId, System.TimeSpan timeout) { } + public string EntityId { get; } + public System.TimeSpan Timeout { get; } + } public sealed class GetShardRegionState : Akka.Cluster.Sharding.IClusterShardingSerializable, Akka.Cluster.Sharding.IShardRegionQuery { public static readonly Akka.Cluster.Sharding.GetShardRegionState Instance; diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterSharding.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterSharding.DotNet.verified.txt index b0725a7ca73..db27986094d 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterSharding.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterSharding.DotNet.verified.txt @@ -100,6 +100,14 @@ namespace Akka.Cluster.Sharding public override int GetHashCode() { } public override string ToString() { } } + public sealed class EntityLocation + { + public EntityLocation(string entityId, string shardId, Akka.Actor.Address shardRegion, Akka.Util.Option entityRef) { } + public string EntityId { get; } + public Akka.Util.Option EntityRef { get; } + public string ShardId { get; } + public Akka.Actor.Address ShardRegion { get; } + } public class static EnumerableExtensions { public static System.Collections.Generic.IEnumerable> Grouped(this System.Collections.Generic.IEnumerable items, int size) { } @@ -120,6 +128,12 @@ namespace Akka.Cluster.Sharding public static readonly Akka.Cluster.Sharding.GetCurrentRegions Instance; public override string ToString() { } } + public sealed class GetEntityLocation : Akka.Cluster.Sharding.IShardRegionQuery + { + public GetEntityLocation(string entityId, System.TimeSpan timeout) { } + public string EntityId { get; } + public System.TimeSpan Timeout { get; } + } public sealed class GetShardRegionState : Akka.Cluster.Sharding.IClusterShardingSerializable, Akka.Cluster.Sharding.IShardRegionQuery { public static readonly Akka.Cluster.Sharding.GetShardRegionState Instance; diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterSharding.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterSharding.Net.verified.txt index 6da8c3ae198..0420132d415 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterSharding.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterSharding.Net.verified.txt @@ -100,6 +100,14 @@ namespace Akka.Cluster.Sharding public override int GetHashCode() { } public override string ToString() { } } + public sealed class EntityLocation + { + public EntityLocation(string entityId, string shardId, Akka.Actor.Address shardRegion, Akka.Util.Option entityRef) { } + public string EntityId { get; } + public Akka.Util.Option EntityRef { get; } + public string ShardId { get; } + public Akka.Actor.Address ShardRegion { get; } + } public class static EnumerableExtensions { public static System.Collections.Generic.IEnumerable> Grouped(this System.Collections.Generic.IEnumerable items, int size) { } @@ -120,6 +128,12 @@ namespace Akka.Cluster.Sharding public static readonly Akka.Cluster.Sharding.GetCurrentRegions Instance; public override string ToString() { } } + public sealed class GetEntityLocation : Akka.Cluster.Sharding.IShardRegionQuery + { + public GetEntityLocation(string entityId, System.TimeSpan timeout) { } + public string EntityId { get; } + public System.TimeSpan Timeout { get; } + } public sealed class GetShardRegionState : Akka.Cluster.Sharding.IClusterShardingSerializable, Akka.Cluster.Sharding.IShardRegionQuery { public static readonly Akka.Cluster.Sharding.GetShardRegionState Instance; diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterSharding.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterSharding.verified.txt index e1f4fc448b8..be20a8b1f40 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterSharding.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterSharding.verified.txt @@ -100,6 +100,14 @@ namespace Akka.Cluster.Sharding public override int GetHashCode() { } public override string ToString() { } } + public sealed class EntityLocation + { + public EntityLocation(string entityId, string shardId, Akka.Actor.Address shardRegion, Akka.Util.Option entityRef) { } + public string EntityId { get; } + public Akka.Util.Option EntityRef { get; } + public string ShardId { get; } + public Akka.Actor.Address ShardRegion { get; } + } public class static EnumerableExtensions { public static System.Collections.Generic.IEnumerable> Grouped(this System.Collections.Generic.IEnumerable items, int size) { } @@ -120,6 +128,12 @@ namespace Akka.Cluster.Sharding public static readonly Akka.Cluster.Sharding.GetCurrentRegions Instance; public override string ToString() { } } + public sealed class GetEntityLocation : Akka.Cluster.Sharding.IShardRegionQuery + { + public GetEntityLocation(string entityId, System.TimeSpan timeout) { } + public string EntityId { get; } + public System.TimeSpan Timeout { get; } + } public sealed class GetShardRegionState : Akka.Cluster.Sharding.IClusterShardingSerializable, Akka.Cluster.Sharding.IShardRegionQuery { public static readonly Akka.Cluster.Sharding.GetShardRegionState Instance;