Skip to content

Commit

Permalink
add http testing (#143)
Browse files Browse the repository at this point in the history
Refactors otlpserver to an interface and adds a simple HTTP server that can receive and translate traces.

This is step 1 to do #142 correctly, as I wish to test the heck out of endpoint and protocol combinations in the next couple PRs.

* rename NewServer to NewGrpcServer

* rename gRPC server functions & variables

* rename server.go -> grpcserver.go

* fix double import of trace proto

* refactor grpc server into an interface and implementation

Broke the surface otel-cli and main_test care about into an interface.

Ported grpcserver and referents to the interface.

Added skeleton http server.

Tests pass :)

* refactor code further, adding protocol to fixtures

Added a simple http handler for testing. Not ready yet.

Added plumbing so http protocol can be requested, but it's not used yet.

Correct some types.

* create a failing http test

* get http traces working

Broke the code that converts protobuf data to CliEvents out into
its own function and moved it to clievent.go.

Converted the gRPC server to use that.

Added it to the HTTP server and it just worked the first time.

Since proto is now imported directly, it go.mod changed to reflect that.

* add a way to test http path, proto, etc.

Added ServerMeta to the CliEvent struct as a string map that
httpserver now plugs a bunch of metadata into.

Added that to the test harness.

Added metadata to the test so now path & proto are verified.

* add proto field go grpc server_meta

* fix test

* fix comment

* add godoc to otelToCliEvent()
  • Loading branch information
tobert authored Jan 27, 2023
1 parent b51d6fc commit b164427
Show file tree
Hide file tree
Showing 9 changed files with 313 additions and 125 deletions.
47 changes: 42 additions & 5 deletions data_for_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@ import (
"github.com/equinix-labs/otel-cli/otelcli"
)

type serverProtocol int

const (
grpcProtocol serverProtocol = iota
httpProtocol
)

type FixtureConfig struct {
CliArgs []string
Env map[string]string
Expand All @@ -19,6 +26,8 @@ type FixtureConfig struct {
// when true this test will be excluded under go -test.short mode
// TODO: maybe move this up to the suite?
IsLongTest bool
// either grpcProtocol or httpProtocol, defaults to grpc
ServerProtocol serverProtocol
// for timeout tests we need to start the server to generate the endpoint
// but do not want it to answer when otel-cli calls, this does that
StopServerBeforeExec bool
Expand Down Expand Up @@ -77,19 +86,47 @@ var suites = []FixtureSuite{
// setting minimum envvars should result in a span being received
{
{
Name: "minimum configuration (recording)",
Name: "minimum configuration (recording, grpc)",
Config: FixtureConfig{
CliArgs: []string{"status", "--endpoint", "{{endpoint}}"},
TestTimeoutMs: 1000,
ServerProtocol: grpcProtocol,
CliArgs: []string{"status", "--endpoint", "{{endpoint}}"},
TestTimeoutMs: 1000,
},
Expect: Results{
// otel-cli should NOT set insecure when it auto-detects localhost
Config: otelcli.DefaultConfig().
WithEndpoint("{{endpoint}}").
WithInsecure(false),
SpanData: map[string]string{
"span_id": "*",
"trace_id": "*",
"span_id": "*",
"trace_id": "*",
"server_meta": "proto=grpc",
},
Diagnostics: otelcli.Diagnostics{
IsRecording: true,
NumArgs: 3,
DetectedLocalhost: true,
ParsedTimeoutMs: 1000,
OtelError: "",
},
Spans: 1,
},
}, {
Name: "minimum configuration (recording, http)",
Config: FixtureConfig{
ServerProtocol: httpProtocol,
CliArgs: []string{"status", "--endpoint", "http://{{endpoint}}"},
TestTimeoutMs: 1000,
},
Expect: Results{
// otel-cli should NOT set insecure when it auto-detects localhost
Config: otelcli.DefaultConfig().
WithEndpoint("http://{{endpoint}}").
WithInsecure(false),
SpanData: map[string]string{
"span_id": "*",
"trace_id": "*",
"server_meta": "host={{endpoint}},method=POST,proto=HTTP/1.1,uri=/v1/traces",
},
Diagnostics: otelcli.Diagnostics{
IsRecording: true,
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ require (
go.opentelemetry.io/otel/trace v1.11.2
go.opentelemetry.io/proto/otlp v0.19.0
google.golang.org/grpc v1.52.0
google.golang.org/protobuf v1.28.1
)

require (
Expand All @@ -39,5 +40,4 @@ require (
golang.org/x/term v0.3.0 // indirect
golang.org/x/text v0.6.0 // indirect
google.golang.org/genproto v0.0.0-20221118155620-16455021b5e6 // indirect
google.golang.org/protobuf v1.28.1 // indirect
)
10 changes: 8 additions & 2 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,13 @@ func runOtelCli(t *testing.T, fixture Fixture) (string, Results, otlpserver.CliE
return results.Spans >= fixture.Expect.Spans
}

cs := otlpserver.NewServer(cb, func(*otlpserver.Server) {})
var cs otlpserver.OtlpServer
switch fixture.Config.ServerProtocol {
case grpcProtocol:
cs = otlpserver.NewServer("grpc", cb, func(otlpserver.OtlpServer) {})
case httpProtocol:
cs = otlpserver.NewServer("http", cb, func(otlpserver.OtlpServer) {})
}
defer cs.Stop()

serverTimeout := time.Duration(fixture.Config.TestTimeoutMs) * time.Millisecond
Expand Down Expand Up @@ -349,7 +355,7 @@ func runOtelCli(t *testing.T, fixture Fixture) (string, Results, otlpserver.CliE
// TODO: might be neat to have a mode where we start the listener and do nothing
// with it to simulate a hung server or opentelemetry-collector
go func() {
cs.ServeGPRC(listener)
cs.Serve(listener)
}()

// let things go this far to generate the endpoint port then stop the server before
Expand Down
6 changes: 3 additions & 3 deletions otelcli/server_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ func init() {
}

func doServerJson(cmd *cobra.Command, args []string) {
stop := func(*otlpserver.Server) {}
cs := otlpserver.NewServer(renderJson, stop)
stop := func(otlpserver.OtlpServer) {}
cs := otlpserver.NewGrpcServer(renderJson, stop)

// stops the grpc server after timeout
timeout := parseCliTimeout()
Expand All @@ -52,7 +52,7 @@ func doServerJson(cmd *cobra.Command, args []string) {
if config.Endpoint == "" {
config.Endpoint = defaultOtlpEndpoint
}
cs.ListenAndServeGPRC(config.Endpoint)
cs.ListenAndServe(config.Endpoint)
}

// writeFile takes the spans and events and writes them out to json files in the
Expand Down
6 changes: 3 additions & 3 deletions otelcli/server_tui.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,17 @@ func doServerTui(cmd *cobra.Command, args []string) {

tuiServer.events = []otlpserver.CliEvent{}

stop := func(*otlpserver.Server) {
stop := func(otlpserver.OtlpServer) {
tuiServer.area.Stop()
}

cs := otlpserver.NewServer(renderTui, stop)
cs := otlpserver.NewGrpcServer(renderTui, stop)

// unlike the rest of otel-cli, server should default to localhost:4317
if config.Endpoint == "" {
config.Endpoint = defaultOtlpEndpoint
}
cs.ListenAndServeGPRC(config.Endpoint)
cs.ListenAndServe(config.Endpoint)
}

// renderTui takes the given span and events, appends them to the in-memory
Expand Down
59 changes: 51 additions & 8 deletions otlpserver/clievent.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"strings"
"time"

tracepb "go.opentelemetry.io/proto/otlp/trace/v1"
colv1 "go.opentelemetry.io/proto/otlp/collector/trace/v1"
v1 "go.opentelemetry.io/proto/otlp/trace/v1"
)

Expand All @@ -31,6 +31,9 @@ type CliEvent struct {
// the methods below will set this to true before returning
// to make it easy for consumers to tell if they got a zero value
IsPopulated bool `json:"has_been_modified"`
// somewhere for the server to put interesting facts about a span
// like what HTTP path it arrived on, what http method, etc.
ServerMeta map[string]string `json:"server_meta"`
}

// ToStringMap flattens a CliEvent into a string map for testing.
Expand Down Expand Up @@ -60,6 +63,7 @@ func (ce CliEvent) ToStringMap() map[string]string {
"attributes": mapToKVString(ce.Attributes),
"service_attributes": mapToKVString(ce.ServiceAttributes),
"is_populated": strconv.FormatBool(ce.IsPopulated),
"server_meta": mapToKVString(ce.ServerMeta),
}
}

Expand All @@ -71,7 +75,7 @@ func (cel CliEventList) Swap(i, j int) { cel[i], cel[j] = cel[j], cel[i] }
func (cel CliEventList) Less(i, j int) bool { return cel[i].Nanos < cel[j].Nanos }

// NewCliEventFromSpan converts a raw grpc span into a CliEvent.
func NewCliEventFromSpan(span *tracepb.Span, scopeSpans *tracepb.ScopeSpans, rss *v1.ResourceSpans) CliEvent {
func NewCliEventFromSpan(span *v1.Span, scopeSpans *v1.ScopeSpans, rss *v1.ResourceSpans, serverMeta map[string]string) CliEvent {
e := CliEvent{
TraceID: hex.EncodeToString(span.GetTraceId()),
SpanID: hex.EncodeToString(span.GetSpanId()),
Expand All @@ -85,6 +89,7 @@ func NewCliEventFromSpan(span *tracepb.Span, scopeSpans *tracepb.ScopeSpans, rss
ServiceAttributes: make(map[string]string),
Nanos: span.GetStartTimeUnixNano(),
IsPopulated: true,
ServerMeta: make(map[string]string),
}

// copy service attributes over by string, which includes service.name
Expand All @@ -93,15 +98,15 @@ func NewCliEventFromSpan(span *tracepb.Span, scopeSpans *tracepb.ScopeSpans, rss
}

switch span.GetKind() {
case tracepb.Span_SPAN_KIND_CLIENT:
case v1.Span_SPAN_KIND_CLIENT:
e.Kind = "client"
case tracepb.Span_SPAN_KIND_SERVER:
case v1.Span_SPAN_KIND_SERVER:
e.Kind = "server"
case tracepb.Span_SPAN_KIND_PRODUCER:
case v1.Span_SPAN_KIND_PRODUCER:
e.Kind = "producer"
case tracepb.Span_SPAN_KIND_CONSUMER:
case v1.Span_SPAN_KIND_CONSUMER:
e.Kind = "consumer"
case tracepb.Span_SPAN_KIND_INTERNAL:
case v1.Span_SPAN_KIND_INTERNAL:
e.Kind = "internal"
default:
e.Kind = "unspecified"
Expand All @@ -118,12 +123,17 @@ func NewCliEventFromSpan(span *tracepb.Span, scopeSpans *tracepb.ScopeSpans, rss
e.Attributes[attr.GetKey()] = val
}

// explicitly copy the map, do not share the ref
for k, v := range serverMeta {
e.ServerMeta[k] = v
}

return e
}

// NewCliEventFromSpanEvent takes a span event, span, and ils and returns an event
// with all the span event info filled in.
func NewCliEventFromSpanEvent(se *tracepb.Span_Event, span *tracepb.Span, scopeSpans *tracepb.ScopeSpans) CliEvent {
func NewCliEventFromSpanEvent(se *v1.Span_Event, span *v1.Span, scopeSpans *v1.ScopeSpans, serverMeta map[string]string) CliEvent {
// start with the span, rewrite it for the event
e := CliEvent{
TraceID: hex.EncodeToString(span.GetTraceId()),
Expand All @@ -138,15 +148,48 @@ func NewCliEventFromSpanEvent(se *tracepb.Span_Event, span *tracepb.Span, scopeS
Attributes: make(map[string]string), // overwrite the one from the span
Nanos: se.GetTimeUnixNano(),
IsPopulated: true,
ServerMeta: make(map[string]string),
}

for _, attr := range se.GetAttributes() {
e.Attributes[attr.GetKey()] = attr.Value.String()
}

// explicitly copy the map, do not share the ref
for k, v := range serverMeta {
e.ServerMeta[k] = v
}

return e
}

// otelToCliEvent takes an otel trace request data structure and converts
// it to CliEvents, calling the provided callback for each span in the
// request.
func otelToCliEvent(cb Callback, req *colv1.ExportTraceServiceRequest, serverMeta map[string]string) bool {
rss := req.GetResourceSpans()
for _, resource := range rss {
scopeSpans := resource.GetScopeSpans()
for _, ss := range scopeSpans {
for _, span := range ss.GetSpans() {
// convert protobuf spans to something easier for humans to consume
ces := NewCliEventFromSpan(span, ss, resource, serverMeta)
events := CliEventList{}
for _, se := range span.GetEvents() {
events = append(events, NewCliEventFromSpanEvent(se, span, ss, serverMeta))
}

done := cb(ces, events)
if done {
return true
}
}
}
}

return false
}

// mapToKVString flattens attribute string maps into "k=v,k=v" strings.
func mapToKVString(in map[string]string) string {
keys := make([]string, len(in)) // for sorting
Expand Down
92 changes: 92 additions & 0 deletions otlpserver/grpcserver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package otlpserver

import (
"context"
"log"
"net"
"strings"
"sync"

v1 "go.opentelemetry.io/proto/otlp/collector/trace/v1"

"google.golang.org/grpc"
)

// GrpcServer is a gRPC/OTLP server handle.
type GrpcServer struct {
server *grpc.Server
callback Callback
stoponce sync.Once
stopper chan struct{}
stopdone chan struct{}
doneonce sync.Once
v1.UnimplementedTraceServiceServer
}

// NewGrpcServer takes a callback and stop function and returns a Server ready
// to run with .Serve().
func NewGrpcServer(cb Callback, stop Stopper) *GrpcServer {
s := GrpcServer{
server: grpc.NewServer(),
callback: cb,
stopper: make(chan struct{}),
stopdone: make(chan struct{}, 1),
}

v1.RegisterTraceServiceServer(s.server, &s)

// single place to stop the server, used by timeout and max-spans
go func() {
<-s.stopper
stop(&s)
s.server.GracefulStop()
}()

return &s
}

// ServeGRPC takes a listener and starts the GRPC server on that listener.
// Blocks until Stop() is called.
func (gs *GrpcServer) Serve(listener net.Listener) error {
err := gs.server.Serve(listener)
gs.stopdone <- struct{}{}
return err
}

// ListenAndServeGRPC starts a TCP listener then starts the GRPC server using
// ServeGRPC for you.
func (gs *GrpcServer) ListenAndServe(otlpEndpoint string) {
otlpEndpoint = strings.TrimPrefix(otlpEndpoint, "grpc://")
listener, err := net.Listen("tcp", otlpEndpoint)
if err != nil {
log.Fatalf("failed to listen on OTLP endpoint %q: %s", otlpEndpoint, err)
}
if err := gs.Serve(listener); err != nil {
log.Fatalf("failed to serve: %s", err)
}
}

// Stop sends a value to the server shutdown goroutine so it stops GRPC
// and calls the stop function given to newServer. Safe to call multiple times.
func (gs *GrpcServer) Stop() {
gs.stoponce.Do(func() {
gs.stopper <- struct{}{}
})
}

// StopWait stops the server and waits for it to affirm shutdown.
func (gs *GrpcServer) StopWait() {
gs.Stop()
gs.doneonce.Do(func() {
<-gs.stopdone
})
}

// Export implements the gRPC server interface for exporting messages.
func (gs *GrpcServer) Export(ctx context.Context, req *v1.ExportTraceServiceRequest) (*v1.ExportTraceServiceResponse, error) {
done := otelToCliEvent(gs.callback, req, map[string]string{"proto": "grpc"})
if done {
go gs.StopWait()
}
return &v1.ExportTraceServiceResponse{}, nil
}
Loading

0 comments on commit b164427

Please sign in to comment.