diff --git a/data_for_test.go b/data_for_test.go index 334197d..07c52ad 100644 --- a/data_for_test.go +++ b/data_for_test.go @@ -61,6 +61,7 @@ type Results struct { Diagnostics otlpclient.Diagnostics `json:"diagnostics"` // these are specific to tests... ServerMeta map[string]string + Headers map[string]string // headers sent by the client ResourceSpans *tracepb.ResourceSpans CliOutput string // merged stdout and stderr CliOutputRe *regexp.Regexp // regular expression to clean the output before comparison @@ -951,4 +952,78 @@ var suites = []FixtureSuite{ }, }, }, + // full-system test --otlp-headers makes it to grpc/http servers + { + { + Name: "#231 gRPC headers for authentication", + Config: FixtureConfig{ + CliArgs: []string{ + "status", + "--endpoint", "{{endpoint}}", + "--protocol", "grpc", + "--otlp-headers", "x-otel-cli-otlpserver-token=abcdefgabcdefg", + }, + ServerProtocol: grpcProtocol, + }, + Expect: Results{ + SpanCount: 1, + Config: otlpclient.DefaultConfig(). + WithEndpoint("{{endpoint}}"). + WithProtocol("grpc"). + WithHeaders(map[string]string{ + "x-otel-cli-otlpserver-token": "abcdefgabcdefg", + }), + Headers: map[string]string{ + ":authority": "{{endpoint}}\n", + "content-type": "application/grpc\n", + "user-agent": "*", + "x-otel-cli-otlpserver-token": "abcdefgabcdefg\n", + }, + Diagnostics: otlpclient.Diagnostics{ + IsRecording: true, + DetectedLocalhost: true, + NumArgs: 7, + ParsedTimeoutMs: 1000, + Endpoint: "grpc://{{endpoint}}", + EndpointSource: "general", + }, + }, + }, + { + Name: "#231 http headers for authentication", + Config: FixtureConfig{ + CliArgs: []string{ + "status", + "--endpoint", "http://{{endpoint}}", + "--protocol", "http/protobuf", + "--otlp-headers", "x-otel-cli-otlpserver-token=abcdefgabcdefg", + }, + ServerProtocol: httpProtocol, + }, + Expect: Results{ + SpanCount: 1, + Config: otlpclient.DefaultConfig(). + WithEndpoint("http://{{endpoint}}"). + WithProtocol("http/protobuf"). + WithHeaders(map[string]string{ + "x-otel-cli-otlpserver-token": "abcdefgabcdefg", + }), + Headers: map[string]string{ + "Content-Type": "application/x-protobuf", + "Accept-Encoding": "gzip", + "User-Agent": "Go-http-client/1.1", + "Content-Length": "232", + "X-Otel-Cli-Otlpserver-Token": "abcdefgabcdefg", + }, + Diagnostics: otlpclient.Diagnostics{ + IsRecording: true, + DetectedLocalhost: true, + NumArgs: 7, + ParsedTimeoutMs: 1000, + Endpoint: "http://{{endpoint}}/v1/traces", + EndpointSource: "general", + }, + }, + }, + }, } diff --git a/main_test.go b/main_test.go index a464bdc..8e466c1 100644 --- a/main_test.go +++ b/main_test.go @@ -7,6 +7,7 @@ package main_test // see TESTING.md for details import ( + "context" "crypto/tls" "encoding/json" "log" @@ -173,6 +174,10 @@ func checkAll(t *testing.T, fixture Fixture, results Results) { checkOutput(t, fixture, results) } + if len(fixture.Expect.Headers) > 0 { + checkHeaders(t, fixture, results) + } + if len(fixture.Expect.ServerMeta) > 0 { checkServerMeta(t, fixture, results) } @@ -324,6 +329,23 @@ func checkSpanData(t *testing.T, fixture Fixture, results Results) { } } +// checkHeaders compares the expected and received headers. +func checkHeaders(t *testing.T, fixture Fixture, results Results) { + injectMapVars(fixture.Endpoint, fixture.Expect.Headers, fixture.TlsData) + injectMapVars(fixture.Endpoint, results.Headers, fixture.TlsData) + + for k, v := range fixture.Expect.Headers { + if v == "*" { + // overwrite value so cmp.Diff will ignore + results.Headers[k] = "*" + } + } + + if diff := cmp.Diff(fixture.Expect.Headers, results.Headers); diff != "" { + t.Errorf("[%s] headers did not match expected (-want +got):\n%s", fixture.Name, diff) + } +} + // checkServerMeta compares the expected and received server metadata. func checkServerMeta(t *testing.T, fixture Fixture, results Results) { injectMapVars(fixture.Endpoint, fixture.Expect.ServerMeta, fixture.TlsData) @@ -358,7 +380,7 @@ func runOtelCli(t *testing.T, fixture Fixture) (string, Results) { rcvEvents := make(chan []*tracepb.Span_Event, 100) // otlpserver calls this function for each span received - cb := func(span *tracepb.Span, events []*tracepb.Span_Event, rss *tracepb.ResourceSpans, meta map[string]string) bool { + cb := func(ctx context.Context, span *tracepb.Span, events []*tracepb.Span_Event, rss *tracepb.ResourceSpans, headers map[string]string, meta map[string]string) bool { rcvSpan <- span rcvEvents <- events @@ -366,6 +388,7 @@ func runOtelCli(t *testing.T, fixture Fixture) (string, Results) { results.ResourceSpans = rss results.SpanCount++ results.EventCount += len(events) + results.Headers = headers // true tells the server we're done and it can exit its loop return results.SpanCount >= fixture.Expect.SpanCount diff --git a/otelcli/server_json.go b/otelcli/server_json.go index e747bac..d61ad77 100644 --- a/otelcli/server_json.go +++ b/otelcli/server_json.go @@ -1,6 +1,7 @@ package otelcli import ( + "context" "encoding/hex" "encoding/json" "log" @@ -58,7 +59,7 @@ func doServerJson(cmd *cobra.Command, args []string) { // writeFile takes the spans and events and writes them out to json files in the // tid/sid/span.json and tid/sid/events.json files. -func renderJson(span *tracepb.Span, events []*tracepb.Span_Event, ss *tracepb.ResourceSpans, meta map[string]string) bool { +func renderJson(ctx context.Context, span *tracepb.Span, events []*tracepb.Span_Event, ss *tracepb.ResourceSpans, headers map[string]string, meta map[string]string) bool { jsonSvr.spansSeen++ // count spans for exiting on --max-spans // TODO: check for existence of outdir and error when it doesn't exist diff --git a/otelcli/server_tui.go b/otelcli/server_tui.go index 3a1488f..43152e2 100644 --- a/otelcli/server_tui.go +++ b/otelcli/server_tui.go @@ -1,6 +1,7 @@ package otelcli import ( + "context" "encoding/hex" "log" "math" @@ -56,7 +57,7 @@ func doServerTui(cmd *cobra.Command, args []string) { // renderTui takes the given span and events, appends them to the in-memory // event list, sorts that, then prints it as a pterm table. -func renderTui(span *tracepb.Span, events []*tracepb.Span_Event, rss *tracepb.ResourceSpans, meta map[string]string) bool { +func renderTui(ctx context.Context, span *tracepb.Span, events []*tracepb.Span_Event, rss *tracepb.ResourceSpans, headers map[string]string, meta map[string]string) bool { spanTraceId := hex.EncodeToString(span.TraceId) if _, ok := tuiServer.traces[spanTraceId]; !ok { tuiServer.traces[spanTraceId] = span diff --git a/otlpclient/otlp_client.go b/otlpclient/otlp_client.go index 6a1fe54..2dc357e 100644 --- a/otlpclient/otlp_client.go +++ b/otlpclient/otlp_client.go @@ -94,7 +94,7 @@ func SendSpan(ctx context.Context, client OTLPClient, config Config, span *trace ctx, err = client.UploadTraces(ctx, rsps) if err != nil { - return SaveError(ctx, err) + return SaveError(ctx, time.Now(), err) } return ctx, nil @@ -330,7 +330,7 @@ func GetErrorList(ctx context.Context) ErrorList { // SaveError writes the provided error to the ErrorList in ctx, returning an // updated ctx. -func SaveError(ctx context.Context, err error) (context.Context, error) { +func SaveError(ctx context.Context, t time.Time, err error) (context.Context, error) { if err == nil { return ctx, nil } @@ -338,7 +338,7 @@ func SaveError(ctx context.Context, err error) (context.Context, error) { Diag.SetError(err) // legacy, will go away when Diag is removed te := TimestampedError{ - Timestamp: time.Now(), + Timestamp: t, Error: err.Error(), } @@ -370,7 +370,7 @@ func retry(ctx context.Context, config Config, timeout time.Duration, fun retryF for { if ctx, keepGoing, wait, err := fun(ctx); err != nil { if err != nil { - ctx, _ = SaveError(ctx, err) + ctx, _ = SaveError(ctx, time.Now(), err) } config.SoftLog("error on retry %d: %s", Diag.Retries, err) @@ -378,7 +378,7 @@ func retry(ctx context.Context, config Config, timeout time.Duration, fun retryF if wait > 0 { if time.Now().Add(wait).After(deadline) { // wait will be after deadline, give up now - return SaveError(ctx, err) + return SaveError(ctx, time.Now(), err) } time.Sleep(wait) } else { @@ -386,7 +386,7 @@ func retry(ctx context.Context, config Config, timeout time.Duration, fun retryF } if time.Now().After(deadline) { - return SaveError(ctx, err) + return SaveError(ctx, time.Now(), err) } // linearly increase sleep time up to 5 seconds @@ -394,7 +394,7 @@ func retry(ctx context.Context, config Config, timeout time.Duration, fun retryF sleep = sleep + time.Millisecond*100 } } else { - return SaveError(ctx, err) + return SaveError(ctx, time.Now(), err) } } else { return ctx, nil diff --git a/otlpclient/otlp_client_grpc.go b/otlpclient/otlp_client_grpc.go index 256de31..84433ad 100644 --- a/otlpclient/otlp_client_grpc.go +++ b/otlpclient/otlp_client_grpc.go @@ -74,16 +74,19 @@ func (gc *GrpcClient) Start(ctx context.Context) (context.Context, error) { // UploadTraces takes a list of protobuf spans and sends them out, doing retries // on some errors as needed. +// TODO: look into grpc.WaitForReady(), esp for status use cases func (gc *GrpcClient) UploadTraces(ctx context.Context, rsps []*tracepb.ResourceSpans) (context.Context, error) { // add headers onto the request - md := metadata.New(gc.config.Headers) - grpcOpts := []grpc.CallOption{grpc.HeaderCallOption{HeaderAddr: &md}} + if len(gc.config.Headers) > 0 { + md := metadata.New(gc.config.Headers) + ctx = metadata.NewOutgoingContext(ctx, md) + } req := coltracepb.ExportTraceServiceRequest{ResourceSpans: rsps} timeout := gc.config.ParseCliTimeout() return retry(ctx, gc.config, timeout, func(innerCtx context.Context) (context.Context, bool, time.Duration, error) { - etsr, err := gc.client.Export(innerCtx, &req, grpcOpts...) + etsr, err := gc.client.Export(innerCtx, &req) return processGrpcStatus(innerCtx, etsr, err) }) } diff --git a/otlpclient/otlp_client_test.go b/otlpclient/otlp_client_test.go index 3675267..4c00434 100644 --- a/otlpclient/otlp_client_test.go +++ b/otlpclient/otlp_client_test.go @@ -1,9 +1,48 @@ package otlpclient import ( + "context" + "fmt" "testing" + "time" + + "github.com/google/go-cmp/cmp" ) +func TestErrorLists(t *testing.T) { + now := time.Now() + + for _, tc := range []struct { + call func(context.Context) context.Context + want ErrorList + }{ + { + call: func(ctx context.Context) context.Context { + err := fmt.Errorf("") + ctx, _ = SaveError(ctx, now, err) + return ctx + }, + want: ErrorList{ + TimestampedError{now, ""}, + }, + }, + } { + ctx := context.Background() + ctx = tc.call(ctx) + list := GetErrorList(ctx) + + if len(list) < len(tc.want) { + t.Errorf("got %d errors but expected %d", len(tc.want), len(list)) + } + + // TODO: sort? + if diff := cmp.Diff(list, tc.want); diff != "" { + t.Errorf("error list mismatch (-want +got):\n%s", diff) + } + + } +} + func TestParseEndpoint(t *testing.T) { // func parseEndpoint(config Config) (*url.URL, string) { @@ -71,5 +110,4 @@ func TestParseEndpoint(t *testing.T) { t.Errorf("Expected source %q for test url %q but got %q", tc.wantSource, u.String(), src) } } - } diff --git a/otlpserver/grpcserver.go b/otlpserver/grpcserver.go index 8ae0671..4e32b9b 100644 --- a/otlpserver/grpcserver.go +++ b/otlpserver/grpcserver.go @@ -1,7 +1,9 @@ package otlpserver import ( + "bytes" "context" + "encoding/csv" "log" "net" "sync" @@ -9,6 +11,7 @@ import ( coltracepb "go.opentelemetry.io/proto/otlp/collector/trace/v1" "google.golang.org/grpc" + "google.golang.org/grpc/metadata" ) // GrpcServer is a gRPC/OTLP server handle. @@ -82,7 +85,19 @@ func (gs *GrpcServer) StopWait() { // Export implements the gRPC server interface for exporting messages. func (gs *GrpcServer) Export(ctx context.Context, req *coltracepb.ExportTraceServiceRequest) (*coltracepb.ExportTraceServiceResponse, error) { - done := doCallback(gs.callback, req, map[string]string{"proto": "grpc"}) + // OTLP/gRPC headers are passed in metadata, copy them to serverMeta + // for now. This isn't ideal but gets them exposed to the test suite. + headers := make(map[string]string) + if md, ok := metadata.FromIncomingContext(ctx); ok { + for mdk := range md { + vals := md.Get(mdk) + buf := bytes.NewBuffer([]byte{}) + csv.NewWriter(buf).WriteAll([][]string{vals}) + headers[mdk] = buf.String() + } + } + + done := doCallback(ctx, gs.callback, req, headers, map[string]string{"proto": "grpc"}) if done { go gs.StopWait() } diff --git a/otlpserver/httpserver.go b/otlpserver/httpserver.go index 141696e..dde7861 100644 --- a/otlpserver/httpserver.go +++ b/otlpserver/httpserver.go @@ -57,7 +57,12 @@ func (hs *HttpServer) ServeHTTP(rw http.ResponseWriter, req *http.Request) { "uri": req.RequestURI, } - done := doCallback(hs.callback, &msg, meta) + headers := make(map[string]string) + for k := range req.Header { + headers[k] = req.Header.Get(k) + } + + done := doCallback(req.Context(), hs.callback, &msg, headers, meta) if done { go hs.StopWait() } diff --git a/otlpserver/server.go b/otlpserver/server.go index c6f9fa0..683c4f6 100644 --- a/otlpserver/server.go +++ b/otlpserver/server.go @@ -5,6 +5,7 @@ package otlpserver import ( + "context" "net" colv1 "go.opentelemetry.io/proto/otlp/collector/trace/v1" @@ -13,7 +14,7 @@ import ( // Callback is a type for the function passed to newServer that is // called for each incoming span. -type Callback func(*tracepb.Span, []*tracepb.Span_Event, *tracepb.ResourceSpans, map[string]string) bool +type Callback func(context.Context, *tracepb.Span, []*tracepb.Span_Event, *tracepb.ResourceSpans, map[string]string, map[string]string) bool // Stopper is the function passed to newServer to be called when the // server is shut down. @@ -43,7 +44,7 @@ func NewServer(protocol string, cb Callback, stop Stopper) OtlpServer { // doCallback unwraps the OTLP service request and calls the callback // for each span in the request. -func doCallback(cb Callback, req *colv1.ExportTraceServiceRequest, serverMeta map[string]string) bool { +func doCallback(ctx context.Context, cb Callback, req *colv1.ExportTraceServiceRequest, headers map[string]string, serverMeta map[string]string) bool { rss := req.GetResourceSpans() for _, resource := range rss { scopeSpans := resource.GetScopeSpans() @@ -53,7 +54,8 @@ func doCallback(cb Callback, req *colv1.ExportTraceServiceRequest, serverMeta ma if events == nil { events = []*tracepb.Span_Event{} } - done := cb(span, events, resource, serverMeta) + + done := cb(ctx, span, events, resource, headers, serverMeta) if done { return true }