Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mcs: init HTTP handler #6963

Merged
merged 3 commits into from
Aug 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cmd/pd-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ import (
"github.com/tikv/pd/server/config"
"github.com/tikv/pd/server/join"
"go.uber.org/zap"

// register microservice API
_ "github.com/tikv/pd/pkg/mcs/resourcemanager/server/install"
_ "github.com/tikv/pd/pkg/mcs/scheduling/server/install"
_ "github.com/tikv/pd/pkg/mcs/tso/server/install"
)

func main() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/resourcemanager/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type Service struct {
}

// NewService creates a new resource manager service.
func NewService[T ResourceManagerConfigProvider](svr bs.Server) registry.RegistrableService {
func NewService[T ConfigProvider](svr bs.Server) registry.RegistrableService {
manager := NewManager[T](svr)

return &Service{
Expand Down
4 changes: 2 additions & 2 deletions pkg/mcs/resourcemanager/server/install/install.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ package install

import (
"github.com/tikv/pd/pkg/mcs/registry"
rm_server "github.com/tikv/pd/pkg/mcs/resourcemanager/server"
"github.com/tikv/pd/pkg/mcs/resourcemanager/server"

// init API group
_ "github.com/tikv/pd/pkg/mcs/resourcemanager/server/apis/v1"
Expand All @@ -28,5 +28,5 @@ func init() {

// Install registers the API group and grpc service.
func Install(register *registry.ServiceRegistry) {
register.RegisterService("ResourceManager", rm_server.NewService[*rm_server.Server])
register.RegisterService("ResourceManager", server.NewService[*server.Server])
}
8 changes: 4 additions & 4 deletions pkg/mcs/resourcemanager/server/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,15 @@ type Manager struct {
consumptionRecord map[string]time.Time
}

// ResourceManagerConfigProvider is used to get resource manager config from the given
// ConfigProvider is used to get resource manager config from the given
// `bs.server` without modifying its interface.
type ResourceManagerConfigProvider interface {
type ConfigProvider interface {
GetControllerConfig() *ControllerConfig
}

// NewManager returns a new manager base on the given server,
// which should implement the `ResourceManagerConfigProvider` interface.
func NewManager[T ResourceManagerConfigProvider](srv bs.Server) *Manager {
// which should implement the `ConfigProvider` interface.
func NewManager[T ConfigProvider](srv bs.Server) *Manager {
m := &Manager{
controllerConfig: srv.(T).GetControllerConfig(),
groups: make(map[string]*ResourceGroup),
Expand Down
207 changes: 105 additions & 102 deletions pkg/mcs/resourcemanager/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,30 +25,29 @@ import (
"os/signal"
"path"
"strconv"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"

grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/diagnosticspb"
"github.com/pingcap/log"
"github.com/pingcap/sysutil"
"github.com/soheilhy/cmux"
"github.com/spf13/cobra"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/discovery"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/member"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/grpcutil"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/memberutil"
"github.com/tikv/pd/pkg/utils/metricutil"
"github.com/tikv/pd/pkg/versioninfo"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/pkg/types"
"go.uber.org/zap"
"google.golang.org/grpc"
)
Expand Down Expand Up @@ -76,9 +75,15 @@ type Server struct {
etcdClient *clientv3.Client
httpClient *http.Client

secure bool
muxListener net.Listener
grpcServer *grpc.Server
httpServer *http.Server
service *Service

// Store as map[string]*grpc.ClientConn
clientConns sync.Map

// Callback functions for different stages
// startCallbacks will be called after the server is started.
startCallbacks []func()
Expand All @@ -105,16 +110,19 @@ func (s *Server) GetAddr() string {

// Run runs the Resource Manager server.
func (s *Server) Run() (err error) {
if err = s.initClient(); err != nil {
return err
skipWaitAPIServiceReady := false
failpoint.Inject("skipWaitAPIServiceReady", func() {
skipWaitAPIServiceReady = true
})
if !skipWaitAPIServiceReady {
if err := utils.WaitAPIServiceReady(s); err != nil {
return err
}
}
if err = s.startServer(); err != nil {
if err := utils.InitClient(s); err != nil {
return err
}

s.startServerLoop()

return nil
return s.startServer()
}

func (s *Server) startServerLoop() {
Expand Down Expand Up @@ -211,6 +219,8 @@ func (s *Server) Close() {

log.Info("closing resource manager server ...")
s.serviceRegister.Deregister()
utils.StopHTTPServer(s)
utils.StopGRPCServer(s)
s.muxListener.Close()
s.serverLoopCancel()
s.serverLoopWg.Wait()
Expand Down Expand Up @@ -258,103 +268,97 @@ func (s *Server) IsClosed() bool {
return s != nil && atomic.LoadInt64(&s.isRunning) == 0
}

// IsSecure checks if the server enable TLS.
func (s *Server) IsSecure() bool {
return s.secure
}

// AddServiceReadyCallback adds callbacks when the server becomes the leader, if there is embedded etcd, or the primary otherwise.
func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context)) {
s.primaryCallbacks = append(s.primaryCallbacks, callbacks...)
}

func (s *Server) initClient() error {
tlsConfig, err := s.cfg.Security.ToTLSConfig()
if err != nil {
return err
}
u, err := types.NewURLs(strings.Split(s.cfg.BackendEndpoints, ","))
if err != nil {
return err
}
s.etcdClient, s.httpClient, err = etcdutil.CreateClients(tlsConfig, []url.URL(u))
return err
// GetBackendEndpoints returns the backend endpoints.
func (s *Server) GetBackendEndpoints() string {
return s.cfg.BackendEndpoints
}

func (s *Server) startGRPCServer(l net.Listener) {
defer logutil.LogPanic()
defer s.serverLoopWg.Done()
// GetClientConns returns the client connections.
func (s *Server) GetClientConns() *sync.Map {
return &s.clientConns
}

gs := grpc.NewServer()
s.service.RegisterGRPCService(gs)
err := gs.Serve(l)
log.Info("gRPC server stop serving")
// ServerLoopWgDone decreases the server loop wait group.
func (s *Server) ServerLoopWgDone() {
s.serverLoopWg.Done()
}

// Attempt graceful stop (waits for pending RPCs), but force a stop if
// it doesn't happen in a reasonable amount of time.
done := make(chan struct{})
go func() {
defer logutil.LogPanic()
log.Info("try to gracefully stop the server now")
gs.GracefulStop()
close(done)
}()
timer := time.NewTimer(utils.DefaultGRPCGracefulStopTimeout)
defer timer.Stop()
select {
case <-done:
case <-timer.C:
log.Info("stopping grpc gracefully is taking longer than expected and force stopping now", zap.Duration("default", utils.DefaultGRPCGracefulStopTimeout))
gs.Stop()
}
if s.IsClosed() {
log.Info("grpc server stopped")
} else {
log.Fatal("grpc server stopped unexpectedly", errs.ZapError(err))
}
// ServerLoopWgAdd increases the server loop wait group.
func (s *Server) ServerLoopWgAdd(n int) {
s.serverLoopWg.Add(n)
}

func (s *Server) startHTTPServer(l net.Listener) {
defer logutil.LogPanic()
defer s.serverLoopWg.Done()
// GetHTTPServer returns the http server.
func (s *Server) GetHTTPServer() *http.Server {
return s.httpServer
}

handler, _ := SetUpRestHandler(s.service)
hs := &http.Server{
Handler: handler,
ReadTimeout: 5 * time.Minute,
ReadHeaderTimeout: 5 * time.Second,
}
err := hs.Serve(l)
log.Info("http server stop serving")
// SetHTTPServer sets the http server.
func (s *Server) SetHTTPServer(httpServer *http.Server) {
s.httpServer = httpServer
}

ctx, cancel := context.WithTimeout(context.Background(), utils.DefaultHTTPGracefulShutdownTimeout)
defer cancel()
if err := hs.Shutdown(ctx); err != nil {
log.Error("http server shutdown encountered problem", errs.ZapError(err))
} else {
log.Info("all http(s) requests finished")
}
if s.IsClosed() {
log.Info("http server stopped")
} else {
log.Fatal("http server stopped unexpectedly", errs.ZapError(err))
}
// SetUpRestHandler sets up the REST handler.
func (s *Server) SetUpRestHandler() (http.Handler, apiutil.APIServiceGroup) {
return SetUpRestHandler(s.service)
}

func (s *Server) startGRPCAndHTTPServers(l net.Listener) {
defer logutil.LogPanic()
defer s.serverLoopWg.Done()
// GetGRPCServer returns the grpc server.
func (s *Server) GetGRPCServer() *grpc.Server {
return s.grpcServer
}

mux := cmux.New(l)
grpcL := mux.MatchWithWriters(cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"))
httpL := mux.Match(cmux.Any())
// SetGRPCServer sets the grpc server.
func (s *Server) SetGRPCServer(grpcServer *grpc.Server) {
s.grpcServer = grpcServer
}

s.serverLoopWg.Add(2)
go s.startGRPCServer(grpcL)
go s.startHTTPServer(httpL)
// RegisterGRPCService registers the grpc service.
func (s *Server) RegisterGRPCService(grpcServer *grpc.Server) {
s.service.RegisterGRPCService(grpcServer)
}

if err := mux.Serve(); err != nil {
if s.IsClosed() {
log.Info("mux stop serving", errs.ZapError(err))
} else {
log.Fatal("mux stop serving unexpectedly", errs.ZapError(err))
// SetETCDClient sets the etcd client.
func (s *Server) SetETCDClient(etcdClient *clientv3.Client) {
s.etcdClient = etcdClient
}

// SetHTTPClient sets the http client.
func (s *Server) SetHTTPClient(httpClient *http.Client) {
s.httpClient = httpClient
}

// GetTLSConfig gets the security config.
func (s *Server) GetTLSConfig() *grpcutil.TLSConfig {
return &s.cfg.Security.TLSConfig
}

// GetDelegateClient returns grpc client connection talking to the forwarded host
func (s *Server) GetDelegateClient(ctx context.Context, forwardedHost string) (*grpc.ClientConn, error) {
client, ok := s.clientConns.Load(forwardedHost)
if !ok {
tlsConfig, err := s.GetTLSConfig().ToTLSConfig()
if err != nil {
return nil, err
}
cc, err := grpcutil.GetClientConn(ctx, forwardedHost, tlsConfig)
if err != nil {
return nil, err
}
client = cc
s.clientConns.Store(forwardedHost, cc)
}
return client.(*grpc.ClientConn), nil
}

// GetLeaderListenUrls gets service endpoints from the leader in election group.
Expand All @@ -378,7 +382,10 @@ func (s *Server) startServer() (err error) {
s.participant = member.NewParticipant(s.etcdClient)
s.participant.InitInfo(uniqueName, uniqueID, path.Join(resourceManagerPrimaryPrefix, fmt.Sprintf("%05d", 0)),
utils.PrimaryKey, "primary election", s.cfg.AdvertiseListenAddr)

s.listenURL, err = url.Parse(s.cfg.ListenAddr)
if err != nil {
return err
}
s.service = &Service{
ctx: s.ctx,
manager: NewManager[*Server](s),
Expand All @@ -388,11 +395,8 @@ func (s *Server) startServer() (err error) {
if err != nil {
return err
}
s.listenURL, err = url.Parse(s.cfg.ListenAddr)
if err != nil {
return err
}
if tlsConfig != nil {
s.secure = true
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
s.muxListener, err = tls.Listen(utils.TCPNetworkStr, s.listenURL.Host, tlsConfig)
} else {
s.muxListener, err = net.Listen(utils.TCPNetworkStr, s.listenURL.Host)
Expand All @@ -401,8 +405,12 @@ func (s *Server) startServer() (err error) {
return err
}

serverReadyChan := make(chan struct{})
defer close(serverReadyChan)
s.serverLoopWg.Add(1)
go s.startGRPCAndHTTPServers(s.muxListener)
go utils.StartGRPCAndHTTPServers(s, serverReadyChan, s.muxListener)
<-serverReadyChan
s.startServerLoop()

// Run callbacks
log.Info("triggering the start callback functions")
Expand Down Expand Up @@ -455,7 +463,7 @@ func CreateServerWrapper(cmd *cobra.Command, args []string) {
return
} else if printVersion {
versioninfo.Print()
exit(0)
utils.Exit(0)
}

// New zap logger
Expand Down Expand Up @@ -500,13 +508,8 @@ func CreateServerWrapper(cmd *cobra.Command, args []string) {
svr.Close()
switch sig {
case syscall.SIGTERM:
exit(0)
utils.Exit(0)
default:
exit(1)
utils.Exit(1)
}
}

func exit(code int) {
log.Sync()
os.Exit(code)
}
Loading
Loading