Skip to content
This repository has been archived by the owner on May 23, 2024. It is now read-only.

Commit

Permalink
Rename endpoint to operation for RPC metrics (#150)
Browse files Browse the repository at this point in the history
  • Loading branch information
black-adder authored May 16, 2017
1 parent 39b0a42 commit c762689
Show file tree
Hide file tree
Showing 9 changed files with 76 additions and 75 deletions.
2 changes: 1 addition & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (c Configuration) New(
if c.RPCMetrics {
Observer(
rpcmetrics.NewObserver(
opts.metrics.Namespace("jaeger-rpc", map[string]string{"component": "jaeger"}),
opts.metrics.Namespace("jaeger", map[string]string{"component": "jaeger"}),
rpcmetrics.DefaultNameNormalizer,
),
)(&opts) // adds to c.observers
Expand Down
4 changes: 2 additions & 2 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ func TestConfigWithRPCMetrics(t *testing.T) {

testutils.AssertCounterMetrics(t, metrics,
testutils.ExpectedMetric{
Name: "jaeger-rpc.requests",
Tags: map[string]string{"component": "jaeger", "endpoint": "test", "error": "false"},
Name: "jaeger.requests",
Tags: map[string]string{"component": "jaeger", "operation": "test", "error": "false"},
Value: 1,
},
)
Expand Down
52 changes: 26 additions & 26 deletions rpcmetrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ import (
)

const (
otherEndpointsPlaceholder = "other"
endpointNameMetricTag = "endpoint"
otherOperationsPlaceholder = "other"
operationNameMetricTag = "operation"
)

// Metrics is a collection of metrics for an endpoint describing
// Metrics is a collection of metrics for an operation describing
// throughput, success, errors, and performance.
type Metrics struct {
// RequestCountSuccess is a counter of the total number of successes.
Expand Down Expand Up @@ -71,35 +71,35 @@ func (m *Metrics) recordHTTPStatusCode(statusCode uint16) {
}
}

// MetricsByEndpoint is a registry/cache of metrics for each unique endpoint name.
// Only maxNumberOfEndpoints Metrics are stored, all other endpoint names are mapped
// to a generic endpoint name "other".
type MetricsByEndpoint struct {
metricsFactory metrics.Factory
endpoints *normalizedEndpoints
metricsByEndpoint map[string]*Metrics
mux sync.RWMutex
// MetricsByOperation is a registry/cache of metrics for each unique operation name.
// Only maxNumberOfOperations Metrics are stored, all other operation names are mapped
// to a generic operation name "other".
type MetricsByOperation struct {
metricsFactory metrics.Factory
operations *normalizedOperations
metricsByOperation map[string]*Metrics
mux sync.RWMutex
}

func newMetricsByEndpoint(
func newMetricsByOperation(
metricsFactory metrics.Factory,
normalizer NameNormalizer,
maxNumberOfEndpoints int,
) *MetricsByEndpoint {
return &MetricsByEndpoint{
metricsFactory: metricsFactory,
endpoints: newNormalizedEndpoints(maxNumberOfEndpoints, normalizer),
metricsByEndpoint: make(map[string]*Metrics, maxNumberOfEndpoints+1), // +1 for "other"
maxNumberOfOperations int,
) *MetricsByOperation {
return &MetricsByOperation{
metricsFactory: metricsFactory,
operations: newNormalizedOperations(maxNumberOfOperations, normalizer),
metricsByOperation: make(map[string]*Metrics, maxNumberOfOperations+1), // +1 for "other"
}
}

func (m *MetricsByEndpoint) get(endpoint string) *Metrics {
safeName := m.endpoints.normalize(endpoint)
func (m *MetricsByOperation) get(operation string) *Metrics {
safeName := m.operations.normalize(operation)
if safeName == "" {
safeName = otherEndpointsPlaceholder
safeName = otherOperationsPlaceholder
}
m.mux.RLock()
met := m.metricsByEndpoint[safeName]
met := m.metricsByOperation[safeName]
m.mux.RUnlock()
if met != nil {
return met
Expand All @@ -109,22 +109,22 @@ func (m *MetricsByEndpoint) get(endpoint string) *Metrics {
}

// split to make easier to test
func (m *MetricsByEndpoint) getWithWriteLock(safeName string) *Metrics {
func (m *MetricsByOperation) getWithWriteLock(safeName string) *Metrics {
m.mux.Lock()
defer m.mux.Unlock()

// it is possible that the name has been already registered after we released
// the read lock and before we grabbed the write lock, so check for that.
if met, ok := m.metricsByEndpoint[safeName]; ok {
if met, ok := m.metricsByOperation[safeName]; ok {
return met
}

// it would be nice to create the struct before locking, since Init() is somewhat
// expensive, however some metrics backends (e.g. expvar) may not like duplicate metrics.
met := &Metrics{}
tags := map[string]string{endpointNameMetricTag: safeName}
tags := map[string]string{operationNameMetricTag: safeName}
metrics.Init(met, m.metricsFactory, tags)

m.metricsByEndpoint[safeName] = met
m.metricsByOperation[safeName] = met
return met
}
14 changes: 7 additions & 7 deletions rpcmetrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@ func tags(kv ...string) map[string]string {
return m
}

func endpointTags(endpoint string, kv ...string) map[string]string {
return tags(append([]string{"endpoint", endpoint}, kv...)...)
func operationTags(operation string, kv ...string) map[string]string {
return tags(append([]string{"operation", operation}, kv...)...)
}

func TestMetricsByEndpoint(t *testing.T) {
func TestMetricsByOperation(t *testing.T) {
met := metrics.NewLocalFactory(0)
mbe := newMetricsByEndpoint(met, DefaultNameNormalizer, 2)
mbe := newMetricsByOperation(met, DefaultNameNormalizer, 2)

m1 := mbe.get("abc1")
m2 := mbe.get("abc1") // from cache
Expand All @@ -60,8 +60,8 @@ func TestMetricsByEndpoint(t *testing.T) {
}

testutils.AssertCounterMetrics(t, met,
testutils.ExpectedMetric{Name: "requests", Tags: endpointTags("abc1", "error", "false"), Value: 3},
testutils.ExpectedMetric{Name: "requests", Tags: endpointTags("abc3", "error", "false"), Value: 1},
testutils.ExpectedMetric{Name: "requests", Tags: endpointTags("other", "error", "false"), Value: 2},
testutils.ExpectedMetric{Name: "requests", Tags: operationTags("abc1", "error", "false"), Value: 3},
testutils.ExpectedMetric{Name: "requests", Tags: operationTags("abc3", "error", "false"), Value: 1},
testutils.ExpectedMetric{Name: "requests", Tags: operationTags("other", "error", "false"), Value: 2},
)
}
4 changes: 2 additions & 2 deletions rpcmetrics/normalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@

package rpcmetrics

// NameNormalizer is used to convert the endpoint names to strings
// NameNormalizer is used to convert the operation names to strings
// that can be safely used as tags in the metrics.
type NameNormalizer interface {
Normalize(name string) string
}

// DefaultNameNormalizer converts endpoint names so that they contain only characters
// DefaultNameNormalizer converts operation names so that they contain only characters
// from the safe charset [a-zA-Z0-9-./_]. All other characters are replaced with '-'.
var DefaultNameNormalizer = &SimpleNameNormalizer{
SafeSets: []SafeCharacterSet{
Expand Down
34 changes: 17 additions & 17 deletions rpcmetrics/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,20 @@ import (
jaeger "github.com/uber/jaeger-client-go"
)

const defaultMaxNumberOfEndpoints = 200
const defaultMaxNumberOfOperations = 200

// Observer is an observer that can emit RPC metrics.
type Observer struct {
metricsByEndpoint *MetricsByEndpoint
metricsByOperation *MetricsByOperation
}

// NewObserver creates a new observer that can emit RPC metrics.
func NewObserver(metricsFactory metrics.Factory, normalizer NameNormalizer) *Observer {
return &Observer{
metricsByEndpoint: newMetricsByEndpoint(
metricsByOperation: newMetricsByOperation(
metricsFactory,
normalizer,
defaultMaxNumberOfEndpoints,
defaultMaxNumberOfOperations,
),
}
}
Expand All @@ -55,7 +55,7 @@ func (o *Observer) OnStartSpan(
operationName string,
options opentracing.StartSpanOptions,
) jaeger.SpanObserver {
return NewSpanObserver(o.metricsByEndpoint, operationName, options)
return NewSpanObserver(o.metricsByOperation, operationName, options)
}

// SpanKind identifies the span as inboud, outbound, or internal
Expand All @@ -72,25 +72,25 @@ const (

// SpanObserver collects RPC metrics
type SpanObserver struct {
metricsByEndpoint *MetricsByEndpoint
operationName string
startTime time.Time
mux sync.Mutex
kind SpanKind
httpStatusCode uint16
err bool
metricsByOperation *MetricsByOperation
operationName string
startTime time.Time
mux sync.Mutex
kind SpanKind
httpStatusCode uint16
err bool
}

// NewSpanObserver creates a new SpanObserver that can emit RPC metrics.
func NewSpanObserver(
metricsByEndpoint *MetricsByEndpoint,
metricsByOperation *MetricsByOperation,
operationName string,
options opentracing.StartSpanOptions,
) *SpanObserver {
so := &SpanObserver{
metricsByEndpoint: metricsByEndpoint,
operationName: operationName,
startTime: options.StartTime,
metricsByOperation: metricsByOperation,
operationName: operationName,
startTime: options.StartTime,
}
for k, v := range options.Tags {
so.handleTagInLock(k, v)
Expand Down Expand Up @@ -150,7 +150,7 @@ func (so *SpanObserver) OnFinish(options opentracing.FinishOptions) {
return
}

mets := so.metricsByEndpoint.get(so.operationName)
mets := so.metricsByOperation.get(so.operationName)
latency := options.FinishTime.Sub(so.startTime)
if so.err {
mets.RequestCountFailures.Inc(1)
Expand Down
21 changes: 11 additions & 10 deletions rpcmetrics/observer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ func ExampleObserver() {
span.Finish()

c, _ := metricsFactory.Snapshot()
fmt.Printf("requests (success): %d\n", c["requests|endpoint=test|error=false"])
fmt.Printf("requests (failure): %d\n", c["requests|endpoint=test|error=true"])
fmt.Printf("requests (success): %d\n", c["requests|error=false|operation=test"])
fmt.Printf("requests (failure): %d\n", c["requests|error=true|operation=test"])
// Output:
// requests (success): 1
// requests (failure): 0
Expand Down Expand Up @@ -118,17 +118,18 @@ func TestObserver(t *testing.T) {

u.AssertCounterMetrics(t,
testTracer.metrics,
u.ExpectedMetric{Name: "requests", Tags: endpointTags("local-span", "error", "false"), Value: 0},
u.ExpectedMetric{Name: "requests", Tags: endpointTags("get-user", "error", "false"), Value: 1},
u.ExpectedMetric{Name: "requests", Tags: endpointTags("get-user", "error", "true"), Value: 1},
u.ExpectedMetric{Name: "requests", Tags: endpointTags("get-user-override", "error", "false"), Value: 1},
u.ExpectedMetric{Name: "requests", Tags: endpointTags("get-user-client", "error", "false"), Value: 0},
u.ExpectedMetric{Name: "requests", Tags: operationTags("local-span", "error", "false"), Value: 0},
u.ExpectedMetric{Name: "requests", Tags: operationTags("get-user", "error", "false"), Value: 1},
u.ExpectedMetric{Name: "requests", Tags: operationTags("get-user", "error", "true"), Value: 1},
u.ExpectedMetric{Name: "requests", Tags: operationTags("get-user-override", "error", "false"), Value: 1},
u.ExpectedMetric{Name: "requests", Tags: operationTags("get-user-client", "error", "false"), Value: 0},
)
// TODO something wrong with string generation, .P99 should not be appended to the tag
// as a result we cannot use u.AssertGaugeMetrics
_, g := testTracer.metrics.Snapshot()
assert.EqualValues(t, 51, g["request_latency|endpoint=get-user|error=false.P99"])
assert.EqualValues(t, 51, g["request_latency|endpoint=get-user|error=true.P99"])
t.Log(g)
assert.EqualValues(t, 51, g["request_latency|error=false|operation=get-user.P99"])
assert.EqualValues(t, 51, g["request_latency|error=true|operation=get-user.P99"])
})
}

Expand Down Expand Up @@ -169,7 +170,7 @@ func TestTags(t *testing.T) {
for _, tc := range testCases {
testCase := tc // capture loop var
for i := range testCase.metrics {
testCase.metrics[i].Tags["endpoint"] = "span"
testCase.metrics[i].Tags["operation"] = "span"
}
t.Run(fmt.Sprintf("%s-%v", testCase.key, testCase.value), func(t *testing.T) {
withTestTracer(func(testTracer *testTracer) {
Expand Down
12 changes: 6 additions & 6 deletions rpcmetrics/endpoints.go → rpcmetrics/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,17 @@ package rpcmetrics

import "sync"

// normalizedEndpoints is a cache for endpointName -> safeName mappings.
type normalizedEndpoints struct {
// normalizedOperations is a cache for operationName -> safeName mappings.
type normalizedOperations struct {
names map[string]string
maxSize int
defaultName string
normalizer NameNormalizer
mux sync.RWMutex
}

func newNormalizedEndpoints(maxSize int, normalizer NameNormalizer) *normalizedEndpoints {
return &normalizedEndpoints{
func newNormalizedOperations(maxSize int, normalizer NameNormalizer) *normalizedOperations {
return &normalizedOperations{
maxSize: maxSize,
normalizer: normalizer,
names: make(map[string]string, maxSize),
Expand All @@ -42,7 +42,7 @@ func newNormalizedEndpoints(maxSize int, normalizer NameNormalizer) *normalizedE
// normalize looks up the name in the cache, if not found it uses normalizer
// to convert the name to a safe name. If called with more than maxSize unique
// names it returns "" for all other names beyond those already cached.
func (n *normalizedEndpoints) normalize(name string) string {
func (n *normalizedOperations) normalize(name string) string {
n.mux.RLock()
norm, ok := n.names[name]
l := len(n.names)
Expand All @@ -56,7 +56,7 @@ func (n *normalizedEndpoints) normalize(name string) string {
return n.normalizeWithLock(name)
}

func (n *normalizedEndpoints) normalizeWithLock(name string) string {
func (n *normalizedOperations) normalizeWithLock(name string) string {
norm := n.normalizer.Normalize(name)
n.mux.Lock()
defer n.mux.Unlock()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import (
"github.com/stretchr/testify/assert"
)

func TestNormalizedEndpoints(t *testing.T) {
n := newNormalizedEndpoints(1, DefaultNameNormalizer)
func TestNormalizedOperations(t *testing.T) {
n := newNormalizedOperations(1, DefaultNameNormalizer)

assertLen := func(l int) {
n.mux.RLock()
Expand All @@ -42,8 +42,8 @@ func TestNormalizedEndpoints(t *testing.T) {
assertLen(1)
}

func TestNormalizedEndpointsDoubleLocking(t *testing.T) {
n := newNormalizedEndpoints(1, DefaultNameNormalizer)
func TestNormalizedOperationsDoubleLocking(t *testing.T) {
n := newNormalizedOperations(1, DefaultNameNormalizer)
assert.Equal(t, "ab-cd", n.normalize("ab^cd"), "fill out the cache")
assert.Equal(t, "", n.normalizeWithLock("xys"), "cache overflow")
}

0 comments on commit c762689

Please sign in to comment.