Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sink(ticdc): adjust sql mode compatibility for mysql sink #3938

Merged
merged 27 commits into from
Jan 3, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
4e3a713
adjust sql mode compatibility for mysql sink
hicqu Dec 17, 2021
5813381
Merge branch 'master' into sql-mode-compatible
hicqu Dec 17, 2021
9dc5f2f
fix all tests
hicqu Dec 17, 2021
6da6c37
Merge branch 'master' into sql-mode-compatible
hicqu Dec 17, 2021
d6cd925
Merge branch 'master' into sql-mode-compatible
hicqu Dec 21, 2021
71c7ba2
fix tests
hicqu Dec 21, 2021
72da0d9
Merge branch 'master' into sql-mode-compatible
hicqu Dec 21, 2021
17fa0b2
Merge branch 'master' into sql-mode-compatible
hicqu Dec 22, 2021
5a98a21
fix
hicqu Dec 24, 2021
1bae078
Merge branch 'master' into sql-mode-compatible
hicqu Dec 24, 2021
d8b9e4b
fix
hicqu Dec 24, 2021
50c5517
fix tests
hicqu Dec 24, 2021
4d2e346
Merge branch 'master' into sql-mode-compatible
hicqu Dec 24, 2021
9fb6111
fix tests
hicqu Dec 24, 2021
51d64f9
Merge branch 'master' into sql-mode-compatible
hicqu Dec 29, 2021
01ff0fb
fix lint
hicqu Dec 29, 2021
c89a66b
Merge branch 'sql-mode-compatible' of github.com:hicqu/ticdc into sql…
hicqu Dec 29, 2021
2a3b252
fix integration test framework
hicqu Dec 29, 2021
dc4bdb1
Merge branch 'master' into sql-mode-compatible
hicqu Dec 29, 2021
d664b62
Merge branch 'master' into sql-mode-compatible
hicqu Dec 31, 2021
2507f09
fix test framework
hicqu Dec 31, 2021
d60f183
Merge branch 'master' into sql-mode-compatible
hicqu Dec 31, 2021
dd07786
fix a typo
hicqu Dec 31, 2021
831cb70
fix a bug
hicqu Dec 31, 2021
affddc6
Merge branch 'master' into sql-mode-compatible
hicqu Dec 31, 2021
c64112c
Merge branch 'master' into sql-mode-compatible
ti-chi-bot Dec 31, 2021
c61de51
Merge branch 'master' into sql-mode-compatible
ti-chi-bot Jan 3, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 25 additions & 2 deletions cdc/sink/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/cdc/sink/common"
dmutils "github.com/pingcap/ticdc/dm/pkg/utils"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/cyclic"
"github.com/pingcap/ticdc/pkg/cyclic/mark"
Expand Down Expand Up @@ -166,6 +167,10 @@ func newMySQLSink(
cancel: cancel,
}

if err = sink.adjustSQLModeCompatible(ctx); err != nil {
return nil, errors.Trace(err)
}

if val, ok := opts[mark.OptCyclicConfig]; ok {
cfg := new(config.CyclicConfig)
err := cfg.Unmarshal([]byte(val))
Expand All @@ -174,7 +179,7 @@ func newMySQLSink(
}
sink.cyclic = cyclic.NewCyclic(cfg)

err = sink.adjustSQLMode(ctx)
err = sink.adjustSQLModeCyclic(ctx)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -348,8 +353,26 @@ func needSwitchDB(ddl *model.DDLEvent) bool {
return true
}

func (s *mysqlSink) adjustSQLModeCompatible(ctx context.Context) error {
var oldMode, newMode string
row := s.db.QueryRowContext(ctx, "SELECT @@SESSION.sql_mode;")
hicqu marked this conversation as resolved.
Show resolved Hide resolved
err := row.Scan(&oldMode)
if err != nil {
return cerror.WrapError(cerror.ErrMySQLQueryError, err)
}
newMode, err = dmutils.AdjustSQLModeCompatible(oldMode)
if err != nil {
return cerror.WrapError(cerror.ErrMySQLQueryError, err)
}
_, err = s.db.ExecContext(ctx, fmt.Sprintf("SET sql_mode = '%s';", newMode))
if err != nil {
return cerror.WrapError(cerror.ErrMySQLQueryError, err)
}
return nil
}

// adjustSQLMode adjust sql mode according to sink config.
func (s *mysqlSink) adjustSQLMode(ctx context.Context) error {
func (s *mysqlSink) adjustSQLModeCyclic(ctx context.Context) error {
// Must relax sql mode to support cyclic replication, as downstream may have
// extra columns (not null and no default value).
if s.cyclic == nil || !s.cyclic.Enabled() {
Expand Down
41 changes: 30 additions & 11 deletions cdc/sink/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,20 @@ func mockTestDB() (*sql.DB, error) {
return db, nil
}

func mockDBWithAdjustedSQLMode() (*sql.DB, sqlmock.Sqlmock, error) {
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
if err != nil {
return db, mock, err
}
// sql mode is adjust for compatibility.
mock.ExpectQuery("SELECT @@SESSION.sql_mode;").
WillReturnRows(sqlmock.NewRows([]string{"@@SESSION.sql_mode"}).
AddRow("STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE"))
mock.ExpectExec("SET sql_mode = 'ALLOW_INVALID_DATES,IGNORE_SPACE,NO_AUTO_VALUE_ON_ZERO';").
WillReturnResult(sqlmock.NewResult(0, 0))
return db, mock, err
}

func TestAdjustSQLMode(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand All @@ -460,12 +474,14 @@ func TestAdjustSQLMode(t *testing.T) {
return db, nil
}
// normal db
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
db, mock, err := mockDBWithAdjustedSQLMode()
require.Nil(t, err)

// sql mode is adjust for cyclic.
mock.ExpectQuery("SELECT @@SESSION.sql_mode;").
WillReturnRows(sqlmock.NewRows([]string{"@@SESSION.sql_mode"}).
AddRow("ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE"))
mock.ExpectExec("SET sql_mode = 'ONLY_FULL_GROUP_BY,NO_ZERO_IN_DATE,NO_ZERO_DATE';").
AddRow("ALLOW_INVALID_DATES,IGNORE_SPACE,NO_AUTO_VALUE_ON_ZERO"))
mock.ExpectExec("SET sql_mode = 'ALLOW_INVALID_DATES,IGNORE_SPACE,NO_AUTO_VALUE_ON_ZERO';").
WillReturnResult(sqlmock.NewResult(0, 0))
mock.ExpectClose()
return db, nil
Expand Down Expand Up @@ -577,7 +593,7 @@ func TestNewMySQLSinkExecDML(t *testing.T) {
return db, nil
}
// normal db
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
db, mock, err := mockDBWithAdjustedSQLMode()
require.Nil(t, err)
mock.ExpectBegin()
mock.ExpectExec("REPLACE INTO `s1`.`t1`(`a`,`b`) VALUES (?,?),(?,?)").
Expand Down Expand Up @@ -721,7 +737,7 @@ func TestExecDMLRollbackErrDatabaseNotExists(t *testing.T) {
return db, nil
}
// normal db
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
db, mock, err := mockDBWithAdjustedSQLMode()
require.Nil(t, err)
mock.ExpectBegin()
mock.ExpectExec("REPLACE INTO `s1`.`t1`(`a`) VALUES (?),(?)").
Expand Down Expand Up @@ -787,7 +803,7 @@ func TestExecDMLRollbackErrTableNotExists(t *testing.T) {
return db, nil
}
// normal db
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
db, mock, err := mockDBWithAdjustedSQLMode()
require.Nil(t, err)
mock.ExpectBegin()
mock.ExpectExec("REPLACE INTO `s1`.`t1`(`a`) VALUES (?),(?)").
Expand Down Expand Up @@ -853,7 +869,7 @@ func TestExecDMLRollbackErrRetryable(t *testing.T) {
return db, nil
}
// normal db
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
db, mock, err := mockDBWithAdjustedSQLMode()
require.Nil(t, err)
for i := 0; i < defaultDMLMaxRetryTime; i++ {
mock.ExpectBegin()
Expand Down Expand Up @@ -902,7 +918,7 @@ func TestNewMySQLSinkExecDDL(t *testing.T) {
return db, nil
}
// normal db
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
db, mock, err := mockDBWithAdjustedSQLMode()
require.Nil(t, err)
mock.ExpectBegin()
mock.ExpectExec("USE `test`;").WillReturnResult(sqlmock.NewResult(1, 1))
Expand Down Expand Up @@ -1032,7 +1048,8 @@ func TestNewMySQLSink(t *testing.T) {
return db, nil
}
// normal db
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
db, mock, err := mockDBWithAdjustedSQLMode()
require.Nil(t, err)
mock.ExpectClose()
require.Nil(t, err)
return db, nil
Expand Down Expand Up @@ -1071,7 +1088,8 @@ func TestMySQLSinkClose(t *testing.T) {
return db, nil
}
// normal db
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
db, mock, err := mockDBWithAdjustedSQLMode()
require.Nil(t, err)
mock.ExpectClose()
require.Nil(t, err)
return db, nil
Expand Down Expand Up @@ -1111,7 +1129,8 @@ func TestMySQLSinkFlushResovledTs(t *testing.T) {
return db, nil
}
// normal db
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
db, mock, err := mockDBWithAdjustedSQLMode()
require.Nil(t, err)
mock.ExpectBegin()
mock.ExpectExec("REPLACE INTO `s1`.`t1`(`a`) VALUES (?)").
WithArgs(1).
Expand Down
2 changes: 2 additions & 0 deletions dm/pkg/utils/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"math"
"math/rand"
"sort"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -618,6 +619,7 @@ func GetSQLModeStrBySQLMode(sqlMode tmysql.SQLMode) string {
sqlModeStr = append(sqlModeStr, str)
}
}
sort.Strings(sqlModeStr)
return strings.Join(sqlModeStr, ",")
}

Expand Down