Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add usage tracking metrics for Kafka clients. #658

Merged
merged 11 commits into from
Oct 17, 2022
9 changes: 9 additions & 0 deletions newrelic/api/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ def __init__(self, application, enabled=None, source=None):
self._loop_time = 0.0

self._frameworks = set()
self._message_brokers = set()

self._frozen_path = None

Expand Down Expand Up @@ -541,6 +542,10 @@ def __exit__(self, exc, value, tb):
for framework, version in self._frameworks:
self.record_custom_metric("Python/Framework/%s/%s" % (framework, version), 1)

if self._message_brokers:
for message_broker, version in self._message_brokers:
self.record_custom_metric("Python/MessageBroker/%s/%s" % (message_broker, version), 1)

if self._settings.distributed_tracing.enabled:
# Sampled and priority need to be computed at the end of the
# transaction when distributed tracing or span events are enabled.
Expand Down Expand Up @@ -1666,6 +1671,10 @@ def add_framework_info(self, name, version=None):
if name:
self._frameworks.add((name, version))

def add_messagebroker_info(self, name, version=None):
if name:
self._message_brokers.add((name, version))

def dump(self, file):
"""Dumps details about the transaction to the file object."""

Expand Down
8 changes: 8 additions & 0 deletions newrelic/hooks/messagebroker_confluentkafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@
HEARTBEAT_POLL_TIMEOUT = "MessageBroker/Kafka/Heartbeat/PollTimeout"


def confluent_kafka_version():
import confluent_kafka

return getattr(confluent_kafka, "__version__", None)


def wrap_Producer_produce(wrapped, instance, args, kwargs):
transaction = current_transaction()
if transaction is None:
Expand All @@ -56,6 +62,8 @@ def wrap_Producer_produce(wrapped, instance, args, kwargs):
else:
topic = kwargs.get("topic", None)

transaction.add_messagebroker_info("Confluent-Kafka", confluent_kafka_version())

with MessageTrace(
library="Kafka",
operation="Produce",
Expand Down
7 changes: 7 additions & 0 deletions newrelic/hooks/messagebroker_kafkapython.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.
import sys

import kafka
from kafka.serializer import Serializer

from newrelic.api.application import application_instance
Expand All @@ -39,6 +40,10 @@ def _bind_send(topic, value=None, key=None, headers=None, partition=None, timest
return topic, value, key, headers, partition, timestamp_ms


def kafka_python_version():
return getattr(kafka, "__version__", None)


def wrap_KafkaProducer_send(wrapped, instance, args, kwargs):
transaction = current_transaction()

Expand All @@ -48,6 +53,8 @@ def wrap_KafkaProducer_send(wrapped, instance, args, kwargs):
topic, value, key, headers, partition, timestamp_ms = _bind_send(*args, **kwargs)
headers = list(headers) if headers else []

transaction.add_messagebroker_info("Kafka-Python", kafka_python_version())

with MessageTrace(
library="Kafka",
operation="Produce",
Expand Down
3 changes: 3 additions & 0 deletions tests/messagebroker_confluentkafka/test_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ def producer_callback(err, msg):


def test_trace_metrics(topic, send_producer_message):
from confluent_kafka import __version__ as version

scoped_metrics = [("MessageBroker/Kafka/Topic/Produce/Named/%s" % topic, 1)]
unscoped_metrics = scoped_metrics
txn_name = "test_producer:test_trace_metrics.<locals>.test" if six.PY3 else "test_producer:test"
Expand All @@ -73,6 +75,7 @@ def test_trace_metrics(topic, send_producer_message):
txn_name,
scoped_metrics=scoped_metrics,
rollup_metrics=unscoped_metrics,
custom_metrics=[("Python/MessageBroker/Confluent-Kafka/%s" % version, 1)],
background_task=True,
)
@background_task()
Expand Down
3 changes: 3 additions & 0 deletions tests/messagebroker_kafkapython/test_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@


def test_trace_metrics(topic, send_producer_message):
from kafka.version import __version__ as version

scoped_metrics = [("MessageBroker/Kafka/Topic/Produce/Named/%s" % topic, 1)]
unscoped_metrics = scoped_metrics
txn_name = "test_producer:test_trace_metrics.<locals>.test" if six.PY3 else "test_producer:test"
Expand All @@ -36,6 +38,7 @@ def test_trace_metrics(topic, send_producer_message):
txn_name,
scoped_metrics=scoped_metrics,
rollup_metrics=unscoped_metrics,
custom_metrics=[("Python/MessageBroker/Kafka-Python/%s" % version, 1)],
background_task=True,
)
@background_task()
Expand Down