Skip to content

Commit

Permalink
row: remove TODOSQLCodec in TestJobBackedSeqChunkProvider
Browse files Browse the repository at this point in the history
Fixes: #82886

Release note: None
  • Loading branch information
adityamaru committed Jun 14, 2022
1 parent d82ac30 commit aa926db
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 53 deletions.
1 change: 1 addition & 0 deletions pkg/sql/row/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ go_test(
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/catalog",
"//pkg/sql/catalog/bootstrap",
"//pkg/sql/catalog/descpb",
Expand Down
80 changes: 40 additions & 40 deletions pkg/sql/row/expr_walker.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ func (pos importRandPosition) distance(o importRandPosition) int64 {
// call to a random function when generating a given row from a given source.
// numInstances refers to the number of random function invocations per row.
func getPosForRandImport(rowID int64, sourceID int32, numInstances int) importRandPosition {
// We expect r.pos to increment by numInstances for each row.
// Therefore, assuming that rowID increments by 1 for every row,
// we will initialize the position as rowID * numInstances + sourceID << rowIDBits.
// We expect r.pos to increment by numInstances for each row. Therefore,
// assuming that rowID increments by 1 for every row, we will initialize the
// position as rowID * numInstances + sourceID << rowIDBits.
rowIDWithMultiplier := int64(numInstances) * rowID
pos := (int64(sourceID) << rowIDBits) ^ rowIDWithMultiplier
return importRandPosition(pos)
Expand Down Expand Up @@ -106,7 +106,7 @@ func (r *importRand) maybeReseed(c *CellInfoAnnotation) {
// position, so if we skip some rows we need to reseed.
// We may skip rows because a single row converter may be responsible for
// converting several non-contiguous batches of KVs.
newRowPos := getPosForRandImport(c.rowID, c.sourceID, c.randInstancePerRow)
newRowPos := getPosForRandImport(c.RowID, c.SourceID, c.randInstancePerRow)
rowsSkipped := newRowPos.distance(r.pos) > int64(c.randInstancePerRow)
if rowsSkipped {
// Reseed at the new position, since our internally tracked r.pos is now out
Expand Down Expand Up @@ -148,10 +148,10 @@ func makeBuiltinOverride(
// SequenceMetadata contains information used when processing columns with
// default expressions which use sequences.
type SequenceMetadata struct {
seqDesc catalog.TableDescriptor
instancesPerRow int64
curChunk *jobspb.SequenceValChunk
curVal int64
SeqDesc catalog.TableDescriptor
InstancesPerRow int64
CurChunk *jobspb.SequenceValChunk
CurVal int64
}

type overrideVolatility bool
Expand All @@ -173,8 +173,8 @@ const cellInfoAddr tree.AnnotationIdx = iota + 1
// CellInfoAnnotation encapsulates the AST annotation for the various supported
// default expressions for import.
type CellInfoAnnotation struct {
sourceID int32
rowID int64
SourceID int32
RowID int64

// Annotations for unique_rowid().
uniqueRowIDInstance int
Expand All @@ -196,13 +196,13 @@ func getCellInfoAnnotation(t *tree.Annotations) *CellInfoAnnotation {
}

func (c *CellInfoAnnotation) reset(sourceID int32, rowID int64) {
c.sourceID = sourceID
c.rowID = rowID
c.SourceID = sourceID
c.RowID = rowID
c.uniqueRowIDInstance = 0
}

func makeImportRand(c *CellInfoAnnotation) randomSource {
pos := getPosForRandImport(c.rowID, c.sourceID, c.randInstancePerRow)
pos := getPosForRandImport(c.RowID, c.SourceID, c.randInstancePerRow)
randSource := &importRand{}
randSource.reseed(pos)
return randSource
Expand Down Expand Up @@ -246,8 +246,8 @@ func makeImportRand(c *CellInfoAnnotation) randomSource {
func importUniqueRowID(evalCtx *eval.Context, args tree.Datums) (tree.Datum, error) {
c := getCellInfoAnnotation(evalCtx.Annotations)
avoidCollisionsWithSQLsIDs := uint64(1 << 63)
shiftedIndex := int64(c.uniqueRowIDTotal)*c.rowID + int64(c.uniqueRowIDInstance)
returnIndex := (uint64(c.sourceID) << rowIDBits) ^ uint64(shiftedIndex)
shiftedIndex := int64(c.uniqueRowIDTotal)*c.RowID + int64(c.uniqueRowIDInstance)
returnIndex := (uint64(c.SourceID) << rowIDBits) ^ uint64(shiftedIndex)
c.uniqueRowIDInstance++
evalCtx.Annotations.Set(cellInfoAddr, c)
return tree.NewDInt(tree.DInt(avoidCollisionsWithSQLsIDs | returnIndex)), nil
Expand Down Expand Up @@ -318,11 +318,11 @@ func (j *SeqChunkProvider) RequestChunk(
// ensure correct behavior on job resumption.
// We never want to end up in a situation where row x is assigned a different
// sequence value on subsequent import job resumptions.
fileProgress := progress.GetImport().SequenceDetails[c.sourceID]
fileProgress := progress.GetImport().SequenceDetails[c.SourceID]
if fileProgress.SeqIdToChunks == nil {
fileProgress.SeqIdToChunks = make(map[int32]*jobspb.SequenceDetails_SequenceChunks)
}
seqID := seqMetadata.seqDesc.GetID()
seqID := seqMetadata.SeqDesc.GetID()
if _, ok := fileProgress.SeqIdToChunks[int32(seqID)]; !ok {
fileProgress.SeqIdToChunks[int32(seqID)] = &jobspb.SequenceDetails_SequenceChunks{
Chunks: make([]*jobspb.SequenceValChunk, 0),
Expand All @@ -331,7 +331,7 @@ func (j *SeqChunkProvider) RequestChunk(
// We can clean up some of the older chunks which correspond to rows
// below the resume pos as we are never going to reprocess those
// checkpointed rows on job resume.
resumePos := progress.GetImport().ResumePos[c.sourceID]
resumePos := progress.GetImport().ResumePos[c.SourceID]
trim, chunks := 0, fileProgress.SeqIdToChunks[int32(seqID)].Chunks
// If the resumePos is below the max bound of the current chunk we need
// to keep this chunk in case the job is re-resumed.
Expand All @@ -341,7 +341,7 @@ func (j *SeqChunkProvider) RequestChunk(
fileProgress.SeqIdToChunks[int32(seqID)].Chunks[trim:]

fileProgress.SeqIdToChunks[int32(seqID)].Chunks = append(
fileProgress.SeqIdToChunks[int32(seqID)].Chunks, seqMetadata.curChunk)
fileProgress.SeqIdToChunks[int32(seqID)].Chunks, seqMetadata.CurChunk)
ju.UpdateProgress(progress)
return nil
}
Expand All @@ -354,7 +354,7 @@ func (j *SeqChunkProvider) RequestChunk(
// Now that the job progress has been written to, we can use the newly
// allocated chunk.
if !foundFromPreviouslyAllocatedChunk {
seqMetadata.curVal = seqMetadata.curChunk.ChunkStartVal
seqMetadata.CurVal = seqMetadata.CurChunk.ChunkStartVal
}
return nil
})
Expand Down Expand Up @@ -417,24 +417,24 @@ func boundsExceededError(descriptor catalog.TableDescriptor) error {
func (j *SeqChunkProvider) checkForPreviouslyAllocatedChunks(
seqMetadata *SequenceMetadata, c *CellInfoAnnotation, progress *jobspb.Progress,
) (bool, error) {
seqOpts := seqMetadata.seqDesc.GetSequenceOpts()
seqOpts := seqMetadata.SeqDesc.GetSequenceOpts()
var found bool
fileProgress := progress.GetImport().SequenceDetails[c.sourceID]
fileProgress := progress.GetImport().SequenceDetails[c.SourceID]
if fileProgress.SeqIdToChunks == nil {
return found, nil
}
var allocatedSeqChunks *jobspb.SequenceDetails_SequenceChunks
var ok bool
if allocatedSeqChunks, ok = fileProgress.SeqIdToChunks[int32(seqMetadata.seqDesc.GetID())]; !ok {
if allocatedSeqChunks, ok = fileProgress.SeqIdToChunks[int32(seqMetadata.SeqDesc.GetID())]; !ok {
return found, nil
}

for _, chunk := range allocatedSeqChunks.Chunks {
// We have found the chunk of sequence values that was assigned to the
// swath of rows encompassing rowID.
if chunk.ChunkStartRow <= c.rowID && chunk.NextChunkStartRow > c.rowID {
relativeRowIndex := c.rowID - chunk.ChunkStartRow
seqMetadata.curVal = chunk.ChunkStartVal + seqOpts.Increment*(seqMetadata.instancesPerRow*relativeRowIndex)
if chunk.ChunkStartRow <= c.RowID && chunk.NextChunkStartRow > c.RowID {
relativeRowIndex := c.RowID - chunk.ChunkStartRow
seqMetadata.CurVal = chunk.ChunkStartVal + seqOpts.Increment*(seqMetadata.InstancesPerRow*relativeRowIndex)
found = true
return found, nil
}
Expand All @@ -447,40 +447,40 @@ func (j *SeqChunkProvider) checkForPreviouslyAllocatedChunks(
func reserveChunkOfSeqVals(
evalCtx *eval.Context, c *CellInfoAnnotation, seqMetadata *SequenceMetadata, db *kv.DB,
) error {
seqOpts := seqMetadata.seqDesc.GetSequenceOpts()
seqOpts := seqMetadata.SeqDesc.GetSequenceOpts()
newChunkSize := int64(initialChunkSize)
// If we are allocating a subsequent chunk of sequence values, we attempt
// to reserve a factor of 10 more than reserved the last time so as to
// prevent clobbering the chunk reservation logic which involves writing
// to job progress.
if seqMetadata.curChunk != nil {
newChunkSize = chunkSizeIncrementRate * seqMetadata.curChunk.ChunkSize
if seqMetadata.CurChunk != nil {
newChunkSize = chunkSizeIncrementRate * seqMetadata.CurChunk.ChunkSize
if newChunkSize > maxChunkSize {
newChunkSize = maxChunkSize
}
}

// We want to encompass at least one complete row with our chunk
// allocation.
if newChunkSize < seqMetadata.instancesPerRow {
newChunkSize = seqMetadata.instancesPerRow
if newChunkSize < seqMetadata.InstancesPerRow {
newChunkSize = seqMetadata.InstancesPerRow
}

incrementValBy := newChunkSize * seqOpts.Increment
// incrementSequenceByVal keeps retrying until it is able to find a slot
// of incrementValBy.
seqVal, err := incrementSequenceByVal(evalCtx.Context, seqMetadata.seqDesc, db,
seqVal, err := incrementSequenceByVal(evalCtx.Context, seqMetadata.SeqDesc, db,
evalCtx.Codec, incrementValBy)
if err != nil {
return err
}

// Update the sequence metadata to reflect the newly reserved chunk.
seqMetadata.curChunk = &jobspb.SequenceValChunk{
seqMetadata.CurChunk = &jobspb.SequenceValChunk{
ChunkStartVal: seqVal - incrementValBy + seqOpts.Increment,
ChunkSize: newChunkSize,
ChunkStartRow: c.rowID,
NextChunkStartRow: c.rowID + (newChunkSize / seqMetadata.instancesPerRow),
ChunkStartRow: c.RowID,
NextChunkStartRow: c.RowID + (newChunkSize / seqMetadata.InstancesPerRow),
}
return nil
}
Expand Down Expand Up @@ -525,24 +525,24 @@ func importDefaultToDatabasePrimaryRegion(
func importNextValHelper(
evalCtx *eval.Context, c *CellInfoAnnotation, seqMetadata *SequenceMetadata,
) (tree.Datum, error) {
seqOpts := seqMetadata.seqDesc.GetSequenceOpts()
seqOpts := seqMetadata.SeqDesc.GetSequenceOpts()
if c.seqChunkProvider == nil {
return nil, errors.New("no sequence chunk provider configured for the import job")
}

// If the current importWorker does not have an active chunk for the sequence
// seqName, or the row we are processing is outside the range of rows covered
// by the active chunk, we need to request a chunk.
if seqMetadata.curChunk == nil || c.rowID == seqMetadata.curChunk.NextChunkStartRow {
if seqMetadata.CurChunk == nil || c.RowID == seqMetadata.CurChunk.NextChunkStartRow {
if err := c.seqChunkProvider.RequestChunk(evalCtx, c, seqMetadata); err != nil {
return nil, err
}
} else {
// The current chunk of sequence values can be used for the row being
// processed.
seqMetadata.curVal += seqOpts.Increment
seqMetadata.CurVal += seqOpts.Increment
}
return tree.NewDInt(tree.DInt(seqMetadata.curVal)), nil
return tree.NewDInt(tree.DInt(seqMetadata.CurVal)), nil
}

// Besides overriding, there are also counters that we want to keep track of as
Expand Down Expand Up @@ -644,7 +644,7 @@ var supportedImportFuncOverrides = map[string]*customFunc{
return errors.Newf("sequence %s not found in annotation", seqIdentifier.SeqName)
}
}
sequenceMetadata.instancesPerRow++
sequenceMetadata.InstancesPerRow++
return nil
},
override: makeBuiltinOverride(
Expand Down
26 changes: 14 additions & 12 deletions pkg/sql/row/expr_walker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package row
package row_test

import (
"context"
Expand All @@ -18,9 +18,11 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/row"
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand Down Expand Up @@ -82,7 +84,7 @@ func TestJobBackedSeqChunkProvider(t *testing.T) {

evalCtx := &eval.Context{
Context: ctx,
Codec: keys.TODOSQLCodec,
Codec: s.ExecutorConfig().(sql.ExecutorConfig).Codec,
}

registry := s.JobRegistry().(*jobs.Registry)
Expand Down Expand Up @@ -187,22 +189,22 @@ func TestJobBackedSeqChunkProvider(t *testing.T) {
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
job := createMockImportJob(ctx, t, registry, test.allocatedChunks, test.resumePos)
j := &SeqChunkProvider{
j := &row.SeqChunkProvider{
Registry: registry, JobID: job.ID(), DB: db,
}
annot := &CellInfoAnnotation{
sourceID: 0,
rowID: test.rowID,
annot := &row.CellInfoAnnotation{
SourceID: 0,
RowID: test.rowID,
}

for id, val := range test.seqIDToExpectedVal {
seqDesc := createAndIncrementSeqDescriptor(ctx, t, id, keys.TODOSQLCodec,
test.incrementBy, test.seqIDToOpts[id], db)
seqMetadata := &SequenceMetadata{
seqDesc: seqDesc,
instancesPerRow: test.instancesPerRow,
curChunk: nil,
curVal: 0,
seqMetadata := &row.SequenceMetadata{
SeqDesc: seqDesc,
InstancesPerRow: test.instancesPerRow,
CurChunk: nil,
CurVal: 0,
}
require.NoError(t, j.RequestChunk(evalCtx, annot, seqMetadata))
getJobProgressQuery := `SELECT progress FROM system.jobs J WHERE J.id = $1`
Expand All @@ -214,7 +216,7 @@ func TestJobBackedSeqChunkProvider(t *testing.T) {
chunks := progress.GetImport().SequenceDetails[0].SeqIdToChunks[int32(id)].Chunks

// Ensure that the sequence value for the row is what we expect.
require.Equal(t, val, seqMetadata.curVal)
require.Equal(t, val, seqMetadata.CurVal)
// Ensure we have as many chunks written to the job progress as we
// expect.
require.Equal(t, test.seqIDToNumChunks[id], len(chunks))
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/row/row_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func (c *DatumRowConverter) getSequenceAnnotation(
if seqDesc.GetSequenceOpts() == nil {
return errors.Errorf("relation %q (%d) is not a sequence", seqDesc.GetName(), seqDesc.GetID())
}
seqMetadata := &SequenceMetadata{seqDesc: seqDesc}
seqMetadata := &SequenceMetadata{SeqDesc: seqDesc}
seqNameToMetadata[seqDesc.GetName()] = seqMetadata
seqIDToMetadata[seqID] = seqMetadata
}
Expand Down

0 comments on commit aa926db

Please sign in to comment.