Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ajwerner/wrapped descriptors #5

Closed
wants to merge 23 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
8192df7
cloud: Fixed google storage reader test which exercises download inte…
adityamaru Jun 3, 2020
b7d2886
cloud: Corrected the number of GCS/HTTP Read retries
adityamaru Jun 3, 2020
028ca9e
kvserver: use engine's filesystem
jbowens May 28, 2020
bd6b65f
sql: fix panic with index drop on interleaved table with enum
rohany Jun 7, 2020
828db96
sql: add tests for primary key changes with enum columns
rohany Jun 7, 2020
a89d17d
Merge #49943
Jun 7, 2020
b86ced2
Merge #49717
Jun 8, 2020
5689836
Merge #49840
Jun 8, 2020
449dc00
sqlbase: lift shared fields required for leasing to Descriptor
ajwerner May 12, 2020
4066dda
sqlbase,catalog: Move the catalog.Descriptor interface to sqlbase
ajwerner May 29, 2020
d9c7b82
sql,sqlbase: remove SetID() from the DescriptorProto interface
ajwerner Jun 1, 2020
fb98816
sqlbase,*: replace DescriptorProto with DescriptorInterface
ajwerner Jun 2, 2020
27721e2
sqlbase: introduce DatabaseDescriptorInterface and ImmutableDatabaseD…
ajwerner Jun 3, 2020
b308174
sqlbase,*: adopt wrapper structs around DatabaseDescriptor
ajwerner Jun 3, 2020
16e1792
sqlbase,catalogkv,*: have GetDescriptorByID return an interface
ajwerner Jun 4, 2020
bd59ae2
sqlbase: add sanity to the various TypeDescriptor types
ajwerner Jun 4, 2020
dbf393e
sqlbase: lift methods from TypeDescriptor to ImmutableTypeDescriptor
ajwerner Jun 4, 2020
2f7ae46
sqlbase: stop referring to Name and ID on TypeDescriptor directly
ajwerner Jun 4, 2020
7489d85
sqlbase,*: access ID and Name on DatabaseDescriptor through methods
ajwerner Jun 4, 2020
d8fe5e2
sqlbase: add wrapper types for SchemaDescriptor
ajwerner Jun 8, 2020
46fa9fb
sqlbase: lift interface methods from SchemaDescriptor
ajwerner Jun 8, 2020
d91204f
catalogkv,*: "unwrap" descriptor before returning them
ajwerner Jun 8, 2020
841534e
sqlbase: generalize setting of ModificationTime
ajwerner Jun 8, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 15 additions & 8 deletions pkg/ccl/backupccl/backup_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,17 +88,20 @@ type tableAndIndex struct {
// spansForAllTableIndexes returns non-overlapping spans for every index and
// table passed in. They would normally overlap if any of them are interleaved.
func spansForAllTableIndexes(
codec keys.SQLCodec, tables []*sqlbase.TableDescriptor, revs []BackupManifest_DescriptorRevision,
codec keys.SQLCodec,
tables []sqlbase.TableDescriptorInterface,
revs []BackupManifest_DescriptorRevision,
) []roachpb.Span {

added := make(map[tableAndIndex]bool, len(tables))
sstIntervalTree := interval.NewTree(interval.ExclusiveOverlapper)
for _, table := range tables {
for _, index := range table.AllNonDropIndexes() {
if err := sstIntervalTree.Insert(intervalSpan(table.IndexSpan(codec, index.ID)), false); err != nil {
tableDesc := table.TableDesc()
for _, index := range tableDesc.AllNonDropIndexes() {
if err := sstIntervalTree.Insert(intervalSpan(tableDesc.IndexSpan(codec, index.ID)), false); err != nil {
panic(errors.NewAssertionErrorWithWrappedErrf(err, "IndexSpan"))
}
added[tableAndIndex{tableID: table.ID, indexID: index.ID}] = true
added[tableAndIndex{tableID: table.GetID(), indexID: index.ID}] = true
}
}
// If there are desc revisions, ensure that we also add any index spans
Expand Down Expand Up @@ -350,18 +353,22 @@ func backupPlanHook(

statsCache := p.ExecCfg().TableStatsCache
tableStatistics := make([]*stats.TableStatisticProto, 0)
var tables []*sqlbase.TableDescriptor
var tables []sqlbase.TableDescriptorInterface
for _, desc := range targetDescs {
if dbDesc := desc.GetDatabase(); dbDesc != nil {
if err := p.CheckPrivilege(ctx, dbDesc, privilege.SELECT); err != nil {
db := sqlbase.NewImmutableDatabaseDescriptor(*dbDesc)
if err := p.CheckPrivilege(ctx, db, privilege.SELECT); err != nil {
return err
}
}
if tableDesc := desc.Table(hlc.Timestamp{}); tableDesc != nil {
if err := p.CheckPrivilege(ctx, tableDesc, privilege.SELECT); err != nil {
// TODO(ajwerner): This construction of a wrapper is unfortunate and should
// go away in this PR.
table := sqlbase.NewImmutableTableDescriptor(*tableDesc)
if err := p.CheckPrivilege(ctx, table, privilege.SELECT); err != nil {
return err
}
tables = append(tables, tableDesc)
tables = append(tables, table)

// If the table has any user defined types, error out.
for _, col := range tableDesc.Columns {
Expand Down
118 changes: 61 additions & 57 deletions pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,8 +430,8 @@ func splitAndScatter(
func WriteTableDescs(
ctx context.Context,
txn *kv.Txn,
databases []*sqlbase.DatabaseDescriptor,
tables []*sqlbase.TableDescriptor,
databases []*sqlbase.ImmutableDatabaseDescriptor,
tables []sqlbase.TableDescriptorInterface,
descCoverage tree.DescriptorCoverage,
settings *cluster.Settings,
extra []roachpb.KeyValue,
Expand All @@ -440,55 +440,56 @@ func WriteTableDescs(
defer tracing.FinishSpan(span)
err := func() error {
b := txn.NewBatch()
wroteDBs := make(map[sqlbase.ID]*sqlbase.DatabaseDescriptor)
wroteDBs := make(map[sqlbase.ID]*sqlbase.ImmutableDatabaseDescriptor)
for _, desc := range databases {
// If the restore is not a full cluster restore we cannot know that
// the users on the restoring cluster match the ones that were on the
// cluster that was backed up. So we wipe the privileges on the database.
if descCoverage != tree.AllDescriptors {
desc.Privileges = sqlbase.NewDefaultPrivilegeDescriptor()
}
wroteDBs[desc.ID] = desc
if err := catalogkv.WriteNewDescToBatch(ctx, false /* kvTrace */, settings, b, keys.SystemSQLCodec, desc.ID, desc); err != nil {
wroteDBs[desc.GetID()] = desc
if err := catalogkv.WriteNewDescToBatch(ctx, false /* kvTrace */, settings, b, keys.SystemSQLCodec, desc.GetID(), desc); err != nil {
return err
}
// Depending on which cluster version we are restoring to, we decide which
// namespace table to write the descriptor into. This may cause wrong
// behavior if the cluster version is bumped DURING a restore.
dKey := sqlbase.MakeDatabaseNameKey(ctx, settings, desc.Name)
b.CPut(dKey.Key(keys.SystemSQLCodec), desc.ID, nil)
dKey := sqlbase.MakeDatabaseNameKey(ctx, settings, desc.GetName())
b.CPut(dKey.Key(keys.SystemSQLCodec), desc.GetID(), nil)
}
for i := range tables {
table := tables[i].TableDesc()
// For full cluster restore, keep privileges as they were.
if wrote, ok := wroteDBs[tables[i].ParentID]; ok {
if wrote, ok := wroteDBs[table.ParentID]; ok {
// Leave the privileges of the temp system tables as
// the default.
if descCoverage != tree.AllDescriptors || wrote.Name == restoreTempSystemDB {
tables[i].Privileges = wrote.GetPrivileges()
if descCoverage != tree.AllDescriptors || wrote.GetName() == restoreTempSystemDB {
table.Privileges = wrote.GetPrivileges()
}
} else {
parentDB, err := sqlbase.GetDatabaseDescFromID(ctx, txn, keys.SystemSQLCodec, tables[i].ParentID)
parentDB, err := sqlbase.GetDatabaseDescFromID(ctx, txn, keys.SystemSQLCodec, table.ParentID)
if err != nil {
return errors.Wrapf(err,
"failed to lookup parent DB %d", errors.Safe(tables[i].ParentID))
"failed to lookup parent DB %d", errors.Safe(table.ParentID))
}
// We don't check priv's here since we checked them during job planning.

// On full cluster restore, keep the privs as they are in the backup.
if descCoverage != tree.AllDescriptors {
// Default is to copy privs from restoring parent db, like CREATE TABLE.
// TODO(dt): Make this more configurable.
tables[i].Privileges = parentDB.GetPrivileges()
table.Privileges = parentDB.GetPrivileges()
}
}
if err := catalogkv.WriteNewDescToBatch(ctx, false /* kvTrace */, settings, b, keys.SystemSQLCodec, tables[i].ID, tables[i]); err != nil {
if err := catalogkv.WriteNewDescToBatch(ctx, false /* kvTrace */, settings, b, keys.SystemSQLCodec, table.ID, tables[i]); err != nil {
return err
}
// Depending on which cluster version we are restoring to, we decide which
// namespace table to write the descriptor into. This may cause wrong
// behavior if the cluster version is bumped DURING a restore.
tkey := sqlbase.MakePublicTableNameKey(ctx, settings, tables[i].ParentID, tables[i].Name)
b.CPut(tkey.Key(keys.SystemSQLCodec), tables[i].ID, nil)
tkey := sqlbase.MakePublicTableNameKey(ctx, settings, table.ParentID, table.Name)
b.CPut(tkey.Key(keys.SystemSQLCodec), table.ID, nil)
}
for _, kv := range extra {
b.InitPut(kv.Key, &kv.Value, false)
Expand All @@ -501,9 +502,9 @@ func WriteTableDescs(
}

for _, table := range tables {
if err := table.Validate(ctx, txn, keys.SystemSQLCodec); err != nil {
if err := table.TableDesc().Validate(ctx, txn, keys.SystemSQLCodec); err != nil {
return errors.Wrapf(err,
"validate table %d", errors.Safe(table.ID))
"validate table %d", errors.Safe(table.GetID()))
}
}
return nil
Expand Down Expand Up @@ -564,7 +565,7 @@ func restore(
backupManifests []BackupManifest,
backupLocalityInfo []jobspb.RestoreDetails_BackupLocalityInfo,
endTime hlc.Timestamp,
tables []*sqlbase.TableDescriptor,
tables []sqlbase.TableDescriptorInterface,
oldTableIDs []sqlbase.ID,
spans []roachpb.Span,
job *jobs.Job,
Expand All @@ -587,7 +588,7 @@ func restore(
var rekeys []roachpb.ImportRequest_TableRekey
for i := range tables {
tableToSerialize := tables[i]
newDescBytes, err := protoutil.Marshal(sqlbase.WrapDescriptor(tableToSerialize))
newDescBytes, err := protoutil.Marshal(tableToSerialize.DescriptorProto())
if err != nil {
return mu.res, errors.NewAssertionErrorWithWrappedErrf(err,
"marshaling descriptor")
Expand Down Expand Up @@ -631,7 +632,7 @@ func restore(

pkIDs := make(map[uint64]struct{})
for _, tbl := range tables {
pkIDs[roachpb.BulkOpSummaryID(uint64(tbl.ID), uint64(tbl.PrimaryIndex.ID))] = struct{}{}
pkIDs[roachpb.BulkOpSummaryID(uint64(tbl.GetID()), uint64(tbl.TableDesc().PrimaryIndex.ID))] = struct{}{}
}

// We're already limiting these on the server-side, but sending all the
Expand Down Expand Up @@ -789,8 +790,8 @@ func loadBackupSQLDescs(
type restoreResumer struct {
job *jobs.Job
settings *cluster.Settings
databases []*sqlbase.DatabaseDescriptor
tables []*sqlbase.TableDescriptor
databases []*sqlbase.ImmutableDatabaseDescriptor
tables []sqlbase.TableDescriptorInterface
descriptorCoverage tree.DescriptorCoverage
latestStats []*stats.TableStatisticProto
execCfg *sql.ExecutorConfig
Expand Down Expand Up @@ -834,7 +835,7 @@ func remapRelevantStatistics(
func isDatabaseEmpty(
ctx context.Context,
db *kv.DB,
dbDesc *sql.DatabaseDescriptor,
dbDesc *sqlbase.ImmutableDatabaseDescriptor,
ignoredTables map[sqlbase.ID]struct{},
) (bool, error) {
var allDescs []sqlbase.Descriptor
Expand All @@ -853,7 +854,7 @@ func isDatabaseEmpty(
if _, ok := ignoredTables[t.GetID()]; ok {
continue
}
if t.GetParentID() == dbDesc.ID {
if t.GetParentID() == dbDesc.GetID() {
return false, nil
}
}
Expand All @@ -866,26 +867,28 @@ func isDatabaseEmpty(
func createImportingTables(
ctx context.Context, p sql.PlanHookState, sqlDescs []sqlbase.Descriptor, r *restoreResumer,
) (
[]*sqlbase.DatabaseDescriptor,
[]*sqlbase.TableDescriptor,
[]*sqlbase.ImmutableDatabaseDescriptor,
[]sqlbase.TableDescriptorInterface,
[]sqlbase.ID,
[]roachpb.Span,
error,
) {
details := r.job.Details().(jobspb.RestoreDetails)

var databases []*sqlbase.DatabaseDescriptor
var tables []*sqlbase.TableDescriptor
var databases []*sqlbase.ImmutableDatabaseDescriptor
var tables []sqlbase.TableDescriptorInterface
var oldTableIDs []sqlbase.ID
for _, desc := range sqlDescs {
if tableDesc := desc.Table(hlc.Timestamp{}); tableDesc != nil {
tables = append(tables, tableDesc)
table := sqlbase.NewMutableCreatedTableDescriptor(*tableDesc)
tables = append(tables, table)
oldTableIDs = append(oldTableIDs, tableDesc.ID)
}
if dbDesc := desc.GetDatabase(); dbDesc != nil {
if rewrite, ok := details.TableRewrites[dbDesc.ID]; ok {
dbDesc.ID = rewrite.TableID
databases = append(databases, dbDesc)
if rewrite, ok := details.TableRewrites[dbDesc.GetID()]; ok {
rewriteDesc := sqlbase.NewInitialDatabaseDescriptorWithPrivileges(
rewrite.TableID, dbDesc.GetName(), dbDesc.Privileges)
databases = append(databases, rewriteDesc)
}
}
}
Expand All @@ -896,11 +899,8 @@ func createImportingTables(
}
}
if details.DescriptorCoverage == tree.AllDescriptors {
databases = append(databases, &sqlbase.DatabaseDescriptor{
ID: sqlbase.ID(tempSystemDBID),
Name: restoreTempSystemDB,
Privileges: sqlbase.NewDefaultPrivilegeDescriptor(),
})
databases = append(databases, sqlbase.NewInitialDatabaseDescriptor(
sqlbase.ID(tempSystemDBID), restoreTempSystemDB))
}

// We get the spans of the restoring tables _as they appear in the backup_,
Expand All @@ -911,11 +911,15 @@ func createImportingTables(

// Assign new IDs and privileges to the tables, and update all references to
// use the new IDs.
if err := RewriteTableDescs(tables, details.TableRewrites, details.OverrideDB); err != nil {
tableDescs := make([]*sqlbase.TableDescriptor, len(tables))
for i, table := range tables {
tableDescs[i] = table.TableDesc()
}
if err := RewriteTableDescs(tableDescs, details.TableRewrites, details.OverrideDB); err != nil {
return nil, nil, nil, nil, err
}

for _, desc := range tables {
for _, desc := range tableDescs {
desc.Version++
desc.State = sqlbase.TableDescriptor_OFFLINE
desc.OfflineReason = "restoring"
Expand All @@ -929,7 +933,7 @@ func createImportingTables(
}

details.PrepareCompleted = true
details.TableDescs = tables
details.TableDescs = tableDescs

// Update the job once all descs have been prepared for ingestion.
err := r.job.WithTxn(txn).SetDetails(ctx, details)
Expand Down Expand Up @@ -1082,23 +1086,23 @@ func (r *restoreResumer) publishTables(ctx context.Context) error {
// accessed.
b := txn.NewBatch()
for _, tbl := range r.tables {
tableDesc := *tbl
tableDesc.Version++
tableDesc.State = sqlbase.TableDescriptor_PUBLIC
newTableDesc := sqlbase.NewMutableExistingTableDescriptor(*tbl.TableDesc())
newTableDesc.Version++
newTableDesc.State = sqlbase.TableDescriptor_PUBLIC
// Convert any mutations that were in progress on the table descriptor
// when the backup was taken, and convert them to schema change jobs.
newJobs, err := createSchemaChangeJobsFromMutations(ctx, r.execCfg.JobRegistry, r.execCfg.Codec, txn, r.job.Payload().Username, &tableDesc)
newJobs, err := createSchemaChangeJobsFromMutations(ctx, r.execCfg.JobRegistry, r.execCfg.Codec, txn, r.job.Payload().Username, newTableDesc.TableDesc())
if err != nil {
return err
}
newSchemaChangeJobs = append(newSchemaChangeJobs, newJobs...)
existingDescVal, err := sqlbase.ConditionalGetTableDescFromTxn(ctx, txn, r.execCfg.Codec, tbl)
existingDescVal, err := sqlbase.ConditionalGetTableDescFromTxn(ctx, txn, r.execCfg.Codec, tbl.TableDesc())
if err != nil {
return errors.Wrap(err, "validating table descriptor has not changed")
}
b.CPut(
sqlbase.MakeDescMetadataKey(keys.SystemSQLCodec, tableDesc.ID),
sqlbase.WrapDescriptor(&tableDesc),
sqlbase.MakeDescMetadataKey(keys.SystemSQLCodec, newTableDesc.ID),
newTableDesc.DescriptorProto(),
existingDescVal,
)
}
Expand Down Expand Up @@ -1135,7 +1139,7 @@ func (r *restoreResumer) publishTables(ctx context.Context) error {
// rows affected per table, so we use a large number because we want to make
// sure that stats always get created/refreshed here.
for i := range r.tables {
r.execCfg.StatsRefresher.NotifyMutation(r.tables[i].ID, math.MaxInt32 /* rowsAffected */)
r.execCfg.StatsRefresher.NotifyMutation(r.tables[i].GetID(), math.MaxInt32 /* rowsAffected */)
}

return nil
Expand Down Expand Up @@ -1176,9 +1180,9 @@ func (r *restoreResumer) dropTables(ctx context.Context, jr *jobs.Registry, txn
tablesToGC := make([]sqlbase.ID, 0, len(details.TableDescs))
for _, tbl := range details.TableDescs {
tablesToGC = append(tablesToGC, tbl.ID)
tableDesc := *tbl
tableDesc.Version++
tableDesc.State = sqlbase.TableDescriptor_DROP
tableToDrop := sqlbase.NewMutableExistingTableDescriptor(*tbl)
tableToDrop.Version++
tableToDrop.State = sqlbase.TableDescriptor_DROP
err := sqlbase.RemovePublicTableNamespaceEntry(ctx, txn, keys.SystemSQLCodec, tbl.ParentID, tbl.Name)
if err != nil {
return errors.Wrap(err, "dropping tables caused by restore fail/cancel from public namespace")
Expand All @@ -1188,8 +1192,8 @@ func (r *restoreResumer) dropTables(ctx context.Context, jr *jobs.Registry, txn
return errors.Wrap(err, "dropping tables caused by restore fail/cancel")
}
b.CPut(
sqlbase.MakeDescMetadataKey(keys.SystemSQLCodec, tableDesc.ID),
sqlbase.WrapDescriptor(&tableDesc),
sqlbase.MakeDescMetadataKey(keys.SystemSQLCodec, tableToDrop.ID),
tableToDrop.DescriptorProto(),
existingDescVal,
)
}
Expand Down Expand Up @@ -1230,13 +1234,13 @@ func (r *restoreResumer) dropTables(ctx context.Context, jr *jobs.Registry, txn
// We need to ignore details.TableDescs since we haven't committed the txn that deletes these.
isDBEmpty, err = isDatabaseEmpty(ctx, r.execCfg.DB, dbDesc, ignoredTables)
if err != nil {
return errors.Wrapf(err, "checking if database %s is empty during restore cleanup", dbDesc.Name)
return errors.Wrapf(err, "checking if database %s is empty during restore cleanup", dbDesc.GetName())
}

if isDBEmpty {
descKey := sqlbase.MakeDescMetadataKey(keys.SystemSQLCodec, dbDesc.ID)
descKey := sqlbase.MakeDescMetadataKey(keys.SystemSQLCodec, dbDesc.GetID())
b.Del(descKey)
b.Del(sqlbase.NewDatabaseKey(dbDesc.Name).Key(keys.SystemSQLCodec))
b.Del(sqlbase.NewDatabaseKey(dbDesc.GetName()).Key(keys.SystemSQLCodec))
}
}
if err := txn.Run(ctx, b); err != nil {
Expand Down
Loading