[BREAKING] Opt(Restore): Optimize Restore's new map-reduce based design (#7666)

This PR, along with the previous restore PR, is a BREAKING change. We are marking this PR as breaking because we forgot to mark the previous one.

- Make restore map run concurrently for faster execution.
- Add progress updates every second for both map and reduce phase.
- Refactor code to break out the map and reduce code into separate files.
- Make reduce cheap by avoiding marshal-unmarshal step.

With these changes, I see the map phase running faster than the reduce phase, with each finishing in about 2 minutes. Map runs at 200 MBps, while reduce runs at 130 MBps, processing 20 GB of uncompressed data in under 5 minutes.
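
To illustrate the shape of this design, here is a minimal, self-contained Go sketch (not Dgraph's actual implementation; every name in it is hypothetical): map workers sort their shards concurrently, and the reducer streams a k-way merge over the sorted output, handing entries through without re-encoding them.

package main

import (
	"fmt"
	"sort"
	"sync"
)

type entry struct{ key, val string }

// mapPhase sorts every input shard concurrently, one goroutine per shard,
// mirroring the "map runs concurrently" change in this PR.
func mapPhase(shards [][]entry) [][]entry {
	sorted := make([][]entry, len(shards))
	var wg sync.WaitGroup
	for i := range shards {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			s := append([]entry(nil), shards[i]...)
			sort.Slice(s, func(a, b int) bool { return s[a].key < s[b].key })
			sorted[i] = s
		}(i)
	}
	wg.Wait()
	return sorted
}

// reducePhase streams a k-way merge over the sorted shards, handing each
// entry to write as-is: no marshal/unmarshal round trip on the merge path.
func reducePhase(sorted [][]entry, write func(entry)) {
	idx := make([]int, len(sorted))
	for {
		best := -1
		for i, s := range sorted {
			if idx[i] == len(s) {
				continue // this shard is exhausted
			}
			if best < 0 || s[idx[i]].key < sorted[best][idx[best]].key {
				best = i
			}
		}
		if best < 0 {
			return // all shards exhausted
		}
		write(sorted[best][idx[best]])
		idx[best]++
	}
}

func main() {
	shards := [][]entry{
		{{"b", "2"}, {"a", "1"}},
		{{"d", "4"}, {"c", "3"}},
	}
	// Prints a 1, b 2, c 3, d 4: globally sorted output.
	reducePhase(mapPhase(shards), func(e entry) { fmt.Println(e.key, e.val) })
}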

Changes:
* Work on optimizing restore
* Some file moves
* Moved map output to a temp directory.
* Add restoreTs in export-backup

Co-authored-by: Ahsan Barkati <ahsanbarkati@gmail.com>
manishrjain and ahsanbarkati authored Mar 30, 2021
1 parent 98f6828 commit 1c7d449
Showing 12 changed files with 1,529 additions and 1,331 deletions.
144 changes: 73 additions & 71 deletions ee/backup/run.go
@@ -15,6 +15,7 @@ package backup
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/url"
"os"
"path/filepath"
@@ -161,6 +162,70 @@ func initExportBackup() {
enc.RegisterFlags(flag)
}

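// bufWriter bundles the export writers with the originating export
// request so reduced KV buffers can be written in the requested format.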
type bufWriter struct {
writers *worker.Writers
req *pb.ExportRequest
}

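// exportSchema converts a schema or type KV into its export form and
// writes it out. Schema entries are always written as RDF, regardless
// of the format requested for data.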
func exportSchema(writers *worker.Writers, val []byte, pk x.ParsedKey) error {
kv := &bpb.KV{}
var err error
if pk.IsSchema() {
kv, err = worker.SchemaExportKv(pk.Attr, val, true)
if err != nil {
return err
}
} else {
kv, err = worker.TypeExportKv(pk.Attr, val)
if err != nil {
return err
}
}
return worker.WriteExport(writers, kv, "rdf")
}

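// Write is the output hook handed to the backup reducer: it walks the
// marshaled KVs in buf, skips the internal _predicate_ attribute,
// exports schema and type keys, converts data keys into posting lists
// for export, and releases the buffer when done.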
func (bw *bufWriter) Write(buf *z.Buffer) error {
kv := &bpb.KV{}
err := buf.SliceIterate(func(s []byte) error {
kv.Reset()
if err := kv.Unmarshal(s); err != nil {
return errors.Wrap(err, "processKvBuf failed to unmarshal kv")
}
pk, err := x.Parse(kv.Key)
if err != nil {
return errors.Wrap(err, "processKvBuf failed to parse key")
}
if pk.Attr == "_predicate_" {
return nil
}
if pk.IsSchema() || pk.IsType() {
return exportSchema(bw.writers, kv.Value, pk)
}
if pk.IsData() {
pl := &pb.PostingList{}
if err := pl.Unmarshal(kv.Value); err != nil {
return errors.Wrap(err, "ProcessKvBuf failed to Unmarshal pl")
}
l := posting.NewList(kv.Key, pl, kv.Version)
kvList, err := worker.ToExportKvList(pk, l, bw.req)
if err != nil {
return errors.Wrap(err, "processKvBuf failed to Export")
}
if len(kvList.Kv) == 0 {
return nil
}
exportKv := kvList.Kv[0]
return worker.WriteExport(bw.writers, exportKv, bw.req.Format)
}
return nil
})
if err != nil {
return err
}
buf.Release()
return nil
}
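
The Write method above relies on a buffer convention from the z package (github.com/dgraph-io/ristretto/z): the reducer hands over a z.Buffer of length-prefixed, marshaled KV entries, and SliceIterate replays each entry in insertion order. Below is a self-contained toy version of that convention, using encoding/binary instead of the real z API; the helper names are hypothetical.

package main

import (
	"encoding/binary"
	"fmt"
)

// writeSlice appends a uvarint length prefix followed by the payload,
// mirroring the idea behind z.Buffer's slice writes.
func writeSlice(buf, s []byte) []byte {
	var hdr [binary.MaxVarintLen64]byte
	n := binary.PutUvarint(hdr[:], uint64(len(s)))
	return append(append(buf, hdr[:n]...), s...)
}

// sliceIterate replays each payload in insertion order, mirroring
// z.Buffer.SliceIterate.
func sliceIterate(buf []byte, f func(s []byte) error) error {
	for len(buf) > 0 {
		sz, n := binary.Uvarint(buf)
		if n <= 0 {
			return fmt.Errorf("corrupt length prefix")
		}
		buf = buf[n:]
		if err := f(buf[:sz]); err != nil {
			return err
		}
		buf = buf[sz:]
	}
	return nil
}

func main() {
	var buf []byte
	for _, s := range []string{"alpha", "beta", "gamma"} {
		buf = writeSlice(buf, []byte(s))
	}
	// Prints alpha, beta, gamma: one entry per writeSlice call.
	_ = sliceIterate(buf, func(s []byte) error {
		fmt.Println(string(s))
		return nil
	})
}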

func runExportBackup() error {
_, opt.key = ee.GetKeys(ExportBackup.Conf)
if opt.format != "json" && opt.format != "rdf" {
@@ -188,66 +253,10 @@ func runExportBackup() error {
return errors.Wrapf(err, "runExportBackup")
}

exportSchema := func(writers *worker.Writers, val []byte, pk x.ParsedKey) error {
kv := &bpb.KV{}
var err error
if pk.IsSchema() {
kv, err = worker.SchemaExportKv(pk.Attr, val, true)
if err != nil {
return err
}
} else {
kv, err = worker.TypeExportKv(pk.Attr, val)
if err != nil {
return err
}
}
return worker.WriteExport(writers, kv, "rdf")
}

processKvBuf := func(ch chan *z.Buffer, req *pb.ExportRequest, writers *worker.Writers) error {
for buf := range ch {
kv := &bpb.KV{}
err := buf.SliceIterate(func(s []byte) error {
kv.Reset()
if err := kv.Unmarshal(s); err != nil {
return errors.Wrap(err, "processKvBuf failed to unmarshal kv")
}
pk, err := x.Parse(kv.Key)
if err != nil {
return errors.Wrap(err, "processKvBuf failed to parse key")
}
if pk.Attr == "_predicate_" {
return nil
}
if pk.IsSchema() || pk.IsType() {
return exportSchema(writers, kv.Value, pk)
}
if pk.IsData() {
pl := &pb.PostingList{}
if err := pl.Unmarshal(kv.Value); err != nil {
return errors.Wrap(err, "ProcessKvBuf failed to Unmarshal pl")
}
l := posting.NewList(kv.Key, pl, kv.Version)
kvList, err := worker.ToExportKvList(pk, l, req)
if err != nil {
return errors.Wrap(err, "processKvBuf failed to Export")
}
if len(kvList.Kv) == 0 {
return nil
}
exportKv := kvList.Kv[0]
return worker.WriteExport(writers, exportKv, req.Format)
}
return nil
})
if err != nil {
return err
}
buf.Release()
}
return nil
}
mapDir, err := ioutil.TempDir(x.WorkerConfig.TmpDir, "restore-export")
x.Check(err)
defer os.RemoveAll(mapDir)
glog.Infof("Created temporary map directory: %s\n", mapDir)

// TODO: Can probably make this processing concurrent.
for gid := range latestManifest.Groups {
@@ -256,8 +265,9 @@ func runExportBackup() error {
GroupId: gid,
Location: opt.location,
EncryptionKeyFile: ExportBackup.Conf.GetString("encryption_key_file"),
RestoreTs: 1,
}
if err := worker.MapBackup(req); err != nil {
if err := worker.RunMapper(req, mapDir); err != nil {
return errors.Wrap(err, "Failed to map the backups")
}
in := &pb.ExportRequest{
@@ -279,18 +289,10 @@ func runExportBackup() error {
return err
}

r := worker.NewBackupReducer(nil, 0)
errCh := make(chan error, 1)
go func() {
errCh <- processKvBuf(r.WriteCh(), in, writers)
}()

if err := r.Reduce(); err != nil {
w := &bufWriter{req: in, writers: writers}
if err := worker.RunReducer(w, mapDir); err != nil {
return errors.Wrap(err, "Failed to reduce the map")
}
if err := <-errCh; err != nil {
return errors.Wrap(err, "Failed to process reduced buffers")
}
if _, err := exportStorage.FinishWriting(writers); err != nil {
return errors.Wrap(err, "Failed to finish write")
}
2 changes: 1 addition & 1 deletion systest/backup/common/utils.go
@@ -46,7 +46,7 @@ func RunFailingRestore(t *testing.T, backupLocation, lastDir string, commitTs ui
// calling restore.
require.NoError(t, os.RemoveAll(restoreDir))

result := worker.RunRestore("./data/restore", backupLocation, lastDir,
result := worker.RunOfflineRestore("./data/restore", backupLocation, lastDir,
"", options.Snappy, 0)
require.Error(t, result.Err)
require.Contains(t, result.Err.Error(), "expected a BackupNum value of 1")
4 changes: 2 additions & 2 deletions systest/backup/encryption/backup_test.go
@@ -316,7 +316,7 @@ func runRestore(t *testing.T, lastDir string, commitTs uint64) map[string]string

t.Logf("--- Restoring from: %q", localBackupDst)
testutil.KeyFile = "../../../ee/enc/test-fixtures/enc-key"
result := worker.RunRestore("./data/restore", localBackupDst, lastDir,
result := worker.RunOfflineRestore("./data/restore", localBackupDst, lastDir,
testutil.KeyFile, options.Snappy, 0)
require.NoError(t, result.Err)

@@ -354,7 +354,7 @@ func runFailingRestore(t *testing.T, backupLocation, lastDir string, commitTs ui
require.NoError(t, os.RemoveAll(restoreDir))
keyFile := "../../../ee/enc/test-fixtures/enc-key"

result := worker.RunRestore("./data/restore", backupLocation, lastDir, keyFile,
result := worker.RunOfflineRestore("./data/restore", backupLocation, lastDir, keyFile,
options.Snappy, 0)
require.Error(t, result.Err)
require.Contains(t, result.Err.Error(), "expected a BackupNum value of 1")
2 changes: 1 addition & 1 deletion systest/backup/filesystem/backup_test.go
@@ -425,7 +425,7 @@ func runRestore(t *testing.T, backupLocation, lastDir string, commitTs uint64) m
require.NoError(t, os.RemoveAll(restoreDir))

t.Logf("--- Restoring from: %q", backupLocation)
result := worker.RunRestore("./data/restore", backupLocation, lastDir, "", options.Snappy, 0)
result := worker.RunOfflineRestore("./data/restore", backupLocation, lastDir, "", options.Snappy, 0)
require.NoError(t, result.Err)

for i, pdir := range []string{"p1", "p2", "p3"} {
2 changes: 1 addition & 1 deletion systest/backup/minio-large/backup_test.go
@@ -167,7 +167,7 @@ func runRestore(t *testing.T, backupLocation, lastDir string, commitTs uint64) m
require.NoError(t, os.MkdirAll(restoreDir, os.ModePerm))

t.Logf("--- Restoring from: %q", backupLocation)
result := worker.RunRestore("./data/restore", backupLocation, lastDir, "", options.Snappy, 0)
result := worker.RunOfflineRestore("./data/restore", backupLocation, lastDir, "", options.Snappy, 0)
require.NoError(t, result.Err)

restored1, err := testutil.GetPredicateValues("./data/restore/p1", x.GalaxyAttr("name1"), commitTs)
4 changes: 2 additions & 2 deletions systest/backup/minio/backup_test.go
@@ -337,7 +337,7 @@ func runRestore(t *testing.T, lastDir string, commitTs uint64) map[string]string
require.NoError(t, os.RemoveAll(restoreDir))

t.Logf("--- Restoring from: %q", localBackupDst)
result := worker.RunRestore("./data/restore", localBackupDst, lastDir, "", options.Snappy, 0)
result := worker.RunOfflineRestore("./data/restore", localBackupDst, lastDir, "", options.Snappy, 0)
require.NoError(t, result.Err)

for i, pdir := range []string{"p1", "p2", "p3"} {
@@ -359,7 +359,7 @@ func runFailingRestore(t *testing.T, backupLocation, lastDir string, commitTs ui
// calling restore.
require.NoError(t, os.RemoveAll(restoreDir))

result := worker.RunRestore("./data/restore", backupLocation, lastDir, "", options.Snappy, 0)
result := worker.RunOfflineRestore("./data/restore", backupLocation, lastDir, "", options.Snappy, 0)
require.Error(t, result.Err)
require.Contains(t, result.Err.Error(), "expected a BackupNum value of 1")
}