Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

cleanup/clarify asPercent code #1616

Merged
merged 2 commits into from
Jan 24, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 76 additions & 64 deletions expr/func_aspercent.go
Original file line number Diff line number Diff line change
@@ -42,12 +42,11 @@ func (s *FuncAsPercent) Context(context Context) Context {
}

func (s *FuncAsPercent) Exec(cache map[Req][]models.Series) ([]models.Series, error) {
series, err := s.in.Exec(cache)
in, err := s.in.Exec(cache)
if err != nil {
return nil, err
}

var outSeries []models.Series
var totals []models.Series
if s.totalSeries != nil {
totals, err = s.totalSeries.Exec(cache)
@@ -58,49 +57,51 @@ func (s *FuncAsPercent) Exec(cache map[Req][]models.Series) ([]models.Series, er

if s.nodes != nil {
if !math.IsNaN(s.totalFloat) {
return nil, errors.NewBadRequest("total must be None or a seriesList")
}
outSeries, err = s.execWithNodes(series, totals, cache)
} else {
if totals != nil && len(totals) != 1 && len(totals) != len(series) {
return nil, errors.NewBadRequest("asPercent second argument (total) must be missing, a single digit, reference exactly 1 series or reference the same number of series as the first argument")
return nil, errors.NewBadRequest("if nodes specified, total must be None or a seriesList")
}
outSeries, err = s.execWithoutNodes(series, totals, cache)
return s.execWithNodes(in, totals, cache)
}

// totals may be nil and totalFloat NaN, or totalFloat may be set, but here we only need to check for the cases where totals is set but the wrong length
if totals != nil && len(totals) != 1 && len(totals) != len(in) {
return nil, errors.NewBadRequest("if nodes not specified, asPercent second argument (total) must be missing, a single digit, reference exactly 1 series or reference the same number of series as the first argument")
}
return outSeries, err
return s.execWithoutNodes(in, totals, cache)
}

func (s *FuncAsPercent) execWithNodes(series, totals []models.Series, cache map[Req][]models.Series) ([]models.Series, error) {
// when nodes are given, totals can be:
// * nil -> in which case we divide by the sum of all input series in the group
// * serieslist -> we will sum the series in the group (or not, if we know that the group won't exist in `in` anyway, we don't need to do this work)
// * NOT a number in this case.
func (s *FuncAsPercent) execWithNodes(in, totals []models.Series, cache map[Req][]models.Series) ([]models.Series, error) {
var outSeries []models.Series
// Set of keys
keys := make(map[string]struct{})
// Series grouped by key
metaSeries := groupSeriesByKey(series, s.nodes, &keys)
// The totals series for each key
var totalSeries map[string]models.Series

keys := make(map[string]struct{}) // will track all aggKeys seen, amongst inputs and totals series
inByKey := groupSeriesByKey(in, s.nodes, keys)
var totalSerieByKey map[string]models.Series

// calculate the sum

if math.IsNaN(s.totalFloat) && totals == nil {
totalSeries = getTotalSeries(metaSeries, metaSeries, cache)
// calculate sum of totals series
totalSerieByKey = getTotalSeries(inByKey, inByKey, cache)
} else if totals != nil {
totalSeriesLists := groupSeriesByKey(totals, s.nodes, &keys)
totalSeries = getTotalSeries(totalSeriesLists, metaSeries, cache)
totalSeriesByKey := groupSeriesByKey(totals, s.nodes, keys)
totalSerieByKey = getTotalSeries(totalSeriesByKey, inByKey, cache)
}

var nones []schema.Point

for key := range keys {
// No input series for a corresponding total series
if _, ok := metaSeries[key]; !ok {
nonesSerie := totalSeries[key]
nonesSerie.QueryPatt = fmt.Sprintf("asPercent(MISSING,%s)", totalSeries[key].QueryPatt)
nonesSerie.Target = fmt.Sprintf("asPercent(MISSING,%s)", totalSeries[key].Target)
if _, ok := inByKey[key]; !ok {
nonesSerie := totalSerieByKey[key]
nonesSerie.QueryPatt = fmt.Sprintf("asPercent(MISSING,%s)", totalSerieByKey[key].QueryPatt)
nonesSerie.Target = fmt.Sprintf("asPercent(MISSING,%s)", totalSerieByKey[key].Target)
nonesSerie.Tags = map[string]string{"name": nonesSerie.Target}

if nones == nil {
nones = pointSlicePool.Get().([]schema.Point)
for _, p := range totalSeries[key].Datapoints {
for _, p := range totalSerieByKey[key].Datapoints {
p.Val = math.NaN()
nones = append(nones, p)
}
@@ -112,9 +113,9 @@ func (s *FuncAsPercent) execWithNodes(series, totals []models.Series, cache map[
continue
}

for _, serie1 := range metaSeries[key] {
for _, serie1 := range inByKey[key] {
// No total series for a corresponding input series
if _, ok := totalSeries[key]; !ok {
if _, ok := totalSerieByKey[key]; !ok {
nonesSerie := serie1
nonesSerie.QueryPatt = fmt.Sprintf("asPercent(%s,MISSING)", serie1.QueryPatt)
nonesSerie.Target = fmt.Sprintf("asPercent(%s,MISSING)", serie1.Target)
@@ -133,9 +134,9 @@ func (s *FuncAsPercent) execWithNodes(series, totals []models.Series, cache map[
nonesSerie.Datapoints = nones
outSeries = append(outSeries, nonesSerie)
} else {
// key found in both metaSeries and totalSeries
// key found in both inByKey and totalSerieByKey
serie1 = serie1.Copy(pointSlicePool.Get().([]schema.Point))
serie2 := totalSeries[key]
serie2 := totalSerieByKey[key]
serie1.QueryPatt = fmt.Sprintf("asPercent(%s,%s)", serie1.QueryPatt, serie2.QueryPatt)
serie1.Target = fmt.Sprintf("asPercent(%s,%s)", serie1.Target, serie2.Target)
serie1.Tags = map[string]string{"name": serie1.Target}
@@ -151,24 +152,30 @@ func (s *FuncAsPercent) execWithNodes(series, totals []models.Series, cache map[
return outSeries, nil
}

func (s *FuncAsPercent) execWithoutNodes(series, totals []models.Series, cache map[Req][]models.Series) ([]models.Series, error) {
// execWithoutNodes returns the asPercent output series for each input series.
// the total (divisor) we use for each input series is based on the totals parameter, which can be:
// * a number -> used as divisor
// * a single series -> used as divisor for all input series
// * multiple series -> must match len(series), sort and match up in pairs to input series
// * nil -> generate total by summing the inputs
func (s *FuncAsPercent) execWithoutNodes(in, totals []models.Series, cache map[Req][]models.Series) ([]models.Series, error) {
var outSeries []models.Series
var totalsSerie models.Series
if math.IsNaN(s.totalFloat) && totals == nil {
totalsSerie = sumSeries(series, cache)
if len(series) == 1 {
totalsSerie = sumSeries(in, cache)
if len(in) == 1 {
totalsSerie.Target = fmt.Sprintf("sumSeries(%s)", totalsSerie.QueryPatt)
totalsSerie.QueryPatt = fmt.Sprintf("sumSeries(%s)", totalsSerie.QueryPatt)
totalsSerie.Tags = map[string]string{"name": totalsSerie.Target}
}
} else if totals != nil {
if len(totals) == 1 {
totalsSerie = totals[0]
} else if len(totals) == len(series) {
} else if len(totals) == len(in) {
// Sorted to match the input series with the total series based on Target.
// Mimics Graphite's implementation
sort.Slice(series, func(i, j int) bool {
return series[i].Target < series[j].Target
sort.Slice(in, func(i, j int) bool {
return in[i].Target < in[j].Target
})
sort.Slice(totals, func(i, j int) bool {
return totals[i].Target < totals[j].Target
@@ -179,8 +186,8 @@ func (s *FuncAsPercent) execWithoutNodes(series, totals []models.Series, cache m
totalsSerie.Target = fmt.Sprint(s.totalFloat)
}

for i, serie := range series {
if len(totals) == len(series) {
for i, serie := range in {
if len(totals) == len(in) {
totalsSerie = totals[i]
}
serie = serie.Copy(pointSlicePool.Get().([]schema.Point))
@@ -213,46 +220,51 @@ func computeAsPercent(in, total float64) float64 {
return in / total * 100
}

func groupSeriesByKey(series []models.Series, nodes []expr, keys *map[string]struct{}) map[string][]models.Series {
keyedSeries := make(map[string][]models.Series)
for _, serie := range series {
// groupSeriesByKey groups series by their aggkey which is derived from nodes,
// and adds all seen keys to the pre-existing keys map
func groupSeriesByKey(in []models.Series, nodes []expr, keys map[string]struct{}) map[string][]models.Series {
inByKey := make(map[string][]models.Series)
for _, serie := range in {
key := aggKey(serie, nodes)
if _, ok := keyedSeries[key]; !ok {
keyedSeries[key] = []models.Series{serie}
(*keys)[key] = struct{}{}
if _, ok := inByKey[key]; !ok {
inByKey[key] = []models.Series{serie}
keys[key] = struct{}{}
} else {
keyedSeries[key] = append(keyedSeries[key], serie)
inByKey[key] = append(inByKey[key], serie)
}
}
return keyedSeries
return inByKey
}

// Sums each seriesList in map of seriesLists
func getTotalSeries(totalSeriesLists, include map[string][]models.Series, cache map[Req][]models.Series) map[string]models.Series {
totalSeries := make(map[string]models.Series, len(totalSeriesLists))
for key := range totalSeriesLists {
if _, ok := include[key]; ok {
totalSeries[key] = sumSeries(totalSeriesLists[key], cache)
// getTotalSeries constructs a map with one total serie by key.
// if there is a value for the key in "inByKey", we sum the entries in totalSeriesByKey under that key,
// otherwise we do an optimization: we know that the datapoints for that key won't actually be used,
// in that case we only need to return a series that has the proper fields set like QueryPattern etc.
// note: inByKey is only used for its keys, the values (series slices) are not used.
func getTotalSeries(totalSeriesByKey, inByKey map[string][]models.Series, cache map[Req][]models.Series) map[string]models.Series {
totalSerieByKey := make(map[string]models.Series, len(totalSeriesByKey))
for key := range totalSeriesByKey {
if _, ok := inByKey[key]; ok {
totalSerieByKey[key] = sumSeries(totalSeriesByKey[key], cache)
} else {
totalSeries[key] = totalSeriesLists[key][0]
totalSerieByKey[key] = totalSeriesByKey[key][0]
}

}
return totalSeries
return totalSerieByKey
}

// sumSeries returns a copy-on-write series that is the sum of the inputs
func sumSeries(series []models.Series, cache map[Req][]models.Series) models.Series {
if len(series) == 1 {
return series[0]
func sumSeries(in []models.Series, cache map[Req][]models.Series) models.Series {
if len(in) == 1 {
return in[0]
}
out := pointSlicePool.Get().([]schema.Point)
crossSeriesSum(series, &out)
crossSeriesSum(in, &out)
var queryPatts []string
var meta models.SeriesMeta

Loop:
for _, v := range series {
for _, v := range in {
meta = meta.Merge(v.Meta)
// avoid duplicates
for _, qp := range queryPatts {
@@ -263,16 +275,16 @@ Loop:
queryPatts = append(queryPatts, v.QueryPatt)
}
name := fmt.Sprintf("sumSeries(%s)", strings.Join(queryPatts, ","))
cons, queryCons := summarizeCons(series)
cons, queryCons := summarizeCons(in)
sum := models.Series{
Target: name,
QueryPatt: name,
Datapoints: out,
Interval: series[0].Interval,
Interval: in[0].Interval,
Consolidator: cons,
QueryCons: queryCons,
QueryFrom: series[0].QueryFrom,
QueryTo: series[0].QueryTo,
QueryFrom: in[0].QueryFrom,
QueryTo: in[0].QueryTo,
Tags: map[string]string{"name": name},
Meta: meta,
}