Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix possible problems with ClusterClient Discovery #7270

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
using Akka.Actor;
using Akka.Discovery;
using Akka.Event;
using Akka.Util;

#nullable enable
namespace Akka.Cluster.Tools.Client;
Expand Down Expand Up @@ -49,6 +50,7 @@ private sealed record ResolveResult(Contact Contact, IActorRef? Subject);
private readonly string _targetActorSystemName;
private readonly string _receptionistName;
private readonly string _transportProtocol;
private readonly int _numberOfContacts;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just the number of initial contacts we need, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it limits the number of contacts we use when we instantiate the ClusterClient actor. Too many of these will set the initial contacts to a higher than optimum number. The thing is that ClusterClient initial contacts never gets trimmed when they can't be contacted, if we use the whole discovery resolve result as the ClusterClient initial contact list, there will be a big burst of network connection attempt when the ClusterClient actor starts up and whenever its internal contact list could not be contacted anymore.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it, makes sense


private readonly bool _verboseLogging;

Expand Down Expand Up @@ -109,6 +111,8 @@ public ClusterClientDiscovery(ClusterClientSettings settings)
_discoveryRetryInterval = _settings.DiscoverySettings.DiscoveryRetryInterval;
_discoveryTimeout = _discoverySettings.DiscoveryTimeout;

_numberOfContacts = _discoverySettings.NumberOfContacts;

_verboseLogging = _settings.VerboseLogging;

Become(Discovering);
Expand All @@ -134,7 +138,7 @@ private ActorPath ResolvedTargetToReceptionistActorPath(ServiceDiscovery.Resolve
{
var networkAddress = string.IsNullOrWhiteSpace(target.Host) ? target.Address.ToString() : target.Host;
var address = new Address(_transportProtocol, _targetActorSystemName, networkAddress, target.Port);
return new RootActorPath(address) / "system" / _discoverySettings.ReceptionistName;
return new RootActorPath(address) / "system" / _receptionistName;
}

private static async Task<ResolveResult> ResolveContact(Contact contact, TimeSpan timeout, CancellationToken ct)
Expand Down Expand Up @@ -207,10 +211,13 @@ async Task<ResolveResult[]> VerifyContacts()
}
else
{
var filteredContacts = TrimContacts(contacts, _numberOfContacts);
if(_log.IsInfoEnabled)
_log.Info("Cluster client initial contacts are verified at [{0}], starting cluster client actor.", string.Join(", ", contacts.Select(c => c.Path)));
_log.Info(
"Cluster client initial contacts are verified at [{0}], starting cluster client actor.",
string.Join(", ", filteredContacts.Select(c => c.Path)));

Become(Active(contacts));
Become(Active(filteredContacts));
}

return true;
Expand All @@ -229,6 +236,40 @@ async Task<ResolveResult[]> VerifyContacts()
}
}

/// <summary>
/// Trim the number of Contact in the `fullContact` array to `count` length
/// by picking random elements while avoiding repeating elements from being returned
/// </summary>
/// <param name="fullContact">Array of Contacts</param>
/// <param name="count">The number of elements to return</param>
/// <returns></returns>
private static Contact[] TrimContacts(Contact[] fullContact, int count)
{
if (fullContact.Length <= count)
return fullContact;

Shuffle(fullContact);
return fullContact.Take(count).ToArray();
}

/// <summary>
/// Fisher-Yates in-place array shuffle algorithm
/// </summary>
/// <param name="array"></param>
/// <typeparam name="T"></typeparam>
private static void Shuffle<T>(T[] array)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have another implementation of this lying around somewhere? I could have sworn I'd written one before

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no idea

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is one in the Akka.Util.Internal.ArrayExtensions, but its internal

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would add a link to that file and just use that (or you can add a friend assembly to Akka.Discovery; link is probably cleaner.)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's just a nitpick though

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, turns out the core Akka project is already a friend of Akka.Cluster.Tools

{
var rnd = ThreadLocalRandom.Current;
var n = array.Length;
for (var i = 0; i < (n - 1); i++)
{
var r = i + rnd.Next(n - i);
var t = array[r];
array[r] = array[i];
array[i] = t;
}
}

private Receive Active(Contact[] contacts)
{
if(_verboseLogging && _log.IsDebugEnabled)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ public sealed record ClusterClientDiscoverySettings(
string ReceptionistName,
string? PortName,
TimeSpan DiscoveryRetryInterval,
TimeSpan DiscoveryTimeout)
TimeSpan DiscoveryTimeout,
int NumberOfContacts)
{
public static readonly ClusterClientDiscoverySettings Empty = new ("<method>", null, null, "receptionist", null, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(60));
public static readonly ClusterClientDiscoverySettings Empty = new ("<method>", null, null, "receptionist", null, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(60), 3);

public static ClusterClientDiscoverySettings Create(Config clusterClientConfig)
{
Expand All @@ -35,7 +36,8 @@ public static ClusterClientDiscoverySettings Create(Config clusterClientConfig)
config.GetString("receptionist-name", "receptionist"),
config.GetString("port-name"),
config.GetTimeSpan("discovery-retry-interval", TimeSpan.FromSeconds(1)),
config.GetTimeSpan("discovery-timeout", TimeSpan.FromSeconds(60))
config.GetTimeSpan("discovery-timeout", TimeSpan.FromSeconds(60)),
config.GetInt("number-of-contacts", 3)
);
}
}
2 changes: 2 additions & 0 deletions src/contrib/cluster/Akka.Cluster.Tools/Client/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ akka.cluster.client {
method = <method>
actor-system-name = null
receptionist-name = receptionist
# The contact points will be trimmed down to this number of contact points to the client
number-of-contacts = 3
service-name = null
port-name = null
discovery-retry-interval = 1s
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,12 @@ namespace Akka.Cluster.Tools.Client
{
[System.Runtime.CompilerServices.NullableAttribute(1)]
public static readonly Akka.Cluster.Tools.Client.ClusterClientDiscoverySettings Empty;
public ClusterClientDiscoverySettings(string DiscoveryMethod, string ActorSystemName, string ServiceName, [System.Runtime.CompilerServices.NullableAttribute(1)] string ReceptionistName, string PortName, System.TimeSpan DiscoveryRetryInterval, System.TimeSpan DiscoveryTimeout) { }
public ClusterClientDiscoverySettings(string DiscoveryMethod, string ActorSystemName, string ServiceName, [System.Runtime.CompilerServices.NullableAttribute(1)] string ReceptionistName, string PortName, System.TimeSpan DiscoveryRetryInterval, System.TimeSpan DiscoveryTimeout, int NumberOfContacts) { }
public string ActorSystemName { get; set; }
public string DiscoveryMethod { get; set; }
public System.TimeSpan DiscoveryRetryInterval { get; set; }
public System.TimeSpan DiscoveryTimeout { get; set; }
public int NumberOfContacts { get; set; }
public string PortName { get; set; }
[System.Runtime.CompilerServices.NullableAttribute(1)]
public string ReceptionistName { get; set; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,12 @@ namespace Akka.Cluster.Tools.Client
{
[System.Runtime.CompilerServices.NullableAttribute(1)]
public static readonly Akka.Cluster.Tools.Client.ClusterClientDiscoverySettings Empty;
public ClusterClientDiscoverySettings(string DiscoveryMethod, string ActorSystemName, string ServiceName, [System.Runtime.CompilerServices.NullableAttribute(1)] string ReceptionistName, string PortName, System.TimeSpan DiscoveryRetryInterval, System.TimeSpan DiscoveryTimeout) { }
public ClusterClientDiscoverySettings(string DiscoveryMethod, string ActorSystemName, string ServiceName, [System.Runtime.CompilerServices.NullableAttribute(1)] string ReceptionistName, string PortName, System.TimeSpan DiscoveryRetryInterval, System.TimeSpan DiscoveryTimeout, int NumberOfContacts) { }
public string ActorSystemName { get; set; }
public string DiscoveryMethod { get; set; }
public System.TimeSpan DiscoveryRetryInterval { get; set; }
public System.TimeSpan DiscoveryTimeout { get; set; }
public int NumberOfContacts { get; set; }
public string PortName { get; set; }
[System.Runtime.CompilerServices.NullableAttribute(1)]
public string ReceptionistName { get; set; }
Expand Down
Loading