Revert "perf(Backup): Improve backup performance (#7601)"
This reverts commit 2715d88.

We are reverting this change temporarily to bring in other (larger) backup
changes. We will re-do the changes from this commit later.
ahsanbarkati committed Apr 23, 2021
1 parent 00ada83 commit 942c4a7
Showing 15 changed files with 117 additions and 201 deletions.
9 changes: 7 additions & 2 deletions codec/codec.go
@@ -405,17 +405,22 @@ func Decode(pack *pb.UidPack, seek uint64) []uint64 {

// DecodeToBuffer is the same as Decode but it returns a z.Buffer which is
// calloc'ed and SHOULD be freed up by calling buffer.Release().
func DecodeToBuffer(buf *z.Buffer, pack *pb.UidPack) {
func DecodeToBuffer(pack *pb.UidPack, seek uint64) *z.Buffer {
buf, err := z.NewBufferWith(256<<20, 32<<30, z.UseCalloc, "Codec.DecodeToBuffer")
x.Check(err)
buf.AutoMmapAfter(1 << 30)

var last uint64
tmp := make([]byte, 16)
dec := Decoder{Pack: pack}
for uids := dec.Seek(0, SeekStart); len(uids) > 0; uids = dec.Next() {
for uids := dec.Seek(seek, SeekStart); len(uids) > 0; uids = dec.Next() {
for _, u := range uids {
n := binary.PutUvarint(tmp, u-last)
x.Check2(buf.Write(tmp[:n]))
last = u
}
}
return buf
}

func match32MSB(num1, num2 uint64) bool {
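For reference, a minimal sketch of the caller contract for the restored signature: the buffer is now allocated inside DecodeToBuffer, and the caller owns it and must release it. This mirrors the TestBufferUidPack change below; the helper name copyPack is illustrative, pack is assumed to be an already-encoded *pb.UidPack, and the block size 256 is taken from the test.

// Illustrative usage sketch, not part of this diff.
func copyPack(pack *pb.UidPack) *pb.UidPack {
	buf := codec.DecodeToBuffer(pack, 0) // seek from the start; buffer is calloc'ed internally
	defer buf.Release()                  // caller must free the buffer
	// buf.Bytes() holds uvarint-encoded UID deltas, which EncodeFromBuffer
	// turns back into a UidPack (as done in codec_test.go below).
	return codec.EncodeFromBuffer(buf.Bytes(), 256)
}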
11 changes: 2 additions & 9 deletions codec/codec_test.go
@@ -28,7 +28,6 @@ import (

"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/ristretto/z"
humanize "github.com/dustin/go-humanize"
"github.com/stretchr/testify/require"
)
@@ -75,10 +74,7 @@ func TestBufferUidPack(t *testing.T) {
// Some edge case tests.
pack := Encode([]uint64{}, 128)
FreePack(pack)

buf := z.NewBuffer(10<<10, "TestBufferUidPack")
defer buf.Release()
DecodeToBuffer(buf, &pb.UidPack{})
buf := DecodeToBuffer(&pb.UidPack{}, 0)
require.Equal(t, 0, buf.LenNoPadding())
require.NoError(t, buf.Release())

@@ -94,10 +90,7 @@ func TestBufferUidPack(t *testing.T) {
actual := Decode(pack, 0)
require.Equal(t, expected, actual)

actualbuffer := z.NewBuffer(10<<10, "TestBufferUidPack")
defer actualbuffer.Release()

DecodeToBuffer(actualbuffer, pack)
actualbuffer := DecodeToBuffer(pack, 0)
enc := EncodeFromBuffer(actualbuffer.Bytes(), 256)
require.Equal(t, ExactLen(pack), ExactLen(enc))

4 changes: 1 addition & 3 deletions ee/backup/run.go
@@ -274,7 +274,6 @@ func runLsbackupCmd() error {
type backupEntry struct {
Path string `json:"path"`
Since uint64 `json:"since"`
ReadTs uint64 `json:"read_ts"`
BackupId string `json:"backup_id"`
BackupNum uint64 `json:"backup_num"`
Encrypted bool `json:"encrypted"`
@@ -290,8 +289,7 @@ func runLsbackupCmd() error {

be := backupEntry{
Path: manifest.Path,
Since: manifest.SinceTsDeprecated,
ReadTs: manifest.ReadTs,
Since: manifest.Since,
BackupId: manifest.BackupId,
BackupNum: manifest.BackupNum,
Encrypted: manifest.Encrypted,
4 changes: 1 addition & 3 deletions graphql/admin/list_backups.go
@@ -44,7 +44,6 @@ type group struct {
type manifest struct {
Type string `json:"type,omitempty"`
Since uint64 `json:"since,omitempty"`
ReadTs uint64 `json:"read_ts,omitempty"`
Groups []*group `json:"groups,omitempty"`
BackupId string `json:"backupId,omitempty"`
BackupNum uint64 `json:"backupNum,omitempty"`
@@ -108,8 +107,7 @@ func convertManifests(manifests []*worker.Manifest) []*manifest {
for i, m := range manifests {
res[i] = &manifest{
Type: m.Type,
Since: m.SinceTsDeprecated,
ReadTs: m.ReadTs,
Since: m.Since,
BackupId: m.BackupId,
BackupNum: m.BackupNum,
Path: m.Path,
17 changes: 10 additions & 7 deletions posting/list.go
@@ -853,7 +853,7 @@ func (l *List) Rollup(alloc *z.Allocator) ([]*bpb.KV, error) {

// ToBackupPostingList uses rollup to generate a single list with no splits.
// It's used during backup so that each backed up posting list is stored in a single key.
func (l *List) ToBackupPostingList(bl *pb.BackupPostingList, alloc *z.Allocator, buf *z.Buffer) (*bpb.KV, error) {
func (l *List) ToBackupPostingList(bl *pb.BackupPostingList, alloc *z.Allocator) (*bpb.KV, error) {
bl.Reset()
l.RLock()
defer l.RUnlock()
@@ -868,12 +868,15 @@ func (l *List) ToBackupPostingList(bl *pb.BackupPostingList, alloc *z.Allocator,
defer out.free()

ol := out.plist

// Encode uids to []byte instead of []uint64. This helps improve memory usage.
buf.Reset()
codec.DecodeToBuffer(buf, ol.Pack)
bl.UidBytes = buf.Bytes()

// Encode uids to []byte instead of []uint64 if we have more than 1024
// uids. We do this to improve memory usage.
if codec.ApproxLen(ol.Pack) > 1024 {
buf := codec.DecodeToBuffer(ol.Pack, 0)
defer buf.Release()
bl.UidBytes = buf.Bytes()
} else {
bl.Uids = codec.Decode(ol.Pack, 0)
}
bl.Postings = ol.Postings
bl.CommitTs = ol.CommitTs
bl.Splits = ol.Splits
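A rough illustration of why the restored []byte path helps for large lists: a plain []uint64 costs 8 bytes per UID, while the uvarint-encoded deltas produced by codec.DecodeToBuffer typically take only 1-2 bytes per UID when UIDs are clustered. The standalone sketch below mirrors the PutUvarint loop in codec.go above; it is not Dgraph code and the sample UIDs are made up.

package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	uids := []uint64{1000, 1003, 1007, 1100} // sorted UIDs, as in a posting list
	tmp := make([]byte, binary.MaxVarintLen64)
	var out []byte
	var last uint64
	for _, u := range uids {
		n := binary.PutUvarint(tmp, u-last) // store the delta, not the absolute UID
		out = append(out, tmp[:n]...)
		last = u
	}
	// As []uint64 these 4 UIDs take 32 bytes; as uvarint deltas they take 5.
	fmt.Println(len(out))
}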
5 changes: 1 addition & 4 deletions posting/list_test.go
@@ -29,7 +29,6 @@ import (
"github.com/dgraph-io/badger/v3"
bpb "github.com/dgraph-io/badger/v3/pb"
"github.com/dgraph-io/dgo/v210/protos/api"
"github.com/dgraph-io/ristretto/z"
"github.com/google/uuid"
"github.com/stretchr/testify/require"

@@ -1341,9 +1340,7 @@ func TestSingleListRollup(t *testing.T) {
}

var bl pb.BackupPostingList
buf := z.NewBuffer(10<<10, "TestSingleListRollup")
defer buf.Release()
kv, err := ol.ToBackupPostingList(&bl, nil, buf)
kv, err := ol.ToBackupPostingList(&bl, nil)
require.NoError(t, err)
require.Equal(t, 1, len(kv.UserMeta))
require.Equal(t, BitCompletePosting, kv.UserMeta[0])
24 changes: 6 additions & 18 deletions worker/backup_common.go
@@ -32,17 +32,17 @@ import (
type predicateSet map[string]struct{}

// Manifest records backup details, these are values used during restore.
// ReadTs will be used to create the next incremental backup.
// Since is the timestamp from which the next incremental backup should start (it's set
// to the readTs of the current backup).
// Groups are the IDs of the groups involved.
type Manifest struct {
sync.Mutex
//Type is the type of backup, either full or incremental.
Type string `json:"type"`
// SinceTsDeprecated is kept for backward compatibility. Use readTs instead of sinceTs.
SinceTsDeprecated uint64 `json:"since"`
// ReadTs is the timestamp at which this backup was taken. This would be
// the since timestamp for the next incremental backup.
ReadTs uint64 `json:"read_ts"`
// Since is the timestamp at which this backup was taken. It's called Since
// because it will become the timestamp from which to backup in the next
// incremental backup.
Since uint64 `json:"since"`
// Groups is the map of valid groups to predicates at the time the backup was created.
Groups map[uint32][]string `json:"groups"`
// BackupId is a unique ID assigned to all the backups in the same series
@@ -64,18 +64,6 @@ type Manifest struct {
// DropOperations lists the various DROP operations that took place since the last backup.
// These are used during restore to redo those operations before applying the backup.
DropOperations []*pb.DropOperation `json:"drop_operations"`
// Compression keeps track of the compression that was used for the data.
Compression string `json:"compression"`
}

// ValidReadTs returns the valid read timestamp. The backup can have
// readTs=0 if it was taken by an older version of Dgraph. The
// SinceTsDeprecated field is kept for backward compatibility.
func (m *Manifest) ValidReadTs() uint64 {
if m.ReadTs == 0 {
return m.SinceTsDeprecated
}
return m.ReadTs
}

type MasterManifest struct {
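To make the restored semantics concrete: after this revert, a manifest's Since field holds the readTs at which the backup was taken, and the next incremental backup starts from that value (see req.SinceTs = latestManifest.Since in worker/backup_ee.go below). A standalone sketch with toy types, not Dgraph's actual structs:

package main

import "fmt"

// manifest is a toy stand-in for worker.Manifest, keeping only the fields
// relevant to the chaining logic restored in this revert.
type manifest struct {
	Type  string
	Since uint64 // readTs at which this backup was taken
}

// nextBackup mirrors the doBackup flow: start from the previous manifest's
// Since (or 0 for a forced full backup) and record the new readTs as Since.
func nextBackup(latest manifest, readTs uint64, forceFull bool) manifest {
	sinceTs := latest.Since
	if forceFull {
		sinceTs = 0
	}
	m := manifest{Since: readTs}
	if sinceTs == 0 {
		m.Type = "full"
	} else {
		m.Type = "incremental"
	}
	return m
}

func main() {
	full := nextBackup(manifest{}, 100, true) // full backup taken at readTs=100
	incr := nextBackup(full, 250, false)      // incremental backup covering (100, 250]
	fmt.Println(full, incr)
}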
58 changes: 22 additions & 36 deletions worker/backup_ee.go
@@ -57,11 +57,10 @@ func backupCurrentGroup(ctx context.Context, req *pb.BackupRequest) (*pb.BackupR
return nil, errors.Wrapf(err, "cannot start backup operation")
}
defer closer.Done()

bp := NewBackupProcessor(pstore, req)
defer bp.Close()

return bp.WriteBackup(closer.Ctx())
return bp.WriteBackup(ctx)
}

// BackupGroup backs up the group specified in the backup request.
@@ -162,12 +161,8 @@ func doBackup(ctx context.Context, req *pb.BackupRequest, forceFull bool) error
return err
}

// Use the readTs as the sinceTs for the next backup. If not found, use the
// SinceTsDeprecated value from the latest manifest.
req.SinceTs = latestManifest.ValidReadTs()

req.SinceTs = latestManifest.Since
if forceFull {
// To force a full backup we'll set the sinceTs to zero.
req.SinceTs = 0
} else {
if x.WorkerConfig.EncryptionKey != nil {
@@ -206,43 +201,35 @@ func doBackup(ctx context.Context, req *pb.BackupRequest, forceFull bool) error
}

glog.Infof(
"Created backup request: read_ts:%d since_ts:%d unix_ts:%q destination:%q. Groups=%v\n",
"Created backup request: read_ts:%d since_ts:%d unix_ts:\"%s\" destination:\"%s\". Groups=%v\n",
req.ReadTs, req.SinceTs, req.UnixTs, req.Destination, groups)
ctx, cancel := context.WithCancel(ctx)
defer cancel()

var dropOperations []*pb.DropOperation
{ // This is the code which sends out Backup requests and waits for them to finish.
resCh := make(chan BackupRes, len(state.Groups))
for _, gid := range groups {
br := proto.Clone(req).(*pb.BackupRequest)
br.GroupId = gid
br.Predicates = predMap[gid]
go func(req *pb.BackupRequest) {
res, err := BackupGroup(ctx, req)
resCh <- BackupRes{res: res, err: err}
}(br)
}
resCh := make(chan BackupRes, len(state.Groups))
for _, gid := range groups {
br := proto.Clone(req).(*pb.BackupRequest)
br.GroupId = gid
br.Predicates = predMap[gid]
go func(req *pb.BackupRequest) {
res, err := BackupGroup(ctx, req)
resCh <- BackupRes{res: res, err: err}
}(br)
}

for range groups {
if backupRes := <-resCh; backupRes.err != nil {
glog.Errorf("Error received during backup: %v", backupRes.err)
return backupRes.err
} else {
dropOperations = append(dropOperations, backupRes.res.GetDropOperations()...)
}
var dropOperations []*pb.DropOperation
for range groups {
if backupRes := <-resCh; backupRes.err != nil {
glog.Errorf("Error received during backup: %v", backupRes.err)
return backupRes.err
} else {
dropOperations = append(dropOperations, backupRes.res.GetDropOperations()...)
}
}

dir := fmt.Sprintf(backupPathFmt, req.UnixTs)
m := Manifest{
ReadTs: req.ReadTs,
Groups: predMap,
Version: x.DgraphVersion,
DropOperations: dropOperations,
Path: dir,
Compression: "snappy",
}
m := Manifest{Since: req.ReadTs, Groups: predMap, Version: x.DgraphVersion,
DropOperations: dropOperations, Path: dir}
if req.SinceTs == 0 {
m.Type = "full"
m.BackupId = x.GetRandomName(1)
@@ -255,7 +242,6 @@ func doBackup(ctx context.Context, req *pb.BackupRequest, forceFull bool) error
m.Encrypted = (x.WorkerConfig.EncryptionKey != nil)

bp := NewBackupProcessor(nil, req)
defer bp.Close()
err = bp.CompleteBackup(ctx, &m)

if err != nil {
3 changes: 0 additions & 3 deletions worker/backup_handler.go
@@ -63,9 +63,6 @@ type UriHandler interface {
// These function calls are used by both Create and Load.
io.WriteCloser

// BytesWritten returns the number of bytes written.
BytesWritten() int

// GetManifest returns the master manifest, containing information about all the
// backups. If the backup directory is using old formats (version < 21.03) of manifests,
// then it will return a consolidated master manifest.