fix(enterprise): Set version correctly post marshalling during restore (#7018)

* Add test for old restore to a cluster

* Fix author query

* fix(restore): Use correct version after marshalling

The MarshalPostingList function returns a KV without the version set, which
causes issues if you restore a backup on top of existing data. This PR fixes
the issue by explicitly setting the version for the KV returned by the
MarshalPostingList function.

* Fix test for restoring old backup incremental

* Address review comments

* Add old backup dir

* Add wait for restore, address review comments

* Make test use tls, refactor out waitforrestore

Co-authored-by: Ibrahim Jarif <ibrahim@dgraph.io>
rahulgurnani and Ibrahim Jarif authored Dec 2, 2020
1 parent aa12a73 commit 30ee966
Showing 10 changed files with 168 additions and 74 deletions.
2 changes: 2 additions & 0 deletions posting/list.go
@@ -918,6 +918,8 @@ func (out *rollupOutput) marshalPostingListPart(alloc *z.Allocator,
return kv, nil
}

// MarshalPostingList returns a KV with the marshalled posting list. The caller
// SHOULD SET the Key and Version for the returned KV.
func MarshalPostingList(plist *pb.PostingList, alloc *z.Allocator) *bpb.KV {
kv := y.NewKV(alloc)
if isPlistEmpty(plist) {
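
For readers skimming this hunk, here is a minimal, self-contained sketch of the caller contract that the new comment describes. It is not part of the commit; the kvSetter interface, the writeRestoredList helper, and the restoreKey/origVersion parameters are illustrative assumptions that mirror how worker/restore.go (further down in this diff) uses MarshalPostingList.

package restoresketch

import (
	bpb "github.com/dgraph-io/badger/v2/pb"

	"github.com/dgraph-io/dgraph/posting"
	"github.com/dgraph-io/dgraph/protos/pb"
)

// kvSetter stands in for whatever sink the marshalled KV is written to (for
// example a badger KV loader); it exists only to keep this sketch self-contained.
type kvSetter interface {
	Set(kv *bpb.KV) error
}

// writeRestoredList demonstrates the contract: MarshalPostingList returns a KV
// with neither Key nor Version set, so the caller must assign both before
// handing the KV to the write path. Passing nil for the allocator matches how
// the restore path calls it in this commit.
func writeRestoredList(out kvSetter, plist *pb.PostingList, restoreKey []byte, origVersion uint64) error {
	kv := posting.MarshalPostingList(plist, nil)
	kv.Key = restoreKey      // the key this posting list should be stored under
	kv.Version = origVersion // e.g. the version of the KV read from the backup
	return out.Set(kv)
}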
98 changes: 89 additions & 9 deletions systest/backup/filesystem/backup_test.go
@@ -42,19 +42,88 @@ import (
)

var (
copyBackupDir = "./data/backups_copy"
restoreDir = "./data/restore"
testDirs = []string{restoreDir}

alphaBackupDir = "/data/backups"

copyBackupDir = "./data/backups_copy"
restoreDir = "./data/restore"
testDirs = []string{restoreDir}
alphaBackupDir = "/data/backups"
oldBackupDir = "/data/to_restore"
alphaContainers = []string{
"alpha1",
"alpha2",
"alpha3",
}
)

func sendRestoreRequest(t *testing.T, location string) int {
if location == "" {
location = "/data/backup"
}
params := testutil.GraphQLParams{
Query: `mutation restore($location: String!) {
restore(input: {location: $location}) {
code
message
restoreId
}
}`,
Variables: map[string]interface{}{
"location": location,
},
}
resp := testutil.MakeGQLRequestWithTLS(t, &params, testutil.GetAlphaClientConfig(t))
resp.RequireNoGraphQLErrors(t)

var restoreResp struct {
Restore struct {
Code string
Message string
RestoreId int
}
}

require.NoError(t, json.Unmarshal(resp.Data, &restoreResp))
require.Equal(t, restoreResp.Restore.Code, "Success")
require.Greater(t, restoreResp.Restore.RestoreId, 0)
return restoreResp.Restore.RestoreId
}

// This test takes a backup and then restores an old backup into the cluster incrementally.
// Next, it cleans up the cluster and tries restoring the backups created above.
// Regression test for DGRAPH-2775.
func TestBackupOfOldRestore(t *testing.T) {
dirSetup(t)
copyOldBackupDir(t)

conn, err := grpc.Dial(testutil.SockAddr, grpc.WithTransportCredentials(credentials.NewTLS(testutil.GetAlphaClientConfig(t))))
require.NoError(t, err)
dg := dgo.NewDgraphClient(api.NewDgraphClient(conn))
require.NoError(t, err)

testutil.DropAll(t, dg)
time.Sleep(2 * time.Second)

_ = runBackup(t, 3, 1)

restoreId := sendRestoreRequest(t, oldBackupDir)
testutil.WaitForRestore(t, restoreId, dg)

resp, err := dg.NewTxn().Query(context.Background(), `{ authors(func: has(Author.name)) { count(uid) } }`)
require.NoError(t, err)
require.JSONEq(t, "{\"authors\":[{\"count\":1}]}", string(resp.Json))

_ = runBackup(t, 6, 2)

// Clean the cluster and try restoring the backups created above.
testutil.DropAll(t, dg)
time.Sleep(2 * time.Second)
restoreId = sendRestoreRequest(t, alphaBackupDir)
testutil.WaitForRestore(t, restoreId, dg)

resp, err = dg.NewTxn().Query(context.Background(), `{ authors(func: has(Author.name)) { count(uid) } }`)
require.NoError(t, err)
require.JSONEq(t, "{\"authors\":[{\"count\":1}]}", string(resp.Json))
}

func TestBackupFilesystem(t *testing.T) {
conn, err := grpc.Dial(testutil.SockAddr, grpc.WithTransportCredentials(credentials.NewTLS(testutil.GetAlphaClientConfig(t))))
require.NoError(t, err)
@@ -112,8 +181,8 @@ func TestBackupFilesystem(t *testing.T) {
break
}
}
require.True(t, moveOk)

require.True(t, moveOk)
// Setup test directories.
dirSetup(t)

@@ -334,7 +403,7 @@ func runBackupInternal(t *testing.T, forceFull bool, numExpectedFiles,
copyToLocalFs(t)

files := x.WalkPathFunc(copyBackupDir, func(path string, isdir bool) bool {
return !isdir && strings.HasSuffix(path, ".backup")
return !isdir && strings.HasSuffix(path, ".backup") && strings.HasPrefix(path, "data/backups_copy/dgraph.")
})
require.Equal(t, numExpectedFiles, len(files))

@@ -344,7 +413,7 @@
require.Equal(t, numExpectedDirs, len(dirs))

manifests := x.WalkPathFunc(copyBackupDir, func(path string, isdir bool) bool {
return !isdir && strings.Contains(path, "manifest.json")
return !isdir && strings.Contains(path, "manifest.json") && strings.HasPrefix(path, "data/backups_copy/dgraph.")
})
require.Equal(t, numExpectedDirs, len(manifests))

@@ -407,6 +476,7 @@ func dirCleanup(t *testing.T) {
if err := os.RemoveAll(restoreDir); err != nil {
t.Fatalf("Error removing directory: %s", err.Error())
}

if err := os.RemoveAll(copyBackupDir); err != nil {
t.Fatalf("Error removing directory: %s", err.Error())
}
@@ -417,6 +487,16 @@
}
}

func copyOldBackupDir(t *testing.T) {
for i := 1; i < 4; i++ {
destPath := fmt.Sprintf("%s_alpha%d_1:/data", testutil.DockerPrefix, i)
srchPath := "." + oldBackupDir
if err := testutil.DockerCp(srchPath, destPath); err != nil {
t.Fatalf("Error copying files from docker container: %s", err.Error())
}
}
}

func copyToLocalFs(t *testing.T) {
// The original backup files are not accessible because docker creates all files in
// the shared volume as the root user. This restriction is circumvented by using
Empty file.
@@ -0,0 +1 @@
{"type":"full","since":68,"groups":{"1":["dgraph.graphql.p_sha256hash","Author.pwd","dgraph.graphql.xid","dgraph.drop.op","dgraph.graphql.schema_history","dgraph.graphql.schema_created_at","dgraph.type","dgraph.graphql.schema","dgraph.cors","dgraph.graphql.p_query","Author.name","Person.name"],"2":[],"3":[]},"backup_id":"loving_jones5","backup_num":1,"encrypted":false,"drop_operations":null}
Binary file not shown.
Binary file not shown.
Binary file not shown.
67 changes: 5 additions & 62 deletions systest/online-restore/online_restore_test.go
@@ -74,63 +74,6 @@ func sendRestoreRequest(t *testing.T, location, backupId string, backupNum int)
return restoreResp.Restore.RestoreId
}

func waitForRestore(t *testing.T, restoreId int, dg *dgo.Dgraph) {
query := fmt.Sprintf(`query status() {
restoreStatus(restoreId: %d) {
status
errors
}
}`, restoreId)
params := testutil.GraphQLParams{
Query: query,
}
b, err := json.Marshal(params)
require.NoError(t, err)

restoreDone := false
client := testutil.GetHttpsClient(t)
for i := 0; i < 15; i++ {
resp, err := client.Post(testutil.AdminUrlHttps(), "application/json", bytes.NewBuffer(b))
require.NoError(t, err)
buf, err := ioutil.ReadAll(resp.Body)
require.NoError(t, err)
sbuf := string(buf)
if strings.Contains(sbuf, "OK") {
restoreDone = true
break
}
time.Sleep(4 * time.Second)
}
require.True(t, restoreDone)

// Wait for the client to exit draining mode. This is needed because the client might
// be connected to a follower and might be behind the leader in applying the restore.
// Waiting for three consecutive successful queries is done to prevent a situation in
// which the query succeeds at the first attempt because the follower is behind and
// has not started to apply the restore proposal.
numSuccess := 0
for {
// This is a dummy query that returns no results.
_, err = dg.NewTxn().Query(context.Background(), `{
q(func: has(invalid_pred)) {
invalid_pred
}}`)

if err == nil {
numSuccess += 1
} else {
require.Contains(t, err.Error(), "the server is in draining mode")
numSuccess = 0
}

if numSuccess == 3 {
// The server has been responsive three times in a row.
break
}
time.Sleep(1 * time.Second)
}
}

// disableDraining disables draining mode before each test for increased reliability.
func disableDraining(t *testing.T) {
drainRequest := `mutation draining {
@@ -237,7 +180,7 @@ func TestBasicRestore(t *testing.T) {
require.NoError(t, dg.Alter(ctx, &api.Operation{DropAll: true}))

restoreId := sendRestoreRequest(t, "", "youthful_rhodes3", 0)
waitForRestore(t, restoreId, dg)
testutil.WaitForRestore(t, restoreId, dg)
runQueries(t, dg, false)
runMutations(t, dg)
}
@@ -254,7 +197,7 @@
runQueries(t, dg, true)

restoreId := sendRestoreRequest(t, "", "youthful_rhodes3", 1)
waitForRestore(t, restoreId, dg)
testutil.WaitForRestore(t, restoreId, dg)
runQueries(t, dg, true)
runMutations(t, dg)
}
@@ -329,13 +272,13 @@ func TestMoveTablets(t *testing.T) {
require.NoError(t, dg.Alter(ctx, &api.Operation{DropAll: true}))

restoreId := sendRestoreRequest(t, "", "youthful_rhodes3", 0)
waitForRestore(t, restoreId, dg)
testutil.WaitForRestore(t, restoreId, dg)
runQueries(t, dg, false)

// Send another restore request with a different backup. This backup has some of the
// same predicates as the previous one but they are stored in different groups.
restoreId = sendRestoreRequest(t, "", "blissful_hermann1", 0)
waitForRestore(t, restoreId, dg)
testutil.WaitForRestore(t, restoreId, dg)

resp, err := dg.NewTxn().Query(context.Background(), `{
q(func: has(name), orderasc: name) {
@@ -628,7 +571,7 @@ func backupRestoreAndVerify(t *testing.T, dg *dgo.Dgraph, backupDir, queryToVeri
expectedResponse string, schemaVerificationOpts testutil.SchemaOptions) {
schemaVerificationOpts.ExcludeAclSchema = true
backup(t, backupDir)
waitForRestore(t, sendRestoreRequest(t, backupDir, "", 0), dg)
testutil.WaitForRestore(t, sendRestoreRequest(t, backupDir, "", 0), dg)
testutil.VerifyQueryResponse(t, dg, queryToVerify, expectedResponse)
testutil.VerifySchema(t, dg, schemaVerificationOpts)
}
64 changes: 64 additions & 0 deletions testutil/backup.go
@@ -17,10 +17,17 @@
package testutil

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"strings"
"testing"
"time"

"github.com/dgraph-io/badger/v2"
"github.com/dgraph-io/dgo/v200"
"github.com/dgraph-io/dgraph/ee/enc"
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/protos/pb"
@@ -56,6 +63,63 @@ func openDgraph(pdir string) (*badger.DB, error) {
return badger.OpenManaged(opt)
}

func WaitForRestore(t *testing.T, restoreId int, dg *dgo.Dgraph) {
query := fmt.Sprintf(`query status() {
restoreStatus(restoreId: %d) {
status
errors
}
}`, restoreId)
params := GraphQLParams{
Query: query,
}
b, err := json.Marshal(params)
require.NoError(t, err)

restoreDone := false
client := GetHttpsClient(t)
for i := 0; i < 15; i++ {
resp, err := client.Post(AdminUrlHttps(), "application/json", bytes.NewBuffer(b))
require.NoError(t, err)
buf, err := ioutil.ReadAll(resp.Body)
require.NoError(t, err)
sbuf := string(buf)
if strings.Contains(sbuf, "OK") {
restoreDone = true
break
}
time.Sleep(4 * time.Second)
}
require.True(t, restoreDone)

// Wait for the client to exit draining mode. This is needed because the client might
// be connected to a follower and might be behind the leader in applying the restore.
// Waiting for three consecutive successful queries is done to prevent a situation in
// which the query succeeds at the first attempt because the follower is behind and
// has not started to apply the restore proposal.
numSuccess := 0
for {
// This is a dummy query that returns no results.
_, err = dg.NewTxn().Query(context.Background(), `{
q(func: has(invalid_pred)) {
invalid_pred
}}`)

if err == nil {
numSuccess += 1
} else {
require.Contains(t, err.Error(), "the server is in draining mode")
numSuccess = 0
}

if numSuccess == 3 {
// The server has been responsive three times in a row.
break
}
time.Sleep(1 * time.Second)
}
}

// GetPredicateValues reads the specified p directory and returns the values for the given
// attribute in a map.
func GetPredicateValues(pdir, attr string, readTs uint64) (map[string]string, error) {
10 changes: 7 additions & 3 deletions worker/restore.go
@@ -186,10 +186,14 @@ func loadFromBackup(db *badger.DB, r io.Reader, restoreTs uint64, preds predicat
// part without rolling the key first. This part is here for backwards
// compatibility. New backups are not affected because there was a change
// to roll up lists into a single one.
kv := posting.MarshalPostingList(pl, nil)
newKv := posting.MarshalPostingList(pl, nil)
codec.FreePack(pl.Pack)
kv.Key = restoreKey
if err := loader.Set(kv); err != nil {
newKv.Key = restoreKey
// Use the version of the KV before we marshalled the
// posting list. The MarshalPostingList function returns a KV
// with a zero version.
newKv.Version = kv.Version
if err := loader.Set(newKv); err != nil {
return 0, err
}
} else {
