Skip to content

Commit

Permalink
Add a new stat metric in queue to prevent double counting
Browse files Browse the repository at this point in the history
Currently a request pending in both the activator and queue proxy is
counted twice, once in the activator, and one in the queue proxy. This
change fixes it by letting queue report a concurrency metric for proxied
requests and letting the autoscaler discount such concurrency when it
calculates the total concurrency for scaling decision.
  • Loading branch information
hohaichi committed Mar 25, 2019
1 parent d3556e1 commit 0311790
Show file tree
Hide file tree
Showing 13 changed files with 417 additions and 85 deletions.
99 changes: 55 additions & 44 deletions cmd/queue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

"github.com/knative/pkg/logging/logkey"
"github.com/knative/serving/cmd/util"
"github.com/knative/serving/pkg/activator"
activatorutil "github.com/knative/serving/pkg/activator/util"
"github.com/knative/serving/pkg/apis/serving/v1alpha1"
"github.com/knative/serving/pkg/autoscaler"
Expand Down Expand Up @@ -111,20 +112,12 @@ func initEnv() {
func reportStats(statChan chan *autoscaler.Stat) {
for {
s := <-statChan
if err := reporter.Report(float64(s.RequestCount), s.AverageConcurrentRequests); err != nil {
if err := reporter.Report(s); err != nil {
logger.Errorw("Error while sending stat", zap.Error(err))
}
}
}

func proxyForRequest(req *http.Request) *httputil.ReverseProxy {
if req.ProtoMajor == 2 {
return h2cProxy
}

return httpProxy
}

func knativeProbeHeader(r *http.Request) string {
return r.Header.Get(network.ProbeHeaderName)
}
Expand All @@ -135,6 +128,10 @@ func isKubeletProbe(r *http.Request) bool {
return strings.HasPrefix(r.Header.Get("User-Agent"), "kube-probe/")
}

func knativeProxyHeader(r *http.Request) string {
return r.Header.Get(network.ProxyHeaderName)
}

func probeUserContainer() bool {
var err error
wait.PollImmediate(50*time.Millisecond, 10*time.Second, func() (bool, error) {
Expand All @@ -152,47 +149,61 @@ func probeUserContainer() bool {
return err == nil
}

func handler(w http.ResponseWriter, r *http.Request) {
proxy := proxyForRequest(r)
// Make handler a closure for testing.
func handler(rc chan queue.ReqEvent, b *queue.Breaker, h1, h2 *httputil.ReverseProxy) func(http.ResponseWriter, *http.Request) {
reqChan := rc
breaker := b
httpProxy := h1
h2cProxy := h2
return func(w http.ResponseWriter, r *http.Request) {
proxy := httpProxy
if r.ProtoMajor == 2 {
proxy = h2cProxy
}

ph := knativeProbeHeader(r)
switch {
case ph != "":
if ph != queue.Name {
http.Error(w, fmt.Sprintf("unexpected probe header value: %q", ph), http.StatusServiceUnavailable)
ph := knativeProbeHeader(r)
switch {
case ph != "":
if ph != queue.Name {
http.Error(w, fmt.Sprintf("unexpected probe header value: %q", ph), http.StatusServiceUnavailable)
return
}
if probeUserContainer() {
// Respond with the name of the component handling the request.
w.Write([]byte(queue.Name))
} else {
http.Error(w, "container not ready", http.StatusServiceUnavailable)
}
return
case isKubeletProbe(r):
// Do not count health checks for concurrency metrics
proxy.ServeHTTP(w, r)
return
}
if probeUserContainer() {
// Respond with the name of the component handling the request.
w.Write([]byte(queue.Name))
} else {
http.Error(w, "container not ready", http.StatusServiceUnavailable)
}
return
case isKubeletProbe(r):
// Do not count health checks for concurrency metrics
proxy.ServeHTTP(w, r)
return
}

// Metrics for autoscaling
reqChan <- queue.ReqEvent{Time: time.Now(), EventType: queue.ReqIn}
defer func() {
reqChan <- queue.ReqEvent{Time: time.Now(), EventType: queue.ReqOut}
}()

// Enforce queuing and concurrency limits
if breaker != nil {
ok := breaker.Maybe(func() {
// Metrics for autoscaling
h := knativeProxyHeader(r)
in, out := queue.ReqIn, queue.ReqOut
if activator.Name == h {
in, out = queue.ProxiedIn, queue.ProxiedOut
}
reqChan <- queue.ReqEvent{Time: time.Now(), EventType: in}
defer func() {
reqChan <- queue.ReqEvent{Time: time.Now(), EventType: out}
}()

// Enforce queuing and concurrency limits
if breaker != nil {
ok := breaker.Maybe(func() {
proxy.ServeHTTP(w, r)
})
if !ok {
http.Error(w, "overload", http.StatusServiceUnavailable)
}
} else {
proxy.ServeHTTP(w, r)
})
if !ok {
http.Error(w, "overload", http.StatusServiceUnavailable)
}
} else {
proxy.ServeHTTP(w, r)
}

}

// Sets up /health and /wait-for-drain endpoints.
Expand Down Expand Up @@ -274,7 +285,7 @@ func main() {

server = h2c.NewServer(
fmt.Sprintf(":%d", v1alpha1.RequestQueuePort),
queue.TimeToFirstByteTimeoutHandler(http.HandlerFunc(handler),
queue.TimeToFirstByteTimeoutHandler(http.HandlerFunc(handler(reqChan, breaker, httpProxy, h2cProxy)),
time.Duration(revisionTimeoutSeconds)*time.Second, "request timeout"))

errChan := make(chan error, 2)
Expand Down
65 changes: 65 additions & 0 deletions cmd/queue/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
Copyright 2018 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"net/http"
"net/http/httptest"
"net/http/httputil"
"net/url"
"testing"

"github.com/knative/serving/pkg/activator"
"github.com/knative/serving/pkg/network"

"github.com/knative/serving/pkg/queue"
)

func TestHandler_ReqEvent(t *testing.T) {
var httpHandler http.HandlerFunc = func(w http.ResponseWriter, r *http.Request) {
if r.Header.Get(activator.RevisionHeaderName) != "" {
w.WriteHeader(http.StatusBadRequest)
return
}

if r.Header.Get(activator.RevisionHeaderNamespace) != "" {
w.WriteHeader(http.StatusBadRequest)
return
}

w.WriteHeader(http.StatusOK)
}

server := httptest.NewServer(httpHandler)
serverURL, _ := url.Parse(server.URL)

defer server.Close()
proxy := httputil.NewSingleHostReverseProxy(serverURL)

params := queue.BreakerParams{QueueDepth: 10, MaxConcurrency: 10, InitialCapacity: 10}
breaker := queue.NewBreaker(params)
reqChan := make(chan queue.ReqEvent, 10)
h := handler(reqChan, breaker, proxy, proxy)

writer := httptest.NewRecorder()
req := httptest.NewRequest(http.MethodPost, "http://example.com", nil)
req.Header.Set(network.ProxyHeaderName, activator.Name)
h(writer, req)
e := <-reqChan
if e.EventType != queue.ProxiedIn {
t.Errorf("Want: %v, got: %v\n", queue.ReqIn, e.EventType)
}
}
2 changes: 2 additions & 0 deletions pkg/activator/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,8 @@ func (a *ActivationHandler) proxyRequest(w http.ResponseWriter, r *http.Request,
proxy.Transport = a.Transport
proxy.FlushInterval = -1

r.Header.Set(network.ProxyHeaderName, activator.Name)

util.SetupHeaderPruning(proxy)

proxy.ServeHTTP(capture, r)
Expand Down
33 changes: 33 additions & 0 deletions pkg/activator/handler/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,39 @@ func TestActivationHandler_OverflowSeveralRevisions(t *testing.T) {
assertResponses(wantedSuccess, wantedFailure, overallRequests, lockerCh, respCh, t)
}

func TestActivationHandler_ProxyHeader(t *testing.T) {
breakerParams := queue.BreakerParams{QueueDepth: 10, MaxConcurrency: 10, InitialCapacity: 10}
namespace, revName := testNamespace, testRevName

act := newStubActivator(namespace, revName)
interceptCh := make(chan *http.Request, 1)
rt := util.RoundTripperFunc(func(r *http.Request) (*http.Response, error) {
interceptCh <- r
fake := httptest.NewRecorder()
return fake.Result(), nil
})
throttler := getThrottler(breakerParams, t)

handler := ActivationHandler{
Activator: act,
Transport: rt,
Logger: TestLogger(t),
Reporter: &fakeReporter{},
Throttler: throttler,
}

writer := httptest.NewRecorder()
req := httptest.NewRequest(http.MethodPost, "http://example.com", nil)
req.Header.Set(activator.RevisionHeaderNamespace, namespace)
req.Header.Set(activator.RevisionHeaderName, revName)
handler.ServeHTTP(writer, req)

httpReq := <-interceptCh
if got := httpReq.Header.Get(network.ProxyHeaderName); got != activator.Name {
t.Errorf("Header '%s' does not have the expected value. Want = '%s', got = '%s'.", network.ProxyHeaderName, activator.Name, got)
}
}

// sendRequests sends `count` concurrent requests via the given handler and writes
// the recorded responses to the `respCh`.
func sendRequests(count int, namespace, revName string, respCh chan *httptest.ResponseRecorder, handler ActivationHandler) {
Expand Down
11 changes: 8 additions & 3 deletions pkg/autoscaler/autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,14 @@ type Stat struct {
// Average number of requests currently being handled by this pod.
AverageConcurrentRequests float64

// Similar to AverageConcurrentRequests, but for requests going through a proxy.
AverageProxiedConcurrency float64

// Number of requests received since last Stat (approximately QPS).
RequestCount int32

// Number of requests received through a proxy since last Stat.
ProxiedCount int32
}

// StatMessage wraps a Stat with identifying information so it can be routed
Expand Down Expand Up @@ -80,7 +86,7 @@ func (b statsBucket) concurrency() float64 {
for _, podStats := range b {
var subtotal float64
for _, stat := range podStats {
subtotal += stat.AverageConcurrentRequests
subtotal += stat.AverageConcurrentRequests - stat.AverageProxiedConcurrency
}
total += subtotal / float64(len(podStats))
}
Expand Down Expand Up @@ -231,8 +237,7 @@ func (a *Autoscaler) Scale(ctx context.Context, now time.Time) (int32, bool) {

// aggregateData aggregates bucketed stats over the stableWindow and panicWindow
// respectively and returns the observedStableConcurrency, observedPanicConcurrency
// and the last bucket that was aggregated. The boolean indicates whether or not
// the aggregation was successful.
// and the last bucket that was aggregated.
func (a *Autoscaler) aggregateData(now time.Time, stableWindow, panicWindow time.Duration) (
stableConcurrency float64, panicConcurrency float64, lastBucket statsBucket) {
a.statsMutex.Lock()
Expand Down
26 changes: 24 additions & 2 deletions pkg/autoscaler/autoscaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,12 +463,34 @@ func TestAutoscaler_UpdateTarget(t *testing.T) {
a.expectScale(t, now, 100, true)
}

func TestAutoScaler_Concurrency(t *testing.T) {
a := newTestAutoscaler(1.0)
now := roundedNow()
stat := Stat{
Time: &now,
PodName: "activator",
AverageConcurrentRequests: 1,
RequestCount: 1,
}
a.Record(TestContextWithLogger(t), stat)
stat = Stat{
Time: &now,
PodName: "pod1",
AverageConcurrentRequests: 1,
AverageProxiedConcurrency: 1,
RequestCount: 1,
ProxiedCount: 1,
}
a.Record(TestContextWithLogger(t), stat)
a.expectScale(t, now, 1, true)
}

type linearSeries struct {
startConcurrency int
endConcurrency int
duration time.Duration
podCount int
podIdOffset int
podIDOffset int
}

type mockReporter struct{}
Expand Down Expand Up @@ -552,7 +574,7 @@ func (a *Autoscaler) recordLinearSeries(test *testing.T, now time.Time, s linear
}
stat := Stat{
Time: &t,
PodName: fmt.Sprintf("pod-%v", j+s.podIdOffset),
PodName: fmt.Sprintf("pod-%v", j+s.podIDOffset),
AverageConcurrentRequests: float64(point),
RequestCount: int32(requestCount),
}
Expand Down
14 changes: 14 additions & 0 deletions pkg/autoscaler/stats_scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,9 @@ func (s *ServiceScraper) Scrape(ctx context.Context, statsCh chan<- *StatMessage
Time: stat.Time,
PodName: scraperPodName,
AverageConcurrentRequests: stat.AverageConcurrentRequests * float64(readyPodsCount),
AverageProxiedConcurrency: stat.AverageProxiedConcurrency * float64(readyPodsCount),
RequestCount: stat.RequestCount * int32(readyPodsCount),
ProxiedCount: stat.ProxiedCount * int32(readyPodsCount),
}

s.sendStatMessage(newStat, statsCh)
Expand Down Expand Up @@ -188,12 +190,24 @@ func extractData(body io.Reader) (*Stat, error) {
return nil, errors.New("could not find value for queue_average_concurrent_requests in response")
}

if pMetric := getPrometheusMetric(metricFamilies, "queue_average_proxied_concurrency"); pMetric != nil {
stat.AverageProxiedConcurrency = *pMetric.Gauge.Value
} else {
return nil, errors.New("could not find value for queue_average_proxied_concurrency in response")
}

if pMetric := getPrometheusMetric(metricFamilies, "queue_operations_per_second"); pMetric != nil {
stat.RequestCount = int32(*pMetric.Gauge.Value)
} else {
return nil, errors.New("could not find value for queue_operations_per_second in response")
}

if pMetric := getPrometheusMetric(metricFamilies, "queue_proxied_operations_per_second"); pMetric != nil {
stat.ProxiedCount = int32(*pMetric.Gauge.Value)
} else {
return nil, errors.New("could not find value for queue_proxied_operations_per_second in response")
}

return &stat, nil
}

Expand Down
Loading

0 comments on commit 0311790

Please sign in to comment.