From a8c7b4a015cb7d4a0e6e3c0ab05f92f00aaa0fc3 Mon Sep 17 00:00:00 2001 From: Yu Juncen Date: Thu, 29 Sep 2022 17:13:53 +0800 Subject: [PATCH 1/8] basic iterators Signed-off-by: Yu Juncen --- br/pkg/restore/client.go | 119 +++++++++++++++- br/pkg/task/stream.go | 31 +++-- br/pkg/utils/iter/combinator_test.go | 68 +++++++++ br/pkg/utils/iter/combinators.go | 201 +++++++++++++++++++++++++++ br/pkg/utils/iter/iter.go | 109 +++++++++++++++ br/pkg/utils/iter/source.go | 69 +++++++++ 6 files changed, 581 insertions(+), 16 deletions(-) create mode 100644 br/pkg/utils/iter/combinator_test.go create mode 100644 br/pkg/utils/iter/combinators.go create mode 100644 br/pkg/utils/iter/iter.go create mode 100644 br/pkg/utils/iter/source.go diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go index b2bc9768eba0f..79ae3feaf93c8 100644 --- a/br/pkg/restore/client.go +++ b/br/pkg/restore/client.go @@ -39,6 +39,7 @@ import ( "github.com/pingcap/tidb/br/pkg/stream" "github.com/pingcap/tidb/br/pkg/summary" "github.com/pingcap/tidb/br/pkg/utils" + "github.com/pingcap/tidb/br/pkg/utils/iter" "github.com/pingcap/tidb/config" ddlutil "github.com/pingcap/tidb/ddl/util" "github.com/pingcap/tidb/domain" @@ -1765,6 +1766,93 @@ func (rc *Client) ReadStreamMetaByTS(ctx context.Context, shiftedStartTS uint64, return streamBackupMetaFiles.metas, nil } +func (rc *Client) StreamingMetaByTS(ctx context.Context, shiftedStartTS uint64, restoreTS uint64) (iter.TryNextor[*backuppb.Metadata], error) { + it, err := rc.ReadStreamMetadata(ctx, rc.storage) + if err != nil { + return nil, err + } + filtered := iter.FilterOut(it, func(metadata *backuppb.Metadata) bool { + return restoreTS < metadata.MinTs || metadata.MaxTs < shiftedStartTS + }) + return filtered, nil +} + +func (rc *Client) ReadStreamMetadata(ctx context.Context, s storage.ExternalStorage) (iter.TryNextor[*backuppb.Metadata], error) { + opt := &storage.WalkOption{SubDir: stream.GetStreamBackupMetaPrefix()} + names := []string{} + err := s.WalkDir(ctx, opt, func(path string, size int64) error { + if !strings.HasSuffix(path, ".meta") { + return nil + } + names = append(names, path) + return nil + }) + if err != nil { + return nil, err + } + namesIter := iter.FromSlice(names) + readMeta := func(ctx context.Context, name string) (*backuppb.Metadata, error) { + f, err := s.ReadFile(ctx, name) + if err != nil { + return nil, err + } + meta, err := rc.helper.ParseToMetadata(f) + if err != nil { + return nil, err + } + return meta, nil + } + reader := iter.Transform(namesIter, readMeta, + iter.WithChunkSize[string, *backuppb.Metadata](512), + iter.WithConcurrency[string, *backuppb.Metadata](128)) + return reader, nil +} + +func (rc *Client) FilterDataFiles(ms iter.TryNextor[*backuppb.Metadata]) iter.TryNextor[*backuppb.DataFileInfo] { + return iter.FlatMap(ms, func(m *backuppb.Metadata) iter.TryNextor[*backuppb.DataFileInfo] { + return iter.FlatMap(iter.FromSlice(m.FileGroups), func(g *backuppb.DataFileGroup) iter.TryNextor[*backuppb.DataFileInfo] { + return iter.FilterOut(iter.FromSlice(g.DataFilesInfo), func(d *backuppb.DataFileInfo) bool { + // Modify the data internally, a little hacky. + if m.MetaVersion > backuppb.MetaVersion_V1 { + d.Path = g.Path + } + return d.IsMeta || rc.ShouldFilterOut(d) + }) + }) + }) +} + +// ShouldFilterOut checks whether a file should be filtered out via the current client. 
+func (rc *Client) ShouldFilterOut(d *backuppb.DataFileInfo) bool { + return d.MinTs > rc.restoreTS || + (d.Cf == stream.WriteCF && d.MaxTs < rc.startTS) || + (d.Cf == stream.DefaultCF && d.MaxTs < rc.shiftStartTS) +} + +type DDLMetaGroup struct { + Path string + FileMetas []*backuppb.DataFileInfo +} + +func (rc *Client) FilterMetaFiles(ms iter.TryNextor[*backuppb.Metadata]) iter.TryNextor[DDLMetaGroup] { + return iter.FlatMap(ms, func(m *backuppb.Metadata) iter.TryNextor[DDLMetaGroup] { + return iter.Map(iter.FromSlice(m.FileGroups), func(g *backuppb.DataFileGroup) DDLMetaGroup { + metas := iter.FilterOut(iter.FromSlice(g.DataFilesInfo), func(d *backuppb.DataFileInfo) bool { + // Modify the data internally, a little hacky. + if m.MetaVersion > backuppb.MetaVersion_V1 { + d.Path = g.Path + } + return !d.IsMeta || rc.ShouldFilterOut(d) + }) + return DDLMetaGroup{ + Path: g.Path, + // NOTE: the metas iterator is pure. No context or cancel needs. + FileMetas: iter.CollectAll(context.Background(), metas).Item, + } + }) + }) +} + // ReadStreamDataFiles is used for streaming task. collect all meta file by TS. func (rc *Client) ReadStreamDataFiles( ctx context.Context, @@ -1876,22 +1964,21 @@ func (rc *Client) FixIndicesOfTable(ctx context.Context, schema string, table *m func (rc *Client) RestoreKVFiles( ctx context.Context, rules map[int64]*RewriteRules, - files []*backuppb.DataFileInfo, + files iter.TryNextor[*backuppb.DataFileInfo], updateStats func(kvCount uint64, size uint64), onProgress func(), ) error { var err error + fileCount := 0 start := time.Now() defer func() { elapsed := time.Since(start) if err == nil { log.Info("Restore KV files", zap.Duration("take", elapsed)) - summary.CollectSuccessUnit("files", len(files), elapsed) + summary.CollectSuccessUnit("files", fileCount, elapsed) } }() - log.Debug("start to restore files", zap.Int("files", len(files))) - if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span1 := span.Tracer().StartSpan("Client.RestoreKVFiles", opentracing.ChildOf(span.Context())) defer span1.Finish() @@ -1931,7 +2018,11 @@ func (rc *Client) RestoreKVFiles( }) } } - for _, file := range files { + for r := files.TryNext(ctx); !r.Finished; r = files.TryNext(ctx) { + if r.Err != nil { + return err + } + file := r.Item if file.Type == backuppb.FileType_Delete { // collect delete type file and apply it later. deleteFiles = append(deleteFiles, file) @@ -2058,6 +2149,24 @@ func SortMetaKVFiles(files []*backuppb.DataFileInfo) []*backuppb.DataFileInfo { return files } +func (rc *Client) PrepareDDLFileCache( + ctx context.Context, + files iter.TryNextor[DDLMetaGroup], +) ([]*backuppb.DataFileInfo, error) { + fs := iter.CollectAll(ctx, files) + if fs.Err != nil { + return nil, fs.Err + } + + dataFileInfos := make([]*backuppb.DataFileInfo, 0) + for _, g := range fs.Item { + rc.helper.InitCacheEntry(g.Path, len(g.FileMetas)) + dataFileInfos = append(dataFileInfos, g.FileMetas...) + } + + return dataFileInfos, nil +} + // RestoreMetaKVFiles tries to restore files about meta kv-event from stream-backup. 
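For orientation, the sketch below shows how the methods added above compose end to end. It is an editor's illustration based on the stream.go changes later in this patch (error handling trimmed; identifiers such as cfg, rewriteRules and p are borrowed from restoreStream there), not code taken from the patch itself.

	// Editor's sketch of the call pattern enabled by the new iterator-based client API.
	metas, err := client.StreamingMetaByTS(ctx, shiftStartTS, cfg.RestoreTS)
	if err != nil {
		return errors.Trace(err)
	}
	// Meta (DDL) events are grouped per file group and materialized up front...
	ddlFiles, err := client.PrepareDDLFileCache(ctx, client.FilterMetaFiles(metas))
	if err != nil {
		return errors.Trace(err)
	}
	// ...while data (DML) files stay lazy; the metadata iterator is single-pass,
	// so it is built again before wiring the second pipeline.
	metas, err = client.StreamingMetaByTS(ctx, shiftStartTS, cfg.RestoreTS)
	if err != nil {
		return errors.Trace(err)
	}
	err = client.RestoreKVFiles(ctx, rewriteRules, client.FilterDataFiles(metas), updateStats, p.Inc)
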
func (rc *Client) RestoreMetaKVFiles( ctx context.Context, diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index 9a1a06eff9693..1a232935fdd40 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -46,6 +46,7 @@ import ( "github.com/pingcap/tidb/br/pkg/streamhelper/daemon" "github.com/pingcap/tidb/br/pkg/summary" "github.com/pingcap/tidb/br/pkg/utils" + "github.com/pingcap/tidb/br/pkg/utils/iter" "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/model" @@ -1123,26 +1124,28 @@ func restoreStream( } // read meta by given ts. - metas, err := client.ReadStreamMetaByTS(ctx, shiftStartTS, cfg.RestoreTS) + metas, err := client.StreamingMetaByTS(ctx, shiftStartTS, cfg.RestoreTS) if err != nil { return errors.Trace(err) } - if len(metas) == 0 { - log.Info("nothing to restore.") - return nil - } client.SetRestoreRangeTS(cfg.StartTS, cfg.RestoreTS, shiftStartTS) + dataFileCount := 0 + metas = iter.Tap(metas, func(m *backuppb.Metadata) { + for _, fg := range m.FileGroups { + for _, f := range fg.DataFilesInfo { + if !f.IsMeta && !client.ShouldFilterOut(f) { + dataFileCount += 1 + } + } + } + }) // read data file by given ts. - dmlFiles, ddlFiles, err := client.ReadStreamDataFiles(ctx, metas) + metaFiles := client.FilterMetaFiles(metas) if err != nil { return errors.Trace(err) } - if len(dmlFiles) == 0 && len(ddlFiles) == 0 { - log.Info("nothing to restore.") - return nil - } // get full backup meta to generate rewrite rules. fullBackupTables, err := initFullBackupTables(ctx, cfg) @@ -1173,6 +1176,10 @@ func restoreStream( totalKVCount += kvCount totalSize += size } + ddlFiles, err := client.PrepareDDLFileCache(ctx, metaFiles) + if err != nil { + return err + } pm := g.StartProgress(ctx, "Restore Meta Files", int64(len(ddlFiles)), !cfg.LogProgress) if err = withProgress(pm, func(p glue.Progress) error { client.RunGCRowsLoader(ctx) @@ -1188,7 +1195,9 @@ func restoreStream( } updateRewriteRules(rewriteRules, schemasReplace) - pd := g.StartProgress(ctx, "Restore KV Files", int64(len(dmlFiles)), !cfg.LogProgress) + metas, err = client.StreamingMetaByTS(ctx, shiftStartTS, cfg.RestoreTS) + pd := g.StartProgress(ctx, "Restore KV Files", int64(dataFileCount), !cfg.LogProgress) + dmlFiles := client.FilterDataFiles(metas) err = withProgress(pd, func(p glue.Progress) error { return client.RestoreKVFiles(ctx, rewriteRules, dmlFiles, updateStats, p.Inc) }) diff --git a/br/pkg/utils/iter/combinator_test.go b/br/pkg/utils/iter/combinator_test.go new file mode 100644 index 0000000000000..cbb42d8fb4183 --- /dev/null +++ b/br/pkg/utils/iter/combinator_test.go @@ -0,0 +1,68 @@ +package iter_test + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/pingcap/tidb/br/pkg/utils/iter" + "github.com/stretchr/testify/require" +) + +func TestParTrans(t *testing.T) { + items := iter.OfRange(0, 200) + mapped := iter.Transform(items, func(c context.Context, i int) (int, error) { + select { + case <-c.Done(): + return 0, c.Err() + case <-time.After(100 * time.Millisecond): + } + return i + 100, nil + }, iter.WithChunkSize[int, int](128), iter.WithConcurrency[int, int](64)) + cx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + r := iter.CollectAll(cx, mapped) + require.NoError(t, r.Err) + require.Len(t, r.Item, 200) + require.Equal(t, r.Item, iter.CollectAll(cx, iter.OfRange(100, 300)).Item) +} + +func TestFilter(t *testing.T) { + items := iter.OfRange(0, 10) + items = iter.FlatMap(items, func(n int) 
iter.TryNextor[int] { + return iter.Map(iter.OfRange(n, 10), func(i int) int { return n * i }) + }) + items = iter.FilterOut(items, func(n int) bool { return n == 0 || (n+1)%13 != 0 }) + coll := iter.CollectAll(context.Background(), items) + require.Equal(t, []int{12, 12, 25, 64}, coll.Item, "%s", coll) +} + +func TestFailure(t *testing.T) { + items := iter.ConcatAll(iter.OfRange(0, 5), iter.Fail[int](errors.New("meow?")), iter.OfRange(5, 10)) + items = iter.FlatMap(items, func(n int) iter.TryNextor[int] { + return iter.Map(iter.OfRange(n, 10), func(i int) int { return n * i }) + }) + items = iter.FilterOut(items, func(n int) bool { return n == 0 || (n+1)%13 != 0 }) + coll := iter.CollectAll(context.Background(), items) + require.Error(t, coll.Err, "%s", coll) + require.Nil(t, coll.Item) +} + +func TestCollect(t *testing.T) { + items := iter.OfRange(0, 100) + ctx := context.Background() + coll := iter.CollectMany(ctx, items, 10) + require.Len(t, coll.Item, 10, "%s", coll) + require.Equal(t, coll.Item, iter.CollectAll(ctx, iter.OfRange(0, 10)).Item) +} + +func TestSome(t *testing.T) { + req := require.New(t) + it := iter.OfRange(0, 2) + c := context.Background() + req.Equal(it.TryNext(c), iter.Emit(0)) + req.Equal(it.TryNext(c), iter.Emit(1)) + req.Equal(it.TryNext(c), iter.Done[int]()) + req.Equal(it.TryNext(c), iter.Done[int]()) +} diff --git a/br/pkg/utils/iter/combinators.go b/br/pkg/utils/iter/combinators.go new file mode 100644 index 0000000000000..83a553fbf4a9b --- /dev/null +++ b/br/pkg/utils/iter/combinators.go @@ -0,0 +1,201 @@ +package iter + +import ( + "context" + + "github.com/pingcap/tidb/br/pkg/utils" + "golang.org/x/sync/errgroup" +) + +type chunkMapping[T, R any] struct { + inner TryNextor[T] + mapper func(context.Context, T) (R, error) + chunkSize uint + quota *utils.WorkerPool + + buffer fromSlice[R] +} + +func (m *chunkMapping[T, R]) fillChunk(ctx context.Context) IterResult[fromSlice[R]] { + eg, cx := errgroup.WithContext(ctx) + s := CollectMany(ctx, m.inner, m.chunkSize) + if s.FinishedOrError() { + return DoneBy[fromSlice[R]](s) + } + r := make([]R, len(s.Item)) + for i := 0; i < len(s.Item); i++ { + i := i + m.quota.ApplyOnErrorGroup(eg, func() error { + var err error + r[i], err = m.mapper(cx, s.Item[i]) + return err + }) + } + if err := eg.Wait(); err != nil { + return Throw[fromSlice[R]](err) + } + if len(r) > 0 { + return Emit(fromSlice[R](r)) + } + return Done[fromSlice[R]]() +} + +func (m *chunkMapping[T, R]) TryNext(ctx context.Context) IterResult[R] { + r := m.buffer.TryNext(ctx) + if !r.FinishedOrError() { + return Emit(r.Item) + } + + r2 := m.fillChunk(ctx) + if !r2.FinishedOrError() { + m.buffer = r2.Item + return m.TryNext(ctx) + } + + return DoneBy[R](r2) +} + +type TransformConfig[T, R any] func(*chunkMapping[T, R]) + +func WithConcurrency[T, R any](n uint) TransformConfig[T, R] { + return func(c *chunkMapping[T, R]) { + c.quota = utils.NewWorkerPool(n, "transforming") + } +} + +func WithChunkSize[T, R any](n uint) TransformConfig[T, R] { + return func(c *chunkMapping[T, R]) { + c.chunkSize = n + } +} + +// Transform returns an iterator that performs an impure procedure for each element, +// then emitting the result of that procedure. +// The execution of that procedure can be paralleled with the config `WithConcurrency`. +// You may also need to config the `WithChunkSize`, because the concurrent execution is only available intra-batch. 
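As an editor's aside, here is a minimal, self-contained sketch of calling Transform with the option helpers as they are defined at this point in the series (a later patch in the series drops the explicit type parameters on the options). The string-parsing workload is made up for illustration.

	package main

	import (
		"context"
		"fmt"
		"strconv"

		"github.com/pingcap/tidb/br/pkg/utils/iter"
	)

	func main() {
		ctx := context.Background()
		lines := iter.FromSlice([]string{"1", "2", "3", "4"})
		// Parse up to 4 items concurrently, batching 16 items per chunk.
		nums := iter.Transform(lines,
			func(_ context.Context, s string) (int, error) { return strconv.Atoi(s) },
			iter.WithChunkSize[string, int](16),
			iter.WithConcurrency[string, int](4))
		r := iter.CollectAll(ctx, nums)
		fmt.Println(r.Item, r.Err) // [1 2 3 4] <nil>
	}
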
+func Transform[T, R any](it TryNextor[T], with func(context.Context, T) (R, error), cs ...TransformConfig[T, R]) TryNextor[R] { + r := &chunkMapping[T, R]{ + inner: it, + mapper: with, + chunkSize: 1, + } + for _, c := range cs { + c(r) + } + if r.quota == nil { + r.quota = utils.NewWorkerPool(r.chunkSize, "max-concurrency") + } + if r.quota.Limit() > int(r.chunkSize) { + r.chunkSize = uint(r.quota.Limit()) + } + return r +} + +type filter[T any] struct { + inner TryNextor[T] + filterOutIf func(T) bool +} + +func (f filter[T]) TryNext(ctx context.Context) IterResult[T] { + r := f.inner.TryNext(ctx) + if r.Err != nil || r.Finished { + return r + } + + if f.filterOutIf(r.Item) { + return f.TryNext(ctx) + } + + return r +} + +func FilterOut[T any](it TryNextor[T], f func(T) bool) TryNextor[T] { + return filter[T]{ + inner: it, + filterOutIf: f, + } +} + +type take[T any] struct { + n uint + inner TryNextor[T] +} + +func (t *take[T]) TryNext(ctx context.Context) IterResult[T] { + if t.n == 0 { + return Done[T]() + } + + t.n-- + return t.inner.TryNext(ctx) +} + +func TakeFirst[T any](inner TryNextor[T], n uint) TryNextor[T] { + return &take[T]{ + n: n, + inner: inner, + } +} + +type join[T any] struct { + inner TryNextor[TryNextor[T]] + + current TryNextor[T] +} + +type pureMap[T, R any] struct { + inner TryNextor[T] + + mapper func(T) R +} + +func (p pureMap[T, R]) TryNext(ctx context.Context) IterResult[R] { + r := p.inner.TryNext(ctx) + + if r.FinishedOrError() { + return DoneBy[R](r) + } + return Emit(p.mapper(r.Item)) +} + +func (j *join[T]) TryNext(ctx context.Context) IterResult[T] { + r := j.current.TryNext(ctx) + if r.Err != nil { + j.inner = empty[TryNextor[T]]{} + return r + } + if !r.Finished { + return r + } + + nr := j.inner.TryNext(ctx) + if nr.FinishedOrError() { + return DoneBy[T](nr) + } + j.current = nr.Item + return j.TryNext(ctx) +} + +func FlatMap[T, R any](it TryNextor[T], mapper func(T) TryNextor[R]) TryNextor[R] { + return &join[R]{ + inner: pureMap[T, TryNextor[R]]{ + inner: it, + mapper: mapper, + }, + current: empty[R]{}, + } +} + +func Map[T, R any](it TryNextor[T], mapper func(T) R) TryNextor[R] { + return pureMap[T, R]{ + inner: it, + mapper: mapper, + } +} + +func ConcatAll[T any](items ...TryNextor[T]) TryNextor[T] { + return &join[T]{ + inner: FromSlice(items), + current: empty[T]{}, + } +} diff --git a/br/pkg/utils/iter/iter.go b/br/pkg/utils/iter/iter.go new file mode 100644 index 0000000000000..60b667b57f2b1 --- /dev/null +++ b/br/pkg/utils/iter/iter.go @@ -0,0 +1,109 @@ +package iter + +import ( + "context" + "fmt" +) + +// IterResult is the result of try to advancing an impure iterator. +// Generally it is a "sum type", which only one field would be filled. +// You can create it via `Done`, `Emit` and `Throw`. +type IterResult[T any] struct { + Item T + Err error + Finished bool +} + +func (r IterResult[T]) String() string { + if r.Err != nil { + return fmt.Sprintf("IterResult.Throw(%s)", r.Err) + } + if r.Finished { + return "IterResult.Done()" + } + return fmt.Sprintf("IterResult.Emit(%v)", r.Item) +} + +// TryNextor is the general interface for "impure" iterators: +// which may trigger some error or block the caller when advancing. +type TryNextor[T any] interface { + TryNext(ctx context.Context) IterResult[T] +} + +func (r IterResult[T]) FinishedOrError() bool { + return r.Err != nil || r.Finished +} + +// DoneBy creates a finished or error IterResult by its argument. 
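Editor's note: any type with a TryNext method can feed these combinators. Below is a hypothetical channel-backed source showing how Emit, Done and Throw are used from a custom iterator; it is a sketch, not part of the patch, and assumes the iter import path introduced in this series.

	// chanIter blocks until the producer sends, finishes when the channel closes,
	// and fails when the context is cancelled.
	type chanIter[T any] struct{ ch <-chan T }

	func (c chanIter[T]) TryNext(ctx context.Context) iter.IterResult[T] {
		select {
		case <-ctx.Done():
			return iter.Throw[T](ctx.Err())
		case item, ok := <-c.ch:
			if !ok { // producer closed the channel: iteration is finished
				return iter.Done[T]()
			}
			return iter.Emit(item)
		}
	}
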
+func DoneBy[T, O any](r IterResult[O]) IterResult[T] { + return IterResult[T]{ + Err: r.Err, + Finished: r.Finished, + } +} + +// Done creates an IterResult which indices the iteration has finished. +func Done[T any]() IterResult[T] { + return IterResult[T]{ + Finished: true, + } +} + +// Emit creates an IterResult which contains normal data. +func Emit[T any](t T) IterResult[T] { + return IterResult[T]{ + Item: t, + } +} + +// Throw creates an IterResult which contains the err. +func Throw[T any](err error) IterResult[T] { + return IterResult[T]{ + Err: err, + } +} + +// CollectMany collects the first n items of the iterator. +// When the iterator contains less data than N, it emits as many items as it can and won't set `Finished`. +func CollectMany[T any](ctx context.Context, it TryNextor[T], n uint) IterResult[[]T] { + return CollectAll(ctx, TakeFirst(it, n)) +} + +// CollectAll fully consumes the iterator, collecting all items the iterator emitted. +// When the iterator has been finished, it emits empty slice and won't set `Finished`. +func CollectAll[T any](ctx context.Context, it TryNextor[T]) IterResult[[]T] { + r := IterResult[[]T]{} + + for ir := it.TryNext(ctx); !ir.Finished; ir = it.TryNext(ctx) { + if ir.Err != nil { + return DoneBy[[]T](ir) + } + r.Item = append(r.Item, ir.Item) + } + return r +} + +type tap[T any] struct { + inner TryNextor[T] + + tapper func(T) +} + +func (t tap[T]) TryNext(ctx context.Context) IterResult[T] { + n := t.inner.TryNext(ctx) + if n.FinishedOrError() { + return n + } + + t.tapper(n.Item) + return Emit(n.Item) +} + +// Tap adds a hook into the iterator, it would execute the function +// anytime the iterator emits an item. +func Tap[T any](i TryNextor[T], with func(T)) TryNextor[T] { + return tap[T]{ + inner: i, + tapper: with, + } +} diff --git a/br/pkg/utils/iter/source.go b/br/pkg/utils/iter/source.go new file mode 100644 index 0000000000000..e9db21ea491a3 --- /dev/null +++ b/br/pkg/utils/iter/source.go @@ -0,0 +1,69 @@ +package iter + +import ( + "context" + + "golang.org/x/exp/constraints" +) + +type fromSlice[T any] []T + +func (s *fromSlice[T]) TryNext(ctx context.Context) IterResult[T] { + if s == nil || len(*s) == 0 { + return Done[T]() + } + + var item T + item, *s = (*s)[0], (*s)[1:] + return Emit(item) +} + +// FromSlice creates an iterator from a slice, the iterator would +func FromSlice[T any](s []T) TryNextor[T] { + sa := fromSlice[T](s) + return &sa +} + +type ofRange[T constraints.Integer] struct { + end T + endExclusive bool + + current T +} + +func (r *ofRange[T]) TryNext(ctx context.Context) IterResult[T] { + if r.current > r.end || (r.current == r.end && r.endExclusive) { + return Done[T]() + } + + result := Emit(r.current) + r.current++ + return result +} + +func OfRange[T constraints.Integer](begin, end T) TryNextor[T] { + return &ofRange[T]{ + end: end, + endExclusive: true, + + current: begin, + } +} + +type empty[T any] struct{} + +func (empty[T]) TryNext(ctx context.Context) IterResult[T] { + return Done[T]() +} + +type failure[T any] struct { + error +} + +func (f failure[T]) TryNext(ctx context.Context) IterResult[T] { + return Throw[T](f) +} + +func Fail[T any](err error) TryNextor[T] { + return failure[T]{error: err} +} From 6f47ee1c900ccc6d9aad8f6a70740312bd4e8828 Mon Sep 17 00:00:00 2001 From: Yu Juncen Date: Mon, 10 Oct 2022 17:14:35 +0800 Subject: [PATCH 2/8] refactored the log client Signed-off-by: Yu Juncen --- br/pkg/restore/client.go | 215 ++-------------------- br/pkg/restore/log_client.go | 266 
+++++++++++++++++++++++++++ br/pkg/restore/log_client_test.go | 237 +++++++++++++++++++----- br/pkg/task/stream.go | 11 +- br/pkg/utils/iter/combinator_test.go | 12 +- br/pkg/utils/iter/combinators.go | 32 ++-- 6 files changed, 499 insertions(+), 274 deletions(-) create mode 100644 br/pkg/restore/log_client.go diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go index 79ae3feaf93c8..aadc64b7ed162 100644 --- a/br/pkg/restore/client.go +++ b/br/pkg/restore/client.go @@ -137,24 +137,13 @@ type Client struct { supportPolicy bool - // startTS and restoreTS are used for kv file restore. - // TiKV will filter the key space that don't belong to [startTS, restoreTS]. - startTS uint64 - restoreTS uint64 - - // If the commitTS of txn-entry belong to [startTS, restoreTS], - // the startTS of txn-entry may be smaller than startTS. - // We need maintain and restore more entries in default cf - // (the startTS in these entries belong to [shiftStartTS, startTS]). - shiftStartTS uint64 - // currentTS is used for rewrite meta kv when restore stream. // Can not use `restoreTS` directly, because schema created in `full backup` maybe is new than `restoreTS`. currentTS uint64 - storage storage.ExternalStorage + *logFileManager - helper *stream.MetadataHelper + storage storage.ExternalStorage // if fullClusterRestore = true: // - if there's system tables in the backup(backup data since br 5.1.0), the cluster should be a fresh cluster @@ -1710,196 +1699,18 @@ func (rc *Client) PreCheckTableClusterIndex( return nil } -func (rc *Client) GetShiftTS(ctx context.Context, startTS uint64, restoreTS uint64) (uint64, error) { - shiftTS := struct { - sync.Mutex - value uint64 - exists bool - }{} - err := stream.FastUnmarshalMetaData(ctx, rc.storage, func(path string, raw []byte) error { - m, err := rc.helper.ParseToMetadata(raw) - if err != nil { - return err - } - shiftTS.Lock() - defer shiftTS.Unlock() - - ts, ok := UpdateShiftTS(m, startTS, restoreTS) - if ok && (!shiftTS.exists || shiftTS.value > ts) { - shiftTS.value = ts - shiftTS.exists = true - } - return nil - }) - if err != nil { - return 0, err - } - if !shiftTS.exists { - return startTS, nil +func (rc *Client) InstallLogFileManager(ctx context.Context, startTS, restoreTS uint64) error { + init := LogFileManagerInit{ + StartTS: startTS, + RestoreTS: restoreTS, + Storage: rc.storage, } - return shiftTS.value, nil -} - -// ReadStreamMetaByTS is used for streaming task. collect all meta file by TS. 
-func (rc *Client) ReadStreamMetaByTS(ctx context.Context, shiftedStartTS uint64, restoreTS uint64) ([]*backuppb.Metadata, error) { - streamBackupMetaFiles := struct { - sync.Mutex - metas []*backuppb.Metadata - }{} - streamBackupMetaFiles.metas = make([]*backuppb.Metadata, 0, 128) - - err := stream.FastUnmarshalMetaData(ctx, rc.storage, func(path string, raw []byte) error { - metadata, err := rc.helper.ParseToMetadata(raw) - if err != nil { - return err - } - streamBackupMetaFiles.Lock() - if restoreTS >= metadata.MinTs && metadata.MaxTs >= shiftedStartTS { - streamBackupMetaFiles.metas = append(streamBackupMetaFiles.metas, metadata) - } - streamBackupMetaFiles.Unlock() - return nil - }) - if err != nil { - return nil, errors.Trace(err) - } - return streamBackupMetaFiles.metas, nil -} - -func (rc *Client) StreamingMetaByTS(ctx context.Context, shiftedStartTS uint64, restoreTS uint64) (iter.TryNextor[*backuppb.Metadata], error) { - it, err := rc.ReadStreamMetadata(ctx, rc.storage) - if err != nil { - return nil, err - } - filtered := iter.FilterOut(it, func(metadata *backuppb.Metadata) bool { - return restoreTS < metadata.MinTs || metadata.MaxTs < shiftedStartTS - }) - return filtered, nil -} - -func (rc *Client) ReadStreamMetadata(ctx context.Context, s storage.ExternalStorage) (iter.TryNextor[*backuppb.Metadata], error) { - opt := &storage.WalkOption{SubDir: stream.GetStreamBackupMetaPrefix()} - names := []string{} - err := s.WalkDir(ctx, opt, func(path string, size int64) error { - if !strings.HasSuffix(path, ".meta") { - return nil - } - names = append(names, path) - return nil - }) + var err error + rc.logFileManager, err = CreateLogFileManager(ctx, init) if err != nil { - return nil, err - } - namesIter := iter.FromSlice(names) - readMeta := func(ctx context.Context, name string) (*backuppb.Metadata, error) { - f, err := s.ReadFile(ctx, name) - if err != nil { - return nil, err - } - meta, err := rc.helper.ParseToMetadata(f) - if err != nil { - return nil, err - } - return meta, nil - } - reader := iter.Transform(namesIter, readMeta, - iter.WithChunkSize[string, *backuppb.Metadata](512), - iter.WithConcurrency[string, *backuppb.Metadata](128)) - return reader, nil -} - -func (rc *Client) FilterDataFiles(ms iter.TryNextor[*backuppb.Metadata]) iter.TryNextor[*backuppb.DataFileInfo] { - return iter.FlatMap(ms, func(m *backuppb.Metadata) iter.TryNextor[*backuppb.DataFileInfo] { - return iter.FlatMap(iter.FromSlice(m.FileGroups), func(g *backuppb.DataFileGroup) iter.TryNextor[*backuppb.DataFileInfo] { - return iter.FilterOut(iter.FromSlice(g.DataFilesInfo), func(d *backuppb.DataFileInfo) bool { - // Modify the data internally, a little hacky. - if m.MetaVersion > backuppb.MetaVersion_V1 { - d.Path = g.Path - } - return d.IsMeta || rc.ShouldFilterOut(d) - }) - }) - }) -} - -// ShouldFilterOut checks whether a file should be filtered out via the current client. 
-func (rc *Client) ShouldFilterOut(d *backuppb.DataFileInfo) bool { - return d.MinTs > rc.restoreTS || - (d.Cf == stream.WriteCF && d.MaxTs < rc.startTS) || - (d.Cf == stream.DefaultCF && d.MaxTs < rc.shiftStartTS) -} - -type DDLMetaGroup struct { - Path string - FileMetas []*backuppb.DataFileInfo -} - -func (rc *Client) FilterMetaFiles(ms iter.TryNextor[*backuppb.Metadata]) iter.TryNextor[DDLMetaGroup] { - return iter.FlatMap(ms, func(m *backuppb.Metadata) iter.TryNextor[DDLMetaGroup] { - return iter.Map(iter.FromSlice(m.FileGroups), func(g *backuppb.DataFileGroup) DDLMetaGroup { - metas := iter.FilterOut(iter.FromSlice(g.DataFilesInfo), func(d *backuppb.DataFileInfo) bool { - // Modify the data internally, a little hacky. - if m.MetaVersion > backuppb.MetaVersion_V1 { - d.Path = g.Path - } - return !d.IsMeta || rc.ShouldFilterOut(d) - }) - return DDLMetaGroup{ - Path: g.Path, - // NOTE: the metas iterator is pure. No context or cancel needs. - FileMetas: iter.CollectAll(context.Background(), metas).Item, - } - }) - }) -} - -// ReadStreamDataFiles is used for streaming task. collect all meta file by TS. -func (rc *Client) ReadStreamDataFiles( - ctx context.Context, - metas []*backuppb.Metadata, -) (dataFiles, metaFiles []*backuppb.DataFileInfo, err error) { - dFiles := make([]*backuppb.DataFileInfo, 0) - mFiles := make([]*backuppb.DataFileInfo, 0) - - for _, m := range metas { - _, exists := backuppb.MetaVersion_name[int32(m.MetaVersion)] - if !exists { - log.Warn("metaversion too new", zap.Reflect("version id", m.MetaVersion)) - } - for _, ds := range m.FileGroups { - metaRef := 0 - for _, d := range ds.DataFilesInfo { - if d.MinTs > rc.restoreTS { - continue - } else if d.Cf == stream.WriteCF && d.MaxTs < rc.startTS { - continue - } else if d.Cf == stream.DefaultCF && d.MaxTs < rc.shiftStartTS { - continue - } - - // If ds.Path is empty, it is MetadataV1. - // Try to be compatible with newer metadata version - if m.MetaVersion > backuppb.MetaVersion_V1 { - d.Path = ds.Path - } - - if d.IsMeta { - mFiles = append(mFiles, d) - metaRef += 1 - } else { - dFiles = append(dFiles, d) - } - log.Debug("backup stream collect data partition", zap.Uint64("offset", d.RangeOffset), zap.Uint64("length", d.Length)) - } - // metadatav1 doesn't use cache - // Try to be compatible with newer metadata version - if m.MetaVersion > backuppb.MetaVersion_V1 { - rc.helper.InitCacheEntry(ds.Path, metaRef) - } - } + return err } - - return dFiles, mFiles, nil + return nil } // FixIndex tries to fix a single index. @@ -1964,7 +1775,7 @@ func (rc *Client) FixIndicesOfTable(ctx context.Context, schema string, table *m func (rc *Client) RestoreKVFiles( ctx context.Context, rules map[int64]*RewriteRules, - files iter.TryNextor[*backuppb.DataFileInfo], + files LogIter, updateStats func(kvCount uint64, size uint64), onProgress func(), ) error { @@ -2151,7 +1962,7 @@ func SortMetaKVFiles(files []*backuppb.DataFileInfo) []*backuppb.DataFileInfo { func (rc *Client) PrepareDDLFileCache( ctx context.Context, - files iter.TryNextor[DDLMetaGroup], + files MetaGroupIter, ) ([]*backuppb.DataFileInfo, error) { fs := iter.CollectAll(ctx, files) if fs.Err != nil { diff --git a/br/pkg/restore/log_client.go b/br/pkg/restore/log_client.go new file mode 100644 index 0000000000000..c322364338ae7 --- /dev/null +++ b/br/pkg/restore/log_client.go @@ -0,0 +1,266 @@ +// Copyright 2022 PingCAP, Inc. Licensed under Apache-2.0. 
+ +package restore + +import ( + "context" + "strings" + "sync" + + "github.com/pingcap/errors" + backuppb "github.com/pingcap/kvproto/pkg/brpb" + "github.com/pingcap/log" + "github.com/pingcap/tidb/br/pkg/storage" + "github.com/pingcap/tidb/br/pkg/stream" + "github.com/pingcap/tidb/br/pkg/utils/iter" + "go.uber.org/zap" +) + +const ( + readMetaConcurrency = 16 + readMetaBatchSize = 16 +) + +// MetaIter is the type of iterator of metadata files' content. +type MetaIter = iter.TryNextor[*backuppb.Metadata] + +// LogIter is the type of iterator of each log files' meta information. +type LogIter = iter.TryNextor[*backuppb.DataFileInfo] + +// MetaGroupIter is the iterator of flushes of metadata. +type MetaGroupIter = iter.TryNextor[DDLMetaGroup] + +// logFileManager is the manager for log files, which supports read / filter from the +// log backup archive. +type logFileManager struct { + // startTS and restoreTS are used for kv file restore. + // TiKV will filter the key space that don't belong to [startTS, restoreTS]. + startTS uint64 + restoreTS uint64 + + // If the commitTS of txn-entry belong to [startTS, restoreTS], + // the startTS of txn-entry may be smaller than startTS. + // We need maintain and restore more entries in default cf + // (the startTS in these entries belong to [shiftStartTS, startTS]). + shiftStartTS uint64 + + storage storage.ExternalStorage + helper *stream.MetadataHelper +} + +type LogFileManagerInit struct { + StartTS uint64 + RestoreTS uint64 + Storage storage.ExternalStorage +} + +func CreateLogFileManager(ctx context.Context, init LogFileManagerInit) (*logFileManager, error) { + fm := &logFileManager{ + startTS: init.StartTS, + restoreTS: init.RestoreTS, + storage: init.Storage, + helper: stream.NewMetadataHelper(), + } + err := fm.loadShiftTS(ctx) + if err != nil { + return nil, err + } + return fm, nil +} + +func (rc *logFileManager) ShiftTS() uint64 { + return rc.shiftStartTS +} + +func (rc *logFileManager) loadShiftTS(ctx context.Context) error { + shiftTS := struct { + sync.Mutex + value uint64 + exists bool + }{} + err := stream.FastUnmarshalMetaData(ctx, rc.storage, func(path string, raw []byte) error { + m, err := rc.helper.ParseToMetadata(raw) + if err != nil { + return err + } + shiftTS.Lock() + defer shiftTS.Unlock() + + ts, ok := UpdateShiftTS(m, rc.startTS, rc.restoreTS) + if ok && (!shiftTS.exists || shiftTS.value > ts) { + shiftTS.value = ts + shiftTS.exists = true + } + return nil + }) + if err != nil { + return err + } + if !shiftTS.exists { + rc.shiftStartTS = rc.startTS + return nil + } + rc.shiftStartTS = shiftTS.value + return nil +} + +func (rc *logFileManager) StreamingMetaByTS(ctx context.Context, restoreTS uint64) (MetaIter, error) { + it, err := rc.ReadStreamMetadata(ctx, rc.storage) + if err != nil { + return nil, err + } + filtered := iter.FilterOut(it, func(metadata *backuppb.Metadata) bool { + return restoreTS < metadata.MinTs || metadata.MaxTs < rc.shiftStartTS + }) + return filtered, nil +} + +func (rc *logFileManager) ReadStreamMetadata(ctx context.Context, s storage.ExternalStorage) (MetaIter, error) { + opt := &storage.WalkOption{SubDir: stream.GetStreamBackupMetaPrefix()} + names := []string{} + err := s.WalkDir(ctx, opt, func(path string, size int64) error { + if !strings.HasSuffix(path, ".meta") { + return nil + } + names = append(names, path) + return nil + }) + if err != nil { + return nil, err + } + namesIter := iter.FromSlice(names) + readMeta := func(ctx context.Context, name string) (*backuppb.Metadata, error) { + f, err 
:= s.ReadFile(ctx, name) + if err != nil { + return nil, err + } + meta, err := rc.helper.ParseToMetadata(f) + if err != nil { + return nil, err + } + return meta, nil + } + reader := iter.Transform(namesIter, readMeta, + iter.WithChunkSize(readMetaBatchSize), iter.WithConcurrency(readMetaConcurrency)) + return reader, nil +} + +func (rc *logFileManager) FilterDataFiles(ms MetaIter) LogIter { + return iter.FlatMap(ms, func(m *backuppb.Metadata) LogIter { + return iter.FlatMap(iter.FromSlice(m.FileGroups), func(g *backuppb.DataFileGroup) LogIter { + return iter.FilterOut(iter.FromSlice(g.DataFilesInfo), func(d *backuppb.DataFileInfo) bool { + // Modify the data internally, a little hacky. + if m.MetaVersion > backuppb.MetaVersion_V1 { + d.Path = g.Path + } + return d.IsMeta || rc.ShouldFilterOut(d) + }) + }) + }) +} + +// ShouldFilterOut checks whether a file should be filtered out via the current client. +func (rc *logFileManager) ShouldFilterOut(d *backuppb.DataFileInfo) bool { + return d.MinTs > rc.restoreTS || + (d.Cf == stream.WriteCF && d.MaxTs < rc.startTS) || + (d.Cf == stream.DefaultCF && d.MaxTs < rc.shiftStartTS) +} + +type DDLMetaGroup struct { + Path string + FileMetas []*backuppb.DataFileInfo +} + +func (rc *logFileManager) FilterMetaFiles(ms MetaIter) MetaGroupIter { + return iter.FlatMap(ms, func(m *backuppb.Metadata) MetaGroupIter { + return iter.Map(iter.FromSlice(m.FileGroups), func(g *backuppb.DataFileGroup) DDLMetaGroup { + metas := iter.FilterOut(iter.FromSlice(g.DataFilesInfo), func(d *backuppb.DataFileInfo) bool { + // Modify the data internally, a little hacky. + if m.MetaVersion > backuppb.MetaVersion_V1 { + d.Path = g.Path + } + return !d.IsMeta || rc.ShouldFilterOut(d) + }) + return DDLMetaGroup{ + Path: g.Path, + // NOTE: the metas iterator is pure. No context or cancel needs. + FileMetas: iter.CollectAll(context.Background(), metas).Item, + } + }) + }) +} + +// ReadStreamMetaByTS is used for streaming task. collect all meta file by TS. +func (rc *logFileManager) ReadStreamMetaByTS(ctx context.Context, shiftedStartTS uint64, restoreTS uint64) ([]*backuppb.Metadata, error) { + streamBackupMetaFiles := struct { + sync.Mutex + metas []*backuppb.Metadata + }{} + streamBackupMetaFiles.metas = make([]*backuppb.Metadata, 0, 128) + + err := stream.FastUnmarshalMetaData(ctx, rc.storage, func(path string, raw []byte) error { + metadata, err := rc.helper.ParseToMetadata(raw) + if err != nil { + return err + } + streamBackupMetaFiles.Lock() + if restoreTS >= metadata.MinTs && metadata.MaxTs >= shiftedStartTS { + streamBackupMetaFiles.metas = append(streamBackupMetaFiles.metas, metadata) + } + streamBackupMetaFiles.Unlock() + return nil + }) + if err != nil { + return nil, errors.Trace(err) + } + return streamBackupMetaFiles.metas, nil +} + +// ReadStreamDataFiles is used for streaming task. collect all meta file by TS. 
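Editor's illustration of the timestamp-range rules ShouldFilterOut encodes, written as a hypothetical in-package test; the timestamps and the test itself are invented and not part of this patch.

	func TestShouldFilterOutSketch(t *testing.T) {
		fm := &logFileManager{startTS: 10, restoreTS: 100, shiftStartTS: 5}
		// A default-CF file is kept as long as it ends at or after shiftStartTS
		// and starts at or before restoreTS.
		require.False(t, fm.ShouldFilterOut(&backuppb.DataFileInfo{Cf: stream.DefaultCF, MinTs: 3, MaxTs: 7}))
		// A write-CF file that ends before startTS is dropped.
		require.True(t, fm.ShouldFilterOut(&backuppb.DataFileInfo{Cf: stream.WriteCF, MinTs: 3, MaxTs: 7}))
		// Any file that starts after restoreTS is dropped, regardless of CF.
		require.True(t, fm.ShouldFilterOut(&backuppb.DataFileInfo{Cf: stream.DefaultCF, MinTs: 101, MaxTs: 120}))
	}
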
+func (rc *logFileManager) ReadStreamDataFiles( + ctx context.Context, + metas []*backuppb.Metadata, +) (dataFiles, metaFiles []*backuppb.DataFileInfo, err error) { + dFiles := make([]*backuppb.DataFileInfo, 0) + mFiles := make([]*backuppb.DataFileInfo, 0) + + for _, m := range metas { + _, exists := backuppb.MetaVersion_name[int32(m.MetaVersion)] + if !exists { + log.Warn("metaversion too new", zap.Reflect("version id", m.MetaVersion)) + } + for _, ds := range m.FileGroups { + metaRef := 0 + for _, d := range ds.DataFilesInfo { + if d.MinTs > rc.restoreTS { + continue + } else if d.Cf == stream.WriteCF && d.MaxTs < rc.startTS { + continue + } else if d.Cf == stream.DefaultCF && d.MaxTs < rc.shiftStartTS { + continue + } + + // If ds.Path is empty, it is MetadataV1. + // Try to be compatible with newer metadata version + if m.MetaVersion > backuppb.MetaVersion_V1 { + d.Path = ds.Path + } + + if d.IsMeta { + mFiles = append(mFiles, d) + metaRef += 1 + } else { + dFiles = append(dFiles, d) + } + log.Debug("backup stream collect data partition", zap.Uint64("offset", d.RangeOffset), zap.Uint64("length", d.Length)) + } + // metadatav1 doesn't use cache + // Try to be compatible with newer metadata version + if m.MetaVersion > backuppb.MetaVersion_V1 { + rc.helper.InitCacheEntry(ds.Path, metaRef) + } + } + } + + return dFiles, mFiles, nil +} diff --git a/br/pkg/restore/log_client_test.go b/br/pkg/restore/log_client_test.go index b6240819dad71..6b389e51b7114 100644 --- a/br/pkg/restore/log_client_test.go +++ b/br/pkg/restore/log_client_test.go @@ -11,6 +11,8 @@ import ( "math" "os" "path" + "sort" + "strings" "sync/atomic" "testing" @@ -19,6 +21,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/br/pkg/stream" + "github.com/pingcap/tidb/br/pkg/utils/iter" "github.com/stretchr/testify/require" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -26,8 +29,20 @@ import ( var id uint64 -// wd is the shortcut for making a fake data file from write CF. -func wd(start, end uint64, minBegin uint64) *backuppb.DataFileInfo { +func wm(start, end, minBegin uint64) *backuppb.DataFileInfo { + i := wr(start, end, minBegin) + i.IsMeta = true + return i +} + +func dm(start, end uint64) *backuppb.DataFileInfo { + i := dr(start, end) + i.IsMeta = true + return i +} + +// wr is the shortcut for making a fake data file from write CF. +func wr(start, end uint64, minBegin uint64) *backuppb.DataFileInfo { id := atomic.AddUint64(&id, 1) return &backuppb.DataFileInfo{ Path: fmt.Sprintf("default-%06d", id), @@ -38,8 +53,8 @@ func wd(start, end uint64, minBegin uint64) *backuppb.DataFileInfo { } } -// dd is the shortcut for making a fake data file from default CF. -func dd(start, end uint64) *backuppb.DataFileInfo { +// dr is the shortcut for making a fake data file from default CF. 
+func dr(start, end uint64) *backuppb.DataFileInfo { id := atomic.AddUint64(&id, 1) return &backuppb.DataFileInfo{ Path: fmt.Sprintf("write-%06d", id), @@ -76,12 +91,14 @@ func m2(files ...*backuppb.DataFileInfo) *backuppb.Metadata { MinTs: uint64(math.MaxUint64), MetaVersion: backuppb.MetaVersion_V2, } - fileGroups := &backuppb.DataFileGroup{} + fileGroups := &backuppb.DataFileGroup{ + MinTs: uint64(math.MaxUint64), + } for _, file := range files { - if meta.MaxTs < file.MaxTs { + if fileGroups.MaxTs < file.MaxTs { fileGroups.MaxTs = file.MaxTs } - if meta.MinTs > file.MinTs { + if fileGroups.MinTs > file.MinTs { fileGroups.MinTs = file.MinTs } fileGroups.DataFilesInfo = append(fileGroups.DataFilesInfo, file) @@ -151,9 +168,9 @@ func TestReadMetaBetweenTS(t *testing.T) { cases := []Case{ { items: []*backuppb.Metadata{ - m(wd(4, 10, 3), wd(5, 13, 5)), - m(dd(1, 3)), - m(wd(10, 42, 9), dd(6, 9)), + m(wr(4, 10, 3), wr(5, 13, 5)), + m(dr(1, 3)), + m(wr(10, 42, 9), dr(6, 9)), }, startTS: 4, endTS: 5, @@ -162,8 +179,8 @@ func TestReadMetaBetweenTS(t *testing.T) { }, { items: []*backuppb.Metadata{ - m(wd(1, 100, 1), wd(5, 13, 5), dd(1, 101)), - m(wd(100, 200, 98), dd(100, 200)), + m(wr(1, 100, 1), wr(5, 13, 5), dr(1, 101)), + m(wr(100, 200, 98), dr(100, 200)), }, startTS: 50, endTS: 99, @@ -172,9 +189,9 @@ func TestReadMetaBetweenTS(t *testing.T) { }, { items: []*backuppb.Metadata{ - m(wd(1, 100, 1), wd(5, 13, 5), dd(1, 101)), - m(wd(100, 200, 98), dd(100, 200)), - m(wd(200, 300, 200), dd(200, 300)), + m(wr(1, 100, 1), wr(5, 13, 5), dr(1, 101)), + m(wr(100, 200, 98), dr(100, 200)), + m(wr(200, 300, 200), dr(200, 300)), }, startTS: 150, endTS: 199, @@ -183,9 +200,9 @@ func TestReadMetaBetweenTS(t *testing.T) { }, { items: []*backuppb.Metadata{ - m(wd(1, 100, 1), wd(5, 13, 5)), - m(wd(101, 200, 101), dd(100, 200)), - m(wd(200, 300, 200), dd(200, 300)), + m(wr(1, 100, 1), wr(5, 13, 5)), + m(wr(101, 200, 101), dr(100, 200)), + m(wr(200, 300, 200), dr(200, 300)), }, startTS: 150, endTS: 199, @@ -206,14 +223,15 @@ func TestReadMetaBetweenTS(t *testing.T) { os.RemoveAll(temp) } }() - cli := Client{ - storage: loc, - helper: stream.NewMetadataHelper(), + init := LogFileManagerInit{ + StartTS: c.startTS, + RestoreTS: c.endTS, + Storage: loc, } - shift, err := cli.GetShiftTS(ctx, c.startTS, c.endTS) - req.Equal(shift, c.expectedShiftTS) + cli, err := CreateLogFileManager(ctx, init) + req.Equal(cli.ShiftTS(), c.expectedShiftTS) req.NoError(err) - metas, err := cli.ReadStreamMetaByTS(ctx, shift, c.endTS) + metas, err := cli.ReadStreamMetaByTS(ctx, cli.ShiftTS(), c.endTS) req.NoError(err) actualStoreIDs := make([]int64, 0, len(metas)) for _, meta := range metas { @@ -246,9 +264,9 @@ func TestReadMetaBetweenTSV2(t *testing.T) { cases := []Case{ { items: []*backuppb.Metadata{ - m2(wd(4, 10, 3), wd(5, 13, 5)), - m2(dd(1, 3)), - m2(wd(10, 42, 9), dd(6, 9)), + m2(wr(4, 10, 3), wr(5, 13, 5)), + m2(dr(1, 3)), + m2(wr(10, 42, 9), dr(6, 9)), }, startTS: 4, endTS: 5, @@ -257,8 +275,8 @@ func TestReadMetaBetweenTSV2(t *testing.T) { }, { items: []*backuppb.Metadata{ - m2(wd(1, 100, 1), wd(5, 13, 5), dd(1, 101)), - m2(wd(100, 200, 98), dd(100, 200)), + m2(wr(1, 100, 1), wr(5, 13, 5), dr(1, 101)), + m2(wr(100, 200, 98), dr(100, 200)), }, startTS: 50, endTS: 99, @@ -267,9 +285,9 @@ func TestReadMetaBetweenTSV2(t *testing.T) { }, { items: []*backuppb.Metadata{ - m2(wd(1, 100, 1), wd(5, 13, 5), dd(1, 101)), - m2(wd(100, 200, 98), dd(100, 200)), - m2(wd(200, 300, 200), dd(200, 300)), + m2(wr(1, 100, 1), wr(5, 13, 5), dr(1, 
101)), + m2(wr(100, 200, 98), dr(100, 200)), + m2(wr(200, 300, 200), dr(200, 300)), }, startTS: 150, endTS: 199, @@ -278,9 +296,9 @@ func TestReadMetaBetweenTSV2(t *testing.T) { }, { items: []*backuppb.Metadata{ - m2(wd(1, 100, 1), wd(5, 13, 5)), - m2(wd(101, 200, 101), dd(100, 200)), - m2(wd(200, 300, 200), dd(200, 300)), + m2(wr(1, 100, 1), wr(5, 13, 5)), + m2(wr(101, 200, 101), dr(100, 200)), + m2(wr(200, 300, 200), dr(200, 300)), }, startTS: 150, endTS: 199, @@ -301,14 +319,15 @@ func TestReadMetaBetweenTSV2(t *testing.T) { os.RemoveAll(temp) } }() - cli := Client{ - storage: loc, - helper: stream.NewMetadataHelper(), + init := LogFileManagerInit{ + StartTS: c.startTS, + RestoreTS: c.endTS, + Storage: loc, } - shift, err := cli.GetShiftTS(ctx, c.startTS, c.endTS) - req.Equal(shift, c.expectedShiftTS) + cli, err := CreateLogFileManager(ctx, init) + req.Equal(cli.ShiftTS(), c.expectedShiftTS) req.NoError(err) - metas, err := cli.ReadStreamMetaByTS(ctx, shift, c.endTS) + metas, err := cli.ReadStreamMetaByTS(ctx, cli.ShiftTS(), c.endTS) req.NoError(err) actualStoreIDs := make([]int64, 0, len(metas)) for _, meta := range metas { @@ -338,17 +357,17 @@ func TestReadFromMetadata(t *testing.T) { cases := []Case{ { items: []*backuppb.Metadata{ - m(wd(4, 10, 3), wd(5, 13, 5)), - m(dd(1, 3)), - m(wd(10, 42, 9), dd(6, 9)), + m(wr(4, 10, 3), wr(5, 13, 5)), + m(dr(1, 3)), + m(wr(10, 42, 9), dr(6, 9)), }, untilTS: 10, expected: []int{0, 1, 2}, }, { items: []*backuppb.Metadata{ - m(wd(1, 100, 1), wd(5, 13, 5), dd(1, 101)), - m(wd(100, 200, 98), dd(100, 200)), + m(wr(1, 100, 1), wr(5, 13, 5), dr(1, 101)), + m(wr(100, 200, 98), dr(100, 200)), }, untilTS: 99, expected: []int{0}, @@ -404,17 +423,17 @@ func TestReadFromMetadataV2(t *testing.T) { cases := []Case{ { items: []*backuppb.Metadata{ - m2(wd(4, 10, 3), wd(5, 13, 5)), - m2(dd(1, 3)), - m2(wd(10, 42, 9), dd(6, 9)), + m2(wr(4, 10, 3), wr(5, 13, 5)), + m2(dr(1, 3)), + m2(wr(10, 42, 9), dr(6, 9)), }, untilTS: 10, expected: []int{0, 1, 2}, }, { items: []*backuppb.Metadata{ - m2(wd(1, 100, 1), wd(5, 13, 5), dd(1, 101)), - m2(wd(100, 200, 98), dd(100, 200)), + m2(wr(1, 100, 1), wr(5, 13, 5), dr(1, 101)), + m2(wr(100, 200, 98), dr(100, 200)), }, untilTS: 99, expected: []int{0}, @@ -459,3 +478,119 @@ func TestReadFromMetadataV2(t *testing.T) { }) } } + +func dataFileInfoMatches(t *testing.T, listA []*backuppb.DataFileInfo, listB ...*backuppb.DataFileInfo) { + sortL := func(l []*backuppb.DataFileInfo) { + sort.Slice(l, func(i, j int) bool { + return l[i].MinTs < l[j].MinTs + }) + } + + sortL(listA) + sortL(listB) + + if len(listA) != len(listB) { + t.Fatalf("failed: list length not match: %s vs %s", formatL(listA), formatL(listB)) + } + + for i := range listA { + require.True(t, equals(listA[i], listB[i]), "remaining: %s vs %s", formatL(listA[i:]), formatL(listB[i:])) + } +} + +func equals(a, b *backuppb.DataFileInfo) bool { + return a.IsMeta == b.IsMeta && + a.MinTs == b.MinTs && + a.MaxTs == b.MaxTs && + a.Cf == b.Cf && + a.MinBeginTsInDefaultCf == b.MinBeginTsInDefaultCf +} + +func formatI(i *backuppb.DataFileInfo) string { + ty := "d" + if i.Cf == "write" { + ty = "w" + } + isMeta := "r" + if i.IsMeta { + isMeta = "m" + } + shift := "" + if i.MinBeginTsInDefaultCf > 0 { + shift = fmt.Sprintf(", %d", i.MinBeginTsInDefaultCf) + } + + return fmt.Sprintf("%s%s(%d, %d%s)", ty, isMeta, i.MinTs, i.MaxTs, shift) +} + +func formatL(l []*backuppb.DataFileInfo) string { + r := iter.CollectAll(context.TODO(), iter.Map(iter.FromSlice(l), formatI)) + return "[" + 
strings.Join(r.Item, ", ") + "]" +} + +func TestFileManager(t *testing.T) { + type Case struct { + Metadata []*backuppb.Metadata + StartTS int + RestoreTS int + + SearchMeta bool + Requires []*backuppb.DataFileInfo + } + + cases := []Case{ + { + Metadata: []*backuppb.Metadata{ + m2(wm(5, 10, 1), dm(1, 8), dr(2, 6), wr(4, 5, 2)), + m2(wr(50, 54, 42), dr(42, 50), wr(70, 78, 0)), + m2(dr(100, 101), wr(102, 104, 100)), + }, + StartTS: 2, + RestoreTS: 60, + Requires: []*backuppb.DataFileInfo{ + dr(2, 6), wr(4, 5, 2), wr(50, 54, 42), dr(42, 50), + }, + }, + } + + run := func(t *testing.T, c Case) { + req := require.New(t) + items := c.Metadata + start := uint64(c.StartTS) + end := uint64(c.RestoreTS) + loc, temp := (&mockMetaBuilder{ + metas: items, + }).b(true) + defer func() { + t.Log("temp dir", temp) + if !t.Failed() { + os.RemoveAll(temp) + } + }() + ctx := context.Background() + fm, err := CreateLogFileManager(ctx, LogFileManagerInit{ + StartTS: start, + RestoreTS: end, + Storage: loc, + }) + req.NoError(err) + + metas, err := fm.StreamingMetaByTS(ctx, end) + req.NoError(err) + metas = iter.Tap(metas, func(m *backuppb.Metadata) { + fmt.Printf("%d-%d:%s", m.MinTs, m.MaxTs, formatL(m.FileGroups[0].DataFilesInfo)) + }) + var datas LogIter + if !c.SearchMeta { + datas = fm.FilterDataFiles(metas) + } else { + datas = iter.FlatMap(fm.FilterMetaFiles(metas), func(g DDLMetaGroup) iter.TryNextor[*backuppb.DataFileInfo] { return iter.FromSlice(g.FileMetas) }) + } + r := iter.CollectAll(ctx, datas) + dataFileInfoMatches(t, r.Item, c.Requires...) + } + + for i, c := range cases { + t.Run(fmt.Sprintf("#%d", i), func(t *testing.T) { run(t, c) }) + } +} diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index 1a232935fdd40..a092b6fd656a4 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -1118,18 +1118,17 @@ func restoreStream( // mode or emptied schedulers defer restorePostWork(ctx, client, restoreSchedulers) - shiftStartTS, err := client.GetShiftTS(ctx, cfg.StartTS, cfg.RestoreTS) + err = client.InstallLogFileManager(ctx, cfg.StartTS, cfg.RestoreTS) if err != nil { - return errors.Annotate(err, "failed to get shift TS") + return err } // read meta by given ts. 
- metas, err := client.StreamingMetaByTS(ctx, shiftStartTS, cfg.RestoreTS) + metas, err := client.StreamingMetaByTS(ctx, cfg.RestoreTS) if err != nil { return errors.Trace(err) } - client.SetRestoreRangeTS(cfg.StartTS, cfg.RestoreTS, shiftStartTS) dataFileCount := 0 metas = iter.Tap(metas, func(m *backuppb.Metadata) { for _, fg := range m.FileGroups { @@ -1195,7 +1194,7 @@ func restoreStream( } updateRewriteRules(rewriteRules, schemasReplace) - metas, err = client.StreamingMetaByTS(ctx, shiftStartTS, cfg.RestoreTS) + metas, err = client.StreamingMetaByTS(ctx, cfg.RestoreTS) pd := g.StartProgress(ctx, "Restore KV Files", int64(dataFileCount), !cfg.LogProgress) dmlFiles := client.FilterDataFiles(metas) err = withProgress(pd, func(p glue.Progress) error { @@ -1284,8 +1283,6 @@ func createRestoreClient(ctx context.Context, g glue.Glue, cfg *RestoreConfig, m return nil, errors.Trace(err) } - client.InitMetadataHelper() - return client, nil } diff --git a/br/pkg/utils/iter/combinator_test.go b/br/pkg/utils/iter/combinator_test.go index cbb42d8fb4183..16ff904cd56e9 100644 --- a/br/pkg/utils/iter/combinator_test.go +++ b/br/pkg/utils/iter/combinator_test.go @@ -19,7 +19,7 @@ func TestParTrans(t *testing.T) { case <-time.After(100 * time.Millisecond): } return i + 100, nil - }, iter.WithChunkSize[int, int](128), iter.WithConcurrency[int, int](64)) + }, iter.WithChunkSize(128), iter.WithConcurrency(64)) cx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() r := iter.CollectAll(cx, mapped) @@ -57,6 +57,16 @@ func TestCollect(t *testing.T) { require.Equal(t, coll.Item, iter.CollectAll(ctx, iter.OfRange(0, 10)).Item) } +func TestTapping(t *testing.T) { + items := iter.OfRange(0, 101) + ctx := context.Background() + n := 0 + + items = iter.Tap(items, func(i int) { n += i }) + iter.CollectAll(ctx, items) + require.Equal(t, 5050, n) +} + func TestSome(t *testing.T) { req := require.New(t) it := iter.OfRange(0, 2) diff --git a/br/pkg/utils/iter/combinators.go b/br/pkg/utils/iter/combinators.go index 83a553fbf4a9b..48e8c49e623a4 100644 --- a/br/pkg/utils/iter/combinators.go +++ b/br/pkg/utils/iter/combinators.go @@ -7,11 +7,15 @@ import ( "golang.org/x/sync/errgroup" ) -type chunkMapping[T, R any] struct { - inner TryNextor[T] - mapper func(context.Context, T) (R, error) +type chunkMappingCfg struct { chunkSize uint quota *utils.WorkerPool +} + +type chunkMapping[T, R any] struct { + chunkMappingCfg + inner TryNextor[T] + mapper func(context.Context, T) (R, error) buffer fromSlice[R] } @@ -55,16 +59,16 @@ func (m *chunkMapping[T, R]) TryNext(ctx context.Context) IterResult[R] { return DoneBy[R](r2) } -type TransformConfig[T, R any] func(*chunkMapping[T, R]) +type TransformConfig func(*chunkMappingCfg) -func WithConcurrency[T, R any](n uint) TransformConfig[T, R] { - return func(c *chunkMapping[T, R]) { +func WithConcurrency(n uint) TransformConfig { + return func(c *chunkMappingCfg) { c.quota = utils.NewWorkerPool(n, "transforming") } } -func WithChunkSize[T, R any](n uint) TransformConfig[T, R] { - return func(c *chunkMapping[T, R]) { +func WithChunkSize(n uint) TransformConfig { + return func(c *chunkMappingCfg) { c.chunkSize = n } } @@ -73,14 +77,16 @@ func WithChunkSize[T, R any](n uint) TransformConfig[T, R] { // then emitting the result of that procedure. // The execution of that procedure can be paralleled with the config `WithConcurrency`. // You may also need to config the `WithChunkSize`, because the concurrent execution is only available intra-batch. 
-func Transform[T, R any](it TryNextor[T], with func(context.Context, T) (R, error), cs ...TransformConfig[T, R]) TryNextor[R] { +func Transform[T, R any](it TryNextor[T], with func(context.Context, T) (R, error), cs ...TransformConfig) TryNextor[R] { r := &chunkMapping[T, R]{ - inner: it, - mapper: with, - chunkSize: 1, + inner: it, + mapper: with, + chunkMappingCfg: chunkMappingCfg{ + chunkSize: 1, + }, } for _, c := range cs { - c(r) + c(&r.chunkMappingCfg) } if r.quota == nil { r.quota = utils.NewWorkerPool(r.chunkSize, "max-concurrency") From 4a9c9475ee98effe34670b24a1666ef537667d35 Mon Sep 17 00:00:00 2001 From: Yu Juncen Date: Tue, 11 Oct 2022 10:28:43 +0800 Subject: [PATCH 3/8] fix config Signed-off-by: Yu Juncen --- br/pkg/restore/log_client.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/br/pkg/restore/log_client.go b/br/pkg/restore/log_client.go index c322364338ae7..ce80c8ee42da7 100644 --- a/br/pkg/restore/log_client.go +++ b/br/pkg/restore/log_client.go @@ -17,8 +17,8 @@ import ( ) const ( - readMetaConcurrency = 16 - readMetaBatchSize = 16 + readMetaConcurrency = 128 + readMetaBatchSize = 512 ) // MetaIter is the type of iterator of metadata files' content. From a15832435b5259cec194d9682bf7eba93d5e781e Mon Sep 17 00:00:00 2001 From: Yu Juncen Date: Tue, 11 Oct 2022 11:46:47 +0800 Subject: [PATCH 4/8] reorg file structure Signed-off-by: Yu Juncen --- br/pkg/utils/iter/combinator_test.go | 2 + br/pkg/utils/iter/combinator_types.go | 133 +++++++++++++++++++++++++ br/pkg/utils/iter/combinators.go | 135 +++----------------------- br/pkg/utils/iter/iter.go | 2 + br/pkg/utils/iter/source.go | 49 +--------- br/pkg/utils/iter/source_types.go | 52 ++++++++++ 6 files changed, 204 insertions(+), 169 deletions(-) create mode 100644 br/pkg/utils/iter/combinator_types.go create mode 100644 br/pkg/utils/iter/source_types.go diff --git a/br/pkg/utils/iter/combinator_test.go b/br/pkg/utils/iter/combinator_test.go index 16ff904cd56e9..e61223df5a001 100644 --- a/br/pkg/utils/iter/combinator_test.go +++ b/br/pkg/utils/iter/combinator_test.go @@ -1,3 +1,5 @@ +// Copyright 2022 PingCAP, Inc. Licensed under Apache-2.0. + package iter_test import ( diff --git a/br/pkg/utils/iter/combinator_types.go b/br/pkg/utils/iter/combinator_types.go new file mode 100644 index 0000000000000..a3a43467b6e26 --- /dev/null +++ b/br/pkg/utils/iter/combinator_types.go @@ -0,0 +1,133 @@ +// Copyright 2022 PingCAP, Inc. Licensed under Apache-2.0. 
+ +package iter + +import ( + "context" + + "github.com/pingcap/tidb/br/pkg/utils" + "golang.org/x/sync/errgroup" +) + +type chunkMappingCfg struct { + chunkSize uint + quota *utils.WorkerPool +} + +type chunkMapping[T, R any] struct { + chunkMappingCfg + inner TryNextor[T] + mapper func(context.Context, T) (R, error) + + buffer fromSlice[R] +} + +func (m *chunkMapping[T, R]) fillChunk(ctx context.Context) IterResult[fromSlice[R]] { + eg, cx := errgroup.WithContext(ctx) + s := CollectMany(ctx, m.inner, m.chunkSize) + if s.FinishedOrError() { + return DoneBy[fromSlice[R]](s) + } + r := make([]R, len(s.Item)) + for i := 0; i < len(s.Item); i++ { + i := i + m.quota.ApplyOnErrorGroup(eg, func() error { + var err error + r[i], err = m.mapper(cx, s.Item[i]) + return err + }) + } + if err := eg.Wait(); err != nil { + return Throw[fromSlice[R]](err) + } + if len(r) > 0 { + return Emit(fromSlice[R](r)) + } + return Done[fromSlice[R]]() +} + +func (m *chunkMapping[T, R]) TryNext(ctx context.Context) IterResult[R] { + r := m.buffer.TryNext(ctx) + if !r.FinishedOrError() { + return Emit(r.Item) + } + + r2 := m.fillChunk(ctx) + if !r2.FinishedOrError() { + m.buffer = r2.Item + return m.TryNext(ctx) + } + + return DoneBy[R](r2) +} + +type filter[T any] struct { + inner TryNextor[T] + filterOutIf func(T) bool +} + +func (f filter[T]) TryNext(ctx context.Context) IterResult[T] { + r := f.inner.TryNext(ctx) + if r.Err != nil || r.Finished { + return r + } + + if f.filterOutIf(r.Item) { + return f.TryNext(ctx) + } + + return r +} + +type take[T any] struct { + n uint + inner TryNextor[T] +} + +func (t *take[T]) TryNext(ctx context.Context) IterResult[T] { + if t.n == 0 { + return Done[T]() + } + + t.n-- + return t.inner.TryNext(ctx) +} + +type join[T any] struct { + inner TryNextor[TryNextor[T]] + + current TryNextor[T] +} + +type pureMap[T, R any] struct { + inner TryNextor[T] + + mapper func(T) R +} + +func (p pureMap[T, R]) TryNext(ctx context.Context) IterResult[R] { + r := p.inner.TryNext(ctx) + + if r.FinishedOrError() { + return DoneBy[R](r) + } + return Emit(p.mapper(r.Item)) +} + +func (j *join[T]) TryNext(ctx context.Context) IterResult[T] { + r := j.current.TryNext(ctx) + if r.Err != nil { + j.inner = empty[TryNextor[T]]{} + return r + } + if !r.Finished { + return r + } + + nr := j.inner.TryNext(ctx) + if nr.FinishedOrError() { + return DoneBy[T](nr) + } + j.current = nr.Item + return j.TryNext(ctx) +} diff --git a/br/pkg/utils/iter/combinators.go b/br/pkg/utils/iter/combinators.go index 48e8c49e623a4..e852add07d7a4 100644 --- a/br/pkg/utils/iter/combinators.go +++ b/br/pkg/utils/iter/combinators.go @@ -1,64 +1,14 @@ +// Copyright 2022 PingCAP, Inc. Licensed under Apache-2.0. 
+ package iter import ( "context" "github.com/pingcap/tidb/br/pkg/utils" - "golang.org/x/sync/errgroup" ) -type chunkMappingCfg struct { - chunkSize uint - quota *utils.WorkerPool -} - -type chunkMapping[T, R any] struct { - chunkMappingCfg - inner TryNextor[T] - mapper func(context.Context, T) (R, error) - - buffer fromSlice[R] -} - -func (m *chunkMapping[T, R]) fillChunk(ctx context.Context) IterResult[fromSlice[R]] { - eg, cx := errgroup.WithContext(ctx) - s := CollectMany(ctx, m.inner, m.chunkSize) - if s.FinishedOrError() { - return DoneBy[fromSlice[R]](s) - } - r := make([]R, len(s.Item)) - for i := 0; i < len(s.Item); i++ { - i := i - m.quota.ApplyOnErrorGroup(eg, func() error { - var err error - r[i], err = m.mapper(cx, s.Item[i]) - return err - }) - } - if err := eg.Wait(); err != nil { - return Throw[fromSlice[R]](err) - } - if len(r) > 0 { - return Emit(fromSlice[R](r)) - } - return Done[fromSlice[R]]() -} - -func (m *chunkMapping[T, R]) TryNext(ctx context.Context) IterResult[R] { - r := m.buffer.TryNext(ctx) - if !r.FinishedOrError() { - return Emit(r.Item) - } - - r2 := m.fillChunk(ctx) - if !r2.FinishedOrError() { - m.buffer = r2.Item - return m.TryNext(ctx) - } - - return DoneBy[R](r2) -} - +// TransformConfig is the config for the combinator "transform". type TransformConfig func(*chunkMappingCfg) func WithConcurrency(n uint) TransformConfig { @@ -97,24 +47,8 @@ func Transform[T, R any](it TryNextor[T], with func(context.Context, T) (R, erro return r } -type filter[T any] struct { - inner TryNextor[T] - filterOutIf func(T) bool -} - -func (f filter[T]) TryNext(ctx context.Context) IterResult[T] { - r := f.inner.TryNext(ctx) - if r.Err != nil || r.Finished { - return r - } - - if f.filterOutIf(r.Item) { - return f.TryNext(ctx) - } - - return r -} - +// FilterOut returns an iterator that yields all elements the original iterator +// generated and DOESN'T satisfies the predicate. func FilterOut[T any](it TryNextor[T], f func(T) bool) TryNextor[T] { return filter[T]{ inner: it, @@ -122,20 +56,7 @@ func FilterOut[T any](it TryNextor[T], f func(T) bool) TryNextor[T] { } } -type take[T any] struct { - n uint - inner TryNextor[T] -} - -func (t *take[T]) TryNext(ctx context.Context) IterResult[T] { - if t.n == 0 { - return Done[T]() - } - - t.n-- - return t.inner.TryNext(ctx) -} - +// TakeFirst takes the first n elements of the iterator. func TakeFirst[T any](inner TryNextor[T], n uint) TryNextor[T] { return &take[T]{ n: n, @@ -143,45 +64,8 @@ func TakeFirst[T any](inner TryNextor[T], n uint) TryNextor[T] { } } -type join[T any] struct { - inner TryNextor[TryNextor[T]] - - current TryNextor[T] -} - -type pureMap[T, R any] struct { - inner TryNextor[T] - - mapper func(T) R -} - -func (p pureMap[T, R]) TryNext(ctx context.Context) IterResult[R] { - r := p.inner.TryNext(ctx) - - if r.FinishedOrError() { - return DoneBy[R](r) - } - return Emit(p.mapper(r.Item)) -} - -func (j *join[T]) TryNext(ctx context.Context) IterResult[T] { - r := j.current.TryNext(ctx) - if r.Err != nil { - j.inner = empty[TryNextor[T]]{} - return r - } - if !r.Finished { - return r - } - - nr := j.inner.TryNext(ctx) - if nr.FinishedOrError() { - return DoneBy[T](nr) - } - j.current = nr.Item - return j.TryNext(ctx) -} - +// FlapMap applies the mapper over every elements the origin iterator generates, +// then flatten them. 
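Editor's sketch of FlatMap and ConcatAll in action, with arbitrary values; it assumes the iter, context and fmt imports and is not part of the patch.

	// FlatMap maps each element to an iterator and flattens the results.
	nested := iter.FromSlice([][]int{{1, 2}, {3}, {}})
	flat := iter.FlatMap(nested, func(xs []int) iter.TryNextor[int] { return iter.FromSlice(xs) })
	fmt.Println(iter.CollectAll(context.Background(), flat).Item) // [1 2 3]

	// ConcatAll chains several iterators back to back.
	chained := iter.ConcatAll(iter.OfRange(0, 2), iter.OfRange(10, 12))
	fmt.Println(iter.CollectAll(context.Background(), chained).Item) // [0 1 10 11]
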
func FlatMap[T, R any](it TryNextor[T], mapper func(T) TryNextor[R]) TryNextor[R] { return &join[R]{ inner: pureMap[T, TryNextor[R]]{ @@ -192,6 +76,7 @@ func FlatMap[T, R any](it TryNextor[T], mapper func(T) TryNextor[R]) TryNextor[R } } +// Map applies the mapper over every elements the origin iterator yields. func Map[T, R any](it TryNextor[T], mapper func(T) R) TryNextor[R] { return pureMap[T, R]{ inner: it, @@ -199,6 +84,8 @@ func Map[T, R any](it TryNextor[T], mapper func(T) R) TryNextor[R] { } } +// ConcatAll concatenates all elements yields by the iterators. +// In another word, it 'chains' all the input iterators. func ConcatAll[T any](items ...TryNextor[T]) TryNextor[T] { return &join[T]{ inner: FromSlice(items), diff --git a/br/pkg/utils/iter/iter.go b/br/pkg/utils/iter/iter.go index 60b667b57f2b1..069b37c4f369e 100644 --- a/br/pkg/utils/iter/iter.go +++ b/br/pkg/utils/iter/iter.go @@ -1,3 +1,5 @@ +// Copyright 2022 PingCAP, Inc. Licensed under Apache-2.0. + package iter import ( diff --git a/br/pkg/utils/iter/source.go b/br/pkg/utils/iter/source.go index e9db21ea491a3..f2ea2fd8fb173 100644 --- a/br/pkg/utils/iter/source.go +++ b/br/pkg/utils/iter/source.go @@ -1,46 +1,18 @@ +// Copyright 2022 PingCAP, Inc. Licensed under Apache-2.0. + package iter import ( - "context" - "golang.org/x/exp/constraints" ) -type fromSlice[T any] []T - -func (s *fromSlice[T]) TryNext(ctx context.Context) IterResult[T] { - if s == nil || len(*s) == 0 { - return Done[T]() - } - - var item T - item, *s = (*s)[0], (*s)[1:] - return Emit(item) -} - // FromSlice creates an iterator from a slice, the iterator would func FromSlice[T any](s []T) TryNextor[T] { sa := fromSlice[T](s) return &sa } -type ofRange[T constraints.Integer] struct { - end T - endExclusive bool - - current T -} - -func (r *ofRange[T]) TryNext(ctx context.Context) IterResult[T] { - if r.current > r.end || (r.current == r.end && r.endExclusive) { - return Done[T]() - } - - result := Emit(r.current) - r.current++ - return result -} - +// OfRange creates an iterator that yields elements in the integer range. func OfRange[T constraints.Integer](begin, end T) TryNextor[T] { return &ofRange[T]{ end: end, @@ -50,20 +22,7 @@ func OfRange[T constraints.Integer](begin, end T) TryNextor[T] { } } -type empty[T any] struct{} - -func (empty[T]) TryNext(ctx context.Context) IterResult[T] { - return Done[T]() -} - -type failure[T any] struct { - error -} - -func (f failure[T]) TryNext(ctx context.Context) IterResult[T] { - return Throw[T](f) -} - +// Fail creates an iterator always fail. func Fail[T any](err error) TryNextor[T] { return failure[T]{error: err} } diff --git a/br/pkg/utils/iter/source_types.go b/br/pkg/utils/iter/source_types.go new file mode 100644 index 0000000000000..41e9810de5286 --- /dev/null +++ b/br/pkg/utils/iter/source_types.go @@ -0,0 +1,52 @@ +// Copyright 2022 PingCAP, Inc. Licensed under Apache-2.0. 
+ +package iter + +import ( + "context" + + "golang.org/x/exp/constraints" +) + +type fromSlice[T any] []T + +func (s *fromSlice[T]) TryNext(ctx context.Context) IterResult[T] { + if s == nil || len(*s) == 0 { + return Done[T]() + } + + var item T + item, *s = (*s)[0], (*s)[1:] + return Emit(item) +} + +type ofRange[T constraints.Integer] struct { + end T + endExclusive bool + + current T +} + +func (r *ofRange[T]) TryNext(ctx context.Context) IterResult[T] { + if r.current > r.end || (r.current == r.end && r.endExclusive) { + return Done[T]() + } + + result := Emit(r.current) + r.current++ + return result +} + +type empty[T any] struct{} + +func (empty[T]) TryNext(ctx context.Context) IterResult[T] { + return Done[T]() +} + +type failure[T any] struct { + error +} + +func (f failure[T]) TryNext(ctx context.Context) IterResult[T] { + return Throw[T](f) +} From 0d49a588f00c5de4722da6afbf9259c1efc70809 Mon Sep 17 00:00:00 2001 From: Yu Juncen Date: Tue, 11 Oct 2022 16:10:40 +0800 Subject: [PATCH 5/8] removed dead code Signed-off-by: Yu Juncen --- br/pkg/restore/log_client.go | 78 +++---------------------------- br/pkg/restore/log_client_test.go | 17 ++++++- 2 files changed, 22 insertions(+), 73 deletions(-) diff --git a/br/pkg/restore/log_client.go b/br/pkg/restore/log_client.go index ce80c8ee42da7..03ee52a19f127 100644 --- a/br/pkg/restore/log_client.go +++ b/br/pkg/restore/log_client.go @@ -9,11 +9,9 @@ import ( "github.com/pingcap/errors" backuppb "github.com/pingcap/kvproto/pkg/brpb" - "github.com/pingcap/log" "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/br/pkg/stream" "github.com/pingcap/tidb/br/pkg/utils/iter" - "go.uber.org/zap" ) const ( @@ -191,76 +189,14 @@ func (rc *logFileManager) FilterMetaFiles(ms MetaIter) MetaGroupIter { } // ReadStreamMetaByTS is used for streaming task. collect all meta file by TS. -func (rc *logFileManager) ReadStreamMetaByTS(ctx context.Context, shiftedStartTS uint64, restoreTS uint64) ([]*backuppb.Metadata, error) { - streamBackupMetaFiles := struct { - sync.Mutex - metas []*backuppb.Metadata - }{} - streamBackupMetaFiles.metas = make([]*backuppb.Metadata, 0, 128) - - err := stream.FastUnmarshalMetaData(ctx, rc.storage, func(path string, raw []byte) error { - metadata, err := rc.helper.ParseToMetadata(raw) - if err != nil { - return err - } - streamBackupMetaFiles.Lock() - if restoreTS >= metadata.MinTs && metadata.MaxTs >= shiftedStartTS { - streamBackupMetaFiles.metas = append(streamBackupMetaFiles.metas, metadata) - } - streamBackupMetaFiles.Unlock() - return nil - }) +func (rc *logFileManager) ReadStreamMetaByTS(ctx context.Context, restoreTS uint64) ([]*backuppb.Metadata, error) { + metas, err := rc.StreamingMetaByTS(ctx, restoreTS) if err != nil { - return nil, errors.Trace(err) + return nil, err } - return streamBackupMetaFiles.metas, nil -} - -// ReadStreamDataFiles is used for streaming task. collect all meta file by TS. 
-func (rc *logFileManager) ReadStreamDataFiles( - ctx context.Context, - metas []*backuppb.Metadata, -) (dataFiles, metaFiles []*backuppb.DataFileInfo, err error) { - dFiles := make([]*backuppb.DataFileInfo, 0) - mFiles := make([]*backuppb.DataFileInfo, 0) - - for _, m := range metas { - _, exists := backuppb.MetaVersion_name[int32(m.MetaVersion)] - if !exists { - log.Warn("metaversion too new", zap.Reflect("version id", m.MetaVersion)) - } - for _, ds := range m.FileGroups { - metaRef := 0 - for _, d := range ds.DataFilesInfo { - if d.MinTs > rc.restoreTS { - continue - } else if d.Cf == stream.WriteCF && d.MaxTs < rc.startTS { - continue - } else if d.Cf == stream.DefaultCF && d.MaxTs < rc.shiftStartTS { - continue - } - - // If ds.Path is empty, it is MetadataV1. - // Try to be compatible with newer metadata version - if m.MetaVersion > backuppb.MetaVersion_V1 { - d.Path = ds.Path - } - - if d.IsMeta { - mFiles = append(mFiles, d) - metaRef += 1 - } else { - dFiles = append(dFiles, d) - } - log.Debug("backup stream collect data partition", zap.Uint64("offset", d.RangeOffset), zap.Uint64("length", d.Length)) - } - // metadatav1 doesn't use cache - // Try to be compatible with newer metadata version - if m.MetaVersion > backuppb.MetaVersion_V1 { - rc.helper.InitCacheEntry(ds.Path, metaRef) - } - } + r := iter.CollectAll(ctx, metas) + if r.Err != nil { + return nil, errors.Trace(r.Err) } - - return dFiles, mFiles, nil + return r.Item, nil } diff --git a/br/pkg/restore/log_client_test.go b/br/pkg/restore/log_client_test.go index 6b389e51b7114..8ae5af6d15a49 100644 --- a/br/pkg/restore/log_client_test.go +++ b/br/pkg/restore/log_client_test.go @@ -231,7 +231,7 @@ func TestReadMetaBetweenTS(t *testing.T) { cli, err := CreateLogFileManager(ctx, init) req.Equal(cli.ShiftTS(), c.expectedShiftTS) req.NoError(err) - metas, err := cli.ReadStreamMetaByTS(ctx, cli.ShiftTS(), c.endTS) + metas, err := cli.ReadStreamMetaByTS(ctx, c.endTS) req.NoError(err) actualStoreIDs := make([]int64, 0, len(metas)) for _, meta := range metas { @@ -327,7 +327,7 @@ func TestReadMetaBetweenTSV2(t *testing.T) { cli, err := CreateLogFileManager(ctx, init) req.Equal(cli.ShiftTS(), c.expectedShiftTS) req.NoError(err) - metas, err := cli.ReadStreamMetaByTS(ctx, cli.ShiftTS(), c.endTS) + metas, err := cli.ReadStreamMetaByTS(ctx, c.endTS) req.NoError(err) actualStoreIDs := make([]int64, 0, len(metas)) for _, meta := range metas { @@ -551,6 +551,19 @@ func TestFileManager(t *testing.T) { dr(2, 6), wr(4, 5, 2), wr(50, 54, 42), dr(42, 50), }, }, + { + Metadata: []*backuppb.Metadata{ + m2(wm(5, 10, 1), dm(1, 8), dr(2, 6), wr(4, 5, 2)), + m2(wr(50, 54, 42), dr(42, 50), wr(70, 78, 0), wm(80, 81, 0), wm(90, 92, 0)), + m2(dr(100, 101), wr(102, 104, 100)), + }, + StartTS: 6, + RestoreTS: 80, + Requires: []*backuppb.DataFileInfo{ + wm(80, 81, 0), wm(5, 10, 1), dm(1, 8), + }, + SearchMeta: true, + }, } run := func(t *testing.T, c Case) { From 3260dd98b82d03abdf2faf4648f2363996afe1e2 Mon Sep 17 00:00:00 2001 From: hillium Date: Wed, 12 Oct 2022 15:35:19 +0800 Subject: [PATCH 6/8] export Load* APIs Signed-off-by: hillium --- br/pkg/restore/client.go | 102 +---------------- br/pkg/restore/log_client.go | 180 ++++++++++++++++++++++++++---- br/pkg/restore/log_client_test.go | 44 ++++++-- br/pkg/task/stream.go | 30 +---- 4 files changed, 199 insertions(+), 157 deletions(-) diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go index be0f88fcf4822..849910335f867 100644 --- a/br/pkg/restore/client.go +++ b/br/pkg/restore/client.go @@ 
-5,7 +5,6 @@ package restore import ( "bytes" "context" - "crypto/sha256" "crypto/tls" "encoding/hex" "encoding/json" @@ -39,7 +38,6 @@ import ( "github.com/pingcap/tidb/br/pkg/stream" "github.com/pingcap/tidb/br/pkg/summary" "github.com/pingcap/tidb/br/pkg/utils" - "github.com/pingcap/tidb/br/pkg/utils/iter" "github.com/pingcap/tidb/config" ddlutil "github.com/pingcap/tidb/ddl/util" "github.com/pingcap/tidb/domain" @@ -304,14 +302,6 @@ func (rc *Client) Close() { log.Info("Restore client closed") } -func (rc *Client) SetRestoreRangeTS(startTs, restoreTS, shiftStartTS uint64) { - rc.startTS = startTs - rc.restoreTS = restoreTS - rc.shiftStartTS = shiftStartTS - log.Info("set restore range ts", zap.Uint64("shift-start-ts", shiftStartTS), - zap.Uint64("start-ts", startTs), zap.Uint64("restored-ts", restoreTS)) -} - func (rc *Client) SetCurrentTS(ts uint64) { rc.currentTS = ts } @@ -374,10 +364,6 @@ func (rc *Client) IsRawKvMode() bool { return rc.backupMeta.IsRawKv } -func (rc *Client) InitMetadataHelper() { - rc.helper = stream.NewMetadataHelper() -} - // GetFilesInRawRange gets all files that are in the given range or intersects with the given range. func (rc *Client) GetFilesInRawRange(startKey []byte, endKey []byte, cf string) ([]*backuppb.File, error) { if !rc.IsRawKvMode() { @@ -1831,6 +1817,8 @@ func (rc *Client) RestoreKVFiles( continue } fileReplica := file + // applyFunc blocks once there aren't enough workers. + // this would help us don't load too many DML file info. applyFunc(fileReplica) } if len(deleteFiles) > 0 { @@ -1951,24 +1939,6 @@ func SortMetaKVFiles(files []*backuppb.DataFileInfo) []*backuppb.DataFileInfo { return files } -func (rc *Client) PrepareDDLFileCache( - ctx context.Context, - files MetaGroupIter, -) ([]*backuppb.DataFileInfo, error) { - fs := iter.CollectAll(ctx, files) - if fs.Err != nil { - return nil, fs.Err - } - - dataFileInfos := make([]*backuppb.DataFileInfo, 0) - for _, g := range fs.Item { - rc.helper.InitCacheEntry(g.Path, len(g.FileMetas)) - dataFileInfos = append(dataFileInfos, g.FileMetas...) - } - - return dataFileInfos, nil -} - // RestoreMetaKVFiles tries to restore files about meta kv-event from stream-backup. func (rc *Client) RestoreMetaKVFiles( ctx context.Context, @@ -2136,7 +2106,7 @@ func (rc *Client) RestoreBatchMetaKVFiles( // read all of entries from files. 
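+	// ReadAllEntries splits each file at filterTS: entries whose TS is below filterTS are
+	// handled in this batch, the rest are returned so they can be carried into the next batch.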
for _, f := range files { - es, nextEs, err := rc.readAllEntries(ctx, f, filterTS) + es, nextEs, err := rc.ReadAllEntries(ctx, f, filterTS) if err != nil { return nextKvEntries, errors.Trace(err) } @@ -2163,72 +2133,6 @@ func (rc *Client) RestoreBatchMetaKVFiles( return nextKvEntries, nil } -func (rc *Client) readAllEntries( - ctx context.Context, - file *backuppb.DataFileInfo, - filterTS uint64, -) ([]*KvEntryWithTS, []*KvEntryWithTS, error) { - kvEntries := make([]*KvEntryWithTS, 0) - nextKvEntries := make([]*KvEntryWithTS, 0) - - buff, err := rc.helper.ReadFile(ctx, file.Path, file.RangeOffset, file.RangeLength, file.CompressionType, rc.storage) - if err != nil { - return nil, nil, errors.Trace(err) - } - - if checksum := sha256.Sum256(buff); !bytes.Equal(checksum[:], file.GetSha256()) { - return nil, nil, errors.Annotatef(berrors.ErrInvalidMetaFile, - "checksum mismatch expect %x, got %x", file.GetSha256(), checksum[:]) - } - - iter := stream.NewEventIterator(buff) - for iter.Valid() { - iter.Next() - if iter.GetError() != nil { - return nil, nil, errors.Trace(iter.GetError()) - } - - txnEntry := kv.Entry{Key: iter.Key(), Value: iter.Value()} - - if !stream.MaybeDBOrDDLJobHistoryKey(txnEntry.Key) { - // only restore mDB and mDDLHistory - continue - } - - ts, err := GetKeyTS(txnEntry.Key) - if err != nil { - return nil, nil, errors.Trace(err) - } - - // The commitTs in write CF need be limited on [startTs, restoreTs]. - // We can restore more key-value in default CF. - if ts > rc.restoreTS { - continue - } else if file.Cf == stream.WriteCF && ts < rc.startTS { - continue - } else if file.Cf == stream.DefaultCF && ts < rc.shiftStartTS { - continue - } - - if len(txnEntry.Value) == 0 { - // we might record duplicated prewrite keys in some conor cases. - // the first prewrite key has the value but the second don't. - // so we can ignore the empty value key. - // see details at https://github.com/pingcap/tiflow/issues/5468. - log.Warn("txn entry is null", zap.Uint64("key-ts", ts), zap.ByteString("tnxKey", txnEntry.Key)) - continue - } - - if ts < filterTS { - kvEntries = append(kvEntries, &KvEntryWithTS{e: txnEntry, ts: ts}) - } else { - nextKvEntries = append(nextKvEntries, &KvEntryWithTS{e: txnEntry, ts: ts}) - } - } - - return kvEntries, nextKvEntries, nil -} - func (rc *Client) restoreMetaKvEntries( ctx context.Context, sr *stream.SchemasReplace, diff --git a/br/pkg/restore/log_client.go b/br/pkg/restore/log_client.go index 03ee52a19f127..7c01897741ba2 100644 --- a/br/pkg/restore/log_client.go +++ b/br/pkg/restore/log_client.go @@ -3,15 +3,21 @@ package restore import ( + "bytes" "context" + "crypto/sha256" "strings" "sync" "github.com/pingcap/errors" backuppb "github.com/pingcap/kvproto/pkg/brpb" + "github.com/pingcap/log" + berrors "github.com/pingcap/tidb/br/pkg/errors" "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/br/pkg/stream" "github.com/pingcap/tidb/br/pkg/utils/iter" + "github.com/pingcap/tidb/kv" + "go.uber.org/zap" ) const ( @@ -28,8 +34,14 @@ type LogIter = iter.TryNextor[*backuppb.DataFileInfo] // MetaGroupIter is the iterator of flushes of metadata. type MetaGroupIter = iter.TryNextor[DDLMetaGroup] -// logFileManager is the manager for log files, which supports read / filter from the -// log backup archive. +// Meta is the metadata of files. +type Meta = *backuppb.Metadata + +// Log is the metadata of one file recording KV sequences. 
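+// With these aliases, the iterator types above read as MetaIter = iter.TryNextor[Meta]
+// and LogIter = iter.TryNextor[Log].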
+type Log = *backuppb.DataFileInfo + +// logFileManager is the manager for log files of a certain restoration, +// which supports read / filter from the log backup archive with static start TS / restore TS. type logFileManager struct { // startTS and restoreTS are used for kv file restore. // TiKV will filter the key space that don't belong to [startTS, restoreTS]. @@ -46,12 +58,20 @@ type logFileManager struct { helper *stream.MetadataHelper } +// LogFileManagerInit is the config needed for initializing the log file manager. type LogFileManagerInit struct { StartTS uint64 RestoreTS uint64 Storage storage.ExternalStorage } +type DDLMetaGroup struct { + Path string + FileMetas []*backuppb.DataFileInfo +} + +// CreateLogFileManager creates a log file manager using the specified config. +// Generally the config cannot be changed during its lifetime. func CreateLogFileManager(ctx context.Context, init LogFileManagerInit) (*logFileManager, error) { fm := &logFileManager{ startTS: init.StartTS, @@ -102,8 +122,12 @@ func (rc *logFileManager) loadShiftTS(ctx context.Context) error { return nil } -func (rc *logFileManager) StreamingMetaByTS(ctx context.Context, restoreTS uint64) (MetaIter, error) { - it, err := rc.ReadStreamMetadata(ctx, rc.storage) +func (rc *logFileManager) streamingMeta(ctx context.Context) (MetaIter, error) { + return rc.streamingMetaByTS(ctx, rc.restoreTS) +} + +func (rc *logFileManager) streamingMetaByTS(ctx context.Context, restoreTS uint64) (MetaIter, error) { + it, err := rc.createMetaIterOver(ctx, rc.storage) if err != nil { return nil, err } @@ -113,7 +137,7 @@ func (rc *logFileManager) StreamingMetaByTS(ctx context.Context, restoreTS uint6 return filtered, nil } -func (rc *logFileManager) ReadStreamMetadata(ctx context.Context, s storage.ExternalStorage) (MetaIter, error) { +func (rc *logFileManager) createMetaIterOver(ctx context.Context, s storage.ExternalStorage) (MetaIter, error) { opt := &storage.WalkOption{SubDir: stream.GetStreamBackupMetaPrefix()} names := []string{} err := s.WalkDir(ctx, opt, func(path string, size int64) error { @@ -130,11 +154,11 @@ func (rc *logFileManager) ReadStreamMetadata(ctx context.Context, s storage.Exte readMeta := func(ctx context.Context, name string) (*backuppb.Metadata, error) { f, err := s.ReadFile(ctx, name) if err != nil { - return nil, err + return nil, errors.Annotatef(err, "failed during reading file %s", name) } meta, err := rc.helper.ParseToMetadata(f) if err != nil { - return nil, err + return nil, errors.Annotatef(err, "failed to parse metadata of file %s", name) } return meta, nil } @@ -164,15 +188,77 @@ func (rc *logFileManager) ShouldFilterOut(d *backuppb.DataFileInfo) bool { (d.Cf == stream.DefaultCF && d.MaxTs < rc.shiftStartTS) } -type DDLMetaGroup struct { - Path string - FileMetas []*backuppb.DataFileInfo +func (rc *logFileManager) collectDDLFilesAndPrepareCache( + ctx context.Context, + files MetaGroupIter, +) ([]Log, error) { + fs := iter.CollectAll(ctx, files) + if fs.Err != nil { + return nil, errors.Annotatef(fs.Err, "failed to collect from files") + } + + dataFileInfos := make([]*backuppb.DataFileInfo, 0) + for _, g := range fs.Item { + rc.helper.InitCacheEntry(g.Path, len(g.FileMetas)) + dataFileInfos = append(dataFileInfos, g.FileMetas...) + } + + return dataFileInfos, nil +} + +// LoadDDLFilesAndCountDMLFiles loads all DDL files needs to be restored in the restoration. +// At the same time, if the `counter` isn't nil, counting the DML file needs to be restored into `counter`. 
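+//
+// A hedged usage sketch (the caller-side variable names are illustrative):
+//
+//	var dmlCount int
+//	ddlFiles, err := fm.LoadDDLFilesAndCountDMLFiles(ctx, &dmlCount)
+//	// ddlFiles is sorted and replayed by the caller; dmlCount can drive a progress bar.
+//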
+// This function returns all DDL files directly (as a slice rather than an iterator) because we need to sort all of them.
+func (rc *logFileManager) LoadDDLFilesAndCountDMLFiles(ctx context.Context, counter *int) ([]Log, error) {
+	m, err := rc.streamingMeta(ctx)
+	if err != nil {
+		return nil, err
+	}
+	if counter != nil {
+		m = iter.Tap(m, func(m Meta) {
+			for _, fg := range m.FileGroups {
+				for _, f := range fg.DataFilesInfo {
+					if !f.IsMeta && !rc.ShouldFilterOut(f) {
+						*counter += 1
+					}
+				}
+			}
+		})
+	}
+	mg := rc.FilterMetaFiles(m)
+
+	return rc.collectDDLFilesAndPrepareCache(ctx, mg)
+}
+
+// LoadDMLFiles loads all DML files that need to be restored in the restoration.
+// This function returns a stream, because there are usually many DML files that need to be restored.
+func (rc *logFileManager) LoadDMLFiles(ctx context.Context) (LogIter, error) {
+	m, err := rc.streamingMeta(ctx)
+	if err != nil {
+		return nil, err
+	}
+
+	mg := rc.FilterDataFiles(m)
+	return mg, nil
+}
+
+// readStreamMeta collects all metadata files filtered by the restore TS; it is kept for test usage.
+func (rc *logFileManager) readStreamMeta(ctx context.Context) ([]Meta, error) {
+	metas, err := rc.streamingMeta(ctx)
+	if err != nil {
+		return nil, err
+	}
+	r := iter.CollectAll(ctx, metas)
+	if r.Err != nil {
+		return nil, errors.Trace(r.Err)
+	}
+	return r.Item, nil
 }
 
 func (rc *logFileManager) FilterMetaFiles(ms MetaIter) MetaGroupIter {
-	return iter.FlatMap(ms, func(m *backuppb.Metadata) MetaGroupIter {
+	return iter.FlatMap(ms, func(m Meta) MetaGroupIter {
 		return iter.Map(iter.FromSlice(m.FileGroups), func(g *backuppb.DataFileGroup) DDLMetaGroup {
-			metas := iter.FilterOut(iter.FromSlice(g.DataFilesInfo), func(d *backuppb.DataFileInfo) bool {
+			metas := iter.FilterOut(iter.FromSlice(g.DataFilesInfo), func(d Log) bool {
 				// Modify the data internally, a little hacky.
 				if m.MetaVersion > backuppb.MetaVersion_V1 {
 					d.Path = g.Path
@@ -188,15 +274,69 @@ func (rc *logFileManager) FilterMetaFiles(ms MetaIter) MetaGroupIter {
 	})
 }
 
-// ReadStreamMetaByTS is used for streaming task. collect all meta file by TS.
-func (rc *logFileManager) ReadStreamMetaByTS(ctx context.Context, restoreTS uint64) ([]*backuppb.Metadata, error) {
-	metas, err := rc.StreamingMetaByTS(ctx, restoreTS)
+// ReadAllEntries loads the content of a log file, filtering out entries that are not needed.
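+// Entries with TS below filterTS are returned in the first slice and are restored in the
+// current batch; the remaining entries are returned in the second slice for the next batch.
+// A hedged sketch (names are illustrative):
+//
+//	cur, next, err := rc.ReadAllEntries(ctx, file, filterTS)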
+func (rc *logFileManager) ReadAllEntries( + ctx context.Context, + file Log, + filterTS uint64, +) ([]*KvEntryWithTS, []*KvEntryWithTS, error) { + kvEntries := make([]*KvEntryWithTS, 0) + nextKvEntries := make([]*KvEntryWithTS, 0) + + buff, err := rc.helper.ReadFile(ctx, file.Path, file.RangeOffset, file.RangeLength, file.CompressionType, rc.storage) if err != nil { - return nil, err + return nil, nil, errors.Trace(err) } - r := iter.CollectAll(ctx, metas) - if r.Err != nil { - return nil, errors.Trace(r.Err) + + if checksum := sha256.Sum256(buff); !bytes.Equal(checksum[:], file.GetSha256()) { + return nil, nil, errors.Annotatef(berrors.ErrInvalidMetaFile, + "checksum mismatch expect %x, got %x", file.GetSha256(), checksum[:]) } - return r.Item, nil + + iter := stream.NewEventIterator(buff) + for iter.Valid() { + iter.Next() + if iter.GetError() != nil { + return nil, nil, errors.Trace(iter.GetError()) + } + + txnEntry := kv.Entry{Key: iter.Key(), Value: iter.Value()} + + if !stream.MaybeDBOrDDLJobHistoryKey(txnEntry.Key) { + // only restore mDB and mDDLHistory + continue + } + + ts, err := GetKeyTS(txnEntry.Key) + if err != nil { + return nil, nil, errors.Trace(err) + } + + // The commitTs in write CF need be limited on [startTs, restoreTs]. + // We can restore more key-value in default CF. + if ts > rc.restoreTS { + continue + } else if file.Cf == stream.WriteCF && ts < rc.startTS { + continue + } else if file.Cf == stream.DefaultCF && ts < rc.shiftStartTS { + continue + } + + if len(txnEntry.Value) == 0 { + // we might record duplicated prewrite keys in some conor cases. + // the first prewrite key has the value but the second don't. + // so we can ignore the empty value key. + // see details at https://github.com/pingcap/tiflow/issues/5468. + log.Warn("txn entry is null", zap.Uint64("key-ts", ts), zap.ByteString("tnxKey", txnEntry.Key)) + continue + } + + if ts < filterTS { + kvEntries = append(kvEntries, &KvEntryWithTS{e: txnEntry, ts: ts}) + } else { + nextKvEntries = append(nextKvEntries, &KvEntryWithTS{e: txnEntry, ts: ts}) + } + } + + return kvEntries, nextKvEntries, nil } diff --git a/br/pkg/restore/log_client_test.go b/br/pkg/restore/log_client_test.go index 8ae5af6d15a49..d4c8567b63d82 100644 --- a/br/pkg/restore/log_client_test.go +++ b/br/pkg/restore/log_client_test.go @@ -231,7 +231,7 @@ func TestReadMetaBetweenTS(t *testing.T) { cli, err := CreateLogFileManager(ctx, init) req.Equal(cli.ShiftTS(), c.expectedShiftTS) req.NoError(err) - metas, err := cli.ReadStreamMetaByTS(ctx, c.endTS) + metas, err := cli.readStreamMeta(ctx) req.NoError(err) actualStoreIDs := make([]int64, 0, len(metas)) for _, meta := range metas { @@ -327,7 +327,7 @@ func TestReadMetaBetweenTSV2(t *testing.T) { cli, err := CreateLogFileManager(ctx, init) req.Equal(cli.ShiftTS(), c.expectedShiftTS) req.NoError(err) - metas, err := cli.ReadStreamMetaByTS(ctx, c.endTS) + metas, err := cli.readStreamMeta(ctx) req.NoError(err) actualStoreIDs := make([]int64, 0, len(metas)) for _, meta := range metas { @@ -534,10 +534,13 @@ func TestFileManager(t *testing.T) { StartTS int RestoreTS int - SearchMeta bool - Requires []*backuppb.DataFileInfo + SearchMeta bool + DMLFileCount *int + + Requires []*backuppb.DataFileInfo } + indirect := func(i int) *int { return &i } cases := []Case{ { Metadata: []*backuppb.Metadata{ @@ -551,6 +554,20 @@ func TestFileManager(t *testing.T) { dr(2, 6), wr(4, 5, 2), wr(50, 54, 42), dr(42, 50), }, }, + { + Metadata: []*backuppb.Metadata{ + m2(wm(4, 10, 1), dm(1, 8), dr(2, 6), wr(4, 5, 2)), 
+ m2(wr(50, 54, 42), dr(42, 50), wr(70, 78, 0), wm(80, 81, 0), wm(90, 92, 0)), + m2(dr(100, 101), wr(102, 104, 100)), + }, + StartTS: 5, + RestoreTS: 80, + Requires: []*backuppb.DataFileInfo{ + wm(80, 81, 0), wm(4, 10, 1), dm(1, 8), + }, + SearchMeta: true, + DMLFileCount: indirect(5), + }, { Metadata: []*backuppb.Metadata{ m2(wm(5, 10, 1), dm(1, 8), dr(2, 6), wr(4, 5, 2)), @@ -588,16 +605,21 @@ func TestFileManager(t *testing.T) { }) req.NoError(err) - metas, err := fm.StreamingMetaByTS(ctx, end) - req.NoError(err) - metas = iter.Tap(metas, func(m *backuppb.Metadata) { - fmt.Printf("%d-%d:%s", m.MinTs, m.MaxTs, formatL(m.FileGroups[0].DataFilesInfo)) - }) var datas LogIter if !c.SearchMeta { - datas = fm.FilterDataFiles(metas) + datas, err = fm.LoadDMLFiles(ctx) + req.NoError(err) } else { - datas = iter.FlatMap(fm.FilterMetaFiles(metas), func(g DDLMetaGroup) iter.TryNextor[*backuppb.DataFileInfo] { return iter.FromSlice(g.FileMetas) }) + var counter *int + if c.DMLFileCount != nil { + counter = new(int) + } + data, err := fm.LoadDDLFilesAndCountDMLFiles(ctx, counter) + req.NoError(err) + if counter != nil { + req.Equal(*c.DMLFileCount, *counter) + } + datas = iter.FromSlice(data) } r := iter.CollectAll(ctx, datas) dataFileInfoMatches(t, r.Item, c.Requires...) diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index a092b6fd656a4..8485b2c741222 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -46,7 +46,6 @@ import ( "github.com/pingcap/tidb/br/pkg/streamhelper/daemon" "github.com/pingcap/tidb/br/pkg/summary" "github.com/pingcap/tidb/br/pkg/utils" - "github.com/pingcap/tidb/br/pkg/utils/iter" "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/model" @@ -1123,29 +1122,6 @@ func restoreStream( return err } - // read meta by given ts. - metas, err := client.StreamingMetaByTS(ctx, cfg.RestoreTS) - if err != nil { - return errors.Trace(err) - } - - dataFileCount := 0 - metas = iter.Tap(metas, func(m *backuppb.Metadata) { - for _, fg := range m.FileGroups { - for _, f := range fg.DataFilesInfo { - if !f.IsMeta && !client.ShouldFilterOut(f) { - dataFileCount += 1 - } - } - } - }) - - // read data file by given ts. - metaFiles := client.FilterMetaFiles(metas) - if err != nil { - return errors.Trace(err) - } - // get full backup meta to generate rewrite rules. 
fullBackupTables, err := initFullBackupTables(ctx, cfg) if err != nil { @@ -1175,7 +1151,8 @@ func restoreStream( totalKVCount += kvCount totalSize += size } - ddlFiles, err := client.PrepareDDLFileCache(ctx, metaFiles) + dataFileCount := 0 + ddlFiles, err := client.LoadDDLFilesAndCountDMLFiles(ctx, &dataFileCount) if err != nil { return err } @@ -1194,9 +1171,8 @@ func restoreStream( } updateRewriteRules(rewriteRules, schemasReplace) - metas, err = client.StreamingMetaByTS(ctx, cfg.RestoreTS) + dmlFiles, err := client.LoadDMLFiles(ctx) pd := g.StartProgress(ctx, "Restore KV Files", int64(dataFileCount), !cfg.LogProgress) - dmlFiles := client.FilterDataFiles(metas) err = withProgress(pd, func(p glue.Progress) error { return client.RestoreKVFiles(ctx, rewriteRules, dmlFiles, updateStats, p.Inc) }) From 4421b5bfa071b1a623b6c92f0e8c39aef67eff9d Mon Sep 17 00:00:00 2001 From: Yu Juncen Date: Thu, 13 Oct 2022 15:22:38 +0800 Subject: [PATCH 7/8] added a new test case --- br/pkg/utils/iter/combinator_test.go | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/br/pkg/utils/iter/combinator_test.go b/br/pkg/utils/iter/combinator_test.go index e61223df5a001..97d847f769783 100644 --- a/br/pkg/utils/iter/combinator_test.go +++ b/br/pkg/utils/iter/combinator_test.go @@ -5,6 +5,7 @@ package iter_test import ( "context" "errors" + "sync/atomic" "testing" "time" @@ -78,3 +79,22 @@ func TestSome(t *testing.T) { req.Equal(it.TryNext(c), iter.Done[int]()) req.Equal(it.TryNext(c), iter.Done[int]()) } + +func TestErrorDuringTransforming(t *testing.T) { + req := require.New(t) + items := iter.OfRange(1, 20) + running := new(atomic.Int32) + items = iter.Transform(items, func(ctx context.Context, i int) (int, error) { + if i == 10 { + return 0, errors.New("meow") + } + running.Add(1) + return i, nil + }, iter.WithChunkSize(16), iter.WithConcurrency(8)) + + coll := iter.CollectAll(context.TODO(), items) + req.Greater(running.Load(), int32(8)) + // Should be melted down. + req.Less(running.Load(), int32(16)) + req.ErrorContains(coll.Err, "meow") +} From 27f8e0ef83d520133e32865a4dbbe438fe8f8476 Mon Sep 17 00:00:00 2001 From: hillium Date: Fri, 28 Oct 2022 12:37:01 +0800 Subject: [PATCH 8/8] address comments Signed-off-by: hillium --- br/pkg/utils/iter/combinator_types.go | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/br/pkg/utils/iter/combinator_types.go b/br/pkg/utils/iter/combinator_types.go index a3a43467b6e26..c08f37e81a655 100644 --- a/br/pkg/utils/iter/combinator_types.go +++ b/br/pkg/utils/iter/combinator_types.go @@ -67,16 +67,12 @@ type filter[T any] struct { } func (f filter[T]) TryNext(ctx context.Context) IterResult[T] { - r := f.inner.TryNext(ctx) - if r.Err != nil || r.Finished { - return r - } - - if f.filterOutIf(r.Item) { - return f.TryNext(ctx) + for { + r := f.inner.TryNext(ctx) + if r.Err != nil || r.Finished || !f.filterOutIf(r.Item) { + return r + } } - - return r } type take[T any] struct {