Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit e1d94cd

Browse files
committed
account for the cap() of byte slices in ccache
1 parent 7f2fd2c commit e1d94cd

File tree

6 files changed

+49
-38
lines changed

6 files changed

+49
-38
lines changed

cmd/mt-whisper-importer-writer/main.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,7 @@ func (s *Server) insertChunks(table, id string, ttl uint32, itergens []chunk.Ite
270270
success := false
271271
attempts := 0
272272
for !success {
273-
err := s.Session.Query(query, rowKey, ig.Ts, cassandraStore.PrepareChunkData(ig.Span, ig.Bytes())).Exec()
273+
err := s.Session.Query(query, rowKey, ig.Ts, cassandraStore.PrepareChunkData(ig.Span, ig.B)).Exec()
274274
if err != nil {
275275
if (attempts % 20) == 0 {
276276
log.Warnf("CS: failed to save chunk to cassandra after %d attempts. %s", attempts+1, err)

mdata/cache/accnt/flat_accnt.go

+45-27
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,15 @@ type FlatAccnt struct {
4343
eventQ chan FlatAccntEvent
4444
}
4545

46+
type chunkSize struct {
47+
len uint64
48+
cap uint64
49+
}
50+
4651
type FlatAccntMet struct {
47-
total uint64
48-
chunks map[uint32]uint64
52+
totalLen uint64
53+
totalCap uint64
54+
chunks map[uint32]chunkSize
4955
}
5056

5157
type FlatAccntEvent struct {
@@ -70,7 +76,7 @@ const (
7076
type AddPayload struct {
7177
metric schema.AMKey
7278
ts uint32
73-
size uint64
79+
size chunkSize
7480
}
7581

7682
// payload to be sent with an add event
@@ -125,8 +131,8 @@ func (a *FlatAccnt) GetTotal() uint64 {
125131
return <-res_chan
126132
}
127133

128-
func (a *FlatAccnt) AddChunk(metric schema.AMKey, ts uint32, size uint64) {
129-
a.act(evnt_add_chnk, &AddPayload{metric, ts, size})
134+
func (a *FlatAccnt) AddChunk(metric schema.AMKey, ts uint32, len, cap uint64) {
135+
a.act(evnt_add_chnk, &AddPayload{metric, ts, chunkSize{len: len, cap: cap}})
130136
}
131137

132138
func (a *FlatAccnt) AddChunks(metric schema.AMKey, chunks []chunk.IterGen) {
@@ -222,6 +228,7 @@ func (a *FlatAccnt) eventLoop() {
222228
a.metrics = make(map[schema.AMKey]*FlatAccntMet)
223229
a.lru.reset()
224230
cacheSizeUsed.SetUint64(0)
231+
cacheCapUsed.SetUint64(0)
225232
}
226233

227234
// evict until we're below the max
@@ -251,31 +258,35 @@ func (a *FlatAccnt) delMet(metric schema.AMKey) {
251258
)
252259
}
253260

254-
cacheSizeUsed.DecUint64(met.total)
261+
cacheSizeUsed.DecUint64(met.totalLen)
262+
cacheCapUsed.DecUint64(met.totalCap)
255263
delete(a.metrics, metric)
256264
}
257265

258-
func (a *FlatAccnt) add(metric schema.AMKey, ts uint32, size uint64) {
266+
func (a *FlatAccnt) add(metric schema.AMKey, ts uint32, size chunkSize) {
259267
var met *FlatAccntMet
260268
var ok bool
261269

262270
if met, ok = a.metrics[metric]; !ok {
263271
met = &FlatAccntMet{
264-
total: 0,
265-
chunks: make(map[uint32]uint64),
272+
totalLen: 0,
273+
totalCap: 0,
274+
chunks: make(map[uint32]chunkSize),
266275
}
267276
a.metrics[metric] = met
268277
cacheMetricAdd.Inc()
269-
}
270-
271-
if _, ok = met.chunks[ts]; ok {
272-
// we already have that chunk
273-
return
278+
} else {
279+
if _, ok = met.chunks[ts]; ok {
280+
// we already have that chunk
281+
return
282+
}
274283
}
275284

276285
met.chunks[ts] = size
277-
met.total = met.total + size
278-
cacheSizeUsed.AddUint64(size)
286+
met.totalLen = met.totalLen + size.len
287+
met.totalCap = met.totalCap + size.cap
288+
cacheSizeUsed.AddUint64(size.len)
289+
cacheCapUsed.AddUint64(size.cap)
279290
}
280291

281292
func (a *FlatAccnt) addRange(metric schema.AMKey, chunks []chunk.IterGen) {
@@ -284,34 +295,39 @@ func (a *FlatAccnt) addRange(metric schema.AMKey, chunks []chunk.IterGen) {
284295

285296
if met, ok = a.metrics[metric]; !ok {
286297
met = &FlatAccntMet{
287-
total: 0,
288-
chunks: make(map[uint32]uint64),
298+
totalLen: 0,
299+
totalCap: 0,
300+
chunks: make(map[uint32]chunkSize),
289301
}
290302
a.metrics[metric] = met
291303
cacheMetricAdd.Inc()
292304
}
293305

294-
var sizeDiff uint64
306+
var lenDiff uint64
307+
var capDiff uint64
295308

296309
for _, chunk := range chunks {
297310
if _, ok = met.chunks[chunk.Ts]; ok {
298311
// we already have that chunk
299312
continue
300313
}
301-
size := chunk.Size()
302-
sizeDiff += size
314+
size := chunkSize{len: uint64(len(chunk.B)), cap: uint64(cap(chunk.B))}
315+
lenDiff += size.len
316+
capDiff += size.cap
303317
met.chunks[chunk.Ts] = size
304318
}
305319

306-
met.total = met.total + sizeDiff
307-
cacheSizeUsed.AddUint64(sizeDiff)
320+
met.totalLen = met.totalLen + lenDiff
321+
met.totalCap = met.totalCap + capDiff
322+
cacheSizeUsed.AddUint64(lenDiff)
323+
cacheCapUsed.AddUint64(capDiff)
308324
}
309325

310326
func (a *FlatAccnt) evict() {
311327
var met *FlatAccntMet
312328
var targets []uint32
313329
var ts uint32
314-
var size uint64
330+
var size chunkSize
315331
var ok bool
316332
var e interface{}
317333
var target EvictTarget
@@ -342,8 +358,10 @@ func (a *FlatAccnt) evict() {
342358

343359
for _, ts = range targets {
344360
size = met.chunks[ts]
345-
met.total = met.total - size
346-
cacheSizeUsed.DecUint64(size)
361+
met.totalLen = met.totalLen - size.len
362+
met.totalCap = met.totalCap - size.cap
363+
cacheSizeUsed.DecUint64(size.len)
364+
cacheCapUsed.DecUint64(size.cap)
347365
cacheChunkEvict.Inc()
348366
a.evictQ <- &EvictTarget{
349367
Metric: target.Metric,
@@ -352,7 +370,7 @@ func (a *FlatAccnt) evict() {
352370
delete(met.chunks, ts)
353371
}
354372

355-
if met.total <= 0 {
373+
if met.totalLen <= 0 {
356374
cacheMetricEvict.Inc()
357375
delete(a.metrics, target.Metric)
358376
}

mdata/cache/accnt/if.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111
// in the future if they just implement this interface.
1212
type Accnt interface {
1313
GetEvictQ() chan *EvictTarget
14-
AddChunk(metric schema.AMKey, ts uint32, size uint64)
14+
AddChunk(metric schema.AMKey, ts uint32, len, cap uint64)
1515
AddChunks(metric schema.AMKey, chunks []chunk.IterGen)
1616
HitChunk(metric schema.AMKey, ts uint32)
1717
HitChunks(metric schema.AMKey, chunks []chunk.IterGen)

mdata/cache/accnt/stats.go

+1
Original file line numberDiff line numberDiff line change
@@ -33,4 +33,5 @@ var (
3333

3434
cacheSizeMax = stats.NewGauge64("cache.size.max")
3535
cacheSizeUsed = stats.NewGauge64("cache.size.used")
36+
cacheCapUsed = stats.NewGauge64("cache.cap.used")
3637
)

mdata/cache/ccache.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ func (c *CCache) Add(metric schema.AMKey, prev uint32, itergen chunk.IterGen) {
172172
ccm.Add(prev, itergen)
173173
}
174174

175-
c.accnt.AddChunk(metric, itergen.Ts, itergen.Size())
175+
c.accnt.AddChunk(metric, itergen.Ts, uint64(len(itergen.B)), uint64(cap(itergen.B)))
176176
}
177177

178178
func (c *CCache) AddRange(metric schema.AMKey, prev uint32, itergens []chunk.IterGen) {

mdata/chunk/itergen.go

-8
Original file line numberDiff line numberDiff line change
@@ -56,14 +56,6 @@ func (ig *IterGen) Get() (*Iter, error) {
5656
return &Iter{it}, nil
5757
}
5858

59-
func (ig *IterGen) Size() uint64 {
60-
return uint64(len(ig.B))
61-
}
62-
63-
func (ig IterGen) Bytes() []byte {
64-
return ig.B
65-
}
66-
6759
// end of itergen (exclusive)
6860
func (ig IterGen) EndTs() uint32 {
6961
return ig.Ts + ig.Span

0 commit comments

Comments
 (0)