diff --git a/client/v3/watch.go b/client/v3/watch.go index b73925ba128a..da0452774bca 100644 --- a/client/v3/watch.go +++ b/client/v3/watch.go @@ -24,7 +24,6 @@ import ( pb "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/api/v3/mvccpb" v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" - "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -37,6 +36,13 @@ const ( EventTypePut = mvccpb.PUT closeSendErrTimeout = 250 * time.Millisecond + + // AutoWatchID is the watcher ID passed in WatchStream.Watch when no + // user-provided ID is available. If pass, an ID will automatically be assigned. + AutoWatchID = 0 + + // InvalidWatchID represents an invalid watch ID and prevents duplication with an existing watch. + InvalidWatchID = -1 ) type Event mvccpb.Event @@ -450,7 +456,7 @@ func (w *watcher) closeStream(wgs *watchGrpcStream) { func (w *watchGrpcStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream) { // check watch ID for backward compatibility (<= v3.3) - if resp.WatchId == -1 || (resp.Canceled && resp.CancelReason != "") { + if resp.WatchId == InvalidWatchID || (resp.Canceled && resp.CancelReason != "") { w.closeErr = v3rpc.Error(errors.New(resp.CancelReason)) // failed; no channel close(ws.recvc) @@ -481,7 +487,7 @@ func (w *watchGrpcStream) closeSubstream(ws *watcherStream) { } else if ws.outc != nil { close(ws.outc) } - if ws.id != -1 { + if ws.id != InvalidWatchID { delete(w.substreams, ws.id) return } @@ -543,7 +549,7 @@ func (w *watchGrpcStream) run() { // TODO: pass custom watch ID? ws := &watcherStream{ initReq: *wreq, - id: -1, + id: InvalidWatchID, outc: outc, // unbuffered so resumes won't cause repeat events recvc: make(chan *WatchResponse), @@ -669,7 +675,7 @@ func (w *watchGrpcStream) run() { if len(w.substreams)+len(w.resuming) == 0 { return } - if ws.id != -1 { + if ws.id != InvalidWatchID { // client is closing an established watch; close it on the server proactively instead of waiting // to close when the next message arrives cancelSet[ws.id] = struct{}{} @@ -716,9 +722,9 @@ func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool { cancelReason: pbresp.CancelReason, } - // watch IDs are zero indexed, so request notify watch responses are assigned a watch ID of -1 to + // watch IDs are zero indexed, so request notify watch responses are assigned a watch ID of InvalidWatchID to // indicate they should be broadcast. - if wr.IsProgressNotify() && pbresp.WatchId == -1 { + if wr.IsProgressNotify() && pbresp.WatchId == InvalidWatchID { return w.broadcastResponse(wr) } @@ -873,7 +879,7 @@ func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) { w.resumec = make(chan struct{}) w.joinSubstreams() for _, ws := range w.substreams { - ws.id = -1 + ws.id = InvalidWatchID w.resuming = append(w.resuming, ws) } // strip out nils, if any diff --git a/server/etcdserver/api/v3rpc/watch.go b/server/etcdserver/api/v3rpc/watch.go index 1a3cff539f6c..00620fb69676 100644 --- a/server/etcdserver/api/v3rpc/watch.go +++ b/server/etcdserver/api/v3rpc/watch.go @@ -24,6 +24,7 @@ import ( pb "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/api/v3/mvccpb" "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" + "go.etcd.io/etcd/client/pkg/v3/verify" "go.etcd.io/etcd/server/v3/auth" "go.etcd.io/etcd/server/v3/etcdserver" "go.etcd.io/etcd/server/v3/etcdserver/apply" @@ -270,7 +271,7 @@ func (sws *serverWatchStream) recvLoop() error { if !sws.isWatchPermitted(creq) { wr := &pb.WatchResponse{ Header: sws.newResponseHeader(sws.watchStream.Rev()), - WatchId: creq.WatchId, + WatchId: int64(mvcc.InvalidWatchID), Canceled: true, Created: true, CancelReason: rpctypes.ErrGRPCPermissionDenied.Error(), @@ -304,7 +305,10 @@ func (sws *serverWatchStream) recvLoop() error { sws.fragment[id] = true } sws.mu.Unlock() + } else { + id = mvcc.InvalidWatchID } + wr := &pb.WatchResponse{ Header: sws.newResponseHeader(wsrev), WatchId: int64(id), @@ -341,7 +345,7 @@ func (sws *serverWatchStream) recvLoop() error { if uv.ProgressRequest != nil { sws.ctrlStream <- &pb.WatchResponse{ Header: sws.newResponseHeader(sws.watchStream.Rev()), - WatchId: -1, // response is not associated with any WatchId and will be broadcast to all watch channels + WatchId: int64(mvcc.InvalidWatchID), // response is not associated with any WatchId and will be broadcast to all watch channels } } default: @@ -465,7 +469,10 @@ func (sws *serverWatchStream) sendLoop() { // track id creation wid := mvcc.WatchID(c.WatchId) - if c.Canceled { + + verify.Assert((c.Canceled && c.Created) && wid != mvcc.InvalidWatchID, "unexpected watchId: %d, wanted: %d, since both 'Canceled' and 'Created' are true", wid, mvcc.InvalidWatchID) + + if c.Canceled && wid != mvcc.InvalidWatchID { delete(ids, wid) continue } diff --git a/server/proxy/grpcproxy/watch.go b/server/proxy/grpcproxy/watch.go index 3ec38d600c3c..43cdba27d07e 100644 --- a/server/proxy/grpcproxy/watch.go +++ b/server/proxy/grpcproxy/watch.go @@ -22,6 +22,7 @@ import ( "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/server/v3/etcdserver/api/v3rpc" + "go.etcd.io/etcd/server/v3/storage/mvcc" "go.uber.org/zap" "google.golang.org/grpc/codes" @@ -238,7 +239,7 @@ func (wps *watchProxyStream) recvLoop() error { if err := wps.checkPermissionForWatch(cr.Key, cr.RangeEnd); err != nil { wps.watchCh <- &pb.WatchResponse{ Header: &pb.ResponseHeader{}, - WatchId: -1, + WatchId: int64(mvcc.InvalidWatchID), Created: true, Canceled: true, CancelReason: err.Error(), @@ -258,7 +259,7 @@ func (wps *watchProxyStream) recvLoop() error { filters: v3rpc.FiltersFromRequest(cr), } if !w.wr.valid() { - w.post(&pb.WatchResponse{WatchId: -1, Created: true, Canceled: true}) + w.post(&pb.WatchResponse{WatchId: int64(mvcc.InvalidWatchID), Created: true, Canceled: true}) wps.mu.Unlock() continue } diff --git a/server/storage/mvcc/watcher.go b/server/storage/mvcc/watcher.go index f48a9ef3b33f..fc415c402665 100644 --- a/server/storage/mvcc/watcher.go +++ b/server/storage/mvcc/watcher.go @@ -22,9 +22,14 @@ import ( "go.etcd.io/etcd/api/v3/mvccpb" ) -// AutoWatchID is the watcher ID passed in WatchStream.Watch when no -// user-provided ID is available. If pass, an ID will automatically be assigned. -const AutoWatchID WatchID = 0 +const ( + // InvalidWatchID represents an invalid watch ID and prevents duplication with an existing watch. + InvalidWatchID WatchID = -1 + + // AutoWatchID is the watcher ID passed in WatchStream.Watch when no + // user-provided ID is available. If pass, an ID will automatically be assigned. + AutoWatchID WatchID = 0 +) var ( ErrWatcherNotExist = errors.New("mvcc: watcher does not exist") diff --git a/tests/integration/v3_watch_test.go b/tests/integration/v3_watch_test.go index 45b62f7acbbc..84c1ff056290 100644 --- a/tests/integration/v3_watch_test.go +++ b/tests/integration/v3_watch_test.go @@ -27,6 +27,7 @@ import ( pb "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/api/v3/mvccpb" "go.etcd.io/etcd/server/v3/etcdserver/api/v3rpc" + "go.etcd.io/etcd/server/v3/storage/mvcc" "go.etcd.io/etcd/tests/v3/framework/integration" ) @@ -390,8 +391,8 @@ func TestV3WatchWrongRange(t *testing.T) { if cresp.Canceled != tt.canceled { t.Fatalf("#%d: canceled %v, want %v", i, tt.canceled, cresp.Canceled) } - if tt.canceled && cresp.WatchId != -1 { - t.Fatalf("#%d: canceled watch ID %d, want -1", i, cresp.WatchId) + if tt.canceled && cresp.WatchId != int64(mvcc.InvalidWatchID) { + t.Fatalf("#%d: canceled watch ID %d, want %d", i, cresp.WatchId, mvcc.InvalidWatchID) } } }