Skip to content

Commit

Permalink
merge-sort: add context to errgroup, alloc mem by concurrency and ref…
Browse files Browse the repository at this point in the history
…ine logs (#48341) (#48392)

close #48367
  • Loading branch information
ti-chi-bot authored Nov 8, 2023
1 parent 157a7cc commit 3ebecae
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 16 deletions.
6 changes: 4 additions & 2 deletions br/pkg/lightning/backend/external/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@ go_library(
"//pkg/util/hack",
"//pkg/util/logutil",
"//pkg/util/mathutil",
"//pkg/util/memory",
"//pkg/util/size",
"@com_github_cockroachdb_pebble//:pebble",
"@com_github_docker_go_units//:go-units",
"@com_github_google_uuid//:uuid",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@org_golang_x_sync//errgroup",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
Expand All @@ -57,13 +57,14 @@ go_test(
"file_test.go",
"iter_test.go",
"kv_buf_test.go",
"merge_test.go",
"split_test.go",
"util_test.go",
"writer_test.go",
],
embed = [":external"],
flaky = True,
shard_count = 43,
shard_count = 44,
deps = [
"//br/pkg/lightning/backend/kv",
"//br/pkg/lightning/common",
Expand All @@ -82,6 +83,7 @@ go_test(
"@com_github_johannesboyne_gofakes3//:gofakes3",
"@com_github_johannesboyne_gofakes3//backend/s3mem",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_stretchr_testify//require",
"@org_golang_x_exp//rand",
Expand Down
41 changes: 30 additions & 11 deletions br/pkg/lightning/backend/external/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import (
"context"

"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/memory"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
Expand All @@ -29,24 +31,23 @@ func MergeOverlappingFiles(ctx context.Context, paths []string, store storage.Ex
dataFilesSlice = append(dataFilesSlice, paths[i:end])
}

memTotal, err := memory.MemTotal()
if err != nil {
return err
}
memSize := (memTotal / 2) / uint64(len(dataFilesSlice))

var eg errgroup.Group
logutil.Logger(ctx).Info("start to merge overlapping files",
zap.Int("file-count", len(paths)),
zap.Int("file-groups", len(dataFilesSlice)),
zap.Int("concurrency", concurrency))
eg, egCtx := errgroup.WithContext(ctx)
eg.SetLimit(concurrency)
for _, files := range dataFilesSlice {
files := files
eg.Go(func() error {
return mergeOverlappingFilesImpl(
ctx,
egCtx,
files,
store,
readBufferSize,
newFilePrefix,
uuid.New().String(),
memSize,
DefaultMemSizeLimit,
blockSize,
writeBatchCount,
propSizeDist,
Expand All @@ -72,7 +73,25 @@ func mergeOverlappingFilesImpl(ctx context.Context,
propKeysDist uint64,
onClose OnCloseFunc,
checkHotspot bool,
) error {
) (err error) {
task := log.BeginTask(logutil.Logger(ctx).With(
zap.String("writer-id", writerID),
zap.Int("file-count", len(paths)),
), "merge overlapping files")
defer func() {
task.End(zap.ErrorLevel, err)
}()
failpoint.Inject("mergeOverlappingFilesImpl", func(val failpoint.Value) {
if val.(string) == paths[0] {
failpoint.Return(errors.New("injected error"))
} else {
select {
case <-ctx.Done():
failpoint.Return(ctx.Err())
}
}
})

zeroOffsets := make([]uint64, len(paths))
iter, err := NewMergeKVIter(ctx, paths, zeroOffsets, store, readBufferSize, checkHotspot)
if err != nil {
Expand Down
44 changes: 44 additions & 0 deletions br/pkg/lightning/backend/external/merge_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package external

import (
"context"
"testing"

"github.com/pingcap/failpoint"
"github.com/stretchr/testify/require"
)

func TestMergeOverlappingFiles(t *testing.T) {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/external/mergeOverlappingFilesImpl", `return("a")`))
t.Cleanup(func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/external/mergeOverlappingFilesImpl"))
})
require.ErrorContains(t, MergeOverlappingFiles(
context.Background(),
[]string{"a", "b", "c", "d", "e"},
nil,
1,
"",
1,
1,
1,
1,
nil,
5,
false,
), "injected error")
}
4 changes: 3 additions & 1 deletion br/pkg/lightning/backend/external/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ func seekPropsOffsets(
) (_ []uint64, err error) {
logger := logutil.Logger(ctx)
task := log.BeginTask(logger, "seek props offsets")
defer task.End(zapcore.ErrorLevel, err)
defer func() {
task.End(zapcore.ErrorLevel, err)
}()
iter, err := NewMergePropIter(ctx, paths, exStorage, checkHotSpot)
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/disttask/framework/scheduler/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,8 @@ func (m *Manager) onCanceledTasks(_ context.Context, tasks []*proto.Task) {
m.mu.RLock()
defer m.mu.RUnlock()
for _, task := range tasks {
logutil.Logger(m.logCtx).Info("onCanceledTasks", zap.Int64("task-id", task.ID))
if cancel, ok := m.mu.handlingTasks[task.ID]; ok && cancel != nil {
logutil.Logger(m.logCtx).Info("onCanceledTasks", zap.Int64("task-id", task.ID))
// subtask needs to change its state to canceled.
cancel(ErrCancelSubtask)
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/disttask/framework/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,9 @@ func (s *BaseScheduler) run(ctx context.Context, task *proto.Task) (resErr error
zap.String("server-mem-limit", memory.ServerMemoryLimitOriginText.Load()),
), "schedule step")
// log as info level, subtask might be cancelled, let caller check it.
defer stepLogger.End(zap.InfoLevel, resErr)
defer func() {
stepLogger.End(zap.InfoLevel, resErr)
}()

summary, cleanup, err := runSummaryCollectLoop(ctx, task, s.taskTable)
if err != nil {
Expand Down

0 comments on commit 3ebecae

Please sign in to comment.