Skip to content

Commit

Permalink
mcs: use separate participant (tikv#7032)
Browse files Browse the repository at this point in the history
ref tikv#5839

Signed-off-by: Ryan Leung <rleungx@gmail.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
rleungx and ti-chi-bot[bot] committed Dec 1, 2023
1 parent a9995c0 commit 1a67217
Show file tree
Hide file tree
Showing 11 changed files with 824 additions and 60 deletions.
4 changes: 4 additions & 0 deletions pd.code-workspace
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
{
"name": "pd-tso-bench",
"path": "tools/pd-tso-bench"
},
{
"name": "pd-api-bench",
"path": "tools/pd-api-bench"
}
],
"settings": {}
Expand Down
14 changes: 8 additions & 6 deletions pkg/mcs/resourcemanager/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,9 @@ package server

import (
"context"
"fmt"
"net/http"
"os"
"os/signal"
"path"
"strconv"
"sync"
"sync/atomic"
Expand All @@ -30,6 +28,7 @@ import (
grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/diagnosticspb"
"github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/pingcap/log"
"github.com/pingcap/sysutil"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -284,10 +283,13 @@ func (s *Server) startServer() (err error) {
uniqueName := s.cfg.ListenAddr
uniqueID := memberutil.GenerateUniqueID(uniqueName)
log.Info("joining primary election", zap.String("participant-name", uniqueName), zap.Uint64("participant-id", uniqueID))
resourceManagerPrimaryPrefix := endpoint.ResourceManagerSvcRootPath(s.clusterID)
s.participant = member.NewParticipant(s.GetClient())
s.participant.InitInfo(uniqueName, uniqueID, path.Join(resourceManagerPrimaryPrefix, fmt.Sprintf("%05d", 0)),
utils.PrimaryKey, "primary election", s.cfg.AdvertiseListenAddr)
s.participant = member.NewParticipant(s.GetClient(), utils.ResourceManagerServiceName)
p := &resource_manager.Participant{
Name: uniqueName,
Id: uniqueID, // id is unique among all participants
ListenUrls: []string{s.cfg.AdvertiseListenAddr},
}
s.participant.InitInfo(p, endpoint.ResourceManagerSvcRootPath(s.clusterID), utils.PrimaryKey, "primary election")

s.service = &Service{
ctx: s.Context(),
Expand Down
13 changes: 8 additions & 5 deletions pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"net/http"
"os"
"os/signal"
"path"
"strconv"
"sync"
"sync/atomic"
Expand All @@ -31,6 +30,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/diagnosticspb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/kvproto/pkg/schedulingpb"
"github.com/pingcap/log"
"github.com/pingcap/sysutil"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -370,10 +370,13 @@ func (s *Server) startServer() (err error) {
uniqueName := s.cfg.ListenAddr
uniqueID := memberutil.GenerateUniqueID(uniqueName)
log.Info("joining primary election", zap.String("participant-name", uniqueName), zap.Uint64("participant-id", uniqueID))
schedulingPrimaryPrefix := endpoint.SchedulingSvcRootPath(s.clusterID)
s.participant = member.NewParticipant(s.GetClient())
s.participant.InitInfo(uniqueName, uniqueID, path.Join(schedulingPrimaryPrefix, fmt.Sprintf("%05d", 0)),
utils.PrimaryKey, "primary election", s.cfg.AdvertiseListenAddr)
s.participant = member.NewParticipant(s.GetClient(), utils.SchedulingServiceName)
p := &schedulingpb.Participant{
Name: uniqueName,
Id: uniqueID, // id is unique among all participants
ListenUrls: []string{s.cfg.AdvertiseListenAddr},
}
s.participant.InitInfo(p, endpoint.SchedulingSvcRootPath(s.clusterID), utils.PrimaryKey, "primary election")
s.basicCluster = core.NewBasicCluster()
err = s.startWatcher()
if err != nil {
Expand Down
13 changes: 6 additions & 7 deletions pkg/member/election_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"

"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/kvproto/pkg/tsopb"
)

// ElectionLeader defines the common interface of the leader, which is the pdpb.Member
Expand Down Expand Up @@ -64,14 +63,14 @@ func (l *EmbeddedEtcdLeader) Watch(ctx context.Context) {
// EtcdLeader is the leader in the election group backed by the etcd, but it's
// decoupled from the embedded etcd.
type EtcdLeader struct {
wrapper *Participant
pariticipant *tsopb.Participant
revision int64
wrapper *Participant
participant participant
revision int64
}

// GetListenUrls returns current leader's client urls
func (l *EtcdLeader) GetListenUrls() []string {
return l.pariticipant.GetListenUrls()
return l.participant.GetListenUrls()
}

// GetRevision the revision of the leader in etcd
Expand All @@ -81,10 +80,10 @@ func (l *EtcdLeader) GetRevision() int64 {

// String declares fmt.Stringer
func (l *EtcdLeader) String() string {
return l.pariticipant.String()
return l.participant.String()
}

// Watch on the leader
func (l *EtcdLeader) Watch(ctx context.Context) {
l.wrapper.WatchLeader(ctx, l.pariticipant, l.revision)
l.wrapper.WatchLeader(ctx, l.participant, l.revision)
}
94 changes: 63 additions & 31 deletions pkg/member/participant.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,28 +22,42 @@ import (
"sync/atomic"
"time"

"github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/pingcap/kvproto/pkg/schedulingpb"
"github.com/pingcap/kvproto/pkg/tsopb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/election"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/utils/etcdutil"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
)

type leadershipCheckFunc func(*election.Leadership) bool

type participant interface {
GetName() string
GetId() uint64
GetListenUrls() []string
String() string
Marshal() ([]byte, error)
Reset()
ProtoMessage()
}

// Participant is used for the election related logic. Compared to its counterpart
// EmbeddedEtcdMember, Participant relies on etcd for election, but it's decoupled
// from the embedded etcd. It implements Member interface.
type Participant struct {
leadership *election.Leadership
// stored as member type
leader atomic.Value
client *clientv3.Client
rootPath string
leaderPath string
member *tsopb.Participant
leader atomic.Value
client *clientv3.Client
rootPath string
leaderPath string
member participant
serviceName string
// memberValue is the serialized string of `member`. It will be saved in the
// leader key when this participant is successfully elected as the leader of
// the group. Every write will use it to check the leadership.
Expand All @@ -56,42 +70,37 @@ type Participant struct {
}

// NewParticipant create a new Participant.
func NewParticipant(client *clientv3.Client) *Participant {
func NewParticipant(client *clientv3.Client, serviceName string) *Participant {
return &Participant{
client: client,
client: client,
serviceName: serviceName,
}
}

// InitInfo initializes the member info. The leader key is path.Join(rootPath, leaderName)
func (m *Participant) InitInfo(name string, id uint64, rootPath string, leaderName string, purpose string, advertiseListenAddr string) {
leader := &tsopb.Participant{
Name: name,
Id: id, // id is unique among all participants
ListenUrls: []string{advertiseListenAddr},
}

data, err := leader.Marshal()
func (m *Participant) InitInfo(p participant, rootPath string, leaderName string, purpose string) {
data, err := p.Marshal()
if err != nil {
// can't fail, so panic here.
log.Fatal("marshal leader meet error", zap.Stringer("leader-name", leader), errs.ZapError(errs.ErrMarshalLeader, err))
log.Fatal("marshal leader meet error", zap.String("member-name", p.String()), errs.ZapError(errs.ErrMarshalLeader, err))
}
m.member = leader
m.member = p
m.memberValue = string(data)
m.rootPath = rootPath
m.leaderPath = path.Join(rootPath, leaderName)
m.leadership = election.NewLeadership(m.client, m.GetLeaderPath(), purpose)
m.lastLeaderUpdatedTime.Store(time.Now())
log.Info("participant joining election", zap.Stringer("participant-info", m.member), zap.String("leader-path", m.leaderPath))
log.Info("participant joining election", zap.String("participant-info", p.String()), zap.String("leader-path", m.leaderPath))
}

// ID returns the unique ID for this participant in the election group
func (m *Participant) ID() uint64 {
return m.member.Id
return m.member.GetId()
}

// Name returns the unique name in the election group.
func (m *Participant) Name() string {
return m.member.Name
return m.member.GetName()
}

// GetMember returns the member.
Expand All @@ -112,6 +121,9 @@ func (m *Participant) Client() *clientv3.Client {
// IsLeader returns whether the participant is the leader or not by checking its leadership's
// lease and leader info.
func (m *Participant) IsLeader() bool {
if m.GetLeader() == nil {
return false
}
return m.leadership.Check() && m.GetLeader().GetId() == m.member.GetId() && m.campaignCheck()
}

Expand All @@ -122,6 +134,9 @@ func (m *Participant) IsLeaderElected() bool {

// GetLeaderListenUrls returns current leader's listen urls
func (m *Participant) GetLeaderListenUrls() []string {
if m.GetLeader() == nil {
return nil
}
return m.GetLeader().GetListenUrls()
}

Expand All @@ -131,27 +146,36 @@ func (m *Participant) GetLeaderID() uint64 {
}

// GetLeader returns current leader of the election group.
func (m *Participant) GetLeader() *tsopb.Participant {
func (m *Participant) GetLeader() participant {
leader := m.leader.Load()
if leader == nil {
return nil
}
member := leader.(*tsopb.Participant)
member := leader.(participant)
if member.GetId() == 0 {
return nil
}
return member
}

// setLeader sets the member's leader.
func (m *Participant) setLeader(member *tsopb.Participant) {
func (m *Participant) setLeader(member participant) {
m.leader.Store(member)
m.lastLeaderUpdatedTime.Store(time.Now())
}

// unsetLeader unsets the member's leader.
func (m *Participant) unsetLeader() {
m.leader.Store(&tsopb.Participant{})
var leader participant
switch m.serviceName {
case utils.TSOServiceName:
leader = &tsopb.Participant{}
case utils.SchedulingServiceName:
leader = &schedulingpb.Participant{}
case utils.ResourceManagerServiceName:
leader = &resource_manager.Participant{}
}
m.leader.Store(leader)
m.lastLeaderUpdatedTime.Store(time.Now())
}

Expand Down Expand Up @@ -200,8 +224,16 @@ func (m *Participant) PreCheckLeader() error {
}

// getPersistentLeader gets the corresponding leader from etcd by given leaderPath (as the key).
func (m *Participant) getPersistentLeader() (*tsopb.Participant, int64, error) {
leader := &tsopb.Participant{}
func (m *Participant) getPersistentLeader() (participant, int64, error) {
var leader participant
switch m.serviceName {
case utils.TSOServiceName:
leader = &tsopb.Participant{}
case utils.SchedulingServiceName:
leader = &schedulingpb.Participant{}
case utils.ResourceManagerServiceName:
leader = &resource_manager.Participant{}
}
ok, rev, err := etcdutil.GetProtoMsgWithModRev(m.client, m.GetLeaderPath(), leader)
if err != nil {
return nil, 0, err
Expand Down Expand Up @@ -248,14 +280,14 @@ func (m *Participant) CheckLeader() (ElectionLeader, bool) {
}

return &EtcdLeader{
wrapper: m,
pariticipant: leader,
revision: revision,
wrapper: m,
participant: leader,
revision: revision,
}, false
}

// WatchLeader is used to watch the changes of the leader.
func (m *Participant) WatchLeader(ctx context.Context, leader *tsopb.Participant, revision int64) {
func (m *Participant) WatchLeader(ctx context.Context, leader participant, revision int64) {
m.setLeader(leader)
m.leadership.Watch(ctx, revision)
m.unsetLeader()
Expand All @@ -269,7 +301,7 @@ func (m *Participant) ResetLeader() {
}

// IsSameLeader checks whether a server is the leader itself.
func (m *Participant) IsSameLeader(leader *tsopb.Participant) bool {
func (m *Participant) IsSameLeader(leader participant) bool {
return leader.GetId() == m.ID()
}

Expand Down
12 changes: 8 additions & 4 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
perrors "github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/kvproto/pkg/tsopb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/election"
"github.com/tikv/pd/pkg/errs"
Expand Down Expand Up @@ -736,10 +737,13 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro
zap.String("participant-name", uniqueName),
zap.Uint64("participant-id", uniqueID))
// Initialize the participant info to join the primary election.
participant := member.NewParticipant(kgm.etcdClient)
participant.InitInfo(
uniqueName, uniqueID, endpoint.KeyspaceGroupsElectionPath(kgm.tsoSvcRootPath, group.ID),
mcsutils.PrimaryKey, "keyspace group primary election", kgm.cfg.GetAdvertiseListenAddr())
participant := member.NewParticipant(kgm.etcdClient, mcsutils.TSOServiceName)
p := &tsopb.Participant{
Name: uniqueName,
Id: uniqueID, // id is unique among all participants
ListenUrls: []string{kgm.cfg.GetAdvertiseListenAddr()},
}
participant.InitInfo(p, endpoint.KeyspaceGroupsElectionPath(kgm.tsoSvcRootPath, group.ID), mcsutils.PrimaryKey, "keyspace group primary election")
// If the keyspace group is in split, we should ensure that the primary elected by the new keyspace group
// is always on the same TSO Server node as the primary of the old keyspace group, and this constraint cannot
// be broken until the entire split process is completed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,7 @@ func (suite *resourceManagerClientTestSuite) TestBasicResourceGroupCURD() {
},

{"test2", rmpb.GroupMode_RUMode, true, true,
`{"name":"test2","mode":1,"r_u_settings":{"r_u":{"settings":{"fill_rate":20000},"state":{"initialized":false}}},"priority":0}`,
`{"name":"test2","mode":1,"r_u_settings":{"r_u":{"settings":{"fill_rate":20000},"state":{"initialized":false}}},"priority":0,"runaway_settings":{"rule":{"exec_elapsed_time_ms":10000},"action":2},"background_settings":{"job_types":["test"]}}`,
func(gs *rmpb.ResourceGroup) {
gs.RUSettings = &rmpb.GroupRequestUnitSettings{
RU: &rmpb.TokenBucket{
Expand All @@ -687,7 +687,7 @@ func (suite *resourceManagerClientTestSuite) TestBasicResourceGroupCURD() {
},
},
{"test2", rmpb.GroupMode_RUMode, false, true,
`{"name":"test2","mode":1,"r_u_settings":{"r_u":{"settings":{"fill_rate":30000,"burst_limit":-1},"state":{"initialized":false}}},"priority":0}`,
`{"name":"test2","mode":1,"r_u_settings":{"r_u":{"settings":{"fill_rate":30000,"burst_limit":-1},"state":{"initialized":false}}},"priority":0,"runaway_settings":{"rule":{"exec_elapsed_time_ms":1000},"action":3,"watch":{"lasting_duration_ms":100000,"type":2}},"background_settings":{"job_types":["br","lightning"]}}`,
func(gs *rmpb.ResourceGroup) {
gs.RUSettings = &rmpb.GroupRequestUnitSettings{
RU: &rmpb.TokenBucket{
Expand Down
4 changes: 2 additions & 2 deletions tests/integrations/tso/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ replace (
replace google.golang.org/grpc v1.54.0 => google.golang.org/grpc v1.26.0

require (
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c
github.com/pingcap/kvproto v0.0.0-20230925123611-87bebcc0d071
github.com/stretchr/testify v1.8.2
github.com/stretchr/testify v1.8.4
github.com/tikv/pd v0.0.0-00010101000000-000000000000
github.com/tikv/pd/client v0.0.0-00010101000000-000000000000
github.com/tikv/pd/tests/integrations/mcs v0.0.0-00010101000000-000000000000
Expand Down
Loading

0 comments on commit 1a67217

Please sign in to comment.