Skip to content

Commit

Permalink
Merge branch 'master' into fix-placement-rule
Browse files Browse the repository at this point in the history
  • Loading branch information
nolouch authored Oct 17, 2023
2 parents e6664b8 + a85f29c commit a51fb5e
Show file tree
Hide file tree
Showing 21 changed files with 489 additions and 131 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ require (
github.com/pingcap/errcode v0.3.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20230920042517-db656f45023b
github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21
github.com/pingcap/tidb-dashboard v0.0.0-20230911054332-22add1e00511
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -439,8 +439,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20230920042517-db656f45023b h1:XwwIxepR+uuSYWhdQtstEdr67XUE7X6lpSIHVh5iWjs=
github.com/pingcap/kvproto v0.0.0-20230920042517-db656f45023b/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a h1:xbak0nFiTkinvdtr/EUK0dvdkX32twKRpIgsMRiiVQA=
github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
Expand Down
12 changes: 12 additions & 0 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import (
"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/schedule/scatter"
"github.com/tikv/pd/pkg/schedule/schedulers"
"github.com/tikv/pd/pkg/schedule/splitter"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/pkg/statistics"
"github.com/tikv/pd/pkg/statistics/buckets"
Expand Down Expand Up @@ -130,6 +132,16 @@ func (c *Cluster) GetRegionLabeler() *labeler.RegionLabeler {
return c.labelerManager
}

// GetRegionSplitter returns the region splitter.
func (c *Cluster) GetRegionSplitter() *splitter.RegionSplitter {
return c.coordinator.GetRegionSplitter()
}

// GetRegionScatterer returns the region scatter.
func (c *Cluster) GetRegionScatterer() *scatter.RegionScatterer {
return c.coordinator.GetRegionScatterer()
}

// GetStoresLoads returns load stats of all stores.
func (c *Cluster) GetStoresLoads() map[uint64][]float64 {
return c.hotStat.GetStoresLoads()
Expand Down
105 changes: 97 additions & 8 deletions pkg/mcs/scheduling/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,13 +140,7 @@ func (s *Service) RegionHeartbeat(stream schedulingpb.Scheduling_RegionHeartbeat

c := s.GetCluster()
if c == nil {
resp := &schedulingpb.RegionHeartbeatResponse{Header: &schedulingpb.ResponseHeader{
ClusterId: s.clusterID,
Error: &schedulingpb.Error{
Type: schedulingpb.ErrorType_NOT_BOOTSTRAPPED,
Message: "scheduling server is not initialized yet",
},
}}
resp := &schedulingpb.RegionHeartbeatResponse{Header: s.notBootstrappedHeader()}
err := server.Send(resp)
return errors.WithStack(err)
}
Expand Down Expand Up @@ -177,7 +171,7 @@ func (s *Service) StoreHeartbeat(ctx context.Context, request *schedulingpb.Stor
if c == nil {
// TODO: add metrics
log.Info("cluster isn't initialized")
return &schedulingpb.StoreHeartbeatResponse{Header: &schedulingpb.ResponseHeader{ClusterId: s.clusterID}}, nil
return &schedulingpb.StoreHeartbeatResponse{Header: s.notBootstrappedHeader()}, nil
}

if c.GetStore(request.GetStats().GetStoreId()) == nil {
Expand All @@ -191,6 +185,75 @@ func (s *Service) StoreHeartbeat(ctx context.Context, request *schedulingpb.Stor
return &schedulingpb.StoreHeartbeatResponse{Header: &schedulingpb.ResponseHeader{ClusterId: s.clusterID}}, nil
}

// SplitRegions split regions by the given split keys
func (s *Service) SplitRegions(ctx context.Context, request *schedulingpb.SplitRegionsRequest) (*schedulingpb.SplitRegionsResponse, error) {
c := s.GetCluster()
if c == nil {
return &schedulingpb.SplitRegionsResponse{Header: s.notBootstrappedHeader()}, nil
}
finishedPercentage, newRegionIDs := c.GetRegionSplitter().SplitRegions(ctx, request.GetSplitKeys(), int(request.GetRetryLimit()))
return &schedulingpb.SplitRegionsResponse{
Header: s.header(),
RegionsId: newRegionIDs,
FinishedPercentage: uint64(finishedPercentage),
}, nil
}

// ScatterRegions implements gRPC PDServer.
func (s *Service) ScatterRegions(ctx context.Context, request *schedulingpb.ScatterRegionsRequest) (*schedulingpb.ScatterRegionsResponse, error) {
c := s.GetCluster()
if c == nil {
return &schedulingpb.ScatterRegionsResponse{Header: s.notBootstrappedHeader()}, nil
}

opsCount, failures, err := c.GetRegionScatterer().ScatterRegionsByID(request.GetRegionsId(), request.GetGroup(), int(request.GetRetryLimit()), request.GetSkipStoreLimit())
if err != nil {
return nil, err
}
percentage := 100
if len(failures) > 0 {
percentage = 100 - 100*len(failures)/(opsCount+len(failures))
log.Debug("scatter regions", zap.Errors("failures", func() []error {
r := make([]error, 0, len(failures))
for _, err := range failures {
r = append(r, err)
}
return r
}()))
}
return &schedulingpb.ScatterRegionsResponse{
Header: s.header(),
FinishedPercentage: uint64(percentage),
}, nil
}

// GetOperator gets information about the operator belonging to the specify region.
func (s *Service) GetOperator(ctx context.Context, request *schedulingpb.GetOperatorRequest) (*schedulingpb.GetOperatorResponse, error) {
c := s.GetCluster()
if c == nil {
return &schedulingpb.GetOperatorResponse{Header: s.notBootstrappedHeader()}, nil
}

opController := c.GetCoordinator().GetOperatorController()
requestID := request.GetRegionId()
r := opController.GetOperatorStatus(requestID)
if r == nil {
header := s.errorHeader(&schedulingpb.Error{
Type: schedulingpb.ErrorType_UNKNOWN,
Message: "Not Found",
})
return &schedulingpb.GetOperatorResponse{Header: header}, nil
}

return &schedulingpb.GetOperatorResponse{
Header: s.header(),
RegionId: requestID,
Desc: []byte(r.Desc()),
Kind: []byte(r.Kind().String()),
Status: r.Status,
}, nil
}

// RegisterGRPCService registers the service to gRPC server.
func (s *Service) RegisterGRPCService(g *grpc.Server) {
schedulingpb.RegisterSchedulingServer(g, s)
Expand All @@ -201,3 +264,29 @@ func (s *Service) RegisterRESTHandler(userDefineHandlers map[string]http.Handler
handler, group := SetUpRestHandler(s)
apiutil.RegisterUserDefinedHandlers(userDefineHandlers, &group, handler)
}

func (s *Service) errorHeader(err *schedulingpb.Error) *schedulingpb.ResponseHeader {
return &schedulingpb.ResponseHeader{
ClusterId: s.clusterID,
Error: err,
}
}

func (s *Service) notBootstrappedHeader() *schedulingpb.ResponseHeader {
return s.errorHeader(&schedulingpb.Error{
Type: schedulingpb.ErrorType_NOT_BOOTSTRAPPED,
Message: "cluster is not initialized",
})
}

func (s *Service) header() *schedulingpb.ResponseHeader {
if s.clusterID == 0 {
return s.wrapErrorToHeader(schedulingpb.ErrorType_NOT_BOOTSTRAPPED, "cluster id is not ready")
}
return &schedulingpb.ResponseHeader{ClusterId: s.clusterID}
}

func (s *Service) wrapErrorToHeader(
errorType schedulingpb.ErrorType, message string) *schedulingpb.ResponseHeader {
return s.errorHeader(&schedulingpb.Error{Type: errorType, Message: message})
}
83 changes: 62 additions & 21 deletions pkg/mcs/scheduling/server/rule/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ import (
"sync"

"github.com/pingcap/log"
"github.com/tikv/pd/pkg/schedule/checker"
"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/etcdutil"
"go.etcd.io/etcd/clientv3"
Expand Down Expand Up @@ -49,18 +52,27 @@ type Watcher struct {
etcdClient *clientv3.Client
ruleStorage endpoint.RuleStorage

// checkerController is used to add the suspect key ranges to the checker when the rule changed.
checkerController *checker.Controller
// ruleManager is used to manage the placement rules.
ruleManager *placement.RuleManager
// regionLabeler is used to manage the region label rules.
regionLabeler *labeler.RegionLabeler

ruleWatcher *etcdutil.LoopWatcher
groupWatcher *etcdutil.LoopWatcher
labelWatcher *etcdutil.LoopWatcher
}

// NewWatcher creates a new watcher to watch the Placement Rule change from PD API server.
// Please use `GetRuleStorage` to get the underlying storage to access the Placement Rules.
func NewWatcher(
ctx context.Context,
etcdClient *clientv3.Client,
clusterID uint64,
ruleStorage endpoint.RuleStorage,
checkerController *checker.Controller,
ruleManager *placement.RuleManager,
regionLabeler *labeler.RegionLabeler,
) (*Watcher, error) {
ctx, cancel := context.WithCancel(ctx)
rw := &Watcher{
Expand All @@ -71,6 +83,9 @@ func NewWatcher(
regionLabelPathPrefix: endpoint.RegionLabelPathPrefix(clusterID),
etcdClient: etcdClient,
ruleStorage: ruleStorage,
checkerController: checkerController,
ruleManager: ruleManager,
regionLabeler: regionLabeler,
}
err := rw.initializeRuleWatcher()
if err != nil {
Expand All @@ -90,17 +105,31 @@ func NewWatcher(
func (rw *Watcher) initializeRuleWatcher() error {
prefixToTrim := rw.rulesPathPrefix + "/"
putFn := func(kv *mvccpb.KeyValue) error {
// Since the PD API server will validate the rule before saving it to etcd,
// so we could directly save the string rule in JSON to the storage here.
log.Info("update placement rule", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value)))
return rw.ruleStorage.SaveRuleJSON(
strings.TrimPrefix(string(kv.Key), prefixToTrim),
string(kv.Value),
)
rule, err := placement.NewRuleFromJSON(kv.Value)
if err != nil {
return err
}
// Update the suspect key ranges in the checker.
rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey)
if oldRule := rw.ruleManager.GetRule(rule.GroupID, rule.ID); oldRule != nil {
rw.checkerController.AddSuspectKeyRange(oldRule.StartKey, oldRule.EndKey)
}
return rw.ruleManager.SetRule(rule)
}
deleteFn := func(kv *mvccpb.KeyValue) error {
log.Info("delete placement rule", zap.String("key", string(kv.Key)))
return rw.ruleStorage.DeleteRule(strings.TrimPrefix(string(kv.Key), prefixToTrim))
key := string(kv.Key)
log.Info("delete placement rule", zap.String("key", key))
ruleJSON, err := rw.ruleStorage.LoadRule(strings.TrimPrefix(key, prefixToTrim))
if err != nil {
return err
}
rule, err := placement.NewRuleFromJSON([]byte(ruleJSON))
if err != nil {
return err
}
rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey)
return rw.ruleManager.DeleteRule(rule.GroupID, rule.ID)
}
postEventFn := func() error {
return nil
Expand All @@ -120,14 +149,24 @@ func (rw *Watcher) initializeGroupWatcher() error {
prefixToTrim := rw.ruleGroupPathPrefix + "/"
putFn := func(kv *mvccpb.KeyValue) error {
log.Info("update placement rule group", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value)))
return rw.ruleStorage.SaveRuleGroupJSON(
strings.TrimPrefix(string(kv.Key), prefixToTrim),
string(kv.Value),
)
ruleGroup, err := placement.NewRuleGroupFromJSON(kv.Value)
if err != nil {
return err
}
// Add all rule key ranges within the group to the suspect key ranges.
for _, rule := range rw.ruleManager.GetRulesByGroup(ruleGroup.ID) {
rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey)
}
return rw.ruleManager.SetRuleGroup(ruleGroup)
}
deleteFn := func(kv *mvccpb.KeyValue) error {
log.Info("delete placement rule group", zap.String("key", string(kv.Key)))
return rw.ruleStorage.DeleteRuleGroup(strings.TrimPrefix(string(kv.Key), prefixToTrim))
key := string(kv.Key)
log.Info("delete placement rule group", zap.String("key", key))
trimmedKey := strings.TrimPrefix(key, prefixToTrim)
for _, rule := range rw.ruleManager.GetRulesByGroup(trimmedKey) {
rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey)
}
return rw.ruleManager.DeleteRuleGroup(trimmedKey)
}
postEventFn := func() error {
return nil
Expand All @@ -147,14 +186,16 @@ func (rw *Watcher) initializeRegionLabelWatcher() error {
prefixToTrim := rw.regionLabelPathPrefix + "/"
putFn := func(kv *mvccpb.KeyValue) error {
log.Info("update region label rule", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value)))
return rw.ruleStorage.SaveRegionRuleJSON(
strings.TrimPrefix(string(kv.Key), prefixToTrim),
string(kv.Value),
)
rule, err := labeler.NewLabelRuleFromJSON(kv.Value)
if err != nil {
return err
}
return rw.regionLabeler.SetLabelRule(rule)
}
deleteFn := func(kv *mvccpb.KeyValue) error {
log.Info("delete region label rule", zap.String("key", string(kv.Key)))
return rw.ruleStorage.DeleteRegionRule(strings.TrimPrefix(string(kv.Key), prefixToTrim))
key := string(kv.Key)
log.Info("delete region label rule", zap.String("key", key))
return rw.regionLabeler.DeleteLabelRule(strings.TrimPrefix(key, prefixToTrim))
}
postEventFn := func() error {
return nil
Expand Down
17 changes: 14 additions & 3 deletions pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ func (s *Server) startServer() (err error) {
func (s *Server) startCluster(context.Context) error {
s.basicCluster = core.NewBasicCluster()
s.storage = endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil)
err := s.startWatcher()
err := s.startMetaConfWatcher()
if err != nil {
return err
}
Expand All @@ -464,7 +464,13 @@ func (s *Server) startCluster(context.Context) error {
if err != nil {
return err
}
// Inject the cluster components into the config watcher after the scheduler controller is created.
s.configWatcher.SetSchedulersController(s.cluster.GetCoordinator().GetSchedulersController())
// Start the rule watcher after the cluster is created.
err = s.startRuleWatcher()
if err != nil {
return err
}
s.cluster.StartBackgroundJobs()
return nil
}
Expand All @@ -474,7 +480,7 @@ func (s *Server) stopCluster() {
s.stopWatcher()
}

func (s *Server) startWatcher() (err error) {
func (s *Server) startMetaConfWatcher() (err error) {
s.metaWatcher, err = meta.NewWatcher(s.Context(), s.GetClient(), s.clusterID, s.basicCluster)
if err != nil {
return err
Expand All @@ -483,7 +489,12 @@ func (s *Server) startWatcher() (err error) {
if err != nil {
return err
}
s.ruleWatcher, err = rule.NewWatcher(s.Context(), s.GetClient(), s.clusterID, s.storage)
return err
}

func (s *Server) startRuleWatcher() (err error) {
s.ruleWatcher, err = rule.NewWatcher(s.Context(), s.GetClient(), s.clusterID, s.storage,
s.cluster.GetCoordinator().GetCheckerController(), s.cluster.GetRuleManager(), s.cluster.GetRegionLabeler())
return err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/labeler/labeler.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ func (l *RegionLabeler) Patch(patch LabelRulePatch) error {
}
}

// update inmemory states.
// update in-memory states.
l.Lock()
defer l.Unlock()

Expand Down
6 changes: 4 additions & 2 deletions pkg/schedule/placement/rule_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,10 @@ func (m *RuleManager) Initialize(maxReplica int, locationLabels []string, isolat
}

func (m *RuleManager) loadRules() error {
var toSave []*Rule
var toDelete []string
var (
toSave []*Rule
toDelete []string
)
err := m.storage.LoadRules(func(k, v string) {
r, err := NewRuleFromJSON([]byte(v))
if err != nil {
Expand Down
Loading

0 comments on commit a51fb5e

Please sign in to comment.