Skip to content

Commit

Permalink
Revert "Honor Prometheus external labels (open-telemetry#3127)"
Browse files Browse the repository at this point in the history
  • Loading branch information
tigrannajaryan committed May 17, 2021
1 parent a7313e9 commit 18430e1
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 161 deletions.
24 changes: 2 additions & 22 deletions receiver/prometheusreceiver/internal/ocastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,21 +49,12 @@ type OcaStore struct {
useStartTimeMetric bool
startTimeMetricRegex string
receiverID config.ComponentID
externalLabels labels.Labels

logger *zap.Logger
}

// NewOcaStore returns an ocaStore instance, which can be acted as prometheus' scrape.Appendable
func NewOcaStore(
ctx context.Context,
sink consumer.Metrics,
logger *zap.Logger,
jobsMap *JobsMap,
useStartTimeMetric bool,
startTimeMetricRegex string,
receiverID config.ComponentID,
externalLabels labels.Labels) *OcaStore {
func NewOcaStore(ctx context.Context, sink consumer.Metrics, logger *zap.Logger, jobsMap *JobsMap, useStartTimeMetric bool, startTimeMetricRegex string, receiverID config.ComponentID) *OcaStore {
return &OcaStore{
running: runningStateInit,
ctx: ctx,
Expand All @@ -73,7 +64,6 @@ func NewOcaStore(
useStartTimeMetric: useStartTimeMetric,
startTimeMetricRegex: startTimeMetricRegex,
receiverID: receiverID,
externalLabels: externalLabels,
}
}

Expand All @@ -88,17 +78,7 @@ func (o *OcaStore) SetScrapeManager(scrapeManager *scrape.Manager) {
func (o *OcaStore) Appender(context.Context) storage.Appender {
state := atomic.LoadInt32(&o.running)
if state == runningStateReady {
return newTransaction(
o.ctx,
o.jobsMap,
o.useStartTimeMetric,
o.startTimeMetricRegex,
o.receiverID,
o.mc,
o.sink,
o.externalLabels,
o.logger,
)
return newTransaction(o.ctx, o.jobsMap, o.useStartTimeMetric, o.startTimeMetricRegex, o.receiverID, o.mc, o.sink, o.logger)
} else if state == runningStateInit {
panic("ScrapeManager is not set")
}
Expand Down
3 changes: 2 additions & 1 deletion receiver/prometheusreceiver/internal/ocastore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ import (
)

func TestOcaStore(t *testing.T) {
o := NewOcaStore(context.Background(), nil, nil, nil, false, "", config.NewID("prometheus"), nil)

o := NewOcaStore(context.Background(), nil, nil, nil, false, "", config.NewID("prometheus"))
o.SetScrapeManager(&scrape.Manager{})

app := o.Appender(context.Background())
Expand Down
18 changes: 2 additions & 16 deletions receiver/prometheusreceiver/internal/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,20 +73,10 @@ type transaction struct {
node *commonpb.Node
resource *resourcepb.Resource
metricBuilder *metricBuilder
externalLabels labels.Labels
logger *zap.Logger
}

func newTransaction(
ctx context.Context,
jobsMap *JobsMap,
useStartTimeMetric bool,
startTimeMetricRegex string,
receiverID config.ComponentID,
ms *metadataService,
sink consumer.Metrics,
externalLabels labels.Labels,
logger *zap.Logger) *transaction {
func newTransaction(ctx context.Context, jobsMap *JobsMap, useStartTimeMetric bool, startTimeMetricRegex string, receiverID config.ComponentID, ms *metadataService, sink consumer.Metrics, logger *zap.Logger) *transaction {
return &transaction{
id: atomic.AddInt64(&idSeq, 1),
ctx: ctx,
Expand All @@ -97,7 +87,6 @@ func newTransaction(
startTimeMetricRegex: startTimeMetricRegex,
receiverID: receiverID,
ms: ms,
externalLabels: externalLabels,
logger: logger,
}
}
Expand All @@ -120,10 +109,7 @@ func (tr *transaction) Append(ref uint64, ls labels.Labels, t int64, v float64)
return 0, errTransactionAborted
default:
}
if len(tr.externalLabels) > 0 {
// TODO(jbd): Improve the allocs.
ls = append(ls, tr.externalLabels...)
}

if tr.isNew {
if err := tr.initTransaction(ls); err != nil {
return 0, err
Expand Down
14 changes: 7 additions & 7 deletions receiver/prometheusreceiver/internal/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,15 @@ func Test_transaction(t *testing.T) {

t.Run("Commit Without Adding", func(t *testing.T) {
nomc := consumertest.NewNop()
tr := newTransaction(context.Background(), nil, true, "", rID, ms, nomc, nil, testLogger)
tr := newTransaction(context.Background(), nil, true, "", rID, ms, nomc, testLogger)
if got := tr.Commit(); got != nil {
t.Errorf("expecting nil from Commit() but got err %v", got)
}
})

t.Run("Rollback dose nothing", func(t *testing.T) {
nomc := consumertest.NewNop()
tr := newTransaction(context.Background(), nil, true, "", rID, ms, nomc, nil, testLogger)
tr := newTransaction(context.Background(), nil, true, "", rID, ms, nomc, testLogger)
if got := tr.Rollback(); got != nil {
t.Errorf("expecting nil from Rollback() but got err %v", got)
}
Expand All @@ -83,7 +83,7 @@ func Test_transaction(t *testing.T) {
badLabels := labels.Labels([]labels.Label{{Name: "foo", Value: "bar"}})
t.Run("Add One No Target", func(t *testing.T) {
nomc := consumertest.NewNop()
tr := newTransaction(context.Background(), nil, true, "", rID, ms, nomc, nil, testLogger)
tr := newTransaction(context.Background(), nil, true, "", rID, ms, nomc, testLogger)
if _, got := tr.Append(0, badLabels, time.Now().Unix()*1000, 1.0); got == nil {
t.Errorf("expecting error from Add() but got nil")
}
Expand All @@ -95,7 +95,7 @@ func Test_transaction(t *testing.T) {
{Name: "foo", Value: "bar"}})
t.Run("Add One Job not found", func(t *testing.T) {
nomc := consumertest.NewNop()
tr := newTransaction(context.Background(), nil, true, "", rID, ms, nomc, nil, testLogger)
tr := newTransaction(context.Background(), nil, true, "", rID, ms, nomc, testLogger)
if _, got := tr.Append(0, jobNotFoundLb, time.Now().Unix()*1000, 1.0); got == nil {
t.Errorf("expecting error from Add() but got nil")
}
Expand All @@ -106,7 +106,7 @@ func Test_transaction(t *testing.T) {
{Name: "__name__", Value: "foo"}})
t.Run("Add One Good", func(t *testing.T) {
sink := new(consumertest.MetricsSink)
tr := newTransaction(context.Background(), nil, true, "", rID, ms, sink, nil, testLogger)
tr := newTransaction(context.Background(), nil, true, "", rID, ms, sink, testLogger)
if _, got := tr.Append(0, goodLabels, time.Now().Unix()*1000, 1.0); got != nil {
t.Errorf("expecting error == nil from Add() but got: %v\n", got)
}
Expand Down Expand Up @@ -140,7 +140,7 @@ func Test_transaction(t *testing.T) {

t.Run("Error when start time is zero", func(t *testing.T) {
sink := new(consumertest.MetricsSink)
tr := newTransaction(context.Background(), nil, true, "", rID, ms, sink, nil, testLogger)
tr := newTransaction(context.Background(), nil, true, "", rID, ms, sink, testLogger)
if _, got := tr.Append(0, goodLabels, time.Now().Unix()*1000, 1.0); got != nil {
t.Errorf("expecting error == nil from Add() but got: %v\n", got)
}
Expand All @@ -155,7 +155,7 @@ func Test_transaction(t *testing.T) {

t.Run("Drop NaN value", func(t *testing.T) {
sink := new(consumertest.MetricsSink)
tr := newTransaction(context.Background(), nil, true, "", rID, ms, sink, nil, testLogger)
tr := newTransaction(context.Background(), nil, true, "", rID, ms, sink, testLogger)
if _, got := tr.Append(0, goodLabels, time.Now().Unix()*1000, math.NaN()); got != nil {
t.Errorf("expecting error == nil from Add() but got: %v\n", got)
}
Expand Down
12 changes: 2 additions & 10 deletions receiver/prometheusreceiver/metrics_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,16 +79,8 @@ func (r *pReceiver) Start(_ context.Context, host component.Host) error {
// Per component.Component Start instructions, for async operations we should not use the
// incoming context, it may get cancelled.
receiverCtx := obsreport.ReceiverContext(context.Background(), r.cfg.ID(), transport)
ocaStore := internal.NewOcaStore(
receiverCtx,
r.consumer,
r.logger,
jobsMap,
r.cfg.UseStartTimeMetric,
r.cfg.StartTimeMetricRegex,
r.cfg.ID(),
r.cfg.PrometheusConfig.GlobalConfig.ExternalLabels,
)
ocaStore := internal.NewOcaStore(receiverCtx, r.consumer, r.logger, jobsMap, r.cfg.UseStartTimeMetric, r.cfg.StartTimeMetricRegex, r.cfg.ID())

scrapeManager := scrape.NewManager(logger, ocaStore)
ocaStore.SetScrapeManager(scrapeManager)
if err := scrapeManager.ApplyConfig(r.cfg.PrometheusConfig); err != nil {
Expand Down
105 changes: 0 additions & 105 deletions receiver/prometheusreceiver/metrics_reciever_external_labels_test.go

This file was deleted.

0 comments on commit 18430e1

Please sign in to comment.