Skip to content

Commit

Permalink
remove keyspace-group-id from the log lines in the pkg/tso package.
Browse files Browse the repository at this point in the history
Signed-off-by: Bin Shi <binshi.bing@gmail.com>
  • Loading branch information
binshi-bing committed Mar 24, 2023
1 parent ba97861 commit 6956f69
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 91 deletions.
67 changes: 14 additions & 53 deletions pkg/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,13 +285,13 @@ func (am *AllocatorManager) tsoAllocatorLoop() {
defer am.svcLoopWG.Done()

am.AllocatorDaemon(am.ctx)
log.Info("exit allocator loop", zap.Uint32("keyspace-group-id", am.ksgID))
log.Info("exit allocator loop")
}

// close is used to shutdown TSO Allocator updating daemon.
// tso service call this function to shutdown the loop here, but pd manages its own loop.
func (am *AllocatorManager) close() {
log.Info("closing the allocator manager", zap.Uint32("keyspace-group-id", am.ksgID))
log.Info("closing the allocator manager")

if allocatorGroup, exist := am.getAllocatorGroup(GlobalDCLocation); exist {
allocatorGroup.allocator.(*GlobalTSOAllocator).close()
Expand All @@ -300,7 +300,7 @@ func (am *AllocatorManager) close() {
am.cancel()
am.svcLoopWG.Wait()

log.Info("closed the allocator manager", zap.Uint32("keyspace-group-id", am.ksgID))
log.Info("closed the allocator manager")
}

func (am *AllocatorManager) getMember() *ElectionMember {
Expand All @@ -314,7 +314,6 @@ func (am *AllocatorManager) SetLocalTSOConfig(dcLocation string) error {
serverID := am.member.ID()
if err := am.checkDCLocationUpperLimit(dcLocation); err != nil {
log.Error("check dc-location upper limit failed",
zap.Uint32("keyspace-group-id", am.ksgID),
zap.Int("upper-limit", int(math.Pow(2, MaxSuffixBits))-1),
zap.String("dc-location", dcLocation),
zap.String("server-name", serverName),
Expand All @@ -333,14 +332,12 @@ func (am *AllocatorManager) SetLocalTSOConfig(dcLocation string) error {
}
if !resp.Succeeded {
log.Warn("write dc-location configuration into etcd failed",
zap.Uint32("keyspace-group-id", am.ksgID),
zap.String("dc-location", dcLocation),
zap.String("server-name", serverName),
zap.Uint64("server-id", serverID))
return errs.ErrEtcdTxnConflict.FastGenByArgs()
}
log.Info("write dc-location configuration into etcd",
zap.Uint32("keyspace-group-id", am.ksgID),
zap.String("dc-location", dcLocation),
zap.String("server-name", serverName),
zap.Uint64("server-id", serverID))
Expand Down Expand Up @@ -383,7 +380,6 @@ func (am *AllocatorManager) GetClusterDCLocationsFromEtcd() (clusterDCLocations
dcLocation := string(kv.Value)
if err != nil {
log.Warn("get server id and dcLocation from etcd failed, invalid server id",
zap.Uint32("keyspace-group-id", am.ksgID),
zap.Any("splitted-serverPath", serverPath),
zap.String("dc-location", dcLocation),
errs.ZapError(err))
Expand Down Expand Up @@ -418,9 +414,7 @@ func (am *AllocatorManager) CleanUpDCLocation() error {
} else if !resp.Succeeded {
return errs.ErrEtcdTxnConflict.FastGenByArgs()
}
log.Info("delete the dc-location key previously written in etcd",
zap.Uint32("keyspace-group-id", am.ksgID),
zap.Uint64("server-id", serverID))
log.Info("delete the dc-location key previously written in etcd", zap.Uint64("server-id", serverID))
go am.ClusterDCLocationChecker()
return nil
}
Expand Down Expand Up @@ -486,7 +480,6 @@ func (am *AllocatorManager) getLocalTSOAllocatorPath() string {
func (am *AllocatorManager) allocatorLeaderLoop(ctx context.Context, allocator *LocalTSOAllocator) {
defer logutil.LogPanic()
defer log.Info("server is closed, return local tso allocator leader loop",
zap.Uint32("keyspace-group-id", am.ksgID),
zap.String("dc-location", allocator.GetDCLocation()),
zap.String("local-tso-allocator-name", am.member.Name()))
for {
Expand All @@ -503,21 +496,18 @@ func (am *AllocatorManager) allocatorLeaderLoop(ctx context.Context, allocator *
}
if allocatorLeader != nil {
log.Info("start to watch allocator leader",
zap.Uint32("keyspace-group-id", am.ksgID),
zap.Stringer(fmt.Sprintf("%s-allocator-leader", allocator.GetDCLocation()), allocatorLeader),
zap.String("local-tso-allocator-name", am.member.Name()))
// WatchAllocatorLeader will keep looping and never return unless the Local TSO Allocator leader has changed.
allocator.WatchAllocatorLeader(ctx, allocatorLeader, rev)
log.Info("local tso allocator leader has changed, try to re-campaign a local tso allocator leader",
zap.Uint32("keyspace-group-id", am.ksgID),
zap.String("dc-location", allocator.GetDCLocation()))
}

// Check the next-leader key
nextLeader, err := am.getNextLeaderID(allocator.GetDCLocation())
if err != nil {
log.Error("get next leader from etcd failed",
zap.Uint32("keyspace-group-id", am.ksgID),
zap.String("dc-location", allocator.GetDCLocation()),
errs.ZapError(err))
time.Sleep(200 * time.Millisecond)
Expand All @@ -527,7 +517,6 @@ func (am *AllocatorManager) allocatorLeaderLoop(ctx context.Context, allocator *
if nextLeader != 0 {
if nextLeader != am.member.ID() {
log.Info("skip campaigning of the local tso allocator leader and check later",
zap.Uint32("keyspace-group-id", am.ksgID),
zap.String("server-name", am.member.Name()),
zap.Uint64("server-id", am.member.ID()),
zap.Uint64("next-leader-id", nextLeader))
Expand All @@ -542,7 +531,6 @@ func (am *AllocatorManager) allocatorLeaderLoop(ctx context.Context, allocator *
ok, dcLocationInfo, err := am.getDCLocationInfoFromLeader(ctx, allocator.GetDCLocation())
if err != nil {
log.Error("get dc-location info from pd leader failed",
zap.Uint32("keyspace-group-id", am.ksgID),
zap.String("dc-location", allocator.GetDCLocation()),
errs.ZapError(err))
// PD leader hasn't been elected out, wait for the campaign
Expand All @@ -553,7 +541,6 @@ func (am *AllocatorManager) allocatorLeaderLoop(ctx context.Context, allocator *
}
if !ok || dcLocationInfo.Suffix <= 0 || dcLocationInfo.MaxTs == nil {
log.Warn("pd leader is not aware of dc-location during allocatorLeaderLoop, wait next round",
zap.Uint32("keyspace-group-id", am.ksgID),
zap.String("dc-location", allocator.GetDCLocation()),
zap.Any("dc-location-info", dcLocationInfo),
zap.String("wait-duration", checkStep.String()))
Expand Down Expand Up @@ -590,7 +577,6 @@ func (am *AllocatorManager) campaignAllocatorLeader(
isNextLeader bool,
) {
log.Info("start to campaign local tso allocator leader",
zap.Uint32("keyspace-group-id", am.ksgID),
zap.String("dc-location", allocator.GetDCLocation()),
zap.Any("dc-location-info", dcLocationInfo),
zap.String("name", am.member.Name()))
Expand All @@ -614,13 +600,11 @@ func (am *AllocatorManager) campaignAllocatorLeader(
if err := allocator.CampaignAllocatorLeader(am.leaderLease, cmps...); err != nil {
if err.Error() == errs.ErrEtcdTxnConflict.Error() {
log.Info("failed to campaign local tso allocator leader due to txn conflict, another allocator may campaign successfully",
zap.Uint32("keyspace-group-id", am.ksgID),
zap.String("dc-location", allocator.GetDCLocation()),
zap.Any("dc-location-info", dcLocationInfo),
zap.String("name", am.member.Name()))
} else {
log.Error("failed to campaign local tso allocator leader due to etcd error",
zap.Uint32("keyspace-group-id", am.ksgID),
zap.String("dc-location", allocator.GetDCLocation()),
zap.Any("dc-location-info", dcLocationInfo),
zap.String("name", am.member.Name()),
Expand All @@ -636,19 +620,16 @@ func (am *AllocatorManager) campaignAllocatorLeader(
// Maintain the Local TSO Allocator leader
go allocator.KeepAllocatorLeader(ctx)
log.Info("campaign local tso allocator leader ok",
zap.Uint32("keyspace-group-id", am.ksgID),
zap.String("dc-location", allocator.GetDCLocation()),
zap.Any("dc-location-info", dcLocationInfo),
zap.String("name", am.member.Name()))

log.Info("initialize the local TSO allocator",
zap.Uint32("keyspace-group-id", am.ksgID),
zap.String("dc-location", allocator.GetDCLocation()),
zap.Any("dc-location-info", dcLocationInfo),
zap.String("name", am.member.Name()))
if err := allocator.Initialize(int(dcLocationInfo.Suffix)); err != nil {
log.Error("failed to initialize the local TSO allocator",
zap.Uint32("keyspace-group-id", am.ksgID),
zap.String("dc-location", allocator.GetDCLocation()),
zap.Any("dc-location-info", dcLocationInfo),
errs.ZapError(err))
Expand All @@ -657,7 +638,6 @@ func (am *AllocatorManager) campaignAllocatorLeader(
if dcLocationInfo.GetMaxTs().GetPhysical() != 0 {
if err := allocator.WriteTSO(dcLocationInfo.GetMaxTs()); err != nil {
log.Error("failed to write the max local TSO after member changed",
zap.Uint32("keyspace-group-id", am.ksgID),
zap.String("dc-location", allocator.GetDCLocation()),
zap.Any("dc-location-info", dcLocationInfo),
errs.ZapError(err))
Expand All @@ -669,7 +649,6 @@ func (am *AllocatorManager) campaignAllocatorLeader(
// The next leader is me, delete it to finish campaigning
am.deleteNextLeaderID(allocator.GetDCLocation())
log.Info("local tso allocator leader is ready to serve",
zap.Uint32("keyspace-group-id", am.ksgID),
zap.String("dc-location", allocator.GetDCLocation()),
zap.Any("dc-location-info", dcLocationInfo),
zap.String("name", am.member.Name()))
Expand All @@ -682,7 +661,6 @@ func (am *AllocatorManager) campaignAllocatorLeader(
case <-leaderTicker.C:
if !allocator.IsAllocatorLeader() {
log.Info("no longer a local tso allocator leader because lease has expired, local tso allocator leader will step down",
zap.Uint32("keyspace-group-id", am.ksgID),
zap.String("dc-location", allocator.GetDCLocation()),
zap.Any("dc-location-info", dcLocationInfo),
zap.String("name", am.member.Name()))
Expand All @@ -691,7 +669,6 @@ func (am *AllocatorManager) campaignAllocatorLeader(
case <-ctx.Done():
// Server is closed and it should return nil.
log.Info("server is closed, reset the local tso allocator",
zap.Uint32("keyspace-group-id", am.ksgID),
zap.String("dc-location", allocator.GetDCLocation()),
zap.Any("dc-location-info", dcLocationInfo),
zap.String("name", am.member.Name()))
Expand All @@ -703,7 +680,7 @@ func (am *AllocatorManager) campaignAllocatorLeader(
// AllocatorDaemon is used to update every allocator's TSO and check whether we have
// any new local allocator that needs to be set up.
func (am *AllocatorManager) AllocatorDaemon(ctx context.Context) {
log.Info("entering into allocator daemon", zap.Uint32("keyspace-group-id", am.ksgID))
log.Info("entering into allocator daemon")

// allocatorPatroller should only work when enableLocalTSO is true to
// set up the new Local TSO Allocator in time.
Expand Down Expand Up @@ -736,7 +713,7 @@ func (am *AllocatorManager) AllocatorDaemon(ctx context.Context) {
// PS: ClusterDCLocationChecker and PriorityChecker are time consuming and low frequent to run,
// we should run them concurrently to speed up the progress.
case <-ctx.Done():
log.Info("exit allocator daemon", zap.Uint32("keyspace-group-id", am.ksgID))
log.Info("exit allocator daemon")
return
}
}
Expand All @@ -763,19 +740,18 @@ func (am *AllocatorManager) updateAllocator(ag *allocatorGroup) {
case <-ag.ctx.Done():
// Resetting the allocator will clear TSO in memory
ag.allocator.Reset()
log.Info("exit the allocator update loop", zap.Uint32("keyspace-group-id", am.ksgID))
log.Info("exit the allocator update loop")
return
default:
}
if !ag.leadership.Check() {
log.Info("allocator doesn't campaign leadership yet",
zap.Uint32("keyspace-group-id", am.ksgID), zap.String("dc-location", ag.dcLocation))
log.Info("allocator doesn't campaign leadership yet", zap.String("dc-location", ag.dcLocation))
time.Sleep(200 * time.Millisecond)
return
}
if err := ag.allocator.UpdateTSO(); err != nil {
log.Warn("failed to update allocator's timestamp",
zap.Uint32("keyspace-group-id", am.ksgID), zap.String("dc-location", ag.dcLocation), errs.ZapError(err))
zap.String("dc-location", ag.dcLocation), errs.ZapError(err))
am.ResetAllocatorGroup(ag.dcLocation)
return
}
Expand Down Expand Up @@ -818,8 +794,7 @@ func (am *AllocatorManager) ClusterDCLocationChecker() {
}
newClusterDCLocations, err := am.GetClusterDCLocationsFromEtcd()
if err != nil {
log.Error("get cluster dc-locations from etcd failed",
zap.Uint32("keyspace-group-id", am.ksgID), errs.ZapError(err))
log.Error("get cluster dc-locations from etcd failed", errs.ZapError(err))
return
}
am.mu.Lock()
Expand Down Expand Up @@ -850,9 +825,7 @@ func (am *AllocatorManager) ClusterDCLocationChecker() {
suffix, err := am.getOrCreateLocalTSOSuffix(dcLocation)
if err != nil {
log.Warn("get or create the local tso suffix failed",
zap.Uint32("keyspace-group-id", am.ksgID),
zap.String("dc-location", dcLocation),
errs.ZapError(err))
zap.String("dc-location", dcLocation), errs.ZapError(err))
continue
}
if suffix > am.mu.maxSuffix {
Expand Down Expand Up @@ -909,7 +882,6 @@ func (am *AllocatorManager) getOrCreateLocalTSOSuffix(dcLocation string) (int32,
}
if !txnResp.Succeeded {
log.Warn("write local tso suffix into etcd failed",
zap.Uint32("keyspace-group-id", am.ksgID),
zap.String("dc-location", dcLocation),
zap.String("local-tso-suffix", localTSOSuffixValue),
zap.String("server-name", am.member.Name()),
Expand Down Expand Up @@ -990,14 +962,12 @@ func (am *AllocatorManager) PriorityChecker() {
// find this allocator's dc-location isn't the same with server of dc-2 but is same with itself.
if allocatorGroup.dcLocation != leaderServerDCLocation && allocatorGroup.dcLocation == myServerDCLocation {
log.Info("try to move the local tso allocator",
zap.Uint32("keyspace-group-id", am.ksgID),
zap.Uint64("old-leader-id", leaderServerID),
zap.String("old-dc-location", leaderServerDCLocation),
zap.Uint64("next-leader-id", serverID),
zap.String("next-dc-location", myServerDCLocation))
if err := am.transferLocalAllocator(allocatorGroup.dcLocation, am.member.ID()); err != nil {
log.Error("move the local tso allocator failed",
zap.Uint32("keyspace-group-id", am.ksgID),
zap.Uint64("old-leader-id", leaderServerID),
zap.String("old-dc-location", leaderServerDCLocation),
zap.Uint64("next-leader-id", serverID),
Expand All @@ -1014,16 +984,12 @@ func (am *AllocatorManager) PriorityChecker() {
nextLeader, err := am.getNextLeaderID(allocatorGroup.dcLocation)
if err != nil {
log.Error("get next leader from etcd failed",
zap.Uint32("keyspace-group-id", am.ksgID),
zap.String("dc-location", allocatorGroup.dcLocation),
errs.ZapError(err))
zap.String("dc-location", allocatorGroup.dcLocation), errs.ZapError(err))
continue
}
// nextLeader is not empty and isn't same with the server ID, resign the leader
if nextLeader != 0 && nextLeader != serverID {
log.Info("next leader key found, resign current leader",
zap.Uint32("keyspace-group-id", am.ksgID),
zap.Uint64("nextLeaderID", nextLeader))
log.Info("next leader key found, resign current leader", zap.Uint64("nextLeaderID", nextLeader))
am.ResetAllocatorGroup(allocatorGroup.dcLocation)
}
}
Expand Down Expand Up @@ -1298,7 +1264,6 @@ func (am *AllocatorManager) setGRPCConn(newConn *grpc.ClientConn, addr string) {
if _, ok := am.localAllocatorConn.clientConns[addr]; ok {
newConn.Close()
log.Debug("use old connection",
zap.Uint32("keyspace-group-id", am.ksgID),
zap.String("target", newConn.Target()),
zap.String("state", newConn.GetState().String()))
return
Expand All @@ -1316,7 +1281,6 @@ func (am *AllocatorManager) transferLocalAllocator(dcLocation string, serverID u
if err != nil {
err = errs.ErrEtcdGrantLease.Wrap(err).GenWithStackByCause()
log.Error("failed to grant the lease of the next leader key",
zap.Uint32("keyspace-group-id", am.ksgID),
zap.String("dc-location", dcLocation),
zap.Uint64("serverID", serverID),
errs.ZapError(err))
Expand All @@ -1329,15 +1293,12 @@ func (am *AllocatorManager) transferLocalAllocator(dcLocation string, serverID u
if err != nil {
err = errs.ErrEtcdTxnInternal.Wrap(err).GenWithStackByCause()
log.Error("failed to write next leader key into etcd",
zap.Uint32("keyspace-group-id", am.ksgID),
zap.String("dc-location", dcLocation), zap.Uint64("serverID", serverID),
errs.ZapError(err))
return err
}
if !resp.Succeeded {
log.Warn("write next leader id into etcd unsuccessfully",
zap.Uint32("keyspace-group-id", am.ksgID),
zap.String("dc-location", dcLocation))
log.Warn("write next leader id into etcd unsuccessfully", zap.String("dc-location", dcLocation))
return errs.ErrEtcdTxnConflict.GenWithStack("write next leader id into etcd unsuccessfully")
}
return nil
Expand Down
Loading

0 comments on commit 6956f69

Please sign in to comment.