diff --git a/common/persistence/cassandraPersistence.go b/common/persistence/cassandraPersistence.go index 743a1b0e242..879e9d70766 100644 --- a/common/persistence/cassandraPersistence.go +++ b/common/persistence/cassandraPersistence.go @@ -302,6 +302,10 @@ const ( `VALUES(?, ?, ?, ?, ?, ?, ?, ?, {run_id: ?, create_request_id: ?, state: ?, close_status: ?}) IF NOT EXISTS USING TTL 0 ` templateCreateWorkflowExecutionQuery2 = `INSERT INTO executions (` + + `shard_id, domain_id, workflow_id, run_id, type, execution, next_event_id, visibility_ts, task_id) ` + + `VALUES(?, ?, ?, ?, ?, ` + templateWorkflowExecutionType + `, ?, ?, ?) ` + + templateCreateWorkflowExecutionWithReplicationQuery = `INSERT INTO executions (` + `shard_id, domain_id, workflow_id, run_id, type, execution, replication_state, next_event_id, visibility_ts, task_id) ` + `VALUES(?, ?, ?, ?, ?, ` + templateWorkflowExecutionType + `, ` + templateReplicationStateType + `, ?, ?, ?) ` @@ -349,6 +353,17 @@ const ( `and task_id = ?` templateUpdateWorkflowExecutionQuery = `UPDATE executions ` + + `SET execution = ` + templateWorkflowExecutionType + `, next_event_id = ? ` + + `WHERE shard_id = ? ` + + `and type = ? ` + + `and domain_id = ? ` + + `and workflow_id = ? ` + + `and run_id = ? ` + + `and visibility_ts = ? ` + + `and task_id = ? ` + + `IF next_event_id = ?` + + templateUpdateWorkflowExecutionWithReplicationQuery = `UPDATE executions ` + `SET execution = ` + templateWorkflowExecutionType + `, replication_state = ` + templateReplicationStateType + `, next_event_id = ? ` + `WHERE shard_id = ? ` + `and type = ? ` + @@ -644,15 +659,6 @@ const ( var ( defaultDateTime = time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC) defaultVisibilityTimestamp = common.UnixNanoToCQLTimestamp(defaultDateTime.UnixNano()) - emptyReplicationInfo = map[string]*ReplicationInfo{} - emptyReplicationInfoMap = map[string]map[string]interface{}{} - emptyReplicationState = &ReplicationState{ - CurrentVersion: common.EmptyVersion, - StartVersion: common.EmptyVersion, - LastWriteVersion: common.EmptyVersion, - LastWriteEventID: common.EmptyEventID, - LastReplicationInfo: emptyReplicationInfo, - } ) type ( @@ -1007,65 +1013,106 @@ func (d *cassandraPersistence) CreateWorkflowExecutionWithinBatch(request *Creat ) } - var lastReplicationInfo map[string]map[string]interface{} if request.ReplicationState == nil { - request.ReplicationState = emptyReplicationState - lastReplicationInfo = emptyReplicationInfoMap + // Cross DC feature is currently disabled so we will be creating workflow executions without replication state + batch.Query(templateCreateWorkflowExecutionQuery2, + d.shardID, + request.DomainID, + *request.Execution.WorkflowId, + *request.Execution.RunId, + rowTypeExecution, + request.DomainID, + *request.Execution.WorkflowId, + *request.Execution.RunId, + parentDomainID, + parentWorkflowID, + parentRunID, + initiatedID, + nil, + request.TaskList, + request.WorkflowTypeName, + request.WorkflowTimeout, + request.DecisionTimeoutValue, + request.ExecutionContext, + WorkflowStateCreated, + WorkflowCloseStatusNone, + common.FirstEventID, + request.NextEventID, + request.LastProcessedEvent, + cqlNowTimestamp, + cqlNowTimestamp, + request.RequestID, + request.DecisionScheduleID, + request.DecisionStartedID, + "", // Decision Start Request ID + request.DecisionStartToCloseTimeout, + 0, + 0, + false, + "", + "", // sticky_task_list (no sticky tasklist for new workflow execution) + 0, // sticky_schedule_to_start_timeout + "", // client_library_version + "", // client_feature_version + "", // client_impl + request.NextEventID, + defaultVisibilityTimestamp, + rowTypeExecutionTaskID) } else { - lastReplicationInfo = make(map[string]map[string]interface{}) + lastReplicationInfo := make(map[string]map[string]interface{}) for k, v := range request.ReplicationState.LastReplicationInfo { lastReplicationInfo[k] = createReplicationInfoMap(v) } - } - batch.Query(templateCreateWorkflowExecutionQuery2, - d.shardID, - request.DomainID, - *request.Execution.WorkflowId, - *request.Execution.RunId, - rowTypeExecution, - request.DomainID, - *request.Execution.WorkflowId, - *request.Execution.RunId, - parentDomainID, - parentWorkflowID, - parentRunID, - initiatedID, - nil, - request.TaskList, - request.WorkflowTypeName, - request.WorkflowTimeout, - request.DecisionTimeoutValue, - request.ExecutionContext, - WorkflowStateCreated, - WorkflowCloseStatusNone, - common.FirstEventID, - request.NextEventID, - request.LastProcessedEvent, - cqlNowTimestamp, - cqlNowTimestamp, - request.RequestID, - request.DecisionScheduleID, - request.DecisionStartedID, - "", // Decision Start Request ID - request.DecisionStartToCloseTimeout, - 0, - 0, - false, - "", - "", // sticky_task_list (no sticky tasklist for new workflow execution) - 0, // sticky_schedule_to_start_timeout - "", // client_library_version - "", // client_feature_version - "", // client_impl - request.ReplicationState.CurrentVersion, - request.ReplicationState.StartVersion, - request.ReplicationState.LastWriteVersion, - request.ReplicationState.LastWriteEventID, - lastReplicationInfo, - request.NextEventID, - defaultVisibilityTimestamp, - rowTypeExecutionTaskID) + batch.Query(templateCreateWorkflowExecutionWithReplicationQuery, + d.shardID, + request.DomainID, + *request.Execution.WorkflowId, + *request.Execution.RunId, + rowTypeExecution, + request.DomainID, + *request.Execution.WorkflowId, + *request.Execution.RunId, + parentDomainID, + parentWorkflowID, + parentRunID, + initiatedID, + nil, + request.TaskList, + request.WorkflowTypeName, + request.WorkflowTimeout, + request.DecisionTimeoutValue, + request.ExecutionContext, + WorkflowStateCreated, + WorkflowCloseStatusNone, + common.FirstEventID, + request.NextEventID, + request.LastProcessedEvent, + cqlNowTimestamp, + cqlNowTimestamp, + request.RequestID, + request.DecisionScheduleID, + request.DecisionStartedID, + "", // Decision Start Request ID + request.DecisionStartToCloseTimeout, + 0, + 0, + false, + "", + "", // sticky_task_list (no sticky tasklist for new workflow execution) + 0, // sticky_schedule_to_start_timeout + "", // client_library_version + "", // client_feature_version + "", // client_impl + request.ReplicationState.CurrentVersion, + request.ReplicationState.StartVersion, + request.ReplicationState.LastWriteVersion, + request.ReplicationState.LastWriteEventID, + lastReplicationInfo, + request.NextEventID, + defaultVisibilityTimestamp, + rowTypeExecutionTaskID) + } } func (d *cassandraPersistence) GetWorkflowExecution(request *GetWorkflowExecutionRequest) ( @@ -1164,70 +1211,113 @@ func (d *cassandraPersistence) GetWorkflowExecution(request *GetWorkflowExecutio } func (d *cassandraPersistence) UpdateWorkflowExecution(request *UpdateWorkflowExecutionRequest) error { + batch := d.session.NewBatch(gocql.LoggedBatch) + cqlNowTimestamp := common.UnixNanoToCQLTimestamp(time.Now().UnixNano()) executionInfo := request.ExecutionInfo replicationState := request.ReplicationState - var lastReplicationInfo map[string]map[string]interface{} + if replicationState == nil { - replicationState = emptyReplicationState - lastReplicationInfo = emptyReplicationInfoMap + // Updates will be called with null ReplicationState while the feature is disabled + batch.Query(templateUpdateWorkflowExecutionQuery, + executionInfo.DomainID, + executionInfo.WorkflowID, + executionInfo.RunID, + executionInfo.ParentDomainID, + executionInfo.ParentWorkflowID, + executionInfo.ParentRunID, + executionInfo.InitiatedID, + executionInfo.CompletionEvent, + executionInfo.TaskList, + executionInfo.WorkflowTypeName, + executionInfo.WorkflowTimeout, + executionInfo.DecisionTimeoutValue, + executionInfo.ExecutionContext, + executionInfo.State, + executionInfo.CloseStatus, + executionInfo.LastFirstEventID, + executionInfo.NextEventID, + executionInfo.LastProcessedEvent, + executionInfo.StartTimestamp, + cqlNowTimestamp, + executionInfo.CreateRequestID, + executionInfo.DecisionScheduleID, + executionInfo.DecisionStartedID, + executionInfo.DecisionRequestID, + executionInfo.DecisionTimeout, + executionInfo.DecisionAttempt, + executionInfo.DecisionTimestamp, + executionInfo.CancelRequested, + executionInfo.CancelRequestID, + executionInfo.StickyTaskList, + executionInfo.StickyScheduleToStartTimeout, + executionInfo.ClientLibraryVersion, + executionInfo.ClientFeatureVersion, + executionInfo.ClientImpl, + executionInfo.NextEventID, + d.shardID, + rowTypeExecution, + executionInfo.DomainID, + executionInfo.WorkflowID, + executionInfo.RunID, + defaultVisibilityTimestamp, + rowTypeExecutionTaskID, + request.Condition) } else { - lastReplicationInfo = make(map[string]map[string]interface{}) + lastReplicationInfo := make(map[string]map[string]interface{}) for k, v := range replicationState.LastReplicationInfo { lastReplicationInfo[k] = createReplicationInfoMap(v) } - } - cqlNowTimestamp := common.UnixNanoToCQLTimestamp(time.Now().UnixNano()) - batch := d.session.NewBatch(gocql.LoggedBatch) - batch.Query(templateUpdateWorkflowExecutionQuery, - executionInfo.DomainID, - executionInfo.WorkflowID, - executionInfo.RunID, - executionInfo.ParentDomainID, - executionInfo.ParentWorkflowID, - executionInfo.ParentRunID, - executionInfo.InitiatedID, - executionInfo.CompletionEvent, - executionInfo.TaskList, - executionInfo.WorkflowTypeName, - executionInfo.WorkflowTimeout, - executionInfo.DecisionTimeoutValue, - executionInfo.ExecutionContext, - executionInfo.State, - executionInfo.CloseStatus, - executionInfo.LastFirstEventID, - executionInfo.NextEventID, - executionInfo.LastProcessedEvent, - executionInfo.StartTimestamp, - cqlNowTimestamp, - executionInfo.CreateRequestID, - executionInfo.DecisionScheduleID, - executionInfo.DecisionStartedID, - executionInfo.DecisionRequestID, - executionInfo.DecisionTimeout, - executionInfo.DecisionAttempt, - executionInfo.DecisionTimestamp, - executionInfo.CancelRequested, - executionInfo.CancelRequestID, - executionInfo.StickyTaskList, - executionInfo.StickyScheduleToStartTimeout, - executionInfo.ClientLibraryVersion, - executionInfo.ClientFeatureVersion, - executionInfo.ClientImpl, - replicationState.CurrentVersion, - replicationState.StartVersion, - replicationState.LastWriteVersion, - replicationState.LastWriteEventID, - lastReplicationInfo, - executionInfo.NextEventID, - d.shardID, - rowTypeExecution, - executionInfo.DomainID, - executionInfo.WorkflowID, - executionInfo.RunID, - defaultVisibilityTimestamp, - rowTypeExecutionTaskID, - request.Condition) + batch.Query(templateUpdateWorkflowExecutionWithReplicationQuery, + executionInfo.DomainID, + executionInfo.WorkflowID, + executionInfo.RunID, + executionInfo.ParentDomainID, + executionInfo.ParentWorkflowID, + executionInfo.ParentRunID, + executionInfo.InitiatedID, + executionInfo.CompletionEvent, + executionInfo.TaskList, + executionInfo.WorkflowTypeName, + executionInfo.WorkflowTimeout, + executionInfo.DecisionTimeoutValue, + executionInfo.ExecutionContext, + executionInfo.State, + executionInfo.CloseStatus, + executionInfo.LastFirstEventID, + executionInfo.NextEventID, + executionInfo.LastProcessedEvent, + executionInfo.StartTimestamp, + cqlNowTimestamp, + executionInfo.CreateRequestID, + executionInfo.DecisionScheduleID, + executionInfo.DecisionStartedID, + executionInfo.DecisionRequestID, + executionInfo.DecisionTimeout, + executionInfo.DecisionAttempt, + executionInfo.DecisionTimestamp, + executionInfo.CancelRequested, + executionInfo.CancelRequestID, + executionInfo.StickyTaskList, + executionInfo.StickyScheduleToStartTimeout, + executionInfo.ClientLibraryVersion, + executionInfo.ClientFeatureVersion, + executionInfo.ClientImpl, + replicationState.CurrentVersion, + replicationState.StartVersion, + replicationState.LastWriteVersion, + replicationState.LastWriteEventID, + lastReplicationInfo, + executionInfo.NextEventID, + d.shardID, + rowTypeExecution, + executionInfo.DomainID, + executionInfo.WorkflowID, + executionInfo.RunID, + defaultVisibilityTimestamp, + rowTypeExecutionTaskID, + request.Condition) + } d.createTransferTasks(batch, request.TransferTasks, executionInfo.DomainID, executionInfo.WorkflowID, executionInfo.RunID) @@ -2483,6 +2573,10 @@ func createWorkflowExecutionInfo(result map[string]interface{}) *WorkflowExecuti } func createReplicationState(result map[string]interface{}) *ReplicationState { + if len(result) == 0 { + return nil + } + info := &ReplicationState{} for k, v := range result { switch k {