Skip to content
This repository has been archived by the owner on Mar 3, 2023. It is now read-only.

Support custom metrics rules for PrometheusSink #3493

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
2b7ae9f
Support Java 11
nwangtw Oct 26, 2019
b7285cb
config travis to use oracle jdk 11
nwangtw Oct 28, 2019
ec182a1
Java 11 support (#3399)
thinker0 Nov 11, 2019
3e33f91
Fix command arguments.
thinker0 Jan 10, 2020
e037910
Add missing parameter
thinker0 Jan 12, 2020
a39e2e8
typo
thinker0 Jan 12, 2020
cd1612a
Add pause time
thinker0 Jan 12, 2020
e00fea5
wip
thinker0 Feb 5, 2020
6d70522
Support jmx_exporter format configuration.
thinker0 Feb 6, 2020
43ce355
Fix checkstyle
thinker0 Feb 6, 2020
c4ed43b
Remove unused
thinker0 Feb 14, 2020
56c5cd8
Java 11 support (#3399)
thinker0 Nov 11, 2019
f8a1657
Fix command arguments.
thinker0 Jan 10, 2020
b4aff29
wip
thinker0 Feb 5, 2020
c6db7f7
Support jmx_exporter format configuration.
thinker0 Feb 6, 2020
f248de6
Fix checkstyle
thinker0 Feb 6, 2020
08b4db9
Remove unused
thinker0 Feb 14, 2020
249c66d
Merge branches 'feature/fix-prometheus-metrics' and 'feature/fix-prom…
thinker0 Feb 21, 2020
2b88a78
Merge branch 'master' into feature/fix-prometheus-metrics
thinker0 Mar 17, 2020
61ff45f
Update kafkaOffset metrics
thinker0 Mar 20, 2020
4b64e6f
Add Rules
thinker0 Mar 21, 2020
d013700
Make log/sink/consume Streamlet component support setName and setNumP…
nwangtw Feb 21, 2020
cfbd8ff
Patch to fix cppcheck with newer glibc (#3471)
nicknezis Feb 29, 2020
e8d0bdc
Add documents for setting up a docker based development environment (…
nwangtw Mar 3, 2020
fd5acb4
Improve concurrency for needed parts. (#3107)
thinker0 Mar 4, 2020
079fddd
Update kafkaOffset metrics
thinker0 Mar 20, 2020
15046cc
Add Rules
thinker0 Mar 21, 2020
a758486
Merge branch 'feature/fix-prometheus-metrics' of https://github.com/t…
thinker0 Mar 21, 2020
775bfe7
Update line is longer than 100 characters
thinker0 Mar 22, 2020
89f6357
Update line is longer than 100 characters
thinker0 Mar 22, 2020
5b871a7
Add attrNameSnakeCase or other metrics fix
thinker0 Mar 31, 2020
e176ef0
fix checkstyle
thinker0 Mar 31, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,17 @@
package org.apache.heron.metricsmgr.sink;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.google.common.cache.Cache;
Expand All @@ -33,6 +40,9 @@
import org.apache.heron.spi.metricsmgr.metrics.MetricsRecord;
import org.apache.heron.spi.metricsmgr.sink.SinkContext;

import static java.lang.String.format;
import static org.apache.heron.metricsmgr.sink.PrometheusSink.Prometheus.sanitizeMetricName;

/**
* A web sink that exposes an endpoint that Prometheus can scrape
*
Expand All @@ -57,6 +67,7 @@ public class PrometheusSink extends AbstractWebSink {

// This is the cache that is used to serve the metrics
private Cache<String, Map<String, Double>> metricsCache;
private List<Rule> rules = new ArrayList<Rule>();

private String cluster;
private String role;
Expand All @@ -66,13 +77,91 @@ public PrometheusSink() {
super();
}

// Prometheus metric types a rule may declare, mirroring the jmx_exporter
// "type" config key (parsed via Type.valueOf in initialize()).
// NOTE(review): only parsed and stored on Rule in the visible code; emission
// of "# TYPE" lines is not shown here — confirm against generateResponse.
private enum Type {
  COUNTER,
  GAUGE,
  SUMMARY,
  HISTOGRAM,
  UNTYPED,
}

// One metric-rewrite rule, populated from the "rules" sink configuration in
// initialize(). Plain data holder; fields are read directly by the sink.
private static class Rule {
  // Compiled from the configured "pattern", wrapped as "^.*(?:...).*$";
  // null means the rule matches every metric.
  public Pattern pattern;
  // Replacement template for the exported metric name; null/empty keeps the
  // default export format.
  public String name;
  public String value;
  // Multiplier for the metric value; defaults to 1.0 when the configured
  // "valueFactor" is absent or unparsable.
  public Double valueFactor = 1.0;
  public String help;
  // When true, the rewritten metric name is converted to snake_case.
  public boolean attrNameSnakeCase;
  public Type type = Type.UNTYPED;
  // Parallel lists: labelNames.get(i) pairs with labelValues.get(i);
  // both null when the rule configures no labels.
  public ArrayList<String> labelNames;
  public ArrayList<String> labelValues;
}

/**
 * Initializes the sink: creates the metrics cache, captures cluster/role/env
 * from the sink context, and parses the optional "rules" configuration
 * (jmx_exporter-style rewrite rules). When no rules are configured, a single
 * pass-through {@link Rule} is installed.
 *
 * @param configuration sink configuration map; may contain a "rules" entry
 *                      holding a list of rule maps
 * @param context       runtime context supplying cluster, role and environment
 * @throws IllegalArgumentException if a rule has help/labels without a name,
 *                                  or a name without a pattern
 */
@Override
@SuppressWarnings("unchecked") // configuration values arrive untyped from the config loader
void initialize(Map<String, Object> configuration, SinkContext context) {
  metricsCache = createCache();

  cluster = context.getCluster();
  role = context.getRole();
  environment = context.getEnvironment();

  if (!configuration.containsKey("rules")) {
    // Default to a single pass-through rule.
    rules.add(new Rule());
    return;
  }

  List<Map<String, Object>> configRules =
      (List<Map<String, Object>>) configuration.get("rules");
  for (Map<String, Object> ruleObject : configRules) {
    Rule rule = new Rule();
    rules.add(rule);
    if (ruleObject.containsKey("pattern")) {
      // Anchor the configured pattern so it can match anywhere in the metric name.
      rule.pattern = Pattern.compile("^.*(?:" + (String) ruleObject.get("pattern") + ").*$");
    }
    if (ruleObject.containsKey("name")) {
      rule.name = (String) ruleObject.get("name");
    }
    if (ruleObject.containsKey("value")) {
      rule.value = String.valueOf(ruleObject.get("value"));
    }
    if (ruleObject.containsKey("valueFactor")) {
      String valueFactor = String.valueOf(ruleObject.get("valueFactor"));
      try {
        rule.valueFactor = Double.parseDouble(valueFactor);
      } catch (NumberFormatException ignored) {
        // Unparsable valueFactor: keep the 1.0 default.
      }
    }
    if (ruleObject.containsKey("attrNameSnakeCase")) {
      rule.attrNameSnakeCase = (Boolean) ruleObject.get("attrNameSnakeCase");
    }
    if (ruleObject.containsKey("type")) {
      rule.type = Type.valueOf((String) ruleObject.get("type"));
    }
    if (ruleObject.containsKey("help")) {
      rule.help = (String) ruleObject.get("help");
    }
    if (ruleObject.containsKey("labels")) {
      // TreeMap sorts label names so the export order is deterministic.
      TreeMap<String, Object> labels =
          new TreeMap<>((Map<String, Object>) ruleObject.get("labels"));
      rule.labelNames = new ArrayList<>();
      rule.labelValues = new ArrayList<>();
      for (Map.Entry<String, Object> entry : labels.entrySet()) {
        rule.labelNames.add(entry.getKey());
        rule.labelValues.add((String) entry.getValue());
      }
    }

    // Validation.
    if ((rule.labelNames != null || rule.help != null) && rule.name == null) {
      throw new IllegalArgumentException("Must provide name, if help or labels are given: "
          + ruleObject);
    }
    if (rule.name != null && rule.pattern == null) {
      throw new IllegalArgumentException("Must provide pattern, if name is given: "
          + ruleObject);
    }
  }
}

@Override
Expand All @@ -82,6 +171,9 @@ byte[] generateResponse() throws IOException {
final StringBuilder sb = new StringBuilder();

metrics.forEach((String source, Map<String, Double> sourceMetrics) -> {
// Map the labels.
final Map<String, String> labelKV = new TreeMap<String, String>();

String[] sources = source.split("/");
String topology = sources[0];
String component = sources[1];
Expand All @@ -96,6 +188,18 @@ byte[] generateResponse() throws IOException {
final String clusterRoleEnv = hasClusterRoleEnvironment(c, r, e)
? String.format("%s/%s/%s", c, r, e) : null;

labelKV.put("topology", topology);
labelKV.put("component", component);
labelKV.put("instance_id", instance);

if (clusterRoleEnv != null) {
labelKV.put("cluster_role_env", clusterRoleEnv);
}

if (componentType != null) {
labelKV.put("component_type", componentType);
}

sourceMetrics.forEach((String metric, Double value) -> {

// some stream manager metrics in heron contain an instance id as part of the metric name
Expand All @@ -104,46 +208,79 @@ byte[] generateResponse() throws IOException {
// __time_spent_back_pressure_by_compid/container_1_exclaim1_1
// TODO convert to small classes for less string manipulation
final String metricName;
final String metricInstanceId;
if (componentIsStreamManger) {
final boolean metricHasInstanceId = metric.contains("_by_");
final String[] metricParts = metric.split("/");
if (metricHasInstanceId && metricParts.length == 3) {
metricName = String.format("%s_%s", metricParts[0], metricParts[2]);
metricInstanceId = metricParts[1];
metricName = splitTargetInstance(metricParts[0], metricParts[2], labelKV);
labelKV.put("metric_instance_id", metricParts[1]);
} else if (metricHasInstanceId && metricParts.length == 2) {
metricName = metricParts[0];
metricInstanceId = metricParts[1];
metricName = splitTargetInstance(metricParts[0], null, labelKV);
labelKV.put("metric_instance_id", metricParts[1]);
} else if (metricParts.length == 2) {
metricName = splitTargetInstance(metricParts[0], metricParts[1], labelKV);
} else {
metricName = metric;
metricInstanceId = null;
metricName = splitTargetInstance(metric, null, labelKV);
}

} else {
metricName = metric;
metricInstanceId = null;
}

String exportedMetricName = String.format("%s_%s", HERON_PREFIX,
metricName.replace("__", "").toLowerCase());
sb.append(Prometheus.sanitizeMetricName(exportedMetricName))
.append("{")
.append("topology=\"").append(topology).append("\",")
.append("component=\"").append(component).append("\",")
.append("instance_id=\"").append(instance).append("\"");

if (clusterRoleEnv != null) {
sb.append(",cluster_role_env=\"").append(clusterRoleEnv).append("\"");
}

if (componentType != null) {
sb.append(",component_type=\"").append(componentType).append("\"");
}

if (metricInstanceId != null) {
sb.append(",metric_instance_id=\"").append(metricInstanceId).append("\"");
final AtomicReference<String> name = new AtomicReference<>(sanitizeMetricName(metric));
rules.forEach(rule -> {
String ruleName = name.get();
Matcher matcher = null;
if (rule.pattern != null) {
matcher = rule.pattern.matcher(metric);
if (!matcher.matches()) {
return;
}
}

// If there's no name provided, use default export format.
if (rule.name == null || rule.name.isEmpty()) {
// nothing
} else {
// Matcher is set below here due to validation in the constructor.
ruleName = sanitizeMetricName(matcher.replaceAll(rule.name));
if (ruleName.isEmpty()) {
return;
}
}
if (rule.attrNameSnakeCase) {
name.set(toSnakeAndLowerCase(ruleName));
} else {
name.set(ruleName.toLowerCase());
}
if (rule.labelNames != null) {
for (int i = 0; i < rule.labelNames.size(); i++) {
final String unsafeLabelName = rule.labelNames.get(i);
final String labelValReplacement = rule.labelValues.get(i);
String labelName = sanitizeMetricName(matcher.replaceAll(unsafeLabelName));
String labelValue = matcher.replaceAll(labelValReplacement);
labelName = labelName.toLowerCase();
if (!labelName.isEmpty() && !labelValue.isEmpty()) {
labelKV.put(labelName, labelValue);
}
}
}
});
metricName = name.get();
}

// TODO Type, Help
String exportedMetricName = format("%s_%s", HERON_PREFIX,
metricName
.replace("__", "")
.toLowerCase());
sb.append(sanitizeMetricName(exportedMetricName))
.append("{");
final AtomicBoolean isFirst = new AtomicBoolean(true);
labelKV.forEach((k, v) -> {
// Add labels
if (!isFirst.get()) {
sb.append(',');
}
sb.append(format("%s=\"%s\"", k, v));
isFirst.set(false);
});
sb.append("} ")
.append(Prometheus.doubleToGoString(value))
.append(" ").append(currentTimeMillis())
Expand All @@ -154,6 +291,45 @@ byte[] generateResponse() throws IOException {
return sb.toString().getBytes();
}

// Matches stream-manager style names such as "__name_instance-3", exposing the
// logical name, the full target ("instance-3") and the bare instance prefix.
private static final Pattern SPLIT_TARGET = Pattern.compile("__(?<name>\\w+)"
    + "_(?<target>(?<instance>\\w+)-\\d+)");
// A part consisting solely of digits is treated as a metric instance id.
private static final Pattern DIGIT = Pattern.compile("[0-9]+");

/**
 * Derives the exported metric name from the two slash-separated metric parts,
 * extracting any embedded instance id into {@code labelKV} under the key
 * "metric_instance_id".
 */
private String splitTargetInstance(String part1, String part2, Map<String, String> labelKV) {
  if (part2 == null) {
    // Nothing to split; the first part already is the full metric name.
    return part1;
  }
  if (DIGIT.matcher(part2).matches()) {
    // Purely numeric suffix: record it as the instance id, keep the name as-is.
    labelKV.put("metric_instance_id", part2);
    return part1;
  }
  final Matcher targetMatcher = SPLIT_TARGET.matcher(part1);
  if (targetMatcher.matches()) {
    // "__name_instance-N" shape: lift the target into a label and rebuild the name.
    labelKV.put("metric_instance_id", targetMatcher.group("target"));
    return String.format("%s_%s_%s",
        targetMatcher.group("name"), targetMatcher.group("instance"), part2);
  }
  // Fallback: join both parts into one metric name.
  return String.format("%s_%s", part1, part2);
}

/**
 * Converts a camelCase attribute name to lower snake_case: an underscore is
 * inserted before each upper-case letter, except at the start of the string,
 * after another upper-case letter, or after an existing underscore. Returns
 * the input unchanged when it is null or empty.
 */
static String toSnakeAndLowerCase(String attrName) {
  if (attrName == null || attrName.isEmpty()) {
    return attrName;
  }
  final StringBuilder snake = new StringBuilder(attrName.length());
  // Seed the flag as true so no underscore is ever emitted before the first char.
  boolean prevUpperOrUnderscore = true;
  for (int i = 0; i < attrName.length(); i++) {
    final char c = attrName.charAt(i);
    final boolean isUpper = Character.isUpperCase(c);
    if (isUpper && !prevUpperOrUnderscore) {
      snake.append('_');
    }
    snake.append(Character.toLowerCase(c));
    prevUpperOrUnderscore = isUpper || c == '_';
  }
  return snake.toString();
}

@Override
public void processRecord(MetricsRecord record) {
final String[] sources = MetricsUtil.splitRecordSource(record);
Expand Down
Loading