stats/opentelemetry: CSM Observability client side component changes #7256

Merged · 6 commits · May 23, 2024
106 changes: 85 additions & 21 deletions stats/opentelemetry/client_metrics.go
@@ -22,11 +22,13 @@ import (
"time"

"google.golang.org/grpc"
istats "google.golang.org/grpc/internal/stats"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
otelattribute "go.opentelemetry.io/otel/attribute"
otelmetric "go.opentelemetry.io/otel/metric"
)

type clientStatsHandler struct {
@@ -49,11 +51,11 @@ func (csh *clientStatsHandler) initializeMetrics() {

setOfMetrics := csh.o.MetricsOptions.Metrics.metrics

csh.clientMetrics.attemptStarted = createInt64Counter(setOfMetrics, "grpc.client.attempt.started", meter, metric.WithUnit("attempt"), metric.WithDescription("Number of client call attempts started."))
csh.clientMetrics.attemptDuration = createFloat64Histogram(setOfMetrics, "grpc.client.attempt.duration", meter, metric.WithUnit("s"), metric.WithDescription("End-to-end time taken to complete a client call attempt."), metric.WithExplicitBucketBoundaries(DefaultLatencyBounds...))
csh.clientMetrics.attemptSentTotalCompressedMessageSize = createInt64Histogram(setOfMetrics, "grpc.client.attempt.sent_total_compressed_message_size", meter, metric.WithUnit("By"), metric.WithDescription("Compressed message bytes sent per client call attempt."), metric.WithExplicitBucketBoundaries(DefaultSizeBounds...))
csh.clientMetrics.attemptRcvdTotalCompressedMessageSize = createInt64Histogram(setOfMetrics, "grpc.client.attempt.rcvd_total_compressed_message_size", meter, metric.WithUnit("By"), metric.WithDescription("Compressed message bytes received per call attempt."), metric.WithExplicitBucketBoundaries(DefaultSizeBounds...))
csh.clientMetrics.callDuration = createFloat64Histogram(setOfMetrics, "grpc.client.call.duration", meter, metric.WithUnit("s"), metric.WithDescription("Time taken by gRPC to complete an RPC from application's perspective."), metric.WithExplicitBucketBoundaries(DefaultLatencyBounds...))
csh.clientMetrics.attemptStarted = createInt64Counter(setOfMetrics, "grpc.client.attempt.started", meter, otelmetric.WithUnit("attempt"), otelmetric.WithDescription("Number of client call attempts started."))
csh.clientMetrics.attemptDuration = createFloat64Histogram(setOfMetrics, "grpc.client.attempt.duration", meter, otelmetric.WithUnit("s"), otelmetric.WithDescription("End-to-end time taken to complete a client call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultLatencyBounds...))
csh.clientMetrics.attemptSentTotalCompressedMessageSize = createInt64Histogram(setOfMetrics, "grpc.client.attempt.sent_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes sent per client call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...))
csh.clientMetrics.attemptRcvdTotalCompressedMessageSize = createInt64Histogram(setOfMetrics, "grpc.client.attempt.rcvd_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes received per call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...))
csh.clientMetrics.callDuration = createFloat64Histogram(setOfMetrics, "grpc.client.call.duration", meter, otelmetric.WithUnit("s"), otelmetric.WithDescription("Time taken by gRPC to complete an RPC from application's perspective."), otelmetric.WithExplicitBucketBoundaries(DefaultLatencyBounds...))
}

func (csh *clientStatsHandler) unaryInterceptor(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
@@ -63,6 +65,15 @@ func (csh *clientStatsHandler) unaryInterceptor(ctx context.Context, method stri
}
ctx = setCallInfo(ctx, ci)

// If a plugin option is configured, inject its metadata into the outgoing
// context so the peer can derive labels for this call.
if csh.o.MetricsOptions.pluginOption != nil {
md := csh.o.MetricsOptions.pluginOption.GetMetadata()
for k, vs := range md {
for _, v := range vs {
ctx = metadata.AppendToOutgoingContext(ctx, k, v)
}
}
}

startTime := time.Now()
err := invoker(ctx, method, req, reply, cc, opts...)
csh.perCallMetrics(ctx, err, startTime, ci)
@@ -98,6 +109,16 @@ func (csh *clientStatsHandler) streamInterceptor(ctx context.Context, desc *grpc
method: csh.determineMethod(method, opts...),
}
ctx = setCallInfo(ctx, ci)

// Same as the unary path: inject plugin option metadata into the
// outgoing context before starting the stream.
if csh.o.MetricsOptions.pluginOption != nil {
md := csh.o.MetricsOptions.pluginOption.GetMetadata()
for k, vs := range md {
for _, v := range vs {
ctx = metadata.AppendToOutgoingContext(ctx, k, v)
}
}
}

startTime := time.Now()

callback := func(err error) {
@@ -110,7 +131,7 @@
func (csh *clientStatsHandler) perCallMetrics(ctx context.Context, err error, startTime time.Time, ci *callInfo) {
s := status.Convert(err)
callLatency := float64(time.Since(startTime)) / float64(time.Second)
csh.clientMetrics.callDuration.Record(ctx, callLatency, metric.WithAttributes(attribute.String("grpc.method", ci.method), attribute.String("grpc.target", ci.target), attribute.String("grpc.status", canonicalString(s.Code()))))
csh.clientMetrics.callDuration.Record(ctx, callLatency, otelmetric.WithAttributes(otelattribute.String("grpc.method", ci.method), otelattribute.String("grpc.target", ci.target), otelattribute.String("grpc.status", canonicalString(s.Code()))))
}

// TagConn exists to satisfy stats.Handler.
@@ -123,11 +144,24 @@ func (csh *clientStatsHandler) HandleConn(context.Context, stats.ConnStats) {}

// TagRPC implements per RPC attempt context management.
func (csh *clientStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
mi := &metricsInfo{ // populates information about RPC start.
// Numerous stats handlers can be registered on the same channel. The
// cluster_impl balancer that writes to this map only writes once, so have
// this stats handler's per-attempt scoped context point at the same
// optional labels map if one is already set.
var labels *istats.Labels
if labels = istats.GetLabels(ctx); labels == nil {
labels = &istats.Labels{
TelemetryLabels: make(map[string]string),
}
ctx = istats.SetLabels(ctx, labels)
}
ai := &attemptInfo{ // populates information about RPC start.
startTime: time.Now(),
xdsLabels: labels.TelemetryLabels,
method: info.FullMethodName,
}
ri := &rpcInfo{
mi: mi,
ai: ai,
}
return setRPCInfo(ctx, ri)
}
@@ -138,10 +172,10 @@ func (csh *clientStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats)
logger.Error("ctx passed into client side stats handler metrics event handling has no client attempt data present")
return
}
csh.processRPCEvent(ctx, rs, ri.mi)
csh.processRPCEvent(ctx, rs, ri.ai)
}

func (csh *clientStatsHandler) processRPCEvent(ctx context.Context, s stats.RPCStats, mi *metricsInfo) {
func (csh *clientStatsHandler) processRPCEvent(ctx context.Context, s stats.RPCStats, ai *attemptInfo) {
switch st := s.(type) {
case *stats.Begin:
ci := getCallInfo(ctx)
@@ -150,34 +184,64 @@ func (csh *clientStatsHandler) processRPCEvent(ctx context.Context, s stats.RPCS
return
}

csh.clientMetrics.attemptStarted.Add(ctx, 1, metric.WithAttributes(attribute.String("grpc.method", ci.method), attribute.String("grpc.target", ci.target)))
csh.clientMetrics.attemptStarted.Add(ctx, 1, otelmetric.WithAttributes(otelattribute.String("grpc.method", ci.method), otelattribute.String("grpc.target", ci.target)))
case *stats.OutPayload:
atomic.AddInt64(&mi.sentCompressedBytes, int64(st.CompressedLength))
atomic.AddInt64(&ai.sentCompressedBytes, int64(st.CompressedLength))
case *stats.InPayload:
atomic.AddInt64(&mi.recvCompressedBytes, int64(st.CompressedLength))
atomic.AddInt64(&ai.recvCompressedBytes, int64(st.CompressedLength))
case *stats.InHeader:
csh.setLabelsFromPluginOption(ai, st.Header)
case *stats.InTrailer:
csh.setLabelsFromPluginOption(ai, st.Trailer)
case *stats.End:
csh.processRPCEnd(ctx, mi, st)
csh.processRPCEnd(ctx, ai, st)
default:
}
}

func (csh *clientStatsHandler) processRPCEnd(ctx context.Context, mi *metricsInfo, e *stats.End) {
func (csh *clientStatsHandler) setLabelsFromPluginOption(ai *attemptInfo, incomingMetadata metadata.MD) {
if ai.pluginOptionLabels == nil && csh.o.MetricsOptions.pluginOption != nil {
labels := csh.o.MetricsOptions.pluginOption.GetLabels(incomingMetadata)
if labels == nil {
labels = map[string]string{} // GetLabels shouldn't return a nil map; normalize to empty so later header/trailer events don't trigger another lookup for this attempt.
}
ai.pluginOptionLabels = labels
}
}

func (csh *clientStatsHandler) processRPCEnd(ctx context.Context, ai *attemptInfo, e *stats.End) {
ci := getCallInfo(ctx)
if ci == nil {
logger.Error("ctx passed into client side stats handler metrics event handling has no metrics data present")
return
}
latency := float64(time.Since(mi.startTime)) / float64(time.Second)
latency := float64(time.Since(ai.startTime)) / float64(time.Second)
st := "OK"
if e.Error != nil {
s, _ := status.FromError(e.Error)
st = canonicalString(s.Code())
}

clientAttributeOption := metric.WithAttributes(attribute.String("grpc.method", ci.method), attribute.String("grpc.target", ci.target), attribute.String("grpc.status", st))
attributes := []otelattribute.KeyValue{
otelattribute.String("grpc.method", ci.method),
otelattribute.String("grpc.target", ci.target),
otelattribute.String("grpc.status", st),
}

for k, v := range ai.pluginOptionLabels {
attributes = append(attributes, otelattribute.String(k, v))
}

for _, o := range csh.o.MetricsOptions.OptionalLabels {
if val, ok := ai.xdsLabels[o]; ok {
attributes = append(attributes, otelattribute.String(o, val))
}
}

clientAttributeOption := otelmetric.WithAttributes(attributes...)
csh.clientMetrics.attemptDuration.Record(ctx, latency, clientAttributeOption)
csh.clientMetrics.attemptSentTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&mi.sentCompressedBytes), clientAttributeOption)
csh.clientMetrics.attemptRcvdTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&mi.recvCompressedBytes), clientAttributeOption)
csh.clientMetrics.attemptSentTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&ai.sentCompressedBytes), clientAttributeOption)
csh.clientMetrics.attemptRcvdTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&ai.recvCompressedBytes), clientAttributeOption)
}

const (
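Note on the plugin option interface: the interceptors and the *stats.InHeader/*stats.InTrailer cases above exercise exactly two methods of the unexported otelinternal.PluginOption — GetMetadata(), whose contents are appended to the outgoing context before the RPC starts, and GetLabels(metadata.MD), which turns incoming metadata into metric labels. Below is a minimal sketch of a type with that shape; the type name, header key, and label format are hypothetical, not the real CSM plugin option.

package main

import (
	"strings"

	"google.golang.org/grpc/metadata"
)

// staticPluginOption is a hypothetical plugin option: it advertises fixed
// local labels in a single metadata header and turns the peer's copy of that
// header back into metric labels.
type staticPluginOption struct {
	headerKey string            // metadata key used for the exchange (illustrative)
	local     map[string]string // labels describing the local workload
}

// GetMetadata returns the metadata to inject into the outgoing context; the
// interceptors above append each value via metadata.AppendToOutgoingContext.
func (s *staticPluginOption) GetMetadata() metadata.MD {
	md := metadata.MD{}
	for k, v := range s.local {
		md.Append(s.headerKey, k+"="+v)
	}
	return md
}

// GetLabels parses the peer's header or trailer into metric labels. The stats
// handler treats a nil result as "no labels" and normalizes it to an empty
// map, so the plugin option is consulted only once per attempt.
func (s *staticPluginOption) GetLabels(incoming metadata.MD) map[string]string {
	labels := map[string]string{}
	for _, v := range incoming.Get(s.headerKey) {
		if i := strings.Index(v, "="); i > 0 {
			labels["peer."+v[:i]] = v[i+1:]
		}
	}
	return labels
}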
17 changes: 14 additions & 3 deletions stats/opentelemetry/opentelemetry.go
@@ -27,6 +27,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
otelinternal "google.golang.org/grpc/stats/opentelemetry/internal"

"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"
@@ -126,6 +127,13 @@ type MetricsOptions struct {
// grpc.StaticMethodCallOption as a call option into Invoke or NewStream.
// This only applies for server side metrics.
MethodAttributeFilter func(string) bool

// OptionalLabels are labels received from LB policies that this component
// should add to the metrics it records after receiving incoming metadata.
OptionalLabels []string

// pluginOption is used to get labels to attach to certain metrics, if set.
pluginOption otelinternal.PluginOption
}

// DialOption returns a dial option which enables OpenTelemetry instrumentation
@@ -187,7 +195,7 @@ func getCallInfo(ctx context.Context) *callInfo {
// rpcInfo is RPC information scoped to the RPC attempt life span client side,
// and the RPC life span server side.
type rpcInfo struct {
mi *metricsInfo
ai *attemptInfo
}

type rpcInfoKey struct{}
@@ -207,9 +215,9 @@ func removeLeadingSlash(mn string) string {
return strings.TrimLeft(mn, "/")
}

// metricsInfo is RPC information scoped to the RPC attempt life span client
// attemptInfo is RPC information scoped to the RPC attempt life span client
// side, and the RPC life span server side.
type metricsInfo struct {
type attemptInfo struct {
// access these counts atomically for hedging in the future:
// number of bytes after compression (within each message) from side (client
// || server).
@@ -220,6 +228,9 @@ type metricsInfo struct {

startTime time.Time
method string

pluginOptionLabels map[string]string // labels from the plugin option to attach to emitted metrics
xdsLabels map[string]string
}

type clientMetrics struct {
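Note on the labels handoff: TagRPC pre-creates the shared istats.Labels value so a later component in the attempt's path (per the comment, the cluster_impl balancer) can fill in xDS telemetry labels, which the client handler then reads from ai.xdsLabels when the attempt ends. Below is a sketch of that producer side, assuming a wrapping picker; the picker shape and label keys are illustrative, not the actual cluster_impl code.

package main

import (
	"google.golang.org/grpc/balancer"
	istats "google.golang.org/grpc/internal/stats"
)

// examplePicker is a hypothetical picker that annotates each attempt with
// xDS telemetry labels. Pick receives the same context that the stats
// handler's TagRPC already tagged with an istats.Labels value.
type examplePicker struct {
	delegate    balancer.Picker
	serviceName string
	namespace   string
}

func (p *examplePicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
	// istats.GetLabels returns the map TagRPC stored in the context; writes
	// into it are visible to the stats handler at *stats.End time.
	if labels := istats.GetLabels(info.Ctx); labels != nil {
		labels.TelemetryLabels["csm.service_name"] = p.serviceName
		labels.TelemetryLabels["csm.service_namespace_name"] = p.namespace
	}
	return p.delegate.Pick(info)
}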
26 changes: 13 additions & 13 deletions stats/opentelemetry/server_metrics.go
@@ -82,12 +82,12 @@ func (ssh *serverStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInf
}
}

mi := &metricsInfo{
ai := &attemptInfo{
startTime: time.Now(),
method: removeLeadingSlash(method),
}
ri := &rpcInfo{
mi: mi,
ai: ai,
}
return setRPCInfo(ctx, ri)
}
Expand All @@ -99,35 +99,35 @@ func (ssh *serverStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats)
logger.Error("ctx passed into server side stats handler metrics event handling has no server call data present")
return
}
ssh.processRPCData(ctx, rs, ri.mi)
ssh.processRPCData(ctx, rs, ri.ai)
}

func (ssh *serverStatsHandler) processRPCData(ctx context.Context, s stats.RPCStats, mi *metricsInfo) {
func (ssh *serverStatsHandler) processRPCData(ctx context.Context, s stats.RPCStats, ai *attemptInfo) {
switch st := s.(type) {
case *stats.InHeader:
ssh.serverMetrics.callStarted.Add(ctx, 1, metric.WithAttributes(attribute.String("grpc.method", mi.method)))
ssh.serverMetrics.callStarted.Add(ctx, 1, metric.WithAttributes(attribute.String("grpc.method", ai.method)))
case *stats.OutPayload:
atomic.AddInt64(&mi.sentCompressedBytes, int64(st.CompressedLength))
atomic.AddInt64(&ai.sentCompressedBytes, int64(st.CompressedLength))
case *stats.InPayload:
atomic.AddInt64(&mi.recvCompressedBytes, int64(st.CompressedLength))
atomic.AddInt64(&ai.recvCompressedBytes, int64(st.CompressedLength))
case *stats.End:
ssh.processRPCEnd(ctx, mi, st)
ssh.processRPCEnd(ctx, ai, st)
default:
}
}

func (ssh *serverStatsHandler) processRPCEnd(ctx context.Context, mi *metricsInfo, e *stats.End) {
latency := float64(time.Since(mi.startTime)) / float64(time.Second)
func (ssh *serverStatsHandler) processRPCEnd(ctx context.Context, ai *attemptInfo, e *stats.End) {
latency := float64(time.Since(ai.startTime)) / float64(time.Second)
st := "OK"
if e.Error != nil {
s, _ := status.FromError(e.Error)
st = canonicalString(s.Code())
}
serverAttributeOption := metric.WithAttributes(attribute.String("grpc.method", mi.method), attribute.String("grpc.status", st))
serverAttributeOption := metric.WithAttributes(attribute.String("grpc.method", ai.method), attribute.String("grpc.status", st))

ssh.serverMetrics.callDuration.Record(ctx, latency, serverAttributeOption)
ssh.serverMetrics.callSentTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&mi.sentCompressedBytes), serverAttributeOption)
ssh.serverMetrics.callRcvdTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&mi.recvCompressedBytes), serverAttributeOption)
ssh.serverMetrics.callSentTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&ai.sentCompressedBytes), serverAttributeOption)
ssh.serverMetrics.callRcvdTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&ai.recvCompressedBytes), serverAttributeOption)
}

const (
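Putting it together, here is a sketch of client-side wiring for these options. It assumes the package's exported Options/DialOption API shown above; the target and optional label name are illustrative, and the unexported pluginOption is set internally (e.g. by a CSM integration), not by applications.

package main

import (
	"log"

	sdkmetric "go.opentelemetry.io/otel/sdk/metric"

	"google.golang.org/grpc"
	"google.golang.org/grpc/stats/opentelemetry"
)

func main() {
	// A real application would register exporters/readers on the provider.
	provider := sdkmetric.NewMeterProvider()

	do := opentelemetry.DialOption(opentelemetry.Options{
		MetricsOptions: opentelemetry.MetricsOptions{
			MeterProvider: provider,
			// Record this xDS label on attempt metrics when the LB policy
			// supplies it ("grpc.lb.locality" is an illustrative name).
			OptionalLabels: []string{"grpc.lb.locality"},
		},
	})

	cc, err := grpc.NewClient("dns:///example.test:443", do)
	if err != nil {
		log.Fatalf("grpc.NewClient failed: %v", err)
	}
	defer cc.Close()
	// Use cc to create stubs; per-attempt and per-call metrics are then
	// recorded through the configured MeterProvider.
}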