diff --git a/.gitmodules b/.gitmodules deleted file mode 100644 index ce9009ca..00000000 --- a/.gitmodules +++ /dev/null @@ -1,3 +0,0 @@ -[submodule "eng/proto"] - path = eng/proto - url = https://github.com/microsoft/durabletask-protobuf diff --git a/CHANGELOG.md b/CHANGELOG.md index 06348972..dbf11e26 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,12 @@ ### Microsoft.DurableTask.Worker - Add new `IDurableTaskWorkerBuilder AddDurableTaskWorker(IServiceCollection, string?)` API +- Add support for work item history streaming + +### Microsoft.DurableTask.Grpc + +- Replace submodule for proto files with download script for easier maintenance +- Update to latest proto files ## v1.5.0 diff --git a/README.md b/README.md index 23efd1fc..7204429b 100644 --- a/README.md +++ b/README.md @@ -167,9 +167,7 @@ There are also several features that aren't yet available: ## Obtaining the Protobuf definitions -This project utilizes git submodules to obtain Protobuf definitions from [durabletask-protobuf](https://github.com/microsoft/durabletask-protobuf). You will need to obtain these to build the project. - -To get the definitions, run `git submodule update --init --recursive` +This project utilizes protobuf definitions from [durabletask-protobuf](https://github.com/microsoft/durabletask-protobuf), which are copied (vendored) into this repository under the `src/Grpc` directory. See the corresponding [README.md](./src/Grpc/README.md) for more information about how to update the protobuf definitions. ## Contributing diff --git a/azure-pipelines-release.yml b/azure-pipelines-release.yml index 3ee48f71..60abd025 100644 --- a/azure-pipelines-release.yml +++ b/azure-pipelines-release.yml @@ -28,7 +28,6 @@ variables: steps: - checkout: self - submodules: true - task: UseDotNet@2 displayName: 'Install .NET 6 SDK (ESRP)' # This is needed for ESRP. 
diff --git a/eng/proto b/eng/proto deleted file mode 160000 index 443b333f..00000000 --- a/eng/proto +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 443b333f4f65a438dc9eb4f090560d232afec4b7 diff --git a/eng/templates/build.yml b/eng/templates/build.yml index 87d01dcb..a94d3aff 100644 --- a/eng/templates/build.yml +++ b/eng/templates/build.yml @@ -24,7 +24,6 @@ jobs: steps: - checkout: self - submodules: true - task: UseDotNet@2 displayName: 'Install .NET 6 SDK (ESRP)' # This is needed for ESRP. diff --git a/src/Grpc/Grpc.csproj b/src/Grpc/Grpc.csproj index 7ae856a2..394b4787 100644 --- a/src/Grpc/Grpc.csproj +++ b/src/Grpc/Grpc.csproj @@ -1,4 +1,4 @@ - + netstandard2.0;net6.0 @@ -18,7 +18,7 @@ - + diff --git a/src/Grpc/README.md b/src/Grpc/README.md index 7d5e607c..586a4e2d 100644 Binary files a/src/Grpc/README.md and b/src/Grpc/README.md differ diff --git a/src/Grpc/orchestrator_service.proto b/src/Grpc/orchestrator_service.proto new file mode 100644 index 00000000..f566163f --- /dev/null +++ b/src/Grpc/orchestrator_service.proto @@ -0,0 +1,716 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. 
+ +syntax = "proto3"; + +option csharp_namespace = "Microsoft.DurableTask.Protobuf"; +option java_package = "com.microsoft.durabletask.implementation.protobuf"; +option go_package = "/internal/protos"; + +import "google/protobuf/timestamp.proto"; +import "google/protobuf/duration.proto"; +import "google/protobuf/wrappers.proto"; +import "google/protobuf/empty.proto"; + +message OrchestrationInstance { + string instanceId = 1; + google.protobuf.StringValue executionId = 2; +} + +message ActivityRequest { + string name = 1; + google.protobuf.StringValue version = 2; + google.protobuf.StringValue input = 3; + OrchestrationInstance orchestrationInstance = 4; + int32 taskId = 5; + TraceContext parentTraceContext = 6; +} + +message ActivityResponse { + string instanceId = 1; + int32 taskId = 2; + google.protobuf.StringValue result = 3; + TaskFailureDetails failureDetails = 4; + string completionToken = 5; +} + +message TaskFailureDetails { + string errorType = 1; + string errorMessage = 2; + google.protobuf.StringValue stackTrace = 3; + TaskFailureDetails innerFailure = 4; + bool isNonRetriable = 5; +} + +enum OrchestrationStatus { + ORCHESTRATION_STATUS_RUNNING = 0; + ORCHESTRATION_STATUS_COMPLETED = 1; + ORCHESTRATION_STATUS_CONTINUED_AS_NEW = 2; + ORCHESTRATION_STATUS_FAILED = 3; + ORCHESTRATION_STATUS_CANCELED = 4; + ORCHESTRATION_STATUS_TERMINATED = 5; + ORCHESTRATION_STATUS_PENDING = 6; + ORCHESTRATION_STATUS_SUSPENDED = 7; +} + +message ParentInstanceInfo { + int32 taskScheduledId = 1; + google.protobuf.StringValue name = 2; + google.protobuf.StringValue version = 3; + OrchestrationInstance orchestrationInstance = 4; +} + +message TraceContext { + string traceParent = 1; + string spanID = 2 [deprecated=true]; + google.protobuf.StringValue traceState = 3; +} + +message ExecutionStartedEvent { + string name = 1; + google.protobuf.StringValue version = 2; + google.protobuf.StringValue input = 3; + OrchestrationInstance orchestrationInstance = 4; + ParentInstanceInfo 
parentInstance = 5; + google.protobuf.Timestamp scheduledStartTimestamp = 6; + TraceContext parentTraceContext = 7; + google.protobuf.StringValue orchestrationSpanID = 8; +} + +message ExecutionCompletedEvent { + OrchestrationStatus orchestrationStatus = 1; + google.protobuf.StringValue result = 2; + TaskFailureDetails failureDetails = 3; +} + +message ExecutionTerminatedEvent { + google.protobuf.StringValue input = 1; + bool recurse = 2; +} + +message TaskScheduledEvent { + string name = 1; + google.protobuf.StringValue version = 2; + google.protobuf.StringValue input = 3; + TraceContext parentTraceContext = 4; +} + +message TaskCompletedEvent { + int32 taskScheduledId = 1; + google.protobuf.StringValue result = 2; +} + +message TaskFailedEvent { + int32 taskScheduledId = 1; + TaskFailureDetails failureDetails = 2; +} + +message SubOrchestrationInstanceCreatedEvent { + string instanceId = 1; + string name = 2; + google.protobuf.StringValue version = 3; + google.protobuf.StringValue input = 4; + TraceContext parentTraceContext = 5; +} + +message SubOrchestrationInstanceCompletedEvent { + int32 taskScheduledId = 1; + google.protobuf.StringValue result = 2; +} + +message SubOrchestrationInstanceFailedEvent { + int32 taskScheduledId = 1; + TaskFailureDetails failureDetails = 2; +} + +message TimerCreatedEvent { + google.protobuf.Timestamp fireAt = 1; +} + +message TimerFiredEvent { + google.protobuf.Timestamp fireAt = 1; + int32 timerId = 2; +} + +message OrchestratorStartedEvent { + // No payload data +} + +message OrchestratorCompletedEvent { + // No payload data +} + +message EventSentEvent { + string instanceId = 1; + string name = 2; + google.protobuf.StringValue input = 3; +} + +message EventRaisedEvent { + string name = 1; + google.protobuf.StringValue input = 2; +} + +message GenericEvent { + google.protobuf.StringValue data = 1; +} + +message HistoryStateEvent { + OrchestrationState orchestrationState = 1; +} + +message ContinueAsNewEvent { + 
google.protobuf.StringValue input = 1; +} + +message ExecutionSuspendedEvent { + google.protobuf.StringValue input = 1; +} + +message ExecutionResumedEvent { + google.protobuf.StringValue input = 1; +} + +message EntityOperationSignaledEvent { + string requestId = 1; + string operation = 2; + google.protobuf.Timestamp scheduledTime = 3; + google.protobuf.StringValue input = 4; + google.protobuf.StringValue targetInstanceId = 5; // used only within histories, null in messages +} + +message EntityOperationCalledEvent { + string requestId = 1; + string operation = 2; + google.protobuf.Timestamp scheduledTime = 3; + google.protobuf.StringValue input = 4; + google.protobuf.StringValue parentInstanceId = 5; // used only within messages, null in histories + google.protobuf.StringValue parentExecutionId = 6; // used only within messages, null in histories + google.protobuf.StringValue targetInstanceId = 7; // used only within histories, null in messages +} + +message EntityLockRequestedEvent { + string criticalSectionId = 1; + repeated string lockSet = 2; + int32 position = 3; + google.protobuf.StringValue parentInstanceId = 4; // used only within messages, null in histories +} + +message EntityOperationCompletedEvent { + string requestId = 1; + google.protobuf.StringValue output = 2; +} + +message EntityOperationFailedEvent { + string requestId = 1; + TaskFailureDetails failureDetails = 2; +} + +message EntityUnlockSentEvent { + string criticalSectionId = 1; + google.protobuf.StringValue parentInstanceId = 2; // used only within messages, null in histories + google.protobuf.StringValue targetInstanceId = 3; // used only within histories, null in messages +} + +message EntityLockGrantedEvent { + string criticalSectionId = 1; +} + +message HistoryEvent { + int32 eventId = 1; + google.protobuf.Timestamp timestamp = 2; + oneof eventType { + ExecutionStartedEvent executionStarted = 3; + ExecutionCompletedEvent executionCompleted = 4; + ExecutionTerminatedEvent 
executionTerminated = 5; + TaskScheduledEvent taskScheduled = 6; + TaskCompletedEvent taskCompleted = 7; + TaskFailedEvent taskFailed = 8; + SubOrchestrationInstanceCreatedEvent subOrchestrationInstanceCreated = 9; + SubOrchestrationInstanceCompletedEvent subOrchestrationInstanceCompleted = 10; + SubOrchestrationInstanceFailedEvent subOrchestrationInstanceFailed = 11; + TimerCreatedEvent timerCreated = 12; + TimerFiredEvent timerFired = 13; + OrchestratorStartedEvent orchestratorStarted = 14; + OrchestratorCompletedEvent orchestratorCompleted = 15; + EventSentEvent eventSent = 16; + EventRaisedEvent eventRaised = 17; + GenericEvent genericEvent = 18; + HistoryStateEvent historyState = 19; + ContinueAsNewEvent continueAsNew = 20; + ExecutionSuspendedEvent executionSuspended = 21; + ExecutionResumedEvent executionResumed = 22; + EntityOperationSignaledEvent entityOperationSignaled = 23; + EntityOperationCalledEvent entityOperationCalled = 24; + EntityOperationCompletedEvent entityOperationCompleted = 25; + EntityOperationFailedEvent entityOperationFailed = 26; + EntityLockRequestedEvent entityLockRequested = 27; + EntityLockGrantedEvent entityLockGranted = 28; + EntityUnlockSentEvent entityUnlockSent = 29; + } +} + +message ScheduleTaskAction { + string name = 1; + google.protobuf.StringValue version = 2; + google.protobuf.StringValue input = 3; +} + +message CreateSubOrchestrationAction { + string instanceId = 1; + string name = 2; + google.protobuf.StringValue version = 3; + google.protobuf.StringValue input = 4; +} + +message CreateTimerAction { + google.protobuf.Timestamp fireAt = 1; +} + +message SendEventAction { + OrchestrationInstance instance = 1; + string name = 2; + google.protobuf.StringValue data = 3; +} + +message CompleteOrchestrationAction { + OrchestrationStatus orchestrationStatus = 1; + google.protobuf.StringValue result = 2; + google.protobuf.StringValue details = 3; + google.protobuf.StringValue newVersion = 4; + repeated HistoryEvent 
carryoverEvents = 5; + TaskFailureDetails failureDetails = 6; +} + +message TerminateOrchestrationAction { + string instanceId = 1; + google.protobuf.StringValue reason = 2; + bool recurse = 3; +} + +message OrchestratorAction { + int32 id = 1; + oneof orchestratorActionType { + ScheduleTaskAction scheduleTask = 2; + CreateSubOrchestrationAction createSubOrchestration = 3; + CreateTimerAction createTimer = 4; + SendEventAction sendEvent = 5; + CompleteOrchestrationAction completeOrchestration = 6; + TerminateOrchestrationAction terminateOrchestration = 7; + } +} + +message OrchestratorRequest { + string instanceId = 1; + google.protobuf.StringValue executionId = 2; + repeated HistoryEvent pastEvents = 3; + repeated HistoryEvent newEvents = 4; + OrchestratorEntityParameters entityParameters = 5; + bool requiresHistoryStreaming = 6; +} + +message OrchestratorResponse { + string instanceId = 1; + repeated OrchestratorAction actions = 2; + google.protobuf.StringValue customStatus = 3; + string completionToken = 4; + + // The number of work item events that were processed by the orchestrator. + // This field is optional. If not set, the service should assume that the orchestrator processed all events. 
+ google.protobuf.Int32Value numEventsProcessed = 5; +} + +message CreateInstanceRequest { + string instanceId = 1; + string name = 2; + google.protobuf.StringValue version = 3; + google.protobuf.StringValue input = 4; + google.protobuf.Timestamp scheduledStartTimestamp = 5; + OrchestrationIdReusePolicy orchestrationIdReusePolicy = 6; + google.protobuf.StringValue executionId = 7; + map tags = 8; +} + +message OrchestrationIdReusePolicy { + repeated OrchestrationStatus operationStatus = 1; + CreateOrchestrationAction action = 2; +} + +enum CreateOrchestrationAction { + ERROR = 0; + IGNORE = 1; + TERMINATE = 2; +} + +message CreateInstanceResponse { + string instanceId = 1; +} + +message GetInstanceRequest { + string instanceId = 1; + bool getInputsAndOutputs = 2; +} + +message GetInstanceResponse { + bool exists = 1; + OrchestrationState orchestrationState = 2; +} + +message RewindInstanceRequest { + string instanceId = 1; + google.protobuf.StringValue reason = 2; +} + +message RewindInstanceResponse { + // Empty for now. Using explicit type in case we want to add content later. 
+} + +message OrchestrationState { + string instanceId = 1; + string name = 2; + google.protobuf.StringValue version = 3; + OrchestrationStatus orchestrationStatus = 4; + google.protobuf.Timestamp scheduledStartTimestamp = 5; + google.protobuf.Timestamp createdTimestamp = 6; + google.protobuf.Timestamp lastUpdatedTimestamp = 7; + google.protobuf.StringValue input = 8; + google.protobuf.StringValue output = 9; + google.protobuf.StringValue customStatus = 10; + TaskFailureDetails failureDetails = 11; + google.protobuf.StringValue executionId = 12; + google.protobuf.Timestamp completedTimestamp = 13; + google.protobuf.StringValue parentInstanceId = 14; +} + +message RaiseEventRequest { + string instanceId = 1; + string name = 2; + google.protobuf.StringValue input = 3; +} + +message RaiseEventResponse { + // No payload +} + +message TerminateRequest { + string instanceId = 1; + google.protobuf.StringValue output = 2; + bool recursive = 3; +} + +message TerminateResponse { + // No payload +} + +message SuspendRequest { + string instanceId = 1; + google.protobuf.StringValue reason = 2; +} + +message SuspendResponse { + // No payload +} + +message ResumeRequest { + string instanceId = 1; + google.protobuf.StringValue reason = 2; +} + +message ResumeResponse { + // No payload +} + +message QueryInstancesRequest { + InstanceQuery query = 1; +} + +message InstanceQuery{ + repeated OrchestrationStatus runtimeStatus = 1; + google.protobuf.Timestamp createdTimeFrom = 2; + google.protobuf.Timestamp createdTimeTo = 3; + repeated google.protobuf.StringValue taskHubNames = 4; + int32 maxInstanceCount = 5; + google.protobuf.StringValue continuationToken = 6; + google.protobuf.StringValue instanceIdPrefix = 7; + bool fetchInputsAndOutputs = 8; +} + +message QueryInstancesResponse { + repeated OrchestrationState orchestrationState = 1; + google.protobuf.StringValue continuationToken = 2; +} + +message PurgeInstancesRequest { + oneof request { + string instanceId = 1; + 
PurgeInstanceFilter purgeInstanceFilter = 2; + } + bool recursive = 3; +} + +message PurgeInstanceFilter { + google.protobuf.Timestamp createdTimeFrom = 1; + google.protobuf.Timestamp createdTimeTo = 2; + repeated OrchestrationStatus runtimeStatus = 3; +} + +message PurgeInstancesResponse { + int32 deletedInstanceCount = 1; +} + +message CreateTaskHubRequest { + bool recreateIfExists = 1; +} + +message CreateTaskHubResponse { + // no payload +} + +message DeleteTaskHubRequest { + // no payload +} + +message DeleteTaskHubResponse { + // no payload +} + +message SignalEntityRequest { + string instanceId = 1; + string name = 2; + google.protobuf.StringValue input = 3; + string requestId = 4; + google.protobuf.Timestamp scheduledTime = 5; +} + +message SignalEntityResponse { + // no payload +} + +message GetEntityRequest { + string instanceId = 1; + bool includeState = 2; +} + +message GetEntityResponse { + bool exists = 1; + EntityMetadata entity = 2; +} + +message EntityQuery { + google.protobuf.StringValue instanceIdStartsWith = 1; + google.protobuf.Timestamp lastModifiedFrom = 2; + google.protobuf.Timestamp lastModifiedTo = 3; + bool includeState = 4; + bool includeTransient = 5; + google.protobuf.Int32Value pageSize = 6; + google.protobuf.StringValue continuationToken = 7; +} + +message QueryEntitiesRequest { + EntityQuery query = 1; +} + +message QueryEntitiesResponse { + repeated EntityMetadata entities = 1; + google.protobuf.StringValue continuationToken = 2; +} + +message EntityMetadata { + string instanceId = 1; + google.protobuf.Timestamp lastModifiedTime = 2; + int32 backlogQueueSize = 3; + google.protobuf.StringValue lockedBy = 4; + google.protobuf.StringValue serializedState = 5; +} + +message CleanEntityStorageRequest { + google.protobuf.StringValue continuationToken = 1; + bool removeEmptyEntities = 2; + bool releaseOrphanedLocks = 3; +} + +message CleanEntityStorageResponse { + google.protobuf.StringValue continuationToken = 1; + int32 
emptyEntitiesRemoved = 2; + int32 orphanedLocksReleased = 3; +} + +message OrchestratorEntityParameters { + google.protobuf.Duration entityMessageReorderWindow = 1; +} + +message EntityBatchRequest { + string instanceId = 1; + google.protobuf.StringValue entityState = 2; + repeated OperationRequest operations = 3; +} + +message EntityBatchResult { + repeated OperationResult results = 1; + repeated OperationAction actions = 2; + google.protobuf.StringValue entityState = 3; + TaskFailureDetails failureDetails = 4; +} + +message EntityRequest { + string instanceId = 1; + string executionId = 2; + google.protobuf.StringValue entityState = 3; // null if entity does not exist + repeated HistoryEvent operationRequests = 4; +} + +message OperationRequest { + string operation = 1; + string requestId = 2; + google.protobuf.StringValue input = 3; +} + +message OperationResult { + oneof resultType { + OperationResultSuccess success = 1; + OperationResultFailure failure = 2; + } +} + +message OperationResultSuccess { + google.protobuf.StringValue result = 1; +} + +message OperationResultFailure { + TaskFailureDetails failureDetails = 1; +} + +message OperationAction { + int32 id = 1; + oneof operationActionType { + SendSignalAction sendSignal = 2; + StartNewOrchestrationAction startNewOrchestration = 3; + } +} + +message SendSignalAction { + string instanceId = 1; + string name = 2; + google.protobuf.StringValue input = 3; + google.protobuf.Timestamp scheduledTime = 4; +} + +message StartNewOrchestrationAction { + string instanceId = 1; + string name = 2; + google.protobuf.StringValue version = 3; + google.protobuf.StringValue input = 4; + google.protobuf.Timestamp scheduledTime = 5; +} + +service TaskHubSidecarService { + // Sends a hello request to the sidecar service. + rpc Hello(google.protobuf.Empty) returns (google.protobuf.Empty); + + // Starts a new orchestration instance. 
+ rpc StartInstance(CreateInstanceRequest) returns (CreateInstanceResponse); + + // Gets the status of an existing orchestration instance. + rpc GetInstance(GetInstanceRequest) returns (GetInstanceResponse); + + // Rewinds an orchestration instance to last known good state and replays from there. + rpc RewindInstance(RewindInstanceRequest) returns (RewindInstanceResponse); + + // Waits for an orchestration instance to reach a running or completion state. + rpc WaitForInstanceStart(GetInstanceRequest) returns (GetInstanceResponse); + + // Waits for an orchestration instance to reach a completion state (completed, failed, terminated, etc.). + rpc WaitForInstanceCompletion(GetInstanceRequest) returns (GetInstanceResponse); + + // Raises an event to a running orchestration instance. + rpc RaiseEvent(RaiseEventRequest) returns (RaiseEventResponse); + + // Terminates a running orchestration instance. + rpc TerminateInstance(TerminateRequest) returns (TerminateResponse); + + // Suspends a running orchestration instance. + rpc SuspendInstance(SuspendRequest) returns (SuspendResponse); + + // Resumes a suspended orchestration instance. + rpc ResumeInstance(ResumeRequest) returns (ResumeResponse); + + // rpc DeleteInstance(DeleteInstanceRequest) returns (DeleteInstanceResponse); + + rpc QueryInstances(QueryInstancesRequest) returns (QueryInstancesResponse); + rpc PurgeInstances(PurgeInstancesRequest) returns (PurgeInstancesResponse); + + rpc GetWorkItems(GetWorkItemsRequest) returns (stream WorkItem); + rpc CompleteActivityTask(ActivityResponse) returns (CompleteTaskResponse); + rpc CompleteOrchestratorTask(OrchestratorResponse) returns (CompleteTaskResponse); + rpc CompleteEntityTask(EntityBatchResult) returns (CompleteTaskResponse); + + // Gets the history of an orchestration instance as a stream of events. 
+ rpc StreamInstanceHistory(StreamInstanceHistoryRequest) returns (stream HistoryChunk); + + // Deletes and Creates the necessary resources for the orchestration service and the instance store + rpc CreateTaskHub(CreateTaskHubRequest) returns (CreateTaskHubResponse); + + // Deletes the resources for the orchestration service and optionally the instance store + rpc DeleteTaskHub(DeleteTaskHubRequest) returns (DeleteTaskHubResponse); + + // sends a signal to an entity + rpc SignalEntity(SignalEntityRequest) returns (SignalEntityResponse); + + // get information about a specific entity + rpc GetEntity(GetEntityRequest) returns (GetEntityResponse); + + // query entities + rpc QueryEntities(QueryEntitiesRequest) returns (QueryEntitiesResponse); + + // clean entity storage + rpc CleanEntityStorage(CleanEntityStorageRequest) returns (CleanEntityStorageResponse); +} + +message GetWorkItemsRequest { + int32 maxConcurrentOrchestrationWorkItems = 1; + int32 maxConcurrentActivityWorkItems = 2; + int32 maxConcurrentEntityWorkItems = 3; + + repeated WorkerCapability capabilities = 10; +} + +enum WorkerCapability { + WORKER_CAPABILITY_UNSPECIFIED = 0; + + // Indicates that the worker is capable of streaming instance history as a more optimized + // alternative to receiving the full history embedded in the orchestrator work-item. + // When set, the service may return work items without any history events as an optimization. + // It is strongly recommended that all SDKs support this capability. 
+ WORKER_CAPABILITY_HISTORY_STREAMING = 1; +} + +message WorkItem { + oneof request { + OrchestratorRequest orchestratorRequest = 1; + ActivityRequest activityRequest = 2; + EntityBatchRequest entityRequest = 3; // (older) used by orchestration services implementations + HealthPing healthPing = 4; + EntityRequest entityRequestV2 = 5; // (newer) used by backend service implementations + } + string completionToken = 10; +} + +message CompleteTaskResponse { + // No payload +} + +message HealthPing { + // No payload +} + +message StreamInstanceHistoryRequest { + string instanceId = 1; + google.protobuf.StringValue executionId = 2; + + // When set to true, the service may return a more optimized response suitable for workers. + bool forWorkItemProcessing = 3; +} + +message HistoryChunk { + repeated HistoryEvent events = 1; +} \ No newline at end of file diff --git a/src/Grpc/refresh-protos.ps1 b/src/Grpc/refresh-protos.ps1 new file mode 100644 index 00000000..a91393a4 --- /dev/null +++ b/src/Grpc/refresh-protos.ps1 @@ -0,0 +1,55 @@ +#!/usr/bin/env pwsh +param( + [string]$branch = "main" +) + +# Fail with an error if the PowerShell version is less than 7.0 +if ($PSVersionTable.PSVersion -lt [Version]"7.0") { + Write-Error "This script requires PowerShell 7.0 or later." + exit 1 +} + +# Get the commit ID of the latest commit in the durabletask-protobuf repository. +# We need this to download the proto files from the correct commit, avoiding race conditions +# in rare cases where the proto files are updated between the time we download the commit ID +# and the time we download the proto files. +$commitDetails = Invoke-RestMethod -Uri "https://api.github.com/repos/microsoft/durabletask-protobuf/commits/$branch" +$commitId = $commitDetails.sha + +# These are the proto files we need to download from the durabletask-protobuf repository. 
+$protoFileNames = @( + "orchestrator_service.proto" +) + +# Download each proto file to the local directory using the above commit ID +foreach ($protoFileName in $protoFileNames) { + $url = "https://raw.githubusercontent.com/microsoft/durabletask-protobuf/$commitId/protos/$protoFileName" + $outputFile = "$PSScriptRoot\$protoFileName" + + try { + Invoke-WebRequest -Uri $url -OutFile $outputFile + } + catch { + Write-Error "Failed to download $url to ${outputFile}: $_" + exit 1 + } + + Write-Output "Downloaded $url to $outputFile" +} + +# Log the commit ID and the URLs of the downloaded proto files to a versions file. +# Overwrite the file if it already exists. +$versionsFile = "$PSScriptRoot\versions.txt" +Remove-Item -Path $versionsFile -ErrorAction SilentlyContinue + +Add-Content ` + -Path $versionsFile ` + -Value "# The following files were downloaded from branch $branch at $(Get-Date -Format "yyyy-MM-dd HH:mm:ss" -AsUTC) UTC" + +foreach ($protoFileName in $protoFileNames) { + Add-Content ` + -Path $versionsFile ` + -Value "https://raw.githubusercontent.com/microsoft/durabletask-protobuf/$commitId/protos/$protoFileName" +} + +Write-Host "Wrote commit ID $commitId to $versionsFile" -ForegroundColor Green diff --git a/src/Grpc/versions.txt b/src/Grpc/versions.txt new file mode 100644 index 00000000..94b29bd3 --- /dev/null +++ b/src/Grpc/versions.txt @@ -0,0 +1,2 @@ +# The following files were downloaded from branch main at 2025-01-30 00:06:14 UTC +https://raw.githubusercontent.com/microsoft/durabletask-protobuf/c672a0dc97c06587d7399ee12f1c5b0b9fc492a7/protos/orchestrator_service.proto diff --git a/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs b/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs index a98e44af..059d592d 100644 --- a/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs +++ b/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs @@ -6,7 +6,6 @@ using DurableTask.Core.Entities; using DurableTask.Core.Entities.OperationFormat; using 
DurableTask.Core.History; -using Grpc.Core; using Microsoft.DurableTask.Entities; using Microsoft.DurableTask.Worker.Shims; using Microsoft.Extensions.DependencyInjection; @@ -27,13 +26,13 @@ class Processor static readonly Google.Protobuf.WellKnownTypes.Empty EmptyMessage = new(); readonly GrpcDurableTaskWorker worker; - readonly TaskHubSidecarServiceClient sidecar; + readonly TaskHubSidecarServiceClient client; readonly DurableTaskShimFactory shimFactory; - public Processor(GrpcDurableTaskWorker worker, TaskHubSidecarServiceClient sidecar) + public Processor(GrpcDurableTaskWorker worker, TaskHubSidecarServiceClient client) { this.worker = worker; - this.sidecar = sidecar; + this.client = client; this.shimFactory = new DurableTaskShimFactory(this.worker.grpcOptions, this.worker.loggerFactory); } @@ -87,28 +86,6 @@ public async Task ExecuteAsync(CancellationToken cancellation) } } - static OrchestrationRuntimeState BuildRuntimeState(P.OrchestratorRequest request) - { - IEnumerable pastEvents = request.PastEvents.Select(ProtoUtils.ConvertHistoryEvent); - IEnumerable newEvents = request.NewEvents.Select(ProtoUtils.ConvertHistoryEvent); - - // Reconstruct the orchestration state in a way that correctly distinguishes new events from past events - var runtimeState = new OrchestrationRuntimeState(pastEvents.ToList()); - foreach (HistoryEvent e in newEvents) - { - // AddEvent() puts events into the NewEvents list. - runtimeState.AddEvent(e); - } - - if (runtimeState.ExecutionStartedEvent == null) - { - // TODO: What's the right way to handle this? Callback to the sidecar with a retriable error request? 
- throw new InvalidOperationException("The provided orchestration history was incomplete"); - } - - return runtimeState; - } - static string GetActionsListForLogging(IReadOnlyList actions) { if (actions.Count == 0) @@ -128,21 +105,72 @@ static string GetActionsListForLogging(IReadOnlyList actio } } + async ValueTask BuildRuntimeStateAsync( + P.OrchestratorRequest orchestratorRequest, + CancellationToken cancellation) + { + IEnumerable pastEvents = []; + if (orchestratorRequest.RequiresHistoryStreaming) + { + // Stream the remaining events from the remote service + P.StreamInstanceHistoryRequest streamRequest = new() + { + InstanceId = orchestratorRequest.InstanceId, + ExecutionId = orchestratorRequest.ExecutionId, + ForWorkItemProcessing = true, + }; + + using AsyncServerStreamingCall streamResponse = + this.client.StreamInstanceHistory(streamRequest, cancellationToken: cancellation); + + await foreach (P.HistoryChunk chunk in streamResponse.ResponseStream.ReadAllAsync(cancellation)) + { + pastEvents = pastEvents.Concat(chunk.Events.Select(ProtoUtils.ConvertHistoryEvent)); + } + } + else + { + // The history was already provided in the work item request + pastEvents = orchestratorRequest.PastEvents.Select(ProtoUtils.ConvertHistoryEvent); + } + + IEnumerable newEvents = orchestratorRequest.NewEvents.Select(ProtoUtils.ConvertHistoryEvent); + + // Reconstruct the orchestration state in a way that correctly distinguishes new events from past events + var runtimeState = new OrchestrationRuntimeState(pastEvents.ToList()); + foreach (HistoryEvent e in newEvents) + { + // AddEvent() puts events into the NewEvents list. + runtimeState.AddEvent(e); + } + + if (runtimeState.ExecutionStartedEvent == null) + { + // TODO: What's the right way to handle this? Callback to the sidecar with a retriable error request? 
+ throw new InvalidOperationException("The provided orchestration history was incomplete"); + } + + return runtimeState; + } + async Task> ConnectAsync(CancellationToken cancellation) { - await this.sidecar!.HelloAsync(EmptyMessage, cancellationToken: cancellation); + await this.client!.HelloAsync(EmptyMessage, cancellationToken: cancellation); this.Logger.EstablishedWorkItemConnection(); DurableTaskWorkerOptions workerOptions = this.worker.workerOptions; // Get the stream for receiving work-items - return this.sidecar!.GetWorkItems( + return this.client!.GetWorkItems( new P.GetWorkItemsRequest { MaxConcurrentActivityWorkItems = workerOptions.Concurrency.MaximumConcurrentActivityWorkItems, MaxConcurrentOrchestrationWorkItems = workerOptions.Concurrency.MaximumConcurrentOrchestrationWorkItems, + MaxConcurrentEntityWorkItems = + workerOptions.Concurrency.MaximumConcurrentEntityWorkItems, + Capabilities = { P.WorkerCapability.HistoryStreaming }, }, cancellationToken: cancellation); } @@ -157,19 +185,25 @@ async Task ProcessWorkItemsAsync(AsyncServerStreamingCall stream, Ca { this.RunBackgroundTask( workItem, - () => this.OnRunOrchestratorAsync(workItem.OrchestratorRequest, workItem.CompletionToken)); + () => this.OnRunOrchestratorAsync( + workItem.OrchestratorRequest, + workItem.CompletionToken, + cancellation)); } else if (workItem.RequestCase == P.WorkItem.RequestOneofCase.ActivityRequest) { this.RunBackgroundTask( workItem, - () => this.OnRunActivityAsync(workItem.ActivityRequest, workItem.CompletionToken)); + () => this.OnRunActivityAsync( + workItem.ActivityRequest, + workItem.CompletionToken, + cancellation)); } else if (workItem.RequestCase == P.WorkItem.RequestOneofCase.EntityRequest) { this.RunBackgroundTask( workItem, - () => this.OnRunEntityBatchAsync(workItem.EntityRequest)); + () => this.OnRunEntityBatchAsync(workItem.EntityRequest, cancellation)); } else if (workItem.RequestCase == P.WorkItem.RequestOneofCase.HealthPing) { @@ -207,7 +241,10 @@ void 
RunBackgroundTask(P.WorkItem? workItem, Func handler) }); } - async Task OnRunOrchestratorAsync(P.OrchestratorRequest request, string completionToken) + async Task OnRunOrchestratorAsync( + P.OrchestratorRequest request, + string completionToken, + CancellationToken cancellationToken) { OrchestratorExecutionResult? result = null; P.TaskFailureDetails? failureDetails = null; @@ -215,8 +252,8 @@ async Task OnRunOrchestratorAsync(P.OrchestratorRequest request, string completi try { - OrchestrationRuntimeState runtimeState = BuildRuntimeState(request); - name = new(runtimeState.Name); + OrchestrationRuntimeState runtimeState = await this.BuildRuntimeStateAsync(request, cancellationToken); + name = new TaskName(runtimeState.Name); this.Logger.ReceivedOrchestratorRequest( name, @@ -297,10 +334,10 @@ async Task OnRunOrchestratorAsync(P.OrchestratorRequest request, string completi response.Actions.Count, GetActionsListForLogging(response.Actions)); - await this.sidecar.CompleteOrchestratorTaskAsync(response); + await this.client.CompleteOrchestratorTaskAsync(response, cancellationToken: cancellationToken); } - async Task OnRunActivityAsync(P.ActivityRequest request, string completionToken) + async Task OnRunActivityAsync(P.ActivityRequest request, string completionToken, CancellationToken cancellation) { OrchestrationInstance instance = request.OrchestrationInstance.ToCore(); string rawInput = request.Input; @@ -360,10 +397,10 @@ async Task OnRunActivityAsync(P.ActivityRequest request, string completionToken) CompletionToken = completionToken, }; - await this.sidecar.CompleteActivityTaskAsync(response); + await this.client.CompleteActivityTaskAsync(response, cancellationToken: cancellation); } - async Task OnRunEntityBatchAsync(P.EntityBatchRequest request) + async Task OnRunEntityBatchAsync(P.EntityBatchRequest request, CancellationToken cancellation) { var coreEntityId = DTCore.Entities.EntityId.FromString(request.InstanceId); EntityId entityId = new(coreEntityId.Name, 
coreEntityId.Key); @@ -388,7 +425,7 @@ async Task OnRunEntityBatchAsync(P.EntityBatchRequest request) else { // we could not find the entity. This is considered an application error, - // so we return a non-retryable error-OperationResult for each operation in the batch. + // so we return a non-retriable error-OperationResult for each operation in the batch. batchResult = new EntityBatchResult() { Actions = new List(), // no actions @@ -423,7 +460,7 @@ async Task OnRunEntityBatchAsync(P.EntityBatchRequest request) // convert the result to protobuf format and send it back P.EntityBatchResult response = batchResult.ToEntityBatchResult(); - await this.sidecar.CompleteEntityTaskAsync(response); + await this.client.CompleteEntityTaskAsync(response, cancellationToken: cancellation); } } }