Skip to content

Commit

Permalink
Implement incoming grain call filters for observers (#9054)
Browse files Browse the repository at this point in the history
  • Loading branch information
ReubenBond authored Jul 10, 2024
1 parent 6ff7edc commit a44d4a3
Show file tree
Hide file tree
Showing 12 changed files with 556 additions and 77 deletions.
3 changes: 2 additions & 1 deletion src/Orleans.Core/Core/ClientBuilderExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ public static IClientBuilder AddActivityPropagation(this IClientBuilder builder)
builder.Services.TryAddSingleton(DistributedContextPropagator.Current);

return builder
.AddOutgoingGrainCallFilter<ActivityPropagationOutgoingGrainCallFilter>();
.AddOutgoingGrainCallFilter<ActivityPropagationOutgoingGrainCallFilter>()
.AddIncomingGrainCallFilter<ActivityPropagationIncomingGrainCallFilter>();
}

/// <summary>
Expand Down
101 changes: 67 additions & 34 deletions src/Orleans.Core/Core/ClientBuilderGrainCallFilterExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,42 +1,75 @@
namespace Orleans.Hosting
namespace Orleans.Hosting;

/// <summary>
/// Extensions for configuring grain call filters.
/// </summary>
public static class ClientBuilderGrainCallFilterExtensions
{
/// <summary>
/// Extensions for configuring grain call filters.
/// Adds an <see cref="IIncomingGrainCallFilter"/> to the filter pipeline.
/// </summary>
/// <param name="builder">The builder.</param>
/// <param name="filter">The filter.</param>
/// <returns>The builder.</returns>
public static IClientBuilder AddIncomingGrainCallFilter(this IClientBuilder builder, IIncomingGrainCallFilter filter)
{
return builder.ConfigureServices(services => services.AddIncomingGrainCallFilter(filter));
}

/// <summary>
/// Adds an <see cref="IIncomingGrainCallFilter"/> to the filter pipeline.
/// </summary>
/// <typeparam name="TImplementation">The filter implementation type.</typeparam>
/// <param name="builder">The builder.</param>
/// <returns>The builder.</returns>
public static IClientBuilder AddIncomingGrainCallFilter<TImplementation>(this IClientBuilder builder)
where TImplementation : class, IIncomingGrainCallFilter
{
return builder.ConfigureServices(services => services.AddIncomingGrainCallFilter<TImplementation>());
}

/// <summary>
/// Adds an <see cref="IIncomingGrainCallFilter"/> to the filter pipeline via a delegate.
/// </summary>
/// <param name="builder">The builder.</param>
/// <param name="filter">The filter.</param>
/// <returns>The builder.</returns>
public static IClientBuilder AddIncomingGrainCallFilter(this IClientBuilder builder, IncomingGrainCallFilterDelegate filter)
{
return builder.ConfigureServices(services => services.AddIncomingGrainCallFilter(filter));
}

/// <summary>
/// Adds an <see cref="IOutgoingGrainCallFilter"/> to the filter pipeline.
/// </summary>
public static class ClientBuilderGrainCallFilterExtensions
/// <param name="builder">The builder.</param>
/// <param name="filter">The filter.</param>
/// <returns>The <see cref="IClientBuilder"/>.</returns>
public static IClientBuilder AddOutgoingGrainCallFilter(this IClientBuilder builder, IOutgoingGrainCallFilter filter)
{
/// <summary>
/// Adds an <see cref="IOutgoingGrainCallFilter"/> to the filter pipeline.
/// </summary>
/// <param name="builder">The builder.</param>
/// <param name="filter">The filter.</param>
/// <returns>The <see cref="IClientBuilder"/>.</returns>
public static IClientBuilder AddOutgoingGrainCallFilter(this IClientBuilder builder, IOutgoingGrainCallFilter filter)
{
return builder.ConfigureServices(services => services.AddOutgoingGrainCallFilter(filter));
}
return builder.ConfigureServices(services => services.AddOutgoingGrainCallFilter(filter));
}

/// <summary>
/// Adds an <see cref="IOutgoingGrainCallFilter"/> to the filter pipeline.
/// </summary>
/// <typeparam name="TImplementation">The filter implementation type.</typeparam>
/// <param name="builder">The builder.</param>
/// <returns>The <see cref="IClientBuilder"/>.</returns>
public static IClientBuilder AddOutgoingGrainCallFilter<TImplementation>(this IClientBuilder builder)
where TImplementation : class, IOutgoingGrainCallFilter
{
return builder.ConfigureServices(services => services.AddOutgoingGrainCallFilter<TImplementation>());
}
/// <summary>
/// Adds an <see cref="IOutgoingGrainCallFilter"/> to the filter pipeline.
/// </summary>
/// <typeparam name="TImplementation">The filter implementation type.</typeparam>
/// <param name="builder">The builder.</param>
/// <returns>The <see cref="IClientBuilder"/>.</returns>
public static IClientBuilder AddOutgoingGrainCallFilter<TImplementation>(this IClientBuilder builder)
where TImplementation : class, IOutgoingGrainCallFilter
{
return builder.ConfigureServices(services => services.AddOutgoingGrainCallFilter<TImplementation>());
}

/// <summary>
/// Adds an <see cref="IOutgoingGrainCallFilter"/> to the filter pipeline via a delegate.
/// </summary>
/// <param name="builder">The builder.</param>
/// <param name="filter">The filter.</param>
/// <returns>The <see cref="IClientBuilder"/>.</returns>
public static IClientBuilder AddOutgoingGrainCallFilter(this IClientBuilder builder, OutgoingGrainCallFilterDelegate filter)
{
return builder.ConfigureServices(services => services.AddOutgoingGrainCallFilter(filter));
}
/// <summary>
/// Adds an <see cref="IOutgoingGrainCallFilter"/> to the filter pipeline via a delegate.
/// </summary>
/// <param name="builder">The builder.</param>
/// <param name="filter">The filter.</param>
/// <returns>The <see cref="IClientBuilder"/>.</returns>
public static IClientBuilder AddOutgoingGrainCallFilter(this IClientBuilder builder, OutgoingGrainCallFilterDelegate filter)
{
return builder.ConfigureServices(services => services.AddOutgoingGrainCallFilter(filter));
}
}
1 change: 1 addition & 0 deletions src/Orleans.Core/Core/DefaultClientServices.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public static void AddDefaultServices(IClientBuilder builder)
services.TryAddSingleton<GrainBindingsResolver>();
services.TryAddSingleton<LocalClientDetails>();
services.TryAddSingleton<OutsideRuntimeClient>();
services.TryAddSingleton<InterfaceToImplementationMappingCache>();
services.TryAddSingleton<ClientGrainContext>();
services.AddFromExisting<IGrainContextAccessor, ClientGrainContext>();
services.TryAddFromExisting<IRuntimeClient, OutsideRuntimeClient>();
Expand Down
File renamed without changes.
27 changes: 26 additions & 1 deletion src/Orleans.Core/Runtime/InvokableObjectManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Orleans.Runtime;
using Orleans.Serialization;
Expand All @@ -14,23 +15,34 @@ internal class InvokableObjectManager : IDisposable
{
private readonly CancellationTokenSource disposed = new CancellationTokenSource();
private readonly ConcurrentDictionary<ObserverGrainId, LocalObjectData> localObjects = new ConcurrentDictionary<ObserverGrainId, LocalObjectData>();

private readonly InterfaceToImplementationMappingCache _interfaceToImplementationMapping;
private readonly IGrainContext rootGrainContext;
private readonly IRuntimeClient runtimeClient;
private readonly ILogger logger;
private readonly DeepCopier deepCopier;
private readonly DeepCopier<Response> _responseCopier;
private readonly MessagingTrace messagingTrace;
private List<IIncomingGrainCallFilter> _grainCallFilters;

private List<IIncomingGrainCallFilter> GrainCallFilters
=> _grainCallFilters ??= new List<IIncomingGrainCallFilter>(runtimeClient.ServiceProvider.GetServices<IIncomingGrainCallFilter>());

public InvokableObjectManager(
IGrainContext rootGrainContext,
IRuntimeClient runtimeClient,
DeepCopier deepCopier,
MessagingTrace messagingTrace,
DeepCopier<Response> responseCopier,
InterfaceToImplementationMappingCache interfaceToImplementationMapping,
ILogger logger)
{
this.rootGrainContext = rootGrainContext;
this.runtimeClient = runtimeClient;
this.deepCopier = deepCopier;
this.messagingTrace = messagingTrace;
_responseCopier = responseCopier;
_interfaceToImplementationMapping = interfaceToImplementationMapping;
this.logger = logger;
}

Expand Down Expand Up @@ -246,7 +258,20 @@ private async Task LocalObjectMessagePumpAsync()
try
{
request.SetTarget(this);
var response = await request.Invoke();
var filters = _manager.GrainCallFilters;
Response response;
if (filters is { Count: > 0 } || LocalObject is IIncomingGrainCallFilter)
{
var invoker = new GrainMethodInvoker(message, this, request, filters, _manager._interfaceToImplementationMapping, _manager._responseCopier);
await invoker.Invoke();
response = invoker.Response;
}
else
{
response = await request.Invoke();
response = _manager._responseCopier.Copy(response);
}

if (message.Direction != Message.Directions.OneWay)
{
this.SendResponseAsync(message, response);
Expand Down
16 changes: 10 additions & 6 deletions src/Orleans.Core/Runtime/OutsideRuntimeClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
using Orleans.Runtime;
using Orleans.Serialization;
using Orleans.Serialization.Invocation;
using Orleans.Serialization.Serializers;
using static Orleans.Internal.StandardExtensions;

namespace Orleans
Expand All @@ -30,6 +31,7 @@ internal class OutsideRuntimeClient : IRuntimeClient, IDisposable, IClusterConne
private bool disposed;

private readonly MessagingTrace messagingTrace;
private readonly InterfaceToImplementationMappingCache _interfaceToImplementationMapping;

public IInternalGrainFactory InternalGrainFactory { get; private set; }

Expand Down Expand Up @@ -65,9 +67,11 @@ public OutsideRuntimeClient(
IOptions<ClientMessagingOptions> clientMessagingOptions,
MessagingTrace messagingTrace,
IServiceProvider serviceProvider,
TimeProvider timeProvider)
TimeProvider timeProvider,
InterfaceToImplementationMappingCache interfaceToImplementationMapping)
{
TimeProvider = timeProvider;
_interfaceToImplementationMapping = interfaceToImplementationMapping;
this.ServiceProvider = serviceProvider;
_localClientDetails = localClientDetails;
this.loggerFactory = loggerFactory;
Expand Down Expand Up @@ -105,14 +109,14 @@ internal void ConsumeServices()

this.InternalGrainFactory = this.ServiceProvider.GetRequiredService<IInternalGrainFactory>();
this.messageFactory = this.ServiceProvider.GetService<MessageFactory>();

var copier = this.ServiceProvider.GetRequiredService<DeepCopier>();
this.localObjects = new InvokableObjectManager(
ServiceProvider.GetRequiredService<ClientGrainContext>(),
this,
copier,
this.messagingTrace,
this.loggerFactory.CreateLogger<ClientGrainContext>());
ServiceProvider.GetRequiredService<DeepCopier>(),
messagingTrace,
ServiceProvider.GetRequiredService<DeepCopier<Response>>(),
_interfaceToImplementationMapping,
loggerFactory.CreateLogger<ClientGrainContext>());

this.callbackTimerTask = Task.Run(MonitorCallbackExpiry);

Expand Down
10 changes: 7 additions & 3 deletions src/Orleans.Runtime/Core/HostedClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using Orleans.Internal;
using Orleans.Runtime.Messaging;
using Orleans.Serialization;
using Orleans.Serialization.Invocation;

namespace Orleans.Runtime
{
Expand All @@ -24,7 +25,7 @@ internal sealed class HostedClient : IGrainContext, IGrainExtensionBinder, IDisp
private readonly Channel<Message> incomingMessages;
private readonly IGrainReferenceRuntime grainReferenceRuntime;
private readonly InvokableObjectManager invokableObjects;
private readonly IRuntimeClient runtimeClient;
private readonly InsideRuntimeClient runtimeClient;
private readonly ILogger logger;
private readonly IInternalGrainFactory grainFactory;
private readonly MessageCenter siloMessageCenter;
Expand All @@ -36,15 +37,16 @@ internal sealed class HostedClient : IGrainContext, IGrainExtensionBinder, IDisp
private Task? messagePump;

public HostedClient(
IRuntimeClient runtimeClient,
InsideRuntimeClient runtimeClient,
ILocalSiloDetails siloDetails,
ILogger<HostedClient> logger,
IGrainReferenceRuntime grainReferenceRuntime,
IInternalGrainFactory grainFactory,
MessageCenter messageCenter,
MessagingTrace messagingTrace,
DeepCopier deepCopier,
GrainReferenceActivator referenceActivator)
GrainReferenceActivator referenceActivator,
InterfaceToImplementationMappingCache interfaceToImplementationMappingCache)
{
this.incomingMessages = Channel.CreateUnbounded<Message>(new UnboundedChannelOptions
{
Expand All @@ -61,6 +63,8 @@ public HostedClient(
runtimeClient,
deepCopier,
messagingTrace,
runtimeClient.ServiceProvider.GetRequiredService<DeepCopier<Response>>(),
interfaceToImplementationMappingCache,
logger);
this.siloMessageCenter = messageCenter;
this.messagingTrace = messagingTrace;
Expand Down
11 changes: 6 additions & 5 deletions src/Orleans.Runtime/Core/InsideRuntimeClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ internal sealed class InsideRuntimeClient : IRuntimeClient, ILifecycleParticipan
private readonly ILoggerFactory loggerFactory;
private readonly SiloMessagingOptions messagingOptions;
private readonly ConcurrentDictionary<(GrainId, CorrelationId), CallbackData> callbacks;
private readonly InterfaceToImplementationMappingCache interfaceToImplementationMapping;
private readonly SharedCallbackData sharedCallbackData;
private readonly SharedCallbackData systemSharedCallbackData;
private readonly PeriodicTimer callbackTimer;
Expand All @@ -39,10 +40,9 @@ internal sealed class InsideRuntimeClient : IRuntimeClient, ILifecycleParticipan
private MessageCenter messageCenter;
private List<IIncomingGrainCallFilter> grainCallFilters;
private readonly DeepCopier _deepCopier;
private readonly InterfaceToImplementationMappingCache interfaceToImplementationMapping;
private HostedClient hostedClient;

private HostedClient HostedClient => this.hostedClient ?? (this.hostedClient = this.ServiceProvider.GetRequiredService<HostedClient>());
private HostedClient HostedClient => this.hostedClient ??= this.ServiceProvider.GetRequiredService<HostedClient>();
private readonly MessageFactory messageFactory;
private IGrainReferenceRuntime grainReferenceRuntime;
private Task callbackTimerTask;
Expand All @@ -60,10 +60,11 @@ public InsideRuntimeClient(
GrainInterfaceTypeResolver interfaceIdResolver,
GrainInterfaceTypeToGrainTypeResolver interfaceToTypeResolver,
DeepCopier deepCopier,
TimeProvider timeProvider)
TimeProvider timeProvider,
InterfaceToImplementationMappingCache interfaceToImplementationMapping)
{
TimeProvider = timeProvider;
this.interfaceToImplementationMapping = new InterfaceToImplementationMappingCache();
this.interfaceToImplementationMapping = interfaceToImplementationMapping;
this._deepCopier = deepCopier;
this.ServiceProvider = serviceProvider;
this.MySilo = siloDetails.SiloAddress;
Expand Down Expand Up @@ -102,7 +103,7 @@ private GrainLocator GrainLocator
=> this.grainLocator ?? (this.grainLocator = this.ServiceProvider.GetRequiredService<GrainLocator>());

private List<IIncomingGrainCallFilter> GrainCallFilters
=> this.grainCallFilters ?? (this.grainCallFilters = new List<IIncomingGrainCallFilter>(this.ServiceProvider.GetServices<IIncomingGrainCallFilter>()));
=> this.grainCallFilters ??= new List<IIncomingGrainCallFilter>(this.ServiceProvider.GetServices<IIncomingGrainCallFilter>());

private MessageCenter MessageCenter => this.messageCenter ?? (this.messageCenter = this.ServiceProvider.GetRequiredService<MessageCenter>());

Expand Down
1 change: 1 addition & 0 deletions src/Orleans.Runtime/Hosting/DefaultSiloServices.cs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ internal static void AddDefaultServices(ISiloBuilder builder)
services.AddTransient<CancellationSourcesExtension>();
services.AddKeyedTransient<IGrainExtension>(typeof(ICancellationSourcesExtension), (sp, _) => sp.GetRequiredService<CancellationSourcesExtension>());
services.TryAddSingleton<GrainFactory>(sp => sp.GetRequiredService<InsideRuntimeClient>().ConcreteGrainFactory);
services.TryAddSingleton<InterfaceToImplementationMappingCache>();
services.TryAddSingleton<GrainInterfaceTypeToGrainTypeResolver>();
services.TryAddFromExisting<IGrainFactory, GrainFactory>();
services.TryAddFromExisting<IInternalGrainFactory, GrainFactory>();
Expand Down
Loading

0 comments on commit a44d4a3

Please sign in to comment.