Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lsbackup command to list backups. #3219

Merged
merged 8 commits into from
Apr 4, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dgraph/cmd/root_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ func init() {
// subcommands already has the default subcommands, we append to EE ones to that.
subcommands = append(subcommands,
&backup.Restore,
&backup.LsBackup,
&acl.CmdAcl,
)
}
8 changes: 8 additions & 0 deletions ee/backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,14 @@ type Manifest struct {
Groups []uint32 `json:"groups"`
}

// ManifestStatus combines a manifest along with other information about it
// that should not be inside the Manifest struct since it should not be
// recorded in manifest files.
type ManifestStatus struct {
Manifest *Manifest
FileName string
}

// GoString implements the GoStringer interface for Manifest.
func (m *Manifest) GoString() string {
return fmt.Sprintf(`Manifest{Version: %d, ReadTs: %d, Groups: %v}`,
Expand Down
6 changes: 3 additions & 3 deletions ee/backup/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func RestoreFull(t *testing.T, c *dgo.Dgraph) {

// restore this backup dir (3 files total)
t.Logf("--- Restoring from: %q", dirs[0])
_, err := runRestore("./data/restore", dirs[0])
_, err := runRestoreInternal("./data/restore", dirs[0])
require.NoError(t, err)

// just check p1 which should have the 'movie' predicate (moved during setup)
Expand Down Expand Up @@ -277,7 +277,7 @@ func RestoreIncr1(t *testing.T, c *dgo.Dgraph) {

// restore this backup dir (3 files total)
t.Logf("--- Restoring from: %q", dirs[1])
_, err := runRestore("./data/restore", dirs[1])
_, err := runRestoreInternal("./data/restore", dirs[1])
require.NoError(t, err)

// just check p1 which should have the 'movie' predicate (moved during setup)
Expand Down Expand Up @@ -335,7 +335,7 @@ func RestoreIncr2(t *testing.T, c *dgo.Dgraph) {

// restore this backup dir (3 files total)
t.Logf("--- Restoring from: %q", dirs[2])
_, err := runRestore("./data/restore", dirs[2])
_, err := runRestoreInternal("./data/restore", dirs[2])
require.NoError(t, err)

// just check p1 which should have the 'movie' predicate (moved during setup)
Expand Down
38 changes: 37 additions & 1 deletion ee/backup/file_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (h *fileHandler) Load(uri *url.URL, fn loadFn) (uint64, error) {
continue
}

// Load the backup for each group in manifest.
// Check the files for each group in the manifest exist.
path := filepath.Dir(manifest)
for _, groupId := range m.Groups {
file := filepath.Join(path, fmt.Sprintf(backupNameFmt, m.ReadTs, groupId))
Expand All @@ -155,6 +155,42 @@ func (h *fileHandler) Load(uri *url.URL, fn loadFn) (uint64, error) {
return version, nil
}

// ListManifests loads the manifests in the locations and returns them.
func (h *fileHandler) ListManifests(uri *url.URL) ([]*ManifestStatus, error) {
if !pathExist(uri.Path) {
return nil, x.Errorf("The path %q does not exist or it is inaccessible.", uri.Path)
}

// Get a list of all the manifest files at the location.
suffix := filepath.Join(string(filepath.Separator), backupManifest)
manifests := x.WalkPathFunc(uri.Path, func(path string, isdir bool) bool {
return !isdir && strings.HasSuffix(path, suffix)
})
if len(manifests) == 0 {
return nil, x.Errorf("No manifests found at path: %s", uri.Path)
}
sort.Strings(manifests)
if glog.V(3) {
fmt.Printf("Found backup manifest(s): %v\n", manifests)
}

// Process each manifest, first check that they are valid and then confirm the
// backup files for each group exist. Each group in manifest must have a backup file.
var listedManifests []*ManifestStatus
for _, manifest := range manifests {
var m Manifest
var ms ManifestStatus

if err := h.readManifest(manifest, &m); err != nil {
return nil, x.Wrapf(err, "While reading %q", manifest)
}
ms.Manifest = &m
ms.FileName = manifest
listedManifests = append(listedManifests, &ms)
}
return listedManifests, nil
}

func (h *fileHandler) Close() error {
if h.fp == nil {
return nil
Expand Down
21 changes: 21 additions & 0 deletions ee/backup/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ type handler interface {
// The loadFn receives the files as they are processed by a handler, to do the actual
// load to DB.
Load(*url.URL, loadFn) (uint64, error)

// ListManifests will scan the provided URI for backup manifests. This operation
// should be read-only.
//
// The URL object is parsed as described in `newHandler`.
ListManifests(*url.URL) ([]*ManifestStatus, error)
}

// getHandler returns a handler for the URI scheme.
Expand Down Expand Up @@ -148,3 +154,18 @@ func Load(l string, fn loadFn) (version uint64, err error) {

return h.Load(uri, fn)
}

// ListManifests scans location l for backup files and returns the list of manifests.
func ListManifests(l string) ([]*ManifestStatus, error) {
uri, err := url.Parse(l)
if err != nil {
return nil, err
}

h := getHandler(uri.Scheme)
if h == nil {
return nil, x.Errorf("Unsupported URI: %v", uri)
}

return h.ListManifests(uri)
}
78 changes: 76 additions & 2 deletions ee/backup/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,18 @@ import (
)

var Restore x.SubCommand
var LsBackup x.SubCommand

var opt struct {
location, pdir, zero string
}

func init() {
initRestore()
initBackupLs()
}

func initRestore() {
Restore.Cmd = &cobra.Command{
Use: "restore",
Short: "Run Dgraph (EE) Restore backup",
Expand Down Expand Up @@ -85,7 +91,7 @@ $ dgraph restore -p . -l /var/backups/dgraph -z localhost:5080
Args: cobra.NoArgs,
Run: func(cmd *cobra.Command, args []string) {
defer x.StartProfile(Restore.Conf).Stop()
if err := run(); err != nil {
if err := runRestoreCmd(); err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
Expand All @@ -102,7 +108,56 @@ $ dgraph restore -p . -l /var/backups/dgraph -z localhost:5080
_ = Restore.Cmd.MarkFlagRequired("location")
}

func run() error {
func initBackupLs() {
LsBackup.Cmd = &cobra.Command{
Use: "lsbackup",
Short: "List info on backups in given location",
Long: `
lsbackup looks at a location where backups are stored and prints information about them.

Backups are originated from HTTP at /admin/backup, then can be restored using CLI restore
command. Restore is intended to be used with new Dgraph clusters in offline state.

The --location flag indicates a source URI with Dgraph backup objects. This URI supports all
the schemes used for backup.

Source URI formats:
[scheme]://[host]/[path]?[args]
[scheme]:///[path]?[args]
/[path]?[args] (only for local or NFS)

Source URI parts:
scheme - service handler, one of: "s3", "minio", "file"
host - remote address. ex: "dgraph.s3.amazonaws.com"
path - directory, bucket or container at target. ex: "/dgraph/backups/"
args - specific arguments that are ok to appear in logs.

Dgraph backup creates a unique backup object for each node group, and restore will create
a posting directory 'p' matching the backup group ID. Such that a backup file
named '.../r32-g2.backup' will be loaded to posting dir 'p2'.

Usage examples:

# Run using location in S3:
$ dgraph lsbackup -l s3://s3.us-west-2.amazonaws.com/srfrog/dgraph
`,
Args: cobra.NoArgs,
Run: func(cmd *cobra.Command, args []string) {
defer x.StartProfile(Restore.Conf).Stop()
if err := runLsbackupCmd(); err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
},
}

flag := LsBackup.Cmd.Flags()
flag.StringVarP(&opt.location, "location", "l", "",
"Sets the source location URI (required).")
_ = LsBackup.Cmd.MarkFlagRequired("location")
}

func runRestoreCmd() error {
var (
start time.Time
zc pb.ZeroClient
Expand Down Expand Up @@ -183,3 +238,22 @@ func runRestore(pdir, location string) (uint64, error) {
return db.Load(r)
})
}

func runLsbackupCmd() error {
fmt.Println("Listing backups from:", opt.location)
manifests, err := ListManifests(opt.location)
if err != nil {
return x.Errorf("Error while listing manifests: %v", err.Error())
}

fmt.Printf("Name\tVersion\tReadTs\tGroups\tValid\n")
for _, manifest := range manifests {
fmt.Printf("%v\t%v\t%v\t%v\n",
manifest.FileName,
manifest.Manifest.Version,
manifest.Manifest.ReadTs,
manifest.Manifest.Groups)
}

return nil
}
43 changes: 43 additions & 0 deletions ee/backup/s3_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,49 @@ func (h *s3Handler) Load(uri *url.URL, fn loadFn) (uint64, error) {
return version, nil
}

// ListManifests loads the manifests in the locations and returns them.
func (h *s3Handler) ListManifests(uri *url.URL) ([]*ManifestStatus, error) {
mc, err := h.setup(uri)
if err != nil {
return nil, err
}

var manifests []string
var listedManifests []*ManifestStatus

doneCh := make(chan struct{})
defer close(doneCh)

suffix := "/" + backupManifest
for object := range mc.ListObjects(h.bucketName, h.objectPrefix, true, doneCh) {
if strings.HasSuffix(object.Key, suffix) {
manifests = append(manifests, object.Key)
}
}
if len(manifests) == 0 {
return nil, x.Errorf("No manifests found at: %s", uri.String())
}
sort.Strings(manifests)
if glog.V(3) {
fmt.Printf("Found backup manifest(s) %s: %v\n", uri.Scheme, manifests)
}

// Process each manifest, first check that they are valid and then confirm the
// backup files for each group exist. Each group in manifest must have a backup file.
for _, manifest := range manifests {
var m Manifest
var ms ManifestStatus

if err := h.readManifest(mc, manifest, &m); err != nil {
return nil, x.Wrapf(err, "While reading %q", manifest)
}
ms.Manifest = &m
ms.FileName = manifest
listedManifests = append(listedManifests, &ms)
}
return listedManifests, nil
}

// upload will block until it's done or an error occurs.
func (h *s3Handler) upload(mc *minio.Client, object string) error {
start := time.Now()
Expand Down