Skip to content

Commit

Permalink
Merge #80810 #80874
Browse files Browse the repository at this point in the history
80810: colexecdisk: extract disk-backed operators into a new package r=yuzefovich a=yuzefovich

**colexecdisk: extract disk-backed operators into a new package**

This allows us to break dependency of `colexec` on `colexecjoin` which
speeds up the build a bit.

Addresses: #79357.

Release note: None

**execagg: extract out some aggregate helpers into a separate package**

This allows us to break dependency of `colexecagg` on `execinfra`.

Release note: None

80874: Add Andrew Baptist to AUTHORS r=andrewbaptist a=andrewbaptist



Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
Co-authored-by: Andrew Baptist <baptist@cockroachlabs.com>
  • Loading branch information
3 people committed May 2, 2022
3 parents b2de15f + c1dd4c3 + 72a192c commit 881d0ec
Show file tree
Hide file tree
Showing 48 changed files with 646 additions and 358 deletions.
1 change: 1 addition & 0 deletions AUTHORS
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ Amy Gao <amy.gao@cockroachlabs.com>
Anantha Krishnan <kannan4mi3@gmail.com> Ananthakrishnan <kannan4mi3@gmail.com>
Andrei Matei <andrei@cockroachlabs.com> <andreimatei1@gmail.com>
Andrew B. Goode <andrewbgoode@gmail.com> nexdrew <andrewbgoode@gmail.com>
Andrew Baptist <baptist@cockroachlabs.com> <andrew.baptist@gmail.com>
Andrew Bonventre <abonventre@palantir.com> <andybons@gmail.com>
Andrew Couch <andrew@cockroachlabs.com> <github@couchand.com> <hi@andrewcou.ch>
Andrew Kryczka <andrew.kryczka2@gmail.com> Andrew Kryczka <ajkr@users.noreply.github.com> <@cockroachlabs.com>
Expand Down
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ ALL_TESTS = [
"//pkg/sql/colexec/colexecbase:colexecbase_disallowed_imports_test",
"//pkg/sql/colexec/colexecbase:colexecbase_test",
"//pkg/sql/colexec/colexeccmp:colexeccmp_test",
"//pkg/sql/colexec/colexecdisk:colexecdisk_test",
"//pkg/sql/colexec/colexechash:colexechash_disallowed_imports_test",
"//pkg/sql/colexec/colexechash:colexechash_test",
"//pkg/sql/colexec/colexecjoin:colexecjoin_disallowed_imports_test",
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,7 @@ go_library(
"//pkg/sql/distsql",
"//pkg/sql/enum",
"//pkg/sql/execinfra",
"//pkg/sql/execinfra/execagg",
"//pkg/sql/execinfra/execopnode",
"//pkg/sql/execinfrapb",
"//pkg/sql/execstats",
Expand Down
16 changes: 1 addition & 15 deletions pkg/sql/colexec/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,7 @@ go_library(
"columnarizer.go",
"constants.go",
"count.go",
"disk_spiller.go",
"external_distinct.go",
"external_hash_aggregator.go",
"external_hash_joiner.go",
"external_sort.go",
"hash_aggregator.go",
"hash_based_partitioner.go",
"invariants_checker.go",
"limit.go",
"materializer.go",
Expand All @@ -35,7 +29,6 @@ go_library(
"sorttopk.go",
"tuple_proj_op.go",
"unordered_distinct.go",
"utils.go",
"values.go",
":gen-exec", # keep
":gen-sort-partitioner", # keep
Expand All @@ -51,14 +44,12 @@ go_library(
"//pkg/settings",
"//pkg/sql/catalog/colinfo", # keep
"//pkg/sql/catalog/descpb",
"//pkg/sql/colcontainer",
"//pkg/sql/colconv",
"//pkg/sql/colexec/colexecagg", # keep
"//pkg/sql/colexec/colexecargs",
"//pkg/sql/colexec/colexecbase",
"//pkg/sql/colexec/colexeccmp", # keep
"//pkg/sql/colexec/colexechash", # keep
"//pkg/sql/colexec/colexecjoin",
"//pkg/sql/colexec/colexecutils",
"//pkg/sql/colexec/execgen", # keep
"//pkg/sql/colexecerror",
Expand All @@ -72,23 +63,17 @@ go_library(
"//pkg/sql/rowenc",
"//pkg/sql/sem/eval",
"//pkg/sql/sem/tree",
"//pkg/sql/sqlerrors",
"//pkg/sql/sqltelemetry", # keep
"//pkg/sql/types",
"//pkg/util",
"//pkg/util/buildutil",
"//pkg/util/duration", # keep
"//pkg/util/encoding", # keep
"//pkg/util/humanizeutil",
"//pkg/util/json", # keep
"//pkg/util/log",
"//pkg/util/mon",
"//pkg/util/stringarena",
"//pkg/util/tracing",
"@com_github_cockroachdb_apd_v3//:apd", # keep
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
"@com_github_marusama_semaphore//:semaphore",
],
)

Expand Down Expand Up @@ -227,6 +212,7 @@ gen_sort_partitioner_rule(
disallowed_imports_test(
"colexec",
[
"//pkg/sql/colexec/colexecjoin",
"//pkg/sql/colflow",
"//pkg/sql/rowexec",
"//pkg/sql/rowflow",
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/colexec/colbuilder/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ go_library(
"//pkg/sql/colexec/colexecagg",
"//pkg/sql/colexec/colexecargs",
"//pkg/sql/colexec/colexecbase",
"//pkg/sql/colexec/colexecdisk",
"//pkg/sql/colexec/colexecjoin",
"//pkg/sql/colexec/colexecproj",
"//pkg/sql/colexec/colexecprojconst",
Expand Down
19 changes: 10 additions & 9 deletions pkg/sql/colexec/colbuilder/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecagg"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecargs"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecbase"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecdisk"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecjoin"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecproj"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecprojconst"
Expand Down Expand Up @@ -413,7 +414,7 @@ func (r opResult) createDiskBackedSort(
// sorter regardless of which sorter variant we have instantiated (i.e.
// we don't take advantage of the limits and of partial ordering). We
// could improve this.
return colexec.NewOneInputDiskSpiller(
return colexecdisk.NewOneInputDiskSpiller(
input, inMemorySorter.(colexecop.BufferingInMemoryOperator),
sorterMemMonitorName,
func(input colexecop.Operator) colexecop.Operator {
Expand All @@ -434,7 +435,7 @@ func (r opResult) createDiskBackedSort(
ctx, flowCtx, opName+"-output", processorID,
), factory)
diskAccount := args.MonitorRegistry.CreateDiskAccount(ctx, flowCtx, opName, processorID)
es := colexec.NewExternalSorter(
es := colexecdisk.NewExternalSorter(
sortUnlimitedAllocator,
mergeUnlimitedAllocator,
outputUnlimitedAllocator,
Expand Down Expand Up @@ -465,7 +466,7 @@ func (r opResult) makeDiskBackedSorterConstructor(
args *colexecargs.NewColOperatorArgs,
opNamePrefix redact.RedactableString,
factory coldata.ColumnFactory,
) colexec.DiskBackedSorterConstructor {
) colexecdisk.DiskBackedSorterConstructor {
return func(input colexecop.Operator, inputTypes []*types.T, orderingCols []execinfrapb.Ordering_Column, maxNumberPartitions int) colexecop.Operator {
if maxNumberPartitions < colexecop.ExternalSorterMinPartitions {
colexecerror.InternalError(errors.AssertionFailedf(
Expand Down Expand Up @@ -930,7 +931,7 @@ func NewColOperator(
// case, the wrapped aggregate functions might hit a memory
// error even when used by the external hash aggregator).
evalCtx.SingleDatumAggMemAccount = ehaMemAccount
result.Root = colexec.NewOneInputDiskSpiller(
result.Root = colexecdisk.NewOneInputDiskSpiller(
inputs[0].Root, inMemoryHashAggregator.(colexecop.BufferingInMemoryOperator),
hashAggregatorMemMonitorName,
func(input colexecop.Operator) colexecop.Operator {
Expand All @@ -942,7 +943,7 @@ func NewColOperator(
newAggArgs.Allocator = colmem.NewAllocator(ctx, ehaMemAccount, factory)
newAggArgs.MemAccount = ehaMemAccount
newAggArgs.Input = input
return colexec.NewExternalHashAggregator(
return colexecdisk.NewExternalHashAggregator(
flowCtx,
args,
&newAggArgs,
Expand Down Expand Up @@ -998,14 +999,14 @@ func NewColOperator(
)
edOpName := redact.RedactableString("external-distinct")
diskAccount := args.MonitorRegistry.CreateDiskAccount(ctx, flowCtx, edOpName, spec.ProcessorID)
result.Root = colexec.NewOneInputDiskSpiller(
result.Root = colexecdisk.NewOneInputDiskSpiller(
inputs[0].Root, inMemoryUnorderedDistinct.(colexecop.BufferingInMemoryOperator),
distinctMemMonitorName,
func(input colexecop.Operator) colexecop.Operator {
unlimitedAllocator := colmem.NewAllocator(
ctx, args.MonitorRegistry.CreateUnlimitedMemAccount(ctx, flowCtx, edOpName, spec.ProcessorID), factory,
)
return colexec.NewExternalDistinct(
return colexecdisk.NewExternalDistinct(
unlimitedAllocator,
flowCtx,
args,
Expand Down Expand Up @@ -1089,14 +1090,14 @@ func NewColOperator(
} else {
opName := redact.RedactableString("external-hash-joiner")
diskAccount := args.MonitorRegistry.CreateDiskAccount(ctx, flowCtx, opName, spec.ProcessorID)
result.Root = colexec.NewTwoInputDiskSpiller(
result.Root = colexecdisk.NewTwoInputDiskSpiller(
inputs[0].Root, inputs[1].Root, inMemoryHashJoiner.(colexecop.BufferingInMemoryOperator),
hashJoinerMemMonitorName,
func(inputOne, inputTwo colexecop.Operator) colexecop.Operator {
unlimitedAllocator := colmem.NewAllocator(
ctx, args.MonitorRegistry.CreateUnlimitedMemAccount(ctx, flowCtx, opName, spec.ProcessorID), factory,
)
ehj := colexec.NewExternalHashJoiner(
ehj := colexecdisk.NewExternalHashJoiner(
unlimitedAllocator,
flowCtx,
args,
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/colexec/colexecagg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ go_library(
"//pkg/sql/colexecerror",
"//pkg/sql/colexecop",
"//pkg/sql/colmem",
"//pkg/sql/execinfra",
"//pkg/sql/execinfra/execagg",
"//pkg/sql/execinfrapb",
"//pkg/sql/sem/eval",
"//pkg/sql/sem/tree",
Expand Down Expand Up @@ -83,5 +83,6 @@ disallowed_imports_test(
"//pkg/sql/colexec/colexecprojconst",
"//pkg/sql/colexec/colexecsel",
"//pkg/sql/colexec/colexecwindow",
"//pkg/sql/execinfra",
],
)
8 changes: 4 additions & 4 deletions pkg/sql/colexec/colexecagg/aggregate_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/colexecerror"
"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
"github.com/cockroachdb/cockroach/pkg/sql/colmem"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra/execagg"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand Down Expand Up @@ -448,16 +448,16 @@ func ProcessAggregations(
aggregations []execinfrapb.AggregatorSpec_Aggregation,
inputTypes []*types.T,
) (
constructors []execinfra.AggregateConstructor,
constructors []execagg.AggregateConstructor,
constArguments []tree.Datums,
outputTypes []*types.T,
err error,
) {
constructors = make([]execinfra.AggregateConstructor, len(aggregations))
constructors = make([]execagg.AggregateConstructor, len(aggregations))
constArguments = make([]tree.Datums, len(aggregations))
outputTypes = make([]*types.T, len(aggregations))
for i, aggFn := range aggregations {
constructors[i], constArguments[i], outputTypes[i], err = execinfra.GetAggregateConstructor(
constructors[i], constArguments[i], outputTypes[i], err = execagg.GetAggregateConstructor(
evalCtx, semaCtx, &aggFn, inputTypes,
)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/colexec/colexecagg/aggregators_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ package colexecagg
import (
"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
"github.com/cockroachdb/cockroach/pkg/sql/colmem"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra/execagg"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand All @@ -32,7 +32,7 @@ type NewAggregatorArgs struct {
InputTypes []*types.T
Spec *execinfrapb.AggregatorSpec
EvalCtx *eval.Context
Constructors []execinfra.AggregateConstructor
Constructors []execagg.AggregateConstructor
ConstArguments []tree.Datums
OutputTypes []*types.T
}
6 changes: 3 additions & 3 deletions pkg/sql/colexec/colexecagg/default_agg_tmpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/colexecerror"
"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
"github.com/cockroachdb/cockroach/pkg/sql/colmem"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra/execagg"
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
Expand Down Expand Up @@ -118,7 +118,7 @@ func (a *default_AGGKINDAgg) Reset() {

func newDefault_AGGKINDAggAlloc(
allocator *colmem.Allocator,
constructor execinfra.AggregateConstructor,
constructor execagg.AggregateConstructor,
evalCtx *eval.Context,
inputArgsConverter *colconv.VecToDatumConverter,
numArguments int,
Expand Down Expand Up @@ -148,7 +148,7 @@ type default_AGGKINDAggAlloc struct {
aggAllocBase
aggFuncs []default_AGGKINDAgg

constructor execinfra.AggregateConstructor
constructor execagg.AggregateConstructor
evalCtx *eval.Context
// inputArgsConverter is a converter from coldata.Vecs to tree.Datums that
// is shared among all aggregate functions and is managed by the aggregator
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/colexec/colexecagg/hash_default_agg.eg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions pkg/sql/colexec/colexecagg/ordered_default_agg.eg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 881d0ec

Please sign in to comment.