This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

worker, ha: increase keepalive TTL to 1 minute, and to 30 minutes if relay enabled #1405

Merged

merged 17 commits on Feb 4, 2021
19 changes: 16 additions & 3 deletions dm/worker/config.go
@@ -23,6 +23,7 @@ import (
"strings"

"github.com/BurntSushi/toml"
"github.com/pingcap/failpoint"

"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/pkg/log"
@@ -33,8 +34,18 @@ import (
// SampleConfigFile is sample config file of dm-worker
// later we can read it from dm/worker/dm-worker.toml
// and assign it to SampleConfigFile while we build dm-worker
var SampleConfigFile string
var defaultKeepAliveTTL = int64(10)
var (
SampleConfigFile string
defaultKeepAliveTTL = int64(60) // 1 minute
defaultRelayKeepAliveTTL = int64(60 * 30) // 30 minutes
)

func init() {
failpoint.Inject("defaultKeepAliveTTL", func(val failpoint.Value) {
i := val.(int)
defaultKeepAliveTTL = int64(i)
})
}

// NewConfig creates a new base config for worker.
func NewConfig() *Config {
@@ -55,6 +66,7 @@ func NewConfig() *Config {
fs.StringVar(&cfg.Join, "join", "", `join to an existing cluster (usage: dm-master cluster's "${master-addr}")`)
fs.StringVar(&cfg.Name, "name", "", "human-readable name for DM-worker member")
fs.Int64Var(&cfg.KeepAliveTTL, "keepalive-ttl", defaultKeepAliveTTL, "dm-worker's TTL for keepalive with etcd (in seconds)")
fs.Int64Var(&cfg.RelayKeepAliveTTL, "relay-keepalive-ttl", defaultRelayKeepAliveTTL, "dm-worker's TTL for keepalive with etcd when handling relay-enabled sources (in seconds)")

fs.StringVar(&cfg.SSLCA, "ssl-ca", "", "path of file that contains list of trusted SSL CAs for connection")
fs.StringVar(&cfg.SSLCert, "ssl-cert", "", "path of file that contains X509 certificate in PEM format for connection")
@@ -80,7 +92,8 @@ type Config struct {

ConfigFile string `toml:"config-file" json:"config-file"`
// TODO: in the future dm-workers should share the same TTL from dm-master
KeepAliveTTL int64 `toml:"keepalive-ttl" json:"keepalive-ttl"`
KeepAliveTTL int64 `toml:"keepalive-ttl" json:"keepalive-ttl"`
RelayKeepAliveTTL int64 `toml:"relay-keepalive-ttl" json:"relay-keepalive-ttl"`

// tls config
config.Security
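The `init()` failpoint above is what lets the integration test (see `tests/incremental_mode/run.sh` below) shrink the default TTL to 1 second via `GO_FAILPOINTS`. For the new flag itself, a minimal sketch patterned after the `cfg.Parse` calls in the tests below — whether `Parse` accepts flag-only input without a `-config` file is an assumption here:

```go
// sketch: parse both TTL flags; the new defaults (60s / 1800s) apply
// when the flags are omitted
cfg := NewConfig()
err := cfg.Parse([]string{
	"-keepalive-ttl=60",
	"-relay-keepalive-ttl=1800",
})
if err != nil {
	// handle the parse error
}
// now cfg.KeepAliveTTL == 60 and cfg.RelayKeepAliveTTL == 1800
```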
12 changes: 12 additions & 0 deletions dm/worker/join.go
@@ -118,3 +118,15 @@ func (s *Server) KeepAlive() {
}
}
}

// UpdateKeepAliveTTL updates the keepalive key with a new lease TTL in place, so that watchers do not observe a DELETE event.
// This function must not be called concurrently.
func (s *Server) UpdateKeepAliveTTL(newTTL int64) {
if ha.CurrentKeepAliveTTL == newTTL {
log.L().Info("not changing keepalive TTL, skip", zap.Int64("ttl", newTTL))
return
}
ha.CurrentKeepAliveTTL = newTTL
ha.KeepAliveUpdateCh <- newTTL
log.L().Debug("received update keepalive TTL request, should be updated soon", zap.Int64("new ttl", newTTL))
}
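Why the in-place update matters: DM-master watches this keepalive key, and a DELETE event would make it treat the worker as offline and reschedule its source. `UpdateKeepAliveTTL` therefore only signals the running `KeepAlive` loop (in `pkg/ha/keepalive.go` below) through the buffered channel instead of touching etcd directly. A sketch of the invariant, assuming a connected `*clientv3.Client` and this worker's keepalive key (`watchKeepAliveKey` is illustrative, not part of the PR):

```go
package sketch // illustrative package name

import (
	"context"

	"go.etcd.io/etcd/clientv3"

	"github.com/pingcap/dm/pkg/log"
)

// watchKeepAliveKey checks that while the TTL is rotated in place, a
// watcher sees only PUT events on the keepalive key, never a DELETE.
func watchKeepAliveKey(ctx context.Context, cli *clientv3.Client, key string) {
	for resp := range cli.Watch(ctx, key) {
		for _, ev := range resp.Events {
			if ev.Type == clientv3.EventTypeDelete {
				// a DELETE here would make DM-master consider the worker offline
				log.L().Error("unexpected DELETE during keepalive TTL update")
			}
		}
	}
}
```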
2 changes: 2 additions & 0 deletions dm/worker/server.go
@@ -385,6 +385,7 @@ func (s *Server) stopWorker(sourceID string) error {
s.Unlock()
return terror.ErrWorkerSourceNotMatch
}
s.UpdateKeepAliveTTL(s.cfg.KeepAliveTTL)
s.setWorker(nil, false)
s.Unlock()
w.Close()
@@ -588,6 +589,7 @@ func (s *Server) startWorker(cfg *config.SourceConfig) error {
return err
}
startRelay = !relayStage.IsDeleted && relayStage.Expect == pb.Stage_Running
s.UpdateKeepAliveTTL(s.cfg.RelayKeepAliveTTL)
}
go func() {
w.Start(startRelay)
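Taken together, the two call sites mean a worker holds the long relay TTL only while it handles a relay-enabled source and reverts to the regular TTL when that source is stopped. A condensed restatement (`chooseKeepAliveTTL` is a hypothetical helper, not part of this PR):

```go
// sketch: the TTL selection rule encoded by startWorker/stopWorker above
func (s *Server) chooseKeepAliveTTL(relayEnabled bool) {
	if relayEnabled {
		s.UpdateKeepAliveTTL(s.cfg.RelayKeepAliveTTL) // 30 minutes by default
	} else {
		s.UpdateKeepAliveTTL(s.cfg.KeepAliveTTL) // 1 minute by default
	}
}
```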
4 changes: 3 additions & 1 deletion dm/worker/server_test.go
@@ -109,6 +109,7 @@ func (t *testServer) TestServer(c *C) {
cfg := NewConfig()
c.Assert(cfg.Parse([]string{"-config=./dm-worker.toml"}), IsNil)
cfg.KeepAliveTTL = keepAliveTTL
cfg.RelayKeepAliveTTL = keepAliveTTL

NewRelayHolder = NewDummyRelayHolder
NewSubTask = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client) *SubTask {
@@ -238,6 +239,7 @@ func (t *testServer) TestWatchSourceBoundEtcdCompact(c *C) {
cfg := NewConfig()
c.Assert(cfg.Parse([]string{"-config=./dm-worker.toml"}), IsNil)
cfg.KeepAliveTTL = keepAliveTTL
cfg.RelayKeepAliveTTL = keepAliveTTL

s := NewServer(cfg)
etcdCli, err := clientv3.New(clientv3.Config{
@@ -375,7 +377,7 @@ func (t *testServer) testOperateWorker(c *C, s *Server, dir string, start bool)
func (t *testServer) testRetryConnectMaster(c *C, s *Server, ETCD *embed.Etcd, dir string, hostName string) *embed.Etcd {
ETCD.Close()
time.Sleep(6 * time.Second)
// When worker server fail to keepalive with etcd, sever should close its worker
// When the worker server fails to keepalive with etcd, the server should close its worker
c.Assert(s.getWorker(true), IsNil)
c.Assert(s.getSourceStatus(true).Result, IsNil)
ETCD, err := createMockETCD(dir, "http://"+hostName)
2 changes: 2 additions & 0 deletions dm/worker/worker_test.go
@@ -233,6 +233,7 @@ func (t *testWorkerEtcdCompact) TestWatchSubtaskStageEtcdCompact(c *C) {
c.Assert(cfg.Parse([]string{"-config=./dm-worker.toml"}), IsNil)
cfg.Join = masterAddr
cfg.KeepAliveTTL = keepAliveTTL
cfg.RelayKeepAliveTTL = keepAliveTTL

etcdCli, err := clientv3.New(clientv3.Config{
Endpoints: GetJoinURLs(cfg.Join),
@@ -344,6 +345,7 @@ func (t *testWorkerEtcdCompact) TestWatchRelayStageEtcdCompact(c *C) {
c.Assert(cfg.Parse([]string{"-config=./dm-worker.toml"}), IsNil)
cfg.Join = masterAddr
cfg.KeepAliveTTL = keepAliveTTL
cfg.RelayKeepAliveTTL = keepAliveTTL

etcdCli, err := clientv3.New(clientv3.Config{
Endpoints: GetJoinURLs(cfg.Join),
73 changes: 65 additions & 8 deletions pkg/ha/keepalive.go
@@ -16,6 +16,7 @@ package ha
import (
"context"
"encoding/json"
"sync/atomic"
"time"

"go.etcd.io/etcd/clientv3"
@@ -27,6 +28,13 @@ import (
"github.com/pingcap/dm/pkg/log"
)

var (
// CurrentKeepAliveTTL is the keepalive TTL currently in use; it is set to either KeepAliveTTL or RelayKeepAliveTTL
CurrentKeepAliveTTL int64
// KeepAliveUpdateCh is used to notify the keepalive loop of TTL changes, so that watchers do not see a DELETE of the old key
KeepAliveUpdateCh = make(chan int64, 10)
)

// WorkerEvent represents the PUT/DELETE keepalive event of DM-worker.
type WorkerEvent struct {
WorkerName string `json:"worker-name"` // the worker name of the worker.
@@ -72,12 +80,13 @@ func workerEventFromKey(key string) (WorkerEvent, error) {
// this key will be kept in etcd until the worker is blocked or failed
// k/v: workerName -> join time.
func KeepAlive(ctx context.Context, cli *clientv3.Client, workerName string, keepAliveTTL int64) error {
cliCtx, cancel := context.WithTimeout(ctx, etcdutil.DefaultRequestTimeout)
defer cancel()
lease, err := cli.Grant(cliCtx, keepAliveTTL)
if err != nil {
return err
// a TTL pending in KeepAliveUpdateCh takes priority over the passed-in keepAliveTTL
for len(KeepAliveUpdateCh) > 0 {
keepAliveTTL = <-KeepAliveUpdateCh
}
// although normal operation never runs concurrent KeepAlive calls, tests may, so store atomically
atomic.StoreInt64(&CurrentKeepAliveTTL, keepAliveTTL)

k := common.WorkerKeepAliveKeyAdapter.Encode(workerName)
workerEventJSON, err := WorkerEvent{
WorkerName: workerName,
@@ -86,19 +95,40 @@ func KeepAlive(ctx context.Context, cli *clientv3.Client, workerName string, keepAliveTTL int64) error {
if err != nil {
return err
}
_, err = cli.Put(cliCtx, k, workerEventJSON, clientv3.WithLease(lease.ID))

grantAndPutKV := func(k, v string, ttl int64) (clientv3.LeaseID, error) {
cliCtx, cancel := context.WithTimeout(ctx, etcdutil.DefaultRequestTimeout)
defer cancel()
lease, err := cli.Grant(cliCtx, ttl)
if err != nil {
return 0, err
}
_, err = cli.Put(cliCtx, k, v, clientv3.WithLease(lease.ID))
if err != nil {
return 0, err
}
return lease.ID, nil
}

leaseID, err := grantAndPutKV(k, workerEventJSON, keepAliveTTL)
if err != nil {
return err
}

// once we have put the key successfully, we must revoke the lease before quitting keepalive normally
defer func() {
_, err2 := revokeLease(cli, lease.ID)
_, err2 := revokeLease(cli, leaseID)
if err2 != nil {
log.L().Warn("fail to revoke lease", zap.Error(err2))
}
}()

ch, err := cli.KeepAlive(ctx, lease.ID)
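// keepAliveCancel is reassigned on TTL updates below, so defer a closure
// rather than the current function value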
keepAliveCtx, keepAliveCancel := context.WithCancel(ctx)
defer func() {
keepAliveCancel()
}()

ch, err := cli.KeepAlive(keepAliveCtx, leaseID)
if err != nil {
return err
}
@@ -107,11 +137,38 @@
case _, ok := <-ch:
if !ok {
log.L().Info("keep alive channel is closed")
keepAliveCancel() // make go vet happy
return nil
}
case <-ctx.Done():
log.L().Info("ctx is canceled, keepalive will exit now")
keepAliveCancel() // make go vet happy
return nil
case newTTL := <-KeepAliveUpdateCh:
// create a new lease with new TTL, and overwrite original KV
oldLeaseID := leaseID
leaseID, err = grantAndPutKV(k, workerEventJSON, newTTL)
if err != nil {
keepAliveCancel() // make go vet happy
return err
}

oldCancel := keepAliveCancel
keepAliveCtx, keepAliveCancel = context.WithCancel(ctx)
ch, err = cli.KeepAlive(keepAliveCtx, leaseID)
if err != nil {
log.L().Error("meet error when change keepalive TTL", zap.Error(err))
keepAliveCancel() // make go vet happy
return err
}
log.L().Info("dynamically changed keepalive TTL to", zap.Int64("ttl in seconds", newTTL))

// after the new keepalive succeeds, cancel the old one
oldCancel()
_, err2 := revokeLease(cli, oldLeaseID)
if err2 != nil {
log.L().Warn("fail to revoke lease", zap.Error(err2))
}
}
}
}
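The core of the change is an etcd lease rotation: grant a new lease carrying the new TTL, re-put the same key under it (so watchers observe a PUT rather than a DELETE), start a new keepalive stream, and only then cancel the old stream and revoke the old lease. A distilled sketch of just the rotation step, assuming a connected `*clientv3.Client` (`rotateLease` is illustrative, not part of the PR):

```go
package sketch // illustrative package name

import (
	"context"

	"go.etcd.io/etcd/clientv3"
)

// rotateLease re-binds key/val to a fresh lease carrying newTTL. The old
// lease is revoked only after the key is attached to the new lease, so the
// key never disappears from a watcher's point of view.
func rotateLease(ctx context.Context, cli *clientv3.Client, key, val string,
	oldID clientv3.LeaseID, newTTL int64) (clientv3.LeaseID, error) {
	lease, err := cli.Grant(ctx, newTTL)
	if err != nil {
		return 0, err
	}
	// overwriting the key re-attaches it to the new lease; watchers see a PUT
	if _, err = cli.Put(ctx, key, val, clientv3.WithLease(lease.ID)); err != nil {
		return 0, err
	}
	// revoking the old lease no longer deletes the key, because the key is
	// now bound to lease.ID
	if _, err = cli.Revoke(ctx, oldID); err != nil {
		return lease.ID, err
	}
	return lease.ID, nil
}
```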
3 changes: 2 additions & 1 deletion tests/dmctl_basic/conf/get_worker1.toml
@@ -7,7 +7,8 @@ join = "http://127.0.0.1:8261"
worker-addr = "0.0.0.0:8262"
advertise-addr = "127.0.0.1:8262"
config-file = "/tmp/dm_test/dmctl_basic/worker1/dm-worker.toml"
keepalive-ttl = 10
keepalive-ttl = 60
relay-keepalive-ttl = 1800
ssl-ca = ""
ssl-cert = ""
ssl-key = ""
3 changes: 2 additions & 1 deletion tests/dmctl_basic/conf/get_worker2.toml
@@ -7,7 +7,8 @@ join = "http://127.0.0.1:8261"
worker-addr = "0.0.0.0:8263"
advertise-addr = "127.0.0.1:8263"
config-file = "/tmp/dm_test/dmctl_basic/worker2/dm-worker.toml"
keepalive-ttl = 10
keepalive-ttl = 60
relay-keepalive-ttl = 1800
ssl-ca = ""
ssl-cert = ""
ssl-key = ""
27 changes: 27 additions & 0 deletions tests/incremental_mode/run.sh
@@ -15,12 +15,26 @@ function run() {
run_sql_file $cur/data/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2
check_contains 'Query OK, 3 rows affected'

export GO_FAILPOINTS="github.com/pingcap/dm/dm/worker/defaultKeepAliveTTL=return(1)"

run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml
check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT
run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT

# the failpoint shortens the keepalive TTL to 1 second, so DM-master marks the killed workers offline quickly
killall -9 dm-worker.test
sleep 3
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"list-member" \
"\"stage\": \"offline\"" 2
run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT

# operate mysql config to worker
cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml
cp $cur/conf/source2.yaml $WORK_DIR/source2.yaml
@@ -29,6 +43,19 @@ function run() {
dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1
dmctl_operate_source create $WORK_DIR/source2.yaml $SOURCE_ID2

# relay should be started after the source is bound
sleep 1
# with relay enabled the keepalive TTL is now 30 minutes, so the workers stay "bound" after being killed
killall -9 dm-worker.test
sleep 3
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"list-member" \
"\"stage\": \"bound\"" 2
run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT

# start a task in `full` mode
echo "start task in full mode"
cat $cur/conf/dm-task.yaml > $WORK_DIR/dm-task.yaml
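To double-check which TTL is actually in force after these restarts (1 second under the failpoint, 60 for a plain worker, 1800 once relay is enabled), one can inspect the lease attached to the worker's keepalive key. A hedged sketch, where `checkGrantedTTL` is an illustrative helper and the client/key are assumed to point at the test cluster's etcd and the worker's keepalive key:

```go
package sketch // illustrative package name

import (
	"context"
	"fmt"

	"go.etcd.io/etcd/clientv3"
)

// checkGrantedTTL returns the TTL granted to the lease that keeps the
// worker's keepalive key alive.
func checkGrantedTTL(ctx context.Context, cli *clientv3.Client, key string) (int64, error) {
	resp, err := cli.Get(ctx, key)
	if err != nil {
		return 0, err
	}
	if len(resp.Kvs) == 0 {
		return 0, fmt.Errorf("keepalive key %q not found (worker offline?)", key)
	}
	ttlResp, err := cli.TimeToLive(ctx, clientv3.LeaseID(resp.Kvs[0].Lease))
	if err != nil {
		return 0, err
	}
	return ttlResp.GrantedTTL, nil
}
```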