Skip to content

Commit

Permalink
*: avoid closing a watch with ID 0 incorrectly
Browse files Browse the repository at this point in the history
Signed-off-by: Kafuu Chino <KafuuChinoQ@gmail.com>
  • Loading branch information
kafuu-chino committed Sep 8, 2022
1 parent 1d95b82 commit 8d860e2
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 18 deletions.
22 changes: 14 additions & 8 deletions client/v3/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/mvccpb"
v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"

"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand All @@ -37,6 +36,13 @@ const (
EventTypePut = mvccpb.PUT

closeSendErrTimeout = 250 * time.Millisecond

// AutoWatchID is the watcher ID passed in WatchStream.Watch when no
// user-provided ID is available. If pass, an ID will automatically be assigned.
AutoWatchID = 0

// InvalidWatchID represents an invalid watch ID and prevents duplication with an existing watch.
InvalidWatchID = -1
)

type Event mvccpb.Event
Expand Down Expand Up @@ -450,7 +456,7 @@ func (w *watcher) closeStream(wgs *watchGrpcStream) {

func (w *watchGrpcStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream) {
// check watch ID for backward compatibility (<= v3.3)
if resp.WatchId == -1 || (resp.Canceled && resp.CancelReason != "") {
if resp.WatchId == InvalidWatchID || (resp.Canceled && resp.CancelReason != "") {
w.closeErr = v3rpc.Error(errors.New(resp.CancelReason))
// failed; no channel
close(ws.recvc)
Expand Down Expand Up @@ -481,7 +487,7 @@ func (w *watchGrpcStream) closeSubstream(ws *watcherStream) {
} else if ws.outc != nil {
close(ws.outc)
}
if ws.id != -1 {
if ws.id != InvalidWatchID {
delete(w.substreams, ws.id)
return
}
Expand Down Expand Up @@ -543,7 +549,7 @@ func (w *watchGrpcStream) run() {
// TODO: pass custom watch ID?
ws := &watcherStream{
initReq: *wreq,
id: -1,
id: InvalidWatchID,
outc: outc,
// unbuffered so resumes won't cause repeat events
recvc: make(chan *WatchResponse),
Expand Down Expand Up @@ -669,7 +675,7 @@ func (w *watchGrpcStream) run() {
if len(w.substreams)+len(w.resuming) == 0 {
return
}
if ws.id != -1 {
if ws.id != InvalidWatchID {
// client is closing an established watch; close it on the server proactively instead of waiting
// to close when the next message arrives
cancelSet[ws.id] = struct{}{}
Expand Down Expand Up @@ -716,9 +722,9 @@ func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool {
cancelReason: pbresp.CancelReason,
}

// watch IDs are zero indexed, so request notify watch responses are assigned a watch ID of -1 to
// watch IDs are zero indexed, so request notify watch responses are assigned a watch ID of InvalidWatchID to
// indicate they should be broadcast.
if wr.IsProgressNotify() && pbresp.WatchId == -1 {
if wr.IsProgressNotify() && pbresp.WatchId == InvalidWatchID {
return w.broadcastResponse(wr)
}

Expand Down Expand Up @@ -873,7 +879,7 @@ func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
w.resumec = make(chan struct{})
w.joinSubstreams()
for _, ws := range w.substreams {
ws.id = -1
ws.id = InvalidWatchID
w.resuming = append(w.resuming, ws)
}
// strip out nils, if any
Expand Down
13 changes: 10 additions & 3 deletions server/etcdserver/api/v3rpc/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/mvccpb"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"go.etcd.io/etcd/client/pkg/v3/verify"
"go.etcd.io/etcd/server/v3/auth"
"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/server/v3/etcdserver/apply"
Expand Down Expand Up @@ -270,7 +271,7 @@ func (sws *serverWatchStream) recvLoop() error {
if !sws.isWatchPermitted(creq) {
wr := &pb.WatchResponse{
Header: sws.newResponseHeader(sws.watchStream.Rev()),
WatchId: creq.WatchId,
WatchId: int64(mvcc.InvalidWatchID),
Canceled: true,
Created: true,
CancelReason: rpctypes.ErrGRPCPermissionDenied.Error(),
Expand Down Expand Up @@ -304,7 +305,10 @@ func (sws *serverWatchStream) recvLoop() error {
sws.fragment[id] = true
}
sws.mu.Unlock()
} else {
id = mvcc.InvalidWatchID
}

wr := &pb.WatchResponse{
Header: sws.newResponseHeader(wsrev),
WatchId: int64(id),
Expand Down Expand Up @@ -341,7 +345,7 @@ func (sws *serverWatchStream) recvLoop() error {
if uv.ProgressRequest != nil {
sws.ctrlStream <- &pb.WatchResponse{
Header: sws.newResponseHeader(sws.watchStream.Rev()),
WatchId: -1, // response is not associated with any WatchId and will be broadcast to all watch channels
WatchId: int64(mvcc.InvalidWatchID), // response is not associated with any WatchId and will be broadcast to all watch channels
}
}
default:
Expand Down Expand Up @@ -465,7 +469,10 @@ func (sws *serverWatchStream) sendLoop() {

// track id creation
wid := mvcc.WatchID(c.WatchId)
if c.Canceled {

verify.Assert(!(c.Canceled && c.Created) || wid == mvcc.InvalidWatchID, "unexpected watchId: %d, wanted: %d, since both 'Canceled' and 'Created' are true", wid, mvcc.InvalidWatchID)

if c.Canceled && wid != mvcc.InvalidWatchID {
delete(ids, wid)
continue
}
Expand Down
5 changes: 3 additions & 2 deletions server/proxy/grpcproxy/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/etcdserver/api/v3rpc"
"go.etcd.io/etcd/server/v3/storage/mvcc"

"go.uber.org/zap"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -238,7 +239,7 @@ func (wps *watchProxyStream) recvLoop() error {
if err := wps.checkPermissionForWatch(cr.Key, cr.RangeEnd); err != nil {
wps.watchCh <- &pb.WatchResponse{
Header: &pb.ResponseHeader{},
WatchId: -1,
WatchId: int64(mvcc.InvalidWatchID),
Created: true,
Canceled: true,
CancelReason: err.Error(),
Expand All @@ -258,7 +259,7 @@ func (wps *watchProxyStream) recvLoop() error {
filters: v3rpc.FiltersFromRequest(cr),
}
if !w.wr.valid() {
w.post(&pb.WatchResponse{WatchId: -1, Created: true, Canceled: true})
w.post(&pb.WatchResponse{WatchId: int64(mvcc.InvalidWatchID), Created: true, Canceled: true})
wps.mu.Unlock()
continue
}
Expand Down
11 changes: 8 additions & 3 deletions server/storage/mvcc/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,14 @@ import (
"go.etcd.io/etcd/api/v3/mvccpb"
)

// AutoWatchID is the watcher ID passed in WatchStream.Watch when no
// user-provided ID is available. If pass, an ID will automatically be assigned.
const AutoWatchID WatchID = 0
const (
// InvalidWatchID represents an invalid watch ID and prevents duplication with an existing watch.
InvalidWatchID WatchID = -1

// AutoWatchID is the watcher ID passed in WatchStream.Watch when no
// user-provided ID is available. If pass, an ID will automatically be assigned.
AutoWatchID WatchID = 0
)

var (
ErrWatcherNotExist = errors.New("mvcc: watcher does not exist")
Expand Down
5 changes: 3 additions & 2 deletions tests/integration/v3_watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/mvccpb"
"go.etcd.io/etcd/server/v3/etcdserver/api/v3rpc"
"go.etcd.io/etcd/server/v3/storage/mvcc"
"go.etcd.io/etcd/tests/v3/framework/integration"
)

Expand Down Expand Up @@ -390,8 +391,8 @@ func TestV3WatchWrongRange(t *testing.T) {
if cresp.Canceled != tt.canceled {
t.Fatalf("#%d: canceled %v, want %v", i, tt.canceled, cresp.Canceled)
}
if tt.canceled && cresp.WatchId != -1 {
t.Fatalf("#%d: canceled watch ID %d, want -1", i, cresp.WatchId)
if tt.canceled && cresp.WatchId != int64(mvcc.InvalidWatchID) {
t.Fatalf("#%d: canceled watch ID %d, want %d", i, cresp.WatchId, mvcc.InvalidWatchID)
}
}
}
Expand Down

0 comments on commit 8d860e2

Please sign in to comment.