Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

log-backup: restore meta kv with batch method (#37100) #37140

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
227 changes: 182 additions & 45 deletions br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1771,14 +1771,6 @@ func (rc *Client) ReadStreamDataFiles(
}
}

// sort files firstly.
slices.SortFunc(mFiles, func(i, j *backuppb.DataFileInfo) bool {
if i.ResolvedTs > 0 && j.ResolvedTs > 0 {
return i.ResolvedTs < j.ResolvedTs
} else {
return i.MaxTs < j.MaxTs
}
})
return dFiles, mFiles, nil
}

Expand Down Expand Up @@ -2000,6 +1992,31 @@ func (rc *Client) InitSchemasReplaceForDDL(
return stream.NewSchemasReplace(dbMap, rc.currentTS, tableFilter, rc.GenGlobalID, rc.GenGlobalIDs, rc.InsertDeleteRangeForTable, rc.InsertDeleteRangeForIndex), nil
}

func SortMetaKVFiles(files []*backuppb.DataFileInfo) []*backuppb.DataFileInfo {
slices.SortFunc(files, func(i, j *backuppb.DataFileInfo) bool {
if i.GetMinTs() < j.GetMinTs() {
return true
} else if i.GetMinTs() > j.GetMinTs() {
return false
}

if i.GetMaxTs() < j.GetMaxTs() {
return true
} else if i.GetMaxTs() > j.GetMaxTs() {
return false
}

if i.GetResolvedTs() < j.GetResolvedTs() {
return true
} else if i.GetResolvedTs() > j.GetResolvedTs() {
return false
}

return true
})
return files
}

// RestoreMetaKVFiles tries to restore files about meta kv-event from stream-backup.
func (rc *Client) RestoreMetaKVFiles(
ctx context.Context,
Expand All @@ -2008,7 +2025,10 @@ func (rc *Client) RestoreMetaKVFiles(
updateStats func(kvCount uint64, size uint64),
progressInc func(),
) error {
// sort files firstly.
files = SortMetaKVFiles(files)
filesInWriteCF := make([]*backuppb.DataFileInfo, 0, len(files))
filesInDefaultCF := make([]*backuppb.DataFileInfo, 0, len(files))

// The k-v events in default CF should be restored firstly. The reason is that:
// The error of transactions of meta could happen if restore write CF events successfully,
Expand All @@ -2018,30 +2038,39 @@ func (rc *Client) RestoreMetaKVFiles(
filesInWriteCF = append(filesInWriteCF, f)
continue
}

if f.Type == backuppb.FileType_Delete {
// this should happen abnormally.
// only do some preventive checks here.
log.Warn("detected delete file of meta key, skip it", zap.Any("file", f))
continue
}

kvCount, size, err := rc.RestoreMetaKVFile(ctx, f, schemasReplace)
if err != nil {
return errors.Trace(err)
if f.Cf == stream.DefaultCF {
filesInDefaultCF = append(filesInDefaultCF, f)
}
updateStats(kvCount, size)
progressInc()
}

// Restore files in default CF.
if err := rc.RestoreMetaKVFilesWithBatchMethod(
ctx,
filesInDefaultCF,
schemasReplace,
updateStats,
progressInc,
rc.RestoreBatchMetaKVFiles,
); err != nil {
return errors.Trace(err)
}

// Restore files in write CF.
for _, f := range filesInWriteCF {
kvCount, size, err := rc.RestoreMetaKVFile(ctx, f, schemasReplace)
if err != nil {
return errors.Trace(err)
}
updateStats(kvCount, size)
progressInc()
if err := rc.RestoreMetaKVFilesWithBatchMethod(
ctx,
filesInWriteCF,
schemasReplace,
updateStats,
progressInc,
rc.RestoreBatchMetaKVFiles,
); err != nil {
return errors.Trace(err)
}

// Update global schema version and report all of TiDBs.
Expand All @@ -2051,41 +2080,128 @@ func (rc *Client) RestoreMetaKVFiles(
return nil
}

// RestoreMetaKVFile tries to restore a file about meta kv-event from stream-backup.
func (rc *Client) RestoreMetaKVFile(
func (rc *Client) RestoreMetaKVFilesWithBatchMethod(
ctx context.Context,
file *backuppb.DataFileInfo,
sr *stream.SchemasReplace,
) (uint64, uint64, error) {
files []*backuppb.DataFileInfo,
schemasReplace *stream.SchemasReplace,
updateStats func(kvCount uint64, size uint64),
progressInc func(),
restoreBatch func(
ctx context.Context,
files []*backuppb.DataFileInfo,
schemasReplace *stream.SchemasReplace,
updateStats func(kvCount uint64, size uint64),
progressInc func(),
) error,
) error {
var (
kvCount uint64
size uint64
rangeMin uint64
rangeMax uint64
idx int
)
log.Info("restore meta kv events", zap.String("file", file.Path),
zap.String("cf", file.Cf), zap.Int64("kv-count", file.NumberOfEntries),
zap.Uint64("min-ts", file.MinTs), zap.Uint64("max-ts", file.MaxTs))
for i, f := range files {
if i == 0 {
idx = i
rangeMax = f.MaxTs
rangeMin = f.MinTs
} else {
if f.MinTs <= rangeMax {
rangeMin = mathutil.Min(rangeMin, f.MinTs)
rangeMax = mathutil.Max(rangeMax, f.MaxTs)
} else {
err := restoreBatch(ctx, files[idx:i], schemasReplace, updateStats, progressInc)
if err != nil {
return errors.Trace(err)
}
idx = i
rangeMin = f.MinTs
rangeMax = f.MaxTs
}
}

if i == len(files)-1 {
err := restoreBatch(ctx, files[idx:], schemasReplace, updateStats, progressInc)
if err != nil {
return errors.Trace(err)
}
}
}
return nil
}

// the kv entry with ts, the ts is decoded from entry.
type kvEntryWithTS struct {
e kv.Entry
ts uint64
}

func (rc *Client) RestoreBatchMetaKVFiles(
ctx context.Context,
files []*backuppb.DataFileInfo,
schemasReplace *stream.SchemasReplace,
updateStats func(kvCount uint64, size uint64),
progressInc func(),
) error {
if len(files) == 0 {
return nil
}

// read all of entries from files.
kvEntries := make([]*kvEntryWithTS, 0)
for _, f := range files {
es, err := rc.readAllEntries(ctx, f)
if err != nil {
return errors.Trace(err)
}

kvEntries = append(kvEntries, es...)
}

// sort these entries.
slices.SortFunc(kvEntries, func(i, j *kvEntryWithTS) bool {
return i.ts < j.ts
})

// restore these entries with rawPut() method.
kvCount, size, err := rc.restoreMetaKvEntries(ctx, schemasReplace, kvEntries, files[0].GetCf())
if err != nil {
return errors.Trace(err)
}

updateStats(kvCount, size)
for i := 0; i < len(files); i++ {
progressInc()
}
return nil
}

func (rc *Client) readAllEntries(
ctx context.Context,
file *backuppb.DataFileInfo,
) ([]*kvEntryWithTS, error) {
kvEntries := make([]*kvEntryWithTS, 0)

rc.rawKVClient.SetColumnFamily(file.GetCf())
buff, err := rc.storage.ReadFile(ctx, file.Path)
if err != nil {
return 0, 0, errors.Trace(err)
return nil, errors.Trace(err)
}

if checksum := sha256.Sum256(buff); !bytes.Equal(checksum[:], file.GetSha256()) {
return 0, 0, errors.Annotatef(berrors.ErrInvalidMetaFile,
return nil, errors.Annotatef(berrors.ErrInvalidMetaFile,
"checksum mismatch expect %x, got %x", file.GetSha256(), checksum[:])
}

iter := stream.NewEventIterator(buff)
for iter.Valid() {
iter.Next()
if iter.GetError() != nil {
return 0, 0, errors.Trace(iter.GetError())
return nil, errors.Trace(iter.GetError())
}

txnEntry := kv.Entry{Key: iter.Key(), Value: iter.Value()}
ts, err := GetKeyTS(txnEntry.Key)
if err != nil {
return 0, 0, errors.Trace(err)
return nil, errors.Trace(err)
}

// The commitTs in write CF need be limited on [startTs, restoreTs].
Expand All @@ -2105,20 +2221,41 @@ func (rc *Client) RestoreMetaKVFile(
log.Warn("txn entry is null", zap.Uint64("key-ts", ts), zap.ByteString("tnxKey", txnEntry.Key))
continue
}
log.Debug("txn entry", zap.Uint64("key-ts", ts), zap.Int("txnKey-len", len(txnEntry.Key)),
zap.Int("txnValue-len", len(txnEntry.Value)), zap.ByteString("txnKey", txnEntry.Key))
newEntry, err := sr.RewriteKvEntry(&txnEntry, file.Cf)
kvEntries = append(kvEntries, &kvEntryWithTS{e: txnEntry, ts: ts})
}

return kvEntries, nil
}

func (rc *Client) restoreMetaKvEntries(
ctx context.Context,
sr *stream.SchemasReplace,
entries []*kvEntryWithTS,
columnFamily string,
) (uint64, uint64, error) {
var (
kvCount uint64
size uint64
)

rc.rawKVClient.SetColumnFamily(columnFamily)

for _, entry := range entries {
log.Debug("before rewrte entry", zap.Uint64("key-ts", entry.ts), zap.Int("key-len", len(entry.e.Key)),
zap.Int("value-len", len(entry.e.Value)), zap.ByteString("key", entry.e.Key))

newEntry, err := sr.RewriteKvEntry(&entry.e, columnFamily)
if err != nil {
log.Error("rewrite txn entry failed", zap.Int("klen", len(txnEntry.Key)),
logutil.Key("txn-key", txnEntry.Key))
log.Error("rewrite txn entry failed", zap.Int("klen", len(entry.e.Key)),
logutil.Key("txn-key", entry.e.Key))
return 0, 0, errors.Trace(err)
} else if newEntry == nil {
continue
}
log.Debug("rewrite txn entry", zap.Int("newKey-len", len(newEntry.Key)),
zap.Int("newValue-len", len(txnEntry.Value)), zap.ByteString("newkey", newEntry.Key))
log.Debug("after rewrite entry", zap.Int("new-key-len", len(newEntry.Key)),
zap.Int("new-value-len", len(entry.e.Value)), zap.ByteString("new-key", newEntry.Key))

if err := rc.rawKVClient.Put(ctx, newEntry.Key, newEntry.Value, ts); err != nil {
if err := rc.rawKVClient.Put(ctx, newEntry.Key, newEntry.Value, entry.ts); err != nil {
return 0, 0, errors.Trace(err)
}

Expand Down
Loading