From ee162aa450a7cd1a06301bf63b5f6701b6cd46a1 Mon Sep 17 00:00:00 2001 From: Anton Kolesnikov Date: Thu, 28 Nov 2024 17:40:31 +0800 Subject: [PATCH] perf(v2): tune parquet row iterator batching (#3726) --- .../query_backend/query_profile_entry.go | 26 ++++++++++------- .../query_backend/query_time_series.go | 2 +- pkg/phlaredb/query/repeated.go | 29 +++++++++++++++---- 3 files changed, 40 insertions(+), 17 deletions(-) diff --git a/pkg/experiment/query_backend/query_profile_entry.go b/pkg/experiment/query_backend/query_profile_entry.go index cf76362dca..8ceb8579e3 100644 --- a/pkg/experiment/query_backend/query_profile_entry.go +++ b/pkg/experiment/query_backend/query_profile_entry.go @@ -13,6 +13,20 @@ import ( "github.com/grafana/pyroscope/pkg/phlaredb/tsdb/index" ) +// As we expect rows to be very small, we want to fetch a bigger +// batch of rows at once to amortize the latency of reading. +const bigBatchSize = 2 << 10 + +type ProfileEntry struct { + RowNum int64 + Timestamp model.Time + Fingerprint model.Fingerprint + Labels phlaremodel.Labels + Partition uint64 +} + +func (e ProfileEntry) RowNumber() int64 { return e.RowNum } + func profileEntryIterator(q *queryContext, groupBy ...string) (iter.Iterator[ProfileEntry], error) { series, err := getSeriesLabels(q.ds.Index(), q.req.matchers, groupBy...) if err != nil { @@ -28,7 +42,7 @@ func profileEntryIterator(q *queryContext, groupBy ...string) (iter.Iterator[Pro buf := make([][]parquet.Value, 3) entries := iter.NewAsyncBatchIterator[*parquetquery.IteratorResult, ProfileEntry]( - results, 128, + results, bigBatchSize, func(r *parquetquery.IteratorResult) ProfileEntry { buf = r.Columns(buf, schemav1.SeriesIndexColumnName, @@ -48,16 +62,6 @@ func profileEntryIterator(q *queryContext, groupBy ...string) (iter.Iterator[Pro return entries, nil } -type ProfileEntry struct { - RowNum int64 - Timestamp model.Time - Fingerprint model.Fingerprint - Labels phlaremodel.Labels - Partition uint64 -} - -func (e ProfileEntry) RowNumber() int64 { return e.RowNum } - type seriesLabels struct { fingerprint model.Fingerprint labels phlaremodel.Labels diff --git a/pkg/experiment/query_backend/query_time_series.go b/pkg/experiment/query_backend/query_time_series.go index 740348ed3e..8f8a85cb61 100644 --- a/pkg/experiment/query_backend/query_time_series.go +++ b/pkg/experiment/query_backend/query_time_series.go @@ -40,7 +40,7 @@ func queryTimeSeries(q *queryContext, query *queryv1.Query) (r *queryv1.Report, return nil, err } - rows := parquetquery.NewRepeatedRowIterator(q.ctx, entries, q.ds.Profiles().RowGroups(), column.ColumnIndex) + rows := parquetquery.NewRepeatedRowIteratorBatchSize(q.ctx, entries, q.ds.Profiles().RowGroups(), bigBatchSize, column.ColumnIndex) defer runutil.CloseWithErrCapture(&err, rows, "failed to close column iterator") builder := phlaremodel.NewTimeSeriesBuilder(query.TimeSeries.GroupBy...) diff --git a/pkg/phlaredb/query/repeated.go b/pkg/phlaredb/query/repeated.go index ff85f9e1cf..7c4dbb4eeb 100644 --- a/pkg/phlaredb/query/repeated.go +++ b/pkg/phlaredb/query/repeated.go @@ -38,6 +38,19 @@ const ( // // How many values we expect per a row, the upper boundary? repeatedRowColumnIteratorReadSize = 2 << 10 + + // Batch size specifies how many rows to read from a column at once. + // Note that the batched rows are buffered in-memory but do not reference + // the pages from which they were read. + // + // The default value is extremely conservative, as in most cases, rows are + // quite large (e.g., profile samples). Given that we run many queries + // concurrently, the memory waste outweighs the benefits of the "read-ahead" + // optimization that batching is intended to provide. + // + // However, in cases where the rows are small (such as in time series), + // the value should be increased significantly. + defaultBatchSize = 4 ) func NewRepeatedRowIterator[T any]( @@ -45,17 +58,23 @@ func NewRepeatedRowIterator[T any]( rows iter.Iterator[T], rowGroups []parquet.RowGroup, columns ...int, +) iter.Iterator[RepeatedRow[T]] { + return NewRepeatedRowIteratorBatchSize(ctx, rows, rowGroups, defaultBatchSize, columns...) +} + +func NewRepeatedRowIteratorBatchSize[T any]( + ctx context.Context, + rows iter.Iterator[T], + rowGroups []parquet.RowGroup, + batchSize int64, + columns ...int, ) iter.Iterator[RepeatedRow[T]] { rows, rowNumbers := iter.Tee(rows) return &repeatedRowIterator[T]{ rows: rows, columns: NewMultiColumnIterator(ctx, WrapWithRowNumber(rowNumbers), - // Batch size specifies how many rows to be read - // from a column at once. Note that the batched rows - // are buffered in-memory, but not reference pages - // they were read from. - 4, + int(batchSize), rowGroups, columns..., ),