Skip to content

Commit

Permalink
jobsprofiler: annotate backup DSP diagram with per-proc progress
Browse files Browse the repository at this point in the history
This change teaches `SHOW JOB <jobID> WITH EXECUTION DETAILS`
when run against a backup job to annotate the latest DSP diagram
stored for the backup with per-node, per-processor progress
information. This annotated diagram is then re-serialized and
stored as the most up-to-date DSP diagram for that job.

The backup job is the first of its kind to annotate a DSP diagram
with per-node, per-proc progress information and so this change
adds some logic to deserialize a flow diagram from a URL and
annotate the diagram's processors with progress information.

Informs: cockroachdb#100488

Release note (sql change): `SHOW JOB WITH EXECUTION DETAILS` for
a backup job will regenerate the DistSQL plan diagram with per-node,
per-processor progress information. This will help users better understand
the state of a running backup job.
  • Loading branch information
adityamaru committed May 24, 2023
1 parent bd6f8f5 commit a5eec57
Show file tree
Hide file tree
Showing 6 changed files with 655 additions and 72 deletions.
4 changes: 4 additions & 0 deletions pkg/jobs/jobsprofiler/profilerconstants/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ go_library(
srcs = ["constants.go"],
importpath = "github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler/profilerconstants",
visibility = ["//visibility:public"],
deps = [
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
],
)

get_x_data(name = "get_x_data")
32 changes: 31 additions & 1 deletion pkg/jobs/jobsprofiler/profilerconstants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,14 @@

package profilerconstants

import "fmt"
import (
"fmt"
"strconv"
"strings"

"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
)

const DSPDiagramInfoKeyPrefix = "~dsp-diag-url-"

Expand All @@ -31,3 +38,26 @@ func MakeNodeProcessorProgressInfoKey(
// The info key is of the form: <prefix>-<flowID>,<instanceID>,<processorID>.
return fmt.Sprintf("%s%s,%s,%d", NodeProcessorProgressInfoKeyPrefix, flowID, instanceID, processorID), nil
}

// GetNodeProcessorProgressInfoKeyParts deconstructs the passed in info key and
// returns the referenced flowID, instanceID and processorID.
//
// The key is expected to be of the form produced by
// MakeNodeProcessorProgressInfoKey, i.e.
// <prefix><flowID>,<instanceID>,<processorID>.
func GetNodeProcessorProgressInfoKeyParts(infoKey string) (uuid.UUID, int, int, error) {
	parts := strings.Split(strings.TrimPrefix(infoKey, NodeProcessorProgressInfoKeyPrefix), ",")
	if len(parts) != 3 {
		return uuid.Nil, 0, 0, errors.AssertionFailedf("expected 3 parts in info key but found %d: %v", len(parts), parts)
	}
	// Wrap each parse error with the offending part so a malformed key is
	// diagnosable from the error alone.
	flowID, err := uuid.FromString(parts[0])
	if err != nil {
		return uuid.Nil, 0, 0, errors.Wrapf(err, "parsing flowID from %q", parts[0])
	}
	instanceID, err := strconv.Atoi(parts[1])
	if err != nil {
		return uuid.Nil, 0, 0, errors.Wrapf(err, "parsing instanceID from %q", parts[1])
	}
	processorID, err := strconv.Atoi(parts[2])
	if err != nil {
		return uuid.Nil, 0, 0, errors.Wrapf(err, "parsing processorID from %q", parts[2])
	}

	return flowID, instanceID, processorID, nil
}
1 change: 1 addition & 0 deletions pkg/sql/delegate/show_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ SELECT job_id, job_type, description, statement, user_name, status,
if n.Options != nil {
if n.Options.ExecutionDetails {
baseQuery.WriteString(`, NULLIF(crdb_internal.job_execution_details(job_id)->>'plan_diagram'::STRING, '') AS plan_diagram`)
baseQuery.WriteString(`, NULLIF(crdb_internal.job_execution_details(job_id)->>'per_component_fraction_progressed'::STRING, '') AS plan_diagram`)
}
}

Expand Down
118 changes: 98 additions & 20 deletions pkg/sql/execinfrapb/flow_diagram.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ import (
"encoding/base64"
"encoding/json"
"fmt"
"io"
"net/url"
"sort"
"strconv"
"strings"

"github.com/cockroachdb/cockroach/pkg/base"
Expand Down Expand Up @@ -640,13 +642,12 @@ type diagramCell struct {
}

type diagramProcessor struct {
NodeIdx int `json:"nodeIdx"`
Inputs []diagramCell `json:"inputs"`
Core diagramCell `json:"core"`
Outputs []diagramCell `json:"outputs"`
StageID int32 `json:"stage"`

processorID int32
NodeIdx int `json:"nodeIdx"`
Inputs []diagramCell `json:"inputs"`
Core diagramCell `json:"core"`
Outputs []diagramCell `json:"outputs"`
StageID int32 `json:"stage"`
ProcessorID int32 `json:"processorID"`
}

type diagramEdge struct {
Expand All @@ -655,8 +656,7 @@ type diagramEdge struct {
DestProc int `json:"destProc"`
DestInput int `json:"destInput"`
Stats []string `json:"stats,omitempty"`

streamID StreamID
StreamID StreamID `json:"streamID"`
}

// FlowDiagram is a plan diagram that can be made into a URL.
Expand All @@ -667,21 +667,44 @@ type FlowDiagram interface {

// AddSpans adds stats extracted from the input spans to the diagram.
AddSpans([]tracingpb.RecordedSpan)

// UpdateComponentFractionProgressed updates the per-component progress on the
// diagram.
UpdateComponentFractionProgressed(perComponentProgress map[ComponentID]float32)
}

type diagramData struct {
SQL string `json:"sql"`
NodeNames []string `json:"nodeNames"`
Processors []diagramProcessor `json:"processors"`
Edges []diagramEdge `json:"edges"`
FlowID FlowID `json:"flow_id"`
Flags DiagramFlags `json:"flags"`

flags DiagramFlags
flowID FlowID
sqlInstanceIDs []base.SQLInstanceID
}

var _ FlowDiagram = &diagramData{}

// FromURL converts a FlowDiagram URL, as produced by ToURL, back into a
// FlowDiagram. It also re-derives the unexported sqlInstanceIDs slice from the
// serialized node names so that methods such as AddSpans can map a processor's
// NodeIdx back to a SQL instance.
func FromURL(urlStr string) (FlowDiagram, error) {
	r, err := decodeURLToJSON(urlStr)
	if err != nil {
		return nil, errors.Wrap(err, "failed to decode URL to JSON")
	}

	var d diagramData
	// Check the decode error before touching d; previously a decode failure
	// fell through to iterating over a possibly half-populated NodeNames.
	if err := json.NewDecoder(&r).Decode(&d); err != nil {
		return nil, err
	}
	for _, name := range d.NodeNames {
		sqlInstanceID, err := strconv.Atoi(name)
		if err != nil {
			return nil, err
		}
		d.sqlInstanceIDs = append(d.sqlInstanceIDs, base.SQLInstanceID(sqlInstanceID))
	}
	return &d, nil
}

// ToURL implements the FlowDiagram interface.
func (d diagramData) ToURL() (string, url.URL, error) {
var buf bytes.Buffer
Expand All @@ -691,20 +714,50 @@ func (d diagramData) ToURL() (string, url.URL, error) {
return encodeJSONToURL(buf)
}

// UpdateComponentFractionProgressed implements the FlowDiagram interface. For
// every processor that has an entry in perComponentProgress, the processor's
// "progress" detail line is updated in place, or appended if this is the first
// time progress is being recorded for it.
func (d *diagramData) UpdateComponentFractionProgressed(
	perComponentProgress map[ComponentID]float32,
) {
	for procIdx := range d.Processors {
		proc := &d.Processors[procIdx]
		instanceID := d.sqlInstanceIDs[proc.NodeIdx]
		componentID := ProcessorComponentID(instanceID, d.FlowID, proc.ProcessorID)
		fraction, ok := perComponentProgress[componentID]
		if !ok {
			continue
		}
		progressDetail := fmt.Sprintf("progress: %.2f", fraction)
		replaced := false
		for detailIdx, detail := range proc.Core.Details {
			if strings.HasPrefix(detail, "progress") {
				proc.Core.Details[detailIdx] = progressDetail
				replaced = true
				break
			}
		}
		// No existing "progress" detail means this is the first time we are
		// recording progress for this component; append a fresh entry.
		//
		// TODO(adityamaru): Consider making proc.Core.Details a map instead of
		// a slice since all the values stored in the slice are key-values.
		if !replaced {
			proc.Core.Details = append(proc.Core.Details, progressDetail)
		}
	}
}

// AddSpans implements the FlowDiagram interface.
func (d *diagramData) AddSpans(spans []tracingpb.RecordedSpan) {
statsMap := ExtractStatsFromSpans(spans, d.flags.MakeDeterministic)
statsMap := ExtractStatsFromSpans(spans, d.Flags.MakeDeterministic)
for i := range d.Processors {
p := &d.Processors[i]
sqlInstanceID := d.sqlInstanceIDs[p.NodeIdx]
component := ProcessorComponentID(sqlInstanceID, d.flowID, p.processorID)
component := ProcessorComponentID(sqlInstanceID, d.FlowID, p.ProcessorID)
if compStats := statsMap[component]; compStats != nil {
p.Core.Details = append(p.Core.Details, compStats.StatsForQueryPlan()...)
}
}
for i := range d.Edges {
originSQLInstanceID := d.sqlInstanceIDs[d.Processors[d.Edges[i].SourceProc].NodeIdx]
component := StreamComponentID(originSQLInstanceID, d.flowID, d.Edges[i].streamID)
component := StreamComponentID(originSQLInstanceID, d.FlowID, d.Edges[i].StreamID)
if compStats := statsMap[component]; compStats != nil {
d.Edges[i].Stats = compStats.StatsForQueryPlan()
}
Expand All @@ -719,17 +772,17 @@ func generateDiagramData(
d := &diagramData{
SQL: sql,
sqlInstanceIDs: sqlInstanceIDs,
flags: flags,
Flags: flags,
}
d.NodeNames = make([]string, len(sqlInstanceIDs))
for i := range d.NodeNames {
d.NodeNames[i] = sqlInstanceIDs[i].String()
}

if len(flows) > 0 {
d.flowID = flows[0].FlowID
d.FlowID = flows[0].FlowID
for i := 1; i < len(flows); i++ {
if flows[i].FlowID != d.flowID {
if flows[i].FlowID != d.FlowID {
return nil, errors.AssertionFailedf("flow ID mismatch within a diagram")
}
}
Expand All @@ -746,7 +799,7 @@ func generateDiagramData(
proc := diagramProcessor{NodeIdx: n}
proc.Core.Title, proc.Core.Details = p.Core.GetValue().(diagramCellType).summary()
proc.Core.Title += fmt.Sprintf("/%d", p.ProcessorID)
proc.processorID = p.ProcessorID
proc.ProcessorID = p.ProcessorID
proc.Core.Details = append(proc.Core.Details, p.Post.summary()...)

// We need explicit synchronizers if we have multiple inputs, or if the
Expand Down Expand Up @@ -809,7 +862,7 @@ func generateDiagramData(
// When generating stats, spans are mapped from processor ID in the span
// tags to processor ID in the diagram data. To avoid clashing with
// the processor with ID 0, assign an impossible processorID.
processorID: -1,
ProcessorID: -1,
})
}

Expand All @@ -826,7 +879,7 @@ func generateDiagramData(
edge := diagramEdge{
SourceProc: pIdx,
SourceOutput: srcOutput,
streamID: o.StreamID,
StreamID: o.StreamID,
}
if o.Type == StreamEndpointSpec_SYNC_RESPONSE {
edge.DestProc = len(d.Processors) - 1
Expand Down Expand Up @@ -885,6 +938,31 @@ func GeneratePlanDiagramURL(
return d.ToURL()
}

// decodeURLToJSON decodes a plan-diagram URL, as produced by encodeJSONToURL,
// back into the raw JSON it carries. The JSON travels in the URL fragment as
// base64-encoded, zlib-compressed bytes.
func decodeURLToJSON(urlStr string) (bytes.Buffer, error) {
	// Named buf (not json) so we don't shadow the encoding/json package.
	var buf bytes.Buffer
	u, err := url.Parse(urlStr)
	if err != nil {
		return buf, errors.Wrap(err, "failed to Parse URL")
	}

	compressed := u.Fragment
	decoder := base64.NewDecoder(base64.URLEncoding, bytes.NewReader([]byte(compressed)))
	decompressor, err := zlib.NewReader(decoder)
	if err != nil {
		return buf, errors.Wrap(err, "failed in NewReader")
	}
	b, err := io.ReadAll(decompressor)
	if err != nil {
		return buf, errors.Wrap(err, "failed in ReadAll")
	}
	// Close verifies the zlib checksum; a failure here means the payload was
	// truncated or corrupted.
	if err := decompressor.Close(); err != nil {
		return buf, err
	}

	_, err = buf.Write(b)
	return buf, err
}

func encodeJSONToURL(json bytes.Buffer) (string, url.URL, error) {
var compressed bytes.Buffer
jsonStr := json.String()
Expand Down
Loading

0 comments on commit a5eec57

Please sign in to comment.