Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

jobsprofiler: store DistSQL diagram of jobs in job info #99522

Merged
merged 1 commit into from
Mar 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ ALL_TESTS = [
"//pkg/jobs/joberror:joberror_test",
"//pkg/jobs/jobsauth:jobsauth_test",
"//pkg/jobs/jobspb:jobspb_test",
"//pkg/jobs/jobsprofiler:jobsprofiler_test",
"//pkg/jobs/jobsprotectedts:jobsprotectedts_test",
"//pkg/jobs:jobs_test",
"//pkg/keys:keys_test",
Expand Down Expand Up @@ -1159,6 +1160,8 @@ GO_TARGETS = [
"//pkg/jobs/jobsauth:jobsauth_test",
"//pkg/jobs/jobspb:jobspb",
"//pkg/jobs/jobspb:jobspb_test",
"//pkg/jobs/jobsprofiler:jobsprofiler",
"//pkg/jobs/jobsprofiler:jobsprofiler_test",
"//pkg/jobs/jobsprotectedts:jobsprotectedts",
"//pkg/jobs/jobsprotectedts:jobsprotectedts_test",
"//pkg/jobs/jobstest:jobstest",
Expand Down Expand Up @@ -2632,6 +2635,7 @@ GET_X_DATA_TARGETS = [
"//pkg/jobs/joberror:get_x_data",
"//pkg/jobs/jobsauth:get_x_data",
"//pkg/jobs/jobspb:get_x_data",
"//pkg/jobs/jobsprofiler:get_x_data",
"//pkg/jobs/jobsprotectedts:get_x_data",
"//pkg/jobs/jobstest:get_x_data",
"//pkg/jobs/metricspoller:get_x_data",
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ go_library(
"//pkg/jobs",
"//pkg/jobs/joberror",
"//pkg/jobs/jobspb",
"//pkg/jobs/jobsprofiler",
"//pkg/jobs/jobsprotectedts",
"//pkg/keys",
"//pkg/kv",
Expand Down
8 changes: 8 additions & 0 deletions pkg/ccl/backupccl/backup_processor_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb"
"github.com/cockroachdb/cockroach/pkg/cloud"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -170,7 +171,11 @@ func distBackup(
// Setup a one-stage plan with one proc per input spec.
corePlacement := make([]physicalplan.ProcessorCorePlacement, len(backupSpecs))
i := 0
var jobID jobspb.JobID
for sqlInstanceID, spec := range backupSpecs {
if i == 0 {
jobID = jobspb.JobID(spec.JobID)
}
corePlacement[i].SQLInstanceID = sqlInstanceID
corePlacement[i].Core.BackupData = spec
i++
Expand Down Expand Up @@ -206,6 +211,9 @@ func distBackup(
defer recv.Release()

defer close(progCh)
execCfg := execCtx.ExecCfg()
jobsprofiler.StorePlanDiagram(ctx, execCfg.DistSQLSrv.Stopper, p, execCfg.InternalDB, jobID)

// Copy the evalCtx, as dsp.Run() might change it.
evalCtxCopy := *evalCtx
dsp.Run(ctx, planCtx, noTxn, p, recv, &evalCtxCopy, nil /* finishedSetupFn */)
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ func restore(
return distRestore(
ctx,
execCtx,
int64(job.ID()),
job.ID(),
dataToRestore,
endTime,
encryption,
Expand Down
10 changes: 7 additions & 3 deletions pkg/ccl/backupccl/restore_processor_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/cloud"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/settings"
Expand Down Expand Up @@ -60,7 +61,7 @@ var replanRestoreFrequency = settings.RegisterDurationSetting(
func distRestore(
ctx context.Context,
execCtx sql.JobExecContext,
jobID int64,
jobID jobspb.JobID,
dataToRestore restorationData,
restoreTime hlc.Timestamp,
encryption *jobspb.BackupEncryptionOptions,
Expand Down Expand Up @@ -111,7 +112,7 @@ func distRestore(
p := planCtx.NewPhysicalPlan()

restoreDataSpec := execinfrapb.RestoreDataSpec{
JobID: jobID,
JobID: int64(jobID),
RestoreTime: restoreTime,
Encryption: fileEncryption,
TableRekeys: dataToRestore.getRekeys(),
Expand Down Expand Up @@ -184,7 +185,7 @@ func distRestore(
NumNodes: int64(numNodes),
UseSimpleImportSpans: useSimpleImportSpans,
UseFrontierCheckpointing: spanFilter.useFrontierCheckpointing,
JobID: jobID,
JobID: int64(jobID),
}
if spanFilter.useFrontierCheckpointing {
spec.CheckpointedSpans = persistFrontier(spanFilter.checkpointFrontier, 0)
Expand Down Expand Up @@ -292,6 +293,9 @@ func distRestore(
)
defer recv.Release()

execCfg := execCtx.ExecCfg()
jobsprofiler.StorePlanDiagram(ctx, execCfg.DistSQLSrv.Stopper, p, execCfg.InternalDB, jobID)

// Copy the evalCtx, as dsp.Run() might change it.
evalCtxCopy := *evalCtx
dsp.Run(ctx, planCtx, noTxn, p, recv, &evalCtxCopy, nil /* finishedSetupFn */)
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ go_library(
"//pkg/jobs",
"//pkg/jobs/jobsauth",
"//pkg/jobs/jobspb",
"//pkg/jobs/jobsprofiler",
"//pkg/jobs/jobsprotectedts",
"//pkg/keys",
"//pkg/kv",
Expand Down
3 changes: 3 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_dist.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvfeed"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -314,6 +315,8 @@ func startDistChangefeed(
finishedSetupFn = func(flowinfra.Flow) { resultsCh <- tree.Datums(nil) }
}

jobsprofiler.StorePlanDiagram(ctx, execCfg.DistSQLSrv.Stopper, p, execCfg.InternalDB, jobID)

// Copy the evalCtx, as dsp.Run() might change it.
evalCtxCopy := *evalCtx
// p is the physical plan, recv is the distsqlreceiver
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/streamingccl/streamingest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ go_library(
"//pkg/cloud/externalconn/connectionpb",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/jobs/jobsprofiler",
"//pkg/jobs/jobsprotectedts",
"//pkg/keys",
"//pkg/kv",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -110,7 +111,11 @@ func distStreamIngest(

// Setup a one-stage plan with one proc per input spec.
corePlacement := make([]physicalplan.ProcessorCorePlacement, len(streamIngestionSpecs))
var jobID jobspb.JobID
for i := range streamIngestionSpecs {
if i == 0 {
jobID = jobspb.JobID(streamIngestionSpecs[i].JobID)
}
corePlacement[i].SQLInstanceID = sqlInstanceIDs[i]
corePlacement[i].Core.StreamIngestionData = streamIngestionSpecs[i]
}
Expand Down Expand Up @@ -151,6 +156,8 @@ func distStreamIngest(
)
defer recv.Release()

jobsprofiler.StorePlanDiagram(ctx, execCfg.DistSQLSrv.Stopper, p, execCfg.InternalDB, jobID)

// Copy the evalCtx, as dsp.Run() might change it.
evalCtxCopy := *evalCtx
dsp.Run(ctx, planCtx, noTxn, p, recv, &evalCtxCopy, nil /* finishedSetupFn */)
Expand Down
49 changes: 49 additions & 0 deletions pkg/jobs/jobsprofiler/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "jobsprofiler",
srcs = ["profiler.go"],
importpath = "github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler",
visibility = ["//visibility:public"],
deps = [
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/sql",
"//pkg/sql/execinfrapb",
"//pkg/sql/isql",
"//pkg/util/log",
"//pkg/util/stop",
"//pkg/util/timeutil",
],
)

go_test(
name = "jobsprofiler_test",
srcs = [
"main_test.go",
"profiler_test.go",
],
args = ["-test.timeout=295s"],
deps = [
"//pkg/base",
"//pkg/ccl",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/sql",
"//pkg/sql/isql",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/testcluster",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/randutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
],
)

get_x_data(name = "get_x_data")
35 changes: 35 additions & 0 deletions pkg/jobs/jobsprofiler/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright 2017 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package jobsprofiler_test

import (
"os"
"testing"

"github.com/cockroachdb/cockroach/pkg/ccl"
"github.com/cockroachdb/cockroach/pkg/security/securityassets"
"github.com/cockroachdb/cockroach/pkg/security/securitytest"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
)

func TestMain(m *testing.M) {
defer ccl.TestingEnableEnterprise()()
securityassets.SetLoader(securitytest.EmbeddedAssets)
randutil.SeedForTests()
serverutils.InitTestServerFactory(server.TestServerFactory)
serverutils.InitTestClusterFactory(testcluster.TestClusterFactory)
os.Exit(m.Run())
}

//go:generate ../../util/leaktest/add-leaktest.sh *_test.go
58 changes: 58 additions & 0 deletions pkg/jobs/jobsprofiler/profiler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package jobsprofiler

import (
"context"
"fmt"

"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

// StorePlanDiagram stores the DistSQL diagram generated from p in the job info
// table. The generation of the plan diagram and persistence to the info table
// are done asynchronously and this method does not block on their completion.
func StorePlanDiagram(
ctx context.Context, stopper *stop.Stopper, p *sql.PhysicalPlan, db isql.DB, jobID jobspb.JobID,
) {
if err := stopper.RunAsyncTask(ctx, "jobs-store-plan-diagram", func(ctx context.Context) {
var cancel func()
ctx, cancel = stopper.WithCancelOnQuiesce(ctx)
defer cancel()

err := db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
flowSpecs := p.GenerateFlowSpecs()
_, diagURL, err := execinfrapb.GeneratePlanDiagramURL(fmt.Sprintf("job:%d", jobID), flowSpecs, execinfrapb.DiagramFlags{})
if err != nil {
return err
}

const infoKey = "dsp-diag-url-%d"
infoStorage := jobs.InfoStorageForJob(txn, jobID)
return infoStorage.Write(ctx, []byte(fmt.Sprintf(infoKey, timeutil.Now().UnixNano())),
[]byte(diagURL.String()))
})
if err != nil {
log.Warningf(ctx, "failed to generate and write DistSQL diagram for job %d: %v",
jobID, err.Error())
}
}); err != nil {
log.Warningf(ctx, "failed to spawn task to generate DistSQL plan diagram for job %d: %v",
jobID, err.Error())
}
}
Loading