diff --git a/Gopkg.lock b/Gopkg.lock
index 47c487233d33..207ce46e9fba 100644
--- a/Gopkg.lock
+++ b/Gopkg.lock
@@ -523,6 +523,7 @@
     "proto",
     "protoc-gen-go/descriptor",
     "protoc-gen-go/generator",
+    "protoc-gen-go/generator/internal/remap",
     "protoc-gen-go/plugin",
     "ptypes",
     "ptypes/any",
@@ -530,7 +531,7 @@
     "ptypes/struct",
     "ptypes/timestamp",
   ]
-  revision = "bbd03ef6da3a115852eaf24c8a1c46aeb39aa175"
+  revision = "b4deda0973fb4c70b50d226b1af49f3da59f5265"

 [[projects]]
   branch = "master"
@@ -1204,10 +1205,12 @@
     "balancer",
     "balancer/base",
     "balancer/roundrobin",
+    "channelz",
     "codes",
     "connectivity",
     "credentials",
     "encoding",
+    "encoding/proto",
     "grpclb/grpc_lb_v1/messages",
     "grpclog",
     "health/grpc_health_v1",
@@ -1224,8 +1227,8 @@
     "tap",
     "transport",
   ]
-  revision = "6b51017f791ae1cfbec89c52efdf444b13b550ef"
-  version = "v1.9.2"
+  revision = "41344da2231b913fa3d983840a57a6b1b7b631a1"
+  version = "v1.12.0"

 [[projects]]
   branch = "v2"
diff --git a/pkg/sql/backfill.go b/pkg/sql/backfill.go
index 913f5478178a..9115c804d229 100644
--- a/pkg/sql/backfill.go
+++ b/pkg/sql/backfill.go
@@ -16,7 +16,6 @@ package sql

 import (
 	"context"
-	"fmt"
 	"sort"
 	"time"

@@ -144,8 +143,7 @@ func (sc *SchemaChanger) runBackfill(
 		case sqlbase.DescriptorMutation_ADD:
 			switch t := m.Descriptor_.(type) {
 			case *sqlbase.DescriptorMutation_Column:
-				desc := m.GetColumn()
-				if desc.DefaultExpr != nil || !desc.Nullable || desc.IsComputed() {
+				if columnNeedsBackfill(m.GetColumn()) {
 					needColumnBackfill = true
 				}
 			case *sqlbase.DescriptorMutation_Index:
@@ -172,15 +170,19 @@ func (sc *SchemaChanger) runBackfill(
 	// First drop indexes, then add/drop columns, and only then add indexes.

 	// Drop indexes.
-	if err := sc.truncateIndexes(
-		ctx, lease, version, droppedIndexDescs, droppedIndexMutationIdx,
-	); err != nil {
-		return err
-	}
+	if len(droppedIndexDescs) > 0 {
+		if err := sc.truncateIndexes(
+			ctx, lease, version, droppedIndexDescs, droppedIndexMutationIdx,
+		); err != nil {
+			return err
+		}

-	// Remove index zone configs.
-	if err := sc.removeIndexZoneConfigs(ctx, tableDesc.ID, droppedIndexDescs); err != nil {
-		return err
+		// Remove index zone configs.
+		if err := sc.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
+			return removeIndexZoneConfigs(ctx, txn, sc.execCfg, tableDesc.ID, droppedIndexDescs)
+		}); err != nil {
+			return err
+		}
 	}

 	// Add and drop columns.
@@ -213,39 +215,6 @@ func (sc *SchemaChanger) getTableVersion(
 	return tableDesc, nil
 }

-func (sc *SchemaChanger) removeIndexZoneConfigs(
-	ctx context.Context, tableID sqlbase.ID, indexDescs []sqlbase.IndexDescriptor,
-) error {
-	if len(indexDescs) == 0 {
-		return nil
-	}
-
-	return sc.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
-		tableDesc, err := sqlbase.GetTableDescFromID(ctx, txn, sc.tableID)
-		if err != nil {
-			return err
-		}
-
-		zone, err := getZoneConfigRaw(ctx, txn, sc.tableID)
-		if err != nil {
-			return err
-		}
-
-		for _, indexDesc := range indexDescs {
-			zone.DeleteIndexSubzones(uint32(indexDesc.ID))
-		}
-
-		hasNewSubzones := false
-		_, err = writeZoneConfig(ctx, txn, sc.tableID, tableDesc, zone, sc.execCfg, hasNewSubzones)
-		if sqlbase.IsCCLRequiredError(err) {
-			return sqlbase.NewCCLRequiredError(fmt.Errorf("schema change requires a CCL binary "+
-				"because table %q has at least one remaining index or partition with a zone config",
-				tableDesc.Name))
-		}
-		return err
-	})
-}
-
 func (sc *SchemaChanger) truncateIndexes(
 	ctx context.Context,
 	lease *sqlbase.TableDescriptor_SchemaChangeLease,
@@ -612,3 +581,162 @@ func (sc *SchemaChanger) truncateAndBackfillColumns(
 		lease, version, columnBackfill, columnTruncateAndBackfillChunkSize,
 		backfill.ColumnMutationFilter)
 }
+
+func columnNeedsBackfill(desc *sqlbase.ColumnDescriptor) bool {
+	return desc.DefaultExpr != nil || !desc.Nullable || desc.IsComputed()
+}
+
+// runSchemaChangesInTxn runs all the schema changes immediately in a
+// transaction. This is called when a CREATE TABLE is followed by
+// schema changes in the same transaction. The CREATE TABLE is
+// invisible to the rest of the cluster, so the schema changes
+// can be executed immediately on the same version of the table.
+func runSchemaChangesInTxn(
+	ctx context.Context,
+	txn *client.Txn,
+	execCfg *ExecutorConfig,
+	evalCtx *tree.EvalContext,
+	tableDesc *sqlbase.TableDescriptor,
+) error {
+	tableDesc.UpVersion = false
+
+	if len(tableDesc.DrainingNames) > 0 {
+		// Reclaim all the old names. Leave the data and descriptor
+		// cleanup for later.
+		b := txn.NewBatch()
+		for _, drain := range tableDesc.DrainingNames {
+			tbKey := tableKey{drain.ParentID, drain.Name}.Key()
+			b.Del(tbKey)
+		}
+		tableDesc.DrainingNames = nil
+		if err := txn.Run(ctx, b); err != nil {
+			return err
+		}
+	}
+
+	if tableDesc.Dropped() {
+		return nil
+	}
+
+	// Only needed because columnBackfillInTxn() backfills
+	// all column mutations.
+	doneColumnBackfill := false
+	for _, m := range tableDesc.Mutations {
+		switch m.Direction {
+		case sqlbase.DescriptorMutation_ADD:
+			switch m.Descriptor_.(type) {
+			case *sqlbase.DescriptorMutation_Column:
+				if doneColumnBackfill || !columnNeedsBackfill(m.GetColumn()) {
+					break
+				}
+				if err := columnBackfillInTxn(ctx, txn, evalCtx, tableDesc); err != nil {
+					return err
+				}
+				doneColumnBackfill = true
+
+			case *sqlbase.DescriptorMutation_Index:
+				if err := indexBackfillInTxn(ctx, txn, tableDesc); err != nil {
+					return err
+				}
+
+			default:
+				return errors.Errorf("unsupported mutation: %+v", m)
+			}
+
+		case sqlbase.DescriptorMutation_DROP:
+			// Drop the name and drop the associated data later.
+			switch m.Descriptor_.(type) {
+			case *sqlbase.DescriptorMutation_Column:
+				if doneColumnBackfill {
+					break
+				}
+				if err := columnBackfillInTxn(ctx, txn, evalCtx, tableDesc); err != nil {
+					return err
+				}
+				doneColumnBackfill = true
+
+			case *sqlbase.DescriptorMutation_Index:
+				if err := indexTruncateInTxn(ctx, txn, execCfg, tableDesc); err != nil {
+					return err
+				}
+
+			default:
+				return errors.Errorf("unsupported mutation: %+v", m)
+			}
+
+		}
+		tableDesc.MakeMutationComplete(m)
+	}
+	tableDesc.Mutations = nil
+
+	return nil
+}
+
+// columnBackfillInTxn backfills columns for all mutation columns in
+// the mutation list.
+func columnBackfillInTxn(
+	ctx context.Context,
+	txn *client.Txn,
+	evalCtx *tree.EvalContext,
+	tableDesc *sqlbase.TableDescriptor,
+) error {
+	var backfiller backfill.ColumnBackfiller
+	if err := backfiller.Init(evalCtx, *tableDesc); err != nil {
+		return err
+	}
+	sp := tableDesc.PrimaryIndexSpan()
+	for sp.Key != nil {
+		var err error
+		sp.Key, err = backfiller.RunColumnBackfillChunk(ctx, txn, *tableDesc, nil, sp, columnTruncateAndBackfillChunkSize, false)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func indexBackfillInTxn(
+	ctx context.Context, txn *client.Txn, tableDesc *sqlbase.TableDescriptor,
+) error {
+	var backfiller backfill.IndexBackfiller
+	if err := backfiller.Init(*tableDesc); err != nil {
+		return err
+	}
+	sp := tableDesc.PrimaryIndexSpan()
+	for sp.Key != nil {
+		var err error
+		sp.Key, err = backfiller.RunIndexBackfillChunk(ctx, txn, *tableDesc, sp, indexBackfillChunkSize, false)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func indexTruncateInTxn(
+	ctx context.Context, txn *client.Txn, execCfg *ExecutorConfig, tableDesc *sqlbase.TableDescriptor,
+) error {
+	alloc := &sqlbase.DatumAlloc{}
+	idx := tableDesc.Mutations[0].GetIndex()
+	var sp roachpb.Span
+	for done := false; !done; done = sp.Key == nil {
+		rd, err := sqlbase.MakeRowDeleter(
+			txn, tableDesc, nil, nil, sqlbase.SkipFKs, nil /* *tree.EvalContext */, alloc,
+		)
+		if err != nil {
+			return err
+		}
+		td := tableDeleter{rd: rd, alloc: alloc}
+		if err := td.init(txn, nil /* *tree.EvalContext */); err != nil {
+			return err
+		}
+		sp, err = td.deleteIndex(
+			ctx, idx, sp, indexTruncateChunkSize, noAutoCommit, false, /* traceKV */
+		)
+		if err != nil {
+			return err
+		}
+	}
+	// Remove index zone configs.
+	return removeIndexZoneConfigs(ctx, txn, execCfg, tableDesc.ID, []sqlbase.IndexDescriptor{*idx})
+}
diff --git a/pkg/sql/create_table.go b/pkg/sql/create_table.go
index 590e75547ed2..939e11db3287 100644
--- a/pkg/sql/create_table.go
+++ b/pkg/sql/create_table.go
@@ -127,6 +127,8 @@ func (n *createTableNode) startExec(params runParams) error {
 		return err
 	}

+	params.p.Tables().addCreatedTable(id)
+
 	// If a new system table is being created (which should only be doable by
 	// an internal user account), make sure it gets the correct privileges.
 	privs := n.dbDesc.GetPrivileges()
diff --git a/pkg/sql/logictest/testdata/logic_test/fk b/pkg/sql/logictest/testdata/logic_test/fk
index 91af2b270975..4eda2901072c 100644
--- a/pkg/sql/logictest/testdata/logic_test/fk
+++ b/pkg/sql/logictest/testdata/logic_test/fk
@@ -808,6 +808,7 @@ refers CREATE TABLE refers (
   INDEX another_idx (b ASC),
   CONSTRAINT fk_a_ref_referee FOREIGN KEY (a) REFERENCES referee (id),
   INDEX refers_auto_index_fk_a_ref_referee (a ASC),
+  INDEX foo (a ASC),
   FAMILY "primary" (a, b, rowid)
 )

diff --git a/pkg/sql/logictest/testdata/logic_test/schema_change_in_txn b/pkg/sql/logictest/testdata/logic_test/schema_change_in_txn
new file mode 100644
index 000000000000..5cf537663c28
--- /dev/null
+++ b/pkg/sql/logictest/testdata/logic_test/schema_change_in_txn
@@ -0,0 +1,328 @@
+# LogicTest: default parallel-stmts distsql distsql-metadata
+
+subtest create_with_other_commands_in_txn
+
+statement count 3
+CREATE TABLE kv (item, quantity) AS VALUES ('cups', 10), ('plates', 30), ('forks', 15)
+
+statement ok
+BEGIN
+
+statement ok
+CREATE TABLE test.parent (id int primary key)
+
+statement ok
+INSERT into test.parent values (1)
+
+statement ok
+CREATE TABLE test.chill (id int primary key, parent_id int)
+
+# random schema change that doesn't require a backfill.
+statement ok
+ALTER TABLE test.chill RENAME TO test.child
+
+statement ok
+INSERT INTO test.child VALUES (1, 1)
+
+# index is over data added in the transaction so the backfill runs
+# within the transaction.
+statement ok
+CREATE INDEX idx_child_parent_id ON test.child (parent_id)
+
+# FK can be added because the index is visible.
+statement ok
+ALTER TABLE test.child ADD CONSTRAINT fk_child_parent_id FOREIGN KEY (parent_id) REFERENCES test.parent (id);
+
+statement ok
+INSERT INTO test.child VALUES (2, 1)
+
+# check that the index is indeed visible.
+query II rowsort
+SELECT * FROM test.child@idx_child_parent_id
+----
+1 1
+2 1
+
+# create index on a table that was created outside of the transaction
+statement ok
+CREATE INDEX foo ON test.kv (quantity)
+
+statement ok
+COMMIT
+
+# foo is visible
+query TI rowsort
+SELECT * FROM test.kv@foo
+----
+cups 10
+forks 15
+plates 30
+
+subtest create_index_references_create_table_outside_txn
+
+statement ok
+BEGIN
+
+# create index on a table that was created outside of the transaction
+statement ok
+CREATE INDEX bar ON test.kv (quantity)
+
+# bar is invisible
+statement error pgcode XX000 index "bar" not found
+SELECT * FROM test.kv@bar
+
+statement ok
+COMMIT
+
+# bar is still invisible because the error above prevents the
+# transaction from committing.
+statement error pgcode XX000 index "bar" not found
+SELECT * FROM test.kv@bar
+
+subtest create_reference_to_create_outside_txn_17949
+
+statement ok
+BEGIN
+
+statement ok
+CREATE TABLE b (parent_id INT REFERENCES parent(id));
+
+# table b is not visible to the transaction #17949
+statement error pgcode 42P01 relation "b" does not exist
+INSERT INTO b VALUES (1);
+
+statement ok
+COMMIT
+
+subtest create_as_with_add_column_index_in_txn
+
+statement ok
+BEGIN
+
+statement count 3
+CREATE TABLE stock (item, quantity) AS VALUES ('cups', 10), ('plates', 30), ('forks', 15)
+
+# index is only over data added in the transaction so the backfill occurs
+# within the transaction.
+statement ok
+CREATE INDEX idx_quantity ON stock (quantity)
+
+# Add two columns and a constraint in the same statement.
+statement ok
+ALTER TABLE stock ADD COLUMN c INT AS (quantity + 4) STORED, ADD COLUMN d INT DEFAULT 23, ADD CONSTRAINT bar UNIQUE (c)
+
+# check that the index and columns are indeed visible.
+query TIII rowsort
+SELECT * FROM test.stock@idx_quantity
+----
+cups 10 14 23
+forks 15 19 23
+plates 30 34 23
+
+# check that the constraint bar is indeed visible.
+query TIII rowsort
+SELECT * FROM test.stock@bar
+----
+cups 10 14 23
+forks 15 19 23
+plates 30 34 23
+
+statement ok
+COMMIT
+
+subtest create_as_with_reuse_column_index_name_in_txn
+
+statement ok
+BEGIN
+
+statement ok
+CREATE TABLE warehouse (item STRING PRIMARY KEY, quantity INT, UNIQUE (quantity), INDEX bar (quantity))
+
+statement ok
+INSERT INTO warehouse VALUES ('cups', 10), ('plates', 30), ('forks', 15)
+
+statement ok
+DROP INDEX warehouse@bar
+
+statement ok
+ALTER TABLE warehouse DROP quantity
+
+# See if the column and index names can be reused.
+statement ok
+ALTER TABLE warehouse ADD COLUMN quantity INT DEFAULT 23
+
+statement ok
+CREATE INDEX bar ON warehouse (item)
+
+# check that the index is indeed visible.
+query TI rowsort
+SELECT * FROM warehouse@bar
+----
+cups 23
+forks 23
+plates 23
+
+statement ok
+COMMIT
+
+subtest create_as_drop_and_create_in_txn
+
+statement ok
+BEGIN
+
+statement count 3
+CREATE TABLE hood (item, quantity) AS VALUES ('cups', 10), ('plates', 30), ('forks', 15)
+
+statement ok
+DROP TABLE hood
+
+statement count 3
+CREATE TABLE hood (item, quantity) AS VALUES ('plates', 10), ('knives', 30), ('spoons', 12)
+
+query TI rowsort
+SELECT * FROM hood
+----
+plates 10
+knives 30
+spoons 12
+
+statement ok
+COMMIT
+
+subtest create_as_rename_and_create_in_txn
+
+statement ok
+BEGIN
+
+statement count 3
+CREATE TABLE shop (item, quantity) AS VALUES ('cups', 10), ('plates', 30), ('forks', 15)
+
+statement ok
+ALTER TABLE shop RENAME TO ship
+
+statement count 3
+CREATE TABLE shop (item, quantity) AS VALUES ('spoons', 11), ('plates', 34), ('knives', 22)
+
+query TI rowsort
+SELECT * FROM shop
+----
+spoons 11
+plates 34
+knives 22
+
+query TI rowsort
+SELECT * FROM ship
+----
+cups 10
+plates 30
+forks 15
+
+statement ok
+COMMIT
+
+subtest create_as_fail_unique_index
+
+statement ok
+BEGIN
+
+statement count 3
+CREATE TABLE shopping (item, quantity) AS VALUES ('cups', 10), ('plates', 30), ('forks', 10)
+
+statement error pgcode 23505 duplicate key value \(quantity\)=\(10\) violates unique constraint "bar"
+CREATE UNIQUE INDEX bar ON shopping (quantity)
+
+statement ok
+COMMIT
+
+# Ensure the above transaction didn't commit.
+query error pgcode 42P01 relation \"shopping\" does not exist
+SELECT * FROM shopping
+
+subtest add_column_not_null_violation
+
+statement ok
+BEGIN
+
+statement count 3
+CREATE TABLE shopping (item, quantity) AS VALUES ('cups', 10), ('plates', 30), ('forks', 10)
+
+statement error pgcode 23502 null value in column \"q\" violates not-null constraint
+ALTER TABLE shopping ADD COLUMN q DECIMAL NOT NULL
+
+statement ok
+COMMIT
+
+# Ensure the above transaction didn't commit.
+statement error pgcode 42P01 relation \"shopping\" does not exist
+SELECT * FROM shopping
+
+subtest add_column_computed_column_failure
+
+statement ok
+BEGIN
+
+statement count 3
+CREATE TABLE shopping (item, quantity) AS VALUES ('cups', 10), ('plates', 30), ('forks', 10)
+
+statement error pgcode 42P15 division by zero
+ALTER TABLE shopping ADD COLUMN c int AS (quantity::int // 0) STORED
+
+statement ok
+COMMIT
+
+subtest create_as_add_multiple_columns
+
+statement ok
+BEGIN
+
+statement count 3
+CREATE TABLE cutlery (item, quantity) AS VALUES ('cups', 10), ('plates', 30), ('forks', 15)
+
+# Add two columns, one with a computed expression and the other without any default.
+statement ok
+ALTER TABLE cutlery ADD COLUMN c INT AS (quantity + 4) STORED, ADD COLUMN d INT
+
+query TIII rowsort
+SELECT * FROM test.cutlery
+----
+cups 10 14 NULL
+plates 30 34 NULL
+forks 15 19 NULL
+
+statement ok
+COMMIT
+
+subtest table_rename_within_txn
+
+statement ok
+BEGIN
+
+statement ok
+CREATE TABLE dontwant (k CHAR PRIMARY KEY, v CHAR)
+
+statement ok
+CREATE TABLE want (k CHAR PRIMARY KEY, v CHAR)
+
+statement ok
+INSERT INTO dontwant (k,v) VALUES ('a', 'b')
+
+statement ok
+INSERT INTO want (k,v) VALUES ('c', 'd')
+
+statement ok
+ALTER TABLE want RENAME TO forlater
+
+statement ok
+ALTER TABLE dontwant RENAME TO want
+
+statement ok
+INSERT INTO want (k,v) VALUES ('e', 'f')
+
+statement ok
+COMMIT
+
+query TT rowsort
+SELECT * FROM want
+----
+a b
+e f
diff --git a/pkg/sql/schema_changer_test.go b/pkg/sql/schema_changer_test.go
index 1cbb059ddbd6..38803e2f33a0 100644
--- a/pkg/sql/schema_changer_test.go
+++ b/pkg/sql/schema_changer_test.go
@@ -2980,3 +2980,106 @@ func TestAddComputedColumn(t *testing.T) {
 	sqlDB.Exec(t, `ALTER TABLE t.test ADD COLUMN b INT AS (a + 5) STORED`)
 	sqlDB.CheckQueryResults(t, `SELECT * FROM t.test ORDER BY a`, [][]string{{"2", "7"}, {"10", "15"}})
 }
+
+func TestSchemaChangeAfterCreateInTxn(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	params, _ := tests.CreateTestServerParams()
+	s, sqlDB, kvDB := serverutils.StartServer(t, params)
+	defer s.Stopper().Stop(context.TODO())
+
+	// A value large enough that the backfills run as part of the
+	// schema change are split into many chunks.
+	var maxValue = 4001
+	if util.RaceEnabled {
+		// Race builds are a lot slower, so use a smaller number of rows.
+		maxValue = 200
+	}
+
+	if _, err := sqlDB.Exec(`
+CREATE DATABASE t;
+`); err != nil {
+		t.Fatal(err)
+	}
+
+	tx, err := sqlDB.Begin()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if _, err := tx.Exec(`CREATE TABLE t.testing (k INT PRIMARY KEY, v INT, INDEX foo(v));`); err != nil {
+		t.Fatal(err)
+	}
+
+	inserts := make([]string, maxValue+1)
+	for i := 0; i < maxValue+1; i++ {
+		inserts[i] = fmt.Sprintf(`(%d, %d)`, i, maxValue-i)
+	}
+
+	if _, err := tx.Exec(`INSERT INTO t.testing VALUES ` + strings.Join(inserts, ",")); err != nil {
+		t.Fatal(err)
+	}
+
+	if _, err := tx.Exec(`ALTER TABLE t.testing RENAME TO t.test`); err != nil {
+		t.Fatal(err)
+	}
+
+	// Run schema changes that execute Column and Index backfills.
+	if _, err := tx.Exec(`
+ALTER TABLE t.test ADD COLUMN c INT AS (v + 4) STORED, ADD COLUMN d INT DEFAULT 23, ADD CONSTRAINT bar UNIQUE (c)
+`); err != nil {
+		t.Fatal(err)
+	}
+
+	if _, err := tx.Exec(`DROP INDEX t.test@foo`); err != nil {
+		t.Fatal(err)
+	}
+
+	if err := tx.Commit(); err != nil {
+		t.Fatal(err)
+	}
+
+	if err := checkTableKeyCount(context.TODO(), kvDB, 2, maxValue); err != nil {
+		t.Fatal(err)
+	}
+
+	if err := sqlutils.RunScrub(sqlDB, "t", "test"); err != nil {
+		t.Fatal(err)
+	}
+
+	// Verify that the index bar is consistent, and that columns c, d
+	// have been backfilled properly.
+	rows, err := sqlDB.Query(`SELECT c, d from t.test@bar`)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer rows.Close()
+
+	count := 0
+	for ; rows.Next(); count++ {
+		var c int
+		var d int
+		if err := rows.Scan(&c, &d); err != nil {
+			t.Errorf("row %d scan failed: %s", count, err)
+			continue
+		}
+		if count+4 != c {
+			t.Errorf("e = %d, v = %d", count, c)
+		}
+		if 23 != d {
+			t.Errorf("e = %d, v = %d", 23, d)
+		}
+	}
+	if err := rows.Err(); err != nil {
+		t.Fatal(err)
+	}
+	eCount := maxValue + 1
+	if eCount != count {
+		t.Fatalf("read the wrong number of rows: e = %d, v = %d", eCount, count)
+	}
+
+	// The descriptor version hasn't changed.
+	tableDesc := sqlbase.GetTableDescriptor(kvDB, "t", "test")
+	if tableDesc.Version != 1 {
+		t.Fatalf("invalid version = %d", tableDesc.Version)
+	}
+}
diff --git a/pkg/sql/set_zone_config.go b/pkg/sql/set_zone_config.go
index f0c863bd8ac2..5838b0dc0a79 100644
--- a/pkg/sql/set_zone_config.go
+++ b/pkg/sql/set_zone_config.go
@@ -353,3 +353,34 @@ func getZoneConfigRaw(
 	}
 	return zone, nil
 }
+
+func removeIndexZoneConfigs(
+	ctx context.Context,
+	txn *client.Txn,
+	execCfg *ExecutorConfig,
+	tableID sqlbase.ID,
+	indexDescs []sqlbase.IndexDescriptor,
+) error {
+	tableDesc, err := sqlbase.GetTableDescFromID(ctx, txn, tableID)
+	if err != nil {
+		return err
+	}
+
+	zone, err := getZoneConfigRaw(ctx, txn, tableID)
+	if err != nil {
+		return err
+	}
+
+	for _, indexDesc := range indexDescs {
+		zone.DeleteIndexSubzones(uint32(indexDesc.ID))
+	}
+
+	hasNewSubzones := false
+	_, err = writeZoneConfig(ctx, txn, tableID, tableDesc, zone, execCfg, hasNewSubzones)
+	if sqlbase.IsCCLRequiredError(err) {
+		return sqlbase.NewCCLRequiredError(fmt.Errorf("schema change requires a CCL binary "+
+			"because table %q has at least one remaining index or partition with a zone config",
+			tableDesc.Name))
+	}
+	return err
+}
diff --git a/pkg/sql/table.go b/pkg/sql/table.go
index c4da6882618d..7816342735f9 100644
--- a/pkg/sql/table.go
+++ b/pkg/sql/table.go
@@ -154,6 +154,9 @@ type TableCollection struct {
 	// table is marked dropped.
 	uncommittedTables []*sqlbase.TableDescriptor

+	// Map of tables created in the transaction.
+	createdTables map[sqlbase.ID]struct{}
+
 	// databaseCache is used as a cache for database names.
 	// TODO(andrei): get rid of it and replace it with a leasing system for
 	// database descriptors.
@@ -400,6 +403,7 @@ func (tc *TableCollection) releaseTables(ctx context.Context, opt releaseOpt) er
 		tc.leasedTables = tc.leasedTables[:0]
 	}
 	tc.uncommittedTables = nil
+	tc.createdTables = nil

 	if opt == blockForDBCacheUpdate {
 		for _, uc := range tc.uncommittedDatabases {
@@ -442,6 +446,19 @@ func (tc *TableCollection) addUncommittedTable(desc sqlbase.TableDescriptor) {
 	tc.releaseAllDescriptors()
 }

+func (tc *TableCollection) addCreatedTable(id sqlbase.ID) {
+	if tc.createdTables == nil {
+		tc.createdTables = make(map[sqlbase.ID]struct{})
+	}
+	tc.createdTables[id] = struct{}{}
+	tc.releaseAllDescriptors()
+}
+
+func (tc *TableCollection) isCreatedTable(id sqlbase.ID) bool {
+	_, ok := tc.createdTables[id]
+	return ok
+}
+
 type dbAction bool

 const (
@@ -461,7 +478,9 @@ func (tc *TableCollection) addUncommittedDatabase(name string, id sqlbase.ID, ac
 func (tc *TableCollection) getUncommittedDatabaseID(
 	requestedDbName string, required bool,
 ) (c bool, res sqlbase.ID, err error) {
-	// Walk latest to earliest.
+	// Walk latest to earliest so that a DROP DATABASE followed by a
+	// CREATE DATABASE with the same name will result in the CREATE DATABASE
+	// being seen.
 	for i := len(tc.uncommittedDatabases) - 1; i >= 0; i-- {
 		db := tc.uncommittedDatabases[i]
 		if requestedDbName == db.name {
@@ -488,7 +507,10 @@ func (tc *TableCollection) getUncommittedDatabaseID(
 func (tc *TableCollection) getUncommittedTable(
 	dbID sqlbase.ID, tn *tree.TableName, required bool,
 ) (refuseFurtherLookup bool, table *sqlbase.TableDescriptor, err error) {
-	for _, table := range tc.uncommittedTables {
+	// Walk latest to earliest so that a DROP TABLE followed by a CREATE TABLE
+	// with the same name will result in the CREATE TABLE being seen.
+	for i := len(tc.uncommittedTables) - 1; i >= 0; i-- {
+		table := tc.uncommittedTables[i]
 		// If a table has gotten renamed we'd like to disallow using the old names.
 		// The renames could have happened in another transaction but it's still okay
 		// to disallow the use of the old name in this transaction because the other
@@ -597,6 +619,11 @@ func (p *planner) createSchemaChangeJob(
 func (p *planner) notifySchemaChange(
 	tableDesc *sqlbase.TableDescriptor, mutationID sqlbase.MutationID,
 ) {
+	if !tableDesc.UpVersion {
+		// If a version change has not been requested there is no pending
+		// schema change to be executed.
+		return
+	}
 	sc := SchemaChanger{
 		tableID:    tableDesc.GetID(),
 		mutationID: mutationID,
@@ -620,6 +647,12 @@ func (p *planner) writeTableDesc(ctx context.Context, tableDesc *sqlbase.TableDe
 			"programming error: virtual descriptors cannot be stored, found: %v", tableDesc)
 	}

+	if p.Tables().isCreatedTable(tableDesc.ID) {
+		if err := runSchemaChangesInTxn(ctx, p.txn, p.execCfg, p.EvalContext(), tableDesc); err != nil {
+			return err
+		}
+	}
+
 	if err := tableDesc.ValidateTable(p.extendedEvalCtx.Settings); err != nil {
 		return pgerror.NewErrorf(pgerror.CodeInternalError,
 			"programming error: table descriptor is not valid: %s\n%v", err, tableDesc)
diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go
index b9bcb9bf645b..7d1b3852067e 100644
--- a/pkg/storage/replica_test.go
+++ b/pkg/storage/replica_test.go
@@ -469,10 +469,15 @@ func sendLeaseRequest(r *Replica, l *roachpb.Lease) error {
 // fast and loose with granting range leases.
 func TestReplicaReadConsistency(t *testing.T) {
 	defer leaktest.AfterTest(t)()
-	tc := testContext{}
+
 	stopper := stop.NewStopper()
 	defer stopper.Stop(context.TODO())
-	tc.Start(t, stopper)
+
+	tc := testContext{manualClock: hlc.NewManualClock(123)}
+	cfg := TestStoreConfig(hlc.NewClock(tc.manualClock.UnixNano, time.Nanosecond))
+	cfg.TestingKnobs.DisableAutomaticLeaseRenewal = true
+	tc.StartWithStoreConfig(t, stopper, cfg)
+
 	secondReplica, err := tc.addBogusReplicaToRangeDesc(context.TODO())
 	if err != nil {
 		t.Fatal(err)
@@ -552,6 +557,7 @@ func TestBehaviorDuringLeaseTransfer(t *testing.T) {
 	tc := testContext{manualClock: manual}
 	tsc := TestStoreConfig(clock)
 	var leaseAcquisitionTrap atomic.Value
+	tsc.TestingKnobs.DisableAutomaticLeaseRenewal = true
 	tsc.TestingKnobs.LeaseRequestEvent = func(ts hlc.Timestamp) {
 		val := leaseAcquisitionTrap.Load()
 		if val == nil {
@@ -686,10 +692,15 @@ func TestBehaviorDuringLeaseTransfer(t *testing.T) {
 // returned. This prevents regression of #1483.
 func TestApplyCmdLeaseError(t *testing.T) {
 	defer leaktest.AfterTest(t)()
-	tc := testContext{}
+
 	stopper := stop.NewStopper()
 	defer stopper.Stop(context.TODO())
-	tc.Start(t, stopper)
+
+	tc := testContext{manualClock: hlc.NewManualClock(123)}
+	cfg := TestStoreConfig(hlc.NewClock(tc.manualClock.UnixNano, time.Nanosecond))
+	cfg.TestingKnobs.DisableAutomaticLeaseRenewal = true
+	tc.StartWithStoreConfig(t, stopper, cfg)
+
 	secondReplica, err := tc.addBogusReplicaToRangeDesc(context.TODO())
 	if err != nil {
 		t.Fatal(err)
@@ -1014,10 +1025,15 @@ func TestReplicaLeaseCounters(t *testing.T) {
 // upon acquisition of the range lease.
 func TestReplicaGossipConfigsOnLease(t *testing.T) {
 	defer leaktest.AfterTest(t)()
-	tc := testContext{}
+
 	stopper := stop.NewStopper()
 	defer stopper.Stop(context.TODO())
-	tc.Start(t, stopper)
+
+	tc := testContext{manualClock: hlc.NewManualClock(123)}
+	cfg := TestStoreConfig(hlc.NewClock(tc.manualClock.UnixNano, time.Nanosecond))
+	cfg.TestingKnobs.DisableAutomaticLeaseRenewal = true
+	tc.StartWithStoreConfig(t, stopper, cfg)
+
 	secondReplica, err := tc.addBogusReplicaToRangeDesc(context.TODO())
 	if err != nil {
 		t.Fatal(err)
@@ -1095,10 +1111,15 @@ func TestReplicaGossipConfigsOnLease(t *testing.T) {
 // some point; now we're just testing the cache on the first replica.
 func TestReplicaTSCacheLowWaterOnLease(t *testing.T) {
 	defer leaktest.AfterTest(t)()
-	tc := testContext{}
+
 	stopper := stop.NewStopper()
 	defer stopper.Stop(context.TODO())
-	tc.Start(t, stopper)
+
+	tc := testContext{manualClock: hlc.NewManualClock(123)}
+	cfg := TestStoreConfig(hlc.NewClock(tc.manualClock.UnixNano, time.Nanosecond))
+	cfg.TestingKnobs.DisableAutomaticLeaseRenewal = true
+	tc.StartWithStoreConfig(t, stopper, cfg)
+
 	// Disable raft log truncation which confuses this test.
 	tc.store.SetRaftLogQueueActive(false)
 	secondReplica, err := tc.addBogusReplicaToRangeDesc(context.TODO())
@@ -1175,10 +1196,14 @@ func TestReplicaTSCacheLowWaterOnLease(t *testing.T) {
 // using a real second store.
 func TestReplicaLeaseRejectUnknownRaftNodeID(t *testing.T) {
 	defer leaktest.AfterTest(t)()
-	tc := testContext{}
+
 	stopper := stop.NewStopper()
 	defer stopper.Stop(context.TODO())
-	tc.Start(t, stopper)
+
+	tc := testContext{manualClock: hlc.NewManualClock(123)}
+	cfg := TestStoreConfig(hlc.NewClock(tc.manualClock.UnixNano, time.Nanosecond))
+	cfg.TestingKnobs.DisableAutomaticLeaseRenewal = true
+	tc.StartWithStoreConfig(t, stopper, cfg)

 	tc.manualClock.Set(leaseExpiry(tc.repl))
 	now := tc.Clock().Now()
diff --git a/pkg/storage/store.go b/pkg/storage/store.go
index 243f13f78ef7..1b47bb290116 100644
--- a/pkg/storage/store.go
+++ b/pkg/storage/store.go
@@ -703,6 +703,9 @@ type StoreTestingKnobs struct {
 	// allowing the replica to use the lease that it had in a previous life (in
 	// case the tests persisted the engine used in said previous life).
 	DontPreventUseOfOldLeaseOnStart bool
+	// DisableAutomaticLeaseRenewal turns off the background worker that
+	// attempts to automatically renew expiration-based leases.
+	DisableAutomaticLeaseRenewal bool
 	// LeaseRequestEvent, if set, is called when replica.requestLeaseLocked() is
 	// called to acquire a new lease. This can be used to assert that a request
 	// triggers a lease acquisition.
@@ -1442,7 +1445,9 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error {
 		log.Event(ctx, "computed initial metrics")
 	}

-	s.startLeaseRenewer(ctx)
+	if !s.cfg.TestingKnobs.DisableAutomaticLeaseRenewal {
+		s.startLeaseRenewer(ctx)
+	}

 	// Start the storage engine compactor.
 	if envutil.EnvOrDefaultBool("COCKROACH_ENABLE_COMPACTOR", true) {
diff --git a/pkg/testutils/lint/lint_test.go b/pkg/testutils/lint/lint_test.go
index 295b2926ce8b..8fa0bc6ccd76 100644
--- a/pkg/testutils/lint/lint_test.go
+++ b/pkg/testutils/lint/lint_test.go
@@ -816,6 +816,10 @@ func TestLint(t *testing.T) {
 			//
 			// sql/parser/yaccpar:362:3: this value of sqlDollar is never used (SA4006)
 			&lint.GlobIgnore{Pattern: "github.com/cockroachdb/cockroach/pkg/sql/parser/sql.go", Checks: []string{"SA4006"}},
+			// Files generated by github.com/grpc-ecosystem/grpc-gateway use a
+			// deprecated logging method (SA1019). Ignore such errors until they
+			// fix it and we update to a newer SHA.
+			&lint.GlobIgnore{Pattern: "github.com/cockroachdb/cockroach/pkg/*/*/*.pb.gw.go", Checks: []string{"SA1019"}},
 		},
 		unused.NewLintChecker(unusedChecker): {
 			// sql/parser/yaccpar:14:6: type sqlParser is unused (U1000)
diff --git a/pkg/util/grpcutil/grpc_util.go b/pkg/util/grpcutil/grpc_util.go
index d73f45399b31..de727acd4921 100644
--- a/pkg/util/grpcutil/grpc_util.go
+++ b/pkg/util/grpcutil/grpc_util.go
@@ -56,8 +56,7 @@ func IsClosedConnection(err error) bool {
 	}
 	if s, ok := status.FromError(err); ok {
 		if s.Code() == codes.Canceled ||
-			s.Code() == codes.Unavailable ||
-			s.Message() == grpc.ErrClientConnClosing.Error() {
+			s.Code() == codes.Unavailable {
 			return true
 		}
 	}
diff --git a/pkg/util/retry/retry.go b/pkg/util/retry/retry.go
index 0e800feb1613..b95c9a5efa50 100644
--- a/pkg/util/retry/retry.go
+++ b/pkg/util/retry/retry.go
@@ -21,6 +21,7 @@ import (
 	"time"

 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
+	"github.com/pkg/errors"
 )

 // Options provides reusable configuration of Retry objects.
@@ -150,7 +151,12 @@ func (r *Retry) NextCh() <-chan time.Time {
 }

 // WithMaxAttempts is a helper that runs fn N times and collects the last err.
+// It guarantees fn runs at least once; if n is less than 1, an error is returned.
 func WithMaxAttempts(ctx context.Context, opts Options, n int, fn func() error) error {
+	if n <= 0 {
+		return errors.Errorf("max attempts should not be 0 or below, got: %d", n)
+	}
+
 	opts.MaxRetries = n - 1
 	var err error
 	for r := StartWithCtx(ctx, opts); r.Next(); {
@@ -159,6 +165,9 @@ func WithMaxAttempts(ctx context.Context, opts Options, n int, fn func() error)
 			return nil
 		}
 	}
+	if err == nil {
+		err = errors.Wrap(ctx.Err(), "did not run function")
+	}
 	return err
 }
diff --git a/vendor b/vendor
index 30aaa97d2c8a..8254d79bd46c 160000
--- a/vendor
+++ b/vendor
@@ -1 +1 @@
-Subproject commit 30aaa97d2c8a551eac10fe45412e9687343e84d7
+Subproject commit 8254d79bd46c1559872cc26832fe7c421c6dde10