diff --git a/executor/BUILD.bazel b/executor/BUILD.bazel index bee442fef7bc5..ca394fcd52431 100644 --- a/executor/BUILD.bazel +++ b/executor/BUILD.bazel @@ -337,6 +337,7 @@ go_test( flaky = True, shard_count = 50, deps = [ +<<<<<<< HEAD:executor/BUILD.bazel "//config", "//ddl", "//ddl/placement", @@ -418,6 +419,89 @@ go_test( "//util/timeutil", "//util/topsql/state", "@com_github_golang_protobuf//proto", +======= + "//pkg/config", + "//pkg/ddl", + "//pkg/ddl/placement", + "//pkg/ddl/util", + "//pkg/distsql", + "//pkg/domain", + "//pkg/domain/infosync", + "//pkg/errno", + "//pkg/executor/aggfuncs", + "//pkg/executor/aggregate", + "//pkg/executor/importer", + "//pkg/executor/internal/builder", + "//pkg/executor/internal/exec", + "//pkg/expression", + "//pkg/expression/aggregation", + "//pkg/infoschema", + "//pkg/kv", + "//pkg/meta", + "//pkg/meta/autoid", + "//pkg/metrics", + "//pkg/parser", + "//pkg/parser/ast", + "//pkg/parser/auth", + "//pkg/parser/model", + "//pkg/parser/mysql", + "//pkg/parser/terror", + "//pkg/planner", + "//pkg/planner/core", + "//pkg/planner/property", + "//pkg/planner/util", + "//pkg/server", + "//pkg/session", + "//pkg/sessionctx", + "//pkg/sessionctx/binloginfo", + "//pkg/sessionctx/stmtctx", + "//pkg/sessionctx/variable", + "//pkg/sessiontxn", + "//pkg/sessiontxn/staleread", + "//pkg/statistics", + "//pkg/statistics/handle/storage", + "//pkg/statistics/handle/util", + "//pkg/store/copr", + "//pkg/store/driver/error", + "//pkg/store/helper", + "//pkg/store/mockstore", + "//pkg/store/mockstore/unistore", + "//pkg/table/tables", + "//pkg/tablecodec", + "//pkg/testkit", + "//pkg/testkit/external", + "//pkg/testkit/testdata", + "//pkg/testkit/testmain", + "//pkg/testkit/testsetup", + "//pkg/types", + "//pkg/util", + "//pkg/util/benchdaily", + "//pkg/util/chunk", + "//pkg/util/codec", + "//pkg/util/collate", + "//pkg/util/dbterror", + "//pkg/util/dbterror/exeerrors", + "//pkg/util/deadlockhistory", + "//pkg/util/disk", + "//pkg/util/execdetails", + "//pkg/util/gcutil", + "//pkg/util/globalconn", + "//pkg/util/hack", + "//pkg/util/logutil", + "//pkg/util/memory", + "//pkg/util/mock", + "//pkg/util/paging", + "//pkg/util/pdapi", + "//pkg/util/plancodec", + "//pkg/util/ranger", + "//pkg/util/sem", + "//pkg/util/set", + "//pkg/util/stmtsummary/v2:stmtsummary", + "//pkg/util/stringutil", + "//pkg/util/syncutil", + "//pkg/util/tableutil", + "//pkg/util/topsql/state", +>>>>>>> 084717902b8 (*: handle region error for GetMvccByEncodedKey API (#47811)):pkg/executor/BUILD.bazel "@com_github_gorilla_mux//:mux", "@com_github_jarcoal_httpmock//:httpmock", "@com_github_pingcap_errors//:errors", diff --git a/executor/executor_failpoint_test.go b/executor/executor_failpoint_test.go index 0e72dccd1e15c..c7f9ff3a04a14 100644 --- a/executor/executor_failpoint_test.go +++ b/executor/executor_failpoint_test.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" +<<<<<<< HEAD:executor/executor_failpoint_test.go "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/executor" @@ -33,6 +34,18 @@ import ( "github.com/pingcap/tidb/store/copr" "github.com/pingcap/tidb/testkit" "github.com/pingcap/tidb/util/deadlockhistory" +======= + "github.com/pingcap/tidb/pkg/config" + "github.com/pingcap/tidb/pkg/ddl" + "github.com/pingcap/tidb/pkg/meta" + "github.com/pingcap/tidb/pkg/parser/terror" + "github.com/pingcap/tidb/pkg/session" + "github.com/pingcap/tidb/pkg/store/copr" + "github.com/pingcap/tidb/pkg/store/helper" + "github.com/pingcap/tidb/pkg/testkit" + "github.com/pingcap/tidb/pkg/util/dbterror/exeerrors" + "github.com/pingcap/tidb/pkg/util/deadlockhistory" +>>>>>>> 084717902b8 (*: handle region error for GetMvccByEncodedKey API (#47811)):pkg/executor/executor_failpoint_test.go "github.com/stretchr/testify/require" ) @@ -555,3 +568,101 @@ func TestDeadlocksTable(t *testing.T) { id2+"/2022-06-11 02:03:04.987654/1/203/////201", )) } +<<<<<<< HEAD:executor/executor_failpoint_test.go +======= + +func TestTiKVClientReadTimeout(t *testing.T) { + if *testkit.WithTiKV != "" { + t.Skip("skip test since it's only work for unistore") + } + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t (a int primary key, b int)") + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/store/mockstore/unistore/unistoreRPCDeadlineExceeded", `return(true)`)) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/store/mockstore/unistore/unistoreRPCDeadlineExceeded")) + }() + // Test for point_get request + rows := tk.MustQuery("explain analyze select /*+ set_var(tikv_client_read_timeout=1) */ * from t where a = 1").Rows() + require.Len(t, rows, 1) + explain := fmt.Sprintf("%v", rows[0]) + require.Regexp(t, ".*Point_Get.* Get:{num_rpc:2, total_time:.*", explain) + + // Test for batch_point_get request + rows = tk.MustQuery("explain analyze select /*+ set_var(tikv_client_read_timeout=1) */ * from t where a in (1,2)").Rows() + require.Len(t, rows, 1) + explain = fmt.Sprintf("%v", rows[0]) + require.Regexp(t, ".*Batch_Point_Get.* BatchGet:{num_rpc:2, total_time:.*", explain) + + // Test for cop request + rows = tk.MustQuery("explain analyze select /*+ set_var(tikv_client_read_timeout=1) */ * from t where b > 1").Rows() + require.Len(t, rows, 3) + explain = fmt.Sprintf("%v", rows[0]) + require.Regexp(t, ".*TableReader.* root time:.*, loops:.* cop_task: {num: 1, .* rpc_num: 2.*", explain) + + // Test for stale read. + tk.MustExec("set @a=now(6);") + tk.MustExec("set @@tidb_replica_read='closest-replicas';") + rows = tk.MustQuery("explain analyze select /*+ set_var(tikv_client_read_timeout=1) */ * from t as of timestamp(@a) where b > 1").Rows() + require.Len(t, rows, 3) + explain = fmt.Sprintf("%v", rows[0]) + require.Regexp(t, ".*TableReader.* root time:.*, loops:.* cop_task: {num: 1, .* rpc_num: 2.*", explain) + + // Test for tikv_client_read_timeout session variable. + tk.MustExec("set @@tikv_client_read_timeout=1;") + // Test for point_get request + rows = tk.MustQuery("explain analyze select * from t where a = 1").Rows() + require.Len(t, rows, 1) + explain = fmt.Sprintf("%v", rows[0]) + require.Regexp(t, ".*Point_Get.* Get:{num_rpc:2, total_time:.*", explain) + + // Test for batch_point_get request + rows = tk.MustQuery("explain analyze select * from t where a in (1,2)").Rows() + require.Len(t, rows, 1) + explain = fmt.Sprintf("%v", rows[0]) + require.Regexp(t, ".*Batch_Point_Get.* BatchGet:{num_rpc:2, total_time:.*", explain) + + // Test for cop request + rows = tk.MustQuery("explain analyze select * from t where b > 1").Rows() + require.Len(t, rows, 3) + explain = fmt.Sprintf("%v", rows[0]) + require.Regexp(t, ".*TableReader.* root time:.*, loops:.* cop_task: {num: 1, .* rpc_num: 2.*", explain) + + // Test for stale read. + tk.MustExec("set @a=now(6);") + tk.MustExec("set @@tidb_replica_read='closest-replicas';") + rows = tk.MustQuery("explain analyze select * from t as of timestamp(@a) where b > 1").Rows() + require.Len(t, rows, 3) + explain = fmt.Sprintf("%v", rows[0]) + require.Regexp(t, ".*TableReader.* root time:.*, loops:.* cop_task: {num: 1, .* rpc_num: 2.*", explain) +} + +func TestGetMvccByEncodedKeyRegionError(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + h := helper.NewHelper(store.(helper.Storage)) + txn, err := store.Begin() + require.NoError(t, err) + m := meta.NewMeta(txn) + schemaVersion := tk.Session().GetDomainInfoSchema().SchemaMetaVersion() + key := m.EncodeSchemaDiffKey(schemaVersion) + + resp, err := h.GetMvccByEncodedKey(key) + require.NoError(t, err) + require.NotNil(t, resp.Info) + require.Equal(t, 1, len(resp.Info.Writes)) + require.Less(t, uint64(0), resp.Info.Writes[0].CommitTs) + commitTs := resp.Info.Writes[0].CommitTs + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/store/mockstore/unistore/epochNotMatch", "2*return(true)")) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/store/mockstore/unistore/epochNotMatch")) + }() + resp, err = h.GetMvccByEncodedKey(key) + require.NoError(t, err) + require.NotNil(t, resp.Info) + require.Equal(t, 1, len(resp.Info.Writes)) + require.Equal(t, commitTs, resp.Info.Writes[0].CommitTs) +} +>>>>>>> 084717902b8 (*: handle region error for GetMvccByEncodedKey API (#47811)):pkg/executor/executor_failpoint_test.go diff --git a/store/helper/helper.go b/store/helper/helper.go index 1b077dcec8843..d7a7ab6a2a8f1 100644 --- a/store/helper/helper.go +++ b/store/helper/helper.go @@ -96,23 +96,45 @@ func NewHelper(store Storage) *Helper { // GetMvccByEncodedKey get the MVCC value by the specific encoded key. func (h *Helper) GetMvccByEncodedKey(encodedKey kv.Key) (*kvrpcpb.MvccGetByKeyResponse, error) { - keyLocation, err := h.RegionCache.LocateKey(tikv.NewBackofferWithVars(context.Background(), 500, nil), encodedKey) - if err != nil { - return nil, derr.ToTiDBErr(err) - } - + bo := tikv.NewBackofferWithVars(context.Background(), 5000, nil) tikvReq := tikvrpc.NewRequest(tikvrpc.CmdMvccGetByKey, &kvrpcpb.MvccGetByKeyRequest{Key: encodedKey}) - kvResp, err := h.Store.SendReq(tikv.NewBackofferWithVars(context.Background(), 500, nil), tikvReq, keyLocation.Region, time.Minute) - if err != nil { - logutil.BgLogger().Info("get MVCC by encoded key failed", - zap.Stringer("encodeKey", encodedKey), - zap.Reflect("region", keyLocation.Region), - zap.Stringer("keyLocation", keyLocation), - zap.Reflect("kvResp", kvResp), - zap.Error(err)) - return nil, errors.Trace(err) + for { + keyLocation, err := h.RegionCache.LocateKey(bo, encodedKey) + if err != nil { + return nil, derr.ToTiDBErr(err) + } + kvResp, err := h.Store.SendReq(bo, tikvReq, keyLocation.Region, time.Minute) + if err != nil { + logutil.BgLogger().Info("get MVCC by encoded key failed", + zap.Stringer("encodeKey", encodedKey), + zap.Reflect("region", keyLocation.Region), + zap.Stringer("keyLocation", keyLocation), + zap.Reflect("kvResp", kvResp), + zap.Error(err)) + return nil, errors.Trace(err) + } + regionErr, err := kvResp.GetRegionError() + if err != nil { + return nil, errors.Trace(err) + } + if regionErr != nil { + if err = bo.Backoff(tikv.BoRegionMiss(), errors.New(regionErr.String())); err != nil { + return nil, err + } + continue + } + mvccResp := kvResp.Resp.(*kvrpcpb.MvccGetByKeyResponse) + if errMsg := mvccResp.GetError(); errMsg != "" { + logutil.BgLogger().Info("get MVCC by encoded key failed", + zap.Stringer("encodeKey", encodedKey), + zap.Reflect("region", keyLocation.Region), + zap.Stringer("keyLocation", keyLocation), + zap.Reflect("kvResp", kvResp), + zap.String("error", errMsg)) + return nil, errors.New(errMsg) + } + return mvccResp, nil } - return kvResp.Resp.(*kvrpcpb.MvccGetByKeyResponse), nil } // MvccKV wraps the key's mvcc info in tikv. diff --git a/store/mockstore/unistore/rpc.go b/store/mockstore/unistore/rpc.go index b0e88e533582d..c52a49e3bbc9b 100644 --- a/store/mockstore/unistore/rpc.go +++ b/store/mockstore/unistore/rpc.go @@ -66,6 +66,11 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R failpoint.Return(tikvrpc.GenRegionErrorResp(req, &errorpb.Error{ServerIsBusy: &errorpb.ServerIsBusy{}})) } }) + failpoint.Inject("epochNotMatch", func(val failpoint.Value) { + if val.(bool) { + failpoint.Return(tikvrpc.GenRegionErrorResp(req, &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}})) + } + }) failpoint.Inject("unistoreRPCClientSendHook", func(val failpoint.Value) { if val.(bool) && UnistoreRPCClientSendHook != nil {