use context logger as much as possible #908

Merged: 5 commits, Jul 31, 2023
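The whole PR is a mechanical substitution: every log call on the request path switches from the global background logger to the logger carried by the request's context (directly via ctx, or via bo.GetCtx() / b.ctx where only a Backoffer is in scope). Below is a minimal sketch of the contract this relies on; it is an illustration of the assumed behavior, not the actual internal/logutil code, and CtxLogKey plus the fallback-to-global rule are assumptions.

```go
// Illustrative sketch, not the real internal/logutil implementation.
// Assumed contract: Logger(ctx) returns a request-scoped logger stored in the
// context under CtxLogKey, falling back to the process-wide logger otherwise.
package logutil

import (
	"context"

	"go.uber.org/zap"
)

type ctxLogKeyType struct{}

// CtxLogKey is the assumed context key under which a *zap.Logger may be stored.
var CtxLogKey = ctxLogKeyType{}

var global, _ = zap.NewProduction()

// BgLogger returns the background (global) logger.
func BgLogger() *zap.Logger { return global }

// Logger returns the logger attached to ctx, or the background logger if none is attached.
func Logger(ctx context.Context) *zap.Logger {
	if l, ok := ctx.Value(CtxLogKey).(*zap.Logger); ok {
		return l
	}
	return global
}
```

With that contract, any fields the caller binds to the context logger (connection ID, statement digest, trace ID) appear on the client's cancellation, retry, and backoff logs, which the old BgLogger() calls could never show.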
4 changes: 2 additions & 2 deletions internal/client/client_batch.go
@@ -781,7 +781,7 @@ func sendBatchRequest(
select {
case batchConn.batchCommandsCh <- entry:
case <-ctx.Done():
-logutil.BgLogger().Debug("send request is cancelled",
+logutil.Logger(ctx).Debug("send request is cancelled",
zap.String("to", addr), zap.String("cause", ctx.Err().Error()))
return nil, errors.WithStack(ctx.Err())
case <-timer.C:
@@ -797,7 +797,7 @@ func sendBatchRequest(
return tikvrpc.FromBatchCommandsResponse(res)
case <-ctx.Done():
atomic.StoreInt32(&entry.canceled, 1)
-logutil.BgLogger().Debug("wait response is cancelled",
+logutil.Logger(ctx).Debug("wait response is cancelled",
zap.String("to", addr), zap.String("cause", ctx.Err().Error()))
return nil, errors.WithStack(ctx.Err())
case <-timer.C:
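The two cancellation logs above only become more informative if the caller actually attaches a logger to the ctx it hands to the batch client. A hedged sketch of that caller-side wiring follows; the file and the connID field are hypothetical, it would have to live inside the client-go module (internal/logutil is not importable from outside), and CtxLogKey is assumed to be the exported key that Logger(ctx) reads.

```go
// Hypothetical in-repo helper: bind per-request fields to the context so that
// logutil.Logger(ctx) inside sendBatchRequest emits them on the
// "send request is cancelled" / "wait response is cancelled" paths.
package example

import (
	"context"

	"github.com/tikv/client-go/v2/internal/logutil"
	"go.uber.org/zap"
)

func withRequestLogger(ctx context.Context, connID uint64) context.Context {
	l := logutil.BgLogger().With(zap.Uint64("connID", connID))
	return context.WithValue(ctx, logutil.CtxLogKey, l)
}
```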
6 changes: 3 additions & 3 deletions internal/locate/region_cache.go
@@ -776,7 +776,7 @@ func (c *RegionCache) GetTiKVRPCContext(bo *retry.Backoffer, id RegionVerID, rep
storeFailEpoch := atomic.LoadUint32(&store.epoch)
if storeFailEpoch != regionStore.storeEpochs[storeIdx] {
cachedRegion.invalidate(Other)
-logutil.BgLogger().Info("invalidate current region, because others failed on same store",
+logutil.Logger(bo.GetCtx()).Info("invalidate current region, because others failed on same store",
zap.Uint64("region", id.GetID()),
zap.String("store", store.addr))
return nil, nil
@@ -905,7 +905,7 @@ func (c *RegionCache) GetTiFlashRPCContext(bo *retry.Backoffer, id RegionVerID,
storeFailEpoch := atomic.LoadUint32(&store.epoch)
if storeFailEpoch != regionStore.storeEpochs[storeIdx] {
cachedRegion.invalidate(Other)
-logutil.BgLogger().Info("invalidate current region, because others failed on same store",
+logutil.Logger(bo.GetCtx()).Info("invalidate current region, because others failed on same store",
zap.Uint64("region", id.GetID()),
zap.String("store", store.addr))
// TiFlash will always try to find out a valid peer, avoiding to retry too many times.
@@ -1959,7 +1959,7 @@ func (c *RegionCache) OnRegionEpochNotMatch(bo *retry.Backoffer, ctx *RPCContext
(meta.GetRegionEpoch().GetConfVer() < ctx.Region.confVer ||
meta.GetRegionEpoch().GetVersion() < ctx.Region.ver) {
err := errors.Errorf("region epoch is ahead of tikv. rpc ctx: %+v, currentRegions: %+v", ctx, currentRegions)
-logutil.BgLogger().Info("region epoch is ahead of tikv", zap.Error(err))
+logutil.Logger(bo.GetCtx()).Info("region epoch is ahead of tikv", zap.Error(err))
return true, bo.Backoff(retry.BoRegionMiss, err)
}
}
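region_cache.go never sees the caller's ctx directly; it reaches it through the Backoffer, so the same request-scoped logger follows the retry loop. Continuing the hypothetical example above — NewBackoffer's (ctx, maxSleepMs) signature is an assumption about the retry package, and the values are arbitrary.

```go
// Continuing the hypothetical in-repo file; additionally imports
// "github.com/tikv/client-go/v2/internal/retry".
// The Backoffer keeps the caller's context, so logutil.Logger(bo.GetCtx()) in the
// invalidation paths above resolves the same request-scoped logger.
func exampleRegionCacheLogging() {
	ctx := withRequestLogger(context.Background(), 42)
	bo := retry.NewBackoffer(ctx, 20000) // 20s backoff budget, chosen arbitrarily
	logutil.Logger(bo.GetCtx()).Info("carries connID=42, just like the region invalidation logs")
}
```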
36 changes: 18 additions & 18 deletions internal/locate/region_request.go
@@ -1645,7 +1645,7 @@ func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, e
// If we don't cancel, but the error code is Canceled, it must be from grpc remote.
// This may happen when tikv is killed and exiting.
// Backoff and retry in this case.
-logutil.BgLogger().Warn("receive a grpc cancel signal from remote", zap.Error(err))
+logutil.Logger(bo.GetCtx()).Warn("receive a grpc cancel signal from remote", zap.Error(err))
}
}

@@ -1776,7 +1776,7 @@ func (s *RegionRequestSender) onRegionError(

if notLeader := regionErr.GetNotLeader(); notLeader != nil {
// Retry if error is `NotLeader`.
-logutil.BgLogger().Debug(
+logutil.Logger(bo.GetCtx()).Debug(
"tikv reports `NotLeader` retry later",
zap.String("notLeader", notLeader.String()),
zap.String("ctx", ctx.String()),
@@ -1817,7 +1817,7 @@ func (s *RegionRequestSender) onRegionError(

if regionErr.GetRecoveryInProgress() != nil {
s.regionCache.InvalidateCachedRegion(ctx.Region)
-logutil.BgLogger().Debug("tikv reports `RecoveryInProgress`", zap.Stringer("ctx", ctx))
+logutil.Logger(bo.GetCtx()).Debug("tikv reports `RecoveryInProgress`", zap.Stringer("ctx", ctx))
err = bo.Backoff(retry.BoRegionRecoveryInProgress, errors.Errorf("region recovery in progress, ctx: %v", ctx))
if err != nil {
return false, err
@@ -1827,7 +1827,7 @@ func (s *RegionRequestSender) onRegionError(

if regionErr.GetIsWitness() != nil {
s.regionCache.InvalidateCachedRegion(ctx.Region)
-logutil.BgLogger().Debug("tikv reports `IsWitness`", zap.Stringer("ctx", ctx))
+logutil.Logger(bo.GetCtx()).Debug("tikv reports `IsWitness`", zap.Stringer("ctx", ctx))
err = bo.Backoff(retry.BoIsWitness, errors.Errorf("is witness, ctx: %v", ctx))
if err != nil {
return false, err
@@ -1839,7 +1839,7 @@ func (s *RegionRequestSender) onRegionError(
// if a request meets the FlashbackInProgress error, it should stop retrying immediately
// to avoid unnecessary backoff and potential unexpected data status to the user.
if flashbackInProgress := regionErr.GetFlashbackInProgress(); flashbackInProgress != nil {
-logutil.BgLogger().Debug(
+logutil.Logger(bo.GetCtx()).Debug(
"tikv reports `FlashbackInProgress`",
zap.Stringer("req", req),
zap.Stringer("ctx", ctx),
@@ -1868,7 +1868,7 @@ func (s *RegionRequestSender) onRegionError(
// prepared for the flashback before, it should stop retrying immediately to avoid
// unnecessary backoff.
if regionErr.GetFlashbackNotPrepared() != nil {
-logutil.BgLogger().Debug(
+logutil.Logger(bo.GetCtx()).Debug(
"tikv reports `FlashbackNotPrepared`",
zap.Stringer("req", req),
zap.Stringer("ctx", ctx),
@@ -1886,13 +1886,13 @@ func (s *RegionRequestSender) onRegionError(
}

if regionErr.GetKeyNotInRegion() != nil {
-logutil.BgLogger().Error("tikv reports `KeyNotInRegion`", zap.Stringer("req", req), zap.Stringer("ctx", ctx))
+logutil.Logger(bo.GetCtx()).Error("tikv reports `KeyNotInRegion`", zap.Stringer("req", req), zap.Stringer("ctx", ctx))
s.regionCache.InvalidateCachedRegion(ctx.Region)
return false, nil
}

if epochNotMatch := regionErr.GetEpochNotMatch(); epochNotMatch != nil {
-logutil.BgLogger().Debug(
+logutil.Logger(bo.GetCtx()).Debug(
"tikv reports `EpochNotMatch` retry later",
zap.Stringer("EpochNotMatch", epochNotMatch),
zap.Stringer("ctx", ctx),
@@ -1908,7 +1908,7 @@ func (s *RegionRequestSender) onRegionError(
if s.replicaSelector != nil {
return s.replicaSelector.onServerIsBusy(bo, ctx, req, serverIsBusy)
}
-logutil.BgLogger().Warn(
+logutil.Logger(bo.GetCtx()).Warn(
"tikv reports `ServerIsBusy` retry later",
zap.String("reason", regionErr.GetServerIsBusy().GetReason()),
zap.Stringer("ctx", ctx),
@@ -1928,7 +1928,7 @@ func (s *RegionRequestSender) onRegionError(
// We can't know whether the request is committed or not, so it's an undetermined error too,
// but we don't handle it now.
if regionErr.GetStaleCommand() != nil {
-logutil.BgLogger().Debug("tikv reports `StaleCommand`", zap.Stringer("ctx", ctx))
+logutil.Logger(bo.GetCtx()).Debug("tikv reports `StaleCommand`", zap.Stringer("ctx", ctx))
if s.replicaSelector != nil {
// Needn't backoff because the new leader should be elected soon
// and the replicaSelector will try the next peer.
@@ -1943,7 +1943,7 @@ func (s *RegionRequestSender) onRegionError(

if storeNotMatch := regionErr.GetStoreNotMatch(); storeNotMatch != nil {
// store not match
-logutil.BgLogger().Debug(
+logutil.Logger(bo.GetCtx()).Debug(
"tikv reports `StoreNotMatch` retry later",
zap.Stringer("storeNotMatch", storeNotMatch),
zap.Stringer("ctx", ctx),
@@ -1957,12 +1957,12 @@ func (s *RegionRequestSender) onRegionError(
}

if regionErr.GetRaftEntryTooLarge() != nil {
-logutil.BgLogger().Warn("tikv reports `RaftEntryTooLarge`", zap.Stringer("ctx", ctx))
+logutil.Logger(bo.GetCtx()).Warn("tikv reports `RaftEntryTooLarge`", zap.Stringer("ctx", ctx))
return false, errors.New(regionErr.String())
}

if regionErr.GetMaxTimestampNotSynced() != nil {
-logutil.BgLogger().Debug("tikv reports `MaxTimestampNotSynced`", zap.Stringer("ctx", ctx))
+logutil.Logger(bo.GetCtx()).Debug("tikv reports `MaxTimestampNotSynced`", zap.Stringer("ctx", ctx))
err = bo.Backoff(retry.BoMaxTsNotSynced, errors.Errorf("max timestamp not synced, ctx: %v", ctx))
if err != nil {
return false, err
@@ -1972,7 +1972,7 @@ func (s *RegionRequestSender) onRegionError(

// A read request may be sent to a peer which has not been initialized yet, we should retry in this case.
if regionErr.GetRegionNotInitialized() != nil {
-logutil.BgLogger().Debug(
+logutil.Logger(bo.GetCtx()).Debug(
"tikv reports `RegionNotInitialized` retry later",
zap.Uint64("store-id", ctx.Store.storeID),
zap.Uint64("region-id", regionErr.GetRegionNotInitialized().GetRegionId()),
@@ -1987,7 +1987,7 @@ func (s *RegionRequestSender) onRegionError(

// The read-index can't be handled timely because the region is splitting or merging.
if regionErr.GetReadIndexNotReady() != nil {
-logutil.BgLogger().Debug(
+logutil.Logger(bo.GetCtx()).Debug(
"tikv reports `ReadIndexNotReady` retry later",
zap.Uint64("store-id", ctx.Store.storeID),
zap.Uint64("region-id", regionErr.GetRegionNotInitialized().GetRegionId()),
@@ -2002,7 +2002,7 @@ func (s *RegionRequestSender) onRegionError(
}

if regionErr.GetProposalInMergingMode() != nil {
-logutil.BgLogger().Debug("tikv reports `ProposalInMergingMode`", zap.Stringer("ctx", ctx))
+logutil.Logger(bo.GetCtx()).Debug("tikv reports `ProposalInMergingMode`", zap.Stringer("ctx", ctx))
// The region is merging and it can't provide service until merge finished, so backoff.
err = bo.Backoff(retry.BoRegionScheduling, errors.Errorf("region is merging, ctx: %v", ctx))
if err != nil {
@@ -2015,7 +2015,7 @@ func (s *RegionRequestSender) onRegionError(
// This error is specific to stale read and the target replica is randomly selected. If the request is sent
// to the leader, the data must be ready, so we don't backoff here.
if regionErr.GetDataIsNotReady() != nil {
-logutil.BgLogger().Warn(
+logutil.Logger(bo.GetCtx()).Warn(
"tikv reports `DataIsNotReady` retry later",
zap.Uint64("store-id", ctx.Store.storeID),
zap.Uint64("peer-id", regionErr.GetDataIsNotReady().GetPeerId()),
@@ -2033,7 +2033,7 @@ func (s *RegionRequestSender) onRegionError(
return true, nil
}

-logutil.BgLogger().Debug(
+logutil.Logger(bo.GetCtx()).Debug(
"tikv reports region failed",
zap.Stringer("regionErr", regionErr),
zap.Stringer("ctx", ctx),
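All of the onRegionError branches above are the same one-token substitution, so the only behavioral difference is which *zap.Logger instance each call site resolves. A standalone zap demo of what that buys, independent of client-go (the field names are invented):

```go
// Standalone zap demo, not client-go code: fields bound with With(...) on a
// request-scoped logger appear on every line logged through it, while the plain
// background logger prints only the bare message.
package main

import "go.uber.org/zap"

func main() {
	base, _ := zap.NewDevelopment() // development config enables Debug level
	scoped := base.With(zap.Uint64("connID", 42), zap.String("source", "hypothetical"))

	base.Debug("tikv reports `NotLeader` retry later")   // what a BgLogger()-style call prints
	scoped.Debug("tikv reports `NotLeader` retry later") // the same call site through Logger(ctx)
}
```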
4 changes: 2 additions & 2 deletions internal/retry/backoff.go
@@ -133,7 +133,7 @@ func (b *Backoffer) BackoffWithMaxSleepTxnLockFast(maxSleepMs int, err error) er
// and never sleep more than maxSleepMs for each sleep.
func (b *Backoffer) BackoffWithCfgAndMaxSleep(cfg *Config, maxSleepMs int, err error) error {
if strings.Contains(err.Error(), tikverr.MismatchClusterID) {
-logutil.BgLogger().Fatal("critical error", zap.Error(err))
+logutil.Logger(b.ctx).Fatal("critical error", zap.Error(err))
}
select {
case <-b.ctx.Done():
@@ -169,7 +169,7 @@ func (b *Backoffer) BackoffWithCfgAndMaxSleep(cfg *Config, maxSleepMs int, err e
errMsg += fmt.Sprintf("\nlongest sleep type: %s, time: %dms", longestSleepCfg.String(), longestSleepTime)
returnedErr = longestSleepCfg.err
}
-logutil.BgLogger().Warn(errMsg)
+logutil.Logger(b.ctx).Warn(errMsg)
// Use the backoff type that contributes most to the timeout to generate a MySQL error.
return errors.WithStack(returnedErr)
}
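Both backoff.go call sites depend only on the Backoffer keeping the context it was created with, which the <-b.ctx.Done() branch already implies. A sketch of the assumed shape, not the actual struct definition:

```go
// Assumed shape of the relevant corner of retry.Backoffer; the real struct has more
// fields. All that matters for this PR is that b.ctx is the caller's context, so
// logutil.Logger(b.ctx) resolves the same request-scoped logger as at the call sites above.
type Backoffer struct {
	ctx context.Context
	// maxSleep, totalSleep, configs, errors, vars, ... elided
}

// GetCtx exposes the stored context; region_cache.go and region_request.go reach
// the logger through it as logutil.Logger(bo.GetCtx()).
func (b *Backoffer) GetCtx() context.Context { return b.ctx }
```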