Commit 2d57455
lightning: allow configuring the desired size and number of rows of each INSERT statement for logical mode (pingcap#46997)
close pingcap#46607
1 parent f47c310 commit 2d57455
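
For context, a minimal sketch of how the new knobs flow into the backend after this change (a fragment assuming ctx and db are in scope; field and function names are taken from the diff below, and the values are illustrative, not defaults):

    cfg := config.NewConfig()
    cfg.TikvImporter.LogicalImportBatchSize = 1 << 20 // cap the VALUES payload of each INSERT at ~1 MiB
    cfg.TikvImporter.LogicalImportBatchRows = 10000   // and cap each INSERT at 10,000 rows
    backendObj := tidb.NewTiDBBackend(ctx, db, cfg, errormanager.New(nil, cfg, log.L()))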

File tree

13 files changed: +125 −126 lines
br/pkg/lightning/backend/encode/encode.go (−5)

@@ -63,11 +63,6 @@ type SessionOptions struct {
 
 // Rows represents a collection of encoded rows.
 type Rows interface {
-	// SplitIntoChunks splits the rows into multiple consecutive parts, each
-	// part having total byte size less than `splitSize`. The meaning of "byte
-	// size" should be consistent with the value used in `Row.ClassifyAndAppend`.
-	SplitIntoChunks(splitSize int) []Rows
-
 	// Clear returns a new collection with empty content. It may share the
 	// capacity with the current instance. The typical usage is `x = x.Clear()`.
 	Clear() Rows
br/pkg/lightning/backend/kv/BUILD.bazel (+1 −1)

@@ -56,7 +56,7 @@ go_test(
     embed = [":kv"],
     flaky = True,
     race = "on",
-    shard_count = 19,
+    shard_count = 18,
     deps = [
         "//br/pkg/lightning/backend/encode",
         "//br/pkg/lightning/common",
br/pkg/lightning/backend/kv/sql2kv.go (−31)

@@ -326,37 +326,6 @@ func (kvs *Pairs) ClassifyAndAppend(
 	*indices = indexKVs
 }
 
-// SplitIntoChunks splits the key-value pairs into chunks.
-func (kvs *Pairs) SplitIntoChunks(splitSize int) []encode.Rows {
-	if len(kvs.Pairs) == 0 {
-		return nil
-	}
-
-	res := make([]encode.Rows, 0, 1)
-	i := 0
-	cumSize := 0
-	for j, pair := range kvs.Pairs {
-		size := len(pair.Key) + len(pair.Val)
-		if i < j && cumSize+size > splitSize {
-			res = append(res, &Pairs{Pairs: kvs.Pairs[i:j]})
-			i = j
-			cumSize = 0
-		}
-		cumSize += size
-	}
-
-	if i == 0 {
-		res = append(res, kvs)
-	} else {
-		res = append(res, &Pairs{
-			Pairs:    kvs.Pairs[i:],
-			BytesBuf: kvs.BytesBuf,
-			MemBuf:   kvs.MemBuf,
-		})
-	}
-	return res
-}
-
 // Clear clears the key-value pairs.
 func (kvs *Pairs) Clear() encode.Rows {
 	if kvs.BytesBuf != nil {
br/pkg/lightning/backend/kv/sql2kv_test.go (−47)

@@ -569,53 +569,6 @@ func TestShardRowId(t *testing.T) {
 	require.Equal(t, tbl.Allocators(lkv.GetSession4test(encoder).GetSessionVars()).Get(autoid.RowIDAllocType).Base(), int64(32))
 }
 
-func TestSplitIntoChunks(t *testing.T) {
-	pairs := []common.KvPair{
-		{
-			Key: []byte{1, 2, 3},
-			Val: []byte{4, 5, 6},
-		},
-		{
-			Key: []byte{7, 8},
-			Val: []byte{9, 0},
-		},
-		{
-			Key: []byte{1, 2, 3, 4},
-			Val: []byte{5, 6, 7, 8},
-		},
-		{
-			Key: []byte{9, 0},
-			Val: []byte{1, 2},
-		},
-	}
-
-	splitBy10 := lkv.MakeRowsFromKvPairs(pairs).SplitIntoChunks(10)
-	require.Equal(t, splitBy10, []encode.Rows{
-		lkv.MakeRowsFromKvPairs(pairs[0:2]),
-		lkv.MakeRowsFromKvPairs(pairs[2:3]),
-		lkv.MakeRowsFromKvPairs(pairs[3:4]),
-	})
-
-	splitBy12 := lkv.MakeRowsFromKvPairs(pairs).SplitIntoChunks(12)
-	require.Equal(t, splitBy12, []encode.Rows{
-		lkv.MakeRowsFromKvPairs(pairs[0:2]),
-		lkv.MakeRowsFromKvPairs(pairs[2:4]),
-	})
-
-	splitBy1000 := lkv.MakeRowsFromKvPairs(pairs).SplitIntoChunks(1000)
-	require.Equal(t, splitBy1000, []encode.Rows{
-		lkv.MakeRowsFromKvPairs(pairs[0:4]),
-	})
-
-	splitBy1 := lkv.MakeRowsFromKvPairs(pairs).SplitIntoChunks(1)
-	require.Equal(t, splitBy1, []encode.Rows{
-		lkv.MakeRowsFromKvPairs(pairs[0:1]),
-		lkv.MakeRowsFromKvPairs(pairs[1:2]),
-		lkv.MakeRowsFromKvPairs(pairs[2:3]),
-		lkv.MakeRowsFromKvPairs(pairs[3:4]),
-	})
-}
-
 func TestClassifyAndAppend(t *testing.T) {
 	kvs := lkv.MakeRowFromKvPairs([]common.KvPair{
 		{
br/pkg/lightning/backend/tidb/BUILD.bazel (+1 −1)

@@ -37,7 +37,7 @@ go_test(
     timeout = "short",
     srcs = ["tidb_test.go"],
     flaky = True,
-    shard_count = 14,
+    shard_count = 15,
    deps = [
         ":tidb",
         "//br/pkg/lightning/backend",
br/pkg/lightning/backend/tidb/tidb.go (+19 −21)

@@ -272,6 +272,11 @@ type tidbBackend struct {
 	// view should be the same.
 	onDuplicate string
 	errorMgr    *errormanager.ErrorManager
+	// maxChunkSize and maxChunkRows are the target size and number of rows of each INSERT SQL
+	// statement to be sent to downstream. Sometimes we want to reduce the txn size to avoid
+	// affecting the cluster too much.
+	maxChunkSize uint64
+	maxChunkRows int
 }
 
 var _ backend.Backend = (*tidbBackend)(nil)

@@ -283,9 +288,10 @@ var _ backend.Backend = (*tidbBackend)(nil)
 func NewTiDBBackend(
 	ctx context.Context,
 	db *sql.DB,
-	conflict config.Conflict,
+	cfg *config.Config,
 	errorMgr *errormanager.ErrorManager,
 ) backend.Backend {
+	conflict := cfg.Conflict
 	var onDuplicate string
 	switch conflict.Strategy {
 	case config.ErrorOnDup:

@@ -305,10 +311,12 @@ func NewTiDBBackend(
 		onDuplicate = config.ErrorOnDup
 	}
 	return &tidbBackend{
-		db:          db,
-		conflictCfg: conflict,
-		onDuplicate: onDuplicate,
-		errorMgr:    errorMgr,
+		db:           db,
+		conflictCfg:  conflict,
+		onDuplicate:  onDuplicate,
+		errorMgr:     errorMgr,
+		maxChunkSize: uint64(cfg.TikvImporter.LogicalImportBatchSize),
+		maxChunkRows: cfg.TikvImporter.LogicalImportBatchRows,
 	}
 }

@@ -329,18 +337,17 @@ func (row tidbRow) ClassifyAndAppend(data *encode.Rows, checksum *verification.K
 	checksum.Add(&cs)
 }
 
-func (rows tidbRows) SplitIntoChunks(splitSizeInt int) []encode.Rows {
+func (rows tidbRows) splitIntoChunks(splitSize uint64, splitRows int) []tidbRows {
 	if len(rows) == 0 {
 		return nil
 	}
 
-	res := make([]encode.Rows, 0, 1)
+	res := make([]tidbRows, 0, 1)
 	i := 0
 	cumSize := uint64(0)
-	splitSize := uint64(splitSizeInt)
 
 	for j, row := range rows {
-		if i < j && cumSize+row.Size() > splitSize {
+		if i < j && (cumSize+row.Size() > splitSize || j-i >= splitRows) {
 			res = append(res, rows[i:j])
 			i = j
 			cumSize = 0

@@ -581,13 +588,6 @@ func (*tidbBackend) RetryImportDelay() time.Duration {
 	return 0
 }
 
-func (*tidbBackend) MaxChunkSize() int {
-	failpoint.Inject("FailIfImportedSomeRows", func() {
-		failpoint.Return(1)
-	})
-	return 1048576
-}
-
 func (*tidbBackend) ShouldPostProcess() bool {
 	return true
 }

@@ -611,7 +611,7 @@ func (*tidbBackend) ImportEngine(context.Context, uuid.UUID, int64, int64) error
 func (be *tidbBackend) WriteRows(ctx context.Context, tableName string, columnNames []string, rows encode.Rows) error {
 	var err error
 rowLoop:
-	for _, r := range rows.SplitIntoChunks(be.MaxChunkSize()) {
+	for _, r := range rows.(tidbRows).splitIntoChunks(be.maxChunkSize, be.maxChunkRows) {
 		for i := 0; i < writeRowsMaxRetryTimes; i++ {
 			// Write in the batch mode first.
 			err = be.WriteBatchRowsToDB(ctx, tableName, columnNames, r)

@@ -648,8 +648,7 @@ type stmtTask struct {
 // WriteBatchRowsToDB write rows in batch mode, which will insert multiple rows like this:
 //
 //	insert into t1 values (111), (222), (333), (444);
-func (be *tidbBackend) WriteBatchRowsToDB(ctx context.Context, tableName string, columnNames []string, r encode.Rows) error {
-	rows := r.(tidbRows)
+func (be *tidbBackend) WriteBatchRowsToDB(ctx context.Context, tableName string, columnNames []string, rows tidbRows) error {
 	insertStmt := be.checkAndBuildStmt(rows, tableName, columnNames)
 	if insertStmt == nil {
 		return nil

@@ -682,8 +681,7 @@ func (be *tidbBackend) checkAndBuildStmt(rows tidbRows, tableName string, column
 //	insert into t1 values (444);
 //
 // See more details in br#1366: https://github.com/pingcap/br/issues/1366
-func (be *tidbBackend) WriteRowsToDB(ctx context.Context, tableName string, columnNames []string, r encode.Rows) error {
-	rows := r.(tidbRows)
+func (be *tidbBackend) WriteRowsToDB(ctx context.Context, tableName string, columnNames []string, rows tidbRows) error {
 	insertStmt := be.checkAndBuildStmt(rows, tableName, columnNames)
 	if insertStmt == nil {
 		return nil
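
The heart of the change is the splitting rule above: a chunk is cut as soon as adding the next row would exceed the byte budget, or the current chunk already holds splitRows rows. Below is a self-contained sketch of that rule with simplified stand-ins for the lightning types (the tail flush after the loop is condensed here; the real method operates on tidbRows):

    package main

    import "fmt"

    // row stands in for tidbRow; Size() mirrors the byte accounting of the real code.
    type row string

    func (r row) Size() uint64 { return uint64(len(r)) }

    // splitIntoChunks starts a new chunk when the next row would push the chunk
    // past splitSize bytes, or when the chunk already contains splitRows rows.
    func splitIntoChunks(rows []row, splitSize uint64, splitRows int) [][]row {
        if len(rows) == 0 {
            return nil
        }
        res := make([][]row, 0, 1)
        i := 0
        cumSize := uint64(0)
        for j, r := range rows {
            if i < j && (cumSize+r.Size() > splitSize || j-i >= splitRows) {
                res = append(res, rows[i:j])
                i = j
                cumSize = 0
            }
            cumSize += r.Size()
        }
        return append(res, rows[i:]) // flush the final chunk
    }

    func main() {
        rows := []row{"aaaa", "bbbb", "cccc", "dddd", "eeee", "ffff"}
        fmt.Println(splitIntoChunks(rows, 8, 5))   // size cap wins: [[aaaa bbbb] [cccc dddd] [eeee ffff]]
        fmt.Println(splitIntoChunks(rows, 999, 2)) // row cap wins:  [[aaaa bbbb] [cccc dddd] [eeee ffff]]
    }

Whichever limit trips first ends the chunk, so a stream of small rows is bounded by the row count while a few very wide rows are bounded by the byte size.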

br/pkg/lightning/backend/tidb/tidb_test.go (+71 −19)

@@ -73,7 +73,7 @@ func createMysqlSuite(t *testing.T) *mysqlSuite {
 	cfg.Conflict.Strategy = config.ReplaceOnDup
 	cfg.Conflict.Threshold = math.MaxInt64
 	cfg.Conflict.MaxRecordRows = 100
-	backendObj := tidb.NewTiDBBackend(context.Background(), db, cfg.Conflict, errormanager.New(nil, cfg, log.L()))
+	backendObj := tidb.NewTiDBBackend(context.Background(), db, cfg, errormanager.New(nil, cfg, log.L()))
 	return &mysqlSuite{
 		dbHandle: db,
 		mockDB:   mock,

@@ -166,7 +166,7 @@ func TestWriteRowsIgnoreOnDup(t *testing.T) {
 	cfg.Conflict.Strategy = config.IgnoreOnDup
 	cfg.Conflict.Threshold = math.MaxInt64
 	cfg.Conflict.MaxRecordRows = 0
-	ignoreBackend := tidb.NewTiDBBackend(ctx, s.dbHandle, cfg.Conflict, errormanager.New(nil, cfg, logger))
+	ignoreBackend := tidb.NewTiDBBackend(ctx, s.dbHandle, cfg, errormanager.New(nil, cfg, logger))
 	engine, err := backend.MakeEngineManager(ignoreBackend).OpenEngine(ctx, &backend.EngineConfig{}, "`foo`.`bar`", 1)
 	require.NoError(t, err)

@@ -193,7 +193,7 @@ func TestWriteRowsIgnoreOnDup(t *testing.T) {
 	// test conflict.strategy == ignore and not 0 conflict.max-record-rows will use ErrorOnDup
 
 	cfg.Conflict.MaxRecordRows = 10
-	ignoreBackend = tidb.NewTiDBBackend(ctx, s.dbHandle, cfg.Conflict, errormanager.New(nil, cfg, logger))
+	ignoreBackend = tidb.NewTiDBBackend(ctx, s.dbHandle, cfg, errormanager.New(nil, cfg, logger))
 	engine, err = backend.MakeEngineManager(ignoreBackend).OpenEngine(ctx, &backend.EngineConfig{}, "`foo`.`bar`", 1)
 	require.NoError(t, err)

@@ -246,7 +246,7 @@ func TestWriteRowsErrorOnDup(t *testing.T) {
 	cfg.Conflict.Strategy = config.ErrorOnDup
 	cfg.Conflict.Threshold = math.MaxInt64
 	cfg.Conflict.MaxRecordRows = 0
-	ignoreBackend := tidb.NewTiDBBackend(ctx, s.dbHandle, cfg.Conflict, errormanager.New(nil, cfg, logger))
+	ignoreBackend := tidb.NewTiDBBackend(ctx, s.dbHandle, cfg, errormanager.New(nil, cfg, logger))
 	engine, err := backend.MakeEngineManager(ignoreBackend).OpenEngine(ctx, &backend.EngineConfig{}, "`foo`.`bar`", 1)
 	require.NoError(t, err)

@@ -536,9 +536,7 @@ func TestWriteRowsErrorNoRetry(t *testing.T) {
 	cfg.Conflict.Strategy = config.ErrorOnDup
 	cfg.Conflict.Threshold = 0
 	cfg.Conflict.MaxRecordRows = 0
-	ignoreBackend := tidb.NewTiDBBackend(context.Background(), s.dbHandle, cfg.Conflict,
-		errormanager.New(s.dbHandle, cfg, log.L()),
-	)
+	ignoreBackend := tidb.NewTiDBBackend(context.Background(), s.dbHandle, cfg, errormanager.New(s.dbHandle, cfg, log.L()))
 	encBuilder := tidb.NewEncodingBuilder()
 	dataRows := encodeRowsTiDB(t, encBuilder, s.tbl)
 	ctx := context.Background()

@@ -602,9 +600,7 @@ func TestWriteRowsErrorDowngradingAll(t *testing.T) {
 	cfg.Conflict.MaxRecordRows = 10
 	cfg.App.TaskInfoSchemaName = "tidb_lightning_errors"
 	cfg.App.MaxError.Type = *atomic.NewInt64(10)
-	ignoreBackend := tidb.NewTiDBBackend(context.Background(), s.dbHandle, cfg.Conflict,
-		errormanager.New(s.dbHandle, cfg, log.L()),
-	)
+	ignoreBackend := tidb.NewTiDBBackend(context.Background(), s.dbHandle, cfg, errormanager.New(s.dbHandle, cfg, log.L()))
 	encBuilder := tidb.NewEncodingBuilder()
 	dataRows := encodeRowsTiDB(t, encBuilder, s.tbl)
 	ctx := context.Background()

@@ -657,9 +653,7 @@ func TestWriteRowsErrorDowngradingExceedThreshold(t *testing.T) {
 	cfg.Conflict.MaxRecordRows = 10
 	cfg.App.TaskInfoSchemaName = "tidb_lightning_errors"
 	cfg.App.MaxError.Type = *atomic.NewInt64(3)
-	ignoreBackend := tidb.NewTiDBBackend(context.Background(), s.dbHandle, cfg.Conflict,
-		errormanager.New(s.dbHandle, cfg, log.L()),
-	)
+	ignoreBackend := tidb.NewTiDBBackend(context.Background(), s.dbHandle, cfg, errormanager.New(s.dbHandle, cfg, log.L()))
 	encBuilder := tidb.NewEncodingBuilder()
 	dataRows := encodeRowsTiDB(t, encBuilder, s.tbl)
 	ctx := context.Background()

@@ -699,9 +693,7 @@ func TestWriteRowsRecordOneError(t *testing.T) {
 	cfg.Conflict.Threshold = 0
 	cfg.Conflict.MaxRecordRows = 0
 	cfg.App.TaskInfoSchemaName = "tidb_lightning_errors"
-	ignoreBackend := tidb.NewTiDBBackend(context.Background(), s.dbHandle, cfg.Conflict,
-		errormanager.New(s.dbHandle, cfg, log.L()),
-	)
+	ignoreBackend := tidb.NewTiDBBackend(context.Background(), s.dbHandle, cfg, errormanager.New(s.dbHandle, cfg, log.L()))
 	encBuilder := tidb.NewEncodingBuilder()
 	dataRows := encodeRowsTiDB(t, encBuilder, s.tbl)
 	ctx := context.Background()

@@ -728,9 +720,7 @@ func TestDuplicateThreshold(t *testing.T) {
 	cfg.Conflict.Strategy = config.IgnoreOnDup
 	cfg.Conflict.Threshold = 5
 	cfg.Conflict.MaxRecordRows = 0
-	ignoreBackend := tidb.NewTiDBBackend(context.Background(), s.dbHandle, cfg.Conflict,
-		errormanager.New(s.dbHandle, cfg, log.L()),
-	)
+	ignoreBackend := tidb.NewTiDBBackend(context.Background(), s.dbHandle, cfg, errormanager.New(s.dbHandle, cfg, log.L()))
 	encBuilder := tidb.NewEncodingBuilder()
 	dataRows := encodeRowsTiDB(t, encBuilder, s.tbl)
 	ctx := context.Background()

@@ -851,3 +841,65 @@ func TestEncodeRowForRecord(t *testing.T) {
 	}, []int{0, -1, -1, -1, -1, -1, -1, -1, 1, 2, 3, -1, -1, -1})
 	require.Equal(t, row, "(5, \"test test\", \x00\x00\x00\xab\xcd\xef)")
 }
+
+// TestLogicalImportBatch tests that each INSERT statement is limited by both
+// logical-import-batch-size and logical-import-batch-rows configurations. Here
+// we ensure each INSERT statement has up to 5 rows *and* ~30 bytes of values.
+func TestLogicalImportBatch(t *testing.T) {
+	s := createMysqlSuite(t)
+	defer s.TearDownTest(t)
+
+	s.mockDB.
+		ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(1),(2),(4),(8),(16)\\E").
+		WillReturnResult(sqlmock.NewResult(5, 5))
+	s.mockDB.
+		ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(32),(64),(128),(256),(512)\\E").
+		WillReturnResult(sqlmock.NewResult(5, 5))
+	s.mockDB.
+		ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(1024),(2048),(4096),(8192)\\E").
+		WillReturnResult(sqlmock.NewResult(4, 4))
+	s.mockDB.
+		ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(16384),(32768),(65536),(131072)\\E").
+		WillReturnResult(sqlmock.NewResult(4, 4))
+	s.mockDB.
+		ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(262144)\\E").
+		WillReturnResult(sqlmock.NewResult(1, 1))
+
+	ctx := context.Background()
+	logger := log.L()
+
+	cfg := config.NewConfig()
+	cfg.Conflict.Strategy = config.ErrorOnDup
+	cfg.TikvImporter.LogicalImportBatchSize = 30
+	cfg.TikvImporter.LogicalImportBatchRows = 5
+	ignoreBackend := tidb.NewTiDBBackend(ctx, s.dbHandle, cfg, errormanager.New(nil, cfg, logger))
+	encBuilder := tidb.NewEncodingBuilder()
+	encoder, err := encBuilder.NewEncoder(context.Background(), &encode.EncodingConfig{
+		Path:   "1.csv",
+		Table:  s.tbl,
+		Logger: log.L(),
+	})
+	require.NoError(t, err)
+
+	dataRows := encBuilder.MakeEmptyRows()
+	dataChecksum := verification.MakeKVChecksum(0, 0, 0)
+	indexRows := encBuilder.MakeEmptyRows()
+	indexChecksum := verification.MakeKVChecksum(0, 0, 0)
+	for i := int64(0); i < 19; i++ { // encode rows 1, 2, 4, 8, ..., 262144.
+		row, err := encoder.Encode(
+			[]types.Datum{types.NewIntDatum(1 << i)},
+			i,
+			[]int{0, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1},
+			8*i,
+		)
+		require.NoError(t, err)
+		row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum)
+	}
+
+	engine, err := backend.MakeEngineManager(ignoreBackend).OpenEngine(ctx, &backend.EngineConfig{}, "`foo`.`bar`", 1)
+	require.NoError(t, err)
+	writer, err := engine.LocalWriter(ctx, &backend.LocalWriterConfig{TableName: "`foo`.`bar`"})
+	require.NoError(t, err)
+	err = writer.AppendRows(ctx, []string{"a"}, dataRows)
+	require.NoError(t, err)
+}
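
The expected statements follow directly from the two caps (assuming the byte budget counts only the encoded "(value)" text, which is consistent with the 30-byte limit used here). With logical-import-batch-rows = 5, the first two chunks cut at five rows each: "(1),(2),(4),(8),(16)" is only 16 bytes, well under the size cap. From (1024) onward each encoded value is 6–8 bytes, so logical-import-batch-size = 30 bites first: "(1024),(2048),(4096),(8192)" sums to 24 bytes and a fifth value would push past 30, hence chunks of four rows, with the final (262144) landing in a chunk of its own.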
