Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sink(ticdc): adjust sql mode compatibility for mysql sink #3938

Merged
merged 27 commits into from
Jan 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
4e3a713
adjust sql mode compatibility for mysql sink
hicqu Dec 17, 2021
5813381
Merge branch 'master' into sql-mode-compatible
hicqu Dec 17, 2021
9dc5f2f
fix all tests
hicqu Dec 17, 2021
6da6c37
Merge branch 'master' into sql-mode-compatible
hicqu Dec 17, 2021
d6cd925
Merge branch 'master' into sql-mode-compatible
hicqu Dec 21, 2021
71c7ba2
fix tests
hicqu Dec 21, 2021
72da0d9
Merge branch 'master' into sql-mode-compatible
hicqu Dec 21, 2021
17fa0b2
Merge branch 'master' into sql-mode-compatible
hicqu Dec 22, 2021
5a98a21
fix
hicqu Dec 24, 2021
1bae078
Merge branch 'master' into sql-mode-compatible
hicqu Dec 24, 2021
d8b9e4b
fix
hicqu Dec 24, 2021
50c5517
fix tests
hicqu Dec 24, 2021
4d2e346
Merge branch 'master' into sql-mode-compatible
hicqu Dec 24, 2021
9fb6111
fix tests
hicqu Dec 24, 2021
51d64f9
Merge branch 'master' into sql-mode-compatible
hicqu Dec 29, 2021
01ff0fb
fix lint
hicqu Dec 29, 2021
c89a66b
Merge branch 'sql-mode-compatible' of github.com:hicqu/ticdc into sql…
hicqu Dec 29, 2021
2a3b252
fix integration test framework
hicqu Dec 29, 2021
dc4bdb1
Merge branch 'master' into sql-mode-compatible
hicqu Dec 29, 2021
d664b62
Merge branch 'master' into sql-mode-compatible
hicqu Dec 31, 2021
2507f09
fix test framework
hicqu Dec 31, 2021
d60f183
Merge branch 'master' into sql-mode-compatible
hicqu Dec 31, 2021
dd07786
fix a typo
hicqu Dec 31, 2021
831cb70
fix a bug
hicqu Dec 31, 2021
affddc6
Merge branch 'master' into sql-mode-compatible
hicqu Dec 31, 2021
c64112c
Merge branch 'master' into sql-mode-compatible
ti-chi-bot Dec 31, 2021
c61de51
Merge branch 'master' into sql-mode-compatible
ti-chi-bot Jan 3, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 31 additions & 33 deletions cdc/sink/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/common"
dmutils "github.com/pingcap/tiflow/dm/pkg/utils"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/cyclic"
"github.com/pingcap/tiflow/pkg/cyclic/mark"
Expand Down Expand Up @@ -130,6 +131,30 @@ func newMySQLSink(
}
defer testDB.Close()

// Adjust sql_mode for compatibility.
dsn.Params["sql_mode"], err = querySQLMode(ctx, testDB)
if err != nil {
return nil, errors.Trace(err)
}
dsn.Params["sql_mode"], err = dmutils.AdjustSQLModeCompatible(dsn.Params["sql_mode"])
if err != nil {
return nil, errors.Trace(err)
}

// Adjust sql_mode for cyclic replication.
var sinkCyclic *cyclic.Cyclic = nil
if val, ok := opts[mark.OptCyclicConfig]; ok {
cfg := new(config.CyclicConfig)
err := cfg.Unmarshal([]byte(val))
if err != nil {
return nil, cerror.WrapError(cerror.ErrMySQLInvalidConfig, err)
}
sinkCyclic = cyclic.NewCyclic(cfg)
dsn.Params["sql_mode"] = cyclic.RelaxSQLMode(dsn.Params["sql_mode"])
hicqu marked this conversation as resolved.
Show resolved Hide resolved
}
// NOTE: quote the string is necessary to avoid ambiguities.
dsn.Params["sql_mode"] = strconv.Quote(dsn.Params["sql_mode"])

dsnStr, err = generateDSNByParams(ctx, dsn, params, testDB)
if err != nil {
return nil, errors.Trace(err)
Expand Down Expand Up @@ -157,6 +182,7 @@ func newMySQLSink(
db: db,
params: params,
filter: filter,
cyclic: sinkCyclic,
txnCache: common.NewUnresolvedTxnCache(),
statistics: NewStatistics(ctx, "mysql", opts),
metricConflictDetectDurationHis: metricConflictDetectDurationHis,
Expand All @@ -166,25 +192,10 @@ func newMySQLSink(
cancel: cancel,
}

if val, ok := opts[mark.OptCyclicConfig]; ok {
cfg := new(config.CyclicConfig)
err := cfg.Unmarshal([]byte(val))
if err != nil {
return nil, cerror.WrapError(cerror.ErrMySQLInvalidConfig, err)
}
sink.cyclic = cyclic.NewCyclic(cfg)

err = sink.adjustSQLMode(ctx)
if err != nil {
return nil, errors.Trace(err)
}
}

sink.execWaitNotifier = new(notify.Notifier)
sink.resolvedNotifier = new(notify.Notifier)

err = sink.createSinkWorkers(ctx)

if err != nil {
return nil, err
}
Expand Down Expand Up @@ -348,26 +359,13 @@ func needSwitchDB(ddl *model.DDLEvent) bool {
return true
}

// adjustSQLMode adjust sql mode according to sink config.
func (s *mysqlSink) adjustSQLMode(ctx context.Context) error {
// Must relax sql mode to support cyclic replication, as downstream may have
// extra columns (not null and no default value).
if s.cyclic == nil || !s.cyclic.Enabled() {
return nil
}
var oldMode, newMode string
row := s.db.QueryRowContext(ctx, "SELECT @@SESSION.sql_mode;")
err := row.Scan(&oldMode)
if err != nil {
return cerror.WrapError(cerror.ErrMySQLQueryError, err)
}

newMode = cyclic.RelaxSQLMode(oldMode)
_, err = s.db.ExecContext(ctx, fmt.Sprintf("SET sql_mode = '%s';", newMode))
func querySQLMode(ctx context.Context, db *sql.DB) (sqlMode string, err error) {
row := db.QueryRowContext(ctx, "SELECT @@SESSION.sql_mode;")
err = row.Scan(&sqlMode)
if err != nil {
return cerror.WrapError(cerror.ErrMySQLQueryError, err)
err = cerror.WrapError(cerror.ErrMySQLQueryError, err)
}
return nil
return
}

func (s *mysqlSink) createSinkWorkers(ctx context.Context) error {
Expand Down
6 changes: 3 additions & 3 deletions cdc/sink/mysql_params_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestGenerateDSNByParams(t *testing.T) {
defer testleak.AfterTestT(t)()

testDefaultParams := func() {
db, err := mockTestDB()
db, err := mockTestDB(false)
require.Nil(t, err)
defer db.Close()

Expand All @@ -84,7 +84,7 @@ func TestGenerateDSNByParams(t *testing.T) {
}

testTimezoneParam := func() {
db, err := mockTestDB()
db, err := mockTestDB(false)
require.Nil(t, err)
defer db.Close()

Expand All @@ -98,7 +98,7 @@ func TestGenerateDSNByParams(t *testing.T) {
}

testTimeoutParams := func() {
db, err := mockTestDB()
db, err := mockTestDB(false)
require.Nil(t, err)
defer db.Close()

Expand Down
30 changes: 15 additions & 15 deletions cdc/sink/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,12 +427,17 @@ func TestReduceReplace(t *testing.T) {
}
}

func mockTestDB() (*sql.DB, error) {
func mockTestDB(adjustSQLMode bool) (*sql.DB, error) {
// mock for test db, which is used querying TiDB session variable
db, mock, err := sqlmock.New()
if err != nil {
return nil, err
}
if adjustSQLMode {
mock.ExpectQuery("SELECT @@SESSION.sql_mode;").
WillReturnRows(sqlmock.NewRows([]string{"@@SESSION.sql_mode"}).
AddRow("ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE"))
}
columns := []string{"Variable_name", "Value"}
mock.ExpectQuery("show session variables like 'allow_auto_random_explicit_insert';").WillReturnRows(
sqlmock.NewRows(columns).AddRow("allow_auto_random_explicit_insert", "0"),
Expand All @@ -455,18 +460,13 @@ func TestAdjustSQLMode(t *testing.T) {
}()
if dbIndex == 0 {
// test db
db, err := mockTestDB()
db, err := mockTestDB(true)
require.Nil(t, err)
return db, nil
}
// normal db
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
require.Nil(t, err)
mock.ExpectQuery("SELECT @@SESSION.sql_mode;").
WillReturnRows(sqlmock.NewRows([]string{"@@SESSION.sql_mode"}).
AddRow("ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE"))
mock.ExpectExec("SET sql_mode = 'ONLY_FULL_GROUP_BY,NO_ZERO_IN_DATE,NO_ZERO_DATE';").
WillReturnResult(sqlmock.NewResult(0, 0))
mock.ExpectClose()
return db, nil
}
Expand Down Expand Up @@ -572,7 +572,7 @@ func TestNewMySQLSinkExecDML(t *testing.T) {
}()
if dbIndex == 0 {
// test db
db, err := mockTestDB()
db, err := mockTestDB(true)
require.Nil(t, err)
return db, nil
}
Expand Down Expand Up @@ -716,7 +716,7 @@ func TestExecDMLRollbackErrDatabaseNotExists(t *testing.T) {
}()
if dbIndex == 0 {
// test db
db, err := mockTestDB()
db, err := mockTestDB(true)
require.Nil(t, err)
return db, nil
}
Expand Down Expand Up @@ -782,7 +782,7 @@ func TestExecDMLRollbackErrTableNotExists(t *testing.T) {
}()
if dbIndex == 0 {
// test db
db, err := mockTestDB()
db, err := mockTestDB(true)
require.Nil(t, err)
return db, nil
}
Expand Down Expand Up @@ -848,7 +848,7 @@ func TestExecDMLRollbackErrRetryable(t *testing.T) {
}()
if dbIndex == 0 {
// test db
db, err := mockTestDB()
db, err := mockTestDB(true)
require.Nil(t, err)
return db, nil
}
Expand Down Expand Up @@ -897,7 +897,7 @@ func TestNewMySQLSinkExecDDL(t *testing.T) {
}()
if dbIndex == 0 {
// test db
db, err := mockTestDB()
db, err := mockTestDB(true)
require.Nil(t, err)
return db, nil
}
Expand Down Expand Up @@ -1027,7 +1027,7 @@ func TestNewMySQLSink(t *testing.T) {
}()
if dbIndex == 0 {
// test db
db, err := mockTestDB()
db, err := mockTestDB(true)
require.Nil(t, err)
return db, nil
}
Expand Down Expand Up @@ -1066,7 +1066,7 @@ func TestMySQLSinkClose(t *testing.T) {
}()
if dbIndex == 0 {
// test db
db, err := mockTestDB()
db, err := mockTestDB(true)
require.Nil(t, err)
return db, nil
}
Expand Down Expand Up @@ -1106,7 +1106,7 @@ func TestMySQLSinkFlushResovledTs(t *testing.T) {
}()
if dbIndex == 0 {
// test db
db, err := mockTestDB()
db, err := mockTestDB(true)
require.Nil(t, err)
return db, nil
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/applier/redo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,9 @@ func TestApplyDMLs(t *testing.T) {
if err != nil {
return nil, err
}
mock.ExpectQuery("SELECT @@SESSION.sql_mode;").
WillReturnRows(sqlmock.NewRows([]string{"@@SESSION.sql_mode"}).
AddRow("ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE"))
columns := []string{"Variable_name", "Value"}
mock.ExpectQuery("show session variables like 'allow_auto_random_explicit_insert';").WillReturnRows(
sqlmock.NewRows(columns).AddRow("allow_auto_random_explicit_insert", "0"),
Expand Down
11 changes: 6 additions & 5 deletions tests/mq_protocol_tests/framework/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package framework
import (
"context"
"database/sql"
"fmt"
"strconv"
"strings"

_ "github.com/go-sql-driver/mysql" // imported for side effects
Expand Down Expand Up @@ -82,20 +84,19 @@ func (c *TaskContext) SQLHelper() *SQLHelper {
func (p *CDCProfile) String() string {
builder := strings.Builder{}
builder.WriteString("cli changefeed create ")

if p.PDUri == "" {
p.PDUri = "http://127.0.0.1:2379"
}

builder.WriteString("--pd=" + p.PDUri + " ")
builder.WriteString(fmt.Sprintf("--pd=%s ", strconv.Quote(p.PDUri)))

if p.SinkURI == "" {
log.Fatal("SinkURI cannot be empty!")
}

builder.WriteString("--sink-uri=\"" + p.SinkURI + "\" ")
builder.WriteString(fmt.Sprintf("--sink-uri=%s ", strconv.Quote(p.SinkURI)))

if p.ConfigFile != "" {
builder.WriteString("--config=" + p.ConfigFile + " ")
builder.WriteString(fmt.Sprintf("--config=%s ", strconv.Quote(p.ConfigFile)))
}

if p.Opts == nil || len(p.Opts) == 0 {
Expand Down