Merge #65324
65324: cli: add cockroach debug job-trace r=pbardea,knz a=adityamaru

This change adds a new CLI command, `cockroach debug job-trace`, that
can be used to grab information from the inflight trace spans
associated with the execution of a particular job. The command takes a
`jobID` and a file destination where the information will be dumped.
It also requires a `--url` pointing to the node from which to pull the
inflight spans.

In the future we will have a means of pulling inflight spans from all
nodes in the cluster at once, but at the moment this command must be
run against every node to get a complete picture of job execution.
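
An invocation might look like the following (a sketch; the job ID,
output path, and connection URL are illustrative placeholders):

```
# Hypothetical example: pull the inflight trace spans for job 678910 from the
# node reachable at --url and write them to /tmp/job-trace.txt as CSV.
cockroach debug job-trace 678910 /tmp/job-trace.txt \
  --url='postgresql://root@localhost:26257?sslmode=disable' --format=csv
```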

Informs: #64992

Release note (cli change): Adds a `cockroach debug job-trace` command
that takes two arguments, a job ID and a file destination, along with
a `--url` flag pointing to the node against which to run the command.
The command pulls information about the inflight trace spans
associated with the job and dumps it to the file destination.


Co-authored-by: Aditya Maru <adityamaru@gmail.com>
craig[bot] and adityamaru committed Jun 1, 2021
2 parents 5fb0e29 + a7a8da0 commit 7762f00
Showing 10 changed files with 422 additions and 108 deletions.
6 changes: 6 additions & 0 deletions pkg/cli/BUILD.bazel
@@ -15,6 +15,7 @@ go_library(
        "cpuprofile.go",
        "debug.go",
        "debug_check_store.go",
        "debug_job_trace.go",
        "debug_list_files.go",
        "debug_logconfig.go",
        "debug_merge_logs.go",
@@ -276,6 +277,7 @@ go_test(
        "cli_test.go",
        "connect_join_test.go",
        "debug_check_store_test.go",
        "debug_job_trace_test.go",
        "debug_list_files_test.go",
        "debug_merge_logs_test.go",
        "debug_test.go",
@@ -307,6 +309,7 @@ go_test(
        "//pkg/cli/exit",
        "//pkg/gossip",
        "//pkg/gossip/resolver",
        "//pkg/jobs",
        "//pkg/jobs/jobspb",
        "//pkg/keys",
        "//pkg/kv",
@@ -338,6 +341,7 @@ go_test(
        "//pkg/testutils/sqlutils",
        "//pkg/testutils/testcluster",
        "//pkg/util",
        "//pkg/util/encoding/csv",
        "//pkg/util/leaktest",
        "//pkg/util/log",
        "//pkg/util/log/channel",
@@ -346,10 +350,12 @@ go_test(
        "//pkg/util/protoutil",
        "//pkg/util/stop",
        "//pkg/util/timeutil",
        "//pkg/util/tracing",
        "//pkg/workload/examples",
        "@com_github_cockroachdb_datadriven//:datadriven",
        "@com_github_cockroachdb_errors//:errors",
        "@com_github_cockroachdb_pebble//:pebble",
        "@com_github_kr_pretty//:pretty",
        "@com_github_lib_pq//:pq",
        "@com_github_spf13_cobra//:cobra",
        "@com_github_spf13_pflag//:pflag",
2 changes: 2 additions & 0 deletions pkg/cli/debug.go
@@ -1398,6 +1398,8 @@ func init() {
	debugDoctorCmd.AddCommand(doctorExamineCmd, doctorRecreateCmd, doctorExamineFallbackClusterCmd, doctorExamineFallbackZipDirCmd)
	DebugCmd.AddCommand(debugDoctorCmd)

	DebugCmd.AddCommand(debugJobTraceFromClusterCmd)

	f := debugSyncBenchCmd.Flags()
	f.IntVarP(&syncBenchOpts.Concurrency, "concurrency", "c", syncBenchOpts.Concurrency,
		"number of concurrent writers")
112 changes: 112 additions & 0 deletions pkg/cli/debug_job_trace.go
@@ -0,0 +1,112 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package cli

import (
	"database/sql/driver"
	"fmt"
	"io"
	"os"
	"strconv"

	"github.com/cockroachdb/errors"
	"github.com/spf13/cobra"
)

var debugJobTraceFromClusterCmd = &cobra.Command{
	Use:   "job-trace <job_id> <file_path> --url=<cluster connection string>",
	Short: "get the trace payloads for the executing job",
	Args:  cobra.MinimumNArgs(2),
	RunE:  MaybeDecorateGRPCError(runDebugJobTrace),
}

func runDebugJobTrace(_ *cobra.Command, args []string) error {
	jobID, err := strconv.ParseInt(args[0], 10, 64)
	if err != nil {
		return err
	}

	sqlConn, err := makeSQLClient("cockroach debug job-trace", useSystemDb)
	if err != nil {
		return errors.Wrap(err, "could not establish connection to cluster")
	}
	defer sqlConn.Close()

	return writeTracePayloadJSON(sqlConn, jobID, args[1])
}

func getJobTraceID(sqlConn *sqlConn, jobID int64) (int64, error) {
	var traceID int64
	rows, err := sqlConn.Query(`SELECT trace_id FROM crdb_internal.jobs WHERE job_id=$1`, []driver.Value{jobID})
	if err != nil {
		return traceID, err
	}
	vals := make([]driver.Value, 1)
	for {
		var err error
		if err = rows.Next(vals); err == io.EOF {
			break
		}
		if err != nil {
			return traceID, err
		}
	}
	if err := rows.Close(); err != nil {
		return traceID, err
	}
	if vals[0] == nil {
		return traceID, errors.Newf("no job entry found for %d", jobID)
	}
	var ok bool
	traceID, ok = vals[0].(int64)
	if !ok {
		return traceID, errors.New("failed to parse traceID")
	}
	return traceID, nil
}

func writeTracePayloadJSON(sqlConn *sqlConn, jobID int64, traceFilePath string) error {
	maybePrint := func(stmt string) string {
		if debugCtx.verbose {
			fmt.Println("querying " + stmt)
		}
		return stmt
	}

	// Check if a timeout has been set for this command.
	if cliCtx.cmdTimeout != 0 {
		stmt := fmt.Sprintf(`SET statement_timeout = '%s'`, cliCtx.cmdTimeout)
		if err := sqlConn.Exec(maybePrint(stmt), nil); err != nil {
			return err
		}
	}

	traceID, err := getJobTraceID(sqlConn, jobID)
	if err != nil {
		return err
	}

	var inflightSpanQuery = `
WITH spans AS(
SELECT span_id, goroutine_id, operation, start_time, duration
FROM crdb_internal.node_inflight_trace_spans
WHERE trace_id=$1
) SELECT *
FROM spans, LATERAL crdb_internal.payloads_for_span(spans.span_id)`

	var f *os.File
	if f, err = os.Create(traceFilePath); err != nil {
		return err
	}
	defer f.Close()

	return runQueryAndFormatResults(sqlConn, f, makeQuery(inflightSpanQuery, traceID))
}
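
For context, the two statements issued by `debug_job_trace.go` above can also
be run by hand through the SQL shell; a rough equivalent (the job ID, trace ID,
and connection URL are illustrative placeholders) would be:

```
# Hypothetical manual equivalent: resolve the job's trace ID, then dump the
# payloads of its inflight spans on this node as CSV.
cockroach sql --url='postgresql://root@localhost:26257?sslmode=disable' \
  -e "SELECT trace_id FROM crdb_internal.jobs WHERE job_id = 678910"

cockroach sql --url='postgresql://root@localhost:26257?sslmode=disable' --format=csv \
  -e "WITH spans AS (
        SELECT span_id, goroutine_id, operation, start_time, duration
        FROM crdb_internal.node_inflight_trace_spans
        WHERE trace_id = 42)
      SELECT * FROM spans, LATERAL crdb_internal.payloads_for_span(spans.span_id)"
```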
177 changes: 177 additions & 0 deletions pkg/cli/debug_job_trace_test.go
@@ -0,0 +1,177 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package cli

import (
	"context"
	"fmt"
	"io/ioutil"
	"net/url"
	"path/filepath"
	"strconv"
	"strings"
	"testing"
	"time"

	"github.com/cockroachdb/cockroach/pkg/jobs"
	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
	"github.com/cockroachdb/cockroach/pkg/kv"
	"github.com/cockroachdb/cockroach/pkg/security"
	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
	"github.com/cockroachdb/cockroach/pkg/testutils"
	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
	"github.com/cockroachdb/cockroach/pkg/util/encoding/csv"
	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
	"github.com/cockroachdb/cockroach/pkg/util/tracing"
	"github.com/cockroachdb/errors"
	"github.com/kr/pretty"
	"github.com/stretchr/testify/require"
)

// A special jobs.Resumer that records a structured payload in its trace span
// and then blocks until the test signals it to complete.
var _ jobs.Resumer = &traceSpanResumer{}
var _ jobs.TraceableJob = &traceSpanResumer{}

func (r *traceSpanResumer) ForceRealSpan() {}

type traceSpanResumer struct {
	ctx               context.Context
	recordedSpanCh    chan struct{}
	completeResumerCh chan struct{}
}

func (r *traceSpanResumer) Resume(ctx context.Context, _ interface{}) error {
	_, span := tracing.ChildSpan(ctx, "trace test")
	defer span.Finish()
	// Picked a random proto message that was simple to match output against.
	span.RecordStructured(&serverpb.TableStatsRequest{Database: "foo", Table: "bar"})
	r.recordedSpanCh <- struct{}{}
	<-r.completeResumerCh
	return nil
}

func (r *traceSpanResumer) OnFailOrCancel(ctx context.Context, execCtx interface{}) error {
	return errors.New("unimplemented")
}

// Header from the output of `cockroach debug job-trace`.
var jobTraceHeader = []string{
	"span_id", "goroutine_id", "operation", "start_time", "duration", "payload_type", "payload_jsonb",
}

func TestDebugJobTrace(t *testing.T) {
	defer leaktest.AfterTest(t)()
	ctx := context.Background()
	defer jobs.TestingSetAdoptAndCancelIntervals(100*time.Millisecond, 100*time.Millisecond)()

	c := NewCLITest(TestCLIParams{T: t})
	defer c.Cleanup()
	c.omitArgs = true

	registry := c.TestServer.JobRegistry().(*jobs.Registry)
	jobCtx, _ := context.WithCancel(ctx)
	completeResumerCh := make(chan struct{})
	recordedSpanCh := make(chan struct{})
	defer close(completeResumerCh)
	defer close(recordedSpanCh)

	jobs.RegisterConstructor(
		jobspb.TypeBackup,
		func(job *jobs.Job, _ *cluster.Settings) jobs.Resumer {
			return &traceSpanResumer{
				ctx:               jobCtx,
				completeResumerCh: completeResumerCh,
				recordedSpanCh:    recordedSpanCh,
			}
		},
	)

	// Create a "backup job"; the resumer constructor has been overridden above
	// to inject our traceSpanResumer.
	var job *jobs.StartableJob
	id := registry.MakeJobID()
	require.NoError(t, c.TestServer.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
		err = registry.CreateStartableJobWithTxn(ctx, &job, id, txn, jobs.Record{
			Username: security.RootUserName(),
			Details:  jobspb.BackupDetails{},
			Progress: jobspb.BackupProgress{},
		})
		return err
	}))

	require.NoError(t, job.Start(ctx))

	// Wait for the job to record information in the trace span.
	<-recordedSpanCh

	tempDir, cleanup := testutils.TempDir(t)
	defer cleanup()
	targetFilePath := filepath.Join(tempDir, "tracetest")
	args := []string{strconv.Itoa(int(id)), targetFilePath}
	pgURL, _ := sqlutils.PGUrl(t, c.TestServer.ServingSQLAddr(),
		"TestDebugJobTrace", url.User(security.RootUser))

	_, err := c.RunWithCaptureArgs([]string{`debug`, `job-trace`, args[0], args[1], fmt.Sprintf(`--url=%s`, pgURL.String()), `--format=csv`})
	require.NoError(t, err)
	actual, err := ioutil.ReadFile(targetFilePath)
	if err != nil {
		t.Errorf("Failed to read actual result from %v: %v", targetFilePath, err)
	}

	if err := matchCSVHeader(string(actual), jobTraceHeader); err != nil {
		t.Fatal(err)
	}

	operationName := fmt.Sprintf("BACKUP-%d", id)
	exp := [][]string{
		// This is the span recording we injected above.
		{`\d+`, `\d+`, operationName, ".*", ".*", "server.serverpb.TableStatsRequest", "{\"@type\": \"type.googleapis.com/cockroach.server.serverpb.TableStatsRequest\", \"database\": \"foo\", \"table\": \"bar\"}"},
	}
	if err := MatchCSV(string(actual), exp); err != nil {
		t.Fatal(err)
	}
}

func matchCSVHeader(csvStr string, expectedHeader []string) (err error) {
	defer func() {
		if err != nil {
			err = errors.Errorf("csv input:\n%v\nexpected:\n%s\nerrors:%s",
				csvStr, pretty.Sprint(expectedHeader), err)
		}
	}()

	csvStr = ElideInsecureDeprecationNotice(csvStr)
	reader := csv.NewReader(strings.NewReader(csvStr))
	reader.FieldsPerRecord = -1
	records, err := reader.ReadAll()
	if err != nil {
		return err
	}

	if len(records) < 1 {
		return errors.Errorf("csv is empty")
	}

	// Only match the first record, i.e. the expectedHeader.
	headerRow := records[0]
	if lr, lm := len(headerRow), len(expectedHeader); lr != lm {
		return errors.Errorf("csv header has %d columns, but expected %d", lr, lm)
	}
	for j := range expectedHeader {
		exp, act := expectedHeader[j], headerRow[j]
		if exp != act {
			err = errors.Errorf("found %q which does not match %q", act, exp)
		}
	}
	return err
}
5 changes: 5 additions & 0 deletions pkg/cli/flags.go
@@ -599,6 +599,7 @@ func init() {
	}

	clientCmds := []*cobra.Command{
		debugJobTraceFromClusterCmd,
		debugGossipValuesCmd,
		debugTimeSeriesDumpCmd,
		debugZipCmd,
@@ -644,6 +645,7 @@ func init() {
	timeoutCmds := []*cobra.Command{
		statusNodeCmd,
		lsNodesCmd,
		debugJobTraceFromClusterCmd,
		debugZipCmd,
		doctorExamineClusterCmd,
		doctorExamineFallbackClusterCmd,
@@ -715,6 +717,7 @@ func init() {
	sqlCmds := []*cobra.Command{
		sqlShellCmd,
		demoCmd,
		debugJobTraceFromClusterCmd,
		doctorExamineClusterCmd,
		doctorExamineFallbackClusterCmd,
		doctorRecreateClusterCmd,
@@ -778,6 +781,7 @@ func init() {
			demoCmd,
			debugListFilesCmd,
			debugTimeSeriesDumpCmd,
			debugJobTraceFromClusterCmd,
		},
		demoCmd.Commands()...)
	tableOutputCommands = append(tableOutputCommands, nodeCmds...)
@@ -905,6 +909,7 @@ func init() {
	}
	{
		for _, c := range []*cobra.Command{
			debugJobTraceFromClusterCmd,
			doctorExamineClusterCmd,
			doctorExamineZipDirCmd,
			doctorExamineFallbackClusterCmd,