From 1412ad108fd432240b3a6df3713536b2f3270968 Mon Sep 17 00:00:00 2001 From: Aaron Schlesinger Date: Thu, 27 Jan 2022 22:53:33 +0000 Subject: [PATCH 1/3] adding deployment cache to the scaler Signed-off-by: Aaron Schlesinger --- scaler/queue_pinger.go | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/scaler/queue_pinger.go b/scaler/queue_pinger.go index e982a6e1..0ab5b8dc 100644 --- a/scaler/queue_pinger.go +++ b/scaler/queue_pinger.go @@ -74,8 +74,9 @@ func (q *queuePinger) start( ) error { lggr := q.lggr.WithName("scaler.queuePinger.start") defer ticker.Stop() - for range ticker.C { + for { select { + // handle cancellations/timeout case <-ctx.Done(): lggr.Error( ctx.Err(), @@ -85,7 +86,8 @@ func (q *queuePinger) start( ctx.Err(), "context marked done. stopping queuePinger loop", ) - default: + // do our regularly scheduled work + case <-ticker.C: err := q.fetchAndSaveCounts(ctx) if err != nil { lggr.Error(err, "getting request counts") @@ -94,6 +96,17 @@ func (q *queuePinger) start( "error getting request counts", ) } + // handle changes to the interceptor fleet + // Deployment + case <-deployEvtChan: + err := q.fetchAndSaveCounts(ctx) + if err != nil { + lggr.Error(err, "getting request counts") + // return errors.Wrap( + // err, + // "error getting request counts", + // ) + } } } return nil From 1a7740d9dee761116144040c4c4a0858ca6dbc66 Mon Sep 17 00:00:00 2001 From: Aaron Schlesinger Date: Thu, 27 Jan 2022 23:09:35 +0000 Subject: [PATCH 2/3] more progress Signed-off-by: Aaron Schlesinger --- scaler/config.go | 5 ++++ scaler/main.go | 15 +++++++++++ scaler/queue_pinger.go | 52 +++++++++++++++++++++---------------- scaler/queue_pinger_fake.go | 1 + scaler/queue_pinger_test.go | 4 +++ 5 files changed, 54 insertions(+), 23 deletions(-) diff --git a/scaler/config.go b/scaler/config.go index 3a09d34e..c18853b7 100644 --- a/scaler/config.go +++ b/scaler/config.go @@ -18,6 +18,8 @@ type config struct { TargetNamespace string `envconfig:"KEDA_HTTP_SCALER_TARGET_ADMIN_NAMESPACE" required:"true"` // TargetService is the name of the service to issue metrics RPC requests to interceptors TargetService string `envconfig:"KEDA_HTTP_SCALER_TARGET_ADMIN_SERVICE" required:"true"` + // TargetDeployment is the name of the deployment to issue metrics RPC requests to interceptors + TargetDeployment string `envconfig:"KEDA_HTTP_SCALER_TARGET_ADMIN_DEPLOYMENT" required:"true"` // TargetPort is the port on TargetService to which to issue metrics RPC requests to // interceptors TargetPort int `envconfig:"KEDA_HTTP_SCALER_TARGET_ADMIN_PORT" required:"true"` @@ -29,6 +31,9 @@ type config struct { // ConfigMapCacheRsyncPeriod is the time interval // for the configmap informer to rsync the local cache. ConfigMapCacheRsyncPeriod time.Duration `envconfig:"KEDA_HTTP_SCALER_CONFIG_MAP_INFORMER_RSYNC_PERIOD" default:"60m"` + // DeploymentCacheRsyncPeriod is the time interval + // for the deployment informer to rsync the local cache. + DeploymentCacheRsyncPeriod time.Duration `envconfig:"KEDA_HTTP_SCALER_DEPLOYMENT_INFORMER_RSYNC_PERIOD" default:"60m"` // QueueTickDuration is the duration between queue requests QueueTickDuration time.Duration `envconfig:"KEDA_HTTP_QUEUE_TICK_DURATION" default:"500ms"` // This will be the 'Target Pending Requests' for the interceptor diff --git a/scaler/main.go b/scaler/main.go index 76dc074b..a6128b47 100644 --- a/scaler/main.go +++ b/scaler/main.go @@ -39,6 +39,7 @@ func main() { healthPort := cfg.HealthPort namespace := cfg.TargetNamespace svcName := cfg.TargetService + deplName := cfg.TargetDeployment targetPortStr := fmt.Sprintf("%d", cfg.TargetPort) targetPendingRequests := cfg.TargetPendingRequests targetPendingRequestsInterceptor := cfg.TargetPendingRequestsInterceptor @@ -54,6 +55,7 @@ func main() { k8s.EndpointsFuncForK8sClientset(k8sCl), namespace, svcName, + deplName, targetPortStr, ) if err != nil { @@ -81,14 +83,27 @@ func main() { k8sCl, cfg.ConfigMapCacheRsyncPeriod, ) + // create the deployment informer + deployInformer := k8s.NewInformerBackedDeploymentCache( + lggr, + k8sCl, + cfg.DeploymentCacheRsyncPeriod, + ) grp, ctx := errgroup.WithContext(ctx) + // start the deployment informer + grp.Go(func() error { + defer done() + return deployInformer.Start(ctx) + }) + grp.Go(func() error { defer done() return pinger.start( ctx, time.NewTicker(cfg.QueueTickDuration), + deployInformer, ) }) diff --git a/scaler/queue_pinger.go b/scaler/queue_pinger.go index 0ab5b8dc..96063944 100644 --- a/scaler/queue_pinger.go +++ b/scaler/queue_pinger.go @@ -34,15 +34,16 @@ import ( // go pinger.start(ctx, ticker) // type queuePinger struct { - getEndpointsFn k8s.GetEndpointsFunc - interceptorNS string - interceptorSvcName string - adminPort string - pingMut *sync.RWMutex - lastPingTime time.Time - allCounts map[string]int - aggregateCount int - lggr logr.Logger + getEndpointsFn k8s.GetEndpointsFunc + interceptorNS string + interceptorSvcName string + interceptorDeplName string + adminPort string + pingMut *sync.RWMutex + lastPingTime time.Time + allCounts map[string]int + aggregateCount int + lggr logr.Logger } func newQueuePinger( @@ -51,18 +52,20 @@ func newQueuePinger( getEndpointsFn k8s.GetEndpointsFunc, ns, svcName, + deplName, adminPort string, ) (*queuePinger, error) { pingMut := new(sync.RWMutex) pinger := &queuePinger{ - getEndpointsFn: getEndpointsFn, - interceptorNS: ns, - interceptorSvcName: svcName, - adminPort: adminPort, - pingMut: pingMut, - lggr: lggr, - allCounts: map[string]int{}, - aggregateCount: 0, + getEndpointsFn: getEndpointsFn, + interceptorNS: ns, + interceptorSvcName: svcName, + interceptorDeplName: deplName, + adminPort: adminPort, + pingMut: pingMut, + lggr: lggr, + allCounts: map[string]int{}, + aggregateCount: 0, } return pinger, pinger.fetchAndSaveCounts(ctx) } @@ -71,7 +74,12 @@ func newQueuePinger( func (q *queuePinger) start( ctx context.Context, ticker *time.Ticker, + deplCache k8s.DeploymentCache, ) error { + deployWatchIface := deplCache.Watch(q.interceptorNS, q.interceptorDeplName) + deployEvtChan := deployWatchIface.ResultChan() + defer deployWatchIface.Stop() + lggr := q.lggr.WithName("scaler.queuePinger.start") defer ticker.Stop() for { @@ -101,15 +109,13 @@ func (q *queuePinger) start( case <-deployEvtChan: err := q.fetchAndSaveCounts(ctx) if err != nil { - lggr.Error(err, "getting request counts") - // return errors.Wrap( - // err, - // "error getting request counts", - // ) + lggr.Error( + err, + "getting request counts after interceptor deployment event", + ) } } } - return nil } func (q *queuePinger) counts() map[string]int { diff --git a/scaler/queue_pinger_fake.go b/scaler/queue_pinger_fake.go index 5df308be..f31b98ed 100644 --- a/scaler/queue_pinger_fake.go +++ b/scaler/queue_pinger_fake.go @@ -77,6 +77,7 @@ func newFakeQueuePinger( }, "testns", "testsvc", + "testdepl", opts.port, ) if err != nil { diff --git a/scaler/queue_pinger_test.go b/scaler/queue_pinger_test.go index 42e156e3..c949df2c 100644 --- a/scaler/queue_pinger_test.go +++ b/scaler/queue_pinger_test.go @@ -17,6 +17,7 @@ func TestCounts(t *testing.T) { const ( ns = "testns" svcName = "testsvc" + deplName = "testdepl" tickDur = 10 * time.Millisecond numEndpoints = 3 ) @@ -51,6 +52,7 @@ func TestCounts(t *testing.T) { }, ns, svcName, + deplName, srvURL.Port(), ) r.NoError(err) @@ -100,6 +102,7 @@ func TestFetchAndSaveCounts(t *testing.T) { const ( ns = "testns" svcName = "testsvc" + deplName = "testdepl" adminPort = "8081" numEndpoints = 3 ) @@ -132,6 +135,7 @@ func TestFetchAndSaveCounts(t *testing.T) { endpointsFn, ns, svcName, + deplName, srvURL.Port(), // time.NewTicker(1*time.Millisecond), ) From 0b6809f3a63e38113d6115b6aa8211a915a935a3 Mon Sep 17 00:00:00 2001 From: Aaron Schlesinger Date: Tue, 15 Feb 2022 21:11:56 +0000 Subject: [PATCH 3/3] fixing test Signed-off-by: Aaron Schlesinger --- scaler/queue_pinger_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/scaler/queue_pinger_test.go b/scaler/queue_pinger_test.go index c949df2c..7ec4b963 100644 --- a/scaler/queue_pinger_test.go +++ b/scaler/queue_pinger_test.go @@ -6,6 +6,7 @@ import ( "time" "github.com/go-logr/logr" + "github.com/kedacore/http-add-on/pkg/k8s" "github.com/kedacore/http-add-on/pkg/queue" "github.com/stretchr/testify/require" v1 "k8s.io/api/core/v1" @@ -68,8 +69,9 @@ func TestCounts(t *testing.T) { q.Resize("host3", 3) q.Resize("host4", 4) ticker := time.NewTicker(tickDur) + fakeCache := k8s.NewFakeDeploymentCache() go func() { - pinger.start(ctx, ticker) + pinger.start(ctx, ticker, fakeCache) }() // sleep to ensure we ticked and finished calling // fetchAndSaveCounts