Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sink(ticdc): adjust sql mode compatibility for mysql sink #3938

Merged
merged 27 commits into from
Jan 3, 2022
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
4e3a713
adjust sql mode compatibility for mysql sink
hicqu Dec 17, 2021
5813381
Merge branch 'master' into sql-mode-compatible
hicqu Dec 17, 2021
9dc5f2f
fix all tests
hicqu Dec 17, 2021
6da6c37
Merge branch 'master' into sql-mode-compatible
hicqu Dec 17, 2021
d6cd925
Merge branch 'master' into sql-mode-compatible
hicqu Dec 21, 2021
71c7ba2
fix tests
hicqu Dec 21, 2021
72da0d9
Merge branch 'master' into sql-mode-compatible
hicqu Dec 21, 2021
17fa0b2
Merge branch 'master' into sql-mode-compatible
hicqu Dec 22, 2021
5a98a21
fix
hicqu Dec 24, 2021
1bae078
Merge branch 'master' into sql-mode-compatible
hicqu Dec 24, 2021
d8b9e4b
fix
hicqu Dec 24, 2021
50c5517
fix tests
hicqu Dec 24, 2021
4d2e346
Merge branch 'master' into sql-mode-compatible
hicqu Dec 24, 2021
9fb6111
fix tests
hicqu Dec 24, 2021
51d64f9
Merge branch 'master' into sql-mode-compatible
hicqu Dec 29, 2021
01ff0fb
fix lint
hicqu Dec 29, 2021
c89a66b
Merge branch 'sql-mode-compatible' of github.com:hicqu/ticdc into sql…
hicqu Dec 29, 2021
2a3b252
fix integration test framework
hicqu Dec 29, 2021
dc4bdb1
Merge branch 'master' into sql-mode-compatible
hicqu Dec 29, 2021
d664b62
Merge branch 'master' into sql-mode-compatible
hicqu Dec 31, 2021
2507f09
fix test framework
hicqu Dec 31, 2021
d60f183
Merge branch 'master' into sql-mode-compatible
hicqu Dec 31, 2021
dd07786
fix a typo
hicqu Dec 31, 2021
831cb70
fix a bug
hicqu Dec 31, 2021
affddc6
Merge branch 'master' into sql-mode-compatible
hicqu Dec 31, 2021
c64112c
Merge branch 'master' into sql-mode-compatible
ti-chi-bot Dec 31, 2021
c61de51
Merge branch 'master' into sql-mode-compatible
ti-chi-bot Jan 3, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 25 additions & 2 deletions cdc/sink/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/common"
dmutils "github.com/pingcap/tiflow/dm/pkg/utils"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/cyclic"
"github.com/pingcap/tiflow/pkg/cyclic/mark"
Expand Down Expand Up @@ -166,6 +167,10 @@ func newMySQLSink(
cancel: cancel,
}

if err = sink.adjustSQLModeCompatible(ctx); err != nil {
return nil, errors.Trace(err)
}

if val, ok := opts[mark.OptCyclicConfig]; ok {
cfg := new(config.CyclicConfig)
err := cfg.Unmarshal([]byte(val))
Expand All @@ -174,7 +179,7 @@ func newMySQLSink(
}
sink.cyclic = cyclic.NewCyclic(cfg)

err = sink.adjustSQLMode(ctx)
err = sink.adjustSQLModeCyclic(ctx)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -348,8 +353,26 @@ func needSwitchDB(ddl *model.DDLEvent) bool {
return true
}

func (s *mysqlSink) adjustSQLModeCompatible(ctx context.Context) error {
var oldMode, newMode string
row := s.db.QueryRowContext(ctx, "SELECT @@SESSION.sql_mode;")
hicqu marked this conversation as resolved.
Show resolved Hide resolved
err := row.Scan(&oldMode)
if err != nil {
return cerror.WrapError(cerror.ErrMySQLQueryError, err)
}
newMode, err = dmutils.AdjustSQLModeCompatible(oldMode)
if err != nil {
return cerror.WrapError(cerror.ErrMySQLQueryError, err)
}
_, err = s.db.ExecContext(ctx, fmt.Sprintf("SET sql_mode = '%s';", newMode))
if err != nil {
return cerror.WrapError(cerror.ErrMySQLQueryError, err)
}
return nil
}

// adjustSQLMode adjust sql mode according to sink config.
func (s *mysqlSink) adjustSQLMode(ctx context.Context) error {
func (s *mysqlSink) adjustSQLModeCyclic(ctx context.Context) error {
// Must relax sql mode to support cyclic replication, as downstream may have
// extra columns (not null and no default value).
if s.cyclic == nil || !s.cyclic.Enabled() {
Expand Down
37 changes: 26 additions & 11 deletions cdc/sink/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/filter"
"github.com/pingcap/tiflow/pkg/retry"
testutils "github.com/pingcap/tiflow/tests/utils"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -444,6 +445,15 @@ func mockTestDB() (*sql.DB, error) {
return db, nil
}

func mockDBWithAdjustedSQLMode() (*sql.DB, sqlmock.Sqlmock, error) {
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
if err != nil {
return db, mock, err
}
testutils.MustAdjustSQLMode(mock)
return db, mock, err
}

func TestAdjustSQLMode(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand All @@ -460,12 +470,14 @@ func TestAdjustSQLMode(t *testing.T) {
return db, nil
}
// normal db
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
db, mock, err := mockDBWithAdjustedSQLMode()
require.Nil(t, err)

// sql mode is adjust for cyclic.
mock.ExpectQuery("SELECT @@SESSION.sql_mode;").
WillReturnRows(sqlmock.NewRows([]string{"@@SESSION.sql_mode"}).
AddRow("ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE"))
mock.ExpectExec("SET sql_mode = 'ONLY_FULL_GROUP_BY,NO_ZERO_IN_DATE,NO_ZERO_DATE';").
AddRow("ALLOW_INVALID_DATES,IGNORE_SPACE,NO_AUTO_VALUE_ON_ZERO"))
mock.ExpectExec("SET sql_mode = 'ALLOW_INVALID_DATES,IGNORE_SPACE,NO_AUTO_VALUE_ON_ZERO';").
WillReturnResult(sqlmock.NewResult(0, 0))
mock.ExpectClose()
return db, nil
Expand Down Expand Up @@ -577,7 +589,7 @@ func TestNewMySQLSinkExecDML(t *testing.T) {
return db, nil
}
// normal db
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
db, mock, err := mockDBWithAdjustedSQLMode()
require.Nil(t, err)
mock.ExpectBegin()
mock.ExpectExec("REPLACE INTO `s1`.`t1`(`a`,`b`) VALUES (?,?),(?,?)").
Expand Down Expand Up @@ -721,7 +733,7 @@ func TestExecDMLRollbackErrDatabaseNotExists(t *testing.T) {
return db, nil
}
// normal db
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
db, mock, err := mockDBWithAdjustedSQLMode()
require.Nil(t, err)
mock.ExpectBegin()
mock.ExpectExec("REPLACE INTO `s1`.`t1`(`a`) VALUES (?),(?)").
Expand Down Expand Up @@ -787,7 +799,7 @@ func TestExecDMLRollbackErrTableNotExists(t *testing.T) {
return db, nil
}
// normal db
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
db, mock, err := mockDBWithAdjustedSQLMode()
require.Nil(t, err)
mock.ExpectBegin()
mock.ExpectExec("REPLACE INTO `s1`.`t1`(`a`) VALUES (?),(?)").
Expand Down Expand Up @@ -853,7 +865,7 @@ func TestExecDMLRollbackErrRetryable(t *testing.T) {
return db, nil
}
// normal db
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
db, mock, err := mockDBWithAdjustedSQLMode()
require.Nil(t, err)
for i := 0; i < defaultDMLMaxRetryTime; i++ {
mock.ExpectBegin()
Expand Down Expand Up @@ -902,7 +914,7 @@ func TestNewMySQLSinkExecDDL(t *testing.T) {
return db, nil
}
// normal db
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
db, mock, err := mockDBWithAdjustedSQLMode()
require.Nil(t, err)
mock.ExpectBegin()
mock.ExpectExec("USE `test`;").WillReturnResult(sqlmock.NewResult(1, 1))
Expand Down Expand Up @@ -1032,7 +1044,8 @@ func TestNewMySQLSink(t *testing.T) {
return db, nil
}
// normal db
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
db, mock, err := mockDBWithAdjustedSQLMode()
require.Nil(t, err)
mock.ExpectClose()
require.Nil(t, err)
return db, nil
Expand Down Expand Up @@ -1071,7 +1084,8 @@ func TestMySQLSinkClose(t *testing.T) {
return db, nil
}
// normal db
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
db, mock, err := mockDBWithAdjustedSQLMode()
require.Nil(t, err)
mock.ExpectClose()
require.Nil(t, err)
return db, nil
Expand Down Expand Up @@ -1111,7 +1125,8 @@ func TestMySQLSinkFlushResovledTs(t *testing.T) {
return db, nil
}
// normal db
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
db, mock, err := mockDBWithAdjustedSQLMode()
require.Nil(t, err)
mock.ExpectBegin()
mock.ExpectExec("REPLACE INTO `s1`.`t1`(`a`) VALUES (?)").
WithArgs(1).
Expand Down
2 changes: 2 additions & 0 deletions dm/pkg/utils/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"math"
"math/rand"
"sort"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -618,6 +619,7 @@ func GetSQLModeStrBySQLMode(sqlMode tmysql.SQLMode) string {
sqlModeStr = append(sqlModeStr, str)
}
}
sort.Strings(sqlModeStr)
return strings.Join(sqlModeStr, ",")
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/applier/redo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/tiflow/cdc/redo"
"github.com/pingcap/tiflow/cdc/redo/reader"
"github.com/pingcap/tiflow/cdc/sink"
testutils "github.com/pingcap/tiflow/tests/utils"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -140,6 +141,7 @@ func TestApplyDMLs(t *testing.T) {
// normal db
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
require.Nil(t, err)
testutils.MustAdjustSQLMode(mock)
mock.ExpectBegin()
mock.ExpectExec("REPLACE INTO `test`.`t1`(`a`,`b`) VALUES (?,?)").
WithArgs(1, "2").
Expand Down
28 changes: 28 additions & 0 deletions tests/utils/mock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package utils

import (
"github.com/DATA-DOG/go-sqlmock"
)

// MustAdjustSQLMode adjusts SQL mode for compatibility, like what cdc mysql sink does.
func MustAdjustSQLMode(mock sqlmock.Sqlmock) {
// sql mode is adjust for compatibility.
mock.ExpectQuery("SELECT @@SESSION.sql_mode;").
WillReturnRows(sqlmock.NewRows([]string{"@@SESSION.sql_mode"}).
AddRow("STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE"))
mock.ExpectExec("SET sql_mode = 'ALLOW_INVALID_DATES,IGNORE_SPACE,NO_AUTO_VALUE_ON_ZERO';").
WillReturnResult(sqlmock.NewResult(0, 0))
}