
Commit

Merge pull request #203 from grafana/20220905_close-page-and-handle-errors

Close page after read and return errors properly
simonswine authored Sep 6, 2022
2 parents 9154e4c + c21a16f commit cd343b5
Showing 3 changed files with 209 additions and 64 deletions.
9 changes: 6 additions & 3 deletions pkg/firedb/block_querier.go
@@ -873,22 +873,25 @@ func (r *parquetReader[M, P]) Close() error {
 	return nil
 }
 
+func (r *parquetReader[M, P]) relPath() string {
+	return r.persister.Name() + block.ParquetSuffix
+}
+
 func (r *parquetReader[M, P]) info() block.File {
 	return block.File{
 		Parquet: &block.ParquetFile{
 			NumRows:      uint64(r.file.NumRows()),
 			NumRowGroups: uint64(len(r.file.RowGroups())),
 		},
 		SizeBytes: uint64(r.file.Size()),
-		RelPath:   r.persister.Name() + block.ParquetSuffix,
+		RelPath:   r.relPath(),
 	}
 }
 
 func (r *parquetReader[M, P]) columnIter(ctx context.Context, columnName string, predicate query.Predicate, alias string) query.Iterator {
 	index, _ := query.GetColumnIndexByPath(r.file, columnName)
 	if index == -1 {
-		// TODO - don't panic, error instead
-		panic("column not found in parquet file:" + columnName)
+		return query.NewErrIterator(fmt.Errorf("column '%s' not found in parquet file '%s'", columnName, r.relPath()))
 	}
 	return query.NewColumnIterator(ctx, r.file.RowGroups(), index, columnName, 1000, predicate, alias)
 }
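The new error path hinges on query.NewErrIterator, which (per the iters.go hunk below) wraps iter.NewErrSeekIterator from pkg/iter — an implementation this diff does not show. A minimal sketch of the shape it presumably has, assuming SeekIterator exposes the usual Next/Seek/At/Err/Close methods (both the interface and the struct below are guesses for illustration, not the repository's actual code):

package iter

// SeekIterator is assumed to look roughly like this; the real definition
// lives in pkg/iter and is not part of this diff.
type SeekIterator[T, K any] interface {
	Next() bool
	Seek(pos K) bool
	At() T
	Err() error
	Close() error
}

// errSeekIterator yields nothing and surfaces a fixed error, which is all
// columnIter needs to report a missing column without panicking.
type errSeekIterator[T, K any] struct{ err error }

func NewErrSeekIterator[T, K any](err error) SeekIterator[T, K] {
	return &errSeekIterator[T, K]{err: err}
}

func (i *errSeekIterator[T, K]) Next() bool      { return false }
func (i *errSeekIterator[T, K]) Seek(pos K) bool { return false }
func (i *errSeekIterator[T, K]) At() T           { var zero T; return zero }
func (i *errSeekIterator[T, K]) Err() error      { return i.err }
func (i *errSeekIterator[T, K]) Close() error    { return nil }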
136 changes: 75 additions & 61 deletions pkg/firedb/query/iters.go
@@ -11,6 +11,7 @@ import (
 	"github.com/grafana/dskit/multierror"
 	"github.com/opentracing/opentracing-go"
 	"github.com/segmentio/parquet-go"
+	pq "github.com/segmentio/parquet-go"
 
 	"github.com/grafana/fire/pkg/iter"
 )
@@ -169,7 +170,6 @@ func (r *IteratorResult) Columns(buffer [][]parquet.Value, names ...string) [][]
 				buffer[i] = append(buffer[i], e.v)
 				break
 			}
-
 		}
 	}
 	return buffer
@@ -178,6 +178,10 @@ func (r *IteratorResult) Columns(buffer [][]parquet.Value, names ...string) [][]
 // iterator - Every iterator follows this interface and can be composed.
 type Iterator = iter.SeekIterator[*IteratorResult, RowNumberWithDefinitionLevel]
 
+func NewErrIterator(err error) Iterator {
+	return iter.NewErrSeekIterator[*IteratorResult, RowNumberWithDefinitionLevel](err)
+}
+
 var columnIteratorPool = sync.Pool{
 	New: func() interface{} {
 		return &columnIteratorBuffer{}
@@ -253,6 +257,7 @@ var _ Iterator = (*ColumnIterator)(nil)
 type columnIteratorBuffer struct {
 	rowNumbers []RowNumber
 	values     []parquet.Value
+	err        error
 }
 
 func NewColumnIterator(ctx context.Context, rgs []parquet.RowGroup, column int, columnName string, readSize int, filter Predicate, selectAs string) *ColumnIterator {
@@ -322,83 +327,91 @@ func (c *ColumnIterator) iterate(ctx context.Context, readSize int) {
 			}
 		}
 
-		pgs := col.Pages()
-		for {
-			span2, _ := opentracing.StartSpanFromContext(ctx2, "columnIterator.iterate.ReadPage")
-			pg, err := pgs.ReadPage()
-			span2.Finish()
-
-			if pg == nil || err == io.EOF {
-				break
-			}
-			if err != nil {
-				return
-			}
-
-			if checkSkip(pg.NumRows()) {
-				// Skip page
-				rn.Skip(pg.NumRows())
-				continue
-			}
-
-			if c.filter != nil {
-				if !c.filter.KeepPage(pg) {
-					// Skip page
-					rn.Skip(pg.NumRows())
-					continue
-				}
-			}
-
-			vr := pg.Values()
-			for {
-				count, err := vr.ReadValues(buffer)
-				if count > 0 {
-
-					// Assign row numbers, filter values, and collect the results.
-					newBuffer := columnIteratorPoolGet(readSize, 0)
-
-					for i := 0; i < count; i++ {
-
-						v := buffer[i]
-
-						// We have to do this for all values (even if the
-						// value is excluded by the predicate)
-						rn.Next(v.RepetitionLevel(), v.DefinitionLevel())
-
-						if c.filter != nil {
-							if !c.filter.KeepValue(v) {
-								continue
-							}
-						}
-
-						newBuffer.rowNumbers = append(newBuffer.rowNumbers, rn)
-						newBuffer.values = append(newBuffer.values, v)
-					}
-
-					if len(newBuffer.rowNumbers) > 0 {
-						select {
-						case c.ch <- newBuffer:
-						case <-c.quit:
-							return
-						}
-					} else {
-						// All values excluded, we go ahead and immediately
-						// return the buffer to the pool.
-						columnIteratorPoolPut(newBuffer)
-					}
-				}
-
-				// Error checks MUST occur after processing any returned data
-				// following io.Reader behavior.
-				if err == io.EOF {
-					break
-				}
-				if err != nil {
-					// todo: bubble up?
-					return
-				}
-			}
-		}
+		func(col pq.ColumnChunk) {
+			pgs := col.Pages()
+			defer func() {
+				if err := pgs.Close(); err != nil {
+					span.LogKV("closing error", err)
+				}
+			}()
+			for {
+				span2, _ := opentracing.StartSpanFromContext(ctx2, "columnIterator.iterate.ReadPage")
+				pg, err := pgs.ReadPage()
+				span2.Finish()
+
+				if pg == nil || err == io.EOF {
+					break
+				}
+				if err != nil {
+					return
+				}
+
+				if checkSkip(pg.NumRows()) {
+					// Skip page
+					rn.Skip(pg.NumRows())
+					continue
+				}
+
+				if c.filter != nil {
+					if !c.filter.KeepPage(pg) {
+						// Skip page
+						rn.Skip(pg.NumRows())
+						continue
+					}
+				}
+
+				vr := pg.Values()
+				for {
+					count, err := vr.ReadValues(buffer)
+					if count > 0 {
+
+						// Assign row numbers, filter values, and collect the results.
+						newBuffer := columnIteratorPoolGet(readSize, 0)
+
+						for i := 0; i < count; i++ {
+
+							v := buffer[i]
+
+							// We have to do this for all values (even if the
+							// value is excluded by the predicate)
+							rn.Next(v.RepetitionLevel(), v.DefinitionLevel())
+
+							if c.filter != nil {
+								if !c.filter.KeepValue(v) {
+									continue
+								}
+							}
+
+							newBuffer.rowNumbers = append(newBuffer.rowNumbers, rn)
+							newBuffer.values = append(newBuffer.values, v)
+						}
+
+						if len(newBuffer.rowNumbers) > 0 {
+							select {
+							case c.ch <- newBuffer:
+							case <-c.quit:
+								return
+							}
+						} else {
+							// All values excluded, we go ahead and immediately
+							// return the buffer to the pool.
+							columnIteratorPoolPut(newBuffer)
+						}
+					}
+
+					// Error checks MUST occur after processing any returned data
+					// following io.Reader behavior.
+					if err == io.EOF {
+						break
+					}
+					if err != nil {
+						c.ch <- &columnIteratorBuffer{err: err}
+						return
+					}
+				}
+			}
+		}(col)
 	}
 }
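Wrapping the per-chunk work in an immediately invoked function is what makes the defer fire once per column chunk rather than once at the end of iterate. A minimal, self-contained illustration of the pattern (generic names, not repository code):

package main

import "fmt"

func process(items []string) {
	for _, it := range items {
		func(it string) {
			fmt.Println("open", it)
			// Deferred calls run when the enclosing closure returns,
			// i.e. after each item, not after the whole loop.
			defer fmt.Println("close", it)
		}(it)
	}
}

func main() { process([]string{"a", "b"}) }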

@@ -410,7 +423,6 @@ func (c *ColumnIterator) At() *IteratorResult {
 // Next returns the next matching value from the iterator.
 // Returns nil when finished.
 func (c *ColumnIterator) Next() bool {
-
 	t, v := c.next()
 	if t.Valid() {
 		c.result = c.makeResult(t, v)
@@ -438,6 +450,10 @@ func (c *ColumnIterator) next() (RowNumber, parquet.Value) {
 	}
 
 	if v, ok := <-c.ch; ok {
+		if v.err != nil {
+			c.err = v.err
+			return EmptyRowNumber(), parquet.Value{}
+		}
 		// Got next buffer, guaranteed to have at least 1 element
 		c.curr = v
 		c.currN = 0
@@ -520,7 +536,6 @@ func (j *JoinIterator) At() *IteratorResult {
 }
 
 func (j *JoinIterator) Next() bool {
-
 	// Here is the algorithm for joins: On each pass of the iterators
 	// we remember which ones are pointing at the earliest rows. If all
 	// are the lowest (and therefore pointing at the same thing) then
@@ -675,8 +690,8 @@ func NewUnionIterator(definitionLevel int, iters []Iterator, pred GroupPredicate
 func (u *UnionIterator) At() *IteratorResult {
 	return u.result
 }
-func (u *UnionIterator) Next() bool {
-
+
+func (u *UnionIterator) Next() bool {
 	// Here is the algorithm for unions: On each pass of the iterators
 	// we remember which ones are pointing at the earliest same row. The
 	// lowest iterators are then collected and a result is produced. Keep
@@ -823,7 +838,6 @@ func NewKeyValueGroupPredicate(keys, values []string) *KeyValueGroupPredicate {
 // KeepGroup checks if the given group contains all of the requested
 // key/value pairs.
 func (a *KeyValueGroupPredicate) KeepGroup(group *IteratorResult) bool {
-	//printGroup(group)
 	a.buffer = group.Columns(a.buffer, "keys", "values")
 
 	keys, vals := a.buffer[0], a.buffer[1]
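The comment retained above — error checks MUST occur after processing any returned data — mirrors the io.Reader contract, where a single call may legitimately return both a positive count and a non-nil error (including io.EOF). A short, self-contained illustration of that convention using only the standard library (not code from this repository):

package main

import (
	"bytes"
	"fmt"
	"io"
)

func main() {
	r := bytes.NewBufferString("hello")
	buf := make([]byte, 8)
	for {
		n, err := r.Read(buf)
		if n > 0 {
			fmt.Printf("read %q\n", buf[:n]) // consume the data first
		}
		if err == io.EOF {
			break // stop only after the final bytes were handled
		}
		if err != nil {
			fmt.Println("read failed:", err)
			return
		}
	}
}

This is the same ordering ColumnIterator.iterate now follows: values returned alongside an error are pushed to the channel before the error itself is forwarded as a columnIteratorBuffer with only err set.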
128 changes: 128 additions & 0 deletions pkg/firedb/query/iters_test.go
@@ -1,11 +1,139 @@
 package parquetquery
 
 import (
+	"context"
+	"errors"
 	"testing"
 
+	"github.com/segmentio/parquet-go"
 	"github.com/stretchr/testify/require"
 )
 
+type testData struct {
+	ID   int64  `parquet:"id"`
+	Name string `parquet:"name"`
+}
+
+func newTestBuffer[A any](rows []A) parquet.RowGroup {
+	buffer := parquet.NewBuffer()
+	for i := range rows {
+		err := buffer.Write(rows[i])
+		if err != nil {
+			panic(err.Error())
+		}
+	}
+	return buffer
+}
+
+type errRowGroup struct {
+	parquet.RowGroup
+}
+
+func (e *errRowGroup) ColumnChunks() []parquet.ColumnChunk {
+	chunks := e.RowGroup.ColumnChunks()
+	for pos := range chunks {
+		chunks[pos] = &errColumnChunk{chunks[pos]}
+	}
+	return chunks
+}
+
+type errColumnChunk struct {
+	parquet.ColumnChunk
+}
+
+func (e *errColumnChunk) Pages() parquet.Pages {
+	return &errPages{e.ColumnChunk.Pages()}
+}
+
+type errPages struct {
+	parquet.Pages
+}
+
+func (e *errPages) ReadPage() (parquet.Page, error) {
+	p, err := e.Pages.ReadPage()
+	return &errPage{p}, err
+}
+
+type errPage struct {
+	parquet.Page
+}
+
+func (e *errPage) Values() parquet.ValueReader {
+	return &errValueReader{e.Page.Values()}
+}
+
+type errValueReader struct {
+	parquet.ValueReader
+}
+
+func (e *errValueReader) ReadValues(vals []parquet.Value) (int, error) {
+	_, _ = e.ValueReader.ReadValues(vals)
+	return 0, errors.New("read error")
+}
+
+func withReadValueError(rg []parquet.RowGroup) []parquet.RowGroup {
+	for pos := range rg {
+		rg[pos] = &errRowGroup{rg[pos]}
+	}
+	return rg
+}
+
+func newTestSet() []parquet.RowGroup {
+	return []parquet.RowGroup{
+		newTestBuffer(
+			[]testData{
+				{1, "one"},
+				{2, "two"},
+			}),
+		newTestBuffer(
+			[]testData{
+				{3, "three"},
+				{5, "five"},
+			}),
+	}
+}
+
+func TestColumnIterator(t *testing.T) {
+	for _, tc := range []struct {
+		name      string
+		result    []parquet.Value
+		rowGroups []parquet.RowGroup
+		err       error
+	}{
+		{
+			name:      "read-int-column",
+			rowGroups: newTestSet(),
+			result: []parquet.Value{
+				parquet.ValueOf(1),
+				parquet.ValueOf(2),
+				parquet.ValueOf(3),
+				parquet.ValueOf(5),
+			},
+		},
+		{
+			name:      "err-read-values",
+			rowGroups: withReadValueError(newTestSet()),
+			err:       errors.New("read error"),
+		},
+	} {
+		t.Run(tc.name, func(t *testing.T) {
+			var (
+				buffer [][]parquet.Value
+
+				ctx = context.Background()
+				i   = NewColumnIterator(ctx, tc.rowGroups, 0, "id", 10, nil, "id")
+			)
+			for i.Next() {
+				require.Nil(t, i.Err())
+				buffer = i.At().Columns(buffer, "id")
+			}
+
+			require.Equal(t, tc.err, i.Err())
+		})
+	}
+}
+
 func TestRowNumber(t *testing.T) {
 	tr := EmptyRowNumber()
 	require.Equal(t, RowNumber{-1, -1, -1, -1, -1, -1}, tr)
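The test injects a failure by wrapping every layer of the parquet-go read path (errRowGroup → errColumnChunk → errPages → errPage → errValueReader) so that ReadValues fails deterministically, then checks that the error surfaces through Err() instead of a panic or a silent stop. For reference, a hypothetical consumption loop in the same style as the test (variable names invented; only Next/At/Err from the diff are assumed):

// Sketch only — mirrors TestColumnIterator above, not repository code.
it := NewColumnIterator(ctx, rowGroups, 0, "id", 1000, nil, "id")
for it.Next() {
	res := it.At() // *IteratorResult for the current match
	_ = res
}
if err := it.Err(); err != nil {
	// e.g. the injected "read error" from errValueReader
	return err
}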
