Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store/copr: support batch coprocessor requests by store #39525

Merged
merged 18 commits into from
Dec 1, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -3519,8 +3519,9 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sum = "h1:vlgZedcfExiTzB3BB4nt5CpaghDfm9La/0Ofn7weIUA=",
version = "v2.0.3-0.20221129032117-857772dd0907",
replace = "github.com/you06/client-go/v2",
sum = "h1:iuZuGKaf/YT2yiKLaxF/C5QMISyoN5GnXRQCQdOtIy4=",
version = "v2.0.0-alpha.0.20221201041411-ea6107377f62",
)
go_repository(
name = "com_github_tikv_pd_client",
Expand Down
1 change: 1 addition & 0 deletions distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *Req
}
builder.RequestSource.RequestSourceInternal = sv.InRestrictedSQL
builder.RequestSource.RequestSourceType = sv.RequestSourceType
builder.StoreBatchSize = sv.StoreBatchSize
return builder
}

Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -256,3 +256,5 @@ replace (
github.com/pingcap/tidb/parser => ./parser
go.opencensus.io => go.opencensus.io v0.23.1-0.20220331163232-052120675fac
)

replace github.com/tikv/client-go/v2 => github.com/you06/client-go/v2 v2.0.0-alpha.0.20221201041411-ea6107377f62
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -930,8 +930,6 @@ github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3 h1:f+jULpR
github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3/go.mod h1:ON8b8w4BN/kE1EOhwT0o+d62W65a6aPw1nouo9LMgyY=
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJfDRtkanvQPiooDH8HvJ2FBh+iKT/OmiQQ=
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU=
github.com/tikv/client-go/v2 v2.0.3-0.20221129032117-857772dd0907 h1:vlgZedcfExiTzB3BB4nt5CpaghDfm9La/0Ofn7weIUA=
github.com/tikv/client-go/v2 v2.0.3-0.20221129032117-857772dd0907/go.mod h1:MDT4J9LzgS7Bj1DnEq6Gk/puy6mp8TgUC92zGEVVLLg=
github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07 h1:ckPpxKcl75mO2N6a4cJXiZH43hvcHPpqc9dh1TmH1nc=
github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07/go.mod h1:CipBxPfxPUME+BImx9MUYXCnAVLS3VJUr3mnSJwh40A=
github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144 h1:kl4KhGNsJIbDHS9/4U9yQo1UcPQM0kOMJHn29EoH/Ro=
Expand Down Expand Up @@ -977,6 +975,8 @@ github.com/xitongsys/parquet-go-source v0.0.0-20200817004010-026bad9b25d0 h1:a74
github.com/xitongsys/parquet-go-source v0.0.0-20200817004010-026bad9b25d0/go.mod h1:HYhIKsdns7xz80OgkbgJYrtQY7FjHWHKH6cvN7+czGE=
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
github.com/yalp/jsonpath v0.0.0-20180802001716-5cc68e5049a0/go.mod h1:/LWChgwKmvncFJFHJ7Gvn9wZArjbV5/FppcK2fKk/tI=
github.com/you06/client-go/v2 v2.0.0-alpha.0.20221201041411-ea6107377f62 h1:iuZuGKaf/YT2yiKLaxF/C5QMISyoN5GnXRQCQdOtIy4=
github.com/you06/client-go/v2 v2.0.0-alpha.0.20221201041411-ea6107377f62/go.mod h1:GyiI5+HcgUS7xb2fxcIyRBz2NdP6E2vgildKxEsa+Aw=
github.com/yudai/gojsondiff v1.0.0/go.mod h1:AY32+k2cwILAkW1fbgxQ5mUmMiZFgLIV+FBNExI05xg=
github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82/go.mod h1:lgjkn3NuSvDfVJdfcVVdX+jpBxNmX4rDAzaS45IcYoM=
github.com/yudai/pp v2.0.1+incompatible/go.mod h1:PuxR/8QJ7cyCkFp/aUDS+JY727OFEZkTdatxwunjIkc=
Expand Down
2 changes: 2 additions & 0 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,8 @@ type Request struct {
RequestSource util.RequestSource
// FixedRowCountHint is the optimization hint for copr request for task scheduling.
FixedRowCountHint []int
// StoreBatchSize indicates the batch size of coprocessor in the same store.
StoreBatchSize int
}

// CoprRequestAdjuster is used to check and adjust a copr request according to specific rules.
Expand Down
3 changes: 3 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1319,6 +1319,9 @@ type SessionVars struct {

// EnablePlanReplayerCapture indicates whether enabled plan replayer capture
EnablePlanReplayerCapture bool

// StoreBatchSize indicates the batch size limit of store batch, set this field to 0 to disable store batch.
StoreBatchSize int
}

// GetNewChunkWithCapacity Attempt to request memory from the chunk pool
Expand Down
7 changes: 7 additions & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -2103,6 +2103,13 @@ var defaultSysVars = []*SysVar{
s.EnableReuseCheck = TiDBOptOn(val)
return nil
}},
{
Scope: ScopeGlobal | ScopeSession, Name: TiDBStoreBatchSize, Value: strconv.FormatInt(DefTiDBStoreBatchSize, 10),
Type: TypeInt, MinValue: 0, MaxValue: 1024, SetSession: func(s *SessionVars, val string) error {
you06 marked this conversation as resolved.
Show resolved Hide resolved
s.StoreBatchSize = TidbOptInt(val, DefTiDBStoreBatchSize)
return nil
},
},
}

// FeedbackProbability points to the FeedbackProbability in statistics package.
Expand Down
4 changes: 4 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -781,6 +781,9 @@ const (
TiDBEnablePlanReplayerCapture = "tidb_enable_plan_replayer_capture"
// TiDBEnableReusechunk indicates whether to enable chunk alloc
TiDBEnableReusechunk = "tidb_enable_reuse_chunk"

// TiDBStoreBatchSize indicates the batch size of coprocessor in the same store.
TiDBStoreBatchSize = "tidb_store_batch_size"
)

// TiDB vars that have only global scope
Expand Down Expand Up @@ -1108,6 +1111,7 @@ const (
DefTiDBUseAlloc = false
DefTiDBEnablePlanReplayerCapture = false
DefTiDBIndexMergeIntersectionConcurrency = ConcurrencyUnset
DefTiDBStoreBatchSize = 0
)

// Process global variables.
Expand Down
47 changes: 47 additions & 0 deletions store/copr/copr_test/coprocessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,50 @@ func TestBuildCopIteratorWithRowCountHint(t *testing.T) {
require.Equal(t, smallConc, 0)
require.Equal(t, rateLimit.GetCapacity(), 4)
}

func TestBuildCopIteratorWithBatchStoreCopr(t *testing.T) {
// nil --- 'g' --- 'n' --- 't' --- nil
// <- 0 -> <- 1 -> <- 2 -> <- 3 ->
store, err := mockstore.NewMockStore(
mockstore.WithClusterInspector(func(c testutils.Cluster) {
mockstore.BootstrapWithMultiRegions(c, []byte("g"), []byte("n"), []byte("t"))
}),
)
require.NoError(t, err)
defer require.NoError(t, store.Close())
copClient := store.GetClient().(*copr.CopClient)
ctx := context.Background()
killed := uint32(0)
vars := kv.NewVariables(&killed)
opt := &kv.ClientSendOption{}

req := &kv.Request{
Tp: kv.ReqTypeDAG,
KeyRanges: kv.NewNonParitionedKeyRanges(copr.BuildKeyRanges("a", "c", "d", "e", "h", "x", "y", "z")),
FixedRowCountHint: []int{1, 1, 3, 3},
Concurrency: 15,
StoreBatchSize: 1,
}
it, errRes := copClient.BuildCopIterator(ctx, req, vars, opt)
require.Nil(t, errRes)
tasks := it.GetTasks()
require.Equal(t, len(tasks), 2)
require.Equal(t, len(tasks[0].ToPBBatchTasks()), 1)
require.Equal(t, tasks[0].RowCountHint, 5)
require.Equal(t, len(tasks[1].ToPBBatchTasks()), 1)
require.Equal(t, tasks[1].RowCountHint, 9)

req = &kv.Request{
Tp: kv.ReqTypeDAG,
KeyRanges: kv.NewNonParitionedKeyRanges(copr.BuildKeyRanges("a", "c", "d", "e", "h", "x", "y", "z")),
FixedRowCountHint: []int{1, 1, 3, 3},
Concurrency: 15,
StoreBatchSize: 3,
}
it, errRes = copClient.BuildCopIterator(ctx, req, vars, opt)
require.Nil(t, errRes)
tasks = it.GetTasks()
require.Equal(t, len(tasks), 1)
require.Equal(t, len(tasks[0].ToPBBatchTasks()), 3)
require.Equal(t, tasks[0].RowCountHint, 14)
}
Loading