Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
fix make check error
Browse files Browse the repository at this point in the history
  • Loading branch information
amyangfei committed May 27, 2019
1 parent f989cf4 commit 321d2e6
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 15 deletions.
1 change: 1 addition & 0 deletions dm/ctl/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ var (
globalConfig *Config
)

// InitUtils inits necessary dmctl utils
func InitUtils(cfg *Config) error {
globalConfig = cfg
return errors.Trace(InitClient(cfg.MasterAddr))
Expand Down
7 changes: 5 additions & 2 deletions dm/master/agent_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ var (
pool *AgentPool // singleton instance
once sync.Once
defalutRate float64 = 10
defaultBurst int = 40
defaultBurst = 40
)

type emitFunc func(args ...interface{})
Expand All @@ -41,6 +41,7 @@ type AgentPool struct {
limiter *rate.Limiter
}

// RateLimitConfig holds rate limit config
type RateLimitConfig struct {
rate float64 // dispatch rate
burst int // max permits bursts
Expand Down Expand Up @@ -81,6 +82,7 @@ func (pool *AgentPool) Apply(ctx context.Context, id int) *Agent {
}
}

// InitAgentPool initials agent pool singleton
func InitAgentPool(cfg *RateLimitConfig) *AgentPool {
once.Do(func() {
pool = NewAgentPool(&RateLimitConfig{rate: cfg.rate, burst: cfg.burst})
Expand All @@ -90,7 +92,8 @@ func InitAgentPool(cfg *RateLimitConfig) *AgentPool {
}

func (pool *AgentPool) dispatch() {
ctx, _ := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
for {
select {
case <-ctx.Done():
Expand Down
19 changes: 10 additions & 9 deletions dm/master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,22 +250,22 @@ func (s *Server) StartTask(ctx context.Context, req *pb.StartTaskRequest) (*pb.S
go Emit(ctx, 0, func(args ...interface{}) {
defer wg.Done()

stCfg, ok := argsExtractor(args...)
stCfg2, ok := argsExtractor(args...)
if !ok {
return
}

worker, ok1 := s.cfg.DeployMap[stCfg.SourceID]
worker, ok1 := s.cfg.DeployMap[stCfg2.SourceID]
cli, ok2 := s.workerClients[worker]
if !ok1 || !ok2 {
workerRespCh <- &pb.CommonWorkerResponse{
Result: false,
Msg: fmt.Sprintf("%s relevant worker not found", stCfg.SourceID),
Msg: fmt.Sprintf("%s relevant worker not found", stCfg2.SourceID),
}
return
}
validWorkerCh <- worker
stCfgToml, err := stCfg.Toml() // convert to TOML format
stCfg2Toml, err := stCfg2.Toml() // convert to TOML format
if err != nil {
workerRespCh <- &pb.CommonWorkerResponse{
Result: false,
Expand All @@ -274,21 +274,21 @@ func (s *Server) StartTask(ctx context.Context, req *pb.StartTaskRequest) (*pb.S
}
return
}
workerResp, err := cli.StartSubTask(ctx, &pb.StartSubTaskRequest{Task: stCfgToml})
workerResp = s.handleOperationResult(ctx, cli, stCfg.Name, err, workerResp)
workerResp, err := cli.StartSubTask(ctx, &pb.StartSubTaskRequest{Task: stCfg2Toml})
workerResp = s.handleOperationResult(ctx, cli, stCfg2.Name, err, workerResp)
workerResp.Meta.Worker = worker
workerRespCh <- workerResp.Meta

}, func(args ...interface{}) {
defer wg.Done()

stCfg, ok := argsExtractor(args...)
stCfg2, ok := argsExtractor(args...)
if !ok {
return
}
workerRespCh <- &pb.CommonWorkerResponse{
Result: false,
Msg: fmt.Sprintf("fail to get emit opporunity for source %s", stCfg.SourceID),
Msg: fmt.Sprintf("fail to get emit opporunity for source %s", stCfg2.SourceID),
}
}, stCfg)
}
Expand Down Expand Up @@ -1092,7 +1092,8 @@ func (s *Server) getStatusFromWorkers(ctx context.Context, workers []string, tas
go func(worker string) {
defer wg.Done()
cli := s.workerClients[worker]
cctx, _ := context.WithTimeout(ctx, s.cfg.RPCTimeout)
cctx, cancel := context.WithTimeout(ctx, s.cfg.RPCTimeout)
defer cancel()
workerStatus, err := cli.QueryStatus(cctx, workerReq)
if err != nil {
workerStatus = &pb.QueryStatusResponse{
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ require (
github.com/spf13/cobra v0.0.4
github.com/syndtr/goleveldb v1.0.0
golang.org/x/crypto v0.0.0-20190103213133-ff983b9c42bc // indirect
golang.org/x/sys v0.0.0-20190116161447-11f53e031339
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c
golang.org/x/sys v0.0.0-20190422165155-953cdadca894
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4
google.golang.org/grpc v1.17.0
gopkg.in/natefinch/lumberjack.v2 v2.0.0
gopkg.in/yaml.v2 v2.2.2
Expand Down
5 changes: 3 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,8 @@ golang.org/x/crypto v0.0.0-20180608092829-8ac0e0d97ce4/go.mod h1:6SG95UA2DQfeDnf
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9 h1:mKdxBk7AujPs8kU4m80U72y/zjbZ3UcXC7dClwKbUI0=
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190103213133-ff983b9c42bc h1:F5tKCVGp+MUAHhKp5MZtGqAlGX3+oCsiL1Q629FL90M=
golang.org/x/crypto v0.0.0-20190103213133-ff983b9c42bc/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
Expand All @@ -344,9 +346,8 @@ golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190109145017-48ac38b7c8cb/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190523142557-0e01d883c5c5 h1:sM3evRHxE/1RuMe1FYAL3j7C7fUfIjkbE+NiDAYUF8U=
golang.org/x/sys v0.0.0-20190523142557-0e01d883c5c5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
Expand Down

0 comments on commit 321d2e6

Please sign in to comment.