Skip to content

Commit

Permalink
mvcc: avoid negative watcher count metrics (#11882)
Browse files Browse the repository at this point in the history
The watch count metrics are not robust to duplicate cancellations. These
cause the count to be decremented twice, leading eventually to negative
counts. We are seeing this in production. The duplicate cancellations
themselves are not themselves a big problem (except performance), but
they are caused by the new proactive cancellation logic (#11850) which
cancels proactively even immediately before initiating a Close, thus
nearly guaranteeing a Close-cancel race, as discussed in
watchable_store.go. We can avoid this in most cases by not sending a
cancellation when we are going to Close.
  • Loading branch information
jackkleeman authored May 12, 2020
1 parent ab49495 commit ec13797
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 7 deletions.
12 changes: 6 additions & 6 deletions clientv3/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,12 @@ func (w *watchGrpcStream) run() {
return

case ws := <-w.closingc:
w.closeSubstream(ws)
delete(closing, ws)
// no more watchers on this stream, shutdown, skip cancellation
if len(w.substreams)+len(w.resuming) == 0 {
return
}
if ws.id != -1 {
// client is closing an established watch; close it on the server proactively instead of waiting
// to close when the next message arrives
Expand All @@ -665,12 +671,6 @@ func (w *watchGrpcStream) run() {
lg.Warning("failed to send watch cancel request", zap.Int64("watch-id", ws.id), zap.Error(err))
}
}
w.closeSubstream(ws)
delete(closing, ws)
// no more watchers on this stream, shutdown
if len(w.substreams)+len(w.resuming) == 0 {
return
}
}
}
}
Expand Down
29 changes: 29 additions & 0 deletions integration/v3_watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1245,3 +1245,32 @@ func TestV3WatchCancellation(t *testing.T) {
t.Fatalf("expected one watch, got %s", minWatches)
}
}

// TestV3WatchCloseCancelRace ensures that watch close doesn't decrement the watcher total too far.
func TestV3WatchCloseCancelRace(t *testing.T) {
clus := NewClusterV3(t, &ClusterConfig{Size: 1})
defer clus.Terminate(t)

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

cli := clus.RandClient()

for i := 0; i < 1000; i++ {
ctx, cancel := context.WithCancel(ctx)
cli.Watch(ctx, "/foo")
cancel()
}

// Wait a little for cancellations to take hold
time.Sleep(3 * time.Second)

minWatches, err := clus.Members[0].Metric("etcd_debugging_mvcc_watcher_total")
if err != nil {
t.Fatal(err)
}

if minWatches != "0" {
t.Fatalf("expected zero watches, got %s", minWatches)
}
}
5 changes: 4 additions & 1 deletion mvcc/watchable_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,13 @@ func (s *watchableStore) cancelWatcher(wa *watcher) {
s.mu.Lock()
if s.unsynced.delete(wa) {
slowWatcherGauge.Dec()
watcherGauge.Dec()
break
} else if s.synced.delete(wa) {
watcherGauge.Dec()
break
} else if wa.compacted {
watcherGauge.Dec()
break
} else if wa.ch == nil {
// already canceled (e.g., cancel/close race)
Expand All @@ -177,6 +180,7 @@ func (s *watchableStore) cancelWatcher(wa *watcher) {
}
if victimBatch != nil {
slowWatcherGauge.Dec()
watcherGauge.Dec()
delete(victimBatch, wa)
break
}
Expand All @@ -186,7 +190,6 @@ func (s *watchableStore) cancelWatcher(wa *watcher) {
time.Sleep(time.Millisecond)
}

watcherGauge.Dec()
wa.ch = nil
s.mu.Unlock()
}
Expand Down

0 comments on commit ec13797

Please sign in to comment.