diff --git a/CHANGELOG.md b/CHANGELOG.md index 20f0b57af69..64a9318a0db 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -60,6 +60,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio - **AWS SQS Scaler**: Respect `scaleOnInFlight` value ([#4276](https://github.com/kedacore/keda/issue/4276)) - **Azure Pipelines**: Fix for disallowing `$top` on query when using `meta.parentID` method ([#4397]) - **Azure Pipelines**: Respect all required demands ([#4404](https://github.com/kedacore/keda/issues/4404)) +- **NATS Jetstream Scaler**: Fix compatibility if node is not advertised ([#4524](https://github.com/kedacore/keda/issues/4524)) - **Prometheus Metrics**: Create e2e tests for all exposed Prometheus metrics ([#4127](https://github.com/kedacore/keda/issues/4127)) - **Grafana Dashboard**: Fix HPA metrics panel to use range instead of instant ([#4513](https://github.com/kedacore/keda/pull/4513)) diff --git a/pkg/scalers/nats_jetstream_scaler.go b/pkg/scalers/nats_jetstream_scaler.go index aab737690c0..d6874a1ad47 100644 --- a/pkg/scalers/nats_jetstream_scaler.go +++ b/pkg/scalers/nats_jetstream_scaler.go @@ -53,8 +53,13 @@ type jetStreamEndpointResponse struct { } type jetStreamServerEndpointResponse struct { - ConnectUrls []string `json:"connect_urls"` - ServerName string `json:"server_name"` + ConnectUrls []string `json:"connect_urls"` + Cluster jetStreamCluster `json:"cluster"` + ServerName string `json:"server_name"` +} + +type jetStreamCluster struct { + HostUrls []string `json:"urls"` } type accountDetail struct { @@ -235,27 +240,51 @@ func (s *natsJetStreamScaler) getNATSJetstreamMonitoringData(ctx context.Context return err } - for _, clusterURL := range jetStreamServerResp.ConnectUrls { - // get hostname from the url - // nats-1.nats.svc.cluster.local:4221 -> nats-1.nats.svc.cluster.local, or - // 172.0.1.3:4221 -> 172.0.1.3 - nodeHostname := strings.Split(clusterURL, ":")[0] - natsJetStreamMonitoringServerURL, err := s.getNATSJetStreamMonitoringServerURL(nodeHostname) - if err != nil { - return err - } + isNodeAdvertised := true + clusterUrls := jetStreamServerResp.ConnectUrls + if len(clusterUrls) == 0 { + isNodeAdvertised = false + // append current node's `server_name` to check if it is a leader + // even though `server_name` is not an url, it will be split by first . (dot) + // to get the node's name anyway + clusterUrls = append(clusterUrls, jetStreamServerResp.ServerName) + clusterUrls = append(clusterUrls, jetStreamServerResp.Cluster.HostUrls...) + } - // Query server info to get its name - jetStreamServerResp, err := s.getNATSJetstreamServerInfo(ctx, natsJetStreamMonitoringServerURL) - if err != nil { - return err - } + for _, clusterURL := range clusterUrls { + var ( + node string + natsJetStreamMonitoringNodeURL string + ) + + if isNodeAdvertised { + // get hostname from the url + // nats-1.nats.svc.cluster.local:4221 -> nats-1.nats.svc.cluster.local, or + // 172.0.1.3:4221 -> 172.0.1.3 + nodeHostname := strings.Split(clusterURL, ":")[0] + natsJetStreamMonitoringServerURL, err := s.getNATSJetStreamMonitoringServerURL(nodeHostname) + if err != nil { + return err + } - node := jetStreamServerResp.ServerName + // Query server info to get its name + jetStreamServerResp, err := s.getNATSJetstreamServerInfo(ctx, natsJetStreamMonitoringServerURL) + if err != nil { + return err + } - natsJetStreamMonitoringNodeURL, err := s.getNATSJetStreamMonitoringNodeURL(nodeHostname) - if err != nil { - return err + node = jetStreamServerResp.ServerName + + natsJetStreamMonitoringNodeURL, err = s.getNATSJetStreamMonitoringNodeURL(nodeHostname) + if err != nil { + return err + } + } else { + node = strings.Split(clusterURL, ".")[0] + natsJetStreamMonitoringNodeURL, err = s.getNATSJetStreamMonitoringNodeURLByNode(node) + if err != nil { + return err + } } jetStreamAccountResp, err = s.getNATSJetstreamMonitoringRequest(ctx, natsJetStreamMonitoringNodeURL) @@ -394,6 +423,7 @@ func (s *natsJetStreamScaler) getNATSJetStreamMonitoringNodeURL(nodeHostname str return "", err } + // set the port to the monitoringURL port if exists if port := jsURL.Port(); port != "" { nodeHostname = net.JoinHostPort(nodeHostname, port) } @@ -401,6 +431,15 @@ func (s *natsJetStreamScaler) getNATSJetStreamMonitoringNodeURL(nodeHostname str return fmt.Sprintf("%s://%s%s?%s", jsURL.Scheme, nodeHostname, jsURL.Path, jsURL.RawQuery), nil } +func (s *natsJetStreamScaler) getNATSJetStreamMonitoringNodeURLByNode(node string) (string, error) { + jsURL, err := url.Parse(s.metadata.monitoringURL) + if err != nil { + s.logger.Error(err, "unable to parse monitoring URL to create node URL", "natsServerMonitoringURL", s.metadata.monitoringURL) + return "", err + } + return fmt.Sprintf("%s://%s.%s%s?%s", jsURL.Scheme, node, jsURL.Host, jsURL.Path, jsURL.RawQuery), nil +} + func (s *natsJetStreamScaler) getMaxMsgLag() int64 { consumerName := s.metadata.consumer diff --git a/pkg/scalers/nats_jetstream_scaler_test.go b/pkg/scalers/nats_jetstream_scaler_test.go index 12f67d39acd..d31bedf6cf6 100644 --- a/pkg/scalers/nats_jetstream_scaler_test.go +++ b/pkg/scalers/nats_jetstream_scaler_test.go @@ -217,6 +217,12 @@ var testNATSJetStreamMockResponses = []parseNATSJetStreamMockResponsesTestData{ }, false, false}, } +var testNATSJetStreamServerMockResponses = map[string][]byte{ + "not-leader-1.localhost:8222": []byte(`{"server_name": "not-leader-1", "cluster": {"urls": ["leader.localhost.nats.svc:8222", "not-leader-2.localhost.nats.svc:8222"]}}`), + "not-leader-2.localhost:8222": []byte(`{"server_name": "not-leader-2", "cluster": {"urls": ["leader.localhost.nats.svc:8222", "not-leader-1.localhost.nats.svc:8222"]}}`), + "leader.localhost:8222": []byte(`{"server_name": "leader", "cluster": {"urls": ["not-leader-1.localhost.nats.svc:8222", "not-leader-2.localhost.nats.svc:8222"]}}`), +} + func TestNATSJetStreamIsActive(t *testing.T) { for _, mockResponse := range testNATSJetStreamMockResponses { mockResponseJSON, err := json.Marshal(mockResponse.data) @@ -314,7 +320,7 @@ func natsMockHTTPJetStreamServer(t *testing.T, mockResponseJSON []byte) *httptes // redirect leader.localhost for the clustered test http.DefaultTransport.(*http.Transport).DialContext = func(ctx context.Context, network, addr string) (net.Conn, error) { - if addr == "leader.localhost:8222" { + if strings.HasSuffix(addr, ".localhost:8222") { addr = "127.0.0.1:8222" } return dialer.DialContext(ctx, network, addr) @@ -324,13 +330,27 @@ func natsMockHTTPJetStreamServer(t *testing.T, mockResponseJSON []byte) *httptes switch r.URL.Path { case "/jsz": w.WriteHeader(http.StatusOK) + // if requesting from specific node and not a leader, which indicate clustered test + // send empty response + if strings.HasSuffix(r.Host, ".localhost:8222") && r.Host != "leader.localhost:8222" { + mockResponseJSON, _ = json.Marshal(&jetStreamEndpointResponse{}) + } _, err := w.Write(mockResponseJSON) if err != nil { t.Fatal("Could not write to the http server connection:", err) } case "/varz": w.WriteHeader(http.StatusOK) - _, err := w.Write([]byte(`{"cluster": {"urls": ["leader.localhost.nats.svc:8222"]}}`)) + res, ok := testNATSJetStreamServerMockResponses[r.Host] + if !ok { + // if given host is not a specific node (e.g. loadbalancer) + // get response from random node + for _, v := range testNATSJetStreamServerMockResponses { + res = v + break + } + } + _, err := w.Write(res) if err != nil { t.Fatal("Could not write to the http server connection:", err) } diff --git a/tests/scalers/nats_jetstream/nats_jetstream_cluster/nats_jetstream_cluster_test.go b/tests/scalers/nats_jetstream/nats_jetstream_cluster/nats_jetstream_cluster_test.go index 23e05124d6d..d390bec549a 100644 --- a/tests/scalers/nats_jetstream/nats_jetstream_cluster/nats_jetstream_cluster_test.go +++ b/tests/scalers/nats_jetstream/nats_jetstream_cluster/nats_jetstream_cluster_test.go @@ -36,11 +36,19 @@ var ( ) func TestNATSJetStreamScalerClusterWithStreamReplicas(t *testing.T) { + testNATSJetStreamScalerClusterWithStreamReplicas(t, false) +} + +func TestNATSJetStreamScalerClusterWithStreamReplicasWithNoAdvertise(t *testing.T) { + testNATSJetStreamScalerClusterWithStreamReplicas(t, true) +} + +func testNATSJetStreamScalerClusterWithStreamReplicas(t *testing.T, noAdvertise bool) { // Create k8s resources. kc := GetKubernetesClient(t) // Deploy NATS server. - installClusterWithJetStream(t, kc) + installClusterWithJetStream(t, kc, noAdvertise) assert.True(t, WaitForStatefulsetReplicaReadyCount(t, kc, nats.NatsJetStreamName, natsNamespace, natsServerReplicas, 60, 3), "replica count should be %d after 3 minutes", minReplicaCount) @@ -129,18 +137,19 @@ func removeStreamAndConsumer(t *testing.T, streamReplicas int, stream, namespace } // installClusterWithJetStream install the nats helm chart with clustered jetstream enabled -func installClusterWithJetStream(t *testing.T, kc *k8s.Clientset) { +func installClusterWithJetStream(t *testing.T, kc *k8s.Clientset, noAdvertise bool) { CreateNamespace(t, kc, natsNamespace) _, err := ExecuteCommand(fmt.Sprintf("helm repo add %s %s", nats.NatsJetStreamName, natsHelmRepo)) assert.NoErrorf(t, err, "cannot execute command - %s", err) _, err = ExecuteCommand("helm repo update") assert.NoErrorf(t, err, "cannot execute command - %s", err) - _, err = ExecuteCommand(fmt.Sprintf(`helm upgrade --install --version %s --set %s --set %s --set %s --set %s --wait --namespace %s %s nats/nats`, + _, err = ExecuteCommand(fmt.Sprintf(`helm upgrade --install --version %s --set %s --set %s --set %s --set %s --set %s --wait --namespace %s %s nats/nats`, nats.NatsJetStreamChartVersion, "nats.jetstream.enabled=true", "nats.jetstream.fileStorage.enabled=false", "cluster.enabled=true", fmt.Sprintf("replicas=%d", natsServerReplicas), + fmt.Sprintf("cluster.noAdvertise=%t", noAdvertise), natsNamespace, nats.NatsJetStreamName)) assert.NoErrorf(t, err, "cannot execute command - %s", err)