-
Notifications
You must be signed in to change notification settings - Fork 805
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Apply 5 min delay for standby task #920
Conversation
service/history/queueProcessor.go
Outdated
@@ -283,7 +283,13 @@ ProcessRetryLoop: | |||
if err != nil { | |||
if err == ErrTaskRetry { | |||
p.metricsClient.IncCounter(p.options.MetricScope, metrics.HistoryTaskStandbyRetryCounter) | |||
<-notificationChan | |||
DelayLoop: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like we will delay the task after processing for first time. I thought the idea is to even delay on first processing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for transfer task, there is no timestamp associated with it.
we can tag the queue processor, however, that require more code.
@@ -237,7 +237,7 @@ func (e *historyEngineImpl) registerDomainFailoverCallback() { | |||
// its length > 0 and has correct timestamp, to trkgger a db scan | |||
fakeDecisionTask := []persistence.Task{&persistence.DecisionTask{}} | |||
fakeDecisionTimeoutTask := []persistence.Task{&persistence.DecisionTimeoutTask{VisibilityTimestamp: now}} | |||
e.txProcessor.NotifyNewTask(e.currentClusterName, now, fakeDecisionTask) | |||
e.txProcessor.NotifyNewTask(e.currentClusterName, fakeDecisionTask) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should have the exact same contract on moving the clock on both timer and transfer processors. Timer notification takes in the currentTime as notification while transfer does not. I think we should unify these.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
transfer task do not have timestamp associated with it
@@ -136,6 +136,7 @@ const ( | |||
`create_request_id: ?, ` + | |||
`decision_version: ?, ` + | |||
`decision_schedule_id: ?, ` + | |||
`decision_schedule_time: ?, ` + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably we should just have a visibility_ts on transfer task for we have the same mechanism for each kind of task for delaying them.
common/time_source.go
Outdated
@@ -47,7 +47,7 @@ func NewRealTimeSource() *RealTimeSource { | |||
|
|||
// Now return the real current time | |||
func (ts *RealTimeSource) Now() time.Time { | |||
return time.Now() | |||
return time.Now().UTC() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
usually the API is UtcNow.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there is no UtcNow() function
ref: https://golang.org/pkg/time/
service/history/MockMutableState.go
Outdated
@@ -1347,6 +1347,28 @@ func (_m *mockMutableState) GetHistoryEvent(serializedEvent []byte) (*shared.His | |||
return r0, r1 | |||
} | |||
|
|||
func (_m *mockMutableState) GetInFlightDecisionTask() (*decisionInfo, bool) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make sure you merge my change. This was included as part of that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i did
…ke start child workflow use UTC for time.Now() in history service when failover, should unblock existing standby task
service/history/historyEngine.go
Outdated
@@ -357,6 +357,7 @@ func (e *historyEngineImpl) StartWorkflowExecution(startRequest *h.StartWorkflow | |||
} | |||
} | |||
setTaskVersion(msBuilder.GetCurrentVersion(), transferTasks, timerTasks) | |||
setTransferTaskTimestamp(common.NewRealTimeSource().Now(), transferTasks) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we just have a single API for setting task information, instead of making 2 separate API calls.
@@ -177,7 +177,7 @@ func (notifier *historyEventNotifierImpl) dispatchHistoryEventNotification(event | |||
|
|||
func (notifier *historyEventNotifierImpl) enqueueHistoryEventNotification(event *historyEventNotification) { | |||
// set the timestamp just before enqueuing the event | |||
event.timestamp = time.Now() | |||
event.timestamp = common.NewRealTimeSource().Now() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you have time source as a field on historyEventNotifierImpl so another implementation could be injected.
@@ -187,12 +187,13 @@ func (c *workflowExecutionContext) updateWorkflowExecution(transferTasks []persi | |||
c.msBuilder.GetExecutionInfo().LastFirstEventID, c.msBuilder.GetExecutionInfo().NextEventID) | |||
} | |||
|
|||
return c.updateHelper(nil, transferTasks, timerTasks, c.createReplicationTask, "", currentVersion, transactionID) | |||
now := common.NewRealTimeSource().Now() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lets call the API UtcNow and have a timesource defined on workflowExecutionContext
service/history/queueProcessor.go
Outdated
timestamp := task.GetVisibilityTimestamp() | ||
for { | ||
<-notificationChan | ||
if p.shard.GetCurrentTime(p.clusterName).After(timestamp) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We only want to have an upper bound on how long we hold onto certain tasks like activity and decision, not delay processing always for all tasks.
7f7c270
to
440a3a5
Compare
service/history/historyEngine.go
Outdated
@@ -356,8 +356,7 @@ func (e *historyEngineImpl) StartWorkflowExecution(startRequest *h.StartWorkflow | |||
replicationTasks = append(replicationTasks, replicationTask) | |||
} | |||
} | |||
setTaskVersion(msBuilder.GetCurrentVersion(), transferTasks, timerTasks) | |||
setTransferTaskTimestamp(common.NewRealTimeSource().Now(), transferTasks) | |||
setTaskInfo(msBuilder.GetCurrentVersion(), time.Now().UTC(), transferTasks, timerTasks) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we just get UTC within setTaskInfo?
common/time_source.go
Outdated
func (ts *FakeTimeSource) Update(now time.Time) { | ||
ts.now = now | ||
func (ts *FakeTimeSource) Update(now time.Time) *FakeTimeSource { | ||
ts.now = now.UTC() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
don't do UTC here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
any reason?
@@ -835,8 +844,9 @@ func (r *historyReplicator) terminateWorkflow(ctx context.Context, domainID stri | |||
|
|||
func (r *historyReplicator) notify(clusterName string, now time.Time, transferTasks []persistence.Task, | |||
timerTasks []persistence.Task) { | |||
now = now.Add(-r.shard.GetConfig().StandbyClusterDelay()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there is no subtract?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there is a func (t Time) Sub(u Time) Duration
which does something else
setTaskVersion(newStateBuilder.GetCurrentVersion(), newTransferTasks, nil) | ||
setTaskInfo( | ||
newStateBuilder.GetCurrentVersion(), | ||
common.NewFakeTimeSource().Update(time.Unix(0, startedEvent.GetTimestamp())).Now(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why are we using fakeTimeSource in production code?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this ReplicateWorkflowExecutionContinuedAsNewEvent can be used by both active and standby, so the only reliable way is to use the event time in the start event
fix #914
transfer task will be delayed by at most 5 min (meaning the delay can be less than 5 min, i.e. if the standby task can be discarded, or wait until 5 min and re-check)
timer task will be delayed by 5 min.