Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement passthrough middleware for entities #2572

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using Microsoft.Azure.WebJobs.Host.Listeners;
using Microsoft.Azure.WebJobs.Host.Protocols;
using Microsoft.Azure.WebJobs.Host.Triggers;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
Expand Down Expand Up @@ -57,6 +58,8 @@ public EntityTriggerAttributeBindingProvider(

private class EntityTriggerBinding : ITriggerBinding
{
private static readonly IReadOnlyDictionary<string, object?> EmptyBindingData = new Dictionary<string, object?>(capacity: 0);

private readonly DurableTaskExtension config;
private readonly ParameterInfo parameterInfo;
private readonly FunctionName entityName;
Expand Down Expand Up @@ -95,31 +98,52 @@ private static IReadOnlyDictionary<string, Type> GetBindingDataContract(Paramete

public Task<ITriggerData> BindAsync(object value, ValueBindingContext context)
{
var entityContext = (DurableEntityContext)value;
Type destinationType = this.parameterInfo.ParameterType;

object? convertedValue = null;
if (destinationType == typeof(IDurableEntityContext))
if (value is DurableEntityContext entityContext)
{
convertedValue = entityContext;
Type destinationType = this.parameterInfo.ParameterType;

object? convertedValue = null;
if (destinationType == typeof(IDurableEntityContext))
{
convertedValue = entityContext;
#if !FUNCTIONS_V1
((IDurableEntityContext)value).FunctionBindingContext = context.FunctionContext;
((IDurableEntityContext)value).FunctionBindingContext = context.FunctionContext;
#endif
}
else if (destinationType == typeof(string))
{
convertedValue = EntityContextToString(entityContext);
}

var inputValueProvider = new ObjectValueProvider(
convertedValue ?? value,
this.parameterInfo.ParameterType);
var inputValueProvider = new ObjectValueProvider(
convertedValue ?? value,
this.parameterInfo.ParameterType);

var bindingData = new Dictionary<string, object?>(StringComparer.OrdinalIgnoreCase);
bindingData[this.parameterInfo.Name!] = convertedValue;
var bindingData = new Dictionary<string, object?>(StringComparer.OrdinalIgnoreCase);
bindingData[this.parameterInfo.Name!] = convertedValue;

var triggerData = new TriggerData(inputValueProvider, bindingData);
return Task.FromResult<ITriggerData>(triggerData);
var triggerData = new TriggerData(inputValueProvider, bindingData);
return Task.FromResult<ITriggerData>(triggerData);
}
#if FUNCTIONS_V3_OR_GREATER
else if (value is RemoteEntityContext remoteContext)
{
// Generate a byte array which is the serialized protobuf payload
// https://developers.google.com/protocol-buffers/docs/csharptutorial#parsing_and_serialization
var entityBatchRequest = remoteContext.Request.ToEntityBatchRequest();

// We convert the binary payload into a base64 string because that seems to be the most commonly supported
// format for Azure Functions language workers. Attempts to send unencoded byte[] payloads were unsuccessful.
string encodedRequest = ProtobufUtils.Base64Encode(entityBatchRequest);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one of the reasons we used proto encoding for orchestrations is that the OrchestrationRuntimeState wasn't serializable itself, and this was a quick way to reach serialization. If the built in DT.Core EntityBatchRequest and EntityBatchResult types are serializable, we can stick with those if you prefer.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is indeed serializable. But I don't have much reason to replace the protobuf serialization as done right now with a different one.

var contextValueProvider = new ObjectValueProvider(encodedRequest, typeof(string));
var triggerData = new TriggerData(contextValueProvider, EmptyBindingData);
return Task.FromResult<ITriggerData>(triggerData);
}
#endif
else
{
throw new ArgumentException($"Don't know how to bind to {value?.GetType().Name ?? "null"}.", nameof(value));
}
}

public ParameterDescriptor ToParameterDescriptor()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See LICENSE in the project root for license information.
#nullable enable
using System;
using System.Collections.Generic;
using DurableTask.Core;
using DurableTask.Core.Command;
using DurableTask.Core.Entities.OperationFormat;
using DurableTask.Core.History;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
{
internal class RemoteEntityContext
{
public RemoteEntityContext(EntityBatchRequest batchRequest)
{
this.Request = batchRequest;
}

[JsonProperty("request")]
public EntityBatchRequest Request { get; private set; }

[JsonIgnore]
internal EntityBatchResult? Result { get; set; }
}
}
3 changes: 2 additions & 1 deletion src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs
Original file line number Diff line number Diff line change
Expand Up @@ -439,10 +439,11 @@ void IExtensionConfigProvider.Initialize(ExtensionConfigContext context)
{
#if FUNCTIONS_V3_OR_GREATER
// This is a newer, more performant flavor of orchestration/activity middleware that is being
// enabled for newer language runtimes. Support for entities in this model is TBD.
// enabled for newer language runtimes.
var ooprocMiddleware = new OutOfProcMiddleware(this);
this.taskHubWorker.AddActivityDispatcherMiddleware(ooprocMiddleware.CallActivityAsync);
this.taskHubWorker.AddOrchestrationDispatcherMiddleware(ooprocMiddleware.CallOrchestratorAsync);
this.taskHubWorker.AddEntityDispatcherMiddleware(ooprocMiddleware.CallEntityAsync);
#else
// This can happen if, for example, a Java user tries to use Durable Functions while targeting V2 or V3 extension bundles
// because those bundles target .NET Core 2.2, which doesn't support the gRPC libraries used in the modern out-of-proc implementation.
Expand Down
162 changes: 162 additions & 0 deletions src/WebJobs.Extensions.DurableTask/OutOfProcMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@
using System.Linq;
using System.Threading.Tasks;
using DurableTask.Core;
using DurableTask.Core.Entities.OperationFormat;
using DurableTask.Core.Exceptions;
using DurableTask.Core.History;
using DurableTask.Core.Middleware;
using Microsoft.Azure.WebJobs.Host.Executors;
using Newtonsoft.Json;

namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
{
Expand Down Expand Up @@ -235,6 +237,166 @@ await this.LifeCycleNotificationHelper.OrchestratorFailedAsync(
dispatchContext.SetProperty(orchestratorResult);
}

/// <summary>
/// Durable Task Framework entity middleware that invokes an out-of-process orchestrator function.
/// </summary>
/// <param name="dispatchContext">This middleware context provided by the framework that contains information about the entity.</param>
/// <param name="next">The next middleware handler in the pipeline.</param>
/// <exception cref="SessionAbortedException">Thrown if there is a recoverable error in the Functions runtime that's expected to be handled gracefully.</exception>
public async Task CallEntityAsync(DispatchMiddlewareContext dispatchContext, Func<Task> next)
{
EntityBatchRequest? batchRequest = dispatchContext.GetProperty<EntityBatchRequest>();

if (batchRequest == null)
{
// This should never happen, and there's no good response we can return if it does.
throw new InvalidOperationException($"An entity was scheduled but no {nameof(EntityBatchRequest)} was found!");
}

if (batchRequest.InstanceId == null)
{
// This should never happen, and there's no good response we can return if it does.
throw new InvalidOperationException($"An entity was scheduled but InstanceId is null!");
}

EntityId entityId = EntityId.GetEntityIdFromSchedulerId(batchRequest.InstanceId);
FunctionName functionName = new FunctionName(entityId.EntityName);
RegisteredFunctionInfo functionInfo = this.extension.GetEntityInfo(functionName);

void SetErrorResult(FailureDetails failureDetails)
{
// Returns a result with no operation results and no state change,
// and with failure details that explain what error was encountered.
dispatchContext.SetProperty(new EntityBatchResult()
{
Actions = { },
EntityState = batchRequest!.EntityState,
Results = { },
FailureDetails = failureDetails,
});
}

if (functionInfo == null)
{
SetErrorResult(new FailureDetails(
errorType: "EntityFunctionNotFound",
errorMessage: this.extension.GetInvalidEntityFunctionMessage(functionName.Name),
stackTrace: null,
innerFailure: null,
isNonRetriable: true));
return;
}

this.TraceHelper.FunctionStarting(
this.Options.HubName,
functionName.Name,
batchRequest.InstanceId,
this.extension.GetIntputOutputTrace(batchRequest.EntityState),
functionType: FunctionType.Entity,
isReplay: false);

var context = new RemoteEntityContext(batchRequest);

var input = new TriggeredFunctionData
{
TriggerValue = context,
#pragma warning disable CS0618 // Type or member is obsolete (not intended for general public use)
InvokeHandler = async functionInvoker =>
{
// Invoke the function and look for a return value. Trigger return values are an undocumented feature that we depend on.
Task invokeTask = functionInvoker();
if (invokeTask is not Task<object> invokeTaskWithResult)
{
// This should never happen
throw new InvalidOperationException("The internal function invoker returned a task that does not support return values!");
}

// The return value is expected to be a base64 string containing the protobuf-encoding of the batch result.
string? triggerReturnValue = (await invokeTaskWithResult) as string;
if (string.IsNullOrEmpty(triggerReturnValue))
{
throw new InvalidOperationException(
"The function invocation resulted in a null response. This means that either the entity function was implemented " +
"incorrectly, the Durable Task language SDK was implemented incorrectly, or that the destination language worker is not " +
"sending the function result back to the host.");
}

byte[] triggerReturnValueBytes = Convert.FromBase64String(triggerReturnValue);
var response = Microsoft.DurableTask.Protobuf.EntityBatchResult.Parser.ParseFrom(triggerReturnValueBytes);
context.Result = response.ToEntityBatchResult();

#pragma warning restore CS0618 // Type or member is obsolete (not intended for general public use)
},
};

FunctionResult functionResult;
try
{
functionResult = await functionInfo.Executor.TryExecuteAsync(
input,
cancellationToken: this.HostLifetimeService.OnStopping);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add the following lines here:

if (!functionResult.Succeeded)
{
    // Shutdown can surface as a completed invocation in a failed state.
    // Re-throw so we can abort this invocation.
    this.HostLifetimeService.OnStopping.ThrowIfCancellationRequested();
}


if (!functionResult.Succeeded)
{
// Shutdown can surface as a completed invocation in a failed state.
// Re-throw so we can abort this invocation.
this.HostLifetimeService.OnStopping.ThrowIfCancellationRequested();
}
}
catch (Exception hostRuntimeException)
{
string reason = this.HostLifetimeService.OnStopping.IsCancellationRequested ?
"The Functions/WebJobs runtime is shutting down!" :
$"Unhandled exception in the Functions/WebJobs runtime: {hostRuntimeException}";

this.TraceHelper.FunctionAborted(
this.Options.HubName,
functionName.Name,
batchRequest.InstanceId,
reason,
functionType: FunctionType.Entity);

// This will abort the current execution and force an durable retry
throw new SessionAbortedException(reason);
}

if (!functionResult.Succeeded)
{
this.TraceHelper.FunctionFailed(
this.Options.HubName,
functionName.Name,
batchRequest.InstanceId,
functionResult.Exception.ToString(),
FunctionType.Orchestrator,
isReplay: false);

SetErrorResult(new FailureDetails(
errorType: "FunctionInvocationFailed",
errorMessage: $"Invocation of function '{functionName}' failed with an exception.",
stackTrace: null,
innerFailure: new FailureDetails(functionResult.Exception),
isNonRetriable: true));

return;
}

EntityBatchResult batchResult = context.Result
?? throw new InvalidOperationException($"The entity function executed successfully but {nameof(context.Result)} is still null!");

this.TraceHelper.FunctionCompleted(
this.Options.HubName,
functionName.Name,
batchRequest.InstanceId,
this.extension.GetIntputOutputTrace(batchRequest.EntityState),
batchResult.EntityState != null,
FunctionType.Entity,
isReplay: false);

// Send the result of the orchestrator function to the DTFx dispatch pipeline.
// This allows us to bypass the default, in-process execution and process the given results immediately.
dispatchContext.SetProperty(batchResult);
}

/// <summary>
/// Durable Task Framework activity middleware that invokes an out-of-process orchestrator function.
/// </summary>
Expand Down
Loading