lightning: remove the unnecessary offset field in the key encoding for duplicate detection (#29975)
sleepymole authored Dec 28, 2021
1 parent b813b37 commit 7688d37
Showing 10 changed files with 409 additions and 365 deletions.
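
The heart of this commit is a slimmer key-adapter contract: the (rowID, offset) pair that used to be folded into every encoded key shrinks to just rowID, since no caller ever consumed the offset again (both old Decode call sites discard it with blanks). The adapter definition itself lives in one of the files not shown in this view, so the sketch below is inferred from the call sites in the diff; treat the interface name and exact signatures as assumptions.

```go
// Sketch of the key-adapter contract implied by the call sites in this
// commit. Inferred, not copied from the repository; names and signatures
// are assumptions.
type KeyAdapter interface {
	// Encode appends an encoded form of key to buf and returns the result.
	// Before this commit the signature also carried an offset int64.
	Encode(buf []byte, key []byte, rowID int64) []byte
	// Decode recovers the raw key from an encoded one.
	// Before this commit it returned (key, rowID, offset, err).
	Decode(buf []byte, data []byte) ([]byte, error)
	// EncodedLen reports how long key becomes once encoded.
	EncodedLen(key []byte) int
}
```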
1 change: 0 additions & 1 deletion br/pkg/lightning/backend/kv/sql2kv.go
@@ -445,7 +445,6 @@ func (kvcodec *tableKVEncoder) Encode(
     kvPairs := kvcodec.se.takeKvPairs()
     for i := 0; i < len(kvPairs.pairs); i++ {
         kvPairs.pairs[i].RowID = rowID
-        kvPairs.pairs[i].Offset = offset
     }
     kvcodec.recordCache = record[:0]
     return kvPairs, nil
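
This deleted assignment, together with the Offset: entries vanishing from every common.KvPair literal in the test file below, suggests the field was removed from the struct itself (the common package change is among the files not shown here). Presumed shape afterwards, stated as an assumption for reading the rest of the diff:

```go
// Presumed common.KvPair after this commit, inferred from the composite
// literals in sql2kv_test.go; an assumption, not repository source.
type KvPair struct {
	Key   []byte // encoded TiKV key
	Val   []byte // encoded row or index value
	RowID int64  // row ID, still folded into keys for duplicate detection
}
```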
72 changes: 30 additions & 42 deletions br/pkg/lightning/backend/kv/sql2kv_test.go
@@ -108,10 +108,9 @@ func (s *kvSuite) TestEncode(c *C) {
     c.Assert(err, IsNil)
     c.Assert(pairs, DeepEquals, &KvPairs{pairs: []common.KvPair{
         {
-            Key:    []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1},
-            Val:    []uint8{0x8, 0x2, 0x8, 0x2},
-            RowID:  2,
-            Offset: 1234,
+            Key:   []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1},
+            Val:   []uint8{0x8, 0x2, 0x8, 0x2},
+            RowID: 2,
         },
     }})

@@ -136,10 +135,9 @@ func (s *kvSuite) TestEncode(c *C) {
     c.Assert(err, IsNil)
     c.Assert(pairs, DeepEquals, &KvPairs{pairs: []common.KvPair{
         {
-            Key:    []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1},
-            Val:    []uint8{0x8, 0x2, 0x8, 0xfe, 0x1},
-            RowID:  1,
-            Offset: 1234,
+            Key:   []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1},
+            Val:   []uint8{0x8, 0x2, 0x8, 0xfe, 0x1},
+            RowID: 1,
         },
     }})
 }
@@ -262,8 +260,7 @@ func (s *kvSuite) TestEncodeRowFormatV2(c *C) {
                 0x1, 0x0, // not null offsets = [1]
                 0x7f, // column version = 127 (10000000 clamped to TINYINT)
             },
-            RowID:  1,
-            Offset: 1234,
+            RowID: 1,
         },
     }})
 }
@@ -300,10 +297,9 @@ func (s *kvSuite) TestEncodeTimestamp(c *C) {
     c.Assert(err, IsNil)
     c.Assert(pairs, DeepEquals, &KvPairs{pairs: []common.KvPair{
         {
-            Key:    []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x46},
-            Val:    []uint8{0x8, 0x2, 0x9, 0x80, 0x80, 0x80, 0xf0, 0xfd, 0x8e, 0xf7, 0xc0, 0x19},
-            RowID:  70,
-            Offset: 1234,
+            Key:   []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x46},
+            Val:   []uint8{0x8, 0x2, 0x9, 0x80, 0x80, 0x80, 0xf0, 0xfd, 0x8e, 0xf7, 0xc0, 0x19},
+            RowID: 70,
         },
     }})
 }
@@ -328,16 +324,14 @@ func (s *kvSuite) TestEncodeDoubleAutoIncrement(c *C) {
     c.Assert(err, IsNil)
     c.Assert(pairs, DeepEquals, &KvPairs{pairs: []common.KvPair{
         {
-            Key:    []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x46},
-            Val:    []uint8{0x80, 0x0, 0x1, 0x0, 0x0, 0x0, 0x1, 0x8, 0x0, 0xbf, 0xf0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
-            RowID:  70,
-            Offset: 1234,
+            Key:   []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x46},
+            Val:   []uint8{0x80, 0x0, 0x1, 0x0, 0x0, 0x0, 0x1, 0x8, 0x0, 0xbf, 0xf0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
+            RowID: 70,
         },
         {
-            Key:    []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x69, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5, 0xbf, 0xf0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
-            Val:    []uint8{0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x46},
-            RowID:  70,
-            Offset: 1234,
+            Key:   []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x69, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5, 0xbf, 0xf0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
+            Val:   []uint8{0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x46},
+            RowID: 70,
         },
     }})
     c.Assert(tbl.Allocators(encoder.(*tableKVEncoder).se).Get(autoid.AutoIncrementType).Base(), Equals, int64(70))
@@ -372,10 +366,9 @@ func (s *kvSuite) TestDefaultAutoRandoms(c *C) {
     c.Assert(err, IsNil)
     c.Assert(pairs, DeepEquals, &KvPairs{pairs: []common.KvPair{
         {
-            Key:    []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0xf0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x46},
-            Val:    []uint8{0x80, 0x0, 0x1, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0},
-            RowID:  70,
-            Offset: 1234,
+            Key:   []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0xf0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x46},
+            Val:   []uint8{0x80, 0x0, 0x1, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0},
+            RowID: 70,
         },
     }})
     c.Assert(tbl.Allocators(encoder.(*tableKVEncoder).se).Get(autoid.AutoRandomType).Base(), Equals, int64(70))
@@ -384,10 +377,9 @@ func (s *kvSuite) TestDefaultAutoRandoms(c *C) {
     c.Assert(err, IsNil)
     c.Assert(pairs, DeepEquals, &KvPairs{pairs: []common.KvPair{
         {
-            Key:    []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0xf0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x47},
-            Val:    []uint8{0x80, 0x0, 0x1, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0},
-            RowID:  71,
-            Offset: 1234,
+            Key:   []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0xf0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x47},
+            Val:   []uint8{0x80, 0x0, 0x1, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0},
+            RowID: 71,
         },
     }})
     c.Assert(tbl.Allocators(encoder.(*tableKVEncoder).se).Get(autoid.AutoRandomType).Base(), Equals, int64(71))
@@ -424,24 +416,20 @@ func (s *kvSuite) TestShardRowId(c *C) {
 func (s *kvSuite) TestSplitIntoChunks(c *C) {
     pairs := []common.KvPair{
         {
-            Key:    []byte{1, 2, 3},
-            Val:    []byte{4, 5, 6},
-            Offset: 1000,
+            Key: []byte{1, 2, 3},
+            Val: []byte{4, 5, 6},
         },
         {
-            Key:    []byte{7, 8},
-            Val:    []byte{9, 0},
-            Offset: 2000,
+            Key: []byte{7, 8},
+            Val: []byte{9, 0},
         },
         {
-            Key:    []byte{1, 2, 3, 4},
-            Val:    []byte{5, 6, 7, 8},
-            Offset: 3000,
+            Key: []byte{1, 2, 3, 4},
+            Val: []byte{5, 6, 7, 8},
         },
         {
-            Key:    []byte{9, 0},
-            Val:    []byte{1, 2},
-            Offset: 4000,
+            Key: []byte{9, 0},
+            Val: []byte{1, 2},
         },
     }

6 changes: 3 additions & 3 deletions br/pkg/lightning/backend/local/duplicate.go
@@ -191,7 +191,7 @@ func NewDuplicateManager(local *local, ts uint64, opts *kv.SessionOptions) (*DuplicateManager, error) {
         regionConcurrency: local.tcpConcurrency,
         splitCli:          local.splitCli,
         tikvCli:           local.tikvCli,
-        keyAdapter:        duplicateKeyAdapter{},
+        keyAdapter:        dupDetectKeyAdapter{},
         ts:                ts,
         connPool:          common.NewGRPCConns(),
         // TODO: not sure what is the correct concurrency value.
@@ -495,7 +495,7 @@ func (manager *DuplicateManager) CollectDuplicateRowsFromLocalIndex(

     for iter.First(); iter.Valid(); iter.Next() {
         hasDataConflict = true
-        rawKey, _, _, err := manager.keyAdapter.Decode(nil, iter.Key())
+        rawKey, err := manager.keyAdapter.Decode(nil, iter.Key())
         if err != nil {
             return err
         }
@@ -570,7 +570,7 @@ func (manager *DuplicateManager) CollectDuplicateRowsFromLocalIndex(

     for iter.First(); iter.Valid(); iter.Next() {
         hasDataConflict = true
-        rawKey, _, _, err := manager.keyAdapter.Decode(nil, iter.Key())
+        rawKey, err := manager.keyAdapter.Decode(nil, iter.Key())
         if err != nil {
             indexLogger.Error(
                 "[detect-dupe] decode key error when query handle for duplicate index",
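
The rename from duplicateKeyAdapter to dupDetectKeyAdapter and the two-value Decode reflect what the encoding now carries: only a row-ID suffix behind the raw key. The toy round trip below illustrates the idea under explicit simplifications: the real adapter escapes the key bytes (memcomparable encoding) so the key/suffix boundary survives arbitrary key content, whereas this sketch simply assumes a fixed 8-byte tail. encodeDupKey/decodeDupKey are illustrative names, not repository functions.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
)

// encodeDupKey appends an 8-byte big-endian row-ID suffix: two rows that
// produce the same raw key stay distinct in the local pebble DB, yet sort
// next to each other, which is what duplicate detection relies on.
func encodeDupKey(buf, key []byte, rowID int64) []byte {
	buf = append(buf, key...)
	var suffix [8]byte
	binary.BigEndian.PutUint64(suffix[:], uint64(rowID))
	return append(buf, suffix[:]...)
}

// decodeDupKey drops the suffix again, mirroring the two-value Decode
// signature used in duplicate.go above.
func decodeDupKey(data []byte) ([]byte, error) {
	if len(data) < 8 {
		return nil, errors.New("encoded key too short")
	}
	return data[:len(data)-8], nil
}

func main() {
	a := encodeDupKey(nil, []byte("uk-1"), 70)
	b := encodeDupKey(nil, []byte("uk-1"), 71)
	fmt.Println(bytes.Equal(a, b)) // false: the suffix keeps duplicates distinct
	raw, err := decodeDupKey(a)
	fmt.Println(string(raw), err) // uk-1 <nil>
}
```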
75 changes: 49 additions & 26 deletions br/pkg/lightning/backend/local/engine.go
@@ -33,6 +33,7 @@ import (
     "github.com/google/btree"
     "github.com/google/uuid"
     "github.com/pingcap/errors"
+    pkgkv "github.com/pingcap/tidb/br/pkg/kv"
     "github.com/pingcap/tidb/br/pkg/lightning/backend"
     "github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
     "github.com/pingcap/tidb/br/pkg/lightning/checkpoints"
@@ -423,7 +424,7 @@ func (e *Engine) getSizeProperties() (*sizeProperties, error) {
     newRangeProps := make(rangeProperties, 0, len(rangeProps))
     for _, p := range rangeProps {
         if !bytes.Equal(p.Key, engineMetaKey) {
-            p.Key, _, _, err = e.keyAdapter.Decode(nil, p.Key)
+            p.Key, err = e.keyAdapter.Decode(nil, p.Key)
             if err != nil {
                 log.L().Warn(
                     "decodeRangeProperties failed because the props key is invalid",
@@ -965,6 +966,22 @@ func (e *Engine) unfinishedRanges(ranges []Range) []Range {
     return filterOverlapRange(ranges, e.finishedRanges.ranges)
 }

+func (e *Engine) newKVIter(ctx context.Context, opts *pebble.IterOptions) pkgkv.Iter {
+    if bytes.Compare(opts.LowerBound, normalIterStartKey) < 0 {
+        newOpts := *opts
+        newOpts.LowerBound = normalIterStartKey
+        opts = &newOpts
+    }
+    if !e.duplicateDetection {
+        return pebbleIter{Iterator: e.db.NewIter(opts)}
+    }
+    logger := log.With(
+        zap.String("table", common.UniqueTable(e.tableInfo.DB, e.tableInfo.Name)),
+        zap.Int64("tableID", e.tableInfo.ID),
+        zap.Stringer("engineUUID", e.UUID))
+    return newDupDetectIter(ctx, e.db, e.keyAdapter, opts, e.duplicateDB, logger)
+}
+
 type sstMeta struct {
     path   string
     minKey []byte
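
The new newKVIter gives every reader of an engine one decision point: a plain pebble iterator when duplicate detection is off, otherwise a wrapper that runs keys through the key adapter and records conflicting entries into duplicateDB as it advances. A hypothetical caller might look like the sketch below; First/Valid/Next/Key are taken from the loops in duplicate.go, while Value, Error, and Close on pkgkv.Iter are assumptions.

```go
// Hypothetical consumer inside package local; not part of this commit.
func scanEngine(ctx context.Context, e *Engine) error {
	iter := e.newKVIter(ctx, &pebble.IterOptions{})
	defer iter.Close()
	for iter.First(); iter.Valid(); iter.Next() {
		// With duplicate detection on, the wrapper is expected to surface
		// decoded (raw) keys and to have spilled conflicts to duplicateDB.
		_ = iter.Key()
		_ = iter.Value()
	}
	return iter.Error()
}
```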
@@ -992,11 +1009,10 @@ type Writer struct {
     // if the kvs in writeBatch are in order, we can avoid doing a `sort.Slice` which
     // is quite slow. in our bench, the sort operation eats about 5% of total CPU
     isWriteBatchSorted bool
+    sortedKeyBuf       []byte

     batchCount int
     batchSize  int64
-    totalSize  int64
-    totalCount int64

     lastMetaSeq int32
 }
@@ -1008,25 +1024,32 @@ func (w *Writer) appendRowsSorted(kvs []common.KvPair) error {
             return errors.Trace(err)
         }
         w.writer = writer
-        w.writer.minKey = append([]byte{}, kvs[0].Key...)
     }

-    totalKeyLen := 0
+    keyAdapter := w.engine.keyAdapter
+    totalKeySize := 0
     for i := 0; i < len(kvs); i++ {
-        totalKeyLen += w.engine.keyAdapter.EncodedLen(kvs[i].Key)
-    }
-    buf := make([]byte, totalKeyLen)
-    encodedKvs := make([]common.KvPair, len(kvs))
-    for i := 0; i < len(kvs); i++ {
-        encodedKey := w.engine.keyAdapter.Encode(buf, kvs[i].Key, kvs[i].RowID, kvs[i].Offset)
-        buf = buf[len(encodedKey):]
-        encodedKvs[i] = common.KvPair{Key: encodedKey, Val: kvs[i].Val}
-        w.batchSize += int64(len(encodedKvs[i].Key) + len(encodedKvs[i].Val))
+        keySize := keyAdapter.EncodedLen(kvs[i].Key)
+        w.batchSize += int64(keySize + len(kvs[i].Val))
+        totalKeySize += keySize
     }
+    w.batchCount += len(kvs)
+    // noopKeyAdapter doesn't really change the key,
+    // skipping the encoding to avoid unnecessary alloc and copy.
+    if _, ok := keyAdapter.(noopKeyAdapter); !ok {
+        if cap(w.sortedKeyBuf) < totalKeySize {
+            w.sortedKeyBuf = make([]byte, totalKeySize)
+        }
+        buf := w.sortedKeyBuf[:0]
+        newKvs := make([]common.KvPair, len(kvs))
+        for i := 0; i < len(kvs); i++ {
+            buf = keyAdapter.Encode(buf, kvs[i].Key, kvs[i].RowID)
+            newKvs[i] = common.KvPair{Key: buf, Val: kvs[i].Val}
+            buf = buf[len(buf):]
+        }
+        kvs = newKvs
+    }

-    w.batchCount += len(encodedKvs)
-    w.totalCount += int64(len(encodedKvs))
-    return w.writer.writeKVs(encodedKvs)
+    return w.writer.writeKVs(kvs)
 }

 func (w *Writer) appendRowsUnsorted(ctx context.Context, kvs []common.KvPair) error {
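
appendRowsSorted now does its size bookkeeping unconditionally but encodes keys only when the adapter actually rewrites them, and it reuses sortedKeyBuf across batches instead of allocating a fresh buffer on every call. The noopKeyAdapter named in the skip is defined outside this view; a presumed shape, inferred purely from how it is used here:

```go
// Presumed no-op adapter (an assumption; the definition is in a file not
// shown in this view): keys pass through unchanged, so encoding them would
// only produce byte-for-byte copies.
type noopKeyAdapter struct{}

func (noopKeyAdapter) Encode(buf []byte, key []byte, _ int64) []byte { return append(buf, key...) }
func (noopKeyAdapter) Decode(_ []byte, data []byte) ([]byte, error)  { return data, nil }
func (noopKeyAdapter) EncodedLen(key []byte) int                     { return len(key) }
```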
@@ -1036,14 +1059,15 @@ func (w *Writer) appendRowsUnsorted(ctx context.Context, kvs []common.KvPair) error {
     if cnt > 0 {
         lastKey = w.writeBatch[cnt-1].Key
     }
+    keyAdapter := w.engine.keyAdapter
     for _, pair := range kvs {
         if w.isWriteBatchSorted && bytes.Compare(lastKey, pair.Key) > 0 {
             w.isWriteBatchSorted = false
         }
         lastKey = pair.Key
         w.batchSize += int64(len(pair.Key) + len(pair.Val))
-        buf := w.kvBuffer.AllocBytes(w.engine.keyAdapter.EncodedLen(pair.Key))
-        key := w.engine.keyAdapter.Encode(buf, pair.Key, pair.RowID, pair.Offset)
+        buf := w.kvBuffer.AllocBytes(keyAdapter.EncodedLen(pair.Key))
+        key := keyAdapter.Encode(buf[:0], pair.Key, pair.RowID)
         val := w.kvBuffer.AddBytes(pair.Val)
         if cnt < l {
             w.writeBatch[cnt].Key = key
@@ -1060,7 +1084,6 @@ func (w *Writer) appendRowsUnsorted(ctx context.Context, kvs []common.KvPair) error {
             return err
         }
     }
-    w.totalCount += int64(len(kvs))
     return nil
 }

@@ -1099,7 +1122,6 @@ func (w *Writer) flush(ctx context.Context) error {
         return nil
     }

-    w.totalSize += w.batchSize
     if len(w.writeBatch) > 0 {
         if err := w.flushKVs(ctx); err != nil {
             return errors.Trace(err)
@@ -1157,7 +1179,6 @@ func (w *Writer) flushKVs(ctx context.Context) error {
         w.isWriteBatchSorted = true
     }

-    writer.minKey = append(writer.minKey[:0], w.writeBatch[0].Key...)
     err = writer.writeKVs(w.writeBatch[:w.batchCount])
     if err != nil {
         return errors.Trace(err)
Expand All @@ -1171,7 +1192,6 @@ func (w *Writer) flushKVs(ctx context.Context) error {
return errors.Trace(err)
}

w.totalSize += w.batchSize
w.batchSize = 0
w.batchCount = 0
w.kvBuffer.Reset()
@@ -1222,7 +1242,9 @@ func (sw *sstWriter) writeKVs(kvs []common.KvPair) error {
     if len(kvs) == 0 {
         return nil
     }
-
+    if len(sw.minKey) == 0 {
+        sw.minKey = append([]byte{}, kvs[0].Key...)
+    }
     if bytes.Compare(kvs[0].Key, sw.maxKey) <= 0 {
         return errorUnorderedSSTInsertion
     }
@@ -1241,9 +1263,10 @@ func (sw *sstWriter) writeKVs(kvs []common.KvPair) error {
             return errors.Trace(err)
         }
         sw.totalSize += int64(len(p.Key)) + int64(len(p.Val))
+        lastKey = p.Key
     }
     sw.totalCount += int64(len(kvs))
-    sw.maxKey = append(sw.maxKey[:0], kvs[len(kvs)-1].Key...)
+    sw.maxKey = append(sw.maxKey[:0], lastKey...)
     return nil
 }

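Net effect in the writer path: minKey is now captured lazily on the first writeKVs call instead of being set from appendRowsSorted and flushKVs, maxKey is updated from the lastKey recorded inside the write loop rather than by re-indexing the input slice, and the per-Writer totalSize/totalCount counters are deleted outright; sstWriter's own totalSize/totalCount, visible in the context lines above, remain the single source of that accounting.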
[The remaining six changed files in this commit are not shown in this view.]
