fix(enterprise): Set version correctly post marshalling during restor… #7045

Closed · wants to merge 2 commits · Changes from all commits
2 changes: 2 additions & 0 deletions posting/list.go
@@ -918,6 +918,8 @@ func (out *rollupOutput) marshalPostingListPart(alloc *z.Allocator,
    return kv, nil
}

+// MarshalPostingList returns a KV with the marshalled posting list. The caller
+// SHOULD SET the Key and Version for the returned KV.
func MarshalPostingList(plist *pb.PostingList, alloc *z.Allocator) *bpb.KV {
    kv := y.NewKV(alloc)
    if isPlistEmpty(plist) {
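The new doc comment is the contract at the heart of this PR: MarshalPostingList returns a KV whose Key and Version fields are zero values, so every caller must set both before the KV is written anywhere. A minimal sketch of a conforming caller (the helper name and parameters are illustrative, not part of this PR; pb, bpb, and posting are the repo's own packages):

// marshalWithMeta is a hypothetical wrapper showing the contract above: fill
// in both Key and Version on the KV returned by posting.MarshalPostingList
// before handing it to a writer.
func marshalWithMeta(plist *pb.PostingList, key []byte, version uint64) *bpb.KV {
    kv := posting.MarshalPostingList(plist, nil) // kv.Key == nil, kv.Version == 0
    kv.Key = key         // e.g. the restore key being rebuilt
    kv.Version = version // e.g. the version of the KV the list was read from
    return kv
}

Forgetting the Version half of this contract is exactly the bug fixed in worker/restore.go below.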
98 changes: 89 additions & 9 deletions systest/backup/filesystem/backup_test.go
@@ -42,19 +42,88 @@ import (
)

var (
-   copyBackupDir = "./data/backups_copy"
-   restoreDir    = "./data/restore"
-   testDirs      = []string{restoreDir}
-
-   alphaBackupDir = "/data/backups"
-
+   copyBackupDir   = "./data/backups_copy"
+   restoreDir      = "./data/restore"
+   testDirs        = []string{restoreDir}
+   alphaBackupDir  = "/data/backups"
+   oldBackupDir    = "/data/to_restore"
+   alphaContainers = []string{
+       "alpha1",
+       "alpha2",
+       "alpha3",
+   }
)

+func sendRestoreRequest(t *testing.T, location string) int {
+    if location == "" {
+        location = "/data/backup"
+    }
+    params := testutil.GraphQLParams{
+        Query: `mutation restore($location: String!) {
+            restore(input: {location: $location}) {
+                code
+                message
+                restoreId
+            }
+        }`,
+        Variables: map[string]interface{}{
+            "location": location,
+        },
+    }
+    resp := testutil.MakeGQLRequestWithTLS(t, &params, testutil.GetAlphaClientConfig(t))
+    resp.RequireNoGraphQLErrors(t)
+
+    var restoreResp struct {
+        Restore struct {
+            Code      string
+            Message   string
+            RestoreId int
+        }
+    }
+
+    require.NoError(t, json.Unmarshal(resp.Data, &restoreResp))
+    require.Equal(t, restoreResp.Restore.Code, "Success")
+    require.Greater(t, restoreResp.Restore.RestoreId, 0)
+    return restoreResp.Restore.RestoreId
+}
+
+// This test takes a backup, then incrementally restores an old backup into the
+// cluster and takes a second backup on top of it. Next, it cleans up the
+// cluster and tries restoring the backups created above.
+// Regression test for DGRAPH-2775.
+func TestBackupOfOldRestore(t *testing.T) {
+    dirSetup(t)
+    copyOldBackupDir(t)
+
+    conn, err := grpc.Dial(testutil.SockAddr, grpc.WithTransportCredentials(credentials.NewTLS(testutil.GetAlphaClientConfig(t))))
+    require.NoError(t, err)
+    dg := dgo.NewDgraphClient(api.NewDgraphClient(conn))
+
+    testutil.DropAll(t, dg)
+    time.Sleep(2 * time.Second)
+
+    _ = runBackup(t, 3, 1)
+
+    restoreId := sendRestoreRequest(t, oldBackupDir)
+    testutil.WaitForRestore(t, restoreId, dg)
+
+    resp, err := dg.NewTxn().Query(context.Background(), `{ authors(func: has(Author.name)) { count(uid) } }`)
+    require.NoError(t, err)
+    require.JSONEq(t, "{\"authors\":[{\"count\":1}]}", string(resp.Json))
+
+    _ = runBackup(t, 6, 2)
+
+    // Clean the cluster and try restoring the backups created above.
+    testutil.DropAll(t, dg)
+    time.Sleep(2 * time.Second)
+    restoreId = sendRestoreRequest(t, alphaBackupDir)
+    testutil.WaitForRestore(t, restoreId, dg)
+
+    resp, err = dg.NewTxn().Query(context.Background(), `{ authors(func: has(Author.name)) { count(uid) } }`)
+    require.NoError(t, err)
+    require.JSONEq(t, "{\"authors\":[{\"count\":1}]}", string(resp.Json))
+}
+
func TestBackupFilesystem(t *testing.T) {
    conn, err := grpc.Dial(testutil.SockAddr, grpc.WithTransportCredentials(credentials.NewTLS(testutil.GetAlphaClientConfig(t))))
    require.NoError(t, err)
@@ -112,8 +181,8 @@ func TestBackupFilesystem(t *testing.T) {
            break
        }
    }
-   require.True(t, moveOk)

+   require.True(t, moveOk)
    // Setup test directories.
    dirSetup(t)

@@ -334,7 +403,7 @@ func runBackupInternal(t *testing.T, forceFull bool, numExpectedFiles,
    copyToLocalFs(t)

    files := x.WalkPathFunc(copyBackupDir, func(path string, isdir bool) bool {
-       return !isdir && strings.HasSuffix(path, ".backup")
+       return !isdir && strings.HasSuffix(path, ".backup") && strings.HasPrefix(path, "data/backups_copy/dgraph.")
    })
    require.Equal(t, numExpectedFiles, len(files))

@@ -344,7 +413,7 @@
    require.Equal(t, numExpectedDirs, len(dirs))

    manifests := x.WalkPathFunc(copyBackupDir, func(path string, isdir bool) bool {
-       return !isdir && strings.Contains(path, "manifest.json")
+       return !isdir && strings.Contains(path, "manifest.json") && strings.HasPrefix(path, "data/backups_copy/dgraph.")
    })
    require.Equal(t, numExpectedDirs, len(manifests))

@@ -407,6 +476,7 @@ func dirCleanup(t *testing.T) {
    if err := os.RemoveAll(restoreDir); err != nil {
        t.Fatalf("Error removing directory: %s", err.Error())
    }
+
    if err := os.RemoveAll(copyBackupDir); err != nil {
        t.Fatalf("Error removing directory: %s", err.Error())
    }
@@ -417,6 +487,16 @@
        }
    }

+func copyOldBackupDir(t *testing.T) {
+    for i := 1; i < 4; i++ {
+        destPath := fmt.Sprintf("%s_alpha%d_1:/data", testutil.DockerPrefix, i)
+        srcPath := "." + oldBackupDir
+        if err := testutil.DockerCp(srcPath, destPath); err != nil {
+            t.Fatalf("Error copying files to docker container: %s", err.Error())
+        }
+    }
+}
+
func copyToLocalFs(t *testing.T) {
    // The original backup files are not accessible because docker creates all files in
    // the shared volume as the root user. This restriction is circumvented by using
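Both copyOldBackupDir above and copyToLocalFs move fixture files across the container boundary with testutil's docker copy helper; copyToLocalFs's truncated comment suggests the same mechanism is what works around the root-owned files in the shared volume. A rough sketch of what a DockerCp-style helper plausibly does, assuming it shells out to the docker CLI (the real testutil.DockerCp may differ in details):

package testutil

import "os/exec"

// dockerCp copies a file or directory between the host and a container by
// invoking `docker cp`. A destination such as "prefix_alpha1_1:/data" names a
// path inside the container with that name.
func dockerCp(srcPath, dstPath string) error {
    return exec.Command("docker", "cp", srcPath, dstPath).Run()
}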
1 change: 1 addition & 0 deletions (new backup manifest; file path not shown in this capture)
@@ -0,0 +1 @@
+{"type":"full","since":68,"groups":{"1":["dgraph.graphql.p_sha256hash","Author.pwd","dgraph.graphql.xid","dgraph.drop.op","dgraph.graphql.schema_history","dgraph.graphql.schema_created_at","dgraph.type","dgraph.graphql.schema","dgraph.cors","dgraph.graphql.p_query","Author.name","Person.name"],"2":[],"3":[]},"backup_id":"loving_jones5","backup_num":1,"encrypted":false,"drop_operations":null}
Binary file not shown.
Binary file not shown.
Binary file not shown.
67 changes: 5 additions & 62 deletions systest/online-restore/online_restore_test.go
@@ -74,63 +74,6 @@ func sendRestoreRequest(t *testing.T, location, backupId string, backupNum int)
    return restoreResp.Restore.RestoreId
}

-func waitForRestore(t *testing.T, restoreId int, dg *dgo.Dgraph) {
-    query := fmt.Sprintf(`query status() {
-        restoreStatus(restoreId: %d) {
-            status
-            errors
-        }
-    }`, restoreId)
-    params := testutil.GraphQLParams{
-        Query: query,
-    }
-    b, err := json.Marshal(params)
-    require.NoError(t, err)
-
-    restoreDone := false
-    client := testutil.GetHttpsClient(t)
-    for i := 0; i < 15; i++ {
-        resp, err := client.Post(testutil.AdminUrlHttps(), "application/json", bytes.NewBuffer(b))
-        require.NoError(t, err)
-        buf, err := ioutil.ReadAll(resp.Body)
-        require.NoError(t, err)
-        sbuf := string(buf)
-        if strings.Contains(sbuf, "OK") {
-            restoreDone = true
-            break
-        }
-        time.Sleep(4 * time.Second)
-    }
-    require.True(t, restoreDone)
-
-    // Wait for the client to exit draining mode. This is needed because the client might
-    // be connected to a follower and might be behind the leader in applying the restore.
-    // Waiting for three consecutive successful queries is done to prevent a situation in
-    // which the query succeeds at the first attempt because the follower is behind and
-    // has not started to apply the restore proposal.
-    numSuccess := 0
-    for {
-        // This is a dummy query that returns no results.
-        _, err = dg.NewTxn().Query(context.Background(), `{
-            q(func: has(invalid_pred)) {
-                invalid_pred
-            }}`)
-
-        if err == nil {
-            numSuccess += 1
-        } else {
-            require.Contains(t, err.Error(), "the server is in draining mode")
-            numSuccess = 0
-        }
-
-        if numSuccess == 3 {
-            // The server has been responsive three times in a row.
-            break
-        }
-        time.Sleep(1 * time.Second)
-    }
-}
-
// disableDraining disables draining mode before each test for increased reliability.
func disableDraining(t *testing.T) {
    drainRequest := `mutation draining {
@@ -237,7 +180,7 @@ func TestBasicRestore(t *testing.T) {
    require.NoError(t, dg.Alter(ctx, &api.Operation{DropAll: true}))

    restoreId := sendRestoreRequest(t, "", "youthful_rhodes3", 0)
-   waitForRestore(t, restoreId, dg)
+   testutil.WaitForRestore(t, restoreId, dg)
    runQueries(t, dg, false)
    runMutations(t, dg)
}
@@ -254,7 +197,7 @@
    runQueries(t, dg, true)

    restoreId := sendRestoreRequest(t, "", "youthful_rhodes3", 1)
-   waitForRestore(t, restoreId, dg)
+   testutil.WaitForRestore(t, restoreId, dg)
    runQueries(t, dg, true)
    runMutations(t, dg)
}
@@ -329,13 +272,13 @@ func TestMoveTablets(t *testing.T) {
    require.NoError(t, dg.Alter(ctx, &api.Operation{DropAll: true}))

    restoreId := sendRestoreRequest(t, "", "youthful_rhodes3", 0)
-   waitForRestore(t, restoreId, dg)
+   testutil.WaitForRestore(t, restoreId, dg)
    runQueries(t, dg, false)

    // Send another restore request with a different backup. This backup has some of the
    // same predicates as the previous one but they are stored in different groups.
    restoreId = sendRestoreRequest(t, "", "blissful_hermann1", 0)
-   waitForRestore(t, restoreId, dg)
+   testutil.WaitForRestore(t, restoreId, dg)

    resp, err := dg.NewTxn().Query(context.Background(), `{
        q(func: has(name), orderasc: name) {
@@ -628,7 +571,7 @@ func backupRestoreAndVerify(t *testing.T, dg *dgo.Dgraph, backupDir, queryToVerify,
    expectedResponse string, schemaVerificationOpts testutil.SchemaOptions) {
    schemaVerificationOpts.ExcludeAclSchema = true
    backup(t, backupDir)
-   waitForRestore(t, sendRestoreRequest(t, backupDir, "", 0), dg)
+   testutil.WaitForRestore(t, sendRestoreRequest(t, backupDir, "", 0), dg)
    testutil.VerifyQueryResponse(t, dg, queryToVerify, expectedResponse)
    testutil.VerifySchema(t, dg, schemaVerificationOpts)
}
64 changes: 64 additions & 0 deletions testutil/backup.go
@@ -17,10 +17,17 @@
package testutil

import (
+   "bytes"
+   "context"
+   "encoding/json"
    "fmt"
+   "io/ioutil"
+   "strings"
    "testing"
+   "time"

    "github.com/dgraph-io/badger/v2"
+   "github.com/dgraph-io/dgo/v200"
    "github.com/dgraph-io/dgraph/ee/enc"
    "github.com/dgraph-io/dgraph/posting"
    "github.com/dgraph-io/dgraph/protos/pb"
@@ -56,6 +63,63 @@ func openDgraph(pdir string) (*badger.DB, error) {
    return badger.OpenManaged(opt)
}

+// WaitForRestore blocks until the restore identified by restoreId reports an
+// OK status and the cluster has exited draining mode.
+func WaitForRestore(t *testing.T, restoreId int, dg *dgo.Dgraph) {
+    query := fmt.Sprintf(`query status() {
+        restoreStatus(restoreId: %d) {
+            status
+            errors
+        }
+    }`, restoreId)
+    params := GraphQLParams{
+        Query: query,
+    }
+    b, err := json.Marshal(params)
+    require.NoError(t, err)
+
+    restoreDone := false
+    client := GetHttpsClient(t)
+    for i := 0; i < 15; i++ {
+        resp, err := client.Post(AdminUrlHttps(), "application/json", bytes.NewBuffer(b))
+        require.NoError(t, err)
+        buf, err := ioutil.ReadAll(resp.Body)
+        require.NoError(t, err)
+        sbuf := string(buf)
+        if strings.Contains(sbuf, "OK") {
+            restoreDone = true
+            break
+        }
+        time.Sleep(4 * time.Second)
+    }
+    require.True(t, restoreDone)
+
+    // Wait for the client to exit draining mode. This is needed because the client might
+    // be connected to a follower and might be behind the leader in applying the restore.
+    // Waiting for three consecutive successful queries is done to prevent a situation in
+    // which the query succeeds at the first attempt because the follower is behind and
+    // has not started to apply the restore proposal.
+    numSuccess := 0
+    for {
+        // This is a dummy query that returns no results.
+        _, err = dg.NewTxn().Query(context.Background(), `{
+            q(func: has(invalid_pred)) {
+                invalid_pred
+            }}`)
+
+        if err == nil {
+            numSuccess += 1
+        } else {
+            require.Contains(t, err.Error(), "the server is in draining mode")
+            numSuccess = 0
+        }
+
+        if numSuccess == 3 {
+            // The server has been responsive three times in a row.
+            break
+        }
+        time.Sleep(1 * time.Second)
+    }
+}
+
// GetPredicateValues reads the specified p directory and returns the values for the given
// attribute in a map.
func GetPredicateValues(pdir, attr string, readTs uint64) (map[string]string, error) {
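The second loop in WaitForRestore encodes a reusable idea: one successful query is not proof the restore has been applied, because the client may be talking to a follower that has not yet started applying the restore proposal, so the helper insists on three consecutive successes. The same pattern as a standalone, hypothetical helper (simplified; it drops the draining-mode error assertion the real code makes):

// waitForStable retries check until it succeeds `needed` times in a row, so a
// single lucky success against a lagging follower does not end the wait early.
func waitForStable(needed int, interval time.Duration, check func() error) {
    for streak := 0; streak < needed; {
        if check() == nil {
            streak++ // one more consecutive success
        } else {
            streak = 0 // any failure resets the run
        }
        if streak < needed {
            time.Sleep(interval)
        }
    }
}

With this helper, the tail of WaitForRestore amounts to waitForStable(3, time.Second, func() error { _, err := dg.NewTxn().Query(ctx, dummyQuery); return err }).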
10 changes: 7 additions & 3 deletions worker/restore.go
@@ -186,10 +186,14 @@ func loadFromBackup(db *badger.DB, r io.Reader, restoreTs uint64, preds predicat
                // part without rolling the key first. This part is here for backwards
                // compatibility. New backups are not affected because there was a change
                // to roll up lists into a single one.
-               kv := posting.MarshalPostingList(pl, nil)
+               newKv := posting.MarshalPostingList(pl, nil)
                codec.FreePack(pl.Pack)
-               kv.Key = restoreKey
-               if err := loader.Set(kv); err != nil {
+               newKv.Key = restoreKey
+               // Use the version of the KV before we marshalled the
+               // posting list. The MarshalPostingList function returns a KV
+               // with a zero version.
+               newKv.Version = kv.Version
+               if err := loader.Set(newKv); err != nil {
                    return 0, err
                }
            } else {
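Why copying the version matters: Dgraph opens Badger in managed mode (see openDgraph in testutil/backup.go above, which calls badger.OpenManaged), and in managed mode an entry's Version is its commit timestamp. Before this fix, the restored posting lists were written with the zero Version that MarshalPostingList produces, so their original timestamps were lost; a later incremental backup, which plausibly selects only entries with a version above its last `since` watermark, would then skip them silently. That is the failure mode TestBackupOfOldRestore guards against. A self-contained sketch of managed-mode version semantics (badger v2 API; illustrative, not Dgraph code):

package main

import (
    "fmt"

    "github.com/dgraph-io/badger/v2"
)

func main() {
    // In managed mode the caller supplies all timestamps, and an entry's
    // Version is simply the timestamp it was committed at.
    db, err := badger.OpenManaged(badger.DefaultOptions("").WithInMemory(true))
    if err != nil {
        panic(err)
    }
    defer db.Close()

    // Commit a key at timestamp 5: its stored Version becomes 5.
    txn := db.NewTransactionAt(5, true)
    if err := txn.Set([]byte("key"), []byte("value")); err != nil {
        panic(err)
    }
    if err := txn.CommitAt(5, nil); err != nil {
        panic(err)
    }

    // A reader at timestamp 10 sees the entry together with its version.
    rtxn := db.NewTransactionAt(10, false)
    defer rtxn.Discard()
    item, err := rtxn.Get([]byte("key"))
    if err != nil {
        panic(err)
    }
    fmt.Println("version:", item.Version()) // prints: version: 5

    // Had the entry been written with Version 0, any "version > since"
    // filter would leave it out of an incremental backup.
}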