diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java index 9f4b603e98..0409931905 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java @@ -1773,6 +1773,55 @@ public class ConfigOptions { + "the CoordinatorServer) it is advisable to use a port range " + "like 9250-9260."); + // ------------------------------------------------------------------------ + // ConfigOptions for prometheus push gateway reporter + // ------------------------------------------------------------------------ + public static final ConfigOption METRICS_REPORTER_PROMETHEUS_PUSHGATEWAY_HOST_URL = + key("metrics.reporter.prometheus-push.host-url") + .stringType() + .noDefaultValue() + .withDescription( + "The PushGateway server host URL including scheme, host name, and port."); + + public static final ConfigOption METRICS_REPORTER_PROMETHEUS_PUSHGATEWAY_JOB_NAME = + key("metrics.reporter.prometheus-push.job-name") + .stringType() + .noDefaultValue() + .withDescription("The job name under which metrics will be pushed"); + + public static final ConfigOption + METRICS_REPORTER_PROMETHEUS_PUSHGATEWAY_RANDOM_JOB_NAME_SUFFIX = + key("metrics.reporter.prometheus-push.random-job-name-suffix") + .booleanType() + .defaultValue(true) + .withDescription( + "Specifies whether a random suffix should be appended to the job name. " + + "This is useful when multiple instances of the reporter " + + "are running on the same host."); + + public static final ConfigOption + METRICS_REPORTER_PROMETHEUS_PUSHGATEWAY_DELETE_ON_SHUTDOWN = + key("metrics.reporter.prometheus-push.delete-on-shutdown") + .booleanType() + .defaultValue(true) + .withDescription( + "Specifies whether to delete metrics from the PushGateway on shutdown. Fluss will try its best to delete the metrics but this is not guaranteed."); + + public static final ConfigOption METRICS_REPORTER_PROMETHEUS_PUSHGATEWAY_GROUPING_KEY = + key("metrics.reporter.prometheus-push.grouping-key") + .stringType() + .noDefaultValue() + .withDescription( + "Specifies the grouping key which is the group and global labels of all metrics. The label name and value are separated by '=', and labels are separated by ';', e.g., k1=v1;k2=v2."); + + public static final ConfigOption + METRICS_REPORTER_PROMETHEUS_PUSHGATEWAY_PUSH_INTERVAL = + key("metrics.reporter.prometheus-push.push-interval") + .durationType() + .defaultValue(Duration.ofSeconds(10)) + .withDescription( + "The interval of pushing metrics to Prometheus PushGateway."); + // ------------------------------------------------------------------------ // ConfigOptions for jmx reporter // ------------------------------------------------------------------------ diff --git a/fluss-metrics/fluss-metrics-prometheus/pom.xml b/fluss-metrics/fluss-metrics-prometheus/pom.xml index 677c8969cc..6ca4367c80 100644 --- a/fluss-metrics/fluss-metrics-prometheus/pom.xml +++ b/fluss-metrics/fluss-metrics-prometheus/pom.xml @@ -54,6 +54,12 @@ ${prometheus.version} + + io.prometheus + simpleclient_pushgateway + ${prometheus.version} + + org.apache.fluss diff --git a/fluss-metrics/fluss-metrics-prometheus/src/main/java/org/apache/fluss/metrics/prometheus/AbstractPrometheusReporter.java b/fluss-metrics/fluss-metrics-prometheus/src/main/java/org/apache/fluss/metrics/prometheus/AbstractPrometheusReporter.java new file mode 100644 index 0000000000..c8d1e955f1 --- /dev/null +++ b/fluss-metrics/fluss-metrics-prometheus/src/main/java/org/apache/fluss/metrics/prometheus/AbstractPrometheusReporter.java @@ -0,0 +1,397 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.metrics.prometheus; + +import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.metrics.CharacterFilter; +import org.apache.fluss.metrics.Counter; +import org.apache.fluss.metrics.Gauge; +import org.apache.fluss.metrics.Histogram; +import org.apache.fluss.metrics.HistogramStatistics; +import org.apache.fluss.metrics.Meter; +import org.apache.fluss.metrics.Metric; +import org.apache.fluss.metrics.groups.MetricGroup; +import org.apache.fluss.metrics.reporter.MetricReporter; + +import io.prometheus.client.Collector; +import io.prometheus.client.CollectorRegistry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.regex.Pattern; + +/* This file is based on source code of Apache Flink Project (https://flink.apache.org/), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. */ + +/** + * Base class for Prometheus metric reporters. Contains common logic for metric registration and + * collector management. + */ +public abstract class AbstractPrometheusReporter implements MetricReporter { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractPrometheusReporter.class); + + private static final Pattern UNALLOWED_CHAR_PATTERN = Pattern.compile("[^a-zA-Z0-9:_]"); + + private static final CharacterFilter CHARACTER_FILTER = + AbstractPrometheusReporter::replaceInvalidChars; + + @VisibleForTesting protected static final char SCOPE_SEPARATOR = '_'; + + @VisibleForTesting protected static final String SCOPE_PREFIX = "fluss" + SCOPE_SEPARATOR; + + private final Map> + collectorsWithCountByMetricName = new HashMap<>(); + + @VisibleForTesting protected final CollectorRegistry registry = new CollectorRegistry(true); + + protected static String replaceInvalidChars(final String input) { + // https://prometheus.io/docs/instrumenting/writing_exporters/ + // Only [a-zA-Z0-9:_] are valid in metric names, any other characters should be sanitized to + // an underscore. + return UNALLOWED_CHAR_PATTERN.matcher(input).replaceAll("_"); + } + + /** + * Configures the reporter. Subclasses should override this to add their own configuration. + * + * @param config the configuration + */ + @Override + public void open(Configuration config) { + // default no-op + } + + /** + * Closes the reporter and clears the registry. Subclasses should override to add their own + * cleanup logic and call super.close(). + */ + @Override + public void close() { + registry.clear(); + } + + /** + * Called when a new {@link Metric} was added. + * + * @param metric the metric that was added + * @param metricName the name of the metric + * @param group the group that contains the metric + */ + @Override + public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) { + List dimensionKeys = new LinkedList<>(); + List dimensionValues = new LinkedList<>(); + for (final Map.Entry dimension : group.getAllVariables().entrySet()) { + dimensionKeys.add(CHARACTER_FILTER.filterCharacters(dimension.getKey())); + dimensionValues.add(CHARACTER_FILTER.filterCharacters(dimension.getValue())); + } + + final String scopedMetricName = getScopedName(metricName, group); + final String helpString = metricName + " (scope: " + getLogicalScope(group) + ")"; + + final Collector collector; + Integer count = 0; + + synchronized (this) { + if (collectorsWithCountByMetricName.containsKey(scopedMetricName)) { + final AbstractMap.SimpleImmutableEntry collectorWithCount = + collectorsWithCountByMetricName.get(scopedMetricName); + collector = collectorWithCount.getKey(); + count = collectorWithCount.getValue(); + } else { + collector = + createCollector( + metric, + dimensionKeys, + dimensionValues, + scopedMetricName, + helpString); + try { + collector.register(registry); + } catch (Exception e) { + LOG.warn("There was a problem registering metric {}.", metricName, e); + } + } + addMetric(metric, dimensionValues, collector); + collectorsWithCountByMetricName.put( + scopedMetricName, new AbstractMap.SimpleImmutableEntry<>(collector, count + 1)); + } + } + + /** + * Called when a {@link Metric} was removed. + * + * @param metric the metric that should be removed + * @param metricName the name of the metric + * @param group the group that contains the metric + */ + @Override + public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) { + List dimensionValues = new LinkedList<>(); + for (final Map.Entry dimension : group.getAllVariables().entrySet()) { + dimensionValues.add(CHARACTER_FILTER.filterCharacters(dimension.getValue())); + } + + final String scopedMetricName = getScopedName(metricName, group); + synchronized (this) { + final AbstractMap.SimpleImmutableEntry collectorWithCount = + collectorsWithCountByMetricName.get(scopedMetricName); + final Integer count = collectorWithCount.getValue(); + final Collector collector = collectorWithCount.getKey(); + + removeMetric(metric, dimensionValues, collector); + + if (count == 1) { + try { + registry.unregister(collector); + } catch (Exception e) { + LOG.warn("There was a problem unregistering metric {}.", scopedMetricName, e); + } + collectorsWithCountByMetricName.remove(scopedMetricName); + } else { + collectorsWithCountByMetricName.put( + scopedMetricName, + new AbstractMap.SimpleImmutableEntry<>(collector, count - 1)); + } + } + } + + protected static String getScopedName(String metricName, MetricGroup group) { + return SCOPE_PREFIX + + getLogicalScope(group) + + SCOPE_SEPARATOR + + CHARACTER_FILTER.filterCharacters(metricName); + } + + protected static String getLogicalScope(MetricGroup group) { + return group.getLogicalScope(CHARACTER_FILTER, SCOPE_SEPARATOR); + } + + protected Collector createCollector( + Metric metric, + List dimensionKeys, + List dimensionValues, + String scopedMetricName, + String helpString) { + Collector collector; + switch (metric.getMetricType()) { + case GAUGE: + case COUNTER: + case METER: + collector = + io.prometheus.client.Gauge.build() + .name(scopedMetricName) + .help(helpString) + .labelNames(toArray(dimensionKeys)) + .create(); + break; + case HISTOGRAM: + collector = + new HistogramSummaryProxy( + (Histogram) metric, + scopedMetricName, + helpString, + dimensionKeys, + dimensionValues); + break; + default: + LOG.warn( + "Cannot create collector for unknown metric type: {}. This indicates that the metric type is not supported by this reporter.", + metric.getClass().getName()); + collector = null; + } + return collector; + } + + protected void addMetric(Metric metric, List dimensionValues, Collector collector) { + switch (metric.getMetricType()) { + case GAUGE: + ((io.prometheus.client.Gauge) collector) + .setChild(gaugeFrom((Gauge) metric), toArray(dimensionValues)); + break; + case COUNTER: + ((io.prometheus.client.Gauge) collector) + .setChild(gaugeFrom((Counter) metric), toArray(dimensionValues)); + break; + case METER: + ((io.prometheus.client.Gauge) collector) + .setChild(gaugeFrom((Meter) metric), toArray(dimensionValues)); + break; + case HISTOGRAM: + ((HistogramSummaryProxy) collector).addChild((Histogram) metric, dimensionValues); + break; + default: + LOG.warn( + "Cannot add unknown metric type: {}. This indicates that the metric type is not supported by this reporter.", + metric.getClass().getName()); + } + } + + protected void removeMetric(Metric metric, List dimensionValues, Collector collector) { + switch (metric.getMetricType()) { + case GAUGE: + case COUNTER: + case METER: + ((io.prometheus.client.Gauge) collector).remove(toArray(dimensionValues)); + break; + case HISTOGRAM: + ((HistogramSummaryProxy) collector).remove(dimensionValues); + break; + default: + LOG.warn( + "Cannot remove unknown metric type: {}. This indicates that the metric type is not supported by this reporter.", + metric.getClass().getName()); + } + } + + protected static io.prometheus.client.Gauge.Child gaugeFrom(Gauge gauge) { + return new io.prometheus.client.Gauge.Child() { + @Override + public double get() { + final Object value = gauge.getValue(); + if (value == null) { + LOG.debug("Gauge {} is null-valued, defaulting to 0.", gauge); + return 0; + } + if (value instanceof Double) { + return (double) value; + } + if (value instanceof Number) { + return ((Number) value).doubleValue(); + } + if (value instanceof Boolean) { + return ((Boolean) value) ? 1 : 0; + } + LOG.debug( + "Invalid type for Gauge {}: {}, only number types and booleans are supported by this reporter.", + gauge, + value.getClass().getName()); + return 0; + } + }; + } + + protected static io.prometheus.client.Gauge.Child gaugeFrom(Counter counter) { + return new io.prometheus.client.Gauge.Child() { + @Override + public double get() { + return (double) counter.getCount(); + } + }; + } + + protected static io.prometheus.client.Gauge.Child gaugeFrom(Meter meter) { + return new io.prometheus.client.Gauge.Child() { + @Override + public double get() { + return meter.getRate(); + } + }; + } + + @VisibleForTesting + static class HistogramSummaryProxy extends Collector { + static final List QUANTILES = Arrays.asList(.5, .75, .95, .98, .99, .999); + + private final String metricName; + private final String helpString; + private final List labelNamesWithQuantile; + + private final Map, Histogram> histogramsByLabelValues = new HashMap<>(); + + HistogramSummaryProxy( + final Histogram histogram, + final String metricName, + final String helpString, + final List labelNames, + final List labelValues) { + this.metricName = metricName; + this.helpString = helpString; + this.labelNamesWithQuantile = addToList(labelNames, "quantile"); + histogramsByLabelValues.put(labelValues, histogram); + } + + @Override + public List collect() { + // We cannot use SummaryMetricFamily because it is impossible to get a sum of all values + // (at least for Dropwizard histograms, + // whose snapshot's values array only holds a sample of recent values). + + List samples = new LinkedList<>(); + for (Map.Entry, Histogram> labelValuesToHistogram : + histogramsByLabelValues.entrySet()) { + addSamples( + labelValuesToHistogram.getKey(), + labelValuesToHistogram.getValue(), + samples); + } + return Collections.singletonList( + new MetricFamilySamples(metricName, Type.SUMMARY, helpString, samples)); + } + + void addChild(final Histogram histogram, final List labelValues) { + histogramsByLabelValues.put(labelValues, histogram); + } + + void remove(final List labelValues) { + histogramsByLabelValues.remove(labelValues); + } + + private void addSamples( + final List labelValues, + final Histogram histogram, + final List samples) { + samples.add( + new MetricFamilySamples.Sample( + metricName + "_count", + labelNamesWithQuantile.subList(0, labelNamesWithQuantile.size() - 1), + labelValues, + histogram.getCount())); + final HistogramStatistics statistics = histogram.getStatistics(); + for (final Double quantile : QUANTILES) { + samples.add( + new MetricFamilySamples.Sample( + metricName, + labelNamesWithQuantile, + addToList(labelValues, quantile.toString()), + statistics.getQuantile(quantile))); + } + } + } + + protected static List addToList(List list, String element) { + final List result = new ArrayList<>(list); + result.add(element); + return result; + } + + protected static String[] toArray(List list) { + return list.toArray(new String[0]); + } +} diff --git a/fluss-metrics/fluss-metrics-prometheus/src/main/java/org/apache/fluss/metrics/prometheus/PrometheusPushGatewayReporter.java b/fluss-metrics/fluss-metrics-prometheus/src/main/java/org/apache/fluss/metrics/prometheus/PrometheusPushGatewayReporter.java new file mode 100644 index 0000000000..f6102f9cdb --- /dev/null +++ b/fluss-metrics/fluss-metrics-prometheus/src/main/java/org/apache/fluss/metrics/prometheus/PrometheusPushGatewayReporter.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.metrics.prometheus; + +import org.apache.fluss.metrics.Metric; +import org.apache.fluss.metrics.reporter.ScheduledMetricReporter; + +import io.prometheus.client.exporter.PushGateway; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URL; +import java.time.Duration; +import java.util.Map; + +/** {@link ScheduledMetricReporter} that pushes {@link Metric Metrics} to Prometheus PushGateway. */ +public class PrometheusPushGatewayReporter extends AbstractPrometheusReporter + implements ScheduledMetricReporter { + + private static final Logger LOG = LoggerFactory.getLogger(PrometheusPushGatewayReporter.class); + + private final PushGateway pushGateway; + private final String jobName; + private final Map groupingKey; + private final boolean deleteOnShutdown; + private final Duration pushInterval; + + public PrometheusPushGatewayReporter( + URL hostUrl, + String jobName, + Map groupingKey, + final boolean deleteOnShutdown, + Duration pushInterval) { + this.pushGateway = new PushGateway(hostUrl); + this.jobName = jobName; + this.groupingKey = groupingKey; + this.deleteOnShutdown = deleteOnShutdown; + this.pushInterval = pushInterval; + } + + @Override + public void close() { + if (deleteOnShutdown) { + try { + pushGateway.delete(jobName, groupingKey); + LOG.info("Deleted metrics from PushGateway."); + } catch (IOException e) { + LOG.warn("Could not delete metrics from PushGateway.", e); + } + } + super.close(); + } + + @Override + public Duration scheduleInterval() { + return pushInterval; + } + + @Override + public void report() { + try { + pushGateway.push(registry, jobName, groupingKey); + } catch (IOException e) { + LOG.warn("Could not push metrics to PushGateway.", e); + } + } +} diff --git a/fluss-metrics/fluss-metrics-prometheus/src/main/java/org/apache/fluss/metrics/prometheus/PrometheusPushGatewayReporterPlugin.java b/fluss-metrics/fluss-metrics-prometheus/src/main/java/org/apache/fluss/metrics/prometheus/PrometheusPushGatewayReporterPlugin.java new file mode 100644 index 0000000000..5786a002cd --- /dev/null +++ b/fluss-metrics/fluss-metrics-prometheus/src/main/java/org/apache/fluss/metrics/prometheus/PrometheusPushGatewayReporterPlugin.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.metrics.prometheus; + +import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.metrics.reporter.MetricReporter; +import org.apache.fluss.metrics.reporter.MetricReporterPlugin; +import org.apache.fluss.utils.StringUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URL; +import java.time.Duration; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; + +import static org.apache.fluss.config.ConfigOptions.METRICS_REPORTER_PROMETHEUS_PUSHGATEWAY_DELETE_ON_SHUTDOWN; +import static org.apache.fluss.config.ConfigOptions.METRICS_REPORTER_PROMETHEUS_PUSHGATEWAY_GROUPING_KEY; +import static org.apache.fluss.config.ConfigOptions.METRICS_REPORTER_PROMETHEUS_PUSHGATEWAY_HOST_URL; +import static org.apache.fluss.config.ConfigOptions.METRICS_REPORTER_PROMETHEUS_PUSHGATEWAY_JOB_NAME; +import static org.apache.fluss.config.ConfigOptions.METRICS_REPORTER_PROMETHEUS_PUSHGATEWAY_PUSH_INTERVAL; +import static org.apache.fluss.config.ConfigOptions.METRICS_REPORTER_PROMETHEUS_PUSHGATEWAY_RANDOM_JOB_NAME_SUFFIX; + +/** {@link MetricReporterPlugin} for {@link PrometheusPushGatewayReporter}. */ +public class PrometheusPushGatewayReporterPlugin implements MetricReporterPlugin { + private static final Logger LOG = + LoggerFactory.getLogger(PrometheusPushGatewayReporterPlugin.class); + + private static final String PLUGIN_NAME = "prometheus-push"; + + @Override + public MetricReporter createMetricReporter(Configuration config) { + String hostUrl = config.get(METRICS_REPORTER_PROMETHEUS_PUSHGATEWAY_HOST_URL); + String configuredJobName = config.get(METRICS_REPORTER_PROMETHEUS_PUSHGATEWAY_JOB_NAME); + boolean deleteOnShutdown = + config.get(METRICS_REPORTER_PROMETHEUS_PUSHGATEWAY_DELETE_ON_SHUTDOWN); + boolean randomSuffix = + config.get(METRICS_REPORTER_PROMETHEUS_PUSHGATEWAY_RANDOM_JOB_NAME_SUFFIX); + Duration pushInterval = config.get(METRICS_REPORTER_PROMETHEUS_PUSHGATEWAY_PUSH_INTERVAL); + String jobName = configuredJobName; + if (randomSuffix) { + jobName = configuredJobName + new Random().nextLong(); + } + Map groupingKey = + parseGroupingKey(config.get(METRICS_REPORTER_PROMETHEUS_PUSHGATEWAY_GROUPING_KEY)); + LOG.info( + "Configured PrometheusPushGatewayReporter with {hostUrl:{}, jobName:{}, randomJobNameSuffix:{}, deleteOnShutdown:{}, groupingKey:{}, pushInterval:{}}", + hostUrl, + jobName, + randomSuffix, + deleteOnShutdown, + groupingKey, + pushInterval); + try { + return new PrometheusPushGatewayReporter( + new URL(hostUrl), jobName, groupingKey, deleteOnShutdown, pushInterval); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public String identifier() { + return PLUGIN_NAME; + } + + @VisibleForTesting + static Map parseGroupingKey(final String groupingKeyConfig) { + if (!groupingKeyConfig.isEmpty()) { + Map groupingKey = new HashMap<>(); + String[] kvs = groupingKeyConfig.split(";"); + for (String kv : kvs) { + int idx = kv.indexOf("="); + if (idx < 0) { + LOG.warn("Invalid prometheusPushGateway groupingKey:{}, will be ignored", kv); + continue; + } + + String labelKey = kv.substring(0, idx); + String labelValue = kv.substring(idx + 1); + if (StringUtils.isNullOrWhitespaceOnly(labelKey) + || StringUtils.isNullOrWhitespaceOnly(labelValue)) { + LOG.warn( + "Invalid groupingKey {labelKey:{}, labelValue:{}} must not be empty", + labelKey, + labelValue); + continue; + } + groupingKey.put(labelKey, labelValue); + } + + return groupingKey; + } + + return Collections.emptyMap(); + } +} diff --git a/fluss-metrics/fluss-metrics-prometheus/src/main/java/org/apache/fluss/metrics/prometheus/PrometheusReporter.java b/fluss-metrics/fluss-metrics-prometheus/src/main/java/org/apache/fluss/metrics/prometheus/PrometheusReporter.java index 1f0925aeaa..358330b6ed 100644 --- a/fluss-metrics/fluss-metrics-prometheus/src/main/java/org/apache/fluss/metrics/prometheus/PrometheusReporter.java +++ b/fluss-metrics/fluss-metrics-prometheus/src/main/java/org/apache/fluss/metrics/prometheus/PrometheusReporter.java @@ -17,59 +17,23 @@ package org.apache.fluss.metrics.prometheus; -import org.apache.fluss.annotation.VisibleForTesting; -import org.apache.fluss.config.Configuration; -import org.apache.fluss.metrics.CharacterFilter; -import org.apache.fluss.metrics.Counter; -import org.apache.fluss.metrics.Gauge; -import org.apache.fluss.metrics.Histogram; -import org.apache.fluss.metrics.HistogramStatistics; -import org.apache.fluss.metrics.Meter; import org.apache.fluss.metrics.Metric; -import org.apache.fluss.metrics.groups.MetricGroup; -import org.apache.fluss.metrics.reporter.MetricReporter; -import io.prometheus.client.Collector; -import io.prometheus.client.CollectorRegistry; import io.prometheus.client.exporter.HTTPServer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.InetSocketAddress; -import java.util.AbstractMap; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.regex.Pattern; import static org.apache.fluss.utils.Preconditions.checkState; -/* This file is based on source code of Apache Flink Project (https://flink.apache.org/), licensed by the Apache - * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for - * additional information regarding copyright ownership. */ - -/** {@link MetricReporter} that exports {@link Metric Metrics} via Prometheus. */ -public class PrometheusReporter implements MetricReporter { +/** {@link PrometheusReporter} that exports {@link Metric} via Prometheus HTTP server. */ +public class PrometheusReporter extends AbstractPrometheusReporter { private static final Logger LOG = LoggerFactory.getLogger(PrometheusReporter.class); - private static final Pattern UNALLOWED_CHAR_PATTERN = Pattern.compile("[^a-zA-Z0-9:_]"); - private static final CharacterFilter CHARACTER_FILTER = PrometheusReporter::replaceInvalidChars; - - @VisibleForTesting static final char SCOPE_SEPARATOR = '_'; - @VisibleForTesting static final String SCOPE_PREFIX = "fluss" + SCOPE_SEPARATOR; - - private final Map> - collectorsWithCountByMetricName = new HashMap<>(); - - @VisibleForTesting final CollectorRegistry registry = new CollectorRegistry(true); - private HTTPServer httpServer; private int port; @@ -97,310 +61,11 @@ int getPort() { } } - static String replaceInvalidChars(final String input) { - // https://prometheus.io/docs/instrumenting/writing_exporters/ - // Only [a-zA-Z0-9:_] are valid in metric names, any other characters should be sanitized to - // an underscore. - return UNALLOWED_CHAR_PATTERN.matcher(input).replaceAll("_"); - } - @Override - public void open(Configuration config) { - // do nothing now; - } - public void close() { if (httpServer != null) { httpServer.stop(); } - registry.clear(); - } - - @Override - public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) { - List dimensionKeys = new LinkedList<>(); - List dimensionValues = new LinkedList<>(); - for (final Map.Entry dimension : group.getAllVariables().entrySet()) { - final String key = dimension.getKey(); - dimensionKeys.add(CHARACTER_FILTER.filterCharacters(key)); - dimensionValues.add(CHARACTER_FILTER.filterCharacters(dimension.getValue())); - } - - final String scopedMetricName = getScopedName(metricName, group); - final String helpString = metricName + " (scope: " + getLogicalScope(group) + ")"; - - final Collector collector; - Integer count = 0; - - synchronized (this) { - if (collectorsWithCountByMetricName.containsKey(scopedMetricName)) { - final AbstractMap.SimpleImmutableEntry collectorWithCount = - collectorsWithCountByMetricName.get(scopedMetricName); - collector = collectorWithCount.getKey(); - count = collectorWithCount.getValue(); - } else { - collector = - createCollector( - metric, - dimensionKeys, - dimensionValues, - scopedMetricName, - helpString); - try { - collector.register(registry); - } catch (Exception e) { - LOG.warn("There was a problem registering metric {}.", metricName, e); - } - } - addMetric(metric, dimensionValues, collector); - collectorsWithCountByMetricName.put( - scopedMetricName, new AbstractMap.SimpleImmutableEntry<>(collector, count + 1)); - } - } - - private static String getScopedName(String metricName, MetricGroup group) { - return SCOPE_PREFIX - + getLogicalScope(group) - + SCOPE_SEPARATOR - + CHARACTER_FILTER.filterCharacters(metricName); - } - - private Collector createCollector( - Metric metric, - List dimensionKeys, - List dimensionValues, - String scopedMetricName, - String helpString) { - Collector collector; - switch (metric.getMetricType()) { - case GAUGE: - case COUNTER: - case METER: - collector = - io.prometheus.client.Gauge.build() - .name(scopedMetricName) - .help(helpString) - .labelNames(toArray(dimensionKeys)) - .create(); - break; - case HISTOGRAM: - collector = - new HistogramSummaryProxy( - (Histogram) metric, - scopedMetricName, - helpString, - dimensionKeys, - dimensionValues); - break; - default: - LOG.warn( - "Cannot create collector for unknown metric type: {}. This indicates that the metric type is not supported by this reporter.", - metric.getClass().getName()); - collector = null; - } - return collector; - } - - private void addMetric(Metric metric, List dimensionValues, Collector collector) { - switch (metric.getMetricType()) { - case GAUGE: - ((io.prometheus.client.Gauge) collector) - .setChild(gaugeFrom((Gauge) metric), toArray(dimensionValues)); - break; - case COUNTER: - ((io.prometheus.client.Gauge) collector) - .setChild(gaugeFrom((Counter) metric), toArray(dimensionValues)); - break; - case METER: - ((io.prometheus.client.Gauge) collector) - .setChild(gaugeFrom((Meter) metric), toArray(dimensionValues)); - break; - case HISTOGRAM: - ((HistogramSummaryProxy) collector).addChild((Histogram) metric, dimensionValues); - break; - default: - LOG.warn( - "Cannot add unknown metric type: {}. This indicates that the metric type is not supported by this reporter.", - metric.getClass().getName()); - } - } - - private void removeMetric(Metric metric, List dimensionValues, Collector collector) { - switch (metric.getMetricType()) { - case GAUGE: - case COUNTER: - case METER: - ((io.prometheus.client.Gauge) collector).remove(toArray(dimensionValues)); - break; - case HISTOGRAM: - ((HistogramSummaryProxy) collector).remove(dimensionValues); - break; - default: - LOG.warn( - "Cannot remove unknown metric type: {}. This indicates that the metric type is not supported by this reporter.", - metric.getClass().getName()); - } - } - - @Override - public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) { - - List dimensionValues = new LinkedList<>(); - for (final Map.Entry dimension : group.getAllVariables().entrySet()) { - dimensionValues.add(CHARACTER_FILTER.filterCharacters(dimension.getValue())); - } - - final String scopedMetricName = getScopedName(metricName, group); - synchronized (this) { - final AbstractMap.SimpleImmutableEntry collectorWithCount = - collectorsWithCountByMetricName.get(scopedMetricName); - final Integer count = collectorWithCount.getValue(); - final Collector collector = collectorWithCount.getKey(); - - removeMetric(metric, dimensionValues, collector); - - if (count == 1) { - try { - registry.unregister(collector); - } catch (Exception e) { - LOG.warn("There was a problem unregistering metric {}.", scopedMetricName, e); - } - collectorsWithCountByMetricName.remove(scopedMetricName); - } else { - collectorsWithCountByMetricName.put( - scopedMetricName, - new AbstractMap.SimpleImmutableEntry<>(collector, count - 1)); - } - } - } - - private static String getLogicalScope(MetricGroup group) { - return group.getLogicalScope(CHARACTER_FILTER, SCOPE_SEPARATOR); - } - - @VisibleForTesting - io.prometheus.client.Gauge.Child gaugeFrom(Gauge gauge) { - return new io.prometheus.client.Gauge.Child() { - @Override - public double get() { - final Object value = gauge.getValue(); - if (value == null) { - LOG.debug("Gauge {} is null-valued, defaulting to 0.", gauge); - return 0; - } - if (value instanceof Double) { - return (double) value; - } - if (value instanceof Number) { - return ((Number) value).doubleValue(); - } - if (value instanceof Boolean) { - return ((Boolean) value) ? 1 : 0; - } - LOG.debug( - "Invalid type for Gauge {}: {}, only number types and booleans are supported by this reporter.", - gauge, - value.getClass().getName()); - return 0; - } - }; - } - - private static io.prometheus.client.Gauge.Child gaugeFrom(Counter counter) { - return new io.prometheus.client.Gauge.Child() { - @Override - public double get() { - return (double) counter.getCount(); - } - }; - } - - private static io.prometheus.client.Gauge.Child gaugeFrom(Meter meter) { - return new io.prometheus.client.Gauge.Child() { - @Override - public double get() { - return meter.getRate(); - } - }; - } - - @VisibleForTesting - static class HistogramSummaryProxy extends Collector { - static final List QUANTILES = Arrays.asList(.5, .75, .95, .98, .99, .999); - - private final String metricName; - private final String helpString; - private final List labelNamesWithQuantile; - - private final Map, Histogram> histogramsByLabelValues = new HashMap<>(); - - HistogramSummaryProxy( - final Histogram histogram, - final String metricName, - final String helpString, - final List labelNames, - final List labelValues) { - this.metricName = metricName; - this.helpString = helpString; - this.labelNamesWithQuantile = addToList(labelNames, "quantile"); - histogramsByLabelValues.put(labelValues, histogram); - } - - @Override - public List collect() { - // We cannot use SummaryMetricFamily because it is impossible to get a sum of all values - // (at least for Dropwizard histograms, - // whose snapshot's values array only holds a sample of recent values). - - List samples = new LinkedList<>(); - for (Map.Entry, Histogram> labelValuesToHistogram : - histogramsByLabelValues.entrySet()) { - addSamples( - labelValuesToHistogram.getKey(), - labelValuesToHistogram.getValue(), - samples); - } - return Collections.singletonList( - new MetricFamilySamples(metricName, Type.SUMMARY, helpString, samples)); - } - - void addChild(final Histogram histogram, final List labelValues) { - histogramsByLabelValues.put(labelValues, histogram); - } - - void remove(final List labelValues) { - histogramsByLabelValues.remove(labelValues); - } - - private void addSamples( - final List labelValues, - final Histogram histogram, - final List samples) { - samples.add( - new MetricFamilySamples.Sample( - metricName + "_count", - labelNamesWithQuantile.subList(0, labelNamesWithQuantile.size() - 1), - labelValues, - histogram.getCount())); - final HistogramStatistics statistics = histogram.getStatistics(); - for (final Double quantile : QUANTILES) { - samples.add( - new MetricFamilySamples.Sample( - metricName, - labelNamesWithQuantile, - addToList(labelValues, quantile.toString()), - statistics.getQuantile(quantile))); - } - } - } - - private static List addToList(List list, String element) { - final List result = new ArrayList<>(list); - result.add(element); - return result; - } - - private static String[] toArray(List list) { - return list.toArray(new String[0]); + super.close(); } } diff --git a/fluss-metrics/fluss-metrics-prometheus/src/main/java/org/apache/fluss/metrics/prometheus/PrometheusReporterPlugin.java b/fluss-metrics/fluss-metrics-prometheus/src/main/java/org/apache/fluss/metrics/prometheus/PrometheusReporterPlugin.java index 217444948c..7e133e1aa3 100644 --- a/fluss-metrics/fluss-metrics-prometheus/src/main/java/org/apache/fluss/metrics/prometheus/PrometheusReporterPlugin.java +++ b/fluss-metrics/fluss-metrics-prometheus/src/main/java/org/apache/fluss/metrics/prometheus/PrometheusReporterPlugin.java @@ -25,10 +25,6 @@ import java.util.Iterator; -/* This file is based on source code of Apache Flink Project (https://flink.apache.org/), licensed by the Apache - * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for - * additional information regarding copyright ownership. */ - /** {@link MetricReporterPlugin} for {@link PrometheusReporter}. */ public class PrometheusReporterPlugin implements MetricReporterPlugin { diff --git a/fluss-metrics/fluss-metrics-prometheus/src/main/resources/META-INF/NOTICE b/fluss-metrics/fluss-metrics-prometheus/src/main/resources/META-INF/NOTICE index acb20d591d..72854e435a 100644 --- a/fluss-metrics/fluss-metrics-prometheus/src/main/resources/META-INF/NOTICE +++ b/fluss-metrics/fluss-metrics-prometheus/src/main/resources/META-INF/NOTICE @@ -8,4 +8,5 @@ This project bundles the following dependencies under the Apache Software Licens - io.prometheus:simpleclient:0.8.1 - io.prometheus:simpleclient_common:0.8.1 -- io.prometheus:simpleclient_httpserver:0.8.1 \ No newline at end of file +- io.prometheus:simpleclient_httpserver:0.8.1 +- io.prometheus:simpleclient_pushgateway:0.8.1 \ No newline at end of file diff --git a/fluss-metrics/fluss-metrics-prometheus/src/main/resources/META-INF/services/org.apache.fluss.metrics.reporter.MetricReporterPlugin b/fluss-metrics/fluss-metrics-prometheus/src/main/resources/META-INF/services/org.apache.fluss.metrics.reporter.MetricReporterPlugin index 8061791078..af91cddb75 100644 --- a/fluss-metrics/fluss-metrics-prometheus/src/main/resources/META-INF/services/org.apache.fluss.metrics.reporter.MetricReporterPlugin +++ b/fluss-metrics/fluss-metrics-prometheus/src/main/resources/META-INF/services/org.apache.fluss.metrics.reporter.MetricReporterPlugin @@ -18,3 +18,4 @@ org.apache.fluss.metrics.prometheus.PrometheusReporterPlugin +org.apache.fluss.metrics.prometheus.PrometheusPushGatewayReporterPlugin diff --git a/fluss-metrics/fluss-metrics-prometheus/src/test/java/org/apache/fluss/metrics/prometheus/PrometheusPushGatewayReporterPluginTest.java b/fluss-metrics/fluss-metrics-prometheus/src/test/java/org/apache/fluss/metrics/prometheus/PrometheusPushGatewayReporterPluginTest.java new file mode 100644 index 0000000000..7dba3718e2 --- /dev/null +++ b/fluss-metrics/fluss-metrics-prometheus/src/test/java/org/apache/fluss/metrics/prometheus/PrometheusPushGatewayReporterPluginTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.metrics.prometheus; + +import org.junit.jupiter.api.Test; + +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/* This file is based on source code of Apache Flink Project (https://flink.apache.org/), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. */ + +/** Test for {@link PrometheusPushGatewayReporterPlugin}. */ +public class PrometheusPushGatewayReporterPluginTest { + + @Test + void testParseGroupingKey() { + Map groupingKey = + PrometheusPushGatewayReporterPlugin.parseGroupingKey("k1=v1;k2=v2"); + assertThat(groupingKey).containsEntry("k1", "v1"); + assertThat(groupingKey).containsEntry("k2", "v2"); + } + + @Test + void testParseIncompleteGroupingKey() { + Map groupingKey = + PrometheusPushGatewayReporterPlugin.parseGroupingKey("k1="); + assertThat(groupingKey).isEmpty(); + + groupingKey = PrometheusPushGatewayReporterPlugin.parseGroupingKey("=v1"); + assertThat(groupingKey).isEmpty(); + + groupingKey = PrometheusPushGatewayReporterPlugin.parseGroupingKey("k1"); + assertThat(groupingKey).isEmpty(); + } +} diff --git a/website/docs/maintenance/configuration.md b/website/docs/maintenance/configuration.md index 97da3457d0..39c350d0e2 100644 --- a/website/docs/maintenance/configuration.md +++ b/website/docs/maintenance/configuration.md @@ -162,12 +162,19 @@ during the Fluss cluster working. ## Metrics -| Option | Type | Default | Description | -|----------------------------------|--------|---------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| metrics.reporters | List | (None) | An optional list of reporter names. If configured, only reporters whose name matches in the list will be started | -| metrics.reporter.prometheus.port | String | 9249 | The port the Prometheus reporter listens on. In order to be able to run several instances of the reporter on one host (e.g. when one TabletServer is colocated with the CoordinatorServer) it is advisable to use a port range like 9250-9260. | -| metrics.reporter.jmx.port | String | (None) | The port for the JMXServer that JMX clients can connect to. If not set, the JMXServer won't start. In order to be able to run several instances of the reporter on one host (e.g. when one TabletServer is colocated with the CoordinatorServer) it is advisable to use a port range like 9990-9999. | - +More metrics example could be found in [Observability - Metric Reporters](observability/metric-reporters.md). + +| Option | Type | Default | Description | +|---------------------------------------------------------|----------|------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| metrics.reporters | List | (None) | An optional list of reporter names. If configured, only reporters whose name matches in the list will be started | +| metrics.reporter.prometheus.port | String | 9249 | The port the Prometheus reporter listens on. In order to be able to run several instances of the reporter on one host (e.g. when one TabletServer is colocated with the CoordinatorServer) it is advisable to use a port range like 9250-9260. | +| metrics.reporter.jmx.port | String | (None) | The port for the JMXServer that JMX clients can connect to. If not set, the JMXServer won't start. In order to be able to run several instances of the reporter on one host (e.g. when one TabletServer is colocated with the CoordinatorServer) it is advisable to use a port range like 9990-9999. | +| metrics.reporter.prometheus-push.host-url | String | (None) | The PushGateway server host URL including scheme, host name, and port | +| metrics.reporter.prometheus-push.job-name | String | (None) | The job name under which metrics will be pushed | +| metrics.reporter.prometheus-push.push-interval | String | 10 seconds | The interval of pushing metrics to Prometheus PushGateway, defaults to 10 SECONDS. | +| metrics.reporter.prometheus-push.random-job-name-suffix | Boolean | true | Specifies whether a random suffix should be appended to the job name, defaults to true. This is useful when multiple instances of the reporter are running on the same host. | +| metrics.reporter.prometheus-push.delete-on-shutdown | Boolean | true | Specifies whether to delete metrics from the PushGateway on shutdown, defaults to true. Fluss will try its best to delete the metrics but this is not guaranteed. | +| metrics.reporter.prometheus-push.grouping-key | String | (None) | Specifies the grouping key which is the group and global labels of all metrics. The label name and value are separated by '=', and labels are separated by ';', e.g., k1=v1;k2=v2. | ## Lakehouse | Option | Type | Default | Description | diff --git a/website/docs/maintenance/observability/metric-reporters.md b/website/docs/maintenance/observability/metric-reporters.md index 48f94770f8..43ce9473b0 100644 --- a/website/docs/maintenance/observability/metric-reporters.md +++ b/website/docs/maintenance/observability/metric-reporters.md @@ -79,4 +79,29 @@ Fluss metric types are mapped to Prometheus metric types as follows: | Counter | Gauge |Prometheus counters cannot be decremented.| | Gauge | Gauge |Only numbers and booleans are supported. | | Histogram | Summary |Quantiles .5, .75, .95, .98, .99 and .999 | -| Meter | Gauge |The gauge exports the meter's rate. | +| Meter | Gauge |The gauge exports the meter's rate. + +### PrometheusPushGateway + +Type: push + +Parameters: + +- `metrics.reporter.prometheus-push.host-url` - The PushGateway server host URL including scheme, host name, and port. +- `metrics.reporter.prometheus-push.job-name` - The job name under which metrics will be pushed. +- `metrics.reporter.prometheus-push.push-interval` - (Optional) The interval of pushing metrics to Prometheus PushGateway, defaults to 10 SECONDS. +- `metrics.reporter.prometheus-push.random-job-name-suffix` - (Optional) Specifies whether a random suffix should be appended to the job name, defaults to true. This is useful when multiple instances of the reporter are running on the same host. +- `metrics.reporter.prometheus-push.delete-on-shutdown` - (Optional) Specifies whether to delete metrics from the PushGateway on shutdown, defaults to true. Fluss will try its best to delete the metrics but this is not guaranteed. +- `metrics.reporter.prometheus-push.grouping-key` - Specifies the grouping key which is the group and global labels of all metrics. The label name and value are separated by `=`, and labels are separated by `;`, e.g., `k1=v1;k2=v2`. + +Example configuration: + +```yaml +metrics.reporters: prometheus-push +metrics.reporter.prometheus-push.host-url: http://localhost:9091 +metrics.reporter.prometheus-push.job-name: fluss-tablet-server +metrics.reporter.prometheus-push.push-interval: 10 SECONDS +metrics.reporter.prometheus-push.random-job-name-suffix: true +metrics.reporter.prometheus-push.delete-on-shutdown: true +metrics.reporter.prometheus-push.grouping-key: instance=instance01;cluster=clusterA +```