Skip to content

Commit

Permalink
mysql(ticdc): fix wrong dml order when batch DML is enabled (#8553)
Browse files Browse the repository at this point in the history
close #8552
  • Loading branch information
sdojjy authored Mar 17, 2023
1 parent 2f2b976 commit 41b40aa
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 18 deletions.
31 changes: 16 additions & 15 deletions cdc/sink/dmlsink/txn/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,7 @@ func (s *mysqlBackend) batchSingleTxnDmls(
) (sqls []string, values [][]interface{}) {
insertRows, updateRows, deleteRows := s.groupRowsByType(event, tableInfo, !translateToInsert)

// handle delete
if len(deleteRows) > 0 {
for _, rows := range deleteRows {
sql, value := sqlmodel.GenDeleteSQL(rows...)
Expand All @@ -431,21 +432,6 @@ func (s *mysqlBackend) batchSingleTxnDmls(
}
}

// handle insert
if len(insertRows) > 0 {
for _, rows := range insertRows {
if translateToInsert {
sql, value := sqlmodel.GenInsertSQL(sqlmodel.DMLInsert, rows...)
sqls = append(sqls, sql)
values = append(values, value)
} else {
sql, value := sqlmodel.GenInsertSQL(sqlmodel.DMLReplace, rows...)
sqls = append(sqls, sql)
values = append(values, value)
}
}
}

// handle update
if len(updateRows) > 0 {
if s.cfg.IsTiDB {
Expand All @@ -468,6 +454,21 @@ func (s *mysqlBackend) batchSingleTxnDmls(
}
}

// handle insert
if len(insertRows) > 0 {
for _, rows := range insertRows {
if translateToInsert {
sql, value := sqlmodel.GenInsertSQL(sqlmodel.DMLInsert, rows...)
sqls = append(sqls, sql)
values = append(values, value)
} else {
sql, value := sqlmodel.GenInsertSQL(sqlmodel.DMLReplace, rows...)
sqls = append(sqls, sql)
values = append(values, value)
}
}
}

return
}

Expand Down
82 changes: 79 additions & 3 deletions cdc/sink/dmlsink/txn/mysql/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1350,7 +1350,7 @@ func TestPrepareBatchDMLs(t *testing.T) {
rowCount: 2,
},
},
// mixed event
// mixed event, and test delete, update, insert are ordered
{
isTiDB: true,
input: []*model.RowChangedEvent{
Expand Down Expand Up @@ -1412,15 +1412,91 @@ func TestPrepareBatchDMLs(t *testing.T) {
}},
IndexColumns: [][]int{{1, 2}},
},
{
StartTs: 418658114257813516,
CommitTs: 418658114257813517,
Table: &model.TableName{Schema: "common_1", Table: "uk_without_pk"},
PreColumns: []*model.Column{nil, {
Name: "a1",
Type: mysql.TypeLong,
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag,
Value: 1,
}, {
Name: "a3",
Type: mysql.TypeVarchar,
Charset: charset.CharsetGBK,
Flag: model.BinaryFlag | model.MultipleKeyFlag |
model.HandleKeyFlag | model.UniqueKeyFlag,
Value: []byte("开发"),
}},
Columns: []*model.Column{nil, {
Name: "a1",
Type: mysql.TypeLong,
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag,
Value: 2,
}, {
Name: "a3",
Type: mysql.TypeVarchar,
Charset: charset.CharsetGBK,
Flag: model.BinaryFlag | model.MultipleKeyFlag |
model.HandleKeyFlag | model.UniqueKeyFlag,
Value: []byte("测试"),
}},
IndexColumns: [][]int{{1, 2}},
},
{
StartTs: 418658114257813516,
CommitTs: 418658114257813517,
Table: &model.TableName{Schema: "common_1", Table: "uk_without_pk"},
PreColumns: []*model.Column{nil, {
Name: "a1",
Type: mysql.TypeLong,
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag,
Value: 3,
}, {
Name: "a3",
Type: mysql.TypeVarchar,
Charset: charset.CharsetGBK,
Flag: model.BinaryFlag | model.MultipleKeyFlag |
model.HandleKeyFlag | model.UniqueKeyFlag,
Value: []byte("纽约"),
}},
Columns: []*model.Column{nil, {
Name: "a1",
Type: mysql.TypeLong,
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag,
Value: 4,
}, {
Name: "a3",
Type: mysql.TypeVarchar,
Charset: charset.CharsetGBK,
Flag: model.BinaryFlag | model.MultipleKeyFlag |
model.HandleKeyFlag | model.UniqueKeyFlag,
Value: []byte("北京"),
}},
IndexColumns: [][]int{{1, 2}},
},
},
expected: &preparedDMLs{
startTs: []model.Ts{418658114257813514},
sqls: []string{
"DELETE FROM `common_1`.`uk_without_pk` WHERE (`a1`,`a3`) IN ((?,?),(?,?))",
"UPDATE `common_1`.`uk_without_pk` SET `a1`=CASE " +
"WHEN ROW(`a1`,`a3`)=ROW(?,?) THEN ? WHEN ROW(`a1`,`a3`)=ROW(?,?) " +
"THEN ? END, `a3`=CASE WHEN ROW(`a1`,`a3`)=ROW(?,?) " +
"THEN ? WHEN ROW(`a1`,`a3`)=ROW(?,?) THEN ? END WHERE " +
"ROW(`a1`,`a3`) IN (ROW(?,?),ROW(?,?))",
"INSERT INTO `common_1`.`uk_without_pk` (`a1`,`a3`) VALUES (?,?)",
},
values: [][]interface{}{{1, "世界", 2, "你好"}, {2, "你好"}},
rowCount: 3,
values: [][]interface{}{
{1, "世界", 2, "你好"},
{
1, "开发", 2, 3, "纽约", 4, 1, "开发", "测试", 3,
"纽约", "北京", 1, "开发", 3, "纽约",
},
{2, "你好"},
},
rowCount: 5,
},
},
// update event and downstream is mysql and without pk
Expand Down

0 comments on commit 41b40aa

Please sign in to comment.