Skip to content

Commit

Permalink
Merge branch 'master' into follower/add_follower_option
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot[bot] authored Nov 29, 2023
2 parents 79b0ba0 + 90245e3 commit f4923bd
Show file tree
Hide file tree
Showing 14 changed files with 256 additions and 55 deletions.
61 changes: 60 additions & 1 deletion client/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,12 @@ type Client interface {
GetRegionsByStoreID(context.Context, uint64) (*RegionsInfo, error)
GetHotReadRegions(context.Context) (*StoreHotPeersInfos, error)
GetHotWriteRegions(context.Context) (*StoreHotPeersInfos, error)
GetHistoryHotRegions(context.Context, *HistoryHotRegionsRequest) (*HistoryHotRegions, error)
GetRegionStatusByKeyRange(context.Context, *KeyRange) (*RegionStats, error)
GetStores(context.Context) (*StoresInfo, error)
/* Config-related interfaces */
GetScheduleConfig(context.Context) (map[string]interface{}, error)
SetScheduleConfig(context.Context, map[string]interface{}) error
/* Rule-related interfaces */
GetAllPlacementRuleBundles(context.Context) ([]*GroupBundle, error)
GetPlacementRuleBundleByGroup(context.Context, string) (*GroupBundle, error)
Expand Down Expand Up @@ -191,20 +195,31 @@ func (c *client) execDuration(name string, duration time.Duration) {
c.executionDuration.WithLabelValues(name).Observe(duration.Seconds())
}

// HeaderOption configures the HTTP header.
type HeaderOption func(header http.Header)

// WithAllowFollowerHandle sets the header field to allow a PD follower to handle this request.
func WithAllowFollowerHandle() HeaderOption {
return func(header http.Header) {
header.Set("PD-Allow-Follower-Handle", "true")
}
}

// At present, we will use the retry strategy of polling by default to keep
// it consistent with the current implementation of some clients (e.g. TiDB).
func (c *client) requestWithRetry(
ctx context.Context,
name, uri, method string,
body io.Reader, res interface{},
headerOpts ...HeaderOption,
) error {
var (
err error
addr string
)
for idx := 0; idx < len(c.pdAddrs); idx++ {
addr = c.pdAddrs[idx]
err = c.request(ctx, name, fmt.Sprintf("%s%s", addr, uri), method, body, res)
err = c.request(ctx, name, fmt.Sprintf("%s%s", addr, uri), method, body, res, headerOpts...)
if err == nil {
break
}
Expand All @@ -218,6 +233,7 @@ func (c *client) request(
ctx context.Context,
name, url, method string,
body io.Reader, res interface{},
headerOpts ...HeaderOption,
) error {
logFields := []zap.Field{
zap.String("name", name),
Expand All @@ -229,6 +245,9 @@ func (c *client) request(
log.Error("[pd] create http request failed", append(logFields, zap.Error(err))...)
return errors.Trace(err)
}
for _, opt := range headerOpts {
opt(req.Header)
}
start := time.Now()
resp, err := c.cli.Do(req)
if err != nil {
Expand Down Expand Up @@ -361,6 +380,23 @@ func (c *client) GetHotWriteRegions(ctx context.Context) (*StoreHotPeersInfos, e
return &hotWriteRegions, nil
}

// GetHistoryHotRegions gets the history hot region statistics info.
func (c *client) GetHistoryHotRegions(ctx context.Context, req *HistoryHotRegionsRequest) (*HistoryHotRegions, error) {
reqJSON, err := json.Marshal(req)
if err != nil {
return nil, errors.Trace(err)
}
var historyHotRegions HistoryHotRegions
err = c.requestWithRetry(ctx,
"GetHistoryHotRegions", HotHistory,
http.MethodGet, bytes.NewBuffer(reqJSON), &historyHotRegions,
WithAllowFollowerHandle())
if err != nil {
return nil, err
}
return &historyHotRegions, nil
}

// GetRegionStatusByKeyRange gets the region status by key range.
// The keys in the key range should be encoded in the UTF-8 bytes format.
func (c *client) GetRegionStatusByKeyRange(ctx context.Context, keyRange *KeyRange) (*RegionStats, error) {
Expand All @@ -375,6 +411,29 @@ func (c *client) GetRegionStatusByKeyRange(ctx context.Context, keyRange *KeyRan
return &regionStats, nil
}

// GetScheduleConfig gets the schedule configurations.
func (c *client) GetScheduleConfig(ctx context.Context) (map[string]interface{}, error) {
var config map[string]interface{}
err := c.requestWithRetry(ctx,
"GetScheduleConfig", ScheduleConfig,
http.MethodGet, http.NoBody, &config)
if err != nil {
return nil, err
}
return config, nil
}

// SetScheduleConfig sets the schedule configurations.
func (c *client) SetScheduleConfig(ctx context.Context, config map[string]interface{}) error {
configJSON, err := json.Marshal(config)
if err != nil {
return errors.Trace(err)
}
return c.requestWithRetry(ctx,
"SetScheduleConfig", ScheduleConfig,
http.MethodPost, bytes.NewBuffer(configJSON), nil)
}

// GetStores gets the stores info.
func (c *client) GetStores(ctx context.Context) (*StoresInfo, error) {
var stores StoresInfo
Expand Down
42 changes: 42 additions & 0 deletions client/http/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"encoding/json"
"net/url"
"time"

"github.com/pingcap/kvproto/pkg/encryptionpb"
)

// KeyRange defines a range of keys in bytes.
Expand Down Expand Up @@ -166,6 +168,46 @@ type HotPeerStatShow struct {
LastUpdateTime time.Time `json:"last_update_time,omitempty"`
}

// HistoryHotRegionsRequest wrap the request conditions.
type HistoryHotRegionsRequest struct {
StartTime int64 `json:"start_time,omitempty"`
EndTime int64 `json:"end_time,omitempty"`
RegionIDs []uint64 `json:"region_ids,omitempty"`
StoreIDs []uint64 `json:"store_ids,omitempty"`
PeerIDs []uint64 `json:"peer_ids,omitempty"`
IsLearners []bool `json:"is_learners,omitempty"`
IsLeaders []bool `json:"is_leaders,omitempty"`
HotRegionTypes []string `json:"hot_region_type,omitempty"`
}

// HistoryHotRegions wraps historyHotRegion
type HistoryHotRegions struct {
HistoryHotRegion []*HistoryHotRegion `json:"history_hot_region"`
}

// HistoryHotRegion wraps hot region info
// it is storage format of hot_region_storage
type HistoryHotRegion struct {
UpdateTime int64 `json:"update_time"`
RegionID uint64 `json:"region_id"`
PeerID uint64 `json:"peer_id"`
StoreID uint64 `json:"store_id"`
IsLeader bool `json:"is_leader"`
IsLearner bool `json:"is_learner"`
HotRegionType string `json:"hot_region_type"`
HotDegree int64 `json:"hot_degree"`
FlowBytes float64 `json:"flow_bytes"`
KeyRate float64 `json:"key_rate"`
QueryRate float64 `json:"query_rate"`
StartKey string `json:"start_key"`
EndKey string `json:"end_key"`
// Encryption metadata for start_key and end_key. encryption_meta.iv is IV for start_key.
// IV for end_key is calculated from (encryption_meta.iv + len(start_key)).
// The field is only used by PD and should be ignored otherwise.
// If encryption_meta is empty (i.e. nil), it means start_key and end_key are unencrypted.
EncryptionMeta *encryptionpb.EncryptionMeta `json:"encryption_meta,omitempty"`
}

// StoresInfo represents the information of all TiKV/TiFlash stores.
type StoresInfo struct {
Count int `json:"count"`
Expand Down
4 changes: 1 addition & 3 deletions client/pd_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@ type ServiceDiscovery interface {
GetKeyspaceID() uint32
// GetKeyspaceGroupID returns the ID of the keyspace group
GetKeyspaceGroupID() uint32
// DiscoverServiceURLs discovers the microservice with the specified type and returns the server urls.
DiscoverMicroservice(svcType serviceType) ([]string, error)
// GetServiceURLs returns the URLs of the servers providing the service
GetServiceURLs() []string
// GetServingEndpointClientConn returns the grpc client connection of the serving endpoint
Expand Down Expand Up @@ -324,7 +322,7 @@ func (c *pdServiceDiscovery) GetKeyspaceGroupID() uint32 {
}

// DiscoverMicroservice discovers the microservice with the specified type and returns the server urls.
func (c *pdServiceDiscovery) DiscoverMicroservice(svcType serviceType) (urls []string, err error) {
func (c *pdServiceDiscovery) discoverMicroservice(svcType serviceType) (urls []string, err error) {
switch svcType {
case apiService:
urls = c.GetServiceURLs()
Expand Down
17 changes: 1 addition & 16 deletions client/tso_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,21 +288,6 @@ func (c *tsoServiceDiscovery) GetKeyspaceGroupID() uint32 {
return c.keyspaceGroupSD.group.Id
}

// DiscoverServiceURLs discovers the microservice with the specified type and returns the server urls.
func (c *tsoServiceDiscovery) DiscoverMicroservice(svcType serviceType) ([]string, error) {
var urls []string

switch svcType {
case apiService:
case tsoService:
return c.apiSvcDiscovery.DiscoverMicroservice(tsoService)
default:
panic("invalid service type")
}

return urls, nil
}

// GetServiceURLs returns the URLs of the tso primary/secondary addresses of this keyspace group.
// For testing use. It should only be called when the client is closed.
func (c *tsoServiceDiscovery) GetServiceURLs() []string {
Expand Down Expand Up @@ -582,7 +567,7 @@ func (c *tsoServiceDiscovery) getTSOServer(sd ServiceDiscovery) (string, error)
)
t := c.tsoServerDiscovery
if len(t.addrs) == 0 || t.failureCount == len(t.addrs) {
addrs, err = sd.DiscoverMicroservice(tsoService)
addrs, err = sd.(*pdServiceDiscovery).discoverMicroservice(tsoService)
if err != nil {
return "", err
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/dashboard/adapter/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"sync"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/pingcap/tidb-dashboard/pkg/apiserver"
Expand Down Expand Up @@ -75,6 +76,10 @@ func (m *Manager) Stop() {
func (m *Manager) serviceLoop() {
defer logutil.LogPanic()
defer m.wg.Done()
// TODO: After we fix the atomic problem of config, we can remove this failpoint.
failpoint.Inject("skipDashboardLoop", func() {
failpoint.Return()
})

ticker := time.NewTicker(CheckInterval)
defer ticker.Stop()
Expand Down
14 changes: 6 additions & 8 deletions pkg/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1224,14 +1224,12 @@ func waitForPrimariesServing(
re *require.Assertions, mgrs []*KeyspaceGroupManager, ids []uint32,
) {
testutil.Eventually(re, func() bool {
for i := 0; i < 100; i++ {
for j, id := range ids {
if member, err := mgrs[j].GetElectionMember(id, id); err != nil || !member.IsLeader() {
return false
}
if _, _, err := mgrs[j].HandleTSORequest(mgrs[j].ctx, id, id, GlobalDCLocation, 1); err != nil {
return false
}
for j, id := range ids {
if member, err := mgrs[j].GetElectionMember(id, id); err != nil || member == nil || !member.IsLeader() {
return false
}
if _, _, err := mgrs[j].HandleTSORequest(mgrs[j].ctx, id, id, GlobalDCLocation, 1); err != nil {
return false
}
}
return true
Expand Down
1 change: 1 addition & 0 deletions pkg/utils/grpcutil/grpcutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ func GetForwardedHost(ctx context.Context) string {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
log.Debug("failed to get forwarding metadata")
return ""
}
if t, ok := md[ForwardMetadataKey]; ok {
return t[0]
Expand Down
2 changes: 1 addition & 1 deletion server/apiv2/handlers/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func LoadKeyspace(c *gin.Context) {
if value, ok := c.GetQuery("force_refresh_group_id"); ok && value == "true" {
groupManager := svr.GetKeyspaceGroupManager()
if groupManager == nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, managerUninitializedErr)
c.AbortWithStatusJSON(http.StatusInternalServerError, GroupManagerUninitializedErr)
return
}
// keyspace has been checked in LoadKeyspace, so no need to check again.
Expand Down
25 changes: 13 additions & 12 deletions server/apiv2/handlers/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ import (
"github.com/tikv/pd/server/apiv2/middlewares"
)

const groupManagerUninitializedErr = "keyspace group manager is not initialized"
// GroupManagerUninitializedErr is the error message for uninitialized keyspace group manager.
const GroupManagerUninitializedErr = "keyspace group manager is not initialized"

// RegisterTSOKeyspaceGroup registers keyspace group handlers to the server.
func RegisterTSOKeyspaceGroup(r *gin.RouterGroup) {
Expand Down Expand Up @@ -78,7 +79,7 @@ func CreateKeyspaceGroups(c *gin.Context) {
svr := c.MustGet(middlewares.ServerContextKey).(*server.Server)
manager := svr.GetKeyspaceGroupManager()
if manager == nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, groupManagerUninitializedErr)
c.AbortWithStatusJSON(http.StatusInternalServerError, GroupManagerUninitializedErr)
return
}
err = manager.CreateKeyspaceGroups(createParams.KeyspaceGroups)
Expand All @@ -101,7 +102,7 @@ func GetKeyspaceGroups(c *gin.Context) {
svr := c.MustGet(middlewares.ServerContextKey).(*server.Server)
manager := svr.GetKeyspaceGroupManager()
if manager == nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, groupManagerUninitializedErr)
c.AbortWithStatusJSON(http.StatusInternalServerError, GroupManagerUninitializedErr)
return
}
keyspaceGroups, err := manager.GetKeyspaceGroups(scanStart, scanLimit)
Expand Down Expand Up @@ -152,7 +153,7 @@ func GetKeyspaceGroupByID(c *gin.Context) {
svr := c.MustGet(middlewares.ServerContextKey).(*server.Server)
manager := svr.GetKeyspaceGroupManager()
if manager == nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, groupManagerUninitializedErr)
c.AbortWithStatusJSON(http.StatusInternalServerError, GroupManagerUninitializedErr)
return
}

Expand Down Expand Up @@ -189,7 +190,7 @@ func DeleteKeyspaceGroupByID(c *gin.Context) {
svr := c.MustGet(middlewares.ServerContextKey).(*server.Server)
manager := svr.GetKeyspaceGroupManager()
if manager == nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, groupManagerUninitializedErr)
c.AbortWithStatusJSON(http.StatusInternalServerError, GroupManagerUninitializedErr)
return
}
kg, err := manager.DeleteKeyspaceGroupByID(id)
Expand Down Expand Up @@ -250,7 +251,7 @@ func SplitKeyspaceGroupByID(c *gin.Context) {
}
groupManager := svr.GetKeyspaceGroupManager()
if groupManager == nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, groupManagerUninitializedErr)
c.AbortWithStatusJSON(http.StatusInternalServerError, GroupManagerUninitializedErr)
return
}

Expand Down Expand Up @@ -289,7 +290,7 @@ func FinishSplitKeyspaceByID(c *gin.Context) {
svr := c.MustGet(middlewares.ServerContextKey).(*server.Server)
manager := svr.GetKeyspaceGroupManager()
if manager == nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, groupManagerUninitializedErr)
c.AbortWithStatusJSON(http.StatusInternalServerError, GroupManagerUninitializedErr)
return
}
err = manager.FinishSplitKeyspaceByID(id)
Expand Down Expand Up @@ -337,7 +338,7 @@ func MergeKeyspaceGroups(c *gin.Context) {
svr := c.MustGet(middlewares.ServerContextKey).(*server.Server)
groupManager := svr.GetKeyspaceGroupManager()
if groupManager == nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, groupManagerUninitializedErr)
c.AbortWithStatusJSON(http.StatusInternalServerError, GroupManagerUninitializedErr)
return
}
// Merge keyspace group.
Expand All @@ -364,7 +365,7 @@ func FinishMergeKeyspaceByID(c *gin.Context) {
svr := c.MustGet(middlewares.ServerContextKey).(*server.Server)
manager := svr.GetKeyspaceGroupManager()
if manager == nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, groupManagerUninitializedErr)
c.AbortWithStatusJSON(http.StatusInternalServerError, GroupManagerUninitializedErr)
return
}
err = manager.FinishMergeKeyspaceByID(id)
Expand All @@ -390,7 +391,7 @@ func AllocNodesForKeyspaceGroup(c *gin.Context) {
svr := c.MustGet(middlewares.ServerContextKey).(*server.Server)
manager := svr.GetKeyspaceGroupManager()
if manager == nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, groupManagerUninitializedErr)
c.AbortWithStatusJSON(http.StatusInternalServerError, GroupManagerUninitializedErr)
return
}
allocParams := &AllocNodesForKeyspaceGroupParams{}
Expand Down Expand Up @@ -437,7 +438,7 @@ func SetNodesForKeyspaceGroup(c *gin.Context) {
svr := c.MustGet(middlewares.ServerContextKey).(*server.Server)
manager := svr.GetKeyspaceGroupManager()
if manager == nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, groupManagerUninitializedErr)
c.AbortWithStatusJSON(http.StatusInternalServerError, GroupManagerUninitializedErr)
return
}
setParams := &SetNodesForKeyspaceGroupParams{}
Expand Down Expand Up @@ -493,7 +494,7 @@ func SetPriorityForKeyspaceGroup(c *gin.Context) {
svr := c.MustGet(middlewares.ServerContextKey).(*server.Server)
manager := svr.GetKeyspaceGroupManager()
if manager == nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, groupManagerUninitializedErr)
c.AbortWithStatusJSON(http.StatusInternalServerError, GroupManagerUninitializedErr)
return
}
setParams := &SetPriorityForKeyspaceGroupParams{}
Expand Down
1 change: 1 addition & 0 deletions server/config/persist_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -1024,6 +1024,7 @@ func (o *PersistOptions) IsRaftKV2() bool {
}

// SetRegionBucketEnabled sets if the region bucket is enabled.
// only used for test.
func (o *PersistOptions) SetRegionBucketEnabled(enabled bool) {
cfg := o.GetStoreConfig().Clone()
cfg.SetRegionBucketEnabled(enabled)
Expand Down
Loading

0 comments on commit f4923bd

Please sign in to comment.