Skip to content

Commit

Permalink
Create a new interrupt iterator that will stop emitting points after …
Browse files Browse the repository at this point in the history
…an interrupt

Use of the iterator is spread out into both `IteratorCreators` and
inside of the iterators themselves. Part of the interrupt must be
handled inside of the engine so it stops trying to emit points when an
interrupt is found and another part of the interrupt has to happen when
combining the iterators so it doesn't just start reading the next shard.
  • Loading branch information
jsternberg committed Mar 15, 2016
1 parent 30a6852 commit 1db7c9e
Show file tree
Hide file tree
Showing 7 changed files with 217 additions and 6 deletions.
2 changes: 1 addition & 1 deletion cluster/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ func (e *QueryExecutor) executeSetPasswordUserStatement(q *influxql.SetPasswordU
func (e *QueryExecutor) executeSelectStatement(stmt *influxql.SelectStatement, chunkSize, statementID int, results chan *influxql.Result, closing <-chan struct{}) error {
// It is important to "stamp" this time so that everywhere we evaluate `now()` in the statement is EXACTLY the same `now`
now := time.Now().UTC()
opt := influxql.SelectOptions{}
opt := influxql.SelectOptions{InterruptCh: closing}

// Replace instances of "now()" with the current time, and check the resultant times.
stmt.Condition = influxql.Reduce(stmt.Condition, &influxql.NowValuer{Now: now})
Expand Down
5 changes: 3 additions & 2 deletions influxql/call_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -579,8 +579,9 @@ func BenchmarkCallIterator_Min_Float(b *testing.B) {
b.ReportAllocs()

itr, err := influxql.NewCallIterator(input, influxql.IteratorOptions{
Expr: MustParseExpr("min(value)"),
Interval: influxql.Interval{Duration: 1 * time.Hour},
Expr: MustParseExpr("min(value)"),
Interval: influxql.Interval{Duration: 1 * time.Hour},
InterruptCh: make(chan struct{}),
})
if err != nil {
b.Fatal(err)
Expand Down
132 changes: 132 additions & 0 deletions influxql/iterator.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,39 @@ func (itr *floatIntervalIterator) Next() *FloatPoint {
return p
}

// floatInterruptIterator represents a float implementation of InterruptIterator.
type floatInterruptIterator struct {
input FloatIterator
closing <-chan struct{}
count int
}

func newFloatInterruptIterator(input FloatIterator, closing <-chan struct{}) *floatInterruptIterator {
return &floatInterruptIterator{input: input, closing: closing}
}

func (itr *floatInterruptIterator) Close() error { return itr.input.Close() }

func (itr *floatInterruptIterator) Next() *FloatPoint {
// Only check if the channel is closed every 256 points. This
// intentionally checks on both 0 and 256 so that if the iterator
// has been interrupted before the first point is emitted it will
// not emit any points.
if itr.count&0x100 == 0 {
select {
case <-itr.closing:
return nil
default:
// Reset iterator count to zero and fall through to emit the next point.
itr.count = 0
}
}

// Increment the counter for every point read.
itr.count++
return itr.input.Next()
}

// floatAuxIterator represents a float implementation of AuxIterator.
type floatAuxIterator struct {
input *bufFloatIterator
Expand Down Expand Up @@ -1894,6 +1927,39 @@ func (itr *integerIntervalIterator) Next() *IntegerPoint {
return p
}

// integerInterruptIterator represents a integer implementation of InterruptIterator.
type integerInterruptIterator struct {
input IntegerIterator
closing <-chan struct{}
count int
}

func newIntegerInterruptIterator(input IntegerIterator, closing <-chan struct{}) *integerInterruptIterator {
return &integerInterruptIterator{input: input, closing: closing}
}

func (itr *integerInterruptIterator) Close() error { return itr.input.Close() }

func (itr *integerInterruptIterator) Next() *IntegerPoint {
// Only check if the channel is closed every 256 points. This
// intentionally checks on both 0 and 256 so that if the iterator
// has been interrupted before the first point is emitted it will
// not emit any points.
if itr.count&0x100 == 0 {
select {
case <-itr.closing:
return nil
default:
// Reset iterator count to zero and fall through to emit the next point.
itr.count = 0
}
}

// Increment the counter for every point read.
itr.count++
return itr.input.Next()
}

// integerAuxIterator represents a integer implementation of AuxIterator.
type integerAuxIterator struct {
input *bufIntegerIterator
Expand Down Expand Up @@ -3215,6 +3281,39 @@ func (itr *stringIntervalIterator) Next() *StringPoint {
return p
}

// stringInterruptIterator represents a string implementation of InterruptIterator.
type stringInterruptIterator struct {
input StringIterator
closing <-chan struct{}
count int
}

func newStringInterruptIterator(input StringIterator, closing <-chan struct{}) *stringInterruptIterator {
return &stringInterruptIterator{input: input, closing: closing}
}

func (itr *stringInterruptIterator) Close() error { return itr.input.Close() }

func (itr *stringInterruptIterator) Next() *StringPoint {
// Only check if the channel is closed every 256 points. This
// intentionally checks on both 0 and 256 so that if the iterator
// has been interrupted before the first point is emitted it will
// not emit any points.
if itr.count&0x100 == 0 {
select {
case <-itr.closing:
return nil
default:
// Reset iterator count to zero and fall through to emit the next point.
itr.count = 0
}
}

// Increment the counter for every point read.
itr.count++
return itr.input.Next()
}

// stringAuxIterator represents a string implementation of AuxIterator.
type stringAuxIterator struct {
input *bufStringIterator
Expand Down Expand Up @@ -4536,6 +4635,39 @@ func (itr *booleanIntervalIterator) Next() *BooleanPoint {
return p
}

// booleanInterruptIterator represents a boolean implementation of InterruptIterator.
type booleanInterruptIterator struct {
input BooleanIterator
closing <-chan struct{}
count int
}

func newBooleanInterruptIterator(input BooleanIterator, closing <-chan struct{}) *booleanInterruptIterator {
return &booleanInterruptIterator{input: input, closing: closing}
}

func (itr *booleanInterruptIterator) Close() error { return itr.input.Close() }

func (itr *booleanInterruptIterator) Next() *BooleanPoint {
// Only check if the channel is closed every 256 points. This
// intentionally checks on both 0 and 256 so that if the iterator
// has been interrupted before the first point is emitted it will
// not emit any points.
if itr.count&0x100 == 0 {
select {
case <-itr.closing:
return nil
default:
// Reset iterator count to zero and fall through to emit the next point.
itr.count = 0
}
}

// Increment the counter for every point read.
itr.count++
return itr.input.Next()
}

// booleanAuxIterator represents a boolean implementation of AuxIterator.
type booleanAuxIterator struct {
input *bufBooleanIterator
Expand Down
33 changes: 33 additions & 0 deletions influxql/iterator.gen.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,39 @@ func (itr *{{$k.name}}IntervalIterator) Next() *{{$k.Name}}Point {
return p
}

// {{$k.name}}InterruptIterator represents a {{$k.name}} implementation of InterruptIterator.
type {{$k.name}}InterruptIterator struct {
input {{$k.Name}}Iterator
closing <-chan struct{}
count int
}

func new{{$k.Name}}InterruptIterator(input {{$k.Name}}Iterator, closing <-chan struct{}) *{{$k.name}}InterruptIterator {
return &{{$k.name}}InterruptIterator{input: input, closing: closing}
}

func (itr *{{$k.name}}InterruptIterator) Close() error { return itr.input.Close() }

func (itr *{{$k.name}}InterruptIterator) Next() *{{$k.Name}}Point {
// Only check if the channel is closed every 256 points. This
// intentionally checks on both 0 and 256 so that if the iterator
// has been interrupted before the first point is emitted it will
// not emit any points.
if itr.count&0x100 == 0 {
select {
case <-itr.closing:
return nil
default:
// Reset iterator count to zero and fall through to emit the next point.
itr.count = 0
}
}

// Increment the counter for every point read.
itr.count++
return itr.input.Next()
}

// {{$k.name}}AuxIterator represents a {{$k.name}} implementation of AuxIterator.
type {{$k.name}}AuxIterator struct {
input *buf{{$k.Name}}Iterator
Expand Down
34 changes: 33 additions & 1 deletion influxql/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,23 @@ func NewIntervalIterator(input Iterator, opt IteratorOptions) Iterator {
}
}

// NewInterruptIterator returns an iterator that will stop producing output when a channel
// has been closed on the passed in channel.
func NewInterruptIterator(input Iterator, closing <-chan struct{}) Iterator {
switch input := input.(type) {
case FloatIterator:
return newFloatInterruptIterator(input, closing)
case IntegerIterator:
return newIntegerInterruptIterator(input, closing)
case StringIterator:
return newStringInterruptIterator(input, closing)
case BooleanIterator:
return newBooleanInterruptIterator(input, closing)
default:
panic(fmt.Sprintf("unsupported fill iterator type: %T", input))
}
}

// AuxIterator represents an iterator that can split off separate auxilary iterators.
type AuxIterator interface {
Iterator
Expand Down Expand Up @@ -466,7 +483,11 @@ func (a IteratorCreators) CreateIterator(opt IteratorOptions) (Iterator, error)

// Merge into a single iterator.
if opt.MergeSorted() {
return NewSortedMergeIterator(itrs, opt), nil
itr := NewSortedMergeIterator(itrs, opt)
if opt.InterruptCh != nil {
itr = NewInterruptIterator(itr, opt.InterruptCh)
}
return itr, nil
}

itr := NewMergeIterator(itrs, opt)
Expand All @@ -478,6 +499,10 @@ func (a IteratorCreators) CreateIterator(opt IteratorOptions) (Iterator, error)
}
}
}

if opt.InterruptCh != nil {
itr = NewInterruptIterator(itr, opt.InterruptCh)
}
return NewCallIterator(itr, opt)
}

Expand Down Expand Up @@ -604,6 +629,10 @@ type IteratorOptions struct {

// Removes duplicate rows from raw queries.
Dedupe bool

// If this channel is set and is closed, the iterator should try to exit
// and close as soon as possible.
InterruptCh <-chan struct{}
}

// newIteratorOptionsStmt creates the iterator options from stmt.
Expand Down Expand Up @@ -655,6 +684,9 @@ func newIteratorOptionsStmt(stmt *SelectStatement, sopt *SelectOptions) (opt Ite
opt.Fill, opt.FillValue = stmt.Fill, stmt.FillValue
opt.Limit, opt.Offset = stmt.Limit, stmt.Offset
opt.SLimit, opt.SOffset = stmt.SLimit, stmt.SOffset
if sopt != nil {
opt.InterruptCh = sopt.InterruptCh
}

return opt, nil
}
Expand Down
4 changes: 4 additions & 0 deletions influxql/select.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ type SelectOptions struct {

// The upper bound for a select call.
MaxTime time.Time

// An optional channel that, if closed, signals that the select should be
// interrupted.
InterruptCh <-chan struct{}
}

// Select executes stmt against ic and returns a list of iterators to stream from.
Expand Down
13 changes: 11 additions & 2 deletions tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -686,14 +686,23 @@ func (e *Engine) CreateIterator(opt influxql.IteratorOptions) (influxql.Iterator
if err != nil {
return nil, err
}
return influxql.NewCallIterator(influxql.NewMergeIterator(inputs, opt), opt)

input := influxql.NewMergeIterator(inputs, opt)
if opt.InterruptCh != nil {
input = influxql.NewInterruptIterator(input, opt.InterruptCh)
}
return influxql.NewCallIterator(input, opt)
}

itrs, err := e.createVarRefIterator(opt)
if err != nil {
return nil, err
}
return influxql.NewSortedMergeIterator(itrs, opt), nil
itr := influxql.NewSortedMergeIterator(itrs, opt)
if opt.InterruptCh != nil {
itr = influxql.NewInterruptIterator(itr, opt.InterruptCh)
}
return itr, nil
}

func (e *Engine) SeriesKeys(opt influxql.IteratorOptions) (influxql.SeriesList, error) {
Expand Down

0 comments on commit 1db7c9e

Please sign in to comment.