Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit 2ace110

Browse files
authored
Merge pull request #1794 from bloomberg/dedup_series
Dedup resolve series requests
2 parents c48f7d6 + 0f078c6 commit 2ace110

File tree

1 file changed

+35
-21
lines changed

1 file changed

+35
-21
lines changed

api/graphite.go

+35-21
Original file line numberDiff line numberDiff line change
@@ -695,11 +695,23 @@ func (s *Server) executePlan(ctx context.Context, orgId uint32, plan expr.Plan)
695695
reqs := NewReqMap()
696696
metaTagEnrichmentData := make(map[string]tagquery.Tags)
697697

698+
// Map identical series expressions to reduce round trips. For the purpose of resolving series
699+
// queries/patterns to matching series, uniqueness is determined by only Query, From, and To.
700+
resolveSeriesRequests := make(map[expr.Req][]expr.Req)
701+
for i, r := range plan.Reqs {
702+
strippedreq := expr.Req{
703+
Query: r.Query,
704+
From: r.From,
705+
To: r.To,
706+
}
707+
resolveSeriesRequests[strippedreq] = append(resolveSeriesRequests[strippedreq], plan.Reqs[i])
708+
}
709+
698710
// note that different patterns to query can have different from / to, so they require different index lookups
699711
// e.g. target=movingAvg(foo.*, "1h")&target=foo.*
700712
// note that in this case we fetch foo.* twice. can be optimized later
701713
pre := time.Now()
702-
for _, r := range plan.Reqs {
714+
for r, rawReqs := range resolveSeriesRequests {
703715
select {
704716
case <-ctx.Done():
705717
//request canceled
@@ -728,27 +740,29 @@ func (s *Server) executePlan(ctx context.Context, orgId uint32, plan expr.Plan)
728740
for _, s := range series {
729741
for _, metric := range s.Series {
730742
for _, archive := range metric.Defs {
731-
var cons consolidation.Consolidator
732-
consReq := r.Cons
733-
if consReq == 0 {
734-
// we will use the primary method dictated by the storage-aggregations rules
735-
// note:
736-
// * we can't just let the expr library take care of normalization, as we may have to fetch targets
737-
// from cluster peers; it's more efficient to have them normalize the data at the source.
738-
// * a pattern may expand to multiple series, each of which can have their own aggregation method.
739-
fn := mdata.Aggregations.Get(archive.AggId).AggregationMethod[0]
740-
cons = consolidation.Consolidator(fn) // we use the same number assignments so we can cast them
741-
} else {
742-
// user specified a runtime consolidation function via consolidateBy()
743-
// get the consolidation method of the most appropriate rollup based on the consolidation method
744-
// requested by the user. e.g. if the user requested 'min' but we only have 'avg' and 'sum' rollups,
745-
// use 'avg'.
746-
cons = closestAggMethod(consReq, mdata.Aggregations.Get(archive.AggId).AggregationMethod)
743+
for _, rawReq := range rawReqs {
744+
var cons consolidation.Consolidator
745+
consReq := rawReq.Cons
746+
if consReq == 0 {
747+
// we will use the primary method dictated by the storage-aggregations rules
748+
// note:
749+
// * we can't just let the expr library take care of normalization, as we may have to fetch targets
750+
// from cluster peers; it's more efficient to have them normalize the data at the source.
751+
// * a pattern may expand to multiple series, each of which can have their own aggregation method.
752+
fn := mdata.Aggregations.Get(archive.AggId).AggregationMethod[0]
753+
cons = consolidation.Consolidator(fn) // we use the same number assignments so we can cast them
754+
} else {
755+
// user specified a runtime consolidation function via consolidateBy()
756+
// get the consolidation method of the most appropriate rollup based on the consolidation method
757+
// requested by the user. e.g. if the user requested 'min' but we only have 'avg' and 'sum' rollups,
758+
// use 'avg'.
759+
cons = closestAggMethod(consReq, mdata.Aggregations.Get(archive.AggId).AggregationMethod)
760+
}
761+
762+
newReq := rawReq.ToModel()
763+
newReq.Init(archive, cons, s.Node)
764+
reqs.Add(newReq)
747765
}
748-
749-
newReq := r.ToModel()
750-
newReq.Init(archive, cons, s.Node)
751-
reqs.Add(newReq)
752766
}
753767

754768
if tagquery.MetaTagSupport && len(metric.Defs) > 0 && len(metric.MetaTags) > 0 {

0 commit comments

Comments
 (0)