Skip to content

Commit

Permalink
[Metrics] Add support for splitting topic and partition label in Prom…
Browse files Browse the repository at this point in the history
…etheus (#12225)

* [Metrics] Add support for splitting topic and partition label in Prometheus

Fix: #11432

Currently, we are only expose the partition name for the topic label in Prometheus metrics, which is difficult to
have an aggregated metrics for a partitioned topic.

Before this change, we can only get (topic=xxx-partition-0) in the metrics. After this change, we can get 2 labels (topic=xxx, partition=0).
By default, the broker expose the single tag for topic. It need to change `splitTopicAndPartitionLabelInPrometheus=true` in the broker.conf

New tests added.

* Fix checkstyle.
  • Loading branch information
codelipenghui authored Sep 30, 2021
1 parent 0a2c895 commit 039079e
Show file tree
Hide file tree
Showing 9 changed files with 303 additions and 149 deletions.
9 changes: 9 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1192,6 +1192,15 @@ statsUpdateInitialDelayInSecs=60
# Default is false.
exposePreciseBacklogInPrometheus=false

# Enable splitting topic and partition label in Prometheus.
# If enabled, a topic name will split into 2 parts, one is topic name without partition index,
# another one is partition index, e.g. (topic=xxx, partition=0).
# If the topic is a non-partitioned topic, -1 will be used for the partition index.
# If disabled, one label to represent the topic and partition, e.g. (topic=xxx-partition-0)
# Default is false.

splitTopicAndPartitionLabelInPrometheus=false

### --- Schema storage --- ###
# The schema storage implementation used by this broker
schemaRegistryStorageClassName=org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory
Expand Down
9 changes: 9 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -860,6 +860,15 @@ exposePublisherStats=true
# Default is false.
exposePreciseBacklogInPrometheus=false

# Enable splitting topic and partition label in Prometheus.
# If enabled, a topic name will split into 2 parts, one is topic name without partition index,
# another one is partition index, e.g. (topic=xxx, partition=0).
# If the topic is a non-partitioned topic, -1 will be used for the partition index.
# If disabled, one label to represent the topic and partition, e.g. (topic=xxx-partition-0)
# Default is false.

splitTopicAndPartitionLabelInPrometheus=false

### --- Deprecated config variables --- ###

# Deprecated. Use configurationStoreServers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2045,6 +2045,17 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private boolean exposeSubscriptionBacklogSizeInPrometheus = false;

@FieldContext(
category = CATEGORY_METRICS,
doc = "Enable splitting topic and partition label in Prometheus.\n" +
" If enabled, a topic name will split into 2 parts, one is topic name without partition index,\n" +
" another one is partition index, e.g. (topic=xxx, partition=0).\n" +
" If the topic is a non-partitioned topic, -1 will be used for the partition index.\n" +
" If disabled, one label to represent the topic and partition, e.g. (topic=xxx-partition-0)\n" +
" Default is false."
)
private boolean splitTopicAndPartitionLabelInPrometheus = false;

/**** --- Functions --- ****/
@FieldContext(
category = CATEGORY_FUNCTIONS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -661,7 +661,8 @@ config, localMetadataStore, getZkClient(),
this.metricsServlet = new PrometheusMetricsServlet(
this, config.isExposeTopicLevelMetricsInPrometheus(),
config.isExposeConsumerLevelMetricsInPrometheus(),
config.isExposeProducerLevelMetricsInPrometheus());
config.isExposeProducerLevelMetricsInPrometheus(),
config.isSplitTopicAndPartitionLabelInPrometheus());
if (pendingMetricsProviders != null) {
pendingMetricsProviders.forEach(provider -> metricsServlet.addRawMetricsProvider(provider));
this.pendingMetricsProviders = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ protected TopicStats initialValue() throws Exception {
};

public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics,
boolean includeProducerMetrics, SimpleTextOutputStream stream) {
boolean includeProducerMetrics, boolean splitTopicAndPartitionIndexLabel, SimpleTextOutputStream stream) {
String cluster = pulsar.getConfiguration().getClusterName();
AggregatedNamespaceStats namespaceStats = localNamespaceStats.get();
TopicStats.resetTypes();
Expand All @@ -81,7 +81,8 @@ public static void generate(PulsarService pulsar, boolean includeTopicMetrics, b

if (includeTopicMetrics) {
topicsCount.add(1);
TopicStats.printTopicStats(stream, cluster, namespace, name, topicStats, compactorMXBean);
TopicStats.printTopicStats(stream, cluster, namespace, name, topicStats, compactorMXBean,
splitTopicAndPartitionIndexLabel);
} else {
namespaceStats.updateStats(topicStats);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,19 @@ public double get() {

public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics,
boolean includeProducerMetrics, OutputStream out) throws IOException {
generate(pulsar, includeTopicMetrics, includeConsumerMetrics, includeProducerMetrics, out, null);
generate(pulsar, includeTopicMetrics, includeConsumerMetrics, includeProducerMetrics, false, out, null);
}

public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics,
boolean includeProducerMetrics, OutputStream out, List<PrometheusRawMetricsProvider> metricsProviders)
boolean includeProducerMetrics, boolean splitTopicAndPartitionIndexLabel,
OutputStream out) throws IOException {
generate(pulsar, includeTopicMetrics, includeConsumerMetrics, includeProducerMetrics,
splitTopicAndPartitionIndexLabel, out, null);
}

public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics,
boolean includeProducerMetrics, boolean splitTopicAndPartitionIndexLabel, OutputStream out,
List<PrometheusRawMetricsProvider> metricsProviders)
throws IOException {
ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer();
try {
Expand All @@ -100,7 +108,7 @@ public static void generate(PulsarService pulsar, boolean includeTopicMetrics, b
generateSystemMetrics(stream, pulsar.getConfiguration().getClusterName());

NamespaceStatsAggregator.generate(pulsar, includeTopicMetrics, includeConsumerMetrics,
includeProducerMetrics, stream);
includeProducerMetrics, splitTopicAndPartitionIndexLabel, stream);

if (pulsar.getWorkerServiceOpt().isPresent()) {
pulsar.getWorkerService().generateFunctionsStats(stream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,19 @@ public class PrometheusMetricsServlet extends HttpServlet {
private final boolean shouldExportConsumerMetrics;
private final boolean shouldExportProducerMetrics;
private final long metricsServletTimeoutMs;
private final boolean splitTopicAndPartitionLabel;
private List<PrometheusRawMetricsProvider> metricsProviders;

private ExecutorService executor = null;

public PrometheusMetricsServlet(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics,
boolean shouldExportProducerMetrics) {
boolean shouldExportProducerMetrics, boolean splitTopicAndPartitionLabel) {
this.pulsar = pulsar;
this.shouldExportTopicMetrics = includeTopicMetrics;
this.shouldExportConsumerMetrics = includeConsumerMetrics;
this.shouldExportProducerMetrics = shouldExportProducerMetrics;
this.metricsServletTimeoutMs = pulsar.getConfiguration().getMetricsServletTimeoutMs();
this.splitTopicAndPartitionLabel = splitTopicAndPartitionLabel;
}

@Override
Expand All @@ -73,7 +75,8 @@ protected void doGet(HttpServletRequest request, HttpServletResponse response)
res.setStatus(HttpStatus.OK_200);
res.setContentType("text/plain");
PrometheusMetricsGenerator.generate(pulsar, shouldExportTopicMetrics, shouldExportConsumerMetrics,
shouldExportProducerMetrics, res.getOutputStream(), metricsProviders);
shouldExportProducerMetrics, splitTopicAndPartitionLabel, res.getOutputStream(),
metricsProviders);
context.complete();

} catch (Exception e) {
Expand Down
Loading

0 comments on commit 039079e

Please sign in to comment.