diff --git a/pkg/storage/entry_cache.go b/pkg/storage/entry_cache.go deleted file mode 100644 index 554ef859e7b8..000000000000 --- a/pkg/storage/entry_cache.go +++ /dev/null @@ -1,302 +0,0 @@ -// Copyright 2016 The Cockroach Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -// implied. See the License for the specific language governing -// permissions and limitations under the License. See the AUTHORS file -// for names of contributors. - -package storage - -import ( - "github.com/biogo/store/llrb" - "go.etcd.io/etcd/raft/raftpb" - - "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/util/cache" - "github.com/cockroachdb/cockroach/pkg/util/metric" - "github.com/cockroachdb/cockroach/pkg/util/syncutil" -) - -var ( - metaEntryCacheBytes = metric.Metadata{ - Name: "raft.entrycache.bytes", - Help: "Aggregate size of all Raft entries in the Raft entry cache", - Measurement: "Entry Bytes", - Unit: metric.Unit_BYTES, - } - metaEntryCacheSize = metric.Metadata{ - Name: "raft.entrycache.size", - Help: "Number of Raft entries in the Raft entry cache", - Measurement: "Entry Count", - Unit: metric.Unit_COUNT, - } - metaEntryCacheLookups = metric.Metadata{ - Name: "raft.entrycache.lookups", - Help: "Number of cache lookups in the Raft entry cache", - Measurement: "Lookups", - Unit: metric.Unit_COUNT, - } - metaEntryCacheHits = metric.Metadata{ - Name: "raft.entrycache.hits", - Help: "Number of successful cache lookups in the Raft entry cache", - Measurement: "Hits", - Unit: metric.Unit_COUNT, - } -) - -// RaftEntryCacheMetrics is the set of metrics for the raft entry cache. -type RaftEntryCacheMetrics struct { - Bytes *metric.Gauge - Size *metric.Gauge - Lookups *metric.Counter - Hits *metric.Counter -} - -func makeRaftEntryCacheMetrics() RaftEntryCacheMetrics { - return RaftEntryCacheMetrics{ - Bytes: metric.NewGauge(metaEntryCacheBytes), - Size: metric.NewGauge(metaEntryCacheSize), - Lookups: metric.NewCounter(metaEntryCacheLookups), - Hits: metric.NewCounter(metaEntryCacheHits), - } -} - -type entryCacheKey struct { - RangeID roachpb.RangeID - Index uint64 -} - -// Compare implements the llrb.Comparable interface for entryCacheKey, so that -// it can be used as a key for util.OrderedCache. -func (a *entryCacheKey) Compare(b llrb.Comparable) int { - bk := b.(*entryCacheKey) - switch { - case a.RangeID < bk.RangeID: - return -1 - case a.RangeID > bk.RangeID: - return 1 - case a.Index < bk.Index: - return -1 - case a.Index > bk.Index: - return 1 - default: - return 0 - } -} - -const defaultEntryCacheFreeListSize = 16 - -// entryCacheFreeList represents a free list of raftEntryCache cache.Entry -// objects. The free list is initially empty and is populated as entries are -// freed. -// -// A freelist is used here instead of a sync.Pool because all access to the -// freelist is protected by the raftEntryCache lock so it doesn't need its -// own locking. This allows us to avoid the added synchronization cost of -// a sync.Pool. 
-type entryCacheFreeList []*cache.Entry - -func makeEntryCacheFreeList(size int) entryCacheFreeList { - return make(entryCacheFreeList, 0, size) -} - -func (f *entryCacheFreeList) newEntry(key entryCacheKey, value raftpb.Entry) *cache.Entry { - i := len(*f) - 1 - if i < 0 { - alloc := struct { - key entryCacheKey - value raftpb.Entry - entry cache.Entry - }{ - key: key, - value: value, - } - alloc.entry.Key = &alloc.key - alloc.entry.Value = &alloc.value - return &alloc.entry - } - e := (*f)[i] - (*f)[i] = nil - (*f) = (*f)[:i] - - *e.Key.(*entryCacheKey) = key - *e.Value.(*raftpb.Entry) = value - return e -} - -func (f *entryCacheFreeList) freeEntry(e *cache.Entry) { - if len(*f) < cap(*f) { - *e.Key.(*entryCacheKey) = entryCacheKey{} - *e.Value.(*raftpb.Entry) = raftpb.Entry{} - *f = append(*f, e) - } -} - -// A raftEntryCache maintains a global cache of Raft group log entries. The -// cache mostly prevents unnecessary reads from disk of recently-written log -// entries between log append and application to the FSM. -// -// This cache stores entries with sideloaded proposals inlined (i.e. ready to -// be sent to followers). -type raftEntryCache struct { - syncutil.RWMutex // protects Cache for concurrent access - bytes uint64 // total size of the cache in bytes - cache *cache.OrderedCache // LRU cache of log entries, keyed by rangeID / log index - freeList entryCacheFreeList // used to avoid allocations on insertion - metrics RaftEntryCacheMetrics -} - -// newRaftEntryCache returns a new RaftEntryCache with the given -// maximum size in bytes. -func newRaftEntryCache(maxBytes uint64) *raftEntryCache { - rec := &raftEntryCache{ - cache: cache.NewOrderedCache(cache.Config{Policy: cache.CacheLRU}), - freeList: makeEntryCacheFreeList(defaultEntryCacheFreeListSize), - metrics: makeRaftEntryCacheMetrics(), - } - // The raft entry cache mutex will be held when the ShouldEvict - // and OnEvictedEntry callbacks are invoked. - // - // On ShouldEvict, compare the total size of the cache in bytes to the - // configured maxBytes. We also insist that at least one entry remains - // in the cache to prevent the case where a very large entry isn't able - // to be cached at all. - rec.cache.Config.ShouldEvict = func(n int, k, v interface{}) bool { - return rec.bytes > maxBytes && n >= 1 - } - rec.cache.Config.OnEvictedEntry = func(e *cache.Entry) { - ent := e.Value.(*raftpb.Entry) - rec.bytes -= uint64(ent.Size()) - rec.freeList.freeEntry(e) - } - - return rec -} - -// addEntries adds the slice of raft entries, using the range ID and the -// entry indexes as each cached entry's key. -func (rec *raftEntryCache) addEntries(rangeID roachpb.RangeID, ents []raftpb.Entry) { - if len(ents) == 0 { - return - } - rec.Lock() - defer rec.Unlock() - - for _, e := range ents { - key := entryCacheKey{RangeID: rangeID, Index: e.Index} - entry := rec.freeList.newEntry(key, e) - rec.cache.AddEntry(entry) - rec.bytes += uint64(e.Size()) - } - rec.updateGauges() -} - -// getTerm returns the term for the specified index and true for the second -// return value. If the index is not present in the cache, false is returned. 
-func (rec *raftEntryCache) getTerm(rangeID roachpb.RangeID, index uint64) (uint64, bool) { - rec.metrics.Lookups.Inc(1) - rec.RLock() - defer rec.RUnlock() - - fromKey := entryCacheKey{RangeID: rangeID, Index: index} - k, v, ok := rec.cache.Ceil(&fromKey) - if !ok { - return 0, false - } - ecKey := k.(*entryCacheKey) - if ecKey.RangeID != rangeID || ecKey.Index != index { - return 0, false - } - rec.metrics.Hits.Inc(1) - ent := v.(*raftpb.Entry) - return ent.Term, true -} - -// getEntries returns entries between [lo, hi) for specified range. -// If any entries are returned for the specified indexes, they will -// start with index lo and proceed sequentially without gaps until -// 1) all entries exclusive of hi are fetched, 2) fetching another entry -// would add up to more than maxBytes of data, or 3) a cache miss occurs. -// The returned size reflects the size of the returned entries. -func (rec *raftEntryCache) getEntries( - ents []raftpb.Entry, rangeID roachpb.RangeID, lo, hi, maxBytes uint64, -) (_ []raftpb.Entry, size uint64, nextIndex uint64, exceededMaxBytes bool) { - rec.metrics.Lookups.Inc(1) - rec.RLock() - defer rec.RUnlock() - var bytes uint64 - nextIndex = lo - - fromKey := entryCacheKey{RangeID: rangeID, Index: lo} - toKey := entryCacheKey{RangeID: rangeID, Index: hi} - rec.cache.DoRange(func(k, v interface{}) bool { - ecKey := k.(*entryCacheKey) - if ecKey.Index != nextIndex { - return true - } - ent := v.(*raftpb.Entry) - size := uint64(ent.Size()) - if bytes+size > maxBytes { - exceededMaxBytes = true - if len(ents) > 0 { - return true - } - } - nextIndex++ - bytes += size - ents = append(ents, *ent) - return exceededMaxBytes - }, &fromKey, &toKey) - - if nextIndex == hi || exceededMaxBytes { - // We only consider an access a "hit" if it returns all requested - // entries or stops short because of a maximum bytes limit. - rec.metrics.Hits.Inc(1) - } - return ents, bytes, nextIndex, exceededMaxBytes -} - -// delEntries deletes entries between [lo, hi) for specified range. -func (rec *raftEntryCache) delEntries(rangeID roachpb.RangeID, lo, hi uint64) { - rec.Lock() - defer rec.Unlock() - if lo >= hi { - return - } - var cacheEnts []*cache.Entry - fromKey := entryCacheKey{RangeID: rangeID, Index: lo} - toKey := entryCacheKey{RangeID: rangeID, Index: hi} - rec.cache.DoRangeEntry(func(e *cache.Entry) bool { - cacheEnts = append(cacheEnts, e) - return false - }, &fromKey, &toKey) - - for _, e := range cacheEnts { - rec.cache.DelEntry(e) - } - rec.updateGauges() -} - -// clearTo clears the entries in the cache for specified range up to, -// but not including the specified index. -func (rec *raftEntryCache) clearTo(rangeID roachpb.RangeID, index uint64) { - rec.delEntries(rangeID, 0, index) -} - -// Metrics returns a struct which contains metrics for the raft entry cache. -func (rec *raftEntryCache) Metrics() RaftEntryCacheMetrics { - return rec.metrics -} - -func (rec *raftEntryCache) updateGauges() { - rec.metrics.Bytes.Update(int64(rec.bytes)) - rec.metrics.Size.Update(int64(rec.cache.Len())) -} diff --git a/pkg/storage/entry_cache_test.go b/pkg/storage/entry_cache_test.go deleted file mode 100644 index 5d876f299517..000000000000 --- a/pkg/storage/entry_cache_test.go +++ /dev/null @@ -1,184 +0,0 @@ -// Copyright 2016 The Cockroach Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -// implied. See the License for the specific language governing -// permissions and limitations under the License. - -package storage - -import ( - "math" - "reflect" - "testing" - - "go.etcd.io/etcd/raft/raftpb" - - "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/util/leaktest" -) - -const noLimit = math.MaxUint64 - -func newEntry(index, size uint64) raftpb.Entry { - return raftpb.Entry{ - Index: index, - Data: make([]byte, size), - } -} - -func addEntries(rec *raftEntryCache, rangeID roachpb.RangeID, lo, hi uint64) []raftpb.Entry { - ents := []raftpb.Entry{} - for i := lo; i < hi; i++ { - ents = append(ents, newEntry(i, 1)) - } - rec.addEntries(rangeID, ents) - return ents -} - -func verifyGet( - t *testing.T, - rec *raftEntryCache, - rangeID roachpb.RangeID, - lo, hi uint64, - expEnts []raftpb.Entry, - expNextIndex uint64, -) { - ents, _, nextIndex, _ := rec.getEntries(nil, rangeID, lo, hi, noLimit) - if !(len(expEnts) == 0 && len(ents) == 0) && !reflect.DeepEqual(expEnts, ents) { - t.Fatalf("expected entries %+v; got %+v", expEnts, ents) - } - if nextIndex != expNextIndex { - t.Fatalf("expected next index %d; got %d", expNextIndex, nextIndex) - } - for _, e := range ents { - term, ok := rec.getTerm(rangeID, e.Index) - if !ok { - t.Fatalf("expected to be able to retrieve term") - } - if term != e.Term { - t.Fatalf("expected term %d, but got %d", e.Term, term) - } - } -} - -func TestEntryCache(t *testing.T) { - defer leaktest.AfterTest(t)() - rec := newRaftEntryCache(100) - rangeID := roachpb.RangeID(2) - // Add entries for range 1, indexes (1-10). - ents := addEntries(rec, rangeID, 1, 11) - // Fetch all data with an exact match. - verifyGet(t, rec, rangeID, 1, 11, ents, 11) - // Fetch point entry. - verifyGet(t, rec, rangeID, 1, 2, ents[0:1], 2) - // Fetch overlapping first half. - verifyGet(t, rec, rangeID, 0, 5, []raftpb.Entry{}, 0) - // Fetch overlapping second half. - verifyGet(t, rec, rangeID, 9, 12, ents[8:], 11) - // Fetch data from earlier range. - verifyGet(t, rec, roachpb.RangeID(1), 1, 11, []raftpb.Entry{}, 1) - // Fetch data from later range. - verifyGet(t, rec, roachpb.RangeID(3), 1, 11, []raftpb.Entry{}, 1) - // Create a gap in the entries. - rec.delEntries(rangeID, 4, 8) - // Fetch all data; verify we get only first three. - verifyGet(t, rec, rangeID, 1, 11, ents[0:3], 4) - // Try to fetch from within the gap; expect no entries. - verifyGet(t, rec, rangeID, 5, 11, []raftpb.Entry{}, 5) - // Fetch after the gap. - verifyGet(t, rec, rangeID, 8, 11, ents[7:], 11) - // Delete the prefix of entries. - rec.delEntries(rangeID, 1, 3) - // Verify entries are gone. - verifyGet(t, rec, rangeID, 1, 5, []raftpb.Entry{}, 1) - // Delete the suffix of entries. - rec.delEntries(rangeID, 10, 11) - // Verify get of entries at end of range. 
- verifyGet(t, rec, rangeID, 8, 11, ents[7:9], 10) - - for _, index := range []uint64{0, 12} { - if term, ok := rec.getTerm(rangeID, index); ok { - t.Fatalf("expected no term, but found %d", term) - } - } -} - -func TestEntryCacheClearTo(t *testing.T) { - defer leaktest.AfterTest(t)() - rangeID := roachpb.RangeID(1) - rec := newRaftEntryCache(100) - rec.addEntries(rangeID, []raftpb.Entry{newEntry(2, 1)}) - rec.addEntries(rangeID, []raftpb.Entry{newEntry(20, 1), newEntry(21, 1)}) - rec.clearTo(rangeID, 21) - if ents, _, _, _ := rec.getEntries(nil, rangeID, 2, 21, noLimit); len(ents) != 0 { - t.Errorf("expected no entries after clearTo") - } - if ents, _, _, _ := rec.getEntries(nil, rangeID, 21, 22, noLimit); len(ents) != 1 { - t.Errorf("expected entry 22 to remain in the cache clearTo") - } -} - -func TestEntryCacheEviction(t *testing.T) { - defer leaktest.AfterTest(t)() - rangeID := roachpb.RangeID(1) - rec := newRaftEntryCache(100) - rec.addEntries(rangeID, []raftpb.Entry{newEntry(1, 40), newEntry(2, 40)}) - ents, _, hi, _ := rec.getEntries(nil, rangeID, 1, 3, noLimit) - if len(ents) != 2 || hi != 3 { - t.Errorf("expected both entries; got %+v, %d", ents, hi) - } - // Add another entry to evict first. - rec.addEntries(rangeID, []raftpb.Entry{newEntry(3, 40)}) - ents, _, hi, _ = rec.getEntries(nil, rangeID, 2, 4, noLimit) - if len(ents) != 2 || hi != 4 { - t.Errorf("expected only two entries; got %+v, %d", ents, hi) - } -} - -func BenchmarkEntryCache(b *testing.B) { - rangeID := roachpb.RangeID(1) - ents := make([]raftpb.Entry, 1000) - for i := range ents { - ents[i] = newEntry(uint64(i+1), 8) - } - - b.ResetTimer() - for i := 0; i < b.N; i++ { - b.StopTimer() - rec := newRaftEntryCache(8 * uint64(len(ents)*len(ents[0].Data))) - for i := roachpb.RangeID(0); i < 10; i++ { - if i != rangeID { - rec.addEntries(i, ents) - } - } - b.StartTimer() - rec.addEntries(rangeID, ents) - _, _, _, _ = rec.getEntries(nil, rangeID, 0, uint64(len(ents)-10), noLimit) - rec.clearTo(rangeID, uint64(len(ents)-10)) - } -} - -func BenchmarkEntryCacheClearTo(b *testing.B) { - rangeID := roachpb.RangeID(1) - ents := make([]raftpb.Entry, 1000) - for i := range ents { - ents[i] = newEntry(uint64(i+1), 8) - } - - b.ResetTimer() - for i := 0; i < b.N; i++ { - b.StopTimer() - rec := newRaftEntryCache(uint64(len(ents) * len(ents[0].Data))) - rec.addEntries(rangeID, ents) - b.StartTimer() - rec.clearTo(rangeID, uint64(len(ents)-10)) - } -} diff --git a/pkg/storage/raftentry/cache.go b/pkg/storage/raftentry/cache.go new file mode 100644 index 000000000000..bcf0ef9898f0 --- /dev/null +++ b/pkg/storage/raftentry/cache.go @@ -0,0 +1,276 @@ +// Copyright 2016 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. 
+ +package raftentry + +import ( + "container/list" + + "go.etcd.io/etcd/raft/raftpb" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/metric" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" +) + +var ( + metaEntryCacheSize = metric.Metadata{ + Name: "raft.entrycache.size", + Help: "Number of Raft entries in the Raft entry cache", + Measurement: "Entry Count", + Unit: metric.Unit_COUNT, + } + metaEntryCacheBytes = metric.Metadata{ + Name: "raft.entrycache.bytes", + Help: "Aggregate size of all Raft entries in the Raft entry cache", + Measurement: "Entry Bytes", + Unit: metric.Unit_BYTES, + } + metaEntryCacheAccesses = metric.Metadata{ + Name: "raft.entrycache.accesses", + Help: "Number of cache lookups in the Raft entry cache", + Measurement: "Accesses", + Unit: metric.Unit_COUNT, + } + metaEntryCacheHits = metric.Metadata{ + Name: "raft.entrycache.hits", + Help: "Number of successful cache lookups in the Raft entry cache", + Measurement: "Hits", + Unit: metric.Unit_COUNT, + } +) + +// Metrics is the set of metrics for the raft entry cache. +type Metrics struct { + Size *metric.Gauge + Bytes *metric.Gauge + Accesses *metric.Counter + Hits *metric.Counter +} + +func makeMetrics() Metrics { + return Metrics{ + Size: metric.NewGauge(metaEntryCacheSize), + Bytes: metric.NewGauge(metaEntryCacheBytes), + Accesses: metric.NewCounter(metaEntryCacheAccesses), + Hits: metric.NewCounter(metaEntryCacheHits), + } +} + +// Cache maintains a global cache of Raft group log entries. The cache mostly +// prevents unnecessary reads from disk of recently-written log entries between +// log append and application to the FSM. +// +// This cache stores entries with sideloaded proposals inlined (i.e. ready to be +// sent to followers). +type Cache struct { + mu syncutil.Mutex + parts map[roachpb.RangeID]*partition + lru list.List + size uint64 // count of all entries + bytes uint64 // aggregate size of all entries + maxBytes uint64 + metrics Metrics +} + +// NB: partitions could maintain their own lock, allowing for a partitioned +// locking scheme where Cache only needs to hold a read lock when a partition is +// performing internal modifications. However, a global write lock would still +// be necessary to maintain the linked-list to enforce the LRU policy. Since we +// don't expect internal modifications to partitions to be slow due to the +// constant-time operations on the entry hashmap, this doesn't appear to be +// worth it. +type partition struct { + r roachpb.RangeID + ents map[uint64]raftpb.Entry + el *list.Element + bytes uint64 // aggregate size of all entries in partition +} + +// NewCache returns a new Cache instance. +func NewCache(maxBytes uint64) *Cache { + return &Cache{ + parts: make(map[roachpb.RangeID]*partition), + maxBytes: maxBytes, + metrics: makeMetrics(), + } +} + +// Add adds the slice of Raft entries to the cache. +func (c *Cache) Add(r roachpb.RangeID, ents []raftpb.Entry) { + if len(ents) == 0 { + return + } + c.mu.Lock() + defer c.mu.Unlock() + + // Get the partition. Create one if missing. + p, ok := c.parts[r] + if !ok { + p = &partition{ + r: r, + ents: make(map[uint64]raftpb.Entry, len(ents)), + } + p.el = c.lru.PushFront(p) + c.parts[r] = p + } else { + c.lru.MoveToFront(p.el) + } + + // Add entries to the current partition. + for _, e := range ents { + s := uint64(e.Size()) + + // Check if we're replacing an existing entry. 
+ if oldE, ok := p.ents[e.Index]; ok { + // NB: it's possible that we're replacing an entry with a larger one + // that puts us over the size limit. We could delete the old entry + // and not insert the new one, but that would make this code more + // complicated, so it's not worth it to prevent that rare case. + s -= uint64(oldE.Size()) + p.ents[e.Index] = e + p.bytes += s + c.bytes += s + } else { + // If this new entry would push us above the byte limit, evict + // partitions to try to get below the limit. + for c.bytes+s > c.maxBytes && c.lru.Len() > 1 { + e := c.lru.Back() + ep := c.lru.Remove(e).(*partition) + c.bytes -= ep.bytes + c.size -= uint64(len(ep.ents)) + delete(c.parts, ep.r) + } + // If this will still push us above the limit, we must be the only + // partition. Break if this isn't the first entry in this partition. + if c.bytes+s > c.maxBytes { + if c.lru.Len() != 1 { + panic("unreachable") + } + if len(p.ents) > 0 { + break + } + } + + p.ents[e.Index] = e + c.size++ + p.bytes += s + c.bytes += s + } + } + c.updateGauges() +} + +// Clear deletes entries between [lo, hi) for specified range. +func (c *Cache) Clear(r roachpb.RangeID, lo, hi uint64) { + if lo >= hi { + return + } + c.mu.Lock() + defer c.mu.Unlock() + + p, ok := c.parts[r] + if !ok { + return + } + // Don't `c.lru.MoveToFront(p.el)`. Clearing from a partition + // is not considered an "access" with respect to the LRU policy. + + // Delete entries in range. Maintain stats. + for i := lo; i < hi && len(p.ents) > 0; i++ { + if e, ok := p.ents[i]; ok { + delete(p.ents, i) + c.size-- + c.bytes -= uint64(e.Size()) + } + } + c.updateGauges() + + // Delete the partition if it's now empty. + if len(p.ents) == 0 { + c.lru.Remove(p.el) + delete(c.parts, p.r) + } +} + +// Get returns the entry for the specified index and true for the second return +// value. If the index is not present in the cache, false is returned. +func (c *Cache) Get(r roachpb.RangeID, idx uint64) (raftpb.Entry, bool) { + c.metrics.Accesses.Inc(1) + c.mu.Lock() + defer c.mu.Unlock() + + p, ok := c.parts[r] + if !ok { + return raftpb.Entry{}, false + } + c.lru.MoveToFront(p.el) + + e, ok := p.ents[idx] + if ok { + c.metrics.Hits.Inc(1) + } + return e, ok +} + +// Scan returns entries between [lo, hi) for specified range. If any entries are +// returned for the specified indexes, they will start with index lo and proceed +// sequentially without gaps until 1) all entries exclusive of hi are fetched, +// 2) fetching another entry would add up to more than maxBytes of data, or 3) a +// cache miss occurs. The returned size reflects the size of the returned +// entries. +func (c *Cache) Scan( + ents []raftpb.Entry, r roachpb.RangeID, lo, hi, maxBytes uint64, +) (_ []raftpb.Entry, bytes uint64, nextIdx uint64, exceededMaxBytes bool) { + c.metrics.Accesses.Inc(1) + c.mu.Lock() + defer c.mu.Unlock() + + p, ok := c.parts[r] + if !ok { + return ents, 0, lo, false + } + c.lru.MoveToFront(p.el) + + for nextIdx = lo; nextIdx < hi && !exceededMaxBytes; nextIdx++ { + e, ok := p.ents[nextIdx] + if !ok { + // Break if there are any gaps. + break + } + s := uint64(e.Size()) + if bytes+s > maxBytes { + exceededMaxBytes = true + if len(ents) > 0 { + break + } + } + bytes += s + ents = append(ents, e) + } + if nextIdx == hi || exceededMaxBytes { + // We only consider an access a "hit" if it returns all requested + // entries or stops short because of a maximum bytes limit. 
+ c.metrics.Hits.Inc(1) + } + return ents, bytes, nextIdx, exceededMaxBytes +} + +// Metrics returns a struct which contains metrics for the cache. +func (c *Cache) Metrics() Metrics { return c.metrics } + +func (c *Cache) updateGauges() { + c.metrics.Bytes.Update(int64(c.bytes)) + c.metrics.Size.Update(int64(c.size)) +} diff --git a/pkg/storage/raftentry/cache_test.go b/pkg/storage/raftentry/cache_test.go new file mode 100644 index 000000000000..363936c03325 --- /dev/null +++ b/pkg/storage/raftentry/cache_test.go @@ -0,0 +1,216 @@ +// Copyright 2016 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package raftentry + +import ( + "math" + "reflect" + "testing" + + "go.etcd.io/etcd/raft/raftpb" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" +) + +const noLimit = math.MaxUint64 + +func newEntry(index, size uint64) raftpb.Entry { + return raftpb.Entry{ + Index: index, + Data: make([]byte, size), + } +} + +func addEntries(c *Cache, rangeID roachpb.RangeID, lo, hi uint64) []raftpb.Entry { + ents := []raftpb.Entry{} + for i := lo; i < hi; i++ { + ents = append(ents, newEntry(i, 1)) + } + c.Add(rangeID, ents) + return ents +} + +func verifyGet( + t *testing.T, + c *Cache, + rangeID roachpb.RangeID, + lo, hi uint64, + expEnts []raftpb.Entry, + expNextIndex uint64, +) { + ents, _, nextIndex, _ := c.Scan(nil, rangeID, lo, hi, noLimit) + if !(len(expEnts) == 0 && len(ents) == 0) && !reflect.DeepEqual(expEnts, ents) { + t.Fatalf("expected entries %+v; got %+v", expEnts, ents) + } + if nextIndex != expNextIndex { + t.Fatalf("expected next index %d; got %d", expNextIndex, nextIndex) + } + for _, e := range ents { + found, ok := c.Get(rangeID, e.Index) + if !ok { + t.Fatalf("expected to be able to retrieve term") + } + if !reflect.DeepEqual(found, e) { + t.Fatalf("expected entry %v, but got %v", e, found) + } + } +} + +func TestEntryCache(t *testing.T) { + defer leaktest.AfterTest(t)() + c := NewCache(100) + rangeID := roachpb.RangeID(2) + // Add entries for range 1, indexes (1-10). + ents := addEntries(c, rangeID, 1, 11) + // Fetch all data with an exact match. + verifyGet(t, c, rangeID, 1, 11, ents, 11) + // Fetch point entry. + verifyGet(t, c, rangeID, 1, 2, ents[0:1], 2) + // Fetch overlapping first half. + verifyGet(t, c, rangeID, 0, 5, []raftpb.Entry{}, 0) + // Fetch overlapping second half. + verifyGet(t, c, rangeID, 9, 12, ents[8:], 11) + // Fetch data from earlier range. + verifyGet(t, c, roachpb.RangeID(1), 1, 11, []raftpb.Entry{}, 1) + // Fetch data from later range. + verifyGet(t, c, roachpb.RangeID(3), 1, 11, []raftpb.Entry{}, 1) + // Create a gap in the entries. + c.Clear(rangeID, 4, 8) + // Fetch all data; verify we get only first three. + verifyGet(t, c, rangeID, 1, 11, ents[0:3], 4) + // Try to fetch from within the gap; expect no entries. + verifyGet(t, c, rangeID, 5, 11, []raftpb.Entry{}, 5) + // Fetch after the gap. 
+ verifyGet(t, c, rangeID, 8, 11, ents[7:], 11) + // Delete the prefix of entries. + c.Clear(rangeID, 1, 3) + // Verify entries are gone. + verifyGet(t, c, rangeID, 1, 5, []raftpb.Entry{}, 1) + // Delete the suffix of entries. + c.Clear(rangeID, 10, 11) + // Verify get of entries at end of range. + verifyGet(t, c, rangeID, 8, 11, ents[7:9], 10) + + for _, index := range []uint64{0, 12} { + if e, ok := c.Get(rangeID, index); ok { + t.Fatalf("expected no entry, but found %v", e) + } + } +} + +func TestEntryCacheClearTo(t *testing.T) { + defer leaktest.AfterTest(t)() + rangeID := roachpb.RangeID(1) + c := NewCache(100) + c.Add(rangeID, []raftpb.Entry{newEntry(2, 1)}) + c.Add(rangeID, []raftpb.Entry{newEntry(20, 1), newEntry(21, 1)}) + c.Clear(rangeID, 0, 21) + if ents, _, _, _ := c.Scan(nil, rangeID, 2, 21, noLimit); len(ents) != 0 { + t.Errorf("expected no entries after clearTo") + } + if ents, _, _, _ := c.Scan(nil, rangeID, 21, 22, noLimit); len(ents) != 1 { + t.Errorf("expected entry 22 to remain in the cache clearTo") + } +} + +func TestEntryCacheEviction(t *testing.T) { + defer leaktest.AfterTest(t)() + rangeID, rangeID2 := roachpb.RangeID(1), roachpb.RangeID(2) + c := NewCache(100) + c.Add(rangeID, []raftpb.Entry{newEntry(1, 30), newEntry(2, 30)}) + ents, _, hi, _ := c.Scan(nil, rangeID, 1, 3, noLimit) + if len(ents) != 2 || hi != 3 { + t.Errorf("expected both entries; got %+v, %d", ents, hi) + } + if c.size != 2 { + t.Errorf("expected size=2; got %d", c.size) + } + // Add another entry to the same range. This would have + // exceeded the size limit, so it won't be added. + c.Add(rangeID, []raftpb.Entry{newEntry(3, 30)}) + ents, _, hi, _ = c.Scan(nil, rangeID, 1, 4, noLimit) + if len(ents) != 2 || hi != 3 { + t.Errorf("expected two entries; got %+v, %d", ents, hi) + } + if c.size != 2 { + t.Errorf("expected size=2; got %d", c.size) + } + // Replace the first entry with a smaller one, then add + // third entry again. This time, there should be enough + // space. + c.Add(rangeID, []raftpb.Entry{newEntry(1, 5), newEntry(3, 30)}) + ents, _, hi, _ = c.Scan(nil, rangeID, 1, 4, noLimit) + if len(ents) != 3 || hi != 4 { + t.Errorf("expected three entries; got %+v, %d", ents, hi) + } + if c.size != 3 { + t.Errorf("expected size=3; got %d", c.size) + } + // Add an entry to the new range. Should cause the old + // range's partition to be evicted. 
+ c.Add(rangeID2, []raftpb.Entry{newEntry(1, 30)}) + ents, _, _, _ = c.Scan(nil, rangeID, 1, 4, noLimit) + if len(ents) != 0 { + t.Errorf("expected no entries; got %+v", ents) + } + ents, _, hi, _ = c.Scan(nil, rangeID2, 1, 2, noLimit) + if len(ents) != 1 || hi != 2 { + t.Errorf("expected one entry; got %+v, %d", ents, hi) + } + if c.size != 1 { + t.Errorf("expected size=1; got %d", c.size) + } +} + +func BenchmarkEntryCache(b *testing.B) { + rangeID := roachpb.RangeID(1) + ents := make([]raftpb.Entry, 1000) + for i := range ents { + ents[i] = newEntry(uint64(i+1), 8) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + b.StopTimer() + c := NewCache(15 * uint64(len(ents)*len(ents[0].Data))) + for i := roachpb.RangeID(0); i < 10; i++ { + if i != rangeID { + c.Add(i, ents) + } + } + b.StartTimer() + c.Add(rangeID, ents) + _, _, _, _ = c.Scan(nil, rangeID, 0, uint64(len(ents)-10), noLimit) + c.Clear(rangeID, 0, uint64(len(ents)-10)) + } +} + +func BenchmarkEntryCacheClearTo(b *testing.B) { + rangeID := roachpb.RangeID(1) + ents := make([]raftpb.Entry, 1000) + for i := range ents { + ents[i] = newEntry(uint64(i+1), 8) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + b.StopTimer() + c := NewCache(uint64(len(ents) * len(ents[0].Data))) + c.Add(rangeID, ents) + b.StartTimer() + c.Clear(rangeID, 0, uint64(len(ents)-10)) + } +} diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go index fc2101c96ba6..9e9bf679bf5b 100644 --- a/pkg/storage/replica.go +++ b/pkg/storage/replica.go @@ -4305,10 +4305,6 @@ func (r *Replica) handleRaftReadyRaftMuLocked( r.store.replicateQueue.MaybeAdd(r, r.store.Clock().Now()) } - // Update raft log entry cache. We clear any older, uncommitted log entries - // and cache the latest ones. - r.store.raftEntryCache.addEntries(r.RangeID, rd.Entries) - r.sendRaftMessages(ctx, otherMsgs) for _, e := range rd.CommittedEntries { @@ -4430,6 +4426,35 @@ func (r *Replica) handleRaftReadyRaftMuLocked( r.mu.Unlock() } + // Update the raft entry cache. + if len(rd.Entries) > 0 || len(rd.CommittedEntries) > 0 { + allCommitted := false + if len(rd.Entries) == len(rd.CommittedEntries) { + // If all newly proposed entries are also being committed in the + // same Raft Ready processing iteration, then we can skip adding + // them to the cache because we'd just immediately remove them. This + // effectively means that we never use the raftEntryCache in + // single-node clusters. + allCommitted = rd.Entries[0].Index == rd.CommittedEntries[0].Index + } + if !allCommitted { + // Update raft log entry cache. We clear any older, uncommitted log entries + // and cache the latest ones. + r.store.raftEntryCache.Add(r.RangeID, rd.Entries) + + if len(rd.CommittedEntries) > 0 { + // Clear the entries that we just applied out of the Raft log entry + // cache. We may pull these entries back into the cache if we need + // to catch up a slow follower which doesn't need a snapshot, but + // this should be rare and we don't mind hitting RocksDB in that + // case. + lo := rd.CommittedEntries[0].Index + hi := rd.CommittedEntries[len(rd.CommittedEntries)-1].Index + r.store.raftEntryCache.Clear(r.RangeID, lo, hi+1) + } + } + } + // TODO(bdarnell): need to check replica id and not Advance if it // has changed. Or do we need more locking to guarantee that replica // ID cannot change during handleRaftReady? 
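The new cache-update block in handleRaftReadyRaftMuLocked above is easiest to reason about as a pure decision: skip the cache when every appended entry commits in the same Ready (the single-node case called out in the comment), otherwise Add the appended entries and Clear the just-applied span. The sketch below restates that logic outside of Replica; the entry and cacheUpdate types and the planCacheUpdate name are illustrative stand-ins, not part of this patch.

package main

import "fmt"

// entry is a minimal stand-in for raftpb.Entry; only Index matters here.
type entry struct{ Index uint64 }

// cacheUpdate says what the Ready handler would do to the entry cache:
// whether to skip it entirely, which entries to Add, and which half-open
// span [clearLo, clearHi) of applied entries to Clear afterwards.
type cacheUpdate struct {
    skip             bool
    add              []entry
    clearLo, clearHi uint64
}

// planCacheUpdate mirrors the decision made in the replica.go hunk above.
func planCacheUpdate(appended, committed []entry) cacheUpdate {
    if len(appended) == 0 && len(committed) == 0 {
        return cacheUpdate{skip: true}
    }
    // If every newly appended entry is also committed in this Ready, caching
    // it would be undone immediately by the Clear below, so skip the cache
    // (the single-node fast path described in the comment above).
    if len(appended) == len(committed) && len(appended) > 0 &&
        appended[0].Index == committed[0].Index {
        return cacheUpdate{skip: true}
    }
    u := cacheUpdate{add: appended}
    if len(committed) > 0 {
        u.clearLo = committed[0].Index
        u.clearHi = committed[len(committed)-1].Index + 1
    }
    return u
}

func main() {
    // Entries 5-7 appended but only 5-6 committed: cache 5-7, then clear [5, 7).
    fmt.Printf("%+v\n", planCacheUpdate([]entry{{5}, {6}, {7}}, []entry{{5}, {6}}))
    // Everything appended commits immediately: the cache is skipped.
    fmt.Printf("%+v\n", planCacheUpdate([]entry{{5}, {6}}, []entry{{5}, {6}}))
}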
diff --git a/pkg/storage/replica_proposal.go b/pkg/storage/replica_proposal.go index 21372cddec14..eb51c245408a 100644 --- a/pkg/storage/replica_proposal.go +++ b/pkg/storage/replica_proposal.go @@ -548,12 +548,13 @@ func (r *Replica) handleReplicatedEvalResult( rResult.State.TruncatedState = nil // for assertion r.mu.Lock() + oldTruncatedState := r.mu.state.TruncatedState r.mu.state.TruncatedState = newTruncState r.mu.Unlock() // Clear any entries in the Raft log entry cache for this range up // to and including the most recently truncated index. - r.store.raftEntryCache.clearTo(r.RangeID, newTruncState.Index+1) + r.store.raftEntryCache.Clear(r.RangeID, oldTruncatedState.Index+1, newTruncState.Index+1) // Truncate the sideloaded storage. Note that this is safe only if the new truncated state // is durably on disk (i.e.) synced. This is true at the time of writing but unfortunately diff --git a/pkg/storage/replica_raftstorage.go b/pkg/storage/replica_raftstorage.go index 10949b13d3cd..da0e20dc1859 100644 --- a/pkg/storage/replica_raftstorage.go +++ b/pkg/storage/replica_raftstorage.go @@ -29,6 +29,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/engine" "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" + "github.com/cockroachdb/cockroach/pkg/storage/raftentry" "github.com/cockroachdb/cockroach/pkg/storage/rditer" "github.com/cockroachdb/cockroach/pkg/storage/stateloader" "github.com/cockroachdb/cockroach/pkg/storage/storagebase" @@ -101,7 +102,7 @@ func entries( rsl stateloader.StateLoader, e engine.Reader, rangeID roachpb.RangeID, - eCache *raftEntryCache, + eCache *raftentry.Cache, sideloaded sideloadStorage, lo, hi, maxBytes uint64, ) ([]raftpb.Entry, error) { @@ -115,7 +116,7 @@ func entries( } ents := make([]raftpb.Entry, 0, n) - ents, size, hitIndex, exceededMaxBytes := eCache.getEntries(ents, rangeID, lo, hi, maxBytes) + ents, size, hitIndex, exceededMaxBytes := eCache.Scan(ents, rangeID, lo, hi, maxBytes) // Return results if the correct number of results came back or if // we ran into the max bytes limit. @@ -174,7 +175,7 @@ func entries( } // Cache the fetched entries, if we may. if canCache { - eCache.addEntries(rangeID, ents) + eCache.Add(rangeID, ents) } // Did the correct number of results come back? If so, we're all good. @@ -255,8 +256,8 @@ func (r *replicaRaftStorage) Term(i uint64) (uint64, error) { return r.mu.lastTerm, nil } // Try to retrieve the term for the desired entry from the entry cache. - if term, ok := r.store.raftEntryCache.getTerm(r.RangeID, i); ok { - return term, nil + if e, ok := r.store.raftEntryCache.Get(r.RangeID, i); ok { + return e.Term, nil } readonly := r.store.Engine().NewReadOnly() defer readonly.Close() @@ -274,7 +275,7 @@ func term( rsl stateloader.StateLoader, eng engine.Reader, rangeID roachpb.RangeID, - eCache *raftEntryCache, + eCache *raftentry.Cache, i uint64, ) (uint64, error) { // entries() accepts a `nil` sideloaded storage and will skip inlining of @@ -460,7 +461,7 @@ type OutgoingSnapshot struct { // or RaftSnap -- a log truncation could have removed files from the // sideloaded storage in the meantime. 
WithSideloaded func(func(sideloadStorage) error) error - RaftEntryCache *raftEntryCache + RaftEntryCache *raftentry.Cache snapType string } @@ -494,7 +495,7 @@ func snapshot( snapType string, snap engine.Reader, rangeID roachpb.RangeID, - eCache *raftEntryCache, + eCache *raftentry.Cache, withSideloaded func(func(sideloadStorage) error) error, startKey roachpb.RKey, ) (OutgoingSnapshot, error) { diff --git a/pkg/storage/replica_sideload.go b/pkg/storage/replica_sideload.go index 8b421cfb90e7..9efd8472a084 100644 --- a/pkg/storage/replica_sideload.go +++ b/pkg/storage/replica_sideload.go @@ -18,6 +18,7 @@ import ( "context" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage/raftentry" "github.com/cockroachdb/cockroach/pkg/storage/storagebase" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" @@ -176,7 +177,7 @@ func maybeInlineSideloadedRaftCommand( rangeID roachpb.RangeID, ent raftpb.Entry, sideloaded sideloadStorage, - entryCache *raftEntryCache, + entryCache *raftentry.Cache, ) (*raftpb.Entry, error) { if !sniffSideloadedRaftCommand(ent.Data) { return nil, nil @@ -185,7 +186,7 @@ func maybeInlineSideloadedRaftCommand( // We could unmarshal this yet again, but if it's committed we // are very likely to have appended it recently, in which case // we can save work. - cachedSingleton, _, _, _ := entryCache.getEntries( + cachedSingleton, _, _, _ := entryCache.Scan( nil, rangeID, ent.Index, ent.Index+1, 1<<20, ) diff --git a/pkg/storage/replica_sideload_test.go b/pkg/storage/replica_sideload_test.go index aa00c8644ff5..adbb5ec5d819 100644 --- a/pkg/storage/replica_sideload_test.go +++ b/pkg/storage/replica_sideload_test.go @@ -33,6 +33,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage/engine" + "github.com/cockroachdb/cockroach/pkg/storage/raftentry" "github.com/cockroachdb/cockroach/pkg/storage/stateloader" "github.com/cockroachdb/cockroach/pkg/storage/storagebase" "github.com/cockroachdb/cockroach/pkg/testutils" @@ -368,7 +369,7 @@ func TestRaftSSTableSideloadingInline(t *testing.T) { // after having (perhaps) been modified. thin, fat raftpb.Entry // Populate the raft entry cache and sideload storage before running the test. - setup func(*raftEntryCache, sideloadStorage) + setup func(*raftentry.Cache, sideloadStorage) // If nonempty, the error expected from maybeInlineSideloadedRaftCommand. expErr string // If nonempty, a regex that the recorded trace span must match. 
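maybeInlineSideloadedRaftCommand above now does a single-index Scan over [Index, Index+1) to pick up an already-inlined entry on a cache hit (the "using cache hit" trace these tests expect) before falling back to the sideloaded storage. Below is a small map-backed sketch of that hit-or-fallback shape; fakeCache, scan, and inlineFromCacheOrDisk are illustrative stand-ins, not the real raftentry API.

package main

import "fmt"

// fakeCache is a map-backed stand-in used only to illustrate the single-index
// lookup pattern: scanning [idx, idx+1) yields either nothing (miss) or the
// one cached, already-inlined entry (hit).
type fakeCache struct {
    ents map[uint64]string // raft index -> (inlined) payload
}

// scan mimics the contiguous-from-lo contract of the real Scan: it stops at
// the first missing index.
func (c *fakeCache) scan(lo, hi uint64) []string {
    var out []string
    for i := lo; i < hi; i++ {
        e, ok := c.ents[i]
        if !ok {
            break
        }
        out = append(out, e)
    }
    return out
}

// inlineFromCacheOrDisk prefers the cached payload and otherwise falls back to
// a slower loader, mirroring the cache-hit vs. on-disk paths in the test.
func inlineFromCacheOrDisk(c *fakeCache, idx uint64, loadFromDisk func(uint64) string) string {
    if cached := c.scan(idx, idx+1); len(cached) == 1 {
        return cached[0]
    }
    return loadFromDisk(idx)
}

func main() {
    c := &fakeCache{ents: map[uint64]string{6: "inlined-sst"}}
    disk := func(idx uint64) string { return fmt.Sprintf("loaded-%d-from-disk", idx) }
    fmt.Println(inlineFromCacheOrDisk(c, 6, disk)) // cache hit
    fmt.Println(inlineFromCacheOrDisk(c, 7, disk)) // falls back to disk
}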
@@ -383,7 +384,7 @@ func TestRaftSSTableSideloadingInline(t *testing.T) { CRC32: 0, // not checked } - putOnDisk := func(ec *raftEntryCache, ss sideloadStorage) { + putOnDisk := func(ec *raftentry.Cache, ss sideloadStorage) { if err := ss.Put(context.Background(), 5, 6, sstFat.Data); err != nil { t.Fatal(err) } @@ -410,14 +411,14 @@ func TestRaftSSTableSideloadingInline(t *testing.T) { }, "v2-with-payload-with-file-with-cache": { thin: mkEnt(v2, 5, 6, &sstThin), fat: mkEnt(v2, 5, 6, &sstFat), - setup: func(ec *raftEntryCache, ss sideloadStorage) { + setup: func(ec *raftentry.Cache, ss sideloadStorage) { putOnDisk(ec, ss) - ec.addEntries(rangeID, []raftpb.Entry{mkEnt(v2, 5, 6, &sstFat)}) + ec.Add(rangeID, []raftpb.Entry{mkEnt(v2, 5, 6, &sstFat)}) }, expTrace: "using cache hit", }, "v2-fat-without-file": { thin: mkEnt(v2, 5, 6, &sstFat), fat: mkEnt(v2, 5, 6, &sstFat), - setup: func(ec *raftEntryCache, ss sideloadStorage) {}, + setup: func(ec *raftentry.Cache, ss sideloadStorage) {}, expTrace: "already inlined", }, } @@ -426,7 +427,7 @@ func TestRaftSSTableSideloadingInline(t *testing.T) { ctx, collect, cancel := tracing.ContextWithRecordingSpan(context.Background(), "test-recording") defer cancel() - ec := newRaftEntryCache(1024) // large enough + ec := raftentry.NewCache(1024) // large enough ss := mustNewInMemSideloadStorage(rangeID, roachpb.ReplicaID(1), ".") if test.setup != nil { test.setup(ec, ss) @@ -832,7 +833,7 @@ func TestRaftSSTableSideloadingSnapshot(t *testing.T) { tc.repl.mu.Lock() defer tc.repl.mu.Unlock() for _, withSS := range []bool{false, true} { - tc.store.raftEntryCache.clearTo(tc.repl.RangeID, sideloadedIndex+1) + tc.store.raftEntryCache.Clear(tc.repl.RangeID, 0, sideloadedIndex+1) var ss sideloadStorage if withSS { @@ -849,7 +850,7 @@ func TestRaftSSTableSideloadingSnapshot(t *testing.T) { if len(entries) != 1 { t.Fatalf("no or too many entries returned from cache: %+v", entries) } - ents, _, _, _ := tc.store.raftEntryCache.getEntries(nil, tc.repl.RangeID, sideloadedIndex, sideloadedIndex+1, 1<<20) + ents, _, _, _ := tc.store.raftEntryCache.Scan(nil, tc.repl.RangeID, sideloadedIndex, sideloadedIndex+1, 1<<20) if withSS { // We passed the sideload storage, so we expect to get our // inlined index back from the cache. @@ -898,7 +899,7 @@ func TestRaftSSTableSideloadingSnapshot(t *testing.T) { tc.repl.raftMu.Unlock() // Additionally we need to clear out the entry from the cache because // that would still save the day. - tc.store.raftEntryCache.clearTo(tc.repl.RangeID, sideloadedIndex+1) + tc.store.raftEntryCache.Clear(tc.repl.RangeID, 0, sideloadedIndex+1) mockSender := &mockSender{} err = sendSnapshot( diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go index 9bcff24cdb92..97fcfd12ee5c 100644 --- a/pkg/storage/replica_test.go +++ b/pkg/storage/replica_test.go @@ -7287,8 +7287,10 @@ func TestEntries(t *testing.T) { // Setup, if not nil, is called before running the test case. setup func() }{ - // Case 0: All of the entries from cache. - {lo: indexes[0], hi: indexes[9] + 1, expResultCount: 10, expCacheCount: 10, setup: nil}, + // Case 0: All of the entries from cache. Entries would never have + // been added to the cache because they were immediately applied, + // so all miss and populate. + {lo: indexes[0], hi: indexes[9] + 1, expResultCount: 10, expCacheCount: 0, setup: nil}, // Case 1: Get the first entry from cache. {lo: indexes[0], hi: indexes[1], expResultCount: 1, expCacheCount: 1, setup: nil}, // Case 2: Get the last entry from cache. 
@@ -7311,40 +7313,45 @@ func TestEntries(t *testing.T) { // Case 8: hi value is just past the last index, should return all // available entries. {lo: indexes[5], hi: indexes[9] + 1, expResultCount: 5, expCacheCount: 5, setup: nil}, - // Case 9: all values have been truncated from cache and storage. + // Case 9: hi value is just past the index of log truncation, should + // return all available entries. + {lo: indexes[5], hi: indexes[9] + 2, expResultCount: 6, expCacheCount: 5, setup: nil}, + // Case 10: all values have been truncated from cache and storage. {lo: indexes[1], hi: indexes[2], expCacheCount: 0, expError: raft.ErrCompacted, setup: nil}, - // Case 10: hi has just been truncated from cache and storage. + // Case 11: hi has just been truncated from cache and storage. {lo: indexes[1], hi: indexes[4], expCacheCount: 0, expError: raft.ErrCompacted, setup: nil}, - // Case 11: another case where hi has just been truncated from + // Case 12: another case where hi has just been truncated from // cache and storage. {lo: indexes[3], hi: indexes[4], expCacheCount: 0, expError: raft.ErrCompacted, setup: nil}, - // Case 12: lo has been truncated and hi is the truncation point. + // Case 13: lo has been truncated and hi is the truncation point. {lo: indexes[4], hi: indexes[5], expCacheCount: 0, expError: raft.ErrCompacted, setup: nil}, - // Case 13: lo has been truncated but hi is available. + // Case 14: lo has been truncated but hi is available. {lo: indexes[4], hi: indexes[9], expCacheCount: 0, expError: raft.ErrCompacted, setup: nil}, - // Case 14: lo has been truncated and hi is not available. + // Case 15: lo has been truncated and hi is not available. {lo: indexes[4], hi: indexes[9] + 100, expCacheCount: 0, expError: raft.ErrCompacted, setup: nil}, - // Case 15: lo has been truncated but hi is available, and maxBytes is + // Case 16: lo has been truncated but hi is available, and maxBytes is // set low. {lo: indexes[4], hi: indexes[9], maxBytes: 1, expCacheCount: 0, expError: raft.ErrCompacted, setup: nil}, - // Case 16: lo is available but hi is not. + // Case 17: lo is available but hi is not. {lo: indexes[5], hi: indexes[9] + 100, expCacheCount: 6, expError: raft.ErrUnavailable, setup: nil}, - // Case 17: both lo and hi are not available, cache miss. + // Case 18: both lo and hi are not available, cache miss. {lo: indexes[9] + 100, hi: indexes[9] + 1000, expCacheCount: 0, expError: raft.ErrUnavailable, setup: nil}, - // Case 18: lo is available, hi is not, but it was cut off by maxBytes. + // Case 19: lo is available, hi is not, but it was cut off by maxBytes. {lo: indexes[5], hi: indexes[9] + 1000, maxBytes: 1, expResultCount: 1, expCacheCount: 1, setup: nil}, - // Case 19: lo and hi are available, but entry cache evicted. + // Case 20: lo and hi are available, but entry cache evicted. {lo: indexes[5], hi: indexes[9], expResultCount: 4, expCacheCount: 0, setup: func() { // Manually evict cache for the first 10 log entries. - repl.store.raftEntryCache.delEntries(rangeID, indexes[0], indexes[9]+1) + repl.store.raftEntryCache.Clear(rangeID, indexes[0], indexes[9]+1) indexes = append(indexes, populateLogs(10, 40)...) }}, - // Case 20: lo and hi are available, entry cache evicted and hi available in cache. + // Case 21: lo and hi are available, entry cache evicted and hi available in cache. {lo: indexes[5], hi: indexes[9] + 5, expResultCount: 9, expCacheCount: 4, setup: nil}, - // Case 21: lo and hi are available and in entry cache. 
+ // Case 22: lo and hi not yet available in entry cache. + {lo: indexes[9] + 2, hi: indexes[9] + 32, expResultCount: 30, expCacheCount: 3, setup: nil}, + // Case 23: lo and hi are available and in entry cache. {lo: indexes[9] + 2, hi: indexes[9] + 32, expResultCount: 30, expCacheCount: 30, setup: nil}, - // Case 22: lo is available and hi is not. + // Case 24: lo is available and hi is not. {lo: indexes[9] + 2, hi: indexes[9] + 33, expCacheCount: 30, expError: raft.ErrUnavailable, setup: nil}, } { if tc.setup != nil { @@ -7353,7 +7360,7 @@ func TestEntries(t *testing.T) { if tc.maxBytes == 0 { tc.maxBytes = math.MaxUint64 } - cacheEntries, _, _, hitLimit := repl.store.raftEntryCache.getEntries(nil, rangeID, tc.lo, tc.hi, tc.maxBytes) + cacheEntries, _, _, hitLimit := repl.store.raftEntryCache.Scan(nil, rangeID, tc.lo, tc.hi, tc.maxBytes) if len(cacheEntries) != tc.expCacheCount { t.Errorf("%d: expected cache count %d, got %d", i, tc.expCacheCount, len(cacheEntries)) } @@ -7388,7 +7395,7 @@ func TestEntries(t *testing.T) { if err := engine.MVCCDelete(context.Background(), tc.store.Engine(), nil, keys.RaftLogKey(rangeID, indexes[6]), hlc.Timestamp{}, nil); err != nil { t.Fatal(err) } - repl.store.raftEntryCache.delEntries(rangeID, indexes[6], indexes[6]+1) + repl.store.raftEntryCache.Clear(rangeID, indexes[6], indexes[6]+1) repl.mu.Lock() defer repl.mu.Unlock() @@ -7408,7 +7415,7 @@ func TestEntries(t *testing.T) { } // Case 25b: don't hit the gap due to maxBytes, cache cleared. { - repl.store.raftEntryCache.delEntries(rangeID, indexes[5], indexes[5]+1) + repl.store.raftEntryCache.Clear(rangeID, indexes[5], indexes[5]+1) ents, err := repl.raftEntriesLocked(indexes[5], indexes[9], 1) if err != nil { t.Errorf("25: expected no error, got %s", err) diff --git a/pkg/storage/store.go b/pkg/storage/store.go index c44dd58c6fab..2111b9ad38f4 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -52,6 +52,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/engine" "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" "github.com/cockroachdb/cockroach/pkg/storage/idalloc" + "github.com/cockroachdb/cockroach/pkg/storage/raftentry" "github.com/cockroachdb/cockroach/pkg/storage/spanset" "github.com/cockroachdb/cockroach/pkg/storage/stateloader" "github.com/cockroachdb/cockroach/pkg/storage/storagebase" @@ -395,7 +396,7 @@ type Store struct { consistencyQueue *consistencyQueue // Replica consistency check queue metrics *StoreMetrics intentResolver *intentResolver - raftEntryCache *raftEntryCache + raftEntryCache *raftentry.Cache limiters batcheval.Limiters // gossipRangeCountdown and leaseRangeCountdown are countdowns of @@ -946,7 +947,7 @@ func NewStore(cfg StoreConfig, eng engine.Engine, nodeDesc *roachpb.NodeDescript s.draining.Store(false) s.scheduler = newRaftScheduler(s.metrics, s, storeSchedulerConcurrency) - s.raftEntryCache = newRaftEntryCache(cfg.RaftEntryCacheSize) + s.raftEntryCache = raftentry.NewCache(cfg.RaftEntryCacheSize) s.metrics.registry.AddMetricStruct(s.raftEntryCache.Metrics()) s.coalescedMu.Lock()
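For reference, a hedged usage sketch of the raftentry.Cache API introduced in this diff (NewCache, Add, Scan, Get, Clear). It is written against the in-tree packages, so it only builds inside the cockroach repo at this revision; the example function name and the concrete indexes, terms, and byte budget are illustrative.

package raftentry_test

import (
    "fmt"

    "go.etcd.io/etcd/raft/raftpb"

    "github.com/cockroachdb/cockroach/pkg/roachpb"
    "github.com/cockroachdb/cockroach/pkg/storage/raftentry"
)

func ExampleCache() {
    c := raftentry.NewCache(1 << 20) // byte budget shared by all ranges
    r := roachpb.RangeID(7)

    // Cache three contiguous entries for range 7.
    c.Add(r, []raftpb.Entry{
        {Index: 10, Term: 3},
        {Index: 11, Term: 3},
        {Index: 12, Term: 4},
    })

    // Scan returns a contiguous run starting at lo and stops at the first gap
    // or once another entry would exceed maxBytes.
    ents, bytes, next, exceeded := c.Scan(nil, r, 10, 13, 1<<20)
    fmt.Println(len(ents), bytes > 0, next, exceeded) // 3 true 13 false

    // Get replaces the old getTerm; callers read Term off the returned entry.
    if e, ok := c.Get(r, 12); ok {
        fmt.Println(e.Term) // 4
    }

    // Clear takes a half-open span [lo, hi): this drops 10 and 11, keeps 12.
    c.Clear(r, 10, 12)
    _, ok := c.Get(r, 11)
    fmt.Println(ok) // false
}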