log-backup: using streaming to load metadata for preventing OOM #38386

Merged · 14 commits · Oct 28, 2022
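
This change replaces the client's eager metadata loaders (`GetShiftTS`, `ReadStreamMetaByTS`, `ReadStreamDataFiles`) with an embedded `logFileManager`, so `RestoreKVFiles` now consumes a `LogIter` instead of a fully materialized `[]*backuppb.DataFileInfo`. Below is a minimal, self-contained sketch of the general streaming idea only; the type and function names are hypothetical and are not the br implementation.

```go
// Sketch of streaming file metadata instead of collecting it into one slice.
// All names here are illustrative stand-ins, not the br code.
package main

import (
	"context"
	"fmt"
)

// fileInfo stands in for backuppb.DataFileInfo in this sketch.
type fileInfo struct {
	Path         string
	MinTS, MaxTS uint64
}

// streamFiles lazily yields files whose TS range overlaps [startTS, restoreTS].
// Nothing is accumulated: the unbuffered channel holds at most one entry, and
// the producer stops as soon as the consumer cancels the context.
func streamFiles(ctx context.Context, metas []fileInfo, startTS, restoreTS uint64) <-chan fileInfo {
	out := make(chan fileInfo)
	go func() {
		defer close(out)
		for _, f := range metas {
			if f.MaxTS < startTS || f.MinTS > restoreTS {
				continue // outside the restore range: skip without buffering
			}
			select {
			case out <- f:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

func main() {
	ctx := context.Background()
	metas := []fileInfo{
		{"00001.log", 5, 9},
		{"00002.log", 10, 20},
		{"00003.log", 30, 40},
	}
	for f := range streamFiles(ctx, metas, 8, 25) {
		fmt.Println("would restore", f.Path) // 00001.log and 00002.log
	}
}
```

Because the consumer pulls one entry at a time, peak memory is bounded by the number of in-flight files rather than by the total metadata size of the backup.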
br/pkg/restore/client.go (220 changes: 22 additions & 198 deletions)
@@ -5,7 +5,6 @@ package restore
import (
"bytes"
"context"
"crypto/sha256"
"crypto/tls"
"encoding/hex"
"encoding/json"
@@ -137,24 +136,13 @@ type Client struct {

supportPolicy bool

// startTS and restoreTS are used for kv file restore.
// TiKV will filter out keys whose versions don't belong to [startTS, restoreTS].
startTS uint64
restoreTS uint64

// If the commitTS of a txn entry belongs to [startTS, restoreTS],
// its startTS may still be smaller than startTS.
// So we need to maintain and restore more entries in the default cf
// (the startTS of these entries belongs to [shiftStartTS, startTS]).
shiftStartTS uint64

// currentTS is used to rewrite meta kv when restoring the stream.
// We cannot use `restoreTS` directly, because a schema created in the `full backup` may be newer than `restoreTS`.
currentTS uint64

storage storage.ExternalStorage
*logFileManager

helper *stream.MetadataHelper
storage storage.ExternalStorage

// if fullClusterRestore = true:
// - if there are system tables in the backup (backup data since br 5.1.0), the cluster should be a fresh cluster
@@ -338,14 +326,6 @@ func (rc *Client) Close() {
log.Info("Restore client closed")
}

func (rc *Client) SetRestoreRangeTS(startTs, restoreTS, shiftStartTS uint64) {
rc.startTS = startTs
rc.restoreTS = restoreTS
rc.shiftStartTS = shiftStartTS
log.Info("set restore range ts", zap.Uint64("shift-start-ts", shiftStartTS),
zap.Uint64("start-ts", startTs), zap.Uint64("restored-ts", restoreTS))
}

func (rc *Client) SetCurrentTS(ts uint64) {
rc.currentTS = ts
}
@@ -408,10 +388,6 @@ func (rc *Client) IsRawKvMode() bool {
return rc.backupMeta.IsRawKv
}

func (rc *Client) InitMetadataHelper() {
rc.helper = stream.NewMetadataHelper()
}

// GetFilesInRawRange gets all files that are in the given range or intersect with it.
func (rc *Client) GetFilesInRawRange(startKey []byte, endKey []byte, cf string) ([]*backuppb.File, error) {
if !rc.IsRawKvMode() {
@@ -1729,109 +1705,18 @@ func (rc *Client) PreCheckTableClusterIndex(
return nil
}

func (rc *Client) GetShiftTS(ctx context.Context, startTS uint64, restoreTS uint64) (uint64, error) {
shiftTS := struct {
sync.Mutex
value uint64
exists bool
}{}
err := stream.FastUnmarshalMetaData(ctx, rc.storage, func(path string, raw []byte) error {
m, err := rc.helper.ParseToMetadata(raw)
if err != nil {
return err
}
shiftTS.Lock()
defer shiftTS.Unlock()

ts, ok := UpdateShiftTS(m, startTS, restoreTS)
if ok && (!shiftTS.exists || shiftTS.value > ts) {
shiftTS.value = ts
shiftTS.exists = true
}
return nil
})
if err != nil {
return 0, err
func (rc *Client) InstallLogFileManager(ctx context.Context, startTS, restoreTS uint64) error {
init := LogFileManagerInit{
StartTS: startTS,
RestoreTS: restoreTS,
Storage: rc.storage,
}
if !shiftTS.exists {
return startTS, nil
}
return shiftTS.value, nil
}

// ReadStreamMetaByTS is used for the streaming task. It collects all metadata files whose TS range overlaps [shiftedStartTS, restoreTS].
func (rc *Client) ReadStreamMetaByTS(ctx context.Context, shiftedStartTS uint64, restoreTS uint64) ([]*backuppb.Metadata, error) {
streamBackupMetaFiles := struct {
sync.Mutex
metas []*backuppb.Metadata
}{}
streamBackupMetaFiles.metas = make([]*backuppb.Metadata, 0, 128)

err := stream.FastUnmarshalMetaData(ctx, rc.storage, func(path string, raw []byte) error {
metadata, err := rc.helper.ParseToMetadata(raw)
if err != nil {
return err
}
streamBackupMetaFiles.Lock()
if restoreTS >= metadata.MinTs && metadata.MaxTs >= shiftedStartTS {
streamBackupMetaFiles.metas = append(streamBackupMetaFiles.metas, metadata)
}
streamBackupMetaFiles.Unlock()
return nil
})
var err error
rc.logFileManager, err = CreateLogFileManager(ctx, init)
if err != nil {
return nil, errors.Trace(err)
}
return streamBackupMetaFiles.metas, nil
}

// ReadStreamDataFiles is used for the streaming task. It collects the data files and meta files recorded in the given metadata.
func (rc *Client) ReadStreamDataFiles(
ctx context.Context,
metas []*backuppb.Metadata,
) (dataFiles, metaFiles []*backuppb.DataFileInfo, err error) {
dFiles := make([]*backuppb.DataFileInfo, 0)
mFiles := make([]*backuppb.DataFileInfo, 0)

for _, m := range metas {
_, exists := backuppb.MetaVersion_name[int32(m.MetaVersion)]
if !exists {
log.Warn("metaversion too new", zap.Reflect("version id", m.MetaVersion))
}
for _, ds := range m.FileGroups {
metaRef := 0
for _, d := range ds.DataFilesInfo {
if d.MinTs > rc.restoreTS {
continue
} else if d.Cf == stream.WriteCF && d.MaxTs < rc.startTS {
continue
} else if d.Cf == stream.DefaultCF && d.MaxTs < rc.shiftStartTS {
continue
}

// If ds.Path is empty, it is MetadataV1.
// Try to be compatible with newer metadata version
if m.MetaVersion > backuppb.MetaVersion_V1 {
d.Path = ds.Path
}

if d.IsMeta {
mFiles = append(mFiles, d)
metaRef += 1
} else {
dFiles = append(dFiles, d)
}
log.Debug("backup stream collect data partition", zap.Uint64("offset", d.RangeOffset), zap.Uint64("length", d.Length))
}
// metadatav1 doesn't use cache
// Try to be compatible with newer metadata version
if m.MetaVersion > backuppb.MetaVersion_V1 {
rc.helper.InitCacheEntry(ds.Path, metaRef)
}
}
return err
}

return dFiles, mFiles, nil
return nil
}
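
The removed `GetShiftTS`/`ReadStreamMetaByTS`/`ReadStreamDataFiles` each scanned all metadata up front and returned slices; the added `InstallLogFileManager` instead wires a single `*logFileManager` into the client, whose embedded methods resolve things on demand. A self-contained sketch of that install-and-embed pattern follows; all names are illustrative rather than taken from br.

```go
// Sketch of the embed-a-manager shape this hunk introduces (illustrative names).
package main

import (
	"context"
	"fmt"
)

// logFileManager stands in for the manager this PR embeds into the client.
type logFileManager struct {
	startTS, restoreTS uint64
}

// ShiftStartTS is computed lazily here as a stand-in; the real manager derives
// it while streaming metadata, but the "resolved on demand" idea is the same.
func (m *logFileManager) ShiftStartTS() uint64 {
	if m.startTS == 0 {
		return 0
	}
	return m.startTS - 1 // placeholder math, not the real shift-TS rule
}

// client mirrors the embedded-manager layout of restore.Client after this PR.
type client struct {
	*logFileManager
}

func (c *client) InstallLogFileManager(ctx context.Context, startTS, restoreTS uint64) error {
	c.logFileManager = &logFileManager{startTS: startTS, restoreTS: restoreTS}
	return nil
}

func main() {
	c := &client{}
	if err := c.InstallLogFileManager(context.Background(), 100, 200); err != nil {
		panic(err)
	}
	// Promoted through the embedded pointer, replacing per-field setters
	// such as SetRestoreRangeTS on the client.
	fmt.Println("shift start ts:", c.ShiftStartTS())
}
```

This embedding is presumably also why `SetRestoreRangeTS`, `InitMetadataHelper`, and the `startTS`/`restoreTS`/`shiftStartTS` fields could be deleted from the client: the manager now owns those values.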

// FixIndex tries to fix a single index.
@@ -1896,22 +1781,21 @@ func (rc *Client) FixIndicesOfTable(ctx context.Context, schema string, table *m
func (rc *Client) RestoreKVFiles(
ctx context.Context,
rules map[int64]*RewriteRules,
files []*backuppb.DataFileInfo,
files LogIter,
updateStats func(kvCount uint64, size uint64),
onProgress func(),
) error {
var err error
fileCount := 0
start := time.Now()
defer func() {
elapsed := time.Since(start)
if err == nil {
log.Info("Restore KV files", zap.Duration("take", elapsed))
summary.CollectSuccessUnit("files", len(files), elapsed)
summary.CollectSuccessUnit("files", fileCount, elapsed)
}
}()

log.Debug("start to restore files", zap.Int("files", len(files)))

if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("Client.RestoreKVFiles", opentracing.ChildOf(span.Context()))
defer span1.Finish()
@@ -1951,13 +1835,19 @@ func (rc *Client) RestoreKVFiles(
})
}
}
for _, file := range files {
for r := files.TryNext(ctx); !r.Finished; r = files.TryNext(ctx) {
if r.Err != nil {
err = r.Err
return err
}
file := r.Item
if file.Type == backuppb.FileType_Delete {
// collect delete type file and apply it later.
deleteFiles = append(deleteFiles, file)
continue
}
fileReplica := file
// applyFunc blocks once there are not enough idle workers.
// This keeps us from loading too many DML file infos at once
// (see the back-pressure sketch after this loop).
applyFunc(fileReplica)
}
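
The comment on `applyFunc` above is the key to why streaming is safe here: the worker pool provides back-pressure, so the `TryNext` loop cannot pull file metadata faster than workers consume it. A minimal, self-contained sketch of that back-pressure pattern follows; the names and the semaphore channel are illustrative, not the br worker-pool API.

```go
// Sketch of producer throttling via a counting semaphore (illustrative only).
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	const concurrency = 2
	sem := make(chan struct{}, concurrency) // sized to the worker pool
	var wg sync.WaitGroup

	apply := func(file string) {
		sem <- struct{}{} // blocks while `concurrency` files are already in flight
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() { <-sem }()
			time.Sleep(10 * time.Millisecond) // stands in for restoring one file
			fmt.Println("restored", file)
		}()
	}

	for _, f := range []string{"f1", "f2", "f3", "f4", "f5"} {
		apply(f) // the producer is throttled here instead of pre-loading every file
	}
	wg.Wait()
}
```

With an eagerly loaded `[]*backuppb.DataFileInfo` this throttling only limited concurrency; combined with the iterator it also bounds how much metadata is resident at once.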
if len(deleteFiles) > 0 {
@@ -2245,7 +2135,7 @@ func (rc *Client) RestoreBatchMetaKVFiles(

// read all entries from the files.
for _, f := range files {
es, nextEs, err := rc.readAllEntries(ctx, f, filterTS)
es, nextEs, err := rc.ReadAllEntries(ctx, f, filterTS)
if err != nil {
return nextKvEntries, errors.Trace(err)
}
@@ -2272,72 +2162,6 @@ func (rc *Client) RestoreBatchMetaKVFiles(
return nextKvEntries, nil
}

func (rc *Client) readAllEntries(
ctx context.Context,
file *backuppb.DataFileInfo,
filterTS uint64,
) ([]*KvEntryWithTS, []*KvEntryWithTS, error) {
kvEntries := make([]*KvEntryWithTS, 0)
nextKvEntries := make([]*KvEntryWithTS, 0)

buff, err := rc.helper.ReadFile(ctx, file.Path, file.RangeOffset, file.RangeLength, file.CompressionType, rc.storage)
if err != nil {
return nil, nil, errors.Trace(err)
}

if checksum := sha256.Sum256(buff); !bytes.Equal(checksum[:], file.GetSha256()) {
return nil, nil, errors.Annotatef(berrors.ErrInvalidMetaFile,
"checksum mismatch expect %x, got %x", file.GetSha256(), checksum[:])
}

iter := stream.NewEventIterator(buff)
for iter.Valid() {
iter.Next()
if iter.GetError() != nil {
return nil, nil, errors.Trace(iter.GetError())
}

txnEntry := kv.Entry{Key: iter.Key(), Value: iter.Value()}

if !stream.MaybeDBOrDDLJobHistoryKey(txnEntry.Key) {
// only restore mDB and mDDLHistory
continue
}

ts, err := GetKeyTS(txnEntry.Key)
if err != nil {
return nil, nil, errors.Trace(err)
}

// The commitTs in the write CF needs to be limited to [startTs, restoreTs].
// We can restore more key-values in the default CF.
if ts > rc.restoreTS {
continue
} else if file.Cf == stream.WriteCF && ts < rc.startTS {
continue
} else if file.Cf == stream.DefaultCF && ts < rc.shiftStartTS {
continue
}

if len(txnEntry.Value) == 0 {
// We might record duplicated prewrite keys in some corner cases:
// the first prewrite key has the value but the second doesn't,
// so we can ignore the key with an empty value.
// See details at https://github.com/pingcap/tiflow/issues/5468.
log.Warn("txn entry is null", zap.Uint64("key-ts", ts), zap.ByteString("txnKey", txnEntry.Key))
continue
}

if ts < filterTS {
kvEntries = append(kvEntries, &KvEntryWithTS{e: txnEntry, ts: ts})
} else {
nextKvEntries = append(nextKvEntries, &KvEntryWithTS{e: txnEntry, ts: ts})
}
}

return kvEntries, nextKvEntries, nil
}

func (rc *Client) restoreMetaKvEntries(
ctx context.Context,
sr *stream.SchemasReplace,