Skip to content

Commit

Permalink
Add field to backup requests to force a full backup. (dgraph-io#3387)
Browse files Browse the repository at this point in the history
This change adds a new field to the backup requests to force a full
backup. Only the code that forces the full backup is included in this
PR to keep it small.

Manually tested using a minio instance locally. No existing tests will
be broken by this change. Automated tests will be included later since
the existing backup test needs some work before it can verify this
feature.
  • Loading branch information
martinmr authored and dna2github committed Jul 19, 2019
1 parent a058c01 commit 7ec73c8
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 67 deletions.
16 changes: 6 additions & 10 deletions ee/backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,20 @@ import (

"github.com/dgraph-io/badger"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"

"github.com/golang/glog"
)

// ErrBackupNoChanges is returned when the manifest version is equal to the snapshot version.
// This means that no data updates happened since the last backup.
var ErrBackupNoChanges = x.Errorf("No changes since last backup, OK.")

// Request has all the information needed to perform a backup.
type Request struct {
DB *badger.DB // Badger pstore managed by this node.
Backup *pb.BackupRequest
Manifest *Manifest
Version uint64

// Version indicates the beginning timestamp from which the backup should start.
// For a partial backup, the Version is the largest Version from the previous manifest
// files. For a full backup, Version is set to zero so that all data is included.
// TODO(martinmr): rename this field to Since both here and in the manifest.
Version uint64
}

// Process uses the request values to create a stream writer then hand off the data
Expand All @@ -48,9 +47,6 @@ func (r *Request) Process(ctx context.Context) error {

handler, err := r.newHandler()
if err != nil {
if err != ErrBackupNoChanges {
glog.Errorf("Unable to get handler for request: %+v. Error: %v", r.Backup, err)
}
return err
}
glog.V(3).Infof("Backup manifest version: %d", r.Version)
Expand Down
13 changes: 8 additions & 5 deletions ee/backup/file_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,7 @@ func (h *fileHandler) Create(uri *url.URL, req *Request) error {
if err := h.readManifest(lastManifest, &m); err != nil {
return err
}
// No new changes since last check
if m.Version == req.Backup.SnapshotTs {
return ErrBackupNoChanges
}

// Return the version of last backup
req.Version = m.Version
}
Expand All @@ -83,6 +80,12 @@ func (h *fileHandler) Create(uri *url.URL, req *Request) error {
fileName = backupManifest
}

// If a full backup is being forced, force the version to zero to stream all
// the contents from the database.
if req.Backup.ForceFull {
req.Version = 0
}

dir = filepath.Join(uri.Path, fmt.Sprintf(backupPathFmt, req.Backup.UnixTs))
err := os.Mkdir(dir, 0700)
if err != nil && !os.IsExist(err) {
Expand All @@ -106,7 +109,7 @@ func (h *fileHandler) Load(uri *url.URL, fn loadFn) (uint64, error) {
return 0, x.Errorf("The path %q does not exist or it is inaccessible.", uri.Path)
}

// Get a lisst of all the manifest files at the location.
// Get a list of all the manifest files at the location.
suffix := filepath.Join(string(filepath.Separator), backupManifest)
manifests := x.WalkPathFunc(uri.Path, func(path string, isdir bool) bool {
return !isdir && strings.HasSuffix(path, suffix)
Expand Down
11 changes: 7 additions & 4 deletions ee/backup/s3_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,10 +180,7 @@ func (h *s3Handler) Create(uri *url.URL, req *Request) error {
if err := h.readManifest(mc, lastManifest, &m); err != nil {
return err
}
// No new changes since last check
if m.Version >= req.Backup.SnapshotTs {
return ErrBackupNoChanges
}

// Return the version of last backup
req.Version = m.Version
}
Expand All @@ -192,6 +189,12 @@ func (h *s3Handler) Create(uri *url.URL, req *Request) error {
objectName = backupManifest
}

// If a full backup is being forced, force the version to zero to stream all
// the contents from the database.
if req.Backup.ForceFull {
req.Version = 0
}

// The backup object is: folder1...folderN/dgraph.20181106.0113/r110001-g1.backup
object := filepath.Join(h.objectPrefix,
fmt.Sprintf(backupPathFmt, req.Backup.UnixTs),
Expand Down
5 changes: 5 additions & 0 deletions protos/pb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,11 @@ message BackupRequest {
// True if no credentials should be used to access the S3 or minio bucket.
// For example, when using a bucket with a public policy.
bool anonymous = 10;

// If true, previous backups will be ignored and a new full backup will be
// created. If false, the backup will be full or incremental depending on
// the existing backups.
bool force_full = 11;
}

message ExportRequest {
Expand Down
111 changes: 78 additions & 33 deletions protos/pb/pb.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 6 additions & 15 deletions worker/backup_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,25 +33,17 @@ func backupProcess(ctx context.Context, req *pb.BackupRequest) error {
glog.Errorf("Context error during backup: %v\n", err)
return err
}

g := groups()
// sanity, make sure this is our group.
if g.groupId() != req.GroupId {
return x.Errorf("Backup request group mismatch. Mine: %d. Requested: %d\n",
g.groupId(), req.GroupId)
}
// wait for this node to catch-up.

if err := posting.Oracle().WaitForTs(ctx, req.ReadTs); err != nil {
return err
}
// Get snapshot to fill any gaps.
snap, err := g.Node.Snapshot()
if err != nil {
return err
}
glog.V(3).Infof("Backup group %d snapshot: %+v", req.GroupId, snap)
// Attach snapshot readTs to request to compare with any previous version.
req.SnapshotTs = snap.ReadTs
// create backup request and process it.

br := &backup.Request{DB: pstore, Backup: req}
return br.Process(ctx)
}
Expand Down Expand Up @@ -103,6 +95,7 @@ func BackupOverNetwork(ctx context.Context, r *http.Request) error {
secretKey := r.FormValue("secret_key")
sessionToken := r.FormValue("session_token")
anonymous := r.FormValue("anonymous") == "true"
forceFull := r.FormValue("force_full") == "true"

// Check that this node can accept requests.
if err := x.HealthCheck(); err != nil {
Expand All @@ -125,6 +118,8 @@ func BackupOverNetwork(ctx context.Context, r *http.Request) error {
SecretKey: secretKey,
SessionToken: sessionToken,
Anonymous: anonymous,
// TODO(martinmr): Check if this field can be removed.
ForceFull: forceFull,
}
m := backup.Manifest{Groups: groups().KnownGroups()}
glog.Infof("Created backup request: %s. Groups=%v\n", &req, m.Groups)
Expand All @@ -151,10 +146,6 @@ func BackupOverNetwork(ctx context.Context, r *http.Request) error {

for range m.Groups {
if err := <-errCh; err != nil {
// No changes, nothing was done.
if err == backup.ErrBackupNoChanges {
return nil
}
glog.Errorf("Error received during backup: %v", err)
return err
}
Expand Down

0 comments on commit 7ec73c8

Please sign in to comment.