diff --git a/mvcc/watchable_store.go b/mvcc/watchable_store.go index cdb406de99b..6d2837b1a53 100644 --- a/mvcc/watchable_store.go +++ b/mvcc/watchable_store.go @@ -459,7 +459,6 @@ func (s *watchableStore) notify(rev int64, evs []mvccpb.Event) { pendingEventsGauge.Add(float64(len(eb.evs))) } else { // move slow watcher to victims - w.minRev = rev + 1 if victim == nil { victim = make(watcherBatch) } @@ -468,6 +467,10 @@ func (s *watchableStore) notify(rev int64, evs []mvccpb.Event) { s.synced.delete(w) slowWatcherGauge.Inc() } + // always update minRev + // in case 'send' returns true and watcher stays synced, this is needed for Restore when all watchers become unsynced + // in case 'send' returns false, this is needed for syncWatchers + w.minRev = rev + 1 } s.addVictim(victim) } diff --git a/mvcc/watchable_store_test.go b/mvcc/watchable_store_test.go index b693b2ebdcd..1bfd7b81955 100644 --- a/mvcc/watchable_store_test.go +++ b/mvcc/watchable_store_test.go @@ -307,32 +307,22 @@ func TestWatchRestore(t *testing.T) { testKey := []byte("foo") testValue := []byte("bar") - rev := s.Put(testKey, testValue, lease.NoLease) - - newBackend, newPath := backend.NewDefaultTmpBackend() - newStore := newWatchableStore(zap.NewExample(), newBackend, &lease.FakeLessor{}, nil, nil, StoreConfig{}) - defer cleanup(newStore, newBackend, newPath) - - w := newStore.NewWatchStream() - w.Watch(0, testKey, nil, rev-1) + w := s.NewWatchStream() + defer w.Close() + w.Watch(0, testKey, nil, 1) time.Sleep(delay) + wantRev := s.Put(testKey, testValue, lease.NoLease) - newStore.Restore(b) - select { - case resp := <-w.Chan(): - if resp.Revision != rev { - t.Fatalf("rev = %d, want %d", resp.Revision, rev) - } - if len(resp.Events) != 1 { - t.Fatalf("failed to get events from the response") - } - if resp.Events[0].Kv.ModRevision != rev { - t.Fatalf("kv.rev = %d, want %d", resp.Events[0].Kv.ModRevision, rev) - } - case <-time.After(time.Second): - t.Fatal("failed to receive event in 1 second.") + s.Restore(b) + events := readEventsForSecond(w.Chan()) + if len(events) != 1 { + t.Errorf("Expected only one event, got %d", len(events)) + } + if events[0].Kv.ModRevision != wantRev { + t.Errorf("Expected revision to match, got %d, want %d", events[0].Kv.ModRevision, wantRev) } + } } @@ -340,6 +330,17 @@ func TestWatchRestore(t *testing.T) { t.Run("RunSyncWatchLoopBeforeRestore", test(time.Millisecond*120)) // longer than default waitDuration } +func readEventsForSecond(ws <-chan WatchResponse) (events []mvccpb.Event) { + for { + select { + case resp := <-ws: + events = append(events, resp.Events...) + case <-time.After(time.Second): + return events + } + } +} + // TestWatchRestoreSyncedWatcher tests such a case that: // 1. watcher is created with a future revision "math.MaxInt64 - 2" // 2. watcher with a future revision is added to "synced" watcher group