diff --git a/client/http/client.go b/client/http/client.go index cd7be205c12..d15693e11d4 100644 --- a/client/http/client.go +++ b/client/http/client.go @@ -49,8 +49,12 @@ type Client interface { GetRegionsByStoreID(context.Context, uint64) (*RegionsInfo, error) GetHotReadRegions(context.Context) (*StoreHotPeersInfos, error) GetHotWriteRegions(context.Context) (*StoreHotPeersInfos, error) + GetHistoryHotRegions(context.Context, *HistoryHotRegionsRequest) (*HistoryHotRegions, error) GetRegionStatusByKeyRange(context.Context, *KeyRange) (*RegionStats, error) GetStores(context.Context) (*StoresInfo, error) + /* Config-related interfaces */ + GetScheduleConfig(context.Context) (map[string]interface{}, error) + SetScheduleConfig(context.Context, map[string]interface{}) error /* Rule-related interfaces */ GetAllPlacementRuleBundles(context.Context) ([]*GroupBundle, error) GetPlacementRuleBundleByGroup(context.Context, string) (*GroupBundle, error) @@ -191,12 +195,23 @@ func (c *client) execDuration(name string, duration time.Duration) { c.executionDuration.WithLabelValues(name).Observe(duration.Seconds()) } +// HeaderOption configures the HTTP header. +type HeaderOption func(header http.Header) + +// WithAllowFollowerHandle sets the header field to allow a PD follower to handle this request. +func WithAllowFollowerHandle() HeaderOption { + return func(header http.Header) { + header.Set("PD-Allow-Follower-Handle", "true") + } +} + // At present, we will use the retry strategy of polling by default to keep // it consistent with the current implementation of some clients (e.g. TiDB). 
func (c *client) requestWithRetry( ctx context.Context, name, uri, method string, body io.Reader, res interface{}, + headerOpts ...HeaderOption, ) error { var ( err error @@ -204,7 +219,7 @@ func (c *client) requestWithRetry( ) for idx := 0; idx < len(c.pdAddrs); idx++ { addr = c.pdAddrs[idx] - err = c.request(ctx, name, fmt.Sprintf("%s%s", addr, uri), method, body, res) + err = c.request(ctx, name, fmt.Sprintf("%s%s", addr, uri), method, body, res, headerOpts...) if err == nil { break } @@ -218,6 +233,7 @@ func (c *client) request( ctx context.Context, name, url, method string, body io.Reader, res interface{}, + headerOpts ...HeaderOption, ) error { logFields := []zap.Field{ zap.String("name", name), @@ -229,6 +245,9 @@ func (c *client) request( log.Error("[pd] create http request failed", append(logFields, zap.Error(err))...) return errors.Trace(err) } + for _, opt := range headerOpts { + opt(req.Header) + } start := time.Now() resp, err := c.cli.Do(req) if err != nil { @@ -361,6 +380,23 @@ func (c *client) GetHotWriteRegions(ctx context.Context) (*StoreHotPeersInfos, e return &hotWriteRegions, nil } +// GetHistoryHotRegions gets the history hot region statistics info. +func (c *client) GetHistoryHotRegions(ctx context.Context, req *HistoryHotRegionsRequest) (*HistoryHotRegions, error) { + reqJSON, err := json.Marshal(req) + if err != nil { + return nil, errors.Trace(err) + } + var historyHotRegions HistoryHotRegions + err = c.requestWithRetry(ctx, + "GetHistoryHotRegions", HotHistory, + http.MethodGet, bytes.NewBuffer(reqJSON), &historyHotRegions, + WithAllowFollowerHandle()) + if err != nil { + return nil, err + } + return &historyHotRegions, nil +} + // GetRegionStatusByKeyRange gets the region status by key range. // The keys in the key range should be encoded in the UTF-8 bytes format. 
func (c *client) GetRegionStatusByKeyRange(ctx context.Context, keyRange *KeyRange) (*RegionStats, error) { @@ -375,6 +411,29 @@ func (c *client) GetRegionStatusByKeyRange(ctx context.Context, keyRange *KeyRan return &regionStats, nil } +// GetScheduleConfig gets the schedule configurations. +func (c *client) GetScheduleConfig(ctx context.Context) (map[string]interface{}, error) { + var config map[string]interface{} + err := c.requestWithRetry(ctx, + "GetScheduleConfig", ScheduleConfig, + http.MethodGet, http.NoBody, &config) + if err != nil { + return nil, err + } + return config, nil +} + +// SetScheduleConfig sets the schedule configurations. +func (c *client) SetScheduleConfig(ctx context.Context, config map[string]interface{}) error { + configJSON, err := json.Marshal(config) + if err != nil { + return errors.Trace(err) + } + return c.requestWithRetry(ctx, + "SetScheduleConfig", ScheduleConfig, + http.MethodPost, bytes.NewBuffer(configJSON), nil) +} + // GetStores gets the stores info. func (c *client) GetStores(ctx context.Context) (*StoresInfo, error) { var stores StoresInfo diff --git a/client/http/types.go b/client/http/types.go index df448e7e20d..4e99d911e0b 100644 --- a/client/http/types.go +++ b/client/http/types.go @@ -19,6 +19,8 @@ import ( "encoding/json" "net/url" "time" + + "github.com/pingcap/kvproto/pkg/encryptionpb" ) // KeyRange defines a range of keys in bytes. @@ -166,6 +168,46 @@ type HotPeerStatShow struct { LastUpdateTime time.Time `json:"last_update_time,omitempty"` } +// HistoryHotRegionsRequest wraps the request conditions.
+type HistoryHotRegionsRequest struct { + StartTime int64 `json:"start_time,omitempty"` + EndTime int64 `json:"end_time,omitempty"` + RegionIDs []uint64 `json:"region_ids,omitempty"` + StoreIDs []uint64 `json:"store_ids,omitempty"` + PeerIDs []uint64 `json:"peer_ids,omitempty"` + IsLearners []bool `json:"is_learners,omitempty"` + IsLeaders []bool `json:"is_leaders,omitempty"` + HotRegionTypes []string `json:"hot_region_type,omitempty"` +} + +// HistoryHotRegions wraps a list of HistoryHotRegion records. +type HistoryHotRegions struct { + HistoryHotRegion []*HistoryHotRegion `json:"history_hot_region"` +} + +// HistoryHotRegion wraps hot region info. +// It is the storage format of hot_region_storage. +type HistoryHotRegion struct { + UpdateTime int64 `json:"update_time"` + RegionID uint64 `json:"region_id"` + PeerID uint64 `json:"peer_id"` + StoreID uint64 `json:"store_id"` + IsLeader bool `json:"is_leader"` + IsLearner bool `json:"is_learner"` + HotRegionType string `json:"hot_region_type"` + HotDegree int64 `json:"hot_degree"` + FlowBytes float64 `json:"flow_bytes"` + KeyRate float64 `json:"key_rate"` + QueryRate float64 `json:"query_rate"` + StartKey string `json:"start_key"` + EndKey string `json:"end_key"` + // Encryption metadata for start_key and end_key. encryption_meta.iv is IV for start_key. + // IV for end_key is calculated from (encryption_meta.iv + len(start_key)). + // The field is only used by PD and should be ignored otherwise. + // If encryption_meta is empty (i.e. nil), it means start_key and end_key are unencrypted. + EncryptionMeta *encryptionpb.EncryptionMeta `json:"encryption_meta,omitempty"` +} + // StoresInfo represents the information of all TiKV/TiFlash stores.
type StoresInfo struct { Count int `json:"count"` diff --git a/client/pd_service_discovery.go b/client/pd_service_discovery.go index e96093f598d..98ddd611326 100644 --- a/client/pd_service_discovery.go +++ b/client/pd_service_discovery.go @@ -63,8 +63,6 @@ type ServiceDiscovery interface { GetKeyspaceID() uint32 // GetKeyspaceGroupID returns the ID of the keyspace group GetKeyspaceGroupID() uint32 - // DiscoverServiceURLs discovers the microservice with the specified type and returns the server urls. - DiscoverMicroservice(svcType serviceType) ([]string, error) // GetServiceURLs returns the URLs of the servers providing the service GetServiceURLs() []string // GetServingEndpointClientConn returns the grpc client connection of the serving endpoint @@ -324,7 +322,7 @@ func (c *pdServiceDiscovery) GetKeyspaceGroupID() uint32 { } -// DiscoverMicroservice discovers the microservice with the specified type and returns the server urls. -func (c *pdServiceDiscovery) DiscoverMicroservice(svcType serviceType) (urls []string, err error) { +// discoverMicroservice discovers the microservice with the specified type and returns the server urls. +func (c *pdServiceDiscovery) discoverMicroservice(svcType serviceType) (urls []string, err error) { switch svcType { case apiService: urls = c.GetServiceURLs() diff --git a/client/tso_service_discovery.go b/client/tso_service_discovery.go index 92f95129951..5f14c406797 100644 --- a/client/tso_service_discovery.go +++ b/client/tso_service_discovery.go @@ -288,21 +288,6 @@ func (c *tsoServiceDiscovery) GetKeyspaceGroupID() uint32 { return c.keyspaceGroupSD.group.Id } -// DiscoverServiceURLs discovers the microservice with the specified type and returns the server urls.
-func (c *tsoServiceDiscovery) DiscoverMicroservice(svcType serviceType) ([]string, error) { - var urls []string - - switch svcType { - case apiService: - case tsoService: - return c.apiSvcDiscovery.DiscoverMicroservice(tsoService) - default: - panic("invalid service type") - } - - return urls, nil -} - // GetServiceURLs returns the URLs of the tso primary/secondary addresses of this keyspace group. // For testing use. It should only be called when the client is closed. func (c *tsoServiceDiscovery) GetServiceURLs() []string { @@ -582,7 +567,7 @@ func (c *tsoServiceDiscovery) getTSOServer(sd ServiceDiscovery) (string, error) ) t := c.tsoServerDiscovery if len(t.addrs) == 0 || t.failureCount == len(t.addrs) { - addrs, err = sd.DiscoverMicroservice(tsoService) + addrs, err = sd.(*pdServiceDiscovery).discoverMicroservice(tsoService) if err != nil { return "", err } diff --git a/pkg/dashboard/adapter/manager.go b/pkg/dashboard/adapter/manager.go index a3691242c8f..293d8ad6549 100644 --- a/pkg/dashboard/adapter/manager.go +++ b/pkg/dashboard/adapter/manager.go @@ -19,6 +19,7 @@ import ( "sync" "time" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" "github.com/pingcap/tidb-dashboard/pkg/apiserver" @@ -75,6 +76,10 @@ func (m *Manager) Stop() { func (m *Manager) serviceLoop() { defer logutil.LogPanic() defer m.wg.Done() + // TODO: After we fix the atomic problem of config, we can remove this failpoint. 
+ failpoint.Inject("skipDashboardLoop", func() { + failpoint.Return() + }) ticker := time.NewTicker(CheckInterval) defer ticker.Stop() diff --git a/pkg/tso/keyspace_group_manager_test.go b/pkg/tso/keyspace_group_manager_test.go index c20abfc5f79..0c1b017d7aa 100644 --- a/pkg/tso/keyspace_group_manager_test.go +++ b/pkg/tso/keyspace_group_manager_test.go @@ -1224,14 +1224,12 @@ func waitForPrimariesServing( re *require.Assertions, mgrs []*KeyspaceGroupManager, ids []uint32, ) { testutil.Eventually(re, func() bool { - for i := 0; i < 100; i++ { - for j, id := range ids { - if member, err := mgrs[j].GetElectionMember(id, id); err != nil || !member.IsLeader() { - return false - } - if _, _, err := mgrs[j].HandleTSORequest(mgrs[j].ctx, id, id, GlobalDCLocation, 1); err != nil { - return false - } + for j, id := range ids { + if member, err := mgrs[j].GetElectionMember(id, id); err != nil || member == nil || !member.IsLeader() { + return false + } + if _, _, err := mgrs[j].HandleTSORequest(mgrs[j].ctx, id, id, GlobalDCLocation, 1); err != nil { + return false } } return true diff --git a/pkg/utils/grpcutil/grpcutil.go b/pkg/utils/grpcutil/grpcutil.go index 44d45ff4c70..a001ec4bd03 100644 --- a/pkg/utils/grpcutil/grpcutil.go +++ b/pkg/utils/grpcutil/grpcutil.go @@ -163,6 +163,7 @@ func GetForwardedHost(ctx context.Context) string { md, ok := metadata.FromIncomingContext(ctx) if !ok { log.Debug("failed to get forwarding metadata") + return "" } if t, ok := md[ForwardMetadataKey]; ok { return t[0] diff --git a/server/apiv2/handlers/keyspace.go b/server/apiv2/handlers/keyspace.go index 9602cc863ef..c2802bb939d 100644 --- a/server/apiv2/handlers/keyspace.go +++ b/server/apiv2/handlers/keyspace.go @@ -113,7 +113,7 @@ func LoadKeyspace(c *gin.Context) { if value, ok := c.GetQuery("force_refresh_group_id"); ok && value == "true" { groupManager := svr.GetKeyspaceGroupManager() if groupManager == nil { - c.AbortWithStatusJSON(http.StatusInternalServerError, 
managerUninitializedErr) + c.AbortWithStatusJSON(http.StatusInternalServerError, GroupManagerUninitializedErr) return } // keyspace has been checked in LoadKeyspace, so no need to check again. diff --git a/server/apiv2/handlers/tso_keyspace_group.go b/server/apiv2/handlers/tso_keyspace_group.go index a580b21f705..a9f042687f6 100644 --- a/server/apiv2/handlers/tso_keyspace_group.go +++ b/server/apiv2/handlers/tso_keyspace_group.go @@ -30,7 +30,8 @@ import ( "github.com/tikv/pd/server/apiv2/middlewares" ) -const groupManagerUninitializedErr = "keyspace group manager is not initialized" +// GroupManagerUninitializedErr is the error message for uninitialized keyspace group manager. +const GroupManagerUninitializedErr = "keyspace group manager is not initialized" // RegisterTSOKeyspaceGroup registers keyspace group handlers to the server. func RegisterTSOKeyspaceGroup(r *gin.RouterGroup) { @@ -78,7 +79,7 @@ func CreateKeyspaceGroups(c *gin.Context) { svr := c.MustGet(middlewares.ServerContextKey).(*server.Server) manager := svr.GetKeyspaceGroupManager() if manager == nil { - c.AbortWithStatusJSON(http.StatusInternalServerError, groupManagerUninitializedErr) + c.AbortWithStatusJSON(http.StatusInternalServerError, GroupManagerUninitializedErr) return } err = manager.CreateKeyspaceGroups(createParams.KeyspaceGroups) @@ -101,7 +102,7 @@ func GetKeyspaceGroups(c *gin.Context) { svr := c.MustGet(middlewares.ServerContextKey).(*server.Server) manager := svr.GetKeyspaceGroupManager() if manager == nil { - c.AbortWithStatusJSON(http.StatusInternalServerError, groupManagerUninitializedErr) + c.AbortWithStatusJSON(http.StatusInternalServerError, GroupManagerUninitializedErr) return } keyspaceGroups, err := manager.GetKeyspaceGroups(scanStart, scanLimit) @@ -152,7 +153,7 @@ func GetKeyspaceGroupByID(c *gin.Context) { svr := c.MustGet(middlewares.ServerContextKey).(*server.Server) manager := svr.GetKeyspaceGroupManager() if manager == nil { - 
c.AbortWithStatusJSON(http.StatusInternalServerError, groupManagerUninitializedErr) + c.AbortWithStatusJSON(http.StatusInternalServerError, GroupManagerUninitializedErr) return } @@ -189,7 +190,7 @@ func DeleteKeyspaceGroupByID(c *gin.Context) { svr := c.MustGet(middlewares.ServerContextKey).(*server.Server) manager := svr.GetKeyspaceGroupManager() if manager == nil { - c.AbortWithStatusJSON(http.StatusInternalServerError, groupManagerUninitializedErr) + c.AbortWithStatusJSON(http.StatusInternalServerError, GroupManagerUninitializedErr) return } kg, err := manager.DeleteKeyspaceGroupByID(id) @@ -250,7 +251,7 @@ func SplitKeyspaceGroupByID(c *gin.Context) { } groupManager := svr.GetKeyspaceGroupManager() if groupManager == nil { - c.AbortWithStatusJSON(http.StatusInternalServerError, groupManagerUninitializedErr) + c.AbortWithStatusJSON(http.StatusInternalServerError, GroupManagerUninitializedErr) return } @@ -289,7 +290,7 @@ func FinishSplitKeyspaceByID(c *gin.Context) { svr := c.MustGet(middlewares.ServerContextKey).(*server.Server) manager := svr.GetKeyspaceGroupManager() if manager == nil { - c.AbortWithStatusJSON(http.StatusInternalServerError, groupManagerUninitializedErr) + c.AbortWithStatusJSON(http.StatusInternalServerError, GroupManagerUninitializedErr) return } err = manager.FinishSplitKeyspaceByID(id) @@ -337,7 +338,7 @@ func MergeKeyspaceGroups(c *gin.Context) { svr := c.MustGet(middlewares.ServerContextKey).(*server.Server) groupManager := svr.GetKeyspaceGroupManager() if groupManager == nil { - c.AbortWithStatusJSON(http.StatusInternalServerError, groupManagerUninitializedErr) + c.AbortWithStatusJSON(http.StatusInternalServerError, GroupManagerUninitializedErr) return } // Merge keyspace group. 
@@ -364,7 +365,7 @@ func FinishMergeKeyspaceByID(c *gin.Context) { svr := c.MustGet(middlewares.ServerContextKey).(*server.Server) manager := svr.GetKeyspaceGroupManager() if manager == nil { - c.AbortWithStatusJSON(http.StatusInternalServerError, groupManagerUninitializedErr) + c.AbortWithStatusJSON(http.StatusInternalServerError, GroupManagerUninitializedErr) return } err = manager.FinishMergeKeyspaceByID(id) @@ -390,7 +391,7 @@ func AllocNodesForKeyspaceGroup(c *gin.Context) { svr := c.MustGet(middlewares.ServerContextKey).(*server.Server) manager := svr.GetKeyspaceGroupManager() if manager == nil { - c.AbortWithStatusJSON(http.StatusInternalServerError, groupManagerUninitializedErr) + c.AbortWithStatusJSON(http.StatusInternalServerError, GroupManagerUninitializedErr) return } allocParams := &AllocNodesForKeyspaceGroupParams{} @@ -437,7 +438,7 @@ func SetNodesForKeyspaceGroup(c *gin.Context) { svr := c.MustGet(middlewares.ServerContextKey).(*server.Server) manager := svr.GetKeyspaceGroupManager() if manager == nil { - c.AbortWithStatusJSON(http.StatusInternalServerError, groupManagerUninitializedErr) + c.AbortWithStatusJSON(http.StatusInternalServerError, GroupManagerUninitializedErr) return } setParams := &SetNodesForKeyspaceGroupParams{} @@ -493,7 +494,7 @@ func SetPriorityForKeyspaceGroup(c *gin.Context) { svr := c.MustGet(middlewares.ServerContextKey).(*server.Server) manager := svr.GetKeyspaceGroupManager() if manager == nil { - c.AbortWithStatusJSON(http.StatusInternalServerError, groupManagerUninitializedErr) + c.AbortWithStatusJSON(http.StatusInternalServerError, GroupManagerUninitializedErr) return } setParams := &SetPriorityForKeyspaceGroupParams{} diff --git a/server/config/persist_options.go b/server/config/persist_options.go index 42ab91fce17..ae9047c626b 100644 --- a/server/config/persist_options.go +++ b/server/config/persist_options.go @@ -1024,6 +1024,7 @@ func (o *PersistOptions) IsRaftKV2() bool { } // SetRegionBucketEnabled sets if the region 
bucket is enabled. +// only used for test. func (o *PersistOptions) SetRegionBucketEnabled(enabled bool) { cfg := o.GetStoreConfig().Clone() cfg.SetRegionBucketEnabled(enabled) diff --git a/tests/integrations/client/http_client_test.go b/tests/integrations/client/http_client_test.go index 3a52e91e1f8..a007b893187 100644 --- a/tests/integrations/client/http_client_test.go +++ b/tests/integrations/client/http_client_test.go @@ -55,8 +55,16 @@ func (suite *httpClientTestSuite) SetupSuite() { re.NoError(err) leader := suite.cluster.WaitLeader() re.NotEmpty(leader) - err = suite.cluster.GetLeaderServer().BootstrapCluster() + leaderServer := suite.cluster.GetLeaderServer() + err = leaderServer.BootstrapCluster() re.NoError(err) + for _, region := range []*core.RegionInfo{ + core.NewTestRegionInfo(10, 1, []byte("a1"), []byte("a2")), + core.NewTestRegionInfo(11, 1, []byte("a2"), []byte("a3")), + } { + err := leaderServer.GetRaftCluster().HandleRegionHeartbeat(region) + re.NoError(err) + } var ( testServers = suite.cluster.GetServers() endpoints = make([]string, 0, len(testServers)) @@ -73,6 +81,53 @@ func (suite *httpClientTestSuite) TearDownSuite() { suite.cluster.Destroy() } +func (suite *httpClientTestSuite) TestMeta() { + re := suite.Require() + region, err := suite.client.GetRegionByID(suite.ctx, 10) + re.NoError(err) + re.Equal(int64(10), region.ID) + re.Equal(core.HexRegionKeyStr([]byte("a1")), region.StartKey) + re.Equal(core.HexRegionKeyStr([]byte("a2")), region.EndKey) + region, err = suite.client.GetRegionByKey(suite.ctx, []byte("a2")) + re.NoError(err) + re.Equal(int64(11), region.ID) + re.Equal(core.HexRegionKeyStr([]byte("a2")), region.StartKey) + re.Equal(core.HexRegionKeyStr([]byte("a3")), region.EndKey) + regions, err := suite.client.GetRegions(suite.ctx) + re.NoError(err) + re.Equal(int64(2), regions.Count) + re.Len(regions.Regions, 2) + regions, err = suite.client.GetRegionsByKeyRange(suite.ctx, pd.NewKeyRange([]byte("a1"), []byte("a3")), -1) + 
re.NoError(err) + re.Equal(int64(2), regions.Count) + re.Len(regions.Regions, 2) + regions, err = suite.client.GetRegionsByStoreID(suite.ctx, 1) + re.NoError(err) + re.Equal(int64(2), regions.Count) + re.Len(regions.Regions, 2) + regionStats, err := suite.client.GetRegionStatusByKeyRange(suite.ctx, pd.NewKeyRange([]byte("a1"), []byte("a3"))) + re.NoError(err) + re.Equal(2, regionStats.Count) + hotReadRegions, err := suite.client.GetHotReadRegions(suite.ctx) + re.NoError(err) + re.Len(hotReadRegions.AsPeer, 1) + re.Len(hotReadRegions.AsLeader, 1) + hotWriteRegions, err := suite.client.GetHotWriteRegions(suite.ctx) + re.NoError(err) + re.Len(hotWriteRegions.AsPeer, 1) + re.Len(hotWriteRegions.AsLeader, 1) + historyHotRegions, err := suite.client.GetHistoryHotRegions(suite.ctx, &pd.HistoryHotRegionsRequest{ + StartTime: 0, + EndTime: time.Now().AddDate(0, 0, 1).UnixNano() / int64(time.Millisecond), + }) + re.NoError(err) + re.Len(historyHotRegions.HistoryHotRegion, 0) + store, err := suite.client.GetStores(suite.ctx) + re.NoError(err) + re.Equal(1, store.Count) + re.Len(store.Stores, 1) +} + func (suite *httpClientTestSuite) TestGetMinResolvedTSByStoresIDs() { re := suite.Require() testMinResolvedTS := tsoutil.TimeToTS(time.Now()) @@ -271,13 +326,6 @@ func (suite *httpClientTestSuite) TestRegionLabel() { func (suite *httpClientTestSuite) TestAccelerateSchedule() { re := suite.Require() raftCluster := suite.cluster.GetLeaderServer().GetRaftCluster() - for _, region := range []*core.RegionInfo{ - core.NewTestRegionInfo(10, 1, []byte("a1"), []byte("a2")), - core.NewTestRegionInfo(11, 1, []byte("a2"), []byte("a3")), - } { - err := raftCluster.HandleRegionHeartbeat(region) - re.NoError(err) - } suspectRegions := raftCluster.GetSuspectRegions() re.Len(suspectRegions, 0) err := suite.client.AccelerateSchedule(suite.ctx, pd.NewKeyRange([]byte("a1"), []byte("a2"))) @@ -295,3 +343,18 @@ func (suite *httpClientTestSuite) TestAccelerateSchedule() { suspectRegions =
raftCluster.GetSuspectRegions() re.Len(suspectRegions, 2) } + +func (suite *httpClientTestSuite) TestScheduleConfig() { + re := suite.Require() + config, err := suite.client.GetScheduleConfig(suite.ctx) + re.NoError(err) + re.Equal(float64(4), config["leader-schedule-limit"]) + re.Equal(float64(2048), config["region-schedule-limit"]) + config["leader-schedule-limit"] = float64(8) + err = suite.client.SetScheduleConfig(suite.ctx, config) + re.NoError(err) + config, err = suite.client.GetScheduleConfig(suite.ctx) + re.NoError(err) + re.Equal(float64(8), config["leader-schedule-limit"]) + re.Equal(float64(2048), config["region-schedule-limit"]) +} diff --git a/tests/pdctl/config/config_test.go b/tests/pdctl/config/config_test.go index 3f82be44399..3b3310185b1 100644 --- a/tests/pdctl/config/config_test.go +++ b/tests/pdctl/config/config_test.go @@ -24,6 +24,7 @@ import ( "time" "github.com/coreos/go-semver/semver" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" @@ -60,8 +61,10 @@ func TestConfigTestSuite(t *testing.T) { } func (suite *configTestSuite) TestConfig() { + suite.NoError(failpoint.Enable("github.com/tikv/pd/pkg/dashboard/adapter/skipDashboardLoop", `return(true)`)) env := tests.NewSchedulingTestEnvironment(suite.T()) env.RunTestInTwoModes(suite.checkConfig) + suite.NoError(failpoint.Disable("github.com/tikv/pd/pkg/dashboard/adapter/skipDashboardLoop")) } func (suite *configTestSuite) checkConfig(cluster *tests.TestCluster) { diff --git a/tests/pdctl/keyspace/keyspace_test.go b/tests/pdctl/keyspace/keyspace_test.go index 805a30e6f18..3ff755fe601 100644 --- a/tests/pdctl/keyspace/keyspace_test.go +++ b/tests/pdctl/keyspace/keyspace_test.go @@ -105,6 +105,35 @@ func TestKeyspace(t *testing.T) { re.NoError(failpoint.Disable("github.com/tikv/pd/server/delayStartServerLoop")) } +// Show command should auto retry without refresh_group_id if keyspace group manager not 
initialized. +// See issue: #7441 +func TestKeyspaceGroupUninitialized(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + re.NoError(failpoint.Enable("github.com/tikv/pd/server/delayStartServerLoop", `return(true)`)) + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/skipSplitRegion", "return(true)")) + tc, err := tests.NewTestCluster(ctx, 1) + re.NoError(err) + re.NoError(tc.RunInitialServers()) + tc.WaitLeader() + re.NoError(tc.GetLeaderServer().BootstrapCluster()) + pdAddr := tc.GetConfig().GetClientURL() + + keyspaceName := "DEFAULT" + keyspaceID := uint32(0) + args := []string{"-u", pdAddr, "keyspace", "show", "name", keyspaceName} + output, err := pdctl.ExecuteCommand(pdctlCmd.GetRootCmd(), args...) + re.NoError(err) + var meta api.KeyspaceMeta + re.NoError(json.Unmarshal(output, &meta)) + re.Equal(keyspaceName, meta.GetName()) + re.Equal(keyspaceID, meta.GetId()) + + re.NoError(failpoint.Disable("github.com/tikv/pd/server/delayStartServerLoop")) + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/skipSplitRegion")) +} + type keyspaceTestSuite struct { suite.Suite ctx context.Context diff --git a/tools/pd-ctl/pdctl/command/keyspace_command.go b/tools/pd-ctl/pdctl/command/keyspace_command.go index 7c0d3d78bf6..93a99abc39f 100644 --- a/tools/pd-ctl/pdctl/command/keyspace_command.go +++ b/tools/pd-ctl/pdctl/command/keyspace_command.go @@ -28,11 +28,12 @@ import ( const ( keyspacePrefix = "pd/api/v2/keyspaces" // flags - nmConfig = "config" - nmLimit = "limit" - nmPageToken = "page_token" - nmRemove = "remove" - nmUpdate = "update" + nmConfig = "config" + nmLimit = "limit" + nmPageToken = "page_token" + nmRemove = "remove" + nmUpdate = "update" + nmForceRefreshGroupID = "force_refresh_group_id" ) // NewKeyspaceCommand returns a keyspace subcommand of rootCmd. 
@@ -64,6 +65,7 @@ func newShowKeyspaceCommand() *cobra.Command { Short: "show keyspace metadata specified by keyspace name", Run: showKeyspaceNameCommandFunc, } + showByName.Flags().Bool(nmForceRefreshGroupID, true, "force refresh keyspace group id") r.AddCommand(showByID) r.AddCommand(showByName) return r @@ -87,7 +89,21 @@ func showKeyspaceNameCommandFunc(cmd *cobra.Command, args []string) { cmd.Usage() return } - resp, err := doRequest(cmd, fmt.Sprintf("%s/%s?force_refresh_group_id=true", keyspacePrefix, args[0]), http.MethodGet, http.Header{}) + refreshGroupID, err := cmd.Flags().GetBool(nmForceRefreshGroupID) + if err != nil { + cmd.PrintErrln("Failed to parse flag: ", err) + return + } + url := fmt.Sprintf("%s/%s", keyspacePrefix, args[0]) + if refreshGroupID { + url += "?force_refresh_group_id=true" + } + resp, err := doRequest(cmd, url, http.MethodGet, http.Header{}) + // Retry without the force_refresh_group_id if the keyspace group manager is not initialized. + // This can happen when PD is not running in API mode. + if err != nil && refreshGroupID && strings.Contains(err.Error(), handlers.GroupManagerUninitializedErr) { + resp, err = doRequest(cmd, fmt.Sprintf("%s/%s", keyspacePrefix, args[0]), http.MethodGet, http.Header{}) + } if err != nil { cmd.PrintErrln("Failed to get the keyspace information: ", err) return