Skip to content

Commit

Permalink
Implement OTLP/HTTP X-Protobuf Receiver (open-telemetry#1021)
Browse files Browse the repository at this point in the history
This change adds application/x-protobuf support to the existing
grpc-gateway mux in the OTLP receiver.

See: https://github.com/open-telemetry/oteps/blob/master/text/0099-otlp-http.md

What currently works:
* The receiver will correctly process HTTP requests with the
  Content-Type application/x-protobuf.
* The receiver will respond with Content-Type application/x-protobuf.
* The receiver will respond with status code HTTP 200 OK on success.

What doesn't work yet:
* The receiver cannot handle gzip-encoded requests (Content-Encoding
  gzip).
* The receiver will not gzip-encode responses to requests with
  Accept-Encoding: gzip.

Updates open-telemetry#881
  • Loading branch information
kirbyquerby authored and wyTrivail committed Jul 13, 2020
1 parent 30ed73a commit 3d8ac61
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 1 deletion.
4 changes: 3 additions & 1 deletion receiver/otlpreceiver/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ func New(
r := &Receiver{
ln: ln,
corsOrigins: []string{}, // Disable CORS by default.
gatewayMux: gatewayruntime.NewServeMux(),
gatewayMux: gatewayruntime.NewServeMux(
gatewayruntime.WithMarshalerOption("application/x-protobuf", &xProtobufMarshaler{}),
),
}

for _, opt := range opts {
Expand Down
82 changes: 82 additions & 0 deletions receiver/otlpreceiver/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"testing"
"time"

"github.com/golang/protobuf/proto"
collectortrace "github.com/open-telemetry/opentelemetry-proto/gen/go/collector/trace/v1"
otlpcommon "github.com/open-telemetry/opentelemetry-proto/gen/go/common/v1"
otlpresource "github.com/open-telemetry/opentelemetry-proto/gen/go/resource/v1"
Expand All @@ -42,6 +43,7 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/internal/data/testdata"
"go.opentelemetry.io/collector/observability/observabilitytest"
"go.opentelemetry.io/collector/testutils"
"go.opentelemetry.io/collector/translator/conventions"
Expand Down Expand Up @@ -171,6 +173,86 @@ func TestGrpcGateway_endToEnd(t *testing.T) {
assert.EqualValues(t, got, want)
}

func TestProtoHttp(t *testing.T) {
addr := testutils.GetAvailableLocalAddress(t)

// Set the buffer count to 1 to make it flush the test span immediately.
sink := new(exportertest.SinkTraceExporter)
ocr, err := New(otlpReceiver, "tcp", addr, sink, nil)
require.NoError(t, err, "Failed to create trace receiver: %v", err)

require.NoError(t, ocr.Start(context.Background(), componenttest.NewNopHost()), "Failed to start trace receiver: %v", err)
defer ocr.Shutdown(context.Background())

// TODO(nilebox): make starting server deterministic
// Wait for the servers to start
<-time.After(10 * time.Millisecond)

url := fmt.Sprintf("http://%s/v1/trace", addr)

wantOtlp := pdata.TracesToOtlp(testdata.GenerateTraceDataOneSpan())

traceProto := collectortrace.ExportTraceServiceRequest{
ResourceSpans: wantOtlp,
}
traceBytes, err := proto.Marshal(&traceProto)
if err != nil {
t.Errorf("Error marshaling protobuf: %v", err)
}

buf := bytes.NewBuffer(traceBytes)

req, err := http.NewRequest("POST", url, buf)
require.NoError(t, err, "Error creating trace POST request: %v", err)
req.Header.Set("Content-Type", "application/x-protobuf")

client := &http.Client{}
resp, err := client.Do(req)
require.NoError(t, err, "Error posting trace to grpc-gateway server: %v", err)

respBytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Fatalf("Error reading response from trace grpc-gateway, %v", err)
}

err = resp.Body.Close()
if err != nil {
t.Fatalf("Error closing response body, %v", err)
}

if resp.StatusCode != 200 {
t.Errorf("Unexpected status from trace grpc-gateway: %v", resp.StatusCode)
}

if resType := resp.Header.Get("Content-Type"); resType != "application/x-protobuf" {
t.Errorf("response Content-Type got: %s, want: %s", resType, "application/x-protobuf")
}

tmp := collectortrace.ExportTraceServiceResponse{}
err = proto.Unmarshal(respBytes, &tmp)
if err != nil {
t.Errorf("Unable to unmarshal response to ExportTraceServiceResponse proto: %v", err)
}

gotOtlp := pdata.TracesToOtlp(sink.AllTraces()[0])

if len(gotOtlp) != len(wantOtlp) {
t.Fatalf("len(traces):\nGot: %d\nWant: %d\n", len(gotOtlp), len(wantOtlp))
}

got := gotOtlp[0]
want := wantOtlp[0]

// assert.Equal doesn't work on protos, see:
// https://github.com/stretchr/testify/issues/758
if !proto.Equal(got, want) {
t.Errorf("Sending trace proto over http failed\nGot:\n%v\nWant:\n%v\n",
proto.MarshalTextString(got),
proto.MarshalTextString(want))
}

}

func TestTraceGrpcGatewayCors_endToEnd(t *testing.T) {
addr := testutils.GetAvailableLocalAddress(t)
corsOrigins := []string{"allowed-*.com"}
Expand Down
30 changes: 30 additions & 0 deletions receiver/otlpreceiver/otlphttp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package otlpreceiver

import (
"github.com/grpc-ecosystem/grpc-gateway/runtime"
)

// xProtobufMarshaler is a Marshaler which wraps runtime.ProtoMarshaller
// and sets ContentType to application/x-protobuf
type xProtobufMarshaler struct {
*runtime.ProtoMarshaller
}

// ContentType always returns "application/x-protobuf".
func (*xProtobufMarshaler) ContentType() string {
return "application/x-protobuf"
}

0 comments on commit 3d8ac61

Please sign in to comment.