diff --git a/br/pkg/lightning/backend/external/BUILD.bazel b/br/pkg/lightning/backend/external/BUILD.bazel
index fe3cfa189d067..acb4e01979268 100644
--- a/br/pkg/lightning/backend/external/BUILD.bazel
+++ b/br/pkg/lightning/backend/external/BUILD.bazel
@@ -19,7 +19,9 @@ go_library(
         "//br/pkg/lightning/backend",
         "//br/pkg/lightning/backend/encode",
         "//br/pkg/lightning/backend/kv",
+        "//br/pkg/lightning/backend/local",
         "//br/pkg/lightning/common",
+        "//br/pkg/lightning/log",
         "//br/pkg/membuf",
         "//br/pkg/storage",
         "//kv",
@@ -27,6 +29,7 @@ go_library(
         "//util/logutil",
         "//util/mathutil",
         "//util/size",
+        "@com_github_cockroachdb_pebble//:pebble",
         "@com_github_pingcap_errors//:errors",
         "@org_golang_x_sync//errgroup",
         "@org_uber_go_zap//:zap",
@@ -47,11 +50,14 @@ go_test(
     ],
     embed = [":external"],
     flaky = True,
-    shard_count = 19,
+    shard_count = 20,
    deps = [
         "//br/pkg/lightning/backend/kv",
+        "//br/pkg/lightning/backend/local",
         "//br/pkg/lightning/common",
         "//br/pkg/storage",
+        "//util/codec",
+        "@com_github_cockroachdb_pebble//:pebble",
         "@com_github_pingcap_errors//:errors",
         "@com_github_stretchr_testify//require",
         "@org_golang_x_exp//rand",
diff --git a/br/pkg/lightning/backend/external/engine.go b/br/pkg/lightning/backend/external/engine.go
index 357978f237601..4bf84530e8807 100644
--- a/br/pkg/lightning/backend/external/engine.go
+++ b/br/pkg/lightning/backend/external/engine.go
@@ -15,10 +15,15 @@ package external
 
 import (
+	"bytes"
 	"context"
 	"encoding/hex"
+	"sort"
 
+	"github.com/cockroachdb/pebble"
 	"github.com/pingcap/errors"
+	"github.com/pingcap/tidb/br/pkg/lightning/backend/local"
+	"github.com/pingcap/tidb/br/pkg/lightning/log"
 	"github.com/pingcap/tidb/br/pkg/storage"
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/util/logutil"
@@ -58,3 +63,200 @@ func (e *Engine) createMergeIter(ctx context.Context, start kv.Key) (*MergeKVIte
 	}
 	return iter, nil
 }
+
+// MemoryIngestData is the in-memory implementation of IngestData.
+type MemoryIngestData struct {
+	keyAdapter         local.KeyAdapter
+	duplicateDetection bool
+	duplicateDB        *pebble.DB
+	dupDetectOpt       local.DupDetectOpt
+
+	keys   [][]byte
+	values [][]byte
+	ts     uint64
+}
+
+var _ local.IngestData = (*MemoryIngestData)(nil)
+
+func (m *MemoryIngestData) firstAndLastKeyIndex(lowerBound, upperBound []byte) (int, int) {
+	firstKeyIdx := 0
+	if len(lowerBound) > 0 {
+		lowerBound = m.keyAdapter.Encode(nil, lowerBound, local.MinRowID)
+		firstKeyIdx = sort.Search(len(m.keys), func(i int) bool {
+			return bytes.Compare(lowerBound, m.keys[i]) <= 0
+		})
+		if firstKeyIdx == len(m.keys) {
+			return -1, -1
+		}
+	}
+
+	lastKeyIdx := len(m.keys) - 1
+	if len(upperBound) > 0 {
+		upperBound = m.keyAdapter.Encode(nil, upperBound, local.MinRowID)
+		i := sort.Search(len(m.keys), func(i int) bool {
+			reverseIdx := len(m.keys) - 1 - i
+			return bytes.Compare(upperBound, m.keys[reverseIdx]) > 0
+		})
+		if i == len(m.keys) {
+			// should not happen
+			return -1, -1
+		}
+		lastKeyIdx = len(m.keys) - 1 - i
+	}
+	return firstKeyIdx, lastKeyIdx
+}
+
+// GetFirstAndLastKey implements IngestData.GetFirstAndLastKey.
+func (m *MemoryIngestData) GetFirstAndLastKey(lowerBound, upperBound []byte) ([]byte, []byte, error) {
+	firstKeyIdx, lastKeyIdx := m.firstAndLastKeyIndex(lowerBound, upperBound)
+	if firstKeyIdx < 0 || firstKeyIdx > lastKeyIdx {
+		return nil, nil, nil
+	}
+	firstKey, err := m.keyAdapter.Decode(nil, m.keys[firstKeyIdx])
+	if err != nil {
+		return nil, nil, err
+	}
+	lastKey, err := m.keyAdapter.Decode(nil, m.keys[lastKeyIdx])
+	if err != nil {
+		return nil, nil, err
+	}
+	return firstKey, lastKey, nil
+}
+
+type memoryDataIter struct {
+	keys   [][]byte
+	values [][]byte
+
+	firstKeyIdx int
+	lastKeyIdx  int
+	curIdx      int
+}
+
+// First implements ForwardIter.
+func (m *memoryDataIter) First() bool {
+	if m.firstKeyIdx < 0 {
+		return false
+	}
+	m.curIdx = m.firstKeyIdx
+	return true
+}
+
+// Valid implements ForwardIter.
+func (m *memoryDataIter) Valid() bool {
+	return m.firstKeyIdx <= m.curIdx && m.curIdx <= m.lastKeyIdx
+}
+
+// Next implements ForwardIter.
+func (m *memoryDataIter) Next() bool {
+	m.curIdx++
+	return m.Valid()
+}
+
+// Key implements ForwardIter.
+func (m *memoryDataIter) Key() []byte {
+	return m.keys[m.curIdx]
+}
+
+// Value implements ForwardIter.
+func (m *memoryDataIter) Value() []byte {
+	return m.values[m.curIdx]
+}
+
+// Close implements ForwardIter.
+func (m *memoryDataIter) Close() error {
+	return nil
+}
+
+// Error implements ForwardIter.
+func (m *memoryDataIter) Error() error {
+	return nil
+}
+
+type memoryDataDupDetectIter struct {
+	iter           *memoryDataIter
+	dupDetector    *local.DupDetector
+	err            error
+	curKey, curVal []byte
+}
+
+// First implements ForwardIter.
+func (m *memoryDataDupDetectIter) First() bool {
+	if m.err != nil || !m.iter.First() {
+		return false
+	}
+	m.curKey, m.curVal, m.err = m.dupDetector.Init(m.iter)
+	return m.Valid()
+}
+
+// Valid implements ForwardIter.
+func (m *memoryDataDupDetectIter) Valid() bool {
+	return m.err == nil && m.iter.Valid()
+}
+
+// Next implements ForwardIter.
+func (m *memoryDataDupDetectIter) Next() bool {
+	if m.err != nil {
+		return false
+	}
+	key, val, ok, err := m.dupDetector.Next(m.iter)
+	if err != nil {
+		m.err = err
+		return false
+	}
+	if !ok {
+		return false
+	}
+	m.curKey, m.curVal = key, val
+	return true
+}
+
+// Key implements ForwardIter.
+func (m *memoryDataDupDetectIter) Key() []byte {
+	return m.curKey
+}
+
+// Value implements ForwardIter.
+func (m *memoryDataDupDetectIter) Value() []byte {
+	return m.curVal
+}
+
+// Close implements ForwardIter.
+func (m *memoryDataDupDetectIter) Close() error {
+	return m.dupDetector.Close()
+}
+
+// Error implements ForwardIter.
+func (m *memoryDataDupDetectIter) Error() error {
+	return m.err
+}
+
+// NewIter implements IngestData.NewIter.
+func (m *MemoryIngestData) NewIter(ctx context.Context, lowerBound, upperBound []byte) local.ForwardIter {
+	firstKeyIdx, lastKeyIdx := m.firstAndLastKeyIndex(lowerBound, upperBound)
+	iter := &memoryDataIter{
+		keys:        m.keys,
+		values:      m.values,
+		firstKeyIdx: firstKeyIdx,
+		lastKeyIdx:  lastKeyIdx,
+	}
+	if !m.duplicateDetection {
+		return iter
+	}
+	logger := log.FromContext(ctx)
+	detector := local.NewDupDetector(m.keyAdapter, m.duplicateDB.NewBatch(), logger, m.dupDetectOpt)
+	return &memoryDataDupDetectIter{
+		iter:        iter,
+		dupDetector: detector,
+	}
+}
+
+// GetTS implements IngestData.GetTS.
+func (m *MemoryIngestData) GetTS() uint64 {
+	return m.ts
+}
+
+// Finish implements IngestData.Finish.
+func (m *MemoryIngestData) Finish(totalBytes, totalCount int64) {
+	//TODO implement me
+	panic("implement me")
+}
diff --git a/br/pkg/lightning/backend/external/engine_test.go b/br/pkg/lightning/backend/external/engine_test.go
index b27cb8770d7c8..5587665f60269 100644
--- a/br/pkg/lightning/backend/external/engine_test.go
+++ b/br/pkg/lightning/backend/external/engine_test.go
@@ -17,13 +17,17 @@ package external
 import (
 	"bytes"
 	"context"
+	"path"
 	"slices"
 	"testing"
 	"time"
 
+	"github.com/cockroachdb/pebble"
 	"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
+	"github.com/pingcap/tidb/br/pkg/lightning/backend/local"
 	"github.com/pingcap/tidb/br/pkg/lightning/common"
 	"github.com/pingcap/tidb/br/pkg/storage"
+	"github.com/pingcap/tidb/util/codec"
 	"github.com/stretchr/testify/require"
 	"golang.org/x/exp/rand"
 )
@@ -107,3 +111,168 @@ func TestIter(t *testing.T) {
 	// the first key must be less than or equal to startKey
 	require.True(t, bytes.Compare(got[0].Key, startKey) <= 0)
 }
+
+func testGetFirstAndLastKey(
+	t *testing.T,
+	data local.IngestData,
+	lowerBound, upperBound []byte,
+	expectedFirstKey, expectedLastKey []byte,
+) {
+	firstKey, lastKey, err := data.GetFirstAndLastKey(lowerBound, upperBound)
+	require.NoError(t, err)
+	require.Equal(t, expectedFirstKey, firstKey)
+	require.Equal(t, expectedLastKey, lastKey)
+}
+
+func testNewIter(
+	t *testing.T,
+	data local.IngestData,
+	lowerBound, upperBound []byte,
+	expectedKeys, expectedValues [][]byte,
+) {
+	ctx := context.Background()
+	iter := data.NewIter(ctx, lowerBound, upperBound)
+	var (
+		keys, values [][]byte
+	)
+	for iter.First(); iter.Valid(); iter.Next() {
+		require.NoError(t, iter.Error())
+		key := make([]byte, len(iter.Key()))
+		copy(key, iter.Key())
+		keys = append(keys, key)
+		value := make([]byte, len(iter.Value()))
+		copy(value, iter.Value())
+		values = append(values, value)
+	}
+	require.NoError(t, iter.Error())
+	require.NoError(t, iter.Close())
+	require.Equal(t, expectedKeys, keys)
+	require.Equal(t, expectedValues, values)
+}
+
+func checkDupDB(t *testing.T, db *pebble.DB, expectedKeys, expectedValues [][]byte) {
+	iter := db.NewIter(nil)
+	var (
+		gotKeys, gotValues [][]byte
+	)
+	for iter.First(); iter.Valid(); iter.Next() {
+		key := make([]byte, len(iter.Key()))
+		copy(key, iter.Key())
+		gotKeys = append(gotKeys, key)
+		value := make([]byte, len(iter.Value()))
+		copy(value, iter.Value())
+		gotValues = append(gotValues, value)
+	}
+	require.NoError(t, iter.Close())
+	require.Equal(t, expectedKeys, gotKeys)
+	require.Equal(t, expectedValues, gotValues)
+	err := db.DeleteRange([]byte{0}, []byte{255}, nil)
+	require.NoError(t, err)
+}
+
+func TestMemoryIngestData(t *testing.T) {
+	keys := [][]byte{
+		[]byte("key1"),
+		[]byte("key2"),
+		[]byte("key3"),
+		[]byte("key4"),
+		[]byte("key5"),
+	}
+	values := [][]byte{
+		[]byte("value1"),
+		[]byte("value2"),
+		[]byte("value3"),
+		[]byte("value4"),
+		[]byte("value5"),
+	}
+	data := &MemoryIngestData{
+		keyAdapter: local.NoopKeyAdapter{},
+		keys:       keys,
+		values:     values,
+		ts:         123,
+	}
+
+	require.EqualValues(t, 123, data.GetTS())
+	testGetFirstAndLastKey(t, data, nil, nil, []byte("key1"), []byte("key5"))
+	testGetFirstAndLastKey(t, data, []byte("key1"), []byte("key6"), []byte("key1"), []byte("key5"))
+	testGetFirstAndLastKey(t, data, []byte("key2"), []byte("key5"), []byte("key2"), []byte("key4"))
+	testGetFirstAndLastKey(t, data, []byte("key25"), []byte("key35"), []byte("key3"), []byte("key3"))
+	testGetFirstAndLastKey(t, data, []byte("key25"), []byte("key26"), nil, nil)
+	testGetFirstAndLastKey(t, data, []byte("key0"), []byte("key1"), nil, nil)
+	testGetFirstAndLastKey(t, data, []byte("key6"), []byte("key9"), nil, nil)
+
+	testNewIter(t, data, nil, nil, keys, values)
+	testNewIter(t, data, []byte("key1"), []byte("key6"), keys, values)
+	testNewIter(t, data, []byte("key2"), []byte("key5"), keys[1:4], values[1:4])
+	testNewIter(t, data, []byte("key25"), []byte("key35"), keys[2:3], values[2:3])
+	testNewIter(t, data, []byte("key25"), []byte("key26"), nil, nil)
+	testNewIter(t, data, []byte("key0"), []byte("key1"), nil, nil)
+	testNewIter(t, data, []byte("key6"), []byte("key9"), nil, nil)
+
+	dir := t.TempDir()
+	db, err := pebble.Open(path.Join(dir, "duplicate"), nil)
+	require.NoError(t, err)
+	keyAdapter := local.DupDetectKeyAdapter{}
+	data = &MemoryIngestData{
+		keyAdapter:         keyAdapter,
+		duplicateDetection: true,
+		duplicateDB:        db,
+		ts:                 234,
+	}
+	encodedKeys := make([][]byte, 0, len(keys)*2)
+	encodedValues := make([][]byte, 0, len(values)*2)
+	encodedZero := codec.EncodeInt(nil, 0)
+	encodedOne := codec.EncodeInt(nil, 1)
+	duplicatedKeys := make([][]byte, 0, len(keys)*2)
+	duplicatedValues := make([][]byte, 0, len(values)*2)
+
+	for i := range keys {
+		encodedKey := keyAdapter.Encode(nil, keys[i], encodedZero)
+		encodedKeys = append(encodedKeys, encodedKey)
+		encodedValues = append(encodedValues, values[i])
+		if i%2 == 0 {
+			continue
+		}
+
+		// duplicatedKeys will be like key2_0, key2_1, key4_0, key4_1
+		duplicatedKeys = append(duplicatedKeys, encodedKey)
+		duplicatedValues = append(duplicatedValues, values[i])
+
+		encodedKey = keyAdapter.Encode(nil, keys[i], encodedOne)
+		encodedKeys = append(encodedKeys, encodedKey)
+		newValues := make([]byte, len(values[i])+1)
+		copy(newValues, values[i])
+		newValues[len(values[i])] = 1
+		encodedValues = append(encodedValues, newValues)
+		duplicatedKeys = append(duplicatedKeys, encodedKey)
+		duplicatedValues = append(duplicatedValues, newValues)
+	}
+	data.keys = encodedKeys
+	data.values = encodedValues
+
+	require.EqualValues(t, 234, data.GetTS())
+	testGetFirstAndLastKey(t, data, nil, nil, []byte("key1"), []byte("key5"))
+	testGetFirstAndLastKey(t, data, []byte("key1"), []byte("key6"), []byte("key1"), []byte("key5"))
+	testGetFirstAndLastKey(t, data, []byte("key2"), []byte("key5"), []byte("key2"), []byte("key4"))
+	testGetFirstAndLastKey(t, data, []byte("key25"), []byte("key35"), []byte("key3"), []byte("key3"))
+	testGetFirstAndLastKey(t, data, []byte("key25"), []byte("key26"), nil, nil)
+	testGetFirstAndLastKey(t, data, []byte("key0"), []byte("key1"), nil, nil)
+	testGetFirstAndLastKey(t, data, []byte("key6"), []byte("key9"), nil, nil)
+
+	testNewIter(t, data, nil, nil, keys, values)
+	checkDupDB(t, db, duplicatedKeys, duplicatedValues)
+	testNewIter(t, data, []byte("key1"), []byte("key6"), keys, values)
+	checkDupDB(t, db, duplicatedKeys, duplicatedValues)
+	testNewIter(t, data, []byte("key1"), []byte("key3"), keys[:2], values[:2])
+	checkDupDB(t, db, duplicatedKeys[:2], duplicatedValues[:2])
+	testNewIter(t, data, []byte("key2"), []byte("key5"), keys[1:4], values[1:4])
+	checkDupDB(t, db, duplicatedKeys, duplicatedValues)
+	testNewIter(t, data, []byte("key25"), []byte("key35"), keys[2:3], values[2:3])
+	checkDupDB(t, db, nil, nil)
+	testNewIter(t, data, []byte("key25"), []byte("key26"), nil, nil)
+	checkDupDB(t, db, nil, nil)
+	testNewIter(t, data, []byte("key0"), []byte("key1"), nil, nil)
+	checkDupDB(t, db, nil, nil)
+	testNewIter(t, data, []byte("key6"), []byte("key9"), nil, nil)
+	checkDupDB(t, db, nil, nil)
+}
diff --git a/br/pkg/lightning/backend/local/engine.go b/br/pkg/lightning/backend/local/engine.go
index 267ab9106a5b0..d4221ed9194a3 100644
--- a/br/pkg/lightning/backend/local/engine.go
+++ b/br/pkg/lightning/backend/local/engine.go
@@ -947,7 +947,7 @@ func (e *Engine) newKVIter(ctx context.Context, opts *pebble.IterOptions) Iter {
 	return newDupDetectIter(e.getDB(), e.keyAdapter, opts, e.duplicateDB, logger, e.dupDetectOpt)
 }
 
-var _ ingestData = (*Engine)(nil)
+var _ IngestData = (*Engine)(nil)
 
 // GetFirstAndLastKey reads the first and last key in range [lowerBound, upperBound)
 // in the engine. Empty upperBound means unbounded.
@@ -982,17 +982,17 @@ func (e *Engine) GetFirstAndLastKey(lowerBound, upperBound []byte) ([]byte, []by
 	return firstKey, lastKey, nil
 }
 
-// NewIter implements ingestData interface.
+// NewIter implements IngestData interface.
 func (e *Engine) NewIter(ctx context.Context, lowerBound, upperBound []byte) ForwardIter {
 	return e.newKVIter(ctx, &pebble.IterOptions{LowerBound: lowerBound, UpperBound: upperBound})
 }
 
-// GetTS implements ingestData interface.
+// GetTS implements IngestData interface.
 func (e *Engine) GetTS() uint64 {
 	return e.TS
 }
 
-// Finish implements ingestData interface.
+// Finish implements IngestData interface.
 func (e *Engine) Finish(totalBytes, totalCount int64) {
 	e.importedKVSize.Add(totalBytes)
 	e.importedKVCount.Add(totalCount)
@@ -1055,9 +1055,9 @@ func (w *Writer) appendRowsSorted(kvs []common.KvPair) (err error) {
 		totalKeySize += keySize
 	}
 	w.batchCount += len(kvs)
-	// noopKeyAdapter doesn't really change the key,
+	// NoopKeyAdapter doesn't really change the key,
 	// skipping the encoding to avoid unnecessary alloc and copy.
-	if _, ok := keyAdapter.(noopKeyAdapter); !ok {
+	if _, ok := keyAdapter.(NoopKeyAdapter); !ok {
 		if cap(w.sortedKeyBuf) < totalKeySize {
 			w.sortedKeyBuf = make([]byte, totalKeySize)
 		}
diff --git a/br/pkg/lightning/backend/local/engine_test.go b/br/pkg/lightning/backend/local/engine_test.go
index c2604caff2f9a..f03f1237fdf70 100644
--- a/br/pkg/lightning/backend/local/engine_test.go
+++ b/br/pkg/lightning/backend/local/engine_test.go
@@ -61,7 +61,7 @@ func TestGetEngineSizeWhenImport(t *testing.T) {
 		ctx:          engineCtx,
 		cancel:       cancel,
 		sstMetasChan: make(chan metaOrFlush, 64),
-		keyAdapter:   noopKeyAdapter{},
+		keyAdapter:   NoopKeyAdapter{},
 		logger:       log.L(),
 	}
 	f.db.Store(db)
@@ -99,7 +99,7 @@ func TestIngestSSTWithClosedEngine(t *testing.T) {
 		ctx:          engineCtx,
 		cancel:       cancel,
 		sstMetasChan: make(chan metaOrFlush, 64),
-		keyAdapter:   noopKeyAdapter{},
+		keyAdapter:   NoopKeyAdapter{},
 		logger:       log.L(),
 	}
 	f.db.Store(db)
diff --git a/br/pkg/lightning/backend/local/iterator.go b/br/pkg/lightning/backend/local/iterator.go
index 556d030bee9a8..224599479b9c9 100644
--- a/br/pkg/lightning/backend/local/iterator.go
+++ b/br/pkg/lightning/backend/local/iterator.go
@@ -56,19 +56,134 @@ var _ Iter = pebbleIter{}
 
 const maxDuplicateBatchSize = 4 << 20
 
-type dupDetectIter struct {
-	iter *pebble.Iterator
+// DupDetector extracts the decoded key and value from an iter which may contain
+// duplicate keys, and stores the keys encoded by KeyAdapter. The duplicate keys
+// and values will be saved in dupDB.
+type DupDetector struct {
+	keyAdapter      KeyAdapter
+	dupDBWriteBatch *pebble.Batch
+	curBatchSize    int
+
 	curKey    []byte
 	curRawKey []byte
 	curVal    []byte
 	nextKey   []byte
-	err       error
 
-	keyAdapter     KeyAdapter
-	writeBatch     *pebble.Batch
-	writeBatchSize int64
+	logger log.Logger
+	option DupDetectOpt
+}
+
+// NewDupDetector creates a new DupDetector.
+// dupDBWriteBatch will be closed when DupDetector is closed.
+func NewDupDetector(
+	keyAdaptor KeyAdapter,
+	dupDBWriteBatch *pebble.Batch,
+	logger log.Logger,
+	option DupDetectOpt,
+) *DupDetector {
+	return &DupDetector{
+		keyAdapter:      keyAdaptor,
+		dupDBWriteBatch: dupDBWriteBatch,
+		logger:          logger,
+		option:          option,
+	}
+}
+
+// KVIter is a slim interface that DupDetector needs.
+type KVIter interface {
+	Next() bool
+	Key() []byte
+	Value() []byte
+}
+
+// Init initializes the status of DupDetector by reading the current Key and
+// Value of the given iter.
+func (d *DupDetector) Init(iter KVIter) (key []byte, val []byte, err error) {
+	d.curKey, err = d.keyAdapter.Decode(d.curKey[:0], iter.Key())
+	if err != nil {
+		return nil, nil, err
+	}
+	d.curRawKey = append(d.curRawKey[:0], iter.Key()...)
+	d.curVal = append(d.curVal[:0], iter.Value()...)
+	return d.curKey, d.curVal, nil
+}
+
+// Next reads the next key and value from the given iter. If it meets a
+// duplicate key, it records the duplicate key and value in dupDB and skips it.
+func (d *DupDetector) Next(iter KVIter) (key []byte, value []byte, ok bool, err error) {
+	recordFirst := false
+	for iter.Next() {
+		encodedKey, val := iter.Key(), iter.Value()
+		d.nextKey, err = d.keyAdapter.Decode(d.nextKey[:0], encodedKey)
+		if err != nil {
+			return nil, nil, false, err
+		}
+		if !bytes.Equal(d.nextKey, d.curKey) {
+			d.curKey, d.nextKey = d.nextKey, d.curKey[:0]
+			d.curRawKey = append(d.curRawKey[:0], encodedKey...)
+			d.curVal = append(d.curVal[:0], val...)
+			return d.curKey, d.curVal, true, nil
+		}
+		if d.option.ReportErrOnDup {
+			dupKey := make([]byte, len(d.curKey))
+			dupVal := make([]byte, len(val))
+			copy(dupKey, d.curKey)
+			copy(dupVal, d.curVal)
+			return nil, nil, false, common.ErrFoundDuplicateKeys.FastGenByArgs(dupKey, dupVal)
+		}
+		if !recordFirst {
+			if err = d.record(d.curRawKey, d.curKey, d.curVal); err != nil {
+				return nil, nil, false, err
+			}
+			recordFirst = true
+		}
+		if err = d.record(encodedKey, d.nextKey, val); err != nil {
+			return nil, nil, false, err
+		}
+	}
+	return nil, nil, false, nil
+}
+
+func (d *DupDetector) record(rawKey, key, val []byte) error {
+	d.logger.Debug("local duplicate key detected", zap.String("category", "detect-dupe"),
+		logutil.Key("key", key),
+		logutil.Key("value", val),
+		logutil.Key("rawKey", rawKey))
+	if err := d.dupDBWriteBatch.Set(rawKey, val, nil); err != nil {
+		return err
+	}
+	d.curBatchSize += len(rawKey) + len(val)
+	if d.curBatchSize >= maxDuplicateBatchSize {
+		return d.flush()
+	}
+	return nil
+}
+
+func (d *DupDetector) flush() error {
+	if err := d.dupDBWriteBatch.Commit(pebble.Sync); err != nil {
+		return err
+	}
+	d.dupDBWriteBatch.Reset()
+	d.curBatchSize = 0
+	return nil
+}
+
+// Close closes the DupDetector.
+func (d *DupDetector) Close() error {
+	if err := d.flush(); err != nil {
+		return err
+	}
+	return d.dupDBWriteBatch.Close()
+}
+
+type dupDetectIter struct {
+	keyAdapter  KeyAdapter
+	iter        *pebble.Iterator
+	dupDetector *DupDetector
+	err         error
+
+	curKey, curVal []byte
 	logger log.Logger
-	option DupDetectOpt
 }
 
 // DupDetectOpt is the option for duplicate detection.
@@ -102,60 +217,23 @@ func (d *dupDetectIter) Last() bool {
 }
 
 func (d *dupDetectIter) fill() {
-	d.curKey, d.err = d.keyAdapter.Decode(d.curKey[:0], d.iter.Key())
-	d.curRawKey = append(d.curRawKey[:0], d.iter.Key()...)
-	d.curVal = append(d.curVal[:0], d.iter.Value()...)
-}
-
-func (d *dupDetectIter) flush() {
-	d.err = d.writeBatch.Commit(pebble.Sync)
-	d.writeBatch.Reset()
-	d.writeBatchSize = 0
+	d.curKey, d.curVal, d.err = d.dupDetector.Init(d.iter)
 }
 
-func (d *dupDetectIter) record(rawKey, key, val []byte) {
-	d.logger.Debug("local duplicate key detected", zap.String("category", "detect-dupe"),
-		logutil.Key("key", key),
-		logutil.Key("value", val),
-		logutil.Key("rawKey", rawKey))
-	d.err = d.writeBatch.Set(rawKey, val, nil)
+func (d *dupDetectIter) Next() bool {
 	if d.err != nil {
-		return
+		return false
 	}
-	d.writeBatchSize += int64(len(rawKey) + len(val))
-	if d.writeBatchSize >= maxDuplicateBatchSize {
-		d.flush()
+	key, val, ok, err := d.dupDetector.Next(d.iter)
+	if err != nil {
+		d.err = err
+		return false
 	}
-}
-
-func (d *dupDetectIter) Next() bool {
-	recordFirst := false
-	for d.err == nil && d.iter.Next() {
-		d.nextKey, d.err = d.keyAdapter.Decode(d.nextKey[:0], d.iter.Key())
-		if d.err != nil {
-			return false
-		}
-		if !bytes.Equal(d.nextKey, d.curKey) {
-			d.curKey, d.nextKey = d.nextKey, d.curKey[:0]
-			d.curRawKey = append(d.curRawKey[:0], d.iter.Key()...)
-			d.curVal = append(d.curVal[:0], d.iter.Value()...)
-			return true
-		}
-		if d.option.ReportErrOnDup {
-			dupKey := make([]byte, len(d.curKey))
-			dupVal := make([]byte, len(d.iter.Value()))
-			copy(dupKey, d.curKey)
-			copy(dupVal, d.curVal)
-			d.err = common.ErrFoundDuplicateKeys.FastGenByArgs(dupKey, dupVal)
-			return false
-		}
-		if !recordFirst {
-			d.record(d.curRawKey, d.curKey, d.curVal)
-			recordFirst = true
-		}
-		d.record(d.iter.Key(), d.nextKey, d.iter.Value())
+	if !ok {
+		return false
 	}
-	return false
+	d.curKey, d.curVal = key, val
+	return true
 }
 
 func (d *dupDetectIter) Key() []byte {
@@ -175,11 +253,12 @@ func (d *dupDetectIter) Error() error {
 }
 
 func (d *dupDetectIter) Close() error {
-	if d.err == nil {
-		d.flush()
+	firstErr := d.dupDetector.Close()
+	err := d.iter.Close()
+	if firstErr != nil {
+		return firstErr
 	}
-	_ = d.writeBatch.Close()
-	return d.iter.Close()
+	return err
 }
 
 func (*dupDetectIter) OpType() sst.Pair_OP {
@@ -188,8 +267,14 @@ func (*dupDetectIter) OpType() sst.Pair_OP {
 
 var _ Iter = &dupDetectIter{}
 
-func newDupDetectIter(db *pebble.DB, keyAdapter KeyAdapter,
-	opts *pebble.IterOptions, dupDB *pebble.DB, logger log.Logger, dupOpt DupDetectOpt) *dupDetectIter {
+func newDupDetectIter(
+	db *pebble.DB,
+	keyAdapter KeyAdapter,
+	opts *pebble.IterOptions,
+	dupDB *pebble.DB,
+	logger log.Logger,
+	dupOpt DupDetectOpt,
+) *dupDetectIter {
 	newOpts := &pebble.IterOptions{TableFilter: opts.TableFilter}
 	if len(opts.LowerBound) > 0 {
 		newOpts.LowerBound = keyAdapter.Encode(nil, opts.LowerBound, MinRowID)
@@ -197,12 +282,13 @@ func newDupDetectIter(db *pebble.DB, keyAdapter KeyAdapter,
 	if len(opts.UpperBound) > 0 {
 		newOpts.UpperBound = keyAdapter.Encode(nil, opts.UpperBound, MinRowID)
 	}
+
+	detector := NewDupDetector(keyAdapter, dupDB.NewBatch(), logger, dupOpt)
 	return &dupDetectIter{
-		iter:       db.NewIter(newOpts),
-		keyAdapter: keyAdapter,
-		writeBatch: dupDB.NewBatch(),
-		logger:     logger,
-		option:     dupOpt,
+		keyAdapter:  keyAdapter,
+		iter:        db.NewIter(newOpts),
+		dupDetector: detector,
+		logger:      logger,
 	}
 }
diff --git a/br/pkg/lightning/backend/local/iterator_test.go b/br/pkg/lightning/backend/local/iterator_test.go
index 0a00cb5864cc0..8d079404dced8 100644
--- a/br/pkg/lightning/backend/local/iterator_test.go
+++ b/br/pkg/lightning/backend/local/iterator_test.go
@@ -16,9 +16,11 @@ package local
 
 import (
 	"bytes"
+	"fmt"
 	"math/rand"
 	"path/filepath"
 	"sort"
+	"strconv"
"testing" "time" @@ -101,7 +103,7 @@ func TestDupDetectIterator(t *testing.T) { i = j } - keyAdapter := dupDetectKeyAdapter{} + keyAdapter := DupDetectKeyAdapter{} // Write pairs to db after shuffling the pairs. rnd := rand.New(rand.NewSource(time.Now().UnixNano())) @@ -206,7 +208,7 @@ func TestDupDetectIterSeek(t *testing.T) { db, err := pebble.Open(filepath.Join(storeDir, "kv"), &pebble.Options{}) require.NoError(t, err) - keyAdapter := dupDetectKeyAdapter{} + keyAdapter := DupDetectKeyAdapter{} wb := db.NewBatch() for _, p := range pairs { key := keyAdapter.Encode(nil, p.Key, p.RowID) @@ -228,7 +230,7 @@ func TestDupDetectIterSeek(t *testing.T) { } func TestKeyAdapterEncoding(t *testing.T) { - keyAdapter := dupDetectKeyAdapter{} + keyAdapter := DupDetectKeyAdapter{} srcKey := []byte{1, 2, 3} v := keyAdapter.Encode(nil, srcKey, common.EncodeIntRowID(1)) resKey, err := keyAdapter.Decode(nil, v) @@ -240,3 +242,32 @@ func TestKeyAdapterEncoding(t *testing.T) { require.NoError(t, err) require.EqualValues(t, srcKey, resKey) } + +func BenchmarkDupDetectIter(b *testing.B) { + keyAdapter := DupDetectKeyAdapter{} + db, _ := pebble.Open(filepath.Join(b.TempDir(), "kv"), &pebble.Options{}) + wb := db.NewBatch() + val := []byte("value") + for i := 0; i < 100_000; i++ { + keyNum := i + // mimic we have 20% duplication + if keyNum%5 == 0 { + keyNum-- + } + keyStr := fmt.Sprintf("%09d", keyNum) + rowID := strconv.Itoa(i) + key := keyAdapter.Encode(nil, []byte(keyStr), []byte(rowID)) + wb.Set(key, val, nil) + } + wb.Commit(pebble.Sync) + + dupDB, _ := pebble.Open(filepath.Join(b.TempDir(), "dup"), &pebble.Options{}) + b.ResetTimer() + for i := 0; i < b.N; i++ { + iter := newDupDetectIter(db, keyAdapter, &pebble.IterOptions{}, dupDB, log.L(), DupDetectOpt{}) + keyCnt := 0 + for iter.First(); iter.Valid(); iter.Next() { + keyCnt++ + } + } +} diff --git a/br/pkg/lightning/backend/local/key_adapter.go b/br/pkg/lightning/backend/local/key_adapter.go index 5d9d119b2c3ec..7d2361260ca94 100644 --- a/br/pkg/lightning/backend/local/key_adapter.go +++ b/br/pkg/lightning/backend/local/key_adapter.go @@ -22,10 +22,14 @@ import ( "github.com/pingcap/tidb/util/codec" ) -// KeyAdapter is used to encode and decode keys. +// KeyAdapter is used to encode and decode keys so that duplicate key can be +// identified by rowID and avoid overwritten. type KeyAdapter interface { - // Encode encodes the key with its corresponding rowID. It appends the encoded key to dst and returns the - // resulting slice. The encoded key is guaranteed to be in ascending order for comparison. + // Encode encodes the key with its corresponding rowID. It appends the encoded + // key to dst and returns the resulting slice. The encoded key is guaranteed to + // be in ascending order for comparison. + // rowID must be a coded mem-comparable value, one way to get it is to use + // tidb/util/codec package. Encode(dst []byte, key []byte, rowID []byte) []byte // Decode decodes the original key to dst. It appends the encoded key to dst and returns the resulting slice. @@ -45,25 +49,32 @@ func reallocBytes(b []byte, n int) []byte { return b } -type noopKeyAdapter struct{} +// NoopKeyAdapter is a key adapter that does nothing. +type NoopKeyAdapter struct{} -func (noopKeyAdapter) Encode(dst []byte, key []byte, _ []byte) []byte { +// Encode implements KeyAdapter. +func (NoopKeyAdapter) Encode(dst []byte, key []byte, _ []byte) []byte { return append(dst, key...) 
 }
 
-func (noopKeyAdapter) Decode(dst []byte, data []byte) ([]byte, error) {
+// Decode implements KeyAdapter.
+func (NoopKeyAdapter) Decode(dst []byte, data []byte) ([]byte, error) {
 	return append(dst, data...), nil
 }
 
-func (noopKeyAdapter) EncodedLen(key []byte, _ []byte) int {
+// EncodedLen implements KeyAdapter.
+func (NoopKeyAdapter) EncodedLen(key []byte, _ []byte) int {
 	return len(key)
 }
 
-var _ KeyAdapter = noopKeyAdapter{}
+var _ KeyAdapter = NoopKeyAdapter{}
 
-type dupDetectKeyAdapter struct{}
+// DupDetectKeyAdapter is a key adapter that appends the rowID to the key so
+// that duplicate keys are not overwritten.
+type DupDetectKeyAdapter struct{}
 
-func (dupDetectKeyAdapter) Encode(dst []byte, key []byte, rowID []byte) []byte {
+// Encode implements KeyAdapter.
+func (DupDetectKeyAdapter) Encode(dst []byte, key []byte, rowID []byte) []byte {
 	dst = codec.EncodeBytes(dst, key)
 	dst = reallocBytes(dst, len(rowID)+2)
 	dst = append(dst, rowID...)
@@ -72,7 +83,8 @@ func (dupDetectKeyAdapter) Encode(dst []byte, key []byte, rowID []byte) []byte {
 	return dst
 }
 
-func (dupDetectKeyAdapter) Decode(dst []byte, data []byte) ([]byte, error) {
+// Decode implements KeyAdapter.
+func (DupDetectKeyAdapter) Decode(dst []byte, data []byte) ([]byte, error) {
 	if len(data) < 2 {
 		return nil, errors.New("insufficient bytes to decode value")
 	}
@@ -96,11 +108,12 @@ func (dupDetectKeyAdapter) Decode(dst []byte, data []byte) ([]byte, error) {
 	return append(dst, key...), nil
 }
 
-func (dupDetectKeyAdapter) EncodedLen(key []byte, rowID []byte) int {
+// EncodedLen implements KeyAdapter.
+func (DupDetectKeyAdapter) EncodedLen(key []byte, rowID []byte) int {
 	return codec.EncodedBytesLength(len(key)) + len(rowID) + 2
 }
 
-var _ KeyAdapter = dupDetectKeyAdapter{}
+var _ KeyAdapter = DupDetectKeyAdapter{}
 
 // static vars for rowID
 var (
diff --git a/br/pkg/lightning/backend/local/key_adapter_test.go b/br/pkg/lightning/backend/local/key_adapter_test.go
index d80efa6de2af4..47e05e70a4f5d 100644
--- a/br/pkg/lightning/backend/local/key_adapter_test.go
+++ b/br/pkg/lightning/backend/local/key_adapter_test.go
@@ -33,7 +33,7 @@ func randBytes(n int) []byte {
 }
 
 func TestNoopKeyAdapter(t *testing.T) {
-	keyAdapter := noopKeyAdapter{}
+	keyAdapter := NoopKeyAdapter{}
 	key := randBytes(32)
 	require.Len(t, key, keyAdapter.EncodedLen(key, ZeroRowID))
 	encodedKey := keyAdapter.Encode(nil, key, ZeroRowID)
@@ -67,7 +67,7 @@ func TestDupDetectKeyAdapter(t *testing.T) {
 		},
 	}
 
-	keyAdapter := dupDetectKeyAdapter{}
+	keyAdapter := DupDetectKeyAdapter{}
 	for _, input := range inputs {
 		encodedRowID := common.EncodeIntRowID(input.rowID)
 		result := keyAdapter.Encode(nil, input.key, encodedRowID)
@@ -88,7 +88,7 @@ func TestDupDetectKeyOrder(t *testing.T) {
 		{0x0, 0x1, 0x3, 0x4, 0x0},
 		{0x0, 0x1, 0x3, 0x4, 0x0, 0x0, 0x0},
 	}
-	keyAdapter := dupDetectKeyAdapter{}
+	keyAdapter := DupDetectKeyAdapter{}
 	encodedKeys := make([][]byte, 0, len(keys))
 	for _, key := range keys {
 		encodedKeys = append(encodedKeys, keyAdapter.Encode(nil, key, common.EncodeIntRowID(1)))
@@ -100,7 +100,7 @@ func TestDupDetectKeyOrder(t *testing.T) {
 }
 
 func TestDupDetectEncodeDupKey(t *testing.T) {
-	keyAdapter := dupDetectKeyAdapter{}
+	keyAdapter := DupDetectKeyAdapter{}
 	key := randBytes(32)
 	result1 := keyAdapter.Encode(nil, key, common.EncodeIntRowID(10))
 	result2 := keyAdapter.Encode(nil, key, common.EncodeIntRowID(20))
@@ -112,7 +112,7 @@ func startWithSameMemory(x []byte, y []byte) bool {
 }
 
 func TestEncodeKeyToPreAllocatedBuf(t *testing.T) {
-	keyAdapters := []KeyAdapter{noopKeyAdapter{}, dupDetectKeyAdapter{}}
+	keyAdapters := []KeyAdapter{NoopKeyAdapter{}, DupDetectKeyAdapter{}}
 	for _, keyAdapter := range keyAdapters {
 		key := randBytes(32)
 		buf := make([]byte, 256)
@@ -130,7 +130,7 @@ func TestDecodeKeyToPreAllocatedBuf(t *testing.T) {
 		0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8, 0xff, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xf7,
 		0x0, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8, 0x9, 0xa, 0xb, 0xc, 0xd, 0xe, 0xf, 0x0, 0x8,
 	}
-	keyAdapters := []KeyAdapter{noopKeyAdapter{}, dupDetectKeyAdapter{}}
+	keyAdapters := []KeyAdapter{NoopKeyAdapter{}, DupDetectKeyAdapter{}}
 	for _, keyAdapter := range keyAdapters {
 		key, err := keyAdapter.Decode(nil, data)
 		require.NoError(t, err)
@@ -147,7 +147,7 @@ func TestDecodeKeyDstIsInsufficient(t *testing.T) {
 		0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8, 0xff, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xf7,
 		0x0, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8, 0x9, 0xa, 0xb, 0xc, 0xd, 0xe, 0xf, 0x0, 0x8,
 	}
-	keyAdapters := []KeyAdapter{noopKeyAdapter{}, dupDetectKeyAdapter{}}
+	keyAdapters := []KeyAdapter{NoopKeyAdapter{}, DupDetectKeyAdapter{}}
 	for _, keyAdapter := range keyAdapters {
 		key, err := keyAdapter.Decode(nil, data)
 		require.NoError(t, err)
diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go
index b119a93efb112..26e3de03ee8ce 100644
--- a/br/pkg/lightning/backend/local/local.go
+++ b/br/pkg/lightning/backend/local/local.go
@@ -567,9 +567,9 @@ func NewBackend(
 		return nil, common.ErrCreateKVClient.Wrap(err).GenWithStackByArgs()
 	}
 	importClientFactory := newImportClientFactoryImpl(splitCli, tls, config.MaxConnPerStore, config.ConnCompressType)
-	keyAdapter := KeyAdapter(noopKeyAdapter{})
+	keyAdapter := KeyAdapter(NoopKeyAdapter{})
 	if config.DupeDetectEnabled {
-		keyAdapter = dupDetectKeyAdapter{}
+		keyAdapter = DupDetectKeyAdapter{}
 	}
 	var writeLimiter StoreWriteLimiter
 	if config.StoreWriteBWLimit > 0 {
@@ -1193,7 +1193,7 @@ var fakeRegionJobs map[[2]string]struct {
 // It will retry internally when scan region meet error.
 func (local *Backend) generateJobForRange(
 	ctx context.Context,
-	engine ingestData,
+	engine IngestData,
 	keyRange Range,
 	regionSplitSize, regionSplitKeys int64,
 ) ([]*regionJob, error) {
diff --git a/br/pkg/lightning/backend/local/local_test.go b/br/pkg/lightning/backend/local/local_test.go
index 6f775390fd9cc..b4612b74caa5e 100644
--- a/br/pkg/lightning/backend/local/local_test.go
+++ b/br/pkg/lightning/backend/local/local_test.go
@@ -182,7 +182,7 @@ func TestRangeProperties(t *testing.T) {
 	userProperties := make(map[string]string, 1)
 	_ = collector.Finish(userProperties)
 
-	props, err := decodeRangeProperties(hack.Slice(userProperties[propRangeIndex]), noopKeyAdapter{})
+	props, err := decodeRangeProperties(hack.Slice(userProperties[propRangeIndex]), NoopKeyAdapter{})
 	require.NoError(t, err)
 
 	// Smallest key in props.
@@ -335,7 +335,7 @@ func testLocalWriter(t *testing.T, needSort bool, partitialSort bool) {
 		ctx:          engineCtx,
 		cancel:       cancel,
 		sstMetasChan: make(chan metaOrFlush, 64),
-		keyAdapter:   noopKeyAdapter{},
+		keyAdapter:   NoopKeyAdapter{},
 		logger:       log.L(),
 	}
 	f.db.Store(db)
@@ -1564,7 +1564,7 @@ func TestPartialWriteIngestBusy(t *testing.T) {
 		ctx:          engineCtx,
 		cancel:       cancel2,
 		sstMetasChan: make(chan metaOrFlush, 64),
-		keyAdapter:   noopKeyAdapter{},
+		keyAdapter:   NoopKeyAdapter{},
 		logger:       log.L(),
 	}
 	f.db.Store(db)
@@ -1703,7 +1703,7 @@ func TestSplitRangeAgain4BigRegion(t *testing.T) {
 		ctx:          engineCtx,
 		cancel:       cancel,
 		sstMetasChan: make(chan metaOrFlush, 64),
-		keyAdapter:   noopKeyAdapter{},
+		keyAdapter:   NoopKeyAdapter{},
 		logger:       log.L(),
 	}
 	f.db.Store(db)
diff --git a/br/pkg/lightning/backend/local/region_job.go b/br/pkg/lightning/backend/local/region_job.go
index 5de40cca0e195..98933c88d274e 100644
--- a/br/pkg/lightning/backend/local/region_job.go
+++ b/br/pkg/lightning/backend/local/region_job.go
@@ -102,7 +102,7 @@ type regionJob struct {
 	// writeResult is available only in wrote and ingested stage
 	writeResult *tikvWriteResult
 
-	ingestData      ingestData
+	ingestData      IngestData
 	regionSplitSize int64
 	regionSplitKeys int64
 	metrics         *metric.Metrics
@@ -122,10 +122,9 @@ type tikvWriteResult struct {
 	remainingStartKey []byte
 }
 
-// ingestData describes a common interface that is needed by TiKV write +
+// IngestData describes a common interface that is needed by TiKV write +
 // ingest RPC.
-// TODO(lance6716): make it public to remote backend can use it.
-type ingestData interface {
+type IngestData interface {
 	// GetFirstAndLastKey returns the first and last key of the data reader in the
 	// range [lowerBound, upperBound). Empty or nil bounds means unbounded.
 	// lowerBound must be less than upperBound.
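
Reviewer note, not part of the patch: the point of extracting `DupDetector` is that the pebble-backed `dupDetectIter` and the new in-memory `memoryDataDupDetectIter` now share one detection loop: position the iterator on its first pair, call `Init` once, then call `Next` until it reports `ok == false`, and finally `Close` to flush recorded duplicates. A minimal sketch of that driving loop, assuming it sits in the `local` package; `drainDeduplicated` and `visit` are hypothetical names:

```go
// drainDeduplicated walks a KVIter through a DupDetector and hands every
// deduplicated pair to visit. The caller must have positioned iter on its
// first pair already (both iterator types in this patch call First() first).
func drainDeduplicated(iter KVIter, d *DupDetector, visit func(key, val []byte)) error {
	// Init decodes and captures the pair the iterator currently points at.
	key, val, err := d.Init(iter)
	if err != nil {
		return err
	}
	visit(key, val)
	for {
		key, val, ok, err := d.Next(iter)
		if err != nil {
			return err
		}
		if !ok { // underlying iterator exhausted
			break
		}
		visit(key, val)
	}
	// Close flushes any buffered duplicates into dupDB and closes the batch.
	return d.Close()
}
```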
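Also illustrative rather than part of the change: how the two newly exported key adapters differ. `common.EncodeIntRowID` produces the codec-encoded, mem-comparable rowID that the revised `KeyAdapter.Encode` contract asks for; `keyAdapterSketch` is a hypothetical function in the `local` package:

```go
package local

import (
	"bytes"
	"fmt"

	"github.com/pingcap/tidb/br/pkg/lightning/common"
)

// keyAdapterSketch contrasts the adapters: NoopKeyAdapter returns the key
// unchanged (a second row with the same key overwrites the first), while
// DupDetectKeyAdapter suffixes the rowID so both rows survive until
// DupDetector inspects them.
func keyAdapterSketch() {
	key := []byte("pk-1")
	row7 := common.EncodeIntRowID(7) // codec-encoded, mem-comparable rowID
	row8 := common.EncodeIntRowID(8)

	dup := DupDetectKeyAdapter{}
	k7 := dup.Encode(nil, key, row7)
	k8 := dup.Encode(nil, key, row8)
	fmt.Println(bytes.Equal(k7, k8)) // false: same key, distinct encodings

	// Decode strips the rowID suffix and recovers the original key.
	orig, err := dup.Decode(nil, k7)
	fmt.Println(err == nil && bytes.Equal(orig, key)) // true

	noop := NoopKeyAdapter{}
	fmt.Println(bytes.Equal(noop.Encode(nil, key, row7), key)) // true: unchanged
}
```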
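Finally, a sketch of the bound handling in `MemoryIngestData.firstAndLastKeyIndex`. Both bounds are encoded with `local.MinRowID` before searching, so the lower bound sorts before every rowID variant of its key (making it inclusive) and the upper bound sorts before every variant of itself (keeping it exclusive). Stripped of the encoding and the `(-1, -1)` empty-range cases, the two binary searches reduce to the following; `boundsOf` is a hypothetical name and `bytes`/`sort` imports are assumed:

```go
// boundsOf returns the index of the first key >= lowerBound and of the last
// key < upperBound in a sorted slice of encoded keys.
func boundsOf(keys [][]byte, lowerBound, upperBound []byte) (first, last int) {
	first = sort.Search(len(keys), func(i int) bool {
		return bytes.Compare(lowerBound, keys[i]) <= 0
	})
	// Search from the back: n counts trailing keys that are >= upperBound.
	n := sort.Search(len(keys), func(i int) bool {
		return bytes.Compare(upperBound, keys[len(keys)-1-i]) > 0
	})
	return first, len(keys) - 1 - n
}
```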