From e898e2dd6ac388c2af03803c857213b3933b5e32 Mon Sep 17 00:00:00 2001
From: Joshua Banton
Date: Tue, 18 Sep 2018 14:48:12 -0400
Subject: [PATCH] Update for the Burrow v3 version of the API schema

Burrow v3 moves the API root from /v2/kafka to /v3/kafka and changes
several response shapes: "maxlag" is now an object that carries the
lag under ["end"]["lag"], topic offsets come back as a flat list of
integers, and a consumer group's topics arrive in a single request
per group instead of one request per topic. The check also no longer
looks up the cluster's offsets_topic in order to skip it.

---
 checks.d/burrow.py | 66 +++++++++++++++++++++++++++++++++++++-----------------------------
 1 file changed, 37 insertions(+), 29 deletions(-)

diff --git a/checks.d/burrow.py b/checks.d/burrow.py
index 5a1fd35..c474a2a 100644
--- a/checks.d/burrow.py
+++ b/checks.d/burrow.py
@@ -8,11 +8,13 @@
 
 # project
 from checks import AgentCheck
 
+__version__ = "0.1.1"
+
 SERVICE_CHECK_NAME = 'burrow.can_connect'
 
 DEFAULT_BURROW_URI = 'http://localhost:8000'
-CLUSTER_ENDPOINT = '/v2/kafka'
+CLUSTER_ENDPOINT = '/v3/kafka'
 
 CHECK_TIMEOUT = 10
 
@@ -54,7 +56,7 @@ def _consumer_groups_lags(self, clusters, burrow_address, extra_tags):
                 status = lag_json["status"]
                 consumer_tags = ["cluster:%s" % cluster, "consumer:%s" % consumer] + extra_tags
 
-                self.gauge("kafka.consumer.maxlag", status["maxlag"], tags=consumer_tags)
+                self.gauge("kafka.consumer.maxlag", status["maxlag"]["end"]["lag"], tags=consumer_tags)
                 self.gauge("kafka.consumer.totallag", status["totallag"], tags=consumer_tags)
                 self._submit_lag_status("kafka.consumer.lag_status", status["status"], tags=consumer_tags)
 
@@ -84,9 +86,11 @@ def _submit_lag_status(self, metric_namespace, status, tags):
             self.gauge("%s.%s" % (metric_namespace, metric_name.lower()), value, tags=tags)
 
     def _submit_partition_lags(self, partition, tags):
-        lag = partition.get("end").get("lag")
-        timestamp = partition.get("end").get("timestamp") / 1000
-        self.gauge("kafka.consumer.partition_lag", lag, tags=tags, timestamp=timestamp)
+        end = partition.get("end")
+        if not end:
+            return
+        lag = end.get("lag")
+        self.gauge("kafka.consumer.partition_lag", lag, tags=tags)
 
     def _check_burrow(self, burrow_address, extra_tags):
         """
@@ -112,17 +116,16 @@ def _topic_offsets(self, clusters, burrow_address, extra_tags):
         Retrieve the offsets for all topics in the clusters
         """
         for cluster in clusters:
-            cluster_path = "%s/%s" % (CLUSTER_ENDPOINT, cluster)
-            offsets_topic = self._rest_request_to_json(burrow_address, cluster_path)["cluster"]["offsets_topic"]
-            topics_path = "%s/topic" % cluster_path
+            topics_path = "%s/%s/topic" % (CLUSTER_ENDPOINT, cluster)
             topics_list = self._rest_request_to_json(burrow_address, topics_path).get("topics", [])
             for topic in topics_list:
-                if topic == offsets_topic:
-                    continue
                 topic_path = "%s/%s" % (topics_path, topic)
                 response = self._rest_request_to_json(burrow_address, topic_path)
                 tags = ["topic:%s" % topic, "cluster:%s" % cluster] + extra_tags
-                self._submit_offsets_from_json(offsets_type="topic", json=response, tags=tags)
+                offsets_list = []
+                for offset in response.get("offsets", []):
+                    offsets_list.append({"offsets": [{"offset": offset, "timestamp": 1}]})
+                self._submit_offsets_from_json(offsets_type="topic", offsets_list=offsets_list, tags=tags)
 
     def _consumer_groups_offsets(self, clusters, burrow_address, extra_tags):
         """
@@ -132,30 +135,35 @@ def _consumer_groups_offsets(self, clusters, burrow_address, extra_tags):
             consumers_path = "%s/%s/consumer" % (CLUSTER_ENDPOINT, cluster)
             consumers_list = self._rest_request_to_json(burrow_address, consumers_path).get("consumers", [])
             for consumer in consumers_list:
-                topics_path = "%s/%s/topic" % (consumers_path, consumer)
-                topics_list = self._rest_request_to_json(burrow_address, topics_path).get("topics", [])
-                for topic in topics_list:
-                    topic_path = "%s/%s" % (topics_path, topic)
-                    response = self._rest_request_to_json(burrow_address, topic_path)
-                    if not response:
-                        continue
+                topics_path = "%s/%s" % (consumers_path, consumer)
+                topics = self._rest_request_to_json(burrow_address, topics_path).get("topics", [])
+                for topic in topics:
                     tags = ["topic:%s" % topic, "cluster:%s" % cluster, "consumer:%s" % consumer] + extra_tags
-                    self._submit_offsets_from_json(offsets_type="consumer", json=response, tags=tags)
+                    self._submit_offsets_from_json(offsets_type="consumer", offsets_list=topics[topic], tags=tags)
 
-    def _submit_offsets_from_json(self, offsets_type, json, tags):
+    def _submit_offsets_from_json(self, offsets_type, offsets_list, tags):
         """
         Find the offsets and push them into the metrics
         """
 
-        offsets = json.get("offsets")
-        if offsets:
-            # for unconsumed or empty partitions, change an offset of -1 to 0 so the
-            # sum isn't affected by the number of empty partitions.
-            offsets = [max(offset, 0) for offset in offsets]
-            self.gauge("kafka.%s.offsets.total" % offsets_type, sum(offsets), tags=tags)
-            for partition_number, offset in enumerate(offsets):
-                new_tags = tags + ["partition:%s" % partition_number]
-                self.gauge("kafka.%s.offsets" % offsets_type, offset, tags=new_tags)
+        # for unconsumed or empty partitions, change an offset of -1 to 0 so the
+        # sum isn't affected by the number of empty partitions.
+        offsets = []
+        for partition_number, offset_info in enumerate(offsets_list):
+            new_tags = tags + ["partition:%s" % partition_number]
+            latest_offset = None
+            offset_timestamp = 0
+            for offset in offset_info["offsets"]:
+                if not offset:
+                    continue
+                if offset["timestamp"] > offset_timestamp:
+                    latest_offset = offset["offset"]
+                    offset_timestamp = offset["timestamp"]
+            if latest_offset is None:
+                continue
+            self.gauge("kafka.%s.offsets" % offsets_type, latest_offset, tags=new_tags)
+            offsets.append(max(latest_offset, 0))
+        self.gauge("kafka.%s.offsets.total" % offsets_type, sum(offsets), tags=tags)
 
     def _find_clusters(self, address, target):
         """
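
Note (not part of the commit): below is a minimal sanity-check sketch of the
per-partition reduction that _submit_offsets_from_json() now performs. The
fixture shapes are inferred from the parsing code in this patch rather than
taken from the Burrow v3 docs, and latest_offsets() is a hypothetical
stand-alone mirror of the loop used for the .total metric, not a helper that
exists in checks.d/burrow.py.

    # Each partition carries a ring of recent commit entries, some of which
    # may be null. The check keeps the commit with the highest timestamp for
    # each partition, then sums the per-partition offsets (clamped at 0) to
    # build the kafka.<type>.offsets.total gauge.
    def latest_offsets(partitions):
        result = []
        for partition in partitions:
            latest, latest_ts = None, 0
            for commit in partition["offsets"]:
                if not commit:
                    continue
                if commit["timestamp"] > latest_ts:
                    latest, latest_ts = commit["offset"], commit["timestamp"]
            if latest is not None:
                result.append(max(latest, 0))
        return result

    partitions = [
        {"offsets": [None, {"offset": 40, "timestamp": 1000},
                     {"offset": 45, "timestamp": 2000}]},
        {"offsets": [{"offset": -1, "timestamp": 500}]},  # unconsumed partition
    ]
    assert latest_offsets(partitions) == [45, 0]  # clamped per-partition values
    assert sum(latest_offsets(partitions)) == 45  # kafka.consumer.offsets.total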