Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

worker: sync master's endpoint peroidcally #1139

Merged
merged 13 commits into from
Oct 16, 2020
29 changes: 29 additions & 0 deletions dm/worker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ var (
keepaliveTimeout = 3 * time.Second
keepaliveTime = 3 * time.Second
retryConnectSleepTime = time.Second
syncMasterEndpointsTime = 3 * time.Second // enough?
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
getMinPosForSubTaskFunc = getMinPosForSubTask
)

Expand Down Expand Up @@ -108,6 +109,12 @@ func (s *Server) Start() error {
return err
}

s.wg.Add(1)
go func() {
s.syncMasterEndpoints(s.ctx)
s.wg.Done()
}()

bound, sourceCfg, revBound, err := ha.GetSourceBoundConfig(s.etcdClient, s.cfg.Name)
if err != nil {
// TODO: need retry
Expand Down Expand Up @@ -177,6 +184,28 @@ func (s *Server) Start() error {
return terror.ErrWorkerStartService.Delegate(err)
}

func (s *Server) syncMasterEndpoints(ctx context.Context) {
clientURLs := []string{}
for {
clientURLs = clientURLs[:0]
resp, err := s.etcdClient.MemberList(ctx)
if err != nil {
log.L().Error("can't get etcd member list", zap.Error(err))
} else {
for _, m := range resp.Members {
clientURLs = append(clientURLs, m.GetClientURLs()...)
}
log.L().Debug("will sync endpoints to", zap.Strings("client URLs", clientURLs))
s.etcdClient.SetEndpoints(clientURLs...)
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved
}
select {
case <-ctx.Done():
return
case <-time.After(syncMasterEndpointsTime):
}
}
}

func (s *Server) observeSourceBound(ctx context.Context, etcdCli *clientv3.Client, rev int64) error {
var wg sync.WaitGroup
for {
Expand Down
6 changes: 3 additions & 3 deletions dm/worker/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (t *testServer) TestServer(c *C) {
keepAliveTTL = int64(1)
)
etcdDir := c.MkDir()
ETCD, err := createMockETCD(etcdDir, "host://"+masterAddr)
ETCD, err := createMockETCD(etcdDir, "http://"+masterAddr)
c.Assert(err, IsNil)
defer ETCD.Close()
cfg := NewConfig()
Expand Down Expand Up @@ -230,7 +230,7 @@ func (t *testServer) TestWatchSourceBoundEtcdCompact(c *C) {
startRev = int64(1)
)
etcdDir := c.MkDir()
ETCD, err := createMockETCD(etcdDir, "host://"+masterAddr)
ETCD, err := createMockETCD(etcdDir, "http://"+masterAddr)
c.Assert(err, IsNil)
defer ETCD.Close()
cfg := NewConfig()
Expand Down Expand Up @@ -376,7 +376,7 @@ func (t *testServer) testRetryConnectMaster(c *C, s *Server, ETCD *embed.Etcd, d
// When worker server fail to keepalive with etcd, sever should close its worker
c.Assert(s.getWorker(true), IsNil)
c.Assert(s.getSourceStatus(true).Result, IsNil)
ETCD, err := createMockETCD(dir, "host://"+hostName)
ETCD, err := createMockETCD(dir, "http://"+hostName)
c.Assert(err, IsNil)
time.Sleep(3 * time.Second)
return ETCD
Expand Down
6 changes: 3 additions & 3 deletions dm/worker/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (t *testServer2) TestTaskAutoResume(c *C) {
)
hostName := "127.0.0.1:8261"
etcdDir := c.MkDir()
ETCD, err := createMockETCD(etcdDir, "host://"+hostName)
ETCD, err := createMockETCD(etcdDir, "http://"+hostName)
c.Assert(err, IsNil)
defer ETCD.Close()

Expand Down Expand Up @@ -226,7 +226,7 @@ func (t *testWorkerEtcdCompact) TestWatchSubtaskStageEtcdCompact(c *C) {
startRev = int64(1)
)
etcdDir := c.MkDir()
ETCD, err := createMockETCD(etcdDir, "host://"+masterAddr)
ETCD, err := createMockETCD(etcdDir, "http://"+masterAddr)
c.Assert(err, IsNil)
defer ETCD.Close()
cfg := NewConfig()
Expand Down Expand Up @@ -337,7 +337,7 @@ func (t *testWorkerEtcdCompact) TestWatchRelayStageEtcdCompact(c *C) {
startRev = int64(1)
)
etcdDir := c.MkDir()
ETCD, err := createMockETCD(etcdDir, "host://"+masterAddr)
ETCD, err := createMockETCD(etcdDir, "http://"+masterAddr)
c.Assert(err, IsNil)
defer ETCD.Close()
cfg := NewConfig()
Expand Down
1 change: 0 additions & 1 deletion pkg/ha/keepalive.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ func workerEventFromKey(key string) (WorkerEvent, error) {
// KeepAlive puts the join time of the workerName into etcd.
// this key will be kept in etcd until the worker is blocked or failed
// k/v: workerName -> join time.
// TODO: fetch the actual master endpoints, the master member maybe changed.
func KeepAlive(ctx context.Context, cli *clientv3.Client, workerName string, keepAliveTTL int64) error {
cliCtx, cancel := context.WithTimeout(ctx, etcdutil.DefaultRequestTimeout)
defer cancel()
Expand Down
2 changes: 1 addition & 1 deletion tests/ha/conf/dm-worker1.toml
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
name = "worker1"
join = "127.0.0.1:8261,127.0.0.1:8361,127.0.0.1:8461,127.0.0.1:8561,127.0.0.1:8661"
join = "127.0.0.1:8261"
2 changes: 1 addition & 1 deletion tests/ha/conf/dm-worker2.toml
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
name = "worker2"
join = "127.0.0.1:8261,127.0.0.1:8361,127.0.0.1:8461,127.0.0.1:8561,127.0.0.1:8661"
join = "127.0.0.1:8261"