diff --git a/br/pkg/restore/BUILD.bazel b/br/pkg/restore/BUILD.bazel
index 34db2d3f37a08..e1ffd7bfb2f5d 100644
--- a/br/pkg/restore/BUILD.bazel
+++ b/br/pkg/restore/BUILD.bazel
@@ -141,6 +141,7 @@ go_test(
         "//types",
         "//util/codec",
         "//util/mathutil",
+        "@com_github_fsouza_fake_gcs_server//fakestorage",
         "@com_github_golang_protobuf//proto",
         "@com_github_pingcap_errors//:errors",
         "@com_github_pingcap_failpoint//:failpoint",
diff --git a/br/pkg/restore/log_client_test.go b/br/pkg/restore/log_client_test.go
index b6240819dad71..9cef2769bf670 100644
--- a/br/pkg/restore/log_client_test.go
+++ b/br/pkg/restore/log_client_test.go
@@ -370,12 +370,19 @@ func TestReadFromMetadata(t *testing.T) {
 			meta := new(StreamMetadataSet)
 			meta.Helper = stream.NewMetadataHelper()
-			meta.LoadUntil(ctx, loc, c.untilTS)
+			meta.LoadUntilAndCalculateShiftTS(ctx, loc, c.untilTS)
 
 			var metas []*backuppb.Metadata
-			for _, m := range meta.metadata {
+			for path := range meta.metadataInfos {
+				data, err := loc.ReadFile(ctx, path)
+				require.NoError(t, err)
+
+				m, err := meta.Helper.ParseToMetadataHard(data)
+				require.NoError(t, err)
+
 				metas = append(metas, m)
 			}
+
 			actualStoreIDs := make([]int64, 0, len(metas))
 			for _, meta := range metas {
 				actualStoreIDs = append(actualStoreIDs, meta.StoreId)
@@ -436,10 +443,16 @@ func TestReadFromMetadataV2(t *testing.T) {
 			meta := new(StreamMetadataSet)
 			meta.Helper = stream.NewMetadataHelper()
-			meta.LoadUntil(ctx, loc, c.untilTS)
+			meta.LoadUntilAndCalculateShiftTS(ctx, loc, c.untilTS)
 
 			var metas []*backuppb.Metadata
-			for _, m := range meta.metadata {
+			for path := range meta.metadataInfos {
+				data, err := loc.ReadFile(ctx, path)
+				require.NoError(t, err)
+
+				m, err := meta.Helper.ParseToMetadataHard(data)
+				require.NoError(t, err)
+
 				metas = append(metas, m)
 			}
 
 			actualStoreIDs := make([]int64, 0, len(metas))
diff --git a/br/pkg/restore/stream_metas.go b/br/pkg/restore/stream_metas.go
index 7468573ce6ba8..2aa9c8f11a9db 100644
--- a/br/pkg/restore/stream_metas.go
+++ b/br/pkg/restore/stream_metas.go
@@ -12,62 +12,116 @@ import (
 	backuppb "github.com/pingcap/kvproto/pkg/brpb"
 	"github.com/pingcap/log"
 	berrors "github.com/pingcap/tidb/br/pkg/errors"
+	"github.com/pingcap/tidb/br/pkg/logutil"
 	"github.com/pingcap/tidb/br/pkg/storage"
 	"github.com/pingcap/tidb/br/pkg/stream"
+	"github.com/pingcap/tidb/br/pkg/utils"
 	"github.com/pingcap/tidb/util/mathutil"
 	"go.uber.org/zap"
+	"golang.org/x/sync/errgroup"
 )
 
+const notDeletedBecameFatalThreshold = 128
+
 type StreamMetadataSet struct {
-	metadata map[string]*backuppb.Metadata
-	// The metadata after changed that needs to be write back.
-	writeback map[string]*backuppb.Metadata
+	// if set to true, neither the metadata nor the data files will be removed
+	DryRun bool
+
+	// keeps as little meta-information of the metadata as possible
+	// to save memory
+	metadataInfos map[string]*MetadataInfo
+
+	// a parser of metadata
 	Helper *stream.MetadataHelper
 
-	BeforeDoWriteBack func(path string, last, current *backuppb.Metadata) (skip bool)
+	// for test
+	BeforeDoWriteBack func(path string, replaced *backuppb.Metadata) (skip bool)
 }
 
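The new API collapses the old four-step flow (LoadUntil, CalculateShiftTS, RemoveDataBefore, DoWriteBack) into two calls. A minimal sketch of the intended call sequence, assuming an already-opened ExternalStorage `s` and a target `untilTS` (error handling elided, variable names illustrative):

    ms := &restore.StreamMetadataSet{Helper: stream.NewMetadataHelper()}
    shiftTS, _ := ms.LoadUntilAndCalculateShiftTS(ctx, s, untilTS)
    notDeleted, _ := ms.RemoveDataFilesAndUpdateMetadataInBatch(ctx, shiftTS, s,
        func(num int64) { /* progress callback, see the note further down */ })

Setting DryRun beforehand makes the second call report what would be deleted without touching storage.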
-// LoadUntil loads the metadata until the specified timestamp.
-// This would load all metadata files that *may* contain data from transaction committed before that TS.
-// Note: maybe record the timestamp and reject reading data files after this TS?
-func (ms *StreamMetadataSet) LoadUntil(ctx context.Context, s storage.ExternalStorage, until uint64) error {
+// FileGroupInfo keeps the meta-information of a file group for statistics and filtering
+type FileGroupInfo struct {
+	MaxTS   uint64
+	Length  uint64
+	KVCount int64
+}
+
+// MetadataInfo keeps the meta-information of a metadata file for statistics and filtering
+type MetadataInfo struct {
+	MinTS          uint64
+	FileGroupInfos []*FileGroupInfo
+}
+
+// LoadUntilAndCalculateShiftTS loads the metadata until the specified timestamp and calculates the shift-until-ts along the way.
+// This would record all metadata files that *may* contain data from transactions committed before that TS.
+func (ms *StreamMetadataSet) LoadUntilAndCalculateShiftTS(ctx context.Context, s storage.ExternalStorage, until uint64) (uint64, error) {
 	metadataMap := struct {
 		sync.Mutex
-		metas map[string]*backuppb.Metadata
+		metas        map[string]*MetadataInfo
+		shiftUntilTS uint64
 	}{}
-	ms.writeback = make(map[string]*backuppb.Metadata)
-	metadataMap.metas = make(map[string]*backuppb.Metadata)
+	metadataMap.metas = make(map[string]*MetadataInfo)
+	// `shiftUntilTS` must not be larger than `until`
+	metadataMap.shiftUntilTS = until
 	err := stream.FastUnmarshalMetaData(ctx, s, func(path string, raw []byte) error {
 		m, err := ms.Helper.ParseToMetadataHard(raw)
 		if err != nil {
 			return err
 		}
-		metadataMap.Lock()
 		// If the meta file contains only files with ts greater than `until`, when the file is from
 		// `Default`: it should be kept, because its corresponding `write` must have a commit ts greater than it, which should not be considered.
 		// `Write`: it should trivially not be considered.
 		if m.MinTs <= until {
-			metadataMap.metas[path] = m
+			// record this meta-information for statistics and filtering
+			fileGroupInfos := make([]*FileGroupInfo, 0, len(m.FileGroups))
+			for _, group := range m.FileGroups {
+				var kvCount int64 = 0
+				for _, file := range group.DataFilesInfo {
+					kvCount += file.NumberOfEntries
+				}
+				fileGroupInfos = append(fileGroupInfos, &FileGroupInfo{
+					MaxTS:   group.MaxTs,
+					Length:  group.Length,
+					KVCount: kvCount,
+				})
+			}
+			metadataMap.Lock()
+			metadataMap.metas[path] = &MetadataInfo{
+				MinTS:          m.MinTs,
+				FileGroupInfos: fileGroupInfos,
+			}
+			metadataMap.Unlock()
+		}
+		// filter out the metadata whose ts-range overlaps with [until, +inf)
+		// and calculate their minimum begin-default-ts
+		ts, ok := UpdateShiftTS(m, until, mathutil.MaxUint)
+		if ok {
+			metadataMap.Lock()
+			if ts < metadataMap.shiftUntilTS {
+				metadataMap.shiftUntilTS = ts
+			}
+			metadataMap.Unlock()
 		}
-		metadataMap.Unlock()
 		return nil
 	})
 	if err != nil {
-		return errors.Trace(err)
+		return 0, errors.Trace(err)
 	}
-	ms.metadata = metadataMap.metas
-	return nil
+	ms.metadataInfos = metadataMap.metas
+	if metadataMap.shiftUntilTS != until {
+		log.Warn("calculate shift-ts", zap.Uint64("start-ts", until), zap.Uint64("shift-ts", metadataMap.shiftUntilTS))
+	}
+	return metadataMap.shiftUntilTS, nil
 }
 
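Why the shift matters: a write-CF record committed before `until` may reference a default-CF record whose begin-ts is even earlier, so truncating at `until` alone could orphan the default half of such transactions. A worked micro-example with hypothetical values (mirroring the `m_1` helper in the tests below): given a single write-CF file group spanning [10, 20] whose MinBeginTsInDefaultCf is 5,

    shiftTS, err := ms.LoadUntilAndCalculateShiftTS(ctx, s, 12)
    // err == nil, shiftTS == 5: the group overlaps [12, +inf), so its
    // earliest default-CF begin-ts (5) becomes the effective truncation point.

The caller then passes shiftTS, not 12, to the deletion step.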
-// LoadFrom loads data from an external storage into the stream metadata set.
+// LoadFrom loads data from an external storage into the stream metadata set. (Now only for test)
 func (ms *StreamMetadataSet) LoadFrom(ctx context.Context, s storage.ExternalStorage) error {
-	return ms.LoadUntil(ctx, s, math.MaxUint64)
+	_, err := ms.LoadUntilAndCalculateShiftTS(ctx, s, math.MaxUint64)
+	return err
 }
 
-func (ms *StreamMetadataSet) iterateDataFiles(f func(d *backuppb.DataFileGroup) (shouldBreak bool)) {
-	for _, m := range ms.metadata {
-		for _, d := range m.FileGroups {
+func (ms *StreamMetadataSet) iterateDataFiles(f func(d *FileGroupInfo) (shouldBreak bool)) {
+	for _, m := range ms.metadataInfos {
+		for _, d := range m.FileGroupInfos {
 			if f(d) {
 				return
 			}
@@ -75,21 +129,6 @@ func (ms *StreamMetadataSet) iterateDataFiles(f func(d *backuppb.DataFileGroup)
 	}
 }
 
-// CalculateShiftTS calculates the shift-ts.
-func (ms *StreamMetadataSet) CalculateShiftTS(startTS uint64) uint64 {
-	metadatas := make([]*backuppb.Metadata, 0, len(ms.metadata))
-	for _, m := range ms.metadata {
-		metadatas = append(metadatas, m)
-	}
-
-	minBeginTS, exist := CalculateShiftTS(metadatas, startTS, mathutil.MaxUint)
-	if !exist {
-		minBeginTS = startTS
-	}
-	log.Warn("calculate shift-ts", zap.Uint64("start-ts", startTS), zap.Uint64("shift-ts", minBeginTS))
-	return minBeginTS
-}
-
 // IterateFilesFullyBefore runs the function over all files that contain data before the timestamp only.
 //
 //	0                    before
@@ -98,78 +137,145 @@ func (ms *StreamMetadataSet) CalculateShiftTS(startTS uint64) uint64 {
 //	                              |-file2--------------| <- A file containing any record outside of this range won't be found.
 //
 // This function would call the `f` over file1 only.
-func (ms *StreamMetadataSet) IterateFilesFullyBefore(before uint64, f func(d *backuppb.DataFileGroup) (shouldBreak bool)) {
-	ms.iterateDataFiles(func(d *backuppb.DataFileGroup) (shouldBreak bool) {
-		if d.MaxTs >= before {
+func (ms *StreamMetadataSet) IterateFilesFullyBefore(before uint64, f func(d *FileGroupInfo) (shouldBreak bool)) {
+	ms.iterateDataFiles(func(d *FileGroupInfo) (shouldBreak bool) {
+		if d.MaxTS >= before {
 			return false
 		}
 		return f(d)
 	})
 }
 
-// RemoveDataBefore would find files contains only records before the timestamp, mark them as removed from meta,
-// and returning their information.
-func (ms *StreamMetadataSet) RemoveDataBefore(from uint64) []*backuppb.DataFileGroup {
-	removed := []*backuppb.DataFileGroup{}
-	for metaPath, m := range ms.metadata {
-		remainedDataFiles := make([]*backuppb.DataFileGroup, 0)
-		// can we assume those files are sorted to avoid traversing here? (by what?)
-		for _, ds := range m.FileGroups {
-			if ds.MaxTs < from {
-				removed = append(removed, ds)
-			} else {
-				remainedDataFiles = append(remainedDataFiles, ds)
+// RemoveDataFilesAndUpdateMetadataInBatch concurrently removes data file groups and updates the metadata.
+// Each worker processes exactly one metadata file: it deletes the file groups and then rewrites the metadata.
+// Returns the paths of the data files that could not be deleted.
+func (ms *StreamMetadataSet) RemoveDataFilesAndUpdateMetadataInBatch(ctx context.Context, from uint64, storage storage.ExternalStorage, updateFn func(num int64)) ([]string, error) {
+	var notDeleted struct {
+		item []string
+		sync.Mutex
+	}
+	worker := utils.NewWorkerPool(128, "delete files")
+	eg, cx := errgroup.WithContext(ctx)
+	for path, metaInfo := range ms.metadataInfos {
+		path := path
+		minTS := metaInfo.MinTS
+		// It's safe to delete a map entry inside a range loop over it
+		delete(ms.metadataInfos, path)
+		if minTS >= from {
+			// None of its data files would be removed,
+			// so the metadata is skipped.
+			continue
+		}
+		worker.ApplyOnErrorGroup(eg, func() error {
+			if cx.Err() != nil {
+				return cx.Err()
+			}
+
+			data, err := storage.ReadFile(ctx, path)
+			if err != nil {
+				return err
+			}
+
+			meta, err := ms.Helper.ParseToMetadataHard(data)
+			if err != nil {
+				return err
 			}
+
+			num, notDeletedItems, err := ms.removeDataFilesAndUpdateMetadata(ctx, storage, from, meta, path)
+			if err != nil {
+				return err
+			}
+
+			updateFn(num)
+
+			notDeleted.Lock()
+			notDeleted.item = append(notDeleted.item, notDeletedItems...)
+			notDeleted.Unlock()
+			return nil
+		})
+	}
+
+	if err := eg.Wait(); err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	return notDeleted.item, nil
+}
+
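`updateFn` is called from the worker goroutines, once per processed metadata file, with the number of file groups that file dropped, so it must be safe for concurrent use; the tests below guard their counter with a mutex. A minimal caller sketch (counter names illustrative):

    var (
        mu    sync.Mutex
        total int64
    )
    notDeleted, err := ms.RemoveDataFilesAndUpdateMetadataInBatch(ctx, shiftTS, s, func(num int64) {
        mu.Lock()
        total += num
        mu.Unlock()
    })

Failed deletions are collected into `notDeleted` and only turn into a hard error once they exceed notDeletedBecameFatalThreshold (128).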
+// removeDataFilesAndUpdateMetadata removes the data file groups of the metadata whose max-ts is less than `from`
+func (ms *StreamMetadataSet) removeDataFilesAndUpdateMetadata(ctx context.Context, storage storage.ExternalStorage, from uint64, meta *backuppb.Metadata, metaPath string) (num int64, notDeleted []string, err error) {
+	removed := make([]*backuppb.DataFileGroup, 0)
+	remainedDataFiles := make([]*backuppb.DataFileGroup, 0)
+	notDeleted = make([]string, 0)
+	// can we assume those files are sorted to avoid traversing here? (by what?)
+	for _, ds := range meta.FileGroups {
+		if ds.MaxTs < from {
+			removed = append(removed, ds)
+		} else {
+			// Some kvs in this data file group should not be removed,
+			// so the whole group is kept.
+			remainedDataFiles = append(remainedDataFiles, ds)
 		}
-		if len(remainedDataFiles) != len(m.FileGroups) {
-			mCopy := *m
-			mCopy.FileGroups = remainedDataFiles
-			ms.WriteBack(metaPath, &mCopy)
+	}
+
+	num = int64(len(removed))
+
+	if ms.DryRun {
+		log.Debug("dry run, skip deletion ...")
+		return num, notDeleted, nil
+	}
+
+	// remove data file groups
+	for _, f := range removed {
+		log.Debug("Deleting file", zap.String("path", f.Path))
+		if err := storage.DeleteFile(ctx, f.Path); err != nil {
+			log.Warn("File not deleted.", zap.String("path", f.Path), logutil.ShortError(err))
+			notDeleted = append(notDeleted, f.Path)
+			if len(notDeleted) > notDeletedBecameFatalThreshold {
+				return num, notDeleted, errors.Annotatef(berrors.ErrPiTRMalformedMetadata, "too many failures when truncating")
+			}
 		}
 	}
-	return removed
-}
 
-func (ms *StreamMetadataSet) WriteBack(path string, file *backuppb.Metadata) {
-	ms.writeback[path] = file
-}
+	// update metadata
+	if len(remainedDataFiles) != len(meta.FileGroups) {
+		// rewrite metadata
+		log.Info("Updating metadata.", zap.String("file", metaPath),
+			zap.Int("data-file-before", len(meta.FileGroups)),
+			zap.Int("data-file-after", len(remainedDataFiles)))
+
+		// replace the file groups and update the ts range of the metadata
+		ReplaceMetadata(meta, remainedDataFiles)
 
-func (ms *StreamMetadataSet) doWriteBackForFile(ctx context.Context, s storage.ExternalStorage, path string) error {
-	data, ok := ms.writeback[path]
-	if !ok {
-		return errors.Annotatef(berrors.ErrInvalidArgument, "There is no write back for path %s", path)
+		if ms.BeforeDoWriteBack != nil && ms.BeforeDoWriteBack(metaPath, meta) {
+			return num, notDeleted, nil
+		}
+
+		if err := ms.doWriteBackForFile(ctx, storage, metaPath, meta); err != nil {
+			// NOTE: Maybe we'd better roll back all writebacks? (What will happen if roll back fails too?)
+			return num, notDeleted, errors.Annotatef(err, "failed to write back file %s", metaPath)
+		}
 	}
+
+	return num, notDeleted, nil
+}
+
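BeforeDoWriteBack is a test seam: it observes the already-rewritten metadata just before it is persisted, and returning true skips the write-back for that path. It also runs on worker goroutines, so any shared state it touches needs its own locking (the tests wrap it in a sync.Mutex). A sketch of the hook shape:

    ms.BeforeDoWriteBack = func(path string, replaced *backuppb.Metadata) (skip bool) {
        // len(replaced.FileGroups) == 0 means this meta is about to be deleted outright
        return false // proceed with the actual write-back
    }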
+func (ms *StreamMetadataSet) doWriteBackForFile(ctx context.Context, s storage.ExternalStorage, path string, meta *backuppb.Metadata) error {
 	// If the metadata file contains no data files, remove it because it is meaningless.
-	if len(data.FileGroups) == 0 {
+	if len(meta.FileGroups) == 0 {
 		if err := s.DeleteFile(ctx, path); err != nil {
 			return errors.Annotatef(err, "failed to remove the empty meta %s", path)
 		}
 		return nil
 	}
 
-	bs, err := ms.Helper.Marshal(data)
+	bs, err := ms.Helper.Marshal(meta)
 	if err != nil {
 		return errors.Annotatef(err, "failed to marshal the file %s", path)
 	}
 	return truncateAndWrite(ctx, s, path, bs)
 }
 
-func (ms *StreamMetadataSet) DoWriteBack(ctx context.Context, s storage.ExternalStorage) error {
-	for path := range ms.writeback {
-		if ms.BeforeDoWriteBack != nil && ms.BeforeDoWriteBack(path, ms.metadata[path], ms.writeback[path]) {
-			return nil
-		}
-		err := ms.doWriteBackForFile(ctx, s, path)
-		// NOTE: Maybe we'd better roll back all writebacks? (What will happen if roll back fails too?)
-		if err != nil {
-			return errors.Annotatef(err, "failed to write back file %s", path)
-		}
-
-		delete(ms.writeback, path)
-	}
-	return nil
-}
-
 func truncateAndWrite(ctx context.Context, s storage.ExternalStorage, path string, data []byte) error {
 	// Performance hack: the `Write` implementation would truncate the file if it exists.
 	if err := s.WriteFile(ctx, path, data); err != nil {
@@ -248,26 +354,30 @@ func UpdateShiftTS(m *backuppb.Metadata, startTS uint64, restoreTS uint64) (uint
 	return minBeginTS, isExist
 }
 
-// CalculateShiftTS gets the minimal begin-ts about transaction according to the kv-event in write-cf.
-func CalculateShiftTS(
-	metas []*backuppb.Metadata,
-	startTS uint64,
-	restoreTS uint64,
-) (uint64, bool) {
-	var (
-		minBeginTS uint64
-		isExist    bool
-	)
-	for _, m := range metas {
-		if len(m.FileGroups) == 0 || m.MinTs > restoreTS || m.MaxTs < startTS {
-			continue
+// ReplaceMetadata replaces the file groups of the metadata and updates its ts range accordingly
+func ReplaceMetadata(meta *backuppb.Metadata, filegroups []*backuppb.DataFileGroup) {
+	// replace the original file groups
+	meta.FileGroups = filegroups
+
+	if len(meta.FileGroups) == 0 {
+		meta.MinTs = 0
+		meta.MaxTs = 0
+		meta.ResolvedTs = 0
+		return
+	}
+
+	meta.MinTs = meta.FileGroups[0].MinTs
+	meta.MaxTs = meta.FileGroups[0].MaxTs
+	meta.ResolvedTs = meta.FileGroups[0].MinResolvedTs
+	for _, group := range meta.FileGroups {
+		if group.MinTs < meta.MinTs {
+			meta.MinTs = group.MinTs
+		}
+		if group.MaxTs > meta.MaxTs {
+			meta.MaxTs = group.MaxTs
 		}
-		ts, ok := UpdateShiftTS(m, startTS, restoreTS)
-		if ok && (!isExist || ts < minBeginTS) {
-			minBeginTS = ts
-			isExist = true
+		if group.MinResolvedTs < meta.ResolvedTs {
+			meta.ResolvedTs = group.MinResolvedTs
 		}
 	}
-
-	return minBeginTS, isExist
 }
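ReplaceMetadata recomputes the metadata's ts range from whatever file groups remain, so callers never update MinTs/MaxTs/ResolvedTs by hand. For instance, per TestReplaceMetadataTs below (ff builds a default-CF group with the given ts range):

    m := &backuppb.Metadata{}
    restore.ReplaceMetadata(m, []*backuppb.DataFileGroup{ff(1, 6), ff(0, 5)})
    // m.MinTs == 0, m.MaxTs == 6: the envelope of the remaining groups.
    // An empty slice resets MinTs, MaxTs and ResolvedTs to 0.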
diff --git a/br/pkg/restore/stream_metas_test.go b/br/pkg/restore/stream_metas_test.go
index 8e75f7544885e..5b75e9de6b3d8 100644
--- a/br/pkg/restore/stream_metas_test.go
+++ b/br/pkg/restore/stream_metas_test.go
@@ -6,7 +6,10 @@ import (
 	"context"
 	"fmt"
 	"math/rand"
+	"os"
+	"path"
 	"path/filepath"
+	"sync"
 	"testing"
 
 	"github.com/fsouza/fake-gcs-server/fakestorage"
@@ -16,7 +19,6 @@ import (
 	"github.com/pingcap/tidb/br/pkg/restore"
 	"github.com/pingcap/tidb/br/pkg/storage"
 	"github.com/pingcap/tidb/br/pkg/stream"
-	"github.com/pingcap/tidb/util/mathutil"
 	"github.com/stretchr/testify/require"
 	"go.uber.org/zap"
 )
@@ -36,13 +38,59 @@ func fakeDataFiles(s storage.ExternalStorage, base, item int) (result []*backupp
 	return
 }
 
+func fakeDataFilesV2(s storage.ExternalStorage, base, item int) (result []*backuppb.DataFileGroup) {
+	ctx := context.Background()
+	for i := base; i < base+item; i++ {
+		path := fmt.Sprintf("%04d_to_%04d.log", i, i+2)
+		s.WriteFile(ctx, path, []byte("test"))
+		data := &backuppb.DataFileGroup{
+			Path:  path,
+			MinTs: uint64(i),
+			MaxTs: uint64(i + 2),
+		}
+		result = append(result, data)
+	}
+	return
+}
+
+func tsOfFile(dfs []*backuppb.DataFileInfo) (uint64, uint64) {
+	var minTS uint64 = 9876543210
+	var maxTS uint64 = 0
+	for _, df := range dfs {
+		if df.MaxTs > maxTS {
+			maxTS = df.MaxTs
+		}
+		if df.MinTs < minTS {
+			minTS = df.MinTs
+		}
+	}
+	return minTS, maxTS
+}
+
+func tsOfFileGroup(dfs []*backuppb.DataFileGroup) (uint64, uint64) {
+	var minTS uint64 = 9876543210
+	var maxTS uint64 = 0
+	for _, df := range dfs {
+		if df.MaxTs > maxTS {
+			maxTS = df.MaxTs
+		}
+		if df.MinTs < minTS {
+			minTS = df.MinTs
+		}
+	}
+	return minTS, maxTS
+}
+
 func fakeStreamBackup(s storage.ExternalStorage) error {
 	ctx := context.Background()
 	base := 0
 	for i := 0; i < 6; i++ {
 		dfs := fakeDataFiles(s, base, 4)
 		base += 4
+		minTS, maxTS := tsOfFile(dfs)
 		meta := &backuppb.Metadata{
+			MinTs:   minTS,
+			MaxTs:   maxTS,
 			Files:   dfs,
 			StoreId: int64(i%3 + 1),
 		}
@@ -64,43 +112,13 @@ func fakeStreamBackupV2(s storage.ExternalStorage) error {
 	ctx := context.Background()
 	base := 0
 	for i := 0; i < 6; i++ {
-		dfs := fakeDataFiles(s, base, 4)
-		minTs1 := uint64(18446744073709551615)
-		maxTs1 := uint64(0)
-		for _, f := range dfs[0:2] {
-			f.Path = fmt.Sprintf("%d", i)
-			if minTs1 > f.MinTs {
-				minTs1 = f.MinTs
-			}
-			if maxTs1 < f.MaxTs {
-				maxTs1 = f.MaxTs
-			}
-		}
-		minTs2 := uint64(18446744073709551615)
-		maxTs2 := uint64(0)
-		for _, f := range dfs[2:] {
-			f.Path = fmt.Sprintf("%d", i)
-			if minTs2 > f.MinTs {
-				minTs2 = f.MinTs
-			}
-			if maxTs2 < f.MaxTs {
-				maxTs2 = f.MaxTs
-			}
-		}
+		dfs := fakeDataFilesV2(s, base, 4)
 		base += 4
+		minTS, maxTS := tsOfFileGroup(dfs)
 		meta := &backuppb.Metadata{
-			FileGroups: []*backuppb.DataFileGroup{
-				{
-					DataFilesInfo: dfs[0:2],
-					MinTs:         minTs1,
-					MaxTs:         maxTs1,
-				},
-				{
-					DataFilesInfo: dfs[2:],
-					MinTs:         minTs2,
-					MaxTs:         maxTs2,
-				},
-			},
+			MinTs:       minTS,
+			MaxTs:       maxTS,
+			FileGroups:  dfs,
 			StoreId:     int64(i%3 + 1),
 			MetaVersion: backuppb.MetaVersion_V2,
 		}
@@ -135,42 +153,59 @@ func TestTruncateLog(t *testing.T) {
 	}
 
 	require.NoError(t, s.LoadFrom(ctx, l))
-	fs := []*backuppb.DataFileGroup{}
-	s.IterateFilesFullyBefore(17, func(d *backuppb.DataFileGroup) (shouldBreak bool) {
+	fs := []*restore.FileGroupInfo{}
+	s.IterateFilesFullyBefore(17, func(d *restore.FileGroupInfo) (shouldBreak bool) {
 		fs = append(fs, d)
-		require.Less(t, d.MaxTs, uint64(17))
+		require.Less(t, d.MaxTS, uint64(17))
 		return false
 	})
 	require.Len(t, fs, 15)
 
-	s.RemoveDataBefore(17)
-	deletedFiles := []string{}
-	modifiedFiles := []string{}
-	s.BeforeDoWriteBack = func(path string, last, current *backuppb.Metadata) bool {
-		require.NotNil(t, last)
-		if len(current.GetFileGroups()) == 0 {
-			deletedFiles = append(deletedFiles, path)
-		} else if len(current.GetFileGroups()) != len(last.GetFileGroups()) {
-			modifiedFiles = append(modifiedFiles, path)
+	var lock sync.Mutex
+	remainedFiles := []string{}
+	remainedDataFiles := []string{}
+	removedMetaFiles := []string{}
+	s.BeforeDoWriteBack = func(path string, replaced *backuppb.Metadata) bool {
+		lock.Lock()
+		require.NotNil(t, replaced)
+		if len(replaced.GetFileGroups()) > 0 {
+			remainedFiles = append(remainedFiles, path)
+			for _, ds := range replaced.FileGroups {
+				remainedDataFiles = append(remainedDataFiles, ds.Path)
+			}
+		} else {
+			removedMetaFiles = append(removedMetaFiles, path)
 		}
+		lock.Unlock()
 		return false
 	}
-	require.NoError(t, s.DoWriteBack(ctx, l))
-	require.ElementsMatch(t, deletedFiles, []string{"v1/backupmeta/0000.meta", "v1/backupmeta/0001.meta", "v1/backupmeta/0002.meta"})
-	require.ElementsMatch(t, modifiedFiles, []string{"v1/backupmeta/0003.meta"})
+
+	var total int64 = 0
+	notDeleted, err := s.RemoveDataFilesAndUpdateMetadataInBatch(ctx, 17, l, func(num int64) {
+		lock.Lock()
+		total += num
+		lock.Unlock()
+	})
+	require.NoError(t, err)
+	require.Equal(t, len(notDeleted), 0)
+	require.ElementsMatch(t, remainedFiles, []string{"v1/backupmeta/0003.meta"})
+	require.ElementsMatch(t, removedMetaFiles, []string{"v1/backupmeta/0000.meta", "v1/backupmeta/0001.meta", "v1/backupmeta/0002.meta"})
+	require.ElementsMatch(t, remainedDataFiles, []string{"0015_to_0017.log"})
+	require.Equal(t, total, int64(15))
 
 	require.NoError(t, s.LoadFrom(ctx, l))
-	s.IterateFilesFullyBefore(17, func(d *backuppb.DataFileGroup) (shouldBreak bool) {
+	s.IterateFilesFullyBefore(17, func(d *restore.FileGroupInfo) (shouldBreak bool) {
 		t.Errorf("some of log files still not truncated, it is %#v", d)
 		return true
 	})
 
-	l.WalkDir(ctx, &storage.WalkOption{
+	err = l.WalkDir(ctx, &storage.WalkOption{
 		SubDir: stream.GetStreamBackupMetaPrefix(),
 	}, func(s string, i int64) error {
-		require.NotContains(t, deletedFiles, s)
+		require.NotContains(t, removedMetaFiles, s)
 		return nil
 	})
+	require.NoError(t, err)
 }
 
 func TestTruncateLogV2(t *testing.T) {
@@ -190,42 +225,59 @@
 	}
 
 	require.NoError(t, s.LoadFrom(ctx, 
l)) - fs := []*backuppb.DataFileGroup{} - s.IterateFilesFullyBefore(17, func(d *backuppb.DataFileGroup) (shouldBreak bool) { + fs := []*restore.FileGroupInfo{} + s.IterateFilesFullyBefore(17, func(d *restore.FileGroupInfo) (shouldBreak bool) { fs = append(fs, d) - require.Less(t, d.MaxTs, uint64(17)) + require.Less(t, d.MaxTS, uint64(17)) return false }) - require.Len(t, fs, 7) - - s.RemoveDataBefore(17) - deletedFiles := []string{} - modifiedFiles := []string{} - s.BeforeDoWriteBack = func(path string, last, current *backuppb.Metadata) bool { - require.NotNil(t, last) - if len(current.GetFileGroups()) == 0 { - deletedFiles = append(deletedFiles, path) - } else if len(current.GetFileGroups()) != len(last.GetFileGroups()) { - modifiedFiles = append(modifiedFiles, path) + require.Len(t, fs, 15) + + var lock sync.Mutex + remainedFiles := []string{} + remainedDataFiles := []string{} + removedMetaFiles := []string{} + s.BeforeDoWriteBack = func(path string, replaced *backuppb.Metadata) bool { + lock.Lock() + require.NotNil(t, replaced) + if len(replaced.GetFileGroups()) > 0 { + remainedFiles = append(remainedFiles, path) + for _, ds := range replaced.FileGroups { + remainedDataFiles = append(remainedDataFiles, ds.Path) + } + } else { + removedMetaFiles = append(removedMetaFiles, path) } + lock.Unlock() return false } - require.NoError(t, s.DoWriteBack(ctx, l)) - require.ElementsMatch(t, deletedFiles, []string{"v1/backupmeta/0000.meta", "v1/backupmeta/0001.meta", "v1/backupmeta/0002.meta"}) - require.ElementsMatch(t, modifiedFiles, []string{"v1/backupmeta/0003.meta"}) + + var total int64 = 0 + notDeleted, err := s.RemoveDataFilesAndUpdateMetadataInBatch(ctx, 17, l, func(num int64) { + lock.Lock() + total += num + lock.Unlock() + }) + require.NoError(t, err) + require.Equal(t, len(notDeleted), 0) + require.ElementsMatch(t, remainedFiles, []string{"v1/backupmeta/0003.meta"}) + require.ElementsMatch(t, removedMetaFiles, []string{"v1/backupmeta/0000.meta", "v1/backupmeta/0001.meta", "v1/backupmeta/0002.meta"}) + require.ElementsMatch(t, remainedDataFiles, []string{"0015_to_0017.log"}) + require.Equal(t, total, int64(15)) require.NoError(t, s.LoadFrom(ctx, l)) - s.IterateFilesFullyBefore(17, func(d *backuppb.DataFileGroup) (shouldBreak bool) { + s.IterateFilesFullyBefore(17, func(d *restore.FileGroupInfo) (shouldBreak bool) { t.Errorf("some of log files still not truncated, it is %#v", d) return true }) - l.WalkDir(ctx, &storage.WalkOption{ + err = l.WalkDir(ctx, &storage.WalkOption{ SubDir: stream.GetStreamBackupMetaPrefix(), }, func(s string, i int64) error { - require.NotContains(t, deletedFiles, s) + require.NotContains(t, removedMetaFiles, s) return nil }) + require.NoError(t, err) } func TestTruncateSafepoint(t *testing.T) { @@ -425,52 +477,1835 @@ func fakeMetaDataV2s(t *testing.T, helper *stream.MetadataHelper, cf string) []* return m2s } +func ff(minTS, maxTS uint64) *backuppb.DataFileGroup { + return f(0, minTS, maxTS, stream.DefaultCF, 0) +} + +func TestReplaceMetadataTs(t *testing.T) { + m := &backuppb.Metadata{} + restore.ReplaceMetadata(m, []*backuppb.DataFileGroup{ + ff(1, 3), + ff(4, 5), + }) + require.Equal(t, m.MinTs, uint64(1)) + require.Equal(t, m.MaxTs, uint64(5)) + + restore.ReplaceMetadata(m, []*backuppb.DataFileGroup{ + ff(1, 4), + ff(3, 5), + }) + require.Equal(t, m.MinTs, uint64(1)) + require.Equal(t, m.MaxTs, uint64(5)) + + restore.ReplaceMetadata(m, []*backuppb.DataFileGroup{ + ff(1, 6), + ff(0, 5), + }) + require.Equal(t, m.MinTs, uint64(0)) + require.Equal(t, m.MaxTs, 
uint64(6)) + + restore.ReplaceMetadata(m, []*backuppb.DataFileGroup{ + ff(1, 3), + }) + require.Equal(t, m.MinTs, uint64(1)) + require.Equal(t, m.MaxTs, uint64(3)) + + restore.ReplaceMetadata(m, []*backuppb.DataFileGroup{}) + require.Equal(t, m.MinTs, uint64(0)) + require.Equal(t, m.MaxTs, uint64(0)) + + restore.ReplaceMetadata(m, []*backuppb.DataFileGroup{ + ff(1, 3), + ff(2, 4), + ff(0, 2), + }) + require.Equal(t, m.MinTs, uint64(0)) + require.Equal(t, m.MaxTs, uint64(4)) +} + +func m(storeId int64, minTS, maxTS uint64) *backuppb.Metadata { + return &backuppb.Metadata{ + StoreId: storeId, + MinTs: minTS, + MaxTs: maxTS, + MetaVersion: backuppb.MetaVersion_V2, + } +} + +func f(storeId int64, minTS, maxTS uint64, cf string, defaultTS uint64) *backuppb.DataFileGroup { + return &backuppb.DataFileGroup{ + Path: logName(storeId, minTS, maxTS), + DataFilesInfo: []*backuppb.DataFileInfo{ + { + NumberOfEntries: 1, + MinTs: minTS, + MaxTs: maxTS, + Cf: cf, + MinBeginTsInDefaultCf: defaultTS, + }, + }, + MinTs: minTS, + MaxTs: maxTS, + } +} + +// get the metadata with only one datafilegroup +func m_1(storeId int64, minTS, maxTS uint64, cf string, defaultTS uint64) *backuppb.Metadata { + meta := m(storeId, minTS, maxTS) + meta.FileGroups = []*backuppb.DataFileGroup{ + f(storeId, minTS, maxTS, cf, defaultTS), + } + return meta +} + +// get the metadata with 2 datafilegroup +func m_2( + storeId int64, + minTSL, maxTSL uint64, cfL string, defaultTSL uint64, + minTSR, maxTSR uint64, cfR string, defaultTSR uint64, +) *backuppb.Metadata { + meta := m(storeId, minTSL, maxTSR) + meta.FileGroups = []*backuppb.DataFileGroup{ + f(storeId, minTSL, maxTSL, cfL, defaultTSL), + f(storeId, minTSR, maxTSR, cfR, defaultTSR), + } + return meta +} + +// clean the files in the external storage +func cleanFiles(ctx context.Context, s storage.ExternalStorage) error { + names := make([]string, 0) + err := s.WalkDir(ctx, &storage.WalkOption{}, func(path string, size int64) error { + names = append(names, path) + return nil + }) + if err != nil { + return err + } + for _, path := range names { + err := s.DeleteFile(ctx, path) + if err != nil { + return err + } + } + return nil +} + +func metaName(storeId int64) string { + return fmt.Sprintf("%s/%04d.meta", stream.GetStreamBackupMetaPrefix(), storeId) +} + +func logName(storeId int64, minTS, maxTS uint64) string { + return fmt.Sprintf("%04d_%04d_%04d.log", storeId, minTS, maxTS) +} + +// generate the files to the external storage +func generateFiles(ctx context.Context, s storage.ExternalStorage, metas []*backuppb.Metadata, tmpDir string) error { + if err := cleanFiles(ctx, s); err != nil { + return err + } + fname := path.Join(tmpDir, stream.GetStreamBackupMetaPrefix()) + os.MkdirAll(fname, 0777) + for _, meta := range metas { + data, err := meta.Marshal() + if err != nil { + return err + } + + fname := metaName(meta.StoreId) + err = s.WriteFile(ctx, fname, data) + if err != nil { + return err + } + + for _, group := range meta.FileGroups { + fname := logName(meta.StoreId, group.MinTs, group.MaxTs) + err = s.WriteFile(ctx, fname, []byte("test")) + if err != nil { + return err + } + } + } + + return nil +} + +// check the files in the external storage +func checkFiles(ctx context.Context, s storage.ExternalStorage, metas []*backuppb.Metadata, t *testing.T) { + pathSet := make(map[string]struct{}) + for _, meta := range metas { + metaPath := metaName(meta.StoreId) + pathSet[metaPath] = struct{}{} + exists, err := s.FileExists(ctx, metaPath) + require.NoError(t, err) + 
require.True(t, exists) + + data, err := s.ReadFile(ctx, metaPath) + require.NoError(t, err) + metaRead := &backuppb.Metadata{} + err = metaRead.Unmarshal(data) + require.NoError(t, err) + require.Equal(t, meta.MinTs, metaRead.MinTs) + require.Equal(t, meta.MaxTs, metaRead.MaxTs) + for i, group := range meta.FileGroups { + require.Equal(t, metaRead.FileGroups[i].Path, group.Path) + logPath := logName(meta.StoreId, group.MinTs, group.MaxTs) + pathSet[logPath] = struct{}{} + exists, err := s.FileExists(ctx, logPath) + require.NoError(t, err) + require.True(t, exists) + } + } + + err := s.WalkDir(ctx, &storage.WalkOption{}, func(path string, size int64) error { + _, exists := pathSet[path] + require.True(t, exists, path) + return nil + }) + require.NoError(t, err) +} + +type testParam struct { + until []uint64 + shiftUntilTS uint64 + restMetadata []*backuppb.Metadata +} + +func TestTruncate1(t *testing.T) { + ctx := context.Background() + tmpDir := t.TempDir() + s, err := storage.NewLocalStorage(tmpDir) + require.NoError(t, err) + + cases := []struct { + metas []*backuppb.Metadata + testParams []*testParam + }{ + { + // metadata 10-----------20 + // ↑ ↑ + // +-----------+ + // ↓ ↓ + // filegroup 10-----d-----20 + metas: []*backuppb.Metadata{ + m_1(1, 10, 20, stream.DefaultCF, 0), + }, + testParams: []*testParam{ + { + until: []uint64{5}, + shiftUntilTS: 5, restMetadata: []*backuppb.Metadata{ + m_1(1, 10, 20, stream.DefaultCF, 0), + }, + }, { + until: []uint64{10}, + shiftUntilTS: 10, restMetadata: []*backuppb.Metadata{ + m_1(1, 10, 20, stream.DefaultCF, 0), + }, + }, { + until: []uint64{15}, + shiftUntilTS: 15, restMetadata: []*backuppb.Metadata{ + m_1(1, 10, 20, stream.DefaultCF, 0), + }, + }, { + until: []uint64{20}, + shiftUntilTS: 20, restMetadata: []*backuppb.Metadata{ + m_1(1, 10, 20, stream.DefaultCF, 0), + }, + }, { + until: []uint64{25}, + shiftUntilTS: 25, restMetadata: []*backuppb.Metadata{}, + }, + }, + }, { + // metadata 10-----------20 + // ↑ ↑ + // +-----------+ + // ↓ ↓ + // filegroup 5-d--10-----w-----20 + metas: []*backuppb.Metadata{ + m_1(1, 10, 20, stream.WriteCF, 5), + }, + testParams: []*testParam{ + { + until: []uint64{3}, + shiftUntilTS: 3, restMetadata: []*backuppb.Metadata{ + m_1(1, 10, 20, stream.WriteCF, 5), + }, + }, { + until: []uint64{5, 7, 10, 15, 20}, + shiftUntilTS: 5, restMetadata: []*backuppb.Metadata{ + m_1(1, 10, 20, stream.WriteCF, 5), + }, + }, { + until: []uint64{25}, + shiftUntilTS: 25, restMetadata: []*backuppb.Metadata{}, + }, + }, + }, { + // metadata 5----8 10-----------20 + // ↑ ↑ ↑ ↑ + // +----+ +-----------+ + // ↓ ↓ ↓ ↓ + // filegroup 5--d-8 ↓ ↓ + // filegroup 5--d---10-----w-----20 + metas: []*backuppb.Metadata{ + m_1(1, 5, 8, stream.DefaultCF, 0), + m_1(2, 10, 20, stream.WriteCF, 5), + }, + testParams: []*testParam{ + { + until: []uint64{3}, + shiftUntilTS: 3, restMetadata: []*backuppb.Metadata{ + m_1(1, 5, 8, stream.DefaultCF, 0), + m_1(2, 10, 20, stream.WriteCF, 5), + }, + }, { + until: []uint64{5, 8, 9, 10, 15, 20}, + shiftUntilTS: 5, restMetadata: []*backuppb.Metadata{ + m_1(1, 5, 8, stream.DefaultCF, 0), + m_1(2, 10, 20, stream.WriteCF, 5), + }, + }, { + until: []uint64{25}, + shiftUntilTS: 25, restMetadata: []*backuppb.Metadata{}, + }, + }, + }, { + // metadata 10-----------20 + // metadata 5------10 ↑ + // ↑ ↑ ↑ + // +-------+-----------+ + // ↓ ↓ ↓ + // filegroup 5--d---10 ↓ + // filegroup 5--d---10-----w-----20 + metas: []*backuppb.Metadata{ + m_1(1, 5, 10, stream.DefaultCF, 0), + m_1(2, 10, 20, stream.WriteCF, 5), + }, + 
testParams: []*testParam{ + { + until: []uint64{3}, + shiftUntilTS: 3, restMetadata: []*backuppb.Metadata{ + m_1(1, 5, 10, stream.DefaultCF, 0), + m_1(2, 10, 20, stream.WriteCF, 5), + }, + }, { + until: []uint64{5, 8, 9, 10, 15, 20}, + shiftUntilTS: 5, restMetadata: []*backuppb.Metadata{ + m_1(1, 5, 10, stream.DefaultCF, 0), + m_1(2, 10, 20, stream.WriteCF, 5), + }, + }, { + until: []uint64{25}, + shiftUntilTS: 25, restMetadata: []*backuppb.Metadata{}, + }, + }, + }, { + // metadata 10-----------20 + // metadata 5-------↑-12 ↑ + // ↑ ↑ ↑ ↑ + // +-------+-+---------+ + // ↓ ↓ ↓ ↓ + // filegroup 5--d----↓-12 ↓ + // filegroup 5--d---10-----w-----20 + metas: []*backuppb.Metadata{ + m_1(1, 5, 12, stream.DefaultCF, 0), + m_1(2, 10, 20, stream.WriteCF, 5), + }, + testParams: []*testParam{ + { + until: []uint64{3}, + shiftUntilTS: 3, restMetadata: []*backuppb.Metadata{ + m_1(1, 5, 12, stream.DefaultCF, 0), + m_1(2, 10, 20, stream.WriteCF, 5), + }, + }, { + until: []uint64{5, 8, 9, 10, 15, 20}, + shiftUntilTS: 5, restMetadata: []*backuppb.Metadata{ + m_1(1, 5, 12, stream.DefaultCF, 0), + m_1(2, 10, 20, stream.WriteCF, 5), + }, + }, { + until: []uint64{25}, + shiftUntilTS: 25, restMetadata: []*backuppb.Metadata{}, + }, + }, + }, { + // metadata 10-----------20 + // metadata 5-------↑-----------20 + // ↑ ↑ ↑ + // +-------+-----------+ + // ↓ ↓ ↓ + // filegroup 5--d----↓-----------20 + // filegroup 5--d---10-----w-----20 + metas: []*backuppb.Metadata{ + m_1(1, 5, 20, stream.DefaultCF, 0), + m_1(2, 10, 20, stream.WriteCF, 5), + }, + testParams: []*testParam{ + { + until: []uint64{3}, + shiftUntilTS: 3, restMetadata: []*backuppb.Metadata{ + m_1(1, 5, 20, stream.DefaultCF, 0), + m_1(2, 10, 20, stream.WriteCF, 5), + }, + }, { + until: []uint64{5, 8, 10, 15, 20}, + shiftUntilTS: 5, restMetadata: []*backuppb.Metadata{ + m_1(1, 5, 20, stream.DefaultCF, 0), + m_1(2, 10, 20, stream.WriteCF, 5), + }, + }, { + until: []uint64{25}, + shiftUntilTS: 25, restMetadata: []*backuppb.Metadata{}, + }, + }, + }, { + // metadata 10-----------20 + // metadata 5-------↑-----------↑--22 + // ↑ ↑ ↑ ↑ + // +-------+-----------+--+ + // ↓ ↓ ↓ ↓ + // filegroup 5--d----↓-----------↓--22 + // filegroup 5--d---10-----w-----20 + metas: []*backuppb.Metadata{ + m_1(1, 5, 22, stream.DefaultCF, 0), + m_1(2, 10, 20, stream.WriteCF, 5), + }, + testParams: []*testParam{ + { + until: []uint64{3}, + shiftUntilTS: 3, restMetadata: []*backuppb.Metadata{ + m_1(1, 5, 22, stream.DefaultCF, 0), + m_1(2, 10, 20, stream.WriteCF, 5), + }, + }, { + until: []uint64{5, 8, 10, 15, 20}, + shiftUntilTS: 5, restMetadata: []*backuppb.Metadata{ + m_1(1, 5, 22, stream.DefaultCF, 0), + m_1(2, 10, 20, stream.WriteCF, 5), + }, + }, { + until: []uint64{21}, + shiftUntilTS: 21, restMetadata: []*backuppb.Metadata{ + m_1(1, 5, 22, stream.DefaultCF, 0), + }, + }, { + until: []uint64{22}, + shiftUntilTS: 22, restMetadata: []*backuppb.Metadata{ + m_1(1, 5, 22, stream.DefaultCF, 0), + }, + }, { + until: []uint64{25}, + shiftUntilTS: 25, restMetadata: []*backuppb.Metadata{}, + }, + }, + }, { + // metadata 10-----------20 + // metadata 10---14 ↑ + // ↑ ↑ ↑ + // +----+-------+ + // ↓ ↓ ↓ + // filegroup 10-d-14 ↓ + // filegroup 5--d---10-----w-----20 + metas: []*backuppb.Metadata{ + m_1(1, 10, 14, stream.DefaultCF, 0), + m_1(2, 10, 20, stream.WriteCF, 5), + }, + testParams: []*testParam{ + { + until: []uint64{3}, + shiftUntilTS: 3, restMetadata: []*backuppb.Metadata{ + m_1(1, 10, 14, stream.DefaultCF, 0), + m_1(2, 10, 20, stream.WriteCF, 5), + }, + }, { + until: []uint64{5, 
8, 10, 12, 14, 18, 20}, + shiftUntilTS: 5, restMetadata: []*backuppb.Metadata{ + m_1(1, 10, 14, stream.DefaultCF, 0), + m_1(2, 10, 20, stream.WriteCF, 5), + }, + }, { + until: []uint64{25}, + shiftUntilTS: 25, restMetadata: []*backuppb.Metadata{}, + }, + }, + }, { + // metadata 10-----------20 + // metadata 10-----------20 + // ↑ ↑ + // +------------+ + // ↓ ↓ + // filegroup 10----d------20 + // filegroup 5--d---10-----w-----20 + metas: []*backuppb.Metadata{ + m_1(1, 10, 20, stream.DefaultCF, 0), + m_1(2, 10, 20, stream.WriteCF, 5), + }, + testParams: []*testParam{ + { + until: []uint64{3}, + shiftUntilTS: 3, restMetadata: []*backuppb.Metadata{ + m_1(1, 10, 20, stream.DefaultCF, 0), + m_1(2, 10, 20, stream.WriteCF, 5), + }, + }, { + until: []uint64{5, 8, 10, 14, 20}, + shiftUntilTS: 5, restMetadata: []*backuppb.Metadata{ + m_1(1, 10, 20, stream.DefaultCF, 0), + m_1(2, 10, 20, stream.WriteCF, 5), + }, + }, { + until: []uint64{25}, + shiftUntilTS: 25, restMetadata: []*backuppb.Metadata{}, + }, + }, + }, { + // metadata 10-----------20 + // metadata 10------------↑--22 + // ↑ ↑ ↑ + // +------------+---+ + // ↓ ↓ ↓ + // filegroup 10----d-------↓--22 + // filegroup 5--d---10-----w-----20 + metas: []*backuppb.Metadata{ + m_1(1, 10, 22, stream.DefaultCF, 0), + m_1(2, 10, 20, stream.WriteCF, 5), + }, + testParams: []*testParam{ + { + until: []uint64{3}, + shiftUntilTS: 3, restMetadata: []*backuppb.Metadata{ + m_1(1, 10, 22, stream.DefaultCF, 0), + m_1(2, 10, 20, stream.WriteCF, 5), + }, + }, { + until: []uint64{5, 8, 10, 14, 20}, + shiftUntilTS: 5, restMetadata: []*backuppb.Metadata{ + m_1(1, 10, 22, stream.DefaultCF, 0), + m_1(2, 10, 20, stream.WriteCF, 5), + }, + }, { + until: []uint64{21}, + shiftUntilTS: 21, restMetadata: []*backuppb.Metadata{ + m_1(1, 10, 22, stream.DefaultCF, 0), + }, + }, { + until: []uint64{22}, + shiftUntilTS: 22, restMetadata: []*backuppb.Metadata{ + m_1(1, 10, 22, stream.DefaultCF, 0), + }, + }, { + until: []uint64{25}, + shiftUntilTS: 25, restMetadata: []*backuppb.Metadata{}, + }, + }, + }, { + // metadata 10-----------20 + // metadata ↑ 12-----18 ↑ + // ↑ ↑ ↑ ↑ + // +--+------+--+ + // ↓ ↓ ↓ ↓ + // filegroup ↓ 12--d--18 ↓ + // filegroup 5--d---10-----w-----20 + metas: []*backuppb.Metadata{ + m_1(1, 12, 18, stream.DefaultCF, 0), + m_1(2, 10, 20, stream.WriteCF, 5), + }, + testParams: []*testParam{ + { + until: []uint64{3}, + shiftUntilTS: 3, restMetadata: []*backuppb.Metadata{ + m_1(1, 12, 18, stream.DefaultCF, 0), + m_1(2, 10, 20, stream.WriteCF, 5), + }, + }, { + until: []uint64{5, 8, 10, 11, 12, 15, 18, 19, 20}, + shiftUntilTS: 5, restMetadata: []*backuppb.Metadata{ + m_1(1, 12, 18, stream.DefaultCF, 0), + m_1(2, 10, 20, stream.WriteCF, 5), + }, + }, { + until: []uint64{25}, + shiftUntilTS: 25, restMetadata: []*backuppb.Metadata{}, + }, + }, + }, { + // metadata 10-----------20 + // metadata ↑ 14----20 + // ↑ ↑ ↑ + // +------+-----+ + // ↓ ↓ ↓ + // filegroup ↓ 14--d-20 + // filegroup 5--d---10-----w-----20 + metas: []*backuppb.Metadata{ + m_1(1, 14, 20, stream.DefaultCF, 0), + m_1(2, 10, 20, stream.WriteCF, 5), + }, + testParams: []*testParam{ + { + until: []uint64{3}, + shiftUntilTS: 3, restMetadata: []*backuppb.Metadata{ + m_1(1, 14, 20, stream.DefaultCF, 0), + m_1(2, 10, 20, stream.WriteCF, 5), + }, + }, { + until: []uint64{5, 8, 10, 14, 20}, + shiftUntilTS: 5, restMetadata: []*backuppb.Metadata{ + m_1(1, 14, 20, stream.DefaultCF, 0), + m_1(2, 10, 20, stream.WriteCF, 5), + }, + }, { + until: []uint64{25}, + shiftUntilTS: 25, restMetadata: []*backuppb.Metadata{}, 
+ }, + }, + }, { + // metadata 10-----------20 + // metadata ↑ 14-----↑--22 + // ↑ ↑ ↑ ↑ + // +------+-----+---+ + // ↓ ↓ ↓ ↓ + // filegroup ↓ 14-d--↓--22 + // filegroup 5--d---10-----w-----20 + metas: []*backuppb.Metadata{ + m_1(1, 14, 22, stream.DefaultCF, 0), + m_1(2, 10, 20, stream.WriteCF, 5), + }, + testParams: []*testParam{ + { + until: []uint64{3}, + shiftUntilTS: 3, restMetadata: []*backuppb.Metadata{ + m_1(1, 14, 22, stream.DefaultCF, 0), + m_1(2, 10, 20, stream.WriteCF, 5), + }, + }, { + until: []uint64{5, 8, 10, 14, 20}, + shiftUntilTS: 5, restMetadata: []*backuppb.Metadata{ + m_1(1, 14, 22, stream.DefaultCF, 0), + m_1(2, 10, 20, stream.WriteCF, 5), + }, + }, { + until: []uint64{21}, + shiftUntilTS: 21, restMetadata: []*backuppb.Metadata{ + m_1(1, 14, 22, stream.DefaultCF, 0), + }, + }, { + until: []uint64{22}, + shiftUntilTS: 22, restMetadata: []*backuppb.Metadata{ + m_1(1, 14, 22, stream.DefaultCF, 0), + }, + }, { + until: []uint64{25}, + shiftUntilTS: 25, restMetadata: []*backuppb.Metadata{}, + }, + }, + }, { + // metadata 10-----------20 + // metadata ↑ 20--22 + // ↑ ↑ ↑ + // +------------+---+ + // ↓ ↓ ↓ + // filegroup ↓ 20--22 + // filegroup 5--d---10-----w-----20 + metas: []*backuppb.Metadata{ + m_1(1, 20, 22, stream.DefaultCF, 0), + m_1(2, 10, 20, stream.WriteCF, 5), + }, + testParams: []*testParam{ + { + until: []uint64{3}, + shiftUntilTS: 3, restMetadata: []*backuppb.Metadata{ + m_1(1, 20, 22, stream.DefaultCF, 0), + m_1(2, 10, 20, stream.WriteCF, 5), + }, + }, { + until: []uint64{5, 8, 10, 14, 20}, + shiftUntilTS: 5, restMetadata: []*backuppb.Metadata{ + m_1(1, 20, 22, stream.DefaultCF, 0), + m_1(2, 10, 20, stream.WriteCF, 5), + }, + }, { + until: []uint64{21}, + shiftUntilTS: 21, restMetadata: []*backuppb.Metadata{ + m_1(1, 20, 22, stream.DefaultCF, 0), + }, + }, { + until: []uint64{22}, + shiftUntilTS: 22, restMetadata: []*backuppb.Metadata{ + m_1(1, 20, 22, stream.DefaultCF, 0), + }, + }, { + until: []uint64{25}, + shiftUntilTS: 25, restMetadata: []*backuppb.Metadata{}, + }, + }, + }, { + // metadata 10-----------20 + // metadata ↑ ↑ 21---24 + // ↑ ↑ ↑ ↑ + // +------------+--+----+ + // ↓ ↓ ↓ ↓ + // filegroup ↓ ↓ 21-d-24 + // filegroup 5--d---10-----w-----20 + metas: []*backuppb.Metadata{ + m_1(1, 21, 24, stream.DefaultCF, 0), + m_1(2, 10, 20, stream.WriteCF, 5), + }, + testParams: []*testParam{ + { + until: []uint64{3}, + shiftUntilTS: 3, restMetadata: []*backuppb.Metadata{ + m_1(1, 21, 24, stream.DefaultCF, 0), + m_1(2, 10, 20, stream.WriteCF, 5), + }, + }, { + until: []uint64{5, 8, 10, 14, 20}, + shiftUntilTS: 5, restMetadata: []*backuppb.Metadata{ + m_1(1, 21, 24, stream.DefaultCF, 0), + m_1(2, 10, 20, stream.WriteCF, 5), + }, + }, { + until: []uint64{21}, + shiftUntilTS: 21, restMetadata: []*backuppb.Metadata{ + m_1(1, 21, 24, stream.DefaultCF, 0), + }, + }, { + until: []uint64{22}, + shiftUntilTS: 22, restMetadata: []*backuppb.Metadata{ + m_1(1, 21, 24, stream.DefaultCF, 0), + }, + }, { + until: []uint64{25}, + shiftUntilTS: 25, restMetadata: []*backuppb.Metadata{}, + }, + }, + }, + } + + for i, cs := range cases { + for j, ts := range cs.testParams { + for _, until := range ts.until { + t.Logf("case %d, param %d, until %d", i, j, until) + metas := restore.StreamMetadataSet{ + Helper: stream.NewMetadataHelper(), + } + err := generateFiles(ctx, s, cs.metas, tmpDir) + require.NoError(t, err) + shiftUntilTS, err := metas.LoadUntilAndCalculateShiftTS(ctx, s, until) + require.NoError(t, err) + require.Equal(t, shiftUntilTS, ts.shiftUntilTS) + n, err := 
metas.RemoveDataFilesAndUpdateMetadataInBatch(ctx, shiftUntilTS, s, func(num int64) {}) + require.Equal(t, len(n), 0) + require.NoError(t, err) + + // check the result + checkFiles(ctx, s, ts.restMetadata, t) + } + } + } +} + +type testParam2 struct { + until []uint64 + shiftUntilTS func(uint64) uint64 + restMetadata []*backuppb.Metadata +} + +func returnV(v uint64) func(uint64) uint64 { + return func(uint64) uint64 { + return v + } +} + +func returnSelf() func(uint64) uint64 { + return func(u uint64) uint64 { + return u + } +} + +func TestTruncate2(t *testing.T) { + ctx := context.Background() + tmpDir := t.TempDir() + s, err := storage.NewLocalStorage(tmpDir) + require.NoError(t, err) + + cases := []struct { + metas []*backuppb.Metadata + testParams []*testParam2 + }{ + { + // metadata 10-----------20 + // ↑ ↑ + // +-----------+ + // ↓ ↓ ↓ ↓ + // filegroup 10-d-13 ↓ ↓ + // filegroup 8----d--15-w-20 + metas: []*backuppb.Metadata{ + m_2(1, + 10, 13, stream.DefaultCF, 0, + 15, 20, stream.WriteCF, 8, + ), + }, + testParams: []*testParam2{ + { + until: []uint64{5}, + shiftUntilTS: returnV(5), restMetadata: []*backuppb.Metadata{ + m_2(1, + 10, 13, stream.DefaultCF, 0, + 15, 20, stream.WriteCF, 8, + ), + }, + }, { + until: []uint64{8, 9, 10, 12, 13, 14, 15, 18, 20}, + shiftUntilTS: returnV(8), restMetadata: []*backuppb.Metadata{ + m_2(1, + 10, 13, stream.DefaultCF, 0, + 15, 20, stream.WriteCF, 8, + ), + }, + }, { + until: []uint64{25}, + shiftUntilTS: returnV(25), restMetadata: []*backuppb.Metadata{}, + }, + }, + }, { + // metadata 3---6 10----------20 + // ↑ ↑ ↑ ↑ + // +---+ +-----------+ + // ↓ ↓ ↓ ↓ ↓ ↓ + // filegroup 3 6 10-d-13 ↓ ↓ + // filegroup 1-----------d--15-w-20 + metas: []*backuppb.Metadata{ + m_1(1, 3, 6, stream.DefaultCF, 0), + m_2(2, + 10, 13, stream.DefaultCF, 0, + 15, 20, stream.WriteCF, 1, + ), + }, + testParams: []*testParam2{ + { + until: []uint64{0}, + shiftUntilTS: returnV(0), restMetadata: []*backuppb.Metadata{ + m_1(1, 3, 6, stream.DefaultCF, 0), + m_2(2, + 10, 13, stream.DefaultCF, 0, + 15, 20, stream.WriteCF, 8, + ), + }, + }, { + until: []uint64{1, 2, 3, 4, 6, 9, 10, 12, 13, 14, 15, 18, 20}, + shiftUntilTS: returnV(1), restMetadata: []*backuppb.Metadata{ + m_1(1, 3, 6, stream.DefaultCF, 0), + m_2(2, + 10, 13, stream.DefaultCF, 0, + 15, 20, stream.WriteCF, 8, + ), + }, + }, { + until: []uint64{25}, + shiftUntilTS: returnV(25), restMetadata: []*backuppb.Metadata{}, + }, + }, + }, { + // metadata 3---6 10----------20 + // ↑ ↑ ↑ ↑ + // +---+ +-----------+ + // ↓ ↓ ↓ ↓ ↓ ↓ + // filegroup 3 6 10-d-13 ↓ ↓ + // filegroup 3----------d--15-w-20 + metas: []*backuppb.Metadata{ + m_1(1, 3, 6, stream.DefaultCF, 0), + m_2(2, + 10, 13, stream.DefaultCF, 0, + 15, 20, stream.WriteCF, 3, + ), + }, + testParams: []*testParam2{ + { + until: []uint64{2}, + shiftUntilTS: returnV(2), restMetadata: []*backuppb.Metadata{ + m_1(1, 3, 6, stream.DefaultCF, 0), + m_2(2, + 10, 13, stream.DefaultCF, 0, + 15, 20, stream.WriteCF, 3, + ), + }, + }, { + until: []uint64{3, 4, 6, 9, 10, 12, 13, 14, 15, 18, 20}, + shiftUntilTS: returnV(3), restMetadata: []*backuppb.Metadata{ + m_1(1, 3, 6, stream.DefaultCF, 0), + m_2(2, + 10, 13, stream.DefaultCF, 0, + 15, 20, stream.WriteCF, 3, + ), + }, + }, { + until: []uint64{25}, + shiftUntilTS: returnV(25), restMetadata: []*backuppb.Metadata{}, + }, + }, + }, { + // metadata 3---7 10----------20 + // ↑ ↑ ↑ ↑ + // +---+ +----+-+----+ + // ↓ ↓ ↓ ↓ ↓ ↓ + // filegroup 3 7 10-d-13 ↓ ↓ + // filegroup 5--------d--15-w-20 + metas: []*backuppb.Metadata{ + m_1(1, 3, 7, 
stream.DefaultCF, 0), + m_2(2, + 10, 13, stream.DefaultCF, 0, + 15, 20, stream.WriteCF, 5, + ), + }, + testParams: []*testParam2{ + { + until: []uint64{2, 3, 4}, + shiftUntilTS: returnSelf(), restMetadata: []*backuppb.Metadata{ + m_1(1, 3, 7, stream.DefaultCF, 0), + m_2(2, + 10, 13, stream.DefaultCF, 0, + 15, 20, stream.WriteCF, 5, + ), + }, + }, { + until: []uint64{5, 6, 7, 9, 10, 12, 13, 14, 15, 18, 20}, + shiftUntilTS: returnV(5), restMetadata: []*backuppb.Metadata{ + m_1(1, 3, 7, stream.DefaultCF, 0), + m_2(2, + 10, 13, stream.DefaultCF, 0, + 15, 20, stream.WriteCF, 5, + ), + }, + }, { + until: []uint64{25}, + shiftUntilTS: returnV(25), restMetadata: []*backuppb.Metadata{}, + }, + }, + }, { + // metadata 3---7 10----------20 + // ↑ ↑ ↑ ↑ + // +---+ +----+-+----+ + // ↓ ↓ ↓ ↓ ↓ ↓ + // filegroup 3 7 10-d-13 ↓ ↓ + // filegroup 7------d--15-w-20 + metas: []*backuppb.Metadata{ + m_1(1, 3, 7, stream.DefaultCF, 0), + m_2(2, + 10, 13, stream.DefaultCF, 0, + 15, 20, stream.WriteCF, 7, + ), + }, + testParams: []*testParam2{ + { + until: []uint64{2, 3, 4, 6, 7}, + shiftUntilTS: returnSelf(), restMetadata: []*backuppb.Metadata{ + m_1(1, 3, 7, stream.DefaultCF, 0), + m_2(2, + 10, 13, stream.DefaultCF, 0, + 15, 20, stream.WriteCF, 7, + ), + }, + }, { + until: []uint64{9, 10, 12, 13, 14, 15, 18, 20}, + shiftUntilTS: returnV(7), restMetadata: []*backuppb.Metadata{ + m_1(1, 3, 7, stream.DefaultCF, 0), + m_2(2, + 10, 13, stream.DefaultCF, 0, + 15, 20, stream.WriteCF, 7, + ), + }, + }, { + until: []uint64{25}, + shiftUntilTS: returnV(25), restMetadata: []*backuppb.Metadata{}, + }, + }, + }, { + // metadata 3---6 10----------20 + // ↑ ↑ ↑ ↑ + // +---+ +----+-+----+ + // ↓ ↓ ↓ ↓ ↓ ↓ + // filegroup 3-d-6 10-d-13 ↓ ↓ + // filegroup 8----d--15-w-20 + metas: []*backuppb.Metadata{ + m_1(1, 3, 6, stream.DefaultCF, 0), + m_2(2, + 10, 13, stream.DefaultCF, 0, + 15, 20, stream.WriteCF, 8, + ), + }, + testParams: []*testParam2{ + { + until: []uint64{2, 3, 4, 6}, + shiftUntilTS: returnSelf(), restMetadata: []*backuppb.Metadata{ + m_1(1, 3, 6, stream.DefaultCF, 0), + m_2(2, + 10, 13, stream.DefaultCF, 0, + 15, 20, stream.WriteCF, 8, + ), + }, + }, { + until: []uint64{7}, + shiftUntilTS: returnSelf(), restMetadata: []*backuppb.Metadata{ + m_2(2, + 10, 13, stream.DefaultCF, 0, + 15, 20, stream.WriteCF, 8, + ), + }, + }, { + until: []uint64{8, 9, 10, 12, 13, 14, 15, 18, 20}, + shiftUntilTS: returnV(8), restMetadata: []*backuppb.Metadata{ + m_2(2, + 10, 13, stream.DefaultCF, 0, + 15, 20, stream.WriteCF, 8, + ), + }, + }, { + until: []uint64{25}, + shiftUntilTS: returnV(25), restMetadata: []*backuppb.Metadata{}, + }, + }, + }, { + // metadata 3---6 10----------20 + // ↑ ↑ ↑ ↑ + // +---+ +----+-+----+ + // ↓ ↓ ↓ ↓ ↓ ↓ + // filegroup 3-d-6 10-d-13 ↓ ↓ + // filegroup 10--d--15-w-20 + metas: []*backuppb.Metadata{ + m_1(1, 3, 6, stream.DefaultCF, 0), + m_2(2, + 10, 13, stream.DefaultCF, 0, + 15, 20, stream.WriteCF, 10, + ), + }, + testParams: []*testParam2{ + { + until: []uint64{2, 3, 4, 6}, + shiftUntilTS: returnSelf(), restMetadata: []*backuppb.Metadata{ + m_1(1, 3, 6, stream.DefaultCF, 0), + m_2(2, + 10, 13, stream.DefaultCF, 0, + 15, 20, stream.WriteCF, 10, + ), + }, + }, { + until: []uint64{7, 8, 9}, + shiftUntilTS: returnSelf(), restMetadata: []*backuppb.Metadata{ + m_2(2, + 10, 13, stream.DefaultCF, 0, + 15, 20, stream.WriteCF, 10, + ), + }, + }, { + until: []uint64{10, 12, 13, 14, 15, 18, 20}, + shiftUntilTS: returnV(10), restMetadata: []*backuppb.Metadata{ + m_2(2, + 10, 13, stream.DefaultCF, 0, + 15, 20, 
stream.WriteCF, 10, + ), + }, + }, { + until: []uint64{25}, + shiftUntilTS: returnV(25), restMetadata: []*backuppb.Metadata{}, + }, + }, + }, { + // metadata 3---6 10----------20 + // ↑ ↑ ↑ ↑ + // +---+ +----+-+----+ + // ↓ ↓ ↓ ↓ ↓ ↓ + // filegroup 3-d-6 9-d-13 ↓ ↓ + // filegroup 11-d-15-w-20 + metas: []*backuppb.Metadata{ + m_1(1, 3, 6, stream.DefaultCF, 0), + m_2(2, + 9, 13, stream.DefaultCF, 0, + 15, 20, stream.WriteCF, 11, + ), + }, + testParams: []*testParam2{ + { + until: []uint64{2, 3, 4, 6}, + shiftUntilTS: returnSelf(), restMetadata: []*backuppb.Metadata{ + m_1(1, 3, 6, stream.DefaultCF, 0), + m_2(2, + 9, 13, stream.DefaultCF, 0, + 15, 20, stream.WriteCF, 11, + ), + }, + }, { + until: []uint64{7, 8, 9, 10}, + shiftUntilTS: returnSelf(), restMetadata: []*backuppb.Metadata{ + m_2(2, + 9, 13, stream.DefaultCF, 0, + 15, 20, stream.WriteCF, 11, + ), + }, + }, { + until: []uint64{11, 12, 13, 14, 15, 18, 20}, + shiftUntilTS: returnV(11), restMetadata: []*backuppb.Metadata{ + m_2(2, + 9, 13, stream.DefaultCF, 0, + 15, 20, stream.WriteCF, 11, + ), + }, + }, { + until: []uint64{25}, + shiftUntilTS: returnV(25), restMetadata: []*backuppb.Metadata{}, + }, + }, + }, { + // metadata 3---6 10----------20 + // ↑ ↑ ↑ ↑ + // +---+ +----+-+----+ + // ↓ ↓ ↓ ↓ ↓ ↓ + // filegroup 3-d-6 10-d-13 ↓ ↓ + // filegroup 13d15-w-20 + metas: []*backuppb.Metadata{ + m_1(1, 3, 6, stream.DefaultCF, 0), + m_2(2, + 10, 13, stream.DefaultCF, 0, + 15, 20, stream.WriteCF, 13, + ), + }, + testParams: []*testParam2{ + { + until: []uint64{2, 3, 4, 6}, + shiftUntilTS: returnSelf(), restMetadata: []*backuppb.Metadata{ + m_1(1, 3, 6, stream.DefaultCF, 0), + m_2(2, + 10, 13, stream.DefaultCF, 0, + 15, 20, stream.WriteCF, 13, + ), + }, + }, { + until: []uint64{7, 8, 9, 10, 12}, + shiftUntilTS: returnSelf(), restMetadata: []*backuppb.Metadata{ + m_2(2, + 10, 13, stream.DefaultCF, 0, + 15, 20, stream.WriteCF, 13, + ), + }, + }, { + until: []uint64{13, 14, 15, 18, 20}, + shiftUntilTS: returnV(13), restMetadata: []*backuppb.Metadata{ + m_2(2, + 10, 13, stream.DefaultCF, 0, + 15, 20, stream.WriteCF, 13, + ), + }, + }, { + until: []uint64{25}, + shiftUntilTS: returnV(25), restMetadata: []*backuppb.Metadata{}, + }, + }, + }, { + // metadata 3---6 10----------20 + // ↑ ↑ ↑ ↑ + // +---+ +----+--+---+ + // ↓ ↓ ↓ ↓ ↓ ↓ + // filegroup 3-d-6 10-d-12 ↓ ↓ + // filegroup 14d16-w-20 + metas: []*backuppb.Metadata{ + m_1(1, 3, 6, stream.DefaultCF, 0), + m_2(2, + 10, 12, stream.DefaultCF, 0, + 16, 20, stream.WriteCF, 14, + ), + }, + testParams: []*testParam2{ + { + until: []uint64{2, 3, 4, 6}, + shiftUntilTS: returnSelf(), restMetadata: []*backuppb.Metadata{ + m_1(1, 3, 6, stream.DefaultCF, 0), + m_2(2, + 10, 12, stream.DefaultCF, 0, + 16, 20, stream.WriteCF, 14, + ), + }, + }, { + until: []uint64{7, 8, 9, 10, 11, 12}, + shiftUntilTS: returnSelf(), restMetadata: []*backuppb.Metadata{ + m_2(2, + 10, 12, stream.DefaultCF, 0, + 16, 20, stream.WriteCF, 14, + ), + }, + }, { + until: []uint64{13}, + shiftUntilTS: returnSelf(), restMetadata: []*backuppb.Metadata{ + m_1(2, 16, 20, stream.WriteCF, 14), + }, + }, { + until: []uint64{14, 15, 18, 20}, + shiftUntilTS: returnV(14), restMetadata: []*backuppb.Metadata{ + m_1(2, 16, 20, stream.WriteCF, 14), + }, + }, { + until: []uint64{25}, + shiftUntilTS: returnV(25), restMetadata: []*backuppb.Metadata{}, + }, + }, + }, { + // metadata 3---6 10----------20 + // ↑ ↑ ↑ ↑ + // +---+ +----+--+---+ + // ↓ ↓ ↓ ↓ ↓ ↓ + // filegroup 3-d-6 10-d-12 ↓ ↓ + // filegroup 14d16-w-20 + metas: []*backuppb.Metadata{ + m_1(1, 3, 
6, stream.DefaultCF, 0), + m_2(2, + 10, 12, stream.DefaultCF, 0, + 16, 20, stream.WriteCF, 14, + ), + }, + testParams: []*testParam2{ + { + until: []uint64{2, 3, 4, 6}, + shiftUntilTS: returnSelf(), restMetadata: []*backuppb.Metadata{ + m_1(1, 3, 6, stream.DefaultCF, 0), + m_2(2, + 10, 12, stream.DefaultCF, 0, + 16, 20, stream.WriteCF, 14, + ), + }, + }, { + until: []uint64{7, 8, 9, 10, 11, 12}, + shiftUntilTS: returnSelf(), restMetadata: []*backuppb.Metadata{ + m_2(2, + 10, 12, stream.DefaultCF, 0, + 16, 20, stream.WriteCF, 14, + ), + }, + }, { + until: []uint64{13}, + shiftUntilTS: returnSelf(), restMetadata: []*backuppb.Metadata{ + m_1(2, 16, 20, stream.WriteCF, 14), + }, + }, { + until: []uint64{14, 15, 18, 20}, + shiftUntilTS: returnV(14), restMetadata: []*backuppb.Metadata{ + m_1(2, 16, 20, stream.WriteCF, 14), + }, + }, { + until: []uint64{25}, + shiftUntilTS: returnV(25), restMetadata: []*backuppb.Metadata{}, + }, + }, + }, + } + + for i, cs := range cases { + for j, ts := range cs.testParams { + for _, until := range ts.until { + t.Logf("case %d, param %d, until %d", i, j, until) + metas := restore.StreamMetadataSet{ + Helper: stream.NewMetadataHelper(), + } + err := generateFiles(ctx, s, cs.metas, tmpDir) + require.NoError(t, err) + shiftUntilTS, err := metas.LoadUntilAndCalculateShiftTS(ctx, s, until) + require.NoError(t, err) + require.Equal(t, shiftUntilTS, ts.shiftUntilTS(until)) + n, err := metas.RemoveDataFilesAndUpdateMetadataInBatch(ctx, shiftUntilTS, s, func(num int64) {}) + require.Equal(t, len(n), 0) + require.NoError(t, err) + + // check the result + checkFiles(ctx, s, ts.restMetadata, t) + } + } + } +} + +func TestTruncate3(t *testing.T) { + ctx := context.Background() + tmpDir := t.TempDir() + s, err := storage.NewLocalStorage(tmpDir) + require.NoError(t, err) + + cases := []struct { + metas []*backuppb.Metadata + testParams []*testParam2 + }{ + { + // metadata 3------10 12----------20 + // ↑ ↑ ↑ ↑ + // +-+--+--+ +----+--+---+ + // ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ + // filegroup 3--d-7 ↓ ↓ ↓ ↓ ↓ + // filegroup 5--d-10 ↓ ↓ ↓ ↓ + // filegroup 3----d-----12---w--18 ↓ + // filegroup 5----d--------15--w--20 + metas: []*backuppb.Metadata{ + m_2(1, + 3, 7, stream.DefaultCF, 0, + 5, 10, stream.DefaultCF, 0, + ), + m_2(2, + 12, 18, stream.WriteCF, 3, + 15, 20, stream.WriteCF, 5, + ), + }, + testParams: []*testParam2{ + { + until: []uint64{2}, + shiftUntilTS: returnV(2), restMetadata: []*backuppb.Metadata{ + m_2(1, + 3, 7, stream.DefaultCF, 0, + 5, 10, stream.DefaultCF, 0, + ), + m_2(2, + 12, 18, stream.WriteCF, 3, + 15, 20, stream.WriteCF, 5, + ), + }, + }, { + until: []uint64{3, 4, 5, 6, 7, 8, 10, 11, 12, 13, 14, 15, 16, 18}, + shiftUntilTS: returnV(3), restMetadata: []*backuppb.Metadata{ + m_2(1, + 3, 7, stream.DefaultCF, 0, + 5, 10, stream.DefaultCF, 0, + ), + m_2(2, + 12, 18, stream.WriteCF, 3, + 15, 20, stream.WriteCF, 5, + ), + }, + }, { + until: []uint64{19, 20}, + shiftUntilTS: returnV(5), restMetadata: []*backuppb.Metadata{ + m_2(1, + 3, 7, stream.DefaultCF, 0, + 5, 10, stream.DefaultCF, 0, + ), + m_2(2, + 12, 18, stream.WriteCF, 3, + 15, 20, stream.WriteCF, 5, + ), + }, + }, { + until: []uint64{25}, + shiftUntilTS: returnV(25), restMetadata: []*backuppb.Metadata{}, + }, + }, + }, { + // metadata 2------10 12----------20 + // ↑ ↑ ↑ ↑ + // +-+--+--+ +----+--+---+ + // ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ + // filegroup 2--d-6 ↓ ↓ ↓ ↓ ↓ + // filegroup 4--d-10 ↓ ↓ ↓ ↓ + // filegroup 2----d-----12---w--18 ↓ + // filegroup 8---d----15--w--20 + metas: []*backuppb.Metadata{ + m_2(1, + 2, 6, stream.DefaultCF, 
+    cases := []struct {
+        metas      []*backuppb.Metadata
+        testParams []*testParam2
+    }{
+        {
+            // metadata   3------10       12----------20
+            //            ↑      ↑        ↑           ↑
+            //            +-+--+--+       +----+--+---+
+            //            ↓ ↓  ↓  ↓       ↓    ↓  ↓   ↓
+            // filegroup  3--d-7  ↓       ↓    ↓  ↓   ↓
+            // filegroup  5--d-10         ↓    ↓  ↓   ↓
+            // filegroup  3----d----------12---w--18  ↓
+            // filegroup  5----d---------------15--w--20
+            metas: []*backuppb.Metadata{
+                m_2(1,
+                    3, 7, stream.DefaultCF, 0,
+                    5, 10, stream.DefaultCF, 0,
+                ),
+                m_2(2,
+                    12, 18, stream.WriteCF, 3,
+                    15, 20, stream.WriteCF, 5,
+                ),
+            },
+            testParams: []*testParam2{
+                {
+                    until: []uint64{2},
+                    shiftUntilTS: returnV(2), restMetadata: []*backuppb.Metadata{
+                        m_2(1,
+                            3, 7, stream.DefaultCF, 0,
+                            5, 10, stream.DefaultCF, 0,
+                        ),
+                        m_2(2,
+                            12, 18, stream.WriteCF, 3,
+                            15, 20, stream.WriteCF, 5,
+                        ),
+                    },
+                }, {
+                    until: []uint64{3, 4, 5, 6, 7, 8, 10, 11, 12, 13, 14, 15, 16, 18},
+                    shiftUntilTS: returnV(3), restMetadata: []*backuppb.Metadata{
+                        m_2(1,
+                            3, 7, stream.DefaultCF, 0,
+                            5, 10, stream.DefaultCF, 0,
+                        ),
+                        m_2(2,
+                            12, 18, stream.WriteCF, 3,
+                            15, 20, stream.WriteCF, 5,
+                        ),
+                    },
+                }, {
+                    until: []uint64{19, 20},
+                    shiftUntilTS: returnV(5), restMetadata: []*backuppb.Metadata{
+                        m_2(1,
+                            3, 7, stream.DefaultCF, 0,
+                            5, 10, stream.DefaultCF, 0,
+                        ),
+                        m_2(2,
+                            12, 18, stream.WriteCF, 3,
+                            15, 20, stream.WriteCF, 5,
+                        ),
+                    },
+                }, {
+                    until: []uint64{25},
+                    shiftUntilTS: returnV(25), restMetadata: []*backuppb.Metadata{},
+                },
+            },
+        }, {
+            // metadata   2------10       12----------20
+            //            ↑      ↑        ↑           ↑
+            //            +-+--+--+       +----+--+---+
+            //            ↓ ↓  ↓  ↓       ↓    ↓  ↓   ↓
+            // filegroup  2--d-6  ↓       ↓    ↓  ↓   ↓
+            // filegroup  4--d-10         ↓    ↓  ↓   ↓
+            // filegroup  2----d----------12---w--18  ↓
+            // filegroup       8---d----------15--w--20
+            metas: []*backuppb.Metadata{
+                m_2(1,
+                    2, 6, stream.DefaultCF, 0,
+                    4, 10, stream.DefaultCF, 0,
+                ),
+                m_2(2,
+                    12, 18, stream.WriteCF, 2,
+                    15, 20, stream.WriteCF, 8,
+                ),
+            },
+            testParams: []*testParam2{
+                {
+                    until: []uint64{1},
+                    shiftUntilTS: returnV(1), restMetadata: []*backuppb.Metadata{
+                        m_2(1,
+                            2, 6, stream.DefaultCF, 0,
+                            4, 10, stream.DefaultCF, 0,
+                        ),
+                        m_2(2,
+                            12, 18, stream.WriteCF, 2,
+                            15, 20, stream.WriteCF, 8,
+                        ),
+                    },
+                }, {
+                    until: []uint64{2, 3, 4, 5, 6, 7, 8, 10, 11, 12, 13, 14, 15, 16, 18},
+                    shiftUntilTS: returnV(2), restMetadata: []*backuppb.Metadata{
+                        m_2(1,
+                            2, 6, stream.DefaultCF, 0,
+                            4, 10, stream.DefaultCF, 0,
+                        ),
+                        m_2(2,
+                            12, 18, stream.WriteCF, 2,
+                            15, 20, stream.WriteCF, 8,
+                        ),
+                    },
+                }, {
+                    until: []uint64{19, 20},
+                    shiftUntilTS: returnV(8), restMetadata: []*backuppb.Metadata{
+                        m_1(1,
+                            4, 10, stream.DefaultCF, 0,
+                        ),
+                        m_2(2,
+                            12, 18, stream.WriteCF, 2,
+                            15, 20, stream.WriteCF, 8,
+                        ),
+                    },
+                }, {
+                    until: []uint64{25},
+                    shiftUntilTS: returnV(25), restMetadata: []*backuppb.Metadata{},
+                },
+            },
+        }, {
+            // metadata   2------10       14----------20
+            //            ↑      ↑        ↑           ↑
+            //            +-+--+--+       +----+--+---+
+            //            ↓ ↓  ↓  ↓       ↓    ↓  ↓   ↓
+            // filegroup  2--d-6  ↓       ↓    ↓  ↓   ↓
+            // filegroup  4--d-10         ↓    ↓  ↓   ↓
+            // filegroup  2----d----------14---w--18  ↓
+            // filegroup          12---d------16--w--20
+            metas: []*backuppb.Metadata{
+                m_2(1,
+                    2, 6, stream.DefaultCF, 0,
+                    4, 10, stream.DefaultCF, 0,
+                ),
+                m_2(2,
+                    14, 18, stream.WriteCF, 2,
+                    16, 20, stream.WriteCF, 12,
+                ),
+            },
+            testParams: []*testParam2{
+                {
+                    until: []uint64{1},
+                    shiftUntilTS: returnV(1), restMetadata: []*backuppb.Metadata{
+                        m_2(1,
+                            2, 6, stream.DefaultCF, 0,
+                            4, 10, stream.DefaultCF, 0,
+                        ),
+                        m_2(2,
+                            14, 18, stream.WriteCF, 2,
+                            16, 20, stream.WriteCF, 12,
+                        ),
+                    },
+                }, {
+                    until: []uint64{2, 3, 4, 5, 6, 7, 8, 10, 11, 12, 13, 14, 15, 16, 18},
+                    shiftUntilTS: returnV(2), restMetadata: []*backuppb.Metadata{
+                        m_2(1,
+                            2, 6, stream.DefaultCF, 0,
+                            4, 10, stream.DefaultCF, 0,
+                        ),
+                        m_2(2,
+                            14, 18, stream.WriteCF, 2,
+                            16, 20, stream.WriteCF, 12,
+                        ),
+                    },
+                }, {
+                    until: []uint64{19, 20},
+                    shiftUntilTS: returnV(12), restMetadata: []*backuppb.Metadata{
+                        m_2(2,
+                            14, 18, stream.WriteCF, 2,
+                            16, 20, stream.WriteCF, 12,
+                        ),
+                    },
+                }, {
+                    until: []uint64{25},
+                    shiftUntilTS: returnV(25), restMetadata: []*backuppb.Metadata{},
+                },
+            },
+        }, {
+            // metadata   2-------10      14----------20
+            //            ↑       ↑       ↑           ↑
+            //            +-+--+---+      +----+--+---+
+            //            ↓ ↓  ↓   ↓      ↓    ↓  ↓   ↓
+            // filegroup  2--d-6   ↓      ↓    ↓  ↓   ↓
+            // filegroup    4-d-8w10      ↓    ↓  ↓   ↓
+            // filegroup                  14--d---18  ↓
+            // filegroup                  14-d--16-w--20
+            metas: []*backuppb.Metadata{
+                m_2(1,
+                    2, 6, stream.DefaultCF, 0,
+                    8, 10, stream.WriteCF, 4,
+                ),
+                m_2(2,
+                    14, 18, stream.DefaultCF, 0,
+                    16, 20, stream.WriteCF, 14,
+                ),
+            },
+            testParams: []*testParam2{
+                {
+                    until: []uint64{1},
+                    shiftUntilTS: returnV(1), restMetadata: []*backuppb.Metadata{
+                        m_2(1,
+                            2, 6, stream.DefaultCF, 0,
+                            8, 10, stream.WriteCF, 4,
+                        ),
+                        m_2(2,
+                            14, 18, stream.DefaultCF, 0,
+                            16, 20, stream.WriteCF, 14,
+                        ),
+                    },
+                }, {
+                    until: []uint64{2, 3},
+                    shiftUntilTS: returnSelf(), restMetadata: []*backuppb.Metadata{
+                        m_2(1,
+                            2, 6, stream.DefaultCF, 0,
+                            8, 10, stream.WriteCF, 4,
+                        ),
+                        m_2(2,
+                            14, 18, stream.DefaultCF, 0,
+                            16, 20, stream.WriteCF, 14,
+                        ),
+                    },
+                }, {
+                    until: []uint64{4, 5, 6, 7, 8, 9, 10},
+                    shiftUntilTS: returnV(4), restMetadata: []*backuppb.Metadata{
+                        m_2(1,
+                            2, 6, stream.DefaultCF, 0,
+                            8, 10, stream.WriteCF, 4,
+                        ),
+                        m_2(2,
+                            14, 18, stream.DefaultCF, 0,
+                            16, 20, stream.WriteCF, 14,
+                        ),
+                    },
+                }, {
+                    until: []uint64{12},
+                    shiftUntilTS: returnV(12), restMetadata: []*backuppb.Metadata{
+                        m_2(2,
+                            14, 18, stream.DefaultCF, 0,
+                            16, 20, stream.WriteCF, 14,
+                        ),
+                    },
+                }, {
+                    until: []uint64{14, 15, 16, 17, 18, 19, 20},
+                    shiftUntilTS: returnV(14), restMetadata: []*backuppb.Metadata{
+                        m_2(2,
+                            14, 18, stream.DefaultCF, 0,
+                            16, 20, stream.WriteCF, 14,
+                        ),
+                    },
+                }, {
+                    until: []uint64{25},
+                    shiftUntilTS: returnV(25), restMetadata: []*backuppb.Metadata{},
+                },
+            },
+        }, {
+            // metadata   2-------10      14----------22    24-w-26
+            //            ↑       ↑       ↑           ↑     ↑    ↑
+            //            +-+--+--+       +----+--+---+     +----+
+            //            ↓ ↓  ↓  ↓       ↓    ↓  ↓   ↓     ↓    ↓
+            // filegroup  2--d-6  ↓       ↓    ↓  ↓   ↓     ↓    ↓
+            // filegroup       8d10       ↓    ↓  ↓   ↓     ↓    ↓
+            // filegroup       9--d--14--w-----18  ↓   ↓    ↓    ↓
+            // filegroup                  16-d--22      ↓   ↓
+            // filegroup                       20---d-24-w-26
+            metas: []*backuppb.Metadata{
+                m_2(1,
+                    2, 6, stream.DefaultCF, 0,
+                    8, 10, stream.DefaultCF, 0,
+                ),
+                m_2(2,
+                    14, 18, stream.WriteCF, 9,
+                    16, 22, stream.DefaultCF, 0,
+                ),
+                m_1(3,
+                    24, 26, stream.WriteCF, 20,
+                ),
+            },
+            testParams: []*testParam2{
+                {
+                    until: []uint64{1, 2, 3, 6},
+                    shiftUntilTS: returnSelf(), restMetadata: []*backuppb.Metadata{
+                        m_2(1,
+                            2, 6, stream.DefaultCF, 0,
+                            8, 10, stream.DefaultCF, 0,
+                        ),
+                        m_2(2,
+                            14, 18, stream.WriteCF, 9,
+                            16, 22, stream.DefaultCF, 0,
+                        ),
+                        m_1(3,
+                            24, 26, stream.WriteCF, 20,
+                        ),
+                    },
+                }, {
+                    until: []uint64{7, 8},
+                    shiftUntilTS: returnSelf(), restMetadata: []*backuppb.Metadata{
+                        m_1(1,
+                            8, 10, stream.DefaultCF, 0,
+                        ),
+                        m_2(2,
+                            14, 18, stream.WriteCF, 9,
+                            16, 22, stream.DefaultCF, 0,
+                        ),
+                        m_1(3,
+                            24, 26, stream.WriteCF, 20,
+                        ),
+                    },
+                }, {
+                    until: []uint64{9, 10, 11, 14, 15, 16, 17, 18},
+                    shiftUntilTS: returnV(9), restMetadata: []*backuppb.Metadata{
+                        m_1(1,
+                            8, 10, stream.DefaultCF, 0,
+                        ),
+                        m_2(2,
+                            14, 18, stream.WriteCF, 9,
+                            16, 22, stream.DefaultCF, 0,
+                        ),
+                        m_1(3,
+                            24, 26, stream.WriteCF, 20,
+                        ),
+                    },
+                }, {
+                    until: []uint64{19},
+                    shiftUntilTS: returnV(19), restMetadata: []*backuppb.Metadata{
+                        m_1(2,
+                            16, 22, stream.DefaultCF, 0,
+                        ),
+                        m_1(3,
+                            24, 26, stream.WriteCF, 20,
+                        ),
+                    },
+                }, {
+                    until: []uint64{20, 21, 22, 23, 24, 25, 26},
+                    shiftUntilTS: returnV(20), restMetadata: []*backuppb.Metadata{
+                        m_1(2,
+                            16, 22, stream.DefaultCF, 0,
+                        ),
+                        m_1(3,
+                            24, 26, stream.WriteCF, 20,
+                        ),
+                    },
+                }, {
+                    until: []uint64{28},
+                    shiftUntilTS: returnV(28), restMetadata: []*backuppb.Metadata{},
+                },
+            },
+        },
+    }
+
+    for i, cs := range cases {
+        for j, ts := range cs.testParams {
+            for _, until := range ts.until {
+                t.Logf("case %d, param %d, until %d", i, j, until)
+                metas := restore.StreamMetadataSet{
+                    Helper: stream.NewMetadataHelper(),
+                }
+                err := generateFiles(ctx, s, cs.metas, tmpDir)
+                require.NoError(t, err)
+                shiftUntilTS, err := metas.LoadUntilAndCalculateShiftTS(ctx, s, until)
+                require.NoError(t, err)
+                require.Equal(t, shiftUntilTS, ts.shiftUntilTS(until))
+                n, err := metas.RemoveDataFilesAndUpdateMetadataInBatch(ctx, shiftUntilTS, s, func(num int64) {})
+                require.Equal(t, len(n), 0)
+                require.NoError(t, err)
+
+                // check the result
+                checkFiles(ctx, s, ts.restMetadata, t)
+            }
+        }
+    }
+}
+
+type testParam3 struct {
+    until        []uint64
+    shiftUntilTS func(uint64) uint64
+}
+
+func fi(minTS, maxTS uint64, cf string, defaultTS uint64) *backuppb.DataFileInfo {
+    return &backuppb.DataFileInfo{
+        NumberOfEntries:       1,
+        MinTs:                 minTS,
+        MaxTs:                 maxTS,
+        Cf:                    cf,
+        MinBeginTsInDefaultCf: defaultTS,
+    }
+}
+
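+// getTsFromFiles returns the minimum MinTs, the maximum MaxTs, and the
+// minimum ResolvedTs across the given files, matching how a file group
+// summarizes its member files.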
+func getTsFromFiles(files []*backuppb.DataFileInfo) (uint64, uint64, uint64) {
+    if len(files) == 0 {
+        return 0, 0, 0
+    }
+    f := files[0]
+    minTs, maxTs, resolvedTs := f.MinTs, f.MaxTs, f.ResolvedTs
+    for _, file := range files {
+        if file.MinTs < minTs {
+            minTs = file.MinTs
+        }
+        if file.MaxTs > maxTs {
+            maxTs = file.MaxTs
+        }
+        if file.ResolvedTs < resolvedTs {
+            resolvedTs = file.ResolvedTs
+        }
+    }
+    return minTs, maxTs, resolvedTs
+}
+
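+// mf assembles a MetaVersion_V2 metadata for store `id`: each inner slice of
+// `filess` becomes one DataFileGroup whose group-level timestamps are derived
+// via getTsFromFiles, and the groups are attached through restore.ReplaceMetadata.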
+func mf(id int64, filess [][]*backuppb.DataFileInfo) *backuppb.Metadata {
+    filegroups := make([]*backuppb.DataFileGroup, 0)
+    for _, files := range filess {
+        minTs, maxTs, resolvedTs := getTsFromFiles(files)
+        filegroups = append(filegroups, &backuppb.DataFileGroup{
+            DataFilesInfo: files,
+            MinTs:         minTs,
+            MaxTs:         maxTs,
+            MinResolvedTs: resolvedTs,
+        })
+    }
+
+    m := &backuppb.Metadata{
+        StoreId:     id,
+        MetaVersion: backuppb.MetaVersion_V2,
+    }
+    restore.ReplaceMetadata(m, filegroups)
+    return m
+}
+
 func TestCalculateShiftTS(t *testing.T) {
-    var (
-        startTs   uint64 = 2900
-        restoreTS uint64 = 4500
-    )
-
-    helper := stream.NewMetadataHelper()
-    ms := fakeMetaDatas(t, helper, stream.WriteCF)
-    shiftTS, exist := restore.CalculateShiftTS(ms, startTs, restoreTS)
-    require.Equal(t, shiftTS, uint64(2000))
-    require.Equal(t, exist, true)
-
-    shiftTS, exist = restore.CalculateShiftTS(ms, startTs, mathutil.MaxUint)
-    require.Equal(t, shiftTS, uint64(1800))
-    require.Equal(t, exist, true)
-
-    shiftTS, exist = restore.CalculateShiftTS(ms, 1999, 3001)
-    require.Equal(t, shiftTS, uint64(800))
-    require.Equal(t, exist, true)
-
-    ms = fakeMetaDatas(t, helper, stream.DefaultCF)
-    _, exist = restore.CalculateShiftTS(ms, startTs, restoreTS)
-    require.Equal(t, exist, false)
-}
-
-func TestCalculateShiftTSV2(t *testing.T) {
-    var (
-        startTs   uint64 = 2900
-        restoreTS uint64 = 5100
-    )
-
-    helper := stream.NewMetadataHelper()
-    ms := fakeMetaDataV2s(t, helper, stream.WriteCF)
-    shiftTS, exist := restore.CalculateShiftTS(ms, startTs, restoreTS)
-    require.Equal(t, shiftTS, uint64(1800))
-    require.Equal(t, exist, true)
-
-    shiftTS, exist = restore.CalculateShiftTS(ms, startTs, mathutil.MaxUint)
-    require.Equal(t, shiftTS, uint64(1700))
-    require.Equal(t, exist, true)
-
-    shiftTS, exist = restore.CalculateShiftTS(ms, 1999, 3001)
-    require.Equal(t, shiftTS, uint64(800))
-    require.Equal(t, exist, true)
-
-    ms = fakeMetaDataV2s(t, helper, stream.DefaultCF)
-    _, exist = restore.CalculateShiftTS(ms, startTs, restoreTS)
-    require.Equal(t, exist, false)
+    ctx := context.Background()
+    tmpDir := t.TempDir()
+    s, err := storage.NewLocalStorage(tmpDir)
+    require.NoError(t, err)
+
+    cases := []struct {
+        metas      []*backuppb.Metadata
+        testParams []*testParam3
+    }{
+        {
+            // filegroup  10           35
+            //            ↑            ↑
+            //            +----+-++---+
+            //            ↓    ↓ ↓↓   ↓
+            // fileinfo   10-d-20
+            // fileinfo 8--d-15--w-30
+            // fileinfo     11-d-25-w-35
+            metas: []*backuppb.Metadata{
+                mf(1, [][]*backuppb.DataFileInfo{
+                    {
+                        fi(10, 20, stream.DefaultCF, 0),
+                        fi(15, 30, stream.WriteCF, 8),
+                        fi(25, 35, stream.WriteCF, 11),
+                    },
+                }),
+            },
+            testParams: []*testParam3{
+                {
+                    until:        []uint64{3},
+                    shiftUntilTS: returnV(3),
+                }, {
+                    until:        []uint64{8, 9, 10, 11, 12, 15, 16, 20, 21, 25, 26, 30},
+                    shiftUntilTS: returnV(8),
+                }, {
+                    until:        []uint64{31, 35},
+                    shiftUntilTS: returnV(11),
+                }, {
+                    until:        []uint64{36},
+                    shiftUntilTS: returnV(36),
+                },
+            },
+        }, {
+            // filegroup  50                85
+            //            ↑                 ↑
+            //            +-+-+--+--+------+
+            //            ↓ ↓ ↓  ↓  ↓      ↓
+            // fileinfo     55-d-65-70
+            // fileinfo   50-d60
+            // fileinfo          72d80w85
+            metas: []*backuppb.Metadata{
+                mf(1, [][]*backuppb.DataFileInfo{
+                    {
+                        fi(65, 70, stream.WriteCF, 55),
+                        fi(50, 60, stream.DefaultCF, 0),
+                        fi(80, 85, stream.WriteCF, 72),
+                    },
+                }),
+            },
+            testParams: []*testParam3{
+                {
+                    until:        []uint64{45, 50, 52},
+                    shiftUntilTS: returnSelf(),
+                }, {
+                    until:        []uint64{55, 56, 60, 61, 65, 66, 70},
+                    shiftUntilTS: returnV(55),
+                }, {
+                    until:        []uint64{71},
+                    shiftUntilTS: returnV(71),
+                }, {
+                    until:        []uint64{72, 73, 80, 81, 85},
+                    shiftUntilTS: returnV(72),
+                }, {
+                    until:        []uint64{86},
+                    shiftUntilTS: returnV(86),
+                },
+            },
+        }, {
+            // filegroup  10           35    50                85
+            //            ↑            ↑     ↑                 ↑
+            //            +----+-++---+      +-+-+--+--+------+
+            //            ↓    ↓ ↓↓   ↓      ↓ ↓ ↓  ↓  ↓      ↓
+            // fileinfo   10-d-20              55-d-65-70
+            // fileinfo 8--d-15--w-30        50-d60
+            // fileinfo     11-d-25-w-35            72d80w85
+            metas: []*backuppb.Metadata{
+                mf(1, [][]*backuppb.DataFileInfo{
+                    {
+                        fi(10, 20, stream.DefaultCF, 0),
+                        fi(15, 30, stream.WriteCF, 8),
+                        fi(25, 35, stream.WriteCF, 11),
+                    },
+                }),
+                mf(2, [][]*backuppb.DataFileInfo{
+                    {
+                        fi(65, 70, stream.WriteCF, 55),
+                        fi(50, 60, stream.DefaultCF, 0),
+                        fi(80, 85, stream.WriteCF, 72),
+                    },
+                }),
+            },
+            testParams: []*testParam3{
+                {
+                    until:        []uint64{3},
+                    shiftUntilTS: returnV(3),
+                }, {
+                    until:        []uint64{8, 9, 10, 11, 12, 15, 16, 20, 21, 25, 26, 30},
+                    shiftUntilTS: returnV(8),
+                }, {
+                    until:        []uint64{31, 35},
+                    shiftUntilTS: returnV(11),
+                }, {
+                    until:        []uint64{36},
+                    shiftUntilTS: returnV(36),
+                }, {
+                    until:        []uint64{45, 50, 52},
+                    shiftUntilTS: returnSelf(),
+                }, {
+                    until:        []uint64{55, 56, 60, 61, 65, 66, 70},
+                    shiftUntilTS: returnV(55),
+                }, {
+                    until:        []uint64{71},
+                    shiftUntilTS: returnV(71),
+                }, {
+                    until:        []uint64{72, 73, 80, 81, 85},
+                    shiftUntilTS: returnV(72),
+                }, {
+                    until:        []uint64{86},
+                    shiftUntilTS: returnV(86),
+                },
+            },
+        },
+    }
+
+    for i, cs := range cases {
+        for j, ts := range cs.testParams {
+            for _, until := range ts.until {
+                t.Logf("case %d, param %d, until %d", i, j, until)
+                metas := restore.StreamMetadataSet{
+                    Helper: stream.NewMetadataHelper(),
+                }
+                err := generateFiles(ctx, s, cs.metas, tmpDir)
+                require.NoError(t, err)
+                shiftUntilTS, err := metas.LoadUntilAndCalculateShiftTS(ctx, s, until)
+                require.NoError(t, err)
+                require.Equal(t, shiftUntilTS, ts.shiftUntilTS(until), cs.metas)
+            }
+        }
+    }
 }
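The shiftUntilTS expectations above all follow one rule, restated here as a minimal, self-contained sketch; `group`, `calcShiftTS`, and the field names are illustrative only (the PR itself computes this per metadata file via UpdateShiftTS inside LoadUntilAndCalculateShiftTS):

    package main

    import "fmt"

    // group mirrors the file-group fields the rule needs.
    type group struct {
        minTS, maxTS, minDefaultTS uint64
        isWrite                    bool
    }

    // calcShiftTS starts from `until` and, for every write-CF group whose
    // [minTS, maxTS] range overlaps [until, +inf), lowers the result to the
    // group's begin-default-ts when that is smaller. Default-CF groups never
    // shift the result.
    func calcShiftTS(groups []group, until uint64) uint64 {
        shift := until
        for _, g := range groups {
            if g.isWrite && g.maxTS >= until && g.minDefaultTS < shift {
                shift = g.minDefaultTS
            }
        }
        return shift
    }

    func main() {
        // The first TestCalculateShiftTS case: a default group [10, 20] plus
        // write groups [15, 30] (begin-default-ts 8) and [25, 35] (begin-default-ts 11).
        gs := []group{
            {10, 20, 0, false},
            {15, 30, 8, true},
            {25, 35, 11, true},
        }
        fmt.Println(calcShiftTS(gs, 20)) // 8  — both write groups overlap [20, +inf)
        fmt.Println(calcShiftTS(gs, 31)) // 11 — only [25, 35] still overlaps
        fmt.Println(calcShiftTS(gs, 36)) // 36 — nothing overlaps; shift stays at until
    }

This is why the tests express expectations as either returnV(n) (a fixed shift) or returnSelf() (no write group overlaps, so the shift equals the truncation point).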
", glue.WithTimeCost()) metas := restore.StreamMetadataSet{ Helper: stream.NewMetadataHelper(), - BeforeDoWriteBack: func(path string, last, current *backuppb.Metadata) (skip bool) { - log.Info("Updating metadata.", zap.String("file", path), - zap.Int("data-file-before", len(last.GetFileGroups())), - zap.Int("data-file-after", len(current.GetFileGroups()))) - return cfg.DryRun - }, + DryRun: cfg.DryRun, } - if err := metas.LoadUntil(ctx, storage, cfg.Until); err != nil { + shiftUntilTS, err := metas.LoadUntilAndCalculateShiftTS(ctx, storage, cfg.Until) + if err != nil { return err } readMetaDone() var ( - fileCount uint64 = 0 - kvCount int64 = 0 - totalSize uint64 = 0 - shiftUntilTS = metas.CalculateShiftTS(cfg.Until) + fileCount int = 0 + kvCount int64 = 0 + totalSize uint64 = 0 ) - metas.IterateFilesFullyBefore(shiftUntilTS, func(d *backuppb.DataFileGroup) (shouldBreak bool) { + metas.IterateFilesFullyBefore(shiftUntilTS, func(d *restore.FileGroupInfo) (shouldBreak bool) { fileCount++ totalSize += d.Length - for _, f := range d.DataFilesInfo { - kvCount += f.NumberOfEntries - } + kvCount += d.KVCount return }) console.Printf("We are going to remove %s files, until %s.\n", @@ -937,40 +930,33 @@ func RunStreamTruncate(c context.Context, g glue.Glue, cmdName string, cfg *Stre } } - removed := metas.RemoveDataBefore(shiftUntilTS) - - // remove log + // begin to remove clearDataFileDone := console.ShowTask( "Clearing data files... ", glue.WithTimeCost(), glue.WithConstExtraField("kv-count", kvCount), glue.WithConstExtraField("kv-size", fmt.Sprintf("%d(%s)", totalSize, units.HumanSize(float64(totalSize)))), ) - worker := utils.NewWorkerPool(128, "delete files") - wg := new(sync.WaitGroup) - for _, f := range removed { - if !cfg.DryRun { - wg.Add(1) - finalFile := f - worker.Apply(func() { - defer wg.Done() - if err := storage.DeleteFile(ctx, finalFile.Path); err != nil { - log.Warn("File not deleted.", zap.String("path", finalFile.Path), logutil.ShortError(err)) - console.Print("\n"+em(finalFile.Path), "not deleted, you may clear it manually:", warn(err)) - } - }) - } + + notDeleted, err := metas.RemoveDataFilesAndUpdateMetadataInBatch(ctx, shiftUntilTS, storage, func(int64) {}) + if err != nil { + return err } - wg.Wait() - clearDataFileDone() - // remove metadata - removeMetaDone := console.ShowTask("Removing metadata... ", glue.WithTimeCost()) - if !cfg.DryRun { - if err := metas.DoWriteBack(ctx, storage); err != nil { - return err + if len(notDeleted) > 0 { + const keepFirstNFailure = 16 + console.Println("Files below are not deleted due to error, you may clear it manually, check log for detail error:") + console.Println("- Total", em(len(notDeleted)), "items.") + if len(notDeleted) > keepFirstNFailure { + console.Println("-", em(len(notDeleted)-keepFirstNFailure), "items omitted.") + // TODO: maybe don't add them at the very first. + notDeleted = notDeleted[:keepFirstNFailure] + } + for _, f := range notDeleted { + console.Println(f) } } - removeMetaDone() + clearDataFileDone() + return nil }