Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#3938
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
hicqu authored and ti-chi-bot committed Jan 3, 2022
1 parent d091d4f commit 64220e6
Show file tree
Hide file tree
Showing 5 changed files with 649 additions and 21 deletions.
156 changes: 154 additions & 2 deletions cdc/sink/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/common"
dmutils "github.com/pingcap/tiflow/dm/pkg/utils"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/cyclic"
"github.com/pingcap/tiflow/pkg/cyclic/mark"
Expand Down Expand Up @@ -108,6 +109,141 @@ type mysqlSink struct {
metricBucketSizeCounters []prometheus.Counter

forceReplicate bool
<<<<<<< HEAD
=======
cancel func()
}

var _ Sink = &mysqlSink{}

// newMySQLSink creates a new MySQL sink using schema storage
func newMySQLSink(
ctx context.Context,
changefeedID model.ChangeFeedID,
sinkURI *url.URL,
filter *tifilter.Filter,
replicaConfig *config.ReplicaConfig,
opts map[string]string,
) (Sink, error) {
opts[OptChangefeedID] = changefeedID
params, err := parseSinkURIToParams(ctx, sinkURI, opts)
if err != nil {
return nil, err
}

params.enableOldValue = replicaConfig.EnableOldValue

// dsn format of the driver:
// [username[:password]@][protocol[(address)]]/dbname[?param1=value1&...&paramN=valueN]
username := sinkURI.User.Username()
password, _ := sinkURI.User.Password()
port := sinkURI.Port()
if username == "" {
username = "root"
}
if port == "" {
port = "4000"
}

dsnStr := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s", username, password, sinkURI.Hostname(), port, params.tls)
dsn, err := dmysql.ParseDSN(dsnStr)
if err != nil {
return nil, cerror.WrapError(cerror.ErrMySQLInvalidConfig, err)
}

// create test db used for parameter detection
if dsn.Params == nil {
dsn.Params = make(map[string]string, 1)
}
if params.timezone != "" {
dsn.Params["time_zone"] = params.timezone
}
dsn.Params["readTimeout"] = params.readTimeout
dsn.Params["writeTimeout"] = params.writeTimeout
dsn.Params["timeout"] = params.dialTimeout
testDB, err := GetDBConnImpl(ctx, dsn.FormatDSN())
if err != nil {
return nil, err
}
defer testDB.Close()

// Adjust sql_mode for compatibility.
dsn.Params["sql_mode"], err = querySQLMode(ctx, testDB)
if err != nil {
return nil, errors.Trace(err)
}
dsn.Params["sql_mode"], err = dmutils.AdjustSQLModeCompatible(dsn.Params["sql_mode"])
if err != nil {
return nil, errors.Trace(err)
}

// Adjust sql_mode for cyclic replication.
var sinkCyclic *cyclic.Cyclic = nil
if val, ok := opts[mark.OptCyclicConfig]; ok {
cfg := new(config.CyclicConfig)
err := cfg.Unmarshal([]byte(val))
if err != nil {
return nil, cerror.WrapError(cerror.ErrMySQLInvalidConfig, err)
}
sinkCyclic = cyclic.NewCyclic(cfg)
dsn.Params["sql_mode"] = cyclic.RelaxSQLMode(dsn.Params["sql_mode"])
}
// NOTE: quote the string is necessary to avoid ambiguities.
dsn.Params["sql_mode"] = strconv.Quote(dsn.Params["sql_mode"])

dsnStr, err = generateDSNByParams(ctx, dsn, params, testDB)
if err != nil {
return nil, errors.Trace(err)
}
db, err := GetDBConnImpl(ctx, dsnStr)
if err != nil {
return nil, err
}

log.Info("Start mysql sink")

db.SetMaxIdleConns(params.workerCount)
db.SetMaxOpenConns(params.workerCount)

metricConflictDetectDurationHis := conflictDetectDurationHis.WithLabelValues(
params.captureAddr, params.changefeedID)
metricBucketSizeCounters := make([]prometheus.Counter, params.workerCount)
for i := 0; i < params.workerCount; i++ {
metricBucketSizeCounters[i] = bucketSizeCounter.WithLabelValues(
params.captureAddr, params.changefeedID, strconv.Itoa(i))
}
ctx, cancel := context.WithCancel(ctx)

sink := &mysqlSink{
db: db,
params: params,
filter: filter,
cyclic: sinkCyclic,
txnCache: common.NewUnresolvedTxnCache(),
statistics: NewStatistics(ctx, "mysql", opts),
metricConflictDetectDurationHis: metricConflictDetectDurationHis,
metricBucketSizeCounters: metricBucketSizeCounters,
errCh: make(chan error, 1),
forceReplicate: replicaConfig.ForceReplicate,
cancel: cancel,
}

sink.execWaitNotifier = new(notify.Notifier)
sink.resolvedNotifier = new(notify.Notifier)

err = sink.createSinkWorkers(ctx)
if err != nil {
return nil, err
}

receiver, err := sink.resolvedNotifier.NewReceiver(50 * time.Millisecond)
if err != nil {
return nil, err
}
go sink.flushRowChangedEvents(ctx, receiver)

return sink, nil
>>>>>>> 1df27c666 (sink(ticdc): adjust sql mode compatibility for mysql sink (#3938))
}

func (s *mysqlSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
Expand Down Expand Up @@ -247,6 +383,7 @@ func (s *mysqlSink) execDDL(ctx context.Context, ddl *model.DDLEvent) error {
return nil
}

<<<<<<< HEAD
// adjustSQLMode adjust sql mode according to sink config.
func (s *mysqlSink) adjustSQLMode(ctx context.Context) error {
// Must relax sql mode to support cyclic replication, as downstream may have
Expand All @@ -263,10 +400,25 @@ func (s *mysqlSink) adjustSQLMode(ctx context.Context) error {

newMode = cyclic.RelaxSQLMode(oldMode)
_, err = s.db.ExecContext(ctx, fmt.Sprintf("SET sql_mode = '%s';", newMode))
=======
func needSwitchDB(ddl *model.DDLEvent) bool {
if len(ddl.TableInfo.Schema) == 0 {
return false
}
if ddl.Type == timodel.ActionCreateSchema || ddl.Type == timodel.ActionDropSchema {
return false
}
return true
}

func querySQLMode(ctx context.Context, db *sql.DB) (sqlMode string, err error) {
row := db.QueryRowContext(ctx, "SELECT @@SESSION.sql_mode;")
err = row.Scan(&sqlMode)
>>>>>>> 1df27c666 (sink(ticdc): adjust sql mode compatibility for mysql sink (#3938))
if err != nil {
return cerror.WrapError(cerror.ErrMySQLQueryError, err)
err = cerror.WrapError(cerror.ErrMySQLQueryError, err)
}
return nil
return
}

var _ Sink = &mysqlSink{}
Expand Down
Loading

0 comments on commit 64220e6

Please sign in to comment.