diff --git a/eng/proto b/eng/proto index dad676a2..76cf3b85 160000 --- a/eng/proto +++ b/eng/proto @@ -1 +1 @@ -Subproject commit dad676a26b2fdc23e3dabb3ee7380e646d581410 +Subproject commit 76cf3b85656282deb5700524e8baa641706ecddb diff --git a/src/Shared/Grpc/ProtoUtils.cs b/src/Shared/Grpc/ProtoUtils.cs index 63c0e130..f4d0ef49 100644 --- a/src/Shared/Grpc/ProtoUtils.cs +++ b/src/Shared/Grpc/ProtoUtils.cs @@ -7,6 +7,7 @@ using System.Text; using DurableTask.Core; using DurableTask.Core.Command; +using DurableTask.Core.Entities; using DurableTask.Core.Entities.OperationFormat; using DurableTask.Core.History; using Google.Protobuf; @@ -631,6 +632,25 @@ internal static OrchestrationStatus ToCore(this P.OrchestrationStatus status) return batchResult; } + /// + /// Converts the gRPC representation of orchestrator entity parameters to the DT.Core representation. + /// + /// The DT.Core representation. + /// The gRPC representation. + [return: NotNullIfNotNull("parameters")] + internal static TaskOrchestrationEntityParameters? ToCore(this P.OrchestratorEntityParameters? parameters) + { + if (parameters == null) + { + return null; + } + + return new TaskOrchestrationEntityParameters() + { + EntityMessageReorderWindow = parameters.EntityMessageReorderWindow.ToTimeSpan(), + }; + } + /// /// Gets the approximate byte count for a . /// diff --git a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs index 73d92074..b9e1484f 100644 --- a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs +++ b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs @@ -202,11 +202,6 @@ public override async Task CreateTimer(DateTime fireAt, CancellationToken cancel /// public override Task WaitForExternalEvent(string eventName, CancellationToken cancellationToken = default) { - if (typeof(T) == typeof(OperationResult)) - { - throw new ArgumentException($"the type {nameof(OperationResult)} cannot be used for application-defined events", nameof(T)); - } - // Return immediately if this external event has already arrived. if (this.externalEventBuffer.TryTake(eventName, out string? bufferedEventPayload)) { diff --git a/src/Worker/Core/Shims/TaskOrchestrationEntityContext.cs b/src/Worker/Core/Shims/TaskOrchestrationEntityContext.cs index c43118d0..289bb310 100644 --- a/src/Worker/Core/Shims/TaskOrchestrationEntityContext.cs +++ b/src/Worker/Core/Shims/TaskOrchestrationEntityContext.cs @@ -76,7 +76,7 @@ public override async Task LockEntitiesAsync(IEnumerable(criticalSectionId.ToString()); @@ -155,7 +155,7 @@ public void ExitCriticalSection(Guid? matchCriticalSectionId = null) // releaseMessage.EventContent); } - this.wrapper.innerContext.SendEvent(releaseMessage.TargetInstance, releaseMessage.EventName, releaseMessage.ContentAsObject()); + this.wrapper.innerContext.SendEvent(releaseMessage.TargetInstance, releaseMessage.EventName, releaseMessage); } } } @@ -212,7 +212,7 @@ Guid SendOperationMessage(string instanceId, string operationName, object? input // entityMessageEvent.ToString()); } - this.wrapper.innerContext.SendEvent(entityMessageEvent.TargetInstance, entityMessageEvent.EventName, entityMessageEvent.ContentAsObject()); + this.wrapper.innerContext.SendEvent(entityMessageEvent.TargetInstance, entityMessageEvent.EventName, entityMessageEvent); return guid; } diff --git a/src/Worker/Grpc/GrpcOrchestrationRunner.cs b/src/Worker/Grpc/GrpcOrchestrationRunner.cs index 47850916..7f39f0ff 100644 --- a/src/Worker/Grpc/GrpcOrchestrationRunner.cs +++ b/src/Worker/Grpc/GrpcOrchestrationRunner.cs @@ -113,7 +113,7 @@ public static string LoadAndRun( ? DurableTaskShimFactory.Default : ActivatorUtilities.GetServiceOrCreateInstance(services); TaskOrchestration shim = factory.CreateOrchestration(orchestratorName, implementation, parent); - TaskOrchestrationExecutor executor = new(runtimeState, shim, BehaviorOnContinueAsNew.Carryover); + TaskOrchestrationExecutor executor = new(runtimeState, shim, BehaviorOnContinueAsNew.Carryover, request.EntityParameters.ToCore()); OrchestratorExecutionResult result = executor.Execute(); P.OrchestratorResponse response = ProtoUtils.ConstructOrchestratorResponse(