Skip to content

Commit

Permalink
Adding trace support back in. (#388)
Browse files Browse the repository at this point in the history
Signed-off-by: Scott Nichols <snichols@vmware.com>
  • Loading branch information
n3wscott authored Mar 16, 2020
1 parent 937d4d2 commit c9bd2c3
Show file tree
Hide file tree
Showing 14 changed files with 134 additions and 77 deletions.
9 changes: 5 additions & 4 deletions alias.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,16 @@ var (
// Client Creation

NewClient = client.New
NewClientObserved = client.NewObserved
NewDefaultClient = client.NewDefault
NewHTTPReceiveHandler = client.NewHTTPReceiveHandler

// Client Options

WithEventDefaulter = client.WithEventDefaulter
WithUUIDs = client.WithUUIDs
WithTimeNow = client.WithTimeNow
WithoutTracePropagation = client.WithoutTracePropagation
WithEventDefaulter = client.WithEventDefaulter
WithUUIDs = client.WithUUIDs
WithTimeNow = client.WithTimeNow
WithTracePropagation = client.WithTracePropagation()

// Event Creation

Expand Down
33 changes: 33 additions & 0 deletions cmd/samples/http/receiver-traced/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package main

import (
"context"
"fmt"
"github.com/cloudevents/sdk-go/pkg/client"
"log"

cloudevents "github.com/cloudevents/sdk-go"
)

func main() {
ctx := context.Background()
p, err := cloudevents.NewHTTP()
if err != nil {
log.Fatalf("failed to create protocol: %s", err.Error())
}

c, err := cloudevents.NewClientObserved(p, cloudevents.WithTracePropagation)
if err != nil {
log.Fatalf("failed to create client, %v", err)
}

log.Printf("will listen on :8080\n")
log.Fatalf("failed to start receiver: %s", c.StartReceiver(ctx, receive))
}

func receive(ctx context.Context, e cloudevents.Event) {
ctx, span := client.TraceSpan(ctx, e)
defer span.End()

fmt.Printf("%s", e)
}
1 change: 0 additions & 1 deletion cmd/samples/httpb/responder/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ func main() {
c, err := cloudevents.NewClient(p,
cloudevents.WithUUIDs(),
cloudevents.WithTimeNow(),
cloudevents.WithoutTracePropagation(),
)
if err != nil {
log.Fatalf("failed to create client: %s", err.Error())
Expand Down
40 changes: 25 additions & 15 deletions pkg/client/client_observed.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package client
import (
"context"
"github.com/cloudevents/sdk-go/pkg/event"
"github.com/cloudevents/sdk-go/pkg/extensions"
"github.com/cloudevents/sdk-go/pkg/observability"
"go.opencensus.io/trace"
)
Expand All @@ -15,34 +16,43 @@ func NewObserved(protocol interface{}, opts ...Option) (Client, error) {
return nil, err
}

return &obsClient{client: client}, nil
c := &obsClient{client: client}

if err := c.applyOptions(opts...); err != nil {
return nil, err
}
return c, nil
}

type obsClient struct {
client Client

disableTracePropagation bool // TODO?
addTracing bool
}

//
//func (c *ceClient) applyOptions(opts ...Option) error {
// for _, fn := range opts {
// if err := fn(c); err != nil {
// return err
// }
// }
// return nil
//}
func (c *obsClient) applyOptions(opts ...Option) error {
for _, fn := range opts {
if err := fn(c); err != nil {
return err
}
}
return nil
}

// Send transmits the provided event on a preconfigured Protocol. Send returns
// an error if there was an an issue validating the outbound event or the
// transport returns an error.
func (c *obsClient) Send(ctx context.Context, e event.Event) error {
ctx, r := observability.NewReporter(ctx, reportSend)
ctx, span := trace.StartSpan(ctx, clientSpanName, trace.WithSpanKind(trace.SpanKindClient))
ctx, span := trace.StartSpan(ctx, observability.ClientSpanName, trace.WithSpanKind(trace.SpanKindClient))
defer span.End()
if span.IsRecordingEvents() {
span.AddAttributes(eventTraceAttributes(e.Context)...)
span.AddAttributes(EventTraceAttributes(&e)...)
}

if c.addTracing {
e.Context = e.Context.Clone()
extensions.FromSpanContext(span.SpanContext()).AddTracingAttributes(&e)
}

err := c.client.Send(ctx, e)
Expand All @@ -57,10 +67,10 @@ func (c *obsClient) Send(ctx context.Context, e event.Event) error {

func (c *obsClient) Request(ctx context.Context, e event.Event) (*event.Event, error) {
ctx, r := observability.NewReporter(ctx, reportRequest)
ctx, span := trace.StartSpan(ctx, clientSpanName, trace.WithSpanKind(trace.SpanKindClient))
ctx, span := trace.StartSpan(ctx, observability.ClientSpanName, trace.WithSpanKind(trace.SpanKindClient))
defer span.End()
if span.IsRecordingEvents() {
span.AddAttributes(eventTraceAttributes(e.Context)...)
span.AddAttributes(EventTraceAttributes(&e)...)
}

resp, err := c.client.Request(ctx, e)
Expand Down
20 changes: 9 additions & 11 deletions pkg/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func simpleBinaryClient(target string) client.Client {
return nil
}

c, err := client.New(p, client.WithoutTracePropagation(), client.WithForceBinary())
c, err := client.New(p, client.WithForceBinary())
if err != nil {
return nil
}
Expand All @@ -57,7 +57,7 @@ func simpleTracingBinaryClient(target string) client.Client {
return nil
}

c, err := client.New(p)
c, err := client.NewObserved(p, client.WithTracePropagation())
if err != nil {
return nil
}
Expand All @@ -71,7 +71,7 @@ func simpleStructuredClient(target string) client.Client {
return nil
}

c, err := client.New(p, client.WithoutTracePropagation(), client.WithForceStructured())
c, err := client.New(p, client.WithForceStructured())
if err != nil {
return nil
}
Expand Down Expand Up @@ -187,8 +187,6 @@ func TestClientSend(t *testing.T) {
}

func TestTracingClientSend(t *testing.T) {
t.Skip("skipping tracing tests for now, need to rework this for sdk v2")

now := time.Now()

testCases := map[string]struct {
Expand Down Expand Up @@ -475,8 +473,6 @@ func TestClientReceive(t *testing.T) {
}

func TestTracedClientReceive(t *testing.T) {
t.Skip("TODO: need to re-add tracedClient features into httpb")

now := time.Now()

testCases := map[string]struct {
Expand Down Expand Up @@ -518,9 +514,11 @@ func TestTracedClientReceive(t *testing.T) {

ctx, cancel := context.WithCancel(context.TODO())
go func() {
err = c.StartReceiver(ctx, func(ctx context.Context, event event.Event) (*event.Event, transport.Result) {
err = c.StartReceiver(ctx, func(ctx context.Context, e event.Event) (*event.Event, transport.Result) {
go func() {
spanContexts <- trace.FromContext(ctx).SpanContext()
_, span := client.TraceSpan(ctx, e)
defer span.End()
spanContexts <- span.SpanContext()
}()
return nil, nil
})
Expand All @@ -531,10 +529,10 @@ func TestTracedClientReceive(t *testing.T) {
time.Sleep(5 * time.Millisecond) // let the server start

target := fmt.Sprintf("http://localhost:%d", p.GetPort())
client := simpleBinaryClient(target)
sender := simpleTracingBinaryClient(target)

ctx, span := trace.StartSpan(context.TODO(), "test-span")
err = client.Send(ctx, tc.event)
err = sender.Send(ctx, tc.event)
span.End()

if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/client/invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func newReceiveInvoker(fn interface{}, fns ...EventDefaulter) (Invoker, error) {

type receiveInvoker struct {
fn *receiverFn
eventDefaulterFns []EventDefaulter // TODO: set these.
eventDefaulterFns []EventDefaulter
}

func (r *receiveInvoker) Invoke(ctx context.Context, m binding.Message, respFn transport.ResponseFn) (err error) {
Expand Down
31 changes: 23 additions & 8 deletions pkg/client/observability.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package client

import (
"context"
"github.com/cloudevents/sdk-go/pkg/event"
"github.com/cloudevents/sdk-go/pkg/extensions"
"github.com/cloudevents/sdk-go/pkg/observability"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
Expand Down Expand Up @@ -31,8 +33,6 @@ type observed int32
var _ observability.Observable = observed(0)

const (
clientSpanName = "cloudevents.client"

specversionAttr = "cloudevents.specversion"
typeAttr = "cloudevents.type"
sourceAttr = "cloudevents.source"
Expand Down Expand Up @@ -63,17 +63,32 @@ func (o observed) LatencyMs() *stats.Float64Measure {
return LatencyMs
}

func eventTraceAttributes(e event.EventContextReader) []trace.Attribute {
func EventTraceAttributes(e event.EventReader) []trace.Attribute {
as := []trace.Attribute{
trace.StringAttribute(specversionAttr, e.GetSpecVersion()),
trace.StringAttribute(typeAttr, e.GetType()),
trace.StringAttribute(sourceAttr, e.GetSource()),
trace.StringAttribute(specversionAttr, e.SpecVersion()),
trace.StringAttribute(typeAttr, e.Type()),
trace.StringAttribute(sourceAttr, e.Source()),
}
if sub := e.GetSubject(); sub != "" {
if sub := e.Subject(); sub != "" {
as = append(as, trace.StringAttribute(subjectAttr, sub))
}
if dct := e.GetDataContentType(); dct != "" {
if dct := e.DataContentType(); dct != "" {
as = append(as, trace.StringAttribute(datacontenttypeAttr, dct))
}
return as
}

// TraceSpan returns context and trace.Span based on event. Caller must call span.End()
func TraceSpan(ctx context.Context, e event.Event) (context.Context, *trace.Span) {
var span *trace.Span
if ext, ok := extensions.GetDistributedTracingExtension(e); ok {
ctx, span = ext.StartChildSpan(ctx, observability.ClientSpanName, trace.WithSpanKind(trace.SpanKindServer))
}
if span == nil {
ctx, span = trace.StartSpan(ctx, observability.ClientSpanName, trace.WithSpanKind(trace.SpanKindServer))
}
if span.IsRecordingEvents() {
span.AddAttributes(EventTraceAttributes(&e)...)
}
return ctx, span
}
48 changes: 30 additions & 18 deletions pkg/client/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,56 +6,68 @@ import (
)

// Option is the function signature required to be considered an client.Option.
type Option func(*ceClient) error
type Option func(interface{}) error

// WithEventDefaulter adds an event defaulter to the end of the defaulter chain.
func WithEventDefaulter(fn EventDefaulter) Option {
return func(c *ceClient) error {
if fn == nil {
return fmt.Errorf("client option was given an nil event defaulter")
return func(i interface{}) error {
if c, ok := i.(*ceClient); ok {
if fn == nil {
return fmt.Errorf("client option was given an nil event defaulter")
}
c.eventDefaulterFns = append(c.eventDefaulterFns, fn)
}
c.eventDefaulterFns = append(c.eventDefaulterFns, fn)
return nil
}
}

func WithForceBinary() Option {
return func(c *ceClient) error {
c.outboundContextDecorators = append(c.outboundContextDecorators, binding.WithForceBinary)
return func(i interface{}) error {
if c, ok := i.(*ceClient); ok {
c.outboundContextDecorators = append(c.outboundContextDecorators, binding.WithForceBinary)
}
return nil
}
}

func WithForceStructured() Option {
return func(c *ceClient) error {
c.outboundContextDecorators = append(c.outboundContextDecorators, binding.WithForceStructured)
return func(i interface{}) error {
if c, ok := i.(*ceClient); ok {
c.outboundContextDecorators = append(c.outboundContextDecorators, binding.WithForceStructured)
}
return nil
}
}

// WithUUIDs adds DefaultIDToUUIDIfNotSet event defaulter to the end of the
// defaulter chain.
func WithUUIDs() Option {
return func(c *ceClient) error {
c.eventDefaulterFns = append(c.eventDefaulterFns, DefaultIDToUUIDIfNotSet)
return func(i interface{}) error {
if c, ok := i.(*ceClient); ok {
c.eventDefaulterFns = append(c.eventDefaulterFns, DefaultIDToUUIDIfNotSet)
}
return nil
}
}

// WithTimeNow adds DefaultTimeToNowIfNotSet event defaulter to the end of the
// defaulter chain.
func WithTimeNow() Option {
return func(c *ceClient) error {
c.eventDefaulterFns = append(c.eventDefaulterFns, DefaultTimeToNowIfNotSet)
return func(i interface{}) error {
if c, ok := i.(*ceClient); ok {
c.eventDefaulterFns = append(c.eventDefaulterFns, DefaultTimeToNowIfNotSet)
}
return nil
}
}

// WithoutTracePropagation disables automatic trace propagation via
// the distributed tracing extension.
func WithoutTracePropagation() Option {
return func(c *ceClient) error {
//c.disableTracePropagation = true
// WithTracePropagation enables trace propagation via the distributed tracing
// extension.
func WithTracePropagation() Option {
return func(i interface{}) error {
if c, ok := i.(*obsClient); ok {
c.addTracing = true
}
return nil
}
}
Loading

0 comments on commit c9bd2c3

Please sign in to comment.