Skip to content

Map WebJobs Extension RPC data sources onto the gRPC host endpoints #9292

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Jun 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions release_notes.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@
- Update PowerShell Worker 7.4 to 4.0.2802 [Release Note](https://github.com/Azure/azure-functions-powershell-worker/releases/tag/v4.0.2802)
- Fixing bug with placeholder misses in dotnet-isolated #9253
- Update PowerShell Worker 7.0 to 4.0.2823 [Release Note](https://github.com/Azure/azure-functions-powershell-worker/releases/tag/v4.0.2823)
- Add new Host to Worker RPC extensibility feature for out-of-proc workers. (#9292)
19 changes: 13 additions & 6 deletions src/WebJobs.Script.Grpc/Server/AspNetCoreGrpcHostBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,13 @@ namespace Microsoft.Azure.WebJobs.Script.Grpc
{
internal static class AspNetCoreGrpcHostBuilder
{
public static IHostBuilder CreateHostBuilder(FunctionRpc.FunctionRpcBase service, IScriptEventManager scriptEventManager, int port) =>
new HostBuilder().ConfigureWebHost(webBuilder =>
public static IHostBuilder CreateHostBuilder(
FunctionRpc.FunctionRpcBase service,
IScriptEventManager scriptEventManager,
IScriptHostManager scriptHostManager,
int port)
{
return new HostBuilder().ConfigureWebHost(webBuilder =>
{
webBuilder.UseKestrel(options =>
{
Expand All @@ -26,12 +31,14 @@ public static IHostBuilder CreateHostBuilder(FunctionRpc.FunctionRpcBase service
});

webBuilder.ConfigureServices(services =>
{
services.AddSingleton(scriptEventManager);
services.AddSingleton(service);
});
{
services.AddSingleton(scriptHostManager);
services.AddSingleton(scriptEventManager);
services.AddSingleton(service);
});

webBuilder.UseStartup<Startup>();
});
}
}
}
8 changes: 6 additions & 2 deletions src/WebJobs.Script.Grpc/Server/AspNetCoreGrpcServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,14 @@ public class AspNetCoreGrpcServer : IRpcServer, IDisposable, IAsyncDisposable
private bool _disposed = false;
private IHost _grpcHost;

public AspNetCoreGrpcServer(FunctionRpc.FunctionRpcBase service, IScriptEventManager scriptEventManager, ILogger<AspNetCoreGrpcServer> logger)
public AspNetCoreGrpcServer(
FunctionRpc.FunctionRpcBase service,
IScriptEventManager scriptEventManager,
IScriptHostManager scriptHostManager,
ILogger<AspNetCoreGrpcServer> logger)
{
int port = WorkerUtilities.GetUnusedTcpPort();
_grpcHostBuilder = AspNetCoreGrpcHostBuilder.CreateHostBuilder(service, scriptEventManager, port);
_grpcHostBuilder = AspNetCoreGrpcHostBuilder.CreateHostBuilder(service, scriptEventManager, scriptHostManager, port);
_logger = logger;
Uri = new Uri($"http://{WorkerConstants.HostName}:{port}");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,300 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Threading;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Routing;
using Microsoft.Azure.WebJobs.Rpc.Core.Internal;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Primitives;

namespace Microsoft.Azure.WebJobs.Script.Grpc
{
/// <summary>
/// Endpoint data source which composes all the WebJobs extension data sources together.
/// </summary>
/// <remarks>
/// Implementation is adapted from <see cref="CompositeEndpointDataSource"/>.
/// https://github.com/dotnet/aspnetcore/blob/main/src/Http/Routing/src/CompositeEndpointDataSource.cs.
/// </remarks>
internal sealed class ExtensionsCompositeEndpointDataSource : EndpointDataSource, IDisposable
{
private readonly object _lock = new();
private readonly List<EndpointDataSource> _dataSources = new();
private readonly IScriptHostManager _scriptHostManager;

private IServiceProvider _extensionServices;
private List<Endpoint> _endpoints;
private IChangeToken _consumerChangeToken;
private CancellationTokenSource _cts;
private List<IDisposable> _changeTokenRegistrations;
private bool _disposed;

public ExtensionsCompositeEndpointDataSource(IScriptHostManager scriptHostManager)
{
_scriptHostManager = scriptHostManager;
_scriptHostManager.ActiveHostChanged += OnHostChanged;
}

/// <inheritdoc />
public override IReadOnlyList<Endpoint> Endpoints
{
get
{
ThrowIfDisposed();
EnsureEndpointsInitialized();
return _endpoints;
}
}

/// <inheritdoc />
public override IChangeToken GetChangeToken()
{
ThrowIfDisposed();
EnsureChangeTokenInitialized();
return _consumerChangeToken;
}

/// <inheritdoc />
public void Dispose()
{
_scriptHostManager.ActiveHostChanged -= OnHostChanged;

List<IDisposable> disposables = null;
lock (_lock)
{
_disposed = true;
if (_changeTokenRegistrations is { Count: > 0 })
{
disposables ??= new List<IDisposable>();
disposables.AddRange(_changeTokenRegistrations);
}
}

// Dispose everything outside of the lock in case a registration is blocking on HandleChange completing
// on another thread or something.
if (disposables is not null)
{
foreach (var disposable in disposables)
{
disposable.Dispose();
}
}

_cts?.Dispose();
}

private Endpoint WrapEndpoint(Endpoint endpoint)
{
static RequestDelegate CreateDelegate(RequestDelegate next, IServiceProvider services)
{
// Incoming HttpContext has the gRPC script host services. Create a scope
// for the JobHost services and swap out the contexts request services.
return async context =>
{
if (next is null)
{
return;
}

IServiceProvider original = context.RequestServices;

try
{
await using AsyncServiceScope scope = services.CreateAsyncScope();
context.RequestServices = scope.ServiceProvider;
await next(context);
}
finally
{
context.RequestServices = original;
}
};
}

if (endpoint is not RouteEndpoint route)
{
// We only wrap URL-routeable endpoints (ie: RouteEndpoint).
return endpoint;
}

IServiceProvider services = _extensionServices
?? throw new InvalidOperationException(
"Trying to register extension endpoints, but no extension IServiceProvider available.");

return new RouteEndpoint(
CreateDelegate(route.RequestDelegate, services),
route.RoutePattern,
route.Order,
route.Metadata,
route.DisplayName);
}

[MemberNotNull(nameof(_consumerChangeToken))]
private void EnsureChangeTokenInitialized()
{
if (_consumerChangeToken is not null)
{
return;
}

lock (_lock)
{
if (_consumerChangeToken is not null)
{
return;
}

// This is our first time initializing the change token, so the collection has "changed" from nothing.
CreateChangeTokenUnsynchronized(collectionChanged: true);
}
}

[MemberNotNull(nameof(_consumerChangeToken))]
private void CreateChangeTokenUnsynchronized(bool collectionChanged)
{
CancellationTokenSource cts = new();

if (collectionChanged)
{
_changeTokenRegistrations = new();
foreach (var dataSource in _dataSources)
{
_changeTokenRegistrations.Add(ChangeToken.OnChange(
dataSource.GetChangeToken,
() => OnEndpointsChange(collectionChanged: false)));
}
}

_cts = cts;
_consumerChangeToken = new CancellationChangeToken(cts.Token);
}

private void OnHostChanged(object sender, ActiveHostChangedEventArgs args)
{
lock (_lock)
{
_dataSources.Clear();
if (args?.NewHost?.Services is { } services)
{
_extensionServices = services;
IEnumerable<WebJobsRpcEndpointDataSource> sources = services
.GetService<IEnumerable<WebJobsRpcEndpointDataSource>>()
?? Enumerable.Empty<WebJobsRpcEndpointDataSource>();
_dataSources.AddRange(sources);
}
else
{
_extensionServices = null;
}
}

OnEndpointsChange(collectionChanged: true);
}

private void OnEndpointsChange(bool collectionChanged)
{
CancellationTokenSource oldTokenSource = null;
List<IDisposable> oldChangeTokenRegistrations = null;

lock (_lock)
{
if (_disposed)
{
return;
}

// Prevent consumers from re-registering callback to in-flight events as that can
// cause a stack overflow.
// Example:
// 1. B registers A.
// 2. A fires event causing B's callback to get called.
// 3. B executes some code in its callback, but needs to re-register callback
// in the same callback.
oldTokenSource = _cts;
oldChangeTokenRegistrations = _changeTokenRegistrations;

// Don't create a new change token if no one is listening.
if (oldTokenSource is not null)
{
// We have to hook to any OnChange callbacks before caching endpoints,
// otherwise we might miss changes that occurred to one of the _dataSources after caching.
CreateChangeTokenUnsynchronized(collectionChanged);
}

// Don't update endpoints if no one has read them yet.
if (_endpoints is not null)
{
// Refresh the endpoints from data source so that callbacks can get the latest endpoints.
CreateEndpointsUnsynchronized();
}
}

// Disposing registrations can block on user defined code on running on other threads that could try to acquire the _lock.
if (collectionChanged && oldChangeTokenRegistrations is not null)
{
foreach (var registration in oldChangeTokenRegistrations)
{
registration.Dispose();
}
}

// Raise consumer callbacks. Any new callback registration would happen on the new token created in earlier step.
// Avoid raising callbacks inside a lock.
oldTokenSource?.Cancel();
}

[MemberNotNull(nameof(_endpoints))]
private void CreateEndpointsUnsynchronized()
{
var endpoints = new List<Endpoint>();

foreach (var dataSource in _dataSources)
{
endpoints.AddRange(dataSource.Endpoints.Select(WrapEndpoint));
}

// Only cache _endpoints after everything succeeds without throwing.
// We don't want to create a negative cache which would cause 404s when there should be 500s.
_endpoints = endpoints;
}

// Defer initialization to avoid doing lots of reflection on startup.
[MemberNotNull(nameof(_endpoints))]
private void EnsureEndpointsInitialized()
{
if (_endpoints is not null)
{
return;
}

lock (_lock)
{
if (_endpoints is not null)
{
return;
}

// Now that we're caching the _enpoints, we're responsible for keeping them up-to-date even if the caller
// hasn't started listening for changes themselves yet.
EnsureChangeTokenInitialized();

// Note: we can't use DataSourceDependentCache here because we also need to handle a list of change
// tokens, which is a complication most of our code doesn't have.
CreateEndpointsUnsynchronized();
}
}

private void ThrowIfDisposed()
{
if (_disposed)
{
throw new ObjectDisposedException(nameof(ExtensionsCompositeEndpointDataSource));
}
}
}
}
2 changes: 2 additions & 0 deletions src/WebJobs.Script.Grpc/Server/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ internal class Startup

public void ConfigureServices(IServiceCollection services)
{
services.AddSingleton<ExtensionsCompositeEndpointDataSource>();
services.AddGrpc(options =>
{
options.MaxReceiveMessageSize = MaxMessageLengthBytes;
Expand All @@ -34,6 +35,7 @@ public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
app.UseEndpoints(endpoints =>
{
endpoints.MapGrpcService<FunctionRpc.FunctionRpcBase>();
endpoints.DataSources.Add(endpoints.ServiceProvider.GetRequiredService<ExtensionsCompositeEndpointDataSource>());
});
}
}
Expand Down
Loading