Skip to content

Commit

Permalink
*: update cluster_id to a global variable (#8615)
Browse files Browse the repository at this point in the history
close #8588

Signed-off-by: okJiang <819421878@qq.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
okJiang and ti-chi-bot[bot] authored Oct 29, 2024
1 parent 20087e2 commit e257097
Show file tree
Hide file tree
Showing 69 changed files with 836 additions and 815 deletions.
2 changes: 1 addition & 1 deletion pkg/keyspace/keyspace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (suite *keyspaceTestSuite) SetupTest() {
suite.ctx, suite.cancel = context.WithCancel(context.Background())
store := endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil)
allocator := mockid.NewIDAllocator()
kgm := NewKeyspaceGroupManager(suite.ctx, store, nil, 0)
kgm := NewKeyspaceGroupManager(suite.ctx, store, nil)
suite.manager = NewKeyspaceManager(suite.ctx, store, nil, allocator, &mockConfig{}, kgm)
re.NoError(kgm.Bootstrap(suite.ctx))
re.NoError(suite.manager.Bootstrap())
Expand Down
19 changes: 8 additions & 11 deletions pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,10 @@ const (

// GroupManager is the manager of keyspace group related data.
type GroupManager struct {
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
client *clientv3.Client
clusterID uint64
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
client *clientv3.Client

syncutil.RWMutex
// groups is the cache of keyspace group related information.
Expand All @@ -86,7 +85,6 @@ func NewKeyspaceGroupManager(
ctx context.Context,
store endpoint.KeyspaceGroupStorage,
client *clientv3.Client,
clusterID uint64,
) *GroupManager {
ctx, cancel := context.WithCancel(ctx)
groups := make(map[endpoint.UserKind]*indexedHeap)
Expand All @@ -99,15 +97,14 @@ func NewKeyspaceGroupManager(
store: store,
groups: groups,
client: client,
clusterID: clusterID,
nodesBalancer: balancer.GenByPolicy[string](defaultBalancerPolicy),
serviceRegistryMap: make(map[string]string),
}

// If the etcd client is not nil, start the watch loop for the registered tso servers.
// The PD(TSO) Client relies on this info to discover tso servers.
if m.client != nil {
m.initTSONodesWatcher(m.client, m.clusterID)
m.initTSONodesWatcher(m.client)
m.tsoNodesWatcher.StartWatchLoop()
}
return m
Expand Down Expand Up @@ -218,8 +215,8 @@ func (m *GroupManager) allocNodesToAllKeyspaceGroups(ctx context.Context) {
}
}

func (m *GroupManager) initTSONodesWatcher(client *clientv3.Client, clusterID uint64) {
tsoServiceKey := discovery.TSOPath(clusterID)
func (m *GroupManager) initTSONodesWatcher(client *clientv3.Client) {
tsoServiceKey := keypath.TSOPath()

putFn := func(kv *mvccpb.KeyValue) error {
s := &discovery.ServiceRegistryEntry{}
Expand Down Expand Up @@ -1154,7 +1151,7 @@ func (m *GroupManager) GetKeyspaceGroupPrimaryByID(id uint32) (string, error) {
return "", ErrKeyspaceGroupNotExists(id)
}

rootPath := keypath.TSOSvcRootPath(m.clusterID)
rootPath := keypath.TSOSvcRootPath()
primaryPath := keypath.KeyspaceGroupPrimaryPath(rootPath, id)
leader := &tsopb.Participant{}
ok, _, err := etcdutil.GetProtoMsgWithModRev(m.client, primaryPath, leader)
Expand Down
2 changes: 1 addition & 1 deletion pkg/keyspace/tso_keyspace_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (suite *keyspaceGroupTestSuite) SetupTest() {
re := suite.Require()
suite.ctx, suite.cancel = context.WithCancel(context.Background())
store := endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil)
suite.kgm = NewKeyspaceGroupManager(suite.ctx, store, nil, 0)
suite.kgm = NewKeyspaceGroupManager(suite.ctx, store, nil)
idAllocator := mockid.NewIDAllocator()
cluster := mockcluster.NewCluster(suite.ctx, mockconfig.NewTestOptions())
suite.kg = NewKeyspaceManager(suite.ctx, store, cluster, idAllocator, &mockConfig{}, suite.kgm)
Expand Down
13 changes: 4 additions & 9 deletions pkg/mcs/discovery/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,20 @@
package discovery

import (
"strconv"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/utils/constant"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/keypath"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)

// Discover is used to get all the service instances of the specified service name.
func Discover(cli *clientv3.Client, clusterID, serviceName string) ([]string, error) {
key := ServicePath(clusterID, serviceName)
func Discover(cli *clientv3.Client, serviceName string) ([]string, error) {
key := keypath.ServicePath(serviceName)
endKey := clientv3.GetPrefixRangeEnd(key)

withRange := clientv3.WithRange(endKey)
Expand All @@ -48,11 +47,7 @@ func Discover(cli *clientv3.Client, clusterID, serviceName string) ([]string, er
func GetMSMembers(serviceName string, client *clientv3.Client) ([]ServiceRegistryEntry, error) {
switch serviceName {
case constant.TSOServiceName, constant.SchedulingServiceName, constant.ResourceManagerServiceName:
clusterID, err := etcdutil.GetClusterID(client, constant.ClusterIDPath)
if err != nil {
return nil, err
}
servicePath := ServicePath(strconv.FormatUint(clusterID, 10), serviceName)
servicePath := keypath.ServicePath(serviceName)
resps, err := kv.NewSlowLogTxn(client).Then(clientv3.OpGet(servicePath, clientv3.WithPrefix())).Commit()
if err != nil {
return nil, errs.ErrEtcdKVGet.Wrap(err).GenWithStackByCause()
Expand Down
16 changes: 8 additions & 8 deletions pkg/mcs/discovery/discover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ func TestDiscover(t *testing.T) {
re := require.New(t)
_, client, clean := etcdutil.NewTestEtcdCluster(t, 1)
defer clean()
sr1 := NewServiceRegister(context.Background(), client, "12345", "test_service", "127.0.0.1:1", "127.0.0.1:1", 1)
sr1 := NewServiceRegister(context.Background(), client, "test_service", "127.0.0.1:1", "127.0.0.1:1", 1)
err := sr1.Register()
re.NoError(err)
sr2 := NewServiceRegister(context.Background(), client, "12345", "test_service", "127.0.0.1:2", "127.0.0.1:2", 1)
sr2 := NewServiceRegister(context.Background(), client, "test_service", "127.0.0.1:2", "127.0.0.1:2", 1)
err = sr2.Register()
re.NoError(err)

endpoints, err := Discover(client, "12345", "test_service")
endpoints, err := Discover(client, "test_service")
re.NoError(err)
re.Len(endpoints, 2)
re.Equal("127.0.0.1:1", endpoints[0])
Expand All @@ -43,7 +43,7 @@ func TestDiscover(t *testing.T) {
sr1.cancel()
sr2.cancel()
time.Sleep(3 * time.Second)
endpoints, err = Discover(client, "12345", "test_service")
endpoints, err = Discover(client, "test_service")
re.NoError(err)
re.Empty(endpoints)
}
Expand All @@ -55,17 +55,17 @@ func TestServiceRegistryEntry(t *testing.T) {
entry1 := &ServiceRegistryEntry{ServiceAddr: "127.0.0.1:1"}
s1, err := entry1.Serialize()
re.NoError(err)
sr1 := NewServiceRegister(context.Background(), client, "12345", "test_service", "127.0.0.1:1", s1, 1)
sr1 := NewServiceRegister(context.Background(), client, "test_service", "127.0.0.1:1", s1, 1)
err = sr1.Register()
re.NoError(err)
entry2 := &ServiceRegistryEntry{ServiceAddr: "127.0.0.1:2"}
s2, err := entry2.Serialize()
re.NoError(err)
sr2 := NewServiceRegister(context.Background(), client, "12345", "test_service", "127.0.0.1:2", s2, 1)
sr2 := NewServiceRegister(context.Background(), client, "test_service", "127.0.0.1:2", s2, 1)
err = sr2.Register()
re.NoError(err)

endpoints, err := Discover(client, "12345", "test_service")
endpoints, err := Discover(client, "test_service")
re.NoError(err)
re.Len(endpoints, 2)
returnedEntry1 := &ServiceRegistryEntry{}
Expand All @@ -78,7 +78,7 @@ func TestServiceRegistryEntry(t *testing.T) {
sr1.cancel()
sr2.cancel()
time.Sleep(3 * time.Second)
endpoints, err = Discover(client, "12345", "test_service")
endpoints, err = Discover(client, "test_service")
re.NoError(err)
re.Empty(endpoints)
}
41 changes: 0 additions & 41 deletions pkg/mcs/discovery/key_path.go

This file was deleted.

5 changes: 3 additions & 2 deletions pkg/mcs/discovery/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/pingcap/log"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/keypath"
"github.com/tikv/pd/pkg/utils/logutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
Expand All @@ -40,9 +41,9 @@ type ServiceRegister struct {
}

// NewServiceRegister creates a new ServiceRegister.
func NewServiceRegister(ctx context.Context, cli *clientv3.Client, clusterID, serviceName, serviceAddr, serializedValue string, ttl int64) *ServiceRegister {
func NewServiceRegister(ctx context.Context, cli *clientv3.Client, serviceName, serviceAddr, serializedValue string, ttl int64) *ServiceRegister {
cctx, cancel := context.WithCancel(ctx)
serviceKey := RegistryPath(clusterID, serviceName, serviceAddr)
serviceKey := keypath.RegistryPath(serviceName, serviceAddr)
return &ServiceRegister{
ctx: cctx,
cancel: cancel,
Expand Down
8 changes: 4 additions & 4 deletions pkg/mcs/discovery/register_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ func TestRegister(t *testing.T) {
etcd, cfg := servers[0], servers[0].Config()

// Test register with http prefix.
sr := NewServiceRegister(context.Background(), client, "12345", "test_service", "http://127.0.0.1:1", "http://127.0.0.1:1", 10)
sr := NewServiceRegister(context.Background(), client, "test_service", "http://127.0.0.1:1", "http://127.0.0.1:1", 10)
err := sr.Register()
re.NoError(err)
re.Equal("/ms/12345/test_service/registry/http://127.0.0.1:1", sr.key)
re.Equal("/ms/0/test_service/registry/http://127.0.0.1:1", sr.key)
resp, err := client.Get(context.Background(), sr.key)
re.NoError(err)
re.Equal("http://127.0.0.1:1", string(resp.Kvs[0].Value))
Expand All @@ -51,14 +51,14 @@ func TestRegister(t *testing.T) {
re.Empty(resp.Kvs)

// Test the case that ctx is canceled.
sr = NewServiceRegister(context.Background(), client, "12345", "test_service", "127.0.0.1:2", "127.0.0.1:2", 1)
sr = NewServiceRegister(context.Background(), client, "test_service", "127.0.0.1:2", "127.0.0.1:2", 1)
err = sr.Register()
re.NoError(err)
sr.cancel()
re.Empty(getKeyAfterLeaseExpired(re, client, sr.key))

// Test the case that keepalive is failed when the etcd is restarted.
sr = NewServiceRegister(context.Background(), client, "12345", "test_service", "127.0.0.1:2", "127.0.0.1:2", 1)
sr = NewServiceRegister(context.Background(), client, "test_service", "127.0.0.1:2", "127.0.0.1:2", 1)
err = sr.Register()
re.NoError(err)
fname := testutil.InitTempFileLogger("info")
Expand Down
31 changes: 16 additions & 15 deletions pkg/mcs/metastorage/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
bs "github.com/tikv/pd/pkg/basicserver"
"github.com/tikv/pd/pkg/mcs/registry"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/keypath"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
"google.golang.org/grpc"
Expand Down Expand Up @@ -58,10 +59,10 @@ type Service struct {
}

// NewService creates a new meta storage service.
func NewService[T ClusterIDProvider](svr bs.Server) registry.RegistrableService {
func NewService(svr bs.Server) registry.RegistrableService {
return &Service{
ctx: svr.Context(),
manager: NewManager[T](svr),
manager: NewManager(svr),
}
}

Expand Down Expand Up @@ -115,11 +116,11 @@ func (s *Service) Watch(req *meta_storagepb.WatchRequest, server meta_storagepb.
if res.Err() != nil {
var resp meta_storagepb.WatchResponse
if startRevision < res.CompactRevision {
resp.Header = s.wrapErrorAndRevision(res.Header.GetRevision(), meta_storagepb.ErrorType_DATA_COMPACTED,
resp.Header = wrapErrorAndRevision(res.Header.GetRevision(), meta_storagepb.ErrorType_DATA_COMPACTED,
fmt.Sprintf("required watch revision: %d is smaller than current compact/min revision %d.", startRevision, res.CompactRevision))
resp.CompactRevision = res.CompactRevision
} else {
resp.Header = s.wrapErrorAndRevision(res.Header.GetRevision(), meta_storagepb.ErrorType_UNKNOWN,
resp.Header = wrapErrorAndRevision(res.Header.GetRevision(), meta_storagepb.ErrorType_UNKNOWN,
fmt.Sprintf("watch channel meet other error %s.", res.Err().Error()))
}
if err := server.Send(&resp); err != nil {
Expand All @@ -146,7 +147,7 @@ func (s *Service) Watch(req *meta_storagepb.WatchRequest, server meta_storagepb.
}
if len(events) > 0 {
if err := server.Send(&meta_storagepb.WatchResponse{
Header: &meta_storagepb.ResponseHeader{ClusterId: s.manager.ClusterID(), Revision: res.Header.GetRevision()},
Header: &meta_storagepb.ResponseHeader{ClusterId: keypath.ClusterID(), Revision: res.Header.GetRevision()},
Events: events, CompactRevision: res.CompactRevision}); err != nil {
return err
}
Expand Down Expand Up @@ -180,10 +181,10 @@ func (s *Service) Get(ctx context.Context, req *meta_storagepb.GetRequest) (*met
revision = res.Header.GetRevision()
}
if err != nil {
return &meta_storagepb.GetResponse{Header: s.wrapErrorAndRevision(revision, meta_storagepb.ErrorType_UNKNOWN, err.Error())}, nil
return &meta_storagepb.GetResponse{Header: wrapErrorAndRevision(revision, meta_storagepb.ErrorType_UNKNOWN, err.Error())}, nil
}
resp := &meta_storagepb.GetResponse{
Header: &meta_storagepb.ResponseHeader{ClusterId: s.manager.ClusterID(), Revision: revision},
Header: &meta_storagepb.ResponseHeader{ClusterId: keypath.ClusterID(), Revision: revision},
Count: res.Count,
More: res.More,
}
Expand Down Expand Up @@ -219,11 +220,11 @@ func (s *Service) Put(ctx context.Context, req *meta_storagepb.PutRequest) (*met
revision = res.Header.GetRevision()
}
if err != nil {
return &meta_storagepb.PutResponse{Header: s.wrapErrorAndRevision(revision, meta_storagepb.ErrorType_UNKNOWN, err.Error())}, nil
return &meta_storagepb.PutResponse{Header: wrapErrorAndRevision(revision, meta_storagepb.ErrorType_UNKNOWN, err.Error())}, nil
}

resp := &meta_storagepb.PutResponse{
Header: &meta_storagepb.ResponseHeader{ClusterId: s.manager.ClusterID(), Revision: revision},
Header: &meta_storagepb.ResponseHeader{ClusterId: keypath.ClusterID(), Revision: revision},
}
if res.PrevKv != nil {
resp.PrevKv = &meta_storagepb.KeyValue{Key: res.PrevKv.Key, Value: res.PrevKv.Value}
Expand Down Expand Up @@ -251,11 +252,11 @@ func (s *Service) Delete(ctx context.Context, req *meta_storagepb.DeleteRequest)
revision = res.Header.GetRevision()
}
if err != nil {
return &meta_storagepb.DeleteResponse{Header: s.wrapErrorAndRevision(revision, meta_storagepb.ErrorType_UNKNOWN, err.Error())}, nil
return &meta_storagepb.DeleteResponse{Header: wrapErrorAndRevision(revision, meta_storagepb.ErrorType_UNKNOWN, err.Error())}, nil
}

resp := &meta_storagepb.DeleteResponse{
Header: &meta_storagepb.ResponseHeader{ClusterId: s.manager.ClusterID(), Revision: revision},
Header: &meta_storagepb.ResponseHeader{ClusterId: keypath.ClusterID(), Revision: revision},
}
resp.PrevKvs = make([]*meta_storagepb.KeyValue, len(res.PrevKvs))
for i, kv := range res.PrevKvs {
Expand All @@ -264,16 +265,16 @@ func (s *Service) Delete(ctx context.Context, req *meta_storagepb.DeleteRequest)
return resp, nil
}

func (s *Service) wrapErrorAndRevision(revision int64, errorType meta_storagepb.ErrorType, message string) *meta_storagepb.ResponseHeader {
return s.errorHeader(revision, &meta_storagepb.Error{
func wrapErrorAndRevision(revision int64, errorType meta_storagepb.ErrorType, message string) *meta_storagepb.ResponseHeader {
return errorHeader(revision, &meta_storagepb.Error{
Type: errorType,
Message: message,
})
}

func (s *Service) errorHeader(revision int64, err *meta_storagepb.Error) *meta_storagepb.ResponseHeader {
func errorHeader(revision int64, err *meta_storagepb.Error) *meta_storagepb.ResponseHeader {
return &meta_storagepb.ResponseHeader{
ClusterId: s.manager.ClusterID(),
ClusterId: keypath.ClusterID(),
Revision: revision,
Error: err,
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/metastorage/server/install/install.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,5 @@ func init() {

// Install registers the API group and grpc service.
func Install(register *registry.ServiceRegistry) {
register.RegisterService("MetaStorage", ms_server.NewService[ms_server.ClusterIDProvider])
register.RegisterService("MetaStorage", ms_server.NewService)
}
Loading

0 comments on commit e257097

Please sign in to comment.