Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit 860d9c5

Browse files
committed
support per-pattern configurable index pruning
fix #868
1 parent 8fe658f commit 860d9c5

File tree

15 files changed

+119
-65
lines changed

15 files changed

+119
-65
lines changed

conf/init.go

+1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
// see https://graphite.readthedocs.io/en/0.9.9/config-carbon.html#storage-schemas-conf
44
// * storage-aggregation.conf
55
// see http://graphite.readthedocs.io/en/latest/config-carbon.html#storage-aggregation-conf
6+
// as well as our own file index-rules.conf
67
//
78
// it also adds defaults (the same ones as graphite),
89
// so that even if nothing is matched in the user provided schemas or aggregations,

docker/docker-chaos/metrictank.ini

+2-2
Original file line numberDiff line numberDiff line change
@@ -332,8 +332,6 @@ timeout = 1s
332332
num-conns = 10
333333
# Max number of metricDefs allowed to be unwritten to cassandra
334334
write-queue-size = 100000
335-
#automatically clear series from the index if they have not been seen for this much time.
336-
max-stale = 0
337335
#Interval at which the index should be checked for stale series.
338336
prune-interval = 3h
339337
# synchronize index changes to cassandra. not all your nodes need to do this.
@@ -366,3 +364,5 @@ tag-support = false
366364
tag-query-workers = 50
367365
# size of regular expression cache in tag query evaluation
368366
match-cache-size = 1000
367+
# path to index-rules.conf file
368+
index-rules-conf = /etc/metrictank/index-rules.conf

docker/docker-cluster/metrictank.ini

+2-2
Original file line numberDiff line numberDiff line change
@@ -332,8 +332,6 @@ timeout = 1s
332332
num-conns = 10
333333
# Max number of metricDefs allowed to be unwritten to cassandra
334334
write-queue-size = 100000
335-
#automatically clear series from the index if they have not been seen for this much time.
336-
max-stale = 0
337335
#Interval at which the index should be checked for stale series.
338336
prune-interval = 3h
339337
# synchronize index changes to cassandra. not all your nodes need to do this.
@@ -366,3 +364,5 @@ tag-support = false
366364
tag-query-workers = 50
367365
# size of regular expression cache in tag query evaluation
368366
match-cache-size = 1000
367+
# path to index-rules.conf file
368+
index-rules-conf = /etc/metrictank/index-rules.conf

docker/docker-dev-custom-cfg-kafka/metrictank.ini

+2-2
Original file line numberDiff line numberDiff line change
@@ -332,8 +332,6 @@ timeout = 1s
332332
num-conns = 10
333333
# Max number of metricDefs allowed to be unwritten to cassandra
334334
write-queue-size = 100000
335-
#automatically clear series from the index if they have not been seen for this much time.
336-
max-stale = 0
337335
#Interval at which the index should be checked for stale series.
338336
prune-interval = 3h
339337
# synchronize index changes to cassandra. not all your nodes need to do this.
@@ -366,3 +364,5 @@ tag-support = false
366364
tag-query-workers = 50
367365
# size of regular expression cache in tag query evaluation
368366
match-cache-size = 1000
367+
# path to index-rules.conf file
368+
index-rules-conf = /etc/metrictank/index-rules.conf

docs/config.md

+39-24
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
Metrictank comes with an [example main config file](https://github.com/grafana/metrictank/blob/master/metrictank-sample.ini),
44
a [storage-schemas.conf file](https://github.com/grafana/metrictank/blob/master/scripts/config/storage-schemas.conf) and
55
a [storage-aggregation.conf file](https://github.com/grafana/metrictank/blob/master/scripts/config/storage-aggregation.conf)
6+
and an [index-rules.conf file](https://github.com/grafana/metrictank/blob/master/scripts/config/index-rules.conf)
67

78
The files themselves are well documented, but for your convenience, they are replicated below.
89

@@ -394,8 +395,6 @@ timeout = 1s
394395
num-conns = 10
395396
# Max number of metricDefs allowed to be unwritten to cassandra
396397
write-queue-size = 100000
397-
#automatically clear series from the index if they have not been seen for this much time.
398-
max-stale = 0
399398
#Interval at which the index should be checked for stale series.
400399
prune-interval = 3h
401400
# synchronize index changes to cassandra. not all your nodes need to do this.
@@ -431,6 +430,44 @@ tag-support = false
431430
tag-query-workers = 50
432431
# size of regular expression cache in tag query evaluation
433432
match-cache-size = 1000
433+
# path to index-rules.conf file
434+
index-rules-conf = /etc/metrictank/index-rules.conf
435+
```
436+
437+
# index-rules.conf
438+
439+
```
440+
# This config file controls when to prune metrics from the index
441+
# Note:
442+
# * This file is optional. If it is not present, we won't prune data
443+
# * Anything not matched will not be pruned
444+
# * Patterns are string prefix matchers
445+
# * max-stale is a duration like 7d
446+
447+
[default]
448+
pattern =
449+
max-stale = 0
450+
451+
# storage-aggregation.conf
452+
453+
```
454+
# This config file controls which summaries are created (using which consolidation functions) for your lower-precision archives, as defined in storage-schemas.conf
455+
# It is an extension of http://graphite.readthedocs.io/en/latest/config-carbon.html#storage-aggregation-conf
456+
# Note:
457+
# * This file is optional. If it is not present, we will use avg for everything
458+
# * Anything not matched also uses avg for everything
459+
# * xFilesFactor is not honored yet. What it is in graphite is a floating point number between 0 and 1 specifying what fraction of the previous retention level's slots must have non-null values in order to aggregate to a non-null value. The default is 0.5.
460+
# * aggregationMethod specifies the functions used to aggregate values for the next retention level. Legal methods are avg/average, sum, min, max, and last. The default is average.
461+
# Unlike Graphite, you can specify multiple, as it is often handy to have different summaries available depending on what analysis you need to do.
462+
# When using multiple, the first one is used for reading. In the future, we will add capabilities to select the different archives for reading.
463+
# * the settings configured when metrictank starts are what is applied. So you can enable or disable archives by restarting metrictank.
464+
#
465+
# see https://github.com/grafana/metrictank/blob/master/docs/consolidation.md for related info.
466+
467+
[default]
468+
pattern = .*
469+
xFilesFactor = 0.1
470+
aggregationMethod = avg,min,max
434471
```
435472
436473
# storage-schemas.conf
@@ -494,27 +531,5 @@ retentions = 1s:35d:10min:7
494531
# reorderBuffer = 20
495532
```
496533
497-
# storage-aggregation.conf
498-
499-
```
500-
# This config file controls which summaries are created (using which consolidation functions) for your lower-precision archives, as defined in storage-schemas.conf
501-
# It is an extension of http://graphite.readthedocs.io/en/latest/config-carbon.html#storage-aggregation-conf
502-
# Note:
503-
# * This file is optional. If it is not present, we will use avg for everything
504-
# * Anything not matched also uses avg for everything
505-
# * xFilesFactor is not honored yet. What it is in graphite is a floating point number between 0 and 1 specifying what fraction of the previous retention level's slots must have non-null values in order to aggregate to a non-null value. The default is 0.5.
506-
# * aggregationMethod specifies the functions used to aggregate values for the next retention level. Legal methods are avg/average, sum, min, max, and last. The default is average.
507-
# Unlike Graphite, you can specify multiple, as it is often handy to have different summaries available depending on what analysis you need to do.
508-
# When using multiple, the first one is used for reading. In the future, we will add capabilities to select the different archives for reading.
509-
# * the settings configured when metrictank starts are what is applied. So you can enable or disable archives by restarting metrictank.
510-
#
511-
# see https://github.com/grafana/metrictank/blob/master/docs/consolidation.md for related info.
512-
513-
[default]
514-
pattern = .*
515-
xFilesFactor = 0.1
516-
aggregationMethod = avg,min,max
517-
```
518-
519534
This file is generated by [config-to-doc](https://github.com/grafana/metrictank/blob/master/scripts/dev/config-to-doc.sh)
520535

idx/cassandra/cassandra.go

+17-22
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@ var (
6464
numConns int
6565
writeQueueSize int
6666
protoVer int
67-
maxStale time.Duration
6867
pruneInterval time.Duration
6968
updateCassIdx bool
7069
updateInterval time.Duration
@@ -83,7 +82,6 @@ func ConfigSetup() *flag.FlagSet {
8382
casIdx.IntVar(&writeQueueSize, "write-queue-size", 100000, "Max number of metricDefs allowed to be unwritten to cassandra")
8483
casIdx.BoolVar(&updateCassIdx, "update-cassandra-index", true, "synchronize index changes to cassandra. not all your nodes need to do this.")
8584
casIdx.DurationVar(&updateInterval, "update-interval", time.Hour*3, "frequency at which we should update the metricDef lastUpdate field, use 0s for instant updates")
86-
casIdx.DurationVar(&maxStale, "max-stale", 0, "clear series from the index if they have not been seen for this much time.")
8785
casIdx.DurationVar(&pruneInterval, "prune-interval", time.Hour*3, "Interval at which the index should be checked for stale series.")
8886
casIdx.IntVar(&protoVer, "protocol-version", 4, "cql protocol version to use")
8987
casIdx.BoolVar(&createKeyspace, "create-keyspace", true, "enable the creation of the index keyspace and tables, only one node needs this")
@@ -234,7 +232,7 @@ func (c *CasIdx) Init() error {
234232
//Rebuild the in-memory index.
235233
c.rebuildIndex()
236234

237-
if maxStale > 0 {
235+
if memory.IndexRules.Prunable() {
238236
if pruneInterval == 0 {
239237
return fmt.Errorf("pruneInterval must be greater then 0")
240238
}
@@ -355,35 +353,30 @@ func (c *CasIdx) rebuildIndex() {
355353
log.Info("cassandra-idx Rebuilding Memory Index from metricDefinitions in Cassandra")
356354
pre := time.Now()
357355
var defs []schema.MetricDefinition
358-
var staleTs uint32
359-
if maxStale != 0 {
360-
staleTs = uint32(time.Now().Add(maxStale * -1).Unix())
361-
}
362356
for _, partition := range cluster.Manager.GetPartitions() {
363-
defs = c.LoadPartition(partition, defs, staleTs)
357+
defs = c.LoadPartition(partition, defs)
364358
}
365359
num := c.MemoryIdx.Load(defs)
366360
log.Info("cassandra-idx Rebuilding Memory Index Complete. Imported %d. Took %s", num, time.Since(pre))
367361
}
368362

369363
func (c *CasIdx) Load(defs []schema.MetricDefinition, cutoff uint32) []schema.MetricDefinition {
370364
iter := c.session.Query("SELECT id, orgid, partition, name, interval, unit, mtype, tags, lastupdate from metric_idx").Iter()
371-
return c.load(defs, iter, cutoff)
365+
return c.load(defs, iter)
372366
}
373367

374-
func (c *CasIdx) LoadPartition(partition int32, defs []schema.MetricDefinition, cutoff uint32) []schema.MetricDefinition {
368+
func (c *CasIdx) LoadPartition(partition int32, defs []schema.MetricDefinition) []schema.MetricDefinition {
375369
iter := c.session.Query("SELECT id, orgid, partition, name, interval, unit, mtype, tags, lastupdate from metric_idx where partition=?", partition).Iter()
376-
return c.load(defs, iter, cutoff)
370+
return c.load(defs, iter)
377371
}
378372

379-
func (c *CasIdx) load(defs []schema.MetricDefinition, iter cqlIterator, cutoff uint32) []schema.MetricDefinition {
373+
func (c *CasIdx) load(defs []schema.MetricDefinition, iter cqlIterator) []schema.MetricDefinition {
380374
defsByNames := make(map[string][]*schema.MetricDefinition)
381375
var id, name, unit, mtype string
382376
var orgId, interval int
383377
var partition int32
384378
var lastupdate int64
385379
var tags []string
386-
cutoff64 := int64(cutoff)
387380
for iter.Scan(&id, &orgId, &partition, &name, &interval, &unit, &mtype, &tags, &lastupdate) {
388381
mkey, err := schema.MKeyFromString(id)
389382
if err != nil {
@@ -412,10 +405,14 @@ func (c *CasIdx) load(defs []schema.MetricDefinition, iter cqlIterator, cutoff u
412405
log.Fatal(4, "Could not close iterator: %s", err.Error())
413406
}
414407

408+
indexChecks := memory.IndexRules.Checks(time.Now())
409+
415410
NAMES:
416411
for name, defsByName := range defsByNames {
412+
irId, _ := memory.IndexRules.Match(name)
413+
check := indexChecks[irId]
417414
for _, def := range defsByName {
418-
if def.LastUpdate >= cutoff64 {
415+
if check.Keep || def.LastUpdate >= check.Cutoff {
419416
// if one of the defs in a name is not stale, then we'll need to add
420417
// all the associated MDs to the defs slice
421418
for _, defToAdd := range defsByNames[name] {
@@ -528,19 +525,17 @@ func (c *CasIdx) deleteDefAsync(key schema.MKey, part int32) {
528525
}()
529526
}
530527

531-
func (c *CasIdx) Prune(oldest time.Time) ([]idx.Archive, error) {
532-
pre := time.Now()
533-
pruned, err := c.MemoryIdx.Prune(oldest)
534-
statPruneDuration.Value(time.Since(pre))
528+
func (c *CasIdx) Prune(now time.Time) ([]idx.Archive, error) {
529+
pruned, err := c.MemoryIdx.Prune(now)
530+
statPruneDuration.Value(time.Since(now))
535531
return pruned, err
536532
}
537533

538534
func (c *CasIdx) prune() {
539535
ticker := time.NewTicker(pruneInterval)
540-
for range ticker.C {
541-
log.Debug("cassandra-idx: pruning items from index that have not been seen for %s", maxStale.String())
542-
staleTs := time.Now().Add(maxStale * -1)
543-
_, err := c.Prune(staleTs)
536+
for now := range ticker.C {
537+
log.Debug("cassandra-idx: pruning items")
538+
_, err := c.Prune(now)
544539
if err != nil {
545540
log.Error(3, "cassandra-idx: prune error. %s", err)
546541
}

idx/idx.go

+1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ type Archive struct {
2828
schema.MetricDefinition
2929
SchemaId uint16 // index in mdata.schemas (not persisted)
3030
AggId uint16 // index in mdata.aggregations (not persisted)
31+
IrId uint16 // index in mdata.indexrules (not persisted)
3132
LastSave uint32 // last time the metricDefinition was saved to a backend store (cassandra)
3233
}
3334

idx/memory/memory.go

+32-7
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,14 @@ package memory
33
import (
44
"flag"
55
"fmt"
6+
"io/ioutil"
67
"regexp"
78
"sort"
89
"strings"
910
"sync"
1011
"time"
1112

13+
"github.com/grafana/metrictank/conf"
1214
"github.com/grafana/metrictank/errors"
1315
"github.com/grafana/metrictank/idx"
1416
"github.com/grafana/metrictank/mdata"
@@ -50,6 +52,8 @@ var (
5052
matchCacheSize int
5153
TagSupport bool
5254
TagQueryWorkers int // number of workers to spin up when evaluation tag expressions
55+
indexRulesFile string
56+
IndexRules conf.IndexRules
5357
)
5458

5559
func ConfigSetup() {
@@ -58,9 +62,26 @@ func ConfigSetup() {
5862
memoryIdx.BoolVar(&TagSupport, "tag-support", false, "enables/disables querying based on tags")
5963
memoryIdx.IntVar(&TagQueryWorkers, "tag-query-workers", 50, "number of workers to spin up to evaluate tag queries")
6064
memoryIdx.IntVar(&matchCacheSize, "match-cache-size", 1000, "size of regular expression cache in tag query evaluation")
65+
memoryIdx.StringVar(&indexRulesFile, "rules-file", "/etc/metrictank/index-rules.conf", "path to index-rules.conf file")
6166
globalconf.Register("memory-idx", memoryIdx)
6267
}
6368

69+
func ConfigProcess() {
70+
// read index-rules.conf
71+
// file is optional, quit on errors
72+
// since we can't distinguish errors reading vs parsing, we'll just try a read separately first
73+
_, err := ioutil.ReadFile(indexRulesFile)
74+
if err == nil {
75+
IndexRules, err = conf.ReadIndexRules(indexRulesFile)
76+
if err != nil {
77+
log.Fatal(3, "can't read index-rules file %q: %s", indexRulesFile, err.Error())
78+
}
79+
} else {
80+
log.Info("Could not read %s: %s: using defaults", indexRulesFile, err)
81+
IndexRules = conf.NewIndexRules()
82+
}
83+
}
84+
6485
type Tree struct {
6586
Items map[string]*Node // key is the full path of the node.
6687
}
@@ -373,11 +394,13 @@ func (m *MemoryIdx) add(def *schema.MetricDefinition) idx.Archive {
373394

374395
schemaId, _ := mdata.MatchSchema(path, def.Interval)
375396
aggId, _ := mdata.MatchAgg(path)
397+
irId, _ := IndexRules.Match(path)
376398
sort.Strings(def.Tags)
377399
archive := &idx.Archive{
378400
MetricDefinition: *def,
379401
SchemaId: schemaId,
380402
AggId: aggId,
403+
IrId: irId,
381404
}
382405

383406
if TagSupport && len(def.Tags) > 0 {
@@ -915,7 +938,7 @@ func (m *MemoryIdx) Find(orgId uint32, pattern string, from int64) ([]idx.Node,
915938
log.Debug("memory-idx: from is %d, so skipping %s which has LastUpdate %d", from, def.Id, def.LastUpdate)
916939
continue
917940
}
918-
log.Debug("memory-idx Find: adding to path %s archive id=%s name=%s int=%d schemaId=%d aggId=%d lastSave=%d", n.Path, def.Id, def.Name, def.Interval, def.SchemaId, def.AggId, def.LastSave)
941+
log.Debug("memory-idx Find: adding to path %s archive id=%s name=%s int=%d schemaId=%d aggId=%d irId=%d lastSave=%d", n.Path, def.Id, def.Name, def.Interval, def.SchemaId, def.AggId, def.IrId, def.LastSave)
919942
idxNode.Defs = append(idxNode.Defs, *def)
920943
}
921944
if len(idxNode.Defs) == 0 {
@@ -1225,9 +1248,8 @@ func (m *MemoryIdx) delete(orgId uint32, n *Node, deleteEmptyParents, deleteChil
12251248
return deletedDefs
12261249
}
12271250

1228-
// delete series from the index if they have not been seen since "oldest"
1229-
func (m *MemoryIdx) Prune(oldest time.Time) ([]idx.Archive, error) {
1230-
oldestUnix := oldest.Unix()
1251+
// Prune prunes series from the index if they have become stale per their index-rule
1252+
func (m *MemoryIdx) Prune(now time.Time) ([]idx.Archive, error) {
12311253
orgs := make(map[uint32]struct{})
12321254
log.Info("memory-idx: pruning stale metricDefs across all orgs")
12331255
m.RLock()
@@ -1251,9 +1273,12 @@ func (m *MemoryIdx) Prune(oldest time.Time) ([]idx.Archive, error) {
12511273
pre := time.Now()
12521274

12531275
m.RLock()
1276+
1277+
indexChecks := IndexRules.Checks(now)
12541278
DEFS:
12551279
for _, def := range m.defById {
1256-
if def.LastUpdate >= oldestUnix {
1280+
check := indexChecks[def.IrId]
1281+
if check.Keep || def.LastUpdate >= check.Cutoff {
12571282
continue DEFS
12581283
}
12591284

@@ -1269,7 +1294,7 @@ DEFS:
12691294
}
12701295

12711296
for _, id := range n.Defs {
1272-
if m.defById[id].LastUpdate >= oldestUnix {
1297+
if m.defById[id].LastUpdate >= check.Cutoff {
12731298
continue DEFS
12741299
}
12751300
}
@@ -1280,7 +1305,7 @@ DEFS:
12801305
// if any other MetricDef with the same tag set is not expired yet,
12811306
// then we do not want to prune any of them
12821307
for def := range defs {
1283-
if def.LastUpdate >= oldestUnix {
1308+
if def.LastUpdate >= check.Cutoff {
12841309
continue DEFS
12851310
}
12861311
}

metrictank-sample.ini

+2-2
Original file line numberDiff line numberDiff line change
@@ -335,8 +335,6 @@ timeout = 1s
335335
num-conns = 10
336336
# Max number of metricDefs allowed to be unwritten to cassandra
337337
write-queue-size = 100000
338-
#automatically clear series from the index if they have not been seen for this much time.
339-
max-stale = 0
340338
#Interval at which the index should be checked for stale series.
341339
prune-interval = 3h
342340
# synchronize index changes to cassandra. not all your nodes need to do this.
@@ -369,3 +367,5 @@ tag-support = false
369367
tag-query-workers = 50
370368
# size of regular expression cache in tag query evaluation
371369
match-cache-size = 1000
370+
# path to index-rules.conf file
371+
index-rules-conf = /etc/metrictank/index-rules.conf

metrictank.go

+1
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,7 @@ func main() {
221221
***********************************/
222222
inCarbon.ConfigProcess()
223223
inKafkaMdm.ConfigProcess(*instance)
224+
memory.ConfigProcess()
224225
inPrometheus.ConfigProcess()
225226
notifierNsq.ConfigProcess()
226227
notifierKafka.ConfigProcess(*instance)

0 commit comments

Comments
 (0)