Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
.*: add list-member command (#654)
Browse files Browse the repository at this point in the history
  • Loading branch information
GMHDBJD authored May 19, 2020
1 parent 01298fc commit b308a2d
Show file tree
Hide file tree
Showing 10 changed files with 5,377 additions and 2,383 deletions.
1 change: 1 addition & 0 deletions dm/ctl/ctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func NewRootCmd() *cobra.Command {
master.NewMigrateRelayCmd(),
master.NewOperateSourceCmd(),
master.NewOfflineWorkerCmd(),
master.NewListMemberCmd(),
)
return cmd
}
Expand Down
100 changes: 100 additions & 0 deletions dm/ctl/master/list_member.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package master

import (
"context"
"os"

"github.com/pingcap/errors"
"github.com/spf13/cobra"

"github.com/pingcap/dm/dm/ctl/common"
"github.com/pingcap/dm/dm/pb"
)

var (
listMemberFlags = ListMemberFlags{}
)

// ListMemberFlags are flags that used in ListMember command
type ListMemberFlags struct {
names []string // specify names to list information
}

// NewListMemberCmd creates an ListMember command
func NewListMemberCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "list-member [--leader] [--master] [--worker] [--name master-name/worker-name ...]",
Short: "list member information",
Run: listMemberFunc,
}
cmd.Flags().BoolP("leader", "l", false, "only to list leader information")
cmd.Flags().BoolP("master", "m", false, "only to list master information")
cmd.Flags().BoolP("worker", "w", false, "only to list worker information")
cmd.Flags().StringSliceVarP(&listMemberFlags.names, "name", "n", []string{}, "specify member names in choosing type")
return cmd
}

func convertListMemberType(cmd *cobra.Command) (bool, bool, bool, error) {
leader, err := cmd.Flags().GetBool("leader")
if err != nil {
return false, false, false, err
}
master, err := cmd.Flags().GetBool("master")
if err != nil {
return false, false, false, err
}
worker, err := cmd.Flags().GetBool("worker")
if err != nil {
return false, false, false, err
}
if !leader && !master && !worker {
leader = true
master = true
worker = true
}
return leader, master, worker, nil
}

// listMemberFunc does list member request
func listMemberFunc(cmd *cobra.Command, _ []string) {
if len(cmd.Flags().Args()) != 0 {
cmd.SetOut(os.Stdout)
cmd.Usage()
return
}

leader, master, worker, err := convertListMemberType(cmd)
if err != nil {
common.PrintLines("%s", errors.ErrorStack(err))
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

cli := common.MasterClient()
resp, err := cli.ListMember(ctx, &pb.ListMemberRequest{
Leader: leader,
Master: master,
Worker: worker,
Names: listMemberFlags.names,
})

if err != nil {
common.PrintLines("list member failed, error:\n%v", errors.ErrorStack(err))
return
}
common.PrettyPrintResponse(resp)
}
16 changes: 16 additions & 0 deletions dm/master/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,22 @@ func (s *Scheduler) RemoveWorker(name string) error {
return nil
}

// GetAllWorkers gets all worker agent.
func (s *Scheduler) GetAllWorkers() ([]*Worker, error) {
s.mu.RLock()
defer s.mu.RUnlock()

if !s.started {
return nil, terror.ErrSchedulerNotStarted.Generate()
}

workers := make([]*Worker, 0, len(s.workers))
for _, value := range s.workers {
workers = append(workers, value)
}
return workers, nil
}

// GetWorkerByName gets worker agent by worker name.
func (s *Scheduler) GetWorkerByName(name string) *Worker {
s.mu.RLock()
Expand Down
169 changes: 169 additions & 0 deletions dm/master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1559,3 +1559,172 @@ func (s *Server) getSourceRespsAfterOperation(ctx context.Context, taskName stri
wg.Wait()
return sortCommonWorkerResults(sourceRespCh)
}

func (s *Server) listMemberMaster(ctx context.Context, names []string) (*pb.Members_Master, error) {

resp := &pb.Members_Master{
Master: &pb.ListMasterMember{},
}

memberList, err := s.etcdClient.MemberList(ctx)
if err != nil {
resp.Master.Msg = errors.ErrorStack(err)
return resp, nil
}

all := len(names) == 0
set := make(map[string]bool)
for _, name := range names {
set[name] = true
}

etcdMembers := memberList.Members
masters := make([]*pb.MasterInfo, 0, len(etcdMembers))
client := http.Client{
Timeout: 1 * time.Second,
}

for _, etcdMember := range etcdMembers {
if !all && !set[etcdMember.Name] {
continue
}

alive := true
_, err := client.Get(etcdMember.ClientURLs[0] + "/health")
if err != nil {
alive = false
}

masters = append(masters, &pb.MasterInfo{
Name: etcdMember.Name,
MemberID: etcdMember.ID,
Alive: alive,
ClientURLs: etcdMember.ClientURLs,
PeerURLs: etcdMember.PeerURLs,
})
}

sort.Slice(masters, func(lhs, rhs int) bool {
return masters[lhs].Name < masters[rhs].Name
})
resp.Master.Masters = masters
return resp, nil
}

func (s *Server) listMemberWorker(ctx context.Context, names []string) (*pb.Members_Worker, error) {
resp := &pb.Members_Worker{
Worker: &pb.ListWorkerMember{},
}

workerAgents, err := s.scheduler.GetAllWorkers()
if err != nil {
resp.Worker.Msg = errors.ErrorStack(err)
return resp, nil
}

all := len(names) == 0
set := make(map[string]bool)
for _, name := range names {
set[name] = true
}

workers := make([]*pb.WorkerInfo, 0, len(workerAgents))

for _, workerAgent := range workerAgents {
if !all && !set[workerAgent.BaseInfo().Name] {
continue
}

workers = append(workers, &pb.WorkerInfo{
Name: workerAgent.BaseInfo().Name,
Addr: workerAgent.BaseInfo().Addr,
Stage: string(workerAgent.Stage()),
Source: workerAgent.Bound().Source,
})
}

sort.Slice(workers, func(lhs, rhs int) bool {
return workers[lhs].Name < workers[rhs].Name
})
resp.Worker.Workers = workers
return resp, nil
}

func (s *Server) listMemberLeader(ctx context.Context, names []string) (*pb.Members_Leader, error) {
resp := &pb.Members_Leader{
Leader: &pb.ListLeaderMember{},
}

all := len(names) == 0
set := make(map[string]bool)
for _, name := range names {
set[name] = true
}

_, name, addr, err := s.election.LeaderInfo(ctx)
if err != nil {
resp.Leader.Msg = errors.ErrorStack(err)
return resp, nil
}

if !all && !set[name] {
return resp, nil
}

resp.Leader.Name = name
resp.Leader.Addr = addr
return resp, nil
}

// ListMember list member information
func (s *Server) ListMember(ctx context.Context, req *pb.ListMemberRequest) (*pb.ListMemberResponse, error) {
log.L().Info("", zap.Stringer("payload", req), zap.String("request", "ListMember"))

isLeader, needForward := s.isLeaderAndNeedForward()
if !isLeader {
if needForward {
return s.leaderClient.ListMember(ctx, req)
}
return nil, terror.ErrMasterRequestIsNotForwardToLeader
}

resp := &pb.ListMemberResponse{}
members := make([]*pb.Members, 0)

if req.Leader {
res, err := s.listMemberLeader(ctx, req.Names)
if err != nil {
resp.Msg = errors.ErrorStack(err)
return resp, nil
}
members = append(members, &pb.Members{
Member: res,
})
}

if req.Master {
res, err := s.listMemberMaster(ctx, req.Names)
if err != nil {
resp.Msg = errors.ErrorStack(err)
return resp, nil
}
members = append(members, &pb.Members{
Member: res,
})
}

if req.Worker {
res, err := s.listMemberWorker(ctx, req.Names)
if err != nil {
resp.Msg = errors.ErrorStack(err)
return resp, nil
}
members = append(members, &pb.Members{
Member: res,
})
}

resp.Result = true
resp.Members = members
return resp, nil
}
Loading

0 comments on commit b308a2d

Please sign in to comment.