calling fetchAndSaveCounts in the scaler after any interceptor deployment events (#386)

* adding deployment cache to the scaler

Signed-off-by: Aaron Schlesinger <aaron@ecomaz.net>

* more progress

Signed-off-by: Aaron Schlesinger <aaron@ecomaz.net>

* fixing test

Signed-off-by: Aaron Schlesinger <aaron@ecomaz.net>
arschles authored Feb 17, 2022
1 parent 7b8aaa9 commit 6526950
Showing 5 changed files with 67 additions and 21 deletions.
5 changes: 5 additions & 0 deletions scaler/config.go
@@ -18,6 +18,8 @@ type config struct {
TargetNamespace string `envconfig:"KEDA_HTTP_SCALER_TARGET_ADMIN_NAMESPACE" required:"true"`
// TargetService is the name of the service to issue metrics RPC requests to interceptors
TargetService string `envconfig:"KEDA_HTTP_SCALER_TARGET_ADMIN_SERVICE" required:"true"`
// TargetDeployment is the name of the deployment to issue metrics RPC requests to interceptors
TargetDeployment string `envconfig:"KEDA_HTTP_SCALER_TARGET_ADMIN_DEPLOYMENT" required:"true"`
// TargetPort is the port on TargetService to which to issue metrics RPC requests to
// interceptors
TargetPort int `envconfig:"KEDA_HTTP_SCALER_TARGET_ADMIN_PORT" required:"true"`
@@ -29,6 +31,9 @@ type config struct {
// ConfigMapCacheRsyncPeriod is the time interval
// for the configmap informer to rsync the local cache.
ConfigMapCacheRsyncPeriod time.Duration `envconfig:"KEDA_HTTP_SCALER_CONFIG_MAP_INFORMER_RSYNC_PERIOD" default:"60m"`
// DeploymentCacheRsyncPeriod is the time interval
// for the deployment informer to rsync the local cache.
DeploymentCacheRsyncPeriod time.Duration `envconfig:"KEDA_HTTP_SCALER_DEPLOYMENT_INFORMER_RSYNC_PERIOD" default:"60m"`
// QueueTickDuration is the duration between queue requests
QueueTickDuration time.Duration `envconfig:"KEDA_HTTP_QUEUE_TICK_DURATION" default:"500ms"`
// This will be the 'Target Pending Requests' for the interceptor
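For context, here is a minimal sketch of how envconfig-tagged fields like these are typically loaded, assuming the kelseyhightower/envconfig package that the struct tags imply; the struct and variable names below are illustrative only, not the scaler's actual code.

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/kelseyhightower/envconfig"
)

// illustrative subset of the scaler config; tags mirror the diff above
type scalerConfig struct {
	TargetDeployment           string        `envconfig:"KEDA_HTTP_SCALER_TARGET_ADMIN_DEPLOYMENT" required:"true"`
	DeploymentCacheRsyncPeriod time.Duration `envconfig:"KEDA_HTTP_SCALER_DEPLOYMENT_INFORMER_RSYNC_PERIOD" default:"60m"`
}

func main() {
	var cfg scalerConfig
	// Process reads the tagged environment variables, enforcing
	// `required` and falling back to `default` values.
	if err := envconfig.Process("", &cfg); err != nil {
		log.Fatal(err)
	}
	fmt.Println(cfg.TargetDeployment, cfg.DeploymentCacheRsyncPeriod)
}

In other words, the new KEDA_HTTP_SCALER_TARGET_ADMIN_DEPLOYMENT variable is required, while the deployment informer resync period falls back to 60m if unset.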
15 changes: 15 additions & 0 deletions scaler/main.go
@@ -39,6 +39,7 @@ func main() {
healthPort := cfg.HealthPort
namespace := cfg.TargetNamespace
svcName := cfg.TargetService
deplName := cfg.TargetDeployment
targetPortStr := fmt.Sprintf("%d", cfg.TargetPort)
targetPendingRequests := cfg.TargetPendingRequests
targetPendingRequestsInterceptor := cfg.TargetPendingRequestsInterceptor
@@ -54,6 +55,7 @@ func main() {
k8s.EndpointsFuncForK8sClientset(k8sCl),
namespace,
svcName,
deplName,
targetPortStr,
)
if err != nil {
@@ -81,14 +83,27 @@ func main() {
k8sCl,
cfg.ConfigMapCacheRsyncPeriod,
)
// create the deployment informer
deployInformer := k8s.NewInformerBackedDeploymentCache(
lggr,
k8sCl,
cfg.DeploymentCacheRsyncPeriod,
)

grp, ctx := errgroup.WithContext(ctx)

// start the deployment informer
grp.Go(func() error {
defer done()
return deployInformer.Start(ctx)
})

grp.Go(func() error {
defer done()
return pinger.start(
ctx,
time.NewTicker(cfg.QueueTickDuration),
deployInformer,
)
})

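The new informer runs on the same errgroup as the pinger, so a failure in either goroutine cancels the shared context and shuts the other one down. Below is a self-contained sketch of that pattern using golang.org/x/sync/errgroup; the worker bodies are placeholders, not the scaler's own Start and start functions.

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

func main() {
	grp, ctx := errgroup.WithContext(context.Background())

	// placeholder for deployInformer.Start(ctx): blocks until ctx is cancelled
	grp.Go(func() error {
		<-ctx.Done()
		return ctx.Err()
	})

	// placeholder for pinger.start(ctx, ticker, deployInformer): returning an
	// error here cancels ctx, which unblocks the goroutine above
	grp.Go(func() error {
		time.Sleep(100 * time.Millisecond)
		return fmt.Errorf("pinger stopped")
	})

	// Wait returns the first non-nil error reported by the group
	fmt.Println(grp.Wait())
}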
59 changes: 39 additions & 20 deletions scaler/queue_pinger.go
@@ -34,15 +34,16 @@ import (
// go pinger.start(ctx, ticker)
//
type queuePinger struct {
getEndpointsFn k8s.GetEndpointsFunc
interceptorNS string
interceptorSvcName string
adminPort string
pingMut *sync.RWMutex
lastPingTime time.Time
allCounts map[string]int
aggregateCount int
lggr logr.Logger
getEndpointsFn k8s.GetEndpointsFunc
interceptorNS string
interceptorSvcName string
interceptorDeplName string
adminPort string
pingMut *sync.RWMutex
lastPingTime time.Time
allCounts map[string]int
aggregateCount int
lggr logr.Logger
}

func newQueuePinger(
@@ -51,18 +52,20 @@ func newQueuePinger(
getEndpointsFn k8s.GetEndpointsFunc,
ns,
svcName,
deplName,
adminPort string,
) (*queuePinger, error) {
pingMut := new(sync.RWMutex)
pinger := &queuePinger{
getEndpointsFn: getEndpointsFn,
interceptorNS: ns,
interceptorSvcName: svcName,
adminPort: adminPort,
pingMut: pingMut,
lggr: lggr,
allCounts: map[string]int{},
aggregateCount: 0,
getEndpointsFn: getEndpointsFn,
interceptorNS: ns,
interceptorSvcName: svcName,
interceptorDeplName: deplName,
adminPort: adminPort,
pingMut: pingMut,
lggr: lggr,
allCounts: map[string]int{},
aggregateCount: 0,
}
return pinger, pinger.fetchAndSaveCounts(ctx)
}
@@ -71,11 +74,17 @@ func newQueuePinger(
func (q *queuePinger) start(
ctx context.Context,
ticker *time.Ticker,
deplCache k8s.DeploymentCache,
) error {
deployWatchIface := deplCache.Watch(q.interceptorNS, q.interceptorDeplName)
deployEvtChan := deployWatchIface.ResultChan()
defer deployWatchIface.Stop()

lggr := q.lggr.WithName("scaler.queuePinger.start")
defer ticker.Stop()
for range ticker.C {
for {
select {
// handle cancellations/timeout
case <-ctx.Done():
lggr.Error(
ctx.Err(),
@@ -85,7 +94,8 @@
ctx.Err(),
"context marked done. stopping queuePinger loop",
)
default:
// do our regularly scheduled work
case <-ticker.C:
err := q.fetchAndSaveCounts(ctx)
if err != nil {
lggr.Error(err, "getting request counts")
@@ -94,9 +104,18 @@
"error getting request counts",
)
}
// handle changes to the interceptor fleet
// Deployment
case <-deployEvtChan:
err := q.fetchAndSaveCounts(ctx)
if err != nil {
lggr.Error(
err,
"getting request counts after interceptor deployment event",
)
}
}
}
return nil
}

func (q *queuePinger) counts() map[string]int {
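The heart of the change is the select loop above: instead of waking only on the ticker, the pinger now also wakes whenever the interceptor Deployment emits a watch event, and refreshes the counts in either case. Here is a standalone sketch of that ticker-plus-watch pattern built on k8s.io/apimachinery's watch package; refresh is a stand-in for fetchAndSaveCounts, and the fake watcher is only for demonstration.

package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/watch"
)

func runLoop(ctx context.Context, ticker *time.Ticker, w watch.Interface, refresh func() error) {
	defer ticker.Stop()
	defer w.Stop()
	events := w.ResultChan()
	for {
		select {
		case <-ctx.Done():
			// context cancelled or timed out: stop the loop
			return
		case <-ticker.C:
			// regularly scheduled refresh
			if err := refresh(); err != nil {
				fmt.Println("refresh failed:", err)
			}
		case <-events:
			// the watched Deployment changed (scale, rollout, ...):
			// refresh immediately instead of waiting for the next tick
			if err := refresh(); err != nil {
				fmt.Println("refresh after deployment event failed:", err)
			}
		}
	}
}

func main() {
	fw := watch.NewFake() // in-memory watch.Interface, useful for demos and tests
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	go fw.Modify(nil) // simulate a deployment event
	runLoop(ctx, time.NewTicker(100*time.Millisecond), fw, func() error {
		fmt.Println("fetching counts")
		return nil
	})
}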
1 change: 1 addition & 0 deletions scaler/queue_pinger_fake.go
@@ -77,6 +77,7 @@ func newFakeQueuePinger(
},
"testns",
"testsvc",
"testdepl",
opts.port,
)
if err != nil {
8 changes: 7 additions & 1 deletion scaler/queue_pinger_test.go
@@ -6,6 +6,7 @@ import (
"time"

"github.com/go-logr/logr"
"github.com/kedacore/http-add-on/pkg/k8s"
"github.com/kedacore/http-add-on/pkg/queue"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
@@ -17,6 +18,7 @@ func TestCounts(t *testing.T) {
const (
ns = "testns"
svcName = "testsvc"
deplName = "testdepl"
tickDur = 10 * time.Millisecond
numEndpoints = 3
)
@@ -51,6 +53,7 @@ func TestCounts(t *testing.T) {
},
ns,
svcName,
deplName,
srvURL.Port(),
)
r.NoError(err)
@@ -66,8 +69,9 @@ func TestCounts(t *testing.T) {
q.Resize("host3", 3)
q.Resize("host4", 4)
ticker := time.NewTicker(tickDur)
fakeCache := k8s.NewFakeDeploymentCache()
go func() {
pinger.start(ctx, ticker)
pinger.start(ctx, ticker, fakeCache)
}()
// sleep to ensure we ticked and finished calling
// fetchAndSaveCounts
@@ -100,6 +104,7 @@ func TestFetchAndSaveCounts(t *testing.T) {
const (
ns = "testns"
svcName = "testsvc"
deplName = "testdepl"
adminPort = "8081"
numEndpoints = 3
)
@@ -132,6 +137,7 @@ func TestFetchAndSaveCounts(t *testing.T) {
endpointsFn,
ns,
svcName,
deplName,
srvURL.Port(),
// time.NewTicker(1*time.Millisecond),
)
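Note that the tests above pass k8s.NewFakeDeploymentCache() only to satisfy the new parameter; they still rely on the ticker to drive fetchAndSaveCounts. To exercise the new event-driven path directly, a test could hand the pinger a cache whose Watch returns a controllable fake watcher. The sketch below is hypothetical: it assumes the k8s.DeploymentCache contract implied by queue_pinger.go (a Watch(namespace, name) method returning a watch.Interface) and omits whatever other methods the real interface defines.

package scaler

import (
	"k8s.io/apimachinery/pkg/watch"
)

// stubDeploymentCache is a hypothetical test double for the deployment cache.
type stubDeploymentCache struct {
	watcher *watch.FakeWatcher
}

// Watch mirrors the usage in queue_pinger.go: the pinger only calls
// ResultChan and Stop on the returned value.
func (s *stubDeploymentCache) Watch(namespace, name string) watch.Interface {
	return s.watcher
}

// Conceptually, a test built on such a stub could then do:
//
//	cache := &stubDeploymentCache{watcher: watch.NewFake()}
//	go pinger.start(ctx, ticker, cache)
//	cache.watcher.Modify(deployment) // should trigger an immediate fetchAndSaveCounts
//
// and assert that counts were refreshed without waiting for a tick.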
