Skip to content

Commit

Permalink
etcd_worker: fix missed watch events caused by progress notifications (
Browse files Browse the repository at this point in the history
…pingcap#3848)

(cherry picked from commit 06547d9)
  • Loading branch information
liuzix authored and asddongmen committed Dec 20, 2021
1 parent 16d6a1b commit f38f2cf
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 1 deletion.
2 changes: 1 addition & 1 deletion pkg/etcd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchR
return
case response := <-watchCh:
lastReceivedResponseTime = c.clock.Now()
if response.Err() == nil {
if response.Err() == nil && !response.IsProgressNotify() {
lastRevision = response.Header.Revision
}

Expand Down
7 changes: 7 additions & 0 deletions pkg/orchestrator/etcd_worker_bank_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"github.com/pingcap/check"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/orchestrator/util"
Expand Down Expand Up @@ -121,6 +122,12 @@ func (b *bankReactor) Tick(ctx context.Context, state ReactorState) (nextState R

func (s *etcdWorkerSuite) TestEtcdBank(c *check.C) {
defer testleak.AfterTest(c)()

_ = failpoint.Enable("github.com/pingcap/ticdc/pkg/orchestrator/InjectProgressRequestAfterCommit", "10%return(true)")
defer func() {
_ = failpoint.Disable("github.com/pingcap/ticdc/pkg/orchestrator/InjectProgressRequestAfterCommit")
}()

totalAccountNumber := 25
workerNumber := 10
var wg sync.WaitGroup
Expand Down

0 comments on commit f38f2cf

Please sign in to comment.