Skip to content

Commit

Permalink
sink(ticdc): Use dm sqlmodel pkg to generate sql in batch mode (#7548)
Browse files Browse the repository at this point in the history
ref #7653
  • Loading branch information
asddongmen authored Nov 22, 2022
1 parent 02b61a7 commit 5ab53f5
Show file tree
Hide file tree
Showing 15 changed files with 850 additions and 36 deletions.
53 changes: 53 additions & 0 deletions cdc/entry/mounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/pingcap/tiflow/pkg/config"
pfilter "github.com/pingcap/tiflow/pkg/filter"
"github.com/pingcap/tiflow/pkg/regionspan"
"github.com/pingcap/tiflow/pkg/sqlmodel"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/zap"
Expand Down Expand Up @@ -1151,6 +1152,8 @@ func TestBuildTableInfo(t *testing.T) {
cols, _, err := datum2Column(cdcTableInfo, map[int64]types.Datum{}, true)
require.NoError(t, err)
recoveredTI := model.BuildTiDBTableInfo(cols, cdcTableInfo.IndexColumnsOffset)
handle := sqlmodel.GetWhereHandle(recoveredTI, recoveredTI)
require.NotNil(t, handle.UniqueNotNullIdx)
require.Equal(t, c.recovered, showCreateTable(t, recoveredTI))
}
}
Expand All @@ -1163,3 +1166,53 @@ func showCreateTable(t *testing.T, ti *timodel.TableInfo) string {
require.NoError(t, err)
return result.String()
}

// TestNewDMRowChange rebuilds a TiDB table info from a list of CDC columns
// and verifies the DELETE statements that sqlmodel generates from it, both
// for a single row change and for a batch of row changes.
func TestNewDMRowChange(t *testing.T) {
	testCases := []struct {
		origin    string
		recovered string
	}{
		{
			origin: "CREATE TABLE t1 (id INT," +
				" a1 INT NOT NULL," +
				" a3 INT NOT NULL," +
				" UNIQUE KEY dex1(a1, a3));",
			recovered: "CREATE TABLE `BuildTiDBTableInfo` (\n" +
				" `id` int(0) DEFAULT NULL,\n" +
				" `a1` int(0) NOT NULL,\n" +
				" `a3` int(0) NOT NULL,\n" +
				" UNIQUE KEY `idx_0` (`a1`(0),`a3`(0))\n" +
				") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin",
		},
	}

	sqlParser := parser.New()
	for _, tc := range testCases {
		// Parse the original DDL and wrap it the way CDC does, so that the
		// index column offsets are available for the rebuild below.
		createStmt, err := sqlParser.ParseOneStmt(tc.origin, "", "")
		require.NoError(t, err)
		upstreamTI, err := ddl.BuildTableInfoFromAST(createStmt.(*ast.CreateTableStmt))
		require.NoError(t, err)
		wrapped := model.WrapTableInfo(0, "test", 0, upstreamTI)

		// Hand-written CDC columns for the three table columns; Type and
		// Flag are given as raw numeric values.
		columns := []*model.Column{
			{Name: "id", Type: 3, Charset: "binary", Flag: 65, Value: 1, Default: nil},
			{Name: "a1", Type: 3, Charset: "binary", Flag: 51, Value: 1, Default: nil},
			{Name: "a3", Type: 3, Charset: "binary", Flag: 51, Value: 2, Default: nil},
		}
		rebuiltTI := model.BuildTiDBTableInfo(columns, wrapped.IndexColumnsOffset)
		require.Equal(t, tc.recovered, showCreateTable(t, rebuiltTI))

		// Only pre values are supplied, so the row change describes a delete.
		target := &model.TableName{Schema: "db", Table: "t1"}
		preValues := []interface{}{1, 1, 2}
		change := sqlmodel.NewRowChange(target, nil, preValues, nil, rebuiltTI, nil, nil)

		// Single-row form: the WHERE clause uses the unique key columns.
		singleSQL, singleArgs := change.GenSQL(sqlmodel.DMLDelete)
		require.Equal(t, "DELETE FROM `db`.`t1` WHERE `a1` = ? AND `a3` = ? LIMIT 1", singleSQL)
		require.Equal(t, []interface{}{1, 2}, singleArgs)

		// Batch form: the same change passed twice yields two value tuples.
		batchSQL, batchArgs := sqlmodel.GenDeleteSQL(change, change)
		require.Equal(t, "DELETE FROM `db`.`t1` WHERE (`a1`,`a3`) IN ((?,?),(?,?))", batchSQL)
		require.Equal(t, []interface{}{1, 2, 1, 2}, batchArgs)
	}
}
1 change: 1 addition & 0 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,7 @@ func BuildTiDBTableInfo(columns []*Column, indexColumns [][]int) *model.TableInf
firstCol := columns[colOffsets[0]]
if firstCol.Flag.IsPrimaryKey() {
indexInfo.Primary = true
indexInfo.Unique = true
}
if firstCol.Flag.IsUniqueKey() {
indexInfo.Unique = true
Expand Down
1 change: 1 addition & 0 deletions cdc/sink/mysql/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
func newMySQLSink4Test(ctx context.Context) *mysqlSink {
params := defaultParams.Clone()
params.batchReplaceEnabled = false

return &mysqlSink{
txnCache: newUnresolvedTxnCache(),
statistics: metrics.NewStatistics(ctx, "", metrics.SinkTypeDB),
Expand Down
2 changes: 1 addition & 1 deletion cdc/sinkv2/eventsink/event_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ package eventsink
type EventSink[E TableEvent] interface {
	// WriteEvents writes events to the sink.
	// This is an asynchronous and thread-safe method.
	WriteEvents(events ...*CallbackableEvent[E]) error
	// Close closes the sink.
	Close() error
}
2 changes: 1 addition & 1 deletion cdc/sinkv2/eventsink/txn/mysql/dml.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func appendQueryArgs(args []interface{}, col *model.Column) []interface{} {
}

// reduceReplace groups SQLs with the same replace statement format, as following
// sql: `REPLACE INTO `test`.`t` (`a`,`b`) VALUES (?,?,?,?,?,?)`
// sql: `REPLACE INTO `test`.`t` (`a`,`b`) VALUES (?,?),(?,?),(?,?)`
// args: (1,"",2,"2",3,"")
func reduceReplace(replaces map[string][][]interface{}, batchSize int) ([]string, [][]interface{}) {
nextHolderString := func(query string, valueNum int, last bool) string {
Expand Down
210 changes: 189 additions & 21 deletions cdc/sinkv2/eventsink/txn/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/tidb/parser/charset"
timodel "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tiflow/cdc/contextutil"
"github.com/pingcap/tiflow/cdc/model"
Expand All @@ -33,9 +35,9 @@ import (
"github.com/pingcap/tiflow/cdc/sinkv2/metrics/txn"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/quotes"
"github.com/pingcap/tiflow/pkg/retry"
pmysql "github.com/pingcap/tiflow/pkg/sink/mysql"
"github.com/pingcap/tiflow/pkg/sqlmodel"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -191,6 +193,160 @@ type preparedDMLs struct {
rowCount int
}

// convert2RowChanges is a helper function that converts the row change
// representation of CDC into a general one (sqlmodel.RowChange).
//
// The images handed to sqlmodel.NewRowChange depend on changeType:
//   - sqlmodel.RowChangeInsert: only the values of row.Columns,
//   - sqlmodel.RowChangeUpdate: the values of row.PreColumns and row.Columns,
//   - sqlmodel.RowChangeDelete: only the values of row.PreColumns.
//
// Only the images a change type needs are materialized. For any other
// changeType the function returns nil.
func convert2RowChanges(
	row *model.RowChangedEvent,
	tableInfo *timodel.TableInfo,
	changeType sqlmodel.RowChangeType,
) *sqlmodel.RowChange {
	// valuesOf flattens a column list into its values. The result is always
	// non-nil, even for an empty column list.
	valuesOf := func(cols []*model.Column) []interface{} {
		vals := make([]interface{}, 0, len(cols))
		for _, col := range cols {
			if col == nil {
				// A nil column still gets a dummy entry so that the
				// remaining values keep their positional index.
				vals = append(vals, "omitted value")
				continue
			}
			vals = append(vals, col.Value)
		}
		return vals
	}

	switch changeType {
	case sqlmodel.RowChangeInsert:
		return sqlmodel.NewRowChange(
			row.Table,
			nil,
			nil,
			valuesOf(row.Columns),
			tableInfo,
			nil, nil)
	case sqlmodel.RowChangeUpdate:
		return sqlmodel.NewRowChange(
			row.Table,
			nil,
			valuesOf(row.PreColumns),
			valuesOf(row.Columns),
			tableInfo,
			nil, nil)
	case sqlmodel.RowChangeDelete:
		return sqlmodel.NewRowChange(
			row.Table,
			nil,
			valuesOf(row.PreColumns),
			nil,
			tableInfo,
			nil, nil)
	default:
		return nil
	}
}

// convertBinaryToString rewrites, in place, the value of every column in
// row.Columns whose charset is set and is not the binary charset: a []byte
// value is replaced by the equivalent string. Nil columns and values of any
// other type are left untouched. Only row.Columns is visited; row.PreColumns
// is not modified here.
func convertBinaryToString(row *model.RowChangedEvent) {
	for _, col := range row.Columns {
		if col == nil || col.Charset == "" || col.Charset == charset.CharsetBin {
			continue
		}
		// col is a pointer into row.Columns, so the assignment updates the
		// row itself.
		if raw, isBytes := col.Value.([]byte); isBytes {
			col.Value = string(raw)
		}
	}
}

// groupRowsByType walks every row of the transaction event and sorts it into
// insert, update and delete buckets of sqlmodel.RowChange.
//
// Each row is first passed through convertBinaryToString, which mutates it.
// When spiltUpdate is true, an update row contributes one delete change and
// one insert change instead of an update change. Rows that are neither an
// insert, a delete nor an update are ignored.
//
// TODO: Find a way to make batch delete dmls more efficient.
func groupRowsByType(
	event *eventsink.TxnCallbackableEvent,
	tableInfo *timodel.TableInfo,
	spiltUpdate bool,
) (insertRows, updateRows, deleteRows []*sqlmodel.RowChange) {
	for _, row := range event.Event.Rows {
		convertBinaryToString(row)

		switch {
		case row.IsInsert():
			change := convert2RowChanges(row, tableInfo, sqlmodel.RowChangeInsert)
			insertRows = append(insertRows, change)
		case row.IsDelete():
			change := convert2RowChanges(row, tableInfo, sqlmodel.RowChangeDelete)
			deleteRows = append(deleteRows, change)
		case row.IsUpdate():
			if !spiltUpdate {
				change := convert2RowChanges(row, tableInfo, sqlmodel.RowChangeUpdate)
				updateRows = append(updateRows, change)
				continue
			}
			// Split the update: the old row is deleted, the new one inserted.
			oldRow := convert2RowChanges(row, tableInfo, sqlmodel.RowChangeDelete)
			deleteRows = append(deleteRows, oldRow)
			newRow := convert2RowChanges(row, tableInfo, sqlmodel.RowChangeInsert)
			insertRows = append(insertRows, newRow)
		}
	}
	return insertRows, updateRows, deleteRows
}

// batchSingleTxnDmls turns the rows of a single transaction event into at
// most three batched statements, emitted in this order: one DELETE, one
// INSERT or REPLACE, and one UPDATE. sqls[i] is executed with values[i].
//
// When translateToInsert is true, inserts are generated as INSERT and update
// rows are kept as updates. Otherwise inserts are generated as REPLACE and
// every update row is split into a delete plus an insert by groupRowsByType.
func batchSingleTxnDmls(
	event *eventsink.TxnCallbackableEvent,
	tableInfo *timodel.TableInfo,
	translateToInsert bool,
) (sqls []string, values [][]interface{}) {
	// emit records one generated statement together with its arguments.
	emit := func(query string, args []interface{}) {
		sqls = append(sqls, query)
		values = append(values, args)
	}

	splitUpdates := !translateToInsert
	insertRows, updateRows, deleteRows := groupRowsByType(event, tableInfo, splitUpdates)

	// handle delete
	if len(deleteRows) != 0 {
		emit(sqlmodel.GenDeleteSQL(deleteRows...))
	}

	// handle insert
	if len(insertRows) != 0 {
		if translateToInsert {
			emit(sqlmodel.GenInsertSQL(sqlmodel.DMLInsert, insertRows...))
		} else {
			emit(sqlmodel.GenInsertSQL(sqlmodel.DMLReplace, insertRows...))
		}
	}

	// handle update
	if len(updateRows) != 0 {
		// TODO: do a testing on update performance.
		emit(sqlmodel.GenUpdateSQL(updateRows...))
	}

	return sqls, values
}

// hasHandleKey reports whether at least one non-nil column in cols has the
// handle-key flag set. It returns false for an empty or all-nil list.
func hasHandleKey(cols []*model.Column) bool {
	for i := range cols {
		if c := cols[i]; c != nil && c.Flag.IsHandleKey() {
			return true
		}
	}
	return false
}

// prepareDMLs converts model.RowChangedEvent list to query string list and args list
func (s *mysqlBackend) prepareDMLs() *preparedDMLs {
// TODO: use a sync.Pool to reduce allocations.
Expand All @@ -214,33 +370,49 @@ func (s *mysqlBackend) prepareDMLs() *preparedDMLs {
// translateToInsert control the update and insert behavior
// we only translate into insert when old value is enabled and safe mode is disabled
translateToInsert := s.cfg.EnableOldValue && !s.cfg.SafeMode
for _, event := range s.events {
for _, row := range event.Event.Rows {
if !translateToInsert {
break
}
// A row can be translated in to INSERT, when it was committed after
// the table it belongs to been replicating by TiCDC, which means it must not be
// replicated before, and there is no such row in downstream MySQL.
translateToInsert = row.CommitTs > row.ReplicatingTs
}
}

rowCount := 0
for _, event := range s.events {
if len(event.Event.Rows) == 0 {
continue
}
rowCount += len(event.Event.Rows)

firstRow := event.Event.Rows[0]
if len(startTs) == 0 || startTs[len(startTs)-1] != firstRow.StartTs {
startTs = append(startTs, firstRow.StartTs)
}

// A row can be translated in to INSERT, when it was committed after
// the table it belongs to been replicating by TiCDC, which means it must not be
// replicated before, and there is no such row in downstream MySQL.
translateToInsert = translateToInsert && firstRow.CommitTs > firstRow.ReplicatingTs

if event.Callback != nil {
callbacks = append(callbacks, event.Callback)
}

for _, row := range event.Event.Rows {
if len(startTs) == 0 || startTs[len(startTs)-1] != row.StartTs {
startTs = append(startTs, row.StartTs)
// Determine whether to use batch dml feature here.
if s.cfg.BatchDMLEnable {
tableColumns := firstRow.Columns
if firstRow.IsDelete() {
tableColumns = firstRow.PreColumns
}
// only use batch dml when the table has a handle key
if hasHandleKey(tableColumns) {
// TODO(dongmen): find a better way to get table info.
tableInfo := model.BuildTiDBTableInfo(tableColumns, firstRow.IndexColumns)
sql, value := batchSingleTxnDmls(event, tableInfo, translateToInsert)
sqls = append(sqls, sql...)
values = append(values, value...)
continue
}
}

quoteTable := firstRow.Table.QuoteString()
for _, row := range event.Event.Rows {
var query string
var args []interface{}
quoteTable := quotes.QuoteSchema(row.Table.Schema, row.Table.Table)

// If the old value is enabled, is not in safe mode and is an update event, then translate to UPDATE.
// NOTICE: Only update events with the old value feature enabled will have both columns and preColumns.
if translateToInsert && len(row.PreColumns) != 0 && len(row.Columns) != 0 {
Expand All @@ -249,7 +421,6 @@ func (s *mysqlBackend) prepareDMLs() *preparedDMLs {
if query != "" {
sqls = append(sqls, query)
values = append(values, args)
rowCount++
}
continue
}
Expand All @@ -266,7 +437,6 @@ func (s *mysqlBackend) prepareDMLs() *preparedDMLs {
if query != "" {
sqls = append(sqls, query)
values = append(values, args)
rowCount++
}
}

Expand All @@ -286,14 +456,12 @@ func (s *mysqlBackend) prepareDMLs() *preparedDMLs {
replaces[query] = make([][]interface{}, 0)
}
replaces[query] = append(replaces[query], args)
rowCount++
}
} else {
query, args = prepareReplace(quoteTable, row.Columns, true /* appendPlaceHolder */, translateToInsert)
if query != "" {
sqls = append(sqls, query)
values = append(values, args)
rowCount++
}
}
}
Expand Down
Loading

0 comments on commit 5ab53f5

Please sign in to comment.