Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sink(ticdc): adjust sql mode compatibility for mysql sink (#3938) #4196

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 151 additions & 2 deletions cdc/sink/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/common"
dmutils "github.com/pingcap/tiflow/dm/pkg/utils"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/cyclic"
"github.com/pingcap/tiflow/pkg/cyclic/mark"
Expand Down Expand Up @@ -111,6 +112,7 @@ type mysqlSink struct {
cancel func()
}

<<<<<<< HEAD
func needSwitchDB(ddl *model.DDLEvent) bool {
if len(ddl.TableInfo.Schema) > 0 {
if ddl.Type == timodel.ActionCreateSchema || ddl.Type == timodel.ActionDropSchema {
Expand All @@ -119,6 +121,137 @@ func needSwitchDB(ddl *model.DDLEvent) bool {
return true
}
return false
=======
var _ Sink = &mysqlSink{}

// newMySQLSink creates a new MySQL sink using schema storage
func newMySQLSink(
ctx context.Context,
changefeedID model.ChangeFeedID,
sinkURI *url.URL,
filter *tifilter.Filter,
replicaConfig *config.ReplicaConfig,
opts map[string]string,
) (Sink, error) {
opts[OptChangefeedID] = changefeedID
params, err := parseSinkURIToParams(ctx, sinkURI, opts)
if err != nil {
return nil, err
}

params.enableOldValue = replicaConfig.EnableOldValue

// dsn format of the driver:
// [username[:password]@][protocol[(address)]]/dbname[?param1=value1&...&paramN=valueN]
username := sinkURI.User.Username()
password, _ := sinkURI.User.Password()
port := sinkURI.Port()
if username == "" {
username = "root"
}
if port == "" {
port = "4000"
}

dsnStr := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s", username, password, sinkURI.Hostname(), port, params.tls)
dsn, err := dmysql.ParseDSN(dsnStr)
if err != nil {
return nil, cerror.WrapError(cerror.ErrMySQLInvalidConfig, err)
}

// create test db used for parameter detection
if dsn.Params == nil {
dsn.Params = make(map[string]string, 1)
}
if params.timezone != "" {
dsn.Params["time_zone"] = params.timezone
}
dsn.Params["readTimeout"] = params.readTimeout
dsn.Params["writeTimeout"] = params.writeTimeout
dsn.Params["timeout"] = params.dialTimeout
testDB, err := GetDBConnImpl(ctx, dsn.FormatDSN())
if err != nil {
return nil, err
}
defer testDB.Close()

// Adjust sql_mode for compatibility.
dsn.Params["sql_mode"], err = querySQLMode(ctx, testDB)
if err != nil {
return nil, errors.Trace(err)
}
dsn.Params["sql_mode"], err = dmutils.AdjustSQLModeCompatible(dsn.Params["sql_mode"])
if err != nil {
return nil, errors.Trace(err)
}

// Adjust sql_mode for cyclic replication.
var sinkCyclic *cyclic.Cyclic = nil
if val, ok := opts[mark.OptCyclicConfig]; ok {
cfg := new(config.CyclicConfig)
err := cfg.Unmarshal([]byte(val))
if err != nil {
return nil, cerror.WrapError(cerror.ErrMySQLInvalidConfig, err)
}
sinkCyclic = cyclic.NewCyclic(cfg)
dsn.Params["sql_mode"] = cyclic.RelaxSQLMode(dsn.Params["sql_mode"])
}
// NOTE: quote the string is necessary to avoid ambiguities.
dsn.Params["sql_mode"] = strconv.Quote(dsn.Params["sql_mode"])

dsnStr, err = generateDSNByParams(ctx, dsn, params, testDB)
if err != nil {
return nil, errors.Trace(err)
}
db, err := GetDBConnImpl(ctx, dsnStr)
if err != nil {
return nil, err
}

log.Info("Start mysql sink")

db.SetMaxIdleConns(params.workerCount)
db.SetMaxOpenConns(params.workerCount)

metricConflictDetectDurationHis := conflictDetectDurationHis.WithLabelValues(
params.captureAddr, params.changefeedID)
metricBucketSizeCounters := make([]prometheus.Counter, params.workerCount)
for i := 0; i < params.workerCount; i++ {
metricBucketSizeCounters[i] = bucketSizeCounter.WithLabelValues(
params.captureAddr, params.changefeedID, strconv.Itoa(i))
}
ctx, cancel := context.WithCancel(ctx)

sink := &mysqlSink{
db: db,
params: params,
filter: filter,
cyclic: sinkCyclic,
txnCache: common.NewUnresolvedTxnCache(),
statistics: NewStatistics(ctx, "mysql", opts),
metricConflictDetectDurationHis: metricConflictDetectDurationHis,
metricBucketSizeCounters: metricBucketSizeCounters,
errCh: make(chan error, 1),
forceReplicate: replicaConfig.ForceReplicate,
cancel: cancel,
}

sink.execWaitNotifier = new(notify.Notifier)
sink.resolvedNotifier = new(notify.Notifier)

err = sink.createSinkWorkers(ctx)
if err != nil {
return nil, err
}

receiver, err := sink.resolvedNotifier.NewReceiver(50 * time.Millisecond)
if err != nil {
return nil, err
}
go sink.flushRowChangedEvents(ctx, receiver)

return sink, nil
>>>>>>> 1df27c666 (sink(ticdc): adjust sql mode compatibility for mysql sink (#3938))
}

func (s *mysqlSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
Expand Down Expand Up @@ -261,6 +394,7 @@ func (s *mysqlSink) execDDL(ctx context.Context, ddl *model.DDLEvent) error {
return nil
}

<<<<<<< HEAD
// adjustSQLMode adjust sql mode according to sink config.
func (s *mysqlSink) adjustSQLMode(ctx context.Context) error {
// Must relax sql mode to support cyclic replication, as downstream may have
Expand All @@ -277,10 +411,25 @@ func (s *mysqlSink) adjustSQLMode(ctx context.Context) error {

newMode = cyclic.RelaxSQLMode(oldMode)
_, err = s.db.ExecContext(ctx, fmt.Sprintf("SET sql_mode = '%s';", newMode))
=======
func needSwitchDB(ddl *model.DDLEvent) bool {
if len(ddl.TableInfo.Schema) == 0 {
return false
}
if ddl.Type == timodel.ActionCreateSchema || ddl.Type == timodel.ActionDropSchema {
return false
}
return true
}

func querySQLMode(ctx context.Context, db *sql.DB) (sqlMode string, err error) {
row := db.QueryRowContext(ctx, "SELECT @@SESSION.sql_mode;")
err = row.Scan(&sqlMode)
>>>>>>> 1df27c666 (sink(ticdc): adjust sql mode compatibility for mysql sink (#3938))
if err != nil {
return cerror.WrapError(cerror.ErrMySQLQueryError, err)
err = cerror.WrapError(cerror.ErrMySQLQueryError, err)
}
return nil
return
}

var _ Sink = &mysqlSink{}
Expand Down
Loading