implemented functionality in timer ack manager & timer processor base to allow failover #648
Conversation
Force-pushed from 17bf26f to ed030bd
	SetCurrentTime(clusterName string, currentTime time.Time)
}

timerProcessor interface {
Naming is quite confusing between timerQueueProcessor and timerProcessor.
Any suggestions?
service/history/shardContext.go
Outdated
if currentTime, ok := shardInfo.ClusterTimerAckLevel[clusterName]; ok {
	standbyClusterCurrentTime[clusterName] = currentTime
} else {
	standbyClusterCurrentTime[clusterName] = shardInfo.TimerAckLevel
Why are we using the current cluster ack level to initialize the standby cluster?
There is a case where a new cluster is added and there is no existing ack level for it, so we have to fall back to the current cluster's ack level.
service/history/timerGate.go
Outdated
}

// LocalTimerGate interface
LocalTimerGate interface {
Any reason Close is not needed by remote timer gate? Both local and remote timers should have the same life-cycle.
The local gate is implemented using a Go timer, which has to be closed.
The remote gate is purely driven by events; there is no background goroutine involved.
service/history/timerGate.go
Outdated
	close(timerGate.closeChan)
}

// NewRemoteTimerGate create a new timer gate instance
func NewRemoteTimerGate() RemoteTimerGate {
A lot of the implementation between Local and Remote looks the same. Can we refactor it in a way that reuses code?
I'm OK with copy/pasting code if this is temporary, but make sure to create an issue to revisit this later.
This is actually not copy/paste. Although in general they look the same, they are backed by different things: one by Go's timer, the other by a comparison against the newly reported time.
Force-pushed from df8ec34 to 4a7c840
Overall the changes look good. One thing that looks strange is that you have too much logic/if-else due to failover support.
I don't have a better recommendation at this point, but I would like you to think more about whether we can figure out a cleaner API contract rather than so many if-else statements.
})
timerQueueAckMgr := newTimerQueueAckMgr(shard, historyService.metricsClient, executionManager, clusterName, log)
func newTimerQueueStandbyProcessor(shard ShardContext, historyService *historyEngineImpl, clusterName string, logger bark.Logger) *timerQueueStandbyProcessorImpl {
	timeNow := func() time.Time {
We need the capability to trail the standby processor's current time by some configured amount, like 5 minutes.
I know. Right now there is no time delay in either the timer processor or the transfer processor; it will be added (the change is fairly small) when we actually do failover on both timer and transfer.
	now = t.shard.GetCurrentTime(t.clusterName)
} else {
	// if ack manager is a failover manager, we need to use the current local time
	now = t.shard.GetCurrentTime(t.shard.GetService().GetClusterMetadata().GetCurrentClusterName())
clusterName is stored as part of timerQueueAckMgr struct. Can we use that here?
Not really. When doing failover, the timer ack manager has:
cluster: standby cluster name
isFailover: true
The ack manager needs both of the above to update the shard timer ack level for the failover process. Here, when doing failover, we need to get the current active cluster's time to determine whether to process a timer.
@@ -93,6 +104,13 @@ func (t *timerQueueProcessorImpl) SetCurrentTime(clusterName string, currentTime
	standbyTimerProcessor.setCurrentTime(currentTime)
}

func (t *timerQueueProcessorImpl) FailoverDomain(domainID string, standbyClusterName string) {
	// we should consider make the failover idempotent
	failoverTimerProcessor := newTimerQueueFailoverProcessor(t.shard, t.historyService, domainID, standbyClusterName, t.logger)
How is the life-cycle of this cursor managed? Does someone need to call stop on it, or does it stop by itself?
The timer ack manager will have a finished channel, which is used by the active timer processor; when the channel fires, the active processor will stop itself.
@@ -224,7 +223,7 @@ func (t *timerQueueProcessorBase) internalProcessor() error {

continueProcessor:
	for {
-		now := t.shard.GetCurrentTime(t.clusterName)
+		now := t.now()
why?
I basically changed t.shard.GetCurrentTime(t.clusterName) into a closure.
Force-pushed from b3bad03 to 8197f4b
…ive timer processor
Force-pushed from dcb3011 to f08bd6b
Force-pushed from 2126445 to 7493521
… processor will try to process a standby task indefinitely, until task is finished.
Force-pushed from 7493521 to 16f058c
Force-pushed from 46d7760 to a33303d
@@ -2037,9 +2037,6 @@ func (d *cassandraPersistence) GetTimerIndexTasks(request *GetTimerIndexTasksReq

		response.Timers = append(response.Timers, t)
	}
-	nextPageToken := iter.PageState()
Any specific reason to remove pagination here? I think paginating through tasks is much more efficient than issuing a limit query each time.
Because the existing query uses a limit:
https://github.com/uber/cadence/blob/master/common/persistence/cassandraPersistence.go#L546
We can change it to use pagination later; this is not a blocking issue.
// error which will be thrown if the timer / transfer task should be
// retries due to various of reasons
taskRetryError struct{}
It is weird to use an empty struct as an error. Can you instead use errors.New?
Nope, I need to use a dedicated error so the worker can retry accordingly.
@@ -51,6 +51,10 @@ type (

	UpdateTransferAckLevel(ackLevel int64) error
	GetReplicatorAckLevel() int64
	UpdateReplicatorAckLevel(ackLevel int64) error
	GetTimerAckLevel() time.Time
timerAckLevel should consist of both visibilityTime and an increasing taskID. It seems like an issue that we only use visibilityTime to represent the ackLevel; we should use both visibilityTime and task_id for timer tasks, otherwise it could result in losing timers.
When we scan the DB, we always scan from the current ack level (inclusive) (a time.Time) to int64 max:
https://github.com/uber/cadence/blob/master/common/persistence/cassandraPersistence.go#L577
So each time we scan the DB, we will not lose timers.
maxAckLevel time.Time
// isReadFinished indicate timer queue ack manager
// have no more task to send out
isReadFinished bool
Why do you need two separate fields to indicate the cursor has reached the end? You could just close the finishedChan, and the isReadFinished API could check whether the channel is closed.
The finishedChan is used by the workers to see whether they should stop.
When the read is finished, the workers still have to process the tasks already read, which can take time.
config := shard.GetConfig()
ackLevel := shard.GetTimerAckLevel(clusterName)
func newTimerQueueAckMgr(shard ShardContext, metricsClient metrics.Client, clusterName string, logger bark.Logger) *timerQueueAckMgrImpl {
	ackLevel := TimerSequenceID{VisibilityTimestamp: shard.GetTimerClusterAckLevel(clusterName)}
We should use both the VisibilityTimestamp and TaskID to initialize ack level.
TaskID is 0 by default, which is the minimum possible value.
Moreover, we do not store the TaskID in the DB.
service/history/timerQueueAckMgr.go
Outdated
if t.isFailover && !morePage {
	t.isReadFinished = true
}

// fillin the retry task
Is the comment still relevant?
good catch
}
go t.completeTimersLoop()
Will we move to a model where deletion of timerTasks is managed by this separate cursor? Is there a way to still keep the old behavior before xdc is really turned on?
Well, the whole flow has changed.
If you really want, I can certainly do something, like adding a lot of if/else.
LoadCompleteLoop:
	for {
		request := &persistence.GetTimerIndexTasksRequest{
If we only use the timestamp to read tasks, it could result in deleting extra timer tasks which have not been processed by the active cursor yet.
Nope, that will not happen; let's sync up.
	// before shutdown, make sure the ack level is up to date
	t.completeTimers()
	return
case <-timer.C:
I'm OK with having a timer to wake up completeTimersLoop, but we should create a notification channel where each cursor can notify the completeTimersLoop when the ackLevel moves.
This is not a top-priority issue; let's solve it later.
@@ -104,3 +146,85 @@ func (t *timerQueueProcessorImpl) getTimerFiredCount(clusterName string) uint64
	}
	return standbyTimerProcessor.getTimerFiredCount()
}

func (t *timerQueueProcessorImpl) completeTimersLoop() {
Let's create a separate task to emit some metrics from this new logic. I think we are missing key metrics which would help us investigate issues in production.
sure
@@ -169,12 +197,13 @@ func (t *timerQueueStandbyProcessorImpl) processExpiredUserTimer(timerTask *pers
	//
	// we do not need to notity new timer to base, since if there is no new event being replicated
	// checking again if the timer can be completed is meaningless
-	t.timerQueueAckMgr.retryTimerTask(timerTask)
-	return nil
+	return newTaskRetryError()
Instead of creating a new error each time, let's just define this as a global variable and reuse the same error.
ok
Force-pushed from c834a97 to 7eaecd8
Force-pushed from 7eaecd8 to a3143d1
Partially solves #565