Skip to content

Commit

Permalink
client: introduce an independent region client interface (#8874)
Browse files Browse the repository at this point in the history
ref #8690

Introduce an independent region client interface.

Signed-off-by: JmPotato <ghzpotato@gmail.com>
  • Loading branch information
JmPotato authored Dec 9, 2024
1 parent 53adbed commit 536ae49
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 99 deletions.
99 changes: 17 additions & 82 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@ package pd

import (
"context"
"encoding/hex"
"fmt"
"net/url"
"runtime/trace"
"strings"
"sync"
Expand All @@ -32,6 +30,7 @@ import (
"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/pd/client/clients/metastorage"
"github.com/tikv/pd/client/clients/router"
"github.com/tikv/pd/client/clients/tso"
"github.com/tikv/pd/client/constants"
"github.com/tikv/pd/client/errs"
Expand All @@ -43,15 +42,6 @@ import (
"go.uber.org/zap"
)

// Region contains information of a region's meta and its peers.
type Region struct {
Meta *metapb.Region
Leader *metapb.Peer
DownPeers []*metapb.Peer
PendingPeers []*metapb.Peer
Buckets *metapb.Buckets
}

// GlobalConfigItem standard format of KV pair in GlobalConfig client
type GlobalConfigItem struct {
EventType pdpb.EventType
Expand All @@ -64,30 +54,6 @@ type GlobalConfigItem struct {
type RPCClient interface {
// GetAllMembers gets the members Info from PD
GetAllMembers(ctx context.Context) ([]*pdpb.Member, error)
// GetRegion gets a region and its leader Peer from PD by key.
// The region may expire after split. Caller is responsible for caching and
// taking care of region change.
// Also, it may return nil if PD finds no Region for the key temporarily,
// client should retry later.
GetRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*Region, error)
// GetRegionFromMember gets a region from certain members.
GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string, opts ...opt.GetRegionOption) (*Region, error)
// GetPrevRegion gets the previous region and its leader Peer of the region where the key is located.
GetPrevRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*Region, error)
// GetRegionByID gets a region and its leader Peer from PD by id.
GetRegionByID(ctx context.Context, regionID uint64, opts ...opt.GetRegionOption) (*Region, error)
// Deprecated: use BatchScanRegions instead.
// ScanRegions gets a list of regions, starts from the region that contains key.
// Limit limits the maximum number of regions returned. It returns all the regions in the given range if limit <= 0.
// If a region has no leader, corresponding leader will be placed by a peer
// with empty value (PeerID is 0).
ScanRegions(ctx context.Context, key, endKey []byte, limit int, opts ...opt.GetRegionOption) ([]*Region, error)
// BatchScanRegions gets a list of regions, starts from the region that contains key.
// Limit limits the maximum number of regions returned. It returns all the regions in the given ranges if limit <= 0.
// If a region has no leader, corresponding leader will be placed by a peer
// with empty value (PeerID is 0).
// The returned regions are flattened, even there are key ranges located in the same region, only one region will be returned.
BatchScanRegions(ctx context.Context, keyRanges []KeyRange, limit int, opts ...opt.GetRegionOption) ([]*Region, error)
// GetStore gets a store from PD by store id.
// The store may expire later. Caller is responsible for caching and taking care
// of store change.
Expand Down Expand Up @@ -141,6 +107,7 @@ type RPCClient interface {
// on your needs.
WithCallerComponent(callerComponent caller.Component) RPCClient

router.Client
tso.Client
metastorage.Client
// KeyspaceClient manages keyspace metadata.
Expand Down Expand Up @@ -214,38 +181,6 @@ type SecurityOption struct {
SSLKEYBytes []byte
}

// KeyRange defines a range of keys in bytes.
type KeyRange struct {
StartKey []byte
EndKey []byte
}

// NewKeyRange creates a new key range structure with the given start key and end key bytes.
// Notice: the actual encoding of the key range is not specified here. It should be either UTF-8 or hex.
// - UTF-8 means the key has already been encoded into a string with UTF-8 encoding, like:
// []byte{52 56 54 53 54 99 54 99 54 102 50 48 53 55 54 102 55 50 54 99 54 52}, which will later be converted to "48656c6c6f20576f726c64"
// by using `string()` method.
// - Hex means the key is just a raw hex bytes without encoding to a UTF-8 string, like:
// []byte{72, 101, 108, 108, 111, 32, 87, 111, 114, 108, 100}, which will later be converted to "48656c6c6f20576f726c64"
// by using `hex.EncodeToString()` method.
func NewKeyRange(startKey, endKey []byte) *KeyRange {
return &KeyRange{startKey, endKey}
}

// EscapeAsUTF8Str returns the URL escaped key strings as they are UTF-8 encoded.
func (r *KeyRange) EscapeAsUTF8Str() (startKeyStr, endKeyStr string) {
startKeyStr = url.QueryEscape(string(r.StartKey))
endKeyStr = url.QueryEscape(string(r.EndKey))
return
}

// EscapeAsHexStr returns the URL escaped key strings as they are hex encoded.
func (r *KeyRange) EscapeAsHexStr() (startKeyStr, endKeyStr string) {
startKeyStr = url.QueryEscape(hex.EncodeToString(r.StartKey))
endKeyStr = url.QueryEscape(hex.EncodeToString(r.EndKey))
return
}

// NewClient creates a PD client.
func NewClient(
callerComponent caller.Component,
Expand Down Expand Up @@ -634,12 +569,12 @@ func (c *client) GetMinTS(ctx context.Context) (physical int64, logical int64, e
return minTS.Physical, minTS.Logical, nil
}

func handleRegionResponse(res *pdpb.GetRegionResponse) *Region {
func handleRegionResponse(res *pdpb.GetRegionResponse) *router.Region {
if res.Region == nil {
return nil
}

r := &Region{
r := &router.Region{
Meta: res.Region,
Leader: res.Leader,
PendingPeers: res.PendingPeers,
Expand All @@ -652,7 +587,7 @@ func handleRegionResponse(res *pdpb.GetRegionResponse) *Region {
}

// GetRegionFromMember implements the RPCClient interface.
func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string, _ ...opt.GetRegionOption) (*Region, error) {
func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string, _ ...opt.GetRegionOption) (*router.Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.GetRegionFromMember", opentracing.ChildOf(span.Context()))
defer span.Finish()
Expand Down Expand Up @@ -691,7 +626,7 @@ func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs
}

// GetRegion implements the RPCClient interface.
func (c *client) GetRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*Region, error) {
func (c *client) GetRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*router.Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.GetRegion", opentracing.ChildOf(span.Context()))
defer span.Finish()
Expand Down Expand Up @@ -731,7 +666,7 @@ func (c *client) GetRegion(ctx context.Context, key []byte, opts ...opt.GetRegio
}

// GetPrevRegion implements the RPCClient interface.
func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*Region, error) {
func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*router.Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.GetPrevRegion", opentracing.ChildOf(span.Context()))
defer span.Finish()
Expand Down Expand Up @@ -771,7 +706,7 @@ func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...opt.GetR
}

// GetRegionByID implements the RPCClient interface.
func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...opt.GetRegionOption) (*Region, error) {
func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...opt.GetRegionOption) (*router.Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.GetRegionByID", opentracing.ChildOf(span.Context()))
defer span.Finish()
Expand Down Expand Up @@ -811,7 +746,7 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...opt
}

// ScanRegions implements the RPCClient interface.
func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int, opts ...opt.GetRegionOption) ([]*Region, error) {
func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int, opts ...opt.GetRegionOption) ([]*router.Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.ScanRegions", opentracing.ChildOf(span.Context()))
defer span.Finish()
Expand Down Expand Up @@ -862,7 +797,7 @@ func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int,
}

// BatchScanRegions implements the RPCClient interface.
func (c *client) BatchScanRegions(ctx context.Context, ranges []KeyRange, limit int, opts ...opt.GetRegionOption) ([]*Region, error) {
func (c *client) BatchScanRegions(ctx context.Context, ranges []router.KeyRange, limit int, opts ...opt.GetRegionOption) ([]*router.Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.BatchScanRegions", opentracing.ChildOf(span.Context()))
defer span.Finish()
Expand Down Expand Up @@ -915,10 +850,10 @@ func (c *client) BatchScanRegions(ctx context.Context, ranges []KeyRange, limit
return handleBatchRegionsResponse(resp), nil
}

func handleBatchRegionsResponse(resp *pdpb.BatchScanRegionsResponse) []*Region {
regions := make([]*Region, 0, len(resp.GetRegions()))
func handleBatchRegionsResponse(resp *pdpb.BatchScanRegionsResponse) []*router.Region {
regions := make([]*router.Region, 0, len(resp.GetRegions()))
for _, r := range resp.GetRegions() {
region := &Region{
region := &router.Region{
Meta: r.Region,
Leader: r.Leader,
PendingPeers: r.PendingPeers,
Expand All @@ -932,21 +867,21 @@ func handleBatchRegionsResponse(resp *pdpb.BatchScanRegionsResponse) []*Region {
return regions
}

func handleRegionsResponse(resp *pdpb.ScanRegionsResponse) []*Region {
var regions []*Region
func handleRegionsResponse(resp *pdpb.ScanRegionsResponse) []*router.Region {
var regions []*router.Region
if len(resp.GetRegions()) == 0 {
// Make it compatible with old server.
metas, leaders := resp.GetRegionMetas(), resp.GetLeaders()
for i := range metas {
r := &Region{Meta: metas[i]}
r := &router.Region{Meta: metas[i]}
if i < len(leaders) {
r.Leader = leaders[i]
}
regions = append(regions, r)
}
} else {
for _, r := range resp.GetRegions() {
region := &Region{
region := &router.Region{
Meta: r.Region,
Leader: r.Leader,
PendingPeers: r.PendingPeers,
Expand Down
93 changes: 93 additions & 0 deletions client/clients/router/router_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright 2024 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package router

import (
"context"
"encoding/hex"
"net/url"

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/tikv/pd/client/opt"
)

// Region contains information of a region's meta and its peers.
type Region struct {
Meta *metapb.Region
Leader *metapb.Peer
DownPeers []*metapb.Peer
PendingPeers []*metapb.Peer
Buckets *metapb.Buckets
}

// KeyRange defines a range of keys in bytes.
type KeyRange struct {
StartKey []byte
EndKey []byte
}

// NewKeyRange creates a new key range structure with the given start key and end key bytes.
// Notice: the actual encoding of the key range is not specified here. It should be either UTF-8 or hex.
// - UTF-8 means the key has already been encoded into a string with UTF-8 encoding, like:
// []byte{52 56 54 53 54 99 54 99 54 102 50 48 53 55 54 102 55 50 54 99 54 52}, which will later be converted to "48656c6c6f20576f726c64"
// by using `string()` method.
// - Hex means the key is just a raw hex bytes without encoding to a UTF-8 string, like:
// []byte{72, 101, 108, 108, 111, 32, 87, 111, 114, 108, 100}, which will later be converted to "48656c6c6f20576f726c64"
// by using `hex.EncodeToString()` method.
func NewKeyRange(startKey, endKey []byte) *KeyRange {
return &KeyRange{startKey, endKey}
}

// EscapeAsUTF8Str returns the URL escaped key strings as they are UTF-8 encoded.
func (r *KeyRange) EscapeAsUTF8Str() (startKeyStr, endKeyStr string) {
startKeyStr = url.QueryEscape(string(r.StartKey))
endKeyStr = url.QueryEscape(string(r.EndKey))
return
}

// EscapeAsHexStr returns the URL escaped key strings as they are hex encoded.
func (r *KeyRange) EscapeAsHexStr() (startKeyStr, endKeyStr string) {
startKeyStr = url.QueryEscape(hex.EncodeToString(r.StartKey))
endKeyStr = url.QueryEscape(hex.EncodeToString(r.EndKey))
return
}

// Client defines the interface of a router client, which includes the methods for obtaining the routing information.
type Client interface {
// GetRegion gets a region and its leader Peer from PD by key.
// The region may expire after split. Caller is responsible for caching and
// taking care of region change.
// Also, it may return nil if PD finds no Region for the key temporarily,
// client should retry later.
GetRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*Region, error)
// GetRegionFromMember gets a region from certain members.
GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string, opts ...opt.GetRegionOption) (*Region, error)
// GetPrevRegion gets the previous region and its leader Peer of the region where the key is located.
GetPrevRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*Region, error)
// GetRegionByID gets a region and its leader Peer from PD by id.
GetRegionByID(ctx context.Context, regionID uint64, opts ...opt.GetRegionOption) (*Region, error)
// Deprecated: use BatchScanRegions instead.
// ScanRegions gets a list of regions, starts from the region that contains key.
// Limit limits the maximum number of regions returned. It returns all the regions in the given range if limit <= 0.
// If a region has no leader, corresponding leader will be placed by a peer
// with empty value (PeerID is 0).
ScanRegions(ctx context.Context, key, endKey []byte, limit int, opts ...opt.GetRegionOption) ([]*Region, error)
// BatchScanRegions gets a list of regions, starts from the region that contains key.
// Limit limits the maximum number of regions returned. It returns all the regions in the given ranges if limit <= 0.
// If a region has no leader, corresponding leader will be placed by a peer
// with empty value (PeerID is 0).
// The returned regions are flattened, even there are key ranges located in the same region, only one region will be returned.
BatchScanRegions(ctx context.Context, keyRanges []KeyRange, limit int, opts ...opt.GetRegionOption) ([]*Region, error)
}
2 changes: 1 addition & 1 deletion client/http/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/pingcap/kvproto/pkg/encryptionpb"
"github.com/pingcap/kvproto/pkg/keyspacepb"
"github.com/pingcap/kvproto/pkg/pdpb"
pd "github.com/tikv/pd/client"
pd "github.com/tikv/pd/client/clients/router"
)

// ServiceSafePoint is the safepoint for a specific service
Expand Down
Loading

0 comments on commit 536ae49

Please sign in to comment.