Skip to content

Commit

Permalink
Update to latest protobuf definitions (#359)
Browse files Browse the repository at this point in the history
Summary of changes:
- Updates to latest proto definitions in microsoft/durabletask-proto
- Implements new work item completion token APIs
- Updates System.Text.Json dependency to 6.0.10 to address CVEs
- Adds work item concurrency configuration
- Bump the version from 1.4.0 to 1.5.0 since there is new public surface area (concurrency configuration)
  • Loading branch information
cgillum authored Dec 18, 2024
1 parent 52e18fc commit c8cb34c
Show file tree
Hide file tree
Showing 10 changed files with 93 additions and 23 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

## v1.5.0

- Implement work item completion tokens for standalone worker scenarios ([#359](https://github.com/microsoft/durabletask-dotnet/pull/359))
- Support for worker concurrency configuration ([#359](https://github.com/microsoft/durabletask-dotnet/pull/359))
- Bump System.Text.Json to 6.0.10

## v1.4.0

- Microsoft.Azure.DurableTask.Core dependency increased to `3.0.0`
Expand Down
2 changes: 1 addition & 1 deletion eng/proto
2 changes: 1 addition & 1 deletion eng/targets/Release.props
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
</PropertyGroup>

<PropertyGroup>
<VersionPrefix>1.4.0</VersionPrefix>
<VersionPrefix>1.5.0</VersionPrefix>
<VersionSuffix></VersionSuffix>
</PropertyGroup>

Expand Down
2 changes: 1 addition & 1 deletion src/Abstractions/Abstractions.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="6.0.1" />
<PackageReference Include="System.Text.Json" Version="6.0.0" />
<PackageReference Include="System.Text.Json" Version="6.0.10" />
</ItemGroup>

<ItemGroup>
Expand Down
8 changes: 7 additions & 1 deletion src/Shared/Grpc/ProtoUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -223,18 +223,24 @@ internal static Timestamp ToTimestamp(this DateTime dateTime)
/// <param name="instanceId">The orchestrator instance ID.</param>
/// <param name="customStatus">The orchestrator customer status or <c>null</c> if no custom status.</param>
/// <param name="actions">The orchestrator actions.</param>
/// <param name="completionToken">
/// The completion token for the work item. It must be the exact same <see cref="P.WorkItem.CompletionToken" />
/// value that was provided by the corresponding <see cref="P.WorkItem"/> that triggered the orchestrator execution.
/// </param>
/// <returns>The orchestrator response.</returns>
/// <exception cref="NotSupportedException">When an orchestrator action is unknown.</exception>
internal static P.OrchestratorResponse ConstructOrchestratorResponse(
string instanceId,
string? customStatus,
IEnumerable<OrchestratorAction> actions)
IEnumerable<OrchestratorAction> actions,
string completionToken)
{
Check.NotNull(actions);
var response = new P.OrchestratorResponse
{
InstanceId = instanceId,
CustomStatus = customStatus,
CompletionToken = completionToken,
};

foreach (OrchestratorAction action in actions)
Expand Down
31 changes: 31 additions & 0 deletions src/Worker/Core/DurableTaskWorkerOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,16 @@ public DataConverter DataConverter
/// </remarks>
public TimeSpan MaximumTimerInterval { get; set; } = TimeSpan.FromDays(3);

/// <summary>
/// Gets options for the Durable Task worker concurrency.
/// </summary>
/// <remarks>
/// Worker concurrency options control how many work items of a particular type (e.g., orchestration, activity,
/// or entity) can be processed concurrently by the worker. It is recommended to set these values based on the
/// expected workload and the resources available on the machine running the worker.
/// </remarks>
public ConcurrencyOptions Concurrency { get; } = new();

/// <summary>
/// Gets a value indicating whether <see cref="DataConverter" /> was explicitly set or not.
/// </summary>
Expand All @@ -108,4 +118,25 @@ internal void ApplyTo(DurableTaskWorkerOptions other)
other.EnableEntitySupport = this.EnableEntitySupport;
}
}

/// <summary>
/// Options for the Durable Task worker concurrency.
/// </summary>
public class ConcurrencyOptions
{
/// <summary>
/// Gets or sets the maximum number of concurrent activity work items that can be processed by the worker.
/// </summary>
public int MaximumConcurrentActivityWorkItems { get; set; } = 100 * Environment.ProcessorCount;

/// <summary>
/// Gets or sets the maximum number of concurrent orchestration work items that can be processed by the worker.
/// </summary>
public int MaximumConcurrentOrchestrationWorkItems { get; set; } = 100 * Environment.ProcessorCount;

/// <summary>
/// Gets or sets the maximum number of concurrent entity work items that can be processed by the worker.
/// </summary>
public int MaximumConcurrentEntityWorkItems { get; set; } = 100 * Environment.ProcessorCount;
}
}
2 changes: 1 addition & 1 deletion src/Worker/Core/Worker.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ The worker is responsible for processing durable task work items.</PackageDescri
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.Options" Version="6.0.0" />
<PackageReference Include="System.Text.Json" Version="6.0.0" />
<PackageReference Include="System.Text.Json" Version="6.0.10" />
</ItemGroup>

<ItemGroup>
Expand Down
42 changes: 32 additions & 10 deletions src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public Processor(GrpcDurableTaskWorker worker, TaskHubSidecarServiceClient sidec
{
this.worker = worker;
this.sidecar = sidecar;
this.shimFactory = new DurableTaskShimFactory(this.worker.options, this.worker.loggerFactory);
this.shimFactory = new DurableTaskShimFactory(this.worker.grpcOptions, this.worker.loggerFactory);
}

ILogger Logger => this.worker.logger;
Expand Down Expand Up @@ -102,7 +102,7 @@ static OrchestrationRuntimeState BuildRuntimeState(P.OrchestratorRequest request

if (runtimeState.ExecutionStartedEvent == null)
{
// TODO: What's the right way to handle this? Callback to the sidecar with a retryable error request?
// TODO: What's the right way to handle this? Callback to the sidecar with a retriable error request?
throw new InvalidOperationException("The provided orchestration history was incomplete");
}

Expand Down Expand Up @@ -133,8 +133,18 @@ static string GetActionsListForLogging(IReadOnlyList<P.OrchestratorAction> actio
await this.sidecar!.HelloAsync(EmptyMessage, cancellationToken: cancellation);
this.Logger.EstablishedWorkItemConnection();

DurableTaskWorkerOptions workerOptions = this.worker.workerOptions;

// Get the stream for receiving work-items
return this.sidecar!.GetWorkItems(new P.GetWorkItemsRequest(), cancellationToken: cancellation);
return this.sidecar!.GetWorkItems(
new P.GetWorkItemsRequest
{
MaxConcurrentActivityWorkItems =
workerOptions.Concurrency.MaximumConcurrentActivityWorkItems,
MaxConcurrentOrchestrationWorkItems =
workerOptions.Concurrency.MaximumConcurrentOrchestrationWorkItems,
},
cancellationToken: cancellation);
}

async Task ProcessWorkItemsAsync(AsyncServerStreamingCall<P.WorkItem> stream, CancellationToken cancellation)
Expand All @@ -145,16 +155,25 @@ async Task ProcessWorkItemsAsync(AsyncServerStreamingCall<P.WorkItem> stream, Ca
{
if (workItem.RequestCase == P.WorkItem.RequestOneofCase.OrchestratorRequest)
{
this.RunBackgroundTask(workItem, () => this.OnRunOrchestratorAsync(
workItem.OrchestratorRequest));
this.RunBackgroundTask(
workItem,
() => this.OnRunOrchestratorAsync(workItem.OrchestratorRequest, workItem.CompletionToken));
}
else if (workItem.RequestCase == P.WorkItem.RequestOneofCase.ActivityRequest)
{
this.RunBackgroundTask(workItem, () => this.OnRunActivityAsync(workItem.ActivityRequest));
this.RunBackgroundTask(
workItem,
() => this.OnRunActivityAsync(workItem.ActivityRequest, workItem.CompletionToken));
}
else if (workItem.RequestCase == P.WorkItem.RequestOneofCase.EntityRequest)
{
this.RunBackgroundTask(workItem, () => this.OnRunEntityBatchAsync(workItem.EntityRequest));
this.RunBackgroundTask(
workItem,
() => this.OnRunEntityBatchAsync(workItem.EntityRequest));
}
else if (workItem.RequestCase == P.WorkItem.RequestOneofCase.HealthPing)
{
// No-op
}
else
{
Expand Down Expand Up @@ -188,7 +207,7 @@ void RunBackgroundTask(P.WorkItem? workItem, Func<Task> handler)
});
}

async Task OnRunOrchestratorAsync(P.OrchestratorRequest request)
async Task OnRunOrchestratorAsync(P.OrchestratorRequest request, string completionToken)
{
OrchestratorExecutionResult? result = null;
P.TaskFailureDetails? failureDetails = null;
Expand Down Expand Up @@ -248,14 +267,16 @@ async Task OnRunOrchestratorAsync(P.OrchestratorRequest request)
response = ProtoUtils.ConstructOrchestratorResponse(
request.InstanceId,
result.CustomStatus,
result.Actions);
result.Actions,
completionToken);
}
else
{
// This is the case for failures that happened *outside* the orchestrator executor
response = new P.OrchestratorResponse
{
InstanceId = request.InstanceId,
CompletionToken = completionToken,
Actions =
{
new P.OrchestratorAction
Expand All @@ -279,7 +300,7 @@ async Task OnRunOrchestratorAsync(P.OrchestratorRequest request)
await this.sidecar.CompleteOrchestratorTaskAsync(response);
}

async Task OnRunActivityAsync(P.ActivityRequest request)
async Task OnRunActivityAsync(P.ActivityRequest request, string completionToken)
{
OrchestrationInstance instance = request.OrchestrationInstance.ToCore();
string rawInput = request.Input;
Expand Down Expand Up @@ -336,6 +357,7 @@ async Task OnRunActivityAsync(P.ActivityRequest request)
TaskId = request.TaskId,
Result = output,
FailureDetails = failureDetails,
CompletionToken = completionToken,
};

await this.sidecar.CompleteActivityTaskAsync(response);
Expand Down
18 changes: 11 additions & 7 deletions src/Worker/Grpc/GrpcDurableTaskWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ namespace Microsoft.DurableTask.Worker.Grpc;
/// </summary>
sealed partial class GrpcDurableTaskWorker : DurableTaskWorker
{
readonly GrpcDurableTaskWorkerOptions options;
readonly GrpcDurableTaskWorkerOptions grpcOptions;
readonly DurableTaskWorkerOptions workerOptions;
readonly IServiceProvider services;
readonly ILoggerFactory loggerFactory;
readonly ILogger logger;
Expand All @@ -22,18 +23,21 @@ sealed partial class GrpcDurableTaskWorker : DurableTaskWorker
/// </summary>
/// <param name="name">The name of the worker.</param>
/// <param name="factory">The task factory.</param>
/// <param name="options">The gRPC worker options.</param>
/// <param name="grpcOptions">The gRPC-specific worker options.</param>
/// <param name="workerOptions">The generic worker options.</param>
/// <param name="services">The service provider.</param>
/// <param name="loggerFactory">The logger.</param>
public GrpcDurableTaskWorker(
string name,
IDurableTaskFactory factory,
IOptionsMonitor<GrpcDurableTaskWorkerOptions> options,
IOptionsMonitor<GrpcDurableTaskWorkerOptions> grpcOptions,
IOptionsMonitor<DurableTaskWorkerOptions> workerOptions,
IServiceProvider services,
ILoggerFactory loggerFactory)
: base(name, factory)
{
this.options = Check.NotNull(options).Get(name);
this.grpcOptions = Check.NotNull(grpcOptions).Get(name);
this.workerOptions = Check.NotNull(workerOptions).Get(name);
this.services = Check.NotNull(services);
this.loggerFactory = Check.NotNull(loggerFactory);
this.logger = loggerFactory.CreateLogger("Microsoft.DurableTask"); // TODO: use better category name.
Expand Down Expand Up @@ -73,19 +77,19 @@ static GrpcChannel GetChannel(string? address)

AsyncDisposable GetCallInvoker(out CallInvoker callInvoker)
{
if (this.options.Channel is GrpcChannel c)
if (this.grpcOptions.Channel is GrpcChannel c)
{
callInvoker = c.CreateCallInvoker();
return default;
}

if (this.options.CallInvoker is CallInvoker invoker)
if (this.grpcOptions.CallInvoker is CallInvoker invoker)
{
callInvoker = invoker;
return default;
}

c = GetChannel(this.options.Address);
c = GetChannel(this.grpcOptions.Address);
callInvoker = c.CreateCallInvoker();
return new AsyncDisposable(() => new(c.ShutdownAsync()));
}
Expand Down
3 changes: 2 additions & 1 deletion src/Worker/Grpc/GrpcOrchestrationRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ public static string LoadAndRun(
P.OrchestratorResponse response = ProtoUtils.ConstructOrchestratorResponse(
request.InstanceId,
result.CustomStatus,
result.Actions);
result.Actions,
completionToken: string.Empty /* doesn't apply */);
byte[] responseBytes = response.ToByteArray();
return Convert.ToBase64String(responseBytes);
}
Expand Down

0 comments on commit c8cb34c

Please sign in to comment.