Skip to content

Commit

Permalink
Merge branch 'master' into planDIgest
Browse files Browse the repository at this point in the history
  • Loading branch information
fzzf678 committed Dec 1, 2022
2 parents afc6d18 + 9d9eaca commit d46a192
Show file tree
Hide file tree
Showing 14 changed files with 351 additions and 36 deletions.
3 changes: 3 additions & 0 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1415,6 +1415,9 @@ func (w *addIndexWorker) checkHandleExists(key kv.Key, value []byte, handle kv.H
if err != nil {
str = string(val)
}
if types.IsBinaryStr(colInfos[i].Ft) || types.IsTypeBit(colInfos[i].Ft) {
str = util.FmtNonASCIIPrintableCharToHex(str)
}
valueStr = append(valueStr, str)
}
return kv.ErrKeyExists.FastGenByArgs(strings.Join(valueStr, "-"), indexName)
Expand Down
1 change: 1 addition & 0 deletions distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *Req
}
builder.RequestSource.RequestSourceInternal = sv.InRestrictedSQL
builder.RequestSource.RequestSourceType = sv.RequestSourceType
builder.StoreBatchSize = sv.StoreBatchSize
return builder
}

Expand Down
16 changes: 16 additions & 0 deletions executor/insert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -895,6 +895,22 @@ func TestInsertErrorMsg(t *testing.T) {
tk.MustExec(`create table t (a int primary key, b datetime, d date)`)
tk.MustContainErrMsg(`insert into t values (1, '2019-02-11 30:00:00', '2019-01-31')`,
"Incorrect datetime value: '2019-02-11 30:00:00' for column 'b' at row 1")

// test for Issue #35289
tk.MustExec("CREATE TABLE t1 (a BINARY(16) PRIMARY KEY);")
tk.MustExec(`INSERT INTO t1 VALUES (AES_ENCRYPT('a','a'));`)
err := tk.ExecToErr(`INSERT INTO t1 VALUES (AES_ENCRYPT('a','a'));`)
require.Error(t, err, `ERROR 1062 (23000): Duplicate entry '{ W]\xA1\x06u\x9D\xBD\xB1\xA3.\xE2\xD9\xA7t' for key 't1.PRIMARY'`)

tk.MustExec(`INSERT INTO t1 VALUES (AES_ENCRYPT('b','b'));`)
err = tk.ExecToErr(`INSERT INTO t1 VALUES (AES_ENCRYPT('b','b'));`)
require.Error(t, err, "ERROR 1062 (23000): Duplicate entry '\\x0C\\x1E\\x8DG`\\xEB\\x93 F&BC\\xF0\\xB5\\xF4\\xB7' for key 't1.PRIMARY'")

tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1 (a bit primary key) engine=innodb;")
tk.MustExec("insert into t1 values (b'0');")
err = tk.ExecToErr(`insert into t1 values (b'0');`)
require.Error(t, err, `ERROR 1062 (23000): Duplicate entry '\x00' for key 't1.PRIMARY'`)
}

func TestIssue16366(t *testing.T) {
Expand Down
2 changes: 2 additions & 0 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,8 @@ type Request struct {
RequestSource util.RequestSource
// FixedRowCountHint is the optimization hint for copr request for task scheduling.
FixedRowCountHint []int
// StoreBatchSize indicates the batch size of coprocessor in the same store.
StoreBatchSize int
}

// CoprRequestAdjuster is used to check and adjust a copr request according to specific rules.
Expand Down
3 changes: 3 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1319,6 +1319,9 @@ type SessionVars struct {

// EnablePlanReplayerCapture indicates whether enabled plan replayer capture
EnablePlanReplayerCapture bool

// StoreBatchSize indicates the batch size limit of store batch, set this field to 0 to disable store batch.
StoreBatchSize int
}

// GetNewChunkWithCapacity Attempt to request memory from the chunk pool
Expand Down
7 changes: 7 additions & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -2103,6 +2103,13 @@ var defaultSysVars = []*SysVar{
s.EnableReuseCheck = TiDBOptOn(val)
return nil
}},
{
Scope: ScopeGlobal | ScopeSession, Name: TiDBStoreBatchSize, Value: strconv.FormatInt(DefTiDBStoreBatchSize, 10),
Type: TypeInt, MinValue: 0, MaxValue: 25000, SetSession: func(s *SessionVars, val string) error {
s.StoreBatchSize = TidbOptInt(val, DefTiDBStoreBatchSize)
return nil
},
},
}

// FeedbackProbability points to the FeedbackProbability in statistics package.
Expand Down
4 changes: 4 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -781,6 +781,9 @@ const (
TiDBEnablePlanReplayerCapture = "tidb_enable_plan_replayer_capture"
// TiDBEnableReusechunk indicates whether to enable chunk alloc
TiDBEnableReusechunk = "tidb_enable_reuse_chunk"

// TiDBStoreBatchSize indicates the batch size of coprocessor in the same store.
TiDBStoreBatchSize = "tidb_store_batch_size"
)

// TiDB vars that have only global scope
Expand Down Expand Up @@ -1108,6 +1111,7 @@ const (
DefTiDBUseAlloc = false
DefTiDBEnablePlanReplayerCapture = false
DefTiDBIndexMergeIntersectionConcurrency = ConcurrencyUnset
DefTiDBStoreBatchSize = 0
)

// Process global variables.
Expand Down
47 changes: 47 additions & 0 deletions store/copr/copr_test/coprocessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,50 @@ func TestBuildCopIteratorWithRowCountHint(t *testing.T) {
require.Equal(t, smallConc, 0)
require.Equal(t, rateLimit.GetCapacity(), 4)
}

func TestBuildCopIteratorWithBatchStoreCopr(t *testing.T) {
// nil --- 'g' --- 'n' --- 't' --- nil
// <- 0 -> <- 1 -> <- 2 -> <- 3 ->
store, err := mockstore.NewMockStore(
mockstore.WithClusterInspector(func(c testutils.Cluster) {
mockstore.BootstrapWithMultiRegions(c, []byte("g"), []byte("n"), []byte("t"))
}),
)
require.NoError(t, err)
defer require.NoError(t, store.Close())
copClient := store.GetClient().(*copr.CopClient)
ctx := context.Background()
killed := uint32(0)
vars := kv.NewVariables(&killed)
opt := &kv.ClientSendOption{}

req := &kv.Request{
Tp: kv.ReqTypeDAG,
KeyRanges: kv.NewNonParitionedKeyRanges(copr.BuildKeyRanges("a", "c", "d", "e", "h", "x", "y", "z")),
FixedRowCountHint: []int{1, 1, 3, 3},
Concurrency: 15,
StoreBatchSize: 1,
}
it, errRes := copClient.BuildCopIterator(ctx, req, vars, opt)
require.Nil(t, errRes)
tasks := it.GetTasks()
require.Equal(t, len(tasks), 2)
require.Equal(t, len(tasks[0].ToPBBatchTasks()), 1)
require.Equal(t, tasks[0].RowCountHint, 5)
require.Equal(t, len(tasks[1].ToPBBatchTasks()), 1)
require.Equal(t, tasks[1].RowCountHint, 9)

req = &kv.Request{
Tp: kv.ReqTypeDAG,
KeyRanges: kv.NewNonParitionedKeyRanges(copr.BuildKeyRanges("a", "c", "d", "e", "h", "x", "y", "z")),
FixedRowCountHint: []int{1, 1, 3, 3},
Concurrency: 15,
StoreBatchSize: 3,
}
it, errRes = copClient.BuildCopIterator(ctx, req, vars, opt)
require.Nil(t, errRes)
tasks = it.GetTasks()
require.Equal(t, len(tasks), 1)
require.Equal(t, len(tasks[0].ToPBBatchTasks()), 3)
require.Equal(t, tasks[0].RowCountHint, 14)
}
Loading

0 comments on commit d46a192

Please sign in to comment.