Skip to content

Commit

Permalink
Added support for tag-based Prometheus metric filtering in Prometheus…
Browse files Browse the repository at this point in the history
…Collector2 (by using 'allowed-tags' config. setting). Improved EMS client bin/run.sh script (#50)
  • Loading branch information
ipatini authored Jan 14, 2025
1 parent 4079218 commit db0cc0b
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 29 deletions.
39 changes: 22 additions & 17 deletions nebulous/ems-core/baguette-client/bin/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -69,28 +69,33 @@ JAVA_OPTS="${JAVA_OPTS} -Djasypt.encryptor.password=$JASYPT_PASSWORD"
JAVA_OPTS="${JAVA_OPTS} --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED"

# Print settings
echo "Starting baguette client..."
echo "EMS_CONFIG_DIR=${EMS_CONFIG_DIR}"
echo "EMS_CONFIG_LOCATION=${EMS_CONFIG_LOCATION}"
echo "LOG_FILE=${LOG_FILE}"

echo "Starting baguette client..." &>> ${LOG_FILE}
echo "EMS_CONFIG_DIR=${EMS_CONFIG_DIR}" &>> ${LOG_FILE}
echo "EMS_CONFIG_LOCATION=${EMS_CONFIG_LOCATION}" &>> ${LOG_FILE}
echo "LOG_FILE=${LOG_FILE}" &>> ${LOG_FILE}
echo "" | tee -a ${LOG_FILE}
echo "EMS_CONFIG_DIR=${EMS_CONFIG_DIR}" | tee -a ${LOG_FILE}
echo "EMS_CONFIG_LOCATION=${EMS_CONFIG_LOCATION}" | tee -a ${LOG_FILE}
echo "LOG_FILE=${LOG_FILE}" | tee -a ${LOG_FILE}
echo "UNAME=$(uname -a)" | tee -a ${LOG_FILE}
echo "" | tee -a ${LOG_FILE}

# Run Baguette Client
if [ "$1" == "--i" ]; then
echo "Baguette client running in Interactive mode"
java ${JAVA_OPTS} -classpath "conf:jars/*:target/classes:target/dependency/*" gr.iccs.imu.ems.baguette.client.BaguetteClient "--spring.config.location=${EMS_CONFIG_LOCATION}" "--logging.config=file:${EMS_CONFIG_DIR}/logback-spring.xml" $* 2>&1 | tee ${TEE_FILE}
java ${JAVA_OPTS} -classpath "conf:jars/*:target/classes:target/dependency/*" gr.iccs.imu.ems.baguette.client.BaguetteClient "--spring.config.location=${EMS_CONFIG_LOCATION}" "--logging.config=file:${EMS_CONFIG_DIR}/logback-spring.xml" $* 2>&1 | tee -a ${TEE_FILE}
else
java ${JAVA_OPTS} -classpath "conf:jars/*:target/classes:target/dependency/*" gr.iccs.imu.ems.baguette.client.BaguetteClient "--spring.config.location=${EMS_CONFIG_LOCATION}" "--logging.config=file:${EMS_CONFIG_DIR}/logback-spring.xml" $* &>> ${LOG_FILE} &
if command -v jps
then
PID=`jps | grep BaguetteClient | cut -d " " -f 1`
PID=`ps -ef |grep java |grep BaguetteClient | cut -c 10-14`
echo "Baguette client PID: $PID"
fi
# Setup TERM & INT signal handler
trap "echo \"Signaled EMS client to exit\" | tee -a ${LOG_FILE}" SIGTERM SIGINT

# Run Baguette Client
echo "Starting Baguette client..." | tee -a ${LOG_FILE}
java ${JAVA_OPTS} -classpath "conf:jars/*:target/classes:target/dependency/*" gr.iccs.imu.ems.baguette.client.BaguetteClient "--spring.config.location=${EMS_CONFIG_LOCATION}" "--logging.config=file:${EMS_CONFIG_DIR}/logback-spring.xml" $* 2>&1 | tee -a ${LOG_FILE} &
PID=$!
echo "Baguette client PID: $PID" | tee -a ${LOG_FILE}

#if command -v jps
#then
# PID=`jps | grep BaguetteClient | cut -d " " -f 1`
# PID=`ps -ef |grep java |grep BaguetteClient | cut -c 10-14`
# echo "Baguette client PID: $PID"
#fi
fi

cd $PREVWORKDIR
Original file line number Diff line number Diff line change
Expand Up @@ -135,12 +135,13 @@ private void applyNewConfigurations() {
String destination = config.getOrDefault("name", "").toString();
String prometheusMetric = getPrometheusMetric(config);
String url = getUrlPattern(config);
Map<String,Set<String>> allowedTags = getAllowedTags(config);
Duration delay = getDelay(config);
Duration period = getInterval(config);
Instant startsAt = startInstant.plus(delay);

scrapingTasks.add(taskScheduler.scheduleAtFixedRate(
() -> scrapeEndpoint(url, prometheusMetric, destination), startsAt, period));
() -> scrapeEndpoint(url, prometheusMetric, allowedTags, destination), startsAt, period));
log.info("Collectors::{}: Added monitoring task: prometheus-metric={}, destination={}, url={}, starts-at={}, period={}",
collectorId, prometheusMetric, destination, url, startsAt, period);
} else
Expand Down Expand Up @@ -207,6 +208,44 @@ private String getUrlPattern(Map<String, Serializable> config) {
return null;
}

// Expected 'allowed-tags' format:
// allowed-rags ==> <TAG-1>=<VALUE-1>,...,<VALUE-N>;<TAG-2>=;<TAG-3>
// Meaning:
// <TAG-1>=<VALUE-1>,...,<VALUE-N> measurements with 'TAG-1' and value is any of 'VALUE-1' to 'VALUE-N' are accepted
// <TAG-1>=<VALUE-1> measurements with 'TAG-1' and value equals to 'VALUE-1' is accepted
// <TAG-2>= measurements with 'TAG-2' and empty value '' is accepted (special case of above)
// <TAG-3> measurements with 'TAG-3' and any value is accepted
private Map<String, Set<String>> getAllowedTags(Map<String, Serializable> config) {
if (config.get("configuration") instanceof Map configMap) {
String tagsStr = configMap.getOrDefault("allowed-tags", "").toString();
if (StringUtils.isNotBlank(tagsStr)) {
Map<String, Set<String>> allowedTags = null;
String[] pairs = tagsStr.split(";");
for (String pair : pairs) {
if (StringUtils.isNotBlank(pair)) {
pair = pair.trim();
String[] part = pair.split("=", 2);
String tag = part[0].trim();
String tagValStr = part.length>1 ? part[1].trim() : null;
if (StringUtils.isNotBlank(tag)) {
String[] vals = tagValStr!=null ? tagValStr.split(",") : null;
HashSet<String> valSet = null;
if (vals!=null) {
for (int i = 0; i < vals.length; i++) vals[i] = vals[i].trim();
valSet = new LinkedHashSet<>(Arrays.asList(vals));
}
if (allowedTags==null)
allowedTags = new LinkedHashMap<>();
allowedTags.put(tag, valSet);
}
}
}
return allowedTags;
}
}
return null;
}

private Duration getDelay(Map<String, Serializable> config) {
if (config.get("configuration") instanceof Map configMap) {
long delay = Long.parseLong(configMap.getOrDefault("delay", DEFAULT_DELAY).toString());
Expand Down Expand Up @@ -241,7 +280,7 @@ private void initRestClientAndParser() {
this.openMetricsParser = new OpenMetricsParser();
}

private void scrapeEndpoint(String urlPattern, String prometheusMetric, String destination) {
private void scrapeEndpoint(String urlPattern, String prometheusMetric, Map<String, Set<String>> allowedTags, String destination) {
log.debug("Collectors::{}: scrapeEndpoint: BEGIN: Scraping Prometheus endpoints for sensor: url-pattern={}, prometheusMetric={}, destination={}",
collectorId, urlPattern, prometheusMetric, destination);

Expand Down Expand Up @@ -274,16 +313,16 @@ private void scrapeEndpoint(String urlPattern, String prometheusMetric, String d
log.trace("Collectors::{}: scrapeEndpoint: Parsed payload: {} -- Metrics:\n{}", collectorId, node, results);
}

// Get values for the requested metric
// Get values for the requested metric (and tags if provided)
if (results != null) {
List<OpenMetricsParser.MetricInstance> matches = results.stream()
.filter(m -> m.getMetricName().equalsIgnoreCase(prometheusMetric)).toList();
.filter(m -> m.getMetricName().equalsIgnoreCase(prometheusMetric))
.filter(m -> matchAnyAllowedTag(m.getTags(), allowedTags))
.toList();
log.trace("Collectors::{}: scrapeEndpoint: Found metric: {} -- Metric(s):\n{}", collectorId, node, matches);
List<Double> values = matches.stream().map(OpenMetricsParser.MetricInstance::getMetricValue).toList();
log.trace("Collectors::{}: scrapeEndpoint: Metric value(s): {} -- Value(s):\n{}", collectorId, node, values);

// Publish extracted values
queueForPublish(prometheusMetric, destination, values, node, url);
queueForPublish(prometheusMetric, destination, matches, node, url);
}

log.trace("Collectors::{}: scrapeEndpoint: Done scraping node: {} -- Endpoint: {}", collectorId, node, url);
Expand All @@ -299,15 +338,33 @@ private void scrapeEndpoint(String urlPattern, String prometheusMetric, String d
log.debug("Collectors::{}: scrapeEndpoint: END", collectorId);
}

private void queueForPublish(String prometheusMetric, String destination, List<Double> values, Serializable node, String endpoint) {
log.debug("Collectors::{}: queueForPublish: metric={}, destination={}, values={}, node={}, endpoint={}",
collectorId, node, prometheusMetric, destination, values, endpoint);
values.forEach(v -> {
EventMap event = new EventMap(v, 1);
private boolean matchAnyAllowedTag(Map<String, String> tags, Map<String, Set<String>> allowedTags) {
log.trace("Collectors::{}: matchAnyAllowedTag: BEGIN: tags={}, allowed-tags={}", collectorId, tags, allowedTags);
if (allowedTags==null || allowedTags.isEmpty()) return true;
for (Map.Entry<String,String> e : tags.entrySet()) {
String k = e.getKey()!=null ? e.getKey().trim() : null;
if (StringUtils.isBlank(k)) continue;
if (allowedTags.containsKey(k)) {
Set<String> allowedTagValues = allowedTags.get(k);
if (allowedTagValues==null) return true;
String v = e.getValue()!=null ? e.getValue().trim() : "";
if (allowedTagValues.contains(v)) return true;
}
}
return false;
}

private void queueForPublish(String prometheusMetric, String destination, List<OpenMetricsParser.MetricInstance> metricInstances, Serializable node, String endpoint) {
log.debug("Collectors::{}: queueForPublish: metric={}, destination={}, metricInstances={}, node={}, endpoint={}",
collectorId, node, prometheusMetric, destination, metricInstances, endpoint);
metricInstances.forEach(v -> {
EventMap event = new EventMap(v.getMetricValue(), 1);
event.setEventProperty("metric", prometheusMetric);
event.setEventProperty("source-node", node);
event.setEventProperty("source-endpoint", endpoint);
event.setEventProperty("destination-topic", destination);
if (v.getTags()!=null)
v.getTags().forEach((tag,tagValue) -> event.setEventProperty("tag-"+tag, tagValue));
eventsQueue.add(event);
});
}
Expand Down

0 comments on commit db0cc0b

Please sign in to comment.