Skip to content

Commit

Permalink
resource_manager: add service mode server (tikv#5994)
Browse files Browse the repository at this point in the history
ref tikv#5837

Signed-off-by: lhy1024 <admin@liudos.us>

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
Conflicts:
	go.mod
	pkg/mcs/tso/server/server.go
	tests/client/go.sum
	tests/mcs/go.sum
  • Loading branch information
lhy1024 authored and nolouch committed Feb 24, 2023
1 parent 1a73b63 commit 362e629
Show file tree
Hide file tree
Showing 17 changed files with 817 additions and 35 deletions.
6 changes: 3 additions & 3 deletions client/resourcemanager_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ const (
add actionType = 0
modify actionType = 1
groupSettingsPathPrefix = "resource_group/settings"
// errNotLeaderMsg is returned when the requested server is not the leader.
errNotLeaderMsg = "not leader"
// errNotPrimary is returned when the requested server is not primary.
errNotPrimary = "not primary"
)

// ResourceManagerClient manages resource group info and token request.
Expand All @@ -58,7 +58,7 @@ func (c *client) resourceManagerClient() rmpb.ResourceManagerClient {

// gRPCErrorHandler is used to handle the gRPC error returned by the resource manager service.
func (c *client) gRPCErrorHandler(err error) {
if strings.Contains(err.Error(), errNotLeaderMsg) {
if strings.Contains(err.Error(), errNotPrimary) {
c.ScheduleCheckLeader()
}
}
Expand Down
25 changes: 22 additions & 3 deletions cmd/pd-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/tikv/pd/pkg/autoscaling"
"github.com/tikv/pd/pkg/dashboard"
"github.com/tikv/pd/pkg/errs"
resource_manager "github.com/tikv/pd/pkg/mcs/resource_manager/server"
tso "github.com/tikv/pd/pkg/mcs/tso/server"
"github.com/tikv/pd/pkg/swaggerserver"
"github.com/tikv/pd/pkg/utils/logutil"
Expand Down Expand Up @@ -78,14 +79,15 @@ func main() {
// NewServiceCommand returns the service command.
func NewServiceCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "service <tso>",
Short: "Run a service",
Use: "service <mode>",
Short: "Run a service, for example, tso, resource_manager",
}
cmd.AddCommand(NewTSOServiceCommand())
cmd.AddCommand(NewResourceManagerServiceCommand())
return cmd
}

// NewTSOServiceCommand returns the unsafe remove failed stores command.
// NewTSOServiceCommand returns the tso service command.
func NewTSOServiceCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "tso",
Expand All @@ -102,6 +104,23 @@ func NewTSOServiceCommand() *cobra.Command {
return cmd
}

// NewResourceManagerServiceCommand returns the resource manager service command.
func NewResourceManagerServiceCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "resource_manager",
Short: "Run the resource manager service",
Run: resource_manager.CreateServerWrapper,
}
cmd.Flags().BoolP("version", "V", false, "print version information and exit")
cmd.Flags().StringP("config", "", "", "config file")
cmd.Flags().StringP("backend-endpoints", "", "", "url for etcd client")
cmd.Flags().StringP("listen-addr", "", "", "listen address for tso service")
cmd.Flags().StringP("cacert", "", "", "path of file that contains list of trusted TLS CAs")
cmd.Flags().StringP("cert", "", "", "path of file that contains X509 certificate in PEM format")
cmd.Flags().StringP("key", "", "", "path of file that contains X509 key in PEM format")
return cmd
}

func createServerWrapper(cmd *cobra.Command, args []string) {
schedulers.Register()
cfg := config.NewConfig()
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ require (
github.com/shurcooL/httpgzip v0.0.0-20190720172056-320755c1c1b0 // indirect
github.com/shurcooL/sanitized_anchor_name v1.0.0 // indirect
github.com/sirupsen/logrus v1.4.2 // indirect
github.com/soheilhy/cmux v0.1.4 // indirect
github.com/soheilhy/cmux v0.1.4
github.com/stretchr/objx v0.5.0 // indirect
github.com/swaggo/files v0.0.0-20190704085106-630677cd5c14 // indirect
github.com/tidwall/gjson v1.9.3 // indirect
Expand Down
10 changes: 4 additions & 6 deletions pkg/basicserver/basic_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"net/http"

"github.com/tikv/pd/pkg/member"
"go.etcd.io/etcd/clientv3"
)

Expand All @@ -38,9 +37,8 @@ type Server interface {
GetHTTPClient() *http.Client
// AddStartCallback adds a callback in the startServer phase.
AddStartCallback(callbacks ...func())
// TODO: replace these two methods with `primary` function without etcd server dependency.
// GetMember returns the member information.
GetMember() *member.Member
// AddLeaderCallback adds a callback in the leader campaign phase.
AddLeaderCallback(callbacks ...func(context.Context))
// IsServing returns whether the server is the leader, if there is embedded etcd, or the primary otherwise.
IsServing() bool
// AddServiceReadyCallback adds the callback function when the server becomes the leader, if there is embedded etcd, or the primary otherwise.
AddServiceReadyCallback(callbacks ...func(context.Context))
}
170 changes: 170 additions & 0 deletions pkg/mcs/resource_manager/server/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package server

import (
"fmt"
"os"
"path/filepath"
"strings"

"github.com/BurntSushi/toml"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/spf13/pflag"
"github.com/tikv/pd/pkg/utils/configutil"
"github.com/tikv/pd/pkg/utils/grpcutil"
"github.com/tikv/pd/pkg/utils/metricutil"
"go.uber.org/zap"
)

const (
defaultName = "Resource Manager"
defaultBackendEndpoints = "127.0.0.1:2379"
defaultListenAddr = "127.0.0.1:3380"
defaultEnableGRPCGateway = true

defaultLogFormat = "text"
defaultDisableErrorVerbose = true
)

// Config is the configuration for the resource manager.
type Config struct {
BackendEndpoints string `toml:"backend-endpoints" json:"backend-endpoints"`
ListenAddr string `toml:"listen-addr" json:"listen-addr"`
Name string `toml:"name" json:"name"`
DataDir string `toml:"data-dir" json:"data-dir"` // TODO: remove this after refactoring
EnableGRPCGateway bool `json:"enable-grpc-gateway"` // TODO: use it

Metric metricutil.MetricConfig `toml:"metric" json:"metric"`

// Log related config.
Log log.Config `toml:"log" json:"log"`

Logger *zap.Logger
LogProps *log.ZapProperties
Security configutil.SecurityConfig `toml:"security" json:"security"`
}

// NewConfig creates a new config.
func NewConfig() *Config {
return &Config{}
}

// Parse parses flag definitions from the argument list.
func (c *Config) Parse(flagSet *pflag.FlagSet) error {
// Load config file if specified.
var (
meta *toml.MetaData
err error
)
if configFile, _ := flagSet.GetString("config"); configFile != "" {
meta, err = configutil.ConfigFromFile(c, configFile)
if err != nil {
return err
}
}

// Ignore the error check here
configutil.AdjustCommandlineString(flagSet, &c.Log.Level, "log-level")
configutil.AdjustCommandlineString(flagSet, &c.Log.File.Filename, "log-file")
configutil.AdjustCommandlineString(flagSet, &c.Metric.PushAddress, "metrics-addr")
configutil.AdjustCommandlineString(flagSet, &c.Security.CAPath, "cacert")
configutil.AdjustCommandlineString(flagSet, &c.Security.CertPath, "cert")
configutil.AdjustCommandlineString(flagSet, &c.Security.KeyPath, "key")
configutil.AdjustCommandlineString(flagSet, &c.BackendEndpoints, "backend-endpoints")
configutil.AdjustCommandlineString(flagSet, &c.ListenAddr, "listen-addr")

return c.Adjust(meta, false)
}

// Adjust is used to adjust the PD configurations.
func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error {
configMetaData := configutil.NewConfigMetadata(meta)
warningMsgs := make([]string, 0)
if err := configMetaData.CheckUndecoded(); err != nil {
warningMsgs = append(warningMsgs, err.Error())
}
configutil.PrintConfigCheckMsg(os.Stdout, warningMsgs)

if c.Name == "" {
hostname, err := os.Hostname()
if err != nil {
return err
}
configutil.AdjustString(&c.Name, fmt.Sprintf("%s-%s", defaultName, hostname))
}
configutil.AdjustString(&c.DataDir, fmt.Sprintf("default.%s", c.Name))
adjustPath(&c.DataDir)

if err := c.Validate(); err != nil {
return err
}

configutil.AdjustString(&c.BackendEndpoints, defaultBackendEndpoints)
configutil.AdjustString(&c.ListenAddr, defaultListenAddr)

if !configMetaData.IsDefined("enable-grpc-gateway") {
c.EnableGRPCGateway = defaultEnableGRPCGateway
}

c.adjustLog(configMetaData.Child("log"))
c.Security.Encryption.Adjust()

if len(c.Log.Format) == 0 {
c.Log.Format = defaultLogFormat
}

return nil
}

func adjustPath(p *string) {
absPath, err := filepath.Abs(*p)
if err == nil {
*p = absPath
}
}

func (c *Config) adjustLog(meta *configutil.ConfigMetaData) {
if !meta.IsDefined("disable-error-verbose") {
c.Log.DisableErrorVerbose = defaultDisableErrorVerbose
}
}

// GetTLSConfig returns the TLS config.
func (c *Config) GetTLSConfig() *grpcutil.TLSConfig {
return &c.Security.TLSConfig
}

// Validate is used to validate if some configurations are right.
func (c *Config) Validate() error {
dataDir, err := filepath.Abs(c.DataDir)
if err != nil {
return errors.WithStack(err)
}
logFile, err := filepath.Abs(c.Log.File.Filename)
if err != nil {
return errors.WithStack(err)
}
rel, err := filepath.Rel(dataDir, filepath.Dir(logFile))
if err != nil {
return errors.WithStack(err)
}
if !strings.HasPrefix(rel, "..") {
return errors.New("log directory shouldn't be the subdirectory of data directory")
}

return nil
}
16 changes: 8 additions & 8 deletions pkg/mcs/resource_manager/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,16 +84,16 @@ func (s *Service) GetManager() *Manager {
return s.manager
}

func (s *Service) checkLeader() error {
if !s.manager.member.IsLeader() {
func (s *Service) checkServing() error {
if !s.manager.srv.IsServing() {
return errNotLeader
}
return nil
}

// GetResourceGroup implements ResourceManagerServer.GetResourceGroup.
func (s *Service) GetResourceGroup(ctx context.Context, req *rmpb.GetResourceGroupRequest) (*rmpb.GetResourceGroupResponse, error) {
if err := s.checkLeader(); err != nil {
if err := s.checkServing(); err != nil {
return nil, err
}
rg := s.manager.GetResourceGroup(req.ResourceGroupName)
Expand All @@ -107,7 +107,7 @@ func (s *Service) GetResourceGroup(ctx context.Context, req *rmpb.GetResourceGro

// ListResourceGroups implements ResourceManagerServer.ListResourceGroups.
func (s *Service) ListResourceGroups(ctx context.Context, req *rmpb.ListResourceGroupsRequest) (*rmpb.ListResourceGroupsResponse, error) {
if err := s.checkLeader(); err != nil {
if err := s.checkServing(); err != nil {
return nil, err
}
groups := s.manager.GetResourceGroupList()
Expand All @@ -122,7 +122,7 @@ func (s *Service) ListResourceGroups(ctx context.Context, req *rmpb.ListResource

// AddResourceGroup implements ResourceManagerServer.AddResourceGroup.
func (s *Service) AddResourceGroup(ctx context.Context, req *rmpb.PutResourceGroupRequest) (*rmpb.PutResourceGroupResponse, error) {
if err := s.checkLeader(); err != nil {
if err := s.checkServing(); err != nil {
return nil, err
}
rg := FromProtoResourceGroup(req.GetGroup())
Expand All @@ -135,7 +135,7 @@ func (s *Service) AddResourceGroup(ctx context.Context, req *rmpb.PutResourceGro

// DeleteResourceGroup implements ResourceManagerServer.DeleteResourceGroup.
func (s *Service) DeleteResourceGroup(ctx context.Context, req *rmpb.DeleteResourceGroupRequest) (*rmpb.DeleteResourceGroupResponse, error) {
if err := s.checkLeader(); err != nil {
if err := s.checkServing(); err != nil {
return nil, err
}
err := s.manager.DeleteResourceGroup(req.ResourceGroupName)
Expand All @@ -147,7 +147,7 @@ func (s *Service) DeleteResourceGroup(ctx context.Context, req *rmpb.DeleteResou

// ModifyResourceGroup implements ResourceManagerServer.ModifyResourceGroup.
func (s *Service) ModifyResourceGroup(ctx context.Context, req *rmpb.PutResourceGroupRequest) (*rmpb.PutResourceGroupResponse, error) {
if err := s.checkLeader(); err != nil {
if err := s.checkServing(); err != nil {
return nil, err
}
err := s.manager.ModifyResourceGroup(req.GetGroup())
Expand All @@ -172,7 +172,7 @@ func (s *Service) AcquireTokenBuckets(stream rmpb.ResourceManager_AcquireTokenBu
if err != nil {
return errors.WithStack(err)
}
if err := s.checkLeader(); err != nil {
if err := s.checkServing(); err != nil {
return err
}
targetPeriodMs := request.GetTargetRequestPeriodMs()
Expand Down
10 changes: 4 additions & 6 deletions pkg/mcs/resource_manager/server/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/pingcap/log"
bs "github.com/tikv/pd/pkg/basicserver"
"github.com/tikv/pd/pkg/member"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
"go.uber.org/zap"
Expand All @@ -43,7 +42,7 @@ const (
// Manager is the manager of resource group.
type Manager struct {
sync.RWMutex
member *member.Member
srv bs.Server
groups map[string]*ResourceGroup
storage endpoint.ResourceGroupStorage
// consumptionChan is used to send the consumption
Expand All @@ -59,7 +58,6 @@ type Manager struct {
// NewManager returns a new Manager.
func NewManager(srv bs.Server) *Manager {
m := &Manager{
member: &member.Member{},
groups: make(map[string]*ResourceGroup),
consumptionDispatcher: make(chan struct {
resourceGroupName string
Expand All @@ -74,10 +72,10 @@ func NewManager(srv bs.Server) *Manager {
kv.NewEtcdKVBase(srv.GetClient(), "resource_group"),
nil,
)
m.member = srv.GetMember()
m.srv = srv
})
// The second initialization after the leader is elected.
srv.AddLeaderCallback(m.Init)
// The second initialization after becoming serving.
srv.AddServiceReadyCallback(m.Init)
return m
}

Expand Down
Loading

0 comments on commit 362e629

Please sign in to comment.