Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

*: query-status -s and purge-relay send to all relay worker (#1533) #1573

Merged
merged 1 commit into from
Apr 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions cmd/dm-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ import (
"strings"
"syscall"

"github.com/pingcap/errors"
globalLog "github.com/pingcap/log"
"go.uber.org/zap"

"github.com/pingcap/dm/dm/ctl/common"
"github.com/pingcap/dm/dm/worker"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"
globalLog "github.com/pingcap/log"

"github.com/pingcap/errors"
"go.uber.org/zap"
)

func main() {
Expand Down
6 changes: 6 additions & 0 deletions dm/config/source_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import (
"io/ioutil"
"math"
"math/rand"
"path/filepath"
"strings"
"time"

"github.com/BurntSushi/toml"
"github.com/siddontang/go-mysql/mysql"
Expand Down Expand Up @@ -255,6 +257,9 @@ func (c *SourceConfig) Adjust(ctx context.Context, db *sql.DB) (err error) {
if len(c.RelayDir) == 0 {
c.RelayDir = defaultRelayDir
}
if filepath.IsAbs(c.RelayDir) {
log.L().Warn("using an absolute relay path, relay log can't work when starting multiple relay worker")
}

return nil
}
Expand Down Expand Up @@ -291,6 +296,7 @@ func (c *SourceConfig) AdjustServerID(ctx context.Context, db *sql.DB) error {
return terror.WithScope(err, terror.ScopeUpstream)
}

rand.Seed(time.Now().UnixNano())
for i := 0; i < 5; i++ {
randomValue := uint32(rand.Intn(100000))
randomServerID := defaultBaseServerID + randomValue
Expand Down
94 changes: 81 additions & 13 deletions dm/master/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -932,6 +932,32 @@ func (s *Scheduler) StopRelay(source string, workers []string) error {
return nil
}

// GetRelayWorkers returns all alive worker instances for a relay source
func (s *Scheduler) GetRelayWorkers(source string) ([]*Worker, error) {
s.mu.RLock()
defer s.mu.RUnlock()

if !s.started {
return nil, terror.ErrSchedulerNotStarted.Generate()
}

workers := s.relayWorkers[source]
var ret []*Worker
for w := range workers {
worker, ok := s.workers[w]
if !ok {
// should not happen
s.logger.Error("worker instance for relay worker not found", zap.String("worker", w))
continue
}
ret = append(ret, worker)
}
sort.Slice(ret, func(i, j int) bool {
return ret[i].baseInfo.Name < ret[j].baseInfo.Name
})
return ret, nil
}

// UpdateExpectRelayStage updates the current expect relay stage.
// now, only support updates:
// - from `Running` to `Paused`.
Expand Down Expand Up @@ -1475,10 +1501,17 @@ func (s *Scheduler) tryBoundForWorker(w *Worker) (bounded bool, err error) {
}

// try to find its relay source (currently only one relay source)
if source == "" {
if source != "" {
s.logger.Info("found history source when worker bound",
zap.String("worker", w.BaseInfo().Name),
zap.String("source", source))
} else {
for source2, workers := range s.relayWorkers {
if _, ok2 := workers[w.BaseInfo().Name]; ok2 {
source = source2
s.logger.Info("found relay source when worker bound",
zap.String("worker", w.BaseInfo().Name),
zap.String("source", source))
break
}
}
Expand All @@ -1498,6 +1531,9 @@ func (s *Scheduler) tryBoundForWorker(w *Worker) (bounded bool, err error) {
// randomly pick one from unbounds
if source == "" {
for source = range s.unbounds {
s.logger.Info("found unbound source when worker bound",
zap.String("worker", w.BaseInfo().Name),
zap.String("source", source))
break // got a source.
}
}
Expand Down Expand Up @@ -1529,40 +1565,72 @@ func (s *Scheduler) tryBoundForWorker(w *Worker) (bounded bool, err error) {
// returns (true, nil) after bounded.
// called should update the s.unbounds
func (s *Scheduler) tryBoundForSource(source string) (bool, error) {
// 1. try to find history workers...
var worker *Worker
for workerName, bound := range s.lastBound {
if bound.Source == source {
w, ok := s.workers[workerName]
if !ok {
// a not found worker
continue
}
if w.Stage() == WorkerFree {
worker = w
break
relayWorkers := s.relayWorkers[source]
// 1. try to find a history worker in relay workers...
if len(relayWorkers) > 0 {
for workerName, bound := range s.lastBound {
if bound.Source == source {
w, ok := s.workers[workerName]
if !ok {
// a not found worker
continue
}
if _, ok2 := relayWorkers[workerName]; ok2 && w.Stage() == WorkerFree {
worker = w
s.logger.Info("found history relay worker when source bound",
zap.String("worker", workerName),
zap.String("source", source))
break
}
}
}
}
// then a relay worker for this source...
if worker == nil {
for workerName := range s.relayWorkers[source] {
for workerName := range relayWorkers {
w, ok := s.workers[workerName]
if !ok {
// a not found worker, should not happened
s.logger.Warn("worker instance not found for relay worker", zap.String("worker", workerName))
continue
}
if w.Stage() == WorkerFree {
worker = w
s.logger.Info("found relay worker when source bound",
zap.String("worker", workerName),
zap.String("source", source))
break
}
}
}
// then a history worker for this source...
if worker == nil {
for workerName, bound := range s.lastBound {
if bound.Source == source {
w, ok := s.workers[workerName]
if !ok {
// a not found worker
continue
}
if w.Stage() == WorkerFree {
worker = w
s.logger.Info("found history worker when source bound",
zap.String("worker", workerName),
zap.String("source", source))
break
}
}
}
}
// and then a random Free worker.
if worker == nil {
for _, w := range s.workers {
if w.Stage() == WorkerFree {
worker = w
s.logger.Info("found free worker when source bound",
zap.String("worker", w.BaseInfo().Name),
zap.String("source", source))
break
}
}
Expand Down
23 changes: 23 additions & 0 deletions dm/master/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1164,6 +1164,12 @@ func (t *testScheduler) TestStartStopSource(c *C) {
// test not exist source
c.Assert(terror.ErrSchedulerSourceCfgNotExist.Equal(s.StartRelay(sourceID3, []string{workerName1})), IsTrue)
c.Assert(terror.ErrSchedulerSourceCfgNotExist.Equal(s.StopRelay(sourceID4, []string{workerName1})), IsTrue)
noWorkerSources := []string{sourceID1, sourceID2, sourceID3, sourceID4}
for _, source := range noWorkerSources {
workers, err := s.GetRelayWorkers(source)
c.Assert(err, IsNil)
c.Assert(workers, HasLen, 0)
}

// start-relay success on bound-same-source and free worker
c.Assert(s.StartRelay(sourceID1, []string{workerName1}), IsNil)
Expand All @@ -1175,6 +1181,9 @@ func (t *testScheduler) TestStartStopSource(c *C) {
c.Assert(s.relayWorkers[sourceID1], HasLen, 2)
c.Assert(s.relayWorkers[sourceID1], HasKey, workerName1)
c.Assert(s.relayWorkers[sourceID1], HasKey, workerName3)
workers, err := s.GetRelayWorkers(sourceID1)
c.Assert(err, IsNil)
c.Assert(workers, DeepEquals, []*Worker{worker1, worker3})

// failed on bound-not-same-source worker and not exist worker
c.Assert(terror.ErrSchedulerRelayWorkersWrongBound.Equal(s.StartRelay(sourceID1, []string{workerName2})), IsTrue)
Expand All @@ -1189,11 +1198,22 @@ func (t *testScheduler) TestStartStopSource(c *C) {
c.Assert(s.expectRelayStages, HasKey, sourceID2)
c.Assert(s.relayWorkers[sourceID2], HasLen, 1)
c.Assert(s.relayWorkers[sourceID2], HasKey, workerName2)
workers, err = s.GetRelayWorkers(sourceID2)
c.Assert(err, IsNil)
c.Assert(workers, DeepEquals, []*Worker{worker2})

// failed on not-same-source worker and not exist worker
c.Assert(terror.ErrSchedulerRelayWorkersWrongRelay.Equal(s.StopRelay(sourceID1, []string{workerName2})), IsTrue)
c.Assert(terror.ErrSchedulerWorkerNotExist.Equal(s.StopRelay(sourceID1, []string{"not-exist"})), IsTrue)

// nothing changed
workers, err = s.GetRelayWorkers(sourceID1)
c.Assert(err, IsNil)
c.Assert(workers, DeepEquals, []*Worker{worker1, worker3})
workers, err = s.GetRelayWorkers(sourceID2)
c.Assert(err, IsNil)
c.Assert(workers, DeepEquals, []*Worker{worker2})

// stop-relay success
c.Assert(s.StopRelay(sourceID1, []string{workerName1}), IsNil)
c.Assert(s.StopRelay(sourceID1, []string{workerName1}), IsNil)
Expand All @@ -1202,4 +1222,7 @@ func (t *testScheduler) TestStartStopSource(c *C) {
c.Assert(s.expectRelayStages, HasKey, sourceID2)
c.Assert(s.relayWorkers, HasLen, 1)
c.Assert(s.relayWorkers, HasKey, sourceID2)
workers, err = s.GetRelayWorkers(sourceID1)
c.Assert(err, IsNil)
c.Assert(workers, HasLen, 0)
}
Loading