Skip to content

Commit

Permalink
sql: remove deprecated column slice methods in TableDescriptor
Browse files Browse the repository at this point in the history
Previously, these methods would be called on a table descriptor interface
to obtain []descpb.ColumnDescriptor values.

This patch removes these calls, along with the method definitions, in
favour of new methods which use the catalog.Column interface type instead.

GetPublicColumns() is treated in a separate commit owing to its large
number of call sites.

Fixes cockroachdb#59805.

Release note: None
  • Loading branch information
Marius Posta committed Feb 5, 2021
1 parent f3ffee8 commit 473f9df
Show file tree
Hide file tree
Showing 22 changed files with 145 additions and 267 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/import_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func makeInputConverter(
// suffix of the columns are computed, and then expect the data file to have
// #(visible columns) - #(computed columns).
if len(singleTableTargetCols) == 0 && !formatHasNamedColumns(spec.Format.Format) {
for _, col := range singleTable.VisibleColumns() {
for _, col := range singleTable.VisibleColumnsNew() {
if col.IsComputed() {
return nil, unimplemented.NewWithIssueDetail(56002, "import.computed",
"to use computed columns, use IMPORT INTO")
Expand Down
10 changes: 5 additions & 5 deletions pkg/ccl/importccl/import_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -730,17 +730,17 @@ func importPlanHook(
// Ensure that non-target columns that don't have default
// expressions are nullable.
if len(isTargetCol) != 0 {
for _, col := range found.VisibleColumns() {
if !(isTargetCol[col.Name] || col.Nullable || col.HasDefault() || col.IsComputed()) {
for _, col := range found.VisibleColumnsNew() {
if !(isTargetCol[col.GetName()] || col.IsNullable() || col.HasDefault() || col.IsComputed()) {
return errors.Newf(
"all non-target columns in IMPORT INTO must be nullable "+
"or have default expressions, or have computed expressions"+
" but violated by column %q",
col.Name,
col.GetName(),
)
}
if isTargetCol[col.Name] && col.IsComputed() {
return schemaexpr.CannotWriteToComputedColError(col.Name)
if isTargetCol[col.GetName()] && col.IsComputed() {
return schemaexpr.CannotWriteToComputedColError(col.GetName())
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/importccl/read_import_avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,8 +397,8 @@ func newImportAvroPipeline(
avro *avroInputReader, input *fileReader,
) (importRowProducer, importRowConsumer, error) {
fieldIdxByName := make(map[string]int)
for idx, col := range avro.importContext.tableDesc.VisibleColumns() {
fieldIdxByName[col.Name] = idx
for idx, col := range avro.importContext.tableDesc.VisibleColumnsNew() {
fieldIdxByName[col.GetName()] = idx
}

consumer := &avroConsumer{
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/read_import_csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func newCSVInputReader(
) *csvInputReader {
numExpectedDataCols := len(targetCols)
if numExpectedDataCols == 0 {
numExpectedDataCols = len(tableDesc.VisibleColumns())
numExpectedDataCols = len(tableDesc.VisibleColumnsNew())
}

return &csvInputReader{
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/importccl/read_import_pgdump.go
Original file line number Diff line number Diff line change
Expand Up @@ -607,8 +607,8 @@ func newPgDumpReader(
for i, colName := range table.TargetCols {
targetCols[i] = tree.Name(colName)
}
for i, col := range tableDesc.VisibleColumns() {
colSubMap[col.Name] = i
for i, col := range tableDesc.VisibleColumnsNew() {
colSubMap[col.GetName()] = i
}
conv, err := row.NewDatumRowConverter(ctx, tableDesc, targetCols, evalCtx, kvCh,
nil /* seqChunkProvider */)
Expand Down
9 changes: 4 additions & 5 deletions pkg/sql/catalog/colinfo/column_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"fmt"

"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand All @@ -30,20 +29,20 @@ import (
// mutations are added.
func ProcessTargetColumns(
tableDesc catalog.TableDescriptor, nameList tree.NameList, ensureColumns, allowMutations bool,
) ([]descpb.ColumnDescriptor, error) {
) ([]catalog.Column, error) {
if len(nameList) == 0 {
if ensureColumns {
// VisibleColumns is used here to prevent INSERT INTO <table> VALUES (...)
// (as opposed to INSERT INTO <table> (...) VALUES (...)) from writing
// hidden columns. At present, the only hidden column is the implicit rowid
// primary key column.
return tableDesc.VisibleColumns(), nil
return tableDesc.VisibleColumnsNew(), nil
}
return nil, nil
}

var colIDSet catalog.TableColSet
cols := make([]descpb.ColumnDescriptor, len(nameList))
cols := make([]catalog.Column, len(nameList))
for i, colName := range nameList {
col, err := tableDesc.FindColumnWithName(colName)
if err != nil {
Expand All @@ -58,7 +57,7 @@ func ProcessTargetColumns(
"multiple assignments to the same column %q", &nameList[i])
}
colIDSet.Add(col.GetID())
cols[i] = *col.ColumnDesc()
cols[i] = col
}

return cols, nil
Expand Down
9 changes: 1 addition & 8 deletions pkg/sql/catalog/descriptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,16 +214,9 @@ type TableDescriptor interface {
GetPublicColumns() []descpb.ColumnDescriptor // deprecated
NamesForColumnIDs(ids descpb.ColumnIDs) ([]string, error)
ColumnIdxMap() TableColMap
GetColumnAtIdx(idx int) *descpb.ColumnDescriptor // deprecated
AllNonDropColumns() []descpb.ColumnDescriptor // deprecated
VisibleColumns() []descpb.ColumnDescriptor // deprecated
ColumnsWithMutations(includeMutations bool) []descpb.ColumnDescriptor // deprecated
GetColumnAtIdx(idx int) *descpb.ColumnDescriptor // deprecated
ColumnIdxMapWithMutations(includeMutations bool) TableColMap
DeletableColumns() []descpb.ColumnDescriptor // deprecated
MutationColumns() []descpb.ColumnDescriptor // deprecated
ContainsUserDefinedTypes() bool
WritableColumns() []descpb.ColumnDescriptor // deprecated
ReadableColumns() []descpb.ColumnDescriptor // deprecated
GetNextColumnID() descpb.ColumnID
ColumnTypes() []*types.T
ColumnTypesWithMutations(mutations bool) []*types.T
Expand Down
12 changes: 5 additions & 7 deletions pkg/sql/catalog/schemaexpr/default_exprs.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,11 @@ func ProcessColumnSet(

// Add all public or columns in DELETE_AND_WRITE_ONLY state
// that satisfy the condition.
writable := tableDesc.WritableColumns()
for i := range writable {
col := &writable[i]
if inSet(col) {
if !colIDSet.Contains(col.ID) {
colIDSet.Add(col.ID)
cols = append(cols, *col)
for _, col := range tableDesc.WritableColumnsNew() {
if inSet(col.ColumnDesc()) {
if !colIDSet.Contains(col.GetID()) {
colIDSet.Add(col.GetID())
cols = append(cols, *col.ColumnDesc())
}
}
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/sql/catalog/schemaexpr/expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,16 @@ func DequalifyAndValidateExpr(
tn *tree.TableName,
) (string, catalog.TableColSet, error) {
var colIDs catalog.TableColSet
nonDropColumns := desc.NonDropColumnsNew()
nonDropColumnDescs := make([]descpb.ColumnDescriptor, len(nonDropColumns))
for i, col := range nonDropColumns {
nonDropColumnDescs[i] = *col.ColumnDesc()
}

sourceInfo := colinfo.NewSourceInfoForSingleTable(
*tn, colinfo.ResultColumnsFromColDescs(
desc.GetID(),
desc.AllNonDropColumns(),
nonDropColumnDescs,
),
)
expr, err := dequalifyColumnRefs(ctx, sourceInfo, expr)
Expand Down
129 changes: 46 additions & 83 deletions pkg/sql/catalog/tabledesc/structured.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,21 +227,6 @@ func (desc *wrapper) KeysPerRow(indexID descpb.IndexID) (int, error) {
return len(desc.Families), nil
}

// AllNonDropColumns returns all the columns, including those being added
// in the mutations.
func (desc *wrapper) AllNonDropColumns() []descpb.ColumnDescriptor {
cols := make([]descpb.ColumnDescriptor, 0, len(desc.Columns)+len(desc.Mutations))
cols = append(cols, desc.Columns...)
for _, m := range desc.Mutations {
if col := m.GetColumn(); col != nil {
if m.Direction == descpb.DescriptorMutation_ADD {
cols = append(cols, *col)
}
}
}
return cols
}

// buildIndexName sets desc.Name to a value that is not EqualName to any
// of tableDesc's indexes. allocateName roughly follows PostgreSQL's
// convention for automatically-named indexes.
Expand Down Expand Up @@ -1928,47 +1913,45 @@ func (desc *wrapper) ValidateTable(ctx context.Context) error {
func (desc *wrapper) validateColumns(
columnNames map[string]descpb.ColumnID, columnIDs map[descpb.ColumnID]*descpb.ColumnDescriptor,
) error {
colDescs := desc.AllNonDropColumns()
for colIdx := range colDescs {
column := &colDescs[colIdx]
for _, column := range desc.NonDropColumnsNew() {

if err := catalog.ValidateName(column.Name, "column"); err != nil {
if err := catalog.ValidateName(column.GetName(), "column"); err != nil {
return err
}
if column.ID == 0 {
return errors.AssertionFailedf("invalid column ID %d", errors.Safe(column.ID))
if column.GetID() == 0 {
return errors.AssertionFailedf("invalid column ID %d", errors.Safe(column.GetID()))
}

if _, columnNameExists := columnNames[column.Name]; columnNameExists {
if _, columnNameExists := columnNames[column.GetName()]; columnNameExists {
for i := range desc.Columns {
if desc.Columns[i].Name == column.Name {
if desc.Columns[i].Name == column.GetName() {
return pgerror.Newf(pgcode.DuplicateColumn,
"duplicate column name: %q", column.Name)
"duplicate column name: %q", column.GetName())
}
}
return pgerror.Newf(pgcode.DuplicateColumn,
"duplicate: column %q in the middle of being added, not yet public", column.Name)
"duplicate: column %q in the middle of being added, not yet public", column.GetName())
}
if colinfo.IsSystemColumnName(column.Name) {
if colinfo.IsSystemColumnName(column.GetName()) {
return pgerror.Newf(pgcode.DuplicateColumn,
"column name %q conflicts with a system column name", column.Name)
"column name %q conflicts with a system column name", column.GetName())
}
columnNames[column.Name] = column.ID
columnNames[column.GetName()] = column.GetID()

if other, ok := columnIDs[column.ID]; ok {
if other, ok := columnIDs[column.GetID()]; ok {
return fmt.Errorf("column %q duplicate ID of column %q: %d",
column.Name, other.Name, column.ID)
column.GetName(), other.Name, column.GetID())
}
columnIDs[column.ID] = column
columnIDs[column.GetID()] = column.ColumnDesc()

if column.ID >= desc.NextColumnID {
if column.GetID() >= desc.NextColumnID {
return errors.AssertionFailedf("column %q invalid ID (%d) >= next column ID (%d)",
column.Name, errors.Safe(column.ID), errors.Safe(desc.NextColumnID))
column.GetName(), errors.Safe(column.GetID()), errors.Safe(desc.NextColumnID))
}

if column.IsComputed() {
// Verify that the computed column expression is valid.
expr, err := parser.ParseExpr(*column.ComputeExpr)
expr, err := parser.ParseExpr(column.GetComputeExpr())
if err != nil {
return err
}
Expand All @@ -1978,10 +1961,10 @@ func (desc *wrapper) validateColumns(
}
if !valid {
return fmt.Errorf("computed column %q refers to unknown columns in expression: %s",
column.Name, *column.ComputeExpr)
column.GetName(), column.GetComputeExpr())
}
} else if column.Virtual {
return fmt.Errorf("virtual column %q is not computed", column.Name)
} else if column.IsVirtual() {
return fmt.Errorf("virtual column %q is not computed", column.GetName())
}
}
return nil
Expand Down Expand Up @@ -2529,10 +2512,10 @@ func notIndexableError(cols []descpb.ColumnDescriptor) error {
func checkColumnsValidForIndex(tableDesc *Mutable, indexColNames []string) error {
invalidColumns := make([]descpb.ColumnDescriptor, 0, len(indexColNames))
for _, indexCol := range indexColNames {
for _, col := range tableDesc.AllNonDropColumns() {
if col.Name == indexCol {
if !colinfo.ColumnTypeIsIndexable(col.Type) {
invalidColumns = append(invalidColumns, col)
for _, col := range tableDesc.NonDropColumnsNew() {
if col.GetName() == indexCol {
if !colinfo.ColumnTypeIsIndexable(col.GetType()) {
invalidColumns = append(invalidColumns, *col.ColumnDesc())
}
}
}
Expand All @@ -2546,30 +2529,30 @@ func checkColumnsValidForIndex(tableDesc *Mutable, indexColNames []string) error
func checkColumnsValidForInvertedIndex(tableDesc *Mutable, indexColNames []string) error {
lastCol := len(indexColNames) - 1
for i, indexCol := range indexColNames {
for _, col := range tableDesc.AllNonDropColumns() {
if col.Name == indexCol {
for _, col := range tableDesc.NonDropColumnsNew() {
if col.GetName() == indexCol {
// The last column indexed by an inverted index must be
// inverted indexable.
if i == lastCol && !colinfo.ColumnTypeIsInvertedIndexable(col.Type) {
if i == lastCol && !colinfo.ColumnTypeIsInvertedIndexable(col.GetType()) {
return errors.WithHint(
pgerror.Newf(
pgcode.FeatureNotSupported,
"column %s of type %s is not allowed as the last column in an inverted index",
col.Name,
col.Type.Name(),
col.GetName(),
col.GetType().Name(),
),
"see the documentation for more information about inverted indexes",
)

}
// Any preceding columns must not be inverted indexable.
if i < lastCol && !colinfo.ColumnTypeIsIndexable(col.Type) {
if i < lastCol && !colinfo.ColumnTypeIsIndexable(col.GetType()) {
return errors.WithHint(
pgerror.Newf(
pgcode.FeatureNotSupported,
"column %s of type %s is only allowed as the last column in an inverted index",
col.Name,
col.Type.Name(),
col.GetName(),
col.GetType().Name(),
),
"see the documentation for more information about inverted indexes",
)
Expand Down Expand Up @@ -3649,45 +3632,21 @@ func (desc *Mutable) IsNew() bool {
return desc.ClusterVersion.ID == descpb.InvalidID
}

// VisibleColumns returns all non hidden columns.
func (desc *wrapper) VisibleColumns() []descpb.ColumnDescriptor {
var cols []descpb.ColumnDescriptor
for i := range desc.Columns {
col := &desc.Columns[i]
if !col.Hidden {
cols = append(cols, *col)
}
}
return cols
}

// ColumnTypes returns the types of all columns.
func (desc *wrapper) ColumnTypes() []*types.T {
return desc.ColumnTypesWithMutations(false)
}

// ColumnsWithMutations returns all column descriptors, optionally including
// mutation columns.
func (desc *wrapper) ColumnsWithMutations(includeMutations bool) []descpb.ColumnDescriptor {
n := len(desc.Columns)
columns := desc.Columns[:n:n] // immutable on append
if includeMutations {
for i := range desc.Mutations {
if col := desc.Mutations[i].GetColumn(); col != nil {
columns = append(columns, *col)
}
}
}
return columns
}

// ColumnTypesWithMutations returns the types of all columns, optionally
// including mutation columns, which will be returned if the input bool is true.
func (desc *wrapper) ColumnTypesWithMutations(mutations bool) []*types.T {
columns := desc.ColumnsWithMutations(mutations)
columns := desc.PublicColumnsNew()
if mutations {
columns = desc.AllColumnsNew()
}
types := make([]*types.T, len(columns))
for i := range columns {
types[i] = columns[i].Type
types[i] = columns[i].GetType()
}
return types
}
Expand All @@ -3699,13 +3658,16 @@ func (desc *wrapper) ColumnTypesWithMutations(mutations bool) []*types.T {
func (desc *wrapper) ColumnTypesWithMutationsAndVirtualCol(
mutations bool, virtualCol *descpb.ColumnDescriptor,
) []*types.T {
columns := desc.ColumnsWithMutations(mutations)
columns := desc.PublicColumnsNew()
if mutations {
columns = desc.AllColumnsNew()
}
types := make([]*types.T, len(columns))
for i := range columns {
if virtualCol != nil && columns[i].ID == virtualCol.ID {
if virtualCol != nil && columns[i].GetID() == virtualCol.ID {
types[i] = virtualCol.Type
} else {
types[i] = columns[i].Type
types[i] = columns[i].GetType()
}
}
return types
Expand Down Expand Up @@ -3891,8 +3853,9 @@ func (desc *wrapper) FindAllReferences() (map[descpb.ID]struct{}, error) {
}
}

for _, c := range desc.AllNonDropColumns() {
for _, id := range c.UsesSequenceIds {
for _, c := range desc.NonDropColumnsNew() {
for i := 0; i < c.NumUsesSequences(); i++ {
id := c.GetUsesSequenceID(i)
refs[id] = struct{}{}
}
}
Expand Down
Loading

0 comments on commit 473f9df

Please sign in to comment.