diff --git a/Makefile b/Makefile index efa7283b77..1790f2342e 100644 --- a/Makefile +++ b/Makefile @@ -95,6 +95,13 @@ else $(GO_BUILD) -ldflags '$(LDFLAGS)' -o images/br-federation-manager/bin/$(GOARCH)/br-federation-manager ./cmd/br-federation-manager endif +ebs-warmup: +ifeq ($(E2E),y) + $(GO_TEST) -ldflags '$(LDFLAGS)' -c -o images/ebs-warmup/bin/warmup ./cmd/ebs-warmup +else + $(GO_BUILD) -ldflags '$(LDFLAGS)' -o images/ebs-warmup/bin/$(GOARCH)/warmup ./cmd/ebs-warmup +endif + ##@ Build Docker images docker: operator-docker backup-docker br-federation-docker @@ -127,6 +134,18 @@ else docker build --tag "${DOCKER_REPO}/br-federation-manager:${IMAGE_TAG}" --build-arg=TARGETARCH=$(GOARCH) images/br-federation-manager endif +ifeq ($(NO_BUILD),y) +ebs-warmup-docker: + @echo "NO_BUILD=y, skip build for $@" +else +ebs-warmup-docker: ebs-warmup +endif +ifeq ($(E2E),y) + docker build --tag "${DOCKER_REPO}/ebs-warmup:${IMAGE_TAG}" -f images/ebs-wamrup/Dockerfile.e2e images/ebs-warmup +else + docker build --tag "${DOCKER_REPO}/ebs-warmup:${IMAGE_TAG}" --build-arg=TARGETARCH=$(GOARCH) images/ebs-warmup +endif + e2e-docker-push: e2e-docker ## Push tidb-operator-e2e image to registry docker push "${DOCKER_REPO}/tidb-operator-e2e:${IMAGE_TAG}" diff --git a/cmd/ebs-warmup/filereader/config.go b/cmd/ebs-warmup/filereader/config.go new file mode 100644 index 0000000000..a8ee8a1ed6 --- /dev/null +++ b/cmd/ebs-warmup/filereader/config.go @@ -0,0 +1,24 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package filereader + +type Config struct { + Files string + Type string + RateLimit float64 + NWorkers int + Direct bool + CheckpointEvery uint64 + CheckpointFile string +} diff --git a/cmd/ebs-warmup/filereader/exec.go b/cmd/ebs-warmup/filereader/exec.go new file mode 100644 index 0000000000..fc0f7e603a --- /dev/null +++ b/cmd/ebs-warmup/filereader/exec.go @@ -0,0 +1,316 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package filereader + +import ( + "context" + "fmt" + "math" + "os" + "path/filepath" + "sort" + "sync/atomic" + "time" + + "github.com/docker/go-units" + "github.com/pingcap/errors" + "github.com/pingcap/tidb-operator/cmd/ebs-warmup/worker" + "github.com/pingcap/tidb-operator/cmd/ebs-warmup/worker/tasks" + "golang.org/x/sync/errgroup" + "golang.org/x/time/rate" + "k8s.io/klog/v2" +) + +const ( + channelBufSize = 128 + minimalSegmentSize = 64 * 1024 + defaultSegmentCount = 16 +) + +type StatedFile struct { + Info os.FileInfo + Path string +} + +func StatFilesByGlob(glob string) ([]StatedFile, error) { + files, err := filepath.Glob(glob) + if err != nil { + return nil, errors.Annotatef(err, "failed to glob files with glob %s", glob) + } + stats := make([]StatedFile, 0, len(files)) + for _, file := range files { + s, err := os.Stat(file) + if err != nil { + return nil, errors.Annotatef(err, "failed to stat file %s", file) + } + if s.IsDir() { + recPath := filepath.Join(file, "*") + recContent, err := StatFilesByGlob(recPath) + if err != nil { + return nil, errors.Annotatef(err, "failed to stat files in dir %s (globing %s)", file, recPath) + } + stats = append(stats, recContent...) + } else { + stats = append(stats, StatedFile{Info: s, Path: file}) + } + } + return stats, nil +} + +func WarmUpFooters(glob string, sendToWorker func(tasks.ReadFile)) error { + files, err := StatFilesByGlob(glob) + if err != nil { + return errors.Annotatef(err, "failed to stat files with glob %s", glob) + } + for _, file := range files { + sendToWorker(tasks.ReadFile{ + Type: tasks.ReadLastNBytes(16 * 1024), + File: file.Info, + FilePath: file.Path, + }) + } + return nil +} + +func WarmUpWholeFile(glob string, sendToWorker func(tasks.ReadFile)) error { + return warmUpWholeFileBy(glob, func(sf StatedFile) { + sendFileWithSegmenting(sf, defaultSegmentCount, sendToWorker) + }) +} + +func WarmUpWholeFileAfter(glob string, after time.Time, sendToWorker func(tasks.ReadFile)) error { + return warmUpWholeFileBy(glob, func(sf StatedFile) { + if sf.Info.ModTime().After(after) { + sendFileWithSegmenting(sf, defaultSegmentCount, sendToWorker) + } + }) +} + +func warmUpWholeFileBy(glob string, onFile func(StatedFile)) error { + files, err := StatFilesByGlob(glob) + if err != nil { + return errors.Annotatef(err, "failed to stat files with glob %s", glob) + } + sort.Slice(files, func(i, j int) bool { + // Desc order of modify time. + return files[i].Info.ModTime().After(files[j].Info.ModTime()) + }) + for _, file := range files { + onFile(file) + } + return nil +} + +func sendFileWithSegmenting(file StatedFile, partitions int, sendToWorker func(tasks.ReadFile)) { + partitionSize := file.Info.Size() / int64(partitions) + if partitionSize < minimalSegmentSize { + partitionSize = minimalSegmentSize + } + offset := int64(0) + for offset <= file.Info.Size() { + length := partitionSize + if offset+partitionSize > file.Info.Size() { + length = file.Info.Size() - offset + } + sendToWorker( + tasks.ReadFile{ + Type: tasks.ReadOffsetAndLength{ + Offset: offset, + Length: length, + }, + File: file.Info, + FilePath: file.Path, + }, + ) + offset += partitionSize + } +} + +type WorkersOpt struct { + ObserveTotalSize *uint64 + RateLimitInMiB float64 +} + +func CreateWorkers(ctx context.Context, n int, opt WorkersOpt) ([]chan<- tasks.ReadFile, *errgroup.Group) { + result := make([]chan<- tasks.ReadFile, 0, n) + eg, ectx := errgroup.WithContext(ctx) + + loopCounter := uint64(0) + lastTotalSize := uint64(0) + lastTime := time.Now() + var limiter *rate.Limiter + if !math.IsInf(opt.RateLimitInMiB, 0) && !math.Signbit(opt.RateLimitInMiB) { + limiter = rate.NewLimiter(rate.Limit(opt.RateLimitInMiB*units.MiB), 8*units.MiB) + } + for i := 0; i < n; i++ { + ch := make(chan tasks.ReadFile, channelBufSize) + i := i + eg.Go(func() error { + wr := worker.New(ch) + wr.RateLimiter = limiter + wr.OnStep = func(file os.FileInfo, readBytes int, take time.Duration) { + if opt.ObserveTotalSize != nil { + new := atomic.AddUint64(opt.ObserveTotalSize, uint64(readBytes)) + if atomic.AddUint64(&loopCounter, 1)%2048 == 0 { + now := time.Now() + diff := new - atomic.LoadUint64(&lastTotalSize) + atomic.StoreUint64(&lastTotalSize, new) + rate := units.HumanSizeWithPrecision(float64(diff)/now.Sub(lastTime).Seconds(), 4) + klog.InfoS("Printing rate info.", "rate/s", rate) + lastTime = time.Now() + } + } + klog.V(2).InfoS( + "Read bytes from file.", "file", file.Name(), + "size", file.Size(), + "read", readBytes, + "take", take, + "worker", i, + ) + } + err := wr.MainLoop(ectx) + klog.InfoS("Background worker exits.", "id", i, "err", err) + return err + }) + result = append(result, ch) + } + return result, eg +} + +func RoundRobin[T any](ts []T) func() T { + n := len(ts) + choose := func() T { + n++ + if n >= len(ts) { + n = 0 + } + return ts[n] + } + return choose +} + +func TrySync(workers []chan<- tasks.ReadFile) { + chs := make([]chan struct{}, 0) + for _, w := range workers { + ch := make(chan struct{}) + w <- tasks.ReadFile{Type: tasks.Sync{C: ch}} + chs = append(chs, ch) + } + for _, ch := range chs { + <-ch + } +} + +type ExecContext struct { + config Config + + wkrs []chan<- tasks.ReadFile + eg *errgroup.Group + cnt uint64 + total uint64 + chooser func() chan<- tasks.ReadFile + lastSent uint64 + start time.Time +} + +func (execCtx *ExecContext) perhapsCheckpoint() (uint64, error) { + file, err := os.ReadFile(execCtx.config.CheckpointFile) + if err != nil { + return 0, errors.Annotatef(err, "failed to open checkpoint file %s", execCtx.config.CheckpointFile) + } + var cnt uint64 + _, err = fmt.Sscanf(string(file), "%d", &cnt) + if err != nil { + return 0, errors.Annotatef(err, "failed to parse checkpoint file %s", execCtx.config.CheckpointFile) + } + return cnt, nil +} + +func (execCtx *ExecContext) checkpoint() uint64 { + ckp, err := execCtx.perhapsCheckpoint() + if err != nil { + klog.InfoS("Failed to read checkpoint file. Will use time.Now() as checkpoint.", "err", err) + return uint64(time.Now().UnixMilli()) + } + return ckp +} + +func (execCtx *ExecContext) saveCheckpoint(ckp uint64) error { + return os.WriteFile(execCtx.config.CheckpointFile, []byte(fmt.Sprintf("%d", ckp)), 0o644) +} + +func New(masterCtx context.Context, config Config) *ExecContext { + execCtx := &ExecContext{ + config: config, + } + execCtx.wkrs, execCtx.eg = CreateWorkers(masterCtx, execCtx.config.NWorkers, WorkersOpt{ + ObserveTotalSize: &execCtx.total, + RateLimitInMiB: execCtx.config.RateLimit, + }) + execCtx.start = time.Now() + execCtx.chooser = RoundRobin(execCtx.wkrs) + execCtx.cnt = uint64(0) + execCtx.lastSent = execCtx.checkpoint() + return execCtx +} + +func (execCtx *ExecContext) Run() { + total := uint64(0) + + klog.InfoS("Using checkpoint.", "checkpoint", execCtx.lastSent, "time", time.UnixMilli(int64(execCtx.lastSent)).String()) + + switch execCtx.config.Type { + case "footer": + WarmUpFooters(execCtx.config.Files, func(rf tasks.ReadFile) { + execCtx.sendToWorker(rf) + }) + case "whole": + WarmUpWholeFile(execCtx.config.Files, func(rf tasks.ReadFile) { + execCtx.sendToWorker(rf) + }) + } + + for _, wkr := range execCtx.wkrs { + close(wkr) + } + execCtx.eg.Wait() + + take := time.Since(execCtx.start) + rate := float64(total) / take.Seconds() + klog.InfoS("Done.", "take", take, "total", total, "rate", fmt.Sprintf("%s/s", units.HumanSize(rate))) +} + +func (execCtx *ExecContext) sendToWorker(rf tasks.ReadFile) { + createTs := rf.File.ModTime().UnixMilli() + if createTs > int64(execCtx.checkpoint()) { + return + } + execCtx.cnt += 1 + if execCtx.cnt%execCtx.config.CheckpointEvery == 0 { + ckp := execCtx.lastSent + now := time.Now() + TrySync(execCtx.wkrs) + err := execCtx.saveCheckpoint(ckp) + if err != nil { + klog.ErrorS(err, "Failed to save checkpoint.", "checkpoint", ckp, "take", time.Since(now)) + } + } + rf.Direct = execCtx.config.Direct + execCtx.chooser() <- rf + if execCtx.lastSent < uint64(createTs) { + klog.Warningln("unordered files: checkpoint is unavailable.", "checkpoint=", execCtx.checkpoint(), + "createTs=", uint64(createTs), "lastSent=", execCtx.lastSent) + } + execCtx.lastSent = uint64(createTs) +} diff --git a/cmd/ebs-warmup/main.go b/cmd/ebs-warmup/main.go new file mode 100644 index 0000000000..d96bfe54ec --- /dev/null +++ b/cmd/ebs-warmup/main.go @@ -0,0 +1,49 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "math" + + "github.com/pingcap/tidb-operator/cmd/ebs-warmup/filereader" + "github.com/spf13/pflag" +) + +var ( + files = pflag.String("files", "*", "What files should be warmed up? This can be a bash glob.") + ty = pflag.String("type", "footer", "Where to warm up? `footer` or `whole`.") + rateLimit = pflag.Float64P("ratelimit", "r", math.Inf(1), "What is the max speed of reading? (in MiB/s)") + nWorkers = pflag.IntP("workers", "P", 32, "How many workers should we start?") + direct = pflag.Bool("direct", false, "Should we use direct I/O?") + checkpointFileCount = pflag.Uint64("checkpoint.every", 100, "After processing how many files, should we save the checkpoint?") + checkpointFile = pflag.String("checkpoint.at", "warmup-checkpoint.txt", "Where should we save & read the checkpoint?") +) + +func main() { + pflag.Parse() + + config := filereader.Config{ + Files: *files, + Type: *ty, + RateLimit: *rateLimit, + NWorkers: *nWorkers, + Direct: *direct, + CheckpointEvery: *checkpointFileCount, + CheckpointFile: *checkpointFile, + } + + rd := filereader.New(context.Background(), config) + rd.Run() +} diff --git a/cmd/ebs-warmup/worker/tasks/task.go b/cmd/ebs-warmup/worker/tasks/task.go new file mode 100644 index 0000000000..c716fc3527 --- /dev/null +++ b/cmd/ebs-warmup/worker/tasks/task.go @@ -0,0 +1,63 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package tasks + +import ( + "fmt" + "os" +) + +type ReadType interface { + fmt.Stringer +} + +type ReadOffsetAndLength struct { + Offset int64 + Length int64 +} + +func (r ReadOffsetAndLength) String() string { + return fmt.Sprintf("ReadOffsetAndLength(%d, %d)", r.Offset, r.Length) +} + +type ReadLastNBytes int + +func (r ReadLastNBytes) String() string { + return fmt.Sprintf("ReadLastNBytes(%d)", r) +} + +type ReadFull struct{} + +func (r ReadFull) String() string { + return "ReadFull" +} + +type Sync struct { + C chan<- struct{} +} + +func (r Sync) String() string { + return "Sync" +} + +type ReadFile struct { + Type ReadType + File os.FileInfo + FilePath string + Direct bool +} + +func (r ReadFile) String() string { + return fmt.Sprintf("ReadFile(%q, %s)", r.FilePath, r.Type) +} diff --git a/cmd/ebs-warmup/worker/worker.go b/cmd/ebs-warmup/worker/worker.go new file mode 100644 index 0000000000..e16a2e0ce7 --- /dev/null +++ b/cmd/ebs-warmup/worker/worker.go @@ -0,0 +1,179 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package worker + +import ( + "context" + "fmt" + "io" + "os" + "time" + + "github.com/docker/go-units" + "github.com/ncw/directio" + "github.com/pingcap/errors" + "github.com/pingcap/tidb-operator/cmd/ebs-warmup/worker/tasks" + "golang.org/x/time/rate" + "k8s.io/klog/v2" +) + +// Worker is the handler for reading tasks. +// It shall be !Send. +type Worker struct { + mailbox <-chan tasks.ReadFile + buf []byte + + OnStep func(file os.FileInfo, readBytes int, take time.Duration) + RateLimiter *rate.Limiter +} + +func New(input <-chan tasks.ReadFile) Worker { + return Worker{ + mailbox: input, + // Anyway... Aligning with block doesn't cost many. + buf: directio.AlignedBlock(256 * units.KiB), + } +} + +func (w *Worker) MainLoop(cx context.Context) error { + for { + select { + case <-cx.Done(): + return cx.Err() + case task, ok := <-w.mailbox: + if !ok { + return nil + } + if err := w.handleReadFile(task); err != nil { + klog.InfoS("Failed to read file.", "err", err) + } + } + } +} + +func (w *Worker) handleReadFile(task tasks.ReadFile) error { + if sync, ok := task.Type.(tasks.Sync); ok { + close(sync.C) + return nil + } + fd, err := w.openFileByTask(task) + if err != nil { + return errors.Annotatef(err, "failed to open file for task %s", task) + } + defer fd.Close() + err = w.execReadFile(fd, task) + if err != nil { + return errors.Annotatef(err, "failed to read file for opened file %s", task.FilePath) + } + return nil +} + +func (w *Worker) openFileByTask(task tasks.ReadFile) (*os.File, error) { + var ( + fd *os.File + err error + ) + if task.Direct { + fd, err = directio.OpenFile(task.FilePath, os.O_RDONLY, 0o644) + } else { + fd, err = os.OpenFile(task.FilePath, os.O_RDONLY, 0o644) + } + if err != nil { + return nil, err + } + return fd, nil +} + +func (w *Worker) execReadFile(fd *os.File, task tasks.ReadFile) error { + begin := time.Now() + reader, err := w.makeReaderByTask(fd, task) + if err != nil { + return errors.Annotatef(err, "failed to make reader for task %s", task) + } + n, err := w.fetchAll(reader) + if w.OnStep != nil { + w.OnStep(task.File, n, time.Since(begin)) + } + return errors.Annotatef(err, "failed to read from file %s", fd.Name()) +} + +func (w *Worker) makeReaderByTask(fd *os.File, task tasks.ReadFile) (io.Reader, error) { + switch t := task.Type.(type) { + case tasks.ReadLastNBytes: + s, err := fd.Stat() + if err != nil { + return nil, errors.Annotate(err, "failed to open the file stat") + } + seekPoint := int64(0) + if s.Size() > int64(t) { + seekPoint = s.Size() - int64(t) + } + if task.Direct { + seekPoint = alignBlockBackward(seekPoint) + } + if _, err := fd.Seek(seekPoint, io.SeekStart); err != nil { + return nil, errors.Annotatef(err, "failed to seek to %d", seekPoint) + } + return fd, nil + case tasks.ReadOffsetAndLength: + if task.Direct { + // Move the base cursor back to align the block edge. + // Move the length cursor forward to align + newOffset := alignBlockBackward(t.Offset) + t.Length = alignBlockForward(t.Length + newOffset - t.Offset) + t.Offset = newOffset + } + if _, err := fd.Seek(int64(t.Offset), io.SeekStart); err != nil { + return nil, errors.Annotatef(err, "failed to seek to %d", t.Offset) + } + return io.LimitReader(fd, int64(t.Length)), nil + case tasks.ReadFull: + return fd, nil + default: + return nil, errors.Errorf("unknown read type %T", t) + } +} + +func (w *Worker) fetchAll(r io.Reader) (int, error) { + total := 0 + for { + n, err := r.Read(w.buf) + if err == io.EOF { + return total + n, nil + } + if err != nil { + return 0, err + } + total += n + if w.RateLimiter != nil { + res := w.RateLimiter.ReserveN(time.Now(), n) + if !res.OK() { + return 0, fmt.Errorf("the read block size %d is larger than rate limit %d", n, w.RateLimiter.Burst()) + } + time.Sleep(res.Delay()) + } + } +} + +// alignBlockBackward aligns the pointer with the block size by moving it backward. +func alignBlockBackward(n int64) int64 { + // or n & ~directio.BlockSize + return n - n%directio.BlockSize +} + +// alignBlockForward aligns the pointer with the block size by moving it forward. +func alignBlockForward(n int64) int64 { + // or n & ~directio.BlockSize + directio.BlockSize + return directio.BlockSize * (n/directio.BlockSize + 1) +} diff --git a/go.mod b/go.mod index 324d70eb0f..f7b4699890 100644 --- a/go.mod +++ b/go.mod @@ -27,6 +27,7 @@ require ( github.com/google/gofuzz v1.1.0 github.com/mholt/archiver v3.1.1+incompatible github.com/minio/minio-go/v6 v6.0.55 + github.com/ncw/directio v1.0.5 github.com/onsi/ginkgo v1.14.1 github.com/onsi/gomega v1.10.2 github.com/openshift/generic-admission-server v1.14.1-0.20210422140326-da96454c926d @@ -141,7 +142,7 @@ require ( github.com/dimchansky/utfbom v1.1.0 // indirect github.com/docker/distribution v2.7.1+incompatible // indirect github.com/docker/go-connections v0.4.0 // indirect - github.com/docker/go-units v0.4.0 // indirect + github.com/docker/go-units v0.4.0 github.com/dsnet/compress v0.0.1 // indirect github.com/elazarl/goproxy v0.0.0-20190421051319-9d40249d3c2f // indirect; indirectload github.com/evanphx/json-patch v4.11.0+incompatible // indirect diff --git a/go.sum b/go.sum index 9f2e6537d5..4359298642 100644 --- a/go.sum +++ b/go.sum @@ -575,6 +575,8 @@ github.com/mvdan/xurls v1.1.0/go.mod h1:tQlNn3BED8bE/15hnSL2HLkDeLWpNPAwtw7wkEq4 github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw= +github.com/ncw/directio v1.0.5 h1:JSUBhdjEvVaJvOoyPAbcW0fnd0tvRXD76wEfZ1KcQz4= +github.com/ncw/directio v1.0.5/go.mod h1:rX/pKEYkOXBGOggmcyJeJGloCkleSvphPx2eV3t6ROk= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/nwaples/rardecode v1.0.0 h1:r7vGuS5akxOnR4JQSkko62RJ1ReCMXxQRPtxsiFMBOs= github.com/nwaples/rardecode v1.0.0/go.mod h1:5DzqNKiOdpKKBH87u8VlvAnPZMXcGRhxWkRpHbbfGS0= diff --git a/images/ebs-warmup/Dockerfile b/images/ebs-warmup/Dockerfile new file mode 100644 index 0000000000..e784999a71 --- /dev/null +++ b/images/ebs-warmup/Dockerfile @@ -0,0 +1,7 @@ +FROM alpine:latest +ARG TARGETARCH +RUN apk add lsblk fio +ADD bin/${TARGETARCH}/warmup /warmup + +ADD warmup-steps.sh /warmup_steps +CMD /warmup_steps diff --git a/images/ebs-warmup/Dockerfile.e2e b/images/ebs-warmup/Dockerfile.e2e new file mode 100644 index 0000000000..f1196ef231 --- /dev/null +++ b/images/ebs-warmup/Dockerfile.e2e @@ -0,0 +1,6 @@ +FROM alpine:latest +RUN apk add lsblk fio +ADD bin/warmup /warmup + +ADD warmup-steps.sh /warmup_steps +CMD /warmup_steps diff --git a/images/ebs-warmup/warmup-steps.sh b/images/ebs-warmup/warmup-steps.sh new file mode 100644 index 0000000000..6720ca9a1a --- /dev/null +++ b/images/ebs-warmup/warmup-steps.sh @@ -0,0 +1,77 @@ +#! /usr/bin/env sh + +# Copyright 2023 PingCAP, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# See the License for the specific language governing permissions and +# limitations under the License. + +set -eu + +dev_name_by_mount_point() { + lsblk -no NAME,MOUNTPOINT -r | awk -v path="$1" '$2 == path {print $1;}' +} + +die() { + echo "$@" >/dev/stderr + help + exit 1 +} + +help() { + cat < + --fs + --debug enable \`set -x\` +EOF +} + +operation=none +while [ $# -gt 0 ]; do + case $1 in + --help | -h) + help + exit 0 + ;; + --block) operation=fio + ;; + --fs) operation=fs + ;; + --debug) set -x + ;; + -*) + die "unsupported flag $1" + ;; + *) + echo "spawning wram up task: operation = $operation; file path = $1" + case "$operation" in + fio) + device=$(dev_name_by_mount_point "$1") + if [ -z "$device" ]; then + echo "$1 isn't a mount point, skipping." + else + fio --rw=read --bs=256K --iodepth=128 --ioengine=libaio \ + --numjobs=10 --offset=0% --offset_increment=10% --size=10% \ + "--name=initialize-$device" \ + --thread=1 --filename=/dev/"$device" & + fi + ;; + fs) /warmup --type=whole --files="$1" -P256 --direct & + ;; + *) die "internal error: unsupported operation $1; forgot to call --fio or --fs?" + ;; + esac + ;; + esac + shift +done + +wait