Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

etcd_worker: add timeout for etcd txn and watchCh #3667

Merged
merged 36 commits into from
Dec 7, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
ccf71fc
etcd_worker: add timeout for etcd txn and watchCh
asddongmen Nov 30, 2021
4442c0a
Merge branch 'master' into fix_pd_cause_owner_stuck
asddongmen Nov 30, 2021
c4ed5bd
etcd_worker: fix typos
asddongmen Nov 30, 2021
7d14ac5
Merge remote-tracking branch 'origin/fix_pd_cause_owner_stuck' into f…
asddongmen Nov 30, 2021
8d98e75
etcd_worker: resolve comments
asddongmen Nov 30, 2021
257ed66
etcd_worker (ticdc): move etcd watchCh related logic to etcd/client.go
asddongmen Dec 1, 2021
3b143a5
client (ticdc): fix leak test
asddongmen Dec 1, 2021
933c03c
Merge branch 'master' into fix_pd_cause_owner_stuck
asddongmen Dec 1, 2021
e04709f
Merge branch 'master' into fix_pd_cause_owner_stuck
asddongmen Dec 2, 2021
6573300
client_test: add unit test
asddongmen Dec 2, 2021
77149b4
Merge remote-tracking branch 'origin/fix_pd_cause_owner_stuck' into f…
asddongmen Dec 2, 2021
b63d1f4
client_test: refine test code
asddongmen Dec 2, 2021
26dec10
client_test: refine test code
asddongmen Dec 3, 2021
b3bacc0
client: resolve comments
asddongmen Dec 3, 2021
124e37b
Apply suggestions from code review
asddongmen Dec 3, 2021
9ea48e2
client: resolve comments
asddongmen Dec 3, 2021
7e62aa2
client: fix error
asddongmen Dec 3, 2021
2395ddd
client: resolves comments
asddongmen Dec 3, 2021
1aa93ae
client: resolves comments
asddongmen Dec 3, 2021
5b034ba
client: add unit test
asddongmen Dec 5, 2021
97d41b9
client: improve code structure
asddongmen Dec 5, 2021
f7cfffc
Merge branch 'master' into fix_pd_cause_owner_stuck
asddongmen Dec 5, 2021
a5c0899
Merge branch 'master' into fix_pd_cause_owner_stuck
asddongmen Dec 6, 2021
589b9ff
client: improve code structure
asddongmen Dec 6, 2021
04dd8dc
client: resolve comments
asddongmen Dec 6, 2021
25593a9
Merge remote-tracking branch 'origin/fix_pd_cause_owner_stuck' into f…
asddongmen Dec 6, 2021
e8c2a24
Merge branch 'master' into fix_pd_cause_owner_stuck
asddongmen Dec 6, 2021
fa2fbd8
client: fix error
asddongmen Dec 6, 2021
620d16c
Merge remote-tracking branch 'origin/fix_pd_cause_owner_stuck' into f…
asddongmen Dec 6, 2021
cbfbd01
Merge branch 'master' into fix_pd_cause_owner_stuck
liuzix Dec 6, 2021
0c28104
Merge branch 'master' into fix_pd_cause_owner_stuck
ti-chi-bot Dec 6, 2021
6e41c16
Merge branch 'master' into fix_pd_cause_owner_stuck
ti-chi-bot Dec 7, 2021
ef85a3c
etcd_worker: fix lint error
asddongmen Dec 7, 2021
9c3fa2e
Merge remote-tracking branch 'origin/fix_pd_cause_owner_stuck' into f…
asddongmen Dec 7, 2021
cba9d75
client: resolve comment
asddongmen Dec 7, 2021
ec31e78
Merge branch 'master' into fix_pd_cause_owner_stuck
ti-chi-bot Dec 7, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/orchestrator/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const (
// 1.25 MiB
// Ref: https://etcd.io/docs/v3.3/dev-guide/limit/
etcdTxnMaxSize = 1024 * (1024 + 256)
etcdTxmMaxOps = 128
asddongmen marked this conversation as resolved.
Show resolved Hide resolved
)

// getBatchChangedState has 4 return values:
Expand All @@ -45,7 +46,7 @@ func getBatchChangedState(state map[util.EtcdKey][]byte, patchGroups [][]DataPat
if i == 0 && changedSize >= etcdTxnMaxSize {
return nil, 0, 0, cerrors.ErrEtcdTxnSizeExceed.GenWithStackByArgs()
}
if totalSize+changedSize >= etcdTxnMaxSize {
if totalSize+changedSize >= etcdTxnMaxSize || len(batchChangedState)+len(changedState) >= etcdTxmMaxOps {
asddongmen marked this conversation as resolved.
Show resolved Hide resolved
break
}
for k, v := range changedState {
Expand Down
22 changes: 19 additions & 3 deletions pkg/orchestrator/etcd_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import (

const (
etcdRequestProgressDuration = 2 * time.Second
etcdTxnTimeoutDuration = 30 * time.Second
etcdWatchChTimeoutDuration = 10 * time.Second
asddongmen marked this conversation as resolved.
Show resolved Hide resolved
deletionCounterKey = "/meta/ticdc-delete-etcd-key-count"
)

Expand Down Expand Up @@ -127,7 +129,10 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session,
ticker := time.NewTicker(timerInterval)
defer ticker.Stop()

watchTicker := time.NewTicker(etcdWatchChTimeoutDuration)
asddongmen marked this conversation as resolved.
Show resolved Hide resolved
defer watchTicker.Stop()
watchCh := worker.client.Watch(ctx1, worker.prefix.String(), clientv3.WithPrefix(), clientv3.WithRev(worker.revision+1))

var (
pendingPatches [][]DataPatch
exiting bool
Expand All @@ -151,6 +156,9 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session,
return ctx.Err()
case <-sessionDone:
return cerrors.ErrEtcdSessionDone.GenWithStackByArgs()
case <-watchTicker.C:
log.Warn("watchCh blocking too long, reset watchCh")
asddongmen marked this conversation as resolved.
Show resolved Hide resolved
watchCh = worker.client.Watch(ctx1, worker.prefix.String(), clientv3.WithPrefix(), clientv3.WithRev(worker.revision+1))
asddongmen marked this conversation as resolved.
Show resolved Hide resolved
case <-ticker.C:
// There is no new event to handle on timer ticks, so we have nothing here.
if time.Since(lastReceivedEventTime) > etcdRequestProgressDuration {
Expand All @@ -159,6 +167,7 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session,
}
}
case response := <-watchCh:
watchTicker.Reset(etcdWatchChTimeoutDuration)
// In this select case, we receive new events from Etcd, and call handleEvent if appropriate.
if err := response.Err(); err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -218,7 +227,7 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session,
nextState, err := worker.reactor.Tick(ctx, worker.state)
costTime := time.Since(startTime).Seconds()
if costTime > time.Second.Seconds()*1 {
log.Warn("etcdWorker ticks reactor cost time more than 1 second")
log.Warn("etcdWorker ticks reactor cost time more than 1 second", zap.Float64("cost time in seconds", costTime))
asddongmen marked this conversation as resolved.
Show resolved Hide resolved
}
worker.metrics.metricEtcdWorkerTickDuration.Observe(costTime)
if err != nil {
Expand Down Expand Up @@ -323,6 +332,10 @@ func (worker *EtcdWorker) applyPatchGroups(ctx context.Context, patchGroups [][]
}

func (worker *EtcdWorker) commitChangedState(ctx context.Context, changedState map[util.EtcdKey][]byte, size int) error {
if len(changedState) == 0 {
return nil
}

cmps := make([]clientv3.Cmp, 0, len(changedState))
ops := make([]clientv3.Op, 0, len(changedState))
hasDelete := false
Expand Down Expand Up @@ -362,10 +375,13 @@ func (worker *EtcdWorker) commitChangedState(ctx context.Context, changedState m

worker.metrics.metricEtcdTxnSize.Observe(float64(size))
startTime := time.Now()
resp, err := worker.client.Txn(ctx).If(cmps...).Then(ops...).Commit()

ctx1, cancel := context.WithTimeout(ctx, etcdTxnTimeoutDuration)
defer cancel()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

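We should probably call cancel on ctx1 immediately after the txn is committed instead of deferring it, so the resources held by the timeout context are released as soon as the operation completes. Refer to https://pkg.go.dev/context#WithTimeout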

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

resp, err := worker.client.Txn(ctx1).If(cmps...).Then(ops...).Commit()
costTime := time.Since(startTime).Seconds()
if costTime > time.Second.Seconds()*1 {
asddongmen marked this conversation as resolved.
Show resolved Hide resolved
log.Warn("etcdWorker commit etcd txn cost time more than 1 second")
log.Warn("etcdWorker commit etcd txn cost time more than 1 second", zap.Float64("cost time in seconds", costTime))
asddongmen marked this conversation as resolved.
Show resolved Hide resolved
}
worker.metrics.metricEtcdTxnDuration.Observe(costTime)
if err != nil {
Expand Down