Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Honor Prometheus external labels #3127

Merged
merged 3 commits into from
May 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 22 additions & 2 deletions receiver/prometheusreceiver/internal/ocastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,21 @@ type OcaStore struct {
useStartTimeMetric bool
startTimeMetricRegex string
receiverID config.ComponentID
externalLabels labels.Labels

logger *zap.Logger
}

// NewOcaStore returns an ocaStore instance, which can be acted as prometheus' scrape.Appendable
func NewOcaStore(ctx context.Context, sink consumer.Metrics, logger *zap.Logger, jobsMap *JobsMap, useStartTimeMetric bool, startTimeMetricRegex string, receiverID config.ComponentID) *OcaStore {
func NewOcaStore(
ctx context.Context,
sink consumer.Metrics,
logger *zap.Logger,
jobsMap *JobsMap,
useStartTimeMetric bool,
startTimeMetricRegex string,
receiverID config.ComponentID,
externalLabels labels.Labels) *OcaStore {
return &OcaStore{
running: runningStateInit,
ctx: ctx,
Expand All @@ -64,6 +73,7 @@ func NewOcaStore(ctx context.Context, sink consumer.Metrics, logger *zap.Logger,
useStartTimeMetric: useStartTimeMetric,
startTimeMetricRegex: startTimeMetricRegex,
receiverID: receiverID,
externalLabels: externalLabels,
}
}

Expand All @@ -78,7 +88,17 @@ func (o *OcaStore) SetScrapeManager(scrapeManager *scrape.Manager) {
func (o *OcaStore) Appender(context.Context) storage.Appender {
state := atomic.LoadInt32(&o.running)
if state == runningStateReady {
return newTransaction(o.ctx, o.jobsMap, o.useStartTimeMetric, o.startTimeMetricRegex, o.receiverID, o.mc, o.sink, o.logger)
return newTransaction(
o.ctx,
o.jobsMap,
o.useStartTimeMetric,
o.startTimeMetricRegex,
o.receiverID,
o.mc,
o.sink,
o.externalLabels,
o.logger,
)
} else if state == runningStateInit {
panic("ScrapeManager is not set")
}
Expand Down
3 changes: 1 addition & 2 deletions receiver/prometheusreceiver/internal/ocastore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ import (
)

func TestOcaStore(t *testing.T) {

o := NewOcaStore(context.Background(), nil, nil, nil, false, "", config.NewID("prometheus"))
o := NewOcaStore(context.Background(), nil, nil, nil, false, "", config.NewID("prometheus"), nil)
o.SetScrapeManager(&scrape.Manager{})

app := o.Appender(context.Background())
Expand Down
18 changes: 16 additions & 2 deletions receiver/prometheusreceiver/internal/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,20 @@ type transaction struct {
node *commonpb.Node
resource *resourcepb.Resource
metricBuilder *metricBuilder
externalLabels labels.Labels
logger *zap.Logger
}

func newTransaction(ctx context.Context, jobsMap *JobsMap, useStartTimeMetric bool, startTimeMetricRegex string, receiverID config.ComponentID, ms *metadataService, sink consumer.Metrics, logger *zap.Logger) *transaction {
func newTransaction(
ctx context.Context,
jobsMap *JobsMap,
useStartTimeMetric bool,
startTimeMetricRegex string,
receiverID config.ComponentID,
ms *metadataService,
sink consumer.Metrics,
externalLabels labels.Labels,
logger *zap.Logger) *transaction {
return &transaction{
id: atomic.AddInt64(&idSeq, 1),
ctx: ctx,
Expand All @@ -87,6 +97,7 @@ func newTransaction(ctx context.Context, jobsMap *JobsMap, useStartTimeMetric bo
startTimeMetricRegex: startTimeMetricRegex,
receiverID: receiverID,
ms: ms,
externalLabels: externalLabels,
logger: logger,
}
}
Expand All @@ -109,7 +120,10 @@ func (tr *transaction) Append(ref uint64, ls labels.Labels, t int64, v float64)
return 0, errTransactionAborted
default:
}

if len(tr.externalLabels) > 0 {
// TODO(jbd): Improve the allocs.
ls = append(ls, tr.externalLabels...)
}
if tr.isNew {
if err := tr.initTransaction(ls); err != nil {
return 0, err
Expand Down
14 changes: 7 additions & 7 deletions receiver/prometheusreceiver/internal/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,15 @@ func Test_transaction(t *testing.T) {

t.Run("Commit Without Adding", func(t *testing.T) {
nomc := consumertest.NewNop()
tr := newTransaction(context.Background(), nil, true, "", rID, ms, nomc, testLogger)
tr := newTransaction(context.Background(), nil, true, "", rID, ms, nomc, nil, testLogger)
if got := tr.Commit(); got != nil {
t.Errorf("expecting nil from Commit() but got err %v", got)
}
})

t.Run("Rollback dose nothing", func(t *testing.T) {
nomc := consumertest.NewNop()
tr := newTransaction(context.Background(), nil, true, "", rID, ms, nomc, testLogger)
tr := newTransaction(context.Background(), nil, true, "", rID, ms, nomc, nil, testLogger)
if got := tr.Rollback(); got != nil {
t.Errorf("expecting nil from Rollback() but got err %v", got)
}
Expand All @@ -81,7 +81,7 @@ func Test_transaction(t *testing.T) {
badLabels := labels.Labels([]labels.Label{{Name: "foo", Value: "bar"}})
t.Run("Add One No Target", func(t *testing.T) {
nomc := consumertest.NewNop()
tr := newTransaction(context.Background(), nil, true, "", rID, ms, nomc, testLogger)
tr := newTransaction(context.Background(), nil, true, "", rID, ms, nomc, nil, testLogger)
if _, got := tr.Append(0, badLabels, time.Now().Unix()*1000, 1.0); got == nil {
t.Errorf("expecting error from Add() but got nil")
}
Expand All @@ -93,7 +93,7 @@ func Test_transaction(t *testing.T) {
{Name: "foo", Value: "bar"}})
t.Run("Add One Job not found", func(t *testing.T) {
nomc := consumertest.NewNop()
tr := newTransaction(context.Background(), nil, true, "", rID, ms, nomc, testLogger)
tr := newTransaction(context.Background(), nil, true, "", rID, ms, nomc, nil, testLogger)
if _, got := tr.Append(0, jobNotFoundLb, time.Now().Unix()*1000, 1.0); got == nil {
t.Errorf("expecting error from Add() but got nil")
}
Expand All @@ -104,7 +104,7 @@ func Test_transaction(t *testing.T) {
{Name: "__name__", Value: "foo"}})
t.Run("Add One Good", func(t *testing.T) {
sink := new(consumertest.MetricsSink)
tr := newTransaction(context.Background(), nil, true, "", rID, ms, sink, testLogger)
tr := newTransaction(context.Background(), nil, true, "", rID, ms, sink, nil, testLogger)
if _, got := tr.Append(0, goodLabels, time.Now().Unix()*1000, 1.0); got != nil {
t.Errorf("expecting error == nil from Add() but got: %v\n", got)
}
Expand Down Expand Up @@ -134,7 +134,7 @@ func Test_transaction(t *testing.T) {

t.Run("Error when start time is zero", func(t *testing.T) {
sink := new(consumertest.MetricsSink)
tr := newTransaction(context.Background(), nil, true, "", rID, ms, sink, testLogger)
tr := newTransaction(context.Background(), nil, true, "", rID, ms, sink, nil, testLogger)
if _, got := tr.Append(0, goodLabels, time.Now().Unix()*1000, 1.0); got != nil {
t.Errorf("expecting error == nil from Add() but got: %v\n", got)
}
Expand All @@ -149,7 +149,7 @@ func Test_transaction(t *testing.T) {

t.Run("Drop NaN value", func(t *testing.T) {
sink := new(consumertest.MetricsSink)
tr := newTransaction(context.Background(), nil, true, "", rID, ms, sink, testLogger)
tr := newTransaction(context.Background(), nil, true, "", rID, ms, sink, nil, testLogger)
if _, got := tr.Append(0, goodLabels, time.Now().Unix()*1000, math.NaN()); got != nil {
t.Errorf("expecting error == nil from Add() but got: %v\n", got)
}
Expand Down
12 changes: 10 additions & 2 deletions receiver/prometheusreceiver/metrics_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,16 @@ func (r *pReceiver) Start(_ context.Context, host component.Host) error {
// Per component.Component Start instructions, for async operations we should not use the
// incoming context, it may get cancelled.
receiverCtx := obsreport.ReceiverContext(context.Background(), r.cfg.ID(), transport)
ocaStore := internal.NewOcaStore(receiverCtx, r.consumer, r.logger, jobsMap, r.cfg.UseStartTimeMetric, r.cfg.StartTimeMetricRegex, r.cfg.ID())

ocaStore := internal.NewOcaStore(
receiverCtx,
r.consumer,
r.logger,
jobsMap,
r.cfg.UseStartTimeMetric,
r.cfg.StartTimeMetricRegex,
r.cfg.ID(),
r.cfg.PrometheusConfig.GlobalConfig.ExternalLabels,
)
scrapeManager := scrape.NewManager(logger, ocaStore)
ocaStore.SetScrapeManager(scrapeManager)
if err := scrapeManager.ApplyConfig(r.cfg.PrometheusConfig); err != nil {
Expand Down
105 changes: 105 additions & 0 deletions receiver/prometheusreceiver/metrics_reciever_external_labels_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package prometheusreceiver

import (
"context"
"testing"

metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/translator/internaldata"
)

const targetExternalLabels = `
# HELP go_threads Number of OS threads created
# TYPE go_threads gauge
go_threads 19`

func TestExternalLabels(t *testing.T) {
ctx := context.Background()
targets := []*testData{
{
name: "target1",
pages: []mockPrometheusResponse{
{code: 200, data: targetExternalLabels},
},
validateFunc: verifyExternalLabels,
},
}

mp, cfg, err := setupMockPrometheus(targets...)
cfg.GlobalConfig.ExternalLabels = labels.FromStrings("key", "value")
require.Nilf(t, err, "Failed to create Promtheus config: %v", err)
defer mp.Close()

cms := new(consumertest.MetricsSink)
receiver := newPrometheusReceiver(logger, &Config{
ReceiverSettings: config.NewReceiverSettings(config.NewID(typeStr)),
PrometheusConfig: cfg}, cms)

require.NoError(t, receiver.Start(ctx, componenttest.NewNopHost()), "Failed to invoke Start: %v", err)
t.Cleanup(func() { require.NoError(t, receiver.Shutdown(ctx)) })

mp.wg.Wait()
metrics := cms.AllMetrics()

results := make(map[string][]internaldata.MetricsData)
for _, m := range metrics {
ocmds := internaldata.MetricsToOC(m)
for _, ocmd := range ocmds {
result, ok := results[ocmd.Node.ServiceInfo.Name]
if !ok {
result = make([]internaldata.MetricsData, 0)
}
results[ocmd.Node.ServiceInfo.Name] = append(result, ocmd)
}
}
for _, target := range targets {
target.validateFunc(t, target, results[target.name])
}
}

func verifyExternalLabels(t *testing.T, td *testData, mds []internaldata.MetricsData) {
verifyNumScrapeResults(t, td, mds)

wantG1 := &metricspb.Metric{
MetricDescriptor: &metricspb.MetricDescriptor{
Name: "go_threads",
Description: "Number of OS threads created",
Type: metricspb.MetricDescriptor_GAUGE_DOUBLE,
LabelKeys: []*metricspb.LabelKey{{Key: "key"}},
},
Timeseries: []*metricspb.TimeSeries{
{
Points: []*metricspb.Point{
{Value: &metricspb.Point_DoubleValue{DoubleValue: 19.0}},
},
LabelValues: []*metricspb.LabelValue{
{Value: "value", HasValue: true},
},
},
},
}
gotG1 := mds[0].Metrics[0]
ts1 := gotG1.Timeseries[0].Points[0].Timestamp
wantG1.Timeseries[0].Points[0].Timestamp = ts1
doCompare("scrape-externalLabels", t, wantG1, gotG1)
}