Skip to content

Commit

Permalink
Change minmax aggregator to store float64
Browse files Browse the repository at this point in the history
  • Loading branch information
sparrc committed Sep 20, 2016
1 parent e56b548 commit eb04366
Show file tree
Hide file tree
Showing 5 changed files with 303 additions and 75 deletions.
3 changes: 1 addition & 2 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -879,8 +879,7 @@ func buildAggregator(name string, tbl *ast.Table) (*models.AggregatorConfig, err
// buildProcessor TODO doc
func buildProcessor(name string, tbl *ast.Table) (*models.ProcessorConfig, error) {
conf := &models.ProcessorConfig{Name: name}
unsupportedFields := []string{"pass", "fieldpass", "drop", "fielddrop",
"tagexclude", "taginclude"}
unsupportedFields := []string{"tagexclude", "taginclude"}
for _, field := range unsupportedFields {
if _, ok := tbl.Fields[field]; ok {
// TODO raise error because field is not supported
Expand Down
83 changes: 43 additions & 40 deletions plugins/aggregators/minmax/minmax.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ type MinMax struct {
}

type minmax struct {
min interface{}
max interface{}
min float64
max float64
}

var sampleConfig = `
Expand Down Expand Up @@ -55,24 +55,36 @@ func (m *MinMax) apply(in telegraf.Metric) {
m.tagCache[id] = in.Tags()
m.fieldCache[id] = make(map[string]minmax)
for k, v := range in.Fields() {
m.fieldCache[id][k] = minmax{
min: v,
max: v,
if fv, ok := convert(v); ok {
m.fieldCache[id][k] = minmax{
min: fv,
max: fv,
}
}
}
} else {
for k, v := range in.Fields() {
cmpmin := compare(m.fieldCache[id][k].min, v)
cmpmax := compare(m.fieldCache[id][k].max, v)
if cmpmin == 1 {
tmp := m.fieldCache[id][k]
tmp.min = v
m.fieldCache[id][k] = tmp
}
if cmpmax == -1 {
tmp := m.fieldCache[id][k]
tmp.max = v
m.fieldCache[id][k] = tmp
if fv, ok := convert(v); ok {
if _, ok := m.fieldCache[id][k]; !ok {
// hit an uncached field of a cached metric
m.fieldCache[id][k] = minmax{
min: fv,
max: fv,
}
continue
}
cmpmin := compare(m.fieldCache[id][k].min, fv)
cmpmax := compare(m.fieldCache[id][k].max, fv)
if cmpmin == 1 {
tmp := m.fieldCache[id][k]
tmp.min = fv
m.fieldCache[id][k] = tmp
}
if cmpmax == -1 {
tmp := m.fieldCache[id][k]
tmp.max = fv
m.fieldCache[id][k] = tmp
}
}
}
}
Expand Down Expand Up @@ -156,32 +168,23 @@ func (m *MinMax) continuousHandler() {
}
}

func compare(a, b interface{}) int {
switch at := a.(type) {
case int64:
if bt, ok := b.(int64); ok {
if at < bt {
return -1
} else if at > bt {
return 1
}
return 0
} else {
return 0
}
func compare(a, b float64) int {
if a < b {
return -1
} else if a > b {
return 1
}
return 0
}

func convert(in interface{}) (float64, bool) {
switch v := in.(type) {
case float64:
if bt, ok := b.(float64); ok {
if at < bt {
return -1
} else if at > bt {
return 1
}
return 0
} else {
return 0
}
return v, true
case int64:
return float64(v), true
default:
return 0
return 0, false
}
}

Expand Down
Loading

0 comments on commit eb04366

Please sign in to comment.