Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

*: query-status -s and purge-relay send to all relay worker #1533

Merged
merged 59 commits into from
Apr 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
d278c79
worker: move relay related function together
lance6716 Mar 11, 2021
86df82e
Merge branch 'master' into refine-relay
lance6716 Mar 11, 2021
8d5db03
try fix CI
lance6716 Mar 11, 2021
ce9ec0d
fix some CI
lance6716 Mar 11, 2021
8ba22e1
fix CI
lance6716 Mar 12, 2021
82dfc73
remove some comment
lance6716 Mar 12, 2021
77cc17e
dont shadow
lance6716 Mar 12, 2021
5d40ea4
Merge branch 'master' of https://github.com/pingcap/dm into refine-relay
lance6716 Mar 12, 2021
b4b1c4b
worker: move subtask related function together
lance6716 Mar 12, 2021
55d27d4
worker: enable relay and handling subtask on demand
lance6716 Mar 12, 2021
890a79a
fix mistakenly set `w.closed`
lance6716 Mar 12, 2021
85e6ab9
split call to EnabledXXX
lance6716 Mar 13, 2021
7be2b78
Merge branch 'master' into disable-relay-subtask
lance6716 Mar 13, 2021
6789011
fix some concurrent accessing
lance6716 Mar 13, 2021
df6614c
try fix unstable test
lance6716 Mar 13, 2021
e2ed64b
fix some tests
lance6716 Mar 14, 2021
841fe25
fix CI
lance6716 Mar 14, 2021
2b64812
reduce some TODO
lance6716 Mar 14, 2021
0b24d22
add test
lance6716 Mar 14, 2021
fecd526
improve unit test stability
lance6716 Mar 15, 2021
9cef9fd
Merge remote-tracking branch 'upstream/master' into disable-relay-sub…
lance6716 Mar 17, 2021
5fca007
Merge branch 'master' of https://github.com/pingcap/dm into disable-r…
lance6716 Mar 18, 2021
3f2ddde
Merge branch 'master' into disable-relay-subtask
lance6716 Mar 22, 2021
8821a40
worker, ha: separate etcd message for subtask and relay
lance6716 Mar 15, 2021
f077964
we enabled relay for every source in last commit, fix it
lance6716 Mar 16, 2021
ac1ebbc
fix checkpoint wasn't flush
lance6716 Mar 22, 2021
e59bbfa
fix etcd port conflict
lance6716 Mar 22, 2021
cbf709d
leave some problems to be fixed in future
lance6716 Mar 22, 2021
7a0b1cc
Merge branch 'master' of https://github.com/pingcap/dm into observe-r…
lance6716 Mar 24, 2021
5395abb
*: add start-relay/stop-relay command
lance6716 Mar 17, 2021
2d75c11
fix CI
lance6716 Mar 22, 2021
4a1989d
fix unstable test
lance6716 Mar 22, 2021
2b8c245
fix github CI
lance6716 Mar 24, 2021
7540cdd
Merge branch 'master' of github.com:pingcap/dm into start-stop-relay
lance6716 Mar 29, 2021
6e0b336
try resolve confilct
lance6716 Mar 29, 2021
9e89958
fix CI
lance6716 Mar 29, 2021
01cd66a
*: `query-status -s` and purge-relay send to all relay worker
lance6716 Mar 24, 2021
734ecef
*: add test
lance6716 Mar 25, 2021
6f65c36
fix CI
lance6716 Mar 25, 2021
502a0ca
fix unit test
lance6716 Mar 25, 2021
6c12434
fix more
lance6716 Mar 25, 2021
27f98fc
revert some changes
lance6716 Mar 25, 2021
691ddea
fix CI
lance6716 Mar 25, 2021
73dd7b5
try debug CI
lance6716 Mar 25, 2021
49af33d
fix CI
lance6716 Mar 26, 2021
9d79abc
fix unstable behaviour
lance6716 Mar 26, 2021
be60757
Merge branch 'master' into finish-new-relay
lance6716 Mar 30, 2021
99fd149
address comments from previous PR, and refine comment
lance6716 Mar 30, 2021
ffb020e
Merge branch 'master' into finish-new-relay
lance6716 Mar 30, 2021
b5f5706
debug CI
lance6716 Mar 31, 2021
57addc1
test if all relay workers are down, non-relay workers could sync
lance6716 Mar 31, 2021
4f66d99
remove debug log
lance6716 Mar 31, 2021
1b6e647
Merge branch 'master' of github.com:pingcap/dm into finish-new-relay
lance6716 Apr 6, 2021
8ed8363
fix history worker is prior to free relay worker
lance6716 Apr 6, 2021
24d118b
Merge branch 'master' into finish-new-relay
GMHDBJD Apr 7, 2021
72a4fcf
Merge branch 'master' of github.com:pingcap/dm into finish-new-relay
lance6716 Apr 8, 2021
85c6c4a
refine log
lance6716 Apr 7, 2021
3748f6c
fix too long keepalive
lance6716 Apr 8, 2021
0e08bad
try fix components closeing order
lance6716 Apr 8, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions cmd/dm-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ import (
"strings"
"syscall"

"github.com/pingcap/errors"
globalLog "github.com/pingcap/log"
"go.uber.org/zap"

"github.com/pingcap/dm/dm/ctl/common"
"github.com/pingcap/dm/dm/worker"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"
globalLog "github.com/pingcap/log"

"github.com/pingcap/errors"
"go.uber.org/zap"
)

func main() {
Expand Down
6 changes: 6 additions & 0 deletions dm/config/source_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import (
"io/ioutil"
"math"
"math/rand"
"path/filepath"
"strings"
"time"

"github.com/BurntSushi/toml"
"github.com/siddontang/go-mysql/mysql"
Expand Down Expand Up @@ -255,6 +257,9 @@ func (c *SourceConfig) Adjust(ctx context.Context, db *sql.DB) (err error) {
if len(c.RelayDir) == 0 {
c.RelayDir = defaultRelayDir
}
if filepath.IsAbs(c.RelayDir) {
log.L().Warn("using an absolute relay path, relay log can't work when starting multiple relay worker")
}

return nil
}
Expand Down Expand Up @@ -291,6 +296,7 @@ func (c *SourceConfig) AdjustServerID(ctx context.Context, db *sql.DB) error {
return terror.WithScope(err, terror.ScopeUpstream)
}

rand.Seed(time.Now().UnixNano())
for i := 0; i < 5; i++ {
randomValue := uint32(rand.Intn(100000))
randomServerID := defaultBaseServerID + randomValue
Expand Down
94 changes: 81 additions & 13 deletions dm/master/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -932,6 +932,32 @@ func (s *Scheduler) StopRelay(source string, workers []string) error {
return nil
}

// GetRelayWorkers returns all alive worker instances for a relay source
func (s *Scheduler) GetRelayWorkers(source string) ([]*Worker, error) {
s.mu.RLock()
defer s.mu.RUnlock()

if !s.started {
return nil, terror.ErrSchedulerNotStarted.Generate()
}

workers := s.relayWorkers[source]
var ret []*Worker
for w := range workers {
worker, ok := s.workers[w]
if !ok {
// should not happen
s.logger.Error("worker instance for relay worker not found", zap.String("worker", w))
continue
}
ret = append(ret, worker)
}
sort.Slice(ret, func(i, j int) bool {
return ret[i].baseInfo.Name < ret[j].baseInfo.Name
})
return ret, nil
}

// UpdateExpectRelayStage updates the current expect relay stage.
// now, only support updates:
// - from `Running` to `Paused`.
Expand Down Expand Up @@ -1475,10 +1501,17 @@ func (s *Scheduler) tryBoundForWorker(w *Worker) (bounded bool, err error) {
}

// try to find its relay source (currently only one relay source)
if source == "" {
if source != "" {
s.logger.Info("found history source when worker bound",
zap.String("worker", w.BaseInfo().Name),
zap.String("source", source))
} else {
for source2, workers := range s.relayWorkers {
if _, ok2 := workers[w.BaseInfo().Name]; ok2 {
source = source2
s.logger.Info("found relay source when worker bound",
zap.String("worker", w.BaseInfo().Name),
zap.String("source", source))
break
}
}
Expand All @@ -1498,6 +1531,9 @@ func (s *Scheduler) tryBoundForWorker(w *Worker) (bounded bool, err error) {
// randomly pick one from unbounds
if source == "" {
for source = range s.unbounds {
s.logger.Info("found unbound source when worker bound",
zap.String("worker", w.BaseInfo().Name),
zap.String("source", source))
break // got a source.
}
}
Expand Down Expand Up @@ -1529,40 +1565,72 @@ func (s *Scheduler) tryBoundForWorker(w *Worker) (bounded bool, err error) {
// returns (true, nil) after bounded.
// called should update the s.unbounds
func (s *Scheduler) tryBoundForSource(source string) (bool, error) {
// 1. try to find history workers...
var worker *Worker
for workerName, bound := range s.lastBound {
if bound.Source == source {
w, ok := s.workers[workerName]
if !ok {
// a not found worker
continue
}
if w.Stage() == WorkerFree {
worker = w
break
relayWorkers := s.relayWorkers[source]
// 1. try to find a history worker in relay workers...
if len(relayWorkers) > 0 {
for workerName, bound := range s.lastBound {
if bound.Source == source {
w, ok := s.workers[workerName]
if !ok {
// a not found worker
continue
}
if _, ok2 := relayWorkers[workerName]; ok2 && w.Stage() == WorkerFree {
worker = w
s.logger.Info("found history relay worker when source bound",
zap.String("worker", workerName),
zap.String("source", source))
break
}
}
}
}
// then a relay worker for this source...
if worker == nil {
for workerName := range s.relayWorkers[source] {
for workerName := range relayWorkers {
w, ok := s.workers[workerName]
if !ok {
// a not found worker, should not happened
s.logger.Warn("worker instance not found for relay worker", zap.String("worker", workerName))
continue
}
if w.Stage() == WorkerFree {
worker = w
s.logger.Info("found relay worker when source bound",
zap.String("worker", workerName),
zap.String("source", source))
break
}
}
}
// then a history worker for this source...
if worker == nil {
for workerName, bound := range s.lastBound {
if bound.Source == source {
w, ok := s.workers[workerName]
if !ok {
// a not found worker
continue
}
if w.Stage() == WorkerFree {
worker = w
s.logger.Info("found history worker when source bound",
zap.String("worker", workerName),
zap.String("source", source))
break
}
}
}
}
// and then a random Free worker.
if worker == nil {
for _, w := range s.workers {
if w.Stage() == WorkerFree {
worker = w
s.logger.Info("found free worker when source bound",
zap.String("worker", w.BaseInfo().Name),
zap.String("source", source))
break
}
}
Expand Down
23 changes: 23 additions & 0 deletions dm/master/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1164,6 +1164,12 @@ func (t *testScheduler) TestStartStopSource(c *C) {
// test not exist source
c.Assert(terror.ErrSchedulerSourceCfgNotExist.Equal(s.StartRelay(sourceID3, []string{workerName1})), IsTrue)
c.Assert(terror.ErrSchedulerSourceCfgNotExist.Equal(s.StopRelay(sourceID4, []string{workerName1})), IsTrue)
noWorkerSources := []string{sourceID1, sourceID2, sourceID3, sourceID4}
for _, source := range noWorkerSources {
workers, err := s.GetRelayWorkers(source)
c.Assert(err, IsNil)
c.Assert(workers, HasLen, 0)
}

// start-relay success on bound-same-source and free worker
c.Assert(s.StartRelay(sourceID1, []string{workerName1}), IsNil)
Expand All @@ -1175,6 +1181,9 @@ func (t *testScheduler) TestStartStopSource(c *C) {
c.Assert(s.relayWorkers[sourceID1], HasLen, 2)
c.Assert(s.relayWorkers[sourceID1], HasKey, workerName1)
c.Assert(s.relayWorkers[sourceID1], HasKey, workerName3)
workers, err := s.GetRelayWorkers(sourceID1)
c.Assert(err, IsNil)
c.Assert(workers, DeepEquals, []*Worker{worker1, worker3})

// failed on bound-not-same-source worker and not exist worker
c.Assert(terror.ErrSchedulerRelayWorkersWrongBound.Equal(s.StartRelay(sourceID1, []string{workerName2})), IsTrue)
Expand All @@ -1189,11 +1198,22 @@ func (t *testScheduler) TestStartStopSource(c *C) {
c.Assert(s.expectRelayStages, HasKey, sourceID2)
c.Assert(s.relayWorkers[sourceID2], HasLen, 1)
c.Assert(s.relayWorkers[sourceID2], HasKey, workerName2)
workers, err = s.GetRelayWorkers(sourceID2)
c.Assert(err, IsNil)
c.Assert(workers, DeepEquals, []*Worker{worker2})

// failed on not-same-source worker and not exist worker
c.Assert(terror.ErrSchedulerRelayWorkersWrongRelay.Equal(s.StopRelay(sourceID1, []string{workerName2})), IsTrue)
c.Assert(terror.ErrSchedulerWorkerNotExist.Equal(s.StopRelay(sourceID1, []string{"not-exist"})), IsTrue)

// nothing changed
workers, err = s.GetRelayWorkers(sourceID1)
c.Assert(err, IsNil)
c.Assert(workers, DeepEquals, []*Worker{worker1, worker3})
workers, err = s.GetRelayWorkers(sourceID2)
c.Assert(err, IsNil)
c.Assert(workers, DeepEquals, []*Worker{worker2})

// stop-relay success
c.Assert(s.StopRelay(sourceID1, []string{workerName1}), IsNil)
c.Assert(s.StopRelay(sourceID1, []string{workerName1}), IsNil)
Expand All @@ -1202,4 +1222,7 @@ func (t *testScheduler) TestStartStopSource(c *C) {
c.Assert(s.expectRelayStages, HasKey, sourceID2)
c.Assert(s.relayWorkers, HasLen, 1)
c.Assert(s.relayWorkers, HasKey, sourceID2)
workers, err = s.GetRelayWorkers(sourceID1)
c.Assert(err, IsNil)
c.Assert(workers, HasLen, 0)
}
Loading