Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
58273: sql: add catalog.Index interface, replace catalog.TableDescriptor methods for indexes r=postamar a=postamar

In an effort to not directly use `descpb.IndexDescriptor protos` everywhere, this multi-commit PR:
1. introduces a new `catalog.Index` interface to encapsulate them, and a bunch of new methods in `catalog.TableDescriptor` to replace those which have `descpb.IndexDescriptor` in their signatures, this is done in the first commit;
2. replaces calls to the old methods with calls to the new methods throughout the codebase, this is done one method at a time in the subsequent commits.

This breakdown into multiple commits is done to ease review, after all most of the contents of this diff are effectively little more than noise. Still, there are a few non-straightforward changes, but I figured they were worth it, though. 

This PR is motivated by cockroachdb#57465, see also its parent issue cockroachdb#56306 for more details. 

Subsequent work should consist in propagating this `descpb.IndexDescriptor -> catalog.Index` change further throughout the codebase.

Co-authored-by: Marius Posta <marius@cockroachlabs.com>
  • Loading branch information
craig[bot] and Marius Posta committed Jan 7, 2021
2 parents d8b5cb0 + 5a09894 commit c4947ad
Show file tree
Hide file tree
Showing 99 changed files with 2,300 additions and 1,285 deletions.
10 changes: 5 additions & 5 deletions pkg/ccl/backupccl/backup_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,13 +182,13 @@ func getLogicallyMergedTableSpans(
checkForKVInBounds func(start, end roachpb.Key, endTime hlc.Timestamp) (bool, error),
) ([]roachpb.Span, error) {
var nonDropIndexIDs []descpb.IndexID
if err := table.ForeachNonDropIndex(func(idxDesc *descpb.IndexDescriptor) error {
key := tableAndIndex{tableID: table.GetID(), indexID: idxDesc.ID}
if err := catalog.ForEachNonDropIndex(table, func(idx catalog.Index) error {
key := tableAndIndex{tableID: table.GetID(), indexID: idx.GetID()}
if added[key] {
return nil
}
added[key] = true
nonDropIndexIDs = append(nonDropIndexIDs, idxDesc.ID)
nonDropIndexIDs = append(nonDropIndexIDs, idx.GetID())
return nil
}); err != nil {
return nil, err
Expand Down Expand Up @@ -225,11 +225,11 @@ func getLogicallyMergedTableSpans(
lhsSpan := table.IndexSpan(codec, lhsIndexID)
rhsSpan := table.IndexSpan(codec, rhsIndexID)

lhsIndex, err := table.FindIndexByID(lhsIndexID)
lhsIndex, err := table.FindIndexWithID(lhsIndexID)
if err != nil {
return nil, err
}
rhsIndex, err := table.FindIndexByID(rhsIndexID)
rhsIndex, err := table.FindIndexWithID(rhsIndexID)
if err != nil {
return nil, err
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -833,11 +833,11 @@ func spansForAllRestoreTableIndexes(
added := make(map[tableAndIndex]bool, len(tables))
sstIntervalTree := interval.NewTree(interval.ExclusiveOverlapper)
for _, table := range tables {
for _, index := range table.AllNonDropIndexes() {
if err := sstIntervalTree.Insert(intervalSpan(table.IndexSpan(codec, index.ID)), false); err != nil {
for _, index := range table.NonDropIndexes() {
if err := sstIntervalTree.Insert(intervalSpan(table.IndexSpan(codec, index.GetID())), false); err != nil {
panic(errors.NewAssertionErrorWithWrappedErrf(err, "IndexSpan"))
}
added[tableAndIndex{tableID: table.GetID(), indexID: index.ID}] = true
added[tableAndIndex{tableID: table.GetID(), indexID: index.GetID()}] = true
}
}
// If there are desc revisions, ensure that we also add any index spans
Expand All @@ -853,10 +853,10 @@ func spansForAllRestoreTableIndexes(
rawTbl := descpb.TableFromDescriptor(rev.Desc, hlc.Timestamp{})
if rawTbl != nil && rawTbl.State != descpb.DescriptorState_DROP {
tbl := tabledesc.NewImmutable(*rawTbl)
for _, idx := range tbl.AllNonDropIndexes() {
key := tableAndIndex{tableID: tbl.ID, indexID: idx.ID}
for _, idx := range tbl.NonDropIndexes() {
key := tableAndIndex{tableID: tbl.ID, indexID: idx.GetID()}
if !added[key] {
if err := sstIntervalTree.Insert(intervalSpan(tbl.IndexSpan(codec, idx.ID)), false); err != nil {
if err := sstIntervalTree.Insert(intervalSpan(tbl.IndexSpan(codec, idx.GetID())), false); err != nil {
panic(errors.NewAssertionErrorWithWrappedErrf(err, "IndexSpan"))
}
added[key] = true
Expand Down
3 changes: 2 additions & 1 deletion pkg/ccl/backupccl/restore_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -1021,7 +1021,8 @@ func RewriteTableDescs(
return err
}

if err := table.ForeachNonDropIndex(func(index *descpb.IndexDescriptor) error {
if err := catalog.ForEachNonDropIndex(table, func(indexI catalog.Index) error {
index := indexI.IndexDesc()
// Verify that for any interleaved index being restored, the interleave
// parent is also being restored. Otherwise, the interleave entries in the
// restored IndexDescriptors won't have anything to point to.
Expand Down
10 changes: 6 additions & 4 deletions pkg/ccl/backupccl/targets.go
Original file line number Diff line number Diff line change
Expand Up @@ -668,17 +668,19 @@ func ensureInterleavesIncluded(tables []catalog.TableDescriptor) error {
}

for _, table := range tables {
if err := table.ForeachIndex(catalog.IndexOpts{
if err := catalog.ForEachIndex(table, catalog.IndexOpts{
AddMutations: true,
}, func(index *descpb.IndexDescriptor, _ bool) error {
for _, a := range index.Interleave.Ancestors {
}, func(index catalog.Index) error {
for i := 0; i < index.NumInterleaveAncestors(); i++ {
a := index.GetInterleaveAncestor(i)
if !inBackup[a.TableID] {
return errors.Errorf(
"cannot backup table %q without interleave parent (ID %d)", table.GetName(), a.TableID,
)
}
}
for _, c := range index.InterleavedBy {
for i := 0; i < index.NumInterleavedBy(); i++ {
c := index.GetInterleavedBy(i)
if !inBackup[c.Table] {
return errors.Errorf(
"cannot backup table %q without interleave child table (ID %d)", table.GetName(), c.Table,
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/avro_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ func TestAvroSchema(t *testing.T) {
`{"type":["null","long"],"name":"_u0001f366_","default":null,`+
`"__crdb__":"🍦 INT8 NOT NULL"}]}`,
tableSchema.codec.Schema())
indexSchema, err := indexToAvroSchema(tableDesc, tableDesc.GetPrimaryIndex())
indexSchema, err := indexToAvroSchema(tableDesc, tableDesc.GetPrimaryIndex().IndexDesc())
require.NoError(t, err)
require.Equal(t,
`{"type":"record","name":"_u2603_","fields":[`+
Expand Down
8 changes: 5 additions & 3 deletions pkg/ccl/changefeedccl/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,10 @@ func (e *jsonEncoder) EncodeKey(_ context.Context, row encodeRow) ([]byte, error

func (e *jsonEncoder) encodeKeyRaw(row encodeRow) ([]interface{}, error) {
colIdxByID := row.tableDesc.ColumnIdxMap()
jsonEntries := make([]interface{}, len(row.tableDesc.GetPrimaryIndex().ColumnIDs))
for i, colID := range row.tableDesc.GetPrimaryIndex().ColumnIDs {
primaryIndex := row.tableDesc.GetPrimaryIndex()
jsonEntries := make([]interface{}, primaryIndex.NumColumns())
for i := 0; i < primaryIndex.NumColumns(); i++ {
colID := primaryIndex.GetColumnID(i)
idx, ok := colIdxByID.Get(colID)
if !ok {
return nil, errors.Errorf(`unknown column id: %d`, colID)
Expand Down Expand Up @@ -342,7 +344,7 @@ func (e *confluentAvroEncoder) EncodeKey(ctx context.Context, row encodeRow) ([]
registered, ok := e.keyCache[cacheKey]
if !ok {
var err error
registered.schema, err = indexToAvroSchema(row.tableDesc, row.tableDesc.GetPrimaryIndex())
registered.schema, err = indexToAvroSchema(row.tableDesc, row.tableDesc.GetPrimaryIndex().IndexDesc())
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/rowfetcher_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (c *rowFetcherCache) TableDescForKey(
}

// Skip over the column data.
for ; skippedCols < len(tableDesc.GetPrimaryIndex().ColumnIDs); skippedCols++ {
for ; skippedCols < tableDesc.GetPrimaryIndex().NumColumns(); skippedCols++ {
l, err := encoding.PeekLength(remaining)
if err != nil {
return nil, err
Expand Down Expand Up @@ -173,7 +173,7 @@ func (c *rowFetcherCache) RowFetcherForTableDesc(
row.FetcherTableArgs{
Spans: tableDesc.AllIndexSpans(c.codec),
Desc: tableDesc,
Index: tableDesc.GetPrimaryIndex(),
Index: tableDesc.GetPrimaryIndex().IndexDesc(),
ColIdxMap: colIdxMap,
IsSecondaryIndex: false,
Cols: tableDesc.Columns,
Expand Down
10 changes: 5 additions & 5 deletions pkg/ccl/importccl/import_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
Expand Down Expand Up @@ -190,11 +191,10 @@ func makeInputConverter(
}

if singleTable != nil {
indexes := singleTable.DeletableIndexes()
for _, idx := range indexes {
if idx.IsPartial() {
return nil, unimplemented.NewWithIssue(50225, "cannot import into table with partial indexes")
}
if idx := catalog.FindDeletableNonPrimaryIndex(singleTable, func(idx catalog.Index) bool {
return idx.IsPartial()
}); idx != nil {
return nil, unimplemented.NewWithIssue(50225, "cannot import into table with partial indexes")
}

// If we're using a format like CSV where data columns are not "named", and
Expand Down
6 changes: 3 additions & 3 deletions pkg/ccl/partitionccl/drop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,11 @@ func TestDropIndexWithZoneConfigCCL(t *testing.T) {
PARTITION p2 VALUES IN (2)
)`)
tableDesc := catalogkv.TestingGetTableDescriptor(kvDB, keys.SystemSQLCodec, "t", "kv")
indexDesc, _, err := tableDesc.FindIndexByName("i")
index, err := tableDesc.FindIndexWithName("i")
if err != nil {
t.Fatal(err)
}
indexSpan := tableDesc.IndexSpan(keys.SystemSQLCodec, indexDesc.ID)
indexSpan := tableDesc.IndexSpan(keys.SystemSQLCodec, index.GetID())
tests.CheckKeyCount(t, kvDB, indexSpan, numRows)

// Set zone configs on the primary index, secondary index, and one partition
Expand Down Expand Up @@ -117,7 +117,7 @@ func TestDropIndexWithZoneConfigCCL(t *testing.T) {
}
}
tableDesc = catalogkv.TestingGetTableDescriptor(kvDB, keys.SystemSQLCodec, "t", "kv")
if _, _, err := tableDesc.FindIndexByName("i"); err == nil {
if _, err := tableDesc.FindIndexWithName("i"); err == nil {
t.Fatalf("table descriptor still contains index after index is dropped")
}
close(asyncNotification)
Expand Down
7 changes: 3 additions & 4 deletions pkg/ccl/partitionccl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,12 +275,11 @@ func selectPartitionExprs(

a := &rowenc.DatumAlloc{}
var prefixDatums []tree.Datum
if err := tableDesc.ForeachIndex(catalog.IndexOpts{
if err := catalog.ForEachIndex(tableDesc, catalog.IndexOpts{
AddMutations: true,
}, func(idxDesc *descpb.IndexDescriptor, _ bool) error {
genExpr := true
}, func(idx catalog.Index) error {
return selectPartitionExprsByName(
a, evalCtx, tableDesc, idxDesc, &idxDesc.Partitioning, prefixDatums, exprsByPartName, genExpr)
a, evalCtx, tableDesc, idx.IndexDesc(), &idx.IndexDesc().Partitioning, prefixDatums, exprsByPartName, true /* genExpr */)
}); err != nil {
return nil, err
}
Expand Down
20 changes: 10 additions & 10 deletions pkg/ccl/partitionccl/partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,21 +177,21 @@ func (pt *partitioningTest) parse() error {
if !strings.HasPrefix(indexName, "@") {
panic(errors.Errorf("unsupported config: %s", c))
}
idxDesc, _, err := pt.parsed.tableDesc.FindIndexByName(indexName[1:])
idx, err := pt.parsed.tableDesc.FindIndexWithName(indexName[1:])
if err != nil {
return errors.Wrapf(err, "could not find index %s", indexName)
}
subzone.IndexID = uint32(idxDesc.ID)
subzone.IndexID = uint32(idx.GetID())
if len(constraints) > 0 {
if subzone.PartitionName == "" {
fmt.Fprintf(&zoneConfigStmts,
`ALTER INDEX %s@%s CONFIGURE ZONE USING constraints = '[%s]';`,
pt.parsed.tableName, idxDesc.Name, constraints,
pt.parsed.tableName, idx.GetName(), constraints,
)
} else {
fmt.Fprintf(&zoneConfigStmts,
`ALTER PARTITION %s OF INDEX %s@%s CONFIGURE ZONE USING constraints = '[%s]';`,
subzone.PartitionName, pt.parsed.tableName, idxDesc.Name, constraints,
subzone.PartitionName, pt.parsed.tableName, idx.GetName(), constraints,
)
}
}
Expand Down Expand Up @@ -1326,23 +1326,23 @@ func TestRepartitioning(t *testing.T) {
}
sqlDB.Exec(t, fmt.Sprintf("ALTER TABLE %s RENAME TO %s", test.old.parsed.tableName, test.new.parsed.tableName))

testIndex, _, err := test.new.parsed.tableDesc.FindIndexByName(test.index)
testIndex, err := test.new.parsed.tableDesc.FindIndexWithName(test.index)
if err != nil {
t.Fatalf("%+v", err)
}

var repartition bytes.Buffer
if testIndex.ID == test.new.parsed.tableDesc.GetPrimaryIndexID() {
if testIndex.GetID() == test.new.parsed.tableDesc.GetPrimaryIndexID() {
fmt.Fprintf(&repartition, `ALTER TABLE %s `, test.new.parsed.tableName)
} else {
fmt.Fprintf(&repartition, `ALTER INDEX %s@%s `, test.new.parsed.tableName, testIndex.Name)
fmt.Fprintf(&repartition, `ALTER INDEX %s@%s `, test.new.parsed.tableName, testIndex.GetName())
}
if testIndex.Partitioning.NumColumns == 0 {
if testIndex.GetPartitioning().NumColumns == 0 {
repartition.WriteString(`PARTITION BY NOTHING`)
} else {
if err := sql.ShowCreatePartitioning(
&rowenc.DatumAlloc{}, keys.SystemSQLCodec, test.new.parsed.tableDesc, testIndex,
&testIndex.Partitioning, &repartition, 0 /* indent */, 0, /* colOffset */
&rowenc.DatumAlloc{}, keys.SystemSQLCodec, test.new.parsed.tableDesc, testIndex.IndexDesc(),
&testIndex.IndexDesc().Partitioning, &repartition, 0 /* indent */, 0, /* colOffset */
); err != nil {
t.Fatalf("%+v", err)
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/ccl/partitionccl/zone_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,13 +301,14 @@ func TestGenerateSubzoneSpans(t *testing.T) {
var actual []string
for _, span := range spans {
subzone := test.parsed.subzones[span.SubzoneIndex]
idxDesc, err := test.parsed.tableDesc.FindIndexByID(descpb.IndexID(subzone.IndexID))
idx, err := test.parsed.tableDesc.FindIndexWithID(descpb.IndexID(subzone.IndexID))
if err != nil {
t.Fatalf("could not find index with ID %d: %+v", subzone.IndexID, err)
}

directions := []encoding.Direction{encoding.Ascending /* index ID */}
for _, cd := range idxDesc.ColumnDirections {
for i := 0; i < idx.NumColumns(); i++ {
cd := idx.GetColumnDirection(i)
ed, err := cd.ToEncodingDirection()
if err != nil {
t.Fatal(err)
Expand All @@ -319,7 +320,7 @@ func TestGenerateSubzoneSpans(t *testing.T) {
if len(subzone.PartitionName) > 0 {
subzoneShort = "." + subzone.PartitionName
} else {
subzoneShort = "@" + idxDesc.Name
subzoneShort = "@" + idx.GetName()
}

// Verify that we're always doing the space savings when we can.
Expand Down
18 changes: 9 additions & 9 deletions pkg/ccl/storageccl/key_rewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,9 @@ func MakeKeyRewriter(descs map[descpb.ID]*tabledesc.Immutable) (*KeyRewriter, er
// The PrefixEnd() of index 1 is the same as the prefix of index 2, so use a
// map to avoid duplicating entries.

for _, index := range desc.AllNonDropIndexes() {
oldPrefix := roachpb.Key(makeKeyRewriterPrefixIgnoringInterleaved(oldID, index.ID))
newPrefix := roachpb.Key(makeKeyRewriterPrefixIgnoringInterleaved(desc.ID, index.ID))
for _, index := range desc.NonDropIndexes() {
oldPrefix := roachpb.Key(makeKeyRewriterPrefixIgnoringInterleaved(oldID, index.GetID()))
newPrefix := roachpb.Key(makeKeyRewriterPrefixIgnoringInterleaved(desc.ID, index.GetID()))
if !seenPrefixes[string(oldPrefix)] {
seenPrefixes[string(oldPrefix)] = true
prefixes.rewrites = append(prefixes.rewrites, prefixRewrite{
Expand Down Expand Up @@ -190,22 +190,22 @@ func (kr *KeyRewriter) RewriteKey(key []byte, isFromSpan bool) ([]byte, bool, er
// If there isn't any more data, we are at some split boundary.
return key, true, nil
}
idx, err := desc.FindIndexByID(descpb.IndexID(indexID))
idx, err := desc.FindIndexWithID(descpb.IndexID(indexID))
if err != nil {
return nil, false, err
}
if len(idx.InterleavedBy) == 0 {
if idx.NumInterleavedBy() == 0 {
// Not interleaved.
return key, true, nil
}
// We do not support interleaved secondary indexes.
if idx.ID != desc.GetPrimaryIndexID() {
if !idx.Primary() {
return nil, false, errors.New("restoring interleaved secondary indexes not supported")
}
colIDs, _ := idx.FullColumnIDs()
colIDs, _ := idx.IndexDesc().FullColumnIDs()
var skipCols int
for _, ancestor := range idx.Interleave.Ancestors {
skipCols += int(ancestor.SharedPrefixLen)
for i := 0; i < idx.NumInterleaveAncestors(); i++ {
skipCols += int(idx.GetInterleaveAncestor(i).SharedPrefixLen)
}
for i := 0; i < len(colIDs)-skipCols; i++ {
n, err := encoding.PeekLength(k)
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/settingsworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func processSystemConfigKVs(
{
types := []*types.T{tbl.Columns[0].Type}
nameRow := make([]rowenc.EncDatum, 1)
_, matches, _, err := rowenc.DecodeIndexKey(codec, tbl, tbl.GetPrimaryIndex(), types, nameRow, nil, kv.Key)
_, matches, _, err := rowenc.DecodeIndexKey(codec, tbl, tbl.GetPrimaryIndex().IndexDesc(), types, nameRow, nil, kv.Key)
if err != nil {
return errors.Wrap(err, "failed to decode key")
}
Expand Down
11 changes: 8 additions & 3 deletions pkg/sql/alter_column_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,9 +220,14 @@ func alterColumnTypeGeneral(

// Disallow ALTER COLUMN TYPE general for columns that are
// part of indexes.
for _, idx := range tableDesc.AllNonDropIndexes() {
for _, id := range append(idx.ColumnIDs, idx.ExtraColumnIDs...) {
if col.ID == id {
for _, idx := range tableDesc.NonDropIndexes() {
for i := 0; i < idx.NumColumns(); i++ {
if idx.GetColumnID(i) == col.ID {
return colInIndexNotSupportedErr
}
}
for i := 0; i < idx.NumExtraColumns(); i++ {
if idx.GetExtraColumnID(i) == col.ID {
return colInIndexNotSupportedErr
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/alter_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ func (p *planner) AlterIndex(ctx context.Context, n *tree.AlterIndex) (planNode,
// different copy than the one in the tableDesc. To make it easier for the
// code below, get a pointer to the index descriptor that's actually in
// tableDesc.
indexDesc, err = tableDesc.FindIndexByID(indexDesc.ID)
index, err := tableDesc.FindIndexWithID(indexDesc.ID)
if err != nil {
return nil, err
}
return &alterIndexNode{n: n, tableDesc: tableDesc, indexDesc: indexDesc}, nil
return &alterIndexNode{n: n, tableDesc: tableDesc, indexDesc: index.IndexDesc()}, nil
}

// ReadingOwnWrites implements the planNodeReadingOwnWrites interface.
Expand Down
Loading

0 comments on commit c4947ad

Please sign in to comment.