Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

calling fetchAndSaveCounts in the scaler after any interceptor deployment events #386

Merged
merged 5 commits into from
Feb 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions scaler/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ type config struct {
TargetNamespace string `envconfig:"KEDA_HTTP_SCALER_TARGET_ADMIN_NAMESPACE" required:"true"`
// TargetService is the name of the service to issue metrics RPC requests to interceptors
TargetService string `envconfig:"KEDA_HTTP_SCALER_TARGET_ADMIN_SERVICE" required:"true"`
// TargetDeployment is the name of the deployment to issue metrics RPC requests to interceptors
TargetDeployment string `envconfig:"KEDA_HTTP_SCALER_TARGET_ADMIN_DEPLOYMENT" required:"true"`
// TargetPort is the port on TargetService to which to issue metrics RPC requests to
// interceptors
TargetPort int `envconfig:"KEDA_HTTP_SCALER_TARGET_ADMIN_PORT" required:"true"`
Expand All @@ -29,6 +31,9 @@ type config struct {
// ConfigMapCacheRsyncPeriod is the time interval
// for the configmap informer to rsync the local cache.
ConfigMapCacheRsyncPeriod time.Duration `envconfig:"KEDA_HTTP_SCALER_CONFIG_MAP_INFORMER_RSYNC_PERIOD" default:"60m"`
// DeploymentCacheRsyncPeriod is the time interval
// for the deployment informer to rsync the local cache.
DeploymentCacheRsyncPeriod time.Duration `envconfig:"KEDA_HTTP_SCALER_DEPLOYMENT_INFORMER_RSYNC_PERIOD" default:"60m"`
// QueueTickDuration is the duration between queue requests
QueueTickDuration time.Duration `envconfig:"KEDA_HTTP_QUEUE_TICK_DURATION" default:"500ms"`
// This will be the 'Target Pending Requests' for the interceptor
Expand Down
15 changes: 15 additions & 0 deletions scaler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func main() {
healthPort := cfg.HealthPort
namespace := cfg.TargetNamespace
svcName := cfg.TargetService
deplName := cfg.TargetDeployment
targetPortStr := fmt.Sprintf("%d", cfg.TargetPort)
targetPendingRequests := cfg.TargetPendingRequests
targetPendingRequestsInterceptor := cfg.TargetPendingRequestsInterceptor
Expand All @@ -54,6 +55,7 @@ func main() {
k8s.EndpointsFuncForK8sClientset(k8sCl),
namespace,
svcName,
deplName,
targetPortStr,
)
if err != nil {
Expand Down Expand Up @@ -81,14 +83,27 @@ func main() {
k8sCl,
cfg.ConfigMapCacheRsyncPeriod,
)
// create the deployment informer
deployInformer := k8s.NewInformerBackedDeploymentCache(
lggr,
k8sCl,
cfg.DeploymentCacheRsyncPeriod,
)

grp, ctx := errgroup.WithContext(ctx)

// start the deployment informer
grp.Go(func() error {
defer done()
return deployInformer.Start(ctx)
})

grp.Go(func() error {
defer done()
return pinger.start(
ctx,
time.NewTicker(cfg.QueueTickDuration),
deployInformer,
)
})

Expand Down
59 changes: 39 additions & 20 deletions scaler/queue_pinger.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,16 @@ import (
// go pinger.start(ctx, ticker)
//
type queuePinger struct {
getEndpointsFn k8s.GetEndpointsFunc
interceptorNS string
interceptorSvcName string
adminPort string
pingMut *sync.RWMutex
lastPingTime time.Time
allCounts map[string]int
aggregateCount int
lggr logr.Logger
getEndpointsFn k8s.GetEndpointsFunc
interceptorNS string
interceptorSvcName string
interceptorDeplName string
adminPort string
pingMut *sync.RWMutex
lastPingTime time.Time
allCounts map[string]int
aggregateCount int
lggr logr.Logger
}

func newQueuePinger(
Expand All @@ -51,18 +52,20 @@ func newQueuePinger(
getEndpointsFn k8s.GetEndpointsFunc,
ns,
svcName,
deplName,
adminPort string,
) (*queuePinger, error) {
pingMut := new(sync.RWMutex)
pinger := &queuePinger{
getEndpointsFn: getEndpointsFn,
interceptorNS: ns,
interceptorSvcName: svcName,
adminPort: adminPort,
pingMut: pingMut,
lggr: lggr,
allCounts: map[string]int{},
aggregateCount: 0,
getEndpointsFn: getEndpointsFn,
interceptorNS: ns,
interceptorSvcName: svcName,
interceptorDeplName: deplName,
adminPort: adminPort,
pingMut: pingMut,
lggr: lggr,
allCounts: map[string]int{},
aggregateCount: 0,
}
return pinger, pinger.fetchAndSaveCounts(ctx)
}
Expand All @@ -71,11 +74,17 @@ func newQueuePinger(
func (q *queuePinger) start(
ctx context.Context,
ticker *time.Ticker,
deplCache k8s.DeploymentCache,
) error {
deployWatchIface := deplCache.Watch(q.interceptorNS, q.interceptorDeplName)
deployEvtChan := deployWatchIface.ResultChan()
defer deployWatchIface.Stop()

lggr := q.lggr.WithName("scaler.queuePinger.start")
defer ticker.Stop()
for range ticker.C {
for {
select {
// handle cancellations/timeout
case <-ctx.Done():
lggr.Error(
ctx.Err(),
Expand All @@ -85,7 +94,8 @@ func (q *queuePinger) start(
ctx.Err(),
"context marked done. stopping queuePinger loop",
)
default:
// do our regularly scheduled work
case <-ticker.C:
err := q.fetchAndSaveCounts(ctx)
if err != nil {
lggr.Error(err, "getting request counts")
Expand All @@ -94,9 +104,18 @@ func (q *queuePinger) start(
"error getting request counts",
)
}
// handle changes to the interceptor fleet
// Deployment
case <-deployEvtChan:
err := q.fetchAndSaveCounts(ctx)
if err != nil {
lggr.Error(
err,
"getting request counts after interceptor deployment event",
)
}
}
}
return nil
}

func (q *queuePinger) counts() map[string]int {
Expand Down
1 change: 1 addition & 0 deletions scaler/queue_pinger_fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func newFakeQueuePinger(
},
"testns",
"testsvc",
"testdepl",
opts.port,
)
if err != nil {
Expand Down
8 changes: 7 additions & 1 deletion scaler/queue_pinger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/go-logr/logr"
"github.com/kedacore/http-add-on/pkg/k8s"
"github.com/kedacore/http-add-on/pkg/queue"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
Expand All @@ -17,6 +18,7 @@ func TestCounts(t *testing.T) {
const (
ns = "testns"
svcName = "testsvc"
deplName = "testdepl"
tickDur = 10 * time.Millisecond
numEndpoints = 3
)
Expand Down Expand Up @@ -51,6 +53,7 @@ func TestCounts(t *testing.T) {
},
ns,
svcName,
deplName,
srvURL.Port(),
)
r.NoError(err)
Expand All @@ -66,8 +69,9 @@ func TestCounts(t *testing.T) {
q.Resize("host3", 3)
q.Resize("host4", 4)
ticker := time.NewTicker(tickDur)
fakeCache := k8s.NewFakeDeploymentCache()
go func() {
pinger.start(ctx, ticker)
pinger.start(ctx, ticker, fakeCache)
}()
// sleep to ensure we ticked and finished calling
// fetchAndSaveCounts
Expand Down Expand Up @@ -100,6 +104,7 @@ func TestFetchAndSaveCounts(t *testing.T) {
const (
ns = "testns"
svcName = "testsvc"
deplName = "testdepl"
adminPort = "8081"
numEndpoints = 3
)
Expand Down Expand Up @@ -132,6 +137,7 @@ func TestFetchAndSaveCounts(t *testing.T) {
endpointsFn,
ns,
svcName,
deplName,
srvURL.Port(),
// time.NewTicker(1*time.Millisecond),
)
Expand Down