Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: allow user-provided watch ID to mvcc #9065

Merged
merged 7 commits into from
Jan 5, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Documentation/dev-guide/api_reference_v3.md
Original file line number Diff line number Diff line change
Expand Up @@ -832,6 +832,7 @@ From google paxosdb paper: Our implementation hinges around a powerful primitive
| progress_notify | progress_notify is set so that the etcd server will periodically send a WatchResponse with no events to the new watcher if there are no recent events. It is useful when clients wish to recover a disconnected watcher starting from a recent known revision. The etcd server may decide how often it will send notifications based on current load. | bool |
| filters | filters filter the events at server side before it sends back to the watcher. | (slice of) FilterType |
| prev_kv | If prev_kv is set, created watcher gets the previous KV before the event happens. If the previous KV is already compacted, nothing will be returned. | bool |
| watch_id | If watch_id is provided and non-zero, it will be assigned to this watcher. Since creating a watcher in etcd is not a synchronous operation, this can be used ensure that ordering is correct when creating multiple watchers on the same stream. Creating a watcher with an ID already in use on the stream will cause an error to be returned. | int64 |



Expand Down
5 changes: 5 additions & 0 deletions Documentation/dev-guide/apispec/swagger/rpc.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -2283,6 +2283,11 @@
"description": "start_revision is an optional revision to watch from (inclusive). No start_revision is \"now\".",
"type": "string",
"format": "int64"
},
"watch_id": {
"description": "If watch_id is provided and non-zero, it will be assigned to this watcher.\nSince creating a watcher in etcd is not a synchronous operation,\nthis can be used ensure that ordering is correct when creating multiple\nwatchers on the same stream. Creating a watcher with an ID already in\nuse on the stream will cause an error to be returned.",
"type": "string",
"format": "int64"
}
}
},
Expand Down
5 changes: 4 additions & 1 deletion clientv3/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,8 @@ func (w *watcher) closeStream(wgs *watchGrpcStream) {
}

func (w *watchGrpcStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream) {
if resp.WatchId == -1 {
// check watch ID for backward compatibility (<= v3.3)
if resp.WatchId == -1 || (resp.Canceled && resp.CancelReason != "") {
// failed; no channel
close(ws.recvc)
return
Expand Down Expand Up @@ -453,6 +454,7 @@ func (w *watchGrpcStream) run() {
// Watch() requested
case wreq := <-w.reqc:
outc := make(chan WatchResponse, 1)
// TODO: pass custom watch ID?
ws := &watcherStream{
initReq: *wreq,
id: -1,
Expand Down Expand Up @@ -553,6 +555,7 @@ func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool {
for i, ev := range pbresp.Events {
events[i] = (*Event)(ev)
}
// TODO: return watch ID?
wr := &WatchResponse{
Header: *pbresp.Header,
Events: events,
Expand Down
11 changes: 7 additions & 4 deletions etcdserver/api/v3rpc/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func (sws *serverWatchStream) recvLoop() error {
if !sws.isWatchPermitted(creq) {
wr := &pb.WatchResponse{
Header: sws.newResponseHeader(sws.watchStream.Rev()),
WatchId: -1,
WatchId: creq.WatchId,
Canceled: true,
Created: true,
CancelReason: rpctypes.ErrGRPCPermissionDenied.Error(),
Expand All @@ -225,8 +225,8 @@ func (sws *serverWatchStream) recvLoop() error {
if rev == 0 {
rev = wsrev + 1
}
id := sws.watchStream.Watch(creq.Key, creq.RangeEnd, rev, filters...)
if id != -1 {
id, err := sws.watchStream.Watch(mvcc.WatchID(creq.WatchId), creq.Key, creq.RangeEnd, rev, filters...)
if err == nil {
sws.mu.Lock()
if creq.ProgressNotify {
sws.progress[id] = true
Expand All @@ -240,7 +240,10 @@ func (sws *serverWatchStream) recvLoop() error {
Header: sws.newResponseHeader(wsrev),
WatchId: int64(id),
Created: true,
Canceled: id == -1,
Canceled: err != nil,
}
if err != nil {
wr.CancelReason = err.Error()
}
select {
case sws.ctrlStream <- wr:
Expand Down
338 changes: 189 additions & 149 deletions etcdserver/etcdserverpb/rpc.pb.go

Large diffs are not rendered by default.

7 changes: 7 additions & 0 deletions etcdserver/etcdserverpb/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,13 @@ message WatchCreateRequest {
// If prev_kv is set, created watcher gets the previous KV before the event happens.
// If the previous KV is already compacted, nothing will be returned.
bool prev_kv = 6;

// If watch_id is provided and non-zero, it will be assigned to this watcher.
// Since creating a watcher in etcd is not a synchronous operation,
// this can be used ensure that ordering is correct when creating multiple
// watchers on the same stream. Creating a watcher with an ID already in
// use on the stream will cause an error to be returned.
int64 watch_id = 7;
}

message WatchCancelRequest {
Expand Down
4 changes: 2 additions & 2 deletions mvcc/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -716,7 +716,7 @@ func TestWatchableKVWatch(t *testing.T) {
w := s.NewWatchStream()
defer w.Close()

wid := w.Watch([]byte("foo"), []byte("fop"), 0)
wid, _ := w.Watch(0, []byte("foo"), []byte("fop"), 0)

wev := []mvccpb.Event{
{Type: mvccpb.PUT,
Expand Down Expand Up @@ -783,7 +783,7 @@ func TestWatchableKVWatch(t *testing.T) {
}

w = s.NewWatchStream()
wid = w.Watch([]byte("foo1"), []byte("foo2"), 3)
wid, _ = w.Watch(0, []byte("foo1"), []byte("foo2"), 3)

select {
case resp := <-w.Chan():
Expand Down
6 changes: 3 additions & 3 deletions mvcc/watchable_store_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func BenchmarkWatchableStoreWatchSyncPut(b *testing.B) {
watchIDs := make([]WatchID, b.N)
for i := range watchIDs {
// non-0 value to keep watchers in unsynced
watchIDs[i] = w.Watch(k, nil, 1)
watchIDs[i], _ = w.Watch(0, k, nil, 1)
}

b.ResetTimer()
Expand Down Expand Up @@ -142,7 +142,7 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
watchIDs := make([]WatchID, watcherN)
for i := 0; i < watcherN; i++ {
// non-0 value to keep watchers in unsynced
watchIDs[i] = w.Watch(testKey, nil, 1)
watchIDs[i], _ = w.Watch(0, testKey, nil, 1)
}

// random-cancel N watchers to make it not biased towards
Expand Down Expand Up @@ -182,7 +182,7 @@ func BenchmarkWatchableStoreSyncedCancel(b *testing.B) {
watchIDs := make([]WatchID, watcherN)
for i := 0; i < watcherN; i++ {
// 0 for startRev to keep watchers in synced
watchIDs[i] = w.Watch(testKey, nil, 0)
watchIDs[i], _ = w.Watch(0, testKey, nil, 0)
}

// randomly cancel watchers to make it not biased towards
Expand Down
20 changes: 10 additions & 10 deletions mvcc/watchable_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestWatch(t *testing.T) {
s.Put(testKey, testValue, lease.NoLease)

w := s.NewWatchStream()
w.Watch(testKey, nil, 0)
w.Watch(0, testKey, nil, 0)

if !s.synced.contains(string(testKey)) {
// the key must have had an entry in synced
Expand All @@ -63,7 +63,7 @@ func TestNewWatcherCancel(t *testing.T) {
s.Put(testKey, testValue, lease.NoLease)

w := s.NewWatchStream()
wt := w.Watch(testKey, nil, 0)
wt, _ := w.Watch(0, testKey, nil, 0)

if err := w.Cancel(wt); err != nil {
t.Error(err)
Expand Down Expand Up @@ -114,7 +114,7 @@ func TestCancelUnsynced(t *testing.T) {
watchIDs := make([]WatchID, watcherN)
for i := 0; i < watcherN; i++ {
// use 1 to keep watchers in unsynced
watchIDs[i] = w.Watch(testKey, nil, 1)
watchIDs[i], _ = w.Watch(0, testKey, nil, 1)
}

for _, idx := range watchIDs {
Expand Down Expand Up @@ -160,7 +160,7 @@ func TestSyncWatchers(t *testing.T) {

for i := 0; i < watcherN; i++ {
// specify rev as 1 to keep watchers in unsynced
w.Watch(testKey, nil, 1)
w.Watch(0, testKey, nil, 1)
}

// Before running s.syncWatchers() synced should be empty because we manually
Expand Down Expand Up @@ -242,7 +242,7 @@ func TestWatchCompacted(t *testing.T) {
}

w := s.NewWatchStream()
wt := w.Watch(testKey, nil, compactRev-1)
wt, _ := w.Watch(0, testKey, nil, compactRev-1)

select {
case resp := <-w.Chan():
Expand Down Expand Up @@ -271,7 +271,7 @@ func TestWatchFutureRev(t *testing.T) {

w := s.NewWatchStream()
wrev := int64(10)
w.Watch(testKey, nil, wrev)
w.Watch(0, testKey, nil, wrev)

for i := 0; i < 10; i++ {
rev := s.Put(testKey, testValue, lease.NoLease)
Expand Down Expand Up @@ -310,7 +310,7 @@ func TestWatchRestore(t *testing.T) {
defer cleanup(newStore, newBackend, newPath)

w := newStore.NewWatchStream()
w.Watch(testKey, nil, rev-1)
w.Watch(0, testKey, nil, rev-1)

newStore.Restore(b)
select {
Expand Down Expand Up @@ -349,7 +349,7 @@ func TestWatchBatchUnsynced(t *testing.T) {
}

w := s.NewWatchStream()
w.Watch(v, nil, 1)
w.Watch(0, v, nil, 1)
for i := 0; i < batches; i++ {
if resp := <-w.Chan(); len(resp.Events) != watchBatchMaxRevs {
t.Fatalf("len(events) = %d, want %d", len(resp.Events), watchBatchMaxRevs)
Expand Down Expand Up @@ -485,7 +485,7 @@ func TestWatchVictims(t *testing.T) {
for i := 0; i < numWatches; i++ {
go func() {
w := s.NewWatchStream()
w.Watch(testKey, nil, 1)
w.Watch(0, testKey, nil, 1)
defer func() {
w.Close()
wg.Done()
Expand Down Expand Up @@ -561,7 +561,7 @@ func TestStressWatchCancelClose(t *testing.T) {
w := s.NewWatchStream()
ids := make([]WatchID, 10)
for i := range ids {
ids[i] = w.Watch(testKey, nil, 0)
ids[i], _ = w.Watch(0, testKey, nil, 0)
}
<-readyc
wg.Add(1 + len(ids)/2)
Expand Down
37 changes: 25 additions & 12 deletions mvcc/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,14 @@ import (
"github.com/coreos/etcd/mvcc/mvccpb"
)

// AutoWatchID is the watcher ID passed in WatchStream.Watch when no
// user-provided ID is available. If pass, an ID will automatically be assigned.
const AutoWatchID WatchID = 0

var (
ErrWatcherNotExist = errors.New("mvcc: watcher does not exist")
ErrWatcherNotExist = errors.New("mvcc: watcher does not exist")
ErrEmptyWatcherRange = errors.New("mvcc: watcher range is empty")
ErrWatcherDuplicateID = errors.New("mvcc: duplicate watch ID provided on the WatchStream")
)

type WatchID int64
Expand All @@ -36,12 +42,13 @@ type WatchStream interface {
// happened on the given key or range [key, end) from the given startRev.
//
// The whole event history can be watched unless compacted.
// If `startRev` <=0, watch observes events after currentRev.
// If "startRev" <=0, watch observes events after currentRev.
//
// The returned `id` is the ID of this watcher. It appears as WatchID
// The returned "id" is the ID of this watcher. It appears as WatchID
// in events that are sent to the created watcher through stream channel.
//
Watch(key, end []byte, startRev int64, fcs ...FilterFunc) WatchID
// The watch ID is used when it's not equal to AutoWatchID. Otherwise,
// an auto-generated watch ID is returned.
Watch(id WatchID, key, end []byte, startRev int64, fcs ...FilterFunc) (WatchID, error)

// Chan returns a chan. All watch response will be sent to the returned chan.
Chan() <-chan WatchResponse
Expand Down Expand Up @@ -98,28 +105,34 @@ type watchStream struct {
}

// Watch creates a new watcher in the stream and returns its WatchID.
// TODO: return error if ws is closed?
func (ws *watchStream) Watch(key, end []byte, startRev int64, fcs ...FilterFunc) WatchID {
func (ws *watchStream) Watch(id WatchID, key, end []byte, startRev int64, fcs ...FilterFunc) (WatchID, error) {
// prevent wrong range where key >= end lexicographically
// watch request with 'WithFromKey' has empty-byte range end
if len(end) != 0 && bytes.Compare(key, end) != -1 {
return -1
return -1, ErrEmptyWatcherRange
}

ws.mu.Lock()
defer ws.mu.Unlock()
if ws.closed {
return -1
return -1, ErrEmptyWatcherRange
}

id := ws.nextID
ws.nextID++
if id == AutoWatchID {
for ws.watchers[ws.nextID] != nil {
ws.nextID++
}
id = ws.nextID
ws.nextID++
} else if _, ok := ws.watchers[id]; ok {
return -1, ErrWatcherDuplicateID
}

w, c := ws.watchable.watch(key, end, startRev, id, ws.ch, fcs...)

ws.cancels[id] = c
ws.watchers[id] = w
return id
return id, nil
}

func (ws *watchStream) Chan() <-chan WatchResponse {
Expand Down
2 changes: 1 addition & 1 deletion mvcc/watcher_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,6 @@ func BenchmarkKVWatcherMemoryUsage(b *testing.B) {
b.ReportAllocs()
b.StartTimer()
for i := 0; i < b.N; i++ {
w.Watch([]byte(fmt.Sprint("foo", i)), nil, 0)
w.Watch(0, []byte(fmt.Sprint("foo", i)), nil, 0)
}
}
Loading