Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

[WIP] Add method to cache many chunks at once #940

Closed
wants to merge 18 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
default:
$(MAKE) all
test:
CGO_ENABLED=1 go test -v -race $(shell go list ./... | grep -v /vendor/ | grep -v chaos)
CGO_ENABLED=1 go test -v -race $(shell go list ./... | grep -v /vendor/ | grep -v stacktest)
check:
$(MAKE) test
bin:
Expand Down
13 changes: 8 additions & 5 deletions api/dataprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,20 +561,23 @@ func (s *Server) getSeriesCachedStore(ctx *requestContext, until uint32) ([]chun
default:
}

for _, itgen := range storeIterGens {
for i, itgen := range storeIterGens {
it, err := itgen.Get()
if err != nil {
// TODO(replay) figure out what to do if one piece is corrupt
tracing.Failure(span)
tracing.Errorf(span, "itergen: error getting iter from store slice %+v", err)
if i > 0 {
// add all the iterators that are in good shape
s.Cache.AddRange(ctx.AMKey, prevts, storeIterGens[:i])
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's a nice idea, to still add those that were not corrupted

}
return iters, err
}
// it's important that the itgens get added in chronological order,
// currently we rely on store returning results in order
s.Cache.Add(ctx.AMKey, prevts, itgen)
prevts = itgen.Ts
iters = append(iters, *it)
}
// it's important that the itgens get added in chronological order,
// currently we rely on store returning results in order
s.Cache.AddRange(ctx.AMKey, prevts, storeIterGens)
}

// the End slice is in reverse order
Expand Down
2 changes: 1 addition & 1 deletion mdata/aggmetric.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ func (a *AggMetric) addAggregators(ts uint32, val float64) {

func (a *AggMetric) pushToCache(c *chunk.Chunk) {
// push into cache
go a.cachePusher.CacheIfHot(
go a.cachePusher.AddIfHot(
a.Key,
0,
*chunk.NewBareIterGen(
Expand Down
2 changes: 1 addition & 1 deletion mdata/aggmetric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func testMetricPersistOptionalPrimary(t *testing.T, primary bool) {
calledCb := make(chan bool)

mockCache := cache.MockCache{}
mockCache.CacheIfHotCb = func() { calledCb <- true }
mockCache.AddIfHotCb = func() { calledCb <- true }

numChunks, chunkAddCount, chunkSpan := uint32(5), uint32(10), uint32(300)
ret := []conf.Retention{conf.NewRetentionMT(1, 1, chunkSpan, numChunks, true)}
Expand Down
18 changes: 12 additions & 6 deletions mdata/cache/cache_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (
type MockCache struct {
sync.Mutex
AddCount int
CacheIfHotCount int
CacheIfHotCb func()
AddIfHotCount int
AddIfHotCb func()
StopCount int
SearchCount int
DelMetricArchives int
Expand All @@ -31,12 +31,18 @@ func (mc *MockCache) Add(metric schema.AMKey, prev uint32, itergen chunk.IterGen
mc.AddCount++
}

func (mc *MockCache) CacheIfHot(metric schema.AMKey, prev uint32, itergen chunk.IterGen) {
func (mc *MockCache) AddRange(metric schema.AMKey, prev uint32, itergens []chunk.IterGen) {
mc.Lock()
defer mc.Unlock()
mc.CacheIfHotCount++
if mc.CacheIfHotCb != nil {
mc.CacheIfHotCb()
mc.AddCount += len(itergens)
}

func (mc *MockCache) AddIfHot(metric schema.AMKey, prev uint32, itergen chunk.IterGen) {
mc.Lock()
defer mc.Unlock()
mc.AddIfHotCount++
if mc.AddIfHotCb != nil {
mc.AddIfHotCb()
}
}

Expand Down
38 changes: 35 additions & 3 deletions mdata/cache/ccache.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (c *CCache) DelMetric(rawMetric schema.MKey) (int, int) {
}

// adds the given chunk to the cache, but only if the metric is sufficiently hot
func (c *CCache) CacheIfHot(metric schema.AMKey, prev uint32, itergen chunk.IterGen) {
func (c *CCache) AddIfHot(metric schema.AMKey, prev uint32, itergen chunk.IterGen) {
c.RLock()

var met *CCacheMetric
Expand Down Expand Up @@ -136,8 +136,8 @@ func (c *CCache) Add(metric schema.AMKey, prev uint32, itergen chunk.IterGen) {

ccm, ok := c.metricCache[metric]
if !ok {
ccm = NewCCacheMetric()
ccm.Init(metric.MKey, prev, itergen)
ccm = NewCCacheMetric(metric.MKey)
ccm.Add(prev, itergen)
c.metricCache[metric] = ccm

// if we do not have this raw key yet, create the entry with the association
Expand All @@ -157,6 +157,38 @@ func (c *CCache) Add(metric schema.AMKey, prev uint32, itergen chunk.IterGen) {
c.accnt.AddChunk(metric, itergen.Ts, itergen.Size())
}

func (c *CCache) AddRange(metric schema.AMKey, prev uint32, itergens []chunk.IterGen) {
if len(itergens) == 0 {
return
}
c.Lock()
defer c.Unlock()

ccm, ok := c.metricCache[metric]
if !ok {
ccm = NewCCacheMetric(metric.MKey)
ccm.AddRange(prev, itergens)
c.metricCache[metric] = ccm

// if we do not have this raw key yet, create the entry with the association
ccms, ok := c.metricRawKeys[metric.MKey]
if !ok {
c.metricRawKeys[metric.MKey] = map[schema.Archive]struct{}{
metric.Archive: {},
}
} else {
// otherwise, make sure the association exists
ccms[metric.Archive] = struct{}{}
}
} else {
ccm.AddRange(prev, itergens)
}

for _, itergen := range itergens {
c.accnt.AddChunk(metric, itergen.Ts, itergen.Size())
Copy link
Contributor Author

@replay replay Jun 12, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Dieterbe when you looked at the CPU profiles, did this method ever show up? If so, we could batch-add those as well. The benefit is probably going to be smaller because this function just directly pushes the data into a channel. We could also push a whole batch into the channel instead of one-by-one and then optimize the accounting to handle batches, but I'm not sure if there is any benefit to that, because in the request-handling thread this will only result in fewer function calls and fewer pushes into that channel.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering the same thing. It would definitely help with "Failed to submit event to accounting, channel was blocked". This is a different issue, but it shouldn't be possible to hit that situation in normal scenarios, so from that perspective we probably want to batch-add, and also batch other things that could overload the channel (like deletes perhaps?)

}
}

func (cc *CCache) Reset() (int, int) {
cc.Lock()
cc.accnt.Reset()
Expand Down
176 changes: 155 additions & 21 deletions mdata/cache/ccache_metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,13 @@ type CCacheMetric struct {
}

// NewCCacheMetric creates a CCacheMetric
func NewCCacheMetric() *CCacheMetric {
func NewCCacheMetric(mkey schema.MKey) *CCacheMetric {
return &CCacheMetric{
MKey: mkey,
chunks: make(map[uint32]*CCacheChunk),
}
}

// Init populates the CCacheMetric with its first chunk and its metric key.
// It adds itergen via Add, passing prev through unchanged, and only afterwards
// stores MKey on the struct.
func (mc *CCacheMetric) Init(MKey schema.MKey, prev uint32, itergen chunk.IterGen) {
mc.Add(prev, itergen)
mc.MKey = MKey
}

// Del deletes chunks for the given timestamp
func (mc *CCacheMetric) Del(ts uint32) int {
mc.Lock()
Expand Down Expand Up @@ -64,11 +60,130 @@ func (mc *CCacheMetric) Del(ts uint32) int {
delete(mc.chunks, ts)

// regenerate the list of sorted keys after deleting a chunk
// NOTE: we can improve perf by just taking out the ts (partially rewriting
// the slice in one go), can we also batch deletes?
mc.generateKeys()

return len(mc.chunks)
}

// AddRange adds a range (sequence) of chunks in one go: the lock is taken once
// and all CCacheChunk entries are allocated in a single slice.
//
// Note the following requirements:
// the sequence should be in ascending timestamp order
// the sequence should be complete (no gaps)
//
// prev is the timestamp of the chunk preceding itergens[0], or 0 if it is not
// known, in which case we try to find it ourselves.
// An empty slice is a no-op and a single chunk is delegated to Add.
func (mc *CCacheMetric) AddRange(prev uint32, itergens []chunk.IterGen) {
	if len(itergens) == 0 {
		return
	}

	if len(itergens) == 1 {
		mc.Add(prev, itergens[0])
		return
	}

	mc.Lock()
	defer mc.Unlock()

	// pre-allocate 1 slice, cheaper than allocating one by one.
	// we never append more than len(itergens) elements, so the slice is never
	// reallocated and the element pointers stored in mc.chunks remain valid.
	chunks := make([]CCacheChunk, 0, len(itergens))

	// handle the first one
	itergen := itergens[0]
	ts := itergen.Ts

	// if we add data that is older than chunks already cached,
	// we will have to sort the keys once we're done adding them
	sortKeys := len(mc.keys) > 0 && mc.keys[len(mc.keys)-1] > ts

	if first, ok := mc.chunks[ts]; ok {
		// chunk is already cached: keep it, only point it at its successor
		first.Next = itergens[1].Ts
	} else {
		// add chunk if we don't have it yet (most likely)

		// if previous chunk has not been passed we try to be smart and figure it out.
		// this is common in a scenario where a metric continuously gets queried
		// for a range that starts less than one chunkspan before now().
		if prev == 0 {
			if res, ok := mc.seekDesc(ts - 1); ok {
				prev = res
			}
		}

		// if the previous chunk is cached, link it
		if prevChunk, ok := mc.chunks[prev]; ok {
			prevChunk.Next = ts
		} else {
			prev = 0
		}

		chunks = append(chunks, CCacheChunk{
			Ts:    ts,
			Prev:  prev,
			Next:  itergens[1].Ts,
			Itgen: itergen,
		})
		mc.chunks[ts] = &chunks[len(chunks)-1]
		mc.keys = append(mc.keys, ts)
	}

	prev = ts

	// handle the 2nd until the last-but-one
	for i := 1; i < len(itergens)-1; i++ {
		itergen = itergens[i]
		ts = itergen.Ts

		// only record the key if the chunk is new; when we overwrite a
		// pre-existing chunk its key is already in mc.keys and appending it
		// again would leave a duplicate timestamp in the key list.
		if _, ok := mc.chunks[ts]; !ok {
			mc.keys = append(mc.keys, ts)
		}

		// add chunk, potentially overwriting pre-existing chunk (unlikely)
		chunks = append(chunks, CCacheChunk{
			Ts:    ts,
			Prev:  prev,
			Next:  itergens[i+1].Ts,
			Itgen: itergen,
		})
		mc.chunks[ts] = &chunks[len(chunks)-1]

		prev = ts
	}

	// handle the last one
	itergen = itergens[len(itergens)-1]
	ts = itergen.Ts

	// add chunk if we don't have it yet (most likely)
	if _, ok := mc.chunks[ts]; !ok {

		// if nextTsCore() can't figure out the end date it returns ts
		next := mc.nextTsCore(itergen, ts, prev, 0)
		if next == ts {
			next = 0
		} else if nextChunk, ok := mc.chunks[next]; ok {
			// if the next chunk is cached, link in both directions
			nextChunk.Prev = ts
		} else {
			next = 0
		}

		chunks = append(chunks, CCacheChunk{
			Ts:    ts,
			Prev:  prev,
			Next:  next,
			Itgen: itergen,
		})
		mc.chunks[ts] = &chunks[len(chunks)-1]
		mc.keys = append(mc.keys, ts)
	}

	if sortKeys {
		sort.Sort(accnt.Uint32Asc(mc.keys))
	}
}

// Add adds a chunk to the cache
func (mc *CCacheMetric) Add(prev uint32, itergen chunk.IterGen) {
ts := itergen.Ts

Expand Down Expand Up @@ -116,12 +231,26 @@ func (mc *CCacheMetric) Add(prev uint32, itergen chunk.IterGen) {
}
}

// regenerate the list of sorted keys after adding a chunk
mc.generateKeys()
mc.addKey(ts)

return
}

// addKey appends ts to the list of chunk timestamps and restores ascending
// order if the new timestamp is smaller than the one that was last before it.
// It performs no locking itself.
func (mc *CCacheMetric) addKey(ts uint32) {
	mc.keys = append(mc.keys, ts)

	// a list holding only the key we just added is sorted by definition
	n := len(mc.keys)
	if n < 2 {
		return
	}

	// only pay for a sort when the new key landed behind a larger neighbor
	if ts < mc.keys[n-2] {
		sort.Sort(accnt.Uint32Asc(mc.keys))
	}
}

// generateKeys generates sorted slice of all chunk timestamps
// assumes we have at least read lock
func (mc *CCacheMetric) generateKeys() {
Expand All @@ -137,24 +266,29 @@ func (mc *CCacheMetric) generateKeys() {
// assumes we already have at least a read lock
func (mc *CCacheMetric) nextTs(ts uint32) uint32 {
chunk := mc.chunks[ts]
span := chunk.Itgen.Span
return mc.nextTsCore(chunk.Itgen, chunk.Ts, chunk.Prev, chunk.Next)
}

// nextTsCore returns the ts of the next chunk, given a chunks key properties
// (to the extent we know them). It guesses if necessary.
// assumes we already have at least a read lock
func (mc *CCacheMetric) nextTsCore(itgen chunk.IterGen, ts, prev, next uint32) uint32 {
span := itgen.Span
if span > 0 {
// if the chunk is span-aware we don't need anything else
return chunk.Ts + span
return ts + span
}

if chunk.Next == 0 {
if chunk.Prev == 0 {
// if a chunk has no next and no previous chunk we have to assume it's length is 0
return chunk.Ts
} else {
// if chunk has no next chunk, but has a previous one, we assume the length of this one is same as the previous one
return chunk.Ts + (chunk.Ts - chunk.Prev)
}
} else {
// if chunk has a next chunk, then that's the ts we need
return chunk.Next
// if chunk has a next chunk, then that's the ts we need
if next != 0 {
return next
}
// if chunk has no next chunk, but has a previous one, we assume the length of this one is same as the previous one
if prev != 0 {
return ts + (ts - prev)
}
// if a chunk has no next and no previous chunk we have to assume its length is 0
return ts
}

// lastTs returns the last Ts of this metric cache
Expand Down
Loading