Skip to content

Commit

Permalink
resource_manager, server: implement the resource manager failover (#5900
Browse files Browse the repository at this point in the history
)

ref #5851

- Refine the resource manager initialization to support failover via PD election.
- Implement the client failover.
- Fix a race bug.

Signed-off-by: JmPotato <ghzpotato@gmail.com>

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
  • Loading branch information
JmPotato and ti-chi-bot authored Feb 6, 2023
1 parent 1af4fbf commit 3844971
Show file tree
Hide file tree
Showing 8 changed files with 297 additions and 129 deletions.
30 changes: 20 additions & 10 deletions client/base_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,10 @@ type baseClient struct {
// dc-location -> TSO allocator leader URL
allocators sync.Map // Store as map[string]string

checkLeaderCh chan struct{}
checkTSODispatcherCh chan struct{}
updateConnectionCtxsCh chan struct{}
checkLeaderCh chan struct{}
checkTSODispatcherCh chan struct{}
updateConnectionCtxsCh chan struct{}
updateTokenConnectionCh chan struct{}

wg sync.WaitGroup
ctx context.Context
Expand All @@ -81,13 +82,14 @@ type SecurityOption struct {
func newBaseClient(ctx context.Context, urls []string, security SecurityOption) *baseClient {
clientCtx, clientCancel := context.WithCancel(ctx)
bc := &baseClient{
checkLeaderCh: make(chan struct{}, 1),
checkTSODispatcherCh: make(chan struct{}, 1),
updateConnectionCtxsCh: make(chan struct{}, 1),
ctx: clientCtx,
cancel: clientCancel,
security: security,
option: newOption(),
checkLeaderCh: make(chan struct{}, 1),
checkTSODispatcherCh: make(chan struct{}, 1),
updateConnectionCtxsCh: make(chan struct{}, 1),
updateTokenConnectionCh: make(chan struct{}, 1),
ctx: clientCtx,
cancel: clientCancel,
security: security,
option: newOption(),
}
bc.urls.Store(urls)
return bc
Expand Down Expand Up @@ -168,6 +170,13 @@ func (c *baseClient) scheduleUpdateConnectionCtxs() {
}
}

func (c *baseClient) scheduleUpdateTokenConnection() {
select {
case c.updateTokenConnectionCh <- struct{}{}:
default:
}
}

// GetClusterID returns the ClusterID.
func (c *baseClient) GetClusterID(context.Context) uint64 {
return c.clusterID
Expand Down Expand Up @@ -375,6 +384,7 @@ func (c *baseClient) switchLeader(addrs []string) error {
// Set PD leader and Global TSO Allocator (which is also the PD leader)
c.leader.Store(addr)
c.allocators.Store(globalDCLocation, addr)
c.scheduleUpdateTokenConnection()
log.Info("[pd] switch leader", zap.String("new-leader", addr), zap.String("old-leader", oldLeader))
return nil
}
Expand Down
92 changes: 66 additions & 26 deletions client/resourcemanager_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package pd

import (
"context"
"strings"
"time"

"github.com/gogo/protobuf/proto"
Expand All @@ -24,7 +25,6 @@ import (
rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/pingcap/log"
"go.uber.org/zap"
"google.golang.org/grpc"
)

type actionType int
Expand All @@ -33,6 +33,8 @@ const (
add actionType = 0
modify actionType = 1
groupSettingsPathPrefix = "resource_group/settings"
// errNotLeaderMsg is returned when the requested server is not the leader.
errNotLeaderMsg = "not leader"
)

// ResourceManagerClient manages resource group info and token request.
Expand All @@ -48,22 +50,30 @@ type ResourceManagerClient interface {

// resourceManagerClient gets the ResourceManager client of current PD leader.
func (c *client) resourceManagerClient() rmpb.ResourceManagerClient {
if cc, ok := c.clientConns.Load(c.GetLeaderAddr()); ok {
return rmpb.NewResourceManagerClient(cc.(*grpc.ClientConn))
if cc, err := c.getOrCreateGRPCConn(c.GetLeaderAddr()); err == nil {
return rmpb.NewResourceManagerClient(cc)
}
return nil
}

// gRPCErrorHandler is used to handle the gRPC error returned by the resource manager service.
func (c *client) gRPCErrorHandler(err error) {
if strings.Contains(err.Error(), errNotLeaderMsg) {
c.ScheduleCheckLeader()
}
}

// ListResourceGroups loads and returns all metadata of resource groups.
func (c *client) ListResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, error) {
req := &rmpb.ListResourceGroupsRequest{}
resp, err := c.resourceManagerClient().ListResourceGroups(ctx, req)
if err != nil {
c.gRPCErrorHandler(err)
return nil, err
}
resErr := resp.GetError()
if resErr != nil {
return nil, errors.Errorf("[resource_manager]" + resErr.Message)
return nil, errors.Errorf("[resource_manager] %s", resErr.Message)
}
return resp.GetGroups(), nil
}
Expand All @@ -74,11 +84,12 @@ func (c *client) GetResourceGroup(ctx context.Context, resourceGroupName string)
}
resp, err := c.resourceManagerClient().GetResourceGroup(ctx, req)
if err != nil {
c.gRPCErrorHandler(err)
return nil, err
}
resErr := resp.GetError()
if resErr != nil {
return nil, errors.Errorf("[resource_manager]" + resErr.Message)
return nil, errors.Errorf("[resource_manager] %s", resErr.Message)
}
return resp.GetGroup(), nil
}
Expand All @@ -103,11 +114,12 @@ func (c *client) putResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceG
resp, err = c.resourceManagerClient().ModifyResourceGroup(ctx, req)
}
if err != nil {
c.gRPCErrorHandler(err)
return str, err
}
resErr := resp.GetError()
if resErr != nil {
return str, errors.Errorf("[resource_manager]" + resErr.Message)
return str, errors.Errorf("[resource_manager] %s", resErr.Message)
}
str = resp.GetBody()
return
Expand All @@ -119,11 +131,12 @@ func (c *client) DeleteResourceGroup(ctx context.Context, resourceGroupName stri
}
resp, err := c.resourceManagerClient().DeleteResourceGroup(ctx, req)
if err != nil {
c.gRPCErrorHandler(err)
return "", err
}
resErr := resp.GetError()
if resErr != nil {
return "", errors.Errorf("[resource_manager]" + resErr.Message)
return "", errors.Errorf("[resource_manager] %s", resErr.Message)
}
return resp.GetBody(), nil
}
Expand Down Expand Up @@ -234,6 +247,13 @@ type resourceManagerConnectionContext struct {
cancel context.CancelFunc
}

func (cc *resourceManagerConnectionContext) reset() {
cc.stream = nil
if cc.cancel != nil {
cc.cancel()
}
}

func (c *client) createTokenDispatcher() {
dispatcherCtx, dispatcherCancel := context.WithCancel(c.ctx)
dispatcher := &tokenDispatcher{
Expand All @@ -246,37 +266,57 @@ func (c *client) createTokenDispatcher() {
}

func (c *client) handleResourceTokenDispatcher(dispatcherCtx context.Context, tbc *tokenBatchController) {
var connection resourceManagerConnectionContext
if err := c.tryResourceManagerConnect(dispatcherCtx, &connection); err != nil {
log.Warn("get stream error", zap.Error(err))
var (
connection resourceManagerConnectionContext
firstRequest *tokenRequest
stream rmpb.ResourceManager_AcquireTokenBucketsClient
streamCtx context.Context
toReconnect bool
err error
)
if err = c.tryResourceManagerConnect(dispatcherCtx, &connection); err != nil {
log.Warn("[resource_manager] get token stream error", zap.Error(err))
}

for {
var firstRequest *tokenRequest
// Fetch the request from the channel.
select {
case <-dispatcherCtx.Done():
return
case firstRequest = <-tbc.tokenRequestCh:
}
stream, streamCtx, cancel := connection.stream, connection.ctx, connection.cancel
if stream == nil {
// Try to get a stream connection.
stream, streamCtx = connection.stream, connection.ctx
select {
case <-c.updateTokenConnectionCh:
toReconnect = true
default:
toReconnect = stream == nil
}
// If the stream is nil or the leader has changed, try to reconnect.
if toReconnect {
connection.reset()
c.tryResourceManagerConnect(dispatcherCtx, &connection)
firstRequest.done <- errors.Errorf("no stream")
log.Info("[resource_manager] token leader may change, try to reconnect the stream")
stream, streamCtx = connection.stream, connection.ctx
}
// If the stream is still nil, return an error.
if stream == nil {
firstRequest.done <- errors.Errorf("failed to get the stream connection")
c.ScheduleCheckLeader()
connection.reset()
continue
}
select {
case <-streamCtx.Done():
log.Info("[resource_manager] resource manager stream is canceled")
cancel()
connection.stream = nil
connection.reset()
log.Info("[resource_manager] token stream is canceled")
continue
default:
}
err := c.processTokenRequests(stream, firstRequest)
if err != nil {
log.Info("processTokenRequests error", zap.Error(err))
cancel()
connection.stream = nil
if err = c.processTokenRequests(stream, firstRequest); err != nil {
c.ScheduleCheckLeader()
connection.reset()
log.Info("[resource_manager] token request error", zap.Error(err))
}
}
}
Expand All @@ -290,15 +330,15 @@ func (c *client) processTokenRequests(stream rmpb.ResourceManager_AcquireTokenBu
}
resp, err := stream.Recv()
if err != nil {
c.gRPCErrorHandler(err)
err = errors.WithStack(err)
t.done <- err
return err
}
if resp.GetError() != nil {
return errors.Errorf("[resource_manager]" + resp.GetError().Message)
return errors.Errorf("[resource_manager] %s", resp.GetError().Message)
}
tokenBuckets := resp.GetResponses()
t.TokenBuckets = tokenBuckets
t.TokenBuckets = resp.GetResponses()
t.done <- nil
return nil
}
Expand Down
25 changes: 17 additions & 8 deletions pkg/mcs/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package registry

import (
"fmt"
"net/http"

"github.com/pingcap/log"
Expand Down Expand Up @@ -54,33 +55,41 @@ func newServiceRegistry() *ServiceRegistry {
}
}

func createServiceName(prefix, name string) string {
return fmt.Sprintf("%s_%s", prefix, name)
}

// InstallAllGRPCServices installs all registered grpc services.
func (r *ServiceRegistry) InstallAllGRPCServices(srv *server.Server, g *grpc.Server) {
prefix := srv.Name()
for name, builder := range r.builders {
if l, ok := r.services[name]; ok {
serviceName := createServiceName(prefix, name)
if l, ok := r.services[serviceName]; ok {
l.RegisterGRPCService(g)
log.Info("gRPC service already registered", zap.String("service-name", name))
log.Info("gRPC service already registered", zap.String("prefix", prefix), zap.String("service-name", name))
continue
}
l := builder(srv)
r.services[name] = l
r.services[serviceName] = l
l.RegisterGRPCService(g)
log.Info("gRPC service registered successfully", zap.String("service-name", name))
log.Info("gRPC service registered successfully", zap.String("prefix", prefix), zap.String("service-name", name))
}
}

// InstallAllRESTHandler installs all registered REST services.
func (r *ServiceRegistry) InstallAllRESTHandler(srv *server.Server, h map[string]http.Handler) {
prefix := srv.Name()
for name, builder := range r.builders {
if l, ok := r.services[name]; ok {
serviceName := createServiceName(prefix, name)
if l, ok := r.services[serviceName]; ok {
l.RegisterRESTHandler(h)
log.Info("restful API service already registered", zap.String("service-name", name))
log.Info("restful API service already registered", zap.String("prefix", prefix), zap.String("service-name", name))
continue
}
l := builder(srv)
r.services[name] = l
r.services[serviceName] = l
l.RegisterRESTHandler(h)
log.Info("restful API service registered successfully", zap.String("service-name", name))
log.Info("restful API service registered successfully", zap.String("prefix", prefix), zap.String("service-name", name))
}
}

Expand Down
1 change: 0 additions & 1 deletion pkg/mcs/resource_manager/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,6 @@ func (gc *groupCostController) handleTokenBucketTrickEvent(ctx context.Context,
threshold := counter.notify.setupNotificationThreshold
counter.notify.mu.Unlock()
counter.limiter.SetupNotificationThreshold(now, threshold)
gc.updateRunState(ctx)
case <-ctx.Done():
return
}
Expand Down
Loading

0 comments on commit 3844971

Please sign in to comment.