diff --git a/src/Orleans.Core.Abstractions/Manifest/ClusterManifest.cs b/src/Orleans.Core.Abstractions/Manifest/ClusterManifest.cs index d5955af0cb..069fb660e4 100644 --- a/src/Orleans.Core.Abstractions/Manifest/ClusterManifest.cs +++ b/src/Orleans.Core.Abstractions/Manifest/ClusterManifest.cs @@ -19,17 +19,13 @@ public sealed class ClusterManifest /// /// The silo manifests. /// - /// - /// All grain manifests. - /// public ClusterManifest( MajorMinorVersion version, - ImmutableDictionary silos, - ImmutableArray allGrainManifests) + ImmutableDictionary silos) { - this.Version = version; - this.Silos = silos; - this.AllGrainManifests = allGrainManifests; + Version = version; + Silos = silos; + AllGrainManifests = silos.Values.ToImmutableArray(); } /// diff --git a/src/Orleans.Core/Core/DefaultClientServices.cs b/src/Orleans.Core/Core/DefaultClientServices.cs index d811713572..4841f193fa 100644 --- a/src/Orleans.Core/Core/DefaultClientServices.cs +++ b/src/Orleans.Core/Core/DefaultClientServices.cs @@ -61,6 +61,7 @@ public static void AddDefaultServices(IServiceCollection services) services.TryAddSingleton(); services.AddLogging(); services.TryAddSingleton(); + services.TryAddSingleton(); services.TryAddSingleton(); services.TryAddSingleton(); services.AddFromExisting(); diff --git a/src/Orleans.Core/Core/InterfaceToImplementationMappingCache.cs b/src/Orleans.Core/Core/InterfaceToImplementationMappingCache.cs index d5e74b9b41..5878b767bd 100644 --- a/src/Orleans.Core/Core/InterfaceToImplementationMappingCache.cs +++ b/src/Orleans.Core/Core/InterfaceToImplementationMappingCache.cs @@ -28,7 +28,7 @@ public Entry(MethodInfo implementationMethod, MethodInfo interfaceMethod) } /// - /// Gets the grain implmentation . + /// Gets the grain implementation . /// public MethodInfo ImplementationMethod { get; } diff --git a/src/Orleans.Core/Manifest/ClientClusterManifestProvider.cs b/src/Orleans.Core/Manifest/ClientClusterManifestProvider.cs index c07d3ac229..74cb3e8647 100644 --- a/src/Orleans.Core/Manifest/ClientClusterManifestProvider.cs +++ b/src/Orleans.Core/Manifest/ClientClusterManifestProvider.cs @@ -1,6 +1,8 @@ +#nullable enable using System; using System.Collections.Generic; using System.Collections.Immutable; +using System.Diagnostics; using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection; @@ -23,14 +25,16 @@ internal class ClientClusterManifestProvider : IClusterManifestProvider, IAsyncD private readonly ILogger _logger; private readonly TypeManagementOptions _typeManagementOptions; private readonly IServiceProvider _services; + private readonly LocalClientDetails _localClientDetails; private readonly GatewayManager _gatewayManager; private readonly AsyncEnumerable _updates; private readonly CancellationTokenSource _cancellation = new CancellationTokenSource(); private ClusterManifest _current; - private Task _runTask; + private Task? _runTask; public ClientClusterManifestProvider( IServiceProvider services, + LocalClientDetails localClientDetails, GatewayManager gatewayManager, ILogger logger, ClientManifestProvider clientManifestProvider, @@ -39,12 +43,18 @@ public ClientClusterManifestProvider( _logger = logger; _typeManagementOptions = typeManagementOptions.Value; _services = services; + _localClientDetails = localClientDetails; _gatewayManager = gatewayManager; - this.LocalGrainManifest = clientManifestProvider.ClientManifest; - _current = new ClusterManifest(MajorMinorVersion.MinValue, ImmutableDictionary.Empty, ImmutableArray.Create(this.LocalGrainManifest)); + LocalGrainManifest = clientManifestProvider.ClientManifest; + + // Create a fake manifest for the very first generation, which only includes the local client's manifest. + var builder = ImmutableDictionary.CreateBuilder(); + builder.Add(_localClientDetails.ClientAddress, LocalGrainManifest); + _current = new ClusterManifest(MajorMinorVersion.MinValue, builder.ToImmutable()); + _updates = new AsyncEnumerable( initialValue: _current, - updateValidator: (previous, proposed) => previous is null || proposed.Version > previous.Version, + updateValidator: (previous, proposed) => proposed.Version > previous.Version, onPublished: update => Interlocked.Exchange(ref _current, update)); } @@ -73,13 +83,32 @@ private async Task RunAsync() { var grainFactory = _services.GetRequiredService(); var cancellationTask = _cancellation.Token.WhenCancelled(); + SiloAddress? gateway = null; + IClusterManifestSystemTarget? provider = null; + var minorVersion = 0; + var gatewayVersion = MajorMinorVersion.MinValue; while (!_cancellation.IsCancellationRequested) { - var gateway = _gatewayManager.GetLiveGateway(); + // Select a new gateway if the current one is not available. + // This could be caused by a temporary issue or a permanent gateway failure. + if (gateway is null || !_gatewayManager.IsGatewayAvailable(gateway)) + { + gateway = _gatewayManager.GetLiveGateway(); + provider = grainFactory.GetGrain(SystemTargetGrainId.Create(Constants.ManifestProviderType, gateway).GrainId); + + // Accept any cluster manifest version from the new gateway. + // Since the minor version of the manifest is specific to each gateway, we reset it to the lowest possible value. + // This means that it is possible to receive the an older or equivalent cluster manifest when the gateway changes. + // That hiccup is addressed by resetting the expected manifest version and merging incomplete manifests until a complete + // manifest is received. + gatewayVersion = MajorMinorVersion.MinValue; + } + + Debug.Assert(provider is not null); + try { - var provider = grainFactory.GetGrain(SystemTargetGrainId.Create(Constants.ManifestProviderType, gateway).GrainId); - var refreshTask = provider.GetClusterManifest().AsTask(); + var refreshTask = GetClusterManifestUpdate(provider, gatewayVersion); var task = await Task.WhenAny(cancellationTask, refreshTask).ConfigureAwait(false); if (ReferenceEquals(task, cancellationTask)) @@ -87,7 +116,38 @@ private async Task RunAsync() return; } - if (!_updates.TryPublish(await refreshTask)) + var updateResult = await refreshTask; + if (updateResult is null) + { + // There was no newer cluster manifest, so wait for the next refresh interval and try again. + await Task.WhenAny(cancellationTask, Task.Delay(_typeManagementOptions.TypeMapRefreshInterval)); + continue; + } + + gatewayVersion = updateResult.Version; + + // If the manifest does not contain all active servers, merge with the existing manifest until it does. + // This prevents reversed progress at the expense of including potentially defunct silos. + ImmutableDictionary siloManifests; + if (!updateResult.IncludesAllActiveServers) + { + // Merge manifests until the manifest contains all active servers. + var mergedSilos = _current.Silos.ToBuilder(); + mergedSilos.Add(_localClientDetails.ClientAddress, LocalGrainManifest); + foreach (var kvp in updateResult.SiloManifests) + { + mergedSilos[kvp.Key] = kvp.Value; + } + + siloManifests = mergedSilos.ToImmutable(); + } + else + { + siloManifests = updateResult.SiloManifests.Add(_localClientDetails.ClientAddress, LocalGrainManifest); + } + + var updatedManifest = new ClusterManifest(new MajorMinorVersion(gatewayVersion.Major, ++minorVersion), siloManifests); + if (!_updates.TryPublish(updatedManifest)) { await Task.Delay(StandardExtensions.Min(_typeManagementOptions.TypeMapRefreshInterval, TimeSpan.FromMilliseconds(500))); continue; @@ -106,6 +166,9 @@ private async Task RunAsync() { _logger.LogWarning(exception, "Error trying to get cluster manifest from gateway {Gateway}", gateway); await Task.Delay(StandardExtensions.Min(_typeManagementOptions.TypeMapRefreshInterval, TimeSpan.FromSeconds(5))); + + // Reset the gateway so that another will be selected on the next iteration. + gateway = null; } } } @@ -120,6 +183,25 @@ private async Task RunAsync() } } + private async Task GetClusterManifestUpdate(IClusterManifestSystemTarget provider, MajorMinorVersion previousVersion) + { + try + { + // First, attempt to call the new API, which provides more information. + // This returns null if there is no newer cluster manifest. + return await provider.GetClusterManifestUpdate(previousVersion); + } + catch (Exception exception) + { + _logger.LogWarning(exception, "Failed to fetch cluster manifest update from {Provider}.", provider); + + // If the provider does not support the new API, fall back to the old one. + var manifest = await provider.GetClusterManifest(); + var result = new ClusterManifestUpdate(manifest.Version, manifest.Silos, includesAllActiveServers: true); + return result; + } + } + /// public ValueTask DisposeAsync() { diff --git a/src/Orleans.Core/Manifest/ClientManifestProvider.cs b/src/Orleans.Core/Manifest/ClientManifestProvider.cs index 7d612aa72f..119cc90426 100644 --- a/src/Orleans.Core/Manifest/ClientManifestProvider.cs +++ b/src/Orleans.Core/Manifest/ClientManifestProvider.cs @@ -13,15 +13,12 @@ namespace Orleans.Runtime internal class ClientManifestProvider { public ClientManifestProvider( - IEnumerable grainPropertiesProviders, IEnumerable grainInterfacePropertiesProviders, IOptions grainTypeOptions, - GrainTypeResolver grainTypeResolver, GrainInterfaceTypeResolver interfaceTypeResolver) { - var grainProperties = CreateGrainManifest(grainPropertiesProviders, grainTypeOptions, grainTypeResolver); var interfaces = CreateInterfaceManifest(grainInterfacePropertiesProviders, grainTypeOptions, interfaceTypeResolver); - this.ClientManifest = new GrainManifest(grainProperties, interfaces); + this.ClientManifest = new GrainManifest(ImmutableDictionary.Empty, interfaces); } /// @@ -57,34 +54,5 @@ private static ImmutableDictionary return builder.ToImmutable(); } - - private static ImmutableDictionary CreateGrainManifest( - IEnumerable grainMetadataProviders, - IOptions grainTypeOptions, - GrainTypeResolver grainTypeProvider) - { - var propertiesMap = ImmutableDictionary.CreateBuilder(); - foreach (var grainClass in grainTypeOptions.Value.Classes) - { - var grainType = grainTypeProvider.GetGrainType((Type)grainClass); - var properties = new Dictionary(); - foreach (var provider in grainMetadataProviders) - { - provider.Populate((Type)grainClass, grainType, properties); - } - - var result = new GrainProperties(properties.ToImmutableDictionary()); - if (propertiesMap.TryGetValue(grainType, out var grainProperty)) - { - throw new InvalidOperationException($"An entry with the key {grainType} is already present." - + $"\nExisting: {grainProperty.ToDetailedString()}\nTrying to add: {result.ToDetailedString()}" - + "\nConsider using the [GrainType(\"name\")] attribute to give these classes unique names."); - } - - propertiesMap.Add(grainType, result); - } - - return propertiesMap.ToImmutable(); - } } } diff --git a/src/Orleans.Core/Manifest/GrainVersionManifest.cs b/src/Orleans.Core/Manifest/GrainVersionManifest.cs index 1a62ce3efd..4548cabb57 100644 --- a/src/Orleans.Core/Manifest/GrainVersionManifest.cs +++ b/src/Orleans.Core/Manifest/GrainVersionManifest.cs @@ -37,7 +37,7 @@ public GrainVersionManifest(IClusterManifestProvider clusterManifestProvider) /// /// Gets the local version for a specified grain interface type. /// - /// The grain intrerface type name. + /// The grain interface type name. /// The version of the specified grain interface. public ushort GetLocalVersion(GrainInterfaceType interfaceType) { @@ -151,7 +151,7 @@ public ushort GetLocalVersion(GrainInterfaceType interfaceType) /// The grain type. /// The grain interface type name. /// The grain interface version. - /// The set of silos which support the specifed grain. + /// The set of silos which support the specified grain. public (MajorMinorVersion Version, Dictionary Result) GetSupportedSilos(GrainType grainType, GrainInterfaceType interfaceType, ushort[] versions) { var result = new Dictionary(); @@ -238,6 +238,13 @@ private static Cache BuildCache(ClusterManifest clusterManifest) foreach (var entry in clusterManifest.Silos) { var silo = entry.Key; + + // Since clients are not eligible for placement, we exclude them here. + if (silo.IsClient) + { + continue; + } + var manifest = entry.Value; foreach (var grainInterface in manifest.Interfaces) { diff --git a/src/Orleans.Core/Manifest/IClusterManifestProvider.cs b/src/Orleans.Core/Manifest/IClusterManifestProvider.cs index d74dc1d4b8..98764d9e46 100644 --- a/src/Orleans.Core/Manifest/IClusterManifestProvider.cs +++ b/src/Orleans.Core/Manifest/IClusterManifestProvider.cs @@ -13,7 +13,7 @@ public interface IClusterManifestProvider /// Gets the current cluster manifest. /// ClusterManifest Current { get; } - + /// /// Gets the stream of cluster manifest updates. /// diff --git a/src/Orleans.Core/Manifest/IClusterManifestSystemTarget.cs b/src/Orleans.Core/Manifest/IClusterManifestSystemTarget.cs index 42411884a7..9315578caa 100644 --- a/src/Orleans.Core/Manifest/IClusterManifestSystemTarget.cs +++ b/src/Orleans.Core/Manifest/IClusterManifestSystemTarget.cs @@ -1,3 +1,5 @@ +#nullable enable +using System.Collections.Immutable; using System.Threading.Tasks; using Orleans.Metadata; @@ -13,5 +15,46 @@ internal interface IClusterManifestSystemTarget : ISystemTarget /// /// The current cluster manifest. ValueTask GetClusterManifest(); + + /// + /// Gets an updated cluster manifest if newer than the provided . + /// + /// The current cluster manifest, or if it is not newer than the provided version. + ValueTask GetClusterManifestUpdate(MajorMinorVersion previousVersion); + } + + /// + /// Represents an update to the cluster manifest. + /// + [GenerateSerializer, Immutable] + public class ClusterManifestUpdate + { + public ClusterManifestUpdate( + MajorMinorVersion manifestVersion, + ImmutableDictionary siloManifests, + bool includesAllActiveServers) + { + Version = manifestVersion; + SiloManifests = siloManifests; + IncludesAllActiveServers = includesAllActiveServers; + } + + /// + /// Gets the version of this instance. + /// + [Id(0)] + public MajorMinorVersion Version { get; } + + /// + /// Gets the manifests for each silo in the cluster. + /// + [Id(1)] + public ImmutableDictionary SiloManifests { get; } + + /// + /// Gets a value indicating whether this update includes all active servers. + /// + [Id(2)] + public bool IncludesAllActiveServers { get; } } } diff --git a/src/Orleans.Core/Messaging/ClientMessageCenter.cs b/src/Orleans.Core/Messaging/ClientMessageCenter.cs index f05d8bc352..42c46916a4 100644 --- a/src/Orleans.Core/Messaging/ClientMessageCenter.cs +++ b/src/Orleans.Core/Messaging/ClientMessageCenter.cs @@ -44,7 +44,7 @@ internal class ClientMessageCenter : IMessageCenter, IDisposable internal static readonly TimeSpan MINIMUM_INTERCONNECT_DELAY = TimeSpan.FromMilliseconds(100); // wait one tenth of a second between connect attempts internal const int CONNECT_RETRY_COUNT = 2; // Retry twice before giving up on a gateway server - internal ClientGrainId ClientId { get; private set; } + internal ClientGrainId ClientId => _localClientDetails.ClientId; public IRuntimeClient RuntimeClient { get; } internal bool Running { get; private set; } @@ -58,17 +58,16 @@ internal class ClientMessageCenter : IMessageCenter, IDisposable // false, then a new gateway is selected using the gateway manager, and a new connection established if necessary. private readonly WeakReference[] grainBuckets; private readonly ILogger logger; - public SiloAddress MyAddress { get; private set; } + public SiloAddress MyAddress => _localClientDetails.ClientAddress; private int numberOfConnectedGateways = 0; private readonly MessageFactory messageFactory; private readonly IClusterConnectionStatusListener connectionStatusListener; private readonly ConnectionManager connectionManager; + private readonly LocalClientDetails _localClientDetails; public ClientMessageCenter( IOptions clientMessagingOptions, - IPAddress localAddress, - int gen, - ClientGrainId clientId, + LocalClientDetails localClientDetails, IRuntimeClient runtimeClient, MessageFactory messageFactory, IClusterConnectionStatusListener connectionStatusListener, @@ -77,8 +76,7 @@ public ClientMessageCenter( GatewayManager gatewayManager) { this.connectionManager = connectionManager; - MyAddress = SiloAddress.New(localAddress, 0, gen); - ClientId = clientId; + _localClientDetails = localClientDetails; this.RuntimeClient = runtimeClient; this.messageFactory = messageFactory; this.connectionStatusListener = connectionStatusListener; diff --git a/src/Orleans.Core/Runtime/LocalClientDetails.cs b/src/Orleans.Core/Runtime/LocalClientDetails.cs new file mode 100644 index 0000000000..faa24cf5d3 --- /dev/null +++ b/src/Orleans.Core/Runtime/LocalClientDetails.cs @@ -0,0 +1,26 @@ +using System.Net; +using Microsoft.Extensions.Options; +using Orleans.Configuration; +using Orleans.Runtime; +using Orleans.Runtime.Configuration; + +namespace Orleans +{ + internal class LocalClientDetails + { + public LocalClientDetails(IOptions clientMessagingOptions) + { + var options = clientMessagingOptions.Value; + var ipAddress = options.LocalAddress ?? ConfigUtilities.GetLocalIPAddress(options.PreferredFamily, options.NetworkInterfaceName); + + // Client generations are negative + var generation = -SiloAddress.AllocateNewGeneration(); + ClientAddress = SiloAddress.New(ipAddress, 0, generation); + ClientId = ClientGrainId.Create(); + } + + public ClientGrainId ClientId { get; } + public IPAddress IPAddress => ClientAddress.Endpoint.Address; + public SiloAddress ClientAddress { get; } + } +} \ No newline at end of file diff --git a/src/Orleans.Core/Runtime/OutsideRuntimeClient.cs b/src/Orleans.Core/Runtime/OutsideRuntimeClient.cs index d0e05be21d..382b355734 100644 --- a/src/Orleans.Core/Runtime/OutsideRuntimeClient.cs +++ b/src/Orleans.Core/Runtime/OutsideRuntimeClient.cs @@ -1,6 +1,5 @@ using System; using System.Collections.Concurrent; -using System.Net; using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection; @@ -11,7 +10,6 @@ using Orleans.Configuration; using Orleans.Messaging; using Orleans.Runtime; -using Orleans.Runtime.Configuration; using Orleans.Serialization; using Orleans.Serialization.Invocation; @@ -30,12 +28,11 @@ internal class OutsideRuntimeClient : IRuntimeClient, IDisposable, IClusterConne private bool disposed; private readonly MessagingTrace messagingTrace; - private readonly ClientGrainId clientId; public IInternalGrainFactory InternalGrainFactory { get; private set; } private MessageFactory messageFactory; - private IPAddress localAddress; + private readonly LocalClientDetails _localClientDetails; private readonly ILoggerFactory loggerFactory; private readonly SharedCallbackData sharedCallbackData; @@ -59,16 +56,17 @@ public string CurrentActivationIdentity [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope", Justification = "MessageCenter is IDisposable but cannot call Dispose yet as it lives past the end of this method call.")] public OutsideRuntimeClient( + LocalClientDetails localClientDetails, ILoggerFactory loggerFactory, IOptions clientMessagingOptions, MessagingTrace messagingTrace, IServiceProvider serviceProvider) { this.ServiceProvider = serviceProvider; + _localClientDetails = localClientDetails; this.loggerFactory = loggerFactory; this.messagingTrace = messagingTrace; this.logger = loggerFactory.CreateLogger(); - this.clientId = ClientGrainId.Create(); callbacks = new ConcurrentDictionary(); this.clientMessagingOptions = clientMessagingOptions.Value; @@ -113,10 +111,8 @@ internal void ConsumeServices() this.GrainReferenceRuntime = this.ServiceProvider.GetRequiredService(); - this.localAddress = this.clientMessagingOptions.LocalAddress ?? ConfigUtilities.GetLocalIPAddress(this.clientMessagingOptions.PreferredFamily, this.clientMessagingOptions.NetworkInterfaceName); - // Client init / sign-on message - logger.LogInformation((int)ErrorCode.ClientStarting, "Starting Orleans client with runtime version \"{RuntimeVersion}\", local address {LocalAddress} and client id {ClientId}", RuntimeVersion.Current, localAddress, clientId); + logger.LogInformation((int)ErrorCode.ClientStarting, "Starting Orleans client with runtime version \"{RuntimeVersion}\", local address {LocalAddress} and client id {ClientId}", RuntimeVersion.Current, _localClientDetails.ClientAddress, _localClientDetails.ClientId); if (TestOnlyThrowExceptionDuringInit) { @@ -141,7 +137,7 @@ public async Task Start(CancellationToken cancellationToken) // This helps to avoid any issues (such as deadlocks) caused by executing with the client's synchronization context/scheduler. await Task.Run(() => this.StartInternal(cancellationToken)).ConfigureAwait(false); - logger.LogInformation((int)ErrorCode.ProxyClient_StartDone, "Started client with address {ActivationAddress} and id {ClientId}", CurrentActivationAddress.ToString(), clientId); + logger.LogInformation((int)ErrorCode.ProxyClient_StartDone, "Started client with address {ActivationAddress} and id {ClientId}", CurrentActivationAddress.ToString(), _localClientDetails.ClientId); } // used for testing to (carefully!) allow two clients in the same process @@ -155,14 +151,13 @@ await ExecuteWithRetries( retryFilter, cancellationToken); - var generation = -SiloAddress.AllocateNewGeneration(); // Client generations are negative - MessageCenter = ActivatorUtilities.CreateInstance(this.ServiceProvider, localAddress, generation, clientId); + MessageCenter = ActivatorUtilities.CreateInstance(this.ServiceProvider); MessageCenter.RegisterLocalMessageHandler(this.HandleMessage); await ExecuteWithRetries( async () => await MessageCenter.StartAsync(cancellationToken), retryFilter, cancellationToken); - CurrentActivationAddress = GrainAddress.NewActivationAddress(MessageCenter.MyAddress, clientId.GrainId); + CurrentActivationAddress = GrainAddress.NewActivationAddress(MessageCenter.MyAddress, _localClientDetails.ClientId.GrainId); this.gatewayObserver = new ClientGatewayObserver(gatewayManager); this.InternalGrainFactory.CreateObjectReference(this.gatewayObserver); @@ -350,8 +345,8 @@ public IAddressable CreateObjectReference(IAddressable obj) throw new ArgumentException("Argument must not be a grain class.", nameof(obj)); var observerId = obj is ClientObserver clientObserver - ? clientObserver.GetObserverGrainId(this.clientId) - : ObserverGrainId.Create(this.clientId); + ? clientObserver.GetObserverGrainId(_localClientDetails.ClientId) + : ObserverGrainId.Create(_localClientDetails.ClientId); var reference = this.InternalGrainFactory.GetGrain(observerId.GrainId); if (!localObjects.TryRegister(obj, observerId)) diff --git a/src/Orleans.Core/Utils/StandardExtensions.cs b/src/Orleans.Core/Utils/StandardExtensions.cs index 6e817cc71c..bb1a8b1a58 100644 --- a/src/Orleans.Core/Utils/StandardExtensions.cs +++ b/src/Orleans.Core/Utils/StandardExtensions.cs @@ -8,14 +8,8 @@ namespace Orleans.Internal /// internal static class StandardExtensions { - public static TimeSpan Max(TimeSpan first, TimeSpan second) - { - return first >= second ? first : second; - } + public static TimeSpan Max(TimeSpan first, TimeSpan second) => first >= second ? first : second; - public static TimeSpan Min(TimeSpan first, TimeSpan second) - { - return first < second ? first : second; - } + public static TimeSpan Min(TimeSpan first, TimeSpan second) => first < second ? first : second; } } diff --git a/src/Orleans.Runtime/Core/InsideRuntimeClient.cs b/src/Orleans.Runtime/Core/InsideRuntimeClient.cs index 11d48f1ca2..6fdef1729d 100644 --- a/src/Orleans.Runtime/Core/InsideRuntimeClient.cs +++ b/src/Orleans.Runtime/Core/InsideRuntimeClient.cs @@ -35,7 +35,6 @@ internal sealed class InsideRuntimeClient : IRuntimeClient, ILifecycleParticipan private SafeTimer callbackTimer; private GrainLocator grainLocator; - private Catalog catalog; private MessageCenter messageCenter; private List grainCallFilters; private readonly DeepCopier _deepCopier; @@ -96,8 +95,6 @@ public InsideRuntimeClient( public GrainFactory ConcreteGrainFactory { get; } - private Catalog Catalog => this.catalog ?? (this.catalog = this.ServiceProvider.GetRequiredService()); - private GrainLocator GrainLocator => this.grainLocator ?? (this.grainLocator = this.ServiceProvider.GetRequiredService()); diff --git a/src/Orleans.Runtime/GrainTypeManager/ClusterManifestSystemTarget.cs b/src/Orleans.Runtime/GrainTypeManager/ClusterManifestSystemTarget.cs index 7c2ad5fb19..efe9548639 100644 --- a/src/Orleans.Runtime/GrainTypeManager/ClusterManifestSystemTarget.cs +++ b/src/Orleans.Runtime/GrainTypeManager/ClusterManifestSystemTarget.cs @@ -1,3 +1,7 @@ +using System.Collections.Generic; +using System.Collections.Immutable; +using System.Linq; +using System.Runtime.CompilerServices; using System.Threading.Tasks; using Microsoft.Extensions.Logging; using Orleans.Metadata; @@ -7,20 +11,61 @@ namespace Orleans.Runtime internal class ClusterManifestSystemTarget : SystemTarget, IClusterManifestSystemTarget, ISiloManifestSystemTarget { private readonly GrainManifest _siloManifest; + private readonly IClusterMembershipService _clusterMembershipService; private readonly IClusterManifestProvider _clusterManifestProvider; + private readonly ClusterManifestUpdate _noUpdate = default; + private MembershipVersion _cachedMembershipVersion; + private ClusterManifestUpdate _cachedUpdate; public ClusterManifestSystemTarget( + IClusterMembershipService clusterMembershipService, IClusterManifestProvider clusterManifestProvider, ILocalSiloDetails siloDetails, ILoggerFactory loggerFactory) : base(Constants.ManifestProviderType, siloDetails.SiloAddress, loggerFactory) { _siloManifest = clusterManifestProvider.LocalGrainManifest; + _clusterMembershipService = clusterMembershipService; _clusterManifestProvider = clusterManifestProvider; } - public ValueTask GetClusterManifest() => new ValueTask(_clusterManifestProvider.Current); + public ValueTask GetClusterManifest() => new(_clusterManifestProvider.Current); + public ValueTask GetClusterManifestUpdate(MajorMinorVersion version) + { + var manifest = _clusterManifestProvider.Current; + + // Only return an updated manifest if it is newer than the provided version. + if (manifest.Version <= version) + { + return new (_noUpdate); + } + + // Maintain a cache of whether the current manifest contains all active servers so that it + // does not need to be recomputed each time. + var membershipSnapshot = _clusterMembershipService.CurrentSnapshot; + if (_cachedUpdate is null + || membershipSnapshot.Version > _cachedMembershipVersion + || manifest.Version > _cachedUpdate.Version) + { + var includesAllActiveServers = true; + foreach (var server in membershipSnapshot.Members) + { + if (server.Value.Status == SiloStatus.Active) + { + if (!manifest.Silos.ContainsKey(server.Key)) + { + includesAllActiveServers = false; + } + } + } + + _cachedUpdate = new ClusterManifestUpdate(manifest.Version, manifest.Silos, includesAllActiveServers); + _cachedMembershipVersion = membershipSnapshot.Version; + } + + return new (_cachedUpdate); + } - public ValueTask GetSiloManifest() => new ValueTask(_siloManifest); + public ValueTask GetSiloManifest() => new(_siloManifest); } } \ No newline at end of file diff --git a/src/Orleans.Runtime/Manifest/ClusterManifestProvider.cs b/src/Orleans.Runtime/Manifest/ClusterManifestProvider.cs index 99c5dea782..347c46ced4 100644 --- a/src/Orleans.Runtime/Manifest/ClusterManifestProvider.cs +++ b/src/Orleans.Runtime/Manifest/ClusterManifestProvider.cs @@ -39,8 +39,7 @@ public ClusterManifestProvider( this.LocalGrainManifest = siloManifestProvider.SiloManifest; _current = new ClusterManifest( MajorMinorVersion.Zero, - ImmutableDictionary.CreateRange(new[] { new KeyValuePair(localSiloDetails.SiloAddress, this.LocalGrainManifest) }), - ImmutableArray.Create(this.LocalGrainManifest)); + ImmutableDictionary.CreateRange(new[] { new KeyValuePair(localSiloDetails.SiloAddress, this.LocalGrainManifest) })); _updates = new AsyncEnumerable( initialValue: _current, updateValidator: (previous, proposed) => proposed.Version > previous.Version, @@ -182,7 +181,7 @@ private async Task UpdateManifest(ClusterMembershipSnapshot clusterMembers var version = new MajorMinorVersion(clusterMembership.Version.Value, existingManifest.Version.Minor + 1); if (modified) { - return _updates.TryPublish(new ClusterManifest(version, builder.ToImmutable(), builder.Values.ToImmutableArray())) && fetchSuccess; + return _updates.TryPublish(new ClusterManifest(version, builder.ToImmutable())) && fetchSuccess; } return fetchSuccess; diff --git a/test/NonSilo.Tests/Directory/CachedGrainLocatorTests.cs b/test/NonSilo.Tests/Directory/CachedGrainLocatorTests.cs index 4683bb4cb5..94ba121a55 100644 --- a/test/NonSilo.Tests/Directory/CachedGrainLocatorTests.cs +++ b/test/NonSilo.Tests/Directory/CachedGrainLocatorTests.cs @@ -499,8 +499,7 @@ private class NoOpClusterManifestProvider : IClusterManifestProvider { public ClusterManifest Current => new ClusterManifest( MajorMinorVersion.Zero, - ImmutableDictionary.Empty, - ImmutableArray.Create(new GrainManifest(ImmutableDictionary.Empty, ImmutableDictionary.Empty))); + ImmutableDictionary.Empty); public IAsyncEnumerable Updates => this.GetUpdates();