From 8eaced5a5fc0d4cb10e99c1d0580fc2ef24120b8 Mon Sep 17 00:00:00 2001 From: wxing1292 Date: Fri, 22 Dec 2017 10:44:25 -0800 Subject: [PATCH] implement custpomized deduplication of start workflow execution API (#463) * implement custpomized deduplication of start workflow execution API --- .gen/go/shared/idl.go | 4 +- .gen/go/shared/types.go | 213 +++++++++++++++- common/persistence/cassandraPersistence.go | 100 +++++--- .../persistence/cassandraPersistence_test.go | 36 ++- common/persistence/dataInterfaces.go | 37 ++- common/persistence/persistenceTestBase.go | 35 +-- idl/github.com/uber/cadence/shared.thrift | 19 ++ service/history/handler.go | 4 +- service/history/historyEngine.go | 193 +++++++++----- service/history/historyEngine2_test.go | 241 ++++++++++++++++++ service/history/historyEngine_test.go | 5 +- service/history/historyTestBase.go | 17 +- service/history/mutableStateBuilder.go | 3 +- service/history/shardContext.go | 11 +- service/history/shardController.go | 20 +- service/history/shardController_test.go | 10 +- service/history/timerQueueProcessor2_test.go | 3 +- .../history/transferQueueProcessor_test.go | 6 +- service/history/workflowExecutionContext.go | 25 +- 19 files changed, 777 insertions(+), 205 deletions(-) diff --git a/.gen/go/shared/idl.go b/.gen/go/shared/idl.go index eb6704d37c0..ace0c4bb7b1 100644 --- a/.gen/go/shared/idl.go +++ b/.gen/go/shared/idl.go @@ -30,8 +30,8 @@ var ThriftModule = &thriftreflect.ThriftModule{ Name: "shared", Package: "github.com/uber/cadence/.gen/go/shared", FilePath: "shared.thrift", - SHA1: "296cedf879aa97fc5b9e73540f4035c5c7d922be", + SHA1: "b99e79243700c10b446aa60ecac70d4e1d567759", Raw: rawIDL, } -const rawIDL = "// Copyright (c) 2017 Uber Technologies, Inc.\n//\n// Permission is hereby granted, free of charge, to any person obtaining a copy\n// of this software and associated documentation files (the \"Software\"), to deal\n// in the Software without restriction, including without limitation the rights\n// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell\n// copies of the Software, and to permit persons to whom the Software is\n// furnished to do so, subject to the following conditions:\n//\n// The above copyright notice and this permission notice shall be included in\n// all copies or substantial portions of the Software.\n//\n// THE SOFTWARE IS PROVIDED \"AS IS\", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR\n// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,\n// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE\n// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER\n// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,\n// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN\n// THE SOFTWARE.\n\nnamespace java com.uber.cadence\n\nexception BadRequestError {\n 1: required string message\n}\n\nexception InternalServiceError {\n 1: required string message\n}\n\nexception DomainAlreadyExistsError {\n 1: required string message\n}\n\nexception WorkflowExecutionAlreadyStartedError {\n 10: optional string message\n 20: optional string startRequestId\n 30: optional string runId\n}\n\nexception EntityNotExistsError {\n 1: required string message\n}\n\nexception ServiceBusyError {\n 1: required string message\n}\n\nexception CancellationAlreadyRequestedError {\n 1: required string message\n}\n\nexception QueryFailedError {\n 1: required string message\n}\n\nenum DomainStatus {\n REGISTERED,\n DEPRECATED,\n DELETED,\n}\n\nenum TimeoutType {\n START_TO_CLOSE,\n SCHEDULE_TO_START,\n SCHEDULE_TO_CLOSE,\n HEARTBEAT,\n}\n\nenum DecisionType {\n ScheduleActivityTask,\n RequestCancelActivityTask,\n StartTimer,\n CompleteWorkflowExecution,\n FailWorkflowExecution,\n CancelTimer,\n CancelWorkflowExecution,\n RequestCancelExternalWorkflowExecution,\n RecordMarker,\n ContinueAsNewWorkflowExecution,\n StartChildWorkflowExecution,\n}\n\nenum EventType {\n WorkflowExecutionStarted,\n WorkflowExecutionCompleted,\n WorkflowExecutionFailed,\n WorkflowExecutionTimedOut,\n DecisionTaskScheduled,\n DecisionTaskStarted,\n DecisionTaskCompleted,\n DecisionTaskTimedOut\n DecisionTaskFailed,\n ActivityTaskScheduled,\n ActivityTaskStarted,\n ActivityTaskCompleted,\n ActivityTaskFailed,\n ActivityTaskTimedOut,\n ActivityTaskCancelRequested,\n RequestCancelActivityTaskFailed,\n ActivityTaskCanceled,\n TimerStarted,\n TimerFired,\n CancelTimerFailed,\n TimerCanceled,\n WorkflowExecutionCancelRequested,\n WorkflowExecutionCanceled,\n RequestCancelExternalWorkflowExecutionInitiated,\n RequestCancelExternalWorkflowExecutionFailed,\n ExternalWorkflowExecutionCancelRequested,\n MarkerRecorded,\n WorkflowExecutionSignaled,\n WorkflowExecutionTerminated,\n WorkflowExecutionContinuedAsNew,\n StartChildWorkflowExecutionInitiated,\n StartChildWorkflowExecutionFailed,\n ChildWorkflowExecutionStarted,\n ChildWorkflowExecutionCompleted,\n ChildWorkflowExecutionFailed,\n ChildWorkflowExecutionCanceled,\n ChildWorkflowExecutionTimedOut,\n ChildWorkflowExecutionTerminated,\n}\n\nenum DecisionTaskFailedCause {\n UNHANDLED_DECISION,\n BAD_SCHEDULE_ACTIVITY_ATTRIBUTES,\n BAD_REQUEST_CANCEL_ACTIVITY_ATTRIBUTES,\n BAD_START_TIMER_ATTRIBUTES,\n BAD_CANCEL_TIMER_ATTRIBUTES,\n BAD_RECORD_MARKER_ATTRIBUTES,\n BAD_COMPLETE_WORKFLOW_EXECUTION_ATTRIBUTES,\n BAD_FAIL_WORKFLOW_EXECUTION_ATTRIBUTES,\n BAD_CANCEL_WORKFLOW_EXECUTION_ATTRIBUTES,\n BAD_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_ATTRIBUTES,\n BAD_CONTINUE_AS_NEW_ATTRIBUTES,\n START_TIMER_DUPLICATE_ID,\n RESET_STICKY_TASKLIST,\n WORKFLOW_WORKER_UNHANDLED_FAILURE\n}\n\nenum CancelExternalWorkflowExecutionFailedCause {\n UNKNOWN_EXTERNAL_WORKFLOW_EXECUTION,\n}\n\nenum ChildWorkflowExecutionFailedCause {\n WORKFLOW_ALREADY_RUNNING,\n}\n\nenum WorkflowExecutionCloseStatus {\n COMPLETED,\n FAILED,\n CANCELED,\n TERMINATED,\n CONTINUED_AS_NEW,\n TIMED_OUT,\n}\n\nenum ChildPolicy {\n TERMINATE,\n REQUEST_CANCEL,\n ABANDON,\n}\n\nenum QueryTaskCompletedType {\n COMPLETED,\n FAILED,\n}\n\nstruct WorkflowType {\n 10: optional string name\n}\n\nstruct ActivityType {\n 10: optional string name\n}\n\nstruct TaskList {\n 10: optional string name\n}\n\nstruct TaskListMetadata {\n 10: optional double maxTasksPerSecond\n}\n\nstruct WorkflowExecution {\n 10: optional string workflowId\n 20: optional string runId\n}\n\nstruct WorkflowExecutionInfo {\n 10: optional WorkflowExecution execution\n 20: optional WorkflowType type\n 30: optional i64 (js.type = \"Long\") startTime\n 40: optional i64 (js.type = \"Long\") closeTime\n 50: optional WorkflowExecutionCloseStatus closeStatus\n 60: optional i64 (js.type = \"Long\") historyLength\n}\n\nstruct WorkflowExecutionConfiguration {\n 10: optional TaskList taskList\n 20: optional i32 executionStartToCloseTimeoutSeconds\n 30: optional i32 taskStartToCloseTimeoutSeconds\n 40: optional ChildPolicy childPolicy\n}\n\nstruct TransientDecisionInfo {\n 10: optional HistoryEvent scheduledEvent\n 20: optional HistoryEvent startedEvent\n}\n\nstruct ScheduleActivityTaskDecisionAttributes {\n 10: optional string activityId\n 20: optional ActivityType activityType\n 25: optional string domain\n 30: optional TaskList taskList\n 40: optional binary input\n 45: optional i32 scheduleToCloseTimeoutSeconds\n 50: optional i32 scheduleToStartTimeoutSeconds\n 55: optional i32 startToCloseTimeoutSeconds\n 60: optional i32 heartbeatTimeoutSeconds\n}\n\nstruct RequestCancelActivityTaskDecisionAttributes {\n 10: optional string activityId\n}\n\nstruct StartTimerDecisionAttributes {\n 10: optional string timerId\n 20: optional i64 (js.type = \"Long\") startToFireTimeoutSeconds\n}\n\nstruct CompleteWorkflowExecutionDecisionAttributes {\n 10: optional binary result\n}\n\nstruct FailWorkflowExecutionDecisionAttributes {\n 10: optional string reason\n 20: optional binary details\n}\n\nstruct CancelTimerDecisionAttributes {\n 10: optional string timerId\n}\n\nstruct CancelWorkflowExecutionDecisionAttributes {\n 10: optional binary details\n}\n\nstruct RequestCancelExternalWorkflowExecutionDecisionAttributes {\n 10: optional string domain\n 20: optional string workflowId\n 30: optional string runId\n 40: optional binary control\n}\n\nstruct RecordMarkerDecisionAttributes {\n 10: optional string markerName\n 20: optional binary details\n}\n\nstruct ContinueAsNewWorkflowExecutionDecisionAttributes {\n 10: optional WorkflowType workflowType\n 20: optional TaskList taskList\n 30: optional binary input\n 40: optional i32 executionStartToCloseTimeoutSeconds\n 50: optional i32 taskStartToCloseTimeoutSeconds\n}\n\nstruct StartChildWorkflowExecutionDecisionAttributes {\n 10: optional string domain\n 20: optional string workflowId\n 30: optional WorkflowType workflowType\n 40: optional TaskList taskList\n 50: optional binary input\n 60: optional i32 executionStartToCloseTimeoutSeconds\n 70: optional i32 taskStartToCloseTimeoutSeconds\n 80: optional ChildPolicy childPolicy\n 90: optional binary control\n}\n\nstruct Decision {\n 10: optional DecisionType decisionType\n 20: optional ScheduleActivityTaskDecisionAttributes scheduleActivityTaskDecisionAttributes\n 25: optional StartTimerDecisionAttributes startTimerDecisionAttributes\n 30: optional CompleteWorkflowExecutionDecisionAttributes completeWorkflowExecutionDecisionAttributes\n 35: optional FailWorkflowExecutionDecisionAttributes failWorkflowExecutionDecisionAttributes\n 40: optional RequestCancelActivityTaskDecisionAttributes requestCancelActivityTaskDecisionAttributes\n 50: optional CancelTimerDecisionAttributes cancelTimerDecisionAttributes\n 60: optional CancelWorkflowExecutionDecisionAttributes cancelWorkflowExecutionDecisionAttributes\n 70: optional RequestCancelExternalWorkflowExecutionDecisionAttributes requestCancelExternalWorkflowExecutionDecisionAttributes\n 80: optional RecordMarkerDecisionAttributes recordMarkerDecisionAttributes\n 90: optional ContinueAsNewWorkflowExecutionDecisionAttributes continueAsNewWorkflowExecutionDecisionAttributes\n 100: optional StartChildWorkflowExecutionDecisionAttributes startChildWorkflowExecutionDecisionAttributes\n}\n\nstruct WorkflowExecutionStartedEventAttributes {\n 10: optional WorkflowType workflowType\n 20: optional TaskList taskList\n 30: optional binary input\n 40: optional i32 executionStartToCloseTimeoutSeconds\n 50: optional i32 taskStartToCloseTimeoutSeconds\n 60: optional string identity\n}\n\nstruct WorkflowExecutionCompletedEventAttributes {\n 10: optional binary result\n 20: optional i64 (js.type = \"Long\") decisionTaskCompletedEventId\n}\n\nstruct WorkflowExecutionFailedEventAttributes {\n 10: optional string reason\n 20: optional binary details\n 30: optional i64 (js.type = \"Long\") decisionTaskCompletedEventId\n}\n\nstruct WorkflowExecutionTimedOutEventAttributes {\n 10: optional TimeoutType timeoutType\n}\n\nstruct WorkflowExecutionContinuedAsNewEventAttributes {\n 10: optional string newExecutionRunId\n 20: optional WorkflowType workflowType\n 30: optional TaskList taskList\n 40: optional binary input\n 50: optional i32 executionStartToCloseTimeoutSeconds\n 60: optional i32 taskStartToCloseTimeoutSeconds\n 70: optional i64 (js.type = \"Long\") decisionTaskCompletedEventId\n}\n\nstruct DecisionTaskScheduledEventAttributes {\n 10: optional TaskList taskList\n 20: optional i32 startToCloseTimeoutSeconds\n 30: optional i64 (js.type = \"Long\") attempt\n}\n\nstruct DecisionTaskStartedEventAttributes {\n 10: optional i64 (js.type = \"Long\") scheduledEventId\n 20: optional string identity\n 30: optional string requestId\n}\n\nstruct DecisionTaskCompletedEventAttributes {\n 10: optional binary executionContext\n 20: optional i64 (js.type = \"Long\") scheduledEventId\n 30: optional i64 (js.type = \"Long\") startedEventId\n 40: optional string identity\n}\n\nstruct DecisionTaskTimedOutEventAttributes {\n 10: optional i64 (js.type = \"Long\") scheduledEventId\n 20: optional i64 (js.type = \"Long\") startedEventId\n 30: optional TimeoutType timeoutType\n}\n\nstruct DecisionTaskFailedEventAttributes {\n 10: optional i64 (js.type = \"Long\") scheduledEventId\n 20: optional i64 (js.type = \"Long\") startedEventId\n 30: optional DecisionTaskFailedCause cause\n 35: optional binary details\n 40: optional string identity\n}\n\nstruct ActivityTaskScheduledEventAttributes {\n 10: optional string activityId\n 20: optional ActivityType activityType\n 25: optional string domain\n 30: optional TaskList taskList\n 40: optional binary input\n 45: optional i32 scheduleToCloseTimeoutSeconds\n 50: optional i32 scheduleToStartTimeoutSeconds\n 55: optional i32 startToCloseTimeoutSeconds\n 60: optional i32 heartbeatTimeoutSeconds\n 90: optional i64 (js.type = \"Long\") decisionTaskCompletedEventId\n}\n\nstruct ActivityTaskStartedEventAttributes {\n 10: optional i64 (js.type = \"Long\") scheduledEventId\n 20: optional string identity\n 30: optional string requestId\n}\n\nstruct ActivityTaskCompletedEventAttributes {\n 10: optional binary result\n 20: optional i64 (js.type = \"Long\") scheduledEventId\n 30: optional i64 (js.type = \"Long\") startedEventId\n 40: optional string identity\n}\n\nstruct ActivityTaskFailedEventAttributes {\n 10: optional string reason\n 20: optional binary details\n 30: optional i64 (js.type = \"Long\") scheduledEventId\n 40: optional i64 (js.type = \"Long\") startedEventId\n 50: optional string identity\n}\n\nstruct ActivityTaskTimedOutEventAttributes {\n 05: optional binary details\n 10: optional i64 (js.type = \"Long\") scheduledEventId\n 20: optional i64 (js.type = \"Long\") startedEventId\n 30: optional TimeoutType timeoutType\n}\n\nstruct ActivityTaskCancelRequestedEventAttributes {\n 10: optional string activityId\n 20: optional i64 (js.type = \"Long\") decisionTaskCompletedEventId\n}\n\nstruct RequestCancelActivityTaskFailedEventAttributes{\n 10: optional string activityId\n 20: optional string cause\n 30: optional i64 (js.type = \"Long\") decisionTaskCompletedEventId\n}\n\nstruct ActivityTaskCanceledEventAttributes {\n 10: optional binary details\n 20: optional i64 (js.type = \"Long\") latestCancelRequestedEventId\n 30: optional i64 (js.type = \"Long\") scheduledEventId\n 40: optional i64 (js.type = \"Long\") startedEventId\n 50: optional string identity\n}\n\nstruct TimerStartedEventAttributes {\n 10: optional string timerId\n 20: optional i64 (js.type = \"Long\") startToFireTimeoutSeconds\n 30: optional i64 (js.type = \"Long\") decisionTaskCompletedEventId\n}\n\nstruct TimerFiredEventAttributes {\n 10: optional string timerId\n 20: optional i64 (js.type = \"Long\") startedEventId\n}\n\nstruct TimerCanceledEventAttributes {\n 10: optional string timerId\n 20: optional i64 (js.type = \"Long\") startedEventId\n 30: optional i64 (js.type = \"Long\") decisionTaskCompletedEventId\n 40: optional string identity\n}\n\nstruct CancelTimerFailedEventAttributes {\n 10: optional string timerId\n 20: optional string cause\n 30: optional i64 (js.type = \"Long\") decisionTaskCompletedEventId\n 40: optional string identity\n}\n\nstruct WorkflowExecutionCancelRequestedEventAttributes {\n 10: optional string cause\n 20: optional i64 (js.type = \"Long\") externalInitiatedEventId\n 30: optional WorkflowExecution externalWorkflowExecution\n 40: optional string identity\n}\n\nstruct WorkflowExecutionCanceledEventAttributes {\n 10: optional i64 (js.type = \"Long\") decisionTaskCompletedEventId\n 20: optional binary details\n}\n\nstruct MarkerRecordedEventAttributes {\n 10: optional string markerName\n 20: optional binary details\n 30: optional i64 (js.type = \"Long\") decisionTaskCompletedEventId\n}\n\nstruct WorkflowExecutionSignaledEventAttributes {\n 10: optional string signalName\n 20: optional binary input\n 30: optional string identity\n}\n\nstruct WorkflowExecutionTerminatedEventAttributes {\n 10: optional string reason\n 20: optional binary details\n 30: optional string identity\n}\n\nstruct RequestCancelExternalWorkflowExecutionInitiatedEventAttributes {\n 10: optional i64 (js.type = \"Long\") decisionTaskCompletedEventId\n 20: optional string domain\n 30: optional WorkflowExecution workflowExecution\n 40: optional binary control\n}\n\nstruct RequestCancelExternalWorkflowExecutionFailedEventAttributes {\n 10: optional CancelExternalWorkflowExecutionFailedCause cause\n 20: optional i64 (js.type = \"Long\") decisionTaskCompletedEventId\n 30: optional string domain\n 40: optional WorkflowExecution workflowExecution\n 50: optional i64 (js.type = \"Long\") initiatedEventId\n 60: optional binary control\n}\n\nstruct ExternalWorkflowExecutionCancelRequestedEventAttributes {\n 10: optional i64 (js.type = \"Long\") initiatedEventId\n 20: optional string domain\n 30: optional WorkflowExecution workflowExecution\n}\n\nstruct StartChildWorkflowExecutionInitiatedEventAttributes {\n 10: optional string domain\n 20: optional string workflowId\n 30: optional WorkflowType workflowType\n 40: optional TaskList taskList\n 50: optional binary input\n 60: optional i32 executionStartToCloseTimeoutSeconds\n 70: optional i32 taskStartToCloseTimeoutSeconds\n 80: optional ChildPolicy childPolicy\n 90: optional binary control\n 100: optional i64 (js.type = \"Long\") decisionTaskCompletedEventId\n}\n\nstruct StartChildWorkflowExecutionFailedEventAttributes {\n 10: optional string domain\n 20: optional string workflowId\n 30: optional WorkflowType workflowType\n 40: optional ChildWorkflowExecutionFailedCause cause\n 50: optional binary control\n 60: optional i64 (js.type = \"Long\") initiatedEventId\n 70: optional i64 (js.type = \"Long\") decisionTaskCompletedEventId\n}\n\nstruct ChildWorkflowExecutionStartedEventAttributes {\n 10: optional string domain\n 20: optional i64 (js.type = \"Long\") initiatedEventId\n 30: optional WorkflowExecution workflowExecution\n 40: optional WorkflowType workflowType\n}\n\nstruct ChildWorkflowExecutionCompletedEventAttributes {\n 10: optional binary result\n 20: optional string domain\n 30: optional WorkflowExecution workflowExecution\n 40: optional WorkflowType workflowType\n 50: optional i64 (js.type = \"Long\") initiatedEventId\n 60: optional i64 (js.type = \"Long\") startedEventId\n}\n\nstruct ChildWorkflowExecutionFailedEventAttributes {\n 10: optional string reason\n 20: optional binary details\n 30: optional string domain\n 40: optional WorkflowExecution workflowExecution\n 50: optional WorkflowType workflowType\n 60: optional i64 (js.type = \"Long\") initiatedEventId\n 70: optional i64 (js.type = \"Long\") startedEventId\n}\n\nstruct ChildWorkflowExecutionCanceledEventAttributes {\n 10: optional binary details\n 20: optional string domain\n 30: optional WorkflowExecution workflowExecution\n 40: optional WorkflowType workflowType\n 50: optional i64 (js.type = \"Long\") initiatedEventId\n 60: optional i64 (js.type = \"Long\") startedEventId\n}\n\nstruct ChildWorkflowExecutionTimedOutEventAttributes {\n 10: optional TimeoutType timeoutType\n 20: optional string domain\n 30: optional WorkflowExecution workflowExecution\n 40: optional WorkflowType workflowType\n 50: optional i64 (js.type = \"Long\") initiatedEventId\n 60: optional i64 (js.type = \"Long\") startedEventId\n}\n\nstruct ChildWorkflowExecutionTerminatedEventAttributes {\n 10: optional string domain\n 20: optional WorkflowExecution workflowExecution\n 30: optional WorkflowType workflowType\n 40: optional i64 (js.type = \"Long\") initiatedEventId\n 50: optional i64 (js.type = \"Long\") startedEventId\n}\n\nstruct HistoryEvent {\n 10: optional i64 (js.type = \"Long\") eventId\n 20: optional i64 (js.type = \"Long\") timestamp\n 30: optional EventType eventType\n 40: optional WorkflowExecutionStartedEventAttributes workflowExecutionStartedEventAttributes\n 50: optional WorkflowExecutionCompletedEventAttributes workflowExecutionCompletedEventAttributes\n 60: optional WorkflowExecutionFailedEventAttributes workflowExecutionFailedEventAttributes\n 70: optional WorkflowExecutionTimedOutEventAttributes workflowExecutionTimedOutEventAttributes\n 80: optional DecisionTaskScheduledEventAttributes decisionTaskScheduledEventAttributes\n 90: optional DecisionTaskStartedEventAttributes decisionTaskStartedEventAttributes\n 100: optional DecisionTaskCompletedEventAttributes decisionTaskCompletedEventAttributes\n 110: optional DecisionTaskTimedOutEventAttributes decisionTaskTimedOutEventAttributes\n 120: optional DecisionTaskFailedEventAttributes decisionTaskFailedEventAttributes\n 130: optional ActivityTaskScheduledEventAttributes activityTaskScheduledEventAttributes\n 140: optional ActivityTaskStartedEventAttributes activityTaskStartedEventAttributes\n 150: optional ActivityTaskCompletedEventAttributes activityTaskCompletedEventAttributes\n 160: optional ActivityTaskFailedEventAttributes activityTaskFailedEventAttributes\n 170: optional ActivityTaskTimedOutEventAttributes activityTaskTimedOutEventAttributes\n 180: optional TimerStartedEventAttributes timerStartedEventAttributes\n 190: optional TimerFiredEventAttributes timerFiredEventAttributes\n 200: optional ActivityTaskCancelRequestedEventAttributes activityTaskCancelRequestedEventAttributes\n 210: optional RequestCancelActivityTaskFailedEventAttributes requestCancelActivityTaskFailedEventAttributes\n 220: optional ActivityTaskCanceledEventAttributes activityTaskCanceledEventAttributes\n 230: optional TimerCanceledEventAttributes timerCanceledEventAttributes\n 240: optional CancelTimerFailedEventAttributes cancelTimerFailedEventAttributes\n 250: optional MarkerRecordedEventAttributes markerRecordedEventAttributes\n 260: optional WorkflowExecutionSignaledEventAttributes workflowExecutionSignaledEventAttributes\n 270: optional WorkflowExecutionTerminatedEventAttributes workflowExecutionTerminatedEventAttributes\n 280: optional WorkflowExecutionCancelRequestedEventAttributes workflowExecutionCancelRequestedEventAttributes\n 290: optional WorkflowExecutionCanceledEventAttributes workflowExecutionCanceledEventAttributes\n 300: optional RequestCancelExternalWorkflowExecutionInitiatedEventAttributes requestCancelExternalWorkflowExecutionInitiatedEventAttributes\n 310: optional RequestCancelExternalWorkflowExecutionFailedEventAttributes requestCancelExternalWorkflowExecutionFailedEventAttributes\n 320: optional ExternalWorkflowExecutionCancelRequestedEventAttributes externalWorkflowExecutionCancelRequestedEventAttributes\n 330: optional WorkflowExecutionContinuedAsNewEventAttributes workflowExecutionContinuedAsNewEventAttributes\n 340: optional StartChildWorkflowExecutionInitiatedEventAttributes startChildWorkflowExecutionInitiatedEventAttributes\n 350: optional StartChildWorkflowExecutionFailedEventAttributes startChildWorkflowExecutionFailedEventAttributes\n 360: optional ChildWorkflowExecutionStartedEventAttributes childWorkflowExecutionStartedEventAttributes\n 370: optional ChildWorkflowExecutionCompletedEventAttributes childWorkflowExecutionCompletedEventAttributes\n 380: optional ChildWorkflowExecutionFailedEventAttributes childWorkflowExecutionFailedEventAttributes\n 390: optional ChildWorkflowExecutionCanceledEventAttributes childWorkflowExecutionCanceledEventAttributes\n 400: optional ChildWorkflowExecutionTimedOutEventAttributes childWorkflowExecutionTimedOutEventAttributes\n 410: optional ChildWorkflowExecutionTerminatedEventAttributes childWorkflowExecutionTerminatedEventAttributes\n}\n\nstruct History {\n 10: optional list events\n}\n\nstruct WorkflowExecutionFilter {\n 10: optional string workflowId\n}\n\nstruct WorkflowTypeFilter {\n 10: optional string name\n}\n\nstruct StartTimeFilter {\n 10: optional i64 (js.type = \"Long\") earliestTime\n 20: optional i64 (js.type = \"Long\") latestTime\n}\n\nstruct DomainInfo {\n 10: optional string name\n 20: optional DomainStatus status\n 30: optional string description\n 40: optional string ownerEmail\n}\n\nstruct DomainConfiguration {\n 10: optional i32 workflowExecutionRetentionPeriodInDays\n 20: optional bool emitMetric\n}\n\nstruct UpdateDomainInfo {\n 10: optional string description\n 20: optional string ownerEmail\n}\n\nstruct RegisterDomainRequest {\n 10: optional string name\n 20: optional string description\n 30: optional string ownerEmail\n 40: optional i32 workflowExecutionRetentionPeriodInDays\n 50: optional bool emitMetric\n}\n\nstruct DescribeDomainRequest {\n 10: optional string name\n}\n\nstruct DescribeDomainResponse {\n 10: optional DomainInfo domainInfo\n 20: optional DomainConfiguration configuration\n}\n\nstruct UpdateDomainRequest {\n 10: optional string name\n 20: optional UpdateDomainInfo updatedInfo\n 30: optional DomainConfiguration configuration\n}\n\nstruct UpdateDomainResponse {\n 10: optional DomainInfo domainInfo\n 20: optional DomainConfiguration configuration\n}\n\nstruct DeprecateDomainRequest {\n 10: optional string name\n}\n\nstruct StartWorkflowExecutionRequest {\n 10: optional string domain\n 20: optional string workflowId\n 30: optional WorkflowType workflowType\n 40: optional TaskList taskList\n 50: optional binary input\n 60: optional i32 executionStartToCloseTimeoutSeconds\n 70: optional i32 taskStartToCloseTimeoutSeconds\n 80: optional string identity\n 90: optional string requestId\n}\n\nstruct StartWorkflowExecutionResponse {\n 10: optional string runId\n}\n\nstruct PollForDecisionTaskRequest {\n 10: optional string domain\n 20: optional TaskList taskList\n 30: optional string identity\n}\n\nstruct PollForDecisionTaskResponse {\n 10: optional binary taskToken\n 20: optional WorkflowExecution workflowExecution\n 30: optional WorkflowType workflowType\n 40: optional i64 (js.type = \"Long\") previousStartedEventId\n 50: optional i64 (js.type = \"Long\") startedEventId\n 51: optional i64 (js.type = 'Long') attempt\n 54: optional i64 (js.type = \"Long\") backlogCountHint\n 60: optional History history\n 70: optional binary nextPageToken\n 80: optional WorkflowQuery query\n}\n\nstruct StickyExecutionAttributes {\n 10: optional TaskList workerTaskList\n 20: optional i32 scheduleToStartTimeoutSeconds\n}\n\nstruct RespondDecisionTaskCompletedRequest {\n 10: optional binary taskToken\n 20: optional list decisions\n 30: optional binary executionContext\n 40: optional string identity\n 50: optional StickyExecutionAttributes stickyAttributes\n}\n\nstruct RespondDecisionTaskFailedRequest {\n 10: optional binary taskToken\n 20: optional DecisionTaskFailedCause cause\n 30: optional binary details\n 40: optional string identity\n}\n\nstruct PollForActivityTaskRequest {\n 10: optional string domain\n 20: optional TaskList taskList\n 30: optional string identity\n 40: optional TaskListMetadata taskListMetadata\n}\n\nstruct PollForActivityTaskResponse {\n 10: optional binary taskToken\n 20: optional WorkflowExecution workflowExecution\n 30: optional string activityId\n 40: optional ActivityType activityType\n 50: optional binary input\n 70: optional i64 (js.type = \"Long\") scheduledTimestamp\n 80: optional i32 scheduleToCloseTimeoutSeconds\n 90: optional i64 (js.type = \"Long\") startedTimestamp\n 100: optional i32 startToCloseTimeoutSeconds\n 110: optional i32 heartbeatTimeoutSeconds\n}\n\nstruct RecordActivityTaskHeartbeatRequest {\n 10: optional binary taskToken\n 20: optional binary details\n 30: optional string identity\n}\n\nstruct RecordActivityTaskHeartbeatResponse {\n 10: optional bool cancelRequested\n}\n\nstruct RespondActivityTaskCompletedRequest {\n 10: optional binary taskToken\n 20: optional binary result\n 30: optional string identity\n}\n\nstruct RespondActivityTaskFailedRequest {\n 10: optional binary taskToken\n 20: optional string reason\n 30: optional binary details\n 40: optional string identity\n}\n\nstruct RespondActivityTaskCanceledRequest {\n 10: optional binary taskToken\n 20: optional binary details\n 30: optional string identity\n}\n\nstruct RespondActivityTaskCompletedByIDRequest {\n 10: optional string domainID\n 20: optional string workflowID\n 30: optional string runID\n 40: optional string activityID\n 50: optional binary result\n 60: optional string identity\n}\n\nstruct RespondActivityTaskFailedByIDRequest {\n 10: optional string domainID\n 20: optional string workflowID\n 30: optional string runID\n 40: optional string activityID\n 50: optional string reason\n 60: optional binary details\n 70: optional string identity\n}\n\nstruct RespondActivityTaskCanceledByIDRequest {\n 10: optional string domainID\n 20: optional string workflowID\n 30: optional string runID\n 40: optional string activityID\n 50: optional binary details\n 60: optional string identity\n}\n\nstruct RequestCancelWorkflowExecutionRequest {\n 10: optional string domain\n 20: optional WorkflowExecution workflowExecution\n 30: optional string identity\n 40: optional string requestId\n}\n\nstruct GetWorkflowExecutionHistoryRequest {\n 10: optional string domain\n 20: optional WorkflowExecution execution\n 30: optional i32 maximumPageSize\n 40: optional binary nextPageToken\n 50: optional bool waitForNewEvent\n}\n\nstruct GetWorkflowExecutionHistoryResponse {\n 10: optional History history\n 20: optional binary nextPageToken\n}\n\nstruct SignalWorkflowExecutionRequest {\n 10: optional string domain\n 20: optional WorkflowExecution workflowExecution\n 30: optional string signalName\n 40: optional binary input\n 50: optional string identity\n}\n\nstruct TerminateWorkflowExecutionRequest {\n 10: optional string domain\n 20: optional WorkflowExecution workflowExecution\n 30: optional string reason\n 40: optional binary details\n 50: optional string identity\n}\n\nstruct ListOpenWorkflowExecutionsRequest {\n 10: optional string domain\n 20: optional i32 maximumPageSize\n 30: optional binary nextPageToken\n 40: optional StartTimeFilter StartTimeFilter\n 50: optional WorkflowExecutionFilter executionFilter\n 60: optional WorkflowTypeFilter typeFilter\n}\n\nstruct ListOpenWorkflowExecutionsResponse {\n 10: optional list executions\n 20: optional binary nextPageToken\n}\n\nstruct ListClosedWorkflowExecutionsRequest {\n 10: optional string domain\n 20: optional i32 maximumPageSize\n 30: optional binary nextPageToken\n 40: optional StartTimeFilter StartTimeFilter\n 50: optional WorkflowExecutionFilter executionFilter\n 60: optional WorkflowTypeFilter typeFilter\n 70: optional WorkflowExecutionCloseStatus statusFilter\n}\n\nstruct ListClosedWorkflowExecutionsResponse {\n 10: optional list executions\n 20: optional binary nextPageToken\n}\n\nstruct QueryWorkflowRequest {\n 10: optional string domain\n 20: optional WorkflowExecution execution\n 30: optional WorkflowQuery query\n}\n\nstruct QueryWorkflowResponse {\n 10: optional binary queryResult\n}\n\nstruct WorkflowQuery {\n 10: optional string queryType\n 20: optional binary queryArgs\n}\n\nstruct RespondQueryTaskCompletedRequest {\n 10: optional binary taskToken\n 20: optional QueryTaskCompletedType completedType\n 30: optional binary queryResult\n 40: optional string errorMessage\n}\n\nstruct DescribeWorkflowExecutionRequest {\n 10: optional string domain\n 20: optional WorkflowExecution execution\n}\n\nstruct DescribeWorkflowExecutionResponse {\n 10: optional WorkflowExecutionConfiguration executionConfiguration\n 20: optional WorkflowExecutionInfo workflowExecutionInfo\n}\n" +const rawIDL = "// Copyright (c) 2017 Uber Technologies, Inc.\n//\n// Permission is hereby granted, free of charge, to any person obtaining a copy\n// of this software and associated documentation files (the \"Software\"), to deal\n// in the Software without restriction, including without limitation the rights\n// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell\n// copies of the Software, and to permit persons to whom the Software is\n// furnished to do so, subject to the following conditions:\n//\n// The above copyright notice and this permission notice shall be included in\n// all copies or substantial portions of the Software.\n//\n// THE SOFTWARE IS PROVIDED \"AS IS\", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR\n// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,\n// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE\n// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER\n// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,\n// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN\n// THE SOFTWARE.\n\nnamespace java com.uber.cadence\n\nexception BadRequestError {\n 1: required string message\n}\n\nexception InternalServiceError {\n 1: required string message\n}\n\nexception DomainAlreadyExistsError {\n 1: required string message\n}\n\nexception WorkflowExecutionAlreadyStartedError {\n 10: optional string message\n 20: optional string startRequestId\n 30: optional string runId\n}\n\nexception EntityNotExistsError {\n 1: required string message\n}\n\nexception ServiceBusyError {\n 1: required string message\n}\n\nexception CancellationAlreadyRequestedError {\n 1: required string message\n}\n\nexception QueryFailedError {\n 1: required string message\n}\n\nenum WorkflowIdReusePolicy {\n /*\n * allow start a workflow execution using the same workflow ID,\n * when workflow not running, and the last execution close state is in\n * [terminated, cancelled, timeouted, failed].\n */\n AllowDuplicateFailedOnly,\n /*\n * allow start a workflow execution using the same workflow ID,\n * when workflow not running.\n */\n AllowDuplicate,\n /*\n * do not allow start a workflow execution using the same workflow ID at all\n */\n RejectDuplicate,\n}\n\nenum DomainStatus {\n REGISTERED,\n DEPRECATED,\n DELETED,\n}\n\nenum TimeoutType {\n START_TO_CLOSE,\n SCHEDULE_TO_START,\n SCHEDULE_TO_CLOSE,\n HEARTBEAT,\n}\n\nenum DecisionType {\n ScheduleActivityTask,\n RequestCancelActivityTask,\n StartTimer,\n CompleteWorkflowExecution,\n FailWorkflowExecution,\n CancelTimer,\n CancelWorkflowExecution,\n RequestCancelExternalWorkflowExecution,\n RecordMarker,\n ContinueAsNewWorkflowExecution,\n StartChildWorkflowExecution,\n}\n\nenum EventType {\n WorkflowExecutionStarted,\n WorkflowExecutionCompleted,\n WorkflowExecutionFailed,\n WorkflowExecutionTimedOut,\n DecisionTaskScheduled,\n DecisionTaskStarted,\n DecisionTaskCompleted,\n DecisionTaskTimedOut\n DecisionTaskFailed,\n ActivityTaskScheduled,\n ActivityTaskStarted,\n ActivityTaskCompleted,\n ActivityTaskFailed,\n ActivityTaskTimedOut,\n ActivityTaskCancelRequested,\n RequestCancelActivityTaskFailed,\n ActivityTaskCanceled,\n TimerStarted,\n TimerFired,\n CancelTimerFailed,\n TimerCanceled,\n WorkflowExecutionCancelRequested,\n WorkflowExecutionCanceled,\n RequestCancelExternalWorkflowExecutionInitiated,\n RequestCancelExternalWorkflowExecutionFailed,\n ExternalWorkflowExecutionCancelRequested,\n MarkerRecorded,\n WorkflowExecutionSignaled,\n WorkflowExecutionTerminated,\n WorkflowExecutionContinuedAsNew,\n StartChildWorkflowExecutionInitiated,\n StartChildWorkflowExecutionFailed,\n ChildWorkflowExecutionStarted,\n ChildWorkflowExecutionCompleted,\n ChildWorkflowExecutionFailed,\n ChildWorkflowExecutionCanceled,\n ChildWorkflowExecutionTimedOut,\n ChildWorkflowExecutionTerminated,\n}\n\nenum DecisionTaskFailedCause {\n UNHANDLED_DECISION,\n BAD_SCHEDULE_ACTIVITY_ATTRIBUTES,\n BAD_REQUEST_CANCEL_ACTIVITY_ATTRIBUTES,\n BAD_START_TIMER_ATTRIBUTES,\n BAD_CANCEL_TIMER_ATTRIBUTES,\n BAD_RECORD_MARKER_ATTRIBUTES,\n BAD_COMPLETE_WORKFLOW_EXECUTION_ATTRIBUTES,\n BAD_FAIL_WORKFLOW_EXECUTION_ATTRIBUTES,\n BAD_CANCEL_WORKFLOW_EXECUTION_ATTRIBUTES,\n BAD_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_ATTRIBUTES,\n BAD_CONTINUE_AS_NEW_ATTRIBUTES,\n START_TIMER_DUPLICATE_ID,\n RESET_STICKY_TASKLIST,\n WORKFLOW_WORKER_UNHANDLED_FAILURE\n}\n\nenum CancelExternalWorkflowExecutionFailedCause {\n UNKNOWN_EXTERNAL_WORKFLOW_EXECUTION,\n}\n\nenum ChildWorkflowExecutionFailedCause {\n WORKFLOW_ALREADY_RUNNING,\n}\n\nenum WorkflowExecutionCloseStatus {\n COMPLETED,\n FAILED,\n CANCELED,\n TERMINATED,\n CONTINUED_AS_NEW,\n TIMED_OUT,\n}\n\nenum ChildPolicy {\n TERMINATE,\n REQUEST_CANCEL,\n ABANDON,\n}\n\nenum QueryTaskCompletedType {\n COMPLETED,\n FAILED,\n}\n\nstruct WorkflowType {\n 10: optional string name\n}\n\nstruct ActivityType {\n 10: optional string name\n}\n\nstruct TaskList {\n 10: optional string name\n}\n\nstruct TaskListMetadata {\n 10: optional double maxTasksPerSecond\n}\n\nstruct WorkflowExecution {\n 10: optional string workflowId\n 20: optional string runId\n}\n\nstruct WorkflowExecutionInfo {\n 10: optional WorkflowExecution execution\n 20: optional WorkflowType type\n 30: optional i64 (js.type = \"Long\") startTime\n 40: optional i64 (js.type = \"Long\") closeTime\n 50: optional WorkflowExecutionCloseStatus closeStatus\n 60: optional i64 (js.type = \"Long\") historyLength\n}\n\nstruct WorkflowExecutionConfiguration {\n 10: optional TaskList taskList\n 20: optional i32 executionStartToCloseTimeoutSeconds\n 30: optional i32 taskStartToCloseTimeoutSeconds\n 40: optional ChildPolicy childPolicy\n}\n\nstruct TransientDecisionInfo {\n 10: optional HistoryEvent scheduledEvent\n 20: optional HistoryEvent startedEvent\n}\n\nstruct ScheduleActivityTaskDecisionAttributes {\n 10: optional string activityId\n 20: optional ActivityType activityType\n 25: optional string domain\n 30: optional TaskList taskList\n 40: optional binary input\n 45: optional i32 scheduleToCloseTimeoutSeconds\n 50: optional i32 scheduleToStartTimeoutSeconds\n 55: optional i32 startToCloseTimeoutSeconds\n 60: optional i32 heartbeatTimeoutSeconds\n}\n\nstruct RequestCancelActivityTaskDecisionAttributes {\n 10: optional string activityId\n}\n\nstruct StartTimerDecisionAttributes {\n 10: optional string timerId\n 20: optional i64 (js.type = \"Long\") startToFireTimeoutSeconds\n}\n\nstruct CompleteWorkflowExecutionDecisionAttributes {\n 10: optional binary result\n}\n\nstruct FailWorkflowExecutionDecisionAttributes {\n 10: optional string reason\n 20: optional binary details\n}\n\nstruct CancelTimerDecisionAttributes {\n 10: optional string timerId\n}\n\nstruct CancelWorkflowExecutionDecisionAttributes {\n 10: optional binary details\n}\n\nstruct RequestCancelExternalWorkflowExecutionDecisionAttributes {\n 10: optional string domain\n 20: optional string workflowId\n 30: optional string runId\n 40: optional binary control\n}\n\nstruct RecordMarkerDecisionAttributes {\n 10: optional string markerName\n 20: optional binary details\n}\n\nstruct ContinueAsNewWorkflowExecutionDecisionAttributes {\n 10: optional WorkflowType workflowType\n 20: optional TaskList taskList\n 30: optional binary input\n 40: optional i32 executionStartToCloseTimeoutSeconds\n 50: optional i32 taskStartToCloseTimeoutSeconds\n}\n\nstruct StartChildWorkflowExecutionDecisionAttributes {\n 10: optional string domain\n 20: optional string workflowId\n 30: optional WorkflowType workflowType\n 40: optional TaskList taskList\n 50: optional binary input\n 60: optional i32 executionStartToCloseTimeoutSeconds\n 70: optional i32 taskStartToCloseTimeoutSeconds\n 80: optional ChildPolicy childPolicy\n 90: optional binary control\n}\n\nstruct Decision {\n 10: optional DecisionType decisionType\n 20: optional ScheduleActivityTaskDecisionAttributes scheduleActivityTaskDecisionAttributes\n 25: optional StartTimerDecisionAttributes startTimerDecisionAttributes\n 30: optional CompleteWorkflowExecutionDecisionAttributes completeWorkflowExecutionDecisionAttributes\n 35: optional FailWorkflowExecutionDecisionAttributes failWorkflowExecutionDecisionAttributes\n 40: optional RequestCancelActivityTaskDecisionAttributes requestCancelActivityTaskDecisionAttributes\n 50: optional CancelTimerDecisionAttributes cancelTimerDecisionAttributes\n 60: optional CancelWorkflowExecutionDecisionAttributes cancelWorkflowExecutionDecisionAttributes\n 70: optional RequestCancelExternalWorkflowExecutionDecisionAttributes requestCancelExternalWorkflowExecutionDecisionAttributes\n 80: optional RecordMarkerDecisionAttributes recordMarkerDecisionAttributes\n 90: optional ContinueAsNewWorkflowExecutionDecisionAttributes continueAsNewWorkflowExecutionDecisionAttributes\n 100: optional StartChildWorkflowExecutionDecisionAttributes startChildWorkflowExecutionDecisionAttributes\n}\n\nstruct WorkflowExecutionStartedEventAttributes {\n 10: optional WorkflowType workflowType\n 20: optional TaskList taskList\n 30: optional binary input\n 40: optional i32 executionStartToCloseTimeoutSeconds\n 50: optional i32 taskStartToCloseTimeoutSeconds\n 60: optional string identity\n}\n\nstruct WorkflowExecutionCompletedEventAttributes {\n 10: optional binary result\n 20: optional i64 (js.type = \"Long\") decisionTaskCompletedEventId\n}\n\nstruct WorkflowExecutionFailedEventAttributes {\n 10: optional string reason\n 20: optional binary details\n 30: optional i64 (js.type = \"Long\") decisionTaskCompletedEventId\n}\n\nstruct WorkflowExecutionTimedOutEventAttributes {\n 10: optional TimeoutType timeoutType\n}\n\nstruct WorkflowExecutionContinuedAsNewEventAttributes {\n 10: optional string newExecutionRunId\n 20: optional WorkflowType workflowType\n 30: optional TaskList taskList\n 40: optional binary input\n 50: optional i32 executionStartToCloseTimeoutSeconds\n 60: optional i32 taskStartToCloseTimeoutSeconds\n 70: optional i64 (js.type = \"Long\") decisionTaskCompletedEventId\n}\n\nstruct DecisionTaskScheduledEventAttributes {\n 10: optional TaskList taskList\n 20: optional i32 startToCloseTimeoutSeconds\n 30: optional i64 (js.type = \"Long\") attempt\n}\n\nstruct DecisionTaskStartedEventAttributes {\n 10: optional i64 (js.type = \"Long\") scheduledEventId\n 20: optional string identity\n 30: optional string requestId\n}\n\nstruct DecisionTaskCompletedEventAttributes {\n 10: optional binary executionContext\n 20: optional i64 (js.type = \"Long\") scheduledEventId\n 30: optional i64 (js.type = \"Long\") startedEventId\n 40: optional string identity\n}\n\nstruct DecisionTaskTimedOutEventAttributes {\n 10: optional i64 (js.type = \"Long\") scheduledEventId\n 20: optional i64 (js.type = \"Long\") startedEventId\n 30: optional TimeoutType timeoutType\n}\n\nstruct DecisionTaskFailedEventAttributes {\n 10: optional i64 (js.type = \"Long\") scheduledEventId\n 20: optional i64 (js.type = \"Long\") startedEventId\n 30: optional DecisionTaskFailedCause cause\n 35: optional binary details\n 40: optional string identity\n}\n\nstruct ActivityTaskScheduledEventAttributes {\n 10: optional string activityId\n 20: optional ActivityType activityType\n 25: optional string domain\n 30: optional TaskList taskList\n 40: optional binary input\n 45: optional i32 scheduleToCloseTimeoutSeconds\n 50: optional i32 scheduleToStartTimeoutSeconds\n 55: optional i32 startToCloseTimeoutSeconds\n 60: optional i32 heartbeatTimeoutSeconds\n 90: optional i64 (js.type = \"Long\") decisionTaskCompletedEventId\n}\n\nstruct ActivityTaskStartedEventAttributes {\n 10: optional i64 (js.type = \"Long\") scheduledEventId\n 20: optional string identity\n 30: optional string requestId\n}\n\nstruct ActivityTaskCompletedEventAttributes {\n 10: optional binary result\n 20: optional i64 (js.type = \"Long\") scheduledEventId\n 30: optional i64 (js.type = \"Long\") startedEventId\n 40: optional string identity\n}\n\nstruct ActivityTaskFailedEventAttributes {\n 10: optional string reason\n 20: optional binary details\n 30: optional i64 (js.type = \"Long\") scheduledEventId\n 40: optional i64 (js.type = \"Long\") startedEventId\n 50: optional string identity\n}\n\nstruct ActivityTaskTimedOutEventAttributes {\n 05: optional binary details\n 10: optional i64 (js.type = \"Long\") scheduledEventId\n 20: optional i64 (js.type = \"Long\") startedEventId\n 30: optional TimeoutType timeoutType\n}\n\nstruct ActivityTaskCancelRequestedEventAttributes {\n 10: optional string activityId\n 20: optional i64 (js.type = \"Long\") decisionTaskCompletedEventId\n}\n\nstruct RequestCancelActivityTaskFailedEventAttributes{\n 10: optional string activityId\n 20: optional string cause\n 30: optional i64 (js.type = \"Long\") decisionTaskCompletedEventId\n}\n\nstruct ActivityTaskCanceledEventAttributes {\n 10: optional binary details\n 20: optional i64 (js.type = \"Long\") latestCancelRequestedEventId\n 30: optional i64 (js.type = \"Long\") scheduledEventId\n 40: optional i64 (js.type = \"Long\") startedEventId\n 50: optional string identity\n}\n\nstruct TimerStartedEventAttributes {\n 10: optional string timerId\n 20: optional i64 (js.type = \"Long\") startToFireTimeoutSeconds\n 30: optional i64 (js.type = \"Long\") decisionTaskCompletedEventId\n}\n\nstruct TimerFiredEventAttributes {\n 10: optional string timerId\n 20: optional i64 (js.type = \"Long\") startedEventId\n}\n\nstruct TimerCanceledEventAttributes {\n 10: optional string timerId\n 20: optional i64 (js.type = \"Long\") startedEventId\n 30: optional i64 (js.type = \"Long\") decisionTaskCompletedEventId\n 40: optional string identity\n}\n\nstruct CancelTimerFailedEventAttributes {\n 10: optional string timerId\n 20: optional string cause\n 30: optional i64 (js.type = \"Long\") decisionTaskCompletedEventId\n 40: optional string identity\n}\n\nstruct WorkflowExecutionCancelRequestedEventAttributes {\n 10: optional string cause\n 20: optional i64 (js.type = \"Long\") externalInitiatedEventId\n 30: optional WorkflowExecution externalWorkflowExecution\n 40: optional string identity\n}\n\nstruct WorkflowExecutionCanceledEventAttributes {\n 10: optional i64 (js.type = \"Long\") decisionTaskCompletedEventId\n 20: optional binary details\n}\n\nstruct MarkerRecordedEventAttributes {\n 10: optional string markerName\n 20: optional binary details\n 30: optional i64 (js.type = \"Long\") decisionTaskCompletedEventId\n}\n\nstruct WorkflowExecutionSignaledEventAttributes {\n 10: optional string signalName\n 20: optional binary input\n 30: optional string identity\n}\n\nstruct WorkflowExecutionTerminatedEventAttributes {\n 10: optional string reason\n 20: optional binary details\n 30: optional string identity\n}\n\nstruct RequestCancelExternalWorkflowExecutionInitiatedEventAttributes {\n 10: optional i64 (js.type = \"Long\") decisionTaskCompletedEventId\n 20: optional string domain\n 30: optional WorkflowExecution workflowExecution\n 40: optional binary control\n}\n\nstruct RequestCancelExternalWorkflowExecutionFailedEventAttributes {\n 10: optional CancelExternalWorkflowExecutionFailedCause cause\n 20: optional i64 (js.type = \"Long\") decisionTaskCompletedEventId\n 30: optional string domain\n 40: optional WorkflowExecution workflowExecution\n 50: optional i64 (js.type = \"Long\") initiatedEventId\n 60: optional binary control\n}\n\nstruct ExternalWorkflowExecutionCancelRequestedEventAttributes {\n 10: optional i64 (js.type = \"Long\") initiatedEventId\n 20: optional string domain\n 30: optional WorkflowExecution workflowExecution\n}\n\nstruct StartChildWorkflowExecutionInitiatedEventAttributes {\n 10: optional string domain\n 20: optional string workflowId\n 30: optional WorkflowType workflowType\n 40: optional TaskList taskList\n 50: optional binary input\n 60: optional i32 executionStartToCloseTimeoutSeconds\n 70: optional i32 taskStartToCloseTimeoutSeconds\n 80: optional ChildPolicy childPolicy\n 90: optional binary control\n 100: optional i64 (js.type = \"Long\") decisionTaskCompletedEventId\n}\n\nstruct StartChildWorkflowExecutionFailedEventAttributes {\n 10: optional string domain\n 20: optional string workflowId\n 30: optional WorkflowType workflowType\n 40: optional ChildWorkflowExecutionFailedCause cause\n 50: optional binary control\n 60: optional i64 (js.type = \"Long\") initiatedEventId\n 70: optional i64 (js.type = \"Long\") decisionTaskCompletedEventId\n}\n\nstruct ChildWorkflowExecutionStartedEventAttributes {\n 10: optional string domain\n 20: optional i64 (js.type = \"Long\") initiatedEventId\n 30: optional WorkflowExecution workflowExecution\n 40: optional WorkflowType workflowType\n}\n\nstruct ChildWorkflowExecutionCompletedEventAttributes {\n 10: optional binary result\n 20: optional string domain\n 30: optional WorkflowExecution workflowExecution\n 40: optional WorkflowType workflowType\n 50: optional i64 (js.type = \"Long\") initiatedEventId\n 60: optional i64 (js.type = \"Long\") startedEventId\n}\n\nstruct ChildWorkflowExecutionFailedEventAttributes {\n 10: optional string reason\n 20: optional binary details\n 30: optional string domain\n 40: optional WorkflowExecution workflowExecution\n 50: optional WorkflowType workflowType\n 60: optional i64 (js.type = \"Long\") initiatedEventId\n 70: optional i64 (js.type = \"Long\") startedEventId\n}\n\nstruct ChildWorkflowExecutionCanceledEventAttributes {\n 10: optional binary details\n 20: optional string domain\n 30: optional WorkflowExecution workflowExecution\n 40: optional WorkflowType workflowType\n 50: optional i64 (js.type = \"Long\") initiatedEventId\n 60: optional i64 (js.type = \"Long\") startedEventId\n}\n\nstruct ChildWorkflowExecutionTimedOutEventAttributes {\n 10: optional TimeoutType timeoutType\n 20: optional string domain\n 30: optional WorkflowExecution workflowExecution\n 40: optional WorkflowType workflowType\n 50: optional i64 (js.type = \"Long\") initiatedEventId\n 60: optional i64 (js.type = \"Long\") startedEventId\n}\n\nstruct ChildWorkflowExecutionTerminatedEventAttributes {\n 10: optional string domain\n 20: optional WorkflowExecution workflowExecution\n 30: optional WorkflowType workflowType\n 40: optional i64 (js.type = \"Long\") initiatedEventId\n 50: optional i64 (js.type = \"Long\") startedEventId\n}\n\nstruct HistoryEvent {\n 10: optional i64 (js.type = \"Long\") eventId\n 20: optional i64 (js.type = \"Long\") timestamp\n 30: optional EventType eventType\n 40: optional WorkflowExecutionStartedEventAttributes workflowExecutionStartedEventAttributes\n 50: optional WorkflowExecutionCompletedEventAttributes workflowExecutionCompletedEventAttributes\n 60: optional WorkflowExecutionFailedEventAttributes workflowExecutionFailedEventAttributes\n 70: optional WorkflowExecutionTimedOutEventAttributes workflowExecutionTimedOutEventAttributes\n 80: optional DecisionTaskScheduledEventAttributes decisionTaskScheduledEventAttributes\n 90: optional DecisionTaskStartedEventAttributes decisionTaskStartedEventAttributes\n 100: optional DecisionTaskCompletedEventAttributes decisionTaskCompletedEventAttributes\n 110: optional DecisionTaskTimedOutEventAttributes decisionTaskTimedOutEventAttributes\n 120: optional DecisionTaskFailedEventAttributes decisionTaskFailedEventAttributes\n 130: optional ActivityTaskScheduledEventAttributes activityTaskScheduledEventAttributes\n 140: optional ActivityTaskStartedEventAttributes activityTaskStartedEventAttributes\n 150: optional ActivityTaskCompletedEventAttributes activityTaskCompletedEventAttributes\n 160: optional ActivityTaskFailedEventAttributes activityTaskFailedEventAttributes\n 170: optional ActivityTaskTimedOutEventAttributes activityTaskTimedOutEventAttributes\n 180: optional TimerStartedEventAttributes timerStartedEventAttributes\n 190: optional TimerFiredEventAttributes timerFiredEventAttributes\n 200: optional ActivityTaskCancelRequestedEventAttributes activityTaskCancelRequestedEventAttributes\n 210: optional RequestCancelActivityTaskFailedEventAttributes requestCancelActivityTaskFailedEventAttributes\n 220: optional ActivityTaskCanceledEventAttributes activityTaskCanceledEventAttributes\n 230: optional TimerCanceledEventAttributes timerCanceledEventAttributes\n 240: optional CancelTimerFailedEventAttributes cancelTimerFailedEventAttributes\n 250: optional MarkerRecordedEventAttributes markerRecordedEventAttributes\n 260: optional WorkflowExecutionSignaledEventAttributes workflowExecutionSignaledEventAttributes\n 270: optional WorkflowExecutionTerminatedEventAttributes workflowExecutionTerminatedEventAttributes\n 280: optional WorkflowExecutionCancelRequestedEventAttributes workflowExecutionCancelRequestedEventAttributes\n 290: optional WorkflowExecutionCanceledEventAttributes workflowExecutionCanceledEventAttributes\n 300: optional RequestCancelExternalWorkflowExecutionInitiatedEventAttributes requestCancelExternalWorkflowExecutionInitiatedEventAttributes\n 310: optional RequestCancelExternalWorkflowExecutionFailedEventAttributes requestCancelExternalWorkflowExecutionFailedEventAttributes\n 320: optional ExternalWorkflowExecutionCancelRequestedEventAttributes externalWorkflowExecutionCancelRequestedEventAttributes\n 330: optional WorkflowExecutionContinuedAsNewEventAttributes workflowExecutionContinuedAsNewEventAttributes\n 340: optional StartChildWorkflowExecutionInitiatedEventAttributes startChildWorkflowExecutionInitiatedEventAttributes\n 350: optional StartChildWorkflowExecutionFailedEventAttributes startChildWorkflowExecutionFailedEventAttributes\n 360: optional ChildWorkflowExecutionStartedEventAttributes childWorkflowExecutionStartedEventAttributes\n 370: optional ChildWorkflowExecutionCompletedEventAttributes childWorkflowExecutionCompletedEventAttributes\n 380: optional ChildWorkflowExecutionFailedEventAttributes childWorkflowExecutionFailedEventAttributes\n 390: optional ChildWorkflowExecutionCanceledEventAttributes childWorkflowExecutionCanceledEventAttributes\n 400: optional ChildWorkflowExecutionTimedOutEventAttributes childWorkflowExecutionTimedOutEventAttributes\n 410: optional ChildWorkflowExecutionTerminatedEventAttributes childWorkflowExecutionTerminatedEventAttributes\n}\n\nstruct History {\n 10: optional list events\n}\n\nstruct WorkflowExecutionFilter {\n 10: optional string workflowId\n}\n\nstruct WorkflowTypeFilter {\n 10: optional string name\n}\n\nstruct StartTimeFilter {\n 10: optional i64 (js.type = \"Long\") earliestTime\n 20: optional i64 (js.type = \"Long\") latestTime\n}\n\nstruct DomainInfo {\n 10: optional string name\n 20: optional DomainStatus status\n 30: optional string description\n 40: optional string ownerEmail\n}\n\nstruct DomainConfiguration {\n 10: optional i32 workflowExecutionRetentionPeriodInDays\n 20: optional bool emitMetric\n}\n\nstruct UpdateDomainInfo {\n 10: optional string description\n 20: optional string ownerEmail\n}\n\nstruct RegisterDomainRequest {\n 10: optional string name\n 20: optional string description\n 30: optional string ownerEmail\n 40: optional i32 workflowExecutionRetentionPeriodInDays\n 50: optional bool emitMetric\n}\n\nstruct DescribeDomainRequest {\n 10: optional string name\n}\n\nstruct DescribeDomainResponse {\n 10: optional DomainInfo domainInfo\n 20: optional DomainConfiguration configuration\n}\n\nstruct UpdateDomainRequest {\n 10: optional string name\n 20: optional UpdateDomainInfo updatedInfo\n 30: optional DomainConfiguration configuration\n}\n\nstruct UpdateDomainResponse {\n 10: optional DomainInfo domainInfo\n 20: optional DomainConfiguration configuration\n}\n\nstruct DeprecateDomainRequest {\n 10: optional string name\n}\n\nstruct StartWorkflowExecutionRequest {\n 10: optional string domain\n 20: optional string workflowId\n 30: optional WorkflowType workflowType\n 40: optional TaskList taskList\n 50: optional binary input\n 60: optional i32 executionStartToCloseTimeoutSeconds\n 70: optional i32 taskStartToCloseTimeoutSeconds\n 80: optional string identity\n 90: optional string requestId\n 100: optional WorkflowIdReusePolicy workflowIdReusePolicy\n}\n\nstruct StartWorkflowExecutionResponse {\n 10: optional string runId\n}\n\nstruct PollForDecisionTaskRequest {\n 10: optional string domain\n 20: optional TaskList taskList\n 30: optional string identity\n}\n\nstruct PollForDecisionTaskResponse {\n 10: optional binary taskToken\n 20: optional WorkflowExecution workflowExecution\n 30: optional WorkflowType workflowType\n 40: optional i64 (js.type = \"Long\") previousStartedEventId\n 50: optional i64 (js.type = \"Long\") startedEventId\n 51: optional i64 (js.type = 'Long') attempt\n 54: optional i64 (js.type = \"Long\") backlogCountHint\n 60: optional History history\n 70: optional binary nextPageToken\n 80: optional WorkflowQuery query\n}\n\nstruct StickyExecutionAttributes {\n 10: optional TaskList workerTaskList\n 20: optional i32 scheduleToStartTimeoutSeconds\n}\n\nstruct RespondDecisionTaskCompletedRequest {\n 10: optional binary taskToken\n 20: optional list decisions\n 30: optional binary executionContext\n 40: optional string identity\n 50: optional StickyExecutionAttributes stickyAttributes\n}\n\nstruct RespondDecisionTaskFailedRequest {\n 10: optional binary taskToken\n 20: optional DecisionTaskFailedCause cause\n 30: optional binary details\n 40: optional string identity\n}\n\nstruct PollForActivityTaskRequest {\n 10: optional string domain\n 20: optional TaskList taskList\n 30: optional string identity\n 40: optional TaskListMetadata taskListMetadata\n}\n\nstruct PollForActivityTaskResponse {\n 10: optional binary taskToken\n 20: optional WorkflowExecution workflowExecution\n 30: optional string activityId\n 40: optional ActivityType activityType\n 50: optional binary input\n 70: optional i64 (js.type = \"Long\") scheduledTimestamp\n 80: optional i32 scheduleToCloseTimeoutSeconds\n 90: optional i64 (js.type = \"Long\") startedTimestamp\n 100: optional i32 startToCloseTimeoutSeconds\n 110: optional i32 heartbeatTimeoutSeconds\n}\n\nstruct RecordActivityTaskHeartbeatRequest {\n 10: optional binary taskToken\n 20: optional binary details\n 30: optional string identity\n}\n\nstruct RecordActivityTaskHeartbeatResponse {\n 10: optional bool cancelRequested\n}\n\nstruct RespondActivityTaskCompletedRequest {\n 10: optional binary taskToken\n 20: optional binary result\n 30: optional string identity\n}\n\nstruct RespondActivityTaskFailedRequest {\n 10: optional binary taskToken\n 20: optional string reason\n 30: optional binary details\n 40: optional string identity\n}\n\nstruct RespondActivityTaskCanceledRequest {\n 10: optional binary taskToken\n 20: optional binary details\n 30: optional string identity\n}\n\nstruct RespondActivityTaskCompletedByIDRequest {\n 10: optional string domainID\n 20: optional string workflowID\n 30: optional string runID\n 40: optional string activityID\n 50: optional binary result\n 60: optional string identity\n}\n\nstruct RespondActivityTaskFailedByIDRequest {\n 10: optional string domainID\n 20: optional string workflowID\n 30: optional string runID\n 40: optional string activityID\n 50: optional string reason\n 60: optional binary details\n 70: optional string identity\n}\n\nstruct RespondActivityTaskCanceledByIDRequest {\n 10: optional string domainID\n 20: optional string workflowID\n 30: optional string runID\n 40: optional string activityID\n 50: optional binary details\n 60: optional string identity\n}\n\nstruct RequestCancelWorkflowExecutionRequest {\n 10: optional string domain\n 20: optional WorkflowExecution workflowExecution\n 30: optional string identity\n 40: optional string requestId\n}\n\nstruct GetWorkflowExecutionHistoryRequest {\n 10: optional string domain\n 20: optional WorkflowExecution execution\n 30: optional i32 maximumPageSize\n 40: optional binary nextPageToken\n 50: optional bool waitForNewEvent\n}\n\nstruct GetWorkflowExecutionHistoryResponse {\n 10: optional History history\n 20: optional binary nextPageToken\n}\n\nstruct SignalWorkflowExecutionRequest {\n 10: optional string domain\n 20: optional WorkflowExecution workflowExecution\n 30: optional string signalName\n 40: optional binary input\n 50: optional string identity\n}\n\nstruct TerminateWorkflowExecutionRequest {\n 10: optional string domain\n 20: optional WorkflowExecution workflowExecution\n 30: optional string reason\n 40: optional binary details\n 50: optional string identity\n}\n\nstruct ListOpenWorkflowExecutionsRequest {\n 10: optional string domain\n 20: optional i32 maximumPageSize\n 30: optional binary nextPageToken\n 40: optional StartTimeFilter StartTimeFilter\n 50: optional WorkflowExecutionFilter executionFilter\n 60: optional WorkflowTypeFilter typeFilter\n}\n\nstruct ListOpenWorkflowExecutionsResponse {\n 10: optional list executions\n 20: optional binary nextPageToken\n}\n\nstruct ListClosedWorkflowExecutionsRequest {\n 10: optional string domain\n 20: optional i32 maximumPageSize\n 30: optional binary nextPageToken\n 40: optional StartTimeFilter StartTimeFilter\n 50: optional WorkflowExecutionFilter executionFilter\n 60: optional WorkflowTypeFilter typeFilter\n 70: optional WorkflowExecutionCloseStatus statusFilter\n}\n\nstruct ListClosedWorkflowExecutionsResponse {\n 10: optional list executions\n 20: optional binary nextPageToken\n}\n\nstruct QueryWorkflowRequest {\n 10: optional string domain\n 20: optional WorkflowExecution execution\n 30: optional WorkflowQuery query\n}\n\nstruct QueryWorkflowResponse {\n 10: optional binary queryResult\n}\n\nstruct WorkflowQuery {\n 10: optional string queryType\n 20: optional binary queryArgs\n}\n\nstruct RespondQueryTaskCompletedRequest {\n 10: optional binary taskToken\n 20: optional QueryTaskCompletedType completedType\n 30: optional binary queryResult\n 40: optional string errorMessage\n}\n\nstruct DescribeWorkflowExecutionRequest {\n 10: optional string domain\n 20: optional WorkflowExecution execution\n}\n\nstruct DescribeWorkflowExecutionResponse {\n 10: optional WorkflowExecutionConfiguration executionConfiguration\n 20: optional WorkflowExecutionInfo workflowExecutionInfo\n}\n" diff --git a/.gen/go/shared/types.go b/.gen/go/shared/types.go index 56738717872..995b826fa4e 100644 --- a/.gen/go/shared/types.go +++ b/.gen/go/shared/types.go @@ -19736,15 +19736,16 @@ func (v *StartTimerDecisionAttributes) GetStartToFireTimeoutSeconds() (o int64) } type StartWorkflowExecutionRequest struct { - Domain *string `json:"domain,omitempty"` - WorkflowId *string `json:"workflowId,omitempty"` - WorkflowType *WorkflowType `json:"workflowType,omitempty"` - TaskList *TaskList `json:"taskList,omitempty"` - Input []byte `json:"input,omitempty"` - ExecutionStartToCloseTimeoutSeconds *int32 `json:"executionStartToCloseTimeoutSeconds,omitempty"` - TaskStartToCloseTimeoutSeconds *int32 `json:"taskStartToCloseTimeoutSeconds,omitempty"` - Identity *string `json:"identity,omitempty"` - RequestId *string `json:"requestId,omitempty"` + Domain *string `json:"domain,omitempty"` + WorkflowId *string `json:"workflowId,omitempty"` + WorkflowType *WorkflowType `json:"workflowType,omitempty"` + TaskList *TaskList `json:"taskList,omitempty"` + Input []byte `json:"input,omitempty"` + ExecutionStartToCloseTimeoutSeconds *int32 `json:"executionStartToCloseTimeoutSeconds,omitempty"` + TaskStartToCloseTimeoutSeconds *int32 `json:"taskStartToCloseTimeoutSeconds,omitempty"` + Identity *string `json:"identity,omitempty"` + RequestId *string `json:"requestId,omitempty"` + WorkflowIdReusePolicy *WorkflowIdReusePolicy `json:"workflowIdReusePolicy,omitempty"` } // ToWire translates a StartWorkflowExecutionRequest struct into a Thrift-level intermediate @@ -19764,7 +19765,7 @@ type StartWorkflowExecutionRequest struct { // } func (v *StartWorkflowExecutionRequest) ToWire() (wire.Value, error) { var ( - fields [9]wire.Field + fields [10]wire.Field i int = 0 w wire.Value err error @@ -19842,10 +19843,24 @@ func (v *StartWorkflowExecutionRequest) ToWire() (wire.Value, error) { fields[i] = wire.Field{ID: 90, Value: w} i++ } + if v.WorkflowIdReusePolicy != nil { + w, err = v.WorkflowIdReusePolicy.ToWire() + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 100, Value: w} + i++ + } return wire.NewValueStruct(wire.Struct{Fields: fields[:i]}), nil } +func _WorkflowIdReusePolicy_Read(w wire.Value) (WorkflowIdReusePolicy, error) { + var v WorkflowIdReusePolicy + err := v.FromWire(w) + return v, err +} + // FromWire deserializes a StartWorkflowExecutionRequest struct from its Thrift-level // representation. The Thrift-level representation may be obtained // from a ThriftRW protocol implementation. @@ -19951,6 +19966,16 @@ func (v *StartWorkflowExecutionRequest) FromWire(w wire.Value) error { return err } + } + case 100: + if field.Value.Type() == wire.TI32 { + var x WorkflowIdReusePolicy + x, err = _WorkflowIdReusePolicy_Read(field.Value) + v.WorkflowIdReusePolicy = &x + if err != nil { + return err + } + } } } @@ -19965,7 +19990,7 @@ func (v *StartWorkflowExecutionRequest) String() string { return "" } - var fields [9]string + var fields [10]string i := 0 if v.Domain != nil { fields[i] = fmt.Sprintf("Domain: %v", *(v.Domain)) @@ -20003,10 +20028,24 @@ func (v *StartWorkflowExecutionRequest) String() string { fields[i] = fmt.Sprintf("RequestId: %v", *(v.RequestId)) i++ } + if v.WorkflowIdReusePolicy != nil { + fields[i] = fmt.Sprintf("WorkflowIdReusePolicy: %v", *(v.WorkflowIdReusePolicy)) + i++ + } return fmt.Sprintf("StartWorkflowExecutionRequest{%v}", strings.Join(fields[:i], ", ")) } +func _WorkflowIdReusePolicy_EqualsPtr(lhs, rhs *WorkflowIdReusePolicy) bool { + if lhs != nil && rhs != nil { + + x := *lhs + y := *rhs + return x.Equals(y) + } + return lhs == nil && rhs == nil +} + // Equals returns true if all the fields of this StartWorkflowExecutionRequest match the // provided StartWorkflowExecutionRequest. // @@ -20039,6 +20078,9 @@ func (v *StartWorkflowExecutionRequest) Equals(rhs *StartWorkflowExecutionReques if !_String_EqualsPtr(v.RequestId, rhs.RequestId) { return false } + if !_WorkflowIdReusePolicy_EqualsPtr(v.WorkflowIdReusePolicy, rhs.WorkflowIdReusePolicy) { + return false + } return true } @@ -20103,6 +20145,16 @@ func (v *StartWorkflowExecutionRequest) GetRequestId() (o string) { return } +// GetWorkflowIdReusePolicy returns the value of WorkflowIdReusePolicy if it is set or its +// zero value if it is unset. +func (v *StartWorkflowExecutionRequest) GetWorkflowIdReusePolicy() (o WorkflowIdReusePolicy) { + if v.WorkflowIdReusePolicy != nil { + return *v.WorkflowIdReusePolicy + } + + return +} + type StartWorkflowExecutionResponse struct { RunId *string `json:"runId,omitempty"` } @@ -24929,6 +24981,145 @@ func (v *WorkflowExecutionTimedOutEventAttributes) GetTimeoutType() (o TimeoutTy return } +type WorkflowIdReusePolicy int32 + +const ( + WorkflowIdReusePolicyAllowDuplicateFailedOnly WorkflowIdReusePolicy = 0 + WorkflowIdReusePolicyAllowDuplicate WorkflowIdReusePolicy = 1 + WorkflowIdReusePolicyRejectDuplicate WorkflowIdReusePolicy = 2 +) + +// WorkflowIdReusePolicy_Values returns all recognized values of WorkflowIdReusePolicy. +func WorkflowIdReusePolicy_Values() []WorkflowIdReusePolicy { + return []WorkflowIdReusePolicy{ + WorkflowIdReusePolicyAllowDuplicateFailedOnly, + WorkflowIdReusePolicyAllowDuplicate, + WorkflowIdReusePolicyRejectDuplicate, + } +} + +// UnmarshalText tries to decode WorkflowIdReusePolicy from a byte slice +// containing its name. +// +// var v WorkflowIdReusePolicy +// err := v.UnmarshalText([]byte("AllowDuplicateFailedOnly")) +func (v *WorkflowIdReusePolicy) UnmarshalText(value []byte) error { + switch string(value) { + case "AllowDuplicateFailedOnly": + *v = WorkflowIdReusePolicyAllowDuplicateFailedOnly + return nil + case "AllowDuplicate": + *v = WorkflowIdReusePolicyAllowDuplicate + return nil + case "RejectDuplicate": + *v = WorkflowIdReusePolicyRejectDuplicate + return nil + default: + return fmt.Errorf("unknown enum value %q for %q", value, "WorkflowIdReusePolicy") + } +} + +// ToWire translates WorkflowIdReusePolicy into a Thrift-level intermediate +// representation. This intermediate representation may be serialized +// into bytes using a ThriftRW protocol implementation. +// +// Enums are represented as 32-bit integers over the wire. +func (v WorkflowIdReusePolicy) ToWire() (wire.Value, error) { + return wire.NewValueI32(int32(v)), nil +} + +// FromWire deserializes WorkflowIdReusePolicy from its Thrift-level +// representation. +// +// x, err := binaryProtocol.Decode(reader, wire.TI32) +// if err != nil { +// return WorkflowIdReusePolicy(0), err +// } +// +// var v WorkflowIdReusePolicy +// if err := v.FromWire(x); err != nil { +// return WorkflowIdReusePolicy(0), err +// } +// return v, nil +func (v *WorkflowIdReusePolicy) FromWire(w wire.Value) error { + *v = (WorkflowIdReusePolicy)(w.GetI32()) + return nil +} + +// String returns a readable string representation of WorkflowIdReusePolicy. +func (v WorkflowIdReusePolicy) String() string { + w := int32(v) + switch w { + case 0: + return "AllowDuplicateFailedOnly" + case 1: + return "AllowDuplicate" + case 2: + return "RejectDuplicate" + } + return fmt.Sprintf("WorkflowIdReusePolicy(%d)", w) +} + +// Equals returns true if this WorkflowIdReusePolicy value matches the provided +// value. +func (v WorkflowIdReusePolicy) Equals(rhs WorkflowIdReusePolicy) bool { + return v == rhs +} + +// MarshalJSON serializes WorkflowIdReusePolicy into JSON. +// +// If the enum value is recognized, its name is returned. Otherwise, +// its integer value is returned. +// +// This implements json.Marshaler. +func (v WorkflowIdReusePolicy) MarshalJSON() ([]byte, error) { + switch int32(v) { + case 0: + return ([]byte)("\"AllowDuplicateFailedOnly\""), nil + case 1: + return ([]byte)("\"AllowDuplicate\""), nil + case 2: + return ([]byte)("\"RejectDuplicate\""), nil + } + return ([]byte)(strconv.FormatInt(int64(v), 10)), nil +} + +// UnmarshalJSON attempts to decode WorkflowIdReusePolicy from its JSON +// representation. +// +// This implementation supports both, numeric and string inputs. If a +// string is provided, it must be a known enum name. +// +// This implements json.Unmarshaler. +func (v *WorkflowIdReusePolicy) UnmarshalJSON(text []byte) error { + d := json.NewDecoder(bytes.NewReader(text)) + d.UseNumber() + t, err := d.Token() + if err != nil { + return err + } + + switch w := t.(type) { + case json.Number: + x, err := w.Int64() + if err != nil { + return err + } + if x > math.MaxInt32 { + return fmt.Errorf("enum overflow from JSON %q for %q", text, "WorkflowIdReusePolicy") + } + if x < math.MinInt32 { + return fmt.Errorf("enum underflow from JSON %q for %q", text, "WorkflowIdReusePolicy") + } + *v = (WorkflowIdReusePolicy)(x) + return nil + case string: + return v.UnmarshalText([]byte(w)) + default: + return fmt.Errorf("invalid JSON value %q (%T) to unmarshal into %q", t, t, "WorkflowIdReusePolicy") + } +} + type WorkflowQuery struct { QueryType *string `json:"queryType,omitempty"` QueryArgs []byte `json:"queryArgs,omitempty"` diff --git a/common/persistence/cassandraPersistence.go b/common/persistence/cassandraPersistence.go index ed63d069549..8dab111a17b 100644 --- a/common/persistence/cassandraPersistence.go +++ b/common/persistence/cassandraPersistence.go @@ -239,19 +239,20 @@ const ( `and task_id = ? ` + `IF range_id = ?` - templateUpdateCurrentWorkflowExecutionQuery = `UPDATE executions ` + - `SET current_run_id = ?, execution = {run_id: ?, create_request_id: ?}` + + templateUpdateCurrentWorkflowExecutionQuery = `UPDATE executions USING TTL 0 ` + + `SET current_run_id = ?, execution = {run_id: ?, create_request_id: ?, state: ?, close_status: ?}` + `WHERE shard_id = ? ` + `and type = ? ` + `and domain_id = ? ` + `and workflow_id = ? ` + `and run_id = ? ` + `and visibility_ts = ? ` + - `and task_id = ? ` + `and task_id = ? ` + + `IF current_run_id = ? ` templateCreateWorkflowExecutionQuery = `INSERT INTO executions (` + `shard_id, type, domain_id, workflow_id, run_id, visibility_ts, task_id, current_run_id, execution) ` + - `VALUES(?, ?, ?, ?, ?, ?, ?, ?, {run_id: ?, create_request_id: ?}) IF NOT EXISTS` + `VALUES(?, ?, ?, ?, ?, ?, ?, ?, {run_id: ?, create_request_id: ?, state: ?, close_status: ?}) IF NOT EXISTS USING TTL 0 ` templateCreateWorkflowExecutionQuery2 = `INSERT INTO executions (` + `shard_id, domain_id, workflow_id, run_id, type, execution, next_event_id, visibility_ts, task_id) ` + @@ -286,7 +287,7 @@ const ( `and visibility_ts = ? ` + `and task_id = ?` - templateGetCurrentExecutionQuery = `SELECT current_run_id ` + + templateGetCurrentExecutionQuery = `SELECT current_run_id, execution ` + `FROM executions ` + `WHERE shard_id = ? ` + `and type = ? ` + @@ -417,14 +418,9 @@ const ( `and task_id = ? ` + `IF next_event_id = ?` - templateDeleteWorkflowExecutionQuery = `DELETE FROM executions ` + - `WHERE shard_id = ? ` + - `and type = ? ` + - `and domain_id = ? ` + - `and workflow_id = ? ` + - `and run_id = ? ` + - `and visibility_ts = ? ` + - `and task_id = ? ` + templateDeleteWorkflowExecutionQueryWithTTL = `INSERT INTO executions ` + + `(shard_id, type, domain_id, workflow_id, run_id, visibility_ts, task_id, current_run_id, execution) ` + + `VALUES(?, ?, ?, ?, ?, ?, ?, ?, {run_id: ?, create_request_id: ?, state: ?, close_status: ?}) USING TTL ? ` templateDeleteWorkflowExecutionMutableStateQuery = `DELETE FROM executions ` + `WHERE shard_id = ? ` + @@ -797,12 +793,15 @@ func (d *cassandraPersistence) CreateWorkflowExecution(request *CreateWorkflowEx if execution, ok := previous["execution"].(map[string]interface{}); ok { // CreateWorkflowExecution failed because it already exists + executionInfo := createWorkflowExecutionInfo(execution) msg := fmt.Sprintf("Workflow execution already running. WorkflowId: %v, RunId: %v, rangeID: %v, columns: (%v)", - execution["workflow_id"], execution["run_id"], request.RangeID, strings.Join(columns, ",")) - return nil, &workflow.WorkflowExecutionAlreadyStartedError{ - Message: common.StringPtr(msg), - StartRequestId: common.StringPtr(fmt.Sprintf("%v", execution["create_request_id"])), - RunId: common.StringPtr(fmt.Sprintf("%v", execution["run_id"])), + request.Execution.GetWorkflowId(), executionInfo.RunID, request.RangeID, strings.Join(columns, ",")) + return nil, &WorkflowExecutionAlreadyStartedError{ + Msg: msg, + StartRequestID: executionInfo.CreateRequestID, + RunID: executionInfo.RunID, + State: executionInfo.State, + CloseStatus: executionInfo.CloseStatus, } } } @@ -832,18 +831,37 @@ func (d *cassandraPersistence) CreateWorkflowExecution(request *CreateWorkflowEx func (d *cassandraPersistence) CreateWorkflowExecutionWithinBatch(request *CreateWorkflowExecutionRequest, batch *gocql.Batch, cqlNowTimestamp int64) { + + parentDomainID := emptyDomainID + parentWorkflowID := "" + parentRunID := emptyRunID + initiatedID := emptyInitiatedID + state := WorkflowStateRunning + closeStatus := WorkflowCloseStatusNone + if request.ParentExecution != nil { + parentDomainID = request.ParentDomainID + parentWorkflowID = *request.ParentExecution.WorkflowId + parentRunID = *request.ParentExecution.RunId + initiatedID = request.InitiatedID + state = WorkflowStateCreated + } + if request.ContinueAsNew { batch.Query(templateUpdateCurrentWorkflowExecutionQuery, *request.Execution.RunId, *request.Execution.RunId, request.RequestID, + state, + closeStatus, d.shardID, rowTypeExecution, request.DomainID, *request.Execution.WorkflowId, permanentRunID, defaultVisibilityTimestamp, - rowTypeExecutionTaskID) + rowTypeExecutionTaskID, + request.PreviousRunID, + ) } else { batch.Query(templateCreateWorkflowExecutionQuery, d.shardID, @@ -855,18 +873,10 @@ func (d *cassandraPersistence) CreateWorkflowExecutionWithinBatch(request *Creat rowTypeExecutionTaskID, *request.Execution.RunId, *request.Execution.RunId, - request.RequestID) - } - - parentDomainID := emptyDomainID - parentWorkflowID := "" - parentRunID := emptyRunID - initiatedID := emptyInitiatedID - if request.ParentExecution != nil { - parentDomainID = request.ParentDomainID - parentWorkflowID = *request.ParentExecution.WorkflowId - parentRunID = *request.ParentExecution.RunId - initiatedID = request.InitiatedID + request.RequestID, + state, + closeStatus, + ) } batch.Query(templateCreateWorkflowExecutionQuery2, @@ -1065,16 +1075,23 @@ func (d *cassandraPersistence) UpdateWorkflowExecution(request *UpdateWorkflowEx d.CreateWorkflowExecutionWithinBatch(startReq, batch, cqlNowTimestamp) d.createTransferTasks(batch, startReq.TransferTasks, startReq.DomainID, *startReq.Execution.WorkflowId, *startReq.Execution.RunId, cqlNowTimestamp) - } else if request.CloseExecution { - // Delete WorkflowExecution row representing current execution - batch.Query(templateDeleteWorkflowExecutionQuery, + } else if request.FinishExecution { + // Delete WorkflowExecution row representing current execution, by using a TTL + batch.Query(templateDeleteWorkflowExecutionQueryWithTTL, d.shardID, rowTypeExecution, executionInfo.DomainID, executionInfo.WorkflowID, permanentRunID, defaultVisibilityTimestamp, - rowTypeExecutionTaskID) + rowTypeExecutionTaskID, + executionInfo.RunID, + executionInfo.RunID, + executionInfo.CreateRequestID, + executionInfo.State, + executionInfo.CloseStatus, + request.FinishedExecutionTTL, + ) } // Verifies that the RangeID has not changed @@ -1203,8 +1220,8 @@ func (d *cassandraPersistence) GetCurrentExecution(request *GetCurrentExecutionR defaultVisibilityTimestamp, rowTypeExecutionTaskID) - var currentRunID string - if err := query.Scan(¤tRunID); err != nil { + result := make(map[string]interface{}) + if err := query.MapScan(result); err != nil { if err == gocql.ErrNotFound { return nil, &workflow.EntityNotExistsError{ Message: fmt.Sprintf("Workflow execution not found. WorkflowId: %v", @@ -1221,7 +1238,14 @@ func (d *cassandraPersistence) GetCurrentExecution(request *GetCurrentExecutionR } } - return &GetCurrentExecutionResponse{RunID: currentRunID}, nil + currentRunID := result["current_run_id"].(gocql.UUID).String() + executionInfo := createWorkflowExecutionInfo(result["execution"].(map[string]interface{})) + return &GetCurrentExecutionResponse{ + RunID: currentRunID, + StartRequestID: executionInfo.CreateRequestID, + State: executionInfo.State, + CloseStatus: executionInfo.CloseStatus, + }, nil } func (d *cassandraPersistence) GetTransferTasks(request *GetTransferTasksRequest) (*GetTransferTasksResponse, error) { diff --git a/common/persistence/cassandraPersistence_test.go b/common/persistence/cassandraPersistence_test.go index 4aa764a8954..69927df3f9c 100644 --- a/common/persistence/cassandraPersistence_test.go +++ b/common/persistence/cassandraPersistence_test.go @@ -80,9 +80,9 @@ func (s *cassandraPersistenceSuite) TestPersistenceStartWorkflow() { task1, err1 := s.CreateWorkflowExecution(domainID, workflowExecution, "queue1", "wType1", 20, 14, nil, 3, 0, 2, nil) s.NotNil(err1, "Expected workflow creation to fail.") log.Infof("Unable to start workflow execution: %v", err1) - startedErr, ok := err1.(*gen.WorkflowExecutionAlreadyStartedError) + startedErr, ok := err1.(*WorkflowExecutionAlreadyStartedError) s.True(ok) - s.Equal(workflowExecution.RunId, startedErr.RunId, startedErr.Message) + s.Equal(workflowExecution.GetRunId(), startedErr.RunID, startedErr.Msg) s.Empty(task1, "Expected empty task identifier.") response, err2 := s.WorkflowMgr.CreateWorkflowExecution(&CreateWorkflowExecutionRequest{ @@ -353,7 +353,7 @@ func (s *cassandraPersistenceSuite) TestGetCurrentWorkflow() { s.Nil(err0, "No error expected.") s.NotEmpty(task0, "Expected non empty task identifier.") - runID0, err1 := s.GetCurrentWorkflow(domainID, *workflowExecution.WorkflowId) + runID0, err1 := s.GetCurrentWorkflowRunID(domainID, *workflowExecution.WorkflowId) s.Nil(err1, "No error expected.") s.Equal(*workflowExecution.RunId, runID0) @@ -363,24 +363,21 @@ func (s *cassandraPersistenceSuite) TestGetCurrentWorkflow() { updatedInfo1 := copyWorkflowExecutionInfo(info0.ExecutionInfo) updatedInfo1.NextEventID = int64(6) updatedInfo1.LastProcessedEvent = int64(2) - err3 := s.UpdateWorkflowExecutionAndDelete(updatedInfo1, int64(3)) + err3 := s.UpdateWorkflowExecutionAndFinish(updatedInfo1, int64(3)) s.Nil(err3, "No error expected.") - _, err4 := s.GetCurrentWorkflow(domainID, *workflowExecution.WorkflowId) - s.NotNil(err4, "No error expected.") - s.IsType(&gen.EntityNotExistsError{}, err4) + runID4, err4 := s.GetCurrentWorkflowRunID(domainID, *workflowExecution.WorkflowId) + s.Nil(err4, "No error expected.") + s.Equal(*workflowExecution.RunId, runID4) workflowExecution2 := gen.WorkflowExecution{ WorkflowId: common.StringPtr("get-current-workflow-test"), RunId: common.StringPtr("c3ff4bc6-de18-4643-83b2-037a33f45322"), } - task1, err5 := s.CreateWorkflowExecution(domainID, workflowExecution2, "queue1", "wType", 20, 13, nil, 3, 0, 2, nil) - s.Nil(err5, "No error expected.") - s.NotEmpty(task1, "Expected non empty task identifier.") - runID1, err6 := s.GetCurrentWorkflow(domainID, *workflowExecution2.WorkflowId) - s.Nil(err6, "No error expected.") - s.Equal(*workflowExecution2.RunId, runID1) + task1, err5 := s.CreateWorkflowExecution(domainID, workflowExecution2, "queue1", "wType", 20, 13, nil, 3, 0, 2, nil) + s.NotNil(err5, "Error expected.") + s.Empty(task1, "Expected empty task identifier.") } func (s *cassandraPersistenceSuite) TestTransferTasks() { @@ -470,15 +467,16 @@ func (s *cassandraPersistenceSuite) TestTransferTasksThroughUpdate() { updatedInfo1 := copyWorkflowExecutionInfo(info1) updatedInfo1.NextEventID = int64(6) updatedInfo1.LastProcessedEvent = int64(2) - err5 := s.UpdateWorkflowExecutionAndDelete(updatedInfo1, int64(5)) + err5 := s.UpdateWorkflowExecutionAndFinish(updatedInfo1, int64(5)) s.Nil(err5, "No error expected.") newExecution := gen.WorkflowExecution{ - WorkflowId: common.StringPtr("get-transfer-tasks-through-update-test"), + WorkflowId: workflowExecution.WorkflowId, RunId: common.StringPtr("2a038c8f-b575-4151-8d2c-d443e999ab5a"), } - _, err6 := s.GetCurrentWorkflow(domainID, "get-transfer-tasks-through-update-test") - s.NotNil(err6, "Entity exist error expected.") + runID6, err6 := s.GetCurrentWorkflowRunID(domainID, newExecution.GetWorkflowId()) + s.Nil(err6) + s.Equal(*workflowExecution.RunId, runID6) tasks3, err7 := s.GetTransferTasks(1) s.Nil(err7, "No error expected.") @@ -497,7 +495,7 @@ func (s *cassandraPersistenceSuite) TestTransferTasksThroughUpdate() { s.Nil(err9) _, err10 := s.CreateWorkflowExecution(domainID, newExecution, "queue1", "wType", 20, 13, nil, 3, 0, 2, nil) - s.Nil(err10, "No error expected.") + s.NotNil(err10, "Error expected.") } func (s *cassandraPersistenceSuite) TestCancelTransferTaskTasks() { @@ -994,7 +992,7 @@ func (s *cassandraPersistenceSuite) TestContinueAsNew() { s.Equal(common.EmptyEventID, newExecutionInfo.LastProcessedEvent) s.Equal(int64(2), newExecutionInfo.DecisionScheduleID) - newRunID, err5 := s.GetCurrentWorkflow(domainID, *workflowExecution.WorkflowId) + newRunID, err5 := s.GetCurrentWorkflowRunID(domainID, *workflowExecution.WorkflowId) s.Nil(err5) s.Equal(*newWorkflowExecution.RunId, newRunID) } diff --git a/common/persistence/dataInterfaces.go b/common/persistence/dataInterfaces.go index be075b8bfc7..0a92fa3b5d0 100644 --- a/common/persistence/dataInterfaces.go +++ b/common/persistence/dataInterfaces.go @@ -94,6 +94,15 @@ type ( Msg string } + // WorkflowExecutionAlreadyStartedError is returned when creating a new workflow failed. + WorkflowExecutionAlreadyStartedError struct { + Msg string + StartRequestID string + RunID string + State int + CloseStatus int + } + // TimeoutError is returned when a write operation fails due to a timeout TimeoutError struct { Msg string @@ -371,6 +380,8 @@ type ( DecisionStartedID int64 DecisionStartToCloseTimeout int32 ContinueAsNew bool + PreviousRunID string + ExecutionInfo *WorkflowExecutionInfo } // CreateWorkflowExecutionResponse is the response to CreateWorkflowExecutionRequest @@ -397,19 +408,23 @@ type ( // GetCurrentExecutionResponse is the response to GetCurrentExecution GetCurrentExecutionResponse struct { - RunID string + StartRequestID string + RunID string + State int + CloseStatus int } // UpdateWorkflowExecutionRequest is used to update a workflow execution UpdateWorkflowExecutionRequest struct { - ExecutionInfo *WorkflowExecutionInfo - TransferTasks []Task - TimerTasks []Task - DeleteTimerTask Task - Condition int64 - RangeID int64 - ContinueAsNew *CreateWorkflowExecutionRequest - CloseExecution bool + ExecutionInfo *WorkflowExecutionInfo + TransferTasks []Task + TimerTasks []Task + DeleteTimerTask Task + Condition int64 + RangeID int64 + ContinueAsNew *CreateWorkflowExecutionRequest + FinishExecution bool + FinishedExecutionTTL int32 // Mutable state UpsertActivityInfos []*ActivityInfo @@ -716,6 +731,10 @@ func (e *ShardOwnershipLostError) Error() string { return e.Msg } +func (e *WorkflowExecutionAlreadyStartedError) Error() string { + return e.Msg +} + func (e *TimeoutError) Error() string { return e.Msg } diff --git a/common/persistence/persistenceTestBase.go b/common/persistence/persistenceTestBase.go index 4793223c348..5f4fac6abcf 100644 --- a/common/persistence/persistenceTestBase.go +++ b/common/persistence/persistenceTestBase.go @@ -203,7 +203,7 @@ func (s *TestBase) UpdateShard(updatedInfo *ShardInfo, previousRangeID int64) er // CreateWorkflowExecution is a utility method to create workflow executions func (s *TestBase) CreateWorkflowExecution(domainID string, workflowExecution workflow.WorkflowExecution, taskList, wType string, wTimeout int32, decisionTimeout int32, executionContext []byte, nextEventID int64, lastProcessedEventID int64, - decisionScheduleID int64, timerTasks []Task) (string, error) { + decisionScheduleID int64, timerTasks []Task) (*CreateWorkflowExecutionResponse, error) { response, err := s.WorkflowMgr.CreateWorkflowExecution(&CreateWorkflowExecutionRequest{ RequestID: uuid.New(), DomainID: domainID, @@ -230,17 +230,13 @@ func (s *TestBase) CreateWorkflowExecution(domainID string, workflowExecution wo DecisionStartToCloseTimeout: 1, }) - if err != nil { - return "", err - } - - return response.TaskID, nil + return response, err } // CreateWorkflowExecutionManyTasks is a utility method to create workflow executions func (s *TestBase) CreateWorkflowExecutionManyTasks(domainID string, workflowExecution workflow.WorkflowExecution, taskList string, executionContext []byte, nextEventID int64, lastProcessedEventID int64, - decisionScheduleIDs []int64, activityScheduleIDs []int64) (string, error) { + decisionScheduleIDs []int64, activityScheduleIDs []int64) (*CreateWorkflowExecutionResponse, error) { transferTasks := []Task{} for _, decisionScheduleID := range decisionScheduleIDs { @@ -278,18 +274,14 @@ func (s *TestBase) CreateWorkflowExecutionManyTasks(domainID string, workflowExe DecisionStartToCloseTimeout: 1, }) - if err != nil { - return "", err - } - - return response.TaskID, nil + return response, err } // CreateChildWorkflowExecution is a utility method to create child workflow executions func (s *TestBase) CreateChildWorkflowExecution(domainID string, workflowExecution workflow.WorkflowExecution, parentDomainID string, parentExecution *workflow.WorkflowExecution, initiatedID int64, taskList, wType string, wTimeout int32, decisionTimeout int32, executionContext []byte, nextEventID int64, lastProcessedEventID int64, - decisionScheduleID int64, timerTasks []Task) (string, error) { + decisionScheduleID int64, timerTasks []Task) (*CreateWorkflowExecutionResponse, error) { response, err := s.WorkflowMgr.CreateWorkflowExecution(&CreateWorkflowExecutionRequest{ RequestID: uuid.New(), DomainID: domainID, @@ -319,11 +311,7 @@ func (s *TestBase) CreateChildWorkflowExecution(domainID string, workflowExecuti DecisionStartToCloseTimeout: 1, }) - if err != nil { - return "", err - } - - return response.TaskID, nil + return response, err } // GetWorkflowExecutionInfo is a utility method to retrieve execution info @@ -340,8 +328,8 @@ func (s *TestBase) GetWorkflowExecutionInfo(domainID string, workflowExecution w return response.State, nil } -// GetCurrentWorkflow returns the workflow state for the given params -func (s *TestBase) GetCurrentWorkflow(domainID, workflowID string) (string, error) { +// GetCurrentWorkflowRunID returns the workflow run ID for the given params +func (s *TestBase) GetCurrentWorkflowRunID(domainID, workflowID string) (string, error) { response, err := s.WorkflowMgr.GetCurrentExecution(&GetCurrentExecutionRequest{ DomainID: domainID, WorkflowID: workflowID, @@ -393,6 +381,7 @@ func (s *TestBase) ContinueAsNewExecution(updatedInfo *WorkflowExecutionInfo, co DecisionStartedID: common.EmptyEventID, DecisionStartToCloseTimeout: 1, ContinueAsNew: true, + PreviousRunID: updatedInfo.RunID, }, }) } @@ -407,8 +396,8 @@ func (s *TestBase) UpdateWorkflowExecution(updatedInfo *WorkflowExecutionInfo, d upsertTimerInfos, deleteTimerInfos, nil, nil, nil, nil) } -// UpdateWorkflowExecutionAndDelete is a utility method to update workflow execution -func (s *TestBase) UpdateWorkflowExecutionAndDelete(updatedInfo *WorkflowExecutionInfo, condition int64) error { +// UpdateWorkflowExecutionAndFinish is a utility method to update workflow execution +func (s *TestBase) UpdateWorkflowExecutionAndFinish(updatedInfo *WorkflowExecutionInfo, condition int64) error { transferTasks := []Task{} transferTasks = append(transferTasks, &CloseExecutionTask{TaskID: s.GetNextSequenceNumber()}) return s.WorkflowMgr.UpdateWorkflowExecution(&UpdateWorkflowExecutionRequest{ @@ -422,7 +411,7 @@ func (s *TestBase) UpdateWorkflowExecutionAndDelete(updatedInfo *WorkflowExecuti DeleteActivityInfo: nil, UpserTimerInfos: nil, DeleteTimerInfos: nil, - CloseExecution: true, + FinishExecution: true, }) } diff --git a/idl/github.com/uber/cadence/shared.thrift b/idl/github.com/uber/cadence/shared.thrift index 6a4c5d77fc5..9784dd219cb 100644 --- a/idl/github.com/uber/cadence/shared.thrift +++ b/idl/github.com/uber/cadence/shared.thrift @@ -54,6 +54,24 @@ exception QueryFailedError { 1: required string message } +enum WorkflowIdReusePolicy { + /* + * allow start a workflow execution using the same workflow ID, + * when workflow not running, and the last execution close state is in + * [terminated, cancelled, timeouted, failed]. + */ + AllowDuplicateFailedOnly, + /* + * allow start a workflow execution using the same workflow ID, + * when workflow not running. + */ + AllowDuplicate, + /* + * do not allow start a workflow execution using the same workflow ID at all + */ + RejectDuplicate, +} + enum DomainStatus { REGISTERED, DEPRECATED, @@ -693,6 +711,7 @@ struct StartWorkflowExecutionRequest { 70: optional i32 taskStartToCloseTimeoutSeconds 80: optional string identity 90: optional string requestId + 100: optional WorkflowIdReusePolicy workflowIdReusePolicy } struct StartWorkflowExecutionResponse { diff --git a/service/history/handler.go b/service/history/handler.go index 1fc903f9917..8bb41ba7b87 100644 --- a/service/history/handler.go +++ b/service/history/handler.go @@ -113,7 +113,7 @@ func (h *Handler) Start() error { } h.hServiceResolver = hServiceResolver h.controller = newShardController(h.GetHostInfo(), hServiceResolver, h.shardManager, h.historyMgr, - h.executionMgrFactory, h, h.config, h.GetLogger(), h.GetMetricsClient()) + h.metadataMgr, h.executionMgrFactory, h, h.config, h.GetLogger(), h.GetMetricsClient()) h.metricsClient = h.GetMetricsClient() h.historyEventNotifier = newHistoryEventNotifier(h.GetMetricsClient(), h.config.GetShardID) // events notifier must starts before controller @@ -137,7 +137,7 @@ func (h *Handler) Stop() { // CreateEngine is implementation for HistoryEngineFactory used for creating the engine instance for shard func (h *Handler) CreateEngine(context ShardContext) Engine { - return NewEngineWithShardContext(context, h.metadataMgr, h.visibilityMgr, + return NewEngineWithShardContext(context, context.GetDomainCache(), h.visibilityMgr, h.matchingServiceClient, h.historyServiceClient, h.historyEventNotifier) } diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index c831ebdadd1..924ad1f52df 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -51,7 +51,6 @@ const ( type ( historyEngineImpl struct { shard ShardContext - metadataMgr persistence.MetadataManager historyMgr persistence.HistoryManager executionManager persistence.ExecutionManager txProcessor transferQueueProcessor @@ -84,10 +83,19 @@ var ( ErrConflict = errors.New("Conditional update failed") // ErrMaxAttemptsExceeded is exported temporarily for integration test ErrMaxAttemptsExceeded = errors.New("Maximum attempts exceeded to update history") + + // FailedWorkflowCloseState is a set of failed workflow close states, used for start workflow policy + // for start workflow execution API + FailedWorkflowCloseState = map[int]bool{ + persistence.WorkflowCloseStatusFailed: true, + persistence.WorkflowCloseStatusCanceled: true, + persistence.WorkflowCloseStatusTerminated: true, + persistence.WorkflowCloseStatusTimedOut: true, + } ) // NewEngineWithShardContext creates an instance of history engine -func NewEngineWithShardContext(shard ShardContext, metadataMgr persistence.MetadataManager, +func NewEngineWithShardContext(shard ShardContext, domainCache cache.DomainCache, visibilityMgr persistence.VisibilityManager, matching matching.Client, historyClient hc.Client, historyEventNotifier historyEventNotifier) Engine { shardWrapper := &shardContextWrapper{ @@ -99,10 +107,8 @@ func NewEngineWithShardContext(shard ShardContext, metadataMgr persistence.Metad executionManager := shard.GetExecutionManager() historyManager := shard.GetHistoryManager() historyCache := newHistoryCache(shard, logger) - domainCache := cache.NewDomainCache(metadataMgr, logger) historyEngImpl := &historyEngineImpl{ shard: shard, - metadataMgr: metadataMgr, historyMgr: historyManager, executionManager: executionManager, tokenSerializer: common.NewJSONTaskTokenSerializer(), @@ -151,7 +157,6 @@ func (e *historyEngineImpl) StartWorkflowExecution(startRequest *h.StartWorkflow } request := startRequest.StartRequest - executionID := *request.WorkflowId if request.ExecutionStartToCloseTimeoutSeconds == nil || *request.ExecutionStartToCloseTimeoutSeconds <= 0 { return nil, &workflow.BadRequestError{Message: "Missing or invalid ExecutionStartToCloseTimeoutSeconds."} @@ -160,12 +165,9 @@ func (e *historyEngineImpl) StartWorkflowExecution(startRequest *h.StartWorkflow return nil, &workflow.BadRequestError{Message: "Missing or invalid TaskStartToCloseTimeoutSeconds."} } - // We generate a new workflow execution run_id on each StartWorkflowExecution call. This generated run_id is - // returned back to the caller as the response to StartWorkflowExecution. - runID := uuid.New() - workflowExecution := workflow.WorkflowExecution{ - WorkflowId: common.StringPtr(executionID), - RunId: common.StringPtr(runID), + execution := workflow.WorkflowExecution{ + WorkflowId: request.WorkflowId, + RunId: common.StringPtr(uuid.New()), } var parentExecution *workflow.WorkflowExecution @@ -181,7 +183,7 @@ func (e *historyEngineImpl) StartWorkflowExecution(startRequest *h.StartWorkflow // Generate first decision task event. taskList := *request.TaskList.Name msBuilder := newMutableStateBuilder(e.shard.GetConfig(), e.logger) - startedEvent := msBuilder.AddWorkflowExecutionStartedEvent(domainID, workflowExecution, request) + startedEvent := msBuilder.AddWorkflowExecutionStartedEvent(domainID, execution, request) if startedEvent == nil { return nil, &workflow.InternalServiceError{Message: "Failed to add workflow execution started event."} } @@ -213,81 +215,138 @@ func (e *historyEngineImpl) StartWorkflowExecution(startRequest *h.StartWorkflow serializedHistory, serializedError := msBuilder.hBuilder.Serialize() if serializedError != nil { logging.LogHistorySerializationErrorEvent(e.logger, serializedError, fmt.Sprintf( - "HistoryEventBatch serialization error on start workflow. WorkflowID: %v, RunID: %v", executionID, runID)) + "HistoryEventBatch serialization error on start workflow. WorkflowID: %v, RunID: %v", + execution.GetWorkflowId(), execution.GetRunId())) return nil, serializedError } - err1 := e.shard.AppendHistoryEvents(&persistence.AppendHistoryEventsRequest{ + err = e.shard.AppendHistoryEvents(&persistence.AppendHistoryEventsRequest{ DomainID: domainID, - Execution: workflowExecution, + Execution: execution, // It is ok to use 0 for TransactionID because RunID is unique so there are // no potential duplicates to override. TransactionID: 0, FirstEventID: *startedEvent.EventId, Events: serializedHistory, }) - if err1 != nil { - return nil, err1 + if err != nil { + return nil, err } - _, err = e.shard.CreateWorkflowExecution(&persistence.CreateWorkflowExecutionRequest{ - RequestID: common.StringDefault(request.RequestId), - DomainID: domainID, - Execution: workflowExecution, - ParentDomainID: parentDomainID, - ParentExecution: parentExecution, - InitiatedID: initiatedID, - TaskList: *request.TaskList.Name, - WorkflowTypeName: *request.WorkflowType.Name, - WorkflowTimeout: *request.ExecutionStartToCloseTimeoutSeconds, - DecisionTimeoutValue: *request.TaskStartToCloseTimeoutSeconds, - ExecutionContext: nil, - NextEventID: msBuilder.GetNextEventID(), - LastProcessedEvent: emptyEventID, - TransferTasks: transferTasks, - DecisionScheduleID: decisionScheduleID, - DecisionStartedID: decisionStartID, - DecisionStartToCloseTimeout: decisionTimeout, - ContinueAsNew: false, - TimerTasks: timerTasks, - }) + deleteEvents := func() { + // We created the history events but failed to create workflow execution, so cleanup the history which could cause + // us to leak history events which are never cleaned up. Cleaning up the events is absolutely safe here as they + // are always created for a unique run_id which is not visible beyond this call yet. + // TODO: Handle error on deletion of execution history + e.historyMgr.DeleteWorkflowExecutionHistory(&persistence.DeleteWorkflowExecutionHistoryRequest{ + DomainID: domainID, + Execution: execution, + }) + } - if err != nil { - switch t := err.(type) { - case *workflow.WorkflowExecutionAlreadyStartedError: - // We created the history events but failed to create workflow execution, so cleanup the history which could cause - // us to leak history events which are never cleaned up. Cleaning up the events is absolutely safe here as they - // are always created for a unique run_id which is not visible beyond this call yet. - // TODO: Handle error on deletion of execution history - e.historyMgr.DeleteWorkflowExecutionHistory(&persistence.DeleteWorkflowExecutionHistoryRequest{ - DomainID: domainID, - Execution: workflowExecution, - }) + createWorkflow := func(isBrandNew bool, prevRunID string) (string, error) { + _, err = e.shard.CreateWorkflowExecution(&persistence.CreateWorkflowExecutionRequest{ + RequestID: common.StringDefault(request.RequestId), + DomainID: domainID, + Execution: execution, + ParentDomainID: parentDomainID, + ParentExecution: parentExecution, + InitiatedID: initiatedID, + TaskList: *request.TaskList.Name, + WorkflowTypeName: *request.WorkflowType.Name, + WorkflowTimeout: *request.ExecutionStartToCloseTimeoutSeconds, + DecisionTimeoutValue: *request.TaskStartToCloseTimeoutSeconds, + ExecutionContext: nil, + NextEventID: msBuilder.GetNextEventID(), + LastProcessedEvent: emptyEventID, + TransferTasks: transferTasks, + DecisionScheduleID: decisionScheduleID, + DecisionStartedID: decisionStartID, + DecisionStartToCloseTimeout: decisionTimeout, + TimerTasks: timerTasks, + ContinueAsNew: !isBrandNew, + PreviousRunID: prevRunID, + }) - if common.StringDefault(t.StartRequestId) == common.StringDefault(request.RequestId) { - return &workflow.StartWorkflowExecutionResponse{ - RunId: t.RunId, - }, nil + if err != nil { + switch t := err.(type) { + case *persistence.WorkflowExecutionAlreadyStartedError: + if t.StartRequestID == common.StringDefault(request.RequestId) { + deleteEvents() + return t.RunID, nil + } + case *persistence.ShardOwnershipLostError: + deleteEvents() } - case *persistence.ShardOwnershipLostError: - // We created the history events but failed to create workflow execution, so cleanup the history which could cause - // us to leak history events which are never cleaned up. Cleaning up the events is absolutely safe here as they - // are always created for a unique run_id which is not visible beyond this call yet. - // TODO: Handle error on deletion of execution history - e.historyMgr.DeleteWorkflowExecutionHistory(&persistence.DeleteWorkflowExecutionHistoryRequest{ - DomainID: domainID, - Execution: workflowExecution, - }) + return "", err } + return execution.GetRunId(), nil + } - return nil, err + workflowExistsErrHandler := func(err *persistence.WorkflowExecutionAlreadyStartedError) error { + // set the prev run ID for database conditional update + prevStartRequestID := err.StartRequestID + prevRunID := err.RunID + prevState := err.State + prevCloseState := err.CloseStatus + + errFn := func(errMsg string, createRequestID string, workflowID string, runID string) error { + msg := fmt.Sprintf(errMsg, workflowID, runID) + return &workflow.WorkflowExecutionAlreadyStartedError{ + Message: common.StringPtr(msg), + StartRequestId: common.StringPtr(fmt.Sprintf("%v", createRequestID)), + RunId: common.StringPtr(fmt.Sprintf("%v", runID)), + } + } + + // here we know there is some information about the prev workflow, i.e. either running right now + // or has history check if the this workflow is finished + if prevState != persistence.WorkflowStateCompleted { + deleteEvents() + msg := "Workflow execution is already running. WorkflowId: %v, RunId: %v." + return errFn(msg, prevStartRequestID, execution.GetWorkflowId(), prevRunID) + } + switch startRequest.StartRequest.GetWorkflowIdReusePolicy() { + case workflow.WorkflowIdReusePolicyAllowDuplicateFailedOnly: + if _, ok := FailedWorkflowCloseState[prevCloseState]; !ok { + deleteEvents() + msg := "Workflow execution already finished successfully. WorkflowId: %v, RunId: %v. Workflow ID reuse policy: allow duplicate workflow ID if last run failed." + return errFn(msg, prevStartRequestID, execution.GetWorkflowId(), prevRunID) + } + case workflow.WorkflowIdReusePolicyAllowDuplicate: + // as long as workflow not running, so this case has no check + case workflow.WorkflowIdReusePolicyRejectDuplicate: + deleteEvents() + msg := "Workflow execution already finished. WorkflowId: %v, RunId: %v. Workflow ID reuse policy: reject duplicate workflow ID." + return errFn(msg, prevStartRequestID, execution.GetWorkflowId(), prevRunID) + default: + deleteEvents() + return &workflow.InternalServiceError{Message: "Failed to process start workflow reuse policy."} + } + + return nil + } + + // try to create the workflow execution + isBrandNew := true + resultRunID := "" + resultRunID, err = createWorkflow(isBrandNew, "") + // if err still non nil, see if retry + if errExist, ok := err.(*persistence.WorkflowExecutionAlreadyStartedError); ok { + if err = workflowExistsErrHandler(errExist); err == nil { + isBrandNew = false + resultRunID, err = createWorkflow(isBrandNew, errExist.RunID) + } } - e.timerProcessor.NotifyNewTimer(timerTasks) + if err == nil { + e.timerProcessor.NotifyNewTimer(timerTasks) - return &workflow.StartWorkflowExecutionResponse{ - RunId: workflowExecution.RunId, - }, nil + return &workflow.StartWorkflowExecutionResponse{ + RunId: common.StringPtr(resultRunID), + }, nil + } + return nil, err } // GetMutableState retrieves the mutable state of the workflow execution diff --git a/service/history/historyEngine2_test.go b/service/history/historyEngine2_test.go index ca5f6ee951f..0a8765f1af4 100644 --- a/service/history/historyEngine2_test.go +++ b/service/history/historyEngine2_test.go @@ -27,6 +27,8 @@ import ( "os" "testing" + "github.com/pborman/uuid" + log "github.com/sirupsen/logrus" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -103,6 +105,7 @@ func (s *engine2Suite) SetupTest() { transferSequenceNumber: 1, executionManager: s.mockExecutionMgr, historyMgr: s.mockHistoryMgr, + domainCache: cache.NewDomainCache(s.mockMetadataMgr, s.logger), shardManager: s.mockShardManager, maxTransferSequenceNumber: 100000, closeCh: s.shardClosedCh, @@ -691,6 +694,244 @@ func (s *engine2Suite) TestRespondDecisionTaskCompletedRecordMarkerDecision() { s.False(executionBuilder.HasPendingDecisionTask()) } +func (s *engine2Suite) TestStartWorkflowExecution_BrandNew() { + domainID := "domainId" + workflowID := "workflowID" + workflowType := "workflowType" + taskList := "testTaskList" + identity := "testIdentity" + + s.mockHistoryMgr.On("AppendHistoryEvents", mock.Anything).Return(nil).Once() + s.mockExecutionMgr.On("CreateWorkflowExecution", mock.Anything).Return(&persistence.CreateWorkflowExecutionResponse{TaskID: uuid.New()}, nil).Once() + + resp, err := s.historyEngine.StartWorkflowExecution(&h.StartWorkflowExecutionRequest{ + DomainUUID: common.StringPtr(domainID), + StartRequest: &workflow.StartWorkflowExecutionRequest{ + Domain: common.StringPtr(domainID), + WorkflowId: common.StringPtr(workflowID), + WorkflowType: &workflow.WorkflowType{Name: common.StringPtr(workflowType)}, + TaskList: &workflow.TaskList{Name: common.StringPtr(taskList)}, + ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(1), + TaskStartToCloseTimeoutSeconds: common.Int32Ptr(2), + Identity: common.StringPtr(identity), + }, + }) + s.Nil(err) + s.NotNil(resp.RunId) +} + +func (s *engine2Suite) TestStartWorkflowExecution_StillRunning_Dedup() { + domainID := "domainId" + workflowID := "workflowID" + runID := "runID" + workflowType := "workflowType" + taskList := "testTaskList" + identity := "testIdentity" + requestID := "requestID" + + s.mockHistoryMgr.On("AppendHistoryEvents", mock.Anything).Return(nil).Once() + s.mockShardManager.On("UpdateShard", mock.Anything).Return(nil).Once() + s.mockExecutionMgr.On("CreateWorkflowExecution", mock.Anything).Return(nil, &persistence.WorkflowExecutionAlreadyStartedError{ + Msg: "random message", + StartRequestID: requestID, + RunID: runID, + State: persistence.WorkflowStateRunning, + CloseStatus: persistence.WorkflowCloseStatusNone, + }).Once() + s.mockHistoryMgr.On("DeleteWorkflowExecutionHistory", mock.Anything).Return(nil).Once() + + resp, err := s.historyEngine.StartWorkflowExecution(&h.StartWorkflowExecutionRequest{ + DomainUUID: common.StringPtr(domainID), + StartRequest: &workflow.StartWorkflowExecutionRequest{ + Domain: common.StringPtr(domainID), + WorkflowId: common.StringPtr(workflowID), + WorkflowType: &workflow.WorkflowType{Name: common.StringPtr(workflowType)}, + TaskList: &workflow.TaskList{Name: common.StringPtr(taskList)}, + ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(1), + TaskStartToCloseTimeoutSeconds: common.Int32Ptr(2), + Identity: common.StringPtr(identity), + RequestId: common.StringPtr(requestID), + }, + }) + s.Nil(err) + s.Equal(runID, resp.GetRunId()) +} + +func (s *engine2Suite) TestStartWorkflowExecution_StillRunning_NonDeDup() { + domainID := "domainId" + workflowID := "workflowID" + runID := "runID" + workflowType := "workflowType" + taskList := "testTaskList" + identity := "testIdentity" + + s.mockHistoryMgr.On("AppendHistoryEvents", mock.Anything).Return(nil).Once() + s.mockShardManager.On("UpdateShard", mock.Anything).Return(nil).Once() + s.mockExecutionMgr.On("CreateWorkflowExecution", mock.Anything).Return(nil, &persistence.WorkflowExecutionAlreadyStartedError{ + Msg: "random message", + StartRequestID: "oldRequestID", + RunID: runID, + State: persistence.WorkflowStateRunning, + CloseStatus: persistence.WorkflowCloseStatusNone, + }).Once() + s.mockHistoryMgr.On("DeleteWorkflowExecutionHistory", mock.Anything).Return(nil).Once() + + resp, err := s.historyEngine.StartWorkflowExecution(&h.StartWorkflowExecutionRequest{ + DomainUUID: common.StringPtr(domainID), + StartRequest: &workflow.StartWorkflowExecutionRequest{ + Domain: common.StringPtr(domainID), + WorkflowId: common.StringPtr(workflowID), + WorkflowType: &workflow.WorkflowType{Name: common.StringPtr(workflowType)}, + TaskList: &workflow.TaskList{Name: common.StringPtr(taskList)}, + ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(1), + TaskStartToCloseTimeoutSeconds: common.Int32Ptr(2), + Identity: common.StringPtr(identity), + RequestId: common.StringPtr("newRequestID"), + }, + }) + s.NotNil(err) + s.Nil(resp) +} + +func (s *engine2Suite) TestStartWorkflowExecution_NotRunning_PrevSuccess() { + domainID := "domainId" + workflowID := "workflowID" + runID := "runID" + workflowType := "workflowType" + taskList := "testTaskList" + identity := "testIdentity" + + options := []workflow.WorkflowIdReusePolicy{ + workflow.WorkflowIdReusePolicyAllowDuplicateFailedOnly, + workflow.WorkflowIdReusePolicyAllowDuplicate, + workflow.WorkflowIdReusePolicyRejectDuplicate, + } + + expecedErrs := []bool{true, false, true} + + s.mockHistoryMgr.On("AppendHistoryEvents", mock.Anything).Return(nil).Times(len(expecedErrs)) + s.mockShardManager.On("UpdateShard", mock.Anything).Return(nil).Times(len(expecedErrs)) + s.mockExecutionMgr.On( + "CreateWorkflowExecution", + mock.MatchedBy(func(request *persistence.CreateWorkflowExecutionRequest) bool { return request.ContinueAsNew == false }), + ).Return(nil, &persistence.WorkflowExecutionAlreadyStartedError{ + Msg: "random message", + StartRequestID: "oldRequestID", + RunID: runID, + State: persistence.WorkflowStateCompleted, + CloseStatus: persistence.WorkflowCloseStatusCompleted, + }).Times(len(expecedErrs)) + + for index, option := range options { + if !expecedErrs[index] { + s.mockExecutionMgr.On( + "CreateWorkflowExecution", + mock.MatchedBy(func(request *persistence.CreateWorkflowExecutionRequest) bool { return request.ContinueAsNew == true }), + ).Return(&persistence.CreateWorkflowExecutionResponse{TaskID: uuid.New()}, nil).Once() + } else { + s.mockHistoryMgr.On("DeleteWorkflowExecutionHistory", mock.Anything).Return(nil).Once() + } + + resp, err := s.historyEngine.StartWorkflowExecution(&h.StartWorkflowExecutionRequest{ + DomainUUID: common.StringPtr(domainID), + StartRequest: &workflow.StartWorkflowExecutionRequest{ + Domain: common.StringPtr(domainID), + WorkflowId: common.StringPtr(workflowID), + WorkflowType: &workflow.WorkflowType{Name: common.StringPtr(workflowType)}, + TaskList: &workflow.TaskList{Name: common.StringPtr(taskList)}, + ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(1), + TaskStartToCloseTimeoutSeconds: common.Int32Ptr(2), + Identity: common.StringPtr(identity), + RequestId: common.StringPtr("newRequestID"), + WorkflowIdReusePolicy: &option, + }, + }) + + if expecedErrs[index] { + s.NotNil(err) + s.Nil(resp) + } else { + s.Nil(err) + s.NotNil(resp) + } + } +} + +func (s *engine2Suite) TestStartWorkflowExecution_NotRunning_PrevFail() { + domainID := "domainId" + workflowID := "workflowID" + workflowType := "workflowType" + taskList := "testTaskList" + identity := "testIdentity" + + options := []workflow.WorkflowIdReusePolicy{ + workflow.WorkflowIdReusePolicyAllowDuplicateFailedOnly, + workflow.WorkflowIdReusePolicyAllowDuplicate, + workflow.WorkflowIdReusePolicyRejectDuplicate, + } + + expecedErrs := []bool{false, false, true} + + closeStates := []int{ + persistence.WorkflowCloseStatusFailed, + persistence.WorkflowCloseStatusCanceled, + persistence.WorkflowCloseStatusTerminated, + persistence.WorkflowCloseStatusTimedOut, + } + runIDs := []string{"1", "2", "3", "4"} + + for i, closeState := range closeStates { + + s.mockHistoryMgr.On("AppendHistoryEvents", mock.Anything).Return(nil).Times(len(expecedErrs)) + s.mockShardManager.On("UpdateShard", mock.Anything).Return(nil).Times(len(expecedErrs)) + s.mockExecutionMgr.On( + "CreateWorkflowExecution", + mock.MatchedBy(func(request *persistence.CreateWorkflowExecutionRequest) bool { return request.ContinueAsNew == false }), + ).Return(nil, &persistence.WorkflowExecutionAlreadyStartedError{ + Msg: "random message", + StartRequestID: "oldRequestID", + RunID: runIDs[i], + State: persistence.WorkflowStateCompleted, + CloseStatus: closeState, + }).Times(len(expecedErrs)) + + for j, option := range options { + + if !expecedErrs[j] { + s.mockExecutionMgr.On( + "CreateWorkflowExecution", + mock.MatchedBy(func(request *persistence.CreateWorkflowExecutionRequest) bool { return request.ContinueAsNew == true }), + ).Return(&persistence.CreateWorkflowExecutionResponse{TaskID: uuid.New()}, nil).Once() + } else { + s.mockHistoryMgr.On("DeleteWorkflowExecutionHistory", mock.Anything).Return(nil).Once() + } + + resp, err := s.historyEngine.StartWorkflowExecution(&h.StartWorkflowExecutionRequest{ + DomainUUID: common.StringPtr(domainID), + StartRequest: &workflow.StartWorkflowExecutionRequest{ + Domain: common.StringPtr(domainID), + WorkflowId: common.StringPtr(workflowID), + WorkflowType: &workflow.WorkflowType{Name: common.StringPtr(workflowType)}, + TaskList: &workflow.TaskList{Name: common.StringPtr(taskList)}, + ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(1), + TaskStartToCloseTimeoutSeconds: common.Int32Ptr(2), + Identity: common.StringPtr(identity), + RequestId: common.StringPtr("newRequestID"), + WorkflowIdReusePolicy: &option, + }, + }) + + if expecedErrs[j] { + s.NotNil(err) + s.Nil(resp) + } else { + s.Nil(err) + s.NotNil(resp) + } + } + } +} + func (s *engine2Suite) getBuilder(domainID string, we workflow.WorkflowExecution) *mutableStateBuilder { context, release, err := s.historyEngine.historyCache.getOrCreateWorkflowExecution(domainID, we) if err != nil { diff --git a/service/history/historyEngine_test.go b/service/history/historyEngine_test.go index 31111297762..7d22419363c 100644 --- a/service/history/historyEngine_test.go +++ b/service/history/historyEngine_test.go @@ -111,6 +111,7 @@ func (s *engineSuite) SetupTest() { transferSequenceNumber: 1, executionManager: s.mockExecutionMgr, historyMgr: s.mockHistoryMgr, + domainCache: cache.NewDomainCache(s.mockMetadataMgr, s.logger), shardManager: s.mockShardManager, maxTransferSequenceNumber: 100000, closeCh: s.shardClosedCh, @@ -942,7 +943,7 @@ func (s *engineSuite) TestRespondDecisionTaskCompletedCompleteWorkflowSuccess() s.mockHistoryMgr.On("AppendHistoryEvents", mock.Anything).Return(nil).Once() s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything).Return(nil).Once() s.mockMetadataMgr.On("GetDomain", mock.Anything).Return( - &persistence.GetDomainResponse{Config: &persistence.DomainConfig{Retention: 1}}, nil).Once() + &persistence.GetDomainResponse{Config: &persistence.DomainConfig{Retention: 1}}, nil) err := s.mockHistoryEngine.RespondDecisionTaskCompleted(context.Background(), &history.RespondDecisionTaskCompletedRequest{ DomainUUID: common.StringPtr(domainID), @@ -999,7 +1000,7 @@ func (s *engineSuite) TestRespondDecisionTaskCompletedFailWorkflowSuccess() { s.mockHistoryMgr.On("AppendHistoryEvents", mock.Anything).Return(nil).Once() s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything).Return(nil).Once() s.mockMetadataMgr.On("GetDomain", mock.Anything).Return( - &persistence.GetDomainResponse{Config: &persistence.DomainConfig{Retention: 1}}, nil).Once() + &persistence.GetDomainResponse{Config: &persistence.DomainConfig{Retention: 1}}, nil) err := s.mockHistoryEngine.RespondDecisionTaskCompleted(context.Background(), &history.RespondDecisionTaskCompletedRequest{ DomainUUID: common.StringPtr(domainID), diff --git a/service/history/historyTestBase.go b/service/history/historyTestBase.go index 107b9a22475..64c4ec3e741 100644 --- a/service/history/historyTestBase.go +++ b/service/history/historyTestBase.go @@ -25,6 +25,8 @@ import ( "sync/atomic" "time" + "github.com/uber/cadence/common/cache" + log "github.com/sirupsen/logrus" "github.com/uber-common/bark" @@ -48,6 +50,7 @@ type ( transferSequenceNumber int64 historyMgr persistence.HistoryManager executionMgr persistence.ExecutionManager + domainCache cache.DomainCache config *Config logger bark.Logger metricsClient metrics.Client @@ -63,12 +66,13 @@ type ( var _ ShardContext = (*TestShardContext)(nil) func newTestShardContext(shardInfo *persistence.ShardInfo, transferSequenceNumber int64, historyMgr persistence.HistoryManager, - executionMgr persistence.ExecutionManager, config *Config, logger bark.Logger) *TestShardContext { + executionMgr persistence.ExecutionManager, domainCache cache.DomainCache, config *Config, logger bark.Logger) *TestShardContext { return &TestShardContext{ shardInfo: shardInfo, transferSequenceNumber: transferSequenceNumber, historyMgr: historyMgr, executionMgr: executionMgr, + domainCache: domainCache, config: config, logger: logger, metricsClient: metrics.NewClient(tally.NoopScope, metrics.History), @@ -85,6 +89,11 @@ func (s *TestShardContext) GetHistoryManager() persistence.HistoryManager { return s.historyMgr } +// GetDomainCache test implementation +func (s *TestShardContext) GetDomainCache() cache.DomainCache { + return s.domainCache +} + // GetNextTransferTaskID test implementation func (s *TestShardContext) GetNextTransferTaskID() (int64, error) { return atomic.AddInt64(&s.transferSequenceNumber, 1), nil @@ -193,7 +202,8 @@ func (s *TestBase) SetupWorkflowStoreWithOptions(options persistence.TestBaseOpt s.TestBase.SetupWorkflowStoreWithOptions(options) log := bark.NewLoggerFromLogrus(log.New()) config := NewConfig(1) - s.ShardContext = newTestShardContext(s.ShardInfo, 0, s.HistoryMgr, s.WorkflowMgr, config, log) + domainCache := cache.NewDomainCache(s.MetadataManager, log) + s.ShardContext = newTestShardContext(s.ShardInfo, 0, s.HistoryMgr, s.WorkflowMgr, domainCache, config, log) s.TestBase.TaskIDGenerator = s.ShardContext } @@ -202,6 +212,7 @@ func (s *TestBase) SetupWorkflowStore() { s.TestBase.SetupWorkflowStore() log := bark.NewLoggerFromLogrus(log.New()) config := NewConfig(1) - s.ShardContext = newTestShardContext(s.ShardInfo, 0, s.HistoryMgr, s.WorkflowMgr, config, log) + domainCache := cache.NewDomainCache(s.MetadataManager, log) + s.ShardContext = newTestShardContext(s.ShardInfo, 0, s.HistoryMgr, s.WorkflowMgr, domainCache, config, log) s.TestBase.TaskIDGenerator = s.ShardContext } diff --git a/service/history/mutableStateBuilder.go b/service/history/mutableStateBuilder.go index b1d9b472dba..6e514a99fe9 100644 --- a/service/history/mutableStateBuilder.go +++ b/service/history/mutableStateBuilder.go @@ -1398,7 +1398,7 @@ func (e *mutableStateBuilder) AddContinueAsNewEvent(decisionCompletedEventID int "{OutStandingActivityTasks: %v, HasPendingDecision: %v}", len(e.pendingActivityInfoIDs), e.HasPendingDecisionTask())) } - + prevRunID := e.executionInfo.RunID e.executionInfo.State = persistence.WorkflowStateCompleted e.executionInfo.CloseStatus = persistence.WorkflowCloseStatusContinuedAsNew newExecution := workflow.WorkflowExecution{ @@ -1453,6 +1453,7 @@ func (e *mutableStateBuilder) AddContinueAsNewEvent(decisionCompletedEventID int DecisionStartedID: di.StartedID, DecisionStartToCloseTimeout: di.DecisionTimeout, ContinueAsNew: true, + PreviousRunID: prevRunID, } return e.hBuilder.AddContinuedAsNewEvent(decisionCompletedEventID, newRunID, attributes), newStateBuilder, nil diff --git a/service/history/shardContext.go b/service/history/shardContext.go index 93a8fee4f8c..7b00d9d89ce 100644 --- a/service/history/shardContext.go +++ b/service/history/shardContext.go @@ -26,6 +26,8 @@ import ( "sync/atomic" "time" + "github.com/uber/cadence/common/cache" + "github.com/uber/cadence/.gen/go/shared" "github.com/uber/cadence/common/logging" "github.com/uber/cadence/common/metrics" @@ -40,6 +42,7 @@ type ( ShardContext interface { GetExecutionManager() persistence.ExecutionManager GetHistoryManager() persistence.HistoryManager + GetDomainCache() cache.DomainCache GetNextTransferTaskID() (int64, error) GetTransferSequenceNumber() int64 GetTransferMaxReadLevel() int64 @@ -64,6 +67,7 @@ type ( shardManager persistence.ShardManager historyMgr persistence.HistoryManager executionManager persistence.ExecutionManager + domainCache cache.DomainCache closeCh chan<- int isClosed bool config *Config @@ -88,6 +92,10 @@ func (s *shardContextImpl) GetHistoryManager() persistence.HistoryManager { return s.historyMgr } +func (s *shardContextImpl) GetDomainCache() cache.DomainCache { + return s.domainCache +} + func (s *shardContextImpl) GetNextTransferTaskID() (int64, error) { s.Lock() defer s.Unlock() @@ -445,7 +453,7 @@ func (s *shardContextImpl) GetTimeSource() common.TimeSource { // TODO: This method has too many parameters. Clean it up. Maybe create a struct to pass in as parameter. func acquireShard(shardID int, shardManager persistence.ShardManager, historyMgr persistence.HistoryManager, - executionMgr persistence.ExecutionManager, owner string, closeCh chan<- int, config *Config, + executionMgr persistence.ExecutionManager, domainCache cache.DomainCache, owner string, closeCh chan<- int, config *Config, logger bark.Logger, metricsClient metrics.Client) (ShardContext, error) { response, err0 := shardManager.GetShard(&persistence.GetShardRequest{ShardID: shardID}) if err0 != nil { @@ -463,6 +471,7 @@ func acquireShard(shardID int, shardManager persistence.ShardManager, historyMgr shardManager: shardManager, historyMgr: historyMgr, executionManager: executionMgr, + domainCache: domainCache, shardInfo: updatedShardInfo, closeCh: closeCh, metricsClient: metricsClient.Tagged(tags), diff --git a/service/history/shardController.go b/service/history/shardController.go index dabdacbe70d..f4b403eba2d 100644 --- a/service/history/shardController.go +++ b/service/history/shardController.go @@ -26,6 +26,8 @@ import ( "sync/atomic" "time" + "github.com/uber/cadence/common/cache" + "github.com/uber-common/bark" "github.com/uber/cadence/common" @@ -46,6 +48,7 @@ type ( membershipUpdateCh chan *membership.ChangedEvent shardMgr persistence.ShardManager historyMgr persistence.HistoryManager + metadataMgr persistence.MetadataManager executionMgrFactory persistence.ExecutionManagerFactory engineFactory EngineFactory shardClosedCh chan int @@ -68,6 +71,7 @@ type ( shardMgr persistence.ShardManager historyMgr persistence.HistoryManager executionMgr persistence.ExecutionManager + domainCache cache.DomainCache engineFactory EngineFactory host *membership.HostInfo engine Engine @@ -78,7 +82,7 @@ type ( ) func newShardController(host *membership.HostInfo, resolver membership.ServiceResolver, - shardMgr persistence.ShardManager, historyMgr persistence.HistoryManager, + shardMgr persistence.ShardManager, historyMgr persistence.HistoryManager, metadataMgr persistence.MetadataManager, executionMgrFactory persistence.ExecutionManagerFactory, factory EngineFactory, config *Config, logger bark.Logger, reporter metrics.Client) *shardController { return &shardController{ @@ -87,6 +91,7 @@ func newShardController(host *membership.HostInfo, resolver membership.ServiceRe membershipUpdateCh: make(chan *membership.ChangedEvent, 10), shardMgr: shardMgr, historyMgr: historyMgr, + metadataMgr: metadataMgr, executionMgrFactory: executionMgrFactory, engineFactory: factory, historyShards: make(map[int]*historyShardsItem), @@ -101,19 +106,22 @@ func newShardController(host *membership.HostInfo, resolver membership.ServiceRe } func newHistoryShardsItem(shardID int, shardMgr persistence.ShardManager, historyMgr persistence.HistoryManager, - executionMgrFactory persistence.ExecutionManagerFactory, factory EngineFactory, host *membership.HostInfo, - config *Config, logger bark.Logger, reporter metrics.Client) (*historyShardsItem, error) { + metadataMgr persistence.MetadataManager, executionMgrFactory persistence.ExecutionManagerFactory, factory EngineFactory, + host *membership.HostInfo, config *Config, logger bark.Logger, reporter metrics.Client) (*historyShardsItem, error) { executionMgr, err := executionMgrFactory.CreateExecutionManager(shardID) if err != nil { return nil, err } + domainCache := cache.NewDomainCache(metadataMgr, logger) + return &historyShardsItem{ shardID: shardID, shardMgr: shardMgr, historyMgr: historyMgr, executionMgr: executionMgr, + domainCache: domainCache, engineFactory: factory, host: host, config: config, @@ -209,8 +217,8 @@ func (c *shardController) getOrCreateHistoryShardItem(shardID int) (*historyShar } if info.Identity() == c.host.Identity() { - shardItem, err := newHistoryShardsItem(shardID, c.shardMgr, c.historyMgr, c.executionMgrFactory, c.engineFactory, c.host, - c.config, c.logger, c.metricsClient) + shardItem, err := newHistoryShardsItem(shardID, c.shardMgr, c.historyMgr, c.metadataMgr, + c.executionMgrFactory, c.engineFactory, c.host, c.config, c.logger, c.metricsClient) if err != nil { return nil, err } @@ -368,7 +376,7 @@ func (i *historyShardsItem) getOrCreateEngine(shardClosedCh chan<- int) (Engine, logging.LogShardEngineCreatingEvent(i.logger, i.host.Identity(), i.shardID) - context, err := acquireShard(i.shardID, i.shardMgr, i.historyMgr, i.executionMgr, i.host.Identity(), shardClosedCh, + context, err := acquireShard(i.shardID, i.shardMgr, i.historyMgr, i.executionMgr, i.domainCache, i.host.Identity(), shardClosedCh, i.config, i.logger, i.metricsClient) if err != nil { return nil, err diff --git a/service/history/shardController_test.go b/service/history/shardController_test.go index b62cfdd1654..bd3cd59c04f 100644 --- a/service/history/shardController_test.go +++ b/service/history/shardController_test.go @@ -49,6 +49,7 @@ type ( mockShardManager *mmocks.ShardManager mockExecutionMgrFactory *mmocks.ExecutionManagerFactory mockHistoryMgr *mmocks.HistoryManager + mockMetadaraMgr *mmocks.MetadataManager mockServiceResolver *mmocks.ServiceResolver mockEngineFactory *MockHistoryEngineFactory config *Config @@ -70,10 +71,11 @@ func (s *shardControllerSuite) SetupTest() { s.mockShardManager = &mmocks.ShardManager{} s.mockExecutionMgrFactory = &mmocks.ExecutionManagerFactory{} s.mockHistoryMgr = &mmocks.HistoryManager{} + s.mockMetadaraMgr = &mmocks.MetadataManager{} s.mockServiceResolver = &mmocks.ServiceResolver{} s.mockEngineFactory = &MockHistoryEngineFactory{} s.controller = newShardController(s.hostInfo, s.mockServiceResolver, s.mockShardManager, s.mockHistoryMgr, - s.mockExecutionMgrFactory, s.mockEngineFactory, s.config, s.logger, s.metricsClient) + s.mockMetadaraMgr, s.mockExecutionMgrFactory, s.mockEngineFactory, s.config, s.logger, s.metricsClient) } func (s *shardControllerSuite) TearDownTest() { @@ -232,7 +234,7 @@ func (s *shardControllerSuite) TestHistoryEngineClosed() { numShards := 4 s.config.NumberOfShards = numShards s.controller = newShardController(s.hostInfo, s.mockServiceResolver, s.mockShardManager, s.mockHistoryMgr, - s.mockExecutionMgrFactory, s.mockEngineFactory, s.config, s.logger, s.metricsClient) + s.mockMetadaraMgr, s.mockExecutionMgrFactory, s.mockEngineFactory, s.config, s.logger, s.metricsClient) historyEngines := make(map[int]*MockHistoryEngine) for shardID := 0; shardID < numShards; shardID++ { mockEngine := &MockHistoryEngine{} @@ -322,7 +324,7 @@ func (s *shardControllerSuite) TestRingUpdated() { numShards := 4 s.config.NumberOfShards = numShards s.controller = newShardController(s.hostInfo, s.mockServiceResolver, s.mockShardManager, s.mockHistoryMgr, - s.mockExecutionMgrFactory, s.mockEngineFactory, s.config, s.logger, s.metricsClient) + s.mockMetadaraMgr, s.mockExecutionMgrFactory, s.mockEngineFactory, s.config, s.logger, s.metricsClient) historyEngines := make(map[int]*MockHistoryEngine) for shardID := 0; shardID < numShards; shardID++ { mockEngine := &MockHistoryEngine{} @@ -399,7 +401,7 @@ func (s *shardControllerSuite) TestShardControllerClosed() { numShards := 4 s.config.NumberOfShards = numShards s.controller = newShardController(s.hostInfo, s.mockServiceResolver, s.mockShardManager, s.mockHistoryMgr, - s.mockExecutionMgrFactory, s.mockEngineFactory, s.config, s.logger, s.metricsClient) + s.mockMetadaraMgr, s.mockExecutionMgrFactory, s.mockEngineFactory, s.config, s.logger, s.metricsClient) historyEngines := make(map[int]*MockHistoryEngine) for shardID := 0; shardID < numShards; shardID++ { mockEngine := &MockHistoryEngine{} diff --git a/service/history/timerQueueProcessor2_test.go b/service/history/timerQueueProcessor2_test.go index 30918d85f53..6320538550f 100644 --- a/service/history/timerQueueProcessor2_test.go +++ b/service/history/timerQueueProcessor2_test.go @@ -96,6 +96,7 @@ func (s *timerQueueProcessor2Suite) SetupTest() { closeCh: s.shardClosedCh, config: s.config, logger: s.logger, + domainCache: cache.NewDomainCache(s.mockMetadataMgr, s.logger), metricsClient: metrics.NewClient(tally.NoopScope, metrics.History), } @@ -223,7 +224,7 @@ func (s *timerQueueProcessor2Suite) TestWorkflowTimeout() { s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(wfResponse, nil).Once() s.mockMetadataMgr.On("GetDomain", mock.Anything).Return( - &persistence.GetDomainResponse{Config: &persistence.DomainConfig{Retention: 1}}, nil).Once() + &persistence.GetDomainResponse{Config: &persistence.DomainConfig{Retention: 1}}, nil) s.mockExecutionMgr.On("CompleteTimerTask", mock.Anything).Return(nil).Once() s.mockHistoryMgr.On("AppendHistoryEvents", mock.Anything).Return(nil).Once() s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything).Return(nil).Run(func(arguments mock.Arguments) { diff --git a/service/history/transferQueueProcessor_test.go b/service/history/transferQueueProcessor_test.go index fdd32f45e06..2e7b51edf61 100644 --- a/service/history/transferQueueProcessor_test.go +++ b/service/history/transferQueueProcessor_test.go @@ -213,7 +213,7 @@ func (s *transferQueueProcessorSuite) TestDeleteExecutionTransferTasks() { addCompleteWorkflowEvent(builder, *completeDecisionEvent.EventId, []byte("result")) updatedInfo1 := copyWorkflowExecutionInfo(builder.executionInfo) - err1 := s.UpdateWorkflowExecutionAndDelete(updatedInfo1, int64(3)) + err1 := s.UpdateWorkflowExecutionAndFinish(updatedInfo1, int64(3)) s.Nil(err1, "No error expected.") newExecution := workflow.WorkflowExecution{WorkflowId: common.StringPtr("delete-execution-transfertasks-test"), @@ -249,7 +249,7 @@ workerPump: } _, err3 := s.CreateWorkflowExecution(domainID, newExecution, taskList, "wType", 20, 10, nil, 3, 0, 2, nil) - s.Nil(err3, "No error expected.") + s.NotNil(err3, "Error expected.") s.logger.Infof("Execution created successfully: %v", err3) } @@ -276,7 +276,7 @@ func (s *transferQueueProcessorSuite) TestDeleteExecutionTransferTasksDomainNotE addCompleteWorkflowEvent(builder, *completeDecisionEvent.EventId, []byte("result")) updatedInfo1 := copyWorkflowExecutionInfo(builder.executionInfo) - err1 := s.UpdateWorkflowExecutionAndDelete(updatedInfo1, int64(3)) + err1 := s.UpdateWorkflowExecutionAndFinish(updatedInfo1, int64(3)) s.Nil(err1, "No error expected.") newExecution := workflow.WorkflowExecution{WorkflowId: common.StringPtr("delete-execution-transfertasks-test"), diff --git a/service/history/workflowExecutionContext.go b/service/history/workflowExecutionContext.go index 7a61068d444..36a566d8c1b 100644 --- a/service/history/workflowExecutionContext.go +++ b/service/history/workflowExecutionContext.go @@ -156,11 +156,18 @@ func (c *workflowExecutionContext) updateWorkflowExecution(transferTasks []persi } continueAsNew := updates.continueAsNew - deleteExecution := false + finishExecution := false + var finishExecutionTTL int32 if c.msBuilder.executionInfo.State == persistence.WorkflowStateCompleted { // Workflow execution completed as part of this transaction. - // Also transactionally delete workflow execution representing current run for the execution - deleteExecution = true + // Also transactionally delete workflow execution representing + // current run for the execution using cassandra TTL + finishExecution = true + _, domainConfig, err := c.shard.GetDomainCache().GetDomainByID(c.msBuilder.executionInfo.DomainID) + if err != nil { + return err + } + finishExecutionTTL = domainConfig.Retention } if err1 := c.updateWorkflowExecutionWithRetry(&persistence.UpdateWorkflowExecutionRequest{ ExecutionInfo: c.msBuilder.executionInfo, @@ -177,7 +184,8 @@ func (c *workflowExecutionContext) updateWorkflowExecution(transferTasks []persi NewBufferedEvents: updates.newBufferedEvents, ClearBufferedEvents: updates.clearBufferedEvents, ContinueAsNew: continueAsNew, - CloseExecution: deleteExecution, + FinishExecution: finishExecution, + FinishedExecutionTTL: finishExecutionTTL, }); err1 != nil { switch err1.(type) { case *persistence.ConditionFailedError: @@ -272,15 +280,6 @@ func (c *workflowExecutionContext) updateWorkflowExecutionWithRetry( return backoff.Retry(op, persistenceOperationRetryPolicy, common.IsPersistenceTransientError) } -func (c *workflowExecutionContext) deleteWorkflowExecutionWithRetry( - request *persistence.DeleteWorkflowExecutionRequest) error { - op := func() error { - return c.executionManager.DeleteWorkflowExecution(request) - } - - return backoff.Retry(op, persistenceOperationRetryPolicy, common.IsPersistenceTransientError) -} - func (c *workflowExecutionContext) clear() { c.msBuilder = nil }