tracing: wrap structured recordings with time at which they were recorded

This change wraps the existing Structured record in a proto that
has a `time` field, indicating when the record was recorded.

This is similar to how we wrap free-form logs in LogRecord, and it will
help interleave child recordings with parent recordings at the time they
occurred, rather than at the span start time.
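
As a rough sketch of the wrapping described above (the `StructuredRecord` field names are taken from the crdbspan.go hunk further down; the `wrapStructured` helper and package name here are hypothetical, not part of the change):

```go
package tracingexample

import (
	"time"

	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
	"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
	"github.com/gogo/protobuf/types"
)

// wrapStructured mirrors what crdbSpan.recordStructured does in the diff
// below: the structured event is marshalled into an Any and stamped with
// the time at which it was recorded, so consumers can later interleave
// parent and child events chronologically.
// (wrapStructured itself is a hypothetical helper for illustration only.)
func wrapStructured(item protoutil.Message) (*tracingpb.StructuredRecord, error) {
	payload, err := types.MarshalAny(item)
	if err != nil {
		return nil, err
	}
	return &tracingpb.StructuredRecord{
		Time:    time.Now(),
		Payload: payload,
	}, nil
}
```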

This change was motivated by the fact that jobs are going to use
structured recordings to indicate which phase of execution they are
currently in. When consuming this information, it is more intuitive to
see interleaved events than to see all events for the parent span
followed by all events for the child span.

Currently we see:
```
ProcSentExportRequest <- processor level
ProcReceivedExportResponse
ExportRequestEvalStarted <- export.go level
ExportRequestEvalFinished
```

We would like to see:
```
ProcSentExportRequest <- processor level
ExportRequestEvalStarted <- export.go level
ExportRequestEvalFinished
ProcReceivedExportResponse
```
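
A consumer-side sketch of the new callback shape used throughout the diffs below (the exact `Structured` method signature lives in `tracingpb` and is assumed here from the call sites; `collectEventTimes` is a hypothetical helper):

```go
package tracingexample

import (
	"time"

	"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
	pbtypes "github.com/gogo/protobuf/types"
)

// collectEventTimes walks a recording and gathers the time at which each
// structured event was recorded. The per-record timestamp is what makes it
// possible to interleave parent and child events as shown above.
func collectEventTimes(rec []tracingpb.RecordedSpan) ([]time.Time, error) {
	var times []time.Time
	for i := range rec {
		// The callback now receives the recording time alongside the payload,
		// and Structured itself returns an error if a record fails to decode.
		if err := rec[i].Structured(func(_ *pbtypes.Any, t time.Time) {
			times = append(times, t)
		}); err != nil {
			return nil, err
		}
	}
	return times, nil
}
```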

Informs: cockroachdb#64992
adityamaru committed Jun 10, 2021
1 parent 9eb1b96 commit a7a45dd
Showing 15 changed files with 496 additions and 126 deletions.
6 changes: 4 additions & 2 deletions pkg/sql/distsql_running.go
@@ -854,7 +854,7 @@ func (r *DistSQLReceiver) pushMeta(meta *execinfrapb.ProducerMetadata) execinfra
}
var ev roachpb.ContentionEvent
for i := range meta.TraceData {
meta.TraceData[i].Structured(func(any *pbtypes.Any) {
if err := meta.TraceData[i].Structured(func(any *pbtypes.Any, _ time.Time) {
if !pbtypes.Is(any, &ev) {
return
}
@@ -868,7 +868,9 @@ func (r *DistSQLReceiver) pushMeta(meta *execinfrapb.ProducerMetadata) execinfra
r.contendedQueryMetric = nil
}
r.contentionRegistry.AddContentionEvent(ev)
})
}); err != nil {
r.SetError(err)
}
}
}
if meta.Metrics != nil {
8 changes: 6 additions & 2 deletions pkg/sql/execinfra/stats.go
@@ -15,6 +15,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
pbtypes "github.com/gogo/protobuf/types"
)
@@ -38,15 +39,18 @@ func GetCumulativeContentionTime(ctx context.Context) time.Duration {
}
var ev roachpb.ContentionEvent
for i := range recording {
recording[i].Structured(func(any *pbtypes.Any) {
if err := recording[i].Structured(func(any *pbtypes.Any, _ time.Time) {
if !pbtypes.Is(any, &ev) {
return
}
if err := pbtypes.UnmarshalAny(any, &ev); err != nil {
return
}
cumulativeContentionTime += ev.Duration
})
}); err != nil {
log.Errorf(ctx, "failed to collect contention time for all spans: %+v", err)
return cumulativeContentionTime
}
}
return cumulativeContentionTime
}
18 changes: 13 additions & 5 deletions pkg/sql/execinfrapb/component_stats.go
@@ -11,11 +11,14 @@
package execinfrapb

import (
"context"
"fmt"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/optional"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
@@ -334,7 +337,7 @@ func ExtractStatsFromSpans(
var componentStats ComponentStats
for i := range spans {
span := &spans[i]
span.Structured(func(item *types.Any) {
if err := span.Structured(func(item *types.Any, _ time.Time) {
if !types.Is(item, &componentStats) {
return
}
@@ -363,21 +366,23 @@
// longer present.
statsMap[stats.Component] = existing.Union(&stats)
}
})
}); err != nil {
return nil
}
}
return statsMap
}

// ExtractNodesFromSpans extracts a list of node ids from a set of tracing
// spans.
func ExtractNodesFromSpans(spans []tracingpb.RecordedSpan) util.FastIntSet {
func ExtractNodesFromSpans(ctx context.Context, spans []tracingpb.RecordedSpan) util.FastIntSet {
var nodes util.FastIntSet
// componentStats is only used to check whether a structured payload item is
// of ComponentStats type.
var componentStats ComponentStats
for i := range spans {
span := &spans[i]
span.Structured(func(item *types.Any) {
if err := span.Structured(func(item *types.Any, _ time.Time) {
if !types.Is(item, &componentStats) {
return
}
@@ -389,7 +394,10 @@ func ExtractNodesFromSpans(spans []tracingpb.RecordedSpan) util.FastIntSet {
return
}
nodes.Add(int(stats.Component.SQLInstanceID))
})
}); err != nil {
log.Errorf(ctx, "unable to extract nodes from span: %+v", err)
return util.FastIntSet{}
}
}
return nodes
}
2 changes: 1 addition & 1 deletion pkg/sql/executor_statement_metrics.go
@@ -224,7 +224,7 @@ func getNodesFromPlanner(planner *planner) []int64 {
if planner.instrumentation.sp != nil {
trace := planner.instrumentation.sp.GetRecording()
// ForEach returns nodes in order.
execinfrapb.ExtractNodesFromSpans(trace).ForEach(func(i int) {
execinfrapb.ExtractNodesFromSpans(planner.EvalContext().Context, trace).ForEach(func(i int) {
nodes = append(nodes, int64(i))
})
}
1 change: 1 addition & 0 deletions pkg/sql/protoreflect/BUILD.bazel
@@ -31,6 +31,7 @@ go_test(
"//pkg/util/leaktest",
"//pkg/util/protoutil",
"//pkg/util/randutil",
"//pkg/util/timeutil",
"//pkg/util/tracing/tracingpb",
"@com_github_gogo_protobuf//jsonpb",
"@com_github_gogo_protobuf//types",
11 changes: 8 additions & 3 deletions pkg/sql/protoreflect/utils_test.go
@@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/gogo/protobuf/jsonpb"
pbtypes "github.com/gogo/protobuf/types"
@@ -82,9 +83,13 @@ func TestMessageToJSONBRoundTrip(t *testing.T) {
// nested inside other message; with maps
pbname: "cockroach.util.tracing.tracingpb.RecordedSpan",
message: &tracingpb.RecordedSpan{
TraceID: 123,
Tags: map[string]string{"one": "1", "two": "2", "three": "3"},
InternalStructured: []*pbtypes.Any{makeAny(t, &descpb.ColumnDescriptor{Name: "bogus stats"})},
TraceID: 123,
Tags: map[string]string{"one": "1", "two": "2", "three": "3"},
DeprecatedInternalStructured: []*pbtypes.Any{makeAny(t, &descpb.ColumnDescriptor{Name: "bogus stats"})},
StructuredRecords: []*pbtypes.Any{makeAny(t, &tracingpb.StructuredRecord{
Time: timeutil.Now(),
Payload: makeAny(t, &descpb.ColumnDescriptor{Name: "bogus stats"})}),
},
},
},
{ // Message deeply nested inside other message
6 changes: 4 additions & 2 deletions pkg/sql/rowexec/tablereader_test.go
@@ -16,6 +16,7 @@ import (
"regexp"
"sort"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
@@ -38,6 +39,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/gogo/protobuf/types"
"github.com/stretchr/testify/require"
)

func TestTableReader(t *testing.T) {
@@ -438,14 +440,14 @@ func TestLimitScans(t *testing.T) {
if span.Operation == tableReaderProcName {
// Verify that stat collection lines up with results.
stats := execinfrapb.ComponentStats{}
span.Structured(func(item *types.Any) {
require.NoError(t, span.Structured(func(item *types.Any, _ time.Time) {
if !types.Is(item, &stats) {
return
}
if err := types.UnmarshalAny(item, &stats); err != nil {
t.Fatal(err)
}
})
}))

if stats.KV.TuplesRead.Value() != limit {
t.Fatalf("read %d rows, but stats counted: %s", limit, stats.KV.TuplesRead)
4 changes: 2 additions & 2 deletions pkg/sql/rowflow/routers_test.go
@@ -875,15 +875,15 @@ func TestRouterDiskSpill(t *testing.T) {
var stats execinfrapb.ComponentStats
var err error
var unmarshalled bool
span.Structured(func(any *pbtypes.Any) {
require.NoError(t, span.Structured(func(any *pbtypes.Any, _ time.Time) {
if !pbtypes.Is(any, &stats) {
return
}
if err = pbtypes.UnmarshalAny(any, &stats); err != nil {
return
}
unmarshalled = true
})
}))
require.NoError(t, err)
require.True(t, unmarshalled)
require.True(t, stats.Inputs[0].NumTuples.HasValue())
16 changes: 10 additions & 6 deletions pkg/sql/sem/builtins/generator_builtins.go
@@ -1526,7 +1526,7 @@ func (p *payloadsForSpanGenerator) Start(_ context.Context, _ *kv.Txn) error {
}

// Next implements the tree.ValueGenerator interface.
func (p *payloadsForSpanGenerator) Next(_ context.Context) (bool, error) {
func (p *payloadsForSpanGenerator) Next(ctx context.Context) (bool, error) {
p.payloadIndex++

// If payloadIndex is within payloads and there are some payloads, then we
@@ -1548,15 +1548,17 @@ func (p *payloadsForSpanGenerator) Next(_ context.Context) (bool, error) {
return false, nil
}
currRecording := p.span.GetRecording()[p.recordingIndex]
currRecording.Structured(func(item *pbtypes.Any) {
if err := currRecording.Structured(func(item *pbtypes.Any, _ time.Time) {
payload, err := protoreflect.MessageToJSON(item, true /* emitDefaults */)
if err != nil {
return
}
if payload != nil {
p.payloads = append(p.payloads, payload)
}
})
}); err != nil {
return false, err
}
}

p.payloadIndex = 0
@@ -1575,9 +1577,11 @@ func (p *payloadsForSpanGenerator) Values() (tree.Datums, error) {
// leftover from JSON value conversion.
payloadTypeAsString := strings.TrimSuffix(
strings.TrimPrefix(
payloadTypeAsJSON.String(),
"\"type.googleapis.com/cockroach.",
),
strings.TrimPrefix(
payloadTypeAsJSON.String(),
"\"type.googleapis.com/",
),
"cockroach."),
"\"",
)

24 changes: 20 additions & 4 deletions pkg/util/tracing/crdbspan.go
@@ -325,7 +325,17 @@ func (s *crdbSpan) record(msg string) {
}

func (s *crdbSpan) recordStructured(item Structured) {
s.recordInternal(item, &s.mu.recording.structured)
p, err := types.MarshalAny(item)
if err != nil {
// An error here is an error from Marshal; these
// are unlikely to happen.
return
}
sr := &tracingpb.StructuredRecord{
Time: time.Now(),
Payload: p,
}
s.recordInternal(sr, &s.mu.recording.structured)
}

// sizable is a subset for protoutil.Message, for payloads (log records and
@@ -439,16 +449,22 @@ func (s *crdbSpan) getRecordingLocked(wantTags bool) tracingpb.RecordedSpan {
}

if numEvents := s.mu.recording.structured.Len(); numEvents != 0 {
rs.InternalStructured = make([]*types.Any, 0, numEvents)
// TODO(adityamaru): Stop writing to DeprecatedInternalStructured in 22.1.
rs.DeprecatedInternalStructured = make([]*types.Any, 0, numEvents)
rs.StructuredRecords = make([]*types.Any, 0, numEvents)
for i := 0; i < numEvents; i++ {
event := s.mu.recording.structured.Get(i).(Structured)
event := s.mu.recording.structured.Get(i).(*tracingpb.StructuredRecord)
item, err := types.MarshalAny(event)
if err != nil {
// An error here is an error from Marshal; these
// are unlikely to happen.
continue
}
rs.InternalStructured = append(rs.InternalStructured, item)
rs.StructuredRecords = append(rs.StructuredRecords, item)
// Write the Structured payload stored in the StructuredRecord, since
// nodes older than 21.2 expect a Structured event when they fetch
// recordings.
rs.DeprecatedInternalStructured = append(rs.DeprecatedInternalStructured, event.Payload)
}
}

8 changes: 6 additions & 2 deletions pkg/util/tracing/grpc_interceptor_test.go
@@ -209,14 +209,17 @@ func TestGRPCInterceptors(t *testing.T) {
require.NoError(t, err)
var rec tracingpb.RecordedSpan
require.NoError(t, types.UnmarshalAny(recAny, &rec))
require.Len(t, rec.InternalStructured, 1)
require.Len(t, rec.DeprecatedInternalStructured, 1)
require.Len(t, rec.StructuredRecords, 1)
sp.ImportRemoteSpans([]tracingpb.RecordedSpan{rec})
sp.Finish()
var deprecatedN int
var n int
finalRecs := sp.GetRecording()
sp.SetVerbose(false)
for _, rec := range finalRecs {
n += len(rec.InternalStructured)
deprecatedN += len(rec.DeprecatedInternalStructured)
n += len(rec.StructuredRecords)
// Remove all of the _unfinished tags. These crop up because
// in this test we are pulling the recorder in the handler impl,
// but the span is only closed in the interceptor. Additionally,
Expand All @@ -227,6 +230,7 @@ func TestGRPCInterceptors(t *testing.T) {
delete(rec.Tags, "_unfinished")
delete(rec.Tags, "_verbose")
}
require.Equal(t, 1, deprecatedN)
require.Equal(t, 1, n)

exp := fmt.Sprintf(`
