diff --git a/cmd/influxd/run/server_test.go b/cmd/influxd/run/server_test.go index 208554f52c4..feeb365f4fe 100644 --- a/cmd/influxd/run/server_test.go +++ b/cmd/influxd/run/server_test.go @@ -4774,7 +4774,7 @@ func TestServer_Query_LimitAndOffset(t *testing.T) { params: url.Values{"db": []string{"db0"}}, }, &Query{ - name: "limit + offset equal to the number of points with group by time", + name: "limit + offset equal to the number of points with group by time", command: `select mean(foo) from "limited" WHERE time >= '2009-11-10T23:00:02Z' AND time < '2009-11-10T23:00:06Z' GROUP BY TIME(1s) LIMIT 3 OFFSET 3`, exp: `{"results":[{"series":[{"name":"limited","columns":["time","mean"],"values":[["2009-11-10T23:00:05Z",5]]}]}]}`, params: url.Values{"db": []string{"db0"}}, @@ -4785,6 +4785,18 @@ func TestServer_Query_LimitAndOffset(t *testing.T) { exp: `{"results":[{}]}`, params: url.Values{"db": []string{"db0"}}, }, + &Query{ + name: "limit - group by tennant", + command: `select foo from "limited" group by tennant limit 1`, + exp: `{"results":[{"series":[{"name":"limited","tags":{"tennant":"paul"},"columns":["time","foo"],"values":[["2009-11-10T23:00:02Z",2]]},{"name":"limited","tags":{"tennant":"todd"},"columns":["time","foo"],"values":[["2009-11-10T23:00:05Z",5]]}]}]}`, + params: url.Values{"db": []string{"db0"}}, + }, + &Query{ + name: "limit and offset - group by tennant", + command: `select foo from "limited" group by tennant limit 1 offset 1`, + exp: `{"results":[{"series":[{"name":"limited","tags":{"tennant":"paul"},"columns":["time","foo"],"values":[["2009-11-10T23:00:03Z",3]]}]}]}`, + params: url.Values{"db": []string{"db0"}}, + }, }...) for i, query := range test.queries { diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index f97a1f17e44..823dfb4da07 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -792,6 +792,7 @@ func (e *Engine) createVarRefIterator(opt influxql.IteratorOptions) ([]influxql. tagSets = influxql.LimitTagSets(tagSets, opt.SLimit, opt.SOffset) for _, t := range tagSets { + inputs := make([]influxql.Iterator, 0, len(t.SeriesKeys)) for i, seriesKey := range t.SeriesKeys { fields := 0 if t.Filters[i] != nil { @@ -802,13 +803,25 @@ func (e *Engine) createVarRefIterator(opt influxql.IteratorOptions) ([]influxql. } } - itr, err := e.createVarRefSeriesIterator(ref, mm, seriesKey, t, t.Filters[i], conditionFields[:fields], opt) + input, err := e.createVarRefSeriesIterator(ref, mm, seriesKey, t, t.Filters[i], conditionFields[:fields], opt) if err != nil { return err - } else if itr == nil { + } else if input == nil { continue } - itrs = append(itrs, itr) + inputs = append(inputs, input) + } + + if len(inputs) > 0 && (opt.Limit > 0 || opt.Offset > 0) { + var itr influxql.Iterator + if opt.MergeSorted() { + itr = influxql.NewSortedMergeIterator(inputs, opt) + } else { + itr = influxql.NewMergeIterator(inputs, opt) + } + itrs = append(itrs, newLimitIterator(itr, opt)) + } else { + itrs = append(itrs, inputs...) } } } diff --git a/tsdb/engine/tsm1/engine_test.go b/tsdb/engine/tsm1/engine_test.go index 5b3c96a5a13..5d5e68534c3 100644 --- a/tsdb/engine/tsm1/engine_test.go +++ b/tsdb/engine/tsm1/engine_test.go @@ -508,7 +508,7 @@ func BenchmarkEngine_CreateIterator_Count_1M(b *testing.B) { } func benchmarkEngineCreateIteratorCount(b *testing.B, pointN int) { - benchmarkCallIterator(b, influxql.IteratorOptions{ + benchmarkIterator(b, influxql.IteratorOptions{ Expr: influxql.MustParseExpr("count(value)"), Sources: []influxql.Source{&influxql.Measurement{Name: "cpu"}}, Ascending: true, @@ -517,7 +517,29 @@ func benchmarkEngineCreateIteratorCount(b *testing.B, pointN int) { }, pointN) } -func benchmarkCallIterator(b *testing.B, opt influxql.IteratorOptions, pointN int) { +func BenchmarkEngine_CreateIterator_Limit_1K(b *testing.B) { + benchmarkEngineCreateIteratorLimit(b, 1000) +} +func BenchmarkEngine_CreateIterator_Limit_100K(b *testing.B) { + benchmarkEngineCreateIteratorLimit(b, 100000) +} +func BenchmarkEngine_CreateIterator_Limit_1M(b *testing.B) { + benchmarkEngineCreateIteratorLimit(b, 1000000) +} + +func benchmarkEngineCreateIteratorLimit(b *testing.B, pointN int) { + benchmarkIterator(b, influxql.IteratorOptions{ + Expr: influxql.MustParseExpr("value"), + Sources: []influxql.Source{&influxql.Measurement{Name: "cpu"}}, + Dimensions: []string{"host"}, + Ascending: true, + StartTime: influxql.MinTime, + EndTime: influxql.MaxTime, + Limit: 10, + }, pointN) +} + +func benchmarkIterator(b *testing.B, opt influxql.IteratorOptions, pointN int) { e := MustInitBenchmarkEngine(pointN) b.ResetTimer() b.ReportAllocs() @@ -536,6 +558,8 @@ var benchmark struct { PointN int } +var hostNames = []string{"A", "B", "C", "D", "E", "F", "G", "H", "I", "J"} + // MustInitBenchmarkEngine creates a new engine and fills it with points. // Reuses previous engine if the same parameters were used. func MustInitBenchmarkEngine(pointN int) *Engine { @@ -567,7 +591,8 @@ func MustInitBenchmarkEngine(pointN int) *Engine { for i := 0; i < pointN; i += batchSize { var buf bytes.Buffer for j := 0; j < batchSize; j++ { - fmt.Fprintf(&buf, "cpu,host=A value=%d %d", + fmt.Fprintf(&buf, "cpu,host=%s value=%d %d", + hostNames[j%len(hostNames)], 100+rand.Intn(50)-25, (time.Duration(i+j)*time.Second)+(time.Duration(rand.Intn(500)-250)*time.Millisecond), ) diff --git a/tsdb/engine/tsm1/iterator.gen.go b/tsdb/engine/tsm1/iterator.gen.go index 6022bd40b7e..b7fa1b3ee24 100644 --- a/tsdb/engine/tsm1/iterator.gen.go +++ b/tsdb/engine/tsm1/iterator.gen.go @@ -230,6 +230,44 @@ func (itr *floatIterator) Stats() influxql.IteratorStats { // Close closes the iterator. func (itr *floatIterator) Close() error { return nil } +// floatLimitIterator +type floatLimitIterator struct { + input influxql.FloatIterator + opt influxql.IteratorOptions + n int +} + +func newFloatLimitIterator(input influxql.FloatIterator, opt influxql.IteratorOptions) *floatLimitIterator { + return &floatLimitIterator{ + input: input, + opt: opt, + } +} + +func (itr *floatLimitIterator) Stats() influxql.IteratorStats { return itr.input.Stats() } +func (itr *floatLimitIterator) Close() error { return itr.input.Close() } + +func (itr *floatLimitIterator) Next() *influxql.FloatPoint { + for { + // Check if we are beyond the limit. + if (itr.n - itr.opt.Offset) > itr.opt.Limit { + return nil + } + + // Read the next point. + p := itr.input.Next() + if p == nil { + return nil + } + + // Increment counter. + itr.n++ + + // Offsets are handled by a higher level iterator so return all points. + return p + } +} + // floatCursor represents an object for iterating over a single float field. type floatCursor interface { cursor @@ -604,6 +642,44 @@ func (itr *integerIterator) Stats() influxql.IteratorStats { // Close closes the iterator. func (itr *integerIterator) Close() error { return nil } +// integerLimitIterator +type integerLimitIterator struct { + input influxql.IntegerIterator + opt influxql.IteratorOptions + n int +} + +func newIntegerLimitIterator(input influxql.IntegerIterator, opt influxql.IteratorOptions) *integerLimitIterator { + return &integerLimitIterator{ + input: input, + opt: opt, + } +} + +func (itr *integerLimitIterator) Stats() influxql.IteratorStats { return itr.input.Stats() } +func (itr *integerLimitIterator) Close() error { return itr.input.Close() } + +func (itr *integerLimitIterator) Next() *influxql.IntegerPoint { + for { + // Check if we are beyond the limit. + if (itr.n - itr.opt.Offset) > itr.opt.Limit { + return nil + } + + // Read the next point. + p := itr.input.Next() + if p == nil { + return nil + } + + // Increment counter. + itr.n++ + + // Offsets are handled by a higher level iterator so return all points. + return p + } +} + // integerCursor represents an object for iterating over a single integer field. type integerCursor interface { cursor @@ -978,6 +1054,44 @@ func (itr *stringIterator) Stats() influxql.IteratorStats { // Close closes the iterator. func (itr *stringIterator) Close() error { return nil } +// stringLimitIterator +type stringLimitIterator struct { + input influxql.StringIterator + opt influxql.IteratorOptions + n int +} + +func newStringLimitIterator(input influxql.StringIterator, opt influxql.IteratorOptions) *stringLimitIterator { + return &stringLimitIterator{ + input: input, + opt: opt, + } +} + +func (itr *stringLimitIterator) Stats() influxql.IteratorStats { return itr.input.Stats() } +func (itr *stringLimitIterator) Close() error { return itr.input.Close() } + +func (itr *stringLimitIterator) Next() *influxql.StringPoint { + for { + // Check if we are beyond the limit. + if (itr.n - itr.opt.Offset) > itr.opt.Limit { + return nil + } + + // Read the next point. + p := itr.input.Next() + if p == nil { + return nil + } + + // Increment counter. + itr.n++ + + // Offsets are handled by a higher level iterator so return all points. + return p + } +} + // stringCursor represents an object for iterating over a single string field. type stringCursor interface { cursor @@ -1352,6 +1466,44 @@ func (itr *booleanIterator) Stats() influxql.IteratorStats { // Close closes the iterator. func (itr *booleanIterator) Close() error { return nil } +// booleanLimitIterator +type booleanLimitIterator struct { + input influxql.BooleanIterator + opt influxql.IteratorOptions + n int +} + +func newBooleanLimitIterator(input influxql.BooleanIterator, opt influxql.IteratorOptions) *booleanLimitIterator { + return &booleanLimitIterator{ + input: input, + opt: opt, + } +} + +func (itr *booleanLimitIterator) Stats() influxql.IteratorStats { return itr.input.Stats() } +func (itr *booleanLimitIterator) Close() error { return itr.input.Close() } + +func (itr *booleanLimitIterator) Next() *influxql.BooleanPoint { + for { + // Check if we are beyond the limit. + if (itr.n - itr.opt.Offset) > itr.opt.Limit { + return nil + } + + // Read the next point. + p := itr.input.Next() + if p == nil { + return nil + } + + // Increment counter. + itr.n++ + + // Offsets are handled by a higher level iterator so return all points. + return p + } +} + // booleanCursor represents an object for iterating over a single boolean field. type booleanCursor interface { cursor diff --git a/tsdb/engine/tsm1/iterator.gen.go.tmpl b/tsdb/engine/tsm1/iterator.gen.go.tmpl index 0f0770a19d3..6cee53453da 100644 --- a/tsdb/engine/tsm1/iterator.gen.go.tmpl +++ b/tsdb/engine/tsm1/iterator.gen.go.tmpl @@ -226,6 +226,44 @@ func (itr *{{.name}}Iterator) Stats() influxql.IteratorStats { // Close closes the iterator. func (itr *{{.name}}Iterator) Close() error { return nil } +// {{.name}}LimitIterator +type {{.name}}LimitIterator struct { + input influxql.{{.Name}}Iterator + opt influxql.IteratorOptions + n int +} + +func new{{.Name}}LimitIterator(input influxql.{{.Name}}Iterator, opt influxql.IteratorOptions) *{{.name}}LimitIterator { + return &{{.name}}LimitIterator{ + input: input, + opt: opt, + } +} + +func (itr *{{.name}}LimitIterator) Stats() influxql.IteratorStats { return itr.input.Stats() } +func (itr *{{.name}}LimitIterator) Close() error { return itr.input.Close() } + +func (itr *{{.name}}LimitIterator) Next() *influxql.{{.Name}}Point { + for { + // Check if we are beyond the limit. + if (itr.n-itr.opt.Offset) > itr.opt.Limit { + return nil + } + + // Read the next point. + p := itr.input.Next() + if p == nil { + return nil + } + + // Increment counter. + itr.n++ + + // Offsets are handled by a higher level iterator so return all points. + return p + } +} + // {{.name}}Cursor represents an object for iterating over a single {{.name}} field. type {{.name}}Cursor interface { cursor diff --git a/tsdb/engine/tsm1/iterator.go b/tsdb/engine/tsm1/iterator.go new file mode 100644 index 00000000000..e9b8416bb1a --- /dev/null +++ b/tsdb/engine/tsm1/iterator.go @@ -0,0 +1,22 @@ +package tsm1 + +import ( + "fmt" + + "github.com/influxdata/influxdb/influxql" +) + +func newLimitIterator(input influxql.Iterator, opt influxql.IteratorOptions) influxql.Iterator { + switch input := input.(type) { + case influxql.FloatIterator: + return newFloatLimitIterator(input, opt) + case influxql.IntegerIterator: + return newIntegerLimitIterator(input, opt) + case influxql.StringIterator: + return newStringLimitIterator(input, opt) + case influxql.BooleanIterator: + return newBooleanLimitIterator(input, opt) + default: + panic(fmt.Sprintf("unsupported limit iterator type: %T", input)) + } +}