From 20dcc597f019c011fe21443940a509600f48834b Mon Sep 17 00:00:00 2001
From: Martin Martinez Rivera
Date: Tue, 9 Jul 2019 15:50:43 -0700
Subject: [PATCH] Use backwards-compatible formats during backup.

This change converts the keys and posting lists to a backwards-compatible
format so that backups work across versions of Dgraph. The restore logic is
also changed to convert the data back to Dgraph's internal formats.
---
 ee/backup/backup.go  | 148 ++++++++++++++++++++++++++++++++++++++++--
 ee/backup/restore.go | 150 +++++++++++++++++++++++++++++++++++++++++++
 ee/backup/run.go     |  33 ----------
 3 files changed, 292 insertions(+), 39 deletions(-)
 create mode 100644 ee/backup/restore.go

diff --git a/ee/backup/backup.go b/ee/backup/backup.go
index 4b92b548dc5..b5a7a8bd467 100644
--- a/ee/backup/backup.go
+++ b/ee/backup/backup.go
@@ -13,17 +13,25 @@ package backup
 
 import (
+	"bytes"
 	"compress/gzip"
 	"context"
+	"encoding/binary"
+	"encoding/hex"
 	"encoding/json"
 	"fmt"
+	"io"
 	"net/url"
 	"sync"
 
 	"github.com/dgraph-io/badger"
+	bpb "github.com/dgraph-io/badger/pb"
+	"github.com/golang/glog"
+	"github.com/pkg/errors"
+
+	"github.com/dgraph-io/dgraph/posting"
 	"github.com/dgraph-io/dgraph/protos/pb"
 	"github.com/dgraph-io/dgraph/x"
-	"github.com/golang/glog"
 )
 
 // Processor handles the different stages of the backup process.
@@ -93,24 +101,33 @@ func (pr *Processor) WriteBackup(ctx context.Context) (*pb.Status, error) {
 		predMap[pred] = struct{}{}
 	}
 
+	var maxVersion uint64
+	gzWriter := gzip.NewWriter(handler)
 	stream := pr.DB.NewStreamAt(pr.Request.ReadTs)
 	stream.LogPrefix = "Dgraph.Backup"
+	stream.KeyToList = toBackupList(pr.Request.SinceTs)
 	stream.ChooseKey = func(item *badger.Item) bool {
 		parsedKey := x.Parse(item.Key())
 		_, ok := predMap[parsedKey.Attr]
 		return ok
 	}
-	gzWriter := gzip.NewWriter(handler)
-	newSince, err := stream.Backup(gzWriter, pr.Request.SinceTs)
+	stream.Send = func(list *bpb.KVList) error {
+		for _, kv := range list.Kv {
+			if maxVersion < kv.Version {
+				maxVersion = kv.Version
+			}
+		}
+		return writeKVList(list, gzWriter)
+	}
 
-	if err != nil {
+	if err := stream.Orchestrate(context.Background()); err != nil {
 		glog.Errorf("While taking backup: %v", err)
 		return &emptyRes, err
 	}
-	if newSince > pr.Request.ReadTs {
+	if maxVersion > pr.Request.ReadTs {
 		glog.Errorf("Max timestamp seen during backup (%d) is greater than readTs (%d)",
-			newSince, pr.Request.ReadTs)
+			maxVersion, pr.Request.ReadTs)
 	}
 
 	glog.V(2).Infof("Backup group %d version: %d", pr.Request.GroupId, pr.Request.ReadTs)
@@ -161,3 +178,122 @@ func (pr *Processor) CompleteBackup(ctx context.Context, manifest *Manifest) err
 func (m *Manifest) GoString() string {
 	return fmt.Sprintf(`Manifest{Since: %d, Groups: %v}`, m.Since, m.Groups)
 }
+
+func toBackupList(since uint64) func([]byte, *badger.Iterator) (*bpb.KVList, error) {
+	return func(key []byte, itr *badger.Iterator) (*bpb.KVList, error) {
+		list := &bpb.KVList{}
+
+	loop:
+		for itr.Valid() {
+			item := itr.Item()
+			if !bytes.Equal(item.Key(), key) {
+				break
+			}
+			if item.Version() < since {
+				// Versions are iterated from newest to oldest, so all remaining
+				// versions of this key are below the given timestamp and can be
+				// skipped.
+				break
+			}
+
+			switch item.UserMeta() {
+			case posting.BitEmptyPosting, posting.BitCompletePosting, posting.BitDeltaPosting:
+				l, err := posting.ReadPostingList(key, itr)
+				if err != nil {
+					return nil, errors.Wrapf(err, "while reading posting list")
+				}
+				kvs, err := l.Rollup()
+				if err != nil {
+					return nil, errors.Wrapf(err, "while rolling up list")
+				}
+
+				for _, kv := range kvs {
+					backupKey, err := toBackupKey(kv.Key)
+					if err != nil {
+						return nil, err
+					}
+					kv.Key = backupKey
+
+					backupPl, err := toBackupPostingList(kv.Value)
+					if err != nil {
+						return nil, err
+					}
+					kv.Value = backupPl
+				}
+				list.Kv = append(list.Kv, kvs...)
+
+			case posting.BitSchemaPosting:
+				// No need to copy the value if the item is deleted or expired.
+				var valCopy []byte
+				if !item.IsDeletedOrExpired() {
+					var err error
+					valCopy, err = item.ValueCopy(nil)
+					if err != nil {
+						return nil, errors.Wrapf(err, "while copying value")
+					}
+				}
+
+				backupKey, err := toBackupKey(key)
+				if err != nil {
+					return nil, err
+				}
+
+				kv := &bpb.KV{
+					Key:       backupKey,
+					Value:     valCopy,
+					UserMeta:  []byte{item.UserMeta()},
+					Version:   item.Version(),
+					ExpiresAt: item.ExpiresAt(),
+				}
+				list.Kv = append(list.Kv, kv)
+
+				if item.DiscardEarlierVersions() || item.IsDeletedOrExpired() {
+					break loop
+				}
+
+				// Manually advance the iterator. This cannot be done in the for
+				// statement because ReadPostingList already advances the iterator,
+				// so manual advancement is only needed for BitSchemaPosting entries.
+				itr.Next()
+
+			default:
+				return nil, errors.Errorf(
+					"Unexpected meta: %d for key: %s", item.UserMeta(), hex.Dump(key))
+			}
+		}
+
+		return list, nil
+	}
+}
+
+func toBackupKey(key []byte) ([]byte, error) {
+	parsedKey := x.Parse(key)
+	if parsedKey == nil {
+		return nil, errors.Errorf("could not parse key %s", hex.Dump(key))
+	}
+	backupKey, err := parsedKey.ToBackupKey().Marshal()
+	if err != nil {
+		return nil, errors.Wrapf(err, "while converting key for backup")
+	}
+	return backupKey, nil
+}
+
+func toBackupPostingList(val []byte) ([]byte, error) {
+	pl := &pb.PostingList{}
+	if err := pl.Unmarshal(val); err != nil {
+		return nil, errors.Wrapf(err, "while reading posting list")
+	}
+	backupVal, err := posting.ToBackupPostingList(pl).Marshal()
+	if err != nil {
+		return nil, errors.Wrapf(err, "while converting posting list for backup")
+	}
+	return backupVal, nil
+}
+
+func writeKVList(list *bpb.KVList, w io.Writer) error {
+	if err := binary.Write(w, binary.LittleEndian, uint64(list.Size())); err != nil {
+		return err
+	}
+	buf, err := list.Marshal()
+	if err != nil {
+		return err
+	}
+	_, err = w.Write(buf)
+	return err
+}
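The backup stream produced above is a simple framed format: writeKVList emits a
little-endian uint64 holding the size of the marshaled KVList, followed by
exactly that many bytes, and the read loop in loadFromBackup (added in
restore.go below) consumes frames the same way. A minimal, self-contained
sketch of just the framing, with plain byte slices standing in for marshaled
KVLists; writeFrame and readFrame are illustrative names, not functions from
this patch:

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// writeFrame mirrors writeKVList: a little-endian uint64 size prefix
// followed by the payload bytes.
func writeFrame(w io.Writer, payload []byte) error {
	if err := binary.Write(w, binary.LittleEndian, uint64(len(payload))); err != nil {
		return err
	}
	_, err := w.Write(payload)
	return err
}

// readFrame mirrors the read loop in loadFromBackup: read the size,
// then exactly that many payload bytes.
func readFrame(r io.Reader) ([]byte, error) {
	var sz uint64
	if err := binary.Read(r, binary.LittleEndian, &sz); err != nil {
		return nil, err // io.EOF here is the clean end of the stream
	}
	payload := make([]byte, sz)
	if _, err := io.ReadFull(r, payload); err != nil {
		return nil, err
	}
	return payload, nil
}

func main() {
	var stream bytes.Buffer
	for _, p := range []string{"first list", "second list"} {
		if err := writeFrame(&stream, []byte(p)); err != nil {
			panic(err)
		}
	}
	for {
		payload, err := readFrame(&stream)
		if err == io.EOF {
			break
		} else if err != nil {
			panic(err)
		}
		fmt.Printf("frame: %q\n", payload)
	}
}

Note that writeKVList prefixes list.Size() but writes the output of
list.Marshal(); the two must agree, which holds for the generated protobuf
code.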
diff --git a/ee/backup/restore.go b/ee/backup/restore.go
new file mode 100644
index 00000000000..676d85b5927
--- /dev/null
+++ b/ee/backup/restore.go
@@ -0,0 +1,150 @@
+// +build !oss
+
+/*
+ * Copyright 2019 Dgraph Labs, Inc. and Contributors
+ *
+ * Licensed under the Dgraph Community License (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ *     https://github.com/dgraph-io/dgraph/blob/master/licenses/DCL.txt
+ */
+
+package backup
+
+import (
+	"bufio"
+	"compress/gzip"
+	"encoding/binary"
+	"encoding/hex"
+	"fmt"
+	"io"
+	"math"
+	"path/filepath"
+
+	"github.com/dgraph-io/badger"
+	"github.com/dgraph-io/badger/options"
+	bpb "github.com/dgraph-io/badger/pb"
+	"github.com/pkg/errors"
+
+	"github.com/dgraph-io/dgraph/posting"
+	"github.com/dgraph-io/dgraph/protos/pb"
+	"github.com/dgraph-io/dgraph/x"
+)
+
+// RunRestore creates a new DB at the given path and loads the backup files
+// into it, converting the data back to Dgraph's internal format.
+func RunRestore(pdir, location, backupId string) (uint64, error) {
+	// Scan location for backup files and load them. Each file represents a node group,
+	// and we create a new p dir for each.
+	return Load(location, backupId, func(r io.Reader, groupId int) error {
+		dir := filepath.Join(pdir, fmt.Sprintf("p%d", groupId))
+		db, err := badger.OpenManaged(badger.DefaultOptions(dir).
+			WithSyncWrites(true).
+			WithTableLoadingMode(options.MemoryMap).
+			WithValueThreshold(1 << 10).
+			WithNumVersionsToKeep(math.MaxInt32))
+		if err != nil {
+			return err
+		}
+		defer db.Close()
+		fmt.Printf("Restoring groupId: %d\n", groupId)
+		if !pathExist(dir) {
+			fmt.Println("Creating new db:", dir)
+		}
+		gzReader, err := gzip.NewReader(r)
+		if err != nil {
+			return err
+		}
+		return loadFromBackup(db, gzReader, 16)
+	})
+}
+
+// loadFromBackup reads the backup, converts the keys and values back to
+// Dgraph's internal format, and loads them into the given badger DB.
+func loadFromBackup(db *badger.DB, r io.Reader, maxPendingWrites int) error {
+	br := bufio.NewReaderSize(r, 16<<10)
+	unmarshalBuf := make([]byte, 1<<10)
+
+	loader := db.NewKVLoader(maxPendingWrites)
+	for {
+		var sz uint64
+		err := binary.Read(br, binary.LittleEndian, &sz)
+		if err == io.EOF {
+			break
+		} else if err != nil {
+			return err
+		}
+
+		if cap(unmarshalBuf) < int(sz) {
+			unmarshalBuf = make([]byte, sz)
+		}
+
+		if _, err = io.ReadFull(br, unmarshalBuf[:sz]); err != nil {
+			return err
+		}
+
+		list := &bpb.KVList{}
+		if err := list.Unmarshal(unmarshalBuf[:sz]); err != nil {
+			return err
+		}
+
+		for _, kv := range list.Kv {
+			if len(kv.GetUserMeta()) != 1 {
+				return errors.Errorf(
+					"Unexpected meta: %v for key: %s", kv.UserMeta, hex.Dump(kv.Key))
+			}
+
+			var restoreKey []byte
+			var restoreVal []byte
+			switch kv.GetUserMeta()[0] {
+			case posting.BitEmptyPosting, posting.BitCompletePosting, posting.BitDeltaPosting:
+				var err error
+				restoreKey, err = fromBackupKey(kv.Key)
+				if err != nil {
+					return err
+				}
+
+				backupPl := &pb.BackupPostingList{}
+				if err := backupPl.Unmarshal(kv.Value); err != nil {
+					return errors.Wrapf(err, "while reading backup posting list")
+				}
+				restoreVal, err = posting.FromBackupPostingList(backupPl).Marshal()
+				if err != nil {
+					return errors.Wrapf(err, "while converting backup posting list")
+				}
+
+			case posting.BitSchemaPosting:
+				var err error
+				restoreKey, err = fromBackupKey(kv.Key)
+				if err != nil {
+					return err
+				}
+				restoreVal = kv.Value
+
+			default:
+				return errors.Errorf(
+					"Unexpected meta %d for key %s", kv.UserMeta[0], hex.Dump(kv.Key))
+			}
+
+			kv.Key = restoreKey
+			kv.Value = restoreVal
+			if err := loader.Set(kv); err != nil {
+				return err
+			}
+		}
+	}
+
+	if err := loader.Finish(); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func fromBackupKey(key []byte) ([]byte, error) {
+	backupKey := &pb.BackupKey{}
+	if err := backupKey.Unmarshal(key); err != nil {
+		return nil, errors.Wrapf(err, "while reading backup key %s", hex.Dump(key))
+	}
+	return x.FromBackupKey(backupKey), nil
+}
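The helpers in restore.go are exact inverses of the conversions used during
backup, so keys and posting lists should survive a round trip through the
backup format unchanged. A minimal sketch of both round trips, assuming the
Dgraph packages touched by this patch are importable; x.DataKey (used only to
build an internal key here) is not part of this patch, and the *pb.PostingList
return type of posting.FromBackupPostingList is an assumption based on how it
is used in loadFromBackup:

package main

import (
	"bytes"
	"fmt"

	"github.com/dgraph-io/dgraph/posting"
	"github.com/dgraph-io/dgraph/protos/pb"
	"github.com/dgraph-io/dgraph/x"
)

func main() {
	// Key round trip: internal key -> marshaled pb.BackupKey -> internal key,
	// matching toBackupKey on backup and fromBackupKey on restore.
	key := x.DataKey("name", 0x42) // assumed helper for an internal data key
	parsed := x.Parse(key)
	if parsed == nil {
		panic("could not parse key")
	}
	wire, err := parsed.ToBackupKey().Marshal()
	if err != nil {
		panic(err)
	}
	backupKey := &pb.BackupKey{}
	if err := backupKey.Unmarshal(wire); err != nil {
		panic(err)
	}
	fmt.Println("key round-trip ok:", bytes.Equal(key, x.FromBackupKey(backupKey)))

	// Posting list round trip, matching toBackupPostingList on backup and the
	// posting-list branch of loadFromBackup on restore.
	orig := &pb.PostingList{CommitTs: 5}
	plWire, err := posting.ToBackupPostingList(orig).Marshal()
	if err != nil {
		panic(err)
	}
	backupPl := &pb.BackupPostingList{}
	if err := backupPl.Unmarshal(plWire); err != nil {
		panic(err)
	}
	restored := posting.FromBackupPostingList(backupPl)
	fmt.Println("commitTs preserved:", restored.CommitTs == orig.CommitTs)
}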
diff --git a/ee/backup/run.go b/ee/backup/run.go
index 3e2fc3daf66..d348f77ec9a 100644
--- a/ee/backup/run.go
+++ b/ee/backup/run.go
@@ -13,17 +13,11 @@ package backup
 
 import (
-	"compress/gzip"
 	"context"
 	"fmt"
-	"io"
-	"math"
 	"os"
-	"path/filepath"
 	"time"
 
-	"github.com/dgraph-io/badger"
-	"github.com/dgraph-io/badger/options"
 	"github.com/dgraph-io/dgraph/protos/pb"
 	"github.com/dgraph-io/dgraph/x"
 	"github.com/pkg/errors"
@@ -213,33 +207,6 @@ func runRestoreCmd() error {
 	return nil
 }
 
-// RunRestore calls badger.Load and tries to load data into a new DB.
-func RunRestore(pdir, location, backupId string) (uint64, error) {
-	// Scan location for backup files and load them. Each file represents a node group,
-	// and we create a new p dir for each.
-	return Load(location, backupId, func(r io.Reader, groupId int) error {
-		dir := filepath.Join(pdir, fmt.Sprintf("p%d", groupId))
-		db, err := badger.OpenManaged(badger.DefaultOptions(dir).
-			WithSyncWrites(true).
-			WithTableLoadingMode(options.MemoryMap).
-			WithValueThreshold(1 << 10).
-			WithNumVersionsToKeep(math.MaxInt32))
-		if err != nil {
-			return err
-		}
-		defer db.Close()
-		fmt.Printf("Restoring groupId: %d\n", groupId)
-		if !pathExist(dir) {
-			fmt.Println("Creating new db:", dir)
-		}
-		gzReader, err := gzip.NewReader(r)
-		if err != nil {
-			return nil
-		}
-		return db.Load(gzReader, 16)
-	})
-}
-
 func runLsbackupCmd() error {
 	fmt.Println("Listing backups from:", opt.location)
 	manifests, err := ListManifests(opt.location)