From 29c4a0241c4e771a10f78794ce75a8cfb59f2906 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Melisa=20Tanr=C4=B1verdi?= Date: Mon, 3 Jan 2022 13:06:10 +0300 Subject: [PATCH] Add ActiveMQ scaler (#2305) Signed-off-by: melisatanrverdi Signed-off-by: alex60217101990 --- CHANGELOG.md | 2 + pkg/scalers/activemq_scaler.go | 278 ++++++++++++++++ pkg/scalers/activemq_scaler_test.go | 273 ++++++++++++++++ pkg/scaling/scale_handler.go | 2 + tests/scalers/activemq.test.ts | 486 ++++++++++++++++++++++++++++ 5 files changed, 1041 insertions(+) create mode 100644 pkg/scalers/activemq_scaler.go create mode 100644 pkg/scalers/activemq_scaler_test.go create mode 100644 tests/scalers/activemq.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index c4272606554..be92e9c3c48 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,8 @@ ### New +- Add ActiveMQ Scaler ([#2305](https://github.com/kedacore/keda/pull/2305)) + - TODO ([#XXX](https://github.com/kedacore/keda/pull/XXX)) - Add New PredictKube Scaler ([#2418](https://github.com/kedacore/keda/pull/2418)) diff --git a/pkg/scalers/activemq_scaler.go b/pkg/scalers/activemq_scaler.go new file mode 100644 index 00000000000..5e854ad26f2 --- /dev/null +++ b/pkg/scalers/activemq_scaler.go @@ -0,0 +1,278 @@ +package scalers + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "net/http" + "net/url" + "strconv" + "strings" + "text/template" + + v2beta2 "k8s.io/api/autoscaling/v2beta2" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/metrics/pkg/apis/external_metrics" + logf "sigs.k8s.io/controller-runtime/pkg/log" + + kedautil "github.com/kedacore/keda/v2/pkg/util" +) + +type activeMQScaler struct { + metadata *activeMQMetadata + httpClient *http.Client +} + +type activeMQMetadata struct { + managementEndpoint string + destinationName string + brokerName string + username string + password string + restAPITemplate string + targetQueueSize int + metricName string + scalerIndex int +} + +type activeMQMonitoring struct { + MsgCount int `json:"value"` + Status int `json:"status"` + Timestamp int64 `json:"timestamp"` +} + +const ( + defaultTargetQueueSize = 10 + defaultActiveMQRestAPITemplate = "http://{{.ManagementEndpoint}}/api/jolokia/read/org.apache.activemq:type=Broker,brokerName={{.BrokerName}},destinationType=Queue,destinationName={{.DestinationName}}/QueueSize" +) + +var activeMQLog = logf.Log.WithName("activeMQ_scaler") + +// NewActiveMQScaler creates a new activeMQ Scaler +func NewActiveMQScaler(config *ScalerConfig) (Scaler, error) { + meta, err := parseActiveMQMetadata(config) + if err != nil { + return nil, fmt.Errorf("error parsing ActiveMQ metadata: %s", err) + } + httpClient := kedautil.CreateHTTPClient(config.GlobalHTTPTimeout, false) + + return &activeMQScaler{ + metadata: meta, + httpClient: httpClient, + }, nil +} + +func parseActiveMQMetadata(config *ScalerConfig) (*activeMQMetadata, error) { + meta := activeMQMetadata{} + + if val, ok := config.TriggerMetadata["restAPITemplate"]; ok && val != "" { + meta.restAPITemplate = config.TriggerMetadata["restAPITemplate"] + var err error + if meta, err = getRestAPIParameters(meta); err != nil { + return nil, fmt.Errorf("can't parse restAPITemplate : %s ", err) + } + } else { + meta.restAPITemplate = defaultActiveMQRestAPITemplate + if config.TriggerMetadata["managementEndpoint"] == "" { + return nil, errors.New("no management endpoint given") + } + meta.managementEndpoint = config.TriggerMetadata["managementEndpoint"] + + if config.TriggerMetadata["destinationName"] == "" { + return nil, errors.New("no destination name given") + } + meta.destinationName = config.TriggerMetadata["destinationName"] + + if config.TriggerMetadata["brokerName"] == "" { + return nil, errors.New("no broker name given") + } + meta.brokerName = config.TriggerMetadata["brokerName"] + } + + if val, ok := config.TriggerMetadata["targetQueueSize"]; ok { + queueSize, err := strconv.Atoi(val) + if err != nil { + return nil, fmt.Errorf("invalid targetQueueSize - must be an integer") + } + + meta.targetQueueSize = queueSize + } else { + meta.targetQueueSize = defaultTargetQueueSize + } + + if val, ok := config.AuthParams["username"]; ok && val != "" { + meta.username = val + } else if val, ok := config.TriggerMetadata["username"]; ok && val != "" { + username := val + + if val, ok := config.ResolvedEnv[username]; ok && val != "" { + meta.username = val + } else { + meta.username = username + } + } + + if meta.username == "" { + return nil, fmt.Errorf("username cannot be empty") + } + + if val, ok := config.AuthParams["password"]; ok && val != "" { + meta.password = val + } else if val, ok := config.TriggerMetadata["password"]; ok && val != "" { + password := val + + if val, ok := config.ResolvedEnv[password]; ok && val != "" { + meta.password = val + } else { + meta.password = password + } + } + + if meta.password == "" { + return nil, fmt.Errorf("password cannot be empty") + } + + meta.metricName = GenerateMetricNameWithIndex(config.ScalerIndex, kedautil.NormalizeString(fmt.Sprintf("activemq-%s", meta.destinationName))) + + meta.scalerIndex = config.ScalerIndex + + return &meta, nil +} + +func (s *activeMQScaler) IsActive(ctx context.Context) (bool, error) { + queueSize, err := s.getQueueMessageCount(ctx) + if err != nil { + activeMQLog.Error(err, "Unable to access activeMQ management endpoint", "managementEndpoint", s.metadata.managementEndpoint) + return false, err + } + + return queueSize > 0, nil +} + +// getRestAPIParameters parse restAPITemplate to provide managementEndpoint, brokerName, destinationName +func getRestAPIParameters(meta activeMQMetadata) (activeMQMetadata, error) { + u, err := url.ParseRequestURI(meta.restAPITemplate) + if err != nil { + return meta, fmt.Errorf("unable to parse ActiveMQ restAPITemplate: %s", err) + } + + meta.managementEndpoint = u.Host + splitURL := strings.Split(strings.Split(u.Path, ":")[1], "/")[0] // This returns : type=Broker,brokerName=<>,destinationType=Queue,destinationName=<> + replacer := strings.NewReplacer(",", "&") + v, err := url.ParseQuery(replacer.Replace(splitURL)) // This returns a map with key: string types and element type [] string. : map[brokerName:[<>] destinationName:[<>] destinationType:[Queue] type:[Broker]] + if err != nil { + return meta, fmt.Errorf("unable to parse ActiveMQ restAPITemplate: %s", err) + } + + if len(v["destinationName"][0]) == 0 { + return meta, errors.New("no destinationName is given") + } + meta.destinationName = v["destinationName"][0] + + if len(v["brokerName"][0]) == 0 { + return meta, fmt.Errorf("no brokerName given: %s", meta.restAPITemplate) + } + meta.brokerName = v["brokerName"][0] + + return meta, nil +} + +func (s *activeMQScaler) getMonitoringEndpoint() (string, error) { + var buf bytes.Buffer + endpoint := map[string]string{ + "ManagementEndpoint": s.metadata.managementEndpoint, + "BrokerName": s.metadata.brokerName, + "DestinationName": s.metadata.destinationName, + } + template, err := template.New("monitoring_endpoint").Parse(defaultActiveMQRestAPITemplate) + if err != nil { + return "", fmt.Errorf("error parsing template: %s", err) + } + err = template.Execute(&buf, endpoint) + if err != nil { + return "", fmt.Errorf("error executing template: %s", err) + } + monitoringEndpoint := buf.String() + return monitoringEndpoint, nil +} + +func (s *activeMQScaler) getQueueMessageCount(ctx context.Context) (int, error) { + var monitoringInfo *activeMQMonitoring + var queueMessageCount int + + client := s.httpClient + url, err := s.getMonitoringEndpoint() + if err != nil { + return -1, err + } + + req, err := http.NewRequestWithContext(ctx, "GET", url, nil) + if err != nil { + return -1, err + } + + // Add HTTP Auth and Headers + req.SetBasicAuth(s.metadata.username, s.metadata.password) + req.Header.Set("Content-Type", "application/json") + + resp, err := client.Do(req) + if err != nil { + return -1, err + } + + defer resp.Body.Close() + + if err := json.NewDecoder(resp.Body).Decode(&monitoringInfo); err != nil { + return -1, err + } + if resp.StatusCode == 200 && monitoringInfo.Status == 200 { + queueMessageCount = monitoringInfo.MsgCount + } else { + return -1, fmt.Errorf("ActiveMQ management endpoint response error code : %d %d", resp.StatusCode, monitoringInfo.Status) + } + + activeMQLog.V(1).Info(fmt.Sprintf("ActiveMQ scaler: Providing metrics based on current queue size %d queue size limit %d", queueMessageCount, s.metadata.targetQueueSize)) + + return queueMessageCount, nil +} + +// GetMetricSpecForScaling returns the MetricSpec for the Horizontal Pod Autoscaler +func (s *activeMQScaler) GetMetricSpecForScaling(context.Context) []v2beta2.MetricSpec { + targetMetricValue := resource.NewQuantity(int64(s.metadata.targetQueueSize), resource.DecimalSI) + externalMetric := &v2beta2.ExternalMetricSource{ + Metric: v2beta2.MetricIdentifier{ + Name: s.metadata.metricName, + }, + Target: v2beta2.MetricTarget{ + Type: v2beta2.AverageValueMetricType, + AverageValue: targetMetricValue, + }, + } + metricSpec := v2beta2.MetricSpec{ + External: externalMetric, Type: externalMetricType, + } + return []v2beta2.MetricSpec{metricSpec} +} + +func (s *activeMQScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) { + queueSize, err := s.getQueueMessageCount(ctx) + if err != nil { + return nil, fmt.Errorf("error inspecting ActiveMQ queue size: %s", err) + } + + metric := external_metrics.ExternalMetricValue{ + MetricName: metricName, + Value: *resource.NewQuantity(int64(queueSize), resource.DecimalSI), + Timestamp: metav1.Now(), + } + + return []external_metrics.ExternalMetricValue{metric}, nil +} + +func (s *activeMQScaler) Close(context.Context) error { + return nil +} diff --git a/pkg/scalers/activemq_scaler_test.go b/pkg/scalers/activemq_scaler_test.go new file mode 100644 index 00000000000..46cbabbddee --- /dev/null +++ b/pkg/scalers/activemq_scaler_test.go @@ -0,0 +1,273 @@ +package scalers + +import ( + "context" + "fmt" + "net/http" + "testing" +) + +const ( + testInvalidRestAPITemplate = "testInvalidRestAPITemplate" +) + +type parseActiveMQMetadataTestData struct { + name string + metadata map[string]string + authParams map[string]string + isError bool +} + +type activeMQMetricIdentifier struct { + metadataTestData *parseActiveMQMetadataTestData + scalerIndex int + name string +} + +// Setting metric identifier mock name +var activeMQMetricIdentifiers = []activeMQMetricIdentifier{ + {&testActiveMQMetadata[1], 0, "s0-activemq-testQueue"}, + {&testActiveMQMetadata[9], 1, "s1-activemq-testQueue"}, +} + +var testActiveMQMetadata = []parseActiveMQMetadataTestData{ + { + name: "nothing passed", + metadata: map[string]string{}, + authParams: map[string]string{}, + isError: true, + }, + { + name: "properly formed metadata", + metadata: map[string]string{ + "managementEndpoint": "localhost:8161", + "destinationName": "testQueue", + "brokerName": "localhost", + "targetQueueSize": "10", + }, + authParams: map[string]string{ + "username": "testUsername", + "password": "pass123", + }, + isError: false, + }, + { + name: "no metricName passed, metricName is generated from destinationName", + metadata: map[string]string{ + "managementEndpoint": "localhost:8161", + "destinationName": "testQueue", + "brokerName": "localhost", + "targetQueueSize": "10", + }, + authParams: map[string]string{ + "username": "testUsername", + "password": "pass123", + }, + isError: false, + }, + { + name: "Invalid targetQueueSize using a string", + metadata: map[string]string{ + "managementEndpoint": "localhost:8161", + "destinationName": "testQueue", + "brokerName": "localhost", + "targetQueueSize": "AA", + "metricName": "testMetricName", + }, + authParams: map[string]string{ + "username": "testUsername", + "password": "pass123", + }, + isError: true, + }, + { + name: "missing management endpoint should fail", + metadata: map[string]string{ + "destinationName": "testQueue", + "brokerName": "localhost", + "targetQueueSize": "10", + "metricName": "testMetricName", + }, + authParams: map[string]string{ + "username": "testUsername", + "password": "pass123", + }, + isError: true, + }, + { + name: "missing destination name, should fail", + metadata: map[string]string{ + "managementEndpoint": "localhost:8161", + "brokerName": "localhost", + "targetQueueSize": "10", + "metricName": "testMetricName", + }, + authParams: map[string]string{ + "username": "testUsername", + "password": "pass123", + }, + isError: true, + }, + { + name: "missing broker name, should fail", + metadata: map[string]string{ + "managementEndpoint": "localhost:8161", + "destinationName": "testQueue", + "targetQueueSize": "10", + "metricName": "testMetricName", + }, + authParams: map[string]string{ + "username": "testUsername", + "password": "pass123", + }, + isError: true, + }, + { + name: "missing username, should fail", + metadata: map[string]string{ + "managementEndpoint": "localhost:8161", + "destinationName": "testQueue", + "brokerName": "localhost", + "targetQueueSize": "10", + "metricName": "testMetricName", + }, + authParams: map[string]string{ + "password": "pass123", + }, + isError: true, + }, + { + name: "missing password, should fail", + metadata: map[string]string{ + "managementEndpoint": "localhost:8161", + "destinationName": "testQueue", + "brokerName": "localhost", + "targetQueueSize": "10", + "metricName": "testMetricName", + }, + authParams: map[string]string{ + "username": "testUsername", + }, + isError: true, + }, + { + name: "properly formed metadata with restAPITemplate", + metadata: map[string]string{ + "restAPITemplate": "http://localhost:8161/api/jolokia/read/org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=testQueue/QueueSize", + "targetQueueSize": "10", + "metricName": "testMetricName", + }, + authParams: map[string]string{ + "username": "testUsername", + "password": "pass123", + }, + isError: false, + }, + { + name: "invalid restAPITemplate, should fail", + metadata: map[string]string{ + "restAPITemplate": testInvalidRestAPITemplate, + "targetQueueSize": "10", + "metricName": "testMetricName", + }, + authParams: map[string]string{ + "username": "testUsername", + "password": "pass123", + }, + isError: true, + }, + { + name: "missing username, should fail", + metadata: map[string]string{ + "restAPITemplate": "http://localhost:8161/api/jolokia/read/org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=testQueue/QueueSize", + "targetQueueSize": "10", + "metricName": "testMetricName", + }, + authParams: map[string]string{ + "password": "pass123", + }, + isError: true, + }, + { + name: "missing password, should fail", + metadata: map[string]string{ + "restAPITemplate": "http://localhost:8161/api/jolokia/read/org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=testQueue/QueueSize", + "targetQueueSize": "10", + "metricName": "testMetricName", + }, + authParams: map[string]string{ + "username": "testUsername", + }, + isError: true, + }, +} + +func TestParseActiveMQMetadata(t *testing.T) { + for _, testData := range testActiveMQMetadata { + t.Run(testData.name, func(t *testing.T) { + metadata, err := parseActiveMQMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: testData.authParams}) + if err != nil && !testData.isError { + t.Error("Expected success but got error", err) + } + if testData.isError && err == nil { + t.Error("Expected error but got success") + } + if metadata != nil && metadata.password != "" && metadata.password != testData.authParams["password"] { + t.Error("Expected password from configuration but found something else: ", metadata.password) + fmt.Println(testData) + } + }) + } +} + +var testDefaultTargetQueueSize = []parseActiveMQMetadataTestData{ + { + name: "properly formed metadata", + metadata: map[string]string{ + "managementEndpoint": "localhost:8161", + "destinationName": "testQueue", + "brokerName": "localhost", + }, + authParams: map[string]string{ + "username": "testUsername", + "password": "pass123", + }, + isError: false, + }, +} + +func TestParseDefaultTargetQueueSize(t *testing.T) { + for _, testData := range testDefaultTargetQueueSize { + t.Run(testData.name, func(t *testing.T) { + metadata, err := parseActiveMQMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: testData.authParams}) + switch { + case err != nil && !testData.isError: + t.Error("Expected success but got error", err) + case testData.isError && err == nil: + t.Error("Expected error but got success") + case metadata.targetQueueSize != defaultTargetQueueSize: + t.Error("Expected default targetQueueSize =", defaultTargetQueueSize, "but got", metadata.targetQueueSize) + } + }) + } +} + +func TestActiveMQGetMetricSpecForScaling(t *testing.T) { + for _, testData := range activeMQMetricIdentifiers { + ctx := context.Background() + metadata, err := parseActiveMQMetadata(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, AuthParams: testData.metadataTestData.authParams, ScalerIndex: testData.scalerIndex}) + if err != nil { + t.Fatal("Could not parse metadata:", err) + } + mockActiveMQScaler := activeMQScaler{ + metadata: metadata, + httpClient: http.DefaultClient, + } + + metricSpec := mockActiveMQScaler.GetMetricSpecForScaling(ctx) + metricName := metricSpec[0].External.Metric.Name + if metricName != testData.name { + t.Errorf("Wrong External metric source name: %s, expected: %s", metricName, testData.name) + } + } +} diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index 7e3776686eb..b906795789c 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -327,6 +327,8 @@ func (h *scaleHandler) buildScalers(ctx context.Context, withTriggers *kedav1alp func buildScaler(ctx context.Context, client client.Client, triggerType string, config *scalers.ScalerConfig) (scalers.Scaler, error) { // TRIGGERS-START switch triggerType { + case "activemq": + return scalers.NewActiveMQScaler(config) case "artemis-queue": return scalers.NewArtemisQueueScaler(config) case "aws-cloudwatch": diff --git a/tests/scalers/activemq.test.ts b/tests/scalers/activemq.test.ts new file mode 100644 index 00000000000..a0a4391ec1a --- /dev/null +++ b/tests/scalers/activemq.test.ts @@ -0,0 +1,486 @@ +import * as fs from 'fs' +import * as sh from 'shelljs' +import * as tmp from 'tmp' +import test from 'ava' +import {waitForRollout} from './helpers' + +const activeMQNamespace = 'activemq-test' +const activemqConf = '/opt/apache-activemq-5.16.3/conf' +const activemqHome = '/opt/apache-activemq-5.16.3' +const activeMQPath = 'bin/activemq' +const activeMQUsername = 'admin' +const activeMQPassword = 'admin' +const destinationName = 'testQ' +const nginxDeploymentName = 'nginx-deployment' + +test.before(t => { + // install ActiveMQ + sh.exec(`kubectl create namespace ${activeMQNamespace}`) + const activeMQTmpFile = tmp.fileSync() + fs.writeFileSync(activeMQTmpFile.name, activeMQDeployYaml) + + t.is(0, sh.exec(`kubectl apply --namespace ${activeMQNamespace} -f ${activeMQTmpFile.name}`).code, 'creating ActiveMQ deployment should work.') + t.is(0, waitForRollout('deployment', "activemq", activeMQNamespace)) + + const activeMQPod = sh.exec(`kubectl get pods --selector=app=activemq-app -n ${activeMQNamespace} -o jsonpath='{.items[0].metadata.name'}`).stdout + + // ActiveMQ ready check + let activeMQReady + for (let i = 0; i < 30; i++) { + activeMQReady = sh.exec(`kubectl exec -n ${activeMQNamespace} ${activeMQPod} -- curl -u ${activeMQUsername}:${activeMQPassword} -s http://localhost:8161/api/jolokia/exec/org.apache.activemq:type=Broker,brokerName=localhost,service=Health/healthStatus | sed -e 's/[{}]/''/g' | awk -v RS=',"' -F: '/^status/ {print $2}'`) + if (activeMQReady != 200) { + sh.exec('sleep 5s') + } + else { + break + } + } + + // deploy Nginx, scaledobject etc. + const nginxTmpFile = tmp.fileSync() + fs.writeFileSync(nginxTmpFile.name, nginxDeployYaml) + + t.is(0, sh.exec(`kubectl apply --namespace ${activeMQNamespace} -f ${nginxTmpFile.name}`).code, 'creating Nginx deployment should work.') + t.is(0, waitForRollout('deployment', "nginx-deployment", activeMQNamespace)) +}) + +test.serial('Deployment should have 0 replicas on start', t => { + const replicaCount = sh.exec(`kubectl get deploy/${nginxDeploymentName} --namespace ${activeMQNamespace} -o jsonpath="{.spec.replicas}"`).stdout + t.is(replicaCount, '0', 'replica count should start out as 0') +}) + +test.serial('Deployment should scale to 5 (the max) with 1000 messages on the queue then back to 0', t => { + const activeMQPod = sh.exec(`kubectl get pods --selector=app=activemq-app -n ${activeMQNamespace} -o jsonpath='{.items[0].metadata.name'}`).stdout + + // produce 1000 messages to ActiveMQ + t.is( + 0, + sh.exec(`kubectl exec -n ${activeMQNamespace} ${activeMQPod} -- ${activeMQPath} producer --destination ${destinationName} --messageCount 1000`).code, + 'produce 1000 message to the ActiveMQ queue' + ) + + let replicaCount = '0' + const maxReplicaCount = '5' + + for (let i = 0; i < 30 && replicaCount !== maxReplicaCount; i++) { + replicaCount = sh.exec(`kubectl get deploy/${nginxDeploymentName} --namespace ${activeMQNamespace} -o jsonpath="{.spec.replicas}"`).stdout + if (replicaCount !== maxReplicaCount) { + sh.exec('sleep 2s') + } + } + t.is(maxReplicaCount, replicaCount, `Replica count should be ${maxReplicaCount} after 60 seconds`) + sh.exec('sleep 30s') + + // consume all messages from ActiveMQ + t.is( + 0, + sh.exec(`kubectl exec -n ${activeMQNamespace} ${activeMQPod} -- ${activeMQPath} consumer --destination ${destinationName} --messageCount 1000`).code, + 'consume all messages' + ) + + for (let i = 0; i < 50 && replicaCount !== '0'; i++) { + replicaCount = sh.exec( + `kubectl get deploy/${nginxDeploymentName} --namespace ${activeMQNamespace} -o jsonpath="{.spec.replicas}"`).stdout + if (replicaCount !== '0') { + sh.exec('sleep 5s') + } + } + t.is('0', replicaCount, 'Replica count should be 0 after 3 minutes') + +}) + +test.after.always((t) => { + t.is(0, sh.exec(`kubectl delete namespace ${activeMQNamespace}`).code, 'Should delete ActiveMQ namespace') +}) + +const activeMQDeployYaml = ` +apiVersion: apps/v1 +kind: Deployment +metadata: + labels: + app: activemq-app + name: activemq +spec: + replicas: 1 + revisionHistoryLimit: 10 + selector: + matchLabels: + app: activemq-app + template: + metadata: + labels: + app: activemq-app + spec: + containers: + - image: symptoma/activemq:5.16.3 + imagePullPolicy: IfNotPresent + name: activemq + ports: + - containerPort: 61616 + name: jmx + protocol: TCP + - containerPort: 8161 + name: ui + protocol: TCP + - containerPort: 61616 + name: openwire + protocol: TCP + - containerPort: 5672 + name: amqp + protocol: TCP + - containerPort: 61613 + name: stomp + protocol: TCP + - containerPort: 1883 + name: mqtt + protocol: TCP + resources: + requests: + memory: 500Mi + cpu: 200m + limits: + memory: 1000Mi + cpu: 400m + volumeMounts: + - name: activemq-config + mountPath: /opt/apache-activemq-5.16.3/webapps/api/WEB-INF/classes/jolokia-access.xml + subPath: jolokia-access.xml + - name: remote-access-cm + mountPath: /opt/apache-activemq-5.16.3/conf/jetty.xml + subPath: jetty.xml + volumes: + - name: activemq-config + configMap: + name: activemq-config + items: + - key: jolokia-access.xml + path: jolokia-access.xml + - name: remote-access-cm + configMap: + name: remote-access-cm + items: + - key: jetty.xml + path: jetty.xml +--- +apiVersion: v1 +kind: Service +metadata: + name: activemq +spec: + type: ClusterIP + selector: + app: activemq-app + ports: + - name: dashboard + port: 8161 + targetPort: 8161 + protocol: TCP + - name: openwire + port: 61616 + targetPort: 61616 + protocol: TCP + - name: amqp + port: 5672 + targetPort: 5672 + protocol: TCP + - name: stomp + port: 61613 + targetPort: 61613 + protocol: TCP + - name: mqtt + port: 1883 + targetPort: 1883 + protocol: TCP +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: activemq-config +data: + jolokia-access.xml: | + + + + 0.0.0.0/0 + + + + + com.sun.management:type=DiagnosticCommand + * + * + + + com.sun.management:type=HotSpotDiagnostic + * + * + + + +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: remote-access-cm +data: + jetty.xml: | + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + index.html + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +` +const nginxDeployYaml = ` +apiVersion: v1 +kind: Secret +metadata: + name: activemq-secret +type: Opaque +data: + activemq-password: YWRtaW4= + activemq-username: YWRtaW4= +--- +apiVersion: keda.sh/v1alpha1 +kind: TriggerAuthentication +metadata: + name: trigger-auth-activemq +spec: + secretTargetRef: + - parameter: username + name: activemq-secret + key: activemq-username + - parameter: password + name: activemq-secret + key: activemq-password +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + labels: + app: nginx + name: ${nginxDeploymentName} +spec: + replicas: 0 + selector: + matchLabels: + app: nginx + template: + metadata: + labels: + app: nginx + spec: + containers: + - image: nginx + name: nginx + ports: + - containerPort: 80 +--- +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: activemq-scaledobject + labels: + deploymentName: ${nginxDeploymentName} +spec: + scaleTargetRef: + name: ${nginxDeploymentName} + pollingInterval: 5 + cooldownPeriod: 5 + minReplicaCount: 0 + maxReplicaCount: 5 + triggers: + - type: activemq + metadata: + managementEndpoint: "activemq.${activeMQNamespace}:8161" + destinationName: "testQ" + brokerName: "localhost" + authenticationRef: + name: trigger-auth-activemq +`