Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send request metrics from queue proxy #3596

Merged
merged 15 commits into from
Apr 2, 2019
4 changes: 2 additions & 2 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ required = [
[[override]]
name = "github.com/knative/pkg"
# HEAD as of 2019-03-29
revision = "1d9d8f8871250649adab16e3369e1a09e5595b04"
revision = "916205998db9d085aaff357221b208bc89290ea5"

[[override]]
name = "go.uber.org/zap"
Expand Down
76 changes: 64 additions & 12 deletions cmd/queue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/knative/pkg/signals"

"github.com/knative/pkg/logging/logkey"
"github.com/knative/pkg/metrics"
"github.com/knative/serving/cmd/util"
"github.com/knative/serving/pkg/activator"
activatorutil "github.com/knative/serving/pkg/activator/util"
Expand All @@ -42,6 +43,7 @@ import (
"github.com/knative/serving/pkg/network"
"github.com/knative/serving/pkg/queue"
"github.com/knative/serving/pkg/queue/health"
queuestats "github.com/knative/serving/pkg/queue/stats"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/util/wait"
)
Expand All @@ -60,15 +62,22 @@ const (
// from its configuration and propagate that to all istio-proxies
// in the mesh.
quitSleepDuration = 20 * time.Second

// commonMetricsPort is the port where common metrics, e.g. request metrics,
// are exposed in Prometheus. This is different from the metrics used
// for autoscaling, which are exposed on port 9090.
commonMetricsPort = 9091
)

var (
podName string
servingService string
servingConfig string
servingNamespace string
servingRevision string
servingRevisionKey string
servingAutoscaler string
servingPodIP string
servingPodName string
autoscalerNamespace string
servingAutoscalerPort int
userTargetPort int
Expand All @@ -88,11 +97,13 @@ var (
)

func initEnv() {
podName = util.GetRequiredEnvOrFatal("SERVING_POD", logger)
servingService = os.Getenv("SERVING_SERVICE") // KService is optional
servingConfig = util.GetRequiredEnvOrFatal("SERVING_CONFIGURATION", logger)
servingNamespace = util.GetRequiredEnvOrFatal("SERVING_NAMESPACE", logger)
servingRevision = util.GetRequiredEnvOrFatal("SERVING_REVISION", logger)
servingAutoscaler = util.GetRequiredEnvOrFatal("SERVING_AUTOSCALER", logger)
servingPodIP = util.GetRequiredEnvOrFatal("SERVING_POD_IP", logger)
servingPodName = util.GetRequiredEnvOrFatal("SERVING_POD", logger)
autoscalerNamespace = util.GetRequiredEnvOrFatal("SYSTEM_NAMESPACE", logger)
servingAutoscalerPort = util.MustParseIntEnvOrFatal("SERVING_AUTOSCALER_PORT", logger)
containerConcurrency = util.MustParseIntEnvOrFatal("CONTAINER_CONCURRENCY", logger)
Expand All @@ -102,7 +113,7 @@ func initEnv() {

// TODO(mattmoor): Move this key to be in terms of the KPA.
servingRevisionKey = autoscaler.NewMetricKey(servingNamespace, servingRevision)
_psr, err := queue.NewPrometheusStatsReporter(servingNamespace, servingConfig, servingRevision, podName)
_psr, err := queue.NewPrometheusStatsReporter(servingNamespace, servingConfig, servingRevision, servingPodName)
if err != nil {
logger.Fatalw("Failed to create stats reporter", zap.Error(err))
}
Expand Down Expand Up @@ -220,7 +231,7 @@ func main() {
initEnv()
logger = logger.With(
zap.String(logkey.Key, servingRevisionKey),
zap.String(logkey.Pod, podName))
zap.String(logkey.Pod, servingPodName))

target, err := url.Parse(fmt.Sprintf("http://%s", userTargetAddress))
if err != nil {
Expand Down Expand Up @@ -261,7 +272,7 @@ func main() {

reportTicker := time.NewTicker(queue.ReporterReportingPeriod)
defer reportTicker.Stop()
queue.NewStats(podName, queue.Channels{
queue.NewStats(servingPodName, queue.Channels{
ReqChan: reqChan,
ReportChan: reportTicker.C,
StatChan: statChan,
Expand All @@ -274,7 +285,7 @@ func main() {

timeoutHandler := queue.TimeToFirstByteTimeoutHandler(http.HandlerFunc(handler(reqChan, breaker, httpProxy, h2cProxy)),
time.Duration(revisionTimeoutSeconds)*time.Second, "request timeout")
composedHandler := pushRequestLogHandler(timeoutHandler)
composedHandler := pushRequestMetricHandler(pushRequestLogHandler(timeoutHandler))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you mean to replace the log handler?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. Hmm, what I did was add a handler layer on top of the log handler, wasn't it?

server = h2c.NewServer(fmt.Sprintf(":%d", v1alpha1.RequestQueuePort), composedHandler)

errChan := make(chan error, 2)
Expand Down Expand Up @@ -326,12 +337,12 @@ func pushRequestLogHandler(currentHandler http.Handler) http.Handler {
}

revInfo := &queue.RequestLogRevInfo{
Name: os.Getenv("SERVING_REVISION"),
Namespace: os.Getenv("SERVING_NAMESPACE"),
Service: os.Getenv("SERVING_SERVICE"),
Configuration: os.Getenv("SERVING_CONFIGURATION"),
PodName: os.Getenv("SERVING_POD"),
PodIP: os.Getenv("SERVING_POD_IP"),
Name: servingRevision,
Namespace: servingNamespace,
Service: servingService,
Configuration: servingConfig,
PodName: servingPodName,
PodIP: servingPodIP,
}
handler, err := queue.NewRequestLogHandler(currentHandler, utils.NewSyncFileWriter(os.Stdout), templ, revInfo)

Expand All @@ -342,6 +353,47 @@ func pushRequestLogHandler(currentHandler http.Handler) http.Handler {
return handler
}

func pushRequestMetricHandler(currentHandler http.Handler) http.Handler {
backend := os.Getenv("SERVING_REQUEST_METRICS_BACKEND")
yanweiguo marked this conversation as resolved.
Show resolved Hide resolved
logger.Infof("SERVING_REQUEST_METRICS_BACKEND=%v", backend)
if backend == "" {
return currentHandler
}

r, err := queuestats.NewStatsReporter(servingNamespace, servingService, servingConfig, servingRevision)
if err != nil {
logger.Errorw("Error setting up request metrics reporter. Request metrics will be unavailable.", zap.Error(err))
return currentHandler
}

// Set up OpenCensus exporter.
// NOTE: We use revision as the component instead of queue because queue is
// implementation specific. The current metrics are request relative. Using
// revision is reasonable.
// TODO(yanweiguo): add the ability to emit metrics with names not combined
// to component.
ops := metrics.ExporterOptions{
Domain: "knative.dev/serving",
Component: "revision",
PrometheusPort: commonMetricsPort,
ConfigMap: map[string]string{
metrics.BackendDestinationKey: backend,
},
}
err = metrics.UpdateExporter(ops, logger)
if err != nil {
logger.Errorw("Error setting up request metrics exporter. Request metrics will be unavailable.", zap.Error(err))
return currentHandler
}

handler, err := queue.NewRequestMetricHandler(currentHandler, r)
if err != nil {
logger.Errorw("Error setting up request metrics handler. Request metrics will be unavailable.", zap.Error(err))
return currentHandler
}
return handler
}

func flush(logger *zap.SugaredLogger) {
logger.Sync()
os.Stdout.Sync()
Expand Down
7 changes: 6 additions & 1 deletion config/config-observability.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,11 @@ data:
# Note: Using stackdriver will incur additional charges
metrics.backend-destination: prometheus

# metrics.request-metrics-backend-destination specifies the destination for
# request metrics. If non-empty, the queue proxy sends request metrics to
# this destination. Currently supported values: prometheus, stackdriver.
metrics.request-metrics-backend-destination: prometheus
yanweiguo marked this conversation as resolved.
Show resolved Hide resolved

# metrics.stackdriver-project-id field specifies the stackdriver project ID. This
# field is optional. When running on GCE, application default credentials will be
# used if this field is not provided.
Expand All @@ -143,4 +148,4 @@ data:
# metrics are not supported by "knative_revision" resource type. Setting this
# flag to "true" could cause extra Stackdriver charge.
# If metrics.backend-destination is not Stackdriver, this is ignored.
metrics.allow-stackdriver-custom-metrics: false
metrics.allow-stackdriver-custom-metrics: "false"
65 changes: 65 additions & 0 deletions pkg/queue/request_metric.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
Copyright 2019 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package queue

import (
"errors"
"net/http"
"time"

pkghttp "github.com/knative/serving/pkg/http"
"github.com/knative/serving/pkg/queue/stats"
)

// requestMetricHandler is an http.Handler decorator that reports request
// metrics to a stats.StatsReporter for every request served by the wrapped
// handler.
type requestMetricHandler struct {
	// handler is the next http.Handler in the chain.
	handler http.Handler
	// statsReporter receives the request count and response-time stats.
	statsReporter stats.StatsReporter
}

// NewRequestMetricHandler creates an http.Handler that emits request metrics
// (request count and response time) to r for every request served by h.
// It returns an error if r is nil.
func NewRequestMetricHandler(h http.Handler, r stats.StatsReporter) (http.Handler, error) {
	if r == nil {
		// Go convention (staticcheck ST1005): error strings are lowercase.
		return nil, errors.New("stats reporter must not be nil")
	}

	return &requestMetricHandler{
		handler:       h,
		statsReporter: r,
	}, nil
}

// ServeHTTP delegates to the wrapped handler while recording the response
// code and latency of the request. If the wrapped handler panics, a 500 is
// reported before the panic is propagated to the caller.
func (h *requestMetricHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	recorder := pkghttp.NewResponseRecorder(w, http.StatusOK)
	start := time.Now()
	defer func() {
		elapsed := time.Since(start)
		if p := recover(); p != nil {
			// The wrapped handler panicked: record the failure, then
			// re-raise so upstream recovery still observes the panic.
			h.sendRequestMetrics(http.StatusInternalServerError, elapsed)
			panic(p)
		}
		h.sendRequestMetrics(recorder.ResponseCode, elapsed)
	}()
	h.handler.ServeHTTP(recorder, r)
}

// sendRequestMetrics reports a request count of 1 and the response time for
// the given response code.
// NOTE(review): reporter errors are discarded here — presumably metrics are
// best effort and must not affect serving; confirm this is intentional.
func (h *requestMetricHandler) sendRequestMetrics(respCode int, latency time.Duration) {
	h.statsReporter.ReportRequestCount(respCode, 1)
	h.statsReporter.ReportResponseTime(respCode, latency)
}
117 changes: 117 additions & 0 deletions pkg/queue/request_metric_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
Copyright 2019 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package queue

import (
"bytes"
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/knative/serving/pkg/queue/stats"
)

// TestNewRequestMetricHandler_failure verifies that constructing the handler
// with a nil StatsReporter returns an error.
func TestNewRequestMetricHandler_failure(t *testing.T) {
	baseHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
	})

	// A nil StatsReporter must be rejected by the constructor.
	var r stats.StatsReporter
	_, err := NewRequestMetricHandler(baseHandler, r)
	if err == nil {
		t.Error("should get error when StatsReporter is empty")
	}
}

// TestRequestMetricHandler verifies that a single successful request is
// reported with the correct response code, a count of 1 and a non-zero
// latency.
func TestRequestMetricHandler(t *testing.T) {
	baseHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
	})
	r := &fakeStatsReporter{}
	handler, err := NewRequestMetricHandler(baseHandler, r)
	if err != nil {
		t.Fatalf("failed to create handler: %v", err)
	}

	resp := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodPost, "http://example.com", bytes.NewBufferString("test"))
	handler.ServeHTTP(resp, req)

	// Serve one request: expect 1 request count and a non-zero latency.
	if got, want := r.lastRespCode, http.StatusOK; got != want {
		t.Errorf("response code got %v, want %v", got, want)
	}
	if got, want := r.lastReqCount, int64(1); got != want {
		t.Errorf("request count got %v, want %v", got, want)
	}
	if r.lastReqLatency == 0 {
		t.Errorf("request latency got %v, want larger than 0", r.lastReqLatency)
	}
}

// TestRequestMetricHandler_PanickingHandler verifies that a panicking inner
// handler is reported as a 500 and that the panic is propagated to the
// caller of ServeHTTP.
func TestRequestMetricHandler_PanickingHandler(t *testing.T) {
	baseHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		panic("no!")
	})
	r := &fakeStatsReporter{}
	handler, err := NewRequestMetricHandler(baseHandler, r)
	if err != nil {
		t.Fatalf("failed to create handler: %v", err)
	}

	resp := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodPost, "http://example.com", bytes.NewBufferString("test"))
	defer func() {
		// p (not err) to avoid shadowing the constructor error above.
		if p := recover(); p == nil {
			t.Error("want ServeHTTP to panic, got nothing.")
		}

		// Serve one request: expect 1 request count and a non-zero latency.
		if got, want := r.lastRespCode, http.StatusInternalServerError; got != want {
			t.Errorf("response code got %v, want %v", got, want)
		}
		if got, want := r.lastReqCount, int64(1); got != want {
			t.Errorf("request count got %v, want %v", got, want)
		}
		if r.lastReqLatency == 0 {
			t.Errorf("request latency got %v, want larger than 0", r.lastReqLatency)
		}
	}()
	handler.ServeHTTP(resp, req)
}

// fakeStatsReporter is a test double for stats.StatsReporter that just
// records the last stat it received.
type fakeStatsReporter struct {
	lastRespCode   int
	lastReqCount   int64
	lastReqLatency time.Duration
}

// ReportRequestCount records the response code and request count it was
// called with, and always succeeds.
func (r *fakeStatsReporter) ReportRequestCount(responseCode int, v int64) error {
	r.lastRespCode = responseCode
	r.lastReqCount = v
	return nil
}

// ReportResponseTime records the response code and latency it was called
// with, and always succeeds.
func (r *fakeStatsReporter) ReportResponseTime(responseCode int, d time.Duration) error {
	r.lastRespCode = responseCode
	r.lastReqLatency = d
	return nil
}
Loading