From a7caa370b53d059b69202662e21b1e805e392b7a Mon Sep 17 00:00:00 2001 From: lance6716 Date: Sat, 17 Oct 2020 07:38:13 +0800 Subject: [PATCH] worker: sync master's endpoint periodically (#1139) --- dm/worker/server.go | 43 +++++++++++++++++++++++++++++++++++ dm/worker/server_test.go | 6 ++--- dm/worker/worker_test.go | 6 ++--- pkg/ha/keepalive.go | 1 - pkg/utils/common.go | 17 ++++++++++++++ pkg/utils/common_test.go | 7 ++++++ tests/ha/conf/dm-worker1.toml | 2 +- tests/ha/conf/dm-worker2.toml | 2 +- 8 files changed, 75 insertions(+), 9 deletions(-) diff --git a/dm/worker/server.go b/dm/worker/server.go index d06bc5b98d..18c2ec9874 100644 --- a/dm/worker/server.go +++ b/dm/worker/server.go @@ -48,6 +48,7 @@ var ( keepaliveTimeout = 3 * time.Second keepaliveTime = 3 * time.Second retryConnectSleepTime = time.Second + syncMasterEndpointsTime = 3 * time.Second getMinPosForSubTaskFunc = getMinPosForSubTask ) @@ -108,6 +109,12 @@ func (s *Server) Start() error { return err } + s.wg.Add(1) + go func() { + s.syncMasterEndpoints(s.ctx) + s.wg.Done() + }() + bound, sourceCfg, revBound, err := ha.GetSourceBoundConfig(s.etcdClient, s.cfg.Name) if err != nil { // TODO: need retry @@ -177,6 +184,42 @@ func (s *Server) Start() error { return terror.ErrWorkerStartService.Delegate(err) } +func (s *Server) syncMasterEndpoints(ctx context.Context) { + lastClientUrls := []string{} + clientURLs := []string{} + + updateF := func() { + clientURLs = clientURLs[:0] + resp, err := s.etcdClient.MemberList(ctx) + if err != nil { + log.L().Error("can't get etcd member list", zap.Error(err)) + return + } + + for _, m := range resp.Members { + clientURLs = append(clientURLs, m.GetClientURLs()...) + } + if utils.NonRepeatStringsEqual(clientURLs, lastClientUrls) { + log.L().Debug("etcd member list doesn't change", zap.Strings("client URLs", clientURLs)) + return + } + log.L().Info("will sync endpoints to", zap.Strings("client URLs", clientURLs)) + s.etcdClient.SetEndpoints(clientURLs...) 
+ lastClientUrls = make([]string, len(clientURLs)) + copy(lastClientUrls, clientURLs) + } + + for { + updateF() + + select { + case <-ctx.Done(): + return + case <-time.After(syncMasterEndpointsTime): + } + } +} + func (s *Server) observeSourceBound(ctx context.Context, etcdCli *clientv3.Client, rev int64) error { var wg sync.WaitGroup for { diff --git a/dm/worker/server_test.go b/dm/worker/server_test.go index 268ae3cf92..e58066e9b0 100644 --- a/dm/worker/server_test.go +++ b/dm/worker/server_test.go @@ -101,7 +101,7 @@ func (t *testServer) TestServer(c *C) { keepAliveTTL = int64(1) ) etcdDir := c.MkDir() - ETCD, err := createMockETCD(etcdDir, "host://"+masterAddr) + ETCD, err := createMockETCD(etcdDir, "http://"+masterAddr) c.Assert(err, IsNil) defer ETCD.Close() cfg := NewConfig() @@ -230,7 +230,7 @@ func (t *testServer) TestWatchSourceBoundEtcdCompact(c *C) { startRev = int64(1) ) etcdDir := c.MkDir() - ETCD, err := createMockETCD(etcdDir, "host://"+masterAddr) + ETCD, err := createMockETCD(etcdDir, "http://"+masterAddr) c.Assert(err, IsNil) defer ETCD.Close() cfg := NewConfig() @@ -376,7 +376,7 @@ func (t *testServer) testRetryConnectMaster(c *C, s *Server, ETCD *embed.Etcd, d // When worker server fail to keepalive with etcd, sever should close its worker c.Assert(s.getWorker(true), IsNil) c.Assert(s.getSourceStatus(true).Result, IsNil) - ETCD, err := createMockETCD(dir, "host://"+hostName) + ETCD, err := createMockETCD(dir, "http://"+hostName) c.Assert(err, IsNil) time.Sleep(3 * time.Second) return ETCD diff --git a/dm/worker/worker_test.go b/dm/worker/worker_test.go index 2619242ac9..186c003a5c 100644 --- a/dm/worker/worker_test.go +++ b/dm/worker/worker_test.go @@ -106,7 +106,7 @@ func (t *testServer2) TestTaskAutoResume(c *C) { ) hostName := "127.0.0.1:8261" etcdDir := c.MkDir() - ETCD, err := createMockETCD(etcdDir, "host://"+hostName) + ETCD, err := createMockETCD(etcdDir, "http://"+hostName) c.Assert(err, IsNil) defer ETCD.Close() @@ -226,7 +226,7 @@ 
func (t *testWorkerEtcdCompact) TestWatchSubtaskStageEtcdCompact(c *C) { startRev = int64(1) ) etcdDir := c.MkDir() - ETCD, err := createMockETCD(etcdDir, "host://"+masterAddr) + ETCD, err := createMockETCD(etcdDir, "http://"+masterAddr) c.Assert(err, IsNil) defer ETCD.Close() cfg := NewConfig() @@ -337,7 +337,7 @@ func (t *testWorkerEtcdCompact) TestWatchRelayStageEtcdCompact(c *C) { startRev = int64(1) ) etcdDir := c.MkDir() - ETCD, err := createMockETCD(etcdDir, "host://"+masterAddr) + ETCD, err := createMockETCD(etcdDir, "http://"+masterAddr) c.Assert(err, IsNil) defer ETCD.Close() cfg := NewConfig() diff --git a/pkg/ha/keepalive.go b/pkg/ha/keepalive.go index 8b9fd096fe..f88ec0d46e 100644 --- a/pkg/ha/keepalive.go +++ b/pkg/ha/keepalive.go @@ -71,7 +71,6 @@ func workerEventFromKey(key string) (WorkerEvent, error) { // KeepAlive puts the join time of the workerName into etcd. // this key will be kept in etcd until the worker is blocked or failed // k/v: workerName -> join time. -// TODO: fetch the actual master endpoints, the master member maybe changed. 
func KeepAlive(ctx context.Context, cli *clientv3.Client, workerName string, keepAliveTTL int64) error { cliCtx, cancel := context.WithTimeout(ctx, etcdutil.DefaultRequestTimeout) defer cancel() diff --git a/pkg/utils/common.go b/pkg/utils/common.go index b9a09126d2..2c71779687 100644 --- a/pkg/utils/common.go +++ b/pkg/utils/common.go @@ -181,3 +181,20 @@ func ExtractTaskFromLockID(lockID string) string { } return strs[1] } + +// NonRepeatStringsEqual reports whether two unordered string slices, each without repeated elements, contain the same elements. +func NonRepeatStringsEqual(a, b []string) bool { + if len(a) != len(b) { + return false + } + m := make(map[string]struct{}, len(a)) + for _, s := range a { + m[s] = struct{}{} + } + for _, s := range b { + if _, ok := m[s]; !ok { + return false + } + } + return true +} diff --git a/pkg/utils/common_test.go b/pkg/utils/common_test.go index 0f0a92b4b0..48bb2ead0c 100644 --- a/pkg/utils/common_test.go +++ b/pkg/utils/common_test.go @@ -249,3 +249,10 @@ func (s *testCommonSuite) TestDDLLockID(c *C) { // invalid ID c.Assert(ExtractTaskFromLockID("invalid-lock-id"), Equals, "") } + +func (s *testCommonSuite) TestNonRepeatStringsEqual(c *C) { + c.Assert(NonRepeatStringsEqual([]string{}, []string{}), IsTrue) + c.Assert(NonRepeatStringsEqual([]string{"1", "2"}, []string{"2", "1"}), IsTrue) + c.Assert(NonRepeatStringsEqual([]string{}, []string{"1"}), IsFalse) + c.Assert(NonRepeatStringsEqual([]string{"1", "2"}, []string{"2", "3"}), IsFalse) +} diff --git a/tests/ha/conf/dm-worker1.toml b/tests/ha/conf/dm-worker1.toml index d5eff23660..7a72ea72bf 100644 --- a/tests/ha/conf/dm-worker1.toml +++ b/tests/ha/conf/dm-worker1.toml @@ -1,2 +1,2 @@ name = "worker1" -join = "127.0.0.1:8261,127.0.0.1:8361,127.0.0.1:8461,127.0.0.1:8561,127.0.0.1:8661" +join = "127.0.0.1:8261" diff --git a/tests/ha/conf/dm-worker2.toml b/tests/ha/conf/dm-worker2.toml index e0e3f45665..010e21c73e 100644 --- a/tests/ha/conf/dm-worker2.toml +++ b/tests/ha/conf/dm-worker2.toml @@ -1,2 
+1,2 @@ name = "worker2" -join = "127.0.0.1:8261,127.0.0.1:8361,127.0.0.1:8461,127.0.0.1:8561,127.0.0.1:8661" +join = "127.0.0.1:8261"