Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PORT #6101] Akka.Cluster.Sharding GetEntityLocation Query #6107

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 13 additions & 7 deletions docs/articles/clustering/cluster-sharding.md
Original file line number Diff line number Diff line change
Expand Up @@ -165,13 +165,6 @@ Possible reasons for disabling remember entity storage are:

For supporting remembered entities in an environment without disk storage but with access to a database, use persistence mode instead.

> [!NOTE]
> Currently, Lightning.NET library, the storage solution used to store DData in disk, is having problem
> deploying native library files in [Linux operating system operating in x64 and ARM platforms]
> (<https://github.com/CoreyKaylor/Lightning.NET/issues/141>).
>
> You will need to install LightningDB in your Linux distribution manually if you wanted to use the durable DData feature.

### Terminating Remembered Entities

One complication that `akka.cluster.sharding.remember-entities = true` introduces is that your sharded entity actors can no longer be terminated through the normal Akka.NET channels, i.e. `Context.Stop(Self)`, `PoisonPill.Instance`, and the like. This is because as part of the `remember-entities` contract - the sharding system is going to insist on keeping all remembered entities alive until explicitly told to stop.
Expand Down Expand Up @@ -217,6 +210,19 @@ You can inspect current sharding stats by using following messages:
* On `GetShardRegionState` shard region will reply with `ShardRegionState` containing data about shards living in the current actor system and what entities are alive on each one of them.
* On `GetClusterShardingStats` shard region will reply with `ClusterShardingStats` having information about shards living in the whole cluster and how many entities alive in each one of them.

### Querying for the Location of Specific Entities

It's possible to query a `ShardRegion` or a `ShardRegionProxy` using a `GetEntityLocation` query:

[!code-csharp[ShardedDaemonProcessSpec.cs](../../../src/contrib/cluster/Akka.Cluster.Sharding.Tests/ShardRegionQueriesSpecs.cs?name=GetEntityLocationQuery)]

A `GetEntityLocation` query will always return an `EntityLocation` response - even if the query could not be executed.

> [!IMPORTANT]
> One major caveat is that in order for the `GetEntityLocation` to execute your `IMessageExtractor` or `ShardExtractor` delegate will need to support the `ShardRegion.StartEntity` message - just like you'd have to use in order to support `remember-entities=on`:

[!code-csharp[ShardedDaemonProcessSpec.cs](../../../src/contrib/cluster/Akka.Cluster.Sharding.Tests/ShardRegionQueriesSpecs.cs?name=GetEntityLocationExtractor)]

## Integrating Cluster Sharding with Persistent Actors

One of the most common scenarios, where cluster sharding is used, is to combine them with event-sourced persistent actors from [Akka.Persistence](xref:persistence-architecture) module.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
//-----------------------------------------------------------------------
// <copyright file="ShardRegionQueriesSpecs.cs" company="Akka.NET Project">
// Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Cluster.Tools.Singleton;
using Akka.Configuration;
using Akka.TestKit;
using Akka.TestKit.TestActors;
using Akka.Util;
using Xunit;
using Xunit.Abstractions;
using FluentAssertions;

namespace Akka.Cluster.Sharding.Tests
{
public class ShardRegionQueriesSpecs : AkkaSpec
{
private readonly Cluster _cluster;
private readonly ClusterSharding _clusterSharding;
private readonly IActorRef _shardRegion;

private readonly ActorSystem _proxySys;

public ShardRegionQueriesSpecs(ITestOutputHelper outputHelper) : base(GetConfig(), outputHelper)
{
_clusterSharding = ClusterSharding.Get(Sys);
_cluster = Cluster.Get(Sys);
_shardRegion = _clusterSharding.Start("entity", s => EchoActor.Props(this, true),
ClusterShardingSettings.Create(Sys).WithRole("shard"), ExtractEntityId, ExtractShardId);

var proxySysConfig = ConfigurationFactory.ParseString("akka.cluster.roles = [proxy]")
.WithFallback(Sys.Settings.Config);
_proxySys = ActorSystem.Create(Sys.Name, proxySysConfig);

_cluster.Join(_cluster.SelfAddress);
AwaitAssert(() => { _cluster.SelfMember.Status.ShouldBe(MemberStatus.Up); });

// form a 2-node cluster
var proxyCluster = Cluster.Get(_proxySys);
proxyCluster.Join(_cluster.SelfAddress);
AwaitAssert(() => { proxyCluster.SelfMember.Status.ShouldBe(MemberStatus.Up); });
}

protected override async Task AfterAllAsync()
{
await ShutdownAsync(_proxySys);
await base.AfterAllAsync();
}

private Option<(string, object)> ExtractEntityId(object message)
{
switch (message)
{
case int i:
return (i.ToString(), message);
}

throw new NotSupportedException();
}

// <GetEntityLocationExtractor>
private string ExtractShardId(object message)
{
switch (message)
{
case int i:
return (i % 10).ToString();
// must support ShardRegion.StartEntity in order for
// GetEntityLocation to work properly
case ShardRegion.StartEntity se:
return se.EntityId;
}

throw new NotSupportedException();
}
// </GetEntityLocationExtractor>

private static Config GetConfig()
{
return ConfigurationFactory.ParseString(@"
akka.loglevel = WARNING
akka.actor.provider = cluster
akka.remote.dot-netty.tcp.port = 0
akka.cluster.roles = [shard]")
.WithFallback(Sharding.ClusterSharding.DefaultConfig())
.WithFallback(DistributedData.DistributedData.DefaultConfig())
.WithFallback(ClusterSingletonManager.DefaultConfig());
}

/// <summary>
/// DocFx material for demonstrating how this query type works
/// </summary>
[Fact]
public async Task ShardRegion_GetEntityLocation_DocumentationSpec()
{
// <GetEntityLocationQuery>
// creates an entity with entityId="1"
await _shardRegion.Ask<int>(1, TimeSpan.FromSeconds(3));

// determine where entity with "entityId=1" is located in cluster
var q1 = await _shardRegion.Ask<EntityLocation>(new GetEntityLocation("1", TimeSpan.FromSeconds(1)));

q1.EntityId.Should().Be("1");

// have a valid ShardId
q1.ShardId.Should().NotBeEmpty();

// have valid address for node that will / would host entity
q1.ShardRegion.Should().NotBe(Address.AllSystems); // has real address

// if entity actor is alive, will retrieve a reference to it
q1.EntityRef.HasValue.Should().BeTrue();
// </GetEntityLocationQuery>
}

[Fact(DisplayName = "ShardRegion should support GetEntityLocation queries locally")]
public async Task ShardRegion_should_support_GetEntityLocation_query_locally()
{
// arrange
await _shardRegion.Ask<int>(1, TimeSpan.FromSeconds(3));
await _shardRegion.Ask<int>(2, TimeSpan.FromSeconds(3));

// act
var q1 = await _shardRegion.Ask<EntityLocation>(new GetEntityLocation("1", TimeSpan.FromSeconds(1)));
var q2 = await _shardRegion.Ask<EntityLocation>(new GetEntityLocation("2", TimeSpan.FromSeconds(1)));
var q3 = await _shardRegion.Ask<EntityLocation>(new GetEntityLocation("3", TimeSpan.FromSeconds(1)));

// assert
void AssertValidEntityLocation(EntityLocation e, string entityId)
{
e.EntityId.Should().Be(entityId);
e.EntityRef.Should().NotBe(Option<IActorRef>.None);
e.ShardId.Should().NotBeNullOrEmpty();
e.ShardRegion.Should().Be(_cluster.SelfAddress);
}

AssertValidEntityLocation(q1, "1");
AssertValidEntityLocation(q2, "2");

q3.EntityRef.Should().Be(Option<IActorRef>.None);
q3.ShardId.Should().NotBeNullOrEmpty(); // should still have computed a valid shard?
q3.ShardRegion.Should().Be(Address.AllSystems);
}

[Fact(DisplayName = "ShardRegion should support GetEntityLocation queries remotely")]
public async Task ShardRegion_should_support_GetEntityLocation_query_remotely()
{
// arrange
var sharding2 = ClusterSharding.Get(_proxySys);
var shardRegionProxy = await sharding2.StartProxyAsync("entity", "shard", ExtractEntityId, ExtractShardId);

await shardRegionProxy.Ask<int>(1, TimeSpan.FromSeconds(3));
await shardRegionProxy.Ask<int>(2, TimeSpan.FromSeconds(3));

// act
var q1 = await shardRegionProxy.Ask<EntityLocation>(new GetEntityLocation("1", TimeSpan.FromSeconds(1)));
var q2 = await shardRegionProxy.Ask<EntityLocation>(new GetEntityLocation("2", TimeSpan.FromSeconds(1)));
var q3 = await shardRegionProxy.Ask<EntityLocation>(new GetEntityLocation("3", TimeSpan.FromSeconds(1)));

// assert
void AssertValidEntityLocation(EntityLocation e, string entityId)
{
e.EntityId.Should().Be(entityId);
e.EntityRef.Should().NotBe(Option<IActorRef>.None);
e.ShardId.Should().NotBeNullOrEmpty();
e.ShardRegion.Should().Be(_cluster.SelfAddress);
}

AssertValidEntityLocation(q1, "1");
AssertValidEntityLocation(q2, "2");

q3.EntityRef.Should().Be(Option<IActorRef>.None);
q3.ShardId.Should().NotBeNullOrEmpty(); // should still have computed a valid shard?
q3.ShardRegion.Should().Be(Address.AllSystems);
}
}
}
81 changes: 81 additions & 0 deletions src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
using Akka.Pattern;
using Akka.Util;
using Akka.Util.Internal;
using Get = Akka.DistributedData.Get;

namespace Akka.Cluster.Sharding
{
Expand Down Expand Up @@ -925,12 +926,92 @@ private void HandleShardRegionQuery(IShardRegionQuery query)
case GetShardRegionStatus _:
Sender.Tell(new ShardRegionStatus(_typeName, _coordinator != null));
break;
case GetEntityLocation g:
ReplyToGetEntityLocationQuery(g, Sender);
break;
default:
Unhandled(query);
break;
}
}

private void ReplyToGetEntityLocationQuery(GetEntityLocation getEntityLocation, IActorRef sender)
{
// Get the Address of the remote IActorRef, or return our Cluster.SelfAddress is the shard / entity
// is hosted locally.
Address GetNodeAddress(IActorRef shardOrRegionRef)
{
return shardOrRegionRef.Path.Address.HasGlobalScope
? shardOrRegionRef.Path.Address
: _cluster.SelfAddress;
}

try
{
var shardId = _extractShardId(new StartEntity(getEntityLocation.EntityId));
if (string.IsNullOrEmpty(shardId))
{
// unsupported entityId - could only happen in highly customized extractors
sender.Tell(new EntityLocation(getEntityLocation.EntityId, shardId, Address.AllSystems,
Option<IActorRef>.None));
return;
}

async Task ResolveEntityRef(Address destinationAddress, ActorPath entityPath)
{
// now we just need to check to see if an entity ref exists
try
{
var entityRef = await Context.ActorSelection(entityPath).ResolveOne(getEntityLocation.Timeout);
sender.Tell(new EntityLocation(getEntityLocation.EntityId, shardId, destinationAddress,
new Option<IActorRef>(entityRef)));
}
catch (ActorNotFoundException ex)
{
// entity does not exist
sender.Tell(new EntityLocation(getEntityLocation.EntityId, shardId, destinationAddress,
Option<IActorRef>.None));
}
}

if (!_shards.TryGetValue(shardId, out var shardActorRef))
{
// shard is not homed yet, so try looking up the ShardRegion
if (!_regionByShard.TryGetValue(shardId, out var shardRegionRef))
{
// shardRegion isn't allocated either
sender.Tell(new EntityLocation(getEntityLocation.EntityId, shardId, Address.AllSystems,
Option<IActorRef>.None));
}
else
{
// ShardRegion exists, but shard is not homed
// NOTE: in the event that we're querying a shard's location from a ShardRegionProxy
// the shard may not be technically "homed" inside the proxy, but it does exist.
#pragma warning disable CS4014
ResolveEntityRef(GetNodeAddress(shardRegionRef), shardRegionRef.Path / shardId / shardId); // needs to run as a detached task
#pragma warning restore CS4014
}

return;
}

#pragma warning disable CS4014
ResolveEntityRef(GetNodeAddress(shardActorRef), shardActorRef.Path / shardId); // needs to run as a detached task
#pragma warning restore CS4014
}
catch (Exception ex)
{
_log.Error(ex, "Error while trying to resolve GetEntityLocation query for entityId [{0}]. " +
"Does MessageExtractor support `ShardRegion.StartEntity`? " +
"If not, that's why you might be receiving this error.",
getEntityLocation.EntityId);
// unsupported entityId - could only happen in highly customized extractors
sender.Tell(new EntityLocation(getEntityLocation.EntityId, string.Empty, Address.AllSystems,
Option<IActorRef>.None));
}
}

private void ReplyToRegionStateQuery(IActorRef sender)
{
QueryShardsAsync<Shard.CurrentShardState>(Shard.GetCurrentShardState.Instance)
Expand Down
Loading