Skip to content

Commit

Permalink
etcd_worker: add timeout for etcd txn and watchCh (pingcap#3667)
Browse files Browse the repository at this point in the history
  • Loading branch information
asddongmen authored and okJiang committed Dec 8, 2021
1 parent b745268 commit 7a5f991
Show file tree
Hide file tree
Showing 3 changed files with 234 additions and 25 deletions.
79 changes: 77 additions & 2 deletions pkg/etcd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ package etcd

import (
"context"
"time"

"github.com/benbjohnson/clock"
"github.com/pingcap/errors"
"github.com/pingcap/log"
cerrors "github.com/pingcap/ticdc/pkg/errors"
Expand All @@ -41,6 +43,14 @@ const (
backoffBaseDelayInMs = 500
// in previous/backoff retry pkg, the DefaultMaxInterval = 60 * time.Second
backoffMaxDelayInMs = 60 * 1000
// If no msg comes from a etcd watchCh for etcdWatchChTimeoutDuration long,
// we should cancel the watchCh and request a new watchCh from etcd client
etcdWatchChTimeoutDuration = 10 * time.Second
// If no msg comes from a etcd watchCh for etcdRequestProgressDuration long,
// we should call RequestProgress of etcd client
etcdRequestProgressDuration = 1 * time.Second
// etcdWatchChBufferSize is arbitrarily specified, it will be modified in the future
etcdWatchChBufferSize = 16
)

// set to var instead of const for mocking the value to speedup test
Expand All @@ -50,11 +60,13 @@ var maxTries int64 = 8
type Client struct {
cli *clientv3.Client
metrics map[string]prometheus.Counter
// clock is for making it easier to mock time-related data structures in unit tests
clock clock.Clock
}

// Wrap warps a clientv3.Client that provides etcd APIs required by TiCDC.
func Wrap(cli *clientv3.Client, metrics map[string]prometheus.Counter) *Client {
return &Client{cli: cli, metrics: metrics}
return &Client{cli: cli, metrics: metrics, clock: clock.New()}
}

// Unwrap returns a clientv3.Client
Expand Down Expand Up @@ -165,7 +177,70 @@ func (c *Client) TimeToLive(ctx context.Context, lease clientv3.LeaseID, opts ..

// Watch delegates request to clientv3.Watcher.Watch
func (c *Client) Watch(ctx context.Context, key string, opts ...clientv3.OpOption) clientv3.WatchChan {
return c.cli.Watch(ctx, key, opts...)
watchCh := make(chan clientv3.WatchResponse, etcdWatchChBufferSize)
go c.WatchWithChan(ctx, watchCh, key, opts...)
return watchCh
}

// WatchWithChan maintains a watchCh and sends all msg from the watchCh to outCh
func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchResponse, key string, opts ...clientv3.OpOption) {
defer func() {
close(outCh)
log.Info("WatchWithChan exited")
}()
var lastRevision int64
watchCtx, cancel := context.WithCancel(ctx)
defer cancel()
watchCh := c.cli.Watch(watchCtx, key, opts...)

ticker := c.clock.Ticker(etcdRequestProgressDuration)
defer ticker.Stop()
lastReceivedResponseTime := c.clock.Now()

for {
select {
case <-ctx.Done():
cancel()
return
case response := <-watchCh:
lastReceivedResponseTime = c.clock.Now()
if response.Err() == nil {
lastRevision = response.Header.Revision
}

Loop:
// we must loop here until the response is sent to outCh
// or otherwise the response will be lost
for {
select {
case <-ctx.Done():
cancel()
return
case outCh <- response: // it may block here
break Loop
case <-ticker.C:
if c.clock.Since(lastReceivedResponseTime) >= etcdWatchChTimeoutDuration {
log.Warn("etcd client outCh blocking too long, the etcdWorker may be stuck", zap.Duration("duration", c.clock.Since(lastReceivedResponseTime)))
}
}
}

ticker.Reset(etcdRequestProgressDuration)
case <-ticker.C:
if err := c.RequestProgress(ctx); err != nil {
log.Warn("failed to request progress for etcd watcher", zap.Error(err))
}
if c.clock.Since(lastReceivedResponseTime) >= etcdWatchChTimeoutDuration {
// cancel the last cancel func to reset it
log.Warn("etcd client watchCh blocking too long, reset the watchCh", zap.Duration("duration", c.clock.Since(lastReceivedResponseTime)), zap.Stack("stack"))
cancel()
watchCtx, cancel = context.WithCancel(ctx)
watchCh = c.cli.Watch(watchCtx, key, clientv3.WithPrefix(), clientv3.WithRev(lastRevision+1))
// we need to reset lastReceivedResponseTime after reset Watch
lastReceivedResponseTime = c.clock.Now()
}
}
}
}

// RequestProgress requests a progress notify response be sent in all watch channels.
Expand Down
128 changes: 128 additions & 0 deletions pkg/etcd/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"time"

"github.com/benbjohnson/clock"
"github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/ticdc/pkg/util/testleak"
Expand Down Expand Up @@ -44,6 +45,23 @@ func (m *mockClient) Put(ctx context.Context, key, val string, opts ...clientv3.
return nil, errors.New("mock error")
}

type mockWatcher struct {
clientv3.Watcher
watchCh chan clientv3.WatchResponse
resetCount *int
requestCount *int
}

func (m mockWatcher) Watch(ctx context.Context, key string, opts ...clientv3.OpOption) clientv3.WatchChan {
*m.resetCount++
return m.watchCh
}

func (m mockWatcher) RequestProgress(ctx context.Context) error {
*m.requestCount++
return nil
}

func (s *clientSuite) TestRetry(c *check.C) {
defer testleak.AfterTest(c)()
originValue := maxTries
Expand Down Expand Up @@ -90,3 +108,113 @@ func (s *etcdSuite) TestDelegateLease(c *check.C) {
c.Assert(err, check.IsNil)
c.Assert(ttlResp.TTL, check.Equals, int64(-1))
}

// test no data lost when WatchCh blocked
func (s *etcdSuite) TestWatchChBlocked(c *check.C) {
defer testleak.AfterTest(c)()
defer s.TearDownTest(c)
cli := clientv3.NewCtxClient(context.TODO())
resetCount := 0
requestCount := 0
watchCh := make(chan clientv3.WatchResponse, 1)
watcher := mockWatcher{watchCh: watchCh, resetCount: &resetCount, requestCount: &requestCount}
cli.Watcher = watcher

sentRes := []clientv3.WatchResponse{
{CompactRevision: 1},
{CompactRevision: 2},
{CompactRevision: 3},
{CompactRevision: 4},
{CompactRevision: 5},
{CompactRevision: 6},
}

go func() {
for _, r := range sentRes {
watchCh <- r
}
}()

mockClock := clock.NewMock()
watchCli := Wrap(cli, nil)
watchCli.clock = mockClock

key := "testWatchChBlocked"
outCh := make(chan clientv3.WatchResponse, 6)
revision := int64(1)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
defer cancel()

go func() {
watchCli.WatchWithChan(ctx, outCh, key, clientv3.WithPrefix(), clientv3.WithRev(revision))
}()
receivedRes := make([]clientv3.WatchResponse, 0)
// wait for WatchWithChan set up
r := <-outCh
receivedRes = append(receivedRes, r)
// move time forward
mockClock.Add(time.Second * 30)

for r := range outCh {
receivedRes = append(receivedRes, r)
}

c.Check(sentRes, check.DeepEquals, receivedRes)
// make sure watchCh has been reset since timeout
c.Assert(*watcher.resetCount > 1, check.IsTrue)
// make sure RequestProgress has been call since timeout
c.Assert(*watcher.requestCount > 1, check.IsTrue)
// make sure etcdRequestProgressDuration is less than etcdWatchChTimeoutDuration
c.Assert(etcdRequestProgressDuration, check.Less, etcdWatchChTimeoutDuration)
}

// test no data lost when OutCh blocked
func (s *etcdSuite) TestOutChBlocked(c *check.C) {
defer testleak.AfterTest(c)()
defer s.TearDownTest(c)

cli := clientv3.NewCtxClient(context.TODO())
resetCount := 0
requestCount := 0
watchCh := make(chan clientv3.WatchResponse, 1)
watcher := mockWatcher{watchCh: watchCh, resetCount: &resetCount, requestCount: &requestCount}
cli.Watcher = watcher

mockClock := clock.NewMock()
watchCli := Wrap(cli, nil)
watchCli.clock = mockClock

sentRes := []clientv3.WatchResponse{
{CompactRevision: 1},
{CompactRevision: 2},
{CompactRevision: 3},
}

go func() {
for _, r := range sentRes {
watchCh <- r
}
}()

key := "testOutChBlocked"
outCh := make(chan clientv3.WatchResponse, 1)
revision := int64(1)

ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
defer cancel()
go func() {
watchCli.WatchWithChan(ctx, outCh, key, clientv3.WithPrefix(), clientv3.WithRev(revision))
}()
receivedRes := make([]clientv3.WatchResponse, 0)
// wait for WatchWithChan set up
r := <-outCh
receivedRes = append(receivedRes, r)
// move time forward
mockClock.Add(time.Second * 30)

for r := range outCh {
receivedRes = append(receivedRes, r)
}

c.Check(sentRes, check.DeepEquals, receivedRes)
}
52 changes: 29 additions & 23 deletions pkg/orchestrator/etcd_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,13 @@ import (
)

const (
etcdRequestProgressDuration = 2 * time.Second
deletionCounterKey = "/meta/ticdc-delete-etcd-key-count"
// etcdTxnTimeoutDuration represents the timeout duration for committing a
// transaction to Etcd
etcdTxnTimeoutDuration = 30 * time.Second
// etcdWorkerLogsWarnDuration when EtcdWorker commits a txn to etcd or ticks
// it reactor takes more than etcdWorkerLogsWarnDuration, it will print a log
etcdWorkerLogsWarnDuration = 1 * time.Second
deletionCounterKey = "/meta/ticdc-delete-etcd-key-count"
)

// EtcdWorker handles all interactions with Etcd
Expand Down Expand Up @@ -64,7 +69,8 @@ type EtcdWorker struct {
// a `compare-and-swap` semantics, which is essential for implementing
// snapshot isolation for Reactor ticks.
deleteCounter int64
metrics *etcdWorkerMetrics

metrics *etcdWorkerMetrics
}

type etcdWorkerMetrics struct {
Expand Down Expand Up @@ -121,13 +127,13 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session,
return errors.Trace(err)
}

ctx1, cancel := context.WithCancel(ctx)
defer cancel()

ticker := time.NewTicker(timerInterval)
defer ticker.Stop()

watchCh := worker.client.Watch(ctx1, worker.prefix.String(), clientv3.WithPrefix(), clientv3.WithRev(worker.revision+1))
watchCtx, cancel := context.WithCancel(ctx)
defer cancel()
watchCh := worker.client.Watch(watchCtx, worker.prefix.String(), clientv3.WithPrefix(), clientv3.WithRev(worker.revision+1))

var (
pendingPatches [][]DataPatch
exiting bool
Expand All @@ -139,7 +145,6 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session,
// should never be closed
sessionDone = make(chan struct{})
}
lastReceivedEventTime := time.Now()

// tickRate represents the number of times EtcdWorker can tick
// the reactor per second
Expand All @@ -153,17 +158,11 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session,
return cerrors.ErrEtcdSessionDone.GenWithStackByArgs()
case <-ticker.C:
// There is no new event to handle on timer ticks, so we have nothing here.
if time.Since(lastReceivedEventTime) > etcdRequestProgressDuration {
if err := worker.client.RequestProgress(ctx); err != nil {
log.Warn("failed to request progress for etcd watcher", zap.Error(err))
}
}
case response := <-watchCh:
// In this select case, we receive new events from Etcd, and call handleEvent if appropriate.
if err := response.Err(); err != nil {
return errors.Trace(err)
}
lastReceivedEventTime = time.Now()
// Check whether the response is stale.
if worker.revision >= response.Header.GetRevision() {
continue
Expand Down Expand Up @@ -216,11 +215,11 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session,
startTime := time.Now()
// it is safe that a batch of updates has been applied to worker.state before worker.reactor.Tick
nextState, err := worker.reactor.Tick(ctx, worker.state)
costTime := time.Since(startTime).Seconds()
if costTime > time.Second.Seconds()*1 {
log.Warn("etcdWorker ticks reactor cost time more than 1 second")
costTime := time.Since(startTime)
if costTime > etcdWorkerLogsWarnDuration {
log.Warn("EtcdWorker reactor tick took too long", zap.Duration("duration", costTime))
}
worker.metrics.metricEtcdWorkerTickDuration.Observe(costTime)
worker.metrics.metricEtcdWorkerTickDuration.Observe(costTime.Seconds())
if err != nil {
if !cerrors.ErrReactorFinished.Equal(errors.Cause(err)) {
return errors.Trace(err)
Expand Down Expand Up @@ -323,6 +322,10 @@ func (worker *EtcdWorker) applyPatchGroups(ctx context.Context, patchGroups [][]
}

func (worker *EtcdWorker) commitChangedState(ctx context.Context, changedState map[util.EtcdKey][]byte, size int) error {
if len(changedState) == 0 {
return nil
}

cmps := make([]clientv3.Cmp, 0, len(changedState))
ops := make([]clientv3.Op, 0, len(changedState))
hasDelete := false
Expand Down Expand Up @@ -362,12 +365,15 @@ func (worker *EtcdWorker) commitChangedState(ctx context.Context, changedState m

worker.metrics.metricEtcdTxnSize.Observe(float64(size))
startTime := time.Now()
resp, err := worker.client.Txn(ctx).If(cmps...).Then(ops...).Commit()
costTime := time.Since(startTime).Seconds()
if costTime > time.Second.Seconds()*1 {
log.Warn("etcdWorker commit etcd txn cost time more than 1 second")

txnCtx, cancel := context.WithTimeout(ctx, etcdTxnTimeoutDuration)
resp, err := worker.client.Txn(txnCtx).If(cmps...).Then(ops...).Commit()
cancel()
costTime := time.Since(startTime)
if costTime > etcdWorkerLogsWarnDuration {
log.Warn("Etcd transaction took too long", zap.Duration("duration", costTime))
}
worker.metrics.metricEtcdTxnDuration.Observe(costTime)
worker.metrics.metricEtcdTxnDuration.Observe(costTime.Seconds())
if err != nil {
return errors.Trace(err)
}
Expand Down

0 comments on commit 7a5f991

Please sign in to comment.