From 3d8ac619066e7e92f6e76c38904d2be66a2f4d16 Mon Sep 17 00:00:00 2001 From: kirbyquerby Date: Tue, 26 May 2020 16:53:38 -0500 Subject: [PATCH] Implement OTLP/HTTP X-Protobuf Receiver (#1021) This change adds application/x-protobuf support to the existing grpc-gateway mux in the OTLP receiver. See: https://github.com/open-telemetry/oteps/blob/master/text/0099-otlp-http.md What currently works: * The receiver will correctly process HTTP requests with the Content-Type application/x-protobuf. * The receiver will respond with Content-Type application/x-protobuf. * The receiver will respond with status code HTTP 200 OK on success. What doesn't work yet: * The receiver cannot handle gzip-encoded requests (Content-Encoding gzip). * The receiver will not gzip-encode responses to requests with Accept-Encoding: gzip. Updates #881 --- receiver/otlpreceiver/otlp.go | 4 +- receiver/otlpreceiver/otlp_test.go | 82 ++++++++++++++++++++++++++++++ receiver/otlpreceiver/otlphttp.go | 30 +++++++++++ 3 files changed, 115 insertions(+), 1 deletion(-) create mode 100644 receiver/otlpreceiver/otlphttp.go diff --git a/receiver/otlpreceiver/otlp.go b/receiver/otlpreceiver/otlp.go index cd03bd169f5..709199d053c 100644 --- a/receiver/otlpreceiver/otlp.go +++ b/receiver/otlpreceiver/otlp.go @@ -80,7 +80,9 @@ func New( r := &Receiver{ ln: ln, corsOrigins: []string{}, // Disable CORS by default. - gatewayMux: gatewayruntime.NewServeMux(), + gatewayMux: gatewayruntime.NewServeMux( + gatewayruntime.WithMarshalerOption("application/x-protobuf", &xProtobufMarshaler{}), + ), } for _, opt := range opts { diff --git a/receiver/otlpreceiver/otlp_test.go b/receiver/otlpreceiver/otlp_test.go index 758a76606b6..84c410a1550 100644 --- a/receiver/otlpreceiver/otlp_test.go +++ b/receiver/otlpreceiver/otlp_test.go @@ -28,6 +28,7 @@ import ( "testing" "time" + "github.com/golang/protobuf/proto" collectortrace "github.com/open-telemetry/opentelemetry-proto/gen/go/collector/trace/v1" otlpcommon "github.com/open-telemetry/opentelemetry-proto/gen/go/common/v1" otlpresource "github.com/open-telemetry/opentelemetry-proto/gen/go/resource/v1" @@ -42,6 +43,7 @@ import ( "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/exporter/exportertest" + "go.opentelemetry.io/collector/internal/data/testdata" "go.opentelemetry.io/collector/observability/observabilitytest" "go.opentelemetry.io/collector/testutils" "go.opentelemetry.io/collector/translator/conventions" @@ -171,6 +173,86 @@ func TestGrpcGateway_endToEnd(t *testing.T) { assert.EqualValues(t, got, want) } +func TestProtoHttp(t *testing.T) { + addr := testutils.GetAvailableLocalAddress(t) + + // Set the buffer count to 1 to make it flush the test span immediately. + sink := new(exportertest.SinkTraceExporter) + ocr, err := New(otlpReceiver, "tcp", addr, sink, nil) + require.NoError(t, err, "Failed to create trace receiver: %v", err) + + require.NoError(t, ocr.Start(context.Background(), componenttest.NewNopHost()), "Failed to start trace receiver: %v", err) + defer ocr.Shutdown(context.Background()) + + // TODO(nilebox): make starting server deterministic + // Wait for the servers to start + <-time.After(10 * time.Millisecond) + + url := fmt.Sprintf("http://%s/v1/trace", addr) + + wantOtlp := pdata.TracesToOtlp(testdata.GenerateTraceDataOneSpan()) + + traceProto := collectortrace.ExportTraceServiceRequest{ + ResourceSpans: wantOtlp, + } + traceBytes, err := proto.Marshal(&traceProto) + if err != nil { + t.Errorf("Error marshaling protobuf: %v", err) + } + + buf := bytes.NewBuffer(traceBytes) + + req, err := http.NewRequest("POST", url, buf) + require.NoError(t, err, "Error creating trace POST request: %v", err) + req.Header.Set("Content-Type", "application/x-protobuf") + + client := &http.Client{} + resp, err := client.Do(req) + require.NoError(t, err, "Error posting trace to grpc-gateway server: %v", err) + + respBytes, err := ioutil.ReadAll(resp.Body) + if err != nil { + t.Fatalf("Error reading response from trace grpc-gateway, %v", err) + } + + err = resp.Body.Close() + if err != nil { + t.Fatalf("Error closing response body, %v", err) + } + + if resp.StatusCode != 200 { + t.Errorf("Unexpected status from trace grpc-gateway: %v", resp.StatusCode) + } + + if resType := resp.Header.Get("Content-Type"); resType != "application/x-protobuf" { + t.Errorf("response Content-Type got: %s, want: %s", resType, "application/x-protobuf") + } + + tmp := collectortrace.ExportTraceServiceResponse{} + err = proto.Unmarshal(respBytes, &tmp) + if err != nil { + t.Errorf("Unable to unmarshal response to ExportTraceServiceResponse proto: %v", err) + } + + gotOtlp := pdata.TracesToOtlp(sink.AllTraces()[0]) + + if len(gotOtlp) != len(wantOtlp) { + t.Fatalf("len(traces):\nGot: %d\nWant: %d\n", len(gotOtlp), len(wantOtlp)) + } + + got := gotOtlp[0] + want := wantOtlp[0] + + // assert.Equal doesn't work on protos, see: + // https://github.com/stretchr/testify/issues/758 + if !proto.Equal(got, want) { + t.Errorf("Sending trace proto over http failed\nGot:\n%v\nWant:\n%v\n", + proto.MarshalTextString(got), + proto.MarshalTextString(want)) + } + +} + func TestTraceGrpcGatewayCors_endToEnd(t *testing.T) { addr := testutils.GetAvailableLocalAddress(t) corsOrigins := []string{"allowed-*.com"} diff --git a/receiver/otlpreceiver/otlphttp.go b/receiver/otlpreceiver/otlphttp.go new file mode 100644 index 00000000000..466e3d8dfa5 --- /dev/null +++ b/receiver/otlpreceiver/otlphttp.go @@ -0,0 +1,30 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package otlpreceiver + +import ( + "github.com/grpc-ecosystem/grpc-gateway/runtime" +) + +// xProtobufMarshaler is a Marshaler which wraps runtime.ProtoMarshaller +// and sets ContentType to application/x-protobuf +type xProtobufMarshaler struct { + *runtime.ProtoMarshaller +} + +// ContentType always returns "application/x-protobuf". +func (*xProtobufMarshaler) ContentType() string { + return "application/x-protobuf" +}