From c416ca1ead5941ebd380b11b66e94dfdc7163875 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Thu, 2 Nov 2023 19:00:39 +0800 Subject: [PATCH] external: make sure read after switchConcurrentMode will see EOF (#48232) close pingcap/tidb#47587, close pingcap/tidb#48223 --- br/pkg/lightning/backend/external/BUILD.bazel | 4 +- .../lightning/backend/external/byte_reader.go | 2 + .../lightning/backend/external/iter_test.go | 39 +++++++++++++++++++ pkg/ddl/backfilling_merge_sort.go | 2 +- 4 files changed, 45 insertions(+), 2 deletions(-) diff --git a/br/pkg/lightning/backend/external/BUILD.bazel b/br/pkg/lightning/backend/external/BUILD.bazel index d8af2e5b4ab02..2b017198725a6 100644 --- a/br/pkg/lightning/backend/external/BUILD.bazel +++ b/br/pkg/lightning/backend/external/BUILD.bazel @@ -64,13 +64,15 @@ go_test( ], embed = [":external"], flaky = True, - shard_count = 42, + shard_count = 43, deps = [ "//br/pkg/lightning/backend/kv", "//br/pkg/lightning/common", + "//br/pkg/membuf", "//br/pkg/storage", "//pkg/kv", "//pkg/util/codec", + "//pkg/util/logutil", "//pkg/util/size", "@com_github_aws_aws_sdk_go//aws", "@com_github_aws_aws_sdk_go//aws/credentials", diff --git a/br/pkg/lightning/backend/external/byte_reader.go b/br/pkg/lightning/backend/external/byte_reader.go index 86ffe9536f542..bed2661f50764 100644 --- a/br/pkg/lightning/backend/external/byte_reader.go +++ b/br/pkg/lightning/backend/external/byte_reader.go @@ -278,6 +278,8 @@ func (r *byteReader) reload() error { if err != nil { switch err { case io.EOF: + // move curBufOffset so following read will also find EOF + r.curBufOffset = len(r.curBuf) return err case io.ErrUnexpectedEOF: // The last batch. diff --git a/br/pkg/lightning/backend/external/iter_test.go b/br/pkg/lightning/backend/external/iter_test.go index 57b69f0f764ca..e9b27f3793b6e 100644 --- a/br/pkg/lightning/backend/external/iter_test.go +++ b/br/pkg/lightning/backend/external/iter_test.go @@ -24,7 +24,9 @@ import ( "time" "github.com/pingcap/tidb/br/pkg/lightning/common" + "github.com/pingcap/tidb/br/pkg/membuf" "github.com/pingcap/tidb/br/pkg/storage" + "github.com/pingcap/tidb/pkg/util/logutil" "github.com/stretchr/testify/require" "go.uber.org/atomic" "golang.org/x/exp/rand" @@ -326,6 +328,43 @@ func testMergeIterSwitchMode(t *testing.T, f func([]byte, int) []byte) { require.NoError(t, err) } +type eofReader struct { + storage.ExternalFileReader +} + +func (r eofReader) Seek(_ int64, _ int) (int64, error) { + return 0, nil +} + +func (r eofReader) Read(_ []byte) (int, error) { + return 0, io.EOF +} + +func TestReadAfterCloseConnReader(t *testing.T) { + ctx := context.Background() + + reader := &byteReader{ + ctx: ctx, + storageReader: eofReader{}, + smallBuf: []byte{0, 255, 255, 255, 255, 255, 255, 255}, + curBufOffset: 8, + logger: logutil.Logger(ctx), + } + reader.curBuf = reader.smallBuf + pool := membuf.NewPool() + reader.concurrentReader.largeBufferPool = pool.NewBuffer() + reader.concurrentReader.store = storage.NewMemStorage() + + // set current reader to concurrent reader, and then close it + reader.concurrentReader.now = true + err := reader.switchConcurrentMode(false) + require.NoError(t, err) + + wrapKVReader := &kvReader{reader} + _, _, err = wrapKVReader.nextKV() + require.ErrorIs(t, err, io.EOF) +} + func TestHotspot(t *testing.T) { ctx := context.Background() store := storage.NewMemStorage() diff --git a/pkg/ddl/backfilling_merge_sort.go b/pkg/ddl/backfilling_merge_sort.go index b94efc4c283a7..32d27d7146a06 100644 --- a/pkg/ddl/backfilling_merge_sort.go +++ b/pkg/ddl/backfilling_merge_sort.go @@ -78,9 +78,9 @@ func (m *mergeSortExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta return err } - m.mu.Lock() m.subtaskSortedKVMeta = &external.SortedKVMeta{} onClose := func(summary *external.WriterSummary) { + m.mu.Lock() m.subtaskSortedKVMeta.MergeSummary(summary) m.mu.Unlock() }