Skip to content

Commit

Permalink
*: Add sql-mode config for drainer (#511) (#513)
Browse files Browse the repository at this point in the history
If this is setted , will use the same sql-mode to parse DDL statment, and set the same sql-mode at downstream when db-type is mysql.
If this is not setted, it will not set any sql-mode.

(cherry picked from commit 06e378f)
  • Loading branch information
lonng authored and july2993 committed Apr 1, 2019
1 parent ba2724a commit 2db959a
Show file tree
Hide file tree
Showing 14 changed files with 97 additions and 27 deletions.
5 changes: 5 additions & 0 deletions cmd/drainer/drainer.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ pd-urls = "http://127.0.0.1:2379"
# syncer Configuration.
[syncer]

# Assume the upstream sql-mode.
# If this is setted , will use the same sql-mode to parse DDL statment, and set the same sql-mode at downstream when db-type is mysql.
# If this is not setted, it will not set any sql-mode.
# sql-mode = "STRICT_TRANS_TABLES,NO_ENGINE_SUBSTITUTION"

# disable sync these schema
ignore-schemas = "INFORMATION_SCHEMA,PERFORMANCE_SCHEMA,mysql"

Expand Down
13 changes: 12 additions & 1 deletion drainer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"github.com/BurntSushi/toml"
"github.com/ngaut/log"
"github.com/pingcap/errors"
"github.com/pingcap/parser/mysql"

"github.com/pingcap/tidb-binlog/drainer/executor"
"github.com/pingcap/tidb-binlog/pkg/filter"
"github.com/pingcap/tidb-binlog/pkg/flags"
Expand Down Expand Up @@ -42,6 +44,8 @@ var (

// SyncerConfig is the Syncer's configuration.
type SyncerConfig struct {
StrSQLMode *string `toml:"sql-mode" json:"sql-mode"`
SQLMode mysql.SQLMode `toml:"-" json:"-"`
IgnoreSchemas string `toml:"ignore-schemas" json:"ignore-schemas"`
TxnBatch int `toml:"txn-batch" json:"txn-batch"`
WorkerCount int `toml:"worker-count" json:"worker-count"`
Expand Down Expand Up @@ -81,7 +85,7 @@ func NewConfig() *Config {

cfg := &Config{
EtcdTimeout: defaultEtcdTimeout,
SyncerCfg: new(SyncerConfig),
SyncerCfg: &SyncerConfig{},
}
cfg.FlagSet = flag.NewFlagSet("drainer", flag.ContinueOnError)
fs := cfg.FlagSet
Expand Down Expand Up @@ -156,6 +160,13 @@ func (cfg *Config) Parse(args []string) error {
return errors.Trace(err)
}

if cfg.SyncerCfg.StrSQLMode != nil {
cfg.SyncerCfg.SQLMode, err = mysql.GetSQLMode(*cfg.SyncerCfg.StrSQLMode)
if err != nil {
return errors.Annotate(err, "invalid config: `sql-mode` must be a valid SQL_MODE")
}
}

cfg.tls, err = cfg.Security.ToTLSConfig()
if err != nil {
return errors.Errorf("tls config %+v error %v", cfg.Security, err)
Expand Down
25 changes: 25 additions & 0 deletions drainer/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"testing"

. "github.com/pingcap/check"
"github.com/pingcap/parser/mysql"
)

// Hook up gocheck into the "go test" runner.
Expand All @@ -30,4 +31,28 @@ func (t *testDrainerSuite) TestConfig(c *C) {
c.Assert(cfg.SyncerCfg.TxnBatch, Equals, 1)
c.Assert(cfg.SyncerCfg.DestDBType, Equals, "mysql")
c.Assert(cfg.SyncerCfg.To.Host, Equals, "127.0.0.1")
var strSQLMode *string
c.Assert(cfg.SyncerCfg.StrSQLMode, Equals, strSQLMode)
c.Assert(cfg.SyncerCfg.SQLMode, Equals, mysql.SQLMode(0))
}

func (t *testDrainerSuite) TestValidate(c *C) {
cfg := NewConfig()

cfg.ListenAddr = "http://123:9091"
err := cfg.validate()
c.Assert(err, ErrorMatches, ".*ListenAddr.*")
cfg.ListenAddr = "http://192.168.10.12:9091"

cfg.EtcdURLs = "127.0.0.1:2379,127.0.0.1:2380"
err = cfg.validate()
c.Assert(err, ErrorMatches, ".*EtcdURLs.*")

cfg.EtcdURLs = "http://127.0.0.1,http://192.168.12.12"
err = cfg.validate()
c.Assert(err, ErrorMatches, ".*EtcdURLs.*")

cfg.EtcdURLs = "http://127.0.0.1:2379,http://192.168.12.12:2379"
err = cfg.validate()
c.Assert(err, IsNil)
}
4 changes: 2 additions & 2 deletions drainer/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ type Executor interface {
}

// New returns the an Executor instance by given name
func New(name string, cfg *DBConfig) (Executor, error) {
func New(name string, cfg *DBConfig, sqlMode *string) (Executor, error) {
switch name {
case "mysql", "tidb":
return newMysql(cfg)
return newMysql(cfg, sqlMode)
case "pb":
return newPB(cfg)
case "flash":
Expand Down
4 changes: 2 additions & 2 deletions drainer/executor/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ type mysqlExecutor struct {
*baseError
}

func newMysql(cfg *DBConfig) (Executor, error) {
db, err := pkgsql.OpenDB("mysql", cfg.Host, cfg.Port, cfg.User, cfg.Password)
func newMysql(cfg *DBConfig, sqlMode *string) (Executor, error) {
db, err := pkgsql.OpenDBWithSQLMode("mysql", cfg.Host, cfg.Port, cfg.User, cfg.Password, sqlMode)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
13 changes: 7 additions & 6 deletions drainer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,15 @@ import (
"github.com/ngaut/log"
"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/store/tikv/oracle"
pb "github.com/pingcap/tipb/go-binlog"

"github.com/pingcap/tidb-binlog/drainer/checkpoint"
"github.com/pingcap/tidb-binlog/drainer/executor"
"github.com/pingcap/tidb-binlog/drainer/translator"
"github.com/pingcap/tidb-binlog/pkg/filter"
"github.com/pingcap/tidb-binlog/pkg/loader"
pkgsql "github.com/pingcap/tidb-binlog/pkg/sql"
"github.com/pingcap/tidb/store/tikv/oracle"
pb "github.com/pingcap/tipb/go-binlog"
)

var (
Expand Down Expand Up @@ -130,13 +131,13 @@ func (s *Syncer) checkWait(job *job) bool {

func (s *Syncer) enableSafeModeInitializationPhase() {
// set safeMode to true and useInsert to flase at the first, and will use the config after 5 minutes.
s.translator.SetConfig(true)
s.translator.SetConfig(true, s.cfg.SQLMode)

go func() {
ctx, cancel := context.WithCancel(s.ctx)
defer func() {
cancel()
s.translator.SetConfig(s.cfg.SafeMode)
s.translator.SetConfig(s.cfg.SafeMode, s.cfg.SQLMode)
}()

select {
Expand Down Expand Up @@ -418,7 +419,7 @@ func (s *Syncer) run(jobs []*model.Job) error {
return errors.Trace(err)
}

s.executors, err = createExecutors(s.cfg.DestDBType, s.cfg.To, s.cfg.WorkerCount)
s.executors, err = createExecutors(s.cfg.DestDBType, s.cfg.To, s.cfg.WorkerCount, s.cfg.StrSQLMode)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -428,7 +429,7 @@ func (s *Syncer) run(jobs []*model.Job) error {
return errors.Trace(err)
}

s.translator.SetConfig(s.cfg.SafeMode)
s.translator.SetConfig(s.cfg.SafeMode, s.cfg.SQLMode)
go s.enableSafeModeInitializationPhase()

for i := 0; i < s.cfg.WorkerCount; i++ {
Expand Down
12 changes: 9 additions & 3 deletions drainer/translator/flash.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/pingcap/parser"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/model"
parsermysql "github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb-binlog/pkg/dml"
"github.com/pingcap/tidb-binlog/pkg/util"
"github.com/pingcap/tidb/mysql"
Expand All @@ -19,14 +20,17 @@ import (
)

// flashTranslator translates TiDB binlog to flash sqls
type flashTranslator struct{}
type flashTranslator struct {
sqlMode parsermysql.SQLMode
}

func init() {
Register("flash", &flashTranslator{})
}

// Config set the configuration
func (f *flashTranslator) SetConfig(bool) {
func (f *flashTranslator) SetConfig(_ bool, sqlMode parsermysql.SQLMode) {
f.sqlMode = sqlMode
}

func (f *flashTranslator) GenInsertSQLs(schema string, table *model.TableInfo, rows [][]byte, commitTS int64) ([]string, [][]string, [][]interface{}, error) {
Expand Down Expand Up @@ -191,7 +195,9 @@ func (f *flashTranslator) GenDeleteSQLs(schema string, table *model.TableInfo, r

func (f *flashTranslator) GenDDLSQL(sql string, schema string, commitTS int64) (string, error) {
schema = strings.ToLower(schema)
stmt, err := parser.New().ParseOneStmt(sql, "", "")
ddlParser := parser.New()
ddlParser.SetSQLMode(f.sqlMode)
stmt, err := ddlParser.ParseOneStmt(sql, "", "")
if err != nil {
return "", errors.Trace(err)
}
Expand Down
3 changes: 2 additions & 1 deletion drainer/translator/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/ngaut/log"
"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
parsermysql "github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb-binlog/pkg/util"
obinlog "github.com/pingcap/tidb-tools/tidb-binlog/slave_binlog_proto/go-binlog"
"github.com/pingcap/tidb/mysql"
Expand All @@ -24,7 +25,7 @@ func init() {
Register("kafka", &kafkaTranslator{})
}

func (p *kafkaTranslator) SetConfig(bool) {
func (p *kafkaTranslator) SetConfig(bool, parsermysql.SQLMode) {
// do nothing
}

Expand Down
9 changes: 7 additions & 2 deletions drainer/translator/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/pingcap/parser"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/model"
parsermysql "github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb-binlog/pkg/dml"
"github.com/pingcap/tidb-binlog/pkg/util"
"github.com/pingcap/tidb/mysql"
Expand All @@ -27,19 +28,21 @@ const implicitColID = -1
type mysqlTranslator struct {
// safeMode is a mode for translate sql, will translate update to delete and replace, and translate insert to replace.
safeMode int32
sqlMode parsermysql.SQLMode
}

func init() {
Register("mysql", &mysqlTranslator{})
Register("tidb", &mysqlTranslator{})
}

func (m *mysqlTranslator) SetConfig(safeMode bool) {
func (m *mysqlTranslator) SetConfig(safeMode bool, sqlMode parsermysql.SQLMode) {
if safeMode {
atomic.StoreInt32(&m.safeMode, 1)
} else {
atomic.StoreInt32(&m.safeMode, 0)
}
m.sqlMode = sqlMode
}

func (m *mysqlTranslator) GenInsertSQLs(schema string, table *model.TableInfo, rows [][]byte, commitTS int64) ([]string, [][]string, [][]interface{}, error) {
Expand Down Expand Up @@ -272,7 +275,9 @@ func (m *mysqlTranslator) genDeleteSQL(schema string, table *model.TableInfo, co
}

func (m *mysqlTranslator) GenDDLSQL(sql string, schema string, commitTS int64) (string, error) {
stmt, err := parser.New().ParseOneStmt(sql, "", "")
ddlParser := parser.New()
ddlParser.SetSQLMode(m.sqlMode)
stmt, err := ddlParser.ParseOneStmt(sql, "", "")
if err != nil {
return "", errors.Trace(err)
}
Expand Down
10 changes: 7 additions & 3 deletions drainer/translator/pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/pingcap/parser"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb-binlog/pkg/util"
pb "github.com/pingcap/tidb-binlog/proto/binlog"
"github.com/pingcap/tidb/sessionctx/stmtctx"
Expand All @@ -19,14 +20,15 @@ import (

// pbTranslator translates TiDB binlog to self-description protobuf
type pbTranslator struct {
sqlMode mysql.SQLMode
}

func init() {
Register("pb", &pbTranslator{})
}

func (p *pbTranslator) SetConfig(bool) {
// do nothing
func (p *pbTranslator) SetConfig(_ bool, sqlMode mysql.SQLMode) {
p.sqlMode = sqlMode
}

func (p *pbTranslator) GenInsertSQLs(schema string, table *model.TableInfo, rows [][]byte, commitTS int64) ([]string, [][]string, [][]interface{}, error) {
Expand Down Expand Up @@ -182,7 +184,9 @@ func (p *pbTranslator) GenDeleteSQLs(schema string, table *model.TableInfo, rows
}

func (p *pbTranslator) GenDDLSQL(sql string, schema string, commitTS int64) (string, error) {
stmt, err := parser.New().ParseOneStmt(sql, "", "")
ddlParser := parser.New()
ddlParser.SetSQLMode(p.sqlMode)
stmt, err := ddlParser.ParseOneStmt(sql, "", "")
if err != nil {
return "", errors.Trace(err)
}
Expand Down
3 changes: 2 additions & 1 deletion drainer/translator/translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/ngaut/log"
"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
parsermysql "github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb-binlog/pkg/util"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/table"
Expand Down Expand Up @@ -35,7 +36,7 @@ var providers = make(map[string]SQLTranslator)
// SQLTranslator is the interface for translating TiDB binlog to target sqls
type SQLTranslator interface {
// Config set the configuration
SetConfig(safeMode bool)
SetConfig(safeMode bool, sqlMode parsermysql.SQLMode)

// GenInsertSQLs generates the insert sqls
GenInsertSQLs(schema string, table *model.TableInfo, rows [][]byte, commitTS int64) ([]string, [][]string, [][]interface{}, error)
Expand Down
3 changes: 2 additions & 1 deletion drainer/translator/translator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

. "github.com/pingcap/check"
"github.com/pingcap/parser/model"
parsermysql "github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/tablecodec"
Expand Down Expand Up @@ -57,7 +58,7 @@ func (t *testTranslatorSuite) TestTranslater(c *C) {
}

func testGenInsertSQLs(c *C, s SQLTranslator, safeMode bool) {
s.SetConfig(safeMode)
s.SetConfig(safeMode, parsermysql.ModeStrictTransTables|parsermysql.ModeNoEngineSubstitution)
schema := "t"
tables := []*model.TableInfo{testGenTable("normal"), testGenTable("hasPK"), testGenTable("hasID")}
exceptedKeys := []int{3, 2, 1}
Expand Down
4 changes: 2 additions & 2 deletions drainer/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,10 @@ func closeExecutors(executors ...executor.Executor) {
}
}

func createExecutors(destDBType string, cfg *executor.DBConfig, count int) ([]executor.Executor, error) {
func createExecutors(destDBType string, cfg *executor.DBConfig, count int, sqlMODE *string) ([]executor.Executor, error) {
executors := make([]executor.Executor, 0, count)
for i := 0; i < count; i++ {
executor, err := executor.New(destDBType, cfg)
executor, err := executor.New(destDBType, cfg, sqlMODE)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
16 changes: 13 additions & 3 deletions pkg/sql/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"database/sql"
"fmt"
"net"
"net/url"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -116,17 +117,26 @@ func ExecuteTxnWithHistogram(db *sql.DB, sqls []string, args [][]interface{}, hi
return nil
}

// OpenDB creates an instance of sql.DB.
func OpenDB(proto string, host string, port int, username string, password string) (*sql.DB, error) {
// OpenDBWithSQLMode creates an instance of sql.DB.
func OpenDBWithSQLMode(proto string, host string, port int, username string, password string, sqlMode *string) (*sql.DB, error) {
dbDSN := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4,utf8&multiStatements=true", username, password, host, port)
if sqlMode != nil {
// same as "set sql_mode = '<sqlMode>'"
dbDSN += "&sql_mode='" + url.QueryEscape(*sqlMode) + "'"
}
db, err := sql.Open(proto, dbDSN)
if err != nil {
return nil, errors.Trace(err)
return nil, errors.Annotatef(err, "dsn: %s", dbDSN)
}

return db, nil
}

// OpenDB creates an instance of sql.DB.
func OpenDB(proto string, host string, port int, username string, password string) (*sql.DB, error) {
return OpenDBWithSQLMode(proto, host, port, username, password, nil)
}

// IgnoreDDLError checks the error can be ignored or not.
func IgnoreDDLError(err error) bool {
errCode, ok := GetSQLErrCode(err)
Expand Down

0 comments on commit 2db959a

Please sign in to comment.