Skip to content

Commit

Permalink
pd-client: get region by id (tikv#555)
Browse files Browse the repository at this point in the history
  • Loading branch information
nolouch authored and disksing committed Mar 6, 2017
1 parent f8b7efa commit 174171b
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 0 deletions.
21 changes: 21 additions & 0 deletions pd-client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ type Client interface {
// Also it may return nil if PD finds no Region for the key temporarily,
// client should retry later.
GetRegion(key []byte) (*metapb.Region, *metapb.Peer, error)
// GetRegion gets a region and its leader Peer from PD by id.
GetRegionByID(regionID uint64) (*metapb.Region, *metapb.Peer, error)
// GetStore gets a store from PD by store id.
// The store may expire later. Caller is responsible for caching and taking care
// of store change.
Expand Down Expand Up @@ -97,6 +99,25 @@ func (c *client) GetRegion(key []byte) (*metapb.Region, *metapb.Peer, error) {
return req.pbResp.GetRegion(), req.pbResp.GetLeader(), nil
}

func (c *client) GetRegionByID(regionID uint64) (*metapb.Region, *metapb.Peer, error) {
req := &regionByIDRequest{
pbReq: &pdpb.GetRegionByIDRequest{
RegionId: regionID,
},
done: make(chan error, 1),
}

start := time.Now()
c.worker.requests <- req
err := <-req.done
requestDuration.WithLabelValues("get_region").Observe(time.Since(start).Seconds())

if err != nil {
return nil, nil, errors.Trace(err)
}
return req.pbResp.GetRegion(), req.pbResp.GetLeader(), nil
}

func (c *client) GetStore(storeID uint64) (*metapb.Store, error) {
req := &storeRequest{
pbReq: &pdpb.GetStoreRequest{
Expand Down
9 changes: 9 additions & 0 deletions pd-client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,15 @@ func (s *testClientSuite) TestGetRegion(c *C) {
c.Assert(leader, DeepEquals, peer)
}

func (s *testClientSuite) TestGetRegionByID(c *C) {
heartbeatRegion(c, s.srv)

r, leader, err := s.client.GetRegionByID(3)
c.Assert(err, IsNil)
c.Assert(r, DeepEquals, region)
c.Assert(leader, DeepEquals, peer)
}

func (s *testClientSuite) TestGetStore(c *C) {
cluster := s.srv.GetRaftCluster()
c.Assert(cluster, NotNil)
Expand Down
36 changes: 36 additions & 0 deletions pd-client/rpc_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ type regionRequest struct {
pbResp *pdpb.GetRegionResponse
}

type regionByIDRequest struct {
pbReq *pdpb.GetRegionByIDRequest
done chan error
pbResp *pdpb.GetRegionResponse
}

type clusterConfigRequest struct {
pbReq *pdpb.GetClusterConfigRequest
done chan error
Expand Down Expand Up @@ -173,6 +179,17 @@ func (w *rpcWorker) handleRequests(requests []interface{}, conn *bufio.ReadWrite
r.pbResp = regionResp
r.done <- nil
}
case *regionByIDRequest:
regionResp, err := w.getRegionByIDFromRemote(conn, r.pbReq)
if err != nil {
ok = false
log.Error(err)
r.done <- err
} else {
r.pbResp = regionResp
r.done <- nil
}

case *clusterConfigRequest:
clusterConfigResp, err := w.getClusterConfigFromRemote(conn, r.pbReq)
if err != nil {
Expand Down Expand Up @@ -326,6 +343,25 @@ func (w *rpcWorker) getRegionFromRemote(conn *bufio.ReadWriter, regionReq *pdpb.
return rsp.GetGetRegion(), nil
}

func (w *rpcWorker) getRegionByIDFromRemote(conn *bufio.ReadWriter, regionReq *pdpb.GetRegionByIDRequest) (*pdpb.GetRegionResponse, error) {
req := &pdpb.Request{
Header: &pdpb.RequestHeader{
Uuid: uuid.NewV4().Bytes(),
ClusterId: w.clusterID,
},
CmdType: pdpb.CommandType_GetRegionByID,
GetRegionById: regionReq,
}
rsp, err := w.callRPC(conn, req)
if err != nil {
return nil, errors.Trace(err)
}
if rsp.GetGetRegionById() == nil {
return nil, errors.New("[pd] GetRegion field in rpc response not set")
}
return rsp.GetGetRegionById(), nil
}

func (w *rpcWorker) getClusterConfigFromRemote(conn *bufio.ReadWriter, clusterConfigReq *pdpb.GetClusterConfigRequest) (*pdpb.GetClusterConfigResponse, error) {
req := &pdpb.Request{
Header: &pdpb.RequestHeader{
Expand Down

0 comments on commit 174171b

Please sign in to comment.