-
Notifications
You must be signed in to change notification settings - Fork 1.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feature: full backups #2710
Merged
Merged
Feature: full backups #2710
Changes from all commits
Commits
Show all changes
30 commits
Select commit
Hold shift + click to select a range
4c52c27
saving state
cd769f5
Merge branch 'master' of github.com:dgraph-io/dgraph into feature/roa…
0de29ea
saving changes
396c38a
saving state
7eb2172
added backup dir
f6b8b10
Merge branch 'master' of github.com:/dgraph-io/dgraph into feature/ro…
cd1b88d
renamed dgraphee tree to ee.
c4701a4
trying to get the handler and file writers working.
b5cfb77
added destination parameter. handler support to destination URI schem…
905f1b4
file handler rename on same volume. added more comments and logging.
d62e439
Merge branch 'master' of github.com:/dgraph-io/dgraph into feature/ro…
8ac55fb
changed worker to use stream pkg. updated protos for backup. fixed mi…
c8d9054
logging changes for debugging
6ea4688
added some error checks, tweaked comments.
00a1cd0
moved stream pkg out of worker.
2a317fb
removed unused const. format fixes.
2101284
saving state
255f8f4
Initial pass at simplifying things.
manishrjain 90a394a
cleaned up redundant code.
1963b7c
unused const
b38ea24
missing space
03ae950
added progress monitoring. fixed issues found by CI
3606eaa
Small fixes here and there.
manishrjain 7688e76
Rename handler files.
manishrjain a2af931
Both S3 uploads and file writes are tested to work.
manishrjain f8dc111
renamed writer.cleapup to writer.close
24c9258
Merge branch 'master' of github.com:/dgraph-io/dgraph into feature/ro…
4116b46
regenerated protos
1beefbf
Merge branch 'feature/roadmap-backups' of github.com:/dgraph-io/dgrap…
7da0cae
removed unneeded fallthrough
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
# Dgraph Enterprise Edition (EE) | ||
|
||
The files stored here correspond to the Dgraph Enterprise Edition features, which are _not_ under the Apache 2 License. | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
/* | ||
* Copyright 2018 Dgraph Labs, Inc. All rights reserved. | ||
* | ||
*/ | ||
|
||
package backup | ||
|
||
import ( | ||
"context" | ||
|
||
"github.com/dgraph-io/badger" | ||
"github.com/dgraph-io/dgraph/posting" | ||
"github.com/dgraph-io/dgraph/protos/pb" | ||
"github.com/dgraph-io/dgraph/stream" | ||
"github.com/dgraph-io/dgraph/x" | ||
|
||
"github.com/golang/glog" | ||
) | ||
|
||
// Request has all the information needed to perform a backup. | ||
type Request struct { | ||
DB *badger.DB // Badger pstore managed by this node. | ||
Sizex uint64 // approximate upload size | ||
Backup *pb.BackupRequest | ||
} | ||
|
||
// Process uses the request values to create a stream writer then hand off the data | ||
// retrieval to stream.Orchestrate. The writer will create all the fd's needed to | ||
// collect the data and later move to the target. | ||
// Returns errors on failure, nil on success. | ||
func (r *Request) Process(ctx context.Context) error { | ||
w, err := r.newWriter() | ||
if err != nil { | ||
return err | ||
} | ||
|
||
sl := stream.Lists{Stream: w, DB: r.DB} | ||
sl.ChooseKeyFunc = nil | ||
sl.ItemToKVFunc = func(key []byte, itr *badger.Iterator) (*pb.KV, error) { | ||
item := itr.Item() | ||
pk := x.Parse(key) | ||
if pk.IsSchema() { | ||
val, err := item.ValueCopy(nil) | ||
if err != nil { | ||
return nil, err | ||
} | ||
kv := &pb.KV{ | ||
Key: key, | ||
Val: val, | ||
UserMeta: []byte{item.UserMeta()}, | ||
Version: item.Version(), | ||
} | ||
return kv, nil | ||
} | ||
l, err := posting.ReadPostingList(key, itr) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return l.MarshalToKv() | ||
} | ||
|
||
glog.V(2).Infof("Backup started ...") | ||
if err = sl.Orchestrate(ctx, "Backup:", r.Backup.ReadTs); err != nil { | ||
return err | ||
} | ||
if err = w.flush(); err != nil { | ||
return err | ||
} | ||
glog.Infof("Backup complete: group %d at %d", r.Backup.GroupId, r.Backup.ReadTs) | ||
|
||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,69 @@ | ||
/* | ||
* Copyright 2018 Dgraph Labs, Inc. All rights reserved. | ||
* | ||
*/ | ||
|
||
package backup | ||
|
||
import ( | ||
"fmt" | ||
"net/url" | ||
"os" | ||
"path/filepath" | ||
|
||
"github.com/dgraph-io/dgraph/x" | ||
|
||
"github.com/golang/glog" | ||
) | ||
|
||
// fileHandler is used for 'file:' URI scheme. | ||
type fileHandler struct { | ||
fp *os.File | ||
} | ||
|
||
// Open authenticates or prepares a handler session. | ||
// Returns error on failure, nil on success. | ||
func (h *fileHandler) Open(uri *url.URL, req *Request) error { | ||
// check that this path exists and we can access it. | ||
if !h.exists(uri.Path) { | ||
return x.Errorf("The path %q does not exist or it is inaccessible.", uri.Path) | ||
} | ||
|
||
dir := filepath.Join(uri.Path, fmt.Sprintf("dgraph.%s", req.Backup.UnixTs)) | ||
if err := os.Mkdir(dir, 0700); err != nil { | ||
return err | ||
} | ||
|
||
path := filepath.Join(dir, | ||
fmt.Sprintf("r%d-g%d.backup", req.Backup.ReadTs, req.Backup.GroupId)) | ||
fp, err := os.Create(path) | ||
if err != nil { | ||
return err | ||
} | ||
glog.V(2).Infof("Using file path: %q", path) | ||
h.fp = fp | ||
return nil | ||
} | ||
|
||
func (h *fileHandler) Close() error { | ||
if err := h.fp.Sync(); err != nil { | ||
glog.Errorf("While closing file: %s. Error: %v", h.fp.Name(), err) | ||
x.Ignore(h.fp.Close()) | ||
return err | ||
} | ||
return h.fp.Close() | ||
} | ||
|
||
func (h *fileHandler) Write(b []byte) (int, error) { | ||
return h.fp.Write(b) | ||
} | ||
|
||
// Exists checks if a path (file or dir) is found at target. | ||
// Returns true if found, false otherwise. | ||
func (h *fileHandler) exists(path string) bool { | ||
_, err := os.Stat(path) | ||
if err == nil { | ||
return true | ||
} | ||
return !os.IsNotExist(err) && !os.IsPermission(err) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,134 @@ | ||
/* | ||
* Copyright 2018 Dgraph Labs, Inc. All rights reserved. | ||
* | ||
*/ | ||
|
||
package backup | ||
|
||
import ( | ||
"fmt" | ||
"io" | ||
"net/url" | ||
"os" | ||
"path/filepath" | ||
"strings" | ||
"time" | ||
|
||
"github.com/dgraph-io/dgraph/x" | ||
|
||
humanize "github.com/dustin/go-humanize" | ||
|
||
"github.com/golang/glog" | ||
minio "github.com/minio/minio-go" | ||
) | ||
|
||
const ( | ||
s3DefaultEndpoint = "s3.amazonaws.com" | ||
s3AccelerateHost = "s3-accelerate" | ||
) | ||
|
||
// s3Handler is used for 's3:' URI scheme. | ||
type s3Handler struct { | ||
bucket string | ||
object string | ||
pwriter *io.PipeWriter | ||
preader *io.PipeReader | ||
cerr chan error | ||
} | ||
|
||
// Open creates an AWS session and sends our data stream to an S3 blob. | ||
// URI formats: | ||
// s3://<s3 region endpoint>/bucket/folder1.../folderN?secure=true|false | ||
// s3:///bucket/folder1.../folderN?secure=true|false (use default S3 endpoint) | ||
func (h *s3Handler) Open(uri *url.URL, req *Request) error { | ||
accessKeyID := os.Getenv("AWS_ACCESS_KEY_ID") | ||
secretAccessKey := os.Getenv("AWS_SECRET_ACCESS_KEY") | ||
if accessKeyID == "" || secretAccessKey == "" { | ||
return x.Errorf("Env vars AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY not set.") | ||
} | ||
|
||
glog.V(2).Infof("S3Handler got uri: %+v. Host: %s. Path: %s\n", uri, uri.Host, uri.Path) | ||
// s3:///bucket/folder | ||
if !strings.Contains(uri.Host, ".") { | ||
uri.Host = s3DefaultEndpoint | ||
} | ||
glog.V(2).Infof("Backup using S3 host: %s, path: %s", uri.Host, uri.Path) | ||
|
||
if len(uri.Path) < 1 { | ||
return x.Errorf("The S3 bucket %q is invalid", uri.Path) | ||
} | ||
|
||
// split path into bucket and blob | ||
parts := strings.Split(uri.Path[1:], "/") | ||
h.bucket = parts[0] // bucket | ||
// The location is: /bucket/folder1...folderN/dgraph.20181106.0113/r110001-g1.backup | ||
parts = append(parts, fmt.Sprintf("dgraph.%s", req.Backup.UnixTs)) | ||
parts = append(parts, fmt.Sprintf("r%d.g%d.backup", req.Backup.ReadTs, req.Backup.GroupId)) | ||
h.object = filepath.Join(parts[1:]...) | ||
glog.V(2).Infof("Sending data to S3 blob %q ...", h.object) | ||
|
||
// secure by default | ||
secure := uri.Query().Get("secure") != "false" | ||
|
||
mc, err := minio.New(uri.Host, accessKeyID, secretAccessKey, secure) | ||
if err != nil { | ||
return err | ||
} | ||
// S3 transfer acceleration support. | ||
if strings.Contains(uri.Host, s3AccelerateHost) { | ||
mc.SetS3TransferAccelerate(uri.Host) | ||
} | ||
// mc.TraceOn(os.Stderr) | ||
|
||
found, err := mc.BucketExists(h.bucket) | ||
if err != nil { | ||
return x.Errorf("Error while looking for bucket: %s at host: %s. Error: %v", | ||
h.bucket, uri.Host, err) | ||
} | ||
if !found { | ||
return x.Errorf("S3 bucket %s not found.", h.bucket) | ||
} | ||
|
||
h.cerr = make(chan error, 1) | ||
go func() { | ||
h.cerr <- h.upload(mc) | ||
}() | ||
|
||
glog.Infof("Uploading data, estimated size %s", humanize.Bytes(req.Sizex)) | ||
return nil | ||
} | ||
|
||
// upload will block until it's done or an error occurs. | ||
func (h *s3Handler) upload(mc *minio.Client) error { | ||
start := time.Now() | ||
h.preader, h.pwriter = io.Pipe() | ||
|
||
// We don't need to have a progress object, because we're using a Pipe. A write to Pipe would | ||
// block until it can be fully read. So, the rate of the writes here would be equal to the rate | ||
// of upload. We're already tracking progress of the writes in stream.Lists, so no need to track | ||
// the progress of read. By definition, it must be the same. | ||
n, err := mc.PutObject(h.bucket, h.object, h.preader, -1, minio.PutObjectOptions{}) | ||
glog.V(2).Infof("Backup sent %d bytes. Time elapsed: %s", | ||
n, time.Since(start).Round(time.Second)) | ||
|
||
if err != nil { | ||
// This should cause Write to fail as well. | ||
glog.Errorf("Backup: Closing RW pipe due to error: %v", err) | ||
h.pwriter.Close() | ||
h.preader.Close() | ||
} | ||
return err | ||
} | ||
|
||
func (h *s3Handler) Close() error { | ||
// we are done buffering, send EOF. | ||
if err := h.pwriter.CloseWithError(nil); err != nil && err != io.EOF { | ||
glog.Errorf("Unexpected error while uploading: %v", err) | ||
} | ||
glog.V(2).Infof("Backup waiting for upload to complete.") | ||
return <-h.cerr | ||
} | ||
|
||
func (h *s3Handler) Write(b []byte) (int, error) { | ||
return h.pwriter.Write(b) | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
File is not
goimports
-ed