Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit 7000ea2

Browse files
authored
Merge pull request #1433 from grafana/meta_tag_enrichment
Meta tag enrichment
2 parents b6ffeb1 + 7b1a530 commit 7000ea2

25 files changed

+1111
-59
lines changed

api/dataprocessor.go

+2
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,7 @@ func (s *Server) getTargetsRemote(ctx context.Context, ss *models.StorageStats,
255255
}
256256
out = append(out, resp.series...)
257257
}
258+
258259
log.Debugf("DP getTargetsRemote: total of %d series found on peers", len(out))
259260
return out, nil
260261
}
@@ -317,6 +318,7 @@ LOOP:
317318
}
318319
out = append(out, resp.series...)
319320
}
321+
320322
ss.Trace(span)
321323
log.Debugf("DP getTargetsLocal: %d series found locally", len(out))
322324
return out, nil

api/graphite.go

+14
Original file line numberDiff line numberDiff line change
@@ -662,6 +662,7 @@ func (s *Server) executePlan(ctx context.Context, orgId uint32, plan expr.Plan)
662662
minFrom := uint32(math.MaxUint32)
663663
var maxTo uint32
664664
var reqs []models.Req
665+
metaTagEnrichmentData := make(map[string]tagquery.Tags)
665666

666667
// note that different patterns to query can have different from / to, so they require different index lookups
667668
// e.g. target=movingAvg(foo.*, "1h")&target=foo.*
@@ -722,6 +723,10 @@ func (s *Server) executePlan(ctx context.Context, orgId uint32, plan expr.Plan)
722723
archive.Id, archive.NameWithTags(), r.Query, r.From, r.To, plan.MaxDataPoints, uint32(archive.Interval), cons, consReq, s.Node, archive.SchemaId, archive.AggId)
723724
reqs = append(reqs, newReq)
724725
}
726+
727+
if tagquery.MetaTagSupport && len(metric.Defs) > 0 && len(metric.MetaTags) > 0 {
728+
metaTagEnrichmentData[metric.Defs[0].NameWithTags()] = metric.MetaTags
729+
}
725730
}
726731
}
727732
}
@@ -770,6 +775,14 @@ func (s *Server) executePlan(ctx context.Context, orgId uint32, plan expr.Plan)
770775

771776
out = mergeSeries(out)
772777

778+
if len(metaTagEnrichmentData) > 0 {
779+
for i := range out {
780+
if metaTags, ok := metaTagEnrichmentData[out[i].Target]; ok {
781+
out[i].EnrichWithTags(metaTags)
782+
}
783+
}
784+
}
785+
773786
// instead of waiting for all data to come in and then start processing everything, we could consider starting processing earlier, at the risk of doing needless work
774787
// if we need to cancel the request due to a fetch error
775788

@@ -787,6 +800,7 @@ func (s *Server) executePlan(ctx context.Context, orgId uint32, plan expr.Plan)
787800

788801
preRun := time.Now()
789802
out, err = plan.Run(data)
803+
790804
meta.RenderStats.PlanRunDuration = time.Since(preRun)
791805
planRunDuration.Value(meta.RenderStats.PlanRunDuration)
792806
return out, meta, err

api/models/series.go

+41
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,12 @@ package models
33
import (
44
"bytes"
55
"math"
6+
"sort"
67
"strconv"
78
"strings"
89

910
"github.com/grafana/metrictank/consolidation"
11+
"github.com/grafana/metrictank/expr/tagquery"
1012
"github.com/grafana/metrictank/schema"
1113
pickle "github.com/kisielk/og-rek"
1214
)
@@ -68,6 +70,45 @@ func (s *Series) SetTags() {
6870
s.Tags["name"] = name
6971
}
7072

73+
func (s *Series) EnrichWithTags(tags tagquery.Tags) {
74+
// every serie has at least one tag, the name tag
75+
// if there are no tags defined, this means the tags must not have been set yet
76+
if len(s.Tags) == 0 {
77+
s.SetTags()
78+
}
79+
80+
for _, tag := range tags {
81+
if _, ok := s.Tags[tag.Key]; !ok {
82+
s.Tags[tag.Key] = tag.Value
83+
}
84+
}
85+
86+
s.buildTargetFromTags()
87+
}
88+
89+
func (s *Series) buildTargetFromTags() {
90+
buf := bytes.NewBufferString(s.Tags["name"])
91+
92+
keys := make([]string, 0, len(s.Tags)-1)
93+
for key := range s.Tags {
94+
if key == "name" {
95+
continue
96+
}
97+
keys = append(keys, key)
98+
}
99+
100+
sort.Strings(keys)
101+
102+
for _, key := range keys {
103+
buf.WriteRune(';')
104+
buf.WriteString(key)
105+
buf.WriteRune('=')
106+
buf.WriteString(s.Tags[key])
107+
}
108+
109+
s.Target = buf.String()
110+
}
111+
71112
func (s Series) Copy(emptyDatapoints []schema.Point) Series {
72113
newSeries := Series{
73114
Target: s.Target,

docker/docker-chaos/metrictank.ini

+2
Original file line numberDiff line numberDiff line change
@@ -442,6 +442,8 @@ meta-tag-support = false
442442
tag-query-workers = 50
443443
# size of regular expression cache in tag query evaluation
444444
match-cache-size = 1000
445+
# size of the meta tag enrichment cache
446+
enrichment-cache-size = 10000
445447
# path to index-rules.conf file
446448
rules-file = /etc/metrictank/index-rules.conf
447449
# maximum duration each second a prune job can lock the index.

docker/docker-cluster-query/metrictank.ini

+2
Original file line numberDiff line numberDiff line change
@@ -442,6 +442,8 @@ meta-tag-support = false
442442
tag-query-workers = 50
443443
# size of regular expression cache in tag query evaluation
444444
match-cache-size = 1000
445+
# size of the meta tag enrichment cache
446+
enrichment-cache-size = 10000
445447
# path to index-rules.conf file
446448
rules-file = /etc/metrictank/index-rules.conf
447449
# maximum duration each second a prune job can lock the index.

docker/docker-cluster/metrictank.ini

+3-1
Original file line numberDiff line numberDiff line change
@@ -441,7 +441,9 @@ meta-tag-support = false
441441
# number of workers to spin up to evaluate tag queries
442442
tag-query-workers = 50
443443
# size of regular expression cache in tag query evaluation
444-
tag-query-workers = 50
444+
match-cache-size = 1000
445+
# size of the meta tag enrichment cache
446+
enrichment-cache-size = 10000
445447
# path to index-rules.conf file
446448
rules-file = /etc/metrictank/index-rules.conf
447449
# maximum duration each second a prune job can lock the index.

docker/docker-dev-custom-cfg-kafka/metrictank.ini

+2
Original file line numberDiff line numberDiff line change
@@ -442,6 +442,8 @@ meta-tag-support = false
442442
tag-query-workers = 50
443443
# size of regular expression cache in tag query evaluation
444444
match-cache-size = 1000
445+
# size of the meta tag enrichment cache
446+
enrichment-cache-size = 10000
445447
# path to index-rules.conf file
446448
rules-file = /etc/metrictank/index-rules.conf
447449
# maximum duration each second a prune job can lock the index.

docs/config.md

+2
Original file line numberDiff line numberDiff line change
@@ -516,6 +516,8 @@ meta-tag-support = false
516516
tag-query-workers = 50
517517
# size of regular expression cache in tag query evaluation
518518
match-cache-size = 1000
519+
# size of the meta tag enrichment cache
520+
enrichment-cache-size = 10000
519521
# path to index-rules.conf file
520522
rules-file = /etc/metrictank/index-rules.conf
521523
# maximum duration each second a prune job can lock the index.

expr/tagquery/tag.go

+1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/grafana/metrictank/schema"
1010
)
1111

12+
//go:generate msgp
1213
type Tags []Tag
1314

1415
func ParseTags(tags []string) (Tags, error) {

0 commit comments

Comments
 (0)