Skip to content

Commit

Permalink
ebs br: always save checkpoint to the warmup folder (#5507) (#5522)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Jan 16, 2024
1 parent d67387a commit 1d8aa7e
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 8 deletions.
24 changes: 18 additions & 6 deletions cmd/ebs-warmup/filereader/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,21 @@ type StatedFile struct {
}

func StatFilesByGlob(glob string) ([]StatedFile, error) {
klog.V(1).InfoS("Entering dir.", "recPath", glob)
files, err := filepath.Glob(glob)
if err != nil {
return nil, errors.Annotatef(err, "failed to glob files with glob %s", glob)
}
stats := make([]StatedFile, 0, len(files))
klog.V(3).InfoS("Working over files.", "files", files)
for _, file := range files {
s, err := os.Stat(file)
if err != nil {
return nil, errors.Annotatef(err, "failed to stat file %s", file)
}
if s.IsDir() {
recPath := filepath.Join(file, "*")
klog.V(2).InfoS("Recursively entering dir.", "recPath", recPath)
recContent, err := StatFilesByGlob(recPath)
if err != nil {
return nil, errors.Annotatef(err, "failed to stat files in dir %s (globing %s)", file, recPath)
Expand Down Expand Up @@ -283,31 +286,40 @@ func New(config Config) *ExecContext {
return execCtx
}

func (execCtx *ExecContext) RunAndClose(ctx context.Context) {
func (execCtx *ExecContext) RunAndClose(ctx context.Context) error {
defer execCtx.cancel()

total := uint64(0)
klog.InfoS("Using checkpoint.", "checkpoint", execCtx.lastSent, "time", time.UnixMilli(int64(execCtx.lastSent)).String())

var err error
switch execCtx.config.Type {
case "footer":
WarmUpFooters(execCtx.config.Files, func(rf tasks.ReadFile) bool {
err = WarmUpFooters(execCtx.config.Files, func(rf tasks.ReadFile) bool {
return execCtx.sendToWorker(ctx, rf)
})
case "whole":
WarmUpWholeFile(execCtx.config.Files, func(rf tasks.ReadFile) bool {
err = WarmUpWholeFile(execCtx.config.Files, func(rf tasks.ReadFile) bool {
return execCtx.sendToWorker(ctx, rf)
})
}
if err != nil {
klog.ErrorS(err, "Failed to initialize the task.")
return err
}

for _, wkr := range execCtx.wkrs {
close(wkr)
}
execCtx.eg.Wait()
if err := execCtx.eg.Wait(); err != nil {
return err
}

take := time.Since(execCtx.start)
total := atomic.LoadUint64(&execCtx.total)
rate := float64(total) / take.Seconds()
klog.InfoS("Done.", "take", take, "total", total, "rate", fmt.Sprintf("%s/s", units.HumanSize(rate)))
err = os.Remove(execCtx.config.CheckpointFile)
klog.InfoS("Try remove the checkpoint file.", "err", err, "file", execCtx.config.CheckpointFile)
return nil
}

func (execCtx *ExecContext) checkpointTick(ctx context.Context) {
Expand Down
3 changes: 3 additions & 0 deletions cmd/ebs-warmup/internal/tests/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ func TestCheckpoint(t *testing.T) {

require.Len(t, coll.records, 43)
coll.CheckWith(t, tw.files[:43])
require.NoFileExists(t, cfg.CheckpointFile)
}

func TestSigAndCheckpoint(t *testing.T) {
Expand All @@ -184,11 +185,13 @@ func TestSigAndCheckpoint(t *testing.T) {
runner.RunAndClose(ctx)
// Some of file might not be saved.
coll.CheckWith(t, tw.files[101:])
require.FileExists(t, cfg.CheckpointFile)

ctx = context.Background()
coll2 := NewCollector()
cfg.OnStep = coll2.OnStep
runner2 := filereader.New(cfg)
runner2.RunAndClose(ctx)
coll2.CheckWith(t, tw.files[:101])
require.NoFileExists(t, cfg.CheckpointFile)
}
7 changes: 6 additions & 1 deletion cmd/ebs-warmup/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package main

import (
"context"
"flag"
"math"
"os"
"os/signal"
Expand All @@ -38,6 +39,8 @@ var (
)

func main() {
klog.InitFlags(nil)
pflag.CommandLine.AddGoFlagSet(flag.CommandLine)
pflag.Parse()

config := filereader.Config{
Expand All @@ -60,5 +63,7 @@ func main() {
signal.Stop(ch)
cancel()
}()
rd.RunAndClose(ctx)
if err := rd.RunAndClose(ctx); err != nil {
klog.ErrorS(err, "Failed to warmup. The checkpoint maybe stored.")
}
}
8 changes: 7 additions & 1 deletion images/ebs-warmup/warmup-steps.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ Supported flags:
EOF
}

warmup_by_file() {
checkpoint=.com.pingcap.tidb.operator.ebs.warmup.checkpoint
/warmup --type=whole --files="$1" -P256 --direct --checkpoint.at="$1/$checkpoint"
}


# The trap command is to make sure the sidecars are terminated when the jobs are finished
cleanup() {
if [ ! -d "/tmp/pod" ]; then
Expand Down Expand Up @@ -74,7 +80,7 @@ while [ $# -gt 0 ]; do
--thread=1 --filename=/dev/"$device" &
fi
;;
fs) /warmup --type=whole --files="$1" -P256 --direct &
fs) warmup_by_file "$1" &
;;
*) die "internal error: unsupported operation $1; forgot to call --block or --fs?"
;;
Expand Down

0 comments on commit 1d8aa7e

Please sign in to comment.