Skip to content

Commit

Permalink
external: make sure read after switchConcurrentMode will see EOF (#48232
Browse files Browse the repository at this point in the history
)

close #47587, close #48223
  • Loading branch information
lance6716 authored Nov 2, 2023
1 parent 9defab4 commit c416ca1
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 2 deletions.
4 changes: 3 additions & 1 deletion br/pkg/lightning/backend/external/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,15 @@ go_test(
],
embed = [":external"],
flaky = True,
shard_count = 42,
shard_count = 43,
deps = [
"//br/pkg/lightning/backend/kv",
"//br/pkg/lightning/common",
"//br/pkg/membuf",
"//br/pkg/storage",
"//pkg/kv",
"//pkg/util/codec",
"//pkg/util/logutil",
"//pkg/util/size",
"@com_github_aws_aws_sdk_go//aws",
"@com_github_aws_aws_sdk_go//aws/credentials",
Expand Down
2 changes: 2 additions & 0 deletions br/pkg/lightning/backend/external/byte_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,8 @@ func (r *byteReader) reload() error {
if err != nil {
switch err {
case io.EOF:
// move curBufOffset so following read will also find EOF
r.curBufOffset = len(r.curBuf)
return err
case io.ErrUnexpectedEOF:
// The last batch.
Expand Down
39 changes: 39 additions & 0 deletions br/pkg/lightning/backend/external/iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ import (
"time"

"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/membuf"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"golang.org/x/exp/rand"
Expand Down Expand Up @@ -326,6 +328,43 @@ func testMergeIterSwitchMode(t *testing.T, f func([]byte, int) []byte) {
require.NoError(t, err)
}

type eofReader struct {
storage.ExternalFileReader
}

func (r eofReader) Seek(_ int64, _ int) (int64, error) {
return 0, nil
}

func (r eofReader) Read(_ []byte) (int, error) {
return 0, io.EOF
}

func TestReadAfterCloseConnReader(t *testing.T) {
ctx := context.Background()

reader := &byteReader{
ctx: ctx,
storageReader: eofReader{},
smallBuf: []byte{0, 255, 255, 255, 255, 255, 255, 255},
curBufOffset: 8,
logger: logutil.Logger(ctx),
}
reader.curBuf = reader.smallBuf
pool := membuf.NewPool()
reader.concurrentReader.largeBufferPool = pool.NewBuffer()
reader.concurrentReader.store = storage.NewMemStorage()

// set current reader to concurrent reader, and then close it
reader.concurrentReader.now = true
err := reader.switchConcurrentMode(false)
require.NoError(t, err)

wrapKVReader := &kvReader{reader}
_, _, err = wrapKVReader.nextKV()
require.ErrorIs(t, err, io.EOF)
}

func TestHotspot(t *testing.T) {
ctx := context.Background()
store := storage.NewMemStorage()
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/backfilling_merge_sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@ func (m *mergeSortExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta
return err
}

m.mu.Lock()
m.subtaskSortedKVMeta = &external.SortedKVMeta{}
onClose := func(summary *external.WriterSummary) {
m.mu.Lock()
m.subtaskSortedKVMeta.MergeSummary(summary)
m.mu.Unlock()
}
Expand Down

0 comments on commit c416ca1

Please sign in to comment.