Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mcs: init HTTP handler #6963

Merged
merged 3 commits into from
Aug 28, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cmd/pd-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ import (
"github.com/tikv/pd/server/config"
"github.com/tikv/pd/server/join"
"go.uber.org/zap"

// register microservice HTTP API
_ "github.com/tikv/pd/pkg/mcs/resourcemanager/server/apis/v1"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why need this? Does install pkg duplicate with it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we don't import, the independent service cannot use the HTTP interface in the current implementation. There is an import cycle problem.

Copy link
Contributor

@nolouch nolouch Aug 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean why not

_ "github.com/tikv/pd/pkg/mcs/resourcemanager/server/install"
_ "github.com/tikv/pd/pkg/mcs/scheduling/server/install"
_ "github.com/tikv/pd/pkg/mcs/tso/server/apis/install"

make http and grpc register togeter. Does it exist import cycle? I think it's same with xxx/api.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

_ "github.com/tikv/pd/pkg/mcs/scheduling/server/apis/v1"
_ "github.com/tikv/pd/pkg/mcs/tso/server/apis/v1"
)

func main() {
Expand Down
207 changes: 105 additions & 102 deletions pkg/mcs/resourcemanager/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,30 +25,29 @@
"os/signal"
"path"
"strconv"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"

grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/diagnosticspb"
"github.com/pingcap/log"
"github.com/pingcap/sysutil"
"github.com/soheilhy/cmux"
"github.com/spf13/cobra"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/discovery"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/member"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/grpcutil"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/memberutil"
"github.com/tikv/pd/pkg/utils/metricutil"
"github.com/tikv/pd/pkg/versioninfo"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/pkg/types"
"go.uber.org/zap"
"google.golang.org/grpc"
)
Expand Down Expand Up @@ -76,9 +75,15 @@
etcdClient *clientv3.Client
httpClient *http.Client

secure bool
muxListener net.Listener
grpcServer *grpc.Server
httpServer *http.Server
service *Service

// Store as map[string]*grpc.ClientConn
clientConns sync.Map

// Callback functions for different stages
// startCallbacks will be called after the server is started.
startCallbacks []func()
Expand All @@ -105,16 +110,19 @@

// Run runs the Resource Manager server.
func (s *Server) Run() (err error) {
if err = s.initClient(); err != nil {
return err
skipWaitAPIServiceReady := false
failpoint.Inject("skipWaitAPIServiceReady", func() {
skipWaitAPIServiceReady = true
})

Check warning on line 116 in pkg/mcs/resourcemanager/server/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/resourcemanager/server/server.go#L115-L116

Added lines #L115 - L116 were not covered by tests
if !skipWaitAPIServiceReady {
if err := utils.WaitAPIServiceReady(s); err != nil {
return err

Check warning on line 119 in pkg/mcs/resourcemanager/server/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/resourcemanager/server/server.go#L119

Added line #L119 was not covered by tests
}
}
if err = s.startServer(); err != nil {
if err := utils.InitClient(s); err != nil {
return err
}

s.startServerLoop()

return nil
return s.startServer()
}

func (s *Server) startServerLoop() {
Expand Down Expand Up @@ -211,6 +219,8 @@

log.Info("closing resource manager server ...")
s.serviceRegister.Deregister()
utils.StopHTTPServer(s)
utils.StopGRPCServer(s)
s.muxListener.Close()
s.serverLoopCancel()
s.serverLoopWg.Wait()
Expand Down Expand Up @@ -258,103 +268,97 @@
return s != nil && atomic.LoadInt64(&s.isRunning) == 0
}

// IsSecure checks if the server enable TLS.
func (s *Server) IsSecure() bool {
return s.secure
}

// AddServiceReadyCallback adds callbacks when the server becomes the leader, if there is embedded etcd, or the primary otherwise.
func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context)) {
s.primaryCallbacks = append(s.primaryCallbacks, callbacks...)
}

func (s *Server) initClient() error {
tlsConfig, err := s.cfg.Security.ToTLSConfig()
if err != nil {
return err
}
u, err := types.NewURLs(strings.Split(s.cfg.BackendEndpoints, ","))
if err != nil {
return err
}
s.etcdClient, s.httpClient, err = etcdutil.CreateClients(tlsConfig, []url.URL(u))
return err
// GetBackendEndpoints returns the backend endpoints.
func (s *Server) GetBackendEndpoints() string {
return s.cfg.BackendEndpoints
}

func (s *Server) startGRPCServer(l net.Listener) {
defer logutil.LogPanic()
defer s.serverLoopWg.Done()
// GetClientConns returns the client connections.
func (s *Server) GetClientConns() *sync.Map {
return &s.clientConns

Check warning on line 288 in pkg/mcs/resourcemanager/server/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/resourcemanager/server/server.go#L288

Added line #L288 was not covered by tests
}

gs := grpc.NewServer()
s.service.RegisterGRPCService(gs)
err := gs.Serve(l)
log.Info("gRPC server stop serving")
// ServerLoopWgDone decreases the server loop wait group.
func (s *Server) ServerLoopWgDone() {
s.serverLoopWg.Done()
}

// Attempt graceful stop (waits for pending RPCs), but force a stop if
// it doesn't happen in a reasonable amount of time.
done := make(chan struct{})
go func() {
defer logutil.LogPanic()
log.Info("try to gracefully stop the server now")
gs.GracefulStop()
close(done)
}()
timer := time.NewTimer(utils.DefaultGRPCGracefulStopTimeout)
defer timer.Stop()
select {
case <-done:
case <-timer.C:
log.Info("stopping grpc gracefully is taking longer than expected and force stopping now", zap.Duration("default", utils.DefaultGRPCGracefulStopTimeout))
gs.Stop()
}
if s.IsClosed() {
log.Info("grpc server stopped")
} else {
log.Fatal("grpc server stopped unexpectedly", errs.ZapError(err))
}
// ServerLoopWgAdd increases the server loop wait group.
func (s *Server) ServerLoopWgAdd(n int) {
s.serverLoopWg.Add(n)
}

func (s *Server) startHTTPServer(l net.Listener) {
defer logutil.LogPanic()
defer s.serverLoopWg.Done()
// GetHTTPServer returns the http server.
func (s *Server) GetHTTPServer() *http.Server {
return s.httpServer
}

handler, _ := SetUpRestHandler(s.service)
hs := &http.Server{
Handler: handler,
ReadTimeout: 5 * time.Minute,
ReadHeaderTimeout: 5 * time.Second,
}
err := hs.Serve(l)
log.Info("http server stop serving")
// SetHTTPServer sets the http server.
func (s *Server) SetHTTPServer(httpServer *http.Server) {
s.httpServer = httpServer
}

ctx, cancel := context.WithTimeout(context.Background(), utils.DefaultHTTPGracefulShutdownTimeout)
defer cancel()
if err := hs.Shutdown(ctx); err != nil {
log.Error("http server shutdown encountered problem", errs.ZapError(err))
} else {
log.Info("all http(s) requests finished")
}
if s.IsClosed() {
log.Info("http server stopped")
} else {
log.Fatal("http server stopped unexpectedly", errs.ZapError(err))
}
// SetUpRestHandler sets up the REST handler.
func (s *Server) SetUpRestHandler() (http.Handler, apiutil.APIServiceGroup) {
return SetUpRestHandler(s.service)
}

func (s *Server) startGRPCAndHTTPServers(l net.Listener) {
defer logutil.LogPanic()
defer s.serverLoopWg.Done()
// GetGRPCServer returns the grpc server.
func (s *Server) GetGRPCServer() *grpc.Server {
return s.grpcServer
}

mux := cmux.New(l)
grpcL := mux.MatchWithWriters(cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"))
httpL := mux.Match(cmux.Any())
// SetGRPCServer sets the grpc server.
func (s *Server) SetGRPCServer(grpcServer *grpc.Server) {
s.grpcServer = grpcServer
}

s.serverLoopWg.Add(2)
go s.startGRPCServer(grpcL)
go s.startHTTPServer(httpL)
// RegisterGRPCService registers the grpc service.
func (s *Server) RegisterGRPCService(grpcServer *grpc.Server) {
s.service.RegisterGRPCService(grpcServer)
}

if err := mux.Serve(); err != nil {
if s.IsClosed() {
log.Info("mux stop serving", errs.ZapError(err))
} else {
log.Fatal("mux stop serving unexpectedly", errs.ZapError(err))
// SetETCDClient sets the etcd client.
func (s *Server) SetETCDClient(etcdClient *clientv3.Client) {
s.etcdClient = etcdClient
}

// SetHTTPClient sets the http client.
func (s *Server) SetHTTPClient(httpClient *http.Client) {
s.httpClient = httpClient
}

// GetTLSConfig gets the security config.
func (s *Server) GetTLSConfig() *grpcutil.TLSConfig {
return &s.cfg.Security.TLSConfig
}

// GetDelegateClient returns grpc client connection talking to the forwarded host
func (s *Server) GetDelegateClient(ctx context.Context, forwardedHost string) (*grpc.ClientConn, error) {
client, ok := s.clientConns.Load(forwardedHost)
if !ok {
tlsConfig, err := s.GetTLSConfig().ToTLSConfig()
if err != nil {
return nil, err

Check warning on line 352 in pkg/mcs/resourcemanager/server/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/resourcemanager/server/server.go#L352

Added line #L352 was not covered by tests
}
cc, err := grpcutil.GetClientConn(ctx, forwardedHost, tlsConfig)
if err != nil {
return nil, err

Check warning on line 356 in pkg/mcs/resourcemanager/server/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/resourcemanager/server/server.go#L356

Added line #L356 was not covered by tests
}
client = cc
s.clientConns.Store(forwardedHost, cc)
}
return client.(*grpc.ClientConn), nil
}

// GetLeaderListenUrls gets service endpoints from the leader in election group.
Expand All @@ -378,7 +382,10 @@
s.participant = member.NewParticipant(s.etcdClient)
s.participant.InitInfo(uniqueName, uniqueID, path.Join(resourceManagerPrimaryPrefix, fmt.Sprintf("%05d", 0)),
utils.PrimaryKey, "primary election", s.cfg.AdvertiseListenAddr)

s.listenURL, err = url.Parse(s.cfg.ListenAddr)
if err != nil {
return err

Check warning on line 387 in pkg/mcs/resourcemanager/server/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/resourcemanager/server/server.go#L387

Added line #L387 was not covered by tests
}
s.service = &Service{
ctx: s.ctx,
manager: NewManager[*Server](s),
Expand All @@ -388,11 +395,8 @@
if err != nil {
return err
}
s.listenURL, err = url.Parse(s.cfg.ListenAddr)
if err != nil {
return err
}
if tlsConfig != nil {
s.secure = true

Check warning on line 399 in pkg/mcs/resourcemanager/server/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/resourcemanager/server/server.go#L399

Added line #L399 was not covered by tests
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
s.muxListener, err = tls.Listen(utils.TCPNetworkStr, s.listenURL.Host, tlsConfig)
} else {
s.muxListener, err = net.Listen(utils.TCPNetworkStr, s.listenURL.Host)
Expand All @@ -401,8 +405,12 @@
return err
}

serverReadyChan := make(chan struct{})
defer close(serverReadyChan)
s.serverLoopWg.Add(1)
go s.startGRPCAndHTTPServers(s.muxListener)
go utils.StartGRPCAndHTTPServers(s, serverReadyChan, s.muxListener)
<-serverReadyChan
s.startServerLoop()

// Run callbacks
log.Info("triggering the start callback functions")
Expand Down Expand Up @@ -455,7 +463,7 @@
return
} else if printVersion {
versioninfo.Print()
exit(0)
utils.Exit(0)

Check warning on line 466 in pkg/mcs/resourcemanager/server/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/resourcemanager/server/server.go#L466

Added line #L466 was not covered by tests
}

// New zap logger
Expand Down Expand Up @@ -500,13 +508,8 @@
svr.Close()
switch sig {
case syscall.SIGTERM:
exit(0)
utils.Exit(0)

Check warning on line 511 in pkg/mcs/resourcemanager/server/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/resourcemanager/server/server.go#L511

Added line #L511 was not covered by tests
default:
exit(1)
utils.Exit(1)

Check warning on line 513 in pkg/mcs/resourcemanager/server/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/resourcemanager/server/server.go#L513

Added line #L513 was not covered by tests
}
}

func exit(code int) {
log.Sync()
os.Exit(code)
}
Loading
Loading