Skip to content

Commit

Permalink
find the stream leader the old way if nodes are not advertised
Browse files Browse the repository at this point in the history
Signed-off-by: Muhammad Fadhlika <git@fadhlika.com>
  • Loading branch information
mfadhlika committed May 9, 2023
1 parent 8def751 commit 40a7914
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 23 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
- **AWS SQS Scaler**: Respect `scaleOnInFlight` value ([#4276](https://github.com/kedacore/keda/issues/4276))
- **Azure Pipelines**: Fix for disallowing `$top` on query when using `meta.parentID` method ([#4397](https://github.com/kedacore/keda/issues/4397))
- **Azure Pipelines**: Respect all required demands ([#4404](https://github.com/kedacore/keda/issues/4404))
- **NATS Jetstream Scaler**: Fix compatibility if node is not advertised ([#4524](https://github.com/kedacore/keda/issues/4524))
- **Prometheus Metrics**: Create e2e tests for all exposed Prometheus metrics ([#4127](https://github.com/kedacore/keda/issues/4127))
- **Grafana Dashboard**: Fix HPA metrics panel to use range instead of instant ([#4513](https://github.com/kedacore/keda/pull/4513))

Expand Down
79 changes: 59 additions & 20 deletions pkg/scalers/nats_jetstream_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,13 @@ type jetStreamEndpointResponse struct {
}

type jetStreamServerEndpointResponse struct {
ConnectUrls []string `json:"connect_urls"`
ServerName string `json:"server_name"`
ConnectUrls []string `json:"connect_urls"`
Cluster jetStreamCluster `json:"cluster"`
ServerName string `json:"server_name"`
}

// jetStreamCluster maps the `cluster` object embedded in the server info
// response (the Cluster field of jetStreamServerEndpointResponse).
type jetStreamCluster struct {
// HostUrls is decoded from the `urls` JSON key. The scaler falls back to
// these entries when the response carries no `connect_urls`.
HostUrls []string `json:"urls"`
}

type accountDetail struct {
Expand Down Expand Up @@ -235,27 +240,51 @@ func (s *natsJetStreamScaler) getNATSJetstreamMonitoringData(ctx context.Context
return err
}

for _, clusterURL := range jetStreamServerResp.ConnectUrls {
// get hostname from the url
// nats-1.nats.svc.cluster.local:4221 -> nats-1.nats.svc.cluster.local, or
// 172.0.1.3:4221 -> 172.0.1.3
nodeHostname := strings.Split(clusterURL, ":")[0]
natsJetStreamMonitoringServerURL, err := s.getNATSJetStreamMonitoringServerURL(nodeHostname)
if err != nil {
return err
}
isNodeAdvertised := true
clusterUrls := jetStreamServerResp.ConnectUrls
if len(clusterUrls) == 0 {
isNodeAdvertised = false
clusterUrls = append(clusterUrls, jetStreamServerResp.Cluster.HostUrls...)
// append current node's `server_name` to check if it is a leader
// even though `server_name` is not an url, it will be split by first . (dot)
// to get the node anyway
// clusterUrls = append(clusterUrls, jetStreamServerResp.ServerName)
}

// Query server info to get its name
jetStreamServerResp, err := s.getNATSJetstreamServerInfo(ctx, natsJetStreamMonitoringServerURL)
if err != nil {
return err
}
for _, clusterURL := range clusterUrls {
var (
node string
natsJetStreamMonitoringNodeURL string
)

if isNodeAdvertised {
// get hostname from the url
// nats-1.nats.svc.cluster.local:4221 -> nats-1.nats.svc.cluster.local, or
// 172.0.1.3:4221 -> 172.0.1.3
nodeHostname := strings.Split(clusterURL, ":")[0]
natsJetStreamMonitoringServerURL, err := s.getNATSJetStreamMonitoringServerURL(nodeHostname)
if err != nil {
return err
}

node := jetStreamServerResp.ServerName
// Query server info to get its name
jetStreamServerResp, err := s.getNATSJetstreamServerInfo(ctx, natsJetStreamMonitoringServerURL)
if err != nil {
return err
}

natsJetStreamMonitoringNodeURL, err := s.getNATSJetStreamMonitoringNodeURL(nodeHostname)
if err != nil {
return err
node = jetStreamServerResp.ServerName

natsJetStreamMonitoringNodeURL, err = s.getNATSJetStreamMonitoringNodeURL(nodeHostname)
if err != nil {
return err
}
} else {
node = strings.Split(clusterURL, ".")[0]
natsJetStreamMonitoringNodeURL, err = s.getNATSJetStreamMonitoringNodeURLByNode(node)
if err != nil {
return err
}
}

jetStreamAccountResp, err = s.getNATSJetstreamMonitoringRequest(ctx, natsJetStreamMonitoringNodeURL)
Expand Down Expand Up @@ -394,13 +423,23 @@ func (s *natsJetStreamScaler) getNATSJetStreamMonitoringNodeURL(nodeHostname str
return "", err
}

// set the port to the monitoringURL port if exists
if port := jsURL.Port(); port != "" {
nodeHostname = net.JoinHostPort(nodeHostname, port)
}

return fmt.Sprintf("%s://%s%s?%s", jsURL.Scheme, nodeHostname, jsURL.Path, jsURL.RawQuery), nil
}

// getNATSJetStreamMonitoringNodeURLByNode builds the monitoring URL for a
// single node by prepending the node name, followed by a dot, to the host of
// the configured monitoring URL:
//
//	<scheme>://<node>.<host><path>?<query>
//
// If the configured monitoring URL cannot be parsed, the error is logged and
// returned together with an empty string.
func (s *natsJetStreamScaler) getNATSJetStreamMonitoringNodeURLByNode(node string) (string, error) {
	rawURL := s.metadata.monitoringURL

	parsed, parseErr := url.Parse(rawURL)
	if parseErr != nil {
		s.logger.Error(parseErr, "unable to parse monitoring URL to create node URL", "natsServerMonitoringURL", rawURL)
		return "", parseErr
	}

	// The "?" separator is always emitted, even when the query is empty.
	nodeHost := node + "." + parsed.Host
	nodeURL := parsed.Scheme + "://" + nodeHost + parsed.Path + "?" + parsed.RawQuery
	return nodeURL, nil
}

func (s *natsJetStreamScaler) getMaxMsgLag() int64 {
consumerName := s.metadata.consumer

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,19 @@ var (
)

// TestNATSJetStreamScalerClusterWithStreamReplicas runs the shared clustered
// JetStream scenario with the noAdvertise argument set to false.
func TestNATSJetStreamScalerClusterWithStreamReplicas(t *testing.T) {
	const noAdvertise = false
	testNATSJetStreamScalerClusterWithStreamReplicas(t, noAdvertise)
}

// TestNATSJetStreamScalerClusterWithStreamReplicasWithNoAdvertise runs the
// shared clustered JetStream scenario with the noAdvertise argument set to
// true.
func TestNATSJetStreamScalerClusterWithStreamReplicasWithNoAdvertise(t *testing.T) {
	const noAdvertise = true
	testNATSJetStreamScalerClusterWithStreamReplicas(t, noAdvertise)
}

func testNATSJetStreamScalerClusterWithStreamReplicas(t *testing.T, noAdvertise bool) {
// Create k8s resources.
kc := GetKubernetesClient(t)

// Deploy NATS server.
installClusterWithJetStream(t, kc)
installClusterWithJetStream(t, kc, noAdvertise)
assert.True(t, WaitForStatefulsetReplicaReadyCount(t, kc, nats.NatsJetStreamName, natsNamespace, natsServerReplicas, 60, 3),
"replica count should be %d after 3 minutes", minReplicaCount)

Expand Down Expand Up @@ -129,18 +137,19 @@ func removeStreamAndConsumer(t *testing.T, streamReplicas int, stream, namespace
}

// installClusterWithJetStream install the nats helm chart with clustered jetstream enabled
func installClusterWithJetStream(t *testing.T, kc *k8s.Clientset) {
func installClusterWithJetStream(t *testing.T, kc *k8s.Clientset, noAdvertise bool) {
CreateNamespace(t, kc, natsNamespace)
_, err := ExecuteCommand(fmt.Sprintf("helm repo add %s %s", nats.NatsJetStreamName, natsHelmRepo))
assert.NoErrorf(t, err, "cannot execute command - %s", err)
_, err = ExecuteCommand("helm repo update")
assert.NoErrorf(t, err, "cannot execute command - %s", err)
_, err = ExecuteCommand(fmt.Sprintf(`helm upgrade --install --version %s --set %s --set %s --set %s --set %s --wait --namespace %s %s nats/nats`,
_, err = ExecuteCommand(fmt.Sprintf(`helm upgrade --install --version %s --set %s --set %s --set %s --set %s --set %s --wait --namespace %s %s nats/nats`,
nats.NatsJetStreamChartVersion,
"nats.jetstream.enabled=true",
"nats.jetstream.fileStorage.enabled=false",
"cluster.enabled=true",
fmt.Sprintf("replicas=%d", natsServerReplicas),
fmt.Sprintf("cluster.noAdvertise=%t", noAdvertise),
natsNamespace,
nats.NatsJetStreamName))
assert.NoErrorf(t, err, "cannot execute command - %s", err)
Expand Down

0 comments on commit 40a7914

Please sign in to comment.