From 7f561614a572a87ce8ac714e64dfb2668c20e20f Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Tue, 7 Mar 2023 21:47:12 +0800 Subject: [PATCH] sink (ticdc): disable batch update dml when downstream is mysql (#8452) (#8460) close pingcap/tiflow#8420 --- cdc/sinkv2/eventsink/txn/mysql/mysql.go | 21 +++- cdc/sinkv2/eventsink/txn/mysql/mysql_test.go | 101 +++++++++++++++++- .../charset_gbk/data/test.sql | 2 + 3 files changed, 119 insertions(+), 5 deletions(-) diff --git a/cdc/sinkv2/eventsink/txn/mysql/mysql.go b/cdc/sinkv2/eventsink/txn/mysql/mysql.go index 72f9f411c6d..098610e7b15 100644 --- a/cdc/sinkv2/eventsink/txn/mysql/mysql.go +++ b/cdc/sinkv2/eventsink/txn/mysql/mysql.go @@ -385,10 +385,23 @@ func (s *mysqlBackend) batchSingleTxnDmls( // handle update if len(updateRows) > 0 { - for _, rows := range updateRows { - s, v := s.genUpdateSQL(rows...) - sqls = append(sqls, s...) - values = append(values, v...) + if s.cfg.IsTiDB { + for _, rows := range updateRows { + s, v := s.genUpdateSQL(rows...) + sqls = append(sqls, s...) + values = append(values, v...) + } + // The behavior of update statement differs between TiDB and MySQL. + // So we don't use batch update statement when downstream is MySQL. + // Ref:https://docs.pingcap.com/tidb/stable/sql-statement-update#mysql-compatibility + } else { + for _, rows := range updateRows { + for _, row := range rows { + sql, value := row.GenSQL(sqlmodel.DMLUpdate) + sqls = append(sqls, sql) + values = append(values, value) + } + } } } diff --git a/cdc/sinkv2/eventsink/txn/mysql/mysql_test.go b/cdc/sinkv2/eventsink/txn/mysql/mysql_test.go index 259606e95a5..aa159c5459c 100644 --- a/cdc/sinkv2/eventsink/txn/mysql/mysql_test.go +++ b/cdc/sinkv2/eventsink/txn/mysql/mysql_test.go @@ -1199,12 +1199,14 @@ func TestMysqlSinkSafeModeOff(t *testing.T) { func TestPrepareBatchDMLs(t *testing.T) { t.Parallel() testCases := []struct { + isTiDB bool input []*model.RowChangedEvent expected *preparedDMLs }{ // empty event { - input: []*model.RowChangedEvent{}, + isTiDB: true, + input: []*model.RowChangedEvent{}, expected: &preparedDMLs{ startTs: []model.Ts{}, sqls: []string{}, @@ -1212,6 +1214,7 @@ func TestPrepareBatchDMLs(t *testing.T) { }, }, { // delete event + isTiDB: false, input: []*model.RowChangedEvent{ { StartTs: 418658114257813514, @@ -1260,6 +1263,7 @@ func TestPrepareBatchDMLs(t *testing.T) { }, }, { // insert event + isTiDB: true, input: []*model.RowChangedEvent{ { StartTs: 418658114257813516, @@ -1308,6 +1312,7 @@ func TestPrepareBatchDMLs(t *testing.T) { }, // update event { + isTiDB: true, input: []*model.RowChangedEvent{ { StartTs: 418658114257813516, @@ -1390,6 +1395,7 @@ func TestPrepareBatchDMLs(t *testing.T) { }, // mixed event { + isTiDB: true, input: []*model.RowChangedEvent{ { StartTs: 418658114257813514, @@ -1460,6 +1466,98 @@ func TestPrepareBatchDMLs(t *testing.T) { rowCount: 3, }, }, + // update event and downstream is mysql and without pk + { + isTiDB: false, + input: []*model.RowChangedEvent{ + { + StartTs: 418658114257813516, + CommitTs: 418658114257813517, + Table: &model.TableName{Schema: "common_1", Table: "uk_without_pk"}, + PreColumns: []*model.Column{nil, { + Name: "a1", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | + model.MultipleKeyFlag | + model.HandleKeyFlag | + model.UniqueKeyFlag, + Value: 1, + }, { + Name: "a3", + Type: mysql.TypeVarchar, + Charset: charset.CharsetGBK, + Flag: model.BinaryFlag | model.MultipleKeyFlag | + model.HandleKeyFlag | model.UniqueKeyFlag, + Value: []byte("开发"), + }}, + Columns: []*model.Column{nil, { + Name: "a1", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | + model.MultipleKeyFlag | + model.HandleKeyFlag | + model.UniqueKeyFlag, + Value: 2, + }, { + Name: "a3", + Type: mysql.TypeVarchar, + Charset: charset.CharsetGBK, + Flag: model.BinaryFlag | model.MultipleKeyFlag | + model.HandleKeyFlag | model.UniqueKeyFlag, + Value: []byte("测试"), + }}, + IndexColumns: [][]int{{1, 2}}, + }, + { + StartTs: 418658114257813516, + CommitTs: 418658114257813517, + Table: &model.TableName{Schema: "common_1", Table: "uk_without_pk"}, + PreColumns: []*model.Column{nil, { + Name: "a1", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | + model.MultipleKeyFlag | + model.HandleKeyFlag | + model.UniqueKeyFlag, + Value: 3, + }, { + Name: "a3", + Type: mysql.TypeVarchar, + Charset: charset.CharsetGBK, + Flag: model.BinaryFlag | model.MultipleKeyFlag | + model.HandleKeyFlag | model.UniqueKeyFlag, + Value: []byte("纽约"), + }}, + Columns: []*model.Column{nil, { + Name: "a1", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | + model.MultipleKeyFlag | + model.HandleKeyFlag | model.UniqueKeyFlag, + Value: 4, + }, { + Name: "a3", + Type: mysql.TypeVarchar, + Charset: charset.CharsetGBK, + Flag: model.BinaryFlag | model.MultipleKeyFlag | + model.HandleKeyFlag | model.UniqueKeyFlag, + Value: []byte("北京"), + }}, + IndexColumns: [][]int{{1, 2}}, + }, + }, + expected: &preparedDMLs{ + startTs: []model.Ts{418658114257813516}, + sqls: []string{ + "UPDATE `common_1`.`uk_without_pk` SET `a1` = ?, " + + "`a3` = ? WHERE `a1` = ? AND `a3` = ? LIMIT 1", + "UPDATE `common_1`.`uk_without_pk` SET `a1` = ?, " + + "`a3` = ? WHERE `a1` = ? AND `a3` = ? LIMIT 1", + }, + values: [][]interface{}{{2, "测试", 1, "开发"}, {4, "北京", 3, "纽约"}}, + rowCount: 2, + }, + }, } ctx, cancel := context.WithCancel(context.Background()) @@ -1469,6 +1567,7 @@ func TestPrepareBatchDMLs(t *testing.T) { ms.cfg.SafeMode = false ms.cfg.EnableOldValue = true for _, tc := range testCases { + ms.cfg.IsTiDB = tc.isTiDB ms.events = make([]*eventsink.TxnCallbackableEvent, 1) ms.events[0] = &eventsink.TxnCallbackableEvent{ Event: &model.SingleTableTxn{Rows: tc.input}, diff --git a/tests/integration_tests/charset_gbk/data/test.sql b/tests/integration_tests/charset_gbk/data/test.sql index 68f7d9d8ed5..dc10d09644a 100644 --- a/tests/integration_tests/charset_gbk/data/test.sql +++ b/tests/integration_tests/charset_gbk/data/test.sql @@ -107,6 +107,8 @@ INSERT INTO t0 VALUES (2, '部署', "美国", "纽约", "世界,你好" , 0xCAC0BDE7C4E3BAC3); +SELECT sleep(5); + UPDATE t0 SET name = '开发' WHERE name = '测试'