Skip to content

Commit

Permalink
br/lightning: add duplicate detect adaptor for external writer (#46387)
Browse files Browse the repository at this point in the history
ref #45719
  • Loading branch information
tangenta authored Aug 24, 2023
1 parent 097c13f commit 14745fd
Show file tree
Hide file tree
Showing 8 changed files with 111 additions and 22 deletions.
3 changes: 2 additions & 1 deletion br/pkg/lightning/backend/external/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,13 @@ go_test(
],
embed = [":external"],
flaky = True,
shard_count = 20,
shard_count = 21,
deps = [
"//br/pkg/lightning/backend/kv",
"//br/pkg/lightning/backend/local",
"//br/pkg/lightning/common",
"//br/pkg/storage",
"//kv",
"//util/codec",
"@com_github_cockroachdb_pebble//:pebble",
"@com_github_pingcap_errors//:errors",
Expand Down
1 change: 0 additions & 1 deletion br/pkg/lightning/backend/external/byte_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@ func (r *byteReader) reload() error {
case io.ErrUnexpectedEOF:
// The last batch.
r.buf = r.buf[:nBytes]
break
default:
logutil.Logger(r.ctx).Warn("other error during reload", zap.Error(err))
return err
Expand Down
8 changes: 2 additions & 6 deletions br/pkg/lightning/backend/external/byte_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,7 @@ func TestByteReader(t *testing.T) {
y, err = br.readNBytes(3)
require.NoError(t, err)
// Pollute mockExtStore to verify if the slice is not affected.
for i, b := range []byte{'x', 'y', 'z'} {
ms.src[i] = b
}
copy(ms.src, []byte("xyz"))
x = *y
require.Equal(t, 3, len(x))
require.Equal(t, byte('c'), x[2])
Expand All @@ -107,9 +105,7 @@ func TestByteReader(t *testing.T) {
y, err = br.readNBytes(2)
require.NoError(t, err)
// Pollute mockExtStore to verify if the slice is not affected.
for i, b := range []byte{'x', 'y', 'z'} {
ms.src[i] = b
}
copy(ms.src, []byte("xyz"))
x = *y
require.Equal(t, 2, len(x))
require.Equal(t, byte('b'), x[1])
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/external/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestIter(t *testing.T) {
SetMemorySizeLimit(uint64(rand.Intn(100)+1)).
SetPropSizeDistance(uint64(rand.Intn(50)+1)).
SetPropKeysDistance(uint64(rand.Intn(10)+1)).
Build(store, i, "/subtask")
Build(store, "/subtask", i)
kvStart := i * 100
kvEnd := (i + 1) * 100
err := w.AppendRows(ctx, nil, kv.MakeRowsFromKvPairs(kvPairs[kvStart:kvEnd]))
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/external/iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ func TestCorruptContent(t *testing.T) {
}
if i == 0 {
_, err = writer.Write(ctx, []byte("corrupt"))
require.NoError(t, err)
}
err = writer.Close(ctx)
require.NoError(t, err)
Expand Down
9 changes: 5 additions & 4 deletions br/pkg/lightning/backend/external/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func TestSeekPropsOffsets(t *testing.T) {
got, err = seekPropsOffsets(ctx, []byte("key3"), []string{file1, file2}, store)
require.NoError(t, err)
require.Equal(t, []uint64{30, 20}, got)
got, err = seekPropsOffsets(ctx, []byte("key0"), []string{file1, file2}, store)
_, err = seekPropsOffsets(ctx, []byte("key0"), []string{file1, file2}, store)
require.ErrorContains(t, err, "start key 6b657930 is too small for stat files [/test1 /test2]")
got, err = seekPropsOffsets(ctx, []byte("key1"), []string{file1, file2}, store)
require.NoError(t, err)
Expand All @@ -105,6 +105,7 @@ func TestSeekPropsOffsets(t *testing.T) {
w3, err := store.Create(ctx, file3, nil)
require.NoError(t, err)
err = w3.Close(ctx)
require.NoError(t, err)

file4 := "/test4"
w4, err := store.Create(ctx, file4, nil)
Expand All @@ -125,7 +126,7 @@ func TestGetAllFileNames(t *testing.T) {
SetMemorySizeLimit(20).
SetPropSizeDistance(5).
SetPropKeysDistance(3).
Build(store, 0, "/subtask")
Build(store, "/subtask", 0)
kvPairs := make([]common.KvPair, 0, 30)
for i := 0; i < 30; i++ {
kvPairs = append(kvPairs, common.KvPair{
Expand All @@ -142,7 +143,7 @@ func TestGetAllFileNames(t *testing.T) {
SetMemorySizeLimit(20).
SetPropSizeDistance(5).
SetPropKeysDistance(3).
Build(store, 3, "/subtask")
Build(store, "/subtask", 3)
err = w2.AppendRows(ctx, nil, kv.MakeRowsFromKvPairs(kvPairs))
require.NoError(t, err)
_, err = w2.Close(ctx)
Expand All @@ -152,7 +153,7 @@ func TestGetAllFileNames(t *testing.T) {
SetMemorySizeLimit(20).
SetPropSizeDistance(5).
SetPropKeysDistance(3).
Build(store, 12, "/subtask")
Build(store, "/subtask", 12)
err = w3.AppendRows(ctx, nil, kv.MakeRowsFromKvPairs(kvPairs))
require.NoError(t, err)
_, err = w3.Close(ctx)
Expand Down
32 changes: 25 additions & 7 deletions br/pkg/lightning/backend/external/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/lightning/backend/encode"
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/backend/local"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/membuf"
"github.com/pingcap/tidb/br/pkg/storage"
Expand Down Expand Up @@ -74,11 +75,12 @@ func DummyOnCloseFunc(*WriterSummary) {}

// WriterBuilder builds a new Writer.
type WriterBuilder struct {
memSizeLimit uint64
writeBatchCount uint64
propSizeDist uint64
propKeysDist uint64
onClose OnCloseFunc
memSizeLimit uint64
writeBatchCount uint64
propSizeDist uint64
propKeysDist uint64
onClose OnCloseFunc
dupeDetectEnabled bool

bufferPool *membuf.Pool
}
Expand Down Expand Up @@ -132,18 +134,28 @@ func (b *WriterBuilder) SetBufferPool(bufferPool *membuf.Pool) *WriterBuilder {
return b
}

// EnableDuplicationDetection switches on duplicate detection for the Writer
// produced by this builder. With the flag set, Build hands the Writer a
// local.DupDetectKeyAdapter in place of the default local.NoopKeyAdapter, and
// AppendRows encodes each key through that adapter together with the pair's
// RowID.
//
// The builder itself is returned so that the call can be chained with the
// other setters.
func (b *WriterBuilder) EnableDuplicationDetection() *WriterBuilder {
	b.dupeDetectEnabled = true
	return b
}

// Build builds a new Writer. The files that the writer creates are placed under
// the prefix "{prefix}/{writerID}".
func (b *WriterBuilder) Build(
store storage.ExternalStorage,
writerID int,
prefix string,
writerID int,
) *Writer {
bp := b.bufferPool
if bp == nil {
bp = membuf.NewPool()
}
filenamePrefix := filepath.Join(prefix, strconv.Itoa(writerID))
keyAdapter := local.KeyAdapter(local.NoopKeyAdapter{})
if b.dupeDetectEnabled {
keyAdapter = local.DupDetectKeyAdapter{}
}
return &Writer{
rc: &rangePropertiesCollector{
props: make([]*rangeProperty, 0, 1024),
Expand All @@ -157,6 +169,7 @@ func (b *WriterBuilder) Build(
writeBatch: make([]common.KvPair, 0, b.writeBatchCount),
currentSeq: 0,
filenamePrefix: filenamePrefix,
keyAdapter: keyAdapter,
writerID: writerID,
kvStore: nil,
onClose: b.onClose,
Expand All @@ -170,6 +183,7 @@ type Writer struct {
writerID int
currentSeq int
filenamePrefix string
keyAdapter local.KeyAdapter

kvStore *KeyValueStore
rc *rangePropertiesCollector
Expand All @@ -195,10 +209,14 @@ type Writer struct {
// Note that this method is NOT thread-safe.
func (w *Writer) AppendRows(ctx context.Context, _ []string, rows encode.Rows) error {
kvs := kv.Rows2KvPairs(rows)
keyAdapter := w.keyAdapter
for _, pair := range kvs {
w.batchSize += uint64(len(pair.Key) + len(pair.Val))
key := w.kvBuffer.AddBytes(pair.Key)

buf := w.kvBuffer.AllocBytes(keyAdapter.EncodedLen(pair.Key, pair.RowID))
key := keyAdapter.Encode(buf[:0], pair.Key, pair.RowID)
val := w.kvBuffer.AddBytes(pair.Val)

w.writeBatch = append(w.writeBatch, common.KvPair{Key: key, Val: val})
if w.batchSize >= w.memSizeLimit {
if err := w.flushKVs(ctx); err != nil {
Expand Down
77 changes: 75 additions & 2 deletions br/pkg/lightning/backend/external/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,18 @@ import (
"context"
"fmt"
"io"
"path"
"slices"
"strings"
"testing"
"time"

"github.com/cockroachdb/pebble"
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/backend/local"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/storage"
dbkv "github.com/pingcap/tidb/kv"
"github.com/stretchr/testify/require"
"golang.org/x/exp/rand"
)
Expand All @@ -41,7 +45,7 @@ func TestWriter(t *testing.T) {
writer := NewWriterBuilder().
SetPropSizeDistance(100).
SetPropKeysDistance(2).
Build(memStore, 0, "/test")
Build(memStore, "/test", 0)

kvCnt := rand.Intn(10) + 10
kvs := make([]common.KvPair, kvCnt)
Expand Down Expand Up @@ -103,7 +107,7 @@ func TestWriterFlushMultiFileNames(t *testing.T) {
writer := NewWriterBuilder().
SetPropKeysDistance(2).
SetMemorySizeLimit(60).
Build(memStore, 0, "/test")
Build(memStore, "/test", 0)

// 200 bytes key values.
kvCnt := 10
Expand Down Expand Up @@ -140,3 +144,72 @@ func TestWriterFlushMultiFileNames(t *testing.T) {
require.Equal(t, statFiles[i], fmt.Sprintf("/test/0_stat/%d", i))
}
}

// TestWriterDuplicateDetect writes a batch of KV pairs that contains one
// repeated key through a Writer built with duplicate detection enabled, reads
// the resulting data file back, and then feeds the stored keys and values to a
// MemoryIngestData iterator configured to report duplicates. The iterator is
// expected to end with a "found duplicate key" error.
func TestWriterDuplicateDetect(t *testing.T) {
	const kvCount = 20

	ctx := context.Background()
	memStore := storage.NewMemStorage()

	builder := NewWriterBuilder().
		SetPropKeysDistance(2).
		SetMemorySizeLimit(1000).
		EnableDuplicationDetection()
	writer := builder.Build(memStore, "/test", 0)

	// Pair i normally carries the single byte i as both key and value. The pair
	// at index kvCount/2 repeats the byte of the pair before it, so two pairs
	// share a key while still having distinct RowIDs.
	kvs := make([]common.KvPair, 0, kvCount)
	for rowID := 0; rowID < kvCount; rowID++ {
		b := byte(rowID)
		if rowID == kvCount/2 {
			b = byte(rowID - 1)
		}
		kvs = append(kvs, common.KvPair{
			Key:   []byte{b},
			Val:   []byte{b},
			RowID: dbkv.IntHandle(rowID).Encoded(),
		})
	}

	err := writer.AppendRows(ctx, nil, kv.MakeRowsFromKvPairs(kvs))
	require.NoError(t, err)
	_, err = writer.Close(ctx)
	require.NoError(t, err)

	// Read every pair back from the first data file of writer 0.
	kvReader, err := newKVReader(ctx, "/test/0/0", memStore, 0, 100)
	require.NoError(t, err)

	// clone returns an independent copy of src.
	// NOTE(review): presumably needed because the reader may reuse its internal
	// buffer between nextKV calls - confirm against newKVReader.
	clone := func(src []byte) []byte {
		dst := make([]byte, len(src))
		copy(dst, src)
		return dst
	}

	keys := make([][]byte, 0, kvCount)
	values := make([][]byte, 0, kvCount)
	for _, want := range kvs {
		gotKey, gotVal, err := kvReader.nextKV()
		require.NoError(t, err)
		// Only values are compared with the input; the stored keys went through
		// the key adapter and are collected as they are.
		require.Equal(t, want.Val, gotVal)
		keys = append(keys, clone(gotKey))
		values = append(values, clone(gotVal))
	}
	// The file must contain exactly kvCount pairs.
	_, _, err = kvReader.nextKV()
	require.Equal(t, io.EOF, err)

	dupDBPath := path.Join(t.TempDir(), "duplicate")
	db, err := pebble.Open(dupDBPath, nil)
	require.NoError(t, err)

	data := &MemoryIngestData{
		keyAdapter:         local.DupDetectKeyAdapter{},
		duplicateDetection: true,
		duplicateDB:        db,
		dupDetectOpt:       local.DupDetectOpt{ReportErrOnDup: true},
		keys:               keys,
		values:             values,
		ts:                 123,
	}
	iter := data.NewIter(ctx, nil, nil)

	// Drain the iterator; only its final error state matters here.
	iter.First()
	for iter.Valid() {
		iter.Next()
	}

	err = iter.Error()
	require.Error(t, err)
	require.Contains(t, err.Error(), "found duplicate key")
}

0 comments on commit 14745fd

Please sign in to comment.