Skip to content

Commit

Permalink
ebs-warmup: added test cases && better exit logic (#5272)
Browse files Browse the repository at this point in the history
  • Loading branch information
YuJuncen authored Sep 15, 2023
1 parent 6702782 commit 2c3cd13
Show file tree
Hide file tree
Showing 6 changed files with 288 additions and 38 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ fault-trigger:
# `-race` for race detector.
# GO_COVER: Whether to run tests with code coverage. Set to 'y' to enable coverage collection.
#
test: TEST_PACKAGES = ./cmd/backup-manager/app ./pkg
test: TEST_PACKAGES = ./cmd/backup-manager/app ./pkg ./cmd/ebs-warmup/internal/tests
test: ## Run unit tests
@echo "Run unit tests"
ifeq ($(GO_COVER),y)
Expand Down
8 changes: 8 additions & 0 deletions cmd/ebs-warmup/filereader/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@

package filereader

import (
"github.com/pingcap/tidb-operator/cmd/ebs-warmup/worker"
"github.com/pingcap/tidb-operator/cmd/ebs-warmup/worker/tasks"
)

type Config struct {
Files string
Type string
Expand All @@ -21,4 +26,7 @@ type Config struct {
Direct bool
CheckpointEvery uint64
CheckpointFile string

OnStep worker.OnStepHook
OnFireRequest func(*tasks.ReadFile)
}
102 changes: 68 additions & 34 deletions cmd/ebs-warmup/filereader/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ const (
defaultSegmentCount = 16
)

// sendFileHook receives one read task produced by the file walkers in this
// package (WarmUpFooters, sendFileWithSegmenting). Returning brk == true tells
// the walker to stop emitting further tasks.
type sendFileHook func(tasks.ReadFile) (brk bool)

type StatedFile struct {
Info os.FileInfo
Path string
Expand Down Expand Up @@ -68,36 +70,31 @@ func StatFilesByGlob(glob string) ([]StatedFile, error) {
return stats, nil
}

func WarmUpFooters(glob string, sendToWorker func(tasks.ReadFile)) error {
func WarmUpFooters(glob string, sendToWorker sendFileHook) error {
files, err := StatFilesByGlob(glob)
if err != nil {
return errors.Annotatef(err, "failed to stat files with glob %s", glob)
}
for _, file := range files {
sendToWorker(tasks.ReadFile{
shouldBrk := sendToWorker(tasks.ReadFile{
Type: tasks.ReadLastNBytes(16 * 1024),
File: file.Info,
FilePath: file.Path,
})
if shouldBrk {
return nil
}
}
return nil
}

func WarmUpWholeFile(glob string, sendToWorker func(tasks.ReadFile)) error {
return warmUpWholeFileBy(glob, func(sf StatedFile) {
sendFileWithSegmenting(sf, defaultSegmentCount, sendToWorker)
})
}

func WarmUpWholeFileAfter(glob string, after time.Time, sendToWorker func(tasks.ReadFile)) error {
return warmUpWholeFileBy(glob, func(sf StatedFile) {
if sf.Info.ModTime().After(after) {
sendFileWithSegmenting(sf, defaultSegmentCount, sendToWorker)
}
func WarmUpWholeFile(glob string, sendToWorker sendFileHook) error {
return warmUpWholeFileBy(glob, func(sf StatedFile) bool {
return sendFileWithSegmenting(sf, defaultSegmentCount, sendToWorker)
})
}

func warmUpWholeFileBy(glob string, onFile func(StatedFile)) error {
func warmUpWholeFileBy(glob string, onFile func(StatedFile) (brk bool)) error {
files, err := StatFilesByGlob(glob)
if err != nil {
return errors.Annotatef(err, "failed to stat files with glob %s", glob)
Expand All @@ -107,12 +104,14 @@ func warmUpWholeFileBy(glob string, onFile func(StatedFile)) error {
return files[i].Info.ModTime().After(files[j].Info.ModTime())
})
for _, file := range files {
onFile(file)
if onFile(file) {
return context.Canceled
}
}
return nil
}

func sendFileWithSegmenting(file StatedFile, partitions int, sendToWorker func(tasks.ReadFile)) {
func sendFileWithSegmenting(file StatedFile, partitions int, sendToWorker sendFileHook) (brk bool) {
partitionSize := file.Info.Size() / int64(partitions)
if partitionSize < minimalSegmentSize {
partitionSize = minimalSegmentSize
Expand All @@ -123,7 +122,7 @@ func sendFileWithSegmenting(file StatedFile, partitions int, sendToWorker func(t
if offset+partitionSize > file.Info.Size() {
length = file.Info.Size() - offset
}
sendToWorker(
shouldBrk := sendToWorker(
tasks.ReadFile{
Type: tasks.ReadOffsetAndLength{
Offset: offset,
Expand All @@ -133,13 +132,18 @@ func sendFileWithSegmenting(file StatedFile, partitions int, sendToWorker func(t
FilePath: file.Path,
},
)
if shouldBrk {
return true
}
offset += partitionSize
}
return false
}

// WorkersOpt carries the optional settings accepted by CreateWorkers.
type WorkersOpt struct {
// ObserveTotalSize, when non-nil, is atomically increased by the number of
// bytes reported on every worker step.
ObserveTotalSize *uint64
// RateLimitInMiB is the rate-limit setting for the workers.
// NOTE(review): presumably MiB per second; the limiter construction is not
// visible here, so confirm the unit and how it is applied.
RateLimitInMiB float64
// OnStep, when non-nil, is invoked on every worker step with the file, the
// number of bytes read and the time taken, before ObserveTotalSize is updated.
OnStep worker.OnStepHook
}

func CreateWorkers(ctx context.Context, n int, opt WorkersOpt) ([]chan<- tasks.ReadFile, *errgroup.Group) {
Expand All @@ -160,6 +164,9 @@ func CreateWorkers(ctx context.Context, n int, opt WorkersOpt) ([]chan<- tasks.R
wr := worker.New(ch)
wr.RateLimiter = limiter
wr.OnStep = func(file os.FileInfo, readBytes int, take time.Duration) {
if opt.OnStep != nil {
opt.OnStep(file, readBytes, take)
}
if opt.ObserveTotalSize != nil {
new := atomic.AddUint64(opt.ObserveTotalSize, uint64(readBytes))
if atomic.AddUint64(&loopCounter, 1)%2048 == 0 {
Expand Down Expand Up @@ -200,16 +207,21 @@ func RoundRobin[T any](ts []T) func() T {
return choose
}

func TrySync(workers []chan<- tasks.ReadFile) {
func TrySync(ctx context.Context, workers []chan<- tasks.ReadFile) error {
chs := make([]chan struct{}, 0)
for _, w := range workers {
ch := make(chan struct{})
w <- tasks.ReadFile{Type: tasks.Sync{C: ch}}
chs = append(chs, ch)
}
for _, ch := range chs {
<-ch
select {
case <-ch:
case <-ctx.Done():
return ctx.Err()
}
}
return nil
}

type ExecContext struct {
Expand All @@ -222,6 +234,8 @@ type ExecContext struct {
chooser func() chan<- tasks.ReadFile
lastSent uint64
start time.Time

cancel context.CancelFunc
}

func (execCtx *ExecContext) perhapsCheckpoint() (uint64, error) {
Expand All @@ -241,7 +255,8 @@ func (execCtx *ExecContext) checkpoint() uint64 {
ckp, err := execCtx.perhapsCheckpoint()
if err != nil {
klog.InfoS("Failed to read checkpoint file. Will use time.Now() as checkpoint.", "err", err)
return uint64(time.Now().UnixMilli())
// Add a 1s offset so that files created shortly after startup (typically in tests) are not newer than the checkpoint and skipped.
return uint64(time.Now().UnixMilli() + 1000)
}
return ckp
}
Expand All @@ -250,34 +265,38 @@ func (execCtx *ExecContext) saveCheckpoint(ckp uint64) error {
return os.WriteFile(execCtx.config.CheckpointFile, []byte(fmt.Sprintf("%d", ckp)), 0o644)
}

func New(masterCtx context.Context, config Config) *ExecContext {
func New(config Config) *ExecContext {
execCtx := &ExecContext{
config: config,
}
execCtx.wkrs, execCtx.eg = CreateWorkers(masterCtx, execCtx.config.NWorkers, WorkersOpt{
wCtx, cancel := context.WithCancel(context.Background())
execCtx.wkrs, execCtx.eg = CreateWorkers(wCtx, execCtx.config.NWorkers, WorkersOpt{
ObserveTotalSize: &execCtx.total,
RateLimitInMiB: execCtx.config.RateLimit,
OnStep: config.OnStep,
})
execCtx.start = time.Now()
execCtx.chooser = RoundRobin(execCtx.wkrs)
execCtx.cnt = uint64(0)
execCtx.lastSent = execCtx.checkpoint()
execCtx.cancel = cancel
return execCtx
}

func (execCtx *ExecContext) Run() {
total := uint64(0)
func (execCtx *ExecContext) RunAndClose(ctx context.Context) {
defer execCtx.cancel()

total := uint64(0)
klog.InfoS("Using checkpoint.", "checkpoint", execCtx.lastSent, "time", time.UnixMilli(int64(execCtx.lastSent)).String())

switch execCtx.config.Type {
case "footer":
WarmUpFooters(execCtx.config.Files, func(rf tasks.ReadFile) {
execCtx.sendToWorker(rf)
WarmUpFooters(execCtx.config.Files, func(rf tasks.ReadFile) bool {
return execCtx.sendToWorker(ctx, rf)
})
case "whole":
WarmUpWholeFile(execCtx.config.Files, func(rf tasks.ReadFile) {
execCtx.sendToWorker(rf)
WarmUpWholeFile(execCtx.config.Files, func(rf tasks.ReadFile) bool {
return execCtx.sendToWorker(ctx, rf)
})
}

Expand All @@ -291,26 +310,41 @@ func (execCtx *ExecContext) Run() {
klog.InfoS("Done.", "take", take, "total", total, "rate", fmt.Sprintf("%s/s", units.HumanSize(rate)))
}

func (execCtx *ExecContext) sendToWorker(rf tasks.ReadFile) {
createTs := rf.File.ModTime().UnixMilli()
if createTs > int64(execCtx.checkpoint()) {
return
}
func (execCtx *ExecContext) checkpointTick(ctx context.Context) {
execCtx.cnt += 1
if execCtx.cnt%execCtx.config.CheckpointEvery == 0 {
ckp := execCtx.lastSent
now := time.Now()
TrySync(execCtx.wkrs)
if err := TrySync(ctx, execCtx.wkrs); err != nil {
klog.ErrorS(err, "skip saving checkpoint: the worker is closing", "err", err)
return
}
err := execCtx.saveCheckpoint(ckp)
if err != nil {
klog.ErrorS(err, "Failed to save checkpoint.", "checkpoint", ckp, "take", time.Since(now))
}
}
}

// sendToWorker hands one read task to the worker picked by execCtx.chooser and
// reports whether the caller should stop producing tasks.
//
// It returns true only when ctx has been canceled, either before the task is
// considered or while waiting for a worker to accept it. It returns false when
// the task was delivered, or when it was skipped because the file's
// modification time is newer than execCtx.lastSent.
func (execCtx *ExecContext) sendToWorker(ctx context.Context, rf tasks.ReadFile) bool {
	// The timestamp is captured before the hook runs; the hook gets a pointer
	// to rf.
	createTs := rf.File.ModTime().UnixMilli()
	if execCtx.config.OnFireRequest != nil {
		execCtx.config.OnFireRequest(&rf)
	}
	if err := ctx.Err(); err != nil {
		klog.InfoS("early exit due to context canceled.", "err", err)
		return true
	}
	// Files modified after the last sent position are skipped.
	if createTs > int64(execCtx.lastSent) {
		return false
	}
	execCtx.checkpointTick(ctx)
	rf.Direct = execCtx.config.Direct
	// Do not block forever on a busy worker: give up as soon as ctx is
	// canceled, the same way TrySync does, so the producer can exit promptly.
	select {
	case execCtx.chooser() <- rf:
	case <-ctx.Done():
		klog.InfoS("early exit due to context canceled.", "err", ctx.Err())
		return true
	}
	// NOTE(review): given the guard above this only fires when the signed to
	// unsigned conversion of createTs wraps (negative timestamps) — confirm
	// whether the check is still needed.
	if execCtx.lastSent < uint64(createTs) {
		klog.Warningln("unordered files: checkpoint is unavailable.", "checkpoint=", execCtx.checkpoint(),
			"createTs=", uint64(createTs), "lastSent=", execCtx.lastSent)
	}
	execCtx.lastSent = uint64(createTs)
	return false
}
Loading

0 comments on commit 2c3cd13

Please sign in to comment.