From 9488dd55d8656a8965f57b9fdfa56b2a1909ab70 Mon Sep 17 00:00:00 2001 From: Somesh Koli Date: Fri, 27 Jan 2023 02:49:08 +0530 Subject: [PATCH] fead: Move http scaled object from single host to multi host system Signed-off-by: Somesh Koli Signed-off-by: Jocelyn Thode --- .github/workflows/e2e-tests.yaml | 1 - .../bases/http.keda.sh_httpscaledobjects.yaml | 8 +- examples/xkcd/templates/httpscaledobject.yaml | 5 +- examples/xkcd/templates/ingress.yaml | 4 +- examples/xkcd/values.yaml | 4 +- .../http/v1alpha1/httpscaledobject_types.go | 4 +- operator/controllers/http/app.go | 4 +- operator/controllers/http/routing_table.go | 36 +++-- .../controllers/http/routing_table_test.go | 24 ++- operator/controllers/http/scaled_object.go | 2 +- .../controllers/http/scaled_object_test.go | 6 + operator/controllers/http/suite_test.go | 1 + pkg/k8s/scaledobject.go | 8 +- pkg/k8s/templates/scaledobject.yaml | 24 +++ scaler/handlers.go | 137 ++++++++++-------- scaler/handlers_test.go | 123 +++++++++++++--- scaler/host_counts.go | 21 --- scaler/hosts.go | 45 ++++++ 18 files changed, 310 insertions(+), 147 deletions(-) create mode 100644 pkg/k8s/templates/scaledobject.yaml delete mode 100644 scaler/host_counts.go create mode 100644 scaler/hosts.go diff --git a/.github/workflows/e2e-tests.yaml b/.github/workflows/e2e-tests.yaml index 83355110d..90e6f7475 100644 --- a/.github/workflows/e2e-tests.yaml +++ b/.github/workflows/e2e-tests.yaml @@ -51,7 +51,6 @@ jobs: - name: Show Kubernetes version run: | kubectl version - - name: Run e2e test run: | make e2e-test diff --git a/config/crd/bases/http.keda.sh_httpscaledobjects.yaml b/config/crd/bases/http.keda.sh_httpscaledobjects.yaml index 6b01f0bc2..7df2122f7 100644 --- a/config/crd/bases/http.keda.sh_httpscaledobjects.yaml +++ b/config/crd/bases/http.keda.sh_httpscaledobjects.yaml @@ -59,11 +59,13 @@ spec: spec: description: HTTPScaledObjectSpec defines the desired state of HTTPScaledObject properties: - host: + hosts: description: The host to route. All requests with this host in the "Host" header will be routed to the Service and Port specified in the scaleTargetRef - type: string + items: + type: string + type: array replicas: description: (optional) Replica information properties: @@ -107,7 +109,7 @@ spec: format: int32 type: integer required: - - host + - hosts - scaleTargetRef type: object status: diff --git a/examples/xkcd/templates/httpscaledobject.yaml b/examples/xkcd/templates/httpscaledobject.yaml index 613280ee3..8a4ca4e0a 100644 --- a/examples/xkcd/templates/httpscaledobject.yaml +++ b/examples/xkcd/templates/httpscaledobject.yaml @@ -3,7 +3,10 @@ apiVersion: http.keda.sh/v1alpha1 metadata: name: {{ include "xkcd.fullname" . }} spec: - host: {{ .Values.host }} + {{- with .Values.hosts }} + hosts: + {{- toYaml . | nindent 8 }} + {{- end }} targetPendingRequests: {{ .Values.targetPendingRequests }} scaleTargetRef: deployment: {{ include "xkcd.fullname" . }} diff --git a/examples/xkcd/templates/ingress.yaml b/examples/xkcd/templates/ingress.yaml index 97e564bbf..0e6f6a8ae 100644 --- a/examples/xkcd/templates/ingress.yaml +++ b/examples/xkcd/templates/ingress.yaml @@ -8,7 +8,8 @@ metadata: kubernetes.io/ingress.class: nginx spec: rules: - - host: {{ .Values.host }} + {{- range .Values.hosts }} + - host: {{ . | toString }} http: paths: - path: / @@ -18,3 +19,4 @@ spec: name: keda-add-ons-http-interceptor-proxy port: number: 8080 + {{- end }} diff --git a/examples/xkcd/values.yaml b/examples/xkcd/values.yaml index 716eb9db3..4584960bd 100644 --- a/examples/xkcd/values.yaml +++ b/examples/xkcd/values.yaml @@ -1,5 +1,7 @@ replicaCount: 1 -host: myhost.com +hosts: + - "myhost.com" + - "myhost2.com" targetPendingRequests: 200 # This is the namespace that the ingress should be installed # into. It should be set to the same namespace as the diff --git a/operator/apis/http/v1alpha1/httpscaledobject_types.go b/operator/apis/http/v1alpha1/httpscaledobject_types.go index 5bc5b69a8..b38c5fb08 100644 --- a/operator/apis/http/v1alpha1/httpscaledobject_types.go +++ b/operator/apis/http/v1alpha1/httpscaledobject_types.go @@ -40,9 +40,9 @@ type ReplicaStruct struct { // HTTPScaledObjectSpec defines the desired state of HTTPScaledObject type HTTPScaledObjectSpec struct { - // The host to route. All requests with this host in the "Host" header will + // The hosts to route. All requests with these hosts in the "Host" header will // be routed to the Service and Port specified in the scaleTargetRef - Host string `json:"host"` + Hosts []string `json:"hosts"` // The name of the deployment to route HTTP requests to (and to autoscale). // Either this or Image must be set ScaleTargetRef *ScaleTargetRef `json:"scaleTargetRef"` diff --git a/operator/controllers/http/app.go b/operator/controllers/http/app.go index ef1a0f187..87c9fe870 100644 --- a/operator/controllers/http/app.go +++ b/operator/controllers/http/app.go @@ -85,7 +85,7 @@ func removeApplicationResources( logger, cl, routingTable, - httpso.Spec.Host, + httpso.Spec.Hosts, baseConfig.CurrentNamespace, ); err != nil { return err @@ -149,7 +149,7 @@ func createOrUpdateApplicationResources( logger, cl, routingTable, - httpso.Spec.Host, + httpso.Spec.Hosts, routing.NewTarget( httpso.GetNamespace(), httpso.Spec.ScaleTargetRef.Service, diff --git a/operator/controllers/http/routing_table.go b/operator/controllers/http/routing_table.go index e50c16842..128c2a059 100644 --- a/operator/controllers/http/routing_table.go +++ b/operator/controllers/http/routing_table.go @@ -16,17 +16,19 @@ func removeAndUpdateRoutingTable( lggr logr.Logger, cl client.Client, table *routing.Table, - host, + hosts []string, namespace string, ) error { lggr = lggr.WithName("removeAndUpdateRoutingTable") - if err := table.RemoveTarget(host); err != nil { - lggr.Error( - err, - "could not remove host from routing table, progressing anyway", - "host", - host, - ) + for _, host := range hosts { + if err := table.RemoveTarget(host); err != nil { + lggr.Error( + err, + "could not remove host from routing table, progressing anyway", + "host", + host, + ) + } } return updateRoutingMap(ctx, lggr, cl, namespace, table) @@ -37,18 +39,20 @@ func addAndUpdateRoutingTable( lggr logr.Logger, cl client.Client, table *routing.Table, - host string, + hosts []string, target routing.Target, namespace string, ) error { lggr = lggr.WithName("addAndUpdateRoutingTable") - if err := table.AddTarget(host, target); err != nil { - lggr.Error( - err, - "could not add host to routing table, progressing anyway", - "host", - host, - ) + for _, host := range hosts { + if err := table.AddTarget(host, target); err != nil { + lggr.Error( + err, + "could not add host to routing table, progressing anyway", + "host", + host, + ) + } } return updateRoutingMap(ctx, lggr, cl, namespace, table) } diff --git a/operator/controllers/http/routing_table_test.go b/operator/controllers/http/routing_table_test.go index 07d6bf14e..3f754ae38 100644 --- a/operator/controllers/http/routing_table_test.go +++ b/operator/controllers/http/routing_table_test.go @@ -13,14 +13,18 @@ import ( "github.com/kedacore/http-add-on/pkg/routing" ) +func getHosts() []string { + return []string{"myhost.com"} +} + func TestRoutingTable(t *testing.T) { table := routing.NewTable() const ( - host = "myhost.com" ns = "testns" svcName = "testsvc" deplName = "testdepl" ) + hosts := getHosts() r := require.New(t) ctx := context.Background() cl := k8s.NewFakeRuntimeClient() @@ -45,7 +49,7 @@ func TestRoutingTable(t *testing.T) { logr.Discard(), cl, table, - host, + hosts, target, ns, )) @@ -56,16 +60,18 @@ func TestRoutingTable(t *testing.T) { r.Equal(0, len(cl.FakeRuntimeClientWriter.Updates)) r.Equal(0, len(cl.FakeRuntimeClientWriter.Creates)) - retTarget, err := table.Lookup(host) - r.NoError(err) - r.Equal(&target, retTarget) + for _, host := range hosts { + retTarget, err := table.Lookup(host) + r.NoError(err) + r.Equal(target, *retTarget) + } r.NoError(removeAndUpdateRoutingTable( ctx, logr.Discard(), cl, table, - host, + hosts, ns, )) @@ -76,6 +82,8 @@ func TestRoutingTable(t *testing.T) { r.Equal(0, len(cl.FakeRuntimeClientWriter.Updates)) r.Equal(0, len(cl.FakeRuntimeClientWriter.Creates)) - _, err = table.Lookup(host) - r.Error(err) + for _, host := range hosts { + _, err := table.Lookup(host) + r.Error(err) + } } diff --git a/operator/controllers/http/scaled_object.go b/operator/controllers/http/scaled_object.go index afa9aed33..fa4a881bb 100644 --- a/operator/controllers/http/scaled_object.go +++ b/operator/controllers/http/scaled_object.go @@ -39,7 +39,7 @@ func createOrUpdateScaledObject( fmt.Sprintf("%s-app", httpso.GetName()), // HTTPScaledObject name is the same as the ScaledObject name httpso.Spec.ScaleTargetRef.Deployment, externalScalerHostName, - httpso.Spec.Host, + httpso.Spec.Hosts, minReplicaCount, maxReplicaCount, httpso.Spec.CooldownPeriod, diff --git a/operator/controllers/http/scaled_object_test.go b/operator/controllers/http/scaled_object_test.go index 25d583bcd..07280bd20 100644 --- a/operator/controllers/http/scaled_object_test.go +++ b/operator/controllers/http/scaled_object_test.go @@ -72,6 +72,12 @@ func TestCreateOrUpdateScaledObject(t *testing.T) { spec.MaxReplicaCount, ) + // get hosts from spec and ensure all the hosts are there + r.Equal( + 2, + len(testInfra.httpso.Spec.Hosts), + ) + // now update the min and max replicas on the httpso // and call createOrUpdateScaledObject again if spec := &testInfra.httpso.Spec; spec.Replicas == nil { diff --git a/operator/controllers/http/suite_test.go b/operator/controllers/http/suite_test.go index 347068958..63de5b286 100644 --- a/operator/controllers/http/suite_test.go +++ b/operator/controllers/http/suite_test.go @@ -113,6 +113,7 @@ func newCommonTestInfra(namespace, appName string) *commonTestInfra { Service: appName, Port: 8081, }, + Hosts: []string{"myhost1.com", "myhost2.com"}, }, } diff --git a/pkg/k8s/scaledobject.go b/pkg/k8s/scaledobject.go index ddc20b0fe..e185a892f 100644 --- a/pkg/k8s/scaledobject.go +++ b/pkg/k8s/scaledobject.go @@ -1,6 +1,8 @@ package k8s import ( + "strings" + kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" appsv1 "k8s.io/api/apps/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -12,7 +14,7 @@ const ( soTriggerType = "external-push" mkScalerAddress = "scalerAddress" - mkHost = "host" + mkHosts = "hosts" ) // NewScaledObject creates a new ScaledObject in memory @@ -21,7 +23,7 @@ func NewScaledObject( name string, deploymentName string, scalerAddress string, - host string, + hosts []string, minReplicas *int32, maxReplicas *int32, cooldownPeriod *int32, @@ -54,7 +56,7 @@ func NewScaledObject( Type: soTriggerType, Metadata: map[string]string{ mkScalerAddress: scalerAddress, - mkHost: host, + mkHosts: strings.Join(hosts, ","), }, }, }, diff --git a/pkg/k8s/templates/scaledobject.yaml b/pkg/k8s/templates/scaledobject.yaml new file mode 100644 index 000000000..9ebae7a9e --- /dev/null +++ b/pkg/k8s/templates/scaledobject.yaml @@ -0,0 +1,24 @@ +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: {{ .Name }} + namespace: {{ .Namespace }} + labels: + {{- range $key, $val := .Labels }} + {{ $key }}: {{ $val }} + {{- end }} +spec: + minReplicaCount: {{ .MinReplicas }} + maxReplicaCount: {{ .MaxReplicas }} + cooldownPeriod: {{ .CooldownPeriod }} + pollingInterval: 1 + scaleTargetRef: + name: {{ .DeploymentName }} + kind: Deployment + triggers: + - type: external-push + metadata: + scalerAddress: {{ $.ScalerAddress }} + hosts: '{{ join "," .Values.Hosts }}' + advanced: + restoreToOriginalReplicaCount: true diff --git a/scaler/handlers.go b/scaler/handlers.go index f616fc240..95e1e74e3 100644 --- a/scaler/handlers.go +++ b/scaler/handlers.go @@ -59,32 +59,38 @@ func (e *impl) IsActive( scaledObject *externalscaler.ScaledObjectRef, ) (*externalscaler.IsActiveResponse, error) { lggr := e.lggr.WithName("IsActive") - host, ok := scaledObject.ScalerMetadata["host"] - if !ok { - err := fmt.Errorf("no 'host' field found in ScaledObject metadata") - lggr.Error(err, "returning immediately from IsActive RPC call", "ScaledObject", scaledObject) + + hosts, err := getHostsFromScaledObjectRef(lggr, scaledObject) + if err != nil { return nil, err } - if host == interceptor { - return &externalscaler.IsActiveResponse{ - Result: true, - }, nil - } - hostCount, ok := getHostCount( - host, - e.pinger.counts(), - e.routingTable, - ) - if !ok { - err := fmt.Errorf("host '%s' not found in counts", host) - allCounts := e.pinger.mergeCountsWithRoutingTable( + totalHostCount := 0 + for _, host := range hosts { + if host == interceptor { + return &externalscaler.IsActiveResponse{ + Result: true, + }, nil + } + + hostCount, ok := getHostCount( + host, + e.pinger.counts(), e.routingTable, ) - lggr.Error(err, "Given host was not found in queue count map", "host", host, "allCounts", allCounts) - return nil, err + if !ok { + err := fmt.Errorf("host '%s' not found in counts", host) + allCounts := e.pinger.mergeCountsWithRoutingTable( + e.routingTable, + ) + lggr.Error(err, "Given host was not found in queue count map", "host", host, "allCounts", allCounts) + return nil, err + } + + totalHostCount += hostCount } - active := hostCount > 0 + + active := totalHostCount > 0 return &externalscaler.IsActiveResponse{ Result: active, }, nil @@ -131,37 +137,39 @@ func (e *impl) GetMetricSpec( sor *externalscaler.ScaledObjectRef, ) (*externalscaler.GetMetricSpecResponse, error) { lggr := e.lggr.WithName("GetMetricSpec") - host, ok := sor.ScalerMetadata["host"] - if !ok { - err := fmt.Errorf("'host' not found in ScaledObject metadata") - lggr.Error(err, "no 'host' found in ScaledObject metadata") + + hosts, err := getHostsFromScaledObjectRef(lggr, sor) + if err != nil { return nil, err } + var targetPendingRequests int64 - if host == interceptor { - targetPendingRequests = e.targetMetricInterceptor - } else { - target, err := e.routingTable.Lookup(host) - if err != nil { - lggr.Error( - err, - "error getting target for host", - "host", - host, - ) - return nil, err + var hostMetricSpec []*externalscaler.MetricSpec + for _, host := range hosts { + if host == interceptor { + targetPendingRequests = e.targetMetricInterceptor + } else { + target, err := e.routingTable.Lookup(host) + if err != nil { + lggr.Error( + err, + "error getting target for host", + "host", + host, + ) + return nil, err + } + targetPendingRequests = int64(target.TargetPendingRequests) } - targetPendingRequests = int64(target.TargetPendingRequests) - } - metricSpecs := []*externalscaler.MetricSpec{ - { + + hostMetricSpec = append(hostMetricSpec, &externalscaler.MetricSpec{ MetricName: host, TargetSize: targetPendingRequests, - }, + }) } return &externalscaler.GetMetricSpecResponse{ - MetricSpecs: metricSpecs, + MetricSpecs: hostMetricSpec, }, nil } @@ -170,35 +178,36 @@ func (e *impl) GetMetrics( metricRequest *externalscaler.GetMetricsRequest, ) (*externalscaler.GetMetricsResponse, error) { lggr := e.lggr.WithName("GetMetrics") - host, ok := metricRequest.ScaledObjectRef.ScalerMetadata["host"] - if !ok { - err := fmt.Errorf("no 'host' field found in ScaledObject metadata") - lggr.Error(err, "ScaledObjectRef", metricRequest.ScaledObjectRef) + + hosts, err := getHostsFromScaledObjectRef(lggr, metricRequest.ScaledObjectRef) + if err != nil { return nil, err } - hostCount, ok := getHostCount( - host, - e.pinger.counts(), - e.routingTable, - ) - if !ok { - if host == interceptor { - hostCount = e.pinger.aggregate() - } else { - err := fmt.Errorf("host '%s' not found in counts", host) - allCounts := e.pinger.mergeCountsWithRoutingTable(e.routingTable) - lggr.Error(err, "allCounts", allCounts) - return nil, err + var hostMetricValues []*externalscaler.MetricValue + for _, host := range hosts { + hostCount, ok := getHostCount( + host, + e.pinger.counts(), + e.routingTable, + ) + if !ok { + if host == interceptor { + hostCount = e.pinger.aggregate() + } else { + err := fmt.Errorf("host '%s' not found in counts", host) + allCounts := e.pinger.mergeCountsWithRoutingTable(e.routingTable) + lggr.Error(err, "allCounts", allCounts) + return nil, err + } } - } - metricValues := []*externalscaler.MetricValue{ - { + hostMetricValues = append(hostMetricValues, &externalscaler.MetricValue{ MetricName: host, MetricValue: int64(hostCount), - }, + }) } + return &externalscaler.GetMetricsResponse{ - MetricValues: metricValues, + MetricValues: hostMetricValues, }, nil } diff --git a/scaler/handlers_test.go b/scaler/handlers_test.go index 8c0d95457..a208b73b3 100644 --- a/scaler/handlers_test.go +++ b/scaler/handlers_test.go @@ -2,6 +2,7 @@ package main import ( context "context" + "encoding/json" "fmt" "net" "testing" @@ -31,7 +32,7 @@ func standardTarget() routing.Target { func TestStreamIsActive(t *testing.T) { type testCase struct { name string - host string + hosts string expected bool expectedErr bool setup func(*routing.Table, *queuePinger) @@ -40,7 +41,7 @@ func TestStreamIsActive(t *testing.T) { testCases := []testCase{ { name: "Simple host inactive", - host: t.Name(), + hosts: t.Name(), expected: false, expectedErr: false, setup: func(table *routing.Table, q *queuePinger) { @@ -52,14 +53,14 @@ func TestStreamIsActive(t *testing.T) { }, { name: "Host is 'interceptor'", - host: "interceptor", + hosts: "interceptor", expected: true, expectedErr: false, setup: func(*routing.Table, *queuePinger) {}, }, { name: "Simple host active", - host: t.Name(), + hosts: t.Name(), expected: true, expectedErr: false, setup: func(table *routing.Table, q *queuePinger) { @@ -69,9 +70,22 @@ func TestStreamIsActive(t *testing.T) { q.allCounts[t.Name()] = 1 }, }, + { + name: "Simple multi host active", + hosts: "host1,host2", + expected: true, + expectedErr: false, + setup: func(table *routing.Table, q *queuePinger) { + r.NoError(table.AddTarget(t.Name(), standardTarget())) + q.pingMut.Lock() + defer q.pingMut.Unlock() + q.allCounts["host1"] = 1 + q.allCounts["host2"] = 1 + }, + }, { name: "No host present, but host in routing table", - host: t.Name(), + hosts: t.Name(), expected: false, expectedErr: false, setup: func(table *routing.Table, q *queuePinger) { @@ -80,7 +94,7 @@ func TestStreamIsActive(t *testing.T) { }, { name: "Host doesn't exist", - host: t.Name(), + hosts: t.Name(), expected: false, expectedErr: true, setup: func(*routing.Table, *queuePinger) {}, @@ -130,9 +144,10 @@ func TestStreamIsActive(t *testing.T) { client := externalscaler.NewExternalScalerClient(conn) + serializedHosts, _ := json.Marshal(tc.hosts) testRef := &externalscaler.ScaledObjectRef{ ScalerMetadata: map[string]string{ - "host": tc.host, + "hosts": string(serializedHosts), }, } @@ -163,7 +178,7 @@ func TestStreamIsActive(t *testing.T) { func TestIsActive(t *testing.T) { type testCase struct { name string - host string + hosts string expected bool expectedErr bool setup func(*routing.Table, *queuePinger) @@ -172,7 +187,7 @@ func TestIsActive(t *testing.T) { testCases := []testCase{ { name: "Simple host inactive", - host: t.Name(), + hosts: t.Name(), expected: false, expectedErr: false, setup: func(table *routing.Table, q *queuePinger) { @@ -184,14 +199,14 @@ func TestIsActive(t *testing.T) { }, { name: "Host is 'interceptor'", - host: "interceptor", + hosts: "interceptor", expected: true, expectedErr: false, setup: func(*routing.Table, *queuePinger) {}, }, { name: "Simple host active", - host: t.Name(), + hosts: t.Name(), expected: true, expectedErr: false, setup: func(table *routing.Table, q *queuePinger) { @@ -201,9 +216,22 @@ func TestIsActive(t *testing.T) { q.allCounts[t.Name()] = 1 }, }, + { + name: "Simple multi host active", + hosts: "host1,host2", + expected: true, + expectedErr: false, + setup: func(table *routing.Table, q *queuePinger) { + r.NoError(table.AddTarget(t.Name(), standardTarget())) + q.pingMut.Lock() + defer q.pingMut.Unlock() + q.allCounts["host1"] = 1 + q.allCounts["host2"] = 1 + }, + }, { name: "No host present, but host in routing table", - host: t.Name(), + hosts: t.Name(), expected: false, expectedErr: false, setup: func(table *routing.Table, q *queuePinger) { @@ -212,7 +240,7 @@ func TestIsActive(t *testing.T) { }, { name: "Host doesn't exist", - host: t.Name(), + hosts: t.Name(), expected: false, expectedErr: true, setup: func(*routing.Table, *queuePinger) {}, @@ -236,11 +264,13 @@ func TestIsActive(t *testing.T) { 123, 200, ) + + serializedHosts, _ := json.Marshal(tc.hosts) res, err := hdl.IsActive( ctx, &externalscaler.ScaledObjectRef{ ScalerMetadata: map[string]string{ - "host": tc.host, + "hosts": string(serializedHosts), }, }, ) @@ -268,13 +298,16 @@ func TestGetMetricSpecTable(t *testing.T) { checker func(*testing.T, *externalscaler.GetMetricSpecResponse, error) } r := require.New(t) + t1Hosts, _ := json.Marshal("validHost") + t2Hosts, _ := json.Marshal("validHost1,validHost2") + t3Hosts, _ := json.Marshal("interceptor") cases := []testCase{ { - name: "valid host as host value in scaler metadata", + name: "valid host as single host value in scaler metadata", defaultTargetMetric: 0, defaultTargetMetricInterceptor: 123, scalerMetadata: map[string]string{ - "host": "validHost", + "hosts": string(t1Hosts), "targetPendingRequests": "123", }, newRoutingTableFn: func() *routing.Table { @@ -299,12 +332,52 @@ func TestGetMetricSpecTable(t *testing.T) { r.Equal(int64(123), spec.TargetSize) }, }, + { + name: "valid hosts as multiple hosts value in scaler metadata", + defaultTargetMetric: 0, + defaultTargetMetricInterceptor: 123, + scalerMetadata: map[string]string{ + "hosts": string(t2Hosts), + "targetPendingRequests": "123", + }, + newRoutingTableFn: func() *routing.Table { + ret := routing.NewTable() + r.NoError(ret.AddTarget("validHost1", routing.NewTarget( + ns, + "testsrv", + 8080, + "testdepl", + 123, + ))) + r.NoError(ret.AddTarget("validHost2", routing.NewTarget( + ns, + "testsrv", + 8080, + "testdepl", + 456, + ))) + return ret + }, + checker: func(t *testing.T, res *externalscaler.GetMetricSpecResponse, err error) { + t.Helper() + r := require.New(t) + r.NoError(err) + r.NotNil(res) + r.Equal(2, len(res.MetricSpecs)) + spec := res.MetricSpecs[0] + r.Equal("validHost1", spec.MetricName) + r.Equal(int64(123), spec.TargetSize) + spec = res.MetricSpecs[1] + r.Equal("validHost2", spec.MetricName) + r.Equal(int64(456), spec.TargetSize) + }, + }, { name: "interceptor as host in scaler metadata", defaultTargetMetric: 1000, defaultTargetMetricInterceptor: 2000, scalerMetadata: map[string]string{ - "host": "interceptor", + "hosts": string(t3Hosts), "targetPendingRequests": "123", }, newRoutingTableFn: func() *routing.Table { @@ -423,9 +496,13 @@ func TestGetMetrics(t *testing.T) { }, nil } + t2Hosts, _ := json.Marshal("missingHostInQueue") + t4Hosts, _ := json.Marshal("interceptor") + t5Hosts, _ := json.Marshal("myhost.com") + t6Hosts, _ := json.Marshal("validHost") testCases := []testCase{ { - name: "no 'host' field in the scaler metadata field", + name: "no 'hosts' field in the scaler metadata field", scalerMetadata: map[string]string{}, setupFn: func( ctx context.Context, @@ -445,7 +522,7 @@ func TestGetMetrics(t *testing.T) { r.Nil(res) r.Contains( err.Error(), - "no 'host' field found in ScaledObject metadata", + "no 'hosts' field in the scaler metadata field", ) }, defaultTargetMetric: int64(200), @@ -454,7 +531,7 @@ func TestGetMetrics(t *testing.T) { { name: "missing host value in the queue pinger", scalerMetadata: map[string]string{ - "host": "missingHostInQueue", + "hosts": string(t2Hosts), }, setupFn: func( ctx context.Context, @@ -481,7 +558,7 @@ func TestGetMetrics(t *testing.T) { { name: "valid host", scalerMetadata: map[string]string{ - "host": "validHost", + "hosts": string(t6Hosts), }, setupFn: func( ctx context.Context, @@ -513,7 +590,7 @@ func TestGetMetrics(t *testing.T) { { name: "'interceptor' as host", scalerMetadata: map[string]string{ - "host": "interceptor", + "hosts": string(t4Hosts), }, setupFn: func( ctx context.Context, @@ -548,7 +625,7 @@ func TestGetMetrics(t *testing.T) { { name: "host in routing table, missing in queue pinger", scalerMetadata: map[string]string{ - "host": "myhost.com", + "hosts": string(t5Hosts), }, setupFn: func( ctx context.Context, diff --git a/scaler/host_counts.go b/scaler/host_counts.go deleted file mode 100644 index 0a8874842..000000000 --- a/scaler/host_counts.go +++ /dev/null @@ -1,21 +0,0 @@ -package main - -import ( - "github.com/kedacore/http-add-on/pkg/routing" -) - -// getHostCount gets proper count for given host regardless whether -// host is in counts or only in routerTable -func getHostCount( - host string, - counts map[string]int, - table routing.TableReader, -) (int, bool) { - count, exists := counts[host] - if exists { - return count, exists - } - - exists = table.HasHost(host) - return 0, exists -} diff --git a/scaler/hosts.go b/scaler/hosts.go new file mode 100644 index 000000000..bef1b8a24 --- /dev/null +++ b/scaler/hosts.go @@ -0,0 +1,45 @@ +package main + +import ( + "encoding/json" + "fmt" + "strings" + + "github.com/go-logr/logr" + "github.com/kedacore/http-add-on/pkg/routing" + externalscaler "github.com/kedacore/http-add-on/proto" +) + +// getHostCount gets proper count for given host regardless whether +// host is in counts or only in routerTable +func getHostCount( + host string, + counts map[string]int, + table routing.TableReader, +) (int, bool) { + count, exists := counts[host] + if exists { + return count, exists + } + + exists = table.HasHost(host) + return 0, exists +} + +// gets hosts from scaledobjectref +func getHostsFromScaledObjectRef(lggr logr.Logger, sor *externalscaler.ScaledObjectRef) ([]string, error) { + serializedHosts, ok := sor.ScalerMetadata["hosts"] + if !ok { + err := fmt.Errorf("no 'hosts' field in the scaler metadata field") + lggr.Error(err, "'hosts' not found in the scaler metadata field") + return make([]string, 0), err + } + var hostsString string + err := json.Unmarshal([]byte(serializedHosts), &hostsString) + if err != nil { + err := fmt.Errorf("unable to unmarshal 'hosts' from scaledobject config") + lggr.Error(err, "'hosts' not configured properly in scaledobjectref") + return make([]string, 0), err + } + return strings.Split(hostsString, ","), nil +}