Skip to content

Commit

Permalink
cherry pick pingcap#1507 to release-2.0 (pingcap#1513)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-srebot authored Mar 17, 2021
1 parent d4607cb commit d68c1bf
Show file tree
Hide file tree
Showing 5 changed files with 186 additions and 149 deletions.
98 changes: 8 additions & 90 deletions dm/worker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,73 +557,19 @@ func (s *Server) startWorker(cfg *config.SourceConfig) error {
return terror.ErrWorkerAlreadyStart.Generate()
}

// we get the newest subtask stages directly which will omit the subtask stage PUT/DELETE event
// because triggering these events is useless now
subTaskStages, subTaskCfgm, revSubTask, err := ha.GetSubTaskStageConfig(s.etcdClient, cfg.SourceID)
if err != nil {
return err
}

subTaskCfgs := make([]*config.SubTaskConfig, 0, len(subTaskCfgm))
for _, subTaskCfg := range subTaskCfgm {
subTaskCfg.LogLevel = s.cfg.LogLevel
subTaskCfg.LogFile = s.cfg.LogFile
subTaskCfg.LogFormat = s.cfg.LogFormat
subTaskCfgClone := subTaskCfg
if err = copyConfigFromSource(&subTaskCfgClone, cfg); err != nil {
return err
}
subTaskCfgs = append(subTaskCfgs, &subTaskCfgClone)
}

if cfg.EnableRelay {
dctx, dcancel := context.WithTimeout(s.etcdClient.Ctx(), time.Duration(len(subTaskCfgs))*3*time.Second)
defer dcancel()
minLoc, err1 := getMinLocInAllSubTasks(dctx, subTaskCfgs)
if err1 != nil {
return err1
}

if minLoc != nil {
log.L().Info("get min location in all subtasks", zap.Stringer("location", *minLoc))
cfg.RelayBinLogName = binlog.AdjustPosition(minLoc.Position).Name
cfg.RelayBinlogGTID = minLoc.GTIDSetStr()
// set UUIDSuffix when bound to a source
cfg.UUIDSuffix, err = binlog.ExtractSuffix(minLoc.Position.Name)
if err != nil {
return err
}
} else {
// set UUIDSuffix even not checkpoint exist
// so we will still remove relay dir
cfg.UUIDSuffix = binlog.MinUUIDSuffix
}
}

log.L().Info("starting to handle mysql source", zap.String("sourceCfg", cfg.String()), zap.Reflect("subTasks", subTaskCfgs))
w, err := NewWorker(cfg, s.etcdClient, s.cfg.Name)
if err != nil {
return err
}
s.setWorker(w, false)

startRelay := false
var revRelay int64
if cfg.EnableRelay {
var relayStage ha.Stage
// we get the newest relay stages directly which will omit the relay stage PUT/DELETE event
// because triggering these events is useless now
relayStage, revRelay, err = ha.GetRelayStage(s.etcdClient, cfg.SourceID)
if err != nil {
// TODO: need retry
return err
}
startRelay = !relayStage.IsDeleted && relayStage.Expect == pb.Stage_Running
s.UpdateKeepAliveTTL(s.cfg.RelayKeepAliveTTL)
if err2 := w.EnableRelay(); err2 != nil {
return err2
}
}
go func() {
w.Start(startRelay)
}()
go w.Start()

isStarted := utils.WaitSomething(50, 100*time.Millisecond, func() bool {
return w.closed.Get() == closedFalse
Expand All @@ -633,37 +579,9 @@ func (s *Server) startWorker(cfg *config.SourceConfig) error {
return terror.ErrWorkerNoStart
}

for _, subTaskCfg := range subTaskCfgs {
expectStage := subTaskStages[subTaskCfg.Name]
if expectStage.IsDeleted {
continue
}
log.L().Info("start to create subtask", zap.String("sourceID", subTaskCfg.SourceID), zap.String("task", subTaskCfg.Name))
if err := w.StartSubTask(subTaskCfg, expectStage.Expect); err != nil {
return err
}
}

w.wg.Add(1)
go func() {
defer w.wg.Done()
// TODO: handle fatal error from observeSubtaskStage
//nolint:errcheck
w.observeSubtaskStage(w.ctx, s.etcdClient, revSubTask)
}()

if cfg.EnableRelay {
w.wg.Add(1)
go func() {
defer w.wg.Done()
// TODO: handle fatal error from observeRelayStage
//nolint:errcheck
w.observeRelayStage(w.ctx, s.etcdClient, revRelay)
}()
}

err = w.EnableHandleSubtasks()
log.L().Info("started to handle mysql source", zap.String("sourceCfg", cfg.String()))
return nil
return err
}

func makeCommonWorkerResponse(reqErr error) *pb.CommonWorkerResponse {
Expand All @@ -679,7 +597,7 @@ func makeCommonWorkerResponse(reqErr error) *pb.CommonWorkerResponse {

// all subTask in subTaskCfgs should have same source
// this function return the min location in all subtasks, used for relay's location
func getMinLocInAllSubTasks(ctx context.Context, subTaskCfgs []*config.SubTaskConfig) (minLoc *binlog.Location, err error) {
func getMinLocInAllSubTasks(ctx context.Context, subTaskCfgs map[string]config.SubTaskConfig) (minLoc *binlog.Location, err error) {
for _, subTaskCfg := range subTaskCfgs {
loc, err := getMinLocForSubTaskFunc(ctx, subTaskCfg)
if err != nil {
Expand All @@ -702,7 +620,7 @@ func getMinLocInAllSubTasks(ctx context.Context, subTaskCfgs []*config.SubTaskCo
return minLoc, nil
}

func getMinLocForSubTask(ctx context.Context, subTaskCfg *config.SubTaskConfig) (minLoc *binlog.Location, err error) {
func getMinLocForSubTask(ctx context.Context, subTaskCfg config.SubTaskConfig) (minLoc *binlog.Location, err error) {
if subTaskCfg.Mode == config.ModeFull {
return nil, nil
}
Expand Down
20 changes: 9 additions & 11 deletions dm/worker/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,22 +532,20 @@ func (t *testServer) testStopWorkerWhenLostConnect(c *C, s *Server, ETCD *embed.
}

func (t *testServer) TestGetMinLocInAllSubTasks(c *C) {
subTaskCfg := []*config.SubTaskConfig{
{
Name: "test2",
}, {
Name: "test3",
}, {
Name: "test1",
},

subTaskCfg := map[string]config.SubTaskConfig{
"test2": {Name: "test2"},
"test3": {Name: "test3"},
"test1": {Name: "test1"},
}
minLoc, err := getMinLocInAllSubTasks(context.Background(), subTaskCfg)
c.Assert(err, IsNil)
c.Assert(minLoc.Position.Name, Equals, "mysql-binlog.00001")
c.Assert(minLoc.Position.Pos, Equals, uint32(12))

for _, subtask := range subTaskCfg {
subtask.EnableGTID = true
for k, cfg := range subTaskCfg {
cfg.EnableGTID = true
subTaskCfg[k] = cfg
}

minLoc, err = getMinLocInAllSubTasks(context.Background(), subTaskCfg)
Expand Down Expand Up @@ -666,7 +664,7 @@ func (t *testServer) TestUnifyMasterBinlogPos(c *C) {
c.Assert(relay.RelayCatchUpMaster, IsTrue)
}

func getFakeLocForSubTask(ctx context.Context, subTaskCfg *config.SubTaskConfig) (minLoc *binlog.Location, err error) {
func getFakeLocForSubTask(ctx context.Context, subTaskCfg config.SubTaskConfig) (minLoc *binlog.Location, err error) {
gset1, _ := gtid.ParserGTID(mysql.MySQLFlavor, "ba8f633f-1f15-11eb-b1c7-0242ac110001:1-30")
gset2, _ := gtid.ParserGTID(mysql.MySQLFlavor, "ba8f633f-1f15-11eb-b1c7-0242ac110001:1-50")
gset3, _ := gtid.ParserGTID(mysql.MySQLFlavor, "ba8f633f-1f15-11eb-b1c7-0242ac110001:1-50,ba8f633f-1f15-11eb-b1c7-0242ac110002:1")
Expand Down
161 changes: 134 additions & 27 deletions dm/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/dm/pb"
"github.com/pingcap/dm/pkg/binlog"
"github.com/pingcap/dm/pkg/etcdutil"
"github.com/pingcap/dm/pkg/ha"
"github.com/pingcap/dm/pkg/log"
Expand Down Expand Up @@ -68,7 +69,8 @@ type Worker struct {
name string
}

// NewWorker creates a new Worker
// NewWorker creates a new Worker. The functionality of relay and subtask is disabled by default, need call EnableRelay
// and EnableSubtask later
func NewWorker(cfg *config.SourceConfig, etcdClient *clientv3.Client, name string) (w *Worker, err error) {
w = &Worker{
cfg: cfg,
Expand All @@ -89,18 +91,6 @@ func NewWorker(cfg *config.SourceConfig, etcdClient *clientv3.Client, name strin
}
}(w)

if cfg.EnableRelay {
// initial relay holder, the cfg's password need decrypt
w.relayHolder = NewRelayHolder(cfg)
purger1, err1 := w.relayHolder.Init([]purger.PurgeInterceptor{
w,
})
if err1 != nil {
return nil, err1
}
w.relayPurger = purger1
}

// initial task status checker
if w.cfg.Checker.CheckEnable {
tsc := NewTaskStatusChecker(w.cfg.Checker, w)
Expand All @@ -119,17 +109,7 @@ func NewWorker(cfg *config.SourceConfig, etcdClient *clientv3.Client, name strin
}

// Start starts working
func (w *Worker) Start(startRelay bool) {

if w.cfg.EnableRelay && startRelay {
log.L().Info("relay is started")
// start relay
w.relayHolder.Start()

// start purger
w.relayPurger.Start()
}

func (w *Worker) Start() {
// start task status checker
if w.cfg.Checker.CheckEnable {
w.taskStatusChecker.Start()
Expand Down Expand Up @@ -190,6 +170,122 @@ func (w *Worker) Close() {
w.l.Info("Stop worker")
}

// EnableRelay enables the functionality of start/watch/handle relay
func (w *Worker) EnableRelay() error {
// 1. adjust relay starting position, to the earliest of subtasks
_, subTaskCfgs, _, err := w.fetchSubTasksAndAdjust()
if err != nil {
return err
}

dctx, dcancel := context.WithTimeout(w.etcdClient.Ctx(), time.Duration(len(subTaskCfgs))*3*time.Second)
defer dcancel()
minLoc, err1 := getMinLocInAllSubTasks(dctx, subTaskCfgs)
if err1 != nil {
return err1
}

if minLoc != nil {
log.L().Info("get min location in all subtasks", zap.Stringer("location", *minLoc))
w.cfg.RelayBinLogName = binlog.AdjustPosition(minLoc.Position).Name
w.cfg.RelayBinlogGTID = minLoc.GTIDSetStr()
// set UUIDSuffix when bound to a source
w.cfg.UUIDSuffix, err = binlog.ExtractSuffix(minLoc.Position.Name)
if err != nil {
return err
}
} else {
// set UUIDSuffix even not checkpoint exist
// so we will still remove relay dir
w.cfg.UUIDSuffix = binlog.MinUUIDSuffix
}

// 2. initial relay holder, the cfg's password need decrypt
w.relayHolder = NewRelayHolder(w.cfg)
relayPurger, err := w.relayHolder.Init([]purger.PurgeInterceptor{
w,
})
if err != nil {
return err
}
w.relayPurger = relayPurger

// 3. get relay stage from etcd and check if need starting
// we get the newest relay stages directly which will omit the relay stage PUT/DELETE event
// because triggering these events is useless now
relayStage, revRelay, err := ha.GetRelayStage(w.etcdClient, w.cfg.SourceID)
if err != nil {
// TODO: need retry
return err
}
startImmediately := !relayStage.IsDeleted && relayStage.Expect == pb.Stage_Running
if startImmediately {
log.L().Info("relay is started")
w.relayHolder.Start()
w.relayPurger.Start()
}

// 4. watch relay stage
w.wg.Add(1)
go func() {
defer w.wg.Done()
// TODO: handle fatal error from observeRelayStage
//nolint:errcheck
w.observeRelayStage(w.ctx, w.etcdClient, revRelay)
}()
return nil
}

// EnableHandleSubtasks enables the functionality of start/watch/handle subtasks
func (w *Worker) EnableHandleSubtasks() error {
subTaskStages, subTaskCfgM, revSubTask, err := w.fetchSubTasksAndAdjust()
if err != nil {
return err
}

log.L().Info("starting to handle mysql source", zap.String("sourceCfg", w.cfg.String()), zap.Any("subTasks", subTaskCfgM))

for _, subTaskCfg := range subTaskCfgM {
expectStage := subTaskStages[subTaskCfg.Name]
if expectStage.IsDeleted {
continue
}
log.L().Info("start to create subtask", zap.String("sourceID", subTaskCfg.SourceID), zap.String("task", subTaskCfg.Name))
// for range of a map will use a same value-address, so we'd better not pass value-address to other function
clone := subTaskCfg
if err := w.StartSubTask(&clone, expectStage.Expect); err != nil {
return err
}
}

w.wg.Add(1)
go func() {
defer w.wg.Done()
// TODO: handle fatal error from observeSubtaskStage
//nolint:errcheck
w.observeSubtaskStage(w.ctx, w.etcdClient, revSubTask)
}()

return nil
}

// fetchSubTasksAndAdjust gets source's subtask stages and configs, adjust some values by worker's config and status
// source **must not be empty**
// return map{task name -> subtask stage}, map{task name -> subtask config}, revision, error.
func (w *Worker) fetchSubTasksAndAdjust() (map[string]ha.Stage, map[string]config.SubTaskConfig, int64, error) {
// we get the newest subtask stages directly which will omit the subtask stage PUT/DELETE event
// because triggering these events is useless now
subTaskStages, subTaskCfgM, revSubTask, err := ha.GetSubTaskStageConfig(w.etcdClient, w.cfg.SourceID)
if err != nil {
return nil, nil, 0, err
}

if err = copyConfigFromSourceForEach(subTaskCfgM, w.cfg); err != nil {
return nil, nil, 0, err
}
return subTaskStages, subTaskCfgM, revSubTask, nil
}

// StartSubTask creates a sub task an run it
func (w *Worker) StartSubTask(cfg *config.SubTaskConfig, expectStage pb.Stage) error {
w.Lock()
Expand Down Expand Up @@ -298,8 +394,8 @@ func (w *Worker) QueryStatus(ctx context.Context, name string) []*pb.SubTaskStat
return w.Status(ctx2, name)
}

func (w *Worker) resetSubtaskStage(etcdCli *clientv3.Client) (int64, error) {
subTaskStages, subTaskCfgm, revSubTask, err := ha.GetSubTaskStageConfig(etcdCli, w.cfg.SourceID)
func (w *Worker) resetSubtaskStage() (int64, error) {
subTaskStages, subTaskCfgm, revSubTask, err := w.fetchSubTasksAndAdjust()
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -361,7 +457,7 @@ func (w *Worker) observeSubtaskStage(ctx context.Context, etcdCli *clientv3.Clie
case <-ctx.Done():
return nil
case <-time.After(500 * time.Millisecond):
rev, err = w.resetSubtaskStage(etcdCli)
rev, err = w.resetSubtaskStage()
if err != nil {
log.L().Error("resetSubtaskStage is failed, will retry later", zap.Error(err), zap.Int("retryNum", retryNum))
}
Expand Down Expand Up @@ -663,6 +759,17 @@ func copyConfigFromSource(cfg *config.SubTaskConfig, sourceCfg *config.SourceCon
return nil
}

// copyConfigFromSourceForEach do copyConfigFromSource for each value in subTaskCfgM and change subTaskCfgM in-place
func copyConfigFromSourceForEach(subTaskCfgM map[string]config.SubTaskConfig, sourceCfg *config.SourceConfig) error {
for k, subTaskCfg := range subTaskCfgM {
if err2 := copyConfigFromSource(&subTaskCfg, sourceCfg); err2 != nil {
return err2
}
subTaskCfgM[k] = subTaskCfg
}
return nil
}

// getAllSubTaskStatus returns all subtask status of this worker, note the field
// in subtask status is not completed, only includes `Name`, `Stage` and `Result` now
func (w *Worker) getAllSubTaskStatus() map[string]*pb.SubTaskStatus {
Expand Down
Loading

0 comments on commit d68c1bf

Please sign in to comment.