Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fill in some testing gaps, fix grpc headers #232

Merged
merged 4 commits into from
Jun 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions data_for_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type Results struct {
Diagnostics otlpclient.Diagnostics `json:"diagnostics"`
// these are specific to tests...
ServerMeta map[string]string
Headers map[string]string // headers sent by the client
ResourceSpans *tracepb.ResourceSpans
CliOutput string // merged stdout and stderr
CliOutputRe *regexp.Regexp // regular expression to clean the output before comparison
Expand Down Expand Up @@ -951,4 +952,78 @@ var suites = []FixtureSuite{
},
},
},
// full-system test --otlp-headers makes it to grpc/http servers
{
{
Name: "#231 gRPC headers for authentication",
Config: FixtureConfig{
CliArgs: []string{
"status",
"--endpoint", "{{endpoint}}",
"--protocol", "grpc",
"--otlp-headers", "x-otel-cli-otlpserver-token=abcdefgabcdefg",
},
ServerProtocol: grpcProtocol,
},
Expect: Results{
SpanCount: 1,
Config: otlpclient.DefaultConfig().
WithEndpoint("{{endpoint}}").
WithProtocol("grpc").
WithHeaders(map[string]string{
"x-otel-cli-otlpserver-token": "abcdefgabcdefg",
}),
Headers: map[string]string{
":authority": "{{endpoint}}\n",
"content-type": "application/grpc\n",
"user-agent": "*",
"x-otel-cli-otlpserver-token": "abcdefgabcdefg\n",
},
Diagnostics: otlpclient.Diagnostics{
IsRecording: true,
DetectedLocalhost: true,
NumArgs: 7,
ParsedTimeoutMs: 1000,
Endpoint: "grpc://{{endpoint}}",
EndpointSource: "general",
},
},
},
{
Name: "#231 http headers for authentication",
Config: FixtureConfig{
CliArgs: []string{
"status",
"--endpoint", "http://{{endpoint}}",
"--protocol", "http/protobuf",
"--otlp-headers", "x-otel-cli-otlpserver-token=abcdefgabcdefg",
},
ServerProtocol: httpProtocol,
},
Expect: Results{
SpanCount: 1,
Config: otlpclient.DefaultConfig().
WithEndpoint("http://{{endpoint}}").
WithProtocol("http/protobuf").
WithHeaders(map[string]string{
"x-otel-cli-otlpserver-token": "abcdefgabcdefg",
}),
Headers: map[string]string{
"Content-Type": "application/x-protobuf",
"Accept-Encoding": "gzip",
"User-Agent": "Go-http-client/1.1",
"Content-Length": "232",
"X-Otel-Cli-Otlpserver-Token": "abcdefgabcdefg",
},
Diagnostics: otlpclient.Diagnostics{
IsRecording: true,
DetectedLocalhost: true,
NumArgs: 7,
ParsedTimeoutMs: 1000,
Endpoint: "http://{{endpoint}}/v1/traces",
EndpointSource: "general",
},
},
},
},
}
25 changes: 24 additions & 1 deletion main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package main_test
// see TESTING.md for details

import (
"context"
"crypto/tls"
"encoding/json"
"log"
Expand Down Expand Up @@ -173,6 +174,10 @@ func checkAll(t *testing.T, fixture Fixture, results Results) {
checkOutput(t, fixture, results)
}

if len(fixture.Expect.Headers) > 0 {
checkHeaders(t, fixture, results)
}

if len(fixture.Expect.ServerMeta) > 0 {
checkServerMeta(t, fixture, results)
}
Expand Down Expand Up @@ -324,6 +329,23 @@ func checkSpanData(t *testing.T, fixture Fixture, results Results) {
}
}

// checkHeaders compares the expected and received headers.
func checkHeaders(t *testing.T, fixture Fixture, results Results) {
injectMapVars(fixture.Endpoint, fixture.Expect.Headers, fixture.TlsData)
injectMapVars(fixture.Endpoint, results.Headers, fixture.TlsData)

for k, v := range fixture.Expect.Headers {
if v == "*" {
// overwrite value so cmp.Diff will ignore
results.Headers[k] = "*"
}
}

if diff := cmp.Diff(fixture.Expect.Headers, results.Headers); diff != "" {
t.Errorf("[%s] headers did not match expected (-want +got):\n%s", fixture.Name, diff)
}
}

// checkServerMeta compares the expected and received server metadata.
func checkServerMeta(t *testing.T, fixture Fixture, results Results) {
injectMapVars(fixture.Endpoint, fixture.Expect.ServerMeta, fixture.TlsData)
Expand Down Expand Up @@ -358,14 +380,15 @@ func runOtelCli(t *testing.T, fixture Fixture) (string, Results) {
rcvEvents := make(chan []*tracepb.Span_Event, 100)

// otlpserver calls this function for each span received
cb := func(span *tracepb.Span, events []*tracepb.Span_Event, rss *tracepb.ResourceSpans, meta map[string]string) bool {
cb := func(ctx context.Context, span *tracepb.Span, events []*tracepb.Span_Event, rss *tracepb.ResourceSpans, headers map[string]string, meta map[string]string) bool {
rcvSpan <- span
rcvEvents <- events

results.ServerMeta = meta
results.ResourceSpans = rss
results.SpanCount++
results.EventCount += len(events)
results.Headers = headers

// true tells the server we're done and it can exit its loop
return results.SpanCount >= fixture.Expect.SpanCount
Expand Down
3 changes: 2 additions & 1 deletion otelcli/server_json.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package otelcli

import (
"context"
"encoding/hex"
"encoding/json"
"log"
Expand Down Expand Up @@ -58,7 +59,7 @@ func doServerJson(cmd *cobra.Command, args []string) {

// writeFile takes the spans and events and writes them out to json files in the
// tid/sid/span.json and tid/sid/events.json files.
func renderJson(span *tracepb.Span, events []*tracepb.Span_Event, ss *tracepb.ResourceSpans, meta map[string]string) bool {
func renderJson(ctx context.Context, span *tracepb.Span, events []*tracepb.Span_Event, ss *tracepb.ResourceSpans, headers map[string]string, meta map[string]string) bool {
jsonSvr.spansSeen++ // count spans for exiting on --max-spans

// TODO: check for existence of outdir and error when it doesn't exist
Expand Down
3 changes: 2 additions & 1 deletion otelcli/server_tui.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package otelcli

import (
"context"
"encoding/hex"
"log"
"math"
Expand Down Expand Up @@ -56,7 +57,7 @@ func doServerTui(cmd *cobra.Command, args []string) {

// renderTui takes the given span and events, appends them to the in-memory
// event list, sorts that, then prints it as a pterm table.
func renderTui(span *tracepb.Span, events []*tracepb.Span_Event, rss *tracepb.ResourceSpans, meta map[string]string) bool {
func renderTui(ctx context.Context, span *tracepb.Span, events []*tracepb.Span_Event, rss *tracepb.ResourceSpans, headers map[string]string, meta map[string]string) bool {
spanTraceId := hex.EncodeToString(span.TraceId)
if _, ok := tuiServer.traces[spanTraceId]; !ok {
tuiServer.traces[spanTraceId] = span
Expand Down
14 changes: 7 additions & 7 deletions otlpclient/otlp_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func SendSpan(ctx context.Context, client OTLPClient, config Config, span *trace

ctx, err = client.UploadTraces(ctx, rsps)
if err != nil {
return SaveError(ctx, err)
return SaveError(ctx, time.Now(), err)
}

return ctx, nil
Expand Down Expand Up @@ -330,15 +330,15 @@ func GetErrorList(ctx context.Context) ErrorList {

// SaveError writes the provided error to the ErrorList in ctx, returning an
// updated ctx.
func SaveError(ctx context.Context, err error) (context.Context, error) {
func SaveError(ctx context.Context, t time.Time, err error) (context.Context, error) {
if err == nil {
return ctx, nil
}

Diag.SetError(err) // legacy, will go away when Diag is removed

te := TimestampedError{
Timestamp: time.Now(),
Timestamp: t,
Error: err.Error(),
}

Expand Down Expand Up @@ -370,31 +370,31 @@ func retry(ctx context.Context, config Config, timeout time.Duration, fun retryF
for {
if ctx, keepGoing, wait, err := fun(ctx); err != nil {
if err != nil {
ctx, _ = SaveError(ctx, err)
ctx, _ = SaveError(ctx, time.Now(), err)
}
config.SoftLog("error on retry %d: %s", Diag.Retries, err)

if keepGoing {
if wait > 0 {
if time.Now().Add(wait).After(deadline) {
// wait will be after deadline, give up now
return SaveError(ctx, err)
return SaveError(ctx, time.Now(), err)
}
time.Sleep(wait)
} else {
time.Sleep(sleep)
}

if time.Now().After(deadline) {
return SaveError(ctx, err)
return SaveError(ctx, time.Now(), err)
}

// linearly increase sleep time up to 5 seconds
if sleep < time.Second*5 {
sleep = sleep + time.Millisecond*100
}
} else {
return SaveError(ctx, err)
return SaveError(ctx, time.Now(), err)
}
} else {
return ctx, nil
Expand Down
9 changes: 6 additions & 3 deletions otlpclient/otlp_client_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,19 @@ func (gc *GrpcClient) Start(ctx context.Context) (context.Context, error) {

// UploadTraces takes a list of protobuf spans and sends them out, doing retries
// on some errors as needed.
// TODO: look into grpc.WaitForReady(), esp for status use cases
func (gc *GrpcClient) UploadTraces(ctx context.Context, rsps []*tracepb.ResourceSpans) (context.Context, error) {
// add headers onto the request
md := metadata.New(gc.config.Headers)
grpcOpts := []grpc.CallOption{grpc.HeaderCallOption{HeaderAddr: &md}}
if len(gc.config.Headers) > 0 {
md := metadata.New(gc.config.Headers)
ctx = metadata.NewOutgoingContext(ctx, md)
}

req := coltracepb.ExportTraceServiceRequest{ResourceSpans: rsps}

timeout := gc.config.ParseCliTimeout()
return retry(ctx, gc.config, timeout, func(innerCtx context.Context) (context.Context, bool, time.Duration, error) {
etsr, err := gc.client.Export(innerCtx, &req, grpcOpts...)
etsr, err := gc.client.Export(innerCtx, &req)
return processGrpcStatus(innerCtx, etsr, err)
})
}
Expand Down
40 changes: 39 additions & 1 deletion otlpclient/otlp_client_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,48 @@
package otlpclient

import (
"context"
"fmt"
"testing"
"time"

"github.com/google/go-cmp/cmp"
)

func TestErrorLists(t *testing.T) {
now := time.Now()

for _, tc := range []struct {
call func(context.Context) context.Context
want ErrorList
}{
{
call: func(ctx context.Context) context.Context {
err := fmt.Errorf("")
ctx, _ = SaveError(ctx, now, err)
return ctx
},
want: ErrorList{
TimestampedError{now, ""},
},
},
} {
ctx := context.Background()
ctx = tc.call(ctx)
list := GetErrorList(ctx)

if len(list) < len(tc.want) {
t.Errorf("got %d errors but expected %d", len(tc.want), len(list))
}

// TODO: sort?
if diff := cmp.Diff(list, tc.want); diff != "" {
t.Errorf("error list mismatch (-want +got):\n%s", diff)
}

}
}

func TestParseEndpoint(t *testing.T) {
// func parseEndpoint(config Config) (*url.URL, string) {

Expand Down Expand Up @@ -71,5 +110,4 @@ func TestParseEndpoint(t *testing.T) {
t.Errorf("Expected source %q for test url %q but got %q", tc.wantSource, u.String(), src)
}
}

}
17 changes: 16 additions & 1 deletion otlpserver/grpcserver.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package otlpserver

import (
"bytes"
"context"
"encoding/csv"
"log"
"net"
"sync"

coltracepb "go.opentelemetry.io/proto/otlp/collector/trace/v1"

"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)

// GrpcServer is a gRPC/OTLP server handle.
Expand Down Expand Up @@ -82,7 +85,19 @@ func (gs *GrpcServer) StopWait() {

// Export implements the gRPC server interface for exporting messages.
func (gs *GrpcServer) Export(ctx context.Context, req *coltracepb.ExportTraceServiceRequest) (*coltracepb.ExportTraceServiceResponse, error) {
done := doCallback(gs.callback, req, map[string]string{"proto": "grpc"})
// OTLP/gRPC headers are passed in metadata, copy them to serverMeta
// for now. This isn't ideal but gets them exposed to the test suite.
headers := make(map[string]string)
if md, ok := metadata.FromIncomingContext(ctx); ok {
for mdk := range md {
vals := md.Get(mdk)
buf := bytes.NewBuffer([]byte{})
csv.NewWriter(buf).WriteAll([][]string{vals})
headers[mdk] = buf.String()
}
}

done := doCallback(ctx, gs.callback, req, headers, map[string]string{"proto": "grpc"})
if done {
go gs.StopWait()
}
Expand Down
7 changes: 6 additions & 1 deletion otlpserver/httpserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,12 @@ func (hs *HttpServer) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
"uri": req.RequestURI,
}

done := doCallback(hs.callback, &msg, meta)
headers := make(map[string]string)
for k := range req.Header {
headers[k] = req.Header.Get(k)
}

done := doCallback(req.Context(), hs.callback, &msg, headers, meta)
if done {
go hs.StopWait()
}
Expand Down
Loading