From 72cebd1349e21f48e756c9ba9df4cbe98017ae9a Mon Sep 17 00:00:00 2001 From: Naman Jain Date: Wed, 14 Apr 2021 00:18:49 +0530 Subject: [PATCH] fix(raftwal): take snapshot after restore (#7719) We should propose a snapshot immediately after the restore to prevent the restore from being replayed. Earlier we were trying to do that too. Snapshot happens after the restore but that does not actually include the restore proposal as it has not been applied yet (n.Applied.Done() is not yet called for that proposal). Now, we will first wait for the raft entry to be marked as done and then try to propose the snapshot. --- systest/online-restore/online_restore_test.go | 38 +++++++++++++++++++ worker/draft.go | 4 +- worker/online_restore.go | 18 +++++++-- worker/online_restore_oss.go | 2 +- 4 files changed, 56 insertions(+), 6 deletions(-) diff --git a/systest/online-restore/online_restore_test.go b/systest/online-restore/online_restore_test.go index 433590974c7..6ce373adde4 100644 --- a/systest/online-restore/online_restore_test.go +++ b/systest/online-restore/online_restore_test.go @@ -32,11 +32,13 @@ import ( "github.com/dgraph-io/dgo/v210" "github.com/dgraph-io/dgo/v210/protos/api" + "github.com/graph-gophers/graphql-go/errors" "github.com/stretchr/testify/require" "google.golang.org/grpc" "github.com/dgraph-io/dgraph/chunker" "github.com/dgraph-io/dgraph/testutil" + "github.com/dgraph-io/dgraph/x" ) func sendRestoreRequest(t *testing.T, location, backupId string, backupNum int) { @@ -96,6 +98,34 @@ func disableDraining(t *testing.T) { require.Contains(t, string(buf), "draining mode has been set to false") } +func getSnapshotTs(t *testing.T) uint64 { + snapTsRequest := `query { + state { + groups{ + id + snapshotTs + } + } + }` + + params := testutil.GraphQLParams{ + Query: snapTsRequest, + } + resp := testutil.MakeGQLRequestWithTLS(t, ¶ms, testutil.GetAlphaClientConfig(t)) + resp.RequireNoGraphQLErrors(t) + + var stateResp struct { + State struct { + Groups []struct { + SnapshotTs uint64 + } + } + } + require.NoError(t, json.Unmarshal(resp.Data, &stateResp)) + require.GreaterOrEqual(t, len(stateResp.State.Groups), 1) + return stateResp.State.Groups[0].SnapshotTs +} + func runQueries(t *testing.T, dg *dgo.Dgraph, shouldFail bool) { _, thisFile, _, _ := runtime.Caller(0) queryDir := filepath.Join(filepath.Dir(thisFile), "queries") @@ -177,8 +207,16 @@ func TestBasicRestore(t *testing.T) { ctx := context.Background() require.NoError(t, dg.Alter(ctx, &api.Operation{DropAll: true})) + snapshotTs := getSnapshotTs(t) sendRestoreRequest(t, "", "youthful_rhodes3", 0) testutil.WaitForRestore(t, dg) + // Snapshot must be taken just after the restore and hence the snapshotTs be updated. + require.NoError(t, x.RetryUntilSuccess(3, 1*time.Second, func() error { + if getSnapshotTs(t) <= snapshotTs { + return errors.Errorf("snapshot not taken after restore") + } + return nil + })) runQueries(t, dg, false) runMutations(t, dg) } diff --git a/worker/draft.go b/worker/draft.go index 5eaa3cff727..51abb58ec5a 100644 --- a/worker/draft.go +++ b/worker/draft.go @@ -752,7 +752,9 @@ func (n *node) applyCommitted(proposal *pb.Proposal) error { } defer closer.Done() - if err := handleRestoreProposal(ctx, proposal.Restore); err != nil { + glog.Infof("Got restore proposal at Index:%d, ReadTs:%d", + proposal.Index, proposal.Restore.RestoreTs) + if err := handleRestoreProposal(ctx, proposal.Restore, proposal.Index); err != nil { return err } diff --git a/worker/online_restore.go b/worker/online_restore.go index 695db783547..fd647c7f0f4 100644 --- a/worker/online_restore.go +++ b/worker/online_restore.go @@ -266,7 +266,7 @@ func (w *grpcWorker) Restore(ctx context.Context, req *pb.RestoreRequest) (*pb.S } // TODO(DGRAPH-1232): Ensure all groups receive the restore proposal. -func handleRestoreProposal(ctx context.Context, req *pb.RestoreRequest) error { +func handleRestoreProposal(ctx context.Context, req *pb.RestoreRequest, pidx uint64) error { if req == nil { return errors.Errorf("nil restore request") } @@ -364,9 +364,19 @@ func handleRestoreProposal(ctx context.Context, req *pb.RestoreRequest) error { // Propose a snapshot immediately after all the work is done to prevent the restore // from being replayed. - if err := groups().Node.proposeSnapshot(); err != nil { - return errors.Wrapf(err, "cannot propose snapshot after processing restore proposal") - } + go func(idx uint64) { + n := groups().Node + if !n.AmLeader() { + return + } + if err := n.Applied.WaitForMark(context.Background(), idx); err != nil { + glog.Errorf("Error waiting for mark for index %d: %+v", idx, err) + return + } + if err := n.proposeSnapshot(); err != nil { + glog.Errorf("cannot propose snapshot after processing restore proposal %+v", err) + } + }(pidx) // Update the membership state to re-compute the group checksums. if err := UpdateMembershipState(ctx); err != nil { diff --git a/worker/online_restore_oss.go b/worker/online_restore_oss.go index 46b907feb36..ce1752d4ae1 100644 --- a/worker/online_restore_oss.go +++ b/worker/online_restore_oss.go @@ -38,6 +38,6 @@ func (w *grpcWorker) Restore(ctx context.Context, req *pb.RestoreRequest) (*pb.S return &pb.Status{}, x.ErrNotSupported } -func handleRestoreProposal(ctx context.Context, req *pb.RestoreRequest) error { +func handleRestoreProposal(ctx context.Context, req *pb.RestoreRequest, pidx uint64) error { return nil }