Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Propagate error to observability service #715

Merged
merged 2 commits into from
Sep 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
182 changes: 177 additions & 5 deletions observability/opencensus/v2/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,25 @@ import (
"time"

"github.com/lightstep/tracecontext.go/traceparent"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opencensus.io/trace"

obshttp "github.com/cloudevents/sdk-go/observability/opencensus/v2/http"
"github.com/cloudevents/sdk-go/v2/binding"
"github.com/cloudevents/sdk-go/v2/client"
"github.com/cloudevents/sdk-go/v2/event"
"github.com/cloudevents/sdk-go/v2/observability"
"github.com/cloudevents/sdk-go/v2/protocol"
cehttp "github.com/cloudevents/sdk-go/v2/protocol/http"
"github.com/cloudevents/sdk-go/v2/types"
)

func simpleTracingBinaryClient(t *testing.T, target string) client.Client {
func simpleTracingBinaryClient(t *testing.T, target string, os client.ObservabilityService) client.Client {
p, err := obshttp.NewObservedHTTP(cehttp.WithTarget(target))
require.NoError(t, err)

c, err := client.New(p, client.WithObservabilityService(New()))
c, err := client.New(p, client.WithObservabilityService(os))
require.NoError(t, err)
return c
}
Expand Down Expand Up @@ -80,7 +83,7 @@ func TestTracedClientReceive(t *testing.T) {
time.Sleep(5 * time.Millisecond) // let the server start

target := fmt.Sprintf("http://localhost:%d", p.GetListeningPort())
sender := simpleTracingBinaryClient(t, target)
sender := simpleTracingBinaryClient(t, target, New())

ctx, span := trace.StartSpan(context.TODO(), "test-span")
result := sender.Send(ctx, tc.event)
Expand All @@ -100,11 +103,76 @@ func TestTracedClientReceive(t *testing.T) {
}
}

func TestTracedClientReceiveError(t *testing.T) {
now := time.Now()

// simple exporter that holds the spans in an array
trace.ApplyConfig(trace.Config{DefaultSampler: trace.AlwaysSample()})
var te testExporter
trace.RegisterExporter(&te)
defer trace.UnregisterExporter(&te)

t.Run("RecordCallingInvoker error", func(t *testing.T) {

evt := func() event.Event {
e := event.Event{
Context: event.EventContextV03{
Type: "unit.test.client",
Source: *types.ParseURIRef("/unit/test/client"),
Time: &types.Timestamp{Time: now},
ID: "AABBCCDDEE",
}.AsV03(),
}
_ = e.SetData(event.ApplicationJSON, &map[string]string{
"sq": "42",
"msg": "hello",
})
return e
}()

p, err := obshttp.NewObservedHTTP(cehttp.WithPort(0))
require.NoError(t, err)
c, err := client.New(p, client.WithObservabilityService(fakeObservabilityServiceWithError{}))
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.TODO())
go func() {
require.NoError(t, c.StartReceiver(ctx, func(ctx context.Context, e event.Event) protocol.Result {
return protocol.NewReceipt(false, "some error happened within the receiver")
}))
}()
time.Sleep(5 * time.Millisecond) // let the server start

target := fmt.Sprintf("http://localhost:%d", p.GetListeningPort())
sender := simpleTracingBinaryClient(t, target, New())

ctx, span := trace.StartSpan(context.TODO(), "test-recieve-span-error")
result := sender.Send(ctx, evt)
span.End()

require.False(t, protocol.IsACK(result))

// 1 span from the test
// 2 spans from sending the event (http client auto-instrumentation + obs service)
// 2 spans from receiving the event (http client middleware + obs service)
assert.Equal(t, 5, len(te.spans))

obsSpan := te.spans[0]

// The span created by the observability service should have the error that came from the receiver fn
assert.Equal(t, int32(trace.StatusCodeUnknown), obsSpan.Status.Code)
assert.Equal(t, "some error happened within the receiver", obsSpan.Status.Message)

// Now stop the client
cancel()
})
}

func TestTracingClientSend(t *testing.T) {
now := time.Now()

testCases := map[string]struct {
c func(t *testing.T, target string) client.Client
c func(t *testing.T, target string, os client.ObservabilityService) client.Client
event event.Event
resp *http.Response
sample bool
Expand Down Expand Up @@ -163,7 +231,7 @@ func TestTracingClientSend(t *testing.T) {
server := httptest.NewServer(handler)
defer server.Close()

c := tc.c(t, server.URL)
c := tc.c(t, server.URL, New())

var sampler trace.Sampler
if tc.sample {
Expand Down Expand Up @@ -203,6 +271,67 @@ func TestTracingClientSend(t *testing.T) {
}
}

func TestTracingClientSendError(t *testing.T) {

// simple exporter that holds the spans in an array
trace.ApplyConfig(trace.Config{DefaultSampler: trace.AlwaysSample()})
var te testExporter
trace.RegisterExporter(&te)
defer trace.UnregisterExporter(&te)

now := time.Now()

ts := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Add("Content-Type", "application/json")
w.WriteHeader(500)
w.Write([]byte(`some error happened`))
}),
)
defer ts.Close()

t.Run("RecordSendingEvent error", func(t *testing.T) {

sender := simpleTracingBinaryClient(t, ts.URL, fakeObservabilityServiceWithError{})
event := func() event.Event {
e := event.Event{
Context: event.EventContextV1{
Type: "unit.test.client",
Source: *types.ParseURIRef("/unit/test/client"),
Time: &types.Timestamp{Time: now},
ID: "AABBCCDDEE",
}.AsV1(),
}
_ = e.SetData(event.ApplicationJSON, &map[string]interface{}{
"sq": 42,
"msg": "hello",
})
return e
}()

ctx, span := trace.StartSpan(context.Background(), "test-send-span-error")

result := sender.Send(ctx, event)
span.End()

spans := te.spans

roundTripSpan := spans[0]
obsSpan := spans[1]
parent := spans[2]

assert.Equal(t, false, protocol.IsACK(result))

// the correct parents are set in the spans
assert.Equal(t, parent.SpanID, obsSpan.ParentSpanID)
assert.Equal(t, obsSpan.SpanID, roundTripSpan.ParentSpanID)

// The span created by the observability service should have the error
assert.Equal(t, int32(trace.StatusCodeUnknown), obsSpan.Status.Code)
assert.Equal(t, "500: some error happened", obsSpan.Status.Message)
})
}

type requestValidation struct {
Host string
Headers http.Header
Expand All @@ -215,6 +344,49 @@ type fakeHandler struct {
requests []requestValidation
}

type testExporter struct {
spans []*trace.SpanData
}

func (t *testExporter) ExportSpan(s *trace.SpanData) {
t.spans = append(t.spans, s)
}

type fakeObservabilityServiceWithError struct{}

func (n fakeObservabilityServiceWithError) InboundContextDecorators() []func(context.Context, binding.Message) context.Context {
return nil
}

func (n fakeObservabilityServiceWithError) RecordReceivedMalformedEvent(ctx context.Context, err error) {
}

func (n fakeObservabilityServiceWithError) RecordCallingInvoker(ctx context.Context, event *event.Event) (context.Context, func(errOrResult error)) {
ctx, span := trace.StartSpan(ctx, observability.ClientSpanName, trace.WithSpanKind(trace.SpanKindClient))

return ctx, func(errOrResult error) {
if !protocol.IsACK(errOrResult) {
span.SetStatus(trace.Status{Code: int32(trace.StatusCodeUnknown), Message: errOrResult.Error()})
}
span.End()
}
}

func (n fakeObservabilityServiceWithError) RecordSendingEvent(ctx context.Context, event event.Event) (context.Context, func(errOrResult error)) {
ctx, span := trace.StartSpan(ctx, observability.ClientSpanName, trace.WithSpanKind(trace.SpanKindClient))

return ctx, func(errOrResult error) {
if !protocol.IsACK(errOrResult) {
span.SetStatus(trace.Status{Code: int32(trace.StatusCodeUnknown), Message: errOrResult.Error()})
}
span.End()
}
}

func (n fakeObservabilityServiceWithError) RecordRequestEvent(ctx context.Context, e event.Event) (context.Context, func(errOrResult error, event *event.Event)) {
return ctx, func(errOrResult error, event *event.Event) {}
}

func (f *fakeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()

Expand Down
8 changes: 3 additions & 5 deletions v2/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,10 @@ func (c *ceClient) Send(ctx context.Context, e event.Event) protocol.Result {
return err
}

// Event has been defaulted and validated, record we are going to preform send.
// Event has been defaulted and validated, record we are going to perform send.
ctx, cb := c.observabilityService.RecordSendingEvent(ctx, e)
defer cb(err)

err = c.sender.Send(ctx, (*binding.EventMessage)(&e))
defer cb(err)
return err
}

Expand Down Expand Up @@ -160,7 +159,6 @@ func (c *ceClient) Request(ctx context.Context, e event.Event) (*event.Event, pr

// Event has been defaulted and validated, record we are going to perform request.
ctx, cb := c.observabilityService.RecordRequestEvent(ctx, e)
defer cb(err, resp)

// If provided a requester, use it to do request/response.
var msg binding.Message
Expand All @@ -186,7 +184,7 @@ func (c *ceClient) Request(ctx context.Context, e event.Event) (*event.Event, pr
} else {
resp = rs
}

defer cb(err, resp)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this change should not do anything as err and resp are defined here: https://github.com/cloudevents/sdk-go/pull/715/files#diff-bfa445e7f19a76da55d2f4a1f51a4dd9cb7e2bf54efd28fd96428078c1453732R139-R140 and are never recreated in the function

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wrong. It seems to matter for error type. which is annoying...

return resp, err
}

Expand Down
2 changes: 1 addition & 1 deletion v2/client/invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,9 @@ func (r *receiveInvoker) Invoke(ctx context.Context, m binding.Message, respFn p

var cb func(error)
ctx, cb = r.observabilityService.RecordCallingInvoker(ctx, e)
defer cb(result)

resp, result = r.fn.invoke(ctx, e)
defer cb(result)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here, can you confirm the new test fails when the defer order is changed to the original?

return
}()

Expand Down