Skip to content

Commit

Permalink
Implement OTLP/HTTP X-Protobuf Receiver
Browse files Browse the repository at this point in the history
This change adds application/x-protobuf support to the existing
grpc-gateway mux in the OTLP receiver.

See: https://github.com/open-telemetry/oteps/blob/master/text/0099-otlp-http.md

What currently works:
* The receiver will correctly process HTTP requests with the
  Content-Type application/x-protobuf.
* The receiver will respond with Content-Type application/x-protobuf.
* The receiver will respond with status code HTTP 200 OK on success.
What doesn't work yet:
* The receiver cannot handle gzip-encoded requests (Content-Encoding
  gzip).
* The receiver will not gzip-encode responses to requests with
  Accept-Encoding: gzip.


Updates open-telemetry#881
  • Loading branch information
kirbyquerby committed May 22, 2020
1 parent daf2fc7 commit 054b75f
Show file tree
Hide file tree
Showing 3 changed files with 168 additions and 1 deletion.
4 changes: 3 additions & 1 deletion receiver/otlpreceiver/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ func New(
r := &Receiver{
ln: ln,
corsOrigins: []string{}, // Disable CORS by default.
gatewayMux: gatewayruntime.NewServeMux(),
gatewayMux: gatewayruntime.NewServeMux(
gatewayruntime.WithMarshalerOption("application/x-protobuf", &xProtobufMarshaler{}),
),
}

for _, opt := range opts {
Expand Down
113 changes: 113 additions & 0 deletions receiver/otlpreceiver/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"testing"
"time"

"github.com/golang/protobuf/proto"
collectortrace "github.com/open-telemetry/opentelemetry-proto/gen/go/collector/trace/v1"
otlpcommon "github.com/open-telemetry/opentelemetry-proto/gen/go/common/v1"
otlpresource "github.com/open-telemetry/opentelemetry-proto/gen/go/resource/v1"
Expand Down Expand Up @@ -171,6 +172,118 @@ func TestGrpcGateway_endToEnd(t *testing.T) {
assert.EqualValues(t, got, want)
}

func TestProtoHttp(t *testing.T) {
addr := testutils.GetAvailableLocalAddress(t)

// Set the buffer count to 1 to make it flush the test span immediately.
sink := new(exportertest.SinkTraceExporter)
ocr, err := New(otlpReceiver, "tcp", addr, sink, nil)
require.NoError(t, err, "Failed to create trace receiver: %v", err)

require.NoError(t, ocr.Start(context.Background(), componenttest.NewNopHost()), "Failed to start trace receiver: %v", err)
defer ocr.Shutdown(context.Background())

// TODO(nilebox): make starting server deterministic
// Wait for the servers to start
<-time.After(10 * time.Millisecond)

url := fmt.Sprintf("http://%s/v1/trace", addr)

wantOtlp := []*otlptrace.ResourceSpans{
{
Resource: &otlpresource.Resource{
Attributes: []*otlpcommon.AttributeKeyValue{
{
Key: conventions.AttributeHostHostname,
StringValue: "testHost",
Type: otlpcommon.AttributeKeyValue_STRING,
},
},
},
InstrumentationLibrarySpans: []*otlptrace.InstrumentationLibrarySpans{
{
Spans: []*otlptrace.Span{
{
TraceId: []byte{0x5B, 0x8E, 0xFF, 0xF7, 0x98, 0x3, 0x81, 0x3, 0xD2, 0x69, 0xB6, 0x33, 0x81, 0x3F, 0xC6, 0xC},
SpanId: []byte{0xEE, 0xE1, 0x9B, 0x7E, 0xC3, 0xC1, 0xB1, 0x73},
Name: "testSpan",
StartTimeUnixNano: 1544712660000000000,
EndTimeUnixNano: 1544712661000000000,
Attributes: []*otlpcommon.AttributeKeyValue{
{
Key: "attr1",
Type: otlpcommon.AttributeKeyValue_INT,
IntValue: 55,
},
},
},
},
},
},
},
}

traceProto := collectortrace.ExportTraceServiceRequest{
ResourceSpans: wantOtlp,
}
traceBytes, err := proto.Marshal(&traceProto)
if err != nil {
t.Errorf("Error marshaling protobuf: %v", err)
}

buf := bytes.NewBuffer(traceBytes)

req, err := http.NewRequest("POST", url, buf)
require.NoError(t, err, "Error creating trace POST request: %v", err)
req.Header.Set("Content-Type", "application/x-protobuf")

client := &http.Client{}
resp, err := client.Do(req)
require.NoError(t, err, "Error posting trace to grpc-gateway server: %v", err)

respBytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Fatalf("Error reading response from trace grpc-gateway, %v", err)
}

err = resp.Body.Close()
if err != nil {
t.Fatalf("Error closing response body, %v", err)
}

if resp.StatusCode != 200 {
t.Errorf("Unexpected status from trace grpc-gateway: %v", resp.StatusCode)
}

if resType := resp.Header.Get("Content-Type"); resType != "application/x-protobuf" {
t.Errorf("response Content-Type got: %s, want: %s", resType, "application/x-protobuf")
}

tmp := collectortrace.ExportTraceServiceResponse{}
err = proto.Unmarshal(respBytes, &tmp)
if err != nil {
t.Errorf("Unable to unmarshal response to ExportTraceServiceResponse proto: %v", err)
}

gotOtlp := pdata.TracesToOtlp(sink.AllTraces()[0])

if len(gotOtlp) != len(wantOtlp) {
t.Fatalf("len(traces):\nGot: %d\nWant: %d\n", len(gotOtlp), len(wantOtlp))
}

got := gotOtlp[0]
want := wantOtlp[0]

// assert.Equal doesn't work on protos, see:
// https://github.com/stretchr/testify/issues/758
if !proto.Equal(got, want) {
t.Errorf("Sending trace proto over http failed\nGot:\n%v\nWant:\n%v\n",
proto.MarshalTextString(got),
proto.MarshalTextString(want))
}

}

func TestTraceGrpcGatewayCors_endToEnd(t *testing.T) {
addr := testutils.GetAvailableLocalAddress(t)
corsOrigins := []string{"allowed-*.com"}
Expand Down
52 changes: 52 additions & 0 deletions receiver/otlpreceiver/otlphttp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package otlpreceiver

import (
"io"

"github.com/grpc-ecosystem/grpc-gateway/runtime"
)

// xProtobufMarshaler is a Marshaler which wraps runtime.ProtoMarshaller
// and sets ContentType to application/x-protobuf
type xProtobufMarshaler struct {
marshaler runtime.ProtoMarshaller
}

// ContentType always returns "application/x-protobuf".
func (*xProtobufMarshaler) ContentType() string {
return "application/x-protobuf"
}

// Marshal marshals "value" into Proto
func (m *xProtobufMarshaler) Marshal(value interface{}) ([]byte, error) {
return m.marshaler.Marshal(value)
}

// Unmarshal unmarshals proto "data" into "value"
func (m *xProtobufMarshaler) Unmarshal(data []byte, value interface{}) error {
return m.marshaler.Unmarshal(data, value)
}

// NewDecoder returns a Decoder which reads proto stream from "reader".
func (m *xProtobufMarshaler) NewDecoder(reader io.Reader) runtime.Decoder {
return m.marshaler.NewDecoder(reader)
}

// NewEncoder returns an Encoder which writes proto stream into "writer".
func (m *xProtobufMarshaler) NewEncoder(writer io.Writer) runtime.Encoder {
return m.marshaler.NewEncoder(writer)
}

0 comments on commit 054b75f

Please sign in to comment.