From ec13797407d8cadb6b28cc34de84e3c73a06e9ac Mon Sep 17 00:00:00 2001 From: Jack Kleeman Date: Wed, 13 May 2020 00:50:53 +0100 Subject: [PATCH] mvcc: avoid negative watcher count metrics (#11882) The watch count metrics are not robust to duplicate cancellations. These cause the count to be decremented twice, leading eventually to negative counts. We are seeing this in production. The duplicate cancellations are not themselves a big problem (except performance), but they are caused by the new proactive cancellation logic (#11850) which cancels proactively even immediately before initiating a Close, thus nearly guaranteeing a Close-cancel race, as discussed in watchable_store.go. We can avoid this in most cases by not sending a cancellation when we are going to Close. --- clientv3/watch.go | 12 ++++++------ integration/v3_watch_test.go | 29 +++++++++++++++++++++++++++++ mvcc/watchable_store.go | 5 ++++- 3 files changed, 39 insertions(+), 7 deletions(-) diff --git a/clientv3/watch.go b/clientv3/watch.go index 36bf1a71820..3612d35f165 100644 --- a/clientv3/watch.go +++ b/clientv3/watch.go @@ -650,6 +650,12 @@ func (w *watchGrpcStream) run() { return case ws := <-w.closingc: + w.closeSubstream(ws) + delete(closing, ws) + // no more watchers on this stream, shutdown, skip cancellation + if len(w.substreams)+len(w.resuming) == 0 { + return + } if ws.id != -1 { // client is closing an established watch; close it on the server proactively instead of waiting // to close when the next message arrives @@ -665,12 +671,6 @@ func (w *watchGrpcStream) run() { lg.Warning("failed to send watch cancel request", zap.Int64("watch-id", ws.id), zap.Error(err)) } } - w.closeSubstream(ws) - delete(closing, ws) - // no more watchers on this stream, shutdown - if len(w.substreams)+len(w.resuming) == 0 { - return - } } } } diff --git a/integration/v3_watch_test.go b/integration/v3_watch_test.go index de1ee4f295c..4ae371973c3 100644 --- a/integration/v3_watch_test.go +++ 
b/integration/v3_watch_test.go @@ -1245,3 +1245,32 @@ func TestV3WatchCancellation(t *testing.T) { t.Fatalf("expected one watch, got %s", minWatches) } } + +// TestV3WatchCloseCancelRace ensures that watch close doesn't decrement the watcher total too far. +func TestV3WatchCloseCancelRace(t *testing.T) { + clus := NewClusterV3(t, &ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + cli := clus.RandClient() + + for i := 0; i < 1000; i++ { + ctx, cancel := context.WithCancel(ctx) + cli.Watch(ctx, "/foo") + cancel() + } + + // Wait a little for cancellations to take hold + time.Sleep(3 * time.Second) + + minWatches, err := clus.Members[0].Metric("etcd_debugging_mvcc_watcher_total") + if err != nil { + t.Fatal(err) + } + + if minWatches != "0" { + t.Fatalf("expected zero watches, got %s", minWatches) + } +} diff --git a/mvcc/watchable_store.go b/mvcc/watchable_store.go index 15e2c55f5c2..b74fa375f95 100644 --- a/mvcc/watchable_store.go +++ b/mvcc/watchable_store.go @@ -153,10 +153,13 @@ func (s *watchableStore) cancelWatcher(wa *watcher) { s.mu.Lock() if s.unsynced.delete(wa) { slowWatcherGauge.Dec() + watcherGauge.Dec() break } else if s.synced.delete(wa) { + watcherGauge.Dec() break } else if wa.compacted { + watcherGauge.Dec() break } else if wa.ch == nil { // already canceled (e.g., cancel/close race) @@ -177,6 +180,7 @@ func (s *watchableStore) cancelWatcher(wa *watcher) { } if victimBatch != nil { slowWatcherGauge.Dec() + watcherGauge.Dec() delete(victimBatch, wa) break } @@ -186,7 +190,6 @@ func (s *watchableStore) cancelWatcher(wa *watcher) { time.Sleep(time.Millisecond) } - watcherGauge.Dec() wa.ch = nil s.mu.Unlock() }