Skip to content

Commit

Permalink
forward region requests to scheduling server
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx committed Aug 31, 2023
1 parent 5eae57e commit 51566c5
Show file tree
Hide file tree
Showing 15 changed files with 258 additions and 36 deletions.
2 changes: 1 addition & 1 deletion client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,4 @@ require (
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace github.com/pingcap/kvproto => github.com/rleungx/kvproto v0.0.0-20230823063216-12256d4fd0a4
replace github.com/pingcap/kvproto => github.com/rleungx/kvproto v0.0.0-20230824090226-d21284147287
4 changes: 2 additions & 2 deletions client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT
github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3xv4=
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/rleungx/kvproto v0.0.0-20230823063216-12256d4fd0a4 h1:z6TiGSwLwG5b+18YVYAhq5qPxaFxJLhyK3FN8n4n4n0=
github.com/rleungx/kvproto v0.0.0-20230823063216-12256d4fd0a4/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
github.com/rleungx/kvproto v0.0.0-20230824090226-d21284147287 h1:k5LiNAjzBL5TRaA/ayyUNfPbSkNCUMuYnaLAPb02euE=
github.com/rleungx/kvproto v0.0.0-20230824090226-d21284147287/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -206,4 +206,4 @@ replace google.golang.org/grpc v1.54.0 => google.golang.org/grpc v1.26.0
// After the PR to kvproto is merged, remember to comment this out and run `go mod tidy`.
// replace github.com/pingcap/kvproto => github.com/$YourPrivateRepo $YourPrivateBranch

replace github.com/pingcap/kvproto => github.com/rleungx/kvproto v0.0.0-20230823063216-12256d4fd0a4
replace github.com/pingcap/kvproto => github.com/rleungx/kvproto v0.0.0-20230824090226-d21284147287
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -489,8 +489,8 @@ github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O
github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3xv4=
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/rleungx/kvproto v0.0.0-20230823063216-12256d4fd0a4 h1:z6TiGSwLwG5b+18YVYAhq5qPxaFxJLhyK3FN8n4n4n0=
github.com/rleungx/kvproto v0.0.0-20230823063216-12256d4fd0a4/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
github.com/rleungx/kvproto v0.0.0-20230824090226-d21284147287 h1:k5LiNAjzBL5TRaA/ayyUNfPbSkNCUMuYnaLAPb02euE=
github.com/rleungx/kvproto v0.0.0-20230824090226-d21284147287/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
Expand Down
16 changes: 0 additions & 16 deletions pkg/mcs/scheduling/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,7 @@ func (s *Service) RegisterOperatorsRouter() {
}

// @Tags operators
<<<<<<< HEAD
// @Summary Get an operator by ID.
=======
// @Summary Get a Region's pending operator.
>>>>>>> add http support for scheduling service
// @Param region_id path int true "A Region's Id"
// @Produce json
// @Success 200 {object} operator.OpWithStatus
Expand All @@ -150,13 +146,8 @@ func getOperatorByID(c *gin.Context) {
}

// @Tags operators
<<<<<<< HEAD
// @Summary List operators.
// @Param kind query string false "Specify the operator kind." Enums(admin, leader, region, waiting)
=======
// @Summary List pending operators.
// @Param kind query string false "Specify the operator kind." Enums(admin, leader, region)
>>>>>>> add http support for scheduling service
// @Produce json
// @Success 200 {array} operator.Operator
// @Failure 500 {string} string "PD server failed to proceed the request."
Expand Down Expand Up @@ -197,17 +188,10 @@ func getOperators(c *gin.Context) {
}

// @Tags checkers
<<<<<<< HEAD
// @Summary Get checker by name
// @Param name path string true "The name of the checker."
// @Produce json
// @Success 200 {string} string "The checker's status."
=======
// @Summary Get if checker is paused
// @Param name path string true "The name of the scheduler."
// @Produce json
// @Success 200 {string} string "Pause or resume the scheduler successfully."
>>>>>>> add http support for scheduling service
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /checkers/{name} [get]
func getCheckerByName(c *gin.Context) {
Expand Down
82 changes: 82 additions & 0 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
sc "github.com/tikv/pd/pkg/schedule/config"
"github.com/tikv/pd/pkg/schedule/hbstream"
"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/statistics"
"github.com/tikv/pd/pkg/statistics/buckets"
Expand Down Expand Up @@ -57,6 +58,10 @@ func NewCluster(ctx context.Context, cfg *config.Config, storage storage.Storage
storage: storage,
}
c.coordinator = schedule.NewCoordinator(ctx, c, hbStreams)
err = c.ruleManager.Initialize(persistConfig.GetMaxReplicas(), persistConfig.GetLocationLabels())
if err != nil {
return nil, err
}
return c, nil
}

Expand Down Expand Up @@ -228,6 +233,83 @@ func (c *Cluster) HandleStoreHeartbeat(heartbeat *pdpb.StoreHeartbeatRequest) er
return nil
}

// HandleRegionHeartbeat processes RegionInfo reports from client.
func (c *Cluster) HandleRegionHeartbeat(region *core.RegionInfo) error {
if err := c.processRegionHeartbeat(region); err != nil {
return err
}

c.coordinator.GetOperatorController().Dispatch(region, operator.DispatchFromHeartBeat, c.coordinator.RecordOpStepWithTTL)
return nil
}

// processRegionHeartbeat updates the region information.
func (c *Cluster) processRegionHeartbeat(region *core.RegionInfo) error {
origin, _, err := c.PreCheckPutRegion(region)
if err != nil {
return err
}
region.Inherit(origin, c.GetStoreConfig().IsEnableRegionBucket())

c.hotStat.CheckWriteAsync(statistics.NewCheckExpiredItemTask(region))
c.hotStat.CheckReadAsync(statistics.NewCheckExpiredItemTask(region))
reportInterval := region.GetInterval()
interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()
for _, peer := range region.GetPeers() {
peerInfo := core.NewPeerInfo(peer, region.GetWriteLoads(), interval)
c.hotStat.CheckWriteAsync(statistics.NewCheckPeerTask(peerInfo, region))
}
c.coordinator.GetSchedulersController().CheckTransferWitnessLeader(region)

hasRegionStats := c.regionStats != nil
// Save to storage if meta is updated, except for flashback.
// Save to cache if meta or leader is updated, or contains any down/pending peer.
// Mark isNew if the region in cache does not have leader.
isNew, _, saveCache, _ := core.GenerateRegionGuideFunc(true)(region, origin)
if !saveCache && !isNew {
// Due to some config changes need to update the region stats as well,
// so we do some extra checks here.
if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
c.regionStats.Observe(region, c.GetRegionStores(region))
}
return nil
}

var overlaps []*core.RegionInfo
if saveCache {
// To prevent a concurrent heartbeat of another region from overriding the up-to-date region info by a stale one,
// check its validation again here.
//
// However it can't solve the race condition of concurrent heartbeats from the same region.
if overlaps, err = c.AtomicCheckAndPutRegion(region); err != nil {
return err
}

for _, item := range overlaps {
if c.regionStats != nil {
c.regionStats.ClearDefunctRegion(item.GetID())
}
c.labelLevelStats.ClearDefunctRegion(item.GetID())
c.ruleManager.InvalidCache(item.GetID())
}
}

if hasRegionStats {
c.regionStats.Observe(region, c.GetRegionStores(region))
}

if !c.IsPrepared() && isNew {
c.coordinator.GetPrepareChecker().Collect(region)
}

return nil
}

// IsPrepared return true if the prepare checker is ready.
func (c *Cluster) IsPrepared() bool {
return c.coordinator.GetPrepareChecker().IsPrepared()
}

// TODO: implement the following methods

// AllocID allocates a new ID.
Expand Down
103 changes: 103 additions & 0 deletions pkg/mcs/scheduling/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,20 @@ package server

import (
"context"
"io"
"net/http"
"sync/atomic"
"time"

"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/kvproto/pkg/schedulingpb"
"github.com/pingcap/log"
"github.com/pkg/errors"
bs "github.com/tikv/pd/pkg/basicserver"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/mcs/registry"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/logutil"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -68,6 +73,104 @@ func NewService[T ConfigProvider](svr bs.Server) registry.RegistrableService {
}
}

// heartbeatServer wraps PD_RegionHeartbeatServer to ensure when any error
// occurs on Send() or Recv(), both endpoints will be closed.
type heartbeatServer struct {
stream pdpb.PD_RegionHeartbeatServer
closed int32
}

func (s *heartbeatServer) Send(m *pdpb.RegionHeartbeatResponse) error {
if atomic.LoadInt32(&s.closed) == 1 {
return io.EOF
}
done := make(chan error, 1)
go func() {
defer logutil.LogPanic()
done <- s.stream.Send(m)
}()
timer := time.NewTimer(5 * time.Second)
defer timer.Stop()
select {
case err := <-done:
if err != nil {
atomic.StoreInt32(&s.closed, 1)
}
return errors.WithStack(err)
case <-timer.C:
atomic.StoreInt32(&s.closed, 1)
return status.Errorf(codes.DeadlineExceeded, "send heartbeat timeout")
}
}

func (s *heartbeatServer) Recv() (*pdpb.RegionHeartbeatRequest, error) {
if atomic.LoadInt32(&s.closed) == 1 {
return nil, io.EOF
}
req, err := s.stream.Recv()
if err != nil {
atomic.StoreInt32(&s.closed, 1)
return nil, errors.WithStack(err)
}
return req, nil
}

// RegionHeartbeat implements gRPC PDServer.
func (s *Service) RegionHeartbeat(stream schedulingpb.Scheduling_RegionHeartbeatServer) error {
var (
server = &heartbeatServer{stream: stream}
cancel context.CancelFunc
lastBind time.Time
)
defer func() {
// cancel the forward stream
if cancel != nil {
cancel()
}
}()

for {
request, err := server.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return errors.WithStack(err)
}

c := s.GetCluster()
if c == nil {
resp := &pdpb.RegionHeartbeatResponse{Header: &pdpb.ResponseHeader{
ClusterId: s.clusterID,
Error: &pdpb.Error{
Type: pdpb.ErrorType_NOT_BOOTSTRAPPED,
Message: "scheduling server is not initialized yet",
},
}}
err := server.Send(resp)
return errors.WithStack(err)
}

storeID := request.GetLeader().GetStoreId()
store := c.GetStore(storeID)
if store == nil {
return errors.Errorf("invalid store ID %d, not found", storeID)
}

if time.Since(lastBind) > time.Minute {
s.hbStreams.BindStream(storeID, server)
lastBind = time.Now()
}
region := core.RegionFromHeartbeat(request, core.SetFromHeartbeat(true))
err = c.HandleRegionHeartbeat(region)
if err != nil {
msg := err.Error()
s.hbStreams.SendErr(pdpb.ErrorType_UNKNOWN, msg, request.GetLeader())
continue
}
}
}

// StoreHeartbeat implements gRPC PDServer.
func (s *Service) StoreHeartbeat(ctx context.Context, request *pdpb.StoreHeartbeatRequest) (*pdpb.StoreHeartbeatResponse, error) {
c := s.GetCluster()
Expand Down
5 changes: 0 additions & 5 deletions pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,11 +336,6 @@ func (s *Server) GetCoordinator() *schedule.Coordinator {
return s.GetCluster().GetCoordinator()
}

// GetCluster returns the cluster.
func (s *Server) GetCluster() *Cluster {
return s.cluster
}

// ServerLoopWgDone decreases the server loop wait group.
func (s *Server) ServerLoopWgDone() {
s.serverLoopWg.Done()
Expand Down
58 changes: 58 additions & 0 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1212,6 +1212,9 @@ func (s *GrpcServer) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error
lastForwardedHost string
lastBind time.Time
errCh chan error
schedulingStream schedulingpb.Scheduling_RegionHeartbeatClient
cancel1 context.CancelFunc
lastPrimaryAddr string
)
defer func() {
// cancel the forward stream
Expand Down Expand Up @@ -1325,6 +1328,36 @@ func (s *GrpcServer) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error
s.hbStreams.SendErr(pdpb.ErrorType_UNKNOWN, msg, request.GetLeader())
continue
}

if s.IsAPIServiceMode() {
ctx := stream.Context()
primaryAddr, _ := s.GetServicePrimaryAddr(ctx, utils.SchedulingServiceName)
if schedulingStream == nil || lastPrimaryAddr != primaryAddr {
if cancel1 != nil {
cancel1()
}
client, err := s.getDelegateClient(ctx, primaryAddr)
if err != nil {
log.Error("get delegate client failed", zap.Error(err))
}

log.Info("create region heartbeat forward stream", zap.String("forwarded-host", primaryAddr))
schedulingStream, cancel1, err = s.createSchedulingStream(client)
if err != nil {
log.Error("create region heartbeat forward stream failed", zap.Error(err))
} else {
lastPrimaryAddr = primaryAddr
errCh = make(chan error, 1)
go forwardSchedulingToServer(schedulingStream, server, errCh)
}
}
if schedulingStream != nil {
if err := schedulingStream.Send(request); err != nil {
log.Error("forward region heartbeat failed", zap.Error(err))
}
}
}

regionHeartbeatHandleDuration.WithLabelValues(storeAddress, storeLabel).Observe(time.Since(start).Seconds())
regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "ok").Inc()
}
Expand Down Expand Up @@ -2266,6 +2299,31 @@ func forwardRegionHeartbeatClientToServer(forwardStream pdpb.PD_RegionHeartbeatC
}
}

func (s *GrpcServer) createSchedulingStream(client *grpc.ClientConn) (schedulingpb.Scheduling_RegionHeartbeatClient, context.CancelFunc, error) {
done := make(chan struct{})
ctx, cancel := context.WithCancel(s.ctx)
go grpcutil.CheckStream(ctx, cancel, done)
forwardStream, err := schedulingpb.NewSchedulingClient(client).RegionHeartbeat(ctx)
done <- struct{}{}
return forwardStream, cancel, err
}

func forwardSchedulingToServer(forwardStream schedulingpb.Scheduling_RegionHeartbeatClient, server *heartbeatServer, errCh chan error) {
defer logutil.LogPanic()
defer close(errCh)
for {
resp, err := forwardStream.Recv()
if err != nil {
errCh <- errors.WithStack(err)
return
}
if err := server.Send(resp); err != nil {
errCh <- errors.WithStack(err)
return
}
}
}

func (s *GrpcServer) createTSOForwardStream(
ctx context.Context, client *grpc.ClientConn,
) (tsopb.TSO_TsoClient, context.Context, context.CancelFunc, error) {
Expand Down
Loading

0 comments on commit 51566c5

Please sign in to comment.