This repository was archived by the owner on Aug 23, 2023. It is now read-only.
add findCache for memoryIdx #1233
Merged
Commits (22)
c596f23  add findCache for memoryIdx  (woodsaj)
a988601  update deps  (woodsaj)
e61cc0c  gofmt  (woodsaj)
9857ef1  update docs/metrics.md  (woodsaj)
040ec58  purge findCache if rate of new series is high  (woodsaj)
68f9601  disable the findCache for 1min when a flood of new series is received.  (woodsaj)
2088aab  make findCache settings configurable  (woodsaj)
b219cdc  fix buggy InvalidateFor func. Add idx.PurgeAll() func  (woodsaj)
45a97ae  update ConcurrentInsertFind benchmark to perform real searches  (woodsaj)
3c3ff4b  fix docs/metrics.md for findCache metrics  (woodsaj)
2d6eb07  increment findCacheMiss when the findCache is empty  (woodsaj)
e20f590  correctly build tree in findCache.InvalidateFor  (woodsaj)
4450bb6  add findCache settings for metrictank-docker.ini  (woodsaj)
7881e57  be consistent with return values in FindCache.Get  (robert-milan)
6d798d8  avoid race conditions in find_cache  (woodsaj)
1b7f60f  dont print info logs in tests and benchmarks  (woodsaj)
5cebf22  pass findCache tunables to constructor  (woodsaj)
1c17c57  add unit tests for findCache  (woodsaj)
3b6d799  call InvlidateFor when there are not too many items to purge.  (woodsaj)
0471892  update code comments for findCache  (woodsaj)
d27fb9b  call findCache.InvalidateFor if number of deletedDefs is low  (woodsaj)
15786e9  improve accuracy of findCache unit test  (woodsaj)
@@ -0,0 +1,226 @@
package memory

import (
    "strings"
    "sync"
    "time"

    "github.com/grafana/metrictank/stats"
    lru "github.com/hashicorp/golang-lru"
    "github.com/raintank/schema"
    log "github.com/sirupsen/logrus"
)

var (
    // metric idx.memory.find-cache.hit is a counter of findCache hits
    findCacheHit = stats.NewCounterRate32("idx.memory.find-cache.hit")
    // metric idx.memory.find-cache.miss is a counter of findCache misses
    findCacheMiss = stats.NewCounterRate32("idx.memory.find-cache.miss")
)

// FindCache is a caching layer for the in-memory index. The cache provides
// per org LRU caches of patterns and the resulting []*Nodes from searches
// on the index. Users should call `InvalidateFor(orgId, path)` when new
// entries are added to the index, to invalidate any cached patterns that match
// the path. `invalidateQueueSize` sets the maximum number of invalidations for
// a specific orgId that can be running at any time. If this number is exceeded
// then the cache for that orgId will be immediately purged and disabled for
// `backoffTime`. This mechanism protects the instance from excessive resource
// usage when a large number of new series are added at once.
type FindCache struct {
    sync.RWMutex
    cache               map[uint32]*lru.Cache
    size                int
    invalidateQueueSize int
    backoffTime         time.Duration
    newSeries           map[uint32]chan struct{}
    backoff             map[uint32]time.Time
}

func NewFindCache(size, invalidateQueueSize int, backoffTime time.Duration) *FindCache {
    fc := &FindCache{
        cache:               make(map[uint32]*lru.Cache),
        size:                size,
        invalidateQueueSize: invalidateQueueSize,
        backoffTime:         backoffTime,
        newSeries:           make(map[uint32]chan struct{}),
        backoff:             make(map[uint32]time.Time),
    }
    return fc
}

func (c *FindCache) Get(orgId uint32, pattern string) ([]*Node, bool) {
    c.RLock()
    cache, ok := c.cache[orgId]
    c.RUnlock()
    if !ok {
        findCacheMiss.Inc()
        return nil, ok
    }
    nodes, ok := cache.Get(pattern)
    if !ok {
        findCacheMiss.Inc()
        return nil, ok
    }
    findCacheHit.Inc()
    return nodes.([]*Node), ok
}

func (c *FindCache) Add(orgId uint32, pattern string, nodes []*Node) {
    c.RLock()
    cache, ok := c.cache[orgId]
    t := c.backoff[orgId]
    c.RUnlock()
    var err error
    if !ok {
        // don't init the cache if we are in backoff mode.
        if time.Until(t) > 0 {
            return
        }
        cache, err = lru.New(c.size)
        if err != nil {
            log.Errorf("memory-idx: findCache failed to create lru. err=%s", err)
            return
        }
        c.Lock()
        c.cache[orgId] = cache
        c.newSeries[orgId] = make(chan struct{}, c.invalidateQueueSize)
        c.Unlock()
    }
    cache.Add(pattern, nodes)
}

// Purge clears the cache for the specified orgId
func (c *FindCache) Purge(orgId uint32) {
    c.RLock()
    cache, ok := c.cache[orgId]
    c.RUnlock()
    if !ok {
        return
    }
    cache.Purge()
}

// PurgeAll clears the caches for all orgIds
func (c *FindCache) PurgeAll() {
    c.RLock()
    orgs := make([]uint32, len(c.cache))
    i := 0
    for k := range c.cache {
        orgs[i] = k
        i++
    }
    c.RUnlock()
    for _, org := range orgs {
        c.Purge(org)
    }
}

// InvalidateFor removes entries from the cache for 'orgId'
// that match the provided path. If lots of InvalidateFor calls
// are made at once and we end up with `invalidateQueueSize` concurrent
// goroutines processing the invalidations, we purge the cache and
// disable it for `backoffTime`. Future InvalidateFor calls made during
// the backoff time will then return immediately.
func (c *FindCache) InvalidateFor(orgId uint32, path string) {
    c.RLock()
    ch := c.newSeries[orgId]
    cache, ok := c.cache[orgId]
    c.RUnlock()
    if !ok || cache.Len() < 1 {
        return
    }

    select {
    case ch <- struct{}{}:
    default:
        c.Lock()
        c.backoff[orgId] = time.Now().Add(c.backoffTime)
        delete(c.cache, orgId)
        c.Unlock()
        for i := 0; i < len(ch); i++ {
            select {
            case <-ch:
            default:
            }
        }
        log.Infof("memory-idx: findCache invalidate-queue full. Disabling cache for %s. num-cached-entries=%d", c.backoffTime.String(), cache.Len())
        return
    }

    // convert our path to a tree so that we can call `find(tree, pattern)`
    // for each pattern in the cache.
    tree := treeFromPath(path)

    for _, k := range cache.Keys() {
        matches, err := find(tree, k.(string))
        if err != nil {
            log.Errorf("memory-idx: checking if new series matches expressions in findCache. series=%s expr=%s err=%s", path, k, err)
            continue
        }
        if len(matches) > 0 {
            cache.Remove(k)
        }
    }
    select {
    case <-ch:
    default:
    }
}

// PurgeFindCache purges the findCaches for all orgIds
func (m *UnpartitionedMemoryIdx) PurgeFindCache() {
    m.findCache.PurgeAll()
}

// PurgeFindCache purges the findCaches for all orgIds
// across all partitions
func (p *PartitionedMemoryIdx) PurgeFindCache() {
    for _, m := range p.Partition {
        m.findCache.PurgeAll()
    }
}

// treeFromPath creates an index tree from a series path.
// The tree will have a single leaf node and nodes for
// each branch.
func treeFromPath(path string) *Tree {
    tree := &Tree{
        Items: map[string]*Node{
            "": {
                Path:     "",
                Children: make([]string, 0),
                Defs:     make([]schema.MKey, 0),
            },
        },
    }
    pos := strings.Index(path, ".")
    prevPos := 0
    for {
        branch := path[:pos]
        // add as child of parent branch
        thisNode := branch[prevPos+1:]
        if prevPos == 0 {
            thisNode = branch[prevPos:]
        }
        tree.Items[path[:prevPos]].Children = []string{thisNode}

        // create this branch/leaf
        tree.Items[branch] = &Node{
            Path: branch,
        }
        if branch == path {
            tree.Items[branch].Defs = []schema.MKey{{}}
            break
        }
        prevPos = pos
        nextPos := strings.Index(path[pos+1:], ".")
        if nextPos < 0 {
            pos = len(path)
        } else {
            pos = pos + nextPos + 1
        }
    }

    return tree
}
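
For orientation, the following is a minimal usage sketch and is not part of the diff. It assumes it compiles inside the memory package (so Node, FindCache and treeFromPath are in scope); the sizes, org id, patterns and the exampleFindCacheUsage helper are made up for illustration.

package memory

import "time"

// exampleFindCacheUsage is a hypothetical sketch (not part of this PR) showing
// how a caller might drive the FindCache introduced above.
func exampleFindCacheUsage() {
    // cache up to 1000 patterns per org, allow 10 concurrent invalidations,
    // and disable the org's cache for 1 minute once that queue overflows.
    fc := NewFindCache(1000, 10, time.Minute)

    org := uint32(1)
    pattern := "some.metric.*"

    // first lookup misses; the caller would run the real index search here
    // and cache whatever it returns.
    if _, ok := fc.Get(org, pattern); !ok {
        nodes := []*Node{} // placeholder for the result of a real find()
        fc.Add(org, pattern, nodes)
    }

    // an identical query is now served from the cache.
    if cached, ok := fc.Get(org, pattern); ok {
        _ = cached
    }

    // when a new (multi-segment) series is added to the index, cached patterns
    // that match it must be dropped. Internally this builds a tree via
    // treeFromPath, e.g. "some.metric.cpu" becomes nodes "", "some",
    // "some.metric" and the leaf "some.metric.cpu".
    fc.InvalidateFor(org, "some.metric.cpu")
}

The Get/Add pair mirrors how the index would consult the cache before falling back to a full find(), while InvalidateFor is what keeps previously cached pattern results consistent as new series are added to the index.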
Is there a chance that `cache` could have already been deleted before attempting this call in some fringe situations?

Edit: I think all of the calls are wrapped inside RLocks or Locks in the index.

It is possible that one thread calls `cache, ok := c.cache[orgId]`, then releases the lock, and then, before `nodes, ok := cache.Get(pattern)` is executed, another thread gets a write lock and calls `delete(c.cache, orgId)`. But it doesn't really matter: this is still safe, as the items in c.cache[orgId] are pointers. The end result is that calls to `findCache.Get()` return results based on the contents of the cache at the time the RLock() was acquired, not on the contents of the cache at the specific time that `nodes, ok := cache.Get(pattern)` is executed.
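
To make that interleaving concrete, here is a small hypothetical sketch; it is not from the PR, the raceIllustration name is made up, and it assumes it sits inside the memory package. It shows a reader holding on to the per-org *lru.Cache pointer it obtained under the RLock while another goroutine removes that entry from the map, which is the scenario the reply above argues is safe, since the items are pointers and hashicorp/golang-lru guards its own internal state.

package memory

// raceIllustration sketches the interleaving discussed in the review thread:
// a reader snapshots the per-org *lru.Cache under RLock and keeps using it
// after another goroutine has deleted the map entry.
func raceIllustration(c *FindCache, org uint32) {
    done := make(chan struct{})

    go func() {
        // reader: Get copies the *lru.Cache pointer out of c.cache under
        // RLock, releases the lock, then calls cache.Get(pattern). Even if
        // the map entry is deleted in between, the old cache object is still
        // valid, so this serves the pre-deletion contents rather than crashing.
        c.Get(org, "some.pattern.*")
        close(done)
    }()

    // writer: mimics the backoff branch of InvalidateFor, which removes the
    // org's cache from the map under the write lock.
    c.Lock()
    delete(c.cache, org)
    c.Unlock()

    <-done
}

Whether the reader observes the map entry before or after the deletion only changes which snapshot it sees; either outcome is consistent, which is the point the reply above is making.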