From e024216dc3891113161229f7a27d8fa37fa358a8 Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Thu, 19 Oct 2023 19:38:05 +0800 Subject: [PATCH 1/5] init Signed-off-by: crazycs520 --- pkg/executor/executor_failpoint_test.go | 23 +++++++++++++ pkg/store/helper/helper.go | 43 ++++++++++++++++--------- pkg/store/mockstore/unistore/rpc.go | 5 +++ 3 files changed, 56 insertions(+), 15 deletions(-) diff --git a/pkg/executor/executor_failpoint_test.go b/pkg/executor/executor_failpoint_test.go index 16e10453c2c1b..d01808e3cc9ad 100644 --- a/pkg/executor/executor_failpoint_test.go +++ b/pkg/executor/executor_failpoint_test.go @@ -27,9 +27,11 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/tidb/pkg/config" "github.com/pingcap/tidb/pkg/ddl" + "github.com/pingcap/tidb/pkg/meta" "github.com/pingcap/tidb/pkg/parser/terror" "github.com/pingcap/tidb/pkg/session" "github.com/pingcap/tidb/pkg/store/copr" + "github.com/pingcap/tidb/pkg/store/helper" "github.com/pingcap/tidb/pkg/testkit" "github.com/pingcap/tidb/pkg/util/dbterror/exeerrors" "github.com/pingcap/tidb/pkg/util/deadlockhistory" @@ -622,3 +624,24 @@ func TestTiKVClientReadTimeout(t *testing.T) { explain = fmt.Sprintf("%v", rows[0]) require.Regexp(t, ".*TableReader.* root time:.*, loops:.* cop_task: {num: 1, .* rpc_num: 2.*", explain) } + +func TestGetMvccByEncodedKeyRegionError(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + h := helper.NewHelper(store.(helper.Storage)) + txn, err := store.Begin() + require.NoError(t, err) + m := meta.NewMeta(txn) + schemaVersion := tk.Session().GetDomainInfoSchema().SchemaMetaVersion() + key := m.EncodeSchemaDiffKey(schemaVersion) + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/epochNotMatch", "11*return(true)")) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/epochNotMatch")) + }() + resp, err := h.GetMvccByEncodedKey(key) + require.NoError(t, err) + require.NotNil(t, resp.Info) + require.Equal(t, 1, len(resp.Info.Writes)) + require.Less(t, uint64(0), resp.Info.Writes[0].CommitTs) +} diff --git a/pkg/store/helper/helper.go b/pkg/store/helper/helper.go index 370262edb0a6b..b29b66db82ddf 100644 --- a/pkg/store/helper/helper.go +++ b/pkg/store/helper/helper.go @@ -98,23 +98,36 @@ func NewHelper(store Storage) *Helper { // GetMvccByEncodedKey get the MVCC value by the specific encoded key. func (h *Helper) GetMvccByEncodedKey(encodedKey kv.Key) (*kvrpcpb.MvccGetByKeyResponse, error) { - keyLocation, err := h.RegionCache.LocateKey(tikv.NewBackofferWithVars(context.Background(), 500, nil), encodedKey) - if err != nil { - return nil, derr.ToTiDBErr(err) - } + bo := tikv.NewBackofferWithVars(context.Background(), 500, nil) + for { + keyLocation, err := h.RegionCache.LocateKey(bo, encodedKey) + if err != nil { + return nil, derr.ToTiDBErr(err) + } - tikvReq := tikvrpc.NewRequest(tikvrpc.CmdMvccGetByKey, &kvrpcpb.MvccGetByKeyRequest{Key: encodedKey}) - kvResp, err := h.Store.SendReq(tikv.NewBackofferWithVars(context.Background(), 500, nil), tikvReq, keyLocation.Region, time.Minute) - if err != nil { - logutil.BgLogger().Info("get MVCC by encoded key failed", - zap.Stringer("encodeKey", encodedKey), - zap.Reflect("region", keyLocation.Region), - zap.Stringer("keyLocation", keyLocation), - zap.Reflect("kvResp", kvResp), - zap.Error(err)) - return nil, errors.Trace(err) + tikvReq := tikvrpc.NewRequest(tikvrpc.CmdMvccGetByKey, &kvrpcpb.MvccGetByKeyRequest{Key: encodedKey}) + kvResp, err := h.Store.SendReq(bo, tikvReq, keyLocation.Region, time.Minute) + if err != nil { + logutil.BgLogger().Info("get MVCC by encoded key failed", + zap.Stringer("encodeKey", encodedKey), + zap.Reflect("region", keyLocation.Region), + zap.Stringer("keyLocation", keyLocation), + zap.Reflect("kvResp", kvResp), + zap.Error(err)) + return nil, errors.Trace(err) + } + regionErr, err := kvResp.GetRegionError() + if err != nil { + return nil, errors.Trace(err) + } + if regionErr != nil { + if err = bo.Backoff(tikv.BoRegionMiss(), errors.New(regionErr.String())); err != nil { + return nil, err + } + continue + } + return kvResp.Resp.(*kvrpcpb.MvccGetByKeyResponse), nil } - return kvResp.Resp.(*kvrpcpb.MvccGetByKeyResponse), nil } // MvccKV wraps the key's mvcc info in tikv. diff --git a/pkg/store/mockstore/unistore/rpc.go b/pkg/store/mockstore/unistore/rpc.go index ccbeb28748a44..a564e0f2225f9 100644 --- a/pkg/store/mockstore/unistore/rpc.go +++ b/pkg/store/mockstore/unistore/rpc.go @@ -66,6 +66,11 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R failpoint.Return(tikvrpc.GenRegionErrorResp(req, &errorpb.Error{ServerIsBusy: &errorpb.ServerIsBusy{}})) } }) + failpoint.Inject("epochNotMatch", func(val failpoint.Value) { + if val.(bool) { + failpoint.Return(tikvrpc.GenRegionErrorResp(req, &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}})) + } + }) failpoint.Inject("unistoreRPCClientSendHook", func(val failpoint.Value) { if fn := UnistoreRPCClientSendHook.Load(); val.(bool) && fn != nil { From 55754b06ddf05d1cc4dc4e81ecb1619453c5eda0 Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Thu, 19 Oct 2023 20:51:53 +0800 Subject: [PATCH 2/5] fix test Signed-off-by: crazycs520 --- pkg/executor/executor_failpoint_test.go | 4 ++-- pkg/store/helper/helper.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/executor/executor_failpoint_test.go b/pkg/executor/executor_failpoint_test.go index d01808e3cc9ad..a2079a856620a 100644 --- a/pkg/executor/executor_failpoint_test.go +++ b/pkg/executor/executor_failpoint_test.go @@ -635,9 +635,9 @@ func TestGetMvccByEncodedKeyRegionError(t *testing.T) { schemaVersion := tk.Session().GetDomainInfoSchema().SchemaMetaVersion() key := m.EncodeSchemaDiffKey(schemaVersion) - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/epochNotMatch", "11*return(true)")) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/store/mockstore/unistore/epochNotMatch", "2*return(true)")) defer func() { - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/epochNotMatch")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/store/mockstore/unistore/epochNotMatch")) }() resp, err := h.GetMvccByEncodedKey(key) require.NoError(t, err) diff --git a/pkg/store/helper/helper.go b/pkg/store/helper/helper.go index b29b66db82ddf..bf9a7cb6d4e55 100644 --- a/pkg/store/helper/helper.go +++ b/pkg/store/helper/helper.go @@ -98,7 +98,7 @@ func NewHelper(store Storage) *Helper { // GetMvccByEncodedKey get the MVCC value by the specific encoded key. func (h *Helper) GetMvccByEncodedKey(encodedKey kv.Key) (*kvrpcpb.MvccGetByKeyResponse, error) { - bo := tikv.NewBackofferWithVars(context.Background(), 500, nil) + bo := tikv.NewBackofferWithVars(context.Background(), 5000, nil) for { keyLocation, err := h.RegionCache.LocateKey(bo, encodedKey) if err != nil { From 4f4ab6c1fd06c9e563f3046dd837193c0232b6ee Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Thu, 19 Oct 2023 21:03:02 +0800 Subject: [PATCH 3/5] make bazel_prepare Signed-off-by: crazycs520 --- pkg/executor/BUILD.bazel | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/executor/BUILD.bazel b/pkg/executor/BUILD.bazel index 5657d3d1cbab5..ec14ab06e530f 100644 --- a/pkg/executor/BUILD.bazel +++ b/pkg/executor/BUILD.bazel @@ -385,6 +385,7 @@ go_test( "//pkg/expression/aggregation", "//pkg/infoschema", "//pkg/kv", + "//pkg/meta", "//pkg/meta/autoid", "//pkg/metrics", "//pkg/parser", From a9d2a416af545b71aa5029a449ae52bddcee0373 Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Thu, 19 Oct 2023 22:12:08 +0800 Subject: [PATCH 4/5] refine test Signed-off-by: crazycs520 --- pkg/executor/executor_failpoint_test.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/pkg/executor/executor_failpoint_test.go b/pkg/executor/executor_failpoint_test.go index a2079a856620a..01b4ff86e1469 100644 --- a/pkg/executor/executor_failpoint_test.go +++ b/pkg/executor/executor_failpoint_test.go @@ -635,13 +635,20 @@ func TestGetMvccByEncodedKeyRegionError(t *testing.T) { schemaVersion := tk.Session().GetDomainInfoSchema().SchemaMetaVersion() key := m.EncodeSchemaDiffKey(schemaVersion) + resp, err := h.GetMvccByEncodedKey(key) + require.NoError(t, err) + require.NotNil(t, resp.Info) + require.Equal(t, 1, len(resp.Info.Writes)) + require.Less(t, uint64(0), resp.Info.Writes[0].CommitTs) + commitTs := resp.Info.Writes[0].CommitTs + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/store/mockstore/unistore/epochNotMatch", "2*return(true)")) defer func() { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/store/mockstore/unistore/epochNotMatch")) }() - resp, err := h.GetMvccByEncodedKey(key) + resp, err = h.GetMvccByEncodedKey(key) require.NoError(t, err) require.NotNil(t, resp.Info) require.Equal(t, 1, len(resp.Info.Writes)) - require.Less(t, uint64(0), resp.Info.Writes[0].CommitTs) + require.Equal(t, commitTs, resp.Info.Writes[0].CommitTs) } From b44609ae64938e104d649420e3a77a9d0c48f7ef Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Thu, 19 Oct 2023 22:31:39 +0800 Subject: [PATCH 5/5] address comment Signed-off-by: crazycs520 --- pkg/store/helper/helper.go | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/pkg/store/helper/helper.go b/pkg/store/helper/helper.go index bf9a7cb6d4e55..18c65a15c92d5 100644 --- a/pkg/store/helper/helper.go +++ b/pkg/store/helper/helper.go @@ -99,13 +99,12 @@ func NewHelper(store Storage) *Helper { // GetMvccByEncodedKey get the MVCC value by the specific encoded key. func (h *Helper) GetMvccByEncodedKey(encodedKey kv.Key) (*kvrpcpb.MvccGetByKeyResponse, error) { bo := tikv.NewBackofferWithVars(context.Background(), 5000, nil) + tikvReq := tikvrpc.NewRequest(tikvrpc.CmdMvccGetByKey, &kvrpcpb.MvccGetByKeyRequest{Key: encodedKey}) for { keyLocation, err := h.RegionCache.LocateKey(bo, encodedKey) if err != nil { return nil, derr.ToTiDBErr(err) } - - tikvReq := tikvrpc.NewRequest(tikvrpc.CmdMvccGetByKey, &kvrpcpb.MvccGetByKeyRequest{Key: encodedKey}) kvResp, err := h.Store.SendReq(bo, tikvReq, keyLocation.Region, time.Minute) if err != nil { logutil.BgLogger().Info("get MVCC by encoded key failed", @@ -126,7 +125,17 @@ func (h *Helper) GetMvccByEncodedKey(encodedKey kv.Key) (*kvrpcpb.MvccGetByKeyRe } continue } - return kvResp.Resp.(*kvrpcpb.MvccGetByKeyResponse), nil + mvccResp := kvResp.Resp.(*kvrpcpb.MvccGetByKeyResponse) + if errMsg := mvccResp.GetError(); errMsg != "" { + logutil.BgLogger().Info("get MVCC by encoded key failed", + zap.Stringer("encodeKey", encodedKey), + zap.Reflect("region", keyLocation.Region), + zap.Stringer("keyLocation", keyLocation), + zap.Reflect("kvResp", kvResp), + zap.String("error", errMsg)) + return nil, errors.New(errMsg) + } + return mvccResp, nil } }