Skip to content

Commit

Permalink
find the stream leader the old way if nodes are not advertised (kedac…
Browse files Browse the repository at this point in the history
  • Loading branch information
mfadhlika authored and tobotg committed May 17, 2023
1 parent 62a3a29 commit efc8b83
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 25 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
- **AWS SQS Scaler**: Respect `scaleOnInFlight` value ([#4276](https://github.com/kedacore/keda/issue/4276))
- **Azure Pipelines**: Fix for disallowing `$top` on query when using `meta.parentID` method ([#4397])
- **Azure Pipelines**: Respect all required demands ([#4404](https://github.com/kedacore/keda/issues/4404))
- **NATS Jetstream Scaler**: Fix compatibility if node is not advertised ([#4524](https://github.com/kedacore/keda/issues/4524))
- **Prometheus Metrics**: Create e2e tests for all exposed Prometheus metrics ([#4127](https://github.com/kedacore/keda/issues/4127))
- **Grafana Dashboard**: Fix HPA metrics panel to use range instead of instant ([#4513](https://github.com/kedacore/keda/pull/4513))

Expand Down
79 changes: 59 additions & 20 deletions pkg/scalers/nats_jetstream_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,13 @@ type jetStreamEndpointResponse struct {
}

type jetStreamServerEndpointResponse struct {
ConnectUrls []string `json:"connect_urls"`
ServerName string `json:"server_name"`
ConnectUrls []string `json:"connect_urls"`
Cluster jetStreamCluster `json:"cluster"`
ServerName string `json:"server_name"`
}

type jetStreamCluster struct {
HostUrls []string `json:"urls"`
}

type accountDetail struct {
Expand Down Expand Up @@ -235,27 +240,51 @@ func (s *natsJetStreamScaler) getNATSJetstreamMonitoringData(ctx context.Context
return err
}

for _, clusterURL := range jetStreamServerResp.ConnectUrls {
// get hostname from the url
// nats-1.nats.svc.cluster.local:4221 -> nats-1.nats.svc.cluster.local, or
// 172.0.1.3:4221 -> 172.0.1.3
nodeHostname := strings.Split(clusterURL, ":")[0]
natsJetStreamMonitoringServerURL, err := s.getNATSJetStreamMonitoringServerURL(nodeHostname)
if err != nil {
return err
}
isNodeAdvertised := true
clusterUrls := jetStreamServerResp.ConnectUrls
if len(clusterUrls) == 0 {
isNodeAdvertised = false
// append current node's `server_name` to check if it is a leader
// even though `server_name` is not an url, it will be split by first . (dot)
// to get the node's name anyway
clusterUrls = append(clusterUrls, jetStreamServerResp.ServerName)
clusterUrls = append(clusterUrls, jetStreamServerResp.Cluster.HostUrls...)
}

// Query server info to get its name
jetStreamServerResp, err := s.getNATSJetstreamServerInfo(ctx, natsJetStreamMonitoringServerURL)
if err != nil {
return err
}
for _, clusterURL := range clusterUrls {
var (
node string
natsJetStreamMonitoringNodeURL string
)

if isNodeAdvertised {
// get hostname from the url
// nats-1.nats.svc.cluster.local:4221 -> nats-1.nats.svc.cluster.local, or
// 172.0.1.3:4221 -> 172.0.1.3
nodeHostname := strings.Split(clusterURL, ":")[0]
natsJetStreamMonitoringServerURL, err := s.getNATSJetStreamMonitoringServerURL(nodeHostname)
if err != nil {
return err
}

node := jetStreamServerResp.ServerName
// Query server info to get its name
jetStreamServerResp, err := s.getNATSJetstreamServerInfo(ctx, natsJetStreamMonitoringServerURL)
if err != nil {
return err
}

natsJetStreamMonitoringNodeURL, err := s.getNATSJetStreamMonitoringNodeURL(nodeHostname)
if err != nil {
return err
node = jetStreamServerResp.ServerName

natsJetStreamMonitoringNodeURL, err = s.getNATSJetStreamMonitoringNodeURL(nodeHostname)
if err != nil {
return err
}
} else {
node = strings.Split(clusterURL, ".")[0]
natsJetStreamMonitoringNodeURL, err = s.getNATSJetStreamMonitoringNodeURLByNode(node)
if err != nil {
return err
}
}

jetStreamAccountResp, err = s.getNATSJetstreamMonitoringRequest(ctx, natsJetStreamMonitoringNodeURL)
Expand Down Expand Up @@ -394,13 +423,23 @@ func (s *natsJetStreamScaler) getNATSJetStreamMonitoringNodeURL(nodeHostname str
return "", err
}

// set the port to the monitoringURL port if exists
if port := jsURL.Port(); port != "" {
nodeHostname = net.JoinHostPort(nodeHostname, port)
}

return fmt.Sprintf("%s://%s%s?%s", jsURL.Scheme, nodeHostname, jsURL.Path, jsURL.RawQuery), nil
}

func (s *natsJetStreamScaler) getNATSJetStreamMonitoringNodeURLByNode(node string) (string, error) {
jsURL, err := url.Parse(s.metadata.monitoringURL)
if err != nil {
s.logger.Error(err, "unable to parse monitoring URL to create node URL", "natsServerMonitoringURL", s.metadata.monitoringURL)
return "", err
}
return fmt.Sprintf("%s://%s.%s%s?%s", jsURL.Scheme, node, jsURL.Host, jsURL.Path, jsURL.RawQuery), nil
}

func (s *natsJetStreamScaler) getMaxMsgLag() int64 {
consumerName := s.metadata.consumer

Expand Down
24 changes: 22 additions & 2 deletions pkg/scalers/nats_jetstream_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,12 @@ var testNATSJetStreamMockResponses = []parseNATSJetStreamMockResponsesTestData{
}, false, false},
}

var testNATSJetStreamServerMockResponses = map[string][]byte{
"not-leader-1.localhost:8222": []byte(`{"server_name": "not-leader-1", "cluster": {"urls": ["leader.localhost.nats.svc:8222", "not-leader-2.localhost.nats.svc:8222"]}}`),
"not-leader-2.localhost:8222": []byte(`{"server_name": "not-leader-2", "cluster": {"urls": ["leader.localhost.nats.svc:8222", "not-leader-1.localhost.nats.svc:8222"]}}`),
"leader.localhost:8222": []byte(`{"server_name": "leader", "cluster": {"urls": ["not-leader-1.localhost.nats.svc:8222", "not-leader-2.localhost.nats.svc:8222"]}}`),
}

func TestNATSJetStreamIsActive(t *testing.T) {
for _, mockResponse := range testNATSJetStreamMockResponses {
mockResponseJSON, err := json.Marshal(mockResponse.data)
Expand Down Expand Up @@ -314,7 +320,7 @@ func natsMockHTTPJetStreamServer(t *testing.T, mockResponseJSON []byte) *httptes

// redirect leader.localhost for the clustered test
http.DefaultTransport.(*http.Transport).DialContext = func(ctx context.Context, network, addr string) (net.Conn, error) {
if addr == "leader.localhost:8222" {
if strings.HasSuffix(addr, ".localhost:8222") {
addr = "127.0.0.1:8222"
}
return dialer.DialContext(ctx, network, addr)
Expand All @@ -324,13 +330,27 @@ func natsMockHTTPJetStreamServer(t *testing.T, mockResponseJSON []byte) *httptes
switch r.URL.Path {
case "/jsz":
w.WriteHeader(http.StatusOK)
// if requesting from specific node and not a leader, which indicate clustered test
// send empty response
if strings.HasSuffix(r.Host, ".localhost:8222") && r.Host != "leader.localhost:8222" {
mockResponseJSON, _ = json.Marshal(&jetStreamEndpointResponse{})
}
_, err := w.Write(mockResponseJSON)
if err != nil {
t.Fatal("Could not write to the http server connection:", err)
}
case "/varz":
w.WriteHeader(http.StatusOK)
_, err := w.Write([]byte(`{"cluster": {"urls": ["leader.localhost.nats.svc:8222"]}}`))
res, ok := testNATSJetStreamServerMockResponses[r.Host]
if !ok {
// if given host is not a specific node (e.g. loadbalancer)
// get response from random node
for _, v := range testNATSJetStreamServerMockResponses {
res = v
break
}
}
_, err := w.Write(res)
if err != nil {
t.Fatal("Could not write to the http server connection:", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,19 @@ var (
)

func TestNATSJetStreamScalerClusterWithStreamReplicas(t *testing.T) {
testNATSJetStreamScalerClusterWithStreamReplicas(t, false)
}

func TestNATSJetStreamScalerClusterWithStreamReplicasWithNoAdvertise(t *testing.T) {
testNATSJetStreamScalerClusterWithStreamReplicas(t, true)
}

func testNATSJetStreamScalerClusterWithStreamReplicas(t *testing.T, noAdvertise bool) {
// Create k8s resources.
kc := GetKubernetesClient(t)

// Deploy NATS server.
installClusterWithJetStream(t, kc)
installClusterWithJetStream(t, kc, noAdvertise)
assert.True(t, WaitForStatefulsetReplicaReadyCount(t, kc, nats.NatsJetStreamName, natsNamespace, natsServerReplicas, 60, 3),
"replica count should be %d after 3 minutes", minReplicaCount)

Expand Down Expand Up @@ -129,18 +137,19 @@ func removeStreamAndConsumer(t *testing.T, streamReplicas int, stream, namespace
}

// installClusterWithJetStream install the nats helm chart with clustered jetstream enabled
func installClusterWithJetStream(t *testing.T, kc *k8s.Clientset) {
func installClusterWithJetStream(t *testing.T, kc *k8s.Clientset, noAdvertise bool) {
CreateNamespace(t, kc, natsNamespace)
_, err := ExecuteCommand(fmt.Sprintf("helm repo add %s %s", nats.NatsJetStreamName, natsHelmRepo))
assert.NoErrorf(t, err, "cannot execute command - %s", err)
_, err = ExecuteCommand("helm repo update")
assert.NoErrorf(t, err, "cannot execute command - %s", err)
_, err = ExecuteCommand(fmt.Sprintf(`helm upgrade --install --version %s --set %s --set %s --set %s --set %s --wait --namespace %s %s nats/nats`,
_, err = ExecuteCommand(fmt.Sprintf(`helm upgrade --install --version %s --set %s --set %s --set %s --set %s --set %s --wait --namespace %s %s nats/nats`,
nats.NatsJetStreamChartVersion,
"nats.jetstream.enabled=true",
"nats.jetstream.fileStorage.enabled=false",
"cluster.enabled=true",
fmt.Sprintf("replicas=%d", natsServerReplicas),
fmt.Sprintf("cluster.noAdvertise=%t", noAdvertise),
natsNamespace,
nats.NatsJetStreamName))
assert.NoErrorf(t, err, "cannot execute command - %s", err)
Expand Down

0 comments on commit efc8b83

Please sign in to comment.