diff --git a/posting/list.go b/posting/list.go
index e03aca21f44..e8d7eb0e83a 100644
--- a/posting/list.go
+++ b/posting/list.go
@@ -918,6 +918,8 @@ func (out *rollupOutput) marshalPostingListPart(alloc *z.Allocator,
 	return kv, nil
 }
 
+// MarshalPostingList returns a KV with the marshalled posting list. The caller
+// must set the Key and Version on the returned KV.
 func MarshalPostingList(plist *pb.PostingList, alloc *z.Allocator) *bpb.KV {
 	kv := y.NewKV(alloc)
 	if isPlistEmpty(plist) {
diff --git a/systest/backup/filesystem/backup_test.go b/systest/backup/filesystem/backup_test.go
index 67a60a387bb..ac306ba888d 100644
--- a/systest/backup/filesystem/backup_test.go
+++ b/systest/backup/filesystem/backup_test.go
@@ -42,12 +42,11 @@ import (
 )
 
 var (
-	copyBackupDir = "./data/backups_copy"
-	restoreDir    = "./data/restore"
-	testDirs      = []string{restoreDir}
-
-	alphaBackupDir = "/data/backups"
-
+	copyBackupDir   = "./data/backups_copy"
+	restoreDir      = "./data/restore"
+	testDirs        = []string{restoreDir}
+	alphaBackupDir  = "/data/backups"
+	oldBackupDir    = "/data/to_restore"
 	alphaContainers = []string{
 		"alpha1",
 		"alpha2",
@@ -55,6 +54,76 @@ var (
 	}
 )
 
+func sendRestoreRequest(t *testing.T, location string) int {
+	if location == "" {
+		location = alphaBackupDir
+	}
+	params := testutil.GraphQLParams{
+		Query: `mutation restore($location: String!) {
+			restore(input: {location: $location}) {
+				code
+				message
+				restoreId
+			}
+		}`,
+		Variables: map[string]interface{}{
+			"location": location,
+		},
+	}
+	resp := testutil.MakeGQLRequestWithTLS(t, &params, testutil.GetAlphaClientConfig(t))
+	resp.RequireNoGraphQLErrors(t)
+
+	var restoreResp struct {
+		Restore struct {
+			Code      string
+			Message   string
+			RestoreId int
+		}
+	}
+
+	require.NoError(t, json.Unmarshal(resp.Data, &restoreResp))
+	require.Equal(t, "Success", restoreResp.Restore.Code)
+	require.Greater(t, restoreResp.Restore.RestoreId, 0)
+	return restoreResp.Restore.RestoreId
+}
+
+// TestBackupOfOldRestore takes a full backup, restores an old backup into the
+// cluster, and then takes an incremental backup on top of it. Finally, it
+// cleans up the cluster and restores the backup series created above.
+// Regression test for DGRAPH-2775.
+func TestBackupOfOldRestore(t *testing.T) {
+	dirSetup(t)
+	copyOldBackupDir(t)
+
+	conn, err := grpc.Dial(testutil.SockAddr,
+		grpc.WithTransportCredentials(credentials.NewTLS(testutil.GetAlphaClientConfig(t))))
+	require.NoError(t, err)
+	dg := dgo.NewDgraphClient(api.NewDgraphClient(conn))
+
+	testutil.DropAll(t, dg)
+	time.Sleep(2 * time.Second)
+
+	_ = runBackup(t, 3, 1)
+
+	restoreId := sendRestoreRequest(t, oldBackupDir)
+	testutil.WaitForRestore(t, restoreId, dg)
+
+	resp, err := dg.NewTxn().Query(context.Background(), `{ authors(func: has(Author.name)) { count(uid) } }`)
+	require.NoError(t, err)
+	require.JSONEq(t, `{"authors":[{"count":1}]}`, string(resp.Json))
+
+	_ = runBackup(t, 6, 2)
+
+	// Clean the cluster and try restoring the backups created above.
+	testutil.DropAll(t, dg)
+	time.Sleep(2 * time.Second)
+	restoreId = sendRestoreRequest(t, alphaBackupDir)
+	testutil.WaitForRestore(t, restoreId, dg)
+
+	resp, err = dg.NewTxn().Query(context.Background(), `{ authors(func: has(Author.name)) { count(uid) } }`)
+	require.NoError(t, err)
+	require.JSONEq(t, `{"authors":[{"count":1}]}`, string(resp.Json))
+}
+
 func TestBackupFilesystem(t *testing.T) {
 	conn, err := grpc.Dial(testutil.SockAddr,
 		grpc.WithTransportCredentials(credentials.NewTLS(testutil.GetAlphaClientConfig(t))))
 	require.NoError(t, err)
@@ -112,8 +181,8 @@ func TestBackupFilesystem(t *testing.T) {
 			break
 		}
 	}
-	require.True(t, moveOk)
+	require.True(t, moveOk)
 
 	// Setup test directories.
 	dirSetup(t)
@@ -334,7 +403,7 @@ func runBackupInternal(t *testing.T, forceFull bool, numExpectedFiles,
 	copyToLocalFs(t)
 
 	files := x.WalkPathFunc(copyBackupDir, func(path string, isdir bool) bool {
-		return !isdir && strings.HasSuffix(path, ".backup")
+		return !isdir && strings.HasSuffix(path, ".backup") && strings.HasPrefix(path, "data/backups_copy/dgraph.")
 	})
 	require.Equal(t, numExpectedFiles, len(files))
 
@@ -344,7 +413,7 @@ func runBackupInternal(t *testing.T, forceFull bool, numExpectedFiles,
 	require.Equal(t, numExpectedDirs, len(dirs))
 
 	manifests := x.WalkPathFunc(copyBackupDir, func(path string, isdir bool) bool {
-		return !isdir && strings.Contains(path, "manifest.json")
+		return !isdir && strings.Contains(path, "manifest.json") && strings.HasPrefix(path, "data/backups_copy/dgraph.")
 	})
 	require.Equal(t, numExpectedDirs, len(manifests))
@@ -407,6 +476,7 @@ func dirCleanup(t *testing.T) {
 	if err := os.RemoveAll(restoreDir); err != nil {
 		t.Fatalf("Error removing directory: %s", err.Error())
 	}
+
 	if err := os.RemoveAll(copyBackupDir); err != nil {
 		t.Fatalf("Error removing directory: %s", err.Error())
 	}
@@ -417,6 +487,16 @@ func dirCleanup(t *testing.T) {
 	}
 }
 
+func copyOldBackupDir(t *testing.T) {
+	for i := 1; i < 4; i++ {
+		destPath := fmt.Sprintf("%s_alpha%d_1:/data", testutil.DockerPrefix, i)
+		srcPath := "." + oldBackupDir
+		if err := testutil.DockerCp(srcPath, destPath); err != nil {
+			t.Fatalf("Error copying files to docker container: %s", err.Error())
+		}
+	}
+}
+
 func copyToLocalFs(t *testing.T) {
 	// The original backup files are not accessible because docker creates all files in
 	// the shared volume as the root user. This restriction is circumvented by using
diff --git a/systest/backup/filesystem/data/backups/.gitkeep b/systest/backup/filesystem/data/backups/.gitkeep
deleted file mode 100644
index e69de29bb2d..00000000000
diff --git a/systest/backup/filesystem/data/to_restore/dgraph.20201125.173944.587/manifest.json b/systest/backup/filesystem/data/to_restore/dgraph.20201125.173944.587/manifest.json
new file mode 100644
index 00000000000..4c3c669b8e0
--- /dev/null
+++ b/systest/backup/filesystem/data/to_restore/dgraph.20201125.173944.587/manifest.json
@@ -0,0 +1 @@
+{"type":"full","since":68,"groups":{"1":["dgraph.graphql.p_sha256hash","Author.pwd","dgraph.graphql.xid","dgraph.drop.op","dgraph.graphql.schema_history","dgraph.graphql.schema_created_at","dgraph.type","dgraph.graphql.schema","dgraph.cors","dgraph.graphql.p_query","Author.name","Person.name"],"2":[],"3":[]},"backup_id":"loving_jones5","backup_num":1,"encrypted":false,"drop_operations":null}
diff --git a/systest/backup/filesystem/data/to_restore/dgraph.20201125.173944.587/r68-g1.backup b/systest/backup/filesystem/data/to_restore/dgraph.20201125.173944.587/r68-g1.backup
new file mode 100644
index 00000000000..5542b2215d5
Binary files /dev/null and b/systest/backup/filesystem/data/to_restore/dgraph.20201125.173944.587/r68-g1.backup differ
diff --git a/systest/backup/filesystem/data/to_restore/dgraph.20201125.173944.587/r68-g2.backup b/systest/backup/filesystem/data/to_restore/dgraph.20201125.173944.587/r68-g2.backup
new file mode 100644
index 00000000000..e204447fdfb
Binary files /dev/null and b/systest/backup/filesystem/data/to_restore/dgraph.20201125.173944.587/r68-g2.backup differ
diff --git a/systest/backup/filesystem/data/to_restore/dgraph.20201125.173944.587/r68-g3.backup b/systest/backup/filesystem/data/to_restore/dgraph.20201125.173944.587/r68-g3.backup
new file mode 100644
index 00000000000..0ce8592f881
Binary files /dev/null and b/systest/backup/filesystem/data/to_restore/dgraph.20201125.173944.587/r68-g3.backup differ
diff --git a/systest/online-restore/online_restore_test.go b/systest/online-restore/online_restore_test.go
index 4380f9542eb..7625e98c855 100644
--- a/systest/online-restore/online_restore_test.go
+++ b/systest/online-restore/online_restore_test.go
@@ -74,63 +74,6 @@ func sendRestoreRequest(t *testing.T, location, backupId string, backupNum int) int {
 	return restoreResp.Restore.RestoreId
 }
 
-func waitForRestore(t *testing.T, restoreId int, dg *dgo.Dgraph) {
-	query := fmt.Sprintf(`query status() {
-		restoreStatus(restoreId: %d) {
-			status
-			errors
-		}
-	}`, restoreId)
-	params := testutil.GraphQLParams{
-		Query: query,
-	}
-	b, err := json.Marshal(params)
-	require.NoError(t, err)
-
-	restoreDone := false
-	client := testutil.GetHttpsClient(t)
-	for i := 0; i < 15; i++ {
-		resp, err := client.Post(testutil.AdminUrlHttps(), "application/json", bytes.NewBuffer(b))
-		require.NoError(t, err)
-		buf, err := ioutil.ReadAll(resp.Body)
-		require.NoError(t, err)
-		sbuf := string(buf)
-		if strings.Contains(sbuf, "OK") {
-			restoreDone = true
-			break
-		}
-		time.Sleep(4 * time.Second)
-	}
-	require.True(t, restoreDone)
-
-	// Wait for the client to exit draining mode. This is needed because the client might
-	// be connected to a follower and might be behind the leader in applying the restore.
-	// Waiting for three consecutive successful queries is done to prevent a situation in
-	// which the query succeeds at the first attempt because the follower is behind and
-	// has not started to apply the restore proposal.
-	numSuccess := 0
-	for {
-		// This is a dummy query that returns no results.
-		_, err = dg.NewTxn().Query(context.Background(), `{
-		q(func: has(invalid_pred)) {
-			invalid_pred
-		}}`)
-
-		if err == nil {
-			numSuccess += 1
-		} else {
-			require.Contains(t, err.Error(), "the server is in draining mode")
-			numSuccess = 0
-		}
-
-		if numSuccess == 3 {
-			// The server has been responsive three times in a row.
-			break
-		}
-		time.Sleep(1 * time.Second)
-	}
-}
-
 // disableDraining disables draining mode before each test for increased reliability.
 func disableDraining(t *testing.T) {
 	drainRequest := `mutation draining {
@@ -237,7 +180,7 @@ func TestBasicRestore(t *testing.T) {
 	require.NoError(t, dg.Alter(ctx, &api.Operation{DropAll: true}))
 
 	restoreId := sendRestoreRequest(t, "", "youthful_rhodes3", 0)
-	waitForRestore(t, restoreId, dg)
+	testutil.WaitForRestore(t, restoreId, dg)
 	runQueries(t, dg, false)
 	runMutations(t, dg)
 }
@@ -254,7 +197,7 @@ func TestRestoreBackupNum(t *testing.T) {
 	runQueries(t, dg, true)
 
 	restoreId := sendRestoreRequest(t, "", "youthful_rhodes3", 1)
-	waitForRestore(t, restoreId, dg)
+	testutil.WaitForRestore(t, restoreId, dg)
 	runQueries(t, dg, true)
 	runMutations(t, dg)
 }
@@ -329,13 +272,13 @@ func TestMoveTablets(t *testing.T) {
 	require.NoError(t, dg.Alter(ctx, &api.Operation{DropAll: true}))
 
 	restoreId := sendRestoreRequest(t, "", "youthful_rhodes3", 0)
-	waitForRestore(t, restoreId, dg)
+	testutil.WaitForRestore(t, restoreId, dg)
 	runQueries(t, dg, false)
 
 	// Send another restore request with a different backup. This backup has some of the
 	// same predicates as the previous one but they are stored in different groups.
 	restoreId = sendRestoreRequest(t, "", "blissful_hermann1", 0)
-	waitForRestore(t, restoreId, dg)
+	testutil.WaitForRestore(t, restoreId, dg)
 
 	resp, err := dg.NewTxn().Query(context.Background(), `{
 	  q(func: has(name), orderasc: name) {
@@ -628,7 +571,7 @@ func backupRestoreAndVerify(t *testing.T, dg *dgo.Dgraph, backupDir, queryToVerify,
 	expectedResponse string, schemaVerificationOpts testutil.SchemaOptions) {
 	schemaVerificationOpts.ExcludeAclSchema = true
 	backup(t, backupDir)
-	waitForRestore(t, sendRestoreRequest(t, backupDir, "", 0), dg)
+	testutil.WaitForRestore(t, sendRestoreRequest(t, backupDir, "", 0), dg)
 	testutil.VerifyQueryResponse(t, dg, queryToVerify, expectedResponse)
 	testutil.VerifySchema(t, dg, schemaVerificationOpts)
 }
diff --git a/testutil/backup.go b/testutil/backup.go
index ea35c195548..7f8a18f9c71 100644
--- a/testutil/backup.go
+++ b/testutil/backup.go
@@ -17,10 +17,17 @@ package testutil
 
 import (
+	"bytes"
+	"context"
+	"encoding/json"
 	"fmt"
+	"io/ioutil"
+	"strings"
 	"testing"
+	"time"
 
 	"github.com/dgraph-io/badger/v2"
+	"github.com/dgraph-io/dgo/v200"
 	"github.com/dgraph-io/dgraph/ee/enc"
 	"github.com/dgraph-io/dgraph/posting"
 	"github.com/dgraph-io/dgraph/protos/pb"
@@ -56,6 +63,65 @@ func openDgraph(pdir string) (*badger.DB, error) {
 	return badger.OpenManaged(opt)
 }
 
+// WaitForRestore polls the admin endpoint until the restore with the given ID
+// reports an OK status, then blocks until the cluster is answering queries again.
+func WaitForRestore(t *testing.T, restoreId int, dg *dgo.Dgraph) {
+	query := fmt.Sprintf(`query status() {
+		restoreStatus(restoreId: %d) {
+			status
+			errors
+		}
+	}`, restoreId)
+	params := GraphQLParams{
+		Query: query,
+	}
+	b, err := json.Marshal(params)
+	require.NoError(t, err)
+
+	restoreDone := false
+	client := GetHttpsClient(t)
+	for i := 0; i < 15; i++ {
+		resp, err := client.Post(AdminUrlHttps(), "application/json", bytes.NewBuffer(b))
+		require.NoError(t, err)
+		buf, err := ioutil.ReadAll(resp.Body)
+		require.NoError(t, err)
+		sbuf := string(buf)
+		if strings.Contains(sbuf, "OK") {
+			restoreDone = true
+			break
+		}
+		time.Sleep(4 * time.Second)
+	}
+	require.True(t, restoreDone)
+
+	// Wait for the server to exit draining mode. This is needed because the client
+	// might be connected to a follower that is behind the leader in applying the
+	// restore. Waiting for three consecutive successful queries prevents a situation
+	// in which the query succeeds at the first attempt because the follower is
+	// behind and has not started to apply the restore proposal.
+	numSuccess := 0
+	for {
+		// This is a dummy query that returns no results.
+		_, err = dg.NewTxn().Query(context.Background(), `{
+		q(func: has(invalid_pred)) {
+			invalid_pred
+		}}`)
+
+		if err == nil {
+			numSuccess += 1
+		} else {
+			require.Contains(t, err.Error(), "the server is in draining mode")
+			numSuccess = 0
+		}
+
+		if numSuccess == 3 {
+			// The server has been responsive three times in a row.
+			break
+		}
+		time.Sleep(1 * time.Second)
+	}
+}
+
 // GetPredicateValues reads the specified p directory and returns the values for the given
 // attribute in a map.
 func GetPredicateValues(pdir, attr string, readTs uint64) (map[string]string, error) {
diff --git a/worker/restore.go b/worker/restore.go
index debcd80bbcc..e824190a893 100644
--- a/worker/restore.go
+++ b/worker/restore.go
@@ -186,10 +186,14 @@ func loadFromBackup(db *badger.DB, r io.Reader, restoreTs uint64, preds predicateSet) (uint64, error) {
 			// part without rolling the key first. This part is here for backwards
 			// compatibility. New backups are not affected because there was a change
 			// to roll up lists into a single one.
-			kv := posting.MarshalPostingList(pl, nil)
+			newKv := posting.MarshalPostingList(pl, nil)
 			codec.FreePack(pl.Pack)
-			kv.Key = restoreKey
-			if err := loader.Set(kv); err != nil {
+			newKv.Key = restoreKey
+			// Use the version of the KV before we marshalled the posting
+			// list: MarshalPostingList returns a KV with a zero version,
+			// so the caller must set it.
+			newKv.Version = kv.Version
+			if err := loader.Set(newKv); err != nil {
 				return 0, err
 			}
 		} else {
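
Note on the worker/restore.go hunk above: posting.MarshalPostingList builds a fresh KV, so the returned KV carries a zero version, and the restore path has to copy the version of the KV it read from the backup onto the re-marshalled one. The following is a minimal, self-contained sketch of that pattern; KV and marshalList here are stand-ins for Dgraph's bpb.KV and posting.MarshalPostingList, not the real types.

```go
// A toy illustration (stand-in types, not Dgraph's real API) of why the
// restore path must carry the original KV's version over to the KV that
// re-marshalling produces.
package main

import "fmt"

// KV mimics bpb.KV: a key/value pair stamped with a commit version.
type KV struct {
	Key     []byte
	Value   []byte
	Version uint64
}

// marshalList mimics posting.MarshalPostingList: it builds a fresh KV
// holding the marshalled list and leaves Version at its zero value, so
// the caller is responsible for setting Key and Version.
func marshalList(plist []byte) *KV {
	return &KV{Value: append([]byte(nil), plist...)}
}

func main() {
	// kv plays the role of the entry read from an old-format backup;
	// its Version (the commit timestamp) must survive the rewrite.
	kv := &KV{Key: []byte("Author.name"), Value: []byte("old multi-part list"), Version: 68}

	newKv := marshalList(kv.Value)
	newKv.Key = kv.Key
	// Without this line, newKv.Version would stay 0 and the entry's
	// commit timestamp would be lost on restore.
	newKv.Version = kv.Version

	fmt.Printf("restoring %q at version %d\n", newKv.Key, newKv.Version)
}
```

Dropping the `newKv.Version = kv.Version` line reproduces the kind of version loss that the TestBackupOfOldRestore regression test above is meant to catch.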