Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add cortex.go and helper.go and tests #7

Closed
wants to merge 69 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
69 commits
Select commit Hold shift + click to select a range
06753d6
set up project structure
huyan0 Jul 30, 2020
287a4e5
placeholder text for README
huyan0 Jul 30, 2020
c7e2a42
finish Test_validateMetrics
huyan0 Jul 30, 2020
0f6033f
finish Test_validateMetrics
huyan0 Jul 30, 2020
0686608
finish Test_validateMetrics
huyan0 Jul 30, 2020
a896535
remove extra import
huyan0 Jul 30, 2020
6335403
fix format error so that make runs
huyan0 Jul 30, 2020
f76b22a
update unit tests
huyan0 Jul 30, 2020
91dddb9
Testbed for the config and factory constructions:
Jul 30, 2020
66224aa
Testbed for the config and factory constructions, possible README.md …
Jul 30, 2020
2e4b1e1
Testbed oop
Jul 30, 2020
01441e7
add metrictest util code
huyan0 Jul 31, 2020
a9eaaec
tempeorary code
huyan0 Jul 31, 2020
7cf119b
finish testutil and Test_addSample
huyan0 Jul 31, 2020
5625330
add more unit tests
huyan0 Aug 1, 2020
cbd7f6e
add unit tests
huyan0 Aug 2, 2020
a4d7eeb
use assert.Exactly
huyan0 Aug 2, 2020
ae1758c
finish Test_handleHistogramMetric
huyan0 Aug 2, 2020
93df3e9
fix Test_handleScalarMetric to include metric name in label set
huyan0 Aug 2, 2020
346fce0
finish Test_handleSummaryMetric
huyan0 Aug 2, 2020
d215b33
add Test_shutdown
huyan0 Aug 2, 2020
b001c18
add unit test for functions
huyan0 Aug 2, 2020
64aad65
add unit tests for cortex.go
huyan0 Aug 3, 2020
b55576e
associated methods with cortexExporter
huyan0 Aug 3, 2020
ad4f1b1
some more config and factory stuff:
Aug 3, 2020
5806a51
M branch 'unit-tests' of https://github.com/open-o11y/opentelemetry-c…
Aug 3, 2020
d670578
config and some factory, export skeleton
Aug 3, 2020
d125a28
config and some factory, export skeleton
Aug 3, 2020
34b40fe
add nil test cases for cortex.go
huyan0 Aug 3, 2020
32a9db9
implement and test validateMetrics and addSample
huyan0 Aug 3, 2020
c466bd9
implement and test timeSeriesSignature
huyan0 Aug 3, 2020
c7a0dc1
implement and test createLabelSet
huyan0 Aug 3, 2020
b978fd4
finish createMetricsExporter method
huyan0 Aug 3, 2020
3992ea7
work
Aug 3, 2020
01ad87a
factory
Aug 3, 2020
da4dbcc
no more const labels for config
Aug 3, 2020
0312ced
implement and test handleScalarMetric
huyan0 Aug 3, 2020
d868fd9
implement and test handleScalarMetric
huyan0 Aug 3, 2020
cadd3bf
Update factory.go
huyan0 Aug 3, 2020
184e16a
implement and test handleHistogramMetric
huyan0 Aug 3, 2020
d0d700b
Merge branch 'unit-tests' of github.com:open-o11y/opentelemetry-colle…
huyan0 Aug 3, 2020
a133149
implement and test handleHistogramMetric
huyan0 Aug 3, 2020
222ea87
implement and test handleSummaryMetric
huyan0 Aug 3, 2020
c7ce3c6
implement shutdown
huyan0 Aug 3, 2020
a4f09ab
opt headers for config
Aug 3, 2020
100d482
Mergeae branch 'unit-tests' of https://github.com/open-o11y/opentelem…
Aug 3, 2020
ca1295e
undupped create Metrics Exporter
Aug 3, 2020
4bc9a4f
test shutdown
huyan0 Aug 3, 2020
db605f2
Merge branch 'unit-tests' of github.com:open-o11y/opentelemetry-colle…
huyan0 Aug 3, 2020
4a29b98
yes
Aug 3, 2020
c4d8d47
okay, worksMerge branch 'unit-tests' of https://github.com/open-o11y/…
Aug 3, 2020
6073ce3
remove usage of getSample in cortex.go
huyan0 Aug 3, 2020
3c12ab7
Merge branch 'unit-tests' of github.com:open-o11y/opentelemetry-colle…
huyan0 Aug 3, 2020
37db943
implement and test newMetricsExporter
huyan0 Aug 4, 2020
e9ac299
Test_pushMetrics not working
huyan0 Aug 4, 2020
53fbe2b
add comments to and format cortex.go
huyan0 Aug 4, 2020
96807eb
make cortex_test readable
huyan0 Aug 4, 2020
4182996
modify things in factory.go
huyan0 Aug 4, 2020
8ac6b25
update TODOs in cortex_test
huyan0 Aug 4, 2020
9a69f18
updated go.mod
huyan0 Aug 4, 2020
cf8fcaf
add headers field to exporter struct
huyan0 Aug 4, 2020
f223b37
fix config_test
huyan0 Aug 4, 2020
3371a59
add todos
huyan0 Aug 4, 2020
2cb879a
create helper.go and helper_test.go
huyan0 Aug 4, 2020
071b82d
add cortex.go and helper.go with tests
huyan0 Aug 4, 2020
fda582b
fix cortex_test.go
huyan0 Aug 4, 2020
05bcd9d
add comments to tests and address style issues
huyan0 Aug 5, 2020
4042156
add DO NOT REVIEW message in certain files
huyan0 Aug 5, 2020
cc12828
added export and test export and helpers, will comment and annotate t…
Aug 5, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions exporter/cortexexporter/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
To be added.
37 changes: 37 additions & 0 deletions exporter/cortexexporter/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2020 The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// !!!!!!!!!!!DO NOT REVIEW THIS FILE IN THIS PR!!!!!!!!!!!!!!!!!!!!!!!
package cortexexporter

import (
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/exporter/exporterhelper"
)

// Config defines configuration for Remote Write exporter.
type Config struct {
configmodels.ExporterSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
exporterhelper.TimeoutSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
exporterhelper.QueueSettings `mapstructure:"sending_queue"`
exporterhelper.RetrySettings `mapstructure:"retry_on_failure"`
// Namespace if set, exports metrics under the provided value.*/
Namespace string `mapstructure:"namespace"`

// Optional headers configuration for authorization and security/extra metadata
Headers map[string]string `mapstructure:"headers"`

HTTPClientSettings confighttp.HTTPClientSettings `mapstructure:"http_setting"` // squash ensures fields are correctly decoded in embedded struct.
}
307 changes: 307 additions & 0 deletions exporter/cortexexporter/cortex.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,307 @@
// Copyright 2020 The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cortexexporter

import (
"bufio"
"bytes"
"context"
"fmt"
"io"
"net/http"
"strconv"
"strings"
"sync"

"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/prompb"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/consumer/pdatautil"
"go.opentelemetry.io/collector/internal/data"
otlp "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/metrics/v1"
)

// TODO: get default labels such as job or instance from Resource

// cortexExporter converts OTLP metrics to Cortex TimeSeries and sends them to a remote endpoint
type cortexExporter struct {
namespace string
endpoint string
client *http.Client
headers map[string]string
wg *sync.WaitGroup
closeChan chan struct{}
}

// handleScalarMetric processes data points in a single OTLP scalar metric by adding the each point as a Sample into
// its corresponding TimeSeries in tsMap.
// tsMap and metric cannot be nil, and metric must have a non-nil descriptor
func (ce *cortexExporter) handleScalarMetric(tsMap map[string]*prompb.TimeSeries, metric *otlp.Metric) error {
mType := metric.MetricDescriptor.Type
switch mType {
// int points
case otlp.MetricDescriptor_MONOTONIC_INT64, otlp.MetricDescriptor_INT64:
if metric.Int64DataPoints == nil {
return fmt.Errorf("nil data point field in metric" + metric.GetMetricDescriptor().Name)
}
for _, pt := range metric.Int64DataPoints {
name := getPromMetricName(metric.GetMetricDescriptor(), ce.namespace)
lbs := createLabelSet(pt.GetLabels(), nameStr, name)
sample := &prompb.Sample{
Value: float64(pt.Value),
Timestamp: int64(pt.TimeUnixNano),
}
addSample(tsMap, sample, lbs, metric.GetMetricDescriptor().GetType())
}
return nil

// double points
case otlp.MetricDescriptor_MONOTONIC_DOUBLE, otlp.MetricDescriptor_DOUBLE:
if metric.DoubleDataPoints == nil {
return fmt.Errorf("nil data point field in metric" + metric.GetMetricDescriptor().Name)
}
for _, pt := range metric.DoubleDataPoints {
name := getPromMetricName(metric.GetMetricDescriptor(), ce.namespace)
lbs := createLabelSet(pt.GetLabels(), nameStr, name)
sample := &prompb.Sample{
Value: pt.Value,
Timestamp: int64(pt.TimeUnixNano),
}
addSample(tsMap, sample, lbs, metric.GetMetricDescriptor().GetType())
}
return nil
}

return fmt.Errorf("invalid metric type: wants int or double data points")
}

// handleHistogramMetric processes data points in a single OTLP histogram metric by mapping the sum, count and each
// bucket of every data point as a Sample, and adding each Sample to its corresponding TimeSeries.
// tsMap and metric cannot be nil.
func (ce *cortexExporter) handleHistogramMetric(tsMap map[string]*prompb.TimeSeries, metric *otlp.Metric) error {
if metric.HistogramDataPoints == nil {
return fmt.Errorf("invalid metric type: wants histogram points")
}
for _, pt := range metric.HistogramDataPoints {
time := int64(pt.GetTimeUnixNano())
ty := metric.GetMetricDescriptor().GetType()
baseName := getPromMetricName(metric.GetMetricDescriptor(), ce.namespace)
sum := &prompb.Sample{
Value: pt.GetSum(),
Timestamp: time,
}
count := &prompb.Sample{
Value: float64(pt.GetCount()),
Timestamp: time,
}
sumLbs := createLabelSet(pt.GetLabels(), nameStr, baseName+sumStr)
countLbs := createLabelSet(pt.GetLabels(), nameStr, baseName+countStr)
addSample(tsMap, sum, sumLbs, ty)
addSample(tsMap, count, countLbs, ty)
var totalCount uint64
for le, bk := range pt.GetBuckets() {
bucket := &prompb.Sample{
Value: float64(bk.Count),
Timestamp: time,
}
boundStr := strconv.FormatFloat(pt.GetExplicitBounds()[le], 'f', -1, 64)
lbs := createLabelSet(pt.GetLabels(), nameStr, baseName+bucketStr, leStr, boundStr)
addSample(tsMap, bucket, lbs, ty)
totalCount += bk.GetCount()
}
infSample := &prompb.Sample{Value: float64(totalCount), Timestamp: time}
infLbs := createLabelSet(pt.GetLabels(), nameStr, baseName+bucketStr, leStr, pInfStr)
addSample(tsMap, infSample, infLbs, ty)
}
return nil
}

// handleSummaryMetric processes data points in a single OTLP summary metric by mapping the sum, count and each
// quantile of every data point as a Sample, and adding each Sample to its corresponding TimeSeries.
// tsMap and metric cannot be nil.
func (ce *cortexExporter) handleSummaryMetric(tsMap map[string]*prompb.TimeSeries, metric *otlp.Metric) error {
if metric.SummaryDataPoints == nil {
return fmt.Errorf("invalid metric type: wants summary points")
}

for _, pt := range metric.SummaryDataPoints {
time := int64(pt.GetTimeUnixNano())
ty := metric.GetMetricDescriptor().GetType()
baseName := getPromMetricName(metric.GetMetricDescriptor(), ce.namespace)
sum := &prompb.Sample{
Value: pt.GetSum(),
Timestamp: time,
}
count := &prompb.Sample{
Value: float64(pt.GetCount()),
Timestamp: time,
}
sumLbs := createLabelSet(pt.GetLabels(), nameStr, baseName+sumStr)
countLbs := createLabelSet(pt.GetLabels(), nameStr, baseName+countStr)
addSample(tsMap, sum, sumLbs, ty)
addSample(tsMap, count, countLbs, ty)
for _, qt := range pt.GetPercentileValues() {
quantile := &prompb.Sample{
Value: float64(qt.Value),
Timestamp: time,
}
percentileStr := strconv.FormatFloat(qt.Percentile, 'f', -1, 64)
qtLbs := createLabelSet(pt.GetLabels(), nameStr, baseName, quantileStr, percentileStr)
addSample(tsMap, quantile, qtLbs, ty)
}
}
return nil

}

// newCortexExporter initializes a new cortexExporter instance and sets fields accordingly.
// client parameter cannot be nil.
func newCortexExporter(ns string, ep string, client *http.Client) (*cortexExporter, error) {
if client == nil {
return nil, fmt.Errorf("http client cannot be nil")
}

return &cortexExporter{
namespace: ns,
endpoint: ep,
client: client,
wg: new(sync.WaitGroup),
closeChan: make(chan struct{}),
}, nil
}

// shutdown stops the exporter from accepting incoming calls(and return error), and wait for current export operations
// to finish before returning
func (ce *cortexExporter) shutdown(context.Context) error {
close(ce.closeChan)
ce.wg.Wait()
return nil
}

// pushMetrics converts metrics to Cortex TimeSeries and send to remote endpoint. It maintain a map of TimeSeries,
// validates and handles each individual metric, adding the converted TimeSeries to the map, and finally
// exports the map.
func (ce *cortexExporter) pushMetrics(ctx context.Context, md pdata.Metrics) (int, error) {
ce.wg.Add(1)
defer ce.wg.Done()
select {
case <-ce.closeChan:
return pdatautil.MetricCount(md), fmt.Errorf("shutdown has been called")
default:
tsMap := map[string]*prompb.TimeSeries{}
dropped := 0
errStrings := []string{}
rms := data.MetricDataToOtlp(pdatautil.MetricsToInternalMetrics(md))
for _, r := range rms {
// TODO add resource attributes as labels
for _, inst := range r.InstrumentationLibraryMetrics {
//TODO add instrumentation library information as labels
for _, m := range inst.Metrics {
ok := validateMetrics(m.MetricDescriptor)
if !ok {
dropped++
errStrings = append(errStrings, "invalid temporality and type combination")
continue
}
switch m.GetMetricDescriptor().GetType() {
case otlp.MetricDescriptor_MONOTONIC_INT64, otlp.MetricDescriptor_INT64,
otlp.MetricDescriptor_MONOTONIC_DOUBLE, otlp.MetricDescriptor_DOUBLE:
ce.handleScalarMetric(tsMap, m)
case otlp.MetricDescriptor_HISTOGRAM:
ce.handleHistogramMetric(tsMap, m)
case otlp.MetricDescriptor_SUMMARY:
ce.handleSummaryMetric(tsMap, m)
default:
dropped++
errStrings = append(errStrings, "invalid type")
continue
}
}
}
}
if dropped != 0 {
return dropped, fmt.Errorf(strings.Join(errStrings, "\n"))
}

if err := ce.Export(ctx, tsMap); err != nil {
return pdatautil.MetricCount(md), err
}

return 0, nil
}
}

/*
Because we are adhering closely to the Remote Write API, we must Export a
Snappy-compressed WriteRequest instance of the TimeSeries Metrics in order
for the Remote Write Endpoint to properly receive our Metrics data.
*/
func (ce *cortexExporter) Export(ctx context.Context, TsMap map[string]*prompb.TimeSeries) error {
//Calls the helper function to convert the TsMap to the desired format
req, err := wrapTimeSeries(TsMap)
if err != nil {
return err
}

//Uses proto.Marshal to convert the WriteRequest into wire format (bytes array)
data, err := proto.Marshal(req)
if err != nil {
return err
}

//Makes use of the snappy compressor, as we are emulating the Remote Write package
compressedData := snappy.Encode(nil, data)

//Create the HTTP POST request to send to the endpoint
httpReq, err := http.NewRequest("POST", ce.endpoint, bytes.NewReader(compressedData))
if err != nil {
return err
}

//Add optional headers
for name, value := range ce.headers {
httpReq.Header.Set(name, value)
}

//Add necessary headers
httpReq.Header.Add("Content-Encoding", "snappy")
httpReq.Header.Add("Accept-Encoding", "snappy")
httpReq.Header.Set("Content-Type", "application/x-protobuf")
httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")

//Changing context of the httpreq to global context
httpReq = httpReq.WithContext(ctx)

ctx, cancel := context.WithTimeout(context.Background(), ce.client.Timeout)
defer cancel()

httpResp, err := ce.client.Do(httpReq)
if err != nil {
return err
}

//Only even status codes < 400 are okay
if httpResp.StatusCode/100 != 2 || httpResp.StatusCode >= 400 {
scanner := bufio.NewScanner(io.LimitReader(httpResp.Body, 256))
line := ""
if scanner.Scan() {
line = scanner.Text()
}
err = errors.Errorf("server returned HTTP status %s: %s", httpResp.Status, line)
}
return err
}
Loading