Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add http testing #143

Merged
merged 13 commits into from
Jan 27, 2023
47 changes: 42 additions & 5 deletions data_for_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@ import (
"github.com/equinix-labs/otel-cli/otelcli"
)

type serverProtocol int

const (
grpcProtocol serverProtocol = iota
httpProtocol
)

type FixtureConfig struct {
CliArgs []string
Env map[string]string
Expand All @@ -19,6 +26,8 @@ type FixtureConfig struct {
// when true this test will be excluded under go -test.short mode
// TODO: maybe move this up to the suite?
IsLongTest bool
// either grpcProtocol or httpProtocol, defaults to grpc
ServerProtocol serverProtocol
// for timeout tests we need to start the server to generate the endpoint
// but do not want it to answer when otel-cli calls, this does that
StopServerBeforeExec bool
Expand Down Expand Up @@ -77,19 +86,47 @@ var suites = []FixtureSuite{
// setting minimum envvars should result in a span being received
{
{
Name: "minimum configuration (recording)",
Name: "minimum configuration (recording, grpc)",
Config: FixtureConfig{
CliArgs: []string{"status", "--endpoint", "{{endpoint}}"},
TestTimeoutMs: 1000,
ServerProtocol: grpcProtocol,
CliArgs: []string{"status", "--endpoint", "{{endpoint}}"},
TestTimeoutMs: 1000,
},
Expect: Results{
// otel-cli should NOT set insecure when it auto-detects localhost
Config: otelcli.DefaultConfig().
WithEndpoint("{{endpoint}}").
WithInsecure(false),
SpanData: map[string]string{
"span_id": "*",
"trace_id": "*",
"span_id": "*",
"trace_id": "*",
"server_meta": "proto=grpc",
},
Diagnostics: otelcli.Diagnostics{
IsRecording: true,
NumArgs: 3,
DetectedLocalhost: true,
ParsedTimeoutMs: 1000,
OtelError: "",
},
Spans: 1,
},
}, {
Name: "minimum configuration (recording, http)",
Config: FixtureConfig{
ServerProtocol: httpProtocol,
CliArgs: []string{"status", "--endpoint", "http://{{endpoint}}"},
TestTimeoutMs: 1000,
},
Expect: Results{
// otel-cli should NOT set insecure when it auto-detects localhost
Config: otelcli.DefaultConfig().
WithEndpoint("http://{{endpoint}}").
WithInsecure(false),
SpanData: map[string]string{
"span_id": "*",
"trace_id": "*",
"server_meta": "host={{endpoint}},method=POST,proto=HTTP/1.1,uri=/v1/traces",
},
Diagnostics: otelcli.Diagnostics{
IsRecording: true,
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ require (
go.opentelemetry.io/otel/trace v1.11.2
go.opentelemetry.io/proto/otlp v0.19.0
google.golang.org/grpc v1.52.0
google.golang.org/protobuf v1.28.1
)

require (
Expand All @@ -39,5 +40,4 @@ require (
golang.org/x/term v0.3.0 // indirect
golang.org/x/text v0.6.0 // indirect
google.golang.org/genproto v0.0.0-20221118155620-16455021b5e6 // indirect
google.golang.org/protobuf v1.28.1 // indirect
)
10 changes: 8 additions & 2 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,13 @@ func runOtelCli(t *testing.T, fixture Fixture) (string, Results, otlpserver.CliE
return results.Spans >= fixture.Expect.Spans
}

cs := otlpserver.NewServer(cb, func(*otlpserver.Server) {})
var cs otlpserver.OtlpServer
switch fixture.Config.ServerProtocol {
case grpcProtocol:
cs = otlpserver.NewServer("grpc", cb, func(otlpserver.OtlpServer) {})
case httpProtocol:
cs = otlpserver.NewServer("http", cb, func(otlpserver.OtlpServer) {})
}
defer cs.Stop()

serverTimeout := time.Duration(fixture.Config.TestTimeoutMs) * time.Millisecond
Expand Down Expand Up @@ -349,7 +355,7 @@ func runOtelCli(t *testing.T, fixture Fixture) (string, Results, otlpserver.CliE
// TODO: might be neat to have a mode where we start the listener and do nothing
// with it to simulate a hung server or opentelemetry-collector
go func() {
cs.ServeGPRC(listener)
cs.Serve(listener)
}()

// let things go this far to generate the endpoint port then stop the server before
Expand Down
6 changes: 3 additions & 3 deletions otelcli/server_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ func init() {
}

func doServerJson(cmd *cobra.Command, args []string) {
stop := func(*otlpserver.Server) {}
cs := otlpserver.NewServer(renderJson, stop)
stop := func(otlpserver.OtlpServer) {}
cs := otlpserver.NewGrpcServer(renderJson, stop)

// stops the grpc server after timeout
timeout := parseCliTimeout()
Expand All @@ -52,7 +52,7 @@ func doServerJson(cmd *cobra.Command, args []string) {
if config.Endpoint == "" {
config.Endpoint = defaultOtlpEndpoint
}
cs.ListenAndServeGPRC(config.Endpoint)
cs.ListenAndServe(config.Endpoint)
}

// writeFile takes the spans and events and writes them out to json files in the
Expand Down
6 changes: 3 additions & 3 deletions otelcli/server_tui.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,17 @@ func doServerTui(cmd *cobra.Command, args []string) {

tuiServer.events = []otlpserver.CliEvent{}

stop := func(*otlpserver.Server) {
stop := func(otlpserver.OtlpServer) {
tuiServer.area.Stop()
}

cs := otlpserver.NewServer(renderTui, stop)
cs := otlpserver.NewGrpcServer(renderTui, stop)

// unlike the rest of otel-cli, server should default to localhost:4317
if config.Endpoint == "" {
config.Endpoint = defaultOtlpEndpoint
}
cs.ListenAndServeGPRC(config.Endpoint)
cs.ListenAndServe(config.Endpoint)
}

// renderTui takes the given span and events, appends them to the in-memory
Expand Down
59 changes: 51 additions & 8 deletions otlpserver/clievent.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"strings"
"time"

tracepb "go.opentelemetry.io/proto/otlp/trace/v1"
colv1 "go.opentelemetry.io/proto/otlp/collector/trace/v1"
v1 "go.opentelemetry.io/proto/otlp/trace/v1"
)

Expand All @@ -31,6 +31,9 @@ type CliEvent struct {
// the methods below will set this to true before returning
// to make it easy for consumers to tell if they got a zero value
IsPopulated bool `json:"has_been_modified"`
// somewhere for the server to put interesting facts about a span
// like what HTTP path it arrived on, what http method, etc.
ServerMeta map[string]string `json:"server_meta"`
}

// ToStringMap flattens a CliEvent into a string map for testing.
Expand Down Expand Up @@ -60,6 +63,7 @@ func (ce CliEvent) ToStringMap() map[string]string {
"attributes": mapToKVString(ce.Attributes),
"service_attributes": mapToKVString(ce.ServiceAttributes),
"is_populated": strconv.FormatBool(ce.IsPopulated),
"server_meta": mapToKVString(ce.ServerMeta),
}
}

Expand All @@ -71,7 +75,7 @@ func (cel CliEventList) Swap(i, j int) { cel[i], cel[j] = cel[j], cel[i] }
func (cel CliEventList) Less(i, j int) bool { return cel[i].Nanos < cel[j].Nanos }

// NewCliEventFromSpan converts a raw grpc span into a CliEvent.
func NewCliEventFromSpan(span *tracepb.Span, scopeSpans *tracepb.ScopeSpans, rss *v1.ResourceSpans) CliEvent {
func NewCliEventFromSpan(span *v1.Span, scopeSpans *v1.ScopeSpans, rss *v1.ResourceSpans, serverMeta map[string]string) CliEvent {
e := CliEvent{
TraceID: hex.EncodeToString(span.GetTraceId()),
SpanID: hex.EncodeToString(span.GetSpanId()),
Expand All @@ -85,6 +89,7 @@ func NewCliEventFromSpan(span *tracepb.Span, scopeSpans *tracepb.ScopeSpans, rss
ServiceAttributes: make(map[string]string),
Nanos: span.GetStartTimeUnixNano(),
IsPopulated: true,
ServerMeta: make(map[string]string),
}

// copy service attributes over by string, which includes service.name
Expand All @@ -93,15 +98,15 @@ func NewCliEventFromSpan(span *tracepb.Span, scopeSpans *tracepb.ScopeSpans, rss
}

switch span.GetKind() {
case tracepb.Span_SPAN_KIND_CLIENT:
case v1.Span_SPAN_KIND_CLIENT:
e.Kind = "client"
case tracepb.Span_SPAN_KIND_SERVER:
case v1.Span_SPAN_KIND_SERVER:
e.Kind = "server"
case tracepb.Span_SPAN_KIND_PRODUCER:
case v1.Span_SPAN_KIND_PRODUCER:
e.Kind = "producer"
case tracepb.Span_SPAN_KIND_CONSUMER:
case v1.Span_SPAN_KIND_CONSUMER:
e.Kind = "consumer"
case tracepb.Span_SPAN_KIND_INTERNAL:
case v1.Span_SPAN_KIND_INTERNAL:
e.Kind = "internal"
default:
e.Kind = "unspecified"
Expand All @@ -118,12 +123,17 @@ func NewCliEventFromSpan(span *tracepb.Span, scopeSpans *tracepb.ScopeSpans, rss
e.Attributes[attr.GetKey()] = val
}

// explicitly copy the map, do not share the ref
for k, v := range serverMeta {
e.ServerMeta[k] = v
}

return e
}

// NewCliEventFromSpanEvent takes a span event, span, and ils and returns an event
// with all the span event info filled in.
func NewCliEventFromSpanEvent(se *tracepb.Span_Event, span *tracepb.Span, scopeSpans *tracepb.ScopeSpans) CliEvent {
func NewCliEventFromSpanEvent(se *v1.Span_Event, span *v1.Span, scopeSpans *v1.ScopeSpans, serverMeta map[string]string) CliEvent {
// start with the span, rewrite it for the event
e := CliEvent{
TraceID: hex.EncodeToString(span.GetTraceId()),
Expand All @@ -138,15 +148,48 @@ func NewCliEventFromSpanEvent(se *tracepb.Span_Event, span *tracepb.Span, scopeS
Attributes: make(map[string]string), // overwrite the one from the span
Nanos: se.GetTimeUnixNano(),
IsPopulated: true,
ServerMeta: make(map[string]string),
}

for _, attr := range se.GetAttributes() {
e.Attributes[attr.GetKey()] = attr.Value.String()
}

// explicitly copy the map, do not share the ref
for k, v := range serverMeta {
e.ServerMeta[k] = v
}

return e
}

// otelToCliEvent takes an otel trace request data structure and converts
// it to CliEvents, calling the provided callback for each span in the
// request.
func otelToCliEvent(cb Callback, req *colv1.ExportTraceServiceRequest, serverMeta map[string]string) bool {
rss := req.GetResourceSpans()
for _, resource := range rss {
scopeSpans := resource.GetScopeSpans()
for _, ss := range scopeSpans {
for _, span := range ss.GetSpans() {
// convert protobuf spans to something easier for humans to consume
ces := NewCliEventFromSpan(span, ss, resource, serverMeta)
events := CliEventList{}
for _, se := range span.GetEvents() {
events = append(events, NewCliEventFromSpanEvent(se, span, ss, serverMeta))
}

done := cb(ces, events)
if done {
return true
}
}
}
}

return false
}

// mapToKVString flattens attribute string maps into "k=v,k=v" strings.
func mapToKVString(in map[string]string) string {
keys := make([]string, len(in)) // for sorting
Expand Down
92 changes: 92 additions & 0 deletions otlpserver/grpcserver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package otlpserver

import (
"context"
"log"
"net"
"strings"
"sync"

v1 "go.opentelemetry.io/proto/otlp/collector/trace/v1"

"google.golang.org/grpc"
)

// GrpcServer is a gRPC/OTLP server handle.
type GrpcServer struct {
server *grpc.Server
callback Callback
stoponce sync.Once
stopper chan struct{}
stopdone chan struct{}
doneonce sync.Once
v1.UnimplementedTraceServiceServer
}

// NewGrpcServer takes a callback and stop function and returns a Server ready
// to run with .Serve().
func NewGrpcServer(cb Callback, stop Stopper) *GrpcServer {
s := GrpcServer{
server: grpc.NewServer(),
callback: cb,
stopper: make(chan struct{}),
stopdone: make(chan struct{}, 1),
}

v1.RegisterTraceServiceServer(s.server, &s)

// single place to stop the server, used by timeout and max-spans
go func() {
<-s.stopper
stop(&s)
s.server.GracefulStop()
}()

return &s
}

// ServeGRPC takes a listener and starts the GRPC server on that listener.
// Blocks until Stop() is called.
func (gs *GrpcServer) Serve(listener net.Listener) error {
err := gs.server.Serve(listener)
gs.stopdone <- struct{}{}
return err
}

// ListenAndServeGRPC starts a TCP listener then starts the GRPC server using
// ServeGRPC for you.
func (gs *GrpcServer) ListenAndServe(otlpEndpoint string) {
otlpEndpoint = strings.TrimPrefix(otlpEndpoint, "grpc://")
listener, err := net.Listen("tcp", otlpEndpoint)
if err != nil {
log.Fatalf("failed to listen on OTLP endpoint %q: %s", otlpEndpoint, err)
}
if err := gs.Serve(listener); err != nil {
log.Fatalf("failed to serve: %s", err)
}
}

// Stop sends a value to the server shutdown goroutine so it stops GRPC
// and calls the stop function given to newServer. Safe to call multiple times.
func (gs *GrpcServer) Stop() {
gs.stoponce.Do(func() {
gs.stopper <- struct{}{}
})
}

// StopWait stops the server and waits for it to affirm shutdown.
func (gs *GrpcServer) StopWait() {
gs.Stop()
gs.doneonce.Do(func() {
<-gs.stopdone
})
}

// Export implements the gRPC server interface for exporting messages.
func (gs *GrpcServer) Export(ctx context.Context, req *v1.ExportTraceServiceRequest) (*v1.ExportTraceServiceResponse, error) {
done := otelToCliEvent(gs.callback, req, map[string]string{"proto": "grpc"})
if done {
go gs.StopWait()
}
return &v1.ExportTraceServiceResponse{}, nil
}
Loading