
colexec: implement lag and lead window functions in vectorized engine #65634

Merged: 3 commits merged into cockroachdb:master from the lead_lag branch on May 28, 2021

Conversation

DrewKimball (Collaborator)

`lag` and `lead` are window functions that return an expression
evaluated at the row `offset` rows from the current row, defaulting
to `default` if such a row does not exist in the partition.
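
For intuition, here are the row-level semantics in scalar form (an illustrative Go sketch, not this patch's vectorized implementation):

```go
// lagInt64 mirrors lag(col, offset, def) over one partition: row i gets
// col[i-offset] when that row exists, otherwise def. lead is the same
// idea with i+offset. Scalar reference only, not the vectorized operator.
func lagInt64(col []int64, offset int, def int64) []int64 {
	out := make([]int64, len(col))
	for i := range col {
		if j := i - offset; j >= 0 && j < len(col) {
			out[i] = col[j]
		} else {
			out[i] = def
		}
	}
	return out
}
```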

This patch provides vectorized implementations of the lag and lead
window functions. The logic to buffer each partition into a queue is
now separate from calculation of the window function itself so that
lag, lead, and ntile (and others) can share common logic. As
before, each batch is emitted as soon as its output column has been
entirely filled.

See #37035

Release note (sql change): the vectorized engine now supports the lag
and lead window functions.

@DrewKimball DrewKimball requested a review from a team as a code owner May 24, 2021 20:05

@DrewKimball (Collaborator, Author)

I still need to write tests for the SpillingBuffer code, and I'd like to add some logic tests as well.


@yuzefovich yuzefovich left a comment


Looks great! I skimmed the code (and will take a closer look later today), just have a suggestion about the code organization.

We should separate the logic of extracting BufferedWindow into a separate commit.

Reviewed 3 of 18 files at r1.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @DrewKimball)


pkg/col/colserde/arrowbatchconverter.go, line 517 at r1 (raw file):

	}
	offsets := bytesArr.ValueOffsets()
	if len(offsets) > 0 {

It'd be good to add a unit test for this scenario and probably extract this change into a separate commit too.

I also wonder whether the correct fix is actually modifying the behavior of ToArrowSerializationFormat if Bytes is a window. Currently, we'd be serializing the stuff that is "before" the window redundantly. E.g. if we have something like

b.data = []byte{0, 1, 2, 3}
b.offsets = []int32{0, 1, 2, 3, 4}

if we do b.Window(2, 4).ToArrowSerializationFormat, we will get []byte{0, 1, 2, 3}, []int32{2, 3, 4}, meaning that we're still serializing the []byte{0, 1} prefix of data although it is not used. What do you think?
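
Concretely, the serialization step could rebase the offsets, something like this sketch (the helper name and shape are illustrative, not actual code):

```go
// rebaseOffsets rewrites a windowed Bytes representation so that offsets
// begin at zero: subtract offsets[0] from every offset (valid because
// offsets are non-decreasing) and truncate data to the window's range.
// For the example above it returns []byte{2, 3}, []int32{0, 1, 2}.
func rebaseOffsets(data []byte, offsets []int32) ([]byte, []int32) {
	if len(offsets) == 0 || offsets[0] == 0 {
		return data, offsets
	}
	rebased := make([]int32, len(offsets))
	for i, o := range offsets {
		rebased[i] = o - offsets[0]
	}
	return data[offsets[0]:offsets[len(offsets)-1]], rebased
}
```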


pkg/sql/colexec/colbuilder/execplan.go, line 1306 at r1 (raw file):

					)
				case execinfrapb.WindowerSpec_LAG:
					opName := opNamePrefix + "lag"

nit: some code around opName, unlimitedAllocator, and diskAcc is repeated for every disk-backed window function. I think it'd be nice to extract it out.


pkg/sql/colexec/colexecwindow/buffered_window.go, line 31 at r1 (raw file):

// NewBufferedWindowOperator creates a new Operator that computes the given
// window function.
func NewBufferedWindowOperator(

nit: this could be unexported, right?


@yuzefovich yuzefovich left a comment


Reviewed 15 of 18 files at r1.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @DrewKimball)


pkg/sql/colexec/colexecutils/spilling_buffer.go, line 28 at r1 (raw file):

// SpillingBuffer wraps an AppendOnlyBufferedBatch to store tuples. It
// supports an append operation, as well as access to a windowed batch starting
// at a given index. Once the memory limit is reached, SpillingBuffer spills

nit: it'd be good to clarify what happens to already accumulated tuples in memory - do they get moved to disk or no?


pkg/sql/colexec/colexecutils/spilling_buffer.go, line 45 at r1 (raw file):

	diskQueueCfg   colcontainer.DiskQueueCfg
	diskQueue      colcontainer.Queue

nit: this is always RewindableQueue, right?


pkg/sql/colexec/colexecutils/spilling_buffer.go, line 73 at r1 (raw file):

	// must reserve memory for the disk queue and dequeue scratch batch.
	maxMemoryLimit := memoryLimit - int64(colmem.EstimateBatchSizeBytes(typs, coldata.BatchSize()))
	maxMemoryLimit -= int64(diskQueueCfg.BufferSizeBytes)

In some test setups this might become negative (if memoryLimit is 1 for example which is the case for -disk configs of logic tests). We usually introduce an arbitrary minimum below which maxMemoryLimit cannot go.
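
For example, a sketch of the usual guard (the helper name here is made up):

```go
// bufferMemLimit reserves room up front for the dequeue scratch batch and
// the disk queue's write buffer, clamping at zero so tiny test limits
// (e.g. memoryLimit == 1 in -disk logic-test configs) can't go negative.
func bufferMemLimit(memoryLimit, estBatchBytes, diskBufferBytes int64) int64 {
	limit := memoryLimit - estBatchBytes - diskBufferBytes
	if limit < 0 {
		return 0
	}
	return limit
}
```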


pkg/sql/colexec/colexecutils/spilling_buffer.go, line 87 at r1 (raw file):

// AppendTuples adds the columns from the given batch signified by colIdxs to
// the randomAccessBuffer. It expects that the given batch does not use a

nit: probably s/randomAccessBuffer/SpillingBuffer/.


pkg/sql/colexec/colexecutils/spilling_buffer.go, line 110 at r1 (raw file):

		length := endIdx - startIdx
		var additionalUsedMem int64
		for {

I think this logic is a bit too complicated and could still be imprecise. If we have variable-length types, then EstimateBatchSizeBytes could be wrong.

IMO it would be better and simpler to just have something like:

  • have we already spilled?
    • no, then append all tuples into the in-memory buffer and use PerformOperation for the precise memory accounting. Check whether we have exceeded the memory limit, if so, the next batch will have to go to disk
    • yes, then this batch goes to disk.

We're using this general approach (or performing an allocation and doing after-the-fact accounting) throughout the vectorized engine, since we will be wrong by at most one batch of data, which shouldn't be too bad. (It is possible that columns are wide, and 1024 rows is like 10MB or more, but these cases should be unlikely.)

We could improve the heuristic and see whether the estimated batch size puts us over the max limit before appending to in-memory buffer, and spill to disk if it does.

The difficulty with EstimateBatchSizeBytes is that before actually appending the data, we don't know whether a new allocation will take place that puts us over the limit. Consider an example with flat bytes where len(data) == 10, cap(data) == 20 - if we're appending 10 bytes or less, the footprint of the in-memory buffer doesn't change, but if we're appending more, data will need to be reallocated and might push us over the limit.
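
Roughly, in code, the policy described above (a sketch with made-up type and helper names, not the actual implementation):

```go
import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/col/coldata"
	"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecutils"
	"github.com/cockroachdb/cockroach/pkg/sql/colmem"
)

// spillingSketch is a pared-down stand-in for the buffer, just enough to
// show the control flow.
type spillingSketch struct {
	allocator *colmem.Allocator
	inMemory  *colexecutils.AppendOnlyBufferedBatch
	memLimit  int64
	spilled   bool
}

func (b *spillingSketch) appendTuples(
	ctx context.Context, batch coldata.Batch, startIdx, endIdx int,
) {
	if b.spilled {
		// Already spilled once: all subsequent batches go straight to disk.
		b.enqueueToDisk(ctx, batch, startIdx, endIdx)
		return
	}
	// PerformOperation accounts precisely for whatever the append allocates.
	b.allocator.PerformOperation(b.inMemory.ColVecs(), func() {
		b.inMemory.AppendTuples(batch, startIdx, endIdx)
	})
	if b.allocator.Used() > b.memLimit {
		// We overshot by at most one batch; the next append will spill.
		b.spilled = true
	}
}

func (b *spillingSketch) enqueueToDisk(context.Context, coldata.Batch, int, int) {
	// Elided: window the batch to [startIdx, endIdx) and Enqueue it on a
	// rewindable disk queue.
}
```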


pkg/sql/colexec/colexecutils/spilling_buffer.go, line 154 at r1 (raw file):

		}
		b.scratch.SetLength(endIdx - startIdx)
		if err = b.diskQueue.Enqueue(ctx, b.scratch); err != nil {

I might have not gotten to the relevant code, but we need to make sure that we enqueue a zero-length batch once all tuples are appended.


pkg/sql/colexec/colexecutils/spilling_buffer.go, line 155 at r1 (raw file):

		b.scratch.SetLength(endIdx - startIdx)
		if err = b.diskQueue.Enqueue(ctx, b.scratch); err != nil {
			colexecerror.InternalError(err)

nit: you should use HandleErrorFromDiskQueue on the errors returned from Enqueue. Actually, it might be just better to modify Enqueue method to not return an error and call HandleErrorFromDiskQueue internally.


pkg/sql/colexec/colexecutils/spilling_buffer.go, line 161 at r1 (raw file):

// GetWindowedBatch returns a windowed batch that starts with the tuple at the
// given index into all queued tuples. The batch is not allowed to be modified,

Can the caller set a selection vector on the returned batch? If so, we will need to unset it in the body of the function.


pkg/sql/colexec/colexecutils/spilling_buffer.go, line 167 at r1 (raw file):

// This is useful for callers that wish to iterate through adjacent tuples.
//
// getTuples optimizes for the case when subsequent calls access tuples from the

nit: s/getTuples/GetWindowedBatch/.


pkg/sql/colexec/colexecutils/spilling_buffer.go, line 169 at r1 (raw file):

// getTuples optimizes for the case when subsequent calls access tuples from the
// same batch, and to a lesser degree when tuples from a subsequent batch are
// accessed. It assumes that startIdx is within the range: [0, num_tuples); it

nit: where does num_tuples come from?


pkg/sql/colexec/colexecutils/spilling_buffer.go, line 176 at r1 (raw file):

		// The requested tuple is stored in-memory.
		endIdx := b.bufferedTuples.Length()
		for i := range b.typs {

What happens if colIdxs argument of AppendTuples didn't include all columns? It's probably worth mentioning in the comment above.


pkg/sql/colexec/colexecutils/spilling_buffer.go, line 196 at r1 (raw file):

		}
		b.numDequeued = 0
		b.dequeueScratch = nil

This seems a bit suboptimal - we should reset the dequeue scratch if it is non-nil. Actually, even resetting is not needed - Dequeue operation modifies the batch as it sees fit. Or is it because of some complications with the flat bytes?


pkg/sql/colexec/colexecutils/spilling_buffer.go, line 200 at r1 (raw file):

	for {
		// Dequeue batches until we reach the one with the idx'th tuple.
		if b.dequeueScratch == nil {

nit: this if block can probably be lifted outside of the for loop.


pkg/sql/colexec/colexecutils/spilling_buffer.go, line 203 at r1 (raw file):

			// We have already set aside enough memory capacity for the dequeueScratch
			// batch to have the maximum batch size, so the allocation will not exceed
			// the memory limit. Unregister the memory usage estimate so it doesn't

Same comment as above about EstimateBatchSizeBytes being imprecise - for variable length types we might have not reserved enough bytes. Take a look at SpillingQueue.Dequeue for how we try to be precise.


pkg/sql/colexec/colexecutils/spilling_buffer.go, line 208 at r1 (raw file):

			b.unlimitedAllocator.ReleaseMemory(colmem.GetBatchMemSize(b.dequeueScratch))
		}
		if b.dequeueScratch.Length() == 0 {

When can this case occur? I would imagine that we need zero-length check after Dequeue below.


pkg/sql/colexec/colexecutils/spilling_buffer.go, line 241 at r1 (raw file):

func (b *SpillingBuffer) numFDsOpenAtAnyGivenTime() int {
	if b.diskQueueCfg.CacheMode != colcontainer.DiskQueueCacheModeDefault {

Hm, I think we need to think a bit about this. Because we assume that all writes must occur before any reads, we might always use non-default cache mode, regardless of what the config says.


pkg/sql/colexec/colexecwindow/lead_lag_tmpl.go, line 168 at r1 (raw file):

		return
	}
	_OP_NAMEVec := batch.ColVec(w.outputColIdx)

super nit: having _OP_NAME here in the names of local variables isn't strictly needed and looks unusual to me; however, if you prefer it this way so that the generated code is nicer, ignore this comment.


pkg/sql/colexec/colexecwindow/lead_lag_tmpl.go, line 176 at r1 (raw file):

	// {{end}}

	offsetVec := batch.ColVec(w.offsetIdx)

w.offsetIdx and w.defaultIdx might be tree.NoColumnIdx, right?


pkg/sql/colexec/execgen/cmd/execgen/lag_gen.go, line 22 at r1 (raw file):

const lagTmpl = "pkg/sql/colexec/colexecwindow/lead_lag_tmpl.go"

func genLagOp(inputFileContents string, wr io.Writer) error {

nit: lag_gen and lead_gen are very similar. It is possible to have a single generator file that is actually responsible for generating two files. Take a look at projection_ops-gen for a somewhat complicated example that does this.


@DrewKimball DrewKimball left a comment


Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @michae2 and @yuzefovich)


pkg/col/colserde/arrowbatchconverter.go, line 517 at r1 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

It'd be good to add a unit test for this scenario and probably extract this change into a separate commit too.

I also wonder whether the correct fix is actually modifying the behavior of ToArrowSerializationFormat if Bytes is a window. Currently, we'd be serializing the stuff that is "before" the window redundantly. E.g. if we have something like

b.data = []byte{0, 1, 2, 3}
b.offsets = []int32{0, 1, 2, 3, 4}

if we do b.Window(2, 4).ToArrowSerializationFormat, we will get []byte{0, 1, 2, 3}, []int32{2, 3, 4}, meaning that we're still serializing the []byte{0, 1} prefix of data although it is not used. What do you think?

Yes, I think doing it in the serialization step makes more sense. Done.


pkg/sql/colexec/colbuilder/execplan.go, line 1306 at r1 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: some code around opName, unlimitedAllocator, and diskAcc is repeated for every disk-backed window function. I think it'd be nice to extract it out.

Done.


pkg/sql/colexec/colexecutils/spilling_buffer.go, line 28 at r1 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: it'd be good to clarify what happens to already accumulated tuples in memory - do they get moved to disk or no?

Done.


pkg/sql/colexec/colexecutils/spilling_buffer.go, line 45 at r1 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: this is always RewindableQueue, right?

Yup, Done.


pkg/sql/colexec/colexecutils/spilling_buffer.go, line 73 at r1 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

In some test setups this might become negative (if memoryLimit is 1 for example which is the case for -disk configs of logic tests). We usually introduce an arbitrary minimum below which maxMemoryLimit cannot go.

Done. (I went with zero as the limit)


pkg/sql/colexec/colexecutils/spilling_buffer.go, line 87 at r1 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: probably s/randomAccessBuffer/SpillingBuffer/.

Oops, Done.


pkg/sql/colexec/colexecutils/spilling_buffer.go, line 110 at r1 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

I think this logic is a bit too complicated and could still be imprecise. If we have variable-length types, then EstimateBatchSizeBytes could be wrong.

IMO it would be better and simpler to just have something like:

  • have we already spilled?
    • no, then append all tuples into the in-memory buffer and use PerformOperation for the precise memory accounting. Check whether we have exceeded the memory limit, if so, the next batch will have to go to disk
    • yes, then this batch goes to disk.

We're using this general approach (or performing an allocation and doing after-the-fact accounting) throughout the vectorized engine, since we will be wrong by at most one batch of data, which shouldn't be too bad. (It is possible that columns are wide, and 1024 rows is like 10MB or more, but these cases should be unlikely.)

We could improve the heuristic and see whether the estimated batch size puts us over the max limit before appending to in-memory buffer, and spill to disk if it does.

The difficulty with EstimateBatchSizeBytes is that before actually appending the data, we don't know whether a new allocation will take place that puts us over the limit. Consider an example with flat bytes where len(data) == 10, cap(data) == 20 - if we're appending 10 bytes or less, the footprint of the in-memory buffer doesn't change, but if we're appending more, data will need to be reallocated and might push us over the limit.

Yes, I think it will be nicer to do it this way. The changes are pretty significant, so you should probably take a second look at the updated version.


pkg/sql/colexec/colexecutils/spilling_buffer.go, line 154 at r1 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

I might have not gotten to the relevant code, but we need to make sure that we enqueue a zero-length batch once all tuples are appended.

Done.


pkg/sql/colexec/colexecutils/spilling_buffer.go, line 155 at r1 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: you should use HandleErrorFromDiskQueue on the errors returned from Enqueue. Actually, it might be just better to modify Enqueue method to not return an error and call HandleErrorFromDiskQueue internally.

Done. I think I'll leave changing Enqueue to a future PR.


pkg/sql/colexec/colexecutils/spilling_buffer.go, line 161 at r1 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

Can the caller set a selection vector on the returned batch? If so, we will need to unset it in the body of the function.

I would consider that modifying the batch, but I guess it wouldn't hurt to set the selection vector to nil. Do you think I should?


pkg/sql/colexec/colexecutils/spilling_buffer.go, line 167 at r1 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: s/getTuples/GetWindowedBatch/.

Oops, Done.


pkg/sql/colexec/colexecutils/spilling_buffer.go, line 169 at r1 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: where does num_tuples come from?

You know, I think I'll just add an assertion to ensure the index is in range. As an aside, are there any rules as to when we should use AssertionFailedf vs Errorf?


pkg/sql/colexec/colexecutils/spilling_buffer.go, line 176 at r1 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

What happens if colIdxs argument of AppendTuples didn't include all columns? It's probably worth mentioning in the comment above.

I'll add an assertion, too. Done.


pkg/sql/colexec/colexecutils/spilling_buffer.go, line 196 at r1 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

This seems a bit suboptimal - we should reset the dequeue scratch if it is non-nil. Actually, even resetting is not needed - Dequeue operation modifies the batch as it sees fit. Or is it because of some complications with the flat bytes?

That's a good point, better to set the length to zero instead.


pkg/sql/colexec/colexecutils/spilling_buffer.go, line 200 at r1 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: this if block can probably be lifted outside of the for loop.

Good catch. Done.


pkg/sql/colexec/colexecutils/spilling_buffer.go, line 203 at r1 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

Same comment as above about EstimateBatchSizeBytes being imprecise - for variable length types we might have not reserved enough bytes. Take a look at SpillingQueue.Dequeue for how we try to be precise.

Done. I've decided to only register the memory used by the batch that ends up getting left in dequeueScratch when the function returns, rather than repeatedly updating for each intervening dequeued batch. Does that sound reasonable?


pkg/sql/colexec/colexecutils/spilling_buffer.go, line 208 at r1 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

When can this case occur? I would imagine that we need zero-length check after Dequeue below.

If the batch is freshly constructed or this is the first dequeue after a call to Reset, the length will be zero. But, I've just realized that the next condition will catch that case, so we'll end up dequeuing as expected anyway. I'll just remove the initial dequeue. Done.


pkg/sql/colexec/colexecutils/spilling_buffer.go, line 241 at r1 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

Hm, I think we need to think a bit about this. Because we assume that all writes must occur before any reads, we might always use non-default cache mode, regardless of what the config says.

Done.


pkg/sql/colexec/colexecwindow/buffered_window.go, line 31 at r1 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: this could be unexported, right?

Oh, so it could. Done.


pkg/sql/colexec/colexecwindow/lead_lag_tmpl.go, line 168 at r1 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

super nit: having _OP_NAME here in the names of local variables isn't strictly needed and looks unusual to me; however, if you prefer it this way so that the generated code is nicer, ignore this comment.

I'll go with leadLagVARNAME instead. Done.


pkg/sql/colexec/colexecwindow/lead_lag_tmpl.go, line 176 at r1 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

w.offsetIdx and w.defaultIdx might be tree.NoColumnIdx, right?

Actually they can't - turns out that if the optional arguments are not provided a column will still be projected for them. I'll fix that bit in execplan.go that you were probably looking at. It might be worth it to add optimized versions for those cases, though.


pkg/sql/colexec/execgen/cmd/execgen/lag_gen.go, line 22 at r1 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: lag_gen and lead_gen are very similar. It is possible to have a single generator file that is actually responsible for generating two files. Take a look at projection_ops-gen for a somewhat complicated example that does this.

Done.

@DrewKimball DrewKimball force-pushed the lead_lag branch 3 times, most recently from b524079 to 392fe45 Compare May 26, 2021 23:38

@yuzefovich yuzefovich left a comment


Reviewed 4 of 20 files at r2, 4 of 4 files at r3, 20 of 20 files at r4.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @DrewKimball and @michae2)


pkg/col/coldata/bytes_test.go, line 25 at r2 (raw file):

	"github.com/cockroachdb/errors"
	"github.com/stretchr/testify/require"
	"github.com/cockroachdb/cockroach/pkg/sql/randgen"

nit: ordering.


pkg/col/coldata/bytes_test.go, line 500 at r2 (raw file):

	for i := 1; i < len(offsets); i++ {
		if offsets[i] < offsets[i-1] {
			panic(errors.AssertionFailedf("unexpectedly found decreasing offsets: %v", offsets))

nit: it's a bit unusual for me to see a panic in the test - I think we usually do something like t.Fatal when we have access to *testing.T object. My personal preference in such scenarios is using require with something like require.GreaterOrEqual(t, offsets[i], offsets[i-1], "unexpectedly found decreasing offsets", offsets).
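
In loop form, that suggestion would read (assuming the test's *testing.T is in scope):

```go
for i := 1; i < len(offsets); i++ {
	require.GreaterOrEqualf(t, offsets[i], offsets[i-1],
		"unexpectedly found decreasing offsets: %v", offsets)
}
```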


pkg/sql/colexec/colexecutils/spilling_buffer.go, line 110 at r1 (raw file):

Previously, DrewKimball (Drew Kimball) wrote…

Yes, I think it will be nicer to do it this way. The changes are pretty significant, so you should probably take a second look at the updated version.

Looks good!


pkg/sql/colexec/colexecutils/spilling_buffer.go, line 161 at r1 (raw file):

Previously, DrewKimball (Drew Kimball) wrote…

I would consider that modifying the batch, but I guess it wouldn't hurt to set the selection vector to nil. Do you think I should?

No need, the contract makes sense - I think I originally didn't fully comprehend that the returned batch is not allowed to be modified, and I would also consider changing the selection vector on the batch as a modification.


pkg/sql/colexec/colexecutils/spilling_buffer.go, line 169 at r1 (raw file):

Previously, DrewKimball (Drew Kimball) wrote…

You know, I think I'll just add an assertion to ensure the index is in range. As an aside, are there any rules as to when we should use AssertionFailedf vs Errorf?

I think we usually use AssertionFailedf whenever some assumption or invariant is broken, especially if that breakage is likely to come from the code you've written. AssertionFailedf will have a stack trace included which might help understand how the assumption got broken.

However, if the error is due to some user-behavior or is expected to occur (depending on the circumstances), then Errorf should be used.
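
A quick illustrative example of the convention (both checks here are made up, not code from this PR):

```go
import (
	"github.com/cockroachdb/cockroach/pkg/sql/colexecerror"
	"github.com/cockroachdb/errors"
)

// checkArgs illustrates the split between the two error constructors.
func checkArgs(startIdx, numTuples, offset int) error {
	if startIdx < 0 || startIdx >= numTuples {
		// One of our own invariants is broken: AssertionFailedf attaches
		// a stack trace that shows how we got here.
		colexecerror.InternalError(errors.AssertionFailedf(
			"startIdx %d out of range [0, %d)", startIdx, numTuples))
	}
	if offset < 0 {
		// An expected, input-dependent failure: a plain error suffices.
		return errors.Errorf("offset must be non-negative, got %d", offset)
	}
	return nil
}
```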


pkg/sql/colexec/colexecutils/spilling_buffer.go, line 196 at r1 (raw file):

Previously, DrewKimball (Drew Kimball) wrote…

That's a good point, better to set the length to zero instead.

Sounds good.


pkg/sql/colexec/colexecutils/spilling_buffer.go, line 203 at r1 (raw file):

Previously, DrewKimball (Drew Kimball) wrote…

Done. I've decided to only register the memory used by the batch that ends up getting left in dequeueScratch when the function returns, rather than repeatedly updating for each intervening dequeued batch. Does that sound reasonable?

Yep, that sounds good - in SpillingQueue.Dequeue we are always dequeueing just one batch, but here we need to dequeue many.


pkg/sql/colexec/colexecutils/spilling_buffer.go, line 100 at r4 (raw file):

	diskQueueCfg colcontainer.DiskQueueCfg,
	fdSemaphore semaphore.Semaphore,
	typs []*types.T,

nit: it's worth mentioning that typs define the type schema of the columns that we're appending, but the input batches could have a different schema when colIdxs is non-nil.


pkg/sql/colexec/colexecutils/spilling_buffer.go, line 158 at r4 (raw file):

	memLimitReached := b.unlimitedAllocator.Used() + b.diskReservedMem > b.memoryLimit
	maxInMemTuplesLimitReached := b.testingKnobs.maxTuplesStoredInMemory > 0 &&
		b.bufferedTuples.Length() < b.testingKnobs.maxTuplesStoredInMemory

I think in the second comparison we need >=, not <.
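
That is, the corrected check would read:

```go
maxInMemTuplesLimitReached := b.testingKnobs.maxTuplesStoredInMemory > 0 &&
	b.bufferedTuples.Length() >= b.testingKnobs.maxTuplesStoredInMemory
```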


pkg/sql/colexec/colexecutils/spilling_buffer.go, line 162 at r4 (raw file):

		originalLength := b.bufferedTuples.Length()
		b.unlimitedAllocator.PerformOperation(b.bufferedTuples.ColVecs(), func() {
			for i, colIdx := range b.colIdxs {

Why not use AppendTuples that exists on AppendOnlyBufferedBatch already? I think if you pass in b.colIdxs as colsToStore in the constructor, it should just work, and here we will only need to perform the memory accounting.


pkg/sql/colexec/colexecutils/spilling_buffer.go, line 188 at r4 (raw file):

			colexecerror.InternalError(err)
		}
		b.alreadySpilled = true

nit: we could use b.diskQueue != nil as alreadySpilled == true.


pkg/sql/colexec/colexecutils/spilling_buffer.go, line 307 at r4 (raw file):

// Reset resets the SpillingBuffer.
func (b *SpillingBuffer) Reset(ctx context.Context) {

Hm, the current behavior of Reset seems a bit unusual - we are deeply resetting the in-memory buffered batch (so that its memory is released), but then we are keeping the reference to the dequeue scratch. I think we should choose to either perform a deep reset for everything or a shallow reset for everything.

IMO the cleanest (and probably easiest) way is to perform a deep reset by clearing all memory registered with the unlimited allocator, creating a new in-memory buffered batch, and setting the dequeue scratch to nil.

This also made me realize that we probably have a bug in SpillingQueue.Reset where we forget to clear up the unlimited allocator.
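
Roughly, as if written inside the colexecutils package (field names follow this review; this is a sketch, not the exact change):

```go
// Reset performs the deep reset described above: close and drop the disk
// queue, release all memory registered with the unlimited allocator, and
// rebuild the in-memory state from scratch.
func (b *SpillingBuffer) Reset(ctx context.Context) {
	if b.diskQueue != nil {
		if err := b.diskQueue.Close(ctx); err != nil {
			colexecerror.InternalError(err)
		}
		b.diskQueue = nil
	}
	b.unlimitedAllocator.ReleaseMemory(b.unlimitedAllocator.Used())
	b.bufferedTuples = NewAppendOnlyBufferedBatch(b.unlimitedAllocator, b.typs, nil /* colsToStore */)
	b.dequeueScratch = nil
	b.numDequeued = 0
}
```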


pkg/sql/colexec/colexecutils/spilling_buffer_test.go, line 25 at r4 (raw file):

	"github.com/cockroachdb/cockroach/pkg/testutils/colcontainerutils"
	"github.com/cockroachdb/cockroach/pkg/util/randutil"
	"context"

nit: ordering. Do you have a file watcher enabled? (https://cockroachlabs.atlassian.net/wiki/spaces/ENG/pages/154206209/Goland+Tips+and+Tricks#GolandTipsandTricks-EnablecrlfmtWatcher)


pkg/sql/colexec/colexecutils/spilling_buffer_test.go, line 47 at r4 (raw file):

	} {
		alwaysCompress := rng.Float64() < 0.5
		diskQueueCacheMode := colcontainer.DiskQueueCacheModeClearAndReuseCache

nit: diskQueueCacheMode is always the same, so we can omit it here and hard-code it where we're assigning it to the diskQueueCfg.


pkg/sql/colexec/colexecutils/spilling_buffer_test.go, line 139 at r4 (raw file):

				t.Fatalf("buffer failed to return batch starting at index %d", startIdx)
			}
			// MakeWindowIntoBatch creates a window into tuples in the range

nit: you're not using MakeWindowIntoBatch here, so the comment needs an adjustment. But why not use it?


pkg/sql/colexec/colexecwindow/lead_lag_tmpl.go, line 68 at r4 (raw file):

) (colexecop.Operator, error) {
	argType := inputTypes[argIdx]
	// Allow the direct-access buffer 25% of the available memory. The rest will

nit: the comment needs an update. Also a quick note on how the number was derived (or at least some rule of thumb) would be helpful.


pkg/sql/colexec/colexecwindow/lead_lag_tmpl.go, line 284 at r4 (raw file):

		// We have to use CopySlice here because the column already has a length of
		// n elements, and Set cannot set values before the last one.
		leadLagCol.CopySlice(col, i, idx, idx+1)

Hm, I'm still a bit confused why we have to use CopySlice here. My understanding is that i is constantly increasing, even between different _PROCESS_BATCH "calls". VectorTypeEnforcer should be calling Bytes.Reset so that maxSetIndex is set to 0 although the length of the vector is n.


@DrewKimball DrewKimball left a comment


Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @DrewKimball, @michae2, and @yuzefovich)


pkg/col/coldata/bytes_test.go, line 25 at r2 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: ordering.

Done.


pkg/col/coldata/bytes_test.go, line 500 at r2 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: it's a bit unusual for me to see a panic in the test - I think we usually do something like t.Fatal when we have access to *testing.T object. My personal preference in such scenarios is using require with something like require.GreaterOrEqual(t, offsets[i], offsets[i-1], "unexpectedly found decreasing offsets", offsets).

Done.


pkg/sql/colexec/colexecutils/spilling_buffer.go, line 169 at r1 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

I think we usually use AssertionFailedf whenever some assumption or invariant is broken, especially if that breakage is likely to come from the code you've written. AssertionFailedf will have a stack trace included which might help understand how the assumption got broken.

However, if the error is due to some user-behavior or is expected to occur (depending on the circumstances), then Errorf should be used.

Got it, thanks.


pkg/sql/colexec/colexecutils/spilling_buffer.go, line 100 at r4 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: it's worth mentioning that typs define the type schema of the columns that we're appending, but the input batches could have a different schema when colIdxs is non-nil.

Done.


pkg/sql/colexec/colexecutils/spilling_buffer.go, line 158 at r4 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

I think in the second comparison we need >=, not <.

Good catch, Done.


pkg/sql/colexec/colexecutils/spilling_buffer.go, line 162 at r4 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

Why not use AppendTuples that exists on AppendOnlyBufferedBatch already? I think if you pass in b.colIdxs as colsToStore in the constructor, it should just work, and here we will only need to perform the memory accounting.

The problem is that I need to be able to just return the AppendOnlyBufferedBatch when GetBatchWithTuple is called, but if I give it non-nil colsToStore I will have to also give it the schema of the input batches, which means the schema of the batches returned from dequeueing will be different from that of the AppendOnlyBufferedBatch. I only did this to avoid creating a window when appending, but I don't think it's worth trying to make that optimization for now anyway, so I'll go back to making a window on the input batch and then calling AppendTuples as you say.

Actually, I think doing it this way fails the linter, anyway.


pkg/sql/colexec/colexecutils/spilling_buffer.go, line 188 at r4 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: we could use b.diskQueue != nil as alreadySpilled == true.

Good point. Done.


pkg/sql/colexec/colexecutils/spilling_buffer.go, line 307 at r4 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

Hm, the current behavior of Reset seems a bit unusual - we are deeply resetting the in-memory buffered batch (so that its memory is released), but then we are keeping the reference to the dequeue scratch. I think we should choose to either perform a deep reset for everything or a shallow reset for everything.

IMO the cleanest (and probably easiest) way is to perform a deep reset by clearing all memory registered with the unlimited allocator, creating a new in-memory buffered batch, and setting the dequeue scratch to nil.

This also made me realize that we probably have a bug in SpillingQueue.Reset where we forget to clear up the unlimited allocator.

Yes, that definitely looks a lot cleaner. Done.


pkg/sql/colexec/colexecutils/spilling_buffer_test.go, line 25 at r4 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: ordering. Do you have a file watcher enabled? (https://cockroachlabs.atlassian.net/wiki/spaces/ENG/pages/154206209/Goland+Tips+and+Tricks#GolandTipsandTricks-EnablecrlfmtWatcher)

Well, I do now. Done.


pkg/sql/colexec/colexecutils/spilling_buffer_test.go, line 47 at r4 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: diskQueueCacheMode is always the same, so we can omit it here and hard-code it where we're assigning it to the diskQueueCfg.

Done.


pkg/sql/colexec/colexecutils/spilling_buffer_test.go, line 139 at r4 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: you're not using MakeWindowIntoBatch here, so the comment needs an adjustment. But why not use it?

I wanted the ability to specify that only a subset of columns are to be included in the window. I've removed the comment.


pkg/sql/colexec/colexecwindow/lead_lag_tmpl.go, line 68 at r4 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: the comment needs an update. Also a quick note on how the number was derived (or at least some rule of thumb) would be helpful.

To tell the truth, there isn't that much thought behind the number; I should probably play around with it once I've got a good benchmark. I'll add a TODO and update the comment. Done.


pkg/sql/colexec/colexecwindow/lead_lag_tmpl.go, line 284 at r4 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

Hm, I'm still a bit confused why we have to use CopySlice here. My understanding is that i is constantly increasing, even between different _PROCESS_BATCH "calls". VectorTypeEnforcer should be calling Bytes.Reset so that maxSetIndex is set to 0 although the length of the vector is n.

The problem is that every time we copy the batch, the maxSetIndex gets set. In the copy in buffered_window.go I'm not copying the output column, so maxSetIndex won't be set there. However, I have no control over whether the output column is copied when the batch is enqueued. I couldn't just reset before processing because there may be multiple partitions in the batch, in which case resetting would destroy the previous work.

@DrewKimball DrewKimball force-pushed the lead_lag branch 4 times, most recently from d34ea0c to 36e4737 Compare May 27, 2021 06:31

@yuzefovich yuzefovich left a comment


Nice work! :lgtm:

No strong need to do it in this PR, but a nit about the fixups: some of the changes to the code introduced in the first commit are fixed in the third commit. This makes the third commit contain seemingly unrelated things. Also, we strive for having a "clean state" on every commit in isolation, and e.g. right now the first commit in isolation will have updated ordering of imports once the crlfmt file watcher runs.

Are you familiar with git commit --fixup SHA and git rebase -i --autosquash SHA? The idea is that if you have a PR consisting of multiple commits (say two with SHA1 and SHA2), and you want to update code in both, then you make changes you want to include into the first commit, do git add -u && git commit --fixup SHA1; then you do the same for changes in the second commit and then git add -u && git commit --fixup SHA2. This will result in your branch having four commits: the two original ones, SHA1 and SHA2, but also two additional "fixup" commits. Then you run git rebase -i --autosquash HEAD~4, and you'll see that SHA1 and fixup1 will be squashed into one (updated SHA1'), and the same will go for SHA2 and fixup2.

Reviewed 13 of 13 files at r5.
Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @DrewKimball and @michae2)


pkg/col/coldata/bytes_test.go, line 500 at r2 (raw file):

Previously, DrewKimball (Drew Kimball) wrote…

Done.

nit: you don't even need to have the if condition - GreaterOrEqualf performs that check itself.


pkg/sql/colexec/colexecutils/spilling_buffer.go, line 307 at r4 (raw file):

Previously, DrewKimball (Drew Kimball) wrote…

Yes, that definitely looks a lot cleaner. Done.

Oh, I didn't realize that we're resetting the buffer for every partition (I thought Reset was only being used in tests). In the production setting it is a bit sad to lose the references to already allocated memory (under the AppendOnlyBufferedBatch), and it'd be great to actually perform a shallow reset (but close the disk queue if spilled). I think this can be done in a follow-up PR and will be especially interesting to do once we have the benchmarks set up. Maybe leave a TODO to look into this?


pkg/sql/colexec/colexecutils/spilling_buffer.go, line 142 at r5 (raw file):

// The disk queue will always use DiskQueueCacheModeClearAndReuseCache, so
// all writes will always occur before all reads.
const numFDsOpenAtAnyGivenTime = 1

nit: maybe include something about the spilling buffer in the variable name? Maybe something like spillingBufferNumFDs? (Otherwise, some other component in this package could mistakenly use it.)


pkg/sql/colexec/colexecutils/spilling_buffer.go, line 286 at r5 (raw file):

		return nil
	}
	if b.diskQueue != nil {

nit: I know that we have the same issue in SpillingQueue.Close (and I'll fix it in a sec), but it would be good to lose the reference to the AppendOnlyBufferedBatch so that it could be garbage collected sooner. Similarly, it would be good to set the disk queue to nil.


pkg/sql/colexec/colexecwindow/lead_lag_tmpl.go, line 284 at r4 (raw file):

Previously, DrewKimball (Drew Kimball) wrote…

The problem is that every time we copy the batch, the maxSetIndex gets set. In the copy in buffered_window.go I'm not copying the output column, so maxSetIndex won't be set there. However, I have no control over whether the output column is copied when the batch is enqueued. I couldn't just reset before processing because there may be multiple partitions in the batch, in which case resetting would destroy the previous work.

I think I see it, but I have a slightly different way of thinking about this. The problem is that the batch argument doesn't come to us directly from the VectorTypeEnforcer - instead, it comes from the bufferedWindowOp and is either a dequeued batch or the current batch that we copied from the input batch into. In both cases, the bytes-like vectors are not in a reset state. You're correctly pointing out that resetting the vectors could be incorrect too if multiple partitions fit within a single batch.

I guess my comment is essentially what you said above, so carry on. This sgtm.

@DrewKimball (Collaborator, Author)

Are you familiar with git commit --fixup SHA and git rebase -i --autosquash SHA? The idea is that if you have a PR consisting of multiple commits (say two with SHA1 and SHA2), and you want to update code in both, then you make changes you want to include into the first commit, do git add -u && git commit --fixup SHA1; then you do the same for changes in the second commit and then git add -u && git commit --fixup SHA2. This will result in your branch having four commits: the two original ones, SHA1 and SHA2, but also two additional "fixup" commits. Then you run git rebase -i --autosquash HEAD~4, and you'll see that SHA1 and fixup1 will be squashed into one (updated SHA1'), and the same will go for SHA2 and fixup2.

I'm afraid my git-fu is on the amateurish side, but I'll make sure to use this in the future - looks really useful!

A window into a 'Bytes' column is represented by reusing the `data`
slice and simply using a slice into the base `offsets` from startIdx
to endIdx. This means that for a window, the first offset value may
be greater than zero.

Previously, the serialization step simply returned the `offsets`, even
in the case when they didn't begin at index zero. The problem was,
during the deserialization step, the `data` slice was truncated
but the `offsets` were not updated. This resulted in incorrect batches
being deserialized when the serialized batch was a window.

This patch modifies the serialization process to make a copy of the
`offsets` slice when the first offset is not zero. The first offset
value is then subtracted from each offset value to ensure the offsets
once again begin at zero. This works because the offsets are
guaranteed to be non-decreasing. Finally, `data` is truncated as well
to reflect the fact that the offsets are now zero-based.

Release note (bug fix): None
Previously, the `ntile` vectorized window operator handled all the logic
needed to buffer batches until the end of a partition is reached, and
then to process the tuples in that partition.

This patch pulls the logic required to buffer and emit the batches
into `bufferedWindowOp`, which calls functions on an interface
`bufferedWindower` that implements the logic specific to each
window function. (For example, ntile needs to retrieve the first
non-null value for the number of buckets).

Release note: None
`lag` and `lead` are window functions that return an expression
evaluated at the row `offset` rows from the current row, defaulting
to `default` if such a row does not exist in the partition.

This patch provides vectorized implementations of the `lag` and `lead`
window functions. The logic to buffer each partition into a queue is
now separate from calculation of the window function itself so that
`lag`, `lead`, and `ntile` (and others) can share common logic. As
before, each batch is emitted as soon as its output column has been
entirely filled.

See cockroachdb#37035

Release note (sql change): the vectorized engine now supports the lag
and lead window functions.

@DrewKimball DrewKimball left a comment


Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @DrewKimball and @michae2)


pkg/col/coldata/bytes_test.go, line 500 at r2 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: you don't even need to have the if condition - GreaterOrEqualf performs that check itself.

Oh wasn't thinking. Done.


pkg/sql/colexec/colexecutils/spilling_buffer.go, line 307 at r4 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

Oh, I didn't realize that we're resetting the buffer for every partition (I thought Reset was only being used in tests). In the production setting it is a bit sad to lose the references to already allocated memory (under the AppendOnlyBufferedBatch), and it'd be great to actually perform a shallow reset (but close the disk queue if spilled). I think this can be done in a follow-up PR and will be especially interesting to do once we have the benchmarks set up. Maybe leave a TODO to look into this?

I'll add a TODO. We'd have to be careful with a shallow reset, though - we'd need some way to know that appending tuples after the reset wouldn't increase memory usage. Though I guess it still could in principle, which could cause usage to creep up with multiple large partitions.


pkg/sql/colexec/colexecutils/spilling_buffer.go, line 142 at r5 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: maybe include something about the spilling buffer in the variable name? Maybe something like spillingBufferNumFDs? (Otherwise, some other component in this package could mistakenly use it.)

Done.


pkg/sql/colexec/colexecutils/spilling_buffer.go, line 286 at r5 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: I know that we have the same issue in SpillingQueue.Close (and I'll fix it in a sec), but it would be good to lose the reference to the AppendOnlyBufferedBatch so that it could be garbage collected sooner. Similarly, it would be good to set the disk queue to nil.

Done.

@DrewKimball (Collaborator, Author)

TFTR!

@DrewKimball (Collaborator, Author)

bors r+

@craig (Contributor)

craig bot commented May 28, 2021

Build succeeded:

@craig craig bot merged commit bf25f12 into cockroachdb:master May 28, 2021
@DrewKimball DrewKimball deleted the lead_lag branch May 28, 2021 01:14