Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

domain: add resolve lock logic for mvcc get key loading schema diff (#48330) #48417

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i
return nil, false, 0, nil, err
}
// fetch the commit timestamp of the schema diff
schemaTs, err := do.getTimestampForSchemaVersionWithNonEmptyDiff(m, neededSchemaVersion)
schemaTs, err := do.getTimestampForSchemaVersionWithNonEmptyDiff(m, neededSchemaVersion, startTS)
if err != nil {
logutil.BgLogger().Warn("failed to get schema version", zap.Error(err), zap.Int64("version", neededSchemaVersion))
schemaTs = 0
Expand Down Expand Up @@ -285,18 +285,18 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i
}

// Returns the timestamp of a schema version, which is the commit timestamp of the schema diff
func (do *Domain) getTimestampForSchemaVersionWithNonEmptyDiff(m *meta.Meta, version int64) (int64, error) {
func (do *Domain) getTimestampForSchemaVersionWithNonEmptyDiff(m *meta.Meta, version int64, startTS uint64) (int64, error) {
tikvStore, ok := do.Store().(helper.Storage)
if ok {
helper := helper.NewHelper(tikvStore)
data, err := helper.GetMvccByEncodedKey(m.EncodeSchemaDiffKey(version))
newHelper := helper.NewHelper(tikvStore)
mvccResp, err := newHelper.GetMvccByEncodedKeyWithTS(m.EncodeSchemaDiffKey(version), startTS)
if err != nil {
return 0, err
}
if data == nil || data.Info == nil || len(data.Info.Writes) == 0 {
if mvccResp == nil || mvccResp.Info == nil || len(mvccResp.Info.Writes) == 0 {
return 0, errors.Errorf("There is no Write MVCC info for the schema version")
}
return int64(data.Info.Writes[0].CommitTs), nil
return int64(mvccResp.Info.Writes[0].CommitTs), nil
}
return 0, errors.Errorf("cannot get store from domain")
}
Expand Down
75 changes: 70 additions & 5 deletions store/helper/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,12 @@ func NewHelper(store Storage) *Helper {
}
}

// GetMvccByEncodedKey get the MVCC value by the specific encoded key.
func (h *Helper) GetMvccByEncodedKey(encodedKey kv.Key) (*kvrpcpb.MvccGetByKeyResponse, error) {
bo := tikv.NewBackofferWithVars(context.Background(), 5000, nil)
// MaxBackoffTimeoutForMvccGet is a derived value from previous implementation possible experiencing value 5000ms.
const MaxBackoffTimeoutForMvccGet = 5000

// GetMvccByEncodedKeyWithTS get the MVCC value by the specific encoded key, if lock is encountered it would be resolved.
func (h *Helper) GetMvccByEncodedKeyWithTS(encodedKey kv.Key, startTS uint64) (*kvrpcpb.MvccGetByKeyResponse, error) {
bo := tikv.NewBackofferWithVars(context.Background(), MaxBackoffTimeoutForMvccGet, nil)
tikvReq := tikvrpc.NewRequest(tikvrpc.CmdMvccGetByKey, &kvrpcpb.MvccGetByKeyRequest{Key: encodedKey})
for {
keyLocation, err := h.RegionCache.LocateKey(bo, encodedKey)
Expand All @@ -106,14 +109,15 @@ func (h *Helper) GetMvccByEncodedKey(encodedKey kv.Key) (*kvrpcpb.MvccGetByKeyRe
}
kvResp, err := h.Store.SendReq(bo, tikvReq, keyLocation.Region, time.Minute)
if err != nil {
logutil.BgLogger().Info("get MVCC by encoded key failed",
logutil.BgLogger().Warn("get MVCC by encoded key failed",
zap.Stringer("encodeKey", encodedKey),
zap.Reflect("region", keyLocation.Region),
zap.Stringer("keyLocation", keyLocation),
zap.Reflect("kvResp", kvResp),
zap.Error(err))
return nil, errors.Trace(err)
}

regionErr, err := kvResp.GetRegionError()
if err != nil {
return nil, errors.Trace(err)
Expand All @@ -124,20 +128,81 @@ func (h *Helper) GetMvccByEncodedKey(encodedKey kv.Key) (*kvrpcpb.MvccGetByKeyRe
}
continue
}

mvccResp := kvResp.Resp.(*kvrpcpb.MvccGetByKeyResponse)
if errMsg := mvccResp.GetError(); errMsg != "" {
logutil.BgLogger().Info("get MVCC by encoded key failed",
logutil.BgLogger().Warn("get MVCC by encoded key failed",
zap.Stringer("encodeKey", encodedKey),
zap.Reflect("region", keyLocation.Region),
zap.Stringer("keyLocation", keyLocation),
zap.Reflect("kvResp", kvResp),
zap.String("error", errMsg))
return nil, errors.New(errMsg)
}
if mvccResp.Info == nil {
errMsg := "Invalid mvcc response result, the info field is nil"
logutil.BgLogger().Warn(errMsg,
zap.Stringer("encodeKey", encodedKey),
zap.Reflect("region", keyLocation.Region),
zap.Stringer("keyLocation", keyLocation),
zap.Reflect("kvResp", kvResp))
return nil, errors.New(errMsg)
}

// Try to resolve the lock and retry mvcc get again if the input startTS is a valid value.
if startTS > 0 && mvccResp.Info.GetLock() != nil {
latestTS, err := h.Store.GetOracle().GetLowResolutionTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
if err != nil {
logutil.BgLogger().Warn("Failed to get latest ts", zap.Error(err))
return nil, err
}
if startTS > latestTS {
errMsg := fmt.Sprintf("Snapshot ts=%v is larger than latest allocated ts=%v, lock could not be resolved",
startTS, latestTS)
logutil.BgLogger().Warn(errMsg)
return nil, errors.New(errMsg)
}
lockInfo := mvccResp.Info.GetLock()
lock := &txnlock.Lock{
Key: []byte(encodedKey),
Primary: lockInfo.GetPrimary(),
TxnID: lockInfo.GetStartTs(),
TTL: lockInfo.GetTtl(),
TxnSize: lockInfo.GetTxnSize(),
LockType: lockInfo.GetType(),
UseAsyncCommit: lockInfo.GetUseAsyncCommit(),
LockForUpdateTS: lockInfo.GetForUpdateTs(),
}
// Disable for read to avoid async resolve.
resolveLocksOpts := txnlock.ResolveLocksOptions{
CallerStartTS: startTS,
Locks: []*txnlock.Lock{lock},
Lite: true,
ForRead: false,
Detail: nil,
}
resolveLockRes, err := h.Store.GetLockResolver().ResolveLocksWithOpts(bo, resolveLocksOpts)
if err != nil {
return nil, err
}
msBeforeExpired := resolveLockRes.TTL
if msBeforeExpired > 0 {
if err = bo.BackoffWithCfgAndMaxSleep(tikv.BoTxnLock(), int(msBeforeExpired),
errors.Errorf("resolve lock fails lock: %v", lock)); err != nil {
return nil, err
}
}
continue
}
return mvccResp, nil
}
}

// GetMvccByEncodedKey get the MVCC value by the specific encoded key.
func (h *Helper) GetMvccByEncodedKey(encodedKey kv.Key) (*kvrpcpb.MvccGetByKeyResponse, error) {
return h.GetMvccByEncodedKeyWithTS(encodedKey, 0)
}

// MvccKV wraps the key's mvcc info in tikv.
type MvccKV struct {
Key string `json:"key"`
Expand Down