Use keep-alive for direct pod scraping (#8367)
Maintains the existing no-keep-alive behaviour in the mesh case, but lets us re-use connections when we have addressable pods.
julz authored Jun 18, 2020
1 parent c48d1b1 commit 3b3f30d
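
The behavioural difference the change leans on is plain net/http: Go's default Transport keeps idle connections open and re-uses them, while a Transport with DisableKeepAlives set dials a fresh connection for every request, which is what gives a mesh or service VIP the chance to route each scrape to a different pod. Below is a minimal standalone sketch, not part of this commit (the test server, labels, and loop are illustrative only), that makes the difference visible with net/http/httptrace:

package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"net/http/httptrace"
	"time"
)

func scrapeTwice(client *http.Client, url, label string) {
	for i := 1; i <= 2; i++ {
		// GotConn reports whether the transport re-used a pooled connection.
		trace := &httptrace.ClientTrace{
			GotConn: func(info httptrace.GotConnInfo) {
				fmt.Printf("%s scrape %d: reused connection = %v\n", label, i, info.Reused)
			},
		}
		req, err := http.NewRequest(http.MethodGet, url, nil)
		if err != nil {
			fmt.Println(label, "error:", err)
			return
		}
		req = req.WithContext(httptrace.WithClientTrace(req.Context(), trace))
		resp, err := client.Do(req)
		if err != nil {
			fmt.Println(label, "error:", err)
			return
		}
		// Drain and close the body so the connection can return to the pool.
		io.Copy(io.Discard, resp.Body)
		resp.Body.Close()
	}
}

func main() {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Write([]byte("ok"))
	}))
	defer srv.Close()

	// Direct-pod style: default transport, keep-alive on.
	// The second scrape should report reused = true.
	scrapeTwice(&http.Client{Timeout: 5 * time.Second}, srv.URL, "keep-alive")

	// Mesh style: keep-alive off, as in noKeepaliveClient in the diff below.
	// Every scrape dials fresh, so reused = false both times.
	scrapeTwice(&http.Client{
		Transport: &http.Transport{DisableKeepAlives: true},
		Timeout:   5 * time.Second,
	}, srv.URL, "no-keep-alive")
}

With keep-alive on, a mesh's routing decision is sticky for the lifetime of the connection; that stickiness is desirable when scraping an addressable pod directly and harmful when sampling through a service, which is exactly the split the diff below encodes.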
Showing 2 changed files with 51 additions and 40 deletions.
pkg/autoscaler/metrics/stats_scraper.go (26 additions & 14 deletions)
@@ -102,22 +102,32 @@ type scrapeClient interface {
     Scrape(url string) (Stat, error)
 }
 
-// cacheDisabledClient is a http client with cache disabled. It is shared by
-// every goruntime for a revision scraper.
-var cacheDisabledClient = &http.Client{
+// noKeepaliveClient is a http client with HTTP Keep-Alive disabled.
+// This client is used in the mesh case since we want to get a new connection -
+// and therefore, hopefully, host - on every scrape of the service.
+var noKeepaliveClient = &http.Client{
     Transport: &http.Transport{
         // Do not use the cached connection
         DisableKeepAlives: true,
     },
     Timeout: httpClientTimeout,
 }
 
+// client is a normal http client with HTTP Keep-Alive enabled.
+// This client is used in the direct pod scraping (no mesh) case where we want
+// to take advantage of HTTP Keep-Alive to avoid connection creation overhead
+// between scrapes of the same pod.
+var client = &http.Client{
+    Timeout: httpClientTimeout,
+}
+
 // serviceScraper scrapes Revision metrics via a K8S service by sampling. Which
 // pod to be picked up to serve the request is decided by K8S. Please see
 // https://kubernetes.io/docs/concepts/services-networking/network-policies/
 // for details.
 type serviceScraper struct {
-    sClient scrapeClient
+    directClient scrapeClient
+    meshClient   scrapeClient
 
     counter  resources.EndpointsCounter
     url      string
     statsCtx context.Context
@@ -131,28 +141,29 @@ type serviceScraper struct {
 // the given Metric is responsible for.
 func NewStatsScraper(metric *av1alpha1.Metric, counter resources.EndpointsCounter,
     podAccessor resources.PodAccessor, logger *zap.SugaredLogger) (StatsScraper, error) {
-    sClient, err := newHTTPScrapeClient(cacheDisabledClient)
+    directClient, err := newHTTPScrapeClient(client)
     if err != nil {
         return nil, err
     }
-    return newServiceScraperWithClient(metric, counter, podAccessor, sClient, logger)
+    meshClient, err := newHTTPScrapeClient(noKeepaliveClient)
+    if err != nil {
+        return nil, err
+    }
+    return newServiceScraperWithClient(metric, counter, podAccessor, directClient, meshClient, logger)
 }
 
 func newServiceScraperWithClient(
     metric *av1alpha1.Metric,
     counter resources.EndpointsCounter,
     podAccessor resources.PodAccessor,
-    sClient scrapeClient,
+    directClient, meshClient scrapeClient,
     logger *zap.SugaredLogger) (*serviceScraper, error) {
     if metric == nil {
         return nil, errors.New("metric must not be nil")
     }
     if counter == nil {
         return nil, errors.New("counter must not be nil")
     }
-    if sClient == nil {
-        return nil, errors.New("scrape client must not be nil")
-    }
     revName := metric.Labels[serving.RevisionLabelKey]
     if revName == "" {
         return nil, errors.New("no Revision label found for Metric " + metric.Name)
@@ -166,7 +177,8 @@ func newServiceScraperWithClient(
     }
 
     return &serviceScraper{
-        sClient:     sClient,
+        directClient: directClient,
+        meshClient:   meshClient,
         counter:      counter,
         url:          urlFromTarget(metric.Spec.ScrapeTarget, metric.ObjectMeta.Namespace),
        podAccessor:  podAccessor,
@@ -253,7 +265,7 @@ func (s *serviceScraper) scrapePods(readyPods int) (Stat, error) {
 
         // Scrape!
         target := "http://" + pods[myIdx] + ":" + portAndPath
-        stat, err := s.sClient.Scrape(target)
+        stat, err := s.directClient.Scrape(target)
         if err == nil {
             results <- stat
             return nil
@@ -388,7 +400,7 @@ func (s *serviceScraper) scrapeService(window time.Duration, readyPods int) (Stat, error) {
 // tryScrape runs a single scrape and returns stat if this is a pod that has not been
 // seen before. An error otherwise or if scraping failed.
 func (s *serviceScraper) tryScrape(scrapedPods *sync.Map) (Stat, error) {
-    stat, err := s.sClient.Scrape(s.url)
+    stat, err := s.meshClient.Scrape(s.url)
     if err != nil {
         return emptyStat, err
     }
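
Putting the hunks together: scrapePods (the direct path) scrapes individual pod IPs with directClient, so back-to-back scrapes of the same pod can re-use a pooled connection, while tryScrape (the mesh path) goes through the service URL with meshClient so each sample dials fresh. A hypothetical condensation of that routing follows (scrapeOnce is an invented name; the real logic lives in scrapePods and tryScrape above, with sampling, retries, and young-pod filtering elided):

// scrapeOnce is a hypothetical condensation of the routing in this file;
// it is not part of the commit.
func (s *serviceScraper) scrapeOnce(pod string, direct bool) (Stat, error) {
	if direct {
		// Addressable pod: keep-alive client, so repeated scrapes of the
		// same pod re-use a pooled connection instead of re-dialing.
		return s.directClient.Scrape("http://" + pod + ":" + portAndPath)
	}
	// Mesh: no-keep-alive client against the service URL, so every scrape
	// dials fresh and the service can route it to a different pod.
	return s.meshClient.Scrape(s.url)
}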
pkg/autoscaler/metrics/stats_scraper_test.go (25 additions & 26 deletions)
@@ -152,12 +152,6 @@ func TestNewServiceScraperWithClientErrorCases(t *testing.T) {
         counter:     counter,
         accessor:    podAccessor,
         expectedErr: "no Revision label found for Metric test-revision",
-    }, {
-        name:        "Empty scrape client",
-        metric:      metric,
-        counter:     counter,
-        accessor:    podAccessor,
-        expectedErr: "scrape client must not be nil",
     }, {
         name:   "Empty counter",
         metric: metric,
@@ -170,7 +164,7 @@ func TestNewServiceScraperWithClientErrorCases(t *testing.T) {
     for _, test := range testCases {
         t.Run(test.name, func(t *testing.T) {
             if _, err := newServiceScraperWithClient(test.metric, test.counter,
-                test.accessor, test.client, logger); err != nil {
+                test.accessor, test.client, test.client, logger); err != nil {
                 got := err.Error()
                 want := test.expectedErr
                 if got != want {
@@ -189,7 +183,7 @@ func TestPodDirectScrapeSuccess(t *testing.T) {
     fake.KubeInformer = kubeinformers.NewSharedInformerFactory(fake.KubeClient, 0)
 
     client := newTestScrapeClient(testStats, []error{nil})
-    scraper, err := serviceScraperForTest(t, client, true)
+    scraper, err := serviceScraperForTest(t, client, nil /* mesh not used */, true)
     if err != nil {
         t.Fatal("serviceScraperForTest:", err)
     }
@@ -216,7 +210,7 @@ func TestPodDirectScrapeSomeFailButSuccess(t *testing.T) {
 
     // For 5 pods, we need 4 successes.
     client := newTestScrapeClient(testStats, []error{nil, nil, errors.New("okay"), nil, nil})
-    scraper, err := serviceScraperForTest(t, client, true)
+    scraper, err := serviceScraperForTest(t, client, nil /* mesh not used */, true)
     if err != nil {
         t.Fatal("serviceScraperForTest:", err)
     }
@@ -248,13 +242,15 @@ func TestPodDirectScrapeNoneSucceed(t *testing.T) {
     fake.KubeInformer = kubeinformers.NewSharedInformerFactory(fake.KubeClient, 0)
 
     testStats := testStatsWithTime(4, youngPodCutOffDuration.Seconds() /*youngest*/)
-    client := newTestScrapeClient(testStats, []error{
+    direct := newTestScrapeClient(testStats, []error{
         // Pods fail.
         errors.New("okay"), errors.New("okay"), errors.New("okay"), errors.New("okay"),
+    })
+    mesh := newTestScrapeClient(testStats, []error{
         // Service succeeds.
         nil, nil, nil, nil,
     })
-    scraper, err := serviceScraperForTest(t, client, true)
+    scraper, err := serviceScraperForTest(t, direct, mesh, true)
     if err != nil {
         t.Fatal("serviceScraperForTest:", err)
     }
@@ -286,7 +282,7 @@ func TestPodDirectScrapePodsExhausted(t *testing.T) {
     fake.KubeInformer = kubeinformers.NewSharedInformerFactory(fake.KubeClient, 0)
 
     client := newTestScrapeClient(testStats, []error{nil, nil, errors.New("okay"), nil})
-    scraper, err := serviceScraperForTest(t, client, true)
+    scraper, err := serviceScraperForTest(t, client, nil /* mesh not used */, true)
     if err != nil {
         t.Fatal("serviceScraperForTest:", err)
     }
@@ -305,7 +301,7 @@
 
 func TestScrapeReportStatWhenAllCallsSucceed(t *testing.T) {
     client := newTestScrapeClient(testStats, []error{nil})
-    scraper, err := serviceScraperForTest(t, client, false)
+    scraper, err := serviceScraperForTest(t, client, client, false)
     if err != nil {
         t.Fatal("serviceScraperForTest:", err)
     }
@@ -336,8 +332,9 @@ func TestScrapeAllPodsYoungPods(t *testing.T) {
     // acceptable.
     testStats := testStatsWithTime(numP, 0. /*youngest*/)
 
-    client := newTestScrapeClient(testStats, []error{nil})
-    scraper, err := serviceScraperForTest(t, client, false)
+    direct := newTestScrapeClient(testStats, []error{errNoPodsScraped}) // fall back to service scrape
+    mesh := newTestScrapeClient(testStats, []error{nil})
+    scraper, err := serviceScraperForTest(t, direct, mesh, false)
     if err != nil {
         t.Fatalf("serviceScraperForTest=%v, want no error", err)
     }
@@ -368,8 +365,9 @@ func TestScrapeAllPodsOldPods(t *testing.T) {
     // All pods are at least cutoff time old, so first 5 stats will be picked.
     testStats := testStatsWithTime(numP, youngPodCutOffDuration.Seconds() /*youngest*/)
 
-    client := newTestScrapeClient(testStats, []error{nil})
-    scraper, err := serviceScraperForTest(t, client, false)
+    direct := newTestScrapeClient(testStats, []error{errNoPodsScraped}) // fall back to service scrape
+    mesh := newTestScrapeClient(testStats, []error{nil})
+    scraper, err := serviceScraperForTest(t, direct, mesh, false)
     if err != nil {
         t.Fatalf("serviceScraperForTest=%v, want no error", err)
     }
@@ -401,8 +399,9 @@ func TestScrapeSomePodsOldPods(t *testing.T) {
     // So pods 3-10 qualify (for 11 total sample is 7).
     testStats := testStatsWithTime(numP, youngPodCutOffDuration.Seconds()/2 /*youngest*/)
 
-    client := newTestScrapeClient(testStats, []error{nil})
-    scraper, err := serviceScraperForTest(t, client, false)
+    direct := newTestScrapeClient(testStats, []error{errNoPodsScraped}) // fall back to service scrape
+    mesh := newTestScrapeClient(testStats, []error{nil})
+    scraper, err := serviceScraperForTest(t, direct, mesh, false)
     if err != nil {
         t.Fatalf("serviceScraperForTest=%v, want no error", err)
     }
@@ -430,7 +429,7 @@ func TestScrapeSomePodsOldPods(t *testing.T) {
 
 func TestScrapeReportErrorCannotFindEnoughPods(t *testing.T) {
     client := newTestScrapeClient(testStats[2:], []error{nil})
-    scraper, err := serviceScraperForTest(t, client, false)
+    scraper, err := serviceScraperForTest(t, client, client, false)
     if err != nil {
         t.Fatalf("serviceScraperForTest=%v, want no error", err)
     }
@@ -448,9 +447,9 @@ func TestScrapeReportErrorIfAnyFails(t *testing.T) {
     errTest := errors.New("test")
 
     // 1 success and 10 failures so one scrape fails permanently through retries.
-    client := newTestScrapeClient(testStats, []error{nil,
-        errTest, errTest, errTest, errTest, errTest, errTest, errTest, errTest, errTest, errTest})
-    scraper, err := serviceScraperForTest(t, client, false)
+    client := newTestScrapeClient(testStats, []error{nil, errTest, errTest,
+        errTest, errTest, errTest, errTest, errTest, errTest, errTest, errTest})
+    scraper, err := serviceScraperForTest(t, client, client, false)
     if err != nil {
         t.Fatalf("serviceScraperForTest=%v, want no error", err)
     }
@@ -466,7 +465,7 @@
 
 func TestScrapeDoNotScrapeIfNoPodsFound(t *testing.T) {
     client := newTestScrapeClient(testStats, nil)
-    scraper, err := serviceScraperForTest(t, client, false)
+    scraper, err := serviceScraperForTest(t, client, client, false)
     if err != nil {
         t.Fatalf("serviceScraperForTest=%v, want no error", err)
     }
@@ -483,7 +482,7 @@
     }
 }
 
-func serviceScraperForTest(t *testing.T, sClient scrapeClient, podsAddressable bool) (*serviceScraper, error) {
+func serviceScraperForTest(t *testing.T, directClient, meshClient scrapeClient, podsAddressable bool) (*serviceScraper, error) {
     metric := testMetric()
     counter := resources.NewScopedEndpointsCounter(
         fake.KubeInformer.Core().V1().Endpoints().Lister(),
@@ -492,7 +491,7 @@ func serviceScraperForTest(t *testing.T, sClient scrapeClient, podsAddressable bool) (*serviceScraper, error) {
         fake.KubeInformer.Core().V1().Pods().Lister(),
         fake.TestNamespace, fake.TestRevision)
     logger := logtesting.TestLogger(t)
-    ss, err := newServiceScraperWithClient(metric, counter, accessor, sClient, logger)
+    ss, err := newServiceScraperWithClient(metric, counter, accessor, directClient, meshClient, logger)
     if ss != nil {
         ss.podsAddressable = podsAddressable
     }
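
A note on the test changes above: tests that exercise only the direct path now pass nil /* mesh not used */ for the mesh client, while the service-scrape tests seed the direct client with errNoPodsScraped to force the fallback to the mesh path. The shared fake apparently replays a canned sequence of stats and errors; here is a sketch of how such a fake could look (fakeScrapeClient and its fields are assumptions, since the real newTestScrapeClient is defined elsewhere in this test file; uses sync from the standard library):

// fakeScrapeClient replays canned stats and errors in order, wrapping
// around when the sequences are exhausted. This is an assumption about
// newTestScrapeClient's behaviour, not the real helper. It assumes at
// least one stat is provided; a nil error slice means "always succeed".
type fakeScrapeClient struct {
	mu    sync.Mutex
	stats []Stat
	errs  []error
	calls int
}

func (f *fakeScrapeClient) Scrape(url string) (Stat, error) {
	f.mu.Lock()
	defer f.mu.Unlock()
	i := f.calls
	f.calls++
	var err error
	if len(f.errs) > 0 {
		err = f.errs[i%len(f.errs)]
	}
	return f.stats[i%len(f.stats)], err
}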
