diff --git a/cmd/collector/app/collector.go b/cmd/collector/app/collector.go index 1ac9e8992ea..4f06e75a427 100644 --- a/cmd/collector/app/collector.go +++ b/cmd/collector/app/collector.go @@ -118,6 +118,7 @@ func (c *Collector) Start(builderOpts *CollectorOptions) error { AllowedHeaders: builderOpts.CollectorZipkinAllowedHeaders, AllowedOrigins: builderOpts.CollectorZipkinAllowedOrigins, Logger: c.logger, + MetricsFactory: c.metricsFactory, }) if err != nil { return fmt.Errorf("could not start the Zipkin server %w", err) diff --git a/cmd/collector/app/server/http.go b/cmd/collector/app/server/http.go index 3a5dd7aad6f..f6523207b6a 100644 --- a/cmd/collector/app/server/http.go +++ b/cmd/collector/app/server/http.go @@ -26,6 +26,7 @@ import ( "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore" clientcfgHandler "github.com/jaegertracing/jaeger/pkg/clientcfg/clientcfghttp" "github.com/jaegertracing/jaeger/pkg/healthcheck" + "github.com/jaegertracing/jaeger/pkg/httpmetrics" "github.com/jaegertracing/jaeger/pkg/recoveryhandler" ) @@ -71,11 +72,11 @@ func serveHTTP(server *http.Server, listener net.Listener, params *HTTPServerPar cfgHandler.RegisterRoutes(r) recoveryHandler := recoveryhandler.NewRecoveryHandler(params.Logger, true) - server.Handler = recoveryHandler(r) + server.Handler = httpmetrics.Wrap(recoveryHandler(r), params.MetricsFactory) go func() { if err := server.Serve(listener); err != nil { if err != http.ErrServerClosed { - params.Logger.Fatal("Could not start HTTP collector", zap.Error(err)) + params.Logger.Error("Could not start HTTP collector", zap.Error(err)) } } params.HealthCheck.Set(healthcheck.Unavailable) diff --git a/cmd/collector/app/server/http_test.go b/cmd/collector/app/server/http_test.go new file mode 100644 index 00000000000..9f0643dbfec --- /dev/null +++ b/cmd/collector/app/server/http_test.go @@ -0,0 +1,60 @@ +// Copyright (c) 2020 The Jaeger Authors. 
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package server
+
+import (
+	"net/http"
+	"net/http/httptest"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/uber/jaeger-lib/metrics/metricstest"
+	"go.uber.org/zap"
+
+	"github.com/jaegertracing/jaeger/cmd/collector/app/handler"
+	"github.com/jaegertracing/jaeger/pkg/healthcheck"
+)
+
+// TestFailToListenHTTP checks that an invalid port is reported as a listen error.
+func TestFailToListenHTTP(t *testing.T) {
+	logger, _ := zap.NewDevelopment()
+	server, err := StartHTTPServer(&HTTPServerParams{
+		HostPort: ":-1",
+		Logger:   logger,
+	})
+	assert.Nil(t, server)
+	assert.EqualError(t, err, "listen tcp: address -1: invalid port")
+}
+
+// TestSpanCollectorHTTP posts to a server started by serveHTTP and expects a response.
+func TestSpanCollectorHTTP(t *testing.T) {
+	logger, _ := zap.NewDevelopment()
+	params := &HTTPServerParams{
+		Handler:        handler.NewJaegerSpanHandler(logger, &mockSpanProcessor{}),
+		SamplingStore:  &mockSamplingStore{},
+		MetricsFactory: metricstest.NewFactory(time.Hour),
+		HealthCheck:    healthcheck.New(),
+		Logger:         logger,
+	}
+
+	server := httptest.NewServer(nil)
+	defer server.Close()
+	serveHTTP(server.Config, server.Listener, params)
+	response, err := http.Post(server.URL, "", nil)
+	if assert.NoError(t, err) && assert.NotNil(t, response) {
+		assert.NoError(t, response.Body.Close()) // http.Post callers must close the body
+	}
+}
diff --git a/cmd/collector/app/server/zipkin.go b/cmd/collector/app/server/zipkin.go
index d9006d0b6cf..a537b5feef3 100644
--- a/cmd/collector/app/server/zipkin.go
+++ b/cmd/collector/app/server/zipkin.go
@@ -21,11 +21,13 @@ import (
 
 	"github.com/gorilla/mux"
"github.com/rs/cors" + "github.com/uber/jaeger-lib/metrics" "go.uber.org/zap" "github.com/jaegertracing/jaeger/cmd/collector/app/handler" "github.com/jaegertracing/jaeger/cmd/collector/app/zipkin" "github.com/jaegertracing/jaeger/pkg/healthcheck" + "github.com/jaegertracing/jaeger/pkg/httpmetrics" "github.com/jaegertracing/jaeger/pkg/recoveryhandler" ) @@ -37,6 +39,7 @@ type ZipkinServerParams struct { AllowedHeaders string HealthCheck *healthcheck.HealthCheck Logger *zap.Logger + MetricsFactory metrics.Factory } // StartZipkinServer based on the given parameters @@ -74,11 +77,11 @@ func serveZipkin(server *http.Server, listener net.Listener, params *ZipkinServe }) recoveryHandler := recoveryhandler.NewRecoveryHandler(params.Logger, true) - server.Handler = cors.Handler(recoveryHandler(r)) + server.Handler = cors.Handler(httpmetrics.Wrap(recoveryHandler(r), params.MetricsFactory)) go func(listener net.Listener, server *http.Server) { if err := server.Serve(listener); err != nil { if err != http.ErrServerClosed { - params.Logger.Fatal("Could not launch Zipkin server", zap.Error(err)) + params.Logger.Error("Could not launch Zipkin server", zap.Error(err)) } } params.HealthCheck.Set(healthcheck.Unavailable) diff --git a/cmd/collector/app/server/zipkin_test.go b/cmd/collector/app/server/zipkin_test.go new file mode 100644 index 00000000000..b7a501dee2d --- /dev/null +++ b/cmd/collector/app/server/zipkin_test.go @@ -0,0 +1,59 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package server
+
+import (
+	"net/http"
+	"net/http/httptest"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/uber/jaeger-lib/metrics/metricstest"
+	"go.uber.org/zap"
+
+	"github.com/jaegertracing/jaeger/cmd/collector/app/handler"
+	"github.com/jaegertracing/jaeger/pkg/healthcheck"
+)
+
+// TestFailToListenZipkin checks that an invalid port is reported as a listen error.
+func TestFailToListenZipkin(t *testing.T) {
+	logger, _ := zap.NewDevelopment()
+	server, err := StartZipkinServer(&ZipkinServerParams{
+		HostPort: ":-1",
+		Logger:   logger,
+	})
+	assert.Nil(t, server)
+	assert.EqualError(t, err, "listen tcp: address -1: invalid port")
+}
+
+// TestSpanCollectorZipkin posts to a server started by serveZipkin and expects a response.
+func TestSpanCollectorZipkin(t *testing.T) {
+	logger, _ := zap.NewDevelopment()
+	params := &ZipkinServerParams{
+		Handler:        handler.NewZipkinSpanHandler(logger, nil, nil),
+		MetricsFactory: metricstest.NewFactory(time.Hour),
+		HealthCheck:    healthcheck.New(),
+		Logger:         logger,
+	}
+
+	server := httptest.NewServer(nil)
+	defer server.Close()
+	serveZipkin(server.Config, server.Listener, params)
+	response, err := http.Post(server.URL, "", nil)
+	if assert.NoError(t, err) && assert.NotNil(t, response) {
+		assert.NoError(t, response.Body.Close()) // http.Post callers must close the body
+	}
+}
diff --git a/pkg/httpmetrics/metrics.go b/pkg/httpmetrics/metrics.go
new file mode 100644
index 00000000000..d26d5ad958f
--- /dev/null
+++ b/pkg/httpmetrics/metrics.go
@@ -0,0 +1,152 @@
+// Copyright (c) 2020 The Jaeger Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package httpmetrics
+
+import (
+	"net/http"
+	"strconv"
+	"sync"
+	"time"
+
+	"github.com/uber/jaeger-lib/metrics"
+)
+
+// statusRecorder wraps an http.ResponseWriter and remembers the status code of
+// the response so that it can be reported once the handler has returned.
+type statusRecorder struct {
+	http.ResponseWriter
+	status      int
+	wroteHeader bool
+}
+
+// WriteHeader records the first status code it is given and forwards it to the
+// wrapped writer. Later calls are ignored.
+func (r *statusRecorder) WriteHeader(status int) {
+	if r.wroteHeader {
+		return
+	}
+	r.status = status
+	r.wroteHeader = true
+	r.ResponseWriter.WriteHeader(status)
+}
+
+// Write delegates to the wrapped writer. When no status has been set yet it
+// first records 200, the status an http.ResponseWriter sends implicitly on the
+// first Write, so that a later WriteHeader cannot overwrite it.
+func (r *statusRecorder) Write(b []byte) (int, error) {
+	if !r.wroteHeader {
+		r.status = http.StatusOK
+		r.wroteHeader = true
+	}
+	return r.ResponseWriter.Write(b)
+}
+
+// Wrap returns a handler that wraps the provided one and emits metrics based on the HTTP requests and responses.
+// It will record the HTTP response status, HTTP method, duration and path of the call.
+// The duration will be reported in metrics.Timer and the rest will be labels on that timer.
+// A response for which the handler never set a status is reported as 200.
+// If metricsFactory is nil, h is returned unwrapped.
+//
+// Do not use with HTTP endpoints that take parameters from URL path, such as `/user/{user_id}`,
+// because they will result in high cardinality metrics.
+func Wrap(h http.Handler, metricsFactory metrics.Factory) http.Handler {
+	if metricsFactory == nil {
+		return h
+	}
+	timers := newRequestDurations(metricsFactory)
+	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		start := time.Now()
+		// net/http answers 200 when the handler never calls WriteHeader, so
+		// start from that instead of the zero value.
+		recorder := &statusRecorder{ResponseWriter: w, status: http.StatusOK}
+
+		h.ServeHTTP(recorder, r)
+
+		timers.record(recordedRequest{
+			key: recordedRequestKey{
+				status: strconv.Itoa(recorder.status),
+				path:   r.URL.Path,
+				method: r.Method,
+			},
+			duration: time.Since(start),
+		})
+	})
+}
+
+// recordedRequestKey holds the tag values that identify one timer.
+type recordedRequestKey struct {
+	method string
+	path   string
+	status string
+}
+
+// recordedRequest is one observation: its key and how long the request took.
+type recordedRequest struct {
+	key      recordedRequestKey
+	duration time.Duration
+}
+
+// requestDurations caches one metrics.Timer per recordedRequestKey.
+// lock guards timers.
+type requestDurations struct {
+	lock    sync.RWMutex
+	metrics metrics.Factory
+	timers  map[recordedRequestKey]metrics.Timer
+}
+
+// newRequestDurations returns an empty timer cache that creates its timers from metricsFactory.
+func newRequestDurations(metricsFactory metrics.Factory) *requestDurations {
+	return &requestDurations{
+		timers:  make(map[recordedRequestKey]metrics.Timer),
+		metrics: metricsFactory,
+	}
+}
+
+// record reports the duration of request on the timer for its key, creating
+// and caching that timer the first time the key is seen.
+func (r *requestDurations) record(request recordedRequest) {
+	cacheKey := request.key
+
+	r.lock.RLock()
+	timer, ok := r.timers[cacheKey]
+	r.lock.RUnlock()
+	if !ok {
+		r.lock.Lock()
+		// Look again under the write lock: another goroutine may have added
+		// the timer between RUnlock and Lock.
+		timer, ok = r.timers[cacheKey]
+		if !ok {
+			timer = buildTimer(r.metrics, cacheKey)
+			r.timers[cacheKey] = timer
+		}
+		r.lock.Unlock()
+	}
+
+	timer.Record(request.duration)
+}
+
+// buildTimer creates the "http.request.duration" timer tagged with the status,
+// path and method taken from key.
+func buildTimer(metricsFactory metrics.Factory, key recordedRequestKey) metrics.Timer {
+	return metricsFactory.Timer(metrics.TimerOptions{
+		Name: "http.request.duration",
+		Help: "Duration of HTTP requests",
+		Tags: map[string]string{
+			"status": key.status,
+			"path":   key.path,
+			"method": key.method,
+		},
+	})
+}
diff --git a/pkg/httpmetrics/metrics_test.go b/pkg/httpmetrics/metrics_test.go
new file mode 100644
index 00000000000..544c9550c5c
--- /dev/null
+++ b/pkg/httpmetrics/metrics_test.go
@@ -0,0 +1,50 @@
+// Copyright (c) 2020 The Jaeger Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package httpmetrics
+
+import (
+	"net/http"
+	"net/http/httptest"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/uber/jaeger-lib/metrics/metricstest"
+)
+
+// TestNewMetricsHandler checks that Wrap reports a duration timer tagged with
+// the request method, the path and the first status the handler wrote.
+func TestNewMetricsHandler(t *testing.T) {
+	const gaugeName = "http.request.duration|method=GET|path=/subdir/qwerty|status=202.P999"
+	factory := metricstest.NewFactory(time.Hour)
+	wrapped := Wrap(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
+		time.Sleep(time.Millisecond)
+		w.WriteHeader(http.StatusAccepted)
+		w.WriteHeader(http.StatusTeapot) // only the first status is expected in the metric
+	}), factory)
+
+	request, err := http.NewRequest(http.MethodGet, "/subdir/qwerty", nil)
+	assert.NoError(t, err)
+	wrapped.ServeHTTP(httptest.NewRecorder(), request)
+
+	for attempt := 0; attempt < 1000; attempt++ {
+		_, gauges := factory.Snapshot()
+		if _, found := gauges[gaugeName]; found {
+			return
+		}
+		time.Sleep(15 * time.Millisecond)
+	}
+	assert.Fail(t, "gauge hasn't been updated within a reasonable amount of time")
+}