separate timer queue ack manager in separate file, add functionality … #630
Conversation
1e70506 to 0ef2268
service/history/timerQueueAckMgr.go
Outdated
	sync.Mutex
	// outstanding tasks, key cluster name, value map of outstanding timer task -> finished (true)
	clusterOutstandingTasks map[string]map[TimerSequenceID]bool
I think we should keep this component simple by not making it aware of all clusters. We should have a separate instance of TimerQueueProcessor with its own AckManager per cluster.
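For illustration only, here is a minimal sketch of that layout: one processor per cluster, each owning a single-cluster ack manager. All names except TimerSequenceID are hypothetical, and none of this code is part of the PR.

package history

import "sync"

// Hypothetical sketch: each cluster gets its own processor, and each processor
// owns an ack manager that only tracks tasks for that one cluster.
type singleClusterAckMgr struct {
	sync.Mutex
	// outstanding timer tasks for this cluster only: task -> finished
	outstandingTasks map[TimerSequenceID]bool
}

type singleClusterTimerProcessor struct {
	cluster string
	ackMgr  *singleClusterAckMgr
}

// newTimerProcessorsPerCluster builds one processor, with its own ack manager,
// for each known cluster.
func newTimerProcessorsPerCluster(clusters []string) map[string]*singleClusterTimerProcessor {
	processors := make(map[string]*singleClusterTimerProcessor, len(clusters))
	for _, cluster := range clusters {
		processors[cluster] = &singleClusterTimerProcessor{
			cluster: cluster,
			ackMgr:  &singleClusterAckMgr{outstandingTasks: make(map[TimerSequenceID]bool)},
		}
	}
	return processors
}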
@@ -58,8 +58,8 @@ type (
 	GetConfig() *Config
 	GetLogger() bark.Logger
 	GetMetricsClient() metrics.Client
-	GetTimerAckLevel() time.Time
-	UpdateTimerAckLevel(ackLevel time.Time) error
+	GetTimerAckLevel(cluster string) time.Time
Now we are going to have too many components managing ack levels separately. Currently each update is a write to Cassandra, which is not going to scale once cross-DC support is in production. Let's put a TODO to accept updates from individual components and have a separate job that updates all the ack levels in a single write for the entire shard.
track in: #634
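As a rough illustration of that TODO, here is a sketch of a batched ack-level updater. The names and the caller-supplied persist function are hypothetical stand-ins for the single per-shard write; this is not code from the PR or from #634.

package history

import (
	"sync"
	"time"
)

// Hypothetical sketch: components report ack levels in memory and a single
// background loop persists all of them with one write for the entire shard.
type ackLevelBatcher struct {
	sync.Mutex
	// latest reported ack level per component, e.g. "timer" or "transfer"
	pending map[string]time.Time
}

func newAckLevelBatcher() *ackLevelBatcher {
	return &ackLevelBatcher{pending: make(map[string]time.Time)}
}

// Report records a component's ack level without touching storage.
func (b *ackLevelBatcher) Report(component string, ackLevel time.Time) {
	b.Lock()
	defer b.Unlock()
	b.pending[component] = ackLevel
}

// FlushLoop periodically hands a snapshot of all reported ack levels to
// persist, which is expected to do one write for the whole shard.
func (b *ackLevelBatcher) FlushLoop(interval time.Duration, persist func(map[string]time.Time) error, done <-chan struct{}) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-done:
			return
		case <-ticker.C:
			b.Lock()
			snapshot := make(map[string]time.Time, len(b.pending))
			for component, level := range b.pending {
				snapshot[component] = level
			}
			b.Unlock()
			_ = persist(snapshot) // error handling omitted in this sketch
		}
	}
}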
service/history/shardContext.go
Outdated
@@ -121,6 +121,13 @@ func (s *shardContextImpl) GetTransferAckLevel() int64 {
 	s.RLock()
 	defer s.RUnlock()

+	// TODO change make this cluster input parameter
+	cluster := s.GetService().GetClusterMetadata().GetCurrentClusterName()
+	// if can find corresponding ack level in the cluster to timer ack level map
nit: fix comment
done
@@ -169,6 +156,12 @@ func (t *timerQueueProcessorImpl) NotifyNewTimers(timerTasks []persistence.Task)
 	}
 }

+// NotifyNewTimerEvents - Notify the processor about the new standby history events arrival.
+// This should be called each time new timer events arrives, otherwise timers maybe fired unexpected.
+func (t *timerQueueProcessorImpl) NotifyNewTimerEvents() {
I think we should have the same API for notifying new timers whether it is for active processor or standby processor. Basically we should have a baseTimerQueueProcessor which has common logic and then activeTimerQueueProcessor and passiveTimerQueueProcessor with their own handling of TimerTasks. But the way you notify both should be exactly the same.
this will be done in the next PR
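For reference, a minimal sketch of the suggested split: a base processor owns the shared notification path, and the active/standby processors differ only in task handling. persistence.Task is the repo's existing type; every other name here is hypothetical, and this is not the follow-up implementation.

package history

import "github.com/uber/cadence/common/persistence" // assumed import path for the repo's persistence package

// Hypothetical sketch: the base processor exposes the one and only
// notification API; active and standby differ only in how tasks are handled.
type timerTaskHandler interface {
	// process handles a single timer task; this is the only place where
	// active and standby behavior should diverge.
	process(task persistence.Task) error
}

type baseTimerQueueProcessor struct {
	handler    timerTaskHandler
	newTimerCh chan struct{}
}

// NotifyNewTimers is the single notification entry point shared by both the
// active and the standby processor.
func (p *baseTimerQueueProcessor) NotifyNewTimers(timerTasks []persistence.Task) {
	if len(timerTasks) == 0 {
		return
	}
	select {
	case p.newTimerCh <- struct{}{}:
	default: // a wakeup is already pending; no need to block
	}
}

type activeTimerQueueProcessor struct{ *baseTimerQueueProcessor }
type standbyTimerQueueProcessor struct{ *baseTimerQueueProcessor }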
…to timer queue processor to be cluster aware
rename SequenceID -> TimerSequenceID
Standby timer processing logic and the domain failover timer ack manager (which scans the DB for domain-specific timers) will come in the next PR.
Partially solves #565.