From 1dc615ff8383be20d87616a2932b5fd6b178261b Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 12 Aug 2020 13:10:02 +0800 Subject: [PATCH 01/23] stash --- dm/config/task.go | 1 + 1 file changed, 1 insertion(+) diff --git a/dm/config/task.go b/dm/config/task.go index 84c82b0cd1..6d4c3154fb 100644 --- a/dm/config/task.go +++ b/dm/config/task.go @@ -307,6 +307,7 @@ type TaskConfig struct { CleanDumpFile bool `yaml:"clean-dump-file"` + // TODO(lance6716): check usage of this EnableANSIQuotes bool `yaml:"ansi-quotes" toml:"ansi-quotes" json:"ansi-quotes"` } From f9df7ec2bf0921e0f026acd434c8c312a983e35a Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 12 Aug 2020 16:11:26 +0800 Subject: [PATCH 02/23] stash --- dm/config/subtask.go | 1 + dm/config/task.go | 1 - loader/loader.go | 1 + pkg/binlog/reader/util.go | 1 + pkg/parser/common.go | 1 + pkg/utils/db.go | 1 + pkg/v1dbschema/schema.go | 1 + syncer/job.go | 1 + syncer/optimist.go | 1 + syncer/syncer.go | 5 +++-- 10 files changed, 11 insertions(+), 3 deletions(-) diff --git a/dm/config/subtask.go b/dm/config/subtask.go index 00e89f388a..011eb3b834 100644 --- a/dm/config/subtask.go +++ b/dm/config/subtask.go @@ -192,6 +192,7 @@ type SubTaskConfig struct { CleanDumpFile bool `toml:"clean-dump-file" json:"clean-dump-file"` + // TODO(lance6716): check usage of this EnableANSIQuotes bool `toml:"ansi-quotes" json:"ansi-quotes"` // still needed by Syncer / Loader bin diff --git a/dm/config/task.go b/dm/config/task.go index 6d4c3154fb..84c82b0cd1 100644 --- a/dm/config/task.go +++ b/dm/config/task.go @@ -307,7 +307,6 @@ type TaskConfig struct { CleanDumpFile bool `yaml:"clean-dump-file"` - // TODO(lance6716): check usage of this EnableANSIQuotes bool `yaml:"ansi-quotes" toml:"ansi-quotes" json:"ansi-quotes"` } diff --git a/loader/loader.go b/loader/loader.go index 58c7cc6cbe..d302e9beb7 100644 --- a/loader/loader.go +++ b/loader/loader.go @@ -1157,6 +1157,7 @@ func (l *Loader) restoreData(ctx context.Context) error { dataFiles := tables[table] tableFile := fmt.Sprintf("%s/%s.%s-schema.sql", l.cfg.Dir, db, table) if _, ok := l.tableInfos[tableName(db, table)]; !ok { + // TODO(lance6716): check whether mydumper/dumpling follows upstream SQLMode, thus need us specify l.tableInfos[tableName(db, table)], err = parseTable(tctx, l.tableRouter, db, table, tableFile, l.cfg.EnableANSIQuotes) if err != nil { return terror.Annotatef(err, "parse table %s/%s", db, table) diff --git a/pkg/binlog/reader/util.go b/pkg/binlog/reader/util.go index d0b8fc9a34..bac7d67c86 100644 --- a/pkg/binlog/reader/util.go +++ b/pkg/binlog/reader/util.go @@ -60,6 +60,7 @@ func GetGTIDsForPos(ctx context.Context, r Reader, endPos gmysql.Position, parse // NOTE: only update endPos/GTIDs for DDL/XID to get an complete transaction. switch ev := e.Event.(type) { case *replication.QueryEvent: + // TODO(lance6716): first blood! isDDL := common.CheckIsDDL(string(ev.Query), parser2) if isDDL { if latestGSet == nil { diff --git a/pkg/parser/common.go b/pkg/parser/common.go index 002f327e89..f455be9992 100644 --- a/pkg/parser/common.go +++ b/pkg/parser/common.go @@ -173,6 +173,7 @@ func RenameDDLTable(stmt ast.StmtNode, targetTableNames []*filter.Table) (string } // SplitDDL splits multiple operations in one DDL statement into multiple DDL statements +// returned DDL is formatted like StringSingleQuotes, KeyWordUppercase and NameBackQuotes // if fail to restore, it would not restore the value of `stmt` (it changes it's values if `stmt` is one of DropTableStmt, RenameTableStmt, AlterTableStmt) func SplitDDL(stmt ast.StmtNode, schema string) (sqls []string, err error) { var ( diff --git a/pkg/utils/db.go b/pkg/utils/db.go index 8d43994380..9c28ea3db2 100644 --- a/pkg/utils/db.go +++ b/pkg/utils/db.go @@ -362,6 +362,7 @@ func HasAnsiQuotesMode(db *sql.DB) (bool, error) { } // GetParser gets a parser which maybe enabled `ANSI_QUOTES` sql_mode +// TODO(lance6716): check remove second param func GetParser(db *sql.DB, ansiQuotesMode bool) (*parser.Parser, error) { if !ansiQuotesMode { // try get from DB diff --git a/pkg/v1dbschema/schema.go b/pkg/v1dbschema/schema.go index 62253d5ea1..0b740e9e70 100644 --- a/pkg/v1dbschema/schema.go +++ b/pkg/v1dbschema/schema.go @@ -51,6 +51,7 @@ func UpdateSchema(tctx *tcontext.Context, db *conn.BaseDB, cfg *config.SubTaskCo defer db.CloseBaseConn(dbConn) // setup SQL parser. + // TODO(lance6716): remove this and call parser in binlog parser2, err := utils.GetParser(db.DB, cfg.EnableANSIQuotes) if err != nil { return terror.ErrFailUpdateV1DBSchema.Delegate(err) diff --git a/syncer/job.go b/syncer/job.go index 29dec4563e..77eafc4064 100644 --- a/syncer/job.go +++ b/syncer/job.go @@ -102,6 +102,7 @@ func newJob(tp opType, sourceSchema, sourceTable, targetSchema, targetTable, sql // newDDL job is used to create a new ddl job // when cfg.ShardMode == "", ddlInfo == nil,sourceTbls != nil, we use sourceTbls to record ddl affected tables. // when cfg.ShardMode == ShardOptimistic || ShardPessimistic, ddlInfo != nil, sourceTbls == nil. +// TODO(lance6716): check where ddls came from: noshard: backquotes func newDDLJob(ddlInfo *shardingDDLInfo, ddls []string, location, cmdLocation binlog.Location, traceID string, sourceTbls map[string]map[string]struct{}) *job { location1 := location.Clone() diff --git a/syncer/optimist.go b/syncer/optimist.go index fb152e25f2..ba60aedbea 100644 --- a/syncer/optimist.go +++ b/syncer/optimist.go @@ -178,6 +178,7 @@ func (s *Syncer) handleQueryEventOptimistic( } // updated needHandleDDLs to DDLs received from DM-master. + // TODO(lance6716): check optimism could handle different ANSI_QUOTES needHandleDDLs = op.DDLs s.tctx.L().Info("start to handle ddls in optimistic shard mode", zap.String("event", "query"), diff --git a/syncer/syncer.go b/syncer/syncer.go index a890f5068c..40a434dec3 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -1617,6 +1617,7 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e sql := strings.TrimSpace(string(ev.Query)) usedSchema := string(ev.Schema) + // TODO(lance6716): second blood parseResult, err := s.parseDDLSQL(sql, ec.parser2, usedSchema) if err != nil { s.tctx.L().Error("fail to parse statement", zap.String("event", "query"), zap.String("statement", sql), zap.String("schema", usedSchema), zap.Stringer("last location", ec.lastLocation), log.WrapStringerField("location", ec.currentLocation), log.ShortError(err)) @@ -1665,7 +1666,7 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e // for DDL, we don't apply operator until we try to execute it. // so can handle sharding cases - sqls, onlineDDLTableNames, err = s.resolveDDLSQL(ec.tctx, ec.parser2, parseResult.stmt, usedSchema) + sqls, onlineDDLTableNames, err = s.resolveDDLSQL(ec.tctx, parser.New(), parseResult.stmt, usedSchema) if err != nil { s.tctx.L().Error("fail to resolve statement", zap.String("event", "query"), zap.String("statement", sql), zap.String("schema", usedSchema), zap.Stringer("last location", ec.lastLocation), log.WrapStringerField("location", ec.currentLocation), log.ShortError(err)) return err @@ -1697,7 +1698,7 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e sourceTbls = make(map[string]map[string]struct{}) // db name -> tb name ) for _, sql := range sqls { - sqlDDL, tableNames, stmt, handleErr := s.handleDDL(ec.parser2, usedSchema, sql) + sqlDDL, tableNames, stmt, handleErr := s.handleDDL(parser.New(), usedSchema, sql) if handleErr != nil { return handleErr } From 7aa308cb442791cd5790109e2d5c828201978cab Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 12 Aug 2020 17:03:07 +0800 Subject: [PATCH 03/23] *: auto discover ANSI_QUOTES --- dm/config/subtask.go | 2 +- pkg/binlog/reader/tcp_test.go | 2 +- pkg/binlog/reader/util.go | 16 ++++++++++++++-- pkg/binlog/reader/util_test.go | 7 ++----- pkg/parser/common.go | 1 + pkg/utils/db.go | 14 +++++--------- pkg/v1dbschema/schema.go | 19 +++++-------------- relay/relay.go | 2 +- syncer/db.go | 4 ++-- syncer/ddl_test.go | 2 +- syncer/error.go | 6 ++---- syncer/job.go | 1 - syncer/optimist.go | 1 - syncer/schema.go | 2 +- syncer/syncer.go | 24 ++++++++++++++---------- syncer/syncer_test.go | 2 +- 16 files changed, 51 insertions(+), 54 deletions(-) diff --git a/dm/config/subtask.go b/dm/config/subtask.go index 011eb3b834..016748f0ad 100644 --- a/dm/config/subtask.go +++ b/dm/config/subtask.go @@ -192,7 +192,7 @@ type SubTaskConfig struct { CleanDumpFile bool `toml:"clean-dump-file" json:"clean-dump-file"` - // TODO(lance6716): check usage of this + // TODO(lance6716): remove this (and parent config) after updating tidb-tools "diff" and checking if dump unit need this EnableANSIQuotes bool `toml:"ansi-quotes" json:"ansi-quotes"` // still needed by Syncer / Loader bin diff --git a/pkg/binlog/reader/tcp_test.go b/pkg/binlog/reader/tcp_test.go index d1e07ffac9..0d7191e769 100644 --- a/pkg/binlog/reader/tcp_test.go +++ b/pkg/binlog/reader/tcp_test.go @@ -298,7 +298,7 @@ func (t *testTCPReaderSuite) verifyInitialEvents(c *C, reader Reader) { timeoutCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - parser2, err := utils.GetParser(t.db, false) + parser2, err := utils.GetParser(t.db) c.Assert(err, IsNil) forLoop: diff --git a/pkg/binlog/reader/util.go b/pkg/binlog/reader/util.go index bac7d67c86..d4cb164356 100644 --- a/pkg/binlog/reader/util.go +++ b/pkg/binlog/reader/util.go @@ -19,12 +19,15 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/parser" + tmysql "github.com/pingcap/parser/mysql" uuid "github.com/satori/go.uuid" gmysql "github.com/siddontang/go-mysql/mysql" "github.com/siddontang/go-mysql/replication" + "go.uber.org/zap" "github.com/pingcap/dm/pkg/binlog/event" "github.com/pingcap/dm/pkg/gtid" + "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/terror" "github.com/pingcap/dm/relay/common" ) @@ -32,7 +35,7 @@ import ( // GetGTIDsForPos tries to get GTID sets for the specified binlog position (for the corresponding txn). // NOTE: this method is very similar with `relay/writer/file_util.go/getTxnPosGTIDs`, unify them if needed later. // NOTE: this method is not well tested directly, but more tests have already been done for `relay/writer/file_util.go/getTxnPosGTIDs`. -func GetGTIDsForPos(ctx context.Context, r Reader, endPos gmysql.Position, parser2 *parser.Parser) (gtid.Set, error) { +func GetGTIDsForPos(ctx context.Context, r Reader, endPos gmysql.Position) (gtid.Set, error) { // start to get and parse binlog event from the beginning of the file. startPos := gmysql.Position{ Name: endPos.Name, @@ -60,7 +63,16 @@ func GetGTIDsForPos(ctx context.Context, r Reader, endPos gmysql.Position, parse // NOTE: only update endPos/GTIDs for DDL/XID to get an complete transaction. switch ev := e.Event.(type) { case *replication.QueryEvent: - // TODO(lance6716): first blood! + parser2 := parser.New() + ansiQuotes, err := event.GetAnsiQuotesMode(ev.StatusVars) + if err != nil { + log.L().Warn("can't determine ANSI_QUOTES from binlog status_vars, use no ANSI_QUOTES instead", zap.Error(err)) + ansiQuotes = false + } + if ansiQuotes { + parser2.SetSQLMode(tmysql.ModeANSIQuotes) + } + isDDL := common.CheckIsDDL(string(ev.Query), parser2) if isDDL { if latestGSet == nil { diff --git a/pkg/binlog/reader/util_test.go b/pkg/binlog/reader/util_test.go index 1412bdfaa8..4c51699938 100644 --- a/pkg/binlog/reader/util_test.go +++ b/pkg/binlog/reader/util_test.go @@ -44,14 +44,11 @@ func (t *testTCPReaderSuite) TestGetGTIDsForPos(c *C) { endPos, endGS, err := utils.GetMasterStatus(t.db, flavor) c.Assert(err, IsNil) - parser2, err := utils.GetParser(t.db, false) - c.Assert(err, IsNil) - r1 := NewTCPReader(cfg) c.Assert(r1, NotNil) defer r1.Close() - gs, err := GetGTIDsForPos(ctx, r1, endPos, parser2) + gs, err := GetGTIDsForPos(ctx, r1, endPos) c.Assert(err, IsNil) c.Assert(gs.Equal(endGS), IsTrue) @@ -62,7 +59,7 @@ func (t *testTCPReaderSuite) TestGetGTIDsForPos(c *C) { gs, err = GetGTIDsForPos(ctx, r2, gmysql.Position{ Name: endPos.Name, Pos: endPos.Pos - 1, - }, parser2) + }) c.Assert(err, ErrorMatches, ".*invalid position .* or GTID not enabled in upstream.*") c.Assert(gs, IsNil) } diff --git a/pkg/parser/common.go b/pkg/parser/common.go index f455be9992..f6cd3bd0fd 100644 --- a/pkg/parser/common.go +++ b/pkg/parser/common.go @@ -94,6 +94,7 @@ func FetchDDLTableNames(schema string, stmt ast.StmtNode) ([]*filter.Table, erro // RenameDDLTable renames table names in ddl by given `targetTableNames` // argument `targetTableNames` is same with return value of FetchDDLTableNames +// returned DDL is formatted like StringSingleQuotes, KeyWordUppercase and NameBackQuotes func RenameDDLTable(stmt ast.StmtNode, targetTableNames []*filter.Table) (string, error) { switch v := stmt.(type) { case *ast.AlterDatabaseStmt: diff --git a/pkg/utils/db.go b/pkg/utils/db.go index 9c28ea3db2..db7c2beea3 100644 --- a/pkg/utils/db.go +++ b/pkg/utils/db.go @@ -362,15 +362,11 @@ func HasAnsiQuotesMode(db *sql.DB) (bool, error) { } // GetParser gets a parser which maybe enabled `ANSI_QUOTES` sql_mode -// TODO(lance6716): check remove second param -func GetParser(db *sql.DB, ansiQuotesMode bool) (*parser.Parser, error) { - if !ansiQuotesMode { - // try get from DB - var err error - ansiQuotesMode, err = HasAnsiQuotesMode(db) - if err != nil { - return nil, err - } +func GetParser(db *sql.DB) (*parser.Parser, error) { + // try get from DB + ansiQuotesMode, err := HasAnsiQuotesMode(db) + if err != nil { + return nil, err } parser2 := parser.New() diff --git a/pkg/v1dbschema/schema.go b/pkg/v1dbschema/schema.go index 0b740e9e70..eaa85d1cfe 100644 --- a/pkg/v1dbschema/schema.go +++ b/pkg/v1dbschema/schema.go @@ -20,7 +20,6 @@ import ( "github.com/go-sql-driver/mysql" "github.com/pingcap/errors" "github.com/pingcap/failpoint" - "github.com/pingcap/parser" "github.com/pingcap/tidb-tools/pkg/dbutil" "github.com/pingcap/tidb/errno" gmysql "github.com/siddontang/go-mysql/mysql" @@ -36,7 +35,6 @@ import ( "github.com/pingcap/dm/pkg/gtid" "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/terror" - "github.com/pingcap/dm/pkg/utils" ) // UpdateSchema updates the DB schema from v1.0.x to v2.0.x, including: @@ -50,13 +48,6 @@ func UpdateSchema(tctx *tcontext.Context, db *conn.BaseDB, cfg *config.SubTaskCo } defer db.CloseBaseConn(dbConn) - // setup SQL parser. - // TODO(lance6716): remove this and call parser in binlog - parser2, err := utils.GetParser(db.DB, cfg.EnableANSIQuotes) - if err != nil { - return terror.ErrFailUpdateV1DBSchema.Delegate(err) - } - // setup a TCP binlog reader (because no relay can be used when upgrading). syncCfg := replication.BinlogSyncerConfig{ ServerID: cfg.ServerID, @@ -71,7 +62,7 @@ func UpdateSchema(tctx *tcontext.Context, db *conn.BaseDB, cfg *config.SubTaskCo tcpReader := reader.NewTCPReader(syncCfg) // update checkpoint. - err = updateSyncerCheckpoint(tctx, dbConn, cfg.Name, dbutil.TableName(cfg.MetaSchema, cputil.SyncerCheckpoint(cfg.Name)), cfg.SourceID, cfg.EnableGTID, tcpReader, parser2) + err = updateSyncerCheckpoint(tctx, dbConn, cfg.Name, dbutil.TableName(cfg.MetaSchema, cputil.SyncerCheckpoint(cfg.Name)), cfg.SourceID, cfg.EnableGTID, tcpReader) if err != nil { return terror.ErrFailUpdateV1DBSchema.Delegate(err) } @@ -88,7 +79,7 @@ func UpdateSchema(tctx *tcontext.Context, db *conn.BaseDB, cfg *config.SubTaskCo // - update column value: // - fill `binlog_gtid` based on `binlog_name` and `binlog_pos` if GTID mode enable. // NOTE: no need to update the value of `table_info` because DM can get schema automatically from downstream when replicating DML. -func updateSyncerCheckpoint(tctx *tcontext.Context, dbConn *conn.BaseConn, taskName, tableName, sourceID string, fillGTIDs bool, tcpReader reader.Reader, parser2 *parser.Parser) error { +func updateSyncerCheckpoint(tctx *tcontext.Context, dbConn *conn.BaseConn, taskName, tableName, sourceID string, fillGTIDs bool, tcpReader reader.Reader) error { logger := log.L().WithFields(zap.String("task", taskName), zap.String("source", sourceID)) logger.Info("updating syncer checkpoint", zap.Bool("fill GTID", fillGTIDs)) var gs gtid.Set @@ -104,7 +95,7 @@ func updateSyncerCheckpoint(tctx *tcontext.Context, dbConn *conn.BaseConn, taskN return terror.Annotatef(err, "get global checkpoint position for source %s", sourceID) } if pos.Name != "" { - gs, err = getGTIDsForPos(tctx, pos, tcpReader, parser2) + gs, err = getGTIDsForPos(tctx, pos, tcpReader) if err != nil { return terror.Annotatef(err, "get GTID sets for position %s", pos) } @@ -177,7 +168,7 @@ func getGlobalPos(tctx *tcontext.Context, dbConn *conn.BaseConn, tableName, sour } // getGTIDsForPos gets the GTID sets for the position. -func getGTIDsForPos(tctx *tcontext.Context, pos gmysql.Position, tcpReader reader.Reader, parser2 *parser.Parser) (gtid.Set, error) { +func getGTIDsForPos(tctx *tcontext.Context, pos gmysql.Position, tcpReader reader.Reader) (gtid.Set, error) { // NOTE: because we have multiple unit test cases updating/clearing binlog in the upstream, // we may encounter errors when reading binlog event but cleared by another test case. failpoint.Inject("MockGetGTIDsForPos", func(val failpoint.Value) { @@ -191,7 +182,7 @@ func getGTIDsForPos(tctx *tcontext.Context, pos gmysql.Position, tcpReader reade if err != nil { return nil, err } - gs, err := reader.GetGTIDsForPos(tctx.Ctx, tcpReader, realPos, parser2) + gs, err := reader.GetGTIDsForPos(tctx.Ctx, tcpReader, realPos) if err != nil { return nil, err } diff --git a/relay/relay.go b/relay/relay.go index 1ec3cdbb84..159cc1c44d 100755 --- a/relay/relay.go +++ b/relay/relay.go @@ -225,7 +225,7 @@ func (r *Relay) SwitchMaster(ctx context.Context, req *pb.SwitchRelayMasterReque } func (r *Relay) process(parentCtx context.Context) error { - parser2, err := utils.GetParser(r.db, false) // refine to use user config later + parser2, err := utils.GetParser(r.db) // refine to use user config later if err != nil { return err } diff --git a/syncer/db.go b/syncer/db.go index f98948435c..5c3c89f6a9 100644 --- a/syncer/db.go +++ b/syncer/db.go @@ -91,8 +91,8 @@ func (conn *UpStreamConn) getServerUUID(flavor string) (string, error) { return utils.GetServerUUID(conn.BaseDB.DB, flavor) } -func (conn *UpStreamConn) getParser(ansiQuotesMode bool) (*parser.Parser, error) { - return utils.GetParser(conn.BaseDB.DB, ansiQuotesMode) +func (conn *UpStreamConn) getParser() (*parser.Parser, error) { + return utils.GetParser(conn.BaseDB.DB) } func (conn *UpStreamConn) killConn(connID uint32) error { diff --git a/syncer/ddl_test.go b/syncer/ddl_test.go index 12352ff6ad..74e76313bb 100644 --- a/syncer/ddl_test.go +++ b/syncer/ddl_test.go @@ -77,7 +77,7 @@ func (s *testSyncerSuite) TestAnsiQuotes(c *C) { AddRow("sql_mode", "ANSI_QUOTES")) c.Assert(err, IsNil) - parser, err := utils.GetParser(db, false) + parser, err := utils.GetParser(db) c.Assert(err, IsNil) for _, sql := range ansiQuotesCases { diff --git a/syncer/error.go b/syncer/error.go index 68ec2694b0..cbd13f5faf 100644 --- a/syncer/error.go +++ b/syncer/error.go @@ -20,6 +20,7 @@ import ( "github.com/go-sql-driver/mysql" "github.com/pingcap/errors" + "github.com/pingcap/parser" "github.com/pingcap/parser/ast" tmysql "github.com/pingcap/parser/mysql" "github.com/pingcap/parser/terror" @@ -95,10 +96,7 @@ func originError(err error) error { // handleSpecialDDLError handles special errors for DDL execution. func (s *Syncer) handleSpecialDDLError(tctx *tcontext.Context, err error, ddls []string, index int, conn *DBConn) error { - parser2, err2 := s.fromDB.getParser(s.cfg.EnableANSIQuotes) - if err2 != nil { - return err // return the original error - } + parser2 := parser.New() // it only ignore `invalid connection` error (timeout or other causes) for `ADD INDEX`. // `invalid connection` means some data already sent to the server, diff --git a/syncer/job.go b/syncer/job.go index 77eafc4064..29dec4563e 100644 --- a/syncer/job.go +++ b/syncer/job.go @@ -102,7 +102,6 @@ func newJob(tp opType, sourceSchema, sourceTable, targetSchema, targetTable, sql // newDDL job is used to create a new ddl job // when cfg.ShardMode == "", ddlInfo == nil,sourceTbls != nil, we use sourceTbls to record ddl affected tables. // when cfg.ShardMode == ShardOptimistic || ShardPessimistic, ddlInfo != nil, sourceTbls == nil. -// TODO(lance6716): check where ddls came from: noshard: backquotes func newDDLJob(ddlInfo *shardingDDLInfo, ddls []string, location, cmdLocation binlog.Location, traceID string, sourceTbls map[string]map[string]struct{}) *job { location1 := location.Clone() diff --git a/syncer/optimist.go b/syncer/optimist.go index ba60aedbea..fb152e25f2 100644 --- a/syncer/optimist.go +++ b/syncer/optimist.go @@ -178,7 +178,6 @@ func (s *Syncer) handleQueryEventOptimistic( } // updated needHandleDDLs to DDLs received from DM-master. - // TODO(lance6716): check optimism could handle different ANSI_QUOTES needHandleDDLs = op.DDLs s.tctx.L().Info("start to handle ddls in optimistic shard mode", zap.String("event", "query"), diff --git a/syncer/schema.go b/syncer/schema.go index c1c0391c31..4c8f8f66c4 100644 --- a/syncer/schema.go +++ b/syncer/schema.go @@ -37,7 +37,7 @@ func (s *Syncer) OperateSchema(ctx context.Context, req *pb.OperateWorkerSchemaR // for set schema, we must ensure it's a valid `CREATE TABLE` statement. // now, we only set schema for schema-tracker, // if want to update the one in checkpoint, it should wait for the flush of checkpoint. - parser2, err := s.fromDB.getParser(s.cfg.EnableANSIQuotes) + parser2, err := s.fromDB.getParser() if err != nil { return "", err } diff --git a/syncer/syncer.go b/syncer/syncer.go index 40a434dec3..516238ba38 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/parser/ast" "github.com/pingcap/parser/format" "github.com/pingcap/parser/model" + tmysql "github.com/pingcap/parser/mysql" bf "github.com/pingcap/tidb-tools/pkg/binlog-filter" cm "github.com/pingcap/tidb-tools/pkg/column-mapping" "github.com/pingcap/tidb-tools/pkg/dbutil" @@ -46,6 +47,7 @@ import ( "github.com/pingcap/dm/pkg/atomic2" "github.com/pingcap/dm/pkg/binlog" "github.com/pingcap/dm/pkg/binlog/common" + "github.com/pingcap/dm/pkg/binlog/event" "github.com/pingcap/dm/pkg/conn" tcontext "github.com/pingcap/dm/pkg/context" fr "github.com/pingcap/dm/pkg/func-rollback" @@ -619,8 +621,7 @@ func (s *Syncer) trackTableInfoFromDownstream(origSchema, origTable, renamedSche defer rows.Close() // use parser for downstream. - // NOTE: refine the definition of `ansi-quotes` with `sql_mode` in `session` later. - parser2, err := utils.GetParser(s.ddlDB.DB, s.cfg.EnableANSIQuotes) + parser2, err := utils.GetParser(s.ddlDB.DB) if err != nil { return terror.ErrSchemaTrackerCannotFetchDownstreamTable.Delegate(err, renamedSchema, renamedTable, origSchema, origTable) } @@ -1062,11 +1063,6 @@ func (s *Syncer) Run(ctx context.Context) (err error) { } }() - parser2, err := s.fromDB.getParser(s.cfg.EnableANSIQuotes) - if err != nil { - return err - } - fresh, err := s.IsFreshTask(ctx) if err != nil { return err @@ -1331,7 +1327,6 @@ func (s *Syncer) Run(ctx context.Context) (err error) { tryReSync: tryReSync, startTime: startTime, traceID: &traceID, - parser2: parser2, shardingReSyncCh: &shardingReSyncCh, } @@ -1617,8 +1612,17 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e sql := strings.TrimSpace(string(ev.Query)) usedSchema := string(ev.Schema) - // TODO(lance6716): second blood - parseResult, err := s.parseDDLSQL(sql, ec.parser2, usedSchema) + parser2 := parser.New() + ansiQuotes, err := event.GetAnsiQuotesMode(ev.StatusVars) + if err != nil { + log.L().Warn("can't determine ANSI_QUOTES from binlog status_vars, use no ANSI_QUOTES instead", zap.Error(err)) + ansiQuotes = false + } + if ansiQuotes { + parser2.SetSQLMode(tmysql.ModeANSIQuotes) + } + + parseResult, err := s.parseDDLSQL(sql, parser2, usedSchema) if err != nil { s.tctx.L().Error("fail to parse statement", zap.String("event", "query"), zap.String("statement", sql), zap.String("schema", usedSchema), zap.Stringer("last location", ec.lastLocation), log.WrapStringerField("location", ec.currentLocation), log.ShortError(err)) return err diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index 6361d1b658..c118b74114 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -215,7 +215,7 @@ func (s *testSyncerSuite) mockParser(db *sql.DB, mock sqlmock.Sqlmock) (*parser. mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE"). WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). AddRow("sql_mode", "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION")) - return utils.GetParser(db, false) + return utils.GetParser(db) } func (s *testSyncerSuite) mockCheckPointCreate(checkPointMock sqlmock.Sqlmock, tag string) { From 0a3a8cbe7ec7e8dc83b406d6f1781046b2f98122 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 12 Aug 2020 18:03:44 +0800 Subject: [PATCH 04/23] fix ut --- syncer/syncer_test.go | 9 --------- 1 file changed, 9 deletions(-) diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index c118b74114..01b8ab907a 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -1173,11 +1173,6 @@ func (s *testSyncerSuite) TestRun(c *C) { ctx, cancel := context.WithCancel(context.Background()) resultCh := make(chan pb.ProcessResult) - // mock get parser - mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE"). - WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). - AddRow("sql_mode", "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION")) - go syncer.Process(ctx, resultCh) expectJobs1 := []*expectJob{ @@ -1272,10 +1267,6 @@ func (s *testSyncerSuite) TestRun(c *C) { mockBinlogEvent{typ: Delete, args: []interface{}{uint64(8), "test_1", "t_1", []byte{mysql.MYSQL_TYPE_LONG, mysql.MYSQL_TYPE_STRING}, [][]interface{}{{int32(3), "c"}}}}, } - mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE"). - WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). - AddRow("sql_mode", "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION")) - ctx, cancel = context.WithCancel(context.Background()) resultCh = make(chan pb.ProcessResult) // simulate `syncer.Resume` here, but doesn't reset database conns From f10a4bc7403618fa49865e36ad6a8771572a1355 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 12 Aug 2020 20:40:17 +0800 Subject: [PATCH 05/23] fix it --- pkg/utils/db.go | 69 +++++++++++++++++++++++++----------------------- syncer/syncer.go | 8 +++--- 2 files changed, 40 insertions(+), 37 deletions(-) diff --git a/pkg/utils/db.go b/pkg/utils/db.go index db7c2beea3..94302b38ac 100644 --- a/pkg/utils/db.go +++ b/pkg/utils/db.go @@ -247,8 +247,27 @@ func GetMariaDBGTID(db *sql.DB) (gtid.Set, error) { // GetGlobalVariable gets server's global variable func GetGlobalVariable(db *sql.DB, variable string) (value string, err error) { - query := fmt.Sprintf("SHOW GLOBAL VARIABLES LIKE '%s'", variable) - rows, err := db.Query(query) + conn, err := db.Conn(context.Background()) + if err != nil { + return "", terror.DBErrorAdapt(err, terror.ErrDBDriverError) + } + return getVariable(conn, variable, true) +} + +// GetSessionVariable gets connection's session variable +func GetSessionVariable(conn *sql.Conn, variable string) (value string, err error) { + return getVariable(conn, variable, false) +} + +func getVariable(conn *sql.Conn, variable string, isGlobal bool) (value string, err error) { + var template string + if isGlobal { + template = "SHOW GLOBAL VARIABLES LIKE '%s'" + } else { + template = "SHOW VARIABLES LIKE '%s'" + } + query := fmt.Sprintf(template, variable) + row := conn.QueryRowContext(context.Background(), query) failpoint.Inject("GetGlobalVariableFailed", func(val failpoint.Value) { items := strings.Split(val.(string), ",") @@ -267,11 +286,6 @@ func GetGlobalVariable(db *sql.DB, variable string) (value string, err error) { } }) - if err != nil { - return "", terror.DBErrorAdapt(err, terror.ErrDBDriverError) - } - defer rows.Close() - // Show an example. /* mysql> SHOW GLOBAL VARIABLES LIKE "binlog_format"; @@ -282,17 +296,10 @@ func GetGlobalVariable(db *sql.DB, variable string) (value string, err error) { +---------------+-------+ */ - for rows.Next() { - err = rows.Scan(&variable, &value) - if err != nil { - return "", terror.DBErrorAdapt(err, terror.ErrDBDriverError) - } - } - - if rows.Err() != nil { - return "", terror.DBErrorAdapt(rows.Err(), terror.ErrDBDriverError) + err = row.Scan(&variable, &value) + if err != nil { + return "", terror.DBErrorAdapt(err, terror.ErrDBDriverError) } - return value, nil } @@ -341,36 +348,32 @@ func GetMariaDBUUID(db *sql.DB) (string, error) { return fmt.Sprintf("%d%s%d", domainID, domainServerIDSeparator, serverID), nil } -// GetSQLMode returns sql_mode. -func GetSQLMode(db *sql.DB) (tmysql.SQLMode, error) { +// GetParser gets a parser for sql.DB which maybe enabled `ANSI_QUOTES` sql_mode +func GetParser(db *sql.DB) (*parser.Parser, error) { sqlMode, err := GetGlobalVariable(db, "sql_mode") if err != nil { - return tmysql.ModeNone, err + return nil, err } - - mode, err := tmysql.GetSQLMode(sqlMode) - return mode, terror.ErrGetSQLModeFromStr.Delegate(err, sqlMode) + return getParserFromSQLModeStr(sqlMode) } -// HasAnsiQuotesMode checks whether database has `ANSI_QUOTES` set -func HasAnsiQuotesMode(db *sql.DB) (bool, error) { - mode, err := GetSQLMode(db) +// GetParserForConn gets a parser for sql.Conn which maybe enabled `ANSI_QUOTES` sql_mode +func GetParserForConn(conn *sql.Conn) (*parser.Parser, error) { + sqlMode, err := GetSessionVariable(conn, "sql_mode") if err != nil { - return false, err + return nil, err } - return mode.HasANSIQuotesMode(), nil + return getParserFromSQLModeStr(sqlMode) } -// GetParser gets a parser which maybe enabled `ANSI_QUOTES` sql_mode -func GetParser(db *sql.DB) (*parser.Parser, error) { - // try get from DB - ansiQuotesMode, err := HasAnsiQuotesMode(db) +func getParserFromSQLModeStr(sqlMode string) (*parser.Parser, error) { + mode, err := tmysql.GetSQLMode(sqlMode) if err != nil { return nil, err } parser2 := parser.New() - if ansiQuotesMode { + if mode.HasANSIQuotesMode() { parser2.SetSQLMode(tmysql.ModeANSIQuotes) } return parser2, nil diff --git a/syncer/syncer.go b/syncer/syncer.go index 516238ba38..78d45eb7a5 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -614,17 +614,17 @@ func (s *Syncer) getTable(origSchema, origTable, renamedSchema, renamedTable str func (s *Syncer) trackTableInfoFromDownstream(origSchema, origTable, renamedSchema, renamedTable string) error { // TODO: Switch to use the HTTP interface to retrieve the TableInfo directly // (and get rid of ddlDBConn). - rows, err := s.ddlDBConn.querySQL(s.tctx, "SHOW CREATE TABLE "+dbutil.TableName(renamedSchema, renamedTable)) + // use parser for downstream. + parser2, err := utils.GetParserForConn(s.ddlDBConn.baseConn.DBConn) if err != nil { return terror.ErrSchemaTrackerCannotFetchDownstreamTable.Delegate(err, renamedSchema, renamedTable, origSchema, origTable) } - defer rows.Close() - // use parser for downstream. - parser2, err := utils.GetParser(s.ddlDB.DB) + rows, err := s.ddlDBConn.querySQL(s.tctx, "SHOW CREATE TABLE "+dbutil.TableName(renamedSchema, renamedTable)) if err != nil { return terror.ErrSchemaTrackerCannotFetchDownstreamTable.Delegate(err, renamedSchema, renamedTable, origSchema, origTable) } + defer rows.Close() ctx := context.Background() for rows.Next() { From bcd4c990bca7129ef9455edb2cb82e58689c9ff9 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 12 Aug 2020 22:48:55 +0800 Subject: [PATCH 06/23] don't know why dm-master exit --- pkg/utils/db.go | 48 +++++++++++++++++++++++++++++------------------- 1 file changed, 29 insertions(+), 19 deletions(-) diff --git a/pkg/utils/db.go b/pkg/utils/db.go index 94302b38ac..e70c608162 100644 --- a/pkg/utils/db.go +++ b/pkg/utils/db.go @@ -23,19 +23,20 @@ import ( "strings" "time" + "github.com/pingcap/failpoint" + "go.uber.org/zap" + "github.com/pingcap/dm/pkg/gtid" "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/terror" "github.com/go-sql-driver/mysql" "github.com/pingcap/errors" - "github.com/pingcap/failpoint" "github.com/pingcap/parser" tmysql "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb-tools/pkg/check" "github.com/pingcap/tidb-tools/pkg/dbutil" gmysql "github.com/siddontang/go-mysql/mysql" - "go.uber.org/zap" ) var ( @@ -247,10 +248,36 @@ func GetMariaDBGTID(db *sql.DB) (gtid.Set, error) { // GetGlobalVariable gets server's global variable func GetGlobalVariable(db *sql.DB, variable string) (value string, err error) { + failpoint.Inject("GetGlobalVariableFailed", func(val failpoint.Value) { + items := strings.Split(val.(string), ",") + log.L().Fatal("lance test -1") + log.L().Fatal("lance test", zap.Any("items", items)) + log.L().Fatal("lance test 0") + if len(items) != 2 { + log.L().Fatal("failpoint GetGlobalVariableFailed's value is invalid", zap.String("val", val.(string))) + } + log.L().Fatal("lance test 0.5") + variableName := items[0] + log.L().Fatal("lance test 1") + errCode, err1 := strconv.ParseUint(items[1], 10, 16) + log.L().Fatal("lance test 2") + if err1 != nil { + log.L().Fatal("failpoint GetGlobalVariableFailed's value is invalid", zap.String("val", val.(string))) + } + log.L().Fatal("lance test 3") + if variable == variableName { + log.L().Fatal("lance test 4") + err = tmysql.NewErr(uint16(errCode)) + log.L().Warn("GetGlobalVariable failed", zap.String("variable", variable), zap.String("failpoint", "GetGlobalVariableFailed"), zap.Error(err)) + failpoint.Return("", terror.DBErrorAdapt(err, terror.ErrDBDriverError)) + } + log.L().Fatal("lance test 6") + }) conn, err := db.Conn(context.Background()) if err != nil { return "", terror.DBErrorAdapt(err, terror.ErrDBDriverError) } + defer conn.Close() return getVariable(conn, variable, true) } @@ -269,23 +296,6 @@ func getVariable(conn *sql.Conn, variable string, isGlobal bool) (value string, query := fmt.Sprintf(template, variable) row := conn.QueryRowContext(context.Background(), query) - failpoint.Inject("GetGlobalVariableFailed", func(val failpoint.Value) { - items := strings.Split(val.(string), ",") - if len(items) != 2 { - log.L().Fatal("failpoint GetGlobalVariableFailed's value is invalid", zap.String("val", val.(string))) - } - variableName := items[0] - errCode, err1 := strconv.ParseUint(items[1], 10, 16) - if err1 != nil { - log.L().Fatal("failpoint GetGlobalVariableFailed's value is invalid", zap.String("val", val.(string))) - } - - if variable == variableName { - err = tmysql.NewErr(uint16(errCode)) - log.L().Warn("GetGlobalVariable failed", zap.String("variable", variable), zap.String("failpoint", "GetGlobalVariableFailed"), zap.Error(err)) - } - }) - // Show an example. /* mysql> SHOW GLOBAL VARIABLES LIKE "binlog_format"; From 6d074105f7e99c9095eeee6e394cd7cddab15e28 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Thu, 13 Aug 2020 10:08:48 +0800 Subject: [PATCH 07/23] fix it --- pkg/utils/db.go | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/pkg/utils/db.go b/pkg/utils/db.go index e70c608162..ef7d9aa9ee 100644 --- a/pkg/utils/db.go +++ b/pkg/utils/db.go @@ -250,29 +250,21 @@ func GetMariaDBGTID(db *sql.DB) (gtid.Set, error) { func GetGlobalVariable(db *sql.DB, variable string) (value string, err error) { failpoint.Inject("GetGlobalVariableFailed", func(val failpoint.Value) { items := strings.Split(val.(string), ",") - log.L().Fatal("lance test -1") - log.L().Fatal("lance test", zap.Any("items", items)) - log.L().Fatal("lance test 0") if len(items) != 2 { log.L().Fatal("failpoint GetGlobalVariableFailed's value is invalid", zap.String("val", val.(string))) } - log.L().Fatal("lance test 0.5") variableName := items[0] - log.L().Fatal("lance test 1") errCode, err1 := strconv.ParseUint(items[1], 10, 16) - log.L().Fatal("lance test 2") if err1 != nil { log.L().Fatal("failpoint GetGlobalVariableFailed's value is invalid", zap.String("val", val.(string))) } - log.L().Fatal("lance test 3") if variable == variableName { - log.L().Fatal("lance test 4") err = tmysql.NewErr(uint16(errCode)) log.L().Warn("GetGlobalVariable failed", zap.String("variable", variable), zap.String("failpoint", "GetGlobalVariableFailed"), zap.Error(err)) failpoint.Return("", terror.DBErrorAdapt(err, terror.ErrDBDriverError)) } - log.L().Fatal("lance test 6") }) + conn, err := db.Conn(context.Background()) if err != nil { return "", terror.DBErrorAdapt(err, terror.ErrDBDriverError) From 86f4404e41f982f5d8de619f103796eb62de5d6f Mon Sep 17 00:00:00 2001 From: lance6716 Date: Thu, 13 Aug 2020 10:14:44 +0800 Subject: [PATCH 08/23] re-sort import --- pkg/utils/db.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pkg/utils/db.go b/pkg/utils/db.go index ef7d9aa9ee..36c943de76 100644 --- a/pkg/utils/db.go +++ b/pkg/utils/db.go @@ -23,20 +23,19 @@ import ( "strings" "time" - "github.com/pingcap/failpoint" - "go.uber.org/zap" - "github.com/pingcap/dm/pkg/gtid" "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/terror" "github.com/go-sql-driver/mysql" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/parser" tmysql "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb-tools/pkg/check" "github.com/pingcap/tidb-tools/pkg/dbutil" gmysql "github.com/siddontang/go-mysql/mysql" + "go.uber.org/zap" ) var ( From 104f5df733698d825843bb8808157ba646c8f306 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Mon, 24 Aug 2020 20:17:54 +0800 Subject: [PATCH 09/23] address comment --- syncer/db.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/syncer/db.go b/syncer/db.go index 5c3c89f6a9..ec90291fd7 100644 --- a/syncer/db.go +++ b/syncer/db.go @@ -14,6 +14,7 @@ package syncer import ( + "context" "database/sql" "strings" "time" @@ -92,7 +93,11 @@ func (conn *UpStreamConn) getServerUUID(flavor string) (string, error) { } func (conn *UpStreamConn) getParser() (*parser.Parser, error) { - return utils.GetParser(conn.BaseDB.DB) + c, err := conn.BaseDB.DB.Conn(context.Background()) + if err != nil { + return nil, err + } + return utils.GetParserForConn(c) } func (conn *UpStreamConn) killConn(connID uint32) error { From 6ac75cc039e5593f101d8284e1025c707955677e Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 26 Aug 2020 15:21:08 +0800 Subject: [PATCH 10/23] track master --- checker/checker.go | 11 ++++------- checker/cmd.go | 6 +++--- dm/config/subtask.go | 2 +- dm/config/task.go | 7 +++---- dm/config/task_test.go | 4 ++-- dm/master/server.go | 2 +- 6 files changed, 14 insertions(+), 18 deletions(-) diff --git a/checker/checker.go b/checker/checker.go index 7aef39502e..550fad57b6 100644 --- a/checker/checker.go +++ b/checker/checker.go @@ -69,8 +69,6 @@ type Checker struct { instances []*mysqlInstance - enableANSIQuotes bool - checkList []check.Checker checkingItems map[string]string result struct { @@ -80,12 +78,11 @@ type Checker struct { } // NewChecker returns a checker -func NewChecker(cfgs []*config.SubTaskConfig, checkingItems map[string]string, enableANSIQuotes bool) *Checker { +func NewChecker(cfgs []*config.SubTaskConfig, checkingItems map[string]string) *Checker { c := &Checker{ - instances: make([]*mysqlInstance, 0, len(cfgs)), - checkingItems: checkingItems, - logger: log.With(zap.String("unit", "task check")), - enableANSIQuotes: enableANSIQuotes, + instances: make([]*mysqlInstance, 0, len(cfgs)), + checkingItems: checkingItems, + logger: log.With(zap.String("unit", "task check")), } for _, cfg := range cfgs { diff --git a/checker/cmd.go b/checker/cmd.go index 1ae8e341f0..588ddbc927 100644 --- a/checker/cmd.go +++ b/checker/cmd.go @@ -26,7 +26,7 @@ var ( ErrorMsgHeader = "fail to check synchronization configuration with type" // CheckSyncConfigFunc holds the CheckSyncConfig function - CheckSyncConfigFunc func(ctx context.Context, cfgs []*config.SubTaskConfig, enableANSIQuotes bool) error + CheckSyncConfigFunc func(ctx context.Context, cfgs []*config.SubTaskConfig) error ) func init() { @@ -34,7 +34,7 @@ func init() { } // CheckSyncConfig checks synchronization configuration -func CheckSyncConfig(ctx context.Context, cfgs []*config.SubTaskConfig, enableANSIQuotes bool) error { +func CheckSyncConfig(ctx context.Context, cfgs []*config.SubTaskConfig) error { if len(cfgs) == 0 { return nil } @@ -56,7 +56,7 @@ func CheckSyncConfig(ctx context.Context, cfgs []*config.SubTaskConfig, enableAN return nil } - c := NewChecker(cfgs, checkingItems, enableANSIQuotes) + c := NewChecker(cfgs, checkingItems) err := c.Init(ctx) if err != nil { diff --git a/dm/config/subtask.go b/dm/config/subtask.go index 016748f0ad..195250c504 100644 --- a/dm/config/subtask.go +++ b/dm/config/subtask.go @@ -192,7 +192,7 @@ type SubTaskConfig struct { CleanDumpFile bool `toml:"clean-dump-file" json:"clean-dump-file"` - // TODO(lance6716): remove this (and parent config) after updating tidb-tools "diff" and checking if dump unit need this + // TODO(lance6716): deprecated it (and parent config) after checking if dump unit need this EnableANSIQuotes bool `toml:"ansi-quotes" json:"ansi-quotes"` // still needed by Syncer / Loader bin diff --git a/dm/config/task.go b/dm/config/task.go index c0a0e51639..4d1d08190c 100644 --- a/dm/config/task.go +++ b/dm/config/task.go @@ -273,7 +273,7 @@ type TaskConfig struct { TaskMode string `yaml:"task-mode" toml:"task-mode" json:"task-mode"` IsSharding bool `yaml:"is-sharding" toml:"is-sharding" json:"is-sharding"` ShardMode string `yaml:"shard-mode" toml:"shard-mode" json:"shard-mode"` // when `shard-mode` set, we always enable sharding support. - // treat it as hidden configuration + // treat it as hidden configuration IgnoreCheckingItems []string `yaml:"ignore-checking-items" toml:"ignore-checking-items" json:"ignore-checking-items"` // we store detail status in meta // don't save configuration into it @@ -307,7 +307,7 @@ type TaskConfig struct { Syncers map[string]*SyncerConfig `yaml:"syncers" toml:"syncers" json:"syncers"` CleanDumpFile bool `yaml:"clean-dump-file" toml:"clean-dump-file" json:"clean-dump-file"` - + // deprecated EnableANSIQuotes bool `yaml:"ansi-quotes" toml:"ansi-quotes" json:"ansi-quotes"` } @@ -516,7 +516,7 @@ func (c *TaskConfig) adjust() error { // for backward compatible, set global config `ansi-quotes: true` if any syncer is true if inst.Syncer.EnableANSIQuotes == true { - c.EnableANSIQuotes = true + log.L().Warn("DM could discover proper ANSI_QUOTES, `enable-ansi-quotes` is no longer take effect") } if dupeRules := checkDuplicateString(inst.RouteRules); len(dupeRules) > 0 { @@ -621,7 +621,6 @@ func (c *TaskConfig) FromSubTaskConfigs(stCfgs ...*SubTaskConfig) { c.TargetDB = &stCfg0.To // just ref c.OnlineDDLScheme = stCfg0.OnlineDDLScheme c.CleanDumpFile = stCfg0.CleanDumpFile - c.EnableANSIQuotes = stCfg0.EnableANSIQuotes c.MySQLInstances = make([]*MySQLInstance, 0, len(stCfgs)) c.BAList = make(map[string]*filter.Rules) c.Routes = make(map[string]*router.TableRule) diff --git a/dm/config/task_test.go b/dm/config/task_test.go index effb106607..eb264ffec8 100644 --- a/dm/config/task_test.go +++ b/dm/config/task_test.go @@ -549,8 +549,8 @@ func (t *testConfig) TestFromSubTaskConfigs(c *C) { "sync-01": &stCfg1.SyncerConfig, "sync-02": &stCfg2.SyncerConfig, }, - CleanDumpFile: stCfg1.CleanDumpFile, - EnableANSIQuotes: stCfg1.EnableANSIQuotes, + CleanDumpFile: stCfg1.CleanDumpFile, + //EnableANSIQuotes: stCfg1.EnableANSIQuotes, } c.Assert(cfg.String(), Equals, cfg2.String()) // some nil/(null value) compare may not equal, so use YAML format to compare. diff --git a/dm/master/server.go b/dm/master/server.go index e607b8ffdf..d84811f996 100644 --- a/dm/master/server.go +++ b/dm/master/server.go @@ -1523,7 +1523,7 @@ func (s *Server) generateSubTask(ctx context.Context, task string) (*config.Task return nil, nil, terror.WithClass(err, terror.ClassDMMaster) } - err = checker.CheckSyncConfigFunc(ctx, stCfgs, cfg.EnableANSIQuotes) + err = checker.CheckSyncConfigFunc(ctx, stCfgs) if err != nil { return nil, nil, terror.WithClass(err, terror.ClassDMMaster) } From 488db8e18caf607b369b723953d183dd06e41914 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 26 Aug 2020 17:40:02 +0800 Subject: [PATCH 11/23] update behaviour --- dm/config/task.go | 1 + dumpling/dumpling.go | 17 +++++++---------- loader/convert_data.go | 11 +++++------ loader/loader.go | 3 +-- pkg/utils/db.go | 11 +++++------ 5 files changed, 19 insertions(+), 24 deletions(-) diff --git a/dm/config/task.go b/dm/config/task.go index 4d1d08190c..5859a6ff0b 100644 --- a/dm/config/task.go +++ b/dm/config/task.go @@ -200,6 +200,7 @@ func (m *MydumperConfig) UnmarshalYAML(unmarshal func(interface{}) error) error type LoaderConfig struct { PoolSize int `yaml:"pool-size" toml:"pool-size" json:"pool-size"` Dir string `yaml:"dir" toml:"dir" json:"dir"` + SQLMode string `yaml:"-" toml:"-" json:"-"` // wrote by dump unit } func defaultLoaderConfig() LoaderConfig { diff --git a/dumpling/dumpling.go b/dumpling/dumpling.go index e835f974d8..4de5123c7c 100644 --- a/dumpling/dumpling.go +++ b/dumpling/dumpling.go @@ -58,7 +58,7 @@ func NewDumpling(cfg *config.SubTaskConfig) *Dumpling { func (m *Dumpling) Init(ctx context.Context) error { var err error m.dumpConfig, err = m.constructArgs() - m.detectAnsiQuotes() + m.detectSQLMode() return err } @@ -269,21 +269,18 @@ func (m *Dumpling) constructArgs() (*export.Config, error) { return dumpConfig, nil } -// detectAnsiQuotes tries to detect ANSI_QUOTES from upstream. If success, change EnableANSIQuotes in subtask config -func (m *Dumpling) detectAnsiQuotes() { +// detectSQLMode tries to detect SQL mode from upstream. If success, write it to LoaderConfig +func (m *Dumpling) detectSQLMode() { db, err := sql.Open("mysql", m.dumpConfig.GetDSN("")) if err != nil { return } defer db.Close() - enable, err := utils.HasAnsiQuotesMode(db) + + sqlMode, err := utils.GetGlobalVariable(db, "sql_mode") if err != nil { return } - if enable != m.cfg.EnableANSIQuotes { - m.logger.Warn("found mismatched ANSI_QUOTES setting, going to overwrite it to DB specified", - zap.Bool("DB specified", enable), - zap.Bool("config file specified", m.cfg.EnableANSIQuotes)) - } - m.cfg.EnableANSIQuotes = enable + m.logger.Info("found upstream SQL mode", zap.String("SQL mode", sqlMode)) + m.cfg.LoaderConfig.SQLMode = sqlMode } diff --git a/loader/convert_data.go b/loader/convert_data.go index 9c57e74b03..18b073c398 100644 --- a/loader/convert_data.go +++ b/loader/convert_data.go @@ -25,11 +25,10 @@ import ( tcontext "github.com/pingcap/dm/pkg/context" parserpkg "github.com/pingcap/dm/pkg/parser" "github.com/pingcap/dm/pkg/terror" + "github.com/pingcap/dm/pkg/utils" "github.com/pingcap/errors" - "github.com/pingcap/parser" "github.com/pingcap/parser/ast" - tmysql "github.com/pingcap/parser/mysql" cm "github.com/pingcap/tidb-tools/pkg/column-mapping" router "github.com/pingcap/tidb-tools/pkg/table-router" ) @@ -231,15 +230,15 @@ func tableName(schema, table string) string { return fmt.Sprintf("`%s`.`%s`", schema, table) } -func parseTable(ctx *tcontext.Context, r *router.Table, schema, table, file string, enableANSIQuotes bool) (*tableInfo, error) { +func parseTable(ctx *tcontext.Context, r *router.Table, schema, table, file string, sqlMode string) (*tableInfo, error) { statement, err := exportStatement(file) if err != nil { return nil, err } - parser2 := parser.New() - if enableANSIQuotes { - parser2.SetSQLMode(tmysql.ModeANSIQuotes) + parser2, err := utils.GetParserFromSQLModeStr(sqlMode) + if err != nil { + return nil, err } stmts, err := parserpkg.Parse(parser2, string(statement), "", "") diff --git a/loader/loader.go b/loader/loader.go index d302e9beb7..9a604745e8 100644 --- a/loader/loader.go +++ b/loader/loader.go @@ -1157,8 +1157,7 @@ func (l *Loader) restoreData(ctx context.Context) error { dataFiles := tables[table] tableFile := fmt.Sprintf("%s/%s.%s-schema.sql", l.cfg.Dir, db, table) if _, ok := l.tableInfos[tableName(db, table)]; !ok { - // TODO(lance6716): check whether mydumper/dumpling follows upstream SQLMode, thus need us specify - l.tableInfos[tableName(db, table)], err = parseTable(tctx, l.tableRouter, db, table, tableFile, l.cfg.EnableANSIQuotes) + l.tableInfos[tableName(db, table)], err = parseTable(tctx, l.tableRouter, db, table, tableFile, l.cfg.LoaderConfig.SQLMode) if err != nil { return terror.Annotatef(err, "parse table %s/%s", db, table) } diff --git a/pkg/utils/db.go b/pkg/utils/db.go index 36c943de76..5b94904b21 100644 --- a/pkg/utils/db.go +++ b/pkg/utils/db.go @@ -355,7 +355,7 @@ func GetParser(db *sql.DB) (*parser.Parser, error) { if err != nil { return nil, err } - return getParserFromSQLModeStr(sqlMode) + return GetParserFromSQLModeStr(sqlMode) } // GetParserForConn gets a parser for sql.Conn which maybe enabled `ANSI_QUOTES` sql_mode @@ -364,19 +364,18 @@ func GetParserForConn(conn *sql.Conn) (*parser.Parser, error) { if err != nil { return nil, err } - return getParserFromSQLModeStr(sqlMode) + return GetParserFromSQLModeStr(sqlMode) } -func getParserFromSQLModeStr(sqlMode string) (*parser.Parser, error) { +// GetParserFromSQLModeStr gets a parser and applies given sqlMode +func GetParserFromSQLModeStr(sqlMode string) (*parser.Parser, error) { mode, err := tmysql.GetSQLMode(sqlMode) if err != nil { return nil, err } parser2 := parser.New() - if mode.HasANSIQuotesMode() { - parser2.SetSQLMode(tmysql.ModeANSIQuotes) - } + parser2.SetSQLMode(mode) return parser2, nil } From 0ea56b3c99fa4f11f09bd38923a6d563524ce6da Mon Sep 17 00:00:00 2001 From: lance6716 Date: Fri, 28 Aug 2020 16:29:59 +0800 Subject: [PATCH 12/23] fix test --- dm/master/server_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dm/master/server_test.go b/dm/master/server_test.go index 3acaf94340..c399fb4a83 100644 --- a/dm/master/server_test.go +++ b/dm/master/server_test.go @@ -428,7 +428,7 @@ func (t *testMaster) TestStartTask(c *check.C) { // test start task, but the first step check-task fails bakCheckSyncConfigFunc := checker.CheckSyncConfigFunc - checker.CheckSyncConfigFunc = func(_ context.Context, _ []*config.SubTaskConfig, _ bool) error { + checker.CheckSyncConfigFunc = func(_ context.Context, _ []*config.SubTaskConfig) error { return errors.New(errCheckSyncConfig) } defer func() { From 9db590eb089a656a5b80aa94a930ade765f47aef Mon Sep 17 00:00:00 2001 From: lance6716 Date: Fri, 28 Aug 2020 17:06:32 +0800 Subject: [PATCH 13/23] fix CI --- checker/check_test.go | 4 ++-- loader/convert_data_test.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/checker/check_test.go b/checker/check_test.go index 34c2e5542d..95a17c5e08 100644 --- a/checker/check_test.go +++ b/checker/check_test.go @@ -31,12 +31,12 @@ type testChecker struct{} var _ = tc.Suite(&testChecker{}) func (t *testChecker) TestCheckSyncConfig(c *tc.C) { - c.Assert(CheckSyncConfig(context.Background(), nil, false), tc.IsNil) + c.Assert(CheckSyncConfig(context.Background(), nil), tc.IsNil) cfgs := []*config.SubTaskConfig{ { IgnoreCheckingItems: []string{config.AllChecking}, }, } - c.Assert(CheckSyncConfig(context.Background(), cfgs, false), tc.IsNil) + c.Assert(CheckSyncConfig(context.Background(), cfgs), tc.IsNil) } diff --git a/loader/convert_data_test.go b/loader/convert_data_test.go index 51fac2158d..c302fdaa38 100644 --- a/loader/convert_data_test.go +++ b/loader/convert_data_test.go @@ -164,7 +164,7 @@ func (t *testConvertDataSuite) TestParseTable(c *C) { r, err := router.NewTableRouter(false, rules) c.Assert(err, IsNil) - tableInfo, err := parseTable(tcontext.Background(), r, "test1", "t2", "./dumpfile/test1.t2-schema.sql", true) + tableInfo, err := parseTable(tcontext.Background(), r, "test1", "t2", "./dumpfile/test1.t2-schema.sql", "ANSI_QUOTES") c.Assert(err, IsNil) c.Assert(tableInfo, DeepEquals, expectedTableInfo) } @@ -192,7 +192,7 @@ func (t *testConvertDataSuite) TestParseTableWithGeneratedColumn(c *C) { r, err := router.NewTableRouter(false, rules) c.Assert(err, IsNil) - tableInfo, err := parseTable(tcontext.Background(), r, "test1", "t3", "./dumpfile/test1.t3-schema.sql", false) + tableInfo, err := parseTable(tcontext.Background(), r, "test1", "t3", "./dumpfile/test1.t3-schema.sql", "") c.Assert(err, IsNil) c.Assert(tableInfo, DeepEquals, expectedTableInfo) } From 674c326741c87f3623944db49229cb210436b2f5 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Thu, 3 Sep 2020 12:42:57 +0800 Subject: [PATCH 14/23] track commit --- dm/config/subtask.go | 2 +- syncer/db.go | 1 + syncer/handle_error.go | 12 ++++++------ 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/dm/config/subtask.go b/dm/config/subtask.go index 195250c504..13c9cae165 100644 --- a/dm/config/subtask.go +++ b/dm/config/subtask.go @@ -192,7 +192,7 @@ type SubTaskConfig struct { CleanDumpFile bool `toml:"clean-dump-file" json:"clean-dump-file"` - // TODO(lance6716): deprecated it (and parent config) after checking if dump unit need this + // deprecated, will auto discover SQL mode EnableANSIQuotes bool `toml:"ansi-quotes" json:"ansi-quotes"` // still needed by Syncer / Loader bin diff --git a/syncer/db.go b/syncer/db.go index ec90291fd7..6e0a5aea51 100644 --- a/syncer/db.go +++ b/syncer/db.go @@ -97,6 +97,7 @@ func (conn *UpStreamConn) getParser() (*parser.Parser, error) { if err != nil { return nil, err } + defer c.Close() return utils.GetParserForConn(c) } diff --git a/syncer/handle_error.go b/syncer/handle_error.go index 646bd4600b..c1dbef3d90 100644 --- a/syncer/handle_error.go +++ b/syncer/handle_error.go @@ -17,15 +17,14 @@ import ( "context" "fmt" + "github.com/pingcap/parser" + "github.com/pingcap/dm/dm/command" "github.com/pingcap/dm/dm/pb" parserpkg "github.com/pingcap/dm/pkg/parser" "github.com/pingcap/dm/pkg/terror" - "github.com/pingcap/parser" "github.com/pingcap/parser/ast" - tmysql "github.com/pingcap/parser/mysql" - "github.com/siddontang/go-mysql/replication" ) @@ -73,9 +72,10 @@ func (s *Syncer) HandleError(ctx context.Context, req *pb.HandleWorkerErrorReque func (s *Syncer) genEvents(sqls []string) ([]*replication.BinlogEvent, error) { events := make([]*replication.BinlogEvent, 0) - parser2 := parser.New() - if s.cfg.EnableANSIQuotes { - parser2.SetSQLMode(tmysql.ModeANSIQuotes) + parser2, err := s.fromDB.getParser() + if err != nil { + s.tctx.L().Error("failed to get SQL mode specified parser from upstream, using default SQL mode instead") + parser2 = parser.New() } for _, sql := range sqls { From 0c9a91af577dbf8ee62b5e6575c8d5dcf862255c Mon Sep 17 00:00:00 2001 From: lance6716 Date: Thu, 3 Sep 2020 14:01:07 +0800 Subject: [PATCH 15/23] fix test --- syncer/syncer_test.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index 7aa69df238..da84bccd63 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -1380,11 +1380,6 @@ func (s *testSyncerSuite) TestExitSafeModeByConfig(c *C) { ctx, cancel := context.WithCancel(context.Background()) resultCh := make(chan pb.ProcessResult) - // mock get parser - mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE"). - WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). - AddRow("sql_mode", "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION")) - // disable 5-minute safe mode c.Assert(failpoint.Enable("github.com/pingcap/dm/syncer/SafeModeInitPhaseSeconds", "return(0)"), IsNil) go syncer.Process(ctx, resultCh) From 6498e2452c72aae9ac711e184c589d3ffc7166af Mon Sep 17 00:00:00 2001 From: lance6716 Date: Thu, 3 Sep 2020 19:15:56 +0800 Subject: [PATCH 16/23] add comments --- syncer/error.go | 1 + syncer/syncer.go | 5 +++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/syncer/error.go b/syncer/error.go index cbd13f5faf..d02e59386a 100644 --- a/syncer/error.go +++ b/syncer/error.go @@ -96,6 +96,7 @@ func originError(err error) error { // handleSpecialDDLError handles special errors for DDL execution. func (s *Syncer) handleSpecialDDLError(tctx *tcontext.Context, err error, ddls []string, index int, conn *DBConn) error { + // We use default parser because ddls are came from *Syncer.handleDDL, which is StringSingleQuotes, KeyWordUppercase and NameBackQuotes parser2 := parser.New() // it only ignore `invalid connection` error (timeout or other causes) for `ADD INDEX`. diff --git a/syncer/syncer.go b/syncer/syncer.go index 8b96a91efb..6b681aa3a3 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -1727,8 +1727,8 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e onlineDDLTableNames map[string]*filter.Table ) - // for DDL, we don't apply operator until we try to execute it. - // so can handle sharding cases + // for DDL, we don't apply operator until we try to execute it. so can handle sharding cases + // We use default parser because ddls are came from *Syncer.handleDDL, which is StringSingleQuotes, KeyWordUppercase and NameBackQuotes sqls, onlineDDLTableNames, err = s.resolveDDLSQL(ec.tctx, parser.New(), parseResult.stmt, usedSchema) if err != nil { s.tctx.L().Error("fail to resolve statement", zap.String("event", "query"), zap.String("statement", sql), zap.String("schema", usedSchema), zap.Stringer("last location", ec.lastLocation), log.WrapStringerField("location", ec.currentLocation), log.ShortError(err)) @@ -1761,6 +1761,7 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e sourceTbls = make(map[string]map[string]struct{}) // db name -> tb name ) for _, sql := range sqls { + // We use default parser because sqls are came from above *Syncer.resolveDDLSQL, which is StringSingleQuotes, KeyWordUppercase and NameBackQuotes sqlDDL, tableNames, stmt, handleErr := s.handleDDL(parser.New(), usedSchema, sql) if handleErr != nil { return handleErr From d8d29e0bdb6132847b3fb3cf2f597ff9db83ec97 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Thu, 3 Sep 2020 19:21:13 +0800 Subject: [PATCH 17/23] fix wrong comment --- syncer/syncer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/syncer/syncer.go b/syncer/syncer.go index 6b681aa3a3..fd653af53e 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -1728,7 +1728,7 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e ) // for DDL, we don't apply operator until we try to execute it. so can handle sharding cases - // We use default parser because ddls are came from *Syncer.handleDDL, which is StringSingleQuotes, KeyWordUppercase and NameBackQuotes + // We use default parser because inside function where need parser, sqls are came from parserpkg.SplitDDL, which is StringSingleQuotes, KeyWordUppercase and NameBackQuotes sqls, onlineDDLTableNames, err = s.resolveDDLSQL(ec.tctx, parser.New(), parseResult.stmt, usedSchema) if err != nil { s.tctx.L().Error("fail to resolve statement", zap.String("event", "query"), zap.String("statement", sql), zap.String("schema", usedSchema), zap.Stringer("last location", ec.lastLocation), log.WrapStringerField("location", ec.currentLocation), log.ShortError(err)) From 83bacb745bab7bcfa01f3ec59c37319640acbaae Mon Sep 17 00:00:00 2001 From: lance6716 Date: Fri, 4 Sep 2020 10:56:33 +0800 Subject: [PATCH 18/23] save my work --- pkg/binlog/reader/util.go | 1 + syncer/syncer.go | 1 + 2 files changed, 2 insertions(+) diff --git a/pkg/binlog/reader/util.go b/pkg/binlog/reader/util.go index d4cb164356..fb8116fb16 100644 --- a/pkg/binlog/reader/util.go +++ b/pkg/binlog/reader/util.go @@ -63,6 +63,7 @@ func GetGTIDsForPos(ctx context.Context, r Reader, endPos gmysql.Position) (gtid // NOTE: only update endPos/GTIDs for DDL/XID to get an complete transaction. switch ev := e.Event.(type) { case *replication.QueryEvent: + // TODO(lance6716): getParserForStatusVars parser2 := parser.New() ansiQuotes, err := event.GetAnsiQuotesMode(ev.StatusVars) if err != nil { diff --git a/syncer/syncer.go b/syncer/syncer.go index fd653af53e..e1e3804cc1 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -1673,6 +1673,7 @@ func (s *Syncer) handleRowsEvent(ev *replication.RowsEvent, ec eventContext) err func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) error { sql := strings.TrimSpace(string(ev.Query)) usedSchema := string(ev.Schema) + // TODO(lance6716): getParserForStatusVars parser2 := parser.New() ansiQuotes, err := event.GetAnsiQuotesMode(ev.StatusVars) if err != nil { From 02df4e3a36cf0b78b0f51179eb6e5c3f72692a2d Mon Sep 17 00:00:00 2001 From: lance6716 Date: Sat, 5 Sep 2020 21:00:41 +0800 Subject: [PATCH 19/23] address comment --- pkg/utils/db.go | 9 +++++---- syncer/db.go | 8 +------- 2 files changed, 6 insertions(+), 11 deletions(-) diff --git a/pkg/utils/db.go b/pkg/utils/db.go index 5b22c67cfd..c37bc1fd7a 100644 --- a/pkg/utils/db.go +++ b/pkg/utils/db.go @@ -349,16 +349,17 @@ func GetMariaDBUUID(db *sql.DB) (string, error) { return fmt.Sprintf("%d%s%d", domainID, domainServerIDSeparator, serverID), nil } -// GetParser gets a parser for sql.DB which maybe enabled `ANSI_QUOTES` sql_mode +// GetParser gets a parser for sql.DB which is suitable for session variable sql_mode func GetParser(db *sql.DB) (*parser.Parser, error) { - sqlMode, err := GetGlobalVariable(db, "sql_mode") + c, err := db.Conn(context.Background()) if err != nil { return nil, err } - return GetParserFromSQLModeStr(sqlMode) + defer c.Close() + return GetParserForConn(c) } -// GetParserForConn gets a parser for sql.Conn which maybe enabled `ANSI_QUOTES` sql_mode +// GetParserForConn gets a parser for sql.Conn which is suitable for session variable sql_mode func GetParserForConn(conn *sql.Conn) (*parser.Parser, error) { sqlMode, err := GetSessionVariable(conn, "sql_mode") if err != nil { diff --git a/syncer/db.go b/syncer/db.go index 6e0a5aea51..5c3c89f6a9 100644 --- a/syncer/db.go +++ b/syncer/db.go @@ -14,7 +14,6 @@ package syncer import ( - "context" "database/sql" "strings" "time" @@ -93,12 +92,7 @@ func (conn *UpStreamConn) getServerUUID(flavor string) (string, error) { } func (conn *UpStreamConn) getParser() (*parser.Parser, error) { - c, err := conn.BaseDB.DB.Conn(context.Background()) - if err != nil { - return nil, err - } - defer c.Close() - return utils.GetParserForConn(c) + return utils.GetParser(conn.BaseDB.DB) } func (conn *UpStreamConn) killConn(connID uint32) error { From d26535321cf2076a89478896f879eee69500583b Mon Sep 17 00:00:00 2001 From: lance6716 Date: Sat, 5 Sep 2020 21:16:29 +0800 Subject: [PATCH 20/23] update parser const --- go.mod | 4 ++-- go.sum | 4 ++++ pkg/binlog/event/util.go | 26 +++++++++++++++++++------- pkg/binlog/reader/util.go | 10 ++-------- syncer/syncer.go | 13 ++++--------- 5 files changed, 31 insertions(+), 26 deletions(-) diff --git a/go.mod b/go.mod index 021bc8c688..0836bd3a28 100644 --- a/go.mod +++ b/go.mod @@ -25,7 +25,7 @@ require ( github.com/pingcap/errors v0.11.5-0.20200820035142-66eb5bf1d1cd github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce github.com/pingcap/log v0.0.0-20200828042413-fce0951f1463 - github.com/pingcap/parser v0.0.0-20200821073936-cf85e80665c4 + github.com/pingcap/parser v0.0.0-20200905070518-a6534dd22500 github.com/pingcap/tidb v1.1.0-beta.0.20200901032733-f82e5320ad75 github.com/pingcap/tidb-tools v4.0.5-0.20200828085514-03575b185007+incompatible github.com/prometheus/client_golang v1.5.1 @@ -44,7 +44,7 @@ require ( github.com/uber-go/atomic v1.4.0 // indirect github.com/unrolled/render v1.0.1 go.etcd.io/etcd v0.5.0-alpha.5.0.20191023171146-3cf2f69b5738 - go.uber.org/zap v1.15.0 + go.uber.org/zap v1.16.0 golang.org/x/sys v0.0.0-20200824131525-c12d262b63d8 golang.org/x/time v0.0.0-20191024005414-555d28b269f0 google.golang.org/genproto v0.0.0-20191230161307-f3c370f40bfb diff --git a/go.sum b/go.sum index efaa98b70c..23dabee818 100644 --- a/go.sum +++ b/go.sum @@ -583,6 +583,8 @@ github.com/pingcap/parser v0.0.0-20200731033026-84f62115187c/go.mod h1:vQdbJqobJ github.com/pingcap/parser v0.0.0-20200813083329-a4bff035d3e2/go.mod h1:vQdbJqobJAgFyiRNNtXahpMoGWwPEuWciVEK5A20NS0= github.com/pingcap/parser v0.0.0-20200821073936-cf85e80665c4 h1:ATFD3gmwkSHcPt5DuQK3dZwqDU49WXOq/zRmwPJ6Nks= github.com/pingcap/parser v0.0.0-20200821073936-cf85e80665c4/go.mod h1:vQdbJqobJAgFyiRNNtXahpMoGWwPEuWciVEK5A20NS0= +github.com/pingcap/parser v0.0.0-20200905070518-a6534dd22500 h1:dnXiRdcubiMSkzHBBSkJo+9E+OX3+Z629ZEQEIVfBsI= +github.com/pingcap/parser v0.0.0-20200905070518-a6534dd22500/go.mod h1:vQdbJqobJAgFyiRNNtXahpMoGWwPEuWciVEK5A20NS0= github.com/pingcap/pd/v4 v4.0.0-rc.1.0.20200422143320-428acd53eba2 h1:JTzYYukREvxVSKW/ncrzNjFitd8snoQ/Xz32pw8i+s8= github.com/pingcap/pd/v4 v4.0.0-rc.1.0.20200422143320-428acd53eba2/go.mod h1:s+utZtXDznOiL24VK0qGmtoHjjXNsscJx3m1n8cC56s= github.com/pingcap/pd/v4 v4.0.0-rc.2.0.20200520083007-2c251bd8f181/go.mod h1:q4HTx/bA8aKBa4S7L+SQKHvjRPXCRV0tA0yRw0qkZSA= @@ -862,6 +864,8 @@ go.uber.org/zap v1.14.1 h1:nYDKopTbvAPq/NrUVZwT15y2lpROBiLLyoRTbXOYWOo= go.uber.org/zap v1.14.1/go.mod h1:Mb2vm2krFEG5DV0W9qcHBYFtp/Wku1cvYaqPsS/WYfc= go.uber.org/zap v1.15.0 h1:ZZCA22JRF2gQE5FoNmhmrf7jeJJ2uhqDUNRYKm8dvmM= go.uber.org/zap v1.15.0/go.mod h1:Mb2vm2krFEG5DV0W9qcHBYFtp/Wku1cvYaqPsS/WYfc= +go.uber.org/zap v1.16.0 h1:uFRZXykJGK9lLY4HtgSw44DnIcAM+kRBP7x5m+NpAOM= +go.uber.org/zap v1.16.0/go.mod h1:MA8QOfq0BHJwdXa996Y4dYkAqRKB8/1K1QMMZVaNZjQ= golang.org/x/crypto v0.0.0-20180214000028-650f4a345ab4/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= diff --git a/pkg/binlog/event/util.go b/pkg/binlog/event/util.go index 3f0665a77b..0abc97f091 100644 --- a/pkg/binlog/event/util.go +++ b/pkg/binlog/event/util.go @@ -25,6 +25,8 @@ import ( "math" "reflect" + "github.com/pingcap/parser" + "github.com/pingcap/parser/mysql" gmysql "github.com/siddontang/go-mysql/mysql" "github.com/siddontang/go-mysql/replication" @@ -305,24 +307,34 @@ var ( } ) -// GetAnsiQuotesMode gets ansi-quotes mode from binlog statusVars -func GetAnsiQuotesMode(statusVars []byte) (bool, error) { +// getSQLMode gets SQL mode from binlog statusVars +func getSQLMode(statusVars []byte) (mysql.SQLMode, error) { vars, err := statusVarsToKV(statusVars) if err != nil { - return false, err + return mysql.ModeNone, err } b, ok := vars[QSqlModeCode] if !ok { - // TODO(lance6716): if this will happen, create a terror - return false, errors.New("Q_SQL_MODE_CODE not found in status_vars") + // should not happen + return mysql.ModeNone, errors.New("Q_SQL_MODE_CODE not found in status_vars") } r := bytes.NewReader(b) var v int64 _ = binary.Read(r, binary.LittleEndian, &v) - // MODE_ANSI_QUOTES = 0x00000004 ref: https://dev.mysql.com/doc/internals/en/query-event.html#q-sql-mode-code - return v&0x00000004 != 0, nil + return mysql.SQLMode(v), nil +} + +// GetParserForStatusVars gets a parser for binlog which is suitable for its sql_mode in statusVars +func GetParserForStatusVars(statusVars []byte) (*parser.Parser, error) { + parser2 := parser.New() + mode, err := getSQLMode(statusVars) + if err != nil { + return nil, err + } + parser2.SetSQLMode(mode) + return parser2, nil } // if returned error is `io.EOF`, it means UnexpectedEOF because we handled expected `io.EOF` as success diff --git a/pkg/binlog/reader/util.go b/pkg/binlog/reader/util.go index fb8116fb16..573b2b4272 100644 --- a/pkg/binlog/reader/util.go +++ b/pkg/binlog/reader/util.go @@ -19,7 +19,6 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/parser" - tmysql "github.com/pingcap/parser/mysql" uuid "github.com/satori/go.uuid" gmysql "github.com/siddontang/go-mysql/mysql" "github.com/siddontang/go-mysql/replication" @@ -63,15 +62,10 @@ func GetGTIDsForPos(ctx context.Context, r Reader, endPos gmysql.Position) (gtid // NOTE: only update endPos/GTIDs for DDL/XID to get an complete transaction. switch ev := e.Event.(type) { case *replication.QueryEvent: - // TODO(lance6716): getParserForStatusVars - parser2 := parser.New() - ansiQuotes, err := event.GetAnsiQuotesMode(ev.StatusVars) + parser2, err := event.GetParserForStatusVars(ev.StatusVars) if err != nil { log.L().Warn("can't determine ANSI_QUOTES from binlog status_vars, use no ANSI_QUOTES instead", zap.Error(err)) - ansiQuotes = false - } - if ansiQuotes { - parser2.SetSQLMode(tmysql.ModeANSIQuotes) + parser2 = parser.New() } isDDL := common.CheckIsDDL(string(ev.Query), parser2) diff --git a/syncer/syncer.go b/syncer/syncer.go index 8365e6bf71..cc84eb6300 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -30,7 +30,6 @@ import ( "github.com/pingcap/parser/ast" "github.com/pingcap/parser/format" "github.com/pingcap/parser/model" - tmysql "github.com/pingcap/parser/mysql" bf "github.com/pingcap/tidb-tools/pkg/binlog-filter" cm "github.com/pingcap/tidb-tools/pkg/column-mapping" "github.com/pingcap/tidb-tools/pkg/dbutil" @@ -42,6 +41,8 @@ import ( "go.etcd.io/etcd/clientv3" "go.uber.org/zap" + toolutils "github.com/pingcap/tidb-tools/pkg/utils" + "github.com/pingcap/dm/dm/config" "github.com/pingcap/dm/dm/pb" "github.com/pingcap/dm/dm/unit" @@ -63,7 +64,6 @@ import ( operator "github.com/pingcap/dm/syncer/err-operator" sm "github.com/pingcap/dm/syncer/safe-mode" "github.com/pingcap/dm/syncer/shardddl" - toolutils "github.com/pingcap/tidb-tools/pkg/utils" ) var ( @@ -1673,15 +1673,10 @@ func (s *Syncer) handleRowsEvent(ev *replication.RowsEvent, ec eventContext) err func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) error { sql := strings.TrimSpace(string(ev.Query)) usedSchema := string(ev.Schema) - // TODO(lance6716): getParserForStatusVars - parser2 := parser.New() - ansiQuotes, err := event.GetAnsiQuotesMode(ev.StatusVars) + parser2, err := event.GetParserForStatusVars(ev.StatusVars) if err != nil { log.L().Warn("can't determine ANSI_QUOTES from binlog status_vars, use no ANSI_QUOTES instead", zap.Error(err)) - ansiQuotes = false - } - if ansiQuotes { - parser2.SetSQLMode(tmysql.ModeANSIQuotes) + parser2 = parser.New() } parseResult, err := s.parseDDLSQL(sql, parser2, usedSchema) From 090d610c668884f7e2532f174227555e4048c378 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Sat, 5 Sep 2020 21:23:05 +0800 Subject: [PATCH 21/23] fix test --- syncer/ddl_test.go | 2 +- syncer/syncer_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/syncer/ddl_test.go b/syncer/ddl_test.go index 74e76313bb..1490bff080 100644 --- a/syncer/ddl_test.go +++ b/syncer/ddl_test.go @@ -72,7 +72,7 @@ func (s *testSyncerSuite) TestAnsiQuotes(c *C) { } db, mock, err := sqlmock.New() - mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE"). + mock.ExpectQuery("SHOW VARIABLES LIKE"). WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). AddRow("sql_mode", "ANSI_QUOTES")) c.Assert(err, IsNil) diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index da84bccd63..17f3828255 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -216,7 +216,7 @@ func (s *testSyncerSuite) resetEventsGenerator(c *C) { func (s *testSyncerSuite) TearDownSuite(c *C) {} func (s *testSyncerSuite) mockParser(db *sql.DB, mock sqlmock.Sqlmock) (*parser.Parser, error) { - mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE"). + mock.ExpectQuery("SHOW VARIABLES LIKE"). WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). AddRow("sql_mode", "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION")) return utils.GetParser(db) From 1cbcf6b8cce9bd6a7267098149f6ce6ca103ef93 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Sat, 5 Sep 2020 22:35:02 +0800 Subject: [PATCH 22/23] fix IT --- pkg/utils/db.go | 21 +++++++++++++++++++-- tests/relay_interrupt/run.sh | 2 +- 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/pkg/utils/db.go b/pkg/utils/db.go index c37bc1fd7a..aba62c8763 100644 --- a/pkg/utils/db.go +++ b/pkg/utils/db.go @@ -23,19 +23,20 @@ import ( "strings" "time" + "github.com/pingcap/failpoint" + "go.uber.org/zap" + "github.com/pingcap/dm/pkg/gtid" "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/terror" "github.com/go-sql-driver/mysql" "github.com/pingcap/errors" - "github.com/pingcap/failpoint" "github.com/pingcap/parser" tmysql "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb-tools/pkg/check" "github.com/pingcap/tidb-tools/pkg/dbutil" gmysql "github.com/siddontang/go-mysql/mysql" - "go.uber.org/zap" ) var ( @@ -274,6 +275,22 @@ func GetGlobalVariable(db *sql.DB, variable string) (value string, err error) { // GetSessionVariable gets connection's session variable func GetSessionVariable(conn *sql.Conn, variable string) (value string, err error) { + failpoint.Inject("GetSessionVariableFailed", func(val failpoint.Value) { + items := strings.Split(val.(string), ",") + if len(items) != 2 { + log.L().Fatal("failpoint GetSessionVariableFailed's value is invalid", zap.String("val", val.(string))) + } + variableName := items[0] + errCode, err1 := strconv.ParseUint(items[1], 10, 16) + if err1 != nil { + log.L().Fatal("failpoint GetSessionVariableFailed's value is invalid", zap.String("val", val.(string))) + } + if variable == variableName { + err = tmysql.NewErr(uint16(errCode)) + log.L().Warn("GetSessionVariable failed", zap.String("variable", variable), zap.String("failpoint", "GetSessionVariableFailed"), zap.Error(err)) + failpoint.Return("", terror.DBErrorAdapt(err, terror.ErrDBDriverError)) + } + }) return getVariable(conn, variable, false) } diff --git a/tests/relay_interrupt/run.sh b/tests/relay_interrupt/run.sh index e6855589c7..00ae4f6ba7 100644 --- a/tests/relay_interrupt/run.sh +++ b/tests/relay_interrupt/run.sh @@ -24,7 +24,7 @@ function run() { failpoints=( # 1152 is ErrAbortingConnection "github.com/pingcap/dm/pkg/utils/GetGlobalVariableFailed=return(\"server_uuid,1152\")" - "github.com/pingcap/dm/pkg/utils/GetGlobalVariableFailed=return(\"sql_mode,1152\")" + "github.com/pingcap/dm/pkg/utils/GetSessionVariableFailed=return(\"sql_mode,1152\")" ) for(( i=0;i<${#failpoints[@]};i++)) do From 5281599e3d38beab4e83e85bc3349e1af04797be Mon Sep 17 00:00:00 2001 From: lance6716 Date: Sat, 5 Sep 2020 22:42:44 +0800 Subject: [PATCH 23/23] fix comment --- dm/config/task_test.go | 1 - pkg/binlog/reader/util.go | 2 +- syncer/syncer.go | 2 +- 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/dm/config/task_test.go b/dm/config/task_test.go index 686ebbfb37..1b27931fe3 100644 --- a/dm/config/task_test.go +++ b/dm/config/task_test.go @@ -550,7 +550,6 @@ func (t *testConfig) TestFromSubTaskConfigs(c *C) { "sync-02": &stCfg2.SyncerConfig, }, CleanDumpFile: stCfg1.CleanDumpFile, - //EnableANSIQuotes: stCfg1.EnableANSIQuotes, } c.Assert(cfg.String(), Equals, cfg2.String()) // some nil/(null value) compare may not equal, so use YAML format to compare. diff --git a/pkg/binlog/reader/util.go b/pkg/binlog/reader/util.go index 573b2b4272..eb3439af7d 100644 --- a/pkg/binlog/reader/util.go +++ b/pkg/binlog/reader/util.go @@ -64,7 +64,7 @@ func GetGTIDsForPos(ctx context.Context, r Reader, endPos gmysql.Position) (gtid case *replication.QueryEvent: parser2, err := event.GetParserForStatusVars(ev.StatusVars) if err != nil { - log.L().Warn("can't determine ANSI_QUOTES from binlog status_vars, use no ANSI_QUOTES instead", zap.Error(err)) + log.L().Warn("can't determine sql_mode from binlog status_vars, use default parser instead", zap.Error(err)) parser2 = parser.New() } diff --git a/syncer/syncer.go b/syncer/syncer.go index cc84eb6300..1911f2f04e 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -1675,7 +1675,7 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e usedSchema := string(ev.Schema) parser2, err := event.GetParserForStatusVars(ev.StatusVars) if err != nil { - log.L().Warn("can't determine ANSI_QUOTES from binlog status_vars, use no ANSI_QUOTES instead", zap.Error(err)) + log.L().Warn("can't determine sql_mode from binlog status_vars, use default parser instead", zap.Error(err)) parser2 = parser.New() }