diff --git a/pkg/ccl/backupccl/restore.go b/pkg/ccl/backupccl/restore.go index fc8061d08cf8..153fdb2f3f6c 100644 --- a/pkg/ccl/backupccl/restore.go +++ b/pkg/ccl/backupccl/restore.go @@ -1035,7 +1035,9 @@ func WriteTableDescs( // TODO(dt): support restoring privs. desc.Privileges = sqlbase.NewDefaultPrivilegeDescriptor() wroteDBs[desc.ID] = desc - sql.WriteNewDescToBatch(ctx, false /* kvTrace */, settings, b, desc.ID, desc) + if err := sql.WriteNewDescToBatch(ctx, false /* kvTrace */, settings, b, desc.ID, desc); err != nil { + return err + } b.CPut(sqlbase.MakeNameMetadataKey(keys.RootNamespaceID, desc.Name), desc.ID, nil) } for i := range tables { @@ -1055,7 +1057,9 @@ func WriteTableDescs( // TODO(dt): Make this more configurable. tables[i].Privileges = parentDB.GetPrivileges() } - sql.WriteNewDescToBatch(ctx, false /* kvTrace */, settings, b, tables[i].ID, tables[i]) + if err := sql.WriteNewDescToBatch(ctx, false /* kvTrace */, settings, b, tables[i].ID, tables[i]); err != nil { + return err + } b.CPut(sqlbase.MakeNameMetadataKey(tables[i].ParentID, tables[i].Name), tables[i].ID, nil) } for _, kv := range extra { @@ -1653,7 +1657,9 @@ func (r *restoreResumer) OnFailOrCancel(ctx context.Context, txn *client.Txn) er b := txn.NewBatch() for _, tableDesc := range details.TableDescs { tableDesc.State = sqlbase.TableDescriptor_DROP - sql.WriteNewDescToBatch(ctx, false /* kvTrace */, r.settings, b, tableDesc.ID, tableDesc) + if err := sql.WriteNewDescToBatch(ctx, false /* kvTrace */, r.settings, b, tableDesc.ID, tableDesc); err != nil { + return err + } } return txn.Run(ctx, b) } diff --git a/pkg/sql/descriptor.go b/pkg/sql/descriptor.go index 2491adc73fa5..d7a2438f588c 100644 --- a/pkg/sql/descriptor.go +++ b/pkg/sql/descriptor.go @@ -114,9 +114,9 @@ func (p *planner) createDescriptorWithID( log.VEventf(ctx, 2, "CPut %s -> %d", idKey, descID) } b.CPut(idKey, descID, nil) - WriteNewDescToBatch( - ctx, p.ExtendedEvalContext().Tracing.KVTracingEnabled(), - st, b, descID, descriptor) + if err := WriteNewDescToBatch(ctx, p.ExtendedEvalContext().Tracing.KVTracingEnabled(), st, b, descID, descriptor); err != nil { + return err + } mutDesc, isTable := descriptor.(*sqlbase.MutableTableDescriptor) if isTable { @@ -227,9 +227,6 @@ func GetAllDescriptors(ctx context.Context, txn *client.Txn) ([]sqlbase.Descript // writeDescToBatch adds a Put command writing a descriptor proto to the // descriptors table. It writes the descriptor desc at the id descID. If kvTrace // is enabled, it will log an event explaining the put that was performed. -// TODO(jordan): the unused cluster.Settings will be used for version gating -// this function, which must downgrade the descriptors in the case of a -// mixed-version 19.1-19.2 cluster. func writeDescToBatch( ctx context.Context, kvTrace bool, @@ -237,22 +234,39 @@ func writeDescToBatch( b *client.Batch, descID sqlbase.ID, desc sqlbase.DescriptorProto, -) { +) (err error) { + var tableToDowngrade *TableDescriptor + switch d := desc.(type) { + case *TableDescriptor: + tableToDowngrade = d + case *MutableTableDescriptor: + tableToDowngrade = d.TableDesc() + case *DatabaseDescriptor: + default: + return errors.AssertionFailedf("unexpected proto type %T", desc) + } + if tableToDowngrade != nil { + didDowngrade, downgraded, err := tableToDowngrade.MaybeDowngradeForeignKeyRepresentation(ctx, s) + if err != nil { + return err + } + if didDowngrade { + desc = downgraded + } + } descKey := sqlbase.MakeDescMetadataKey(descID) descDesc := sqlbase.WrapDescriptor(desc) if kvTrace { log.VEventf(ctx, 2, "Put %s -> %s", descKey, descDesc) } - b.Put(sqlbase.MakeDescMetadataKey(descID), sqlbase.WrapDescriptor(desc)) + b.Put(descKey, descDesc) + return nil } // WriteNewDescToBatch adds a CPut command writing a descriptor proto to the // descriptors table. It writes the descriptor desc at the id descID, asserting // that there was no previous descriptor at that id present already. If kvTrace // is enabled, it will log an event explaining the CPut that was performed. -// TODO(jordan): the unused cluster.Settings will be used for version gating -// this function, which must downgrade the descriptors in the case of a -// mixed-version 19.1-19.2 cluster. func WriteNewDescToBatch( ctx context.Context, kvTrace bool, @@ -260,11 +274,31 @@ func WriteNewDescToBatch( b *client.Batch, tableID sqlbase.ID, desc sqlbase.DescriptorProto, -) { +) (err error) { + var tableToDowngrade *TableDescriptor + switch d := desc.(type) { + case *TableDescriptor: + tableToDowngrade = d + case *MutableTableDescriptor: + tableToDowngrade = d.TableDesc() + case *DatabaseDescriptor: + default: + return errors.AssertionFailedf("unexpected proto type %T", desc) + } + if tableToDowngrade != nil { + didDowngrade, downgraded, err := tableToDowngrade.MaybeDowngradeForeignKeyRepresentation(ctx, s) + if err != nil { + return err + } + if didDowngrade { + desc = downgraded + } + } descKey := sqlbase.MakeDescMetadataKey(tableID) descDesc := sqlbase.WrapDescriptor(desc) if kvTrace { log.VEventf(ctx, 2, "CPut %s -> %s", descKey, descDesc) } b.CPut(descKey, descDesc, nil) + return nil } diff --git a/pkg/sql/grant_revoke.go b/pkg/sql/grant_revoke.go index 1ca4fde2cf6e..778fde0d88fa 100644 --- a/pkg/sql/grant_revoke.go +++ b/pkg/sql/grant_revoke.go @@ -115,8 +115,9 @@ func (n *changePrivilegesNode) startExec(params runParams) error { if err := d.Validate(); err != nil { return err } - writeDescToBatch(ctx, p.extendedEvalCtx.Tracing.KVTracingEnabled(), p.execCfg.Settings, - b, descriptor.GetID(), descriptor) + if err := writeDescToBatch(ctx, p.extendedEvalCtx.Tracing.KVTracingEnabled(), p.execCfg.Settings, b, descriptor.GetID(), descriptor); err != nil { + return err + } case *sqlbase.MutableTableDescriptor: if !d.Dropped() { diff --git a/pkg/sql/lease.go b/pkg/sql/lease.go index a5655bc61fce..e9eea095c063 100644 --- a/pkg/sql/lease.go +++ b/pkg/sql/lease.go @@ -418,7 +418,9 @@ func (s LeaseStore) PublishMultiple( } b := txn.NewBatch() for tableID, tableDesc := range tableDescs { - writeDescToBatch(ctx, false /* kvTrace */, s.settings, b, tableID, tableDesc.TableDesc()) + if err := writeDescToBatch(ctx, false /* kvTrace */, s.settings, b, tableID, tableDesc.TableDesc()); err != nil { + return err + } } if logEvent != nil { // If an event log is required for this update, ensure that the diff --git a/pkg/sql/rename_table.go b/pkg/sql/rename_table.go index 56d3103bf915..75cd7518e5a8 100644 --- a/pkg/sql/rename_table.go +++ b/pkg/sql/rename_table.go @@ -129,8 +129,9 @@ func (n *renameTableNode) startExec(params runParams) error { if p.extendedEvalCtx.Tracing.KVTracingEnabled() { log.VEventf(ctx, 2, "CPut %s -> %d", newTbKey, descID) } - writeDescToBatch(ctx, p.extendedEvalCtx.Tracing.KVTracingEnabled(), p.EvalContext().Settings, - b, descID, tableDesc) + if err := writeDescToBatch(ctx, p.extendedEvalCtx.Tracing.KVTracingEnabled(), p.EvalContext().Settings, b, descID, tableDesc.TableDesc()); err != nil { + return err + } b.CPut(newTbKey, descID, nil) if err := p.txn.Run(ctx, b); err != nil { diff --git a/pkg/sql/schema_changer.go b/pkg/sql/schema_changer.go index fa92e7bfc26e..0221f04b295d 100644 --- a/pkg/sql/schema_changer.go +++ b/pkg/sql/schema_changer.go @@ -296,7 +296,9 @@ func (sc *SchemaChanger) AcquireLease( lease = sc.createSchemaChangeLease() tableDesc.Lease = &lease b := txn.NewBatch() - writeDescToBatch(ctx, false /* kvTrace */, sc.settings, b, tableDesc.GetID(), tableDesc) + if err := writeDescToBatch(ctx, false /* kvTrace */, sc.settings, b, tableDesc.GetID(), tableDesc); err != nil { + return err + } return txn.Run(ctx, b) }) return lease, err @@ -334,7 +336,9 @@ func (sc *SchemaChanger) ReleaseLease( return err } b := txn.NewBatch() - writeDescToBatch(ctx, false /* kvTrace */, sc.settings, b, tableDesc.GetID(), tableDesc) + if err := writeDescToBatch(ctx, false /* kvTrace */, sc.settings, b, tableDesc.GetID(), tableDesc); err != nil { + return err + } return txn.Run(ctx, b) }) } @@ -366,7 +370,9 @@ func (sc *SchemaChanger) ExtendLease( return err } b := txn.NewBatch() - writeDescToBatch(ctx, false /* kvTrace */, sc.settings, b, tableDesc.GetID(), tableDesc) + if err := writeDescToBatch(ctx, false /* kvTrace */, sc.settings, b, tableDesc.GetID(), tableDesc); err != nil { + return err + } return txn.Run(ctx, b) }); err != nil { return err @@ -1658,7 +1664,9 @@ func (sc *SchemaChanger) createRollbackJob( // write descriptor, the version has already been incremented. b := txn.NewBatch() - writeDescToBatch(ctx, false /* kvTrace */, sc.settings, b, tableDesc.GetID(), tableDesc) + if err := writeDescToBatch(ctx, false /* kvTrace */, sc.settings, b, tableDesc.GetID(), tableDesc); err != nil { + return nil, err + } if err := txn.Run(ctx, b); err != nil { return nil, err } diff --git a/pkg/sql/sqlbase/structured.go b/pkg/sql/sqlbase/structured.go index b1923bd890e8..f24f18276e00 100644 --- a/pkg/sql/sqlbase/structured.go +++ b/pkg/sql/sqlbase/structured.go @@ -889,14 +889,14 @@ func maybeUpgradeForeignKeyRepOnIndex( return changed, nil } -// maybeDowngradeForeignKeyRepresentation non-destructively downgrades the +// MaybeDowngradeForeignKeyRepresentation non-destructively downgrades the // receiver into the old foreign key representation (the ForeignKey // and ReferencedBy fields on IndexDescriptor if and only if the cluster version // has not yet been upgraded to VersionTopLevelForeignKeys. It returns true in // the first position if the downgrade occurred, along with a new // TableDescriptor object that is the downgraded descriptor. The receiver is not // modified in either case. -func (desc *TableDescriptor) maybeDowngradeForeignKeyRepresentation( +func (desc *TableDescriptor) MaybeDowngradeForeignKeyRepresentation( ctx context.Context, clusterSettings *cluster.Settings, ) (bool, *TableDescriptor, error) { downgradeUnnecessary := clusterSettings.Version.IsActive(cluster.VersionTopLevelForeignKeys) diff --git a/pkg/sql/sqlbase/structured_test.go b/pkg/sql/sqlbase/structured_test.go index 17bc85087d4d..5e4c536d660f 100644 --- a/pkg/sql/sqlbase/structured_test.go +++ b/pkg/sql/sqlbase/structured_test.go @@ -1926,7 +1926,7 @@ func TestUpgradeDowngradeFKRepr(t *testing.T) { // table descriptor, since we'll never be in a situation where we're // trying to downgrade something that we read from disk that was already // upgraded. - wasDowngraded, downgraded, err := upgraded.maybeDowngradeForeignKeyRepresentation(ctx, mixedVersionSettings) + wasDowngraded, downgraded, err := upgraded.MaybeDowngradeForeignKeyRepresentation(ctx, mixedVersionSettings) if err != nil { t.Fatal(err) } @@ -1938,9 +1938,24 @@ func TestUpgradeDowngradeFKRepr(t *testing.T) { t.Fatalf("downgrade didn't match original %s %s", proto.MarshalTextString(downgraded), proto.MarshalTextString(&pair.oldFormat)) } + + // Check that the downgrade is idempotent as well. Downgrading the table + // again shouldn't change it. + wasDowngradedAgain, downgradedAgain, err := downgraded.MaybeDowngradeForeignKeyRepresentation(ctx, mixedVersionSettings) + if err != nil { + t.Fatal(err) + } + if wasDowngradedAgain { + t.Fatalf("expected proto to not be downgraded a second time") + } + + if !reflect.DeepEqual(downgradedAgain, downgraded) { + t.Fatalf("downgrade wasn't idempotent %s %s", proto.MarshalTextString(downgradedAgain), + proto.MarshalTextString(downgraded)) + } } - wasDowngraded, _, err := upgraded.maybeDowngradeForeignKeyRepresentation(ctx, newVersionSettings) + wasDowngraded, _, err := upgraded.MaybeDowngradeForeignKeyRepresentation(ctx, newVersionSettings) if err != nil { t.Fatal(err) } diff --git a/pkg/sql/table.go b/pkg/sql/table.go index 29498d344cda..10e6203fd8c9 100644 --- a/pkg/sql/table.go +++ b/pkg/sql/table.go @@ -849,7 +849,5 @@ func (p *planner) writeTableDescToBatch( return err } - writeDescToBatch(ctx, p.extendedEvalCtx.Tracing.KVTracingEnabled(), p.execCfg.Settings, - b, tableDesc.GetID(), tableDesc) - return nil + return writeDescToBatch(ctx, p.extendedEvalCtx.Tracing.KVTracingEnabled(), p.execCfg.Settings, b, tableDesc.GetID(), tableDesc.TableDesc()) }