Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: full backups #2710

Merged
merged 30 commits into from
Nov 8, 2018
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
4c52c27
saving state
Oct 15, 2018
cd769f5
Merge branch 'master' of github.com:dgraph-io/dgraph into feature/roa…
Oct 16, 2018
0de29ea
saving changes
Oct 16, 2018
396c38a
saving state
Oct 17, 2018
7eb2172
added backup dir
Oct 17, 2018
f6b8b10
Merge branch 'master' of github.com:/dgraph-io/dgraph into feature/ro…
Oct 24, 2018
cd1b88d
renamed dgraphee tree to ee.
Oct 25, 2018
c4701a4
trying to get the handler and file writers working.
Oct 30, 2018
b5cfb77
added destination parameter. handler support to destination URI schem…
Oct 30, 2018
905f1b4
file handler rename on same volume. added more comments and logging.
Oct 30, 2018
d62e439
Merge branch 'master' of github.com:/dgraph-io/dgraph into feature/ro…
Oct 30, 2018
8ac55fb
changed worker to use stream pkg. updated protos for backup. fixed mi…
Oct 31, 2018
c8d9054
logging changes for debugging
Oct 31, 2018
6ea4688
added some error checks, tweaked comments.
Oct 31, 2018
00a1cd0
moved stream pkg out of worker.
Nov 1, 2018
2a317fb
removed unused const. format fixes.
Nov 1, 2018
2101284
saving state
Nov 3, 2018
255f8f4
Initial pass at simplifying things.
manishrjain Nov 3, 2018
90a394a
cleaned up redundant code.
Nov 6, 2018
1963b7c
unused const
Nov 6, 2018
b38ea24
missing space
Nov 6, 2018
03ae950
added progress monitoring. fixed issues found by CI
Nov 6, 2018
3606eaa
Small fixes here and there.
manishrjain Nov 7, 2018
7688e76
Rename handler files.
manishrjain Nov 7, 2018
a2af931
Both S3 uploads and file writes are tested to work.
manishrjain Nov 8, 2018
f8dc111
renamed writer.cleanup to writer.close
Nov 8, 2018
24c9258
Merge branch 'master' of github.com:/dgraph-io/dgraph into feature/ro…
Nov 8, 2018
4116b46
regenerated protos
Nov 8, 2018
1beefbf
Merge branch 'feature/roadmap-backups' of github.com:/dgraph-io/dgrap…
Nov 8, 2018
7da0cae
removed unneeded fallthrough
Nov 8, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 24 additions & 6 deletions dgraph/cmd/alpha/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ import (
)

// handlerInit does some standard checks. Returns false if something is wrong.
func handlerInit(w http.ResponseWriter, r *http.Request) bool {
if r.Method != http.MethodGet {
func handlerInit(w http.ResponseWriter, r *http.Request, method string) bool {
if r.Method != method {
x.SetStatus(w, x.ErrorInvalidMethod, "Invalid method")
return false
}
Expand All @@ -47,7 +47,7 @@ func handlerInit(w http.ResponseWriter, r *http.Request) bool {
}

func shutDownHandler(w http.ResponseWriter, r *http.Request) {
if !handlerInit(w, r) {
if !handlerInit(w, r, http.MethodGet) {
return
}

Expand All @@ -57,19 +57,37 @@ func shutDownHandler(w http.ResponseWriter, r *http.Request) {
}

func exportHandler(w http.ResponseWriter, r *http.Request) {
if !handlerInit(w, r) {
if !handlerInit(w, r, http.MethodGet) {
return
}
ctx := context.Background()
// Export logic can be moved to dgraphzero.
if err := worker.ExportOverNetwork(ctx); err != nil {
if err := worker.ExportOverNetwork(context.Background()); err != nil {
x.SetStatus(w, err.Error(), "Export failed.")
return
}
w.Header().Set("Content-Type", "application/json")
w.Write([]byte(`{"code": "Success", "message": "Export completed."}`))
}

func backupHandler(w http.ResponseWriter, r *http.Request) {
if !handlerInit(w, r, http.MethodPost) {
return
}
target := r.FormValue("destination")
if target == "" {
err := x.Errorf("You must specify a 'destination' value")
x.SetStatus(w, err.Error(), "Backup failed.")
return
}
if err := worker.BackupOverNetwork(context.Background(), target); err != nil {
x.SetStatus(w, err.Error(), "Backup failed.")
return
}
w.Header().Set("Content-Type", "application/json")
w.Write([]byte(`{"code": "Success", "message": "Backup completed."}`))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error return value of w.Write is not checked


}

func memoryLimitHandler(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case http.MethodGet:
Expand Down
1 change: 1 addition & 0 deletions dgraph/cmd/alpha/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ func setupServer() {
http.HandleFunc("/share", shareHandler)
http.HandleFunc("/debug/store", storeStatsHandler)
http.HandleFunc("/admin/shutdown", shutDownHandler)
http.HandleFunc("/admin/backup", backupHandler)
http.HandleFunc("/admin/export", exportHandler)
http.HandleFunc("/admin/config/lru_mb", memoryLimitHandler)

Expand Down
4 changes: 4 additions & 0 deletions ee/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Dgraph Enterprise Edition (EE)

The files stored here correspond to the Dgraph Enterprise Edition features, which are _not_ under the Apache 2 License.

72 changes: 72 additions & 0 deletions ee/backup/backup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright 2018 Dgraph Labs, Inc. All rights reserved.
*
*/

package backup

import (
"context"

"github.com/dgraph-io/badger"
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/stream"
"github.com/dgraph-io/dgraph/x"

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

File is not goimports-ed

"github.com/golang/glog"
)

// Request has all the information needed to perform a backup.
type Request struct {
	DB     *badger.DB // Badger pstore managed by this node.
	Sizex  uint64     // approximate upload size
	Backup *pb.BackupRequest // backup parameters; ReadTs, GroupId and UnixTs are read here.
}

// Process uses the request values to create a stream writer then hand off the data
// retrieval to stream.Orchestrate. The writer will create all the fd's needed to
// collect the data and later move to the target.
// Returns errors on failure, nil on success.
func (r *Request) Process(ctx context.Context) error {
	w, err := r.newWriter()
	if err != nil {
		return err
	}

	// Stream every key out of the DB into the writer. ChooseKeyFunc is left
	// nil, so the stream package's default key selection applies.
	sl := stream.Lists{Stream: w, DB: r.DB}
	sl.ChooseKeyFunc = nil
	sl.ItemToKVFunc = func(key []byte, itr *badger.Iterator) (*pb.KV, error) {
		item := itr.Item()
		pk := x.Parse(key)
		if pk.IsSchema() {
			// Schema entries are plain values: copy them out verbatim along
			// with their user meta and version.
			val, err := item.ValueCopy(nil)
			if err != nil {
				return nil, err
			}
			kv := &pb.KV{
				Key:      key,
				Val:      val,
				UserMeta: []byte{item.UserMeta()},
				Version:  item.Version(),
			}
			return kv, nil
		}
		// Everything else is a posting list; marshal the whole list to a KV.
		l, err := posting.ReadPostingList(key, itr)
		if err != nil {
			return nil, err
		}
		return l.MarshalToKv()
	}

	glog.V(2).Infof("Backup started ...")
	if err = sl.Orchestrate(ctx, "Backup", r.Backup.ReadTs); err != nil {
		// NOTE(review): w.cleanup() is not called on this path, so files the
		// writer opened may be leaked on failure — confirm whether cleanup
		// should also run here.
		return err
	}
	if err = w.cleanup(); err != nil {
		return err
	}
	glog.Infof("Backup complete: group %d at %d", r.Backup.GroupId, r.Backup.ReadTs)

	return nil
}
67 changes: 67 additions & 0 deletions ee/backup/handler_file.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright 2018 Dgraph Labs, Inc. All rights reserved.
*
*/

package backup

import (
"fmt"
"net/url"
"os"
"path/filepath"

"github.com/dgraph-io/dgraph/x"

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

File is not goimports-ed

"github.com/golang/glog"
)

// fileHandler is used for 'file:' URI scheme.
type fileHandler struct {
	fp *os.File // destination backup file, created by Open.
}

// Open authenticates or prepares a handler session.
// Returns error on failure, nil on success.
func (h *fileHandler) Open(uri *url.URL, req *Request) error {
	// Refuse early if the target path is missing or unreachable.
	if !h.exists(uri.Path) {
		return x.Errorf("The path %q does not exist or it is inaccessible.", uri.Path)
	}

	// Each backup run gets its own timestamped directory under the target.
	backupDir := filepath.Join(uri.Path, fmt.Sprintf("dgraph.%s", req.Backup.UnixTs))
	if err := os.Mkdir(backupDir, 0700); err != nil {
		return err
	}

	// One file per (read timestamp, group) pair.
	name := fmt.Sprintf("r%d-g%d.backup", req.Backup.ReadTs, req.Backup.GroupId)
	target := filepath.Join(backupDir, name)
	f, err := os.Create(target)
	if err != nil {
		return err
	}
	glog.V(2).Infof("Using file path: %q", target)
	h.fp = f
	return nil
}

// Close flushes pending writes to stable storage and closes the backup file.
func (h *fileHandler) Close() error {
	if err := h.fp.Sync(); err != nil {
		// Still close the fd so it is not leaked when Sync fails; the Sync
		// error is the one worth reporting.
		h.fp.Close()
		return err
	}
	return h.fp.Close()
}

// Write appends b to the open backup file, implementing io.Writer.
func (h *fileHandler) Write(b []byte) (int, error) {
	return h.fp.Write(b)
}

// exists checks if a path (file or dir) is found at target.
// Returns true if found, false otherwise.
func (h *fileHandler) exists(path string) bool {
	// A successful Stat means the path is there. Otherwise, only "does not
	// exist" and "permission denied" count as definitely absent; any other
	// Stat failure is treated as present.
	_, err := os.Stat(path)
	return err == nil || !(os.IsNotExist(err) || os.IsPermission(err))
}
148 changes: 148 additions & 0 deletions ee/backup/handler_s3.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* Copyright 2018 Dgraph Labs, Inc. All rights reserved.
*
*/

package backup

import (
"fmt"
"io"
"net/url"
"os"
"path/filepath"
"strings"
"sync/atomic"
"time"

"github.com/dgraph-io/dgraph/x"
humanize "github.com/dustin/go-humanize"

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

File is not goimports-ed

"github.com/golang/glog"
minio "github.com/minio/minio-go"
)

const (
	// s3DefaultEndpoint is used when the URI does not name an explicit S3 host.
	s3DefaultEndpoint = "s3.amazonaws.com"
	// s3AccelerateHost identifies endpoints that support S3 transfer
	// acceleration (see the check in Open).
	s3AccelerateHost = "s3-accelerate"
	// s3MinioChunkSize is 64MiB, minimum upload size for single file.
	s3MinioChunkSize = 64 << 20
)

// s3Handler is used for 's3:' URI scheme.
type s3Handler struct {
	bucket string         // bucket name, taken from the first URI path element.
	object string         // object key under the bucket for this backup file.
	pw     *io.PipeWriter // write side of the pipe; Write sends data here.
	pr     *io.PipeReader // read side; consumed by PutObject in upload().
	cerr   chan error     // result of the background upload (buffered, size 1).
}

// Open creates an AWS session and sends our data stream to an S3 blob.
// URI formats:
//   s3://<s3 region endpoint>/bucket/folder1.../folderN?secure=true|false
//   s3:///bucket/folder1.../folderN?secure=true|false (use default S3 endpoint)
// Credentials come from the AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY env vars.
func (h *s3Handler) Open(uri *url.URL, req *Request) error {
	accessKeyID := os.Getenv("AWS_ACCESS_KEY_ID")
	secretAccessKey := os.Getenv("AWS_SECRET_ACCESS_KEY")
	if accessKeyID == "" || secretAccessKey == "" {
		return x.Errorf("Env vars AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY not set.")
	}

	glog.V(2).Infof("S3Handler got uri: %+v\n", uri)
	// s3:///bucket/folder — a host without dots is really the bucket, so move
	// it into the path and fall back to the default endpoint.
	// NOTE(review): this swap discards the original uri.Path (and its leading
	// '/'), so the uri.Path[1:] slice below would drop the bucket's first
	// character — confirm the s3://bucket/... form actually works end to end.
	if !strings.Contains(uri.Host, ".") {
		uri.Path, uri.Host = uri.Host, s3DefaultEndpoint
	}
	glog.V(2).Infof("Backup using S3 host: %s, path: %s", uri.Host, uri.Path)

	if len(uri.Path) < 1 {
		return x.Errorf("The S3 bucket %q is invalid", uri.Path)
	}

	// split path into bucket and blob
	parts := strings.Split(uri.Path[1:], "/")
	h.bucket = parts[0] // bucket
	// The location is: /bucket/folder1...folderN/dgraph.20181106.0113/r110001-g1.backup
	parts = append(parts, fmt.Sprintf("dgraph.%s", req.Backup.UnixTs))
	parts = append(parts, fmt.Sprintf("r%d-g%d.backup", req.Backup.ReadTs, req.Backup.GroupId))
	h.object = filepath.Join(parts[1:]...)
	glog.V(3).Infof("Sending data to S3 blob %q ...", h.object)

	// secure by default; only an explicit "secure=false" disables TLS.
	secure := uri.Query().Get("secure") != "false"

	mc, err := minio.New(uri.Host, accessKeyID, secretAccessKey, secure)
	if err != nil {
		return err
	}
	// S3 transfer acceleration support. Use the named constant instead of
	// repeating the literal (the const was otherwise flagged as unused).
	if strings.Contains(uri.Host, s3AccelerateHost) {
		mc.SetS3TransferAccelerate(uri.Host)
	}
	// mc.TraceOn(os.Stderr)

	found, err := mc.BucketExists(h.bucket)
	if err != nil {
		return err
	}
	if !found {
		return x.Errorf("S3 bucket %q not found. Use host with specific region.",
			h.bucket)
	}

	// NOTE(review): upload() creates the pipe on its own goroutine; a Write
	// racing ahead of that assignment would hit a nil h.pw — confirm writes
	// only begin after Open returns and the goroutine has run.
	h.cerr = make(chan error, 1)
	go h.upload(mc)

	glog.Infof("Uploading data, estimated size %s", humanize.Bytes(req.Sizex))

	return nil
}

// progress allows us to monitor the progress of an upload.
// TODO: I used this during testing, maybe keep it turned on for -v 5 ?
type progress struct{ n uint64 }

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

type progress is unused


func (p *progress) Read(b []byte) (int, error) {
n := atomic.AddUint64(&p.n, uint64(len(b)))
if n%s3MinioChunkSize == 0 { // every 64MiB
glog.V(5).Infof("--- progress: %d", n)
}
return int(n), nil
}

// upload will block until it's done or an error occurs.
func (h *s3Handler) upload(mc *minio.Client) {
	start := time.Now()
	// NOTE(review): the pipe is created here, on the upload goroutine, while
	// Write uses h.pw from the caller's goroutine — a write racing ahead of
	// this assignment would see a nil pipe. Confirm writes start only after
	// this goroutine has run.
	h.pr, h.pw = io.Pipe()
	// Size -1 makes PutObject stream from the pipe until EOF, which Close
	// signals by closing the write end.
	n, err := mc.PutObject(h.bucket, h.object, h.pr, -1, minio.PutObjectOptions{})
	if err != nil {
		glog.Errorf("Failure while uploading backup: %s", err)
		h.pw.Close()
		h.pr.Close()
	} else {
		glog.V(3).Infof("--- sent %d bytes, time elapsed %s", n, time.Since(start))
	}
	// Hand the result to Close, which blocks on this (buffered) channel.
	h.cerr <- err
}

// Close signals EOF to the upload goroutine and blocks until the upload
// finishes, returning the upload's error (nil on success).
func (h *s3Handler) Close() error {
	defer close(h.cerr)

	// we are done buffering, send EOF.
	if err := h.pw.CloseWithError(nil); err != nil && err != io.EOF {
		glog.Errorf("Unexpected error while uploading: %s", err)
	}

	glog.V(3).Infof("--- waiting for upload to complete ...")
	// A single-case select is just a receive (staticcheck S1000); wait for
	// upload() to deliver its result and propagate it.
	return <-h.cerr
}

// Write sends b through the pipe to the upload goroutine, implementing
// io.Writer.
// NOTE(review): h.pw is assigned by upload() on another goroutine; calling
// Write before that runs would dereference a nil pipe — confirm ordering.
func (h *s3Handler) Write(b []byte) (int, error) {
	return h.pw.Write(b)
}
Loading