Skip to content

Commit

Permalink
kv request: configurable KV Timeout (#45601)
Browse files Browse the repository at this point in the history
close #45380
  • Loading branch information
hihihuhu authored Aug 11, 2023
1 parent e42e77c commit 0a5e0b3
Show file tree
Hide file tree
Showing 33 changed files with 1,075 additions and 734 deletions.
12 changes: 6 additions & 6 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -6924,13 +6924,13 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sha256 = "ed4a6bacc74d58cca6eb30c8828a3c138c78895782b407e607dc5c13f3b338e7",
strip_prefix = "github.com/tikv/client-go/v2@v2.0.8-0.20230809050315-300545a8a3c4",
sha256 = "608e5c393dcf7fa07a7a360333816dc479b05bad6ad489a4643c9a096e47f5d9",
strip_prefix = "github.com/tikv/client-go/v2@v2.0.8-0.20230811033710-8a214402da13",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20230809050315-300545a8a3c4.zip",
"http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20230809050315-300545a8a3c4.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20230809050315-300545a8a3c4.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20230809050315-300545a8a3c4.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20230811033710-8a214402da13.zip",
"http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20230811033710-8a214402da13.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20230811033710-8a214402da13.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20230811033710-8a214402da13.zip",
],
)
go_repository(
Expand Down
1 change: 1 addition & 0 deletions bindinfo/capture_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1005,6 +1005,7 @@ func TestCaptureHints(t *testing.T) {
// runtime hints
{"select /*+ memory_quota(1024 MB) */ * from t", "memory_quota(1024 mb)"},
{"select /*+ max_execution_time(1000) */ * from t", "max_execution_time(1000)"},
{"select /*+ tidb_kv_read_timeout(1000) */ * from t", "tidb_kv_read_timeout(1000)"},
// storage hints
{"select /*+ read_from_storage(tikv[t]) */ * from t", "read_from_storage(tikv[`t`])"},
// others
Expand Down
10 changes: 7 additions & 3 deletions bindinfo/tests/bind_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -682,11 +682,11 @@ func TestRuntimeHintsInEvolveTasks(t *testing.T) {
tk.MustExec("create table t(a int, b int, c int, index idx_a(a), index idx_b(b), index idx_c(c))")

tk.MustExec("create global binding for select * from t where a >= 1 and b >= 1 and c = 0 using select * from t use index(idx_a) where a >= 1 and b >= 1 and c = 0")
tk.MustQuery("select /*+ MAX_EXECUTION_TIME(5000) */ * from t where a >= 4 and b >= 1 and c = 0")
tk.MustQuery("select /*+ MAX_EXECUTION_TIME(5000), TIDB_KV_READ_TIMEOUT(20) */ * from t where a >= 4 and b >= 1 and c = 0")
tk.MustExec("admin flush bindings")
rows := tk.MustQuery("show global bindings").Rows()
require.Len(t, rows, 2)
require.Equal(t, "SELECT /*+ use_index(@`sel_1` `test`.`t` `idx_c`), no_order_index(@`sel_1` `test`.`t` `idx_c`), max_execution_time(5000)*/ * FROM `test`.`t` WHERE `a` >= 4 AND `b` >= 1 AND `c` = 0", rows[0][1])
require.Equal(t, "SELECT /*+ use_index(@`sel_1` `test`.`t` `idx_c`), no_order_index(@`sel_1` `test`.`t` `idx_c`), max_execution_time(5000), tidb_kv_read_timeout(20)*/ * FROM `test`.`t` WHERE `a` >= 4 AND `b` >= 1 AND `c` = 0", rows[0][1])
}

func TestDefaultSessionVars(t *testing.T) {
Expand Down Expand Up @@ -746,13 +746,15 @@ func TestStmtHints(t *testing.T) {
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int, b int, index idx(a))")
tk.MustExec("create global binding for select * from t using select /*+ MAX_EXECUTION_TIME(100), MEMORY_QUOTA(2 GB) */ * from t use index(idx)")
tk.MustExec("create global binding for select * from t using select /*+ MAX_EXECUTION_TIME(100), TIDB_KV_READ_TIMEOUT(20), MEMORY_QUOTA(2 GB) */ * from t use index(idx)")
tk.MustQuery("select * from t")
require.Equal(t, int64(2147483648), tk.Session().GetSessionVars().MemTracker.GetBytesLimit())
require.Equal(t, uint64(100), tk.Session().GetSessionVars().StmtCtx.MaxExecutionTime)
require.Equal(t, uint64(20), tk.Session().GetSessionVars().StmtCtx.TidbKvReadTimeout)
tk.MustQuery("select a, b from t")
require.Equal(t, int64(1073741824), tk.Session().GetSessionVars().MemTracker.GetBytesLimit())
require.Equal(t, uint64(0), tk.Session().GetSessionVars().StmtCtx.MaxExecutionTime)
require.Equal(t, uint64(0), tk.Session().GetSessionVars().StmtCtx.TidbKvReadTimeout)
}

func TestPrivileges(t *testing.T) {
Expand Down Expand Up @@ -1285,6 +1287,7 @@ func TestBindSQLDigest(t *testing.T) {
// runtime hints
{"select * from t", "select /*+ memory_quota(1024 MB) */ * from t"},
{"select * from t", "select /*+ max_execution_time(1000) */ * from t"},
{"select * from t", "select /*+ tidb_kv_read_timeout(1000) */ * from t"},
// storage hints
{"select * from t", "select /*+ read_from_storage(tikv[t]) */ * from t"},
// others
Expand Down Expand Up @@ -1346,6 +1349,7 @@ func TestDropBindBySQLDigest(t *testing.T) {
// runtime hints
{"select * from t", "select /*+ memory_quota(1024 MB) */ * from t"},
{"select * from t", "select /*+ max_execution_time(1000) */ * from t"},
{"select * from t", "select /*+ tidb_kv_read_timeout(1000) */ * from t"},
// storage hints
{"select * from t", "select /*+ read_from_storage(tikv[t]) */ * from t"},
// others
Expand Down
2 changes: 1 addition & 1 deletion distsql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ go_test(
embed = [":distsql"],
flaky = True,
race = "on",
shard_count = 23,
shard_count = 24,
deps = [
"//domain/resourcegroup",
"//kv",
Expand Down
1 change: 1 addition & 0 deletions distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *Req
builder.Request.ResourceGroupName = sv.ResourceGroupName
builder.Request.StoreBusyThreshold = sv.LoadBasedReplicaReadThreshold
builder.Request.RunawayChecker = sv.StmtCtx.RunawayChecker
builder.Request.TidbKvReadTimeout = sv.GetTidbKvReadTimeout()
return builder
}

Expand Down
27 changes: 27 additions & 0 deletions distsql/request_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,33 @@ func TestRequestBuilder8(t *testing.T) {
require.Equal(t, expect, actual)
}

func TestRequestBuilderTidbKvReadTimeout(t *testing.T) {
sv := variable.NewSessionVars(nil)
sv.TidbKvReadTimeout = 100
actual, err := (&RequestBuilder{}).
SetFromSessionVars(sv).
Build()
require.NoError(t, err)
expect := &kv.Request{
Tp: 0,
StartTs: 0x0,
Data: []uint8(nil),
KeyRanges: kv.NewNonParitionedKeyRanges(nil),
Concurrency: variable.DefDistSQLScanConcurrency,
IsolationLevel: 0,
Priority: 0,
MemTracker: (*memory.Tracker)(nil),
SchemaVar: 0,
ReadReplicaScope: kv.GlobalReplicaScope,
TidbKvReadTimeout: 100,
ResourceGroupName: resourcegroup.DefaultResourceGroupName,
}
expect.Paging.MinPagingSize = paging.MinPagingSize
expect.Paging.MaxPagingSize = paging.MaxPagingSize
actual.ResourceGroupTagger = nil
require.Equal(t, expect, actual)
}

func TestTableRangesToKVRangesWithFbs(t *testing.T) {
ranges := []*ranger.Range{
{
Expand Down
3 changes: 2 additions & 1 deletion executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -1780,7 +1780,8 @@ func getEncodedPlan(stmtCtx *stmtctx.StatementContext, genHint bool) (encodedPla
// some hints like 'memory_quota' cannot be extracted from the PhysicalPlan directly,
// so we have to iterate all hints from the customer and keep some other necessary hints.
switch tableHint.HintName.L {
case "memory_quota", "use_toja", "no_index_merge", "max_execution_time",
case plannercore.HintMemoryQuota, plannercore.HintUseToja, plannercore.HintNoIndexMerge,
plannercore.HintMaxExecutionTime, plannercore.HintTidbKvReadTimeout,
plannercore.HintIgnoreIndex, plannercore.HintReadFromStorage, plannercore.HintMerge,
plannercore.HintSemiJoinRewrite, plannercore.HintNoDecorrelate:
hints = append(hints, tableHint)
Expand Down
1 change: 1 addition & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1938,6 +1938,7 @@ func (b *executorBuilder) getSnapshot() (kv.Snapshot, error) {
replicaReadType := sessVars.GetReplicaRead()
snapshot.SetOption(kv.ReadReplicaScope, b.readReplicaScope)
snapshot.SetOption(kv.TaskID, sessVars.StmtCtx.TaskID)
snapshot.SetOption(kv.TidbKvReadTimeout, sessVars.GetTidbKvReadTimeout())
snapshot.SetOption(kv.ResourceGroupName, sessVars.ResourceGroupName)
snapshot.SetOption(kv.ExplicitRequestSourceType, sessVars.ExplicitRequestSourceType)

Expand Down
67 changes: 67 additions & 0 deletions executor/executor_failpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,3 +555,70 @@ func TestDeadlocksTable(t *testing.T) {
id2+"/2022-06-11 02:03:04.987654/1/203/<nil>/<nil>/<nil>/<nil>/201",
))
}

func TestTidbKvReadTimeout(t *testing.T) {
if *testkit.WithTiKV != "" {
t.Skip("skip test since it's only work for unistore")
}
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t (a int primary key, b int)")
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/unistoreRPCDeadlineExceeded", `return(true)`))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/unistoreRPCDeadlineExceeded"))
}()
// Test for point_get request
rows := tk.MustQuery("explain analyze select /*+ tidb_kv_read_timeout(1) */ * from t where a = 1").Rows()
require.Len(t, rows, 1)
explain := fmt.Sprintf("%v", rows[0])
require.Regexp(t, ".*Point_Get.* Get:{num_rpc:2, total_time:.*", explain)

// Test for batch_point_get request
rows = tk.MustQuery("explain analyze select /*+ tidb_kv_read_timeout(1) */ * from t where a in (1,2)").Rows()
require.Len(t, rows, 1)
explain = fmt.Sprintf("%v", rows[0])
require.Regexp(t, ".*Batch_Point_Get.* BatchGet:{num_rpc:2, total_time:.*", explain)

// Test for cop request
rows = tk.MustQuery("explain analyze select /*+ tidb_kv_read_timeout(1) */ * from t where b > 1").Rows()
require.Len(t, rows, 3)
explain = fmt.Sprintf("%v", rows[0])
require.Regexp(t, ".*TableReader.* root time:.*, loops:.* cop_task: {num: 1, .* rpc_num: 2.*", explain)

// Test for stale read.
tk.MustExec("set @a=now(6);")
tk.MustExec("set @@tidb_replica_read='closest-replicas';")
rows = tk.MustQuery("explain analyze select /*+ tidb_kv_read_timeout(1) */ * from t as of timestamp(@a) where b > 1").Rows()
require.Len(t, rows, 3)
explain = fmt.Sprintf("%v", rows[0])
require.Regexp(t, ".*TableReader.* root time:.*, loops:.* cop_task: {num: 1, .* rpc_num: 2.*", explain)

// Test for tidb_kv_read_timeout session variable.
tk.MustExec("set @@tidb_kv_read_timeout=1;")
// Test for point_get request
rows = tk.MustQuery("explain analyze select * from t where a = 1").Rows()
require.Len(t, rows, 1)
explain = fmt.Sprintf("%v", rows[0])
require.Regexp(t, ".*Point_Get.* Get:{num_rpc:2, total_time:.*", explain)

// Test for batch_point_get request
rows = tk.MustQuery("explain analyze select * from t where a in (1,2)").Rows()
require.Len(t, rows, 1)
explain = fmt.Sprintf("%v", rows[0])
require.Regexp(t, ".*Batch_Point_Get.* BatchGet:{num_rpc:2, total_time:.*", explain)

// Test for cop request
rows = tk.MustQuery("explain analyze select * from t where b > 1").Rows()
require.Len(t, rows, 3)
explain = fmt.Sprintf("%v", rows[0])
require.Regexp(t, ".*TableReader.* root time:.*, loops:.* cop_task: {num: 1, .* rpc_num: 2.*", explain)

// Test for stale read.
tk.MustExec("set @a=now(6);")
tk.MustExec("set @@tidb_replica_read='closest-replicas';")
rows = tk.MustQuery("explain analyze select * from t as of timestamp(@a) where b > 1").Rows()
require.Len(t, rows, 3)
explain = fmt.Sprintf("%v", rows[0])
require.Regexp(t, ".*TableReader.* root time:.*, loops:.* cop_task: {num: 1, .* rpc_num: 2.*", explain)
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ require (
github.com/stretchr/testify v1.8.4
github.com/tdakkota/asciicheck v0.2.0
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
github.com/tikv/client-go/v2 v2.0.8-0.20230809050315-300545a8a3c4
github.com/tikv/client-go/v2 v2.0.8-0.20230811033710-8a214402da13
github.com/tikv/pd/client v0.0.0-20230728033905-31343e006842
github.com/timakin/bodyclose v0.0.0-20230421092635-574207250966
github.com/twmb/murmur3 v1.1.6
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1004,8 +1004,8 @@ github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJf
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU=
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW93SG+q0F8KI+yFrcIDT4c/RNoc4=
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM=
github.com/tikv/client-go/v2 v2.0.8-0.20230809050315-300545a8a3c4 h1:FLvTsmiMyuk+4y+5ZZ3wllikEGQjom9HEewFU1BoVIc=
github.com/tikv/client-go/v2 v2.0.8-0.20230809050315-300545a8a3c4/go.mod h1:J17iHkj8buCLDF7lgKJLX5jq5aGozrbpa7+Ln6g8Xjc=
github.com/tikv/client-go/v2 v2.0.8-0.20230811033710-8a214402da13 h1:oTAPyrDR5UFVhg4SYmHNQ1gHQrwQfBjGGK/zKbz8VcA=
github.com/tikv/client-go/v2 v2.0.8-0.20230811033710-8a214402da13/go.mod h1:J17iHkj8buCLDF7lgKJLX5jq5aGozrbpa7+Ln6g8Xjc=
github.com/tikv/pd/client v0.0.0-20230728033905-31343e006842 h1:TwjBJvRx/DJbgMt7Vk5cFO7tG1DZnxR+22S2VmaNGRw=
github.com/tikv/pd/client v0.0.0-20230728033905-31343e006842/go.mod h1:VJwM+qMcQxvGgyu9C6wU7fhjLaALs+odsOvpUMmnhHo=
github.com/timakin/bodyclose v0.0.0-20230421092635-574207250966 h1:quvGphlmUVU+nhpFa4gg4yJyTRJ13reZMDHrKwYw53M=
Expand Down
2 changes: 2 additions & 0 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,8 @@ type Request struct {
LimitSize uint64
// StoreBusyThreshold is the threshold for the store to return ServerIsBusy
StoreBusyThreshold time.Duration
// TidbKvReadTimeout is the timeout of kv read request
TidbKvReadTimeout uint64

RunawayChecker *resourcegroup.RunawayChecker

Expand Down
2 changes: 2 additions & 0 deletions kv/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ const (
ResourceGroupName
// LoadBasedReplicaReadThreshold sets the TiKV wait duration threshold of enabling replica read automatically.
LoadBasedReplicaReadThreshold
// TidbKvReadTimeout sets the timeout value for readonly kv request in milliseconds
TidbKvReadTimeout
)

// ReplicaReadType is the type of replica to read data from
Expand Down
2 changes: 2 additions & 0 deletions parser/ast/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3682,6 +3682,8 @@ func (n *TableOptimizerHint) Restore(ctx *format.RestoreCtx) error {
switch n.HintName.L {
case "max_execution_time":
ctx.WritePlainf("%d", n.HintData.(uint64))
case "tidb_kv_read_timeout":
ctx.WritePlainf("%d", n.HintData.(uint64))
case "resource_group":
ctx.WriteName(n.HintData.(string))
case "nth_plan":
Expand Down
2 changes: 2 additions & 0 deletions parser/ast/misc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,8 @@ func TestTableOptimizerHintRestore(t *testing.T) {
{"LEADING(t1@sel1, c1, t2)", "LEADING(`t1`@`sel1`, `c1`, `t2`)"},
{"MAX_EXECUTION_TIME(3000)", "MAX_EXECUTION_TIME(3000)"},
{"MAX_EXECUTION_TIME(@sel1 3000)", "MAX_EXECUTION_TIME(@`sel1` 3000)"},
{"TIDB_KV_READ_TIMEOUT(3000)", "TIDB_KV_READ_TIMEOUT(3000)"},
{"TIDB_KV_READ_TIMEOUT(@sel1 3000)", "TIDB_KV_READ_TIMEOUT(@`sel1` 3000)"},
{"USE_INDEX_MERGE(t1 c1)", "USE_INDEX_MERGE(`t1` `c1`)"},
{"USE_INDEX_MERGE(@sel1 t1 c1)", "USE_INDEX_MERGE(@`sel1` `t1` `c1`)"},
{"USE_INDEX_MERGE(t1@sel1 c1)", "USE_INDEX_MERGE(`t1`@`sel1` `c1`)"},
Expand Down
Loading

0 comments on commit 0a5e0b3

Please sign in to comment.