Skip to content

Commit

Permalink
resource_manager, storage: create new storage endpoint for resource m…
Browse files Browse the repository at this point in the history
…anager (#5887)

ref #5851, ref #5886, ref pingcap/tiflow#8110

Signed-off-by: Cabinfever_B <cabinfeveroier@gmail.com>

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
  • Loading branch information
CabinfeverB and ti-chi-bot authored Feb 1, 2023
1 parent 872c73d commit 73c188f
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 22 deletions.
25 changes: 16 additions & 9 deletions pkg/mcs/resource_manager/server/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ import (
"github.com/pingcap/errors"
rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/server"
"github.com/tikv/pd/server/storage"
"go.uber.org/zap"
)

Expand All @@ -33,8 +34,9 @@ const defaultConsumptionChanSize = 1024
// Manager is the manager of resource group.
type Manager struct {
sync.RWMutex
groups map[string]*ResourceGroup
storage func() storage.Storage
groups map[string]*ResourceGroup
storage endpoint.ResourceGroupStorage
createStorageFunc func() endpoint.ResourceGroupStorage
// consumptionChan is used to send the consumption
// info to the background metrics flusher.
consumptionDispatcher chan struct {
Expand All @@ -46,8 +48,12 @@ type Manager struct {
// NewManager returns a new Manager.
func NewManager(srv *server.Server) *Manager {
m := &Manager{
groups: make(map[string]*ResourceGroup),
storage: srv.GetStorage,
groups: make(map[string]*ResourceGroup),
createStorageFunc: func() endpoint.ResourceGroupStorage {
return endpoint.NewStorageEndpoint(
kv.NewEtcdKVBase(srv.GetClient(), "resource_group"),
nil)
},
consumptionDispatcher: make(chan struct {
resourceGroupName string
*rmpb.Consumption
Expand All @@ -60,6 +66,7 @@ func NewManager(srv *server.Server) *Manager {

// Init initializes the resource group manager.
func (m *Manager) Init() {
m.storage = m.createStorageFunc()
handler := func(k, v string) {
group := &rmpb.ResourceGroup{}
if err := proto.Unmarshal([]byte(v), group); err != nil {
Expand All @@ -68,7 +75,7 @@ func (m *Manager) Init() {
}
m.groups[group.Name] = FromProtoResourceGroup(group)
}
m.storage().LoadResourceGroupSettings(handler)
m.storage.LoadResourceGroupSettings(handler)
}

// AddResourceGroup puts a resource group.
Expand All @@ -84,7 +91,7 @@ func (m *Manager) AddResourceGroup(group *ResourceGroup) error {
return err
}
m.Lock()
if err := group.persistSettings(m.storage()); err != nil {
if err := group.persistSettings(m.storage); err != nil {
return err
}
m.groups[group.Name] = group
Expand All @@ -108,7 +115,7 @@ func (m *Manager) ModifyResourceGroup(group *rmpb.ResourceGroup) error {
if err != nil {
return err
}
if err := newGroup.persistSettings(m.storage()); err != nil {
if err := newGroup.persistSettings(m.storage); err != nil {
return err
}
m.groups[group.Name] = newGroup
Expand All @@ -117,7 +124,7 @@ func (m *Manager) ModifyResourceGroup(group *rmpb.ResourceGroup) error {

// DeleteResourceGroup deletes a resource group.
func (m *Manager) DeleteResourceGroup(name string) error {
if err := m.storage().DeleteResourceGroupSetting(name); err != nil {
if err := m.storage.DeleteResourceGroupSetting(name); err != nil {
return err
}
m.Lock()
Expand Down
4 changes: 2 additions & 2 deletions pkg/mcs/resource_manager/server/resource_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"github.com/pingcap/errors"
rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/pingcap/log"
"github.com/tikv/pd/server/storage"
"github.com/tikv/pd/pkg/storage/endpoint"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -220,7 +220,7 @@ func (rg *ResourceGroup) IntoProtoResourceGroup() *rmpb.ResourceGroup {

// persistSettings persists the resource group settings.
// TODO: persist the state of the group separately.
func (rg *ResourceGroup) persistSettings(storage storage.Storage) error {
func (rg *ResourceGroup) persistSettings(storage endpoint.ResourceGroupStorage) error {
metaGroup := rg.IntoProtoResourceGroup()
return storage.SaveResourceGroupSetting(rg.Name, metaGroup)
}
3 changes: 2 additions & 1 deletion pkg/storage/endpoint/key_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ const (
schedulePath = "schedule"
gcPath = "gc"
rulesPath = "rules"
resourceGroupSettingsPath = "resource_group/settings"
ruleGroupPath = "rule_group"
regionLabelPath = "region_label"
replicationPath = "replication_mode"
Expand All @@ -43,6 +42,8 @@ const (
keyspaceIDInfix = "id"
keyspaceAllocID = "alloc_id"
regionPathPrefix = "raft/r"
// resource group storage endpoint has prefix `resource_group`
resourceGroupSettingsPath = "settings"

// we use uint64 to represent ID, the max length of uint64 is 20.
keyLen = 20
Expand Down
8 changes: 4 additions & 4 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1888,9 +1888,9 @@ func (s *GrpcServer) StoreGlobalConfig(_ context.Context, request *pdpb.StoreGlo
name := item.GetName()
switch item.GetKind() {
case pdpb.EventType_PUT:
ops[i] = clientv3.OpPut(s.GetFinalPathWithinPD(request.GetConfigPath()+name), item.GetValue())
ops[i] = clientv3.OpPut(request.GetConfigPath()+name, item.GetValue())
case pdpb.EventType_DELETE:
ops[i] = clientv3.OpDelete(s.GetFinalPathWithinPD(request.GetConfigPath() + name))
ops[i] = clientv3.OpDelete(request.GetConfigPath() + name)
}
}
res, err :=
Expand All @@ -1907,7 +1907,7 @@ func (s *GrpcServer) StoreGlobalConfig(_ context.Context, request *pdpb.StoreGlo
// LoadGlobalConfig load global config from etcd
func (s *GrpcServer) LoadGlobalConfig(ctx context.Context, request *pdpb.LoadGlobalConfigRequest) (*pdpb.LoadGlobalConfigResponse, error) {
configPath := request.GetConfigPath()
r, err := s.client.Get(ctx, s.GetFinalPathWithinPD(configPath), clientv3.WithPrefix())
r, err := s.client.Get(ctx, configPath, clientv3.WithPrefix())
if err != nil {
return &pdpb.LoadGlobalConfigResponse{}, err
}
Expand All @@ -1928,7 +1928,7 @@ func (s *GrpcServer) WatchGlobalConfig(req *pdpb.WatchGlobalConfigRequest, serve
// If the revision is compacted, will meet required revision has been compacted error.
// - If required revision < CompactRevision, we need to reload all configs to avoid losing data.
// - If required revision >= CompactRevision, just keep watching.
watchChan := s.client.Watch(ctx, s.GetFinalPathWithinPD(req.GetConfigPath()), clientv3.WithPrefix(), clientv3.WithRev(revision))
watchChan := s.client.Watch(ctx, req.GetConfigPath(), clientv3.WithPrefix(), clientv3.WithRev(revision))
for {
select {
case <-ctx.Done():
Expand Down
5 changes: 0 additions & 5 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -730,11 +730,6 @@ func (s *Server) GetHTTPClient() *http.Client {
return s.httpClient
}

// GetFinalPathWithinPD returns the etcd path.
func (s *Server) GetFinalPathWithinPD(configPath string) string {
return strings.Join([]string{s.rootPath, configPath}, "/")
}

// GetLeader returns the leader of PD cluster(i.e the PD leader).
func (s *Server) GetLeader() *pdpb.Member {
return s.member.GetLeader()
Expand Down
2 changes: 1 addition & 1 deletion tests/client/global_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (suite *globalConfigTestSuite) TearDownSuite() {
}

func (suite *globalConfigTestSuite) GetEtcdPath(configPath string) string {
return suite.server.GetFinalPathWithinPD(globalConfigPath + configPath)
return globalConfigPath + configPath
}

func (suite *globalConfigTestSuite) TestLoad() {
Expand Down

0 comments on commit 73c188f

Please sign in to comment.