Skip to content

Commit

Permalink
Add sampler_type tag to collector ingestion metrics (#1576)
Browse files Browse the repository at this point in the history
  • Loading branch information
guanw authored and black-adder committed Jun 6, 2019
1 parent 09dad38 commit 7f2a49d
Show file tree
Hide file tree
Showing 5 changed files with 210 additions and 25 deletions.
159 changes: 140 additions & 19 deletions cmd/collector/app/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package app

import (
"strings"
"sync"

"github.com/uber/jaeger-lib/metrics"
Expand All @@ -28,6 +29,23 @@ const (

// otherServices is the catch-all label when number of services exceeds maxServiceNames
otherServices = "other-services"

samplerTypeKey = "sampler_type"
samplerTypeConst = "const"
samplerTypeProbabilistic = "probabilistic"
samplerTypeRateLimiting = "ratelimiting"
samplerTypeLowerBound = "lowerbound"
samplerTypeUnknown = "unknown"
// types of samplers: const, probabilistic, ratelimiting, lowerbound
numOfSamplerTypes = 4

concatenation = "$_$"

otherServicesConstSampler = otherServices + concatenation + samplerTypeConst
otherServicesProbabilisticSampler = otherServices + concatenation + samplerTypeProbabilistic
otherServicesRateLimitingSampler = otherServices + concatenation + samplerTypeRateLimiting
otherServicesLowerBoundSampler = otherServices + concatenation + samplerTypeLowerBound
otherServicesUnknownSampler = otherServices + concatenation + samplerTypeUnknown
)

// SpanProcessorMetrics contains all the necessary metrics for the SpanProcessor
Expand Down Expand Up @@ -59,9 +77,18 @@ type countsBySvc struct {
category string
}

// spanCountsBySvc is the counter set used for per-service span counts. It
// embeds countsBySvc and adds no fields of its own.
type spanCountsBySvc struct {
countsBySvc
}

// traceCountsBySvc is the counter set used for per-service trace counts. It
// embeds countsBySvc and additionally carries a pool of string builders.
type traceCountsBySvc struct {
countsBySvc
// stringBuilderPool hands out *strings.Builder values; it is initialised
// in newTraceCountsBySvc.
stringBuilderPool *sync.Pool
}

type metricsBySvc struct {
spans countsBySvc // number of spans received per service
traces countsBySvc // number of traces originated per service
spans spanCountsBySvc // number of spans received per service
traces traceCountsBySvc // number of traces originated per service
}

// InboundTransport identifies the transport used to receive spans.
Expand Down Expand Up @@ -136,23 +163,50 @@ func newMetricsBySvc(factory metrics.Factory, category string) metricsBySvc {
spansFactory := factory.Namespace(metrics.NSOptions{Name: "spans", Tags: nil})
tracesFactory := factory.Namespace(metrics.NSOptions{Name: "traces", Tags: nil})
return metricsBySvc{
spans: newCountsBySvc(spansFactory, category, maxServiceNames),
traces: newCountsBySvc(tracesFactory, category, maxServiceNames),
spans: newSpanCountsBySvc(spansFactory, category, maxServiceNames),
traces: newTraceCountsBySvc(tracesFactory, category, maxServiceNames),
}
}

func newCountsBySvc(factory metrics.Factory, category string, maxServiceNames int) countsBySvc {
return countsBySvc{
counts: map[string]metrics.Counter{
otherServices: factory.Counter(metrics.Options{Name: category, Tags: map[string]string{"svc": otherServices, "debug": "false"}}),
func newTraceCountsBySvc(factory metrics.Factory, category string, maxServices int) traceCountsBySvc {
return traceCountsBySvc{
countsBySvc: countsBySvc{
counts: newTraceCountsOtherServices(factory, category, "false"),
debugCounts: newTraceCountsOtherServices(factory, category, "true"),
factory: factory,
lock: &sync.Mutex{},
maxServiceNames: maxServices + numOfSamplerTypes, // numOfSamplerType is the offset added to maxServices threshold
category: category,
},
debugCounts: map[string]metrics.Counter{
otherServices: factory.Counter(metrics.Options{Name: category, Tags: map[string]string{"svc": otherServices, "debug": "true"}}),
// use sync.Pool to reduce allocation of stringBuilder
stringBuilderPool: &sync.Pool{
New: func() interface{} {
return new(strings.Builder)
},
},
}
}

// newTraceCountsOtherServices builds the initial trace counter map. It
// registers one catch-all counter per sampler type (const, lowerbound,
// probabilistic, ratelimiting and unknown), each tagged with
// svc=other-services, the given debug value and its sampler type, and stores
// it under the matching otherServices*Sampler key.
func newTraceCountsOtherServices(factory metrics.Factory, category string, isDebug string) map[string]metrics.Counter {
	catchAll := []struct {
		key     string
		sampler string
	}{
		{otherServicesConstSampler, samplerTypeConst},
		{otherServicesLowerBoundSampler, samplerTypeLowerBound},
		{otherServicesProbabilisticSampler, samplerTypeProbabilistic},
		{otherServicesRateLimitingSampler, samplerTypeRateLimiting},
		{otherServicesUnknownSampler, samplerTypeUnknown},
	}

	counters := make(map[string]metrics.Counter, len(catchAll))
	for _, entry := range catchAll {
		// Every counter gets its own tag map so no two registrations share state.
		tags := map[string]string{"svc": otherServices, "debug": isDebug, samplerTypeKey: entry.sampler}
		counters[entry.key] = factory.Counter(metrics.Options{Name: category, Tags: tags})
	}
	return counters
}

func newSpanCountsBySvc(factory metrics.Factory, category string, maxServiceNames int) spanCountsBySvc {
return spanCountsBySvc{
countsBySvc: countsBySvc{
counts: map[string]metrics.Counter{otherServices: factory.Counter(metrics.Options{Name: category, Tags: map[string]string{"svc": otherServices, "debug": "false"}})},
debugCounts: map[string]metrics.Counter{otherServices: factory.Counter(metrics.Options{Name: category, Tags: map[string]string{"svc": otherServices, "debug": "true"}})},
factory: factory,
lock: &sync.Mutex{},
maxServiceNames: maxServiceNames,
category: category,
},
factory: factory,
lock: &sync.Mutex{},
maxServiceNames: maxServiceNames,
category: category,
}
}

Expand Down Expand Up @@ -196,7 +250,9 @@ func (m metricsBySvc) ReportServiceNameForSpan(span *model.Span) {
}
m.countSpansByServiceName(serviceName, span.Flags.IsDebug())
if span.ParentSpanID() == 0 {
m.countTracesByServiceName(serviceName, span.Flags.IsDebug())

m.countTracesByServiceName(serviceName, span.Flags.IsDebug(), span.
GetSamplerType())
}
}

Expand All @@ -207,11 +263,64 @@ func (m metricsBySvc) countSpansByServiceName(serviceName string, isDebug bool)

// countTracesByServiceName counts how many traces are received per service,
// i.e. the counter is only incremented for the root spans.
func (m metricsBySvc) countTracesByServiceName(serviceName string, isDebug bool) {
m.traces.countByServiceName(serviceName, isDebug)
func (m metricsBySvc) countTracesByServiceName(serviceName string, isDebug bool, samplerType string) {
m.traces.countByServiceName(serviceName, isDebug, samplerType)
}

// countByServiceName increments the trace counter that belongs to the given
// service name and sampler type, creating the counter on first use.
//
// The service name is first normalized to a safe-for-metrics format. Counters
// are stored per (service name, sampler type) key, in a separate map for debug
// and non-debug traces. Once a map holds maxServiceNames counters, no new
// counters are created: the call is attributed to the pre-registered
// other-services counter for the sampler type instead, with unrecognized
// sampler types falling into the "unknown" bucket. This avoids polluting the
// metrics namespace and overloading M3.
//
// The reportServiceNameCount() function runs on a timer and will report the
// total number of stored counters, so if it exceeds say the 90% threshold
// an alert should be raised to investigate what's causing so many unique
// service names.
func (m *traceCountsBySvc) countByServiceName(serviceName string, isDebug bool, samplerType string) {
	serviceName = NormalizeServiceName(serviceName)

	// The trace counter key is the combination of serviceName and samplerType.
	// It is built before the lock is taken: buildKey does not touch the
	// counter maps, so keeping it out of the critical section shortens the
	// time the mutex is held on this per-root-span path.
	key := m.buildKey(serviceName, samplerType)

	counts := m.counts
	if isDebug {
		counts = m.debugCounts
	}

	var counter metrics.Counter
	m.lock.Lock()
	if c, ok := counts[key]; ok {
		counter = c
	} else if len(counts) < m.maxServiceNames {
		debugStr := "false"
		if isDebug {
			debugStr = "true"
		}
		// Only trace metrics have the samplerType tag.
		tags := map[string]string{"svc": serviceName, "debug": debugStr, samplerTypeKey: samplerType}

		c := m.factory.Counter(metrics.Options{Name: m.category, Tags: tags})
		counts[key] = c
		counter = c
	} else {
		// The map is full: fall back to the catch-all counter of this sampler type.
		switch samplerType {
		case samplerTypeConst:
			counter = counts[otherServicesConstSampler]
		case samplerTypeLowerBound:
			counter = counts[otherServicesLowerBoundSampler]
		case samplerTypeProbabilistic:
			counter = counts[otherServicesProbabilisticSampler]
		case samplerTypeRateLimiting:
			counter = counts[otherServicesRateLimitingSampler]
		default:
			counter = counts[otherServicesUnknownSampler]
		}
	}
	m.lock.Unlock()

	// Increment outside the lock; only the map lookup/insert needs protection.
	counter.Inc(1)
}

// countByServiceName maintains a map of counters for each service name it's
// spanCountsBySvc.countByServiceName maintains a map of counters for each service name it's
// given and increments the respective counter when called. The service name
// are first normalized to safe-for-metrics format. If the number of counters
// exceeds maxServiceNames, new service names are ignored to avoid polluting
Expand All @@ -221,14 +330,15 @@ func (m metricsBySvc) countTracesByServiceName(serviceName string, isDebug bool)
// total number of stored counters, so if it exceeds say the 90% threshold
// an alert should be raised to investigate what's causing so many unique
// service names.
func (m *countsBySvc) countByServiceName(serviceName string, isDebug bool) {
func (m *spanCountsBySvc) countByServiceName(serviceName string, isDebug bool) {
serviceName = NormalizeServiceName(serviceName)
counts := m.counts
if isDebug {
counts = m.debugCounts
}
var counter metrics.Counter
m.lock.Lock()

if c, ok := counts[serviceName]; ok {
counter = c
} else if len(counts) < m.maxServiceNames {
Expand All @@ -246,3 +356,14 @@ func (m *countsBySvc) countByServiceName(serviceName string, isDebug bool) {
m.lock.Unlock()
counter.Inc(1)
}

// buildKey returns the counter-map key for a trace counter: serviceName and
// samplerType joined by the concatenation separator.
//
// The parts are joined with a single concatenation expression, which costs at
// most one allocation for the resulting string. A pooled strings.Builder does
// not help here: Builder.Reset drops the underlying buffer, so a recycled
// builder has no capacity left to reuse and must grow again on every call.
func (m *traceCountsBySvc) buildKey(serviceName, samplerType string) string {
	return serviceName + concatenation + samplerType
}
48 changes: 44 additions & 4 deletions cmd/collector/app/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,46 @@ func TestProcessorMetrics(t *testing.T) {

assert.EqualValues(t, 1, counters["service.spans.received|debug=false|format=jaeger|svc=fry|transport=tchannel"])
assert.EqualValues(t, 2, counters["service.spans.received|debug=true|format=jaeger|svc=fry|transport=tchannel"])
assert.EqualValues(t, 1, counters["service.traces.received|debug=false|format=jaeger|svc=fry|transport=tchannel"])
assert.EqualValues(t, 1, counters["service.traces.received|debug=true|format=jaeger|svc=fry|transport=tchannel"])
assert.EqualValues(t, 1, counters["service.traces.received|debug=false|format=jaeger|sampler_type=unknown|svc=fry|transport=tchannel"])
assert.EqualValues(t, 1, counters["service.traces.received|debug=true|format=jaeger|sampler_type=unknown|svc=fry|transport=tchannel"])
assert.Empty(t, gauges)
}

func TestNewCountsBySvc(t *testing.T) {
func TestNewTraceCountsBySvc(t *testing.T) {
baseMetrics := metricstest.NewFactory(time.Hour)
metrics := newCountsBySvc(baseMetrics, "not_on_my_level", 3)
metrics := newTraceCountsBySvc(baseMetrics, "not_on_my_level", 3)

metrics.countByServiceName("fry", false, "unknown")
metrics.countByServiceName("leela", false, "unknown")
metrics.countByServiceName("bender", false, "unknown")
metrics.countByServiceName("zoidberg", false, "unknown")

counters, _ := baseMetrics.Backend.Snapshot()
assert.EqualValues(t, 1, counters["not_on_my_level|debug=false|sampler_type=unknown|svc=fry"])
assert.EqualValues(t, 1, counters["not_on_my_level|debug=false|sampler_type=unknown|svc=leela"])
assert.EqualValues(t, 2, counters["not_on_my_level|debug=false|sampler_type=unknown|svc=other-services"])

metrics.countByServiceName("bender", true, "const")
metrics.countByServiceName("bender", true, "probabilistic")
metrics.countByServiceName("leela", true, "probabilistic")
metrics.countByServiceName("fry", true, "ratelimiting")
metrics.countByServiceName("fry", true, "const")
metrics.countByServiceName("elzar", true, "lowerbound")
metrics.countByServiceName("url", true, "unknown")

counters, _ = baseMetrics.Backend.Snapshot()
assert.EqualValues(t, 1, counters["not_on_my_level|debug=true|sampler_type=const|svc=bender"])
assert.EqualValues(t, 1, counters["not_on_my_level|debug=true|sampler_type=probabilistic|svc=bender"])
assert.EqualValues(t, 1, counters["not_on_my_level|debug=true|sampler_type=probabilistic|svc=other-services"])
assert.EqualValues(t, 1, counters["not_on_my_level|debug=true|sampler_type=ratelimiting|svc=other-services"])
assert.EqualValues(t, 1, counters["not_on_my_level|debug=true|sampler_type=const|svc=other-services"])
assert.EqualValues(t, 1, counters["not_on_my_level|debug=true|sampler_type=lowerbound|svc=other-services"])
assert.EqualValues(t, 1, counters["not_on_my_level|debug=true|sampler_type=unknown|svc=other-services"])
}

func TestNewSpanCountsBySvc(t *testing.T) {
baseMetrics := metricstest.NewFactory(time.Hour)
metrics := newSpanCountsBySvc(baseMetrics, "not_on_my_level", 3)
metrics.countByServiceName("fry", false)
metrics.countByServiceName("leela", false)
metrics.countByServiceName("bender", false)
Expand All @@ -83,3 +114,12 @@ func TestNewCountsBySvc(t *testing.T) {
assert.EqualValues(t, 1, counters["not_on_my_level|debug=true|svc=bender"])
assert.EqualValues(t, 2, counters["not_on_my_level|debug=true|svc=other-services"])
}

func TestBuildKey(t *testing.T) {
	// Consecutive buildKey calls must yield independent keys: nothing from an
	// earlier call may leak into a later one.
	tc := newTraceCountsBySvc(jaegerM.NullFactory, "received", 100)
	cases := []struct {
		service string
		sampler string
		want    string
	}{
		{"sample-service", "unknown", "sample-service$_$unknown"},
		{"sample-service2", "const", "sample-service2$_$const"},
	}
	for _, c := range cases {
		assert.Equal(t, c.want, tc.buildKey(c.service, c.sampler))
	}
}
4 changes: 2 additions & 2 deletions cmd/collector/app/span_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,11 @@ func TestBySvcMetrics(t *testing.T) {
if test.rootSpan {
if test.debug {
expected = append(expected, metricstest.ExpectedMetric{
Name: metricPrefix + ".traces.received|debug=true|format=" + format + "|svc=" + test.serviceName + "|transport=unknown", Value: 2,
Name: metricPrefix + ".traces.received|debug=true|format=" + format + "|sampler_type=unknown|svc=" + test.serviceName + "|transport=unknown", Value: 2,
})
} else {
expected = append(expected, metricstest.ExpectedMetric{
Name: metricPrefix + ".traces.received|debug=false|format=" + format + "|svc=" + test.serviceName + "|transport=unknown", Value: 2,
Name: metricPrefix + ".traces.received|debug=false|format=" + format + "|sampler_type=unknown|svc=" + test.serviceName + "|transport=unknown", Value: 2,
})
}
}
Expand Down
15 changes: 15 additions & 0 deletions model/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ const (
SampledFlag = Flags(1)
// DebugFlag is the bit set in Flags in order to define a span as a debug span
DebugFlag = Flags(2)

samplerType = "sampler.type"
samplerTypeUnknown = "unknown"
)

// Flags is a bit map of flags for a span
Expand All @@ -47,6 +50,18 @@ func (s *Span) HasSpanKind(kind ext.SpanKindEnum) bool {
return false
}

// GetSamplerType reports the sampler type recorded on the span. It looks up
// the "sampler.type" tag (opentracing-go defines no constant for this key) and
// returns its string value; when the tag is absent or its string value is
// empty, it returns "unknown".
func (s *Span) GetSamplerType() string {
	tag, found := KeyValues(s.Tags).FindByKey(samplerType)
	if !found || tag.VStr == "" {
		return samplerTypeUnknown
	}
	return tag.VStr
}

// IsRPCClient returns true if the span represents a client side of an RPC,
// as indicated by the `span.kind` tag set to `client`.
func (s *Span) IsRPCClient() bool {
Expand Down
9 changes: 9 additions & 0 deletions model/span_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,15 @@ func TestIsDebug(t *testing.T) {
assert.True(t, flags.IsDebug())
}

func TestSamplerType(t *testing.T) {
	// Each case builds a span from a single tag and checks the sampler type
	// that GetSamplerType derives from it.
	cases := []struct {
		tag  model.KeyValue
		want string
	}{
		{model.String("sampler.type", "lowerbound"), "lowerbound"},
		{model.String("sampler.type", ""), "unknown"},
		{model.KeyValue{}, "unknown"},
	}
	for _, c := range cases {
		span := makeSpan(c.tag)
		assert.Equal(t, c.want, span.GetSamplerType())
	}
}

func TestIsSampled(t *testing.T) {
flags := model.Flags(0)
flags.SetSampled()
Expand Down

0 comments on commit 7f2a49d

Please sign in to comment.