Skip to content

Commit

Permalink
change more
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx committed Feb 19, 2024
1 parent 685db6b commit 76ef427
Show file tree
Hide file tree
Showing 14 changed files with 85 additions and 63 deletions.
8 changes: 4 additions & 4 deletions client/http/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ type Client interface {
GetMinResolvedTSByStoresIDs(context.Context, []uint64) (uint64, map[uint64]uint64, error)
GetPDVersion(context.Context) (string, error)
/* Micro Service interfaces */
GetMicroServiceMembers(context.Context, string) ([]MicroServiceMember, error)
GetMicroServiceMembers(context.Context, string) (*MicroServiceMemberInfo, error)
GetMicroServicePrimary(context.Context, string) (string, error)
DeleteOperators(context.Context) error

Expand Down Expand Up @@ -856,8 +856,8 @@ func (c *client) GetMinResolvedTSByStoresIDs(ctx context.Context, storeIDs []uin
}

// GetMicroServiceMembers gets the members of the microservice.
func (c *client) GetMicroServiceMembers(ctx context.Context, service string) ([]MicroServiceMember, error) {
var members []MicroServiceMember
func (c *client) GetMicroServiceMembers(ctx context.Context, service string) (*MicroServiceMemberInfo, error) {
var members MicroServiceMemberInfo
err := c.request(ctx, newRequestInfo().
WithName(getMicroServiceMembersName).
WithURI(MicroServiceMembers(service)).
Expand All @@ -866,7 +866,7 @@ func (c *client) GetMicroServiceMembers(ctx context.Context, service string) ([]
if err != nil {
return nil, err
}
return members, nil
return &members, nil
}

// GetMicroServicePrimary gets the primary of the microservice.
Expand Down
16 changes: 11 additions & 5 deletions client/http/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,11 +593,17 @@ type MembersInfo struct {
EtcdLeader *pdpb.Member `json:"etcd_leader,omitempty"`
}

// MicroServiceMemberInfo is the member info of a micro service.
type MicroServiceMemberInfo struct {
Members []*MicroServiceMember `json:"members,omitempty"`
Primary *MicroServiceMember `json:"primary,omitempty"`
}

// MicroServiceMember is the member info of a micro service.
type MicroServiceMember struct {
ServiceAddr string `json:"service-addr"`
Version string `json:"version"`
GitHash string `json:"git-hash"`
DeployPath string `json:"deploy-path"`
StartTimestamp int64 `json:"start-timestamp"`
ServiceAddr string `json:"service-addr,omitempty"`
Version string `json:"version,omitempty"`
GitHash string `json:"git-hash,omitempty"`
DeployPath string `json:"deploy-path,omitempty"`
StartTimestamp int64 `json:"start-timestamp,omitempty"`
}
2 changes: 1 addition & 1 deletion pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func (m *GroupManager) initTSONodesWatcher(client *clientv3.Client, clusterID ui
tsoServiceKey := discovery.TSOPath(clusterID)

putFn := func(kv *mvccpb.KeyValue) error {
s := &discovery.ServiceRegistryEntry{}
s := &discovery.MicroServiceMember{}
if err := json.Unmarshal(kv.Value, s); err != nil {
log.Warn("failed to unmarshal service registry entry",
zap.String("event-kv-key", string(kv.Key)), zap.Error(err))
Expand Down
14 changes: 7 additions & 7 deletions pkg/mcs/discovery/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func Discover(cli *clientv3.Client, clusterID, serviceName string) ([]string, er
}

// GetMSMembers returns all the members of the specified service name.
func GetMSMembers(name string, client *clientv3.Client) ([]ServiceRegistryEntry, error) {
func GetMSMembers(name string, client *clientv3.Client) ([]*MicroServiceMember, error) {
switch name {
case utils.TSOServiceName, utils.SchedulingServiceName, utils.ResourceManagerServiceName:
clusterID, err := etcdutil.GetClusterID(client, utils.ClusterIDPath)
Expand All @@ -61,18 +61,18 @@ func GetMSMembers(name string, client *clientv3.Client) ([]ServiceRegistryEntry,
return nil, errs.ErrEtcdTxnConflict.FastGenByArgs()
}

var entries []ServiceRegistryEntry
var members []*MicroServiceMember
for _, resp := range resps.Responses {
for _, keyValue := range resp.GetResponseRange().GetKvs() {
var entry ServiceRegistryEntry
if err = entry.Deserialize(keyValue.Value); err != nil {
log.Error("try to deserialize service registry entry failed", zap.String("key", string(keyValue.Key)), zap.Error(err))
var member *MicroServiceMember
if err = member.Deserialize(keyValue.Value); err != nil {
log.Error("try to deserialize service member failed", zap.String("key", string(keyValue.Key)), zap.Error(err))
continue
}
entries = append(entries, entry)
members = append(members, member)
}
}
return entries, nil
return members, nil
}

return nil, errors.Errorf("unknown service name %s", name)
Expand Down
22 changes: 11 additions & 11 deletions pkg/mcs/discovery/discover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,18 +48,18 @@ func TestDiscover(t *testing.T) {
re.Empty(endpoints)
}

func TestServiceRegistryEntry(t *testing.T) {
func TestMicroServiceMember(t *testing.T) {
re := require.New(t)
_, client, clean := etcdutil.NewTestEtcdCluster(t, 1)
defer clean()
entry1 := &ServiceRegistryEntry{ServiceAddr: "127.0.0.1:1"}
s1, err := entry1.Serialize()
member1 := &MicroServiceMember{ServiceAddr: "127.0.0.1:1"}
s1, err := member1.Serialize()
re.NoError(err)
sr1 := NewServiceRegister(context.Background(), client, "12345", "test_service", "127.0.0.1:1", s1, 1)
err = sr1.Register()
re.NoError(err)
entry2 := &ServiceRegistryEntry{ServiceAddr: "127.0.0.1:2"}
s2, err := entry2.Serialize()
member2 := &MicroServiceMember{ServiceAddr: "127.0.0.1:2"}
s2, err := member2.Serialize()
re.NoError(err)
sr2 := NewServiceRegister(context.Background(), client, "12345", "test_service", "127.0.0.1:2", s2, 1)
err = sr2.Register()
Expand All @@ -68,12 +68,12 @@ func TestServiceRegistryEntry(t *testing.T) {
endpoints, err := Discover(client, "12345", "test_service")
re.NoError(err)
re.Len(endpoints, 2)
returnedEntry1 := &ServiceRegistryEntry{}
returnedEntry1.Deserialize([]byte(endpoints[0]))
re.Equal("127.0.0.1:1", returnedEntry1.ServiceAddr)
returnedEntry2 := &ServiceRegistryEntry{}
returnedEntry2.Deserialize([]byte(endpoints[1]))
re.Equal("127.0.0.1:2", returnedEntry2.ServiceAddr)
returnedMember1 := &MicroServiceMember{}
returnedMember1.Deserialize([]byte(endpoints[0]))
re.Equal("127.0.0.1:1", returnedMember1.ServiceAddr)
returnedMember2 := &MicroServiceMember{}
returnedMember2.Deserialize([]byte(endpoints[1]))
re.Equal("127.0.0.1:2", returnedMember2.ServiceAddr)

sr1.cancel()
sr2.cancel()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,29 +21,35 @@ import (
"go.uber.org/zap"
)

// ServiceRegistryEntry is the registry entry of a service
type ServiceRegistryEntry struct {
// MicroServiceMember is the member of the micro service.
type MicroServiceMember struct {
ServiceAddr string `json:"service-addr"`
Version string `json:"version"`
GitHash string `json:"git-hash"`
DeployPath string `json:"deploy-path"`
StartTimestamp int64 `json:"start-timestamp"`
}

// Serialize this service registry entry
func (e *ServiceRegistryEntry) Serialize() (serializedValue string, err error) {
// MicroServiceMemberInfo is the member info of the micro service.
type MicroServiceMemberInfo struct {
Members []*MicroServiceMember `json:"members,omitempty"`
Primary *MicroServiceMember `json:"primary,omitempty"`
}

// Serialize serializes the service member.
func (e *MicroServiceMember) Serialize() (serializedValue string, err error) {
data, err := json.Marshal(e)
if err != nil {
log.Error("json marshal the service registry entry failed", zap.Error(err))
log.Error("json marshal the service member failed", zap.Error(err))
return "", err
}
return string(data), nil
}

// Deserialize the data to this service registry entry
func (e *ServiceRegistryEntry) Deserialize(data []byte) error {
// Deserialize deserializes the service member.
func (e *MicroServiceMember) Deserialize(data []byte) error {
if err := json.Unmarshal(data, e); err != nil {
log.Error("json unmarshal the service registry entry failed", zap.Error(err))
log.Error("json unmarshal the service member failed", zap.Error(err))
return err
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/resourcemanager/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ func (s *Server) startServer() (err error) {
s.startServerLoop()

// Server has started.
entry := &discovery.ServiceRegistryEntry{ServiceAddr: s.cfg.AdvertiseListenAddr}
entry := &discovery.MicroServiceMember{ServiceAddr: s.cfg.AdvertiseListenAddr}
serializedEntry, err := entry.Serialize()
if err != nil {
return err
Expand Down
8 changes: 4 additions & 4 deletions pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ type Server struct {
primaryExitCallbacks []func()

// for service registry
serviceID *discovery.ServiceRegistryEntry
serviceMember *discovery.MicroServiceMember
serviceRegister *discovery.ServiceRegister

cluster *Cluster
Expand Down Expand Up @@ -417,7 +417,7 @@ func (s *Server) startServer() (err error) {
if err != nil {
deployPath = ""
}
s.serviceID = &discovery.ServiceRegistryEntry{
s.serviceMember = &discovery.MicroServiceMember{
ServiceAddr: s.cfg.AdvertiseListenAddr,
Version: versioninfo.PDReleaseVersion,
GitHash: versioninfo.PDGitHash,
Expand Down Expand Up @@ -457,12 +457,12 @@ func (s *Server) startServer() (err error) {
}

// Server has started.
serializedEntry, err := s.serviceID.Serialize()
serializedMember, err := s.serviceMember.Serialize()
if err != nil {
return err
}
s.serviceRegister = discovery.NewServiceRegister(s.Context(), s.GetClient(), strconv.FormatUint(s.clusterID, 10),
utils.SchedulingServiceName, s.cfg.GetAdvertiseListenAddr(), serializedEntry, discovery.DefaultLeaseInSeconds)
utils.SchedulingServiceName, s.cfg.GetAdvertiseListenAddr(), serializedMember, discovery.DefaultLeaseInSeconds)
if err := s.serviceRegister.Register(); err != nil {
log.Error("failed to register the service", zap.String("service-name", utils.SchedulingServiceName), errs.ZapError(err))
return err
Expand Down
10 changes: 5 additions & 5 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ type Server struct {
tsoProtoFactory *tsoutil.TSOProtoFactory

// for service registry
serviceID *discovery.ServiceRegistryEntry
serviceMember *discovery.MicroServiceMember
serviceRegister *discovery.ServiceRegister
}

Expand Down Expand Up @@ -372,15 +372,15 @@ func (s *Server) startServer() (err error) {
if err != nil {
deployPath = ""
}
s.serviceID = &discovery.ServiceRegistryEntry{
s.serviceMember = &discovery.MicroServiceMember{
ServiceAddr: s.cfg.AdvertiseListenAddr,
Version: versioninfo.PDReleaseVersion,
GitHash: versioninfo.PDGitHash,
DeployPath: deployPath,
StartTimestamp: s.StartTimestamp(),
}
s.keyspaceGroupManager = tso.NewKeyspaceGroupManager(
s.serverLoopCtx, s.serviceID, s.GetClient(), s.GetHTTPClient(), s.cfg.AdvertiseListenAddr,
s.serverLoopCtx, s.serviceMember, s.GetClient(), s.GetHTTPClient(), s.cfg.AdvertiseListenAddr,
discovery.TSOPath(s.clusterID), legacySvcRootPath, tsoSvcRootPath, s.cfg)
if err := s.keyspaceGroupManager.Initialize(); err != nil {
return err
Expand All @@ -406,12 +406,12 @@ func (s *Server) startServer() (err error) {
}

// Server has started.
serializedEntry, err := s.serviceID.Serialize()
serializedMember, err := s.serviceMember.Serialize()
if err != nil {
return err
}
s.serviceRegister = discovery.NewServiceRegister(s.Context(), s.GetClient(), strconv.FormatUint(s.clusterID, 10),
utils.TSOServiceName, s.cfg.AdvertiseListenAddr, serializedEntry, discovery.DefaultLeaseInSeconds)
utils.TSOServiceName, s.cfg.AdvertiseListenAddr, serializedMember, discovery.DefaultLeaseInSeconds)
if err := s.serviceRegister.Register(); err != nil {
log.Error("failed to register the service", zap.String("service-name", utils.TSOServiceName), errs.ZapError(err))
return err
Expand Down
10 changes: 5 additions & 5 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ type KeyspaceGroupManager struct {
wg sync.WaitGroup

// tsoServiceID is the service ID of the TSO service, registered in the service discovery
tsoServiceID *discovery.ServiceRegistryEntry
tsoServiceID *discovery.MicroServiceMember
etcdClient *clientv3.Client
httpClient *http.Client
// electionNamePrefix is the name prefix to generate the unique name of a participant,
Expand All @@ -327,7 +327,7 @@ type KeyspaceGroupManager struct {
electionNamePrefix string
// tsoServiceKey is the path for storing the registered tso servers.
// Key: /ms/{cluster_id}/tso/registry/{tsoServerAddress}
// Value: discover.ServiceRegistryEntry
// Value: discover.MicroServiceMember
tsoServiceKey string
// legacySvcRootPath defines the legacy root path for all etcd paths which derives from
// the PD/API service. It's in the format of "/pd/{cluster_id}".
Expand Down Expand Up @@ -394,7 +394,7 @@ type KeyspaceGroupManager struct {
// NewKeyspaceGroupManager creates a new Keyspace Group Manager.
func NewKeyspaceGroupManager(
ctx context.Context,
tsoServiceID *discovery.ServiceRegistryEntry,
tsoServiceID *discovery.MicroServiceMember,
etcdClient *clientv3.Client,
httpClient *http.Client,
electionNamePrefix string,
Expand Down Expand Up @@ -480,10 +480,10 @@ func (kgm *KeyspaceGroupManager) GetServiceConfig() ServiceConfig {
// InitializeTSOServerWatchLoop initializes the watch loop monitoring the path for storing the
// registered tso servers.
// Key: /ms/{cluster_id}/tso/registry/{tsoServerAddress}
// Value: discover.ServiceRegistryEntry
// Value: discover.MicroServiceMember
func (kgm *KeyspaceGroupManager) InitializeTSOServerWatchLoop() error {
putFn := func(kv *mvccpb.KeyValue) error {
s := &discovery.ServiceRegistryEntry{}
s := &discovery.MicroServiceMember{}
if err := json.Unmarshal(kv.Value, s); err != nil {
log.Warn("failed to unmarshal service registry entry",
zap.String("event-kv-key", string(kv.Key)), zap.Error(err))
Expand Down
10 changes: 5 additions & 5 deletions pkg/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestDeletedGroupCleanup() {
func (suite *keyspaceGroupManagerTestSuite) TestNewKeyspaceGroupManager() {
re := suite.Require()

tsoServiceID := &discovery.ServiceRegistryEntry{ServiceAddr: suite.cfg.AdvertiseListenAddr}
tsoServiceID := &discovery.MicroServiceMember{ServiceAddr: suite.cfg.AdvertiseListenAddr}
guid := uuid.New().String()
tsoServiceKey := discovery.ServicePath(guid, "tso")
legacySvcRootPath := path.Join("/pd", guid)
Expand Down Expand Up @@ -795,7 +795,7 @@ func (suite *keyspaceGroupManagerTestSuite) newKeyspaceGroupManager(
uniqueStr string,
cfg *TestServiceConfig,
) *KeyspaceGroupManager {
tsoServiceID := &discovery.ServiceRegistryEntry{ServiceAddr: cfg.GetAdvertiseListenAddr()}
tsoServiceID := &discovery.MicroServiceMember{ServiceAddr: cfg.GetAdvertiseListenAddr()}
tsoServiceKey := discovery.ServicePath(uniqueStr, "tso")
legacySvcRootPath := path.Join("/pd", uniqueStr)
tsoSvcRootPath := path.Join(mcsutils.MicroserviceRootPath, uniqueStr, "tso")
Expand Down Expand Up @@ -1148,11 +1148,11 @@ func (suite *keyspaceGroupManagerTestSuite) registerTSOServer(
re *require.Assertions, clusterID, svcAddr string, cfg *TestServiceConfig,
) error {
// Register TSO server 1
serviceID := &discovery.ServiceRegistryEntry{ServiceAddr: cfg.GetAdvertiseListenAddr()}
serializedEntry, err := serviceID.Serialize()
serviceMember := &discovery.MicroServiceMember{ServiceAddr: cfg.GetAdvertiseListenAddr()}
serializedMember, err := serviceMember.Serialize()
re.NoError(err)
serviceKey := discovery.RegistryPath(clusterID, mcsutils.TSOServiceName, svcAddr)
_, err = suite.etcdClient.Put(suite.ctx, serviceKey, serializedEntry)
_, err = suite.etcdClient.Put(suite.ctx, serviceKey, serializedMember)
return err
}

Expand Down
17 changes: 14 additions & 3 deletions server/apiv2/handlers/micro_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func RegisterMicroService(r *gin.RouterGroup) {
// @Tags members
// @Summary Get all members of the cluster for the specified service.
// @Produce json
// @Success 200 {object} []discovery.ServiceRegistryEntry
// @Success 200 {object} []discovery.MicroServiceMember
// @Router /ms/members/{service} [get]
func GetMembers(c *gin.Context) {
svr := c.MustGet(middlewares.ServerContextKey).(*server.Server)
Expand All @@ -45,12 +45,23 @@ func GetMembers(c *gin.Context) {
}

if service := c.Param("service"); len(service) > 0 {
entries, err := discovery.GetMSMembers(service, svr.GetClient())
members, err := discovery.GetMSMembers(service, svr.GetClient())
if err != nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error())
return
}
c.IndentedJSON(http.StatusOK, entries)
var primary *discovery.MicroServiceMember
addr, _ := svr.GetServicePrimaryAddr(c.Request.Context(), service)
for i := range members {
if members[i].ServiceAddr == addr {
primary = members[i]
}
}

c.IndentedJSON(http.StatusOK, &discovery.MicroServiceMemberInfo{
Members: members,
Primary: primary,
})
return
}

Expand Down
6 changes: 3 additions & 3 deletions tests/integrations/mcs/discovery/register_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ func (suite *serverRegisterTestSuite) checkServerRegister(serviceName string) {

endpoints, err := discovery.Discover(client, suite.clusterID, serviceName)
re.NoError(err)
returnedEntry := &discovery.ServiceRegistryEntry{}
returnedEntry.Deserialize([]byte(endpoints[0]))
re.Equal(addr, returnedEntry.ServiceAddr)
returnedMember := &discovery.MicroServiceMember{}
returnedMember.Deserialize([]byte(endpoints[0]))
re.Equal(addr, returnedMember.ServiceAddr)

// test primary when only one server
expectedPrimary := tests.WaitForPrimaryServing(re, map[string]bs.Server{addr: s})
Expand Down
1 change: 0 additions & 1 deletion tests/integrations/mcs/scheduling/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,6 @@ func (suite *apiTestSuite) checkStores(cluster *tests.TestCluster) {
re := suite.Require()
stores := []*metapb.Store{
{
// metapb.StoreState_Up == 0
Id: 1,
Address: "tikv1",
State: metapb.StoreState_Up,
Expand Down

0 comments on commit 76ef427

Please sign in to comment.