Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
118556: colexecagg: use heuristic to grow alloc size for hash aggregation r=yuzefovich a=yuzefovich

**colexecagg: use heuristic to grow alloc size for hash aggregation**

This commit changes how we determine the size of allocations for the hash aggregation (used to batch-allocate things for multiple buckets or groups). Previously, we would always use a fixed allocation size of 128 which is quite suboptimal with small number of groups. This commit fixes that by growing the allocation size exponentially, starting at 1, and until it reaches 128. As a result, we should be significantly more efficient in cases when we have few groups and slightly less efficient when we have a lot of groups.

Note that the following commit will use the optimizer-driven estimate for the total number of groups when computing the initial allocation size.

Additionally, this commit makes it so that every time the same agg alloc object is shared between multiple functions we increase its alloc size accordingly. We were already doing that for ordered and window aggregation, but now it makes sense do so for the hash aggregation too. Exponentially growing alloc size is handled similarly.

**sql: use optimizer-driven estimate for allocation size**

This commit plumbs the estimated row count for the number of rows produced by the GroupBy from the optimizer all the way to the execution, and then it uses this estimate to come up with the initial allocation size in the hash aggregator in the vectorized engine (mentioned in the previous commit).

As expected, benchmarks (assuming the perfect optimizer estimation) show noticeable improvement in some cases, both in speed and size of allocations, slightly increasing the number of allocated objects in a few cases: https://gist.github.com/yuzefovich/1546c9d133c6ba52cdda6ecd9297ba7d.

Informs: cockroachdb#117546.
Epic: CRDB-35243

Release note: None

Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
  • Loading branch information
craig[bot] and yuzefovich committed Feb 2, 2024
2 parents fce4d47 + c7d322b commit 0778173
Show file tree
Hide file tree
Showing 14 changed files with 116 additions and 55 deletions.
22 changes: 13 additions & 9 deletions pkg/sql/colexec/aggregators_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1166,19 +1166,23 @@ func benchmarkAggregateFunction(
"%s/%s/%s%s/groupSize=%d%s/numInputRows=%d",
fName, agg.name, inputTypesString, numSameAggsSuffix, groupSize, distinctProbString, numInputRows),
func(b *testing.B) {
// Simulate the scenario when the optimizer has the perfect
// estimate.
estimatedRowCount := uint64(math.Ceil(float64(numInputRows) / float64(groupSize)))
b.SetBytes(int64(argumentsSize * numInputRows))
b.ResetTimer()
for i := 0; i < b.N; i++ {
a := agg.new(ctx, &colexecagg.NewAggregatorArgs{
Allocator: testAllocator,
MemAccount: testMemAcc,
Input: source,
InputTypes: tc.typs,
Spec: tc.spec,
EvalCtx: &evalCtx,
Constructors: constructors,
ConstArguments: constArguments,
OutputTypes: outputTypes,
Allocator: testAllocator,
MemAccount: testMemAcc,
Input: source,
InputTypes: tc.typs,
Spec: tc.spec,
EvalCtx: &evalCtx,
Constructors: constructors,
ConstArguments: constArguments,
OutputTypes: outputTypes,
EstimatedRowCount: estimatedRowCount,
})
a.Init(ctx)
// Exhaust aggregator until all batches have been read or limit, if
Expand Down
19 changes: 11 additions & 8 deletions pkg/sql/colexec/colbuilder/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -974,10 +974,11 @@ func NewColOperator(
// Make a copy of the evalCtx since we're modifying it below.
evalCtx := flowCtx.NewEvalCtx()
newAggArgs := &colexecagg.NewAggregatorArgs{
Input: inputs[0].Root,
InputTypes: spec.Input[0].ColumnTypes,
Spec: aggSpec,
EvalCtx: evalCtx,
Input: inputs[0].Root,
InputTypes: spec.Input[0].ColumnTypes,
Spec: aggSpec,
EvalCtx: evalCtx,
EstimatedRowCount: args.Spec.EstimatedRowCount,
}
newAggArgs.Constructors, newAggArgs.ConstArguments, newAggArgs.OutputTypes, err = colexecagg.ProcessAggregations(
ctx, evalCtx, args.ExprHelper.SemaCtx, aggSpec.Aggregations, spec.Input[0].ColumnTypes,
Expand Down Expand Up @@ -1306,9 +1307,10 @@ func NewColOperator(
// Make a copy of the evalCtx since we're modifying it below.
evalCtx := flowCtx.NewEvalCtx()
newAggArgs := &colexecagg.NewAggregatorArgs{
InputTypes: joinOutputTypes,
Spec: aggSpec,
EvalCtx: evalCtx,
InputTypes: joinOutputTypes,
Spec: aggSpec,
EvalCtx: evalCtx,
EstimatedRowCount: args.Spec.EstimatedRowCount,
}
newAggArgs.Constructors, newAggArgs.ConstArguments, newAggArgs.OutputTypes, err = colexecagg.ProcessAggregations(
ctx, evalCtx, args.ExprHelper.SemaCtx, aggSpec.Aggregations, joinOutputTypes,
Expand Down Expand Up @@ -1647,7 +1649,8 @@ func NewColOperator(
// Min and max window functions have specialized implementations
// when the frame can shrink and has a default exclusion clause.
aggFnsAlloc, _, toClose, err = colexecagg.NewAggregateFuncsAlloc(
ctx, &aggArgs, aggregations, 1 /* allocSize */, colexecagg.WindowAggKind,
ctx, &aggArgs, aggregations, 1, /* initialAllocSize */
1 /* maxAllocSize */, colexecagg.WindowAggKind,
)
if err != nil {
colexecerror.InternalError(err)
Expand Down
1 change: 0 additions & 1 deletion pkg/sql/colexec/colexecagg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ go_library(
"//pkg/sql/sem/eval",
"//pkg/sql/sem/tree",
"//pkg/sql/types",
"//pkg/util/buildutil",
"//pkg/util/duration",
"//pkg/util/json", # keep
"//pkg/util/mon",
Expand Down
67 changes: 41 additions & 26 deletions pkg/sql/colexec/colexecagg/aggregate_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/duration"
"github.com/cockroachdb/errors"
)
Expand Down Expand Up @@ -203,8 +202,8 @@ type aggregateFuncAlloc interface {
// newAggFunc returns the aggregate function from the pool with all
// necessary fields initialized.
newAggFunc() AggregateFunc
// incAllocSize increments allocSize of this allocator by one.
incAllocSize()
// increaseAllocSize increments allocSize of this allocator by delta.
increaseAllocSize(delta int64)
}

// AggregateFuncsAlloc is a utility struct that pools allocations of multiple
Expand All @@ -215,8 +214,11 @@ type aggregateFuncAlloc interface {
type AggregateFuncsAlloc struct {
allocator *colmem.Allocator
// allocSize determines the number of objects allocated when the previous
// allocations have been used up.
// allocations have been used up. This number will grow exponentially until
// it reaches maxAllocSize.
allocSize int64
// maxAllocSize determines the maximum allocSize value.
maxAllocSize int64
// returnFuncs is the pool for the slice to be returned in
// makeAggregateFuncs.
returnFuncs []AggregateFunc
Expand Down Expand Up @@ -249,9 +251,16 @@ func NewAggregateFuncsAlloc(
ctx context.Context,
args *NewAggregatorArgs,
aggregations []execinfrapb.AggregatorSpec_Aggregation,
allocSize int64,
initialAllocSize int64,
maxAllocSize int64,
aggKind AggKind,
) (*AggregateFuncsAlloc, *colconv.VecToDatumConverter, colexecop.Closers, error) {
if initialAllocSize > maxAllocSize {
return nil, nil, nil, errors.AssertionFailedf(
"initialAllocSize %d must be no greater than maxAllocSize %d", initialAllocSize, maxAllocSize,
)
}
allocSize := initialAllocSize
funcAllocs := make([]aggregateFuncAlloc, len(aggregations))
var toClose colexecop.Closers
var vecIdxsToConvert []int
Expand Down Expand Up @@ -506,31 +515,15 @@ func NewAggregateFuncsAlloc(
if !freshAllocator {
// If we're reusing the same allocator as for one of the
// previous aggregate functions, we want to increment the alloc
// size (unless we're doing the hash aggregation). This is the
// case since we have allocSize = 1, so for all usages of this
// allocator (except for the first one) we want to bump
// funcAllocs[i].allocSize by 1.
// TODO(yuzefovich): if we ever make allocSize configurable,
// this logic should probably be changed (like if we make the
// alloc size 1 for hash aggregation, then we'd need to bump it
// with every reuse).
switch aggKind {
case OrderedAggKind, WindowAggKind:
if buildutil.CrdbTestBuild {
if allocSize != 1 {
colexecerror.InternalError(errors.AssertionFailedf(
"expected alloc size of 1, found %d", allocSize,
))
}
}
funcAllocs[i].incAllocSize()
}
// size accordingly.
funcAllocs[i].increaseAllocSize(initialAllocSize)
}
}
}
return &AggregateFuncsAlloc{
allocator: args.Allocator,
allocSize: allocSize,
maxAllocSize: maxAllocSize,
aggFuncAllocs: funcAllocs,
}, inputArgsConverter, toClose, nil
}
Expand All @@ -550,6 +543,28 @@ func (a *AggregateFuncsAlloc) MakeAggregateFuncs() []AggregateFunc {
// of 'allocSize x number of funcs in schema' length. Every
// aggFuncAlloc will allocate allocSize of objects on the newAggFunc
// call below.
//
// But first check whether we need to grow allocSize.
if a.returnFuncs != nil {
// We're doing the very first allocation when returnFuncs is nil, so
// we don't change the allocSize then.
if a.allocSize < a.maxAllocSize {
// We need to grow the alloc size of both this alloc object and
// all aggAlloc objects.
newAllocSize := a.allocSize * 2
if newAllocSize > a.maxAllocSize {
newAllocSize = a.maxAllocSize
}
delta := newAllocSize - a.allocSize
a.allocSize = newAllocSize
// Note that the same agg alloc object can be present multiple
// times in the aggFuncAllocs slice, and we do want to increase
// its alloc size every time we see it.
for _, alloc := range a.aggFuncAllocs {
alloc.increaseAllocSize(delta)
}
}
}
a.allocator.AdjustMemoryUsage(aggregateFuncSliceOverhead + sizeOfAggregateFunc*int64(len(a.aggFuncAllocs))*a.allocSize)
a.returnFuncs = make([]AggregateFunc, len(a.aggFuncAllocs)*int(a.allocSize))
}
Expand All @@ -566,8 +581,8 @@ type aggAllocBase struct {
allocSize int64
}

func (a *aggAllocBase) incAllocSize() {
a.allocSize++
func (a *aggAllocBase) increaseAllocSize(delta int64) {
a.allocSize += delta
}

// ProcessAggregations processes all aggregate functions specified in
Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/colexec/colexecagg/aggregators_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ type NewAggregatorArgs struct {
Constructors []execagg.AggregateConstructor
ConstArguments []tree.Datums
OutputTypes []*types.T
// EstimatedRowCount, if set, is the number of rows that will be output by
// the aggregator (i.e. total number of groups). At time of this writing it
// is only used for initialAllocSize in the hash aggregator.
// TODO(yuzefovich): consider using this information for other things too
// (e.g. sizing the output batch).
EstimatedRowCount uint64

TestingKnobs struct {
// HashTableNumBuckets if positive will override the initial number of
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/colexec/colexecwindow/window_functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1227,7 +1227,8 @@ func BenchmarkWindowFunctions(b *testing.B) {
colexecagg.ProcessAggregations(ctx, &evalCtx, &semaCtx, aggregations, sourceTypes)
require.NoError(b, err)
aggFnsAlloc, _, toClose, err := colexecagg.NewAggregateFuncsAlloc(
ctx, &aggArgs, aggregations, 1 /* allocSize */, colexecagg.WindowAggKind,
ctx, &aggArgs, aggregations, 1, /* initialAllocSize */
1 /* maxAllocSize */, colexecagg.WindowAggKind,
)
require.NoError(b, err)
op = NewWindowAggregatorOperator(
Expand Down
9 changes: 8 additions & 1 deletion pkg/sql/colexec/hash_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,15 @@ func NewHashAggregator(
args *colexecagg.NewHashAggregatorArgs,
newSpillingQueueArgs *colexecutils.NewSpillingQueueArgs,
) colexecop.ResettableOperator {
initialAllocSize, maxAllocSize := int64(1), int64(hashAggregatorAllocSize)
if args.EstimatedRowCount != 0 {
initialAllocSize = int64(args.EstimatedRowCount)
if initialAllocSize > maxAllocSize {
initialAllocSize = maxAllocSize
}
}
aggFnsAlloc, inputArgsConverter, toClose, err := colexecagg.NewAggregateFuncsAlloc(
ctx, args.NewAggregatorArgs, args.Spec.Aggregations, hashAggregatorAllocSize, colexecagg.HashAggKind,
ctx, args.NewAggregatorArgs, args.Spec.Aggregations, initialAllocSize, maxAllocSize, colexecagg.HashAggKind,
)
if err != nil {
colexecerror.InternalError(err)
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/colexec/ordered_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ func NewOrderedAggregator(
// We will be reusing the same aggregate functions, so we use 1 as the
// allocation size.
funcsAlloc, inputArgsConverter, toClose, err := colexecagg.NewAggregateFuncsAlloc(
ctx, args, args.Spec.Aggregations, 1 /* allocSize */, colexecagg.OrderedAggKind,
ctx, args, args.Spec.Aggregations, 1, /* initialAllocSize */
1 /* maxAllocSize */, colexecagg.OrderedAggKind,
)
if err != nil {
colexecerror.InternalError(err)
Expand Down
7 changes: 7 additions & 0 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -2285,6 +2285,7 @@ type aggregatorPlanningInfo struct {
inputMergeOrdering execinfrapb.Ordering
reqOrdering ReqOrdering
allowPartialDistribution bool
estimatedRowCount uint64
}

// addAggregators adds aggregators corresponding to a groupNode and updates the plan to
Expand Down Expand Up @@ -2328,6 +2329,7 @@ func (dsp *DistSQLPlanner) addAggregators(
groupColOrdering: n.groupColOrdering,
inputMergeOrdering: dsp.convertOrdering(planReqOrdering(n.plan), p.PlanToStreamColMap),
reqOrdering: n.reqOrdering,
estimatedRowCount: n.estimatedRowCount,
})
}

Expand Down Expand Up @@ -2977,6 +2979,11 @@ func (dsp *DistSQLPlanner) planAggregators(
p.SetMergeOrdering(dsp.convertOrdering(info.reqOrdering, p.PlanToStreamColMap))
}

// Set the estimated output row count if we have it available.
for _, pIdx := range p.ResultRouters {
p.Processors[pIdx].Spec.EstimatedRowCount = info.estimatedRowCount
}

return nil
}

Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/distsql_spec_exec_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,7 @@ func (e *distSQLSpecExecFactory) constructAggregators(
aggregations []exec.AggInfo,
reqOrdering exec.OutputOrdering,
isScalar bool,
estimatedRowCount uint64,
) (exec.Node, error) {
physPlan, plan := getPhysPlan(input)
// planAggregators() itself decides whether to distribute the aggregation.
Expand Down Expand Up @@ -546,6 +547,7 @@ func (e *distSQLSpecExecFactory) constructAggregators(
groupColOrdering: groupColOrdering,
inputMergeOrdering: physPlan.MergeOrdering,
reqOrdering: ReqOrdering(reqOrdering),
estimatedRowCount: estimatedRowCount,
},
); err != nil {
return nil, err
Expand All @@ -561,6 +563,7 @@ func (e *distSQLSpecExecFactory) ConstructGroupBy(
aggregations []exec.AggInfo,
reqOrdering exec.OutputOrdering,
groupingOrderType exec.GroupingOrderType,
estimatedRowCount uint64,
) (exec.Node, error) {
return e.constructAggregators(
input,
Expand All @@ -569,6 +572,7 @@ func (e *distSQLSpecExecFactory) ConstructGroupBy(
aggregations,
reqOrdering,
false, /* isScalar */
estimatedRowCount,
)
}

Expand All @@ -582,6 +586,7 @@ func (e *distSQLSpecExecFactory) ConstructScalarGroupBy(
aggregations,
exec.OutputOrdering{}, /* reqOrdering */
true, /* isScalar */
1, /* estimatedRowCount */
)
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ type groupNode struct {
funcs []*aggregateFuncHolder

reqOrdering ReqOrdering

// estimatedRowCount, when set, is the estimated number of rows that this
// groupNode will output.
estimatedRowCount uint64
}

func (n *groupNode) startExec(params runParams) error {
Expand Down
6 changes: 5 additions & 1 deletion pkg/sql/opt/exec/execbuilder/relational.go
Original file line number Diff line number Diff line change
Expand Up @@ -1641,8 +1641,12 @@ func (b *Builder) buildGroupBy(groupBy memo.RelExpr) (execPlan, error) {
return execPlan{}, err
}
orderType := exec.GroupingOrderType(groupBy.GroupingOrderType(&groupBy.RequiredPhysical().Ordering))
var rowCount uint64
if relProps := groupBy.Relational(); relProps.Statistics().Available {
rowCount = uint64(relProps.Statistics().RowCount)
}
ep.root, err = b.factory.ConstructGroupBy(
input.root, groupingColIdx, groupingColOrder, aggInfos, reqOrdering, orderType,
input.root, groupingColIdx, groupingColOrder, aggInfos, reqOrdering, orderType, rowCount,
)
}
if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/opt/exec/factory.opt
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,9 @@ define GroupBy {
# The grouping column order type (Streaming, PartialStreaming, or
# NoStreaming).
groupingOrderType exec.GroupingOrderType

# If set, the estimated number of rows that this GroupBy will output.
estimatedRowCount uint64
}

# ScalarGroupBy runs a scalar aggregation, i.e. one which performs a set of
Expand Down
16 changes: 9 additions & 7 deletions pkg/sql/opt_exec_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,19 +503,21 @@ func (ef *execFactory) ConstructGroupBy(
aggregations []exec.AggInfo,
reqOrdering exec.OutputOrdering,
groupingOrderType exec.GroupingOrderType,
estimatedRowCount uint64,
) (exec.Node, error) {
inputPlan := input.(planNode)
inputCols := planColumns(inputPlan)
// TODO(harding): Use groupingOrder to determine when to use a hash
// aggregator.
n := &groupNode{
plan: inputPlan,
funcs: make([]*aggregateFuncHolder, 0, len(groupCols)+len(aggregations)),
columns: getResultColumnsForGroupBy(inputCols, groupCols, aggregations),
groupCols: convertNodeOrdinalsToInts(groupCols),
groupColOrdering: groupColOrdering,
isScalar: false,
reqOrdering: ReqOrdering(reqOrdering),
plan: inputPlan,
funcs: make([]*aggregateFuncHolder, 0, len(groupCols)+len(aggregations)),
columns: getResultColumnsForGroupBy(inputCols, groupCols, aggregations),
groupCols: convertNodeOrdinalsToInts(groupCols),
groupColOrdering: groupColOrdering,
isScalar: false,
reqOrdering: ReqOrdering(reqOrdering),
estimatedRowCount: estimatedRowCount,
}
for _, col := range n.groupCols {
// TODO(radu): only generate the grouping columns we actually need.
Expand Down

0 comments on commit 0778173

Please sign in to comment.