Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add instrumentation handler to collector endpoints #2664

Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
20fbdf7
Add instrumentation handler to collector endpoints
dimitarvdimitrov Nov 28, 2020
0bb4540
Fix static check problems
dimitarvdimitrov Nov 28, 2020
8888520
Add godoc for NewMetricsHandler
dimitarvdimitrov Nov 28, 2020
274c17c
Fix import formatting
dimitarvdimitrov Nov 28, 2020
a521fa8
Up test coverage
dimitarvdimitrov Nov 28, 2020
d3a7adb
Rename packages and update license statements
dimitarvdimitrov Nov 29, 2020
e27b434
Up test coverage for server package
dimitarvdimitrov Nov 29, 2020
042bcd7
Cache metric timers in httpmetrics
dimitarvdimitrov Nov 29, 2020
c0a283f
Add synchronisation in httpmetrics
dimitarvdimitrov Nov 29, 2020
ce04b38
Change server tests to use ephemeral ports
dimitarvdimitrov Nov 30, 2020
17739dc
Replace string with struct keys in httpmetrics
dimitarvdimitrov Nov 30, 2020
73f5086
Increase await timeout in httmetrics test
dimitarvdimitrov Nov 30, 2020
f683323
Merge branch 'master' into metric-for-unparsable-zipkin-spans
dimitarvdimitrov Nov 30, 2020
056ccaa
Merge branch 'master' into metric-for-unparsable-zipkin-spans
dimitarvdimitrov Nov 30, 2020
739a444
Simplify network calls in server tests
dimitarvdimitrov Nov 30, 2020
9602149
Consolidate port strings in server tests
dimitarvdimitrov Nov 30, 2020
84b1966
Clean up a Stringer.String() and zero value init
dimitarvdimitrov Nov 30, 2020
00ff54b
Use httptest in collector/app/server tests
dimitarvdimitrov Nov 30, 2020
9fbafb7
Merge branch 'master' into metric-for-unparsable-zipkin-spans
dimitarvdimitrov Nov 30, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/collector/app/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ func (c *Collector) Start(builderOpts *CollectorOptions) error {
AllowedHeaders: builderOpts.CollectorZipkinAllowedHeaders,
AllowedOrigins: builderOpts.CollectorZipkinAllowedOrigins,
Logger: c.logger,
MetricsFactory: c.metricsFactory,
})
if err != nil {
return fmt.Errorf("could not start the Zipkin server %w", err)
Expand Down
5 changes: 3 additions & 2 deletions cmd/collector/app/server/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore"
clientcfgHandler "github.com/jaegertracing/jaeger/pkg/clientcfg/clientcfghttp"
"github.com/jaegertracing/jaeger/pkg/healthcheck"
"github.com/jaegertracing/jaeger/pkg/httpmetrics"
"github.com/jaegertracing/jaeger/pkg/recoveryhandler"
)

Expand Down Expand Up @@ -71,11 +72,11 @@ func serveHTTP(server *http.Server, listener net.Listener, params *HTTPServerPar
cfgHandler.RegisterRoutes(r)

recoveryHandler := recoveryhandler.NewRecoveryHandler(params.Logger, true)
server.Handler = recoveryHandler(r)
server.Handler = httpmetrics.Wrap(recoveryHandler(r), params.MetricsFactory)
go func() {
if err := server.Serve(listener); err != nil {
if err != http.ErrServerClosed {
params.Logger.Fatal("Could not start HTTP collector", zap.Error(err))
params.Logger.Error("Could not start HTTP collector", zap.Error(err))
}
}
params.HealthCheck.Set(healthcheck.Unavailable)
Expand Down
67 changes: 67 additions & 0 deletions cmd/collector/app/server/http_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package server

import (
"fmt"
"net"
"net/http"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uber/jaeger-lib/metrics/metricstest"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/collector/app/handler"
"github.com/jaegertracing/jaeger/pkg/healthcheck"
)

// test wrong port number
func TestFailToListenHttp(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func TestFailToListenHttp(t *testing.T) {
func TestFailToListenHTTP(t *testing.T) {

logger, _ := zap.NewDevelopment()
server, err := StartHTTPServer(&HTTPServerParams{
HostPort: ":-1",
Logger: logger,
})
assert.Nil(t, server)
assert.EqualError(t, err, "listen tcp: address -1: invalid port")
}

func TestSpanCollectorHttp(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func TestSpanCollectorHttp(t *testing.T) {
func TestSpanCollectorHTTP(t *testing.T) {

logger, _ := zap.NewDevelopment()
params := &HTTPServerParams{
Handler: handler.NewJaegerSpanHandler(logger, &mockSpanProcessor{}),
SamplingStore: &mockSamplingStore{},
MetricsFactory: metricstest.NewFactory(time.Hour),
HealthCheck: healthcheck.New(),
Logger: logger,
}

listener, err := net.Listen("tcp", ":0")
require.NoError(t, err)
defer listener.Close()

server := &http.Server{Addr: listener.Addr().String()}
defer server.Close()
dimitarvdimitrov marked this conversation as resolved.
Show resolved Hide resolved

serveHTTP(server, listener, params)

url := fmt.Sprintf("http://%s", listener.Addr().String())
response, err := http.Post(url, "", nil)
assert.NoError(t, err)
assert.NotNil(t, response)
}
7 changes: 5 additions & 2 deletions cmd/collector/app/server/zipkin.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ import (

"github.com/gorilla/mux"
"github.com/rs/cors"
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/collector/app/handler"
"github.com/jaegertracing/jaeger/cmd/collector/app/zipkin"
"github.com/jaegertracing/jaeger/pkg/healthcheck"
"github.com/jaegertracing/jaeger/pkg/httpmetrics"
"github.com/jaegertracing/jaeger/pkg/recoveryhandler"
)

Expand All @@ -37,6 +39,7 @@ type ZipkinServerParams struct {
AllowedHeaders string
HealthCheck *healthcheck.HealthCheck
Logger *zap.Logger
MetricsFactory metrics.Factory
}

// StartZipkinServer based on the given parameters
Expand Down Expand Up @@ -74,11 +77,11 @@ func serveZipkin(server *http.Server, listener net.Listener, params *ZipkinServe
})

recoveryHandler := recoveryhandler.NewRecoveryHandler(params.Logger, true)
server.Handler = cors.Handler(recoveryHandler(r))
server.Handler = cors.Handler(httpmetrics.Wrap(recoveryHandler(r), params.MetricsFactory))
go func(listener net.Listener, server *http.Server) {
if err := server.Serve(listener); err != nil {
if err != http.ErrServerClosed {
params.Logger.Fatal("Could not launch Zipkin server", zap.Error(err))
params.Logger.Error("Could not launch Zipkin server", zap.Error(err))
}
}
params.HealthCheck.Set(healthcheck.Unavailable)
Expand Down
66 changes: 66 additions & 0 deletions cmd/collector/app/server/zipkin_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package server

import (
"fmt"
"net"
"net/http"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uber/jaeger-lib/metrics/metricstest"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/collector/app/handler"
"github.com/jaegertracing/jaeger/pkg/healthcheck"
)

// test wrong port number
func TestFailToListenZipkin(t *testing.T) {
logger, _ := zap.NewDevelopment()
server, err := StartZipkinServer(&ZipkinServerParams{
HostPort: ":-1",
Logger: logger,
})
assert.Nil(t, server)
assert.EqualError(t, err, "listen tcp: address -1: invalid port")
}

func TestSpanCollectorZipkin(t *testing.T) {
logger, _ := zap.NewDevelopment()
params := &ZipkinServerParams{
Handler: handler.NewZipkinSpanHandler(logger, nil, nil),
MetricsFactory: metricstest.NewFactory(time.Hour),
HealthCheck: healthcheck.New(),
Logger: logger,
}

listener, err := net.Listen("tcp", ":0")
require.NoError(t, err)
defer listener.Close()

server := &http.Server{Addr: listener.Addr().String()}
defer server.Close()

serveZipkin(server, listener, params)

url := fmt.Sprintf("http://%s", listener.Addr().String())
dimitarvdimitrov marked this conversation as resolved.
Show resolved Hide resolved
response, err := http.Post(url, "", nil)
assert.NoError(t, err)
assert.NotNil(t, response)
}
121 changes: 121 additions & 0 deletions pkg/httpmetrics/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package httpmetrics

import (
"net/http"
"strconv"
"sync"
"time"

"github.com/uber/jaeger-lib/metrics"
)

type statusRecorder struct {
http.ResponseWriter
status int
wroteHeader bool
}

func (r *statusRecorder) WriteHeader(status int) {
if r.wroteHeader {
return
}
r.status = status
r.wroteHeader = true
r.ResponseWriter.WriteHeader(status)
}

// Wrap returns a handler that wraps the provided one and emits metrics based on the HTTP requests and responses.
// It will record the HTTP response status, HTTP method, duration and path of the call.
// The duration will be reported in metrics.Timer and the rest will be labels on that timer.
dimitarvdimitrov marked this conversation as resolved.
Show resolved Hide resolved
//
// Do not use with HTTP endpoints that take parameters from URL path, such as `/user/{user_id}`,
// because they will result in high cardinality metrics.
func Wrap(h http.Handler, metricsFactory metrics.Factory) http.Handler {
timers := newRequestDurations(metricsFactory)
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
start := time.Now()
recorder := &statusRecorder{ResponseWriter: w}

h.ServeHTTP(recorder, r)

req := recordedRequest{
key: recordedRequestKey{
status: strconv.Itoa(recorder.status),
path: r.URL.Path,
method: r.Method,
},
duration: time.Since(start),
}
timers.record(req)
})
}

type recordedRequestKey struct {
method string
path string
status string
}

type recordedRequest struct {
key recordedRequestKey
duration time.Duration
}

type requestDurations struct {
lock sync.RWMutex
metrics metrics.Factory
timers map[recordedRequestKey]metrics.Timer
}

func newRequestDurations(metricsFactory metrics.Factory) *requestDurations {
return &requestDurations{
timers: make(map[recordedRequestKey]metrics.Timer),
metrics: metricsFactory,
lock: sync.RWMutex{},
dimitarvdimitrov marked this conversation as resolved.
Show resolved Hide resolved
}
}

func (r *requestDurations) record(request recordedRequest) {
cacheKey := request.key

r.lock.RLock()
timer, ok := r.timers[cacheKey]
r.lock.RUnlock()
if !ok {
r.lock.Lock()
timer, ok = r.timers[cacheKey]
if !ok {
timer = buildTimer(r.metrics, cacheKey)
r.timers[cacheKey] = timer
}
r.lock.Unlock()
}

timer.Record(request.duration)
}

func buildTimer(metricsFactory metrics.Factory, key recordedRequestKey) metrics.Timer {
return metricsFactory.Timer(metrics.TimerOptions{
Name: "http.request.duration",
Help: "Duration of HTTP requests",
Tags: map[string]string{
"status": key.status,
"path": key.path,
"method": key.method,
},
})
}
50 changes: 50 additions & 0 deletions pkg/httpmetrics/metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package httpmetrics

import (
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/uber/jaeger-lib/metrics/metricstest"
)

func TestNewMetricsHandler(t *testing.T) {
dummyHandlerFunc := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
time.Sleep(time.Millisecond)
w.WriteHeader(http.StatusAccepted)
w.WriteHeader(http.StatusTeapot) // any subsequent statuses should be ignored
})

mb := metricstest.NewFactory(time.Hour)
handler := Wrap(dummyHandlerFunc, mb)

req, err := http.NewRequest(http.MethodGet, "/subdir/qwerty", nil)
assert.NoError(t, err)
handler.ServeHTTP(httptest.NewRecorder(), req)

for i := 0; i < 1000; i++ {
_, gauges := mb.Snapshot()
if _, ok := gauges["http.request.duration|method=GET|path=/subdir/qwerty|status=202.P999"]; ok {
return
}
time.Sleep(15 * time.Millisecond)
}

assert.Fail(t, "gauge hasn't been updated within a reasonable amount of time")
}