Skip to content

Commit

Permalink
fix(raftwal): take snapshot after restore (#7719)
Browse files Browse the repository at this point in the history
We should propose a snapshot immediately after the restore to prevent the restore from being replayed.
Earlier we tried to do this as well: a snapshot was proposed right after the restore, but it did not actually include the restore proposal, because that proposal had not yet been marked as applied (n.Applied.Done() had not yet been called for it).
Now we first wait for the restore's raft entry to be marked as done, and only then propose the snapshot.
  • Loading branch information
NamanJain8 authored Apr 13, 2021
1 parent c11a607 commit 72cebd1
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 6 deletions.
38 changes: 38 additions & 0 deletions systest/online-restore/online_restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@ import (

"github.com/dgraph-io/dgo/v210"
"github.com/dgraph-io/dgo/v210/protos/api"
"github.com/graph-gophers/graphql-go/errors"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"

"github.com/dgraph-io/dgraph/chunker"
"github.com/dgraph-io/dgraph/testutil"
"github.com/dgraph-io/dgraph/x"
)

func sendRestoreRequest(t *testing.T, location, backupId string, backupNum int) {
Expand Down Expand Up @@ -96,6 +98,34 @@ func disableDraining(t *testing.T) {
require.Contains(t, string(buf), "draining mode has been set to false")
}

// getSnapshotTs issues a GraphQL "state" query (via
// testutil.MakeGQLRequestWithTLS with the alpha client config) and returns
// the snapshotTs reported for the first group in the response.
//
// The test is failed through require if the response carries GraphQL errors,
// if the payload cannot be decoded, or if no groups are returned.
func getSnapshotTs(t *testing.T) uint64 {
	// The query text is kept exactly as sent by the original helper.
	const stateQuery = `query {
state {
groups{
id
snapshotTs
}
}
}`

	gqlResp := testutil.MakeGQLRequestWithTLS(
		t,
		&testutil.GraphQLParams{Query: stateQuery},
		testutil.GetAlphaClientConfig(t),
	)
	gqlResp.RequireNoGraphQLErrors(t)

	// Only the fields needed by the caller are decoded; "id" is ignored.
	type groupInfo struct {
		SnapshotTs uint64
	}
	var payload struct {
		State struct {
			Groups []groupInfo
		}
	}
	err := json.Unmarshal(gqlResp.Data, &payload)
	require.NoError(t, err)

	// Guard the index below: at least one group must be present.
	groups := payload.State.Groups
	require.GreaterOrEqual(t, len(groups), 1)
	return groups[0].SnapshotTs
}

func runQueries(t *testing.T, dg *dgo.Dgraph, shouldFail bool) {
_, thisFile, _, _ := runtime.Caller(0)
queryDir := filepath.Join(filepath.Dir(thisFile), "queries")
Expand Down Expand Up @@ -177,8 +207,16 @@ func TestBasicRestore(t *testing.T) {
ctx := context.Background()
require.NoError(t, dg.Alter(ctx, &api.Operation{DropAll: true}))

snapshotTs := getSnapshotTs(t)
sendRestoreRequest(t, "", "youthful_rhodes3", 0)
testutil.WaitForRestore(t, dg)
// A snapshot must be taken right after the restore, so the snapshotTs must have advanced.
require.NoError(t, x.RetryUntilSuccess(3, 1*time.Second, func() error {
if getSnapshotTs(t) <= snapshotTs {
return errors.Errorf("snapshot not taken after restore")
}
return nil
}))
runQueries(t, dg, false)
runMutations(t, dg)
}
Expand Down
4 changes: 3 additions & 1 deletion worker/draft.go
Original file line number Diff line number Diff line change
Expand Up @@ -752,7 +752,9 @@ func (n *node) applyCommitted(proposal *pb.Proposal) error {
}
defer closer.Done()

if err := handleRestoreProposal(ctx, proposal.Restore); err != nil {
glog.Infof("Got restore proposal at Index:%d, ReadTs:%d",
proposal.Index, proposal.Restore.RestoreTs)
if err := handleRestoreProposal(ctx, proposal.Restore, proposal.Index); err != nil {
return err
}

Expand Down
18 changes: 14 additions & 4 deletions worker/online_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ func (w *grpcWorker) Restore(ctx context.Context, req *pb.RestoreRequest) (*pb.S
}

// TODO(DGRAPH-1232): Ensure all groups receive the restore proposal.
func handleRestoreProposal(ctx context.Context, req *pb.RestoreRequest) error {
func handleRestoreProposal(ctx context.Context, req *pb.RestoreRequest, pidx uint64) error {
if req == nil {
return errors.Errorf("nil restore request")
}
Expand Down Expand Up @@ -364,9 +364,19 @@ func handleRestoreProposal(ctx context.Context, req *pb.RestoreRequest) error {

// Propose a snapshot immediately after all the work is done to prevent the restore
// from being replayed.
if err := groups().Node.proposeSnapshot(); err != nil {
return errors.Wrapf(err, "cannot propose snapshot after processing restore proposal")
}
go func(idx uint64) {
n := groups().Node
if !n.AmLeader() {
return
}
if err := n.Applied.WaitForMark(context.Background(), idx); err != nil {
glog.Errorf("Error waiting for mark for index %d: %+v", idx, err)
return
}
if err := n.proposeSnapshot(); err != nil {
glog.Errorf("cannot propose snapshot after processing restore proposal %+v", err)
}
}(pidx)

// Update the membership state to re-compute the group checksums.
if err := UpdateMembershipState(ctx); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion worker/online_restore_oss.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,6 @@ func (w *grpcWorker) Restore(ctx context.Context, req *pb.RestoreRequest) (*pb.S
return &pb.Status{}, x.ErrNotSupported
}

func handleRestoreProposal(ctx context.Context, req *pb.RestoreRequest) error {
func handleRestoreProposal(ctx context.Context, req *pb.RestoreRequest, pidx uint64) error {
return nil
}

0 comments on commit 72cebd1

Please sign in to comment.