
lightning: fix encode kvs size greater than 4.0g caused pebble panic
3pointer committed May 13, 2021
1 parent 7c98ff0 commit 19cd4d1
Showing 4 changed files with 24 additions and 0 deletions.
8 changes: 8 additions & 0 deletions pkg/lightning/backend/kv/sql2kv.go
@@ -408,6 +408,14 @@ func (kvcodec *tableKVEncoder) Encode(
	return kvPairs(pairs), nil
}

func (kvs kvPairs) Size() uint64 {
	size := uint64(0)
	for _, kv := range kvs {
		size += uint64(len(kv.Key) + len(kv.Val))
	}
	return size
}

func (kvs kvPairs) ClassifyAndAppend(
	data *Rows,
	dataChecksum *verification.KVChecksum,
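
For context, a minimal runnable sketch of the byte accounting that the new Size method introduces. The kvPair type below is a hypothetical stand-in for the real pair type in lightning's common package, assumed to carry Key and Val byte slices as the diff implies:

package main

import "fmt"

type kvPair struct {
	Key []byte
	Val []byte
}

type kvPairs []kvPair

// Size sums key and value lengths, so a packet's footprint is its
// total encoded bytes rather than its pair count.
func (kvs kvPairs) Size() uint64 {
	size := uint64(0)
	for _, kv := range kvs {
		size += uint64(len(kv.Key) + len(kv.Val))
	}
	return size
}

func main() {
	pairs := kvPairs{
		{Key: []byte("t1_r1"), Val: []byte("hello")},
		{Key: []byte("t1_r2"), Val: []byte("world!")},
	}
	fmt.Println(pairs.Size()) // prints 21 (5+5 + 5+6)
}
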
3 changes: 3 additions & 0 deletions pkg/lightning/backend/kv/types.go
@@ -35,6 +35,9 @@ type Row interface {
		indices *Rows,
		indexChecksum *verification.KVChecksum,
	)

	// Size returns the total KV size of this Row in bytes.
	Size() uint64
}

// Rows represents a collection of encoded rows.
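
The interface change above is what lets the restore loop stay backend-agnostic: any Row can now report its encoded size. A compilable sketch; the packetSize helper is hypothetical and not part of this commit:

package sketch

// Row mirrors the widened interface; the pre-existing methods are elided.
type Row interface {
	Size() uint64
}

// packetSize shows how a caller can total a packet's bytes without
// knowing the concrete backend row type.
func packetSize(rows []Row) uint64 {
	var total uint64
	for _, r := range rows {
		total += r.Size()
	}
	return total
}
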
4 changes: 4 additions & 0 deletions pkg/lightning/backend/tidb/tidb.go
@@ -94,6 +94,10 @@ func NewTiDBBackend(db *sql.DB, onDuplicate string) backend.Backend {
	return backend.MakeBackend(&tidbBackend{db: db, onDuplicate: onDuplicate})
}

func (row tidbRow) Size() uint64 {
	return uint64(len(row))
}

func (row tidbRow) ClassifyAndAppend(data *kv.Rows, checksum *verification.KVChecksum, _ *kv.Rows, _ *verification.KVChecksum) {
	rows := (*data).(tidbRows)
	// Cannot do `rows := data.(*tidbRows); *rows = append(*rows, row)`.
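
For the TiDB backend a row is already serialized SQL text, so its size is simply its byte length. A tiny runnable illustration, assuming a string underlying type (consistent with len(row) in the diff):

package main

import "fmt"

type tidbRow string

// Size reports the byte length of the encoded SQL row.
func (row tidbRow) Size() uint64 { return uint64(len(row)) }

func main() {
	fmt.Println(tidbRow("(1,'alice')").Size()) // prints 11
}
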
9 changes: 9 additions & 0 deletions pkg/lightning/restore/restore.go
@@ -2601,6 +2601,7 @@ func (cr *chunkRestore) encodeLoop(
	canDeliver := false
	kvPacket := make([]deliveredKVs, 0, maxKvPairsCnt)
	var newOffset, rowID int64
	var kvSize uint64
outLoop:
	for !canDeliver {
		readDurStart := time.Now()
@@ -2636,6 +2637,14 @@
				return
			}
			kvPacket = append(kvPacket, deliveredKVs{kvs: kvs, columns: columnNames, offset: newOffset, rowID: rowID})
			kvSize += kvs.Size()
			// Pebble does not allow more than 4.0 GB of KV data in a single batch.
			// When importing a SQL file whose KV pairs each exceed 4 GB / maxKvPairsCnt,
			// the count-based check alone would let a packet grow past that limit and
			// panic Pebble, so also deliver once the accumulated size crosses the threshold.
			if kvSize > minDeliverBytes {
				canDeliver = true
				kvSize = 0
			}
			if len(kvPacket) >= maxKvPairsCnt || newOffset == cr.chunk.Chunk.EndOffset {
				canDeliver = true
			}
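
Distilled to its essentials, the new delivery condition works like the sketch below: a packet is flushed when either the pre-existing pair-count cap or the new byte threshold is hit. The constants here are placeholders, not the real values from restore.go:

package main

import "fmt"

const (
	maxKvPairsCnt   = 4  // placeholder for the real count cap
	minDeliverBytes = 20 // placeholder for the real byte threshold
)

func main() {
	sizes := []uint64{9, 9, 9, 9, 9} // encoded size of each incoming row
	var packet []uint64
	var kvSize uint64
	for _, s := range sizes {
		packet = append(packet, s)
		kvSize += s
		// Deliver on bytes (the fix) or on count (pre-existing behavior).
		if kvSize > minDeliverBytes || len(packet) >= maxKvPairsCnt {
			fmt.Printf("deliver %d rows, %d bytes\n", len(packet), kvSize)
			packet, kvSize = packet[:0], 0
		}
	}
	if len(packet) > 0 {
		fmt.Printf("deliver final %d rows, %d bytes\n", len(packet), kvSize)
	}
}
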
