From 078e56182095b0456fe63ed47413fbe06242d4e3 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Fri, 6 May 2016 10:01:40 -0600 Subject: [PATCH] parallelize iterators --- CHANGELOG.md | 1 + influxql/iterator.gen.go | 236 ++++++++++++++++++++++++++++++++++ influxql/iterator.gen.go.tmpl | 60 ++++++++- influxql/iterator.go | 55 ++++++++ tsdb/engine/tsm1/engine.go | 3 +- 5 files changed, 353 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 735f9429e16..275b45a2bc0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,6 +30,7 @@ With this release InfluxDB is moving to Go v1.6. - [#6533](https://github.com/influxdata/influxdb/issues/6533): Optimize SHOW SERIES - [#6534](https://github.com/influxdata/influxdb/pull/6534): Move to Go v1.6.2 (over Go v1.4.3) - [#6522](https://github.com/influxdata/influxdb/pull/6522): Dump TSM files to line protocol +- [#6585](https://github.com/influxdata/influxdb/pull/6585): Parallelize iterators ### Bugfixes diff --git a/influxql/iterator.gen.go b/influxql/iterator.gen.go index e9926deede2..40c6149fe45 100644 --- a/influxql/iterator.gen.go +++ b/influxql/iterator.gen.go @@ -435,6 +435,65 @@ type floatSortedMergeHeapItem struct { ascending bool } +// floatParallelIterator represents an iterator that pulls data in a separate goroutine. +type floatParallelIterator struct { + input FloatIterator + ch chan floatPointError + + once sync.Once + closing chan struct{} +} + +// newFloatParallelIterator returns a new instance of floatParallelIterator. +func newFloatParallelIterator(input FloatIterator) *floatParallelIterator { + itr := &floatParallelIterator{ + input: input, + ch: make(chan floatPointError, 1), + closing: make(chan struct{}), + } + go itr.monitor() + return itr +} + +// Stats returns stats from the underlying iterator. +func (itr *floatParallelIterator) Stats() IteratorStats { return itr.input.Stats() } + +// Close closes the underlying iterators. +func (itr *floatParallelIterator) Close() error { + itr.once.Do(func() { close(itr.closing) }) + return itr.input.Close() +} + +// Next returns the next point from the iterator. +func (itr *floatParallelIterator) Next() (*FloatPoint, error) { + v, ok := <-itr.ch + if !ok { + return nil, io.EOF + } + return v.point, v.err +} + +// monitor runs in a separate goroutine and actively pulls the next point. +func (itr *floatParallelIterator) monitor() { + defer close(itr.ch) + + for { + // Read next point. + p, err := itr.input.Next() + + select { + case <-itr.closing: + return + case itr.ch <- floatPointError{point: p, err: err}: + } + } +} + +type floatPointError struct { + point *FloatPoint + err error +} + // floatLimitIterator represents an iterator that limits points per group. type floatLimitIterator struct { input FloatIterator @@ -2384,6 +2443,65 @@ type integerSortedMergeHeapItem struct { ascending bool } +// integerParallelIterator represents an iterator that pulls data in a separate goroutine. +type integerParallelIterator struct { + input IntegerIterator + ch chan integerPointError + + once sync.Once + closing chan struct{} +} + +// newIntegerParallelIterator returns a new instance of integerParallelIterator. +func newIntegerParallelIterator(input IntegerIterator) *integerParallelIterator { + itr := &integerParallelIterator{ + input: input, + ch: make(chan integerPointError, 1), + closing: make(chan struct{}), + } + go itr.monitor() + return itr +} + +// Stats returns stats from the underlying iterator. +func (itr *integerParallelIterator) Stats() IteratorStats { return itr.input.Stats() } + +// Close closes the underlying iterators. +func (itr *integerParallelIterator) Close() error { + itr.once.Do(func() { close(itr.closing) }) + return itr.input.Close() +} + +// Next returns the next point from the iterator. +func (itr *integerParallelIterator) Next() (*IntegerPoint, error) { + v, ok := <-itr.ch + if !ok { + return nil, io.EOF + } + return v.point, v.err +} + +// monitor runs in a separate goroutine and actively pulls the next point. +func (itr *integerParallelIterator) monitor() { + defer close(itr.ch) + + for { + // Read next point. + p, err := itr.input.Next() + + select { + case <-itr.closing: + return + case itr.ch <- integerPointError{point: p, err: err}: + } + } +} + +type integerPointError struct { + point *IntegerPoint + err error +} + // integerLimitIterator represents an iterator that limits points per group. type integerLimitIterator struct { input IntegerIterator @@ -4330,6 +4448,65 @@ type stringSortedMergeHeapItem struct { ascending bool } +// stringParallelIterator represents an iterator that pulls data in a separate goroutine. +type stringParallelIterator struct { + input StringIterator + ch chan stringPointError + + once sync.Once + closing chan struct{} +} + +// newStringParallelIterator returns a new instance of stringParallelIterator. +func newStringParallelIterator(input StringIterator) *stringParallelIterator { + itr := &stringParallelIterator{ + input: input, + ch: make(chan stringPointError, 1), + closing: make(chan struct{}), + } + go itr.monitor() + return itr +} + +// Stats returns stats from the underlying iterator. +func (itr *stringParallelIterator) Stats() IteratorStats { return itr.input.Stats() } + +// Close closes the underlying iterators. +func (itr *stringParallelIterator) Close() error { + itr.once.Do(func() { close(itr.closing) }) + return itr.input.Close() +} + +// Next returns the next point from the iterator. +func (itr *stringParallelIterator) Next() (*StringPoint, error) { + v, ok := <-itr.ch + if !ok { + return nil, io.EOF + } + return v.point, v.err +} + +// monitor runs in a separate goroutine and actively pulls the next point. +func (itr *stringParallelIterator) monitor() { + defer close(itr.ch) + + for { + // Read next point. + p, err := itr.input.Next() + + select { + case <-itr.closing: + return + case itr.ch <- stringPointError{point: p, err: err}: + } + } +} + +type stringPointError struct { + point *StringPoint + err error +} + // stringLimitIterator represents an iterator that limits points per group. type stringLimitIterator struct { input StringIterator @@ -6276,6 +6453,65 @@ type booleanSortedMergeHeapItem struct { ascending bool } +// booleanParallelIterator represents an iterator that pulls data in a separate goroutine. +type booleanParallelIterator struct { + input BooleanIterator + ch chan booleanPointError + + once sync.Once + closing chan struct{} +} + +// newBooleanParallelIterator returns a new instance of booleanParallelIterator. +func newBooleanParallelIterator(input BooleanIterator) *booleanParallelIterator { + itr := &booleanParallelIterator{ + input: input, + ch: make(chan booleanPointError, 1), + closing: make(chan struct{}), + } + go itr.monitor() + return itr +} + +// Stats returns stats from the underlying iterator. +func (itr *booleanParallelIterator) Stats() IteratorStats { return itr.input.Stats() } + +// Close closes the underlying iterators. +func (itr *booleanParallelIterator) Close() error { + itr.once.Do(func() { close(itr.closing) }) + return itr.input.Close() +} + +// Next returns the next point from the iterator. +func (itr *booleanParallelIterator) Next() (*BooleanPoint, error) { + v, ok := <-itr.ch + if !ok { + return nil, io.EOF + } + return v.point, v.err +} + +// monitor runs in a separate goroutine and actively pulls the next point. +func (itr *booleanParallelIterator) monitor() { + defer close(itr.ch) + + for { + // Read next point. + p, err := itr.input.Next() + + select { + case <-itr.closing: + return + case itr.ch <- booleanPointError{point: p, err: err}: + } + } +} + +type booleanPointError struct { + point *BooleanPoint + err error +} + // booleanLimitIterator represents an iterator that limits points per group. type booleanLimitIterator struct { input BooleanIterator diff --git a/influxql/iterator.gen.go.tmpl b/influxql/iterator.gen.go.tmpl index 776338d78b2..b9cc9aa48f0 100644 --- a/influxql/iterator.gen.go.tmpl +++ b/influxql/iterator.gen.go.tmpl @@ -300,7 +300,6 @@ type {{$k.name}}MergeHeapItem struct { err error } - // {{$k.name}}SortedMergeIterator is an iterator that sorts and merges multiple iterators into one. type {{$k.name}}SortedMergeIterator struct { inputs []{{$k.Name}}Iterator @@ -434,6 +433,65 @@ type {{$k.name}}SortedMergeHeapItem struct { ascending bool } +// {{$k.name}}ParallelIterator represents an iterator that pulls data in a separate goroutine. +type {{$k.name}}ParallelIterator struct { + input {{$k.Name}}Iterator + ch chan {{$k.name}}PointError + + once sync.Once + closing chan struct{} +} + +// new{{$k.Name}}ParallelIterator returns a new instance of {{$k.name}}ParallelIterator. +func new{{$k.Name}}ParallelIterator(input {{$k.Name}}Iterator) *{{$k.name}}ParallelIterator { + itr := &{{$k.name}}ParallelIterator{ + input: input, + ch: make(chan {{$k.name}}PointError, 1), + closing: make(chan struct{}), + } + go itr.monitor() + return itr +} + +// Stats returns stats from the underlying iterator. +func (itr *{{$k.name}}ParallelIterator) Stats() IteratorStats { return itr.input.Stats() } + +// Close closes the underlying iterators. +func (itr *{{$k.name}}ParallelIterator) Close() error { + itr.once.Do(func() { close(itr.closing) }) + return itr.input.Close() +} + +// Next returns the next point from the iterator. +func (itr *{{$k.name}}ParallelIterator) Next() (*{{$k.Name}}Point, error) { + v, ok := <-itr.ch + if !ok { + return nil, io.EOF + } + return v.point, v.err +} + +// monitor runs in a separate goroutine and actively pulls the next point. +func (itr *{{$k.name}}ParallelIterator) monitor() { + defer close(itr.ch) + + for { + // Read next point. + p, err := itr.input.Next() + + select { + case <-itr.closing: + return + case itr.ch <- {{$k.name}}PointError{point: p, err: err}: + } + } +} + +type {{$k.name}}PointError struct { + point *{{$k.Name}}Point + err error +} + // {{$k.name}}LimitIterator represents an iterator that limits points per group. type {{$k.name}}LimitIterator struct { input {{$k.Name}}Iterator diff --git a/influxql/iterator.go b/influxql/iterator.go index 1d86cfad9f9..f1517ed558e 100644 --- a/influxql/iterator.go +++ b/influxql/iterator.go @@ -139,6 +139,41 @@ func NewMergeIterator(inputs []Iterator, opt IteratorOptions) Iterator { } } +// NewParallelMergeIterator returns an iterator that breaks input iterators +// into groups and processes them in parallel. +func NewParallelMergeIterator(inputs []Iterator, opt IteratorOptions, parallelism int) Iterator { + inputs = Iterators(inputs).filterNonNil() + if len(inputs) == 0 { + return nil + } else if len(inputs) == 1 { + return inputs[0] + } + + // Limit parallelism to the number of inputs. + if len(inputs) < parallelism { + parallelism = len(inputs) + } + + // Determine the number of inputs per output iterator. + n := len(inputs) / parallelism + + // Group iterators together. + outputs := make([]Iterator, parallelism) + for i := range outputs { + var slice []Iterator + if i < len(outputs)-1 { + slice = inputs[i*n : (i+1)*n] + } else { + slice = inputs[i*n:] + } + + outputs[i] = newParallelIterator(NewMergeIterator(slice, opt)) + } + + // Merge all groups together. + return NewMergeIterator(outputs, opt) +} + // NewSortedMergeIterator returns an iterator to merge itrs into one. // Inputs must either be sorted merge iterators or only contain a single // name/tag in sorted order. The iterator will output all points by name/tag, @@ -164,6 +199,26 @@ func NewSortedMergeIterator(inputs []Iterator, opt IteratorOptions) Iterator { } } +// newParallelIterator returns an iterator that runs in a separate goroutine. +func newParallelIterator(input Iterator) Iterator { + if input == nil { + return nil + } + + switch itr := input.(type) { + case FloatIterator: + return newFloatParallelIterator(itr) + case IntegerIterator: + return newIntegerParallelIterator(itr) + case StringIterator: + return newStringParallelIterator(itr) + case BooleanIterator: + return newBooleanParallelIterator(itr) + default: + panic(fmt.Sprintf("unsupported parallel iterator type: %T", itr)) + } +} + // NewLimitIterator returns an iterator that limits the number of points per grouping. func NewLimitIterator(input Iterator, opt IteratorOptions) Iterator { switch input := input.(type) { diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 19b1d812810..c79e93e0e24 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -8,6 +8,7 @@ import ( "math" "os" "path/filepath" + "runtime" "strings" "sync" "time" @@ -763,7 +764,7 @@ func (e *Engine) CreateIterator(opt influxql.IteratorOptions) (influxql.Iterator inputs[i] = itr } - return influxql.NewMergeIterator(inputs, opt), nil + return influxql.NewParallelMergeIterator(inputs, opt, runtime.GOMAXPROCS(0)), nil } itrs, err := e.createVarRefIterator(opt)