Skip to content

Commit

Permalink
sql: support the ADD COLUMN ... REFERENCES syntax
Browse files Browse the repository at this point in the history
Fixes cockroachdb#32917.

This PR adds support for the add column references
statement by allowing the foreign key building code
to use columns and indexes added in the current txn.
The schema changer already understands how to add
the combination of the three in the same transaction.

Release note (sql change): This PR adds support for the
`ALTER TABLE ... ADD COLUMN ... REFERENCES ...` syntax
for tables that are empty.
  • Loading branch information
rohany committed Apr 27, 2020
1 parent ef2b897 commit 626dc47
Show file tree
Hide file tree
Showing 8 changed files with 313 additions and 66 deletions.
10 changes: 1 addition & 9 deletions pkg/sql/alter_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,6 @@ func (n *alterTableNode) startExec(params runParams) error {
switch t := cmd.(type) {
case *tree.AlterTableAddColumn:
d := t.ColumnDef
if d.HasFKConstraint() {
return unimplemented.NewWithIssue(32917,
"adding a REFERENCES constraint while also adding a column via ALTER not supported")
}
version := params.ExecCfg().Settings.Version.ActiveVersionOrEmpty(params.ctx)
if supported, err := isTypeSupportedInVersion(version, d.Type); err != nil {
return err
Expand Down Expand Up @@ -317,12 +313,8 @@ func (n *alterTableNode) startExec(params runParams) error {

case *tree.ForeignKeyConstraintTableDef:
for _, colName := range d.FromCols {
col, err := n.tableDesc.FindActiveColumnByName(string(colName))
col, err := n.tableDesc.FindActiveOrNewColumnByName(colName)
if err != nil {
if _, dropped, inactiveErr := n.tableDesc.FindColumnByName(colName); inactiveErr == nil && !dropped {
return unimplemented.NewWithIssue(32917,
"adding a REFERENCES constraint while the column is being added not supported")
}
return err
}

Expand Down
60 changes: 30 additions & 30 deletions pkg/sql/create_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,16 +608,16 @@ func ResolveFK(
validationBehavior tree.ValidationBehavior,
settings *cluster.Settings,
) error {
originColumnIDs := make(sqlbase.ColumnIDs, len(d.FromCols))
originColumns := make([]*sqlbase.ColumnDescriptor, len(d.FromCols))
for i, col := range d.FromCols {
col, _, err := tbl.FindColumnByName(col)
col, err := tbl.FindActiveOrNewColumnByName(col)
if err != nil {
return err
}
if err := col.CheckCanBeFKRef(); err != nil {
return err
}
originColumnIDs[i] = col.ID
originColumns[i] = col
}

target, err := ResolveMutableExistingObject(ctx, sc, &d.Table, true /*required*/, ResolveRequireTableDesc)
Expand Down Expand Up @@ -658,33 +658,28 @@ func ResolveFK(
}
}

srcCols, err := tbl.FindActiveColumnsByNames(d.FromCols)
if err != nil {
return err
}

targetColNames := d.ToCols
referencedColNames := d.ToCols
// If no columns are specified, attempt to default to PK.
if len(targetColNames) == 0 {
targetColNames = make(tree.NameList, len(target.PrimaryIndex.ColumnNames))
if len(referencedColNames) == 0 {
referencedColNames = make(tree.NameList, len(target.PrimaryIndex.ColumnNames))
for i, n := range target.PrimaryIndex.ColumnNames {
targetColNames[i] = tree.Name(n)
referencedColNames[i] = tree.Name(n)
}
}

targetCols, err := target.FindActiveColumnsByNames(targetColNames)
referencedCols, err := target.FindActiveColumnsByNames(referencedColNames)
if err != nil {
return err
}

if len(targetCols) != len(srcCols) {
if len(referencedCols) != len(originColumns) {
return pgerror.Newf(pgcode.Syntax,
"%d columns must reference exactly %d columns in referenced table (found %d)",
len(srcCols), len(srcCols), len(targetCols))
len(originColumns), len(originColumns), len(referencedCols))
}

for i := range srcCols {
if s, t := srcCols[i], targetCols[i]; !s.Type.Equivalent(&t.Type) {
for i := range originColumns {
if s, t := originColumns[i], referencedCols[i]; !s.Type.Equivalent(&t.Type) {
return pgerror.Newf(pgcode.DatatypeMismatch,
"type of %q (%s) does not match foreign key %q.%q (%s)",
s.Name, s.Type.String(), target.Name, t.Name, t.Type.String())
Expand Down Expand Up @@ -715,17 +710,17 @@ func ResolveFK(
}
}

targetColIDs := make(sqlbase.ColumnIDs, len(targetCols))
for i := range targetCols {
targetColIDs[i] = targetCols[i].ID
targetColIDs := make(sqlbase.ColumnIDs, len(referencedCols))
for i := range referencedCols {
targetColIDs[i] = referencedCols[i].ID
}

// Don't add a SET NULL action on an index that has any column that is NOT
// NULL.
if d.Actions.Delete == tree.SetNull || d.Actions.Update == tree.SetNull {
for _, sourceColumn := range srcCols {
if !sourceColumn.Nullable {
col := qualifyFKColErrorWithDB(ctx, txn, tbl.TableDesc(), sourceColumn.Name)
for _, originColumn := range originColumns {
if !originColumn.Nullable {
col := qualifyFKColErrorWithDB(ctx, txn, tbl.TableDesc(), originColumn.Name)
return pgerror.Newf(pgcode.InvalidForeignKey,
"cannot add a SET NULL cascading action on column %q which has a NOT NULL constraint", col,
)
Expand All @@ -736,11 +731,11 @@ func ResolveFK(
// Don't add a SET DEFAULT action on an index that has any column that has
// a DEFAULT expression of NULL and a NOT NULL constraint.
if d.Actions.Delete == tree.SetDefault || d.Actions.Update == tree.SetDefault {
for _, sourceColumn := range srcCols {
for _, originColumn := range originColumns {
// Having a default expression of NULL, and a constraint of NOT NULL is a
// contradiction and should never be allowed.
if sourceColumn.DefaultExpr == nil && !sourceColumn.Nullable {
col := qualifyFKColErrorWithDB(ctx, txn, tbl.TableDesc(), sourceColumn.Name)
if originColumn.DefaultExpr == nil && !originColumn.Nullable {
col := qualifyFKColErrorWithDB(ctx, txn, tbl.TableDesc(), originColumn.Name)
return pgerror.Newf(pgcode.InvalidForeignKey,
"cannot add a SET DEFAULT cascading action on column %q which has a "+
"NOT NULL constraint and a NULL default expression", col,
Expand All @@ -749,10 +744,15 @@ func ResolveFK(
}
}

originColumnIDs := make(sqlbase.ColumnIDs, len(originColumns))
for i, col := range originColumns {
originColumnIDs[i] = col.ID
}
var legacyOriginIndexID sqlbase.IndexID
// Search for an index on the origin table that matches. If one doesn't exist,
// we create one automatically if the table to alter is new or empty.
originIdx, err := sqlbase.FindFKOriginIndex(tbl.TableDesc(), originColumnIDs)
// we create one automatically if the table to alter is new or empty. We also
// search if an index for the set of columns was created in this transaction.
originIdx, err := sqlbase.FindFKOriginIndexInTxn(tbl, originColumnIDs)
if err == nil {
// If there was no error, we found a suitable index.
legacyOriginIndexID = originIdx.ID
Expand All @@ -775,7 +775,7 @@ func ResolveFK(
return pgerror.Newf(pgcode.ForeignKeyViolation,
"foreign key requires an existing index on columns %s", colNames.String())
}
id, err := addIndexForFK(tbl, srcCols, constraintName, ts)
id, err := addIndexForFK(tbl, originColumns, constraintName, ts)
if err != nil {
return err
}
Expand Down Expand Up @@ -825,7 +825,7 @@ func ResolveFK(
// that will support using `srcCols` as the referencing (src) side of an FK.
func addIndexForFK(
tbl *sqlbase.MutableTableDescriptor,
srcCols []sqlbase.ColumnDescriptor,
srcCols []*sqlbase.ColumnDescriptor,
constraintName string,
ts FKTableState,
) (sqlbase.IndexID, error) {
Expand Down
216 changes: 201 additions & 15 deletions pkg/sql/logictest/testdata/logic_test/alter_table
Original file line number Diff line number Diff line change
Expand Up @@ -381,9 +381,6 @@ ALTER TABLE t ADD i INT DEFAULT 1 CHECK (i < g)
statement error pq: validation of CHECK "i > 0" failed on row:.* g=1.* i=0
ALTER TABLE t ADD i INT AS (g - 1) STORED CHECK (i > 0)

statement error adding a REFERENCES constraint while also adding a column via ALTER not supported
ALTER TABLE t ADD f INT UNIQUE REFERENCES other

query TTTTB
SHOW CONSTRAINTS FROM t
----
Expand Down Expand Up @@ -926,18 +923,6 @@ CREATE TABLE t32917_2 (b INT PRIMARY KEY)
statement ok
INSERT INTO t32917_2 VALUES (1), (2), (3)

statement ok
BEGIN

statement ok
ALTER TABLE t32917_2 ADD c INT UNIQUE DEFAULT 4

statement error adding a REFERENCES constraint while the column is being added not supported
ALTER TABLE t32917_2 ADD CONSTRAINT fk_c_a FOREIGN KEY (c) references t32917 (a)

statement ok
ROLLBACK

# Test SET NOT NULL
statement ok
CREATE TABLE t (a INT)
Expand Down Expand Up @@ -1135,3 +1120,204 @@ CREATE TABLE t25045 (x INT, y INT AS (x+1) STORED)

statement error pq: column \"x\" is referenced by computed column \"y\"
ALTER TABLE t25045 DROP COLUMN x

subtest add_col_references

statement ok
DROP TABLE IF EXISTS t1, t2;
CREATE TABLE t1 (x INT PRIMARY KEY);
CREATE TABLE t2 (y INT)

statement ok
ALTER TABLE t2 ADD COLUMN x INT REFERENCES t1 (x)

statement ok
INSERT INTO t1 VALUES (1)

statement error pq: insert on table "t2" violates foreign key constraint "fk_x_ref_t1"
INSERT INTO t2 VALUES (2, 2)

statement ok
INSERT INTO t2 VALUES (1, 1)

# Error out trying to add a column with a foreign key on a non-empty table.
statement error pq: foreign key requires an existing index on columns \("z"\)
ALTER TABLE t2 ADD COLUMN z INT REFERENCES t1 (x)

# Check that the foreign key was indeed added.
query TT
SHOW CREATE t2
----
t2 CREATE TABLE t2 (
y INT8 NULL,
x INT8 NULL,
CONSTRAINT fk_x_ref_t1 FOREIGN KEY (x) REFERENCES t1(x),
INDEX t2_auto_index_fk_x_ref_t1 (x ASC),
FAMILY "primary" (y, rowid, x)
)

# Test that only one index gets created when adding a column
# with references and unique.
statement ok
CREATE TABLE t3 (y INT)

statement ok
ALTER TABLE t3 ADD COLUMN x INT UNIQUE REFERENCES t1 (x)

query TT
SHOW CREATE t3
----
t3 CREATE TABLE t3 (
y INT8 NULL,
x INT8 NULL,
CONSTRAINT fk_x_ref_t1 FOREIGN KEY (x) REFERENCES t1(x),
UNIQUE INDEX t3_x_key (x ASC),
FAMILY "primary" (y, rowid, x)
)

# We allowed the foreign key validation code to look into the mutations
# list to validate what columns / indexes can be used for foreign keys.
# Ensure that we still have the correct restrictions.
statement ok
DROP TABLE t1, t2 CASCADE;
CREATE TABLE t1 (x INT PRIMARY KEY);
CREATE TABLE t2 (x INT, y INT, INDEX i (x))

statement error pq: column \"x\" does not exist
BEGIN;
ALTER TABLE t2 DROP COLUMN x;
ALTER TABLE t2 ADD FOREIGN KEY (x) REFERENCES t1(x);

statement ok
ROLLBACK

statement ok
INSERT INTO t2 VALUES (1, 2)

statement error pq: foreign key requires an existing index on columns \("x"\)
BEGIN;
DROP INDEX t2@i;
ALTER TABLE t2 ADD FOREIGN KEY (x) REFERENCES t1(x)

statement ok
ROLLBACK

# Test using ADD COL REFERENCES in a self referencing constraint.
statement ok
DROP TABLE t1 CASCADE;
CREATE TABLE t1 (x INT PRIMARY KEY);
ALTER TABLE t1 ADD COLUMN x2 INT REFERENCES t1 (x)

query TT
SHOW CREATE t1
----
t1 CREATE TABLE t1 (
x INT8 NOT NULL,
x2 INT8 NULL,
CONSTRAINT "primary" PRIMARY KEY (x ASC),
CONSTRAINT fk_x2_ref_t1 FOREIGN KEY (x2) REFERENCES t1(x),
INDEX t1_auto_index_fk_x2_ref_t1 (x2 ASC),
FAMILY "primary" (x, x2)
)

statement error pq: insert on table "t1" violates foreign key constraint "fk_x2_ref_t1"
INSERT INTO t1 VALUES (1, 2)

# Test ADD COL REFERENCES on a new table in the same txn.
statement ok
DROP TABLE t1, t2 CASCADE

statement ok
BEGIN;
CREATE TABLE t1 (x INT PRIMARY KEY);
CREATE TABLE t2 (y INT);
ALTER TABLE t2 ADD COLUMN x INT REFERENCES t1 (x);
COMMIT

query TT
SHOW CREATE t2
----
t2 CREATE TABLE t2 (
y INT8 NULL,
x INT8 NULL,
CONSTRAINT fk_x_ref_t1 FOREIGN KEY (x) REFERENCES t1(x),
INDEX t2_auto_index_fk_x_ref_t1 (x ASC),
FAMILY "primary" (y, rowid, x)
)

# Test that we can also add a column and then an FK in the same txn.
statement ok
DROP TABLE t1, t2 CASCADE

statement ok
BEGIN;
CREATE TABLE t1 (x INT PRIMARY KEY);
CREATE TABLE t2 (y INT);
ALTER TABLE t2 ADD COLUMN x INT;
ALTER TABLE t2 ADD FOREIGN KEY (x) REFERENCES t1 (x);
COMMIT

query TT
SHOW CREATE t2
----
t2 CREATE TABLE t2 (
y INT8 NULL,
x INT8 NULL,
CONSTRAINT fk_x_ref_t1 FOREIGN KEY (x) REFERENCES t1(x),
INDEX t2_auto_index_fk_x_ref_t1 (x ASC),
FAMILY "primary" (y, rowid, x)
)

# Test that we can add a column and an index to an FK in the same txn.
statement ok
DROP TABLE t1, t2 CASCADE

statement ok
BEGIN;
CREATE TABLE t1 (x INT PRIMARY KEY);
INSERT INTO t1 VALUES (1);
CREATE TABLE t2 (y INT);
INSERT INTO t2 VALUES (2);
ALTER TABLE t2 ADD COLUMN x INT;
CREATE INDEX ON t2 (x);
ALTER TABLE t2 ADD FOREIGN KEY (x) REFERENCES t1 (x);
COMMIT

query TT
SHOW CREATE t2
----
t2 CREATE TABLE t2 (
y INT8 NULL,
x INT8 NULL,
CONSTRAINT fk_x_ref_t1 FOREIGN KEY (x) REFERENCES t1(x),
INDEX t2_x_idx (x ASC),
FAMILY "primary" (y, rowid, x)
)

# Test when default column value leads to an FK violation.
statement ok
DROP TABLE t1, t2 CASCADE

statement ok
CREATE TABLE t1 (x INT PRIMARY KEY);
INSERT INTO t1 VALUES (1);
CREATE TABLE t2 (y INT);
INSERT INTO t2 VALUES (2)

statement error pq: foreign key violation
ALTER TABLE t2 ADD COLUMN x INT DEFAULT 2 UNIQUE REFERENCES t1 (x)

# Test that it works with an appropriate default.
statement ok
ALTER TABLE t2 ADD COLUMN x INT DEFAULT 1 UNIQUE REFERENCES t1 (x)

query TT
SHOW CREATE t2
----
t2 CREATE TABLE t2 (
y INT8 NULL,
x INT8 NULL DEFAULT 1:::INT8,
CONSTRAINT fk_x_ref_t1 FOREIGN KEY (x) REFERENCES t1(x),
UNIQUE INDEX t2_x_key (x ASC),
FAMILY "primary" (y, rowid, x)
)
Loading

0 comments on commit 626dc47

Please sign in to comment.