Skip to content

Commit

Permalink
Separate into separate into url path per signal
Browse files Browse the repository at this point in the history
  • Loading branch information
fredthomsen committed May 17, 2023
1 parent deac979 commit 5f60165
Show file tree
Hide file tree
Showing 10 changed files with 94 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ change_type: enhancement
component: otlpreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add http path prefix config option to otlpreceiver
note: Add http url paths per signal config options to otlpreceiver

# One or more tracking issues or pull requests related to the change
issues: [7511]
Expand Down
11 changes: 6 additions & 5 deletions receiver/otlpreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,13 @@ The OTLP receiver can receive trace export calls via HTTP/JSON in addition to
gRPC. The HTTP/JSON address is the same as gRPC as the protocol is recognized
and processed accordingly. Note the serialization format needs to be [protobuf JSON](https://developers.google.com/protocol-buffers/docs/proto3#json).

The HTTP/JSON configuration also provides `path_prefix` configuration to allow the URL
path to be modified. The defaults to an empty string and has no impact for GRPC.
The HTTP/JSON configuration also provides `traces_endpoint`, `metrics_endpoint`, and `logs_endpoint`
configuration to allow the endpoint for signal types to be modified. These default to `v1/traces`,
`v1/metrics`, and `v1/logs` respectively.

To write traces with HTTP/JSON, `POST` to `[address]/[path_prefix]/v1/traces` for traces,
to `[address]/[path_prefix]/v1/metrics` for metrics, to `[address]/[path_prefix]/v1/logs`
for logs. The default port is `4318`.
To write traces with HTTP/JSON, `POST` to `[address]/[traces_endpoint]` for traces,
to `[address]/[metrics_endpoint]` for metrics, to `[address]/[logs_endpoint]` for logs.
The default port is `4318`.

### CORS (Cross-origin resource sharing)

Expand Down
27 changes: 24 additions & 3 deletions receiver/otlpreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ package otlpreceiver // import "go.opentelemetry.io/collector/receiver/otlprecei

import (
"errors"
"strings"
"net/url"
"path"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configgrpc"
Expand All @@ -32,7 +33,15 @@ const (

type httpServerSettings struct {
*confighttp.HTTPServerSettings `mapstructure:",squash"`
PathPrefix string `mapstructure:"path_prefix,omitempty"`

// The URL path to receive traces on. If omitted "/v1/traces" will be used.
TracesUrlPath string `mapstructure:"traces_url_path,omitempty"`

// The URL path to receive metrics on. If omitted "/v1/metrics" will be used.
MetricsUrlPath string `mapstructure:"metrics_url_path,omitempty"`

// The URL path to receive logs on. If omitted "/v1/logs" will be used.
LogsUrlPath string `mapstructure:"logs_url_path,omitempty"`
}

// Protocols is the configuration for the supported protocols.
Expand Down Expand Up @@ -73,7 +82,19 @@ func (cfg *Config) Unmarshal(conf *confmap.Conf) error {
if !conf.IsSet(protoHTTP) {
cfg.HTTP = nil
} else {
cfg.HTTP.PathPrefix = strings.Trim(cfg.HTTP.PathPrefix, "/")
// Verify URL path sanity
signalUrlPaths := []*string{&cfg.HTTP.TracesUrlPath, &cfg.HTTP.MetricsUrlPath, &cfg.HTTP.LogsUrlPath}
for i, urlPath := range signalUrlPaths {
u, err := url.Parse(*urlPath)
if err != nil {
return errors.New("Invalid HTTP URL path set for signal")
}
// Normalize URLs
if !path.IsAbs(u.Path) {
u.Path = "/" + u.Path
}
*signalUrlPaths[i] = u.Path
}
}

return nil
Expand Down
20 changes: 6 additions & 14 deletions receiver/otlpreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,18 +64,6 @@ func TestUnmarshalConfigOnlyHTTP(t *testing.T) {
assert.Equal(t, defaultOnlyHTTP, cfg)
}

func TestUnmarshalConfigOnlyHTTPSlashPrefixPath(t *testing.T) {
cm, err := confmaptest.LoadConf(filepath.Join("testdata", "only_http_slash_prefix.yaml"))
require.NoError(t, err)
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
assert.NoError(t, component.UnmarshalConfig(cm, cfg))

defaultOnlyHTTP := factory.CreateDefaultConfig().(*Config)
defaultOnlyHTTP.GRPC = nil
assert.Equal(t, defaultOnlyHTTP, cfg)
}

func TestUnmarshalConfigOnlyHTTPNull(t *testing.T) {
cm, err := confmaptest.LoadConf(filepath.Join("testdata", "only_http_null.yaml"))
require.NoError(t, err)
Expand Down Expand Up @@ -152,7 +140,9 @@ func TestUnmarshalConfig(t *testing.T) {
MaxAge: 7200,
},
},
PathPrefix: "otlp",
TracesUrlPath: "/traces",
MetricsUrlPath: "/v2/metrics",
LogsUrlPath: "/log/ingest",
},
},
}, cfg)
Expand All @@ -179,7 +169,9 @@ func TestUnmarshalConfigUnix(t *testing.T) {
HTTPServerSettings: &confighttp.HTTPServerSettings{
Endpoint: "/tmp/http_otlp.sock",
},
PathPrefix: defaultPathPrefix,
TracesUrlPath: defaultTracesUrlPath,
MetricsUrlPath: defaultMetricsUrlPath,
LogsUrlPath: defaultLogsUrlPath,
},
},
}, cfg)
Expand Down
9 changes: 7 additions & 2 deletions receiver/otlpreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ const (

defaultGRPCEndpoint = "0.0.0.0:4317"
defaultHTTPEndpoint = "0.0.0.0:4318"
defaultPathPrefix = ""

defaultTracesUrlPath = "/v1/traces"
defaultMetricsUrlPath = "/v1/metrics"
defaultLogsUrlPath = "/v1/logs"
)

// NewFactory creates a new OTLP receiver factory.
Expand Down Expand Up @@ -60,7 +63,9 @@ func createDefaultConfig() component.Config {
HTTPServerSettings: &confighttp.HTTPServerSettings{
Endpoint: defaultHTTPEndpoint,
},
PathPrefix: defaultPathPrefix,
TracesUrlPath: defaultTracesUrlPath,
MetricsUrlPath: defaultMetricsUrlPath,
LogsUrlPath: defaultLogsUrlPath,
},
},
}
Expand Down
12 changes: 12 additions & 0 deletions receiver/otlpreceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ func TestCreateTracesReceiver(t *testing.T) {
HTTPServerSettings: &confighttp.HTTPServerSettings{
Endpoint: testutil.GetAvailableLocalAddress(t),
},
TracesUrlPath: defaultTracesUrlPath,
MetricsUrlPath: defaultMetricsUrlPath,
LogsUrlPath: defaultLogsUrlPath,
}

tests := []struct {
Expand Down Expand Up @@ -106,6 +109,7 @@ func TestCreateTracesReceiver(t *testing.T) {
HTTPServerSettings: &confighttp.HTTPServerSettings{
Endpoint: "localhost:112233",
},
TracesUrlPath: defaultTracesUrlPath,
},
},
},
Expand Down Expand Up @@ -143,6 +147,9 @@ func TestCreateMetricReceiver(t *testing.T) {
HTTPServerSettings: &confighttp.HTTPServerSettings{
Endpoint: testutil.GetAvailableLocalAddress(t),
},
TracesUrlPath: defaultTracesUrlPath,
MetricsUrlPath: defaultMetricsUrlPath,
LogsUrlPath: defaultLogsUrlPath,
}

tests := []struct {
Expand Down Expand Up @@ -183,6 +190,7 @@ func TestCreateMetricReceiver(t *testing.T) {
HTTPServerSettings: &confighttp.HTTPServerSettings{
Endpoint: "327.0.0.1:1122",
},
MetricsUrlPath: defaultMetricsUrlPath,
},
},
},
Expand Down Expand Up @@ -219,6 +227,9 @@ func TestCreateLogReceiver(t *testing.T) {
HTTPServerSettings: &confighttp.HTTPServerSettings{
Endpoint: testutil.GetAvailableLocalAddress(t),
},
TracesUrlPath: defaultTracesUrlPath,
MetricsUrlPath: defaultMetricsUrlPath,
LogsUrlPath: defaultLogsUrlPath,
}

tests := []struct {
Expand Down Expand Up @@ -263,6 +274,7 @@ func TestCreateLogReceiver(t *testing.T) {
HTTPServerSettings: &confighttp.HTTPServerSettings{
Endpoint: "327.0.0.1:1122",
},
LogsUrlPath: defaultLogsUrlPath,
},
},
},
Expand Down
13 changes: 3 additions & 10 deletions receiver/otlpreceiver/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"fmt"
"net"
"net/http"
"net/url"
"sync"

"go.uber.org/zap"
Expand Down Expand Up @@ -201,9 +200,7 @@ func (r *otlpReceiver) registerTraceConsumer(tc consumer.Traces) error {
r.tracesReceiver = trace.New(tc, r.obsrepGRPC)
httpTracesReceiver := trace.New(tc, r.obsrepHTTP)
if r.httpMux != nil {
u := url.URL{Path: "/"}
urlPath := u.JoinPath(url.PathEscape(r.cfg.HTTP.PathPrefix), "v1/traces").String()
r.httpMux.HandleFunc(urlPath, func(resp http.ResponseWriter, req *http.Request) {
r.httpMux.HandleFunc(r.cfg.HTTP.TracesUrlPath, func(resp http.ResponseWriter, req *http.Request) {
if req.Method != http.MethodPost {
handleUnmatchedMethod(resp)
return
Expand All @@ -228,9 +225,7 @@ func (r *otlpReceiver) registerMetricsConsumer(mc consumer.Metrics) error {
r.metricsReceiver = metrics.New(mc, r.obsrepGRPC)
httpMetricsReceiver := metrics.New(mc, r.obsrepHTTP)
if r.httpMux != nil {
u := url.URL{Path: "/"}
urlPath := u.JoinPath(url.PathEscape(r.cfg.HTTP.PathPrefix), "v1/metrics").String()
r.httpMux.HandleFunc(urlPath, func(resp http.ResponseWriter, req *http.Request) {
r.httpMux.HandleFunc(r.cfg.HTTP.MetricsUrlPath, func(resp http.ResponseWriter, req *http.Request) {
if req.Method != http.MethodPost {
handleUnmatchedMethod(resp)
return
Expand All @@ -255,9 +250,7 @@ func (r *otlpReceiver) registerLogsConsumer(lc consumer.Logs) error {
r.logsReceiver = logs.New(lc, r.obsrepGRPC)
httpLogsReceiver := logs.New(lc, r.obsrepHTTP)
if r.httpMux != nil {
u := url.URL{Path: "/"}
urlPath := u.JoinPath(url.PathEscape(r.cfg.HTTP.PathPrefix), "v1/logs").String()
r.httpMux.HandleFunc(urlPath, func(resp http.ResponseWriter, req *http.Request) {
r.httpMux.HandleFunc(r.cfg.HTTP.LogsUrlPath, func(resp http.ResponseWriter, req *http.Request) {
if req.Method != http.MethodPost {
handleUnmatchedMethod(resp)
return
Expand Down
44 changes: 30 additions & 14 deletions receiver/otlpreceiver/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,11 +178,11 @@ func TestJsonHttp(t *testing.T) {
},
}
addr := testutil.GetAvailableLocalAddress(t)
pathPrefix := "json"
tracesUrlPath := "/v1/traceingest"

// Set the buffer count to 1 to make it flush the test span immediately.
sink := &errOrSinkConsumer{TracesSink: new(consumertest.TracesSink)}
ocr := newHTTPReceiver(t, addr, pathPrefix, sink, nil)
ocr := newHTTPReceiver(t, addr, tracesUrlPath, defaultMetricsUrlPath, sink, nil)

require.NoError(t, ocr.Start(context.Background(), componenttest.NewNopHost()), "Failed to start trace receiver")
t.Cleanup(func() { require.NoError(t, ocr.Shutdown(context.Background())) })
Expand All @@ -193,7 +193,7 @@ func TestJsonHttp(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
url := fmt.Sprintf("http://%s/%s/v1/traces", addr, pathPrefix)
url := fmt.Sprintf("http://%s%s", addr, tracesUrlPath)
sink.Reset()
testHTTPJSONRequest(t, url, sink, test.encoding, test.contentType, test.err)
})
Expand All @@ -203,7 +203,16 @@ func TestJsonHttp(t *testing.T) {
func TestHandleInvalidRequests(t *testing.T) {
endpoint := testutil.GetAvailableLocalAddress(t)
cfg := &Config{
Protocols: Protocols{HTTP: &httpServerSettings{HTTPServerSettings: &confighttp.HTTPServerSettings{Endpoint: endpoint}}},
Protocols: Protocols{
HTTP: &httpServerSettings{
HTTPServerSettings: &confighttp.HTTPServerSettings{
Endpoint: endpoint,
},
TracesUrlPath: defaultTracesUrlPath,
MetricsUrlPath: defaultMetricsUrlPath,
LogsUrlPath: defaultLogsUrlPath,
},
},
}

// Traces
Expand Down Expand Up @@ -451,11 +460,10 @@ func TestProtoHttp(t *testing.T) {
},
}
addr := testutil.GetAvailableLocalAddress(t)
pathPrefix := "proto"

// Set the buffer count to 1 to make it flush the test span immediately.
tSink := &errOrSinkConsumer{TracesSink: new(consumertest.TracesSink)}
ocr := newHTTPReceiver(t, addr, pathPrefix, tSink, consumertest.NewNop())
ocr := newHTTPReceiver(t, addr, defaultTracesUrlPath, defaultMetricsUrlPath, tSink, consumertest.NewNop())

require.NoError(t, ocr.Start(context.Background(), componenttest.NewNopHost()), "Failed to start trace receiver")
t.Cleanup(func() { require.NoError(t, ocr.Shutdown(context.Background())) })
Expand All @@ -471,7 +479,7 @@ func TestProtoHttp(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
url := fmt.Sprintf("http://%s/%s/v1/traces", addr, pathPrefix)
url := fmt.Sprintf("http://%s%s", addr, defaultTracesUrlPath)
tSink.Reset()
testHTTPProtobufRequest(t, url, tSink, test.encoding, traceBytes, test.err, td)
})
Expand Down Expand Up @@ -586,12 +594,12 @@ func TestOTLPReceiverInvalidContentEncoding(t *testing.T) {
// Set the buffer count to 1 to make it flush the test span immediately.
tSink := new(consumertest.TracesSink)
mSink := new(consumertest.MetricsSink)
ocr := newHTTPReceiver(t, addr, defaultPathPrefix, tSink, mSink)
ocr := newHTTPReceiver(t, addr, defaultTracesUrlPath, defaultMetricsUrlPath, tSink, mSink)

require.NoError(t, ocr.Start(context.Background(), componenttest.NewNopHost()), "Failed to start trace receiver")
t.Cleanup(func() { require.NoError(t, ocr.Shutdown(context.Background())) })

url := fmt.Sprintf("http://%s/v1/traces", addr)
url := fmt.Sprintf("http://%s/%s", addr, defaultTracesUrlPath)

// Wait for the servers to start
<-time.After(10 * time.Millisecond)
Expand Down Expand Up @@ -645,7 +653,7 @@ func TestHTTPNewPortAlreadyUsed(t *testing.T) {
assert.NoError(t, ln.Close())
})

r := newHTTPReceiver(t, addr, defaultPathPrefix, consumertest.NewNop(), consumertest.NewNop())
r := newHTTPReceiver(t, addr, defaultTracesUrlPath, defaultMetricsUrlPath, consumertest.NewNop(), consumertest.NewNop())
require.NotNil(t, r)

require.Error(t, r.Start(context.Background(), componenttest.NewNopHost()))
Expand Down Expand Up @@ -756,7 +764,7 @@ func TestOTLPReceiverHTTPTracesIngestTest(t *testing.T) {

sink := &errOrSinkConsumer{TracesSink: new(consumertest.TracesSink)}

ocr := newHTTPReceiver(t, addr, defaultPathPrefix, sink, nil)
ocr := newHTTPReceiver(t, addr, defaultTracesUrlPath, defaultMetricsUrlPath, sink, nil)
require.NotNil(t, ocr)
require.NoError(t, ocr.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() { require.NoError(t, ocr.Shutdown(context.Background())) })
Expand All @@ -771,7 +779,7 @@ func TestOTLPReceiverHTTPTracesIngestTest(t *testing.T) {
pbMarshaler := ptrace.ProtoMarshaler{}
pbBytes, err := pbMarshaler.MarshalTraces(td)
require.NoError(t, err)
req, err := http.NewRequest(http.MethodPost, "http://"+addr+"/v1/traces", bytes.NewReader(pbBytes))
req, err := http.NewRequest(http.MethodPost, "http://"+addr+defaultTracesUrlPath, bytes.NewReader(pbBytes))
require.NoError(t, err)
req.Header.Set("Content-Type", pbContentType)
resp, err := http.DefaultClient.Do(req)
Expand Down Expand Up @@ -878,6 +886,9 @@ func TestHTTPInvalidTLSCredentials(t *testing.T) {
},
},
},
TracesUrlPath: defaultTracesUrlPath,
MetricsUrlPath: defaultMetricsUrlPath,
LogsUrlPath: defaultLogsUrlPath,
},
},
}
Expand All @@ -904,6 +915,9 @@ func testHTTPMaxRequestBodySizeJSON(t *testing.T, payload []byte, size int, expe
Endpoint: endpoint,
MaxRequestBodySize: int64(size),
},
TracesUrlPath: defaultTracesUrlPath,
MetricsUrlPath: defaultMetricsUrlPath,
LogsUrlPath: defaultLogsUrlPath,
},
},
}
Expand Down Expand Up @@ -947,11 +961,13 @@ func newGRPCReceiver(t *testing.T, endpoint string, tc consumer.Traces, mc consu
return newReceiver(t, factory, cfg, otlpReceiverID, tc, mc)
}

func newHTTPReceiver(t *testing.T, endpoint string, pathPrefix string, tc consumer.Traces, mc consumer.Metrics) component.Component {
func newHTTPReceiver(t *testing.T, endpoint string, tracesUrlPath string, metricsUrlPath string, tc consumer.Traces, mc consumer.Metrics) component.Component {
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
cfg.HTTP.Endpoint = endpoint
cfg.HTTP.PathPrefix = pathPrefix
cfg.HTTP.TracesUrlPath = tracesUrlPath
cfg.HTTP.MetricsUrlPath = metricsUrlPath
cfg.HTTP.LogsUrlPath = defaultLogsUrlPath
cfg.GRPC = nil
return newReceiver(t, factory, cfg, otlpReceiverID, tc, mc)
}
Expand Down
6 changes: 5 additions & 1 deletion receiver/otlpreceiver/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,8 @@ protocols:
- https://*.test.com # Wildcard subdomain. Allows domains like https://www.test.com and https://foo.test.com but not https://wwwtest.com.
- https://test.com # Fully qualified domain name. Allows https://test.com only.
max_age: 7200
path_prefix: "/otlp"

# The following shows URL paths for endpoints where signals are listened for bt the OTLP receiver
traces_url_path: traces
metrics_url_path: /v2/metrics
logs_url_path: log/ingest
4 changes: 0 additions & 4 deletions receiver/otlpreceiver/testdata/only_http_slash_prefix.yaml

This file was deleted.

0 comments on commit 5f60165

Please sign in to comment.