Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Instrument obsreport.Scraper #6460

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions .chloggen/obsreport-scraper.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: obsreport

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Instrument `obsreport.Scraper` metrics with otel-go"

# One or more tracking issues or pull requests related to the change
issues: [6460]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
21 changes: 15 additions & 6 deletions internal/obsreportconfig/obsreportconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,7 @@ func allViews() []*view.View {
views = append(views, receiverViews()...)

// Scraper views.
measures = []*stats.Int64Measure{
obsmetrics.ScraperScrapedMetricPoints,
obsmetrics.ScraperErroredMetricPoints,
}
tagKeys = []tag.Key{obsmetrics.TagKeyReceiver, obsmetrics.TagKeyScraper}
views = append(views, genViews(measures, tagKeys, view.Sum())...)
views = append(views, scraperViews()...)

// Exporter views.
measures = []*stats.Int64Measure{
Expand Down Expand Up @@ -136,6 +131,20 @@ func receiverViews() []*view.View {
return genViews(measures, tagKeys, view.Sum())
}

func scraperViews() []*view.View {
if featuregate.GetRegistry().IsEnabled(UseOtelForInternalMetricsfeatureGateID) {
return nil
}

measures := []*stats.Int64Measure{
obsmetrics.ScraperScrapedMetricPoints,
obsmetrics.ScraperErroredMetricPoints,
}
tagKeys := []tag.Key{obsmetrics.TagKeyReceiver, obsmetrics.TagKeyScraper}

return genViews(measures, tagKeys, view.Sum())
}

func genViews(
measures []*stats.Int64Measure,
tagKeys []tag.Key,
Expand Down
94 changes: 82 additions & 12 deletions obsreport/obsreport_scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,40 @@ import (
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric/instrument"
"go.opentelemetry.io/otel/metric/instrument/syncint64"
"go.opentelemetry.io/otel/metric/unit"
"go.opentelemetry.io/otel/trace"
"go.uber.org/multierr"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/internal/obsreportconfig"
"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
"go.opentelemetry.io/collector/receiver/scrapererror"
)

var (
scraperName = "scraper"
scraperScope = scopeName + nameSep + scraperName
)

// Scraper is a helper to add observability to a component.Scraper.
type Scraper struct {
level configtelemetry.Level
receiverID component.ID
scraper component.ID
mutators []tag.Mutator
tracer trace.Tracer

logger *zap.Logger

useOtelForMetrics bool
otelAttrs []attribute.KeyValue
scrapedMetricsPoints syncint64.Counter
erroredMetricsPoints syncint64.Counter
}

// ScraperSettings are settings for creating a Scraper.
Expand All @@ -47,25 +66,67 @@ type ScraperSettings struct {

// NewScraper creates a new Scraper.
func NewScraper(cfg ScraperSettings) (*Scraper, error) {
return &Scraper{
return newScraper(cfg, featuregate.GetRegistry())
}

// Deprecated: [v0.65.0] use NewScraper.
func MustNewScraper(cfg ScraperSettings) *Scraper {
codeboten marked this conversation as resolved.
Show resolved Hide resolved
scr, err := newScraper(cfg, featuregate.GetRegistry())
if err != nil {
panic(err)
}

return scr
}

func newScraper(cfg ScraperSettings, registry *featuregate.Registry) (*Scraper, error) {
scraper := &Scraper{
level: cfg.ReceiverCreateSettings.TelemetrySettings.MetricsLevel,
receiverID: cfg.ReceiverID,
scraper: cfg.Scraper,
mutators: []tag.Mutator{
tag.Upsert(obsmetrics.TagKeyReceiver, cfg.ReceiverID.String(), tag.WithTTL(tag.TTLNoPropagation)),
tag.Upsert(obsmetrics.TagKeyScraper, cfg.Scraper.String(), tag.WithTTL(tag.TTLNoPropagation))},
tracer: cfg.ReceiverCreateSettings.TracerProvider.Tracer(cfg.Scraper.String()),
}, nil

logger: cfg.ReceiverCreateSettings.Logger,
useOtelForMetrics: registry.IsEnabled(obsreportconfig.UseOtelForInternalMetricsfeatureGateID),
otelAttrs: []attribute.KeyValue{
attribute.String(obsmetrics.ReceiverKey, cfg.ReceiverID.String()),
attribute.String(obsmetrics.ScraperKey, cfg.Scraper.String()),
},
}

if err := scraper.createOtelMetrics(cfg); err != nil {
return nil, err
}

return scraper, nil
}

// Deprecated: [v0.65.0] use NewScraper.
func MustNewScraper(cfg ScraperSettings) *Scraper {
scrap, err := NewScraper(cfg)
if err != nil {
panic(err)
func (s *Scraper) createOtelMetrics(cfg ScraperSettings) error {
if !s.useOtelForMetrics {
return nil
}
meter := cfg.ReceiverCreateSettings.MeterProvider.Meter(scraperScope)

var errors, err error

return scrap
s.scrapedMetricsPoints, err = meter.SyncInt64().Counter(
obsmetrics.ScraperPrefix+obsmetrics.ScrapedMetricPointsKey,
instrument.WithDescription("Number of metric points successfully scraped."),
instrument.WithUnit(unit.Dimensionless),
)
errors = multierr.Append(errors, err)

s.erroredMetricsPoints, err = meter.SyncInt64().Counter(
obsmetrics.ScraperPrefix+obsmetrics.ErroredMetricPointsKey,
instrument.WithDescription("Number of metric points that were unable to be scraped."),
instrument.WithUnit(unit.Dimensionless),
)
errors = multierr.Append(errors, err)

return errors
}

// StartMetricsOp is called when a scrape operation is started. The
Expand Down Expand Up @@ -100,10 +161,7 @@ func (s *Scraper) EndMetricsOp(
span := trace.SpanFromContext(scraperCtx)

if s.level != configtelemetry.LevelNone {
stats.Record(
scraperCtx,
obsmetrics.ScraperScrapedMetricPoints.M(int64(numScrapedMetrics)),
obsmetrics.ScraperErroredMetricPoints.M(int64(numErroredMetrics)))
s.recordMetrics(scraperCtx, numScrapedMetrics, numErroredMetrics)
}

// end span according to errors
Expand All @@ -118,3 +176,15 @@ func (s *Scraper) EndMetricsOp(

span.End()
}

func (s *Scraper) recordMetrics(scraperCtx context.Context, numScrapedMetrics, numErroredMetrics int) {
if s.useOtelForMetrics {
s.scrapedMetricsPoints.Add(scraperCtx, int64(numScrapedMetrics), s.otelAttrs...)
s.erroredMetricsPoints.Add(scraperCtx, int64(numErroredMetrics), s.otelAttrs...)
} else { // OC for metrics
stats.Record(
scraperCtx,
obsmetrics.ScraperScrapedMetricPoints.M(int64(numScrapedMetrics)),
obsmetrics.ScraperErroredMetricPoints.M(int64(numErroredMetrics)))
}
}
30 changes: 15 additions & 15 deletions obsreport/obsreport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,13 @@ type testParams struct {
err error
}

func testTelemetry(t *testing.T, testFunc func(tt obsreporttest.TestTelemetry, registry *featuregate.Registry)) {
func testTelemetry(t *testing.T, testFunc func(t *testing.T, tt obsreporttest.TestTelemetry, registry *featuregate.Registry)) {
t.Run("WithOC", func(t *testing.T) {
tt, err := obsreporttest.SetupTelemetry()
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

testFunc(tt, featuregate.NewRegistry())
testFunc(t, tt, featuregate.NewRegistry())
})

t.Run("WithOTel", func(t *testing.T) {
Expand All @@ -70,12 +70,12 @@ func testTelemetry(t *testing.T, testFunc func(tt obsreporttest.TestTelemetry, r
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

testFunc(tt, registry)
testFunc(t, tt, registry)
})
}

func TestReceiveTraceDataOp(t *testing.T) {
testTelemetry(t, func(tt obsreporttest.TestTelemetry, registry *featuregate.Registry) {
testTelemetry(t, func(t *testing.T, tt obsreporttest.TestTelemetry, registry *featuregate.Registry) {
parentCtx, parentSpan := tt.TracerProvider.Tracer("test").Start(context.Background(), t.Name())
defer parentSpan.End()

Expand Down Expand Up @@ -122,7 +122,7 @@ func TestReceiveTraceDataOp(t *testing.T) {
}

func TestReceiveLogsOp(t *testing.T) {
testTelemetry(t, func(tt obsreporttest.TestTelemetry, registry *featuregate.Registry) {
testTelemetry(t, func(t *testing.T, tt obsreporttest.TestTelemetry, registry *featuregate.Registry) {
parentCtx, parentSpan := tt.TracerProvider.Tracer("test").Start(context.Background(), t.Name())
defer parentSpan.End()

Expand Down Expand Up @@ -170,7 +170,7 @@ func TestReceiveLogsOp(t *testing.T) {
}

func TestReceiveMetricsOp(t *testing.T) {
testTelemetry(t, func(tt obsreporttest.TestTelemetry, registry *featuregate.Registry) {
testTelemetry(t, func(t *testing.T, tt obsreporttest.TestTelemetry, registry *featuregate.Registry) {
parentCtx, parentSpan := tt.TracerProvider.Tracer("test").Start(context.Background(), t.Name())
defer parentSpan.End()

Expand Down Expand Up @@ -219,10 +219,10 @@ func TestReceiveMetricsOp(t *testing.T) {
}

func TestScrapeMetricsDataOp(t *testing.T) {
tt, err := obsreporttest.SetupTelemetry()
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })
testTelemetry(t, testScrapeMetricsDataOp)
}

func testScrapeMetricsDataOp(t *testing.T, tt obsreporttest.TestTelemetry, registry *featuregate.Registry) {
parentCtx, parentSpan := tt.TracerProvider.Tracer("test").Start(context.Background(), t.Name())
defer parentSpan.End()

Expand All @@ -232,12 +232,12 @@ func TestScrapeMetricsDataOp(t *testing.T) {
{items: 15, err: nil},
}
for i := range params {
scrp, serr := NewScraper(ScraperSettings{
scrp, err := newScraper(ScraperSettings{
ReceiverID: receiver,
Scraper: scraper,
ReceiverCreateSettings: tt.ToReceiverCreateSettings(),
})
require.NoError(t, serr)
}, registry)
require.NoError(t, err)
ctx := scrp.StartMetricsOp(parentCtx)
assert.NotNil(t, ctx)
scrp.EndMetricsOp(ctx, params[i].items, params[i].err)
Expand Down Expand Up @@ -278,7 +278,7 @@ func TestScrapeMetricsDataOp(t *testing.T) {
}

func TestExportTraceDataOp(t *testing.T) {
testTelemetry(t, func(tt obsreporttest.TestTelemetry, registry *featuregate.Registry) {
testTelemetry(t, func(t *testing.T, tt obsreporttest.TestTelemetry, registry *featuregate.Registry) {
parentCtx, parentSpan := tt.TracerProvider.Tracer("test").Start(context.Background(), t.Name())
defer parentSpan.End()

Expand Down Expand Up @@ -327,7 +327,7 @@ func TestExportTraceDataOp(t *testing.T) {
}

func TestExportMetricsOp(t *testing.T) {
testTelemetry(t, func(tt obsreporttest.TestTelemetry, registry *featuregate.Registry) {
testTelemetry(t, func(t *testing.T, tt obsreporttest.TestTelemetry, registry *featuregate.Registry) {
parentCtx, parentSpan := tt.TracerProvider.Tracer("test").Start(context.Background(), t.Name())
defer parentSpan.End()

Expand Down Expand Up @@ -376,7 +376,7 @@ func TestExportMetricsOp(t *testing.T) {
}

func TestExportLogsOp(t *testing.T) {
testTelemetry(t, func(tt obsreporttest.TestTelemetry, registry *featuregate.Registry) {
testTelemetry(t, func(t *testing.T, tt obsreporttest.TestTelemetry, registry *featuregate.Registry) {
parentCtx, parentSpan := tt.TracerProvider.Tracer("test").Start(context.Background(), t.Name())
defer parentSpan.End()

Expand Down
15 changes: 2 additions & 13 deletions obsreport/obsreporttest/obsreporttest.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,11 +205,8 @@ func CheckReceiverMetrics(tts TestTelemetry, receiver component.ID, protocol str

// CheckScraperMetrics checks that for the current exported values for metrics scraper metrics match given values.
// When this function is called it is required to also call SetupTelemetry as first thing.
func CheckScraperMetrics(_ TestTelemetry, receiver component.ID, scraper component.ID, scrapedMetricPoints, erroredMetricPoints int64) error {
scraperTags := tagsForScraperView(receiver, scraper)
return multierr.Combine(
checkValueForView(scraperTags, scrapedMetricPoints, "scraper/scraped_metric_points"),
checkValueForView(scraperTags, erroredMetricPoints, "scraper/errored_metric_points"))
func CheckScraperMetrics(tts TestTelemetry, receiver component.ID, scraper component.ID, scrapedMetricPoints, erroredMetricPoints int64) error {
return tts.otelPrometheusChecker.checkScraperMetrics(receiver, scraper, scrapedMetricPoints, erroredMetricPoints)
}

// checkValueForView checks that for the current exported value in the view with the given name
Expand Down Expand Up @@ -237,14 +234,6 @@ func checkValueForView(wantTags []tag.Tag, value int64, vName string) error {
return fmt.Errorf("[%s]: could not find tags, wantTags: %s in rows %v", vName, wantTags, rows)
}

// tagsForScraperView returns the tags that are needed for the scraper views.
func tagsForScraperView(receiver component.ID, scraper component.ID) []tag.Tag {
return []tag.Tag{
{Key: receiverTag, Value: receiver.String()},
{Key: scraperTag, Value: scraper.String()},
}
}

// tagsForProcessorView returns the tags that are needed for the processor views.
func tagsForProcessorView(processor component.ID) []tag.Tag {
return []tag.Tag{
Expand Down
22 changes: 22 additions & 0 deletions obsreport/obsreporttest/obsreporttest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,32 @@ const (
)

var (
scraper = component.NewID("fakeScraper")
receiver = component.NewID("fakeReicever")
exporter = component.NewID("fakeExporter")
)

func TestCheckScraperMetricsViews(t *testing.T) {
tt, err := obsreporttest.SetupTelemetry()
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

s, err := obsreport.NewScraper(obsreport.ScraperSettings{
ReceiverID: receiver,
Scraper: scraper,
ReceiverCreateSettings: tt.ToReceiverCreateSettings(),
})
require.NoError(t, err)
ctx := s.StartMetricsOp(context.Background())
require.NotNil(t, ctx)
s.EndMetricsOp(ctx, 7, nil)

assert.NoError(t, obsreporttest.CheckScraperMetrics(tt, receiver, scraper, 7, 0))
assert.Error(t, obsreporttest.CheckScraperMetrics(tt, receiver, scraper, 7, 7))
assert.Error(t, obsreporttest.CheckScraperMetrics(tt, receiver, scraper, 0, 0))
assert.Error(t, obsreporttest.CheckScraperMetrics(tt, receiver, scraper, 0, 7))
}

func TestCheckReceiverTracesViews(t *testing.T) {
tt, err := obsreporttest.SetupTelemetry()
require.NoError(t, err)
Expand Down
14 changes: 14 additions & 0 deletions obsreport/obsreporttest/otelprometheuschecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ type prometheusChecker struct {
promHandler http.Handler
}

func (pc *prometheusChecker) checkScraperMetrics(receiver component.ID, scraper component.ID, scrapedMetricPoints, erroredMetricPoints int64) error {
scraperAttrs := attributesForScraperMetrics(receiver, scraper)
return multierr.Combine(
pc.checkCounter("scraper_scraped_metric_points", scrapedMetricPoints, scraperAttrs),
pc.checkCounter("scraper_errored_metric_points", erroredMetricPoints, scraperAttrs))
}

func (pc *prometheusChecker) checkReceiverTraces(receiver component.ID, protocol string, acceptedSpans, droppedSpans int64) error {
receiverAttrs := attributesForReceiverMetrics(receiver, protocol)
return multierr.Combine(
Expand Down Expand Up @@ -145,6 +152,13 @@ func fetchPrometheusMetrics(handler http.Handler) (map[string]*io_prometheus_cli
return parser.TextToMetricFamilies(rr.Body)
}

func attributesForScraperMetrics(receiver component.ID, scraper component.ID) []attribute.KeyValue {
return []attribute.KeyValue{
attribute.String(receiverTag.Name(), receiver.String()),
attribute.String(scraperTag.Name(), scraper.String()),
}
}
bogdandrutu marked this conversation as resolved.
Show resolved Hide resolved

// attributesForReceiverMetrics returns the attributes that are needed for the receiver metrics.
func attributesForReceiverMetrics(receiver component.ID, transport string) []attribute.KeyValue {
return []attribute.KeyValue{
Expand Down
Loading