Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mcs: watch scheduling service's primary address #7072

Merged
merged 3 commits into from
Sep 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 15 additions & 18 deletions pkg/member/participant.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,15 +166,7 @@ func (m *Participant) setLeader(member participant) {

// unsetLeader unsets the member's leader.
func (m *Participant) unsetLeader() {
var leader participant
switch m.serviceName {
case utils.TSOServiceName:
leader = &tsopb.Participant{}
case utils.SchedulingServiceName:
leader = &schedulingpb.Participant{}
case utils.ResourceManagerServiceName:
leader = &resource_manager.Participant{}
}
leader := NewParticipantByService(m.serviceName)
m.leader.Store(leader)
m.lastLeaderUpdatedTime.Store(time.Now())
}
Expand Down Expand Up @@ -225,15 +217,7 @@ func (m *Participant) PreCheckLeader() error {

// getPersistentLeader gets the corresponding leader from etcd by given leaderPath (as the key).
func (m *Participant) getPersistentLeader() (participant, int64, error) {
var leader participant
switch m.serviceName {
case utils.TSOServiceName:
leader = &tsopb.Participant{}
case utils.SchedulingServiceName:
leader = &schedulingpb.Participant{}
case utils.ResourceManagerServiceName:
leader = &resource_manager.Participant{}
}
leader := NewParticipantByService(m.serviceName)
ok, rev, err := etcdutil.GetProtoMsgWithModRev(m.client, m.GetLeaderPath(), leader)
if err != nil {
return nil, 0, err
Expand Down Expand Up @@ -399,3 +383,16 @@ func (m *Participant) campaignCheck() bool {
func (m *Participant) SetCampaignChecker(checker leadershipCheckFunc) {
m.campaignChecker.Store(checker)
}

// NewParticipantByService creates a new participant by service name.
func NewParticipantByService(serviceName string) (p participant) {
switch serviceName {
case utils.TSOServiceName:
p = &tsopb.Participant{}
case utils.SchedulingServiceName:
p = &schedulingpb.Participant{}
case utils.ResourceManagerServiceName:
p = &resource_manager.Participant{}
}
return p
}
6 changes: 6 additions & 0 deletions pkg/storage/endpoint/key_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,12 @@ func KeyspaceGroupPrimaryPath(rootPath string, keyspaceGroupID uint32) string {
return path.Join(electionPath, utils.PrimaryKey)
}

// SchedulingPrimaryPath returns the path of scheduling primary.
// Path: /ms/{cluster_id}/scheduling/primary
func SchedulingPrimaryPath(clusterID uint64) string {
return path.Join(SchedulingSvcRootPath(clusterID), utils.PrimaryKey)
}

// KeyspaceGroupsElectionPath returns the path of keyspace groups election.
// default keyspace group: "/ms/{cluster_id}/tso/00000".
// non-default keyspace group: "/ms/{cluster_id}/tso/keyspace_groups/election/{group}".
Expand Down
5 changes: 4 additions & 1 deletion server/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/gorilla/mux"
tsoapi "github.com/tikv/pd/pkg/mcs/tso/server/apis/v1"
mcs "github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/apiutil/serverapi"
"github.com/tikv/pd/server"
Expand All @@ -39,7 +40,9 @@ func NewHandler(_ context.Context, svr *server.Server) (http.Handler, apiutil.AP
router.PathPrefix(apiPrefix).Handler(negroni.New(
serverapi.NewRuntimeServiceValidator(svr, group),
serverapi.NewRedirector(svr, serverapi.MicroserviceRedirectRule(
apiPrefix+"/api/v1"+"/admin/reset-ts", tsoapi.APIPathPrefix+"/admin/reset-ts", "tso")),
apiPrefix+"/api/v1"+"/admin/reset-ts",
tsoapi.APIPathPrefix+"/admin/reset-ts",
mcs.TSOServiceName)),
negroni.Wrap(r)),
)

Expand Down
36 changes: 25 additions & 11 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,10 +226,11 @@ type Server struct {

auditBackends []audit.Backend

registry *registry.ServiceRegistry
mode string
servicePrimaryMap sync.Map /* Store as map[string]string */
tsoPrimaryWatcher *etcdutil.LoopWatcher
registry *registry.ServiceRegistry
mode string
servicePrimaryMap sync.Map /* Store as map[string]string */
tsoPrimaryWatcher *etcdutil.LoopWatcher
schedulingPrimaryWatcher *etcdutil.LoopWatcher
}

// HandlerBuilder builds a server HTTP handler.
Expand Down Expand Up @@ -617,7 +618,7 @@ func (s *Server) startServerLoop(ctx context.Context) {
go s.encryptionKeyManagerLoop()
if s.IsAPIServiceMode() {
s.initTSOPrimaryWatcher()
s.tsoPrimaryWatcher.StartWatchLoop()
s.initSchedulingPrimaryWatcher()
}
}

Expand Down Expand Up @@ -1962,16 +1963,28 @@ func (s *Server) initTSOPrimaryWatcher() {
serviceName := mcs.TSOServiceName
tsoRootPath := endpoint.TSOSvcRootPath(s.clusterID)
tsoServicePrimaryKey := endpoint.KeyspaceGroupPrimaryPath(tsoRootPath, mcs.DefaultKeyspaceGroupID)
s.tsoPrimaryWatcher = s.initServicePrimaryWatcher(serviceName, tsoServicePrimaryKey)
s.tsoPrimaryWatcher.StartWatchLoop()
}

func (s *Server) initSchedulingPrimaryWatcher() {
serviceName := mcs.SchedulingServiceName
primaryKey := endpoint.SchedulingPrimaryPath(s.clusterID)
s.schedulingPrimaryWatcher = s.initServicePrimaryWatcher(serviceName, primaryKey)
s.schedulingPrimaryWatcher.StartWatchLoop()
}

func (s *Server) initServicePrimaryWatcher(serviceName string, primaryKey string) *etcdutil.LoopWatcher {
putFn := func(kv *mvccpb.KeyValue) error {
primary := &tsopb.Participant{} // TODO: use Generics
primary := member.NewParticipantByService(serviceName)
if err := proto.Unmarshal(kv.Value, primary); err != nil {
return err
}
listenUrls := primary.GetListenUrls()
if len(listenUrls) > 0 {
// listenUrls[0] is the primary service endpoint of the keyspace group
s.servicePrimaryMap.Store(serviceName, listenUrls[0])
log.Info("update tso primary", zap.String("primary", listenUrls[0]))
log.Info("update service primary", zap.String("service-name", serviceName), zap.String("primary", listenUrls[0]))
}
return nil
}
Expand All @@ -1981,16 +1994,17 @@ func (s *Server) initTSOPrimaryWatcher() {
if ok {
oldPrimary = v.(string)
}
log.Info("delete tso primary", zap.String("old-primary", oldPrimary))
log.Info("delete service primary", zap.String("service-name", serviceName), zap.String("old-primary", oldPrimary))
s.servicePrimaryMap.Delete(serviceName)
return nil
}
s.tsoPrimaryWatcher = etcdutil.NewLoopWatcher(
name := fmt.Sprintf("%s-primary-watcher", serviceName)
return etcdutil.NewLoopWatcher(
s.serverLoopCtx,
&s.serverLoopWg,
s.client,
"tso-primary-watcher",
tsoServicePrimaryKey,
name,
primaryKey,
putFn,
deleteFn,
func() error { return nil },
Expand Down
5 changes: 5 additions & 0 deletions tests/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,11 @@ func (s *TestServer) GetTSOAllocatorManager() *tso.AllocatorManager {
return s.server.GetTSOAllocatorManager()
}

// GetServicePrimaryAddr returns the primary address of the service.
func (s *TestServer) GetServicePrimaryAddr(ctx context.Context, serviceName string) (string, bool) {
return s.server.GetServicePrimaryAddr(ctx, serviceName)
}

// TestCluster is only for test.
type TestCluster struct {
config *clusterConfig
Expand Down
15 changes: 13 additions & 2 deletions tests/integrations/mcs/scheduling/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/pingcap/failpoint"
"github.com/stretchr/testify/suite"
mcs "github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/utils/testutil"
"github.com/tikv/pd/tests"
"go.uber.org/goleak"
Expand Down Expand Up @@ -107,11 +108,21 @@ func (suite *serverTestSuite) TestPrimaryChange() {
defer tc.Destroy()
tc.WaitForPrimaryServing(re)
primary := tc.GetPrimaryServer()
addr := primary.GetAddr()
oldPrimaryAddr := primary.GetAddr()
re.Len(primary.GetCluster().GetCoordinator().GetSchedulersController().GetSchedulerNames(), 5)
testutil.Eventually(re, func() bool {
watchedAddr, ok := suite.pdLeader.GetServicePrimaryAddr(suite.ctx, mcs.SchedulingServiceName)
return ok && oldPrimaryAddr == watchedAddr
})
// transfer leader
primary.Close()
tc.WaitForPrimaryServing(re)
primary = tc.GetPrimaryServer()
re.NotEqual(addr, primary.GetAddr())
newPrimaryAddr := primary.GetAddr()
re.NotEqual(oldPrimaryAddr, newPrimaryAddr)
re.Len(primary.GetCluster().GetCoordinator().GetSchedulersController().GetSchedulerNames(), 5)
testutil.Eventually(re, func() bool {
watchedAddr, ok := suite.pdLeader.GetServicePrimaryAddr(suite.ctx, mcs.SchedulingServiceName)
return ok && newPrimaryAddr == watchedAddr
})
}