From 1dbec8b010da04fe00bc704f2f0097bfacc42055 Mon Sep 17 00:00:00 2001 From: Matt Ulmer <25484774+Ulminator@users.noreply.github.com> Date: Mon, 4 Nov 2024 17:14:44 -0800 Subject: [PATCH] fix: address comments Signed-off-by: Matt Ulmer <25484774+Ulminator@users.noreply.github.com> --- pkg/scalers/nsq_scaler.go | 18 ++----- tests/scalers/nsq/nsq_test.go | 89 ++++++++++++++++++++--------------- 2 files changed, 55 insertions(+), 52 deletions(-) diff --git a/pkg/scalers/nsq_scaler.go b/pkg/scalers/nsq_scaler.go index 7a99d95baa8..ab5ccb015fe 100644 --- a/pkg/scalers/nsq_scaler.go +++ b/pkg/scalers/nsq_scaler.go @@ -66,14 +66,6 @@ func (m nsqMetadata) Validate() error { return fmt.Errorf("no nsqLookupdHTTPAddresses given") } - if m.Topic == "" { - return fmt.Errorf("no topic given") - } - - if m.Channel == "" { - return fmt.Errorf("no channel given") - } - if m.DepthThreshold <= 0 { return fmt.Errorf("depthThreshold must be a positive integer") } @@ -101,7 +93,7 @@ func (s nsqScaler) GetMetricsAndActivity(_ context.Context, metricName string) ( return []external_metrics.ExternalMetricValue{}, false, err } - s.logger.Info("GetMetricsAndActivity", "metricName", metricName, "depth", depth) + s.logger.V(1).Info("GetMetricsAndActivity", "metricName", metricName, "depth", depth) metric := GenerateMetricInMili(metricName, float64(depth)) @@ -115,7 +107,7 @@ func (s nsqScaler) getTopicChannelDepth() (int64, error) { } if len(nsqdHosts) == 0 { - s.logger.Info("no nsqd hosts found for topic", "topic", s.metadata.Topic) + s.logger.V(1).Info("no nsqd hosts found for topic", "topic", s.metadata.Topic) return 0, nil } @@ -287,7 +279,7 @@ func (s *nsqScaler) aggregateDepth(nsqdHosts []string, topic string, channel str if len(t.Channels) == 0 { // topic exists with no channels, but there are messages in the topic -> we should still scale to bootstrap - s.logger.Info("no channels exist for topic", "topic", topic, "channel", channel, "host", result.host) + s.logger.V(1).Info("no channels exist for topic", "topic", topic, "channel", channel, "host", result.host) depth += t.Depth continue } @@ -301,14 +293,14 @@ func (s *nsqScaler) aggregateDepth(nsqdHosts []string, topic string, channel str if ch.Paused { // if it's paused on a single nsqd host, it's depth should not go into the aggregate // meaning if paused on all nsqd hosts => depth == 0 - s.logger.Info("channel is paused", "topic", topic, "channel", channel, "host", result.host) + s.logger.V(1).Info("channel is paused", "topic", topic, "channel", channel, "host", result.host) continue } depth += ch.Depth } if !channelExists { // topic exists with channels, but not the one in question - fallback to topic depth - s.logger.Info("channel does not exist for topic", "topic", topic, "channel", channel, "host", result.host) + s.logger.V(1).Info("channel does not exist for topic", "topic", topic, "channel", channel, "host", result.host) depth += t.Depth } } diff --git a/tests/scalers/nsq/nsq_test.go b/tests/scalers/nsq/nsq_test.go index a75acc99f78..6a29805efaa 100644 --- a/tests/scalers/nsq/nsq_test.go +++ b/tests/scalers/nsq/nsq_test.go @@ -21,16 +21,18 @@ const ( ) var ( - testNamespace = fmt.Sprintf("%s-ns", testName) - deploymentName = fmt.Sprintf("%s-consumer-deployment", testName) - jobName = fmt.Sprintf("%s-producer-job", testName) - scaledObjectName = fmt.Sprintf("%s-so", testName) - nsqNamespace = "nsq" - nsqHelmRepoURL = "https://nsqio.github.io/helm-chart" - minReplicas = 1 - maxReplicas = 10 - topicName = "test_topic" - channelName = "test_channel" + testNamespace = fmt.Sprintf("%s-ns", testName) + deploymentName = fmt.Sprintf("%s-consumer-deployment", testName) + jobName = fmt.Sprintf("%s-producer-job", testName) + scaledObjectName = fmt.Sprintf("%s-so", testName) + nsqNamespace = "nsq" + nsqHelmRepoURL = "https://nsqio.github.io/helm-chart" + minReplicaCount = 0 + maxReplicaCount = 2 + depthThreshold = 10 + activationDepthThreshold = 5 + topicName = "test_topic" + channelName = "test_channel" ) const ( @@ -58,6 +60,7 @@ spec: - "--mode=consumer" - "--topic={{.TopicName}}" - "--channel={{.ChannelName}}" + - "--sleep-duration=1s" - "--nsqlookupd-http-address=nsq-nsqlookupd.{{.NSQNamespace}}.svc.cluster.local:4161" imagePullPolicy: Always ` @@ -73,9 +76,8 @@ metadata: spec: pollingInterval: 5 cooldownPeriod: 10 - idleReplicaCount: 0 - maxReplicaCount: {{.MaxReplicas}} - minReplicaCount: {{.MinReplicas}} + maxReplicaCount: {{.MaxReplicaCount}} + minReplicaCount: {{.MinReplicaCount}} scaleTargetRef: apiVersion: "apps/v1" kind: "Deployment" @@ -87,8 +89,8 @@ spec: nsqLookupdHTTPAddresses: "nsq-nsqlookupd.{{.NSQNamespace}}.svc.cluster.local:4161" topic: "{{.TopicName}}" channel: "{{.ChannelName}}" - depthThreshold: "10" - activationDepthThreshold: "5" + depthThreshold: "{{.DepthThreshold}}" + activationDepthThreshold: "{{.ActivationDepthThreshold}}" ` jobTemplate = ` @@ -114,16 +116,18 @@ spec: ) type templateData struct { - TestNamespace string - NSQNamespace string - DeploymentName string - ScaledObjectName string - JobName string - MinReplicas int - MaxReplicas int - TopicName string - ChannelName string - MessageCount int + TestNamespace string + NSQNamespace string + DeploymentName string + ScaledObjectName string + JobName string + MinReplicaCount int + MaxReplicaCount int + DepthThreshold int + ActivationDepthThreshold int + TopicName string + ChannelName string + MessageCount int } func TestNSQScaler(t *testing.T) { @@ -172,15 +176,17 @@ func uninstallNSQ(t *testing.T) { func getTemplateData() (templateData, []Template) { return templateData{ - TestNamespace: testNamespace, - NSQNamespace: nsqNamespace, - DeploymentName: deploymentName, - JobName: jobName, - ScaledObjectName: scaledObjectName, - MinReplicas: minReplicas, - MaxReplicas: maxReplicas, - TopicName: topicName, - ChannelName: channelName, + TestNamespace: testNamespace, + NSQNamespace: nsqNamespace, + DeploymentName: deploymentName, + JobName: jobName, + ScaledObjectName: scaledObjectName, + MinReplicaCount: minReplicaCount, + MaxReplicaCount: maxReplicaCount, + TopicName: topicName, + ChannelName: channelName, + DepthThreshold: depthThreshold, + ActivationDepthThreshold: activationDepthThreshold, }, []Template{ {Name: "deploymentTemplate", Config: deploymentTemplate}, {Name: "scaledObjectTemplate", Config: scaledObjectTemplate}, @@ -190,20 +196,25 @@ func getTemplateData() (templateData, []Template) { func testActivation(t *testing.T, kc *kubernetes.Clientset, data templateData) { t.Log("--- testing activation ---") - data.MessageCount = 5 + data.MessageCount = activationDepthThreshold KubectlReplaceWithTemplate(t, data, "jobTemplate", jobTemplate) - AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 0, 60) + + data.MessageCount = 1 // total message count > activationDepthThreshold + KubectlReplaceWithTemplate(t, data, "jobTemplate", jobTemplate) + require.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 1, 60, 1), + "replica count should reach 1 in under 1 minute") } func testScaleOut(t *testing.T, kc *kubernetes.Clientset, data templateData) { t.Log("--- testing scale out ---") - data.MessageCount = 1 // 5 already published + 1 > activationDepthThreshold + // can handle depthThreshold messages per replica - using maxReplicaCount + 1 to ensure scaling to maxReplicaCount + data.MessageCount = depthThreshold * (maxReplicaCount + 1) KubectlReplaceWithTemplate(t, data, "jobTemplate", jobTemplate) - require.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 1, 60, 1), - "replica count should be 1 after 1 minute") + require.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, maxReplicaCount, 60, 1), + "replica count should reach 2 in under 1 minute") } func testScaleIn(t *testing.T, kc *kubernetes.Clientset) {