Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sink/mysql: support translated to insert SQL if old value enabled #955

Merged
merged 3 commits into from
Sep 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 115 additions & 18 deletions cdc/sink/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ const (
defaultBatchReplaceSize = 20
defaultReadTimeout = "2m"
defaultWriteTimeout = "2m"
defaultSafeMode = true
)

var (
Expand Down Expand Up @@ -274,6 +275,8 @@ type sinkParams struct {
batchReplaceSize int
readTimeout string
writeTimeout string
enableOldValue bool
safeMode bool
}

func (s *sinkParams) Clone() *sinkParams {
Expand All @@ -289,6 +292,7 @@ var defaultParams = &sinkParams{
batchReplaceSize: defaultBatchReplaceSize,
readTimeout: defaultReadTimeout,
writeTimeout: defaultWriteTimeout,
safeMode: defaultSafeMode,
}

func checkTiDBVariable(ctx context.Context, db *sql.DB, variableName, defaultValue string) (string, error) {
Expand Down Expand Up @@ -349,7 +353,14 @@ func configureSinkURI(
}

// newMySQLSink creates a new MySQL sink using schema storage
func newMySQLSink(ctx context.Context, changefeedID model.ChangeFeedID, sinkURI *url.URL, filter *tifilter.Filter, opts map[string]string) (Sink, error) {
func newMySQLSink(
ctx context.Context,
changefeedID model.ChangeFeedID,
sinkURI *url.URL,
filter *tifilter.Filter,
replicaConfig *config.ReplicaConfig,
opts map[string]string,
) (Sink, error) {
var db *sql.DB
params := defaultParams.Clone()

Expand Down Expand Up @@ -430,6 +441,18 @@ func newMySQLSink(ctx context.Context, changefeedID model.ChangeFeedID, sinkURI
params.batchReplaceSize = size
}

// TODO: force safe mode in startup phase
s = sinkURI.Query().Get("safe-mode")
if s != "" {
safeModeEnabled, err := strconv.ParseBool(s)
if err != nil {
return nil, cerror.WrapError(cerror.ErrMySQLInvalidConfig, err)
}
params.safeMode = safeModeEnabled
}

params.enableOldValue = replicaConfig.EnableOldValue

// dsn format of the driver:
// [username[:password]@][protocol[(address)]]/dbname[?param1=value1&...&paramN=valueN]
username := sinkURI.User.Username()
Expand Down Expand Up @@ -778,29 +801,52 @@ func (s *mysqlSink) prepareDMLs(rows []*model.RowChangedEvent, replicaID uint64,
values := make([][]interface{}, 0, len(rows))
replaces := make(map[string][][]interface{})
rowCount := 0
translateToInsert := s.params.enableOldValue && !s.params.safeMode

// flush cached batch replace or insert, to keep the sequence of DMLs
flushCacheDMLs := func() {
if s.params.batchReplaceEnabled && len(replaces) > 0 {
replaceSqls, replaceValues := reduceReplace(replaces, s.params.batchReplaceSize)
sqls = append(sqls, replaceSqls...)
values = append(values, replaceValues...)
replaces = make(map[string][][]interface{})
}
}

for _, row := range rows {
var query string
var args []interface{}
quoteTable := quotes.QuoteSchema(row.Table.Schema, row.Table.Table)
// TODO(leoppro): using `UPDATE` instead of `REPLACE` if the old value is enabled
if len(row.PreColumns) != 0 {
// flush cached batch replace, we must keep the sequence of DMLs
if s.params.batchReplaceEnabled && len(replaces) > 0 {
replaceSqls, replaceValues := reduceReplace(replaces, s.params.batchReplaceSize)
sqls = append(sqls, replaceSqls...)
values = append(values, replaceValues...)
replaces = make(map[string][][]interface{})

// Translate to UPDATE if old value is enabled, not in safe mode and is update event
if translateToInsert && len(row.PreColumns) != 0 && len(row.Columns) != 0 {
flushCacheDMLs()
query, args = prepareUpdate(quoteTable, row.PreColumns, row.Columns)
if query != "" {
sqls = append(sqls, query)
values = append(values, args)
rowCount++
}
continue
}

// Case for delete event or update event
// If old value is enabled and not in safe mode,
// update will be translated to DELETE + INSERT(or REPLACE) SQL.
if len(row.PreColumns) != 0 {
flushCacheDMLs()
query, args = prepareDelete(quoteTable, row.PreColumns)
if query != "" {
sqls = append(sqls, query)
values = append(values, args)
rowCount++
}
}

// Case for insert event or update event
if len(row.Columns) != 0 {
if s.params.batchReplaceEnabled {
query, args = prepareReplace(quoteTable, row.Columns, false /* appendPlaceHolder */)
query, args = prepareReplace(quoteTable, row.Columns, false /* appendPlaceHolder */, translateToInsert)
if query != "" {
if _, ok := replaces[query]; !ok {
replaces[query] = make([][]interface{}, 0)
Expand All @@ -809,7 +855,7 @@ func (s *mysqlSink) prepareDMLs(rows []*model.RowChangedEvent, replicaID uint64,
rowCount++
}
} else {
query, args = prepareReplace(quoteTable, row.Columns, true /* appendPlaceHolder */)
query, args = prepareReplace(quoteTable, row.Columns, true /* appendPlaceHolder */, translateToInsert)
sqls = append(sqls, query)
values = append(values, args)
if query != "" {
Expand All @@ -820,11 +866,8 @@ func (s *mysqlSink) prepareDMLs(rows []*model.RowChangedEvent, replicaID uint64,
}
}
}
if s.params.batchReplaceEnabled {
replaceSqls, replaceValues := reduceReplace(replaces, s.params.batchReplaceSize)
sqls = append(sqls, replaceSqls...)
values = append(values, replaceValues...)
}
flushCacheDMLs()

dmls := &preparedDMLs{
sqls: sqls,
values: values,
Expand Down Expand Up @@ -864,7 +907,12 @@ func (s *mysqlSink) execDMLs(ctx context.Context, rows []*model.RowChangedEvent,
return nil
}

func prepareReplace(quoteTable string, cols []*model.Column, appendPlaceHolder bool) (string, []interface{}) {
func prepareReplace(
quoteTable string,
cols []*model.Column,
appendPlaceHolder bool,
translateToInsert bool,
) (string, []interface{}) {
var builder strings.Builder
columnNames := make([]string, 0, len(cols))
args := make([]interface{}, 0, len(cols))
Expand All @@ -880,7 +928,11 @@ func prepareReplace(quoteTable string, cols []*model.Column, appendPlaceHolder b
}

colList := "(" + buildColumnList(columnNames) + ")"
builder.WriteString("REPLACE INTO " + quoteTable + colList + " VALUES ")
if translateToInsert {
builder.WriteString("INSERT INTO " + quoteTable + colList + " VALUES ")
} else {
builder.WriteString("REPLACE INTO " + quoteTable + colList + " VALUES ")
}
if appendPlaceHolder {
builder.WriteString("(" + model.HolderString(len(columnNames)) + ");")
}
Expand Down Expand Up @@ -926,6 +978,51 @@ func reduceReplace(replaces map[string][][]interface{}, batchSize int) ([]string
return sqls, args
}

func prepareUpdate(quoteTable string, preCols, cols []*model.Column) (string, []interface{}) {
var builder strings.Builder
builder.WriteString("UPDATE " + quoteTable + " SET ")

columnNames := make([]string, 0, len(cols))
args := make([]interface{}, 0, len(cols)+len(preCols))
for _, col := range cols {
if col == nil || col.Flag.IsGeneratedColumn() {
continue
}
columnNames = append(columnNames, col.Name)
args = append(args, col.Value)
}
if len(args) == 0 {
return "", nil
}
for i, column := range columnNames {
if i == len(columnNames)-1 {
builder.WriteString("`" + model.EscapeName(column) + "`=?")
} else {
builder.WriteString("`" + model.EscapeName(column) + "`=?,")
}
}

builder.WriteString(" WHERE ")
colNames, wargs := whereSlice(preCols)
if len(wargs) == 0 {
return "", nil
}
for i := 0; i < len(colNames); i++ {
if i > 0 {
builder.WriteString(" AND ")
}
if wargs[i] == nil {
builder.WriteString(quotes.QuoteName(colNames[i]) + " IS NULL")
} else {
builder.WriteString(quotes.QuoteName(colNames[i]) + "=?")
args = append(args, wargs[i])
}
}
builder.WriteString(" LIMIT 1;")
sql := builder.String()
return sql, args
}

func prepareDelete(quoteTable string, cols []*model.Column) (string, []interface{}) {
var builder strings.Builder
builder.WriteString("DELETE FROM " + quoteTable + " WHERE ")
Expand Down
55 changes: 54 additions & 1 deletion cdc/sink/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,57 @@ func (s MySQLSinkSuite) TestPrepareDML(c *check.C) {
}
}

func (s MySQLSinkSuite) TestPrepareUpdate(c *check.C) {
testCases := []struct {
quoteTable string
preCols []*model.Column
cols []*model.Column
expectedSQL string
expectedArgs []interface{}
}{
{
quoteTable: "`test`.`t1`",
preCols: []*model.Column{},
cols: []*model.Column{},
expectedSQL: "",
expectedArgs: nil,
},
{
quoteTable: "`test`.`t1`",
preCols: []*model.Column{
{Name: "a", Type: mysql.TypeLong, Flag: model.HandleKeyFlag | model.PrimaryKeyFlag, Value: 1},
{Name: "b", Type: mysql.TypeVarchar, Flag: 0, Value: "test"},
},
cols: []*model.Column{
{Name: "a", Type: mysql.TypeLong, Flag: model.HandleKeyFlag | model.PrimaryKeyFlag, Value: 1},
{Name: "b", Type: mysql.TypeVarchar, Flag: 0, Value: "test2"},
},
expectedSQL: "UPDATE `test`.`t1` SET `a`=?,`b`=? WHERE `a`=? LIMIT 1;",
expectedArgs: []interface{}{1, "test2", 1},
},
{
quoteTable: "`test`.`t1`",
preCols: []*model.Column{
{Name: "a", Type: mysql.TypeLong, Flag: model.MultipleKeyFlag | model.HandleKeyFlag, Value: 1},
{Name: "b", Type: mysql.TypeVarString, Flag: model.MultipleKeyFlag | model.HandleKeyFlag, Value: "test"},
{Name: "c", Type: mysql.TypeLong, Flag: model.GeneratedColumnFlag, Value: 100},
},
cols: []*model.Column{
{Name: "a", Type: mysql.TypeLong, Flag: model.MultipleKeyFlag | model.HandleKeyFlag, Value: 2},
{Name: "b", Type: mysql.TypeVarString, Flag: model.MultipleKeyFlag | model.HandleKeyFlag, Value: "test2"},
{Name: "c", Type: mysql.TypeLong, Flag: model.GeneratedColumnFlag, Value: 100},
},
expectedSQL: "UPDATE `test`.`t1` SET `a`=?,`b`=? WHERE `a`=? AND `b`=? LIMIT 1;",
expectedArgs: []interface{}{2, "test2", 1, "test"},
},
}
for _, tc := range testCases {
query, args := prepareUpdate(tc.quoteTable, tc.preCols, tc.cols)
c.Assert(query, check.Equals, tc.expectedSQL)
c.Assert(args, check.DeepEquals, tc.expectedArgs)
}
}

func (s MySQLSinkSuite) TestMapReplace(c *check.C) {
testCases := []struct {
quoteTable string
Expand Down Expand Up @@ -478,7 +529,7 @@ func (s MySQLSinkSuite) TestMapReplace(c *check.C) {
for _, tc := range testCases {
// multiple times to verify the stability of column sequence in query string
for i := 0; i < 10; i++ {
query, args := prepareReplace(tc.quoteTable, tc.cols, false)
query, args := prepareReplace(tc.quoteTable, tc.cols, false, false)
c.Assert(query, check.Equals, tc.expectedQuery)
c.Assert(args, check.DeepEquals, tc.expectedArgs)
}
Expand Down Expand Up @@ -614,6 +665,7 @@ func (s MySQLSinkSuite) TestSinkParamsClone(c *check.C) {
batchReplaceSize: defaultBatchReplaceSize,
readTimeout: defaultReadTimeout,
writeTimeout: defaultWriteTimeout,
safeMode: defaultSafeMode,
})
c.Assert(param2, check.DeepEquals, &sinkParams{
changefeedID: "123",
Expand All @@ -624,6 +676,7 @@ func (s MySQLSinkSuite) TestSinkParamsClone(c *check.C) {
batchReplaceSize: defaultBatchReplaceSize,
readTimeout: defaultReadTimeout,
writeTimeout: defaultWriteTimeout,
safeMode: defaultSafeMode,
})
}

Expand Down
2 changes: 1 addition & 1 deletion cdc/sink/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func NewSink(ctx context.Context, changefeedID model.ChangeFeedID, sinkURIStr st
case "blackhole":
return newBlackHoleSink(ctx, opts), nil
case "mysql", "tidb", "mysql+ssl", "tidb+ssl":
return newMySQLSink(ctx, changefeedID, sinkURI, filter, opts)
return newMySQLSink(ctx, changefeedID, sinkURI, filter, config, opts)
case "kafka", "kafka+ssl":
return newKafkaSaramaSink(ctx, sinkURI, filter, config, opts, errCh)
case "pulsar", "pulsar+ssl":
Expand Down