From 19cd4d10c46bd851dd2292388adb04966d7b47d0 Mon Sep 17 00:00:00 2001 From: luancheng Date: Thu, 13 May 2021 18:01:14 +0800 Subject: [PATCH 1/7] lightning: fix encode kvs size greater than 4.0g caused pebble panic --- pkg/lightning/backend/kv/sql2kv.go | 8 ++++++++ pkg/lightning/backend/kv/types.go | 3 +++ pkg/lightning/backend/tidb/tidb.go | 4 ++++ pkg/lightning/restore/restore.go | 9 +++++++++ 4 files changed, 24 insertions(+) diff --git a/pkg/lightning/backend/kv/sql2kv.go b/pkg/lightning/backend/kv/sql2kv.go index b3268101e..7e188bb90 100644 --- a/pkg/lightning/backend/kv/sql2kv.go +++ b/pkg/lightning/backend/kv/sql2kv.go @@ -408,6 +408,14 @@ func (kvcodec *tableKVEncoder) Encode( return kvPairs(pairs), nil } +func (kvs kvPairs) Size() uint64 { + size := uint64(0) + for _, kv := range kvs { + size += uint64(len(kv.Key) + len(kv.Val)) + } + return size +} + func (kvs kvPairs) ClassifyAndAppend( data *Rows, dataChecksum *verification.KVChecksum, diff --git a/pkg/lightning/backend/kv/types.go b/pkg/lightning/backend/kv/types.go index 299f4a8cb..4ebf65f90 100644 --- a/pkg/lightning/backend/kv/types.go +++ b/pkg/lightning/backend/kv/types.go @@ -35,6 +35,9 @@ type Row interface { indices *Rows, indexChecksum *verification.KVChecksum, ) + + // Size represents the total kv size of this Row. + Size() uint64 } // Rows represents a collection of encoded rows. 
diff --git a/pkg/lightning/backend/tidb/tidb.go b/pkg/lightning/backend/tidb/tidb.go index e4513b5c2..10818e707 100644 --- a/pkg/lightning/backend/tidb/tidb.go +++ b/pkg/lightning/backend/tidb/tidb.go @@ -94,6 +94,10 @@ func NewTiDBBackend(db *sql.DB, onDuplicate string) backend.Backend { return backend.MakeBackend(&tidbBackend{db: db, onDuplicate: onDuplicate}) } +func (row tidbRow) Size() uint64 { + return uint64(len(row)) +} + func (row tidbRow) ClassifyAndAppend(data *kv.Rows, checksum *verification.KVChecksum, _ *kv.Rows, _ *verification.KVChecksum) { rows := (*data).(tidbRows) // Cannot do `rows := data.(*tidbRows); *rows = append(*rows, row)`. diff --git a/pkg/lightning/restore/restore.go b/pkg/lightning/restore/restore.go index 06057ea22..fb02077d2 100644 --- a/pkg/lightning/restore/restore.go +++ b/pkg/lightning/restore/restore.go @@ -2601,6 +2601,7 @@ func (cr *chunkRestore) encodeLoop( canDeliver := false kvPacket := make([]deliveredKVs, 0, maxKvPairsCnt) var newOffset, rowID int64 + var kvSize uint64 outLoop: for !canDeliver { readDurStart := time.Now() @@ -2636,6 +2637,14 @@ func (cr *chunkRestore) encodeLoop( return } kvPacket = append(kvPacket, deliveredKVs{kvs: kvs, columns: columnNames, offset: newOffset, rowID: rowID}) + kvSize += kvs.Size() + // pebble cannot allow > 4.0G kv in one batch. + // we will meet pebble panic when import sql file and each kv has the size larger than 4G / maxKvPairsCnt. + // so add this check. 
+ if kvSize > minDeliverBytes { + canDeliver = true + kvSize = 0 + } if len(kvPacket) >= maxKvPairsCnt || newOffset == cr.chunk.Chunk.EndOffset { canDeliver = true } From c36436f109636c3429f732317467691e14c6e9c3 Mon Sep 17 00:00:00 2001 From: luancheng Date: Thu, 13 May 2021 18:20:55 +0800 Subject: [PATCH 2/7] fix build --- pkg/lightning/backend/noop/noop.go | 4 +++ pkg/lightning/restore/restore.go | 3 ++ pkg/lightning/restore/restore_test.go | 52 +++++++++++++++++++++++++++ 3 files changed, 59 insertions(+) diff --git a/pkg/lightning/backend/noop/noop.go b/pkg/lightning/backend/noop/noop.go index 03f1fb297..930214686 100644 --- a/pkg/lightning/backend/noop/noop.go +++ b/pkg/lightning/backend/noop/noop.go @@ -154,6 +154,10 @@ func (e noopEncoder) Encode(log.Logger, []types.Datum, int64, []int) (kv.Row, er type noopRow struct{} +func (r noopRow) Size() uint64 { + return 0 +} + func (r noopRow) ClassifyAndAppend(*kv.Rows, *verification.KVChecksum, *kv.Rows, *verification.KVChecksum) { } diff --git a/pkg/lightning/restore/restore.go b/pkg/lightning/restore/restore.go index fb02077d2..613a281ba 100644 --- a/pkg/lightning/restore/restore.go +++ b/pkg/lightning/restore/restore.go @@ -2638,6 +2638,9 @@ func (cr *chunkRestore) encodeLoop( } kvPacket = append(kvPacket, deliveredKVs{kvs: kvs, columns: columnNames, offset: newOffset, rowID: rowID}) kvSize += kvs.Size() + failpoint.Inject("mock-kv-size", func(val failpoint.Value) { + kvSize += uint64(val.(int)) + }) // pebble cannot allow > 4.0G kv in one batch. // we will meet pebble panic when import sql file and each kv has the size larger than 4G / maxKvPairsCnt. // so add this check. 
diff --git a/pkg/lightning/restore/restore_test.go b/pkg/lightning/restore/restore_test.go index 46df2b881..3da0e48d5 100644 --- a/pkg/lightning/restore/restore_test.go +++ b/pkg/lightning/restore/restore_test.go @@ -1150,6 +1150,58 @@ func (s *chunkRestoreSuite) TestEncodeLoopForcedError(c *C) { c.Assert(kvsCh, HasLen, 0) } +func (s *chunkRestoreSuite) TestEncodeLoopDeliverLimit(c *C) { + ctx := context.Background() + kvsCh := make(chan []deliveredKVs, 100) + deliverCompleteCh := make(chan deliverResult) + kvEncoder, err := kv.NewTableKVEncoder(s.tr.encTable, &kv.SessionOptions{ + SQLMode: s.cfg.TiDB.SQLMode, + Timestamp: 1234567898, + }) + c.Assert(err, IsNil) + + dir := c.MkDir() + fileName := "db.limit.000.csv" + err = ioutil.WriteFile(filepath.Join(dir, fileName), []byte("1,2,3\r\n4,5,6\r\n7,8,9\r"), 0o644) + c.Assert(err, IsNil) + + store, err := storage.NewLocalStorage(dir) + c.Assert(err, IsNil) + cfg := config.NewConfig() + + reader, err := store.Open(ctx, fileName) + c.Assert(err, IsNil) + w := worker.NewPool(ctx, 1, "io") + p := mydump.NewCSVParser(&cfg.Mydumper.CSV, reader, 111, w, false) + s.cr.parser = p + + rc := &Controller{pauser: DeliverPauser, cfg: cfg} + c.Assert(failpoint.Enable( + "github.com/pingcap/br/pkg/lightning/restore/mock-kv-size", "return(110000000)"), IsNil) + _, _, err = s.cr.encodeLoop(ctx, kvsCh, s.tr, s.tr.logger, kvEncoder, deliverCompleteCh, rc) + + // we have 3 kvs total. after the failpoint injected. + // we will send one kv each time. 
+ count := 0 + for { + kvs, ok := <-kvsCh + if !ok { + break + } + count += 1 + if count <= 3 { + c.Assert(kvs, HasLen, 1) + } + if count == 4 { + c.Assert(kvs, HasLen, 0) + break + } + } + // we will send empty kvs before encodeLoop exits + // so, we can receive 4 batches and 1 is empty + c.Assert(count, Equals, 4) +} + func (s *chunkRestoreSuite) TestEncodeLoopDeliverErrored(c *C) { ctx := context.Background() kvsCh := make(chan []deliveredKVs) From ce419b95e3d83dfb32fb1ba35e87a512175793c5 Mon Sep 17 00:00:00 2001 From: luancheng Date: Fri, 14 May 2021 17:27:02 +0800 Subject: [PATCH 3/7] update test --- pkg/lightning/restore/restore_test.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/pkg/lightning/restore/restore_test.go b/pkg/lightning/restore/restore_test.go index 3da0e48d5..c1730654e 100644 --- a/pkg/lightning/restore/restore_test.go +++ b/pkg/lightning/restore/restore_test.go @@ -1152,7 +1152,7 @@ func (s *chunkRestoreSuite) TestEncodeLoopForcedError(c *C) { func (s *chunkRestoreSuite) TestEncodeLoopDeliverLimit(c *C) { ctx := context.Background() - kvsCh := make(chan []deliveredKVs, 100) + kvsCh := make(chan []deliveredKVs, 4) deliverCompleteCh := make(chan deliverResult) kvEncoder, err := kv.NewTableKVEncoder(s.tr.encTable, &kv.SessionOptions{ SQLMode: s.cfg.TiDB.SQLMode, @@ -1193,13 +1193,12 @@ func (s *chunkRestoreSuite) TestEncodeLoopDeliverLimit(c *C) { c.Assert(kvs, HasLen, 1) } if count == 4 { + // we will send empty kvs before encodeLoop exits + // so, we can receive 4 batches and 1 is empty c.Assert(kvs, HasLen, 0) break } } - // we will send empty kvs before encodeLoop exits - // so, we can receive 4 batches and 1 is empty - c.Assert(count, Equals, 4) } func (s *chunkRestoreSuite) TestEncodeLoopDeliverErrored(c *C) { From 023d2a2ecf4111790d96cd5297c541d4819fd0c8 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Mon, 17 May 2021 18:08:12 +0800 Subject: [PATCH 4/7] Update pkg/lightning/restore/restore.go Co-authored-by: glorv 
--- pkg/lightning/restore/restore.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/lightning/restore/restore.go b/pkg/lightning/restore/restore.go index 613a281ba..68471c5ae 100644 --- a/pkg/lightning/restore/restore.go +++ b/pkg/lightning/restore/restore.go @@ -2644,7 +2644,10 @@ func (cr *chunkRestore) encodeLoop( // pebble cannot allow > 4.0G kv in one batch. // we will meet pebble panic when import sql file and each kv has the size larger than 4G / maxKvPairsCnt. // so add this check. - if kvSize > minDeliverBytes { + if kvSize >= minDeliverBytes || len(kvPacket) >= maxKvPairsCnt || newOffset == cr.chunk.Chunk.EndOffset { + canDeliver = true + kvSize = 0 + } canDeliver = true kvSize = 0 } From ec770d49484e72dfca8823c741e3bb22afa31470 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Thu, 20 May 2021 11:57:44 +0800 Subject: [PATCH 5/7] Update restore.go --- pkg/lightning/restore/restore.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pkg/lightning/restore/restore.go b/pkg/lightning/restore/restore.go index 68471c5ae..213d05ee9 100644 --- a/pkg/lightning/restore/restore.go +++ b/pkg/lightning/restore/restore.go @@ -2648,9 +2648,7 @@ func (cr *chunkRestore) encodeLoop( canDeliver = true kvSize = 0 } - canDeliver = true - kvSize = 0 - } + if len(kvPacket) >= maxKvPairsCnt || newOffset == cr.chunk.Chunk.EndOffset { canDeliver = true } From 846569b0af5856565d576b6229b16813a23d4693 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Thu, 20 May 2021 11:58:47 +0800 Subject: [PATCH 6/7] Update restore.go --- pkg/lightning/restore/restore.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/pkg/lightning/restore/restore.go b/pkg/lightning/restore/restore.go index 213d05ee9..5304fc319 100644 --- a/pkg/lightning/restore/restore.go +++ b/pkg/lightning/restore/restore.go @@ -2648,10 +2648,6 @@ func (cr *chunkRestore) encodeLoop( canDeliver = true kvSize = 0 } - - if len(kvPacket) >= maxKvPairsCnt || newOffset == 
cr.chunk.Chunk.EndOffset { - canDeliver = true - } } encodeTotalDur += encodeDur metric.RowEncodeSecondsHistogram.Observe(encodeDur.Seconds()) From 75b8c035cfde279d2ecefd8b6abbecb26b7fbec0 Mon Sep 17 00:00:00 2001 From: luancheng Date: Thu, 20 May 2021 13:25:32 +0800 Subject: [PATCH 7/7] fix ci --- tests/lightning_checkpoint_parquet/run.sh | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/lightning_checkpoint_parquet/run.sh b/tests/lightning_checkpoint_parquet/run.sh index 9f1004095..31666bd3b 100755 --- a/tests/lightning_checkpoint_parquet/run.sh +++ b/tests/lightning_checkpoint_parquet/run.sh @@ -41,9 +41,9 @@ set +e run_lightning -d "$DBPATH" --backend tidb --enable-checkpoint=1 2> /dev/null set -e run_sql 'SELECT count(*), sum(iVal) FROM `cppq_tsr`.tbl' -check_contains "count(*): 32" -# sum(0..31) -check_contains "sum(iVal): 496" +check_contains "count(*): 1" +# sum(0) +check_contains "sum(iVal): 0" # check chunk offset and update checkpoint current row id to a higher value so that # if parse read from start, the generated rows will be different