From 1812a7100eaab41d763399ca583b778621622edd Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Tue, 4 Jan 2022 21:48:35 +0800 Subject: [PATCH] sink(ticdc): adjust sql mode compatibility for mysql sink (#3938) (#4197) --- cdc/sink/mysql.go | 64 +++++++++++------------ cdc/sink/mysql_params_test.go | 6 +-- cdc/sink/mysql_test.go | 30 +++++------ pkg/applier/redo_test.go | 3 ++ tests/mq_protocol_tests/framework/task.go | 11 ++-- 5 files changed, 58 insertions(+), 56 deletions(-) diff --git a/cdc/sink/mysql.go b/cdc/sink/mysql.go index 5ad82c22bf5..a902b137148 100644 --- a/cdc/sink/mysql.go +++ b/cdc/sink/mysql.go @@ -31,6 +31,7 @@ import ( "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sink/common" + dmutils "github.com/pingcap/tiflow/dm/pkg/utils" "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/cyclic" "github.com/pingcap/tiflow/pkg/cyclic/mark" @@ -130,6 +131,30 @@ func newMySQLSink( } defer testDB.Close() + // Adjust sql_mode for compatibility. + dsn.Params["sql_mode"], err = querySQLMode(ctx, testDB) + if err != nil { + return nil, errors.Trace(err) + } + dsn.Params["sql_mode"], err = dmutils.AdjustSQLModeCompatible(dsn.Params["sql_mode"]) + if err != nil { + return nil, errors.Trace(err) + } + + // Adjust sql_mode for cyclic replication. + var sinkCyclic *cyclic.Cyclic = nil + if val, ok := opts[mark.OptCyclicConfig]; ok { + cfg := new(config.CyclicConfig) + err := cfg.Unmarshal([]byte(val)) + if err != nil { + return nil, cerror.WrapError(cerror.ErrMySQLInvalidConfig, err) + } + sinkCyclic = cyclic.NewCyclic(cfg) + dsn.Params["sql_mode"] = cyclic.RelaxSQLMode(dsn.Params["sql_mode"]) + } + // NOTE: quote the string is necessary to avoid ambiguities. + dsn.Params["sql_mode"] = strconv.Quote(dsn.Params["sql_mode"]) + dsnStr, err = generateDSNByParams(ctx, dsn, params, testDB) if err != nil { return nil, errors.Trace(err) @@ -157,6 +182,7 @@ func newMySQLSink( db: db, params: params, filter: filter, + cyclic: sinkCyclic, txnCache: common.NewUnresolvedTxnCache(), statistics: NewStatistics(ctx, "mysql", opts), metricConflictDetectDurationHis: metricConflictDetectDurationHis, @@ -166,25 +192,10 @@ func newMySQLSink( cancel: cancel, } - if val, ok := opts[mark.OptCyclicConfig]; ok { - cfg := new(config.CyclicConfig) - err := cfg.Unmarshal([]byte(val)) - if err != nil { - return nil, cerror.WrapError(cerror.ErrMySQLInvalidConfig, err) - } - sink.cyclic = cyclic.NewCyclic(cfg) - - err = sink.adjustSQLMode(ctx) - if err != nil { - return nil, errors.Trace(err) - } - } - sink.execWaitNotifier = new(notify.Notifier) sink.resolvedNotifier = new(notify.Notifier) err = sink.createSinkWorkers(ctx) - if err != nil { return nil, err } @@ -348,26 +359,13 @@ func needSwitchDB(ddl *model.DDLEvent) bool { return true } -// adjustSQLMode adjust sql mode according to sink config. -func (s *mysqlSink) adjustSQLMode(ctx context.Context) error { - // Must relax sql mode to support cyclic replication, as downstream may have - // extra columns (not null and no default value). - if s.cyclic == nil || !s.cyclic.Enabled() { - return nil - } - var oldMode, newMode string - row := s.db.QueryRowContext(ctx, "SELECT @@SESSION.sql_mode;") - err := row.Scan(&oldMode) - if err != nil { - return cerror.WrapError(cerror.ErrMySQLQueryError, err) - } - - newMode = cyclic.RelaxSQLMode(oldMode) - _, err = s.db.ExecContext(ctx, fmt.Sprintf("SET sql_mode = '%s';", newMode)) +func querySQLMode(ctx context.Context, db *sql.DB) (sqlMode string, err error) { + row := db.QueryRowContext(ctx, "SELECT @@SESSION.sql_mode;") + err = row.Scan(&sqlMode) if err != nil { - return cerror.WrapError(cerror.ErrMySQLQueryError, err) + err = cerror.WrapError(cerror.ErrMySQLQueryError, err) } - return nil + return } func (s *mysqlSink) createSinkWorkers(ctx context.Context) error { diff --git a/cdc/sink/mysql_params_test.go b/cdc/sink/mysql_params_test.go index dadc6dc8fa4..63f408f722a 100644 --- a/cdc/sink/mysql_params_test.go +++ b/cdc/sink/mysql_params_test.go @@ -62,7 +62,7 @@ func TestGenerateDSNByParams(t *testing.T) { defer testleak.AfterTestT(t)() testDefaultParams := func() { - db, err := mockTestDB() + db, err := mockTestDB(false) require.Nil(t, err) defer db.Close() @@ -84,7 +84,7 @@ func TestGenerateDSNByParams(t *testing.T) { } testTimezoneParam := func() { - db, err := mockTestDB() + db, err := mockTestDB(false) require.Nil(t, err) defer db.Close() @@ -98,7 +98,7 @@ func TestGenerateDSNByParams(t *testing.T) { } testTimeoutParams := func() { - db, err := mockTestDB() + db, err := mockTestDB(false) require.Nil(t, err) defer db.Close() diff --git a/cdc/sink/mysql_test.go b/cdc/sink/mysql_test.go index e9965144446..cf42a6cb8c8 100644 --- a/cdc/sink/mysql_test.go +++ b/cdc/sink/mysql_test.go @@ -427,12 +427,17 @@ func TestReduceReplace(t *testing.T) { } } -func mockTestDB() (*sql.DB, error) { +func mockTestDB(adjustSQLMode bool) (*sql.DB, error) { // mock for test db, which is used querying TiDB session variable db, mock, err := sqlmock.New() if err != nil { return nil, err } + if adjustSQLMode { + mock.ExpectQuery("SELECT @@SESSION.sql_mode;"). + WillReturnRows(sqlmock.NewRows([]string{"@@SESSION.sql_mode"}). + AddRow("ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE")) + } columns := []string{"Variable_name", "Value"} mock.ExpectQuery("show session variables like 'allow_auto_random_explicit_insert';").WillReturnRows( sqlmock.NewRows(columns).AddRow("allow_auto_random_explicit_insert", "0"), @@ -455,18 +460,13 @@ func TestAdjustSQLMode(t *testing.T) { }() if dbIndex == 0 { // test db - db, err := mockTestDB() + db, err := mockTestDB(true) require.Nil(t, err) return db, nil } // normal db db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) require.Nil(t, err) - mock.ExpectQuery("SELECT @@SESSION.sql_mode;"). - WillReturnRows(sqlmock.NewRows([]string{"@@SESSION.sql_mode"}). - AddRow("ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE")) - mock.ExpectExec("SET sql_mode = 'ONLY_FULL_GROUP_BY,NO_ZERO_IN_DATE,NO_ZERO_DATE';"). - WillReturnResult(sqlmock.NewResult(0, 0)) mock.ExpectClose() return db, nil } @@ -572,7 +572,7 @@ func TestNewMySQLSinkExecDML(t *testing.T) { }() if dbIndex == 0 { // test db - db, err := mockTestDB() + db, err := mockTestDB(true) require.Nil(t, err) return db, nil } @@ -716,7 +716,7 @@ func TestExecDMLRollbackErrDatabaseNotExists(t *testing.T) { }() if dbIndex == 0 { // test db - db, err := mockTestDB() + db, err := mockTestDB(true) require.Nil(t, err) return db, nil } @@ -782,7 +782,7 @@ func TestExecDMLRollbackErrTableNotExists(t *testing.T) { }() if dbIndex == 0 { // test db - db, err := mockTestDB() + db, err := mockTestDB(true) require.Nil(t, err) return db, nil } @@ -848,7 +848,7 @@ func TestExecDMLRollbackErrRetryable(t *testing.T) { }() if dbIndex == 0 { // test db - db, err := mockTestDB() + db, err := mockTestDB(true) require.Nil(t, err) return db, nil } @@ -897,7 +897,7 @@ func TestNewMySQLSinkExecDDL(t *testing.T) { }() if dbIndex == 0 { // test db - db, err := mockTestDB() + db, err := mockTestDB(true) require.Nil(t, err) return db, nil } @@ -1027,7 +1027,7 @@ func TestNewMySQLSink(t *testing.T) { }() if dbIndex == 0 { // test db - db, err := mockTestDB() + db, err := mockTestDB(true) require.Nil(t, err) return db, nil } @@ -1066,7 +1066,7 @@ func TestMySQLSinkClose(t *testing.T) { }() if dbIndex == 0 { // test db - db, err := mockTestDB() + db, err := mockTestDB(true) require.Nil(t, err) return db, nil } @@ -1106,7 +1106,7 @@ func TestMySQLSinkFlushResovledTs(t *testing.T) { }() if dbIndex == 0 { // test db - db, err := mockTestDB() + db, err := mockTestDB(true) require.Nil(t, err) return db, nil } diff --git a/pkg/applier/redo_test.go b/pkg/applier/redo_test.go index 158163476b1..5d2466bd27e 100644 --- a/pkg/applier/redo_test.go +++ b/pkg/applier/redo_test.go @@ -127,6 +127,9 @@ func TestApplyDMLs(t *testing.T) { if err != nil { return nil, err } + mock.ExpectQuery("SELECT @@SESSION.sql_mode;"). + WillReturnRows(sqlmock.NewRows([]string{"@@SESSION.sql_mode"}). + AddRow("ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE")) columns := []string{"Variable_name", "Value"} mock.ExpectQuery("show session variables like 'allow_auto_random_explicit_insert';").WillReturnRows( sqlmock.NewRows(columns).AddRow("allow_auto_random_explicit_insert", "0"), diff --git a/tests/mq_protocol_tests/framework/task.go b/tests/mq_protocol_tests/framework/task.go index c122af35e04..c6ece257fa1 100644 --- a/tests/mq_protocol_tests/framework/task.go +++ b/tests/mq_protocol_tests/framework/task.go @@ -16,6 +16,8 @@ package framework import ( "context" "database/sql" + "fmt" + "strconv" "strings" _ "github.com/go-sql-driver/mysql" // imported for side effects @@ -82,20 +84,19 @@ func (c *TaskContext) SQLHelper() *SQLHelper { func (p *CDCProfile) String() string { builder := strings.Builder{} builder.WriteString("cli changefeed create ") + if p.PDUri == "" { p.PDUri = "http://127.0.0.1:2379" } - - builder.WriteString("--pd=" + p.PDUri + " ") + builder.WriteString(fmt.Sprintf("--pd=%s ", strconv.Quote(p.PDUri))) if p.SinkURI == "" { log.Fatal("SinkURI cannot be empty!") } - - builder.WriteString("--sink-uri=\"" + p.SinkURI + "\" ") + builder.WriteString(fmt.Sprintf("--sink-uri=%s ", strconv.Quote(p.SinkURI))) if p.ConfigFile != "" { - builder.WriteString("--config=" + p.ConfigFile + " ") + builder.WriteString(fmt.Sprintf("--config=%s ", strconv.Quote(p.ConfigFile))) } if p.Opts == nil || len(p.Opts) == 0 {