Merge #59539 #60714
59539: ui: Sessions page reintegration r=elkmaster a=elkmaster

Update cluster-ui to the latest version, with minor updates to the
sessions, session details, and statement details pages to provide the
new props needed by cluster-ui components.

Release note (ui): none

60714: jobs,*: make job clients responsible for generating IDs r=lucy-zhang a=lucy-zhang

jobs,*: make job clients responsible for generating IDs

Job IDs used to be randomly generated in the job registry when creating
a job, which meant that they were not stable across restarts when jobs
were created in a txn closure meant to be idempotent. For
`StartableJobs`, we also created a tracing span and registered the "new"
job each time `CreateStartableJobWithTxn` was called, so these would
leak in the presence of restarts.

This commit adds a job ID argument to registry methods that create jobs,
so that callers can generate stable IDs. It also modifies the
`StartableJob` API to help ensure jobs (identified by a stable ID) are
only registered once in the presence of restarts:
`CreateStartableJobWithTxn` now takes a `*StartableJob`, and will not
create a tracing span and register the job again if the reference is
non-nil. This API is not ideal because it's probably easy to use it
incorrectly, but it at least makes the correct behavior on txn restarts
possible.
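
A minimal sketch of the caller-side pattern this enables, based on the call
sites changed below (backup, restore, import, and changefeed planning). The
function name `createStartableJobExample` and the `execCfg`/`jr` parameters are
illustrative placeholders; the registry calls themselves (`MakeJobID`,
`CreateStartableJobWithTxn`, `CleanupOnRollback`) follow the new API in this
commit:

```go
// Assumed imports: context, and the cockroach packages jobs, kv, sql, and
// util/log. This is a sketch of the calling pattern, not a verbatim excerpt.
func createStartableJobExample(
	ctx context.Context, execCfg *sql.ExecutorConfig, jr jobs.Record,
) (*jobs.StartableJob, error) {
	var sj *jobs.StartableJob
	// Generate the job ID once, outside the retryable transaction closure,
	// so it stays stable across txn restarts.
	jobID := execCfg.JobRegistry.MakeJobID()
	if err := execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
		// If this closure is retried, sj is already non-nil, so the registry
		// does not open another tracing span or register the job again.
		return execCfg.JobRegistry.CreateStartableJobWithTxn(ctx, &sj, jobID, txn, jr)
	}); err != nil {
		if sj != nil {
			if cleanupErr := sj.CleanupOnRollback(ctx); cleanupErr != nil {
				log.Warningf(ctx, "failed to cleanup aborted job: %v", cleanupErr)
			}
		}
		return nil, err
	}
	return sj, nil
}
```

Detached call sites follow the same shape, generating the ID up front and
passing it to `CreateAdoptableJobWithTxn`, as in the backup/restore/import
hunks below.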

Closes #57492

Release note: None

----

jobs: remove workaround for tracing span leak in registry

This reverts the workaround added in
3ae0f06, which closed the tracing span
for job resumers prematurely to avoid a leak.

Related to #60671.

Release note: None

Co-authored-by: v <carrott9@gmail.com>
Co-authored-by: Lucy Zhang <lucy@cockroachlabs.com>
3 people committed Feb 23, 2021
3 parents e9f5f8c + fa84941 + 91152f0 commit 1f6c36c
Showing 37 changed files with 389 additions and 257 deletions.
19 changes: 10 additions & 9 deletions pkg/ccl/backupccl/backup_planning.go
@@ -1224,41 +1224,42 @@ func backupPlanHook(
if backupStmt.Options.Detached {
// When running inside an explicit transaction, we simply create the job
// record. We do not wait for the job to finish.
aj, err := p.ExecCfg().JobRegistry.CreateAdoptableJobWithTxn(
ctx, jr, p.ExtendedEvalContext().Txn)
jobID := p.ExecCfg().JobRegistry.MakeJobID()
_, err := p.ExecCfg().JobRegistry.CreateAdoptableJobWithTxn(
ctx, jr, jobID, p.ExtendedEvalContext().Txn)
if err != nil {
return err
}

if err := doWriteBackupManifestCheckpoint(ctx, *aj.ID()); err != nil {
if err := doWriteBackupManifestCheckpoint(ctx, jobID); err != nil {
return err
}

// The protect timestamp logic for a DETACHED BACKUP can be run within the
// same txn as the BACKUP is being planned in, because we do not wait for
// the BACKUP job to complete.
err = protectTimestampForBackup(ctx, p, p.ExtendedEvalContext().Txn, *aj.ID(), spans,
err = protectTimestampForBackup(ctx, p, p.ExtendedEvalContext().Txn, jobID, spans,
startTime, endTime, backupDetails)
if err != nil {
return err
}

resultsCh <- tree.Datums{tree.NewDInt(tree.DInt(*aj.ID()))}
resultsCh <- tree.Datums{tree.NewDInt(tree.DInt(jobID))}
collectTelemetry()
return nil
}

var sj *jobs.StartableJob
jobID := p.ExecCfg().JobRegistry.MakeJobID()
if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
sj, err = p.ExecCfg().JobRegistry.CreateStartableJobWithTxn(ctx, jr, txn)
if err != nil {
if err := p.ExecCfg().JobRegistry.CreateStartableJobWithTxn(ctx, &sj, jobID, txn, jr); err != nil {
return err
}
if err := doWriteBackupManifestCheckpoint(ctx, *sj.ID()); err != nil {
if err := doWriteBackupManifestCheckpoint(ctx, jobID); err != nil {
return err
}

return protectTimestampForBackup(ctx, p, txn, *sj.ID(), spans, startTime, endTime,
return protectTimestampForBackup(ctx, p, txn, jobID, spans, startTime, endTime,
backupDetails)
}); err != nil {
if sj != nil {
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/restore_job.go
@@ -1766,7 +1766,7 @@ func (r *restoreResumer) dropDescriptors(
Progress: jobspb.SchemaChangeGCProgress{},
NonCancelable: true,
}
if _, err := jr.CreateJobWithTxn(ctx, gcJobRecord, txn); err != nil {
if _, err := jr.CreateJobWithTxn(ctx, gcJobRecord, jr.MakeJobID(), txn); err != nil {
return err
}

14 changes: 6 additions & 8 deletions pkg/ccl/backupccl/restore_planning.go
@@ -1755,23 +1755,21 @@ func doRestorePlan(
if restoreStmt.Options.Detached {
// When running in detached mode, we simply create the job record.
// We do not wait for the job to finish.
aj, err := p.ExecCfg().JobRegistry.CreateAdoptableJobWithTxn(
ctx, jr, p.ExtendedEvalContext().Txn)
jobID := p.ExecCfg().JobRegistry.MakeJobID()
_, err := p.ExecCfg().JobRegistry.CreateAdoptableJobWithTxn(
ctx, jr, jobID, p.ExtendedEvalContext().Txn)
if err != nil {
return err
}
resultsCh <- tree.Datums{tree.NewDInt(tree.DInt(*aj.ID()))}
resultsCh <- tree.Datums{tree.NewDInt(tree.DInt(jobID))}
collectTelemetry()
return nil
}

var sj *jobs.StartableJob
jobID := p.ExecCfg().JobRegistry.MakeJobID()
if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
sj, err = p.ExecCfg().JobRegistry.CreateStartableJobWithTxn(ctx, jr, txn)
if err != nil {
return err
}
return nil
return p.ExecCfg().JobRegistry.CreateStartableJobWithTxn(ctx, &sj, jobID, txn, jr)
}); err != nil {
if sj != nil {
if cleanupErr := sj.CleanupOnRollback(ctx); cleanupErr != nil {
14 changes: 7 additions & 7 deletions pkg/ccl/backupccl/restore_schema_change_creation.go
@@ -146,11 +146,11 @@ func createTypeChangeJobFromDesc(
// Type change jobs are not cancellable.
NonCancelable: true,
}
job, err := jr.CreateJobWithTxn(ctx, record, txn)
if err != nil {
jobID := jr.MakeJobID()
if _, err := jr.CreateJobWithTxn(ctx, record, jobID, txn); err != nil {
return err
}
log.Infof(ctx, "queued new type schema change job %d for type %d", *job.ID(), typ.GetID())
log.Infof(ctx, "queued new type schema change job %d for type %d", jobID, typ.GetID())
return nil
}

@@ -201,18 +201,18 @@ func createSchemaChangeJobsFromMutations(
},
Progress: jobspb.SchemaChangeProgress{},
}
newJob, err := jr.CreateJobWithTxn(ctx, jobRecord, txn)
if err != nil {
jobID := jr.MakeJobID()
if _, err := jr.CreateJobWithTxn(ctx, jobRecord, jobID, txn); err != nil {
return err
}
newMutationJob := descpb.TableDescriptor_MutationJob{
MutationID: mutationID,
JobID: *newJob.ID(),
JobID: jobID,
}
mutationJobs = append(mutationJobs, newMutationJob)

log.Infof(ctx, "queued new schema change job %d for table %d, mutation %d",
*newJob.ID(), tableDesc.ID, mutationID)
jobID, tableDesc.ID, mutationID)
}
tableDesc.MutationJobs = mutationJobs
return nil
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
@@ -40,6 +40,7 @@ go_library(
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/closedts",
"//pkg/kv/kvserver/protectedts",
"//pkg/kv/kvserver/protectedts/ptpb:ptpb_go_proto",
"//pkg/roachpb",
"//pkg/security",
"//pkg/server/telemetry",
23 changes: 12 additions & 11 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
@@ -28,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/settings"
@@ -327,13 +328,17 @@ func changefeedPlanHook(
// changeFrontier.manageProtectedTimestamps for more details on the handling of
// protected timestamps.
var sj *jobs.StartableJob
jobID := p.ExecCfg().JobRegistry.MakeJobID()
{
var protectedTimestampID uuid.UUID
var spansToProtect []roachpb.Span
var ptr *ptpb.Record
if hasInitialScan := initialScanFromOptions(details.Opts); hasInitialScan {
protectedTimestampID = uuid.MakeV4()
spansToProtect = makeSpansToProtect(p.ExecCfg().Codec, details.Targets)
progress.GetChangefeed().ProtectedTimestampRecord = protectedTimestampID
ptr = jobsprotectedts.MakeRecord(protectedTimestampID, jobID,
statementTime, spansToProtect)
}

jr := jobs.Record{
@@ -348,19 +353,15 @@
Details: details,
Progress: *progress.GetChangefeed(),
}
createJobAndProtectedTS := func(ctx context.Context, txn *kv.Txn) (err error) {
sj, err = p.ExecCfg().JobRegistry.CreateStartableJobWithTxn(ctx, jr, txn)
if err != nil {
if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
if err := p.ExecCfg().JobRegistry.CreateStartableJobWithTxn(ctx, &sj, jobID, txn, jr); err != nil {
return err
}
if protectedTimestampID == uuid.Nil {
return nil
if ptr != nil {
return p.ExecCfg().ProtectedTimestampProvider.Protect(ctx, txn, ptr)
}
ptr := jobsprotectedts.MakeRecord(protectedTimestampID, *sj.ID(),
statementTime, spansToProtect)
return p.ExecCfg().ProtectedTimestampProvider.Protect(ctx, txn, ptr)
}
if err := p.ExecCfg().DB.Txn(ctx, createJobAndProtectedTS); err != nil {
return nil
}); err != nil {
if sj != nil {
if err := sj.CleanupOnRollback(ctx); err != nil {
log.Warningf(ctx, "failed to cleanup aborted job: %v", err)
@@ -392,7 +393,7 @@
case <-ctx.Done():
return ctx.Err()
case resultsCh <- tree.Datums{
tree.NewDInt(tree.DInt(*sj.ID())),
tree.NewDInt(tree.DInt(jobID)),
}:
return nil
}
18 changes: 10 additions & 8 deletions pkg/ccl/importccl/import_stmt.go
@@ -914,30 +914,31 @@ func importPlanHook(
if isDetached {
// When running inside an explicit transaction, we simply create the job
// record. We do not wait for the job to finish.
aj, err := p.ExecCfg().JobRegistry.CreateAdoptableJobWithTxn(
ctx, jr, p.ExtendedEvalContext().Txn)
jobID := p.ExecCfg().JobRegistry.MakeJobID()
_, err := p.ExecCfg().JobRegistry.CreateAdoptableJobWithTxn(
ctx, jr, jobID, p.ExtendedEvalContext().Txn)
if err != nil {
return err
}

if err = protectTimestampForImport(ctx, p, p.ExtendedEvalContext().Txn, *aj.ID(), spansToProtect,
if err = protectTimestampForImport(ctx, p, p.ExtendedEvalContext().Txn, jobID, spansToProtect,
walltime, importDetails); err != nil {
return err
}

addToFileFormatTelemetry(format.Format.String(), "started")
resultsCh <- tree.Datums{tree.NewDInt(tree.DInt(*aj.ID()))}
resultsCh <- tree.Datums{tree.NewDInt(tree.DInt(jobID))}
return nil
}

var sj *jobs.StartableJob
jobID := p.ExecCfg().JobRegistry.MakeJobID()
if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
sj, err = p.ExecCfg().JobRegistry.CreateStartableJobWithTxn(ctx, jr, txn)
if err != nil {
if err := p.ExecCfg().JobRegistry.CreateStartableJobWithTxn(ctx, &sj, jobID, txn, jr); err != nil {
return err
}

return protectTimestampForImport(ctx, p, txn, *sj.ID(), spansToProtect, walltime, importDetails)
return protectTimestampForImport(ctx, p, txn, jobID, spansToProtect, walltime, importDetails)
}); err != nil {
if sj != nil {
if cleanupErr := sj.CleanupOnRollback(ctx); cleanupErr != nil {
@@ -1915,7 +1916,8 @@ func (r *importResumer) dropTables(
Progress: jobspb.SchemaChangeGCProgress{},
NonCancelable: true,
}
if _, err := execCfg.JobRegistry.CreateJobWithTxn(ctx, gcJobRecord, txn); err != nil {
if _, err := execCfg.JobRegistry.CreateJobWithTxn(
ctx, gcJobRecord, execCfg.JobRegistry.MakeJobID(), txn); err != nil {
return err
}

2 changes: 1 addition & 1 deletion pkg/ccl/importccl/import_stmt_test.go
@@ -4716,7 +4716,7 @@ func TestImportControlJobRBAC(t *testing.T) {
})

startLeasedJob := func(t *testing.T, record jobs.Record) *jobs.StartableJob {
job, err := registry.CreateAndStartJob(ctx, nil, record)
job, err := jobs.TestingCreateAndStartJob(ctx, registry, tc.Server(0).DB(), record)
require.NoError(t, err)
return job
}
@@ -63,7 +63,7 @@ func TestStreamIngestionJobRollBack(t *testing.T) {
},
Progress: jobspb.StreamIngestionProgress{},
}
j, err := registry.CreateAndStartJob(ctx, nil, streamIngestJobRecord)
j, err := jobs.TestingCreateAndStartJob(ctx, registry, tc.Server(0).DB(), streamIngestJobRecord)
require.NoError(t, err)

// Insert more data in the table. These changes should be rollback during job
@@ -98,9 +98,9 @@ func ingestionPlanHook(
}

var sj *jobs.StartableJob
jobID := p.ExecCfg().JobRegistry.MakeJobID()
if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
sj, err = p.ExecCfg().JobRegistry.CreateStartableJobWithTxn(ctx, jr, txn)
return err
return p.ExecCfg().JobRegistry.CreateStartableJobWithTxn(ctx, &sj, jobID, txn, jr)
}); err != nil {
if sj != nil {
if cleanupErr := sj.CleanupOnRollback(ctx); cleanupErr != nil {
2 changes: 1 addition & 1 deletion pkg/ccl/streamingccl/streamingutils/utils_test.go
@@ -50,7 +50,7 @@ func TestCutoverBuiltin(t *testing.T) {
},
Progress: jobspb.StreamIngestionProgress{},
}
job, err := registry.CreateAndStartJob(ctx, nil, streamIngestJobRecord)
job, err := jobs.TestingCreateAndStartJob(ctx, registry, tc.Server(0).DB(), streamIngestJobRecord)
require.NoError(t, err)

// Check that sentinel is not set.
1 change: 1 addition & 0 deletions pkg/jobs/BUILD.bazel
@@ -88,6 +88,7 @@ go_test(
"//pkg/jobs/jobstest",
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvserver",
"//pkg/roachpb",
"//pkg/scheduledjobs",
"//pkg/security",
5 changes: 0 additions & 5 deletions pkg/jobs/deprecated.go
@@ -377,11 +377,6 @@ func (r *Registry) deprecatedResume(ctx context.Context, resumer Resumer, job *J
func (j *Job) deprecatedInsert(
ctx context.Context, txn *kv.Txn, id int64, lease *jobspb.Lease, session sqlliveness.Session,
) error {
if j.id != nil {
// Already created - do nothing.
return nil
}

j.mu.payload.Lease = lease

if err := j.runInTxn(ctx, txn, func(ctx context.Context, txn *kv.Txn) error {
6 changes: 1 addition & 5 deletions pkg/jobs/helpers_test.go
@@ -15,7 +15,6 @@ import (

"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/errors"
)

// FakeResumer calls optional callbacks during the job lifecycle.
@@ -69,10 +68,7 @@ func (j *Job) Started(ctx context.Context) error {

// Created is a test only function that inserts a new jobs table row.
func (j *Job) Created(ctx context.Context) error {
if j.ID() != nil {
return errors.Errorf("job already created with ID %v", *j.ID())
}
return j.deprecatedInsert(ctx, nil /* txn */, j.registry.makeJobID(), nil /* lease */, nil /* session */)
return j.deprecatedInsert(ctx, nil /* txn */, *j.ID(), nil /* lease */, nil /* session */)
}

// Paused is a wrapper around the internal function that moves a job to the