diff --git a/charts/kafka-lag-exporter/templates/030-Deployment.yaml b/charts/kafka-lag-exporter/templates/030-Deployment.yaml
index 50061950..6ca80924 100644
--- a/charts/kafka-lag-exporter/templates/030-Deployment.yaml
+++ b/charts/kafka-lag-exporter/templates/030-Deployment.yaml
@@ -48,6 +48,18 @@ spec:
             value: "{{ $cluster.name }}"
           - name: "KAFKA_LAG_EXPORTER_CLUSTERS.{{ $index }}.bootstrap-brokers"
             value: "{{ $cluster.bootstrapBrokers }}"
+          {{- if $cluster.securityProtocol }}
+          - name: "KAFKA_LAG_EXPORTER_CLUSTERS.{{ $index }}.security-protocol"
+            value: "{{ $cluster.securityProtocol }}"
+          {{- end }}
+          {{- if $cluster.saslMechanism }}
+          - name: "KAFKA_LAG_EXPORTER_CLUSTERS.{{ $index }}.sasl-mechanism"
+            value: "{{ $cluster.saslMechanism }}"
+          {{- end }}
+          {{- if $cluster.saslJaasConfig }}
+          - name: "KAFKA_LAG_EXPORTER_CLUSTERS.{{ $index }}.sasl-jaas-config"
+            value: "{{ $cluster.saslJaasConfig }}"
+          {{- end }}
           {{- end }}
           ports:
             - name: http
diff --git a/charts/kafka-lag-exporter/values.yaml b/charts/kafka-lag-exporter/values.yaml
index 05420791..68e2c725 100644
--- a/charts/kafka-lag-exporter/values.yaml
+++ b/charts/kafka-lag-exporter/values.yaml
@@ -4,6 +4,11 @@ clusters: {}
 # clusters:
 # - name: "default"
 #   bootstrapBrokers: "my-cluster-kafka-bootstrap:9092"
+#   # optional values for TLS/SASL enabled clusters
+#   securityProtocol: SASL_SSL
+#   saslMechanism: PLAIN
+#   saslJaasConfig: org.apache.kafka.common.security.plain.PlainLoginModule required username=\"foo\" password=\"bar\";
+
 ## The interval between refreshing metrics
 pollIntervalSeconds: 5
 ## Size of the sliding window of offsets to keep in each partition's lookup table
diff --git a/src/main/scala/com/lightbend/kafkalagexporter/AppConfig.scala b/src/main/scala/com/lightbend/kafkalagexporter/AppConfig.scala
index f7184843..2609d43f 100644
--- a/src/main/scala/com/lightbend/kafkalagexporter/AppConfig.scala
+++ b/src/main/scala/com/lightbend/kafkalagexporter/AppConfig.scala
@@ -21,7 +21,10 @@ object AppConfig {
     val clusters = c.getConfigList("clusters").asScala.toList.map { clusterConfig =>
       KafkaCluster(
         clusterConfig.getString("name"),
-        clusterConfig.getString("bootstrap-brokers")
+        clusterConfig.getString("bootstrap-brokers"),
+        if (clusterConfig.hasPath("security-protocol")) clusterConfig.getString("security-protocol") else "PLAINTEXT",
+        if (clusterConfig.hasPath("sasl-mechanism")) clusterConfig.getString("sasl-mechanism") else "",
+        if (clusterConfig.hasPath("sasl-jaas-config")) clusterConfig.getString("sasl-jaas-config") else ""
       )
     }
     val strimziWatcher = c.getString("watchers.strimzi").toBoolean
@@ -29,7 +32,8 @@
   }
 }

-final case class KafkaCluster(name: String, bootstrapBrokers: String)
+final case class KafkaCluster(name: String, bootstrapBrokers: String, securityProtocol: String = "PLAINTEXT",
+                              saslMechanism: String = "", saslJaasConfig: String = "")
 final case class AppConfig(pollInterval: FiniteDuration, lookupTableSize: Int, port: Int, clientGroupId: String,
                            clientTimeout: FiniteDuration, clusters: List[KafkaCluster], strimziWatcher: Boolean) {
   override def toString(): String = {
@@ -41,6 +45,8 @@ final case class AppConfig(pollInterval: FiniteDuration, lookupTableSize: Int, p
       s"""
          |  Cluster name: ${cluster.name}
          |  Cluster Kafka bootstrap brokers: ${cluster.bootstrapBrokers}
+         |  Cluster security protocol: ${cluster.securityProtocol}
+         |  Cluster SASL mechanism: ${cluster.saslMechanism}
        """.stripMargin
     }.mkString("\n")
     s"""
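Note that `AppConfig` falls back to `PLAINTEXT` and empty strings when the new keys are absent, so existing configs keep working. For reference, a minimal HOCON sketch of a SASL-enabled cluster entry as this parser reads it — the enclosing root key comes from the exporter's reference.conf, which this patch doesn't show, so treat the outer block as an assumption:

    kafka-lag-exporter {
      clusters = [
        {
          name = "secure"
          bootstrap-brokers = "kafka-broker:9094"
          # optional; resolved via the hasPath(...) fallbacks above
          security-protocol = "SASL_SSL"
          sasl-mechanism = "PLAIN"
          sasl-jaas-config = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"foo\" password=\"bar\";"
        }
      ]
    }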
diff --git a/src/main/scala/com/lightbend/kafkalagexporter/ConsumerGroupCollector.scala b/src/main/scala/com/lightbend/kafkalagexporter/ConsumerGroupCollector.scala
index a8e53107..fc71c462 100644
--- a/src/main/scala/com/lightbend/kafkalagexporter/ConsumerGroupCollector.scala
+++ b/src/main/scala/com/lightbend/kafkalagexporter/ConsumerGroupCollector.scala
@@ -36,8 +36,7 @@ object ConsumerGroupCollector {
   final case class CollectorConfig(
     pollInterval: FiniteDuration,
     lookupTableSize: Int,
-    clusterName: String,
-    clusterBootstrapBrokers: String,
+    cluster: KafkaCluster,
     clock: Clock = Clock.systemUTC()
   )

@@ -47,15 +46,15 @@
   )

   def init(config: CollectorConfig,
-           clientCreator: String => KafkaClientContract,
+           clientCreator: KafkaCluster => KafkaClientContract,
            reporter: ActorRef[MetricsSink.Message]): Behavior[Message] =
     Behaviors.supervise[Message] {
       Behaviors.setup { context =>
-        context.log.info("Spawned ConsumerGroupCollector for cluster: {}", config.clusterName)
+        context.log.info("Spawned ConsumerGroupCollector for cluster: {}", config.cluster.name)

         context.self ! Collect

         val collectorState = CollectorState(Domain.TopicPartitionTable(config.lookupTableSize))
-        collector(config, clientCreator(config.clusterBootstrapBrokers), reporter, collectorState)
+        collector(config, clientCreator(config.cluster), reporter, collectorState)
       }
     }.onFailure(SupervisorStrategy.restartWithBackoff(1 seconds, 10 seconds, 0.2))

@@ -115,7 +114,7 @@
     Behaviors.receiveSignal {
       case (_, PostStop) =>
         client.close()
-        context.log.info("Gracefully stopped polling and Kafka client for cluster: {}", config.clusterName)
+        context.log.info("Gracefully stopped polling and Kafka client for cluster: {}", config.cluster.name)
         Behaviors.same
     }
   }

@@ -155,9 +154,9 @@

         val offsetLag = mostRecentPoint.offset - groupPoint.offset

-        reporter ! Metrics.LastGroupOffsetMetric(config.clusterName, gtp, member, groupPoint.offset)
-        reporter ! Metrics.OffsetLagMetric(config.clusterName, gtp, member, offsetLag)
-        reporter ! Metrics.TimeLagMetric(config.clusterName, gtp, member, timeLag)
+        reporter ! Metrics.LastGroupOffsetMetric(config.cluster.name, gtp, member, groupPoint.offset)
+        reporter ! Metrics.OffsetLagMetric(config.cluster.name, gtp, member, offsetLag)
+        reporter ! Metrics.TimeLagMetric(config.cluster.name, gtp, member, timeLag)

         GroupPartitionLag(gtp, offsetLag, timeLag)
       }

@@ -168,8 +167,8 @@
       val maxOffsetLag = values.maxBy(_.offsetLag)
       val maxTimeLag = values.maxBy(_.timeLag)

-      reporter ! Metrics.MaxGroupOffsetLagMetric(config.clusterName, group, maxOffsetLag.offsetLag)
-      reporter ! Metrics.MaxGroupTimeLagMetric(config.clusterName, group, maxTimeLag.timeLag)
+      reporter ! Metrics.MaxGroupOffsetLagMetric(config.cluster.name, group, maxOffsetLag.offsetLag)
+      reporter ! Metrics.MaxGroupTimeLagMetric(config.cluster.name, group, maxTimeLag.timeLag)
     }
   }

@@ -181,7 +180,7 @@
     for {
       (tp, table: LookupTable.Table) <- tables.all
       point <- table.mostRecentPoint()
-    } reporter ! Metrics.LatestOffsetMetric(config.clusterName, tp, point.offset)
+    } reporter ! Metrics.LatestOffsetMetric(config.cluster.name, tp, point.offset)
   }

   private def defaultMissingPartitions(newOffsets: NewOffsets): NewOffsets = {
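With `CollectorConfig` now carrying the whole `KafkaCluster` instead of two loose strings, a caller builds it as in this sketch (broker address and sizes are illustrative only):

    import scala.concurrent.duration._

    // One value now carries the name, brokers, and security settings
    // that the collector and its client factory need.
    val cluster = KafkaCluster("secure", "kafka-broker:9094", securityProtocol = "SASL_SSL")
    val collectorConfig = ConsumerGroupCollector.CollectorConfig(
      pollInterval = 5.seconds,
      lookupTableSize = 20,
      cluster = cluster
    )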
diff --git a/src/main/scala/com/lightbend/kafkalagexporter/KafkaClient.scala b/src/main/scala/com/lightbend/kafkalagexporter/KafkaClient.scala
index 3b8ae1a3..44e609dc 100644
--- a/src/main/scala/com/lightbend/kafkalagexporter/KafkaClient.scala
+++ b/src/main/scala/com/lightbend/kafkalagexporter/KafkaClient.scala
@@ -11,7 +11,9 @@
 import java.{lang, util}

 import com.lightbend.kafkalagexporter.KafkaClient.KafkaClientContract
 import org.apache.kafka.clients.admin._
+import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata}
+import org.apache.kafka.common.config.SaslConfigs
 import org.apache.kafka.common.serialization.StringDeserializer
 import org.apache.kafka.common.{KafkaFuture, TopicPartition => KafkaTopicPartition}
@@ -26,8 +28,8 @@
   val CommonClientConfigRetryBackoffMs = 1000 // longer interval between retry attempts so we don't overload clusters (default = 100ms)
   val ConsumerConfigAutoCommit = false

-  def apply(bootstrapBrokers: String, groupId: String, clientTimeout: FiniteDuration)(implicit ec: ExecutionContext): KafkaClientContract =
-    new KafkaClient(bootstrapBrokers, groupId, clientTimeout)(ec)
+  def apply(cluster: KafkaCluster, groupId: String, clientTimeout: FiniteDuration)(implicit ec: ExecutionContext): KafkaClientContract =
+    new KafkaClient(cluster, groupId, clientTimeout)(ec)

   trait KafkaClientContract {
     def getGroups(): Future[List[Domain.ConsumerGroup]]
@@ -49,20 +51,26 @@
     p.future
   }

-  private def createAdminClient(brokers: String, clientTimeout: Duration): AdminClient = {
+  private def createAdminClient(cluster: KafkaCluster, clientTimeout: Duration): AdminClient = {
     val props = new Properties()
     // AdminClient config: https://kafka.apache.org/documentation/#adminclientconfigs
-    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
+    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapBrokers)
+    props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, cluster.securityProtocol)
+    props.put(SaslConfigs.SASL_MECHANISM, cluster.saslMechanism)
+    props.put(SaslConfigs.SASL_JAAS_CONFIG, cluster.saslJaasConfig)
     props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, clientTimeout.toMillis.toString)
     props.put(AdminClientConfig.RETRIES_CONFIG, AdminClientConfigRetries.toString)
     props.put(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, CommonClientConfigRetryBackoffMs.toString)
     AdminClient.create(props)
   }

-  private def createConsumerClient(brokers: String, groupId: String, clientTimeout: Duration): KafkaConsumer[String, String] = {
+  private def createConsumerClient(cluster: KafkaCluster, groupId: String, clientTimeout: Duration): KafkaConsumer[String, String] = {
     val props = new Properties()
     val deserializer = (new StringDeserializer).getClass.getName
-    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
+    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapBrokers)
+    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, cluster.securityProtocol)
+    props.put(SaslConfigs.SASL_MECHANISM, cluster.saslMechanism)
+    props.put(SaslConfigs.SASL_JAAS_CONFIG, cluster.saslJaasConfig)
     props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
     props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, ConsumerConfigAutoCommit.toString)
     //props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
@@ -83,14 +91,14 @@ object KafkaClient {
   }
 }

-class KafkaClient private(bootstrapBrokers: String, groupId: String, clientTimeout: FiniteDuration)
+class KafkaClient private(cluster: KafkaCluster, groupId: String, clientTimeout: FiniteDuration)
                          (implicit ec: ExecutionContext) extends KafkaClientContract {
   import KafkaClient._

   private implicit val _clientTimeout: Duration = clientTimeout.toJava

-  private lazy val adminClient = createAdminClient(bootstrapBrokers, _clientTimeout)
-  private lazy val consumer = createConsumerClient(bootstrapBrokers, groupId, _clientTimeout)
+  private lazy val adminClient = createAdminClient(cluster, _clientTimeout)
+  private lazy val consumer = createConsumerClient(cluster, groupId, _clientTimeout)

   private lazy val listGroupOptions = new ListConsumerGroupsOptions().timeoutMs(_clientTimeout.toMillis().toInt)
   private lazy val describeGroupOptions = new DescribeConsumerGroupsOptions().timeoutMs(_clientTimeout.toMillis().toInt)
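Both factory methods set `sasl.mechanism` and `sasl.jaas.config` unconditionally, so PLAINTEXT clusters carry empty strings for them; the Kafka clients should ignore SASL settings unless a SASL protocol is selected, so this is likely harmless. A guarded variant — a sketch, not part of this patch — would keep the unused entries out of the properties map entirely:

    // Hypothetical defensive variant: only set SASL properties when configured.
    if (cluster.saslMechanism.nonEmpty)
      props.put(SaslConfigs.SASL_MECHANISM, cluster.saslMechanism)
    if (cluster.saslJaasConfig.nonEmpty)
      props.put(SaslConfigs.SASL_JAAS_CONFIG, cluster.saslJaasConfig)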
diff --git a/src/main/scala/com/lightbend/kafkalagexporter/KafkaClusterManager.scala b/src/main/scala/com/lightbend/kafkalagexporter/KafkaClusterManager.scala
index 11555906..53268350 100644
--- a/src/main/scala/com/lightbend/kafkalagexporter/KafkaClusterManager.scala
+++ b/src/main/scala/com/lightbend/kafkalagexporter/KafkaClusterManager.scala
@@ -19,7 +19,7 @@ object KafkaClusterManager {
   def init(
             appConfig: AppConfig,
             metricsSink: () => MetricsSink,
-            clientCreator: String => KafkaClientContract): Behavior[Message] = Behaviors.setup { context =>
+            clientCreator: KafkaCluster => KafkaClientContract): Behavior[Message] = Behaviors.setup { context =>

     context.log.info("Starting Kafka Lag Exporter with configuration: \n{}", appConfig)

@@ -38,7 +38,7 @@
   def manager(
               appConfig: AppConfig,
-              clientCreator: String => KafkaClientContract,
+              clientCreator: KafkaCluster => KafkaClientContract,
               reporter: ActorRef[MetricsSink.Message],
               collectors: Map[KafkaCluster, ActorRef[ConsumerGroupCollector.Message]],
               watchers: Seq[ActorRef[Watcher.Message]]): Behavior[Message] =
@@ -49,8 +49,7 @@
         val config = ConsumerGroupCollector.CollectorConfig(
           appConfig.pollInterval,
           appConfig.lookupTableSize,
-          cluster.name,
-          cluster.bootstrapBrokers
+          cluster
         )
         val collector = context.spawn(
           ConsumerGroupCollector.init(config, clientCreator, reporter),
diff --git a/src/main/scala/com/lightbend/kafkalagexporter/MainApp.scala b/src/main/scala/com/lightbend/kafkalagexporter/MainApp.scala
index 8a425c74..b4693b42 100644
--- a/src/main/scala/com/lightbend/kafkalagexporter/MainApp.scala
+++ b/src/main/scala/com/lightbend/kafkalagexporter/MainApp.scala
@@ -27,8 +27,8 @@ object MainApp extends App {

   val appConfig = AppConfig(config)

-  val clientCreator = (bootstrapBrokers: String) =>
-    KafkaClient(bootstrapBrokers, appConfig.clientGroupId, appConfig.clientTimeout)(kafkaClientEc)
+  val clientCreator = (cluster: KafkaCluster) =>
+    KafkaClient(cluster, appConfig.clientGroupId, appConfig.clientTimeout)(kafkaClientEc)
   val endpointCreator = () => PrometheusEndpointSink(appConfig.port, Metrics.metricDefinitions)

   ActorSystem(
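Because the new `KafkaCluster` fields declare defaults, call sites that only pass a name and brokers — such as the test spec below — stay source-compatible:

    // Both forms construct the same value; the second relies on the new defaults.
    val explicit    = KafkaCluster("default", "kafka:9092", "PLAINTEXT", "", "")
    val viaDefaults = KafkaCluster("default", "kafka:9092")
    assert(explicit == viaDefaults)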
diff --git a/src/test/scala/com/lightbend/kafkalagexporter/ConsumerGroupCollectorSpec.scala b/src/test/scala/com/lightbend/kafkalagexporter/ConsumerGroupCollectorSpec.scala
index d166807a..bd46e63d 100644
--- a/src/test/scala/com/lightbend/kafkalagexporter/ConsumerGroupCollectorSpec.scala
+++ b/src/test/scala/com/lightbend/kafkalagexporter/ConsumerGroupCollectorSpec.scala
@@ -16,7 +16,7 @@
 import scala.concurrent.duration._

 class ConsumerGroupCollectorSpec extends FreeSpec with Matchers with TestData with MockitoSugar {
   val client: KafkaClientContract = mock[KafkaClientContract]
-  val config = ConsumerGroupCollector.CollectorConfig(0 seconds, 20, "default", "", Clock.fixed(Instant.ofEpochMilli(0), ZoneId.systemDefault()))
+  val config = ConsumerGroupCollector.CollectorConfig(0 seconds, 20, KafkaCluster("default", ""), Clock.fixed(Instant.ofEpochMilli(0), ZoneId.systemDefault()))

   val timestampNow = 200
@@ -44,28 +44,28 @@
     "latest offset metric" in {
       metrics should contain(
-        Metrics.LatestOffsetMetric(config.clusterName, topicPartition0, value = 200))
+        Metrics.LatestOffsetMetric(config.cluster.name, topicPartition0, value = 200))
     }

     "last group offset metric" in {
       metrics should contain(
-        Metrics.LastGroupOffsetMetric(config.clusterName, gtpSingleMember, consumerGroupMember0, value = 180))
+        Metrics.LastGroupOffsetMetric(config.cluster.name, gtpSingleMember, consumerGroupMember0, value = 180))
     }

     "offset lag metric" in {
-      metrics should contain(Metrics.OffsetLagMetric(config.clusterName, gtpSingleMember, consumerGroupMember0, value = 20))
+      metrics should contain(Metrics.OffsetLagMetric(config.cluster.name, gtpSingleMember, consumerGroupMember0, value = 20))
     }

     "time lag metric" in {
-      metrics should contain(Metrics.TimeLagMetric(config.clusterName, gtpSingleMember, consumerGroupMember0, value = 0.02))
+      metrics should contain(Metrics.TimeLagMetric(config.cluster.name, gtpSingleMember, consumerGroupMember0, value = 0.02))
     }

     "max group offset lag metric" in {
-      metrics should contain(Metrics.MaxGroupOffsetLagMetric(config.clusterName, consumerGroupSingleMember, value = 20))
+      metrics should contain(Metrics.MaxGroupOffsetLagMetric(config.cluster.name, consumerGroupSingleMember, value = 20))
     }

     "max group time lag metric" in {
-      metrics should contain(Metrics.MaxGroupTimeLagMetric(config.clusterName, consumerGroupSingleMember, value = 0.02))
+      metrics should contain(Metrics.MaxGroupTimeLagMetric(config.cluster.name, consumerGroupSingleMember, value = 0.02))
     }
   }

@@ -102,11 +102,11 @@
     val metrics = reporter.receiveAll()

     "max group offset lag metric" in {
-      metrics should contain(Metrics.MaxGroupOffsetLagMetric(config.clusterName, consumerGroupThreeMember, value = 100))
+      metrics should contain(Metrics.MaxGroupOffsetLagMetric(config.cluster.name, consumerGroupThreeMember, value = 100))
     }

     "max group time lag metric" in {
-      metrics should contain(Metrics.MaxGroupTimeLagMetric(config.clusterName, consumerGroupThreeMember, value = 0.1))
+      metrics should contain(Metrics.MaxGroupTimeLagMetric(config.cluster.name, consumerGroupThreeMember, value = 0.1))
     }
   }

@@ -131,27 +131,27 @@
     val metrics = reporter.receiveAll()

     "latest offset metric" in {
-      metrics should contain(Metrics.LatestOffsetMetric(config.clusterName, topicPartition0, value = 200))
+      metrics should contain(Metrics.LatestOffsetMetric(config.cluster.name, topicPartition0, value = 200))
     }

     "last group offset metric" in {
-      metrics should contain(Metrics.LastGroupOffsetMetric(config.clusterName, gtpSingleMember, consumerGroupMember0, value = 0))
+      metrics should contain(Metrics.LastGroupOffsetMetric(config.cluster.name, gtpSingleMember, consumerGroupMember0, value = 0))
     }

     "offset lag metric" in {
-      metrics should contain(Metrics.OffsetLagMetric(config.clusterName, gtpSingleMember, consumerGroupMember0, value = 200))
+      metrics should contain(Metrics.OffsetLagMetric(config.cluster.name, gtpSingleMember, consumerGroupMember0, value = 200))
     }

     "time lag metric" in {
-      metrics should contain(Metrics.TimeLagMetric(config.clusterName, gtpSingleMember, consumerGroupMember0, value = 0.2))
+      metrics should contain(Metrics.TimeLagMetric(config.cluster.name, gtpSingleMember, consumerGroupMember0, value = 0.2))
     }

     "max group offset lag metric" in {
-      metrics should contain(Metrics.MaxGroupOffsetLagMetric(config.clusterName, consumerGroupSingleMember, value = 200))
+      metrics should contain(Metrics.MaxGroupOffsetLagMetric(config.cluster.name, consumerGroupSingleMember, value = 200))
     }

     "max group time lag metric" in {
-      metrics should contain(Metrics.MaxGroupTimeLagMetric(config.clusterName, consumerGroupSingleMember, value = 0.2))
+      metrics should contain(Metrics.MaxGroupTimeLagMetric(config.cluster.name, consumerGroupSingleMember, value = 0.2))
     }
   }
 }
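End-to-end, a SASL-enabled cluster flows through the new `clientCreator` signature exactly like a PLAINTEXT one. A wiring sketch, where the broker address, group id, and timeout are illustrative values only:

    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._

    val secure = KafkaCluster(
      name = "secure",
      bootstrapBrokers = "kafka-broker:9094",
      securityProtocol = "SASL_SSL",
      saslMechanism = "PLAIN",
      saslJaasConfig = """org.apache.kafka.common.security.plain.PlainLoginModule required username="foo" password="bar";"""
    )

    // Same factory shape MainApp installs; the client inherits the security settings.
    val clientCreator: KafkaCluster => KafkaClient.KafkaClientContract =
      cluster => KafkaClient(cluster, "kafkalagexporter", 10.seconds)
    val client = clientCreator(secure)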