Skip to content

Commit

Permalink
feat(pubsub): allow trace extraction from protobuf message (#10827)
Browse files Browse the repository at this point in the history
Export the function NewMessageCarrierFromPB which, given a protobuf
PubsubMessage, returns a TextMapCarrier which can be used to extract
trace contexts.

This allows it to be used in e.g. HTTP push endpoints.

Co-authored-by: Alex Hong <9397363+hongalex@users.noreply.github.com>
  • Loading branch information
radhus and hongalex authored Sep 6, 2024
1 parent 48addbf commit caa826c
Showing 1 changed file with 17 additions and 7 deletions.
24 changes: 17 additions & 7 deletions pubsub/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"log"
"sync"

pb "cloud.google.com/go/pubsub/apiv1/pubsubpb"
"cloud.google.com/go/pubsub/internal"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
Expand Down Expand Up @@ -273,33 +274,42 @@ func tracer() trace.Tracer {

var _ propagation.TextMapCarrier = (*messageCarrier)(nil)

// messageCarrier injects and extracts traces from a pubsub.Message.
// messageCarrier injects and extracts traces from pubsub.Message attributes.
type messageCarrier struct {
msg *Message
attributes map[string]string
}

const googclientPrefix string = "googclient_"

// newMessageCarrier creates a new PubsubMessageCarrier.
func newMessageCarrier(msg *Message) messageCarrier {
return messageCarrier{msg: msg}
return messageCarrier{attributes: msg.Attributes}
}

// NewMessageCarrierFromPB creates a propagation.TextMapCarrier that can be used to extract the trace
// context from a protobuf PubsubMessage.
//
// Example:
// ctx = propagation.TraceContext{}.Extract(ctx, pubsub.NewMessageCarrierFromPB(msg))
func NewMessageCarrierFromPB(msg *pb.PubsubMessage) propagation.TextMapCarrier {
return messageCarrier{attributes: msg.Attributes}
}

// Get retrieves a single value for a given key.
func (c messageCarrier) Get(key string) string {
return c.msg.Attributes[googclientPrefix+key]
return c.attributes[googclientPrefix+key]
}

// Set sets an attribute.
func (c messageCarrier) Set(key, val string) {
c.msg.Attributes[googclientPrefix+key] = val
c.attributes[googclientPrefix+key] = val
}

// Keys returns a slice of all keys in the carrier.
func (c messageCarrier) Keys() []string {
i := 0
out := make([]string, len(c.msg.Attributes))
for k := range c.msg.Attributes {
out := make([]string, len(c.attributes))
for k := range c.attributes {
out[i] = k
i++
}
Expand Down

0 comments on commit caa826c

Please sign in to comment.