Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

*: auto discover ANSI_QUOTES #889

Merged
merged 36 commits into from
Sep 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
1dc615f
stash
lance6716 Aug 12, 2020
f9df7ec
stash
lance6716 Aug 12, 2020
322bcba
Merge branch 'master' of https://github.com/pingcap/dm into dev
lance6716 Aug 12, 2020
7aa308c
*: auto discover ANSI_QUOTES
lance6716 Aug 12, 2020
0a3a8cb
fix ut
lance6716 Aug 12, 2020
f10a4bc
fix it
lance6716 Aug 12, 2020
bcd4c99
don't know why dm-master exit
lance6716 Aug 12, 2020
6d07410
fix it
lance6716 Aug 13, 2020
ab10da4
Merge branch 'master' of https://github.com/pingcap/dm into dev
lance6716 Aug 13, 2020
86f4404
re-sort import
lance6716 Aug 13, 2020
d3f5b59
Merge branch 'master' of https://github.com/pingcap/dm into dev
lance6716 Aug 24, 2020
104f5df
address comment
lance6716 Aug 24, 2020
2e6c012
Merge branch 'master' of https://github.com/pingcap/dm into dev
lance6716 Aug 25, 2020
8cb5efa
Merge branch 'master' of https://github.com/pingcap/dm into dev
lance6716 Aug 26, 2020
6ac75cc
track master
lance6716 Aug 26, 2020
488db8e
update behaviour
lance6716 Aug 26, 2020
730f09c
Merge branch 'master' into dev
lance6716 Aug 26, 2020
c785c6d
Merge branch 'master' into dev
lance6716 Aug 28, 2020
0ea56b3
fix test
lance6716 Aug 28, 2020
9db590e
fix CI
lance6716 Aug 28, 2020
b7ba9ec
Merge branch 'master' into dev
lance6716 Aug 30, 2020
b63e82e
Merge branch 'master' into dev
lance6716 Sep 2, 2020
674c326
track commit
lance6716 Sep 3, 2020
ee26026
Merge branch 'master' into dev
lance6716 Sep 3, 2020
0c9a91a
fix test
lance6716 Sep 3, 2020
a68caef
Merge branch 'master' of https://github.com/pingcap/dm into dev
lance6716 Sep 3, 2020
6498e24
add comments
lance6716 Sep 3, 2020
d8d29e0
fix wrong comment
lance6716 Sep 3, 2020
83bacb7
save my work
lance6716 Sep 4, 2020
02df4e3
address comment
lance6716 Sep 5, 2020
5cb6a4a
Merge branch 'master' into dev
lance6716 Sep 5, 2020
d265353
update parser const
lance6716 Sep 5, 2020
090d610
fix test
lance6716 Sep 5, 2020
1cbcf6b
fix IT
lance6716 Sep 5, 2020
5281599
fix comment
lance6716 Sep 5, 2020
0018d89
Merge branch 'master' into dev
GMHDBJD Sep 7, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions checker/check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ type testChecker struct{}
var _ = tc.Suite(&testChecker{})

func (t *testChecker) TestCheckSyncConfig(c *tc.C) {
c.Assert(CheckSyncConfig(context.Background(), nil, false), tc.IsNil)
c.Assert(CheckSyncConfig(context.Background(), nil), tc.IsNil)

cfgs := []*config.SubTaskConfig{
{
IgnoreCheckingItems: []string{config.AllChecking},
},
}
c.Assert(CheckSyncConfig(context.Background(), cfgs, false), tc.IsNil)
c.Assert(CheckSyncConfig(context.Background(), cfgs), tc.IsNil)
}
11 changes: 4 additions & 7 deletions checker/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,6 @@ type Checker struct {

instances []*mysqlInstance

enableANSIQuotes bool

checkList []check.Checker
checkingItems map[string]string
result struct {
Expand All @@ -80,12 +78,11 @@ type Checker struct {
}

// NewChecker returns a checker
func NewChecker(cfgs []*config.SubTaskConfig, checkingItems map[string]string, enableANSIQuotes bool) *Checker {
func NewChecker(cfgs []*config.SubTaskConfig, checkingItems map[string]string) *Checker {
c := &Checker{
instances: make([]*mysqlInstance, 0, len(cfgs)),
checkingItems: checkingItems,
logger: log.With(zap.String("unit", "task check")),
enableANSIQuotes: enableANSIQuotes,
instances: make([]*mysqlInstance, 0, len(cfgs)),
checkingItems: checkingItems,
logger: log.With(zap.String("unit", "task check")),
}

for _, cfg := range cfgs {
Expand Down
6 changes: 3 additions & 3 deletions checker/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ var (
ErrorMsgHeader = "fail to check synchronization configuration with type"

// CheckSyncConfigFunc holds the CheckSyncConfig function
CheckSyncConfigFunc func(ctx context.Context, cfgs []*config.SubTaskConfig, enableANSIQuotes bool) error
CheckSyncConfigFunc func(ctx context.Context, cfgs []*config.SubTaskConfig) error
)

func init() {
CheckSyncConfigFunc = CheckSyncConfig
}

// CheckSyncConfig checks synchronization configuration
func CheckSyncConfig(ctx context.Context, cfgs []*config.SubTaskConfig, enableANSIQuotes bool) error {
func CheckSyncConfig(ctx context.Context, cfgs []*config.SubTaskConfig) error {
if len(cfgs) == 0 {
return nil
}
Expand All @@ -56,7 +56,7 @@ func CheckSyncConfig(ctx context.Context, cfgs []*config.SubTaskConfig, enableAN
return nil
}

c := NewChecker(cfgs, checkingItems, enableANSIQuotes)
c := NewChecker(cfgs, checkingItems)

err := c.Init(ctx)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions dm/config/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ type SubTaskConfig struct {

CleanDumpFile bool `toml:"clean-dump-file" json:"clean-dump-file"`

// deprecated, will auto discover SQL mode
EnableANSIQuotes bool `toml:"ansi-quotes" json:"ansi-quotes"`

// still needed by Syncer / Loader bin
Expand Down
8 changes: 4 additions & 4 deletions dm/config/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ func (m *MydumperConfig) UnmarshalYAML(unmarshal func(interface{}) error) error
type LoaderConfig struct {
PoolSize int `yaml:"pool-size" toml:"pool-size" json:"pool-size"`
Dir string `yaml:"dir" toml:"dir" json:"dir"`
SQLMode string `yaml:"-" toml:"-" json:"-"` // wrote by dump unit
}

func defaultLoaderConfig() LoaderConfig {
Expand Down Expand Up @@ -277,7 +278,7 @@ type TaskConfig struct {
TaskMode string `yaml:"task-mode" toml:"task-mode" json:"task-mode"`
IsSharding bool `yaml:"is-sharding" toml:"is-sharding" json:"is-sharding"`
ShardMode string `yaml:"shard-mode" toml:"shard-mode" json:"shard-mode"` // when `shard-mode` set, we always enable sharding support.
// treat it as hidden configuration
// treat it as hidden configuration
IgnoreCheckingItems []string `yaml:"ignore-checking-items" toml:"ignore-checking-items" json:"ignore-checking-items"`
// we store detail status in meta
// don't save configuration into it
Expand Down Expand Up @@ -311,7 +312,7 @@ type TaskConfig struct {
Syncers map[string]*SyncerConfig `yaml:"syncers" toml:"syncers" json:"syncers"`

CleanDumpFile bool `yaml:"clean-dump-file" toml:"clean-dump-file" json:"clean-dump-file"`

// deprecated
EnableANSIQuotes bool `yaml:"ansi-quotes" toml:"ansi-quotes" json:"ansi-quotes"`

// deprecated, replaced by `start-task --remove-meta`
Expand Down Expand Up @@ -523,7 +524,7 @@ func (c *TaskConfig) adjust() error {

// for backward compatible, set global config `ansi-quotes: true` if any syncer is true
if inst.Syncer.EnableANSIQuotes == true {
c.EnableANSIQuotes = true
log.L().Warn("DM could discover proper ANSI_QUOTES, `enable-ansi-quotes` is no longer take effect")
}

if dupeRules := checkDuplicateString(inst.RouteRules); len(dupeRules) > 0 {
Expand Down Expand Up @@ -632,7 +633,6 @@ func (c *TaskConfig) FromSubTaskConfigs(stCfgs ...*SubTaskConfig) {
c.TargetDB = &stCfg0.To // just ref
c.OnlineDDLScheme = stCfg0.OnlineDDLScheme
c.CleanDumpFile = stCfg0.CleanDumpFile
c.EnableANSIQuotes = stCfg0.EnableANSIQuotes
c.MySQLInstances = make([]*MySQLInstance, 0, len(stCfgs))
c.BAList = make(map[string]*filter.Rules)
c.Routes = make(map[string]*router.TableRule)
Expand Down
3 changes: 1 addition & 2 deletions dm/config/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,8 +549,7 @@ func (t *testConfig) TestFromSubTaskConfigs(c *C) {
"sync-01": &stCfg1.SyncerConfig,
"sync-02": &stCfg2.SyncerConfig,
},
CleanDumpFile: stCfg1.CleanDumpFile,
EnableANSIQuotes: stCfg1.EnableANSIQuotes,
CleanDumpFile: stCfg1.CleanDumpFile,
}

c.Assert(cfg.String(), Equals, cfg2.String()) // some nil/(null value) compare may not equal, so use YAML format to compare.
Expand Down
2 changes: 1 addition & 1 deletion dm/master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1527,7 +1527,7 @@ func (s *Server) generateSubTask(ctx context.Context, task string) (*config.Task
return nil, nil, terror.WithClass(err, terror.ClassDMMaster)
}

err = checker.CheckSyncConfigFunc(ctx, stCfgs, cfg.EnableANSIQuotes)
err = checker.CheckSyncConfigFunc(ctx, stCfgs)
if err != nil {
return nil, nil, terror.WithClass(err, terror.ClassDMMaster)
}
Expand Down
2 changes: 1 addition & 1 deletion dm/master/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ func (t *testMaster) TestStartTask(c *check.C) {

// test start task, but the first step check-task fails
bakCheckSyncConfigFunc := checker.CheckSyncConfigFunc
checker.CheckSyncConfigFunc = func(_ context.Context, _ []*config.SubTaskConfig, _ bool) error {
checker.CheckSyncConfigFunc = func(_ context.Context, _ []*config.SubTaskConfig) error {
return errors.New(errCheckSyncConfig)
}
defer func() {
Expand Down
17 changes: 7 additions & 10 deletions dumpling/dumpling.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func NewDumpling(cfg *config.SubTaskConfig) *Dumpling {
func (m *Dumpling) Init(ctx context.Context) error {
var err error
m.dumpConfig, err = m.constructArgs()
m.detectAnsiQuotes()
m.detectSQLMode()
return err
}

Expand Down Expand Up @@ -274,21 +274,18 @@ func (m *Dumpling) constructArgs() (*export.Config, error) {
return dumpConfig, nil
}

// detectAnsiQuotes tries to detect ANSI_QUOTES from upstream. If success, change EnableANSIQuotes in subtask config
func (m *Dumpling) detectAnsiQuotes() {
// detectSQLMode tries to detect SQL mode from upstream. If success, write it to LoaderConfig
func (m *Dumpling) detectSQLMode() {
db, err := sql.Open("mysql", m.dumpConfig.GetDSN(""))
if err != nil {
return
}
defer db.Close()
enable, err := utils.HasAnsiQuotesMode(db)

sqlMode, err := utils.GetGlobalVariable(db, "sql_mode")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Session here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dumpling didn't expose it's connection, so in fact we can't see its session variable. and by GetDSN and config, we can change its session variable different than global ones

if err != nil {
return
}
if enable != m.cfg.EnableANSIQuotes {
m.logger.Warn("found mismatched ANSI_QUOTES setting, going to overwrite it to DB specified",
zap.Bool("DB specified", enable),
zap.Bool("config file specified", m.cfg.EnableANSIQuotes))
}
m.cfg.EnableANSIQuotes = enable
m.logger.Info("found upstream SQL mode", zap.String("SQL mode", sqlMode))
m.cfg.LoaderConfig.SQLMode = sqlMode
}
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ require (
github.com/pingcap/errors v0.11.5-0.20200820035142-66eb5bf1d1cd
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce
github.com/pingcap/log v0.0.0-20200828042413-fce0951f1463
github.com/pingcap/parser v0.0.0-20200821073936-cf85e80665c4
github.com/pingcap/parser v0.0.0-20200905070518-a6534dd22500
github.com/pingcap/tidb v1.1.0-beta.0.20200901032733-f82e5320ad75
github.com/pingcap/tidb-tools v4.0.5-0.20200828085514-03575b185007+incompatible
github.com/prometheus/client_golang v1.5.1
Expand All @@ -44,7 +44,7 @@ require (
github.com/uber-go/atomic v1.4.0 // indirect
github.com/unrolled/render v1.0.1
go.etcd.io/etcd v0.5.0-alpha.5.0.20191023171146-3cf2f69b5738
go.uber.org/zap v1.15.0
go.uber.org/zap v1.16.0
golang.org/x/sys v0.0.0-20200824131525-c12d262b63d8
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
google.golang.org/genproto v0.0.0-20191230161307-f3c370f40bfb
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,8 @@ github.com/pingcap/parser v0.0.0-20200731033026-84f62115187c/go.mod h1:vQdbJqobJ
github.com/pingcap/parser v0.0.0-20200813083329-a4bff035d3e2/go.mod h1:vQdbJqobJAgFyiRNNtXahpMoGWwPEuWciVEK5A20NS0=
github.com/pingcap/parser v0.0.0-20200821073936-cf85e80665c4 h1:ATFD3gmwkSHcPt5DuQK3dZwqDU49WXOq/zRmwPJ6Nks=
github.com/pingcap/parser v0.0.0-20200821073936-cf85e80665c4/go.mod h1:vQdbJqobJAgFyiRNNtXahpMoGWwPEuWciVEK5A20NS0=
github.com/pingcap/parser v0.0.0-20200905070518-a6534dd22500 h1:dnXiRdcubiMSkzHBBSkJo+9E+OX3+Z629ZEQEIVfBsI=
github.com/pingcap/parser v0.0.0-20200905070518-a6534dd22500/go.mod h1:vQdbJqobJAgFyiRNNtXahpMoGWwPEuWciVEK5A20NS0=
github.com/pingcap/pd/v4 v4.0.0-rc.1.0.20200422143320-428acd53eba2 h1:JTzYYukREvxVSKW/ncrzNjFitd8snoQ/Xz32pw8i+s8=
github.com/pingcap/pd/v4 v4.0.0-rc.1.0.20200422143320-428acd53eba2/go.mod h1:s+utZtXDznOiL24VK0qGmtoHjjXNsscJx3m1n8cC56s=
github.com/pingcap/pd/v4 v4.0.0-rc.2.0.20200520083007-2c251bd8f181/go.mod h1:q4HTx/bA8aKBa4S7L+SQKHvjRPXCRV0tA0yRw0qkZSA=
Expand Down Expand Up @@ -862,6 +864,8 @@ go.uber.org/zap v1.14.1 h1:nYDKopTbvAPq/NrUVZwT15y2lpROBiLLyoRTbXOYWOo=
go.uber.org/zap v1.14.1/go.mod h1:Mb2vm2krFEG5DV0W9qcHBYFtp/Wku1cvYaqPsS/WYfc=
go.uber.org/zap v1.15.0 h1:ZZCA22JRF2gQE5FoNmhmrf7jeJJ2uhqDUNRYKm8dvmM=
go.uber.org/zap v1.15.0/go.mod h1:Mb2vm2krFEG5DV0W9qcHBYFtp/Wku1cvYaqPsS/WYfc=
go.uber.org/zap v1.16.0 h1:uFRZXykJGK9lLY4HtgSw44DnIcAM+kRBP7x5m+NpAOM=
go.uber.org/zap v1.16.0/go.mod h1:MA8QOfq0BHJwdXa996Y4dYkAqRKB8/1K1QMMZVaNZjQ=
golang.org/x/crypto v0.0.0-20180214000028-650f4a345ab4/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
Expand Down
11 changes: 5 additions & 6 deletions loader/convert_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,10 @@ import (
tcontext "github.com/pingcap/dm/pkg/context"
parserpkg "github.com/pingcap/dm/pkg/parser"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"

"github.com/pingcap/errors"
"github.com/pingcap/parser"
"github.com/pingcap/parser/ast"
tmysql "github.com/pingcap/parser/mysql"
cm "github.com/pingcap/tidb-tools/pkg/column-mapping"
router "github.com/pingcap/tidb-tools/pkg/table-router"
)
Expand Down Expand Up @@ -231,15 +230,15 @@ func tableName(schema, table string) string {
return fmt.Sprintf("`%s`.`%s`", schema, table)
}

func parseTable(ctx *tcontext.Context, r *router.Table, schema, table, file string, enableANSIQuotes bool) (*tableInfo, error) {
func parseTable(ctx *tcontext.Context, r *router.Table, schema, table, file string, sqlMode string) (*tableInfo, error) {
statement, err := exportStatement(file)
if err != nil {
return nil, err
}

parser2 := parser.New()
if enableANSIQuotes {
parser2.SetSQLMode(tmysql.ModeANSIQuotes)
parser2, err := utils.GetParserFromSQLModeStr(sqlMode)
if err != nil {
return nil, err
}

stmts, err := parserpkg.Parse(parser2, string(statement), "", "")
Expand Down
4 changes: 2 additions & 2 deletions loader/convert_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (t *testConvertDataSuite) TestParseTable(c *C) {
r, err := router.NewTableRouter(false, rules)
c.Assert(err, IsNil)

tableInfo, err := parseTable(tcontext.Background(), r, "test1", "t2", "./dumpfile/test1.t2-schema.sql", true)
tableInfo, err := parseTable(tcontext.Background(), r, "test1", "t2", "./dumpfile/test1.t2-schema.sql", "ANSI_QUOTES")
c.Assert(err, IsNil)
c.Assert(tableInfo, DeepEquals, expectedTableInfo)
}
Expand Down Expand Up @@ -192,7 +192,7 @@ func (t *testConvertDataSuite) TestParseTableWithGeneratedColumn(c *C) {
r, err := router.NewTableRouter(false, rules)
c.Assert(err, IsNil)

tableInfo, err := parseTable(tcontext.Background(), r, "test1", "t3", "./dumpfile/test1.t3-schema.sql", false)
tableInfo, err := parseTable(tcontext.Background(), r, "test1", "t3", "./dumpfile/test1.t3-schema.sql", "")
c.Assert(err, IsNil)
c.Assert(tableInfo, DeepEquals, expectedTableInfo)
}
Expand Down
2 changes: 1 addition & 1 deletion loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -1190,7 +1190,7 @@ func (l *Loader) restoreData(ctx context.Context) error {
dataFiles := tables[table]
tableFile := fmt.Sprintf("%s/%s.%s-schema.sql", l.cfg.Dir, db, table)
if _, ok := l.tableInfos[tableName(db, table)]; !ok {
l.tableInfos[tableName(db, table)], err = parseTable(tctx, l.tableRouter, db, table, tableFile, l.cfg.EnableANSIQuotes)
l.tableInfos[tableName(db, table)], err = parseTable(tctx, l.tableRouter, db, table, tableFile, l.cfg.LoaderConfig.SQLMode)
if err != nil {
return terror.Annotatef(err, "parse table %s/%s", db, table)
}
Expand Down
26 changes: 19 additions & 7 deletions pkg/binlog/event/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"math"
"reflect"

"github.com/pingcap/parser"
"github.com/pingcap/parser/mysql"
gmysql "github.com/siddontang/go-mysql/mysql"
"github.com/siddontang/go-mysql/replication"

Expand Down Expand Up @@ -305,24 +307,34 @@ var (
}
)

// GetAnsiQuotesMode gets ansi-quotes mode from binlog statusVars
func GetAnsiQuotesMode(statusVars []byte) (bool, error) {
// getSQLMode gets SQL mode from binlog statusVars
func getSQLMode(statusVars []byte) (mysql.SQLMode, error) {
vars, err := statusVarsToKV(statusVars)
if err != nil {
return false, err
return mysql.ModeNone, err
}
b, ok := vars[QSqlModeCode]
if !ok {
// TODO(lance6716): if this will happen, create a terror
return false, errors.New("Q_SQL_MODE_CODE not found in status_vars")
// should not happen
return mysql.ModeNone, errors.New("Q_SQL_MODE_CODE not found in status_vars")
}

r := bytes.NewReader(b)
var v int64
_ = binary.Read(r, binary.LittleEndian, &v)

// MODE_ANSI_QUOTES = 0x00000004 ref: https://dev.mysql.com/doc/internals/en/query-event.html#q-sql-mode-code
return v&0x00000004 != 0, nil
return mysql.SQLMode(v), nil
}

// GetParserForStatusVars gets a parser for binlog which is suitable for its sql_mode in statusVars
func GetParserForStatusVars(statusVars []byte) (*parser.Parser, error) {
parser2 := parser.New()
mode, err := getSQLMode(statusVars)
if err != nil {
return nil, err
}
parser2.SetSQLMode(mode)
return parser2, nil
}

// if returned error is `io.EOF`, it means UnexpectedEOF because we handled expected `io.EOF` as success
Expand Down
2 changes: 1 addition & 1 deletion pkg/binlog/reader/tcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ func (t *testTCPReaderSuite) verifyInitialEvents(c *C, reader Reader) {
timeoutCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

parser2, err := utils.GetParser(t.db, false)
parser2, err := utils.GetParser(t.db)
c.Assert(err, IsNil)

forLoop:
Expand Down
10 changes: 9 additions & 1 deletion pkg/binlog/reader/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,19 @@ import (
uuid "github.com/satori/go.uuid"
gmysql "github.com/siddontang/go-mysql/mysql"
"github.com/siddontang/go-mysql/replication"
"go.uber.org/zap"

"github.com/pingcap/dm/pkg/binlog/event"
"github.com/pingcap/dm/pkg/gtid"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/relay/common"
)

// GetGTIDsForPos tries to get GTID sets for the specified binlog position (for the corresponding txn).
// NOTE: this method is very similar with `relay/writer/file_util.go/getTxnPosGTIDs`, unify them if needed later.
// NOTE: this method is not well tested directly, but more tests have already been done for `relay/writer/file_util.go/getTxnPosGTIDs`.
func GetGTIDsForPos(ctx context.Context, r Reader, endPos gmysql.Position, parser2 *parser.Parser) (gtid.Set, error) {
func GetGTIDsForPos(ctx context.Context, r Reader, endPos gmysql.Position) (gtid.Set, error) {
// start to get and parse binlog event from the beginning of the file.
startPos := gmysql.Position{
Name: endPos.Name,
Expand Down Expand Up @@ -60,6 +62,12 @@ func GetGTIDsForPos(ctx context.Context, r Reader, endPos gmysql.Position, parse
// NOTE: only update endPos/GTIDs for DDL/XID to get an complete transaction.
switch ev := e.Event.(type) {
case *replication.QueryEvent:
parser2, err := event.GetParserForStatusVars(ev.StatusVars)
if err != nil {
log.L().Warn("can't determine sql_mode from binlog status_vars, use default parser instead", zap.Error(err))
parser2 = parser.New()
}

isDDL := common.CheckIsDDL(string(ev.Query), parser2)
if isDDL {
if latestGSet == nil {
Expand Down
7 changes: 2 additions & 5 deletions pkg/binlog/reader/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,11 @@ func (t *testTCPReaderSuite) TestGetGTIDsForPos(c *C) {
endPos, endGS, err := utils.GetMasterStatus(t.db, flavor)
c.Assert(err, IsNil)

parser2, err := utils.GetParser(t.db, false)
c.Assert(err, IsNil)

r1 := NewTCPReader(cfg)
c.Assert(r1, NotNil)
defer r1.Close()

gs, err := GetGTIDsForPos(ctx, r1, endPos, parser2)
gs, err := GetGTIDsForPos(ctx, r1, endPos)
c.Assert(err, IsNil)
c.Assert(gs.Equal(endGS), IsTrue)

Expand All @@ -62,7 +59,7 @@ func (t *testTCPReaderSuite) TestGetGTIDsForPos(c *C) {
gs, err = GetGTIDsForPos(ctx, r2, gmysql.Position{
Name: endPos.Name,
Pos: endPos.Pos - 1,
}, parser2)
})
c.Assert(err, ErrorMatches, ".*invalid position .* or GTID not enabled in upstream.*")
c.Assert(gs, IsNil)
}
Loading