Skip to content

Commit

Permalink
Apply 5 min delay for standby task (#920)
Browse files Browse the repository at this point in the history
* Apply 5 min delay for standby task
* When failover, should unblock existing standby task
  • Loading branch information
wxing1292 authored Jul 18, 2018
1 parent 1341820 commit 3a79fc0
Show file tree
Hide file tree
Showing 23 changed files with 235 additions and 84 deletions.
4 changes: 4 additions & 0 deletions common/persistence/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ const (
`domain_id: ?, ` +
`workflow_id: ?, ` +
`run_id: ?, ` +
`visibility_ts: ?, ` +
`task_id: ?, ` +
`target_domain_id: ?, ` +
`target_workflow_id: ?, ` +
Expand Down Expand Up @@ -2470,6 +2471,7 @@ func (d *cassandraPersistence) createTransferTasks(batch *gocql.Batch, transferT
domainID,
workflowID,
runID,
task.GetVisibilityTimestamp(),
task.GetTaskID(),
targetDomainID,
targetWorkflowID,
Expand Down Expand Up @@ -3156,6 +3158,8 @@ func createTransferTaskInfo(result map[string]interface{}) *TransferTaskInfo {
info.WorkflowID = v.(string)
case "run_id":
info.RunID = v.(gocql.UUID).String()
case "visibility_ts":
info.VisibilityTimestamp = v.(time.Time)
case "task_id":
info.TaskID = v.(int64)
case "target_domain_id":
Expand Down
16 changes: 10 additions & 6 deletions common/persistence/cassandraPersistence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -993,13 +993,14 @@ func (s *cassandraPersistenceSuite) TestTransferTasks() {
targetWorkflowID := "some random target domain ID"
targetRunID := uuid.New()
currentTransferID := s.GetTransferReadLevel()
now := time.Now()
tasks := []Task{
&ActivityTask{currentTransferID + 10001, domainID, tasklist, scheduleID, 111},
&DecisionTask{currentTransferID + 10002, domainID, tasklist, scheduleID, 222},
&CloseExecutionTask{currentTransferID + 10003, 333},
&CancelExecutionTask{currentTransferID + 10004, targetDomainID, targetWorkflowID, targetRunID, true, scheduleID, 444},
&SignalExecutionTask{currentTransferID + 10005, targetDomainID, targetWorkflowID, targetRunID, true, scheduleID, 555},
&StartChildExecutionTask{currentTransferID + 10006, targetDomainID, targetWorkflowID, scheduleID, 666},
&ActivityTask{now, currentTransferID + 10001, domainID, tasklist, scheduleID, 111},
&DecisionTask{now, currentTransferID + 10002, domainID, tasklist, scheduleID, 222},
&CloseExecutionTask{now, currentTransferID + 10003, 333},
&CancelExecutionTask{now, currentTransferID + 10004, targetDomainID, targetWorkflowID, targetRunID, true, scheduleID, 444},
&SignalExecutionTask{now, currentTransferID + 10005, targetDomainID, targetWorkflowID, targetRunID, true, scheduleID, 555},
&StartChildExecutionTask{now, currentTransferID + 10006, targetDomainID, targetWorkflowID, scheduleID, 666},
}
err2 := s.UpdateWorklowStateAndReplication(updatedInfo, nil, nil, nil, int64(3), tasks)
s.Nil(err2, "No error expected.")
Expand All @@ -1008,6 +1009,9 @@ func (s *cassandraPersistenceSuite) TestTransferTasks() {
s.Nil(err1, "No error expected.")
s.NotNil(txTasks, "expected valid list of tasks.")
s.Equal(len(tasks), len(txTasks))
for index := range tasks {
s.True(timeComparator(tasks[index].GetVisibilityTimestamp(), txTasks[index].VisibilityTimestamp, timePrecision))
}
s.Equal(TransferTaskTypeActivityTask, txTasks[0].TaskType)
s.Equal(TransferTaskTypeDecisionTask, txTasks[1].TaskType)
s.Equal(TransferTaskTypeCloseExecution, txTasks[2].TaskType)
Expand Down
139 changes: 122 additions & 17 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ type (
DomainID string
WorkflowID string
RunID string
VisibilityTimestamp time.Time
TaskID int64
TargetDomainID string
TargetWorkflowID string
Expand Down Expand Up @@ -265,30 +266,35 @@ type (
SetVersion(version int64)
GetTaskID() int64
SetTaskID(id int64)
GetVisibilityTimestamp() time.Time
SetVisibilityTimestamp(timestamp time.Time)
}

// ActivityTask identifies a transfer task for activity
ActivityTask struct {
TaskID int64
DomainID string
TaskList string
ScheduleID int64
Version int64
VisibilityTimestamp time.Time
TaskID int64
DomainID string
TaskList string
ScheduleID int64
Version int64
}

// DecisionTask identifies a transfer task for decision
DecisionTask struct {
TaskID int64
DomainID string
TaskList string
ScheduleID int64
Version int64
VisibilityTimestamp time.Time
TaskID int64
DomainID string
TaskList string
ScheduleID int64
Version int64
}

// CloseExecutionTask identifies a transfer task for deletion of execution
CloseExecutionTask struct {
TaskID int64
Version int64
VisibilityTimestamp time.Time
TaskID int64
Version int64
}

// DeleteHistoryEventTask identifies a timer task for deletion of history events of completed execution.
Expand Down Expand Up @@ -317,6 +323,7 @@ type (

// CancelExecutionTask identifies a transfer task for cancel of execution
CancelExecutionTask struct {
VisibilityTimestamp time.Time
TaskID int64
TargetDomainID string
TargetWorkflowID string
Expand All @@ -328,6 +335,7 @@ type (

// SignalExecutionTask identifies a transfer task for signal execution
SignalExecutionTask struct {
VisibilityTimestamp time.Time
TaskID int64
TargetDomainID string
TargetWorkflowID string
Expand All @@ -339,11 +347,12 @@ type (

// StartChildExecutionTask identifies a transfer task for starting child execution
StartChildExecutionTask struct {
TaskID int64
TargetDomainID string
TargetWorkflowID string
InitiatedID int64
Version int64
VisibilityTimestamp time.Time
TaskID int64
TargetDomainID string
TargetWorkflowID string
InitiatedID int64
Version int64
}

// ActivityTimeoutTask identifies a timeout task.
Expand Down Expand Up @@ -375,6 +384,7 @@ type (

// HistoryReplicationTask is the transfer task created for shipping history replication events to other clusters
HistoryReplicationTask struct {
VisibilityTimestamp time.Time
TaskID int64
FirstEventID int64
NextEventID int64
Expand Down Expand Up @@ -1009,6 +1019,16 @@ func (a *ActivityTask) SetTaskID(id int64) {
a.TaskID = id
}

// GetVisibilityTimestamp get the visibility timestamp
func (a *ActivityTask) GetVisibilityTimestamp() time.Time {
return a.VisibilityTimestamp
}

// SetVisibilityTimestamp set the visibility timestamp
func (a *ActivityTask) SetVisibilityTimestamp(timestamp time.Time) {
a.VisibilityTimestamp = timestamp
}

// GetType returns the type of the decision task
func (d *DecisionTask) GetType() int {
return TransferTaskTypeDecisionTask
Expand All @@ -1034,6 +1054,16 @@ func (d *DecisionTask) SetTaskID(id int64) {
d.TaskID = id
}

// GetVisibilityTimestamp get the visibility timestamp
func (d *DecisionTask) GetVisibilityTimestamp() time.Time {
return d.VisibilityTimestamp
}

// SetVisibilityTimestamp set the visibility timestamp
func (d *DecisionTask) SetVisibilityTimestamp(timestamp time.Time) {
d.VisibilityTimestamp = timestamp
}

// GetType returns the type of the close execution task
func (a *CloseExecutionTask) GetType() int {
return TransferTaskTypeCloseExecution
Expand All @@ -1059,6 +1089,16 @@ func (a *CloseExecutionTask) SetTaskID(id int64) {
a.TaskID = id
}

// GetVisibilityTimestamp get the visibility timestamp
func (a *CloseExecutionTask) GetVisibilityTimestamp() time.Time {
return a.VisibilityTimestamp
}

// SetVisibilityTimestamp set the visibility timestamp
func (a *CloseExecutionTask) SetVisibilityTimestamp(timestamp time.Time) {
a.VisibilityTimestamp = timestamp
}

// GetType returns the type of the delete execution task
func (a *DeleteHistoryEventTask) GetType() int {
return TaskTypeDeleteHistoryEvent
Expand All @@ -1084,6 +1124,16 @@ func (a *DeleteHistoryEventTask) SetTaskID(id int64) {
a.TaskID = id
}

// GetVisibilityTimestamp get the visibility timestamp
func (a *DeleteHistoryEventTask) GetVisibilityTimestamp() time.Time {
return a.VisibilityTimestamp
}

// SetVisibilityTimestamp set the visibility timestamp
func (a *DeleteHistoryEventTask) SetVisibilityTimestamp(timestamp time.Time) {
a.VisibilityTimestamp = timestamp
}

// GetType returns the type of the timer task
func (d *DecisionTimeoutTask) GetType() int {
return TaskTypeDecisionTimeout
Expand Down Expand Up @@ -1284,6 +1334,16 @@ func (u *CancelExecutionTask) SetTaskID(id int64) {
u.TaskID = id
}

// GetVisibilityTimestamp get the visibility timestamp
func (u *CancelExecutionTask) GetVisibilityTimestamp() time.Time {
return u.VisibilityTimestamp
}

// SetVisibilityTimestamp set the visibility timestamp
func (u *CancelExecutionTask) SetVisibilityTimestamp(timestamp time.Time) {
u.VisibilityTimestamp = timestamp
}

// GetType returns the type of the signal transfer task
func (u *SignalExecutionTask) GetType() int {
return TransferTaskTypeSignalExecution
Expand All @@ -1309,6 +1369,16 @@ func (u *SignalExecutionTask) SetTaskID(id int64) {
u.TaskID = id
}

// GetVisibilityTimestamp get the visibility timestamp
func (u *SignalExecutionTask) GetVisibilityTimestamp() time.Time {
return u.VisibilityTimestamp
}

// SetVisibilityTimestamp set the visibility timestamp
func (u *SignalExecutionTask) SetVisibilityTimestamp(timestamp time.Time) {
u.VisibilityTimestamp = timestamp
}

// GetType returns the type of the start child transfer task
func (u *StartChildExecutionTask) GetType() int {
return TransferTaskTypeStartChildExecution
Expand All @@ -1334,6 +1404,16 @@ func (u *StartChildExecutionTask) SetTaskID(id int64) {
u.TaskID = id
}

// GetVisibilityTimestamp get the visibility timestamp
func (u *StartChildExecutionTask) GetVisibilityTimestamp() time.Time {
return u.VisibilityTimestamp
}

// SetVisibilityTimestamp set the visibility timestamp
func (u *StartChildExecutionTask) SetVisibilityTimestamp(timestamp time.Time) {
u.VisibilityTimestamp = timestamp
}

// GetType returns the type of the history replication task
func (a *HistoryReplicationTask) GetType() int {
return ReplicationTaskTypeHistory
Expand All @@ -1359,6 +1439,16 @@ func (a *HistoryReplicationTask) SetTaskID(id int64) {
a.TaskID = id
}

// GetVisibilityTimestamp get the visibility timestamp
func (a *HistoryReplicationTask) GetVisibilityTimestamp() time.Time {
return a.VisibilityTimestamp
}

// SetVisibilityTimestamp set the visibility timestamp
func (a *HistoryReplicationTask) SetVisibilityTimestamp(timestamp time.Time) {
a.VisibilityTimestamp = timestamp
}

// GetTaskID returns the task ID for transfer task
func (t *TransferTaskInfo) GetTaskID() int64 {
return t.TaskID
Expand All @@ -1374,6 +1464,11 @@ func (t *TransferTaskInfo) GetTaskType() int {
return t.TaskType
}

// GetVisibilityTimestamp returns the task type for transfer task
func (t *TransferTaskInfo) GetVisibilityTimestamp() time.Time {
return t.VisibilityTimestamp
}

// String returns string
func (t *TransferTaskInfo) String() string {
return fmt.Sprintf(
Expand All @@ -1397,6 +1492,11 @@ func (t *ReplicationTaskInfo) GetTaskType() int {
return t.TaskType
}

// GetVisibilityTimestamp returns the task type for transfer task
func (t *ReplicationTaskInfo) GetVisibilityTimestamp() time.Time {
return time.Time{}
}

// GetTaskID returns the task ID for timer task
func (t *TimerTaskInfo) GetTaskID() int64 {
return t.TaskID
Expand All @@ -1412,6 +1512,11 @@ func (t *TimerTaskInfo) GetTaskType() int {
return t.TaskType
}

// GetVisibilityTimestamp returns the task type for transfer task
func (t *TimerTaskInfo) GetVisibilityTimestamp() time.Time {
return t.VisibilityTimestamp
}

// GetTaskType returns the task type for timer task
func (t *TimerTaskInfo) String() string {
return fmt.Sprintf(
Expand Down
9 changes: 3 additions & 6 deletions common/service/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ var keys = map[Key]string{
HistoryCacheMaxSize: "history.cacheMaxSize",
HistoryCacheTTL: "history.cacheTTL",
AcquireShardInterval: "history.acquireShardInterval",
StandbyClusterDelay: "history.standbyClusterDelay",
TimerTaskBatchSize: "history.timerTaskBatchSize",
TimerTaskWorkerCount: "history.timerTaskWorkerCount",
TimerTaskMaxRetryCount: "history.timerTaskMaxRetryCount",
Expand All @@ -92,7 +93,6 @@ var keys = map[Key]string{
TimerProcessorMaxPollRPS: "history.timerProcessorMaxPollRPS",
TimerProcessorMaxPollInterval: "history.timerProcessorMaxPollInterval",
TimerProcessorMaxPollIntervalJitterCoefficient: "history.timerProcessorMaxPollIntervalJitterCoefficient",
TimerProcessorStandbyTaskDelay: "history.timerProcessorStandbyTaskDelay",
TransferTaskBatchSize: "history.transferTaskBatchSize",
TransferProcessorFailoverMaxPollRPS: "history.transferProcessorFailoverMaxPollRPS",
TransferProcessorMaxPollRPS: "history.transferProcessorMaxPollRPS",
Expand All @@ -106,7 +106,6 @@ var keys = map[Key]string{
TransferProcessorMaxPollIntervalJitterCoefficient: "history.transferProcessorMaxPollIntervalJitterCoefficient",
TransferProcessorUpdateAckInterval: "history.transferProcessorUpdateAckInterval",
TransferProcessorCompleteTransferInterval: "history.transferProcessorCompleteTransferInterval",
TransferProcessorStandbyTaskDelay: "history.transferProcessorStandbyTaskDelay",
ReplicatorTaskBatchSize: "history.replicatorTaskBatchSize",
ReplicatorTaskWorkerCount: "history.replicatorTaskWorkerCount",
ReplicatorTaskMaxRetryCount: "history.replicatorTaskMaxRetryCount",
Expand Down Expand Up @@ -198,6 +197,8 @@ const (
HistoryCacheTTL
// AcquireShardInterval is interval that timer used to acquire shard
AcquireShardInterval
// StandbyClusterDelay is the atrificial delay added to standby cluster's view of active cluster's time
StandbyClusterDelay
// TimerTaskBatchSize is batch size for timer processor to process tasks
TimerTaskBatchSize
// TimerTaskWorkerCount is number of task workers for timer processor
Expand Down Expand Up @@ -226,8 +227,6 @@ const (
TimerProcessorMaxPollInterval
// TimerProcessorMaxPollIntervalJitterCoefficient is the max poll interval jitter coefficient
TimerProcessorMaxPollIntervalJitterCoefficient
// TimerProcessorStandbyTaskDelay is task delay for standby task in timer processor
TimerProcessorStandbyTaskDelay
// TransferTaskBatchSize is batch size for transferQueueProcessor
TransferTaskBatchSize
// TransferProcessorFailoverMaxPollRPS is max poll rate per second for transferQueueProcessor
Expand All @@ -254,8 +253,6 @@ const (
TransferProcessorUpdateAckInterval
// TransferProcessorCompleteTransferInterval is complete timer interval for transferQueueProcessor
TransferProcessorCompleteTransferInterval
// TransferProcessorStandbyTaskDelay is delay time for standby task in transferQueueProcessor
TransferProcessorStandbyTaskDelay
// ReplicatorTaskBatchSize is batch size for ReplicatorProcessor
ReplicatorTaskBatchSize
// ReplicatorTaskWorkerCount is number of worker for ReplicatorProcessor
Expand Down
Loading

0 comments on commit 3a79fc0

Please sign in to comment.