Skip to content

Commit

Permalink
Merge #29684
Browse files Browse the repository at this point in the history
29684: storage: Avoid adding all replicas at once in RelocateRange r=a-robinson a=a-robinson

Does a more gradual process of adding one replica then removing one
until all the desired changes have been made, using the existing
allocator code to do so in a reasonable order.

Fixes #29130

Release note: None

-------------------

This is still a work-in-progress in the sense that it isn't really usable from the SQL code yet, since I just implemented it as a method on `Store`. If the approach looks reasonable, though, it can be converted it into an Admin API method.

Co-authored-by: Alex Robinson <alexdwanerobinson@gmail.com>
  • Loading branch information
craig[bot] and a-robinson committed Sep 20, 2018
2 parents 82a5348 + 4b28b0b commit fbd747c
Show file tree
Hide file tree
Showing 19 changed files with 1,806 additions and 1,042 deletions.
23 changes: 21 additions & 2 deletions pkg/internal/client/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,14 +243,15 @@ func (b *Batch) fillResults(ctx context.Context) {
reply, args)
}

// Nothing to do for all methods below as they do not generate
// any rows.
// Nothing to do for all methods below as they do not generate
// any rows.
case *roachpb.BeginTransactionRequest:
case *roachpb.EndTransactionRequest:
case *roachpb.AdminMergeRequest:
case *roachpb.AdminSplitRequest:
case *roachpb.AdminTransferLeaseRequest:
case *roachpb.AdminChangeReplicasRequest:
case *roachpb.AdminRelocateRangeRequest:
case *roachpb.HeartbeatTxnRequest:
case *roachpb.GCRequest:
case *roachpb.LeaseInfoRequest:
Expand Down Expand Up @@ -636,6 +637,24 @@ func (b *Batch) adminChangeReplicas(
b.initResult(1, 0, notRaw, nil)
}

// adminRelocateRange is only exported on DB. It is here for symmetry with the
// other operations.
func (b *Batch) adminRelocateRange(key interface{}, targets []roachpb.ReplicationTarget) {
k, err := marshalKey(key)
if err != nil {
b.initResult(0, 0, notRaw, err)
return
}
req := &roachpb.AdminRelocateRangeRequest{
RequestHeader: roachpb.RequestHeader{
Key: k,
},
Targets: targets,
}
b.appendReqs(req)
b.initResult(1, 0, notRaw, nil)
}

// writeBatch is only exported on DB.
func (b *Batch) writeBatch(s, e interface{}, data []byte) {
begin, err := marshalKey(s)
Expand Down
10 changes: 10 additions & 0 deletions pkg/internal/client/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,16 @@ func (db *DB) AdminChangeReplicas(
return getOneErr(db.Run(ctx, b), b)
}

// AdminRelocateRange relocates the replicas for a range onto the specified
// list of stores.
func (db *DB) AdminRelocateRange(
ctx context.Context, key interface{}, targets []roachpb.ReplicationTarget,
) error {
b := &Batch{}
b.adminRelocateRange(key, targets)
return getOneErr(db.Run(ctx, b), b)
}

// WriteBatch applies the operations encoded in a BatchRepr, which is the
// serialized form of a RocksDB Batch. The command cannot span Ranges and must
// be run on an empty keyrange.
Expand Down
10 changes: 10 additions & 0 deletions pkg/roachpb/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,9 @@ func (*AdminTransferLeaseRequest) Method() Method { return AdminTransferLease }
// Method implements the Request interface.
func (*AdminChangeReplicasRequest) Method() Method { return AdminChangeReplicas }

// Method implements the Request interface.
func (*AdminRelocateRangeRequest) Method() Method { return AdminRelocateRange }

// Method implements the Request interface.
func (*HeartbeatTxnRequest) Method() Method { return HeartbeatTxn }

Expand Down Expand Up @@ -680,6 +683,12 @@ func (acrr *AdminChangeReplicasRequest) ShallowCopy() Request {
return &shallowCopy
}

// ShallowCopy implements the Request interface.
func (acrr *AdminRelocateRangeRequest) ShallowCopy() Request {
shallowCopy := *acrr
return &shallowCopy
}

// ShallowCopy implements the Request interface.
func (htr *HeartbeatTxnRequest) ShallowCopy() Request {
shallowCopy := *htr
Expand Down Expand Up @@ -1020,6 +1029,7 @@ func (*AdminSplitRequest) flags() int { return isAdmin | isAlone }
func (*AdminMergeRequest) flags() int { return isAdmin | isAlone }
func (*AdminTransferLeaseRequest) flags() int { return isAdmin | isAlone }
func (*AdminChangeReplicasRequest) flags() int { return isAdmin | isAlone }
func (*AdminRelocateRangeRequest) flags() int { return isAdmin | isAlone }
func (*HeartbeatTxnRequest) flags() int { return isWrite | isTxn }
func (*GCRequest) flags() int { return isWrite | isRange }
func (*PushTxnRequest) flags() int { return isWrite | isAlone }
Expand Down
2,140 changes: 1,328 additions & 812 deletions pkg/roachpb/api.pb.go

Large diffs are not rendered by default.

17 changes: 17 additions & 0 deletions pkg/roachpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,21 @@ message AdminChangeReplicasResponse {
ResponseHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true];
}

// An AdminRelocateRangeRequest is the argument to the AdminRelocateRange()
// method. Relocates the replicas for a range to the specified target stores.
// The first store in the list of targets becomes the new leaseholder.
message AdminRelocateRangeRequest {
option (gogoproto.equal) = true;

RequestHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true];
repeated ReplicationTarget targets = 2 [(gogoproto.nullable) = false];
// TODO(a-robinson): Add "reason"/"details" string fields?
}

message AdminRelocateRangeResponse {
ResponseHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true];
}

// A HeartbeatTxnRequest is arguments to the HeartbeatTxn()
// method. It's sent by transaction coordinators to let the system
// know that the transaction is still ongoing. Note that this
Expand Down Expand Up @@ -1349,6 +1364,7 @@ message RequestUnion {
AdminMergeRequest admin_merge = 11;
AdminTransferLeaseRequest admin_transfer_lease = 29;
AdminChangeReplicasRequest admin_change_replicas = 35;
AdminRelocateRangeRequest admin_relocate_range = 45;
HeartbeatTxnRequest heartbeat_txn = 12;
GCRequest gc = 13;
PushTxnRequest push_txn = 14;
Expand Down Expand Up @@ -1397,6 +1413,7 @@ message ResponseUnion {
AdminMergeResponse admin_merge = 11;
AdminTransferLeaseResponse admin_transfer_lease = 29;
AdminChangeReplicasResponse admin_change_replicas = 35;
AdminRelocateRangeResponse admin_relocate_range = 45;
HeartbeatTxnResponse heartbeat_txn = 12;
GCResponse gc = 13;
PushTxnResponse push_txn = 14;
Expand Down
Loading

0 comments on commit fbd747c

Please sign in to comment.