global sort: fix inner retry doesn't reset collector (pingcap#52965)
lance6716 authored Apr 29, 2024
1 parent 245f7a9 commit c60f97d
Showing 4 changed files with 79 additions and 14 deletions.
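In short: during an external-sort flush, flushKVs only reset the range-property collector w.rc after flushSortedKVs had fully succeeded, so when flushSortedKVs was retried internally (for example after a failed close of an upload), the collector still held the failed attempt's properties and encoded them again. The fix moves the reset to the start of flushSortedKVs so every attempt begins with an empty collector, and relaxes MemStorage.Create to overwrite an existing file so a retried attempt can re-create its output.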
br/pkg/storage/memstore.go (0 additions, 3 deletions)
@@ -252,9 +252,6 @@ func (s *MemStorage) Create(ctx context.Context, name string, _ *WriterOption) (
 	}
 	s.rwm.Lock()
 	defer s.rwm.Unlock()
-	if _, ok := s.dataStore[name]; ok {
-		return nil, errors.Errorf("the file already exists: %s", name)
-	}
 	theFile := new(memFile)
 	s.dataStore[name] = theFile
 	return &memFileWriter{
br/pkg/storage/storage.go (3 additions, 1 deletion)
@@ -127,7 +127,9 @@ type ExternalStorage interface {
 	// URI returns the base path as a URI
 	URI() string

-	// Create opens a file writer by path. path is relative path to storage base path. Currently only s3 implemented WriterOption
+	// Create opens a file writer by path. path is relative path to storage base
+	// path. The old file under same path will be overwritten. Currently only s3
+	// implemented WriterOption.
 	Create(ctx context.Context, path string, option *WriterOption) (ExternalFileWriter, error)
 	// Rename file name from oldFileName to newFileName
 	Rename(ctx context.Context, oldFileName, newFileName string) error
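Together, the memstore.go and storage.go changes make Create idempotent across retries: re-creating a path replaces the old file instead of failing. A minimal sketch of the new semantics against MemStorage (error handling elided for brevity; this is an illustration, not code from the commit):

package main

import (
	"context"
	"fmt"

	"github.com/pingcap/tidb/br/pkg/storage"
)

func main() {
	ctx := context.Background()
	store := storage.NewMemStorage()

	// First attempt leaves a partial file behind.
	w1, _ := store.Create(ctx, "some/file", nil)
	_, _ = w1.Write(ctx, []byte("partial"))
	_ = w1.Close(ctx)

	// A retry re-creates the same path. Before this commit MemStorage
	// returned "the file already exists"; now it starts a fresh file.
	w2, _ := store.Create(ctx, "some/file", nil)
	_, _ = w2.Write(ctx, []byte("complete"))
	_ = w2.Close(ctx)

	data, _ := store.ReadFile(ctx, "some/file")
	fmt.Println(string(data)) // prints "complete", not "partialcomplete"
}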
pkg/lightning/backend/external/writer.go (8 additions, 8 deletions)
@@ -357,15 +357,15 @@ type Writer struct {
 }

 // WriteRow implements ingest.Writer.
-func (w *Writer) WriteRow(ctx context.Context, idxKey, idxVal []byte, handle tidbkv.Handle) error {
+func (w *Writer) WriteRow(ctx context.Context, key, val []byte, handle tidbkv.Handle) error {
 	keyAdapter := w.keyAdapter

 	var rowID []byte
 	if handle != nil {
 		rowID = handle.Encoded()
 	}
-	encodedKeyLen := keyAdapter.EncodedLen(idxKey, rowID)
-	length := encodedKeyLen + len(idxVal) + lengthBytes*2
+	encodedKeyLen := keyAdapter.EncodedLen(key, rowID)
+	length := encodedKeyLen + len(val) + lengthBytes*2
 	dataBuf, loc := w.kvBuffer.AllocBytesWithSliceLocation(length)
 	if dataBuf == nil {
 		if err := w.flushKVs(ctx, false); err != nil {
@@ -378,12 +378,12 @@ func (w *Writer) WriteRow(ctx context.Context, idxKey, idxVal []byte, handle tidbkv.Handle) error {
 		}
 	}
 	binary.BigEndian.AppendUint64(dataBuf[:0], uint64(encodedKeyLen))
-	binary.BigEndian.AppendUint64(dataBuf[:lengthBytes], uint64(len(idxVal)))
-	keyAdapter.Encode(dataBuf[2*lengthBytes:2*lengthBytes:2*lengthBytes+encodedKeyLen], idxKey, rowID)
-	copy(dataBuf[2*lengthBytes+encodedKeyLen:], idxVal)
+	binary.BigEndian.AppendUint64(dataBuf[:lengthBytes], uint64(len(val)))
+	keyAdapter.Encode(dataBuf[2*lengthBytes:2*lengthBytes:2*lengthBytes+encodedKeyLen], key, rowID)
+	copy(dataBuf[2*lengthBytes+encodedKeyLen:], val)

 	w.kvLocations = append(w.kvLocations, loc)
-	w.kvSize += int64(encodedKeyLen + len(idxVal))
+	w.kvSize += int64(encodedKeyLen + len(val))
 	w.batchSize += uint64(length)
 	w.totalCnt += 1
 	return nil
@@ -510,7 +510,6 @@ func (w *Writer) flushKVs(ctx context.Context, fromClose bool) (err error) {
 	w.kvLocations = w.kvLocations[:0]
 	w.kvSize = 0
 	w.kvBuffer.Reset()
-	w.rc.reset()
 	w.batchSize = 0
 	w.currentSeq++
 	return nil
@@ -536,6 +535,7 @@ func (w *Writer) flushSortedKVs(ctx context.Context) (string, string, error) {
 			_ = statWriter.Close(ctx)
 		}
 	}()
+	w.rc.reset()
 	kvStore, err := NewKeyValueStore(ctx, dataWriter, w.rc)
 	if err != nil {
 		return "", "", err
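The last two hunks are the core of the fix: resetting w.rc at the end of flushKVs happens only after a fully successful flush, so a flushSortedKVs attempt that fails midway leaves its properties in the collector for the retry to duplicate; resetting at the top of flushSortedKVs makes each attempt self-contained. A generic sketch of the pattern, with illustrative names that are not from this codebase:

package main

import (
	"errors"
	"fmt"
)

// collector plays the role of Writer.rc: per-attempt state that must not
// leak from a failed attempt into its retry.
type collector struct{ props []string }

func (c *collector) reset() { c.props = c.props[:0] }

// flushSorted plays the role of flushSortedKVs: it is the unit that gets
// retried, so it resets the collector at the start of every attempt.
// Resetting only after the whole flush succeeds (the bug) would leave
// "prop-1" and "prop-2" in the collector and write them twice on retry.
func flushSorted(c *collector, failFirst *bool) ([]string, error) {
	c.reset()
	c.props = append(c.props, "prop-1", "prop-2")
	if *failFirst {
		*failFirst = false
		return nil, errors.New("first close fail")
	}
	return c.props, nil
}

func main() {
	c := &collector{}
	failFirst := true
	var out []string
	var err error
	for attempt := 0; attempt < 2; attempt++ { // the inner retry loop
		if out, err = flushSorted(c, &failFirst); err == nil {
			break
		}
	}
	fmt.Println(out, err) // [prop-1 prop-2] <nil>: properties not duplicated
}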
pkg/lightning/backend/external/writer_test.go (68 additions, 2 deletions)
@@ -519,15 +519,81 @@ func TestWriterSort(t *testing.T) {
 		}
 		return false
 	})
-	println("thread quick sort", time.Since(ts).String())
+	t.Log("thread quick sort", time.Since(ts).String())

 	ts = time.Now()
 	slices.SortFunc(kvs2, func(i, j common.KvPair) int {
 		return bytes.Compare(i.Key, j.Key)
 	})
-	println("quick sort", time.Since(ts).String())
+	t.Log("quick sort", time.Since(ts).String())

 	for i := 0; i < 1000000; i++ {
 		require.True(t, bytes.Compare(kvs[i].Key, kvs2[i].Key) == 0)
 	}
 }
+
+type writerFirstCloseFailStorage struct {
+	storage.ExternalStorage
+	shouldFail bool
+}
+
+func (s *writerFirstCloseFailStorage) Create(
+	ctx context.Context,
+	path string,
+	option *storage.WriterOption,
+) (storage.ExternalFileWriter, error) {
+	w, err := s.ExternalStorage.Create(ctx, path, option)
+	if err != nil {
+		return nil, err
+	}
+	if strings.Contains(path, statSuffix) {
+		return &firstCloseFailWriter{ExternalFileWriter: w, shouldFail: &s.shouldFail}, nil
+	}
+	return w, nil
+}
+
+type firstCloseFailWriter struct {
+	storage.ExternalFileWriter
+	shouldFail *bool
+}
+
+func (w *firstCloseFailWriter) Close(ctx context.Context) error {
+	if *w.shouldFail {
+		*w.shouldFail = false
+		return fmt.Errorf("first close fail")
+	}
+	return w.ExternalFileWriter.Close(ctx)
+}
+
+func TestFlushKVsRetry(t *testing.T) {
+	ctx := context.Background()
+	store := &writerFirstCloseFailStorage{ExternalStorage: storage.NewMemStorage(), shouldFail: true}
+
+	writer := NewWriterBuilder().
+		SetPropKeysDistance(4).
+		SetMemorySizeLimit(100).
+		SetBlockSize(100). // 2 KV pair will trigger flush
+		Build(store, "/test", "0")
+	err := writer.WriteRow(ctx, []byte("key1"), []byte("val1"), nil)
+	require.NoError(t, err)
+	err = writer.WriteRow(ctx, []byte("key3"), []byte("val3"), nil)
+	require.NoError(t, err)
+	err = writer.WriteRow(ctx, []byte("key2"), []byte("val2"), nil)
+	require.NoError(t, err)
+	// manually test flushKVs
+	err = writer.flushKVs(ctx, false)
+	require.NoError(t, err)
+
+	require.False(t, store.shouldFail)
+
+	r, err := newStatsReader(ctx, store, "/test/0_stat/0", 100)
+	require.NoError(t, err)
+	p, err := r.nextProp()
+	lastKey := []byte{}
+	for err != io.EOF {
+		require.NoError(t, err)
+		require.True(t, bytes.Compare(lastKey, p.firstKey) < 0)
+		lastKey = append(lastKey[:0], p.firstKey...)
+		p, err = r.nextProp()
+	}
+}
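The new test injects exactly one Close failure into the stat-file writer: writerFirstCloseFailStorage wraps MemStorage, and firstCloseFailWriter fails the first Close on any path containing statSuffix, which exercises the inner retry inside the flush path. flushKVs still returns no error, require.False(t, store.shouldFail) confirms the injected failure was consumed by a retry, and the final loop over newStatsReader asserts that each property's firstKey is strictly greater than the previous one, which would fail if a stale collector had written duplicated properties.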
