Skip to content

Commit

Permalink
Merge #41172
Browse files Browse the repository at this point in the history
41172: jobs: clean running jobs with no mutations left r=spaskob a=spaskob

Failures can leave jobs in orphaned state where no work needs
to be done but they are left in `system.jobs` table. We detect
and delete such jobs if they are too old.

Touches #40563.

Release justification: This commit is safe for 19.2 because
this is a fairly low risk change since it only affects jobs that
have no mutations left and that are >24h old.

Release note: None.

Co-authored-by: spaskob <spas@cockroachlabs.com>
  • Loading branch information
craig[bot] and spaskob committed Oct 2, 2019
2 parents 7b782e1 + 339d243 commit 646099a
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 22 deletions.
55 changes: 48 additions & 7 deletions pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/sem/builtins"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/storage/storagepb"
Expand Down Expand Up @@ -363,11 +364,36 @@ func (r *Registry) maybeCancelJobs(ctx context.Context, nl NodeLiveness) {
}
}

// isOrphaned tries to detect if there are no mutations left to be done for the
// job which will make it a candidate for garbage collection. Jobs can be left
// in such inconsistent state if they fail before being removed from the jobs table.
func (r *Registry) isOrphaned(ctx context.Context, payload *jobspb.Payload) (bool, error) {
if payload.Type() != jobspb.TypeSchemaChange {
return false, nil
}
for _, id := range payload.DescriptorIDs {
pendingMutations := false
if err := r.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
td, err := sqlbase.GetTableDescFromID(ctx, txn, id)
if err != nil {
return err
}
pendingMutations = len(td.GetMutations()) != 0
return nil
}); err != nil {
return false, err
}
if pendingMutations {
return false, nil
}
}
return true, nil
}

func (r *Registry) cleanupOldJobs(ctx context.Context, olderThan time.Time) error {
const stmt = `SELECT id, payload FROM system.jobs WHERE status IN ($1, $2, $3) AND created < $4 ORDER BY created LIMIT 1000`
rows, err := r.ex.Query(
ctx, "gc-jobs", nil /* txn */, stmt, StatusFailed, StatusSucceeded, StatusCanceled, olderThan,
)
const stmt = `SELECT id, payload, status, created FROM system.jobs WHERE created < $1
ORDER BY created LIMIT 1000`
rows, err := r.ex.Query(ctx, "gc-jobs", nil /* txn */, stmt, olderThan)
if err != nil {
return err
}
Expand All @@ -380,19 +406,34 @@ func (r *Registry) cleanupOldJobs(ctx context.Context, olderThan time.Time) erro
if err != nil {
return err
}
if payload.FinishedMicros < oldMicros {
remove := false
switch Status(*row[2].(*tree.DString)) {
case StatusRunning, StatusPending:
done, err := r.isOrphaned(ctx, payload)
if err != nil {
return err
}
remove = done && row[3].(*tree.DTimestamp).Time.Before(olderThan)
case StatusSucceeded, StatusCanceled, StatusFailed:
remove = payload.FinishedMicros < oldMicros
}
if remove {
toDelete.Array = append(toDelete.Array, row[0])
}
}
if len(toDelete.Array) > 0 {

log.Infof(ctx, "cleaning up %d expired job records", len(toDelete.Array))
const stmt = `DELETE FROM system.jobs WHERE id = ANY($1)`
if _ /* cols */, err := r.ex.Exec(
var nDeleted int
if nDeleted, err = r.ex.Exec(
ctx, "gc-jobs", nil /* txn */, stmt, toDelete,
); err != nil {
return errors.Wrap(err, "deleting old jobs")
}
if nDeleted != len(toDelete.Array) {
return errors.Errorf("asked to delete %d rows but %d were actually deleted",
len(toDelete.Array), nDeleted)
}
}
return nil
}
Expand Down
66 changes: 51 additions & 15 deletions pkg/jobs/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand Down Expand Up @@ -182,7 +183,7 @@ func TestRegistryGC(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{})
s, sqlDB, kvDB := serverutils.StartServer(t, base.TestServerArgs{})
defer s.Stopper().Stop(ctx)

db := sqlutils.MakeSQLRunner(sqlDB)
Expand All @@ -191,18 +192,39 @@ func TestRegistryGC(t *testing.T) {
earlier := ts.Add(-1 * time.Hour)
muchEarlier := ts.Add(-2 * time.Hour)

writeJob := func(created, finished time.Time, status Status) string {
ft := timeutil.ToUnixMicros(finished)
setMutations := func(mutations []sqlbase.DescriptorMutation) sqlbase.ID {
desc := sqlbase.GetTableDescriptor(kvDB, "t", "to_be_mutated")
desc.Mutations = mutations
if err := kvDB.Put(
context.TODO(),
sqlbase.MakeDescMetadataKey(desc.GetID()),
sqlbase.WrapDescriptor(desc),
); err != nil {
t.Fatal(err)
}
return desc.GetID()
}

writeJob := func(name string, created, finished time.Time, status Status) string {
if _, err := sqlDB.Exec(`
CREATE DATABASE IF NOT EXISTS t; CREATE TABLE IF NOT EXISTS t.to_be_mutated AS SELECT 1`); err != nil {
t.Fatal(err)
}
payload, err := protoutil.Marshal(&jobspb.Payload{
Lease: &jobspb.Lease{NodeID: 1, Epoch: 1},
Details: jobspb.WrapPayloadDetails(jobspb.BackupDetails{}),
FinishedMicros: ft,
Description: name,
Lease: &jobspb.Lease{NodeID: 1, Epoch: 1},
// register a mutation on the table so that jobs that reference
// the table are not considered orphaned
DescriptorIDs: []sqlbase.ID{setMutations([]sqlbase.DescriptorMutation{{}})},
Details: jobspb.WrapPayloadDetails(jobspb.SchemaChangeDetails{}),
StartedMicros: timeutil.ToUnixMicros(created),
FinishedMicros: timeutil.ToUnixMicros(finished),
})
if err != nil {
t.Fatal(err)
}
progress, err := protoutil.Marshal(&jobspb.Progress{
Details: jobspb.WrapProgressDetails(jobspb.BackupProgress{}),
Details: jobspb.WrapProgressDetails(jobspb.SchemaChangeProgress{}),
})
if err != nil {
t.Fatal(err)
Expand All @@ -215,23 +237,37 @@ func TestRegistryGC(t *testing.T) {
return strconv.Itoa(int(id))
}

j1 := writeJob(muchEarlier, time.Time{}, StatusRunning)
j2 := writeJob(muchEarlier, muchEarlier.Add(time.Minute), StatusSucceeded)
oldRunningJob := writeJob("old_running", muchEarlier, time.Time{}, StatusRunning)
oldSucceededJob := writeJob("old_succeeded", muchEarlier, muchEarlier.Add(time.Minute), StatusSucceeded)
oldSucceededJob2 := writeJob("old_succeeded2", muchEarlier, muchEarlier.Add(time.Minute), StatusSucceeded)
newRunningJob := writeJob("new_running", earlier, time.Time{}, StatusRunning)
newSucceededJob := writeJob("new_succeeded", earlier, earlier.Add(time.Minute), StatusSucceeded)

j3 := writeJob(earlier, time.Time{}, StatusRunning)
j4 := writeJob(earlier, earlier.Add(time.Minute), StatusSucceeded)
db.CheckQueryResults(t, `SELECT id FROM system.jobs ORDER BY id`, [][]string{
{oldRunningJob}, {oldSucceededJob}, {oldSucceededJob2}, {newRunningJob}, {newSucceededJob}})

db.CheckQueryResults(t, `SELECT id FROM system.jobs ORDER BY id`, [][]string{{j1}, {j2}, {j3}, {j4}})
if err := s.JobRegistry().(*Registry).cleanupOldJobs(ctx, earlier); err != nil {
t.Fatal(err)
}
db.CheckQueryResults(t, `SELECT id FROM system.jobs ORDER BY id`, [][]string{{j1}, {j3}, {j4}})
db.CheckQueryResults(t, `SELECT id FROM system.jobs ORDER BY id`, [][]string{
{oldRunningJob}, {newRunningJob}, {newSucceededJob}})

if err := s.JobRegistry().(*Registry).cleanupOldJobs(ctx, earlier); err != nil {
t.Fatal(err)
}
db.CheckQueryResults(t, `SELECT id FROM system.jobs ORDER BY id`, [][]string{{j1}, {j3}, {j4}})
db.CheckQueryResults(t, `SELECT id FROM system.jobs ORDER BY id`, [][]string{
{oldRunningJob}, {newRunningJob}, {newSucceededJob}})

if err := s.JobRegistry().(*Registry).cleanupOldJobs(ctx, ts.Add(time.Minute*-10)); err != nil {
t.Fatal(err)
}
db.CheckQueryResults(t, `SELECT id FROM system.jobs ORDER BY id`, [][]string{
{oldRunningJob}, {newRunningJob}})

// force the running jobs to become orphaned
_ = setMutations(nil)
if err := s.JobRegistry().(*Registry).cleanupOldJobs(ctx, ts.Add(time.Minute*-10)); err != nil {
t.Fatal(err)
}
db.CheckQueryResults(t, `SELECT id FROM system.jobs ORDER BY id`, [][]string{{j1}, {j3}})
db.CheckQueryResults(t, `SELECT id FROM system.jobs ORDER BY id`, [][]string{})
}

0 comments on commit 646099a

Please sign in to comment.