diff --git a/pkg/integration_test/leader_watch_test.go b/pkg/integration_test/leader_watch_test.go index 350a4f12477..9c88c814de9 100644 --- a/pkg/integration_test/leader_watch_test.go +++ b/pkg/integration_test/leader_watch_test.go @@ -25,7 +25,7 @@ import ( func (s *integrationTestSuite) TestWatcher(c *C) { c.Parallel() - cluster, err := newTestCluster(1) + cluster, err := newTestCluster(1, func(conf *server.Config) { conf.AutoCompactionRetention = "1s" }) c.Assert(err, IsNil) defer cluster.Destroy() diff --git a/server/leader.go b/server/leader.go index 2f7e2b974e2..132c3f158b0 100644 --- a/server/leader.go +++ b/server/leader.go @@ -299,14 +299,17 @@ func (s *Server) watchLeader(leader *pdpb.Member, revision int64) { ctx, cancel := context.WithCancel(s.serverLoopCtx) defer cancel() + // The revision is the revision of last modification on this key. + // If the revision is compacted, will meet required revision has been compacted error. + // In this case, use the compact revision to re-watch the key. for { // gofail: var delayWatcher struct{} rch := watcher.Watch(ctx, s.getLeaderPath(), clientv3.WithRev(revision)) for wresp := range rch { - // meet compacted error, use current revision. + // meet compacted error, use the compact revision. if wresp.CompactRevision != 0 { - log.Warnf("required revision %d has been compacted, use current revision", revision) - revision = 0 + log.Warnf("required revision %d has been compacted, use the compact revision %d", revision, wresp.CompactRevision) + revision = wresp.CompactRevision break } if wresp.Canceled {