Use backwards-compatible formats during backup #3629

Merged · 4 commits · Jul 11, 2019
150 changes: 144 additions & 6 deletions ee/backup/backup.go
@@ -13,17 +13,25 @@
package backup

import (
"bytes"
"compress/gzip"
"context"
"encoding/binary"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"net/url"
"sync"

"github.com/dgraph-io/badger"
bpb "github.com/dgraph-io/badger/pb"
"github.com/golang/glog"
"github.com/pkg/errors"

"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
"github.com/golang/glog"
)

// Processor handles the different stages of the backup process.
@@ -93,24 +101,33 @@ func (pr *Processor) WriteBackup(ctx context.Context) (*pb.Status, error) {
 		predMap[pred] = struct{}{}
 	}
 
+	var maxVersion uint64
+	gzWriter := gzip.NewWriter(handler)
 	stream := pr.DB.NewStreamAt(pr.Request.ReadTs)
 	stream.LogPrefix = "Dgraph.Backup"
+	stream.KeyToList = pr.toBackupList
 	stream.ChooseKey = func(item *badger.Item) bool {
 		parsedKey := x.Parse(item.Key())
 		_, ok := predMap[parsedKey.Attr]
 		return ok
 	}
-	gzWriter := gzip.NewWriter(handler)
-	newSince, err := stream.Backup(gzWriter, pr.Request.SinceTs)
+	stream.Send = func(list *bpb.KVList) error {
+		for _, kv := range list.Kv {
+			if maxVersion < kv.Version {
+				maxVersion = kv.Version
+			}
+		}
+		return writeKVList(list, gzWriter)
+	}
 
-	if err != nil {
+	if err := stream.Orchestrate(context.Background()); err != nil {
 		glog.Errorf("While taking backup: %v", err)
 		return &emptyRes, err
 	}
 
-	if newSince > pr.Request.ReadTs {
+	if maxVersion > pr.Request.ReadTs {
 		glog.Errorf("Max timestamp seen during backup (%d) is greater than readTs (%d)",
-			newSince, pr.Request.ReadTs)
+			maxVersion, pr.Request.ReadTs)
 	}
 
 	glog.V(2).Infof("Backup group %d version: %d", pr.Request.GroupId, pr.Request.ReadTs)
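The key change above: instead of badger's built-in stream.Backup helper, the backup now installs a custom Send function so each KVList can be converted to a backwards-compatible format before it is written. A minimal sketch of this Stream orchestration pattern, using only API calls that appear in this diff (the function exampleBackup and its parameters are illustrative, not part of the PR):

package backup

import (
	"context"
	"io"

	"github.com/dgraph-io/badger"
	bpb "github.com/dgraph-io/badger/pb"
)

// exampleBackup streams a snapshot at readTs, tracks the highest KV version
// seen, and writes each batch through writeKVList (defined later in this file).
func exampleBackup(db *badger.DB, readTs uint64, w io.Writer) (uint64, error) {
	var maxVersion uint64
	stream := db.NewStreamAt(readTs) // managed-mode snapshot read at readTs
	stream.Send = func(list *bpb.KVList) error {
		for _, kv := range list.Kv {
			if maxVersion < kv.Version {
				maxVersion = kv.Version
			}
		}
		return writeKVList(list, w) // length-prefixed protobuf framing
	}
	if err := stream.Orchestrate(context.Background()); err != nil {
		return 0, err
	}
	return maxVersion, nil
}

Without the ChooseKey and KeyToList overrides the stream visits every key and uses badger's default list conversion; the real WriteBackup sets both, as shown in the hunk above.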
@@ -161,3 +178,124 @@ func (pr *Processor) CompleteBackup(ctx context.Context, manifest *Manifest) error {
func (m *Manifest) GoString() string {
return fmt.Sprintf(`Manifest{Since: %d, Groups: %v}`, m.Since, m.Groups)
}

func (pr *Processor) toBackupList(key []byte, itr *badger.Iterator) (*bpb.KVList, error) {
list := &bpb.KVList{}

for itr.Valid() {
item := itr.Item()
if !bytes.Equal(item.Key(), key) {
return list, nil
}
		if item.Version() < pr.Request.SinceTs {
			// Skip versions older than the given sinceTs timestamp. Versions
			// are iterated from newest to oldest, so nothing newer than
			// sinceTs remains for this key.
			return list, nil
		}

switch item.UserMeta() {
case posting.BitEmptyPosting, posting.BitCompletePosting, posting.BitDeltaPosting:
l, err := posting.ReadPostingList(key, itr)
if err != nil {
return nil, errors.Wrapf(err, "while reading posting list")
}
kvs, err := l.Rollup()
if err != nil {
return nil, errors.Wrapf(err, "while rolling up list")
}

for _, kv := range kvs {
backupKey, err := toBackupKey(kv.Key)
if err != nil {
return nil, err
}
kv.Key = backupKey

backupPl, err := toBackupPostingList(kv.Value)
if err != nil {
return nil, err
}
kv.Value = backupPl
}
list.Kv = append(list.Kv, kvs...)

case posting.BitSchemaPosting:
var valCopy []byte
if !item.IsDeletedOrExpired() {
// No need to copy value if item is deleted or expired.
var err error
valCopy, err = item.ValueCopy(nil)
if err != nil {
return nil, errors.Wrapf(err, "while copying value")
}
}

backupKey, err := toBackupKey(key)
if err != nil {
return nil, err
}

kv := &bpb.KV{
Key: backupKey,
Value: valCopy,
UserMeta: []byte{item.UserMeta()},
Version: item.Version(),
ExpiresAt: item.ExpiresAt(),
}
list.Kv = append(list.Kv, kv)

if item.DiscardEarlierVersions() || item.IsDeletedOrExpired() {
return list, nil
}

			// Manually advance the iterator. This cannot be done in the for
			// statement because ReadPostingList already advances the iterator,
			// so manual advancement is only needed for BitSchemaPosting entries.
itr.Next()

default:
return nil, errors.Errorf(
"Unexpected meta: %d for key: %s", item.UserMeta(), hex.Dump(key))
}
}

	// This should be unreachable, but the Go compiler complains about a
	// missing return statement.
return list, nil
}

func toBackupKey(key []byte) ([]byte, error) {
parsedKey := x.Parse(key)
if parsedKey == nil {
return nil, errors.Errorf("could not parse key %s", hex.Dump(key))
}
backupKey, err := parsedKey.ToBackupKey().Marshal()
if err != nil {
return nil, errors.Wrapf(err, "while converting key for backup")
}
return backupKey, nil
}

func toBackupPostingList(val []byte) ([]byte, error) {
pl := &pb.PostingList{}
if err := pl.Unmarshal(val); err != nil {
return nil, errors.Wrapf(err, "while reading posting list")
}
backupVal, err := posting.ToBackupPostingList(pl).Marshal()
if err != nil {
return nil, errors.Wrapf(err, "while converting posting list for backup")
}
return backupVal, nil
}

func writeKVList(list *bpb.KVList, w io.Writer) error {
if err := binary.Write(w, binary.LittleEndian, uint64(list.Size())); err != nil {
return err
}
buf, err := list.Marshal()
if err != nil {
return err
}
_, err = w.Write(buf)
return err
}
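The wire format writeKVList produces is deliberately simple: each batch is a little-endian uint64 length prefix followed by the marshaled KVList bytes. A hedged sketch of the matching reader as a same-package helper (the name readKVList is hypothetical; loadFromBackup in restore.go below is the real consumer of this format):

func readKVList(r io.Reader) (*bpb.KVList, error) {
	// Read the little-endian length prefix written by writeKVList.
	var sz uint64
	if err := binary.Read(r, binary.LittleEndian, &sz); err != nil {
		return nil, err
	}
	// Read exactly sz bytes of marshaled KVList data.
	buf := make([]byte, sz)
	if _, err := io.ReadFull(r, buf); err != nil {
		return nil, err
	}
	list := &bpb.KVList{}
	if err := list.Unmarshal(buf); err != nil {
		return nil, err
	}
	return list, nil
}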
144 changes: 144 additions & 0 deletions ee/backup/restore.go
@@ -0,0 +1,144 @@
// +build !oss

/*
* Copyright 2019 Dgraph Labs, Inc. and Contributors
*
* Licensed under the Dgraph Community License (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* https://github.com/dgraph-io/dgraph/blob/master/licenses/DCL.txt
*/

package backup

import (
"bufio"
"compress/gzip"
"encoding/binary"
"encoding/hex"
"fmt"
"io"
"math"
"path/filepath"

"github.com/dgraph-io/badger"
"github.com/dgraph-io/badger/options"
bpb "github.com/dgraph-io/badger/pb"
"github.com/pkg/errors"

"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
)

// RunRestore scans the given location for backup files and restores them
// into a new badger DB for each group.
func RunRestore(pdir, location, backupId string) (uint64, error) {
// Scan location for backup files and load them. Each file represents a node group,
// and we create a new p dir for each.
return Load(location, backupId, func(r io.Reader, groupId int) error {
dir := filepath.Join(pdir, fmt.Sprintf("p%d", groupId))
db, err := badger.OpenManaged(badger.DefaultOptions(dir).
WithSyncWrites(false).
WithTableLoadingMode(options.MemoryMap).
WithValueThreshold(1 << 10).
WithNumVersionsToKeep(math.MaxInt32))
if err != nil {
return err
}
defer db.Close()
fmt.Printf("Restoring groupId: %d\n", groupId)
if !pathExist(dir) {
fmt.Println("Creating new db:", dir)
}
		gzReader, err := gzip.NewReader(r)
		if err != nil {
			return err
		}
return loadFromBackup(db, gzReader, 16)
})
}

// loadFromBackup reads the backup, converts the keys and values to the required
// format, and loads them into the given badger DB.
func loadFromBackup(db *badger.DB, r io.Reader, maxPendingWrites int) error {
br := bufio.NewReaderSize(r, 16<<10)
unmarshalBuf := make([]byte, 1<<10)

loader := db.NewKVLoader(maxPendingWrites)
for {
var sz uint64
err := binary.Read(br, binary.LittleEndian, &sz)
if err == io.EOF {
break
} else if err != nil {
return err
}

if cap(unmarshalBuf) < int(sz) {
unmarshalBuf = make([]byte, sz)
}

if _, err = io.ReadFull(br, unmarshalBuf[:sz]); err != nil {
return err
}

list := &bpb.KVList{}
if err := list.Unmarshal(unmarshalBuf[:sz]); err != nil {
return err
}

for _, kv := range list.Kv {
if len(kv.GetUserMeta()) != 1 {
return errors.Errorf(
"Unexpected meta: %v for key: %s", kv.UserMeta, hex.Dump(kv.Key))
}

restoreKey, err := fromBackupKey(kv.Key)
if err != nil {
return err
}

var restoreVal []byte
switch kv.GetUserMeta()[0] {
case posting.BitEmptyPosting, posting.BitCompletePosting, posting.BitDeltaPosting:
var err error
backupPl := &pb.BackupPostingList{}
if err := backupPl.Unmarshal(kv.Value); err != nil {
return errors.Wrapf(err, "while reading backup posting list")
}
restoreVal, err = posting.FromBackupPostingList(backupPl).Marshal()
if err != nil {
return errors.Wrapf(err, "while converting backup posting list")
}

case posting.BitSchemaPosting:
restoreVal = kv.Value

default:
return errors.Errorf(
"Unexpected meta %d for key %s", kv.UserMeta[0], hex.Dump(kv.Key))
}

kv.Key = restoreKey
kv.Value = restoreVal
if err := loader.Set(kv); err != nil {
return err
}
}
}

if err := loader.Finish(); err != nil {
return err
}

return nil
}

func fromBackupKey(key []byte) ([]byte, error) {
backupKey := &pb.BackupKey{}
if err := backupKey.Unmarshal(key); err != nil {
return nil, errors.Wrapf(err, "while reading backup key %s", hex.Dump(key))
}
return x.FromBackupKey(backupKey), nil
}
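toBackupKey (in backup.go) and fromBackupKey are intended to be inverses: a key is parsed and serialized as a pb.BackupKey proto during backup, then converted back during restore. A hedged round-trip sketch as a same-package test, assuming imports of testing and bytes, and assuming the x.DataKey helper builds an ordinary data key (the attribute and uid are arbitrary):

func TestBackupKeyRoundTrip(t *testing.T) {
	orig := x.DataKey("name", 42) // an ordinary Dgraph data key (assumed helper)
	backupKey, err := toBackupKey(orig)
	if err != nil {
		t.Fatal(err)
	}
	restored, err := fromBackupKey(backupKey)
	if err != nil {
		t.Fatal(err)
	}
	if !bytes.Equal(orig, restored) {
		t.Fatalf("key round trip mismatch: %x vs %x", orig, restored)
	}
}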
33 changes: 0 additions & 33 deletions ee/backup/run.go
@@ -13,17 +13,11 @@
 package backup
 
 import (
-	"compress/gzip"
 	"context"
 	"fmt"
-	"io"
-	"math"
 	"os"
-	"path/filepath"
 	"time"
 
-	"github.com/dgraph-io/badger"
-	"github.com/dgraph-io/badger/options"
 	"github.com/dgraph-io/dgraph/protos/pb"
 	"github.com/dgraph-io/dgraph/x"
 	"github.com/pkg/errors"
@@ -213,33 +207,6 @@ func runRestoreCmd() error {
 	return nil
 }
 
-// RunRestore calls badger.Load and tries to load data into a new DB.
-func RunRestore(pdir, location, backupId string) (uint64, error) {
-	// Scan location for backup files and load them. Each file represents a node group,
-	// and we create a new p dir for each.
-	return Load(location, backupId, func(r io.Reader, groupId int) error {
-		dir := filepath.Join(pdir, fmt.Sprintf("p%d", groupId))
-		db, err := badger.OpenManaged(badger.DefaultOptions(dir).
-			WithSyncWrites(true).
-			WithTableLoadingMode(options.MemoryMap).
-			WithValueThreshold(1 << 10).
-			WithNumVersionsToKeep(math.MaxInt32))
-		if err != nil {
-			return err
-		}
-		defer db.Close()
-		fmt.Printf("Restoring groupId: %d\n", groupId)
-		if !pathExist(dir) {
-			fmt.Println("Creating new db:", dir)
-		}
-		gzReader, err := gzip.NewReader(r)
-		if err != nil {
-			return nil
-		}
-		return db.Load(gzReader, 16)
-	})
-}
-
func runLsbackupCmd() error {
fmt.Println("Listing backups from:", opt.location)
manifests, err := ListManifests(opt.location)
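For completeness, a hedged usage sketch of the relocated RunRestore entry point, assuming imports of fmt and log. The paths are illustrative, and the empty backupId is passed through to the Load helper, whose selection semantics are outside this diff:

func exampleRestore() {
	// Restores each group's backup into ./restored/p<groupId>. The returned
	// counter is whatever Load reports (the max restored version, per the
	// original code's usage).
	version, err := RunRestore("./restored", "/backups/dgraph", "")
	if err != nil {
		log.Fatalf("restore failed: %v", err)
	}
	fmt.Printf("restored through version %d\n", version)
}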