From 954be35e4523f2641942be7cb44fefda1b4a4f3b Mon Sep 17 00:00:00 2001 From: lance6716 Date: Mon, 7 Sep 2020 20:04:37 +0800 Subject: [PATCH] *: auto discover ANSI_QUOTES (#889) --- checker/check_test.go | 4 +- checker/checker.go | 11 ++-- checker/cmd.go | 6 +- dm/config/subtask.go | 1 + dm/config/task.go | 8 +-- dm/config/task_test.go | 3 +- dm/master/server.go | 2 +- dm/master/server_test.go | 2 +- dumpling/dumpling.go | 17 +++--- go.mod | 4 +- go.sum | 4 ++ loader/convert_data.go | 11 ++-- loader/convert_data_test.go | 4 +- loader/loader.go | 2 +- pkg/binlog/event/util.go | 26 ++++++--- pkg/binlog/reader/tcp_test.go | 2 +- pkg/binlog/reader/util.go | 10 +++- pkg/binlog/reader/util_test.go | 7 +-- pkg/parser/common.go | 2 + pkg/utils/db.go | 100 +++++++++++++++++++-------------- pkg/v1dbschema/schema.go | 18 ++---- relay/relay.go | 2 +- syncer/db.go | 4 +- syncer/ddl_test.go | 4 +- syncer/error.go | 7 +-- syncer/handle_error.go | 12 ++-- syncer/schema.go | 2 +- syncer/syncer.go | 34 ++++++----- syncer/syncer_test.go | 18 +----- tests/relay_interrupt/run.sh | 2 +- 30 files changed, 172 insertions(+), 157 deletions(-) diff --git a/checker/check_test.go b/checker/check_test.go index 34c2e5542d..95a17c5e08 100644 --- a/checker/check_test.go +++ b/checker/check_test.go @@ -31,12 +31,12 @@ type testChecker struct{} var _ = tc.Suite(&testChecker{}) func (t *testChecker) TestCheckSyncConfig(c *tc.C) { - c.Assert(CheckSyncConfig(context.Background(), nil, false), tc.IsNil) + c.Assert(CheckSyncConfig(context.Background(), nil), tc.IsNil) cfgs := []*config.SubTaskConfig{ { IgnoreCheckingItems: []string{config.AllChecking}, }, } - c.Assert(CheckSyncConfig(context.Background(), cfgs, false), tc.IsNil) + c.Assert(CheckSyncConfig(context.Background(), cfgs), tc.IsNil) } diff --git a/checker/checker.go b/checker/checker.go index 7aef39502e..550fad57b6 100644 --- a/checker/checker.go +++ b/checker/checker.go @@ -69,8 +69,6 @@ type Checker struct { instances []*mysqlInstance - enableANSIQuotes bool - checkList []check.Checker checkingItems map[string]string result struct { @@ -80,12 +78,11 @@ type Checker struct { } // NewChecker returns a checker -func NewChecker(cfgs []*config.SubTaskConfig, checkingItems map[string]string, enableANSIQuotes bool) *Checker { +func NewChecker(cfgs []*config.SubTaskConfig, checkingItems map[string]string) *Checker { c := &Checker{ - instances: make([]*mysqlInstance, 0, len(cfgs)), - checkingItems: checkingItems, - logger: log.With(zap.String("unit", "task check")), - enableANSIQuotes: enableANSIQuotes, + instances: make([]*mysqlInstance, 0, len(cfgs)), + checkingItems: checkingItems, + logger: log.With(zap.String("unit", "task check")), } for _, cfg := range cfgs { diff --git a/checker/cmd.go b/checker/cmd.go index 1ae8e341f0..588ddbc927 100644 --- a/checker/cmd.go +++ b/checker/cmd.go @@ -26,7 +26,7 @@ var ( ErrorMsgHeader = "fail to check synchronization configuration with type" // CheckSyncConfigFunc holds the CheckSyncConfig function - CheckSyncConfigFunc func(ctx context.Context, cfgs []*config.SubTaskConfig, enableANSIQuotes bool) error + CheckSyncConfigFunc func(ctx context.Context, cfgs []*config.SubTaskConfig) error ) func init() { @@ -34,7 +34,7 @@ func init() { } // CheckSyncConfig checks synchronization configuration -func CheckSyncConfig(ctx context.Context, cfgs []*config.SubTaskConfig, enableANSIQuotes bool) error { +func CheckSyncConfig(ctx context.Context, cfgs []*config.SubTaskConfig) error { if len(cfgs) == 0 { return nil } @@ -56,7 +56,7 @@ func CheckSyncConfig(ctx context.Context, cfgs []*config.SubTaskConfig, enableAN return nil } - c := NewChecker(cfgs, checkingItems, enableANSIQuotes) + c := NewChecker(cfgs, checkingItems) err := c.Init(ctx) if err != nil { diff --git a/dm/config/subtask.go b/dm/config/subtask.go index 00e89f388a..13c9cae165 100644 --- a/dm/config/subtask.go +++ b/dm/config/subtask.go @@ -192,6 +192,7 @@ type SubTaskConfig struct { CleanDumpFile bool `toml:"clean-dump-file" json:"clean-dump-file"` + // deprecated, will auto discover SQL mode EnableANSIQuotes bool `toml:"ansi-quotes" json:"ansi-quotes"` // still needed by Syncer / Loader bin diff --git a/dm/config/task.go b/dm/config/task.go index 83bdd33bf3..e02b91e791 100644 --- a/dm/config/task.go +++ b/dm/config/task.go @@ -202,6 +202,7 @@ func (m *MydumperConfig) UnmarshalYAML(unmarshal func(interface{}) error) error type LoaderConfig struct { PoolSize int `yaml:"pool-size" toml:"pool-size" json:"pool-size"` Dir string `yaml:"dir" toml:"dir" json:"dir"` + SQLMode string `yaml:"-" toml:"-" json:"-"` // wrote by dump unit } func defaultLoaderConfig() LoaderConfig { @@ -277,7 +278,7 @@ type TaskConfig struct { TaskMode string `yaml:"task-mode" toml:"task-mode" json:"task-mode"` IsSharding bool `yaml:"is-sharding" toml:"is-sharding" json:"is-sharding"` ShardMode string `yaml:"shard-mode" toml:"shard-mode" json:"shard-mode"` // when `shard-mode` set, we always enable sharding support. - // treat it as hidden configuration + // treat it as hidden configuration IgnoreCheckingItems []string `yaml:"ignore-checking-items" toml:"ignore-checking-items" json:"ignore-checking-items"` // we store detail status in meta // don't save configuration into it @@ -311,7 +312,7 @@ type TaskConfig struct { Syncers map[string]*SyncerConfig `yaml:"syncers" toml:"syncers" json:"syncers"` CleanDumpFile bool `yaml:"clean-dump-file" toml:"clean-dump-file" json:"clean-dump-file"` - + // deprecated EnableANSIQuotes bool `yaml:"ansi-quotes" toml:"ansi-quotes" json:"ansi-quotes"` // deprecated, replaced by `start-task --remove-meta` @@ -523,7 +524,7 @@ func (c *TaskConfig) adjust() error { // for backward compatible, set global config `ansi-quotes: true` if any syncer is true if inst.Syncer.EnableANSIQuotes == true { - c.EnableANSIQuotes = true + log.L().Warn("DM could discover proper ANSI_QUOTES, `enable-ansi-quotes` is no longer take effect") } if dupeRules := checkDuplicateString(inst.RouteRules); len(dupeRules) > 0 { @@ -632,7 +633,6 @@ func (c *TaskConfig) FromSubTaskConfigs(stCfgs ...*SubTaskConfig) { c.TargetDB = &stCfg0.To // just ref c.OnlineDDLScheme = stCfg0.OnlineDDLScheme c.CleanDumpFile = stCfg0.CleanDumpFile - c.EnableANSIQuotes = stCfg0.EnableANSIQuotes c.MySQLInstances = make([]*MySQLInstance, 0, len(stCfgs)) c.BAList = make(map[string]*filter.Rules) c.Routes = make(map[string]*router.TableRule) diff --git a/dm/config/task_test.go b/dm/config/task_test.go index cf4850dd81..1b27931fe3 100644 --- a/dm/config/task_test.go +++ b/dm/config/task_test.go @@ -549,8 +549,7 @@ func (t *testConfig) TestFromSubTaskConfigs(c *C) { "sync-01": &stCfg1.SyncerConfig, "sync-02": &stCfg2.SyncerConfig, }, - CleanDumpFile: stCfg1.CleanDumpFile, - EnableANSIQuotes: stCfg1.EnableANSIQuotes, + CleanDumpFile: stCfg1.CleanDumpFile, } c.Assert(cfg.String(), Equals, cfg2.String()) // some nil/(null value) compare may not equal, so use YAML format to compare. diff --git a/dm/master/server.go b/dm/master/server.go index e4cb71d63e..abe4cd5bd3 100644 --- a/dm/master/server.go +++ b/dm/master/server.go @@ -1527,7 +1527,7 @@ func (s *Server) generateSubTask(ctx context.Context, task string) (*config.Task return nil, nil, terror.WithClass(err, terror.ClassDMMaster) } - err = checker.CheckSyncConfigFunc(ctx, stCfgs, cfg.EnableANSIQuotes) + err = checker.CheckSyncConfigFunc(ctx, stCfgs) if err != nil { return nil, nil, terror.WithClass(err, terror.ClassDMMaster) } diff --git a/dm/master/server_test.go b/dm/master/server_test.go index 3acaf94340..c399fb4a83 100644 --- a/dm/master/server_test.go +++ b/dm/master/server_test.go @@ -428,7 +428,7 @@ func (t *testMaster) TestStartTask(c *check.C) { // test start task, but the first step check-task fails bakCheckSyncConfigFunc := checker.CheckSyncConfigFunc - checker.CheckSyncConfigFunc = func(_ context.Context, _ []*config.SubTaskConfig, _ bool) error { + checker.CheckSyncConfigFunc = func(_ context.Context, _ []*config.SubTaskConfig) error { return errors.New(errCheckSyncConfig) } defer func() { diff --git a/dumpling/dumpling.go b/dumpling/dumpling.go index 6bcd5cee7a..a4a7f9ee52 100644 --- a/dumpling/dumpling.go +++ b/dumpling/dumpling.go @@ -58,7 +58,7 @@ func NewDumpling(cfg *config.SubTaskConfig) *Dumpling { func (m *Dumpling) Init(ctx context.Context) error { var err error m.dumpConfig, err = m.constructArgs() - m.detectAnsiQuotes() + m.detectSQLMode() return err } @@ -274,21 +274,18 @@ func (m *Dumpling) constructArgs() (*export.Config, error) { return dumpConfig, nil } -// detectAnsiQuotes tries to detect ANSI_QUOTES from upstream. If success, change EnableANSIQuotes in subtask config -func (m *Dumpling) detectAnsiQuotes() { +// detectSQLMode tries to detect SQL mode from upstream. If success, write it to LoaderConfig +func (m *Dumpling) detectSQLMode() { db, err := sql.Open("mysql", m.dumpConfig.GetDSN("")) if err != nil { return } defer db.Close() - enable, err := utils.HasAnsiQuotesMode(db) + + sqlMode, err := utils.GetGlobalVariable(db, "sql_mode") if err != nil { return } - if enable != m.cfg.EnableANSIQuotes { - m.logger.Warn("found mismatched ANSI_QUOTES setting, going to overwrite it to DB specified", - zap.Bool("DB specified", enable), - zap.Bool("config file specified", m.cfg.EnableANSIQuotes)) - } - m.cfg.EnableANSIQuotes = enable + m.logger.Info("found upstream SQL mode", zap.String("SQL mode", sqlMode)) + m.cfg.LoaderConfig.SQLMode = sqlMode } diff --git a/go.mod b/go.mod index 021bc8c688..0836bd3a28 100644 --- a/go.mod +++ b/go.mod @@ -25,7 +25,7 @@ require ( github.com/pingcap/errors v0.11.5-0.20200820035142-66eb5bf1d1cd github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce github.com/pingcap/log v0.0.0-20200828042413-fce0951f1463 - github.com/pingcap/parser v0.0.0-20200821073936-cf85e80665c4 + github.com/pingcap/parser v0.0.0-20200905070518-a6534dd22500 github.com/pingcap/tidb v1.1.0-beta.0.20200901032733-f82e5320ad75 github.com/pingcap/tidb-tools v4.0.5-0.20200828085514-03575b185007+incompatible github.com/prometheus/client_golang v1.5.1 @@ -44,7 +44,7 @@ require ( github.com/uber-go/atomic v1.4.0 // indirect github.com/unrolled/render v1.0.1 go.etcd.io/etcd v0.5.0-alpha.5.0.20191023171146-3cf2f69b5738 - go.uber.org/zap v1.15.0 + go.uber.org/zap v1.16.0 golang.org/x/sys v0.0.0-20200824131525-c12d262b63d8 golang.org/x/time v0.0.0-20191024005414-555d28b269f0 google.golang.org/genproto v0.0.0-20191230161307-f3c370f40bfb diff --git a/go.sum b/go.sum index efaa98b70c..23dabee818 100644 --- a/go.sum +++ b/go.sum @@ -583,6 +583,8 @@ github.com/pingcap/parser v0.0.0-20200731033026-84f62115187c/go.mod h1:vQdbJqobJ github.com/pingcap/parser v0.0.0-20200813083329-a4bff035d3e2/go.mod h1:vQdbJqobJAgFyiRNNtXahpMoGWwPEuWciVEK5A20NS0= github.com/pingcap/parser v0.0.0-20200821073936-cf85e80665c4 h1:ATFD3gmwkSHcPt5DuQK3dZwqDU49WXOq/zRmwPJ6Nks= github.com/pingcap/parser v0.0.0-20200821073936-cf85e80665c4/go.mod h1:vQdbJqobJAgFyiRNNtXahpMoGWwPEuWciVEK5A20NS0= +github.com/pingcap/parser v0.0.0-20200905070518-a6534dd22500 h1:dnXiRdcubiMSkzHBBSkJo+9E+OX3+Z629ZEQEIVfBsI= +github.com/pingcap/parser v0.0.0-20200905070518-a6534dd22500/go.mod h1:vQdbJqobJAgFyiRNNtXahpMoGWwPEuWciVEK5A20NS0= github.com/pingcap/pd/v4 v4.0.0-rc.1.0.20200422143320-428acd53eba2 h1:JTzYYukREvxVSKW/ncrzNjFitd8snoQ/Xz32pw8i+s8= github.com/pingcap/pd/v4 v4.0.0-rc.1.0.20200422143320-428acd53eba2/go.mod h1:s+utZtXDznOiL24VK0qGmtoHjjXNsscJx3m1n8cC56s= github.com/pingcap/pd/v4 v4.0.0-rc.2.0.20200520083007-2c251bd8f181/go.mod h1:q4HTx/bA8aKBa4S7L+SQKHvjRPXCRV0tA0yRw0qkZSA= @@ -862,6 +864,8 @@ go.uber.org/zap v1.14.1 h1:nYDKopTbvAPq/NrUVZwT15y2lpROBiLLyoRTbXOYWOo= go.uber.org/zap v1.14.1/go.mod h1:Mb2vm2krFEG5DV0W9qcHBYFtp/Wku1cvYaqPsS/WYfc= go.uber.org/zap v1.15.0 h1:ZZCA22JRF2gQE5FoNmhmrf7jeJJ2uhqDUNRYKm8dvmM= go.uber.org/zap v1.15.0/go.mod h1:Mb2vm2krFEG5DV0W9qcHBYFtp/Wku1cvYaqPsS/WYfc= +go.uber.org/zap v1.16.0 h1:uFRZXykJGK9lLY4HtgSw44DnIcAM+kRBP7x5m+NpAOM= +go.uber.org/zap v1.16.0/go.mod h1:MA8QOfq0BHJwdXa996Y4dYkAqRKB8/1K1QMMZVaNZjQ= golang.org/x/crypto v0.0.0-20180214000028-650f4a345ab4/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= diff --git a/loader/convert_data.go b/loader/convert_data.go index 9c57e74b03..18b073c398 100644 --- a/loader/convert_data.go +++ b/loader/convert_data.go @@ -25,11 +25,10 @@ import ( tcontext "github.com/pingcap/dm/pkg/context" parserpkg "github.com/pingcap/dm/pkg/parser" "github.com/pingcap/dm/pkg/terror" + "github.com/pingcap/dm/pkg/utils" "github.com/pingcap/errors" - "github.com/pingcap/parser" "github.com/pingcap/parser/ast" - tmysql "github.com/pingcap/parser/mysql" cm "github.com/pingcap/tidb-tools/pkg/column-mapping" router "github.com/pingcap/tidb-tools/pkg/table-router" ) @@ -231,15 +230,15 @@ func tableName(schema, table string) string { return fmt.Sprintf("`%s`.`%s`", schema, table) } -func parseTable(ctx *tcontext.Context, r *router.Table, schema, table, file string, enableANSIQuotes bool) (*tableInfo, error) { +func parseTable(ctx *tcontext.Context, r *router.Table, schema, table, file string, sqlMode string) (*tableInfo, error) { statement, err := exportStatement(file) if err != nil { return nil, err } - parser2 := parser.New() - if enableANSIQuotes { - parser2.SetSQLMode(tmysql.ModeANSIQuotes) + parser2, err := utils.GetParserFromSQLModeStr(sqlMode) + if err != nil { + return nil, err } stmts, err := parserpkg.Parse(parser2, string(statement), "", "") diff --git a/loader/convert_data_test.go b/loader/convert_data_test.go index 51fac2158d..c302fdaa38 100644 --- a/loader/convert_data_test.go +++ b/loader/convert_data_test.go @@ -164,7 +164,7 @@ func (t *testConvertDataSuite) TestParseTable(c *C) { r, err := router.NewTableRouter(false, rules) c.Assert(err, IsNil) - tableInfo, err := parseTable(tcontext.Background(), r, "test1", "t2", "./dumpfile/test1.t2-schema.sql", true) + tableInfo, err := parseTable(tcontext.Background(), r, "test1", "t2", "./dumpfile/test1.t2-schema.sql", "ANSI_QUOTES") c.Assert(err, IsNil) c.Assert(tableInfo, DeepEquals, expectedTableInfo) } @@ -192,7 +192,7 @@ func (t *testConvertDataSuite) TestParseTableWithGeneratedColumn(c *C) { r, err := router.NewTableRouter(false, rules) c.Assert(err, IsNil) - tableInfo, err := parseTable(tcontext.Background(), r, "test1", "t3", "./dumpfile/test1.t3-schema.sql", false) + tableInfo, err := parseTable(tcontext.Background(), r, "test1", "t3", "./dumpfile/test1.t3-schema.sql", "") c.Assert(err, IsNil) c.Assert(tableInfo, DeepEquals, expectedTableInfo) } diff --git a/loader/loader.go b/loader/loader.go index bcca0ededd..0cd05e9e20 100644 --- a/loader/loader.go +++ b/loader/loader.go @@ -1190,7 +1190,7 @@ func (l *Loader) restoreData(ctx context.Context) error { dataFiles := tables[table] tableFile := fmt.Sprintf("%s/%s.%s-schema.sql", l.cfg.Dir, db, table) if _, ok := l.tableInfos[tableName(db, table)]; !ok { - l.tableInfos[tableName(db, table)], err = parseTable(tctx, l.tableRouter, db, table, tableFile, l.cfg.EnableANSIQuotes) + l.tableInfos[tableName(db, table)], err = parseTable(tctx, l.tableRouter, db, table, tableFile, l.cfg.LoaderConfig.SQLMode) if err != nil { return terror.Annotatef(err, "parse table %s/%s", db, table) } diff --git a/pkg/binlog/event/util.go b/pkg/binlog/event/util.go index 3f0665a77b..0abc97f091 100644 --- a/pkg/binlog/event/util.go +++ b/pkg/binlog/event/util.go @@ -25,6 +25,8 @@ import ( "math" "reflect" + "github.com/pingcap/parser" + "github.com/pingcap/parser/mysql" gmysql "github.com/siddontang/go-mysql/mysql" "github.com/siddontang/go-mysql/replication" @@ -305,24 +307,34 @@ var ( } ) -// GetAnsiQuotesMode gets ansi-quotes mode from binlog statusVars -func GetAnsiQuotesMode(statusVars []byte) (bool, error) { +// getSQLMode gets SQL mode from binlog statusVars +func getSQLMode(statusVars []byte) (mysql.SQLMode, error) { vars, err := statusVarsToKV(statusVars) if err != nil { - return false, err + return mysql.ModeNone, err } b, ok := vars[QSqlModeCode] if !ok { - // TODO(lance6716): if this will happen, create a terror - return false, errors.New("Q_SQL_MODE_CODE not found in status_vars") + // should not happen + return mysql.ModeNone, errors.New("Q_SQL_MODE_CODE not found in status_vars") } r := bytes.NewReader(b) var v int64 _ = binary.Read(r, binary.LittleEndian, &v) - // MODE_ANSI_QUOTES = 0x00000004 ref: https://dev.mysql.com/doc/internals/en/query-event.html#q-sql-mode-code - return v&0x00000004 != 0, nil + return mysql.SQLMode(v), nil +} + +// GetParserForStatusVars gets a parser for binlog which is suitable for its sql_mode in statusVars +func GetParserForStatusVars(statusVars []byte) (*parser.Parser, error) { + parser2 := parser.New() + mode, err := getSQLMode(statusVars) + if err != nil { + return nil, err + } + parser2.SetSQLMode(mode) + return parser2, nil } // if returned error is `io.EOF`, it means UnexpectedEOF because we handled expected `io.EOF` as success diff --git a/pkg/binlog/reader/tcp_test.go b/pkg/binlog/reader/tcp_test.go index d1e07ffac9..0d7191e769 100644 --- a/pkg/binlog/reader/tcp_test.go +++ b/pkg/binlog/reader/tcp_test.go @@ -298,7 +298,7 @@ func (t *testTCPReaderSuite) verifyInitialEvents(c *C, reader Reader) { timeoutCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - parser2, err := utils.GetParser(t.db, false) + parser2, err := utils.GetParser(t.db) c.Assert(err, IsNil) forLoop: diff --git a/pkg/binlog/reader/util.go b/pkg/binlog/reader/util.go index d0b8fc9a34..eb3439af7d 100644 --- a/pkg/binlog/reader/util.go +++ b/pkg/binlog/reader/util.go @@ -22,9 +22,11 @@ import ( uuid "github.com/satori/go.uuid" gmysql "github.com/siddontang/go-mysql/mysql" "github.com/siddontang/go-mysql/replication" + "go.uber.org/zap" "github.com/pingcap/dm/pkg/binlog/event" "github.com/pingcap/dm/pkg/gtid" + "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/terror" "github.com/pingcap/dm/relay/common" ) @@ -32,7 +34,7 @@ import ( // GetGTIDsForPos tries to get GTID sets for the specified binlog position (for the corresponding txn). // NOTE: this method is very similar with `relay/writer/file_util.go/getTxnPosGTIDs`, unify them if needed later. // NOTE: this method is not well tested directly, but more tests have already been done for `relay/writer/file_util.go/getTxnPosGTIDs`. -func GetGTIDsForPos(ctx context.Context, r Reader, endPos gmysql.Position, parser2 *parser.Parser) (gtid.Set, error) { +func GetGTIDsForPos(ctx context.Context, r Reader, endPos gmysql.Position) (gtid.Set, error) { // start to get and parse binlog event from the beginning of the file. startPos := gmysql.Position{ Name: endPos.Name, @@ -60,6 +62,12 @@ func GetGTIDsForPos(ctx context.Context, r Reader, endPos gmysql.Position, parse // NOTE: only update endPos/GTIDs for DDL/XID to get an complete transaction. switch ev := e.Event.(type) { case *replication.QueryEvent: + parser2, err := event.GetParserForStatusVars(ev.StatusVars) + if err != nil { + log.L().Warn("can't determine sql_mode from binlog status_vars, use default parser instead", zap.Error(err)) + parser2 = parser.New() + } + isDDL := common.CheckIsDDL(string(ev.Query), parser2) if isDDL { if latestGSet == nil { diff --git a/pkg/binlog/reader/util_test.go b/pkg/binlog/reader/util_test.go index 1412bdfaa8..4c51699938 100644 --- a/pkg/binlog/reader/util_test.go +++ b/pkg/binlog/reader/util_test.go @@ -44,14 +44,11 @@ func (t *testTCPReaderSuite) TestGetGTIDsForPos(c *C) { endPos, endGS, err := utils.GetMasterStatus(t.db, flavor) c.Assert(err, IsNil) - parser2, err := utils.GetParser(t.db, false) - c.Assert(err, IsNil) - r1 := NewTCPReader(cfg) c.Assert(r1, NotNil) defer r1.Close() - gs, err := GetGTIDsForPos(ctx, r1, endPos, parser2) + gs, err := GetGTIDsForPos(ctx, r1, endPos) c.Assert(err, IsNil) c.Assert(gs.Equal(endGS), IsTrue) @@ -62,7 +59,7 @@ func (t *testTCPReaderSuite) TestGetGTIDsForPos(c *C) { gs, err = GetGTIDsForPos(ctx, r2, gmysql.Position{ Name: endPos.Name, Pos: endPos.Pos - 1, - }, parser2) + }) c.Assert(err, ErrorMatches, ".*invalid position .* or GTID not enabled in upstream.*") c.Assert(gs, IsNil) } diff --git a/pkg/parser/common.go b/pkg/parser/common.go index 002f327e89..f6cd3bd0fd 100644 --- a/pkg/parser/common.go +++ b/pkg/parser/common.go @@ -94,6 +94,7 @@ func FetchDDLTableNames(schema string, stmt ast.StmtNode) ([]*filter.Table, erro // RenameDDLTable renames table names in ddl by given `targetTableNames` // argument `targetTableNames` is same with return value of FetchDDLTableNames +// returned DDL is formatted like StringSingleQuotes, KeyWordUppercase and NameBackQuotes func RenameDDLTable(stmt ast.StmtNode, targetTableNames []*filter.Table) (string, error) { switch v := stmt.(type) { case *ast.AlterDatabaseStmt: @@ -173,6 +174,7 @@ func RenameDDLTable(stmt ast.StmtNode, targetTableNames []*filter.Table) (string } // SplitDDL splits multiple operations in one DDL statement into multiple DDL statements +// returned DDL is formatted like StringSingleQuotes, KeyWordUppercase and NameBackQuotes // if fail to restore, it would not restore the value of `stmt` (it changes it's values if `stmt` is one of DropTableStmt, RenameTableStmt, AlterTableStmt) func SplitDDL(stmt ast.StmtNode, schema string) (sqls []string, err error) { var ( diff --git a/pkg/utils/db.go b/pkg/utils/db.go index 1389bbeced..aba62c8763 100644 --- a/pkg/utils/db.go +++ b/pkg/utils/db.go @@ -23,19 +23,20 @@ import ( "strings" "time" + "github.com/pingcap/failpoint" + "go.uber.org/zap" + "github.com/pingcap/dm/pkg/gtid" "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/terror" "github.com/go-sql-driver/mysql" "github.com/pingcap/errors" - "github.com/pingcap/failpoint" "github.com/pingcap/parser" tmysql "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb-tools/pkg/check" "github.com/pingcap/tidb-tools/pkg/dbutil" gmysql "github.com/siddontang/go-mysql/mysql" - "go.uber.org/zap" ) var ( @@ -247,9 +248,6 @@ func GetMariaDBGTID(db *sql.DB) (gtid.Set, error) { // GetGlobalVariable gets server's global variable func GetGlobalVariable(db *sql.DB, variable string) (value string, err error) { - query := fmt.Sprintf("SHOW GLOBAL VARIABLES LIKE '%s'", variable) - rows, err := db.Query(query) - failpoint.Inject("GetGlobalVariableFailed", func(val failpoint.Value) { items := strings.Split(val.(string), ",") if len(items) != 2 { @@ -260,17 +258,51 @@ func GetGlobalVariable(db *sql.DB, variable string) (value string, err error) { if err1 != nil { log.L().Fatal("failpoint GetGlobalVariableFailed's value is invalid", zap.String("val", val.(string))) } - if variable == variableName { err = tmysql.NewErr(uint16(errCode)) log.L().Warn("GetGlobalVariable failed", zap.String("variable", variable), zap.String("failpoint", "GetGlobalVariableFailed"), zap.Error(err)) + failpoint.Return("", terror.DBErrorAdapt(err, terror.ErrDBDriverError)) } }) + conn, err := db.Conn(context.Background()) if err != nil { return "", terror.DBErrorAdapt(err, terror.ErrDBDriverError) } - defer rows.Close() + defer conn.Close() + return getVariable(conn, variable, true) +} + +// GetSessionVariable gets connection's session variable +func GetSessionVariable(conn *sql.Conn, variable string) (value string, err error) { + failpoint.Inject("GetSessionVariableFailed", func(val failpoint.Value) { + items := strings.Split(val.(string), ",") + if len(items) != 2 { + log.L().Fatal("failpoint GetSessionVariableFailed's value is invalid", zap.String("val", val.(string))) + } + variableName := items[0] + errCode, err1 := strconv.ParseUint(items[1], 10, 16) + if err1 != nil { + log.L().Fatal("failpoint GetSessionVariableFailed's value is invalid", zap.String("val", val.(string))) + } + if variable == variableName { + err = tmysql.NewErr(uint16(errCode)) + log.L().Warn("GetSessionVariable failed", zap.String("variable", variable), zap.String("failpoint", "GetSessionVariableFailed"), zap.Error(err)) + failpoint.Return("", terror.DBErrorAdapt(err, terror.ErrDBDriverError)) + } + }) + return getVariable(conn, variable, false) +} + +func getVariable(conn *sql.Conn, variable string, isGlobal bool) (value string, err error) { + var template string + if isGlobal { + template = "SHOW GLOBAL VARIABLES LIKE '%s'" + } else { + template = "SHOW VARIABLES LIKE '%s'" + } + query := fmt.Sprintf(template, variable) + row := conn.QueryRowContext(context.Background(), query) // Show an example. /* @@ -282,17 +314,10 @@ func GetGlobalVariable(db *sql.DB, variable string) (value string, err error) { +---------------+-------+ */ - for rows.Next() { - err = rows.Scan(&variable, &value) - if err != nil { - return "", terror.DBErrorAdapt(err, terror.ErrDBDriverError) - } - } - - if rows.Err() != nil { - return "", terror.DBErrorAdapt(rows.Err(), terror.ErrDBDriverError) + err = row.Scan(&variable, &value) + if err != nil { + return "", terror.DBErrorAdapt(err, terror.ErrDBDriverError) } - return value, nil } @@ -341,41 +366,34 @@ func GetMariaDBUUID(db *sql.DB) (string, error) { return fmt.Sprintf("%d%s%d", domainID, domainServerIDSeparator, serverID), nil } -// GetSQLMode returns sql_mode. -func GetSQLMode(db *sql.DB) (tmysql.SQLMode, error) { - sqlMode, err := GetGlobalVariable(db, "sql_mode") +// GetParser gets a parser for sql.DB which is suitable for session variable sql_mode +func GetParser(db *sql.DB) (*parser.Parser, error) { + c, err := db.Conn(context.Background()) if err != nil { - return tmysql.ModeNone, err + return nil, err } - - mode, err := tmysql.GetSQLMode(sqlMode) - return mode, terror.ErrGetSQLModeFromStr.Delegate(err, sqlMode) + defer c.Close() + return GetParserForConn(c) } -// HasAnsiQuotesMode checks whether database has `ANSI_QUOTES` set -func HasAnsiQuotesMode(db *sql.DB) (bool, error) { - mode, err := GetSQLMode(db) +// GetParserForConn gets a parser for sql.Conn which is suitable for session variable sql_mode +func GetParserForConn(conn *sql.Conn) (*parser.Parser, error) { + sqlMode, err := GetSessionVariable(conn, "sql_mode") if err != nil { - return false, err + return nil, err } - return mode.HasANSIQuotesMode(), nil + return GetParserFromSQLModeStr(sqlMode) } -// GetParser gets a parser which maybe enabled `ANSI_QUOTES` sql_mode -func GetParser(db *sql.DB, ansiQuotesMode bool) (*parser.Parser, error) { - if !ansiQuotesMode { - // try get from DB - var err error - ansiQuotesMode, err = HasAnsiQuotesMode(db) - if err != nil { - return nil, err - } +// GetParserFromSQLModeStr gets a parser and applies given sqlMode +func GetParserFromSQLModeStr(sqlMode string) (*parser.Parser, error) { + mode, err := tmysql.GetSQLMode(sqlMode) + if err != nil { + return nil, err } parser2 := parser.New() - if ansiQuotesMode { - parser2.SetSQLMode(tmysql.ModeANSIQuotes) - } + parser2.SetSQLMode(mode) return parser2, nil } diff --git a/pkg/v1dbschema/schema.go b/pkg/v1dbschema/schema.go index ec46c22550..5314cb654f 100644 --- a/pkg/v1dbschema/schema.go +++ b/pkg/v1dbschema/schema.go @@ -20,7 +20,6 @@ import ( "github.com/go-sql-driver/mysql" "github.com/pingcap/errors" "github.com/pingcap/failpoint" - "github.com/pingcap/parser" "github.com/pingcap/tidb-tools/pkg/dbutil" "github.com/pingcap/tidb/errno" gmysql "github.com/siddontang/go-mysql/mysql" @@ -36,7 +35,6 @@ import ( "github.com/pingcap/dm/pkg/gtid" "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/terror" - "github.com/pingcap/dm/pkg/utils" ) // UpdateSchema updates the DB schema from v1.0.x to v2.0.x, including: @@ -50,12 +48,6 @@ func UpdateSchema(tctx *tcontext.Context, db *conn.BaseDB, cfg *config.SubTaskCo } defer db.CloseBaseConn(dbConn) - // setup SQL parser. - parser2, err := utils.GetParser(db.DB, cfg.EnableANSIQuotes) - if err != nil { - return terror.ErrFailUpdateV1DBSchema.Delegate(err) - } - // setup a TCP binlog reader (because no relay can be used when upgrading). syncCfg := replication.BinlogSyncerConfig{ ServerID: cfg.ServerID, @@ -70,7 +62,7 @@ func UpdateSchema(tctx *tcontext.Context, db *conn.BaseDB, cfg *config.SubTaskCo tcpReader := reader.NewTCPReader(syncCfg) // update checkpoint. - err = updateSyncerCheckpoint(tctx, dbConn, cfg.Name, dbutil.TableName(cfg.MetaSchema, cputil.SyncerCheckpoint(cfg.Name)), cfg.SourceID, cfg.EnableGTID, tcpReader, parser2) + err = updateSyncerCheckpoint(tctx, dbConn, cfg.Name, dbutil.TableName(cfg.MetaSchema, cputil.SyncerCheckpoint(cfg.Name)), cfg.SourceID, cfg.EnableGTID, tcpReader) if err != nil { return terror.ErrFailUpdateV1DBSchema.Delegate(err) } @@ -87,7 +79,7 @@ func UpdateSchema(tctx *tcontext.Context, db *conn.BaseDB, cfg *config.SubTaskCo // - update column value: // - fill `binlog_gtid` based on `binlog_name` and `binlog_pos` if GTID mode enable. // NOTE: no need to update the value of `table_info` because DM can get schema automatically from downstream when replicating DML. -func updateSyncerCheckpoint(tctx *tcontext.Context, dbConn *conn.BaseConn, taskName, tableName, sourceID string, fillGTIDs bool, tcpReader reader.Reader, parser2 *parser.Parser) error { +func updateSyncerCheckpoint(tctx *tcontext.Context, dbConn *conn.BaseConn, taskName, tableName, sourceID string, fillGTIDs bool, tcpReader reader.Reader) error { logger := log.L().WithFields(zap.String("task", taskName), zap.String("source", sourceID)) logger.Info("updating syncer checkpoint", zap.Bool("fill GTID", fillGTIDs)) var gs gtid.Set @@ -103,7 +95,7 @@ func updateSyncerCheckpoint(tctx *tcontext.Context, dbConn *conn.BaseConn, taskN return terror.Annotatef(err, "get global checkpoint position for source %s", sourceID) } if pos.Name != "" { - gs, err = getGTIDsForPos(tctx, pos, tcpReader, parser2) + gs, err = getGTIDsForPos(tctx, pos, tcpReader) if err != nil { return terror.Annotatef(err, "get GTID sets for position %s", pos) } @@ -176,7 +168,7 @@ func getGlobalPos(tctx *tcontext.Context, dbConn *conn.BaseConn, tableName, sour } // getGTIDsForPos gets the GTID sets for the position. -func getGTIDsForPos(tctx *tcontext.Context, pos gmysql.Position, tcpReader reader.Reader, parser2 *parser.Parser) (gs gtid.Set, err error) { +func getGTIDsForPos(tctx *tcontext.Context, pos gmysql.Position, tcpReader reader.Reader) (gs gtid.Set, err error) { // in MySQL, we expect `PreviousGTIDsEvent` contains ALL previous GTID sets, but in fact it may lack a part of them sometimes, // e.g we expect `00c04543-f584-11e9-a765-0242ac120002:1-100,03fc0263-28c7-11e7-a653-6c0b84d59f30:1-100`, // but may be `00c04543-f584-11e9-a765-0242ac120002:50-100,03fc0263-28c7-11e7-a653-6c0b84d59f30:60-100`. @@ -208,7 +200,7 @@ func getGTIDsForPos(tctx *tcontext.Context, pos gmysql.Position, tcpReader reade if err != nil { return nil, err } - gs, err = reader.GetGTIDsForPos(tctx.Ctx, tcpReader, realPos, parser2) + gs, err = reader.GetGTIDsForPos(tctx.Ctx, tcpReader, realPos) if err != nil { return nil, err } diff --git a/relay/relay.go b/relay/relay.go index 290c11a10f..b93aaf192f 100755 --- a/relay/relay.go +++ b/relay/relay.go @@ -226,7 +226,7 @@ func (r *Relay) SwitchMaster(ctx context.Context, req *pb.SwitchRelayMasterReque } func (r *Relay) process(parentCtx context.Context) error { - parser2, err := utils.GetParser(r.db, false) // refine to use user config later + parser2, err := utils.GetParser(r.db) // refine to use user config later if err != nil { return err } diff --git a/syncer/db.go b/syncer/db.go index f98948435c..5c3c89f6a9 100644 --- a/syncer/db.go +++ b/syncer/db.go @@ -91,8 +91,8 @@ func (conn *UpStreamConn) getServerUUID(flavor string) (string, error) { return utils.GetServerUUID(conn.BaseDB.DB, flavor) } -func (conn *UpStreamConn) getParser(ansiQuotesMode bool) (*parser.Parser, error) { - return utils.GetParser(conn.BaseDB.DB, ansiQuotesMode) +func (conn *UpStreamConn) getParser() (*parser.Parser, error) { + return utils.GetParser(conn.BaseDB.DB) } func (conn *UpStreamConn) killConn(connID uint32) error { diff --git a/syncer/ddl_test.go b/syncer/ddl_test.go index 12352ff6ad..1490bff080 100644 --- a/syncer/ddl_test.go +++ b/syncer/ddl_test.go @@ -72,12 +72,12 @@ func (s *testSyncerSuite) TestAnsiQuotes(c *C) { } db, mock, err := sqlmock.New() - mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE"). + mock.ExpectQuery("SHOW VARIABLES LIKE"). WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). AddRow("sql_mode", "ANSI_QUOTES")) c.Assert(err, IsNil) - parser, err := utils.GetParser(db, false) + parser, err := utils.GetParser(db) c.Assert(err, IsNil) for _, sql := range ansiQuotesCases { diff --git a/syncer/error.go b/syncer/error.go index 68ec2694b0..d02e59386a 100644 --- a/syncer/error.go +++ b/syncer/error.go @@ -20,6 +20,7 @@ import ( "github.com/go-sql-driver/mysql" "github.com/pingcap/errors" + "github.com/pingcap/parser" "github.com/pingcap/parser/ast" tmysql "github.com/pingcap/parser/mysql" "github.com/pingcap/parser/terror" @@ -95,10 +96,8 @@ func originError(err error) error { // handleSpecialDDLError handles special errors for DDL execution. func (s *Syncer) handleSpecialDDLError(tctx *tcontext.Context, err error, ddls []string, index int, conn *DBConn) error { - parser2, err2 := s.fromDB.getParser(s.cfg.EnableANSIQuotes) - if err2 != nil { - return err // return the original error - } + // We use default parser because ddls are came from *Syncer.handleDDL, which is StringSingleQuotes, KeyWordUppercase and NameBackQuotes + parser2 := parser.New() // it only ignore `invalid connection` error (timeout or other causes) for `ADD INDEX`. // `invalid connection` means some data already sent to the server, diff --git a/syncer/handle_error.go b/syncer/handle_error.go index 646bd4600b..c1dbef3d90 100644 --- a/syncer/handle_error.go +++ b/syncer/handle_error.go @@ -17,15 +17,14 @@ import ( "context" "fmt" + "github.com/pingcap/parser" + "github.com/pingcap/dm/dm/command" "github.com/pingcap/dm/dm/pb" parserpkg "github.com/pingcap/dm/pkg/parser" "github.com/pingcap/dm/pkg/terror" - "github.com/pingcap/parser" "github.com/pingcap/parser/ast" - tmysql "github.com/pingcap/parser/mysql" - "github.com/siddontang/go-mysql/replication" ) @@ -73,9 +72,10 @@ func (s *Syncer) HandleError(ctx context.Context, req *pb.HandleWorkerErrorReque func (s *Syncer) genEvents(sqls []string) ([]*replication.BinlogEvent, error) { events := make([]*replication.BinlogEvent, 0) - parser2 := parser.New() - if s.cfg.EnableANSIQuotes { - parser2.SetSQLMode(tmysql.ModeANSIQuotes) + parser2, err := s.fromDB.getParser() + if err != nil { + s.tctx.L().Error("failed to get SQL mode specified parser from upstream, using default SQL mode instead") + parser2 = parser.New() } for _, sql := range sqls { diff --git a/syncer/schema.go b/syncer/schema.go index c1c0391c31..4c8f8f66c4 100644 --- a/syncer/schema.go +++ b/syncer/schema.go @@ -37,7 +37,7 @@ func (s *Syncer) OperateSchema(ctx context.Context, req *pb.OperateWorkerSchemaR // for set schema, we must ensure it's a valid `CREATE TABLE` statement. // now, we only set schema for schema-tracker, // if want to update the one in checkpoint, it should wait for the flush of checkpoint. - parser2, err := s.fromDB.getParser(s.cfg.EnableANSIQuotes) + parser2, err := s.fromDB.getParser() if err != nil { return "", err } diff --git a/syncer/syncer.go b/syncer/syncer.go index 505f5d71a8..1911f2f04e 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -41,12 +41,15 @@ import ( "go.etcd.io/etcd/clientv3" "go.uber.org/zap" + toolutils "github.com/pingcap/tidb-tools/pkg/utils" + "github.com/pingcap/dm/dm/config" "github.com/pingcap/dm/dm/pb" "github.com/pingcap/dm/dm/unit" "github.com/pingcap/dm/pkg/atomic2" "github.com/pingcap/dm/pkg/binlog" "github.com/pingcap/dm/pkg/binlog/common" + "github.com/pingcap/dm/pkg/binlog/event" "github.com/pingcap/dm/pkg/conn" tcontext "github.com/pingcap/dm/pkg/context" fr "github.com/pingcap/dm/pkg/func-rollback" @@ -61,7 +64,6 @@ import ( operator "github.com/pingcap/dm/syncer/err-operator" sm "github.com/pingcap/dm/syncer/safe-mode" "github.com/pingcap/dm/syncer/shardddl" - toolutils "github.com/pingcap/tidb-tools/pkg/utils" ) var ( @@ -622,18 +624,17 @@ func (s *Syncer) getTable(origSchema, origTable, renamedSchema, renamedTable str func (s *Syncer) trackTableInfoFromDownstream(origSchema, origTable, renamedSchema, renamedTable string) error { // TODO: Switch to use the HTTP interface to retrieve the TableInfo directly // (and get rid of ddlDBConn). - rows, err := s.ddlDBConn.querySQL(s.tctx, "SHOW CREATE TABLE "+dbutil.TableName(renamedSchema, renamedTable)) + // use parser for downstream. + parser2, err := utils.GetParserForConn(s.ddlDBConn.baseConn.DBConn) if err != nil { return terror.ErrSchemaTrackerCannotFetchDownstreamTable.Delegate(err, renamedSchema, renamedTable, origSchema, origTable) } - defer rows.Close() - // use parser for downstream. - // NOTE: refine the definition of `ansi-quotes` with `sql_mode` in `session` later. - parser2, err := utils.GetParser(s.ddlDB.DB, s.cfg.EnableANSIQuotes) + rows, err := s.ddlDBConn.querySQL(s.tctx, "SHOW CREATE TABLE "+dbutil.TableName(renamedSchema, renamedTable)) if err != nil { return terror.ErrSchemaTrackerCannotFetchDownstreamTable.Delegate(err, renamedSchema, renamedTable, origSchema, origTable) } + defer rows.Close() ctx := context.Background() for rows.Next() { @@ -1075,11 +1076,6 @@ func (s *Syncer) Run(ctx context.Context) (err error) { } }() - parser2, err := s.fromDB.getParser(s.cfg.EnableANSIQuotes) - if err != nil { - return err - } - fresh, err := s.IsFreshTask(ctx) if err != nil { return err @@ -1413,7 +1409,6 @@ func (s *Syncer) Run(ctx context.Context) (err error) { tryReSync: tryReSync, startTime: startTime, traceID: &traceID, - parser2: parser2, shardingReSyncCh: &shardingReSyncCh, } @@ -1678,7 +1673,13 @@ func (s *Syncer) handleRowsEvent(ev *replication.RowsEvent, ec eventContext) err func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) error { sql := strings.TrimSpace(string(ev.Query)) usedSchema := string(ev.Schema) - parseResult, err := s.parseDDLSQL(sql, ec.parser2, usedSchema) + parser2, err := event.GetParserForStatusVars(ev.StatusVars) + if err != nil { + log.L().Warn("can't determine sql_mode from binlog status_vars, use default parser instead", zap.Error(err)) + parser2 = parser.New() + } + + parseResult, err := s.parseDDLSQL(sql, parser2, usedSchema) if err != nil { s.tctx.L().Error("fail to parse statement", zap.String("event", "query"), zap.String("statement", sql), zap.String("schema", usedSchema), zap.Stringer("last location", ec.lastLocation), log.WrapStringerField("location", ec.currentLocation), log.ShortError(err)) return err @@ -1722,7 +1723,9 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e onlineDDLTableNames map[string]*filter.Table ) - sqls, onlineDDLTableNames, err = s.resolveDDLSQL(ec.tctx, ec.parser2, parseResult.stmt, usedSchema) + // for DDL, we don't apply operator until we try to execute it. so can handle sharding cases + // We use default parser because inside function where need parser, sqls are came from parserpkg.SplitDDL, which is StringSingleQuotes, KeyWordUppercase and NameBackQuotes + sqls, onlineDDLTableNames, err = s.resolveDDLSQL(ec.tctx, parser.New(), parseResult.stmt, usedSchema) if err != nil { s.tctx.L().Error("fail to resolve statement", zap.String("event", "query"), zap.String("statement", sql), zap.String("schema", usedSchema), zap.Stringer("last location", ec.lastLocation), log.WrapStringerField("location", ec.currentLocation), log.ShortError(err)) return err @@ -1754,7 +1757,8 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e sourceTbls = make(map[string]map[string]struct{}) // db name -> tb name ) for _, sql := range sqls { - sqlDDL, tableNames, stmt, handleErr := s.handleDDL(ec.parser2, usedSchema, sql) + // We use default parser because sqls are came from above *Syncer.resolveDDLSQL, which is StringSingleQuotes, KeyWordUppercase and NameBackQuotes + sqlDDL, tableNames, stmt, handleErr := s.handleDDL(parser.New(), usedSchema, sql) if handleErr != nil { return handleErr } diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index ef8c42c41f..17f3828255 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -216,10 +216,10 @@ func (s *testSyncerSuite) resetEventsGenerator(c *C) { func (s *testSyncerSuite) TearDownSuite(c *C) {} func (s *testSyncerSuite) mockParser(db *sql.DB, mock sqlmock.Sqlmock) (*parser.Parser, error) { - mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE"). + mock.ExpectQuery("SHOW VARIABLES LIKE"). WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). AddRow("sql_mode", "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION")) - return utils.GetParser(db, false) + return utils.GetParser(db) } func (s *testSyncerSuite) mockCheckPointCreate(checkPointMock sqlmock.Sqlmock, tag string) { @@ -1168,11 +1168,6 @@ func (s *testSyncerSuite) TestRun(c *C) { ctx, cancel := context.WithCancel(context.Background()) resultCh := make(chan pb.ProcessResult) - // mock get parser - mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE"). - WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). - AddRow("sql_mode", "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION")) - go syncer.Process(ctx, resultCh) expectJobs1 := []*expectJob{ @@ -1267,10 +1262,6 @@ func (s *testSyncerSuite) TestRun(c *C) { mockBinlogEvent{typ: Delete, args: []interface{}{uint64(8), "test_1", "t_1", []byte{mysql.MYSQL_TYPE_LONG, mysql.MYSQL_TYPE_STRING}, [][]interface{}{{int32(3), "c"}}}}, } - mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE"). - WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). - AddRow("sql_mode", "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION")) - ctx, cancel = context.WithCancel(context.Background()) resultCh = make(chan pb.ProcessResult) // simulate `syncer.Resume` here, but doesn't reset database conns @@ -1389,11 +1380,6 @@ func (s *testSyncerSuite) TestExitSafeModeByConfig(c *C) { ctx, cancel := context.WithCancel(context.Background()) resultCh := make(chan pb.ProcessResult) - // mock get parser - mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE"). - WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). - AddRow("sql_mode", "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION")) - // disable 5-minute safe mode c.Assert(failpoint.Enable("github.com/pingcap/dm/syncer/SafeModeInitPhaseSeconds", "return(0)"), IsNil) go syncer.Process(ctx, resultCh) diff --git a/tests/relay_interrupt/run.sh b/tests/relay_interrupt/run.sh index e6855589c7..00ae4f6ba7 100644 --- a/tests/relay_interrupt/run.sh +++ b/tests/relay_interrupt/run.sh @@ -24,7 +24,7 @@ function run() { failpoints=( # 1152 is ErrAbortingConnection "github.com/pingcap/dm/pkg/utils/GetGlobalVariableFailed=return(\"server_uuid,1152\")" - "github.com/pingcap/dm/pkg/utils/GetGlobalVariableFailed=return(\"sql_mode,1152\")" + "github.com/pingcap/dm/pkg/utils/GetSessionVariableFailed=return(\"sql_mode,1152\")" ) for(( i=0;i<${#failpoints[@]};i++)) do