From 369a5c11d5ce0d8e8ac4e29f085baf5f016c4993 Mon Sep 17 00:00:00 2001 From: Martin Martinez Rivera Date: Mon, 28 Sep 2020 16:09:50 -0700 Subject: [PATCH] feat(Dgraph): add utility to export backup data. (#6550) This utility allows taking a single backup (full or incremental) and export the data and the schema inside it to RDF. Encrypted backups are supported. Fixes DGRAPH-2465 --- dgraph/cmd/root_ee.go | 1 + ee/backup/run.go | 49 +++++++++++++- worker/export.go | 18 +++-- worker/file_handler.go | 147 +++++++++++++++++++++++++++++++++++++++++ 4 files changed, 208 insertions(+), 7 deletions(-) diff --git a/dgraph/cmd/root_ee.go b/dgraph/cmd/root_ee.go index ba479540716..b1bdc727ed2 100644 --- a/dgraph/cmd/root_ee.go +++ b/dgraph/cmd/root_ee.go @@ -22,6 +22,7 @@ func init() { subcommands = append(subcommands, &backup.Restore, &backup.LsBackup, + &backup.ExportBackup, &acl.CmdAcl, ) } diff --git a/ee/backup/run.go b/ee/backup/run.go index 7e045bc4f28..2a3400a11e0 100644 --- a/ee/backup/run.go +++ b/ee/backup/run.go @@ -33,15 +33,23 @@ var Restore x.SubCommand // LsBackup is the sub-command used to list the backups in a folder. 
var LsBackup x.SubCommand +var ExportBackup x.SubCommand + var opt struct { - backupId, location, pdir, zero string - key x.SensitiveByteSlice - forceZero bool + backupId string + location string + pdir string + zero string + key x.SensitiveByteSlice + forceZero bool + destination string + format string } func init() { initRestore() initBackupLs() + initExportBackup() } func initRestore() { @@ -247,3 +255,38 @@ func runLsbackupCmd() error { return nil } + +func initExportBackup() { + ExportBackup.Cmd = &cobra.Command{ + Use: "export_backup", + Short: "Export data inside single full or incremental backup.", + Long: ``, + Args: cobra.NoArgs, + Run: func(cmd *cobra.Command, args []string) { + defer x.StartProfile(ExportBackup.Conf).Stop() + if err := runExportBackup(); err != nil { + fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } + }, + } + + flag := ExportBackup.Cmd.Flags() + flag.StringVarP(&opt.location, "location", "l", "", + "Sets the location of the backup. Only file URIs are supported for now.") + flag.StringVarP(&opt.destination, "destination", "d", "", + "The folder to which export the backups.") + flag.StringVarP(&opt.format, "format", "f", "rdf", + "The format of the export output. Accepts a value of either rdf or json") + enc.RegisterFlags(flag) +} + +func runExportBackup() error { + var err error + if opt.key, err = enc.ReadKey(ExportBackup.Conf); err != nil { + return err + } + + exporter := worker.BackupExporter{} + return exporter.ExportBackup(opt.location, opt.destination, opt.format, opt.key) +} diff --git a/worker/export.go b/worker/export.go index 3c762b33214..b1112cdce85 100644 --- a/worker/export.go +++ b/worker/export.go @@ -544,8 +544,17 @@ func export(ctx context.Context, in *pb.ExportRequest) (ExportedFiles, error) { } glog.Infof("Running export for group %d at timestamp %d.", in.GroupId, in.ReadTs) + return exportInternal(ctx, in, pstore, false) +} + +// exportInternal contains the core logic to export a Dgraph database. 
If skipZero is set to +// true, the parts of this method that require to talk to zero will be skipped. This is useful +// when exporting a p directory directly from disk without a running cluster. +func exportInternal(ctx context.Context, in *pb.ExportRequest, db *badger.DB, + skipZero bool) (ExportedFiles, error) { uts := time.Unix(in.UnixTs, 0) - exportStorage, err := newExportStorage(in, fmt.Sprintf("dgraph.r%d.u%s", in.ReadTs, uts.UTC().Format("0102.1504"))) + exportStorage, err := newExportStorage(in, + fmt.Sprintf("dgraph.r%d.u%s", in.ReadTs, uts.UTC().Format("0102.1504"))) if err != nil { return nil, err } @@ -562,12 +571,13 @@ func export(ctx context.Context, in *pb.ExportRequest) (ExportedFiles, error) { return nil, err } - gqlSchemaWriter, err := exportStorage.openFile(fmt.Sprintf("g%02d%s", in.GroupId, ".gql_schema.gz")) + gqlSchemaWriter, err := exportStorage.openFile( + fmt.Sprintf("g%02d%s", in.GroupId, ".gql_schema.gz")) if err != nil { return nil, err } - stream := pstore.NewStreamAt(in.ReadTs) + stream := db.NewStreamAt(in.ReadTs) stream.LogPrefix = "Export" stream.ChooseKey = func(item *badger.Item) bool { // Skip exporting delete data including Schema and Types. 
@@ -594,7 +604,7 @@ func export(ctx context.Context, in *pb.ExportRequest) (ExportedFiles, error) { return false } - if !pk.IsType() { + if !pk.IsType() && !skipZero { if servesTablet, err := groups().ServesTablet(pk.Attr); err != nil || !servesTablet { return false } diff --git a/worker/file_handler.go b/worker/file_handler.go index d008265bb0a..29a3c500263 100644 --- a/worker/file_handler.go +++ b/worker/file_handler.go @@ -13,15 +13,23 @@ package worker import ( + "compress/gzip" + "context" "encoding/json" "fmt" + "io" "io/ioutil" + "math" "net/url" "os" "path/filepath" "sort" "strings" + "time" + "github.com/dgraph-io/badger/v2" + "github.com/dgraph-io/badger/v2/options" + "github.com/dgraph-io/dgraph/ee/enc" "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/x" @@ -34,6 +42,12 @@ type fileHandler struct { fp *os.File } +// BackupExporter is an alias of fileHandler so that this struct can be used +// by the export_backup command. +type BackupExporter struct { + fileHandler +} + // readManifest reads a manifest file at path using the handler. // Returns nil on success, otherwise an error. func (h *fileHandler) readManifest(path string, m *Manifest) error { @@ -240,3 +254,136 @@ func pathExist(path string) bool { } return !os.IsNotExist(err) && !os.IsPermission(err) } + +func (h *fileHandler) ExportBackup(backupDir, exportDir, format string, + key x.SensitiveByteSlice) error { + if format != "json" && format != "rdf" { + return errors.Errorf("invalid format %s", format) + } + + // Create exportDir and temporary folder to store the restored backup. 
+ var err error + exportDir, err = filepath.Abs(exportDir) + if err != nil { + return errors.Wrapf(err, "cannot convert path %s to absolute path", exportDir) + } + if err := os.MkdirAll(exportDir, 0755); err != nil { + return errors.Wrapf(err, "cannot create dir %s", exportDir) + } + tmpDir, err := ioutil.TempDir("", "export_backup") + if err != nil { + return errors.Wrapf(err, "cannot create temp dir") + } + + // Function to load a single backup file. + loadFn := func(r io.Reader, groupId uint32, preds predicateSet) (uint64, error) { + dir := filepath.Join(tmpDir, fmt.Sprintf("p%d", groupId)) + + r, err := enc.GetReader(key, r) + if err != nil { + return 0, err + } + + gzReader, err := gzip.NewReader(r) + if err != nil { + if len(key) != 0 { + err = errors.Wrap(err, + "Unable to read the backup. Ensure the encryption key is correct.") + } + return 0, errors.Wrapf(err, "cannot create gzip reader") + } + // The badger DB should be opened only after creating the backup + // file reader and verifying the encryption in the backup file. + db, err := badger.OpenManaged(badger.DefaultOptions(dir). + WithSyncWrites(false). + WithTableLoadingMode(options.MemoryMap). + WithValueThreshold(1 << 10). + WithNumVersionsToKeep(math.MaxInt32). + WithEncryptionKey(key)) + + if err != nil { + return 0, errors.Wrapf(err, "cannot open DB at %s", dir) + } + defer db.Close() + _, err = loadFromBackup(db, gzReader, 0, preds) + if err != nil { + return 0, errors.Wrapf(err, "cannot load backup") + } + return 0, x.WriteGroupIdFile(dir, uint32(groupId)) + } + + // Read manifest from folder. + manifest := &Manifest{} + manifestPath := filepath.Join(backupDir, backupManifest) + if err := h.ReadManifest(manifestPath, manifest); err != nil { + return errors.Wrapf(err, "cannot read manifest at %s", manifestPath) + } + manifest.Path = manifestPath + if manifest.Since == 0 || len(manifest.Groups) == 0 { + return errors.Errorf("no data found in backup") + } + + // Restore backup to disk. 
+ for gid := range manifest.Groups { + file := filepath.Join(backupDir, backupName(manifest.Since, gid)) + fp, err := os.Open(file) + if err != nil { + return errors.Wrapf(err, "cannot open backup file at %s", file) + } + defer fp.Close() + + // Only restore the predicates that were assigned to this group at the time + // of the last backup. + predSet := manifest.getPredsInGroup(gid) + + _, err = loadFn(fp, gid, predSet) + if err != nil { + return err + } + } + + // Export the data from the p directories produced by the last step. + ch := make(chan error, len(manifest.Groups)) + for gid := range manifest.Groups { + go func(group uint32) { + dir := filepath.Join(tmpDir, fmt.Sprintf("p%d", group)) + db, err := badger.OpenManaged(badger.DefaultOptions(dir). + WithSyncWrites(false). + WithTableLoadingMode(options.MemoryMap). + WithValueThreshold(1 << 10). + WithNumVersionsToKeep(math.MaxInt32). + WithEncryptionKey(key)) + + if err != nil { + ch <- errors.Wrapf(err, "cannot open DB at %s", dir) + return + } + defer db.Close() + + req := &pb.ExportRequest{ + GroupId: group, + ReadTs: manifest.Since, + UnixTs: time.Now().Unix(), + Format: format, + Destination: exportDir, + } + + _, err = exportInternal(context.Background(), req, db, true) + ch <- errors.Wrapf(err, "cannot export data inside DB at %s", dir) + }(gid) + } + + for i := 0; i < len(manifest.Groups); i++ { + err := <-ch + if err != nil { + return err + } + } + + // Clean up temporary directory. + if err := os.RemoveAll(tmpDir); err != nil { + return errors.Wrapf(err, "cannot remove temp directory at %s", tmpDir) + } + + return nil +}