Skip to content

Commit

Permalink
Adding trace support back in.
Browse files Browse the repository at this point in the history
Signed-off-by: Scott Nichols <snichols@vmware.com>
  • Loading branch information
Scott Nichols committed Mar 16, 2020
1 parent e5f85d9 commit 938649b
Show file tree
Hide file tree
Showing 8 changed files with 127 additions and 59 deletions.
33 changes: 33 additions & 0 deletions cmd/samples/http/receiver-traced/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package main

import (
"context"
"fmt"
"github.com/cloudevents/sdk-go/pkg/client"
"log"

cloudevents "github.com/cloudevents/sdk-go"
)

func main() {
ctx := context.Background()
p, err := cloudevents.NewHTTP()
if err != nil {
log.Fatalf("failed to create protocol: %s", err.Error())
}

c, err := cloudevents.NewClient(p)
if err != nil {
log.Fatalf("failed to create client, %v", err)
}

log.Printf("will listen on :8080\n")
log.Fatalf("failed to start receiver: %s", c.StartReceiver(ctx, receive))
}

func receive(ctx context.Context, e cloudevents.Event) {
ctx, span := client.TraceSpan(ctx, e)
defer span.End()

fmt.Printf("%s", e)
}
40 changes: 25 additions & 15 deletions pkg/client/client_observed.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package client
import (
"context"
"github.com/cloudevents/sdk-go/pkg/event"
"github.com/cloudevents/sdk-go/pkg/extensions"
"github.com/cloudevents/sdk-go/pkg/observability"
"go.opencensus.io/trace"
)
Expand All @@ -15,34 +16,43 @@ func NewObserved(protocol interface{}, opts ...Option) (Client, error) {
return nil, err
}

return &obsClient{client: client}, nil
c := &obsClient{client: client}

if err := c.applyOptions(opts...); err != nil {
return nil, err
}
return c, nil
}

type obsClient struct {
client Client

disableTracePropagation bool // TODO?
addTracing bool
}

//
//func (c *ceClient) applyOptions(opts ...Option) error {
// for _, fn := range opts {
// if err := fn(c); err != nil {
// return err
// }
// }
// return nil
//}
func (c *obsClient) applyOptions(opts ...Option) error {
for _, fn := range opts {
if err := fn(c); err != nil {
return err
}
}
return nil
}

// Send transmits the provided event on a preconfigured Protocol. Send returns
// an error if there was an an issue validating the outbound event or the
// transport returns an error.
func (c *obsClient) Send(ctx context.Context, e event.Event) error {
ctx, r := observability.NewReporter(ctx, reportSend)
ctx, span := trace.StartSpan(ctx, clientSpanName, trace.WithSpanKind(trace.SpanKindClient))
ctx, span := trace.StartSpan(ctx, observability.ClientSpanName, trace.WithSpanKind(trace.SpanKindClient))
defer span.End()
if span.IsRecordingEvents() {
span.AddAttributes(eventTraceAttributes(e.Context)...)
span.AddAttributes(EventTraceAttributes(&e)...)
}

if c.addTracing {
e.Context = e.Context.Clone()
extensions.FromSpanContext(span.SpanContext()).AddTracingAttributes(&e)
}

err := c.client.Send(ctx, e)
Expand All @@ -57,10 +67,10 @@ func (c *obsClient) Send(ctx context.Context, e event.Event) error {

func (c *obsClient) Request(ctx context.Context, e event.Event) (*event.Event, error) {
ctx, r := observability.NewReporter(ctx, reportRequest)
ctx, span := trace.StartSpan(ctx, clientSpanName, trace.WithSpanKind(trace.SpanKindClient))
ctx, span := trace.StartSpan(ctx, observability.ClientSpanName, trace.WithSpanKind(trace.SpanKindClient))
defer span.End()
if span.IsRecordingEvents() {
span.AddAttributes(eventTraceAttributes(e.Context)...)
span.AddAttributes(EventTraceAttributes(&e)...)
}

resp, err := c.client.Request(ctx, e)
Expand Down
20 changes: 9 additions & 11 deletions pkg/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func simpleBinaryClient(target string) client.Client {
return nil
}

c, err := client.New(p, client.WithoutTracePropagation(), client.WithForceBinary())
c, err := client.New(p, client.WithForceBinary())
if err != nil {
return nil
}
Expand All @@ -57,7 +57,7 @@ func simpleTracingBinaryClient(target string) client.Client {
return nil
}

c, err := client.New(p)
c, err := client.NewObserved(p, client.WithTracePropagation())
if err != nil {
return nil
}
Expand All @@ -71,7 +71,7 @@ func simpleStructuredClient(target string) client.Client {
return nil
}

c, err := client.New(p, client.WithoutTracePropagation(), client.WithForceStructured())
c, err := client.New(p, client.WithForceStructured())
if err != nil {
return nil
}
Expand Down Expand Up @@ -187,8 +187,6 @@ func TestClientSend(t *testing.T) {
}

func TestTracingClientSend(t *testing.T) {
t.Skip("skipping tracing tests for now, need to rework this for sdk v2")

now := time.Now()

testCases := map[string]struct {
Expand Down Expand Up @@ -475,8 +473,6 @@ func TestClientReceive(t *testing.T) {
}

func TestTracedClientReceive(t *testing.T) {
t.Skip("TODO: need to re-add tracedClient features into httpb")

now := time.Now()

testCases := map[string]struct {
Expand Down Expand Up @@ -518,9 +514,11 @@ func TestTracedClientReceive(t *testing.T) {

ctx, cancel := context.WithCancel(context.TODO())
go func() {
err = c.StartReceiver(ctx, func(ctx context.Context, event event.Event) (*event.Event, transport.Result) {
err = c.StartReceiver(ctx, func(ctx context.Context, e event.Event) (*event.Event, transport.Result) {
go func() {
spanContexts <- trace.FromContext(ctx).SpanContext()
_, span := client.TraceSpan(ctx, e)
defer span.End()
spanContexts <- span.SpanContext()
}()
return nil, nil
})
Expand All @@ -531,10 +529,10 @@ func TestTracedClientReceive(t *testing.T) {
time.Sleep(5 * time.Millisecond) // let the server start

target := fmt.Sprintf("http://localhost:%d", p.GetPort())
client := simpleBinaryClient(target)
sender := simpleTracingBinaryClient(target)

ctx, span := trace.StartSpan(context.TODO(), "test-span")
err = client.Send(ctx, tc.event)
err = sender.Send(ctx, tc.event)
span.End()

if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/client/invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func newReceiveInvoker(fn interface{}, fns ...EventDefaulter) (Invoker, error) {

type receiveInvoker struct {
fn *receiverFn
eventDefaulterFns []EventDefaulter // TODO: set these.
eventDefaulterFns []EventDefaulter
}

func (r *receiveInvoker) Invoke(ctx context.Context, m binding.Message, respFn transport.ResponseFn) (err error) {
Expand Down
31 changes: 23 additions & 8 deletions pkg/client/observability.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package client

import (
"context"
"github.com/cloudevents/sdk-go/pkg/event"
"github.com/cloudevents/sdk-go/pkg/extensions"
"github.com/cloudevents/sdk-go/pkg/observability"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
Expand Down Expand Up @@ -31,8 +33,6 @@ type observed int32
var _ observability.Observable = observed(0)

const (
clientSpanName = "cloudevents.client"

specversionAttr = "cloudevents.specversion"
typeAttr = "cloudevents.type"
sourceAttr = "cloudevents.source"
Expand Down Expand Up @@ -63,17 +63,32 @@ func (o observed) LatencyMs() *stats.Float64Measure {
return LatencyMs
}

func eventTraceAttributes(e event.EventContextReader) []trace.Attribute {
func EventTraceAttributes(e event.EventReader) []trace.Attribute {
as := []trace.Attribute{
trace.StringAttribute(specversionAttr, e.GetSpecVersion()),
trace.StringAttribute(typeAttr, e.GetType()),
trace.StringAttribute(sourceAttr, e.GetSource()),
trace.StringAttribute(specversionAttr, e.SpecVersion()),
trace.StringAttribute(typeAttr, e.Type()),
trace.StringAttribute(sourceAttr, e.Source()),
}
if sub := e.GetSubject(); sub != "" {
if sub := e.Subject(); sub != "" {
as = append(as, trace.StringAttribute(subjectAttr, sub))
}
if dct := e.GetDataContentType(); dct != "" {
if dct := e.DataContentType(); dct != "" {
as = append(as, trace.StringAttribute(datacontenttypeAttr, dct))
}
return as
}

// TraceSpan returns context and trace.Span based on event. Caller must call span.End()
func TraceSpan(ctx context.Context, e event.Event) (context.Context, *trace.Span) {
var span *trace.Span
if ext, ok := extensions.GetDistributedTracingExtension(e); ok {
ctx, span = ext.StartChildSpan(ctx, observability.ClientSpanName, trace.WithSpanKind(trace.SpanKindServer))
}
if span == nil {
ctx, span = trace.StartSpan(ctx, observability.ClientSpanName, trace.WithSpanKind(trace.SpanKindServer))
}
if span.IsRecordingEvents() {
span.AddAttributes(EventTraceAttributes(&e)...)
}
return ctx, span
}
48 changes: 30 additions & 18 deletions pkg/client/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,56 +6,68 @@ import (
)

// Option is the function signature required to be considered an client.Option.
type Option func(*ceClient) error
type Option func(interface{}) error

// WithEventDefaulter adds an event defaulter to the end of the defaulter chain.
func WithEventDefaulter(fn EventDefaulter) Option {
return func(c *ceClient) error {
if fn == nil {
return fmt.Errorf("client option was given an nil event defaulter")
return func(i interface{}) error {
if c, ok := i.(*ceClient); ok {
if fn == nil {
return fmt.Errorf("client option was given an nil event defaulter")
}
c.eventDefaulterFns = append(c.eventDefaulterFns, fn)
}
c.eventDefaulterFns = append(c.eventDefaulterFns, fn)
return nil
}
}

func WithForceBinary() Option {
return func(c *ceClient) error {
c.outboundContextDecorators = append(c.outboundContextDecorators, binding.WithForceBinary)
return func(i interface{}) error {
if c, ok := i.(*ceClient); ok {
c.outboundContextDecorators = append(c.outboundContextDecorators, binding.WithForceBinary)
}
return nil
}
}

func WithForceStructured() Option {
return func(c *ceClient) error {
c.outboundContextDecorators = append(c.outboundContextDecorators, binding.WithForceStructured)
return func(i interface{}) error {
if c, ok := i.(*ceClient); ok {
c.outboundContextDecorators = append(c.outboundContextDecorators, binding.WithForceStructured)
}
return nil
}
}

// WithUUIDs adds DefaultIDToUUIDIfNotSet event defaulter to the end of the
// defaulter chain.
func WithUUIDs() Option {
return func(c *ceClient) error {
c.eventDefaulterFns = append(c.eventDefaulterFns, DefaultIDToUUIDIfNotSet)
return func(i interface{}) error {
if c, ok := i.(*ceClient); ok {
c.eventDefaulterFns = append(c.eventDefaulterFns, DefaultIDToUUIDIfNotSet)
}
return nil
}
}

// WithTimeNow adds DefaultTimeToNowIfNotSet event defaulter to the end of the
// defaulter chain.
func WithTimeNow() Option {
return func(c *ceClient) error {
c.eventDefaulterFns = append(c.eventDefaulterFns, DefaultTimeToNowIfNotSet)
return func(i interface{}) error {
if c, ok := i.(*ceClient); ok {
c.eventDefaulterFns = append(c.eventDefaulterFns, DefaultTimeToNowIfNotSet)
}
return nil
}
}

// WithoutTracePropagation disables automatic trace propagation via
// the distributed tracing extension.
func WithoutTracePropagation() Option {
return func(c *ceClient) error {
//c.disableTracePropagation = true
// WithTracePropagation enables trace propagation via the distributed tracing
// extension.
func WithTracePropagation() Option {
return func(i interface{}) error {
if c, ok := i.(*obsClient); ok {
c.addTracing = true
}
return nil
}
}
9 changes: 3 additions & 6 deletions pkg/extensions/distributed_tracing_extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ const (

// EventTracer interface allows setting extension for cloudevents context.
type EventTracer interface {
SetExtension(k string, v interface{}) error
SetExtension(k string, v interface{})
}

// DistributedTracingExtension represents the extension for cloudevents context
Expand All @@ -31,7 +31,7 @@ type DistributedTracingExtension struct {
}

// AddTracingAttributes adds the tracing attributes traceparent and tracestate to the cloudevents context
func (d DistributedTracingExtension) AddTracingAttributes(ec EventTracer) error {
func (d DistributedTracingExtension) AddTracingAttributes(ec EventTracer) {
if d.TraceParent != "" {
value := reflect.ValueOf(d)
typeOf := value.Type()
Expand All @@ -42,12 +42,9 @@ func (d DistributedTracingExtension) AddTracingAttributes(ec EventTracer) error
if k == TraceStateExtension && v == "" {
continue
}
if err := ec.SetExtension(k, v); err != nil {
return err
}
ec.SetExtension(k, v)
}
}
return nil
}

func GetDistributedTracingExtension(event event.Event) (DistributedTracingExtension, bool) {
Expand Down
3 changes: 3 additions & 0 deletions pkg/observability/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ var (
)

const (
// ClientSpanName is the key used to start spans from the client.
ClientSpanName = "cloudevents.client"

// ResultError is a shared result tag value for error.
ResultError = "error"
// ResultOK is a shared result tag value for success.
Expand Down

0 comments on commit 938649b

Please sign in to comment.