diff --git a/clientv3/watch.go b/clientv3/watch.go
index e2f11ad263c6..cc44a7c8d065 100644
--- a/clientv3/watch.go
+++ b/clientv3/watch.go
@@ -649,6 +649,20 @@ func (w *watchGrpcStream) run() {
 			return
 		case ws := <-w.closingc:
+			if ws.id != -1 {
+				// client is closing an established watch; close it on the server proactively instead of waiting
+				// to close when the next message arrives
+				cancelSet[ws.id] = struct{}{}
+				cr := &pb.WatchRequest_CancelRequest{
+					CancelRequest: &pb.WatchCancelRequest{
+						WatchId: ws.id,
+					},
+				}
+				req := &pb.WatchRequest{RequestUnion: cr}
+				if err := wc.Send(req); err != nil {
+					lg.Warningf("error when sending request: %v", err)
+				}
+			}
 			w.closeSubstream(ws)
 			delete(closing, ws)
 			// no more watchers on this stream, shutdown
diff --git a/integration/v3_watch_test.go b/integration/v3_watch_test.go
index 188b5883a471..54043e173408 100644
--- a/integration/v3_watch_test.go
+++ b/integration/v3_watch_test.go
@@ -1213,3 +1213,34 @@ func TestV3WatchWithPrevKV(t *testing.T) {
 		}
 	}
 }
+
+// TestV3WatchCancellation ensures that watch cancellation frees up server resources.
+func TestV3WatchCancellation(t *testing.T) {
+	clus := NewClusterV3(t, &ClusterConfig{Size: 1})
+	defer clus.Terminate(t)
+
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+
+	cli := clus.RandClient()
+
+	cli.Watch(ctx, "/foo")
+
+	for i := 0; i < 1000; i++ {
+		ctx, cancel := context.WithCancel(ctx)
+		cli.Watch(ctx, "/foo")
+		cancel()
+	}
+
+	// Wait a little for cancellations to take hold
+	time.Sleep(3 * time.Second)
+
+	minWatches, err := clus.Members[0].Metric("etcd_debugging_mvcc_watcher_total")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if minWatches != "1" {
+		t.Fatalf("expected one watch, got %s", minWatches)
+	}
+}
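
For context, a minimal client-side sketch (not part of the diff) of the pattern the new test exercises: each Watch opened with its own cancellable context should, with this change, result in a WatchCancelRequest being sent to the server as soon as the context is cancelled, rather than the watcher lingering until the next event arrives on the stream. The endpoint, key, loop count, and sleep below are illustrative assumptions, and the import path is the go.etcd.io form (older trees use github.com/coreos/etcd/clientv3).

package main

import (
	"context"
	"log"
	"time"

	"go.etcd.io/etcd/clientv3"
)

func main() {
	// Assumed endpoint; adjust for your cluster.
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"localhost:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	// Keep one long-lived watch open so the server-side watcher count stays at one.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	cli.Watch(ctx, "/foo")

	// Open short-lived watches and cancel them immediately; with the proactive
	// cancel in watchGrpcStream.run, the server should release these watchers
	// without waiting for further traffic on the shared gRPC stream.
	for i := 0; i < 10; i++ {
		wctx, wcancel := context.WithCancel(ctx)
		cli.Watch(wctx, "/foo")
		wcancel()
	}

	// Give the client a moment to deliver the cancel requests, as the test does
	// before checking the etcd_debugging_mvcc_watcher_total metric.
	time.Sleep(time.Second)
}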