-
Notifications
You must be signed in to change notification settings - Fork 1.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feature: full backups #2710
Feature: full backups #2710
Changes from 14 commits
4c52c27
cd769f5
0de29ea
396c38a
7eb2172
f6b8b10
cd1b88d
c4701a4
b5cfb77
905f1b4
d62e439
8ac55fb
c8d9054
6ea4688
00a1cd0
2a317fb
2101284
255f8f4
90a394a
1963b7c
b38ea24
03ae950
3606eaa
7688e76
a2af931
f8dc111
24c9258
4116b46
1beefbf
7da0cae
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
# Dgraph Enterprise Edition (EE) | ||
|
||
The files stored here correspond to the Dgraph Enterprise Edition features, which are _not_ under the Apache 2 License. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
/* | ||
* Copyright 2018 Dgraph Labs, Inc. All rights reserved. | ||
* | ||
*/ | ||
|
||
package backup | ||
|
||
import ( | ||
"context" | ||
|
||
"github.com/dgraph-io/badger" | ||
"github.com/dgraph-io/dgraph/protos/pb" | ||
"github.com/dgraph-io/dgraph/worker/stream" | ||
"github.com/golang/glog" | ||
) | ||
|
||
// Worker has all the information needed to perform a backup. | ||
type Worker struct { | ||
ReadTs uint64 // Timestamp to read at. | ||
GroupId uint32 // The group ID of this node. | ||
SeqTs string // Sequence data to label backup at the target. | ||
TargetURI string // The intended location as URI. | ||
DB *badger.DB // Badger pstore managed by this node. | ||
} | ||
|
||
// Process uses the worker values to create a stream writer then hand off the data | ||
// retrieval to stream.Orchestrate. The writer will create all the fd's needed to | ||
// collect the data and later move to the target. | ||
// Returns errors on failure, nil on success. | ||
func (w *Worker) Process(ctx context.Context) error { | ||
c, err := newWriter(w) | ||
if err != nil { | ||
return err | ||
} | ||
sl := stream.Lists{Stream: c, DB: w.DB} | ||
sl.ChooseKeyFunc = func(_ *badger.Item) bool { return true } | ||
sl.ItemToKVFunc = func(key []byte, itr *badger.Iterator) (*pb.KV, error) { | ||
item := itr.Item() | ||
val, err := item.ValueCopy(nil) | ||
if err != nil { | ||
return nil, err | ||
} | ||
kv := &pb.KV{Key: item.KeyCopy(nil), Val: val, Version: item.Version()} | ||
return kv, nil | ||
} | ||
|
||
glog.V(3).Infof("Backup started ...") | ||
if err = sl.Orchestrate(ctx, "Backup", w.ReadTs); err != nil { | ||
return err | ||
} | ||
glog.V(3).Infof("Backup saving ...") | ||
if err = c.save(); err != nil { | ||
return err | ||
} | ||
glog.Infof("Backup complete: group %d at %d", w.GroupId, w.ReadTs) | ||
|
||
return nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
/* | ||
* Copyright 2018 Dgraph Labs, Inc. All rights reserved. | ||
* | ||
*/ | ||
|
||
package backup | ||
|
||
import ( | ||
"net/url" | ||
"sync" | ||
|
||
"github.com/dgraph-io/dgraph/x" | ||
) | ||
|
||
// handler interface is implemented by uri scheme handlers. | ||
// | ||
// Session() will read any supported environment variables and authenticate if needed. | ||
// Copy() copies a local file to a new destination, possibly remote. | ||
// Exists() tests if a file exists at destination. | ||
type handler interface { | ||
Copy(string, string) error | ||
Session(string, string) error | ||
} | ||
|
||
// handlers map URI scheme to a handler. | ||
// List of possible handlers: | ||
// file - local file or NFS mounted (default if no scheme detected) | ||
// http - multipart HTTP upload | ||
// gs - Google Cloud Storage | ||
// s3 - Amazon S3 | ||
// as - Azure Storage | ||
var handlers struct { | ||
sync.Mutex | ||
m map[string]handler | ||
} | ||
|
||
// getSchemeHandler takes a URI and picks the parts we need for creating a scheme handler. | ||
// The scheme handler knows how to authenticate itself (using URI params), and how to copy | ||
// itself to the destination target. | ||
// Returns a new file handler on success, error otherwise. | ||
func getSchemeHandler(uri string) (handler, error) { | ||
u, err := url.Parse(uri) | ||
if err != nil { | ||
return nil, err | ||
} | ||
// target might be just a dir like '/tmp/backup', then default to local file handler. | ||
if u.Scheme == "" { | ||
u.Scheme = "file" | ||
} | ||
handlers.Lock() | ||
defer handlers.Unlock() | ||
h, ok := handlers.m[u.Scheme] | ||
if !ok { | ||
return nil, x.Errorf("invalid scheme %q", u.Scheme) | ||
} | ||
if err := h.Session(u.Host, u.Path); err != nil { | ||
return nil, err | ||
} | ||
return h, nil | ||
} | ||
|
||
// addSchemeHandler registers a new scheme handler. If the handler is already registered | ||
// we just ignore the request. | ||
func addSchemeHandler(scheme string, h handler) { | ||
handlers.Lock() | ||
defer handlers.Unlock() | ||
if handlers.m == nil { | ||
handlers.m = make(map[string]handler) | ||
} | ||
if _, ok := handlers.m[scheme]; !ok { | ||
handlers.m[scheme] = h | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
/* | ||
* Copyright 2018 Dgraph Labs, Inc. All rights reserved. | ||
* | ||
*/ | ||
|
||
package backup | ||
|
||
import ( | ||
"io" | ||
"os" | ||
"path/filepath" | ||
"strings" | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. File is not |
||
"github.com/golang/glog" | ||
) | ||
|
||
// fileHandler is used for 'file:' URI scheme. | ||
type fileHandler struct { | ||
path string | ||
} | ||
|
||
// Session authenticates or prepares a handler session. | ||
// Returns error on failure, nil on success. | ||
func (h *fileHandler) Session(_, path string) error { | ||
h.path = path | ||
return os.Chdir(h.path) | ||
} | ||
|
||
// List returns a list of Dgraph backup files at target. | ||
// Returns a list (might be empty) on success, error otherwise. | ||
func (h *fileHandler) List() ([]string, error) { | ||
return filepath.Glob(filepath.Join(h.path, "*"+dgraphBackupSuffix)) | ||
} | ||
|
||
// Copy is called when we are ready to transmit a file to the target. | ||
// Returns error on failure, nil on success. | ||
func (h *fileHandler) Copy(in, out string) error { | ||
if filepath.Base(out) == out { | ||
out = filepath.Join(h.path, out) | ||
} | ||
|
||
if h.Exists(out) { | ||
glog.Errorf("File already exists on target: %q", out) | ||
return os.ErrExist | ||
} | ||
|
||
// if we are in the same volume, just rename. it should work on NFS too. | ||
if strings.HasPrefix(in, h.path) { | ||
glog.V(3).Infof("renaming %q to %q", in, out) | ||
return os.Rename(in, out) | ||
} | ||
|
||
src, err := os.Open(in) | ||
if err != nil { | ||
return err | ||
} | ||
defer src.Close() | ||
|
||
dst, err := os.Create(out) | ||
if err != nil { | ||
return err | ||
} | ||
defer dst.Close() | ||
|
||
if _, err = io.Copy(dst, src); err != nil { | ||
return err | ||
} | ||
|
||
return dst.Sync() | ||
} | ||
|
||
// Exists checks if a path (file or dir) is found at target. | ||
// Returns true if found, false otherwise. | ||
func (h *fileHandler) Exists(path string) bool { | ||
_, err := os.Stat(path) | ||
return os.IsExist(err) | ||
} | ||
|
||
// Register this handler | ||
func init() { | ||
addSchemeHandler("file", &fileHandler{}) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
/* | ||
* Copyright 2018 Dgraph Labs, Inc. All rights reserved. | ||
* | ||
*/ | ||
|
||
package backup | ||
|
||
import ( | ||
"fmt" | ||
"io/ioutil" | ||
"os" | ||
|
||
"github.com/dgraph-io/dgraph/protos/pb" | ||
"github.com/golang/glog" | ||
"github.com/matttproud/golang_protobuf_extensions/pbutil" | ||
) | ||
|
||
const dgraphBackupTempPrefix = "dgraph-backup-*" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
const dgraphBackupSuffix = ".dgraph-backup" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
|
||
type writer struct { | ||
file string | ||
dst handler | ||
tmp *os.File | ||
} | ||
|
||
func (w *writer) save() error { | ||
glog.Infof("Saving: %q", w.file) | ||
if err := w.dst.Copy(w.tmp.Name(), w.file); err != nil { | ||
return err | ||
} | ||
glog.V(3).Infof("copied %q to %q on target ...", w.tmp.Name(), w.file) | ||
// we are done done, cleanup. | ||
return w.cleanup() | ||
} | ||
|
||
func (w *writer) cleanup() error { | ||
defer func() { | ||
if err := os.Remove(w.tmp.Name()); err != nil { | ||
// let the user know there's baggage left behind. they might have to delete by hand. | ||
glog.Errorf("Failed to remove temp file %q: %s", w.tmp.Name(), err) | ||
} | ||
}() | ||
glog.V(3).Info("Backup cleanup ...") | ||
if err := w.tmp.Close(); err != nil { | ||
return err | ||
} | ||
return nil | ||
} | ||
|
||
func newWriter(worker *Worker) (*writer, error) { | ||
var w writer | ||
var err error | ||
|
||
// dst is the final destination for data. | ||
w.dst, err = getSchemeHandler(worker.TargetURI) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
// tmp file is our main working file. | ||
// we will prepare this file and then copy to dst when done. | ||
w.tmp, err = ioutil.TempFile("", dgraphBackupTempPrefix) | ||
if err != nil { | ||
glog.Errorf("Failed to create temp file: %s\n", err) | ||
return nil, err | ||
} | ||
glog.V(3).Infof("temp file: %q", w.tmp.Name()) | ||
|
||
w.file = fmt.Sprintf("%s-g%d-r%d%s", | ||
worker.SeqTs, worker.GroupId, worker.ReadTs, dgraphBackupSuffix) | ||
glog.V(3).Infof("target file name: %q", w.file) | ||
|
||
return &w, err | ||
} | ||
|
||
// Send implements the stream.kvStream interface. | ||
// It writes the received KV into the temp file as a delimited binary chain. | ||
// Returns error if the writing fails, nil on success. | ||
func (w *writer) Send(kvs *pb.KVS) error { | ||
var err error | ||
for _, kv := range kvs.Kv { | ||
_, err = pbutil.WriteDelimited(w.tmp, kv) | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
return nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Error return value of
w.Write
is not checked