Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use manifest to only restore preds assigned to each group. #3648

Merged
merged 8 commits into from
Jul 11, 2019
164 changes: 158 additions & 6 deletions ee/backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,25 @@
package backup

import (
"bytes"
"compress/gzip"
"context"
"encoding/binary"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"net/url"
"sync"

"github.com/dgraph-io/badger"
bpb "github.com/dgraph-io/badger/pb"
"github.com/golang/glog"
"github.com/pkg/errors"

"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
"github.com/golang/glog"
)

// Processor handles the different stages of the backup process.
Expand Down Expand Up @@ -61,6 +69,19 @@ type Manifest struct {
Path string `json:"-"`
}

// getPredsInGroup builds the set of predicates that this manifest assigns to
// group gid. It returns nil when the manifest has no entry for that group;
// membership lookups on the nil map still behave as an empty set.
func (m *Manifest) getPredsInGroup(gid uint32) predicateSet {
	preds, ok := m.Groups[gid]
	if !ok {
		return nil
	}

	set := make(predicateSet, len(preds))
	for _, p := range preds {
		set[p] = struct{}{}
	}
	return set
}

// WriteBackup uses the request values to create a stream writer then hand off the data
// retrieval to stream.Orchestrate. The writer will create all the fd's needed to
// collect the data and later move to the target.
Expand Down Expand Up @@ -93,24 +114,33 @@ func (pr *Processor) WriteBackup(ctx context.Context) (*pb.Status, error) {
predMap[pred] = struct{}{}
}

var maxVersion uint64
gzWriter := gzip.NewWriter(handler)
stream := pr.DB.NewStreamAt(pr.Request.ReadTs)
stream.LogPrefix = "Dgraph.Backup"
stream.KeyToList = toBackupList(pr.Request.SinceTs)
stream.ChooseKey = func(item *badger.Item) bool {
parsedKey := x.Parse(item.Key())
_, ok := predMap[parsedKey.Attr]
return ok
}
gzWriter := gzip.NewWriter(handler)
newSince, err := stream.Backup(gzWriter, pr.Request.SinceTs)
stream.Send = func(list *bpb.KVList) error {
for _, kv := range list.Kv {
if maxVersion < kv.Version {
maxVersion = kv.Version
}
}
return writeKVList(list, gzWriter)
}

if err != nil {
if err := stream.Orchestrate(context.Background()); err != nil {
glog.Errorf("While taking backup: %v", err)
return &emptyRes, err
}

if newSince > pr.Request.ReadTs {
if maxVersion > pr.Request.ReadTs {
glog.Errorf("Max timestamp seen during backup (%d) is greater than readTs (%d)",
newSince, pr.Request.ReadTs)
maxVersion, pr.Request.ReadTs)
}

glog.V(2).Infof("Backup group %d version: %d", pr.Request.GroupId, pr.Request.ReadTs)
Expand Down Expand Up @@ -161,3 +191,125 @@ func (pr *Processor) CompleteBackup(ctx context.Context, manifest *Manifest) err
// GoString implements fmt.GoStringer, producing a compact debug rendering of
// the manifest's Since timestamp and its group-to-predicate assignments.
func (m *Manifest) GoString() string {
	return fmt.Sprintf("Manifest{Since: %d, Groups: %v}", m.Since, m.Groups)
}

// toBackupList returns a badger stream KeyToList function that collects the
// versions of a single key at or after the `since` timestamp and converts them
// into backup-format KVs. Posting-list entries are rolled up into one complete
// list; schema entries are copied verbatim. Both keys and posting-list values
// are rewritten into their backup representations via toBackupKey and
// toBackupPostingList.
func toBackupList(since uint64) func([]byte, *badger.Iterator) (*bpb.KVList, error) {
	return func(key []byte, itr *badger.Iterator) (*bpb.KVList, error) {
		list := &bpb.KVList{}

	loop:
		for itr.Valid() {
			item := itr.Item()
			if !bytes.Equal(item.Key(), key) {
				// The iterator has moved on to a different key; this list is done.
				break
			}
			if item.Version() < since {
				// Ignore versions less than given timestamp, or skip older versions of
				// the given key.
				// NOTE(review): this assumes versions are iterated newest-first so
				// everything from here on is older — confirm against the stream's
				// iterator configuration.
				break
			}

			switch item.UserMeta() {
			case posting.BitEmptyPosting, posting.BitCompletePosting, posting.BitDeltaPosting:
				// A posting-list entry: read the whole list (this consumes the
				// remaining versions of the key from itr) and roll it up.
				l, err := posting.ReadPostingList(key, itr)
				if err != nil {
					return nil, errors.Wrapf(err, "while reading posting list")
				}
				kvs, err := l.Rollup()
				if err != nil {
					return nil, errors.Wrapf(err, "while rolling up list")
				}

				// Rewrite each rolled-up KV's key and value into the backup format.
				for _, kv := range kvs {
					backupKey, err := toBackupKey(kv.Key)
					if err != nil {
						return nil, err
					}
					kv.Key = backupKey

					backupPl, err := toBackupPostingList(kv.Value)
					if err != nil {
						return nil, err
					}
					kv.Value = backupPl
				}
				list.Kv = append(list.Kv, kvs...)

			case posting.BitSchemaPosting:
				var valCopy []byte
				if !item.IsDeletedOrExpired() {
					// No need to copy value if item is deleted or expired.
					var err error
					valCopy, err = item.ValueCopy(nil)
					if err != nil {
						return nil, errors.Wrapf(err, "while copying value")
					}
				}

				backupKey, err := toBackupKey(key)
				if err != nil {
					return nil, err
				}

				kv := &bpb.KV{
					Key:       backupKey,
					Value:     valCopy,
					UserMeta:  []byte{item.UserMeta()},
					Version:   item.Version(),
					ExpiresAt: item.ExpiresAt(),
				}
				list.Kv = append(list.Kv, kv)

				// A deleted/superseded schema entry ends this key entirely (labelled
				// break so we exit the for loop, not just the switch).
				if item.DiscardEarlierVersions() || item.IsDeletedOrExpired() {
					break loop
				}

				// Manually advance the iterator. This cannot be done in the for
				// statement because ReadPostingList advances the iterator so this
				// only needs to be done for BitSchemaPosting entries.
				itr.Next()

			default:
				return nil, errors.Errorf(
					"Unexpected meta: %d for key: %s", item.UserMeta(), hex.Dump(key))
			}
		}

		return list, nil
	}
}

// toBackupKey parses a raw badger key and marshals it into its backup
// (exportable) protobuf representation. It fails if the key is unparseable.
func toBackupKey(key []byte) ([]byte, error) {
	pk := x.Parse(key)
	if pk == nil {
		return nil, errors.Errorf("could not parse key %s", hex.Dump(key))
	}

	out, err := pk.ToBackupKey().Marshal()
	if err != nil {
		return nil, errors.Wrapf(err, "while converting key for backup")
	}
	return out, nil
}

// toBackupPostingList decodes a marshalled pb.PostingList value and re-encodes
// it in the backup posting-list format.
func toBackupPostingList(val []byte) ([]byte, error) {
	var list pb.PostingList
	if err := list.Unmarshal(val); err != nil {
		return nil, errors.Wrapf(err, "while reading posting list")
	}

	out, err := posting.ToBackupPostingList(&list).Marshal()
	if err != nil {
		return nil, errors.Wrapf(err, "while converting posting list for backup")
	}
	return out, nil
}

// writeKVList writes list to w as a length-delimited record: an 8-byte
// little-endian length prefix followed by the marshalled KVList bytes.
func writeKVList(list *bpb.KVList, w io.Writer) error {
	buf, err := list.Marshal()
	if err != nil {
		return err
	}
	// Prefix with len(buf) rather than list.Size() so the length always
	// matches the payload actually written (and the size isn't computed
	// twice — Marshal already sizes the message internally).
	if err := binary.Write(w, binary.LittleEndian, uint64(len(buf))); err != nil {
		return err
	}
	_, err = w.Write(buf)
	return err
}
10 changes: 7 additions & 3 deletions ee/backup/file_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,14 +157,18 @@ func (h *fileHandler) Load(uri *url.URL, backupId string, fn loadFn) (uint64, er
}

path := filepath.Dir(manifests[i].Path)
for groupId := range manifest.Groups {
file := filepath.Join(path, backupName(manifest.Since, groupId))
for gid := range manifest.Groups {
file := filepath.Join(path, backupName(manifest.Since, gid))
fp, err := os.Open(file)
if err != nil {
return 0, errors.Wrapf(err, "Failed to open %q", file)
}
defer fp.Close()
if err = fn(fp, int(groupId)); err != nil {

// Only restore the predicates that were assigned to this group at the time
// of the last backup.
predSet := manifests[len(manifests)-1].getPredsInGroup(gid)
if err = fn(fp, int(gid), predSet); err != nil {
return 0, err
}
}
Expand Down
8 changes: 6 additions & 2 deletions ee/backup/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,13 @@ func NewUriHandler(uri *url.URL) (UriHandler, error) {
return h, nil
}

// predicateSet is a map whose keys are predicates. It is meant to be used as a set.
type predicateSet map[string]struct{}

// loadFn is a function that will receive the current file being read.
// A reader and the backup groupId are passed as arguments.
type loadFn func(reader io.Reader, groupId int) error
// A reader, the backup groupId, and a map whose keys are the predicates to restore
// are passed as arguments.
type loadFn func(reader io.Reader, groupId int, preds predicateSet) error

// Load will scan location l for backup files in the given backup series and load them
// sequentially. Returns the maximum Since value on success, otherwise an error.
Expand Down
Loading