Skip to content

Commit

Permalink
Refactor OTLP receiver tests
Browse files Browse the repository at this point in the history
This change decreases the amount of duplicate code in the OTLP receiver tests.

Context: open-telemetry#982 (comment)
  • Loading branch information
kirbyquerby committed May 23, 2020
1 parent 6fb8ae4 commit ba31901
Showing 1 changed file with 58 additions and 110 deletions.
168 changes: 58 additions & 110 deletions receiver/otlpreceiver/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,9 @@ const otlpReceiver = "otlp_receiver_test"

func TestGrpcGateway_endToEnd(t *testing.T) {
addr := testutils.GetAvailableLocalAddress(t)

// Set the buffer count to 1 to make it flush the test span immediately.
sink := new(exportertest.SinkTraceExporter)
ocr, err := New(otlpReceiver, "tcp", addr, sink, nil)
require.NoError(t, err, "Failed to create trace receiver: %v", err)

require.NoError(t, ocr.Start(context.Background(), componenttest.NewNopHost()), "Failed to start trace receiver: %v", err)
sink, ocr := setupTraceTest(t, "tcp", addr)
defer ocr.Shutdown(context.Background())

// TODO(nilebox): make starting server deterministic
// Wait for the servers to start
<-time.After(10 * time.Millisecond)

url := fmt.Sprintf("http://%s/v1/trace", addr)

// Verify that CORS is not enabled by default, but that it gives an 405
Expand Down Expand Up @@ -107,29 +97,10 @@ func TestGrpcGateway_endToEnd(t *testing.T) {
}
]
}`)
req, err := http.NewRequest("POST", url, bytes.NewBuffer(traceJSON))
require.NoError(t, err, "Error creating trace POST request: %v", err)
req.Header.Set("Content-Type", "application/json")

client := &http.Client{}
resp, err := client.Do(req)
require.NoError(t, err, "Error posting trace to grpc-gateway server: %v", err)

respBytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Errorf("Error reading response from trace grpc-gateway, %v", err)
}
respBytes := makeTraceHTTPRequest(t, url, "application/json", "application/json", 200, traceJSON)
respStr := string(respBytes)

err = resp.Body.Close()
if err != nil {
t.Errorf("Error closing response body, %v", err)
}

if resp.StatusCode != 200 {
t.Errorf("Unexpected status from trace grpc-gateway: %v", resp.StatusCode)
}

if respStr != "{}" {
t.Errorf("Got unexpected response from trace grpc-gateway: %v", respStr)
}
Expand Down Expand Up @@ -175,65 +146,32 @@ func TestGrpcGateway_endToEnd(t *testing.T) {

func TestProtoHttp(t *testing.T) {
addr := testutils.GetAvailableLocalAddress(t)

// Set the buffer count to 1 to make it flush the test span immediately.
sink := new(exportertest.SinkTraceExporter)
ocr, err := New(otlpReceiver, "tcp", addr, sink, nil)
require.NoError(t, err, "Failed to create trace receiver: %v", err)

require.NoError(t, ocr.Start(context.Background(), componenttest.NewNopHost()), "Failed to start trace receiver: %v", err)
sink, ocr := setupTraceTest(t, "tcp", addr)
defer ocr.Shutdown(context.Background())

// TODO(nilebox): make starting server deterministic
// Wait for the servers to start
<-time.After(10 * time.Millisecond)

url := fmt.Sprintf("http://%s/v1/trace", addr)

wantOtlp := pdata.TracesToOtlp(testdata.GenerateTraceDataOneSpan())

traceProto := collectortrace.ExportTraceServiceRequest{
traceBytes, err := proto.Marshal(&collectortrace.ExportTraceServiceRequest{
ResourceSpans: wantOtlp,
}
traceBytes, err := proto.Marshal(&traceProto)
if err != nil {
t.Errorf("Error marshaling protobuf: %v", err)
}

buf := bytes.NewBuffer(traceBytes)

req, err := http.NewRequest("POST", url, buf)
require.NoError(t, err, "Error creating trace POST request: %v", err)
req.Header.Set("Content-Type", "application/x-protobuf")

client := &http.Client{}
resp, err := client.Do(req)
require.NoError(t, err, "Error posting trace to grpc-gateway server: %v", err)

respBytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Fatalf("Error reading response from trace grpc-gateway, %v", err)
}
})

err = resp.Body.Close()
if err != nil {
t.Fatalf("Error closing response body, %v", err)
}

if resp.StatusCode != 200 {
t.Errorf("Unexpected status from trace grpc-gateway: %v", resp.StatusCode)
t.Fatalf("Error marshaling protobuf: %v", err)
}

if resType := resp.Header.Get("Content-Type"); resType != "application/x-protobuf" {
t.Errorf("response Content-Type got: %s, want: %s", resType, "application/x-protobuf")
}
respBytes := makeTraceHTTPRequest(t, url, "application/x-protobuf", "application/x-protobuf", 200, traceBytes)

tmp := collectortrace.ExportTraceServiceResponse{}
err = proto.Unmarshal(respBytes, &tmp)
traceRes := collectortrace.ExportTraceServiceResponse{}
err = proto.Unmarshal(respBytes, &traceRes)
if err != nil {
t.Errorf("Unable to unmarshal response to ExportTraceServiceResponse proto: %v", err)
}

if len(sink.AllTraces()) != 1 {
t.Fatalf("len(sink.AllTraces():\nGot: %d\nWant: %d\n", len(sink.AllTraces()), 1)
}

gotOtlp := pdata.TracesToOtlp(sink.AllTraces()[0])

if len(gotOtlp) != len(wantOtlp) {
Expand All @@ -253,20 +191,55 @@ func TestProtoHttp(t *testing.T) {

}

func TestTraceGrpcGatewayCors_endToEnd(t *testing.T) {
addr := testutils.GetAvailableLocalAddress(t)
corsOrigins := []string{"allowed-*.com"}
func setupTraceTest(t *testing.T, transport, addr string, opts ...Option) (*exportertest.SinkTraceExporter, *Receiver) {

// Set the buffer count to 1 to make it flush the test span immediately.
sink := new(exportertest.SinkTraceExporter)
ocr, err := New(otlpReceiver, "tcp", addr, sink, nil, WithCorsOrigins(corsOrigins))
require.NoError(t, err, "Failed to create trace receiver: %v", err)
defer ocr.Shutdown(context.Background())
ocr, err := New(otlpReceiver, transport, addr, sink, nil, opts...)

require.NoError(t, err, "Failed to create trace receiver: %v", err)
require.NoError(t, ocr.Start(context.Background(), componenttest.NewNopHost()), "Failed to start trace receiver: %v", err)

// TODO(nilebox): make starting server deterministic
// Wait for the servers to start
<-time.After(10 * time.Millisecond)
return sink, ocr
}

func makeTraceHTTPRequest(t *testing.T, url, sendContentType, wantContentType string, wantStatus int, traceBytes []byte) []byte {
req, err := http.NewRequest("POST", url, bytes.NewBuffer(traceBytes))
require.NoError(t, err, "Error creating trace POST request: %v", err)
req.Header.Set("Content-Type", sendContentType)

client := &http.Client{}
resp, err := client.Do(req)
require.NoError(t, err, "Error posting trace to grpc-gateway server: %v", err)

respBytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Fatalf("Error reading response from trace grpc-gateway, %v", err)
}
err = resp.Body.Close()
if err != nil {
t.Fatalf("Error closing response body, %v", err)
}

if resp.StatusCode != wantStatus {
t.Errorf("Unexpected status from trace grpc-gateway: %v", resp.StatusCode)
}

if resType := resp.Header.Get("Content-Type"); resType != wantContentType {
t.Errorf("response Content-Type got: %s, want: %s", resType, "application/x-protobuf")
}

return respBytes
}

func TestTraceGrpcGatewayCors_endToEnd(t *testing.T) {
addr := testutils.GetAvailableLocalAddress(t)
corsOrigins := []string{"allowed-*.com"}
_, ocr := setupTraceTest(t, "tcp", addr, WithCorsOrigins(corsOrigins))
defer ocr.Shutdown(context.Background())

url := fmt.Sprintf("http://%s/v1/trace", addr)

Expand All @@ -280,18 +253,9 @@ func TestTraceGrpcGatewayCors_endToEnd(t *testing.T) {
func TestMetricsGrpcGatewayCors_endToEnd(t *testing.T) {
addr := testutils.GetAvailableLocalAddress(t)
corsOrigins := []string{"allowed-*.com"}

sink := new(exportertest.SinkMetricsExporter)
ocr, err := New(otlpReceiver, "tcp", addr, nil, sink, WithCorsOrigins(corsOrigins))
require.NoError(t, err, "Failed to create metrics receiver: %v", err)
_, ocr := setupTraceTest(t, "tcp", addr, WithCorsOrigins(corsOrigins))
defer ocr.Shutdown(context.Background())

require.NoError(t, ocr.Start(context.Background(), componenttest.NewNopHost()), "Failed to start metrics receiver: %v", err)

// TODO(nilebox): make starting server deterministic
// Wait for the servers to start
<-time.After(10 * time.Millisecond)

url := fmt.Sprintf("http://%s/v1/metrics", addr)

// Verify allowed domain gets responses that allow CORS.
Expand All @@ -306,13 +270,8 @@ func TestMetricsGrpcGatewayCors_endToEnd(t *testing.T) {
// redirect them to the web-grpc-gateway endpoint.
func TestAcceptAllGRPCProtoAffiliatedContentTypes(t *testing.T) {
t.Skip("Currently a flaky test as we need a way to flush all written traces")

addr := testutils.GetAvailableLocalAddress(t)
cbts := new(exportertest.SinkTraceExporter)
ocr, err := New(otlpReceiver, "tcp", addr, cbts, nil)
require.NoError(t, err, "Failed to create trace receiver: %v", err)

require.NoError(t, ocr.Start(context.Background(), componenttest.NewNopHost()), "Failed to start the trace receiver: %v", err)
cbts, ocr := setupTraceTest(t, "tcp", addr)
defer ocr.Shutdown(context.Background())

// Now start the client with the various Proto affiliated gRPC Content-SubTypes as per:
Expand Down Expand Up @@ -485,11 +444,7 @@ func tempSocketName(t *testing.T) string {

func TestReceiveOnUnixDomainSocket_endToEnd(t *testing.T) {
socketName := tempSocketName(t)
cbts := new(exportertest.SinkTraceExporter)
r, err := New(otlpReceiver, "unix", socketName, cbts, nil)
require.NoError(t, err)
require.NotNil(t, r)
require.NoError(t, r.Start(context.Background(), componenttest.NewNopHost()))
_, r := setupTraceTest(t, "unix", socketName)
defer r.Shutdown(context.Background())

// Wait for the servers to start
Expand Down Expand Up @@ -631,15 +586,8 @@ func TestOTLPReceiverTrace_HandleNextConsumerResponse(t *testing.T) {
doneFn := observabilitytest.SetupRecordedMetricsTest()
defer doneFn()

sink := new(exportertest.SinkTraceExporter)

var opts []Option
ocr, err := New(otlpReceiver, "tcp", addr, nil, nil, opts...)
require.Nil(t, err)
require.NotNil(t, ocr)

ocr.traceConsumer = sink
require.Nil(t, ocr.Start(context.Background(), componenttest.NewNopHost()))
sink, ocr := setupTraceTest(t, "tcp", addr, opts...)
defer ocr.Shutdown(context.Background())

cc, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithBlock())
Expand Down

0 comments on commit ba31901

Please sign in to comment.