From db3ba30b84f90c0455fb26e089941092487595ee Mon Sep 17 00:00:00 2001 From: hillium Date: Tue, 29 Aug 2023 14:17:45 +0800 Subject: [PATCH 1/3] added test cases && better exit logic for warmup Signed-off-by: hillium --- cmd/ebs-warmup/filereader/config.go | 8 + cmd/ebs-warmup/filereader/exec.go | 102 ++++++---- .../internal/tests/integration_test.go | 181 ++++++++++++++++++ cmd/ebs-warmup/main.go | 16 +- cmd/ebs-warmup/worker/worker.go | 4 +- 5 files changed, 274 insertions(+), 37 deletions(-) create mode 100644 cmd/ebs-warmup/internal/tests/integration_test.go diff --git a/cmd/ebs-warmup/filereader/config.go b/cmd/ebs-warmup/filereader/config.go index a8ee8a1ed6..9e25ddef0e 100644 --- a/cmd/ebs-warmup/filereader/config.go +++ b/cmd/ebs-warmup/filereader/config.go @@ -13,6 +13,11 @@ package filereader +import ( + "github.com/pingcap/tidb-operator/cmd/ebs-warmup/worker" + "github.com/pingcap/tidb-operator/cmd/ebs-warmup/worker/tasks" +) + type Config struct { Files string Type string @@ -21,4 +26,7 @@ type Config struct { Direct bool CheckpointEvery uint64 CheckpointFile string + + OnStep worker.OnStepHook + OnFireRequest func(*tasks.ReadFile) } diff --git a/cmd/ebs-warmup/filereader/exec.go b/cmd/ebs-warmup/filereader/exec.go index fc0f7e603a..575d60d6f9 100644 --- a/cmd/ebs-warmup/filereader/exec.go +++ b/cmd/ebs-warmup/filereader/exec.go @@ -38,6 +38,8 @@ const ( defaultSegmentCount = 16 ) +type sendFileHook func(tasks.ReadFile) (brk bool) + type StatedFile struct { Info os.FileInfo Path string @@ -68,36 +70,31 @@ func StatFilesByGlob(glob string) ([]StatedFile, error) { return stats, nil } -func WarmUpFooters(glob string, sendToWorker func(tasks.ReadFile)) error { +func WarmUpFooters(glob string, sendToWorker sendFileHook) error { files, err := StatFilesByGlob(glob) if err != nil { return errors.Annotatef(err, "failed to stat files with glob %s", glob) } for _, file := range files { - sendToWorker(tasks.ReadFile{ + shouldBrk := sendToWorker(tasks.ReadFile{ Type: tasks.ReadLastNBytes(16 * 1024), File: file.Info, FilePath: file.Path, }) + if shouldBrk { + return nil + } } return nil } -func WarmUpWholeFile(glob string, sendToWorker func(tasks.ReadFile)) error { - return warmUpWholeFileBy(glob, func(sf StatedFile) { - sendFileWithSegmenting(sf, defaultSegmentCount, sendToWorker) - }) -} - -func WarmUpWholeFileAfter(glob string, after time.Time, sendToWorker func(tasks.ReadFile)) error { - return warmUpWholeFileBy(glob, func(sf StatedFile) { - if sf.Info.ModTime().After(after) { - sendFileWithSegmenting(sf, defaultSegmentCount, sendToWorker) - } +func WarmUpWholeFile(glob string, sendToWorker sendFileHook) error { + return warmUpWholeFileBy(glob, func(sf StatedFile) bool { + return sendFileWithSegmenting(sf, defaultSegmentCount, sendToWorker) }) } -func warmUpWholeFileBy(glob string, onFile func(StatedFile)) error { +func warmUpWholeFileBy(glob string, onFile func(StatedFile) (brk bool)) error { files, err := StatFilesByGlob(glob) if err != nil { return errors.Annotatef(err, "failed to stat files with glob %s", glob) @@ -107,12 +104,14 @@ func warmUpWholeFileBy(glob string, onFile func(StatedFile)) error { return files[i].Info.ModTime().After(files[j].Info.ModTime()) }) for _, file := range files { - onFile(file) + if onFile(file) { + return context.Canceled + } } return nil } -func sendFileWithSegmenting(file StatedFile, partitions int, sendToWorker func(tasks.ReadFile)) { +func sendFileWithSegmenting(file StatedFile, partitions int, sendToWorker sendFileHook) (brk bool) { partitionSize := 
file.Info.Size() / int64(partitions) if partitionSize < minimalSegmentSize { partitionSize = minimalSegmentSize @@ -123,7 +122,7 @@ func sendFileWithSegmenting(file StatedFile, partitions int, sendToWorker func(t if offset+partitionSize > file.Info.Size() { length = file.Info.Size() - offset } - sendToWorker( + shouldBrk := sendToWorker( tasks.ReadFile{ Type: tasks.ReadOffsetAndLength{ Offset: offset, @@ -133,13 +132,18 @@ func sendFileWithSegmenting(file StatedFile, partitions int, sendToWorker func(t FilePath: file.Path, }, ) + if shouldBrk { + return true + } offset += partitionSize } + return false } type WorkersOpt struct { ObserveTotalSize *uint64 RateLimitInMiB float64 + OnStep worker.OnStepHook } func CreateWorkers(ctx context.Context, n int, opt WorkersOpt) ([]chan<- tasks.ReadFile, *errgroup.Group) { @@ -160,6 +164,9 @@ func CreateWorkers(ctx context.Context, n int, opt WorkersOpt) ([]chan<- tasks.R wr := worker.New(ch) wr.RateLimiter = limiter wr.OnStep = func(file os.FileInfo, readBytes int, take time.Duration) { + if opt.OnStep != nil { + opt.OnStep(file, readBytes, take) + } if opt.ObserveTotalSize != nil { new := atomic.AddUint64(opt.ObserveTotalSize, uint64(readBytes)) if atomic.AddUint64(&loopCounter, 1)%2048 == 0 { @@ -200,7 +207,7 @@ func RoundRobin[T any](ts []T) func() T { return choose } -func TrySync(workers []chan<- tasks.ReadFile) { +func TrySync(ctx context.Context, workers []chan<- tasks.ReadFile) error { chs := make([]chan struct{}, 0) for _, w := range workers { ch := make(chan struct{}) @@ -208,8 +215,13 @@ func TrySync(workers []chan<- tasks.ReadFile) { chs = append(chs, ch) } for _, ch := range chs { - <-ch + select { + case <-ch: + case <-ctx.Done(): + return ctx.Err() + } } + return nil } type ExecContext struct { @@ -222,6 +234,8 @@ type ExecContext struct { chooser func() chan<- tasks.ReadFile lastSent uint64 start time.Time + + cancel context.CancelFunc } func (execCtx *ExecContext) perhapsCheckpoint() (uint64, error) { @@ -241,7 +255,8 @@ func (execCtx *ExecContext) checkpoint() uint64 { ckp, err := execCtx.perhapsCheckpoint() if err != nil { klog.InfoS("Failed to read checkpoint file. Will use time.Now() as checkpoint.", "err", err) - return uint64(time.Now().UnixMilli()) + // add a 1s offset for newly created files. 
(usually in test) + return uint64(time.Now().UnixMilli() + 1000) } return ckp } @@ -250,34 +265,38 @@ func (execCtx *ExecContext) saveCheckpoint(ckp uint64) error { return os.WriteFile(execCtx.config.CheckpointFile, []byte(fmt.Sprintf("%d", ckp)), 0o644) } -func New(masterCtx context.Context, config Config) *ExecContext { +func New(config Config) *ExecContext { execCtx := &ExecContext{ config: config, } - execCtx.wkrs, execCtx.eg = CreateWorkers(masterCtx, execCtx.config.NWorkers, WorkersOpt{ + wCtx, cancel := context.WithCancel(context.Background()) + execCtx.wkrs, execCtx.eg = CreateWorkers(wCtx, execCtx.config.NWorkers, WorkersOpt{ ObserveTotalSize: &execCtx.total, RateLimitInMiB: execCtx.config.RateLimit, + OnStep: config.OnStep, }) execCtx.start = time.Now() execCtx.chooser = RoundRobin(execCtx.wkrs) execCtx.cnt = uint64(0) execCtx.lastSent = execCtx.checkpoint() + execCtx.cancel = cancel return execCtx } -func (execCtx *ExecContext) Run() { - total := uint64(0) +func (execCtx *ExecContext) RunAndClose(ctx context.Context) { + defer execCtx.cancel() + total := uint64(0) klog.InfoS("Using checkpoint.", "checkpoint", execCtx.lastSent, "time", time.UnixMilli(int64(execCtx.lastSent)).String()) switch execCtx.config.Type { case "footer": - WarmUpFooters(execCtx.config.Files, func(rf tasks.ReadFile) { - execCtx.sendToWorker(rf) + WarmUpFooters(execCtx.config.Files, func(rf tasks.ReadFile) bool { + return execCtx.sendToWorker(ctx, rf) }) case "whole": - WarmUpWholeFile(execCtx.config.Files, func(rf tasks.ReadFile) { - execCtx.sendToWorker(rf) + WarmUpWholeFile(execCtx.config.Files, func(rf tasks.ReadFile) bool { + return execCtx.sendToWorker(ctx, rf) }) } @@ -291,21 +310,35 @@ func (execCtx *ExecContext) Run() { klog.InfoS("Done.", "take", take, "total", total, "rate", fmt.Sprintf("%s/s", units.HumanSize(rate))) } -func (execCtx *ExecContext) sendToWorker(rf tasks.ReadFile) { - createTs := rf.File.ModTime().UnixMilli() - if createTs > int64(execCtx.checkpoint()) { - return - } +func (execCtx *ExecContext) checkpointTick(ctx context.Context) { execCtx.cnt += 1 if execCtx.cnt%execCtx.config.CheckpointEvery == 0 { ckp := execCtx.lastSent now := time.Now() - TrySync(execCtx.wkrs) + if err := TrySync(ctx, execCtx.wkrs); err != nil { + klog.ErrorS(err, "skip saving checkpoint: the worker is closing", "err", err) + return + } err := execCtx.saveCheckpoint(ckp) if err != nil { klog.ErrorS(err, "Failed to save checkpoint.", "checkpoint", ckp, "take", time.Since(now)) } } +} + +func (execCtx *ExecContext) sendToWorker(ctx context.Context, rf tasks.ReadFile) bool { + createTs := rf.File.ModTime().UnixMilli() + if execCtx.config.OnFireRequest != nil { + execCtx.config.OnFireRequest(&rf) + } + if ctx.Err() != nil { + klog.InfoS("early exit due to context canceled.", "err", ctx.Err()) + return true + } + if createTs > int64(execCtx.lastSent) { + return false + } + execCtx.checkpointTick(ctx) rf.Direct = execCtx.config.Direct execCtx.chooser() <- rf if execCtx.lastSent < uint64(createTs) { @@ -313,4 +346,5 @@ func (execCtx *ExecContext) sendToWorker(rf tasks.ReadFile) { "createTs=", uint64(createTs), "lastSent=", execCtx.lastSent) } execCtx.lastSent = uint64(createTs) + return false } diff --git a/cmd/ebs-warmup/internal/tests/integration_test.go b/cmd/ebs-warmup/internal/tests/integration_test.go new file mode 100644 index 0000000000..a07eb1ae5c --- /dev/null +++ b/cmd/ebs-warmup/internal/tests/integration_test.go @@ -0,0 +1,181 @@ +package tests_test + +import ( + "context" + "fmt" + "math" + "os" + 
"path/filepath" + "strings" + "sync" + "testing" + "time" + + "github.com/pingcap/tidb-operator/cmd/ebs-warmup/filereader" + "github.com/pingcap/tidb-operator/cmd/ebs-warmup/worker/tasks" + "github.com/stretchr/testify/require" +) + +type ToBeWarmedUp struct { + basicPath string + files []os.FileInfo + + allocID int + + tCtx *testing.T +} + +func (t *ToBeWarmedUp) newFileName() string { + t.allocID += 1 + return filepath.Join(t.basicPath, fmt.Sprintf("%06d.bin", t.allocID)) +} + +func (t *ToBeWarmedUp) createFile(size int, sync bool) { + fName := t.newFileName() + fd, err := os.Create(fName) + require.NoError(t.tCtx, err) + defer fd.Close() + total := 0 + for total < size { + n, err := fd.Write(make([]byte, 4096)) + require.NoError(t.tCtx, err) + total += n + } + stat, err := os.Stat(fName) + require.NoError(t.tCtx, err) + if sync { + require.NoError(t.tCtx, fd.Sync()) + } + t.files = append(t.files, stat) +} + +func createTestDataSet(t *testing.T) *ToBeWarmedUp { + dir := t.TempDir() + return &ToBeWarmedUp{ + basicPath: dir, + + tCtx: t, + } +} + +func (t *ToBeWarmedUp) saveCheckpoint(ckp int64) { + fd, err := os.Create(t.defaultConfig().CheckpointFile) + require.NoError(t.tCtx, err) + _, err = fd.WriteString(fmt.Sprintf("%d", ckp)) + require.NoError(t.tCtx, err) +} + +func (t *ToBeWarmedUp) defaultConfig() filereader.Config { + config := filereader.Config{ + Files: t.basicPath, + Type: "whole", + RateLimit: math.Inf(1), + NWorkers: 2, + Direct: false, + CheckpointEvery: 10, + CheckpointFile: filepath.Join(t.basicPath, "warmup-checkpoint.txt"), + } + return config +} + +type Collector struct { + mu sync.Mutex + records map[string]int +} + +func NewCollector() *Collector { + return &Collector{ + records: make(map[string]int), + } +} + +func (c *Collector) OnStep(file os.FileInfo, size int, dur time.Duration) { + c.mu.Lock() + c.records[file.Name()] += size + c.mu.Unlock() +} + +func (c *Collector) CheckWith(t *testing.T, fileInfos []os.FileInfo) { + c.mu.Lock() + defer c.mu.Unlock() + for _, f := range fileInfos { + require.GreaterOrEqual(t, int(f.Size()), c.records[f.Name()], "file %s size %d but should be %d", f.Name(), c.records[f.Name()], f.Size()) + } +} + +func TestBasic(t *testing.T) { + tw := createTestDataSet(t) + + for i := 0; i < 100; i++ { + tw.createFile(4096, false) + } + for i := 0; i < 5; i++ { + tw.createFile(4096*20, false) + } + + cfg := tw.defaultConfig() + coll := NewCollector() + cfg.OnStep = coll.OnStep + runner := filereader.New(cfg) + runner.RunAndClose(context.Background()) + + coll.CheckWith(t, tw.files) +} + +func TestCheckpoint(t *testing.T) { + tw := createTestDataSet(t) + + for i := 0; i < 100; i++ { + tw.createFile(4096, false) + if i == 42 { + time.Sleep(100 * time.Millisecond) + } + } + for i := 0; i < 5; i++ { + tw.createFile(4096*20, false) + } + + ckp := tw.files[42].ModTime().UnixMilli() + tw.saveCheckpoint(ckp) + + cfg := tw.defaultConfig() + coll := NewCollector() + cfg.OnStep = coll.OnStep + runner := filereader.New(cfg) + runner.RunAndClose(context.Background()) + + require.Len(t, coll.records, 43) + coll.CheckWith(t, tw.files[:43]) +} + +func TestSigAndCheckpoint(t *testing.T) { + tw := createTestDataSet(t) + + for i := 0; i < 300; i++ { + tw.createFile(4096+95, i < 200) + } + for i := 0; i < 10; i++ { + tw.createFile(4096*20-42, false) + } + + ctx, cancel := context.WithCancel(context.Background()) + cfg := tw.defaultConfig() + coll := NewCollector() + cfg.OnStep = coll.OnStep + cfg.OnFireRequest = func(rf *tasks.ReadFile) { + if 
strings.HasSuffix(rf.File.Name(), "099.bin") { + cancel() + } + } + runner := filereader.New(cfg) + runner.RunAndClose(ctx) + // Some of file might not be saved. + coll.CheckWith(t, tw.files[101:]) + + ctx = context.Background() + coll2 := NewCollector() + cfg.OnStep = coll2.OnStep + runner2 := filereader.New(cfg) + runner2.RunAndClose(ctx) + coll2.CheckWith(t, tw.files[:101]) +} diff --git a/cmd/ebs-warmup/main.go b/cmd/ebs-warmup/main.go index d96bfe54ec..8a63b04d43 100644 --- a/cmd/ebs-warmup/main.go +++ b/cmd/ebs-warmup/main.go @@ -16,9 +16,12 @@ package main import ( "context" "math" + "os" + "os/signal" "github.com/pingcap/tidb-operator/cmd/ebs-warmup/filereader" "github.com/spf13/pflag" + "k8s.io/klog/v2" ) var ( @@ -44,6 +47,15 @@ func main() { CheckpointFile: *checkpointFile, } - rd := filereader.New(context.Background(), config) - rd.Run() + rd := filereader.New(config) + ctx, cancel := context.WithCancel(context.Background()) + ch := make(chan os.Signal, 1) + signal.Notify(ch, os.Interrupt) + go func() { + <-ch + klog.Warning("Received interrupt, stopping...") + signal.Stop(ch) + cancel() + }() + rd.RunAndClose(ctx) } diff --git a/cmd/ebs-warmup/worker/worker.go b/cmd/ebs-warmup/worker/worker.go index e16a2e0ce7..dd15efafda 100644 --- a/cmd/ebs-warmup/worker/worker.go +++ b/cmd/ebs-warmup/worker/worker.go @@ -28,13 +28,15 @@ import ( "k8s.io/klog/v2" ) +type OnStepHook func(file os.FileInfo, readBytes int, take time.Duration) + // Worker is the handler for reading tasks. // It shall be !Send. type Worker struct { mailbox <-chan tasks.ReadFile buf []byte - OnStep func(file os.FileInfo, readBytes int, take time.Duration) + OnStep OnStepHook RateLimiter *rate.Limiter } From 48b76f246d44ea777b8dba1a1f5fa8b586a3ecd7 Mon Sep 17 00:00:00 2001 From: hillium Date: Tue, 29 Aug 2023 14:23:17 +0800 Subject: [PATCH 2/3] added to makefile Signed-off-by: hillium --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 1790f2342e..b777342e34 100644 --- a/Makefile +++ b/Makefile @@ -208,7 +208,7 @@ fault-trigger: # `-race` for race detector. # GO_COVER: Whether to run tests with code coverage. Set to 'y' to enable coverage collection. # -test: TEST_PACKAGES = ./cmd/backup-manager/app ./pkg +test: TEST_PACKAGES = ./cmd/backup-manager/app ./pkg ./cmd/ebs-warmup/internal/tests test: ## Run unit tests @echo "Run unit tests" ifeq ($(GO_COVER),y) From f3fffe069364fd7bab050113dca3cdcd3b6677e1 Mon Sep 17 00:00:00 2001 From: hillium Date: Tue, 29 Aug 2023 16:04:54 +0800 Subject: [PATCH 3/3] added header Signed-off-by: hillium --- cmd/ebs-warmup/internal/tests/integration_test.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/cmd/ebs-warmup/internal/tests/integration_test.go b/cmd/ebs-warmup/internal/tests/integration_test.go index a07eb1ae5c..38a7dffe2c 100644 --- a/cmd/ebs-warmup/internal/tests/integration_test.go +++ b/cmd/ebs-warmup/internal/tests/integration_test.go @@ -1,3 +1,16 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + package tests_test import (
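
Taken together, the three patches move the warmup entry point from New(ctx, config) / Run() to New(config) / RunAndClose(ctx): cancellation now flows back through the sendFileHook return value, so an interrupt stops dispatching new reads, drains the workers, and still leaves a usable checkpoint behind. Below is a minimal caller sketch, assuming the API lands as in patch 1; the glob, worker count, and checkpoint path are illustrative values only, not taken from the patches.

// Sketch only: how a caller is expected to drive the reworked warmup API.
// The glob, worker count, and checkpoint path are assumptions for illustration.
package main

import (
	"context"
	"math"
	"os"
	"os/signal"
	"time"

	"github.com/pingcap/tidb-operator/cmd/ebs-warmup/filereader"
	"k8s.io/klog/v2"
)

func main() {
	cfg := filereader.Config{
		Files:           "/var/lib/tikv/db/*.sst", // illustrative glob
		Type:            "whole",
		RateLimit:       math.Inf(1),
		NWorkers:        8,
		CheckpointEvery: 100,
		CheckpointFile:  "warmup-checkpoint.txt",
		// OnStep is optional; it observes every completed read (also used by the tests).
		OnStep: func(file os.FileInfo, readBytes int, take time.Duration) {
			klog.V(4).InfoS("warmed", "file", file.Name(), "bytes", readBytes, "take", take)
		},
	}

	// Cancel the context on SIGINT; RunAndClose stops sending new tasks and shuts the workers down.
	ctx, cancel := context.WithCancel(context.Background())
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, os.Interrupt)
	go func() {
		<-ch
		cancel()
	}()

	filereader.New(cfg).RunAndClose(ctx)
}

Taking the context in RunAndClose rather than in New ties the worker lifetime to a single run, which is what lets TestSigAndCheckpoint cancel mid-run and then resume from the saved checkpoint with a fresh ExecContext.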