Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br/lightning: add range properties codec and kv/stats reader #45796

Merged
merged 19 commits into from
Aug 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion br/pkg/lightning/backend/external/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,19 @@ go_library(
name = "external",
srcs = [
"byte_reader.go",
"codec.go",
"file.go",
"kv_reader.go",
"sharedisk.go",
"stat_reader.go",
],
importpath = "github.com/pingcap/tidb/br/pkg/lightning/backend/external",
visibility = ["//visibility:public"],
deps = [
"//br/pkg/storage",
"//util/logutil",
"//util/mathutil",
"@com_github_pingcap_errors//:errors",
"@org_uber_go_zap//:zap",
],
)
Expand All @@ -22,11 +26,12 @@ go_test(
timeout = "short",
srcs = [
"byte_reader_test.go",
"codec_test.go",
"file_test.go",
],
embed = [":external"],
flaky = True,
shard_count = 4,
shard_count = 5,
deps = [
"//br/pkg/storage",
"@com_github_pingcap_errors//:errors",
Expand Down
30 changes: 12 additions & 18 deletions br/pkg/lightning/backend/external/byte_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@
buf []byte
bufOffset int

isEOF bool

retPointers []*[]byte
}

Expand Down Expand Up @@ -110,10 +108,6 @@
r.retPointers = r.retPointers[:0]
}

func (r *byteReader) eof() bool {
return r.isEOF && len(r.buf) == r.bufOffset
}

func (r *byteReader) next(n int) []byte {
end := mathutil.Min(r.bufOffset+n, len(r.buf))
ret := r.buf[r.bufOffset:end]
Expand All @@ -123,20 +117,20 @@

func (r *byteReader) reload() error {
nBytes, err := io.ReadFull(r.storageReader, r.buf[0:])
if err == io.EOF {
r.isEOF = true
return err
} else if err == io.ErrUnexpectedEOF {
r.isEOF = true
} else if err != nil {
logutil.Logger(r.ctx).Warn("other error during reading from external storage", zap.Error(err))
return err
if err != nil {
switch err {
case io.EOF:
return err
case io.ErrUnexpectedEOF:
// The last batch.
r.buf = r.buf[:nBytes]
break
default:
logutil.Logger(r.ctx).Warn("other error during reload", zap.Error(err))
return err

Check warning on line 130 in br/pkg/lightning/backend/external/byte_reader.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/lightning/backend/external/byte_reader.go#L128-L130

Added lines #L128 - L130 were not covered by tests
}
}
r.bufOffset = 0
if nBytes < len(r.buf) {
// The last batch.
r.buf = r.buf[:nBytes]
}
return nil
}

Expand Down
6 changes: 0 additions & 6 deletions br/pkg/lightning/backend/external/byte_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,6 @@ func TestByteReader(t *testing.T) {
require.Equal(t, 2, len(x))
require.Equal(t, byte('b'), x[0])
require.Equal(t, byte('c'), x[1])
require.NoError(t, br.reload())
require.False(t, br.eof())
require.Error(t, br.reload())
require.False(t, br.eof()) // Data in buffer is not consumed.
br.next(2)
require.True(t, br.eof())
require.NoError(t, br.Close())

// Test basic readNBytes() usage.
Expand Down
77 changes: 77 additions & 0 deletions br/pkg/lightning/backend/external/codec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package external

import (
"encoding/binary"
)

type rangeProperty struct {
key []byte
offset uint64
size uint64
keys uint64
}

// decodeMultiProps is only used for test.
func decodeMultiProps(data []byte) []*rangeProperty {
var ret []*rangeProperty
for len(data) > 0 {
propLen := int(binary.BigEndian.Uint32(data))
propBytes := data[4 : 4+propLen]
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
rp := decodeProp(propBytes)
ret = append(ret, rp)
data = data[4+propLen:]
}
return ret
}

func decodeProp(data []byte) *rangeProperty {
rp := &rangeProperty{}
keyLen := binary.BigEndian.Uint32(data[0:4])
rp.key = data[4 : 4+keyLen]
rp.size = binary.BigEndian.Uint64(data[4+keyLen : 12+keyLen])
rp.keys = binary.BigEndian.Uint64(data[12+keyLen : 20+keyLen])
rp.offset = binary.BigEndian.Uint64(data[20+keyLen : 28+keyLen])
return rp
}

// keyLen + p.size + p.keys + p.offset
const propertyLengthExceptKey = 4 + 8 + 8 + 8

func encodeMultiProps(buf []byte, props []*rangeProperty) []byte {
var propLen [4]byte
for _, p := range props {
binary.BigEndian.PutUint32(propLen[:],
uint32(propertyLengthExceptKey+len(p.key)))
buf = append(buf, propLen[:4]...)
buf = encodeProp(buf, p)
}
return buf
}

func encodeProp(buf []byte, r *rangeProperty) []byte {
var b [8]byte
binary.BigEndian.PutUint32(b[:], uint32(len(r.key)))
buf = append(buf, b[:4]...)
buf = append(buf, r.key...)
binary.BigEndian.PutUint64(b[:], r.size)
buf = append(buf, b[:]...)
binary.BigEndian.PutUint64(b[:], r.keys)
buf = append(buf, b[:]...)
binary.BigEndian.PutUint64(b[:], r.offset)
buf = append(buf, b[:]...)
return buf
}
45 changes: 45 additions & 0 deletions br/pkg/lightning/backend/external/codec_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package external

import (
"fmt"
"testing"

"github.com/stretchr/testify/require"
)

func TestRangePropertyCodec(t *testing.T) {
prop := &rangeProperty{
key: []byte("key"),
offset: 1,
size: 2,
keys: 3,
}
buf := encodeProp(nil, prop)
prop2 := decodeProp(buf)
require.EqualValues(t, prop, prop2)

p1, p2, p3 := &rangeProperty{}, &rangeProperty{}, &rangeProperty{}
for i, p := range []*rangeProperty{p1, p2, p3} {
p.key = []byte(fmt.Sprintf("key%d", i))
p.offset = uint64(10 * i)
p.size = uint64(20 * i)
p.keys = uint64(30 * i)
}
buf = encodeMultiProps(nil, []*rangeProperty{p1, p2, p3})
props := decodeMultiProps(buf)
require.EqualValues(t, []*rangeProperty{p1, p2, p3}, props)
}
2 changes: 0 additions & 2 deletions br/pkg/lightning/backend/external/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,6 @@ func (s *KeyValueStore) AddKeyValue(key, value []byte) error {

if len(s.rc.currProp.key) == 0 {
s.rc.currProp.key = key
s.rc.currProp.writerID = s.writerID
s.rc.currProp.dataSeq = s.seq
}

s.offset += uint64(kvLen)
Expand Down
30 changes: 12 additions & 18 deletions br/pkg/lightning/backend/external/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,10 @@ func TestAddKeyValueMaintainRangeProperty(t *testing.T) {
require.NoError(t, err)
require.Len(t, rc.props, 1)
expected := &rangeProperty{
key: k1,
offset: 0,
writerID: 1,
dataSeq: 1,
size: uint64(len(k1) + len(v1) + len(k2) + len(v2)),
keys: 2,
key: k1,
offset: 0,
size: uint64(len(k1) + len(v1) + len(k2) + len(v2)),
keys: 2,
}
require.Equal(t, expected, rc.props[0])
encoded = rc.encode()
Expand Down Expand Up @@ -85,25 +83,21 @@ func TestAddKeyValueMaintainRangeProperty(t *testing.T) {
require.NoError(t, err)
require.Len(t, rc.props, 1)
expected = &rangeProperty{
key: k1,
offset: 0,
writerID: 2,
dataSeq: 2,
size: uint64(len(k1) + len(v1)),
keys: 1,
key: k1,
offset: 0,
size: uint64(len(k1) + len(v1)),
keys: 1,
}
require.Equal(t, expected, rc.props[0])

err = kvStore.AddKeyValue(k2, v2)
require.NoError(t, err)
require.Len(t, rc.props, 2)
expected = &rangeProperty{
key: k2,
offset: uint64(len(k1) + len(v1) + 16),
writerID: 2,
dataSeq: 2,
size: uint64(len(k2) + len(v2)),
keys: 1,
key: k2,
offset: uint64(len(k1) + len(v1) + 16),
size: uint64(len(k2) + len(v2)),
keys: 1,
}
require.Equal(t, expected, rc.props[1])
err = writer.Close(ctx)
Expand Down
77 changes: 77 additions & 0 deletions br/pkg/lightning/backend/external/kv_reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package external

import (
"context"
"encoding/binary"
"io"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
)

type kvReader struct {
byteReader *byteReader
}

func newKVReader(ctx context.Context, name string, store storage.ExternalStorage, initFileOffset uint64, bufSize int) (*kvReader, error) {
sr, err := openStoreReaderAndSeek(ctx, store, name, initFileOffset)
if err != nil {
return nil, err
}
br, err := newByteReader(ctx, sr, bufSize)
return &kvReader{
byteReader: br,
}, nil

Check warning on line 40 in br/pkg/lightning/backend/external/kv_reader.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/lightning/backend/external/kv_reader.go#L32-L40

Added lines #L32 - L40 were not covered by tests
}

func (r *kvReader) nextKV() (key, val []byte, err error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

kvReader and statReader's only have different nextxxx function, can we merge the code into one? just pass a function into the next function.

var k, v
next(func() {
 k, v = 
})

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Merging them into one is a bad idea because one may mistakenly call nextProps() after nextKV(), which is nonsense.

r.byteReader.reset()
lenBytes, err := r.byteReader.readNBytes(8)
if err != nil {
return nil, nil, err
}
keyLen := int(binary.BigEndian.Uint64(*lenBytes))
keyPtr, err := r.byteReader.readNBytes(keyLen)
if err != nil {
return nil, nil, noEOF(err)
}
lenBytes, err = r.byteReader.readNBytes(8)
if err != nil {
return nil, nil, noEOF(err)
}
valLen := int(binary.BigEndian.Uint64(*lenBytes))
valPtr, err := r.byteReader.readNBytes(valLen)
if err != nil {
return nil, nil, noEOF(err)
}
return *keyPtr, *valPtr, nil

Check warning on line 63 in br/pkg/lightning/backend/external/kv_reader.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/lightning/backend/external/kv_reader.go#L43-L63

Added lines #L43 - L63 were not covered by tests
}

// noEOF converts the EOF error to io.ErrUnexpectedEOF.
func noEOF(err error) error {
if err == io.EOF {
logutil.BgLogger().Warn("unexpected EOF", zap.Error(errors.Trace(err)))
return io.ErrUnexpectedEOF
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error is not traced

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we can have more clues when this is reported(even if it is traced).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At least we can know the stack.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update.

}
return err

Check warning on line 72 in br/pkg/lightning/backend/external/kv_reader.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/lightning/backend/external/kv_reader.go#L67-L72

Added lines #L67 - L72 were not covered by tests
}

func (r *kvReader) Close() error {
return r.byteReader.Close()

Check warning on line 76 in br/pkg/lightning/backend/external/kv_reader.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/lightning/backend/external/kv_reader.go#L75-L76

Added lines #L75 - L76 were not covered by tests
}
Loading
Loading