Skip to content

Commit

Permalink
Merge pull request #6391 from influxdata/js-5553-limit-queries-slow-w…
Browse files Browse the repository at this point in the history
…ith-group-by

Propagate the limit option to the low level iterators
  • Loading branch information
jsternberg committed Apr 16, 2016
2 parents 8ed877a + bd5fdd7 commit 93745d9
Show file tree
Hide file tree
Showing 6 changed files with 269 additions and 7 deletions.
14 changes: 13 additions & 1 deletion cmd/influxd/run/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4774,7 +4774,7 @@ func TestServer_Query_LimitAndOffset(t *testing.T) {
params: url.Values{"db": []string{"db0"}},
},
&Query{
name: "limit + offset equal to the number of points with group by time",
name: "limit + offset equal to the number of points with group by time",
command: `select mean(foo) from "limited" WHERE time >= '2009-11-10T23:00:02Z' AND time < '2009-11-10T23:00:06Z' GROUP BY TIME(1s) LIMIT 3 OFFSET 3`,
exp: `{"results":[{"series":[{"name":"limited","columns":["time","mean"],"values":[["2009-11-10T23:00:05Z",5]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
Expand All @@ -4785,6 +4785,18 @@ func TestServer_Query_LimitAndOffset(t *testing.T) {
exp: `{"results":[{}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
name: "limit - group by tennant",
command: `select foo from "limited" group by tennant limit 1`,
exp: `{"results":[{"series":[{"name":"limited","tags":{"tennant":"paul"},"columns":["time","foo"],"values":[["2009-11-10T23:00:02Z",2]]},{"name":"limited","tags":{"tennant":"todd"},"columns":["time","foo"],"values":[["2009-11-10T23:00:05Z",5]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
name: "limit and offset - group by tennant",
command: `select foo from "limited" group by tennant limit 1 offset 1`,
exp: `{"results":[{"series":[{"name":"limited","tags":{"tennant":"paul"},"columns":["time","foo"],"values":[["2009-11-10T23:00:03Z",3]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
}...)

for i, query := range test.queries {
Expand Down
19 changes: 16 additions & 3 deletions tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -792,6 +792,7 @@ func (e *Engine) createVarRefIterator(opt influxql.IteratorOptions) ([]influxql.
tagSets = influxql.LimitTagSets(tagSets, opt.SLimit, opt.SOffset)

for _, t := range tagSets {
inputs := make([]influxql.Iterator, 0, len(t.SeriesKeys))
for i, seriesKey := range t.SeriesKeys {
fields := 0
if t.Filters[i] != nil {
Expand All @@ -802,13 +803,25 @@ func (e *Engine) createVarRefIterator(opt influxql.IteratorOptions) ([]influxql.
}
}

itr, err := e.createVarRefSeriesIterator(ref, mm, seriesKey, t, t.Filters[i], conditionFields[:fields], opt)
input, err := e.createVarRefSeriesIterator(ref, mm, seriesKey, t, t.Filters[i], conditionFields[:fields], opt)
if err != nil {
return err
} else if itr == nil {
} else if input == nil {
continue
}
itrs = append(itrs, itr)
inputs = append(inputs, input)
}

if len(inputs) > 0 && (opt.Limit > 0 || opt.Offset > 0) {
var itr influxql.Iterator
if opt.MergeSorted() {
itr = influxql.NewSortedMergeIterator(inputs, opt)
} else {
itr = influxql.NewMergeIterator(inputs, opt)
}
itrs = append(itrs, newLimitIterator(itr, opt))
} else {
itrs = append(itrs, inputs...)
}
}
}
Expand Down
31 changes: 28 additions & 3 deletions tsdb/engine/tsm1/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ func BenchmarkEngine_CreateIterator_Count_1M(b *testing.B) {
}

func benchmarkEngineCreateIteratorCount(b *testing.B, pointN int) {
benchmarkCallIterator(b, influxql.IteratorOptions{
benchmarkIterator(b, influxql.IteratorOptions{
Expr: influxql.MustParseExpr("count(value)"),
Sources: []influxql.Source{&influxql.Measurement{Name: "cpu"}},
Ascending: true,
Expand All @@ -517,7 +517,29 @@ func benchmarkEngineCreateIteratorCount(b *testing.B, pointN int) {
}, pointN)
}

func benchmarkCallIterator(b *testing.B, opt influxql.IteratorOptions, pointN int) {
// BenchmarkEngine_CreateIterator_Limit_1K runs the limit iterator benchmark
// with a point count of 1,000.
func BenchmarkEngine_CreateIterator_Limit_1K(b *testing.B) {
	const pointN = 1000
	benchmarkEngineCreateIteratorLimit(b, pointN)
}
// BenchmarkEngine_CreateIterator_Limit_100K runs the limit iterator benchmark
// with a point count of 100,000.
func BenchmarkEngine_CreateIterator_Limit_100K(b *testing.B) {
	const pointN = 100000
	benchmarkEngineCreateIteratorLimit(b, pointN)
}
// BenchmarkEngine_CreateIterator_Limit_1M runs the limit iterator benchmark
// with a point count of 1,000,000.
func BenchmarkEngine_CreateIterator_Limit_1M(b *testing.B) {
	const pointN = 1000000
	benchmarkEngineCreateIteratorLimit(b, pointN)
}

func benchmarkEngineCreateIteratorLimit(b *testing.B, pointN int) {
benchmarkIterator(b, influxql.IteratorOptions{
Expr: influxql.MustParseExpr("value"),
Sources: []influxql.Source{&influxql.Measurement{Name: "cpu"}},
Dimensions: []string{"host"},
Ascending: true,
StartTime: influxql.MinTime,
EndTime: influxql.MaxTime,
Limit: 10,
}, pointN)
}

func benchmarkIterator(b *testing.B, opt influxql.IteratorOptions, pointN int) {
e := MustInitBenchmarkEngine(pointN)
b.ResetTimer()
b.ReportAllocs()
Expand All @@ -536,6 +558,8 @@ var benchmark struct {
PointN int
}

// hostNames lists the host tag values that generated benchmark points cycle
// through.
var hostNames = []string{
	"A", "B", "C", "D", "E",
	"F", "G", "H", "I", "J",
}

// MustInitBenchmarkEngine creates a new engine and fills it with points.
// Reuses previous engine if the same parameters were used.
func MustInitBenchmarkEngine(pointN int) *Engine {
Expand Down Expand Up @@ -567,7 +591,8 @@ func MustInitBenchmarkEngine(pointN int) *Engine {
for i := 0; i < pointN; i += batchSize {
var buf bytes.Buffer
for j := 0; j < batchSize; j++ {
fmt.Fprintf(&buf, "cpu,host=A value=%d %d",
fmt.Fprintf(&buf, "cpu,host=%s value=%d %d",
hostNames[j%len(hostNames)],
100+rand.Intn(50)-25,
(time.Duration(i+j)*time.Second)+(time.Duration(rand.Intn(500)-250)*time.Millisecond),
)
Expand Down
152 changes: 152 additions & 0 deletions tsdb/engine/tsm1/iterator.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,44 @@ func (itr *floatIterator) Stats() influxql.IteratorStats {
// Close closes the iterator.
func (itr *floatIterator) Close() error { return nil }

// floatLimitIterator caps the number of points read from a float iterator at
// Offset+Limit. It does not skip the offset itself; it only stops reading once
// enough points have been produced for a higher level iterator to apply the
// offset and limit.
type floatLimitIterator struct {
	// input is the wrapped iterator that points are read from.
	input influxql.FloatIterator
	// opt supplies the Limit and Offset used to cap the stream.
	opt influxql.IteratorOptions
	// n is the number of points returned from input so far.
	n int
}

// newFloatLimitIterator returns a limit iterator that reads from input and
// uses opt.Limit and opt.Offset to decide when to stop.
func newFloatLimitIterator(input influxql.FloatIterator, opt influxql.IteratorOptions) *floatLimitIterator {
	return &floatLimitIterator{
		input: input,
		opt:   opt,
	}
}

// Stats returns the stats of the underlying input iterator.
func (itr *floatLimitIterator) Stats() influxql.IteratorStats { return itr.input.Stats() }

// Close closes the underlying input iterator.
func (itr *floatLimitIterator) Close() error { return itr.input.Close() }

// Next returns the next point from the input. It returns nil when the input
// is exhausted or when Offset+Limit points have already been returned.
func (itr *floatLimitIterator) Next() *influxql.FloatPoint {
	// A non-positive Limit is treated as "no limit" so that a query with only
	// an offset is not truncated. Otherwise stop after exactly Offset+Limit
	// points; written as a subtraction so the comparison cannot overflow.
	if itr.opt.Limit > 0 && itr.n-itr.opt.Offset >= itr.opt.Limit {
		return nil
	}

	// Read the next point.
	p := itr.input.Next()
	if p == nil {
		return nil
	}

	// Count the point. Offsets are handled by a higher level iterator, so
	// every point read is returned.
	itr.n++
	return p
}

// floatCursor represents an object for iterating over a single float field.
type floatCursor interface {
cursor
Expand Down Expand Up @@ -604,6 +642,44 @@ func (itr *integerIterator) Stats() influxql.IteratorStats {
// Close closes the iterator.
func (itr *integerIterator) Close() error { return nil }

// integerLimitIterator caps the number of points read from an integer
// iterator at Offset+Limit. It does not skip the offset itself; it only stops
// reading once enough points have been produced for a higher level iterator
// to apply the offset and limit.
type integerLimitIterator struct {
	// input is the wrapped iterator that points are read from.
	input influxql.IntegerIterator
	// opt supplies the Limit and Offset used to cap the stream.
	opt influxql.IteratorOptions
	// n is the number of points returned from input so far.
	n int
}

// newIntegerLimitIterator returns a limit iterator that reads from input and
// uses opt.Limit and opt.Offset to decide when to stop.
func newIntegerLimitIterator(input influxql.IntegerIterator, opt influxql.IteratorOptions) *integerLimitIterator {
	return &integerLimitIterator{
		input: input,
		opt:   opt,
	}
}

// Stats returns the stats of the underlying input iterator.
func (itr *integerLimitIterator) Stats() influxql.IteratorStats { return itr.input.Stats() }

// Close closes the underlying input iterator.
func (itr *integerLimitIterator) Close() error { return itr.input.Close() }

// Next returns the next point from the input. It returns nil when the input
// is exhausted or when Offset+Limit points have already been returned.
func (itr *integerLimitIterator) Next() *influxql.IntegerPoint {
	// A non-positive Limit is treated as "no limit" so that a query with only
	// an offset is not truncated. Otherwise stop after exactly Offset+Limit
	// points; written as a subtraction so the comparison cannot overflow.
	if itr.opt.Limit > 0 && itr.n-itr.opt.Offset >= itr.opt.Limit {
		return nil
	}

	// Read the next point.
	p := itr.input.Next()
	if p == nil {
		return nil
	}

	// Count the point. Offsets are handled by a higher level iterator, so
	// every point read is returned.
	itr.n++
	return p
}

// integerCursor represents an object for iterating over a single integer field.
type integerCursor interface {
cursor
Expand Down Expand Up @@ -978,6 +1054,44 @@ func (itr *stringIterator) Stats() influxql.IteratorStats {
// Close closes the iterator.
func (itr *stringIterator) Close() error { return nil }

// stringLimitIterator caps the number of points read from a string iterator
// at Offset+Limit. It does not skip the offset itself; it only stops reading
// once enough points have been produced for a higher level iterator to apply
// the offset and limit.
type stringLimitIterator struct {
	// input is the wrapped iterator that points are read from.
	input influxql.StringIterator
	// opt supplies the Limit and Offset used to cap the stream.
	opt influxql.IteratorOptions
	// n is the number of points returned from input so far.
	n int
}

// newStringLimitIterator returns a limit iterator that reads from input and
// uses opt.Limit and opt.Offset to decide when to stop.
func newStringLimitIterator(input influxql.StringIterator, opt influxql.IteratorOptions) *stringLimitIterator {
	return &stringLimitIterator{
		input: input,
		opt:   opt,
	}
}

// Stats returns the stats of the underlying input iterator.
func (itr *stringLimitIterator) Stats() influxql.IteratorStats { return itr.input.Stats() }

// Close closes the underlying input iterator.
func (itr *stringLimitIterator) Close() error { return itr.input.Close() }

// Next returns the next point from the input. It returns nil when the input
// is exhausted or when Offset+Limit points have already been returned.
func (itr *stringLimitIterator) Next() *influxql.StringPoint {
	// A non-positive Limit is treated as "no limit" so that a query with only
	// an offset is not truncated. Otherwise stop after exactly Offset+Limit
	// points; written as a subtraction so the comparison cannot overflow.
	if itr.opt.Limit > 0 && itr.n-itr.opt.Offset >= itr.opt.Limit {
		return nil
	}

	// Read the next point.
	p := itr.input.Next()
	if p == nil {
		return nil
	}

	// Count the point. Offsets are handled by a higher level iterator, so
	// every point read is returned.
	itr.n++
	return p
}

// stringCursor represents an object for iterating over a single string field.
type stringCursor interface {
cursor
Expand Down Expand Up @@ -1352,6 +1466,44 @@ func (itr *booleanIterator) Stats() influxql.IteratorStats {
// Close closes the iterator.
func (itr *booleanIterator) Close() error { return nil }

// booleanLimitIterator caps the number of points read from a boolean iterator
// at Offset+Limit. It does not skip the offset itself; it only stops reading
// once enough points have been produced for a higher level iterator to apply
// the offset and limit.
type booleanLimitIterator struct {
	// input is the wrapped iterator that points are read from.
	input influxql.BooleanIterator
	// opt supplies the Limit and Offset used to cap the stream.
	opt influxql.IteratorOptions
	// n is the number of points returned from input so far.
	n int
}

// newBooleanLimitIterator returns a limit iterator that reads from input and
// uses opt.Limit and opt.Offset to decide when to stop.
func newBooleanLimitIterator(input influxql.BooleanIterator, opt influxql.IteratorOptions) *booleanLimitIterator {
	return &booleanLimitIterator{
		input: input,
		opt:   opt,
	}
}

// Stats returns the stats of the underlying input iterator.
func (itr *booleanLimitIterator) Stats() influxql.IteratorStats { return itr.input.Stats() }

// Close closes the underlying input iterator.
func (itr *booleanLimitIterator) Close() error { return itr.input.Close() }

// Next returns the next point from the input. It returns nil when the input
// is exhausted or when Offset+Limit points have already been returned.
func (itr *booleanLimitIterator) Next() *influxql.BooleanPoint {
	// A non-positive Limit is treated as "no limit" so that a query with only
	// an offset is not truncated. Otherwise stop after exactly Offset+Limit
	// points; written as a subtraction so the comparison cannot overflow.
	if itr.opt.Limit > 0 && itr.n-itr.opt.Offset >= itr.opt.Limit {
		return nil
	}

	// Read the next point.
	p := itr.input.Next()
	if p == nil {
		return nil
	}

	// Count the point. Offsets are handled by a higher level iterator, so
	// every point read is returned.
	itr.n++
	return p
}

// booleanCursor represents an object for iterating over a single boolean field.
type booleanCursor interface {
cursor
Expand Down
38 changes: 38 additions & 0 deletions tsdb/engine/tsm1/iterator.gen.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,44 @@ func (itr *{{.name}}Iterator) Stats() influxql.IteratorStats {
// Close closes the iterator.
func (itr *{{.name}}Iterator) Close() error { return nil }

// {{.name}}LimitIterator caps the number of points read from a {{.name}}
// iterator at Offset+Limit. It does not skip the offset itself; it only stops
// reading once enough points have been produced for a higher level iterator
// to apply the offset and limit.
type {{.name}}LimitIterator struct {
	// input is the wrapped iterator that points are read from.
	input influxql.{{.Name}}Iterator
	// opt supplies the Limit and Offset used to cap the stream.
	opt influxql.IteratorOptions
	// n is the number of points returned from input so far.
	n int
}

// new{{.Name}}LimitIterator returns a limit iterator that reads from input and
// uses opt.Limit and opt.Offset to decide when to stop.
func new{{.Name}}LimitIterator(input influxql.{{.Name}}Iterator, opt influxql.IteratorOptions) *{{.name}}LimitIterator {
	return &{{.name}}LimitIterator{
		input: input,
		opt:   opt,
	}
}

// Stats returns the stats of the underlying input iterator.
func (itr *{{.name}}LimitIterator) Stats() influxql.IteratorStats { return itr.input.Stats() }

// Close closes the underlying input iterator.
func (itr *{{.name}}LimitIterator) Close() error { return itr.input.Close() }

// Next returns the next point from the input. It returns nil when the input
// is exhausted or when Offset+Limit points have already been returned.
func (itr *{{.name}}LimitIterator) Next() *influxql.{{.Name}}Point {
	// A non-positive Limit is treated as "no limit" so that a query with only
	// an offset is not truncated. Otherwise stop after exactly Offset+Limit
	// points; written as a subtraction so the comparison cannot overflow.
	if itr.opt.Limit > 0 && itr.n-itr.opt.Offset >= itr.opt.Limit {
		return nil
	}

	// Read the next point.
	p := itr.input.Next()
	if p == nil {
		return nil
	}

	// Count the point. Offsets are handled by a higher level iterator, so
	// every point read is returned.
	itr.n++
	return p
}

// {{.name}}Cursor represents an object for iterating over a single {{.name}} field.
type {{.name}}Cursor interface {
cursor
Expand Down
22 changes: 22 additions & 0 deletions tsdb/engine/tsm1/iterator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package tsm1

import (
"fmt"

"github.com/influxdata/influxdb/influxql"
)

// newLimitIterator wraps input in the limit iterator that matches its point
// type: float, integer, string or boolean. It panics when input is none of
// those iterator types.
func newLimitIterator(input influxql.Iterator, opt influxql.IteratorOptions) influxql.Iterator {
	switch typed := input.(type) {
	case influxql.FloatIterator:
		return newFloatLimitIterator(typed, opt)
	case influxql.IntegerIterator:
		return newIntegerLimitIterator(typed, opt)
	case influxql.StringIterator:
		return newStringLimitIterator(typed, opt)
	case influxql.BooleanIterator:
		return newBooleanLimitIterator(typed, opt)
	}

	// No case matched, so the iterator type has no limit wrapper.
	panic(fmt.Sprintf("unsupported limit iterator type: %T", input))
}

0 comments on commit 93745d9

Please sign in to comment.