Skip to content
This repository has been archived by the owner on Mar 17, 2024. It is now read-only.

Support TLS/SASL enabled clusters #21

Merged
merged 1 commit into from
May 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions charts/kafka-lag-exporter/templates/030-Deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,18 @@ spec:
value: "{{ $cluster.name }}"
- name: "KAFKA_LAG_EXPORTER_CLUSTERS.{{ $index }}.bootstrap-brokers"
value: "{{ $cluster.bootstrapBrokers }}"
{{- if $cluster.securityProtocol }}
- name: "KAFKA_LAG_EXPORTER_CLUSTERS.{{ $index }}.security-protocol"
value: "{{ $cluster.securityProtocol }}"
{{- end }}
{{- if $cluster.saslMechanism }}
- name: "KAFKA_LAG_EXPORTER_CLUSTERS.{{ $index }}.sasl-mechanism"
value: "{{ $cluster.saslMechanism }}"
{{- end }}
{{- if $cluster.saslJaasConfig }}
- name: "KAFKA_LAG_EXPORTER_CLUSTERS.{{ $index }}.sasl-jaas-config"
value: "{{ $cluster.saslJaasConfig }}"
{{- end }}
{{- end }}
ports:
- name: http
Expand Down
5 changes: 5 additions & 0 deletions charts/kafka-lag-exporter/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ clusters: {}
# clusters:
# - name: "default"
# bootstrapBrokers: "my-cluster-kafka-bootstrap:9092"
# # optional values for TLS/SASL enabled clusters
# securityProtocol: SASL_SSL
# saslMechanism: PLAIN
# saslJaasConfig: org.apache.kafka.common.security.plain.PlainLoginModule required username=\"foo\" password=\"bar\";

## The interval between refreshing metrics
pollIntervalSeconds: 5
## Size of the sliding window of offsets to keep in each partition's lookup table
Expand Down
10 changes: 8 additions & 2 deletions src/main/scala/com/lightbend/kafkalagexporter/AppConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,19 @@ object AppConfig {
val clusters = c.getConfigList("clusters").asScala.toList.map { clusterConfig =>
KafkaCluster(
clusterConfig.getString("name"),
clusterConfig.getString("bootstrap-brokers")
clusterConfig.getString("bootstrap-brokers"),
if (clusterConfig.hasPath("security-protocol")) clusterConfig.getString("security-protocol") else "PLAINTEXT",
if (clusterConfig.hasPath("sasl-mechanism")) clusterConfig.getString("sasl-mechanism") else "",
if (clusterConfig.hasPath("sasl-jaas-config")) clusterConfig.getString("sasl-jaas-config") else ""
)
}
val strimziWatcher = c.getString("watchers.strimzi").toBoolean
AppConfig(pollInterval, lookupTableSize, port, clientGroupId, kafkaClientTimeout, clusters, strimziWatcher)
}
}

final case class KafkaCluster(name: String, bootstrapBrokers: String)
final case class KafkaCluster(name: String, bootstrapBrokers: String, securityProtocol: String = "PLAINTEXT",
saslMechanism: String = "", saslJaasConfig: String = "")
final case class AppConfig(pollInterval: FiniteDuration, lookupTableSize: Int, port: Int, clientGroupId: String,
clientTimeout: FiniteDuration, clusters: List[KafkaCluster], strimziWatcher: Boolean) {
override def toString(): String = {
Expand All @@ -41,6 +45,8 @@ final case class AppConfig(pollInterval: FiniteDuration, lookupTableSize: Int, p
s"""
| Cluster name: ${cluster.name}
| Cluster Kafka bootstrap brokers: ${cluster.bootstrapBrokers}
| Cluster security protocol: ${cluster.securityProtocol}
| Cluster SASL mechanism: ${cluster.saslMechanism}
""".stripMargin
}.mkString("\n")
s"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ object ConsumerGroupCollector {
final case class CollectorConfig(
pollInterval: FiniteDuration,
lookupTableSize: Int,
clusterName: String,
clusterBootstrapBrokers: String,
cluster: KafkaCluster,
clock: Clock = Clock.systemUTC()
)

Expand All @@ -47,15 +46,15 @@ object ConsumerGroupCollector {
)

def init(config: CollectorConfig,
clientCreator: String => KafkaClientContract,
clientCreator: KafkaCluster => KafkaClientContract,
reporter: ActorRef[MetricsSink.Message]): Behavior[Message] = Behaviors.supervise[Message] {
Behaviors.setup { context =>
context.log.info("Spawned ConsumerGroupCollector for cluster: {}", config.clusterName)
context.log.info("Spawned ConsumerGroupCollector for cluster: {}", config.cluster.name)

context.self ! Collect

val collectorState = CollectorState(Domain.TopicPartitionTable(config.lookupTableSize))
collector(config, clientCreator(config.clusterBootstrapBrokers), reporter, collectorState)
collector(config, clientCreator(config.cluster), reporter, collectorState)
}
}.onFailure(SupervisorStrategy.restartWithBackoff(1 seconds, 10 seconds, 0.2))

Expand Down Expand Up @@ -115,7 +114,7 @@ object ConsumerGroupCollector {
Behaviors.receiveSignal {
case (_, PostStop) =>
client.close()
context.log.info("Gracefully stopped polling and Kafka client for cluster: {}", config.clusterName)
context.log.info("Gracefully stopped polling and Kafka client for cluster: {}", config.cluster.name)
Behaviors.same
}
}
Expand Down Expand Up @@ -155,9 +154,9 @@ object ConsumerGroupCollector {

val offsetLag = mostRecentPoint.offset - groupPoint.offset

reporter ! Metrics.LastGroupOffsetMetric(config.clusterName, gtp, member, groupPoint.offset)
reporter ! Metrics.OffsetLagMetric(config.clusterName, gtp, member, offsetLag)
reporter ! Metrics.TimeLagMetric(config.clusterName, gtp, member, timeLag)
reporter ! Metrics.LastGroupOffsetMetric(config.cluster.name, gtp, member, groupPoint.offset)
reporter ! Metrics.OffsetLagMetric(config.cluster.name, gtp, member, offsetLag)
reporter ! Metrics.TimeLagMetric(config.cluster.name, gtp, member, timeLag)

GroupPartitionLag(gtp, offsetLag, timeLag)
}
Expand All @@ -168,8 +167,8 @@ object ConsumerGroupCollector {
val maxOffsetLag = values.maxBy(_.offsetLag)
val maxTimeLag = values.maxBy(_.timeLag)

reporter ! Metrics.MaxGroupOffsetLagMetric(config.clusterName, group, maxOffsetLag.offsetLag)
reporter ! Metrics.MaxGroupTimeLagMetric(config.clusterName, group, maxTimeLag.timeLag)
reporter ! Metrics.MaxGroupOffsetLagMetric(config.cluster.name, group, maxOffsetLag.offsetLag)
reporter ! Metrics.MaxGroupTimeLagMetric(config.cluster.name, group, maxTimeLag.timeLag)
}
}

Expand All @@ -181,7 +180,7 @@ object ConsumerGroupCollector {
for {
(tp, table: LookupTable.Table) <- tables.all
point <- table.mostRecentPoint()
} reporter ! Metrics.LatestOffsetMetric(config.clusterName, tp, point.offset)
} reporter ! Metrics.LatestOffsetMetric(config.cluster.name, tp, point.offset)
}

private def defaultMissingPartitions(newOffsets: NewOffsets): NewOffsets = {
Expand Down
26 changes: 17 additions & 9 deletions src/main/scala/com/lightbend/kafkalagexporter/KafkaClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ import java.{lang, util}

import com.lightbend.kafkalagexporter.KafkaClient.KafkaClientContract
import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.{KafkaFuture, TopicPartition => KafkaTopicPartition}

Expand All @@ -26,8 +28,8 @@ object KafkaClient {
val CommonClientConfigRetryBackoffMs = 1000 // longer interval between retry attempts so we don't overload clusters (default = 100ms)
val ConsumerConfigAutoCommit = false

def apply(bootstrapBrokers: String, groupId: String, clientTimeout: FiniteDuration)(implicit ec: ExecutionContext): KafkaClientContract =
new KafkaClient(bootstrapBrokers, groupId, clientTimeout)(ec)
def apply(cluster: KafkaCluster, groupId: String, clientTimeout: FiniteDuration)(implicit ec: ExecutionContext): KafkaClientContract =
new KafkaClient(cluster, groupId, clientTimeout)(ec)

trait KafkaClientContract {
def getGroups(): Future[List[Domain.ConsumerGroup]]
Expand All @@ -49,20 +51,26 @@ object KafkaClient {
p.future
}

private def createAdminClient(brokers: String, clientTimeout: Duration): AdminClient = {
private def createAdminClient(cluster: KafkaCluster, clientTimeout: Duration): AdminClient = {
val props = new Properties()
// AdminClient config: https://kafka.apache.org/documentation/#adminclientconfigs
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapBrokers)
props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, cluster.securityProtocol)
props.put(SaslConfigs.SASL_MECHANISM, cluster.saslMechanism)
props.put(SaslConfigs.SASL_JAAS_CONFIG, cluster.saslJaasConfig)
props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, clientTimeout.toMillis.toString)
props.put(AdminClientConfig.RETRIES_CONFIG, AdminClientConfigRetries.toString)
props.put(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, CommonClientConfigRetryBackoffMs.toString)
AdminClient.create(props)
}

private def createConsumerClient(brokers: String, groupId: String, clientTimeout: Duration): KafkaConsumer[String, String] = {
private def createConsumerClient(cluster: KafkaCluster, groupId: String, clientTimeout: Duration): KafkaConsumer[String, String] = {
val props = new Properties()
val deserializer = (new StringDeserializer).getClass.getName
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapBrokers)
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, cluster.securityProtocol)
props.put(SaslConfigs.SASL_MECHANISM, cluster.saslMechanism)
props.put(SaslConfigs.SASL_JAAS_CONFIG, cluster.saslJaasConfig)
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, ConsumerConfigAutoCommit.toString)
//props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
Expand All @@ -83,14 +91,14 @@ object KafkaClient {
}
}

class KafkaClient private(bootstrapBrokers: String, groupId: String, clientTimeout: FiniteDuration)
class KafkaClient private(cluster: KafkaCluster, groupId: String, clientTimeout: FiniteDuration)
(implicit ec: ExecutionContext) extends KafkaClientContract {
import KafkaClient._

private implicit val _clientTimeout: Duration = clientTimeout.toJava

private lazy val adminClient = createAdminClient(bootstrapBrokers, _clientTimeout)
private lazy val consumer = createConsumerClient(bootstrapBrokers, groupId, _clientTimeout)
private lazy val adminClient = createAdminClient(cluster, _clientTimeout)
private lazy val consumer = createConsumerClient(cluster, groupId, _clientTimeout)

private lazy val listGroupOptions = new ListConsumerGroupsOptions().timeoutMs(_clientTimeout.toMillis().toInt)
private lazy val describeGroupOptions = new DescribeConsumerGroupsOptions().timeoutMs(_clientTimeout.toMillis().toInt)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ object KafkaClusterManager {
def init(
appConfig: AppConfig,
metricsSink: () => MetricsSink,
clientCreator: String => KafkaClientContract): Behavior[Message] = Behaviors.setup { context =>
clientCreator: KafkaCluster => KafkaClientContract): Behavior[Message] = Behaviors.setup { context =>

context.log.info("Starting Kafka Lag Exporter with configuration: \n{}", appConfig)

Expand All @@ -38,7 +38,7 @@ object KafkaClusterManager {

def manager(
appConfig: AppConfig,
clientCreator: String => KafkaClientContract,
clientCreator: KafkaCluster => KafkaClientContract,
reporter: ActorRef[MetricsSink.Message],
collectors: Map[KafkaCluster, ActorRef[ConsumerGroupCollector.Message]],
watchers: Seq[ActorRef[Watcher.Message]]): Behavior[Message] =
Expand All @@ -49,8 +49,7 @@ object KafkaClusterManager {
val config = ConsumerGroupCollector.CollectorConfig(
appConfig.pollInterval,
appConfig.lookupTableSize,
cluster.name,
cluster.bootstrapBrokers
cluster
)
val collector = context.spawn(
ConsumerGroupCollector.init(config, clientCreator, reporter),
Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/com/lightbend/kafkalagexporter/MainApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ object MainApp extends App {

val appConfig = AppConfig(config)

val clientCreator = (bootstrapBrokers: String) =>
KafkaClient(bootstrapBrokers, appConfig.clientGroupId, appConfig.clientTimeout)(kafkaClientEc)
val clientCreator = (cluster: KafkaCluster) =>
KafkaClient(cluster, appConfig.clientGroupId, appConfig.clientTimeout)(kafkaClientEc)
val endpointCreator = () => PrometheusEndpointSink(appConfig.port, Metrics.metricDefinitions)

ActorSystem(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import scala.concurrent.duration._

class ConsumerGroupCollectorSpec extends FreeSpec with Matchers with TestData with MockitoSugar {
val client: KafkaClientContract = mock[KafkaClientContract]
val config = ConsumerGroupCollector.CollectorConfig(0 seconds, 20, "default", "", Clock.fixed(Instant.ofEpochMilli(0), ZoneId.systemDefault()))
val config = ConsumerGroupCollector.CollectorConfig(0 seconds, 20, KafkaCluster("default", ""), Clock.fixed(Instant.ofEpochMilli(0), ZoneId.systemDefault()))

val timestampNow = 200

Expand Down Expand Up @@ -44,28 +44,28 @@ class ConsumerGroupCollectorSpec extends FreeSpec with Matchers with TestData wi

"latest offset metric" in {
metrics should contain(
Metrics.LatestOffsetMetric(config.clusterName, topicPartition0, value = 200))
Metrics.LatestOffsetMetric(config.cluster.name, topicPartition0, value = 200))
}

"last group offset metric" in {
metrics should contain(
Metrics.LastGroupOffsetMetric(config.clusterName, gtpSingleMember, consumerGroupMember0, value = 180))
Metrics.LastGroupOffsetMetric(config.cluster.name, gtpSingleMember, consumerGroupMember0, value = 180))
}

"offset lag metric" in {
metrics should contain(Metrics.OffsetLagMetric(config.clusterName, gtpSingleMember, consumerGroupMember0, value = 20))
metrics should contain(Metrics.OffsetLagMetric(config.cluster.name, gtpSingleMember, consumerGroupMember0, value = 20))
}

"time lag metric" in {
metrics should contain(Metrics.TimeLagMetric(config.clusterName, gtpSingleMember, consumerGroupMember0, value = 0.02))
metrics should contain(Metrics.TimeLagMetric(config.cluster.name, gtpSingleMember, consumerGroupMember0, value = 0.02))
}

"max group offset lag metric" in {
metrics should contain(Metrics.MaxGroupOffsetLagMetric(config.clusterName, consumerGroupSingleMember, value = 20))
metrics should contain(Metrics.MaxGroupOffsetLagMetric(config.cluster.name, consumerGroupSingleMember, value = 20))
}

"max group time lag metric" in {
metrics should contain(Metrics.MaxGroupTimeLagMetric(config.clusterName, consumerGroupSingleMember, value = 0.02))
metrics should contain(Metrics.MaxGroupTimeLagMetric(config.cluster.name, consumerGroupSingleMember, value = 0.02))
}
}

Expand Down Expand Up @@ -102,11 +102,11 @@ class ConsumerGroupCollectorSpec extends FreeSpec with Matchers with TestData wi
val metrics = reporter.receiveAll()

"max group offset lag metric" in {
metrics should contain(Metrics.MaxGroupOffsetLagMetric(config.clusterName, consumerGroupThreeMember, value = 100))
metrics should contain(Metrics.MaxGroupOffsetLagMetric(config.cluster.name, consumerGroupThreeMember, value = 100))
}

"max group time lag metric" in {
metrics should contain(Metrics.MaxGroupTimeLagMetric(config.clusterName, consumerGroupThreeMember, value = 0.1))
metrics should contain(Metrics.MaxGroupTimeLagMetric(config.cluster.name, consumerGroupThreeMember, value = 0.1))
}
}

Expand All @@ -131,27 +131,27 @@ class ConsumerGroupCollectorSpec extends FreeSpec with Matchers with TestData wi
val metrics = reporter.receiveAll()

"latest offset metric" in {
metrics should contain(Metrics.LatestOffsetMetric(config.clusterName, topicPartition0, value = 200))
metrics should contain(Metrics.LatestOffsetMetric(config.cluster.name, topicPartition0, value = 200))
}

"last group offset metric" in {
metrics should contain(Metrics.LastGroupOffsetMetric(config.clusterName, gtpSingleMember, consumerGroupMember0, value = 0))
metrics should contain(Metrics.LastGroupOffsetMetric(config.cluster.name, gtpSingleMember, consumerGroupMember0, value = 0))
}

"offset lag metric" in {
metrics should contain(Metrics.OffsetLagMetric(config.clusterName, gtpSingleMember, consumerGroupMember0, value = 200))
metrics should contain(Metrics.OffsetLagMetric(config.cluster.name, gtpSingleMember, consumerGroupMember0, value = 200))
}

"time lag metric" in {
metrics should contain(Metrics.TimeLagMetric(config.clusterName, gtpSingleMember, consumerGroupMember0, value = 0.2))
metrics should contain(Metrics.TimeLagMetric(config.cluster.name, gtpSingleMember, consumerGroupMember0, value = 0.2))
}

"max group offset lag metric" in {
metrics should contain(Metrics.MaxGroupOffsetLagMetric(config.clusterName, consumerGroupSingleMember, value = 200))
metrics should contain(Metrics.MaxGroupOffsetLagMetric(config.cluster.name, consumerGroupSingleMember, value = 200))
}

"max group time lag metric" in {
metrics should contain(Metrics.MaxGroupTimeLagMetric(config.clusterName, consumerGroupSingleMember, value = 0.2))
metrics should contain(Metrics.MaxGroupTimeLagMetric(config.cluster.name, consumerGroupSingleMember, value = 0.2))
}
}
}