Skip to content

Commit

Permalink
lightning/external: port merge iterators (#45913)
Browse files Browse the repository at this point in the history
ref #45719
  • Loading branch information
lance6716 authored Aug 15, 2023
1 parent 3846e80 commit f73b21b
Show file tree
Hide file tree
Showing 7 changed files with 763 additions and 3 deletions.
6 changes: 5 additions & 1 deletion br/pkg/lightning/backend/external/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
"byte_reader.go",
"codec.go",
"file.go",
"iter.go",
"kv_reader.go",
"sharedisk.go",
"stat_reader.go",
Expand All @@ -17,6 +18,7 @@ go_library(
"//util/logutil",
"//util/mathutil",
"@com_github_pingcap_errors//:errors",
"@org_golang_x_sync//errgroup",
"@org_uber_go_zap//:zap",
],
)
Expand All @@ -28,14 +30,16 @@ go_test(
"byte_reader_test.go",
"codec_test.go",
"file_test.go",
"iter_test.go",
],
embed = [":external"],
flaky = True,
shard_count = 7,
shard_count = 13,
deps = [
"//br/pkg/storage",
"@com_github_pingcap_errors//:errors",
"@com_github_stretchr_testify//require",
"@org_golang_x_exp//rand",
"@org_uber_go_atomic//:atomic",
],
)
11 changes: 10 additions & 1 deletion br/pkg/lightning/backend/external/byte_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ func openStoreReaderAndSeek(
return storageReader, nil
}

// newByteReader wraps readNBytes functionality to storageReader. It will not
// close storageReader when meet error.
func newByteReader(ctx context.Context, storageReader storage.ReadSeekCloser, bufSize int) (*byteReader, error) {
r := &byteReader{
ctx: ctx,
Expand Down Expand Up @@ -81,7 +83,14 @@ func (r *byteReader) readNBytes(n int) (*[]byte, error) {
for readLen < n {
r.cloneSlices()
err := r.reload()
if err != nil {
switch err {
case nil:
case io.EOF:
if readLen > 0 {
return nil, io.ErrUnexpectedEOF
}
return nil, err
default:
return nil, err
}
b = r.next(n - readLen)
Expand Down
14 changes: 14 additions & 0 deletions br/pkg/lightning/backend/external/byte_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,3 +221,17 @@ func TestReset(t *testing.T) {
_, err = br.readNBytes(1)
require.Equal(t, io.EOF, err)
}

func TestUnexpectedEOF(t *testing.T) {
ms := &mockExtStore{src: []byte("0123456789")}
br, err := newByteReader(context.Background(), ms, 3)
require.NoError(t, err)
_, err = br.readNBytes(100)
require.ErrorIs(t, err, io.ErrUnexpectedEOF)
}

func TestEmptyContent(t *testing.T) {
ms := &mockExtStore{src: []byte{}}
_, err := newByteReader(context.Background(), ms, 100)
require.Equal(t, io.EOF, err)
}
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/external/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func NewKeyValueStore(
// AddKeyValue saves a key-value pair to the KeyValueStore. If the accumulated
// size or key count exceeds the given distance, a new range property will be
// appended to the rangePropertiesCollector with current status.
// `key` must be in strictly ascending order for invocations of a KeyValueStore.
func (s *KeyValueStore) AddKeyValue(key, value []byte) error {
kvLen := len(key) + len(value) + 16
var b [8]byte
Expand Down
Loading

0 comments on commit f73b21b

Please sign in to comment.