Skip to content

Commit 48cd5d6

Browse files
committed
feat: receiver/prometheusremotewrite - Implement body unmarshaling
Signed-off-by: Arthur Silva Sens <arthursens2005@gmail.com>
1 parent 24e6ba0 commit 48cd5d6

File tree

5 files changed

+105
-7
lines changed

5 files changed

+105
-7
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: receiver/prometheusremotewrite
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Implement body unmarshaling for Prometheus Remote Write requests
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [35624]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: Warning - The HTTP Server still doesn't do anything. It's just a placeholder for now.
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [api, user]

receiver/prometheusremotewritereceiver/go.mod

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/promet
33
go 1.22.0
44

55
require (
6+
github.com/gogo/protobuf v1.3.2
7+
github.com/golang/snappy v0.0.4
68
github.com/prometheus/prometheus v0.54.1
79
github.com/stretchr/testify v1.9.0
810
go.opentelemetry.io/collector/component v0.112.0
@@ -11,6 +13,7 @@ require (
1113
go.opentelemetry.io/collector/confmap v1.18.0
1214
go.opentelemetry.io/collector/consumer v0.112.0
1315
go.opentelemetry.io/collector/consumer/consumertest v0.112.0
16+
go.opentelemetry.io/collector/pdata v1.18.0
1417
go.opentelemetry.io/collector/receiver v0.112.0
1518
go.uber.org/goleak v1.3.0
1619
go.uber.org/zap v1.27.0
@@ -26,16 +29,15 @@ require (
2629
github.com/beorn7/perks v1.0.1 // indirect
2730
github.com/cespare/xxhash/v2 v2.3.0 // indirect
2831
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
32+
github.com/dennwc/varint v1.0.0 // indirect
2933
github.com/felixge/httpsnoop v1.0.4 // indirect
3034
github.com/fsnotify/fsnotify v1.7.0 // indirect
3135
github.com/go-kit/log v0.2.1 // indirect
3236
github.com/go-logfmt/logfmt v0.6.0 // indirect
3337
github.com/go-logr/logr v1.4.2 // indirect
3438
github.com/go-logr/stdr v1.2.2 // indirect
3539
github.com/go-viper/mapstructure/v2 v2.2.1 // indirect
36-
github.com/gogo/protobuf v1.3.2 // indirect
3740
github.com/golang-jwt/jwt/v5 v5.2.1 // indirect
38-
github.com/golang/snappy v0.0.4 // indirect
3941
github.com/google/uuid v1.6.0 // indirect
4042
github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc // indirect
4143
github.com/jmespath/go-jmespath v0.4.0 // indirect
@@ -72,16 +74,17 @@ require (
7274
go.opentelemetry.io/collector/consumer/consumerprofiles v0.112.0 // indirect
7375
go.opentelemetry.io/collector/extension v0.112.0 // indirect
7476
go.opentelemetry.io/collector/extension/auth v0.112.0 // indirect
75-
go.opentelemetry.io/collector/pdata v1.18.0 // indirect
7677
go.opentelemetry.io/collector/pdata/pprofile v0.112.0 // indirect
7778
go.opentelemetry.io/collector/pipeline v0.112.0 // indirect
7879
go.opentelemetry.io/collector/receiver/receiverprofiles v0.112.0 // indirect
80+
go.opentelemetry.io/collector/semconv v0.105.0 // indirect
7981
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0 // indirect
8082
go.opentelemetry.io/otel v1.31.0 // indirect
8183
go.opentelemetry.io/otel/metric v1.31.0 // indirect
8284
go.opentelemetry.io/otel/sdk v1.31.0 // indirect
8385
go.opentelemetry.io/otel/sdk/metric v1.31.0 // indirect
8486
go.opentelemetry.io/otel/trace v1.31.0 // indirect
87+
go.uber.org/atomic v1.11.0 // indirect
8588
go.uber.org/multierr v1.11.0 // indirect
8689
golang.org/x/crypto v0.28.0 // indirect
8790
golang.org/x/net v0.30.0 // indirect

receiver/prometheusremotewritereceiver/go.sum

Lines changed: 12 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

receiver/prometheusremotewritereceiver/receiver.go

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,19 @@ import (
77
"context"
88
"errors"
99
"fmt"
10+
"io"
1011
"net/http"
1112
"strings"
1213
"time"
1314

15+
"github.com/gogo/protobuf/proto"
1416
promconfig "github.com/prometheus/prometheus/config"
17+
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
18+
promremote "github.com/prometheus/prometheus/storage/remote"
1519
"go.opentelemetry.io/collector/component"
1620
"go.opentelemetry.io/collector/component/componentstatus"
1721
"go.opentelemetry.io/collector/consumer"
22+
"go.opentelemetry.io/collector/pdata/pmetric"
1823
"go.opentelemetry.io/collector/receiver"
1924
"go.uber.org/zap/zapcore"
2025
)
@@ -90,7 +95,28 @@ func (prw *prometheusRemoteWriteReceiver) handlePRW(w http.ResponseWriter, req *
9095
// After parsing the content-type header, the next step would be to handle content-encoding.
9196
// Luckly confighttp's Server has middleware that already decompress the request body for us.
9297

93-
// The next step in follow up PRs would be to decode the request body.
98+
body, err := io.ReadAll(req.Body)
99+
if err != nil {
100+
prw.settings.Logger.Warn("Error decoding remote write request", zapcore.Field{Key: "error", Type: zapcore.ErrorType, Interface: err})
101+
http.Error(w, err.Error(), http.StatusBadRequest)
102+
return
103+
}
104+
105+
var prw2Req writev2.Request
106+
if err = proto.Unmarshal(body, &prw2Req); err != nil {
107+
prw.settings.Logger.Warn("Error decoding remote write request", zapcore.Field{Key: "error", Type: zapcore.ErrorType, Interface: err})
108+
http.Error(w, err.Error(), http.StatusBadRequest)
109+
return
110+
}
111+
112+
_, stats, err := prw.translateV2(req.Context(), &prw2Req)
113+
stats.SetHeaders(w)
114+
if err != nil {
115+
http.Error(w, err.Error(), http.StatusBadRequest) // Following instructions at https://prometheus.io/docs/specs/remote_write_spec_2_0/#invalid-samples
116+
return
117+
}
118+
119+
w.WriteHeader(http.StatusNoContent)
94120
}
95121

96122
// parseProto parses the content-type header and returns the version of the remote-write protocol.
@@ -122,3 +148,10 @@ func (prw *prometheusRemoteWriteReceiver) parseProto(contentType string) (promco
122148
// No "proto=" parameter found, assume v1.
123149
return promconfig.RemoteWriteProtoMsgV1, nil
124150
}
151+
152+
// translateV2 translates a v2 remote-write request into OTLP metrics.
153+
// For now translateV2 is not implemented and returns an empty metrics.
154+
// nolint
155+
func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, _ *writev2.Request) (pmetric.Metrics, promremote.WriteResponseStats, error) {
156+
return pmetric.NewMetrics(), promremote.WriteResponseStats{}, nil
157+
}

receiver/prometheusremotewritereceiver/receiver_test.go

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,25 @@
44
package prometheusremotewritereceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusremotewritereceiver"
55

66
import (
7+
"bytes"
78
"context"
89
"fmt"
910
"net/http"
1011
"testing"
1112

13+
"github.com/gogo/protobuf/proto"
14+
"github.com/golang/snappy"
1215
promconfig "github.com/prometheus/prometheus/config"
16+
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
1317
"github.com/stretchr/testify/assert"
1418
"go.opentelemetry.io/collector/component/componenttest"
1519
"go.opentelemetry.io/collector/consumer/consumertest"
1620
"go.opentelemetry.io/collector/receiver/receivertest"
1721
)
1822

19-
func TestHandlePRWContentTypeNegotiation(t *testing.T) {
23+
func setupServer(t *testing.T) {
24+
t.Helper()
25+
2026
factory := NewFactory()
2127
cfg := factory.CreateDefaultConfig()
2228

@@ -31,6 +37,10 @@ func TestHandlePRWContentTypeNegotiation(t *testing.T) {
3137
t.Cleanup(func() {
3238
assert.NoError(t, prwReceiver.Shutdown(ctx), "Must not error shutting down")
3339
})
40+
}
41+
42+
func TestHandlePRWContentTypeNegotiation(t *testing.T) {
43+
setupServer(t)
3444

3545
for _, tc := range []struct {
3646
name string
@@ -60,18 +70,31 @@ func TestHandlePRWContentTypeNegotiation(t *testing.T) {
6070
{
6171
name: "x-protobuf/v2 proto parameter",
6272
contentType: fmt.Sprintf("application/x-protobuf;proto=%s", promconfig.RemoteWriteProtoMsgV2),
63-
extectedCode: http.StatusOK,
73+
extectedCode: http.StatusNoContent,
6474
},
6575
} {
6676
t.Run(tc.name, func(t *testing.T) {
67-
req, err := http.NewRequest(http.MethodPost, "http://localhost:9090/api/v1/write", nil)
77+
body := writev2.Request{}
78+
pBuf := proto.NewBuffer(nil)
79+
err := pBuf.Marshal(&body)
80+
assert.NoError(t, err)
81+
82+
var compressedBody []byte
83+
snappy.Encode(compressedBody, pBuf.Bytes())
84+
req, err := http.NewRequest(http.MethodPost, "http://localhost:9090/api/v1/write", bytes.NewBuffer(compressedBody))
6885
assert.NoError(t, err)
6986

7087
req.Header.Set("Content-Type", tc.contentType)
88+
req.Header.Set("Content-Encoding", "snappy")
7189
resp, err := http.DefaultClient.Do(req)
7290
assert.NoError(t, err)
7391

7492
assert.Equal(t, tc.extectedCode, resp.StatusCode)
93+
if tc.extectedCode == http.StatusNoContent { // We went until the end
94+
assert.NotEmpty(t, resp.Header.Get("X-Prometheus-Remote-Write-Samples-Written"))
95+
assert.NotEmpty(t, resp.Header.Get("X-Prometheus-Remote-Write-Histograms-Written"))
96+
assert.NotEmpty(t, resp.Header.Get("X-Prometheus-Remote-Write-Exemplars-Written"))
97+
}
7598
})
7699
}
77100
}

0 commit comments

Comments
 (0)