Skip to content

Commit

Permalink
fix(backup): handle manifest version logic, update manifest version t…
Browse files Browse the repository at this point in the history
…o 2105 (#7825)

The backward compatibility of the backup's manifest was broken by #7810, although the tool was added (#7815) that enables smooth migration of manifest.
This PR makes backup backward compatible, by updating the manifest(in-memory) after reading.
  • Loading branch information
NamanJain8 authored and Harshil Goel committed Jan 31, 2023
1 parent 35ecaaa commit 7f8ec2f
Show file tree
Hide file tree
Showing 18 changed files with 297 additions and 43 deletions.
133 changes: 133 additions & 0 deletions ee/updatemanifest/run.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// +build !oss

/*
* Copyright 2021 Dgraph Labs, Inc. and Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package updatemanifest

import (
"encoding/binary"
"log"
"net/url"
"os"
"strings"

"github.com/dgraph-io/dgraph/ee"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/worker"
"github.com/dgraph-io/dgraph/x"
"github.com/pkg/errors"
"github.com/spf13/cobra"
)

var (
logger = log.New(os.Stderr, "", 0)
// UpdateManifest is the sub-command invoked when running "dgraph update_manifest".
UpdateManifest x.SubCommand
)

var opt struct {
location string
key []byte
}

func init() {
UpdateManifest.Cmd = &cobra.Command{
Use: "update_manifest",
Short: "Run the Dgraph update tool to update the manifest from v21.03 to latest.",
Run: func(cmd *cobra.Command, args []string) {
if err := run(); err != nil {
logger.Fatalf("%v\n", err)
}
},
Annotations: map[string]string{"group": "tool"},
}
UpdateManifest.EnvPrefix = "DGRAPH_UPDATE_MANIFEST"
UpdateManifest.Cmd.SetHelpTemplate(x.NonRootTemplate)

flag := UpdateManifest.Cmd.Flags()
flag.StringVarP(&opt.location, "location", "l", "",
`Sets the location of the backup. Both file URIs and s3 are supported.
This command will take care of all the full + incremental backups present in the location.`)
ee.RegisterEncFlag(flag)
}

// Invalid bytes are replaced with the Unicode replacement rune.
// See https://golang.org/pkg/encoding/json/#Marshal
const replacementRune = rune('\ufffd')

func parseNsAttr(attr string) (uint64, string, error) {
if strings.ContainsRune(attr, replacementRune) {
return 0, "", errors.New("replacement char found")
}
return binary.BigEndian.Uint64([]byte(attr[:8])), attr[8:], nil
}

func run() error {
keys, err := ee.GetKeys(UpdateManifest.Conf)
if err != nil {
return err
}
opt.key = keys.EncKey
uri, err := url.Parse(opt.location)
if err != nil {
return errors.Wrapf(err, "while parsing location")
}
handler, err := worker.NewUriHandler(uri, nil)
if err != nil {
return errors.Wrapf(err, "while creating uri handler")
}
masterManifest, err := worker.GetManifestNoUpgrade(handler, uri)
if err != nil {
return errors.Wrapf(err, "while getting manifest")
}

update := func(manifest *worker.Manifest) {
for gid, preds := range manifest.Groups {
parsedPreds := preds[:0]
for _, pred := range preds {
ns, attr, err := parseNsAttr(pred)
if err != nil {
logger.Printf("Unable to parse the pred: %v", pred)
continue
}
parsedPreds = append(parsedPreds, x.NamespaceAttr(ns, attr))
}
manifest.Groups[gid] = parsedPreds
}
for _, op := range manifest.DropOperations {
if op.DropOp == pb.DropOperation_ATTR {
ns, attr, err := parseNsAttr(op.DropValue)
if err != nil {
logger.Printf("Unable to parse the drop operation %+v pred: %v",
op, []byte(op.DropValue))
continue
}
op.DropValue = x.NamespaceAttr(ns, attr)
}
}
}

// Update the master manifest with the changes for drop operations and group predicates.
for _, manifest := range masterManifest.Manifests {
if manifest.Version == 2103 {
update(manifest)
}
}

// Rewrite the master manifest.
return errors.Wrap(worker.CreateManifest(handler, uri, masterManifest), "rewrite failed")
}
56 changes: 50 additions & 6 deletions systest/backup/filesystem/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ var (
alphaBackupDir = "/data/backups"
oldBackupDir1 = "/data/to_restore/1"
oldBackupDir2 = "/data/to_restore/2"
oldBackupDir3 = "/data/to_restore/3"
alphaContainers = []string{
"alpha1",
"alpha2",
Expand Down Expand Up @@ -85,14 +86,53 @@ func sendRestoreRequest(t *testing.T, location string) {
return
}

// This test restores the old backups.
// The backup dir contains:
// - Full backup with pred "p1", "p2", "p3". (insert k1, k2, k3).
// - Incremental backup after drop data was called and "p2", "p3", "p4" inserted. --> (insert k4,k5)
// - Incremental backup after "p3" was dropped.
func TestRestoreOfOldBackup(t *testing.T) {
test := func(dir string) {
common.DirSetup(t)
common.CopyOldBackupDir(t)

conn, err := grpc.Dial(testutil.SockAddr,
grpc.WithTransportCredentials(credentials.NewTLS(testutil.GetAlphaClientConfig(t))))
require.NoError(t, err)
dg := dgo.NewDgraphClient(api.NewDgraphClient(conn))
require.NoError(t, err)

testutil.DropAll(t, dg)
time.Sleep(2 * time.Second)

sendRestoreRequest(t, dir)
testutil.WaitForRestore(t, dg)

queryAndCheck := func(pred string, cnt int) {
q := fmt.Sprintf(`{ me(func: has(%s)) { count(uid) } }`, pred)
r := fmt.Sprintf("{\"me\":[{\"count\":%d}]}", cnt)
resp, err := dg.NewTxn().Query(context.Background(), q)
require.NoError(t, err)
require.JSONEq(t, r, string(resp.Json))
}
queryAndCheck("p1", 0)
queryAndCheck("p2", 2)
queryAndCheck("p3", 0)
queryAndCheck("p4", 2)
}
t.Run("backup of 20.11", func(t *testing.T) { test(oldBackupDir2) })
t.Run("backup of 21.03", func(t *testing.T) { test(oldBackupDir3) })
}

// This test takes a backup and then restores an old backup in a cluster incrementally.
// Next, cleans up the cluster and tries restoring the backups above.
// Regression test for DGRAPH-2775
func TestBackupOfOldRestore(t *testing.T) {
common.DirSetup(t)
common.CopyOldBackupDir(t)

conn, err := grpc.Dial(testutil.SockAddr, grpc.WithTransportCredentials(credentials.NewTLS(testutil.GetAlphaClientConfig(t))))
conn, err := grpc.Dial(testutil.SockAddr,
grpc.WithTransportCredentials(credentials.NewTLS(testutil.GetAlphaClientConfig(t))))
require.NoError(t, err)
dg := dgo.NewDgraphClient(api.NewDgraphClient(conn))
require.NoError(t, err)
Expand All @@ -105,7 +145,8 @@ func TestBackupOfOldRestore(t *testing.T) {
sendRestoreRequest(t, oldBackupDir1)
testutil.WaitForRestore(t, dg)

resp, err := dg.NewTxn().Query(context.Background(), `{ authors(func: has(Author.name)) { count(uid) } }`)
q := `{ authors(func: has(Author.name)) { count(uid) } }`
resp, err := dg.NewTxn().Query(context.Background(), q)
require.NoError(t, err)
require.JSONEq(t, "{\"authors\":[{\"count\":1}]}", string(resp.Json))

Expand All @@ -117,7 +158,7 @@ func TestBackupOfOldRestore(t *testing.T) {
sendRestoreRequest(t, alphaBackupDir)
testutil.WaitForRestore(t, dg)

resp, err = dg.NewTxn().Query(context.Background(), `{ authors(func: has(Author.name)) { count(uid) } }`)
resp, err = dg.NewTxn().Query(context.Background(), q)
require.NoError(t, err)
require.JSONEq(t, "{\"authors\":[{\"count\":1}]}", string(resp.Json))
}
Expand Down Expand Up @@ -160,7 +201,8 @@ func TestRestoreOfOldBackup(t *testing.T) {
}

func TestBackupFilesystem(t *testing.T) {
conn, err := grpc.Dial(testutil.SockAddr, grpc.WithTransportCredentials(credentials.NewTLS(testutil.GetAlphaClientConfig(t))))
conn, err := grpc.Dial(testutil.SockAddr,
grpc.WithTransportCredentials(credentials.NewTLS(testutil.GetAlphaClientConfig(t))))
require.NoError(t, err)
dg := dgo.NewDgraphClient(api.NewDgraphClient(conn))

Expand Down Expand Up @@ -432,15 +474,17 @@ func runBackupInternal(t *testing.T, forceFull bool, numExpectedFiles,

var data interface{}
require.NoError(t, json.NewDecoder(resp.Body).Decode(&data))
require.Equal(t, "Success", testutil.JsonGet(data, "data", "backup", "response", "code").(string))
require.Equal(t, "Success",
testutil.JsonGet(data, "data", "backup", "response", "code").(string))
taskId := testutil.JsonGet(data, "data", "backup", "taskId").(string)
testutil.WaitForTask(t, taskId, true)

// Verify that the right amount of files and directories were created.
common.CopyToLocalFs(t)

files := x.WalkPathFunc(copyBackupDir, func(path string, isdir bool) bool {
return !isdir && strings.HasSuffix(path, ".backup") && strings.HasPrefix(path, "data/backups_copy/dgraph.")
return !isdir && strings.HasSuffix(path, ".backup") &&
strings.HasPrefix(path, "data/backups_copy/dgraph.")
})
require.Equal(t, numExpectedFiles, len(files))

Expand Down
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
1 change: 1 addition & 0 deletions systest/backup/filesystem/data/to_restore/3/manifest.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"Manifests":[{"type":"full","since":0,"read_ts":9,"groups":{"1":["\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000p1","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.graphql.p_query","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.graphql.xid","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.graphql.schema","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.drop.op","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.type","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000p3","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000p2"],"2":[],"3":[]},"backup_id":"quirky_kapitsa4","backup_num":1,"version":2103,"path":"dgraph.20210517.095641.969","encrypted":false,"drop_operations":null,"compression":"snappy"},{"type":"incremental","since":0,"read_ts":21,"groups":{"1":["\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.drop.op","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000p1","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.graphql.schema","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.graphql.p_query","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000p3","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.graphql.xid","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000p4","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000p2","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.type"],"2":[],"3":[]},"backup_id":"quirky_kapitsa4","backup_num":2,"version":2103,"path":"dgraph.20210517.095716.130","encrypted":false,"drop_operations":[{"drop_op":1}],"compression":"snappy"},{"type":"incremental","since":0,"read_ts":26,"groups":{"1":["\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000p4","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000p2","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.type","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000p1","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.graphql.schema","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.graphql.p_query","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000p3","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.drop.op","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.graphql.xid"],"2":[],"3":[]},"backup_id":"quirky_kapitsa4","backup_num":3,"version":2103,"path":"dgraph.20210517.095726.320","encrypted":false,"drop_operations":[{"drop_op":2,"drop_value":"\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000p3"}],"compression":"snappy"}]}
4 changes: 0 additions & 4 deletions worker/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,6 @@ func (m *Manifest) getPredsInGroup(gid uint32) predicateSet {

predSet := make(predicateSet)
for _, pred := range preds {
if m.Version == 0 {
// For older versions, preds set will contain attribute without namespace.
pred = x.NamespaceAttr(x.GalaxyNamespace, pred)
}
predSet[pred] = struct{}{}
}
return predSet
Expand Down
6 changes: 3 additions & 3 deletions worker/backup_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ func ProcessBackupRequest(ctx context.Context, req *pb.BackupRequest) error {
m := Manifest{
ReadTs: req.ReadTs,
Groups: predMap,
Version: x.DgraphVersion,
Version: x.ManifestVersion,
DropOperations: dropOperations,
Path: dir,
Compression: "snappy",
Expand Down Expand Up @@ -555,13 +555,13 @@ func (pr *BackupProcessor) CompleteBackup(ctx context.Context, m *Manifest) erro
return err
}

manifest, err := GetManifest(handler, uri)
manifest, err := GetManifestNoUpgrade(handler, uri)
if err != nil {
return err
}
manifest.Manifests = append(manifest.Manifests, m)

if err := createManifest(handler, uri, manifest); err != nil {
if err := CreateManifest(handler, uri, manifest); err != nil {
return errors.Wrap(err, "Complete backup failed")
}
glog.Infof("Backup completed OK.")
Expand Down
Loading

0 comments on commit 7f8ec2f

Please sign in to comment.