diff --git a/clientv3/watch.go b/clientv3/watch.go index 9b0cc036af3..b70d8c3bf86 100644 --- a/clientv3/watch.go +++ b/clientv3/watch.go @@ -511,6 +511,12 @@ func (w *watchGrpcStream) run() { case <-w.ctx.Done(): return case ws := <-w.closingc: + w.closeSubstream(ws) + delete(closing, ws) + if len(w.substreams)+len(w.resuming) == 0 { + // no more watchers on this stream, shutdown, skip cancellation + return + } if ws.id != -1 { // client is closing an established watch; close it on the server proactively instead of waiting // to close when the next message arrives @@ -523,12 +529,6 @@ func (w *watchGrpcStream) run() { req := &pb.WatchRequest{RequestUnion: cr} wc.Send(req) } - w.closeSubstream(ws) - delete(closing, ws) - if len(w.substreams)+len(w.resuming) == 0 { - // no more watchers on this stream, shutdown - return - } } } } diff --git a/integration/v3_watch_test.go b/integration/v3_watch_test.go index 2aca66605b1..042057d9a71 100644 --- a/integration/v3_watch_test.go +++ b/integration/v3_watch_test.go @@ -1233,3 +1233,32 @@ func TestV3WatchCancellation(t *testing.T) { t.Fatalf("expected one watch, got %s", minWatches) } } + +// TestV3WatchCloseCancelRace ensures that watch close doesn't decrement the watcher total too far.
+func TestV3WatchCloseCancelRace(t *testing.T) { + clus := NewClusterV3(t, &ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + cli := clus.RandClient() + + for i := 0; i < 1000; i++ { + ctx, cancel := context.WithCancel(ctx) + cli.Watch(ctx, "/foo") + cancel() + } + + // Wait a little for cancellations to take hold + time.Sleep(3 * time.Second) + + minWatches, err := clus.Members[0].Metric("etcd_debugging_mvcc_watcher_total") + if err != nil { + t.Fatal(err) + } + + if minWatches != "0" { + t.Fatalf("expected zero watches, got %s", minWatches) + } +} diff --git a/mvcc/watchable_store.go b/mvcc/watchable_store.go index 78df19326b9..9ac418205c0 100644 --- a/mvcc/watchable_store.go +++ b/mvcc/watchable_store.go @@ -146,10 +146,13 @@ func (s *watchableStore) cancelWatcher(wa *watcher) { s.mu.Lock() if s.unsynced.delete(wa) { slowWatcherGauge.Dec() + watcherGauge.Dec() break } else if s.synced.delete(wa) { + watcherGauge.Dec() break } else if wa.compacted { + watcherGauge.Dec() break } else if wa.ch == nil { // already canceled (e.g., cancel/close race) @@ -169,6 +172,7 @@ func (s *watchableStore) cancelWatcher(wa *watcher) { } if victimBatch != nil { slowWatcherGauge.Dec() + watcherGauge.Dec() delete(victimBatch, wa) break } @@ -178,7 +182,6 @@ func (s *watchableStore) cancelWatcher(wa *watcher) { time.Sleep(time.Millisecond) } - watcherGauge.Dec() wa.ch = nil s.mu.Unlock() }