From da0ad54fd82e4e868d24b88a8aa42dde26153920 Mon Sep 17 00:00:00 2001 From: zyguan Date: Wed, 20 Mar 2019 10:25:18 +0800 Subject: [PATCH] stability: port ddl test as a workload (#328) * stability: port ddl test as a workload * stability: rename pkg/blockWriter to pkg/blockwriter --- tests/cluster_info.go | 9 +- tests/cmd/e2e/main.go | 45 +- tests/pkg/blockwriter/blockWriter.go | 273 +++++++ tests/pkg/workload/ddl/internal/datatype.go | 338 ++++++++ tests/pkg/workload/ddl/internal/ddl.go | 584 ++++++++++++++ tests/pkg/workload/ddl/internal/ddl_ops.go | 847 ++++++++++++++++++++ tests/pkg/workload/ddl/internal/demo.go | 86 ++ tests/pkg/workload/ddl/internal/dml_ops.go | 620 ++++++++++++++ tests/pkg/workload/ddl/internal/meta.go | 636 +++++++++++++++ tests/pkg/workload/ddl/internal/metrics.go | 29 + tests/pkg/workload/ddl/internal/run.go | 119 +++ tests/pkg/workload/ddl/internal/util.go | 177 ++++ tests/pkg/workload/ddl/workload.go | 50 ++ tests/pkg/workload/interface.go | 31 + 14 files changed, 3837 insertions(+), 7 deletions(-) create mode 100644 tests/pkg/blockwriter/blockWriter.go create mode 100644 tests/pkg/workload/ddl/internal/datatype.go create mode 100644 tests/pkg/workload/ddl/internal/ddl.go create mode 100644 tests/pkg/workload/ddl/internal/ddl_ops.go create mode 100644 tests/pkg/workload/ddl/internal/demo.go create mode 100644 tests/pkg/workload/ddl/internal/dml_ops.go create mode 100644 tests/pkg/workload/ddl/internal/meta.go create mode 100644 tests/pkg/workload/ddl/internal/metrics.go create mode 100644 tests/pkg/workload/ddl/internal/run.go create mode 100644 tests/pkg/workload/ddl/internal/util.go create mode 100644 tests/pkg/workload/ddl/workload.go create mode 100644 tests/pkg/workload/interface.go diff --git a/tests/cluster_info.go b/tests/cluster_info.go index 332be46759..5b5137c2dd 100644 --- a/tests/cluster_info.go +++ b/tests/cluster_info.go @@ -1,6 +1,9 @@ package tests -import "strconv" +import ( + "fmt" + "strconv" +) func (tc *TidbClusterInfo) set(name string, value string) (string, bool) { // NOTE: not thread-safe, maybe make info struct immutable @@ -48,3 +51,7 @@ func (tc *TidbClusterInfo) UpgradeAll(tag string) *TidbClusterInfo { UpgradeTiKV("pingcap/tikv:" + tag). UpgradeTiDB("pingcap/tidb:" + tag) } + +func (tc *TidbClusterInfo) DSN(dbName string) string { + return fmt.Sprintf("root:%s@tcp(%s-tidb.%s:4000)/%s", tc.Password, tc.ClusterName, tc.Namespace, dbName) +} diff --git a/tests/cmd/e2e/main.go b/tests/cmd/e2e/main.go index 0f88ce47d0..61275986e2 100644 --- a/tests/cmd/e2e/main.go +++ b/tests/cmd/e2e/main.go @@ -22,6 +22,8 @@ import ( "github.com/pingcap/tidb-operator/pkg/client/clientset/versioned" "github.com/pingcap/tidb-operator/tests" "github.com/pingcap/tidb-operator/tests/backup" + "github.com/pingcap/tidb-operator/tests/pkg/workload" + "github.com/pingcap/tidb-operator/tests/pkg/workload/ddl" "k8s.io/apiserver/pkg/util/logs" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" @@ -118,12 +120,43 @@ func main() { glog.Fatal(err) } - clusterInfo = clusterInfo.ScaleTiDB(3) - if err := oa.ScaleTidbCluster(clusterInfo); err != nil { - oa.DumpAllLogs(operatorInfo, []*tests.TidbClusterInfo{clusterInfo}) - glog.Fatal(err) - } - if err = oa.CheckTidbClusterStatus(clusterInfo); err != nil { + err = workload.Run(func() error { + clusterInfo = clusterInfo.ScaleTiDB(3).ScaleTiKV(5).ScalePD(5) + if err := oa.ScaleTidbCluster(clusterInfo); err != nil { + return err + } + if err := oa.CheckTidbClusterStatus(clusterInfo); err != nil { + return err + } + + clusterInfo = clusterInfo.ScalePD(3) + if err := oa.ScaleTidbCluster(clusterInfo); err != nil { + return err + } + if err := oa.CheckTidbClusterStatus(clusterInfo); err != nil { + return err + } + + clusterInfo = clusterInfo.ScaleTiKV(3) + if err := oa.ScaleTidbCluster(clusterInfo); err != nil { + return err + } + if err := oa.CheckTidbClusterStatus(clusterInfo); err != nil { + return err + } + + clusterInfo = clusterInfo.ScaleTiDB(1) + if err := oa.ScaleTidbCluster(clusterInfo); err != nil { + return err + } + if err := oa.CheckTidbClusterStatus(clusterInfo); err != nil { + return err + } + + return nil + }, ddl.New(clusterInfo.DSN("test"), 1, 1)) + + if err != nil { oa.DumpAllLogs(operatorInfo, []*tests.TidbClusterInfo{clusterInfo}) glog.Fatal(err) } diff --git a/tests/pkg/blockwriter/blockWriter.go b/tests/pkg/blockwriter/blockWriter.go new file mode 100644 index 0000000000..8434f151b0 --- /dev/null +++ b/tests/pkg/blockwriter/blockWriter.go @@ -0,0 +1,273 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License.package spec + +package blockwriter + +import ( + "context" + "database/sql" + "fmt" + "math/rand" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/golang/glog" + "github.com/pingcap/tidb-operator/tests/pkg/util" + "k8s.io/apimachinery/pkg/util/wait" +) + +const ( + queryChanSize int = 10000 +) + +// BlockWriterCase is for concurrent writing blocks. +type BlockWriterCase struct { + cfg Config + bws []*blockWriter + + isRunning uint32 + isInit uint32 + stopChan chan struct{} + + sync.RWMutex +} + +// Config defines the config of BlockWriterCase +type Config struct { + TableNum int + Concurrency int + BatchSize int + RawSize int +} + +type blockWriter struct { + rawSize int + values []string + batchSize int +} + +// NewBlockWriterCase returns the BlockWriterCase. +func NewBlockWriterCase(cfg Config) *BlockWriterCase { + c := &BlockWriterCase{ + cfg: cfg, + stopChan: make(chan struct{}, 1), + } + + if c.cfg.TableNum < 1 { + c.cfg.TableNum = 1 + } + c.initBlocks() + + return c +} + +func (c *BlockWriterCase) initBlocks() { + c.bws = make([]*blockWriter, c.cfg.Concurrency) + for i := 0; i < c.cfg.Concurrency; i++ { + c.bws[i] = c.newBlockWriter() + } +} + +func (c *BlockWriterCase) newBlockWriter() *blockWriter { + return &blockWriter{ + rawSize: c.cfg.RawSize, + values: make([]string, c.cfg.BatchSize), + batchSize: c.cfg.BatchSize, + } +} + +func (c *BlockWriterCase) generateQuery(ctx context.Context, queryChan chan []string, wg *sync.WaitGroup) { + defer func() { + glog.Infof("[%s] [action: generate Query] stopped", c) + wg.Done() + }() + + for { + tableN := rand.Intn(c.cfg.TableNum) + var index string + if tableN > 0 { + index = fmt.Sprintf("%d", tableN) + } + + var querys []string + for i := 0; i < 100; i++ { + values := make([]string, c.cfg.BatchSize) + for i := 0; i < c.cfg.BatchSize; i++ { + blockData := util.RandString(c.cfg.RawSize) + values[i] = fmt.Sprintf("('%s')", blockData) + } + + querys = append(querys, fmt.Sprintf( + "INSERT INTO block_writer%s(raw_bytes) VALUES %s", + index, strings.Join(values, ","))) + } + + select { + case <-ctx.Done(): + return + default: + if len(queryChan) < queryChanSize { + queryChan <- querys + } else { + glog.Infof("[%s] [action: generate Query] query channel is full, sleep 10 seconds", c) + util.Sleep(ctx, 10*time.Second) + } + } + } +} + +func (bw *blockWriter) batchExecute(db *sql.DB, query string) error { + _, err := db.Exec(query) + if err != nil { + glog.Errorf("[block_writer] exec sql [%s] failed, err: %v", query, err) + return err + } + + return nil +} + +func (bw *blockWriter) run(ctx context.Context, db *sql.DB, queryChan chan []string) { + for { + select { + case <-ctx.Done(): + return + default: + } + + querys, ok := <-queryChan + if !ok { + // No more query + return + } + + for _, query := range querys { + select { + case <-ctx.Done(): + return + default: + if err := bw.batchExecute(db, query); err != nil { + glog.Fatal(err) + } + } + } + } +} + +// Initialize inits case +func (c *BlockWriterCase) initialize(db *sql.DB) error { + glog.Infof("[%s] start to init...", c) + defer func() { + atomic.StoreUint32(&c.isInit, 1) + glog.Infof("[%s] init end...", c) + }() + + for i := 0; i < c.cfg.TableNum; i++ { + var s string + if i > 0 { + s = fmt.Sprintf("%d", i) + } + + tmt := fmt.Sprintf("CREATE TABLE IF NOT EXISTS block_writer%s %s", s, ` + ( + id BIGINT NOT NULL AUTO_INCREMENT, + raw_bytes BLOB NOT NULL, + PRIMARY KEY (id) +)`) + + err := wait.PollImmediate(5*time.Second, 30*time.Second, func() (bool, error) { + _, err := db.Exec(tmt) + if err != nil { + glog.Warningf("[%s] exec sql [%s] failed, err: %v, retry...", c, tmt, err) + return false, nil + } + + return true, nil + }) + + if err != nil { + glog.Errorf("[%s] exec sql [%s] failed, err: %v", c, tmt, err) + return err + } + } + + return nil +} + +// Start starts to run cases +func (c *BlockWriterCase) Start(db *sql.DB) error { + if !atomic.CompareAndSwapUint32(&c.isRunning, 0, 1) { + err := fmt.Errorf("[%s] is running, you can't start it again", c) + glog.Error(err) + return err + } + + defer func() { + c.RLock() + glog.Infof("[%s] stopped", c) + atomic.SwapUint32(&c.isRunning, 0) + }() + + if c.isInit == 0 { + if err := c.initialize(db); err != nil { + return err + } + } + + glog.Infof("[%s] start to execute case...", c) + + var wg sync.WaitGroup + + ctx, cancel := context.WithCancel(context.Background()) + + queryChan := make(chan []string, queryChanSize) + + for i := 0; i < c.cfg.Concurrency; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + c.bws[i].run(ctx, db, queryChan) + }(i) + } + + wg.Add(1) + go c.generateQuery(ctx, queryChan, &wg) + +loop: + for { + select { + case <-c.stopChan: + glog.Infof("[%s] stoping...", c) + cancel() + break loop + default: + util.Sleep(context.Background(), 2*time.Second) + } + } + + wg.Wait() + close(queryChan) + + return nil +} + +// Stop stops cases +func (c *BlockWriterCase) Stop() { + c.stopChan <- struct{}{} +} + +// String implements fmt.Stringer interface. +func (c *BlockWriterCase) String() string { + return "block_writer" +} diff --git a/tests/pkg/workload/ddl/internal/datatype.go b/tests/pkg/workload/ddl/internal/datatype.go new file mode 100644 index 0000000000..9dc62c284e --- /dev/null +++ b/tests/pkg/workload/ddl/internal/datatype.go @@ -0,0 +1,338 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal + +import ( + "math/rand" + "time" +) + +// Kind constants. +const ( + KindTINYINT int = iota + KindSMALLINT //int16 + KindMEDIUMINT //int24 + KindInt32 //int32 + KindBigInt //int64 + KindBit + + KindFloat + KindDouble + + KindDECIMAL + + KindChar + KindVarChar + + KindBLOB + KindTINYBLOB + KindMEDIUMBLOB + KindLONGBLOB + + KindTEXT + KindTINYTEXT + KindMEDIUMTEXT + KindLONGTEXT + + KindBool + + KindDATE + KindTIME + KindDATETIME + KindTIMESTAMP + KindYEAR + + KindJSON + + KindEnum + KindSet +) + +var ALLFieldType = map[int]string{ + KindTINYINT: "TINYINT", + KindSMALLINT: "SMALLINT", + KindMEDIUMINT: "MEDIUMINT", + KindInt32: "INT", + KindBigInt: "BIGINT", + KindBit: "BIT", + + KindFloat: "FLOAT", + KindDouble: "DOUBLE", + + KindDECIMAL: "DECIMAL", + + KindChar: "CHAR", + KindVarChar: "VARCHAR", + + KindBLOB: "BLOB", + KindTINYBLOB: "TINYBLOB", + KindMEDIUMBLOB: "MEDIUMBLOB", + KindLONGBLOB: "LONGBLOB", + + KindTEXT: "TEXT", + KindTINYTEXT: "TINYTEXT", + KindMEDIUMTEXT: "MEDIUMTEXT", + KindLONGTEXT: "LONGTEXT", + + KindBool: "BOOL", + + KindDATE: "DATE", + KindTIME: "TIME", + KindDATETIME: "DATETIME", + KindTIMESTAMP: "TIMESTAMP", + KindYEAR: "YEAR", + + KindJSON: "JSON", + KindEnum: "ENUM", + KindSet: "SET", +} + +// testFieldType is use to control what kind of data types to test, +// if we just want to test timestamp, we can annotation the another data types. +var testFieldType = []int{ + KindTINYINT, + KindSMALLINT, + KindMEDIUMINT, + KindInt32, + KindBigInt, + + //KindBit, // have default value bug unfix. + + KindFloat, + KindDouble, + + KindDECIMAL, + + KindChar, + KindVarChar, + + KindBLOB, + KindTEXT, + + KindBool, + + KindDATE, + KindTIME, + KindDATETIME, + KindTIMESTAMP, + KindYEAR, + + //KindJSON, // have `admin check table when index is virtual generated column` bug unfix + KindEnum, + KindSet, +} + +func randDataType() int { + i := rand.Intn(len(testFieldType)) + return testFieldType[i] +} + +const ( + BitMaxLen = 64 + CharMaxLen = 256 + VarCharMaxLen = 256 // varchar max len , actual range is [0,65536) + BLOBMaxLen = 256 // BLOB max len , actual range is [0,65536) + TINYBLOBMaxLen = 256 // TINYBLOB max len , actual range is [0,256) + MEDIUMBLOBMaxLen = 256 // MEDIUMBLOB max len , actual range is [0,16777216) + LONGBLOBMaxLen = 256 // LONGBLOB max len , actual range is [0,4294967296) + + TEXTMaxLen = 256 // TEXT max len , actual range is [0,65536) + TINYTEXTMaxLen = 256 // TINYTEXT max len , actual range is [0,256) + MEDIUMTEXTMaxLen = 256 // MEDIUMTEXT max len , actual range is [0,16777216) + LONGTEXTMaxLen = 256 // LONGTEXT max len , actual range is [0,4294967296) + + MAXDECIMALM = 65 // 1~65 + MAXDECIMALN = 30 // 0~30 + + EnumMaxLen = 10 + SetMaxLen = 10 + + TimeFormat = "2006-01-02 15:04:05" + TimeFormatForDATE = "2006-01-02" + TimeFormatForTIME = "15:04:05" + + MINDATETIME = "1000-01-01 00:00:00" + MAXDATETIME = "9999-12-31 23:59:59" + + MINTIMESTAMP = "1970-01-01 00:00:01" + MAXTIMESTAMP = "2038-01-19 03:14:07" +) + +var MinDATETIME time.Time +var MaxDATETIME time.Time +var GapDATETIMEUnix int64 + +var MinTIMESTAMP time.Time +var MaxTIMESTAMP time.Time +var GapTIMESTAMPUnix int64 + +func getMaxLenByKind(kind int) int { + switch kind { + case KindChar: + return CharMaxLen + case KindVarChar: + return VarCharMaxLen + case KindBLOB: + return BLOBMaxLen + case KindTINYBLOB: + return TINYBLOBMaxLen + case KindMEDIUMBLOB: + return MEDIUMBLOBMaxLen + case KindLONGBLOB: + return LONGBLOBMaxLen + case KindTEXT: + return TEXTMaxLen + case KindTINYTEXT: + return TINYTEXTMaxLen + case KindMEDIUMTEXT: + return MEDIUMTEXTMaxLen + case KindLONGTEXT: + return LONGTEXTMaxLen + case KindBit: + return BitMaxLen + case KindEnum: + return EnumMaxLen + case KindSet: + return SetMaxLen + } + return 0 +} + +type ambiguousTimeStr struct { + start string + end string +} + +type ambiguousTime struct { + start int64 + end int64 +} + +var ambiguousTimeStrSlice = []ambiguousTimeStr{ + // backward + { + start: "1900-12-31 23:54:17", + end: "1901-01-01 00:00:00", + }, + // moved forward + { + start: "1940-06-02 23:59:59", + end: "1940-06-03 01:00:00", + }, + // move backward + { + start: "1940-09-30 23:00:00", + end: "1940-10-01 00:00:00", + }, + // moved forward + { + start: "1941-03-15 23:59:59", + end: "1941-03-16 01:00:00", + }, + // move backward + { + start: "1941-09-30 23:00:00", + end: "1941-10-01 00:00:00", + }, + // moved forward + { + start: "1986-05-03 23:59:59", + end: "1986-05-04 01:00:00", + }, + // move backward + { + start: "1986-09-13 23:00:00", + end: "1986-09-14 00:00:00", + }, + // moved forward + { + start: "1987-04-11 23:59:59", + end: "1987-04-12 01:00:00", + }, + // move backward + { + start: "1987-09-12 23:00:00", + end: "1987-09-13 00:00:00", + }, + // moved forward + { + start: "1988-04-09 23:59:59", + end: "1988-04-10 01:00:00", + }, + // move backward + { + start: "1988-09-10 23:00:00", + end: "1988-09-11 00:00:00", + }, + + // moved forward + { + start: "1989-04-15 23:59:59", + end: "1989-04-16 01:00:00", + }, + // move backward + { + start: "1989-09-16 23:00:00", + end: "1989-09-17 00:00:00", + }, + // moved forward + { + start: "1990-04-14 23:59:59", + end: "1990-04-15 01:00:00", + }, + // move backward + { + start: "1990-09-15 23:00:00", + end: "1990-09-16 00:00:00", + }, + // moved forward + { + start: "1991-04-13 23:59:59", + end: "1991-04-14 01:00:00", + }, + // move backward + { + start: "1991-09-14 23:00:00", + end: "1991-09-15 00:00:00", + }, +} + +var ambiguousTimeSlice []ambiguousTime + +var local = time.Local + +func init() { + var err error + local, err = time.LoadLocation("Asia/Shanghai") + if err != nil { + local = time.Local + } + for _, v := range ambiguousTimeStrSlice { + start, _ := time.ParseInLocation(TimeFormat, v.start, local) + end, _ := time.ParseInLocation(TimeFormat, v.end, local) + amt := ambiguousTime{ + start: start.Unix(), + end: end.Unix(), + } + ambiguousTimeSlice = append(ambiguousTimeSlice, amt) + } + + MinDATETIME, _ = time.ParseInLocation(TimeFormat, MINDATETIME, local) + MaxDATETIME, _ = time.ParseInLocation(TimeFormat, MAXDATETIME, local) + GapDATETIMEUnix = MaxDATETIME.Unix() - MinDATETIME.Unix() + + MinTIMESTAMP, _ = time.ParseInLocation(TimeFormat, MINTIMESTAMP, local) + MaxTIMESTAMP, _ = time.ParseInLocation(TimeFormat, MAXTIMESTAMP, local) + GapTIMESTAMPUnix = MaxTIMESTAMP.Unix() - MinTIMESTAMP.Unix() +} diff --git a/tests/pkg/workload/ddl/internal/ddl.go b/tests/pkg/workload/ddl/internal/ddl.go new file mode 100644 index 0000000000..cdcb3ee38d --- /dev/null +++ b/tests/pkg/workload/ddl/internal/ddl.go @@ -0,0 +1,584 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. +package internal + +import ( + "context" + "database/sql" + "fmt" + "math/rand" + "sync" + "sync/atomic" + "time" + "unsafe" + + "github.com/golang/glog" + "github.com/juju/errors" +) + +// The DDL test case is intended to test the correctness of DDL operations. It +// generates test cases by probability so that it should be run in background for +// enough time to see if there are any issues. +// +// The DDL test case have multiple go routines run in parallel, one for DML operations, +// other for DDL operations. The feature of each operation (for example, covering +// what kind of scenario) is determined and generated at start up time (See +// `generateDMLOps`, `generateDDLOps``), while the order of each operation is +// randomized in each round. +// +// If there are remaining DDL operations while all DML operations are performed, a +// new round of DML operations will be started (with new randomized order) and when +// all DDL operations are done, the remaining DML operations are discarded. vice +// versa. +// +// Since there are some conflicts between some DDL operations and DML operations, +// for example, inserting a row while removing a column may cause errors in +// inserting because of incorrect column numbers, some locks and some conflicting +// detections are introduced. The conflicting detection will ignore errors raised +// in such scenarios. In addition, the data in memory is stored by column instead +// of by row to minimize data conflicts in adding and removing columns. + +type DDLCaseConfig struct { + Concurrency int `toml:"concurrency"` + MySQLCompatible bool `toml:"mysql_compactible"` + TablesToCreate int `toml:"tables_to_create"` + TestTp DDLTestType `toml:"test_type"` +} + +type DDLTestType int + +const ( + SerialDDLTest DDLTestType = iota + ParallelDDLTest +) + +type ExecuteDDLFunc func(*testCase, []ddlTestOpExecutor, func() error) error +type ExecuteDMLFunc func(*testCase, []dmlTestOpExecutor, func() error) error + +type DDLCase struct { + cfg *DDLCaseConfig + cases []*testCase +} + +func (c *DDLCase) String() string { + return "ddl" +} + +// Execute executes each goroutine (i.e. `testCase`) concurrently. +func (c *DDLCase) Execute(ctx context.Context, dbss [][]*sql.DB, exeDDLFunc ExecuteDDLFunc, exeDMLFunc ExecuteDMLFunc) error { + for _, dbs := range dbss { + for _, db := range dbs { + enableTiKVGC(db) + } + } + + glog.Infof("[%s] start to test...", c) + defer func() { + glog.Infof("[%s] test end...", c) + }() + var wg sync.WaitGroup + for i := 0; i < c.cfg.Concurrency; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + for { + select { + case <-ctx.Done(): + return + default: + } + err := c.cases[i].execute(exeDDLFunc, exeDMLFunc) + if err != nil { + for _, dbs := range dbss { + for _, db := range dbs { + disableTiKVGC(db) + } + } + glog.Fatalf("[ddl] [instance %d] ERROR: %s", i, errors.ErrorStack(err)) + } + } + }(i) + } + wg.Wait() + return nil +} + +// Initialize initializes each concurrent goroutine (i.e. `testCase`). +func (c *DDLCase) Initialize(ctx context.Context, dbss [][]*sql.DB) error { + for i := 0; i < c.cfg.Concurrency; i++ { + err := c.cases[i].initialize(dbss[i]) + if err != nil { + return errors.Trace(err) + } + } + return nil +} + +// NewDDLCase returns a DDLCase, which contains specified `testCase`s. +func NewDDLCase(cfg *DDLCaseConfig) *DDLCase { + cases := make([]*testCase, cfg.Concurrency) + for i := 0; i < cfg.Concurrency; i++ { + cases[i] = &testCase{ + cfg: cfg, + tables: make(map[string]*ddlTestTable), + ddlOps: make([]ddlTestOpExecutor, 0), + dmlOps: make([]dmlTestOpExecutor, 0), + caseIndex: i, + stop: 0, + } + } + b := &DDLCase{ + cfg: cfg, + cases: cases, + } + return b +} + +const ( + ddlTestValueNull string = "NULL" + ddlTestValueInvalid int32 = -99 +) + +type ddlTestOpExecutor struct { + executeFunc func(interface{}, chan *ddlJobTask) error + config interface{} + ddlKind DDLKind +} + +type dmlTestOpExecutor struct { + prepareFunc func(interface{}, chan *dmlJobTask) error + config interface{} +} + +type DMLKind int + +const ( + dmlInsert DMLKind = iota + dmlUpdate + dmlDelete +) + +type dmlJobArg unsafe.Pointer + +type dmlJobTask struct { + k DMLKind + tblInfo *ddlTestTable + sql string + assigns []*ddlTestColumnDescriptor + whereColumns []*ddlTestColumnDescriptor + err error +} + +// initialize generates possible DDL and DML operations for one `testCase`. +// Different `testCase`s will be run in parallel according to the concurrent configuration. +func (c *testCase) initialize(dbs []*sql.DB) error { + var err error + c.dbs = dbs + if err = c.generateDDLOps(); err != nil { + return errors.Trace(err) + } + if err = c.generateDMLOps(); err != nil { + return errors.Trace(err) + } + // Create 2 table before executes DDL & DML + taskCh := make(chan *ddlJobTask, 2) + c.prepareAddTable(nil, taskCh) + c.prepareAddTable(nil, taskCh) + if c.cfg.TestTp == SerialDDLTest { + err = c.execSerialDDLSQL(taskCh) + if err != nil { + return errors.Trace(err) + } + err = c.execSerialDDLSQL(taskCh) + } else { + err = c.execParaDDLSQL(taskCh, len(taskCh)) + } + if err != nil { + return errors.Trace(err) + } + return nil +} + +/* +ParallelExecuteOperations executes process: +1. Generate many kind of DDL SQLs +2. Parallel send every kind of DDL request to TiDB +3. Wait all DDL SQLs request finish +4. Send `admin show ddl jobs` request to TiDB to confirm parallel DDL requests execute order +5. Do the same DDL change on local with the same DDL requests executed order of TiDB +6. Judge the every DDL execution result of TiDB and local. If both of local and TiDB execute result are no wrong, or both are wrong it will be ok. Otherwise, It must be something wrong. +*/ +func ParallelExecuteOperations(c *testCase, ops []ddlTestOpExecutor, postOp func() error) error { + perm := rand.Perm(len(ops)) + taskCh := make(chan *ddlJobTask, len(ops)) + for _, idx := range perm { + if c.isStop() { + return nil + } + op := ops[idx] + if rand.Float64() > mapOfDDLKindProbability[op.ddlKind] { + continue + } + op.executeFunc(op.config, taskCh) + } + err := c.execParaDDLSQL(taskCh, len(taskCh)) + if err != nil { + return errors.Trace(err) + } + close(taskCh) + time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond) + return nil +} + +func SerialExecuteOperations(c *testCase, ops []ddlTestOpExecutor, postOp func() error) error { + perm := rand.Perm(len(ops)) + taskCh := make(chan *ddlJobTask, 1) + for _, idx := range perm { + if c.isStop() { + return nil + } + op := ops[idx] + if rand.Float64() > mapOfDDLKindProbability[op.ddlKind] { + continue + } + op.executeFunc(op.config, taskCh) + err := c.execSerialDDLSQL(taskCh) + if err != nil { + return errors.Trace(err) + } + } + close(taskCh) + time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond) + return nil +} + +func TransactionExecuteOperations(c *testCase, ops []dmlTestOpExecutor, postOp func() error) error { + transactionOpsLen := rand.Intn(len(ops)) + if transactionOpsLen < 1 { + transactionOpsLen = 1 + } + taskCh := make(chan *dmlJobTask, len(ops)) + opNum := 0 + perm := rand.Perm(len(ops)) + for i, idx := range perm { + if c.isStop() { + return nil + } + op := ops[idx] + err := op.prepareFunc(op.config, taskCh) + if err != nil { + if err.Error() != "Conflict operation" { + return errors.Trace(err) + } + continue + } + opNum++ + if opNum >= transactionOpsLen { + err = c.execDMLInTransactionSQL(taskCh) + if err != nil { + return errors.Trace(err) + } + transactionOpsLen = rand.Intn(len(ops)) + if transactionOpsLen < 1 { + transactionOpsLen = 1 + } + if transactionOpsLen > (len(ops) - i) { + transactionOpsLen = len(ops) - i + } + opNum = 0 + if postOp != nil { + err = postOp() + if err != nil { + return errors.Trace(err) + } + } + time.Sleep(time.Duration(rand.Intn(50)) * time.Millisecond) + } + } + return nil +} + +func SerialExecuteDML(c *testCase, ops []dmlTestOpExecutor, postOp func() error) error { + perm := rand.Perm(len(ops)) + taskCh := make(chan *dmlJobTask, 1) + for _, idx := range perm { + if c.isStop() { + return nil + } + op := ops[idx] + err := op.prepareFunc(op.config, taskCh) + if err != nil { + if err.Error() != "Conflict operation" { + return errors.Trace(err) + } + continue + } + err = c.execSerialDMLSQL(taskCh) + if err != nil { + return errors.Trace(err) + } + } + close(taskCh) + time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond) + return nil +} + +// execute iterates over two list of operations concurrently, one is +// ddl operations, one is dml operations. +// When one list completes, it starts over from the beginning again. +// When both of them ONCE complete, it exits. +func (c *testCase) execute(executeDDL ExecuteDDLFunc, exeDMLFunc ExecuteDMLFunc) error { + var ( + ddlAllComplete int32 = 0 + dmlAllComplete int32 = 0 + ) + + err := parallel(func() error { + var err error + for { + err = executeDDL(c, c.ddlOps, nil) + atomic.StoreInt32(&ddlAllComplete, 1) + if atomic.LoadInt32(&ddlAllComplete) != 0 && atomic.LoadInt32(&dmlAllComplete) != 0 || err != nil { + break + } + } + return errors.Trace(err) + }, func() error { + var err error + for { + err = exeDMLFunc(c, c.dmlOps, func() error { + return c.executeVerifyIntegrity() + }) + atomic.StoreInt32(&dmlAllComplete, 1) + if atomic.LoadInt32(&ddlAllComplete) != 0 && atomic.LoadInt32(&dmlAllComplete) != 0 || err != nil { + break + } + } + return errors.Trace(err) + }) + + if err != nil { + ddlFailedCounter.Inc() + return errors.Trace(err) + } + + glog.Infof("[ddl] [instance %d] Round completed", c.caseIndex) + glog.Infof("[ddl] [instance %d] Executing post round operations...", c.caseIndex) + + if !c.cfg.MySQLCompatible { + err := c.executeAdminCheck() + if err != nil { + return errors.Trace(err) + } + } + + return nil +} + +var selectID int32 + +// executeVerifyIntegrity verifies the integrity of the data in the database +// by comparing the data in memory (that we expected) with the data in the database. +func (c *testCase) executeVerifyIntegrity() error { + c.tablesLock.RLock() + tablesSnapshot := make([]*ddlTestTable, 0) + for _, table := range c.tables { + tablesSnapshot = append(tablesSnapshot, table) + } + gotTableTime := time.Now() + c.tablesLock.RUnlock() + + uniqID := atomic.AddInt32(&selectID, 1) + + for _, table := range tablesSnapshot { + table.lock.RLock() + columnsSnapshot := table.filterColumns(table.predicateAll) + table.lock.RUnlock() + + // build SQL + sql := "SELECT " + for i, column := range columnsSnapshot { + if i > 0 { + sql += ", " + } + sql += fmt.Sprintf("%s", column.getSelectName()) + } + sql += fmt.Sprintf(" FROM `%s`", table.name) + + dbIdx := rand.Intn(len(c.dbs)) + db := c.dbs[dbIdx] + + // execute + opStart := time.Now() + rows, err := db.Query(sql) + glog.Infof("[ddl] [instance %d] %s, elapsed time:%v, got table time:%v, selectID:%v", c.caseIndex, sql, time.Since(opStart).Seconds(), gotTableTime, uniqID) + if err == nil { + defer rows.Close() + } + // When column is removed, SELECT statement may return error so that we ignore them here. + if table.isDeleted() { + return nil + } + for _, column := range columnsSnapshot { + if column.isDeleted() { + return nil + } + } + if err != nil { + return errors.Annotatef(err, "Error when executing SQL: %s\n%s", sql, table.debugPrintToString()) + } + + // Read all rows. + var actualRows [][]interface{} + for rows.Next() { + cols, err1 := rows.Columns() + if err1 != nil { + return errors.Trace(err) + } + + glog.Infof("[ddl] [instance %d] rows.Columns():%v, len(cols):%v, selectID:%v", c.caseIndex, cols, len(cols), uniqID) + + // See https://stackoverflow.com/questions/14477941/read-select-columns-into-string-in-go + rawResult := make([][]byte, len(cols)) + result := make([]interface{}, len(cols)) + dest := make([]interface{}, len(cols)) + for i := range rawResult { + dest[i] = &rawResult[i] + } + + err1 = rows.Scan(dest...) + if err1 != nil { + return errors.Trace(err) + } + + for i, raw := range rawResult { + if raw == nil { + result[i] = ddlTestValueNull + } else { + result[i] = trimValue(columnsSnapshot[i].k, raw) + } + } + + actualRows = append(actualRows, result) + } + if rows.Err() != nil { + return errors.Trace(rows.Err()) + } + + // Even if SQL executes successfully, column deletion will cause different data as well + if table.isDeleted() { + return nil + } + for _, column := range columnsSnapshot { + if column.isDeleted() { + return nil + } + } + + // Make signatures for actual rows. + actualRowsMap := make(map[string]int) + for _, row := range actualRows { + rowString := "" + for _, col := range row { + rowString += fmt.Sprintf("%v,", col) + } + _, ok := actualRowsMap[rowString] + if !ok { + actualRowsMap[rowString] = 0 + } + actualRowsMap[rowString]++ + } + + // Compare with expecting rows. + checkTime := time.Now() + for i := 0; i < table.numberOfRows; i++ { + rowString := "" + for _, column := range columnsSnapshot { + if column.rows[i] == nil { + rowString += fmt.Sprintf("NULL,") + } else { + rowString += fmt.Sprintf("%v,", column.rows[i]) + } + } + _, ok := actualRowsMap[rowString] + if !ok { + c.stopTest() + err = fmt.Errorf("Expecting row %s in table `%s` but not found, sql: %s, selectID:%v, checkTime:%v, rowErr:%v, actualRowsMap:%#v\n%s", rowString, table.name, sql, uniqID, checkTime, rows.Err(), actualRowsMap, table.debugPrintToString()) + glog.Infof("err: %v", err) + return errors.Trace(err) + } + actualRowsMap[rowString]-- + if actualRowsMap[rowString] < 0 { + c.stopTest() + err = fmt.Errorf("Expecting row %s in table `%s` but not found, sql: %s, selectID:%v, checkTime:%v, rowErr:%v, actualRowsMap:%#v\n%s", rowString, table.name, sql, uniqID, checkTime, rows.Err(), actualRowsMap, table.debugPrintToString()) + glog.Infof("err: %v", err) + return errors.Trace(err) + } + } + for rowString, occurs := range actualRowsMap { + if occurs > 0 { + c.stopTest() + err = fmt.Errorf("Unexpected row %s in table `%s`, sql: %s, selectID:%v, checkTime:%v, rowErr:%v, actualRowsMap:%#v\n%s", rowString, table.name, sql, uniqID, checkTime, rows.Err(), actualRowsMap, table.debugPrintToString()) + glog.Infof("err: %v", err) + return errors.Trace(err) + } + } + } + return nil +} + +func trimValue(tp int, val []byte) string { + // a='{"DnOJQOlx":52,"ZmvzPtdm":82}' + // eg: set a={"a":"b","b":"c"} + // get a={"a": "b", "b": "c"} , so have to remove the space + if tp == KindJSON { + for i := 1; i < len(val)-2; i++ { + if val[i-1] == '"' && val[i] == ':' && val[i+1] == ' ' { + val = append(val[:i+1], val[i+2:]...) + } + if val[i-1] == ',' && val[i] == ' ' && val[i+1] == '"' { + val = append(val[:i], val[i+1:]...) + } + } + } + return string(val) +} + +func (c *testCase) executeAdminCheck() error { + if len(c.tables) == 0 { + return nil + } + + // build SQL + sql := "ADMIN CHECK TABLE " + i := 0 + for _, table := range c.tables { + if i > 0 { + sql += ", " + } + sql += fmt.Sprintf("`%s`", table.name) + i++ + } + dbIdx := rand.Intn(len(c.dbs)) + db := c.dbs[dbIdx] + // execute + glog.Infof("[ddl] [instance %d] %s", c.caseIndex, sql) + _, err := db.Exec(sql) + if err != nil { + if ignore_error(err) { + return nil + } + return errors.Annotatef(err, "Error when executing SQL: %s", sql) + } + return nil +} diff --git a/tests/pkg/workload/ddl/internal/ddl_ops.go b/tests/pkg/workload/ddl/internal/ddl_ops.go new file mode 100644 index 0000000000..5ef4edc866 --- /dev/null +++ b/tests/pkg/workload/ddl/internal/ddl_ops.go @@ -0,0 +1,847 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. +package internal + +import ( + "database/sql" + "fmt" + "math/rand" + "sort" + "strconv" + "sync" + "time" + "unsafe" + + "k8s.io/apimachinery/pkg/util/uuid" + + "github.com/golang/glog" + "github.com/juju/errors" +) + +func (c *testCase) generateDDLOps() error { + if err := c.generateAddTable(); err != nil { + return errors.Trace(err) + } + if err := c.generateDropTable(); err != nil { + return errors.Trace(err) + } + if err := c.generateAddIndex(); err != nil { + return errors.Trace(err) + } + if err := c.generateDropIndex(); err != nil { + return errors.Trace(err) + } + if err := c.generateAddColumn(); err != nil { + return errors.Trace(err) + } + if err := c.generateDropColumn(); err != nil { + return errors.Trace(err) + } + return nil +} + +type DDLKind = int + +const ( + ddlAddTable DDLKind = iota + ddlAddIndex + ddlAddColumn + + ddlDropTable + ddlDropIndex + ddlDropColumn + + ddlKindNil +) + +var mapOfDDLKind = map[string]DDLKind{ + "create table": ddlAddTable, + "add index": ddlAddIndex, + "add column": ddlAddColumn, + + "drop table": ddlDropTable, + "drop index": ddlDropIndex, + "drop column": ddlDropColumn, +} + +var mapOfDDLKindToString = map[DDLKind]string{ + ddlAddTable: "create table", + ddlAddIndex: "add index", + ddlAddColumn: "add column", + + ddlDropTable: "drop table", + ddlDropIndex: "drop index", + ddlDropColumn: "drop column", +} + +// mapOfDDLKindProbability use to control every kind of ddl request execute probability. +var mapOfDDLKindProbability = map[DDLKind]float64{ + ddlAddTable: 0.15, + ddlDropTable: 0.15, + + ddlAddIndex: 0.8, + ddlDropIndex: 0.5, + + ddlAddColumn: 0.8, + ddlDropColumn: 0.5, +} + +type ddlJob struct { + id int + tableName string + k DDLKind + jobState string + tableID string +} + +type ddlJobArg unsafe.Pointer + +type ddlJobTask struct { + ddlID int + k DDLKind + tblInfo *ddlTestTable + sql string + arg ddlJobArg + err error // remote TiDB execute error +} + +func (c *testCase) updateTableInfo(task *ddlJobTask) error { + switch task.k { + case ddlAddTable: + return c.addTableInfo(task) + case ddlDropTable: + return c.dropTableJob(task) + case ddlAddIndex: + return c.addIndexJob(task) + case ddlDropIndex: + return c.dropIndexJob(task) + case ddlAddColumn: + return c.addColumnJob(task) + case ddlDropColumn: + return c.dropColumnJob(task) + } + return fmt.Errorf("unknow ddl task , %v", *task) +} + +/* +execParaDDLSQL get a batch of ddl from taskCh, and then: +1. Parallel send every kind of DDL request to TiDB +2. Wait all DDL SQLs request finish +3. Send `admin show ddl jobs` request to TiDB to confirm parallel DDL requests execute order +4. Do the same DDL change on local with the same DDL requests executed order of TiDB +5. Judge the every DDL execution result of TiDB and local. If both of local and TiDB execute result are no wrong, or both are wrong it will be ok. Otherwise, It must be something wrong. +*/ +func (c *testCase) execParaDDLSQL(taskCh chan *ddlJobTask, num int) error { + if num == 0 { + return nil + } + tasks := make([]*ddlJobTask, 0, num) + var wg sync.WaitGroup + for i := 0; i < num; i++ { + task := <-taskCh + tasks = append(tasks, task) + wg.Add(1) + go func(task *ddlJobTask) { + defer wg.Done() + opStart := time.Now() + db := c.dbs[0] + _, err := db.Exec(task.sql) + glog.Infof("[ddl] [instance %d] TiDB execute %s , err %v, table_id %s, elapsed time:%v", c.caseIndex, task.sql, err, task.tblInfo.id, time.Since(opStart).Seconds()) + task.err = err + }(task) + } + wg.Wait() + db := c.dbs[0] + SortTasks, err := c.getSortTask(db, tasks) + if err != nil { + return err + } + for _, task := range SortTasks { + err := c.updateTableInfo(task) + glog.Infof("[ddl] [instance %d] local execute %s, err %v , table_id %s, ddlID %v", c.caseIndex, task.sql, err, task.tblInfo.id, task.ddlID) + if err == nil && task.err != nil || err != nil && task.err == nil { + return fmt.Errorf("Error when executing SQL: %s\n, local err: %#v, remote tidb err: %#v\n%s\n", task.sql, err, task.err, task.tblInfo.debugPrintToString()) + } + } + return nil +} + +// execSerialDDLSQL gets a job from taskCh, and then executes the job. +func (c *testCase) execSerialDDLSQL(taskCh chan *ddlJobTask) error { + if len(taskCh) < 1 { + return nil + } + task := <-taskCh + db := c.dbs[0] + opStart := time.Now() + _, err := db.Exec(task.sql) + glog.Infof("[ddl] [instance %d] %s, elapsed time:%v", c.caseIndex, task.sql, time.Since(opStart).Seconds()) + if err != nil { + return fmt.Errorf("Error when executing SQL: %s\n remote tidb Err: %#v\n%s\n", task.sql, err, task.tblInfo.debugPrintToString()) + } + err = c.updateTableInfo(task) + if err != nil { + return fmt.Errorf("Error when executing SQL: %s\n local Err: %#v\n%s\n", task.sql, err, task.tblInfo.debugPrintToString()) + } + return nil +} + +func (c *testCase) generateAddTable() error { + c.ddlOps = append(c.ddlOps, ddlTestOpExecutor{c.prepareAddTable, nil, ddlAddTable}) + return nil +} + +func (c *testCase) prepareAddTable(cfg interface{}, taskCh chan *ddlJobTask) error { + columnCount := rand.Intn(c.cfg.TablesToCreate) + 2 + tableColumns := make([]*ddlTestColumn, 0, columnCount) + for i := 0; i < columnCount; i++ { + columns := getRandDDLTestColumns() + tableColumns = append(tableColumns, columns...) + } + + // Generate primary key with [0, 3) size + primaryKeyFields := rand.Intn(3) + primaryKeys := make([]int, 0) + if primaryKeyFields > 0 { + // Random elections column as primary key, but also check the column whether can be primary key. + perm := rand.Perm(len(tableColumns))[0:primaryKeyFields] + for _, columnIndex := range perm { + if tableColumns[columnIndex].canBePrimary() { + tableColumns[columnIndex].isPrimaryKey = true + primaryKeys = append(primaryKeys, columnIndex) + } + } + primaryKeyFields = len(primaryKeys) + } + + tableInfo := ddlTestTable{ + name: string(uuid.NewUUID()), + columns: tableColumns, + indexes: make([]*ddlTestIndex, 0), + numberOfRows: 0, + deleted: 0, + } + + sql := fmt.Sprintf("CREATE TABLE `%s` (", tableInfo.name) + for i := 0; i < len(tableInfo.columns); i++ { + if i > 0 { + sql += ", " + } + sql += fmt.Sprintf("`%s` %s", tableColumns[i].name, tableColumns[i].getDefinition()) + } + if primaryKeyFields > 0 { + sql += ", PRIMARY KEY (" + for i, columnIndex := range primaryKeys { + if i > 0 { + sql += ", " + } + sql += fmt.Sprintf("`%s`", tableColumns[columnIndex].name) + } + sql += ")" + } + sql += ")" + + task := &ddlJobTask{ + k: ddlAddTable, + sql: sql, + tblInfo: &tableInfo, + } + taskCh <- task + return nil +} + +func (c *testCase) addTableInfo(task *ddlJobTask) error { + c.tablesLock.Lock() + defer c.tablesLock.Unlock() + c.tables[task.tblInfo.name] = task.tblInfo + + return nil +} + +func (c *testCase) generateDropTable() error { + c.ddlOps = append(c.ddlOps, ddlTestOpExecutor{c.prepareDropTable, nil, ddlDropTable}) + return nil +} + +func (c *testCase) prepareDropTable(cfg interface{}, taskCh chan *ddlJobTask) error { + c.tablesLock.Lock() + defer c.tablesLock.Unlock() + tableToDrop := c.pickupRandomTable() + if len(c.tables) <= 1 || tableToDrop == nil { + return nil + } + tableToDrop.setDeleted() + sql := fmt.Sprintf("DROP TABLE `%s`", tableToDrop.name) + + task := &ddlJobTask{ + k: ddlDropTable, + sql: sql, + tblInfo: tableToDrop, + } + taskCh <- task + return nil +} + +func (c *testCase) dropTableJob(task *ddlJobTask) error { + c.tablesLock.Lock() + defer c.tablesLock.Unlock() + if c.isTableDeleted(task.tblInfo) { + return fmt.Errorf("table %s is not exists", task.tblInfo.name) + } + delete(c.tables, task.tblInfo.name) + return nil +} + +type ddlTestIndexStrategy = int + +const ( + ddlTestIndexStrategyBegin ddlTestIndexStrategy = iota + ddlTestIndexStrategySingleColumnAtBeginning + ddlTestIndexStrategySingleColumnAtEnd + ddlTestIndexStrategySingleColumnRandom + ddlTestIndexStrategyMultipleColumnRandom + ddlTestIndexStrategyEnd +) + +type ddlTestAddIndexConfig struct { + strategy ddlTestIndexStrategy +} + +type ddlIndexJobArg struct { + index *ddlTestIndex +} + +func (c *testCase) generateAddIndex() error { + c.ddlOps = append(c.ddlOps, ddlTestOpExecutor{c.prepareAddIndex, nil, ddlAddIndex}) + return nil +} + +func (c *testCase) prepareAddIndex(_ interface{}, taskCh chan *ddlJobTask) error { + table := c.pickupRandomTable() + if table == nil { + return nil + } + strategy := rand.Intn(ddlTestIndexStrategyMultipleColumnRandom) + ddlTestIndexStrategySingleColumnAtBeginning + // build index definition + index := ddlTestIndex{ + name: string(uuid.NewUUID()), + signature: "", + columns: make([]*ddlTestColumn, 0), + } + + switch strategy { + case ddlTestIndexStrategySingleColumnAtBeginning: + if !table.columns[0].canBeIndex() { + return nil + } + index.columns = append(index.columns, table.columns[0]) + case ddlTestIndexStrategySingleColumnAtEnd: + if !table.columns[len(table.columns)-1].canBeIndex() { + return nil + } + index.columns = append(index.columns, table.columns[len(table.columns)-1]) + case ddlTestIndexStrategySingleColumnRandom: + col := table.columns[rand.Intn(len(table.columns))] + if !col.canBeIndex() { + return nil + } + index.columns = append(index.columns, col) + case ddlTestIndexStrategyMultipleColumnRandom: + numberOfColumns := rand.Intn(len(table.columns)) + 1 + // Multiple columns of one index should no more than 16. + if numberOfColumns > 10 { + numberOfColumns = 10 + } + perm := rand.Perm(len(table.columns))[:numberOfColumns] + for _, idx := range perm { + if table.columns[idx].canBeIndex() { + index.columns = append(index.columns, table.columns[idx]) + } + } + } + + if len(index.columns) == 0 { + return nil + } + + signature := "" + for _, col := range index.columns { + signature += col.name + "," + } + index.signature = signature + + // check whether index duplicates + for _, idx := range table.indexes { + if idx.signature == index.signature { + return nil + } + } + + // build SQL + sql := fmt.Sprintf("ALTER TABLE `%s` ADD INDEX `%s` (", table.name, index.name) + for i, column := range index.columns { + if i > 0 { + sql += ", " + } + sql += fmt.Sprintf("`%s`", column.name) + } + sql += ")" + + arg := &ddlIndexJobArg{index: &index} + task := &ddlJobTask{ + k: ddlAddIndex, + sql: sql, + tblInfo: table, + arg: ddlJobArg(arg), + } + taskCh <- task + return nil +} + +func (c *testCase) addIndexJob(task *ddlJobTask) error { + jobArg := (*ddlIndexJobArg)(task.arg) + tblInfo := task.tblInfo + + if c.isTableDeleted(tblInfo) { + return fmt.Errorf("table %s is not exists", tblInfo.name) + } + + for _, column := range jobArg.index.columns { + if tblInfo.isColumnDeleted(column) { + return fmt.Errorf("local Execute add index %s on column %s error , column is deleted", jobArg.index.name, column.name) + } + } + tblInfo.indexes = append(tblInfo.indexes, jobArg.index) + for _, column := range jobArg.index.columns { + column.indexReferences++ + } + return nil +} + +func (c *testCase) generateDropIndex() error { + c.ddlOps = append(c.ddlOps, ddlTestOpExecutor{c.prepareDropIndex, nil, ddlDropIndex}) + return nil +} + +func (c *testCase) prepareDropIndex(_ interface{}, taskCh chan *ddlJobTask) error { + table := c.pickupRandomTable() + if table == nil { + return nil + } + if len(table.indexes) == 0 { + return nil + } + indexToDropIndex := rand.Intn(len(table.indexes)) + indexToDrop := table.indexes[indexToDropIndex] + sql := fmt.Sprintf("ALTER TABLE `%s` DROP INDEX `%s`", table.name, indexToDrop.name) + + arg := &ddlIndexJobArg{index: indexToDrop} + task := &ddlJobTask{ + k: ddlDropIndex, + sql: sql, + tblInfo: table, + arg: ddlJobArg(arg), + } + taskCh <- task + return nil +} + +func (c *testCase) dropIndexJob(task *ddlJobTask) error { + jobArg := (*ddlIndexJobArg)(task.arg) + tblInfo := task.tblInfo + + if c.isTableDeleted(tblInfo) { + return fmt.Errorf("table %s is not exists", tblInfo.name) + } + + iOfDropIndex := -1 + for i := range tblInfo.indexes { + if jobArg.index.name == tblInfo.indexes[i].name { + iOfDropIndex = i + break + } + } + if iOfDropIndex == -1 { + return fmt.Errorf("table %s , index %s is not exists", tblInfo.name, jobArg.index.name) + } + + for _, column := range jobArg.index.columns { + column.indexReferences-- + if column.indexReferences < 0 { + return fmt.Errorf("drop index, index.column %s Unexpected index reference", column.name) + } + } + tblInfo.indexes = append(tblInfo.indexes[:iOfDropIndex], tblInfo.indexes[iOfDropIndex+1:]...) + return nil +} + +type ddlTestAddDropColumnStrategy = int + +const ( + ddlTestAddDropColumnStrategyBegin ddlTestAddDropColumnStrategy = iota + ddlTestAddDropColumnStrategyAtBeginning + ddlTestAddDropColumnStrategyAtEnd + ddlTestAddDropColumnStrategyAtRandom + ddlTestAddDropColumnStrategyEnd +) + +type ddlTestAddDropColumnConfig struct { + strategy ddlTestAddDropColumnStrategy +} + +type ddlColumnJobArg struct { + column *ddlTestColumn + strategy ddlTestAddDropColumnStrategy + insertAfterColumn *ddlTestColumn +} + +func (c *testCase) generateAddColumn() error { + c.ddlOps = append(c.ddlOps, ddlTestOpExecutor{c.prepareAddColumn, nil, ddlAddColumn}) + return nil +} + +func (c *testCase) prepareAddColumn(_ interface{}, taskCh chan *ddlJobTask) error { + table := c.pickupRandomTable() + if table == nil { + return nil + } + strategy := rand.Intn(ddlTestAddDropColumnStrategyAtRandom) + ddlTestAddDropColumnStrategyAtBeginning + newColumn := getRandDDLTestColumn() + insertAfterPosition := -1 + // build SQL + sql := fmt.Sprintf("ALTER TABLE `%s` ADD COLUMN `%s` %s", table.name, newColumn.name, newColumn.getDefinition()) + switch strategy { + case ddlTestAddDropColumnStrategyAtBeginning: + sql += " FIRST" + case ddlTestAddDropColumnStrategyAtEnd: + // do nothing + case ddlTestAddDropColumnStrategyAtRandom: + insertAfterPosition = rand.Intn(len(table.columns)) + sql += fmt.Sprintf(" AFTER `%s`", table.columns[insertAfterPosition].name) + } + + arg := &ddlColumnJobArg{ + column: newColumn, + strategy: strategy, + } + if insertAfterPosition != -1 { + arg.insertAfterColumn = table.columns[insertAfterPosition] + } + task := &ddlJobTask{ + k: ddlAddColumn, + sql: sql, + tblInfo: table, + arg: ddlJobArg(arg), + } + taskCh <- task + return nil +} + +func (c *testCase) addColumnJob(task *ddlJobTask) error { + jobArg := (*ddlColumnJobArg)(task.arg) + table := task.tblInfo + table.lock.Lock() + defer table.lock.Unlock() + + if c.isTableDeleted(table) { + return fmt.Errorf("table %s is not exists", table.name) + } + newColumn := jobArg.column + strategy := jobArg.strategy + + newColumn.rows = make([]interface{}, table.numberOfRows) + for i := 0; i < table.numberOfRows; i++ { + newColumn.rows[i] = newColumn.defaultValue + } + + switch strategy { + case ddlTestAddDropColumnStrategyAtBeginning: + table.columns = append([]*ddlTestColumn{newColumn}, table.columns...) + case ddlTestAddDropColumnStrategyAtEnd: + table.columns = append(table.columns, newColumn) + case ddlTestAddDropColumnStrategyAtRandom: + insertAfterPosition := -1 + for i := range table.columns { + if jobArg.insertAfterColumn.name == table.columns[i].name { + insertAfterPosition = i + break + } + } + if insertAfterPosition == -1 { + return fmt.Errorf("table %s ,insert column %s after column, column %s is not exists ", table.name, newColumn.name, jobArg.insertAfterColumn.name) + } + table.columns = append(table.columns[:insertAfterPosition+1], append([]*ddlTestColumn{newColumn}, table.columns[insertAfterPosition+1:]...)...) + } + return nil +} + +func (c *testCase) generateDropColumn() error { + c.ddlOps = append(c.ddlOps, ddlTestOpExecutor{c.prepareDropColumn, nil, ddlDropColumn}) + return nil +} + +func (c *testCase) prepareDropColumn(_ interface{}, taskCh chan *ddlJobTask) error { + table := c.pickupRandomTable() + if table == nil { + return nil + } + + columnsSnapshot := table.filterColumns(table.predicateAll) + if len(columnsSnapshot) <= 1 { + return nil + } + + strategy := rand.Intn(ddlTestAddDropColumnStrategyAtRandom) + ddlTestAddDropColumnStrategyAtBeginning + columnToDropIndex := -1 + switch strategy { + case ddlTestAddDropColumnStrategyAtBeginning: + columnToDropIndex = 0 + case ddlTestAddDropColumnStrategyAtEnd: + columnToDropIndex = len(table.columns) - 1 + case ddlTestAddDropColumnStrategyAtRandom: + columnToDropIndex = rand.Intn(len(table.columns)) + } + + columnToDrop := table.columns[columnToDropIndex] + + // primary key columns cannot be dropped + if columnToDrop.isPrimaryKey { + return nil + } + + // column cannot be dropped if the column has generated column dependency + if columnToDrop.hasGenerateCol() { + return nil + } + + // we does not support dropping a column with index + if columnToDrop.indexReferences > 0 { + return nil + } + columnToDrop.setDeleted() + sql := fmt.Sprintf("ALTER TABLE `%s` DROP COLUMN `%s`", table.name, columnToDrop.name) + + arg := &ddlColumnJobArg{ + column: columnToDrop, + strategy: strategy, + insertAfterColumn: nil, + } + task := &ddlJobTask{ + k: ddlDropColumn, + sql: sql, + tblInfo: table, + arg: ddlJobArg(arg), + } + taskCh <- task + return nil +} + +func (c *testCase) dropColumnJob(task *ddlJobTask) error { + jobArg := (*ddlColumnJobArg)(task.arg) + table := task.tblInfo + table.lock.Lock() + defer table.lock.Unlock() + if c.isTableDeleted(table) { + return fmt.Errorf("table %s is not exists", table.name) + } + columnToDrop := jobArg.column + if columnToDrop.indexReferences > 0 { + columnToDrop.setDeletedRecover() + return fmt.Errorf("local Execute drop column %s on table %s error , column has index reference", jobArg.column.name, table.name) + } + dropColumnPosition := -1 + for i := range table.columns { + if columnToDrop.name == table.columns[i].name { + dropColumnPosition = i + break + } + } + if dropColumnPosition == -1 { + return fmt.Errorf("table %s ,drop column , column %s is not exists ", table.name, columnToDrop.name) + } + // update table definitions + table.columns = append(table.columns[:dropColumnPosition], table.columns[dropColumnPosition+1:]...) + // if the drop column is a generated column , we should update the dependency column + if columnToDrop.isGenerated() { + col := columnToDrop.dependency + i := 0 + for i = range col.dependenciedCols { + if col.dependenciedCols[i].name == columnToDrop.name { + break + } + } + col.dependenciedCols = append(col.dependenciedCols[:i], col.dependenciedCols[i+1:]...) + } + return nil +} + +// getHistoryDDLJobs send "admin show ddl jobs" to TiDB to get ddl jobs execute order. +// Use TABLE_NAME or TABLE_ID, and JOB_TYPE to confirm which ddl job is the DDL request we send to TiDB. +// We cannot send the same DDL type to same table more than once in a batch of parallel DDL request. The reason is below: +// For example, execute SQL1: "ALTER TABLE t1 DROP COLUMN c1" , SQL2:"ALTER TABLE t1 DROP COLUMN c2", and the "admin show ddl jobs" result is: +// +--------+---------+------------+--------------+--------------+-----------+----------+-----------+-----------------------------------+--------+ +// | JOB_ID | DB_NAME | TABLE_NAME | JOB_TYPE | SCHEMA_STATE | SCHEMA_ID | TABLE_ID | ROW_COUNT | START_TIME | STATE | +// +--------+---------+------------+--------------+--------------+-----------+----------+-----------+-----------------------------------+--------+ +// | 47 | test | t1 | drop column | none | 1 | 44 | 0 | 2018-07-13 13:13:55.57 +0800 CST | synced | +// | 46 | test | t1 | drop column | none | 1 | 44 | 0 | 2018-07-13 13:13:52.523 +0800 CST | synced | +// +--------+---------+------------+--------------+--------------+-----------+----------+-----------+-----------------------------------+--------+ +// We cannot confirm which DDL execute first. +func (c *testCase) getHistoryDDLJobs(db *sql.DB, tasks []*ddlJobTask) ([]*ddlJob, error) { + // build SQL + sql := "admin show ddl jobs" + // execute + opStart := time.Now() + rows, err := db.Query(sql) + glog.Infof("%s, elapsed time:%v", sql, time.Since(opStart).Seconds()) + if err != nil { + return nil, err + } + defer rows.Close() + + jobs := make([]*ddlJob, 0, len(tasks)) + // Read all rows. + var actualRows [][]string + for rows.Next() { + cols, err1 := rows.Columns() + if err1 != nil { + return nil, err1 + } + + rawResult := make([][]byte, len(cols)) + result := make([]string, len(cols)) + dest := make([]interface{}, len(cols)) + for i := range rawResult { + dest[i] = &rawResult[i] + } + + err1 = rows.Scan(dest...) + if err1 != nil { + return nil, err1 + } + + for i, raw := range rawResult { + if raw == nil { + result[i] = "NULL" + } else { + val := string(raw) + result[i] = val + } + } + actualRows = append(actualRows, result) + } + if rows.Err() != nil { + return nil, rows.Err() + } + /********************************* + +--------+---------+--------------------------------------+--------------+--------------+-----------+----------+-----------+-----------------------------------+-----------+ + | JOB_ID | DB_NAME | TABLE_NAME | JOB_TYPE | SCHEMA_STATE | SCHEMA_ID | TABLE_ID | ROW_COUNT | START_TIME | STATE | + +--------+---------+--------------------------------------+--------------+--------------+-----------+----------+-----------+-----------------------------------+-----------+ + | 49519 | test | | add column | none | 49481 | 49511 | 0 | 2018-07-09 21:29:02.249 +0800 CST | cancelled | + | 49518 | test | | drop table | none | 49481 | 49511 | 0 | 2018-07-09 21:29:01.999 +0800 CST | synced | + | 49517 | test | ea5be232-50ce-43b1-8d40-33de2ae08bca | create table | public | 49481 | 49515 | 0 | 2018-07-09 21:29:01.999 +0800 CST | synced | + +--------+---------+--------------------------------------+--------------+--------------+-----------+----------+-----------+-----------------------------------+-----------+ + *********************************/ + for _, row := range actualRows { + if len(row) < 9 { + return nil, fmt.Errorf("%s return error, no enough column return , return row: %s", sql, row) + } + id, err := strconv.Atoi(row[0]) + if err != nil { + return nil, err + } + if id <= c.lastDDLID { + continue + } + k, ok := mapOfDDLKind[row[3]] + if !ok { + continue + } + job := ddlJob{ + id: id, + tableName: row[2], + k: k, + tableID: row[6], // table id + jobState: row[9], + } + jobs = append(jobs, &job) + } + return jobs, nil +} + +// getSortTask return the tasks sort by ddl JOB_ID +func (c *testCase) getSortTask(db *sql.DB, tasks []*ddlJobTask) ([]*ddlJobTask, error) { + jobs, err := c.getHistoryDDLJobs(db, tasks) + if err != nil { + return nil, err + } + sortTasks := make([]*ddlJobTask, 0, len(tasks)) + for _, job := range jobs { + for _, task := range tasks { + if task.k == ddlAddTable && job.k == ddlAddTable && task.tblInfo.name == job.tableName { + task.ddlID = job.id + task.tblInfo.id = job.tableID + sortTasks = append(sortTasks, task) + break + } + if task.k != ddlAddTable && job.k == task.k && task.tblInfo.id == job.tableID { + task.ddlID = job.id + sortTasks = append(sortTasks, task) + break + } + } + if len(sortTasks) == len(tasks) { + break + } + } + + if len(sortTasks) != len(tasks) { + str := "admin show ddl jobs len != len(tasks)\n" + str += "admin get job\n" + str += fmt.Sprintf("%v\t%v\t%v\t%v\t%v\n", "Job_ID", "TABLE_NAME", "JOB_TYPE", "TABLE_ID", "JOB_STATE") + for _, job := range jobs { + str += fmt.Sprintf("%v\t%v\t%v\t%v\t%v\n", job.id, job.tableName, mapOfDDLKindToString[job.k], job.tableID, job.jobState) + } + str += "ddl tasks\n" + str += fmt.Sprintf("%v\t%v\t%v\t%v\n", "Job_ID", "TABLE_NAME", "JOB_TYPE", "TABLE_ID") + for _, task := range tasks { + str += fmt.Sprintf("%v\t%v\t%v\t%v\n", task.ddlID, task.tblInfo.name, mapOfDDLKindToString[task.k], task.tblInfo.id) + } + + str += "ddl sort tasks\n" + str += fmt.Sprintf("%v\t%v\t%v\t%v\n", "Job_ID", "TABLE_NAME", "JOB_TYPE", "TABLE_ID") + for _, task := range sortTasks { + str += fmt.Sprintf("%v\t%v\t%v\t%v\n", task.ddlID, task.tblInfo.name, mapOfDDLKindToString[task.k], task.tblInfo.id) + } + return nil, fmt.Errorf(str) + } + + sort.Sort(ddlJobTasks(sortTasks)) + if len(sortTasks) > 0 { + c.lastDDLID = sortTasks[len(sortTasks)-1].ddlID + } + return sortTasks, nil +} + +type ddlJobTasks []*ddlJobTask + +func (tasks ddlJobTasks) Swap(i, j int) { + tasks[i], tasks[j] = tasks[j], tasks[i] +} + +func (tasks ddlJobTasks) Len() int { + return len(tasks) +} + +func (tasks ddlJobTasks) Less(i, j int) bool { + return tasks[i].ddlID < tasks[j].ddlID +} diff --git a/tests/pkg/workload/ddl/internal/demo.go b/tests/pkg/workload/ddl/internal/demo.go new file mode 100644 index 0000000000..d41f4a3588 --- /dev/null +++ b/tests/pkg/workload/ddl/internal/demo.go @@ -0,0 +1,86 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. +package internal + +//import ( +// "database/sql" +// "sync" +//) +// +//func OpenDB(dsn string, maxIdleConns int) (*sql.DB, error) { panic("unimplemented") } +// +//func Run(dbName string) { +// dbss := make([][]*sql.DB, 0) +//} +// +//type DDLCaseConfig struct { +// MySQLCompatible bool +// Concurrency int +// TablesToCreate int +//} +// +//type DDLCase struct { +// cfg *DDLCaseConfig +// cases []*testCase +//} +// +//type ddlJobTask struct { +// ddlID int +// k DDLKind +// tblInfo *ddlTestTable +// sql string +// arg ddlJobArg +// err error // remote TiDB execute error +//} +// +//type ddlTestOpExecutor struct { +// executeFunc func(interface{}, chan *ddlJobTask) error +// config interface{} +// ddlKind DDLKind +//} +// +//type dmlTestOpExecutor struct { +// prepareFunc func(interface{}, chan *dmlJobTask) error +// config interface{} +//} +// +//type testCase struct { +// cfg *DDLCaseConfig +// dbs []*sql.DB +// caseIndex int +// ddlOps []ddlTestOpExecutor +// dmlOps []dmlTestOpExecutor +// tables map[string]*ddlTestTable +// tablesLock sync.RWMutex +// stop int32 +// lastDDLID int +//} +// +//func NewDDLCase(cfg *DDLCaseConfig) *DDLCase { +// cases := make([]*testCase, cfg.Concurrency) +// for i := 0; i < cfg.Concurrency; i++ { +// cases[i] = &testCase{ +// cfg: cfg, +// tables: make(map[string]*ddlTestTable), +// ddlOps: make([]ddlTestOpExecutor, 0), +// dmlOps: make([]dmlTestOpExecutor, 0), +// caseIndex: i, +// stop: 0, +// } +// } +// b := &DDLCase{ +// cfg: cfg, +// cases: cases, +// } +// return b +//} diff --git a/tests/pkg/workload/ddl/internal/dml_ops.go b/tests/pkg/workload/ddl/internal/dml_ops.go new file mode 100644 index 0000000000..dfa3488018 --- /dev/null +++ b/tests/pkg/workload/ddl/internal/dml_ops.go @@ -0,0 +1,620 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. +package internal + +import ( + "context" + "database/sql" + "fmt" + "math/rand" + + "github.com/golang/glog" + "github.com/juju/errors" +) + +func (c *testCase) generateDMLOps() error { + if err := c.generateInsert(); err != nil { + return errors.Trace(err) + } + if err := c.generateUpdate(); err != nil { + return errors.Trace(err) + } + if err := c.generateDelete(); err != nil { + return errors.Trace(err) + } + return nil +} + +type ddlTestInsertColumnStrategy int +type ddlTestInsertMissingValueStrategy int + +const ( + ddlTestInsertColumnStrategyBegin ddlTestInsertColumnStrategy = iota + ddlTestInsertColumnStrategyZeroNonPk + ddlTestInsertColumnStrategyAllNonPk + ddlTestInsertColumnStrategyRandomNonPk + ddlTestInsertColumnStrategyEnd +) + +const ( + ddlTestInsertMissingValueStrategyBegin ddlTestInsertMissingValueStrategy = iota + ddlTestInsertMissingValueStrategyAllNull + ddlTestInsertMissingValueStrategyAllDefault + ddlTestInsertMissingValueStrategyRandom + ddlTestInsertMissingValueStrategyEnd +) + +type ddlTestInsertConfig struct { + useSetStatement bool // whether to use SET or VALUE statement + columnStrategy ddlTestInsertColumnStrategy // how non-Primary-Key columns are picked + missingValueStrategy ddlTestInsertMissingValueStrategy // how columns are filled when they are not picked in VALUE statement +} + +func checkConflict(task *dmlJobTask) error { + if task.tblInfo.isDeleted() { + return ddlTestErrorConflict{} + } + if task.assigns != nil { + for _, cd := range task.assigns { + if cd.column.isDeleted() { + return ddlTestErrorConflict{} + } + } + } + if task.whereColumns != nil { + for _, cd := range task.whereColumns { + if cd.column.isDeleted() { + return ddlTestErrorConflict{} + } + } + } + return nil +} + +func (c *testCase) sendDMLRequest(ctx context.Context, conn *sql.Conn, task *dmlJobTask) error { + _, err := conn.ExecContext(ctx, task.sql) + task.err = err + glog.Infof("[ddl] [instance %d] %s, err: %v", c.caseIndex, task.sql, err) + if err != nil { + err2 := checkConflict(task) + if err2 != nil { + return nil + } + return errors.Annotatef(err, "Error when executing SQL: %s\n%s", task.sql, task.tblInfo.debugPrintToString()) + } + return nil +} + +func (c *testCase) execDMLInLocal(task *dmlJobTask) error { + switch task.k { + case dmlInsert: + return c.doInsertJob(task) + case dmlUpdate: + return c.doUpdateJob(task) + case dmlDelete: + return c.doDeleteJob(task) + } + return fmt.Errorf("unknow dml task , %v", *task) +} + +// execSerialDDLSQL gets a job from taskCh, and then executes the job. +func (c *testCase) execSerialDMLSQL(taskCh chan *dmlJobTask) error { + ctx := context.Background() + dbIdx := rand.Intn(len(c.dbs)) + db := c.dbs[dbIdx] + conn, err := db.Conn(ctx) + if err != nil { + return nil + } + defer conn.Close() + for task := range taskCh { + err := c.sendDMLRequest(ctx, conn, task) + if err != nil { + if ignore_error(err) { + continue + } + return errors.Trace(err) + } + if task.err != nil { + continue + } + err = c.execDMLInLocal(task) + if err != nil { + return fmt.Errorf("Error when executing SQL: %s\n local Err: %#v\n%s\n", task.sql, err, task.tblInfo.debugPrintToString()) + } + } + return nil +} + +// execSerialDDLSQL gets a job from taskCh, and then executes the job. +func (c *testCase) execDMLInTransactionSQL(taskCh chan *dmlJobTask) error { + tasksLen := len(taskCh) + + ctx := context.Background() + conn, err := c.dbs[1].Conn(ctx) + if err != nil { + return nil + } + defer conn.Close() + + _, err = conn.ExecContext(ctx, "begin") + glog.Infof("[ddl] [instance %d] begin error: %v", c.caseIndex, err) + if err != nil { + return errors.Annotatef(err, "Error when executing SQL: %s", "begin") + } + + tasks := make([]*dmlJobTask, 0, tasksLen) + for i := 0; i < tasksLen; i++ { + task := <-taskCh + err = c.sendDMLRequest(ctx, conn, task) + tasks = append(tasks, task) + } + + _, err = conn.ExecContext(ctx, "commit") + glog.Infof("[ddl] [instance %d] commit error: %v", c.caseIndex, err) + if err != nil { + if ignore_error(err) { + return nil + } + for i := 0; i < tasksLen; i++ { + task := tasks[i] + // no conflict when send request but conflict when commit + if task.err == nil && checkConflict(task) != nil { + return nil + } + } + return errors.Annotatef(err, "Error when executing SQL: %s", "commit") + } + + for i := 0; i < tasksLen; i++ { + task := tasks[i] + if task.err != nil { + continue + } + err = c.execDMLInLocal(task) + if err != nil { + return fmt.Errorf("Error when executing SQL: %s\n local Err: %#v\n%s\n", task.sql, err, task.tblInfo.debugPrintToString()) + } + } + return nil +} + +func (c *testCase) generateInsert() error { + for i := 0; i < 5; i++ { + for columnStrategy := ddlTestInsertColumnStrategyBegin + 1; columnStrategy < ddlTestInsertColumnStrategyEnd; columnStrategy++ { + // Note: `useSetStatement` is commented out since `... VALUES ...` SQL will generates column conflicts with add / drop column. + // We always use `... SET ...` syntax currently. + + // for useSetStatement := 0; useSetStatement < 2; useSetStatement++ { + config := ddlTestInsertConfig{ + useSetStatement: true, // !(useSetStatement == 0), + columnStrategy: columnStrategy, + } + // if config.useSetStatement { + c.dmlOps = append(c.dmlOps, dmlTestOpExecutor{c.prepareInsert, config}) + // } else { + // for missingValueStrategy := ddlTestInsertMissingValueStrategyBegin + 1; missingValueStrategy < ddlTestInsertMissingValueStrategyEnd; missingValueStrategy++ { + // config.missingValueStrategy = missingValueStrategy + // c.dmlOps = append(c.dmlOps, ddlTestOpExecutor{c.executeInsert, config}) + // } + // } + // } + } + } + return nil +} + +func (c *testCase) prepareInsert(cfg interface{}, taskCh chan *dmlJobTask) error { + c.tablesLock.RLock() + table := c.pickupRandomTable() + if table == nil { + c.tablesLock.RUnlock() + return nil + } + table.lock.RLock() + columns := table.filterColumns(table.predicateNotGenerated) + nonPkColumns := table.filterColumns(table.predicateNonPrimaryKeyAndNotGen) + table.lock.RUnlock() + c.tablesLock.RUnlock() + + config := cfg.(ddlTestInsertConfig) + + // build assignments + assigns := make([]*ddlTestColumnDescriptor, 0) + for _, column := range columns { + pick := false + if column.isPrimaryKey { + // PrimaryKey Column is always assigned values + pick = true + } else { + // NonPrimaryKey Column is assigned by strategy + switch config.columnStrategy { + case ddlTestInsertColumnStrategyAllNonPk: + pick = true + case ddlTestInsertColumnStrategyZeroNonPk: + pick = false + case ddlTestInsertColumnStrategyRandomNonPk: + if rand.Float64() <= float64(1)/float64(len(nonPkColumns)) { + pick = true + } + } + } + if pick { + // check unique value when inserting into a column of primary key + if column.isPrimaryKey { + if newValue, ok := column.randValueUnique(column.rows); ok { + assigns = append(assigns, &ddlTestColumnDescriptor{column, newValue}) + } else { + return nil + } + } else { + assigns = append(assigns, &ddlTestColumnDescriptor{column, column.randValue()}) + } + } + } + + // build SQL + sql := "" + if config.useSetStatement { + if len(assigns) == 0 { + return nil + } + sql = fmt.Sprintf("INSERT INTO `%s` SET ", table.name) + perm := rand.Perm(len(assigns)) + for i, idx := range perm { + assign := assigns[idx] + if i > 0 { + sql += ", " + } + sql += fmt.Sprintf("`%s` = %v", assign.column.name, assign.getValueString()) + } + } else { + sql = fmt.Sprintf("INSERT INTO `%s` VALUE (", table.name) + for colIdx, column := range columns { + if colIdx > 0 { + sql += ", " + } + cd := column.getMatchedColumnDescriptor(assigns) + if cd != nil { + sql += fmt.Sprintf("%v", cd.getValueString()) + } else { + var missingValueSQL string + switch config.missingValueStrategy { + case ddlTestInsertMissingValueStrategyAllDefault: + missingValueSQL = "DEFAULT" + case ddlTestInsertMissingValueStrategyAllNull: + missingValueSQL = "NULL" + case ddlTestInsertMissingValueStrategyRandom: + if rand.Float64() <= 0.5 { + missingValueSQL = "DEFAULT" + } else { + missingValueSQL = "NULL" + } + } + sql += missingValueSQL + var missingValue interface{} + if missingValueSQL == "DEFAULT" { + missingValue = column.defaultValue + } else if missingValueSQL == "NULL" { + missingValue = ddlTestValueNull + } else { + panic("invalid missing value") + } + // add column to ref list + assigns = append(assigns, &ddlTestColumnDescriptor{column, missingValue}) + } + } + sql += ")" + } + + task := &dmlJobTask{ + k: dmlInsert, + sql: sql, + tblInfo: table, + assigns: assigns, + } + taskCh <- task + return nil +} + +func (c *testCase) doInsertJob(task *dmlJobTask) error { + table := task.tblInfo + assigns := task.assigns + + // append row + table.lock.Lock() + for _, column := range table.columns { + cd := column.getMatchedColumnDescriptor(assigns) + if cd == nil { + if column.isGenerated() { + cd = column.dependency.getMatchedColumnDescriptor(assigns) + if cd == nil { + column.rows = append(column.rows, nil) + } else { + column.rows = append(column.rows, cd.column.getDependenciedColsValue(column)) + } + } else { + // only happens when using SET + column.rows = append(column.rows, column.defaultValue) + } + } else { + column.rows = append(column.rows, cd.value) + } + } + table.numberOfRows++ + table.lock.Unlock() + return nil +} + +type ddlTestWhereStrategy int + +const ( + ddlTestWhereStrategyBegin ddlTestWhereStrategy = iota + ddlTestWhereStrategyNone + ddlTestWhereStrategyRandomInPk + ddlTestWhereStrategyRandomInNonPk + ddlTestWhereStrategyRandomMixed + ddlTestWhereStrategyEnd +) + +type ddlTestUpdateTargetStrategy int + +const ( + ddlTestUpdateTargetStrategyBegin ddlTestUpdateTargetStrategy = iota + ddlTestUpdateTargetStrategyAllColumns + ddlTestUpdateTargetStrategyRandom + ddlTestUpdateTargetStrategyEnd +) + +type ddlTestUpdateConfig struct { + whereStrategy ddlTestWhereStrategy // how "where" statement is generated + targetStrategy ddlTestUpdateTargetStrategy // which column to update +} + +func (c *testCase) generateUpdate() error { + for i := 0; i < 5; i++ { + for whereStrategy := ddlTestWhereStrategyBegin + 1; whereStrategy < ddlTestWhereStrategyEnd; whereStrategy++ { + for targetStrategy := ddlTestUpdateTargetStrategyBegin + 1; targetStrategy < ddlTestUpdateTargetStrategyEnd; targetStrategy++ { + config := ddlTestUpdateConfig{ + whereStrategy: whereStrategy, + targetStrategy: targetStrategy, + } + c.dmlOps = append(c.dmlOps, dmlTestOpExecutor{c.prepareUpdate, config}) + } + } + } + return nil +} + +func (c *testCase) buildWhereColumns(whereStrategy ddlTestWhereStrategy, pkColumns, nonPkColumns []*ddlTestColumn, numberOfRows int) []*ddlTestColumnDescriptor { + // build where conditions + whereColumns := make([]*ddlTestColumnDescriptor, 0) + if whereStrategy == ddlTestWhereStrategyRandomInPk || whereStrategy == ddlTestWhereStrategyRandomMixed { + if len(pkColumns) > 0 { + picks := rand.Intn(len(pkColumns)) + perm := rand.Perm(picks) + for _, idx := range perm { + // value will be filled later + whereColumns = append(whereColumns, &ddlTestColumnDescriptor{pkColumns[idx], -1}) + } + } + } + if whereStrategy == ddlTestWhereStrategyRandomInNonPk || whereStrategy == ddlTestWhereStrategyRandomMixed { + if len(nonPkColumns) > 0 { + picks := rand.Intn(len(nonPkColumns)) + perm := rand.Perm(picks) + for _, idx := range perm { + // value will be filled later + whereColumns = append(whereColumns, &ddlTestColumnDescriptor{nonPkColumns[idx], -1}) + } + } + } + + // fill values of where statements + if len(whereColumns) > 0 { + rowToUpdate := rand.Intn(numberOfRows) + for _, cd := range whereColumns { + cd.value = cd.column.rows[rowToUpdate] + } + } + + return whereColumns +} + +func (c *testCase) prepareUpdate(cfg interface{}, taskCh chan *dmlJobTask) error { + c.tablesLock.RLock() + table := c.pickupRandomTable() + if table == nil { + c.tablesLock.RUnlock() + return nil + } + table.lock.RLock() + pkColumns := table.filterColumns(table.predicatePrimaryKey) + nonPkColumnsAndCanBeWhere := table.filterColumns(table.predicateNonPrimaryKeyAndCanBeWhere) + nonPkColumnsAndNotGen := table.filterColumns(table.predicateNonPrimaryKeyAndNotGen) + table.lock.RUnlock() + c.tablesLock.RUnlock() + + if table.numberOfRows == 0 { + return nil + } + + config := cfg.(ddlTestUpdateConfig) + + // build where conditions + whereColumns := c.buildWhereColumns(config.whereStrategy, pkColumns, nonPkColumnsAndCanBeWhere, table.numberOfRows) + + // build assignments + assigns := make([]*ddlTestColumnDescriptor, 0) + picks := 0 + switch config.targetStrategy { + case ddlTestUpdateTargetStrategyRandom: + if len(nonPkColumnsAndNotGen) > 0 { + picks = rand.Intn(len(nonPkColumnsAndNotGen)) + } + case ddlTestUpdateTargetStrategyAllColumns: + picks = len(nonPkColumnsAndNotGen) + } + if picks == 0 { + return nil + } + perm := rand.Perm(picks) + for _, idx := range perm { + assigns = append(assigns, &ddlTestColumnDescriptor{nonPkColumnsAndNotGen[idx], nonPkColumnsAndNotGen[idx].randValue()}) + } + + // build SQL + sql := fmt.Sprintf("UPDATE `%s` SET ", table.name) + for i, cd := range assigns { + if i > 0 { + sql += ", " + } + sql += fmt.Sprintf("`%s` = %v", cd.column.name, cd.getValueString()) + } + if len(whereColumns) > 0 { + sql += " WHERE " + for i, cd := range whereColumns { + if i > 0 { + sql += " AND " + } + sql += cd.buildConditionSQL() + } + } + + task := &dmlJobTask{ + k: dmlUpdate, + tblInfo: table, + sql: sql, + assigns: assigns, + whereColumns: whereColumns, + } + + taskCh <- task + return nil +} + +func (c *testCase) doUpdateJob(task *dmlJobTask) error { + table := task.tblInfo + assigns := task.assigns + whereColumns := task.whereColumns + + // update values + table.lock.RLock() + for i := 0; i < table.numberOfRows; i++ { + match := true + for _, cd := range whereColumns { + if cd.value != cd.column.rows[i] { + match = false + break + } + } + if match { + for _, cd := range assigns { + cd.column.rows[i] = cd.value + if cd.column.hasGenerateCol() { + for _, col := range cd.column.dependenciedCols { + col.rows[i] = cd.column.getDependenciedColsValue(col) + } + } + } + } + } + table.lock.RUnlock() + return nil + +} + +type ddlTestDeleteConfig struct { + whereStrategy ddlTestWhereStrategy // how "where" statement is generated +} + +func (c *testCase) generateDelete() error { + for i := 0; i < 5; i++ { + for whereStrategy := ddlTestWhereStrategyBegin + 1; whereStrategy < ddlTestWhereStrategyEnd; whereStrategy++ { + config := ddlTestDeleteConfig{ + whereStrategy: whereStrategy, + } + c.dmlOps = append(c.dmlOps, dmlTestOpExecutor{c.prepareDelete, config}) + } + } + return nil +} + +func (c *testCase) prepareDelete(cfg interface{}, taskCh chan *dmlJobTask) error { + c.tablesLock.RLock() + table := c.pickupRandomTable() + if table == nil { + c.tablesLock.RUnlock() + return nil + } + table.lock.RLock() + pkColumns := table.filterColumns(table.predicatePrimaryKey) + nonPkColumnsAndCanBeWhere := table.filterColumns(table.predicateNonPrimaryKeyAndCanBeWhere) + table.lock.RUnlock() + c.tablesLock.RUnlock() + + if table.numberOfRows == 0 { + return nil + } + + config := cfg.(ddlTestDeleteConfig) + whereColumns := c.buildWhereColumns(config.whereStrategy, pkColumns, nonPkColumnsAndCanBeWhere, table.numberOfRows) + + // build SQL + sql := fmt.Sprintf("DELETE FROM `%s`", table.name) + if len(whereColumns) > 0 { + sql += " WHERE " + for i, cd := range whereColumns { + if i > 0 { + sql += " AND " + } + sql += cd.buildConditionSQL() + } + } + + task := &dmlJobTask{ + k: dmlDelete, + tblInfo: table, + sql: sql, + whereColumns: whereColumns, + } + taskCh <- task + return nil +} + +func (c *testCase) doDeleteJob(task *dmlJobTask) error { + table := task.tblInfo + whereColumns := task.whereColumns + + // update values + table.lock.Lock() + for i := table.numberOfRows - 1; i >= 0; i-- { + match := true + for _, cd := range whereColumns { + if cd.value != cd.column.rows[i] { + match = false + break + } + } + if match { + // we must use `table.columns` here, since there might be new columns after deletion + for _, column := range table.columns { + column.rows = append(column.rows[:i], column.rows[i+1:]...) + } + table.numberOfRows-- + } + } + table.lock.Unlock() + + return nil +} diff --git a/tests/pkg/workload/ddl/internal/meta.go b/tests/pkg/workload/ddl/internal/meta.go new file mode 100644 index 0000000000..9a95d02e37 --- /dev/null +++ b/tests/pkg/workload/ddl/internal/meta.go @@ -0,0 +1,636 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. +package internal + +import ( + "bytes" + "database/sql" + "encoding/json" + "fmt" + "math/rand" + "sort" + "strings" + "sync" + "sync/atomic" + "time" + + "k8s.io/apimachinery/pkg/util/uuid" +) + +type testCase struct { + cfg *DDLCaseConfig + dbs []*sql.DB + caseIndex int + ddlOps []ddlTestOpExecutor + dmlOps []dmlTestOpExecutor + tables map[string]*ddlTestTable + tablesLock sync.RWMutex + stop int32 + lastDDLID int +} + +type ddlTestErrorConflict struct { +} + +func (err ddlTestErrorConflict) Error() string { + return "Conflict operation" +} + +func (c *testCase) stopTest() { + atomic.StoreInt32(&c.stop, 1) +} + +func (c *testCase) isStop() bool { + return atomic.LoadInt32(&c.stop) == 1 +} + +// pickupRandomTables picks a table randomly. The callee should ensure that +// during this function call the table list is not modified. +// +// Normally the DML op callee should acquire a lock before calling this function +// because the table list may be modified by another parallel DDL op. However +// the DDL op callee doesn't need to acquire a lock because no one will modify the +// table list in parallel ---- DDL ops are executed one by one. +func (c *testCase) pickupRandomTable() *ddlTestTable { + tableNames := make([]string, 0) + for name, table := range c.tables { + if table.isDeleted() { + continue + } + tableNames = append(tableNames, name) + } + if len(tableNames) == 0 { + return nil + } + name := tableNames[rand.Intn(len(tableNames))] + return c.tables[name] +} + +func (c *testCase) isTableDeleted(table *ddlTestTable) bool { + if _, ok := c.tables[table.name]; ok { + return false + } + return true +} + +type ddlTestTable struct { + deleted int32 + name string + id string // table_id , get from admin show ddl jobs + columns []*ddlTestColumn + indexes []*ddlTestIndex + numberOfRows int + lock sync.RWMutex +} + +func (table *ddlTestTable) isDeleted() bool { + return atomic.LoadInt32(&table.deleted) != 0 +} + +func (table *ddlTestTable) setDeleted() { + atomic.StoreInt32(&table.deleted, 1) +} + +func (table *ddlTestTable) filterColumns(predicate func(*ddlTestColumn) bool) []*ddlTestColumn { + retColumns := make([]*ddlTestColumn, 0) + for index, col := range table.columns { + if predicate(col) && !col.isDeleted() { + retColumns = append(retColumns, table.columns[index]) + } + } + return retColumns +} + +func (table *ddlTestTable) predicateAll(col *ddlTestColumn) bool { + return true +} + +func (table *ddlTestTable) predicateNotGenerated(col *ddlTestColumn) bool { + return col.notGenerated() +} + +func (table *ddlTestTable) predicatePrimaryKey(col *ddlTestColumn) bool { + return col.isPrimaryKey +} + +func (table *ddlTestTable) predicateNonPrimaryKey(col *ddlTestColumn) bool { + return !col.isPrimaryKey +} + +func (table *ddlTestTable) predicateNonPrimaryKeyAndCanBeWhere(col *ddlTestColumn) bool { + return !col.isPrimaryKey && col.canBeWhere() +} + +func (table *ddlTestTable) predicateNonPrimaryKeyAndNotGen(col *ddlTestColumn) bool { + return !col.isPrimaryKey && col.notGenerated() +} + +// isColumnDeleted checks the col is deleted in this table +// col.isDeleted() will be true before when dropColumnJob(), +// but the col is really deleted after remote TiDB successful execute drop column ddl, and then, the col will be deleted from table.columns. +func (table *ddlTestTable) isColumnDeleted(col *ddlTestColumn) bool { + for i := range table.columns { + if col.name == table.columns[i].name { + return false + } + } + return true +} + +func (table *ddlTestTable) debugPrintToString() string { + var buffer bytes.Buffer + table.lock.RLock() + buffer.WriteString(fmt.Sprintf("======== DEBUG BEGIN ========\n")) + buffer.WriteString(fmt.Sprintf("Dumping expected contents for table `%s`:\n", table.name)) + if table.isDeleted() { + buffer.WriteString("[WARN] This table is marked as DELETED.\n") + } + buffer.WriteString("## Non-Primary Indexes: \n") + for i, index := range table.indexes { + buffer.WriteString(fmt.Sprintf("Index #%d: Name = `%s`, Columnns = [", i, index.name)) + for _, column := range index.columns { + buffer.WriteString(fmt.Sprintf("`%s`, ", column.name)) + } + buffer.WriteString("]\n") + } + buffer.WriteString("## Columns: \n") + for i, column := range table.columns { + buffer.WriteString(fmt.Sprintf("Column #%d", i)) + if column.isDeleted() { + buffer.WriteString(" [DELETED]") + } + buffer.WriteString(fmt.Sprintf(": Name = `%s`, Definition = %s, isPrimaryKey = %v, used in %d indexes\n", + column.name, column.getDefinition(), column.isPrimaryKey, column.indexReferences)) + } + buffer.WriteString(fmt.Sprintf("## Values (number of rows = %d): \n", table.numberOfRows)) + for i := 0; i < table.numberOfRows; i++ { + buffer.WriteString("#") + buffer.WriteString(padRight(fmt.Sprintf("%d", i), " ", 4)) + buffer.WriteString(": ") + for _, col := range table.columns { + buffer.WriteString(padLeft(fmt.Sprintf("%v", col.rows[i]), " ", 11)) + buffer.WriteString(", ") + } + buffer.WriteString("\n") + } + buffer.WriteString("======== DEBUG END ========\n") + table.lock.RUnlock() + return buffer.String() +} + +type ddlTestColumnDescriptor struct { + column *ddlTestColumn + value interface{} +} + +func (ddlt *ddlTestColumnDescriptor) getValueString() string { + // make bit data visible + if ddlt.column.k == KindBit { + return fmt.Sprintf("b'%v'", ddlt.value) + } else { + return fmt.Sprintf("'%v'", ddlt.value) + } +} + +func (ddlt *ddlTestColumnDescriptor) buildConditionSQL() string { + var sql string + if ddlt.value == ddlTestValueNull || ddlt.value == nil { + sql += fmt.Sprintf("`%s` IS NULL", ddlt.column.name) + } else { + switch ddlt.column.k { + case KindFloat: + sql += fmt.Sprintf("abs(`%s` - %v) < 0.0000001", ddlt.column.name, ddlt.getValueString()) + case KindDouble: + sql += fmt.Sprintf("abs(`%s` - %v) < 0.0000000000000001", ddlt.column.name, ddlt.getValueString()) + default: + sql += fmt.Sprintf("`%s` = %v", ddlt.column.name, ddlt.getValueString()) + } + } + return sql +} + +type ddlTestColumn struct { + k int + deleted int32 + name string + fieldType string + + filedTypeM int //such as: VARCHAR(10) , filedTypeM = 10 + filedTypeD int //such as: DECIMAL(10,5) , filedTypeD = 5 + filedPrecision int + defaultValue interface{} + isPrimaryKey bool + rows []interface{} + indexReferences int + + dependenciedCols []*ddlTestColumn + dependency *ddlTestColumn + mValue map[string]interface{} + nameOfGen string + + setValue []string //for enum , set data type +} + +func (col *ddlTestColumn) isDeleted() bool { + return atomic.LoadInt32(&col.deleted) != 0 +} + +func (col *ddlTestColumn) setDeleted() { + atomic.StoreInt32(&col.deleted, 1) +} + +func (col *ddlTestColumn) setDeletedRecover() { + atomic.StoreInt32(&col.deleted, 0) +} + +func (col *ddlTestColumn) getMatchedColumnDescriptor(descriptors []*ddlTestColumnDescriptor) *ddlTestColumnDescriptor { + for _, d := range descriptors { + if d.column == col { + return d + } + } + return nil +} + +func (col *ddlTestColumn) getDefinition() string { + if col.isPrimaryKey { + return col.fieldType + } + + if col.isGenerated() { + return fmt.Sprintf("%s AS (JSON_EXTRACT(`%s`,'$.%s'))", col.fieldType, col.dependency.name, col.nameOfGen) + } + + if col.canHaveDefaultValue() { + return fmt.Sprintf("%s NULL DEFAULT %v", col.fieldType, col.getDefaultValueString()) + } else { + return fmt.Sprintf("%s NULL", col.fieldType) + } + +} + +func (col *ddlTestColumn) getSelectName() string { + if col.k == KindBit { + return fmt.Sprintf("bin(`%s`)", col.name) + } else { + return fmt.Sprintf("`%s`", col.name) + } +} + +func (col *ddlTestColumn) getDefaultValueString() string { + if col.k == KindBit { + return fmt.Sprintf("b'%v'", col.defaultValue) + } else { + return fmt.Sprintf("'%v'", col.defaultValue) + } +} + +func (col *ddlTestColumn) isEqual(r int, str string) bool { + vstr := fmt.Sprintf("%v", col.rows[r]) + return strings.Compare(vstr, str) == 0 +} + +func (col *ddlTestColumn) getDependenciedColsValue(genCol *ddlTestColumn) interface{} { + if col.mValue == nil { + return nil + } + v := col.mValue[genCol.nameOfGen] + switch genCol.k { + case KindChar, KindVarChar, KindTEXT, KindBLOB: + v = fmt.Sprintf("\"%v\"", v) + } + return v +} + +func getDDLTestColumn(n int) *ddlTestColumn { + column := &ddlTestColumn{ + k: n, + name: string(uuid.NewUUID()), + fieldType: ALLFieldType[n], + rows: make([]interface{}, 0), + deleted: 0, + } + switch n { + case KindChar, KindVarChar, KindBLOB, KindTEXT, KindBit: + maxLen := getMaxLenByKind(n) + column.filedTypeM = int(rand.Intn(maxLen)) + for column.filedTypeM == 0 && column.k == KindBit { + column.filedTypeM = int(rand.Intn(maxLen)) + } + + for column.filedTypeM < 3 && column.k != KindBit { // len('""') = 2 + column.filedTypeM = int(rand.Intn(maxLen)) + } + column.fieldType = fmt.Sprintf("%s(%d)", ALLFieldType[n], column.filedTypeM) + case KindTINYBLOB, KindMEDIUMBLOB, KindLONGBLOB, KindTINYTEXT, KindMEDIUMTEXT, KindLONGTEXT: + column.filedTypeM = getMaxLenByKind(n) + case KindDECIMAL: + column.filedTypeM, column.filedTypeD = randMD() + column.fieldType = fmt.Sprintf("%s(%d,%d)", ALLFieldType[n], column.filedTypeM, column.filedTypeD) + case KindEnum, KindSet: + maxLen := getMaxLenByKind(n) + l := maxLen + 1 + column.setValue = make([]string, l) + m := make(map[string]struct{}) + column.fieldType += "(" + for i := 0; i < l; i++ { + column.setValue[i] = randEnumString(m) + if i > 0 { + column.fieldType += ", " + } + column.fieldType += fmt.Sprintf("\"%s\"", column.setValue[i]) + } + column.fieldType += ")" + } + + if column.canHaveDefaultValue() { + column.defaultValue = column.randValue() + } + + return column +} + +func getRandDDLTestColumn() *ddlTestColumn { + var n int + for { + n = randDataType() + if n != KindJSON { + break + } + } + return getDDLTestColumn(n) +} + +func getRandDDLTestColumnForJson() *ddlTestColumn { + var n int + for { + n = randDataType() + if n != KindJSON && n != KindBit && n != KindSet && n != KindEnum { + break + } + } + return getDDLTestColumn(n) +} + +func getRandDDLTestColumns() []*ddlTestColumn { + n := randDataType() + cols := make([]*ddlTestColumn, 0) + + if n == KindJSON { + cols = getRandJsonCol() + } else { + column := getDDLTestColumn(n) + cols = append(cols, column) + } + return cols +} + +const JsonFieldNum = 5 + +func getRandJsonCol() []*ddlTestColumn { + fieldNum := rand.Intn(JsonFieldNum) + 1 + + cols := make([]*ddlTestColumn, 0, fieldNum+1) + + column := &ddlTestColumn{ + k: KindJSON, + name: string(uuid.NewUUID()), + fieldType: ALLFieldType[KindJSON], + rows: make([]interface{}, 0), + deleted: 0, + + dependenciedCols: make([]*ddlTestColumn, 0, fieldNum), + } + + m := make(map[string]interface{}, 0) + for i := 0; i < fieldNum; i++ { + col := getRandDDLTestColumnForJson() + col.nameOfGen = randFieldName(m) + m[col.nameOfGen] = col.randValue() + col.dependency = column + + column.dependenciedCols = append(column.dependenciedCols, col) + cols = append(cols, col) + } + column.mValue = m + cols = append(cols, column) + return cols +} + +func (col *ddlTestColumn) isGenerated() bool { + return col.dependency != nil +} + +func (col *ddlTestColumn) notGenerated() bool { + return col.dependency == nil +} + +func (col *ddlTestColumn) hasGenerateCol() bool { + return len(col.dependenciedCols) > 0 +} + +// randValue return a rand value of the column +func (col *ddlTestColumn) randValue() interface{} { + switch col.k { + case KindTINYINT: + return rand.Int31n(1<<8) - 1<<7 + case KindSMALLINT: + return rand.Int31n(1<<16) - 1<<15 + case KindMEDIUMINT: + return rand.Int31n(1<<24) - 1<<23 + case KindInt32: + return rand.Int63n(1<<32) - 1<<31 + case KindBigInt: + if rand.Intn(2) == 1 { + return rand.Int63() + } + return -1 - rand.Int63() + case KindBit: + if col.filedTypeM >= 64 { + return fmt.Sprintf("%b", rand.Uint64()) + } else { + m := col.filedTypeM + if col.filedTypeM > 7 { // it is a bug + m = m - 1 + } + n := (int64)((1 << (uint)(m)) - 1) + return fmt.Sprintf("%b", rand.Int63n(n)) + } + case KindFloat: + return rand.Float32() + 1 + case KindDouble: + return rand.Float64() + 1 + case KindDECIMAL: + return randDecimal(col.filedTypeM, col.filedTypeD) + case KindChar, KindVarChar, KindBLOB, KindTINYBLOB, KindMEDIUMBLOB, KindLONGBLOB, KindTEXT, KindTINYTEXT, KindMEDIUMTEXT, KindLONGTEXT: + if col.filedTypeM == 0 { + return "" + } else { + if col.isGenerated() { + if col.filedTypeM <= 2 { + return "" + } + return randSeq(rand.Intn(col.filedTypeM - 2)) + } + return randSeq(rand.Intn(col.filedTypeM)) + } + case KindBool: + return rand.Intn(2) + case KindDATE: + randTime := time.Unix(MinDATETIME.Unix()+rand.Int63n(GapDATETIMEUnix), 0) + return randTime.Format(TimeFormatForDATE) + case KindTIME: + randTime := time.Unix(MinTIMESTAMP.Unix()+rand.Int63n(GapTIMESTAMPUnix), 0) + return randTime.Format(TimeFormatForTIME) + case KindDATETIME: + randTime := randTime(MinDATETIME, GapDATETIMEUnix) + return randTime.Format(TimeFormat) + case KindTIMESTAMP: + randTime := randTime(MinTIMESTAMP, GapTIMESTAMPUnix) + return randTime.Format(TimeFormat) + case KindYEAR: + return rand.Intn(254) + 1901 //1901 ~ 2155 + case KindJSON: + return col.randJsonValue() + case KindEnum: + i := rand.Intn(len(col.setValue)) + return col.setValue[i] + case KindSet: + var l int + for l == 0 { + l = rand.Intn(len(col.setValue)) + } + idxs := make([]int, l) + m := make(map[int]struct{}) + for i := 0; i < l; i++ { + idx := rand.Intn(len(col.setValue)) + _, ok := m[idx] + for ok { + idx = rand.Intn(len(col.setValue)) + _, ok = m[idx] + } + m[idx] = struct{}{} + idxs[i] = idx + } + sort.Ints(idxs) + s := "" + for i := range idxs { + if i > 0 { + s += "," + } + s += col.setValue[idxs[i]] + } + return s + default: + return nil + } +} + +func randTime(minTime time.Time, gap int64) time.Time { + // https://github.com/chronotope/chrono-tz/issues/23 + // see all invalid time: https://timezonedb.com/time-zones/Asia/Shanghai + var randTime time.Time + for { + randTime = time.Unix(minTime.Unix()+rand.Int63n(gap), 0).In(local) + if notAmbiguousTime(randTime) { + break + } + } + return randTime +} + +func (col *ddlTestColumn) randJsonValue() string { + for _, dCol := range col.dependenciedCols { + col.mValue[dCol.nameOfGen] = dCol.randValue() + } + jsonRow, _ := json.Marshal(col.mValue) + return string(jsonRow) +} + +func notAmbiguousTime(t time.Time) bool { + ok := true + for _, amt := range ambiguousTimeSlice { + if t.Unix() >= amt.start && t.Unix() <= amt.end { + ok = false + break + } + } + return ok +} + +// randValueUnique use for primary key column to get unique value +func (col *ddlTestColumn) randValueUnique(PreValue []interface{}) (interface{}, bool) { + // retry times + for i := 0; i < 10; i++ { + v := col.randValue() + flag := true + for _, pv := range PreValue { + if v == pv { + flag = false + break + } + } + if flag { + return v, true + } + } + return nil, false +} + +func (col *ddlTestColumn) canBePrimary() bool { + return col.canBeIndex() && col.notGenerated() +} + +func (col *ddlTestColumn) canBeIndex() bool { + switch col.k { + case KindChar, KindVarChar: + if col.filedTypeM == 0 { + return false + } else { + return true + } + case KindBLOB, KindTINYBLOB, KindMEDIUMBLOB, KindLONGBLOB, KindTEXT, KindTINYTEXT, KindMEDIUMTEXT, KindLONGTEXT, KindJSON: + return false + default: + return true + } +} + +func (col *ddlTestColumn) canBeSet() bool { + return col.notGenerated() +} + +func (col *ddlTestColumn) canBeWhere() bool { + switch col.k { + case KindJSON: + return false + default: + return true + } +} + +//BLOB, TEXT, GEOMETRY or JSON column 'b' can't have a default value") +func (col *ddlTestColumn) canHaveDefaultValue() bool { + switch col.k { + case KindBLOB, KindTINYBLOB, KindMEDIUMBLOB, KindLONGBLOB, KindTEXT, KindTINYTEXT, KindMEDIUMTEXT, KindLONGTEXT, KindJSON: + return false + default: + return true + } +} + +type ddlTestIndex struct { + name string + signature string + columns []*ddlTestColumn +} diff --git a/tests/pkg/workload/ddl/internal/metrics.go b/tests/pkg/workload/ddl/internal/metrics.go new file mode 100644 index 0000000000..7a09e8ace2 --- /dev/null +++ b/tests/pkg/workload/ddl/internal/metrics.go @@ -0,0 +1,29 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. +package internal + +import "github.com/prometheus/client_golang/prometheus" + +var ( + ddlFailedCounter = prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: "tidb_test", + Subsystem: "stability", + Name: "ddl_failed_total", + Help: "Counter of failed ddl operations.", + }) +) + +func init() { + prometheus.MustRegister(ddlFailedCounter) +} diff --git a/tests/pkg/workload/ddl/internal/run.go b/tests/pkg/workload/ddl/internal/run.go new file mode 100644 index 0000000000..182b4b99d6 --- /dev/null +++ b/tests/pkg/workload/ddl/internal/run.go @@ -0,0 +1,119 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. +package internal + +import ( + "database/sql" + "strings" + "time" + + "github.com/golang/glog" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/push" + "golang.org/x/net/context" +) + +var defaultPushMetricsInterval = 15 * time.Second +var enableTransactionTestFlag = "0" +var enableTransactionTest = false + +func init() { + if enableTransactionTestFlag == "1" { + enableTransactionTest = true + } +} + +func getPromAddr() string { + return "" // TODO +} + +func openDB(dsn string, maxIdleConns int) (*sql.DB, error) { + db, err := sql.Open("mysql", dsn) + if err != nil { + return nil, err + } + + db.SetMaxIdleConns(maxIdleConns) + glog.Info("DB opens successfully") + return db, nil +} + +func Run(ctx context.Context, dbDSN string, concurrency int, tablesToCreate int, mysqlCompatible bool, testTp DDLTestType) { + glog.Infof("[ddl] Enable transaction test is: %v", enableTransactionTest) + + dbss := make([][]*sql.DB, 0, concurrency) + for i := 0; i < concurrency; i++ { + dbs := make([]*sql.DB, 0, 2) + // Parallel send DDL request need more connection to send DDL request concurrently + db0, err := openDB(dbDSN, 20) + if err != nil { + glog.Fatalf("[ddl] create db client error %v", err) + } + db1, err := openDB(dbDSN, 1) + if err != nil { + glog.Fatalf("[ddl] create db client error %v", err) + } + dbs = append(dbs, db0) + dbs = append(dbs, db1) + dbss = append(dbss, dbs) + } + + if promAddr := getPromAddr(); len(promAddr) > 0 { + go func() { + for { + err := push.FromGatherer("ddl", push.HostnameGroupingKey(), promAddr, prometheus.DefaultGatherer) + if err != nil { + glog.Errorf("[ddl] could not push metrics to prometheus push gateway: %v", err) + } + + time.Sleep(defaultPushMetricsInterval) + } + }() + } + + cfg := DDLCaseConfig{ + Concurrency: concurrency, + TablesToCreate: tablesToCreate, + MySQLCompatible: mysqlCompatible, + TestTp: testTp, + } + ddl := NewDDLCase(&cfg) + exeDDLFunc := SerialExecuteOperations + if cfg.TestTp == ParallelDDLTest { + exeDDLFunc = ParallelExecuteOperations + } + execDMLFunc := SerialExecuteDML + if enableTransactionTest { + execDMLFunc = TransactionExecuteOperations + } + if err := ddl.Initialize(ctx, dbss); err != nil { + glog.Fatalf("[ddl] initialze error %v", err) + } + if err := ddl.Execute(ctx, dbss, exeDDLFunc, execDMLFunc); err != nil { + glog.Fatalf("[ddl] execute error %v", err) + } +} + +func ignore_error(err error) bool { + if err == nil { + return true + } + errStr := err.Error() + if strings.Contains(errStr, "Information schema is changed") { + return true + } + if strings.Contains(errStr, "try again later") { + return true + } + return false +} diff --git a/tests/pkg/workload/ddl/internal/util.go b/tests/pkg/workload/ddl/internal/util.go new file mode 100644 index 0000000000..633bd9e691 --- /dev/null +++ b/tests/pkg/workload/ddl/internal/util.go @@ -0,0 +1,177 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal + +import ( + "database/sql" + "math/rand" + "strings" + + "github.com/golang/glog" +) + +func padLeft(str, pad string, length int) string { + if len(str) >= length { + return str + } + padding := strings.Repeat(pad, length) + str = padding + str + return str[len(str)-length:] +} + +func padRight(str, pad string, length int) string { + if len(str) >= length { + return str + } + padding := strings.Repeat(pad, length) + str = str + padding + return str[:length] +} + +func enableTiKVGC(db *sql.DB) { + sql := "update mysql.tidb set VARIABLE_VALUE = '10m' where VARIABLE_NAME = 'tikv_gc_life_time';" + _, err := db.Exec(sql) + if err != nil { + glog.Warningf("Failed to enable TiKV GC") + } +} + +func disableTiKVGC(db *sql.DB) { + sql := "update mysql.tidb set VARIABLE_VALUE = '500h' where VARIABLE_NAME = 'tikv_gc_life_time';" + _, err := db.Exec(sql) + if err != nil { + glog.Warningf("Failed to disable TiKV GC") + } +} + +// parallel run functions in parallel and wait until all of them are completed. +// If one of them returns error, the result is that error. +func parallel(funcs ...func() error) error { + cr := make(chan error, len(funcs)) + for _, foo := range funcs { + go func(foo func() error) { + err := foo() + cr <- err + }(foo) + } + var err error + for i := 0; i < len(funcs); i++ { + r := <-cr + if r != nil { + err = r + } + } + return err +} + +const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" + +func randSeq(n int) string { + b := make([]byte, n) + for i := range b { + b[i] = letterBytes[rand.Intn(len(letterBytes))] + } + return string(b) +} + +const numberBytes = "0123456789" + +func randNum(n int) []byte { + b := make([]byte, n) + for i := range b { + b[i] = numberBytes[rand.Int63()%int64(len(numberBytes))] + } + return b +} + +func randMD() (m int, d int) { + for m == 0 { + m = rand.Intn(MAXDECIMALM) + } + min := m + if min > MAXDECIMALN { + min = MAXDECIMALN + } + d = rand.Intn(min) + return +} + +func randDecimal(m, d int) string { + ms := randNum(m - d) + ds := randNum(d) + var i int + for i = range ms { + if ms[i] != byte('0') { + break + } + } + ms = ms[i:] + l := len(ms) + len(ds) + 1 + flag := rand.Intn(2) + //check for 0.0... avoid -0.0 + zeroFlag := true + for i := range ms { + if ms[i] != byte('0') { + zeroFlag = false + } + } + for i := range ds { + if ds[i] != byte('0') { + zeroFlag = false + } + } + if zeroFlag { + flag = 0 + } + vs := make([]byte, 0, l+flag) + if flag == 1 { + vs = append(vs, '-') + } + vs = append(vs, ms...) + if len(ds) == 0 { + return string(vs) + } + vs = append(vs, '.') + vs = append(vs, ds...) + return string(vs) +} + +const FieldNameLen = 8 + +func randFieldName(m map[string]interface{}) string { + name := randSeq(FieldNameLen) + _, ok := m[name] + for ok { + name = randSeq(FieldNameLen) + _, ok = m[name] + } + return name +} + +const EnumValueLen = 5 + +func randEnumString(m map[string]struct{}) string { + l := rand.Intn(EnumValueLen) + 1 + name := randSeq(l) + nameL := strings.ToLower(name) + _, ok := m[nameL] + for ok { + l = rand.Intn(EnumValueLen) + 1 + name = randSeq(l) + nameL = strings.ToLower(name) + _, ok = m[nameL] + } + m[nameL] = struct{}{} + return name +} diff --git a/tests/pkg/workload/ddl/workload.go b/tests/pkg/workload/ddl/workload.go new file mode 100644 index 0000000000..23e4ca2229 --- /dev/null +++ b/tests/pkg/workload/ddl/workload.go @@ -0,0 +1,50 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package ddl + +import ( + "context" + + "github.com/juju/errors" + "github.com/pingcap/tidb-operator/tests/pkg/workload" + "github.com/pingcap/tidb-operator/tests/pkg/workload/ddl/internal" +) + +var _ = workload.Workload(&DDLWorkload{}) + +type DDLWorkload struct { + DSN string + Concurrency int + Tables int + + ctx context.Context + cancel context.CancelFunc +} + +func New(dsn string, concurrency int, tables int) workload.Workload { + return &DDLWorkload{DSN: dsn, Concurrency: concurrency, Tables: tables} +} + +func (w *DDLWorkload) Enter() error { + if w.ctx != nil { + return errors.New("already in ddl workload context") + } + w.ctx, w.cancel = context.WithCancel(context.Background()) + go internal.Run(w.ctx, w.DSN, w.Concurrency, w.Tables, false, internal.SerialDDLTest) + return nil +} + +func (w *DDLWorkload) Leave() { + w.cancel() +} diff --git a/tests/pkg/workload/interface.go b/tests/pkg/workload/interface.go new file mode 100644 index 0000000000..26e5284b20 --- /dev/null +++ b/tests/pkg/workload/interface.go @@ -0,0 +1,31 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package workload + +import "github.com/pingcap/errors" + +type Workload interface { + Enter() error + Leave() +} + +func Run(f func() error, ws ...Workload) error { + for _, w := range ws { + if err := w.Enter(); err != nil { + return errors.Annotate(err, "enter workload") + } + defer w.Leave() + } + return f() +}