Skip to content

Commit

Permalink
fix comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Wenquan Xing committed Mar 27, 2018
1 parent 17d79c9 commit 4f6eae2
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 9 deletions.
10 changes: 6 additions & 4 deletions service/history/historyTestBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,13 +138,14 @@ func (s *TestShardContext) GetTransferAckLevel() int64 {
s.RLock()
defer s.RUnlock()

// TODO change make this cluster input parameter
// TODO cluster should be an input parameter
cluster := s.GetService().GetClusterMetadata().GetCurrentClusterName()
// if can find corresponding ack level in the cluster to timer ack level map
// if we can find corresponding ack level
if ackLevel, ok := s.shardInfo.ClusterTransferAckLevel[cluster]; ok {
return ackLevel
}
// otherwise, default to existing ack level, which belongs to local cluster
// this can happen if you add more cluster
return s.shardInfo.TransferAckLevel
}

Expand All @@ -153,7 +154,7 @@ func (s *TestShardContext) UpdateTransferAckLevel(ackLevel int64) error {
s.RLock()
defer s.RUnlock()

// TODO change make this cluster input parameter
// TODO cluster should be an input parameter
cluster := s.GetService().GetClusterMetadata().GetCurrentClusterName()
if cluster == s.GetService().GetClusterMetadata().GetCurrentClusterName() {
s.shardInfo.TransferAckLevel = ackLevel
Expand All @@ -178,11 +179,12 @@ func (s *TestShardContext) GetTimerAckLevel(cluster string) time.Time {
s.RLock()
defer s.RUnlock()

// if can find corresponding ack level in the cluster to timer ack level map
// if we can find corresponding ack level
if ackLevel, ok := s.shardInfo.ClusterTimerAckLevel[cluster]; ok {
return ackLevel
}
// otherwise, default to existing ack level, which belongs to local cluster
// this can happen if you add more cluster
return s.shardInfo.TimerAckLevel
}

Expand Down
10 changes: 6 additions & 4 deletions service/history/shardContext.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,14 @@ func (s *shardContextImpl) GetTransferAckLevel() int64 {
s.RLock()
defer s.RUnlock()

// TODO change make this cluster input parameter
// TODO cluster should be an input parameter
cluster := s.GetService().GetClusterMetadata().GetCurrentClusterName()
// if can find corresponding ack level in the cluster to timer ack level map
// if we can find corresponding ack level
if ackLevel, ok := s.shardInfo.ClusterTransferAckLevel[cluster]; ok {
return ackLevel
}
// otherwise, default to existing ack level, which belongs to local cluster
// this can happen if you add more cluster
return s.shardInfo.TransferAckLevel
}

Expand All @@ -135,7 +136,7 @@ func (s *shardContextImpl) UpdateTransferAckLevel(ackLevel int64) error {
s.Lock()
defer s.Unlock()

// TODO change make this cluster input parameter
// TODO cluster should be an input parameter
cluster := s.GetService().GetClusterMetadata().GetCurrentClusterName()
if cluster == s.GetService().GetClusterMetadata().GetCurrentClusterName() {
s.shardInfo.TransferAckLevel = ackLevel
Expand Down Expand Up @@ -164,11 +165,12 @@ func (s *shardContextImpl) GetTimerAckLevel(cluster string) time.Time {
s.RLock()
defer s.RUnlock()

// if can find corresponding ack level in the cluster to timer ack level map
// if we can find corresponding ack level
if ackLevel, ok := s.shardInfo.ClusterTimerAckLevel[cluster]; ok {
return ackLevel
}
// otherwise, default to existing ack level, which belongs to local cluster
// this can happen if you add more cluster
return s.shardInfo.TimerAckLevel
}

Expand Down
2 changes: 1 addition & 1 deletion service/history/transferQueueProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func newTransferQueueProcessor(shard ShardContext, historyService *historyEngine
historyClient: historyClient,
visibilityManager: visibilityMgr,
cache: historyService.historyCache,
domainCache: historyService.domainCache,
domainCache: shard.GetDomainCache(),
}
baseProcessor := newQueueProcessor(shard, options, processor, shard.GetTransferAckLevel())
processor.queueProcessorBase = baseProcessor
Expand Down

0 comments on commit 4f6eae2

Please sign in to comment.