Skip to content

Commit

Permalink
Merge pull request #1591 from influxdb/spread
Browse files Browse the repository at this point in the history
Spread
  • Loading branch information
corylanou committed Feb 12, 2015
2 parents 6309e0e + b0046bc commit bcfeb0b
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 0 deletions.
48 changes: 48 additions & 0 deletions influxql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,8 @@ func (p *Planner) planCall(e *Executor, c *Call) (Processor, error) {
mapFn, reduceFn = MapMin, ReduceMin
case "max":
mapFn, reduceFn = MapMax, ReduceMax
case "spread":
mapFn, reduceFn = MapSpread, ReduceSpread
case "stddev":
mapFn, reduceFn = MapStddev, ReduceStddev
case "first":
Expand Down Expand Up @@ -753,6 +755,52 @@ func ReduceMax(key Key, values []interface{}, e *Emitter) {
}
}

type spreadMapOutput struct {
min, max float64
}

// MapSpread collects the values to pass to the reducer
func MapSpread(itr Iterator, e *Emitter, tmax int64) {
var out spreadMapOutput
pointsYielded := false

for k, v := itr.Next(); k != 0; k, v = itr.Next() {
val := v.(float64)
// Initialize
if !pointsYielded {
out.max = val
out.min = val
pointsYielded = true
}
out.max = math.Max(out.max, val)
out.min = math.Min(out.min, val)
}
if pointsYielded {
e.Emit(Key{tmax, itr.Tags()}, out)
}
}

// ReduceSpread computes the spread of values.
func ReduceSpread(key Key, values []interface{}, e *Emitter) {
var result spreadMapOutput
pointsYielded := false

for _, v := range values {
val := v.(spreadMapOutput)
// Initialize
if !pointsYielded {
result.max = val.max
result.min = val.min
pointsYielded = true
}
result.max = math.Max(result.max, val.max)
result.min = math.Min(result.min, val.min)
}
if pointsYielded {
e.Emit(key, result.max-result.min)
}
}

// MapStddev collects the values to pass to the reducer
func MapStddev(itr Iterator, e *Emitter, tmax int64) {
var values []float64
Expand Down
66 changes: 66 additions & 0 deletions influxql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,72 @@ func TestPlanner_Plan_MinWithoutResults(t *testing.T) {
}
}

// Ensure the planner can plan and execute a spread query with results
func TestPlanner_Plan_SpreadWithResults(t *testing.T) {
tx := NewTx()
tx.CreateIteratorsFunc = func(stmt *influxql.SelectStatement) ([]influxql.Iterator, error) {
return []influxql.Iterator{
NewIterator(nil, []Point{
{"2000-01-01T00:00:00Z", float64(100)},
{"2000-01-01T00:00:10Z", float64(200)},
{"2000-01-01T00:00:20Z", float64(300)},
{"2000-01-01T00:00:30Z", float64(400)},
}),
NewIterator(nil, []Point{
{"2000-01-01T00:01:00Z", float64(0)},
{"2000-01-01T00:01:10Z", float64(1)},
{"2000-01-01T00:01:20Z", float64(100)},
{"2000-01-01T00:01:30Z", float64(1000)},
}),
NewIterator(nil, []Point{
{"2000-01-01T00:02:20Z", float64(0)},
{"2000-01-01T00:02:30Z", float64(-10)},
{"2000-01-01T00:02:40Z", float64(-20)},
{"2000-01-01T00:02:50Z", float64(-30)},
}),
NewIterator(nil, []Point{
{"2000-01-01T00:03:20Z", float64(-10)},
{"2000-01-01T00:03:50Z", float64(0)},
{"2000-01-01T00:03:30Z", float64(-30)},
{"2000-01-01T00:03:40Z", float64(-20)},
}),
NewIterator(nil, []Point{
{"2000-01-01T00:04:20Z", float64(10)},
{"2000-01-01T00:04:30Z", float64(30)},
{"2000-01-01T00:04:40Z", float64(0)},
{"2000-01-01T00:04:50Z", float64(20)},
})}, nil
}

// Expected resultset.
exp := minify(`[{"name":"cpu","columns":["time","spread"],"values":[["2000-01-01T00:00:00Z",300],["2000-01-01T00:01:00Z",1000],["2000-01-01T00:02:00Z",30],["2000-01-01T00:03:00Z",30],["2000-01-01T00:04:00Z",30]]}]`)

// Execute and compare with results.
rs := MustPlanAndExecute(NewDB(tx), `2000-01-01T12:00:00Z`,
`SELECT spread(value) FROM cpu WHERE time >= '2000-01-01' GROUP BY time(1m)`)
if act := minify(jsonify(rs)); exp != act {
t.Fatalf("unexpected resultset: %s", act)
}
}

// Ensure the planner can plan and execute a spread query without results
func TestPlanner_Plan_SpreadWithoutResults(t *testing.T) {
tx := NewTx()
tx.CreateIteratorsFunc = func(stmt *influxql.SelectStatement) ([]influxql.Iterator, error) {
return []influxql.Iterator{NewIterator(nil, []Point{})}, nil
}

// Expected resultset.
exp := "null"

// Execute and compare with results.
rs := MustPlanAndExecute(NewDB(tx), `2000-01-01T12:00:00Z`,
`SELECT spread(value) FROM cpu WHERE time >= '2000-01-01' GROUP BY time(1m)`)
if act := minify(jsonify(rs)); exp != act {
t.Fatalf("unexpected resultset: %s", act)
}
}

// Ensure the planner can plan and execute a standard deviation query with results
func TestPlanner_Plan_StddevWithResults(t *testing.T) {
tx := NewTx()
Expand Down

0 comments on commit bcfeb0b

Please sign in to comment.