Skip to content

Commit

Permalink
mvcc: avoid negative watcher count metrics
Browse files Browse the repository at this point in the history
The watch count metrics are not robust to duplicate cancellations. These
cause the count to be decremented twice, leading eventually to negative
counts. We are seeing this in production. The duplicate cancellations
themselves are not themselves a big problem (except performance), but
they are caused by the new proactive cancellation logic (etcd-io#11850) which
cancels proactively even immediately before initiating a Close, thus
nearly guaranteeing a Close-cancel race, as discussed in
watchable_store.go. We can avoid this in most cases by not sending a
cancellation when we are going to Close.
  • Loading branch information
jackkleeman committed May 11, 2020
1 parent 3a4258f commit 89161b2
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 7 deletions.
12 changes: 6 additions & 6 deletions clientv3/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,12 @@ func (w *watchGrpcStream) run() {
case <-w.ctx.Done():
return
case ws := <-w.closingc:
w.closeSubstream(ws)
delete(closing, ws)
if len(w.substreams)+len(w.resuming) == 0 {
// no more watchers on this stream, shutdown, skip cancellation
return
}
if ws.id != -1 {
// client is closing an established watch; close it on the server proactively instead of waiting
// to close when the next message arrives
Expand All @@ -523,12 +529,6 @@ func (w *watchGrpcStream) run() {
req := &pb.WatchRequest{RequestUnion: cr}
wc.Send(req)
}
w.closeSubstream(ws)
delete(closing, ws)
if len(w.substreams)+len(w.resuming) == 0 {
// no more watchers on this stream, shutdown
return
}
}
}
}
Expand Down
29 changes: 29 additions & 0 deletions integration/v3_watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1233,3 +1233,32 @@ func TestV3WatchCancellation(t *testing.T) {
t.Fatalf("expected one watch, got %s", minWatches)
}
}

// TestV3WatchCloseCancelRace ensures that watch close doesn't decrement the watcher total too far.
func TestV3WatchCloseCancelRace(t *testing.T) {
clus := NewClusterV3(t, &ClusterConfig{Size: 1})
defer clus.Terminate(t)

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

cli := clus.RandClient()

for i := 0; i < 1000; i++ {
ctx, cancel := context.WithCancel(ctx)
cli.Watch(ctx, "/foo")
cancel()
}

// Wait a little for cancellations to take hold
time.Sleep(3 * time.Second)

minWatches, err := clus.Members[0].Metric("etcd_debugging_mvcc_watcher_total")
if err != nil {
t.Fatal(err)
}

if minWatches != "0" {
t.Fatalf("expected zero watches, got %s", minWatches)
}
}
5 changes: 4 additions & 1 deletion mvcc/watchable_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,13 @@ func (s *watchableStore) cancelWatcher(wa *watcher) {
s.mu.Lock()
if s.unsynced.delete(wa) {
slowWatcherGauge.Dec()
watcherGauge.Dec()
break
} else if s.synced.delete(wa) {
watcherGauge.Dec()
break
} else if wa.compacted {
watcherGauge.Dec()
break
} else if wa.ch == nil {
// already canceled (e.g., cancel/close race)
Expand All @@ -169,6 +172,7 @@ func (s *watchableStore) cancelWatcher(wa *watcher) {
}
if victimBatch != nil {
slowWatcherGauge.Dec()
watcherGauge.Dec()
delete(victimBatch, wa)
break
}
Expand All @@ -178,7 +182,6 @@ func (s *watchableStore) cancelWatcher(wa *watcher) {
time.Sleep(time.Millisecond)
}

watcherGauge.Dec()
wa.ch = nil
s.mu.Unlock()
}
Expand Down

0 comments on commit 89161b2

Please sign in to comment.