Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add usage tracking metrics for Kafka clients. #658

Merged
merged 11 commits into from
Oct 17, 2022
9 changes: 9 additions & 0 deletions newrelic/api/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ def __init__(self, application, enabled=None, source=None):
self._loop_time = 0.0

self._frameworks = set()
self._message_brokers = set()

self._frozen_path = None

Expand Down Expand Up @@ -545,6 +546,10 @@ def __exit__(self, exc, value, tb):
for framework, version in self._frameworks:
self.record_custom_metric("Python/Framework/%s/%s" % (framework, version), 1)

if self._message_brokers:
for message_broker, version in self._message_brokers:
self.record_custom_metric("Python/MessageBroker/%s/%s" % (message_broker, version), 1)

if self._settings.distributed_tracing.enabled:
# Sampled and priority need to be computed at the end of the
# transaction when distributed tracing or span events are enabled.
Expand Down Expand Up @@ -1676,6 +1681,10 @@ def add_framework_info(self, name, version=None):
if name:
self._frameworks.add((name, version))

def add_messagebroker_info(self, name, version=None):
if name:
self._message_brokers.add((name, version))

def dump(self, file):
"""Dumps details about the transaction to the file object."""

Expand Down
23 changes: 23 additions & 0 deletions newrelic/common/package_version_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Copyright 2010 New Relic, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import sys


def get_package_version(name):
# importlib was introduced into the standard library starting in Python3.8.
if "importlib" in sys.modules and hasattr(sys.modules["importlib"], "metadata"):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It occurs to me that previously these paths (ex: sys.modules["importlib"].metadata.version) were cached and only looked up once whereas now they are being looked up on every call. I don't think this is an issue but I'm calling it out here just in case I'm missing something. Because this function is called many times at startup of the application it's important to make sure we don't introduce a performance bottleneck here.

return sys.modules["importlib"].metadata.version(name)
elif "pkg_resources" in sys.modules:
return sys.modules["pkg_resources"].get_distribution(name).version
17 changes: 3 additions & 14 deletions newrelic/core/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import sysconfig

import newrelic
from newrelic.common.package_version_utils import get_package_version
from newrelic.common.system_info import (
logical_processor_count,
physical_processor_count,
Expand All @@ -37,18 +38,6 @@

def environment_settings():
"""Returns an array of arrays of environment settings"""

# Find version resolver.

get_version = None
# importlib was introduced into the standard library starting in Python3.8.
if "importlib" in sys.modules and hasattr(sys.modules["importlib"], "metadata"):
get_version = sys.modules["importlib"].metadata.version
elif "pkg_resources" in sys.modules:

def get_version(name): # pylint: disable=function-redefined
return sys.modules["pkg_resources"].get_distribution(name).version

env = []

# Agent information.
Expand Down Expand Up @@ -186,7 +175,7 @@ def get_version(name): # pylint: disable=function-redefined
dispatcher.append(("Dispatcher Version", hypercorn.__version__))
else:
try:
dispatcher.append(("Dispatcher Version", get_version("hypercorn")))
dispatcher.append(("Dispatcher Version", get_package_version("hypercorn")))
except Exception:
pass

Expand Down Expand Up @@ -237,7 +226,7 @@ def get_version(name): # pylint: disable=function-redefined
continue

try:
version = get_version(name)
version = get_package_version(name)
plugins.append("%s (%s)" % (name, version))
except Exception:
plugins.append(name)
Expand Down
5 changes: 5 additions & 0 deletions newrelic/hooks/messagebroker_confluentkafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
from newrelic.api.time_trace import notice_error
from newrelic.api.transaction import current_transaction
from newrelic.common.object_wrapper import function_wrapper, wrap_function_wrapper
from newrelic.common.package_version_utils import get_package_version


_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -56,6 +58,8 @@ def wrap_Producer_produce(wrapped, instance, args, kwargs):
else:
topic = kwargs.get("topic", None)

transaction.add_messagebroker_info("Confluent-Kafka", get_package_version("confluent-kafka"))

with MessageTrace(
library="Kafka",
operation="Produce",
Expand Down Expand Up @@ -161,6 +165,7 @@ def wrap_Consumer_poll(wrapped, instance, args, kwargs):
name = "Named/%s" % destination_name
transaction.record_custom_metric("%s/%s/Received/Bytes" % (group, name), received_bytes)
transaction.record_custom_metric("%s/%s/Received/Messages" % (group, name), message_count)
transaction.add_messagebroker_info("Confluent-Kafka", get_package_version("confluent-kafka"))

return record

Expand Down
6 changes: 6 additions & 0 deletions newrelic/hooks/messagebroker_kafkapython.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.
import sys

import kafka
from kafka.serializer import Serializer

from newrelic.api.application import application_instance
Expand All @@ -26,6 +27,7 @@
function_wrapper,
wrap_function_wrapper,
)
from newrelic.common.package_version_utils import get_package_version

HEARTBEAT_POLL = "MessageBroker/Kafka/Heartbeat/Poll"
HEARTBEAT_SENT = "MessageBroker/Kafka/Heartbeat/Sent"
Expand All @@ -48,6 +50,8 @@ def wrap_KafkaProducer_send(wrapped, instance, args, kwargs):
topic, value, key, headers, partition, timestamp_ms = _bind_send(*args, **kwargs)
headers = list(headers) if headers else []

transaction.add_messagebroker_info("Kafka-Python", get_package_version("kafka-python"))

with MessageTrace(
library="Kafka",
operation="Produce",
Expand Down Expand Up @@ -112,6 +116,7 @@ def wrap_kafkaconsumer_next(wrapped, instance, args, kwargs):
message_count = 1

transaction = current_transaction(active_only=False)

if not transaction:
transaction = MessageTransaction(
application=application_instance(),
Expand Down Expand Up @@ -143,6 +148,7 @@ def wrap_kafkaconsumer_next(wrapped, instance, args, kwargs):
name = "Named/%s" % destination_name
transaction.record_custom_metric("%s/%s/Received/Bytes" % (group, name), received_bytes)
transaction.record_custom_metric("%s/%s/Received/Messages" % (group, name), message_count)
transaction.add_messagebroker_info("Kafka-Python", get_package_version("kafka-python"))

return record

Expand Down
2 changes: 2 additions & 0 deletions tests/messagebroker_confluentkafka/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def _test():


def test_custom_metrics_on_existing_transaction(get_consumer_record, topic):
from confluent_kafka import __version__ as version
transaction_name = (
"test_consumer:test_custom_metrics_on_existing_transaction.<locals>._test" if six.PY3 else "test_consumer:_test"
)
Expand All @@ -72,6 +73,7 @@ def test_custom_metrics_on_existing_transaction(get_consumer_record, topic):
custom_metrics=[
("Message/Kafka/Topic/Named/%s/Received/Bytes" % topic, 1),
("Message/Kafka/Topic/Named/%s/Received/Messages" % topic, 1),
("Python/MessageBroker/Confluent-Kafka/%s" % version, 1),
],
background_task=True,
)
Expand Down
3 changes: 3 additions & 0 deletions tests/messagebroker_confluentkafka/test_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ def producer_callback(err, msg):


def test_trace_metrics(topic, send_producer_message):
from confluent_kafka import __version__ as version

scoped_metrics = [("MessageBroker/Kafka/Topic/Produce/Named/%s" % topic, 1)]
unscoped_metrics = scoped_metrics
txn_name = "test_producer:test_trace_metrics.<locals>.test" if six.PY3 else "test_producer:test"
Expand All @@ -73,6 +75,7 @@ def test_trace_metrics(topic, send_producer_message):
txn_name,
scoped_metrics=scoped_metrics,
rollup_metrics=unscoped_metrics,
custom_metrics=[("Python/MessageBroker/Confluent-Kafka/%s" % version, 1)],
background_task=True,
)
@background_task()
Expand Down
2 changes: 2 additions & 0 deletions tests/messagebroker_kafkapython/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ def _test():


def test_custom_metrics_on_existing_transaction(get_consumer_record, topic):
from kafka.version import __version__ as version
transaction_name = (
"test_consumer:test_custom_metrics_on_existing_transaction.<locals>._test" if six.PY3 else "test_consumer:_test"
)
Expand All @@ -69,6 +70,7 @@ def test_custom_metrics_on_existing_transaction(get_consumer_record, topic):
custom_metrics=[
("Message/Kafka/Topic/Named/%s/Received/Bytes" % topic, 1),
("Message/Kafka/Topic/Named/%s/Received/Messages" % topic, 1),
("Python/MessageBroker/Kafka-Python/%s" % version, 1),
],
background_task=True,
)
Expand Down
3 changes: 3 additions & 0 deletions tests/messagebroker_kafkapython/test_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@


def test_trace_metrics(topic, send_producer_message):
from kafka.version import __version__ as version

scoped_metrics = [("MessageBroker/Kafka/Topic/Produce/Named/%s" % topic, 1)]
unscoped_metrics = scoped_metrics
txn_name = "test_producer:test_trace_metrics.<locals>.test" if six.PY3 else "test_producer:test"
Expand All @@ -36,6 +38,7 @@ def test_trace_metrics(topic, send_producer_message):
txn_name,
scoped_metrics=scoped_metrics,
rollup_metrics=unscoped_metrics,
custom_metrics=[("Python/MessageBroker/Kafka-Python/%s" % version, 1)],
background_task=True,
)
@background_task()
Expand Down