diff --git a/cmd/dm-worker/main.go b/cmd/dm-worker/main.go index 07490056d8..1322c19b5f 100644 --- a/cmd/dm-worker/main.go +++ b/cmd/dm-worker/main.go @@ -21,15 +21,15 @@ import ( "strings" "syscall" + "github.com/pingcap/errors" + globalLog "github.com/pingcap/log" + "go.uber.org/zap" + "github.com/pingcap/dm/dm/ctl/common" "github.com/pingcap/dm/dm/worker" "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/terror" "github.com/pingcap/dm/pkg/utils" - globalLog "github.com/pingcap/log" - - "github.com/pingcap/errors" - "go.uber.org/zap" ) func main() { diff --git a/dm/config/source_config.go b/dm/config/source_config.go index 22cf5db1f2..93559a2bf3 100644 --- a/dm/config/source_config.go +++ b/dm/config/source_config.go @@ -8,7 +8,9 @@ import ( "io/ioutil" "math" "math/rand" + "path/filepath" "strings" + "time" "github.com/BurntSushi/toml" "github.com/siddontang/go-mysql/mysql" @@ -255,6 +257,9 @@ func (c *SourceConfig) Adjust(ctx context.Context, db *sql.DB) (err error) { if len(c.RelayDir) == 0 { c.RelayDir = defaultRelayDir } + if filepath.IsAbs(c.RelayDir) { + log.L().Warn("using an absolute relay path, relay log can't work when starting multiple relay worker") + } return nil } @@ -291,6 +296,7 @@ func (c *SourceConfig) AdjustServerID(ctx context.Context, db *sql.DB) error { return terror.WithScope(err, terror.ScopeUpstream) } + rand.Seed(time.Now().UnixNano()) for i := 0; i < 5; i++ { randomValue := uint32(rand.Intn(100000)) randomServerID := defaultBaseServerID + randomValue diff --git a/dm/master/scheduler/scheduler.go b/dm/master/scheduler/scheduler.go index dcef1bb7b7..a7c84f5be4 100644 --- a/dm/master/scheduler/scheduler.go +++ b/dm/master/scheduler/scheduler.go @@ -932,6 +932,32 @@ func (s *Scheduler) StopRelay(source string, workers []string) error { return nil } +// GetRelayWorkers returns all alive worker instances for a relay source +func (s *Scheduler) GetRelayWorkers(source string) ([]*Worker, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + if !s.started { + return nil, terror.ErrSchedulerNotStarted.Generate() + } + + workers := s.relayWorkers[source] + var ret []*Worker + for w := range workers { + worker, ok := s.workers[w] + if !ok { + // should not happen + s.logger.Error("worker instance for relay worker not found", zap.String("worker", w)) + continue + } + ret = append(ret, worker) + } + sort.Slice(ret, func(i, j int) bool { + return ret[i].baseInfo.Name < ret[j].baseInfo.Name + }) + return ret, nil +} + // UpdateExpectRelayStage updates the current expect relay stage. // now, only support updates: // - from `Running` to `Paused`. 
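The `GetRelayWorkers` helper added above is what the master later uses to fan requests out to every relay worker of a source: it filters the recorded relay-worker names against the live worker map and returns the instances sorted by name, so callers and tests see a deterministic order. A minimal standalone sketch of that filter-then-sort shape (the `worker` type and map layouts below are simplified stand-ins, not the scheduler's real types):

```go
package main

import (
	"fmt"
	"sort"
)

type worker struct{ name string }

// relayWorkersSorted mirrors GetRelayWorkers: keep only names that still map to a
// live worker instance, then sort by name so the result has a stable order.
func relayWorkersSorted(relaySet map[string]struct{}, all map[string]*worker) []*worker {
	var ret []*worker
	for name := range relaySet {
		w, ok := all[name]
		if !ok {
			// recorded as a relay worker but missing from the in-memory map;
			// the scheduler logs this case and skips the entry
			continue
		}
		ret = append(ret, w)
	}
	sort.Slice(ret, func(i, j int) bool { return ret[i].name < ret[j].name })
	return ret
}

func main() {
	all := map[string]*worker{"worker1": {"worker1"}, "worker2": {"worker2"}, "worker3": {"worker3"}}
	relay := map[string]struct{}{"worker3": {}, "worker1": {}}
	for _, w := range relayWorkersSorted(relay, all) {
		fmt.Println(w.name) // worker1, then worker3
	}
}
```

Sorting by name rather than relying on map iteration order is what lets the scheduler test below assert `DeepEquals, []*Worker{worker1, worker3}` without flakiness.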
@@ -1475,10 +1501,17 @@ func (s *Scheduler) tryBoundForWorker(w *Worker) (bounded bool, err error) { } // try to find its relay source (currently only one relay source) - if source == "" { + if source != "" { + s.logger.Info("found history source when worker bound", + zap.String("worker", w.BaseInfo().Name), + zap.String("source", source)) + } else { for source2, workers := range s.relayWorkers { if _, ok2 := workers[w.BaseInfo().Name]; ok2 { source = source2 + s.logger.Info("found relay source when worker bound", + zap.String("worker", w.BaseInfo().Name), + zap.String("source", source)) break } } @@ -1498,6 +1531,9 @@ func (s *Scheduler) tryBoundForWorker(w *Worker) (bounded bool, err error) { // randomly pick one from unbounds if source == "" { for source = range s.unbounds { + s.logger.Info("found unbound source when worker bound", + zap.String("worker", w.BaseInfo().Name), + zap.String("source", source)) break // got a source. } } @@ -1529,40 +1565,72 @@ func (s *Scheduler) tryBoundForWorker(w *Worker) (bounded bool, err error) { // returns (true, nil) after bounded. // called should update the s.unbounds func (s *Scheduler) tryBoundForSource(source string) (bool, error) { - // 1. try to find history workers... var worker *Worker - for workerName, bound := range s.lastBound { - if bound.Source == source { - w, ok := s.workers[workerName] - if !ok { - // a not found worker - continue - } - if w.Stage() == WorkerFree { - worker = w - break + relayWorkers := s.relayWorkers[source] + // 1. try to find a history worker in relay workers... + if len(relayWorkers) > 0 { + for workerName, bound := range s.lastBound { + if bound.Source == source { + w, ok := s.workers[workerName] + if !ok { + // a not found worker + continue + } + if _, ok2 := relayWorkers[workerName]; ok2 && w.Stage() == WorkerFree { + worker = w + s.logger.Info("found history relay worker when source bound", + zap.String("worker", workerName), + zap.String("source", source)) + break + } } } } // then a relay worker for this source... if worker == nil { - for workerName := range s.relayWorkers[source] { + for workerName := range relayWorkers { w, ok := s.workers[workerName] if !ok { // a not found worker, should not happened + s.logger.Warn("worker instance not found for relay worker", zap.String("worker", workerName)) continue } if w.Stage() == WorkerFree { worker = w + s.logger.Info("found relay worker when source bound", + zap.String("worker", workerName), + zap.String("source", source)) break } } } + // then a history worker for this source... + if worker == nil { + for workerName, bound := range s.lastBound { + if bound.Source == source { + w, ok := s.workers[workerName] + if !ok { + // a not found worker + continue + } + if w.Stage() == WorkerFree { + worker = w + s.logger.Info("found history worker when source bound", + zap.String("worker", workerName), + zap.String("source", source)) + break + } + } + } + } // and then a random Free worker. 
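`tryBoundForSource` above now tries candidates in a fixed priority order: a history worker that is also a relay worker of the source, then any free relay worker, then any free history worker, and finally (next hunk) a random free worker. The same logic reads naturally as a scan over ordered candidate groups; the types below are simplified stand-ins rather than the scheduler's own:

```go
package main

import "fmt"

type candidate struct {
	name string
	free bool
}

// firstFree scans the candidate groups in the order given and returns the first
// free one, mirroring the bound-selection priority described above.
func firstFree(groups ...[]candidate) (candidate, bool) {
	for _, g := range groups {
		for _, c := range g {
			if c.free {
				return c, true
			}
		}
	}
	return candidate{}, false
}

func main() {
	historyRelay := []candidate{{"worker1", false}} // bound here before and a relay worker, but not Free
	anyRelay := []candidate{{"worker2", true}}
	history := []candidate{{"worker3", true}}
	if c, ok := firstFree(historyRelay, anyRelay, history); ok {
		fmt.Println("bound to", c.name) // worker2
	}
}
```

Putting relay workers ahead of plain history workers matches the intent of the change: prefer a node that already pulls relay log for the source.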
if worker == nil { for _, w := range s.workers { if w.Stage() == WorkerFree { worker = w + s.logger.Info("found free worker when source bound", + zap.String("worker", w.BaseInfo().Name), + zap.String("source", source)) break } } diff --git a/dm/master/scheduler/scheduler_test.go b/dm/master/scheduler/scheduler_test.go index 7e492629ae..268d820b0b 100644 --- a/dm/master/scheduler/scheduler_test.go +++ b/dm/master/scheduler/scheduler_test.go @@ -1164,6 +1164,12 @@ func (t *testScheduler) TestStartStopSource(c *C) { // test not exist source c.Assert(terror.ErrSchedulerSourceCfgNotExist.Equal(s.StartRelay(sourceID3, []string{workerName1})), IsTrue) c.Assert(terror.ErrSchedulerSourceCfgNotExist.Equal(s.StopRelay(sourceID4, []string{workerName1})), IsTrue) + noWorkerSources := []string{sourceID1, sourceID2, sourceID3, sourceID4} + for _, source := range noWorkerSources { + workers, err := s.GetRelayWorkers(source) + c.Assert(err, IsNil) + c.Assert(workers, HasLen, 0) + } // start-relay success on bound-same-source and free worker c.Assert(s.StartRelay(sourceID1, []string{workerName1}), IsNil) @@ -1175,6 +1181,9 @@ func (t *testScheduler) TestStartStopSource(c *C) { c.Assert(s.relayWorkers[sourceID1], HasLen, 2) c.Assert(s.relayWorkers[sourceID1], HasKey, workerName1) c.Assert(s.relayWorkers[sourceID1], HasKey, workerName3) + workers, err := s.GetRelayWorkers(sourceID1) + c.Assert(err, IsNil) + c.Assert(workers, DeepEquals, []*Worker{worker1, worker3}) // failed on bound-not-same-source worker and not exist worker c.Assert(terror.ErrSchedulerRelayWorkersWrongBound.Equal(s.StartRelay(sourceID1, []string{workerName2})), IsTrue) @@ -1189,11 +1198,22 @@ func (t *testScheduler) TestStartStopSource(c *C) { c.Assert(s.expectRelayStages, HasKey, sourceID2) c.Assert(s.relayWorkers[sourceID2], HasLen, 1) c.Assert(s.relayWorkers[sourceID2], HasKey, workerName2) + workers, err = s.GetRelayWorkers(sourceID2) + c.Assert(err, IsNil) + c.Assert(workers, DeepEquals, []*Worker{worker2}) // failed on not-same-source worker and not exist worker c.Assert(terror.ErrSchedulerRelayWorkersWrongRelay.Equal(s.StopRelay(sourceID1, []string{workerName2})), IsTrue) c.Assert(terror.ErrSchedulerWorkerNotExist.Equal(s.StopRelay(sourceID1, []string{"not-exist"})), IsTrue) + // nothing changed + workers, err = s.GetRelayWorkers(sourceID1) + c.Assert(err, IsNil) + c.Assert(workers, DeepEquals, []*Worker{worker1, worker3}) + workers, err = s.GetRelayWorkers(sourceID2) + c.Assert(err, IsNil) + c.Assert(workers, DeepEquals, []*Worker{worker2}) + // stop-relay success c.Assert(s.StopRelay(sourceID1, []string{workerName1}), IsNil) c.Assert(s.StopRelay(sourceID1, []string{workerName1}), IsNil) @@ -1202,4 +1222,7 @@ func (t *testScheduler) TestStartStopSource(c *C) { c.Assert(s.expectRelayStages, HasKey, sourceID2) c.Assert(s.relayWorkers, HasLen, 1) c.Assert(s.relayWorkers, HasKey, sourceID2) + workers, err = s.GetRelayWorkers(sourceID1) + c.Assert(err, IsNil) + c.Assert(workers, HasLen, 0) } diff --git a/dm/master/server.go b/dm/master/server.go index 6707d7b6ac..27c111d929 100644 --- a/dm/master/server.go +++ b/dm/master/server.go @@ -633,18 +633,16 @@ func extractSources(s *Server, req hasWokers) ([]string, error) { var sources []string if len(req.GetSources()) > 0 { - // query specified dm-workers - invalidWorkers := make([]string, 0, len(req.GetSources())) - for _, source := range req.GetSources() { - w := s.scheduler.GetWorkerBySource(source) - if w == nil || w.Stage() == scheduler.WorkerOffline { - invalidWorkers = 
append(invalidWorkers, source) + sources = req.GetSources() + var invalidSource []string + for _, source := range sources { + if s.scheduler.GetSourceCfgByID(source) == nil { + invalidSource = append(invalidSource, source) } } - if len(invalidWorkers) > 0 { - return nil, errors.Errorf("%s relevant worker-client not found", strings.Join(invalidWorkers, ", ")) + if len(invalidSource) > 0 { + return nil, errors.Errorf("sources %s haven't been added", invalidSource) } - sources = req.GetSources() } else if len(req.GetName()) > 0 { // query specified task's sources sources = s.getTaskResources(req.GetName()) @@ -677,18 +675,24 @@ func (s *Server) QueryStatus(ctx context.Context, req *pb.QueryStatusListRequest Msg: err.Error(), }, nil } - workerRespCh := s.getStatusFromWorkers(ctx, sources, req.Name) - workerRespMap := make(map[string]*pb.QueryStatusResponse, len(sources)) - for len(workerRespCh) > 0 { - workerResp := <-workerRespCh - workerRespMap[workerResp.SourceStatus.Source] = workerResp + queryRelayWorker := false + if len(req.GetSources()) > 0 { + // if user specified sources, query relay workers instead of task workers + queryRelayWorker = true + } + + resps := s.getStatusFromWorkers(ctx, sources, req.Name, queryRelayWorker) + + workerRespMap := make(map[string][]*pb.QueryStatusResponse, len(sources)) + for _, workerResp := range resps { + workerRespMap[workerResp.SourceStatus.Source] = append(workerRespMap[workerResp.SourceStatus.Source], workerResp) } sort.Strings(sources) workerResps := make([]*pb.QueryStatusResponse, 0, len(sources)) for _, worker := range sources { - workerResps = append(workerResps, workerRespMap[worker]) + workerResps = append(workerResps, workerRespMap[worker]...) } resp := &pb.QueryStatusListResponse{ Result: true, @@ -788,45 +792,62 @@ func (s *Server) PurgeWorkerRelay(ctx context.Context, req *pb.PurgeWorkerRelayR }, } - workerRespCh := make(chan *pb.CommonWorkerResponse, len(req.Sources)) + var ( + workerResps = make([]*pb.CommonWorkerResponse, 0, len(req.Sources)) + workerRespMu sync.Mutex + ) + setWorkerResp := func(resp *pb.CommonWorkerResponse) { + workerRespMu.Lock() + workerResps = append(workerResps, resp) + workerRespMu.Unlock() + } + var wg sync.WaitGroup for _, source := range req.Sources { - wg.Add(1) - go func(source string) { - defer wg.Done() - worker := s.scheduler.GetWorkerBySource(source) + workers, err := s.scheduler.GetRelayWorkers(source) + if err != nil { + return nil, err + } + if len(workers) == 0 { + setWorkerResp(errorCommonWorkerResponse(fmt.Sprintf("relay worker for source %s not found, please `start-relay` first", source), source, "")) + continue + } + for _, worker := range workers { if worker == nil { - workerRespCh <- errorCommonWorkerResponse(fmt.Sprintf("source %s relevant worker-client not found", source), source, "") - return - } - resp, err := worker.SendRequest(ctx, workerReq, s.cfg.RPCTimeout) - var workerResp *pb.CommonWorkerResponse - if err != nil { - workerResp = errorCommonWorkerResponse(err.Error(), source, worker.BaseInfo().Name) - } else { - workerResp = resp.PurgeRelay + setWorkerResp(errorCommonWorkerResponse(fmt.Sprintf("relay worker instance for source %s not found, please `start-relay` first", source), source, "")) + continue } - workerResp.Source = source - workerRespCh <- workerResp - }(source) + wg.Add(1) + go func(worker *scheduler.Worker, source string) { + defer wg.Done() + resp, err3 := worker.SendRequest(ctx, workerReq, s.cfg.RPCTimeout) + var workerResp *pb.CommonWorkerResponse + if err3 != nil { + 
workerResp = errorCommonWorkerResponse(err3.Error(), source, worker.BaseInfo().Name) + } else { + workerResp = resp.PurgeRelay + } + workerResp.Source = source + setWorkerResp(workerResp) + }(worker, source) + } } wg.Wait() - workerRespMap := make(map[string]*pb.CommonWorkerResponse, len(req.Sources)) - for len(workerRespCh) > 0 { - workerResp := <-workerRespCh - workerRespMap[workerResp.Source] = workerResp + workerRespMap := make(map[string][]*pb.CommonWorkerResponse, len(req.Sources)) + for _, workerResp := range workerResps { + workerRespMap[workerResp.Source] = append(workerRespMap[workerResp.Source], workerResp) } sort.Strings(req.Sources) - workerResps := make([]*pb.CommonWorkerResponse, 0, len(req.Sources)) + returnResps := make([]*pb.CommonWorkerResponse, 0, len(req.Sources)) for _, worker := range req.Sources { - workerResps = append(workerResps, workerRespMap[worker]) + returnResps = append(returnResps, workerRespMap[worker]...) } return &pb.PurgeWorkerRelayResponse{ Result: true, - Sources: workerResps, + Sources: returnResps, }, nil } @@ -879,14 +900,23 @@ func (s *Server) getTaskResources(task string) []string { } // getStatusFromWorkers does RPC request to get status from dm-workers -func (s *Server) getStatusFromWorkers(ctx context.Context, sources []string, taskName string) chan *pb.QueryStatusResponse { +func (s *Server) getStatusFromWorkers(ctx context.Context, sources []string, taskName string, relayWorker bool) []*pb.QueryStatusResponse { workerReq := &workerrpc.Request{ Type: workerrpc.CmdQueryStatus, QueryStatus: &pb.QueryStatusRequest{Name: taskName}, } - workerRespCh := make(chan *pb.QueryStatusResponse, len(sources)) - handleErr := func(err error, source string) bool { + var ( + workerResps = make([]*pb.QueryStatusResponse, 0, len(sources)) + workerRespMu sync.Mutex + ) + setWorkerResp := func(resp *pb.QueryStatusResponse) { + workerRespMu.Lock() + workerResps = append(workerResps, resp) + workerRespMu.Unlock() + } + + handleErr := func(err error, source string, worker string) { log.L().Error("response error", zap.Error(err)) resp := &pb.QueryStatusResponse{ Result: false, @@ -895,43 +925,79 @@ func (s *Server) getStatusFromWorkers(ctx context.Context, sources []string, tas Source: source, }, } - workerRespCh <- resp - return false + if worker != "" { + resp.SourceStatus.Worker = worker + } + setWorkerResp(resp) } var wg sync.WaitGroup for _, source := range sources { - wg.Add(1) - go s.ap.Emit(ctx, 0, func(args ...interface{}) { - defer wg.Done() - sourceID, _ := args[0].(string) - worker := s.scheduler.GetWorkerBySource(sourceID) - if worker == nil { - err := terror.ErrMasterWorkerArgsExtractor.Generatef("%s relevant worker-client not found", sourceID) - handleErr(err, sourceID) - return + var ( + workers []*scheduler.Worker + workerNameSet = make(map[string]struct{}) + err2 error + ) + if relayWorker { + workers, err2 = s.scheduler.GetRelayWorkers(source) + if err2 != nil { + handleErr(err2, source, "") + continue } - resp, err := worker.SendRequest(ctx, workerReq, s.cfg.RPCTimeout) - var workerStatus *pb.QueryStatusResponse - if err != nil { - workerStatus = &pb.QueryStatusResponse{ - Result: false, - Msg: err.Error(), - SourceStatus: &pb.SourceStatus{}, - } - } else { - workerStatus = resp.QueryStatus + // returned workers is not duplicated + for _, w := range workers { + workerNameSet[w.BaseInfo().Name] = struct{}{} } - workerStatus.SourceStatus.Source = sourceID - workerRespCh <- workerStatus - }, func(args ...interface{}) { - defer wg.Done() - sourceID, _ 
:= args[0].(string) - handleErr(terror.ErrMasterNoEmitToken.Generate(sourceID), sourceID) - }, source) + } + + // subtask workers may have been found in relay workers + taskWorker := s.scheduler.GetWorkerBySource(source) + if taskWorker != nil { + if _, ok := workerNameSet[taskWorker.BaseInfo().Name]; !ok { + workers = append(workers, taskWorker) + } + } + + if len(workers) == 0 { + err := terror.ErrMasterWorkerArgsExtractor.Generatef("%s relevant worker-client not found", source) + handleErr(err, source, "") + continue + } + + for _, worker := range workers { + wg.Add(1) + go s.ap.Emit(ctx, 0, func(args ...interface{}) { + defer wg.Done() + sourceID := args[0].(string) + w, _ := args[1].(*scheduler.Worker) + + resp, err := w.SendRequest(ctx, workerReq, s.cfg.RPCTimeout) + var workerStatus *pb.QueryStatusResponse + if err != nil { + workerStatus = &pb.QueryStatusResponse{ + Result: false, + Msg: err.Error(), + SourceStatus: &pb.SourceStatus{}, + } + } else { + workerStatus = resp.QueryStatus + } + workerStatus.SourceStatus.Source = sourceID + setWorkerResp(workerStatus) + }, func(args ...interface{}) { + defer wg.Done() + sourceID, _ := args[0].(string) + w, _ := args[1].(*scheduler.Worker) + workerName := "" + if w != nil { + workerName = w.BaseInfo().Name + } + handleErr(terror.ErrMasterNoEmitToken.Generate(sourceID), sourceID, workerName) + }, source, worker) + } } wg.Wait() - return workerRespCh + return workerResps } // TODO: refine the call stack of this API, query worker configs that we needed only diff --git a/dm/master/server_test.go b/dm/master/server_test.go index 9cb0e1f87f..6eb4f84325 100644 --- a/dm/master/server_test.go +++ b/dm/master/server_test.go @@ -283,6 +283,40 @@ func testMockScheduler(ctx context.Context, wg *sync.WaitGroup, c *check.C, sour return scheduler2, cancels } +func testMockSchedulerForRelay(ctx context.Context, wg *sync.WaitGroup, c *check.C, sources, workers []string, password string, workerClients map[string]workerrpc.Client) (*scheduler.Scheduler, []context.CancelFunc) { + logger := log.L() + scheduler2 := scheduler.NewScheduler(&logger, config.Security{}) + err := scheduler2.Start(ctx, etcdTestCli) + c.Assert(err, check.IsNil) + cancels := make([]context.CancelFunc, 0, 2) + for i := range workers { + // add worker to scheduler's workers map + name := workers[i] + c.Assert(scheduler2.AddWorker(name, workers[i]), check.IsNil) + scheduler2.SetWorkerClientForTest(name, workerClients[workers[i]]) + // operate mysql config on this worker + cfg := config.NewSourceConfig() + cfg.SourceID = sources[i] + cfg.From.Password = password + c.Assert(scheduler2.AddSourceCfg(*cfg), check.IsNil, check.Commentf("all sources: %v", sources)) + wg.Add(1) + ctx1, cancel1 := context.WithCancel(ctx) + cancels = append(cancels, cancel1) + go func(ctx context.Context, workerName string) { + defer wg.Done() + // TODO: why this will get context cancel? 
+ c.Assert(ha.KeepAlive(ctx, etcdTestCli, workerName, keepAliveTTL), check.IsNil) + }(ctx1, name) + c.Assert(scheduler2.StartRelay(sources[i], []string{workers[i]}), check.IsNil) + c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool { + relayWorkers, err2 := scheduler2.GetRelayWorkers(sources[i]) + c.Assert(err2, check.IsNil) + return len(relayWorkers) == 1 && relayWorkers[0].BaseInfo().Name == name + }), check.IsTrue) + } + return scheduler2, cancels +} + func (t *testMaster) TestQueryStatus(c *check.C) { ctrl := gomock.NewController(c) defer ctrl.Finish() @@ -323,7 +357,7 @@ func (t *testMaster) TestQueryStatus(c *check.C) { t.workerClients[worker] = newMockRPCClient(mockWorkerClient) } ctx, cancel = context.WithCancel(context.Background()) - server.scheduler, _ = testMockScheduler(ctx, &wg, c, sources, workers, "", t.workerClients) + server.scheduler, _ = testMockSchedulerForRelay(ctx, &wg, c, sources, workers, "", t.workerClients) resp, err = server.QueryStatus(context.Background(), &pb.QueryStatusListRequest{ Sources: sources, }) @@ -336,7 +370,7 @@ func (t *testMaster) TestQueryStatus(c *check.C) { }) c.Assert(err, check.IsNil) c.Assert(resp.Result, check.IsFalse) - c.Assert(resp.Msg, check.Matches, ".*relevant worker-client not found") + c.Assert(resp.Msg, check.Matches, "sources .* haven't been added") // query with invalid task name resp, err = server.QueryStatus(context.Background(), &pb.QueryStatusListRequest{ @@ -837,6 +871,10 @@ func (t *testMaster) TestPurgeWorkerRelay(c *check.C) { } } + var wg sync.WaitGroup + ctx, cancel := context.WithCancel(context.Background()) + server.scheduler, _ = testMockSchedulerForRelay(ctx, &wg, c, nil, nil, "", t.workerClients) + // test PurgeWorkerRelay with invalid dm-worker[s] resp, err := server.PurgeWorkerRelay(context.Background(), &pb.PurgeWorkerRelayRequest{ Sources: []string{"invalid-source1", "invalid-source2"}, @@ -848,14 +886,14 @@ func (t *testMaster) TestPurgeWorkerRelay(c *check.C) { c.Assert(resp.Sources, check.HasLen, 2) for _, w := range resp.Sources { c.Assert(w.Result, check.IsFalse) - c.Assert(w.Msg, check.Matches, ".*relevant worker-client not found") + c.Assert(w.Msg, check.Matches, "relay worker for source .* not found.*") } + clearSchedulerEnv(c, cancel, &wg) - var wg sync.WaitGroup - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel = context.WithCancel(context.Background()) // test PurgeWorkerRelay successfully mockPurgeRelay(true) - server.scheduler, _ = testMockScheduler(ctx, &wg, c, sources, workers, "", t.workerClients) + server.scheduler, _ = testMockSchedulerForRelay(ctx, &wg, c, sources, workers, "", t.workerClients) resp, err = server.PurgeWorkerRelay(context.Background(), &pb.PurgeWorkerRelayRequest{ Sources: sources, Time: now, @@ -872,7 +910,7 @@ func (t *testMaster) TestPurgeWorkerRelay(c *check.C) { ctx, cancel = context.WithCancel(context.Background()) // test PurgeWorkerRelay with error response mockPurgeRelay(false) - server.scheduler, _ = testMockScheduler(ctx, &wg, c, sources, workers, "", t.workerClients) + server.scheduler, _ = testMockSchedulerForRelay(ctx, &wg, c, sources, workers, "", t.workerClients) resp, err = server.PurgeWorkerRelay(context.Background(), &pb.PurgeWorkerRelayRequest{ Sources: sources, Time: now, diff --git a/dm/worker/config.go b/dm/worker/config.go index 4d65ef7a12..b6e2fd2730 100644 --- a/dm/worker/config.go +++ b/dm/worker/config.go @@ -45,6 +45,10 @@ func init() { i := val.(int) defaultKeepAliveTTL = int64(i) }) + 
failpoint.Inject("defaultRelayKeepAliveTTL", func(val failpoint.Value) { + i := val.(int) + defaultRelayKeepAliveTTL = int64(i) + }) } // NewConfig creates a new base config for worker. diff --git a/dm/worker/server.go b/dm/worker/server.go index 71d8221b9b..4505b99e21 100644 --- a/dm/worker/server.go +++ b/dm/worker/server.go @@ -187,20 +187,50 @@ func (s *Server) Start() error { // NOTE: don't need to set tls config, because rootLis already use tls s.svr = grpc.NewServer() pb.RegisterWorkerServer(s.svr, s) + + grpcExitCh := make(chan struct{}, 1) s.wg.Add(1) go func() { - defer s.wg.Done() err2 := s.svr.Serve(grpcL) if err2 != nil && !common.IsErrNetClosing(err2) && err2 != cmux.ErrListenerClosed { log.L().Error("gRPC server returned", log.ShortError(err2)) } + grpcExitCh <- struct{}{} }() + go func(ctx context.Context) { + defer s.wg.Done() + select { + case <-ctx.Done(): + if s.svr != nil { + // GracefulStop can not cancel active stream RPCs + // and the stream RPC may block on Recv or Send + // so we use Stop instead to cancel all active RPCs + s.svr.Stop() + } + case <-grpcExitCh: + } + }(s.ctx) + + httpExitCh := make(chan struct{}, 1) s.wg.Add(1) go func() { - defer s.wg.Done() InitStatus(httpL) // serve status + httpExitCh <- struct{}{} }() + go func(ctx context.Context) { + defer s.wg.Done() + select { + case <-ctx.Done(): + if s.rootLis != nil { + err2 := s.rootLis.Close() + if err2 != nil && !common.IsErrNetClosing(err2) { + log.L().Error("fail to close net listener", log.ShortError(err2)) + } + } + case <-httpExitCh: + } + }(s.ctx) s.closed.Set(false) log.L().Info("listening gRPC API and status request", zap.String("address", s.cfg.WorkerAddr)) @@ -443,27 +473,16 @@ func (s *Server) observeSourceBound(ctx context.Context, rev int64) error { } func (s *Server) doClose() { + s.cancel() + // close server in advance, stop receiving source bound and relay bound + s.wg.Wait() + s.Lock() defer s.Unlock() if s.closed.Get() { return } - - if s.rootLis != nil { - err := s.rootLis.Close() - if err != nil && !common.IsErrNetClosing(err) { - log.L().Error("fail to close net listener", log.ShortError(err)) - } - } - if s.svr != nil { - // GracefulStop can not cancel active stream RPCs - // and the stream RPC may block on Recv or Send - // so we use Stop instead to cancel all active RPCs - s.svr.Stop() - } - // close worker and wait for return - s.cancel() if w := s.getWorker(false); w != nil { w.Close() } @@ -472,9 +491,8 @@ func (s *Server) doClose() { // Close close the RPC server, this function can be called multiple times func (s *Server) Close() { - s.doClose() s.stopKeepAlive() - s.wg.Wait() + s.doClose() } // if needLock is false, we should make sure Server has been locked in caller @@ -503,6 +521,7 @@ func (s *Server) getSourceStatus(needLock bool) pb.SourceStatus { return s.sourceStatus } +// TODO: move some call to setWorker/getOrStartWorker func (s *Server) setSourceStatus(source string, err error, needLock bool) { if needLock { s.Lock() @@ -578,7 +597,7 @@ OUTER: } } } - log.L().Info("worker server is closed, handleSourceBound will quit now") + log.L().Info("handleSourceBound will quit now") return nil } @@ -701,11 +720,11 @@ func (s *Server) enableRelay(sourceCfg *config.SourceConfig, needLock bool) erro // because no re-assigned mechanism exists for keepalived DM-worker yet. 
return err2 } - s.UpdateKeepAliveTTL(s.cfg.RelayKeepAliveTTL) if err2 = w.EnableRelay(); err2 != nil { s.setSourceStatus(sourceCfg.SourceID, err2, false) return err2 } + s.UpdateKeepAliveTTL(s.cfg.RelayKeepAliveTTL) return nil } @@ -810,6 +829,7 @@ func (s *Server) getOrStartWorker(cfg *config.SourceConfig, needLock bool) (*Wor return nil, terror.ErrWorkerAlreadyStart.Generate(w.name, w.cfg.SourceID, cfg.SourceID) } + log.L().Info("will start a new worker", zap.String("sourceID", cfg.SourceID)) w, err := NewWorker(cfg, s.etcdClient, s.cfg.Name) if err != nil { return nil, err diff --git a/dm/worker/worker.go b/dm/worker/worker.go index d080e74519..3d2583a0d6 100644 --- a/dm/worker/worker.go +++ b/dm/worker/worker.go @@ -32,6 +32,7 @@ import ( "github.com/pingcap/dm/pkg/etcdutil" "github.com/pingcap/dm/pkg/ha" "github.com/pingcap/dm/pkg/log" + "github.com/pingcap/dm/pkg/streamer" "github.com/pingcap/dm/pkg/terror" "github.com/pingcap/dm/pkg/utils" "github.com/pingcap/dm/relay/purger" @@ -182,6 +183,7 @@ func (w *Worker) Close() { // EnableRelay enables the functionality of start/watch/handle relay func (w *Worker) EnableRelay() (err error) { + w.l.Info("enter EnableRelay") w.Lock() defer w.Unlock() if w.relayEnabled.Get() { @@ -239,7 +241,7 @@ func (w *Worker) EnableRelay() (err error) { } startImmediately := !relayStage.IsDeleted && relayStage.Expect == pb.Stage_Running if startImmediately { - w.l.Info("relay is started") + w.l.Info("start relay for existing relay stage") w.relayHolder.Start() w.relayPurger.Start() } @@ -261,6 +263,7 @@ func (w *Worker) EnableRelay() (err error) { // DisableRelay disables the functionality of start/watch/handle relay func (w *Worker) DisableRelay() { + w.l.Info("enter DisableRelay") w.Lock() defer w.Unlock() if !w.relayEnabled.CompareAndSwap(true, false) { @@ -291,10 +294,12 @@ func (w *Worker) DisableRelay() { w.relayPurger = nil r.Close() } + w.l.Info("relay disabled") } // EnableHandleSubtasks enables the functionality of start/watch/handle subtasks func (w *Worker) EnableHandleSubtasks() error { + w.l.Info("enter EnableHandleSubtasks") w.Lock() defer w.Unlock() if w.subTaskEnabled.Get() { @@ -341,6 +346,7 @@ func (w *Worker) EnableHandleSubtasks() error { // DisableHandleSubtasks disables the functionality of start/watch/handle subtasks func (w *Worker) DisableHandleSubtasks() { + w.l.Info("enter DisableHandleSubtasks") if !w.subTaskEnabled.CompareAndSwap(true, false) { w.l.Warn("already disabled handling subtasks") return @@ -354,6 +360,7 @@ func (w *Worker) DisableHandleSubtasks() { // close all sub tasks w.subTaskHolder.closeAllSubTasks() + w.l.Info("handling subtask enabled") } // fetchSubTasksAndAdjust gets source's subtask stages and configs, adjust some values by worker's config and status @@ -765,6 +772,7 @@ func (w *Worker) operateRelay(ctx context.Context, op pb.RelayOp) error { } if w.relayEnabled.Get() { + // TODO: lock the worker? 
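A note on the `dm/worker/server.go` hunks above: `Start` no longer leaves listener teardown to `doClose`; each serving goroutine gets a companion goroutine that waits for either the server context to be canceled (then stops the gRPC server or closes the root listener) or the serve loop returning on its own, and `Close` now cancels first and only then waits. A reduced sketch of that pattern, with illustrative names (`serveWithWatcher`, `serve`, `stop` are not from the DM codebase):

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// serveWithWatcher runs the blocking serve loop in one goroutine and pairs it with
// a watcher that waits for either context cancellation (then calls stop) or the
// serve loop returning on its own, as the reworked Server.Start does for gRPC and HTTP.
func serveWithWatcher(ctx context.Context, wg *sync.WaitGroup, serve func() error, stop func()) {
	exitCh := make(chan struct{}, 1) // buffered so the serve goroutine never blocks on exit
	go func() {
		if err := serve(); err != nil {
			fmt.Println("serve returned:", err)
		}
		exitCh <- struct{}{}
	}()
	wg.Add(1)
	go func() {
		defer wg.Done()
		select {
		case <-ctx.Done():
			stop() // e.g. stopping the gRPC server or closing the root listener
		case <-exitCh:
		}
	}()
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup
	block := make(chan struct{})
	serveWithWatcher(ctx, &wg, func() error { <-block; return nil }, func() { close(block) })
	time.Sleep(50 * time.Millisecond)
	cancel()  // like doClose: cancel the server context first...
	wg.Wait() // ...then wait for the watcher goroutines before closing the worker
	fmt.Println("server closed")
}
```

The buffered exit channel keeps the serving goroutine from blocking when the watcher has already returned via `ctx.Done()`, which is the same reason `grpcExitCh` and `httpExitCh` above are created with capacity 1.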
return w.relayHolder.Operate(ctx, op) } @@ -778,12 +786,35 @@ func (w *Worker) PurgeRelay(ctx context.Context, req *pb.PurgeRelayRequest) erro return terror.ErrWorkerAlreadyClosed.Generate() } - if w.relayEnabled.Get() { - return w.relayPurger.Do(ctx, req) + if !w.relayEnabled.Get() { + w.l.Warn("enable-relay is false, ignore purge relay") + return nil } - w.l.Warn("enable-relay is false, ignore purge relay") - return nil + if !w.subTaskEnabled.Get() { + w.l.Info("worker received purge-relay but isn't handling subtasks, read global checkpoint to decide the active relay log") + + uuid := w.relayHolder.Status(ctx).RelaySubDir + + _, subTaskCfgs, _, err := w.fetchSubTasksAndAdjust() + if err != nil { + return err + } + for _, subTaskCfg := range subTaskCfgs { + loc, err2 := getMinLocForSubTaskFunc(ctx, subTaskCfg) + if err2 != nil { + return err2 + } + w.l.Info("update active relay log with", + zap.String("task name", subTaskCfg.Name), + zap.String("uuid", uuid), + zap.String("binlog name", loc.Position.Name)) + if err3 := streamer.GetReaderHub().UpdateActiveRelayLog(subTaskCfg.Name, uuid, loc.Position.Name); err3 != nil { + w.l.Error("error when updating active relay log", zap.Error(err3)) + } + } + } + return w.relayPurger.Do(ctx, req) } // ForbidPurge implements PurgeInterceptor.ForbidPurge diff --git a/pkg/ha/keepalive.go b/pkg/ha/keepalive.go index be4f2e7035..d34c1c6303 100644 --- a/pkg/ha/keepalive.go +++ b/pkg/ha/keepalive.go @@ -154,6 +154,7 @@ func KeepAlive(ctx context.Context, cli *clientv3.Client, workerName string, kee oldLeaseID := leaseID leaseID, err = grantAndPutKV(k, workerEventJSON, newTTL) if err != nil { + log.L().Error("meet error when renewing keepalive TTL via grantAndPutKV", zap.Error(err)) keepAliveCancel() // make go vet happy return err } diff --git a/pkg/ha/source.go b/pkg/ha/source.go index afa31d8f1d..e31122cee5 100644 --- a/pkg/ha/source.go +++ b/pkg/ha/source.go @@ -108,8 +108,9 @@ func ClearTestInfoOperation(cli *clientv3.Client) error { clearBound := clientv3.OpDelete(common.UpstreamBoundWorkerKeyAdapter.Path(), clientv3.WithPrefix()) clearLastBound := clientv3.OpDelete(common.UpstreamLastBoundWorkerKeyAdapter.Path(), clientv3.WithPrefix()) clearRelayStage := clientv3.OpDelete(common.StageRelayKeyAdapter.Path(), clientv3.WithPrefix()) + clearRelayConfig := clientv3.OpDelete(common.UpstreamRelayWorkerKeyAdapter.Path(), clientv3.WithPrefix()) clearSubTaskStage := clientv3.OpDelete(common.StageSubTaskKeyAdapter.Path(), clientv3.WithPrefix()) _, _, err := etcdutil.DoOpsInOneTxnWithRetry(cli, clearSource, clearTask, clearSubTask, clearWorkerInfo, clearBound, - clearLastBound, clearWorkerKeepAlive, clearRelayStage, clearSubTaskStage) + clearLastBound, clearWorkerKeepAlive, clearRelayStage, clearRelayConfig, clearSubTaskStage) return err } diff --git a/pkg/streamer/hub.go b/pkg/streamer/hub.go index 562b3b0ad8..994f270f40 100644 --- a/pkg/streamer/hub.go +++ b/pkg/streamer/hub.go @@ -18,19 +18,14 @@ import ( "strings" "sync" - "github.com/siddontang/go-mysql/mysql" - "github.com/pingcap/dm/pkg/binlog" - "github.com/pingcap/dm/pkg/gtid" "github.com/pingcap/dm/pkg/terror" "github.com/pingcap/dm/pkg/utils" ) var ( - readerHub *ReaderHub // singleton instance - relayMetaHub *RelayMetaHub - relayMetaOnce sync.Once - once sync.Once + readerHub *ReaderHub // singleton instance + once sync.Once ) // RelayLogInfo represents information for relay log @@ -142,48 +137,3 @@ func (h *ReaderHub) EarliestActiveRelayLog() *RelayLogInfo { _, rli := h.rlih.earliest() return rli } - -// RelayMetaHub
holds information for relay metas -type RelayMetaHub struct { - mu sync.RWMutex - meta Meta -} - -// GetRelayMetaHub gets singleton instance of RelayMetaHub -func GetRelayMetaHub() *RelayMetaHub { - relayMetaOnce.Do(func() { - relayMetaHub = &RelayMetaHub{} - }) - return relayMetaHub -} - -// GetMeta gets all metas -func (r *RelayMetaHub) GetMeta() Meta { - r.mu.Lock() - defer r.mu.Unlock() - return r.meta -} - -// SetMeta sets meta -func (r *RelayMetaHub) SetMeta(uuid string, pos mysql.Position, gset gtid.Set) { - gs := "" - if gset != nil { - gs = gset.String() - } - meta := Meta{ - BinLogPos: pos.Pos, - BinLogName: pos.Name, - BinlogGTID: gs, - UUID: uuid, - } - r.mu.Lock() - defer r.mu.Unlock() - r.meta = meta -} - -// ClearMeta clears meta -func (r *RelayMetaHub) ClearMeta() { - r.mu.Lock() - defer r.mu.Unlock() - r.meta = Meta{} -} diff --git a/pkg/utils/db.go b/pkg/utils/db.go index 600d694832..7ae0b63ef4 100644 --- a/pkg/utils/db.go +++ b/pkg/utils/db.go @@ -78,33 +78,6 @@ func GetAllServerID(ctx context.Context, db *sql.DB) (map[uint32]struct{}, error return serverIDs, nil } -// ReuseServerID reuse given server ID or get a new server ID -func ReuseServerID(ctx context.Context, serverID uint32, db *sql.DB) (uint32, error) { - serverIDs, err := GetAllServerID(ctx, db) - if err != nil { - return 0, err - } - - if _, ok := serverIDs[serverID]; !ok && serverID > 0 { - // reuse given server ID - return serverID, nil - } - - rand.Seed(time.Now().UnixNano()) - for i := 0; i < 99999; i++ { - randomValue := uint32(rand.Intn(100000)) - randomServerID := uint32(defaultBaseServerID) + randomValue - if _, ok := serverIDs[randomServerID]; ok { - continue - } - - return randomServerID, nil - } - - // should never happened unless the master has too many slave. - return 0, terror.ErrInvalidServerID.Generatef("can't find a random available server ID") -} - // GetRandomServerID gets a random server ID which is not used func GetRandomServerID(ctx context.Context, db *sql.DB) (uint32, error) { rand.Seed(time.Now().UnixNano()) diff --git a/relay/relay.go b/relay/relay.go index df6d54fdc5..cdd55c8345 100755 --- a/relay/relay.go +++ b/relay/relay.go @@ -120,8 +120,6 @@ type Relay struct { sync.RWMutex info *pkgstreamer.RelayLogInfo } - - relayMetaHub *pkgstreamer.RelayMetaHub } // NewRealRelay creates an instance of Relay. 
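The relay hunks that follow switch `setUpReader` and `adjustGTID` from `ReuseServerID` to `GetRandomServerID`, and `AdjustServerID` in `dm/config/source_config.go` (earlier in this diff) now seeds the RNG before drawing candidates, so multiple workers starting relay for the same upstream are unlikely to collide on a replication server ID. The underlying loop is simply "draw a bounded random offset and reject IDs already used upstream"; a sketch with an illustrative base constant and helper name (not the exact values DM uses):

```go
package main

import (
	"errors"
	"fmt"
	"math/rand"
	"time"
)

// illustrative base value, not necessarily the one DM uses
const defaultBaseServerID uint32 = 1000

// randomFreeServerID sketches the retry loop behind AdjustServerID/GetRandomServerID:
// seed once, draw a bounded random offset, and reject IDs already used upstream.
func randomFreeServerID(used map[uint32]struct{}) (uint32, error) {
	rand.Seed(time.Now().UnixNano())
	for i := 0; i < 5; i++ {
		id := defaultBaseServerID + uint32(rand.Intn(100000))
		if _, ok := used[id]; !ok {
			return id, nil
		}
	}
	return 0, errors.New("can't find a free random server ID")
}

func main() {
	used := map[uint32]struct{}{1001: {}, 1002: {}}
	id, err := randomFreeServerID(used)
	fmt.Println(id, err)
}
```

Without the explicit seed, every worker process would draw the same default sequence and therefore the same candidate IDs, which is exactly the collision the seeding change avoids.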
@@ -164,9 +162,6 @@ func (r *Relay) Init(ctx context.Context) (err error) { return err } - r.relayMetaHub = pkgstreamer.GetRelayMetaHub() - r.relayMetaHub.ClearMeta() - return reportRelayLogSpaceInBackground(ctx, r.cfg.RelayDir) } @@ -255,9 +250,21 @@ func (r *Relay) process(ctx context.Context) error { if err2 != nil { return err2 } - err2 = r.SaveMeta(mysql.Position{Name: neededBinlogName, Pos: binlog.MinPosition.Pos}, neededBinlogGset) - if err2 != nil { - return err2 + uuidWithSuffix := r.meta.UUID() // only change after switch + uuid, _, err3 := utils.ParseSuffixForUUID(uuidWithSuffix) + if err3 != nil { + r.logger.Error("parse suffix for UUID when relay meta outdated", zap.String("UUID", uuidWithSuffix), zap.Error(err3)) + return err3 + } + + pos := &mysql.Position{Name: neededBinlogName, Pos: binlog.MinPosition.Pos} + err = r.meta.AddDir(uuid, pos, neededBinlogGset, r.cfg.UUIDSuffix) + if err != nil { + return err + } + err = r.meta.Load() + if err != nil { + return err } } } @@ -728,7 +735,8 @@ func (r *Relay) setUpReader(ctx context.Context) (reader.Reader, error) { ctx2, cancel := context.WithTimeout(ctx, utils.DefaultDBTimeout) defer cancel() - randomServerID, err := utils.ReuseServerID(ctx2, r.cfg.ServerID, r.db) + // always use a new random serverID + randomServerID, err := utils.GetRandomServerID(ctx2, r.db) if err != nil { // should never happened unless the master has too many slave return nil, terror.Annotate(err, "fail to get random server id for relay reader") } @@ -786,17 +794,12 @@ func (r *Relay) IsClosed() bool { // SaveMeta save relay meta and update meta in RelayLogInfo func (r *Relay) SaveMeta(pos mysql.Position, gset gtid.Set) error { - if err := r.meta.Save(pos, gset); err != nil { - return err - } - r.relayMetaHub.SetMeta(r.meta.UUID(), pos, gset) - return nil + return r.meta.Save(pos, gset) } // ResetMeta reset relay meta func (r *Relay) ResetMeta() { r.meta = NewLocalMeta(r.cfg.Flavor, r.cfg.RelayDir) - r.relayMetaHub.ClearMeta() } // FlushMeta flush relay meta @@ -998,7 +1001,8 @@ func (r *Relay) setSyncConfig() error { func (r *Relay) adjustGTID(ctx context.Context, gset gtid.Set) (gtid.Set, error) { // setup a TCP binlog reader (because no relay can be used when upgrading). syncCfg := r.syncerCfg - randomServerID, err := utils.ReuseServerID(ctx, r.cfg.ServerID, r.db) + // always use a new random serverID + randomServerID, err := utils.GetRandomServerID(ctx, r.db) if err != nil { return nil, terror.Annotate(err, "fail to get random server id when relay adjust gtid") } diff --git a/syncer/streamer_controller.go b/syncer/streamer_controller.go index 09f10133c8..188c64ede9 100644 --- a/syncer/streamer_controller.go +++ b/syncer/streamer_controller.go @@ -252,6 +252,7 @@ func (c *StreamerController) GetEvent(tctx *tcontext.Context) (event *replicatio if err != nil { if err != context.Canceled && err != context.DeadlineExceeded { c.Lock() + tctx.L().Error("meet error when getting binlog event", zap.Error(err)) c.meetError = true c.Unlock() } diff --git a/tests/_utils/test_prepare b/tests/_utils/test_prepare index ebf00b6fb3..64612015a9 100644 --- a/tests/_utils/test_prepare +++ b/tests/_utils/test_prepare @@ -20,6 +20,19 @@ function cleanup_process() { wait_process_exit dm-syncer.test } +function wait_pattern_exit() { + pattern=$1 + while true + do + if ! pgrep -f $pattern >/dev/null 2>&1; then + echo "pattern $pattern already exited" + return 0 + fi + sleep 0.2 + echo "wait pattern $pattern exit..."
+ done +} + if [ "$RESET_MASTER" = true ]; then run_sql "RESET MASTER" $MYSQL_PORT1 $MYSQL_PASSWORD1 run_sql "RESET MASTER" $MYSQL_PORT2 $MYSQL_PASSWORD2 diff --git a/tests/all_mode/run.sh b/tests/all_mode/run.sh index 2e67cffc9d..4cb4a982c5 100755 --- a/tests/all_mode/run.sh +++ b/tests/all_mode/run.sh @@ -224,8 +224,8 @@ function run() { check_log_contain_with_retry '\\"tidb_txn_mode\\":\\"optimistic\\"' $WORK_DIR/worker2/log/dm-worker.log # restart dm-worker1 - pkill -hup dm-worker1.toml 2>/dev/null || true - wait_process_exit dm-worker1.toml + pkill -hup -f dm-worker1.toml 2>/dev/null || true + wait_pattern_exit dm-worker1.toml run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT # make sure worker1 have bound a source, and the source should same with bound before @@ -234,8 +234,8 @@ function run() { "worker1" 1 # restart dm-worker2 - pkill -hup dm-worker2.toml 2>/dev/null || true - wait_process_exit dm-worker2.toml + pkill -hup -f dm-worker2.toml 2>/dev/null || true + wait_pattern_exit dm-worker2.toml run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT diff --git a/tests/dmctl_basic/check_list/query_status.sh b/tests/dmctl_basic/check_list/query_status.sh index bfd2d1371c..a27f38ad00 100644 --- a/tests/dmctl_basic/check_list/query_status.sh +++ b/tests/dmctl_basic/check_list/query_status.sh @@ -9,7 +9,7 @@ function query_status_wrong_arg() { function query_status_wrong_params() { run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "query-status -s source-x task-y" \ - "source-x relevant worker-client not found" 1 + "sources \[source-x\] haven't been added" 1 } function query_status_with_no_tasks() { diff --git a/tests/ha_cases/run.sh b/tests/ha_cases/run.sh index 2d2f40eda4..aca1daa7d5 100755 --- a/tests/ha_cases/run.sh +++ b/tests/ha_cases/run.sh @@ -172,6 +172,10 @@ function test_kill_master() { function test_kill_and_isolate_worker() { + inject_points=("github.com/pingcap/dm/dm/worker/defaultKeepAliveTTL=return(1)" + "github.com/pingcap/dm/dm/worker/defaultRelayKeepAliveTTL=return(2)" + ) + export GO_FAILPOINTS="$(join_string \; ${inject_points[@]})" echo "[$(date)] <<<<<< start test_kill_and_isolate_worker >>>>>>" test_running @@ -265,6 +269,7 @@ function test_kill_and_isolate_worker() { echo "use sync_diff_inspector to check increment2 data now!" 
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml echo "[$(date)] <<<<<< finish test_kill_and_isolate_worker >>>>>>" + export GO_FAILPOINTS="" } # usage: test_kill_master_in_sync leader @@ -369,7 +374,6 @@ function test_standalone_running() { check_sync_diff $WORK_DIR $cur/conf/diff-standalone-config.toml cp $cur/conf/source2.yaml $WORK_DIR/source2.yaml - sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker2/relay_log" $WORK_DIR/source2.yaml dmctl_operate_source create $WORK_DIR/source2.yaml $SOURCE_ID2 run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "start-task $cur/conf/standalone-task2.yaml" \ @@ -396,7 +400,7 @@ function test_standalone_running() { echo "kill $worker_name" ps aux | grep dm-worker${worker_idx} |awk '{print $2}'|xargs kill || true check_port_offline ${worker_ports[$worker_idx]} 20 - rm -rf $WORK_DIR/worker${worker_idx}/relay_log + rm -rf $WORK_DIR/worker${worker_idx}/relay-dir # test running, test2 fail run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ diff --git a/tests/new_relay/conf/diff_config.toml b/tests/new_relay/conf/diff_config.toml new file mode 100644 index 0000000000..a10afcf774 --- /dev/null +++ b/tests/new_relay/conf/diff_config.toml @@ -0,0 +1,40 @@ +# diff Configuration. + +log-level = "info" + +chunk-size = 1000 + +check-thread-count = 4 + +sample-percent = 100 + +use-checksum = true + +fix-sql-file = "fix.sql" + +# tables need to check. +[[check-tables]] +schema = "new_relay" +tables = ["~t.*"] + +[[table-config]] +schema = "new_relay" +table = "t1" + +[[table-config.source-tables]] +instance-id = "source-1" +schema = "new_relay" +table = "t1" + +[[source-db]] +host = "127.0.0.1" +port = 3306 +user = "root" +password = "123456" +instance-id = "source-1" + +[target-db] +host = "127.0.0.1" +port = 4000 +user = "test" +password = "123456" diff --git a/tests/new_relay/conf/dm-master.toml b/tests/new_relay/conf/dm-master.toml new file mode 100644 index 0000000000..9a36bcbc84 --- /dev/null +++ b/tests/new_relay/conf/dm-master.toml @@ -0,0 +1,3 @@ +# Master Configuration. 
+master-addr = ":8261" +advertise-addr = "127.0.0.1:8261" diff --git a/tests/new_relay/conf/dm-task.yaml b/tests/new_relay/conf/dm-task.yaml new file mode 100644 index 0000000000..3abc7a87f8 --- /dev/null +++ b/tests/new_relay/conf/dm-task.yaml @@ -0,0 +1,44 @@ +--- +name: test +task-mode: all +is-sharding: false +meta-schema: "dm_meta" +# enable-heartbeat: true +heartbeat-update-interval: 1 +heartbeat-report-interval: 1 +timezone: "Asia/Shanghai" +clean-dump-file: false + +target-database: + host: "127.0.0.1" + port: 4000 + user: "root" + password: "" + +mysql-instances: + - source-id: "mysql-replica-01" + block-allow-list: "instance" + mydumper-config-name: "global" + loader-config-name: "global" + syncer-config-name: "global" + +block-allow-list: + instance: + do-dbs: ["new_relay"] + +mydumpers: + global: + threads: 4 + chunk-filesize: 64 + skip-tz-utc: true + extra-args: "" + +loaders: + global: + pool-size: 16 + dir: "./dumped_data" + +syncers: + global: + worker-count: 16 + batch: 100 diff --git a/tests/new_relay/conf/dm-worker1.toml b/tests/new_relay/conf/dm-worker1.toml new file mode 100644 index 0000000000..7a72ea72bf --- /dev/null +++ b/tests/new_relay/conf/dm-worker1.toml @@ -0,0 +1,2 @@ +name = "worker1" +join = "127.0.0.1:8261" diff --git a/tests/new_relay/conf/dm-worker2.toml b/tests/new_relay/conf/dm-worker2.toml new file mode 100644 index 0000000000..010e21c73e --- /dev/null +++ b/tests/new_relay/conf/dm-worker2.toml @@ -0,0 +1,2 @@ +name = "worker2" +join = "127.0.0.1:8261" diff --git a/tests/new_relay/conf/dm-worker3.toml b/tests/new_relay/conf/dm-worker3.toml new file mode 100644 index 0000000000..ab7e1b9cb3 --- /dev/null +++ b/tests/new_relay/conf/dm-worker3.toml @@ -0,0 +1,2 @@ +name = "worker3" +join = "127.0.0.1:8261" diff --git a/tests/new_relay/conf/source1.yaml b/tests/new_relay/conf/source1.yaml new file mode 100644 index 0000000000..406e7ae254 --- /dev/null +++ b/tests/new_relay/conf/source1.yaml @@ -0,0 +1,13 @@ +source-id: mysql-replica-01 +flavor: 'mysql' +enable-gtid: true +relay-binlog-name: '' +relay-binlog-gtid: '' +enable-relay: true +from: + host: 127.0.0.1 + user: root + password: /Q7B9DizNLLTTfiZHv9WoEAKamfpIUs= + port: 3306 +checker: + check-enable: false diff --git a/tests/new_relay/data/db1.increment.sql b/tests/new_relay/data/db1.increment.sql new file mode 100644 index 0000000000..0ff0143c4e --- /dev/null +++ b/tests/new_relay/data/db1.increment.sql @@ -0,0 +1,27 @@ +use new_relay; +insert into t1 (id, name) values (3, 'Eddard Stark'); +update t1 set name = 'Arya Stark' where id = 1; +update t1 set name = 'Catelyn Stark' where name = 'catelyn'; +flush logs; + +-- test multi column index with generated column +alter table t1 add column info json; +alter table t1 add column gen_id int as (info->"$.id"); +alter table t1 add index multi_col(`id`, `gen_id`); +insert into t1 (id, name, info) values (4, 'gentest', '{"id": 123}'); +insert into t1 (id, name, info) values (5, 'gentest', '{"id": 124}'); +update t1 set info = '{"id": 120}' where id = 1; +update t1 set info = '{"id": 121}' where id = 2; +flush logs; +update t1 set info = '{"id": 122}' where id = 3; + +-- test genColumnCache is reset after ddl +alter table t1 add column info2 varchar(40); +insert into t1 (id, name, info) values (6, 'gentest', '{"id": 125, "test cache": false}'); +alter table t1 add unique key gen_idx(`gen_id`); +update t1 set name = 'gentestxx' where gen_id = 123; + +insert into t1 (id, name, info) values (7, 'gentest', '{"id": 126}'); +update t1 set name = 'gentestxxxxxx' 
where gen_id = 124; +-- delete with unique key +delete from t1 where gen_id > 124; diff --git a/tests/new_relay/data/db1.increment2.sql b/tests/new_relay/data/db1.increment2.sql new file mode 100644 index 0000000000..7c144ac5f6 --- /dev/null +++ b/tests/new_relay/data/db1.increment2.sql @@ -0,0 +1,4 @@ +flush logs; +insert into new_relay.t1 (id, name, info) values (8, 'gentest', '{"id": 128}'); +insert into new_relay.t1 (id, name, info) values (9, 'gentest', '{"id": 129}'), (10, 'gentest', '{"id": 130}'); +alter table new_relay.t1 add column notused int; diff --git a/tests/new_relay/data/db1.increment3.sql b/tests/new_relay/data/db1.increment3.sql new file mode 100644 index 0000000000..a3eb51c204 --- /dev/null +++ b/tests/new_relay/data/db1.increment3.sql @@ -0,0 +1,2 @@ +flush logs; +alter table new_relay.t1 add column notused2 int; diff --git a/tests/new_relay/data/db1.increment4.sql b/tests/new_relay/data/db1.increment4.sql new file mode 100644 index 0000000000..095c538596 --- /dev/null +++ b/tests/new_relay/data/db1.increment4.sql @@ -0,0 +1,2 @@ +alter table new_relay.t1 drop column notused, drop column notused2; +insert into new_relay.t1 (id, name, info) values (11, 'gentest', '{"id": 131}'), (12, 'gentest', '{"id": 132}'); diff --git a/tests/new_relay/data/db1.prepare.sql b/tests/new_relay/data/db1.prepare.sql new file mode 100644 index 0000000000..540b577fdc --- /dev/null +++ b/tests/new_relay/data/db1.prepare.sql @@ -0,0 +1,5 @@ +drop database if exists `new_relay`; +create database `new_relay`; +use `new_relay`; +create table t1 (id int, name varchar(20), primary key(`id`)); +insert into t1 (id, name) values (1, 'arya'), (2, 'catelyn'); diff --git a/tests/new_relay/run.sh b/tests/new_relay/run.sh new file mode 100755 index 0000000000..a76cd68868 --- /dev/null +++ b/tests/new_relay/run.sh @@ -0,0 +1,110 @@ +#!/bin/bash + +set -eu + +cur=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd ) +source $cur/../_utils/test_prepare +WORK_DIR=$TEST_DIR/$TEST_NAME +TASK_NAME="test" +SQL_RESULT_FILE="$TEST_DIR/sql_res.$TEST_NAME.txt" + +API_VERSION="v1alpha1" + +function run() { + run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 + check_contains 'Query OK, 2 rows affected' + + run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml + check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT + run_dm_worker $WORK_DIR/worker3 $WORKER3_PORT $cur/conf/dm-worker3.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER3_PORT + + cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml + dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1 + + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "start-relay -s $SOURCE_ID1 worker1 worker2" \ + "\"result\": true" 1 + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "transfer-source $SOURCE_ID1 worker1" \ + "\"result\": true" 1 + + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status -s $SOURCE_ID1" \ + "\"result\": true" 3 \ + "\"worker\": \"worker1\"" 1 \ + "\"worker\": \"worker2\"" 1 + + dmctl_start_task_standalone $cur/conf/dm-task.yaml "--remove-meta" + check_sync_diff $WORK_DIR $cur/conf/diff_config.toml + + run_sql_file 
$cur/data/db1.increment.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 + + # subtask is preferred to scheduled to another relay worker + pkill -hup -f dm-worker1.toml 2>/dev/null || true + wait_pattern_exit dm-worker1.toml + # worker1 is down, worker2 has running relay and sync unit + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status -s $SOURCE_ID1" \ + "connect: connection refused" 1 \ + "\"stage\": \"Running\"" 2 + + run_sql_file $cur/data/db1.increment2.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 + check_sync_diff $WORK_DIR $cur/conf/diff_config.toml + + # after restarting, worker will purge relay log directory because checkpoint is newer than relay.meta + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + + run_sql_file $cur/data/db1.increment3.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 + check_sync_diff $WORK_DIR $cur/conf/diff_config.toml + + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status -s $SOURCE_ID1" \ + "\"result\": true" 3 \ + "\"worker\": \"worker1\"" 1 \ + "\"worker\": \"worker2\"" 1 + + # test purge-relay for all relay workers + run_sql_source1 "show binary logs\G" + max_binlog_name=$(grep Log_name "$SQL_RESULT_FILE"| tail -n 1 | awk -F":" '{print $NF}') + server_uuid_1=$(tail -n 1 $WORK_DIR/worker1/relay-dir/server-uuid.index) + relay_log_count_1=$(($(ls $WORK_DIR/worker1/relay-dir/$server_uuid_1 | wc -l) - 1)) + server_uuid_2=$(tail -n 1 $WORK_DIR/worker2/relay-dir/server-uuid.index) + relay_log_count_2=$(($(ls $WORK_DIR/worker2/relay-dir/$server_uuid_2 | wc -l) - 1)) + [ "$relay_log_count_1" -ne 1 ] + [ "$relay_log_count_2" -ne 1 ] + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "purge-relay --filename $max_binlog_name -s $SOURCE_ID1" \ + "\"result\": true" 3 + new_relay_log_count_1=$(($(ls $WORK_DIR/worker1/relay-dir/$server_uuid_1 | wc -l) - 1)) + new_relay_log_count_2=$(($(ls $WORK_DIR/worker2/relay-dir/$server_uuid_2 | wc -l) - 1)) + [ "$new_relay_log_count_1" -eq 1 ] + [ "$new_relay_log_count_2" -eq 1 ] + + pkill -hup -f dm-worker1.toml 2>/dev/null || true + wait_pattern_exit dm-worker1.toml + pkill -hup -f dm-worker2.toml 2>/dev/null || true + wait_pattern_exit dm-worker2.toml + + # if all relay workers are offline, relay-not-enabled worker should continue to sync + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status -s $SOURCE_ID1" \ + "\"result\": true" 2 \ + "\"worker\": \"worker3\"" 1 + + run_sql_file $cur/data/db1.increment4.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 + check_sync_diff $WORK_DIR $cur/conf/diff_config.toml +} + +cleanup_data $TEST_NAME +# also cleanup dm processes in case of last run failed +cleanup_process $* +run $* +cleanup_process $* + +echo "[$(date)] <<<<<< test case $TEST_NAME success! 
>>>>>>" diff --git a/tests/others_integration.txt b/tests/others_integration.txt index b4085e81b1..e187b8c4c3 100644 --- a/tests/others_integration.txt +++ b/tests/others_integration.txt @@ -1,3 +1,4 @@ +new_relay import_v10x tls sharding2 diff --git a/tests/start_task/run.sh b/tests/start_task/run.sh index 1c439ddf63..60a3eaaf96 100644 --- a/tests/start_task/run.sh +++ b/tests/start_task/run.sh @@ -93,7 +93,7 @@ function run() { echo "check un-accessible DM-worker exists" run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "query-status -s 127.0.0.1:8888" \ - "127.0.0.1:8888 relevant worker-client not found" 1 + "sources \[127.0.0.1:8888\] haven't been added" 1 echo "start task and will failed" task_conf="$cur/conf/dm-task.yaml"