From 174171b2b15c871c480959397d9b8047175559e7 Mon Sep 17 00:00:00 2001 From: ShuNing Date: Mon, 6 Mar 2017 12:59:38 +0800 Subject: [PATCH] pd-client: get region by id (#555) --- pd-client/client.go | 21 +++++++++++++++++++++ pd-client/client_test.go | 9 +++++++++ pd-client/rpc_worker.go | 36 ++++++++++++++++++++++++++++++++++++ 3 files changed, 66 insertions(+) diff --git a/pd-client/client.go b/pd-client/client.go index 7ccc892b25a..8b3da87238c 100644 --- a/pd-client/client.go +++ b/pd-client/client.go @@ -35,6 +35,8 @@ type Client interface { // Also it may return nil if PD finds no Region for the key temporarily, // client should retry later. GetRegion(key []byte) (*metapb.Region, *metapb.Peer, error) + // GetRegion gets a region and its leader Peer from PD by id. + GetRegionByID(regionID uint64) (*metapb.Region, *metapb.Peer, error) // GetStore gets a store from PD by store id. // The store may expire later. Caller is responsible for caching and taking care // of store change. @@ -97,6 +99,25 @@ func (c *client) GetRegion(key []byte) (*metapb.Region, *metapb.Peer, error) { return req.pbResp.GetRegion(), req.pbResp.GetLeader(), nil } +func (c *client) GetRegionByID(regionID uint64) (*metapb.Region, *metapb.Peer, error) { + req := ®ionByIDRequest{ + pbReq: &pdpb.GetRegionByIDRequest{ + RegionId: regionID, + }, + done: make(chan error, 1), + } + + start := time.Now() + c.worker.requests <- req + err := <-req.done + requestDuration.WithLabelValues("get_region").Observe(time.Since(start).Seconds()) + + if err != nil { + return nil, nil, errors.Trace(err) + } + return req.pbResp.GetRegion(), req.pbResp.GetLeader(), nil +} + func (c *client) GetStore(storeID uint64) (*metapb.Store, error) { req := &storeRequest{ pbReq: &pdpb.GetStoreRequest{ diff --git a/pd-client/client_test.go b/pd-client/client_test.go index 30431382a52..3ad91ca8308 100644 --- a/pd-client/client_test.go +++ b/pd-client/client_test.go @@ -177,6 +177,15 @@ func (s *testClientSuite) TestGetRegion(c *C) { c.Assert(leader, DeepEquals, peer) } +func (s *testClientSuite) TestGetRegionByID(c *C) { + heartbeatRegion(c, s.srv) + + r, leader, err := s.client.GetRegionByID(3) + c.Assert(err, IsNil) + c.Assert(r, DeepEquals, region) + c.Assert(leader, DeepEquals, peer) +} + func (s *testClientSuite) TestGetStore(c *C) { cluster := s.srv.GetRaftCluster() c.Assert(cluster, NotNil) diff --git a/pd-client/rpc_worker.go b/pd-client/rpc_worker.go index ba2c6105062..ba0f6b43eb7 100644 --- a/pd-client/rpc_worker.go +++ b/pd-client/rpc_worker.go @@ -55,6 +55,12 @@ type regionRequest struct { pbResp *pdpb.GetRegionResponse } +type regionByIDRequest struct { + pbReq *pdpb.GetRegionByIDRequest + done chan error + pbResp *pdpb.GetRegionResponse +} + type clusterConfigRequest struct { pbReq *pdpb.GetClusterConfigRequest done chan error @@ -173,6 +179,17 @@ func (w *rpcWorker) handleRequests(requests []interface{}, conn *bufio.ReadWrite r.pbResp = regionResp r.done <- nil } + case *regionByIDRequest: + regionResp, err := w.getRegionByIDFromRemote(conn, r.pbReq) + if err != nil { + ok = false + log.Error(err) + r.done <- err + } else { + r.pbResp = regionResp + r.done <- nil + } + case *clusterConfigRequest: clusterConfigResp, err := w.getClusterConfigFromRemote(conn, r.pbReq) if err != nil { @@ -326,6 +343,25 @@ func (w *rpcWorker) getRegionFromRemote(conn *bufio.ReadWriter, regionReq *pdpb. return rsp.GetGetRegion(), nil } +func (w *rpcWorker) getRegionByIDFromRemote(conn *bufio.ReadWriter, regionReq *pdpb.GetRegionByIDRequest) (*pdpb.GetRegionResponse, error) { + req := &pdpb.Request{ + Header: &pdpb.RequestHeader{ + Uuid: uuid.NewV4().Bytes(), + ClusterId: w.clusterID, + }, + CmdType: pdpb.CommandType_GetRegionByID, + GetRegionById: regionReq, + } + rsp, err := w.callRPC(conn, req) + if err != nil { + return nil, errors.Trace(err) + } + if rsp.GetGetRegionById() == nil { + return nil, errors.New("[pd] GetRegion field in rpc response not set") + } + return rsp.GetGetRegionById(), nil +} + func (w *rpcWorker) getClusterConfigFromRemote(conn *bufio.ReadWriter, clusterConfigReq *pdpb.GetClusterConfigRequest) (*pdpb.GetClusterConfigResponse, error) { req := &pdpb.Request{ Header: &pdpb.RequestHeader{