Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

row: remove TODOSQLCodec in TestJobBackedSeqChunkProvider #82890

Merged
merged 1 commit into from
Jun 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/sql/row/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ go_test(
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/catalog",
"//pkg/sql/catalog/bootstrap",
"//pkg/sql/catalog/descpb",
Expand Down
80 changes: 40 additions & 40 deletions pkg/sql/row/expr_walker.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ func (pos importRandPosition) distance(o importRandPosition) int64 {
// call to a random function when generating a given row from a given source.
// numInstances refers to the number of random function invocations per row.
func getPosForRandImport(rowID int64, sourceID int32, numInstances int) importRandPosition {
// We expect r.pos to increment by numInstances for each row.
// Therefore, assuming that rowID increments by 1 for every row,
// we will initialize the position as rowID * numInstances + sourceID << rowIDBits.
// We expect r.pos to increment by numInstances for each row. Therefore,
// assuming that rowID increments by 1 for every row, we will initialize the
// position as rowID * numInstances + sourceID << rowIDBits.
rowIDWithMultiplier := int64(numInstances) * rowID
pos := (int64(sourceID) << rowIDBits) ^ rowIDWithMultiplier
return importRandPosition(pos)
Expand Down Expand Up @@ -106,7 +106,7 @@ func (r *importRand) maybeReseed(c *CellInfoAnnotation) {
// position, so if we skip some rows we need to reseed.
// We may skip rows because a single row converter may be responsible for
// converting several non-contiguous batches of KVs.
newRowPos := getPosForRandImport(c.rowID, c.sourceID, c.randInstancePerRow)
newRowPos := getPosForRandImport(c.RowID, c.SourceID, c.randInstancePerRow)
rowsSkipped := newRowPos.distance(r.pos) > int64(c.randInstancePerRow)
if rowsSkipped {
// Reseed at the new position, since our internally tracked r.pos is now out
Expand Down Expand Up @@ -148,10 +148,10 @@ func makeBuiltinOverride(
// SequenceMetadata contains information used when processing columns with
// default expressions which use sequences.
type SequenceMetadata struct {
seqDesc catalog.TableDescriptor
instancesPerRow int64
curChunk *jobspb.SequenceValChunk
curVal int64
SeqDesc catalog.TableDescriptor
InstancesPerRow int64
CurChunk *jobspb.SequenceValChunk
CurVal int64
}

type overrideVolatility bool
Expand All @@ -173,8 +173,8 @@ const cellInfoAddr tree.AnnotationIdx = iota + 1
// CellInfoAnnotation encapsulates the AST annotation for the various supported
// default expressions for import.
type CellInfoAnnotation struct {
sourceID int32
rowID int64
SourceID int32
RowID int64

// Annotations for unique_rowid().
uniqueRowIDInstance int
Expand All @@ -196,13 +196,13 @@ func getCellInfoAnnotation(t *tree.Annotations) *CellInfoAnnotation {
}

func (c *CellInfoAnnotation) reset(sourceID int32, rowID int64) {
c.sourceID = sourceID
c.rowID = rowID
c.SourceID = sourceID
c.RowID = rowID
c.uniqueRowIDInstance = 0
}

func makeImportRand(c *CellInfoAnnotation) randomSource {
pos := getPosForRandImport(c.rowID, c.sourceID, c.randInstancePerRow)
pos := getPosForRandImport(c.RowID, c.SourceID, c.randInstancePerRow)
randSource := &importRand{}
randSource.reseed(pos)
return randSource
Expand Down Expand Up @@ -246,8 +246,8 @@ func makeImportRand(c *CellInfoAnnotation) randomSource {
func importUniqueRowID(evalCtx *eval.Context, args tree.Datums) (tree.Datum, error) {
c := getCellInfoAnnotation(evalCtx.Annotations)
avoidCollisionsWithSQLsIDs := uint64(1 << 63)
shiftedIndex := int64(c.uniqueRowIDTotal)*c.rowID + int64(c.uniqueRowIDInstance)
returnIndex := (uint64(c.sourceID) << rowIDBits) ^ uint64(shiftedIndex)
shiftedIndex := int64(c.uniqueRowIDTotal)*c.RowID + int64(c.uniqueRowIDInstance)
returnIndex := (uint64(c.SourceID) << rowIDBits) ^ uint64(shiftedIndex)
c.uniqueRowIDInstance++
evalCtx.Annotations.Set(cellInfoAddr, c)
return tree.NewDInt(tree.DInt(avoidCollisionsWithSQLsIDs | returnIndex)), nil
Expand Down Expand Up @@ -318,11 +318,11 @@ func (j *SeqChunkProvider) RequestChunk(
// ensure correct behavior on job resumption.
// We never want to end up in a situation where row x is assigned a different
// sequence value on subsequent import job resumptions.
fileProgress := progress.GetImport().SequenceDetails[c.sourceID]
fileProgress := progress.GetImport().SequenceDetails[c.SourceID]
if fileProgress.SeqIdToChunks == nil {
fileProgress.SeqIdToChunks = make(map[int32]*jobspb.SequenceDetails_SequenceChunks)
}
seqID := seqMetadata.seqDesc.GetID()
seqID := seqMetadata.SeqDesc.GetID()
if _, ok := fileProgress.SeqIdToChunks[int32(seqID)]; !ok {
fileProgress.SeqIdToChunks[int32(seqID)] = &jobspb.SequenceDetails_SequenceChunks{
Chunks: make([]*jobspb.SequenceValChunk, 0),
Expand All @@ -331,7 +331,7 @@ func (j *SeqChunkProvider) RequestChunk(
// We can cleanup some of the older chunks which correspond to rows
// below the resume pos as we are never going to reprocess those
// check pointed rows on job resume.
resumePos := progress.GetImport().ResumePos[c.sourceID]
resumePos := progress.GetImport().ResumePos[c.SourceID]
trim, chunks := 0, fileProgress.SeqIdToChunks[int32(seqID)].Chunks
// If the resumePos is below the max bound of the current chunk we need
// to keep this chunk in case the job is re-resumed.
Expand All @@ -341,7 +341,7 @@ func (j *SeqChunkProvider) RequestChunk(
fileProgress.SeqIdToChunks[int32(seqID)].Chunks[trim:]

fileProgress.SeqIdToChunks[int32(seqID)].Chunks = append(
fileProgress.SeqIdToChunks[int32(seqID)].Chunks, seqMetadata.curChunk)
fileProgress.SeqIdToChunks[int32(seqID)].Chunks, seqMetadata.CurChunk)
ju.UpdateProgress(progress)
return nil
}
Expand All @@ -354,7 +354,7 @@ func (j *SeqChunkProvider) RequestChunk(
// Now that the job progress has been written to, we can use the newly
// allocated chunk.
if !foundFromPreviouslyAllocatedChunk {
seqMetadata.curVal = seqMetadata.curChunk.ChunkStartVal
seqMetadata.CurVal = seqMetadata.CurChunk.ChunkStartVal
}
return nil
})
Expand Down Expand Up @@ -417,24 +417,24 @@ func boundsExceededError(descriptor catalog.TableDescriptor) error {
func (j *SeqChunkProvider) checkForPreviouslyAllocatedChunks(
seqMetadata *SequenceMetadata, c *CellInfoAnnotation, progress *jobspb.Progress,
) (bool, error) {
seqOpts := seqMetadata.seqDesc.GetSequenceOpts()
seqOpts := seqMetadata.SeqDesc.GetSequenceOpts()
var found bool
fileProgress := progress.GetImport().SequenceDetails[c.sourceID]
fileProgress := progress.GetImport().SequenceDetails[c.SourceID]
if fileProgress.SeqIdToChunks == nil {
return found, nil
}
var allocatedSeqChunks *jobspb.SequenceDetails_SequenceChunks
var ok bool
if allocatedSeqChunks, ok = fileProgress.SeqIdToChunks[int32(seqMetadata.seqDesc.GetID())]; !ok {
if allocatedSeqChunks, ok = fileProgress.SeqIdToChunks[int32(seqMetadata.SeqDesc.GetID())]; !ok {
return found, nil
}

for _, chunk := range allocatedSeqChunks.Chunks {
// We have found the chunk of sequence values that was assigned to the
// swath of rows encompassing rowID.
if chunk.ChunkStartRow <= c.rowID && chunk.NextChunkStartRow > c.rowID {
relativeRowIndex := c.rowID - chunk.ChunkStartRow
seqMetadata.curVal = chunk.ChunkStartVal + seqOpts.Increment*(seqMetadata.instancesPerRow*relativeRowIndex)
if chunk.ChunkStartRow <= c.RowID && chunk.NextChunkStartRow > c.RowID {
relativeRowIndex := c.RowID - chunk.ChunkStartRow
seqMetadata.CurVal = chunk.ChunkStartVal + seqOpts.Increment*(seqMetadata.InstancesPerRow*relativeRowIndex)
found = true
return found, nil
}
Expand All @@ -447,40 +447,40 @@ func (j *SeqChunkProvider) checkForPreviouslyAllocatedChunks(
func reserveChunkOfSeqVals(
evalCtx *eval.Context, c *CellInfoAnnotation, seqMetadata *SequenceMetadata, db *kv.DB,
) error {
seqOpts := seqMetadata.seqDesc.GetSequenceOpts()
seqOpts := seqMetadata.SeqDesc.GetSequenceOpts()
newChunkSize := int64(initialChunkSize)
// If we are allocating a subsequent chunk of sequence values, we attempt
// to reserve a factor of 10 more than reserved the last time so as to
// prevent clobbering the chunk reservation logic which involves writing
// to job progress.
if seqMetadata.curChunk != nil {
newChunkSize = chunkSizeIncrementRate * seqMetadata.curChunk.ChunkSize
if seqMetadata.CurChunk != nil {
newChunkSize = chunkSizeIncrementRate * seqMetadata.CurChunk.ChunkSize
if newChunkSize > maxChunkSize {
newChunkSize = maxChunkSize
}
}

// We want to encompass at least one complete row with our chunk
// allocation.
if newChunkSize < seqMetadata.instancesPerRow {
newChunkSize = seqMetadata.instancesPerRow
if newChunkSize < seqMetadata.InstancesPerRow {
newChunkSize = seqMetadata.InstancesPerRow
}

incrementValBy := newChunkSize * seqOpts.Increment
// incrementSequenceByVal keeps retrying until it is able to find a slot
// of incrementValBy.
seqVal, err := incrementSequenceByVal(evalCtx.Context, seqMetadata.seqDesc, db,
seqVal, err := incrementSequenceByVal(evalCtx.Context, seqMetadata.SeqDesc, db,
evalCtx.Codec, incrementValBy)
if err != nil {
return err
}

// Update the sequence metadata to reflect the newly reserved chunk.
seqMetadata.curChunk = &jobspb.SequenceValChunk{
seqMetadata.CurChunk = &jobspb.SequenceValChunk{
ChunkStartVal: seqVal - incrementValBy + seqOpts.Increment,
ChunkSize: newChunkSize,
ChunkStartRow: c.rowID,
NextChunkStartRow: c.rowID + (newChunkSize / seqMetadata.instancesPerRow),
ChunkStartRow: c.RowID,
NextChunkStartRow: c.RowID + (newChunkSize / seqMetadata.InstancesPerRow),
}
return nil
}
Expand Down Expand Up @@ -525,24 +525,24 @@ func importDefaultToDatabasePrimaryRegion(
func importNextValHelper(
evalCtx *eval.Context, c *CellInfoAnnotation, seqMetadata *SequenceMetadata,
) (tree.Datum, error) {
seqOpts := seqMetadata.seqDesc.GetSequenceOpts()
seqOpts := seqMetadata.SeqDesc.GetSequenceOpts()
if c.seqChunkProvider == nil {
return nil, errors.New("no sequence chunk provider configured for the import job")
}

// If the current importWorker does not have an active chunk for the sequence
// seqName, or the row we are processing is outside the range of rows covered
// by the active chunk, we need to request a chunk.
if seqMetadata.curChunk == nil || c.rowID == seqMetadata.curChunk.NextChunkStartRow {
if seqMetadata.CurChunk == nil || c.RowID == seqMetadata.CurChunk.NextChunkStartRow {
if err := c.seqChunkProvider.RequestChunk(evalCtx, c, seqMetadata); err != nil {
return nil, err
}
} else {
// The current chunk of sequence values can be used for the row being
// processed.
seqMetadata.curVal += seqOpts.Increment
seqMetadata.CurVal += seqOpts.Increment
}
return tree.NewDInt(tree.DInt(seqMetadata.curVal)), nil
return tree.NewDInt(tree.DInt(seqMetadata.CurVal)), nil
}

// Besides overriding, there are also counters that we want to keep track of as
Expand Down Expand Up @@ -644,7 +644,7 @@ var supportedImportFuncOverrides = map[string]*customFunc{
return errors.Newf("sequence %s not found in annotation", seqIdentifier.SeqName)
}
}
sequenceMetadata.instancesPerRow++
sequenceMetadata.InstancesPerRow++
return nil
},
override: makeBuiltinOverride(
Expand Down
26 changes: 14 additions & 12 deletions pkg/sql/row/expr_walker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package row
package row_test

import (
"context"
Expand All @@ -18,9 +18,11 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/row"
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand Down Expand Up @@ -82,7 +84,7 @@ func TestJobBackedSeqChunkProvider(t *testing.T) {

evalCtx := &eval.Context{
Context: ctx,
Codec: keys.TODOSQLCodec,
Codec: s.ExecutorConfig().(sql.ExecutorConfig).Codec,
}

registry := s.JobRegistry().(*jobs.Registry)
Expand Down Expand Up @@ -187,22 +189,22 @@ func TestJobBackedSeqChunkProvider(t *testing.T) {
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
job := createMockImportJob(ctx, t, registry, test.allocatedChunks, test.resumePos)
j := &SeqChunkProvider{
j := &row.SeqChunkProvider{
Registry: registry, JobID: job.ID(), DB: db,
}
annot := &CellInfoAnnotation{
sourceID: 0,
rowID: test.rowID,
annot := &row.CellInfoAnnotation{
SourceID: 0,
RowID: test.rowID,
}

for id, val := range test.seqIDToExpectedVal {
seqDesc := createAndIncrementSeqDescriptor(ctx, t, id, keys.TODOSQLCodec,
test.incrementBy, test.seqIDToOpts[id], db)
seqMetadata := &SequenceMetadata{
seqDesc: seqDesc,
instancesPerRow: test.instancesPerRow,
curChunk: nil,
curVal: 0,
seqMetadata := &row.SequenceMetadata{
SeqDesc: seqDesc,
InstancesPerRow: test.instancesPerRow,
CurChunk: nil,
CurVal: 0,
}
require.NoError(t, j.RequestChunk(evalCtx, annot, seqMetadata))
getJobProgressQuery := `SELECT progress FROM system.jobs J WHERE J.id = $1`
Expand All @@ -214,7 +216,7 @@ func TestJobBackedSeqChunkProvider(t *testing.T) {
chunks := progress.GetImport().SequenceDetails[0].SeqIdToChunks[int32(id)].Chunks

// Ensure that the sequence value for the row is what we expect.
require.Equal(t, val, seqMetadata.curVal)
require.Equal(t, val, seqMetadata.CurVal)
// Ensure we have as many chunks written to the job progress as we
// expect.
require.Equal(t, test.seqIDToNumChunks[id], len(chunks))
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/row/row_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func (c *DatumRowConverter) getSequenceAnnotation(
if seqDesc.GetSequenceOpts() == nil {
return errors.Errorf("relation %q (%d) is not a sequence", seqDesc.GetName(), seqDesc.GetID())
}
seqMetadata := &SequenceMetadata{seqDesc: seqDesc}
seqMetadata := &SequenceMetadata{SeqDesc: seqDesc}
seqNameToMetadata[seqDesc.GetName()] = seqMetadata
seqIDToMetadata[seqID] = seqMetadata
}
Expand Down