Propagate the limit option to the low level iterators
When a GROUP BY or multiple sources are used, the top level limit
iterator has to read the entire iterator stream before it can find all
of the tag groups it needs to return. For large data series, this means
the limit iterator reads and then discards a large number of points.

This change adds a new lower level limit iterator on each series itself
so that fewer data points have to be thrown away by the top level
iterator.

Fixes #5553.
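
The idea behind the change can be illustrated with a minimal, self-contained sketch using toy point and iterator types (not InfluxDB's influxql interfaces or the generated per-type iterators): each series gets its own limit iterator that stops pulling from its cursor once limit + offset points have been returned, so the top level merge and limit iterators never see the remainder of the series.

package main

import "fmt"

// point stands in for an influxql point; the real iterators are generated
// per value type (float, integer, string, boolean) in iterator.gen.go.
type point struct {
	series string
	value  int
}

// iterator stands in for influxql.Iterator: Next returns nil when exhausted.
type iterator interface {
	Next() *point
}

// sliceIterator yields points from a slice and models a single-series cursor.
type sliceIterator struct {
	points []point
	i      int
}

func (itr *sliceIterator) Next() *point {
	if itr.i >= len(itr.points) {
		return nil
	}
	p := &itr.points[itr.i]
	itr.i++
	return p
}

// limitIterator mirrors the shape of the per-series limit iterators added by
// this commit: it stops reading its input once limit+offset points have been
// returned, leaving exact offset handling to the higher level limit iterator.
type limitIterator struct {
	input         iterator
	limit, offset int
	n             int
}

func (itr *limitIterator) Next() *point {
	if itr.n >= itr.limit+itr.offset {
		return nil // nothing past this point can survive the top level LIMIT/OFFSET
	}
	p := itr.input.Next()
	if p == nil {
		return nil
	}
	itr.n++
	return p
}

func main() {
	// Two series of the same measurement, as produced by GROUP BY on a tag.
	seriesA := &sliceIterator{points: []point{{"A", 1}, {"A", 2}, {"A", 3}}}
	seriesB := &sliceIterator{points: []point{{"B", 4}, {"B", 5}, {"B", 6}}}

	// LIMIT 1 OFFSET 1: each series needs to surface at most two points, so
	// the per-series limit iterators stop the cursors early instead of
	// streaming every point up to the top level iterator.
	for _, s := range []iterator{seriesA, seriesB} {
		itr := &limitIterator{input: s, limit: 1, offset: 1}
		for p := itr.Next(); p != nil; p = itr.Next() {
			fmt.Println(p.series, p.value)
		}
	}
}

Exact LIMIT/OFFSET semantics are still enforced by the higher level iterator; the per-series wrapper only guarantees that it never emits more points than that iterator could possibly need.
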
jsternberg committed Apr 15, 2016
1 parent 6f5c72e commit bd5fdd7
Showing 6 changed files with 269 additions and 7 deletions.
14 changes: 13 additions & 1 deletion cmd/influxd/run/server_test.go
@@ -4774,7 +4774,7 @@ func TestServer_Query_LimitAndOffset(t *testing.T) {
params: url.Values{"db": []string{"db0"}},
},
&Query{
name: "limit + offset equal to the number of points with group by time",
name: "limit + offset equal to the number of points with group by time",
command: `select mean(foo) from "limited" WHERE time >= '2009-11-10T23:00:02Z' AND time < '2009-11-10T23:00:06Z' GROUP BY TIME(1s) LIMIT 3 OFFSET 3`,
exp: `{"results":[{"series":[{"name":"limited","columns":["time","mean"],"values":[["2009-11-10T23:00:05Z",5]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
@@ -4785,6 +4785,18 @@ func TestServer_Query_LimitAndOffset(t *testing.T) {
exp: `{"results":[{}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
name: "limit - group by tennant",
command: `select foo from "limited" group by tennant limit 1`,
exp: `{"results":[{"series":[{"name":"limited","tags":{"tennant":"paul"},"columns":["time","foo"],"values":[["2009-11-10T23:00:02Z",2]]},{"name":"limited","tags":{"tennant":"todd"},"columns":["time","foo"],"values":[["2009-11-10T23:00:05Z",5]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
name: "limit and offset - group by tennant",
command: `select foo from "limited" group by tennant limit 1 offset 1`,
exp: `{"results":[{"series":[{"name":"limited","tags":{"tennant":"paul"},"columns":["time","foo"],"values":[["2009-11-10T23:00:03Z",3]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
}...)

for i, query := range test.queries {
19 changes: 16 additions & 3 deletions tsdb/engine/tsm1/engine.go
@@ -801,6 +801,7 @@ func (e *Engine) createVarRefIterator(opt influxql.IteratorOptions) ([]influxql.
tagSets = influxql.LimitTagSets(tagSets, opt.SLimit, opt.SOffset)

for _, t := range tagSets {
inputs := make([]influxql.Iterator, 0, len(t.SeriesKeys))
for i, seriesKey := range t.SeriesKeys {
fields := 0
if t.Filters[i] != nil {
@@ -811,13 +812,25 @@ func (e *Engine) createVarRefIterator(opt influxql.IteratorOptions) ([]influxql.
}
}

itr, err := e.createVarRefSeriesIterator(ref, mm, seriesKey, t, t.Filters[i], conditionFields[:fields], opt)
input, err := e.createVarRefSeriesIterator(ref, mm, seriesKey, t, t.Filters[i], conditionFields[:fields], opt)
if err != nil {
return err
} else if itr == nil {
} else if input == nil {
continue
}
itrs = append(itrs, itr)
inputs = append(inputs, input)
}

if len(inputs) > 0 && (opt.Limit > 0 || opt.Offset > 0) {
var itr influxql.Iterator
if opt.MergeSorted() {
itr = influxql.NewSortedMergeIterator(inputs, opt)
} else {
itr = influxql.NewMergeIterator(inputs, opt)
}
itrs = append(itrs, newLimitIterator(itr, opt))
} else {
itrs = append(itrs, inputs...)
}
}
}
31 changes: 28 additions & 3 deletions tsdb/engine/tsm1/engine_test.go
@@ -508,7 +508,7 @@ func BenchmarkEngine_CreateIterator_Count_1M(b *testing.B) {
}

func benchmarkEngineCreateIteratorCount(b *testing.B, pointN int) {
benchmarkCallIterator(b, influxql.IteratorOptions{
benchmarkIterator(b, influxql.IteratorOptions{
Expr: influxql.MustParseExpr("count(value)"),
Sources: []influxql.Source{&influxql.Measurement{Name: "cpu"}},
Ascending: true,
@@ -517,7 +517,29 @@ func benchmarkEngineCreateIteratorCount(b *testing.B, pointN int) {
}, pointN)
}

func benchmarkCallIterator(b *testing.B, opt influxql.IteratorOptions, pointN int) {
func BenchmarkEngine_CreateIterator_Limit_1K(b *testing.B) {
benchmarkEngineCreateIteratorLimit(b, 1000)
}
func BenchmarkEngine_CreateIterator_Limit_100K(b *testing.B) {
benchmarkEngineCreateIteratorLimit(b, 100000)
}
func BenchmarkEngine_CreateIterator_Limit_1M(b *testing.B) {
benchmarkEngineCreateIteratorLimit(b, 1000000)
}

func benchmarkEngineCreateIteratorLimit(b *testing.B, pointN int) {
benchmarkIterator(b, influxql.IteratorOptions{
Expr: influxql.MustParseExpr("value"),
Sources: []influxql.Source{&influxql.Measurement{Name: "cpu"}},
Dimensions: []string{"host"},
Ascending: true,
StartTime: influxql.MinTime,
EndTime: influxql.MaxTime,
Limit: 10,
}, pointN)
}

func benchmarkIterator(b *testing.B, opt influxql.IteratorOptions, pointN int) {
e := MustInitBenchmarkEngine(pointN)
b.ResetTimer()
b.ReportAllocs()
@@ -536,6 +558,8 @@ var benchmark struct {
PointN int
}

var hostNames = []string{"A", "B", "C", "D", "E", "F", "G", "H", "I", "J"}

// MustInitBenchmarkEngine creates a new engine and fills it with points.
// Reuses previous engine if the same parameters were used.
func MustInitBenchmarkEngine(pointN int) *Engine {
@@ -567,7 +591,8 @@ func MustInitBenchmarkEngine(pointN int) *Engine {
for i := 0; i < pointN; i += batchSize {
var buf bytes.Buffer
for j := 0; j < batchSize; j++ {
fmt.Fprintf(&buf, "cpu,host=A value=%d %d",
fmt.Fprintf(&buf, "cpu,host=%s value=%d %d",
hostNames[j%len(hostNames)],
100+rand.Intn(50)-25,
(time.Duration(i+j)*time.Second)+(time.Duration(rand.Intn(500)-250)*time.Millisecond),
)
152 changes: 152 additions & 0 deletions tsdb/engine/tsm1/iterator.gen.go
@@ -230,6 +230,44 @@ func (itr *floatIterator) Stats() influxql.IteratorStats {
// Close closes the iterator.
func (itr *floatIterator) Close() error { return nil }

// floatLimitIterator
type floatLimitIterator struct {
input influxql.FloatIterator
opt influxql.IteratorOptions
n int
}

func newFloatLimitIterator(input influxql.FloatIterator, opt influxql.IteratorOptions) *floatLimitIterator {
return &floatLimitIterator{
input: input,
opt: opt,
}
}

func (itr *floatLimitIterator) Stats() influxql.IteratorStats { return itr.input.Stats() }
func (itr *floatLimitIterator) Close() error { return itr.input.Close() }

func (itr *floatLimitIterator) Next() *influxql.FloatPoint {
for {
// Check if we are beyond the limit.
if (itr.n - itr.opt.Offset) > itr.opt.Limit {
return nil
}

// Read the next point.
p := itr.input.Next()
if p == nil {
return nil
}

// Increment counter.
itr.n++

// Offsets are handled by a higher level iterator so return all points.
return p
}
}

// floatCursor represents an object for iterating over a single float field.
type floatCursor interface {
cursor
@@ -604,6 +642,44 @@ func (itr *integerIterator) Stats() influxql.IteratorStats {
// Close closes the iterator.
func (itr *integerIterator) Close() error { return nil }

// integerLimitIterator
type integerLimitIterator struct {
input influxql.IntegerIterator
opt influxql.IteratorOptions
n int
}

func newIntegerLimitIterator(input influxql.IntegerIterator, opt influxql.IteratorOptions) *integerLimitIterator {
return &integerLimitIterator{
input: input,
opt: opt,
}
}

func (itr *integerLimitIterator) Stats() influxql.IteratorStats { return itr.input.Stats() }
func (itr *integerLimitIterator) Close() error { return itr.input.Close() }

func (itr *integerLimitIterator) Next() *influxql.IntegerPoint {
for {
// Check if we are beyond the limit.
if (itr.n - itr.opt.Offset) > itr.opt.Limit {
return nil
}

// Read the next point.
p := itr.input.Next()
if p == nil {
return nil
}

// Increment counter.
itr.n++

// Offsets are handled by a higher level iterator so return all points.
return p
}
}

// integerCursor represents an object for iterating over a single integer field.
type integerCursor interface {
cursor
@@ -978,6 +1054,44 @@ func (itr *stringIterator) Stats() influxql.IteratorStats {
// Close closes the iterator.
func (itr *stringIterator) Close() error { return nil }

// stringLimitIterator
type stringLimitIterator struct {
input influxql.StringIterator
opt influxql.IteratorOptions
n int
}

func newStringLimitIterator(input influxql.StringIterator, opt influxql.IteratorOptions) *stringLimitIterator {
return &stringLimitIterator{
input: input,
opt: opt,
}
}

func (itr *stringLimitIterator) Stats() influxql.IteratorStats { return itr.input.Stats() }
func (itr *stringLimitIterator) Close() error { return itr.input.Close() }

func (itr *stringLimitIterator) Next() *influxql.StringPoint {
for {
// Check if we are beyond the limit.
if (itr.n - itr.opt.Offset) > itr.opt.Limit {
return nil
}

// Read the next point.
p := itr.input.Next()
if p == nil {
return nil
}

// Increment counter.
itr.n++

// Offsets are handled by a higher level iterator so return all points.
return p
}
}

// stringCursor represents an object for iterating over a single string field.
type stringCursor interface {
cursor
@@ -1352,6 +1466,44 @@ func (itr *booleanIterator) Stats() influxql.IteratorStats {
// Close closes the iterator.
func (itr *booleanIterator) Close() error { return nil }

// booleanLimitIterator
type booleanLimitIterator struct {
input influxql.BooleanIterator
opt influxql.IteratorOptions
n int
}

func newBooleanLimitIterator(input influxql.BooleanIterator, opt influxql.IteratorOptions) *booleanLimitIterator {
return &booleanLimitIterator{
input: input,
opt: opt,
}
}

func (itr *booleanLimitIterator) Stats() influxql.IteratorStats { return itr.input.Stats() }
func (itr *booleanLimitIterator) Close() error { return itr.input.Close() }

func (itr *booleanLimitIterator) Next() *influxql.BooleanPoint {
for {
// Check if we are beyond the limit.
if (itr.n - itr.opt.Offset) > itr.opt.Limit {
return nil
}

// Read the next point.
p := itr.input.Next()
if p == nil {
return nil
}

// Increment counter.
itr.n++

// Offsets are handled by a higher level iterator so return all points.
return p
}
}

// booleanCursor represents an object for iterating over a single boolean field.
type booleanCursor interface {
cursor
38 changes: 38 additions & 0 deletions tsdb/engine/tsm1/iterator.gen.go.tmpl
@@ -226,6 +226,44 @@ func (itr *{{.name}}Iterator) Stats() influxql.IteratorStats {
// Close closes the iterator.
func (itr *{{.name}}Iterator) Close() error { return nil }

// {{.name}}LimitIterator
type {{.name}}LimitIterator struct {
input influxql.{{.Name}}Iterator
opt influxql.IteratorOptions
n int
}

func new{{.Name}}LimitIterator(input influxql.{{.Name}}Iterator, opt influxql.IteratorOptions) *{{.name}}LimitIterator {
return &{{.name}}LimitIterator{
input: input,
opt: opt,
}
}

func (itr *{{.name}}LimitIterator) Stats() influxql.IteratorStats { return itr.input.Stats() }
func (itr *{{.name}}LimitIterator) Close() error { return itr.input.Close() }

func (itr *{{.name}}LimitIterator) Next() *influxql.{{.Name}}Point {
for {
// Check if we are beyond the limit.
if (itr.n-itr.opt.Offset) > itr.opt.Limit {
return nil
}

// Read the next point.
p := itr.input.Next()
if p == nil {
return nil
}

// Increment counter.
itr.n++

// Offsets are handled by a higher level iterator so return all points.
return p
}
}

// {{.name}}Cursor represents an object for iterating over a single {{.name}} field.
type {{.name}}Cursor interface {
cursor
22 changes: 22 additions & 0 deletions tsdb/engine/tsm1/iterator.go
@@ -0,0 +1,22 @@
package tsm1

import (
"fmt"

"github.com/influxdata/influxdb/influxql"
)

func newLimitIterator(input influxql.Iterator, opt influxql.IteratorOptions) influxql.Iterator {
switch input := input.(type) {
case influxql.FloatIterator:
return newFloatLimitIterator(input, opt)
case influxql.IntegerIterator:
return newIntegerLimitIterator(input, opt)
case influxql.StringIterator:
return newStringLimitIterator(input, opt)
case influxql.BooleanIterator:
return newBooleanLimitIterator(input, opt)
default:
panic(fmt.Sprintf("unsupported limit iterator type: %T", input))
}
}
