Merge #59789
59789: sql: add catalog.Column interface, replace catalog.TableDescriptor methods for columns r=postamar a=postamar

In an effort to not directly use `descpb.ColumnDescriptor` protos everywhere, this multi-commit PR:
1. introduces a new `catalog.Column` interface to encapsulate them, plus a set of new methods in `catalog.TableDescriptor` to replace those which have `descpb.ColumnDescriptor` in their signatures (this is done in the first commit);
2. replaces calls to the old methods with calls to the new methods throughout the codebase in the subsequent commits.

This breakdown into multiple commits is done to ease review: most of the substantial changes are in the first commit; the others follow through with the change and so are quite noisy.

Fixes #59805. 

See also the parent issue #56306, as well as the related issue #57465, which tracks the same work I've already done for `descpb.IndexDescriptor`. In this PR I reused many of the same patterns I introduced in the corresponding PR #58678.

Subsequent work should consist in propagating this `descpb.ColumnDescriptor -> catalog.Column` change further throughout the codebase.
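
For illustration, here is a minimal, self-contained sketch of the call-site pattern this change moves toward. The stand-in types below are hypothetical simplifications, not the real `catalog` and `descpb` packages; only the method names (`PublicColumns`, `GetID`, `GetName`, `GetType`, `IsNullable`, `Ordinal`, `ColumnDesc`) mirror those appearing in the diffs below.

```go
package main

import "fmt"

// ColumnDescriptor stands in for the descpb.ColumnDescriptor proto: a plain
// struct whose fields callers used to read directly (col.ID, col.Name, ...).
type ColumnDescriptor struct {
	ID       uint32
	Name     string
	Type     string
	Nullable bool
}

// Column stands in for the new catalog.Column interface: it wraps a descriptor
// and exposes read-only accessors plus the column's ordinal position.
type Column interface {
	GetID() uint32
	GetName() string
	GetType() string
	IsNullable() bool
	Ordinal() int
	ColumnDesc() *ColumnDescriptor // escape hatch for legacy descriptor-based APIs
}

type column struct {
	desc    ColumnDescriptor
	ordinal int
}

func (c *column) GetID() uint32                 { return c.desc.ID }
func (c *column) GetName() string               { return c.desc.Name }
func (c *column) GetType() string               { return c.desc.Type }
func (c *column) IsNullable() bool              { return c.desc.Nullable }
func (c *column) Ordinal() int                  { return c.ordinal }
func (c *column) ColumnDesc() *ColumnDescriptor { return &c.desc }

// TableDescriptor stands in for catalog.TableDescriptor: instead of a
// GetPublicColumns() []ColumnDescriptor accessor, it returns []Column.
type TableDescriptor interface {
	PublicColumns() []Column
}

type tableDesc struct{ cols []ColumnDescriptor }

func (t *tableDesc) PublicColumns() []Column {
	out := make([]Column, len(t.cols))
	for i := range t.cols {
		out[i] = &column{desc: t.cols[i], ordinal: i}
	}
	return out
}

func main() {
	var td TableDescriptor = &tableDesc{cols: []ColumnDescriptor{
		{ID: 1, Name: "k", Type: "INT", Nullable: false},
		{ID: 2, Name: "v", Type: "STRING", Nullable: true},
	}}
	// Old style: tableDesc.GetPublicColumns()[0].ID, i.e. indexing into a proto
	// slice and reading struct fields. New style: range over interface values
	// and call accessors.
	for _, col := range td.PublicColumns() {
		fmt.Println(col.Ordinal(), col.GetID(), col.GetName(), col.GetType())
	}
}
```

The `ColumnDesc()` escape hatch mirrors how call sites that still need the raw proto (for example the `rfArgs.Cols[i] = *col.ColumnDesc()` copy in the `rowfetcher_cache.go` diff) bridge back to descriptor-based APIs.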

Co-authored-by: Marius Posta <marius@cockroachlabs.com>
craig[bot] and Marius Posta committed Feb 5, 2021
2 parents a941442 + 655a28b commit 4516f63
Showing 99 changed files with 1,556 additions and 1,467 deletions.
38 changes: 19 additions & 19 deletions pkg/ccl/backupccl/backup_test.go
@@ -5287,13 +5287,13 @@ func TestBackupRestoreSequenceOwnership(t *testing.T) {
require.Equal(t, tableDesc.GetID(), seqDesc.GetSequenceOpts().SequenceOwner.OwnerTableID,
"unexpected table is sequence owner after restore",
)
-require.Equal(t, tableDesc.GetPublicColumns()[0].ID, seqDesc.GetSequenceOpts().SequenceOwner.OwnerColumnID,
+require.Equal(t, tableDesc.PublicColumns()[0].GetID(), seqDesc.GetSequenceOpts().SequenceOwner.OwnerColumnID,
"unexpected column is sequence owner after restore",
)
-require.Equal(t, 1, len(tableDesc.GetPublicColumns()[0].OwnsSequenceIds),
+require.Equal(t, 1, tableDesc.PublicColumns()[0].NumOwnsSequences(),
"unexpected number of sequences owned by d.t after restore",
)
-require.Equal(t, seqDesc.GetID(), tableDesc.GetPublicColumns()[0].OwnsSequenceIds[0],
+require.Equal(t, seqDesc.GetID(), tableDesc.PublicColumns()[0].GetOwnsSequenceID(0),
"unexpected ID of sequence owned by table d.t after restore",
)
})
@@ -5340,7 +5340,7 @@ func TestBackupRestoreSequenceOwnership(t *testing.T) {

tableDesc := catalogkv.TestingGetTableDescriptor(kvDB, keys.SystemSQLCodec, "d", "t")

-require.Equal(t, 0, len(tableDesc.GetPublicColumns()[0].OwnsSequenceIds),
+require.Equal(t, 0, tableDesc.PublicColumns()[0].NumOwnsSequences(),
"expected restored table to own 0 sequences",
)

@@ -5371,13 +5371,13 @@ func TestBackupRestoreSequenceOwnership(t *testing.T) {
require.Equal(t, tableDesc.GetID(), seqDesc.GetSequenceOpts().SequenceOwner.OwnerTableID,
"unexpected table is sequence owner after restore",
)
-require.Equal(t, tableDesc.GetPublicColumns()[0].ID, seqDesc.GetSequenceOpts().SequenceOwner.OwnerColumnID,
+require.Equal(t, tableDesc.PublicColumns()[0].GetID(), seqDesc.GetSequenceOpts().SequenceOwner.OwnerColumnID,
"unexpected column is sequence owner after restore",
)
-require.Equal(t, 1, len(tableDesc.GetPublicColumns()[0].OwnsSequenceIds),
+require.Equal(t, 1, tableDesc.PublicColumns()[0].NumOwnsSequences(),
"unexpected number of sequences owned by d.t after restore",
)
-require.Equal(t, seqDesc.GetID(), tableDesc.GetPublicColumns()[0].OwnsSequenceIds[0],
+require.Equal(t, seqDesc.GetID(), tableDesc.PublicColumns()[0].GetOwnsSequenceID(0),
"unexpected ID of sequence owned by table d.t after restore",
)
})
@@ -5416,7 +5416,7 @@ func TestBackupRestoreSequenceOwnership(t *testing.T) {
newDB.Exec(t, `RESTORE DATABASE d2 FROM $1 WITH skip_missing_sequence_owners`, backupLocD2D3)

tableDesc := catalogkv.TestingGetTableDescriptor(kvDB, keys.SystemSQLCodec, "d2", "t")
-require.Equal(t, 0, len(tableDesc.GetPublicColumns()[0].OwnsSequenceIds),
+require.Equal(t, 0, tableDesc.PublicColumns()[0].NumOwnsSequences(),
"expected restored table to own no sequences.",
)

@@ -5436,12 +5436,12 @@ func TestBackupRestoreSequenceOwnership(t *testing.T) {
require.Equal(t, td.GetID(), sd.GetSequenceOpts().SequenceOwner.OwnerTableID,
"unexpected table owner for sequence seq2 after restore",
)
-require.Equal(t, td.GetPublicColumns()[0].ID, sd.GetSequenceOpts().SequenceOwner.OwnerColumnID,
+require.Equal(t, td.PublicColumns()[0].GetID(), sd.GetSequenceOpts().SequenceOwner.OwnerColumnID,
"unexpected column owner for sequence seq2 after restore")
-require.Equal(t, 1, len(td.GetPublicColumns()[0].OwnsSequenceIds),
+require.Equal(t, 1, td.PublicColumns()[0].NumOwnsSequences(),
"unexpected number of sequences owned by d3.t after restore",
)
-require.Equal(t, sd.GetID(), td.GetPublicColumns()[0].OwnsSequenceIds[0],
+require.Equal(t, sd.GetID(), td.PublicColumns()[0].GetOwnsSequenceID(0),
"unexpected ID of sequences owned by d3.t",
)
})
@@ -5465,13 +5465,13 @@ func TestBackupRestoreSequenceOwnership(t *testing.T) {
require.Equal(t, tableDesc.GetID(), seqDesc.GetSequenceOpts().SequenceOwner.OwnerTableID,
"unexpected table is sequence owner after restore",
)
-require.Equal(t, tableDesc.GetPublicColumns()[0].ID, seqDesc.GetSequenceOpts().SequenceOwner.OwnerColumnID,
+require.Equal(t, tableDesc.PublicColumns()[0].GetID(), seqDesc.GetSequenceOpts().SequenceOwner.OwnerColumnID,
"unexpected column is sequence owner after restore",
)
-require.Equal(t, 1, len(tableDesc.GetPublicColumns()[0].OwnsSequenceIds),
+require.Equal(t, 1, tableDesc.PublicColumns()[0].NumOwnsSequences(),
"unexpected number of sequences owned by d.t after restore",
)
-require.Equal(t, seqDesc.GetID(), tableDesc.GetPublicColumns()[0].OwnsSequenceIds[0],
+require.Equal(t, seqDesc.GetID(), tableDesc.PublicColumns()[0].GetOwnsSequenceID(0),
"unexpected ID of sequence owned by table d.t after restore",
)

@@ -5490,20 +5490,20 @@ func TestBackupRestoreSequenceOwnership(t *testing.T) {
"unexpected table is sequence owner of d3.seq2 after restore",
)

-require.Equal(t, td.GetPublicColumns()[0].ID, sd.GetSequenceOpts().SequenceOwner.OwnerColumnID,
+require.Equal(t, td.PublicColumns()[0].GetID(), sd.GetSequenceOpts().SequenceOwner.OwnerColumnID,
"unexpected column is sequence owner of d2.seq after restore",
)
-require.Equal(t, td.GetPublicColumns()[0].ID, sdSeq2.GetSequenceOpts().SequenceOwner.OwnerColumnID,
+require.Equal(t, td.PublicColumns()[0].GetID(), sdSeq2.GetSequenceOpts().SequenceOwner.OwnerColumnID,
"unexpected column is sequence owner of d3.seq2 after restore",
)

-require.Equal(t, 2, len(td.GetPublicColumns()[0].OwnsSequenceIds),
+require.Equal(t, 2, td.PublicColumns()[0].NumOwnsSequences(),
"unexpected number of sequences owned by d3.t after restore",
)
-require.Equal(t, sd.GetID(), td.GetPublicColumns()[0].OwnsSequenceIds[0],
+require.Equal(t, sd.GetID(), td.PublicColumns()[0].GetOwnsSequenceID(0),
"unexpected ID of sequence owned by table d3.t after restore",
)
-require.Equal(t, sdSeq2.GetID(), td.GetPublicColumns()[0].OwnsSequenceIds[1],
+require.Equal(t, sdSeq2.GetID(), td.PublicColumns()[0].GetOwnsSequenceID(1),
"unexpected ID of sequence owned by table d3.t after restore",
)
})
13 changes: 6 additions & 7 deletions pkg/ccl/changefeedccl/avro.go
@@ -414,14 +414,14 @@ func indexToAvroSchema(
fieldIdxByName: make(map[string]int),
colIdxByFieldIdx: make(map[int]int),
}
-colIdxByID := tableDesc.ColumnIdxMap()
+colIdxByID := catalog.ColumnIDToOrdinalMap(tableDesc.PublicColumns())
for _, colID := range indexDesc.ColumnIDs {
colIdx, ok := colIdxByID.Get(colID)
if !ok {
return nil, errors.Errorf(`unknown column id: %d`, colID)
}
-col := tableDesc.GetColumnAtIdx(colIdx)
-field, err := columnDescToAvroSchema(col)
+col := tableDesc.PublicColumns()[colIdx]
+field, err := columnDescToAvroSchema(col.ColumnDesc())
if err != nil {
return nil, err
}
@@ -465,13 +465,12 @@ func tableToAvroSchema(
fieldIdxByName: make(map[string]int),
colIdxByFieldIdx: make(map[int]int),
}
-for colIdx := range tableDesc.GetPublicColumns() {
-col := tableDesc.GetColumnAtIdx(colIdx)
-field, err := columnDescToAvroSchema(col)
+for _, col := range tableDesc.PublicColumns() {
+field, err := columnDescToAvroSchema(col.ColumnDesc())
if err != nil {
return nil, err
}
-schema.colIdxByFieldIdx[len(schema.Fields)] = colIdx
+schema.colIdxByFieldIdx[len(schema.Fields)] = col.Ordinal()
schema.fieldIdxByName[field.Name] = len(schema.Fields)
schema.Fields = append(schema.Fields, field)
}
8 changes: 4 additions & 4 deletions pkg/ccl/changefeedccl/avro_test.go
@@ -84,17 +84,17 @@ func parseValues(tableDesc catalog.TableDescriptor, values string) ([]rowenc.Enc
for _, rowTuple := range valuesClause.Rows {
var row rowenc.EncDatumRow
for colIdx, expr := range rowTuple {
-col := tableDesc.GetColumnAtIdx(colIdx)
+col := tableDesc.PublicColumns()[colIdx]
typedExpr, err := schemaexpr.SanitizeVarFreeExpr(
-ctx, expr, col.Type, "avro", &semaCtx, tree.VolatilityStable)
+ctx, expr, col.GetType(), "avro", &semaCtx, tree.VolatilityStable)
if err != nil {
return nil, err
}
datum, err := typedExpr.Eval(evalCtx)
if err != nil {
return nil, errors.Wrapf(err, "evaluating %s", typedExpr)
}
-row = append(row, rowenc.DatumToEncDatum(col.Type, datum))
+row = append(row, rowenc.DatumToEncDatum(col.GetType(), datum))
}
rows = append(rows, row)
}
@@ -328,7 +328,7 @@ func TestAvroSchema(t *testing.T) {
colType := typ.SQLString()
tableDesc, err := parseTableDesc(`CREATE TABLE foo (pk INT PRIMARY KEY, a ` + colType + `)`)
require.NoError(t, err)
-field, err := columnDescToAvroSchema(tableDesc.GetColumnAtIdx(1))
+field, err := columnDescToAvroSchema(tableDesc.PublicColumns()[1].ColumnDesc())
require.NoError(t, err)
schema, err := json.Marshal(field.SchemaType)
require.NoError(t, err)
24 changes: 11 additions & 13 deletions pkg/ccl/changefeedccl/encoder.go
@@ -142,7 +142,7 @@ func (e *jsonEncoder) EncodeKey(_ context.Context, row encodeRow) ([]byte, error
}

func (e *jsonEncoder) encodeKeyRaw(row encodeRow) ([]interface{}, error) {
-colIdxByID := row.tableDesc.ColumnIdxMap()
+colIdxByID := catalog.ColumnIDToOrdinalMap(row.tableDesc.PublicColumns())
primaryIndex := row.tableDesc.GetPrimaryIndex()
jsonEntries := make([]interface{}, primaryIndex.NumColumns())
for i := 0; i < primaryIndex.NumColumns(); i++ {
@@ -152,8 +152,8 @@ func (e *jsonEncoder) encodeKeyRaw(row encodeRow) ([]interface{}, error) {
return nil, errors.Errorf(`unknown column id: %d`, colID)
}
datum := row.datums[idx]
-datum, col := row.datums[idx], row.tableDesc.GetColumnAtIdx(idx)
-if err := datum.EnsureDecoded(col.Type, &e.alloc); err != nil {
+datum, col := row.datums[idx], row.tableDesc.PublicColumns()[idx]
+if err := datum.EnsureDecoded(col.GetType(), &e.alloc); err != nil {
return nil, err
}
var err error
@@ -173,16 +173,15 @@ func (e *jsonEncoder) EncodeValue(_ context.Context, row encodeRow) ([]byte, err

var after map[string]interface{}
if !row.deleted {
-columns := row.tableDesc.GetPublicColumns()
+columns := row.tableDesc.PublicColumns()
after = make(map[string]interface{}, len(columns))
-for i := range columns {
-col := &columns[i]
+for i, col := range columns {
datum := row.datums[i]
-if err := datum.EnsureDecoded(col.Type, &e.alloc); err != nil {
+if err := datum.EnsureDecoded(col.GetType(), &e.alloc); err != nil {
return nil, err
}
var err error
-after[col.Name], err = tree.AsJSON(datum.Datum, time.UTC)
+after[col.GetName()], err = tree.AsJSON(datum.Datum, time.UTC)
if err != nil {
return nil, err
}
@@ -191,16 +190,15 @@ func (e *jsonEncoder) EncodeValue(_ context.Context, row encodeRow) ([]byte, err

var before map[string]interface{}
if row.prevDatums != nil && !row.prevDeleted {
-columns := row.prevTableDesc.GetPublicColumns()
+columns := row.prevTableDesc.PublicColumns()
before = make(map[string]interface{}, len(columns))
-for i := range columns {
-col := &columns[i]
+for i, col := range columns {
datum := row.prevDatums[i]
-if err := datum.EnsureDecoded(col.Type, &e.alloc); err != nil {
+if err := datum.EnsureDecoded(col.GetType(), &e.alloc); err != nil {
return nil, err
}
var err error
-before[col.Name], err = tree.AsJSON(datum.Datum, time.UTC)
+before[col.GetName()], err = tree.AsJSON(datum.Datum, time.UTC)
if err != nil {
return nil, err
}
30 changes: 17 additions & 13 deletions pkg/ccl/changefeedccl/rowfetcher_cache.go
@@ -148,18 +148,30 @@ func (c *rowFetcherCache) RowFetcherForTableDesc(
// guaranteed that the tables have the same version. Additionally, these
// fetchers are always initialized with a single tabledesc.Immutable.
if rf, ok := c.fetchers[idVer]; ok &&
-tableDesc.UserDefinedTypeColsHaveSameVersion(rf.GetTables()[0].(catalog.TableDescriptor)) {
+catalog.UserDefinedTypeColsHaveSameVersion(tableDesc, rf.GetTables()[0].(catalog.TableDescriptor)) {
return rf, nil
}
// TODO(dan): Allow for decoding a subset of the columns.
var colIdxMap catalog.TableColMap
var valNeededForCol util.FastIntSet
-for colIdx := range tableDesc.GetPublicColumns() {
-colIdxMap.Set(tableDesc.GetPublicColumns()[colIdx].ID, colIdx)
-valNeededForCol.Add(colIdx)
+for _, col := range tableDesc.PublicColumns() {
+colIdxMap.Set(col.GetID(), col.Ordinal())
+valNeededForCol.Add(col.Ordinal())
}

var rf row.Fetcher
+rfArgs := row.FetcherTableArgs{
+Spans: tableDesc.AllIndexSpans(c.codec),
+Desc: tableDesc,
+Index: tableDesc.GetPrimaryIndex().IndexDesc(),
+ColIdxMap: colIdxMap,
+IsSecondaryIndex: false,
+Cols: make([]descpb.ColumnDescriptor, len(tableDesc.PublicColumns())),
+ValNeededForCol: valNeededForCol,
+}
+for i, col := range tableDesc.PublicColumns() {
+rfArgs.Cols[i] = *col.ColumnDesc()
+}
if err := rf.Init(
context.TODO(),
c.codec,
@@ -169,15 +181,7 @@ func (c *rowFetcherCache) RowFetcherForTableDesc(
false, /* isCheck */
&c.a,
nil, /* memMonitor */
-row.FetcherTableArgs{
-Spans: tableDesc.AllIndexSpans(c.codec),
-Desc: tableDesc,
-Index: tableDesc.GetPrimaryIndex().IndexDesc(),
-ColIdxMap: colIdxMap,
-IsSecondaryIndex: false,
-Cols: tableDesc.GetPublicColumns(),
-ValNeededForCol: valNeededForCol,
-},
+rfArgs,
); err != nil {
return nil, err
}
16 changes: 4 additions & 12 deletions pkg/ccl/changefeedccl/schemafeed/schema_feed.go
@@ -167,22 +167,14 @@ func (t *typeDependencyTracker) removeDependency(typeID, tableID descpb.ID) {
}

func (t *typeDependencyTracker) purgeTable(tbl catalog.TableDescriptor) {
-if !tbl.ContainsUserDefinedTypes() {
-return
-}
-for _, colOrd := range tbl.GetColumnOrdinalsWithUserDefinedTypes() {
-colTyp := tbl.DeletableColumns()[colOrd].Type
-t.removeDependency(typedesc.UserDefinedTypeOIDToID(colTyp.Oid()), tbl.GetID())
+for _, col := range tbl.UserDefinedTypeColumns() {
+t.removeDependency(typedesc.UserDefinedTypeOIDToID(col.GetType().Oid()), tbl.GetID())
}
}

func (t *typeDependencyTracker) ingestTable(tbl catalog.TableDescriptor) {
-if !tbl.ContainsUserDefinedTypes() {
-return
-}
-for _, colOrd := range tbl.GetColumnOrdinalsWithUserDefinedTypes() {
-colTyp := tbl.DeletableColumns()[colOrd].Type
-t.addDependency(typedesc.UserDefinedTypeOIDToID(colTyp.Oid()), tbl.GetID())
+for _, col := range tbl.UserDefinedTypeColumns() {
+t.addDependency(typedesc.UserDefinedTypeOIDToID(col.GetType().Oid()), tbl.GetID())
}
}

4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/schemafeed/table_event_filter.go
@@ -108,12 +108,12 @@ func dropColumnMutationExists(desc catalog.TableDescriptor) bool {
func newColumnBackfillComplete(e TableEvent) (res bool) {
// TODO(ajwerner): What is the case where the before has a backfill mutation
// and the After doesn't? What about other queued mutations?
-return len(e.Before.GetPublicColumns()) < len(e.After.GetPublicColumns()) &&
+return len(e.Before.PublicColumns()) < len(e.After.PublicColumns()) &&
e.Before.HasColumnBackfillMutation() && !e.After.HasColumnBackfillMutation()
}

func newColumnNoBackfill(e TableEvent) (res bool) {
-return len(e.Before.GetPublicColumns()) < len(e.After.GetPublicColumns()) &&
+return len(e.Before.PublicColumns()) < len(e.After.PublicColumns()) &&
!e.Before.HasColumnBackfillMutation()
}

14 changes: 7 additions & 7 deletions pkg/ccl/importccl/import_stmt.go
@@ -718,29 +718,29 @@ func importPlanHook(
var intoCols []string
var isTargetCol = make(map[string]bool)
for _, name := range importStmt.IntoCols {
-active, err := found.FindActiveColumnsByNames(tree.NameList{name})
+active, err := tabledesc.FindPublicColumnsWithNames(found, tree.NameList{name})
if err != nil {
return errors.Wrap(err, "verifying target columns")
}

-isTargetCol[active[0].Name] = true
-intoCols = append(intoCols, active[0].Name)
+isTargetCol[active[0].GetName()] = true
+intoCols = append(intoCols, active[0].GetName())
}

// Ensure that non-target columns that don't have default
// expressions are nullable.
if len(isTargetCol) != 0 {
for _, col := range found.VisibleColumns() {
-if !(isTargetCol[col.Name] || col.Nullable || col.HasDefault() || col.IsComputed()) {
+if !(isTargetCol[col.GetName()] || col.IsNullable() || col.HasDefault() || col.IsComputed()) {
return errors.Newf(
"all non-target columns in IMPORT INTO must be nullable "+
"or have default expressions, or have computed expressions"+
" but violated by column %q",
-col.Name,
+col.GetName(),
)
}
-if isTargetCol[col.Name] && col.IsComputed() {
-return schemaexpr.CannotWriteToComputedColError(col.Name)
+if isTargetCol[col.GetName()] && col.IsComputed() {
+return schemaexpr.CannotWriteToComputedColError(col.GetName())
}
}
}
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/read_import_avro.go
@@ -398,7 +398,7 @@ func newImportAvroPipeline(
) (importRowProducer, importRowConsumer, error) {
fieldIdxByName := make(map[string]int)
for idx, col := range avro.importContext.tableDesc.VisibleColumns() {
-fieldIdxByName[col.Name] = idx
+fieldIdxByName[col.GetName()] = idx
}

consumer := &avroConsumer{
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/read_import_pgdump.go
@@ -608,7 +608,7 @@ func newPgDumpReader(
targetCols[i] = tree.Name(colName)
}
for i, col := range tableDesc.VisibleColumns() {
-colSubMap[col.Name] = i
+colSubMap[col.GetName()] = i
}
conv, err := row.NewDatumRowConverter(ctx, tableDesc, targetCols, evalCtx, kvCh,
nil /* seqChunkProvider */)