Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit d4bc03c

Browse files
committed
Merge branch 'master' into coordinatePersist
Conflicts: metric_tank/aggmetric.go
2 parents e4301f5 + a8b3394 commit d4bc03c

File tree

5 files changed

+43
-58
lines changed

5 files changed

+43
-58
lines changed

graphite-watcher/test_metric.go

+16-16
Original file line number | Diff line number | Diff line change
@@ -11,7 +11,7 @@ import (
1111
"sync"
1212
)
1313

14-
func writeErrors(curTs int64, met stat, series graphite.Response, debug bool, errs *[]string) {
14+
func writeErrors(curTs int64, met stat, series *graphite.Response, debug bool, errs *[]string) {
1515
if len(*errs) != 0 && debug {
1616
f, err := os.Create(fmt.Sprintf("errors-%v-%d", met.def.Name, curTs))
1717
if err != nil {
@@ -20,32 +20,32 @@ func writeErrors(curTs int64, met stat, series graphite.Response, debug bool, er
2020
defer f.Close()
2121
w := bufio.NewWriter(f)
2222
for _, e := range *errs {
23-
_, err = w.WriteString(e + "\n")
23+
_, err = w.WriteString(fmt.Sprintf("ERROR %s\n", e))
2424
if err != nil {
2525
panic(err)
2626
}
2727
}
2828
w.WriteString("graphite response:\n")
29-
for _, serie := range series {
29+
for _, serie := range *series {
3030
w.WriteString(serie.Target)
3131
for _, p := range serie.Datapoints {
32-
w.WriteString(fmt.Sprintf("%s:%s\n", p[0], p[1]))
32+
w.WriteString(fmt.Sprintf("%s:%s\n", p[1], p[0]))
3333
}
3434
}
3535
w.Flush()
3636
}
3737
for _, e := range *errs {
38-
fmt.Println(e)
38+
fmt.Println(fmt.Sprintf("ERROR %d %s", curTs, e))
3939
}
4040
}
4141

4242
func test(wg *sync.WaitGroup, curTs int64, met stat, host string, debug bool) {
4343
defer wg.Done()
4444
var series graphite.Response
4545
errs := make([]string, 0)
46-
defer writeErrors(curTs, met, series, debug, &errs)
46+
defer writeErrors(curTs, met, &series, debug, &errs)
4747
e := func(str string, pieces ...interface{}) {
48-
errs = append(errs, "ERROR: "+fmt.Sprintf(str, pieces...))
48+
errs = append(errs, fmt.Sprintf(str, pieces...))
4949
}
5050

5151
g := graphite.HostHeader{Host: "http://" + host + "/render", Header: http.Header{}}
@@ -54,32 +54,32 @@ func test(wg *sync.WaitGroup, curTs int64, met stat, host string, debug bool) {
5454
q := graphite.Request{Targets: []string{met.def.Name}}
5555
series, err := g.Query(&q)
5656
if err != nil {
57-
e("querying graphite: %q", err)
57+
e("querying graphite: %v", err)
5858
return
5959
}
6060
for _, serie := range series {
6161
if met.def.Name != serie.Target {
62-
e("name %q != target name%q", met.def.Name, serie.Target)
62+
e("%v : bad target name %v", met.def.Name, serie.Target)
6363
}
6464

6565
lastTs := int64(0)
6666
oldestNull := int64(math.MaxInt64)
6767
if len(serie.Datapoints) == 0 {
68-
e("series for %q contains no points!", met.def.Name)
68+
e("%v : series contains no points!", met.def.Name)
6969
}
7070
for _, p := range serie.Datapoints {
7171
ts, err := p[1].Int64()
7272
if err != nil {
73-
e("could not parse timestamp %q", p)
73+
e("%v : could not parse timestamp %q", met.def.Name, p)
7474
}
7575
if ts <= lastTs {
76-
e("timestamp %v must be bigger than last %v", ts, lastTs)
76+
e("%v timestamp %v must be bigger than last %v", met.def.Name, ts, lastTs)
7777
}
7878
if lastTs == 0 && (ts < curTs-24*3600-60 || ts > curTs-24*3600+60) {
79-
e("first point %q should have been about 24h ago, i.e. around %d", p, curTs-24*3600)
79+
e("%v first point %q should have been about 24h ago, i.e. around %d", met.def.Name, p, curTs-24*3600)
8080
}
8181
if lastTs != 0 && ts != lastTs+int64(met.def.Interval) {
82-
e("point %v is not interval %v apart from previous point", p, met.def.Interval)
82+
e("%v point %v is not interval %v apart from previous point", met.def.Name, p, met.def.Interval)
8383
}
8484
_, err = p[0].Float64()
8585
if err != nil && ts > met.firstSeen {
@@ -88,7 +88,7 @@ func test(wg *sync.WaitGroup, curTs int64, met stat, host string, debug bool) {
8888
}
8989
if ts < curTs-30 {
9090
nullPoints.Inc(1)
91-
e("%v at %d : seeing a null for ts %v", met.def.Name, curTs, p[1])
91+
e("%v : seeing a null for ts %v", met.def.Name, p[1])
9292
}
9393
} else {
9494
// we saw a valid point, so reset oldestNull.
@@ -97,7 +97,7 @@ func test(wg *sync.WaitGroup, curTs int64, met stat, host string, debug bool) {
9797
lastTs = ts
9898
}
9999
if lastTs < curTs-int64(met.def.Interval) || lastTs > curTs+int64(met.def.Interval) {
100-
e("last point at %d is out of range. should have been around %d (now) +- %d", lastTs, curTs, met.def.Interval)
100+
e("%v : last point at %d is out of range. should have been around %d (now) +- %d", met.def.Name, lastTs, curTs, met.def.Interval)
101101
}
102102
// if there was no null, we treat the point after the last one we had as null
103103
if oldestNull == math.MaxInt64 {

metric_tank/aggmetric.go

+2-28
Original file line number | Diff line number | Diff line change
@@ -12,12 +12,6 @@ import (
1212
"github.com/raintank/raintank-metric/metric_tank/consolidation"
1313
)
1414

15-
var statsPeriod time.Duration
16-
17-
func init() {
18-
statsPeriod = time.Duration(1) * time.Second
19-
}
20-
2115
// AggMetric takes in new values, updates the in-memory data and streams the points to aggregators
2216
// it uses a circular buffer of chunks
2317
// each chunk starts at their respective t0
@@ -82,30 +76,9 @@ func NewAggMetric(key string, chunkSpan, numChunks uint32, maxDirtyChunks uint32
8276
m.aggregators = append(m.aggregators, NewAggregator(key, as.span, as.chunkSpan, as.numChunks, maxDirtyChunks))
8377
}
8478

85-
// only collect per aggmetric stats when in debug mode.
86-
// running a goroutine for each aggmetric in an environment with many hundreds of thousands of metrics
87-
// will result in the CPU just context switching and not much else.
88-
if *logLevel < 2 {
89-
go m.stats()
90-
}
91-
9279
return &m
9380
}
9481

95-
func (a *AggMetric) stats() {
96-
for range time.Tick(statsPeriod) {
97-
sum := 0
98-
a.RLock()
99-
for _, chunk := range a.Chunks {
100-
if chunk != nil {
101-
sum += int(chunk.NumPoints)
102-
}
103-
}
104-
a.RUnlock()
105-
pointsPerMetric.Value(int64(sum))
106-
}
107-
}
108-
10982
// Sync the saved state of a chunk by its T0.
11083
func (a *AggMetric) SyncChunkSaveState(ts uint32) {
11184
a.RLock()
@@ -117,7 +90,7 @@ func (a *AggMetric) SyncChunkSaveState(ts uint32) {
11790
}
11891
}
11992

120-
/* Get a chunk by its T0. It is expected that the caller has acquired of the a.Lock()*/
93+
/* Get a chunk by its T0. It is expected that the caller has acquired a.Lock()*/
12194
func (a *AggMetric) getChunkByT0(ts uint32) *Chunk {
12295
// we have no chunks.
12396
if len(a.Chunks) == 0 {
@@ -509,6 +482,7 @@ func (a *AggMetric) Add(ts uint32, val float64) {
509482
a.Chunks = append(a.Chunks, NewChunk(t0))
510483
} else {
511484
chunkClear.Inc(1)
485+
totalPoints <- -1 * int(a.Chunks[a.CurrentChunkPos].NumPoints)
512486
msg = fmt.Sprintf("cleared chunk at %d of %d and replaced with new", a.CurrentChunkPos, len(a.Chunks))
513487
a.Chunks[a.CurrentChunkPos] = NewChunk(t0)
514488
}

metric_tank/aggmetrics.go

+7-12
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,13 @@ type AggMetrics struct {
1818
maxDirtyChunks uint32
1919
}
2020

21+
var totalPoints chan int
22+
23+
func init() {
24+
// measurements can lag a bit, that's ok
25+
totalPoints = make(chan int, 10)
26+
}
27+
2128
func NewAggMetrics(chunkSpan, numChunks, chunkMaxStale, metricMaxStale uint32, maxDirtyChunks uint32, aggSettings []aggSetting) *AggMetrics {
2229
ms := AggMetrics{
2330
Metrics: make(map[string]*AggMetric),
@@ -29,22 +36,10 @@ func NewAggMetrics(chunkSpan, numChunks, chunkMaxStale, metricMaxStale uint32, m
2936
maxDirtyChunks: maxDirtyChunks,
3037
}
3138

32-
go ms.stats()
3339
go ms.GC()
3440
return &ms
3541
}
3642

37-
func (ms *AggMetrics) stats() {
38-
pointsPerMetric.Value(0)
39-
40-
for range time.Tick(time.Duration(1) * time.Second) {
41-
ms.RLock()
42-
l := len(ms.Metrics)
43-
ms.RUnlock()
44-
metricsActive.Value(int64(l))
45-
}
46-
}
47-
4843
// periodically scan chunks and close any that have not received data in a while
4944
// TODO instrument occurences and duration of GC
5045
func (ms *AggMetrics) GC() {

metric_tank/chunk.go

+1
Original file line number | Diff line number | Diff line change
@@ -36,5 +36,6 @@ func (c *Chunk) Push(t uint32, v float64) error {
3636
c.NumPoints += 1
3737
c.LastTs = t
3838
c.LastWrite = uint32(time.Now().Unix())
39+
totalPoints <- 1
3940
return nil
4041
}

metric_tank/metric_tank.go

+17-2
Original file line number | Diff line number | Diff line change
@@ -95,7 +95,7 @@ var cassandraPutDuration met.Timer
9595
var cassandraBlockDuration met.Timer
9696
var cassandraGetDuration met.Timer
9797
var inItems met.Meter
98-
var pointsPerMetric met.Meter
98+
var points met.Gauge
9999
var msgsHandleOK met.Count
100100
var msgsHandleFail met.Count
101101
var alloc met.Gauge
@@ -290,11 +290,26 @@ func initMetrics(stats met.Backend) {
290290
cassandraBlockDuration = stats.NewTimer("cassandra_block_duration", 0)
291291
cassandraPutDuration = stats.NewTimer("cassandra_put_duration", 0)
292292
inItems = stats.NewMeter("in.items", 0)
293-
pointsPerMetric = stats.NewMeter("points_per_metric", 0)
293+
points = stats.NewGauge("total_points", 0)
294294
msgsHandleOK = stats.NewCount("handle.ok")
295295
msgsHandleFail = stats.NewCount("handle.fail")
296296
alloc = stats.NewGauge("bytes_alloc.not_freed", 0)
297297
totalAlloc = stats.NewGauge("bytes_alloc.incl_freed", 0)
298298
sysBytes = stats.NewGauge("bytes_sys", 0)
299299
metricsActive = stats.NewGauge("metrics_active", 0)
300+
301+
// run a collector for some global stats
302+
go func() {
303+
currentPoints := 0
304+
305+
ticker := time.Tick(time.Duration(1) * time.Second)
306+
for {
307+
select {
308+
case <-ticker:
309+
points.Value(int64(currentPoints))
310+
case update := <-totalPoints:
311+
currentPoints += update
312+
}
313+
}
314+
}()
300315
}

0 commit comments

Comments (0)