diff --git a/data_for_test.go b/data_for_test.go index 0fb14f6..c31af44 100644 --- a/data_for_test.go +++ b/data_for_test.go @@ -11,6 +11,13 @@ import ( "github.com/equinix-labs/otel-cli/otelcli" ) +type serverProtocol int + +const ( + grpcProtocol serverProtocol = iota + httpProtocol +) + type FixtureConfig struct { CliArgs []string Env map[string]string @@ -19,6 +26,8 @@ type FixtureConfig struct { // when true this test will be excluded under go -test.short mode // TODO: maybe move this up to the suite? IsLongTest bool + // either grpcProtocol or httpProtocol, defaults to grpc + ServerProtocol serverProtocol // for timeout tests we need to start the server to generate the endpoint // but do not want it to answer when otel-cli calls, this does that StopServerBeforeExec bool @@ -77,10 +86,11 @@ var suites = []FixtureSuite{ // setting minimum envvars should result in a span being received { { - Name: "minimum configuration (recording)", + Name: "minimum configuration (recording, grpc)", Config: FixtureConfig{ - CliArgs: []string{"status", "--endpoint", "{{endpoint}}"}, - TestTimeoutMs: 1000, + ServerProtocol: grpcProtocol, + CliArgs: []string{"status", "--endpoint", "{{endpoint}}"}, + TestTimeoutMs: 1000, }, Expect: Results{ // otel-cli should NOT set insecure when it auto-detects localhost @@ -88,8 +98,35 @@ var suites = []FixtureSuite{ WithEndpoint("{{endpoint}}"). WithInsecure(false), SpanData: map[string]string{ - "span_id": "*", - "trace_id": "*", + "span_id": "*", + "trace_id": "*", + "server_meta": "proto=grpc", + }, + Diagnostics: otelcli.Diagnostics{ + IsRecording: true, + NumArgs: 3, + DetectedLocalhost: true, + ParsedTimeoutMs: 1000, + OtelError: "", + }, + Spans: 1, + }, + }, { + Name: "minimum configuration (recording, http)", + Config: FixtureConfig{ + ServerProtocol: httpProtocol, + CliArgs: []string{"status", "--endpoint", "http://{{endpoint}}"}, + TestTimeoutMs: 1000, + }, + Expect: Results{ + // otel-cli should NOT set insecure when it auto-detects localhost + Config: otelcli.DefaultConfig(). + WithEndpoint("http://{{endpoint}}"). + WithInsecure(false), + SpanData: map[string]string{ + "span_id": "*", + "trace_id": "*", + "server_meta": "host={{endpoint}},method=POST,proto=HTTP/1.1,uri=/v1/traces", }, Diagnostics: otelcli.Diagnostics{ IsRecording: true, diff --git a/go.mod b/go.mod index cf7f526..3ba59d2 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( go.opentelemetry.io/otel/trace v1.11.2 go.opentelemetry.io/proto/otlp v0.19.0 google.golang.org/grpc v1.52.0 + google.golang.org/protobuf v1.28.1 ) require ( @@ -39,5 +40,4 @@ require ( golang.org/x/term v0.3.0 // indirect golang.org/x/text v0.6.0 // indirect google.golang.org/genproto v0.0.0-20221118155620-16455021b5e6 // indirect - google.golang.org/protobuf v1.28.1 // indirect ) diff --git a/main_test.go b/main_test.go index 4f451bd..518df8e 100644 --- a/main_test.go +++ b/main_test.go @@ -316,7 +316,13 @@ func runOtelCli(t *testing.T, fixture Fixture) (string, Results, otlpserver.CliE return results.Spans >= fixture.Expect.Spans } - cs := otlpserver.NewServer(cb, func(*otlpserver.Server) {}) + var cs otlpserver.OtlpServer + switch fixture.Config.ServerProtocol { + case grpcProtocol: + cs = otlpserver.NewServer("grpc", cb, func(otlpserver.OtlpServer) {}) + case httpProtocol: + cs = otlpserver.NewServer("http", cb, func(otlpserver.OtlpServer) {}) + } defer cs.Stop() serverTimeout := time.Duration(fixture.Config.TestTimeoutMs) * time.Millisecond @@ -349,7 +355,7 @@ func runOtelCli(t *testing.T, fixture Fixture) (string, Results, otlpserver.CliE // TODO: might be neat to have a mode where we start the listener and do nothing // with it to simulate a hung server or opentelemetry-collector go func() { - cs.ServeGPRC(listener) + cs.Serve(listener) }() // let things go this far to generate the endpoint port then stop the server before diff --git a/otelcli/server_json.go b/otelcli/server_json.go index e2793c4..30bea23 100644 --- a/otelcli/server_json.go +++ b/otelcli/server_json.go @@ -36,8 +36,8 @@ func init() { } func doServerJson(cmd *cobra.Command, args []string) { - stop := func(*otlpserver.Server) {} - cs := otlpserver.NewServer(renderJson, stop) + stop := func(otlpserver.OtlpServer) {} + cs := otlpserver.NewGrpcServer(renderJson, stop) // stops the grpc server after timeout timeout := parseCliTimeout() @@ -52,7 +52,7 @@ func doServerJson(cmd *cobra.Command, args []string) { if config.Endpoint == "" { config.Endpoint = defaultOtlpEndpoint } - cs.ListenAndServeGPRC(config.Endpoint) + cs.ListenAndServe(config.Endpoint) } // writeFile takes the spans and events and writes them out to json files in the diff --git a/otelcli/server_tui.go b/otelcli/server_tui.go index 17bc35c..7b8d22d 100644 --- a/otelcli/server_tui.go +++ b/otelcli/server_tui.go @@ -40,17 +40,17 @@ func doServerTui(cmd *cobra.Command, args []string) { tuiServer.events = []otlpserver.CliEvent{} - stop := func(*otlpserver.Server) { + stop := func(otlpserver.OtlpServer) { tuiServer.area.Stop() } - cs := otlpserver.NewServer(renderTui, stop) + cs := otlpserver.NewGrpcServer(renderTui, stop) // unlike the rest of otel-cli, server should default to localhost:4317 if config.Endpoint == "" { config.Endpoint = defaultOtlpEndpoint } - cs.ListenAndServeGPRC(config.Endpoint) + cs.ListenAndServe(config.Endpoint) } // renderTui takes the given span and events, appends them to the in-memory diff --git a/otlpserver/clievent.go b/otlpserver/clievent.go index 1c9dadf..25ff1b0 100644 --- a/otlpserver/clievent.go +++ b/otlpserver/clievent.go @@ -7,7 +7,7 @@ import ( "strings" "time" - tracepb "go.opentelemetry.io/proto/otlp/trace/v1" + colv1 "go.opentelemetry.io/proto/otlp/collector/trace/v1" v1 "go.opentelemetry.io/proto/otlp/trace/v1" ) @@ -31,6 +31,9 @@ type CliEvent struct { // the methods below will set this to true before returning // to make it easy for consumers to tell if they got a zero value IsPopulated bool `json:"has_been_modified"` + // somewhere for the server to put interesting facts about a span + // like what HTTP path it arrived on, what http method, etc. + ServerMeta map[string]string `json:"server_meta"` } // ToStringMap flattens a CliEvent into a string map for testing. @@ -60,6 +63,7 @@ func (ce CliEvent) ToStringMap() map[string]string { "attributes": mapToKVString(ce.Attributes), "service_attributes": mapToKVString(ce.ServiceAttributes), "is_populated": strconv.FormatBool(ce.IsPopulated), + "server_meta": mapToKVString(ce.ServerMeta), } } @@ -71,7 +75,7 @@ func (cel CliEventList) Swap(i, j int) { cel[i], cel[j] = cel[j], cel[i] } func (cel CliEventList) Less(i, j int) bool { return cel[i].Nanos < cel[j].Nanos } // NewCliEventFromSpan converts a raw grpc span into a CliEvent. -func NewCliEventFromSpan(span *tracepb.Span, scopeSpans *tracepb.ScopeSpans, rss *v1.ResourceSpans) CliEvent { +func NewCliEventFromSpan(span *v1.Span, scopeSpans *v1.ScopeSpans, rss *v1.ResourceSpans, serverMeta map[string]string) CliEvent { e := CliEvent{ TraceID: hex.EncodeToString(span.GetTraceId()), SpanID: hex.EncodeToString(span.GetSpanId()), @@ -85,6 +89,7 @@ func NewCliEventFromSpan(span *tracepb.Span, scopeSpans *tracepb.ScopeSpans, rss ServiceAttributes: make(map[string]string), Nanos: span.GetStartTimeUnixNano(), IsPopulated: true, + ServerMeta: make(map[string]string), } // copy service attributes over by string, which includes service.name @@ -93,15 +98,15 @@ func NewCliEventFromSpan(span *tracepb.Span, scopeSpans *tracepb.ScopeSpans, rss } switch span.GetKind() { - case tracepb.Span_SPAN_KIND_CLIENT: + case v1.Span_SPAN_KIND_CLIENT: e.Kind = "client" - case tracepb.Span_SPAN_KIND_SERVER: + case v1.Span_SPAN_KIND_SERVER: e.Kind = "server" - case tracepb.Span_SPAN_KIND_PRODUCER: + case v1.Span_SPAN_KIND_PRODUCER: e.Kind = "producer" - case tracepb.Span_SPAN_KIND_CONSUMER: + case v1.Span_SPAN_KIND_CONSUMER: e.Kind = "consumer" - case tracepb.Span_SPAN_KIND_INTERNAL: + case v1.Span_SPAN_KIND_INTERNAL: e.Kind = "internal" default: e.Kind = "unspecified" @@ -118,12 +123,17 @@ func NewCliEventFromSpan(span *tracepb.Span, scopeSpans *tracepb.ScopeSpans, rss e.Attributes[attr.GetKey()] = val } + // explicitly copy the map, do not share the ref + for k, v := range serverMeta { + e.ServerMeta[k] = v + } + return e } // NewCliEventFromSpanEvent takes a span event, span, and ils and returns an event // with all the span event info filled in. -func NewCliEventFromSpanEvent(se *tracepb.Span_Event, span *tracepb.Span, scopeSpans *tracepb.ScopeSpans) CliEvent { +func NewCliEventFromSpanEvent(se *v1.Span_Event, span *v1.Span, scopeSpans *v1.ScopeSpans, serverMeta map[string]string) CliEvent { // start with the span, rewrite it for the event e := CliEvent{ TraceID: hex.EncodeToString(span.GetTraceId()), @@ -138,15 +148,48 @@ func NewCliEventFromSpanEvent(se *tracepb.Span_Event, span *tracepb.Span, scopeS Attributes: make(map[string]string), // overwrite the one from the span Nanos: se.GetTimeUnixNano(), IsPopulated: true, + ServerMeta: make(map[string]string), } for _, attr := range se.GetAttributes() { e.Attributes[attr.GetKey()] = attr.Value.String() } + // explicitly copy the map, do not share the ref + for k, v := range serverMeta { + e.ServerMeta[k] = v + } + return e } +// otelToCliEvent takes an otel trace request data structure and converts +// it to CliEvents, calling the provided callback for each span in the +// request. +func otelToCliEvent(cb Callback, req *colv1.ExportTraceServiceRequest, serverMeta map[string]string) bool { + rss := req.GetResourceSpans() + for _, resource := range rss { + scopeSpans := resource.GetScopeSpans() + for _, ss := range scopeSpans { + for _, span := range ss.GetSpans() { + // convert protobuf spans to something easier for humans to consume + ces := NewCliEventFromSpan(span, ss, resource, serverMeta) + events := CliEventList{} + for _, se := range span.GetEvents() { + events = append(events, NewCliEventFromSpanEvent(se, span, ss, serverMeta)) + } + + done := cb(ces, events) + if done { + return true + } + } + } + } + + return false +} + // mapToKVString flattens attribute string maps into "k=v,k=v" strings. func mapToKVString(in map[string]string) string { keys := make([]string, len(in)) // for sorting diff --git a/otlpserver/grpcserver.go b/otlpserver/grpcserver.go new file mode 100644 index 0000000..90f707c --- /dev/null +++ b/otlpserver/grpcserver.go @@ -0,0 +1,92 @@ +package otlpserver + +import ( + "context" + "log" + "net" + "strings" + "sync" + + v1 "go.opentelemetry.io/proto/otlp/collector/trace/v1" + + "google.golang.org/grpc" +) + +// GrpcServer is a gRPC/OTLP server handle. +type GrpcServer struct { + server *grpc.Server + callback Callback + stoponce sync.Once + stopper chan struct{} + stopdone chan struct{} + doneonce sync.Once + v1.UnimplementedTraceServiceServer +} + +// NewGrpcServer takes a callback and stop function and returns a Server ready +// to run with .Serve(). +func NewGrpcServer(cb Callback, stop Stopper) *GrpcServer { + s := GrpcServer{ + server: grpc.NewServer(), + callback: cb, + stopper: make(chan struct{}), + stopdone: make(chan struct{}, 1), + } + + v1.RegisterTraceServiceServer(s.server, &s) + + // single place to stop the server, used by timeout and max-spans + go func() { + <-s.stopper + stop(&s) + s.server.GracefulStop() + }() + + return &s +} + +// ServeGRPC takes a listener and starts the GRPC server on that listener. +// Blocks until Stop() is called. +func (gs *GrpcServer) Serve(listener net.Listener) error { + err := gs.server.Serve(listener) + gs.stopdone <- struct{}{} + return err +} + +// ListenAndServeGRPC starts a TCP listener then starts the GRPC server using +// ServeGRPC for you. +func (gs *GrpcServer) ListenAndServe(otlpEndpoint string) { + otlpEndpoint = strings.TrimPrefix(otlpEndpoint, "grpc://") + listener, err := net.Listen("tcp", otlpEndpoint) + if err != nil { + log.Fatalf("failed to listen on OTLP endpoint %q: %s", otlpEndpoint, err) + } + if err := gs.Serve(listener); err != nil { + log.Fatalf("failed to serve: %s", err) + } +} + +// Stop sends a value to the server shutdown goroutine so it stops GRPC +// and calls the stop function given to newServer. Safe to call multiple times. +func (gs *GrpcServer) Stop() { + gs.stoponce.Do(func() { + gs.stopper <- struct{}{} + }) +} + +// StopWait stops the server and waits for it to affirm shutdown. +func (gs *GrpcServer) StopWait() { + gs.Stop() + gs.doneonce.Do(func() { + <-gs.stopdone + }) +} + +// Export implements the gRPC server interface for exporting messages. +func (gs *GrpcServer) Export(ctx context.Context, req *v1.ExportTraceServiceRequest) (*v1.ExportTraceServiceResponse, error) { + done := otelToCliEvent(gs.callback, req, map[string]string{"proto": "grpc"}) + if done { + go gs.StopWait() + } + return &v1.ExportTraceServiceResponse{}, nil +} diff --git a/otlpserver/httpserver.go b/otlpserver/httpserver.go new file mode 100644 index 0000000..e88b0e2 --- /dev/null +++ b/otlpserver/httpserver.go @@ -0,0 +1,91 @@ +package otlpserver + +import ( + "context" + "encoding/json" + "io" + "log" + "net" + "net/http" + + v1 "go.opentelemetry.io/proto/otlp/collector/trace/v1" + "google.golang.org/protobuf/proto" +) + +type HttpServer struct { + server *http.Server + callback Callback +} + +// NewServer takes a callback and stop function and returns a Server ready +// to run with .Serve(). +func NewHttpServer(cb Callback, stop Stopper) *HttpServer { + s := HttpServer{ + server: &http.Server{}, + callback: cb, + } + + s.server.Handler = &s + + return &s +} + +// ServeHTTP processes every request as if it is a trace regardless of +// method and path or anything else. +func (hs *HttpServer) ServeHTTP(rw http.ResponseWriter, req *http.Request) { + data, err := io.ReadAll(req.Body) + if err != nil { + log.Fatalf("Error while reading request body: %s", err) + } + + msg := v1.ExportTraceServiceRequest{} + switch req.Header.Get("Content-Type") { + case "application/x-protobuf": + proto.Unmarshal(data, &msg) + case "application/json": + json.Unmarshal(data, &msg) + default: + rw.WriteHeader(http.StatusNotAcceptable) + } + + meta := map[string]string{ + "method": req.Method, + "proto": req.Proto, + "host": req.Host, + "uri": req.RequestURI, + } + + done := otelToCliEvent(hs.callback, &msg, meta) + if done { + go hs.StopWait() + } +} + +// ServeHttp takes a listener and starts the HTTP server on that listener. +// Blocks until Stop() is called. +func (hs *HttpServer) Serve(listener net.Listener) error { + err := hs.server.Serve(listener) + return err +} + +// ListenAndServeHttp starts a TCP listener then starts the HTTP server using +// ServeHttp for you. +func (hs *HttpServer) ListenAndServe(otlpEndpoint string) { + listener, err := net.Listen("tcp", otlpEndpoint) + if err != nil { + log.Fatalf("failed to listen on OTLP endpoint %q: %s", otlpEndpoint, err) + } + if err := hs.Serve(listener); err != nil { + log.Fatalf("failed to serve: %s", err) + } +} + +// Stop closes the http server and all active connections immediately. +func (hs *HttpServer) Stop() { + hs.server.Close() +} + +// StopWait stops the http server gracefully. +func (hs *HttpServer) StopWait() { + hs.server.Shutdown(context.Background()) +} diff --git a/otlpserver/server.go b/otlpserver/server.go index 846e564..4bf0d9d 100644 --- a/otlpserver/server.go +++ b/otlpserver/server.go @@ -1,17 +1,11 @@ -// otlpserver is a lightweight OTLP/gRPC server implementation intended for use -// in otel-cli and end-to-end testing of OpenTelemetry applications. +// otlpserver is an OTLP server with HTTP and gRPC backends available. +// It takes a lot of shortcuts to keep things simple and is not intended +// to be used as a serious OTLP service. Primarily it is for the test +// suite and also supports the otel-cli server features. package otlpserver import ( - "context" - "log" "net" - "strings" - "sync" - - v1 "go.opentelemetry.io/proto/otlp/collector/trace/v1" - - "google.golang.org/grpc" ) // Callback is a type for the function passed to newServer that is @@ -20,101 +14,26 @@ type Callback func(CliEvent, CliEventList) bool // Stopper is the function passed to newServer to be called when the // server is shut down. -type Stopper func(*Server) - -// Server is a gRPC/OTLP server handle. -type Server struct { - server *grpc.Server - callback Callback - stoponce sync.Once - stopper chan struct{} - stopdone chan struct{} - doneonce sync.Once - v1.UnimplementedTraceServiceServer -} - -// NewServer takes a callback and stop function and returns a Server ready -// to run with .ServeGRPC(). -func NewServer(cb Callback, stop Stopper) *Server { - s := Server{ - server: grpc.NewServer(), - callback: cb, - stopper: make(chan struct{}), - stopdone: make(chan struct{}, 1), - } - - v1.RegisterTraceServiceServer(s.server, &s) - - // single place to stop the server, used by timeout and max-spans - go func() { - <-s.stopper - stop(&s) - s.server.GracefulStop() - }() - - return &s -} - -// ServeGRPC takes a listener and starts the GRPC server on that listener. -// Blocks until Stop() is called. -func (cs *Server) ServeGPRC(listener net.Listener) error { - err := cs.server.Serve(listener) - cs.stopdone <- struct{}{} - return err -} - -// ListenAndServeGRPC starts a TCP listener then starts the GRPC server using -// ServeGRPC for you. -func (cs *Server) ListenAndServeGPRC(otlpEndpoint string) { - otlpEndpoint = strings.TrimPrefix(otlpEndpoint, "grpc://") - listener, err := net.Listen("tcp", otlpEndpoint) - if err != nil { - log.Fatalf("failed to listen on OTLP endpoint %q: %s", otlpEndpoint, err) - } - if err := cs.ServeGPRC(listener); err != nil { - log.Fatalf("failed to serve: %s", err) - } +type Stopper func(OtlpServer) + +// OtlpServer abstracts the minimum interface required for an OTLP +// server to be either HTTP or gRPC (but not both, for now). +type OtlpServer interface { + ListenAndServe(otlpEndpoint string) + Serve(listener net.Listener) error + Stop() + StopWait() } -// Stop sends a value to the server shutdown goroutine so it stops GRPC -// and calls the stop function given to newServer. Safe to call multiple times. -func (cs *Server) Stop() { - cs.stoponce.Do(func() { - cs.stopper <- struct{}{} - }) -} - -// StopWait stops the server and waits for it to affirm shutdown. -func (cs *Server) StopWait() { - cs.Stop() - cs.doneonce.Do(func() { - <-cs.stopdone - }) -} - -// Export implements the gRPC server interface for exporting messages. -func (cs *Server) Export(ctx context.Context, req *v1.ExportTraceServiceRequest) (*v1.ExportTraceServiceResponse, error) { - rss := req.GetResourceSpans() - for _, resource := range rss { - scopeSpans := resource.GetScopeSpans() - for _, ss := range scopeSpans { - for _, span := range ss.GetSpans() { - // convert protobuf spans to something easier for humans to consume - ces := NewCliEventFromSpan(span, ss, resource) - events := CliEventList{} - for _, se := range span.GetEvents() { - events = append(events, NewCliEventFromSpanEvent(se, span, ss)) - } - - f := cs.callback - done := f(ces, events) - if done { - go cs.StopWait() - return &v1.ExportTraceServiceResponse{}, nil - } - } - } +// NewServer will start the requested server protocol, one of grpc, http/protobuf, +// and http/json. +func NewServer(protocol string, cb Callback, stop Stopper) OtlpServer { + switch protocol { + case "grpc": + return NewGrpcServer(cb, stop) + case "http": + return NewHttpServer(cb, stop) } - return &v1.ExportTraceServiceResponse{}, nil + return nil }