Skip to content

Commit

Permalink
dotnet receiver: convert stream data to metrics (#2608)
Browse files Browse the repository at this point in the history
This is the fifth PR in a series for the dotnet core diagnostics
receiver.

This PR converts the raw metric data extracted from the dotnet stream
into Otel metrics and sends them down the pipeline.

For the full implementation please see #2200
  • Loading branch information
pmcollins authored Mar 17, 2021
1 parent 9fd7b91 commit f8c550f
Show file tree
Hide file tree
Showing 45 changed files with 997 additions and 95 deletions.
12 changes: 12 additions & 0 deletions receiver/dotnetdiagnosticsreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,16 @@ type Config struct {
// be displayed by the `dotnet-counters` tool:
// https://docs.microsoft.com/en-us/dotnet/core/diagnostics/dotnet-counters
Counters []string `mapstructure:"counters"`

// LocalDebugDir takes an optional directory name where stream data can be written for
// offline analysis and troubleshooting. If LocalDebugDir is empty, no stream data is
// written. If it has a value, MaxLocalDebugFiles also needs to be set, and stream
// data will be written to disk at the specified location using the naming
// convention `msg.%d.bin` as each message is received, where %d is the current
// message number.
LocalDebugDir string `mapstructure:"local_debug_dir"`
// MaxLocalDebugFiles indicates the maximum number of files kept in LocalDebugDir. When a
// file is written, the oldest one will be deleted if necessary to keep the
// number of files in LocalDebugDir at the specified maximum.
MaxLocalDebugFiles int `mapstructure:"max_local_debug_files"`
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ import (

func TestEventHeader(t *testing.T) {
h := eventHeader{}
err := parseEventHeader(network.NewMultiReader(network.NewDefaultFakeRW("", "", "")), &h)
rw := network.NewDefaultFakeRW("", "", "")
mr := network.NewMultiReader(rw, &network.NopBlobWriter{})
err := parseEventHeader(mr, &h)
require.NoError(t, err)
}

Expand All @@ -49,6 +51,7 @@ func testEventHeaderError(t *testing.T, flags headerFlags, errPos int) {
0: {byte(flags)},
},
}
err := parseEventHeader(network.NewMultiReader(pr), &eventHeader{})
mr := network.NewMultiReader(pr, &network.NopBlobWriter{})
err := parseEventHeader(mr, &eventHeader{})
require.Error(t, err)
}
4 changes: 3 additions & 1 deletion receiver/dotnetdiagnosticsreceiver/dotnet/event_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ func parseEventBlock(r network.MultiReader, fm fieldMetadataMap) (metrics []Metr
if err != nil {
return
}
metrics = append(metrics, m)
if len(m) > 0 {
metrics = append(metrics, m)
}
}

return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestEventParser(t *testing.T) {
data, err := network.ReadBlobData(path.Join("..", "testdata"), 4)
require.NoError(t, err)
rw := network.NewBlobReader(data)
reader := network.NewMultiReader(rw)
reader := network.NewMultiReader(rw, &network.NopBlobWriter{})
err = reader.Seek(1131)
require.NoError(t, err)
metrics, err := parseEventBlock(reader, fms())
Expand Down Expand Up @@ -73,9 +73,9 @@ func TestEventParserErrors(t *testing.T) {

func testEventParserError(t *testing.T, data [][]byte, i int) {
rw := network.NewBlobReader(data)
reader := network.NewMultiReader(rw)
reader := network.NewMultiReader(rw, &network.NopBlobWriter{})
err := reader.Seek(1131)
rw.SetReadError(i)
rw.ErrOnRead(i)
require.NoError(t, err)
_, err = parseEventBlock(reader, fms())
require.Error(t, err)
Expand Down
8 changes: 4 additions & 4 deletions receiver/dotnetdiagnosticsreceiver/dotnet/ipc_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestIPCParser_NoErrors(t *testing.T) {
0: []byte(magicTerminated),
},
}
r := network.NewMultiReader(rw)
r := network.NewMultiReader(rw, &network.NopBlobWriter{})
err := parseIPC(r)
require.NoError(t, err)
}
Expand All @@ -43,7 +43,7 @@ func TestIPCParser_BadMagic(t *testing.T) {
0: []byte("DOTNET_IPC_V2"),
},
}
err := parseIPC(network.NewMultiReader(rw))
err := parseIPC(network.NewMultiReader(rw, &network.NopBlobWriter{}))
require.EqualError(t, err, `ipc header: expected magic "DOTNET_IPC_V1" got "DOTNET_IPC_V2"`)
}

Expand All @@ -55,7 +55,7 @@ func TestIPCParser_BadResponseID(t *testing.T) {
3: {responseError},
},
}
r := network.NewMultiReader(rw)
r := network.NewMultiReader(rw, &network.NopBlobWriter{})
err := parseIPC(r)
assert.EqualError(t, err, "ipc header: got error response")
}
Expand All @@ -70,7 +70,7 @@ func testIPCError(t *testing.T, idx int) {
rw := &network.FakeRW{
ReadErrIdx: idx,
}
r := network.NewMultiReader(rw)
r := network.NewMultiReader(rw, &network.NopBlobWriter{})
err := parseIPC(r)
require.EqualError(t, err, fmt.Sprintf("deliberate error on read %d", idx))
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestParseMetadata(t *testing.T) {
data, err := network.ReadBlobData(path.Join("..", "testdata"), 4)
require.NoError(t, err)
rw := network.NewBlobReader(data)
reader := network.NewMultiReader(rw)
reader := network.NewMultiReader(rw, &network.NopBlobWriter{})
err = reader.Seek(159)
require.NoError(t, err)
msgs := fieldMetadataMap{}
Expand Down Expand Up @@ -62,11 +62,11 @@ func TestParseMetadataErrors(t *testing.T) {

func testParseMetadataError(t *testing.T, data [][]byte, i int) {
rw := network.NewBlobReader(data)
reader := network.NewMultiReader(rw)
reader := network.NewMultiReader(rw, &network.NopBlobWriter{})
err := reader.Seek(159)
require.NoError(t, err)

rw.SetReadError(i)
rw.ErrOnRead(i)

msgs := fieldMetadataMap{}
err = parseMetadataBlock(reader, msgs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestParseNettrace(t *testing.T) {
2: []byte(nettraceSerialization),
},
}
r := network.NewMultiReader(rw)
r := network.NewMultiReader(rw, &network.NopBlobWriter{})
err := parseNettrace(r)
require.NoError(t, err)
}
Expand All @@ -43,7 +43,7 @@ func TestParseNettrace_BadHeaderName(t *testing.T) {
0: []byte("nettrace"),
},
}
r := network.NewMultiReader(rw)
r := network.NewMultiReader(rw, &network.NopBlobWriter{})
err := parseNettrace(r)
require.EqualError(t, err, `header name: expected "Nettrace" got "nettrace"`)
}
Expand All @@ -58,7 +58,7 @@ func TestParseNettrace_BadSerializationName(t *testing.T) {
2: []byte(serType),
},
}
r := network.NewMultiReader(rw)
r := network.NewMultiReader(rw, &network.NopBlobWriter{})
err := parseNettrace(r)
require.EqualError(t, err, `serialization type: expected "!FastSerialization.1" got "foo"`)
}
Expand All @@ -74,6 +74,6 @@ func testParseNettraceReadErr(i int) error {
rw := &network.FakeRW{
ReadErrIdx: i,
}
r := network.NewMultiReader(rw)
r := network.NewMultiReader(rw, &network.NopBlobWriter{})
return parseNettrace(r)
}
25 changes: 17 additions & 8 deletions receiver/dotnetdiagnosticsreceiver/dotnet/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,20 @@ import (

// Parser encapsulates all of the functionality to parse an IPC stream.
type Parser struct {
r network.MultiReader
logger *zap.Logger
r network.MultiReader
consume func([]Metric)
logger *zap.Logger
}

// NewParser accepts an io.Reader and logger returns a Parser for processing an
// IPC stream.
func NewParser(rdr io.Reader, logger *zap.Logger) *Parser {
r := network.NewMultiReader(rdr)
return &Parser{r: r, logger: logger}
// MetricsConsumer is a function that accepts a slice of Metrics. Parser has a
// member consumer function, used to send Metrics as they are created.
type MetricsConsumer func([]Metric)

// NewParser accepts an io.Reader, a MetricsConsumer, and logger, and returns a
// Parser for processing an IPC stream.
func NewParser(rdr io.Reader, mc MetricsConsumer, bw network.BlobWriter, logger *zap.Logger) *Parser {
r := network.NewMultiReader(rdr, bw)
return &Parser{r: r, consume: mc, logger: logger}
}

// ParseIPC parses the IPC response from the initial request to a dotnet process.
Expand All @@ -58,6 +63,8 @@ func (p *Parser) ParseAll(ctx context.Context) error {
return nil
default:
err = p.parseBlock(fms)
// flush regardless of error
p.r.Flush()
if err != nil {
return err
}
Expand Down Expand Up @@ -97,10 +104,12 @@ func (p *Parser) parseBlock(fms fieldMetadataMap) error {
return err
}
case "EventBlock":
_, err = parseEventBlock(p.r, fms)
var metrics []Metric
metrics, err = parseEventBlock(p.r, fms)
if err != nil {
return err
}
p.consume(metrics)
case "SPBlock":
err = parseSPBlock(p.r)
if err != nil {
Expand Down
57 changes: 53 additions & 4 deletions receiver/dotnetdiagnosticsreceiver/dotnet/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package dotnet

import (
"context"
"path"
"testing"

Expand All @@ -36,7 +37,12 @@ func TestParser(t *testing.T) {
8: []byte(fastSerialization),
},
}
p := NewParser(rw, zap.NewNop())
p := NewParser(
rw,
func(metrics []Metric) {},
&network.NopBlobWriter{},
zap.NewNop(),
)
err := p.ParseIPC()
require.NoError(t, err)
err = p.ParseNettrace()
Expand All @@ -47,7 +53,7 @@ func TestParser_TestData(t *testing.T) {
data, err := network.ReadBlobData(path.Join("..", "testdata"), 16)
require.NoError(t, err)
rw := network.NewBlobReader(data)
parser := NewParser(rw, zap.NewNop())
parser := NewParser(rw, func([]Metric) {}, &network.NopBlobWriter{}, zap.NewNop())
err = parser.ParseIPC()
require.NoError(t, err)
err = parser.ParseNettrace()
Expand Down Expand Up @@ -85,10 +91,10 @@ func testParseBlockError(t *testing.T, data [][]byte, offset, errIdx int) {

func testParseBlock(t *testing.T, data [][]byte, offset, errIdx int) error {
rw := network.NewBlobReader(data)
reader := network.NewMultiReader(rw)
reader := network.NewMultiReader(rw, &network.NopBlobWriter{})
err := reader.Seek(offset)
require.NoError(t, err)
rw.SetReadError(errIdx)
rw.ErrOnRead(errIdx)
msgs := fieldMetadataMap{}
parser := &Parser{r: reader, logger: zap.NewNop()}
for i := 0; i < 16; i++ {
Expand All @@ -99,3 +105,46 @@ func testParseBlock(t *testing.T, data [][]byte, offset, errIdx int) error {
}
return nil
}

func TestParser_ParseAll_Error(t *testing.T) {
data, err := network.ReadBlobData(path.Join("..", "testdata"), 16)
require.NoError(t, err)
rw := network.NewBlobReader(data)
parser := NewParser(rw, func([]Metric) {}, &network.NopBlobWriter{}, zap.NewNop())
err = parser.ParseIPC()
require.NoError(t, err)
err = parser.ParseNettrace()
require.NoError(t, err)
rw.StopOnRead(0)
errCh := make(chan error)
go func() {
err = parser.ParseAll(context.Background())
errCh <- err
}()
<-rw.Gate()
rw.ErrOnRead(0)
rw.Gate() <- struct{}{}
require.Error(t, <-errCh)
}

func TestParser_ParseAll_Cancel(t *testing.T) {
data, err := network.ReadBlobData(path.Join("..", "testdata"), 16)
require.NoError(t, err)
rw := network.NewBlobReader(data)
parser := NewParser(rw, func([]Metric) {}, &network.NopBlobWriter{}, zap.NewNop())
err = parser.ParseIPC()
require.NoError(t, err)
err = parser.ParseNettrace()
require.NoError(t, err)
rw.StopOnRead(0)
errCh := make(chan error)
ctx, cancel := context.WithCancel(context.Background())
go func() {
err = parser.ParseAll(ctx)
errCh <- err
}()
<-rw.Gate()
cancel()
rw.Gate() <- struct{}{}
require.NoError(t, <-errCh)
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestParseSPBlock(t *testing.T) {
data, err := network.ReadBlobData(path.Join("..", "testdata"), 16)
require.NoError(t, err)
rw := network.NewBlobReader(data)
reader := network.NewMultiReader(rw)
reader := network.NewMultiReader(rw, &network.NopBlobWriter{})
err = reader.Seek(36870)
require.NoError(t, err)
err = parseSPBlock(reader)
Expand All @@ -44,10 +44,10 @@ func TestParseSPBlock_Errors(t *testing.T) {

func testParseSPBlockError(t *testing.T, data [][]byte, i int) {
rw := network.NewBlobReader(data)
reader := network.NewMultiReader(rw)
reader := network.NewMultiReader(rw, &network.NopBlobWriter{})
err := reader.Seek(36870)
require.NoError(t, err)
rw.SetReadError(i)
rw.ErrOnRead(i)
err = parseSPBlock(reader)
require.Error(t, err)
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestParseSerializationType(t *testing.T) {
data, err := network.ReadBlobData(path.Join("..", "testdata"), 1)
require.NoError(t, err)
rw := network.NewBlobReader(data)
reader := network.NewMultiReader(rw)
reader := network.NewMultiReader(rw, &network.NopBlobWriter{})
err = reader.Seek(61)
require.NoError(t, err)
st, err := parseSerializationType(reader)
Expand All @@ -48,11 +48,11 @@ func TestParseSerializationType_Error(t *testing.T) {

func testParseSerializationTypeErr(t *testing.T, data [][]byte, i int) {
rw := network.NewBlobReader(data)
reader := network.NewMultiReader(rw)
reader := network.NewMultiReader(rw, &network.NopBlobWriter{})
// metadata block is 61 bytes in
err := reader.Seek(61)
require.NoError(t, err)
rw.SetReadError(i)
rw.ErrOnRead(i)
_, err = parseSerializationType(reader)
require.Error(t, err)
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestParseStackBlock(t *testing.T) {
data, err := network.ReadBlobData(path.Join("..", "testdata"), 4)
require.NoError(t, err)
rw := network.NewBlobReader(data)
reader := network.NewMultiReader(rw)
reader := network.NewMultiReader(rw, &network.NopBlobWriter{})
err = reader.Seek(972)
require.NoError(t, err)
err = parseStackBlock(reader)
Expand All @@ -44,10 +44,10 @@ func TestParseStackBlockErrors(t *testing.T) {

func testParseStackBlockError(t *testing.T, data [][]byte, i int) {
rw := network.NewBlobReader(data)
reader := network.NewMultiReader(rw)
reader := network.NewMultiReader(rw, &network.NopBlobWriter{})
err := reader.Seek(972)
require.NoError(t, err)
rw.SetReadError(i)
rw.ErrOnRead(i)
err = parseStackBlock(reader)
require.Error(t, err)
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestTraceParser(t *testing.T) {
data, err := network.ReadBlobData(path.Join("..", "testdata"), 2)
require.NoError(t, err)
rw := network.NewBlobReader(data)
reader := network.NewMultiReader(rw)
reader := network.NewMultiReader(rw, &network.NopBlobWriter{})
err = reader.Seek(81)
require.NoError(t, err)
err = parseTraceMessage(reader)
Expand All @@ -45,10 +45,10 @@ func TestTraceParser_Errors(t *testing.T) {

func testTraceParserReadError(t *testing.T, data [][]byte, i int) {
rw := network.NewBlobReader(data)
reader := network.NewMultiReader(rw)
reader := network.NewMultiReader(rw, &network.NopBlobWriter{})
err := reader.Seek(81)
require.NoError(t, err)
rw.SetReadError(i)
rw.ErrOnRead(i)
err = parseTraceMessage(reader)
require.Error(t, err)
}
Loading

0 comments on commit f8c550f

Please sign in to comment.