Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit a8b3394

Browse files
committed
Merge pull request #73 from raintank/measure-points
Measure points
2 parents df45974 + 9d32bbf commit a8b3394

File tree

4 files changed

+26
-41
lines changed

4 files changed

+26
-41
lines changed

metric_tank/aggmetric.go

+1-27
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,6 @@ import (
1212
"github.com/raintank/raintank-metric/metric_tank/consolidation"
1313
)
1414

15-
var statsPeriod time.Duration
16-
17-
func init() {
18-
statsPeriod = time.Duration(1) * time.Second
19-
}
20-
2115
// AggMetric takes in new values, updates the in-memory data and streams the points to aggregators
2216
// it uses a circular buffer of chunks
2317
// each chunk starts at their respective t0
@@ -82,30 +76,9 @@ func NewAggMetric(key string, chunkSpan, numChunks uint32, maxDirtyChunks uint32
8276
m.aggregators = append(m.aggregators, NewAggregator(key, as.span, as.chunkSpan, as.numChunks, maxDirtyChunks))
8377
}
8478

85-
// only collect per aggmetric stats when in debug mode.
86-
// running a goroutine for each aggmetric in an environment with many hundreds of thousands of metrics
87-
// will result in the CPU just context switching and not much else.
88-
if *logLevel < 2 {
89-
go m.stats()
90-
}
91-
9279
return &m
9380
}
9481

95-
func (a *AggMetric) stats() {
96-
for range time.Tick(statsPeriod) {
97-
sum := 0
98-
a.RLock()
99-
for _, chunk := range a.Chunks {
100-
if chunk != nil {
101-
sum += int(chunk.NumPoints)
102-
}
103-
}
104-
a.RUnlock()
105-
pointsPerMetric.Value(int64(sum))
106-
}
107-
}
108-
10982
func (a *AggMetric) getChunk(pos int) *Chunk {
11083
if pos < 0 {
11184
return nil
@@ -411,6 +384,7 @@ func (a *AggMetric) Add(ts uint32, val float64) {
411384
a.Chunks = append(a.Chunks, NewChunk(t0))
412385
} else {
413386
chunkClear.Inc(1)
387+
totalPoints <- -1 * int(a.Chunks[a.CurrentChunkPos].NumPoints)
414388
msg = fmt.Sprintf("cleared chunk at %d of %d and replaced with new", a.CurrentChunkPos, len(a.Chunks))
415389
a.Chunks[a.CurrentChunkPos] = NewChunk(t0)
416390
}

metric_tank/aggmetrics.go

+7-12
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,13 @@ type AggMetrics struct {
1818
maxDirtyChunks uint32
1919
}
2020

21+
var totalPoints chan int
22+
23+
func init() {
24+
// measurements can lag a bit, that's ok
25+
totalPoints = make(chan int, 10)
26+
}
27+
2128
func NewAggMetrics(chunkSpan, numChunks, chunkMaxStale, metricMaxStale uint32, maxDirtyChunks uint32, aggSettings []aggSetting) *AggMetrics {
2229
ms := AggMetrics{
2330
Metrics: make(map[string]*AggMetric),
@@ -29,22 +36,10 @@ func NewAggMetrics(chunkSpan, numChunks, chunkMaxStale, metricMaxStale uint32, m
2936
maxDirtyChunks: maxDirtyChunks,
3037
}
3138

32-
go ms.stats()
3339
go ms.GC()
3440
return &ms
3541
}
3642

37-
func (ms *AggMetrics) stats() {
38-
pointsPerMetric.Value(0)
39-
40-
for range time.Tick(time.Duration(1) * time.Second) {
41-
ms.RLock()
42-
l := len(ms.Metrics)
43-
ms.RUnlock()
44-
metricsActive.Value(int64(l))
45-
}
46-
}
47-
4843
// periodically scan chunks and close any that have not received data in a while
4944
// TODO instrument occurences and duration of GC
5045
func (ms *AggMetrics) GC() {

metric_tank/chunk.go

+1
Original file line numberDiff line numberDiff line change
@@ -36,5 +36,6 @@ func (c *Chunk) Push(t uint32, v float64) error {
3636
c.NumPoints += 1
3737
c.LastTs = t
3838
c.LastWrite = uint32(time.Now().Unix())
39+
totalPoints <- 1
3940
return nil
4041
}

metric_tank/metric_tank.go

+17-2
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ var cassandraPutDuration met.Timer
9090
var cassandraBlockDuration met.Timer
9191
var cassandraGetDuration met.Timer
9292
var inItems met.Meter
93-
var pointsPerMetric met.Meter
93+
var points met.Gauge
9494
var msgsHandleOK met.Count
9595
var msgsHandleFail met.Count
9696
var alloc met.Gauge
@@ -281,11 +281,26 @@ func initMetrics(stats met.Backend) {
281281
cassandraBlockDuration = stats.NewTimer("cassandra_block_duration", 0)
282282
cassandraPutDuration = stats.NewTimer("cassandra_put_duration", 0)
283283
inItems = stats.NewMeter("in.items", 0)
284-
pointsPerMetric = stats.NewMeter("points_per_metric", 0)
284+
points = stats.NewGauge("total_points", 0)
285285
msgsHandleOK = stats.NewCount("handle.ok")
286286
msgsHandleFail = stats.NewCount("handle.fail")
287287
alloc = stats.NewGauge("bytes_alloc.not_freed", 0)
288288
totalAlloc = stats.NewGauge("bytes_alloc.incl_freed", 0)
289289
sysBytes = stats.NewGauge("bytes_sys", 0)
290290
metricsActive = stats.NewGauge("metrics_active", 0)
291+
292+
// run a collector for some global stats
293+
go func() {
294+
currentPoints := 0
295+
296+
ticker := time.Tick(time.Duration(1) * time.Second)
297+
for {
298+
select {
299+
case <-ticker:
300+
points.Value(int64(currentPoints))
301+
case update := <-totalPoints:
302+
currentPoints += update
303+
}
304+
}
305+
}()
291306
}

0 commit comments

Comments
 (0)