From db0cc0b8454128b67a39d5bc038084b84d47b39e Mon Sep 17 00:00:00 2001 From: ipatini <3739531+ipatini@users.noreply.github.com> Date: Tue, 14 Jan 2025 19:11:24 +0200 Subject: [PATCH] Added support for tag-based Prometheus metric filtering in PrometheusCollector2 (by using 'allowed-tags' config. setting). Improved EMS client bin/run.sh script (#50) --- nebulous/ems-core/baguette-client/bin/run.sh | 39 +++++---- .../prometheus/PrometheusCollector2.java | 81 ++++++++++++++++--- 2 files changed, 91 insertions(+), 29 deletions(-) diff --git a/nebulous/ems-core/baguette-client/bin/run.sh b/nebulous/ems-core/baguette-client/bin/run.sh index 0fbc402..83f3632 100644 --- a/nebulous/ems-core/baguette-client/bin/run.sh +++ b/nebulous/ems-core/baguette-client/bin/run.sh @@ -69,28 +69,33 @@ JAVA_OPTS="${JAVA_OPTS} -Djasypt.encryptor.password=$JASYPT_PASSWORD" JAVA_OPTS="${JAVA_OPTS} --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED" # Print settings -echo "Starting baguette client..." -echo "EMS_CONFIG_DIR=${EMS_CONFIG_DIR}" -echo "EMS_CONFIG_LOCATION=${EMS_CONFIG_LOCATION}" -echo "LOG_FILE=${LOG_FILE}" - -echo "Starting baguette client..." &>> ${LOG_FILE} -echo "EMS_CONFIG_DIR=${EMS_CONFIG_DIR}" &>> ${LOG_FILE} -echo "EMS_CONFIG_LOCATION=${EMS_CONFIG_LOCATION}" &>> ${LOG_FILE} -echo "LOG_FILE=${LOG_FILE}" &>> ${LOG_FILE} +echo "" | tee -a ${LOG_FILE} +echo "EMS_CONFIG_DIR=${EMS_CONFIG_DIR}" | tee -a ${LOG_FILE} +echo "EMS_CONFIG_LOCATION=${EMS_CONFIG_LOCATION}" | tee -a ${LOG_FILE} +echo "LOG_FILE=${LOG_FILE}" | tee -a ${LOG_FILE} +echo "UNAME=$(uname -a)" | tee -a ${LOG_FILE} +echo "" | tee -a ${LOG_FILE} # Run Baguette Client if [ "$1" == "--i" ]; then echo "Baguette client running in Interactive mode" - java ${JAVA_OPTS} -classpath "conf:jars/*:target/classes:target/dependency/*" gr.iccs.imu.ems.baguette.client.BaguetteClient "--spring.config.location=${EMS_CONFIG_LOCATION}" "--logging.config=file:${EMS_CONFIG_DIR}/logback-spring.xml" $* 2>&1 | tee ${TEE_FILE} + java ${JAVA_OPTS} -classpath "conf:jars/*:target/classes:target/dependency/*" gr.iccs.imu.ems.baguette.client.BaguetteClient "--spring.config.location=${EMS_CONFIG_LOCATION}" "--logging.config=file:${EMS_CONFIG_DIR}/logback-spring.xml" $* 2>&1 | tee -a ${TEE_FILE} else - java ${JAVA_OPTS} -classpath "conf:jars/*:target/classes:target/dependency/*" gr.iccs.imu.ems.baguette.client.BaguetteClient "--spring.config.location=${EMS_CONFIG_LOCATION}" "--logging.config=file:${EMS_CONFIG_DIR}/logback-spring.xml" $* &>> ${LOG_FILE} & - if command -v jps - then - PID=`jps | grep BaguetteClient | cut -d " " -f 1` - PID=`ps -ef |grep java |grep BaguetteClient | cut -c 10-14` - echo "Baguette client PID: $PID" - fi + # Setup TERM & INT signal handler + trap "echo \"Signaled EMS client to exit\" | tee -a ${LOG_FILE}" SIGTERM SIGINT + + # Run Baguette Client + echo "Starting Baguette client..." | tee -a ${LOG_FILE} + java ${JAVA_OPTS} -classpath "conf:jars/*:target/classes:target/dependency/*" gr.iccs.imu.ems.baguette.client.BaguetteClient "--spring.config.location=${EMS_CONFIG_LOCATION}" "--logging.config=file:${EMS_CONFIG_DIR}/logback-spring.xml" $* 2>&1 | tee -a ${LOG_FILE} & + PID=$! + echo "Baguette client PID: $PID" | tee -a ${LOG_FILE} + + #if command -v jps + #then + # PID=`jps | grep BaguetteClient | cut -d " " -f 1` + # PID=`ps -ef |grep java |grep BaguetteClient | cut -c 10-14` + # echo "Baguette client PID: $PID" + #fi fi cd $PREVWORKDIR \ No newline at end of file diff --git a/nebulous/ems-core/baguette-client/src/main/java/gr/iccs/imu/ems/baguette/client/collector/prometheus/PrometheusCollector2.java b/nebulous/ems-core/baguette-client/src/main/java/gr/iccs/imu/ems/baguette/client/collector/prometheus/PrometheusCollector2.java index a10cfdb..3987cc4 100644 --- a/nebulous/ems-core/baguette-client/src/main/java/gr/iccs/imu/ems/baguette/client/collector/prometheus/PrometheusCollector2.java +++ b/nebulous/ems-core/baguette-client/src/main/java/gr/iccs/imu/ems/baguette/client/collector/prometheus/PrometheusCollector2.java @@ -135,12 +135,13 @@ private void applyNewConfigurations() { String destination = config.getOrDefault("name", "").toString(); String prometheusMetric = getPrometheusMetric(config); String url = getUrlPattern(config); + Map> allowedTags = getAllowedTags(config); Duration delay = getDelay(config); Duration period = getInterval(config); Instant startsAt = startInstant.plus(delay); scrapingTasks.add(taskScheduler.scheduleAtFixedRate( - () -> scrapeEndpoint(url, prometheusMetric, destination), startsAt, period)); + () -> scrapeEndpoint(url, prometheusMetric, allowedTags, destination), startsAt, period)); log.info("Collectors::{}: Added monitoring task: prometheus-metric={}, destination={}, url={}, starts-at={}, period={}", collectorId, prometheusMetric, destination, url, startsAt, period); } else @@ -207,6 +208,44 @@ private String getUrlPattern(Map config) { return null; } + // Expected 'allowed-tags' format: + // allowed-rags ==> =,...,;=; + // Meaning: + // =,..., measurements with 'TAG-1' and value is any of 'VALUE-1' to 'VALUE-N' are accepted + // = measurements with 'TAG-1' and value equals to 'VALUE-1' is accepted + // = measurements with 'TAG-2' and empty value '' is accepted (special case of above) + // measurements with 'TAG-3' and any value is accepted + private Map> getAllowedTags(Map config) { + if (config.get("configuration") instanceof Map configMap) { + String tagsStr = configMap.getOrDefault("allowed-tags", "").toString(); + if (StringUtils.isNotBlank(tagsStr)) { + Map> allowedTags = null; + String[] pairs = tagsStr.split(";"); + for (String pair : pairs) { + if (StringUtils.isNotBlank(pair)) { + pair = pair.trim(); + String[] part = pair.split("=", 2); + String tag = part[0].trim(); + String tagValStr = part.length>1 ? part[1].trim() : null; + if (StringUtils.isNotBlank(tag)) { + String[] vals = tagValStr!=null ? tagValStr.split(",") : null; + HashSet valSet = null; + if (vals!=null) { + for (int i = 0; i < vals.length; i++) vals[i] = vals[i].trim(); + valSet = new LinkedHashSet<>(Arrays.asList(vals)); + } + if (allowedTags==null) + allowedTags = new LinkedHashMap<>(); + allowedTags.put(tag, valSet); + } + } + } + return allowedTags; + } + } + return null; + } + private Duration getDelay(Map config) { if (config.get("configuration") instanceof Map configMap) { long delay = Long.parseLong(configMap.getOrDefault("delay", DEFAULT_DELAY).toString()); @@ -241,7 +280,7 @@ private void initRestClientAndParser() { this.openMetricsParser = new OpenMetricsParser(); } - private void scrapeEndpoint(String urlPattern, String prometheusMetric, String destination) { + private void scrapeEndpoint(String urlPattern, String prometheusMetric, Map> allowedTags, String destination) { log.debug("Collectors::{}: scrapeEndpoint: BEGIN: Scraping Prometheus endpoints for sensor: url-pattern={}, prometheusMetric={}, destination={}", collectorId, urlPattern, prometheusMetric, destination); @@ -274,16 +313,16 @@ private void scrapeEndpoint(String urlPattern, String prometheusMetric, String d log.trace("Collectors::{}: scrapeEndpoint: Parsed payload: {} -- Metrics:\n{}", collectorId, node, results); } - // Get values for the requested metric + // Get values for the requested metric (and tags if provided) if (results != null) { List matches = results.stream() - .filter(m -> m.getMetricName().equalsIgnoreCase(prometheusMetric)).toList(); + .filter(m -> m.getMetricName().equalsIgnoreCase(prometheusMetric)) + .filter(m -> matchAnyAllowedTag(m.getTags(), allowedTags)) + .toList(); log.trace("Collectors::{}: scrapeEndpoint: Found metric: {} -- Metric(s):\n{}", collectorId, node, matches); - List values = matches.stream().map(OpenMetricsParser.MetricInstance::getMetricValue).toList(); - log.trace("Collectors::{}: scrapeEndpoint: Metric value(s): {} -- Value(s):\n{}", collectorId, node, values); // Publish extracted values - queueForPublish(prometheusMetric, destination, values, node, url); + queueForPublish(prometheusMetric, destination, matches, node, url); } log.trace("Collectors::{}: scrapeEndpoint: Done scraping node: {} -- Endpoint: {}", collectorId, node, url); @@ -299,15 +338,33 @@ private void scrapeEndpoint(String urlPattern, String prometheusMetric, String d log.debug("Collectors::{}: scrapeEndpoint: END", collectorId); } - private void queueForPublish(String prometheusMetric, String destination, List values, Serializable node, String endpoint) { - log.debug("Collectors::{}: queueForPublish: metric={}, destination={}, values={}, node={}, endpoint={}", - collectorId, node, prometheusMetric, destination, values, endpoint); - values.forEach(v -> { - EventMap event = new EventMap(v, 1); + private boolean matchAnyAllowedTag(Map tags, Map> allowedTags) { + log.trace("Collectors::{}: matchAnyAllowedTag: BEGIN: tags={}, allowed-tags={}", collectorId, tags, allowedTags); + if (allowedTags==null || allowedTags.isEmpty()) return true; + for (Map.Entry e : tags.entrySet()) { + String k = e.getKey()!=null ? e.getKey().trim() : null; + if (StringUtils.isBlank(k)) continue; + if (allowedTags.containsKey(k)) { + Set allowedTagValues = allowedTags.get(k); + if (allowedTagValues==null) return true; + String v = e.getValue()!=null ? e.getValue().trim() : ""; + if (allowedTagValues.contains(v)) return true; + } + } + return false; + } + + private void queueForPublish(String prometheusMetric, String destination, List metricInstances, Serializable node, String endpoint) { + log.debug("Collectors::{}: queueForPublish: metric={}, destination={}, metricInstances={}, node={}, endpoint={}", + collectorId, node, prometheusMetric, destination, metricInstances, endpoint); + metricInstances.forEach(v -> { + EventMap event = new EventMap(v.getMetricValue(), 1); event.setEventProperty("metric", prometheusMetric); event.setEventProperty("source-node", node); event.setEventProperty("source-endpoint", endpoint); event.setEventProperty("destination-topic", destination); + if (v.getTags()!=null) + v.getTags().forEach((tag,tagValue) -> event.setEventProperty("tag-"+tag, tagValue)); eventsQueue.add(event); }); }