From 0278b12795a79ec01ff7073f760fd81317fb8547 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Wed, 16 Aug 2023 17:39:01 +0800 Subject: [PATCH 1/2] init HTTP handler Signed-off-by: Ryan Leung --- cmd/pd-server/main.go | 5 + pkg/mcs/resourcemanager/server/server.go | 207 +++++++++++---------- pkg/mcs/scheduling/server/server.go | 208 ++++++--------------- pkg/mcs/tso/server/server.go | 226 +++++++---------------- pkg/mcs/utils/util.go | 177 ++++++++++++++++++ 5 files changed, 410 insertions(+), 413 deletions(-) diff --git a/cmd/pd-server/main.go b/cmd/pd-server/main.go index eb64e353daf..f1fecebd050 100644 --- a/cmd/pd-server/main.go +++ b/cmd/pd-server/main.go @@ -41,6 +41,11 @@ import ( "github.com/tikv/pd/server/config" "github.com/tikv/pd/server/join" "go.uber.org/zap" + + // register microservice HTTP API + _ "github.com/tikv/pd/pkg/mcs/resourcemanager/server/apis/v1" + _ "github.com/tikv/pd/pkg/mcs/scheduling/server/apis/v1" + _ "github.com/tikv/pd/pkg/mcs/tso/server/apis/v1" ) func main() { diff --git a/pkg/mcs/resourcemanager/server/server.go b/pkg/mcs/resourcemanager/server/server.go index 19c10bb8cf9..7cb04c2619e 100644 --- a/pkg/mcs/resourcemanager/server/server.go +++ b/pkg/mcs/resourcemanager/server/server.go @@ -25,30 +25,29 @@ import ( "os/signal" "path" "strconv" - "strings" "sync" "sync/atomic" "syscall" "time" grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/diagnosticspb" "github.com/pingcap/log" "github.com/pingcap/sysutil" - "github.com/soheilhy/cmux" "github.com/spf13/cobra" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/mcs/discovery" "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/member" "github.com/tikv/pd/pkg/storage/endpoint" - "github.com/tikv/pd/pkg/utils/etcdutil" + "github.com/tikv/pd/pkg/utils/apiutil" + "github.com/tikv/pd/pkg/utils/grpcutil" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/memberutil" "github.com/tikv/pd/pkg/utils/metricutil" "github.com/tikv/pd/pkg/versioninfo" "go.etcd.io/etcd/clientv3" - "go.etcd.io/etcd/pkg/types" "go.uber.org/zap" "google.golang.org/grpc" ) @@ -76,9 +75,15 @@ type Server struct { etcdClient *clientv3.Client httpClient *http.Client + secure bool muxListener net.Listener + grpcServer *grpc.Server + httpServer *http.Server service *Service + // Store as map[string]*grpc.ClientConn + clientConns sync.Map + // Callback functions for different stages // startCallbacks will be called after the server is started. startCallbacks []func() @@ -105,16 +110,19 @@ func (s *Server) GetAddr() string { // Run runs the Resource Manager server. func (s *Server) Run() (err error) { - if err = s.initClient(); err != nil { - return err + skipWaitAPIServiceReady := false + failpoint.Inject("skipWaitAPIServiceReady", func() { + skipWaitAPIServiceReady = true + }) + if !skipWaitAPIServiceReady { + if err := utils.WaitAPIServiceReady(s); err != nil { + return err + } } - if err = s.startServer(); err != nil { + if err := utils.InitClient(s); err != nil { return err } - - s.startServerLoop() - - return nil + return s.startServer() } func (s *Server) startServerLoop() { @@ -211,6 +219,8 @@ func (s *Server) Close() { log.Info("closing resource manager server ...") s.serviceRegister.Deregister() + utils.StopHTTPServer(s) + utils.StopGRPCServer(s) s.muxListener.Close() s.serverLoopCancel() s.serverLoopWg.Wait() @@ -258,103 +268,97 @@ func (s *Server) IsClosed() bool { return s != nil && atomic.LoadInt64(&s.isRunning) == 0 } +// IsSecure checks if the server enable TLS. +func (s *Server) IsSecure() bool { + return s.secure +} + // AddServiceReadyCallback adds callbacks when the server becomes the leader, if there is embedded etcd, or the primary otherwise. func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context)) { s.primaryCallbacks = append(s.primaryCallbacks, callbacks...) } -func (s *Server) initClient() error { - tlsConfig, err := s.cfg.Security.ToTLSConfig() - if err != nil { - return err - } - u, err := types.NewURLs(strings.Split(s.cfg.BackendEndpoints, ",")) - if err != nil { - return err - } - s.etcdClient, s.httpClient, err = etcdutil.CreateClients(tlsConfig, []url.URL(u)) - return err +// GetBackendEndpoints returns the backend endpoints. +func (s *Server) GetBackendEndpoints() string { + return s.cfg.BackendEndpoints } -func (s *Server) startGRPCServer(l net.Listener) { - defer logutil.LogPanic() - defer s.serverLoopWg.Done() +// GetClientConns returns the client connections. +func (s *Server) GetClientConns() *sync.Map { + return &s.clientConns +} - gs := grpc.NewServer() - s.service.RegisterGRPCService(gs) - err := gs.Serve(l) - log.Info("gRPC server stop serving") +// ServerLoopWgDone decreases the server loop wait group. +func (s *Server) ServerLoopWgDone() { + s.serverLoopWg.Done() +} - // Attempt graceful stop (waits for pending RPCs), but force a stop if - // it doesn't happen in a reasonable amount of time. - done := make(chan struct{}) - go func() { - defer logutil.LogPanic() - log.Info("try to gracefully stop the server now") - gs.GracefulStop() - close(done) - }() - timer := time.NewTimer(utils.DefaultGRPCGracefulStopTimeout) - defer timer.Stop() - select { - case <-done: - case <-timer.C: - log.Info("stopping grpc gracefully is taking longer than expected and force stopping now", zap.Duration("default", utils.DefaultGRPCGracefulStopTimeout)) - gs.Stop() - } - if s.IsClosed() { - log.Info("grpc server stopped") - } else { - log.Fatal("grpc server stopped unexpectedly", errs.ZapError(err)) - } +// ServerLoopWgAdd increases the server loop wait group. +func (s *Server) ServerLoopWgAdd(n int) { + s.serverLoopWg.Add(n) } -func (s *Server) startHTTPServer(l net.Listener) { - defer logutil.LogPanic() - defer s.serverLoopWg.Done() +// GetHTTPServer returns the http server. +func (s *Server) GetHTTPServer() *http.Server { + return s.httpServer +} - handler, _ := SetUpRestHandler(s.service) - hs := &http.Server{ - Handler: handler, - ReadTimeout: 5 * time.Minute, - ReadHeaderTimeout: 5 * time.Second, - } - err := hs.Serve(l) - log.Info("http server stop serving") +// SetHTTPServer sets the http server. +func (s *Server) SetHTTPServer(httpServer *http.Server) { + s.httpServer = httpServer +} - ctx, cancel := context.WithTimeout(context.Background(), utils.DefaultHTTPGracefulShutdownTimeout) - defer cancel() - if err := hs.Shutdown(ctx); err != nil { - log.Error("http server shutdown encountered problem", errs.ZapError(err)) - } else { - log.Info("all http(s) requests finished") - } - if s.IsClosed() { - log.Info("http server stopped") - } else { - log.Fatal("http server stopped unexpectedly", errs.ZapError(err)) - } +// SetUpRestHandler sets up the REST handler. +func (s *Server) SetUpRestHandler() (http.Handler, apiutil.APIServiceGroup) { + return SetUpRestHandler(s.service) } -func (s *Server) startGRPCAndHTTPServers(l net.Listener) { - defer logutil.LogPanic() - defer s.serverLoopWg.Done() +// GetGRPCServer returns the grpc server. +func (s *Server) GetGRPCServer() *grpc.Server { + return s.grpcServer +} - mux := cmux.New(l) - grpcL := mux.MatchWithWriters(cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc")) - httpL := mux.Match(cmux.Any()) +// SetGRPCServer sets the grpc server. +func (s *Server) SetGRPCServer(grpcServer *grpc.Server) { + s.grpcServer = grpcServer +} - s.serverLoopWg.Add(2) - go s.startGRPCServer(grpcL) - go s.startHTTPServer(httpL) +// RegisterGRPCService registers the grpc service. +func (s *Server) RegisterGRPCService(grpcServer *grpc.Server) { + s.service.RegisterGRPCService(grpcServer) +} - if err := mux.Serve(); err != nil { - if s.IsClosed() { - log.Info("mux stop serving", errs.ZapError(err)) - } else { - log.Fatal("mux stop serving unexpectedly", errs.ZapError(err)) +// SetETCDClient sets the etcd client. +func (s *Server) SetETCDClient(etcdClient *clientv3.Client) { + s.etcdClient = etcdClient +} + +// SetHTTPClient sets the http client. +func (s *Server) SetHTTPClient(httpClient *http.Client) { + s.httpClient = httpClient +} + +// GetTLSConfig gets the security config. +func (s *Server) GetTLSConfig() *grpcutil.TLSConfig { + return &s.cfg.Security.TLSConfig +} + +// GetDelegateClient returns grpc client connection talking to the forwarded host +func (s *Server) GetDelegateClient(ctx context.Context, forwardedHost string) (*grpc.ClientConn, error) { + client, ok := s.clientConns.Load(forwardedHost) + if !ok { + tlsConfig, err := s.GetTLSConfig().ToTLSConfig() + if err != nil { + return nil, err } + cc, err := grpcutil.GetClientConn(ctx, forwardedHost, tlsConfig) + if err != nil { + return nil, err + } + client = cc + s.clientConns.Store(forwardedHost, cc) } + return client.(*grpc.ClientConn), nil } // GetLeaderListenUrls gets service endpoints from the leader in election group. @@ -378,7 +382,10 @@ func (s *Server) startServer() (err error) { s.participant = member.NewParticipant(s.etcdClient) s.participant.InitInfo(uniqueName, uniqueID, path.Join(resourceManagerPrimaryPrefix, fmt.Sprintf("%05d", 0)), utils.PrimaryKey, "primary election", s.cfg.AdvertiseListenAddr) - + s.listenURL, err = url.Parse(s.cfg.ListenAddr) + if err != nil { + return err + } s.service = &Service{ ctx: s.ctx, manager: NewManager[*Server](s), @@ -388,11 +395,8 @@ func (s *Server) startServer() (err error) { if err != nil { return err } - s.listenURL, err = url.Parse(s.cfg.ListenAddr) - if err != nil { - return err - } if tlsConfig != nil { + s.secure = true s.muxListener, err = tls.Listen(utils.TCPNetworkStr, s.listenURL.Host, tlsConfig) } else { s.muxListener, err = net.Listen(utils.TCPNetworkStr, s.listenURL.Host) @@ -401,8 +405,12 @@ func (s *Server) startServer() (err error) { return err } + serverReadyChan := make(chan struct{}) + defer close(serverReadyChan) s.serverLoopWg.Add(1) - go s.startGRPCAndHTTPServers(s.muxListener) + go utils.StartGRPCAndHTTPServers(s, serverReadyChan, s.muxListener) + <-serverReadyChan + s.startServerLoop() // Run callbacks log.Info("triggering the start callback functions") @@ -455,7 +463,7 @@ func CreateServerWrapper(cmd *cobra.Command, args []string) { return } else if printVersion { versioninfo.Print() - exit(0) + utils.Exit(0) } // New zap logger @@ -500,13 +508,8 @@ func CreateServerWrapper(cmd *cobra.Command, args []string) { svr.Close() switch sig { case syscall.SIGTERM: - exit(0) + utils.Exit(0) default: - exit(1) + utils.Exit(1) } } - -func exit(code int) { - log.Sync() - os.Exit(code) -} diff --git a/pkg/mcs/scheduling/server/server.go b/pkg/mcs/scheduling/server/server.go index 991d513e9b1..73824eb026f 100644 --- a/pkg/mcs/scheduling/server/server.go +++ b/pkg/mcs/scheduling/server/server.go @@ -25,7 +25,6 @@ import ( "os/signal" "path" "strconv" - "strings" "sync" "sync/atomic" "syscall" @@ -36,7 +35,6 @@ import ( "github.com/pingcap/kvproto/pkg/diagnosticspb" "github.com/pingcap/log" "github.com/pingcap/sysutil" - "github.com/soheilhy/cmux" "github.com/spf13/cobra" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/mcs/discovery" @@ -48,14 +46,13 @@ import ( "github.com/tikv/pd/pkg/schedule/hbstream" "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/storage/kv" - "github.com/tikv/pd/pkg/utils/etcdutil" + "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/grpcutil" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/memberutil" "github.com/tikv/pd/pkg/utils/metricutil" "github.com/tikv/pd/pkg/versioninfo" "go.etcd.io/etcd/clientv3" - "go.etcd.io/etcd/pkg/types" "go.uber.org/zap" "google.golang.org/grpc" ) @@ -78,7 +75,6 @@ type Server struct { name string clusterID uint64 listenURL *url.URL - backendUrls []url.URL persistConfig *config.PersistConfig // etcd client @@ -86,18 +82,17 @@ type Server struct { // http client httpClient *http.Client - // Store as map[string]*grpc.ClientConn - clientConns sync.Map - // for the primary election of scheduling participant *member.Participant - secure bool - muxListener net.Listener - httpListener net.Listener - grpcServer *grpc.Server - httpServer *http.Server - service *Service + secure bool + muxListener net.Listener + grpcServer *grpc.Server + httpServer *http.Server + service *Service + + // Store as map[string]*grpc.ClientConn + clientConns sync.Map // Callback functions for different stages // startCallbacks will be called after the server is started. @@ -157,7 +152,7 @@ func (s *Server) Run() error { } } - if err := s.initClient(); err != nil { + if err := utils.InitClient(s); err != nil { return err } return s.startServer() @@ -257,8 +252,8 @@ func (s *Server) Close() { log.Info("closing scheduling server ...") s.serviceRegister.Deregister() - s.stopHTTPServer() - s.stopGRPCServer() + utils.StopHTTPServer(s) + utils.StopGRPCServer(s) s.muxListener.Close() s.serverLoopCancel() s.serverLoopWg.Wait() @@ -300,6 +295,11 @@ func (s *Server) IsClosed() bool { return s != nil && atomic.LoadInt64(&s.isRunning) == 0 } +// IsSecure checks if the server enable TLS. +func (s *Server) IsSecure() bool { + return s.secure +} + // AddServiceReadyCallback adds callbacks when the server becomes the leader, if there is embedded etcd, or the primary otherwise. func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context)) { s.primaryCallbacks = append(s.primaryCallbacks, callbacks...) @@ -333,81 +333,54 @@ func (s *Server) GetCoordinator() *schedule.Coordinator { return s.coordinator } -func (s *Server) initClient() error { - tlsConfig, err := s.cfg.Security.ToTLSConfig() - if err != nil { - return err - } - s.backendUrls, err = types.NewURLs(strings.Split(s.cfg.BackendEndpoints, ",")) - if err != nil { - return err - } - s.etcdClient, s.httpClient, err = etcdutil.CreateClients(tlsConfig, s.backendUrls) - return err +// ServerLoopWgDone decreases the server loop wait group. +func (s *Server) ServerLoopWgDone() { + s.serverLoopWg.Done() } -func (s *Server) startGRPCServer(l net.Listener) { - defer logutil.LogPanic() - defer s.serverLoopWg.Done() +// ServerLoopWgAdd increases the server loop wait group. +func (s *Server) ServerLoopWgAdd(n int) { + s.serverLoopWg.Add(n) +} - log.Info("grpc server starts serving", zap.String("address", l.Addr().String())) - err := s.grpcServer.Serve(l) - if s.IsClosed() { - log.Info("grpc server stopped") - } else { - log.Fatal("grpc server stopped unexpectedly", errs.ZapError(err)) - } +// GetHTTPServer returns the http server. +func (s *Server) GetHTTPServer() *http.Server { + return s.httpServer } -func (s *Server) startHTTPServer(l net.Listener) { - defer logutil.LogPanic() - defer s.serverLoopWg.Done() +// SetHTTPServer sets the http server. +func (s *Server) SetHTTPServer(httpServer *http.Server) { + s.httpServer = httpServer +} - log.Info("http server starts serving", zap.String("address", l.Addr().String())) - err := s.httpServer.Serve(l) - if s.IsClosed() { - log.Info("http server stopped") - } else { - log.Fatal("http server stopped unexpectedly", errs.ZapError(err)) - } +// SetUpRestHandler sets up the REST handler. +func (s *Server) SetUpRestHandler() (http.Handler, apiutil.APIServiceGroup) { + return SetUpRestHandler(s.service) } -func (s *Server) startGRPCAndHTTPServers(serverReadyChan chan<- struct{}, l net.Listener) { - defer logutil.LogPanic() - defer s.serverLoopWg.Done() +// GetGRPCServer returns the grpc server. +func (s *Server) GetGRPCServer() *grpc.Server { + return s.grpcServer +} - mux := cmux.New(l) - // Don't hang on matcher after closing listener - mux.SetReadTimeout(3 * time.Second) - grpcL := mux.MatchWithWriters(cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc")) - if s.secure { - s.httpListener = mux.Match(cmux.Any()) - } else { - s.httpListener = mux.Match(cmux.HTTP1()) - } +// SetGRPCServer sets the grpc server. +func (s *Server) SetGRPCServer(grpcServer *grpc.Server) { + s.grpcServer = grpcServer +} - s.grpcServer = grpc.NewServer() - s.service.RegisterGRPCService(s.grpcServer) - diagnosticspb.RegisterDiagnosticsServer(s.grpcServer, s) - s.serverLoopWg.Add(1) - go s.startGRPCServer(grpcL) +// RegisterGRPCService registers the grpc service. +func (s *Server) RegisterGRPCService(grpcServer *grpc.Server) { + s.service.RegisterGRPCService(grpcServer) +} - handler, _ := SetUpRestHandler(s.service) - s.httpServer = &http.Server{ - Handler: handler, - ReadTimeout: 3 * time.Second, - } - s.serverLoopWg.Add(1) - go s.startHTTPServer(s.httpListener) +// SetETCDClient sets the etcd client. +func (s *Server) SetETCDClient(etcdClient *clientv3.Client) { + s.etcdClient = etcdClient +} - serverReadyChan <- struct{}{} - if err := mux.Serve(); err != nil { - if s.IsClosed() { - log.Info("mux stopped serving", errs.ZapError(err)) - } else { - log.Fatal("mux stopped serving unexpectedly", errs.ZapError(err)) - } - } +// SetHTTPClient sets the http client. +func (s *Server) SetHTTPClient(httpClient *http.Client) { + s.httpClient = httpClient } // GetLeaderListenUrls gets service endpoints from the leader in election group. @@ -415,68 +388,6 @@ func (s *Server) GetLeaderListenUrls() []string { return s.participant.GetLeaderListenUrls() } -func (s *Server) stopHTTPServer() { - log.Info("stopping http server") - defer log.Info("http server stopped") - - ctx, cancel := context.WithTimeout(context.Background(), utils.DefaultHTTPGracefulShutdownTimeout) - defer cancel() - - // First, try to gracefully shutdown the http server - ch := make(chan struct{}) - go func() { - defer close(ch) - s.httpServer.Shutdown(ctx) - }() - - select { - case <-ch: - case <-ctx.Done(): - // Took too long, manually close open transports - log.Warn("http server graceful shutdown timeout, forcing close") - s.httpServer.Close() - // concurrent Graceful Shutdown should be interrupted - <-ch - } -} - -func (s *Server) stopGRPCServer() { - log.Info("stopping grpc server") - defer log.Info("grpc server stopped") - - // Do not grpc.Server.GracefulStop with TLS enabled etcd server - // See https://github.com/grpc/grpc-go/issues/1384#issuecomment-317124531 - // and https://github.com/etcd-io/etcd/issues/8916 - if s.secure { - s.grpcServer.Stop() - return - } - - ctx, cancel := context.WithTimeout(context.Background(), utils.DefaultGRPCGracefulStopTimeout) - defer cancel() - - // First, try to gracefully shutdown the grpc server - ch := make(chan struct{}) - go func() { - defer close(ch) - // Close listeners to stop accepting new connections, - // will block on any existing transports - s.grpcServer.GracefulStop() - }() - - // Wait until all pending RPCs are finished - select { - case <-ch: - case <-ctx.Done(): - // Took too long, manually close open transports - // e.g. watch streams - log.Warn("grpc server graceful shutdown timeout, forcing close") - s.grpcServer.Stop() - // concurrent GracefulStop should be interrupted - <-ch - } -} - func (s *Server) startServer() (err error) { if s.clusterID, err = utils.InitClusterID(s.ctx, s.etcdClient); err != nil { return err @@ -528,7 +439,7 @@ func (s *Server) startServer() (err error) { serverReadyChan := make(chan struct{}) defer close(serverReadyChan) s.serverLoopWg.Add(1) - go s.startGRPCAndHTTPServers(serverReadyChan, s.muxListener) + go utils.StartGRPCAndHTTPServers(s, serverReadyChan, s.muxListener) <-serverReadyChan s.startServerLoop() @@ -596,7 +507,7 @@ func CreateServerWrapper(cmd *cobra.Command, args []string) { return } else if printVersion { versioninfo.Print() - exit(0) + utils.Exit(0) } // New zap logger @@ -641,13 +552,8 @@ func CreateServerWrapper(cmd *cobra.Command, args []string) { svr.Close() switch sig { case syscall.SIGTERM: - exit(0) + utils.Exit(0) default: - exit(1) + utils.Exit(1) } } - -func exit(code int) { - log.Sync() - os.Exit(code) -} diff --git a/pkg/mcs/tso/server/server.go b/pkg/mcs/tso/server/server.go index 50ca5fd17a7..0dbdb8768dd 100644 --- a/pkg/mcs/tso/server/server.go +++ b/pkg/mcs/tso/server/server.go @@ -24,7 +24,6 @@ import ( "os" "os/signal" "strconv" - "strings" "sync" "sync/atomic" "syscall" @@ -36,7 +35,6 @@ import ( "github.com/pingcap/kvproto/pkg/tsopb" "github.com/pingcap/log" "github.com/pingcap/sysutil" - "github.com/soheilhy/cmux" "github.com/spf13/cobra" bs "github.com/tikv/pd/pkg/basicserver" "github.com/tikv/pd/pkg/errs" @@ -46,14 +44,13 @@ import ( "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/systimemon" "github.com/tikv/pd/pkg/tso" - "github.com/tikv/pd/pkg/utils/etcdutil" + "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/grpcutil" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/metricutil" "github.com/tikv/pd/pkg/utils/tsoutil" "github.com/tikv/pd/pkg/versioninfo" "go.etcd.io/etcd/clientv3" - "go.etcd.io/etcd/pkg/types" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -79,10 +76,9 @@ type Server struct { handler *Handler - cfg *Config - clusterID uint64 - listenURL *url.URL - backendUrls []url.URL + cfg *Config + clusterID uint64 + listenURL *url.URL // etcd client etcdClient *clientv3.Client @@ -91,7 +87,6 @@ type Server struct { secure bool muxListener net.Listener - httpListener net.Listener grpcServer *grpc.Server httpServer *http.Server service *Service @@ -151,6 +146,56 @@ func (s *Server) GetClientConns() *sync.Map { return &s.clientConns } +// ServerLoopWgDone decreases the server loop wait group. +func (s *Server) ServerLoopWgDone() { + s.serverLoopWg.Done() +} + +// ServerLoopWgAdd increases the server loop wait group. +func (s *Server) ServerLoopWgAdd(n int) { + s.serverLoopWg.Add(n) +} + +// GetHTTPServer returns the http server. +func (s *Server) GetHTTPServer() *http.Server { + return s.httpServer +} + +// SetHTTPServer sets the http server. +func (s *Server) SetHTTPServer(httpServer *http.Server) { + s.httpServer = httpServer +} + +// SetUpRestHandler sets up the REST handler. +func (s *Server) SetUpRestHandler() (http.Handler, apiutil.APIServiceGroup) { + return SetUpRestHandler(s.service) +} + +// GetGRPCServer returns the grpc server. +func (s *Server) GetGRPCServer() *grpc.Server { + return s.grpcServer +} + +// SetGRPCServer sets the grpc server. +func (s *Server) SetGRPCServer(grpcServer *grpc.Server) { + s.grpcServer = grpcServer +} + +// RegisterGRPCService registers the grpc service. +func (s *Server) RegisterGRPCService(grpcServer *grpc.Server) { + s.service.RegisterGRPCService(grpcServer) +} + +// SetETCDClient sets the etcd client. +func (s *Server) SetETCDClient(etcdClient *clientv3.Client) { + s.etcdClient = etcdClient +} + +// SetHTTPClient sets the http client. +func (s *Server) SetHTTPClient(httpClient *http.Client) { + s.httpClient = httpClient +} + // Run runs the TSO server. func (s *Server) Run() error { skipWaitAPIServiceReady := false @@ -167,7 +212,7 @@ func (s *Server) Run() error { timeJumpBackCounter.Inc() }) - if err := s.initClient(); err != nil { + if err := utils.InitClient(s); err != nil { return err } return s.startServer() @@ -184,8 +229,8 @@ func (s *Server) Close() { // close tso service loops in the keyspace group manager s.keyspaceGroupManager.Close() s.serviceRegister.Deregister() - s.stopHTTPServer() - s.stopGRPCServer() + utils.StopHTTPServer(s) + utils.StopGRPCServer(s) s.muxListener.Close() s.serverLoopCancel() s.serverLoopWg.Wait() @@ -290,6 +335,11 @@ func (s *Server) IsClosed() bool { return atomic.LoadInt64(&s.isRunning) == 0 } +// IsSecure checks if the server enable TLS. +func (s *Server) IsSecure() bool { + return s.secure +} + // GetKeyspaceGroupManager returns the manager of keyspace group. func (s *Server) GetKeyspaceGroupManager() *tso.KeyspaceGroupManager { return s.keyspaceGroupManager @@ -371,145 +421,6 @@ func (s *Server) GetTLSConfig() *grpcutil.TLSConfig { return &s.cfg.Security.TLSConfig } -func (s *Server) initClient() error { - tlsConfig, err := s.cfg.Security.ToTLSConfig() - if err != nil { - return err - } - s.backendUrls, err = types.NewURLs(strings.Split(s.cfg.BackendEndpoints, ",")) - if err != nil { - return err - } - s.etcdClient, s.httpClient, err = etcdutil.CreateClients(tlsConfig, s.backendUrls) - return err -} - -func (s *Server) startGRPCServer(l net.Listener) { - defer logutil.LogPanic() - defer s.serverLoopWg.Done() - - log.Info("grpc server starts serving", zap.String("address", l.Addr().String())) - err := s.grpcServer.Serve(l) - if s.IsClosed() { - log.Info("grpc server stopped") - } else { - log.Fatal("grpc server stopped unexpectedly", errs.ZapError(err)) - } -} - -func (s *Server) startHTTPServer(l net.Listener) { - defer logutil.LogPanic() - defer s.serverLoopWg.Done() - - log.Info("http server starts serving", zap.String("address", l.Addr().String())) - err := s.httpServer.Serve(l) - if s.IsClosed() { - log.Info("http server stopped") - } else { - log.Fatal("http server stopped unexpectedly", errs.ZapError(err)) - } -} - -func (s *Server) startGRPCAndHTTPServers(serverReadyChan chan<- struct{}, l net.Listener) { - defer logutil.LogPanic() - defer s.serverLoopWg.Done() - - mux := cmux.New(l) - // Don't hang on matcher after closing listener - mux.SetReadTimeout(3 * time.Second) - grpcL := mux.MatchWithWriters(cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc")) - if s.secure { - s.httpListener = mux.Match(cmux.Any()) - } else { - s.httpListener = mux.Match(cmux.HTTP1()) - } - - s.grpcServer = grpc.NewServer() - s.service.RegisterGRPCService(s.grpcServer) - diagnosticspb.RegisterDiagnosticsServer(s.grpcServer, s) - s.serverLoopWg.Add(1) - go s.startGRPCServer(grpcL) - - handler, _ := SetUpRestHandler(s.service) - s.httpServer = &http.Server{ - Handler: handler, - ReadTimeout: 3 * time.Second, - } - s.serverLoopWg.Add(1) - go s.startHTTPServer(s.httpListener) - - serverReadyChan <- struct{}{} - if err := mux.Serve(); err != nil { - if s.IsClosed() { - log.Info("mux stopped serving", errs.ZapError(err)) - } else { - log.Fatal("mux stopped serving unexpectedly", errs.ZapError(err)) - } - } -} - -func (s *Server) stopHTTPServer() { - log.Info("stopping http server") - defer log.Info("http server stopped") - - ctx, cancel := context.WithTimeout(context.Background(), utils.DefaultHTTPGracefulShutdownTimeout) - defer cancel() - - // First, try to gracefully shutdown the http server - ch := make(chan struct{}) - go func() { - defer close(ch) - s.httpServer.Shutdown(ctx) - }() - - select { - case <-ch: - case <-ctx.Done(): - // Took too long, manually close open transports - log.Warn("http server graceful shutdown timeout, forcing close") - s.httpServer.Close() - // concurrent Graceful Shutdown should be interrupted - <-ch - } -} - -func (s *Server) stopGRPCServer() { - log.Info("stopping grpc server") - defer log.Info("grpc server stopped") - - // Do not grpc.Server.GracefulStop with TLS enabled etcd server - // See https://github.com/grpc/grpc-go/issues/1384#issuecomment-317124531 - // and https://github.com/etcd-io/etcd/issues/8916 - if s.secure { - s.grpcServer.Stop() - return - } - - ctx, cancel := context.WithTimeout(context.Background(), utils.DefaultGRPCGracefulStopTimeout) - defer cancel() - - // First, try to gracefully shutdown the grpc server - ch := make(chan struct{}) - go func() { - defer close(ch) - // Close listeners to stop accepting new connections, - // will block on any existing transports - s.grpcServer.GracefulStop() - }() - - // Wait until all pending RPCs are finished - select { - case <-ch: - case <-ctx.Done(): - // Took too long, manually close open transports - // e.g. watch streams - log.Warn("grpc server graceful shutdown timeout, forcing close") - s.grpcServer.Stop() - // concurrent GracefulStop should be interrupted - <-ch - } -} - func (s *Server) startServer() (err error) { if s.clusterID, err = utils.InitClusterID(s.ctx, s.etcdClient); err != nil { return err @@ -559,7 +470,7 @@ func (s *Server) startServer() (err error) { serverReadyChan := make(chan struct{}) defer close(serverReadyChan) s.serverLoopWg.Add(1) - go s.startGRPCAndHTTPServers(serverReadyChan, s.muxListener) + go utils.StartGRPCAndHTTPServers(s, serverReadyChan, s.muxListener) <-serverReadyChan // Run callbacks @@ -614,7 +525,7 @@ func CreateServerWrapper(cmd *cobra.Command, args []string) { return } else if printVersion { versioninfo.Print() - exit(0) + utils.Exit(0) } // New zap logger @@ -659,13 +570,8 @@ func CreateServerWrapper(cmd *cobra.Command, args []string) { svr.Close() switch sig { case syscall.SIGTERM: - exit(0) + utils.Exit(0) default: - exit(1) + utils.Exit(1) } } - -func exit(code int) { - log.Sync() - os.Exit(code) -} diff --git a/pkg/mcs/utils/util.go b/pkg/mcs/utils/util.go index cd85bb6ba60..5418abcb314 100644 --- a/pkg/mcs/utils/util.go +++ b/pkg/mcs/utils/util.go @@ -16,20 +16,28 @@ package utils import ( "context" + "net" + "net/http" + "os" "strings" "sync" "time" "github.com/gin-gonic/gin" + "github.com/pingcap/kvproto/pkg/diagnosticspb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/soheilhy/cmux" "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/etcdutil" "github.com/tikv/pd/pkg/utils/grpcutil" + "github.com/tikv/pd/pkg/utils/logutil" "go.etcd.io/etcd/clientv3" + "go.etcd.io/etcd/pkg/types" "go.uber.org/zap" "google.golang.org/grpc" ) @@ -76,6 +84,19 @@ type server interface { GetTLSConfig() *grpcutil.TLSConfig GetClientConns() *sync.Map GetDelegateClient(ctx context.Context, forwardedHost string) (*grpc.ClientConn, error) + ServerLoopWgDone() + ServerLoopWgAdd(int) + IsClosed() bool + GetHTTPServer() *http.Server + GetGRPCServer() *grpc.Server + SetGRPCServer(*grpc.Server) + SetHTTPServer(*http.Server) + SetETCDClient(*clientv3.Client) + SetHTTPClient(*http.Client) + IsSecure() bool + RegisterGRPCService(*grpc.Server) + SetUpRestHandler() (http.Handler, apiutil.APIServiceGroup) + diagnosticspb.DiagnosticsServer } // WaitAPIServiceReady waits for the api service ready. @@ -129,3 +150,159 @@ func isAPIServiceReady(s server) (bool, error) { } return false, nil } + +// InitClient initializes the etcd and http clients. +func InitClient(s server) error { + tlsConfig, err := s.GetTLSConfig().ToTLSConfig() + if err != nil { + return err + } + backendUrls, err := types.NewURLs(strings.Split(s.GetBackendEndpoints(), ",")) + if err != nil { + return err + } + etcdClient, httpClient, err := etcdutil.CreateClients(tlsConfig, backendUrls) + if err != nil { + return err + } + s.SetETCDClient(etcdClient) + s.SetHTTPClient(httpClient) + return nil +} + +func startGRPCServer(s server, l net.Listener) { + defer logutil.LogPanic() + defer s.ServerLoopWgDone() + + log.Info("grpc server starts serving", zap.String("address", l.Addr().String())) + err := s.GetGRPCServer().Serve(l) + if s.IsClosed() { + log.Info("grpc server stopped") + } else { + log.Fatal("grpc server stopped unexpectedly", errs.ZapError(err)) + } +} + +func startHTTPServer(s server, l net.Listener) { + defer logutil.LogPanic() + defer s.ServerLoopWgDone() + + log.Info("http server starts serving", zap.String("address", l.Addr().String())) + err := s.GetHTTPServer().Serve(l) + if s.IsClosed() { + log.Info("http server stopped") + } else { + log.Fatal("http server stopped unexpectedly", errs.ZapError(err)) + } +} + +// StartGRPCAndHTTPServers starts the grpc and http servers. +func StartGRPCAndHTTPServers(s server, serverReadyChan chan<- struct{}, l net.Listener) { + defer logutil.LogPanic() + defer s.ServerLoopWgDone() + + mux := cmux.New(l) + // Don't hang on matcher after closing listener + mux.SetReadTimeout(3 * time.Second) + grpcL := mux.MatchWithWriters(cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc")) + var httpListener net.Listener + if s.IsSecure() { + httpListener = mux.Match(cmux.Any()) + } else { + httpListener = mux.Match(cmux.HTTP1()) + } + + grpcServer := grpc.NewServer() + s.SetGRPCServer(grpcServer) + s.RegisterGRPCService(grpcServer) + diagnosticspb.RegisterDiagnosticsServer(grpcServer, s) + s.ServerLoopWgAdd(1) + go startGRPCServer(s, grpcL) + + handler, _ := s.SetUpRestHandler() + s.SetHTTPServer(&http.Server{ + Handler: handler, + ReadTimeout: 3 * time.Second, + }) + s.ServerLoopWgAdd(1) + go startHTTPServer(s, httpListener) + + serverReadyChan <- struct{}{} + if err := mux.Serve(); err != nil { + if s.IsClosed() { + log.Info("mux stopped serving", errs.ZapError(err)) + } else { + log.Fatal("mux stopped serving unexpectedly", errs.ZapError(err)) + } + } +} + +// StopHTTPServer stops the http server. +func StopHTTPServer(s server) { + log.Info("stopping http server") + defer log.Info("http server stopped") + + ctx, cancel := context.WithTimeout(context.Background(), DefaultHTTPGracefulShutdownTimeout) + defer cancel() + + // First, try to gracefully shutdown the http server + ch := make(chan struct{}) + go func() { + defer close(ch) + s.GetHTTPServer().Shutdown(ctx) + }() + + select { + case <-ch: + case <-ctx.Done(): + // Took too long, manually close open transports + log.Warn("http server graceful shutdown timeout, forcing close") + s.GetHTTPServer().Close() + // concurrent Graceful Shutdown should be interrupted + <-ch + } +} + +// StopGRPCServer stops the grpc server. +func StopGRPCServer(s server) { + log.Info("stopping grpc server") + defer log.Info("grpc server stopped") + + // Do not grpc.Server.GracefulStop with TLS enabled etcd server + // See https://github.com/grpc/grpc-go/issues/1384#issuecomment-317124531 + // and https://github.com/etcd-io/etcd/issues/8916 + if s.IsSecure() { + s.GetGRPCServer().Stop() + return + } + + ctx, cancel := context.WithTimeout(context.Background(), DefaultGRPCGracefulStopTimeout) + defer cancel() + + // First, try to gracefully shutdown the grpc server + ch := make(chan struct{}) + go func() { + defer close(ch) + // Close listeners to stop accepting new connections, + // will block on any existing transports + s.GetGRPCServer().GracefulStop() + }() + + // Wait until all pending RPCs are finished + select { + case <-ch: + case <-ctx.Done(): + // Took too long, manually close open transports + // e.g. watch streams + log.Warn("grpc server graceful shutdown timeout, forcing close") + s.GetGRPCServer().Stop() + // concurrent GracefulStop should be interrupted + <-ch + } +} + +// Exit exits the program with the given code. +func Exit(code int) { + log.Sync() + os.Exit(code) +} From 57125f67950e05c175e0fcd9032c61f2565b7926 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Mon, 28 Aug 2023 12:10:08 +0800 Subject: [PATCH 2/2] address comments Signed-off-by: Ryan Leung --- cmd/pd-server/main.go | 8 ++--- .../resourcemanager/server/grpc_service.go | 2 +- .../resourcemanager/server/install/install.go | 4 +-- pkg/mcs/resourcemanager/server/manager.go | 8 ++--- pkg/mcs/scheduling/server/grpc_service.go | 6 +++- pkg/mcs/scheduling/server/install/install.go | 32 +++++++++++++++++++ pkg/mcs/tso/server/grpc_service.go | 6 +++- pkg/mcs/tso/server/install/install.go | 32 +++++++++++++++++++ 8 files changed, 85 insertions(+), 13 deletions(-) create mode 100644 pkg/mcs/scheduling/server/install/install.go create mode 100644 pkg/mcs/tso/server/install/install.go diff --git a/cmd/pd-server/main.go b/cmd/pd-server/main.go index f1fecebd050..7b4199b643c 100644 --- a/cmd/pd-server/main.go +++ b/cmd/pd-server/main.go @@ -42,10 +42,10 @@ import ( "github.com/tikv/pd/server/join" "go.uber.org/zap" - // register microservice HTTP API - _ "github.com/tikv/pd/pkg/mcs/resourcemanager/server/apis/v1" - _ "github.com/tikv/pd/pkg/mcs/scheduling/server/apis/v1" - _ "github.com/tikv/pd/pkg/mcs/tso/server/apis/v1" + // register microservice API + _ "github.com/tikv/pd/pkg/mcs/resourcemanager/server/install" + _ "github.com/tikv/pd/pkg/mcs/scheduling/server/install" + _ "github.com/tikv/pd/pkg/mcs/tso/server/install" ) func main() { diff --git a/pkg/mcs/resourcemanager/server/grpc_service.go b/pkg/mcs/resourcemanager/server/grpc_service.go index cf198304dfc..5c1b5f0e458 100644 --- a/pkg/mcs/resourcemanager/server/grpc_service.go +++ b/pkg/mcs/resourcemanager/server/grpc_service.go @@ -60,7 +60,7 @@ type Service struct { } // NewService creates a new resource manager service. -func NewService[T ResourceManagerConfigProvider](svr bs.Server) registry.RegistrableService { +func NewService[T ConfigProvider](svr bs.Server) registry.RegistrableService { manager := NewManager[T](svr) return &Service{ diff --git a/pkg/mcs/resourcemanager/server/install/install.go b/pkg/mcs/resourcemanager/server/install/install.go index 89a4ac1914f..8573d5e52eb 100644 --- a/pkg/mcs/resourcemanager/server/install/install.go +++ b/pkg/mcs/resourcemanager/server/install/install.go @@ -16,7 +16,7 @@ package install import ( "github.com/tikv/pd/pkg/mcs/registry" - rm_server "github.com/tikv/pd/pkg/mcs/resourcemanager/server" + "github.com/tikv/pd/pkg/mcs/resourcemanager/server" // init API group _ "github.com/tikv/pd/pkg/mcs/resourcemanager/server/apis/v1" @@ -28,5 +28,5 @@ func init() { // Install registers the API group and grpc service. func Install(register *registry.ServiceRegistry) { - register.RegisterService("ResourceManager", rm_server.NewService[*rm_server.Server]) + register.RegisterService("ResourceManager", server.NewService[*server.Server]) } diff --git a/pkg/mcs/resourcemanager/server/manager.go b/pkg/mcs/resourcemanager/server/manager.go index 18d8c1f4d3c..1118e347f59 100644 --- a/pkg/mcs/resourcemanager/server/manager.go +++ b/pkg/mcs/resourcemanager/server/manager.go @@ -62,15 +62,15 @@ type Manager struct { consumptionRecord map[string]time.Time } -// ResourceManagerConfigProvider is used to get resource manager config from the given +// ConfigProvider is used to get resource manager config from the given // `bs.server` without modifying its interface. -type ResourceManagerConfigProvider interface { +type ConfigProvider interface { GetControllerConfig() *ControllerConfig } // NewManager returns a new manager base on the given server, -// which should implement the `ResourceManagerConfigProvider` interface. -func NewManager[T ResourceManagerConfigProvider](srv bs.Server) *Manager { +// which should implement the `ConfigProvider` interface. +func NewManager[T ConfigProvider](srv bs.Server) *Manager { m := &Manager{ controllerConfig: srv.(T).GetControllerConfig(), groups: make(map[string]*ResourceGroup), diff --git a/pkg/mcs/scheduling/server/grpc_service.go b/pkg/mcs/scheduling/server/grpc_service.go index 3b0e51f1f66..e75c78eb415 100644 --- a/pkg/mcs/scheduling/server/grpc_service.go +++ b/pkg/mcs/scheduling/server/grpc_service.go @@ -44,13 +44,17 @@ func (d dummyRestService) ServeHTTP(w http.ResponseWriter, r *http.Request) { w.Write([]byte("not implemented")) } +// ConfigProvider is used to get scheduling config from the given +// `bs.server` without modifying its interface. +type ConfigProvider interface{} + // Service is the scheduling grpc service. type Service struct { *Server } // NewService creates a new TSO service. -func NewService(svr bs.Server) registry.RegistrableService { +func NewService[T ConfigProvider](svr bs.Server) registry.RegistrableService { server, ok := svr.(*Server) if !ok { log.Fatal("create scheduling server failed") diff --git a/pkg/mcs/scheduling/server/install/install.go b/pkg/mcs/scheduling/server/install/install.go new file mode 100644 index 00000000000..4ce94067efb --- /dev/null +++ b/pkg/mcs/scheduling/server/install/install.go @@ -0,0 +1,32 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package install + +import ( + "github.com/tikv/pd/pkg/mcs/registry" + "github.com/tikv/pd/pkg/mcs/scheduling/server" + + // init API group + _ "github.com/tikv/pd/pkg/mcs/scheduling/server/apis/v1" +) + +func init() { + Install(registry.ServerServiceRegistry) +} + +// Install registers the API group and grpc service. +func Install(register *registry.ServiceRegistry) { + register.RegisterService("Scheduling", server.NewService[*server.Server]) +} diff --git a/pkg/mcs/tso/server/grpc_service.go b/pkg/mcs/tso/server/grpc_service.go index dd0a96b1cba..e9fdddf79ab 100644 --- a/pkg/mcs/tso/server/grpc_service.go +++ b/pkg/mcs/tso/server/grpc_service.go @@ -54,13 +54,17 @@ func (d dummyRestService) ServeHTTP(w http.ResponseWriter, r *http.Request) { w.Write([]byte("not implemented")) } +// ConfigProvider is used to get tso config from the given +// `bs.server` without modifying its interface. +type ConfigProvider interface{} + // Service is the TSO grpc service. type Service struct { *Server } // NewService creates a new TSO service. -func NewService(svr bs.Server) registry.RegistrableService { +func NewService[T ConfigProvider](svr bs.Server) registry.RegistrableService { server, ok := svr.(*Server) if !ok { log.Fatal("create tso server failed") diff --git a/pkg/mcs/tso/server/install/install.go b/pkg/mcs/tso/server/install/install.go new file mode 100644 index 00000000000..27db0c51d75 --- /dev/null +++ b/pkg/mcs/tso/server/install/install.go @@ -0,0 +1,32 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package install + +import ( + "github.com/tikv/pd/pkg/mcs/registry" + "github.com/tikv/pd/pkg/mcs/tso/server" + + // init API group + _ "github.com/tikv/pd/pkg/mcs/tso/server/apis/v1" +) + +func init() { + Install(registry.ServerServiceRegistry) +} + +// Install registers the API group and grpc service. +func Install(register *registry.ServiceRegistry) { + register.RegisterService("Scheduling", server.NewService[*server.Server]) +}