From 608e7db3d6db6b4f64f9923876dcccea72dc2810 Mon Sep 17 00:00:00 2001
From: Yahor Yuzefovich
Date: Thu, 30 Jul 2020 13:24:48 -0700
Subject: [PATCH 1/3] sql: implement ConstructOpaque in the new factory

Release note: None
---
 pkg/sql/distsql_spec_exec_factory.go | 37 ++++++++++++----------------
 pkg/sql/exec_factory_util.go         |  9 +++++++
 pkg/sql/opt_exec_factory.go          |  6 +----
 pkg/sql/plan.go                      |  9 +++++++
 4 files changed, 35 insertions(+), 26 deletions(-)

diff --git a/pkg/sql/distsql_spec_exec_factory.go b/pkg/sql/distsql_spec_exec_factory.go
index ca02e8c244e2..ef23d5037d86 100644
--- a/pkg/sql/distsql_spec_exec_factory.go
+++ b/pkg/sql/distsql_spec_exec_factory.go
@@ -109,7 +109,7 @@ func (e *distSQLSpecExecFactory) ConstructValues(
             return nil, err
         }
         physPlan.ResultColumns = cols
-        return planMaybePhysical{physPlan: &physicalPlanTop{PhysicalPlan: physPlan}}, nil
+        return makePlanMaybePhysical(physPlan, nil /* planNodesToClose */), nil
     }
     recommendation := shouldDistribute
     for _, exprs := range rows {
@@ -146,10 +146,7 @@ func (e *distSQLSpecExecFactory) ConstructValues(
         return nil, err
     }
     physPlan.ResultColumns = cols
-    return planMaybePhysical{physPlan: &physicalPlanTop{
-        PhysicalPlan:     physPlan,
-        planNodesToClose: planNodesToClose,
-    }}, nil
+    return makePlanMaybePhysical(physPlan, planNodesToClose), nil
 }
 
 // ConstructScan implements exec.Factory interface by combining the logic that
@@ -167,10 +164,7 @@ func (e *distSQLSpecExecFactory) ConstructScan(
                 return nil, err
             }
             physPlan.ResultColumns = d.columns
-            return planMaybePhysical{physPlan: &physicalPlanTop{
-                PhysicalPlan:     physPlan,
-                planNodesToClose: []planNode{d},
-            }}, nil
+            return makePlanMaybePhysical(physPlan, []planNode{d}), nil
         },
     )
 }
@@ -291,7 +285,7 @@ func (e *distSQLSpecExecFactory) ConstructScan(
         },
     )
 
-    return planMaybePhysical{physPlan: &physicalPlanTop{PhysicalPlan: &p}}, err
+    return makePlanMaybePhysical(&p, nil /* planNodesToClose */), err
 }
 
 // checkExprsAndMaybeMergeLastStage is a helper method that returns a
@@ -769,12 +763,7 @@ func (e *distSQLSpecExecFactory) ConstructExplain(
     // TODO(yuzefovich): we might also need to look at the distribution of
     // subqueries and postqueries.
     physPlan.Distribution = p.main.physPlan.Distribution
-    return planMaybePhysical{
-        physPlan: &physicalPlanTop{
-            PhysicalPlan:     physPlan,
-            planNodesToClose: []planNode{explainNode},
-        },
-    }, nil
+    return makePlanMaybePhysical(physPlan, []planNode{explainNode}), nil
 }
 
 func (e *distSQLSpecExecFactory) ConstructShowTrace(
@@ -889,7 +878,16 @@ func (e *distSQLSpecExecFactory) ConstructErrorIfRows(
 }
 
 func (e *distSQLSpecExecFactory) ConstructOpaque(metadata opt.OpaqueMetadata) (exec.Node, error) {
-    return nil, unimplemented.NewWithIssue(47473, "experimental opt-driven distsql planning: opaque")
+    plan, err := constructOpaque(metadata)
+    if err != nil {
+        return nil, err
+    }
+    physPlan, err := e.dsp.wrapPlan(e.getPlanCtx(cannotDistribute), plan)
+    if err != nil {
+        return nil, err
+    }
+    physPlan.ResultColumns = planColumns(plan)
+    return makePlanMaybePhysical(physPlan, []planNode{plan}), nil
 }
 
 func (e *distSQLSpecExecFactory) ConstructAlterTableSplit(
@@ -1013,8 +1011,5 @@ func (e *distSQLSpecExecFactory) constructHashOrMergeJoin(
         rightPlanDistribution: rightPhysPlan.Distribution,
     }, ReqOrdering(reqOrdering))
     p.ResultColumns = resultColumns
-    return planMaybePhysical{physPlan: &physicalPlanTop{
-        PhysicalPlan:     p,
-        planNodesToClose: append(leftPlan.physPlan.planNodesToClose, rightPlan.physPlan.planNodesToClose...),
-    }}, nil
+    return makePlanMaybePhysical(p, append(leftPlan.physPlan.planNodesToClose, rightPlan.physPlan.planNodesToClose...)), nil
 }
diff --git a/pkg/sql/exec_factory_util.go b/pkg/sql/exec_factory_util.go
index 7f2465a3d021..84dadc79be66 100644
--- a/pkg/sql/exec_factory_util.go
+++ b/pkg/sql/exec_factory_util.go
@@ -15,6 +15,7 @@ import (
     "fmt"
 
     "github.com/cockroachdb/cockroach/pkg/sql/execinfra"
+    "github.com/cockroachdb/cockroach/pkg/sql/opt"
     "github.com/cockroachdb/cockroach/pkg/sql/opt/cat"
     "github.com/cockroachdb/cockroach/pkg/sql/opt/exec"
     "github.com/cockroachdb/cockroach/pkg/sql/rowexec"
@@ -317,3 +318,11 @@ func collectSystemColumnsFromCfg(
     }
     return systemColumns, systemColumnOrdinals
 }
+
+func constructOpaque(metadata opt.OpaqueMetadata) (planNode, error) {
+    o, ok := metadata.(*opaqueMetadata)
+    if !ok {
+        return nil, errors.AssertionFailedf("unexpected OpaqueMetadata object type %T", metadata)
+    }
+    return o.plan, nil
+}
diff --git a/pkg/sql/opt_exec_factory.go b/pkg/sql/opt_exec_factory.go
index 14c21c825b66..c6712e15940b 100644
--- a/pkg/sql/opt_exec_factory.go
+++ b/pkg/sql/opt_exec_factory.go
@@ -1755,11 +1755,7 @@ func (ef *execFactory) ConstructErrorIfRows(
 
 // ConstructOpaque is part of the exec.Factory interface.
 func (ef *execFactory) ConstructOpaque(metadata opt.OpaqueMetadata) (exec.Node, error) {
-    o, ok := metadata.(*opaqueMetadata)
-    if !ok {
-        return nil, errors.AssertionFailedf("unexpected OpaqueMetadata object type %T", metadata)
-    }
-    return o.plan, nil
+    return constructOpaque(metadata)
 }
 
 // ConstructAlterTableSplit is part of the exec.Factory interface.
diff --git a/pkg/sql/plan.go b/pkg/sql/plan.go
index 332f15380347..54d7bc911ea5 100644
--- a/pkg/sql/plan.go
+++ b/pkg/sql/plan.go
@@ -323,6 +323,15 @@ type planMaybePhysical struct {
     physPlan *physicalPlanTop
 }
 
+func makePlanMaybePhysical(physPlan *PhysicalPlan, planNodesToClose []planNode) planMaybePhysical {
+    return planMaybePhysical{
+        physPlan: &physicalPlanTop{
+            PhysicalPlan:     physPlan,
+            planNodesToClose: planNodesToClose,
+        },
+    }
+}
+
 func (p *planMaybePhysical) isPhysicalPlan() bool {
     return p.physPlan != nil
 }

From f5e87980d7b2c1e1fc4f55113f132b588f482826 Mon Sep 17 00:00:00 2001
From: Andrew Werner
Date: Wed, 5 Aug 2020 10:58:59 -0400
Subject: [PATCH 2/3] sql/catalog/lease: attempt to fix a flaky test

I think this test was going to fail. It waited only on descUpdateChan, so
if the ALTER or the SELECT completed before the expected descriptor update
there was no clear signal of what went wrong. Make both result channels
buffered so the goroutines cannot block on send, and fail fast with an
explicit error if either operation finishes before the descriptor update
is observed.

Fixes #52385.

Release note: None
---
 pkg/sql/catalog/lease/lease_test.go | 14 ++++++++++----
 1 file changed, 10 insertions(+), 4 deletions(-)

diff --git a/pkg/sql/catalog/lease/lease_test.go b/pkg/sql/catalog/lease/lease_test.go
index a41bc1afd0c2..1cde6f3f5e73 100644
--- a/pkg/sql/catalog/lease/lease_test.go
+++ b/pkg/sql/catalog/lease/lease_test.go
@@ -2337,7 +2337,7 @@ func TestRangefeedUpdatesHandledProperlyInTheFaceOfRaces(t *testing.T) {
     interestingTable.Store(tableID)
 
     // Launch a goroutine to query foo. It will be blocked in lease acquisition.
-    selectDone := make(chan error)
+    selectDone := make(chan error, 1)
     go func() {
         var count int
         selectDone <- db2.QueryRow("SELECT count(*) FROM foo").Scan(&count)
@@ -2348,7 +2348,7 @@ func TestRangefeedUpdatesHandledProperlyInTheFaceOfRaces(t *testing.T) {
 
     // Launch a goroutine to perform a schema change which will lead to new
     // versions.
-    alterErrCh := make(chan error)
+    alterErrCh := make(chan error, 1)
     go func() {
         _, err := db1.Exec("ALTER TABLE foo ADD COLUMN j INT DEFAULT 1")
         alterErrCh <- err
@@ -2356,8 +2356,14 @@ func TestRangefeedUpdatesHandledProperlyInTheFaceOfRaces(t *testing.T) {
 
     // Make sure we get an update. Note that this is after we have already
     // acquired a lease on the old version but have not yet recorded that fact.
-    desc := <-descUpdateChan
-    require.Equal(t, descpb.DescriptorVersion(2), sqlbase.GetDescriptorVersion(desc))
+    select {
+    case err := <-alterErrCh:
+        t.Fatalf("alter succeeded before expected: %v", err)
+    case err := <-selectDone:
+        t.Fatalf("select succeeded before expected: %v", err)
+    case desc := <-descUpdateChan:
+        require.Equal(t, descpb.DescriptorVersion(2), sqlbase.GetDescriptorVersion(desc))
+    }
 
     // Allow the original lease acquisition to proceed.
     close(toUnblockForLeaseAcquisition)

From 494074ab4737db15a4fe83ed34ee9a206539bea3 Mon Sep 17 00:00:00 2001
From: Andrew Werner
Date: Thu, 6 Aug 2020 13:19:46 -0400
Subject: [PATCH 3/3] sqlbase,catalogkv: move more kv interactions to catalogkv

At this point the only remaining kv interactions in sqlbase happen during
type and table operations, though there are also usages through the
protoGetter. This is a minor step in the work to pick apart sqlbase.
Release note: None --- pkg/ccl/backupccl/restore_job.go | 12 +-- pkg/ccl/backupccl/restore_planning.go | 8 +- pkg/ccl/backupccl/targets.go | 4 +- pkg/ccl/importccl/import_stmt.go | 8 +- pkg/sql/alter_table_set_schema.go | 5 +- pkg/sql/alter_type.go | 4 +- pkg/sql/backfill.go | 2 +- pkg/sql/catalog/catalogkv/catalogkv.go | 32 +++++- .../catalogkv}/namespace.go | 45 ++++---- .../catalog/catalogkv/physical_accessor.go | 4 +- pkg/sql/catalog/lease/lease.go | 2 +- pkg/sql/create_schema.go | 2 +- pkg/sql/create_sequence.go | 2 +- pkg/sql/create_table.go | 4 +- pkg/sql/create_type.go | 8 +- pkg/sql/database.go | 6 +- pkg/sql/descriptor.go | 4 +- pkg/sql/drop_database.go | 5 +- pkg/sql/namespace_test.go | 3 +- pkg/sql/rename_table.go | 4 +- pkg/sql/schema_change_migrations_test.go | 2 +- pkg/sql/schema_changer.go | 2 +- pkg/sql/sqlbase/helpers_test.go | 102 ++++++++++++++++++ pkg/sql/sqlbase/index_encoding.go | 100 ----------------- pkg/sql/sqlbase/table.go | 36 +------ pkg/sql/temporary_schema.go | 2 +- pkg/sql/truncate.go | 4 +- pkg/sql/zone_config.go | 3 +- pkg/sqlmigrations/migrations.go | 2 +- 29 files changed, 210 insertions(+), 207 deletions(-) rename pkg/sql/{sqlbase => catalog/catalogkv}/namespace.go (88%) diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index 15c2c63b61f0..d440be004921 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -299,7 +299,7 @@ func WriteDescriptors( // Depending on which cluster version we are restoring to, we decide which // namespace table to write the descriptor into. This may cause wrong // behavior if the cluster version is bumped DURING a restore. - dKey := sqlbase.MakeDatabaseNameKey(ctx, settings, desc.GetName()) + dKey := catalogkv.MakeDatabaseNameKey(ctx, settings, desc.GetName()) b.CPut(dKey.Key(keys.SystemSQLCodec), desc.GetID(), nil) } for i := range tables { @@ -343,7 +343,7 @@ func WriteDescriptors( // Depending on which cluster version we are restoring to, we decide which // namespace table to write the descriptor into. This may cause wrong // behavior if the cluster version is bumped DURING a restore. - tkey := sqlbase.MakeObjectNameKey( + tkey := catalogkv.MakeObjectNameKey( ctx, settings, table.GetParentID(), @@ -368,7 +368,7 @@ func WriteDescriptors( ); err != nil { return err } - tkey := sqlbase.MakePublicTableNameKey(ctx, settings, typ.ParentID, typ.Name) + tkey := catalogkv.MakePublicTableNameKey(ctx, settings, typ.ParentID, typ.Name) b.CPut(tkey.Key(keys.SystemSQLCodec), typ.ID, nil) } @@ -1114,7 +1114,7 @@ func (r *restoreResumer) publishDescriptors(ctx context.Context) error { return err } newDescriptorChangeJobs = append(newDescriptorChangeJobs, newJobs...) 
- existingDescVal, err := sqlbase.ConditionalGetTableDescFromTxn(ctx, txn, r.execCfg.Codec, tbl) + existingDescVal, err := catalogkv.ConditionalGetTableDescFromTxn(ctx, txn, r.execCfg.Codec, tbl) if err != nil { return errors.Wrap(err, "validating table descriptor has not changed") } @@ -1232,11 +1232,11 @@ func (r *restoreResumer) dropTables(ctx context.Context, jr *jobs.Registry, txn prev := tableToDrop.Immutable().(sqlbase.TableDescriptor) tableToDrop.Version++ tableToDrop.State = descpb.TableDescriptor_DROP - err := sqlbase.RemovePublicTableNamespaceEntry(ctx, txn, keys.SystemSQLCodec, tbl.ParentID, tbl.Name) + err := catalogkv.RemovePublicTableNamespaceEntry(ctx, txn, keys.SystemSQLCodec, tbl.ParentID, tbl.Name) if err != nil { return errors.Wrap(err, "dropping tables caused by restore fail/cancel from public namespace") } - existingDescVal, err := sqlbase.ConditionalGetTableDescFromTxn(ctx, txn, r.execCfg.Codec, prev) + existingDescVal, err := catalogkv.ConditionalGetTableDescFromTxn(ctx, txn, r.execCfg.Codec, prev) if err != nil { return errors.Wrap(err, "dropping tables caused by restore fail/cancel") } diff --git a/pkg/ccl/backupccl/restore_planning.go b/pkg/ccl/backupccl/restore_planning.go index 7c095ae20546..6907e809953a 100644 --- a/pkg/ccl/backupccl/restore_planning.go +++ b/pkg/ccl/backupccl/restore_planning.go @@ -295,7 +295,7 @@ func allocateDescriptorRewrites( if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { // Check that any DBs being restored do _not_ exist. for name := range restoreDBNames { - found, _, err := sqlbase.LookupDatabaseID(ctx, txn, p.ExecCfg().Codec, name) + found, _, err := catalogkv.LookupDatabaseID(ctx, txn, p.ExecCfg().Codec, name) if err != nil { return err } @@ -350,7 +350,7 @@ func allocateDescriptorRewrites( } else { var parentID descpb.ID { - found, newParentID, err := sqlbase.LookupDatabaseID(ctx, txn, p.ExecCfg().Codec, targetDB) + found, newParentID, err := catalogkv.LookupDatabaseID(ctx, txn, p.ExecCfg().Codec, targetDB) if err != nil { return err } @@ -415,7 +415,7 @@ func allocateDescriptorRewrites( } // Look up the parent database's ID. - found, parentID, err := sqlbase.LookupDatabaseID(ctx, txn, p.ExecCfg().Codec, targetDB) + found, parentID, err := catalogkv.LookupDatabaseID(ctx, txn, p.ExecCfg().Codec, targetDB) if err != nil { return err } @@ -431,7 +431,7 @@ func allocateDescriptorRewrites( } // See if there is an existing type with the same name. 
- found, id, err := sqlbase.LookupObjectID(ctx, txn, p.ExecCfg().Codec, parentID, keys.PublicSchemaID, typ.Name) + found, id, err := catalogkv.LookupObjectID(ctx, txn, p.ExecCfg().Codec, parentID, keys.PublicSchemaID, typ.Name) if err != nil { return err } diff --git a/pkg/ccl/backupccl/targets.go b/pkg/ccl/backupccl/targets.go index 0646f71db957..f7ffe76d58e3 100644 --- a/pkg/ccl/backupccl/targets.go +++ b/pkg/ccl/backupccl/targets.go @@ -660,7 +660,7 @@ func fullClusterTargets( func lookupDatabaseID( ctx context.Context, txn *kv.Txn, codec keys.SQLCodec, name string, ) (descpb.ID, error) { - found, id, err := sqlbase.LookupDatabaseID(ctx, txn, codec, name) + found, id, err := catalogkv.LookupDatabaseID(ctx, txn, codec, name) if err != nil { return descpb.InvalidID, err } @@ -680,7 +680,7 @@ func CheckObjectExists( parentSchemaID descpb.ID, name string, ) error { - found, id, err := sqlbase.LookupObjectID(ctx, txn, codec, parentID, parentSchemaID, name) + found, id, err := catalogkv.LookupObjectID(ctx, txn, codec, parentID, parentSchemaID, name) if err != nil { return err } diff --git a/pkg/ccl/importccl/import_stmt.go b/pkg/ccl/importccl/import_stmt.go index 9c0281e2dbff..574c56d4d784 100644 --- a/pkg/ccl/importccl/import_stmt.go +++ b/pkg/ccl/importccl/import_stmt.go @@ -988,7 +988,7 @@ func prepareExistingTableDescForIngestion( // upgrade and downgrade, because IMPORT does not operate in mixed-version // states. // TODO(jordan,lucy): remove this comment once 19.2 is released. - existingDesc, err := sqlbase.ConditionalGetTableDescFromTxn(ctx, txn, execCfg.Codec, existing) + existingDesc, err := catalogkv.ConditionalGetTableDescFromTxn(ctx, txn, execCfg.Codec, existing) if err != nil { return nil, errors.Wrap(err, "another operation is currently operating on the table") } @@ -1272,7 +1272,7 @@ func (r *importResumer) publishTables(ctx context.Context, execCfg *sql.Executor // upgrade and downgrade, because IMPORT does not operate in mixed-version // states. // TODO(jordan,lucy): remove this comment once 19.2 is released. - existingDesc, err := sqlbase.ConditionalGetTableDescFromTxn(ctx, txn, execCfg.Codec, prevTableDesc) + existingDesc, err := catalogkv.ConditionalGetTableDescFromTxn(ctx, txn, execCfg.Codec, prevTableDesc) if err != nil { return errors.Wrap(err, "publishing tables") } @@ -1404,7 +1404,7 @@ func (r *importResumer) dropTables( // possible. This is safe since the table data was never visible to users, // and so we don't need to preserve MVCC semantics. newTableDesc.DropTime = dropTime - if err := sqlbase.RemovePublicTableNamespaceEntry( + if err := catalogkv.RemovePublicTableNamespaceEntry( ctx, txn, execCfg.Codec, newTableDesc.ParentID, newTableDesc.Name, ); err != nil { return err @@ -1418,7 +1418,7 @@ func (r *importResumer) dropTables( // upgrade and downgrade, because IMPORT does not operate in mixed-version // states. // TODO(jordan,lucy): remove this comment once 19.2 is released. 
- existingDesc, err := sqlbase.ConditionalGetTableDescFromTxn(ctx, txn, execCfg.Codec, prevTableDesc) + existingDesc, err := catalogkv.ConditionalGetTableDescFromTxn(ctx, txn, execCfg.Codec, prevTableDesc) if err != nil { return errors.Wrap(err, "rolling back tables") } diff --git a/pkg/sql/alter_table_set_schema.go b/pkg/sql/alter_table_set_schema.go index 10138e160ec3..4fd427cba76f 100644 --- a/pkg/sql/alter_table_set_schema.go +++ b/pkg/sql/alter_table_set_schema.go @@ -14,6 +14,7 @@ import ( "context" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" @@ -130,7 +131,7 @@ func (n *alterTableSetSchemaNode) startExec(params runParams) error { return nil } - exists, _, err = sqlbase.LookupObjectID( + exists, _, err = catalogkv.LookupObjectID( ctx, p.txn, p.ExecCfg().Codec, databaseID, desiredSchemaID, tableDesc.Name, ) if err == nil && exists { @@ -156,7 +157,7 @@ func (n *alterTableSetSchemaNode) startExec(params runParams) error { return err } - newTbKey := sqlbase.MakeObjectNameKey(ctx, p.ExecCfg().Settings, + newTbKey := catalogkv.MakeObjectNameKey(ctx, p.ExecCfg().Settings, databaseID, desiredSchemaID, tableDesc.Name).Key(p.ExecCfg().Codec) b := &kv.Batch{} diff --git a/pkg/sql/alter_type.go b/pkg/sql/alter_type.go index 10ea52dfff82..860cd0f4ce53 100644 --- a/pkg/sql/alter_type.go +++ b/pkg/sql/alter_type.go @@ -83,7 +83,7 @@ func (p *planner) addEnumValue( func (p *planner) renameType(params runParams, n *alterTypeNode, newName string) error { // See if there is a name collision with the new name. - exists, id, err := sqlbase.LookupObjectID( + exists, id, err := catalogkv.LookupObjectID( params.ctx, p.txn, p.ExecCfg().Codec, @@ -155,7 +155,7 @@ func (p *planner) performRenameTypeDesc( } // Construct the new namespace key. b := p.txn.NewBatch() - key := sqlbase.MakeObjectNameKey( + key := catalogkv.MakeObjectNameKey( ctx, p.ExecCfg().Settings, desc.ParentID, diff --git a/pkg/sql/backfill.go b/pkg/sql/backfill.go index a9d420d3ceaf..f4a9ba0fb6a1 100644 --- a/pkg/sql/backfill.go +++ b/pkg/sql/backfill.go @@ -1347,7 +1347,7 @@ func runSchemaChangesInTxn( // Reclaim all the old names. Leave the data and descriptor // cleanup for later. 
for _, drain := range tableDesc.DrainingNames { - err := sqlbase.RemoveObjectNamespaceEntry(ctx, planner.Txn(), planner.ExecCfg().Codec, + err := catalogkv.RemoveObjectNamespaceEntry(ctx, planner.Txn(), planner.ExecCfg().Codec, drain.ParentID, drain.ParentSchemaID, drain.Name, false /* KVTrace */) if err != nil { return err diff --git a/pkg/sql/catalog/catalogkv/catalogkv.go b/pkg/sql/catalog/catalogkv/catalogkv.go index d4af27eaaf10..275f4410512f 100644 --- a/pkg/sql/catalog/catalogkv/catalogkv.go +++ b/pkg/sql/catalog/catalogkv/catalogkv.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" @@ -412,7 +413,7 @@ func GetDatabaseID( if name == sqlbase.SystemDatabaseName { return keys.SystemDatabaseID, nil } - found, dbID, err := sqlbase.LookupDatabaseID(ctx, txn, codec, name) + found, dbID, err := LookupDatabaseID(ctx, txn, codec, name) if err != nil { return descpb.InvalidID, err } @@ -509,3 +510,32 @@ func GetDatabaseDescriptorsFromIDs( } return results, nil } + +// ConditionalGetTableDescFromTxn validates that the supplied TableDescriptor +// matches the one currently stored in kv. This simulates a CPut and returns a +// ConditionFailedError on mismatch. We don't directly use CPut with protos +// because the marshaling is not guaranteed to be stable and also because it's +// sensitive to things like missing vs default values of fields. +func ConditionalGetTableDescFromTxn( + ctx context.Context, txn *kv.Txn, codec keys.SQLCodec, expectation sqlbase.TableDescriptor, +) ([]byte, error) { + key := sqlbase.MakeDescMetadataKey(codec, expectation.GetID()) + existingKV, err := txn.Get(ctx, key) + if err != nil { + return nil, err + } + var existing *descpb.Descriptor + var existingTable *descpb.TableDescriptor + if existingKV.Value != nil { + existing = &descpb.Descriptor{} + if err := existingKV.Value.GetProto(existing); err != nil { + return nil, errors.Wrapf(err, + "decoding current table descriptor value for id: %d", expectation.GetID()) + } + existingTable = sqlbase.TableFromDescriptor(existing, existingKV.Value.Timestamp) + } + if !expectation.TableDesc().Equal(existingTable) { + return nil, &roachpb.ConditionFailedError{ActualValue: existingKV.Value} + } + return existingKV.Value.TagAndDataBytes(), nil +} diff --git a/pkg/sql/sqlbase/namespace.go b/pkg/sql/catalog/catalogkv/namespace.go similarity index 88% rename from pkg/sql/sqlbase/namespace.go rename to pkg/sql/catalog/catalogkv/namespace.go index 95c007194547..b82c776ffa4b 100644 --- a/pkg/sql/sqlbase/namespace.go +++ b/pkg/sql/catalog/catalogkv/namespace.go @@ -8,7 +8,7 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -package sqlbase +package catalogkv import ( "context" @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/util/log" ) @@ -69,20 +70,20 @@ func RemoveObjectNamespaceEntry( KVTrace bool, ) error { b := txn.NewBatch() - var toDelete []DescriptorKey + var toDelete []sqlbase.DescriptorKey // The (parentID, name) mapping could be in either the new system.namespace // or the deprecated version. 
Thus we try to remove the mapping from both. if parentID == keys.RootNamespaceID { - toDelete = append(toDelete, NewDatabaseKey(name)) + toDelete = append(toDelete, sqlbase.NewDatabaseKey(name)) // TODO(solon): This can be completely removed in 20.2. - toDelete = append(toDelete, NewDeprecatedDatabaseKey(name)) + toDelete = append(toDelete, sqlbase.NewDeprecatedDatabaseKey(name)) } else if parentSchemaID == keys.RootNamespaceID { // Schemas were introduced in 20.1. - toDelete = append(toDelete, NewSchemaKey(parentID, name)) + toDelete = append(toDelete, sqlbase.NewSchemaKey(parentID, name)) } else { - toDelete = append(toDelete, NewTableKey(parentID, parentSchemaID, name)) + toDelete = append(toDelete, sqlbase.NewTableKey(parentID, parentSchemaID, name)) // TODO(solon): This can be completely removed in 20.2. - toDelete = append(toDelete, NewDeprecatedTableKey(parentID, name)) + toDelete = append(toDelete, sqlbase.NewDeprecatedTableKey(parentID, name)) } for _, delKey := range toDelete { if KVTrace { @@ -128,18 +129,18 @@ func MakeObjectNameKey( parentID descpb.ID, parentSchemaID descpb.ID, name string, -) DescriptorKey { +) sqlbase.DescriptorKey { // TODO(solon): This if condition can be removed in 20.2 if !settings.Version.IsActive(ctx, clusterversion.VersionNamespaceTableWithSchemas) { - return NewDeprecatedTableKey(parentID, name) + return sqlbase.NewDeprecatedTableKey(parentID, name) } - var key DescriptorKey + var key sqlbase.DescriptorKey if parentID == keys.RootNamespaceID { - key = NewDatabaseKey(name) + key = sqlbase.NewDatabaseKey(name) } else if parentSchemaID == keys.RootNamespaceID { - key = NewSchemaKey(parentID, name) + key = sqlbase.NewSchemaKey(parentID, name) } else { - key = NewTableKey(parentID, parentSchemaID, name) + key = sqlbase.NewTableKey(parentID, parentSchemaID, name) } return key } @@ -147,14 +148,14 @@ func MakeObjectNameKey( // MakePublicTableNameKey is a wrapper around MakeObjectNameKey for public tables. func MakePublicTableNameKey( ctx context.Context, settings *cluster.Settings, parentID descpb.ID, name string, -) DescriptorKey { +) sqlbase.DescriptorKey { return MakeObjectNameKey(ctx, settings, parentID, keys.PublicSchemaID, name) } // MakeDatabaseNameKey is a wrapper around MakeObjectNameKey for databases. 
func MakeDatabaseNameKey( ctx context.Context, settings *cluster.Settings, name string, -) DescriptorKey { +) sqlbase.DescriptorKey { return MakeObjectNameKey(ctx, settings, keys.RootNamespaceID, keys.RootNamespaceID, name) } @@ -169,13 +170,13 @@ func LookupObjectID( parentSchemaID descpb.ID, name string, ) (bool, descpb.ID, error) { - var key DescriptorKey + var key sqlbase.DescriptorKey if parentID == keys.RootNamespaceID { - key = NewDatabaseKey(name) + key = sqlbase.NewDatabaseKey(name) } else if parentSchemaID == keys.RootNamespaceID { - key = NewSchemaKey(parentID, name) + key = sqlbase.NewSchemaKey(parentID, name) } else { - key = NewTableKey(parentID, parentSchemaID, name) + key = sqlbase.NewTableKey(parentID, parentSchemaID, name) } log.Eventf(ctx, "looking up descriptor ID for name key %q", key.Key(codec)) res, err := txn.Get(ctx, key.Key(codec)) @@ -202,11 +203,11 @@ func LookupObjectID( return false, descpb.InvalidID, nil } - var dKey DescriptorKey + var dKey sqlbase.DescriptorKey if parentID == keys.RootNamespaceID { - dKey = NewDeprecatedDatabaseKey(name) + dKey = sqlbase.NewDeprecatedDatabaseKey(name) } else { - dKey = NewDeprecatedTableKey(parentID, name) + dKey = sqlbase.NewDeprecatedTableKey(parentID, name) } log.Eventf(ctx, "looking up descriptor ID for name key %q", dKey.Key(codec)) res, err = txn.Get(ctx, dKey.Key(codec)) diff --git a/pkg/sql/catalog/catalogkv/physical_accessor.go b/pkg/sql/catalog/catalogkv/physical_accessor.go index ea2361a2a39f..6c1ead3ee5ee 100644 --- a/pkg/sql/catalog/catalogkv/physical_accessor.go +++ b/pkg/sql/catalog/catalogkv/physical_accessor.go @@ -52,7 +52,7 @@ func (a UncachedPhysicalAccessor) GetDatabaseDesc( return sysDB, nil } - found, descID, err := sqlbase.LookupDatabaseID(ctx, txn, codec, name) + found, descID, err := LookupDatabaseID(ctx, txn, codec, name) if err != nil { return nil, err } else if !found { @@ -245,7 +245,7 @@ func (a UncachedPhysicalAccessor) GetObjectDesc( descID := sqlbase.LookupSystemTableDescriptorID(ctx, settings, codec, dbID, object) if descID == descpb.InvalidID { var found bool - found, descID, err = sqlbase.LookupObjectID(ctx, txn, codec, dbID, schema.ID, object) + found, descID, err = LookupObjectID(ctx, txn, codec, dbID, schema.ID, object) if err != nil { return nil, err } diff --git a/pkg/sql/catalog/lease/lease.go b/pkg/sql/catalog/lease/lease.go index 052ff171a606..7db65d99007e 100644 --- a/pkg/sql/catalog/lease/lease.go +++ b/pkg/sql/catalog/lease/lease.go @@ -1662,7 +1662,7 @@ func (m *Manager) resolveName( txn.SetFixedTimestamp(ctx, timestamp) var found bool var err error - found, id, err = sqlbase.LookupObjectID(ctx, txn, m.storage.codec, parentID, parentSchemaID, name) + found, id, err = catalogkv.LookupObjectID(ctx, txn, m.storage.codec, parentID, parentSchemaID, name) if err != nil { return err } diff --git a/pkg/sql/create_schema.go b/pkg/sql/create_schema.go index 2edbaa2ec6f9..fc506f0db683 100644 --- a/pkg/sql/create_schema.go +++ b/pkg/sql/create_schema.go @@ -47,7 +47,7 @@ func (p *planner) schemaExists( } } // Now lookup in the namespace for other schemas. 
- exists, _, err := sqlbase.LookupObjectID(ctx, p.txn, p.ExecCfg().Codec, parentID, keys.RootNamespaceID, schema) + exists, _, err := catalogkv.LookupObjectID(ctx, p.txn, p.ExecCfg().Codec, parentID, keys.RootNamespaceID, schema) if err != nil { return false, err } diff --git a/pkg/sql/create_sequence.go b/pkg/sql/create_sequence.go index 49ecfdc08cf0..779adb8a2494 100644 --- a/pkg/sql/create_sequence.go +++ b/pkg/sql/create_sequence.go @@ -120,7 +120,7 @@ func doCreateSequence( // makeSequenceTableDesc already validates the table. No call to // desc.ValidateTable() needed here. - key := sqlbase.MakeObjectNameKey( + key := catalogkv.MakeObjectNameKey( params.ctx, params.ExecCfg().Settings, dbDesc.GetID(), diff --git a/pkg/sql/create_table.go b/pkg/sql/create_table.go index f4f7c87803a0..6befb5f08a00 100644 --- a/pkg/sql/create_table.go +++ b/pkg/sql/create_table.go @@ -214,10 +214,10 @@ func getTableCreateParams( if err != nil { return nil, 0, err } - tKey = sqlbase.MakeObjectNameKey(params.ctx, params.ExecCfg().Settings, dbID, schemaID, tableName.Table()) + tKey = catalogkv.MakeObjectNameKey(params.ctx, params.ExecCfg().Settings, dbID, schemaID, tableName.Table()) } - exists, id, err := sqlbase.LookupObjectID( + exists, id, err := catalogkv.LookupObjectID( params.ctx, params.p.txn, params.ExecCfg().Codec, dbID, schemaID, tableName.Table()) if err == nil && exists { // Try and see what kind of object we collided with. diff --git a/pkg/sql/create_type.go b/pkg/sql/create_type.go index d6db34489f9f..30f769954770 100644 --- a/pkg/sql/create_type.go +++ b/pkg/sql/create_type.go @@ -84,8 +84,8 @@ func getCreateTypeParams( if err != nil { return nil, 0, err } - typeKey = sqlbase.MakeObjectNameKey(params.ctx, params.ExecCfg().Settings, db.GetID(), schemaID, name.Type()) - exists, collided, err := sqlbase.LookupObjectID( + typeKey = catalogkv.MakeObjectNameKey(params.ctx, params.ExecCfg().Settings, db.GetID(), schemaID, name.Type()) + exists, collided, err := catalogkv.LookupObjectID( params.ctx, params.p.txn, params.ExecCfg().Codec, db.GetID(), schemaID, name.Type()) if err == nil && exists { // Try and see what kind of object we collided with. @@ -111,7 +111,7 @@ func findFreeArrayTypeName( arrayName := "_" + name for { // See if there is a collision with the current name. - exists, _, err := sqlbase.LookupObjectID( + exists, _, err := catalogkv.LookupObjectID( ctx, txn, codec, @@ -157,7 +157,7 @@ func (p *planner) createArrayType( if err != nil { return 0, err } - arrayTypeKey := sqlbase.MakeObjectNameKey(params.ctx, params.ExecCfg().Settings, db.ID, schemaID, arrayTypeName) + arrayTypeKey := catalogkv.MakeObjectNameKey(params.ctx, params.ExecCfg().Settings, db.ID, schemaID, arrayTypeName) // Generate the stable ID for the array type. 
id, err := catalogkv.GenerateUniqueDescID(params.ctx, params.ExecCfg().DB, params.ExecCfg().Codec) diff --git a/pkg/sql/database.go b/pkg/sql/database.go index c5c02a258677..267dfb00755b 100644 --- a/pkg/sql/database.go +++ b/pkg/sql/database.go @@ -42,14 +42,14 @@ func (p *planner) renameDatabase( return err } - if exists, _, err := sqlbase.LookupDatabaseID(ctx, p.txn, p.ExecCfg().Codec, newName); err == nil && exists { + if exists, _, err := catalogkv.LookupDatabaseID(ctx, p.txn, p.ExecCfg().Codec, newName); err == nil && exists { return pgerror.Newf(pgcode.DuplicateDatabase, "the new database name %q already exists", newName) } else if err != nil { return err } - newKey := sqlbase.MakeDatabaseNameKey(ctx, p.ExecCfg().Settings, newName).Key(p.ExecCfg().Codec) + newKey := catalogkv.MakeDatabaseNameKey(ctx, p.ExecCfg().Settings, newName).Key(p.ExecCfg().Codec) descID := newDesc.GetID() descKey := sqlbase.MakeDescMetadataKey(p.ExecCfg().Codec, descID) @@ -62,7 +62,7 @@ func (p *planner) renameDatabase( } b.CPut(newKey, descID, nil) b.Put(descKey, descDesc) - err := sqlbase.RemoveDatabaseNamespaceEntry( + err := catalogkv.RemoveDatabaseNamespaceEntry( ctx, p.txn, p.ExecCfg().Codec, oldName, p.ExtendedEvalContext().Tracing.KVTracingEnabled(), ) if err != nil { diff --git a/pkg/sql/descriptor.go b/pkg/sql/descriptor.go index e909369ab179..ae144117ac60 100644 --- a/pkg/sql/descriptor.go +++ b/pkg/sql/descriptor.go @@ -55,7 +55,7 @@ func (p *planner) createDatabase( dbName := string(database.Name) shouldCreatePublicSchema := true - dKey := sqlbase.MakeDatabaseNameKey(ctx, p.ExecCfg().Settings, dbName) + dKey := catalogkv.MakeDatabaseNameKey(ctx, p.ExecCfg().Settings, dbName) // TODO(solon): This conditional can be removed in 20.2. Every database // is created with a public schema for cluster version >= 20.1, so we can remove // the `shouldCreatePublicSchema` logic as well. @@ -63,7 +63,7 @@ func (p *planner) createDatabase( shouldCreatePublicSchema = false } - if exists, _, err := sqlbase.LookupDatabaseID(ctx, p.txn, p.ExecCfg().Codec, dbName); err == nil && exists { + if exists, _, err := catalogkv.LookupDatabaseID(ctx, p.txn, p.ExecCfg().Codec, dbName); err == nil && exists { if database.IfNotExists { // Noop. 
return nil, false, nil diff --git a/pkg/sql/drop_database.go b/pkg/sql/drop_database.go index 836459d54fb9..bfd3bd8bd827 100644 --- a/pkg/sql/drop_database.go +++ b/pkg/sql/drop_database.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/server/telemetry" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/catalog/resolver" @@ -264,7 +265,7 @@ func (n *dropDatabaseNode) startExec(params runParams) error { b.Del(descKey) for _, schemaToDelete := range n.schemasToDelete { - if err := sqlbase.RemoveSchemaNamespaceEntry( + if err := catalogkv.RemoveSchemaNamespaceEntry( ctx, p.txn, p.ExecCfg().Codec, @@ -275,7 +276,7 @@ func (n *dropDatabaseNode) startExec(params runParams) error { } } - err := sqlbase.RemoveDatabaseNamespaceEntry( + err := catalogkv.RemoveDatabaseNamespaceEntry( ctx, p.txn, p.ExecCfg().Codec, n.dbDesc.GetName(), p.ExtendedEvalContext().Tracing.KVTracingEnabled(), ) if err != nil { diff --git a/pkg/sql/namespace_test.go b/pkg/sql/namespace_test.go index 09742c7b394b..ec9ee4048d7d 100644 --- a/pkg/sql/namespace_test.go +++ b/pkg/sql/namespace_test.go @@ -15,6 +15,7 @@ import ( "testing" "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" @@ -100,7 +101,7 @@ func TestNamespaceTableSemantics(t *testing.T) { } txn := kvDB.NewTxn(ctx, "lookup-test-db-id") - found, dbID, err := sqlbase.LookupDatabaseID(ctx, txn, codec, "test") + found, dbID, err := catalogkv.LookupDatabaseID(ctx, txn, codec, "test") if err != nil { t.Fatal(err) } diff --git a/pkg/sql/rename_table.go b/pkg/sql/rename_table.go index 5dd9b312e30c..d493f1c4552a 100644 --- a/pkg/sql/rename_table.go +++ b/pkg/sql/rename_table.go @@ -148,7 +148,7 @@ func (n *renameTableNode) startExec(params runParams) error { tableDesc.SetName(newTn.Table()) tableDesc.ParentID = targetDbDesc.GetID() - newTbKey := sqlbase.MakeObjectNameKey(ctx, params.ExecCfg().Settings, + newTbKey := catalogkv.MakeObjectNameKey(ctx, params.ExecCfg().Settings, targetDbDesc.GetID(), tableDesc.GetParentSchemaID(), newTn.Table()).Key(p.ExecCfg().Codec) if err := tableDesc.Validate(ctx, p.txn, p.ExecCfg().Codec); err != nil { @@ -182,7 +182,7 @@ func (n *renameTableNode) startExec(params runParams) error { return err } - exists, id, err := sqlbase.LookupPublicTableID( + exists, id, err := catalogkv.LookupPublicTableID( params.ctx, params.p.txn, p.ExecCfg().Codec, targetDbDesc.GetID(), newTn.Table(), ) if err == nil && exists { diff --git a/pkg/sql/schema_change_migrations_test.go b/pkg/sql/schema_change_migrations_test.go index 95c348c874af..9b77643971ce 100644 --- a/pkg/sql/schema_change_migrations_test.go +++ b/pkg/sql/schema_change_migrations_test.go @@ -887,7 +887,7 @@ func TestGCJobCreated(t *testing.T) { if err := txn.SetSystemConfigTrigger(true /* forSystemTenant */); err != nil { return err } - if err := sqlbase.RemoveObjectNamespaceEntry( + if err := catalogkv.RemoveObjectNamespaceEntry( ctx, txn, keys.SystemSQLCodec, tableDesc.ID, tableDesc.ParentID, tableDesc.Name, false, /* kvTrace */ ); err != nil { return err diff --git a/pkg/sql/schema_changer.go 
b/pkg/sql/schema_changer.go index 083625a6b579..ce04c76b9781 100644 --- a/pkg/sql/schema_changer.go +++ b/pkg/sql/schema_changer.go @@ -372,7 +372,7 @@ func drainNamesForDescriptor( func(txn *kv.Txn) error { b := txn.NewBatch() for _, drain := range namesToReclaim { - err := sqlbase.RemoveObjectNamespaceEntry( + err := catalogkv.RemoveObjectNamespaceEntry( ctx, txn, codec, drain.ParentID, drain.ParentSchemaID, drain.Name, false, /* KVTrace */ ) if err != nil { diff --git a/pkg/sql/sqlbase/helpers_test.go b/pkg/sql/sqlbase/helpers_test.go index 6d48bec52400..90d14deaf5c4 100644 --- a/pkg/sql/sqlbase/helpers_test.go +++ b/pkg/sql/sqlbase/helpers_test.go @@ -15,9 +15,12 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/errors" ) func testingGetDescriptor( @@ -76,3 +79,102 @@ func TestingGetImmutableTableDescriptor( ) *ImmutableTableDescriptor { return NewImmutableTableDescriptor(*TestingGetTableDescriptor(kvDB, codec, database, table)) } + +// ExtractIndexKey constructs the index (primary) key for a row from any index +// key/value entry, including secondary indexes. +// +// Don't use this function in the scan "hot path". +func ExtractIndexKey( + a *DatumAlloc, codec keys.SQLCodec, tableDesc *ImmutableTableDescriptor, entry kv.KeyValue, +) (roachpb.Key, error) { + indexID, key, err := DecodeIndexKeyPrefix(codec, tableDesc, entry.Key) + if err != nil { + return nil, err + } + if indexID == tableDesc.PrimaryIndex.ID { + return entry.Key, nil + } + + index, err := tableDesc.FindIndexByID(indexID) + if err != nil { + return nil, err + } + + // Extract the values for index.ColumnIDs. + indexTypes, err := GetColumnTypes(tableDesc, index.ColumnIDs) + if err != nil { + return nil, err + } + values := make([]EncDatum, len(index.ColumnIDs)) + dirs := index.ColumnDirections + if len(index.Interleave.Ancestors) > 0 { + // TODO(dan): In the interleaved index case, we parse the key twice; once to + // find the index id so we can look up the descriptor, and once to extract + // the values. Only parse once. + var ok bool + _, ok, _, err = DecodeIndexKey(codec, tableDesc, index, indexTypes, values, dirs, entry.Key) + if err != nil { + return nil, err + } + if !ok { + return nil, errors.Errorf("descriptor did not match key") + } + } else { + key, _, err = DecodeKeyVals(indexTypes, values, dirs, key) + if err != nil { + return nil, err + } + } + + // Extract the values for index.ExtraColumnIDs + extraTypes, err := GetColumnTypes(tableDesc, index.ExtraColumnIDs) + if err != nil { + return nil, err + } + extraValues := make([]EncDatum, len(index.ExtraColumnIDs)) + dirs = make([]descpb.IndexDescriptor_Direction, len(index.ExtraColumnIDs)) + for i := range index.ExtraColumnIDs { + // Implicit columns are always encoded Ascending. + dirs[i] = descpb.IndexDescriptor_ASC + } + extraKey := key + if index.Unique { + extraKey, err = entry.Value.GetBytes() + if err != nil { + return nil, err + } + } + _, _, err = DecodeKeyVals(extraTypes, extraValues, dirs, extraKey) + if err != nil { + return nil, err + } + + // Encode the index key from its components. 
+ colMap := make(map[descpb.ColumnID]int) + for i, columnID := range index.ColumnIDs { + colMap[columnID] = i + } + for i, columnID := range index.ExtraColumnIDs { + colMap[columnID] = i + len(index.ColumnIDs) + } + indexKeyPrefix := MakeIndexKeyPrefix(codec, tableDesc, tableDesc.PrimaryIndex.ID) + + decodedValues := make([]tree.Datum, len(values)+len(extraValues)) + for i, value := range values { + err := value.EnsureDecoded(indexTypes[i], a) + if err != nil { + return nil, err + } + decodedValues[i] = value.Datum + } + for i, value := range extraValues { + err := value.EnsureDecoded(extraTypes[i], a) + if err != nil { + return nil, err + } + decodedValues[len(values)+i] = value.Datum + } + indexKey, _, err := EncodeIndexKey( + tableDesc, &tableDesc.PrimaryIndex, colMap, decodedValues, indexKeyPrefix) + return indexKey, err +} diff --git a/pkg/sql/sqlbase/index_encoding.go b/pkg/sql/sqlbase/index_encoding.go index 368f3a3270cf..f8e8932ccfae 100644 --- a/pkg/sql/sqlbase/index_encoding.go +++ b/pkg/sql/sqlbase/index_encoding.go @@ -17,7 +17,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/geo/geoindex" "github.com/cockroachdb/cockroach/pkg/keys" - "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -721,105 +720,6 @@ func DecodeKeyVals( return key, foundNull, nil } -// ExtractIndexKey constructs the index (primary) key for a row from any index -// key/value entry, including secondary indexes. -// -// Don't use this function in the scan "hot path". -func ExtractIndexKey( - a *DatumAlloc, codec keys.SQLCodec, tableDesc *ImmutableTableDescriptor, entry kv.KeyValue, -) (roachpb.Key, error) { - indexID, key, err := DecodeIndexKeyPrefix(codec, tableDesc, entry.Key) - if err != nil { - return nil, err - } - if indexID == tableDesc.PrimaryIndex.ID { - return entry.Key, nil - } - - index, err := tableDesc.FindIndexByID(indexID) - if err != nil { - return nil, err - } - - // Extract the values for index.ColumnIDs. - indexTypes, err := GetColumnTypes(tableDesc, index.ColumnIDs) - if err != nil { - return nil, err - } - values := make([]EncDatum, len(index.ColumnIDs)) - dirs := index.ColumnDirections - if len(index.Interleave.Ancestors) > 0 { - // TODO(dan): In the interleaved index case, we parse the key twice; once to - // find the index id so we can look up the descriptor, and once to extract - // the values. Only parse once. - var ok bool - _, ok, _, err = DecodeIndexKey(codec, tableDesc, index, indexTypes, values, dirs, entry.Key) - if err != nil { - return nil, err - } - if !ok { - return nil, errors.Errorf("descriptor did not match key") - } - } else { - key, _, err = DecodeKeyVals(indexTypes, values, dirs, key) - if err != nil { - return nil, err - } - } - - // Extract the values for index.ExtraColumnIDs - extraTypes, err := GetColumnTypes(tableDesc, index.ExtraColumnIDs) - if err != nil { - return nil, err - } - extraValues := make([]EncDatum, len(index.ExtraColumnIDs)) - dirs = make([]descpb.IndexDescriptor_Direction, len(index.ExtraColumnIDs)) - for i := range index.ExtraColumnIDs { - // Implicit columns are always encoded Ascending. 
- dirs[i] = descpb.IndexDescriptor_ASC - } - extraKey := key - if index.Unique { - extraKey, err = entry.Value.GetBytes() - if err != nil { - return nil, err - } - } - _, _, err = DecodeKeyVals(extraTypes, extraValues, dirs, extraKey) - if err != nil { - return nil, err - } - - // Encode the index key from its components. - colMap := make(map[descpb.ColumnID]int) - for i, columnID := range index.ColumnIDs { - colMap[columnID] = i - } - for i, columnID := range index.ExtraColumnIDs { - colMap[columnID] = i + len(index.ColumnIDs) - } - indexKeyPrefix := MakeIndexKeyPrefix(codec, tableDesc, tableDesc.PrimaryIndex.ID) - - decodedValues := make([]tree.Datum, len(values)+len(extraValues)) - for i, value := range values { - err := value.EnsureDecoded(indexTypes[i], a) - if err != nil { - return nil, err - } - decodedValues[i] = value.Datum - } - for i, value := range extraValues { - err := value.EnsureDecoded(extraTypes[i], a) - if err != nil { - return nil, err - } - decodedValues[len(values)+i] = value.Datum - } - indexKey, _, err := EncodeIndexKey( - tableDesc, &tableDesc.PrimaryIndex, colMap, decodedValues, indexKeyPrefix) - return indexKey, err -} - // IndexEntry represents an encoded key/value for an index entry. type IndexEntry struct { Key roachpb.Key diff --git a/pkg/sql/sqlbase/table.go b/pkg/sql/sqlbase/table.go index 70f032aefda4..0b68cc5a34a4 100644 --- a/pkg/sql/sqlbase/table.go +++ b/pkg/sql/sqlbase/table.go @@ -17,8 +17,6 @@ import ( "strings" "github.com/cockroachdb/cockroach/pkg/keys" - "github.com/cockroachdb/cockroach/pkg/kv" - "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" @@ -323,7 +321,7 @@ type tableLookupFn func(descpb.ID) (TableDescriptor, error) // GetConstraintInfo returns a summary of all constraints on the table. func (desc *ImmutableTableDescriptor) GetConstraintInfo( - ctx context.Context, txn *kv.Txn, codec keys.SQLCodec, + ctx context.Context, txn protoGetter, codec keys.SQLCodec, ) (map[string]descpb.ConstraintDetail, error) { var tableLookup tableLookupFn if txn != nil { @@ -553,38 +551,6 @@ func FindFKOriginIndexInTxn( ) } -// ConditionalGetTableDescFromTxn validates that the supplied TableDescriptor -// matches the one currently stored in kv. This simulates a CPut and returns a -// ConditionFailedError on mismatch. We don't directly use CPut with protos -// because the marshaling is not guaranteed to be stable and also because it's -// sensitive to things like missing vs default values of fields. -// -// TODO(ajwerner): Make this take a TableDescriptor and probably add -// an equality method on that interface or something like that. 
-func ConditionalGetTableDescFromTxn( - ctx context.Context, txn *kv.Txn, codec keys.SQLCodec, expectation TableDescriptor, -) ([]byte, error) { - key := MakeDescMetadataKey(codec, expectation.GetID()) - existingKV, err := txn.Get(ctx, key) - if err != nil { - return nil, err - } - var existing *descpb.Descriptor - var existingTable *descpb.TableDescriptor - if existingKV.Value != nil { - existing = &descpb.Descriptor{} - if err := existingKV.Value.GetProto(existing); err != nil { - return nil, errors.Wrapf(err, - "decoding current table descriptor value for id: %d", expectation.GetID()) - } - existingTable = TableFromDescriptor(existing, existingKV.Value.Timestamp) - } - if !expectation.TableDesc().Equal(existingTable) { - return nil, &roachpb.ConditionFailedError{ActualValue: existingKV.Value} - } - return existingKV.Value.TagAndDataBytes(), nil -} - // InitTableDescriptor returns a blank TableDescriptor. func InitTableDescriptor( id, parentID, parentSchemaID descpb.ID, diff --git a/pkg/sql/temporary_schema.go b/pkg/sql/temporary_schema.go index 9d4feabb2f06..d0e50dfa496b 100644 --- a/pkg/sql/temporary_schema.go +++ b/pkg/sql/temporary_schema.go @@ -202,7 +202,7 @@ func cleanupSessionTempObjects( // itself may still exist (eg. a temporary table was created and then // dropped). So we remove the namespace table entry of the temporary // schema. - if err := sqlbase.RemoveSchemaNamespaceEntry(ctx, txn, codec, id, tempSchemaName); err != nil { + if err := catalogkv.RemoveSchemaNamespaceEntry(ctx, txn, codec, id, tempSchemaName); err != nil { return err } } diff --git a/pkg/sql/truncate.go b/pkg/sql/truncate.go index a4a3d92a8c65..1c51899c82cb 100644 --- a/pkg/sql/truncate.go +++ b/pkg/sql/truncate.go @@ -190,7 +190,7 @@ func (p *planner) truncateTable( // structured.proto // // TODO(vivek): Fix properly along with #12123. - key := sqlbase.MakeObjectNameKey( + key := catalogkv.MakeObjectNameKey( ctx, p.ExecCfg().Settings, newTableDesc.ParentID, newTableDesc.GetParentSchemaID(), @@ -198,7 +198,7 @@ func (p *planner) truncateTable( ).Key(p.ExecCfg().Codec) // Remove the old namespace entry. 
- if err := sqlbase.RemoveObjectNamespaceEntry( + if err := catalogkv.RemoveObjectNamespaceEntry( ctx, p.txn, p.execCfg.Codec, tableDesc.ParentID, tableDesc.GetParentSchemaID(), tableDesc.GetName(), traceKV); err != nil { diff --git a/pkg/sql/zone_config.go b/pkg/sql/zone_config.go index 6017f3ae853f..2800792925fe 100644 --- a/pkg/sql/zone_config.go +++ b/pkg/sql/zone_config.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/resolver" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" @@ -283,7 +284,7 @@ func resolveZone(ctx context.Context, txn *kv.Txn, zs *tree.ZoneSpecifier) (desc errMissingKey := errors.New("missing key") id, err := zonepb.ResolveZoneSpecifier(zs, func(parentID uint32, name string) (uint32, error) { - found, id, err := sqlbase.LookupPublicTableID(ctx, txn, keys.SystemSQLCodec, descpb.ID(parentID), name) + found, id, err := catalogkv.LookupPublicTableID(ctx, txn, keys.SystemSQLCodec, descpb.ID(parentID), name) if err != nil { return 0, err } diff --git a/pkg/sqlmigrations/migrations.go b/pkg/sqlmigrations/migrations.go index 0131e0bc2ecd..49b232c4426b 100644 --- a/pkg/sqlmigrations/migrations.go +++ b/pkg/sqlmigrations/migrations.go @@ -1486,7 +1486,7 @@ func createSystemTable(ctx context.Context, r runner, desc sqlbase.TableDescript // the reserved ID space. (The SQL layer doesn't allow this.) err := r.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { b := txn.NewBatch() - tKey := sqlbase.MakePublicTableNameKey(ctx, r.settings, desc.GetParentID(), desc.GetName()) + tKey := catalogkv.MakePublicTableNameKey(ctx, r.settings, desc.GetParentID(), desc.GetName()) b.CPut(tKey.Key(r.codec), desc.GetID(), nil) b.CPut(sqlbase.MakeDescMetadataKey(r.codec, desc.GetID()), desc.DescriptorProto(), nil) if err := txn.SetSystemConfigTrigger(r.codec.ForSystemTenant()); err != nil {