Skip to content

Commit

Permalink
Implement the server QueryRegion interface
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <ghzpotato@gmail.com>
  • Loading branch information
JmPotato committed Oct 28, 2024
1 parent 30934a8 commit 568bd46
Show file tree
Hide file tree
Showing 9 changed files with 3,567 additions and 22 deletions.
65 changes: 65 additions & 0 deletions pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -1462,6 +1462,71 @@ func (r *RegionsInfo) GetStoreRegions(storeID uint64) []*RegionInfo {
return regions
}

// TODO: benchmark the performance of `QueryRegions` and `getRegionsByKeys`.
// QueryRegions searches RegionInfo from regionTree by keys and IDs in batch.
func (r *RegionsInfo) QueryRegions(
keys [][]byte, ids []uint64,
) ([]uint64, map[uint64]*pdpb.RegionResponse) {
// Iterate the region keys to find the regions.
regions := r.getRegionsByKeys(keys)
// Assert the returned regions count matches the input keys.
if len(regions) != len(keys) {
panic("returned regions count mismatch with the input keys")

Check warning on line 1474 in pkg/core/region.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/region.go#L1474

Added line #L1474 was not covered by tests
}
// Build the key -> ID map for the final results.
var (
keyIDMap = make([]uint64, len(regions))
regionsByID = make(map[uint64]*pdpb.RegionResponse, len(regions))
)
for idx, region := range regions {
regionID := region.GetMeta().GetId()
keyIDMap[idx] = regionID
// Check if the region has been found.
if regionFound, ok := regionsByID[regionID]; (ok && regionFound != nil) || regionID == 0 {
continue
}
// If the given key is not found in the region tree, set the region to nil.
if region == nil {
regionsByID[regionID] = nil

Check warning on line 1490 in pkg/core/region.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/region.go#L1490

Added line #L1490 was not covered by tests
} else {
regionsByID[regionID] = &pdpb.RegionResponse{
Region: region.GetMeta(),
Leader: region.GetLeader(),
DownPeers: region.GetDownPeers(),
PendingPeers: region.GetPendingPeers(),
// TODO: get the buckets info.
}
}
}
// Iterate the region IDs to find the regions.
for _, id := range ids {
// Check if the region has been found.
if regionFound, ok := regionsByID[id]; (ok && regionFound != nil) || id == 0 {
continue

Check warning on line 1505 in pkg/core/region.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/region.go#L1505

Added line #L1505 was not covered by tests
}
// If the given region ID is not found in the region tree, set the region to nil.
if region := r.GetRegion(id); region == nil {
regionsByID[id] = nil
} else {
regionsByID[id] = &pdpb.RegionResponse{
Region: region.GetMeta(),
Leader: region.GetLeader(),
DownPeers: region.GetDownPeers(),
PendingPeers: region.GetPendingPeers(),
// TODO: get the buckets info.
}
}
}
return keyIDMap, regionsByID
}

// getRegionsByKeys searches RegionInfo from regionTree by keys.
func (r *RegionsInfo) getRegionsByKeys(keys [][]byte) []*RegionInfo {
r.t.RLock()
defer r.t.RUnlock()
return r.tree.searchByKeys(keys)
}

// SubTreeRegionType is the type of sub tree region.
type SubTreeRegionType string

Expand Down
47 changes: 47 additions & 0 deletions pkg/core/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1201,3 +1201,50 @@ func TestScanRegion(t *testing.T) {
re.Len(scanNoError([]byte("a"), []byte("e"), 0), 3)
re.Len(scanNoError([]byte("c"), []byte("e"), 0), 1)
}

func TestQueryRegions(t *testing.T) {
re := require.New(t)
regions := NewRegionsInfo()
regions.CheckAndPutRegion(NewTestRegionInfo(1, 1, []byte("a"), []byte("b")))
regions.CheckAndPutRegion(NewTestRegionInfo(2, 1, []byte("b"), []byte("c")))
regions.CheckAndPutRegion(NewTestRegionInfo(3, 1, []byte("d"), []byte("e")))
// Query regions by keys.
keyIDMap, regionsByID := regions.QueryRegions([][]byte{[]byte("a"), []byte("b"), []byte("c")}, nil)
re.Len(keyIDMap, 3)
re.Equal(uint64(1), keyIDMap[0])
re.Equal(uint64(2), keyIDMap[1])
// The key is not in the region tree, so its ID should be 0.
re.Zero(keyIDMap[2])
re.Len(regionsByID, 2)
re.Equal(uint64(1), regionsByID[1].GetRegion().GetId())
re.Equal(uint64(2), regionsByID[2].GetRegion().GetId())
// Query regions by IDs.
keyIDMap, regionsByID = regions.QueryRegions(nil, []uint64{1, 2, 3})
re.Empty(keyIDMap)
re.Len(regionsByID, 3)
re.Equal(uint64(1), regionsByID[1].GetRegion().GetId())
re.Equal(uint64(2), regionsByID[2].GetRegion().GetId())
re.Equal(uint64(3), regionsByID[3].GetRegion().GetId())
// Query regions by keys and IDs.
keyIDMap, regionsByID = regions.QueryRegions([][]byte{[]byte("b"), []byte("c")}, []uint64{1, 3})
re.Len(keyIDMap, 2)
re.Equal(uint64(2), keyIDMap[0])
re.Zero(keyIDMap[1])
re.Len(regionsByID, 3)
re.Equal(uint64(1), regionsByID[1].GetRegion().GetId())
re.Equal(uint64(2), regionsByID[2].GetRegion().GetId())
re.Equal(uint64(3), regionsByID[3].GetRegion().GetId())
// Query the region that does not exist.
keyIDMap, regionsByID = regions.QueryRegions(nil, []uint64{4})
re.Empty(keyIDMap)
re.Len(regionsByID, 1)
re.Nil(regionsByID[4])
keyIDMap, regionsByID = regions.QueryRegions([][]byte{[]byte("c")}, nil)
re.Len(keyIDMap, 1)
re.Zero(keyIDMap[0])
re.Empty(regionsByID)
keyIDMap, regionsByID = regions.QueryRegions([][]byte{[]byte("c")}, []uint64{4})
re.Len(keyIDMap, 1)
re.Zero(keyIDMap[0])
re.Nil(regionsByID[4])
}
10 changes: 10 additions & 0 deletions pkg/core/region_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,16 @@ func (t *regionTree) searchPrev(regionKey []byte) *RegionInfo {
return prevRegionItem.RegionInfo
}

// searchByKeys searches the regions by keys and return a slice of `*RegionInfo` whose order is the same as the input keys.
func (t *regionTree) searchByKeys(keys [][]byte) []*RegionInfo {
regions := make([]*RegionInfo, len(keys))
// TODO: do we need to deduplicate the input keys?
for idx, key := range keys {
regions[idx] = t.search(key)
}
return regions
}

// find returns the range item contains the start key.
func (t *regionTree) find(item *regionItem) *regionItem {
var result *regionItem
Expand Down
48 changes: 45 additions & 3 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,6 @@ func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error {
continue
}

start := time.Now()
// TSO uses leader lease to determine validity. No need to check leader here.
if s.IsClosed() {
return status.Errorf(codes.Unknown, "server not started")
Expand All @@ -578,11 +577,13 @@ func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error {
return status.Errorf(codes.FailedPrecondition,
"mismatch cluster id, need %d but got %d", clusterID, request.GetHeader().GetClusterId())
}

count := request.GetCount()
ctx, task := trace.NewTask(ctx, "tso")
start := time.Now()
ts, err := s.tsoAllocatorManager.HandleRequest(ctx, request.GetDcLocation(), count)
task.End()
tsoHandleDuration.Observe(time.Since(start).Seconds())
task.End()
if err != nil {
return status.Error(codes.Unknown, err.Error())
}
Expand Down Expand Up @@ -1578,7 +1579,48 @@ func (s *GrpcServer) GetRegionByID(ctx context.Context, request *pdpb.GetRegionB

// QueryRegion provides a stream processing of the region query.
func (s *GrpcServer) QueryRegion(stream pdpb.PD_QueryRegionServer) error {
return nil
if s.GetServiceMiddlewarePersistOptions().IsGRPCRateLimitEnabled() {
fName := currentFunction()
limiter := s.GetGRPCRateLimiter()
if done, err := limiter.Allow(fName); err == nil {
defer done()
} else {
return err
}

Check warning on line 1589 in server/grpc_service.go

View check run for this annotation

Codecov / codecov/patch

server/grpc_service.go#L1581-L1589

Added lines #L1581 - L1589 were not covered by tests
}

for {
request, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return errors.WithStack(err)
}

Check warning on line 1599 in server/grpc_service.go

View check run for this annotation

Codecov / codecov/patch

server/grpc_service.go#L1592-L1599

Added lines #L1592 - L1599 were not covered by tests

// TODO: add forwarding function

if s.IsClosed() {
return status.Errorf(codes.Unknown, "server not started")
}
if clusterID := s.ClusterID(); request.GetHeader().GetClusterId() != clusterID {
return status.Errorf(codes.FailedPrecondition,
"mismatch cluster id, need %d but got %d", clusterID, request.GetHeader().GetClusterId())
}

Check warning on line 1609 in server/grpc_service.go

View check run for this annotation

Codecov / codecov/patch

server/grpc_service.go#L1603-L1609

Added lines #L1603 - L1609 were not covered by tests

start := time.Now()
keyIDMap, regionsByID := s.GetRaftCluster().QueryRegions(request.GetRegionKeys(), request.GetRegionIds())
regionQueryDuration.Observe(time.Since(start).Seconds())
// Build the response and send it to the client.
response := &pdpb.QueryRegionResponse{
Header: s.header(),
RegionsById: regionsByID,
KeyIdMap: keyIDMap,
}
if err := stream.Send(response); err != nil {
return errors.WithStack(err)
}

Check warning on line 1622 in server/grpc_service.go

View check run for this annotation

Codecov / codecov/patch

server/grpc_service.go#L1611-L1622

Added lines #L1611 - L1622 were not covered by tests
}
}

// Deprecated: use BatchScanRegions instead.
Expand Down
10 changes: 10 additions & 0 deletions server/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,15 @@ var (
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 13),
})

regionQueryDuration = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "pd",
Subsystem: "server",
Name: "region_query_duration_seconds",
Help: "Bucketed histogram of processing time (s) of region query requests.",
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 13),
})

bucketReportLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "pd",
Expand Down Expand Up @@ -172,6 +181,7 @@ func init() {
prometheus.MustRegister(tsoProxyBatchSize)
prometheus.MustRegister(tsoProxyForwardTimeoutCounter)
prometheus.MustRegister(tsoHandleDuration)
prometheus.MustRegister(regionQueryDuration)
prometheus.MustRegister(regionHeartbeatHandleDuration)
prometheus.MustRegister(storeHeartbeatHandleDuration)
prometheus.MustRegister(bucketReportCounter)
Expand Down
1 change: 1 addition & 0 deletions tests/integrations/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/tikv/pd/tests/integrations
go 1.23

replace (
github.com/pingcap/kvproto => github.com/JmPotato/kvproto v0.0.0-20241015032459-be1e7521da0a
github.com/tikv/pd => ../../
github.com/tikv/pd/client => ../../client
github.com/tikv/pd/tests/integrations/mcs => ./mcs
Expand Down
Loading

0 comments on commit 568bd46

Please sign in to comment.