Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

opt(predMove): hot tablet move #7703

Merged
merged 6 commits into from
Apr 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 44 additions & 19 deletions dgraph/cmd/zero/tablet.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,21 @@ Design change:
• If you’re not the leader, don’t talk to zero.
• Let the leader send you updates via proposals.

Move:
MOVE:
• Dgraph zero would decide that G1 should not serve P, G2 should serve it.
• Zero would propose that G1 is read-only for predicate P. This would propagate to the cluster.

• Zero would tell G1 to move P to G2 (Endpoint: Zero → G1)

This would trigger G1 to get latest state. Wait for it.
G1 would propose this state to it’s followers.
• G1 after proposing would do a call to G2, and start streaming.
Phase I:
Zero would tell G1 to move P's data before timestamp T1 to G2 (Endpoint: Zero → G1)
• G1 would do a call to G2, and start streaming.
• Before G2 starts accepting, it should delete any current keys for P.
• It should tell Zero whether it succeeded or failed. (Endpoint: G1 → Zero)
• G1 should tell Zero whether it succeeded or failed. (Endpoint: G1 → Zero)
• If Phase I succeeds, proceed with Phase II. Else, P would be served by G1.

Phase II:
• Zero would propose that G1 is read-only for predicate P. This would propagate to the cluster.
• Zero would tell G1 to move rest of the P's data (with T1<=ts<=T2) to G2 (Endpoint: Zero → G1)
• G2 acceps the data, but this time does not clean the current keys for P.
• G1 should tell Zero whether it succeeded or failed. (Endpoint G1 → Zero)
• Zero would then propose that G2 is serving P (or G1 is, if fail above) P would RW.
• G1 gets this, G2 gets this.
• Both propagate this to their followers.
Expand Down Expand Up @@ -149,18 +152,15 @@ func (s *Server) movePredicate(predicate string, srcGroup, dstGroup uint32) erro
if tab == nil {
return errors.Errorf("Tablet to be moved: [%v] is not being served", predicate)
}

// PHASE I:
msg := fmt.Sprintf("Going to move predicate: [%v], size: [ondisk: %v, uncompressed: %v]"+
" from group %d to %d\n", predicate, humanize.IBytes(uint64(tab.OnDiskBytes)),
humanize.IBytes(uint64(tab.UncompressedBytes)), srcGroup, dstGroup)
glog.Info(msg)
span.Annotate([]otrace.Attribute{otrace.StringAttribute("tablet", predicate)}, msg)

// Block all commits on this predicate. Keep them blocked until we return from this function.
unblock := s.blockTablet(predicate)
defer unblock()

// Get a new timestamp, beyond which we are sure that no new txns would be committed for this
// predicate. Source Alpha leader must reach this timestamp before streaming the data.
// Get a new timestamp. Source Alpha leader must reach this timestamp before streaming the data.
ids, err := s.Timestamps(ctx, &pb.Num{Val: 1})
if err != nil || ids.StartId == 0 {
return errors.Wrapf(err, "while leasing txn timestamp. Id: %+v", ids)
Expand All @@ -176,12 +176,37 @@ func (s *Server) movePredicate(predicate string, srcGroup, dstGroup uint32) erro
Predicate: predicate,
SourceGid: srcGroup,
DestGid: dstGroup,
TxnTs: ids.StartId,
ReadTs: ids.StartId,
SinceTs: 0,
}

// Move the predicate. Commits on this predicate are not blocked yet. Any data after ReadTs
// will be moved in the phase II below.
span.Annotatef(nil, "Starting move [1]: %+v", in)
glog.Infof("Starting move [1]: %+v", in)
if _, err := wc.MovePredicate(ctx, in); err != nil {
return errors.Wrapf(err, "while moving the majority of predicate")
}

// PHASE II:
// Block all commits on this predicate. Keep them blocked until we return from this function.
unblock := s.blockTablet(predicate)
defer unblock()

// Get a new timestamp, beyond which we are sure that no new txns would be committed for this
// predicate. Source Alpha leader must reach this timestamp before streaming the data.
ids, err = s.Timestamps(ctx, &pb.Num{Val: 1})
if err != nil || ids.StartId == 0 {
return errors.Wrapf(err, "while leasing txn timestamp. Id: %+v", ids)
}
span.Annotatef(nil, "Starting move: %+v", in)
glog.Infof("Starting move: %+v", in)

// We have done a majority of move. Now transfer rest of the data.
in.SinceTs = in.ReadTs
in.ReadTs = ids.StartId
span.Annotatef(nil, "Starting move [2]: %+v", in)
glog.Infof("Starting move [2]: %+v", in)
if _, err := wc.MovePredicate(ctx, in); err != nil {
return errors.Wrapf(err, "while calling MovePredicate")
return errors.Wrapf(err, "while moving the rest of the predicate")
}

p := &pb.ZeroProposal{}
Expand All @@ -191,7 +216,7 @@ func (s *Server) movePredicate(predicate string, srcGroup, dstGroup uint32) erro
OnDiskBytes: tab.OnDiskBytes,
UncompressedBytes: tab.UncompressedBytes,
Force: true,
MoveTs: in.TxnTs,
MoveTs: in.ReadTs,
}
msg = fmt.Sprintf("Move at Alpha done. Now proposing: %+v", p)
span.Annotate(nil, msg)
Expand Down
3 changes: 2 additions & 1 deletion protos/pb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -490,8 +490,9 @@ message MovePredicatePayload {
string predicate = 1;
uint32 source_gid = 2;
uint32 dest_gid = 3;
uint64 txn_ts = 4;
uint64 read_ts = 4;
uint64 expected_checksum = 5;
uint64 since_ts = 6;
}

message TxnStatus {
Expand Down
Loading