Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Instrument obsreport.Scraper #6460

Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions .chloggen/obsreport-scraper.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@

bogdandrutu marked this conversation as resolved.
Show resolved Hide resolved
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: obsreport

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Instrument `obsreport.Scraper` metrics with otel-go"

# One or more tracking issues or pull requests related to the change
issues: [6460]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
21 changes: 15 additions & 6 deletions internal/obsreportconfig/obsreportconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,7 @@ func allViews() []*view.View {
views = append(views, receiverViews()...)

// Scraper views.
measures = []*stats.Int64Measure{
obsmetrics.ScraperScrapedMetricPoints,
obsmetrics.ScraperErroredMetricPoints,
}
tagKeys = []tag.Key{obsmetrics.TagKeyReceiver, obsmetrics.TagKeyScraper}
views = append(views, genViews(measures, tagKeys, view.Sum())...)
views = append(views, scraperViews()...)

// Exporter views.
measures = []*stats.Int64Measure{
Expand Down Expand Up @@ -136,6 +131,20 @@ func receiverViews() []*view.View {
return genViews(measures, tagKeys, view.Sum())
}

func scraperViews() []*view.View {
if featuregate.GetRegistry().IsEnabled(UseOtelForInternalMetricsfeatureGateID) {
return nil
}

measures := []*stats.Int64Measure{
obsmetrics.ScraperScrapedMetricPoints,
obsmetrics.ScraperErroredMetricPoints,
}
tagKeys := []tag.Key{obsmetrics.TagKeyReceiver, obsmetrics.TagKeyScraper}

return genViews(measures, tagKeys, view.Sum())
}

func genViews(
measures []*stats.Int64Measure,
tagKeys []tag.Key,
Expand Down
100 changes: 87 additions & 13 deletions obsreport/obsreport_scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,40 @@ import (
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric/instrument"
"go.opentelemetry.io/otel/metric/instrument/syncint64"
"go.opentelemetry.io/otel/metric/unit"
"go.opentelemetry.io/otel/trace"
"go.uber.org/multierr"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/internal/obsreportconfig"
"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
"go.opentelemetry.io/collector/receiver/scrapererror"
)

var (
scraperName = "scraper"
scraperScope = scopeName + nameSep + scraperName
)

// Scraper is a helper to add observability to a component.Scraper.
type Scraper struct {
level configtelemetry.Level
receiverID component.ID
scraper component.ID
mutators []tag.Mutator
tracer trace.Tracer

logger *zap.Logger

useOtelForMetrics bool
otelAttrs []attribute.KeyValue
scrapedMetricsPoints syncint64.Counter
erroredMetricsPoints syncint64.Counter
}

// ScraperSettings are settings for creating a Scraper.
Expand All @@ -46,26 +65,72 @@ type ScraperSettings struct {
}

// NewScraper creates a new Scraper.
func NewScraper(cfg ScraperSettings) (*Scraper, error) {
return &Scraper{
func NewScraper(cfg ScraperSettings) *Scraper {
bogdandrutu marked this conversation as resolved.
Show resolved Hide resolved
scr, err := newScraper(cfg, featuregate.GetRegistry())
if err != nil && cfg.ReceiverCreateSettings.Logger != nil {
cfg.ReceiverCreateSettings.Logger.Warn("Error creating an obsreport.Scraper", zap.Error(err))
}
return scr
}

// Deprecated: [v0.65.0] use NewScraper.
func MustNewScraper(cfg ScraperSettings) *Scraper {
codeboten marked this conversation as resolved.
Show resolved Hide resolved
scr, err := newScraper(cfg, featuregate.GetRegistry())
if err != nil {
panic(err)
}

return scr
}

func newScraper(cfg ScraperSettings, registry *featuregate.Registry) (*Scraper, error) {
scraper := &Scraper{
level: cfg.ReceiverCreateSettings.TelemetrySettings.MetricsLevel,
receiverID: cfg.ReceiverID,
scraper: cfg.Scraper,
mutators: []tag.Mutator{
tag.Upsert(obsmetrics.TagKeyReceiver, cfg.ReceiverID.String(), tag.WithTTL(tag.TTLNoPropagation)),
tag.Upsert(obsmetrics.TagKeyScraper, cfg.Scraper.String(), tag.WithTTL(tag.TTLNoPropagation))},
tracer: cfg.ReceiverCreateSettings.TracerProvider.Tracer(cfg.Scraper.String()),
}, nil

logger: cfg.ReceiverCreateSettings.Logger,
useOtelForMetrics: registry.IsEnabled(obsreportconfig.UseOtelForInternalMetricsfeatureGateID),
otelAttrs: []attribute.KeyValue{
attribute.String(obsmetrics.ReceiverKey, cfg.ReceiverID.String()),
attribute.String(obsmetrics.ScraperKey, cfg.Scraper.String()),
},
}

if err := scraper.createOtelMetrics(cfg); err != nil {
return nil, err
}

return scraper, nil
}

// Deprecated: [v0.65.0] use NewScraper.
func MustNewScraper(cfg ScraperSettings) *Scraper {
scrap, err := NewScraper(cfg)
if err != nil {
panic(err)
func (s *Scraper) createOtelMetrics(cfg ScraperSettings) error {
if !s.useOtelForMetrics {
return nil
}
meter := cfg.ReceiverCreateSettings.MeterProvider.Meter(scraperScope)

return scrap
var errors, err error

s.scrapedMetricsPoints, err = meter.SyncInt64().Counter(
obsmetrics.ScraperPrefix+obsmetrics.ScrapedMetricPointsKey,
instrument.WithDescription("Number of metric points successfully scraped."),
instrument.WithUnit(unit.Dimensionless),
)
errors = multierr.Append(errors, err)

s.erroredMetricsPoints, err = meter.SyncInt64().Counter(
obsmetrics.ScraperPrefix+obsmetrics.ErroredMetricPointsKey,
instrument.WithDescription("Number of metric points that were unable to be scraped."),
instrument.WithUnit(unit.Dimensionless),
)
errors = multierr.Append(errors, err)

return errors
}

// StartMetricsOp is called when a scrape operation is started. The
Expand Down Expand Up @@ -100,10 +165,7 @@ func (s *Scraper) EndMetricsOp(
span := trace.SpanFromContext(scraperCtx)

if s.level != configtelemetry.LevelNone {
stats.Record(
scraperCtx,
obsmetrics.ScraperScrapedMetricPoints.M(int64(numScrapedMetrics)),
obsmetrics.ScraperErroredMetricPoints.M(int64(numErroredMetrics)))
s.recordMetrics(scraperCtx, numScrapedMetrics, numErroredMetrics)
}

// end span according to errors
Expand All @@ -118,3 +180,15 @@ func (s *Scraper) EndMetricsOp(

span.End()
}

func (s *Scraper) recordMetrics(scraperCtx context.Context, numScrapedMetrics, numErroredMetrics int) {
if s.useOtelForMetrics {
s.scrapedMetricsPoints.Add(scraperCtx, int64(numScrapedMetrics), s.otelAttrs...)
s.erroredMetricsPoints.Add(scraperCtx, int64(numErroredMetrics), s.otelAttrs...)
} else { // OC for metrics
stats.Record(
scraperCtx,
obsmetrics.ScraperScrapedMetricPoints.M(int64(numScrapedMetrics)),
obsmetrics.ScraperErroredMetricPoints.M(int64(numErroredMetrics)))
}
}
bogdandrutu marked this conversation as resolved.
Show resolved Hide resolved
98 changes: 48 additions & 50 deletions obsreport/obsreport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,62 +219,60 @@ func TestReceiveMetricsOp(t *testing.T) {
}

func TestScrapeMetricsDataOp(t *testing.T) {
tt, err := obsreporttest.SetupTelemetry()
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

parentCtx, parentSpan := tt.TracerProvider.Tracer("test").Start(context.Background(), t.Name())
defer parentSpan.End()
testTelemetry(t, func(tt obsreporttest.TestTelemetry, registry *featuregate.Registry) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This API is horrible, makes everything be super indented...

/cc @paivagustavo

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jmacd suggested the creation of new top level function tests as 5961f60, which keeps the same indentation for the existing tests and can be easily removed once we're done with the transition.

cc @moh-osman3

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please refactor as suggested, and we can get this finalized + merged

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me know if my latest commit matches what was suggested.

parentCtx, parentSpan := tt.TracerProvider.Tracer("test").Start(context.Background(), t.Name())
defer parentSpan.End()

params := []testParams{
{items: 23, err: partialErrFake},
{items: 29, err: errFake},
{items: 15, err: nil},
}
for i := range params {
scrp, serr := NewScraper(ScraperSettings{
ReceiverID: receiver,
Scraper: scraper,
ReceiverCreateSettings: tt.ToReceiverCreateSettings(),
})
require.NoError(t, serr)
ctx := scrp.StartMetricsOp(parentCtx)
assert.NotNil(t, ctx)
scrp.EndMetricsOp(ctx, params[i].items, params[i].err)
}
params := []testParams{
{items: 23, err: partialErrFake},
{items: 29, err: errFake},
{items: 15, err: nil},
}
for i := range params {
scrp, err := newScraper(ScraperSettings{
ReceiverID: receiver,
Scraper: scraper,
ReceiverCreateSettings: tt.ToReceiverCreateSettings(),
}, registry)
require.NoError(t, err)
ctx := scrp.StartMetricsOp(parentCtx)
assert.NotNil(t, ctx)
scrp.EndMetricsOp(ctx, params[i].items, params[i].err)
}

spans := tt.SpanRecorder.Ended()
require.Equal(t, len(params), len(spans))
spans := tt.SpanRecorder.Ended()
require.Equal(t, len(params), len(spans))

var scrapedMetricPoints, erroredMetricPoints int
for i, span := range spans {
assert.Equal(t, "scraper/"+receiver.String()+"/"+scraper.String()+"/MetricsScraped", span.Name())
switch {
case params[i].err == nil:
scrapedMetricPoints += params[i].items
require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ScrapedMetricPointsKey, Value: attribute.Int64Value(int64(params[i].items))})
require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ErroredMetricPointsKey, Value: attribute.Int64Value(0)})
assert.Equal(t, codes.Unset, span.Status().Code)
case errors.Is(params[i].err, errFake):
erroredMetricPoints += params[i].items
require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ScrapedMetricPointsKey, Value: attribute.Int64Value(0)})
require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ErroredMetricPointsKey, Value: attribute.Int64Value(int64(params[i].items))})
assert.Equal(t, codes.Error, span.Status().Code)
assert.Equal(t, params[i].err.Error(), span.Status().Description)
var scrapedMetricPoints, erroredMetricPoints int
for i, span := range spans {
assert.Equal(t, "scraper/"+receiver.String()+"/"+scraper.String()+"/MetricsScraped", span.Name())
switch {
case params[i].err == nil:
scrapedMetricPoints += params[i].items
require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ScrapedMetricPointsKey, Value: attribute.Int64Value(int64(params[i].items))})
require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ErroredMetricPointsKey, Value: attribute.Int64Value(0)})
assert.Equal(t, codes.Unset, span.Status().Code)
case errors.Is(params[i].err, errFake):
erroredMetricPoints += params[i].items
require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ScrapedMetricPointsKey, Value: attribute.Int64Value(0)})
require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ErroredMetricPointsKey, Value: attribute.Int64Value(int64(params[i].items))})
assert.Equal(t, codes.Error, span.Status().Code)
assert.Equal(t, params[i].err.Error(), span.Status().Description)

case errors.Is(params[i].err, partialErrFake):
scrapedMetricPoints += params[i].items
erroredMetricPoints++
require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ScrapedMetricPointsKey, Value: attribute.Int64Value(int64(params[i].items))})
require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ErroredMetricPointsKey, Value: attribute.Int64Value(1)})
assert.Equal(t, codes.Error, span.Status().Code)
assert.Equal(t, params[i].err.Error(), span.Status().Description)
default:
t.Fatalf("unexpected err param: %v", params[i].err)
case errors.Is(params[i].err, partialErrFake):
scrapedMetricPoints += params[i].items
erroredMetricPoints++
require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ScrapedMetricPointsKey, Value: attribute.Int64Value(int64(params[i].items))})
require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ErroredMetricPointsKey, Value: attribute.Int64Value(1)})
assert.Equal(t, codes.Error, span.Status().Code)
assert.Equal(t, params[i].err.Error(), span.Status().Description)
default:
t.Fatalf("unexpected err param: %v", params[i].err)
}
}
}

require.NoError(t, obsreporttest.CheckScraperMetrics(tt, receiver, scraper, int64(scrapedMetricPoints), int64(erroredMetricPoints)))
require.NoError(t, obsreporttest.CheckScraperMetrics(tt, receiver, scraper, int64(scrapedMetricPoints), int64(erroredMetricPoints)))
})
}

func TestExportTraceDataOp(t *testing.T) {
Expand Down
15 changes: 2 additions & 13 deletions obsreport/obsreporttest/obsreporttest.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,11 +205,8 @@ func CheckReceiverMetrics(tts TestTelemetry, receiver component.ID, protocol str

// CheckScraperMetrics checks that for the current exported values for metrics scraper metrics match given values.
// When this function is called it is required to also call SetupTelemetry as first thing.
func CheckScraperMetrics(_ TestTelemetry, receiver component.ID, scraper component.ID, scrapedMetricPoints, erroredMetricPoints int64) error {
scraperTags := tagsForScraperView(receiver, scraper)
return multierr.Combine(
checkValueForView(scraperTags, scrapedMetricPoints, "scraper/scraped_metric_points"),
checkValueForView(scraperTags, erroredMetricPoints, "scraper/errored_metric_points"))
func CheckScraperMetrics(tts TestTelemetry, receiver component.ID, scraper component.ID, scrapedMetricPoints, erroredMetricPoints int64) error {
return tts.otelPrometheusChecker.checkScraperMetrics(receiver, scraper, scrapedMetricPoints, erroredMetricPoints)
}

// checkValueForView checks that for the current exported value in the view with the given name
Expand Down Expand Up @@ -237,14 +234,6 @@ func checkValueForView(wantTags []tag.Tag, value int64, vName string) error {
return fmt.Errorf("[%s]: could not find tags, wantTags: %s in rows %v", vName, wantTags, rows)
}

// tagsForScraperView returns the tags that are needed for the scraper views.
func tagsForScraperView(receiver component.ID, scraper component.ID) []tag.Tag {
return []tag.Tag{
{Key: receiverTag, Value: receiver.String()},
{Key: scraperTag, Value: scraper.String()},
}
}

// tagsForProcessorView returns the tags that are needed for the processor views.
func tagsForProcessorView(processor component.ID) []tag.Tag {
return []tag.Tag{
Expand Down
21 changes: 21 additions & 0 deletions obsreport/obsreporttest/obsreporttest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,31 @@ const (
)

var (
scraper = component.NewID("fakeScraper")
receiver = component.NewID("fakeReicever")
exporter = component.NewID("fakeExporter")
)

func TestCheckScraperMetricsViews(t *testing.T) {
tt, err := obsreporttest.SetupTelemetry()
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

s := obsreport.NewScraper(obsreport.ScraperSettings{
ReceiverID: receiver,
Scraper: scraper,
ReceiverCreateSettings: tt.ToReceiverCreateSettings(),
})
ctx := s.StartMetricsOp(context.Background())
require.NotNil(t, ctx)
s.EndMetricsOp(ctx, 7, nil)

assert.NoError(t, obsreporttest.CheckScraperMetrics(tt, receiver, scraper, 7, 0))
assert.Error(t, obsreporttest.CheckScraperMetrics(tt, receiver, scraper, 7, 7))
assert.Error(t, obsreporttest.CheckScraperMetrics(tt, receiver, scraper, 0, 0))
assert.Error(t, obsreporttest.CheckScraperMetrics(tt, receiver, scraper, 0, 7))
}

func TestCheckReceiverTracesViews(t *testing.T) {
tt, err := obsreporttest.SetupTelemetry()
require.NoError(t, err)
Expand Down
Loading