Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/metrictank/metrictank.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ func main() {
***********************************/
inCarbon.ConfigProcess()
inKafkaMdm.ConfigProcess(*instance)
memory.ConfigProcess()
inPrometheus.ConfigProcess()
notifierNsq.ConfigProcess()
notifierKafka.ConfigProcess(*instance)
Expand Down
23 changes: 17 additions & 6 deletions cmd/mt-index-cat/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import (
"time"

"github.com/grafana/metrictank/cmd/mt-index-cat/out"
"github.com/grafana/metrictank/conf"
"github.com/grafana/metrictank/idx/cassandra"
"github.com/grafana/metrictank/idx/memory"
"github.com/grafana/metrictank/logger"
"github.com/raintank/dur"
"github.com/raintank/schema"
Expand Down Expand Up @@ -187,17 +189,26 @@ func main() {
perror(err)
}

var cutoff, cutoffMin int64
now := time.Now().Unix()
memory.IndexRules = conf.IndexRules{
Rules: nil,
Default: conf.IndexRule{
Name: "default",
Pattern: regexp.MustCompile(""),
MaxStale: 0,
},
}

if maxStale != "0" {
maxStaleInt, err := dur.ParseNDuration(maxStale)
perror(err)
cutoff = now - int64(maxStaleInt)
memory.IndexRules.Default.MaxStale = time.Duration(maxStaleInt) * time.Second
}

var cutoffMin int64
if minStale != "0" {
minStaleInt, err := dur.ParseNDuration(minStale)
perror(err)
cutoffMin = now - int64(minStaleInt)
cutoffMin = time.Now().Unix() - int64(minStaleInt)
}

var partitions []int32
Expand All @@ -222,9 +233,9 @@ func main() {

var defs []schema.MetricDefinition
if len(partitions) == 0 {
defs = idx.Load(nil, uint32(cutoff))
defs = idx.Load(nil, time.Now())
} else {
defs = idx.LoadPartitions(partitions, nil, uint32(cutoff))
defs = idx.LoadPartitions(partitions, nil, time.Now())
}
// set this after doing the query, to assure age can't possibly be negative unless if clocks are misconfigured.
out.QueryTime = time.Now().Unix()
Expand Down
120 changes: 120 additions & 0 deletions conf/indexrules.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package conf

import (
"fmt"
"regexp"
"strings"
"time"

"github.com/alyu/configparser"
"github.com/raintank/dur"
)

// IndexRules holds the index rule definitions
type IndexRules struct {
Rules []IndexRule
Default IndexRule
}

type IndexRule struct {
Name string
Pattern *regexp.Regexp
MaxStale time.Duration
}

// NewIndexRules create instance of IndexRules
// it has a default catchall that doesn't prune
func NewIndexRules() IndexRules {
return IndexRules{
Default: IndexRule{
Name: "default",
Pattern: regexp.MustCompile(""),
},
}
}

// ReadIndexRules returns the defined index rule from a index-rules.conf file
// and adds the default
func ReadIndexRules(file string) (IndexRules, error) {
config, err := configparser.Read(file)
if err != nil {
return IndexRules{}, err
}
sections, err := config.AllSections()
if err != nil {
return IndexRules{}, err
}

result := NewIndexRules()

for _, s := range sections {
item := IndexRule{}
item.Name = strings.Trim(strings.SplitN(s.String(), "\n", 2)[0], " []")
if item.Name == "" || strings.HasPrefix(item.Name, "#") {
continue
}

item.Pattern, err = regexp.Compile(s.ValueOf("pattern"))
if err != nil {
return IndexRules{}, fmt.Errorf("[%s]: failed to parse pattern %q: %s", item.Name, s.ValueOf("pattern"), err.Error())
}
duration, err := dur.ParseDuration(s.ValueOf("max-stale"))
if err != nil {
return IndexRules{}, fmt.Errorf("[%s]: failed to parse max-stale %q: %s", item.Name, s.ValueOf("max-stale"), err.Error())
}
item.MaxStale = time.Duration(duration) * time.Second

result.Rules = append(result.Rules, item)
}

return result, nil
}

// Match returns the correct index rule setting for the given metric
// it can always find a valid setting, because there's a default catch all
// also returns the index of the setting, to efficiently reference it
func (a IndexRules) Match(metric string) (uint16, IndexRule) {
for i, s := range a.Rules {
if s.Pattern.MatchString(metric) {
return uint16(i), s
}
}
return uint16(len(a.Rules)), a.Default
}

// Get returns the index rule setting corresponding to the given index
func (a IndexRules) Get(i uint16) IndexRule {
if i >= uint16(len(a.Rules)) {
return a.Default
}
return a.Rules[i]
}

// Prunable returns whether there's any entries that require pruning
func (a IndexRules) Prunable() bool {
for _, r := range a.Rules {
if r.MaxStale > 0 {
return true
}
}
return (a.Default.MaxStale > 0)
}

// Cutoffs returns a set of cutoffs corresponding to a given timestamp and the set of all rules
func (a IndexRules) Cutoffs(now time.Time) []int64 {
out := make([]int64, len(a.Rules)+1)
for i := 0; i <= len(a.Rules); i++ {
var rule IndexRule
if i < len(a.Rules) {
rule = a.Rules[i]
} else {
rule = a.Default
}
if rule.MaxStale == 0 {
out[i] = 0
} else {
out[i] = int64(now.Add(rule.MaxStale * -1).Unix())
}
}
return out
}
158 changes: 158 additions & 0 deletions conf/indexrules_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package conf

import (
"io/ioutil"
"os"
"regexp"
"testing"
"time"
)

func TestReadIndexRules(t *testing.T) {
cases := []struct {
in string
expErr bool
expRules IndexRules
}{
{
in: `
[default]
pattern =
max-stale = 0
`,
expErr: false,
expRules: IndexRules{
Rules: []IndexRule{
{
Name: "default",
Pattern: regexp.MustCompile(""),
MaxStale: 0,
},
},
Default: IndexRule{
Name: "default",
Pattern: regexp.MustCompile(""),
MaxStale: 0,
},
},
},
{
in: `
[longterm]
pattern = ^long
max-stale = 1y6mon

[default]
pattern = foobar
max-stale = 7d
`,
expErr: false,
expRules: IndexRules{
Rules: []IndexRule{
{
Name: "longterm",
Pattern: regexp.MustCompile("^long"),
MaxStale: time.Duration(365+6*30) * 24 * time.Hour,
},
{
Name: "default",
Pattern: regexp.MustCompile("foobar"),
MaxStale: time.Duration(24*7) * time.Hour,
},
},
Default: IndexRule{
Name: "default",
Pattern: regexp.MustCompile(""),
MaxStale: 0,
},
},
},
}
for i, c := range cases {
tmpfile, err := ioutil.TempFile("", "indexrules-test-readindexrules")
if err != nil {
panic(err)
}

if _, err := tmpfile.Write([]byte(c.in)); err != nil {
panic(err)
}
if err := tmpfile.Close(); err != nil {
panic(err)
}

rules, err := ReadIndexRules(tmpfile.Name())
if (err != nil) != c.expErr {
t.Fatalf("case %d, exp err %t, got err %v", i, c.expErr, err)
}
if err == nil {
if len(c.expRules.Rules) != len(rules.Rules) {
t.Fatalf("case %d, exp rules %v, got %v", i, c.expRules, rules)
}
for i, expRule := range c.expRules.Rules {
rule := rules.Rules[i]
if rule.Name != expRule.Name || rule.Pattern.String() != expRule.Pattern.String() || rule.MaxStale != expRule.MaxStale {
t.Fatalf("case %d, exp rules %v, got %v", i, c.expRules, rules)
}
}
rule := rules.Default
expRule := c.expRules.Default
if rule.Name != expRule.Name || rule.Pattern.String() != expRule.Pattern.String() || rule.MaxStale != expRule.MaxStale {
t.Fatalf("case %d, exp rules %v, got %v", i, c.expRules, rules)
}
}

os.Remove(tmpfile.Name())
}
}
func TestIndexRulesMatch(t *testing.T) {
rules := IndexRules{
Rules: []IndexRule{
{
Name: "longterm",
Pattern: regexp.MustCompile("^long"),
MaxStale: time.Duration(365+6*30) * 24 * time.Hour,
},
// default provided by user
{
Name: "default",
Pattern: regexp.MustCompile("foobar"),
MaxStale: time.Duration(24*7) * time.Hour,
},
},
// built-in to the software
Default: IndexRule{
Name: "default",
Pattern: regexp.MustCompile(""),
MaxStale: 0,
},
}
cases := []struct {
metric string
expRuleID uint16
}{
{
"abced.f",
2,
},
{
"long.baz",
0,
},
{
"long.foobar.f", // rule 0 takes precedence over rule 1 !
0,
},
{
"abc.foobar.baz",
1,
},
}
for i, c := range cases {
ruleID, _ := rules.Match(c.metric)
if ruleID != c.expRuleID {
t.Fatalf("mismatch for case %d: exp ruleID %d, got %d", i, c.expRuleID, ruleID)
}
}

}
1 change: 1 addition & 0 deletions conf/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// see https://graphite.readthedocs.io/en/0.9.9/config-carbon.html#storage-schemas-conf
// * storage-aggregation.conf
// see http://graphite.readthedocs.io/en/latest/config-carbon.html#storage-aggregation-conf
// as well as our own file index-rules.conf
//
// it also adds defaults (the same ones as graphite),
// so that even if nothing is matched in the user provided schemas or aggregations,
Expand Down
4 changes: 2 additions & 2 deletions docker/docker-chaos/metrictank.ini
Original file line number Diff line number Diff line change
Expand Up @@ -378,8 +378,6 @@ timeout = 1s
num-conns = 10
# Max number of metricDefs allowed to be unwritten to cassandra
write-queue-size = 100000
#automatically clear series from the index if they have not been seen for this much time.
max-stale = 0
#Interval at which the index should be checked for stale series.
prune-interval = 3h
# synchronize index changes to cassandra. not all your nodes need to do this.
Expand Down Expand Up @@ -414,6 +412,8 @@ tag-support = false
tag-query-workers = 50
# size of regular expression cache in tag query evaluation
match-cache-size = 1000
# path to index-rules.conf file
index-rules-conf = /etc/metrictank/index-rules.conf
# maximum duration each second a prune job can lock the index.
max-prune-lock-time = 100ms

Expand Down
4 changes: 2 additions & 2 deletions docker/docker-cluster/metrictank.ini
Original file line number Diff line number Diff line change
Expand Up @@ -378,8 +378,6 @@ timeout = 1s
num-conns = 10
# Max number of metricDefs allowed to be unwritten to cassandra
write-queue-size = 100000
#automatically clear series from the index if they have not been seen for this much time.
max-stale = 0
#Interval at which the index should be checked for stale series.
prune-interval = 3h
# synchronize index changes to cassandra. not all your nodes need to do this.
Expand Down Expand Up @@ -414,6 +412,8 @@ tag-support = false
tag-query-workers = 50
# size of regular expression cache in tag query evaluation
match-cache-size = 1000
# path to index-rules.conf file
index-rules-conf = /etc/metrictank/index-rules.conf
# maximum duration each second a prune job can lock the index.
max-prune-lock-time = 100ms

Expand Down
Loading