Skip to content

Commit

Permalink
Support workflow execution CRUD without replication state (#628)
Browse files Browse the repository at this point in the history
ReplicationState part of the workflow execution is not enabled in
production as this feature is in development.  This change makes sure no
over head on exisiting workflow executions which does not have any
ReplicationState.
  • Loading branch information
samarabbas authored Mar 22, 2018
1 parent f5ebfea commit 8200f58
Showing 1 changed file with 212 additions and 118 deletions.
330 changes: 212 additions & 118 deletions common/persistence/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,10 @@ const (
`VALUES(?, ?, ?, ?, ?, ?, ?, ?, {run_id: ?, create_request_id: ?, state: ?, close_status: ?}) IF NOT EXISTS USING TTL 0 `

templateCreateWorkflowExecutionQuery2 = `INSERT INTO executions (` +
`shard_id, domain_id, workflow_id, run_id, type, execution, next_event_id, visibility_ts, task_id) ` +
`VALUES(?, ?, ?, ?, ?, ` + templateWorkflowExecutionType + `, ?, ?, ?) `

templateCreateWorkflowExecutionWithReplicationQuery = `INSERT INTO executions (` +
`shard_id, domain_id, workflow_id, run_id, type, execution, replication_state, next_event_id, visibility_ts, task_id) ` +
`VALUES(?, ?, ?, ?, ?, ` + templateWorkflowExecutionType + `, ` + templateReplicationStateType + `, ?, ?, ?) `

Expand Down Expand Up @@ -349,6 +353,17 @@ const (
`and task_id = ?`

templateUpdateWorkflowExecutionQuery = `UPDATE executions ` +
`SET execution = ` + templateWorkflowExecutionType + `, next_event_id = ? ` +
`WHERE shard_id = ? ` +
`and type = ? ` +
`and domain_id = ? ` +
`and workflow_id = ? ` +
`and run_id = ? ` +
`and visibility_ts = ? ` +
`and task_id = ? ` +
`IF next_event_id = ?`

templateUpdateWorkflowExecutionWithReplicationQuery = `UPDATE executions ` +
`SET execution = ` + templateWorkflowExecutionType + `, replication_state = ` + templateReplicationStateType + `, next_event_id = ? ` +
`WHERE shard_id = ? ` +
`and type = ? ` +
Expand Down Expand Up @@ -644,15 +659,6 @@ const (
var (
defaultDateTime = time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)
defaultVisibilityTimestamp = common.UnixNanoToCQLTimestamp(defaultDateTime.UnixNano())
emptyReplicationInfo = map[string]*ReplicationInfo{}
emptyReplicationInfoMap = map[string]map[string]interface{}{}
emptyReplicationState = &ReplicationState{
CurrentVersion: common.EmptyVersion,
StartVersion: common.EmptyVersion,
LastWriteVersion: common.EmptyVersion,
LastWriteEventID: common.EmptyEventID,
LastReplicationInfo: emptyReplicationInfo,
}
)

type (
Expand Down Expand Up @@ -1007,65 +1013,106 @@ func (d *cassandraPersistence) CreateWorkflowExecutionWithinBatch(request *Creat
)
}

var lastReplicationInfo map[string]map[string]interface{}
if request.ReplicationState == nil {
request.ReplicationState = emptyReplicationState
lastReplicationInfo = emptyReplicationInfoMap
// Cross DC feature is currently disabled so we will be creating workflow executions without replication state
batch.Query(templateCreateWorkflowExecutionQuery2,
d.shardID,
request.DomainID,
*request.Execution.WorkflowId,
*request.Execution.RunId,
rowTypeExecution,
request.DomainID,
*request.Execution.WorkflowId,
*request.Execution.RunId,
parentDomainID,
parentWorkflowID,
parentRunID,
initiatedID,
nil,
request.TaskList,
request.WorkflowTypeName,
request.WorkflowTimeout,
request.DecisionTimeoutValue,
request.ExecutionContext,
WorkflowStateCreated,
WorkflowCloseStatusNone,
common.FirstEventID,
request.NextEventID,
request.LastProcessedEvent,
cqlNowTimestamp,
cqlNowTimestamp,
request.RequestID,
request.DecisionScheduleID,
request.DecisionStartedID,
"", // Decision Start Request ID
request.DecisionStartToCloseTimeout,
0,
0,
false,
"",
"", // sticky_task_list (no sticky tasklist for new workflow execution)
0, // sticky_schedule_to_start_timeout
"", // client_library_version
"", // client_feature_version
"", // client_impl
request.NextEventID,
defaultVisibilityTimestamp,
rowTypeExecutionTaskID)
} else {
lastReplicationInfo = make(map[string]map[string]interface{})
lastReplicationInfo := make(map[string]map[string]interface{})
for k, v := range request.ReplicationState.LastReplicationInfo {
lastReplicationInfo[k] = createReplicationInfoMap(v)
}
}

batch.Query(templateCreateWorkflowExecutionQuery2,
d.shardID,
request.DomainID,
*request.Execution.WorkflowId,
*request.Execution.RunId,
rowTypeExecution,
request.DomainID,
*request.Execution.WorkflowId,
*request.Execution.RunId,
parentDomainID,
parentWorkflowID,
parentRunID,
initiatedID,
nil,
request.TaskList,
request.WorkflowTypeName,
request.WorkflowTimeout,
request.DecisionTimeoutValue,
request.ExecutionContext,
WorkflowStateCreated,
WorkflowCloseStatusNone,
common.FirstEventID,
request.NextEventID,
request.LastProcessedEvent,
cqlNowTimestamp,
cqlNowTimestamp,
request.RequestID,
request.DecisionScheduleID,
request.DecisionStartedID,
"", // Decision Start Request ID
request.DecisionStartToCloseTimeout,
0,
0,
false,
"",
"", // sticky_task_list (no sticky tasklist for new workflow execution)
0, // sticky_schedule_to_start_timeout
"", // client_library_version
"", // client_feature_version
"", // client_impl
request.ReplicationState.CurrentVersion,
request.ReplicationState.StartVersion,
request.ReplicationState.LastWriteVersion,
request.ReplicationState.LastWriteEventID,
lastReplicationInfo,
request.NextEventID,
defaultVisibilityTimestamp,
rowTypeExecutionTaskID)
batch.Query(templateCreateWorkflowExecutionWithReplicationQuery,
d.shardID,
request.DomainID,
*request.Execution.WorkflowId,
*request.Execution.RunId,
rowTypeExecution,
request.DomainID,
*request.Execution.WorkflowId,
*request.Execution.RunId,
parentDomainID,
parentWorkflowID,
parentRunID,
initiatedID,
nil,
request.TaskList,
request.WorkflowTypeName,
request.WorkflowTimeout,
request.DecisionTimeoutValue,
request.ExecutionContext,
WorkflowStateCreated,
WorkflowCloseStatusNone,
common.FirstEventID,
request.NextEventID,
request.LastProcessedEvent,
cqlNowTimestamp,
cqlNowTimestamp,
request.RequestID,
request.DecisionScheduleID,
request.DecisionStartedID,
"", // Decision Start Request ID
request.DecisionStartToCloseTimeout,
0,
0,
false,
"",
"", // sticky_task_list (no sticky tasklist for new workflow execution)
0, // sticky_schedule_to_start_timeout
"", // client_library_version
"", // client_feature_version
"", // client_impl
request.ReplicationState.CurrentVersion,
request.ReplicationState.StartVersion,
request.ReplicationState.LastWriteVersion,
request.ReplicationState.LastWriteEventID,
lastReplicationInfo,
request.NextEventID,
defaultVisibilityTimestamp,
rowTypeExecutionTaskID)
}
}

func (d *cassandraPersistence) GetWorkflowExecution(request *GetWorkflowExecutionRequest) (
Expand Down Expand Up @@ -1164,70 +1211,113 @@ func (d *cassandraPersistence) GetWorkflowExecution(request *GetWorkflowExecutio
}

func (d *cassandraPersistence) UpdateWorkflowExecution(request *UpdateWorkflowExecutionRequest) error {
batch := d.session.NewBatch(gocql.LoggedBatch)
cqlNowTimestamp := common.UnixNanoToCQLTimestamp(time.Now().UnixNano())
executionInfo := request.ExecutionInfo
replicationState := request.ReplicationState
var lastReplicationInfo map[string]map[string]interface{}

if replicationState == nil {
replicationState = emptyReplicationState
lastReplicationInfo = emptyReplicationInfoMap
// Updates will be called with null ReplicationState while the feature is disabled
batch.Query(templateUpdateWorkflowExecutionQuery,
executionInfo.DomainID,
executionInfo.WorkflowID,
executionInfo.RunID,
executionInfo.ParentDomainID,
executionInfo.ParentWorkflowID,
executionInfo.ParentRunID,
executionInfo.InitiatedID,
executionInfo.CompletionEvent,
executionInfo.TaskList,
executionInfo.WorkflowTypeName,
executionInfo.WorkflowTimeout,
executionInfo.DecisionTimeoutValue,
executionInfo.ExecutionContext,
executionInfo.State,
executionInfo.CloseStatus,
executionInfo.LastFirstEventID,
executionInfo.NextEventID,
executionInfo.LastProcessedEvent,
executionInfo.StartTimestamp,
cqlNowTimestamp,
executionInfo.CreateRequestID,
executionInfo.DecisionScheduleID,
executionInfo.DecisionStartedID,
executionInfo.DecisionRequestID,
executionInfo.DecisionTimeout,
executionInfo.DecisionAttempt,
executionInfo.DecisionTimestamp,
executionInfo.CancelRequested,
executionInfo.CancelRequestID,
executionInfo.StickyTaskList,
executionInfo.StickyScheduleToStartTimeout,
executionInfo.ClientLibraryVersion,
executionInfo.ClientFeatureVersion,
executionInfo.ClientImpl,
executionInfo.NextEventID,
d.shardID,
rowTypeExecution,
executionInfo.DomainID,
executionInfo.WorkflowID,
executionInfo.RunID,
defaultVisibilityTimestamp,
rowTypeExecutionTaskID,
request.Condition)
} else {
lastReplicationInfo = make(map[string]map[string]interface{})
lastReplicationInfo := make(map[string]map[string]interface{})
for k, v := range replicationState.LastReplicationInfo {
lastReplicationInfo[k] = createReplicationInfoMap(v)
}
}
cqlNowTimestamp := common.UnixNanoToCQLTimestamp(time.Now().UnixNano())

batch := d.session.NewBatch(gocql.LoggedBatch)
batch.Query(templateUpdateWorkflowExecutionQuery,
executionInfo.DomainID,
executionInfo.WorkflowID,
executionInfo.RunID,
executionInfo.ParentDomainID,
executionInfo.ParentWorkflowID,
executionInfo.ParentRunID,
executionInfo.InitiatedID,
executionInfo.CompletionEvent,
executionInfo.TaskList,
executionInfo.WorkflowTypeName,
executionInfo.WorkflowTimeout,
executionInfo.DecisionTimeoutValue,
executionInfo.ExecutionContext,
executionInfo.State,
executionInfo.CloseStatus,
executionInfo.LastFirstEventID,
executionInfo.NextEventID,
executionInfo.LastProcessedEvent,
executionInfo.StartTimestamp,
cqlNowTimestamp,
executionInfo.CreateRequestID,
executionInfo.DecisionScheduleID,
executionInfo.DecisionStartedID,
executionInfo.DecisionRequestID,
executionInfo.DecisionTimeout,
executionInfo.DecisionAttempt,
executionInfo.DecisionTimestamp,
executionInfo.CancelRequested,
executionInfo.CancelRequestID,
executionInfo.StickyTaskList,
executionInfo.StickyScheduleToStartTimeout,
executionInfo.ClientLibraryVersion,
executionInfo.ClientFeatureVersion,
executionInfo.ClientImpl,
replicationState.CurrentVersion,
replicationState.StartVersion,
replicationState.LastWriteVersion,
replicationState.LastWriteEventID,
lastReplicationInfo,
executionInfo.NextEventID,
d.shardID,
rowTypeExecution,
executionInfo.DomainID,
executionInfo.WorkflowID,
executionInfo.RunID,
defaultVisibilityTimestamp,
rowTypeExecutionTaskID,
request.Condition)
batch.Query(templateUpdateWorkflowExecutionWithReplicationQuery,
executionInfo.DomainID,
executionInfo.WorkflowID,
executionInfo.RunID,
executionInfo.ParentDomainID,
executionInfo.ParentWorkflowID,
executionInfo.ParentRunID,
executionInfo.InitiatedID,
executionInfo.CompletionEvent,
executionInfo.TaskList,
executionInfo.WorkflowTypeName,
executionInfo.WorkflowTimeout,
executionInfo.DecisionTimeoutValue,
executionInfo.ExecutionContext,
executionInfo.State,
executionInfo.CloseStatus,
executionInfo.LastFirstEventID,
executionInfo.NextEventID,
executionInfo.LastProcessedEvent,
executionInfo.StartTimestamp,
cqlNowTimestamp,
executionInfo.CreateRequestID,
executionInfo.DecisionScheduleID,
executionInfo.DecisionStartedID,
executionInfo.DecisionRequestID,
executionInfo.DecisionTimeout,
executionInfo.DecisionAttempt,
executionInfo.DecisionTimestamp,
executionInfo.CancelRequested,
executionInfo.CancelRequestID,
executionInfo.StickyTaskList,
executionInfo.StickyScheduleToStartTimeout,
executionInfo.ClientLibraryVersion,
executionInfo.ClientFeatureVersion,
executionInfo.ClientImpl,
replicationState.CurrentVersion,
replicationState.StartVersion,
replicationState.LastWriteVersion,
replicationState.LastWriteEventID,
lastReplicationInfo,
executionInfo.NextEventID,
d.shardID,
rowTypeExecution,
executionInfo.DomainID,
executionInfo.WorkflowID,
executionInfo.RunID,
defaultVisibilityTimestamp,
rowTypeExecutionTaskID,
request.Condition)
}

d.createTransferTasks(batch, request.TransferTasks, executionInfo.DomainID, executionInfo.WorkflowID,
executionInfo.RunID)
Expand Down Expand Up @@ -2483,6 +2573,10 @@ func createWorkflowExecutionInfo(result map[string]interface{}) *WorkflowExecuti
}

func createReplicationState(result map[string]interface{}) *ReplicationState {
if len(result) == 0 {
return nil
}

info := &ReplicationState{}
for k, v := range result {
switch k {
Expand Down

0 comments on commit 8200f58

Please sign in to comment.