Skip to content

Commit

Permalink
Fix lock issue for timer ack level (#942)
Browse files Browse the repository at this point in the history
* Fix write lock for timer ack level
  • Loading branch information
vancexu authored and wxing1292 committed Jul 9, 2018
1 parent 3c9a409 commit 1864329
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 9 deletions.
2 changes: 1 addition & 1 deletion common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ type (
UpdatedAt time.Time
ReplicationAckLevel int64
TransferAckLevel int64 // TO BE DEPRECATED IN FAVOR OF ClusterTransferAckLevel
TimerAckLevel time.Time // TO BE DEPRECATED IN FAVOR OF ClusteerTimerAckLevel
TimerAckLevel time.Time // TO BE DEPRECATED IN FAVOR OF ClusterTimerAckLevel
ClusterTransferAckLevel map[string]int64
ClusterTimerAckLevel map[string]time.Time
DomainNotificationVersion int64
Expand Down
4 changes: 2 additions & 2 deletions service/history/shardContext.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,8 @@ func (s *shardContextImpl) GetTimerAckLevel() time.Time {
}

func (s *shardContextImpl) UpdateTimerAckLevel(ackLevel time.Time) error {
s.RLock()
defer s.RUnlock()
s.Lock()
defer s.Unlock()

s.shardInfo.TimerAckLevel = ackLevel
s.shardInfo.StolenSinceRenew = 0
Expand Down
4 changes: 2 additions & 2 deletions service/history/timerGate.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,12 +134,12 @@ func (timerGate *LocalTimerGateImpl) Update(nextTime time.Time) bool {
now := time.Now()

if timerGate.timer.Stop() && timerGate.nextWakeupTime.Before(nextTime) {
// this means the timer, before stopped, is active && next wake up time do not have to be upddated
// this means the timer, before stopped, is active && next wake up time do not have to be updated
timerGate.timer.Reset(timerGate.nextWakeupTime.Sub(now))
return false
}

// this means the timer, before stopped, is active && next wake up time has to be upddated
// this means the timer, before stopped, is active && next wake up time has to be updated
// or this means the timer, before stopped, is already fired / never active
timerGate.nextWakeupTime = nextTime
timerGate.timer.Reset(nextTime.Sub(now))
Expand Down
8 changes: 4 additions & 4 deletions service/history/timerQueueProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func (t *timerQueueProcessorImpl) completeTimers() error {

executionMgr := t.shard.GetExecutionManager()
minTimestamp := lowerAckLevel.VisibilityTimestamp
// releax the upper limit for scan since the query is [minTimestamp, minTimestamp)
// relax the upper limit for scan since the query is [minTimestamp, maxTimestamp)
maxTimestamp := upperAckLevel.VisibilityTimestamp.Add(1 * time.Second)
batchSize := t.config.TimerTaskBatchSize()
request := &persistence.GetTimerIndexTasksRequest{
Expand All @@ -227,10 +227,10 @@ LoadCompleteLoop:
if compareTimerIDLess(&upperAckLevel, &timerSequenceID) {
break LoadCompleteLoop
}
minTimestamp = timer.VisibilityTimestamp
if err := executionMgr.CompleteTimerTask(&persistence.CompleteTimerTaskRequest{
err := executionMgr.CompleteTimerTask(&persistence.CompleteTimerTaskRequest{
VisibilityTimestamp: timer.VisibilityTimestamp,
TaskID: timer.TaskID}); err != nil {
TaskID: timer.TaskID})
if err != nil {
t.logger.Warnf("Timer queue ack manager unable to complete timer task: %v; %v", timer, err)
}
}
Expand Down

0 comments on commit 1864329

Please sign in to comment.