Skip to content

Commit

Permalink
br/lightning: add byte reader implementation (#45724)
Browse files Browse the repository at this point in the history
ref #45719
  • Loading branch information
tangenta authored Aug 4, 2023
1 parent 57d778f commit a82c8ff
Show file tree
Hide file tree
Showing 3 changed files with 358 additions and 0 deletions.
27 changes: 27 additions & 0 deletions br/pkg/lightning/backend/external/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "external",
srcs = ["byte_reader.go"],
importpath = "github.com/pingcap/tidb/br/pkg/lightning/backend/external",
visibility = ["//visibility:public"],
deps = [
"//br/pkg/storage",
"//util/logutil",
"//util/mathutil",
"@org_uber_go_zap//:zap",
],
)

go_test(
name = "external_test",
timeout = "short",
srcs = ["byte_reader_test.go"],
embed = [":external"],
flaky = True,
shard_count = 3,
deps = [
"@com_github_pingcap_errors//:errors",
"@com_github_stretchr_testify//require",
],
)
145 changes: 145 additions & 0 deletions br/pkg/lightning/backend/external/byte_reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package external

import (
"context"
"io"

"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mathutil"
"go.uber.org/zap"
)

// byteReader provides structured reading on a byte stream of external storage.
type byteReader struct {
ctx context.Context
storageReader storage.ReadSeekCloser

buf []byte
bufOffset int

isEOF bool

retPointers []*[]byte
}

func openStoreReaderAndSeek(
ctx context.Context,
store storage.ExternalStorage,
name string,
initFileOffset uint64,
) (storage.ReadSeekCloser, error) {
storageReader, err := store.Open(ctx, name)
if err != nil {
return nil, err
}
_, err = storageReader.Seek(int64(initFileOffset), io.SeekStart)
if err != nil {
return nil, err
}
return storageReader, nil
}

func newByteReader(ctx context.Context, storageReader storage.ReadSeekCloser, bufSize int) (*byteReader, error) {
r := &byteReader{
ctx: ctx,
storageReader: storageReader,
buf: make([]byte, bufSize),
bufOffset: 0,
}
return r, r.reload()
}

// readNBytes reads the next n bytes from the reader and returns a buffer slice containing those bytes.
// The returned slice (pointer) can not be used after r.reset. In the same interval of r.reset,
// byteReader guarantees that the returned slice (pointer) will point to the same content
// though the slice may be changed.
func (r *byteReader) readNBytes(n int) (*[]byte, error) {
b := r.next(n)
readLen := len(b)
if readLen == n {
ret := &b
r.retPointers = append(r.retPointers, ret)
return ret, nil
}
// If the reader has fewer than n bytes remaining in current buffer,
// `auxBuf` is used as a container instead.
auxBuf := make([]byte, n)
copy(auxBuf, b)
for readLen < n {
r.cloneSlices()
err := r.reload()
if err != nil {
return nil, err
}
b = r.next(n - readLen)
copy(auxBuf[readLen:], b)
readLen += len(b)
}
return &auxBuf, nil
}

func (r *byteReader) reset() {
for i := range r.retPointers {
r.retPointers[i] = nil
}
r.retPointers = r.retPointers[:0]
}

func (r *byteReader) cloneSlices() {
for i := range r.retPointers {
copied := make([]byte, len(*r.retPointers[i]))
copy(copied, *r.retPointers[i])
*r.retPointers[i] = copied
r.retPointers[i] = nil
}
r.retPointers = r.retPointers[:0]
}

func (r *byteReader) eof() bool {
return r.isEOF && len(r.buf) == r.bufOffset
}

func (r *byteReader) next(n int) []byte {
end := mathutil.Min(r.bufOffset+n, len(r.buf))
ret := r.buf[r.bufOffset:end]
r.bufOffset += len(ret)
return ret
}

func (r *byteReader) reload() error {
nBytes, err := io.ReadFull(r.storageReader, r.buf[0:])
if err == io.EOF {
r.isEOF = true
return err
} else if err == io.ErrUnexpectedEOF {
r.isEOF = true
} else if err != nil {
logutil.Logger(r.ctx).Warn("other error during reading from external storage", zap.Error(err))
return err
}
r.bufOffset = 0
if nBytes < len(r.buf) {
// The last batch.
r.buf = r.buf[:nBytes]
}
return nil
}

func (r *byteReader) Close() error {
return r.storageReader.Close()
}
186 changes: 186 additions & 0 deletions br/pkg/lightning/backend/external/byte_reader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package external

import (
"context"
"io"
"testing"

"github.com/pingcap/errors"
"github.com/stretchr/testify/require"
)

// mockExtStore is only used for test.
type mockExtStore struct {
src []byte
idx uint64
}

func (s *mockExtStore) Read(p []byte) (n int, err error) {
// Read from src to p.
if s.idx >= uint64(len(s.src)) {
return 0, io.EOF
}
n = copy(p, s.src[s.idx:])
s.idx += uint64(n)
return n, nil
}

func (s *mockExtStore) Seek(_ int64, _ int) (int64, error) {
return 0, errors.Errorf("unsupported operation")
}

func (s *mockExtStore) Close() error {
return nil
}

func TestByteReader(t *testing.T) {
// Test basic next() usage.
br, err := newByteReader(context.Background(), &mockExtStore{src: []byte("abcde")}, 3)
require.NoError(t, err)
x := br.next(1)
require.Equal(t, 1, len(x))
require.Equal(t, byte('a'), x[0])
x = br.next(2)
require.Equal(t, 2, len(x))
require.Equal(t, byte('b'), x[0])
require.Equal(t, byte('c'), x[1])
require.NoError(t, br.reload())
require.False(t, br.eof())
require.Error(t, br.reload())
require.False(t, br.eof()) // Data in buffer is not consumed.
br.next(2)
require.True(t, br.eof())
require.NoError(t, br.Close())

// Test basic readNBytes() usage.
br, err = newByteReader(context.Background(), &mockExtStore{src: []byte("abcde")}, 3)
require.NoError(t, err)
y, err := br.readNBytes(2)
require.NoError(t, err)
x = *y
require.Equal(t, 2, len(x))
require.Equal(t, byte('a'), x[0])
require.Equal(t, byte('b'), x[1])
require.NoError(t, br.Close())

br, err = newByteReader(context.Background(), &mockExtStore{src: []byte("abcde")}, 3)
require.NoError(t, err)
y, err = br.readNBytes(5) // Read all the data.
require.NoError(t, err)
x = *y
require.Equal(t, 5, len(x))
require.Equal(t, byte('e'), x[4])
require.NoError(t, br.Close())

br, err = newByteReader(context.Background(), &mockExtStore{src: []byte("abcde")}, 3)
require.NoError(t, err)
_, err = br.readNBytes(7) // EOF
require.Error(t, err)

ms := &mockExtStore{src: []byte("abcdef")}
br, err = newByteReader(context.Background(), ms, 2)
require.NoError(t, err)
y, err = br.readNBytes(3)
require.NoError(t, err)
// Pollute mockExtStore to verify if the slice is not affected.
for i, b := range []byte{'x', 'y', 'z'} {
ms.src[i] = b
}
x = *y
require.Equal(t, 3, len(x))
require.Equal(t, byte('c'), x[2])
require.NoError(t, br.Close())

ms = &mockExtStore{src: []byte("abcdef")}
br, err = newByteReader(context.Background(), ms, 2)
require.NoError(t, err)
y, err = br.readNBytes(2)
require.NoError(t, err)
// Pollute mockExtStore to verify if the slice is not affected.
for i, b := range []byte{'x', 'y', 'z'} {
ms.src[i] = b
}
x = *y
require.Equal(t, 2, len(x))
require.Equal(t, byte('b'), x[1])
br.reset()
require.NoError(t, br.Close())
}

func TestByteReaderClone(t *testing.T) {
ms := &mockExtStore{src: []byte("0123456789")}
br, err := newByteReader(context.Background(), ms, 4)
require.NoError(t, err)
y1, err := br.readNBytes(2)
require.NoError(t, err)
y2, err := br.readNBytes(1)
require.NoError(t, err)
x1, x2 := *y1, *y2
require.Len(t, x1, 2)
require.Len(t, x2, 1)
require.Equal(t, byte('0'), x1[0])
require.Equal(t, byte('2'), x2[0])
require.NoError(t, br.reload()) // Perform a reload to overwrite buffer.
x1, x2 = *y1, *y2
require.Len(t, x1, 2)
require.Len(t, x2, 1)
require.Equal(t, byte('4'), x1[0]) // Verify if the buffer is overwritten.
require.Equal(t, byte('6'), x2[0])
require.NoError(t, br.Close())

ms = &mockExtStore{src: []byte("0123456789")}
br, err = newByteReader(context.Background(), ms, 4)
require.NoError(t, err)
y1, err = br.readNBytes(2)
require.NoError(t, err)
y2, err = br.readNBytes(1)
require.NoError(t, err)
x1, x2 = *y1, *y2
require.Len(t, x1, 2)
require.Len(t, x2, 1)
require.Equal(t, byte('0'), x1[0])
require.Equal(t, byte('2'), x2[0])
br.cloneSlices()
require.NoError(t, br.reload()) // Perform a reload to overwrite buffer.
x1, x2 = *y1, *y2
require.Len(t, x1, 2)
require.Len(t, x2, 1)
require.Equal(t, byte('0'), x1[0]) // Verify if the buffer is NOT overwritten.
require.Equal(t, byte('2'), x2[0])
require.NoError(t, br.Close())
}

func TestByteReaderAuxBuf(t *testing.T) {
ms := &mockExtStore{src: []byte("0123456789")}
br, err := newByteReader(context.Background(), ms, 1)
require.NoError(t, err)
y1, err := br.readNBytes(1)
require.NoError(t, err)
y2, err := br.readNBytes(2)
require.NoError(t, err)
require.Equal(t, []byte("0"), *y1)
require.Equal(t, []byte("12"), *y2)

y3, err := br.readNBytes(1)
require.NoError(t, err)
y4, err := br.readNBytes(2)
require.NoError(t, err)
require.Equal(t, []byte("3"), *y3)
require.Equal(t, []byte("45"), *y4)
require.Equal(t, []byte("0"), *y1)
require.Equal(t, []byte("12"), *y2)
}

0 comments on commit a82c8ff

Please sign in to comment.