From 942c4a732849ac983c05a769b2190420ca2ece91 Mon Sep 17 00:00:00 2001
From: Ahsan Barkati
Date: Thu, 22 Apr 2021 17:46:52 +0530
Subject: [PATCH] Revert "perf(Backup): Improve backup performance (#7601)"

This reverts commit 2715d88571950835ac3743a0656777fb5e7f1014.

We are reverting this change temporarily to bring in other (larger) backup
changes. We will re-do the changes of this commit later.
---
 codec/codec.go                |  9 +++--
 codec/codec_test.go           | 11 ++----
 ee/backup/run.go              |  4 +--
 graphql/admin/list_backups.go |  4 +--
 posting/list.go               | 17 ++++----
 posting/list_test.go          |  5 +--
 worker/backup_common.go       | 24 ++++---------
 worker/backup_ee.go           | 58 ++++++++++++--------------
 worker/backup_handler.go      |  3 --
 worker/backup_processor.go    | 63 ++++++++++++++---------------------
 worker/draft.go               |  4 +--
 worker/file_handler.go        | 26 ++++-----------
 worker/online_restore_ee.go   | 11 ++++--
 worker/restore.go             | 48 ++++++++++----------------
 worker/s3_handler.go          | 31 +++++------------
 15 files changed, 117 insertions(+), 201 deletions(-)

diff --git a/codec/codec.go b/codec/codec.go
index edaa3580b23..fb509885407 100644
--- a/codec/codec.go
+++ b/codec/codec.go
@@ -405,17 +405,22 @@ func Decode(pack *pb.UidPack, seek uint64) []uint64 {
 // DecodeToBuffer is the same as Decode but it returns a z.Buffer which is
 // calloc'ed and can be SHOULD be freed up by calling buffer.Release().
-func DecodeToBuffer(buf *z.Buffer, pack *pb.UidPack) {
+func DecodeToBuffer(pack *pb.UidPack, seek uint64) *z.Buffer {
+	buf, err := z.NewBufferWith(256<<20, 32<<30, z.UseCalloc, "Codec.DecodeToBuffer")
+	x.Check(err)
+	buf.AutoMmapAfter(1 << 30)
+
 	var last uint64
 	tmp := make([]byte, 16)
 	dec := Decoder{Pack: pack}
-	for uids := dec.Seek(0, SeekStart); len(uids) > 0; uids = dec.Next() {
+	for uids := dec.Seek(seek, SeekStart); len(uids) > 0; uids = dec.Next() {
 		for _, u := range uids {
 			n := binary.PutUvarint(tmp, u-last)
 			x.Check2(buf.Write(tmp[:n]))
 			last = u
 		}
 	}
+	return buf
 }

 func match32MSB(num1, num2 uint64) bool {
diff --git a/codec/codec_test.go b/codec/codec_test.go
index 2a272ad96c5..3f776df1170 100644
--- a/codec/codec_test.go
+++ b/codec/codec_test.go
@@ -28,7 +28,6 @@ import (

 	"github.com/dgraph-io/dgraph/protos/pb"
 	"github.com/dgraph-io/dgraph/x"
-	"github.com/dgraph-io/ristretto/z"
 	humanize "github.com/dustin/go-humanize"
 	"github.com/stretchr/testify/require"
 )
@@ -75,10 +74,7 @@ func TestBufferUidPack(t *testing.T) {
 	// Some edge case tests.
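For context on the codec change above: after this revert, DecodeToBuffer allocates and returns its own calloc'ed z.Buffer (which the caller must Release), instead of filling a caller-owned buffer as the reverted commit had it. A minimal sketch of driving the restored API, using only calls visible in this diff; the uid values are arbitrary:

package main

import (
	"fmt"

	"github.com/dgraph-io/dgraph/codec"
)

func main() {
	// Encode a few uids into a UidPack, then decode them back out as
	// uvarint-encoded deltas via the restored DecodeToBuffer(pack, seek).
	pack := codec.Encode([]uint64{5, 8, 13}, 128)
	defer codec.FreePack(pack)

	buf := codec.DecodeToBuffer(pack, 0) // allocates its own calloc'ed buffer
	defer buf.Release()                  // caller must free it

	// Round-trip the byte form back into a UidPack.
	enc := codec.EncodeFromBuffer(buf.Bytes(), 256)
	fmt.Println(codec.ExactLen(enc)) // 3
}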
pack := Encode([]uint64{}, 128) FreePack(pack) - - buf := z.NewBuffer(10<<10, "TestBufferUidPack") - defer buf.Release() - DecodeToBuffer(buf, &pb.UidPack{}) + buf := DecodeToBuffer(&pb.UidPack{}, 0) require.Equal(t, 0, buf.LenNoPadding()) require.NoError(t, buf.Release()) @@ -94,10 +90,7 @@ func TestBufferUidPack(t *testing.T) { actual := Decode(pack, 0) require.Equal(t, expected, actual) - actualbuffer := z.NewBuffer(10<<10, "TestBufferUidPack") - defer actualbuffer.Release() - - DecodeToBuffer(actualbuffer, pack) + actualbuffer := DecodeToBuffer(pack, 0) enc := EncodeFromBuffer(actualbuffer.Bytes(), 256) require.Equal(t, ExactLen(pack), ExactLen(enc)) diff --git a/ee/backup/run.go b/ee/backup/run.go index eb254f644e0..8b3d041a01a 100644 --- a/ee/backup/run.go +++ b/ee/backup/run.go @@ -274,7 +274,6 @@ func runLsbackupCmd() error { type backupEntry struct { Path string `json:"path"` Since uint64 `json:"since"` - ReadTs uint64 `json:"read_ts"` BackupId string `json:"backup_id"` BackupNum uint64 `json:"backup_num"` Encrypted bool `json:"encrypted"` @@ -290,8 +289,7 @@ func runLsbackupCmd() error { be := backupEntry{ Path: manifest.Path, - Since: manifest.SinceTsDeprecated, - ReadTs: manifest.ReadTs, + Since: manifest.Since, BackupId: manifest.BackupId, BackupNum: manifest.BackupNum, Encrypted: manifest.Encrypted, diff --git a/graphql/admin/list_backups.go b/graphql/admin/list_backups.go index b5891ad127a..7cef8d0d3ee 100644 --- a/graphql/admin/list_backups.go +++ b/graphql/admin/list_backups.go @@ -44,7 +44,6 @@ type group struct { type manifest struct { Type string `json:"type,omitempty"` Since uint64 `json:"since,omitempty"` - ReadTs uint64 `json:"read_ts,omitempty"` Groups []*group `json:"groups,omitempty"` BackupId string `json:"backupId,omitempty"` BackupNum uint64 `json:"backupNum,omitempty"` @@ -108,8 +107,7 @@ func convertManifests(manifests []*worker.Manifest) []*manifest { for i, m := range manifests { res[i] = &manifest{ Type: m.Type, - Since: m.SinceTsDeprecated, - ReadTs: m.ReadTs, + Since: m.Since, BackupId: m.BackupId, BackupNum: m.BackupNum, Path: m.Path, diff --git a/posting/list.go b/posting/list.go index d7ebfdc3d77..2d11ad80d42 100644 --- a/posting/list.go +++ b/posting/list.go @@ -853,7 +853,7 @@ func (l *List) Rollup(alloc *z.Allocator) ([]*bpb.KV, error) { // ToBackupPostingList uses rollup to generate a single list with no splits. // It's used during backup so that each backed up posting list is stored in a single key. -func (l *List) ToBackupPostingList(bl *pb.BackupPostingList, alloc *z.Allocator, buf *z.Buffer) (*bpb.KV, error) { +func (l *List) ToBackupPostingList(bl *pb.BackupPostingList, alloc *z.Allocator) (*bpb.KV, error) { bl.Reset() l.RLock() defer l.RUnlock() @@ -868,12 +868,15 @@ func (l *List) ToBackupPostingList(bl *pb.BackupPostingList, alloc *z.Allocator, defer out.free() ol := out.plist - - // Encode uids to []byte instead of []uint64. This helps improve memory usage. - buf.Reset() - codec.DecodeToBuffer(buf, ol.Pack) - bl.UidBytes = buf.Bytes() - + // Encode uids to []byte instead of []uint64 if we have more than 1000 + // uids. We do this to improve the memory usage. 
+ if codec.ApproxLen(ol.Pack) > 1024 { + buf := codec.DecodeToBuffer(ol.Pack, 0) + defer buf.Release() + bl.UidBytes = buf.Bytes() + } else { + bl.Uids = codec.Decode(ol.Pack, 0) + } bl.Postings = ol.Postings bl.CommitTs = ol.CommitTs bl.Splits = ol.Splits diff --git a/posting/list_test.go b/posting/list_test.go index 6e4d8f8f096..e11e26b4526 100644 --- a/posting/list_test.go +++ b/posting/list_test.go @@ -29,7 +29,6 @@ import ( "github.com/dgraph-io/badger/v3" bpb "github.com/dgraph-io/badger/v3/pb" "github.com/dgraph-io/dgo/v210/protos/api" - "github.com/dgraph-io/ristretto/z" "github.com/google/uuid" "github.com/stretchr/testify/require" @@ -1341,9 +1340,7 @@ func TestSingleListRollup(t *testing.T) { } var bl pb.BackupPostingList - buf := z.NewBuffer(10<<10, "TestSingleListRollup") - defer buf.Release() - kv, err := ol.ToBackupPostingList(&bl, nil, buf) + kv, err := ol.ToBackupPostingList(&bl, nil) require.NoError(t, err) require.Equal(t, 1, len(kv.UserMeta)) require.Equal(t, BitCompletePosting, kv.UserMeta[0]) diff --git a/worker/backup_common.go b/worker/backup_common.go index e24d0b5ac11..666d2162823 100644 --- a/worker/backup_common.go +++ b/worker/backup_common.go @@ -32,17 +32,17 @@ import ( type predicateSet map[string]struct{} // Manifest records backup details, these are values used during restore. -// ReadTs will be used to create the next incremental backup. +// Since is the timestamp from which the next incremental backup should start (it's set +// to the readTs of the current backup). // Groups are the IDs of the groups involved. type Manifest struct { sync.Mutex //Type is the type of backup, either full or incremental. Type string `json:"type"` - // SinceTsDeprecated is kept for backward compatibility. Use readTs instead of sinceTs. - SinceTsDeprecated uint64 `json:"since"` - // ReadTs is the timestamp at which this backup was taken. This would be - // the since timestamp for the next incremental backup. - ReadTs uint64 `json:"read_ts"` + // Since is the timestamp at which this backup was taken. It's called Since + // because it will become the timestamp from which to backup in the next + // incremental backup. + Since uint64 `json:"since"` // Groups is the map of valid groups to predicates at the time the backup was created. Groups map[uint32][]string `json:"groups"` // BackupId is a unique ID assigned to all the backups in the same series @@ -64,18 +64,6 @@ type Manifest struct { // DropOperations lists the various DROP operations that took place since the last backup. // These are used during restore to redo those operations before applying the backup. DropOperations []*pb.DropOperation `json:"drop_operations"` - // Compression keeps track of the compression that was used for the data. - Compression string `json:"compression"` -} - -// ValidReadTs function returns the valid read timestamp. The backup can have -// the readTs=0 if the backup was done on an older version of dgraph. The -// SinceTsDecprecated is kept for backward compatibility. 
-func (m *Manifest) ValidReadTs() uint64 { - if m.ReadTs == 0 { - return m.SinceTsDeprecated - } - return m.ReadTs } type MasterManifest struct { diff --git a/worker/backup_ee.go b/worker/backup_ee.go index a64c9e0c611..205f646a09b 100644 --- a/worker/backup_ee.go +++ b/worker/backup_ee.go @@ -57,11 +57,10 @@ func backupCurrentGroup(ctx context.Context, req *pb.BackupRequest) (*pb.BackupR return nil, errors.Wrapf(err, "cannot start backup operation") } defer closer.Done() - bp := NewBackupProcessor(pstore, req) defer bp.Close() - return bp.WriteBackup(closer.Ctx()) + return bp.WriteBackup(ctx) } // BackupGroup backs up the group specified in the backup request. @@ -162,12 +161,8 @@ func doBackup(ctx context.Context, req *pb.BackupRequest, forceFull bool) error return err } - // Use the readTs as the sinceTs for the next backup. If not found, use the - // SinceTsDeprecated value from the latest manifest. - req.SinceTs = latestManifest.ValidReadTs() - + req.SinceTs = latestManifest.Since if forceFull { - // To force a full backup we'll set the sinceTs to zero. req.SinceTs = 0 } else { if x.WorkerConfig.EncryptionKey != nil { @@ -206,43 +201,35 @@ func doBackup(ctx context.Context, req *pb.BackupRequest, forceFull bool) error } glog.Infof( - "Created backup request: read_ts:%d since_ts:%d unix_ts:%q destination:%q. Groups=%v\n", + "Created backup request: read_ts:%d since_ts:%d unix_ts:\"%s\" destination:\"%s\". Groups=%v\n", req.ReadTs, req.SinceTs, req.UnixTs, req.Destination, groups) ctx, cancel := context.WithCancel(ctx) defer cancel() - var dropOperations []*pb.DropOperation - { // This is the code which sends out Backup requests and waits for them to finish. - resCh := make(chan BackupRes, len(state.Groups)) - for _, gid := range groups { - br := proto.Clone(req).(*pb.BackupRequest) - br.GroupId = gid - br.Predicates = predMap[gid] - go func(req *pb.BackupRequest) { - res, err := BackupGroup(ctx, req) - resCh <- BackupRes{res: res, err: err} - }(br) - } + resCh := make(chan BackupRes, len(state.Groups)) + for _, gid := range groups { + br := proto.Clone(req).(*pb.BackupRequest) + br.GroupId = gid + br.Predicates = predMap[gid] + go func(req *pb.BackupRequest) { + res, err := BackupGroup(ctx, req) + resCh <- BackupRes{res: res, err: err} + }(br) + } - for range groups { - if backupRes := <-resCh; backupRes.err != nil { - glog.Errorf("Error received during backup: %v", backupRes.err) - return backupRes.err - } else { - dropOperations = append(dropOperations, backupRes.res.GetDropOperations()...) - } + var dropOperations []*pb.DropOperation + for range groups { + if backupRes := <-resCh; backupRes.err != nil { + glog.Errorf("Error received during backup: %v", backupRes.err) + return backupRes.err + } else { + dropOperations = append(dropOperations, backupRes.res.GetDropOperations()...) 
} } dir := fmt.Sprintf(backupPathFmt, req.UnixTs) - m := Manifest{ - ReadTs: req.ReadTs, - Groups: predMap, - Version: x.DgraphVersion, - DropOperations: dropOperations, - Path: dir, - Compression: "snappy", - } + m := Manifest{Since: req.ReadTs, Groups: predMap, Version: x.DgraphVersion, + DropOperations: dropOperations, Path: dir} if req.SinceTs == 0 { m.Type = "full" m.BackupId = x.GetRandomName(1) @@ -255,7 +242,6 @@ func doBackup(ctx context.Context, req *pb.BackupRequest, forceFull bool) error m.Encrypted = (x.WorkerConfig.EncryptionKey != nil) bp := NewBackupProcessor(nil, req) - defer bp.Close() err = bp.CompleteBackup(ctx, &m) if err != nil { diff --git a/worker/backup_handler.go b/worker/backup_handler.go index b1527b98035..03dd4b192e7 100644 --- a/worker/backup_handler.go +++ b/worker/backup_handler.go @@ -63,9 +63,6 @@ type UriHandler interface { // These function calls are used by both Create and Load. io.WriteCloser - // BytesWritten returns the number of bytes written. - BytesWritten() int - // GetManifest returns the master manifest, containing information about all the // backups. If the backup directory is using old formats (version < 21.03) of manifests, // then it will return a consolidated master manifest. diff --git a/worker/backup_processor.go b/worker/backup_processor.go index 4b05f414959..c3283e8244e 100644 --- a/worker/backup_processor.go +++ b/worker/backup_processor.go @@ -13,6 +13,7 @@ package worker import ( + "compress/gzip" "context" "encoding/binary" "encoding/hex" @@ -26,9 +27,7 @@ import ( bpb "github.com/dgraph-io/badger/v3/pb" "github.com/dgraph-io/badger/v3/y" "github.com/dgraph-io/ristretto/z" - "github.com/dustin/go-humanize" "github.com/golang/glog" - "github.com/golang/snappy" "github.com/pkg/errors" "github.com/dgraph-io/dgraph/ee/enc" @@ -57,7 +56,6 @@ type threadLocal struct { bpl pb.BackupPostingList alloc *z.Allocator itr *badger.Iterator - buf *z.Buffer } func NewBackupProcessor(db *badger.DB, req *pb.BackupRequest) *BackupProcessor { @@ -70,12 +68,8 @@ func NewBackupProcessor(db *badger.DB, req *pb.BackupRequest) *BackupProcessor { bp.txn = db.NewTransactionAt(req.ReadTs, false) } for i := range bp.threads { - buf, err := z.NewBufferWith(32<<20, 32<<30, z.UseCalloc, "Worker.BackupProcessor") - x.Check(err) - buf.AutoMmapAfter(1 << 30) bp.threads[i] = &threadLocal{ Request: bp.Request, - buf: buf, } if bp.txn != nil { iopt := badger.DefaultIteratorOptions @@ -86,18 +80,6 @@ func NewBackupProcessor(db *badger.DB, req *pb.BackupRequest) *BackupProcessor { return bp } -func (pr *BackupProcessor) Close() { - for _, th := range pr.threads { - if pr.txn != nil { - th.itr.Close() - } - th.buf.Release() - } - if pr.txn != nil { - pr.txn.Discard() - } -} - // LoadResult holds the output of a Load operation. type LoadResult struct { // Version is the timestamp at which the database is after loading a backup. @@ -111,6 +93,16 @@ type LoadResult struct { Err error } +func (pr *BackupProcessor) Close() { + if pr.txn == nil { + return + } + for _, th := range pr.threads { + th.itr.Close() + } + pr.txn.Discard() +} + // WriteBackup uses the request values to create a stream writer then hand off the data // retrieval to stream.Orchestrate. The writer will create all the fd's needed to // collect the data and later move to the target. 
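For context on the BackupProcessor changes above: the reverted commit gave each backup thread one large, reusable calloc'ed buffer (32 MB to start, spilling to mmap past 1 GB) and reset it per posting list; this revert drops that field, so DecodeToBuffer allocates a fresh buffer per call instead. A rough sketch of the removed reuse pattern against the ristretto/z calls shown in the hunks above; lists and use are hypothetical stand-ins, not Dgraph functions:

package main

import (
	"github.com/dgraph-io/dgraph/x"
	"github.com/dgraph-io/ristretto/z"
)

func processLists(lists [][]byte, use func([]byte)) {
	// One buffer for the whole worker thread, as the removed threadLocal.buf did.
	buf, err := z.NewBufferWith(32<<20, 32<<30, z.UseCalloc, "example")
	x.Check(err)
	buf.AutoMmapAfter(1 << 30) // spill to an mmap'ed file once it grows past 1GB
	defer buf.Release()

	for _, l := range lists {
		buf.Reset()            // reuse the same allocation instead of reallocating per list
		x.Check2(buf.Write(l)) // stand-in for codec.DecodeToBuffer filling the buffer
		use(buf.Bytes())
	}
}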
@@ -145,18 +137,14 @@ func (pr *BackupProcessor) WriteBackup(ctx context.Context) (*pb.BackupResponse, var maxVersion uint64 - iwriter, err := enc.GetWriter(x.WorkerConfig.EncryptionKey, handler) + newhandler, err := enc.GetWriter(x.WorkerConfig.EncryptionKey, handler) if err != nil { return &response, errors.Wrap(err, "failed to get encWriter") } - // Snappy is much faster than gzip compression. In fact, in my experiments, gzip compression - // caused the output speed to be ~30 MBps. Snappy can write at ~90 MBps, and overall the speed - // is similar to writing uncompressed data on disk. - // These are the times I saw: - // Without compression: 7m2s 33GB output. - // With snappy: 7m11s 9.5GB output. - // With snappy + S3: 7m54s 9.5GB output. - cWriter := snappy.NewBufferedWriter(iwriter) + gzWriter, err := gzip.NewWriterLevel(newhandler, gzip.BestSpeed) + if err != nil { + return &response, errors.Wrap(err, "failed to create new gzip writer") + } stream := pr.DB.NewStreamAt(pr.Request.ReadTs) stream.LogPrefix = "Dgraph.Backup" @@ -220,10 +208,10 @@ func (pr *BackupProcessor) WriteBackup(ctx context.Context) (*pb.BackupResponse, maxVersion = kv.Version } } - return writeKVList(list, cWriter) + return writeKVList(list, gzWriter) } - if err := stream.Orchestrate(ctx); err != nil { + if err := stream.Orchestrate(context.Background()); err != nil { glog.Errorf("While taking backup: %v", err) return &response, err } @@ -279,7 +267,7 @@ func (pr *BackupProcessor) WriteBackup(ctx context.Context) (*pb.BackupResponse, kv.ExpiresAt = item.ExpiresAt() list.Kv = append(list.Kv, kv) } - return writeKVList(list, cWriter) + return writeKVList(list, gzWriter) } for _, prefix := range []byte{x.ByteSchema, x.ByteType} { @@ -295,17 +283,16 @@ func (pr *BackupProcessor) WriteBackup(ctx context.Context) (*pb.BackupResponse, } glog.V(2).Infof("Backup group %d version: %d", pr.Request.GroupId, pr.Request.ReadTs) - if err = cWriter.Close(); err != nil { + if err = gzWriter.Close(); err != nil { glog.Errorf("While closing gzipped writer: %v", err) return &response, err } + if err = handler.Close(); err != nil { glog.Errorf("While closing handler: %v", err) return &response, err } - glog.Infof("Backup complete: group %d at %d. Bytes Written: %s\n", - pr.Request.GroupId, pr.Request.ReadTs, - humanize.IBytes(uint64(handler.BytesWritten()))) + glog.Infof("Backup complete: group %d at %d", pr.Request.GroupId, pr.Request.ReadTs) return &response, nil } @@ -345,8 +332,8 @@ func (pr *BackupProcessor) CompleteBackup(ctx context.Context, m *Manifest) erro // GoString implements the GoStringer interface for Manifest. func (m *Manifest) GoString() string { - return fmt.Sprintf(`Manifest{Since: %d, ReadTs: %d, Groups: %v, Encrypted: %v}`, - m.SinceTsDeprecated, m.ReadTs, m.Groups, m.Encrypted) + return fmt.Sprintf(`Manifest{Since: %d, Groups: %v, Encrypted: %v}`, + m.Since, m.Groups, m.Encrypted) } func (tl *threadLocal) toBackupList(key []byte, itr *badger.Iterator) ( @@ -372,7 +359,7 @@ func (tl *threadLocal) toBackupList(key []byte, itr *badger.Iterator) ( } // Don't allocate kv on tl.alloc, because we don't need it by the end of this func. 
- kv, err := l.ToBackupPostingList(&tl.bpl, tl.alloc, tl.buf) + kv, err := l.ToBackupPostingList(&tl.bpl, tl.alloc) if err != nil { return nil, nil, errors.Wrapf(err, "while rolling up list") } diff --git a/worker/draft.go b/worker/draft.go index 68021a7f735..859c9c5cbfe 100644 --- a/worker/draft.go +++ b/worker/draft.go @@ -199,11 +199,9 @@ func (n *node) stopAllTasks() { defer n.closer.Done() // CLOSER:1 <-n.closer.HasBeenClosed() - glog.Infof("Stopping all ongoing registered tasks...") n.opsLock.Lock() defer n.opsLock.Unlock() - for op, closer := range n.ops { - glog.Infof("Stopping op: %s...\n", op) + for _, closer := range n.ops { closer.SignalAndWait() } glog.Infof("Stopped all ongoing registered tasks.") diff --git a/worker/file_handler.go b/worker/file_handler.go index 7f4324eb4e6..41909fcaa2a 100644 --- a/worker/file_handler.go +++ b/worker/file_handler.go @@ -31,8 +31,7 @@ import ( // fileHandler is used for 'file:' URI scheme. type fileHandler struct { - fp *os.File - numWritten int + fp *os.File } // readManifest reads a manifest file at path using the handler. @@ -181,13 +180,13 @@ func (h *fileHandler) Load(uri *url.URL, backupId string, backupNum uint64, fn l var since uint64 var maxUid, maxNsId uint64 for i, manifest := range manifests { - if manifest.ValidReadTs() == 0 || len(manifest.Groups) == 0 { + if manifest.Since == 0 || len(manifest.Groups) == 0 { continue } path := filepath.Join(uri.Path, manifests[i].Path) for gid := range manifest.Groups { - file := filepath.Join(path, backupName(manifest.ValidReadTs(), gid)) + file := filepath.Join(path, backupName(manifest.Since, gid)) fp, err := os.Open(file) if err != nil { return LoadResult{Err: errors.Wrapf(err, "Failed to open %q", file)} @@ -199,20 +198,15 @@ func (h *fileHandler) Load(uri *url.URL, backupId string, backupNum uint64, fn l predSet := manifests[len(manifests)-1].getPredsInGroup(gid) groupMaxUid, groupMaxNsId, err := fn(gid, - &loadBackupInput{ - r: fp, - preds: predSet, - dropOperations: manifest.DropOperations, - isOld: manifest.Version == 0, - compression: manifest.Compression, - }) + &loadBackupInput{r: fp, preds: predSet, dropOperations: manifest.DropOperations, + isOld: manifest.Version == 0}) if err != nil { return LoadResult{Err: err} } maxUid = x.Max(maxUid, groupMaxUid) maxNsId = x.Max(maxNsId, groupMaxNsId) } - since = manifest.ValidReadTs() + since = manifest.Since } return LoadResult{Version: since, MaxLeaseUid: maxUid, MaxLeaseNsId: maxNsId} @@ -241,13 +235,7 @@ func (h *fileHandler) Close() error { } func (h *fileHandler) Write(b []byte) (int, error) { - n, err := h.fp.Write(b) - h.numWritten += n - return n, err -} - -func (h *fileHandler) BytesWritten() int { - return h.numWritten + return h.fp.Write(b) } // pathExist checks if a path (file or dir) is found at target. 
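For context on the manifest handling above: the reverted commit stored the backup timestamp as ReadTs and kept the old field as SinceTsDeprecated, resolving them through ValidReadTs(); after this revert the handlers read Manifest.Since directly, as in the Load loop above. A small, self-contained sketch of the removed fallback; the local struct is only a stand-in for worker.Manifest:

package main

import "fmt"

// manifestTs stands in for the timestamp fields the reverted code kept on worker.Manifest.
type manifestTs struct {
	ReadTs            uint64 // written by newer backups
	SinceTsDeprecated uint64 // the old "since" field, for backups from older Dgraph versions
}

// validReadTs mirrors the removed Manifest.ValidReadTs helper: prefer ReadTs,
// fall back to the deprecated since value when ReadTs was never set.
func validReadTs(m manifestTs) uint64 {
	if m.ReadTs == 0 {
		return m.SinceTsDeprecated
	}
	return m.ReadTs
}

func main() {
	fmt.Println(validReadTs(manifestTs{ReadTs: 42}))           // 42
	fmt.Println(validReadTs(manifestTs{SinceTsDeprecated: 7})) // 7
}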
diff --git a/worker/online_restore_ee.go b/worker/online_restore_ee.go index abef20b1285..04c4e674bd2 100644 --- a/worker/online_restore_ee.go +++ b/worker/online_restore_ee.go @@ -13,6 +13,7 @@ package worker import ( + "compress/gzip" "context" "fmt" "net/url" @@ -343,13 +344,17 @@ func writeBackup(ctx context.Context, req *pb.RestoreRequest) error { return 0, 0, errors.Wrapf(err, "unable to get encryption config") } _, encKey := ee.GetKeys(cfg) - bReader, err := in.getReader(encKey) + in.r, err = enc.GetReader(encKey, in.r) if err != nil { - return 0, 0, errors.Wrap(err, "failed to getReader for restore") + return 0, 0, errors.Wrapf(err, "cannot get encrypted reader") + } + gzReader, err := gzip.NewReader(in.r) + if err != nil { + return 0, 0, errors.Wrapf(err, "couldn't create gzip reader") } maxUid, maxNsId, err := loadFromBackup(pstore, &loadBackupInput{ - r: bReader, + r: gzReader, restoreTs: req.RestoreTs, preds: in.preds, dropOperations: in.dropOperations, diff --git a/worker/restore.go b/worker/restore.go index 0d5aa2c7b5b..9e8422cc956 100644 --- a/worker/restore.go +++ b/worker/restore.go @@ -28,7 +28,6 @@ import ( "github.com/dgraph-io/badger/v3/options" bpb "github.com/dgraph-io/badger/v3/pb" "github.com/golang/glog" - "github.com/golang/snappy" "github.com/pkg/errors" "github.com/dgraph-io/dgraph/codec" @@ -50,11 +49,22 @@ func RunRestore(pdir, location, backupId string, key x.SensitiveByteSlice, // and we create a new p dir for each. return LoadBackup(location, backupId, 0, nil, func(groupId uint32, in *loadBackupInput) (uint64, uint64, error) { - bReader, err := in.getReader(key) + + dir := filepath.Join(pdir, fmt.Sprintf("p%d", groupId)) + r, err := enc.GetReader(key, in.r) if err != nil { - return 0, 0, errors.Wrap(err, "failed to get reader for restore") + return 0, 0, err } - dir := filepath.Join(pdir, fmt.Sprintf("p%d", groupId)) + + gzReader, err := gzip.NewReader(r) + if err != nil { + if len(key) != 0 { + err = errors.Wrap(err, + "Unable to read the backup. Ensure the encryption key is correct.") + } + return 0, 0, err + } + if !pathExist(dir) { fmt.Println("Creating new db:", dir) } @@ -74,15 +84,14 @@ func RunRestore(pdir, location, backupId string, key x.SensitiveByteSlice, } defer db.Close() maxUid, maxNsId, err := loadFromBackup(db, &loadBackupInput{ - r: bReader, + r: gzReader, restoreTs: 0, preds: in.preds, dropOperations: in.dropOperations, isOld: in.isOld, - compression: in.compression, }) if err != nil { - return 0, 0, errors.Wrap(err, "loadFromBackup failed") + return 0, 0, err } return maxUid, maxNsId, x.WriteGroupIdFile(dir, uint32(groupId)) }) @@ -94,29 +103,6 @@ type loadBackupInput struct { preds predicateSet dropOperations []*pb.DropOperation isOld bool - compression string -} - -func (l *loadBackupInput) getReader(key x.SensitiveByteSlice) (io.Reader, error) { - r, err := enc.GetReader(key, l.r) - if err != nil { - return nil, err - } - switch l.compression { - case "": - gzReader, err := gzip.NewReader(r) - if err != nil && len(key) != 0 { - err = errors.Wrap(err, - "Unable to read the backup. Ensure the encryption key is correct.") - } - return gzReader, err - case "snappy": - // Snappy doesn't return an error. If the data is encrypted, we will - // get an error while reading it. 
- return snappy.NewReader(r), nil - default: - return nil, errors.Errorf("Invalid compression in backup %q", l.compression) - } } // loadFromBackup reads the backup, converts the keys and values to the required format, @@ -151,7 +137,7 @@ func loadFromBackup(db *badger.DB, in *loadBackupInput) (uint64, uint64, error) if err == io.EOF { break } else if err != nil { - return 0, 0, errors.Wrap(err, "read failed") + return 0, 0, err } if cap(unmarshalBuf) < int(sz) { diff --git a/worker/s3_handler.go b/worker/s3_handler.go index 927319e82b8..de7d4797ae2 100644 --- a/worker/s3_handler.go +++ b/worker/s3_handler.go @@ -23,7 +23,6 @@ import ( "time" "github.com/dgraph-io/dgraph/x" - "github.com/dustin/go-humanize" "github.com/dgraph-io/dgraph/protos/pb" "github.com/golang/glog" @@ -65,7 +64,6 @@ type s3Handler struct { creds *x.MinioCredentials uri *url.URL mc *x.MinioClient - numWritten int } // setup creates a new session, checks valid bucket at uri.Path, and configures a minio client. @@ -220,13 +218,13 @@ func (h *s3Handler) Load(uri *url.URL, backupId string, backupNum uint64, fn loa // otherwise this is a failure and the user must remedy. var maxUid, maxNsId uint64 for i, manifest := range manifests { - if manifest.ValidReadTs() == 0 || len(manifest.Groups) == 0 { + if manifest.Since == 0 || len(manifest.Groups) == 0 { continue } path := filepath.Join(h.objectPrefix, manifests[i].Path) for gid := range manifest.Groups { - object := filepath.Join(path, backupName(manifest.ValidReadTs(), gid)) + object := filepath.Join(path, backupName(manifest.Since, gid)) reader, err := h.mc.GetObject(h.bucketName, object, minio.GetObjectOptions{}) if err != nil { return LoadResult{Err: errors.Wrapf(err, "Failed to get %q", object)} @@ -247,20 +245,15 @@ func (h *s3Handler) Load(uri *url.URL, backupId string, backupNum uint64, fn loa predSet := manifests[len(manifests)-1].getPredsInGroup(gid) groupMaxUid, groupMaxNsId, err := fn(gid, - &loadBackupInput{ - r: reader, - preds: predSet, - dropOperations: manifest.DropOperations, - isOld: manifest.Version == 0, - compression: manifest.Compression, - }) + &loadBackupInput{r: reader, preds: predSet, dropOperations: manifest.DropOperations, + isOld: manifest.Version == 0}) if err != nil { return LoadResult{Err: err} } maxUid = x.Max(maxUid, groupMaxUid) maxNsId = x.Max(maxNsId, groupMaxNsId) } - since = manifest.ValidReadTs() + since = manifest.Since } return LoadResult{Version: since, MaxLeaseUid: maxUid, MaxLeaseNsId: maxNsId} @@ -285,8 +278,8 @@ func (h *s3Handler) upload(mc *x.MinioClient, object string) error { // of upload. We're already tracking progress of the writes in stream.Lists, so no need to track // the progress of read. By definition, it must be the same. n, err := mc.PutObject(h.bucketName, object, h.preader, -1, minio.PutObjectOptions{}) - glog.V(2).Infof("Backup sent data of size %s. Time elapsed: %s", - humanize.IBytes(uint64(n)), time.Since(start).Round(time.Second)) + glog.V(2).Infof("Backup sent %d bytes. Time elapsed: %s", + n, time.Since(start).Round(time.Second)) if err != nil { // This should cause Write to fail as well. 
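For context on the Write changes in the next hunk (and the matching fileHandler hunk earlier): the reverted commit counted bytes as they passed through each handler's Write so the completion log could report backup size; this revert drops that bookkeeping in favor of the raw write. A generic sketch of the pattern; countingWriter is a hypothetical name, not a Dgraph type:

package main

import (
	"bytes"
	"fmt"
	"io"
)

// countingWriter wraps an io.Writer and tracks how many bytes were written,
// the same bookkeeping the reverted handlers did with their numWritten field.
type countingWriter struct {
	w io.Writer
	n int
}

func (c *countingWriter) Write(b []byte) (int, error) {
	n, err := c.w.Write(b)
	c.n += n
	return n, err
}

// BytesWritten reports the running total, as the removed UriHandler method did.
func (c *countingWriter) BytesWritten() int { return c.n }

func main() {
	var sink bytes.Buffer
	cw := &countingWriter{w: &sink}
	io.WriteString(cw, "backup bytes")
	fmt.Println(cw.BytesWritten()) // 12
}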
@@ -321,13 +314,7 @@ func (h *s3Handler) flush() error { } func (h *s3Handler) Write(b []byte) (int, error) { - n, err := h.pwriter.Write(b) - h.numWritten += n - return n, err -} - -func (h *s3Handler) BytesWritten() int { - return h.numWritten + return h.pwriter.Write(b) } func (h *s3Handler) objectExists(objectPath string) bool { @@ -337,7 +324,7 @@ func (h *s3Handler) objectExists(objectPath string) bool { if errResponse.Code == "NoSuchKey" { return false } else { - glog.Errorf("Failed to verify object %s existance: %s", objectPath, errResponse.Code) + glog.Errorf("Failed to verify object existance: %s", errResponse.Code) return false } }
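Finally, for context on the compression change that runs through backup_processor.go, online_restore_ee.go, and restore.go above: the reverted commit wrote backups through snappy and recorded compression: "snappy" in the manifest so restore could pick the matching reader, citing roughly 3x faster writes than gzip in the removed comment (about 90 MBps vs about 30 MBps); this revert returns to gzip at BestSpeed on the backup path and a plain gzip reader on restore. A self-contained round-trip sketch of both writers, using only the standard library and the github.com/golang/snappy package referenced in the diff:

package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"

	"github.com/golang/snappy"
)

func check(err error) {
	if err != nil {
		panic(err)
	}
}

func main() {
	payload := []byte("posting lists go here")

	// Post-revert write path: gzip at BestSpeed, as restored in backup_processor.go,
	// paired with gzip.NewReader on the restore side.
	var gz bytes.Buffer
	gw, err := gzip.NewWriterLevel(&gz, gzip.BestSpeed)
	check(err)
	_, err = gw.Write(payload)
	check(err)
	check(gw.Close())
	gr, err := gzip.NewReader(&gz)
	check(err)
	out, err := io.ReadAll(gr)
	check(err)
	fmt.Printf("gzip round-trip: %q\n", out)

	// Pre-revert write path (removed above): snappy's buffered writer, which the
	// manifest's compression field told restore to pair with snappy.NewReader.
	var sn bytes.Buffer
	sw := snappy.NewBufferedWriter(&sn)
	_, err = sw.Write(payload)
	check(err)
	check(sw.Close())
	out, err = io.ReadAll(snappy.NewReader(&sn))
	check(err)
	fmt.Printf("snappy round-trip: %q\n", out)
}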