Skip to content

Commit

Permalink
Fixing issue #272
Browse files Browse the repository at this point in the history
  • Loading branch information
ajanth97 committed Sep 28, 2021
1 parent f8d7c0b commit ef93931
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 18 deletions.
2 changes: 2 additions & 0 deletions scaler/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ type config struct {
// UpdateRoutingTableDur is the duration between manual
// updates to the routing table.
UpdateRoutingTableDur time.Duration `envconfig:"KEDA_HTTP_SCALER_ROUTING_TABLE_UPDATE_DUR" default:"100ms"`
// This will be the 'Target Pending Requests' for the interceptor
TargetPendingRequestsInterceptor int `envconfig:"KEDA_HTTP_SCALER_TARGET_PENDING_REQUESTS_INTERCEPTOR" default:"100"`
}

func mustParseConfig() *config {
Expand Down
45 changes: 27 additions & 18 deletions scaler/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ func init() {
}

type impl struct {
lggr logr.Logger
pinger *queuePinger
routingTable routing.TableReader
targetMetric int64
lggr logr.Logger
pinger *queuePinger
routingTable routing.TableReader
targetMetric int64
targetMetricInterceptor int64
externalscaler.UnimplementedExternalScalerServer
}

Expand All @@ -33,12 +34,14 @@ func newImpl(
pinger *queuePinger,
routingTable routing.TableReader,
defaultTargetMetric int64,
defaultTargetMetricInterceptor int64,
) *impl {
return &impl{
lggr: lggr,
pinger: pinger,
routingTable: routingTable,
targetMetric: defaultTargetMetric,
lggr: lggr,
pinger: pinger,
routingTable: routingTable,
targetMetric: defaultTargetMetric,
targetMetricInterceptor: defaultTargetMetricInterceptor,
}
}

Expand Down Expand Up @@ -115,20 +118,26 @@ func (e *impl) GetMetricSpec(
lggr.Error(err, "no 'host' found in ScaledObject metadata")
return nil, err
}
target, err := e.routingTable.Lookup(host)
if err != nil {
lggr.Error(
err,
"error getting target for host",
"host",
host,
)
return nil, err
var targetPendingRequests int64
if host == "interceptor" {
targetPendingRequests = e.targetMetricInterceptor
} else {
target, err := e.routingTable.Lookup(host)
if err != nil {
lggr.Error(
err,
"error getting target for host",
"host",
host,
)
return nil, err
}
targetPendingRequests = int64(target.TargetPendingRequests)
}
metricSpecs := []*externalscaler.MetricSpec{
{
MetricName: host,
TargetSize: int64(target.TargetPendingRequests),
TargetSize: targetPendingRequests,
},
}

Expand Down
4 changes: 4 additions & 0 deletions scaler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func main() {
svcName := cfg.TargetService
targetPortStr := fmt.Sprintf("%d", cfg.TargetPort)
targetPendingRequests := cfg.TargetPendingRequests
targetPendingRequestsInterceptor := cfg.TargetPendingRequestsInterceptor

k8sCl, _, err := k8s.NewClientset()
if err != nil {
Expand Down Expand Up @@ -69,6 +70,7 @@ func main() {
pinger,
table,
int64(targetPendingRequests),
int64(targetPendingRequestsInterceptor),
)
})

Expand Down Expand Up @@ -106,6 +108,7 @@ func startGrpcServer(
pinger *queuePinger,
routingTable *routing.Table,
targetPendingRequests int64,
targetPendingRequestsInterceptor int64,
) error {

addr := fmt.Sprintf("0.0.0.0:%d", port)
Expand All @@ -123,6 +126,7 @@ func startGrpcServer(
pinger,
routingTable,
targetPendingRequests,
targetPendingRequestsInterceptor,
),
)
reflection.Register(grpcServer)
Expand Down

0 comments on commit ef93931

Please sign in to comment.