Merge #56399
56399: colexec: add support for disk-backed distinct r=yuzefovich a=yuzefovich

**colmem: fix appending column to the windowed batches**

We use `MaybeAppendColumn` to enforce that a vector of the desired type
is at the correct position. However, with the introduction of the
concept of capacity to the batches, we now also need to make sure that
the vector has the desired capacity. In the vast majority of cases we
reallocate the whole batch because of the dynamic sizing, so this
shouldn't be an issue in practice; still, this commit fixes that
theoretical problem.

Additionally, this commit fixes a problem with appending a vector to
windowed batches: such batches are instantiated with 0 capacity, and
previously, when appending a vector to one, we would always allocate
the new vector with 0 capacity as well.
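
As a rough illustration of the fix, here is a minimal sketch with made-up `vec`/`batch` types (not the actual `colmem`/`coldata` API): the append has to honor a desired capacity in addition to the desired type.

```go
package main

import "fmt"

// vec and batch are simplified stand-ins for coldata vectors and batches.
type vec struct {
	typ string
	cap int
}

type batch struct {
	capacity int
	vecs     []vec
}

// maybeAppendColumn is a hypothetical analogue of MaybeAppendColumn: it makes
// sure position idx holds a vector of the desired type *and* of at least
// desiredCapacity elements, reallocating the vector when either condition is
// violated.
func maybeAppendColumn(b *batch, typ string, idx, desiredCapacity int) {
	if idx < len(b.vecs) {
		v := b.vecs[idx]
		if v.typ == typ && v.cap >= desiredCapacity {
			// The existing vector is already suitable.
			return
		}
		// Wrong type or insufficient capacity: replace the vector.
		b.vecs[idx] = vec{typ: typ, cap: desiredCapacity}
		return
	}
	// Append vectors of the desired capacity up to and including idx.
	for len(b.vecs) <= idx {
		b.vecs = append(b.vecs, vec{typ: typ, cap: desiredCapacity})
	}
}

func main() {
	// A windowed batch is instantiated with 0 capacity; previously an appended
	// vector would inherit that 0 capacity instead of a usable one.
	windowed := &batch{capacity: 0}
	maybeAppendColumn(windowed, "Int64", 0, 1024 /* e.g. a full batch size */)
	fmt.Println(windowed.vecs[0].cap) // 1024, not 0
}
```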

Release note: None

**colexec: extract hash-based partitioner base from external hash joiner**

This commit extracts the logic of the Grace hash join algorithm into a
hash-based partitioner that can be reused by the unordered distinct and
the hash aggregator to add support for disk-spilling. The idea is that at
planning time we can provide different in-memory "main" and disk-backed
"fallback" strategies. If a partition fits under the memory limit, then
the former is used; however, if recursive partitioning isn't successful
in reducing the size, that partition is assigned to be handled by the
latter. In the case of the external hash joiner, the in-memory hash joiner
is the "main" operator, whereas the external sort and the merge joiner
form the "fallback" operator.
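
The following is a minimal, self-contained sketch of the dispatch logic described above; all names (`partition`, `runMain`, `runFallback`, and friends) are hypothetical stand-ins for the actual operators, not the colexec implementation.

```go
package main

import "fmt"

// partition is a stand-in for one on-disk partition of the input.
type partition struct {
	rows     []int
	prevSize int // size of the parent partition, 0 for the input itself
}

func (p partition) size() int { return len(p.rows) }

func runMain(p partition)     { fmt.Println("in-memory strategy on", p.size(), "rows") }
func runFallback(p partition) { fmt.Println("disk-backed fallback on", p.size(), "rows") }

// repartitionByHash splits a partition into fanOut children. divisor selects
// different "hash bits" at every recursion level so that repartitioning has a
// chance to actually shrink the partitions.
func repartitionByHash(p partition, fanOut, divisor int) []partition {
	children := make([]partition, fanOut)
	for i := range children {
		children[i].prevSize = p.size()
	}
	for _, r := range p.rows {
		i := (r / divisor) % fanOut
		children[i].rows = append(children[i].rows, r)
	}
	return children
}

// process dispatches a partition: small partitions go to the in-memory "main"
// strategy, partitions that stop shrinking under recursive repartitioning go
// to the disk-backed "fallback" strategy, and everything else is repartitioned.
func process(p partition, memLimit, fanOut, divisor int) {
	switch {
	case p.size() <= memLimit:
		runMain(p) // e.g. in-memory hash joiner or unordered distinct
	case p.prevSize > 0 && p.size() >= p.prevSize:
		// Repartitioning is not reducing the size (e.g. heavy skew on the
		// hashed columns): hand this partition over to the fallback.
		runFallback(p) // e.g. external sort + merge joiner / ordered distinct
	default:
		for _, child := range repartitionByHash(p, fanOut, divisor) {
			process(child, memLimit, fanOut, divisor*fanOut)
		}
	}
}

func main() {
	rows := make([]int, 100)
	for i := range rows {
		rows[i] = i
	}
	process(partition{rows: rows}, 16, 4, 1)
}
```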

Release note: None

**colexec: fix test harness in some cases when comparing sets**

This commit fixes our test harness when comparing expected and actual
tuples as sets. We use a sort to speed things up, and previously, in
some cases, essentially the same sets of tuples could end up in
different orders because the actual tuples contain tree.Datums while
the expected ones contain strings.
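
A minimal sketch of the intended comparison, assuming both sides are normalized to a single string representation before sorting (the helper names are illustrative, not the real test harness):

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// tupleToString renders a tuple with one canonical formatting so that datums
// and pre-formatted strings sort identically.
func tupleToString(tuple []interface{}) string {
	parts := make([]string, len(tuple))
	for i, d := range tuple {
		parts[i] = fmt.Sprint(d)
	}
	return strings.Join(parts, ",")
}

// setsEqual reports whether two collections contain the same multiset of
// tuples, ignoring order.
func setsEqual(expected, actual [][]interface{}) bool {
	if len(expected) != len(actual) {
		return false
	}
	e := make([]string, len(expected))
	a := make([]string, len(actual))
	for i := range expected {
		e[i] = tupleToString(expected[i])
		a[i] = tupleToString(actual[i])
	}
	// Both sides are sorted under the same comparison, so the orders match.
	sort.Strings(e)
	sort.Strings(a)
	for i := range e {
		if e[i] != a[i] {
			return false
		}
	}
	return true
}

func main() {
	expected := [][]interface{}{{"1", "foo"}, {"2", "bar"}}
	actual := [][]interface{}{{2, "bar"}, {1, "foo"}}
	fmt.Println(setsEqual(expected, actual)) // true
}
```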

Release note: None

**colexec: implement disk-spilling for unordered distinct**

This commit introduces an external distinct operator that reuses the
hash-based partitioner, which divides the input into separate partitions
according to the "partitioning by hash" scheme. The "main" strategy for
the new operator is the already existing in-memory unordered distinct,
whereas the "fallback" strategy is an external sort followed by an
ordered distinct. Benchmarks have shown that this approach is
significantly faster in most cases than the external sort + ordered
distinct approach (i.e. than making the fallback strategy the main one).

One important detail of the fallback strategy is that we need to make sure
that we keep the very first tuple from the input among all tuples that
are identical on the distinct columns. If we naively used a sort plus
ordered distinct, we might break that, so to work around it we plan an
ordinality operator and include the ordinality column as the last one in
the ordering; we then project out that temporary column before feeding
into the ordered distinct.
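
A small self-contained sketch of the ordinality trick (with a hypothetical `row` type standing in for the actual tuples): sorting by the distinct column with the ordinality as a tie-breaker and then deduplicating keeps the first tuple seen in the input for every distinct value.

```go
package main

import (
	"fmt"
	"sort"
)

type row struct {
	distinctCol string
	payload     int
	ordinality  int // temporary column: original input position
}

func fallbackDistinct(input []row) []row {
	// Ordinality operator: record the original position of every row.
	for i := range input {
		input[i].ordinality = i
	}
	// External sort stand-in: order by the distinct column, breaking ties by
	// ordinality so the earliest input row in each group comes first.
	sort.Slice(input, func(i, j int) bool {
		if input[i].distinctCol != input[j].distinctCol {
			return input[i].distinctCol < input[j].distinctCol
		}
		return input[i].ordinality < input[j].ordinality
	})
	// Ordered distinct: emit the first row of each group; the ordinality
	// column is ignored (projected out) from this point on.
	var out []row
	for i, r := range input {
		if i == 0 || r.distinctCol != input[i-1].distinctCol {
			out = append(out, r)
		}
	}
	return out
}

func main() {
	in := []row{{"a", 10, 0}, {"b", 20, 0}, {"a", 30, 0}}
	for _, r := range fallbackDistinct(in) {
		fmt.Println(r.distinctCol, r.payload) // a 10, b 20 — the first "a" is kept
	}
}
```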

Another important detail is that distinct is expected to maintain the
ordering of the output stream (matching the ordering of the input
stream) when an output ordering is specified. Previously, this came for
"free" in both the vectorized and the row-by-row engines; however, with
the partitioning-by-hash approach we no longer have that guarantee.
Therefore, a new field was added to DistinctSpec that specifies the
desired output ordering, and that field is now used to optionally plan
an external sort on top of the external distinct. Benchmarks have shown
that the performance overhead of such a sort is relatively small.
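
A hypothetical planning sketch of that decision (the `Operator`/`OrderingColumn` names below are illustrative stand-ins, not the actual colbuilder code): the disk-backed distinct is wrapped with a sort only when an output ordering is requested.

```go
package main

import "fmt"

// Operator and OrderingColumn are illustrative stand-ins for the vectorized
// operator and ordering-column types used during planning.
type Operator interface{ Name() string }

type OrderingColumn struct{ ColIdx int }

type namedOp struct{ name string }

func (o namedOp) Name() string { return o.name }

// planDiskBackedDistinct wraps the disk-backed distinct with a sort only when
// the spec requests a particular output ordering; otherwise the tuples are
// emitted in whatever order hash partitioning produces them.
func planDiskBackedDistinct(
	distinct Operator,
	outputOrdering []OrderingColumn,
	makeDiskBackedSort func(Operator, []OrderingColumn) Operator,
) Operator {
	if len(outputOrdering) == 0 {
		return distinct
	}
	// Restore the ordering that partitioning by hash destroyed.
	return makeDiskBackedSort(distinct, outputOrdering)
}

func main() {
	distinct := namedOp{name: "external distinct"}
	makeSort := func(input Operator, _ []OrderingColumn) Operator {
		return namedOp{name: "external sort over " + input.Name()}
	}
	fmt.Println(planDiskBackedDistinct(distinct, nil, makeSort).Name())
	fmt.Println(planDiskBackedDistinct(distinct, []OrderingColumn{{ColIdx: 0}}, makeSort).Name())
}
```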

Release note: None

Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
craig[bot] and yuzefovich committed Dec 10, 2020
2 parents dc4eaf0 + 0ca7f6c commit f298478
Showing 29 changed files with 1,981 additions and 1,282 deletions.
3 changes: 3 additions & 0 deletions pkg/sql/colexec/BUILD.bazel
@@ -16,11 +16,13 @@ go_library(
"deselector.go",
"disk_spiller.go",
"expr.go",
"external_distinct.go",
"external_hash_joiner.go",
"external_sort.go",
"fn_op.go",
"hash.go",
"hash_aggregator.go",
"hash_based_partitioner.go",
"hash_utils.go",
"hashjoiner.go",
"hashtable.go",
@@ -114,6 +116,7 @@ go_test(
"dep_test.go",
"deselector_test.go",
"distinct_test.go",
"external_distinct_test.go",
"external_hash_joiner_test.go",
"external_sort_test.go",
"hash_utils_test.go",
8 changes: 7 additions & 1 deletion pkg/sql/colexec/bool_vec_to_sel.go
@@ -48,7 +48,7 @@ type boolVecToSelOp struct {
outputCol []bool
}

var _ colexecbase.Operator = &boolVecToSelOp{}
var _ ResettableOperator = &boolVecToSelOp{}

func (p *boolVecToSelOp) Next(ctx context.Context) coldata.Batch {
// Loop until we have non-zero amount of output to return, or our input's been
@@ -107,6 +107,12 @@ func (p *boolVecToSelOp) Init() {
p.input.Init()
}

func (p *boolVecToSelOp) reset(ctx context.Context) {
if r, ok := p.input.(resetter); ok {
r.reset(ctx)
}
}

func boolVecToSel64(vec []bool, sel []int) []int {
l := len(vec)
for i := 0; i < l; i++ {
117 changes: 76 additions & 41 deletions pkg/sql/colexec/colbuilder/execplan.go
@@ -490,7 +490,6 @@ func (r opResult) createDiskBackedSort(
maxNumberPartitions = args.TestingKnobs.NumForcedRepartitions
}
es := colexec.NewExternalSorter(
ctx,
unlimitedAllocator,
standaloneMemAccount,
input, inputTypes, ordering,
@@ -508,6 +507,43 @@
), nil
}

// makeDiskBackedSorterConstructor creates a DiskBackedSorterConstructor that
// can be used by the hash-based partitioner.
// NOTE: unless DelegateFDAcquisitions testing knob is set to true, it is up to
// the caller to acquire the necessary file descriptors up front.
func (r opResult) makeDiskBackedSorterConstructor(
ctx context.Context,
flowCtx *execinfra.FlowCtx,
args *colexec.NewColOperatorArgs,
monitorNamePrefix string,
factory coldata.ColumnFactory,
) colexec.DiskBackedSorterConstructor {
return func(input colexecbase.Operator, inputTypes []*types.T, orderingCols []execinfrapb.Ordering_Column, maxNumberPartitions int) colexecbase.Operator {
if maxNumberPartitions < colexec.ExternalSorterMinPartitions {
colexecerror.InternalError(errors.AssertionFailedf(
"external sorter is attempted to be created with %d partitions, minimum %d required",
maxNumberPartitions, colexec.ExternalSorterMinPartitions,
))
}
sortArgs := *args
if !args.TestingKnobs.DelegateFDAcquisitions {
// Set the FDSemaphore to nil. This indicates that no FDs should be
// acquired. The hash-based partitioner will do this up front.
sortArgs.FDSemaphore = nil
}
sorter, err := r.createDiskBackedSort(
ctx, flowCtx, &sortArgs, input, inputTypes,
execinfrapb.Ordering{Columns: orderingCols},
0 /* matchLen */, maxNumberPartitions, args.Spec.ProcessorID,
&execinfrapb.PostProcessSpec{}, monitorNamePrefix, factory,
)
if err != nil {
colexecerror.InternalError(err)
}
return sorter
}
}

// createAndWrapRowSource takes a processor spec, creating the row source and
// wrapping it using wrapRowSources. Note that the post process spec is included
// in the processor creation, so make sure to clear it if it will be inspected
@@ -824,24 +860,44 @@ func NewColOperator(
if len(core.Distinct.OrderedColumns) == len(core.Distinct.DistinctColumns) {
result.Op, err = colexec.NewOrderedDistinct(inputs[0], core.Distinct.OrderedColumns, result.ColumnTypes)
} else {
distinctMemAccount := streamingMemAccount
if !useStreamingMemAccountForBuffering {
// Create an unlimited mem account explicitly even though there is no
// disk spilling because the memory usage of an unordered distinct
// operator is proportional to the number of distinct tuples, not the
// number of input tuples.
// The row execution engine also gives an unlimited amount (that still
// needs to be approved by the upstream monitor, so not really
// "unlimited") amount of memory to the unordered distinct operator.
distinctMemAccount = result.createBufferingUnlimitedMemAccount(ctx, flowCtx, "distinct")
}
// We have separate unit tests that instantiate in-memory
// distinct operators, so we don't need to look at
// args.TestingKnobs.DiskSpillingDisabled and always instantiate
// a disk-backed one here.
distinctMemMonitorName := fmt.Sprintf("distinct-%d", spec.ProcessorID)
distinctMemAccount := result.createMemAccountForSpillStrategy(
ctx, flowCtx, distinctMemMonitorName,
)
// TODO(yuzefovich): we have an implementation of partially ordered
// distinct, and we should plan it when we have non-empty ordered
// columns and we think that the probability of distinct tuples in the
// input is about 0.01 or less.
result.Op = colexec.NewUnorderedDistinct(
colmem.NewAllocator(ctx, distinctMemAccount, factory), inputs[0],
core.Distinct.DistinctColumns, result.ColumnTypes,
allocator := colmem.NewAllocator(ctx, distinctMemAccount, factory)
inMemoryUnorderedDistinct := colexec.NewUnorderedDistinct(
allocator, inputs[0], core.Distinct.DistinctColumns, result.ColumnTypes,
)
diskAccount := result.createDiskAccount(ctx, flowCtx, distinctMemMonitorName)
result.Op = colexec.NewOneInputDiskSpiller(
inputs[0], inMemoryUnorderedDistinct.(colexecbase.BufferingInMemoryOperator),
distinctMemMonitorName,
func(input colexecbase.Operator) colexecbase.Operator {
monitorNamePrefix := "external-distinct"
unlimitedAllocator := colmem.NewAllocator(
ctx, result.createBufferingUnlimitedMemAccount(ctx, flowCtx, monitorNamePrefix), factory,
)
ed := colexec.NewExternalDistinct(
unlimitedAllocator,
flowCtx,
args,
input,
result.ColumnTypes,
result.makeDiskBackedSorterConstructor(ctx, flowCtx, args, monitorNamePrefix, factory),
diskAccount,
)
result.ToClose = append(result.ToClose, ed.(colexecbase.Closer))
return ed
},
args.TestingKnobs.SpillingCallbackFn,
)
}

@@ -910,34 +966,13 @@ func NewColOperator(
unlimitedAllocator := colmem.NewAllocator(
ctx, result.createBufferingUnlimitedMemAccount(ctx, flowCtx, monitorNamePrefix), factory,
)
// Make a copy of the DiskQueueCfg and set defaults for the hash
// joiner. The cache mode is chosen to automatically close the cache
// belonging to partitions at a parent level when repartitioning.
diskQueueCfg := args.DiskQueueCfg
diskQueueCfg.CacheMode = colcontainer.DiskQueueCacheModeClearAndReuseCache
diskQueueCfg.SetDefaultBufferSizeBytesForCacheMode()
ehj := colexec.NewExternalHashJoiner(
unlimitedAllocator, hjSpec,
unlimitedAllocator,
flowCtx,
args,
hjSpec,
inputOne, inputTwo,
execinfra.GetWorkMemLimit(flowCtx.Cfg),
diskQueueCfg,
args.FDSemaphore,
func(input colexecbase.Operator, inputTypes []*types.T, orderingCols []execinfrapb.Ordering_Column, maxNumberPartitions int) (colexecbase.Operator, error) {
sortArgs := *args
if !args.TestingKnobs.DelegateFDAcquisitions {
// Set the FDSemaphore to nil. This indicates that no FDs
// should be acquired. The external hash joiner will do this
// up front.
sortArgs.FDSemaphore = nil
}
return result.createDiskBackedSort(
ctx, flowCtx, &sortArgs, input, inputTypes,
execinfrapb.Ordering{Columns: orderingCols},
0 /* matchLen */, maxNumberPartitions, spec.ProcessorID,
&execinfrapb.PostProcessSpec{}, monitorNamePrefix+"-", factory)
},
args.TestingKnobs.NumForcedRepartitions,
args.TestingKnobs.DelegateFDAcquisitions,
result.makeDiskBackedSorterConstructor(ctx, flowCtx, args, monitorNamePrefix, factory),
diskAccount,
)
result.ToClose = append(result.ToClose, ehj.(colexecbase.Closer))
4 changes: 2 additions & 2 deletions pkg/sql/colexec/distinct.eg.go

Some generated files are not rendered by default.
