forked from dgraph-io/dgraph
Feature: full backups (dgraph-io#2710)
* Moved orchestrate to its own pkg within worker. Adding backup to worker and alpha.
* Trying to get the handler and file writers working.
* Added destination parameter. Handler support for destination URI scheme. Additional logging.
* File handler rename on same volume. Added more comments and logging.
* Changed worker to use stream pkg. Updated protos for backup. Fixed mislabelled request field.
* Logging changes for debugging.
* Added some error checks, tweaked comments.
* Moved stream pkg out of worker. Removed binary encoding package, using encoding/binary with a size delimiter. Fixed race and added posting list values to backup. Fixed issue with file handler that was breaking Badger. Refactored backup process to be simpler. Added generic Status proto for any service response. Added minio and used it for S3 backup uploads.
* Removed unused const. Format fixes.
* Initial pass at simplifying things.
* Cleaned up redundant code. Renamed s3handler.send to s3handler.upload and removed all buffering. s3handler tests that the bucket exists before working; we can't assume a region. s3handler.Close blocks until the upload is complete.
* Unused const.
* Missing space.
* Added progress monitoring. Fixed issues found by CI.
* Small fixes here and there.
* Rename handler files.
* Both S3 uploads and file writes are tested to work.
* Renamed writer.cleanup to writer.close.
* Regenerated protos.
* Removed unneeded fallthrough.
1 parent 3f072c9, commit edcecbd
Showing 20 changed files with 1,447 additions and 319 deletions.
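The new handlers pick a backup destination by URI scheme ('file:' for local volumes, 's3:' for uploads via minio, per the diffs below). A minimal sketch of that scheme dispatch, assuming a hypothetical newHandler helper; the real function that maps a destination URI to a handler is not part of this diff:

package main

import (
	"fmt"
	"net/url"
)

// newHandler is a hypothetical sketch of scheme-based dispatch; the actual
// dispatch function is not shown in this diff.
func newHandler(dest string) (string, error) {
	uri, err := url.Parse(dest)
	if err != nil {
		return "", err
	}
	switch uri.Scheme {
	case "file":
		return "fileHandler", nil // writes to a local directory (see file handler below)
	case "s3":
		return "s3Handler", nil // streams to an S3 bucket via minio (see s3 handler below)
	default:
		return "", fmt.Errorf("unsupported backup destination scheme: %q", uri.Scheme)
	}
}

func main() {
	for _, dest := range []string{
		"file:///tmp/backups",
		"s3://s3.us-east-1.amazonaws.com/my-bucket/backups",
	} {
		name, err := newHandler(dest)
		fmt.Println(dest, "->", name, err)
	}
}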
@@ -0,0 +1,4 @@
# Dgraph Enterprise Edition (EE)

The files stored here correspond to the Dgraph Enterprise Edition features, which are _not_ under the Apache 2 License.
@@ -0,0 +1,72 @@
/*
 * Copyright 2018 Dgraph Labs, Inc. All rights reserved.
 *
 */

package backup

import (
	"context"

	"github.com/dgraph-io/badger"
	"github.com/dgraph-io/dgraph/posting"
	"github.com/dgraph-io/dgraph/protos/pb"
	"github.com/dgraph-io/dgraph/stream"
	"github.com/dgraph-io/dgraph/x"

	"github.com/golang/glog"
)

// Request has all the information needed to perform a backup.
type Request struct {
	DB     *badger.DB // Badger pstore managed by this node.
	Sizex  uint64     // Approximate upload size.
	Backup *pb.BackupRequest
}

// Process uses the request values to create a stream writer, then hands off data
// retrieval to stream.Orchestrate. The writer creates all the file descriptors
// needed to collect the data and later move it to the target.
// Returns an error on failure, nil on success.
func (r *Request) Process(ctx context.Context) error {
	w, err := r.newWriter()
	if err != nil {
		return err
	}

	sl := stream.Lists{Stream: w, DB: r.DB}
	sl.ChooseKeyFunc = nil
	sl.ItemToKVFunc = func(key []byte, itr *badger.Iterator) (*pb.KV, error) {
		item := itr.Item()
		pk := x.Parse(key)
		if pk.IsSchema() {
			// Schema keys hold plain values; copy them through unchanged.
			val, err := item.ValueCopy(nil)
			if err != nil {
				return nil, err
			}
			kv := &pb.KV{
				Key:      key,
				Val:      val,
				UserMeta: []byte{item.UserMeta()},
				Version:  item.Version(),
			}
			return kv, nil
		}
		// All other keys are posting lists; marshal each one to a KV.
		l, err := posting.ReadPostingList(key, itr)
		if err != nil {
			return nil, err
		}
		return l.MarshalToKv()
	}

	glog.V(2).Infof("Backup started ...")
	if err = sl.Orchestrate(ctx, "Backup:", r.Backup.ReadTs); err != nil {
		return err
	}
	if err = w.flush(); err != nil {
		return err
	}
	glog.Infof("Backup complete: group %d at %d", r.Backup.GroupId, r.Backup.ReadTs)

	return nil
}
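For context, a hedged sketch of how a caller might drive Process; the worker-side invocation and the backup package's import path are assumptions, and the field values are illustrative:

package worker

import (
	"context"

	"github.com/dgraph-io/badger"
	"github.com/dgraph-io/dgraph/ee/backup" // assumed import path
	"github.com/dgraph-io/dgraph/protos/pb"
)

// runBackup is a hypothetical caller; the real invocation in the worker
// package is not part of this diff.
func runBackup(ctx context.Context, db *badger.DB) error {
	req := &backup.Request{
		DB:    db,
		Sizex: 64 << 20, // illustrative upload-size estimate
		Backup: &pb.BackupRequest{
			ReadTs:  110001, // illustrative read timestamp
			GroupId: 1,
			UnixTs:  "20181106.0113", // illustrative; formatted with %s above
		},
	}
	return req.Process(ctx)
}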
@@ -0,0 +1,69 @@
/*
 * Copyright 2018 Dgraph Labs, Inc. All rights reserved.
 *
 */

package backup

import (
	"fmt"
	"net/url"
	"os"
	"path/filepath"

	"github.com/dgraph-io/dgraph/x"

	"github.com/golang/glog"
)

// fileHandler is used for the 'file:' URI scheme.
type fileHandler struct {
	fp *os.File
}

// Open authenticates or prepares a handler session.
// Returns an error on failure, nil on success.
func (h *fileHandler) Open(uri *url.URL, req *Request) error {
	// Check that this path exists and we can access it.
	if !h.exists(uri.Path) {
		return x.Errorf("The path %q does not exist or it is inaccessible.", uri.Path)
	}

	dir := filepath.Join(uri.Path, fmt.Sprintf("dgraph.%s", req.Backup.UnixTs))
	if err := os.Mkdir(dir, 0700); err != nil {
		return err
	}

	path := filepath.Join(dir,
		fmt.Sprintf("r%d-g%d.backup", req.Backup.ReadTs, req.Backup.GroupId))
	fp, err := os.Create(path)
	if err != nil {
		return err
	}
	glog.V(2).Infof("Using file path: %q", path)
	h.fp = fp
	return nil
}

// Close syncs and closes the file, reporting any sync error first.
func (h *fileHandler) Close() error {
	if err := h.fp.Sync(); err != nil {
		glog.Errorf("While closing file: %s. Error: %v", h.fp.Name(), err)
		x.Ignore(h.fp.Close())
		return err
	}
	return h.fp.Close()
}

func (h *fileHandler) Write(b []byte) (int, error) {
	return h.fp.Write(b)
}

// exists checks if a path (file or dir) is found at target.
// Returns true if found, false otherwise.
func (h *fileHandler) exists(path string) bool {
	_, err := os.Stat(path)
	if err == nil {
		return true
	}
	return !os.IsNotExist(err) && !os.IsPermission(err)
}
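A minimal usage sketch of the handler contract above (Open, Write, Close), written as if inside the same package since fileHandler is unexported; the destination and payload are placeholders. On success the data lands in <dest>/dgraph.<UnixTs>/r<ReadTs>-g<GroupId>.backup:

// writeLocalBackup is a hypothetical helper; the real caller is the backup
// writer, which is not shown in this diff.
func writeLocalBackup(req *Request, payload []byte) error {
	uri, err := url.Parse("file:///tmp/backups") // placeholder destination
	if err != nil {
		return err
	}
	h := &fileHandler{}
	if err := h.Open(uri, req); err != nil {
		return err
	}
	if _, err := h.Write(payload); err != nil {
		_ = h.Close() // best-effort cleanup after a failed write
		return err
	}
	return h.Close()
}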
@@ -0,0 +1,134 @@
/*
 * Copyright 2018 Dgraph Labs, Inc. All rights reserved.
 *
 */

package backup

import (
	"fmt"
	"io"
	"net/url"
	"os"
	"path/filepath"
	"strings"
	"time"

	"github.com/dgraph-io/dgraph/x"

	humanize "github.com/dustin/go-humanize"

	"github.com/golang/glog"
	minio "github.com/minio/minio-go"
)

const (
	s3DefaultEndpoint = "s3.amazonaws.com"
	s3AccelerateHost  = "s3-accelerate"
)

// s3Handler is used for the 's3:' URI scheme.
type s3Handler struct {
	bucket  string
	object  string
	pwriter *io.PipeWriter
	preader *io.PipeReader
	cerr    chan error
}

// Open creates an AWS session and sends our data stream to an S3 blob.
// URI formats:
//   s3://<s3 region endpoint>/bucket/folder1.../folderN?secure=true|false
//   s3:///bucket/folder1.../folderN?secure=true|false (use default S3 endpoint)
func (h *s3Handler) Open(uri *url.URL, req *Request) error {
	accessKeyID := os.Getenv("AWS_ACCESS_KEY_ID")
	secretAccessKey := os.Getenv("AWS_SECRET_ACCESS_KEY")
	if accessKeyID == "" || secretAccessKey == "" {
		return x.Errorf("Env vars AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY not set.")
	}

	glog.V(2).Infof("S3Handler got uri: %+v. Host: %s. Path: %s\n", uri, uri.Host, uri.Path)
	// s3:///bucket/folder
	if !strings.Contains(uri.Host, ".") {
		uri.Host = s3DefaultEndpoint
	}
	glog.V(2).Infof("Backup using S3 host: %s, path: %s", uri.Host, uri.Path)

	if len(uri.Path) < 1 {
		return x.Errorf("The S3 bucket %q is invalid", uri.Path)
	}

	// Split the path into bucket and blob.
	parts := strings.Split(uri.Path[1:], "/")
	h.bucket = parts[0] // bucket
	// The location is: /bucket/folder1...folderN/dgraph.20181106.0113/r110001.g1.backup
	parts = append(parts, fmt.Sprintf("dgraph.%s", req.Backup.UnixTs))
	parts = append(parts, fmt.Sprintf("r%d.g%d.backup", req.Backup.ReadTs, req.Backup.GroupId))
	h.object = filepath.Join(parts[1:]...)
	glog.V(2).Infof("Sending data to S3 blob %q ...", h.object)

	// Secure by default.
	secure := uri.Query().Get("secure") != "false"

	mc, err := minio.New(uri.Host, accessKeyID, secretAccessKey, secure)
	if err != nil {
		return err
	}
	// S3 transfer acceleration support.
	if strings.Contains(uri.Host, s3AccelerateHost) {
		mc.SetS3TransferAccelerate(uri.Host)
	}
	// mc.TraceOn(os.Stderr)

	found, err := mc.BucketExists(h.bucket)
	if err != nil {
		return x.Errorf("Error while looking for bucket: %s at host: %s. Error: %v",
			h.bucket, uri.Host, err)
	}
	if !found {
		return x.Errorf("S3 bucket %s not found.", h.bucket)
	}

	h.cerr = make(chan error, 1)
	go func() {
		h.cerr <- h.upload(mc)
	}()

	glog.Infof("Uploading data, estimated size %s", humanize.Bytes(req.Sizex))
	return nil
}

// upload will block until it's done or an error occurs.
func (h *s3Handler) upload(mc *minio.Client) error {
	start := time.Now()
	h.preader, h.pwriter = io.Pipe()

	// We don't need a progress object because we're using a pipe. A write to the
	// pipe blocks until it has been fully read, so the rate of the writes here
	// equals the rate of the upload. We already track write progress in
	// stream.Lists, so there is no need to track read progress; by definition,
	// it must be the same.
	n, err := mc.PutObject(h.bucket, h.object, h.preader, -1, minio.PutObjectOptions{})
	glog.V(2).Infof("Backup sent %d bytes. Time elapsed: %s",
		n, time.Since(start).Round(time.Second))

	if err != nil {
		// This should cause Write to fail as well.
		glog.Errorf("Backup: Closing RW pipe due to error: %v", err)
		h.pwriter.Close()
		h.preader.Close()
	}
	return err
}

// Close sends EOF to the uploader and blocks until the upload completes.
func (h *s3Handler) Close() error {
	// We are done buffering, send EOF.
	if err := h.pwriter.CloseWithError(nil); err != nil && err != io.EOF {
		glog.Errorf("Unexpected error while uploading: %v", err)
	}
	glog.V(2).Infof("Backup waiting for upload to complete.")
	return <-h.cerr
}

func (h *s3Handler) Write(b []byte) (int, error) {
	return h.pwriter.Write(b)
}
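The streaming pattern above, an io.Pipe feeding minio's PutObject with size -1, works standalone as well. A self-contained sketch against the same minio-go API used in the diff; the endpoint, bucket, and object names are placeholders:

package main

import (
	"io"
	"log"
	"os"

	minio "github.com/minio/minio-go"
)

func main() {
	mc, err := minio.New("s3.amazonaws.com",
		os.Getenv("AWS_ACCESS_KEY_ID"), os.Getenv("AWS_SECRET_ACCESS_KEY"), true)
	if err != nil {
		log.Fatal(err)
	}

	pr, pw := io.Pipe()
	done := make(chan error, 1)
	go func() {
		// Size -1 tells minio to stream the object in parts without knowing
		// the total length up front, as the handler above does.
		_, err := mc.PutObject("my-bucket", "sketch.backup", pr, -1,
			minio.PutObjectOptions{})
		done <- err
	}()

	// Each Write blocks until the uploader consumes it, so write progress
	// equals upload progress.
	if _, err := pw.Write([]byte("example payload")); err != nil {
		log.Fatal(err)
	}
	pw.Close() // send EOF so PutObject can finish
	if err := <-done; err != nil {
		log.Fatal(err)
	}
}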