From 8200f58811ef31a211d0dd20d970892d22e3d422 Mon Sep 17 00:00:00 2001 From: Samar Abbas - Uber Date: Thu, 22 Mar 2018 07:34:39 -0700 Subject: [PATCH] Support workflow execution CRUD without replication state (#628) ReplicationState part of the workflow execution is not enabled in production as this feature is in development. This change makes sure no over head on exisiting workflow executions which does not have any ReplicationState. --- common/persistence/cassandraPersistence.go | 330 +++++++++++++-------- 1 file changed, 212 insertions(+), 118 deletions(-) diff --git a/common/persistence/cassandraPersistence.go b/common/persistence/cassandraPersistence.go index 743a1b0e242..879e9d70766 100644 --- a/common/persistence/cassandraPersistence.go +++ b/common/persistence/cassandraPersistence.go @@ -302,6 +302,10 @@ const ( `VALUES(?, ?, ?, ?, ?, ?, ?, ?, {run_id: ?, create_request_id: ?, state: ?, close_status: ?}) IF NOT EXISTS USING TTL 0 ` templateCreateWorkflowExecutionQuery2 = `INSERT INTO executions (` + + `shard_id, domain_id, workflow_id, run_id, type, execution, next_event_id, visibility_ts, task_id) ` + + `VALUES(?, ?, ?, ?, ?, ` + templateWorkflowExecutionType + `, ?, ?, ?) ` + + templateCreateWorkflowExecutionWithReplicationQuery = `INSERT INTO executions (` + `shard_id, domain_id, workflow_id, run_id, type, execution, replication_state, next_event_id, visibility_ts, task_id) ` + `VALUES(?, ?, ?, ?, ?, ` + templateWorkflowExecutionType + `, ` + templateReplicationStateType + `, ?, ?, ?) ` @@ -349,6 +353,17 @@ const ( `and task_id = ?` templateUpdateWorkflowExecutionQuery = `UPDATE executions ` + + `SET execution = ` + templateWorkflowExecutionType + `, next_event_id = ? ` + + `WHERE shard_id = ? ` + + `and type = ? ` + + `and domain_id = ? ` + + `and workflow_id = ? ` + + `and run_id = ? ` + + `and visibility_ts = ? ` + + `and task_id = ? ` + + `IF next_event_id = ?` + + templateUpdateWorkflowExecutionWithReplicationQuery = `UPDATE executions ` + `SET execution = ` + templateWorkflowExecutionType + `, replication_state = ` + templateReplicationStateType + `, next_event_id = ? ` + `WHERE shard_id = ? ` + `and type = ? ` + @@ -644,15 +659,6 @@ const ( var ( defaultDateTime = time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC) defaultVisibilityTimestamp = common.UnixNanoToCQLTimestamp(defaultDateTime.UnixNano()) - emptyReplicationInfo = map[string]*ReplicationInfo{} - emptyReplicationInfoMap = map[string]map[string]interface{}{} - emptyReplicationState = &ReplicationState{ - CurrentVersion: common.EmptyVersion, - StartVersion: common.EmptyVersion, - LastWriteVersion: common.EmptyVersion, - LastWriteEventID: common.EmptyEventID, - LastReplicationInfo: emptyReplicationInfo, - } ) type ( @@ -1007,65 +1013,106 @@ func (d *cassandraPersistence) CreateWorkflowExecutionWithinBatch(request *Creat ) } - var lastReplicationInfo map[string]map[string]interface{} if request.ReplicationState == nil { - request.ReplicationState = emptyReplicationState - lastReplicationInfo = emptyReplicationInfoMap + // Cross DC feature is currently disabled so we will be creating workflow executions without replication state + batch.Query(templateCreateWorkflowExecutionQuery2, + d.shardID, + request.DomainID, + *request.Execution.WorkflowId, + *request.Execution.RunId, + rowTypeExecution, + request.DomainID, + *request.Execution.WorkflowId, + *request.Execution.RunId, + parentDomainID, + parentWorkflowID, + parentRunID, + initiatedID, + nil, + request.TaskList, + request.WorkflowTypeName, + request.WorkflowTimeout, + request.DecisionTimeoutValue, + request.ExecutionContext, + WorkflowStateCreated, + WorkflowCloseStatusNone, + common.FirstEventID, + request.NextEventID, + request.LastProcessedEvent, + cqlNowTimestamp, + cqlNowTimestamp, + request.RequestID, + request.DecisionScheduleID, + request.DecisionStartedID, + "", // Decision Start Request ID + request.DecisionStartToCloseTimeout, + 0, + 0, + false, + "", + "", // sticky_task_list (no sticky tasklist for new workflow execution) + 0, // sticky_schedule_to_start_timeout + "", // client_library_version + "", // client_feature_version + "", // client_impl + request.NextEventID, + defaultVisibilityTimestamp, + rowTypeExecutionTaskID) } else { - lastReplicationInfo = make(map[string]map[string]interface{}) + lastReplicationInfo := make(map[string]map[string]interface{}) for k, v := range request.ReplicationState.LastReplicationInfo { lastReplicationInfo[k] = createReplicationInfoMap(v) } - } - batch.Query(templateCreateWorkflowExecutionQuery2, - d.shardID, - request.DomainID, - *request.Execution.WorkflowId, - *request.Execution.RunId, - rowTypeExecution, - request.DomainID, - *request.Execution.WorkflowId, - *request.Execution.RunId, - parentDomainID, - parentWorkflowID, - parentRunID, - initiatedID, - nil, - request.TaskList, - request.WorkflowTypeName, - request.WorkflowTimeout, - request.DecisionTimeoutValue, - request.ExecutionContext, - WorkflowStateCreated, - WorkflowCloseStatusNone, - common.FirstEventID, - request.NextEventID, - request.LastProcessedEvent, - cqlNowTimestamp, - cqlNowTimestamp, - request.RequestID, - request.DecisionScheduleID, - request.DecisionStartedID, - "", // Decision Start Request ID - request.DecisionStartToCloseTimeout, - 0, - 0, - false, - "", - "", // sticky_task_list (no sticky tasklist for new workflow execution) - 0, // sticky_schedule_to_start_timeout - "", // client_library_version - "", // client_feature_version - "", // client_impl - request.ReplicationState.CurrentVersion, - request.ReplicationState.StartVersion, - request.ReplicationState.LastWriteVersion, - request.ReplicationState.LastWriteEventID, - lastReplicationInfo, - request.NextEventID, - defaultVisibilityTimestamp, - rowTypeExecutionTaskID) + batch.Query(templateCreateWorkflowExecutionWithReplicationQuery, + d.shardID, + request.DomainID, + *request.Execution.WorkflowId, + *request.Execution.RunId, + rowTypeExecution, + request.DomainID, + *request.Execution.WorkflowId, + *request.Execution.RunId, + parentDomainID, + parentWorkflowID, + parentRunID, + initiatedID, + nil, + request.TaskList, + request.WorkflowTypeName, + request.WorkflowTimeout, + request.DecisionTimeoutValue, + request.ExecutionContext, + WorkflowStateCreated, + WorkflowCloseStatusNone, + common.FirstEventID, + request.NextEventID, + request.LastProcessedEvent, + cqlNowTimestamp, + cqlNowTimestamp, + request.RequestID, + request.DecisionScheduleID, + request.DecisionStartedID, + "", // Decision Start Request ID + request.DecisionStartToCloseTimeout, + 0, + 0, + false, + "", + "", // sticky_task_list (no sticky tasklist for new workflow execution) + 0, // sticky_schedule_to_start_timeout + "", // client_library_version + "", // client_feature_version + "", // client_impl + request.ReplicationState.CurrentVersion, + request.ReplicationState.StartVersion, + request.ReplicationState.LastWriteVersion, + request.ReplicationState.LastWriteEventID, + lastReplicationInfo, + request.NextEventID, + defaultVisibilityTimestamp, + rowTypeExecutionTaskID) + } } func (d *cassandraPersistence) GetWorkflowExecution(request *GetWorkflowExecutionRequest) ( @@ -1164,70 +1211,113 @@ func (d *cassandraPersistence) GetWorkflowExecution(request *GetWorkflowExecutio } func (d *cassandraPersistence) UpdateWorkflowExecution(request *UpdateWorkflowExecutionRequest) error { + batch := d.session.NewBatch(gocql.LoggedBatch) + cqlNowTimestamp := common.UnixNanoToCQLTimestamp(time.Now().UnixNano()) executionInfo := request.ExecutionInfo replicationState := request.ReplicationState - var lastReplicationInfo map[string]map[string]interface{} + if replicationState == nil { - replicationState = emptyReplicationState - lastReplicationInfo = emptyReplicationInfoMap + // Updates will be called with null ReplicationState while the feature is disabled + batch.Query(templateUpdateWorkflowExecutionQuery, + executionInfo.DomainID, + executionInfo.WorkflowID, + executionInfo.RunID, + executionInfo.ParentDomainID, + executionInfo.ParentWorkflowID, + executionInfo.ParentRunID, + executionInfo.InitiatedID, + executionInfo.CompletionEvent, + executionInfo.TaskList, + executionInfo.WorkflowTypeName, + executionInfo.WorkflowTimeout, + executionInfo.DecisionTimeoutValue, + executionInfo.ExecutionContext, + executionInfo.State, + executionInfo.CloseStatus, + executionInfo.LastFirstEventID, + executionInfo.NextEventID, + executionInfo.LastProcessedEvent, + executionInfo.StartTimestamp, + cqlNowTimestamp, + executionInfo.CreateRequestID, + executionInfo.DecisionScheduleID, + executionInfo.DecisionStartedID, + executionInfo.DecisionRequestID, + executionInfo.DecisionTimeout, + executionInfo.DecisionAttempt, + executionInfo.DecisionTimestamp, + executionInfo.CancelRequested, + executionInfo.CancelRequestID, + executionInfo.StickyTaskList, + executionInfo.StickyScheduleToStartTimeout, + executionInfo.ClientLibraryVersion, + executionInfo.ClientFeatureVersion, + executionInfo.ClientImpl, + executionInfo.NextEventID, + d.shardID, + rowTypeExecution, + executionInfo.DomainID, + executionInfo.WorkflowID, + executionInfo.RunID, + defaultVisibilityTimestamp, + rowTypeExecutionTaskID, + request.Condition) } else { - lastReplicationInfo = make(map[string]map[string]interface{}) + lastReplicationInfo := make(map[string]map[string]interface{}) for k, v := range replicationState.LastReplicationInfo { lastReplicationInfo[k] = createReplicationInfoMap(v) } - } - cqlNowTimestamp := common.UnixNanoToCQLTimestamp(time.Now().UnixNano()) - batch := d.session.NewBatch(gocql.LoggedBatch) - batch.Query(templateUpdateWorkflowExecutionQuery, - executionInfo.DomainID, - executionInfo.WorkflowID, - executionInfo.RunID, - executionInfo.ParentDomainID, - executionInfo.ParentWorkflowID, - executionInfo.ParentRunID, - executionInfo.InitiatedID, - executionInfo.CompletionEvent, - executionInfo.TaskList, - executionInfo.WorkflowTypeName, - executionInfo.WorkflowTimeout, - executionInfo.DecisionTimeoutValue, - executionInfo.ExecutionContext, - executionInfo.State, - executionInfo.CloseStatus, - executionInfo.LastFirstEventID, - executionInfo.NextEventID, - executionInfo.LastProcessedEvent, - executionInfo.StartTimestamp, - cqlNowTimestamp, - executionInfo.CreateRequestID, - executionInfo.DecisionScheduleID, - executionInfo.DecisionStartedID, - executionInfo.DecisionRequestID, - executionInfo.DecisionTimeout, - executionInfo.DecisionAttempt, - executionInfo.DecisionTimestamp, - executionInfo.CancelRequested, - executionInfo.CancelRequestID, - executionInfo.StickyTaskList, - executionInfo.StickyScheduleToStartTimeout, - executionInfo.ClientLibraryVersion, - executionInfo.ClientFeatureVersion, - executionInfo.ClientImpl, - replicationState.CurrentVersion, - replicationState.StartVersion, - replicationState.LastWriteVersion, - replicationState.LastWriteEventID, - lastReplicationInfo, - executionInfo.NextEventID, - d.shardID, - rowTypeExecution, - executionInfo.DomainID, - executionInfo.WorkflowID, - executionInfo.RunID, - defaultVisibilityTimestamp, - rowTypeExecutionTaskID, - request.Condition) + batch.Query(templateUpdateWorkflowExecutionWithReplicationQuery, + executionInfo.DomainID, + executionInfo.WorkflowID, + executionInfo.RunID, + executionInfo.ParentDomainID, + executionInfo.ParentWorkflowID, + executionInfo.ParentRunID, + executionInfo.InitiatedID, + executionInfo.CompletionEvent, + executionInfo.TaskList, + executionInfo.WorkflowTypeName, + executionInfo.WorkflowTimeout, + executionInfo.DecisionTimeoutValue, + executionInfo.ExecutionContext, + executionInfo.State, + executionInfo.CloseStatus, + executionInfo.LastFirstEventID, + executionInfo.NextEventID, + executionInfo.LastProcessedEvent, + executionInfo.StartTimestamp, + cqlNowTimestamp, + executionInfo.CreateRequestID, + executionInfo.DecisionScheduleID, + executionInfo.DecisionStartedID, + executionInfo.DecisionRequestID, + executionInfo.DecisionTimeout, + executionInfo.DecisionAttempt, + executionInfo.DecisionTimestamp, + executionInfo.CancelRequested, + executionInfo.CancelRequestID, + executionInfo.StickyTaskList, + executionInfo.StickyScheduleToStartTimeout, + executionInfo.ClientLibraryVersion, + executionInfo.ClientFeatureVersion, + executionInfo.ClientImpl, + replicationState.CurrentVersion, + replicationState.StartVersion, + replicationState.LastWriteVersion, + replicationState.LastWriteEventID, + lastReplicationInfo, + executionInfo.NextEventID, + d.shardID, + rowTypeExecution, + executionInfo.DomainID, + executionInfo.WorkflowID, + executionInfo.RunID, + defaultVisibilityTimestamp, + rowTypeExecutionTaskID, + request.Condition) + } d.createTransferTasks(batch, request.TransferTasks, executionInfo.DomainID, executionInfo.WorkflowID, executionInfo.RunID) @@ -2483,6 +2573,10 @@ func createWorkflowExecutionInfo(result map[string]interface{}) *WorkflowExecuti } func createReplicationState(result map[string]interface{}) *ReplicationState { + if len(result) == 0 { + return nil + } + info := &ReplicationState{} for k, v := range result { switch k {