This repository was archived by the owner on Aug 23, 2023. It is now read-only.

chunk cache perf fixes: AddRange + batched accounting #943

Merged · 17 commits · Jun 18, 2018
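At a high level, this PR replaces per-chunk cache and accounting calls with batched ones: callers hand the chunk cache a whole slice of chunks (AddRange), and the cache's accounting layer receives one event per batch (AddChunks/HitChunks) instead of one event per chunk. A minimal runnable sketch of that idea, using invented stand-in types rather than the real chunk.IterGen and FlatAccnt event queue:

```go
package main

import "fmt"

// Minimal sketch of the batching idea in this PR, with invented stand-in
// types (the real code uses chunk.IterGen and FlatAccnt's buffered event
// queue). Before: one accounting event per chunk crosses the channel.
// After: one event carries the whole slice, so N chunks cost one channel
// send and one event-loop iteration instead of N.

type itergen struct {
	Ts   uint32
	Size uint64
}

type event struct {
	kind   string
	chunks []itergen
}

func main() {
	eventQ := make(chan event, 8)
	chunks := []itergen{{100, 512}, {160, 480}, {220, 530}}

	// before this PR: one send per chunk
	for _, c := range chunks {
		eventQ <- event{kind: "add_chnk", chunks: []itergen{c}}
	}

	// after this PR: a single batched send
	eventQ <- event{kind: "add_chnks", chunks: chunks}

	fmt.Println(len(eventQ), "queued events") // 4: three singles vs one batch
}
```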
2 changes: 1 addition & 1 deletion Makefile
@@ -2,7 +2,7 @@
 default:
 	$(MAKE) all
 test:
-	CGO_ENABLED=1 go test -v -race $(shell go list ./... | grep -v /vendor/ | grep -v chaos)
+	CGO_ENABLED=1 go test -v -race $(shell go list ./... | grep -v /vendor/ | grep -v stacktest)
 check:
 	$(MAKE) test
 bin:
13 changes: 8 additions & 5 deletions api/dataprocessor.go
@@ -561,20 +561,23 @@ func (s *Server) getSeriesCachedStore(ctx *requestContext, until uint32) ([]chun
 		default:
 		}
 
-		for _, itgen := range storeIterGens {
+		for i, itgen := range storeIterGens {
 			it, err := itgen.Get()
 			if err != nil {
 				// TODO(replay) figure out what to do if one piece is corrupt
 				tracing.Failure(span)
 				tracing.Errorf(span, "itergen: error getting iter from store slice %+v", err)
+				if i > 0 {
+					// add all the iterators that are in good shape
+					s.Cache.AddRange(ctx.AMKey, prevts, storeIterGens[:i])
+				}
 				return iters, err
 			}
-			// it's important that the itgens get added in chronological order,
-			// currently we rely on store returning results in order
-			s.Cache.Add(ctx.AMKey, prevts, itgen)
-			prevts = itgen.Ts
 			iters = append(iters, *it)
 		}
+		// it's important that the itgens get added in chronological order,
+		// currently we rely on store returning results in order
+		s.Cache.AddRange(ctx.AMKey, prevts, storeIterGens)
 	}
 
 	// the End slice is in reverse order
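The subtle part of the loop above is the error path: if decoding chunk i fails, the i chunks that already decoded cleanly are still handed to the cache as the slice prefix storeIterGens[:i], so their work is not thrown away. A runnable sketch of that pattern with invented stand-in types (the real code uses chunk.IterGen and the CCache):

```go
package main

import (
	"errors"
	"fmt"
)

// Sketch of the error path above, with invented stand-in types: when
// decoding chunk i fails, the chunks [0, i) that decoded fine are still
// handed to the cache in one AddRange call. prev mirrors prevts: the ts
// of the chunk preceding the first element of the slice (0 if unknown).

type itergen struct{ Ts uint32 }

type fakeCache struct{ added int }

func (c *fakeCache) AddRange(prev uint32, itgens []itergen) {
	c.added += len(itgens)
}

func decode(ig itergen) error {
	if ig.Ts == 220 {
		return errors.New("corrupt chunk") // simulate one bad chunk
	}
	return nil
}

func main() {
	cache := &fakeCache{}
	var prev uint32
	itgens := []itergen{{Ts: 100}, {Ts: 160}, {Ts: 220}, {Ts: 280}}

	for i, ig := range itgens {
		if err := decode(ig); err != nil {
			if i > 0 {
				// cache the chronologically ordered prefix that decoded fine
				cache.AddRange(prev, itgens[:i])
			}
			fmt.Println("failed at", i, "- cached", cache.added, "chunks") // failed at 2 - cached 2 chunks
			return
		}
	}
	cache.AddRange(prev, itgens)
}
```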
2 changes: 1 addition & 1 deletion mdata/aggmetric.go
@@ -362,7 +362,7 @@ func (a *AggMetric) pushToCache(c *chunk.Chunk) {
 		return
 	}
 	// push into cache
-	go a.cachePusher.CacheIfHot(
+	go a.cachePusher.AddIfHot(
 		a.Key,
 		0,
 		*chunk.NewBareIterGen(
2 changes: 1 addition & 1 deletion mdata/aggmetric_test.go
@@ -126,7 +126,7 @@ func testMetricPersistOptionalPrimary(t *testing.T, primary bool) {
 	calledCb := make(chan bool)
 
 	mockCache := cache.MockCache{}
-	mockCache.CacheIfHotCb = func() { calledCb <- true }
+	mockCache.AddIfHotCb = func() { calledCb <- true }
 
 	numChunks, chunkAddCount, chunkSpan := uint32(5), uint32(10), uint32(300)
 	ret := []conf.Retention{conf.NewRetentionMT(1, 1, chunkSpan, numChunks, true)}
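The test relies on a common Go pattern: the mock's callback signals a channel, and the test blocks until the signal arrives. A self-contained sketch of the pattern with a stand-in mock (the timeout is an extra safety net added for this sketch, not something the original test is shown to use):

```go
package main

import (
	"fmt"
	"time"
)

// Sketch of the callback-signal pattern this test uses, with a stand-in
// mock: the code under test fires the mock's callback (here from a
// goroutine, like AggMetric's `go a.cachePusher.AddIfHot(...)`), and the
// test blocks on a channel to assert the call actually happened.

type mockCache struct {
	AddIfHotCb func()
}

func (m *mockCache) AddIfHot() {
	if m.AddIfHotCb != nil {
		m.AddIfHotCb()
	}
}

func main() {
	calledCb := make(chan bool)
	mc := &mockCache{AddIfHotCb: func() { calledCb <- true }}

	go mc.AddIfHot() // stands in for the code path that should hit the cache

	select {
	case <-calledCb:
		fmt.Println("AddIfHot was called")
	case <-time.After(time.Second):
		fmt.Println("timed out waiting for AddIfHot")
	}
}
```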
107 changes: 93 additions & 14 deletions mdata/cache/accnt/flat_accnt.go
@@ -3,6 +3,7 @@ package accnt
 import (
 	"sort"
 
+	"github.com/grafana/metrictank/mdata/chunk"
 	"github.com/raintank/worldping-api/pkg/log"
 	"gopkg.in/raintank/schema.v1"
 )
@@ -47,32 +48,49 @@ type FlatAccntMet struct
 	chunks map[uint32]uint64
 }
 
-// event types to be used in FlatAccntEvent
-const evnt_hit_chnk uint8 = 4
-const evnt_add_chnk uint8 = 5
-const evnt_del_met uint8 = 6
-const evnt_get_total uint8 = 7
-const evnt_stop uint8 = 100
-const evnt_reset uint8 = 101
 
 type FlatAccntEvent struct {
-	t  uint8       // event type
-	pl interface{} // payload
+	eType eventType
+	pl    interface{} // payload
 }
 
+type eventType uint8
+
+const (
+	evnt_hit_chnk eventType = iota
+	evnt_hit_chnks
+	evnt_add_chnk
+	evnt_add_chnks
+	evnt_del_met
+	evnt_get_total
+	evnt_stop
+	evnt_reset
+)
+
 // payload to be sent with an add event
 type AddPayload struct {
 	metric schema.AMKey
 	ts     uint32
 	size   uint64
 }
 
+// payload to be sent with an adds (batched add) event
+type AddsPayload struct {
+	metric schema.AMKey
+	chunks []chunk.IterGen
+}
+
 // payload to be sent with a hit event
 type HitPayload struct {
 	metric schema.AMKey
 	ts     uint32
 }
 
+// payload to be sent with a hits event
+type HitsPayload struct {
+	metric schema.AMKey
+	chunks []chunk.IterGen
+}
+
 // payload to be sent with del metric event
 type DelMetPayload struct {
 	metric schema.AMKey
@@ -111,9 +129,19 @@ func (a *FlatAccnt) AddChunk(metric schema.AMKey, ts uint32, size uint64) {
 	a.act(evnt_add_chnk, &AddPayload{metric, ts, size})
 }
 
+func (a *FlatAccnt) AddChunks(metric schema.AMKey, chunks []chunk.IterGen) {
+	a.act(evnt_add_chnks, &AddsPayload{metric, chunks})
+}
+
 func (a *FlatAccnt) HitChunk(metric schema.AMKey, ts uint32) {
 	a.act(evnt_hit_chnk, &HitPayload{metric, ts})
 }
+func (a *FlatAccnt) HitChunks(metric schema.AMKey, chunks []chunk.IterGen) {
+	if len(chunks) == 0 {
+		return
+	}
+	a.act(evnt_hit_chnks, &HitsPayload{metric, chunks})
+}
 
 func (a *FlatAccnt) Stop() {
 	a.act(evnt_stop, nil)
@@ -123,10 +151,10 @@ func (a *FlatAccnt) Reset() {
 	a.act(evnt_reset, nil)
 }
 
-func (a *FlatAccnt) act(t uint8, payload interface{}) {
+func (a *FlatAccnt) act(eType eventType, payload interface{}) {
 	event := FlatAccntEvent{
-		t:  t,
-		pl: payload,
+		eType: eType,
+		pl:    payload,
 	}
 
 	select {
@@ -141,7 +169,7 @@ func (a *FlatAccnt) eventLoop() {
 	for {
 		select {
 		case event := <-a.eventQ:
-			switch event.t {
+			switch event.eType {
 			case evnt_add_chnk:
 				payload := event.pl.(*AddPayload)
 				a.add(payload.metric, payload.ts, payload.size)
@@ -152,6 +180,18 @@
 						Ts:     payload.ts,
 					},
 				)
+			case evnt_add_chnks:
+				payload := event.pl.(*AddsPayload)
+				a.addRange(payload.metric, payload.chunks)
+				cacheChunkAdd.Add(len(payload.chunks))
+				for _, chunk := range payload.chunks {
+					a.lru.touch(
+						EvictTarget{
+							Metric: payload.metric,
+							Ts:     chunk.Ts,
+						},
+					)
+				}
 			case evnt_hit_chnk:
 				payload := event.pl.(*HitPayload)
 				a.lru.touch(
@@ -160,6 +200,16 @@
 						Metric: payload.metric,
 						Ts:     payload.ts,
 					},
 				)
+			case evnt_hit_chnks:
+				payload := event.pl.(*HitsPayload)
+				for _, chunk := range payload.chunks {
+					a.lru.touch(
+						EvictTarget{
+							Metric: payload.metric,
+							Ts:     chunk.Ts,
+						},
+					)
+				}
 			case evnt_del_met:
 				payload := event.pl.(*DelMetPayload)
 				a.delMet(payload.metric)
cacheSizeUsed.AddUint64(size)
}

func (a *FlatAccnt) addRange(metric schema.AMKey, chunks []chunk.IterGen) {
var met *FlatAccntMet
var ok bool

if met, ok = a.metrics[metric]; !ok {
met = &FlatAccntMet{
total: 0,
chunks: make(map[uint32]uint64),
}
a.metrics[metric] = met
cacheMetricAdd.Inc()
}

var sizeDiff uint64

for _, chunk := range chunks {
if _, ok = met.chunks[chunk.Ts]; ok {
// we already have that chunk
continue
}
size := chunk.Size()
sizeDiff += size
met.chunks[chunk.Ts] = size
}

met.total = met.total + sizeDiff
cacheSizeUsed.AddUint64(sizeDiff)
}

func (a *FlatAccnt) evict() {
var met *FlatAccntMet
var targets []uint32
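Note the dedup in addRange above: a chunk whose Ts is already tracked for the metric is skipped, so each chunk is counted once, and the accumulated size delta is applied to the totals in a single step per batch rather than once per chunk. A condensed, runnable sketch of just that bookkeeping, with a stand-in itergen type:

```go
package main

import "fmt"

// Condensed sketch of addRange's bookkeeping, with a stand-in itergen
// type: chunks whose Ts is already known are skipped, so the size delta
// counts each chunk exactly once before being applied to the totals.

type itergen struct {
	Ts   uint32
	Size uint64
}

func addRange(known map[uint32]uint64, chunks []itergen) (sizeDiff uint64) {
	for _, c := range chunks {
		if _, ok := known[c.Ts]; ok {
			continue // we already have that chunk
		}
		known[c.Ts] = c.Size
		sizeDiff += c.Size
	}
	return sizeDiff
}

func main() {
	known := map[uint32]uint64{100: 512} // chunk at ts=100 is already cached
	batch := []itergen{{100, 512}, {160, 480}, {220, 530}}
	fmt.Println(addRange(known, batch)) // 1010: only ts=160 and ts=220 count
}
```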
7 changes: 6 additions & 1 deletion mdata/cache/accnt/if.go
@@ -1,6 +1,9 @@
 package accnt
 
-import "gopkg.in/raintank/schema.v1"
+import (
+	"github.com/grafana/metrictank/mdata/chunk"
+	"gopkg.in/raintank/schema.v1"
+)
 
 // Accnt represents an instance of cache accounting.
 // Currently there is only one implementation called `FlatAccnt`,
@@ -9,7 +12,9 @@
 type Accnt interface {
 	GetEvictQ() chan *EvictTarget
 	AddChunk(metric schema.AMKey, ts uint32, size uint64)
+	AddChunks(metric schema.AMKey, chunks []chunk.IterGen)
 	HitChunk(metric schema.AMKey, ts uint32)
+	HitChunks(metric schema.AMKey, chunks []chunk.IterGen)
 	DelMetric(metric schema.AMKey)
 	Stop()
 	Reset()
4 changes: 2 additions & 2 deletions mdata/cache/accnt/lru.go
@@ -5,8 +5,8 @@ import (
 )
 
 type LRU struct {
-	list  *list.List
-	items map[interface{}]*list.Element
+	list  *list.List                    // the actual queue in which we move items around to represent their used time
+	items map[interface{}]*list.Element // to find entries within the LRU so we can move them to the front
 }
 
 func NewLRU() *LRU {
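The two new comments describe the classic LRU layout: a doubly linked list holds entries in recency order, and a map points at each entry's list element so a touch is O(1). A minimal sketch consistent with those comments, using only container/list (eviction, i.e. popping the back of the list, is left out):

```go
package main

import (
	"container/list"
	"fmt"
)

// Minimal LRU sketch matching the commented fields above: the list orders
// entries by recency, the map locates each entry's element so touch() can
// move it to the front without scanning the list.

type LRU struct {
	list  *list.List
	items map[interface{}]*list.Element
}

func NewLRU() *LRU {
	return &LRU{list: list.New(), items: make(map[interface{}]*list.Element)}
}

func (l *LRU) touch(key interface{}) {
	if e, ok := l.items[key]; ok {
		l.list.MoveToFront(e) // known entry: just bump its recency
		return
	}
	l.items[key] = l.list.PushFront(key)
}

func main() {
	lru := NewLRU()
	lru.touch("a")
	lru.touch("b")
	lru.touch("a")                      // "a" is most recently used again
	fmt.Println(lru.list.Front().Value) // a
}
```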
18 changes: 12 additions & 6 deletions mdata/cache/cache_mock.go
@@ -11,8 +11,8 @@
 type MockCache struct {
 	sync.Mutex
 	AddCount          int
-	CacheIfHotCount   int
-	CacheIfHotCb      func()
+	AddIfHotCount     int
+	AddIfHotCb        func()
 	StopCount         int
 	SearchCount       int
 	DelMetricArchives int
@@ -31,12 +31,18 @@ func (mc *MockCache) Add(metric schema.AMKey, prev uint32, itergen chunk.IterGen
 	mc.AddCount++
 }
 
-func (mc *MockCache) CacheIfHot(metric schema.AMKey, prev uint32, itergen chunk.IterGen) {
+func (mc *MockCache) AddRange(metric schema.AMKey, prev uint32, itergens []chunk.IterGen) {
 	mc.Lock()
 	defer mc.Unlock()
-	mc.CacheIfHotCount++
-	if mc.CacheIfHotCb != nil {
-		mc.CacheIfHotCb()
+	mc.AddCount += len(itergens)
+}
+
+func (mc *MockCache) AddIfHot(metric schema.AMKey, prev uint32, itergen chunk.IterGen) {
+	mc.Lock()
+	defer mc.Unlock()
+	mc.AddIfHotCount++
+	if mc.AddIfHotCb != nil {
+		mc.AddIfHotCb()
 	}
 }
 
44 changes: 35 additions & 9 deletions mdata/cache/ccache.go
@@ -116,7 +116,7 @@ func (c *CCache) DelMetric(rawMetric schema.MKey) (int, int) {
 }
 
 // adds the given chunk to the cache, but only if the metric is sufficiently hot
-func (c *CCache) CacheIfHot(metric schema.AMKey, prev uint32, itergen chunk.IterGen) {
+func (c *CCache) AddIfHot(metric schema.AMKey, prev uint32, itergen chunk.IterGen) {
 	if c == nil {
 		return
 	}
@@ -154,8 +154,8 @@ func (c *CCache) Add(metric schema.AMKey, prev uint32, itergen chunk.IterGen) {
 
 	ccm, ok := c.metricCache[metric]
 	if !ok {
-		ccm = NewCCacheMetric()
-		ccm.Init(metric.MKey, prev, itergen)
+		ccm = NewCCacheMetric(metric.MKey)
+		ccm.Add(prev, itergen)
 		c.metricCache[metric] = ccm
 
 		// if we do not have this raw key yet, create the entry with the association
@@ -175,6 +175,36 @@ func (c *CCache) Add(metric schema.AMKey, prev uint32, itergen chunk.IterGen) {
 	c.accnt.AddChunk(metric, itergen.Ts, itergen.Size())
 }
 
+func (c *CCache) AddRange(metric schema.AMKey, prev uint32, itergens []chunk.IterGen) {
+	if c == nil || len(itergens) == 0 {
+		return
+	}
+	c.Lock()
+	defer c.Unlock()
+
+	ccm, ok := c.metricCache[metric]
+	if !ok {
+		ccm = NewCCacheMetric(metric.MKey)
+		ccm.AddRange(prev, itergens)
+		c.metricCache[metric] = ccm
+
+		// if we do not have this raw key yet, create the entry with the association
+		ccms, ok := c.metricRawKeys[metric.MKey]
+		if !ok {
+			c.metricRawKeys[metric.MKey] = map[schema.Archive]struct{}{
+				metric.Archive: {},
+			}
+		} else {
+			// otherwise, make sure the association exists
+			ccms[metric.Archive] = struct{}{}
+		}
+	} else {
+		ccm.AddRange(prev, itergens)
+	}
+
+	c.accnt.AddChunks(metric, itergens)
+}
+
 func (cc *CCache) Reset() (int, int) {
 	if cc == nil {
 		return 0, 0
@@ -261,12 +291,8 @@ func (c *CCache) Search(ctx context.Context, metric schema.AMKey, from, until ui
 
 	accnt.CacheChunkHit.Add(len(res.Start) + len(res.End))
 	go func() {
-		for _, hit := range res.Start {
-			c.accnt.HitChunk(metric, hit.Ts)
-		}
-		for _, hit := range res.End {
-			c.accnt.HitChunk(metric, hit.Ts)
-		}
+		c.accnt.HitChunks(metric, res.Start)
+		c.accnt.HitChunks(metric, res.End)

> @replay (Contributor) commented on Jun 15, 2018:
> i think in the majority of cases len(res.End) == 0, so we could save a few function calls by only doing this if len(res.End) > 0

> The PR author (Contributor) replied:
> seems cleaner to put that optimization in the HitChunks implementation. will do that

 	}()
 
 	if res.Complete {
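Following up on the thread above: the agreed-upon optimization is visible earlier in this diff. FlatAccnt.HitChunks in mdata/cache/accnt/flat_accnt.go returns immediately when len(chunks) == 0, so the second call with a typically-empty res.End costs only a function call and a length check, and never queues an event.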