From 5633957aee0cf31473b565a0a4a311b2a717a3ca Mon Sep 17 00:00:00 2001 From: cfzjywxk Date: Wed, 8 Nov 2023 16:50:13 +0800 Subject: [PATCH] This is an automated cherry-pick of #48330 Signed-off-by: ti-chi-bot --- pkg/domain/domain.go | 12 ++--- pkg/store/helper/helper.go | 108 +++++++++++++++++++++++++++++++++++++ 2 files changed, 114 insertions(+), 6 deletions(-) diff --git a/pkg/domain/domain.go b/pkg/domain/domain.go index defdc7fe05537..fb82c5d0b69ba 100644 --- a/pkg/domain/domain.go +++ b/pkg/domain/domain.go @@ -228,7 +228,7 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i return nil, false, 0, nil, err } // fetch the commit timestamp of the schema diff - schemaTs, err := do.getTimestampForSchemaVersionWithNonEmptyDiff(m, neededSchemaVersion) + schemaTs, err := do.getTimestampForSchemaVersionWithNonEmptyDiff(m, neededSchemaVersion, startTS) if err != nil { logutil.BgLogger().Warn("failed to get schema version", zap.Error(err), zap.Int64("version", neededSchemaVersion)) schemaTs = 0 @@ -303,18 +303,18 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i } // Returns the timestamp of a schema version, which is the commit timestamp of the schema diff -func (do *Domain) getTimestampForSchemaVersionWithNonEmptyDiff(m *meta.Meta, version int64) (int64, error) { +func (do *Domain) getTimestampForSchemaVersionWithNonEmptyDiff(m *meta.Meta, version int64, startTS uint64) (int64, error) { tikvStore, ok := do.Store().(helper.Storage) if ok { - helper := helper.NewHelper(tikvStore) - data, err := helper.GetMvccByEncodedKey(m.EncodeSchemaDiffKey(version)) + newHelper := helper.NewHelper(tikvStore) + mvccResp, err := newHelper.GetMvccByEncodedKeyWithTS(m.EncodeSchemaDiffKey(version), startTS) if err != nil { return 0, err } - if data == nil || data.Info == nil || len(data.Info.Writes) == 0 { + if mvccResp == nil || mvccResp.Info == nil || len(mvccResp.Info.Writes) == 0 { return 0, errors.Errorf("There is no Write MVCC info for the schema version") } - return int64(data.Info.Writes[0].CommitTs), nil + return int64(mvccResp.Info.Writes[0].CommitTs), nil } return 0, errors.Errorf("cannot get store from domain") } diff --git a/pkg/store/helper/helper.go b/pkg/store/helper/helper.go index 370262edb0a6b..aa05ea764c3a7 100644 --- a/pkg/store/helper/helper.go +++ b/pkg/store/helper/helper.go @@ -96,11 +96,114 @@ func NewHelper(store Storage) *Helper { } } +<<<<<<< HEAD // GetMvccByEncodedKey get the MVCC value by the specific encoded key. func (h *Helper) GetMvccByEncodedKey(encodedKey kv.Key) (*kvrpcpb.MvccGetByKeyResponse, error) { keyLocation, err := h.RegionCache.LocateKey(tikv.NewBackofferWithVars(context.Background(), 500, nil), encodedKey) if err != nil { return nil, derr.ToTiDBErr(err) +======= +// MaxBackoffTimeoutForMvccGet is a derived value from previous implementation possible experiencing value 5000ms. +const MaxBackoffTimeoutForMvccGet = 5000 + +// GetMvccByEncodedKeyWithTS get the MVCC value by the specific encoded key, if lock is encountered it would be resolved. +func (h *Helper) GetMvccByEncodedKeyWithTS(encodedKey kv.Key, startTS uint64) (*kvrpcpb.MvccGetByKeyResponse, error) { + bo := tikv.NewBackofferWithVars(context.Background(), MaxBackoffTimeoutForMvccGet, nil) + tikvReq := tikvrpc.NewRequest(tikvrpc.CmdMvccGetByKey, &kvrpcpb.MvccGetByKeyRequest{Key: encodedKey}) + for { + keyLocation, err := h.RegionCache.LocateKey(bo, encodedKey) + if err != nil { + return nil, derr.ToTiDBErr(err) + } + kvResp, err := h.Store.SendReq(bo, tikvReq, keyLocation.Region, time.Minute) + if err != nil { + logutil.BgLogger().Warn("get MVCC by encoded key failed", + zap.Stringer("encodeKey", encodedKey), + zap.Reflect("region", keyLocation.Region), + zap.Stringer("keyLocation", keyLocation), + zap.Reflect("kvResp", kvResp), + zap.Error(err)) + return nil, errors.Trace(err) + } + + regionErr, err := kvResp.GetRegionError() + if err != nil { + return nil, errors.Trace(err) + } + if regionErr != nil { + if err = bo.Backoff(tikv.BoRegionMiss(), errors.New(regionErr.String())); err != nil { + return nil, err + } + continue + } + + mvccResp := kvResp.Resp.(*kvrpcpb.MvccGetByKeyResponse) + if errMsg := mvccResp.GetError(); errMsg != "" { + logutil.BgLogger().Warn("get MVCC by encoded key failed", + zap.Stringer("encodeKey", encodedKey), + zap.Reflect("region", keyLocation.Region), + zap.Stringer("keyLocation", keyLocation), + zap.Reflect("kvResp", kvResp), + zap.String("error", errMsg)) + return nil, errors.New(errMsg) + } + if mvccResp.Info == nil { + errMsg := "Invalid mvcc response result, the info field is nil" + logutil.BgLogger().Warn(errMsg, + zap.Stringer("encodeKey", encodedKey), + zap.Reflect("region", keyLocation.Region), + zap.Stringer("keyLocation", keyLocation), + zap.Reflect("kvResp", kvResp)) + return nil, errors.New(errMsg) + } + + // Try to resolve the lock and retry mvcc get again if the input startTS is a valid value. + if startTS > 0 && mvccResp.Info.GetLock() != nil { + latestTS, err := h.Store.GetOracle().GetLowResolutionTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + if err != nil { + logutil.BgLogger().Warn("Failed to get latest ts", zap.Error(err)) + return nil, err + } + if startTS > latestTS { + errMsg := fmt.Sprintf("Snapshot ts=%v is larger than latest allocated ts=%v, lock could not be resolved", + startTS, latestTS) + logutil.BgLogger().Warn(errMsg) + return nil, errors.New(errMsg) + } + lockInfo := mvccResp.Info.GetLock() + lock := &txnlock.Lock{ + Key: []byte(encodedKey), + Primary: lockInfo.GetPrimary(), + TxnID: lockInfo.GetStartTs(), + TTL: lockInfo.GetTtl(), + TxnSize: lockInfo.GetTxnSize(), + LockType: lockInfo.GetType(), + UseAsyncCommit: lockInfo.GetUseAsyncCommit(), + LockForUpdateTS: lockInfo.GetForUpdateTs(), + } + // Disable for read to avoid async resolve. + resolveLocksOpts := txnlock.ResolveLocksOptions{ + CallerStartTS: startTS, + Locks: []*txnlock.Lock{lock}, + Lite: true, + ForRead: false, + Detail: nil, + } + resolveLockRes, err := h.Store.GetLockResolver().ResolveLocksWithOpts(bo, resolveLocksOpts) + if err != nil { + return nil, err + } + msBeforeExpired := resolveLockRes.TTL + if msBeforeExpired > 0 { + if err = bo.BackoffWithCfgAndMaxSleep(tikv.BoTxnLock(), int(msBeforeExpired), + errors.Errorf("resolve lock fails lock: %v", lock)); err != nil { + return nil, err + } + } + continue + } + return mvccResp, nil +>>>>>>> 64e5ea06226 (domain: add resolve lock logic for mvcc get key loading schema diff (#48330)) } tikvReq := tikvrpc.NewRequest(tikvrpc.CmdMvccGetByKey, &kvrpcpb.MvccGetByKeyRequest{Key: encodedKey}) @@ -117,6 +220,11 @@ func (h *Helper) GetMvccByEncodedKey(encodedKey kv.Key) (*kvrpcpb.MvccGetByKeyRe return kvResp.Resp.(*kvrpcpb.MvccGetByKeyResponse), nil } +// GetMvccByEncodedKey get the MVCC value by the specific encoded key. +func (h *Helper) GetMvccByEncodedKey(encodedKey kv.Key) (*kvrpcpb.MvccGetByKeyResponse, error) { + return h.GetMvccByEncodedKeyWithTS(encodedKey, 0) +} + // MvccKV wraps the key's mvcc info in tikv. type MvccKV struct { Key string `json:"key"`