Skip to content

Commit

Permalink
feat(v0.4.5): v0.4.5
Browse files Browse the repository at this point in the history
  • Loading branch information
Ccheers committed Jan 21, 2025
1 parent 646a508 commit 63ed326
Show file tree
Hide file tree
Showing 8 changed files with 840 additions and 0 deletions.
46 changes: 46 additions & 0 deletions adapter/kratos/middleware/ktracing/metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package ktracing

import (
"context"

"go.opentelemetry.io/otel/propagation"

"github.com/go-kratos/kratos/v2"
"github.com/go-kratos/kratos/v2/metadata"
)

const serviceHeader = "x-md-service-name"

// Metadata is tracing metadata propagator
type Metadata struct{}

var _ propagation.TextMapPropagator = Metadata{}

// Inject sets metadata key-values from ctx into the carrier.
func (b Metadata) Inject(ctx context.Context, carrier propagation.TextMapCarrier) {
app, ok := kratos.FromContext(ctx)
if ok {
carrier.Set(serviceHeader, app.Name())
}
}

// Extract returns a copy of parent with the metadata from the carrier added.
func (b Metadata) Extract(parent context.Context, carrier propagation.TextMapCarrier) context.Context {
name := carrier.Get(serviceHeader)
if name == "" {
return parent
}
if md, ok := metadata.FromServerContext(parent); ok {
md.Set(serviceHeader, name)
return parent
}
md := metadata.New()
md.Set(serviceHeader, name)
parent = metadata.NewServerContext(parent, md)
return parent
}

// Fields returns the keys who's values are set with Inject.
func (b Metadata) Fields() []string {
return []string{serviceHeader}
}
107 changes: 107 additions & 0 deletions adapter/kratos/middleware/ktracing/metadata_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package ktracing

import (
"context"
"reflect"
"testing"

"github.com/go-kratos/kratos/v2"
"github.com/go-kratos/kratos/v2/metadata"

"go.opentelemetry.io/otel/propagation"
)

func TestMetadata_Inject(t *testing.T) {
type args struct {
appName string
carrier propagation.TextMapCarrier
}
tests := []struct {
name string
args args
want string
}{
{
name: "https://go-kratos.dev",
args: args{"https://go-kratos.dev", propagation.HeaderCarrier{}},
want: "https://go-kratos.dev",
},
{
name: "https://github.com/go-kratos/kratos",
args: args{"https://github.com/go-kratos/kratos", propagation.HeaderCarrier{"mode": []string{"test"}}},
want: "https://github.com/go-kratos/kratos",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
a := kratos.New(kratos.Name(tt.args.appName))
ctx := kratos.NewContext(context.Background(), a)
m := new(Metadata)
m.Inject(ctx, tt.args.carrier)
if res := tt.args.carrier.Get(serviceHeader); tt.want != res {
t.Errorf("Get(serviceHeader) :%s want: %s", res, tt.want)
}
})
}
}

func TestMetadata_Extract(t *testing.T) {
type args struct {
parent context.Context
carrier propagation.TextMapCarrier
}
tests := []struct {
name string
args args
want string
crash bool
}{
{
name: "https://go-kratos.dev",
args: args{
parent: context.Background(),
carrier: propagation.HeaderCarrier{"X-Md-Service-Name": []string{"https://go-kratos.dev"}},
},
want: "https://go-kratos.dev",
},
{
name: "https://github.com/go-kratos/kratos",
args: args{
parent: metadata.NewServerContext(context.Background(), metadata.Metadata{}),
carrier: propagation.HeaderCarrier{"X-Md-Service-Name": []string{"https://github.com/go-kratos/kratos"}},
},
want: "https://github.com/go-kratos/kratos",
},
{
name: "https://github.com/go-kratos/kratos",
args: args{
parent: metadata.NewServerContext(context.Background(), metadata.Metadata{}),
carrier: propagation.HeaderCarrier{"X-Md-Service-Name": nil},
},
crash: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
b := Metadata{}
ctx := b.Extract(tt.args.parent, tt.args.carrier)
md, ok := metadata.FromServerContext(ctx)
if !ok {
if tt.crash {
return
}
t.Errorf("expect %v, got %v", true, ok)
}
if !reflect.DeepEqual(md.Get(serviceHeader), tt.want) {
t.Errorf("expect %v, got %v", tt.want, md.Get(serviceHeader))
}
})
}
}

func TestFields(t *testing.T) {
b := Metadata{}
if !reflect.DeepEqual(b.Fields(), []string{"x-md-service-name"}) {
t.Errorf("expect %v, got %v", []string{"x-md-service-name"}, b.Fields())
}
}
152 changes: 152 additions & 0 deletions adapter/kratos/middleware/ktracing/span.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package ktracing

import (
"context"
"net"
"net/url"
"strings"

"github.com/go-kratos/kratos/v2/metadata"
"github.com/go-kratos/kratos/v2/transport"
"github.com/go-kratos/kratos/v2/transport/http"

"go.opentelemetry.io/otel/attribute"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc/peer"
"google.golang.org/protobuf/proto"
)

func setClientSpan(ctx context.Context, span trace.Span, m interface{}) {
var (
attrs []attribute.KeyValue
remote string
operation string
rpcKind string
)
tr, ok := transport.FromClientContext(ctx)
if ok {
operation = tr.Operation()
rpcKind = tr.Kind().String()
switch tr.Kind() {
case transport.KindHTTP:
if ht, ok := tr.(http.Transporter); ok {
method := ht.Request().Method
route := ht.PathTemplate()
path := ht.Request().URL.Path
attrs = append(attrs, semconv.HTTPMethodKey.String(method))
attrs = append(attrs, semconv.HTTPRouteKey.String(route))
attrs = append(attrs, semconv.HTTPTargetKey.String(path))
remote = ht.Request().Host
}
case transport.KindGRPC:
remote, _ = parseTarget(tr.Endpoint())
}
}
attrs = append(attrs, semconv.RPCSystemKey.String(rpcKind))
_, mAttrs := parseFullMethod(operation)
attrs = append(attrs, mAttrs...)
if remote != "" {
attrs = append(attrs, peerAttr(remote)...)
}
if p, ok := m.(proto.Message); ok {
attrs = append(attrs, attribute.Key("send_msg.size").Int(proto.Size(p)))
}

span.SetAttributes(attrs...)
}

func setServerSpan(ctx context.Context, span trace.Span, m interface{}) {
var (
attrs []attribute.KeyValue
remote string
operation string
rpcKind string
)
tr, ok := transport.FromServerContext(ctx)
if ok {
operation = tr.Operation()
rpcKind = tr.Kind().String()
switch tr.Kind() {
case transport.KindHTTP:
if ht, ok := tr.(http.Transporter); ok {
method := ht.Request().Method
route := ht.PathTemplate()
path := ht.Request().URL.Path
attrs = append(attrs, semconv.HTTPMethodKey.String(method))
attrs = append(attrs, semconv.HTTPRouteKey.String(route))
attrs = append(attrs, semconv.HTTPTargetKey.String(path))
remote = ht.Request().RemoteAddr
}
case transport.KindGRPC:
if p, ok := peer.FromContext(ctx); ok {
remote = p.Addr.String()
}
}
}
attrs = append(attrs, semconv.RPCSystemKey.String(rpcKind))
_, mAttrs := parseFullMethod(operation)
attrs = append(attrs, mAttrs...)
attrs = append(attrs, peerAttr(remote)...)
if p, ok := m.(proto.Message); ok {
attrs = append(attrs, attribute.Key("recv_msg.size").Int(proto.Size(p)))
}
if md, ok := metadata.FromServerContext(ctx); ok {
attrs = append(attrs, semconv.PeerServiceKey.String(md.Get(serviceHeader)))
}

span.SetAttributes(attrs...)
}

// parseFullMethod returns a span name following the OpenTelemetry semantic
// conventions as well as all applicable span attribute.KeyValue attributes based
// on a gRPC's FullMethod.
func parseFullMethod(fullMethod string) (string, []attribute.KeyValue) {
name := strings.TrimLeft(fullMethod, "/")
parts := strings.SplitN(name, "/", 2)
if len(parts) != 2 { //nolint:mnd
// Invalid format, does not follow `/package.service/method`.
return name, []attribute.KeyValue{attribute.Key("rpc.operation").String(fullMethod)}
}

var attrs []attribute.KeyValue
if service := parts[0]; service != "" {
attrs = append(attrs, semconv.RPCServiceKey.String(service))
}
if method := parts[1]; method != "" {
attrs = append(attrs, semconv.RPCMethodKey.String(method))
}
return name, attrs
}

// peerAttr returns attributes about the peer address.
func peerAttr(addr string) []attribute.KeyValue {
host, port, err := net.SplitHostPort(addr)
if err != nil {
return []attribute.KeyValue(nil)
}

if host == "" {
host = "127.0.0.1"
}

return []attribute.KeyValue{
semconv.NetPeerIPKey.String(host),
semconv.NetPeerPortKey.String(port),
}
}

func parseTarget(endpoint string) (address string, err error) {
var u *url.URL
u, err = url.Parse(endpoint)
if err != nil {
if u, err = url.Parse("http://" + endpoint); err != nil {
return "", err
}
return u.Host, nil
}
if len(u.Path) > 1 {
return u.Path[1:], nil
}
return endpoint, nil
}
43 changes: 43 additions & 0 deletions adapter/kratos/middleware/ktracing/statshandler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package ktracing

import (
"context"
"fmt"

"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/stats"
)

// ClientHandler is tracing ClientHandler
type ClientHandler struct{}

// HandleConn exists to satisfy gRPC stats.Handler.
func (c *ClientHandler) HandleConn(_ context.Context, _ stats.ConnStats) {
fmt.Println("Handle connection.")
}

// TagConn exists to satisfy gRPC stats.Handler.
func (c *ClientHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context {
return ctx
}

// HandleRPC implements per-RPC tracing and stats instrumentation.
func (c *ClientHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
if _, ok := rs.(*stats.OutHeader); !ok {
return
}
p, ok := peer.FromContext(ctx)
if !ok {
return
}
span := trace.SpanFromContext(ctx)
if span.SpanContext().IsValid() {
span.SetAttributes(peerAttr(p.Addr.String())...)
}
}

// TagRPC implements per-RPC context management.
func (c *ClientHandler) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context {
return ctx
}
Loading

0 comments on commit 63ed326

Please sign in to comment.