Add batched exports to the Prometheus Remote Write Exporter #2249

Merged
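
In short, PushMetrics no longer sends the whole TimeSeries map as a single WriteRequest: batchTimeSeries splits it into requests capped at maxBatchByteSize (3,000,000 bytes), and export fans those requests out to at most maxConcurrentRequests (5) worker goroutines, collecting every error instead of aborting on the first one. Below is a rough, self-contained sketch of that fan-out pattern; the sendBatches helper and the string "batches" are made up for illustration and are not the exporter's actual API.

```go
package main

import (
	"fmt"
	"sync"
)

// sendBatches pushes every batch onto a buffered channel, drains it with a
// fixed pool of workers, and collects errors under a mutex, mirroring the
// shape of the new export/execute split in exporter.go.
func sendBatches(batches []string, maxWorkers int, send func(string) error) []error {
	input := make(chan string, len(batches))
	for _, b := range batches {
		input <- b
	}
	close(input)

	var (
		mu   sync.Mutex
		wg   sync.WaitGroup
		errs []error
	)
	wg.Add(maxWorkers)
	for i := 0; i < maxWorkers; i++ {
		go func() {
			defer wg.Done()
			for b := range input {
				if err := send(b); err != nil {
					mu.Lock()
					errs = append(errs, err)
					mu.Unlock()
				}
			}
		}()
	}
	wg.Wait()
	return errs
}

func main() {
	errs := sendBatches([]string{"batch-1", "batch-2"}, 5, func(b string) error {
		fmt.Println("sending", b)
		return nil
	})
	fmt.Println("errors:", len(errs))
}
```
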
52 changes: 46 additions & 6 deletions exporter/prometheusremotewriteexporter/exporter.go
@@ -37,6 +37,11 @@ import (
"go.opentelemetry.io/collector/internal/version"
)

const (
maxConcurrentRequests = 5
maxBatchByteSize = 3000000
)

// PrwExporter converts OTLP metrics to Prometheus remote write TimeSeries and sends them to a remote endpoint
type PrwExporter struct {
namespace string
@@ -143,9 +148,9 @@ func (prwe *PrwExporter) PushMetrics(ctx context.Context, md pdata.Metrics) (int
}
}

if err := prwe.export(ctx, tsMap); err != nil {
if exportErrors := prwe.export(ctx, tsMap); len(exportErrors) != 0 {
dropped = md.MetricCount()
errs = append(errs, consumererror.Permanent(err))
errs = append(errs, exportErrors...)
}

if dropped != 0 {
@@ -252,13 +257,48 @@ func (prwe *PrwExporter) handleSummaryMetric(tsMap map[string]*prompb.TimeSeries
}

// export sends a Snappy-compressed WriteRequest containing TimeSeries to a remote write endpoint in order
func (prwe *PrwExporter) export(ctx context.Context, tsMap map[string]*prompb.TimeSeries) error {
// Calls the helper function to convert the TsMap to the desired format
req, err := wrapTimeSeries(tsMap)
func (prwe *PrwExporter) export(ctx context.Context, tsMap map[string]*prompb.TimeSeries) []error {
var errs []error
// Calls the helper function to convert and batch the TsMap to the desired format
requests, err := batchTimeSeries(tsMap, maxBatchByteSize)
if err != nil {
return consumererror.Permanent(err)
errs = append(errs, consumererror.Permanent(err))
return errs
}

input := make(chan *prompb.WriteRequest, len(requests))
for _, request := range requests {
input <- request
}
close(input)

var mu sync.Mutex
var wg sync.WaitGroup

wg.Add(maxConcurrentRequests) // used to wait for workers to be finished

// Run up to maxConcurrentRequests workers until there are
// no more requests to execute in the input channel.
for i := 0; i < maxConcurrentRequests; i++ {
go func() {
defer wg.Done()

for request := range input {
err := prwe.execute(ctx, request)
if err != nil {
mu.Lock()
errs = append(errs, err)
mu.Unlock()
}
}
}()
}
wg.Wait()

return errs
}

func (prwe *PrwExporter) execute(ctx context.Context, req *prompb.WriteRequest) error {
// Uses proto.Marshal to convert the WriteRequest into bytes array
data, err := proto.Marshal(req)
if err != nil {
17 changes: 10 additions & 7 deletions exporter/prometheusremotewriteexporter/exporter_test.go
@@ -227,17 +227,19 @@ func Test_export(t *testing.T) {
if !tt.serverUp {
server.Close()
}
err := runExportPipeline(ts1, serverURL)
errs := runExportPipeline(ts1, serverURL)
if tt.returnError {
assert.Error(t, err)
assert.Error(t, errs[0])
return
}
assert.NoError(t, err)
assert.Len(t, errs, 0)
})
}
}

func runExportPipeline(ts *prompb.TimeSeries, endpoint *url.URL) error {
func runExportPipeline(ts *prompb.TimeSeries, endpoint *url.URL) []error {
var errs []error

// First we will construct a TimeSeries array from the testutils package
testmap := make(map[string]*prompb.TimeSeries)
testmap["test"] = ts
@@ -246,10 +248,11 @@ func runExportPipeline(ts *prompb.TimeSeries, endpoint *url.URL) error {
// after this, instantiate a CortexExporter with the current HTTP client and endpoint set to passed in endpoint
prwe, err := NewPrwExporter("test", endpoint.String(), HTTPClient, map[string]string{})
if err != nil {
return err
errs = append(errs, err)
return errs
}
err = prwe.export(context.Background(), testmap)
return err
errs = append(errs, prwe.export(context.Background(), testmap)...)
return errs
}

// Test_PushMetrics checks the number of TimeSeries received by server and the number of metrics dropped is the same as
38 changes: 30 additions & 8 deletions exporter/prometheusremotewriteexporter/helper.go
@@ -215,21 +215,35 @@ func getPromMetricName(metric *otlp.Metric, ns string) string {
return sanitize(b.String())
}

// Simple helper function that takes the <Signature String - *TimeSeries> map
// and creates a WriteRequest from the struct -- can move to the helper.go file
func wrapTimeSeries(tsMap map[string]*prompb.TimeSeries) (*prompb.WriteRequest, error) {
// batchTimeSeries splits series into multiple batch write requests.
func batchTimeSeries(tsMap map[string]*prompb.TimeSeries, maxBatchByteSize int) ([]*prompb.WriteRequest, error) {
if len(tsMap) == 0 {
return nil, errors.New("invalid tsMap: cannot be empty map")
}

var requests []*prompb.WriteRequest
var tsArray []prompb.TimeSeries
sizeOfCurrentBatch := 0

for _, v := range tsMap {
sizeOfSeries := v.Size()

if sizeOfCurrentBatch+sizeOfSeries >= maxBatchByteSize {
wrapped := convertTimeseriesToRequest(tsArray)
requests = append(requests, wrapped)

tsArray = make([]prompb.TimeSeries, 0)
sizeOfCurrentBatch = 0
}

tsArray = append(tsArray, *v)
sizeOfCurrentBatch += sizeOfSeries
}
wrapped := prompb.WriteRequest{
Timeseries: tsArray,
// Other parameters of the WriteRequest are unnecessary for our Export
}
return &wrapped, nil

wrapped := convertTimeseriesToRequest(tsArray)
requests = append(requests, wrapped)

return requests, nil
}

// convertTimeStamp converts OTLP timestamp in ns to timestamp in ms
@@ -466,3 +480,11 @@ func addSingleDoubleSummaryDataPoint(pt *otlp.DoubleSummaryDataPoint, metric *ot
addSample(tsMap, quantile, qtlabels, metric)
}
}

func convertTimeseriesToRequest(tsArray []prompb.TimeSeries) *prompb.WriteRequest {
// The remote_write endpoint only requires the timeseries.
// OTLP defines its own way to handle metric metadata.
return &prompb.WriteRequest{
Timeseries: tsArray,
}
}
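
The split in batchTimeSeries above is a greedy, size-based cut: series accumulate until the next one would push the running total past the byte limit, at which point the current slice is wrapped into a WriteRequest and a new batch is started, so any non-empty map yields at least one request. A minimal sketch of the same idea on plain integer sizes follows; the splitBySize helper and the example sizes are hypothetical, chosen only to echo the two_requests case in helper_test.go below.

```go
package main

import "fmt"

// splitBySize greedily groups sizes into batches whose running total stays
// under the limit, starting a new batch whenever the next item would cross it.
func splitBySize(sizes []int, limit int) [][]int {
	var batches [][]int
	var current []int
	total := 0
	for _, s := range sizes {
		if total+s >= limit && len(current) > 0 {
			batches = append(batches, current)
			current, total = nil, 0
		}
		current = append(current, s)
		total += s
	}
	return append(batches, current)
}

func main() {
	// With a 300-byte limit, two series-sized items land in two batches,
	// analogous to the "two_requests" case in Test_batchTimeSeries.
	fmt.Println(splitBySize([]int{180, 220}, 300)) // [[180] [220]]
}
```
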
58 changes: 58 additions & 0 deletions exporter/prometheusremotewriteexporter/helper_test.go
@@ -303,3 +303,61 @@ func Test_getPromMetricName(t *testing.T) {
})
}
}

// Test_batchTimeSeries checks that batchTimeSeries returns the correct number of requests
// depending on the maximum batch byte size.
func Test_batchTimeSeries(t *testing.T) {
// First we will instantiate dummy TimeSeries instances to build the test maps passed into batchTimeSeries
labels := getPromLabels(label11, value11, label12, value12, label21, value21, label22, value22)
sample1 := getSample(floatVal1, msTime1)
sample2 := getSample(floatVal2, msTime2)
sample3 := getSample(floatVal3, msTime3)
ts1 := getTimeSeries(labels, sample1, sample2)
ts2 := getTimeSeries(labels, sample1, sample2, sample3)

tsMap1 := getTimeseriesMap([]*prompb.TimeSeries{})
tsMap2 := getTimeseriesMap([]*prompb.TimeSeries{ts1})
tsMap3 := getTimeseriesMap([]*prompb.TimeSeries{ts1, ts2})

tests := []struct {
name string
tsMap map[string]*prompb.TimeSeries
maxBatchByteSize int
numExpectedRequests int
returnErr bool
}{
{
"no_timeseries",
tsMap1,
100,
-1,
true,
},
{
"normal_case",
tsMap2,
300,
1,
false,
},
{
"two_requests",
tsMap3,
300,
2,
false,
},
}
// run tests
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
requests, err := batchTimeSeries(tt.tsMap, tt.maxBatchByteSize)
if tt.returnErr {
assert.Error(t, err)
return
}
assert.NoError(t, err)
assert.Equal(t, tt.numExpectedRequests, len(requests))
})
}
}
16 changes: 15 additions & 1 deletion exporter/prometheusremotewriteexporter/testutil_test.go
@@ -15,6 +15,7 @@
package prometheusremotewriteexporter

import (
"fmt"
"time"

"github.com/prometheus/prometheus/prompb"
@@ -25,9 +26,11 @@ import (

var (
time1 = uint64(time.Now().UnixNano())
time2 = uint64(time.Date(1970, 1, 0, 0, 0, 0, 0, time.UTC).UnixNano())
time2 = uint64(time.Now().UnixNano() - 5)
time3 = uint64(time.Date(1970, 1, 0, 0, 0, 0, 0, time.UTC).UnixNano())
msTime1 = int64(time1 / uint64(int64(time.Millisecond)/int64(time.Nanosecond)))
msTime2 = int64(time2 / uint64(int64(time.Millisecond)/int64(time.Nanosecond)))
msTime3 = int64(time3 / uint64(int64(time.Millisecond)/int64(time.Nanosecond)))

label11 = "test_label11"
value11 = "test_value11"
@@ -50,6 +53,7 @@ var (
intVal2 int64 = 2
floatVal1 = 1.0
floatVal2 = 2.0
floatVal3 = 3.0

lbs1 = getLabels(label11, value11, label12, value12)
lbs2 = getLabels(label21, value21, label22, value22)
@@ -564,3 +568,13 @@ func getQuantiles(bounds []float64, values []float64) []*otlp.DoubleSummaryDataP
}
return quantiles
}

func getTimeseriesMap(timeseries []*prompb.TimeSeries) map[string]*prompb.TimeSeries {
tsMap := make(map[string]*prompb.TimeSeries)

for i, v := range timeseries {
tsMap[fmt.Sprintf("%s%d", "timeseries_name", i)] = v
}

return tsMap
}