diff --git a/receiver/prometheusreceiver/internal/otlp_metricfamily.go b/receiver/prometheusreceiver/internal/otlp_metricfamily.go new file mode 100644 index 00000000000..dd03a25f2cc --- /dev/null +++ b/receiver/prometheusreceiver/internal/otlp_metricfamily.go @@ -0,0 +1,155 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal + +import ( + "sort" + "strings" + + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/textparse" + + "go.opentelemetry.io/collector/consumer/pdata" +) + +type metricFamilyPdata struct { + // We are composing the already present metricFamily to + // make for a scalable migration, so that we only edit target + // fields progressively, when we are ready to make changes. + metricFamily + mtype pdata.MetricDataType + groups map[string]*metricGroupPdata +} + +// metricGroupPdata, represents a single metric of a metric family. for example a histogram metric is usually represent by +// a couple data complexValue (buckets and count/sum), a group of a metric family always share a same set of tags. for +// simple types like counter and gauge, each data point is a group of itself +type metricGroupPdata struct { + // We are composing the already present metricGroup to + // make for a scalable migration, so that we only edit target + // fields progressively, when we are ready to make changes. + metricGroup + family *metricFamilyPdata +} + +func newMetricFamilyPdata(metricName string, mc MetadataCache) MetricFamily { + familyName := normalizeMetricName(metricName) + + // lookup metadata based on familyName + metadata, ok := mc.Metadata(familyName) + if !ok && metricName != familyName { + // use the original metricName as metricFamily + familyName = metricName + // perform a 2nd lookup with the original metric name. it can happen if there's a metric which is not histogram + // or summary, but ends with one of those _count/_sum suffixes + metadata, ok = mc.Metadata(metricName) + // still not found, this can happen when metric has no TYPE HINT + if !ok { + metadata.Metric = familyName + metadata.Type = textparse.MetricTypeUnknown + } + } + + return &metricFamilyPdata{ + mtype: convToPdataMetricType(metadata.Type), + groups: make(map[string]*metricGroupPdata), + metricFamily: metricFamily{ + name: familyName, + mc: mc, + droppedTimeseries: 0, + labelKeys: make(map[string]bool), + labelKeysOrdered: make([]string, 0), + metadata: &metadata, + groupOrders: make(map[string]int), + }, + } +} + +// updateLabelKeys is used to store all the label keys of a same metric family in observed order. since prometheus +// receiver removes any label with empty value before feeding it to an appender, in order to figure out all the labels +// from the same metric family we will need to keep track of what labels have ever been observed. +func (mf *metricFamilyPdata) updateLabelKeys(ls labels.Labels) { + for _, l := range ls { + if isUsefulLabelPdata(mf.mtype, l.Name) { + if _, ok := mf.labelKeys[l.Name]; !ok { + mf.labelKeys[l.Name] = true + // use insertion sort to maintain order + i := sort.SearchStrings(mf.labelKeysOrdered, l.Name) + mf.labelKeysOrdered = append(mf.labelKeysOrdered, "") + copy(mf.labelKeysOrdered[i+1:], mf.labelKeysOrdered[i:]) + mf.labelKeysOrdered[i] = l.Name + + } + } + } +} + +// Purposefully being referenced to avoid lint warnings about being "unused". +var _ = (*metricFamilyPdata)(nil).updateLabelKeys + +func (mf *metricFamilyPdata) isCumulativeTypePdata() bool { + return mf.mtype == pdata.MetricDataTypeDoubleSum || + mf.mtype == pdata.MetricDataTypeIntSum || + mf.mtype == pdata.MetricDataTypeHistogram || + mf.mtype == pdata.MetricDataTypeSummary +} + +func (mf *metricFamilyPdata) loadMetricGroupOrCreate(groupKey string, ls labels.Labels, ts int64) *metricGroupPdata { + mg, ok := mf.groups[groupKey] + if !ok { + mg = &metricGroupPdata{ + family: mf, + metricGroup: metricGroup{ + ls: ls, + ts: ts, + complexValue: make([]*dataPoint, 0), + }, + } + mf.groups[groupKey] = mg + // maintaining data insertion order is helpful to generate stable/reproducible metric output + mf.groupOrders[groupKey] = len(mf.groupOrders) + } + return mg +} + +func (mf *metricFamilyPdata) Add(metricName string, ls labels.Labels, t int64, v float64) error { + groupKey := mf.getGroupKey(ls) + mg := mf.loadMetricGroupOrCreate(groupKey, ls, t) + switch mf.mtype { + case pdata.MetricDataTypeHistogram, pdata.MetricDataTypeSummary: + switch { + case strings.HasSuffix(metricName, metricsSuffixSum): + // always use the timestamp from sum (count is ok too), because the startTs from quantiles won't be reliable + // in cases like remote server restart + mg.ts = t + mg.sum = v + mg.hasSum = true + case strings.HasSuffix(metricName, metricsSuffixCount): + mg.count = v + mg.hasCount = true + default: + boundary, err := getBoundaryPdata(mf.mtype, ls) + if err != nil { + mf.droppedTimeseries++ + return err + } + mg.complexValue = append(mg.complexValue, &dataPoint{value: v, boundary: boundary}) + } + default: + mg.value = v + } + + return nil +} diff --git a/receiver/prometheusreceiver/internal/otlp_metricfamily_test.go b/receiver/prometheusreceiver/internal/otlp_metricfamily_test.go new file mode 100644 index 00000000000..6f8cc98136e --- /dev/null +++ b/receiver/prometheusreceiver/internal/otlp_metricfamily_test.go @@ -0,0 +1,99 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal + +import ( + "testing" + + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/textparse" + "github.com/prometheus/prometheus/scrape" + "github.com/stretchr/testify/assert" +) + +type byLookupMetadataCache map[string]scrape.MetricMetadata + +func (bmc byLookupMetadataCache) Metadata(familyName string) (scrape.MetricMetadata, bool) { + lookup, ok := bmc[familyName] + return lookup, ok +} + +func (bmc byLookupMetadataCache) SharedLabels() labels.Labels { + return nil +} + +func TestIsCumulativeEquivalence(t *testing.T) { + mc := byLookupMetadataCache{ + "counter": scrape.MetricMetadata{ + Metric: "cr", + Type: textparse.MetricTypeCounter, + Help: "This is some help", + Unit: "By", + }, + "gauge": scrape.MetricMetadata{ + Metric: "ge", + Type: textparse.MetricTypeGauge, + Help: "This is some help", + Unit: "1", + }, + "gaugehistogram": scrape.MetricMetadata{ + Metric: "gh", + Type: textparse.MetricTypeGaugeHistogram, + Help: "This is some help", + Unit: "?", + }, + "histogram": scrape.MetricMetadata{ + Metric: "hg", + Type: textparse.MetricTypeHistogram, + Help: "This is some help", + Unit: "ms", + }, + "summary": scrape.MetricMetadata{ + Metric: "s", + Type: textparse.MetricTypeSummary, + Help: "This is some help", + Unit: "?", + }, + "unknown": scrape.MetricMetadata{ + Metric: "u", + Type: textparse.MetricTypeUnknown, + Help: "This is some help", + Unit: "?", + }, + } + + tests := []struct { + name string + want bool + }{ + {name: "counter", want: true}, + {name: "gauge", want: false}, + {name: "histogram", want: true}, + {name: "gaugehistogram", want: false}, + {name: "does not exist", want: false}, + {name: "unknown", want: false}, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + mf := newMetricFamily(tt.name, mc).(*metricFamily) + mfp := newMetricFamilyPdata(tt.name, mc).(*metricFamilyPdata) + assert.Equal(t, mf.isCumulativeType(), mfp.isCumulativeTypePdata(), "mismatch in isCumulative") + assert.Equal(t, mf.isCumulativeType(), tt.want, "isCumulative does not match for regular metricFamily") + assert.Equal(t, mfp.isCumulativeTypePdata(), tt.want, "isCumulative does not match for pdata metricFamily") + }) + } +} diff --git a/receiver/prometheusreceiver/internal/otlp_metricsbuilder.go b/receiver/prometheusreceiver/internal/otlp_metricsbuilder.go new file mode 100644 index 00000000000..ea1b2a2f87e --- /dev/null +++ b/receiver/prometheusreceiver/internal/otlp_metricsbuilder.go @@ -0,0 +1,79 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal + +import ( + "strconv" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/textparse" + + "go.opentelemetry.io/collector/consumer/pdata" +) + +func isUsefulLabelPdata(mType pdata.MetricDataType, labelKey string) bool { + switch labelKey { + case model.MetricNameLabel, model.InstanceLabel, model.SchemeLabel, model.MetricsPathLabel, model.JobLabel: + return false + case model.BucketLabel: + return mType != pdata.MetricDataTypeIntHistogram && + mType != pdata.MetricDataTypeHistogram + case model.QuantileLabel: + return mType != pdata.MetricDataTypeSummary + } + return true +} + +func getBoundaryPdata(metricType pdata.MetricDataType, labels labels.Labels) (float64, error) { + labelName := "" + switch metricType { + case pdata.MetricDataTypeHistogram, pdata.MetricDataTypeIntHistogram: + labelName = model.BucketLabel + case pdata.MetricDataTypeSummary: + labelName = model.QuantileLabel + default: + return 0, errNoBoundaryLabel + } + + v := labels.Get(labelName) + if v == "" { + return 0, errEmptyBoundaryLabel + } + + return strconv.ParseFloat(v, 64) +} + +func convToPdataMetricType(metricType textparse.MetricType) pdata.MetricDataType { + switch metricType { + case textparse.MetricTypeCounter: + // always use float64, as it's the internal data type used in prometheus + return pdata.MetricDataTypeDoubleSum + // textparse.MetricTypeUnknown is converted to gauge by default to fix Prometheus untyped metrics from being dropped + case textparse.MetricTypeGauge, textparse.MetricTypeUnknown: + return pdata.MetricDataTypeDoubleGauge + case textparse.MetricTypeHistogram: + return pdata.MetricDataTypeHistogram + // dropping support for gaugehistogram for now until we have an official spec of its implementation + // a draft can be found in: https://docs.google.com/document/d/1KwV0mAXwwbvvifBvDKH_LU1YjyXE_wxCkHNoCGq1GX0/edit#heading=h.1cvzqd4ksd23 + // case textparse.MetricTypeGaugeHistogram: + // return metricspb.MetricDescriptor_GAUGE_DISTRIBUTION + case textparse.MetricTypeSummary: + return pdata.MetricDataTypeSummary + default: + // including: textparse.MetricTypeInfo, textparse.MetricTypeStateset + return pdata.MetricDataTypeNone + } +} diff --git a/receiver/prometheusreceiver/internal/otlp_metricsbuilder_test.go b/receiver/prometheusreceiver/internal/otlp_metricsbuilder_test.go new file mode 100644 index 00000000000..9853512c949 --- /dev/null +++ b/receiver/prometheusreceiver/internal/otlp_metricsbuilder_test.go @@ -0,0 +1,337 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal + +import ( + "testing" + + metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/textparse" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/consumer/pdata" +) + +func TestGetBoundaryEquivalence(t *testing.T) { + cases := []struct { + name string + mtype metricspb.MetricDescriptor_Type + pmtype pdata.MetricDataType + labels labels.Labels + wantValue float64 + wantErr string + }{ + { + name: "cumulative histogram with bucket label", + mtype: metricspb.MetricDescriptor_CUMULATIVE_DISTRIBUTION, + pmtype: pdata.MetricDataTypeHistogram, + labels: labels.Labels{ + {Name: model.BucketLabel, Value: "0.256"}, + }, + wantValue: 0.256, + }, + { + name: "gauge histogram with bucket label", + mtype: metricspb.MetricDescriptor_GAUGE_DISTRIBUTION, + pmtype: pdata.MetricDataTypeIntHistogram, + labels: labels.Labels{ + {Name: model.BucketLabel, Value: "11.71"}, + }, + wantValue: 11.71, + }, + { + name: "summary with bucket label", + mtype: metricspb.MetricDescriptor_SUMMARY, + pmtype: pdata.MetricDataTypeSummary, + labels: labels.Labels{ + {Name: model.BucketLabel, Value: "11.71"}, + }, + wantErr: "QuantileLabel is empty", + }, + { + name: "summary with quantile label", + mtype: metricspb.MetricDescriptor_SUMMARY, + pmtype: pdata.MetricDataTypeSummary, + labels: labels.Labels{ + {Name: model.QuantileLabel, Value: "92.88"}, + }, + wantValue: 92.88, + }, + { + name: "gauge histogram mismatched with bucket label", + mtype: metricspb.MetricDescriptor_SUMMARY, + pmtype: pdata.MetricDataTypeSummary, + labels: labels.Labels{ + {Name: model.BucketLabel, Value: "11.71"}, + }, + wantErr: "QuantileLabel is empty", + }, + { + name: "other data types without matches", + mtype: metricspb.MetricDescriptor_GAUGE_DOUBLE, + pmtype: pdata.MetricDataTypeDoubleGauge, + labels: labels.Labels{ + {Name: model.BucketLabel, Value: "11.71"}, + }, + wantErr: "given metricType has no BucketLabel or QuantileLabel", + }, + } + + for _, tt := range cases { + tt := tt + t.Run(tt.name, func(t *testing.T) { + oldBoundary, oerr := getBoundary(tt.mtype, tt.labels) + pdataBoundary, perr := getBoundaryPdata(tt.pmtype, tt.labels) + assert.Equal(t, oldBoundary, pdataBoundary, "Both boundary values MUST be equal") + assert.Equal(t, oldBoundary, tt.wantValue, "Mismatched boundary messages") + assert.Equal(t, oerr, perr, "The exact same error MUST be returned from both boundary helpers") + + if tt.wantErr != "" { + require.NotEqual(t, oerr, "expected an error from old style boundary retrieval") + require.NotEqual(t, perr, "expected an error from new style boundary retrieval") + require.Contains(t, oerr.Error(), tt.wantErr) + require.Contains(t, perr.Error(), tt.wantErr) + } + }) + } +} + +func TestGetBoundaryPdata(t *testing.T) { + tests := []struct { + name string + mtype pdata.MetricDataType + labels labels.Labels + wantValue float64 + wantErr string + }{ + { + name: "cumulative histogram with bucket label", + mtype: pdata.MetricDataTypeHistogram, + labels: labels.Labels{ + {Name: model.BucketLabel, Value: "0.256"}, + }, + wantValue: 0.256, + }, + { + name: "gauge histogram with bucket label", + mtype: pdata.MetricDataTypeIntHistogram, + labels: labels.Labels{ + {Name: model.BucketLabel, Value: "11.71"}, + }, + wantValue: 11.71, + }, + { + name: "summary with bucket label", + mtype: pdata.MetricDataTypeSummary, + labels: labels.Labels{ + {Name: model.BucketLabel, Value: "11.71"}, + }, + wantErr: "QuantileLabel is empty", + }, + { + name: "summary with quantile label", + mtype: pdata.MetricDataTypeSummary, + labels: labels.Labels{ + {Name: model.QuantileLabel, Value: "92.88"}, + }, + wantValue: 92.88, + }, + { + name: "gauge histogram mismatched with bucket label", + mtype: pdata.MetricDataTypeSummary, + labels: labels.Labels{ + {Name: model.BucketLabel, Value: "11.71"}, + }, + wantErr: "QuantileLabel is empty", + }, + { + name: "other data types without matches", + mtype: pdata.MetricDataTypeDoubleGauge, + labels: labels.Labels{ + {Name: model.BucketLabel, Value: "11.71"}, + }, + wantErr: "given metricType has no BucketLabel or QuantileLabel", + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + value, err := getBoundaryPdata(tt.mtype, tt.labels) + if tt.wantErr != "" { + require.NotNil(t, err) + require.Contains(t, err.Error(), tt.wantErr) + return + } + + require.Nil(t, err) + require.Equal(t, value, tt.wantValue) + }) + } +} + +func TestConvToPdataMetricType(t *testing.T) { + tests := []struct { + name string + mtype textparse.MetricType + want pdata.MetricDataType + }{ + { + name: "textparse.counter", + mtype: textparse.MetricTypeCounter, + want: pdata.MetricDataTypeDoubleSum, + }, + { + name: "textparse.gauge", + mtype: textparse.MetricTypeCounter, + want: pdata.MetricDataTypeDoubleSum, + }, + { + name: "textparse.unknown", + mtype: textparse.MetricTypeUnknown, + want: pdata.MetricDataTypeDoubleGauge, + }, + { + name: "textparse.histogram", + mtype: textparse.MetricTypeHistogram, + want: pdata.MetricDataTypeHistogram, + }, + { + name: "textparse.summary", + mtype: textparse.MetricTypeSummary, + want: pdata.MetricDataTypeSummary, + }, + { + name: "textparse.metric_type_info", + mtype: textparse.MetricTypeInfo, + want: pdata.MetricDataTypeNone, + }, + { + name: "textparse.metric_state_set", + mtype: textparse.MetricTypeStateset, + want: pdata.MetricDataTypeNone, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + got := convToPdataMetricType(tt.mtype) + require.Equal(t, got, tt.want) + }) + } +} + +func TestIsusefulLabelPdata(t *testing.T) { + tests := []struct { + name string + mtypes []pdata.MetricDataType + labelKeys []string + want bool + }{ + { + name: `unuseful "metric","instance","scheme","path","job" with any kind`, + labelKeys: []string{ + model.MetricNameLabel, model.InstanceLabel, model.SchemeLabel, model.MetricsPathLabel, model.JobLabel, + }, + mtypes: []pdata.MetricDataType{ + pdata.MetricDataTypeDoubleSum, + pdata.MetricDataTypeDoubleGauge, + pdata.MetricDataTypeIntHistogram, + pdata.MetricDataTypeHistogram, + pdata.MetricDataTypeSummary, + pdata.MetricDataTypeIntSum, + pdata.MetricDataTypeNone, + pdata.MetricDataTypeIntGauge, + pdata.MetricDataTypeIntSum, + }, + want: false, + }, + { + name: `bucket label with "int_histogram", "histogram":: non-useful`, + mtypes: []pdata.MetricDataType{pdata.MetricDataTypeIntHistogram, pdata.MetricDataTypeHistogram}, + labelKeys: []string{model.BucketLabel}, + want: false, + }, + { + name: `bucket label with non "int_histogram", "histogram":: useful`, + mtypes: []pdata.MetricDataType{ + pdata.MetricDataTypeDoubleSum, + pdata.MetricDataTypeDoubleGauge, + pdata.MetricDataTypeSummary, + pdata.MetricDataTypeIntSum, + pdata.MetricDataTypeNone, + pdata.MetricDataTypeIntGauge, + pdata.MetricDataTypeIntSum, + }, + labelKeys: []string{model.BucketLabel}, + want: true, + }, + { + name: `quantile label with "summary": non-useful`, + mtypes: []pdata.MetricDataType{ + pdata.MetricDataTypeSummary, + }, + labelKeys: []string{model.QuantileLabel}, + want: false, + }, + { + name: `quantile label with non-"summary": useful`, + labelKeys: []string{model.QuantileLabel}, + mtypes: []pdata.MetricDataType{ + pdata.MetricDataTypeDoubleSum, + pdata.MetricDataTypeDoubleGauge, + pdata.MetricDataTypeIntHistogram, + pdata.MetricDataTypeHistogram, + pdata.MetricDataTypeIntSum, + pdata.MetricDataTypeNone, + pdata.MetricDataTypeIntGauge, + pdata.MetricDataTypeIntSum, + }, + want: true, + }, + { + name: `any other label with any type:: useful`, + labelKeys: []string{"any_label", "foo.bar"}, + mtypes: []pdata.MetricDataType{ + pdata.MetricDataTypeDoubleSum, + pdata.MetricDataTypeDoubleGauge, + pdata.MetricDataTypeIntHistogram, + pdata.MetricDataTypeHistogram, + pdata.MetricDataTypeSummary, + pdata.MetricDataTypeIntSum, + pdata.MetricDataTypeNone, + pdata.MetricDataTypeIntGauge, + pdata.MetricDataTypeIntSum, + }, + want: true, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + for _, mtype := range tt.mtypes { + for _, labelKey := range tt.labelKeys { + got := isUsefulLabelPdata(mtype, labelKey) + assert.Equal(t, got, tt.want) + } + } + }) + } +}