Skip to content

Commit

Permalink
Merge pull request #6585 from benbjohnson/parallelize
Browse files Browse the repository at this point in the history
Parallelize iterators
  • Loading branch information
benbjohnson committed May 10, 2016
2 parents b4f922d + 078e561 commit 514c98c
Show file tree
Hide file tree
Showing 5 changed files with 353 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ With this release InfluxDB is moving to Go v1.6.
- [#6533](https://github.com/influxdata/influxdb/issues/6533): Optimize SHOW SERIES
- [#6534](https://github.com/influxdata/influxdb/pull/6534): Move to Go v1.6.2 (over Go v1.4.3)
- [#6522](https://github.com/influxdata/influxdb/pull/6522): Dump TSM files to line protocol
- [#6585](https://github.com/influxdata/influxdb/pull/6585): Parallelize iterators

### Bugfixes

Expand Down
236 changes: 236 additions & 0 deletions influxql/iterator.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,65 @@ type floatSortedMergeHeapItem struct {
ascending bool
}

// floatParallelIterator represents an iterator that pulls data in a separate goroutine.
//
// A background goroutine (monitor) reads from input and hands every result to
// Next over ch, so the underlying iterator can be advanced while the consumer
// is still working on the previous point.
type floatParallelIterator struct {
	input FloatIterator
	ch    chan floatPointError // carries results from monitor to Next

	once    sync.Once      // ensures closing is closed only once
	closing chan struct{}  // closed by Close to tell monitor to stop
	wg      sync.WaitGroup // tracks the monitor goroutine
}

// newFloatParallelIterator returns a new instance of floatParallelIterator.
// The monitor goroutine is started immediately and runs until Close is called.
func newFloatParallelIterator(input FloatIterator) *floatParallelIterator {
	itr := &floatParallelIterator{
		input:   input,
		ch:      make(chan floatPointError, 1),
		closing: make(chan struct{}),
	}
	// Register the goroutine before starting it so Close can always wait on it.
	itr.wg.Add(1)
	go itr.monitor()
	return itr
}

// Stats returns stats from the underlying iterator.
func (itr *floatParallelIterator) Stats() IteratorStats { return itr.input.Stats() }

// Close stops the monitor goroutine and then closes the underlying iterator.
//
// It waits for the monitor to exit before closing the input, so the input is
// never closed while a call to its Next method is still running. As a
// consequence Close blocks until any in-flight input.Next call has returned.
// Close may be called more than once.
func (itr *floatParallelIterator) Close() error {
	itr.once.Do(func() { close(itr.closing) })
	itr.wg.Wait()
	return itr.input.Close()
}

// Next returns the next point from the iterator.
// It returns io.EOF once the monitor goroutine has exited and every buffered
// result has been consumed.
func (itr *floatParallelIterator) Next() (*FloatPoint, error) {
	v, ok := <-itr.ch
	if !ok {
		return nil, io.EOF
	}
	return v.point, v.err
}

// monitor runs in a separate goroutine and actively pulls the next point.
func (itr *floatParallelIterator) monitor() {
	// Deferred calls run in reverse order: ch is closed first, then Close is
	// released from wg.Wait.
	defer itr.wg.Done()
	defer close(itr.ch)

	for {
		// Stop before touching the input again once Close has been requested;
		// the select below picks randomly when both cases are ready.
		select {
		case <-itr.closing:
			return
		default:
		}

		// Read next point.
		p, err := itr.input.Next()

		select {
		case <-itr.closing:
			return
		case itr.ch <- floatPointError{point: p, err: err}:
		}
	}
}

// floatPointError pairs a point with the error returned alongside it so both
// can travel over a single channel.
type floatPointError struct {
	point *FloatPoint
	err   error
}

// floatLimitIterator represents an iterator that limits points per group.
type floatLimitIterator struct {
input FloatIterator
Expand Down Expand Up @@ -2384,6 +2443,65 @@ type integerSortedMergeHeapItem struct {
ascending bool
}

// integerParallelIterator represents an iterator that pulls data in a separate goroutine.
//
// A background goroutine (monitor) reads from input and hands every result to
// Next over ch, so the underlying iterator can be advanced while the consumer
// is still working on the previous point.
type integerParallelIterator struct {
	input IntegerIterator
	ch    chan integerPointError // carries results from monitor to Next

	once    sync.Once      // ensures closing is closed only once
	closing chan struct{}  // closed by Close to tell monitor to stop
	wg      sync.WaitGroup // tracks the monitor goroutine
}

// newIntegerParallelIterator returns a new instance of integerParallelIterator.
// The monitor goroutine is started immediately and runs until Close is called.
func newIntegerParallelIterator(input IntegerIterator) *integerParallelIterator {
	itr := &integerParallelIterator{
		input:   input,
		ch:      make(chan integerPointError, 1),
		closing: make(chan struct{}),
	}
	// Register the goroutine before starting it so Close can always wait on it.
	itr.wg.Add(1)
	go itr.monitor()
	return itr
}

// Stats returns stats from the underlying iterator.
func (itr *integerParallelIterator) Stats() IteratorStats { return itr.input.Stats() }

// Close stops the monitor goroutine and then closes the underlying iterator.
//
// It waits for the monitor to exit before closing the input, so the input is
// never closed while a call to its Next method is still running. As a
// consequence Close blocks until any in-flight input.Next call has returned.
// Close may be called more than once.
func (itr *integerParallelIterator) Close() error {
	itr.once.Do(func() { close(itr.closing) })
	itr.wg.Wait()
	return itr.input.Close()
}

// Next returns the next point from the iterator.
// It returns io.EOF once the monitor goroutine has exited and every buffered
// result has been consumed.
func (itr *integerParallelIterator) Next() (*IntegerPoint, error) {
	v, ok := <-itr.ch
	if !ok {
		return nil, io.EOF
	}
	return v.point, v.err
}

// monitor runs in a separate goroutine and actively pulls the next point.
func (itr *integerParallelIterator) monitor() {
	// Deferred calls run in reverse order: ch is closed first, then Close is
	// released from wg.Wait.
	defer itr.wg.Done()
	defer close(itr.ch)

	for {
		// Stop before touching the input again once Close has been requested;
		// the select below picks randomly when both cases are ready.
		select {
		case <-itr.closing:
			return
		default:
		}

		// Read next point.
		p, err := itr.input.Next()

		select {
		case <-itr.closing:
			return
		case itr.ch <- integerPointError{point: p, err: err}:
		}
	}
}

// integerPointError pairs a point with the error returned alongside it so both
// can travel over a single channel.
type integerPointError struct {
	point *IntegerPoint
	err   error
}

// integerLimitIterator represents an iterator that limits points per group.
type integerLimitIterator struct {
input IntegerIterator
Expand Down Expand Up @@ -4330,6 +4448,65 @@ type stringSortedMergeHeapItem struct {
ascending bool
}

// stringParallelIterator represents an iterator that pulls data in a separate goroutine.
//
// A background goroutine (monitor) reads from input and hands every result to
// Next over ch, so the underlying iterator can be advanced while the consumer
// is still working on the previous point.
type stringParallelIterator struct {
	input StringIterator
	ch    chan stringPointError // carries results from monitor to Next

	once    sync.Once      // ensures closing is closed only once
	closing chan struct{}  // closed by Close to tell monitor to stop
	wg      sync.WaitGroup // tracks the monitor goroutine
}

// newStringParallelIterator returns a new instance of stringParallelIterator.
// The monitor goroutine is started immediately and runs until Close is called.
func newStringParallelIterator(input StringIterator) *stringParallelIterator {
	itr := &stringParallelIterator{
		input:   input,
		ch:      make(chan stringPointError, 1),
		closing: make(chan struct{}),
	}
	// Register the goroutine before starting it so Close can always wait on it.
	itr.wg.Add(1)
	go itr.monitor()
	return itr
}

// Stats returns stats from the underlying iterator.
func (itr *stringParallelIterator) Stats() IteratorStats { return itr.input.Stats() }

// Close stops the monitor goroutine and then closes the underlying iterator.
//
// It waits for the monitor to exit before closing the input, so the input is
// never closed while a call to its Next method is still running. As a
// consequence Close blocks until any in-flight input.Next call has returned.
// Close may be called more than once.
func (itr *stringParallelIterator) Close() error {
	itr.once.Do(func() { close(itr.closing) })
	itr.wg.Wait()
	return itr.input.Close()
}

// Next returns the next point from the iterator.
// It returns io.EOF once the monitor goroutine has exited and every buffered
// result has been consumed.
func (itr *stringParallelIterator) Next() (*StringPoint, error) {
	v, ok := <-itr.ch
	if !ok {
		return nil, io.EOF
	}
	return v.point, v.err
}

// monitor runs in a separate goroutine and actively pulls the next point.
func (itr *stringParallelIterator) monitor() {
	// Deferred calls run in reverse order: ch is closed first, then Close is
	// released from wg.Wait.
	defer itr.wg.Done()
	defer close(itr.ch)

	for {
		// Stop before touching the input again once Close has been requested;
		// the select below picks randomly when both cases are ready.
		select {
		case <-itr.closing:
			return
		default:
		}

		// Read next point.
		p, err := itr.input.Next()

		select {
		case <-itr.closing:
			return
		case itr.ch <- stringPointError{point: p, err: err}:
		}
	}
}

// stringPointError pairs a point with the error returned alongside it so both
// can travel over a single channel.
type stringPointError struct {
	point *StringPoint
	err   error
}

// stringLimitIterator represents an iterator that limits points per group.
type stringLimitIterator struct {
input StringIterator
Expand Down Expand Up @@ -6276,6 +6453,65 @@ type booleanSortedMergeHeapItem struct {
ascending bool
}

// booleanParallelIterator represents an iterator that pulls data in a separate goroutine.
//
// A background goroutine (monitor) reads from input and hands every result to
// Next over ch, so the underlying iterator can be advanced while the consumer
// is still working on the previous point.
type booleanParallelIterator struct {
	input BooleanIterator
	ch    chan booleanPointError // carries results from monitor to Next

	once    sync.Once      // ensures closing is closed only once
	closing chan struct{}  // closed by Close to tell monitor to stop
	wg      sync.WaitGroup // tracks the monitor goroutine
}

// newBooleanParallelIterator returns a new instance of booleanParallelIterator.
// The monitor goroutine is started immediately and runs until Close is called.
func newBooleanParallelIterator(input BooleanIterator) *booleanParallelIterator {
	itr := &booleanParallelIterator{
		input:   input,
		ch:      make(chan booleanPointError, 1),
		closing: make(chan struct{}),
	}
	// Register the goroutine before starting it so Close can always wait on it.
	itr.wg.Add(1)
	go itr.monitor()
	return itr
}

// Stats returns stats from the underlying iterator.
func (itr *booleanParallelIterator) Stats() IteratorStats { return itr.input.Stats() }

// Close stops the monitor goroutine and then closes the underlying iterator.
//
// It waits for the monitor to exit before closing the input, so the input is
// never closed while a call to its Next method is still running. As a
// consequence Close blocks until any in-flight input.Next call has returned.
// Close may be called more than once.
func (itr *booleanParallelIterator) Close() error {
	itr.once.Do(func() { close(itr.closing) })
	itr.wg.Wait()
	return itr.input.Close()
}

// Next returns the next point from the iterator.
// It returns io.EOF once the monitor goroutine has exited and every buffered
// result has been consumed.
func (itr *booleanParallelIterator) Next() (*BooleanPoint, error) {
	v, ok := <-itr.ch
	if !ok {
		return nil, io.EOF
	}
	return v.point, v.err
}

// monitor runs in a separate goroutine and actively pulls the next point.
func (itr *booleanParallelIterator) monitor() {
	// Deferred calls run in reverse order: ch is closed first, then Close is
	// released from wg.Wait.
	defer itr.wg.Done()
	defer close(itr.ch)

	for {
		// Stop before touching the input again once Close has been requested;
		// the select below picks randomly when both cases are ready.
		select {
		case <-itr.closing:
			return
		default:
		}

		// Read next point.
		p, err := itr.input.Next()

		select {
		case <-itr.closing:
			return
		case itr.ch <- booleanPointError{point: p, err: err}:
		}
	}
}

// booleanPointError pairs a point with the error returned alongside it so both
// can travel over a single channel.
type booleanPointError struct {
	point *BooleanPoint
	err   error
}

// booleanLimitIterator represents an iterator that limits points per group.
type booleanLimitIterator struct {
input BooleanIterator
Expand Down
60 changes: 59 additions & 1 deletion influxql/iterator.gen.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,6 @@ type {{$k.name}}MergeHeapItem struct {
err error
}


// {{$k.name}}SortedMergeIterator is an iterator that sorts and merges multiple iterators into one.
type {{$k.name}}SortedMergeIterator struct {
inputs []{{$k.Name}}Iterator
Expand Down Expand Up @@ -434,6 +433,65 @@ type {{$k.name}}SortedMergeHeapItem struct {
ascending bool
}

// {{$k.name}}ParallelIterator represents an iterator that pulls data in a separate goroutine.
//
// A background goroutine (monitor) reads from input and hands every result to
// Next over ch, so the underlying iterator can be advanced while the consumer
// is still working on the previous point.
type {{$k.name}}ParallelIterator struct {
	input {{$k.Name}}Iterator
	ch    chan {{$k.name}}PointError // carries results from monitor to Next

	once    sync.Once      // ensures closing is closed only once
	closing chan struct{}  // closed by Close to tell monitor to stop
	wg      sync.WaitGroup // tracks the monitor goroutine
}

// new{{$k.Name}}ParallelIterator returns a new instance of {{$k.name}}ParallelIterator.
// The monitor goroutine is started immediately and runs until Close is called.
func new{{$k.Name}}ParallelIterator(input {{$k.Name}}Iterator) *{{$k.name}}ParallelIterator {
	itr := &{{$k.name}}ParallelIterator{
		input:   input,
		ch:      make(chan {{$k.name}}PointError, 1),
		closing: make(chan struct{}),
	}
	// Register the goroutine before starting it so Close can always wait on it.
	itr.wg.Add(1)
	go itr.monitor()
	return itr
}

// Stats returns stats from the underlying iterator.
func (itr *{{$k.name}}ParallelIterator) Stats() IteratorStats { return itr.input.Stats() }

// Close stops the monitor goroutine and then closes the underlying iterator.
//
// It waits for the monitor to exit before closing the input, so the input is
// never closed while a call to its Next method is still running. As a
// consequence Close blocks until any in-flight input.Next call has returned.
// Close may be called more than once.
func (itr *{{$k.name}}ParallelIterator) Close() error {
	itr.once.Do(func() { close(itr.closing) })
	itr.wg.Wait()
	return itr.input.Close()
}

// Next returns the next point from the iterator.
// It returns io.EOF once the monitor goroutine has exited and every buffered
// result has been consumed.
func (itr *{{$k.name}}ParallelIterator) Next() (*{{$k.Name}}Point, error) {
	v, ok := <-itr.ch
	if !ok {
		return nil, io.EOF
	}
	return v.point, v.err
}

// monitor runs in a separate goroutine and actively pulls the next point.
func (itr *{{$k.name}}ParallelIterator) monitor() {
	// Deferred calls run in reverse order: ch is closed first, then Close is
	// released from wg.Wait.
	defer itr.wg.Done()
	defer close(itr.ch)

	for {
		// Stop before touching the input again once Close has been requested;
		// the select below picks randomly when both cases are ready.
		select {
		case <-itr.closing:
			return
		default:
		}

		// Read next point.
		p, err := itr.input.Next()

		select {
		case <-itr.closing:
			return
		case itr.ch <- {{$k.name}}PointError{point: p, err: err}:
		}
	}
}

// {{$k.name}}PointError pairs a point with the error returned alongside it so both
// can travel over a single channel.
type {{$k.name}}PointError struct {
	point *{{$k.Name}}Point
	err   error
}

// {{$k.name}}LimitIterator represents an iterator that limits points per group.
type {{$k.name}}LimitIterator struct {
input {{$k.Name}}Iterator
Expand Down
Loading

0 comments on commit 514c98c

Please sign in to comment.