Skip to content

Commit

Permalink
schemachange: refactor GC job code
Browse files Browse the repository at this point in the history
To implement #48775 I will be modifying the schema GC code to
add support for destroying tenant data. I noticed that the
current code contains redundant repetition, which makes it hard to
understand and modify.

The logic for computing the new deadline and refreshing tables was
repeated in three code blocks; I have merged them into one.

Release note: none.
  • Loading branch information
Spas Bojanov committed Oct 20, 2020
1 parent b1abf9c commit d85a63d
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 83 deletions.
97 changes: 33 additions & 64 deletions pkg/sql/gcjob/gc_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,29 +55,22 @@ func performGC(
execCfg *sql.ExecutorConfig,
details *jobspb.SchemaChangeGCDetails,
progress *jobspb.SchemaChangeGCProgress,
) (bool, error) {
didGC := false
) error {
if details.Indexes != nil {
if didGCIndex, err := gcIndexes(ctx, execCfg, details.ParentID, progress); err != nil {
return false, errors.Wrap(err, "attempting to GC indexes")
} else if didGCIndex {
didGC = true
}
return errors.Wrap(gcIndexes(ctx, execCfg, details.ParentID, progress), "attempting to GC indexes")
} else if details.Tables != nil {
if didGCTable, err := gcTables(ctx, execCfg, progress); err != nil {
return false, errors.Wrap(err, "attempting to GC tables")
} else if didGCTable {
didGC = true
if err := gcTables(ctx, execCfg, progress); err != nil {
return errors.Wrap(err, "attempting to GC tables")
}

// Drop database zone config when all the tables have been GCed.
if details.ParentID != descpb.InvalidID && isDoneGC(progress) {
if err := deleteDatabaseZoneConfig(ctx, execCfg.DB, execCfg.Codec, details.ParentID); err != nil {
return false, errors.Wrap(err, "deleting database zone config")
return errors.Wrap(err, "deleting database zone config")
}
}
}
return didGC, nil
return nil
}

// Resume is part of the jobs.Resumer interface.
Expand Down Expand Up @@ -115,78 +108,54 @@ func (r schemaChangeGCResumer) Resume(
}
}

gossipUpdateC, cleanup := execCfg.GCJobNotifier.AddNotifyee(ctx)
defer cleanup()
tableDropTimes, indexDropTimes := getDropTimes(details)

allTables := getAllTablesWaitingForGC(details, progress)
if len(allTables) == 0 {
return nil
}
expired, earliestDeadline := refreshTables(ctx, execCfg, allTables, tableDropTimes, indexDropTimes, r.jobID, progress)
timerDuration := timeutil.Until(earliestDeadline)
if expired {
timerDuration = 0
} else if timerDuration > MaxSQLGCInterval {
timerDuration = MaxSQLGCInterval
}
timer := timeutil.NewTimer()
defer timer.Stop()
timer.Reset(timerDuration)

timer.Reset(0)
gossipUpdateC, cleanup := execCfg.GCJobNotifier.AddNotifyee(ctx)
defer cleanup()
for {
select {
case <-gossipUpdateC:
// Upon notification of a gossip update, update the status of the relevant schema elements.
if log.V(2) {
log.Info(ctx, "received a new system config")
}
remainingTables := getAllTablesWaitingForGC(details, progress)
if len(remainingTables) == 0 {
return nil
}
expired, earliestDeadline = refreshTables(ctx, execCfg, remainingTables, tableDropTimes, indexDropTimes, r.jobID, progress)

if isDoneGC(progress) {
return nil
}

timerDuration := time.Until(earliestDeadline)
if expired {
timerDuration = 0
} else if timerDuration > MaxSQLGCInterval {
timerDuration = MaxSQLGCInterval
}

timer.Reset(timerDuration)
case <-timer.C:
timer.Read = true
if log.V(2) {
log.Info(ctx, "SchemaChangeGC timer triggered")
}
// Refresh the status of all tables in case any GC TTLs have changed.
remainingTables := getAllTablesWaitingForGC(details, progress)
_, earliestDeadline = refreshTables(ctx, execCfg, remainingTables, tableDropTimes, indexDropTimes, r.jobID, progress)
case <-ctx.Done():
return ctx.Err()
}

// Refresh the status of all tables in case any GC TTLs have changed.
remainingTables := getAllTablesWaitingForGC(details, progress)
expired, earliestDeadline := refreshTables(ctx, execCfg, remainingTables, tableDropTimes, indexDropTimes, r.jobID, progress)
timerDuration := time.Until(earliestDeadline)

if didWork, err := performGC(ctx, execCfg, details, progress); err != nil {
if expired {
// Some elements have been marked as DELETING so save the progress.
persistProgress(ctx, execCfg, r.jobID, progress)
if err := performGC(ctx, execCfg, details, progress); err != nil {
return err
} else if didWork {
persistProgress(ctx, execCfg, r.jobID, progress)
}
persistProgress(ctx, execCfg, r.jobID, progress)

if isDoneGC(progress) {
return nil
}
// Trigger immediate re-run in case of more expired elements.
timerDuration = 0
}

// Schedule the next check for GC.
timerDuration := time.Until(earliestDeadline)
if timerDuration > MaxSQLGCInterval {
timerDuration = MaxSQLGCInterval
}
timer.Reset(timerDuration)
case <-ctx.Done():
return ctx.Err()
if isDoneGC(progress) {
return nil
}

// Schedule the next check for GC.
if timerDuration > MaxSQLGCInterval {
timerDuration = MaxSQLGCInterval
}
timer.Reset(timerDuration)
}
}

Expand Down
18 changes: 7 additions & 11 deletions pkg/sql/gcjob/index_garbage_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ func gcIndexes(
execCfg *sql.ExecutorConfig,
parentID descpb.ID,
progress *jobspb.SchemaChangeGCProgress,
) (bool, error) {
didGC := false
) error {
droppedIndexes := progress.Indexes
if log.V(2) {
log.Infof(ctx, "GC is being considered on table %d for indexes indexes: %+v", parentID, droppedIndexes)
Expand All @@ -43,15 +42,15 @@ func gcIndexes(
// are no longer in use. This is necessary in the case of truncate, where we
// schedule a GC Job in the transaction that commits the truncation.
if err := sql.WaitToUpdateLeases(ctx, execCfg.LeaseManager, parentID); err != nil {
return false, err
return err
}

var parentTable *tabledesc.Immutable
if err := execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
parentTable, err = catalogkv.MustGetTableDescByID(ctx, txn, execCfg.Codec, parentID)
return err
}); err != nil {
return false, errors.Wrapf(err, "fetching parent table %d", parentID)
return errors.Wrapf(err, "fetching parent table %d", parentID)
}

for _, index := range droppedIndexes {
Expand All @@ -61,25 +60,22 @@ func gcIndexes(

indexDesc := descpb.IndexDescriptor{ID: index.IndexID}
if err := clearIndex(ctx, execCfg, parentTable, indexDesc); err != nil {
return false, errors.Wrapf(err, "clearing index %d", indexDesc.ID)
return errors.Wrapf(err, "clearing index %d", indexDesc.ID)
}

// All the data chunks have been removed. Now also remove the
// zone configs for the dropped indexes, if any.
if err := execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
return sql.RemoveIndexZoneConfigs(ctx, txn, execCfg, parentTable.GetID(), []descpb.IndexDescriptor{indexDesc})
}); err != nil {
return false, errors.Wrapf(err, "removing index %d zone configs", indexDesc.ID)
return errors.Wrapf(err, "removing index %d zone configs", indexDesc.ID)
}

if err := completeDroppedIndex(ctx, execCfg, parentTable, index.IndexID, progress); err != nil {
return false, err
return err
}

didGC = true
}

return didGC, nil
return nil
}

// clearIndexes issues Clear Range requests over all specified indexes.
Expand Down
13 changes: 5 additions & 8 deletions pkg/sql/gcjob/table_garbage_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ import (
// The job progress is updated in place, but needs to be persisted to the job.
func gcTables(
ctx context.Context, execCfg *sql.ExecutorConfig, progress *jobspb.SchemaChangeGCProgress,
) (bool, error) {
didGC := false
) error {
if log.V(2) {
log.Infof(ctx, "GC is being considered for tables: %+v", progress.Tables)
}
Expand All @@ -56,10 +55,9 @@ func gcTables(
log.Warningf(ctx, "table descriptor %d not found while attempting to GC, skipping", droppedTable.ID)
// Update the details payload to indicate that the table was dropped.
markTableGCed(ctx, droppedTable.ID, progress)
didGC = true
continue
}
return false, errors.Wrapf(err, "fetching table %d", droppedTable.ID)
return errors.Wrapf(err, "fetching table %d", droppedTable.ID)
}

if !table.Dropped() {
Expand All @@ -69,19 +67,18 @@ func gcTables(

// First, delete all the table data.
if err := ClearTableData(ctx, execCfg.DB, execCfg.DistSender, execCfg.Codec, table); err != nil {
return false, errors.Wrapf(err, "clearing data for table %d", table.ID)
return errors.Wrapf(err, "clearing data for table %d", table.ID)
}

// Finished deleting all the table data, now delete the table meta data.
if err := dropTableDesc(ctx, execCfg.DB, execCfg.Codec, table); err != nil {
return false, errors.Wrapf(err, "dropping table descriptor for table %d", table.ID)
return errors.Wrapf(err, "dropping table descriptor for table %d", table.ID)
}

// Update the details payload to indicate that the table was dropped.
markTableGCed(ctx, table.ID, progress)
didGC = true
}
return didGC, nil
return nil
}

// ClearTableData deletes all of the data in the specified table.
Expand Down

0 comments on commit d85a63d

Please sign in to comment.