diff --git a/exporter/cortexexporter/README.md b/exporter/cortexexporter/README.md new file mode 100644 index 00000000000..034d2388d3c --- /dev/null +++ b/exporter/cortexexporter/README.md @@ -0,0 +1 @@ +To be added. diff --git a/exporter/cortexexporter/cortex.go b/exporter/cortexexporter/cortex.go new file mode 100644 index 00000000000..d419e7ec04e --- /dev/null +++ b/exporter/cortexexporter/cortex.go @@ -0,0 +1,244 @@ +// Copyright 2020 The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cortexexporter + +import ( + "context" + "fmt" + "github.com/prometheus/prometheus/prompb" + "go.opentelemetry.io/collector/consumer/pdata" + "go.opentelemetry.io/collector/consumer/pdatautil" + "go.opentelemetry.io/collector/internal/data" + otlp "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/metrics/v1" + "net/http" + "strconv" + "strings" + "sync" +) +// TODO: get default labels such as job or instance from Resource + +// cortexExporter converts OTLP metrics to Cortex TimeSeries and sends them to a remote endpoint +type cortexExporter struct { + namespace string + endpoint string + client *http.Client + headers map[string]string + wg *sync.WaitGroup + closeChan chan struct{} +} + +// handleScalarMetric processes data points in a single OTLP scalar metric by adding the each point as a Sample into +// its corresponding TimeSeries in tsMap. +// tsMap and metric cannot be nil, and metric must have a non-nil descriptor +func (ce *cortexExporter) handleScalarMetric(tsMap map[string]*prompb.TimeSeries, metric *otlp.Metric) error { + mType := metric.MetricDescriptor.Type + switch mType { + // int points + case otlp.MetricDescriptor_MONOTONIC_INT64,otlp.MetricDescriptor_INT64: + if metric.Int64DataPoints == nil { + return fmt.Errorf("nil data point field in metric" + metric.GetMetricDescriptor().Name) + } + for _, pt := range metric.Int64DataPoints { + name := getPromMetricName(metric.GetMetricDescriptor(), ce.namespace) + lbs := createLabelSet(pt.GetLabels(), nameStr, name) + sample := &prompb.Sample{ + Value: float64(pt.Value), + Timestamp: int64(pt.TimeUnixNano), + } + addSample(tsMap,sample, lbs, metric.GetMetricDescriptor().GetType()) + } + return nil + + // double points + case otlp.MetricDescriptor_MONOTONIC_DOUBLE, otlp.MetricDescriptor_DOUBLE: + if metric.DoubleDataPoints == nil { + return fmt.Errorf("nil data point field in metric" + metric.GetMetricDescriptor().Name) + } + for _, pt := range metric.DoubleDataPoints { + name := getPromMetricName(metric.GetMetricDescriptor(), ce.namespace) + lbs := createLabelSet(pt.GetLabels(),nameStr, name) + sample := &prompb.Sample{ + Value: pt.Value, + Timestamp: int64(pt.TimeUnixNano), + } + addSample(tsMap,sample, lbs, metric.GetMetricDescriptor().GetType()) + } + return nil + } + + return fmt.Errorf("invalid metric type: wants int or double data points"); +} + +// handleHistogramMetric processes data points in a single OTLP histogram metric by mapping the sum, count and each +// bucket of every data point as a Sample, and adding each Sample to its corresponding TimeSeries. +// tsMap and metric cannot be nil. +func (ce *cortexExporter) handleHistogramMetric(tsMap map[string]*prompb.TimeSeries, metric *otlp.Metric) error { + if metric.HistogramDataPoints == nil { + return fmt.Errorf("invalid metric type: wants histogram points") + } + for _, pt := range metric.HistogramDataPoints { + time := int64(pt.GetTimeUnixNano()) + ty := metric.GetMetricDescriptor().GetType() + baseName := getPromMetricName(metric.GetMetricDescriptor(), ce.namespace) + sum := &prompb.Sample{ + Value: pt.GetSum(), + Timestamp: time, + } + count := &prompb.Sample{ + Value: float64(pt.GetCount()), + Timestamp: time, + } + sumLbs := createLabelSet(pt.GetLabels(),nameStr, baseName+sumStr) + countLbs := createLabelSet(pt.GetLabels(),nameStr, baseName+countStr) + addSample(tsMap, sum, sumLbs, ty) + addSample(tsMap, count, countLbs, ty) + var totalCount uint64 + for le, bk := range pt.GetBuckets(){ + bucket := &prompb.Sample{ + Value: float64(bk.Count), + Timestamp: time, + } + boundStr := strconv.FormatFloat(pt.GetExplicitBounds()[le], 'f',-1, 64) + lbs := createLabelSet(pt.GetLabels(),nameStr, baseName+bucketStr, leStr,boundStr) + addSample(tsMap, bucket, lbs ,ty) + totalCount += bk.GetCount() + } + infSample := &prompb.Sample{Value:float64(totalCount),Timestamp:time} + infLbs := createLabelSet(pt.GetLabels(),nameStr, baseName+bucketStr, leStr,pInfStr) + addSample(tsMap, infSample, infLbs, ty) + } + return nil +} + +// handleSummaryMetric processes data points in a single OTLP summary metric by mapping the sum, count and each +// quantile of every data point as a Sample, and adding each Sample to its corresponding TimeSeries. +// tsMap and metric cannot be nil. +func (ce *cortexExporter) handleSummaryMetric(tsMap map[string]*prompb.TimeSeries, metric *otlp.Metric) error { + if metric.SummaryDataPoints == nil { + return fmt.Errorf("invalid metric type: wants summary points") + } + + for _, pt := range metric.SummaryDataPoints { + time := int64(pt.GetTimeUnixNano()) + ty := metric.GetMetricDescriptor().GetType() + baseName := getPromMetricName(metric.GetMetricDescriptor(), ce.namespace) + sum := &prompb.Sample{ + Value: pt.GetSum(), + Timestamp: time, + } + count := &prompb.Sample{ + Value: float64(pt.GetCount()), + Timestamp: time, + } + sumLbs := createLabelSet(pt.GetLabels(),nameStr, baseName+sumStr) + countLbs := createLabelSet(pt.GetLabels(),nameStr, baseName+countStr) + addSample(tsMap, sum, sumLbs, ty) + addSample(tsMap, count, countLbs, ty) + for _, qt := range pt.GetPercentileValues(){ + quantile := &prompb.Sample{ + Value: float64(qt.Value), + Timestamp: time, + } + percentileStr := strconv.FormatFloat(qt.Percentile, 'f',-1, 64) + qtLbs := createLabelSet(pt.GetLabels(),nameStr, baseName, quantileStr, percentileStr) + addSample(tsMap, quantile, qtLbs, ty) + } + } + return nil + +} + +// newCortexExporter initializes a new cortexExporter instance and sets fields accordingly. +// client parameter cannot be nil. +func newCortexExporter(ns string, ep string, client *http.Client) (*cortexExporter, error) { + if client == nil { + return nil, fmt.Errorf("http client cannot be nil") + } + + return &cortexExporter{ + namespace: ns, + endpoint: ep, + client: client, + wg: new(sync.WaitGroup), + closeChan: make(chan struct{}), + }, nil +} + +// shutdown stops the exporter from accepting incoming calls(and return error), and wait for current export operations +// to finish before returning +func (ce *cortexExporter)shutdown(context.Context) error{ + close(ce.closeChan) + ce.wg.Wait() + return nil +} + +// pushMetrics converts metrics to Cortex TimeSeries and send to remote endpoint. It maintain a map of TimeSeries, +// validates and handles each individual metric, adding the converted TimeSeries to the map, and finally +// exports the map. +func (ce *cortexExporter) pushMetrics(ctx context.Context, md pdata.Metrics) (int, error) { + ce.wg.Add(1) + defer ce.wg.Done() + select{ + case <-ce.closeChan: + return pdatautil.MetricCount(md),fmt.Errorf("shutdown has been called") + default: + tsMap := map[string]*prompb.TimeSeries{} + dropped := 0 + errStrings := []string{} + rms := data.MetricDataToOtlp(pdatautil.MetricsToInternalMetrics(md)) + for _, r := range rms { + // TODO add resource attributes as labels + for _, inst := range r.InstrumentationLibraryMetrics { + //TODO add instrumentation library information as labels + for _, m := range inst.Metrics { + ok := validateMetrics(m.MetricDescriptor) + if !ok { + dropped++ + errStrings = append(errStrings, "invalid temporality and type combination") + continue + } + switch m.GetMetricDescriptor().GetType() { + case otlp.MetricDescriptor_MONOTONIC_INT64, otlp.MetricDescriptor_INT64, + otlp.MetricDescriptor_MONOTONIC_DOUBLE, otlp.MetricDescriptor_DOUBLE: + ce.handleScalarMetric(tsMap,m) + case otlp.MetricDescriptor_HISTOGRAM: + ce.handleHistogramMetric(tsMap,m) + case otlp.MetricDescriptor_SUMMARY: + ce.handleSummaryMetric(tsMap,m) + default: + dropped++ + errStrings = append(errStrings, "invalid type") + continue + } + } + } + } + if(dropped != 0) { + return dropped, fmt.Errorf(strings.Join(errStrings, "\n")) + } + + if err := ce.export(ctx,tsMap); err != nil { + return pdatautil.MetricCount(md), err + } + + return 0, nil + } +} + +// export sends TimeSeries in tsMap to a Cortex Gateway +// this needs to be done +func (ce *cortexExporter) export(ctx context.Context, tsMap map[string]*prompb.TimeSeries) error { + return nil +} diff --git a/exporter/cortexexporter/cortex_test.go b/exporter/cortexexporter/cortex_test.go new file mode 100644 index 00000000000..d285502d83e --- /dev/null +++ b/exporter/cortexexporter/cortex_test.go @@ -0,0 +1,494 @@ +// Copyright 2020 The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cortexexporter + +import ( + "context" + "github.com/golang/snappy" + "go.opentelemetry.io/collector/config/confighttp" + "go.opentelemetry.io/collector/config/configmodels" + "go.opentelemetry.io/collector/consumer/pdata" + "go.opentelemetry.io/collector/exporter/exporterhelper" + "io/ioutil" + "net/http" + "net/http/httptest" + "net/url" + "strconv" + "sync" + "testing" + + "github.com/prometheus/prometheus/prompb" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/consumer/pdatautil" + "go.opentelemetry.io/collector/internal/data/testdata" + + "github.com/stretchr/testify/assert" + + proto "github.com/gogo/protobuf/proto" + otlp "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/metrics/v1" + // "github.com/stretchr/testify/require" +) + +// TODO: try to run Test_PushMetrics after export() is in +// TODO: add bucket and histogram test cases for Test_PushMetrics +// TODO: check that NoError instead of NoNil is used at the right places +// TODO: add one line comment before each test stating criteria + +// Test_handleScalarMetric checks whether data points within a single scalar metric can be added to a map of +// TimeSeries correctly. +// Test cases are two data point belonging to the same TimeSeries, two data point belonging different TimeSeries, +// and nil data points case. +func Test_handleScalarMetric(t *testing.T) { + sameTs := map[string]*prompb.TimeSeries{ + // string signature of the data point is the key of the map + typeMonotonicInt64 + "-name-same_ts_int_points_total" + lb1Sig: + getTimeSeries( + getPromLabels(label11, value11, label12, value12, "name", "same_ts_int_points_total"), + getSample(float64(intVal1), time1), + getSample(float64(intVal2), time1)), + } + differentTs := map[string]*prompb.TimeSeries{ + typeMonotonicInt64 + "-name-different_ts_int_points_total" + lb1Sig: + getTimeSeries( + getPromLabels(label11, value11, label12, value12, "name", "different_ts_int_points_total"), + getSample(float64(intVal1), time1)), + typeMonotonicInt64 + "-name-different_ts_int_points_total" + lb2Sig: + getTimeSeries( + getPromLabels(label21, value21, label22, value22, "name", "different_ts_int_points_total"), + getSample(float64(intVal1), time2)), + } + + tests := []struct { + name string + m *otlp.Metric + returnError bool + want map[string]*prompb.TimeSeries + }{ + { + "invalid_nil_array", + &otlp.Metric{ + MetricDescriptor: getDescriptor("invalid_nil_array", monotonicInt64Comb, validCombinations), + Int64DataPoints: nil, + DoubleDataPoints: nil, + HistogramDataPoints: nil, + SummaryDataPoints: nil, + }, + true, + map[string]*prompb.TimeSeries{}, + }, + { + "same_ts_int_points", + &otlp.Metric{ + MetricDescriptor: getDescriptor("same_ts_int_points", monotonicInt64Comb, validCombinations), + Int64DataPoints: []*otlp.Int64DataPoint{ + getIntDataPoint(lbs1, intVal1, time1), + getIntDataPoint(lbs1, intVal2, time1), + }, + DoubleDataPoints: nil, + HistogramDataPoints: nil, + SummaryDataPoints: nil, + }, + false, + sameTs, + }, + { + "different_ts_int_points", + &otlp.Metric{ + MetricDescriptor: getDescriptor("different_ts_int_points", monotonicInt64Comb, validCombinations), + Int64DataPoints: []*otlp.Int64DataPoint{ + getIntDataPoint(lbs1, intVal1, time1), + getIntDataPoint(lbs2, intVal1, time2), + }, + DoubleDataPoints: nil, + HistogramDataPoints: nil, + SummaryDataPoints: nil, + }, + false, + differentTs, + }, + } + // run tests + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tsMap := map[string]*prompb.TimeSeries{} + ce := &cortexExporter{} + ok := ce.handleScalarMetric(tsMap, tt.m) + if tt.returnError { + assert.Error(t, ok) + return + } + assert.Exactly(t, len(tt.want), len(tsMap)) + for k, v := range tsMap { + require.NotNil(t, tt.want[k]) + assert.ElementsMatch(t, tt.want[k].Labels, v.Labels) + assert.ElementsMatch(t, tt.want[k].Samples, v.Samples) + } + }) + } +} + +// Test_handleHistogramMetric checks whether data points(sum, count, buckets) within a single Histogram metric can be +// added to a map of TimeSeries correctly. +// Test cases are a histogram data point with two buckets and nil data points case. +func Test_handleHistogramMetric(t *testing.T) { + sum := "sum" + count := "count" + bucket1 := "bucket1" + bucket2 := "bucket2" + bucketInf := "bucketInf" + histPoint := otlp.HistogramDataPoint{ + Labels: lbs1, + StartTimeUnixNano: 0, + TimeUnixNano: time1, + Count: uint64(intVal2), + Sum: floatVal2, + Buckets: []*otlp.HistogramDataPoint_Bucket{ + {uint64(intVal1), + nil, + }, + {uint64(intVal1), + nil, + }, + }, + ExplicitBounds: []float64{ + floatVal1, + floatVal2, + }, + } + // string signature of the data point is the key of the map + sigs := map[string]string{ + sum: typeHistogram + "-name-" + name1 +"_sum" + lb1Sig, + count: typeHistogram + "-name-" + name1 +"_count" + lb1Sig, + bucket1: typeHistogram + "-" + "le-" + strconv.FormatFloat(floatVal1, 'f', -1, 64) + + "-name-" + name1 +"_bucket" + lb1Sig, + bucket2: typeHistogram + "-" + "le-" + strconv.FormatFloat(floatVal2, 'f', -1, 64) + + "-name-" + name1 +"_bucket" + lb1Sig, + bucketInf: typeHistogram + "-" + "le-" + "+Inf" + + "-name-" + name1 +"_bucket" + lb1Sig, + } + lbls := map[string][]prompb.Label{ + sum: append(promLbs1, getPromLabels("name", name1+ "_sum")...), + count: append(promLbs1, getPromLabels("name", name1+ "_count")...), + bucket1: append(promLbs1, getPromLabels("name", name1+ "_bucket", "le", + strconv.FormatFloat(floatVal1, 'f', -1, 64))...), + bucket2: append(promLbs1, getPromLabels("name", name1+ "_bucket", "le", + strconv.FormatFloat(floatVal2, 'f', -1, 64))...), + bucketInf: append(promLbs1, getPromLabels("name", name1+ "_bucket", "le", + "+Inf")...), + } + tests := []struct { + name string + m otlp.Metric + returnError bool + want map[string]*prompb.TimeSeries + }{ + { + "invalid_nil_array", + otlp.Metric{ + MetricDescriptor: getDescriptor("invalid_nil_array", histogramComb, validCombinations), + Int64DataPoints: nil, + DoubleDataPoints: nil, + HistogramDataPoints: nil, + SummaryDataPoints: nil, + }, + true, + map[string]*prompb.TimeSeries{}, + }, + { + "single_histogram_point", + otlp.Metric{ + MetricDescriptor: getDescriptor(name1+ "", histogramComb, validCombinations), + Int64DataPoints: nil, + DoubleDataPoints: nil, + HistogramDataPoints: []*otlp.HistogramDataPoint{&histPoint}, + SummaryDataPoints: nil, + }, + false, + map[string]*prompb.TimeSeries{ + sigs[sum]: getTimeSeries(lbls[sum], getSample(floatVal2, time1)), + sigs[count]: getTimeSeries(lbls[count], getSample(float64(intVal2), time1)), + sigs[bucket1]: getTimeSeries(lbls[bucket1], getSample(float64(intVal1), time1)), + sigs[bucket2]: getTimeSeries(lbls[bucket2], getSample(float64(intVal1), time1)), + sigs[bucketInf]: getTimeSeries(lbls[bucketInf], getSample(float64(intVal2), time1)), + }, + }, + } + + // run tests + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tsMap := map[string]*prompb.TimeSeries{} + ce := &cortexExporter{} + ok := ce.handleHistogramMetric(tsMap, &tt.m) + if tt.returnError { + assert.Error(t, ok) + return + } + assert.Exactly(t, len(tt.want), len(tsMap)) + for k, v := range tsMap { + require.NotNil(t, tt.want[k], k) + assert.ElementsMatch(t, tt.want[k].Labels, v.Labels) + assert.ElementsMatch(t, tt.want[k].Samples, v.Samples) + } + }) + } +} + +// Test_handleSummaryMetric checks whether data points(sum, count, quantiles) within a single Summary metric can be +// added to a map of TimeSeries correctly. +// Test cases are a summary data point with two quantiles and nil data points case. +func Test_handleSummaryMetric(t *testing.T) { + sum := "sum" + count := "count" + q1 := "quantile1" + q2 := "quantile2" + // string signature is the key of the map + sigs := map[string]string{ + sum: typeSummary + "-name-" + name1 +"_sum" + lb1Sig, + count: typeSummary + "-name-" + name1 +"_count" + lb1Sig, + q1: typeSummary + "-name-" + name1 +"-" + "quantile-" + + strconv.FormatFloat(floatVal1, 'f', -1, 64) + lb1Sig, + q2: typeSummary + "-name-" + name1 +"-" + "quantile-" + + strconv.FormatFloat(floatVal2, 'f', -1, 64) + lb1Sig, + } + lbls := map[string][]prompb.Label{ + sum: append(promLbs1, getPromLabels("name", name1+ "_sum")...), + count: append(promLbs1, getPromLabels("name", name1+ "_count")...), + q1: append(promLbs1, getPromLabels("name", name1, "quantile", + strconv.FormatFloat(floatVal1, 'f', -1, 64))...), + q2: append(promLbs1, getPromLabels("name", name1, "quantile", + strconv.FormatFloat(floatVal2, 'f', -1, 64))...), + } + summaryPoint := otlp.SummaryDataPoint{ + Labels: lbs1, + StartTimeUnixNano: 0, + TimeUnixNano: uint64(time1), + Count: uint64(intVal2), + Sum: floatVal2, + PercentileValues: []*otlp.SummaryDataPoint_ValueAtPercentile{ + {floatVal1, + floatVal1, + }, + {floatVal2, + floatVal1, + }, + }, + } + tests := []struct { + name string + m otlp.Metric + returnError bool + want map[string]*prompb.TimeSeries + }{ + { + "invalid_nil_array", + otlp.Metric{ + MetricDescriptor: getDescriptor("invalid_nil_array", summaryComb, validCombinations), + Int64DataPoints: nil, + DoubleDataPoints: nil, + HistogramDataPoints: nil, + SummaryDataPoints: nil, + }, + true, + map[string]*prompb.TimeSeries{}, + }, + { + "single_summary_point", + otlp.Metric{ + MetricDescriptor: getDescriptor(name1, summaryComb, validCombinations), + Int64DataPoints: nil, + DoubleDataPoints: nil, + HistogramDataPoints: nil, + SummaryDataPoints: []*otlp.SummaryDataPoint{&summaryPoint}, + }, + false, + map[string]*prompb.TimeSeries{ + sigs[sum]: getTimeSeries(lbls[sum], getSample(floatVal2, time1)), + sigs[count]: getTimeSeries(lbls[count], getSample(float64(intVal2), time1)), + sigs[q1]: getTimeSeries(lbls[q1], getSample(float64(intVal1), time1)), + sigs[q2]: getTimeSeries(lbls[q2], getSample(float64(intVal1), time1)), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tsMap := map[string]*prompb.TimeSeries{} + ce := &cortexExporter{} + ok := ce.handleSummaryMetric(tsMap, &tt.m) + if tt.returnError { + assert.Error(t, ok) + return + } + assert.Exactly(t, len(tt.want), len(tsMap)) + for k, v := range tsMap { + require.NotNil(t, tt.want[k], k) + assert.ElementsMatch(t, tt.want[k].Labels, v.Labels) + assert.ElementsMatch(t, tt.want[k].Samples, v.Samples) + } + }) + } +} + +// Test_shutdown checks after shutdown is called, incoming calls to pushMetrics return error. +func Test_shutdown(t *testing.T) { + ce := &cortexExporter{ + wg: new(sync.WaitGroup), + closeChan: make(chan struct{}), + } + wg:=new(sync.WaitGroup) + errChan := make(chan error, 5) + err := ce.shutdown(context.Background()) + require.NoError(t, err) + errChan = make(chan error, 5) + for i := 0; i < 5; i++ { + wg.Add(1) + go func() { + defer wg.Done() + _, ok := ce.pushMetrics(context.Background(), + pdatautil.MetricsFromInternalMetrics(testdata.GenerateMetricDataEmpty())) + errChan <- ok + }() + } + wg.Wait() + close(errChan) + for ok := range errChan { + assert.Error(t, ok) + } +} + +// To be implemented +func Test_Export(t *testing.T) { + return +} + +// Test_newCortexExporter checks that a new exporter instance with non-nil fields is initialized +func Test_newCortexExporter(t *testing.T) { + config := &Config{ + ExporterSettings: configmodels.ExporterSettings{}, + TimeoutSettings: exporterhelper.TimeoutSettings{}, + QueueSettings: exporterhelper.QueueSettings{}, + RetrySettings: exporterhelper.RetrySettings{}, + Namespace: "", + HTTPClientSettings: confighttp.HTTPClientSettings{Endpoint: ""}, + } + c, _ := config.HTTPClientSettings.ToClient() + ce, err:= newCortexExporter(config.HTTPClientSettings.Endpoint, config.Namespace, c) + require.NoError(t,err) + require.NotNil(t, ce) + assert.NotNil(t, ce.namespace) + assert.NotNil(t, ce.endpoint) + assert.NotNil(t, ce.client) + assert.NotNil(t, ce.closeChan) + assert.NotNil(t, ce.wg) +} + +// Bug{@huyan0} success case pass but it should fail; this is because the server gets no request because export() is +// empty. This test cannot run until export is finished. +// Test_pushMetrics the correctness and the number of points +func Test_pushMetrics(t *testing.T) { + noTempBatch := pdatautil.MetricsFromInternalMetrics(testdata.GenerateMetricDataManyMetricsSameResource(10)) + noDescBatch := pdatautil.MetricsFromInternalMetrics(testdata.GenerateMetricDataMetricTypeInvalid()) + // 10 counter metrics, 2 points in each. Two TimeSeries in total + batch := testdata.GenerateMetricDataManyMetricsSameResource(10) + setCumulative(batch) + successBatch := pdatautil.MetricsFromInternalMetrics(batch) + + tests := []struct { + name string + md *pdata.Metrics + reqTestFunc func(t *testing.T, r *http.Request) + httpResponseCode int + numDroppedTimeSeries int + returnErr bool + }{ + { + "no_desc_case", + &noDescBatch, + nil, + http.StatusAccepted, + pdatautil.MetricCount(noDescBatch), + true, + }, + { + "no_temp_case", + &noTempBatch, + nil, + http.StatusAccepted, + pdatautil.MetricCount(noTempBatch), + true, + }, + { + "http_error_case", + &noTempBatch, + nil, + http.StatusForbidden, + pdatautil.MetricCount(noTempBatch), + true, + }, + { + "success_case", + &successBatch, + func(t *testing.T, r *http.Request) { + body, err := ioutil.ReadAll(r.Body) + if err != nil { + t.Fatal(err) + } + assert.Equal(t, "0.1.0", r.Header.Get("X-Prometheus-Remote-Write-Version:")) + assert.Equal(t, "Snappy", r.Header.Get("Content-Encoding")) + assert.NotNil(t, r.Header.Get("Tenant-id")) + buf := make([]byte, len(body)) + snappy.Decode(buf, body) + wr := &prompb.WriteRequest{} + ok := proto.Unmarshal(buf, wr) + require.Nil(t, ok) + assert.EqualValues(t, 2, len(wr.Timeseries)) + }, + 0, + 0, + false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if tt.reqTestFunc != nil { + tt.reqTestFunc(t, r) + } + w.WriteHeader(tt.httpResponseCode) + })) + defer server.Close() + + serverURL, uErr := url.Parse(server.URL) + assert.NoError(t, uErr) + + config := createDefaultConfig().(*Config) + assert.NotNil(t,config) + config.HTTPClientSettings.Endpoint = serverURL.String() + c, err := config.HTTPClientSettings.ToClient() + assert.Nil(t,err) + sender, nErr:= newCortexExporter(config.HTTPClientSettings.Endpoint, config.Namespace, c) + require.NoError(t, nErr) + numDroppedTimeSeries, err := sender.pushMetrics(context.Background(), *tt.md) + assert.Equal(t, tt.numDroppedTimeSeries, numDroppedTimeSeries) + + if tt.returnErr { + assert.Error(t, err) + return + } + assert.NoError(t, err) + }) + } +} + diff --git a/exporter/cortexexporter/testdata/config.yaml b/exporter/cortexexporter/testdata/config.yaml new file mode 100644 index 00000000000..44317b41f56 --- /dev/null +++ b/exporter/cortexexporter/testdata/config.yaml @@ -0,0 +1,35 @@ +receivers: + examplereceiver: + +processors: + exampleprocessor: + +exporters: + cortex: + cortex/2: + headers: + Prometheus-Remote-Write-Version: "0.1.0" + Tenant-id: 234 + namespace: "test-space" + timeout: 10s + sending_queue: + enabled: true + num_consumers: 2 + queue_size: 10 + retry_on_failure: + enabled: true + initial_interval: 10s + max_interval: 60s + max_elapsed_time: 10m + http_setting: + endpoint: "localhost:8888" + ca_file: "/var/lib/mycert.pem" + timeout: 5s + +service: + pipelines: + metrics: + receivers: [examplereceiver] + processors: [exampleprocessor] + exporters: [cortex] + \ No newline at end of file diff --git a/exporter/cortexexporter/testutil.go b/exporter/cortexexporter/testutil.go new file mode 100644 index 00000000000..7c0990622fe --- /dev/null +++ b/exporter/cortexexporter/testutil.go @@ -0,0 +1,237 @@ +package cortexexporter + +import ( + "github.com/prometheus/prometheus/prompb" + "go.opentelemetry.io/collector/internal/data" + commonpb "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/common/v1" + otlp "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/metrics/v1" + "time" +) +type combination struct { + ty otlp.MetricDescriptor_Type + temp otlp.MetricDescriptor_Temporality +} + +var ( + + time1 = uint64(time.Now().UnixNano()) + time2 = uint64(time.Date(1970, 1, 0, 0, 0, 0, 0, time.UTC).UnixNano()) + + typeInt64 = "INT64" + typeMonotonicInt64 = "MONOTONIC_INT64" + typeDouble = "DOUBLE" + typeHistogram = "HISTOGRAM" + typeSummary = "SUMMARY" + + label11 = "test_label11" + value11 = "test_value11" + label12 = "test_label12" + value12 = "test_value12" + label21 = "test_label21" + value21 = "test_value21" + label22 = "test_label22" + value22 = "test_value22" + label31 = "test_label31" + value31 = "test_value31" + label32 = "test_label32" + value32 = "test_value32" + dirty1 = "%" + dirty2 = "?" + + intVal1 int64 = 1 + intVal2 int64= 2 + floatVal1 = 1.0 + floatVal2 = 2.0 + + lbs1 = getLabels(label11, value11, label12, value12) + lbs2 = getLabels(label21, value21, label22, value22) + lbs1Dirty = getLabels(label11+dirty1, value11, dirty2+label12, value12) + lbs2Dirty = getLabels(label21+dirty1, value21, dirty2+label22, value22) + + promLbs1 = getPromLabels(label11, value11, label12, value12) + promLbs2 = getPromLabels(label21, value21, label22, value22) + promLbs3 = getPromLabels(label31, value31, label32, value32) + + lb1Sig = "-" + label11 + "-" + value11 + "-" + label12 + "-" + value12 + lb2Sig = "-" + label21 + "-" + value21 + "-" + label22 + "-" + value22 + ns1 = "test_ns" + name1 = "valid_single_int_point" + + int64CumulativeComb = 9 + monotonicInt64Comb = 0 + histogramComb = 2 + summaryComb = 3 + validCombinations = []combination{ + {otlp.MetricDescriptor_MONOTONIC_INT64, otlp.MetricDescriptor_CUMULATIVE}, + {otlp.MetricDescriptor_MONOTONIC_DOUBLE, otlp.MetricDescriptor_CUMULATIVE}, + {otlp.MetricDescriptor_HISTOGRAM, otlp.MetricDescriptor_CUMULATIVE}, + {otlp.MetricDescriptor_SUMMARY, otlp.MetricDescriptor_CUMULATIVE}, + {otlp.MetricDescriptor_INT64, otlp.MetricDescriptor_DELTA}, + {otlp.MetricDescriptor_DOUBLE, otlp.MetricDescriptor_DELTA}, + {otlp.MetricDescriptor_INT64, otlp.MetricDescriptor_INSTANTANEOUS}, + {otlp.MetricDescriptor_DOUBLE, otlp.MetricDescriptor_INSTANTANEOUS}, + {otlp.MetricDescriptor_INT64, otlp.MetricDescriptor_CUMULATIVE}, + {otlp.MetricDescriptor_DOUBLE, otlp.MetricDescriptor_CUMULATIVE}, + } + invalidCombinations = []combination{ + {otlp.MetricDescriptor_MONOTONIC_INT64, otlp.MetricDescriptor_DELTA}, + {otlp.MetricDescriptor_MONOTONIC_DOUBLE, otlp.MetricDescriptor_DELTA}, + {otlp.MetricDescriptor_HISTOGRAM, otlp.MetricDescriptor_DELTA}, + {otlp.MetricDescriptor_SUMMARY, otlp.MetricDescriptor_DELTA}, + {otlp.MetricDescriptor_MONOTONIC_INT64, otlp.MetricDescriptor_DELTA}, + {otlp.MetricDescriptor_MONOTONIC_DOUBLE, otlp.MetricDescriptor_DELTA}, + {otlp.MetricDescriptor_HISTOGRAM, otlp.MetricDescriptor_DELTA}, + {otlp.MetricDescriptor_SUMMARY, otlp.MetricDescriptor_DELTA}, + {ty: otlp.MetricDescriptor_INVALID_TYPE}, + {temp: otlp.MetricDescriptor_INVALID_TEMPORALITY}, + {}, + } + twoPointsSameTs = map[string]*prompb.TimeSeries{ + typeInt64 + "-" + label11 + "-" + value11 + "-" + label12 + "-" + value12: + getTimeSeries(getPromLabels(label11, value11, label12, value12), + getSample(float64(intVal1), time1), + getSample(float64(intVal2), time2)), + } + twoPointsDifferentTs = map[string]*prompb.TimeSeries{ + typeInt64 + "-" + label11 + "-" + value11 + "-" + label12 + "-" + value12: + getTimeSeries(getPromLabels(label11, value11, label12, value12), + getSample(float64(intVal1), time1), ), + typeInt64 + "-" + label21 + "-" + value21 + "-" + label22 + "-" + value22: + getTimeSeries(getPromLabels(label21, value21, label22, value22), + getSample(float64(intVal1), time2), ), + } + +) + +// OTLP metrics +// labels must come in pairs +func getLabels (labels...string) []*commonpb.StringKeyValue{ + var set []*commonpb.StringKeyValue + for i := 0; i < len(labels); i += 2 { + set = append(set, &commonpb.StringKeyValue{ + labels[i], + labels[i+1], + }) + } + return set +} + +func getDescriptor(name string, i int, comb []combination) *otlp.MetricDescriptor { + return &otlp.MetricDescriptor{ + Name: name, + Description: "", + Unit: "", + Type: comb[i].ty, + Temporality: comb[i].temp, + } +} + +func getIntDataPoint(lbls []*commonpb.StringKeyValue, value int64, ts uint64) *otlp.Int64DataPoint{ + return &otlp.Int64DataPoint{ + Labels: lbls, + StartTimeUnixNano: 0, + TimeUnixNano: ts, + Value: value, + } +} + +func getDoubleDataPoint(lbls []*commonpb.StringKeyValue, value float64, ts time.Time) *otlp.DoubleDataPoint { + return &otlp.DoubleDataPoint{ + Labels: lbls, + StartTimeUnixNano: 0, + TimeUnixNano: uint64(ts.Unix()), + Value: value, + } +} + +func getHistogramDataPoint(lbls []*commonpb.StringKeyValue, ts time.Time, sum float64, count uint64, bounds []float64, buckets []uint64) *otlp.HistogramDataPoint { + bks := []*otlp.HistogramDataPoint_Bucket{} + for _, c := range buckets { + bks = append(bks, &otlp.HistogramDataPoint_Bucket{ + Count: c, + Exemplar: nil, + }) + } + return &otlp.HistogramDataPoint{ + Labels: lbls, + StartTimeUnixNano: 0, + TimeUnixNano: uint64(ts.Unix()), + Count: count, + Sum: sum, + Buckets: bks, + ExplicitBounds: bounds, + } +} + +func getSummaryDataPoint(lbls []*commonpb.StringKeyValue, ts time.Time, sum float64, count uint64, pcts []float64, values []float64) *otlp.SummaryDataPoint { + pcs := []*otlp.SummaryDataPoint_ValueAtPercentile{} + for i, v := range values { + pcs = append(pcs, &otlp.SummaryDataPoint_ValueAtPercentile{ + Percentile: pcts[i], + Value: v, + }) + } + return &otlp.SummaryDataPoint{ + Labels: lbls, + StartTimeUnixNano: 0, + TimeUnixNano: uint64(ts.Unix()), + Count: count, + Sum: sum, + PercentileValues: pcs, + } +} + +// Prometheus TimeSeries +func getPromLabels(lbs ...string) []prompb.Label{ + pbLbs := prompb.Labels{ + Labels: []prompb.Label{}, + XXX_NoUnkeyedLiteral: struct{}{}, + XXX_unrecognized: nil, + XXX_sizecache: 0, + } + for i := 0; i < len(lbs); i+=2 { + pbLbs.Labels = append(pbLbs.Labels, getLabel(lbs[i],lbs[i+1])) + } + return pbLbs.Labels +} + +func getLabel(name string, value string) prompb.Label{ + return prompb.Label{ + Name: name, + Value: value, + XXX_NoUnkeyedLiteral: struct{}{}, + XXX_unrecognized: nil, + XXX_sizecache: 0, + } +} + + +func getSample(v float64, t uint64) prompb.Sample { + return prompb.Sample{ + Value: v, + Timestamp: int64(t), + XXX_NoUnkeyedLiteral: struct{}{}, + XXX_unrecognized: nil, + XXX_sizecache: 0, + } +} + +func getTimeSeries (lbls []prompb.Label, samples...prompb.Sample) *prompb.TimeSeries{ + return &prompb.TimeSeries{ + Labels: lbls, + Samples: samples, + XXX_NoUnkeyedLiteral: struct{}{}, + XXX_unrecognized: nil, + XXX_sizecache: 0, + } +} + +func setCumulative (metricsData data.MetricData) { + for _, r := range data.MetricDataToOtlp(metricsData) { + for _, instMetrics := range r.InstrumentationLibraryMetrics { + for _, m := range instMetrics.Metrics { + m.MetricDescriptor.Temporality = otlp.MetricDescriptor_CUMULATIVE + } + } + } +} \ No newline at end of file diff --git a/exporter/otlpexporter/config.go b/exporter/otlpexporter/config.go index fe9c3f0ded8..ac760032efd 100644 --- a/exporter/otlpexporter/config.go +++ b/exporter/otlpexporter/config.go @@ -25,7 +25,7 @@ type Config struct { configmodels.ExporterSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct. exporterhelper.TimeoutSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct. exporterhelper.QueueSettings `mapstructure:"sending_queue"` - exporterhelper.RetrySettings `mapstructure:"retry_on_failure"` + exporterhelper.RetrySettings `mapstructure:"sending_queue"` configgrpc.GRPCClientSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct. } diff --git a/go.mod b/go.mod index 4ddf9e5ceb6..3fd2ab16117 100644 --- a/go.mod +++ b/go.mod @@ -19,6 +19,7 @@ require ( github.com/gogo/protobuf v1.3.1 github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e github.com/golang/protobuf v1.4.2 + github.com/golang/snappy v0.0.1 github.com/golangci/golangci-lint v1.29.0 github.com/google/addlicense v0.0.0-20200622132530-df58acafd6d5 github.com/google/go-cmp v0.5.1 @@ -44,6 +45,7 @@ require ( github.com/rs/cors v1.7.0 github.com/securego/gosec v0.0.0-20200316084457-7da9f46445fd github.com/shirou/gopsutil v0.0.0-20200517204708-c89193f22d93 // c89193f22d9359848988f32aee972122bb2abdc2 + github.com/shurcooL/go v0.0.0-20180423040247-9e1955d9fb6e github.com/soheilhy/cmux v0.1.4 github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/spf13/cast v1.3.1 diff --git a/service/README.md b/service/README.md new file mode 100644 index 00000000000..cdc5d1428bc --- /dev/null +++ b/service/README.md @@ -0,0 +1,4 @@ +# General Information + +The service package does most of the backend processing for the Collector pipeline. +