Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

worker, ha: increase keepalive TTL to 1 minute, and to 30 minutes if relay enabled #1405

Merged
merged 17 commits into from
Feb 4, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions dm/worker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,11 @@ import (
// SampleConfigFile is sample config file of dm-worker
// later we can read it from dm/worker/dm-worker.toml
// and assign it to SampleConfigFile while we build dm-worker
var SampleConfigFile string
var defaultKeepAliveTTL = int64(10)
var (
	// SampleConfigFile is the sample config file of dm-worker.
	SampleConfigFile string
	// defaultKeepAliveTTL is the default keepalive TTL in seconds (1 minute).
	defaultKeepAliveTTL int64 = 60
	// relayEnabledKeepAliveTTL is the longer keepalive TTL in seconds
	// (30 minutes) meant for workers that have relay enabled.
	relayEnabledKeepAliveTTL int64 = 30 * 60
)

// NewConfig creates a new base config for worker.
func NewConfig() *Config {
Expand Down
4 changes: 4 additions & 0 deletions dm/worker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,10 @@ func (s *Server) startWorker(cfg *config.SourceConfig) error {
return err
}
startRelay = !relayStage.IsDeleted && relayStage.Expect == pb.Stage_Running
// change keepalive TTL to 30 minutes if it's default value
if s.cfg.KeepAliveTTL == defaultKeepAliveTTL {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if s.cfg.KeepAliveTTL == defaultKeepAliveTTL {
if s.cfg.KeepAliveTTL < relayEnabledKeepAliveTTL {

What if I set KeepAliveTTL to 31s? 🤔

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should leave users a way to opt out of this feature, so we only increase the TTL when it is still the default value.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then I think we should add a configuration item named relayEnabledKeepAliveTTL. With the current approach, I can't set keepAliveTTL and increase relayKeepAliveTTL at the same time.

ha.NotifyKeepAliveChange(relayEnabledKeepAliveTTL)
}
}
go func() {
w.Start(startRelay)
Expand Down
40 changes: 39 additions & 1 deletion pkg/ha/keepalive.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,15 @@ import (
"github.com/pingcap/dm/pkg/log"
)

// keepAliveUpdateCh carries requested keepalive TTL values from
// NotifyKeepAliveChange to the receiver; it buffers up to 10 pending values.
var keepAliveUpdateCh = make(chan int64, 10)

// NotifyKeepAliveChange requests a dynamic change of the keepalive TTL by
// queueing newTTL on keepAliveUpdateCh. The intent is to switch the TTL
// without letting a watcher observe a DELETE of the old key.
//
// The send blocks while the channel buffer is full, i.e. when 10 earlier
// values have not been received yet.
func NotifyKeepAliveChange(newTTL int64) {
	keepAliveUpdateCh <- newTTL
}

// WorkerEvent represents the PUT/DELETE keepalive event of DM-worker.
type WorkerEvent struct {
WorkerName string `json:"worker-name"` // the worker name of the worker.
Expand Down Expand Up @@ -98,7 +107,12 @@ func KeepAlive(ctx context.Context, cli *clientv3.Client, workerName string, kee
}
}()

ch, err := cli.KeepAlive(ctx, lease.ID)
keepAliveCtx, keepAliveCancel := context.WithCancel(ctx)
defer func() {
keepAliveCancel()
}()
lichunzhu marked this conversation as resolved.
Show resolved Hide resolved

ch, err := cli.KeepAlive(keepAliveCtx, lease.ID)
if err != nil {
return err
}
Expand All @@ -112,6 +126,30 @@ func KeepAlive(ctx context.Context, cli *clientv3.Client, workerName string, kee
case <-ctx.Done():
log.L().Info("ctx is canceled, keepalive will exit now")
return nil
case newTTL := <-keepAliveUpdateCh:
// create a new lease with new TTL, and overwrite original KV
cliCtx, cancel := context.WithTimeout(ctx, etcdutil.DefaultRequestTimeout)
defer cancel()
lease, err = cli.Grant(cliCtx, newTTL)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The revokeLease function should be called for the old lease, like line 104.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The old lease will expire on its own once it is no longer kept alive, so I think revoking it is not needed.

if err != nil {
log.L().Error("meet error when change keepalive TTL", zap.Error(err))
return err
}
_, err = cli.Put(cliCtx, k, workerEventJSON, clientv3.WithLease(lease.ID))
if err != nil {
log.L().Error("meet error when change keepalive TTL", zap.Error(err))
return err
}

oldCancel := keepAliveCancel
keepAliveCtx, keepAliveCancel = context.WithCancel(ctx)
ch, err = cli.KeepAlive(keepAliveCtx, lease.ID)
if err != nil {
log.L().Error("meet error when change keepalive TTL", zap.Error(err))
return err
}
// after the new keepalive has been started successfully, we cancel the old keepalive
oldCancel()
lichunzhu marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Expand Down