From 150c3fbcfe07959d5527d9c7fea544d3bc744187 Mon Sep 17 00:00:00 2001 From: Aaron Schlesinger Date: Thu, 7 Oct 2021 20:37:40 +0000 Subject: [PATCH 01/10] refactoring concurrency in scaler's queue pinger Signed-off-by: Aaron Schlesinger --- pkg/queue/queue_counts_test.go | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 pkg/queue/queue_counts_test.go diff --git a/pkg/queue/queue_counts_test.go b/pkg/queue/queue_counts_test.go new file mode 100644 index 00000000..898afef9 --- /dev/null +++ b/pkg/queue/queue_counts_test.go @@ -0,0 +1,7 @@ +package queue + +import "testing" + +func TestAggregate(t *testing.T) { + t.Fatalf("TODO") +} From 7b141a75e7eb75a1757fcac87714cd3e00f84ae9 Mon Sep 17 00:00:00 2001 From: Aaron Schlesinger Date: Thu, 7 Oct 2021 20:38:34 +0000 Subject: [PATCH 02/10] refactoring concurrency in scaler's queue pinger Signed-off-by: Aaron Schlesinger --- operator/controllers/ping_test.go | 3 +- pkg/k8s/fake_endpoints.go | 31 +++++++- pkg/queue/queue_counts.go | 9 +++ scaler/main.go | 2 +- scaler/queue_pinger.go | 122 +++++++++++++++++++++--------- scaler/queue_pinger_fake.go | 10 ++- scaler/queue_pinger_test.go | 75 +++++++++++++++++- 7 files changed, 204 insertions(+), 48 deletions(-) diff --git a/operator/controllers/ping_test.go b/operator/controllers/ping_test.go index dd94d030..61567162 100644 --- a/operator/controllers/ping_test.go +++ b/operator/controllers/ping_test.go @@ -29,7 +29,8 @@ func TestPingInterceptors(t *testing.T) { r.NoError(err) defer srv.Close() ctx := context.Background() - endpoints := k8s.FakeEndpointsForURL(url, ns, svcName, 2) + endpoints, err := k8s.FakeEndpointsForURL(url, ns, svcName, 2) + r.NoError(err) cl := fake.NewClientBuilder().WithObjects(endpoints).Build() r.NoError(pingInterceptors( ctx, diff --git a/pkg/k8s/fake_endpoints.go b/pkg/k8s/fake_endpoints.go index 43e0b67a..a4188c66 100644 --- a/pkg/k8s/fake_endpoints.go +++ b/pkg/k8s/fake_endpoints.go @@ -2,6 +2,7 @@ package k8s import ( "net/url" + "strconv" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -16,13 +17,36 @@ func FakeEndpointsForURL( namespace, name string, num int, -) *v1.Endpoints { - addrs := make([]v1.EndpointAddress, num) +) (*v1.Endpoints, error) { + urls := make([]*url.URL, num) for i := 0; i < num; i++ { + urls[i] = u + } + return FakeEndpointsForURLs(urls, namespace, name) +} + +// FakeEndpointsForURLs creates and returns a new +// *v1.Endpoints with a single v1.EndpointSubset in it +// that has each url in the urls parameter in it. 
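+//
+// A rough usage sketch follows; the URLs, namespace, and service name are
+// hypothetical values chosen only to illustrate the call:
+//
+//	u1, _ := url.Parse("http://10.0.0.1:8080")
+//	u2, _ := url.Parse("http://10.0.0.2:8080")
+//	endpoints, err := FakeEndpointsForURLs([]*url.URL{u1, u2}, "testns", "testsvc")
+//	if err != nil {
+//		// a URL whose port isn't a valid integer is rejected here
+//	}
+//	// endpoints.Subsets[0].Addresses and .Ports each hold two entries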
+func FakeEndpointsForURLs( + urls []*url.URL, + namespace, + name string, +) (*v1.Endpoints, error) { + addrs := make([]v1.EndpointAddress, len(urls)) + ports := make([]v1.EndpointPort, len(urls)) + for i, u := range urls { addrs[i] = v1.EndpointAddress{ Hostname: u.Hostname(), IP: u.Hostname(), } + portInt, err := strconv.Atoi(u.Port()) + if err != nil { + return nil, err + } + ports[i] = v1.EndpointPort{ + Port: int32(portInt), + } } return &v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ @@ -32,7 +56,8 @@ func FakeEndpointsForURL( Subsets: []v1.EndpointSubset{ { Addresses: addrs, + Ports: ports, }, }, - } + }, nil } diff --git a/pkg/queue/queue_counts.go b/pkg/queue/queue_counts.go index f1cfb992..fc8ce5bc 100644 --- a/pkg/queue/queue_counts.go +++ b/pkg/queue/queue_counts.go @@ -25,6 +25,15 @@ func NewCounts() *Counts { } } +// Aggregate returns the total count across all hosts +func (q *Counts) Aggregate() int { + agg := 0 + for _, count := range q.Counts { + agg += count + } + return agg +} + // MarshalJSON implements json.Marshaler func (q *Counts) MarshalJSON() ([]byte, error) { return json.Marshal(q.Counts) diff --git a/scaler/main.go b/scaler/main.go index 3e3cf8c0..b36e8345 100644 --- a/scaler/main.go +++ b/scaler/main.go @@ -161,7 +161,7 @@ func startHealthcheckServer( mux.HandleFunc("/queue_ping", func(w http.ResponseWriter, r *http.Request) { ctx := r.Context() lggr := lggr.WithName("route.counts_ping") - if err := pinger.requestCounts(ctx); err != nil { + if err := pinger.fetchAndSaveCounts(ctx); err != nil { lggr.Error(err, "requesting counts failed") w.WriteHeader(500) w.Write([]byte("error requesting counts from interceptors")) diff --git a/scaler/queue_pinger.go b/scaler/queue_pinger.go index d874ae7f..a39b583c 100644 --- a/scaler/queue_pinger.go +++ b/scaler/queue_pinger.go @@ -44,12 +44,14 @@ func newQueuePinger( pingMut: pingMut, lggr: lggr, allCounts: map[string]int{}, + aggregateCount: 0, } go func() { defer pingTicker.Stop() for range pingTicker.C { - if err := pinger.requestCounts(ctx); err != nil { + err := pinger.fetchAndSaveCounts(ctx) + if err != nil { lggr.Error(err, "getting request counts") } } @@ -70,25 +72,74 @@ func (q *queuePinger) aggregate() int { return q.aggregateCount } -func (q *queuePinger) requestCounts(ctx context.Context) error { - lggr := q.lggr.WithName("queuePinger.requestCounts") - - endpointURLs, err := k8s.EndpointsForService( +// fetchAndSaveCounts calls fetchCounts, and then +// saves them to internal state in q +func (q *queuePinger) fetchAndSaveCounts(ctx context.Context) error { + q.pingMut.Lock() + defer q.pingMut.Unlock() + counts, agg, err := fetchCounts( ctx, + q.lggr, + q.getEndpointsFn, q.ns, q.svcName, q.adminPort, - q.getEndpointsFn, ) if err != nil { + q.lggr.Error(err, "getting request counts") return err } + q.allCounts = counts + q.aggregateCount = agg + q.lastPingTime = time.Now() + + return nil + +} + +// fetchCounts fetches all counts from every endpoint returned +// by endpointsFn for the given service named svcName on the +// port adminPort, in namespace ns. +// +// Requests to fetch endpoints are made concurrently and +// aggregated when all requests return successfully. +// +// Upon any failure, a non-nil error is returned and the +// other two return values are nil and 0, respectively. 
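+//
+// A rough calling sketch, assuming ctx, lggr, and k8sCl are set up as in
+// scaler/main.go; the namespace, service name, and admin port below are
+// hypothetical values chosen only to illustrate the call:
+//
+//	counts, agg, err := fetchCounts(
+//		ctx,
+//		lggr,
+//		k8s.EndpointsFuncForK8sClientset(k8sCl),
+//		"keda",
+//		"interceptor-admin",
+//		"9090",
+//	)
+//	if err != nil {
+//		// counts == nil and agg == 0
+//	}
+//	// counts maps each host to its pending request count summed across
+//	// every interceptor endpoint, and agg is the sum of those values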
+func fetchCounts( + ctx context.Context, + lggr logr.Logger, + endpointsFn k8s.GetEndpointsFunc, + ns, + svcName, + adminPort string, +) (map[string]int, int, error) { + lggr = lggr.WithName("queuePinger.requestCounts") + + endpointURLs, err := k8s.EndpointsForService( + ctx, + ns, + svcName, + adminPort, + endpointsFn, + ) + if err != nil { + return nil, 0, err + } countsCh := make(chan *queue.Counts) - defer close(countsCh) - fetchGrp, _ := errgroup.WithContext(ctx) + var wg sync.WaitGroup + fetchGrp, ctx := errgroup.WithContext(ctx) for _, endpoint := range endpointURLs { + // capture the endpoint in a loop-local + // variable so that the goroutine can + // use it u := endpoint + // have the errgroup goroutine send to + // a "private" goroutine, which we'll + // then forward on to countsCh + ch := make(chan *queue.Counts) + wg.Add(1) fetchGrp.Go(func() error { counts, err := queue.GetCounts( ctx, @@ -105,43 +156,44 @@ func (q *queuePinger) requestCounts(ctx context.Context) error { ) return err } - countsCh <- counts + ch <- counts return nil }) + // forward the "private" goroutine + // on to countsCh separately + go func() { + defer wg.Done() + res := <-ch + countsCh <- res + }() } - // consume the results of the counts channel in a goroutine. - // we'll must for all the fetcher goroutines to finish after we - // start up this goroutine so that all goroutines can make - // progress + // close countsCh after all goroutines are done sending + // to their "private" channels, so that we can range + // over countsCh normally below go func() { - agg := 0 - totalCounts := make(map[string]int) - // range through the result of each endpoint - for count := range countsCh { - // each endpoint returns a map of counts, one count - // per host. add up the counts for each host - for host, val := range count.Counts { - agg += val - totalCounts[host] += val - } - } - - q.pingMut.Lock() - defer q.pingMut.Unlock() - q.allCounts = totalCounts - q.aggregateCount = agg - q.lastPingTime = time.Now() + wg.Wait() + close(countsCh) }() - // now that the counts channel is being consumed, all the - // fetch goroutines can make progress. wait for them - // to finish and check for errors. if err := fetchGrp.Wait(); err != nil { lggr.Error(err, "fetching all counts failed") - return err + return nil, 0, err } - return nil + // consume the results of the counts channel + agg := 0 + totalCounts := make(map[string]int) + // range through the result of each endpoint + for count := range countsCh { + // each endpoint returns a map of counts, one count + // per host. 
add up the counts for each host + for host, val := range count.Counts { + agg += val + totalCounts[host] += val + } + } + + return totalCounts, agg, nil } diff --git a/scaler/queue_pinger_fake.go b/scaler/queue_pinger_fake.go index 53eca52f..5c9e03ec 100644 --- a/scaler/queue_pinger_fake.go +++ b/scaler/queue_pinger_fake.go @@ -31,13 +31,15 @@ func startFakeQueueEndpointServer( ) (*httptest.Server, *url.URL, *v1.Endpoints, error) { hdl := http.NewServeMux() queue.AddCountsRoute(logr.Discard(), hdl, q) - srv, url, err := kedanet.StartTestServer(hdl) + srv, srvURL, err := kedanet.StartTestServer(hdl) if err != nil { return nil, nil, nil, err } - - endpoints := k8s.FakeEndpointsForURL(url, ns, svcName, numEndpoints) - return srv, url, endpoints, nil + endpoints, err := k8s.FakeEndpointsForURL(srvURL, ns, svcName, numEndpoints) + if err != nil { + return nil, nil, nil, err + } + return srv, srvURL, endpoints, nil } type fakeQueuePingerOpts struct { diff --git a/scaler/queue_pinger_test.go b/scaler/queue_pinger_test.go index 2b59823c..8158d254 100644 --- a/scaler/queue_pinger_test.go +++ b/scaler/queue_pinger_test.go @@ -2,6 +2,7 @@ package main import ( context "context" + "encoding/json" "net/http" "testing" "time" @@ -14,7 +15,7 @@ import ( v1 "k8s.io/api/core/v1" ) -func TestRequestCounts(t *testing.T) { +func TestCounts(t *testing.T) { r := require.New(t) ctx := context.Background() const ( @@ -38,11 +39,12 @@ func TestRequestCounts(t *testing.T) { hdl := http.NewServeMux() queue.AddCountsRoute(logr.Discard(), hdl, q) - srv, url, err := kedanet.StartTestServer(hdl) + srv, srvURL, err := kedanet.StartTestServer(hdl) r.NoError(err) defer srv.Close() - endpoints := k8s.FakeEndpointsForURL(url, ns, svcName, 3) + endpoints, err := k8s.FakeEndpointsForURL(srvURL, ns, svcName, 3) + r.NoError(err) // set the initial ticker to effectively never tick so that we // can check the behavior of the pinger before the first // tick @@ -55,7 +57,7 @@ func TestRequestCounts(t *testing.T) { }, ns, svcName, - url.Port(), + srvURL.Port(), ticker, ) // the pinger starts a background watch loop but won't request the counts @@ -93,3 +95,68 @@ func TestRequestCounts(t *testing.T) { } } + +func TestFetchCounts(t *testing.T) { + r := require.New(t) + ctx, done := context.WithCancel(context.Background()) + defer done() + const ( + ns = "testns" + svcName = "testsvc" + adminPort = "8081" + numEndpoints = 3 + ) + counts := queue.NewCounts() + counts.Counts = map[string]int{ + "host1": 123, + "host2": 234, + "host3": 345, + } + hdl := kedanet.NewTestHTTPHandlerWrapper( + http.HandlerFunc( + func(wr http.ResponseWriter, req *http.Request) { + err := json.NewEncoder(wr).Encode(counts) + r.NoError(err) + }, + ), + ) + srv, srvURL, err := kedanet.StartTestServer(hdl) + r.NoError(err) + endpointsForURLs, err := k8s.FakeEndpointsForURL( + srvURL, + ns, + svcName, + numEndpoints, + ) + r.NoError(err) + defer srv.Close() + endpointsFn := func( + ctx context.Context, + ns, + svcName string, + ) (*v1.Endpoints, error) { + return endpointsForURLs, nil + } + + cts, agg, err := fetchCounts( + ctx, + logr.Discard(), + endpointsFn, + ns, + svcName, + srvURL.Port(), + ) + r.NoError(err) + // since all endpoints serve the same counts, + // expected aggregate is individual count * # endpoints + expectedAgg := counts.Aggregate() * numEndpoints + r.Equal(expectedAgg, agg) + // again, since all endpoints serve the same counts, + // the hosts will be the same as the original counts, + // but the value is (individual count * # endpoints) + 
expectedCounts := counts.Counts + for host, val := range expectedCounts { + expectedCounts[host] = val * numEndpoints + } + r.Equal(expectedCounts, cts) +} From 7e7b325cd4e19544e5d0ad9697bb54e2c1a57ab8 Mon Sep 17 00:00:00 2001 From: Aaron Schlesinger Date: Thu, 7 Oct 2021 21:47:56 +0000 Subject: [PATCH 03/10] implementing test for queue.Counts.Aggregate() Signed-off-by: Aaron Schlesinger --- pkg/queue/queue_counts_test.go | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/pkg/queue/queue_counts_test.go b/pkg/queue/queue_counts_test.go index 898afef9..6e877b5b 100644 --- a/pkg/queue/queue_counts_test.go +++ b/pkg/queue/queue_counts_test.go @@ -1,7 +1,23 @@ package queue -import "testing" +import ( + "testing" + + "github.com/stretchr/testify/require" +) func TestAggregate(t *testing.T) { - t.Fatalf("TODO") + r := require.New(t) + counts := NewCounts() + counts.Counts = map[string]int{ + "host1": 123, + "host2": 234, + "host3": 456, + "host4": 567, + } + expectedAgg := 0 + for _, v := range counts.Counts { + expectedAgg += v + } + r.Equal(expectedAgg, counts.Aggregate()) } From 0a847704e5e402f9861cd62263293f6b5865ff8a Mon Sep 17 00:00:00 2001 From: Aaron Schlesinger Date: Thu, 7 Oct 2021 21:52:59 +0000 Subject: [PATCH 04/10] adding test for fetchAndSaveCounts Signed-off-by: Aaron Schlesinger --- scaler/queue_pinger_test.go | 68 +++++++++++++++++++++++++++++++++++++ 1 file changed, 68 insertions(+) diff --git a/scaler/queue_pinger_test.go b/scaler/queue_pinger_test.go index 8158d254..b36cd9e8 100644 --- a/scaler/queue_pinger_test.go +++ b/scaler/queue_pinger_test.go @@ -96,6 +96,74 @@ func TestCounts(t *testing.T) { } +func TestFetchAndSaveCounts(t *testing.T) { + r := require.New(t) + ctx, done := context.WithCancel(context.Background()) + defer done() + const ( + ns = "testns" + svcName = "testsvc" + adminPort = "8081" + numEndpoints = 3 + ) + counts := queue.NewCounts() + counts.Counts = map[string]int{ + "host1": 123, + "host2": 234, + "host3": 345, + } + hdl := kedanet.NewTestHTTPHandlerWrapper( + http.HandlerFunc( + func(wr http.ResponseWriter, req *http.Request) { + err := json.NewEncoder(wr).Encode(counts) + r.NoError(err) + }, + ), + ) + srv, srvURL, err := kedanet.StartTestServer(hdl) + r.NoError(err) + endpointsForURLs, err := k8s.FakeEndpointsForURL( + srvURL, + ns, + svcName, + numEndpoints, + ) + r.NoError(err) + defer srv.Close() + endpointsFn := func( + ctx context.Context, + ns, + svcName string, + ) (*v1.Endpoints, error) { + return endpointsForURLs, nil + } + + pinger := newQueuePinger( + ctx, + logr.Discard(), + endpointsFn, + ns, + svcName, + srvURL.Port(), + time.NewTicker(1*time.Millisecond), + ) + + r.NoError(pinger.fetchAndSaveCounts(ctx)) + + // since all endpoints serve the same counts, + // expected aggregate is individual count * # endpoints + expectedAgg := counts.Aggregate() * numEndpoints + r.Equal(expectedAgg, pinger.aggregateCount) + // again, since all endpoints serve the same counts, + // the hosts will be the same as the original counts, + // but the value is (individual count * # endpoints) + expectedCounts := counts.Counts + for host, val := range expectedCounts { + expectedCounts[host] = val * numEndpoints + } + r.Equal(expectedCounts, pinger.allCounts) +} + func TestFetchCounts(t *testing.T) { r := require.New(t) ctx, done := context.WithCancel(context.Background()) From 655bc3dcf6707beb6e289f187625f41affb14a51 Mon Sep 17 00:00:00 2001 From: Aaron Schlesinger Date: Thu, 7 Oct 2021 22:31:39 +0000 Subject: [PATCH 
05/10] moving background processing out of the newQueuePinger constructor Signed-off-by: Aaron Schlesinger --- scaler/config.go | 2 ++ scaler/handlers_test.go | 24 +++++++++++++++++------ scaler/main.go | 16 +++++++++++++-- scaler/main_test.go | 3 ++- scaler/queue_pinger.go | 39 ++++++++++++++++++++++++++++--------- scaler/queue_pinger_fake.go | 11 +++++++---- 6 files changed, 73 insertions(+), 22 deletions(-) diff --git a/scaler/config.go b/scaler/config.go index 8a50aace..5a4e53a4 100644 --- a/scaler/config.go +++ b/scaler/config.go @@ -29,6 +29,8 @@ type config struct { // UpdateRoutingTableDur is the duration between manual // updates to the routing table. UpdateRoutingTableDur time.Duration `envconfig:"KEDA_HTTP_SCALER_ROUTING_TABLE_UPDATE_DUR" default:"100ms"` + // QueueTickDuration is the duration between queue requests + QueueTickDuration time.Duration `envconfig:"KEDA_HTTP_QUEUE_TICK_DURATION" default:"500ms"` } func mustParseConfig() *config { diff --git a/scaler/handlers_test.go b/scaler/handlers_test.go index f6dc97af..18a91080 100644 --- a/scaler/handlers_test.go +++ b/scaler/handlers_test.go @@ -20,7 +20,8 @@ func TestIsActive(t *testing.T) { ctx := context.Background() lggr := logr.Discard() table := routing.NewTable() - ticker, pinger := newFakeQueuePinger(ctx, lggr) + ticker, pinger, err := newFakeQueuePinger(ctx, lggr) + r.NoError(err) defer ticker.Stop() pinger.pingMut.Lock() pinger.allCounts[host] = 0 @@ -79,7 +80,8 @@ func TestGetMetricSpec(t *testing.T) { "testdepl", int32(target), )) - ticker, pinger := newFakeQueuePinger(ctx, lggr) + ticker, pinger, err := newFakeQueuePinger(ctx, lggr) + r.NoError(err) defer ticker.Stop() hdl := newImpl(lggr, pinger, table, 123) meta := map[string]string{ @@ -108,7 +110,8 @@ func TestGetMetricsMissingHostInMetadata(t *testing.T) { ScaledObjectRef: &externalscaler.ScaledObjectRef{}, } table := routing.NewTable() - ticker, pinger := newFakeQueuePinger(ctx, lggr) + ticker, pinger, err := newFakeQueuePinger(ctx, lggr) + r.NoError(err) defer ticker.Stop() hdl := newImpl(lggr, pinger, table, 123) @@ -135,7 +138,9 @@ func TestGetMetricsMissingHostInQueue(t *testing.T) { } table := routing.NewTable() - ticker, pinger := newFakeQueuePinger(ctx, lggr) + ticker, pinger, err := newFakeQueuePinger(ctx, lggr) + r.NoError(err) + defer ticker.Stop() hdl := newImpl(lggr, pinger, table, 123) @@ -200,13 +205,19 @@ func TestGetMetricsHostFoundInQueueCounts(t *testing.T) { table := routing.NewTable() // create a fake queue pinger. this is the simulated // scaler that pings the above fake interceptor - ticker, pinger := newFakeQueuePinger( + ticker, pinger, err := newFakeQueuePinger( ctx, lggr, func(opts *fakeQueuePingerOpts) { opts.endpoints = endpoints }, func(opts *fakeQueuePingerOpts) { opts.tickDur = 1 * time.Millisecond }, func(opts *fakeQueuePingerOpts) { opts.port = fakeSrvURL.Port() }, ) + r.NoError(err) + go func() { + // start the pinger watch loop + pinger.start(ctx, ticker) + }() + defer ticker.Stop() time.Sleep(50 * time.Millisecond) @@ -273,13 +284,14 @@ func TestGetMetricsInterceptorReturnsAggregate(t *testing.T) { // create a fake queue pinger. 
this is the simulated // scaler that pings the above fake interceptor const tickDur = 5 * time.Millisecond - ticker, pinger := newFakeQueuePinger( + ticker, pinger, err := newFakeQueuePinger( ctx, lggr, func(opts *fakeQueuePingerOpts) { opts.endpoints = endpoints }, func(opts *fakeQueuePingerOpts) { opts.tickDur = tickDur }, func(opts *fakeQueuePingerOpts) { opts.port = fakeSrvURL.Port() }, ) + r.NoError(err) defer ticker.Stop() // sleep for more than enough time for the pinger to do its diff --git a/scaler/main.go b/scaler/main.go index b36e8345..f5963746 100644 --- a/scaler/main.go +++ b/scaler/main.go @@ -47,19 +47,31 @@ func main() { lggr.Error(err, "getting a Kubernetes client") os.Exit(1) } - pinger := newQueuePinger( + pinger, err := newQueuePinger( context.Background(), lggr, k8s.EndpointsFuncForK8sClientset(k8sCl), namespace, svcName, targetPortStr, - time.NewTicker(500*time.Millisecond), ) + if err != nil { + lggr.Error(err, "creating a queue pinger") + os.Exit(1) + } table := routing.NewTable() grp, ctx := errgroup.WithContext(ctx) + + grp.Go(func() error { + defer done() + return pinger.start( + ctx, + time.NewTicker(cfg.QueueTickDuration), + ) + }) + grp.Go(func() error { defer done() return startGrpcServer( diff --git a/scaler/main_test.go b/scaler/main_test.go index 215a35d0..58603b16 100644 --- a/scaler/main_test.go +++ b/scaler/main_test.go @@ -22,7 +22,8 @@ func TestHealthChecks(t *testing.T) { errgrp, ctx := errgroup.WithContext(ctx) - ticker, pinger := newFakeQueuePinger(ctx, lggr) + ticker, pinger, err := newFakeQueuePinger(ctx, lggr) + r.NoError(err) defer ticker.Stop() srvFunc := func() error { return startHealthcheckServer(ctx, lggr, port, pinger) diff --git a/scaler/queue_pinger.go b/scaler/queue_pinger.go index a39b583c..648ca7c6 100644 --- a/scaler/queue_pinger.go +++ b/scaler/queue_pinger.go @@ -11,6 +11,7 @@ import ( "github.com/go-logr/logr" "github.com/kedacore/http-add-on/pkg/k8s" "github.com/kedacore/http-add-on/pkg/queue" + "github.com/pkg/errors" "golang.org/x/sync/errgroup" ) @@ -33,8 +34,7 @@ func newQueuePinger( ns, svcName, adminPort string, - pingTicker *time.Ticker, -) *queuePinger { +) (*queuePinger, error) { pingMut := new(sync.RWMutex) pinger := &queuePinger{ getEndpointsFn: getEndpointsFn, @@ -46,18 +46,39 @@ func newQueuePinger( allCounts: map[string]int{}, aggregateCount: 0, } + return pinger, pinger.fetchAndSaveCounts(ctx) +} - go func() { - defer pingTicker.Stop() - for range pingTicker.C { - err := pinger.fetchAndSaveCounts(ctx) +// start starts the queuePinger +func (q *queuePinger) start( + ctx context.Context, + ticker *time.Ticker, +) error { + lggr := q.lggr.WithName("scaler.queuePinger.start") + defer ticker.Stop() + for range ticker.C { + select { + case <-ctx.Done(): + lggr.Error( + ctx.Err(), + "context marked done. stopping queuePinger loop", + ) + return errors.Wrap( + ctx.Err(), + "context marked done. 
stopping queuePinger loop", + ) + default: + err := q.fetchAndSaveCounts(ctx) if err != nil { lggr.Error(err, "getting request counts") + return errors.Wrap( + err, + "error getting request counts", + ) } } - }() - - return pinger + } + return nil } func (q *queuePinger) counts() map[string]int { diff --git a/scaler/queue_pinger_fake.go b/scaler/queue_pinger_fake.go index 5c9e03ec..88c26b78 100644 --- a/scaler/queue_pinger_fake.go +++ b/scaler/queue_pinger_fake.go @@ -58,7 +58,7 @@ func newFakeQueuePinger( ctx context.Context, lggr logr.Logger, optsFuncs ...optsFunc, -) (*time.Ticker, *queuePinger) { +) (*time.Ticker, *queuePinger, error) { opts := &fakeQueuePingerOpts{ endpoints: &v1.Endpoints{}, tickDur: time.Second, @@ -68,7 +68,8 @@ func newFakeQueuePinger( optsFunc(opts) } ticker := time.NewTicker(opts.tickDur) - pinger := newQueuePinger( + + pinger, err := newQueuePinger( ctx, lggr, func( @@ -82,7 +83,9 @@ func newFakeQueuePinger( "testns", "testsvc", opts.port, - ticker, ) - return ticker, pinger + if err != nil { + return nil, nil, err + } + return ticker, pinger, nil } From 56c74228d61e7cdc3f8ead7d572cc005d2d60aa9 Mon Sep 17 00:00:00 2001 From: Aaron Schlesinger Date: Thu, 7 Oct 2021 22:31:58 +0000 Subject: [PATCH 06/10] updating queue pinger test Signed-off-by: Aaron Schlesinger --- scaler/queue_pinger_test.go | 80 +++++++++++++++++++------------------ 1 file changed, 41 insertions(+), 39 deletions(-) diff --git a/scaler/queue_pinger_test.go b/scaler/queue_pinger_test.go index 043f30d8..218f46d1 100644 --- a/scaler/queue_pinger_test.go +++ b/scaler/queue_pinger_test.go @@ -19,8 +19,10 @@ func TestCounts(t *testing.T) { r := require.New(t) ctx := context.Background() const ( - ns = "testns" - svcName = "testsvc" + ns = "testns" + svcName = "testsvc" + tickDur = 10 * time.Millisecond + numEndpoints = 3 ) // assemble an in-memory queue and start up a fake server that serves it. @@ -45,11 +47,7 @@ func TestCounts(t *testing.T) { endpoints, err := k8s.FakeEndpointsForURL(srvURL, ns, svcName, 3) r.NoError(err) - // set the initial ticker to effectively never tick so that we - // can check the behavior of the pinger before the first - // tick - ticker := time.NewTicker(10000 * time.Hour) - pinger := newQueuePinger( + pinger, err := newQueuePinger( ctx, logr.Discard(), func(context.Context, string, string) (*v1.Endpoints, error) { @@ -58,42 +56,45 @@ func TestCounts(t *testing.T) { ns, svcName, srvURL.Port(), - ticker, ) - // the pinger starts a background watch loop but won't request the counts - // before the first tick. since the first tick effectively won't - // happen (it was set to a very long duration above), there should be - // no counts right now + r.NoError(err) + // the pinger does an initial fetch, so ensure that + // the saved counts are correct retCounts := pinger.counts() - r.Equal(0, len(retCounts)) - - // reset the ticker to tick practically immediately. sleep for a little - // bit to ensure that the tick occurred and the counts were successfully - // computed, then check them. 
- ticker.Reset(1 * time.Nanosecond) - time.Sleep(50 * time.Millisecond) - - // now that the tick has happened, there should be as many - // key/value pairs in the returned counts map as addresses - retCounts = pinger.counts() r.Equal(len(counts), len(retCounts)) - // each interceptor returns the same counts, so for each host in - // the counts map, the integer count should be - // (val * # interceptors) - for retHost, retCount := range retCounts { - expectedCount, ok := counts[retHost] - r.True(ok, "unexpected host %s returned", retHost) - expectedCount *= len(endpoints.Subsets[0].Addresses) - r.Equal( - expectedCount, - retCount, - "count for host %s was not the expected %d", - retCount, - expectedCount, + // now update the queue, start the ticker, and ensure + // that counts are updated after the first tick + q.Resize("host1", 1) + q.Resize("host2", 2) + q.Resize("host3", 3) + q.Resize("host4", 4) + ticker := time.NewTicker(tickDur) + go func() { + pinger.start(ctx, ticker) + }() + // sleep to ensure we ticked and finished calling + // fetchAndSaveCounts + time.Sleep(tickDur * 2) + + // now ensure that all the counts in the pinger + // are the same as in the queue, which has been updated + retCounts = pinger.counts() + expectedCounts, err := q.Current() + r.NoError(err) + r.Equal(len(expectedCounts.Counts), len(retCounts)) + for host, count := range expectedCounts.Counts { + retCount, ok := retCounts[host] + r.True( + ok, + "returned count not found for host %s", + host, ) - } + // note that the returned value should be: + // (queue_count * num_endpoints) + r.Equal(count*3, retCount) + } } func TestFetchAndSaveCounts(t *testing.T) { @@ -138,15 +139,16 @@ func TestFetchAndSaveCounts(t *testing.T) { return endpointsForURLs, nil } - pinger := newQueuePinger( + pinger, err := newQueuePinger( ctx, logr.Discard(), endpointsFn, ns, svcName, srvURL.Port(), - time.NewTicker(1*time.Millisecond), + // time.NewTicker(1*time.Millisecond), ) + r.NoError(err) r.NoError(pinger.fetchAndSaveCounts(ctx)) From b2f8ef15e503b6fde5d0a66014c574fd41744aed Mon Sep 17 00:00:00 2001 From: Aaron Schlesinger Date: Thu, 7 Oct 2021 23:11:02 +0000 Subject: [PATCH 07/10] adding documentation to the queue pinger Signed-off-by: Aaron Schlesinger --- scaler/queue_pinger.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/scaler/queue_pinger.go b/scaler/queue_pinger.go index 648ca7c6..addc9b64 100644 --- a/scaler/queue_pinger.go +++ b/scaler/queue_pinger.go @@ -15,6 +15,24 @@ import ( "golang.org/x/sync/errgroup" ) +// queuePinger has functionality to ping all interceptors +// behind a given `Service`, fetch their pending queue counts, +// and aggregate all of those counts together. +// +// It's capable of doing that work in parallel when possible +// as well. +// +// Sample usage: +// +// pinger, err := newQueuePinger(ctx, lggr, getEndpointsFn, ns, svcName, adminPort) +// if err != nil { +// panic(err) +// } +// // make sure to start the background pinger loop. 
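+// // the ticker controls how often counts are re-fetched; main.go
+// // passes time.NewTicker(cfg.QueueTickDuration), which defaults
+// // to 500ms.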
+// // you can shut this loop down by using a cancellable +// // context +// go pinger.start(ctx, ticker) +// type queuePinger struct { getEndpointsFn k8s.GetEndpointsFunc ns string From 36948595a3f2fef196a989e06b507a9feaff58de Mon Sep 17 00:00:00 2001 From: Aaron Schlesinger Date: Thu, 7 Oct 2021 23:11:37 +0000 Subject: [PATCH 08/10] removing errant ticker.Stop() call Signed-off-by: Aaron Schlesinger --- scaler/handlers_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/scaler/handlers_test.go b/scaler/handlers_test.go index 18a91080..c6f502c8 100644 --- a/scaler/handlers_test.go +++ b/scaler/handlers_test.go @@ -218,7 +218,6 @@ func TestGetMetricsHostFoundInQueueCounts(t *testing.T) { pinger.start(ctx, ticker) }() - defer ticker.Stop() time.Sleep(50 * time.Millisecond) // sleep for more than enough time for the pinger to do its @@ -307,5 +306,4 @@ func TestGetMetricsInterceptorReturnsAggregate(t *testing.T) { r.Equal("interceptor", metricVal.MetricName) aggregate := pinger.aggregate() r.Equal(int64(aggregate), metricVal.MetricValue) - } From fee8add41d499adb5c6e381705782af21d2c1a65 Mon Sep 17 00:00:00 2001 From: Aaron Schlesinger Date: Thu, 7 Oct 2021 23:11:49 +0000 Subject: [PATCH 09/10] removing errant newlines Signed-off-by: Aaron Schlesinger --- scaler/queue_pinger_fake.go | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/scaler/queue_pinger_fake.go b/scaler/queue_pinger_fake.go index 88c26b78..5df308be 100644 --- a/scaler/queue_pinger_fake.go +++ b/scaler/queue_pinger_fake.go @@ -72,13 +72,8 @@ func newFakeQueuePinger( pinger, err := newQueuePinger( ctx, lggr, - func( - ctx context.Context, - namespace, - serviceName string, - ) (*v1.Endpoints, error) { + func(context.Context, string, string) (*v1.Endpoints, error) { return opts.endpoints, nil - }, "testns", "testsvc", From 6b2600bd0469763f3f895758ef9acf26c103ea77 Mon Sep 17 00:00:00 2001 From: Aaron Schlesinger Date: Thu, 7 Oct 2021 23:28:19 +0000 Subject: [PATCH 10/10] removing manual interceptor fakes in favor of startFakeQueueEndpointServer Signed-off-by: Aaron Schlesinger --- scaler/queue_pinger_test.go | 69 +++++++++++++------------------------ 1 file changed, 24 insertions(+), 45 deletions(-) diff --git a/scaler/queue_pinger_test.go b/scaler/queue_pinger_test.go index 218f46d1..42e156e3 100644 --- a/scaler/queue_pinger_test.go +++ b/scaler/queue_pinger_test.go @@ -2,14 +2,10 @@ package main import ( context "context" - "encoding/json" - "net/http" "testing" "time" "github.com/go-logr/logr" - "github.com/kedacore/http-add-on/pkg/k8s" - kedanet "github.com/kedacore/http-add-on/pkg/net" "github.com/kedacore/http-add-on/pkg/queue" "github.com/stretchr/testify/require" v1 "k8s.io/api/core/v1" @@ -39,14 +35,14 @@ func TestCounts(t *testing.T) { q.Resize(host, count) } - hdl := http.NewServeMux() - queue.AddCountsRoute(logr.Discard(), hdl, q) - srv, srvURL, err := kedanet.StartTestServer(hdl) + srv, srvURL, endpoints, err := startFakeQueueEndpointServer( + ns, + svcName, + q, + 3, + ) r.NoError(err) defer srv.Close() - - endpoints, err := k8s.FakeEndpointsForURL(srvURL, ns, svcName, 3) - r.NoError(err) pinger, err := newQueuePinger( ctx, logr.Discard(), @@ -113,21 +109,12 @@ func TestFetchAndSaveCounts(t *testing.T) { "host2": 234, "host3": 345, } - hdl := kedanet.NewTestHTTPHandlerWrapper( - http.HandlerFunc( - func(wr http.ResponseWriter, req *http.Request) { - err := json.NewEncoder(wr).Encode(counts) - r.NoError(err) - }, - ), - ) - srv, srvURL, err := kedanet.StartTestServer(hdl) - r.NoError(err) - 
endpointsForURLs, err := k8s.FakeEndpointsForURL( - srvURL, - ns, - svcName, - numEndpoints, + q := queue.NewMemory() + for host, count := range counts.Counts { + q.Resize(host, count) + } + srv, srvURL, endpoints, err := startFakeQueueEndpointServer( + ns, svcName, q, numEndpoints, ) r.NoError(err) defer srv.Close() @@ -136,7 +123,7 @@ func TestFetchAndSaveCounts(t *testing.T) { ns, svcName string, ) (*v1.Endpoints, error) { - return endpointsForURLs, nil + return endpoints, nil } pinger, err := newQueuePinger( @@ -182,30 +169,22 @@ func TestFetchCounts(t *testing.T) { "host2": 234, "host3": 345, } - hdl := kedanet.NewTestHTTPHandlerWrapper( - http.HandlerFunc( - func(wr http.ResponseWriter, req *http.Request) { - err := json.NewEncoder(wr).Encode(counts) - r.NoError(err) - }, - ), - ) - srv, srvURL, err := kedanet.StartTestServer(hdl) - r.NoError(err) - endpointsForURLs, err := k8s.FakeEndpointsForURL( - srvURL, - ns, - svcName, - numEndpoints, + q := queue.NewMemory() + for host, count := range counts.Counts { + r.NoError(q.Resize(host, count)) + } + srv, srvURL, endpoints, err := startFakeQueueEndpointServer( + ns, svcName, q, numEndpoints, ) r.NoError(err) + defer srv.Close() endpointsFn := func( - ctx context.Context, - ns, - svcName string, + context.Context, + string, + string, ) (*v1.Endpoints, error) { - return endpointsForURLs, nil + return endpoints, nil } cts, agg, err := fetchCounts(