diff --git a/exporter/prometheusexporter/accumulator.go b/exporter/prometheusexporter/accumulator.go index 712624a36903..bd118a15682c 100644 --- a/exporter/prometheusexporter/accumulator.go +++ b/exporter/prometheusexporter/accumulator.go @@ -94,11 +94,43 @@ func (a *lastValueAccumulator) addMetric(metric pdata.Metric, il pdata.Instrumen return a.accumulateIntHistogram(metric, il, now) case pdata.MetricDataTypeHistogram: return a.accumulateDoubleHistogram(metric, il, now) + case pdata.MetricDataTypeSummary: + return a.accumulateSummary(metric, il, now) + default: + a.logger.With( + zap.String("data_type", string(metric.DataType())), + zap.String("metric_name", metric.Name()), + ).Error("failed to translate metric") } return 0 } +func (a *lastValueAccumulator) accumulateSummary(metric pdata.Metric, il pdata.InstrumentationLibrary, now time.Time) (n int) { + dps := metric.Summary().DataPoints() + for i := 0; i < dps.Len(); i++ { + ip := dps.At(i) + + signature := timeseriesSignature(il.Name(), metric, ip.LabelsMap()) + + v, ok := a.registeredMetrics.Load(signature) + stalePoint := ok && + ip.Timestamp().AsTime().Before(v.(*accumulatedValue).value.Summary().DataPoints().At(0).Timestamp().AsTime()) + + if stalePoint { + // Only keep this datapoint if it has a later timestamp. 
+ continue + } + + mm := createMetric(metric) + mm.Summary().DataPoints().Append(ip) + a.registeredMetrics.Store(signature, &accumulatedValue{value: mm, instrumentationLibrary: il, updated: now}) + n++ + } + + return n +} + func (a *lastValueAccumulator) accumulateIntGauge(metric pdata.Metric, il pdata.InstrumentationLibrary, now time.Time) (n int) { dps := metric.IntGauge().DataPoints() for i := 0; i < dps.Len(); i++ { diff --git a/exporter/prometheusexporter/collector.go b/exporter/prometheusexporter/collector.go index e7a26accc181..e6bbdda68970 100644 --- a/exporter/prometheusexporter/collector.go +++ b/exporter/prometheusexporter/collector.go @@ -70,6 +70,8 @@ func (c *collector) convertMetric(metric pdata.Metric) (prometheus.Metric, error return c.convertIntHistogram(metric) case pdata.MetricDataTypeHistogram: return c.convertDoubleHistogram(metric) + case pdata.MetricDataTypeSummary: + return c.convertSummary(metric) } return nil, errUnknownMetricType @@ -207,6 +209,30 @@ func (c *collector) convertIntHistogram(metric pdata.Metric) (prometheus.Metric, return m, nil } +func (c *collector) convertSummary(metric pdata.Metric) (prometheus.Metric, error) { + // TODO: In the off chance that we have multiple points + // within the same metric, how should we handle them? + point := metric.Summary().DataPoints().At(0) + + quantiles := make(map[float64]float64) + qv := point.QuantileValues() + for j := 0; j < qv.Len(); j++ { + qvj := qv.At(j) + // There should be EXACTLY one quantile value lest it is an invalid exposition. + quantiles[qvj.Quantile()] = qvj.Value() + } + + desc, labelValues := c.getMetricMetadata(metric, point.LabelsMap()) + m, err := prometheus.NewConstSummary(desc, point.Count(), point.Sum(), quantiles, labelValues...) 
+ if err != nil { + return nil, err + } + if c.sendTimestamps { + return prometheus.NewMetricWithTimestamp(point.Timestamp().AsTime(), m), nil + } + return m, nil +} + func (c *collector) convertDoubleHistogram(metric pdata.Metric) (prometheus.Metric, error) { ip := metric.Histogram().DataPoints().At(0) desc, labels := c.getMetricMetadata(metric, ip.LabelsMap()) diff --git a/exporter/prometheusexporter/collector_test.go b/exporter/prometheusexporter/collector_test.go index fde6e663260e..856cea652bfb 100644 --- a/exporter/prometheusexporter/collector_test.go +++ b/exporter/prometheusexporter/collector_test.go @@ -349,10 +349,12 @@ func TestCollectMetrics(t *testing.T) { require.Equal(t, tt.value, *pbMetric.Counter.Value) require.Nil(t, pbMetric.Gauge) require.Nil(t, pbMetric.Histogram) + require.Nil(t, pbMetric.Summary) case prometheus.GaugeValue: require.Equal(t, tt.value, *pbMetric.Gauge.Value) require.Nil(t, pbMetric.Counter) require.Nil(t, pbMetric.Histogram) + require.Nil(t, pbMetric.Summary) } } require.Equal(t, 1, j) @@ -488,3 +490,115 @@ func TestAccumulateHistograms(t *testing.T) { } } } + +func TestAccumulateSummary(t *testing.T) { + quantileValue := func(pN, value float64) pdata.ValueAtQuantile { + vqpN := pdata.NewValueAtQuantile() + vqpN.SetQuantile(pN) + vqpN.SetValue(value) + return vqpN + } + quantilesFromMap := func(qf map[float64]float64) (qL []*io_prometheus_client.Quantile) { + f64Ptr := func(v float64) *float64 { return &v } + for quantile, value := range qf { + qL = append(qL, &io_prometheus_client.Quantile{ + Quantile: f64Ptr(quantile), Value: f64Ptr(value), + }) + } + return qL + } + tests := []struct { + name string + metric func(time.Time) pdata.Metric + wantSum float64 + wantCount uint64 + wantQuantiles []*io_prometheus_client.Quantile + }{ + { + name: "Summary with single point", + wantSum: 0.012, + wantCount: 10, + wantQuantiles: quantilesFromMap(map[float64]float64{ + 0.50: 190, + 0.99: 817, + }), + metric: func(ts time.Time) (metric 
pdata.Metric) { + sp := pdata.NewSummaryDataPoint() + sp.SetCount(10) + sp.SetSum(0.012) + sp.SetCount(10) + sp.LabelsMap().Insert("label_1", "1") + sp.LabelsMap().Insert("label_2", "2") + sp.SetTimestamp(pdata.TimestampFromTime(ts)) + + sp.QuantileValues().Append(quantileValue(0.50, 190)) + sp.QuantileValues().Append(quantileValue(0.99, 817)) + + metric = pdata.NewMetric() + metric.SetName("test_metric") + metric.SetDataType(pdata.MetricDataTypeSummary) + metric.Summary().DataPoints().Append(sp) + metric.SetDescription("test description") + + return + }, + }, + } + + for _, tt := range tests { + for _, sendTimestamp := range []bool{true, false} { + name := tt.name + if sendTimestamp { + name += "/WithTimestamp" + } + t.Run(name, func(t *testing.T) { + ts := time.Now() + metric := tt.metric(ts) + c := collector{ + accumulator: &mockAccumulator{ + []pdata.Metric{metric}, + }, + sendTimestamps: sendTimestamp, + logger: zap.NewNop(), + } + + ch := make(chan prometheus.Metric, 1) + go func() { + c.Collect(ch) + close(ch) + }() + + n := 0 + for m := range ch { + n++ + require.Contains(t, m.Desc().String(), "fqName: \"test_metric\"") + require.Contains(t, m.Desc().String(), "variableLabels: [label_1 label_2]") + + pbMetric := io_prometheus_client.Metric{} + m.Write(&pbMetric) + + labelsKeys := map[string]string{"label_1": "1", "label_2": "2"} + for _, l := range pbMetric.Label { + require.Equal(t, labelsKeys[*l.Name], *l.Value) + } + + if sendTimestamp { + require.Equal(t, ts.UnixNano()/1e6, *(pbMetric.TimestampMs)) + } else { + require.Nil(t, pbMetric.TimestampMs) + } + + require.Nil(t, pbMetric.Gauge) + require.Nil(t, pbMetric.Counter) + require.Nil(t, pbMetric.Histogram) + + s := *pbMetric.Summary + require.Equal(t, tt.wantCount, *s.SampleCount) + require.Equal(t, tt.wantSum, *s.SampleSum) + require.Equal(t, tt.wantQuantiles, s.Quantile) + } + require.Equal(t, 1, n) + }) + } + } +} diff --git a/exporter/prometheusexporter/end_to_end_test.go 
b/exporter/prometheusexporter/end_to_end_test.go new file mode 100644 index 000000000000..3534241f1fd1 --- /dev/null +++ b/exporter/prometheusexporter/end_to_end_test.go @@ -0,0 +1,195 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package prometheusexporter + +import ( + "bytes" + "context" + "fmt" + "io/ioutil" + "net/http" + "net/http/httptest" + "net/url" + "regexp" + "testing" + "time" + + promconfig "github.com/prometheus/prometheus/config" + "go.uber.org/zap" + "gopkg.in/yaml.v2" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config" + "go.opentelemetry.io/collector/receiver/prometheusreceiver" +) + +func TestEndToEndSummarySupport(t *testing.T) { + if testing.Short() { + t.Skip("This test can take a couple of seconds") + } + + // 1. Create the Prometheus scrape endpoint. + waitForScrape := make(chan bool, 1) + shutdown := make(chan bool, 1) + dropWizardServer := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + select { + case <-shutdown: + return + case waitForScrape <- true: + // Serve back the metrics as if they were from DropWizard. + rw.Write([]byte(dropWizardResponse)) + } + })) + defer dropWizardServer.Close() + defer close(shutdown) + + srvURL, err := url.Parse(dropWizardServer.URL) + if err != nil { + t.Fatal(err) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // 2. 
Create the Prometheus metrics exporter that'll receive and verify the metrics produced. + exporterCfg := &Config{ + ExporterSettings: config.NewExporterSettings("prometheus"), + Namespace: "test", + Endpoint: ":8787", + SendTimestamps: true, + MetricExpiration: 2 * time.Hour, + } + exporterFactory := NewFactory() + creationParams := component.ExporterCreateParams{Logger: zap.NewNop()} + exporter, err := exporterFactory.CreateMetricsExporter(ctx, creationParams, exporterCfg) + if err != nil { + t.Fatal(err) + } + if err = exporter.Start(ctx, nil); err != nil { + t.Fatalf("Failed to start the Prometheus receiver: %v", err) + } + defer exporter.Shutdown(ctx) + + // 3. Create the Prometheus receiver scraping from the DropWizard mock server and + // it'll feed scraped and converted metrics then pass them to the Prometheus exporter. + yamlConfig := []byte(fmt.Sprintf(` + global: + scrape_interval: 2ms + + scrape_configs: + - job_name: 'otel-collector' + scrape_interval: 2ms + static_configs: + - targets: ['%s'] + `, srvURL.Host)) + receiverConfig := new(promconfig.Config) + if err = yaml.Unmarshal(yamlConfig, receiverConfig); err != nil { + t.Fatal(err) + } + + receiverFactory := prometheusreceiver.NewFactory() + receiverCreateParams := component.ReceiverCreateParams{ + Logger: zap.NewNop(), + } + rcvCfg := &prometheusreceiver.Config{ + PrometheusConfig: receiverConfig, + ReceiverSettings: config.ReceiverSettings{ + TypeVal: "prometheus", + NameVal: "prometheus", + }, + } + // 3.5 Create the Prometheus receiver and pass in the previously created Prometheus exporter. + prometheusReceiver, err := receiverFactory.CreateMetricsReceiver(ctx, receiverCreateParams, rcvCfg, exporter) + if err != nil { + t.Fatal(err) + } + if err = prometheusReceiver.Start(ctx, nil); err != nil { + t.Fatalf("Failed to start the Prometheus receiver: %v", err) + } + defer prometheusReceiver.Shutdown(ctx) + + // 4.
Scrape from the Prometheus exporter to ensure that we export summary metrics + // We shall let the Prometheus exporter scrape the DropWizard mock server, at least 9 times. + for i := 0; i < 8; i++ { + <-waitForScrape + } + res, err := http.Get("http://localhost" + exporterCfg.Endpoint + "/metrics") + if err != nil { + t.Fatalf("Failed to scrape from the exporter: %v", err) + } + prometheusExporterScrape, err := ioutil.ReadAll(res.Body) + res.Body.Close() + if err != nil { + t.Fatal(err) + } + + // 5. Verify that we have the summary metrics and that their values make sense. + wantLineRegexps := []string{ + `. HELP test_jvm_gc_collection_seconds Time spent in a given JVM garbage collector in seconds.`, + `. TYPE test_jvm_gc_collection_seconds summary`, + `test_jvm_gc_collection_seconds_sum.gc="G1 Old Generation". 0.*`, + `test_jvm_gc_collection_seconds_count.gc="G1 Old Generation". 0.*`, + `test_jvm_gc_collection_seconds_sum.gc="G1 Young Generation". 0.*`, + `test_jvm_gc_collection_seconds_count.gc="G1 Young Generation". 0.*`, + `. HELP test_jvm_info JVM version info`, + `. TYPE test_jvm_info gauge`, + `test_jvm_info.vendor="Oracle Corporation",version="9.0.4.11". 1.*`, + `. HELP test_jvm_memory_pool_bytes_used Used bytes of a given JVM memory pool.`, + `. TYPE test_jvm_memory_pool_bytes_used gauge`, + `test_jvm_memory_pool_bytes_used.pool="CodeHeap 'non.nmethods'". 1.277952e.06.*`, + `test_jvm_memory_pool_bytes_used.pool="CodeHeap 'non.profiled nmethods'". 2.869376e.06.*`, + `test_jvm_memory_pool_bytes_used.pool="CodeHeap 'profiled nmethods'". 6.871168e.06.*`, + `test_jvm_memory_pool_bytes_used.pool="Compressed Class Space". 2.751312e.06.*`, + `test_jvm_memory_pool_bytes_used.pool="G1 Eden Space". 4.4040192e.07.*`, + `test_jvm_memory_pool_bytes_used.pool="G1 Old Gen". 4.385408e.06.*`, + `test_jvm_memory_pool_bytes_used.pool="G1 Survivor Space". 8.388608e.06.*`, + `test_jvm_memory_pool_bytes_used.pool="Metaspace". 
2.6218176e.07.*`, + } + + // 5.5: Perform a complete line by line prefix verification to ensure we extract back the inputs + // we'd expect after scraping Prometheus. + for _, wantLineRegexp := range wantLineRegexps { + reg := regexp.MustCompile(wantLineRegexp) + prometheusExporterScrape = reg.ReplaceAll(prometheusExporterScrape, []byte("")) + } + // After this replacement, there should ONLY be newlines present. + prometheusExporterScrape = bytes.ReplaceAll(prometheusExporterScrape, []byte("\n"), []byte("")) + // Now assert that NO output was left over. + if len(prometheusExporterScrape) != 0 { + t.Fatalf("Left-over unmatched Prometheus scrape content: %q\n", prometheusExporterScrape) + } + +} + +const dropWizardResponse = ` +# HELP jvm_memory_pool_bytes_used Used bytes of a given JVM memory pool. +# TYPE jvm_memory_pool_bytes_used gauge +jvm_memory_pool_bytes_used{pool="CodeHeap 'non-nmethods'",} 1277952.0 +jvm_memory_pool_bytes_used{pool="Metaspace",} 2.6218176E7 +jvm_memory_pool_bytes_used{pool="CodeHeap 'profiled nmethods'",} 6871168.0 +jvm_memory_pool_bytes_used{pool="Compressed Class Space",} 2751312.0 +jvm_memory_pool_bytes_used{pool="G1 Eden Space",} 4.4040192E7 +jvm_memory_pool_bytes_used{pool="G1 Old Gen",} 4385408.0 +jvm_memory_pool_bytes_used{pool="G1 Survivor Space",} 8388608.0 +jvm_memory_pool_bytes_used{pool="CodeHeap 'non-profiled nmethods'",} 2869376.0 +# HELP jvm_info JVM version info +# TYPE jvm_info gauge +jvm_info{version="9.0.4+11",vendor="Oracle Corporation",} 1.0 +# HELP jvm_gc_collection_seconds Time spent in a given JVM garbage collector in seconds. +# TYPE jvm_gc_collection_seconds summary +jvm_gc_collection_seconds_count{gc="G1 Young Generation",} 9.0 +jvm_gc_collection_seconds_sum{gc="G1 Young Generation",} 0.229 +jvm_gc_collection_seconds_count{gc="G1 Old Generation",} 0.0 +jvm_gc_collection_seconds_sum{gc="G1 Old Generation",} 0.0`