Skip to content

Commit

Permalink
[7.x] Avoid resending unchanged ClusterManifest to clients (#8772)
Browse files Browse the repository at this point in the history
* Avoid sending cluster manifest to client if client already has the latest version (#8728)

Co-authored-by: Reuben Bond <reuben.bond@gmail.com>
Co-authored-by: ReubenBond <rebond@microsoft.com>
Co-authored-by: HermesNew <13478148@qq.com>

* No newer cluster manifest handle

There was no newer cluster manifest, so wait for the next refresh interval and try again.

* PR feedback

---------

Co-authored-by: swam <453873@qq.com>
Co-authored-by: HermesNew <13478148@qq.com>
  • Loading branch information
3 people authored Dec 16, 2023
1 parent 4ab056f commit 57bafe7
Show file tree
Hide file tree
Showing 16 changed files with 242 additions and 92 deletions.
12 changes: 4 additions & 8 deletions src/Orleans.Core.Abstractions/Manifest/ClusterManifest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,13 @@ public sealed class ClusterManifest
/// <param name="silos">
/// The silo manifests.
/// </param>
/// <param name="allGrainManifests">
/// All grain manifests.
/// </param>
public ClusterManifest(
MajorMinorVersion version,
ImmutableDictionary<SiloAddress, GrainManifest> silos,
ImmutableArray<GrainManifest> allGrainManifests)
ImmutableDictionary<SiloAddress, GrainManifest> silos)
{
this.Version = version;
this.Silos = silos;
this.AllGrainManifests = allGrainManifests;
Version = version;
Silos = silos;
AllGrainManifests = silos.Values.ToImmutableArray();
}

/// <summary>
Expand Down
1 change: 1 addition & 0 deletions src/Orleans.Core/Core/DefaultClientServices.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public static void AddDefaultServices(IServiceCollection services)
services.TryAddSingleton<IAppEnvironmentStatistics, AppEnvironmentStatistics>();
services.AddLogging();
services.TryAddSingleton<GrainBindingsResolver>();
services.TryAddSingleton<LocalClientDetails>();
services.TryAddSingleton<OutsideRuntimeClient>();
services.TryAddSingleton<ClientGrainContext>();
services.AddFromExisting<IGrainContextAccessor, ClientGrainContext>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public Entry(MethodInfo implementationMethod, MethodInfo interfaceMethod)
}

/// <summary>
/// Gets the grain implmentation <see cref="MethodInfo"/>.
/// Gets the grain implementation <see cref="MethodInfo"/>.
/// </summary>
public MethodInfo ImplementationMethod { get; }

Expand Down
98 changes: 90 additions & 8 deletions src/Orleans.Core/Manifest/ClientClusterManifestProvider.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#nullable enable
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
Expand All @@ -23,14 +25,16 @@ internal class ClientClusterManifestProvider : IClusterManifestProvider, IAsyncD
private readonly ILogger<ClientClusterManifestProvider> _logger;
private readonly TypeManagementOptions _typeManagementOptions;
private readonly IServiceProvider _services;
private readonly LocalClientDetails _localClientDetails;
private readonly GatewayManager _gatewayManager;
private readonly AsyncEnumerable<ClusterManifest> _updates;
private readonly CancellationTokenSource _cancellation = new CancellationTokenSource();
private ClusterManifest _current;
private Task _runTask;
private Task? _runTask;

public ClientClusterManifestProvider(
IServiceProvider services,
LocalClientDetails localClientDetails,
GatewayManager gatewayManager,
ILogger<ClientClusterManifestProvider> logger,
ClientManifestProvider clientManifestProvider,
Expand All @@ -39,12 +43,18 @@ public ClientClusterManifestProvider(
_logger = logger;
_typeManagementOptions = typeManagementOptions.Value;
_services = services;
_localClientDetails = localClientDetails;
_gatewayManager = gatewayManager;
this.LocalGrainManifest = clientManifestProvider.ClientManifest;
_current = new ClusterManifest(MajorMinorVersion.MinValue, ImmutableDictionary<SiloAddress, GrainManifest>.Empty, ImmutableArray.Create(this.LocalGrainManifest));
LocalGrainManifest = clientManifestProvider.ClientManifest;

// Create a fake manifest for the very first generation, which only includes the local client's manifest.
var builder = ImmutableDictionary.CreateBuilder<SiloAddress, GrainManifest>();
builder.Add(_localClientDetails.ClientAddress, LocalGrainManifest);
_current = new ClusterManifest(MajorMinorVersion.MinValue, builder.ToImmutable());

_updates = new AsyncEnumerable<ClusterManifest>(
initialValue: _current,
updateValidator: (previous, proposed) => previous is null || proposed.Version > previous.Version,
updateValidator: (previous, proposed) => proposed.Version > previous.Version,
onPublished: update => Interlocked.Exchange(ref _current, update));
}

Expand Down Expand Up @@ -73,21 +83,71 @@ private async Task RunAsync()
{
var grainFactory = _services.GetRequiredService<IInternalGrainFactory>();
var cancellationTask = _cancellation.Token.WhenCancelled();
SiloAddress? gateway = null;
IClusterManifestSystemTarget? provider = null;
var minorVersion = 0;
var gatewayVersion = MajorMinorVersion.MinValue;
while (!_cancellation.IsCancellationRequested)
{
var gateway = _gatewayManager.GetLiveGateway();
// Select a new gateway if the current one is not available.
// This could be caused by a temporary issue or a permanent gateway failure.
if (gateway is null || !_gatewayManager.IsGatewayAvailable(gateway))
{
gateway = _gatewayManager.GetLiveGateway();
provider = grainFactory.GetGrain<IClusterManifestSystemTarget>(SystemTargetGrainId.Create(Constants.ManifestProviderType, gateway).GrainId);

// Accept any cluster manifest version from the new gateway.
// Since the minor version of the manifest is specific to each gateway, we reset it to the lowest possible value.
// This means that it is possible to receive the an older or equivalent cluster manifest when the gateway changes.
// That hiccup is addressed by resetting the expected manifest version and merging incomplete manifests until a complete
// manifest is received.
gatewayVersion = MajorMinorVersion.MinValue;
}

Debug.Assert(provider is not null);

try
{
var provider = grainFactory.GetGrain<IClusterManifestSystemTarget>(SystemTargetGrainId.Create(Constants.ManifestProviderType, gateway).GrainId);
var refreshTask = provider.GetClusterManifest().AsTask();
var refreshTask = GetClusterManifestUpdate(provider, gatewayVersion);
var task = await Task.WhenAny(cancellationTask, refreshTask).ConfigureAwait(false);

if (ReferenceEquals(task, cancellationTask))
{
return;
}

if (!_updates.TryPublish(await refreshTask))
var updateResult = await refreshTask;
if (updateResult is null)
{
// There was no newer cluster manifest, so wait for the next refresh interval and try again.
await Task.WhenAny(cancellationTask, Task.Delay(_typeManagementOptions.TypeMapRefreshInterval));
continue;
}

gatewayVersion = updateResult.Version;

// If the manifest does not contain all active servers, merge with the existing manifest until it does.
// This prevents reversed progress at the expense of including potentially defunct silos.
ImmutableDictionary<SiloAddress, GrainManifest> siloManifests;
if (!updateResult.IncludesAllActiveServers)
{
// Merge manifests until the manifest contains all active servers.
var mergedSilos = _current.Silos.ToBuilder();
mergedSilos.Add(_localClientDetails.ClientAddress, LocalGrainManifest);
foreach (var kvp in updateResult.SiloManifests)
{
mergedSilos[kvp.Key] = kvp.Value;
}

siloManifests = mergedSilos.ToImmutable();
}
else
{
siloManifests = updateResult.SiloManifests.Add(_localClientDetails.ClientAddress, LocalGrainManifest);
}

var updatedManifest = new ClusterManifest(new MajorMinorVersion(gatewayVersion.Major, ++minorVersion), siloManifests);
if (!_updates.TryPublish(updatedManifest))
{
await Task.Delay(StandardExtensions.Min(_typeManagementOptions.TypeMapRefreshInterval, TimeSpan.FromMilliseconds(500)));
continue;
Expand All @@ -106,6 +166,9 @@ private async Task RunAsync()
{
_logger.LogWarning(exception, "Error trying to get cluster manifest from gateway {Gateway}", gateway);
await Task.Delay(StandardExtensions.Min(_typeManagementOptions.TypeMapRefreshInterval, TimeSpan.FromSeconds(5)));

// Reset the gateway so that another will be selected on the next iteration.
gateway = null;
}
}
}
Expand All @@ -120,6 +183,25 @@ private async Task RunAsync()
}
}

private async Task<ClusterManifestUpdate?> GetClusterManifestUpdate(IClusterManifestSystemTarget provider, MajorMinorVersion previousVersion)
{
try
{
// First, attempt to call the new API, which provides more information.
// This returns null if there is no newer cluster manifest.
return await provider.GetClusterManifestUpdate(previousVersion);
}
catch (Exception exception)
{
_logger.LogWarning(exception, "Failed to fetch cluster manifest update from {Provider}.", provider);

// If the provider does not support the new API, fall back to the old one.
var manifest = await provider.GetClusterManifest();
var result = new ClusterManifestUpdate(manifest.Version, manifest.Silos, includesAllActiveServers: true);
return result;
}
}

/// <inheritdoc />
public ValueTask DisposeAsync()
{
Expand Down
34 changes: 1 addition & 33 deletions src/Orleans.Core/Manifest/ClientManifestProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,12 @@ namespace Orleans.Runtime
internal class ClientManifestProvider
{
public ClientManifestProvider(
IEnumerable<IGrainPropertiesProvider> grainPropertiesProviders,
IEnumerable<IGrainInterfacePropertiesProvider> grainInterfacePropertiesProviders,
IOptions<GrainTypeOptions> grainTypeOptions,
GrainTypeResolver grainTypeResolver,
GrainInterfaceTypeResolver interfaceTypeResolver)
{
var grainProperties = CreateGrainManifest(grainPropertiesProviders, grainTypeOptions, grainTypeResolver);
var interfaces = CreateInterfaceManifest(grainInterfacePropertiesProviders, grainTypeOptions, interfaceTypeResolver);
this.ClientManifest = new GrainManifest(grainProperties, interfaces);
this.ClientManifest = new GrainManifest(ImmutableDictionary<GrainType, GrainProperties>.Empty, interfaces);
}

/// <summary>
Expand Down Expand Up @@ -57,34 +54,5 @@ private static ImmutableDictionary<GrainInterfaceType, GrainInterfaceProperties>

return builder.ToImmutable();
}

private static ImmutableDictionary<GrainType, GrainProperties> CreateGrainManifest(
IEnumerable<IGrainPropertiesProvider> grainMetadataProviders,
IOptions<GrainTypeOptions> grainTypeOptions,
GrainTypeResolver grainTypeProvider)
{
var propertiesMap = ImmutableDictionary.CreateBuilder<GrainType, GrainProperties>();
foreach (var grainClass in grainTypeOptions.Value.Classes)
{
var grainType = grainTypeProvider.GetGrainType((Type)grainClass);
var properties = new Dictionary<string, string>();
foreach (var provider in grainMetadataProviders)
{
provider.Populate((Type)grainClass, grainType, properties);
}

var result = new GrainProperties(properties.ToImmutableDictionary());
if (propertiesMap.TryGetValue(grainType, out var grainProperty))
{
throw new InvalidOperationException($"An entry with the key {grainType} is already present."
+ $"\nExisting: {grainProperty.ToDetailedString()}\nTrying to add: {result.ToDetailedString()}"
+ "\nConsider using the [GrainType(\"name\")] attribute to give these classes unique names.");
}

propertiesMap.Add(grainType, result);
}

return propertiesMap.ToImmutable();
}
}
}
11 changes: 9 additions & 2 deletions src/Orleans.Core/Manifest/GrainVersionManifest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public GrainVersionManifest(IClusterManifestProvider clusterManifestProvider)
/// <summary>
/// Gets the local version for a specified grain interface type.
/// </summary>
/// <param name="interfaceType">The grain intrerface type name.</param>
/// <param name="interfaceType">The grain interface type name.</param>
/// <returns>The version of the specified grain interface.</returns>
public ushort GetLocalVersion(GrainInterfaceType interfaceType)
{
Expand Down Expand Up @@ -151,7 +151,7 @@ public ushort GetLocalVersion(GrainInterfaceType interfaceType)
/// <param name="grainType">The grain type.</param>
/// <param name="interfaceType">The grain interface type name.</param>
/// <param name="versions">The grain interface version.</param>
/// <returns>The set of silos which support the specifed grain.</returns>
/// <returns>The set of silos which support the specified grain.</returns>
public (MajorMinorVersion Version, Dictionary<ushort, SiloAddress[]> Result) GetSupportedSilos(GrainType grainType, GrainInterfaceType interfaceType, ushort[] versions)
{
var result = new Dictionary<ushort, SiloAddress[]>();
Expand Down Expand Up @@ -238,6 +238,13 @@ private static Cache BuildCache(ClusterManifest clusterManifest)
foreach (var entry in clusterManifest.Silos)
{
var silo = entry.Key;

// Since clients are not eligible for placement, we exclude them here.
if (silo.IsClient)
{
continue;
}

var manifest = entry.Value;
foreach (var grainInterface in manifest.Interfaces)
{
Expand Down
2 changes: 1 addition & 1 deletion src/Orleans.Core/Manifest/IClusterManifestProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public interface IClusterManifestProvider
/// Gets the current cluster manifest.
/// </summary>
ClusterManifest Current { get; }

/// <summary>
/// Gets the stream of cluster manifest updates.
/// </summary>
Expand Down
43 changes: 43 additions & 0 deletions src/Orleans.Core/Manifest/IClusterManifestSystemTarget.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#nullable enable
using System.Collections.Immutable;
using System.Threading.Tasks;
using Orleans.Metadata;

Expand All @@ -13,5 +15,46 @@ internal interface IClusterManifestSystemTarget : ISystemTarget
/// </summary>
/// <returns>The current cluster manifest.</returns>
ValueTask<ClusterManifest> GetClusterManifest();

/// <summary>
/// Gets an updated cluster manifest if newer than the provided <paramref name="previousVersion"/>.
/// </summary>
/// <returns>The current cluster manifest, or <see langword="null"/> if it is not newer than the provided version.</returns>
ValueTask<ClusterManifestUpdate?> GetClusterManifestUpdate(MajorMinorVersion previousVersion);
}

/// <summary>
/// Represents an update to the cluster manifest.
/// </summary>
[GenerateSerializer, Immutable]
public class ClusterManifestUpdate
{
public ClusterManifestUpdate(
MajorMinorVersion manifestVersion,
ImmutableDictionary<SiloAddress, GrainManifest> siloManifests,
bool includesAllActiveServers)
{
Version = manifestVersion;
SiloManifests = siloManifests;
IncludesAllActiveServers = includesAllActiveServers;
}

/// <summary>
/// Gets the version of this instance.
/// </summary>
[Id(0)]
public MajorMinorVersion Version { get; }

/// <summary>
/// Gets the manifests for each silo in the cluster.
/// </summary>
[Id(1)]
public ImmutableDictionary<SiloAddress, GrainManifest> SiloManifests { get; }

/// <summary>
/// Gets a value indicating whether this update includes all active servers.
/// </summary>
[Id(2)]
public bool IncludesAllActiveServers { get; }
}
}
12 changes: 5 additions & 7 deletions src/Orleans.Core/Messaging/ClientMessageCenter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ internal class ClientMessageCenter : IMessageCenter, IDisposable
internal static readonly TimeSpan MINIMUM_INTERCONNECT_DELAY = TimeSpan.FromMilliseconds(100); // wait one tenth of a second between connect attempts
internal const int CONNECT_RETRY_COUNT = 2; // Retry twice before giving up on a gateway server

internal ClientGrainId ClientId { get; private set; }
internal ClientGrainId ClientId => _localClientDetails.ClientId;
public IRuntimeClient RuntimeClient { get; }
internal bool Running { get; private set; }

Expand All @@ -58,17 +58,16 @@ internal class ClientMessageCenter : IMessageCenter, IDisposable
// false, then a new gateway is selected using the gateway manager, and a new connection established if necessary.
private readonly WeakReference<ClientOutboundConnection>[] grainBuckets;
private readonly ILogger logger;
public SiloAddress MyAddress { get; private set; }
public SiloAddress MyAddress => _localClientDetails.ClientAddress;
private int numberOfConnectedGateways = 0;
private readonly MessageFactory messageFactory;
private readonly IClusterConnectionStatusListener connectionStatusListener;
private readonly ConnectionManager connectionManager;
private readonly LocalClientDetails _localClientDetails;

public ClientMessageCenter(
IOptions<ClientMessagingOptions> clientMessagingOptions,
IPAddress localAddress,
int gen,
ClientGrainId clientId,
LocalClientDetails localClientDetails,
IRuntimeClient runtimeClient,
MessageFactory messageFactory,
IClusterConnectionStatusListener connectionStatusListener,
Expand All @@ -77,8 +76,7 @@ public ClientMessageCenter(
GatewayManager gatewayManager)
{
this.connectionManager = connectionManager;
MyAddress = SiloAddress.New(localAddress, 0, gen);
ClientId = clientId;
_localClientDetails = localClientDetails;
this.RuntimeClient = runtimeClient;
this.messageFactory = messageFactory;
this.connectionStatusListener = connectionStatusListener;
Expand Down
Loading

0 comments on commit 57bafe7

Please sign in to comment.