Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

*: auto discover ANSI_QUOTES #889 #1005

Merged
merged 1 commit into from
Sep 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions checker/check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ type testChecker struct{}
var _ = tc.Suite(&testChecker{})

func (t *testChecker) TestCheckSyncConfig(c *tc.C) {
c.Assert(CheckSyncConfig(context.Background(), nil, false), tc.IsNil)
c.Assert(CheckSyncConfig(context.Background(), nil), tc.IsNil)

cfgs := []*config.SubTaskConfig{
{
IgnoreCheckingItems: []string{config.AllChecking},
},
}
c.Assert(CheckSyncConfig(context.Background(), cfgs, false), tc.IsNil)
c.Assert(CheckSyncConfig(context.Background(), cfgs), tc.IsNil)
}
11 changes: 4 additions & 7 deletions checker/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,6 @@ type Checker struct {

instances []*mysqlInstance

enableANSIQuotes bool

checkList []check.Checker
checkingItems map[string]string
result struct {
Expand All @@ -80,12 +78,11 @@ type Checker struct {
}

// NewChecker returns a checker
func NewChecker(cfgs []*config.SubTaskConfig, checkingItems map[string]string, enableANSIQuotes bool) *Checker {
func NewChecker(cfgs []*config.SubTaskConfig, checkingItems map[string]string) *Checker {
c := &Checker{
instances: make([]*mysqlInstance, 0, len(cfgs)),
checkingItems: checkingItems,
logger: log.With(zap.String("unit", "task check")),
enableANSIQuotes: enableANSIQuotes,
instances: make([]*mysqlInstance, 0, len(cfgs)),
checkingItems: checkingItems,
logger: log.With(zap.String("unit", "task check")),
}

for _, cfg := range cfgs {
Expand Down
6 changes: 3 additions & 3 deletions checker/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ var (
ErrorMsgHeader = "fail to check synchronization configuration with type"

// CheckSyncConfigFunc holds the CheckSyncConfig function
CheckSyncConfigFunc func(ctx context.Context, cfgs []*config.SubTaskConfig, enableANSIQuotes bool) error
CheckSyncConfigFunc func(ctx context.Context, cfgs []*config.SubTaskConfig) error
)

func init() {
CheckSyncConfigFunc = CheckSyncConfig
}

// CheckSyncConfig checks synchronization configuration
func CheckSyncConfig(ctx context.Context, cfgs []*config.SubTaskConfig, enableANSIQuotes bool) error {
func CheckSyncConfig(ctx context.Context, cfgs []*config.SubTaskConfig) error {
if len(cfgs) == 0 {
return nil
}
Expand All @@ -56,7 +56,7 @@ func CheckSyncConfig(ctx context.Context, cfgs []*config.SubTaskConfig, enableAN
return nil
}

c := NewChecker(cfgs, checkingItems, enableANSIQuotes)
c := NewChecker(cfgs, checkingItems)

err := c.Init(ctx)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions dm/config/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ type SubTaskConfig struct {

CleanDumpFile bool `toml:"clean-dump-file" json:"clean-dump-file"`

// deprecated, will auto discover SQL mode
EnableANSIQuotes bool `toml:"ansi-quotes" json:"ansi-quotes"`

// still needed by Syncer / Loader bin
Expand Down
8 changes: 4 additions & 4 deletions dm/config/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ func (m *MydumperConfig) UnmarshalYAML(unmarshal func(interface{}) error) error
type LoaderConfig struct {
PoolSize int `yaml:"pool-size" toml:"pool-size" json:"pool-size"`
Dir string `yaml:"dir" toml:"dir" json:"dir"`
SQLMode string `yaml:"-" toml:"-" json:"-"` // wrote by dump unit
}

func defaultLoaderConfig() LoaderConfig {
Expand Down Expand Up @@ -277,7 +278,7 @@ type TaskConfig struct {
TaskMode string `yaml:"task-mode" toml:"task-mode" json:"task-mode"`
IsSharding bool `yaml:"is-sharding" toml:"is-sharding" json:"is-sharding"`
ShardMode string `yaml:"shard-mode" toml:"shard-mode" json:"shard-mode"` // when `shard-mode` set, we always enable sharding support.
// treat it as hidden configuration
// treat it as hidden configuration
IgnoreCheckingItems []string `yaml:"ignore-checking-items" toml:"ignore-checking-items" json:"ignore-checking-items"`
// we store detail status in meta
// don't save configuration into it
Expand Down Expand Up @@ -311,7 +312,7 @@ type TaskConfig struct {
Syncers map[string]*SyncerConfig `yaml:"syncers" toml:"syncers" json:"syncers"`

CleanDumpFile bool `yaml:"clean-dump-file" toml:"clean-dump-file" json:"clean-dump-file"`

// deprecated
EnableANSIQuotes bool `yaml:"ansi-quotes" toml:"ansi-quotes" json:"ansi-quotes"`

// deprecated, replaced by `start-task --remove-meta`
Expand Down Expand Up @@ -523,7 +524,7 @@ func (c *TaskConfig) adjust() error {

// for backward compatible, set global config `ansi-quotes: true` if any syncer is true
if inst.Syncer.EnableANSIQuotes == true {
c.EnableANSIQuotes = true
log.L().Warn("DM could discover proper ANSI_QUOTES, `enable-ansi-quotes` is no longer take effect")
}

if dupeRules := checkDuplicateString(inst.RouteRules); len(dupeRules) > 0 {
Expand Down Expand Up @@ -632,7 +633,6 @@ func (c *TaskConfig) FromSubTaskConfigs(stCfgs ...*SubTaskConfig) {
c.TargetDB = &stCfg0.To // just ref
c.OnlineDDLScheme = stCfg0.OnlineDDLScheme
c.CleanDumpFile = stCfg0.CleanDumpFile
c.EnableANSIQuotes = stCfg0.EnableANSIQuotes
c.MySQLInstances = make([]*MySQLInstance, 0, len(stCfgs))
c.BAList = make(map[string]*filter.Rules)
c.Routes = make(map[string]*router.TableRule)
Expand Down
3 changes: 1 addition & 2 deletions dm/config/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,8 +549,7 @@ func (t *testConfig) TestFromSubTaskConfigs(c *C) {
"sync-01": &stCfg1.SyncerConfig,
"sync-02": &stCfg2.SyncerConfig,
},
CleanDumpFile: stCfg1.CleanDumpFile,
EnableANSIQuotes: stCfg1.EnableANSIQuotes,
CleanDumpFile: stCfg1.CleanDumpFile,
}

c.Assert(cfg.String(), Equals, cfg2.String()) // some nil/(null value) compare may not equal, so use YAML format to compare.
Expand Down
2 changes: 1 addition & 1 deletion dm/master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1527,7 +1527,7 @@ func (s *Server) generateSubTask(ctx context.Context, task string) (*config.Task
return nil, nil, terror.WithClass(err, terror.ClassDMMaster)
}

err = checker.CheckSyncConfigFunc(ctx, stCfgs, cfg.EnableANSIQuotes)
err = checker.CheckSyncConfigFunc(ctx, stCfgs)
if err != nil {
return nil, nil, terror.WithClass(err, terror.ClassDMMaster)
}
Expand Down
2 changes: 1 addition & 1 deletion dm/master/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ func (t *testMaster) TestStartTask(c *check.C) {

// test start task, but the first step check-task fails
bakCheckSyncConfigFunc := checker.CheckSyncConfigFunc
checker.CheckSyncConfigFunc = func(_ context.Context, _ []*config.SubTaskConfig, _ bool) error {
checker.CheckSyncConfigFunc = func(_ context.Context, _ []*config.SubTaskConfig) error {
return errors.New(errCheckSyncConfig)
}
defer func() {
Expand Down
17 changes: 7 additions & 10 deletions dumpling/dumpling.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func NewDumpling(cfg *config.SubTaskConfig) *Dumpling {
func (m *Dumpling) Init(ctx context.Context) error {
var err error
m.dumpConfig, err = m.constructArgs()
m.detectAnsiQuotes()
m.detectSQLMode()
return err
}

Expand Down Expand Up @@ -274,21 +274,18 @@ func (m *Dumpling) constructArgs() (*export.Config, error) {
return dumpConfig, nil
}

// detectAnsiQuotes tries to detect ANSI_QUOTES from upstream. If success, change EnableANSIQuotes in subtask config
func (m *Dumpling) detectAnsiQuotes() {
// detectSQLMode tries to detect SQL mode from upstream. If success, write it to LoaderConfig
func (m *Dumpling) detectSQLMode() {
db, err := sql.Open("mysql", m.dumpConfig.GetDSN(""))
if err != nil {
return
}
defer db.Close()
enable, err := utils.HasAnsiQuotesMode(db)

sqlMode, err := utils.GetGlobalVariable(db, "sql_mode")
if err != nil {
return
}
if enable != m.cfg.EnableANSIQuotes {
m.logger.Warn("found mismatched ANSI_QUOTES setting, going to overwrite it to DB specified",
zap.Bool("DB specified", enable),
zap.Bool("config file specified", m.cfg.EnableANSIQuotes))
}
m.cfg.EnableANSIQuotes = enable
m.logger.Info("found upstream SQL mode", zap.String("SQL mode", sqlMode))
m.cfg.LoaderConfig.SQLMode = sqlMode
}
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ require (
github.com/pingcap/errors v0.11.5-0.20200820035142-66eb5bf1d1cd
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce
github.com/pingcap/log v0.0.0-20200828042413-fce0951f1463
github.com/pingcap/parser v0.0.0-20200821073936-cf85e80665c4
github.com/pingcap/parser v0.0.0-20200905070518-a6534dd22500
github.com/pingcap/tidb v1.1.0-beta.0.20200901032733-f82e5320ad75
github.com/pingcap/tidb-tools v4.0.5-0.20200828085514-03575b185007+incompatible
github.com/prometheus/client_golang v1.5.1
Expand All @@ -44,7 +44,7 @@ require (
github.com/uber-go/atomic v1.4.0 // indirect
github.com/unrolled/render v1.0.1
go.etcd.io/etcd v0.5.0-alpha.5.0.20191023171146-3cf2f69b5738
go.uber.org/zap v1.15.0
go.uber.org/zap v1.16.0
golang.org/x/sys v0.0.0-20200824131525-c12d262b63d8
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
google.golang.org/genproto v0.0.0-20191230161307-f3c370f40bfb
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,8 @@ github.com/pingcap/parser v0.0.0-20200731033026-84f62115187c/go.mod h1:vQdbJqobJ
github.com/pingcap/parser v0.0.0-20200813083329-a4bff035d3e2/go.mod h1:vQdbJqobJAgFyiRNNtXahpMoGWwPEuWciVEK5A20NS0=
github.com/pingcap/parser v0.0.0-20200821073936-cf85e80665c4 h1:ATFD3gmwkSHcPt5DuQK3dZwqDU49WXOq/zRmwPJ6Nks=
github.com/pingcap/parser v0.0.0-20200821073936-cf85e80665c4/go.mod h1:vQdbJqobJAgFyiRNNtXahpMoGWwPEuWciVEK5A20NS0=
github.com/pingcap/parser v0.0.0-20200905070518-a6534dd22500 h1:dnXiRdcubiMSkzHBBSkJo+9E+OX3+Z629ZEQEIVfBsI=
github.com/pingcap/parser v0.0.0-20200905070518-a6534dd22500/go.mod h1:vQdbJqobJAgFyiRNNtXahpMoGWwPEuWciVEK5A20NS0=
github.com/pingcap/pd/v4 v4.0.0-rc.1.0.20200422143320-428acd53eba2 h1:JTzYYukREvxVSKW/ncrzNjFitd8snoQ/Xz32pw8i+s8=
github.com/pingcap/pd/v4 v4.0.0-rc.1.0.20200422143320-428acd53eba2/go.mod h1:s+utZtXDznOiL24VK0qGmtoHjjXNsscJx3m1n8cC56s=
github.com/pingcap/pd/v4 v4.0.0-rc.2.0.20200520083007-2c251bd8f181/go.mod h1:q4HTx/bA8aKBa4S7L+SQKHvjRPXCRV0tA0yRw0qkZSA=
Expand Down Expand Up @@ -862,6 +864,8 @@ go.uber.org/zap v1.14.1 h1:nYDKopTbvAPq/NrUVZwT15y2lpROBiLLyoRTbXOYWOo=
go.uber.org/zap v1.14.1/go.mod h1:Mb2vm2krFEG5DV0W9qcHBYFtp/Wku1cvYaqPsS/WYfc=
go.uber.org/zap v1.15.0 h1:ZZCA22JRF2gQE5FoNmhmrf7jeJJ2uhqDUNRYKm8dvmM=
go.uber.org/zap v1.15.0/go.mod h1:Mb2vm2krFEG5DV0W9qcHBYFtp/Wku1cvYaqPsS/WYfc=
go.uber.org/zap v1.16.0 h1:uFRZXykJGK9lLY4HtgSw44DnIcAM+kRBP7x5m+NpAOM=
go.uber.org/zap v1.16.0/go.mod h1:MA8QOfq0BHJwdXa996Y4dYkAqRKB8/1K1QMMZVaNZjQ=
golang.org/x/crypto v0.0.0-20180214000028-650f4a345ab4/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
Expand Down
11 changes: 5 additions & 6 deletions loader/convert_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,10 @@ import (
tcontext "github.com/pingcap/dm/pkg/context"
parserpkg "github.com/pingcap/dm/pkg/parser"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"

"github.com/pingcap/errors"
"github.com/pingcap/parser"
"github.com/pingcap/parser/ast"
tmysql "github.com/pingcap/parser/mysql"
cm "github.com/pingcap/tidb-tools/pkg/column-mapping"
router "github.com/pingcap/tidb-tools/pkg/table-router"
)
Expand Down Expand Up @@ -231,15 +230,15 @@ func tableName(schema, table string) string {
return fmt.Sprintf("`%s`.`%s`", schema, table)
}

func parseTable(ctx *tcontext.Context, r *router.Table, schema, table, file string, enableANSIQuotes bool) (*tableInfo, error) {
func parseTable(ctx *tcontext.Context, r *router.Table, schema, table, file string, sqlMode string) (*tableInfo, error) {
statement, err := exportStatement(file)
if err != nil {
return nil, err
}

parser2 := parser.New()
if enableANSIQuotes {
parser2.SetSQLMode(tmysql.ModeANSIQuotes)
parser2, err := utils.GetParserFromSQLModeStr(sqlMode)
if err != nil {
return nil, err
}

stmts, err := parserpkg.Parse(parser2, string(statement), "", "")
Expand Down
4 changes: 2 additions & 2 deletions loader/convert_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (t *testConvertDataSuite) TestParseTable(c *C) {
r, err := router.NewTableRouter(false, rules)
c.Assert(err, IsNil)

tableInfo, err := parseTable(tcontext.Background(), r, "test1", "t2", "./dumpfile/test1.t2-schema.sql", true)
tableInfo, err := parseTable(tcontext.Background(), r, "test1", "t2", "./dumpfile/test1.t2-schema.sql", "ANSI_QUOTES")
c.Assert(err, IsNil)
c.Assert(tableInfo, DeepEquals, expectedTableInfo)
}
Expand Down Expand Up @@ -192,7 +192,7 @@ func (t *testConvertDataSuite) TestParseTableWithGeneratedColumn(c *C) {
r, err := router.NewTableRouter(false, rules)
c.Assert(err, IsNil)

tableInfo, err := parseTable(tcontext.Background(), r, "test1", "t3", "./dumpfile/test1.t3-schema.sql", false)
tableInfo, err := parseTable(tcontext.Background(), r, "test1", "t3", "./dumpfile/test1.t3-schema.sql", "")
c.Assert(err, IsNil)
c.Assert(tableInfo, DeepEquals, expectedTableInfo)
}
Expand Down
2 changes: 1 addition & 1 deletion loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -1190,7 +1190,7 @@ func (l *Loader) restoreData(ctx context.Context) error {
dataFiles := tables[table]
tableFile := fmt.Sprintf("%s/%s.%s-schema.sql", l.cfg.Dir, db, table)
if _, ok := l.tableInfos[tableName(db, table)]; !ok {
l.tableInfos[tableName(db, table)], err = parseTable(tctx, l.tableRouter, db, table, tableFile, l.cfg.EnableANSIQuotes)
l.tableInfos[tableName(db, table)], err = parseTable(tctx, l.tableRouter, db, table, tableFile, l.cfg.LoaderConfig.SQLMode)
if err != nil {
return terror.Annotatef(err, "parse table %s/%s", db, table)
}
Expand Down
26 changes: 19 additions & 7 deletions pkg/binlog/event/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"math"
"reflect"

"github.com/pingcap/parser"
"github.com/pingcap/parser/mysql"
gmysql "github.com/siddontang/go-mysql/mysql"
"github.com/siddontang/go-mysql/replication"

Expand Down Expand Up @@ -305,24 +307,34 @@ var (
}
)

// GetAnsiQuotesMode gets ansi-quotes mode from binlog statusVars
func GetAnsiQuotesMode(statusVars []byte) (bool, error) {
// getSQLMode gets SQL mode from binlog statusVars
func getSQLMode(statusVars []byte) (mysql.SQLMode, error) {
vars, err := statusVarsToKV(statusVars)
if err != nil {
return false, err
return mysql.ModeNone, err
}
b, ok := vars[QSqlModeCode]
if !ok {
// TODO(lance6716): if this will happen, create a terror
return false, errors.New("Q_SQL_MODE_CODE not found in status_vars")
// should not happen
return mysql.ModeNone, errors.New("Q_SQL_MODE_CODE not found in status_vars")
}

r := bytes.NewReader(b)
var v int64
_ = binary.Read(r, binary.LittleEndian, &v)

// MODE_ANSI_QUOTES = 0x00000004 ref: https://dev.mysql.com/doc/internals/en/query-event.html#q-sql-mode-code
return v&0x00000004 != 0, nil
return mysql.SQLMode(v), nil
}

// GetParserForStatusVars gets a parser for binlog which is suitable for its sql_mode in statusVars
func GetParserForStatusVars(statusVars []byte) (*parser.Parser, error) {
parser2 := parser.New()
mode, err := getSQLMode(statusVars)
if err != nil {
return nil, err
}
parser2.SetSQLMode(mode)
return parser2, nil
}

// if returned error is `io.EOF`, it means UnexpectedEOF because we handled expected `io.EOF` as success
Expand Down
2 changes: 1 addition & 1 deletion pkg/binlog/reader/tcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ func (t *testTCPReaderSuite) verifyInitialEvents(c *C, reader Reader) {
timeoutCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

parser2, err := utils.GetParser(t.db, false)
parser2, err := utils.GetParser(t.db)
c.Assert(err, IsNil)

forLoop:
Expand Down
10 changes: 9 additions & 1 deletion pkg/binlog/reader/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,19 @@ import (
uuid "github.com/satori/go.uuid"
gmysql "github.com/siddontang/go-mysql/mysql"
"github.com/siddontang/go-mysql/replication"
"go.uber.org/zap"

"github.com/pingcap/dm/pkg/binlog/event"
"github.com/pingcap/dm/pkg/gtid"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/relay/common"
)

// GetGTIDsForPos tries to get GTID sets for the specified binlog position (for the corresponding txn).
// NOTE: this method is very similar with `relay/writer/file_util.go/getTxnPosGTIDs`, unify them if needed later.
// NOTE: this method is not well tested directly, but more tests have already been done for `relay/writer/file_util.go/getTxnPosGTIDs`.
func GetGTIDsForPos(ctx context.Context, r Reader, endPos gmysql.Position, parser2 *parser.Parser) (gtid.Set, error) {
func GetGTIDsForPos(ctx context.Context, r Reader, endPos gmysql.Position) (gtid.Set, error) {
// start to get and parse binlog event from the beginning of the file.
startPos := gmysql.Position{
Name: endPos.Name,
Expand Down Expand Up @@ -60,6 +62,12 @@ func GetGTIDsForPos(ctx context.Context, r Reader, endPos gmysql.Position, parse
// NOTE: only update endPos/GTIDs for DDL/XID to get an complete transaction.
switch ev := e.Event.(type) {
case *replication.QueryEvent:
parser2, err := event.GetParserForStatusVars(ev.StatusVars)
if err != nil {
log.L().Warn("can't determine sql_mode from binlog status_vars, use default parser instead", zap.Error(err))
parser2 = parser.New()
}

isDDL := common.CheckIsDDL(string(ev.Query), parser2)
if isDDL {
if latestGSet == nil {
Expand Down
7 changes: 2 additions & 5 deletions pkg/binlog/reader/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,11 @@ func (t *testTCPReaderSuite) TestGetGTIDsForPos(c *C) {
endPos, endGS, err := utils.GetMasterStatus(t.db, flavor)
c.Assert(err, IsNil)

parser2, err := utils.GetParser(t.db, false)
c.Assert(err, IsNil)

r1 := NewTCPReader(cfg)
c.Assert(r1, NotNil)
defer r1.Close()

gs, err := GetGTIDsForPos(ctx, r1, endPos, parser2)
gs, err := GetGTIDsForPos(ctx, r1, endPos)
c.Assert(err, IsNil)
c.Assert(gs.Equal(endGS), IsTrue)

Expand All @@ -62,7 +59,7 @@ func (t *testTCPReaderSuite) TestGetGTIDsForPos(c *C) {
gs, err = GetGTIDsForPos(ctx, r2, gmysql.Position{
Name: endPos.Name,
Pos: endPos.Pos - 1,
}, parser2)
})
c.Assert(err, ErrorMatches, ".*invalid position .* or GTID not enabled in upstream.*")
c.Assert(gs, IsNil)
}
Loading