Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Apply capabilities correctly on reload #9367

Merged
merged 14 commits into from
Jul 26, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public void Setup()
{
{ RpcWorkerConstants.TypedDataCollection, "1" }
};
grpcCapabilities.UpdateCapabilities(addedCapabilities);
grpcCapabilities.UpdateCapabilities(addedCapabilities, GrpcCapabilitiesUpdateStrategy.Merge);
}
}
}
11 changes: 6 additions & 5 deletions release_notes.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@
- Update PowerShell Worker 7.0 to 4.0.2850 [Release Note](https://github.com/Azure/azure-functions-powershell-worker/releases/tag/v4.0.2850)
- Update Node.js Worker Version to [3.8.0](https://github.com/Azure/azure-functions-nodejs-worker/releases/tag/v3.8.0)
- Identity dependencies updated to 6.31.0:
- Microsoft.IdentityModel.Tokens
- System.IdentityModel.Tokens.Jwt
- Microsoft.IdentityModel.Abstractions
- Microsoft.IdentityModel.JsonWebTokens
- Microsoft.IdentityModel.Logging
- Microsoft.IdentityModel.Tokens
- System.IdentityModel.Tokens.Jwt
- Microsoft.IdentityModel.Abstractions
- Microsoft.IdentityModel.JsonWebTokens
- Microsoft.IdentityModel.Logging
- Updated Grpc.AspNetCore package to 2.55.0 (https://github.com/Azure/azure-functions-host/pull/9373)
- Update protobuf file to v1.10.0 (https://github.com/Azure/azure-functions-host/pull/9405)
- Send an empty RpcHttp payload if proxying the http request to the worker (https://github.com/Azure/azure-functions-host/pull/9415)
- Add new Host to Worker RPC extensibility feature for out-of-proc workers. (https://github.com/Azure/azure-functions-host/pull/9292)
- Apply capabilities on environment reload (placeholder mode scenarios) (https://github.com/Azure/azure-functions-host/pull/9367)
26 changes: 22 additions & 4 deletions src/WebJobs.Script.Grpc/Channel/GrpcCapabilities.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System;
using System.Collections.Generic;
using System.Text.Json;
using Microsoft.Extensions.Logging;

namespace Microsoft.Azure.WebJobs.Script.Grpc
Expand All @@ -26,19 +27,36 @@ public string GetCapabilityState(string capability)
return null;
}

public void UpdateCapabilities(IDictionary<string, string> capabilities)
public void UpdateCapabilities(IDictionary<string, string> capabilities, GrpcCapabilitiesUpdateStrategy strategy)
{
if (capabilities == null)
{
return;
}

_logger.LogDebug($"Updating capabilities: {capabilities.ToString()}");
_logger.LogDebug("Updating capabilities using {strategy} strategy. Current: {_capabilities} Incoming: {capabilities}", strategy, JsonSerializer.Serialize(_capabilities), JsonSerializer.Serialize(capabilities));

foreach (KeyValuePair<string, string> capability in capabilities)
switch (strategy)
{
UpdateCapability(capability);
case GrpcCapabilitiesUpdateStrategy.Merge:
foreach (KeyValuePair<string, string> capability in capabilities)
{
UpdateCapability(capability);
}
break;

case GrpcCapabilitiesUpdateStrategy.Replace:
_capabilities.Clear();
foreach (KeyValuePair<string, string> capability in capabilities)
{
UpdateCapability(capability);
}
break;
default:
throw new InvalidOperationException($"Did not recognize the capability update strategy {strategy}.");
}

_logger.LogDebug("Updated capabilities: {capabilities}", JsonSerializer.Serialize(_capabilities));
}

private void UpdateCapability(KeyValuePair<string, string> capability)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

namespace Microsoft.Azure.WebJobs.Script.Grpc
{
internal enum GrpcCapabilitiesUpdateStrategy
{
// overwrites existing values and appends new ones
// ex. worker init: {A: foo, B: bar} + env reload: {A:foo, B: foo, C: foo} -> {A: foo, B: foo, C: foo}
Merge,
// existing capabilities are cleared and new capabilities are applied
// ex. worker init: {A: foo, B: bar} + env reload: {A:foo, C: foo} -> {A: foo, C: foo}
Replace
}
}
55 changes: 31 additions & 24 deletions src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -372,8 +372,7 @@ internal void FunctionEnvironmentReloadResponse(FunctionEnvironmentReloadRespons
_workerConfig.Description.DefaultRuntimeVersion = _workerConfig.Description.DefaultRuntimeVersion ?? res?.WorkerMetadata?.RuntimeVersion;
_workerConfig.Description.DefaultRuntimeName = _workerConfig.Description.DefaultRuntimeName ?? res?.WorkerMetadata?.RuntimeName;

UpdateCapabilities(res.Capabilities);
_cancelCapabilityEnabled ??= !string.IsNullOrEmpty(_workerCapabilities.GetCapabilityState(RpcWorkerConstants.HandlesInvocationCancelMessage));
ApplyCapabilities(res.Capabilities, res.CapabilitiesUpdateStrategy.ToGrpcCapabilitiesUpdateStrategy());

if (res.Result.IsFailure(out Exception reloadEnvironmentVariablesException))
{
Expand Down Expand Up @@ -410,7 +409,36 @@ internal void WorkerInitResponse(GrpcEvent initEvent)

_state = _state | RpcWorkerChannelState.Initialized;

UpdateCapabilities(_initMessage.Capabilities);
ApplyCapabilities(_initMessage.Capabilities);

_workerInitTask.TrySetResult(true);
}

private void LogWorkerMetadata(WorkerMetadata workerMetadata)
{
if (workerMetadata == null)
{
return;
}

workerMetadata.UpdateWorkerMetadata(_workerConfig);
var workerMetadataString = workerMetadata.ToString();
_metricsLogger.LogEvent(MetricEventNames.WorkerMetadata, functionName: null, workerMetadataString);
_workerChannelLogger.LogDebug("Worker metadata: {workerMetadata}", workerMetadataString);
}

// Allow tests to add capabilities, even if not directly supported by the worker.
internal virtual void UpdateCapabilities(IDictionary<string, string> fields, GrpcCapabilitiesUpdateStrategy strategy)
{
_workerCapabilities.UpdateCapabilities(fields, strategy);
}

// Helper method that updates and applies capabilities
// Used at worker initialization and environment reload (placeholder scenarios)
// The default strategy for updating capabilities is merge
internal void ApplyCapabilities(IDictionary<string, string> capabilities, GrpcCapabilitiesUpdateStrategy strategy = GrpcCapabilitiesUpdateStrategy.Merge)
{
UpdateCapabilities(capabilities, strategy);

_isSharedMemoryDataTransferEnabled = IsSharedMemoryDataTransferEnabled();
_cancelCapabilityEnabled ??= !string.IsNullOrEmpty(_workerCapabilities.GetCapabilityState(RpcWorkerConstants.HandlesInvocationCancelMessage));
Expand Down Expand Up @@ -441,27 +469,6 @@ internal void WorkerInitResponse(GrpcEvent initEvent)
HandleWorkerInitError(ex);
}
}

_workerInitTask.TrySetResult(true);
}

private void LogWorkerMetadata(WorkerMetadata workerMetadata)
{
if (workerMetadata == null)
{
return;
}

workerMetadata.UpdateWorkerMetadata(_workerConfig);
var workerMetadataString = workerMetadata.ToString();
_metricsLogger.LogEvent(MetricEventNames.WorkerMetadata, functionName: null, workerMetadataString);
_workerChannelLogger.LogDebug("Worker metadata: {workerMetadata}", workerMetadataString);
}

// Allow tests to add capabilities, even if not directly supported by the worker.
internal virtual void UpdateCapabilities(IDictionary<string, string> fields)
{
_workerCapabilities.UpdateCapabilities(fields);
}

public void SetupFunctionInvocationBuffers(IEnumerable<FunctionMetadata> functions)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,14 @@ internal static RetryStrategy ToRetryStrategy(this RpcRetryOptions.Types.RetrySt
_ => throw new InvalidOperationException($"Unknown RetryStrategy RpcDataType: {retryStrategy}.")
};

internal static GrpcCapabilitiesUpdateStrategy ToGrpcCapabilitiesUpdateStrategy(this FunctionEnvironmentReloadResponse.Types.CapabilitiesUpdateStrategy capabilityUpdateStrategy) =>
capabilityUpdateStrategy switch
{
FunctionEnvironmentReloadResponse.Types.CapabilitiesUpdateStrategy.Merge => GrpcCapabilitiesUpdateStrategy.Merge,
FunctionEnvironmentReloadResponse.Types.CapabilitiesUpdateStrategy.Replace => GrpcCapabilitiesUpdateStrategy.Replace,
_ => throw new InvalidOperationException($"Unknown capabilities update strategy: {capabilityUpdateStrategy}.")
};

private static bool ShouldIncludeEmptyEntriesInMessagePayload(GrpcCapabilities capabilities)
{
return !string.IsNullOrWhiteSpace(capabilities.GetCapabilityState(RpcWorkerConstants.IncludeEmptyEntriesInMessagePayload));
Expand Down
7 changes: 4 additions & 3 deletions test/DotNetIsolated60/DotNetIsolated60.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@
<FunctionsEnableWorkerIndexing>True</FunctionsEnableWorkerIndexing>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Azure.Functions.Worker" Version="1.14.0" />
<PackageReference Include="Microsoft.Azure.Functions.Worker" Version="1.18.0" />
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.Http" Version="3.0.13" />
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.Storage" Version="4.0.4" />
<PackageReference Include="Microsoft.Azure.Functions.Worker.Sdk" Version="1.10.0" />
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.Http.AspNetCore" Version="1.0.0-preview2" />
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.Storage" Version="6.0.0" />
<PackageReference Include="Microsoft.Azure.Functions.Worker.Sdk" Version="1.12.0" />
</ItemGroup>
<ItemGroup>
<None Update="host.json">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@

namespace DotNetIsolated60
{
public class Function1
public class HttpRequestDataFunction
{
private readonly ILogger _logger;
private readonly ILogger<HttpRequestDataFunction> _logger;

public Function1(ILoggerFactory loggerFactory)
public HttpRequestDataFunction(ILogger<HttpRequestDataFunction> logger)
{
_logger = loggerFactory.CreateLogger<Function1>();
_logger = logger;
}

[Function("Function1")]
[Function(nameof(HttpRequestDataFunction))]
public HttpResponseData Run([HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequestData req)
{
_logger.LogInformation("C# HTTP trigger function processed a request.");
Expand Down
23 changes: 23 additions & 0 deletions test/DotNetIsolated60/HttpRequestFunction.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
using Microsoft.AspNetCore.Http;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;

namespace DotNetIsolated60
{
public class HttpRequestFunction
{
private readonly ILogger<HttpRequestFunction> _logger;

public HttpRequestFunction(ILogger<HttpRequestFunction> logger)
{
_logger = logger;
}

[Function(nameof(HttpRequestFunction))]
public Task Run([HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequest req)
{
_logger.LogInformation("C# HTTP trigger function processed a request.");
return req.HttpContext.Response.WriteAsync("Welcome to Azure Functions!");
}
}
}
22 changes: 18 additions & 4 deletions test/DotNetIsolated60/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,23 @@

//Debugger.Launch();

var host = new HostBuilder()
.ConfigureFunctionsWorkerDefaults()
.ConfigureGeneratedFunctionMetadataProvider()
.Build();
// Tests can set an env var that will swap this to use the proxy
bool useProxy = Environment.GetEnvironmentVariable("UseProxyInTest")?.Contains("1") ?? false;

var hostBuilder = new HostBuilder();

if (useProxy)
{
hostBuilder
.ConfigureFunctionsWebApplication()
.ConfigureGeneratedFunctionMetadataProvider();
}
else
{
hostBuilder
.ConfigureFunctionsWorkerDefaults()
.ConfigureGeneratedFunctionMetadataProvider();
}

var host = hostBuilder.Build();
host.Run();
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,14 @@ private class TestGrpcWorkerChannel : GrpcWorkerChannel
{
internal TestGrpcWorkerChannel(string workerId, IScriptEventManager eventManager, RpcWorkerConfig workerConfig, IWorkerProcess rpcWorkerProcess, ILogger logger, IMetricsLogger metricsLogger, int attemptCount, IEnvironment environment, IOptionsMonitor<ScriptApplicationHostOptions> applicationHostOptions, ISharedMemoryManager sharedMemoryManager, IOptions<WorkerConcurrencyOptions> workerConcurrencyOptions, IOptions<FunctionsHostingConfigOptions> hostingConfigOptions, IHttpProxyService httpProxyService)
: base(workerId, eventManager, workerConfig, rpcWorkerProcess, logger, metricsLogger, attemptCount, environment, applicationHostOptions, sharedMemoryManager, workerConcurrencyOptions, hostingConfigOptions, httpProxyService)
{
{
}

internal override void UpdateCapabilities(IDictionary<string, string> fields)
internal override void UpdateCapabilities(IDictionary<string, string> fields, GrpcCapabilitiesUpdateStrategy strategy)
{
// inject a capability
fields[RpcWorkerConstants.WorkerApplicationInsightsLoggingEnabled] = bool.TrueString;
base.UpdateCapabilities(fields);
base.UpdateCapabilities(fields, strategy);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -799,7 +799,7 @@ public async Task Specialization_JobHostInternalStorageOptionsUpdatesWithActiveH
[Fact]
public async Task DotNetIsolated_PlaceholderHit()
{
var builder = InitializeDotNetIsolatedPlaceholderBuilder("Function1");
var builder = InitializeDotNetIsolatedPlaceholderBuilder("HttpRequestDataFunction");

using var testServer = new TestServer(builder);

Expand All @@ -819,7 +819,57 @@ public async Task DotNetIsolated_PlaceholderHit()
_environment.SetEnvironmentVariable(EnvironmentSettingNames.AzureWebsiteContainerReady, "1");
_environment.SetEnvironmentVariable(EnvironmentSettingNames.AzureWebsitePlaceholderMode, "0");

response = await client.GetAsync("api/function1");
response = await client.GetAsync("api/HttpRequestDataFunction");
response.EnsureSuccessStatusCode();

// Placeholder hit; these should match
var specializedChannel = await webChannelManager.GetChannels("dotnet-isolated").Single().Value.Task;
Assert.Same(placeholderChannel, specializedChannel);
runningProcess = Process.GetProcessById(placeholderChannel.WorkerProcess.Id);
Assert.Contains(runningProcess.ProcessName, "FunctionsNetHost");

var log = _loggerProvider.GetLog();
Assert.Contains("UsePlaceholderDotNetIsolated: True", log);
Assert.Contains("Placeholder runtime version: '6.0'. Site runtime version: '6.0'. Match: True", log);
Assert.DoesNotContain("Shutting down placeholder worker.", log);
}

[Fact]
public async Task DotNetIsolated_PlaceholderHit_WithProxies()
{
// This test ensures that capabilities are correctly applied in EnvironmentReload during
// specialization
var builder = InitializeDotNetIsolatedPlaceholderBuilder("HttpRequestFunction");

using var testServer = new TestServer(builder);

var client = testServer.CreateClient();
client.Timeout = TimeSpan.FromSeconds(10);

var response = await client.GetAsync("api/warmup");
response.EnsureSuccessStatusCode();

// Validate that the channel is set up with native worker
var webChannelManager = testServer.Services.GetService<IWebHostRpcWorkerChannelManager>();

var placeholderChannel = await webChannelManager.GetChannels("dotnet-isolated").Single().Value.Task;
Assert.Contains("FunctionsNetHost.exe", placeholderChannel.WorkerProcess.Process.StartInfo.FileName);
Assert.NotNull(placeholderChannel.WorkerProcess.Process.Id);
var runningProcess = Process.GetProcessById(placeholderChannel.WorkerProcess.Id);
Assert.Contains(runningProcess.ProcessName, "FunctionsNetHost");

// This has to be on the actual environment in order to propagate to worker
using var proxyEnv = new TestScopedEnvironmentVariable("UseProxyInTest", "1");

_environment.SetEnvironmentVariable(EnvironmentSettingNames.AzureWebsiteContainerReady, "1");
_environment.SetEnvironmentVariable(EnvironmentSettingNames.AzureWebsitePlaceholderMode, "0");

Task<HttpResponseMessage> responseTask = client.GetAsync("api/HttpRequestFunction");

// Cancellation not working with TestServer
await TestHelpers.Await(() => responseTask.IsCompleted, timeout: 5000);

response = await responseTask;
response.EnsureSuccessStatusCode();

// Placeholder hit; these should match
Expand Down Expand Up @@ -875,7 +925,7 @@ public async Task SpecializedSite_StopsHostBeforeWorker()
await queue.CreateIfNotExistsAsync();
await queue.ClearAsync();

var builder = InitializeDotNetIsolatedPlaceholderBuilder("Function1", "QueueFunction");
var builder = InitializeDotNetIsolatedPlaceholderBuilder("HttpRequestDataFunction", "QueueFunction");

using var testServer = new TestServer(builder);

Expand All @@ -887,7 +937,7 @@ public async Task SpecializedSite_StopsHostBeforeWorker()
_environment.SetEnvironmentVariable(EnvironmentSettingNames.AzureWebsiteContainerReady, "1");
_environment.SetEnvironmentVariable(EnvironmentSettingNames.AzureWebsitePlaceholderMode, "0");

response = await client.GetAsync("api/function1");
response = await client.GetAsync("api/HttpRequestDataFunction");
response.EnsureSuccessStatusCode();

var scriptHostManager = testServer.Services.GetService<IScriptHostManager>();
Expand Down Expand Up @@ -939,7 +989,7 @@ await TestHelpers.Await(() =>

private async Task DotNetIsolatedPlaceholderMiss(Action additionalSpecializedSetup = null)
{
var builder = InitializeDotNetIsolatedPlaceholderBuilder("Function1");
var builder = InitializeDotNetIsolatedPlaceholderBuilder("HttpRequestDataFunction");

// remove WEBSITE_USE_PLACEHOLDER_DOTNETISOLATED
_environment.SetEnvironmentVariable(EnvironmentSettingNames.AzureWebsiteUsePlaceholderDotNetIsolated, null);
Expand All @@ -964,7 +1014,7 @@ private async Task DotNetIsolatedPlaceholderMiss(Action additionalSpecializedSet

additionalSpecializedSetup?.Invoke();

response = await client.GetAsync("api/function1");
response = await client.GetAsync("api/HttpRequestDataFunction");
response.EnsureSuccessStatusCode();

// Placeholder miss; new channel should be started using the deployed worker directly
Expand Down
Loading