Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sink(ticdc): adjust sql mode compatibility for mysql sink (#3938) #4197

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 31 additions & 33 deletions cdc/sink/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/common"
dmutils "github.com/pingcap/tiflow/dm/pkg/utils"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/cyclic"
"github.com/pingcap/tiflow/pkg/cyclic/mark"
Expand Down Expand Up @@ -130,6 +131,30 @@ func newMySQLSink(
}
defer testDB.Close()

// Adjust sql_mode for compatibility.
dsn.Params["sql_mode"], err = querySQLMode(ctx, testDB)
if err != nil {
return nil, errors.Trace(err)
}
dsn.Params["sql_mode"], err = dmutils.AdjustSQLModeCompatible(dsn.Params["sql_mode"])
if err != nil {
return nil, errors.Trace(err)
}

// Adjust sql_mode for cyclic replication.
var sinkCyclic *cyclic.Cyclic = nil
if val, ok := opts[mark.OptCyclicConfig]; ok {
cfg := new(config.CyclicConfig)
err := cfg.Unmarshal([]byte(val))
if err != nil {
return nil, cerror.WrapError(cerror.ErrMySQLInvalidConfig, err)
}
sinkCyclic = cyclic.NewCyclic(cfg)
dsn.Params["sql_mode"] = cyclic.RelaxSQLMode(dsn.Params["sql_mode"])
}
// NOTE: quote the string is necessary to avoid ambiguities.
dsn.Params["sql_mode"] = strconv.Quote(dsn.Params["sql_mode"])

dsnStr, err = generateDSNByParams(ctx, dsn, params, testDB)
if err != nil {
return nil, errors.Trace(err)
Expand Down Expand Up @@ -157,6 +182,7 @@ func newMySQLSink(
db: db,
params: params,
filter: filter,
cyclic: sinkCyclic,
txnCache: common.NewUnresolvedTxnCache(),
statistics: NewStatistics(ctx, "mysql", opts),
metricConflictDetectDurationHis: metricConflictDetectDurationHis,
Expand All @@ -166,25 +192,10 @@ func newMySQLSink(
cancel: cancel,
}

if val, ok := opts[mark.OptCyclicConfig]; ok {
cfg := new(config.CyclicConfig)
err := cfg.Unmarshal([]byte(val))
if err != nil {
return nil, cerror.WrapError(cerror.ErrMySQLInvalidConfig, err)
}
sink.cyclic = cyclic.NewCyclic(cfg)

err = sink.adjustSQLMode(ctx)
if err != nil {
return nil, errors.Trace(err)
}
}

sink.execWaitNotifier = new(notify.Notifier)
sink.resolvedNotifier = new(notify.Notifier)

err = sink.createSinkWorkers(ctx)

if err != nil {
return nil, err
}
Expand Down Expand Up @@ -348,26 +359,13 @@ func needSwitchDB(ddl *model.DDLEvent) bool {
return true
}

// adjustSQLMode adjust sql mode according to sink config.
func (s *mysqlSink) adjustSQLMode(ctx context.Context) error {
// Must relax sql mode to support cyclic replication, as downstream may have
// extra columns (not null and no default value).
if s.cyclic == nil || !s.cyclic.Enabled() {
return nil
}
var oldMode, newMode string
row := s.db.QueryRowContext(ctx, "SELECT @@SESSION.sql_mode;")
err := row.Scan(&oldMode)
if err != nil {
return cerror.WrapError(cerror.ErrMySQLQueryError, err)
}

newMode = cyclic.RelaxSQLMode(oldMode)
_, err = s.db.ExecContext(ctx, fmt.Sprintf("SET sql_mode = '%s';", newMode))
func querySQLMode(ctx context.Context, db *sql.DB) (sqlMode string, err error) {
row := db.QueryRowContext(ctx, "SELECT @@SESSION.sql_mode;")
err = row.Scan(&sqlMode)
if err != nil {
return cerror.WrapError(cerror.ErrMySQLQueryError, err)
err = cerror.WrapError(cerror.ErrMySQLQueryError, err)
}
return nil
return
}

func (s *mysqlSink) createSinkWorkers(ctx context.Context) error {
Expand Down
6 changes: 3 additions & 3 deletions cdc/sink/mysql_params_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestGenerateDSNByParams(t *testing.T) {
defer testleak.AfterTestT(t)()

testDefaultParams := func() {
db, err := mockTestDB()
db, err := mockTestDB(false)
require.Nil(t, err)
defer db.Close()

Expand All @@ -84,7 +84,7 @@ func TestGenerateDSNByParams(t *testing.T) {
}

testTimezoneParam := func() {
db, err := mockTestDB()
db, err := mockTestDB(false)
require.Nil(t, err)
defer db.Close()

Expand All @@ -98,7 +98,7 @@ func TestGenerateDSNByParams(t *testing.T) {
}

testTimeoutParams := func() {
db, err := mockTestDB()
db, err := mockTestDB(false)
require.Nil(t, err)
defer db.Close()

Expand Down
30 changes: 15 additions & 15 deletions cdc/sink/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,12 +427,17 @@ func TestReduceReplace(t *testing.T) {
}
}

func mockTestDB() (*sql.DB, error) {
func mockTestDB(adjustSQLMode bool) (*sql.DB, error) {
// mock for test db, which is used querying TiDB session variable
db, mock, err := sqlmock.New()
if err != nil {
return nil, err
}
if adjustSQLMode {
mock.ExpectQuery("SELECT @@SESSION.sql_mode;").
WillReturnRows(sqlmock.NewRows([]string{"@@SESSION.sql_mode"}).
AddRow("ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE"))
}
columns := []string{"Variable_name", "Value"}
mock.ExpectQuery("show session variables like 'allow_auto_random_explicit_insert';").WillReturnRows(
sqlmock.NewRows(columns).AddRow("allow_auto_random_explicit_insert", "0"),
Expand All @@ -455,18 +460,13 @@ func TestAdjustSQLMode(t *testing.T) {
}()
if dbIndex == 0 {
// test db
db, err := mockTestDB()
db, err := mockTestDB(true)
require.Nil(t, err)
return db, nil
}
// normal db
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
require.Nil(t, err)
mock.ExpectQuery("SELECT @@SESSION.sql_mode;").
WillReturnRows(sqlmock.NewRows([]string{"@@SESSION.sql_mode"}).
AddRow("ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE"))
mock.ExpectExec("SET sql_mode = 'ONLY_FULL_GROUP_BY,NO_ZERO_IN_DATE,NO_ZERO_DATE';").
WillReturnResult(sqlmock.NewResult(0, 0))
mock.ExpectClose()
return db, nil
}
Expand Down Expand Up @@ -572,7 +572,7 @@ func TestNewMySQLSinkExecDML(t *testing.T) {
}()
if dbIndex == 0 {
// test db
db, err := mockTestDB()
db, err := mockTestDB(true)
require.Nil(t, err)
return db, nil
}
Expand Down Expand Up @@ -716,7 +716,7 @@ func TestExecDMLRollbackErrDatabaseNotExists(t *testing.T) {
}()
if dbIndex == 0 {
// test db
db, err := mockTestDB()
db, err := mockTestDB(true)
require.Nil(t, err)
return db, nil
}
Expand Down Expand Up @@ -782,7 +782,7 @@ func TestExecDMLRollbackErrTableNotExists(t *testing.T) {
}()
if dbIndex == 0 {
// test db
db, err := mockTestDB()
db, err := mockTestDB(true)
require.Nil(t, err)
return db, nil
}
Expand Down Expand Up @@ -848,7 +848,7 @@ func TestExecDMLRollbackErrRetryable(t *testing.T) {
}()
if dbIndex == 0 {
// test db
db, err := mockTestDB()
db, err := mockTestDB(true)
require.Nil(t, err)
return db, nil
}
Expand Down Expand Up @@ -897,7 +897,7 @@ func TestNewMySQLSinkExecDDL(t *testing.T) {
}()
if dbIndex == 0 {
// test db
db, err := mockTestDB()
db, err := mockTestDB(true)
require.Nil(t, err)
return db, nil
}
Expand Down Expand Up @@ -1027,7 +1027,7 @@ func TestNewMySQLSink(t *testing.T) {
}()
if dbIndex == 0 {
// test db
db, err := mockTestDB()
db, err := mockTestDB(true)
require.Nil(t, err)
return db, nil
}
Expand Down Expand Up @@ -1066,7 +1066,7 @@ func TestMySQLSinkClose(t *testing.T) {
}()
if dbIndex == 0 {
// test db
db, err := mockTestDB()
db, err := mockTestDB(true)
require.Nil(t, err)
return db, nil
}
Expand Down Expand Up @@ -1106,7 +1106,7 @@ func TestMySQLSinkFlushResovledTs(t *testing.T) {
}()
if dbIndex == 0 {
// test db
db, err := mockTestDB()
db, err := mockTestDB(true)
require.Nil(t, err)
return db, nil
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/applier/redo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,9 @@ func TestApplyDMLs(t *testing.T) {
if err != nil {
return nil, err
}
mock.ExpectQuery("SELECT @@SESSION.sql_mode;").
WillReturnRows(sqlmock.NewRows([]string{"@@SESSION.sql_mode"}).
AddRow("ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE"))
columns := []string{"Variable_name", "Value"}
mock.ExpectQuery("show session variables like 'allow_auto_random_explicit_insert';").WillReturnRows(
sqlmock.NewRows(columns).AddRow("allow_auto_random_explicit_insert", "0"),
Expand Down
11 changes: 6 additions & 5 deletions tests/mq_protocol_tests/framework/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package framework
import (
"context"
"database/sql"
"fmt"
"strconv"
"strings"

_ "github.com/go-sql-driver/mysql" // imported for side effects
Expand Down Expand Up @@ -82,20 +84,19 @@ func (c *TaskContext) SQLHelper() *SQLHelper {
func (p *CDCProfile) String() string {
builder := strings.Builder{}
builder.WriteString("cli changefeed create ")

if p.PDUri == "" {
p.PDUri = "http://127.0.0.1:2379"
}

builder.WriteString("--pd=" + p.PDUri + " ")
builder.WriteString(fmt.Sprintf("--pd=%s ", strconv.Quote(p.PDUri)))

if p.SinkURI == "" {
log.Fatal("SinkURI cannot be empty!")
}

builder.WriteString("--sink-uri=\"" + p.SinkURI + "\" ")
builder.WriteString(fmt.Sprintf("--sink-uri=%s ", strconv.Quote(p.SinkURI)))

if p.ConfigFile != "" {
builder.WriteString("--config=" + p.ConfigFile + " ")
builder.WriteString(fmt.Sprintf("--config=%s ", strconv.Quote(p.ConfigFile)))
}

if p.Opts == nil || len(p.Opts) == 0 {
Expand Down