diff --git a/server/storage/mvcc/watchable_store.go b/server/storage/mvcc/watchable_store.go
index 22d46e5048d..ee47c2c6d72 100644
--- a/server/storage/mvcc/watchable_store.go
+++ b/server/storage/mvcc/watchable_store.go
@@ -221,6 +221,7 @@ func (s *watchableStore) syncWatchersLoop() {
 	waitDuration := 100 * time.Millisecond
 	delayTicker := time.NewTicker(waitDuration)
 	defer delayTicker.Stop()
+	var evs []mvccpb.Event
 
 	for {
 		s.mu.RLock()
@@ -230,7 +231,7 @@ func (s *watchableStore) syncWatchersLoop() {
 
 		unsyncedWatchers := 0
 		if lastUnsyncedWatchers > 0 {
-			unsyncedWatchers = s.syncWatchers()
+			unsyncedWatchers, evs = s.syncWatchers(evs)
 		}
 		syncDuration := time.Since(st)
 
@@ -339,12 +340,12 @@ func (s *watchableStore) moveVictims() (moved int) {
 // 2. iterate over the set to get the minimum revision and remove compacted watchers
 // 3. use minimum revision to get all key-value pairs and send those events to watchers
 // 4. remove synced watchers in set from unsynced group and move to synced group
-func (s *watchableStore) syncWatchers() int {
+func (s *watchableStore) syncWatchers(evs []mvccpb.Event) (int, []mvccpb.Event) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 
 	if s.unsynced.size() == 0 {
-		return 0
+		return 0, []mvccpb.Event{}
 	}
 
 	s.store.revMu.RLock()
@@ -357,20 +358,7 @@ func (s *watchableStore) syncWatchers() int {
 	compactionRev := s.store.compactMainRev
 
 	wg, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev)
-	minBytes, maxBytes := NewRevBytes(), NewRevBytes()
-	minBytes = RevToBytes(Revision{Main: minRev}, minBytes)
-	maxBytes = RevToBytes(Revision{Main: curRev + 1}, maxBytes)
-
-	// UnsafeRange returns keys and values. And in boltdb, keys are revisions.
-	// values are actual key-value pairs in backend.
-	tx := s.store.b.ReadTx()
-	tx.RLock()
-	revs, vs := tx.UnsafeRange(schema.Key, minBytes, maxBytes, 0)
-	evs := kvsToEvents(s.store.lg, wg, revs, vs)
-	// Must unlock after kvsToEvents, because vs (come from boltdb memory) is not deep copy.
-	// We can only unlock after Unmarshal, which will do deep copy.
-	// Otherwise we will trigger SIGSEGV during boltdb re-mmap.
-	tx.RUnlock()
+	evs = rangeEventsWithReuse(s.store.lg, s.store.b, evs, minRev, curRev+1)
 
 	victims := make(watcherBatch)
 	wb := newWatcherBatch(wg, evs)
@@ -419,21 +407,68 @@ func (s *watchableStore) syncWatchers() int {
 	}
 	slowWatcherGauge.Set(float64(s.unsynced.size() + vsz))
 
-	return s.unsynced.size()
+	return s.unsynced.size(), evs
+}
+
+// rangeEventsWithReuse returns events in range [minRev, maxRev), while reusing already provided events.
+func rangeEventsWithReuse(lg *zap.Logger, b backend.Backend, evs []mvccpb.Event, minRev, maxRev int64) []mvccpb.Event {
+	if len(evs) == 0 {
+		return rangeEvents(lg, b, minRev, maxRev)
+	}
+	// append from left
+	if evs[0].Kv.ModRevision > minRev {
+		evs = append(rangeEvents(lg, b, minRev, evs[0].Kv.ModRevision), evs...)
+	}
+	// cut from left
+	prefixIndex := 0
+	for prefixIndex < len(evs) && evs[prefixIndex].Kv.ModRevision < minRev {
+		prefixIndex++
+	}
+	evs = evs[prefixIndex:]
+
+	if len(evs) == 0 {
+		return rangeEvents(lg, b, minRev, maxRev)
+	}
+	// append from right
+	if evs[len(evs)-1].Kv.ModRevision+1 < maxRev {
+		evs = append(evs, rangeEvents(lg, b, evs[len(evs)-1].Kv.ModRevision+1, maxRev)...)
+	}
+	// cut from right
+	suffixIndex := len(evs) - 1
+	for suffixIndex >= 0 && evs[suffixIndex].Kv.ModRevision >= maxRev {
+		suffixIndex--
+	}
+	evs = evs[:suffixIndex+1]
+	return evs
+}
+
+// rangeEvents returns events in range [minRev, maxRev).
+func rangeEvents(lg *zap.Logger, b backend.Backend, minRev, maxRev int64) []mvccpb.Event {
+	minBytes, maxBytes := NewRevBytes(), NewRevBytes()
+	minBytes = RevToBytes(Revision{Main: minRev}, minBytes)
+	maxBytes = RevToBytes(Revision{Main: maxRev}, maxBytes)
+
+	// UnsafeRange returns keys and values. And in boltdb, keys are revisions.
+	// values are actual key-value pairs in backend.
+	tx := b.ReadTx()
+	tx.RLock()
+	revs, vs := tx.UnsafeRange(schema.Key, minBytes, maxBytes, 0)
+	evs := kvsToEvents(lg, revs, vs)
+	// Must unlock after kvsToEvents, because vs (come from boltdb memory) is not deep copy.
+	// We can only unlock after Unmarshal, which will do deep copy.
+	// Otherwise we will trigger SIGSEGV during boltdb re-mmap.
+	tx.RUnlock()
+	return evs
 }
 
 // kvsToEvents gets all events for the watchers from all key-value pairs
-func kvsToEvents(lg *zap.Logger, wg *watcherGroup, revs, vals [][]byte) (evs []mvccpb.Event) {
+func kvsToEvents(lg *zap.Logger, revs, vals [][]byte) (evs []mvccpb.Event) {
 	for i, v := range vals {
 		var kv mvccpb.KeyValue
 		if err := kv.Unmarshal(v); err != nil {
 			lg.Panic("failed to unmarshal mvccpb.KeyValue", zap.Error(err))
 		}
 
-		if !wg.contains(string(kv.Key)) {
-			continue
-		}
-
 		ty := mvccpb.PUT
 		if isTombstone(revs[i]) {
 			ty = mvccpb.DELETE
diff --git a/server/storage/mvcc/watchable_store_test.go b/server/storage/mvcc/watchable_store_test.go
index c15bd12326f..a418c6c78fe 100644
--- a/server/storage/mvcc/watchable_store_test.go
+++ b/server/storage/mvcc/watchable_store_test.go
@@ -141,7 +141,7 @@ func TestSyncWatchers(t *testing.T) {
 	assert.Empty(t, s.synced.watcherSetByKey(string(testKey)))
 	assert.Len(t, s.unsynced.watcherSetByKey(string(testKey)), watcherN)
 
-	s.syncWatchers()
+	s.syncWatchers([]mvccpb.Event{})
 
 	assert.Len(t, s.synced.watcherSetByKey(string(testKey)), watcherN)
 	assert.Empty(t, s.unsynced.watcherSetByKey(string(testKey)))
@@ -164,6 +164,110 @@ func TestSyncWatchers(t *testing.T) {
 	}
 }
 
+func TestRangeEvents(t *testing.T) {
+	b, _ := betesting.NewDefaultTmpBackend(t)
+	lg := zaptest.NewLogger(t)
+	s := NewStore(lg, b, &lease.FakeLessor{}, StoreConfig{})
+
+	defer cleanup(s, b)
+
+	foo1 := []byte("foo1")
+	foo2 := []byte("foo2")
+	foo3 := []byte("foo3")
+	value := []byte("bar")
+	s.Put(foo1, value, lease.NoLease)
+	s.Put(foo2, value, lease.NoLease)
+	s.Put(foo3, value, lease.NoLease)
+	s.DeleteRange(foo1, foo3) // Deletes "foo1" and "foo2" generating 2 events
+
+	expectEvents := []mvccpb.Event{
+		{
+			Type: mvccpb.PUT,
+			Kv: &mvccpb.KeyValue{
+				Key:            foo1,
+				CreateRevision: 2,
+				ModRevision:    2,
+				Version:        1,
+				Value:          value,
+			},
+		},
+		{
+			Type: mvccpb.PUT,
+			Kv: &mvccpb.KeyValue{
+				Key:            foo2,
+				CreateRevision: 3,
+				ModRevision:    3,
+				Version:        1,
+				Value:          value,
+			},
+		},
+		{
+			Type: mvccpb.PUT,
+			Kv: &mvccpb.KeyValue{
+				Key:            foo3,
+				CreateRevision: 4,
+				ModRevision:    4,
+				Version:        1,
+				Value:          value,
+			},
+		},
+		{
+			Type: mvccpb.DELETE,
+			Kv: &mvccpb.KeyValue{
+				Key:         foo1,
+				ModRevision: 5,
+			},
+		},
+		{
+			Type: mvccpb.DELETE,
+			Kv: &mvccpb.KeyValue{
+				Key:         foo2,
+				ModRevision: 5,
+			},
+		},
+	}
+
+	tcs := []struct {
+		minRev       int64
+		maxRev       int64
+		expectEvents []mvccpb.Event
+	}{
+		// maxRev, top to bottom
+		{minRev: 2, maxRev: 6, expectEvents: expectEvents[0:5]},
+		{minRev: 2, maxRev: 5, expectEvents: expectEvents[0:3]},
+		{minRev: 2, maxRev: 4, expectEvents: expectEvents[0:2]},
+		{minRev: 2, maxRev: 3, expectEvents: expectEvents[0:1]},
+		{minRev: 2, maxRev: 2, expectEvents: expectEvents[0:0]},
+
+		// minRev, bottom to top
+		{minRev: 2, maxRev: 6, expectEvents: expectEvents[0:5]},
+		{minRev: 3, maxRev: 6, expectEvents: expectEvents[1:5]},
+		{minRev: 4, maxRev: 6, expectEvents: expectEvents[2:5]},
+		{minRev: 5, maxRev: 6, expectEvents: expectEvents[3:5]},
+		{minRev: 6, maxRev: 6, expectEvents: expectEvents[0:0]},
+
+		// Moving window algorithm, first increase maxRev, then increase minRev, repeat.
+		{minRev: 2, maxRev: 2, expectEvents: expectEvents[0:0]},
+		{minRev: 2, maxRev: 3, expectEvents: expectEvents[0:1]},
+		{minRev: 2, maxRev: 4, expectEvents: expectEvents[0:2]},
+		{minRev: 3, maxRev: 4, expectEvents: expectEvents[1:2]},
+		{minRev: 3, maxRev: 5, expectEvents: expectEvents[1:3]},
+		{minRev: 4, maxRev: 5, expectEvents: expectEvents[2:3]},
+		{minRev: 4, maxRev: 6, expectEvents: expectEvents[2:5]},
+		{minRev: 5, maxRev: 6, expectEvents: expectEvents[3:5]},
+		{minRev: 6, maxRev: 6, expectEvents: expectEvents[5:5]},
+	}
+	// reuse the evs to test rangeEventsWithReuse
+	var evs []mvccpb.Event
+	for i, tc := range tcs {
+		t.Run(fmt.Sprintf("%d rangeEvents(%d, %d)", i, tc.minRev, tc.maxRev), func(t *testing.T) {
+			assert.ElementsMatch(t, tc.expectEvents, rangeEvents(lg, b, tc.minRev, tc.maxRev))
+			evs = rangeEventsWithReuse(lg, b, evs, tc.minRev, tc.maxRev)
+			assert.ElementsMatch(t, tc.expectEvents, evs)
+		})
+	}
+}
+
 // TestWatchCompacted tests a watcher that watches on a compacted revision.
 func TestWatchCompacted(t *testing.T) {
 	b, _ := betesting.NewDefaultTmpBackend(t)
@@ -236,7 +340,7 @@ func TestWatchNoEventLossOnCompact(t *testing.T) {
 		require.NoError(t, err)
 	}
 	// fill up w.Chan() with 1 buf via 2 compacted watch response
-	s.syncWatchers()
+	s.syncWatchers([]mvccpb.Event{})
 
 	for len(watchers) > 0 {
 		resp := <-w.Chan()
diff --git a/server/storage/mvcc/watcher_test.go b/server/storage/mvcc/watcher_test.go
index 0c1fa521267..e774c70cfac 100644
--- a/server/storage/mvcc/watcher_test.go
+++ b/server/storage/mvcc/watcher_test.go
@@ -320,7 +320,7 @@ func TestWatcherRequestProgress(t *testing.T) {
 	default:
 	}
 
-	s.syncWatchers()
+	s.syncWatchers([]mvccpb.Event{})
 
 	w.RequestProgress(id)
 	wrs := WatchResponse{WatchID: id, Revision: 2}
@@ -359,7 +359,7 @@ func TestWatcherRequestProgressAll(t *testing.T) {
 	default:
 	}
 
-	s.syncWatchers()
+	s.syncWatchers([]mvccpb.Event{})
 
 	w.RequestProgressAll()
 	wrs := WatchResponse{WatchID: clientv3.InvalidWatchID, Revision: 2}
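Reviewer note (not part of the diff): below is a minimal, self-contained sketch of the window-reuse idea behind `rangeEventsWithReuse`, using plain int64 mod revisions in place of `mvccpb.Event` so it runs without the mvcc package. The `fetch` and `reuse` helpers are illustrative stand-ins for `rangeEvents` and `rangeEventsWithReuse`, not etcd APIs.

```go
package main

import "fmt"

// fetch stands in for rangeEvents: it "reads" every revision in [minRev, maxRev).
// In the real code this is a boltdb range scan plus Unmarshal.
func fetch(minRev, maxRev int64) []int64 {
	var out []int64
	for r := minRev; r < maxRev; r++ {
		out = append(out, r)
	}
	return out
}

// reuse mirrors the shape of rangeEventsWithReuse: extend the cached window on
// both sides, then trim entries that fall outside [minRev, maxRev).
func reuse(cached []int64, minRev, maxRev int64) []int64 {
	if len(cached) == 0 {
		return fetch(minRev, maxRev)
	}
	// extend on the left, then drop entries below minRev
	if cached[0] > minRev {
		cached = append(fetch(minRev, cached[0]), cached...)
	}
	for len(cached) > 0 && cached[0] < minRev {
		cached = cached[1:]
	}
	if len(cached) == 0 {
		return fetch(minRev, maxRev)
	}
	// extend on the right, then drop entries at or above maxRev
	if last := cached[len(cached)-1]; last+1 < maxRev {
		cached = append(cached, fetch(last+1, maxRev)...)
	}
	for len(cached) > 0 && cached[len(cached)-1] >= maxRev {
		cached = cached[:len(cached)-1]
	}
	return cached
}

func main() {
	// Moving window, as in the test cases: grow maxRev, then advance minRev.
	var cached []int64
	for _, w := range [][2]int64{{2, 4}, {3, 5}, {4, 6}} {
		cached = reuse(cached, w[0], w[1])
		fmt.Println(w, cached) // only newly uncovered revisions are re-fetched
	}
}
```

The point of the sketch: once a window has been materialized, advancing [minRev, maxRev) only fetches the revisions the cache does not already cover, which is the saving the production change obtains by threading `evs` between successive `syncWatchers` calls.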