Skip to content

Commit

Permalink
sinkv(ticdc): add max-multi-update-row config in mysql sink (pingcap#…
Browse files Browse the repository at this point in the history
  • Loading branch information
amyangfei committed Apr 3, 2023
1 parent a22f4b7 commit 12bc280
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 17 deletions.
25 changes: 24 additions & 1 deletion cdc/sink/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -800,6 +800,7 @@ func convert2RowChanges(
tableInfo,
nil, nil)
}
res.SetApproximateDataSize(row.ApproximateDataSize)
return res
}

Expand Down Expand Up @@ -876,7 +877,7 @@ func (s *mysqlSink) groupRowsByType(
updateRow = append(
updateRow,
convert2RowChanges(row, tableInfo, sqlmodel.RowChangeUpdate))
if len(updateRow) >= s.params.maxTxnRow {
if len(updateRow) >= s.params.batchUpdateRowCount {
updateRows = append(updateRows, updateRow)
updateRow = make([]*sqlmodel.RowChange, 0, s.params.maxTxnRow)
}
Expand Down Expand Up @@ -1128,6 +1129,28 @@ func (s *mysqlSink) execDMLs(ctx context.Context, txns []*model.SingleTableTxn,
return nil
}

func (s *mysqlSink) genUpdateSQL(rows ...*sqlmodel.RowChange) ([]string, [][]interface{}) {
size, count := 0, 0
for _, r := range rows {
size += int(r.GetApproximateDataSize())
count++
}
if size < defaultMaxBatchUpdateRowSize*count {
// use batch update
sql, value := sqlmodel.GenUpdateSQLFast(rows...)
return []string{sql}, [][]interface{}{value}
}
// each row has one independent update SQL.
sqls := make([]string, 0, len(rows))
values := make([][]interface{}, 0, len(rows))
for _, row := range rows {
sql, value := row.GenSQL(sqlmodel.DMLUpdate)
sqls = append(sqls, sql)
values = append(values, value)
}
return sqls, values
}

// if the column value type is []byte and charset is not binary, we get its string
// representation. Because if we use the byte array respresentation, the go-sql-driver
// will automatically set `_binary` charset for that column, which is not expected.
Expand Down
28 changes: 28 additions & 0 deletions cdc/sink/mysql/mysql_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,14 @@ const (
defaultTxnIsolationRC = "READ-COMMITTED"
defaultCharacterSet = "utf8mb4"
defaultBatchDMLEnable = true
// defaultMaxBatchUpdateRowCount is the default max number of rows in a
// single batch update SQL.
defaultMaxBatchUpdateRowCount = 40
maxMaxBatchUpdateRowCount = 1024
// defaultMaxBatchUpdateRowSize(1KB) defines the default value of single row.
// When the average row size is larger defaultMaxBatchUpdateRowSize,
// disable batch update, otherwise enable batch update.
defaultMaxBatchUpdateRowSize = 1024
)

var (
Expand All @@ -72,6 +80,7 @@ var defaultParams = &sinkParams{
dialTimeout: defaultDialTimeout,
safeMode: defaultSafeMode,
batchDMLEnable: defaultBatchDMLEnable,
batchUpdateRowCount: defaultMaxBatchUpdateRowCount,
}

var validSchemes = map[string]bool{
Expand All @@ -97,6 +106,7 @@ type sinkParams struct {
timezone string
tls string
batchDMLEnable bool
batchUpdateRowCount int
}

func (s *sinkParams) Clone() *sinkParams {
Expand Down Expand Up @@ -266,6 +276,24 @@ func parseSinkURIToParams(ctx context.Context,
params.batchDMLEnable = enable
}

s = sinkURI.Query().Get("max-multi-update-row")
if s != "" {
c, err := strconv.Atoi(s)
if err != nil {
return nil, cerror.WrapError(cerror.ErrMySQLInvalidConfig, err)
}
if c <= 0 {
return nil, cerror.WrapError(cerror.ErrMySQLInvalidConfig,
fmt.Errorf("invalid max-multi-update-row %d, which must be greater than 0", c))
}
if c > maxMaxBatchUpdateRowCount {
log.Warn("max-multi-update-row too large",
zap.Int("original", c), zap.Int("override", maxMaxBatchUpdateRowCount))
c = maxMaxBatchUpdateRowCount
}
params.batchUpdateRowCount = c
}

return params, nil
}

Expand Down
43 changes: 27 additions & 16 deletions cdc/sink/mysql/mysql_params_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func TestSinkParamsClone(t *testing.T) {
dialTimeout: defaultDialTimeout,
safeMode: defaultSafeMode,
batchDMLEnable: defaultBatchDMLEnable,
batchUpdateRowCount: defaultMaxBatchUpdateRowCount,
}, param1)
require.Equal(t, &sinkParams{
changefeedID: model.DefaultChangeFeedID("123"),
Expand All @@ -58,6 +59,7 @@ func TestSinkParamsClone(t *testing.T) {
dialTimeout: defaultDialTimeout,
safeMode: defaultSafeMode,
batchDMLEnable: defaultBatchDMLEnable,
batchUpdateRowCount: defaultMaxBatchUpdateRowCount,
}, param2)
}

Expand Down Expand Up @@ -211,9 +213,11 @@ func TestParseSinkURIToParams(t *testing.T) {
expected.changefeedID = model.DefaultChangeFeedID("cf-id")
expected.captureAddr = "127.0.0.1:8300"
expected.tidbTxnMode = "pessimistic"
expected.batchUpdateRowCount = 80
uriStr := "mysql://127.0.0.1:3306/?worker-count=64&max-txn-row=20" +
"&batch-replace-enable=true&batch-replace-size=50&safe-mode=false" +
"&tidb-txn-mode=pessimistic"
"&tidb-txn-mode=pessimistic" +
"&max-multi-update-row=80"
opts := map[string]string{
metrics.OptCaptureAddr: expected.captureAddr,
}
Expand Down Expand Up @@ -256,22 +260,29 @@ func TestParseSinkURIOverride(t *testing.T) {
cases := []struct {
uri string
checker func(*sinkParams)
}{{
uri: "mysql://127.0.0.1:3306/?worker-count=2147483648", // int32 max
checker: func(sp *sinkParams) {
require.EqualValues(t, sp.workerCount, maxWorkerCount)
}{
{
uri: "mysql://127.0.0.1:3306/?worker-count=2147483648", // int32 max
checker: func(sp *sinkParams) {
require.EqualValues(t, sp.workerCount, maxWorkerCount)
},
}, {
uri: "mysql://127.0.0.1:3306/?max-txn-row=2147483648", // int32 max
checker: func(sp *sinkParams) {
require.EqualValues(t, sp.maxTxnRow, maxMaxTxnRow)
},
}, {
uri: "mysql://127.0.0.1:3306/?tidb-txn-mode=badmode",
checker: func(sp *sinkParams) {
require.EqualValues(t, sp.tidbTxnMode, defaultTiDBTxnMode)
},
}, {
uri: "mysql://127.0.0.1:3306/?max-multi-update-row=2147483648", // int32 max
checker: func(sp *sinkParams) {
require.EqualValues(t, sp.batchUpdateRowCount, maxMaxBatchUpdateRowCount)
},
},
}, {
uri: "mysql://127.0.0.1:3306/?max-txn-row=2147483648", // int32 max
checker: func(sp *sinkParams) {
require.EqualValues(t, sp.maxTxnRow, maxMaxTxnRow)
},
}, {
uri: "mysql://127.0.0.1:3306/?tidb-txn-mode=badmode",
checker: func(sp *sinkParams) {
require.EqualValues(t, sp.tidbTxnMode, defaultTiDBTxnMode)
},
}}
}
ctx := context.TODO()
var uri *url.URL
var err error
Expand Down
13 changes: 13 additions & 0 deletions pkg/sqlmodel/row_change.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ type RowChange struct {

tp RowChangeType
whereHandle *WhereHandle

approximateDataSize int64
}

// NewRowChange creates a new RowChange.
Expand Down Expand Up @@ -196,6 +198,17 @@ func (r *RowChange) SetWhereHandle(whereHandle *WhereHandle) {
r.whereHandle = whereHandle
}

// GetApproximateDataSize returns internal approximateDataSize, it could be zero
// if this value is not set.
func (r *RowChange) GetApproximateDataSize() int64 {
return r.approximateDataSize
}

// SetApproximateDataSize sets the approximate size of row change.
func (r *RowChange) SetApproximateDataSize(approximateDataSize int64) {
r.approximateDataSize = approximateDataSize
}

func (r *RowChange) lazyInitWhereHandle() {
if r.whereHandle != nil {
return
Expand Down

0 comments on commit 12bc280

Please sign in to comment.