diff --git a/dm/ctl/common/util.go b/dm/ctl/common/util.go index 7a75c2bab7..cad15734ac 100644 --- a/dm/ctl/common/util.go +++ b/dm/ctl/common/util.go @@ -35,6 +35,7 @@ var ( globalConfig *Config ) +// InitUtils inits necessary dmctl utils func InitUtils(cfg *Config) error { globalConfig = cfg return errors.Trace(InitClient(cfg.MasterAddr)) diff --git a/dm/master/agent_pool.go b/dm/master/agent_pool.go index 0358ef0afc..6089a0baa9 100644 --- a/dm/master/agent_pool.go +++ b/dm/master/agent_pool.go @@ -25,7 +25,7 @@ var ( pool *AgentPool // singleton instance once sync.Once defalutRate float64 = 10 - defaultBurst int = 40 + defaultBurst = 40 ) type emitFunc func(args ...interface{}) @@ -41,6 +41,7 @@ type AgentPool struct { limiter *rate.Limiter } +// RateLimitConfig holds rate limit config type RateLimitConfig struct { rate float64 // dispatch rate burst int // max permits bursts @@ -81,6 +82,7 @@ func (pool *AgentPool) Apply(ctx context.Context, id int) *Agent { } } +// InitAgentPool initials agent pool singleton func InitAgentPool(cfg *RateLimitConfig) *AgentPool { once.Do(func() { pool = NewAgentPool(&RateLimitConfig{rate: cfg.rate, burst: cfg.burst}) @@ -90,7 +92,8 @@ func InitAgentPool(cfg *RateLimitConfig) *AgentPool { } func (pool *AgentPool) dispatch() { - ctx, _ := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() for { select { case <-ctx.Done(): diff --git a/dm/master/server.go b/dm/master/server.go index 3c99adaf0f..94477228f0 100644 --- a/dm/master/server.go +++ b/dm/master/server.go @@ -250,22 +250,22 @@ func (s *Server) StartTask(ctx context.Context, req *pb.StartTaskRequest) (*pb.S go Emit(ctx, 0, func(args ...interface{}) { defer wg.Done() - stCfg, ok := argsExtractor(args...) + stCfg2, ok := argsExtractor(args...) if !ok { return } - worker, ok1 := s.cfg.DeployMap[stCfg.SourceID] + worker, ok1 := s.cfg.DeployMap[stCfg2.SourceID] cli, ok2 := s.workerClients[worker] if !ok1 || !ok2 { workerRespCh <- &pb.CommonWorkerResponse{ Result: false, - Msg: fmt.Sprintf("%s relevant worker not found", stCfg.SourceID), + Msg: fmt.Sprintf("%s relevant worker not found", stCfg2.SourceID), } return } validWorkerCh <- worker - stCfgToml, err := stCfg.Toml() // convert to TOML format + stCfg2Toml, err := stCfg2.Toml() // convert to TOML format if err != nil { workerRespCh <- &pb.CommonWorkerResponse{ Result: false, @@ -274,21 +274,21 @@ func (s *Server) StartTask(ctx context.Context, req *pb.StartTaskRequest) (*pb.S } return } - workerResp, err := cli.StartSubTask(ctx, &pb.StartSubTaskRequest{Task: stCfgToml}) - workerResp = s.handleOperationResult(ctx, cli, stCfg.Name, err, workerResp) + workerResp, err := cli.StartSubTask(ctx, &pb.StartSubTaskRequest{Task: stCfg2Toml}) + workerResp = s.handleOperationResult(ctx, cli, stCfg2.Name, err, workerResp) workerResp.Meta.Worker = worker workerRespCh <- workerResp.Meta }, func(args ...interface{}) { defer wg.Done() - stCfg, ok := argsExtractor(args...) + stCfg2, ok := argsExtractor(args...) if !ok { return } workerRespCh <- &pb.CommonWorkerResponse{ Result: false, - Msg: fmt.Sprintf("fail to get emit opporunity for source %s", stCfg.SourceID), + Msg: fmt.Sprintf("fail to get emit opporunity for source %s", stCfg2.SourceID), } }, stCfg) } @@ -1092,7 +1092,8 @@ func (s *Server) getStatusFromWorkers(ctx context.Context, workers []string, tas go func(worker string) { defer wg.Done() cli := s.workerClients[worker] - cctx, _ := context.WithTimeout(ctx, s.cfg.RPCTimeout) + cctx, cancel := context.WithTimeout(ctx, s.cfg.RPCTimeout) + defer cancel() workerStatus, err := cli.QueryStatus(cctx, workerReq) if err != nil { workerStatus = &pb.QueryStatusResponse{ diff --git a/go.mod b/go.mod index 1cc6ef5e65..4f8fe02567 100644 --- a/go.mod +++ b/go.mod @@ -23,8 +23,8 @@ require ( github.com/spf13/cobra v0.0.4 github.com/syndtr/goleveldb v1.0.0 golang.org/x/crypto v0.0.0-20190103213133-ff983b9c42bc // indirect - golang.org/x/sys v0.0.0-20190116161447-11f53e031339 - golang.org/x/time v0.0.0-20181108054448-85acf8d2951c + golang.org/x/sys v0.0.0-20190422165155-953cdadca894 + golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 google.golang.org/grpc v1.17.0 gopkg.in/natefinch/lumberjack.v2 v2.0.0 gopkg.in/yaml.v2 v2.2.2 diff --git a/go.sum b/go.sum index 55b502982b..d219687ea7 100644 --- a/go.sum +++ b/go.sum @@ -323,6 +323,8 @@ golang.org/x/crypto v0.0.0-20180608092829-8ac0e0d97ce4/go.mod h1:6SG95UA2DQfeDnf golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9 h1:mKdxBk7AujPs8kU4m80U72y/zjbZ3UcXC7dClwKbUI0= golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/crypto v0.0.0-20190103213133-ff983b9c42bc h1:F5tKCVGp+MUAHhKp5MZtGqAlGX3+oCsiL1Q629FL90M= +golang.org/x/crypto v0.0.0-20190103213133-ff983b9c42bc/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -344,9 +346,8 @@ golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190109145017-48ac38b7c8cb/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc= golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20190523142557-0e01d883c5c5 h1:sM3evRHxE/1RuMe1FYAL3j7C7fUfIjkbE+NiDAYUF8U= -golang.org/x/sys v0.0.0-20190523142557-0e01d883c5c5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=