diff --git a/pkg/cli/BUILD.bazel b/pkg/cli/BUILD.bazel index c25f8b31f978..071883cbc3ba 100644 --- a/pkg/cli/BUILD.bazel +++ b/pkg/cli/BUILD.bazel @@ -15,6 +15,7 @@ go_library( "cpuprofile.go", "debug.go", "debug_check_store.go", + "debug_job_trace.go", "debug_list_files.go", "debug_logconfig.go", "debug_merge_logs.go", @@ -276,6 +277,7 @@ go_test( "cli_test.go", "connect_join_test.go", "debug_check_store_test.go", + "debug_job_trace_test.go", "debug_list_files_test.go", "debug_merge_logs_test.go", "debug_test.go", @@ -307,6 +309,7 @@ go_test( "//pkg/cli/exit", "//pkg/gossip", "//pkg/gossip/resolver", + "//pkg/jobs", "//pkg/jobs/jobspb", "//pkg/keys", "//pkg/kv", @@ -338,6 +341,7 @@ go_test( "//pkg/testutils/sqlutils", "//pkg/testutils/testcluster", "//pkg/util", + "//pkg/util/encoding/csv", "//pkg/util/leaktest", "//pkg/util/log", "//pkg/util/log/channel", @@ -346,10 +350,12 @@ go_test( "//pkg/util/protoutil", "//pkg/util/stop", "//pkg/util/timeutil", + "//pkg/util/tracing", "//pkg/workload/examples", "@com_github_cockroachdb_datadriven//:datadriven", "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_pebble//:pebble", + "@com_github_kr_pretty//:pretty", "@com_github_lib_pq//:pq", "@com_github_spf13_cobra//:cobra", "@com_github_spf13_pflag//:pflag", diff --git a/pkg/cli/debug.go b/pkg/cli/debug.go index 699cd498d329..4cf3c6134212 100644 --- a/pkg/cli/debug.go +++ b/pkg/cli/debug.go @@ -1398,6 +1398,8 @@ func init() { debugDoctorCmd.AddCommand(doctorExamineCmd, doctorRecreateCmd, doctorExamineFallbackClusterCmd, doctorExamineFallbackZipDirCmd) DebugCmd.AddCommand(debugDoctorCmd) + DebugCmd.AddCommand(debugJobTraceFromClusterCmd) + f := debugSyncBenchCmd.Flags() f.IntVarP(&syncBenchOpts.Concurrency, "concurrency", "c", syncBenchOpts.Concurrency, "number of concurrent writers") diff --git a/pkg/cli/debug_job_trace.go b/pkg/cli/debug_job_trace.go new file mode 100644 index 000000000000..685eafbde0dc --- /dev/null +++ b/pkg/cli/debug_job_trace.go @@ -0,0 +1,112 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package cli + +import ( + "database/sql/driver" + "fmt" + "io" + "os" + "strconv" + + "github.com/cockroachdb/errors" + "github.com/spf13/cobra" +) + +var debugJobTraceFromClusterCmd = &cobra.Command{ + Use: "job-trace --url=", + Short: "get the trace payloads for the executing job", + Args: cobra.MinimumNArgs(2), + RunE: MaybeDecorateGRPCError(runDebugJobTrace), +} + +func runDebugJobTrace(_ *cobra.Command, args []string) error { + jobID, err := strconv.ParseInt(args[0], 10, 64) + if err != nil { + return err + } + + sqlConn, err := makeSQLClient("cockroach debug job-trace", useSystemDb) + if err != nil { + return errors.Wrap(err, "could not establish connection to cluster") + } + defer sqlConn.Close() + + return writeTracePayloadJSON(sqlConn, jobID, args[1]) +} + +func getJobTraceID(sqlConn *sqlConn, jobID int64) (int64, error) { + var traceID int64 + rows, err := sqlConn.Query(`SELECT trace_id FROM crdb_internal.jobs WHERE job_id=$1`, []driver.Value{jobID}) + if err != nil { + return traceID, err + } + vals := make([]driver.Value, 1) + for { + var err error + if err = rows.Next(vals); err == io.EOF { + break + } + if err != nil { + return traceID, err + } + } + if err := rows.Close(); err != nil { + return traceID, err + } + if vals[0] == nil { + return traceID, errors.Newf("no job entry found for %d", jobID) + } + var ok bool + traceID, ok = vals[0].(int64) + if !ok { + return traceID, errors.New("failed to parse traceID") + } + return traceID, nil +} + +func writeTracePayloadJSON(sqlConn *sqlConn, jobID int64, traceFilePath string) error { + maybePrint := func(stmt string) string { + if debugCtx.verbose { + fmt.Println("querying " + stmt) + } + return stmt + } + + // Check if a timeout has been set for this command. + if cliCtx.cmdTimeout != 0 { + stmt := fmt.Sprintf(`SET statement_timeout = '%s'`, cliCtx.cmdTimeout) + if err := sqlConn.Exec(maybePrint(stmt), nil); err != nil { + return err + } + } + + traceID, err := getJobTraceID(sqlConn, jobID) + if err != nil { + return err + } + + var inflightSpanQuery = ` +WITH spans AS( +SELECT span_id, goroutine_id, operation, start_time, duration +FROM crdb_internal.node_inflight_trace_spans +WHERE trace_id=$1 +) SELECT * +FROM spans, LATERAL crdb_internal.payloads_for_span(spans.span_id)` + + var f *os.File + if f, err = os.Create(traceFilePath); err != nil { + return err + } + defer f.Close() + + return runQueryAndFormatResults(sqlConn, f, makeQuery(inflightSpanQuery, traceID)) +} diff --git a/pkg/cli/debug_job_trace_test.go b/pkg/cli/debug_job_trace_test.go new file mode 100644 index 000000000000..484cd5cc14dc --- /dev/null +++ b/pkg/cli/debug_job_trace_test.go @@ -0,0 +1,177 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package cli + +import ( + "context" + "fmt" + "io/ioutil" + "net/url" + "path/filepath" + "strconv" + "strings" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/security" + "github.com/cockroachdb/cockroach/pkg/server/serverpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/encoding/csv" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/cockroachdb/errors" + "github.com/kr/pretty" + "github.com/stretchr/testify/require" +) + +// A special jobs.Resumer that, instead of finishing +// the job successfully, forces the job to be paused. +var _ jobs.Resumer = &traceSpanResumer{} +var _ jobs.TraceableJob = &traceSpanResumer{} + +func (r *traceSpanResumer) ForceRealSpan() {} + +type traceSpanResumer struct { + ctx context.Context + recordedSpanCh chan struct{} + completeResumerCh chan struct{} +} + +func (r *traceSpanResumer) Resume(ctx context.Context, _ interface{}) error { + _, span := tracing.ChildSpan(ctx, "trace test") + defer span.Finish() + // Picked a random proto message that was simple to match output against. + span.RecordStructured(&serverpb.TableStatsRequest{Database: "foo", Table: "bar"}) + r.recordedSpanCh <- struct{}{} + <-r.completeResumerCh + return nil +} + +func (r *traceSpanResumer) OnFailOrCancel(ctx context.Context, execCtx interface{}) error { + return errors.New("unimplemented") +} + +// Header from the output of `cockroach debug job-trace`. +var jobTraceHeader = []string{ + "span_id", "goroutine_id", "operation", "start_time", "duration", "payload_type", "payload_jsonb", +} + +func TestDebugJobTrace(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + defer jobs.TestingSetAdoptAndCancelIntervals(100*time.Millisecond, 100*time.Millisecond)() + + c := NewCLITest(TestCLIParams{T: t}) + defer c.Cleanup() + c.omitArgs = true + + registry := c.TestServer.JobRegistry().(*jobs.Registry) + jobCtx, _ := context.WithCancel(ctx) + completeResumerCh := make(chan struct{}) + recordedSpanCh := make(chan struct{}) + defer close(completeResumerCh) + defer close(recordedSpanCh) + + jobs.RegisterConstructor( + jobspb.TypeBackup, + func(job *jobs.Job, _ *cluster.Settings) jobs.Resumer { + return &traceSpanResumer{ + ctx: jobCtx, + completeResumerCh: completeResumerCh, + recordedSpanCh: recordedSpanCh, + } + }, + ) + + // Create a "backup job" but we have overridden the resumer constructor above + // to inject our traceSpanResumer. + var job *jobs.StartableJob + id := registry.MakeJobID() + require.NoError(t, c.TestServer.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) { + err = registry.CreateStartableJobWithTxn(ctx, &job, id, txn, jobs.Record{ + Username: security.RootUserName(), + Details: jobspb.BackupDetails{}, + Progress: jobspb.BackupProgress{}, + }) + return err + })) + + require.NoError(t, job.Start(ctx)) + + // Wait for the job to record information in the trace span. + <-recordedSpanCh + + tempDir, cleanup := testutils.TempDir(t) + defer cleanup() + targetFilePath := filepath.Join(tempDir, "tracetest") + args := []string{strconv.Itoa(int(id)), targetFilePath} + pgURL, _ := sqlutils.PGUrl(t, c.TestServer.ServingSQLAddr(), + "TestDebugJobTrace", url.User(security.RootUser)) + + _, err := c.RunWithCaptureArgs([]string{`debug`, `job-trace`, args[0], args[1], fmt.Sprintf(`--url=%s`, pgURL.String()), `--format=csv`}) + require.NoError(t, err) + actual, err := ioutil.ReadFile(targetFilePath) + if err != nil { + t.Errorf("Failed to read actual result from %v: %v", targetFilePath, err) + } + + if err := matchCSVHeader(string(actual), jobTraceHeader); err != nil { + t.Fatal(err) + } + + operationName := fmt.Sprintf("BACKUP-%d", id) + exp := [][]string{ + // This is the span recording we injected above. + {`\d+`, `\d+`, operationName, ".*", ".*", "server.serverpb.TableStatsRequest", "{\"@type\": \"type.googleapis.com/cockroach.server.serverpb.TableStatsRequest\", \"database\": \"foo\", \"table\": \"bar\"}"}, + } + if err := MatchCSV(string(actual), exp); err != nil { + t.Fatal(err) + } +} + +func matchCSVHeader(csvStr string, expectedHeader []string) (err error) { + defer func() { + if err != nil { + err = errors.Errorf("csv input:\n%v\nexpected:\n%s\nerrors:%s", + csvStr, pretty.Sprint(expectedHeader), err) + } + }() + + csvStr = ElideInsecureDeprecationNotice(csvStr) + reader := csv.NewReader(strings.NewReader(csvStr)) + reader.FieldsPerRecord = -1 + records, err := reader.ReadAll() + if err != nil { + return err + } + + if len(records) < 1 { + return errors.Errorf("csv is empty") + } + + // Only match the first record, i.e. the expectedHeader. + headerRow := records[0] + if lr, lm := len(headerRow), len(expectedHeader); lr != lm { + return errors.Errorf("csv header has %d columns, but expected %d", lr, lm) + } + for j := range expectedHeader { + exp, act := expectedHeader[j], headerRow[j] + if exp != act { + err = errors.Errorf("found %q which does not match %q", act, exp) + } + } + return err +} diff --git a/pkg/cli/flags.go b/pkg/cli/flags.go index d10e5bfb97c4..b5013b4be21b 100644 --- a/pkg/cli/flags.go +++ b/pkg/cli/flags.go @@ -599,6 +599,7 @@ func init() { } clientCmds := []*cobra.Command{ + debugJobTraceFromClusterCmd, debugGossipValuesCmd, debugTimeSeriesDumpCmd, debugZipCmd, @@ -644,6 +645,7 @@ func init() { timeoutCmds := []*cobra.Command{ statusNodeCmd, lsNodesCmd, + debugJobTraceFromClusterCmd, debugZipCmd, doctorExamineClusterCmd, doctorExamineFallbackClusterCmd, @@ -715,6 +717,7 @@ func init() { sqlCmds := []*cobra.Command{ sqlShellCmd, demoCmd, + debugJobTraceFromClusterCmd, doctorExamineClusterCmd, doctorExamineFallbackClusterCmd, doctorRecreateClusterCmd, @@ -778,6 +781,7 @@ func init() { demoCmd, debugListFilesCmd, debugTimeSeriesDumpCmd, + debugJobTraceFromClusterCmd, }, demoCmd.Commands()...) tableOutputCommands = append(tableOutputCommands, nodeCmds...) @@ -905,6 +909,7 @@ func init() { } { for _, c := range []*cobra.Command{ + debugJobTraceFromClusterCmd, doctorExamineClusterCmd, doctorExamineZipDirCmd, doctorExamineFallbackClusterCmd, diff --git a/pkg/cli/testutils.go b/pkg/cli/testutils.go index a2a1e7faff6b..29b3102acd80 100644 --- a/pkg/cli/testutils.go +++ b/pkg/cli/testutils.go @@ -13,12 +13,14 @@ package cli import ( "bytes" "context" + "encoding/csv" "fmt" "io" "io/ioutil" "net" "os" "path/filepath" + "regexp" "strings" "testing" @@ -31,6 +33,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" + "github.com/kr/pretty" ) // TestingReset resets global mutable state so that Run can be called multiple @@ -371,3 +374,70 @@ func (c TestCLI) RunWithCAArgs(origArgs []string) { fmt.Println(err) } } + +// ElideInsecureDeprecationNotice elides the deprecation notice for --insecure. +func ElideInsecureDeprecationNotice(csvStr string) string { + // v20.1 introduces a deprecation notice for --insecure. Skip over it. + // TODO(knz): Remove this when --insecure is dropped. + // See: https://github.com/cockroachdb/cockroach/issues/53404 + lines := strings.SplitN(csvStr, "\n", 3) + if len(lines) > 0 && strings.HasPrefix(lines[0], "Flag --insecure has been deprecated") { + csvStr = lines[2] + } + return csvStr +} + +// GetCsvNumCols returns the number of columns in the given csv string. +func GetCsvNumCols(csvStr string) (cols int, err error) { + csvStr = ElideInsecureDeprecationNotice(csvStr) + reader := csv.NewReader(strings.NewReader(csvStr)) + records, err := reader.Read() + if err != nil { + return 0, errors.Errorf("error reading csv input: \n %v\n errors:%s", csvStr, err) + } + return len(records), nil +} + +// MatchCSV matches a multi-line csv string with the provided regex +// (matchColRow[i][j] will be matched against the i-th line, j-th column). +func MatchCSV(csvStr string, matchColRow [][]string) (err error) { + defer func() { + if err != nil { + err = errors.Errorf("csv input:\n%v\nexpected:\n%s\nerrors:%s", + csvStr, pretty.Sprint(matchColRow), err) + } + }() + + csvStr = ElideInsecureDeprecationNotice(csvStr) + reader := csv.NewReader(strings.NewReader(csvStr)) + reader.FieldsPerRecord = -1 + records, err := reader.ReadAll() + if err != nil { + return err + } + + lr, lm := len(records), len(matchColRow) + if lr < lm { + return errors.Errorf("csv has %d rows, but expected at least %d", lr, lm) + } + + // Compare only the last len(matchColRow) records. That is, if we want to + // match 4 rows and we have 100 records, we only really compare + // records[96:], that is, the last four rows. + records = records[lr-lm:] + + for i := range records { + if lr, lm := len(records[i]), len(matchColRow[i]); lr != lm { + return errors.Errorf("row #%d: csv has %d columns, but expected %d", i+1, lr, lm) + } + for j := range records[i] { + pat, str := matchColRow[i][j], records[i][j] + re := regexp.MustCompile(pat) + if !re.MatchString(str) { + err = errors.Errorf("%v\nrow #%d, col #%d: found %q which does not match %q", + err, i+1, j+1, str, pat) + } + } + } + return err +} diff --git a/pkg/cmd/roachtest/BUILD.bazel b/pkg/cmd/roachtest/BUILD.bazel index c62bea45248a..1342f4fbfeaa 100644 --- a/pkg/cmd/roachtest/BUILD.bazel +++ b/pkg/cmd/roachtest/BUILD.bazel @@ -133,6 +133,7 @@ go_library( deps = [ "//pkg/base", "//pkg/ccl/changefeedccl/cdctest", + "//pkg/cli", "//pkg/cmd/cmpconn", "//pkg/cmd/internal/issues", "//pkg/gossip", diff --git a/pkg/cmd/roachtest/cli.go b/pkg/cmd/roachtest/cli.go index e569a3b0bbaa..4a7758c522f3 100644 --- a/pkg/cmd/roachtest/cli.go +++ b/pkg/cmd/roachtest/cli.go @@ -15,6 +15,8 @@ import ( "reflect" "strings" "time" + + "github.com/cockroachdb/cockroach/pkg/cli" ) func runCLINodeStatus(ctx context.Context, t *test, c *cluster) { @@ -28,7 +30,7 @@ func runCLINodeStatus(ctx context.Context, t *test, c *cluster) { lastWords := func(s string) []string { var result []string - s = elideInsecureDeprecationNotice(s) + s = cli.ElideInsecureDeprecationNotice(s) lines := strings.Split(s, "\n") for _, line := range lines { words := strings.Fields(line) diff --git a/pkg/cmd/roachtest/decommission.go b/pkg/cmd/roachtest/decommission.go index 1d6aea9ccb57..610f66e395e2 100644 --- a/pkg/cmd/roachtest/decommission.go +++ b/pkg/cmd/roachtest/decommission.go @@ -12,21 +12,19 @@ package main import ( "context" - "encoding/csv" "fmt" "math/rand" "reflect" - "regexp" "strconv" - "strings" "time" + "github.com/cockroachdb/cockroach/pkg/cli" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/timeutil" - "github.com/cockroachdb/errors" "github.com/kr/pretty" _ "github.com/lib/pq" + "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" ) @@ -333,7 +331,7 @@ func runDecommissionRandomized(ctx context.Context, t *test, c *cluster) { decommissionHeader, {strconv.Itoa(targetNode), "true", `\d+`, "true", "decommissioning", "false"}, } - if err := h.matchCSV(o, exp); err != nil { + if err := cli.MatchCSV(o, exp); err != nil { t.Fatal(err) } @@ -348,10 +346,11 @@ func runDecommissionRandomized(ctx context.Context, t *test, c *cluster) { t.Fatalf("node-status failed: %v", err) } - numCols := h.getCsvNumCols(o) + numCols, err := cli.GetCsvNumCols(o) + require.NoError(t, err) exp := h.expectCell(targetNode-1, /* node IDs are 1-indexed */ statusHeaderMembershipColumnIdx, `decommissioning`, c.spec.NodeCount, numCols) - if err := h.matchCSV(o, exp); err != nil { + if err := cli.MatchCSV(o, exp); err != nil { t.Fatal(err) } } @@ -377,10 +376,11 @@ func runDecommissionRandomized(ctx context.Context, t *test, c *cluster) { t.Fatalf("node-status failed: %v", err) } - numCols := h.getCsvNumCols(o) + numCols, err := cli.GetCsvNumCols(o) + require.NoError(t, err) exp := h.expectCell(targetNode-1, /* node IDs are 1-indexed */ statusHeaderMembershipColumnIdx, `active`, c.spec.NodeCount, numCols) - if err := h.matchCSV(o, exp); err != nil { + if err := cli.MatchCSV(o, exp); err != nil { t.Fatal(err) } } @@ -412,7 +412,7 @@ func runDecommissionRandomized(ctx context.Context, t *test, c *cluster) { rowRegex := []string{strconv.Itoa(i), "true", `\d+`, "true", "decommissioning", "false"} exp = append(exp, rowRegex) } - if err := h.matchCSV(o, exp); err != nil { + if err := cli.MatchCSV(o, exp); err != nil { t.Fatalf("decommission failed: %v", err) } return nil @@ -432,13 +432,14 @@ func runDecommissionRandomized(ctx context.Context, t *test, c *cluster) { t.Fatalf("node-status failed: %v", err) } - numCols := h.getCsvNumCols(o) + numCols, err := cli.GetCsvNumCols(o) + require.NoError(t, err) var colRegex []string for i := 1; i <= c.spec.NodeCount; i++ { colRegex = append(colRegex, `decommissioning`) } exp := h.expectColumn(statusHeaderMembershipColumnIdx, colRegex, c.spec.NodeCount, numCols) - if err := h.matchCSV(o, exp); err != nil { + if err := cli.MatchCSV(o, exp); err != nil { t.Fatal(err) } } @@ -476,13 +477,14 @@ func runDecommissionRandomized(ctx context.Context, t *test, c *cluster) { t.Fatalf("node-status failed: %v", err) } - numCols := h.getCsvNumCols(o) + numCols, err := cli.GetCsvNumCols(o) + require.NoError(t, err) var colRegex []string for i := 1; i <= c.spec.NodeCount; i++ { colRegex = append(colRegex, `active`) } exp := h.expectColumn(statusHeaderMembershipColumnIdx, colRegex, c.spec.NodeCount, numCols) - if err := h.matchCSV(o, exp); err != nil { + if err := cli.MatchCSV(o, exp); err != nil { t.Fatal(err) } } @@ -533,7 +535,7 @@ func runDecommissionRandomized(ctx context.Context, t *test, c *cluster) { {strconv.Itoa(targetNodeB), "true", "0", "true", "decommissioned", "false"}, decommissionFooter, } - return h.matchCSV(o, exp) + return cli.MatchCSV(o, exp) }); err != nil { t.Fatal(err) } @@ -556,7 +558,7 @@ func runDecommissionRandomized(ctx context.Context, t *test, c *cluster) { } exp = append(exp, []string{strconv.Itoa(i)}) } - return h.matchCSV(o, exp) + return cli.MatchCSV(o, exp) }); err != nil { t.Fatal(err) } @@ -571,7 +573,8 @@ func runDecommissionRandomized(ctx context.Context, t *test, c *cluster) { t.Fatalf("node-status failed: %v", err) } - numCols := h.getCsvNumCols(o) + numCols, err := cli.GetCsvNumCols(o) + require.NoError(t, err) if err := retry.WithMaxAttempts(ctx, retry.Options{}, 50, func() error { colRegex := []string{} for i := 1; i <= c.spec.NodeCount; i++ { @@ -581,7 +584,7 @@ func runDecommissionRandomized(ctx context.Context, t *test, c *cluster) { colRegex = append(colRegex, strconv.Itoa(i)) } exp := h.expectIDsInStatusOut(colRegex, numCols) - return h.matchCSV(o, exp) + return cli.MatchCSV(o, exp) }); err != nil { t.Fatal(err) } @@ -618,7 +621,7 @@ func runDecommissionRandomized(ctx context.Context, t *test, c *cluster) { {strconv.Itoa(targetNodeB), "false", "0", "true", "decommissioned", "false"}, decommissionFooter, } - if err := h.matchCSV(o, exp); err != nil { + if err := cli.MatchCSV(o, exp); err != nil { t.Fatal(err) } } @@ -709,7 +712,7 @@ func runDecommissionRandomized(ctx context.Context, t *test, c *cluster) { {strconv.Itoa(targetNode), "true|false", "0", "true", "decommissioned", "false"}, decommissionFooter, } - if err := h.matchCSV(o, exp); err != nil { + if err := cli.MatchCSV(o, exp); err != nil { t.Fatal(err) } @@ -729,7 +732,7 @@ func runDecommissionRandomized(ctx context.Context, t *test, c *cluster) { exp = append(exp, []string{fmt.Sprintf("[^%d]", targetNode)}) } - return h.matchCSV(o, exp) + return cli.MatchCSV(o, exp) }); err != nil { t.Fatal(err) } @@ -742,13 +745,14 @@ func runDecommissionRandomized(ctx context.Context, t *test, c *cluster) { t.Fatalf("node-status failed: %v", err) } - numCols := h.getCsvNumCols(o) + numCols, err := cli.GetCsvNumCols(o) + require.NoError(t, err) var expC []string for i := 1; i <= c.spec.NodeCount-len(h.randNodeBlocklist); i++ { expC = append(expC, fmt.Sprintf("[^%d].*", targetNode)) } exp := h.expectIDsInStatusOut(expC, numCols) - return h.matchCSV(o, exp) + return cli.MatchCSV(o, exp) }); err != nil { t.Fatal(err) } @@ -776,7 +780,8 @@ func runDecommissionRandomized(ctx context.Context, t *test, c *cluster) { if err != nil { t.Fatalf("node-status failed: %v", err) } - numCols := h.getCsvNumCols(o) + numCols, err := cli.GetCsvNumCols(o) + require.NoError(t, err) var expC []string // The decommissioned nodes should all disappear. (We // abuse that nodeIDs are single-digit in this test). @@ -791,7 +796,7 @@ func runDecommissionRandomized(ctx context.Context, t *test, c *cluster) { expC = append(expC, re) } exp := h.expectIDsInStatusOut(expC, numCols) - return h.matchCSV(o, exp) + return cli.MatchCSV(o, exp) }); err != nil { t.Fatal(err) } @@ -945,72 +950,6 @@ func (h *decommTestHelper) recommission( return execCLI(ctx, h.t, h.c, runNode, args...) } -func elideInsecureDeprecationNotice(csvStr string) string { - // v20.1 introduces a deprecation notice for --insecure. Skip over it. - // TODO(knz): Remove this when --insecure is dropped. - // See: https://github.com/cockroachdb/cockroach/issues/53404 - lines := strings.SplitN(csvStr, "\n", 3) - if len(lines) > 0 && strings.HasPrefix(lines[0], "Flag --insecure has been deprecated") { - csvStr = lines[2] - } - return csvStr -} - -// getCsvNumCols returns the number of columns in the given csv string. -func (h *decommTestHelper) getCsvNumCols(csvStr string) (cols int) { - csvStr = elideInsecureDeprecationNotice(csvStr) - reader := csv.NewReader(strings.NewReader(csvStr)) - records, err := reader.Read() - if err != nil { - h.t.Fatal(errors.Errorf("error reading csv input: \n %v\n errors:%s", csvStr, err)) - } - return len(records) -} - -// matchCSV matches a multi-line csv string with the provided regex -// (matchColRow[i][j] will be matched against the i-th line, j-th column). -func (h *decommTestHelper) matchCSV(csvStr string, matchColRow [][]string) (err error) { - defer func() { - if err != nil { - err = errors.Errorf("csv input:\n%v\nexpected:\n%s\nerrors:%s", - csvStr, pretty.Sprint(matchColRow), err) - } - }() - - csvStr = elideInsecureDeprecationNotice(csvStr) - reader := csv.NewReader(strings.NewReader(csvStr)) - reader.FieldsPerRecord = -1 - records, err := reader.ReadAll() - if err != nil { - return err - } - - lr, lm := len(records), len(matchColRow) - if lr < lm { - return errors.Errorf("csv has %d rows, but expected at least %d", lr, lm) - } - - // Compare only the last len(matchColRow) records. That is, if we want to - // match 4 rows and we have 100 records, we only really compare - // records[96:], that is, the last four rows. - records = records[lr-lm:] - - for i := range records { - if lr, lm := len(records[i]), len(matchColRow[i]); lr != lm { - return errors.Errorf("row #%d: csv has %d columns, but expected %d", i+1, lr, lm) - } - for j := range records[i] { - pat, str := matchColRow[i][j], records[i][j] - re := regexp.MustCompile(pat) - if !re.MatchString(str) { - err = errors.Errorf("%v\nrow #%d, col #%d: found %q which does not match %q", - err, i+1, j+1, str, pat) - } - } - } - return err -} - // expectColumn constructs a matching regex for a given column (identified // by its column index). func (h *decommTestHelper) expectColumn( diff --git a/pkg/sql/crdb_internal.go b/pkg/sql/crdb_internal.go index e42d3756e23d..caf6540f1ee7 100644 --- a/pkg/sql/crdb_internal.go +++ b/pkg/sql/crdb_internal.go @@ -605,22 +605,22 @@ func tsOrNull(micros int64) (tree.Datum, error) { var crdbInternalJobsTable = virtualSchemaTable{ schema: ` CREATE TABLE crdb_internal.jobs ( - job_id INT, - job_type STRING, - description STRING, - statement STRING, - user_name STRING, - descriptor_ids INT[], - status STRING, - running_status STRING, - created TIMESTAMP, - started TIMESTAMP, - finished TIMESTAMP, - modified TIMESTAMP, - fraction_completed FLOAT, - high_water_timestamp DECIMAL, - error STRING, - coordinator_id INT, + job_id INT, + job_type STRING, + description STRING, + statement STRING, + user_name STRING, + descriptor_ids INT[], + status STRING, + running_status STRING, + created TIMESTAMP, + started TIMESTAMP, + finished TIMESTAMP, + modified TIMESTAMP, + fraction_completed FLOAT, + high_water_timestamp DECIMAL, + error STRING, + coordinator_id INT, trace_id INT )`, comment: `decoded job metadata from system.jobs (KV scan)`,