This repository has been archived by the owner on Mar 17, 2024. It is now read-only.

Revert cluster labels (#79)
seglo authored Sep 23, 2019
1 parent 2f8a46d commit c55080f
Showing 4 changed files with 52 additions and 66 deletions.
9 changes: 3 additions & 6 deletions README.md
@@ -97,7 +97,7 @@ The latest offset available for topic partition. Kafka Lag Exporter will calcul

### Labels

-Each metric may include the following labels when reported. If you define the labels property for Configuration of a cluster then those labels will also be included.
+Each metric may include the following labels when reported.

* `cluster_name` - Either the statically defined Kafka cluster name, or the metadata.name of the Strimzi Kafka cluster that was discovered with the Strimzi auto discovery feature.
* `topic` - The Kafka topic.
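
For illustration, here is a minimal sketch of how label names and values flow through the Prometheus Java client that this project uses. The metric name, help text, and label values below are hypothetical, not taken from this commit:

```scala
import io.prometheus.client.{CollectorRegistry, Gauge}

object LabelSketch extends App {
  val registry = new CollectorRegistry()

  // Label names mirror two of the labels documented above.
  val lag: Gauge = Gauge.build()
    .name("kafka_consumergroup_group_lag") // hypothetical metric name
    .help("Example gauge for label illustration.")
    .labelNames("cluster_name", "topic")
    .register(registry)

  // Each distinct combination of label values becomes its own time series.
  lag.labels("my-cluster", "orders").set(42)
}
```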
@@ -198,6 +198,7 @@ kubectl logs {POD_ID} --namespace myproject -f
To run the project in standalone mode you must first define a configuration `application.conf`. This configuration must
contain at least connection info to your Kafka cluster (`kafka-lag-exporter.clusters`). All other configuration has
defaults defined in the project itself. See [`reference.conf`](./src/main/resources/reference.conf) for defaults.
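
As background, the `application.conf`/`reference.conf` pairing above is Typesafe Config's resolution model: settings in `application.conf` layer over the bundled `reference.conf` defaults, which is why only cluster connection info is mandatory. A minimal sketch of that lookup; the `name` and `bootstrap-brokers` key names are assumptions for illustration:

```scala
import com.typesafe.config.{Config, ConfigFactory}
import scala.jdk.CollectionConverters._

object ConfigSketch extends App {
  // load() layers application.conf over reference.conf defaults.
  val config: Config = ConfigFactory.load()
  val clusters = config.getConfigList("kafka-lag-exporter.clusters").asScala
  clusters.foreach { cluster =>
    // Key names assumed for this sketch.
    println(s"${cluster.getString("name")} -> ${cluster.getString("bootstrap-brokers")}")
  }
}
```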

### Configuration

General Configuration (`kafka-lag-exporter{}`)
@@ -223,7 +224,7 @@ Kafka Cluster Connection Details (`kafka-lag-exporter.clusters[]`)
| `topic-whitelist` | `[".*"]` | No | A list of regexes of topics to monitor. For example, to expose only certain topics, use either `["^topic.+"]` or `["topic1", "topic2"]`. |
| `consumer-properties` | `{}` | No | A map of key value pairs used to configure the `KafkaConsumer`. See the [Consumer Config](https://kafka.apache.org/documentation/#consumerconfigs) section of the Kafka documentation for options. |
| `admin-client-properties` | `{}` | No | A map of key value pairs used to configure the `AdminClient`. See the [Admin Config](https://kafka.apache.org/documentation/#adminclientconfigs) section of the Kafka documentation for options. |
-| `labels` | `{}` | No | A map of key value pairs will be set as additional custom labels per cluster for all the metrics in prometheus. |
+| ~~`labels`~~ | `{}` | No | Disabled until there's a resolution in [#78](https://github.com/lightbend/kafka-lag-exporter/pull/78) ~~A map of key value pairs will be set as additional custom labels per cluster for all the metrics in prometheus.~~ |
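
To make the `consumer-properties` row concrete, here is a hypothetical sketch of how such a map could be applied when building a `KafkaConsumer`. This is illustrative only, not this project's code, and all values are placeholders:

```scala
import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer

object ConsumerPropsSketch extends App {
  // A consumer-properties style map (placeholder values).
  val consumerProperties = Map("client.id" -> "consumer-client-id")

  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("group.id", "sketch-group")
  props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")

  // Entries pass straight through as KafkaConsumer configuration.
  consumerProperties.foreach { case (k, v) => props.put(k, v) }

  val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
  consumer.close()
}
```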

Watchers (`kafka-lag-exporter.watchers{}`)

@@ -252,10 +253,6 @@ kafka-lag-exporter {
      admin-client-properties = {
        client.id = "admin-client-id"
      }
-      labels = {
-        location = "ny"
-        zone = "us-east"
-      }
    }
  ]
}
3 changes: 0 additions & 3 deletions charts/kafka-lag-exporter/values.yaml
@@ -20,9 +20,6 @@ clusters: {}
#      security.protocol: SSL
#      ssl.truststore.location: /path/to/my.truststore.jks
#      ssl.trustore.password: mypwd
-#    labels:
-#      location: ny
-#      zone: "us-east"

## The interval between refreshing metrics
pollIntervalSeconds: 30
src/main/scala/com/lightbend/kafkalagexporter/PrometheusEndpointSink.scala
@@ -5,7 +5,6 @@
package com.lightbend.kafkalagexporter

import com.lightbend.kafkalagexporter.MetricsSink._
-import com.lightbend.kafkalagexporter.PrometheusEndpointSink.ClusterGlobalLabels
import io.prometheus.client.exporter.HTTPServer
import io.prometheus.client.hotspot.DefaultExports
import io.prometheus.client.{CollectorRegistry, Gauge}
@@ -19,44 +18,42 @@ object PrometheusEndpointSink {

  def apply(definitions: MetricDefinitions, metricWhitelist: List[String], clusterGlobalLabels: ClusterGlobalLabels,
            server: HTTPServer, registry: CollectorRegistry): MetricsSink = {
-    Try(new PrometheusEndpointSink(definitions, metricWhitelist, clusterGlobalLabels, server, registry))
+    Try(new PrometheusEndpointSink(definitions, metricWhitelist, server, registry))
      .fold(t => throw new Exception("Could not create Prometheus Endpoint", t), sink => sink)
  }
}

-class PrometheusEndpointSink private(definitions: MetricDefinitions, metricWhitelist: List[String], clusterGlobalLabels: ClusterGlobalLabels,
+class PrometheusEndpointSink private(definitions: MetricDefinitions, metricWhitelist: List[String],
                                     server: HTTPServer, registry: CollectorRegistry) extends MetricsSink {

  DefaultExports.initialize()

-  private val metrics: Map[PrometheusEndpointSink.ClusterName, Map[GaugeDefinition, Gauge]] = clusterGlobalLabels.map {
-    case (clusterName, globalLabels) =>
-      clusterName -> definitions.filter(d => metricWhitelist.exists(d.name.matches)).map { d =>
-        d -> Gauge.build()
+  private val metrics: Map[GaugeDefinition, Gauge] = register()
+
+  private def register(): Map[GaugeDefinition, Gauge] = {
+    definitions
+      .filter(d => metricWhitelist.exists(d.name.matches))
+      .map { d =>
+        d -> Gauge
+          .build()
          .name(d.name)
          .help(d.help)
-          .labelNames(globalLabels.keys.toSeq ++ d.labels: _*)
+          .labelNames(d.labels: _*)
          .register(registry)
-      }.toMap
+      }
+      .toMap
  }

  override def report(m: MetricValue): Unit = {
    if(metricWhitelist.exists(m.definition.name.matches)) {
-      val metric = getMetricsForClusterName(m.definition, m.clusterName)
-      val globalLabelValuesForCluster = clusterGlobalLabels.getOrElse(m.clusterName, Map.empty)
-      metric.labels(globalLabelValuesForCluster.values.toSeq ++ m.labels: _*).set(m.value)
+      val metric = metrics.getOrElse(m.definition, throw new IllegalArgumentException(s"No metric with definition ${m.definition.name} registered"))
+      metric.labels(m.labels: _*).set(m.value)
    }
  }

  override def remove(m: RemoveMetric): Unit = {
    if(metricWhitelist.exists(m.definition.name.matches)) {
-      for(
-        clusterMetrics <- metrics.get(m.clusterName);
-        globalLabels <- clusterGlobalLabels.get(m.clusterName);
-        gauge <- clusterMetrics.get(m.definition)
-      ) {
-        val metricLabels = globalLabels.values.toList ++ m.labels
-        gauge.remove(metricLabels: _*)
-      }
+      metrics.get(m.definition).foreach(_.remove(m.labels: _*))
    }
  }

Expand All @@ -68,9 +65,4 @@ class PrometheusEndpointSink private(definitions: MetricDefinitions, metricWhite
    registry.clear()
    server.stop()
  }
-
-  private def getMetricsForClusterName(gaugeDefinition: GaugeDefinition, clusterName: String): Gauge = {
-    val metricsForCluster = metrics.getOrElse(clusterName, throw new IllegalArgumentException(s"No metric for the $clusterName registered"))
-    metricsForCluster.getOrElse(gaugeDefinition, throw new IllegalArgumentException(s"No metric with definition ${gaugeDefinition.name} registered"))
-  }
}
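
In summary, after this revert the sink keys gauges by metric definition alone: no per-cluster map and no injected global labels. Below is a condensed, self-contained sketch of the resulting register/report/remove flow, using a stand-in `GaugeDef` case class rather than the project's actual `GaugeDefinition`:

```scala
import io.prometheus.client.{CollectorRegistry, Gauge}

// Stand-in for the project's GaugeDefinition, for illustration only.
final case class GaugeDef(name: String, help: String, labels: List[String])

object SinkSketch extends App {
  val registry = new CollectorRegistry()
  val definitions = List(
    GaugeDef("kafka_consumergroup_group_max_lag", "Example help text", List("cluster_name", "group"))
  )

  // One gauge per definition, labelled only with the definition's own labels.
  val metrics: Map[GaugeDef, Gauge] = definitions.map { d =>
    d -> Gauge.build()
      .name(d.name)
      .help(d.help)
      .labelNames(d.labels: _*)
      .register(registry)
  }.toMap

  def report(d: GaugeDef, labelValues: List[String], value: Double): Unit =
    metrics
      .getOrElse(d, throw new IllegalArgumentException(s"No metric with definition ${d.name} registered"))
      .labels(labelValues: _*)
      .set(value)

  def remove(d: GaugeDef, labelValues: List[String]): Unit =
    metrics.get(d).foreach(_.remove(labelValues: _*))

  report(definitions.head, List("my-cluster", "my-group"), 7)
  remove(definitions.head, List("my-cluster", "my-group"))
}
```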
src/test/scala/com/lightbend/kafkalagexporter/PrometheusEndpointSinkTest.scala
@@ -45,38 +45,38 @@ class PrometheusEndpointSinkTest extends fixture.FreeSpec with Matchers {
Set("kafka_consumergroup_group_max_lag", "kafka_consumergroup_group_max_lag_seconds")
}

"append global labels to metric labels" in { fixture =>
val groupLabel = Map(
"cluster" -> Map(
"environment" ->"dev",
"org" -> "organization",
)
)
val sink = PrometheusEndpointSink(Metrics.definitions, List(".*"), groupLabel, fixture.server, fixture.registry)
sink.report(Metrics.GroupValueMessage(Metrics.MaxGroupTimeLagMetric, "cluster", "group", 1))

val metricSamples = fixture.registry.metricFamilySamples().asScala.toList
val maxGroupTimeLagMetricSamples = metricSamples.filter(_.name.equals(Metrics.MaxGroupTimeLagMetric.name)).flatMap(_.samples.asScala)

maxGroupTimeLagMetricSamples should have length 1
val labels = maxGroupTimeLagMetricSamples.flatMap(_.labelNames.asScala)
val labelValues = maxGroupTimeLagMetricSamples.flatMap(_.labelValues.asScala)
(labels zip labelValues).toMap should contain theSameElementsAs
Map(
"environment" ->"dev",
"org" -> "organization",
"cluster_name" -> "cluster",
"group" -> "group",
)

sink.remove(Metrics.GroupRemoveMetricMessage(Metrics.MaxGroupTimeLagMetric, "cluster", "group"))

val metricSamplesAfterRemoval = fixture.registry.metricFamilySamples().asScala.toList
val maxGroupTimeLagMetricSamplesAfterRemoval = metricSamplesAfterRemoval.filter(_.name.equals(Metrics.MaxGroupTimeLagMetric.name)).flatMap(_.samples.asScala)


maxGroupTimeLagMetricSamplesAfterRemoval should have length 0
}
// "append global labels to metric labels" in { fixture =>
// val groupLabel = Map(
// "cluster" -> Map(
// "environment" ->"dev",
// "org" -> "organization",
// )
// )
// val sink = PrometheusEndpointSink(Metrics.definitions, List(".*"), groupLabel, fixture.server, fixture.registry)
// sink.report(Metrics.GroupValueMessage(Metrics.MaxGroupTimeLagMetric, "cluster", "group", 1))
//
// val metricSamples = fixture.registry.metricFamilySamples().asScala.toList
// val maxGroupTimeLagMetricSamples = metricSamples.filter(_.name.equals(Metrics.MaxGroupTimeLagMetric.name)).flatMap(_.samples.asScala)
//
// maxGroupTimeLagMetricSamples should have length 1
// val labels = maxGroupTimeLagMetricSamples.flatMap(_.labelNames.asScala)
// val labelValues = maxGroupTimeLagMetricSamples.flatMap(_.labelValues.asScala)
// (labels zip labelValues).toMap should contain theSameElementsAs
// Map(
// "environment" ->"dev",
// "org" -> "organization",
// "cluster_name" -> "cluster",
// "group" -> "group",
// )
//
// sink.remove(Metrics.GroupRemoveMetricMessage(Metrics.MaxGroupTimeLagMetric, "cluster", "group"))
//
// val metricSamplesAfterRemoval = fixture.registry.metricFamilySamples().asScala.toList
// val maxGroupTimeLagMetricSamplesAfterRemoval = metricSamplesAfterRemoval.filter(_.name.equals(Metrics.MaxGroupTimeLagMetric.name)).flatMap(_.samples.asScala)
//
//
// maxGroupTimeLagMetricSamplesAfterRemoval should have length 0
// }

"report only metrics which match the regex" in { fixture =>
val sink = PrometheusEndpointSink(Metrics.definitions, List("kafka_consumergroup_group_max_lag"), Map("cluster" -> Map.empty),
