Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
worker: sync master's endpoint peroidcally (#1139)
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <ti-srebot@pingcap.com>

Co-authored-by: lance6716 <lance6716@gmail.com>
  • Loading branch information
ti-srebot and lance6716 authored Oct 17, 2020
1 parent 76918fe commit aea4661
Show file tree
Hide file tree
Showing 8 changed files with 75 additions and 9 deletions.
43 changes: 43 additions & 0 deletions dm/worker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ var (
keepaliveTimeout = 3 * time.Second
keepaliveTime = 3 * time.Second
retryConnectSleepTime = time.Second
syncMasterEndpointsTime = 3 * time.Second
getMinPosForSubTaskFunc = getMinPosForSubTask
)

Expand Down Expand Up @@ -108,6 +109,12 @@ func (s *Server) Start() error {
return err
}

s.wg.Add(1)
go func() {
s.syncMasterEndpoints(s.ctx)
s.wg.Done()
}()

bound, sourceCfg, revBound, err := ha.GetSourceBoundConfig(s.etcdClient, s.cfg.Name)
if err != nil {
// TODO: need retry
Expand Down Expand Up @@ -177,6 +184,42 @@ func (s *Server) Start() error {
return terror.ErrWorkerStartService.Delegate(err)
}

func (s *Server) syncMasterEndpoints(ctx context.Context) {
lastClientUrls := []string{}
clientURLs := []string{}

updateF := func() {
clientURLs = clientURLs[:0]
resp, err := s.etcdClient.MemberList(ctx)
if err != nil {
log.L().Error("can't get etcd member list", zap.Error(err))
return
}

for _, m := range resp.Members {
clientURLs = append(clientURLs, m.GetClientURLs()...)
}
if utils.NonRepeatStringsEqual(clientURLs, lastClientUrls) {
log.L().Debug("etcd member list doesn't change", zap.Strings("client URLs", clientURLs))
return
}
log.L().Info("will sync endpoints to", zap.Strings("client URLs", clientURLs))
s.etcdClient.SetEndpoints(clientURLs...)
lastClientUrls = make([]string, len(clientURLs))
copy(lastClientUrls, clientURLs)
}

for {
updateF()

select {
case <-ctx.Done():
return
case <-time.After(syncMasterEndpointsTime):
}
}
}

func (s *Server) observeSourceBound(ctx context.Context, etcdCli *clientv3.Client, rev int64) error {
var wg sync.WaitGroup
for {
Expand Down
6 changes: 3 additions & 3 deletions dm/worker/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (t *testServer) TestServer(c *C) {
keepAliveTTL = int64(1)
)
etcdDir := c.MkDir()
ETCD, err := createMockETCD(etcdDir, "host://"+masterAddr)
ETCD, err := createMockETCD(etcdDir, "http://"+masterAddr)
c.Assert(err, IsNil)
defer ETCD.Close()
cfg := NewConfig()
Expand Down Expand Up @@ -230,7 +230,7 @@ func (t *testServer) TestWatchSourceBoundEtcdCompact(c *C) {
startRev = int64(1)
)
etcdDir := c.MkDir()
ETCD, err := createMockETCD(etcdDir, "host://"+masterAddr)
ETCD, err := createMockETCD(etcdDir, "http://"+masterAddr)
c.Assert(err, IsNil)
defer ETCD.Close()
cfg := NewConfig()
Expand Down Expand Up @@ -376,7 +376,7 @@ func (t *testServer) testRetryConnectMaster(c *C, s *Server, ETCD *embed.Etcd, d
// When worker server fail to keepalive with etcd, sever should close its worker
c.Assert(s.getWorker(true), IsNil)
c.Assert(s.getSourceStatus(true).Result, IsNil)
ETCD, err := createMockETCD(dir, "host://"+hostName)
ETCD, err := createMockETCD(dir, "http://"+hostName)
c.Assert(err, IsNil)
time.Sleep(3 * time.Second)
return ETCD
Expand Down
6 changes: 3 additions & 3 deletions dm/worker/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (t *testServer2) TestTaskAutoResume(c *C) {
)
hostName := "127.0.0.1:8261"
etcdDir := c.MkDir()
ETCD, err := createMockETCD(etcdDir, "host://"+hostName)
ETCD, err := createMockETCD(etcdDir, "http://"+hostName)
c.Assert(err, IsNil)
defer ETCD.Close()

Expand Down Expand Up @@ -226,7 +226,7 @@ func (t *testWorkerEtcdCompact) TestWatchSubtaskStageEtcdCompact(c *C) {
startRev = int64(1)
)
etcdDir := c.MkDir()
ETCD, err := createMockETCD(etcdDir, "host://"+masterAddr)
ETCD, err := createMockETCD(etcdDir, "http://"+masterAddr)
c.Assert(err, IsNil)
defer ETCD.Close()
cfg := NewConfig()
Expand Down Expand Up @@ -337,7 +337,7 @@ func (t *testWorkerEtcdCompact) TestWatchRelayStageEtcdCompact(c *C) {
startRev = int64(1)
)
etcdDir := c.MkDir()
ETCD, err := createMockETCD(etcdDir, "host://"+masterAddr)
ETCD, err := createMockETCD(etcdDir, "http://"+masterAddr)
c.Assert(err, IsNil)
defer ETCD.Close()
cfg := NewConfig()
Expand Down
1 change: 0 additions & 1 deletion pkg/ha/keepalive.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ func workerEventFromKey(key string) (WorkerEvent, error) {
// KeepAlive puts the join time of the workerName into etcd.
// this key will be kept in etcd until the worker is blocked or failed
// k/v: workerName -> join time.
// TODO: fetch the actual master endpoints, the master member maybe changed.
func KeepAlive(ctx context.Context, cli *clientv3.Client, workerName string, keepAliveTTL int64) error {
cliCtx, cancel := context.WithTimeout(ctx, etcdutil.DefaultRequestTimeout)
defer cancel()
Expand Down
17 changes: 17 additions & 0 deletions pkg/utils/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,3 +181,20 @@ func ExtractTaskFromLockID(lockID string) string {
}
return strs[1]
}

// NonRepeatStringsEqual is used to compare two un-ordered, non-repeat-element string slice is equal
func NonRepeatStringsEqual(a, b []string) bool {
if len(a) != len(b) {
return false
}
m := make(map[string]struct{}, len(a))
for _, s := range a {
m[s] = struct{}{}
}
for _, s := range b {
if _, ok := m[s]; !ok {
return false
}
}
return true
}
7 changes: 7 additions & 0 deletions pkg/utils/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,3 +249,10 @@ func (s *testCommonSuite) TestDDLLockID(c *C) {
// invalid ID
c.Assert(ExtractTaskFromLockID("invalid-lock-id"), Equals, "")
}

func (s *testCommonSuite) TestNonRepeatStringsEqual(c *C) {
c.Assert(NonRepeatStringsEqual([]string{}, []string{}), IsTrue)
c.Assert(NonRepeatStringsEqual([]string{"1", "2"}, []string{"2", "1"}), IsTrue)
c.Assert(NonRepeatStringsEqual([]string{}, []string{"1"}), IsFalse)
c.Assert(NonRepeatStringsEqual([]string{"1", "2"}, []string{"2", "3"}), IsFalse)
}
2 changes: 1 addition & 1 deletion tests/ha/conf/dm-worker1.toml
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
name = "worker1"
join = "127.0.0.1:8261,127.0.0.1:8361,127.0.0.1:8461,127.0.0.1:8561,127.0.0.1:8661"
join = "127.0.0.1:8261"
2 changes: 1 addition & 1 deletion tests/ha/conf/dm-worker2.toml
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
name = "worker2"
join = "127.0.0.1:8261,127.0.0.1:8361,127.0.0.1:8461,127.0.0.1:8561,127.0.0.1:8661"
join = "127.0.0.1:8261"

0 comments on commit aea4661

Please sign in to comment.