Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: add fk downgrade path for mixed version clusters #39448

Merged
merged 1 commit into from
Aug 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions pkg/ccl/backupccl/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -1035,7 +1035,9 @@ func WriteTableDescs(
// TODO(dt): support restoring privs.
desc.Privileges = sqlbase.NewDefaultPrivilegeDescriptor()
wroteDBs[desc.ID] = desc
sql.WriteNewDescToBatch(ctx, false /* kvTrace */, settings, b, desc.ID, desc)
if err := sql.WriteNewDescToBatch(ctx, false /* kvTrace */, settings, b, desc.ID, desc); err != nil {
return err
}
b.CPut(sqlbase.MakeNameMetadataKey(keys.RootNamespaceID, desc.Name), desc.ID, nil)
}
for i := range tables {
Expand All @@ -1055,7 +1057,9 @@ func WriteTableDescs(
// TODO(dt): Make this more configurable.
tables[i].Privileges = parentDB.GetPrivileges()
}
sql.WriteNewDescToBatch(ctx, false /* kvTrace */, settings, b, tables[i].ID, tables[i])
if err := sql.WriteNewDescToBatch(ctx, false /* kvTrace */, settings, b, tables[i].ID, tables[i]); err != nil {
return err
}
b.CPut(sqlbase.MakeNameMetadataKey(tables[i].ParentID, tables[i].Name), tables[i].ID, nil)
}
for _, kv := range extra {
Expand Down Expand Up @@ -1653,7 +1657,9 @@ func (r *restoreResumer) OnFailOrCancel(ctx context.Context, txn *client.Txn) er
b := txn.NewBatch()
for _, tableDesc := range details.TableDescs {
tableDesc.State = sqlbase.TableDescriptor_DROP
sql.WriteNewDescToBatch(ctx, false /* kvTrace */, r.settings, b, tableDesc.ID, tableDesc)
if err := sql.WriteNewDescToBatch(ctx, false /* kvTrace */, r.settings, b, tableDesc.ID, tableDesc); err != nil {
return err
}
}
return txn.Run(ctx, b)
}
Expand Down
58 changes: 46 additions & 12 deletions pkg/sql/descriptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,9 @@ func (p *planner) createDescriptorWithID(
log.VEventf(ctx, 2, "CPut %s -> %d", idKey, descID)
}
b.CPut(idKey, descID, nil)
WriteNewDescToBatch(
ctx, p.ExtendedEvalContext().Tracing.KVTracingEnabled(),
st, b, descID, descriptor)
if err := WriteNewDescToBatch(ctx, p.ExtendedEvalContext().Tracing.KVTracingEnabled(), st, b, descID, descriptor); err != nil {
return err
}

mutDesc, isTable := descriptor.(*sqlbase.MutableTableDescriptor)
if isTable {
Expand Down Expand Up @@ -227,44 +227,78 @@ func GetAllDescriptors(ctx context.Context, txn *client.Txn) ([]sqlbase.Descript
// writeDescToBatch adds a Put command writing a descriptor proto to the
// descriptors table. It writes the descriptor desc at the id descID. If kvTrace
// is enabled, it will log an event explaining the put that was performed.
// TODO(jordan): the unused cluster.Settings will be used for version gating
// this function, which must downgrade the descriptors in the case of a
// mixed-version 19.1-19.2 cluster.
func writeDescToBatch(
ctx context.Context,
kvTrace bool,
s *cluster.Settings,
b *client.Batch,
descID sqlbase.ID,
desc sqlbase.DescriptorProto,
) {
) (err error) {
var tableToDowngrade *TableDescriptor
switch d := desc.(type) {
case *TableDescriptor:
tableToDowngrade = d
case *MutableTableDescriptor:
tableToDowngrade = d.TableDesc()
case *DatabaseDescriptor:
default:
return errors.AssertionFailedf("unexpected proto type %T", desc)
}
if tableToDowngrade != nil {
didDowngrade, downgraded, err := tableToDowngrade.MaybeDowngradeForeignKeyRepresentation(ctx, s)
if err != nil {
return err
}
if didDowngrade {
desc = downgraded
}
}
descKey := sqlbase.MakeDescMetadataKey(descID)
descDesc := sqlbase.WrapDescriptor(desc)
if kvTrace {
log.VEventf(ctx, 2, "Put %s -> %s", descKey, descDesc)
}
b.Put(sqlbase.MakeDescMetadataKey(descID), sqlbase.WrapDescriptor(desc))
b.Put(descKey, descDesc)
return nil
}

// WriteNewDescToBatch adds a CPut command writing a descriptor proto to the
// descriptors table. It writes the descriptor desc at the id descID, asserting
// that there was no previous descriptor at that id present already. If kvTrace
// is enabled, it will log an event explaining the CPut that was performed.
// TODO(jordan): the unused cluster.Settings will be used for version gating
// this function, which must downgrade the descriptors in the case of a
// mixed-version 19.1-19.2 cluster.
func WriteNewDescToBatch(
ctx context.Context,
kvTrace bool,
s *cluster.Settings,
b *client.Batch,
tableID sqlbase.ID,
desc sqlbase.DescriptorProto,
) {
) (err error) {
var tableToDowngrade *TableDescriptor
switch d := desc.(type) {
case *TableDescriptor:
tableToDowngrade = d
case *MutableTableDescriptor:
tableToDowngrade = d.TableDesc()
case *DatabaseDescriptor:
default:
return errors.AssertionFailedf("unexpected proto type %T", desc)
}
if tableToDowngrade != nil {
didDowngrade, downgraded, err := tableToDowngrade.MaybeDowngradeForeignKeyRepresentation(ctx, s)
if err != nil {
return err
}
if didDowngrade {
desc = downgraded
}
}
descKey := sqlbase.MakeDescMetadataKey(tableID)
descDesc := sqlbase.WrapDescriptor(desc)
if kvTrace {
log.VEventf(ctx, 2, "CPut %s -> %s", descKey, descDesc)
}
b.CPut(descKey, descDesc, nil)
return nil
}
5 changes: 3 additions & 2 deletions pkg/sql/grant_revoke.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,9 @@ func (n *changePrivilegesNode) startExec(params runParams) error {
if err := d.Validate(); err != nil {
return err
}
writeDescToBatch(ctx, p.extendedEvalCtx.Tracing.KVTracingEnabled(), p.execCfg.Settings,
b, descriptor.GetID(), descriptor)
if err := writeDescToBatch(ctx, p.extendedEvalCtx.Tracing.KVTracingEnabled(), p.execCfg.Settings, b, descriptor.GetID(), descriptor); err != nil {
return err
}

case *sqlbase.MutableTableDescriptor:
if !d.Dropped() {
Expand Down
4 changes: 3 additions & 1 deletion pkg/sql/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,9 @@ func (s LeaseStore) PublishMultiple(
}
b := txn.NewBatch()
for tableID, tableDesc := range tableDescs {
writeDescToBatch(ctx, false /* kvTrace */, s.settings, b, tableID, tableDesc.TableDesc())
if err := writeDescToBatch(ctx, false /* kvTrace */, s.settings, b, tableID, tableDesc.TableDesc()); err != nil {
return err
}
}
if logEvent != nil {
// If an event log is required for this update, ensure that the
Expand Down
5 changes: 3 additions & 2 deletions pkg/sql/rename_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,9 @@ func (n *renameTableNode) startExec(params runParams) error {
if p.extendedEvalCtx.Tracing.KVTracingEnabled() {
log.VEventf(ctx, 2, "CPut %s -> %d", newTbKey, descID)
}
writeDescToBatch(ctx, p.extendedEvalCtx.Tracing.KVTracingEnabled(), p.EvalContext().Settings,
b, descID, tableDesc)
if err := writeDescToBatch(ctx, p.extendedEvalCtx.Tracing.KVTracingEnabled(), p.EvalContext().Settings, b, descID, tableDesc.TableDesc()); err != nil {
return err
}
b.CPut(newTbKey, descID, nil)

if err := p.txn.Run(ctx, b); err != nil {
Expand Down
16 changes: 12 additions & 4 deletions pkg/sql/schema_changer.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,9 @@ func (sc *SchemaChanger) AcquireLease(
lease = sc.createSchemaChangeLease()
tableDesc.Lease = &lease
b := txn.NewBatch()
writeDescToBatch(ctx, false /* kvTrace */, sc.settings, b, tableDesc.GetID(), tableDesc)
if err := writeDescToBatch(ctx, false /* kvTrace */, sc.settings, b, tableDesc.GetID(), tableDesc); err != nil {
return err
}
return txn.Run(ctx, b)
})
return lease, err
Expand Down Expand Up @@ -334,7 +336,9 @@ func (sc *SchemaChanger) ReleaseLease(
return err
}
b := txn.NewBatch()
writeDescToBatch(ctx, false /* kvTrace */, sc.settings, b, tableDesc.GetID(), tableDesc)
if err := writeDescToBatch(ctx, false /* kvTrace */, sc.settings, b, tableDesc.GetID(), tableDesc); err != nil {
return err
}
return txn.Run(ctx, b)
})
}
Expand Down Expand Up @@ -366,7 +370,9 @@ func (sc *SchemaChanger) ExtendLease(
return err
}
b := txn.NewBatch()
writeDescToBatch(ctx, false /* kvTrace */, sc.settings, b, tableDesc.GetID(), tableDesc)
if err := writeDescToBatch(ctx, false /* kvTrace */, sc.settings, b, tableDesc.GetID(), tableDesc); err != nil {
return err
}
return txn.Run(ctx, b)
}); err != nil {
return err
Expand Down Expand Up @@ -1658,7 +1664,9 @@ func (sc *SchemaChanger) createRollbackJob(

// write descriptor, the version has already been incremented.
b := txn.NewBatch()
writeDescToBatch(ctx, false /* kvTrace */, sc.settings, b, tableDesc.GetID(), tableDesc)
if err := writeDescToBatch(ctx, false /* kvTrace */, sc.settings, b, tableDesc.GetID(), tableDesc); err != nil {
return nil, err
}
if err := txn.Run(ctx, b); err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/sqlbase/structured.go
Original file line number Diff line number Diff line change
Expand Up @@ -889,14 +889,14 @@ func maybeUpgradeForeignKeyRepOnIndex(
return changed, nil
}

// maybeDowngradeForeignKeyRepresentation non-destructively downgrades the
// MaybeDowngradeForeignKeyRepresentation non-destructively downgrades the
// receiver into the old foreign key representation (the ForeignKey
// and ReferencedBy fields on IndexDescriptor if and only if the cluster version
// has not yet been upgraded to VersionTopLevelForeignKeys. It returns true in
// the first position if the downgrade occurred, along with a new
// TableDescriptor object that is the downgraded descriptor. The receiver is not
// modified in either case.
func (desc *TableDescriptor) maybeDowngradeForeignKeyRepresentation(
func (desc *TableDescriptor) MaybeDowngradeForeignKeyRepresentation(
ctx context.Context, clusterSettings *cluster.Settings,
) (bool, *TableDescriptor, error) {
downgradeUnnecessary := clusterSettings.Version.IsActive(cluster.VersionTopLevelForeignKeys)
Expand Down
19 changes: 17 additions & 2 deletions pkg/sql/sqlbase/structured_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1926,7 +1926,7 @@ func TestUpgradeDowngradeFKRepr(t *testing.T) {
// table descriptor, since we'll never be in a situation where we're
// trying to downgrade something that we read from disk that was already
// upgraded.
wasDowngraded, downgraded, err := upgraded.maybeDowngradeForeignKeyRepresentation(ctx, mixedVersionSettings)
wasDowngraded, downgraded, err := upgraded.MaybeDowngradeForeignKeyRepresentation(ctx, mixedVersionSettings)
if err != nil {
t.Fatal(err)
}
Expand All @@ -1938,9 +1938,24 @@ func TestUpgradeDowngradeFKRepr(t *testing.T) {
t.Fatalf("downgrade didn't match original %s %s", proto.MarshalTextString(downgraded),
proto.MarshalTextString(&pair.oldFormat))
}

// Check that the downgrade is idempotent as well. Downgrading the table
// again shouldn't change it.
wasDowngradedAgain, downgradedAgain, err := downgraded.MaybeDowngradeForeignKeyRepresentation(ctx, mixedVersionSettings)
if err != nil {
t.Fatal(err)
}
if wasDowngradedAgain {
t.Fatalf("expected proto to not be downgraded a second time")
}

if !reflect.DeepEqual(downgradedAgain, downgraded) {
t.Fatalf("downgrade wasn't idempotent %s %s", proto.MarshalTextString(downgradedAgain),
proto.MarshalTextString(downgraded))
}
}

wasDowngraded, _, err := upgraded.maybeDowngradeForeignKeyRepresentation(ctx, newVersionSettings)
wasDowngraded, _, err := upgraded.MaybeDowngradeForeignKeyRepresentation(ctx, newVersionSettings)
if err != nil {
t.Fatal(err)
}
Expand Down
4 changes: 1 addition & 3 deletions pkg/sql/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -849,7 +849,5 @@ func (p *planner) writeTableDescToBatch(
return err
}

writeDescToBatch(ctx, p.extendedEvalCtx.Tracing.KVTracingEnabled(), p.execCfg.Settings,
b, tableDesc.GetID(), tableDesc)
return nil
return writeDescToBatch(ctx, p.extendedEvalCtx.Tracing.KVTracingEnabled(), p.execCfg.Settings, b, tableDesc.GetID(), tableDesc.TableDesc())
}