Skip to content

Commit

Permalink
PoC for response propagators
Browse files Browse the repository at this point in the history
Signed-off-by: Juraci Paixão Kröhling <juraci@kroehling.de>
  • Loading branch information
jpkrohling committed Feb 1, 2024
1 parent 242d23a commit c60efdf
Show file tree
Hide file tree
Showing 5 changed files with 199 additions and 7 deletions.
58 changes: 51 additions & 7 deletions internal/global/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,25 @@ type (
tm propagation.TextMapPropagator
}

responsePropagatorsHolder struct {
tm propagation.TextMapPropagator
}

meterProviderHolder struct {
mp metric.MeterProvider
}
)

var (
globalTracer = defaultTracerValue()
globalPropagators = defaultPropagatorsValue()
globalMeterProvider = defaultMeterProvider()

delegateTraceOnce sync.Once
delegateTextMapPropagatorOnce sync.Once
delegateMeterOnce sync.Once
globalTracer = defaultTracerValue()
globalPropagators = defaultPropagatorsValue()
globalResponsePropagators = defaultResponsePropagatorsValue()
globalMeterProvider = defaultMeterProvider()

delegateTraceOnce sync.Once
delegateTextMapPropagatorOnce sync.Once
delegateTextMapResponsePropagatorOnce sync.Once
delegateMeterOnce sync.Once
)

// TracerProvider is the internal implementation for global.TracerProvider.
Expand Down Expand Up @@ -82,6 +88,11 @@ func TextMapPropagator() propagation.TextMapPropagator {
return globalPropagators.Load().(propagatorsHolder).tm
}

// TextMapResponsePropagator is the internal implementation for global.TextMapResponsePropagator.
func TextMapResponsePropagator() propagation.TextMapPropagator {
return globalResponsePropagators.Load().(responsePropagatorsHolder).tm
}

// SetTextMapPropagator is the internal implementation for global.SetTextMapPropagator.
func SetTextMapPropagator(p propagation.TextMapPropagator) {
current := TextMapPropagator()
Expand Down Expand Up @@ -109,6 +120,33 @@ func SetTextMapPropagator(p propagation.TextMapPropagator) {
globalPropagators.Store(propagatorsHolder{tm: p})
}

// SetTextMapResponsePropagator is the internal implementation for global.SetTextMapResponsePropagator.
func SetTextMapResponsePropagator(p propagation.TextMapPropagator) {
current := TextMapResponsePropagator()

if _, cOk := current.(*textMapPropagator); cOk {
if _, pOk := p.(*textMapPropagator); pOk && current == p {
// Do not assign the default delegating TextMapPropagator to
// delegate to itself.
Error(
errors.New("no delegate configured in text map response propagator"),
"Setting text map response propagator to it's current value. No delegate will be configured",
)
return
}
}

// For the textMapPropagator already returned by TextMapPropagator
// delegate to p.
delegateTextMapResponsePropagatorOnce.Do(func() {
if def, ok := current.(*textMapPropagator); ok {
def.SetDelegate(p)
}
})
// Return p when subsequent calls to TextMapResponsePropagator are made.
globalResponsePropagators.Store(responsePropagatorsHolder{tm: p})
}

// MeterProvider is the internal implementation for global.MeterProvider.
func MeterProvider() metric.MeterProvider {
return globalMeterProvider.Load().(meterProviderHolder).mp
Expand Down Expand Up @@ -149,6 +187,12 @@ func defaultPropagatorsValue() *atomic.Value {
return v
}

func defaultResponsePropagatorsValue() *atomic.Value {
v := &atomic.Value{}
v.Store(responsePropagatorsHolder{tm: newTextMapPropagator()})
return v
}

func defaultMeterProvider() *atomic.Value {
v := &atomic.Value{}
v.Store(meterProviderHolder{mp: &meterProvider{}})
Expand Down
49 changes: 49 additions & 0 deletions internal/global/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,55 @@ func TestSetTextMapPropagator(t *testing.T) {
})
}

func TestSetTextMapResponsePropagator(t *testing.T) {
t.Run("Set With default is a noop", func(t *testing.T) {
ResetForTest(t)
SetTextMapResponsePropagator(TextMapPropagator())

tmp, ok := TextMapResponsePropagator().(*textMapPropagator)
if !ok {
t.Fatal("Global TextMapResponsePropagator should be the default propagator")
}

if tmp.delegate != nil {
t.Fatal("TextMapResponsePropagator should not delegate when setting itself")
}
})

t.Run("First Set() should replace the delegate", func(t *testing.T) {
ResetForTest(t)

SetTextMapResponsePropagator(propagation.TraceContext{})

_, ok := TextMapResponsePropagator().(*textMapPropagator)
if ok {
t.Fatal("Global TextMapResponsePropagator was not changed")
}
})

t.Run("Set() should delegate existing propagators", func(t *testing.T) {
ResetForTest(t)

p := TextMapResponsePropagator()
SetTextMapResponsePropagator(propagation.TraceContext{})

np := p.(*textMapPropagator)

if np.delegate == nil {
t.Fatal("The delegated TextMapResponsePropagators should have a delegate")
}
})

t.Run("non-comparable types should not panic", func(t *testing.T) {
ResetForTest(t)

// A composite TextMapPropagator is not comparable.
prop := propagation.NewCompositeTextMapPropagator(propagation.TraceContext{})
SetTextMapResponsePropagator(prop)
assert.NotPanics(t, func() { SetTextMapResponsePropagator(prop) })
})
}

func TestSetMeterProvider(t *testing.T) {
t.Run("Set With default is a noop", func(t *testing.T) {
ResetForTest(t)
Expand Down
2 changes: 2 additions & 0 deletions internal/global/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@ func ResetForTest(t testing.TB) {
t.Cleanup(func() {
globalTracer = defaultTracerValue()
globalPropagators = defaultPropagatorsValue()
globalResponsePropagators = defaultResponsePropagatorsValue()
globalMeterProvider = defaultMeterProvider()
delegateTraceOnce = sync.Once{}
delegateTextMapPropagatorOnce = sync.Once{}
delegateTextMapResponsePropagatorOnce = sync.Once{}
delegateMeterOnce = sync.Once{}
})
}
11 changes: 11 additions & 0 deletions propagation.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,18 @@ func GetTextMapPropagator() propagation.TextMapPropagator {
return global.TextMapPropagator()
}

// GetTextMapResponsePropagator returns the global TextMapResponsePropagator. If none has been
// set, a No-Op TextMapPropagator is returned.
func GetTextMapResponsePropagator() propagation.TextMapPropagator {
return global.TextMapResponsePropagator()
}

// SetTextMapPropagator sets propagator as the global TextMapPropagator.
func SetTextMapPropagator(propagator propagation.TextMapPropagator) {
global.SetTextMapPropagator(propagator)
}

// SetTextMapResponsePropagator sets propagator as the global TextMapResponsePropagator.
func SetTextMapResponsePropagator(propagator propagation.TextMapPropagator) {
global.SetTextMapResponsePropagator(propagator)
}
86 changes: 86 additions & 0 deletions propagation/responsepropagators_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package propagation_test // import "go.opentelemetry.io/otel/propagation"

import (
"context"
"fmt"
"strings"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
)

func ExampleServerTimingPropagator() {
// in your main function, or equivalent:
p := &serverTimingPropagator{}
otel.SetTextMapResponsePropagator(p)

// your code would be instrumented as usual
tr := otel.Tracer("example")
ctx, span := tr.Start(context.Background(), "operation")
defer span.End()

// the library you use to handle HTTP requests would call this when sending the response back to the caller:
hc := make(propagation.HeaderCarrier)
otel.GetTextMapResponsePropagator().Inject(ctx, hc)

// Output: traceresponse;desc=00-00000000000000000000000000000000-0000000000000000-00
fmt.Println(hc.Get("Server-Timing"))
}

type serverTimingPropagator struct {
}

func (p serverTimingPropagator) Extract(ctx context.Context, carrier propagation.TextMapCarrier) context.Context {
header := carrier.Get("Server-Timing")
if header == "" {
return ctx
}

// TODO: validate the header
desc := strings.Split(header, ";")[1]
values := strings.Split(desc, "-")

traceID, _ := trace.TraceIDFromHex(values[1])
spanID, _ := trace.SpanIDFromHex(values[2])

sc := trace.NewSpanContext(trace.SpanContextConfig{
TraceID: traceID,
SpanID: spanID,
TraceFlags: 0, // TODO: properly parse this
})

return trace.ContextWithRemoteSpanContext(ctx, sc)
}

func (serverTimingPropagator) Inject(ctx context.Context, carrier propagation.TextMapCarrier) {
tctx := trace.SpanContextFromContext(ctx)
traceID := tctx.TraceID()
spanID := tctx.SpanID()

samplingFlag := "00"
if tctx.IsSampled() {
samplingFlag = "01"
}

header := fmt.Sprintf("%s;desc=%s-%s-%s-%s", "traceresponse", "00", traceID, spanID, samplingFlag)
carrier.Set("Server-Timing", header)
}

func (serverTimingPropagator) Fields() []string {
return []string{"Server-Timing"}
}

0 comments on commit c60efdf

Please sign in to comment.