From d41daebc4728999060dc4a040cd01f254a83c440 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Wed, 12 Sep 2018 13:47:59 -0400 Subject: [PATCH 1/2] storage: remove entries from RaftEntryCache on Raft application Before this change, we only removed entries from the RaftEntryCache when the entries were truncated. We now remove them from the cache immediately after applying them. We may pull these entries back into the cache if we need to catch up a slow follower which doesn't need a snapshot, but this should be rare. The change also avoids adding entries to the RaftEntryCache at all if they are proposed and immediately committed. This effectively means that we never use the raftEntryCache in single-node clusters. This results in somewhere around a 1.5% speedup on `kv0` when tested on a single-node GCE cluster with a n1-highcpu-16 machine. Release note (performance improvement): More aggressively prune the raft entry cache, keeping it to a more manageable size. --- pkg/storage/replica.go | 33 +++++++++++++++++++++++++++---- pkg/storage/replica_test.go | 39 ++++++++++++++++++++++--------------- 2 files changed, 52 insertions(+), 20 deletions(-) diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go index fc2101c96ba6..922e1440110e 100644 --- a/pkg/storage/replica.go +++ b/pkg/storage/replica.go @@ -4305,10 +4305,6 @@ func (r *Replica) handleRaftReadyRaftMuLocked( r.store.replicateQueue.MaybeAdd(r, r.store.Clock().Now()) } - // Update raft log entry cache. We clear any older, uncommitted log entries - // and cache the latest ones. - r.store.raftEntryCache.addEntries(r.RangeID, rd.Entries) - r.sendRaftMessages(ctx, otherMsgs) for _, e := range rd.CommittedEntries { @@ -4430,6 +4426,35 @@ func (r *Replica) handleRaftReadyRaftMuLocked( r.mu.Unlock() } + // Update the raft entry cache. + if len(rd.Entries) > 0 || len(rd.CommittedEntries) > 0 { + allCommitted := false + if len(rd.Entries) == len(rd.CommittedEntries) { + // If all newly proposed entries are also being committed in the + // same Raft Ready processing iteration, then we can skip adding + // them to the cache because we'd just immediately remove them. This + // effectively means that we never use the raftEntryCache in + // single-node clusters. + allCommitted = rd.Entries[0].Index == rd.CommittedEntries[0].Index + } + if !allCommitted { + // Update raft log entry cache. We clear any older, uncommitted log entries + // and cache the latest ones. + r.store.raftEntryCache.addEntries(r.RangeID, rd.Entries) + + if len(rd.CommittedEntries) > 0 { + // Clear the entries that we just applied out of the Raft log entry + // cache. We may pull these entries back into the cache if we need + // to catch up a slow follower which doesn't need a snapshot, but + // this should be rare and we don't mind hitting RocksDB in that + // case. + lo := rd.CommittedEntries[0].Index + hi := rd.CommittedEntries[len(rd.CommittedEntries)-1].Index + r.store.raftEntryCache.delEntries(r.RangeID, lo, hi+1) + } + } + } + // TODO(bdarnell): need to check replica id and not Advance if it // has changed. Or do we need more locking to guarantee that replica // ID cannot change during handleRaftReady? diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go index 9bcff24cdb92..eaff048251ab 100644 --- a/pkg/storage/replica_test.go +++ b/pkg/storage/replica_test.go @@ -7287,8 +7287,10 @@ func TestEntries(t *testing.T) { // Setup, if not nil, is called before running the test case. setup func() }{ - // Case 0: All of the entries from cache. 
- {lo: indexes[0], hi: indexes[9] + 1, expResultCount: 10, expCacheCount: 10, setup: nil}, + // Case 0: All of the entries from cache. Entries would never have + // been added to the cache because they were immediately applied, + // so all miss and populate. + {lo: indexes[0], hi: indexes[9] + 1, expResultCount: 10, expCacheCount: 0, setup: nil}, // Case 1: Get the first entry from cache. {lo: indexes[0], hi: indexes[1], expResultCount: 1, expCacheCount: 1, setup: nil}, // Case 2: Get the last entry from cache. @@ -7311,40 +7313,45 @@ func TestEntries(t *testing.T) { // Case 8: hi value is just past the last index, should return all // available entries. {lo: indexes[5], hi: indexes[9] + 1, expResultCount: 5, expCacheCount: 5, setup: nil}, - // Case 9: all values have been truncated from cache and storage. + // Case 9: hi value is just past the index of log truncation, should + // return all available entries. + {lo: indexes[5], hi: indexes[9] + 2, expResultCount: 6, expCacheCount: 5, setup: nil}, + // Case 10: all values have been truncated from cache and storage. {lo: indexes[1], hi: indexes[2], expCacheCount: 0, expError: raft.ErrCompacted, setup: nil}, - // Case 10: hi has just been truncated from cache and storage. + // Case 11: hi has just been truncated from cache and storage. {lo: indexes[1], hi: indexes[4], expCacheCount: 0, expError: raft.ErrCompacted, setup: nil}, - // Case 11: another case where hi has just been truncated from + // Case 12: another case where hi has just been truncated from // cache and storage. {lo: indexes[3], hi: indexes[4], expCacheCount: 0, expError: raft.ErrCompacted, setup: nil}, - // Case 12: lo has been truncated and hi is the truncation point. + // Case 13: lo has been truncated and hi is the truncation point. {lo: indexes[4], hi: indexes[5], expCacheCount: 0, expError: raft.ErrCompacted, setup: nil}, - // Case 13: lo has been truncated but hi is available. + // Case 14: lo has been truncated but hi is available. {lo: indexes[4], hi: indexes[9], expCacheCount: 0, expError: raft.ErrCompacted, setup: nil}, - // Case 14: lo has been truncated and hi is not available. + // Case 15: lo has been truncated and hi is not available. {lo: indexes[4], hi: indexes[9] + 100, expCacheCount: 0, expError: raft.ErrCompacted, setup: nil}, - // Case 15: lo has been truncated but hi is available, and maxBytes is + // Case 16: lo has been truncated but hi is available, and maxBytes is // set low. {lo: indexes[4], hi: indexes[9], maxBytes: 1, expCacheCount: 0, expError: raft.ErrCompacted, setup: nil}, - // Case 16: lo is available but hi is not. + // Case 17: lo is available but hi is not. {lo: indexes[5], hi: indexes[9] + 100, expCacheCount: 6, expError: raft.ErrUnavailable, setup: nil}, - // Case 17: both lo and hi are not available, cache miss. + // Case 18: both lo and hi are not available, cache miss. {lo: indexes[9] + 100, hi: indexes[9] + 1000, expCacheCount: 0, expError: raft.ErrUnavailable, setup: nil}, - // Case 18: lo is available, hi is not, but it was cut off by maxBytes. + // Case 19: lo is available, hi is not, but it was cut off by maxBytes. {lo: indexes[5], hi: indexes[9] + 1000, maxBytes: 1, expResultCount: 1, expCacheCount: 1, setup: nil}, - // Case 19: lo and hi are available, but entry cache evicted. + // Case 20: lo and hi are available, but entry cache evicted. {lo: indexes[5], hi: indexes[9], expResultCount: 4, expCacheCount: 0, setup: func() { // Manually evict cache for the first 10 log entries. 
 				repl.store.raftEntryCache.delEntries(rangeID, indexes[0], indexes[9]+1)
 				indexes = append(indexes, populateLogs(10, 40)...)
 			}},
-		// Case 20: lo and hi are available, entry cache evicted and hi available in cache.
+		// Case 21: lo and hi are available, entry cache evicted and hi available in cache.
 		{lo: indexes[5], hi: indexes[9] + 5, expResultCount: 9, expCacheCount: 4, setup: nil},
-		// Case 21: lo and hi are available and in entry cache.
+		// Case 22: lo and hi not yet available in entry cache.
+		{lo: indexes[9] + 2, hi: indexes[9] + 32, expResultCount: 30, expCacheCount: 3, setup: nil},
+		// Case 23: lo and hi are available and in entry cache.
 		{lo: indexes[9] + 2, hi: indexes[9] + 32, expResultCount: 30, expCacheCount: 30, setup: nil},
-		// Case 22: lo is available and hi is not.
+		// Case 24: lo is available and hi is not.
 		{lo: indexes[9] + 2, hi: indexes[9] + 33, expCacheCount: 30, expError: raft.ErrUnavailable, setup: nil},
 	} {
 		if tc.setup != nil {

From 6c212b70f4fa9e8919ecfe8b971422cbf81a2364 Mon Sep 17 00:00:00 2001
From: Nathan VanBenschoten
Date: Wed, 12 Sep 2018 16:31:57 -0400
Subject: [PATCH 2/2] storage/raftentry: rewrite raftEntryCache with hashmaps
 and a Replica-level LRU policy
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

!!! Disclaimer !!!

This approach has a lot of promise (see benchmarks below), but #30151 may have
stolen some of its benefit by making the current raftEntryCache less of an
issue. It's unclear at the moment whether such an aggressive change is still
needed. At a minimum, it's probably too big of a change for 2.1.

_### Problem

Profiles are showing that the `raftEntryCache` is a performance bottleneck on
certain workloads. Profiling `tpcc1000` on a 3-node cluster with
`n1-highcpu-16` machines demonstrates the problem.

The CPU profile shows that the `raftEntryCache` is responsible for just over
**3%** of all CPU utilization on the node. Interestingly, the cost isn't
centralized in a single method. Instead, we can see both `addEntries` and
`getEntries` in the profile. Most of what we see is `OrderedCache`
manipulation as the cache interacts with its underlying red-black tree.

The blocking profile, filtered to show only Mutex contention (ignoring other
forms like channels and Cond vars), shows that blocking in the
`raftEntryCache` is responsible for **99%** of all Mutex contention. This
seems absurd, but it adds up given that the cache is responsible for **3%** of
CPU utilization on a node and requires mutual exclusion across an entire
`Store`.

We've also seen in changes like #29596 how expensive these cache accesses have
become, especially as the cache grows, because most entry accesses take
`log(n)` time, where `n` is the number of `raftpb.Entry`s in the cache across
all Ranges on a Store.

_### Rewrite

This PR rewrites the `raftEntryCache` as a new `storage/raftentry.Cache` type.
The rewrite diverges from the original implementation in two important ways,
both of which exploit and optimize for the unique access patterns that this
cache is subject to.

_#### LLRB -> Hashmaps

The first change is that the rewrite trades in the balanced binary tree
structure for a multi-level hashmap structure. This structure is preferable
because it improves the time complexity of entry access. It's also more memory
efficient and allocation friendly because it takes advantage of builtin Go
hashmaps and their compile-time specialization. Finally, it should be more GC
friendly because it cuts down on the number of pointers in use.
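To make that layout concrete, here is a distilled sketch of the two-level
structure. It mirrors the `Cache` and `partition` types this patch adds in
`pkg/storage/raftentry/cache.go`, but the standalone `RangeID`/`Entry`
stand-ins and the stripped-down method bodies (no locking, byte accounting,
or eviction) are illustrative only, not the patch's actual API:

```go
package main

import (
	"container/list"
	"fmt"
)

// RangeID and Entry stand in for roachpb.RangeID and raftpb.Entry.
type RangeID int64

type Entry struct {
	Index, Term uint64
}

// partition holds every cached entry for one range. Lookup within a
// partition is a constant-time map access instead of an O(log n)
// red-black tree traversal.
type partition struct {
	ents map[uint64]Entry
	el   *list.Element // this partition's node in the cache-wide LRU list
}

// Cache is the multi-level hashmap: range -> partition -> entry, plus an
// LRU list over partitions rather than over individual entries.
type Cache struct {
	parts map[RangeID]*partition
	lru   list.List
}

func NewCache() *Cache {
	return &Cache{parts: make(map[RangeID]*partition)}
}

// Add caches entries for a range, creating its partition on demand.
func (c *Cache) Add(r RangeID, ents []Entry) {
	p, ok := c.parts[r]
	if !ok {
		p = &partition{ents: make(map[uint64]Entry, len(ents))}
		p.el = c.lru.PushFront(p)
		c.parts[r] = p
	} else {
		c.lru.MoveToFront(p.el) // one LRU update per access, not per entry
	}
	for _, e := range ents {
		p.ents[e.Index] = e
	}
}

// Scan returns the contiguous run of cached entries in [lo, hi). Raft
// indexes are dense, so iterating successive integers replaces the need
// for an ordered data structure.
func (c *Cache) Scan(r RangeID, lo, hi uint64) []Entry {
	p, ok := c.parts[r]
	if !ok {
		return nil
	}
	c.lru.MoveToFront(p.el)
	var out []Entry
	for i := lo; i < hi; i++ {
		e, ok := p.ents[i]
		if !ok {
			break // stop at the first gap, as the real cache does
		}
		out = append(out, e)
	}
	return out
}

func main() {
	c := NewCache()
	c.Add(1, []Entry{{Index: 5, Term: 1}, {Index: 6, Term: 1}})
	fmt.Println(c.Scan(1, 5, 7)) // [{5 1} {6 1}]
}
```

Note that the LRU list is touched once per partition access rather than once
per entry; that per-partition `el` pointer is what the Replica-level LRU
policy described below relies on.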
Of course, a major benefit of a balanced binary tree is that its elements are ordered, which allows it to accommodate range-based access patterns on a sparse set of elements. While the ordering across replicas was wasted with the `raftEntryCache`, it did appear that the ordering within replicas was useful. However, it turns out that Raft entries are densely ordered with discrete indices. This means that given a low and a high index, we can simply iterate through the range efficiently, without the need for an ordered data-structure. This property was also exploited in 6e4e57f. _#### Replica-level LRU policy The second change is that the rewrite maintains an LRU-policy across Replicas, instead of across individual Raft entries. This makes maintaining the LRU linked-list significantly cheaper because we only need to update it once per Replica access instead of once per entry access. It also means that the linked-list will be significantly smaller, resulting in far fewer memory allocations and a reduced GC footprint. This reduction in granularity of the LRU-policy changes its behavior. Instead of the cache holding on to the last N Raft entries accessed across all Replicas, it will hold on to all Raft entries for the last N Replicas accessed. I suspect that this is actually a better policy for Raft's usage because Replicas access entries in chunks and missing even a single entry in the cache results in an expensive RocksDB seek. Furthermore, I think the LRU policy, as it was, was actually somewhat pathological because when a replica looks up its entries, it almost always starts by requesting its oldest entry and scanning up from there. _### Performance ``` name old time/op new time/op delta EntryCache-4 3.00ms ± 9% 0.21ms ± 2% -92.90% (p=0.000 n=10+10) EntryCacheClearTo-4 313µs ± 9% 37µs ± 2% -88.21% (p=0.000 n=10+10) name old alloc/op new alloc/op delta EntryCache-4 113kB ± 0% 180kB ± 0% +60.15% (p=0.000 n=9+10) EntryCacheClearTo-4 8.31kB ± 0% 0.00kB -100.00% (p=0.000 n=10+10) name old allocs/op new allocs/op delta EntryCache-4 2.02k ± 0% 0.01k ± 0% -99.75% (p=0.000 n=10+10) EntryCacheClearTo-4 14.0 ± 0% 0.0 -100.00% (p=0.000 n=10+10) ``` Testing with TPC-C is still needed. I'll need to verify that the cache hit rate does not go down because of this change and that the change translates to the expected improvements on CPU and blocking profiles. Release note (performance improvement): Rewrite Raft entry cache to optimize for access patterns, reduce lock contention, and reduce memory footprint. --- pkg/storage/entry_cache.go | 302 --------------------------- pkg/storage/entry_cache_test.go | 184 ---------------- pkg/storage/raftentry/cache.go | 276 ++++++++++++++++++++++++ pkg/storage/raftentry/cache_test.go | 216 +++++++++++++++++++ pkg/storage/replica.go | 4 +- pkg/storage/replica_proposal.go | 3 +- pkg/storage/replica_raftstorage.go | 17 +- pkg/storage/replica_sideload.go | 5 +- pkg/storage/replica_sideload_test.go | 19 +- pkg/storage/replica_test.go | 8 +- pkg/storage/store.go | 5 +- 11 files changed, 525 insertions(+), 514 deletions(-) delete mode 100644 pkg/storage/entry_cache.go delete mode 100644 pkg/storage/entry_cache_test.go create mode 100644 pkg/storage/raftentry/cache.go create mode 100644 pkg/storage/raftentry/cache_test.go diff --git a/pkg/storage/entry_cache.go b/pkg/storage/entry_cache.go deleted file mode 100644 index 554ef859e7b8..000000000000 --- a/pkg/storage/entry_cache.go +++ /dev/null @@ -1,302 +0,0 @@ -// Copyright 2016 The Cockroach Authors. 
-// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -// implied. See the License for the specific language governing -// permissions and limitations under the License. See the AUTHORS file -// for names of contributors. - -package storage - -import ( - "github.com/biogo/store/llrb" - "go.etcd.io/etcd/raft/raftpb" - - "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/util/cache" - "github.com/cockroachdb/cockroach/pkg/util/metric" - "github.com/cockroachdb/cockroach/pkg/util/syncutil" -) - -var ( - metaEntryCacheBytes = metric.Metadata{ - Name: "raft.entrycache.bytes", - Help: "Aggregate size of all Raft entries in the Raft entry cache", - Measurement: "Entry Bytes", - Unit: metric.Unit_BYTES, - } - metaEntryCacheSize = metric.Metadata{ - Name: "raft.entrycache.size", - Help: "Number of Raft entries in the Raft entry cache", - Measurement: "Entry Count", - Unit: metric.Unit_COUNT, - } - metaEntryCacheLookups = metric.Metadata{ - Name: "raft.entrycache.lookups", - Help: "Number of cache lookups in the Raft entry cache", - Measurement: "Lookups", - Unit: metric.Unit_COUNT, - } - metaEntryCacheHits = metric.Metadata{ - Name: "raft.entrycache.hits", - Help: "Number of successful cache lookups in the Raft entry cache", - Measurement: "Hits", - Unit: metric.Unit_COUNT, - } -) - -// RaftEntryCacheMetrics is the set of metrics for the raft entry cache. -type RaftEntryCacheMetrics struct { - Bytes *metric.Gauge - Size *metric.Gauge - Lookups *metric.Counter - Hits *metric.Counter -} - -func makeRaftEntryCacheMetrics() RaftEntryCacheMetrics { - return RaftEntryCacheMetrics{ - Bytes: metric.NewGauge(metaEntryCacheBytes), - Size: metric.NewGauge(metaEntryCacheSize), - Lookups: metric.NewCounter(metaEntryCacheLookups), - Hits: metric.NewCounter(metaEntryCacheHits), - } -} - -type entryCacheKey struct { - RangeID roachpb.RangeID - Index uint64 -} - -// Compare implements the llrb.Comparable interface for entryCacheKey, so that -// it can be used as a key for util.OrderedCache. -func (a *entryCacheKey) Compare(b llrb.Comparable) int { - bk := b.(*entryCacheKey) - switch { - case a.RangeID < bk.RangeID: - return -1 - case a.RangeID > bk.RangeID: - return 1 - case a.Index < bk.Index: - return -1 - case a.Index > bk.Index: - return 1 - default: - return 0 - } -} - -const defaultEntryCacheFreeListSize = 16 - -// entryCacheFreeList represents a free list of raftEntryCache cache.Entry -// objects. The free list is initially empty and is populated as entries are -// freed. -// -// A freelist is used here instead of a sync.Pool because all access to the -// freelist is protected by the raftEntryCache lock so it doesn't need its -// own locking. This allows us to avoid the added synchronization cost of -// a sync.Pool. 
-type entryCacheFreeList []*cache.Entry - -func makeEntryCacheFreeList(size int) entryCacheFreeList { - return make(entryCacheFreeList, 0, size) -} - -func (f *entryCacheFreeList) newEntry(key entryCacheKey, value raftpb.Entry) *cache.Entry { - i := len(*f) - 1 - if i < 0 { - alloc := struct { - key entryCacheKey - value raftpb.Entry - entry cache.Entry - }{ - key: key, - value: value, - } - alloc.entry.Key = &alloc.key - alloc.entry.Value = &alloc.value - return &alloc.entry - } - e := (*f)[i] - (*f)[i] = nil - (*f) = (*f)[:i] - - *e.Key.(*entryCacheKey) = key - *e.Value.(*raftpb.Entry) = value - return e -} - -func (f *entryCacheFreeList) freeEntry(e *cache.Entry) { - if len(*f) < cap(*f) { - *e.Key.(*entryCacheKey) = entryCacheKey{} - *e.Value.(*raftpb.Entry) = raftpb.Entry{} - *f = append(*f, e) - } -} - -// A raftEntryCache maintains a global cache of Raft group log entries. The -// cache mostly prevents unnecessary reads from disk of recently-written log -// entries between log append and application to the FSM. -// -// This cache stores entries with sideloaded proposals inlined (i.e. ready to -// be sent to followers). -type raftEntryCache struct { - syncutil.RWMutex // protects Cache for concurrent access - bytes uint64 // total size of the cache in bytes - cache *cache.OrderedCache // LRU cache of log entries, keyed by rangeID / log index - freeList entryCacheFreeList // used to avoid allocations on insertion - metrics RaftEntryCacheMetrics -} - -// newRaftEntryCache returns a new RaftEntryCache with the given -// maximum size in bytes. -func newRaftEntryCache(maxBytes uint64) *raftEntryCache { - rec := &raftEntryCache{ - cache: cache.NewOrderedCache(cache.Config{Policy: cache.CacheLRU}), - freeList: makeEntryCacheFreeList(defaultEntryCacheFreeListSize), - metrics: makeRaftEntryCacheMetrics(), - } - // The raft entry cache mutex will be held when the ShouldEvict - // and OnEvictedEntry callbacks are invoked. - // - // On ShouldEvict, compare the total size of the cache in bytes to the - // configured maxBytes. We also insist that at least one entry remains - // in the cache to prevent the case where a very large entry isn't able - // to be cached at all. - rec.cache.Config.ShouldEvict = func(n int, k, v interface{}) bool { - return rec.bytes > maxBytes && n >= 1 - } - rec.cache.Config.OnEvictedEntry = func(e *cache.Entry) { - ent := e.Value.(*raftpb.Entry) - rec.bytes -= uint64(ent.Size()) - rec.freeList.freeEntry(e) - } - - return rec -} - -// addEntries adds the slice of raft entries, using the range ID and the -// entry indexes as each cached entry's key. -func (rec *raftEntryCache) addEntries(rangeID roachpb.RangeID, ents []raftpb.Entry) { - if len(ents) == 0 { - return - } - rec.Lock() - defer rec.Unlock() - - for _, e := range ents { - key := entryCacheKey{RangeID: rangeID, Index: e.Index} - entry := rec.freeList.newEntry(key, e) - rec.cache.AddEntry(entry) - rec.bytes += uint64(e.Size()) - } - rec.updateGauges() -} - -// getTerm returns the term for the specified index and true for the second -// return value. If the index is not present in the cache, false is returned. 
-func (rec *raftEntryCache) getTerm(rangeID roachpb.RangeID, index uint64) (uint64, bool) { - rec.metrics.Lookups.Inc(1) - rec.RLock() - defer rec.RUnlock() - - fromKey := entryCacheKey{RangeID: rangeID, Index: index} - k, v, ok := rec.cache.Ceil(&fromKey) - if !ok { - return 0, false - } - ecKey := k.(*entryCacheKey) - if ecKey.RangeID != rangeID || ecKey.Index != index { - return 0, false - } - rec.metrics.Hits.Inc(1) - ent := v.(*raftpb.Entry) - return ent.Term, true -} - -// getEntries returns entries between [lo, hi) for specified range. -// If any entries are returned for the specified indexes, they will -// start with index lo and proceed sequentially without gaps until -// 1) all entries exclusive of hi are fetched, 2) fetching another entry -// would add up to more than maxBytes of data, or 3) a cache miss occurs. -// The returned size reflects the size of the returned entries. -func (rec *raftEntryCache) getEntries( - ents []raftpb.Entry, rangeID roachpb.RangeID, lo, hi, maxBytes uint64, -) (_ []raftpb.Entry, size uint64, nextIndex uint64, exceededMaxBytes bool) { - rec.metrics.Lookups.Inc(1) - rec.RLock() - defer rec.RUnlock() - var bytes uint64 - nextIndex = lo - - fromKey := entryCacheKey{RangeID: rangeID, Index: lo} - toKey := entryCacheKey{RangeID: rangeID, Index: hi} - rec.cache.DoRange(func(k, v interface{}) bool { - ecKey := k.(*entryCacheKey) - if ecKey.Index != nextIndex { - return true - } - ent := v.(*raftpb.Entry) - size := uint64(ent.Size()) - if bytes+size > maxBytes { - exceededMaxBytes = true - if len(ents) > 0 { - return true - } - } - nextIndex++ - bytes += size - ents = append(ents, *ent) - return exceededMaxBytes - }, &fromKey, &toKey) - - if nextIndex == hi || exceededMaxBytes { - // We only consider an access a "hit" if it returns all requested - // entries or stops short because of a maximum bytes limit. - rec.metrics.Hits.Inc(1) - } - return ents, bytes, nextIndex, exceededMaxBytes -} - -// delEntries deletes entries between [lo, hi) for specified range. -func (rec *raftEntryCache) delEntries(rangeID roachpb.RangeID, lo, hi uint64) { - rec.Lock() - defer rec.Unlock() - if lo >= hi { - return - } - var cacheEnts []*cache.Entry - fromKey := entryCacheKey{RangeID: rangeID, Index: lo} - toKey := entryCacheKey{RangeID: rangeID, Index: hi} - rec.cache.DoRangeEntry(func(e *cache.Entry) bool { - cacheEnts = append(cacheEnts, e) - return false - }, &fromKey, &toKey) - - for _, e := range cacheEnts { - rec.cache.DelEntry(e) - } - rec.updateGauges() -} - -// clearTo clears the entries in the cache for specified range up to, -// but not including the specified index. -func (rec *raftEntryCache) clearTo(rangeID roachpb.RangeID, index uint64) { - rec.delEntries(rangeID, 0, index) -} - -// Metrics returns a struct which contains metrics for the raft entry cache. -func (rec *raftEntryCache) Metrics() RaftEntryCacheMetrics { - return rec.metrics -} - -func (rec *raftEntryCache) updateGauges() { - rec.metrics.Bytes.Update(int64(rec.bytes)) - rec.metrics.Size.Update(int64(rec.cache.Len())) -} diff --git a/pkg/storage/entry_cache_test.go b/pkg/storage/entry_cache_test.go deleted file mode 100644 index 5d876f299517..000000000000 --- a/pkg/storage/entry_cache_test.go +++ /dev/null @@ -1,184 +0,0 @@ -// Copyright 2016 The Cockroach Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -// implied. See the License for the specific language governing -// permissions and limitations under the License. - -package storage - -import ( - "math" - "reflect" - "testing" - - "go.etcd.io/etcd/raft/raftpb" - - "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/util/leaktest" -) - -const noLimit = math.MaxUint64 - -func newEntry(index, size uint64) raftpb.Entry { - return raftpb.Entry{ - Index: index, - Data: make([]byte, size), - } -} - -func addEntries(rec *raftEntryCache, rangeID roachpb.RangeID, lo, hi uint64) []raftpb.Entry { - ents := []raftpb.Entry{} - for i := lo; i < hi; i++ { - ents = append(ents, newEntry(i, 1)) - } - rec.addEntries(rangeID, ents) - return ents -} - -func verifyGet( - t *testing.T, - rec *raftEntryCache, - rangeID roachpb.RangeID, - lo, hi uint64, - expEnts []raftpb.Entry, - expNextIndex uint64, -) { - ents, _, nextIndex, _ := rec.getEntries(nil, rangeID, lo, hi, noLimit) - if !(len(expEnts) == 0 && len(ents) == 0) && !reflect.DeepEqual(expEnts, ents) { - t.Fatalf("expected entries %+v; got %+v", expEnts, ents) - } - if nextIndex != expNextIndex { - t.Fatalf("expected next index %d; got %d", expNextIndex, nextIndex) - } - for _, e := range ents { - term, ok := rec.getTerm(rangeID, e.Index) - if !ok { - t.Fatalf("expected to be able to retrieve term") - } - if term != e.Term { - t.Fatalf("expected term %d, but got %d", e.Term, term) - } - } -} - -func TestEntryCache(t *testing.T) { - defer leaktest.AfterTest(t)() - rec := newRaftEntryCache(100) - rangeID := roachpb.RangeID(2) - // Add entries for range 1, indexes (1-10). - ents := addEntries(rec, rangeID, 1, 11) - // Fetch all data with an exact match. - verifyGet(t, rec, rangeID, 1, 11, ents, 11) - // Fetch point entry. - verifyGet(t, rec, rangeID, 1, 2, ents[0:1], 2) - // Fetch overlapping first half. - verifyGet(t, rec, rangeID, 0, 5, []raftpb.Entry{}, 0) - // Fetch overlapping second half. - verifyGet(t, rec, rangeID, 9, 12, ents[8:], 11) - // Fetch data from earlier range. - verifyGet(t, rec, roachpb.RangeID(1), 1, 11, []raftpb.Entry{}, 1) - // Fetch data from later range. - verifyGet(t, rec, roachpb.RangeID(3), 1, 11, []raftpb.Entry{}, 1) - // Create a gap in the entries. - rec.delEntries(rangeID, 4, 8) - // Fetch all data; verify we get only first three. - verifyGet(t, rec, rangeID, 1, 11, ents[0:3], 4) - // Try to fetch from within the gap; expect no entries. - verifyGet(t, rec, rangeID, 5, 11, []raftpb.Entry{}, 5) - // Fetch after the gap. - verifyGet(t, rec, rangeID, 8, 11, ents[7:], 11) - // Delete the prefix of entries. - rec.delEntries(rangeID, 1, 3) - // Verify entries are gone. - verifyGet(t, rec, rangeID, 1, 5, []raftpb.Entry{}, 1) - // Delete the suffix of entries. - rec.delEntries(rangeID, 10, 11) - // Verify get of entries at end of range. 
- verifyGet(t, rec, rangeID, 8, 11, ents[7:9], 10) - - for _, index := range []uint64{0, 12} { - if term, ok := rec.getTerm(rangeID, index); ok { - t.Fatalf("expected no term, but found %d", term) - } - } -} - -func TestEntryCacheClearTo(t *testing.T) { - defer leaktest.AfterTest(t)() - rangeID := roachpb.RangeID(1) - rec := newRaftEntryCache(100) - rec.addEntries(rangeID, []raftpb.Entry{newEntry(2, 1)}) - rec.addEntries(rangeID, []raftpb.Entry{newEntry(20, 1), newEntry(21, 1)}) - rec.clearTo(rangeID, 21) - if ents, _, _, _ := rec.getEntries(nil, rangeID, 2, 21, noLimit); len(ents) != 0 { - t.Errorf("expected no entries after clearTo") - } - if ents, _, _, _ := rec.getEntries(nil, rangeID, 21, 22, noLimit); len(ents) != 1 { - t.Errorf("expected entry 22 to remain in the cache clearTo") - } -} - -func TestEntryCacheEviction(t *testing.T) { - defer leaktest.AfterTest(t)() - rangeID := roachpb.RangeID(1) - rec := newRaftEntryCache(100) - rec.addEntries(rangeID, []raftpb.Entry{newEntry(1, 40), newEntry(2, 40)}) - ents, _, hi, _ := rec.getEntries(nil, rangeID, 1, 3, noLimit) - if len(ents) != 2 || hi != 3 { - t.Errorf("expected both entries; got %+v, %d", ents, hi) - } - // Add another entry to evict first. - rec.addEntries(rangeID, []raftpb.Entry{newEntry(3, 40)}) - ents, _, hi, _ = rec.getEntries(nil, rangeID, 2, 4, noLimit) - if len(ents) != 2 || hi != 4 { - t.Errorf("expected only two entries; got %+v, %d", ents, hi) - } -} - -func BenchmarkEntryCache(b *testing.B) { - rangeID := roachpb.RangeID(1) - ents := make([]raftpb.Entry, 1000) - for i := range ents { - ents[i] = newEntry(uint64(i+1), 8) - } - - b.ResetTimer() - for i := 0; i < b.N; i++ { - b.StopTimer() - rec := newRaftEntryCache(8 * uint64(len(ents)*len(ents[0].Data))) - for i := roachpb.RangeID(0); i < 10; i++ { - if i != rangeID { - rec.addEntries(i, ents) - } - } - b.StartTimer() - rec.addEntries(rangeID, ents) - _, _, _, _ = rec.getEntries(nil, rangeID, 0, uint64(len(ents)-10), noLimit) - rec.clearTo(rangeID, uint64(len(ents)-10)) - } -} - -func BenchmarkEntryCacheClearTo(b *testing.B) { - rangeID := roachpb.RangeID(1) - ents := make([]raftpb.Entry, 1000) - for i := range ents { - ents[i] = newEntry(uint64(i+1), 8) - } - - b.ResetTimer() - for i := 0; i < b.N; i++ { - b.StopTimer() - rec := newRaftEntryCache(uint64(len(ents) * len(ents[0].Data))) - rec.addEntries(rangeID, ents) - b.StartTimer() - rec.clearTo(rangeID, uint64(len(ents)-10)) - } -} diff --git a/pkg/storage/raftentry/cache.go b/pkg/storage/raftentry/cache.go new file mode 100644 index 000000000000..bcf0ef9898f0 --- /dev/null +++ b/pkg/storage/raftentry/cache.go @@ -0,0 +1,276 @@ +// Copyright 2016 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. 
+ +package raftentry + +import ( + "container/list" + + "go.etcd.io/etcd/raft/raftpb" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/metric" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" +) + +var ( + metaEntryCacheSize = metric.Metadata{ + Name: "raft.entrycache.size", + Help: "Number of Raft entries in the Raft entry cache", + Measurement: "Entry Count", + Unit: metric.Unit_COUNT, + } + metaEntryCacheBytes = metric.Metadata{ + Name: "raft.entrycache.bytes", + Help: "Aggregate size of all Raft entries in the Raft entry cache", + Measurement: "Entry Bytes", + Unit: metric.Unit_BYTES, + } + metaEntryCacheAccesses = metric.Metadata{ + Name: "raft.entrycache.accesses", + Help: "Number of cache lookups in the Raft entry cache", + Measurement: "Accesses", + Unit: metric.Unit_COUNT, + } + metaEntryCacheHits = metric.Metadata{ + Name: "raft.entrycache.hits", + Help: "Number of successful cache lookups in the Raft entry cache", + Measurement: "Hits", + Unit: metric.Unit_COUNT, + } +) + +// Metrics is the set of metrics for the raft entry cache. +type Metrics struct { + Size *metric.Gauge + Bytes *metric.Gauge + Accesses *metric.Counter + Hits *metric.Counter +} + +func makeMetrics() Metrics { + return Metrics{ + Size: metric.NewGauge(metaEntryCacheSize), + Bytes: metric.NewGauge(metaEntryCacheBytes), + Accesses: metric.NewCounter(metaEntryCacheAccesses), + Hits: metric.NewCounter(metaEntryCacheHits), + } +} + +// Cache maintains a global cache of Raft group log entries. The cache mostly +// prevents unnecessary reads from disk of recently-written log entries between +// log append and application to the FSM. +// +// This cache stores entries with sideloaded proposals inlined (i.e. ready to be +// sent to followers). +type Cache struct { + mu syncutil.Mutex + parts map[roachpb.RangeID]*partition + lru list.List + size uint64 // count of all entries + bytes uint64 // aggregate size of all entries + maxBytes uint64 + metrics Metrics +} + +// NB: partitions could maintain their own lock, allowing for a partitioned +// locking scheme where Cache only needs to hold a read lock when a partition is +// performing internal modifications. However, a global write lock would still +// be necessary to maintain the linked-list to enforce the LRU policy. Since we +// don't expect internal modifications to partitions to be slow due to the +// constant-time operations on the entry hashmap, this doesn't appear to be +// worth it. +type partition struct { + r roachpb.RangeID + ents map[uint64]raftpb.Entry + el *list.Element + bytes uint64 // aggregate size of all entries in partition +} + +// NewCache returns a new Cache instance. +func NewCache(maxBytes uint64) *Cache { + return &Cache{ + parts: make(map[roachpb.RangeID]*partition), + maxBytes: maxBytes, + metrics: makeMetrics(), + } +} + +// Add adds the slice of Raft entries to the cache. +func (c *Cache) Add(r roachpb.RangeID, ents []raftpb.Entry) { + if len(ents) == 0 { + return + } + c.mu.Lock() + defer c.mu.Unlock() + + // Get the partition. Create one if missing. + p, ok := c.parts[r] + if !ok { + p = &partition{ + r: r, + ents: make(map[uint64]raftpb.Entry, len(ents)), + } + p.el = c.lru.PushFront(p) + c.parts[r] = p + } else { + c.lru.MoveToFront(p.el) + } + + // Add entries to the current partition. + for _, e := range ents { + s := uint64(e.Size()) + + // Check if we're replacing an existing entry. 
+ if oldE, ok := p.ents[e.Index]; ok { + // NB: it's possible that we're replacing an entry with a larger one + // that puts us over the size limit. We could delete the old entry + // and not insert the new one, but that would make this code more + // complicated, so it's not worth it to prevent that rare case. + s -= uint64(oldE.Size()) + p.ents[e.Index] = e + p.bytes += s + c.bytes += s + } else { + // If this new entry would push us above the byte limit, evict + // partitions to try to get below the limit. + for c.bytes+s > c.maxBytes && c.lru.Len() > 1 { + e := c.lru.Back() + ep := c.lru.Remove(e).(*partition) + c.bytes -= ep.bytes + c.size -= uint64(len(ep.ents)) + delete(c.parts, ep.r) + } + // If this will still push us above the limit, we must be the only + // partition. Break if this isn't the first entry in this partition. + if c.bytes+s > c.maxBytes { + if c.lru.Len() != 1 { + panic("unreachable") + } + if len(p.ents) > 0 { + break + } + } + + p.ents[e.Index] = e + c.size++ + p.bytes += s + c.bytes += s + } + } + c.updateGauges() +} + +// Clear deletes entries between [lo, hi) for specified range. +func (c *Cache) Clear(r roachpb.RangeID, lo, hi uint64) { + if lo >= hi { + return + } + c.mu.Lock() + defer c.mu.Unlock() + + p, ok := c.parts[r] + if !ok { + return + } + // Don't `c.lru.MoveToFront(p.el)`. Clearing from a partition + // is not considered an "access" with respect to the LRU policy. + + // Delete entries in range. Maintain stats. + for i := lo; i < hi && len(p.ents) > 0; i++ { + if e, ok := p.ents[i]; ok { + delete(p.ents, i) + c.size-- + c.bytes -= uint64(e.Size()) + } + } + c.updateGauges() + + // Delete the partition if it's now empty. + if len(p.ents) == 0 { + c.lru.Remove(p.el) + delete(c.parts, p.r) + } +} + +// Get returns the entry for the specified index and true for the second return +// value. If the index is not present in the cache, false is returned. +func (c *Cache) Get(r roachpb.RangeID, idx uint64) (raftpb.Entry, bool) { + c.metrics.Accesses.Inc(1) + c.mu.Lock() + defer c.mu.Unlock() + + p, ok := c.parts[r] + if !ok { + return raftpb.Entry{}, false + } + c.lru.MoveToFront(p.el) + + e, ok := p.ents[idx] + if ok { + c.metrics.Hits.Inc(1) + } + return e, ok +} + +// Scan returns entries between [lo, hi) for specified range. If any entries are +// returned for the specified indexes, they will start with index lo and proceed +// sequentially without gaps until 1) all entries exclusive of hi are fetched, +// 2) fetching another entry would add up to more than maxBytes of data, or 3) a +// cache miss occurs. The returned size reflects the size of the returned +// entries. +func (c *Cache) Scan( + ents []raftpb.Entry, r roachpb.RangeID, lo, hi, maxBytes uint64, +) (_ []raftpb.Entry, bytes uint64, nextIdx uint64, exceededMaxBytes bool) { + c.metrics.Accesses.Inc(1) + c.mu.Lock() + defer c.mu.Unlock() + + p, ok := c.parts[r] + if !ok { + return ents, 0, lo, false + } + c.lru.MoveToFront(p.el) + + for nextIdx = lo; nextIdx < hi && !exceededMaxBytes; nextIdx++ { + e, ok := p.ents[nextIdx] + if !ok { + // Break if there are any gaps. + break + } + s := uint64(e.Size()) + if bytes+s > maxBytes { + exceededMaxBytes = true + if len(ents) > 0 { + break + } + } + bytes += s + ents = append(ents, e) + } + if nextIdx == hi || exceededMaxBytes { + // We only consider an access a "hit" if it returns all requested + // entries or stops short because of a maximum bytes limit. 
+ c.metrics.Hits.Inc(1) + } + return ents, bytes, nextIdx, exceededMaxBytes +} + +// Metrics returns a struct which contains metrics for the cache. +func (c *Cache) Metrics() Metrics { return c.metrics } + +func (c *Cache) updateGauges() { + c.metrics.Bytes.Update(int64(c.bytes)) + c.metrics.Size.Update(int64(c.size)) +} diff --git a/pkg/storage/raftentry/cache_test.go b/pkg/storage/raftentry/cache_test.go new file mode 100644 index 000000000000..363936c03325 --- /dev/null +++ b/pkg/storage/raftentry/cache_test.go @@ -0,0 +1,216 @@ +// Copyright 2016 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package raftentry + +import ( + "math" + "reflect" + "testing" + + "go.etcd.io/etcd/raft/raftpb" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" +) + +const noLimit = math.MaxUint64 + +func newEntry(index, size uint64) raftpb.Entry { + return raftpb.Entry{ + Index: index, + Data: make([]byte, size), + } +} + +func addEntries(c *Cache, rangeID roachpb.RangeID, lo, hi uint64) []raftpb.Entry { + ents := []raftpb.Entry{} + for i := lo; i < hi; i++ { + ents = append(ents, newEntry(i, 1)) + } + c.Add(rangeID, ents) + return ents +} + +func verifyGet( + t *testing.T, + c *Cache, + rangeID roachpb.RangeID, + lo, hi uint64, + expEnts []raftpb.Entry, + expNextIndex uint64, +) { + ents, _, nextIndex, _ := c.Scan(nil, rangeID, lo, hi, noLimit) + if !(len(expEnts) == 0 && len(ents) == 0) && !reflect.DeepEqual(expEnts, ents) { + t.Fatalf("expected entries %+v; got %+v", expEnts, ents) + } + if nextIndex != expNextIndex { + t.Fatalf("expected next index %d; got %d", expNextIndex, nextIndex) + } + for _, e := range ents { + found, ok := c.Get(rangeID, e.Index) + if !ok { + t.Fatalf("expected to be able to retrieve term") + } + if !reflect.DeepEqual(found, e) { + t.Fatalf("expected entry %v, but got %v", e, found) + } + } +} + +func TestEntryCache(t *testing.T) { + defer leaktest.AfterTest(t)() + c := NewCache(100) + rangeID := roachpb.RangeID(2) + // Add entries for range 1, indexes (1-10). + ents := addEntries(c, rangeID, 1, 11) + // Fetch all data with an exact match. + verifyGet(t, c, rangeID, 1, 11, ents, 11) + // Fetch point entry. + verifyGet(t, c, rangeID, 1, 2, ents[0:1], 2) + // Fetch overlapping first half. + verifyGet(t, c, rangeID, 0, 5, []raftpb.Entry{}, 0) + // Fetch overlapping second half. + verifyGet(t, c, rangeID, 9, 12, ents[8:], 11) + // Fetch data from earlier range. + verifyGet(t, c, roachpb.RangeID(1), 1, 11, []raftpb.Entry{}, 1) + // Fetch data from later range. + verifyGet(t, c, roachpb.RangeID(3), 1, 11, []raftpb.Entry{}, 1) + // Create a gap in the entries. + c.Clear(rangeID, 4, 8) + // Fetch all data; verify we get only first three. + verifyGet(t, c, rangeID, 1, 11, ents[0:3], 4) + // Try to fetch from within the gap; expect no entries. + verifyGet(t, c, rangeID, 5, 11, []raftpb.Entry{}, 5) + // Fetch after the gap. 
+	verifyGet(t, c, rangeID, 8, 11, ents[7:], 11)
+	// Delete the prefix of entries.
+	c.Clear(rangeID, 1, 3)
+	// Verify entries are gone.
+	verifyGet(t, c, rangeID, 1, 5, []raftpb.Entry{}, 1)
+	// Delete the suffix of entries.
+	c.Clear(rangeID, 10, 11)
+	// Verify get of entries at end of range.
+	verifyGet(t, c, rangeID, 8, 11, ents[7:9], 10)
+
+	for _, index := range []uint64{0, 12} {
+		if e, ok := c.Get(rangeID, index); ok {
+			t.Fatalf("expected no entry, but found %v", e)
+		}
+	}
+}
+
+func TestEntryCacheClearTo(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	rangeID := roachpb.RangeID(1)
+	c := NewCache(100)
+	c.Add(rangeID, []raftpb.Entry{newEntry(2, 1)})
+	c.Add(rangeID, []raftpb.Entry{newEntry(20, 1), newEntry(21, 1)})
+	c.Clear(rangeID, 0, 21)
+	if ents, _, _, _ := c.Scan(nil, rangeID, 2, 21, noLimit); len(ents) != 0 {
+		t.Errorf("expected no entries after Clear")
+	}
+	if ents, _, _, _ := c.Scan(nil, rangeID, 21, 22, noLimit); len(ents) != 1 {
+		t.Errorf("expected entry 21 to remain in the cache after Clear")
+	}
+}
+
+func TestEntryCacheEviction(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	rangeID, rangeID2 := roachpb.RangeID(1), roachpb.RangeID(2)
+	c := NewCache(100)
+	c.Add(rangeID, []raftpb.Entry{newEntry(1, 30), newEntry(2, 30)})
+	ents, _, hi, _ := c.Scan(nil, rangeID, 1, 3, noLimit)
+	if len(ents) != 2 || hi != 3 {
+		t.Errorf("expected both entries; got %+v, %d", ents, hi)
+	}
+	if c.size != 2 {
+		t.Errorf("expected size=2; got %d", c.size)
+	}
+	// Add another entry to the same range. Adding it would exceed
+	// the size limit, so it won't be added.
+	c.Add(rangeID, []raftpb.Entry{newEntry(3, 30)})
+	ents, _, hi, _ = c.Scan(nil, rangeID, 1, 4, noLimit)
+	if len(ents) != 2 || hi != 3 {
+		t.Errorf("expected two entries; got %+v, %d", ents, hi)
+	}
+	if c.size != 2 {
+		t.Errorf("expected size=2; got %d", c.size)
+	}
+	// Replace the first entry with a smaller one, then add the
+	// third entry again. This time, there should be enough
+	// space.
+	c.Add(rangeID, []raftpb.Entry{newEntry(1, 5), newEntry(3, 30)})
+	ents, _, hi, _ = c.Scan(nil, rangeID, 1, 4, noLimit)
+	if len(ents) != 3 || hi != 4 {
+		t.Errorf("expected three entries; got %+v, %d", ents, hi)
+	}
+	if c.size != 3 {
+		t.Errorf("expected size=3; got %d", c.size)
+	}
+	// Add an entry to the new range. Should cause the old
+	// range's partition to be evicted.
+ c.Add(rangeID2, []raftpb.Entry{newEntry(1, 30)}) + ents, _, _, _ = c.Scan(nil, rangeID, 1, 4, noLimit) + if len(ents) != 0 { + t.Errorf("expected no entries; got %+v", ents) + } + ents, _, hi, _ = c.Scan(nil, rangeID2, 1, 2, noLimit) + if len(ents) != 1 || hi != 2 { + t.Errorf("expected one entry; got %+v, %d", ents, hi) + } + if c.size != 1 { + t.Errorf("expected size=1; got %d", c.size) + } +} + +func BenchmarkEntryCache(b *testing.B) { + rangeID := roachpb.RangeID(1) + ents := make([]raftpb.Entry, 1000) + for i := range ents { + ents[i] = newEntry(uint64(i+1), 8) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + b.StopTimer() + c := NewCache(15 * uint64(len(ents)*len(ents[0].Data))) + for i := roachpb.RangeID(0); i < 10; i++ { + if i != rangeID { + c.Add(i, ents) + } + } + b.StartTimer() + c.Add(rangeID, ents) + _, _, _, _ = c.Scan(nil, rangeID, 0, uint64(len(ents)-10), noLimit) + c.Clear(rangeID, 0, uint64(len(ents)-10)) + } +} + +func BenchmarkEntryCacheClearTo(b *testing.B) { + rangeID := roachpb.RangeID(1) + ents := make([]raftpb.Entry, 1000) + for i := range ents { + ents[i] = newEntry(uint64(i+1), 8) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + b.StopTimer() + c := NewCache(uint64(len(ents) * len(ents[0].Data))) + c.Add(rangeID, ents) + b.StartTimer() + c.Clear(rangeID, 0, uint64(len(ents)-10)) + } +} diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go index 922e1440110e..9e9bf679bf5b 100644 --- a/pkg/storage/replica.go +++ b/pkg/storage/replica.go @@ -4440,7 +4440,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked( if !allCommitted { // Update raft log entry cache. We clear any older, uncommitted log entries // and cache the latest ones. - r.store.raftEntryCache.addEntries(r.RangeID, rd.Entries) + r.store.raftEntryCache.Add(r.RangeID, rd.Entries) if len(rd.CommittedEntries) > 0 { // Clear the entries that we just applied out of the Raft log entry @@ -4450,7 +4450,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked( // case. lo := rd.CommittedEntries[0].Index hi := rd.CommittedEntries[len(rd.CommittedEntries)-1].Index - r.store.raftEntryCache.delEntries(r.RangeID, lo, hi+1) + r.store.raftEntryCache.Clear(r.RangeID, lo, hi+1) } } } diff --git a/pkg/storage/replica_proposal.go b/pkg/storage/replica_proposal.go index 21372cddec14..eb51c245408a 100644 --- a/pkg/storage/replica_proposal.go +++ b/pkg/storage/replica_proposal.go @@ -548,12 +548,13 @@ func (r *Replica) handleReplicatedEvalResult( rResult.State.TruncatedState = nil // for assertion r.mu.Lock() + oldTruncatedState := r.mu.state.TruncatedState r.mu.state.TruncatedState = newTruncState r.mu.Unlock() // Clear any entries in the Raft log entry cache for this range up // to and including the most recently truncated index. - r.store.raftEntryCache.clearTo(r.RangeID, newTruncState.Index+1) + r.store.raftEntryCache.Clear(r.RangeID, oldTruncatedState.Index+1, newTruncState.Index+1) // Truncate the sideloaded storage. Note that this is safe only if the new truncated state // is durably on disk (i.e.) synced. 
This is true at the time of writing but unfortunately diff --git a/pkg/storage/replica_raftstorage.go b/pkg/storage/replica_raftstorage.go index 10949b13d3cd..da0e20dc1859 100644 --- a/pkg/storage/replica_raftstorage.go +++ b/pkg/storage/replica_raftstorage.go @@ -29,6 +29,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/engine" "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" + "github.com/cockroachdb/cockroach/pkg/storage/raftentry" "github.com/cockroachdb/cockroach/pkg/storage/rditer" "github.com/cockroachdb/cockroach/pkg/storage/stateloader" "github.com/cockroachdb/cockroach/pkg/storage/storagebase" @@ -101,7 +102,7 @@ func entries( rsl stateloader.StateLoader, e engine.Reader, rangeID roachpb.RangeID, - eCache *raftEntryCache, + eCache *raftentry.Cache, sideloaded sideloadStorage, lo, hi, maxBytes uint64, ) ([]raftpb.Entry, error) { @@ -115,7 +116,7 @@ func entries( } ents := make([]raftpb.Entry, 0, n) - ents, size, hitIndex, exceededMaxBytes := eCache.getEntries(ents, rangeID, lo, hi, maxBytes) + ents, size, hitIndex, exceededMaxBytes := eCache.Scan(ents, rangeID, lo, hi, maxBytes) // Return results if the correct number of results came back or if // we ran into the max bytes limit. @@ -174,7 +175,7 @@ func entries( } // Cache the fetched entries, if we may. if canCache { - eCache.addEntries(rangeID, ents) + eCache.Add(rangeID, ents) } // Did the correct number of results come back? If so, we're all good. @@ -255,8 +256,8 @@ func (r *replicaRaftStorage) Term(i uint64) (uint64, error) { return r.mu.lastTerm, nil } // Try to retrieve the term for the desired entry from the entry cache. - if term, ok := r.store.raftEntryCache.getTerm(r.RangeID, i); ok { - return term, nil + if e, ok := r.store.raftEntryCache.Get(r.RangeID, i); ok { + return e.Term, nil } readonly := r.store.Engine().NewReadOnly() defer readonly.Close() @@ -274,7 +275,7 @@ func term( rsl stateloader.StateLoader, eng engine.Reader, rangeID roachpb.RangeID, - eCache *raftEntryCache, + eCache *raftentry.Cache, i uint64, ) (uint64, error) { // entries() accepts a `nil` sideloaded storage and will skip inlining of @@ -460,7 +461,7 @@ type OutgoingSnapshot struct { // or RaftSnap -- a log truncation could have removed files from the // sideloaded storage in the meantime. 
WithSideloaded func(func(sideloadStorage) error) error - RaftEntryCache *raftEntryCache + RaftEntryCache *raftentry.Cache snapType string } @@ -494,7 +495,7 @@ func snapshot( snapType string, snap engine.Reader, rangeID roachpb.RangeID, - eCache *raftEntryCache, + eCache *raftentry.Cache, withSideloaded func(func(sideloadStorage) error) error, startKey roachpb.RKey, ) (OutgoingSnapshot, error) { diff --git a/pkg/storage/replica_sideload.go b/pkg/storage/replica_sideload.go index 8b421cfb90e7..9efd8472a084 100644 --- a/pkg/storage/replica_sideload.go +++ b/pkg/storage/replica_sideload.go @@ -18,6 +18,7 @@ import ( "context" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage/raftentry" "github.com/cockroachdb/cockroach/pkg/storage/storagebase" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" @@ -176,7 +177,7 @@ func maybeInlineSideloadedRaftCommand( rangeID roachpb.RangeID, ent raftpb.Entry, sideloaded sideloadStorage, - entryCache *raftEntryCache, + entryCache *raftentry.Cache, ) (*raftpb.Entry, error) { if !sniffSideloadedRaftCommand(ent.Data) { return nil, nil @@ -185,7 +186,7 @@ func maybeInlineSideloadedRaftCommand( // We could unmarshal this yet again, but if it's committed we // are very likely to have appended it recently, in which case // we can save work. - cachedSingleton, _, _, _ := entryCache.getEntries( + cachedSingleton, _, _, _ := entryCache.Scan( nil, rangeID, ent.Index, ent.Index+1, 1<<20, ) diff --git a/pkg/storage/replica_sideload_test.go b/pkg/storage/replica_sideload_test.go index aa00c8644ff5..adbb5ec5d819 100644 --- a/pkg/storage/replica_sideload_test.go +++ b/pkg/storage/replica_sideload_test.go @@ -33,6 +33,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage/engine" + "github.com/cockroachdb/cockroach/pkg/storage/raftentry" "github.com/cockroachdb/cockroach/pkg/storage/stateloader" "github.com/cockroachdb/cockroach/pkg/storage/storagebase" "github.com/cockroachdb/cockroach/pkg/testutils" @@ -368,7 +369,7 @@ func TestRaftSSTableSideloadingInline(t *testing.T) { // after having (perhaps) been modified. thin, fat raftpb.Entry // Populate the raft entry cache and sideload storage before running the test. - setup func(*raftEntryCache, sideloadStorage) + setup func(*raftentry.Cache, sideloadStorage) // If nonempty, the error expected from maybeInlineSideloadedRaftCommand. expErr string // If nonempty, a regex that the recorded trace span must match. 
@@ -383,7 +384,7 @@ func TestRaftSSTableSideloadingInline(t *testing.T) { CRC32: 0, // not checked } - putOnDisk := func(ec *raftEntryCache, ss sideloadStorage) { + putOnDisk := func(ec *raftentry.Cache, ss sideloadStorage) { if err := ss.Put(context.Background(), 5, 6, sstFat.Data); err != nil { t.Fatal(err) } @@ -410,14 +411,14 @@ func TestRaftSSTableSideloadingInline(t *testing.T) { }, "v2-with-payload-with-file-with-cache": { thin: mkEnt(v2, 5, 6, &sstThin), fat: mkEnt(v2, 5, 6, &sstFat), - setup: func(ec *raftEntryCache, ss sideloadStorage) { + setup: func(ec *raftentry.Cache, ss sideloadStorage) { putOnDisk(ec, ss) - ec.addEntries(rangeID, []raftpb.Entry{mkEnt(v2, 5, 6, &sstFat)}) + ec.Add(rangeID, []raftpb.Entry{mkEnt(v2, 5, 6, &sstFat)}) }, expTrace: "using cache hit", }, "v2-fat-without-file": { thin: mkEnt(v2, 5, 6, &sstFat), fat: mkEnt(v2, 5, 6, &sstFat), - setup: func(ec *raftEntryCache, ss sideloadStorage) {}, + setup: func(ec *raftentry.Cache, ss sideloadStorage) {}, expTrace: "already inlined", }, } @@ -426,7 +427,7 @@ func TestRaftSSTableSideloadingInline(t *testing.T) { ctx, collect, cancel := tracing.ContextWithRecordingSpan(context.Background(), "test-recording") defer cancel() - ec := newRaftEntryCache(1024) // large enough + ec := raftentry.NewCache(1024) // large enough ss := mustNewInMemSideloadStorage(rangeID, roachpb.ReplicaID(1), ".") if test.setup != nil { test.setup(ec, ss) @@ -832,7 +833,7 @@ func TestRaftSSTableSideloadingSnapshot(t *testing.T) { tc.repl.mu.Lock() defer tc.repl.mu.Unlock() for _, withSS := range []bool{false, true} { - tc.store.raftEntryCache.clearTo(tc.repl.RangeID, sideloadedIndex+1) + tc.store.raftEntryCache.Clear(tc.repl.RangeID, 0, sideloadedIndex+1) var ss sideloadStorage if withSS { @@ -849,7 +850,7 @@ func TestRaftSSTableSideloadingSnapshot(t *testing.T) { if len(entries) != 1 { t.Fatalf("no or too many entries returned from cache: %+v", entries) } - ents, _, _, _ := tc.store.raftEntryCache.getEntries(nil, tc.repl.RangeID, sideloadedIndex, sideloadedIndex+1, 1<<20) + ents, _, _, _ := tc.store.raftEntryCache.Scan(nil, tc.repl.RangeID, sideloadedIndex, sideloadedIndex+1, 1<<20) if withSS { // We passed the sideload storage, so we expect to get our // inlined index back from the cache. @@ -898,7 +899,7 @@ func TestRaftSSTableSideloadingSnapshot(t *testing.T) { tc.repl.raftMu.Unlock() // Additionally we need to clear out the entry from the cache because // that would still save the day. - tc.store.raftEntryCache.clearTo(tc.repl.RangeID, sideloadedIndex+1) + tc.store.raftEntryCache.Clear(tc.repl.RangeID, 0, sideloadedIndex+1) mockSender := &mockSender{} err = sendSnapshot( diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go index eaff048251ab..97fcfd12ee5c 100644 --- a/pkg/storage/replica_test.go +++ b/pkg/storage/replica_test.go @@ -7342,7 +7342,7 @@ func TestEntries(t *testing.T) { // Case 20: lo and hi are available, but entry cache evicted. {lo: indexes[5], hi: indexes[9], expResultCount: 4, expCacheCount: 0, setup: func() { // Manually evict cache for the first 10 log entries. - repl.store.raftEntryCache.delEntries(rangeID, indexes[0], indexes[9]+1) + repl.store.raftEntryCache.Clear(rangeID, indexes[0], indexes[9]+1) indexes = append(indexes, populateLogs(10, 40)...) }}, // Case 21: lo and hi are available, entry cache evicted and hi available in cache. 
@@ -7360,7 +7360,7 @@ func TestEntries(t *testing.T) { if tc.maxBytes == 0 { tc.maxBytes = math.MaxUint64 } - cacheEntries, _, _, hitLimit := repl.store.raftEntryCache.getEntries(nil, rangeID, tc.lo, tc.hi, tc.maxBytes) + cacheEntries, _, _, hitLimit := repl.store.raftEntryCache.Scan(nil, rangeID, tc.lo, tc.hi, tc.maxBytes) if len(cacheEntries) != tc.expCacheCount { t.Errorf("%d: expected cache count %d, got %d", i, tc.expCacheCount, len(cacheEntries)) } @@ -7395,7 +7395,7 @@ func TestEntries(t *testing.T) { if err := engine.MVCCDelete(context.Background(), tc.store.Engine(), nil, keys.RaftLogKey(rangeID, indexes[6]), hlc.Timestamp{}, nil); err != nil { t.Fatal(err) } - repl.store.raftEntryCache.delEntries(rangeID, indexes[6], indexes[6]+1) + repl.store.raftEntryCache.Clear(rangeID, indexes[6], indexes[6]+1) repl.mu.Lock() defer repl.mu.Unlock() @@ -7415,7 +7415,7 @@ func TestEntries(t *testing.T) { } // Case 25b: don't hit the gap due to maxBytes, cache cleared. { - repl.store.raftEntryCache.delEntries(rangeID, indexes[5], indexes[5]+1) + repl.store.raftEntryCache.Clear(rangeID, indexes[5], indexes[5]+1) ents, err := repl.raftEntriesLocked(indexes[5], indexes[9], 1) if err != nil { t.Errorf("25: expected no error, got %s", err) diff --git a/pkg/storage/store.go b/pkg/storage/store.go index c44dd58c6fab..2111b9ad38f4 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -52,6 +52,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/engine" "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" "github.com/cockroachdb/cockroach/pkg/storage/idalloc" + "github.com/cockroachdb/cockroach/pkg/storage/raftentry" "github.com/cockroachdb/cockroach/pkg/storage/spanset" "github.com/cockroachdb/cockroach/pkg/storage/stateloader" "github.com/cockroachdb/cockroach/pkg/storage/storagebase" @@ -395,7 +396,7 @@ type Store struct { consistencyQueue *consistencyQueue // Replica consistency check queue metrics *StoreMetrics intentResolver *intentResolver - raftEntryCache *raftEntryCache + raftEntryCache *raftentry.Cache limiters batcheval.Limiters // gossipRangeCountdown and leaseRangeCountdown are countdowns of @@ -946,7 +947,7 @@ func NewStore(cfg StoreConfig, eng engine.Engine, nodeDesc *roachpb.NodeDescript s.draining.Store(false) s.scheduler = newRaftScheduler(s.metrics, s, storeSchedulerConcurrency) - s.raftEntryCache = newRaftEntryCache(cfg.RaftEntryCacheSize) + s.raftEntryCache = raftentry.NewCache(cfg.RaftEntryCacheSize) s.metrics.registry.AddMetricStruct(s.raftEntryCache.Metrics()) s.coalescedMu.Lock()