Skip to content

Commit

Permalink
cherry pick pingcap#1533 to release-2.0
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
  • Loading branch information
lance6716 authored and ti-srebot committed Apr 9, 2021
1 parent 11fad2f commit 03dc7b3
Show file tree
Hide file tree
Showing 34 changed files with 985 additions and 215 deletions.
8 changes: 4 additions & 4 deletions cmd/dm-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ import (
"strings"
"syscall"

"github.com/pingcap/errors"
globalLog "github.com/pingcap/log"
"go.uber.org/zap"

"github.com/pingcap/dm/dm/ctl/common"
"github.com/pingcap/dm/dm/worker"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"
globalLog "github.com/pingcap/log"

"github.com/pingcap/errors"
"go.uber.org/zap"
)

func main() {
Expand Down
6 changes: 6 additions & 0 deletions dm/config/source_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import (
"io/ioutil"
"math"
"math/rand"
"path/filepath"
"strings"
"time"

"github.com/BurntSushi/toml"
"github.com/siddontang/go-mysql/mysql"
Expand Down Expand Up @@ -257,6 +259,9 @@ func (c *SourceConfig) Adjust(ctx context.Context, db *sql.DB) (err error) {
if c.EnableRelay && len(c.RelayDir) == 0 {
c.RelayDir = defaultRelayDir
}
if filepath.IsAbs(c.RelayDir) {
log.L().Warn("using an absolute relay path, relay log can't work when starting multiple relay worker")
}

return nil
}
Expand Down Expand Up @@ -293,6 +298,7 @@ func (c *SourceConfig) AdjustServerID(ctx context.Context, db *sql.DB) error {
return terror.WithScope(err, terror.ScopeUpstream)
}

rand.Seed(time.Now().UnixNano())
for i := 0; i < 5; i++ {
randomValue := uint32(rand.Intn(100000))
randomServerID := defaultBaseServerID + randomValue
Expand Down
289 changes: 285 additions & 4 deletions dm/master/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -748,6 +748,195 @@ func (s *Scheduler) UnboundSources() []string {
return IDs
}

<<<<<<< HEAD
=======
// StartRelay puts etcd key-value pairs to start relay on some workers
func (s *Scheduler) StartRelay(source string, workers []string) error {
s.mu.Lock()
defer s.mu.Unlock()

if !s.started {
return terror.ErrSchedulerNotStarted.Generate()
}

// 1. precheck
if _, ok := s.sourceCfgs[source]; !ok {
return terror.ErrSchedulerSourceCfgNotExist.Generate(source)
}
startedWorkers := s.relayWorkers[source]
if startedWorkers == nil {
startedWorkers = map[string]struct{}{}
s.relayWorkers[source] = startedWorkers
}
var (
notExistWorkers []string
// below two list means the worker that requested start-relay has bound to another source
boundWorkers, boundSources []string
alreadyStarted []string
)
for _, worker := range workers {
if _, ok := s.workers[worker]; !ok {
notExistWorkers = append(notExistWorkers, worker)
}
if _, ok := startedWorkers[worker]; ok {
alreadyStarted = append(alreadyStarted, worker)
}

for source2, w := range s.bounds {
if source2 == source {
continue
}
if w.BaseInfo().Name == worker {
boundWorkers = append(boundWorkers, worker)
boundSources = append(boundSources, source2)
}
}
}
if len(notExistWorkers) > 0 {
return terror.ErrSchedulerWorkerNotExist.Generate(notExistWorkers)
}
if len(boundWorkers) > 0 {
return terror.ErrSchedulerRelayWorkersWrongBound.Generate(boundWorkers, boundSources)
}
if len(alreadyStarted) > 0 {
s.logger.Warn("some workers already started relay",
zap.String("source", source),
zap.Strings("already started workers", alreadyStarted))
}

// currently we forbid one worker starting multiple relay
// worker -> source
oldSource := map[string]string{}
for source2, workers2 := range s.relayWorkers {
if source2 == source {
continue
}
for w := range workers2 {
oldSource[w] = source2
}
}
var busyWorkers, busySources []string
for _, w := range workers {
source2 := oldSource[w]
if source2 != "" {
busyWorkers = append(busyWorkers, w)
busySources = append(busySources, source2)
}
}
if len(busyWorkers) > 0 {
return terror.ErrSchedulerRelayWorkersBusy.Generate(busyWorkers, busySources)
}

// 2. put etcd and update memory cache
// if there's no relay stage, create a running one. otherwise we should respect paused stage
if len(startedWorkers) == 0 {
stage := ha.NewRelayStage(pb.Stage_Running, source)
if _, err := ha.PutRelayStage(s.etcdCli, stage); err != nil {
return err
}
s.expectRelayStages[source] = stage
}
if _, err := ha.PutRelayConfig(s.etcdCli, source, workers...); err != nil {
return err
}
for _, w := range workers {
s.relayWorkers[source][w] = struct{}{}
}
return nil
}

// StopRelay deletes etcd key-value pairs to stop relay on some workers
func (s *Scheduler) StopRelay(source string, workers []string) error {
s.mu.Lock()
defer s.mu.Unlock()

if !s.started {
return terror.ErrSchedulerNotStarted.Generate()
}

// 1. precheck
if _, ok := s.sourceCfgs[source]; !ok {
return terror.ErrSchedulerSourceCfgNotExist.Generate(source)
}
startedWorkers := s.relayWorkers[source]
var (
notExistWorkers []string
unmatchedWorkers, unmatchedSources []string
alreadyStopped []string
)
for _, worker := range workers {
if _, ok := s.workers[worker]; !ok {
notExistWorkers = append(notExistWorkers, worker)
}
if _, ok := startedWorkers[worker]; !ok {
alreadyStopped = append(alreadyStopped, worker)
}
for source2, workers2 := range s.relayWorkers {
if source2 == source {
continue
}
if _, ok := workers2[worker]; ok {
unmatchedWorkers = append(unmatchedWorkers, worker)
unmatchedSources = append(unmatchedSources, source2)
}
}
}
if len(notExistWorkers) > 0 {
return terror.ErrSchedulerWorkerNotExist.Generate(notExistWorkers)
}
if len(unmatchedWorkers) > 0 {
return terror.ErrSchedulerRelayWorkersWrongRelay.Generate(unmatchedWorkers, unmatchedSources)
}
if len(alreadyStopped) > 0 {
s.logger.Warn("some workers already stopped relay",
zap.String("source", source),
zap.Strings("already stopped workers", alreadyStopped))
}

// 2. delete from etcd and update memory cache
if _, err := ha.DeleteRelayConfig(s.etcdCli, workers...); err != nil {
return err
}
for _, w := range workers {
delete(s.relayWorkers[source], w)
}
if len(s.relayWorkers[source]) == 0 {
if _, err := ha.DeleteRelayStage(s.etcdCli, source); err != nil {
return err
}
delete(s.relayWorkers, source)
delete(s.expectRelayStages, source)
}
return nil
}

// GetRelayWorkers returns all alive worker instances for a relay source
func (s *Scheduler) GetRelayWorkers(source string) ([]*Worker, error) {
s.mu.RLock()
defer s.mu.RUnlock()

if !s.started {
return nil, terror.ErrSchedulerNotStarted.Generate()
}

workers := s.relayWorkers[source]
var ret []*Worker
for w := range workers {
worker, ok := s.workers[w]
if !ok {
// should not happen
s.logger.Error("worker instance for relay worker not found", zap.String("worker", w))
continue
}
ret = append(ret, worker)
}
sort.Slice(ret, func(i, j int) bool {
return ret[i].baseInfo.Name < ret[j].baseInfo.Name
})
return ret, nil
}

>>>>>>> 83f7610e... *: `query-status -s` and purge-relay send to all relay worker (#1533)
// UpdateExpectRelayStage updates the current expect relay stage.
// now, only support updates:
// - from `Running` to `Paused`.
Expand Down Expand Up @@ -1280,7 +1469,45 @@ func (s *Scheduler) tryBoundForWorker(w *Worker) (bounded bool, err error) {
source := s.lastBound[w.baseInfo.Name].Source
if _, ok := s.unbounds[source]; !ok {
source = ""
<<<<<<< HEAD
=======
}

// try to find its relay source (currently only one relay source)
if source != "" {
s.logger.Info("found history source when worker bound",
zap.String("worker", w.BaseInfo().Name),
zap.String("source", source))
} else {
for source2, workers := range s.relayWorkers {
if _, ok2 := workers[w.BaseInfo().Name]; ok2 {
source = source2
s.logger.Info("found relay source when worker bound",
zap.String("worker", w.BaseInfo().Name),
zap.String("source", source))
break
}
}
}
// found a relay source
if source != "" {
// currently worker can only handle same relay source and source bound, so we don't try bound another source
if oldWorker, ok := s.bounds[source]; ok {
s.logger.Info("worker has started relay for a source, but that source is bound to another worker, so we let this worker free",
zap.String("worker", w.BaseInfo().Name),
zap.String("relay source", source),
zap.String("bound worker for its relay source", oldWorker.BaseInfo().Name))
return false, nil
}
}

// randomly pick one from unbounds
if source == "" {
>>>>>>> 83f7610e... *: `query-status -s` and purge-relay send to all relay worker (#1533)
for source = range s.unbounds {
s.logger.Info("found unbound source when worker bound",
zap.String("worker", w.BaseInfo().Name),
zap.String("source", source))
break // got a source.
}
}
Expand Down Expand Up @@ -1311,26 +1538,80 @@ func (s *Scheduler) tryBoundForWorker(w *Worker) (bounded bool, err error) {
// tryBoundForSource tries to bound a source to a random Free worker.
// returns (true, nil) after bounded.
func (s *Scheduler) tryBoundForSource(source string) (bool, error) {
<<<<<<< HEAD
// 1. try to find history workers, then random Free worker.
=======
>>>>>>> 83f7610e... *: `query-status -s` and purge-relay send to all relay worker (#1533)
var worker *Worker
for workerName, bound := range s.lastBound {
if bound.Source == source {
relayWorkers := s.relayWorkers[source]
// 1. try to find a history worker in relay workers...
if len(relayWorkers) > 0 {
for workerName, bound := range s.lastBound {
if bound.Source == source {
w, ok := s.workers[workerName]
if !ok {
// a not found worker
continue
}
if _, ok2 := relayWorkers[workerName]; ok2 && w.Stage() == WorkerFree {
worker = w
s.logger.Info("found history relay worker when source bound",
zap.String("worker", workerName),
zap.String("source", source))
break
}
}
}
}
<<<<<<< HEAD

=======
// then a relay worker for this source...
if worker == nil {
for workerName := range relayWorkers {
w, ok := s.workers[workerName]
if !ok {
// a not found worker
// a not found worker, should not happened
s.logger.Warn("worker instance not found for relay worker", zap.String("worker", workerName))
continue
}
if w.Stage() == WorkerFree {
worker = w
s.logger.Info("found relay worker when source bound",
zap.String("worker", workerName),
zap.String("source", source))
break
}
}
}

// then a history worker for this source...
if worker == nil {
for workerName, bound := range s.lastBound {
if bound.Source == source {
w, ok := s.workers[workerName]
if !ok {
// a not found worker
continue
}
if w.Stage() == WorkerFree {
worker = w
s.logger.Info("found history worker when source bound",
zap.String("worker", workerName),
zap.String("source", source))
break
}
}
}
}
// and then a random Free worker.
>>>>>>> 83f7610e... *: `query-status -s` and purge-relay send to all relay worker (#1533)
if worker == nil {
for _, w := range s.workers {
if w.Stage() == WorkerFree {
worker = w
s.logger.Info("found free worker when source bound",
zap.String("worker", w.BaseInfo().Name),
zap.String("source", source))
break
}
}
Expand Down
Loading

0 comments on commit 03dc7b3

Please sign in to comment.