From 1d8aa7ebc7d402179be1d6c9a37668f7c9fd3522 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Wed, 17 Jan 2024 07:34:35 +0800 Subject: [PATCH] ebs br: always save checkpoint to the warmup folder (#5507) (#5522) --- cmd/ebs-warmup/filereader/exec.go | 24 ++++++++++++++----- .../internal/tests/integration_test.go | 3 +++ cmd/ebs-warmup/main.go | 7 +++++- images/ebs-warmup/warmup-steps.sh | 8 ++++++- 4 files changed, 34 insertions(+), 8 deletions(-) diff --git a/cmd/ebs-warmup/filereader/exec.go b/cmd/ebs-warmup/filereader/exec.go index 575d60d6f9..52726a6b65 100644 --- a/cmd/ebs-warmup/filereader/exec.go +++ b/cmd/ebs-warmup/filereader/exec.go @@ -46,11 +46,13 @@ type StatedFile struct { } func StatFilesByGlob(glob string) ([]StatedFile, error) { + klog.V(1).InfoS("Entering dir.", "recPath", glob) files, err := filepath.Glob(glob) if err != nil { return nil, errors.Annotatef(err, "failed to glob files with glob %s", glob) } stats := make([]StatedFile, 0, len(files)) + klog.V(3).InfoS("Working over files.", "files", files) for _, file := range files { s, err := os.Stat(file) if err != nil { @@ -58,6 +60,7 @@ func StatFilesByGlob(glob string) ([]StatedFile, error) { } if s.IsDir() { recPath := filepath.Join(file, "*") + klog.V(2).InfoS("Recursively entering dir.", "recPath", recPath) recContent, err := StatFilesByGlob(recPath) if err != nil { return nil, errors.Annotatef(err, "failed to stat files in dir %s (globing %s)", file, recPath) @@ -283,31 +286,40 @@ func New(config Config) *ExecContext { return execCtx } -func (execCtx *ExecContext) RunAndClose(ctx context.Context) { +func (execCtx *ExecContext) RunAndClose(ctx context.Context) error { defer execCtx.cancel() - total := uint64(0) klog.InfoS("Using checkpoint.", "checkpoint", execCtx.lastSent, "time", time.UnixMilli(int64(execCtx.lastSent)).String()) - + var err error switch execCtx.config.Type { case "footer": - WarmUpFooters(execCtx.config.Files, func(rf tasks.ReadFile) bool { + err = WarmUpFooters(execCtx.config.Files, func(rf tasks.ReadFile) bool { return execCtx.sendToWorker(ctx, rf) }) case "whole": - WarmUpWholeFile(execCtx.config.Files, func(rf tasks.ReadFile) bool { + err = WarmUpWholeFile(execCtx.config.Files, func(rf tasks.ReadFile) bool { return execCtx.sendToWorker(ctx, rf) }) } + if err != nil { + klog.ErrorS(err, "Failed to initialize the task.") + return err + } for _, wkr := range execCtx.wkrs { close(wkr) } - execCtx.eg.Wait() + if err := execCtx.eg.Wait(); err != nil { + return err + } take := time.Since(execCtx.start) + total := atomic.LoadUint64(&execCtx.total) rate := float64(total) / take.Seconds() klog.InfoS("Done.", "take", take, "total", total, "rate", fmt.Sprintf("%s/s", units.HumanSize(rate))) + err = os.Remove(execCtx.config.CheckpointFile) + klog.InfoS("Try remove the checkpoint file.", "err", err, "file", execCtx.config.CheckpointFile) + return nil } func (execCtx *ExecContext) checkpointTick(ctx context.Context) { diff --git a/cmd/ebs-warmup/internal/tests/integration_test.go b/cmd/ebs-warmup/internal/tests/integration_test.go index 38a7dffe2c..7bc7bcda45 100644 --- a/cmd/ebs-warmup/internal/tests/integration_test.go +++ b/cmd/ebs-warmup/internal/tests/integration_test.go @@ -159,6 +159,7 @@ func TestCheckpoint(t *testing.T) { require.Len(t, coll.records, 43) coll.CheckWith(t, tw.files[:43]) + require.NoFileExists(t, cfg.CheckpointFile) } func TestSigAndCheckpoint(t *testing.T) { @@ -184,6 +185,7 @@ func TestSigAndCheckpoint(t *testing.T) { runner.RunAndClose(ctx) // Some of file might not be saved. coll.CheckWith(t, tw.files[101:]) + require.FileExists(t, cfg.CheckpointFile) ctx = context.Background() coll2 := NewCollector() @@ -191,4 +193,5 @@ func TestSigAndCheckpoint(t *testing.T) { runner2 := filereader.New(cfg) runner2.RunAndClose(ctx) coll2.CheckWith(t, tw.files[:101]) + require.NoFileExists(t, cfg.CheckpointFile) } diff --git a/cmd/ebs-warmup/main.go b/cmd/ebs-warmup/main.go index b8291cefe4..e60441bebf 100644 --- a/cmd/ebs-warmup/main.go +++ b/cmd/ebs-warmup/main.go @@ -15,6 +15,7 @@ package main import ( "context" + "flag" "math" "os" "os/signal" @@ -38,6 +39,8 @@ var ( ) func main() { + klog.InitFlags(nil) + pflag.CommandLine.AddGoFlagSet(flag.CommandLine) pflag.Parse() config := filereader.Config{ @@ -60,5 +63,7 @@ func main() { signal.Stop(ch) cancel() }() - rd.RunAndClose(ctx) + if err := rd.RunAndClose(ctx); err != nil { + klog.ErrorS(err, "Failed to warmup. The checkpoint maybe stored.") + } } diff --git a/images/ebs-warmup/warmup-steps.sh b/images/ebs-warmup/warmup-steps.sh index 560638603d..da9747b3c2 100755 --- a/images/ebs-warmup/warmup-steps.sh +++ b/images/ebs-warmup/warmup-steps.sh @@ -34,6 +34,12 @@ Supported flags: EOF } +warmup_by_file() { + checkpoint=.com.pingcap.tidb.operator.ebs.warmup.checkpoint + /warmup --type=whole --files="$1" -P256 --direct --checkpoint.at="$1/$checkpoint" +} + + # The trap command is to make sure the sidecars are terminated when the jobs are finished cleanup() { if [ ! -d "/tmp/pod" ]; then @@ -74,7 +80,7 @@ while [ $# -gt 0 ]; do --thread=1 --filename=/dev/"$device" & fi ;; - fs) /warmup --type=whole --files="$1" -P256 --direct & + fs) warmup_by_file "$1" & ;; *) die "internal error: unsupported operation $1; forgot to call --block or --fs?" ;;