Skip to content

Commit

Permalink
sink(ticdc): convert values of pre-columns properly (#8421)
Browse files Browse the repository at this point in the history
close #8420
  • Loading branch information
zhaoxinyu authored Mar 6, 2023
1 parent 7cb9371 commit a32372b
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 60 deletions.
9 changes: 5 additions & 4 deletions cdc/sink/dmlsink/txn/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,15 +317,15 @@ func convert2RowChanges(
return res
}

func convertBinaryToString(row *model.RowChangedEvent) {
for i, col := range row.Columns {
func convertBinaryToString(cols []*model.Column) {
for i, col := range cols {
if col == nil {
continue
}
if col.Charset != "" && col.Charset != charset.CharsetBin {
colValBytes, ok := col.Value.([]byte)
if ok {
row.Columns[i].Value = string(colValBytes)
cols[i].Value = string(colValBytes)
}
}
}
Expand All @@ -346,7 +346,8 @@ func (s *mysqlBackend) groupRowsByType(
deleteRow := make([]*sqlmodel.RowChange, 0, preAllocateSize)

for _, row := range event.Event.Rows {
convertBinaryToString(row)
convertBinaryToString(row.Columns)
convertBinaryToString(row.PreColumns)

if row.IsInsert() {
insertRow = append(
Expand Down
129 changes: 79 additions & 50 deletions cdc/sink/dmlsink/txn/mysql/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/charset"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tiflow/cdc/contextutil"
"github.com/pingcap/tiflow/cdc/model"
Expand Down Expand Up @@ -1179,10 +1180,12 @@ func TestPrepareBatchDMLs(t *testing.T) {
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag,
Value: 1,
}, {
Name: "a3",
Type: mysql.TypeLong,
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag,
Value: 1,
Name: "a3",
Type: mysql.TypeVarchar,
Charset: charset.CharsetGBK,
Flag: model.BinaryFlag | model.MultipleKeyFlag |
model.HandleKeyFlag | model.UniqueKeyFlag,
Value: []byte("你好"),
}},
IndexColumns: [][]int{{1, 2}},
},
Expand All @@ -1196,18 +1199,20 @@ func TestPrepareBatchDMLs(t *testing.T) {
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag,
Value: 2,
}, {
Name: "a3",
Type: mysql.TypeLong,
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag,
Value: 2,
Name: "a3",
Type: mysql.TypeVarchar,
Charset: charset.CharsetGBK,
Flag: model.BinaryFlag | model.MultipleKeyFlag |
model.HandleKeyFlag | model.UniqueKeyFlag,
Value: []byte("世界"),
}},
IndexColumns: [][]int{{1, 2}},
},
},
expected: &preparedDMLs{
startTs: []model.Ts{418658114257813514},
sqls: []string{"DELETE FROM `common_1`.`uk_without_pk` WHERE (`a1`,`a3`) IN ((?,?),(?,?))"},
values: [][]interface{}{{1, 1, 2, 2}},
values: [][]interface{}{{1, "你好", 2, "世界"}},
rowCount: 2,
},
},
Expand All @@ -1223,10 +1228,11 @@ func TestPrepareBatchDMLs(t *testing.T) {
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag,
Value: 1,
}, {
Name: "a3",
Type: mysql.TypeLong,
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag,
Value: 1,
Name: "a3",
Type: mysql.TypeVarchar,
Charset: charset.CharsetGBK,
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag,
Value: "你好",
}},
IndexColumns: [][]int{{1, 1}},
},
Expand All @@ -1240,18 +1246,20 @@ func TestPrepareBatchDMLs(t *testing.T) {
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.HandleKeyFlag,
Value: 2,
}, {
Name: "a3",
Type: mysql.TypeLong,
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.HandleKeyFlag,
Value: 2,
Name: "a3",
Type: mysql.TypeVarchar,
Charset: charset.CharsetGBK,
Flag: model.BinaryFlag | model.MultipleKeyFlag |
model.HandleKeyFlag | model.HandleKeyFlag,
Value: "世界",
}},
IndexColumns: [][]int{{2, 2}},
},
},
expected: &preparedDMLs{
startTs: []model.Ts{418658114257813516},
sqls: []string{"INSERT INTO `common_1`.`uk_without_pk` (`a1`,`a3`) VALUES (?,?),(?,?)"},
values: [][]interface{}{{1, 1, 2, 2}},
values: [][]interface{}{{1, "你好", 2, "世界"}},
rowCount: 2,
},
},
Expand All @@ -1268,21 +1276,25 @@ func TestPrepareBatchDMLs(t *testing.T) {
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag,
Value: 1,
}, {
Name: "a3",
Type: mysql.TypeLong,
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag,
Value: 1,
Name: "a3",
Type: mysql.TypeVarchar,
Charset: charset.CharsetGBK,
Flag: model.BinaryFlag | model.MultipleKeyFlag |
model.HandleKeyFlag | model.UniqueKeyFlag,
Value: []byte("开发"),
}},
Columns: []*model.Column{nil, {
Name: "a1",
Type: mysql.TypeLong,
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag,
Value: 2,
}, {
Name: "a3",
Type: mysql.TypeLong,
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag,
Value: 2,
Name: "a3",
Type: mysql.TypeVarchar,
Charset: charset.CharsetGBK,
Flag: model.BinaryFlag | model.MultipleKeyFlag |
model.HandleKeyFlag | model.UniqueKeyFlag,
Value: []byte("测试"),
}},
IndexColumns: [][]int{{1, 2}},
},
Expand All @@ -1296,29 +1308,40 @@ func TestPrepareBatchDMLs(t *testing.T) {
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag,
Value: 3,
}, {
Name: "a3",
Type: mysql.TypeLong,
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag,
Value: 3,
Name: "a3",
Type: mysql.TypeVarchar,
Charset: charset.CharsetGBK,
Flag: model.BinaryFlag | model.MultipleKeyFlag |
model.HandleKeyFlag | model.UniqueKeyFlag,
Value: []byte("纽约"),
}},
Columns: []*model.Column{nil, {
Name: "a1",
Type: mysql.TypeLong,
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag,
Value: 4,
}, {
Name: "a3",
Type: mysql.TypeLong,
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag,
Value: 4,
Name: "a3",
Type: mysql.TypeVarchar,
Charset: charset.CharsetGBK,
Flag: model.BinaryFlag | model.MultipleKeyFlag |
model.HandleKeyFlag | model.UniqueKeyFlag,
Value: []byte("北京"),
}},
IndexColumns: [][]int{{1, 2}},
},
},
expected: &preparedDMLs{
startTs: []model.Ts{418658114257813516},
sqls: []string{"UPDATE `common_1`.`uk_without_pk` SET `a1`=CASE WHEN ROW(`a1`,`a3`)=ROW(?,?) THEN ? WHEN ROW(`a1`,`a3`)=ROW(?,?) THEN ? END, `a3`=CASE WHEN ROW(`a1`,`a3`)=ROW(?,?) THEN ? WHEN ROW(`a1`,`a3`)=ROW(?,?) THEN ? END WHERE ROW(`a1`,`a3`) IN (ROW(?,?),ROW(?,?))"},
values: [][]interface{}{{1, 1, 2, 3, 3, 4, 1, 1, 2, 3, 3, 4, 1, 1, 3, 3}},
startTs: []model.Ts{418658114257813516},
sqls: []string{"UPDATE `common_1`.`uk_without_pk` SET `a1`=CASE " +
"WHEN ROW(`a1`,`a3`)=ROW(?,?) THEN ? WHEN ROW(`a1`,`a3`)=ROW(?,?) " +
"THEN ? END, `a3`=CASE WHEN ROW(`a1`,`a3`)=ROW(?,?) " +
"THEN ? WHEN ROW(`a1`,`a3`)=ROW(?,?) THEN ? END WHERE " +
"ROW(`a1`,`a3`) IN (ROW(?,?),ROW(?,?))"},
values: [][]interface{}{{
1, "开发", 2, 3, "纽约", 4, 1, "开发", "测试", 3,
"纽约", "北京", 1, "开发", 3, "纽约",
}},
rowCount: 2,
},
},
Expand All @@ -1335,10 +1358,12 @@ func TestPrepareBatchDMLs(t *testing.T) {
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag,
Value: 2,
}, {
Name: "a3",
Type: mysql.TypeLong,
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag,
Value: 2,
Name: "a3",
Type: mysql.TypeVarchar,
Charset: charset.CharsetGBK,
Flag: model.BinaryFlag | model.MultipleKeyFlag |
model.HandleKeyFlag | model.UniqueKeyFlag,
Value: []byte("你好"),
}},

IndexColumns: [][]int{{1, 2}},
Expand All @@ -1353,10 +1378,12 @@ func TestPrepareBatchDMLs(t *testing.T) {
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag,
Value: 1,
}, {
Name: "a3",
Type: mysql.TypeLong,
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag,
Value: 1,
Name: "a3",
Type: mysql.TypeVarchar,
Charset: charset.CharsetGBK,
Flag: model.BinaryFlag | model.MultipleKeyFlag |
model.HandleKeyFlag | model.UniqueKeyFlag,
Value: []byte("世界"),
}},
IndexColumns: [][]int{{1, 2}},
},
Expand All @@ -1370,10 +1397,12 @@ func TestPrepareBatchDMLs(t *testing.T) {
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag,
Value: 2,
}, {
Name: "a3",
Type: mysql.TypeLong,
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag,
Value: 2,
Name: "a3",
Type: mysql.TypeVarchar,
Charset: charset.CharsetGBK,
Flag: model.BinaryFlag | model.MultipleKeyFlag |
model.HandleKeyFlag | model.UniqueKeyFlag,
Value: "你好",
}},
IndexColumns: [][]int{{1, 2}},
},
Expand All @@ -1384,7 +1413,7 @@ func TestPrepareBatchDMLs(t *testing.T) {
"DELETE FROM `common_1`.`uk_without_pk` WHERE (`a1`,`a3`) IN ((?,?),(?,?))",
"INSERT INTO `common_1`.`uk_without_pk` (`a1`,`a3`) VALUES (?,?)",
},
values: [][]interface{}{{1, 1, 2, 2}, {2, 2}},
values: [][]interface{}{{1, "世界", 2, "你好"}, {2, "你好"}},
rowCount: 3,
},
},
Expand Down
38 changes: 32 additions & 6 deletions tests/integration_tests/charset_gbk/data/test.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ CREATE DATABASE `charset_gbk_test0` CHARACTER SET utf8mb4;

USE `charset_gbk_test0`;

/* this is a test for columns which charset is gbk*/
/* this is a test for columns which charset is gbk, with pk*/
CREATE TABLE t0 (
id INT,
name varchar(128) CHARACTER SET gbk,
Expand All @@ -30,8 +30,34 @@ WHERE name = '测试';
DELETE FROM t0
WHERE name = '部署';

/* this is a test for table which charset is gbk*/
/* this is a test for table which charset is gbk, without pk but with uk */
CREATE TABLE t1 (
id INT NOT NULL,
name varchar(128) CHARACTER SET gbk NOT NULL,
country char(32) CHARACTER SET gbk,
city varchar(64),
description text CHARACTER SET gbk,
image tinyblob,
UNIQUE KEY (id, name)
) ENGINE = InnoDB CHARSET = utf8mb4;

INSERT INTO t1
VALUES (1, '测试', "中国", "上海", "你好,世界"
, 0xC4E3BAC3CAC0BDE7);

INSERT INTO t1
VALUES (2, '部署', "美国", "纽约", "世界,你好"
, 0xCAC0BDE7C4E3BAC3);

UPDATE t1
SET name = '开发'
WHERE name = '测试';

DELETE FROM t1
WHERE name = '部署';

/* this is a test for table which charset is gbk*/
CREATE TABLE t2 (
id INT,
name varchar(128),
country char(32),
Expand All @@ -41,19 +67,19 @@ CREATE TABLE t1 (
PRIMARY KEY (id)
) ENGINE = InnoDB CHARSET = gbk;

INSERT INTO t1
INSERT INTO t2
VALUES (1, '测试', "中国", "上海", "你好,世界"
, 0xC4E3BAC3CAC0BDE7);

INSERT INTO t1
INSERT INTO t2
VALUES (2, '部署', "美国", "纽约", "世界,你好"
, 0xCAC0BDE7C4E3BAC3);

UPDATE t1
UPDATE t2
SET name = '开发'
WHERE name = '测试';

DELETE FROM t1
DELETE FROM t2
WHERE name = '部署';

/* this is a test for db which charset is gbk*/
Expand Down

0 comments on commit a32372b

Please sign in to comment.