Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

compactor: fix duplicate entry in safemode (#3432) (#3434) #4088

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions dm/syncer/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package syncer

import (
"fmt"
"strconv"
"time"

"github.com/pingcap/failpoint"
Expand Down Expand Up @@ -165,6 +167,15 @@ func (c *compactor) compactJob(j *job) {
}

key := j.dml.identifyKey()

failpoint.Inject("DownstreamIdentifyKeyCheckInCompact", func(v failpoint.Value) {
value, err := strconv.Atoi(key)
upper := v.(int)
if err != nil || value > upper {
panic(fmt.Sprintf("downstream identifyKey check failed. key value %v should less than %v", value, upper))
}
})

prevPos, ok := tableKeyMap[key]
// if no such key in the buffer, add it
if !ok {
Expand All @@ -184,6 +195,8 @@ func (c *compactor) compactJob(j *job) {
j.dml.oldValues = nil
j.dml.originOldValues = nil
j.dml.op = insert
// DELETE + INSERT + UPDATE => INSERT with safemode
j.dml.safeMode = prevJob.dml.safeMode
} else if prevJob.tp == update {
// UPDATE + UPDATE => UPDATE
j.dml.oldValues = prevJob.dml.oldValues
Expand Down
4 changes: 3 additions & 1 deletion dm/syncer/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,11 +238,13 @@ func (s *testSyncerSuite) TestCompactorSafeMode(c *C) {
newDML(update, false, targetTableID, sourceTable, []interface{}{1, 1, "a"}, []interface{}{3, 3, "c"}, []interface{}{1, 1, "a"}, []interface{}{3, 3, "c"}, ti.Columns, ti, tiIndex, downTi),
newDML(del, false, targetTableID, sourceTable, nil, []interface{}{2, 2, "b"}, nil, []interface{}{2, 2, "b"}, ti.Columns, ti, tiIndex, downTi),
newDML(insert, false, targetTableID, sourceTable, nil, []interface{}{1, 1, "a"}, nil, []interface{}{1, 1, "a"}, ti.Columns, ti, tiIndex, downTi),
newDML(insert, false, targetTableID, sourceTable, nil, []interface{}{2, 2, "b"}, nil, []interface{}{2, 2, "b"}, ti.Columns, ti, tiIndex, downTi),
newDML(update, false, targetTableID, sourceTable, []interface{}{2, 2, "b"}, []interface{}{2, 2, "c"}, []interface{}{2, 2, "b"}, []interface{}{2, 2, "c"}, ti.Columns, ti, tiIndex, downTi),
},
output: []*DML{
newDML(insert, false, targetTableID, sourceTable, nil, []interface{}{3, 3, "c"}, nil, []interface{}{3, 3, "c"}, ti.Columns, ti, tiIndex, downTi),
newDML(del, false, targetTableID, sourceTable, nil, []interface{}{2, 2, "b"}, nil, []interface{}{2, 2, "b"}, ti.Columns, ti, tiIndex, downTi),
newDML(insert, true, targetTableID, sourceTable, nil, []interface{}{1, 1, "a"}, nil, []interface{}{1, 1, "a"}, ti.Columns, ti, tiIndex, downTi),
newDML(insert, true, targetTableID, sourceTable, nil, []interface{}{2, 2, "c"}, nil, []interface{}{2, 2, "c"}, ti.Columns, ti, tiIndex, downTi),
},
},
}
Expand Down
50 changes: 26 additions & 24 deletions dm/syncer/dml.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ const (
updateDML = dmlOpType(update)
deleteDML = dmlOpType(del)
insertOnDuplicateDML dmlOpType = iota + 1
replaceDML
)

func (op dmlOpType) String() (str string) {
Expand All @@ -54,6 +55,8 @@ func (op dmlOpType) String() (str string) {
return "delete"
case insertOnDuplicateDML:
return "insert on duplicate update"
case replaceDML:
return "replace"
}
return
}
Expand Down Expand Up @@ -785,11 +788,15 @@ func (dml *DML) genDeleteSQL() ([]string, [][]interface{}) {
}

// genInsertSQL generates a `INSERT`.
// if in safemode, generates a `INSERT ON DUPLICATE UPDATE` statement.
// if in safemode, generates a `REPLACE` statement.
func (dml *DML) genInsertSQL() ([]string, [][]interface{}) {
var buf strings.Builder
buf.Grow(1024)
buf.WriteString("INSERT INTO ")
if dml.safeMode {
buf.WriteString("REPLACE INTO ")
} else {
buf.WriteString("INSERT INTO ")
}
buf.WriteString(dml.targetTableID)
buf.WriteString(" (")
for i, column := range dml.columns {
Expand All @@ -810,16 +817,6 @@ func (dml *DML) genInsertSQL() ([]string, [][]interface{}) {
buf.WriteString("?)")
}
}
if dml.safeMode {
buf.WriteString(" ON DUPLICATE KEY UPDATE ")
for i, column := range dml.columns {
col := dbutil.ColumnName(column.Name.O)
buf.WriteString(col + "=VALUES(" + col + ")")
if i != len(dml.columns)-1 {
buf.WriteByte(',')
}
}
}
return []string{buf.String()}, [][]interface{}{dml.values}
}

Expand All @@ -837,16 +834,21 @@ func valuesHolder(n int) string {
return builder.String()
}

// genInsertOnDuplicateSQLMultipleRows generates a `INSERT` with multiple rows like 'INSERT INTO tb(a,b) VALUES (1,1),(2,2)'
// genInsertSQLMultipleRows generates a `INSERT` with multiple rows like 'INSERT INTO tb(a,b) VALUES (1,1),(2,2)'
// if replace, generates a `REPLACE` with multiple rows like 'REPLACE INTO tb(a,b) VALUES (1,1),(2,2)'
// if onDuplicate, generates a `INSERT ON DUPLICATE KEY UPDATE` statement like 'INSERT INTO tb(a,b) VALUES (1,1),(2,2) ON DUPLICATE KEY UPDATE a=VALUES(a),b=VALUES(b)'.
func genInsertOnDuplicateSQLMultipleRows(onDuplicate bool, dmls []*DML) ([]string, [][]interface{}) {
func genInsertSQLMultipleRows(op dmlOpType, dmls []*DML) ([]string, [][]interface{}) {
if len(dmls) == 0 {
return nil, nil
}

var buf strings.Builder
buf.Grow(1024)
buf.WriteString("INSERT INTO")
if op == replaceDML {
buf.WriteString("REPLACE INTO")
} else {
buf.WriteString("INSERT INTO")
}
buf.WriteString(" " + dmls[0].targetTableID + " (")
for i, column := range dmls[0].columns {
buf.WriteString(dbutil.ColumnName(column.Name.O))
Expand All @@ -866,7 +868,7 @@ func genInsertOnDuplicateSQLMultipleRows(onDuplicate bool, dmls []*DML) ([]strin
buf.WriteString(holder)
}

if onDuplicate {
if op == insertOnDuplicateDML {
buf.WriteString(" ON DUPLICATE KEY UPDATE ")
for i, column := range dmls[0].columns {
col := dbutil.ColumnName(column.Name.O)
Expand Down Expand Up @@ -927,10 +929,8 @@ func genSQLMultipleRows(op dmlOpType, dmls []*DML) (queries []string, args [][]i
log.L().Debug("generate DMLs with multiple rows", zap.Stringer("op", op), zap.Stringer("original op", dmls[0].op), zap.Int("rows", len(dmls)))
}
switch op {
case insertDML:
return genInsertOnDuplicateSQLMultipleRows(false, dmls)
case insertOnDuplicateDML:
return genInsertOnDuplicateSQLMultipleRows(true, dmls)
case insertDML, replaceDML, insertOnDuplicateDML:
return genInsertSQLMultipleRows(op, dmls)
case deleteDML:
return genDeleteSQLMultipleRows(dmls)
}
Expand Down Expand Up @@ -1052,17 +1052,19 @@ func genDMLsWithSameOp(dmls []*DML) ([]string, [][]interface{}) {
// group dmls with same dmlOp
for i, dml := range dmls {
curOp := dmlOpType(dml.op)
// if update statement didn't update identify values, regard it as insert on duplicate.
// if insert with safemode, regard it as insert on duplicate.
if (curOp == updateDML && !dml.updateIdentify()) || (curOp == insertDML && dml.safeMode) {
if curOp == updateDML && !dml.updateIdentify() && !dml.safeMode {
// if update statement didn't update identify values and not in safemode, regard it as insert on duplicate.
curOp = insertOnDuplicateDML
} else if curOp == insertDML && dml.safeMode {
// if insert with safemode, regard it as replace
curOp = replaceDML
}

if i == 0 {
lastOp = curOp
}

// now there are 4 situations: [insert, insert on duplicate(insert with safemode/update without identify keys), update(update identify keys), delete]
// now there are 5 situations: [insert, replace(insert with safemode), insert on duplicate(update without identify keys), update(update identify keys/update with safemode), delete]
if lastOp != curOp {
query, arg := genDMLsWithSameTable(lastOp, groupDMLs)
queries = append(queries, query...)
Expand Down
42 changes: 33 additions & 9 deletions dm/syncer/dml_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ func (s *testSyncerSuite) TestGenSQL(c *C) {
},
{
newDML(insert, true, "`targetSchema`.`targetTable`", &filter.Table{}, nil, []interface{}{1, 2, 3, "haha"}, nil, []interface{}{1, 2, 3, "haha"}, ti.Columns, ti, tiIndex, nil),
[]string{"INSERT INTO `targetSchema`.`targetTable` (`id`,`col1`,`col2`,`name`) VALUES (?,?,?,?) ON DUPLICATE KEY UPDATE `id`=VALUES(`id`),`col1`=VALUES(`col1`),`col2`=VALUES(`col2`),`name`=VALUES(`name`)"},
[]string{"REPLACE INTO `targetSchema`.`targetTable` (`id`,`col1`,`col2`,`name`) VALUES (?,?,?,?)"},
[][]interface{}{{1, 2, 3, "haha"}},
},
{
Expand All @@ -332,7 +332,7 @@ func (s *testSyncerSuite) TestGenSQL(c *C) {
},
{
newDML(update, true, "`targetSchema`.`targetTable`", &filter.Table{}, []interface{}{1, 2, 3, "haha"}, []interface{}{4, 5, 6, "hihi"}, []interface{}{1, 2, 3, "haha"}, []interface{}{1, 2, 3, "haha"}, ti.Columns, ti, tiIndex, nil),
[]string{"DELETE FROM `targetSchema`.`targetTable` WHERE `id` = ? LIMIT 1", "INSERT INTO `targetSchema`.`targetTable` (`id`,`col1`,`col2`,`name`) VALUES (?,?,?,?) ON DUPLICATE KEY UPDATE `id`=VALUES(`id`),`col1`=VALUES(`col1`),`col2`=VALUES(`col2`),`name`=VALUES(`name`)"},
[]string{"DELETE FROM `targetSchema`.`targetTable` WHERE `id` = ? LIMIT 1", "REPLACE INTO `targetSchema`.`targetTable` (`id`,`col1`,`col2`,`name`) VALUES (?,?,?,?)"},
[][]interface{}{{1}, {4, 5, 6, "hihi"}},
},
}
Expand Down Expand Up @@ -438,7 +438,7 @@ func (s *testSyncerSuite) TestGenDMLWithSameOp(c *C) {
newDML(insert, true, targetTableID1, sourceTable11, nil, []interface{}{1, 1, "a"}, nil, []interface{}{1, 1, "a"}, ti11.Columns, ti11, ti11Index, downTi11),
newDML(insert, true, targetTableID1, sourceTable11, nil, []interface{}{2, 2, "b"}, nil, []interface{}{2, 2, "b"}, ti11.Columns, ti11, ti11Index, downTi11),
newDML(insert, true, targetTableID1, sourceTable12, nil, []interface{}{3, 3, "c"}, nil, []interface{}{3, 3, "c"}, ti12.Columns, ti12, ti12Index, downTi12),
// update no index
// update without identify index, but in safemode
newDML(update, true, targetTableID1, sourceTable11, []interface{}{1, 1, "a"}, []interface{}{1, 1, "aa"}, []interface{}{1, 1, "a"}, []interface{}{1, 1, "aa"}, ti11.Columns, ti11, ti11Index, downTi11),
newDML(update, true, targetTableID1, sourceTable11, []interface{}{2, 2, "b"}, []interface{}{2, 2, "bb"}, []interface{}{2, 2, "b"}, []interface{}{2, 2, "bb"}, ti11.Columns, ti11, ti11Index, downTi11),
newDML(update, true, targetTableID1, sourceTable12, []interface{}{3, 3, "c"}, []interface{}{3, 3, "cc"}, []interface{}{3, 3, "c"}, []interface{}{3, 3, "cc"}, ti12.Columns, ti12, ti12Index, downTi12),
Expand Down Expand Up @@ -486,17 +486,29 @@ func (s *testSyncerSuite) TestGenDMLWithSameOp(c *C) {

expectQueries := []string{
// table1
"INSERT INTO `db1`.`tb1` (`id`,`col1`,`name`) VALUES (?,?,?),(?,?,?),(?,?,?),(?,?,?),(?,?,?),(?,?,?),(?,?,?),(?,?,?),(?,?,?) ON DUPLICATE KEY UPDATE `id`=VALUES(`id`),`col1`=VALUES(`col1`),`name`=VALUES(`name`)",
"REPLACE INTO `db1`.`tb1` (`id`,`col1`,`name`) VALUES (?,?,?),(?,?,?),(?,?,?)",
"DELETE FROM `db1`.`tb1` WHERE `id` = ? LIMIT 1",
"REPLACE INTO `db1`.`tb1` (`id`,`col1`,`name`) VALUES (?,?,?)",
"DELETE FROM `db1`.`tb1` WHERE `id` = ? LIMIT 1",
"REPLACE INTO `db1`.`tb1` (`id`,`col1`,`name`) VALUES (?,?,?)",
"DELETE FROM `db1`.`tb1` WHERE `id` = ? LIMIT 1",
"REPLACE INTO `db1`.`tb1` (`id`,`col1`,`name`) VALUES (?,?,?)",
"DELETE FROM `db1`.`tb1` WHERE `id` = ? LIMIT 1",
"INSERT INTO `db1`.`tb1` (`id`,`col1`,`name`) VALUES (?,?,?) ON DUPLICATE KEY UPDATE `id`=VALUES(`id`),`col1`=VALUES(`col1`),`name`=VALUES(`name`)",
"REPLACE INTO `db1`.`tb1` (`id`,`col1`,`name`) VALUES (?,?,?)",
"DELETE FROM `db1`.`tb1` WHERE `id` = ? LIMIT 1",
"INSERT INTO `db1`.`tb1` (`id`,`col1`,`name`) VALUES (?,?,?) ON DUPLICATE KEY UPDATE `id`=VALUES(`id`),`col1`=VALUES(`col1`),`name`=VALUES(`name`)",
"REPLACE INTO `db1`.`tb1` (`id`,`col1`,`name`) VALUES (?,?,?)",
"DELETE FROM `db1`.`tb1` WHERE `id` = ? LIMIT 1",
"INSERT INTO `db1`.`tb1` (`id`,`col1`,`name`) VALUES (?,?,?) ON DUPLICATE KEY UPDATE `id`=VALUES(`id`),`col1`=VALUES(`col1`),`name`=VALUES(`name`)",
"REPLACE INTO `db1`.`tb1` (`id`,`col1`,`name`) VALUES (?,?,?)",
"DELETE FROM `db1`.`tb1` WHERE `id` = ? LIMIT 1",
"REPLACE INTO `db1`.`tb1` (`id`,`col1`,`name`) VALUES (?,?,?)",
"DELETE FROM `db1`.`tb1` WHERE `id` = ? LIMIT 1",
"REPLACE INTO `db1`.`tb1` (`id`,`col1`,`name`) VALUES (?,?,?)",
"DELETE FROM `db1`.`tb1` WHERE `id` = ? LIMIT 1",
"REPLACE INTO `db1`.`tb1` (`id`,`col1`,`name`) VALUES (?,?,?)",
"DELETE FROM `db1`.`tb1` WHERE (`id`) IN ((?),(?),(?))",

// table2
"INSERT INTO `db2`.`tb2` (`id`,`col2`,`name`) VALUES (?,?,?) ON DUPLICATE KEY UPDATE `id`=VALUES(`id`),`col2`=VALUES(`col2`),`name`=VALUES(`name`)",
"REPLACE INTO `db2`.`tb2` (`id`,`col2`,`name`) VALUES (?,?,?)",
"INSERT INTO `db2`.`tb2` (`id`,`col2`,`name`) VALUES (?,?,?)",
"INSERT INTO `db2`.`tb2` (`id`,`col3`,`name`) VALUES (?,?,?)",
"INSERT INTO `db2`.`tb2` (`id`,`col2`,`name`) VALUES (?,?,?),(?,?,?) ON DUPLICATE KEY UPDATE `id`=VALUES(`id`),`col2`=VALUES(`col2`),`name`=VALUES(`name`)",
Expand All @@ -514,7 +526,19 @@ func (s *testSyncerSuite) TestGenDMLWithSameOp(c *C) {

expectArgs := [][]interface{}{
// table1
{1, 1, "a", 2, 2, "b", 3, 3, "c", 1, 1, "aa", 2, 2, "bb", 3, 3, "cc", 1, 4, "aa", 2, 5, "bb", 3, 6, "cc"},
{1, 1, "a", 2, 2, "b", 3, 3, "c"},
{1},
{1, 1, "aa"},
{2},
{2, 2, "bb"},
{3},
{3, 3, "cc"},
{1},
{1, 4, "aa"},
{2},
{2, 5, "bb"},
{3},
{3, 6, "cc"},
{1},
{4, 4, "aa"},
{2},
Expand Down
10 changes: 5 additions & 5 deletions dm/syncer/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -867,7 +867,7 @@ func (s *testSyncerSuite) TestRun(c *C) {
nil,
}, {
insert,
[]string{"INSERT INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?) ON DUPLICATE KEY UPDATE `id`=VALUES(`id`),`name`=VALUES(`name`)"},
[]string{"REPLACE INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?)"},
[][]interface{}{{int64(580981944116838401), "a"}},
}, {
flush,
Expand All @@ -879,7 +879,7 @@ func (s *testSyncerSuite) TestRun(c *C) {
nil,
}, {
insert,
[]string{"INSERT INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?) ON DUPLICATE KEY UPDATE `id`=VALUES(`id`),`name`=VALUES(`name`)"},
[]string{"REPLACE INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?)"},
[][]interface{}{{int64(580981944116838402), "b"}},
}, {
del,
Expand All @@ -888,7 +888,7 @@ func (s *testSyncerSuite) TestRun(c *C) {
}, {
// safe mode is true, will split update to delete + replace
update,
[]string{"DELETE FROM `test_1`.`t_1` WHERE `id` = ? LIMIT 1", "INSERT INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?) ON DUPLICATE KEY UPDATE `id`=VALUES(`id`),`name`=VALUES(`name`)"},
[]string{"DELETE FROM `test_1`.`t_1` WHERE `id` = ? LIMIT 1", "REPLACE INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?)"},
[][]interface{}{{int64(580981944116838402)}, {int64(580981944116838401), "b"}},
}, {
flush,
Expand Down Expand Up @@ -1131,15 +1131,15 @@ func (s *testSyncerSuite) TestExitSafeModeByConfig(c *C) {
nil,
}, {
insert,
[]string{"INSERT INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?) ON DUPLICATE KEY UPDATE `id`=VALUES(`id`),`name`=VALUES(`name`)"},
[]string{"REPLACE INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?)"},
[][]interface{}{{int32(1), "a"}},
}, {
del,
[]string{"DELETE FROM `test_1`.`t_1` WHERE `id` = ? LIMIT 1"},
[][]interface{}{{int32(1)}},
}, {
update,
[]string{"DELETE FROM `test_1`.`t_1` WHERE `id` = ? LIMIT 1", "INSERT INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?) ON DUPLICATE KEY UPDATE `id`=VALUES(`id`),`name`=VALUES(`name`)"},
[]string{"DELETE FROM `test_1`.`t_1` WHERE `id` = ? LIMIT 1", "REPLACE INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?)"},
[][]interface{}{{int32(2)}, {int32(1), "b"}},
}, {
// start from this event, location passes safeModeExitLocation and safe mode should exit
Expand Down
21 changes: 21 additions & 0 deletions dm/tests/_utils/test_prepare
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,27 @@ function run_sql_tidb_with_retry() {
fi
}

# shortcut for run tidb sql and check result with retry
# Run a TiDB SQL and check its result with a bounded number of retries.
#   $1 - SQL statement to run against TiDB
#   $2 - literal string expected in the query output (matched with grep -F)
#   $3 - maximum number of attempts
# Exits the test with a diagnostic dump if the expected string never appears.
function run_sql_tidb_with_retry_times() {
	local rc=0
	local k
	# k<=$3 so the function really makes $3 attempts; k<$3 would make one fewer.
	for ((k = 1; k <= $3; k++)); do
		run_sql_tidb "$1"
		if grep -Fq "$2" "$TEST_DIR/sql_res.$TEST_NAME.txt"; then
			rc=1
			break
		fi
		echo "run tidb sql failed $k-th time, retry later"
		sleep 2
	done
	if [[ $rc = 0 ]]; then
		echo "TEST FAILED: OUTPUT DOES NOT CONTAIN '$2'"
		echo "____________________________________"
		cat "$TEST_DIR/sql_res.$TEST_NAME.txt"
		echo "^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^"
		exit 1
	fi
}

# shortcut for check log contain with retry
function check_log_contain_with_retry() {
text=$1
Expand Down
Loading