etcd_worker: add timeout for etcd txn and watchCh #3667

Merged
merged 36 commits on Dec 7, 2021
Changes from 23 commits
Commits
36 commits
ccf71fc
etcd_worker: add timeout for etcd txn and watchCh
asddongmen Nov 30, 2021
4442c0a
Merge branch 'master' into fix_pd_cause_owner_stuck
asddongmen Nov 30, 2021
c4ed5bd
etcd_worker: fix typos
asddongmen Nov 30, 2021
7d14ac5
Merge remote-tracking branch 'origin/fix_pd_cause_owner_stuck' into f…
asddongmen Nov 30, 2021
8d98e75
etcd_worker: resolve comments
asddongmen Nov 30, 2021
257ed66
ectd_worker (ticdc): moves etcd watchCh related logic to etcd/client.go
asddongmen Dec 1, 2021
3b143a5
client (ticdc): fix leak test
asddongmen Dec 1, 2021
933c03c
Merge branch 'master' into fix_pd_cause_owner_stuck
asddongmen Dec 1, 2021
e04709f
Merge branch 'master' into fix_pd_cause_owner_stuck
asddongmen Dec 2, 2021
6573300
client_test: add unit test
asddongmen Dec 2, 2021
77149b4
Merge remote-tracking branch 'origin/fix_pd_cause_owner_stuck' into f…
asddongmen Dec 2, 2021
b63d1f4
client_test: refine test code
asddongmen Dec 2, 2021
26dec10
client_test: refine test code
asddongmen Dec 3, 2021
b3bacc0
client: resolve comments
asddongmen Dec 3, 2021
124e37b
Apply suggestions from code review
asddongmen Dec 3, 2021
9ea48e2
client: resolve comments
asddongmen Dec 3, 2021
7e62aa2
client: fix error
asddongmen Dec 3, 2021
2395ddd
client: resolves comments
asddongmen Dec 3, 2021
1aa93ae
client: resolves comments
asddongmen Dec 3, 2021
5b034ba
client: add unit test
asddongmen Dec 5, 2021
97d41b9
client: improvement code struct
asddongmen Dec 5, 2021
f7cfffc
Merge branch 'master' into fix_pd_cause_owner_stuck
asddongmen Dec 5, 2021
a5c0899
Merge branch 'master' into fix_pd_cause_owner_stuck
asddongmen Dec 6, 2021
589b9ff
client: improvement code struct
asddongmen Dec 6, 2021
04dd8dc
client: resolve comments
asddongmen Dec 6, 2021
25593a9
Merge remote-tracking branch 'origin/fix_pd_cause_owner_stuck' into f…
asddongmen Dec 6, 2021
e8c2a24
Merge branch 'master' into fix_pd_cause_owner_stuck
asddongmen Dec 6, 2021
fa2fbd8
client: fix error
asddongmen Dec 6, 2021
620d16c
Merge remote-tracking branch 'origin/fix_pd_cause_owner_stuck' into f…
asddongmen Dec 6, 2021
cbfbd01
Merge branch 'master' into fix_pd_cause_owner_stuck
liuzix Dec 6, 2021
0c28104
Merge branch 'master' into fix_pd_cause_owner_stuck
ti-chi-bot Dec 6, 2021
6e41c16
Merge branch 'master' into fix_pd_cause_owner_stuck
ti-chi-bot Dec 7, 2021
ef85a3c
etcd_worker: fix lint error
asddongmen Dec 7, 2021
9c3fa2e
Merge remote-tracking branch 'origin/fix_pd_cause_owner_stuck' into f…
asddongmen Dec 7, 2021
cba9d75
client: resolve comment
asddongmen Dec 7, 2021
ec31e78
Merge branch 'master' into fix_pd_cause_owner_stuck
ti-chi-bot Dec 7, 2021
80 changes: 78 additions & 2 deletions pkg/etcd/client.go
@@ -15,7 +15,9 @@ package etcd

import (
"context"
"time"

"github.com/benbjohnson/clock"
"github.com/pingcap/errors"
"github.com/pingcap/log"
cerrors "github.com/pingcap/ticdc/pkg/errors"
@@ -24,6 +26,7 @@ import (
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
"go.uber.org/zap"
"golang.org/x/time/rate"
"google.golang.org/grpc/codes"
)

@@ -41,6 +44,14 @@ const (
backoffBaseDelayInMs = 500
// in previous/backoff retry pkg, the DefaultMaxInterval = 60 * time.Second
backoffMaxDelayInMs = 60 * 1000
// If no msg comes from an etcd watchCh for longer than etcdWatchChTimeoutDuration,
// we should cancel the watchCh and request a new watchCh from the etcd client
etcdWatchChTimeoutDuration = 10 * time.Second
// If no msg comes from an etcd watchCh for longer than etcdRequestProgressDuration,
// we should call RequestProgress of the etcd client
etcdRequestProgressDuration = 1 * time.Second
// etcdWatchChBufferSize is arbitrarily chosen and may be adjusted in the future
etcdWatchChBufferSize = 16
)

// set to var instead of const for mocking the value to speedup test
@@ -50,11 +61,13 @@ var maxTries int64 = 8
type Client struct {
cli *clientv3.Client
metrics map[string]prometheus.Counter
// clock is for making it easier to mock time-related data structures in unit tests
clock clock.Clock
}

// Wrap wraps a clientv3.Client that provides etcd APIs required by TiCDC.
func Wrap(cli *clientv3.Client, metrics map[string]prometheus.Counter) *Client {
return &Client{cli: cli, metrics: metrics}
return &Client{cli: cli, metrics: metrics, clock: clock.New()}
}

// Unwrap returns a clientv3.Client
@@ -165,7 +178,70 @@ func (c *Client) TimeToLive(ctx context.Context, lease clientv3.LeaseID, opts ..

// Watch delegates request to clientv3.Watcher.Watch
func (c *Client) Watch(ctx context.Context, key string, opts ...clientv3.OpOption) clientv3.WatchChan {
return c.cli.Watch(ctx, key, opts...)
watchCh := make(chan clientv3.WatchResponse, etcdWatchChBufferSize)
go c.WatchWithChan(ctx, watchCh, key, opts...)
return watchCh
}

// WatchWithChan maintains a watchCh and sends all messages from the watchCh to outCh
func (c *Client) WatchWithChan(ctx context.Context, outCh chan clientv3.WatchResponse, key string, opts ...clientv3.OpOption) {
defer func() {
close(outCh)
log.Info("WatchWithChan exited")
}()
var lastRevision int64
watchCtx, cancel := context.WithCancel(ctx)
defer cancel()
watchCh := c.cli.Watch(watchCtx, key, opts...)

ticker := c.clock.Ticker(etcdRequestProgressDuration)
defer ticker.Stop()
// limit the rate to reset Watch
limiter := rate.NewLimiter(rate.Limit(1/etcdWatchChTimeoutDuration), 1)

lastReceivedResponseTime := c.clock.Now()

for {
select {
case <-ctx.Done():
cancel()
return
case response := <-watchCh:
lastReceivedResponseTime = c.clock.Now()
if response.Err() == nil {
lastRevision = response.Header.Revision
}
amyangfei marked this conversation as resolved.
Show resolved Hide resolved

Loop:
// we must loop here until the response is sent to outCh,
// otherwise the response will be lost
for {
select {
case <-ctx.Done():
cancel()
return
case outCh <- response: // it may block here
break Loop
case <-ticker.C:
if c.clock.Since(lastReceivedResponseTime) >= etcdWatchChTimeoutDuration {
log.Warn("etcd client outCh blocking too long, the etcdWorker may be stuck", zap.Duration("duration", c.clock.Since(lastReceivedResponseTime)))
}
}
}

ticker.Reset(etcdRequestProgressDuration)
case <-ticker.C:
if err := c.RequestProgress(ctx); err != nil {
log.Warn("failed to request progress for etcd watcher", zap.Error(err))
}
if c.clock.Since(lastReceivedResponseTime) >= etcdWatchChTimeoutDuration && limiter.Allow() {
// cancel the previous watch context and re-create the watch to reset it
log.Warn("etcd client watchCh blocking too long, reset the watchCh", zap.Duration("duration", c.clock.Since(lastReceivedResponseTime)), zap.Stack("stack"))
cancel()
watchCtx, cancel = context.WithCancel(ctx)
watchCh = c.cli.Watch(watchCtx, key, clientv3.WithPrefix(), clientv3.WithRev(lastRevision+1))
}
}
}
}

// RequestProgress requests a progress notify response be sent in all watch channels.
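
As context, the following is a minimal sketch of how a caller might consume the buffered channel returned by the wrapped Watch above. It is illustrative only; the package name, the watchPrefix helper, and the logging choices are assumptions and are not part of this PR.

package example

import (
	"context"

	"github.com/pingcap/log"
	"github.com/pingcap/ticdc/pkg/etcd"
	"go.etcd.io/etcd/clientv3"
	"go.uber.org/zap"
)

// watchPrefix drains the channel returned by the wrapped Watch. WatchWithChan
// runs in a goroutine inside Watch and closes outCh when ctx is canceled, so
// the range loop terminates cleanly.
func watchPrefix(ctx context.Context, cli *etcd.Client, prefix string) {
	for resp := range cli.Watch(ctx, prefix, clientv3.WithPrefix()) {
		if err := resp.Err(); err != nil {
			log.Warn("watch response carried an error", zap.Error(err))
			continue
		}
		for _, ev := range resp.Events {
			log.Info("etcd event",
				zap.String("key", string(ev.Kv.Key)),
				zap.String("value", string(ev.Kv.Value)))
		}
	}
}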
127 changes: 127 additions & 0 deletions pkg/etcd/client_test.go
@@ -17,6 +17,7 @@ import (
"context"
"time"

"github.com/benbjohnson/clock"
"github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/ticdc/pkg/util/testleak"
@@ -44,6 +45,23 @@ func (m *mockClient) Put(ctx context.Context, key, val string, opts ...clientv3.
return nil, errors.New("mock error")
}

type mockWatcher struct {
clientv3.Watcher
watchCh chan clientv3.WatchResponse
resetCount *int
requestCount *int
}

func (m mockWatcher) Watch(ctx context.Context, key string, opts ...clientv3.OpOption) clientv3.WatchChan {
*m.resetCount++
return m.watchCh
}

func (m mockWatcher) RequestProgress(ctx context.Context) error {
*m.requestCount++
return nil
}

func (s *clientSuite) TestRetry(c *check.C) {
defer testleak.AfterTest(c)()
originValue := maxTries
@@ -90,3 +108,112 @@ func (s *etcdSuite) TestDelegateLease(c *check.C) {
c.Assert(err, check.IsNil)
c.Assert(ttlResp.TTL, check.Equals, int64(-1))
}

// test that no data is lost when the watchCh is blocked
func (s *etcdSuite) TestWatchChBlocked(c *check.C) {
defer testleak.AfterTest(c)()
defer s.TearDownTest(c)
cli := clientv3.NewCtxClient(context.TODO())
resetCount := 0
requestCount := 0
watchCh := make(chan clientv3.WatchResponse, 1)
watcher := mockWatcher{watchCh: watchCh, resetCount: &resetCount, requestCount: &requestCount}
cli.Watcher = watcher

sentRes := []clientv3.WatchResponse{
{CompactRevision: 1},
{CompactRevision: 2},
{CompactRevision: 3},
{CompactRevision: 4},
{CompactRevision: 5},
{CompactRevision: 6},
}

go func() {
for _, r := range sentRes {
watchCh <- r
}
}()

mockClock := clock.NewMock()
watchCli := Wrap(cli, nil)
watchCli.clock = mockClock

key := "testWatchChBlocked"
outCh := make(chan clientv3.WatchResponse, 6)
revision := int64(1)
ctx, _ := context.WithTimeout(context.Background(), time.Second*2)

go func() {
watchCli.WatchWithChan(ctx, outCh, key, clientv3.WithPrefix(), clientv3.WithRev(revision))
}()
receivedRes := make([]clientv3.WatchResponse, 0)
// wait for WatchWithChan to be set up
r := <-outCh
receivedRes = append(receivedRes, r)
// move time forward
mockClock.Add(time.Second * 30)

for r := range outCh {
receivedRes = append(receivedRes, r)
}

c.Check(sentRes, check.DeepEquals, receivedRes)
// make sure watchCh has been reset due to the timeout
c.Assert(*watcher.resetCount > 1, check.IsTrue)
// make sure RequestProgress has been called due to the timeout
c.Assert(*watcher.requestCount > 1, check.IsTrue)
// make sure etcdRequestProgressDuration is less than etcdWatchChTimeoutDuration
c.Assert(etcdRequestProgressDuration, check.Less, etcdWatchChTimeoutDuration)
}

// test that no data is lost when outCh is blocked
func (s *etcdSuite) TestOutChBlocked(c *check.C) {
defer testleak.AfterTest(c)()
defer s.TearDownTest(c)

cli := clientv3.NewCtxClient(context.TODO())
resetCount := 0
requestCount := 0
watchCh := make(chan clientv3.WatchResponse, 1)
watcher := mockWatcher{watchCh: watchCh, resetCount: &resetCount, requestCount: &requestCount}
cli.Watcher = watcher

mockClock := clock.NewMock()
watchCli := Wrap(cli, nil)
watchCli.clock = mockClock

sentRes := []clientv3.WatchResponse{
{CompactRevision: 1},
{CompactRevision: 2},
{CompactRevision: 3},
}

go func() {
for _, r := range sentRes {
watchCh <- r
}
}()

key := "testOutChBlocked"
outCh := make(chan clientv3.WatchResponse, 1)
revision := int64(1)

ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
defer cancel()
go func() {
watchCli.WatchWithChan(ctx, outCh, key, clientv3.WithPrefix(), clientv3.WithRev(revision))
}()
receivedRes := make([]clientv3.WatchResponse, 0)
// wait for WatchWithChan to be set up
r := <-outCh
receivedRes = append(receivedRes, r)
// move time forward
mockClock.Add(time.Second * 30)

for r := range outCh {
receivedRes = append(receivedRes, r)
}

c.Check(sentRes, check.DeepEquals, receivedRes)
}
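
For reference, here is a minimal, self-contained sketch of the github.com/benbjohnson/clock mocking pattern these tests rely on. The ticker interval, the amount of time advanced, and the demoMockClock name are arbitrary illustrative choices, not code from this PR.

package example

import (
	"fmt"
	"time"

	"github.com/benbjohnson/clock"
)

func demoMockClock() {
	// The mock clock starts at a fixed instant and only moves when Add is called,
	// so timer-driven code can be tested deterministically and without sleeping.
	mock := clock.NewMock()
	ticker := mock.Ticker(10 * time.Second)
	defer ticker.Stop()

	done := make(chan struct{})
	go func() {
		<-ticker.C // fires only once the mock clock is advanced past the interval
		fmt.Println("tick at", mock.Now())
		close(done)
	}()

	mock.Add(30 * time.Second) // advance mock time; no real waiting happens
	<-done
}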
51 changes: 29 additions & 22 deletions pkg/orchestrator/etcd_worker.go
@@ -35,7 +35,13 @@ import (

const (
etcdRequestProgressDuration = 2 * time.Second
deletionCounterKey = "/meta/ticdc-delete-etcd-key-count"
// etcdTxnTimeoutDuration represents the timeout duration for committing a
// transaction to Etcd
etcdTxnTimeoutDuration = 30 * time.Second
// etcdWorkerLogsWarnDuration is the threshold beyond which EtcdWorker prints a
// warning log when committing a txn to etcd or ticking its reactor takes too long
etcdWorkerLogsWarnDuration = 1 * time.Second
deletionCounterKey = "/meta/ticdc-delete-etcd-key-count"
)

// EtcdWorker handles all interactions with Etcd
@@ -64,7 +70,8 @@ type EtcdWorker struct {
// a `compare-and-swap` semantics, which is essential for implementing
// snapshot isolation for Reactor ticks.
deleteCounter int64
metrics *etcdWorkerMetrics

metrics *etcdWorkerMetrics
}

type etcdWorkerMetrics struct {
@@ -121,13 +128,13 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session,
return errors.Trace(err)
}

ctx1, cancel := context.WithCancel(ctx)
defer cancel()

ticker := time.NewTicker(timerInterval)
defer ticker.Stop()

watchCh := worker.client.Watch(ctx1, worker.prefix.String(), clientv3.WithPrefix(), clientv3.WithRev(worker.revision+1))
watchCtx, cancel := context.WithCancel(ctx)
defer cancel()
watchCh := worker.client.Watch(watchCtx, worker.prefix.String(), clientv3.WithPrefix(), clientv3.WithRev(worker.revision+1))

var (
pendingPatches [][]DataPatch
exiting bool
@@ -139,7 +146,6 @@
// should never be closed
sessionDone = make(chan struct{})
}
lastReceivedEventTime := time.Now()

// tickRate represents the number of times EtcdWorker can tick
// the reactor per second
@@ -153,17 +159,11 @@
return cerrors.ErrEtcdSessionDone.GenWithStackByArgs()
case <-ticker.C:
// There is no new event to handle on timer ticks, so we have nothing here.
if time.Since(lastReceivedEventTime) > etcdRequestProgressDuration {
if err := worker.client.RequestProgress(ctx); err != nil {
log.Warn("failed to request progress for etcd watcher", zap.Error(err))
}
}
case response := <-watchCh:
// In this select case, we receive new events from Etcd, and call handleEvent if appropriate.
if err := response.Err(); err != nil {
return errors.Trace(err)
}
lastReceivedEventTime = time.Now()
// Check whether the response is stale.
if worker.revision >= response.Header.GetRevision() {
continue
@@ -216,11 +216,11 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session,
startTime := time.Now()
// it is safe that a batch of updates has been applied to worker.state before worker.reactor.Tick
nextState, err := worker.reactor.Tick(ctx, worker.state)
costTime := time.Since(startTime).Seconds()
if costTime > time.Second.Seconds()*1 {
log.Warn("etcdWorker ticks reactor cost time more than 1 second")
costTime := time.Since(startTime)
if costTime > etcdWorkerLogsWarnDuration {
log.Warn("EtcdWorker reactor tick took too long", zap.Duration("duration", costTime))
}
worker.metrics.metricEtcdWorkerTickDuration.Observe(costTime)
worker.metrics.metricEtcdWorkerTickDuration.Observe(costTime.Seconds())
if err != nil {
if !cerrors.ErrReactorFinished.Equal(errors.Cause(err)) {
return errors.Trace(err)
@@ -323,6 +323,10 @@ func (worker *EtcdWorker) applyPatchGroups(ctx context.Context, patchGroups [][]
}

func (worker *EtcdWorker) commitChangedState(ctx context.Context, changedState map[util.EtcdKey][]byte, size int) error {
if len(changedState) == 0 {
return nil
}

cmps := make([]clientv3.Cmp, 0, len(changedState))
ops := make([]clientv3.Op, 0, len(changedState))
hasDelete := false
@@ -362,12 +366,15 @@ func (worker *EtcdWorker) commitChangedState(ctx context.Context, changedState m

worker.metrics.metricEtcdTxnSize.Observe(float64(size))
startTime := time.Now()
resp, err := worker.client.Txn(ctx).If(cmps...).Then(ops...).Commit()
costTime := time.Since(startTime).Seconds()
if costTime > time.Second.Seconds()*1 {
log.Warn("etcdWorker commit etcd txn cost time more than 1 second")

txnCtx, cancel := context.WithTimeout(ctx, etcdTxnTimeoutDuration)
resp, err := worker.client.Txn(txnCtx).If(cmps...).Then(ops...).Commit()
cancel()
costTime := time.Since(startTime)
if costTime > etcdWorkerLogsWarnDuration {
log.Warn("Etcd transaction took too long", zap.Duration("duration", costTime))
}
worker.metrics.metricEtcdTxnDuration.Observe(costTime)
worker.metrics.metricEtcdTxnDuration.Observe(costTime.Seconds())
if err != nil {
return errors.Trace(err)
}
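
As background for the etcdTxnTimeoutDuration change, the following is a minimal sketch of committing an etcd transaction under a timeout, isolated from EtcdWorker. The commitWithTimeout helper, its parameters, and the error handling are illustrative assumptions; only the context.WithTimeout-around-Txn pattern mirrors the code above.

package example

import (
	"context"
	"time"

	"github.com/pingcap/errors"
	"go.etcd.io/etcd/clientv3"
)

// commitWithTimeout performs a single compare-and-swap style transaction and
// gives up after txnTimeout instead of blocking indefinitely when etcd/PD is slow.
func commitWithTimeout(ctx context.Context, cli *clientv3.Client, key, oldVal, newVal string) error {
	const txnTimeout = 30 * time.Second // mirrors etcdTxnTimeoutDuration

	txnCtx, cancel := context.WithTimeout(ctx, txnTimeout)
	defer cancel()

	resp, err := cli.Txn(txnCtx).
		If(clientv3.Compare(clientv3.Value(key), "=", oldVal)).
		Then(clientv3.OpPut(key, newVal)).
		Commit()
	if err != nil {
		return errors.Trace(err)
	}
	if !resp.Succeeded {
		return errors.New("etcd txn precondition failed")
	}
	return nil
}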