From 5a68ca68659902bf49420db90d968a7592e3250c Mon Sep 17 00:00:00 2001 From: Dmitriy Date: Tue, 11 Feb 2025 18:04:55 +0500 Subject: [PATCH 01/14] feat: support batch (getmany) in aiokafka instrumentation --- .../instrumentation/aiokafka/__init__.py | 7 + .../instrumentation/aiokafka/utils.py | 249 ++++++++++++++++-- .../tests/test_instrumentation.py | 158 ++++++++++- .../tests/test_utils.py | 2 +- 4 files changed, 390 insertions(+), 26 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/__init__.py b/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/__init__.py index 4b14ace4fb..c7ac2f2633 100644 --- a/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/__init__.py @@ -78,6 +78,7 @@ async def async_consume_hook(span, record, args, kwargs): from opentelemetry import trace from opentelemetry.instrumentation.aiokafka.package import _instruments from opentelemetry.instrumentation.aiokafka.utils import ( + _wrap_getmany, _wrap_getone, _wrap_send, ) @@ -131,7 +132,13 @@ def _instrument(self, **kwargs): "getone", _wrap_getone(tracer, async_consume_hook), ) + wrap_function_wrapper( + aiokafka.AIOKafkaConsumer, + "getmany", + _wrap_getmany(tracer, async_consume_hook), + ) def _uninstrument(self, **kwargs): unwrap(aiokafka.AIOKafkaProducer, "send") unwrap(aiokafka.AIOKafkaConsumer, "getone") + unwrap(aiokafka.AIOKafkaConsumer, "getmany") diff --git a/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/utils.py b/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/utils.py index cae0d97717..362b74240f 100644 --- a/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/utils.py +++ b/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/utils.py @@ -1,9 +1,22 @@ +from __future__ import annotations + +import asyncio import json from logging import getLogger -from typing import Any, Awaitable, Callable, Dict, List, Optional, Tuple, Union +from typing import ( + TYPE_CHECKING, + Any, + Awaitable, + Callable, + Dict, + List, + Optional, + Protocol, + Tuple, + Union, +) import aiokafka -from aiokafka import ConsumerRecord from opentelemetry import context, propagate, trace from opentelemetry.context import Context @@ -13,6 +26,54 @@ from opentelemetry.trace import Tracer from opentelemetry.trace.span import Span +if TYPE_CHECKING: + from aiokafka.structs import RecordMetadata + + class AIOKafkaGetOneProto(Protocol): + async def __call__( + self, *partitions: aiokafka.TopicPartition + ) -> aiokafka.ConsumerRecord[object, object]: ... + + class AIOKafkaGetManyProto(Protocol): + async def __call__( + self, + *partitions: aiokafka.TopicPartition, + timeout_ms: int = 0, + max_records: int | None = None, + ) -> dict[ + aiokafka.TopicPartition, + list[aiokafka.ConsumerRecord[object, object]], + ]: ... + + class AIOKafkaSendProto(Protocol): + async def __call__( + self, + topic: str, + value: Any | None = None, + key: Any | None = None, + partition: int | None = None, + timestamp_ms: int | None = None, + headers: HeadersT | None = None, + ) -> asyncio.Future[RecordMetadata]: ... + + +ProduceHookT = Optional[ + Callable[[Span, Tuple[Any, ...], Dict[str, Any]], Awaitable[None]] +] +ConsumeHookT = Optional[ + Callable[ + [ + Span, + aiokafka.ConsumerRecord[object, object], + Tuple[aiokafka.TopicPartition, ...], + Dict[str, Any], + ], + Awaitable[None], + ] +] + +HeadersT = List[Tuple[str, Optional[bytes]]] + _LOG = getLogger(__name__) @@ -97,14 +158,6 @@ async def _extract_send_partition( return None -ProduceHookT = Optional[Callable[[Span, Tuple, Dict], Awaitable[None]]] -ConsumeHookT = Optional[ - Callable[[Span, ConsumerRecord, Tuple, Dict], Awaitable[None]] -] - -HeadersT = List[Tuple[str, Optional[bytes]]] - - class AIOKafkaContextGetter(textmap.Getter[HeadersT]): def get(self, carrier: HeadersT, key: str) -> Optional[List[str]]: if carrier is None: @@ -198,7 +251,7 @@ def _enrich_send_span( ) -def _enrich_anext_span( +def _enrich_getone_span( span: Span, *, bootstrap_servers: Union[str, List[str]], @@ -247,19 +300,93 @@ def _enrich_anext_span( ) +def _enrich_getmany_poll_span( + span: Span, + *, + bootstrap_servers: Union[str, List[str]], + client_id: str, + consumer_group: Optional[str], + message_count: int, +) -> None: + if not span.is_recording(): + return + + span.set_attribute( + messaging_attributes.MESSAGING_SYSTEM, + messaging_attributes.MessagingSystemValues.KAFKA.value, + ) + span.set_attribute( + server_attributes.SERVER_ADDRESS, json.dumps(bootstrap_servers) + ) + span.set_attribute(messaging_attributes.MESSAGING_CLIENT_ID, client_id) + + if consumer_group is not None: + span.set_attribute( + messaging_attributes.MESSAGING_CONSUMER_GROUP_NAME, consumer_group + ) + + span.set_attribute( + messaging_attributes.MESSAGING_BATCH_MESSAGE_COUNT, message_count + ) + + span.set_attribute(messaging_attributes.MESSAGING_OPERATION_NAME, "poll") + span.set_attribute( + messaging_attributes.MESSAGING_OPERATION_TYPE, + messaging_attributes.MessagingOperationTypeValues.RECEIVE.value, + ) + + +def _enrich_getmany_topic_span( + span: Span, + *, + bootstrap_servers: Union[str, List[str]], + client_id: str, + consumer_group: Optional[str], + topic: str, + partition: int, + message_count: int, +) -> None: + if not span.is_recording(): + return + + _enrich_base_span( + span, + bootstrap_servers=bootstrap_servers, + client_id=client_id, + topic=topic, + partition=partition, + key=None, + ) + + if consumer_group is not None: + span.set_attribute( + messaging_attributes.MESSAGING_CONSUMER_GROUP_NAME, consumer_group + ) + + span.set_attribute( + messaging_attributes.MESSAGING_BATCH_MESSAGE_COUNT, message_count + ) + + span.set_attribute(messaging_attributes.MESSAGING_OPERATION_NAME, "poll") + span.set_attribute( + messaging_attributes.MESSAGING_OPERATION_TYPE, + messaging_attributes.MessagingOperationTypeValues.RECEIVE.value, + ) + + def _get_span_name(operation: str, topic: str): return f"{topic} {operation}" def _wrap_send( tracer: Tracer, async_produce_hook: ProduceHookT -) -> Callable[..., Awaitable[None]]: +) -> Callable[..., Awaitable[asyncio.Future[RecordMetadata]]]: async def _traced_send( - func: Callable[..., Awaitable[None]], + func: AIOKafkaSendProto, instance: aiokafka.AIOKafkaProducer, args: Tuple[Any], kwargs: Dict[str, Any], - ) -> None: + ) -> asyncio.Future[RecordMetadata]: headers = _extract_send_headers(args, kwargs) if headers is None: headers = [] @@ -301,14 +428,14 @@ async def _traced_send( async def _create_consumer_span( tracer: Tracer, async_consume_hook: ConsumeHookT, - record: ConsumerRecord, + record: aiokafka.ConsumerRecord[object, object], extracted_context: Context, bootstrap_servers: Union[str, List[str]], client_id: str, consumer_group: Optional[str], args: Tuple[Any], kwargs: Dict[str, Any], -): +) -> trace.Span: span_name = _get_span_name("receive", record.topic) with tracer.start_as_current_span( span_name, @@ -317,7 +444,7 @@ async def _create_consumer_span( ) as span: new_context = trace.set_span_in_context(span, extracted_context) token = context.attach(new_context) - _enrich_anext_span( + _enrich_getone_span( span, bootstrap_servers=bootstrap_servers, client_id=client_id, @@ -334,16 +461,18 @@ async def _create_consumer_span( _LOG.exception(hook_exception) context.detach(token) + return span + def _wrap_getone( tracer: Tracer, async_consume_hook: ConsumeHookT -) -> Callable[..., Awaitable[aiokafka.ConsumerRecord]]: - async def _traced_next( - func: Callable[..., Awaitable[aiokafka.ConsumerRecord]], +) -> Callable[..., Awaitable[aiokafka.ConsumerRecord[object, object]]]: + async def _traced_getone( + func: AIOKafkaGetOneProto, instance: aiokafka.AIOKafkaConsumer, args: Tuple[Any], kwargs: Dict[str, Any], - ) -> aiokafka.ConsumerRecord: + ) -> aiokafka.ConsumerRecord[object, object]: record = await func(*args, **kwargs) if record: @@ -367,4 +496,80 @@ async def _traced_next( ) return record - return _traced_next + return _traced_getone + + +def _wrap_getmany( + tracer: Tracer, async_consume_hook: ConsumeHookT +) -> Callable[ + ..., + Awaitable[ + dict[ + aiokafka.TopicPartition, + list[aiokafka.ConsumerRecord[object, object]], + ] + ], +]: + async def _traced_getmany( + func: AIOKafkaGetManyProto, + instance: aiokafka.AIOKafkaConsumer, + args: Tuple[Any], + kwargs: Dict[str, Any], + ) -> dict[ + aiokafka.TopicPartition, list[aiokafka.ConsumerRecord[object, object]] + ]: + records = await func(*args, **kwargs) + + if records: + bootstrap_servers = _extract_bootstrap_servers(instance._client) + client_id = _extract_client_id(instance._client) + consumer_group = _extract_consumer_group(instance) + + span_name = _get_span_name( + "poll", ", ".join([topic.topic for topic in records.keys()]) + ) + with tracer.start_as_current_span( + span_name, kind=trace.SpanKind.CLIENT + ) as poll_span: + _enrich_getmany_poll_span( + poll_span, + bootstrap_servers=bootstrap_servers, + client_id=client_id, + consumer_group=consumer_group, + message_count=sum(len(r) for r in records.values()), + ) + + for topic, topic_records in records.items(): + span_name = _get_span_name("poll", topic.topic) + with tracer.start_as_current_span( + span_name, kind=trace.SpanKind.CLIENT + ) as topic_span: + _enrich_getmany_topic_span( + topic_span, + bootstrap_servers=bootstrap_servers, + client_id=client_id, + consumer_group=consumer_group, + topic=topic.topic, + partition=topic.partition, + message_count=len(topic_records), + ) + + for record in topic_records: + extracted_context = propagate.extract( + record.headers, getter=_aiokafka_getter + ) + record_span = await _create_consumer_span( + tracer, + async_consume_hook, + record, + extracted_context, + bootstrap_servers, + client_id, + consumer_group, + args, + kwargs, + ) + topic_span.add_link(record_span.get_span_context()) + return records + + return _traced_getmany diff --git a/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_instrumentation.py b/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_instrumentation.py index 8211566239..0aa2f30198 100644 --- a/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_instrumentation.py @@ -12,10 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. +from __future__ import annotations + import uuid from typing import Any, List, Sequence, Tuple from unittest import IsolatedAsyncioTestCase, TestCase, mock +import aiokafka from aiokafka import ( AIOKafkaConsumer, AIOKafkaProducer, @@ -44,6 +47,9 @@ def test_instrument_api(self) -> None: self.assertTrue( isinstance(AIOKafkaConsumer.getone, BoundFunctionWrapper) ) + self.assertTrue( + isinstance(AIOKafkaConsumer.getmany, BoundFunctionWrapper) + ) instrumentation.uninstrument() self.assertFalse( @@ -52,6 +58,9 @@ def test_instrument_api(self) -> None: self.assertFalse( isinstance(AIOKafkaConsumer.getone, BoundFunctionWrapper) ) + self.assertFalse( + isinstance(AIOKafkaConsumer.getmany, BoundFunctionWrapper) + ) class TestAIOKafkaInstrumentation(TestBase, IsolatedAsyncioTestCase): @@ -73,6 +82,34 @@ def consumer_record_factory( headers=headers, ) + @staticmethod + def consumer_batch_factory( + *headers: Tuple[Tuple[str, bytes], ...], + ) -> dict[aiokafka.TopicPartition, list[aiokafka.ConsumerRecord]]: + records = {} + for number, record_headers in enumerate(headers, start=1): + records[ + aiokafka.TopicPartition( + topic=f"topic_{number}", partition=number + ) + ] = [ + ConsumerRecord( + f"topic_{number}", + number, + number, + number, + number, + f"key_{number}".encode(), + f"value_{number}".encode(), + None, + number, + number, + headers=record_headers, + ) + ] + + return records + @staticmethod async def consumer_factory(**consumer_kwargs: Any) -> AIOKafkaConsumer: consumer = AIOKafkaConsumer(**consumer_kwargs) @@ -83,6 +120,7 @@ async def consumer_factory(**consumer_kwargs: Any) -> AIOKafkaConsumer: await consumer.start() consumer._fetcher.next_record = mock.AsyncMock() + consumer._fetcher.fetched_records = mock.AsyncMock() return consumer @@ -233,6 +271,118 @@ async def test_getone_consume_hook(self) -> None: async_consume_hook_mock.assert_awaited_once() + async def test_getmany(self) -> None: + AIOKafkaInstrumentor().uninstrument() + AIOKafkaInstrumentor().instrument(tracer_provider=self.tracer_provider) + + client_id = str(uuid.uuid4()) + group_id = str(uuid.uuid4()) + consumer = await self.consumer_factory( + client_id=client_id, group_id=group_id + ) + fetched_records_mock: mock.AsyncMock = ( + consumer._fetcher.fetched_records + ) + + expected_spans = [ + { + "name": "topic_1 receive", + "kind": SpanKind.CONSUMER, + "attributes": { + messaging_attributes.MESSAGING_SYSTEM: messaging_attributes.MessagingSystemValues.KAFKA.value, + server_attributes.SERVER_ADDRESS: '"localhost"', + messaging_attributes.MESSAGING_CLIENT_ID: client_id, + messaging_attributes.MESSAGING_DESTINATION_NAME: "topic_1", + messaging_attributes.MESSAGING_DESTINATION_PARTITION_ID: "1", + messaging_attributes.MESSAGING_KAFKA_MESSAGE_KEY: "key_1", + messaging_attributes.MESSAGING_CONSUMER_GROUP_NAME: group_id, + messaging_attributes.MESSAGING_OPERATION_NAME: "receive", + messaging_attributes.MESSAGING_OPERATION_TYPE: messaging_attributes.MessagingOperationTypeValues.RECEIVE.value, + messaging_attributes.MESSAGING_KAFKA_MESSAGE_OFFSET: 1, + messaging_attributes.MESSAGING_MESSAGE_ID: "topic_1.1.1", + }, + }, + { + "name": "topic_1 poll", + "kind": SpanKind.CLIENT, + "attributes": { + messaging_attributes.MESSAGING_SYSTEM: messaging_attributes.MessagingSystemValues.KAFKA.value, + server_attributes.SERVER_ADDRESS: '"localhost"', + messaging_attributes.MESSAGING_CLIENT_ID: client_id, + messaging_attributes.MESSAGING_DESTINATION_NAME: "topic_1", + messaging_attributes.MESSAGING_DESTINATION_PARTITION_ID: "1", + messaging_attributes.MESSAGING_CONSUMER_GROUP_NAME: group_id, + messaging_attributes.MESSAGING_OPERATION_NAME: "poll", + messaging_attributes.MESSAGING_OPERATION_TYPE: messaging_attributes.MessagingOperationTypeValues.RECEIVE.value, + messaging_attributes.MESSAGING_BATCH_MESSAGE_COUNT: 1, + }, + }, + { + "name": "topic_2 receive", + "kind": SpanKind.CONSUMER, + "attributes": { + messaging_attributes.MESSAGING_SYSTEM: messaging_attributes.MessagingSystemValues.KAFKA.value, + server_attributes.SERVER_ADDRESS: '"localhost"', + messaging_attributes.MESSAGING_CLIENT_ID: client_id, + messaging_attributes.MESSAGING_DESTINATION_NAME: "topic_2", + messaging_attributes.MESSAGING_DESTINATION_PARTITION_ID: "2", + messaging_attributes.MESSAGING_KAFKA_MESSAGE_KEY: "key_2", + messaging_attributes.MESSAGING_CONSUMER_GROUP_NAME: group_id, + messaging_attributes.MESSAGING_OPERATION_NAME: "receive", + messaging_attributes.MESSAGING_OPERATION_TYPE: messaging_attributes.MessagingOperationTypeValues.RECEIVE.value, + messaging_attributes.MESSAGING_KAFKA_MESSAGE_OFFSET: 2, + messaging_attributes.MESSAGING_MESSAGE_ID: "topic_2.2.2", + }, + }, + { + "name": "topic_2 poll", + "kind": SpanKind.CLIENT, + "attributes": { + messaging_attributes.MESSAGING_SYSTEM: messaging_attributes.MessagingSystemValues.KAFKA.value, + server_attributes.SERVER_ADDRESS: '"localhost"', + messaging_attributes.MESSAGING_CLIENT_ID: client_id, + messaging_attributes.MESSAGING_DESTINATION_NAME: "topic_2", + messaging_attributes.MESSAGING_DESTINATION_PARTITION_ID: "2", + messaging_attributes.MESSAGING_CONSUMER_GROUP_NAME: group_id, + messaging_attributes.MESSAGING_OPERATION_NAME: "poll", + messaging_attributes.MESSAGING_OPERATION_TYPE: messaging_attributes.MessagingOperationTypeValues.RECEIVE.value, + messaging_attributes.MESSAGING_BATCH_MESSAGE_COUNT: 1, + }, + }, + { + "name": "topic_1, topic_2 poll", + "kind": SpanKind.CLIENT, + "attributes": { + messaging_attributes.MESSAGING_SYSTEM: messaging_attributes.MessagingSystemValues.KAFKA.value, + server_attributes.SERVER_ADDRESS: '"localhost"', + messaging_attributes.MESSAGING_CLIENT_ID: client_id, + messaging_attributes.MESSAGING_CONSUMER_GROUP_NAME: group_id, + messaging_attributes.MESSAGING_OPERATION_NAME: "poll", + messaging_attributes.MESSAGING_OPERATION_TYPE: messaging_attributes.MessagingOperationTypeValues.RECEIVE.value, + messaging_attributes.MESSAGING_BATCH_MESSAGE_COUNT: 2, + }, + }, + ] + self.memory_exporter.clear() + + fetched_records_mock.side_effect = [ + self.consumer_batch_factory( + ( + ( + "traceparent", + b"00-03afa25236b8cd948fa853d67038ac79-405ff022e8247c46-01", + ), + ), + (), + ), + ] + + await consumer.getmany() + fetched_records_mock.assert_awaited_with((), 0.0, max_records=None) + + span_list = self.memory_exporter.get_finished_spans() + self._compare_spans(span_list, expected_spans) + async def test_send(self) -> None: AIOKafkaInstrumentor().uninstrument() AIOKafkaInstrumentor().instrument(tracer_provider=self.tracer_provider) @@ -313,8 +463,10 @@ def _compare_spans( ) -> None: self.assertEqual(len(spans), len(expected_spans)) for span, expected_span in zip(spans, expected_spans): - self.assertEqual(expected_span["name"], span.name) - self.assertEqual(expected_span["kind"], span.kind) + self.assertEqual(expected_span["name"], span.name, msg=span.name) + self.assertEqual(expected_span["kind"], span.kind, msg=span.name) self.assertEqual( - expected_span["attributes"], dict(span.attributes) + expected_span["attributes"], + dict(span.attributes), + msg=span.name, ) diff --git a/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_utils.py b/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_utils.py index 09a8655309..0b20a57f2c 100644 --- a/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_utils.py +++ b/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_utils.py @@ -228,7 +228,7 @@ async def test_wrap_next( @mock.patch("opentelemetry.trace.set_span_in_context") @mock.patch("opentelemetry.context.attach") @mock.patch( - "opentelemetry.instrumentation.aiokafka.utils._enrich_anext_span" + "opentelemetry.instrumentation.aiokafka.utils._enrich_getone_span" ) @mock.patch("opentelemetry.context.detach") async def test_create_consumer_span( From 29eb8d345863cdb6e3ad03cd797113fd1b4d3c60 Mon Sep 17 00:00:00 2001 From: Dmitriy Date: Wed, 12 Feb 2025 11:14:42 +0500 Subject: [PATCH 02/14] test: fix unclosed resources and typing --- .../tests/test_instrumentation.py | 45 ++++++++++--------- 1 file changed, 24 insertions(+), 21 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_instrumentation.py b/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_instrumentation.py index 0aa2f30198..cafa683ae0 100644 --- a/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_instrumentation.py @@ -15,7 +15,7 @@ from __future__ import annotations import uuid -from typing import Any, List, Sequence, Tuple +from typing import Any, List, Sequence, Tuple, cast from unittest import IsolatedAsyncioTestCase, TestCase, mock import aiokafka @@ -138,16 +138,22 @@ async def producer_factory() -> AIOKafkaProducer: return producer - async def test_getone(self) -> None: - AIOKafkaInstrumentor().uninstrument() + def setUp(self): + super().setUp() AIOKafkaInstrumentor().instrument(tracer_provider=self.tracer_provider) + def tearDown(self): + super().tearDown() + AIOKafkaInstrumentor().uninstrument() + + async def test_getone(self) -> None: client_id = str(uuid.uuid4()) group_id = str(uuid.uuid4()) consumer = await self.consumer_factory( client_id=client_id, group_id=group_id ) - next_record_mock: mock.AsyncMock = consumer._fetcher.next_record + self.addAsyncCleanup(consumer.stop) + next_record_mock = cast(mock.AsyncMock, consumer._fetcher.next_record) expected_spans = [ { @@ -229,7 +235,8 @@ async def async_consume_hook(span, *_) -> None: ) consumer = await self.consumer_factory() - next_record_mock: mock.AsyncMock = consumer._fetcher.next_record + self.addAsyncCleanup(consumer.stop) + next_record_mock = cast(mock.AsyncMock, consumer._fetcher.next_record) self.memory_exporter.clear() @@ -261,7 +268,8 @@ async def test_getone_consume_hook(self) -> None: ) consumer = await self.consumer_factory() - next_record_mock: mock.AsyncMock = consumer._fetcher.next_record + self.addAsyncCleanup(consumer.stop) + next_record_mock = cast(mock.AsyncMock, consumer._fetcher.next_record) next_record_mock.side_effect = [ self.consumer_record_factory(1, headers=()) @@ -272,16 +280,14 @@ async def test_getone_consume_hook(self) -> None: async_consume_hook_mock.assert_awaited_once() async def test_getmany(self) -> None: - AIOKafkaInstrumentor().uninstrument() - AIOKafkaInstrumentor().instrument(tracer_provider=self.tracer_provider) - client_id = str(uuid.uuid4()) group_id = str(uuid.uuid4()) consumer = await self.consumer_factory( client_id=client_id, group_id=group_id ) - fetched_records_mock: mock.AsyncMock = ( - consumer._fetcher.fetched_records + self.addAsyncCleanup(consumer.stop) + fetched_records_mock = cast( + mock.AsyncMock, consumer._fetcher.fetched_records ) expected_spans = [ @@ -384,12 +390,10 @@ async def test_getmany(self) -> None: self._compare_spans(span_list, expected_spans) async def test_send(self) -> None: - AIOKafkaInstrumentor().uninstrument() - AIOKafkaInstrumentor().instrument(tracer_provider=self.tracer_provider) - producer = await self.producer_factory() - add_message_mock: mock.AsyncMock = ( - producer._message_accumulator.add_message + self.addAsyncCleanup(producer.stop) + add_message_mock = cast( + mock.AsyncMock, producer._message_accumulator.add_message ) tracer = self.tracer_provider.get_tracer(__name__) @@ -419,12 +423,10 @@ async def test_send(self) -> None: ) async def test_send_baggage(self) -> None: - AIOKafkaInstrumentor().uninstrument() - AIOKafkaInstrumentor().instrument(tracer_provider=self.tracer_provider) - producer = await self.producer_factory() - add_message_mock: mock.AsyncMock = ( - producer._message_accumulator.add_message + self.addAsyncCleanup(producer.stop) + add_message_mock = cast( + mock.AsyncMock, producer._message_accumulator.add_message ) tracer = self.tracer_provider.get_tracer(__name__) @@ -453,6 +455,7 @@ async def test_send_produce_hook(self) -> None: ) producer = await self.producer_factory() + self.addAsyncCleanup(producer.stop) await producer.send("topic_1", b"value_1") From ae227a18e7cd8cd51035d799754507337f417066 Mon Sep 17 00:00:00 2001 From: Dmitriy Date: Wed, 12 Feb 2025 11:58:36 +0500 Subject: [PATCH 03/14] test: add test_wrap_getmany --- .../tests/test_utils.py | 96 +++++++++++++++++-- 1 file changed, 89 insertions(+), 7 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_utils.py b/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_utils.py index 0b20a57f2c..b6d86951e4 100644 --- a/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_utils.py +++ b/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_utils.py @@ -15,6 +15,8 @@ from unittest import IsolatedAsyncioTestCase, mock +import aiokafka + from opentelemetry.instrumentation.aiokafka.utils import ( AIOKafkaContextGetter, AIOKafkaContextSetter, @@ -23,6 +25,7 @@ _create_consumer_span, _extract_send_partition, _get_span_name, + _wrap_getmany, _wrap_getone, _wrap_send, ) @@ -174,7 +177,7 @@ async def wrap_send_helper( @mock.patch( "opentelemetry.instrumentation.aiokafka.utils._extract_consumer_group" ) - async def test_wrap_next( + async def test_wrap_getone( self, extract_consumer_group: mock.MagicMock, extract_client_id: mock.MagicMock, @@ -184,12 +187,12 @@ async def test_wrap_next( ) -> None: tracer = mock.MagicMock() consume_hook = mock.AsyncMock() - original_next_callback = mock.AsyncMock() + original_getone_callback = mock.AsyncMock() kafka_consumer = mock.MagicMock() - wrapped_next = _wrap_getone(tracer, consume_hook) - record = await wrapped_next( - original_next_callback, kafka_consumer, self.args, self.kwargs + wrapped_getone = _wrap_getone(tracer, consume_hook) + record = await wrapped_getone( + original_getone_callback, kafka_consumer, self.args, self.kwargs ) extract_bootstrap_servers.assert_called_once_with( @@ -203,10 +206,10 @@ async def test_wrap_next( extract_consumer_group.assert_called_once_with(kafka_consumer) consumer_group = extract_consumer_group.return_value - original_next_callback.assert_awaited_once_with( + original_getone_callback.assert_awaited_once_with( *self.args, **self.kwargs ) - self.assertEqual(record, original_next_callback.return_value) + self.assertEqual(record, original_getone_callback.return_value) extract.assert_called_once_with( record.headers, getter=_aiokafka_getter @@ -225,6 +228,85 @@ async def test_wrap_next( self.kwargs, ) + @mock.patch("opentelemetry.propagate.extract") + @mock.patch( + "opentelemetry.instrumentation.aiokafka.utils._create_consumer_span" + ) + @mock.patch( + "opentelemetry.instrumentation.aiokafka.utils._enrich_getmany_topic_span" + ) + @mock.patch( + "opentelemetry.instrumentation.aiokafka.utils._enrich_getmany_poll_span" + ) + @mock.patch( + "opentelemetry.instrumentation.aiokafka.utils._extract_bootstrap_servers" + ) + @mock.patch( + "opentelemetry.instrumentation.aiokafka.utils._extract_client_id" + ) + @mock.patch( + "opentelemetry.instrumentation.aiokafka.utils._extract_consumer_group" + ) + async def test_wrap_getmany( + self, + extract_consumer_group: mock.MagicMock, + extract_client_id: mock.MagicMock, + extract_bootstrap_servers: mock.MagicMock, + enrich_getmany_poll_span: mock.MagicMock, + enrich_getmany_topic_span: mock.MagicMock, + _create_consumer_span: mock.MagicMock, + extract: mock.MagicMock, + ) -> None: + tracer = mock.MagicMock() + consume_hook = mock.AsyncMock() + record_mock = mock.MagicMock() + original_getmany_callback = mock.AsyncMock( + return_value={ + aiokafka.TopicPartition(topic="topic_1", partition=0): [ + record_mock + ] + } + ) + kafka_consumer = mock.MagicMock() + + wrapped_getmany = _wrap_getmany(tracer, consume_hook) + records = await wrapped_getmany( + original_getmany_callback, kafka_consumer, self.args, self.kwargs + ) + + extract_bootstrap_servers.assert_called_once_with( + kafka_consumer._client + ) + bootstrap_servers = extract_bootstrap_servers.return_value + + extract_client_id.assert_called_once_with(kafka_consumer._client) + client_id = extract_client_id.return_value + + extract_consumer_group.assert_called_once_with(kafka_consumer) + consumer_group = extract_consumer_group.return_value + + original_getmany_callback.assert_awaited_once_with( + *self.args, **self.kwargs + ) + self.assertEqual(records, original_getmany_callback.return_value) + + extract.assert_called_once_with( + record_mock.headers, getter=_aiokafka_getter + ) + context = extract.return_value + + _create_consumer_span.assert_called_once_with( + tracer, + consume_hook, + record_mock, + context, + bootstrap_servers, + client_id, + consumer_group, + self.args, + self.kwargs, + ) + @mock.patch("opentelemetry.trace.set_span_in_context") @mock.patch("opentelemetry.context.attach") @mock.patch( From 745d7ad396d529582c205949182d630c02725e5f Mon Sep 17 00:00:00 2001 From: Dmitriy Date: Wed, 12 Feb 2025 11:59:01 +0500 Subject: [PATCH 04/14] fix: get unique topic list in batch --- .../src/opentelemetry/instrumentation/aiokafka/utils.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/utils.py b/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/utils.py index 362b74240f..8bac23eb82 100644 --- a/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/utils.py +++ b/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/utils.py @@ -526,7 +526,10 @@ async def _traced_getmany( consumer_group = _extract_consumer_group(instance) span_name = _get_span_name( - "poll", ", ".join([topic.topic for topic in records.keys()]) + "poll", + ", ".join( + sorted(set(topic.topic for topic in records.keys())) + ), ) with tracer.start_as_current_span( span_name, kind=trace.SpanKind.CLIENT From 774d741a806ca289db1d3771c562d516e7c62cca Mon Sep 17 00:00:00 2001 From: Dmitriy Date: Wed, 12 Feb 2025 12:15:25 +0500 Subject: [PATCH 05/14] fix: update typing, run pyupgrade --- .../instrumentation/aiokafka/utils.py | 107 ++++++++++-------- .../tests/test_instrumentation.py | 8 +- .../tests/test_utils.py | 3 +- 3 files changed, 63 insertions(+), 55 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/utils.py b/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/utils.py index 8bac23eb82..e8e7db35d1 100644 --- a/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/utils.py +++ b/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/utils.py @@ -9,11 +9,11 @@ Awaitable, Callable, Dict, - List, + MutableSequence, Optional, Protocol, + Sequence, Tuple, - Union, ) import aiokafka @@ -49,8 +49,8 @@ class AIOKafkaSendProto(Protocol): async def __call__( self, topic: str, - value: Any | None = None, - key: Any | None = None, + value: object | None = None, + key: object | None = None, partition: int | None = None, timestamp_ms: int | None = None, headers: HeadersT | None = None, @@ -72,14 +72,14 @@ async def __call__( ] ] -HeadersT = List[Tuple[str, Optional[bytes]]] +HeadersT = Sequence[Tuple[str, Optional[bytes]]] _LOG = getLogger(__name__) def _extract_bootstrap_servers( client: aiokafka.AIOKafkaClient, -) -> Union[str, List[str]]: +) -> str | list[str]: return client._bootstrap_servers @@ -89,7 +89,7 @@ def _extract_client_id(client: aiokafka.AIOKafkaClient) -> str: def _extract_consumer_group( consumer: aiokafka.AIOKafkaConsumer, -) -> Optional[str]: +) -> str | None: return consumer._group_id @@ -97,43 +97,45 @@ def _extract_argument( key: str, position: int, default_value: Any, - args: Tuple[Any], - kwargs: Dict[str, Any], + args: tuple[Any, ...], + kwargs: dict[str, Any], ) -> Any: if len(args) > position: return args[position] return kwargs.get(key, default_value) -def _extract_send_topic(args: Tuple[Any], kwargs: Dict[str, Any]) -> str: +def _extract_send_topic(args: tuple[Any, ...], kwargs: dict[str, Any]) -> str: """extract topic from `send` method arguments in AIOKafkaProducer class""" return _extract_argument("topic", 0, "unknown", args, kwargs) def _extract_send_value( - args: Tuple[Any], kwargs: Dict[str, Any] -) -> Optional[Any]: + args: tuple[Any, ...], kwargs: dict[str, Any] +) -> object | None: """extract value from `send` method arguments in AIOKafkaProducer class""" return _extract_argument("value", 1, None, args, kwargs) def _extract_send_key( - args: Tuple[Any], kwargs: Dict[str, Any] -) -> Optional[Any]: + args: tuple[Any, ...], kwargs: dict[str, Any] +) -> object | None: """extract key from `send` method arguments in AIOKafkaProducer class""" return _extract_argument("key", 2, None, args, kwargs) -def _extract_send_headers(args: Tuple[Any], kwargs: Dict[str, Any]): +def _extract_send_headers( + args: tuple[Any, ...], kwargs: dict[str, Any] +) -> HeadersT | None: """extract headers from `send` method arguments in AIOKafkaProducer class""" return _extract_argument("headers", 5, None, args, kwargs) async def _extract_send_partition( instance: aiokafka.AIOKafkaProducer, - args: Tuple[Any], - kwargs: Dict[str, Any], -) -> Optional[int]: + args: tuple[Any, ...], + kwargs: dict[str, Any], +) -> int | None: """extract partition `send` method arguments, using the `_partition` method in AIOKafkaProducer class""" try: topic = _extract_send_topic(args, kwargs) @@ -159,7 +161,7 @@ async def _extract_send_partition( class AIOKafkaContextGetter(textmap.Getter[HeadersT]): - def get(self, carrier: HeadersT, key: str) -> Optional[List[str]]: + def get(self, carrier: HeadersT, key: str) -> list[str] | None: if carrier is None: return None @@ -169,7 +171,7 @@ def get(self, carrier: HeadersT, key: str) -> Optional[List[str]]: return [value.decode()] return None - def keys(self, carrier: HeadersT) -> List[str]: + def keys(self, carrier: HeadersT) -> list[str]: if carrier is None: return [] return [key for (key, value) in carrier] @@ -177,11 +179,17 @@ def keys(self, carrier: HeadersT) -> List[str]: class AIOKafkaContextSetter(textmap.Setter[HeadersT]): def set( - self, carrier: HeadersT, key: Optional[str], value: Optional[str] + self, carrier: HeadersT, key: str | None, value: str | None ) -> None: if carrier is None or key is None: return + if not isinstance(carrier, MutableSequence): + _LOG.warning( + "Unable to set context in headers. Headers is immutable" + ) + return + if value is not None: carrier.append((key, value.encode())) else: @@ -195,11 +203,11 @@ def set( def _enrich_base_span( span: Span, *, - bootstrap_servers: Union[str, List[str]], + bootstrap_servers: str | list[str], client_id: str, topic: str, - partition: Optional[int], - key: Optional[Any], + partition: int | None, + key: object | None, ) -> None: span.set_attribute( messaging_attributes.MESSAGING_SYSTEM, @@ -219,18 +227,19 @@ def _enrich_base_span( if key is not None: span.set_attribute( - messaging_attributes.MESSAGING_KAFKA_MESSAGE_KEY, key + messaging_attributes.MESSAGING_KAFKA_MESSAGE_KEY, + key, # FIXME: serialize key to str? ) def _enrich_send_span( span: Span, *, - bootstrap_servers: Union[str, List[str]], + bootstrap_servers: str | list[str], client_id: str, topic: str, - partition: Optional[int], - key: Optional[str], + partition: int | None, + key: object | None, ) -> None: if not span.is_recording(): return @@ -254,12 +263,12 @@ def _enrich_send_span( def _enrich_getone_span( span: Span, *, - bootstrap_servers: Union[str, List[str]], + bootstrap_servers: str | list[str], client_id: str, - consumer_group: Optional[str], + consumer_group: str | None, topic: str, - partition: Optional[int], - key: Optional[str], + partition: int | None, + key: object | None, offset: int, ) -> None: if not span.is_recording(): @@ -303,9 +312,9 @@ def _enrich_getone_span( def _enrich_getmany_poll_span( span: Span, *, - bootstrap_servers: Union[str, List[str]], + bootstrap_servers: str | list[str], client_id: str, - consumer_group: Optional[str], + consumer_group: str | None, message_count: int, ) -> None: if not span.is_recording(): @@ -339,9 +348,9 @@ def _enrich_getmany_poll_span( def _enrich_getmany_topic_span( span: Span, *, - bootstrap_servers: Union[str, List[str]], + bootstrap_servers: str | list[str], client_id: str, - consumer_group: Optional[str], + consumer_group: str | None, topic: str, partition: int, message_count: int, @@ -384,10 +393,10 @@ def _wrap_send( async def _traced_send( func: AIOKafkaSendProto, instance: aiokafka.AIOKafkaProducer, - args: Tuple[Any], - kwargs: Dict[str, Any], + args: tuple[Any, ...], + kwargs: dict[str, Any], ) -> asyncio.Future[RecordMetadata]: - headers = _extract_send_headers(args, kwargs) + headers: HeadersT | None = _extract_send_headers(args, kwargs) if headers is None: headers = [] kwargs["headers"] = headers @@ -430,11 +439,11 @@ async def _create_consumer_span( async_consume_hook: ConsumeHookT, record: aiokafka.ConsumerRecord[object, object], extracted_context: Context, - bootstrap_servers: Union[str, List[str]], + bootstrap_servers: str | list[str], client_id: str, - consumer_group: Optional[str], - args: Tuple[Any], - kwargs: Dict[str, Any], + consumer_group: str | None, + args: tuple[aiokafka.TopicPartition, ...], + kwargs: dict[str, Any], ) -> trace.Span: span_name = _get_span_name("receive", record.topic) with tracer.start_as_current_span( @@ -470,8 +479,8 @@ def _wrap_getone( async def _traced_getone( func: AIOKafkaGetOneProto, instance: aiokafka.AIOKafkaConsumer, - args: Tuple[Any], - kwargs: Dict[str, Any], + args: tuple[aiokafka.TopicPartition, ...], + kwargs: dict[str, Any], ) -> aiokafka.ConsumerRecord[object, object]: record = await func(*args, **kwargs) @@ -513,8 +522,8 @@ def _wrap_getmany( async def _traced_getmany( func: AIOKafkaGetManyProto, instance: aiokafka.AIOKafkaConsumer, - args: Tuple[Any], - kwargs: Dict[str, Any], + args: tuple[aiokafka.TopicPartition, ...], + kwargs: dict[str, Any], ) -> dict[ aiokafka.TopicPartition, list[aiokafka.ConsumerRecord[object, object]] ]: @@ -527,9 +536,7 @@ async def _traced_getmany( span_name = _get_span_name( "poll", - ", ".join( - sorted(set(topic.topic for topic in records.keys())) - ), + ", ".join(sorted({topic.topic for topic in records.keys()})), ) with tracer.start_as_current_span( span_name, kind=trace.SpanKind.CLIENT diff --git a/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_instrumentation.py b/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_instrumentation.py index cafa683ae0..2d16afbdbc 100644 --- a/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_instrumentation.py @@ -15,7 +15,7 @@ from __future__ import annotations import uuid -from typing import Any, List, Sequence, Tuple, cast +from typing import Any, Sequence, cast from unittest import IsolatedAsyncioTestCase, TestCase, mock import aiokafka @@ -66,7 +66,7 @@ def test_instrument_api(self) -> None: class TestAIOKafkaInstrumentation(TestBase, IsolatedAsyncioTestCase): @staticmethod def consumer_record_factory( - number: int, headers: Tuple[Tuple[str, bytes], ...] + number: int, headers: tuple[tuple[str, bytes], ...] ) -> ConsumerRecord: return ConsumerRecord( f"topic_{number}", @@ -84,7 +84,7 @@ def consumer_record_factory( @staticmethod def consumer_batch_factory( - *headers: Tuple[Tuple[str, bytes], ...], + *headers: tuple[tuple[str, bytes], ...], ) -> dict[aiokafka.TopicPartition, list[aiokafka.ConsumerRecord]]: records = {} for number, record_headers in enumerate(headers, start=1): @@ -462,7 +462,7 @@ async def test_send_produce_hook(self) -> None: async_produce_hook_mock.assert_awaited_once() def _compare_spans( - self, spans: Sequence[ReadableSpan], expected_spans: List[dict] + self, spans: Sequence[ReadableSpan], expected_spans: list[dict] ) -> None: self.assertEqual(len(spans), len(expected_spans)) for span, expected_span in zip(spans, expected_spans): diff --git a/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_utils.py b/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_utils.py index b6d86951e4..4b43a5e8f3 100644 --- a/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_utils.py +++ b/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_utils.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # pylint: disable=unnecessary-dunder-call +from __future__ import annotations from unittest import IsolatedAsyncioTestCase, mock @@ -45,7 +46,7 @@ def test_context_setter(self) -> None: carrier_list = [("key1", b"val1")] context_setter.set(carrier_list, "key2", "val2") - self.assertTrue(("key2", "val2".encode()) in carrier_list) + self.assertTrue(("key2", b"val2") in carrier_list) def test_context_getter(self) -> None: context_setter = AIOKafkaContextSetter() From 5f75b2705780c20c259228311a24a54dbd383d2e Mon Sep 17 00:00:00 2001 From: Dmitriy Date: Wed, 12 Feb 2025 12:35:53 +0500 Subject: [PATCH 06/14] fix: remove json.dumps from SERVER_ADDRESS attribute --- .../instrumentation/aiokafka/utils.py | 9 ++------- .../tests/test_instrumentation.py | 14 +++++++------- 2 files changed, 9 insertions(+), 14 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/utils.py b/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/utils.py index e8e7db35d1..0fddffed5d 100644 --- a/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/utils.py +++ b/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/utils.py @@ -1,7 +1,6 @@ from __future__ import annotations import asyncio -import json from logging import getLogger from typing import ( TYPE_CHECKING, @@ -213,9 +212,7 @@ def _enrich_base_span( messaging_attributes.MESSAGING_SYSTEM, messaging_attributes.MessagingSystemValues.KAFKA.value, ) - span.set_attribute( - server_attributes.SERVER_ADDRESS, json.dumps(bootstrap_servers) - ) + span.set_attribute(server_attributes.SERVER_ADDRESS, bootstrap_servers) span.set_attribute(messaging_attributes.MESSAGING_CLIENT_ID, client_id) span.set_attribute(messaging_attributes.MESSAGING_DESTINATION_NAME, topic) @@ -324,9 +321,7 @@ def _enrich_getmany_poll_span( messaging_attributes.MESSAGING_SYSTEM, messaging_attributes.MessagingSystemValues.KAFKA.value, ) - span.set_attribute( - server_attributes.SERVER_ADDRESS, json.dumps(bootstrap_servers) - ) + span.set_attribute(server_attributes.SERVER_ADDRESS, bootstrap_servers) span.set_attribute(messaging_attributes.MESSAGING_CLIENT_ID, client_id) if consumer_group is not None: diff --git a/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_instrumentation.py b/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_instrumentation.py index 2d16afbdbc..d839490334 100644 --- a/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_instrumentation.py @@ -161,7 +161,7 @@ async def test_getone(self) -> None: "kind": SpanKind.CONSUMER, "attributes": { messaging_attributes.MESSAGING_SYSTEM: messaging_attributes.MessagingSystemValues.KAFKA.value, - server_attributes.SERVER_ADDRESS: '"localhost"', + server_attributes.SERVER_ADDRESS: "localhost", messaging_attributes.MESSAGING_CLIENT_ID: client_id, messaging_attributes.MESSAGING_DESTINATION_NAME: "topic_1", messaging_attributes.MESSAGING_DESTINATION_PARTITION_ID: "1", @@ -178,7 +178,7 @@ async def test_getone(self) -> None: "kind": SpanKind.CONSUMER, "attributes": { messaging_attributes.MESSAGING_SYSTEM: messaging_attributes.MessagingSystemValues.KAFKA.value, - server_attributes.SERVER_ADDRESS: '"localhost"', + server_attributes.SERVER_ADDRESS: "localhost", messaging_attributes.MESSAGING_CLIENT_ID: client_id, messaging_attributes.MESSAGING_DESTINATION_NAME: "topic_2", messaging_attributes.MESSAGING_DESTINATION_PARTITION_ID: "2", @@ -296,7 +296,7 @@ async def test_getmany(self) -> None: "kind": SpanKind.CONSUMER, "attributes": { messaging_attributes.MESSAGING_SYSTEM: messaging_attributes.MessagingSystemValues.KAFKA.value, - server_attributes.SERVER_ADDRESS: '"localhost"', + server_attributes.SERVER_ADDRESS: "localhost", messaging_attributes.MESSAGING_CLIENT_ID: client_id, messaging_attributes.MESSAGING_DESTINATION_NAME: "topic_1", messaging_attributes.MESSAGING_DESTINATION_PARTITION_ID: "1", @@ -313,7 +313,7 @@ async def test_getmany(self) -> None: "kind": SpanKind.CLIENT, "attributes": { messaging_attributes.MESSAGING_SYSTEM: messaging_attributes.MessagingSystemValues.KAFKA.value, - server_attributes.SERVER_ADDRESS: '"localhost"', + server_attributes.SERVER_ADDRESS: "localhost", messaging_attributes.MESSAGING_CLIENT_ID: client_id, messaging_attributes.MESSAGING_DESTINATION_NAME: "topic_1", messaging_attributes.MESSAGING_DESTINATION_PARTITION_ID: "1", @@ -328,7 +328,7 @@ async def test_getmany(self) -> None: "kind": SpanKind.CONSUMER, "attributes": { messaging_attributes.MESSAGING_SYSTEM: messaging_attributes.MessagingSystemValues.KAFKA.value, - server_attributes.SERVER_ADDRESS: '"localhost"', + server_attributes.SERVER_ADDRESS: "localhost", messaging_attributes.MESSAGING_CLIENT_ID: client_id, messaging_attributes.MESSAGING_DESTINATION_NAME: "topic_2", messaging_attributes.MESSAGING_DESTINATION_PARTITION_ID: "2", @@ -345,7 +345,7 @@ async def test_getmany(self) -> None: "kind": SpanKind.CLIENT, "attributes": { messaging_attributes.MESSAGING_SYSTEM: messaging_attributes.MessagingSystemValues.KAFKA.value, - server_attributes.SERVER_ADDRESS: '"localhost"', + server_attributes.SERVER_ADDRESS: "localhost", messaging_attributes.MESSAGING_CLIENT_ID: client_id, messaging_attributes.MESSAGING_DESTINATION_NAME: "topic_2", messaging_attributes.MESSAGING_DESTINATION_PARTITION_ID: "2", @@ -360,7 +360,7 @@ async def test_getmany(self) -> None: "kind": SpanKind.CLIENT, "attributes": { messaging_attributes.MESSAGING_SYSTEM: messaging_attributes.MessagingSystemValues.KAFKA.value, - server_attributes.SERVER_ADDRESS: '"localhost"', + server_attributes.SERVER_ADDRESS: "localhost", messaging_attributes.MESSAGING_CLIENT_ID: client_id, messaging_attributes.MESSAGING_CONSUMER_GROUP_NAME: group_id, messaging_attributes.MESSAGING_OPERATION_NAME: "poll", From 8f684120dd8b11a017a7d862eb2d718fd43bf649 Mon Sep 17 00:00:00 2001 From: Dmitriy Date: Wed, 12 Feb 2025 12:36:15 +0500 Subject: [PATCH 07/14] fix pylint --- .../tests/test_utils.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_utils.py b/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_utils.py index 4b43a5e8f3..91f53d967c 100644 --- a/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_utils.py +++ b/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_utils.py @@ -248,13 +248,14 @@ async def test_wrap_getone( @mock.patch( "opentelemetry.instrumentation.aiokafka.utils._extract_consumer_group" ) + # pylint: disable=too-many-locals async def test_wrap_getmany( self, extract_consumer_group: mock.MagicMock, extract_client_id: mock.MagicMock, extract_bootstrap_servers: mock.MagicMock, - enrich_getmany_poll_span: mock.MagicMock, - enrich_getmany_topic_span: mock.MagicMock, + _enrich_getmany_poll_span: mock.MagicMock, + _enrich_getmany_topic_span: mock.MagicMock, _create_consumer_span: mock.MagicMock, extract: mock.MagicMock, ) -> None: From bf40659e9c59acd1a51e90f375ba5caf1a5124ac Mon Sep 17 00:00:00 2001 From: Dmitriy Date: Wed, 12 Feb 2025 12:48:32 +0500 Subject: [PATCH 08/14] fix: sync span_kind with spec --- .../src/opentelemetry/instrumentation/aiokafka/utils.py | 4 ++-- .../tests/test_instrumentation.py | 8 ++++---- .../tests/test_utils.py | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/utils.py b/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/utils.py index 0fddffed5d..27fa978df8 100644 --- a/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/utils.py +++ b/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/utils.py @@ -253,7 +253,7 @@ def _enrich_send_span( span.set_attribute(messaging_attributes.MESSAGING_OPERATION_NAME, "send") span.set_attribute( messaging_attributes.MESSAGING_OPERATION_TYPE, - messaging_attributes.MessagingOperationTypeValues.PUBLISH.value, + messaging_attributes.MessagingOperationTypeValues.SEND.value, ) @@ -444,7 +444,7 @@ async def _create_consumer_span( with tracer.start_as_current_span( span_name, context=extracted_context, - kind=trace.SpanKind.CONSUMER, + kind=trace.SpanKind.CLIENT, ) as span: new_context = trace.set_span_in_context(span, extracted_context) token = context.attach(new_context) diff --git a/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_instrumentation.py b/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_instrumentation.py index d839490334..7b46b53326 100644 --- a/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_instrumentation.py @@ -158,7 +158,7 @@ async def test_getone(self) -> None: expected_spans = [ { "name": "topic_1 receive", - "kind": SpanKind.CONSUMER, + "kind": SpanKind.CLIENT, "attributes": { messaging_attributes.MESSAGING_SYSTEM: messaging_attributes.MessagingSystemValues.KAFKA.value, server_attributes.SERVER_ADDRESS: "localhost", @@ -175,7 +175,7 @@ async def test_getone(self) -> None: }, { "name": "topic_2 receive", - "kind": SpanKind.CONSUMER, + "kind": SpanKind.CLIENT, "attributes": { messaging_attributes.MESSAGING_SYSTEM: messaging_attributes.MessagingSystemValues.KAFKA.value, server_attributes.SERVER_ADDRESS: "localhost", @@ -293,7 +293,7 @@ async def test_getmany(self) -> None: expected_spans = [ { "name": "topic_1 receive", - "kind": SpanKind.CONSUMER, + "kind": SpanKind.CLIENT, "attributes": { messaging_attributes.MESSAGING_SYSTEM: messaging_attributes.MessagingSystemValues.KAFKA.value, server_attributes.SERVER_ADDRESS: "localhost", @@ -325,7 +325,7 @@ async def test_getmany(self) -> None: }, { "name": "topic_2 receive", - "kind": SpanKind.CONSUMER, + "kind": SpanKind.CLIENT, "attributes": { messaging_attributes.MESSAGING_SYSTEM: messaging_attributes.MessagingSystemValues.KAFKA.value, server_attributes.SERVER_ADDRESS: "localhost", diff --git a/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_utils.py b/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_utils.py index 91f53d967c..637edc5f11 100644 --- a/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_utils.py +++ b/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_utils.py @@ -347,7 +347,7 @@ async def test_create_consumer_span( tracer.start_as_current_span.assert_called_once_with( expected_span_name, context=extracted_context, - kind=SpanKind.CONSUMER, + kind=SpanKind.CLIENT, ) span = tracer.start_as_current_span.return_value.__enter__() set_span_in_context.assert_called_once_with(span, extracted_context) From 54fdd95416ea93ddc38cfe77616678fca38eb6fe Mon Sep 17 00:00:00 2001 From: Dmitriy Date: Wed, 12 Feb 2025 12:55:16 +0500 Subject: [PATCH 09/14] add CHANGELOG entry --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0d0ce4370a..dbe76605da 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +### Added +- `opentelemetry-instrumentation-aiokafka` Add instrumentation of `consumer.getmany` (batch) + ([#3257](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3257)) + ### Fixed - `opentelemetry-instrumentation-redis` Add missing entry in doc string for `def _instrument` ([#3247](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3247)) From 76c8faa6be7c1ba18ab2f2c51906f8bc0ee62008 Mon Sep 17 00:00:00 2001 From: Dmitriy Date: Sun, 30 Mar 2025 14:06:57 +0500 Subject: [PATCH 10/14] remove changes not from this issue --- .../instrumentation/aiokafka/utils.py | 29 +++++++++----- .../tests/test_instrumentation.py | 40 +++++++++---------- .../tests/test_utils.py | 2 +- 3 files changed, 40 insertions(+), 31 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/utils.py b/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/utils.py index 27fa978df8..40366d7bf2 100644 --- a/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/utils.py +++ b/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/utils.py @@ -1,6 +1,7 @@ from __future__ import annotations import asyncio +import json from logging import getLogger from typing import ( TYPE_CHECKING, @@ -212,7 +213,9 @@ def _enrich_base_span( messaging_attributes.MESSAGING_SYSTEM, messaging_attributes.MessagingSystemValues.KAFKA.value, ) - span.set_attribute(server_attributes.SERVER_ADDRESS, bootstrap_servers) + span.set_attribute( + server_attributes.SERVER_ADDRESS, json.dumps(bootstrap_servers) + ) span.set_attribute(messaging_attributes.MESSAGING_CLIENT_ID, client_id) span.set_attribute(messaging_attributes.MESSAGING_DESTINATION_NAME, topic) @@ -253,7 +256,7 @@ def _enrich_send_span( span.set_attribute(messaging_attributes.MESSAGING_OPERATION_NAME, "send") span.set_attribute( messaging_attributes.MESSAGING_OPERATION_TYPE, - messaging_attributes.MessagingOperationTypeValues.SEND.value, + messaging_attributes.MessagingOperationTypeValues.PUBLISH.value, ) @@ -321,7 +324,9 @@ def _enrich_getmany_poll_span( messaging_attributes.MESSAGING_SYSTEM, messaging_attributes.MessagingSystemValues.KAFKA.value, ) - span.set_attribute(server_attributes.SERVER_ADDRESS, bootstrap_servers) + span.set_attribute( + server_attributes.SERVER_ADDRESS, json.dumps(bootstrap_servers) + ) span.set_attribute(messaging_attributes.MESSAGING_CLIENT_ID, client_id) if consumer_group is not None: @@ -333,7 +338,9 @@ def _enrich_getmany_poll_span( messaging_attributes.MESSAGING_BATCH_MESSAGE_COUNT, message_count ) - span.set_attribute(messaging_attributes.MESSAGING_OPERATION_NAME, "poll") + span.set_attribute( + messaging_attributes.MESSAGING_OPERATION_NAME, "receive" + ) span.set_attribute( messaging_attributes.MESSAGING_OPERATION_TYPE, messaging_attributes.MessagingOperationTypeValues.RECEIVE.value, @@ -371,7 +378,9 @@ def _enrich_getmany_topic_span( messaging_attributes.MESSAGING_BATCH_MESSAGE_COUNT, message_count ) - span.set_attribute(messaging_attributes.MESSAGING_OPERATION_NAME, "poll") + span.set_attribute( + messaging_attributes.MESSAGING_OPERATION_NAME, "receive" + ) span.set_attribute( messaging_attributes.MESSAGING_OPERATION_TYPE, messaging_attributes.MessagingOperationTypeValues.RECEIVE.value, @@ -444,7 +453,7 @@ async def _create_consumer_span( with tracer.start_as_current_span( span_name, context=extracted_context, - kind=trace.SpanKind.CLIENT, + kind=trace.SpanKind.CONSUMER, ) as span: new_context = trace.set_span_in_context(span, extracted_context) token = context.attach(new_context) @@ -530,11 +539,11 @@ async def _traced_getmany( consumer_group = _extract_consumer_group(instance) span_name = _get_span_name( - "poll", + "receive", ", ".join(sorted({topic.topic for topic in records.keys()})), ) with tracer.start_as_current_span( - span_name, kind=trace.SpanKind.CLIENT + span_name, kind=trace.SpanKind.CONSUMER ) as poll_span: _enrich_getmany_poll_span( poll_span, @@ -545,9 +554,9 @@ async def _traced_getmany( ) for topic, topic_records in records.items(): - span_name = _get_span_name("poll", topic.topic) + span_name = _get_span_name("receive", topic.topic) with tracer.start_as_current_span( - span_name, kind=trace.SpanKind.CLIENT + span_name, kind=trace.SpanKind.CONSUMER ) as topic_span: _enrich_getmany_topic_span( topic_span, diff --git a/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_instrumentation.py b/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_instrumentation.py index 7b46b53326..8be31aa26f 100644 --- a/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_instrumentation.py @@ -158,10 +158,10 @@ async def test_getone(self) -> None: expected_spans = [ { "name": "topic_1 receive", - "kind": SpanKind.CLIENT, + "kind": SpanKind.CONSUMER, "attributes": { messaging_attributes.MESSAGING_SYSTEM: messaging_attributes.MessagingSystemValues.KAFKA.value, - server_attributes.SERVER_ADDRESS: "localhost", + server_attributes.SERVER_ADDRESS: '"localhost"', messaging_attributes.MESSAGING_CLIENT_ID: client_id, messaging_attributes.MESSAGING_DESTINATION_NAME: "topic_1", messaging_attributes.MESSAGING_DESTINATION_PARTITION_ID: "1", @@ -175,10 +175,10 @@ async def test_getone(self) -> None: }, { "name": "topic_2 receive", - "kind": SpanKind.CLIENT, + "kind": SpanKind.CONSUMER, "attributes": { messaging_attributes.MESSAGING_SYSTEM: messaging_attributes.MessagingSystemValues.KAFKA.value, - server_attributes.SERVER_ADDRESS: "localhost", + server_attributes.SERVER_ADDRESS: '"localhost"', messaging_attributes.MESSAGING_CLIENT_ID: client_id, messaging_attributes.MESSAGING_DESTINATION_NAME: "topic_2", messaging_attributes.MESSAGING_DESTINATION_PARTITION_ID: "2", @@ -293,10 +293,10 @@ async def test_getmany(self) -> None: expected_spans = [ { "name": "topic_1 receive", - "kind": SpanKind.CLIENT, + "kind": SpanKind.CONSUMER, "attributes": { messaging_attributes.MESSAGING_SYSTEM: messaging_attributes.MessagingSystemValues.KAFKA.value, - server_attributes.SERVER_ADDRESS: "localhost", + server_attributes.SERVER_ADDRESS: '"localhost"', messaging_attributes.MESSAGING_CLIENT_ID: client_id, messaging_attributes.MESSAGING_DESTINATION_NAME: "topic_1", messaging_attributes.MESSAGING_DESTINATION_PARTITION_ID: "1", @@ -309,26 +309,26 @@ async def test_getmany(self) -> None: }, }, { - "name": "topic_1 poll", - "kind": SpanKind.CLIENT, + "name": "topic_1 receive", + "kind": SpanKind.CONSUMER, "attributes": { messaging_attributes.MESSAGING_SYSTEM: messaging_attributes.MessagingSystemValues.KAFKA.value, - server_attributes.SERVER_ADDRESS: "localhost", + server_attributes.SERVER_ADDRESS: '"localhost"', messaging_attributes.MESSAGING_CLIENT_ID: client_id, messaging_attributes.MESSAGING_DESTINATION_NAME: "topic_1", messaging_attributes.MESSAGING_DESTINATION_PARTITION_ID: "1", messaging_attributes.MESSAGING_CONSUMER_GROUP_NAME: group_id, - messaging_attributes.MESSAGING_OPERATION_NAME: "poll", + messaging_attributes.MESSAGING_OPERATION_NAME: "receive", messaging_attributes.MESSAGING_OPERATION_TYPE: messaging_attributes.MessagingOperationTypeValues.RECEIVE.value, messaging_attributes.MESSAGING_BATCH_MESSAGE_COUNT: 1, }, }, { "name": "topic_2 receive", - "kind": SpanKind.CLIENT, + "kind": SpanKind.CONSUMER, "attributes": { messaging_attributes.MESSAGING_SYSTEM: messaging_attributes.MessagingSystemValues.KAFKA.value, - server_attributes.SERVER_ADDRESS: "localhost", + server_attributes.SERVER_ADDRESS: '"localhost"', messaging_attributes.MESSAGING_CLIENT_ID: client_id, messaging_attributes.MESSAGING_DESTINATION_NAME: "topic_2", messaging_attributes.MESSAGING_DESTINATION_PARTITION_ID: "2", @@ -341,29 +341,29 @@ async def test_getmany(self) -> None: }, }, { - "name": "topic_2 poll", - "kind": SpanKind.CLIENT, + "name": "topic_2 receive", + "kind": SpanKind.CONSUMER, "attributes": { messaging_attributes.MESSAGING_SYSTEM: messaging_attributes.MessagingSystemValues.KAFKA.value, - server_attributes.SERVER_ADDRESS: "localhost", + server_attributes.SERVER_ADDRESS: '"localhost"', messaging_attributes.MESSAGING_CLIENT_ID: client_id, messaging_attributes.MESSAGING_DESTINATION_NAME: "topic_2", messaging_attributes.MESSAGING_DESTINATION_PARTITION_ID: "2", messaging_attributes.MESSAGING_CONSUMER_GROUP_NAME: group_id, - messaging_attributes.MESSAGING_OPERATION_NAME: "poll", + messaging_attributes.MESSAGING_OPERATION_NAME: "receive", messaging_attributes.MESSAGING_OPERATION_TYPE: messaging_attributes.MessagingOperationTypeValues.RECEIVE.value, messaging_attributes.MESSAGING_BATCH_MESSAGE_COUNT: 1, }, }, { - "name": "topic_1, topic_2 poll", - "kind": SpanKind.CLIENT, + "name": "topic_1, topic_2 receive", + "kind": SpanKind.CONSUMER, "attributes": { messaging_attributes.MESSAGING_SYSTEM: messaging_attributes.MessagingSystemValues.KAFKA.value, - server_attributes.SERVER_ADDRESS: "localhost", + server_attributes.SERVER_ADDRESS: '"localhost"', messaging_attributes.MESSAGING_CLIENT_ID: client_id, messaging_attributes.MESSAGING_CONSUMER_GROUP_NAME: group_id, - messaging_attributes.MESSAGING_OPERATION_NAME: "poll", + messaging_attributes.MESSAGING_OPERATION_NAME: "receive", messaging_attributes.MESSAGING_OPERATION_TYPE: messaging_attributes.MessagingOperationTypeValues.RECEIVE.value, messaging_attributes.MESSAGING_BATCH_MESSAGE_COUNT: 2, }, diff --git a/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_utils.py b/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_utils.py index 637edc5f11..91f53d967c 100644 --- a/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_utils.py +++ b/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_utils.py @@ -347,7 +347,7 @@ async def test_create_consumer_span( tracer.start_as_current_span.assert_called_once_with( expected_span_name, context=extracted_context, - kind=SpanKind.CLIENT, + kind=SpanKind.CONSUMER, ) span = tracer.start_as_current_span.return_value.__enter__() set_span_in_context.assert_called_once_with(span, extracted_context) From 4d6107f2f221a9e5e67bc82e87ac87bbcf3a8cf2 Mon Sep 17 00:00:00 2001 From: Dmitriy Date: Thu, 10 Apr 2025 10:56:55 +0500 Subject: [PATCH 11/14] move types under TYPE_CHECKING --- .../instrumentation/aiokafka/utils.py | 33 +++++++++---------- 1 file changed, 16 insertions(+), 17 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/utils.py b/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/utils.py index 40366d7bf2..a6f8be5f76 100644 --- a/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/utils.py +++ b/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/utils.py @@ -56,23 +56,22 @@ async def __call__( headers: HeadersT | None = None, ) -> asyncio.Future[RecordMetadata]: ... - -ProduceHookT = Optional[ - Callable[[Span, Tuple[Any, ...], Dict[str, Any]], Awaitable[None]] -] -ConsumeHookT = Optional[ - Callable[ - [ - Span, - aiokafka.ConsumerRecord[object, object], - Tuple[aiokafka.TopicPartition, ...], - Dict[str, Any], - ], - Awaitable[None], + ProduceHookT = Optional[ + Callable[[Span, Tuple[Any, ...], Dict[str, Any]], Awaitable[None]] + ] + ConsumeHookT = Optional[ + Callable[ + [ + Span, + aiokafka.ConsumerRecord[object, object], + Tuple[aiokafka.TopicPartition, ...], + Dict[str, Any], + ], + Awaitable[None], + ] ] -] -HeadersT = Sequence[Tuple[str, Optional[bytes]]] + HeadersT = Sequence[Tuple[str, Optional[bytes]]] _LOG = getLogger(__name__) @@ -160,7 +159,7 @@ async def _extract_send_partition( return None -class AIOKafkaContextGetter(textmap.Getter[HeadersT]): +class AIOKafkaContextGetter(textmap.Getter["HeadersT"]): def get(self, carrier: HeadersT, key: str) -> list[str] | None: if carrier is None: return None @@ -177,7 +176,7 @@ def keys(self, carrier: HeadersT) -> list[str]: return [key for (key, value) in carrier] -class AIOKafkaContextSetter(textmap.Setter[HeadersT]): +class AIOKafkaContextSetter(textmap.Setter["HeadersT"]): def set( self, carrier: HeadersT, key: str | None, value: str | None ) -> None: From c7704ef303ce39fda4e517794e939d8dbe17fbac Mon Sep 17 00:00:00 2001 From: Dmitriy Date: Fri, 11 Apr 2025 10:25:35 +0500 Subject: [PATCH 12/14] move CHANGELOG entry to unreleased --- CHANGELOG.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7f2fcd4135..3bdb4918c2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#3385](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3385)) - `opentelemetry-instrumentation` Make auto instrumentation use the same dependency resolver as manual instrumentation does ([#3202](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3202)) +- `opentelemetry-instrumentation-aiokafka` Add instrumentation of `consumer.getmany` (batch) + ([#3257](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3257)) ### Fixed @@ -35,8 +37,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Version 1.31.0/0.52b0 (2025-03-12) ### Added -- `opentelemetry-instrumentation-aiokafka` Add instrumentation of `consumer.getmany` (batch) - ([#3257](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3257)) + - `opentelemetry-instrumentation-openai-v2` Update doc for OpenAI Instrumentation to support OpenAI Compatible Platforms ([#3279](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3279)) - `opentelemetry-instrumentation-system-metrics` Add `process` metrics and deprecated `process.runtime` prefixed ones From 04674118230090aeb21ab6c1bcbdc0be3d1f6927 Mon Sep 17 00:00:00 2001 From: Dmitriy Date: Tue, 15 Apr 2025 12:07:37 +0500 Subject: [PATCH 13/14] enable pyright for aiokafka, fix key type --- .../pyproject.toml | 1 + .../instrumentation/aiokafka/__init__.py | 27 +++++- .../instrumentation/aiokafka/py.typed | 0 .../instrumentation/aiokafka/utils.py | 83 ++++++++++--------- .../tests/test_utils.py | 2 +- pyproject.toml | 2 + tox.ini | 1 + uv.lock | 2 + 8 files changed, 76 insertions(+), 42 deletions(-) create mode 100644 instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/py.typed diff --git a/instrumentation/opentelemetry-instrumentation-aiokafka/pyproject.toml b/instrumentation/opentelemetry-instrumentation-aiokafka/pyproject.toml index b26af549cd..2e4a3408ca 100644 --- a/instrumentation/opentelemetry-instrumentation-aiokafka/pyproject.toml +++ b/instrumentation/opentelemetry-instrumentation-aiokafka/pyproject.toml @@ -29,6 +29,7 @@ dependencies = [ "opentelemetry-api ~= 1.27", "opentelemetry-instrumentation == 0.54b0.dev", "opentelemetry-semantic-conventions == 0.54b0.dev", + "typing_extensions ~= 4.1", ] [project.optional-dependencies] diff --git a/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/__init__.py b/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/__init__.py index c7ac2f2633..770eef62b3 100644 --- a/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/__init__.py @@ -69,11 +69,15 @@ async def async_consume_hook(span, record, args, kwargs): ___ """ +from __future__ import annotations + from asyncio import iscoroutinefunction -from typing import Collection +from typing import TYPE_CHECKING, Collection import aiokafka -from wrapt import wrap_function_wrapper +from wrapt import ( + wrap_function_wrapper, # type: ignore[reportUnknownVariableType] +) from opentelemetry import trace from opentelemetry.instrumentation.aiokafka.package import _instruments @@ -87,6 +91,21 @@ async def async_consume_hook(span, record, args, kwargs): from opentelemetry.instrumentation.utils import unwrap from opentelemetry.semconv.schemas import Schemas +if TYPE_CHECKING: + from typing import TypedDict + + from typing_extensions import Unpack + + from .utils import ConsumeHookT, ProduceHookT + + class InstrumentKwargs(TypedDict, total=False): + tracer_provider: trace.TracerProvider + async_produce_hook: ProduceHookT + async_consume_hook: ConsumeHookT + + class UninstrumentKwargs(TypedDict, total=False): + pass + class AIOKafkaInstrumentor(BaseInstrumentor): """An instrumentor for kafka module @@ -96,7 +115,7 @@ class AIOKafkaInstrumentor(BaseInstrumentor): def instrumentation_dependencies(self) -> Collection[str]: return _instruments - def _instrument(self, **kwargs): + def _instrument(self, **kwargs: Unpack[InstrumentKwargs]): """Instruments the kafka module Args: @@ -138,7 +157,7 @@ def _instrument(self, **kwargs): _wrap_getmany(tracer, async_consume_hook), ) - def _uninstrument(self, **kwargs): + def _uninstrument(self, **kwargs: Unpack[UninstrumentKwargs]): unwrap(aiokafka.AIOKafkaProducer, "send") unwrap(aiokafka.AIOKafkaConsumer, "getone") unwrap(aiokafka.AIOKafkaConsumer, "getmany") diff --git a/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/py.typed b/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/py.typed new file mode 100644 index 0000000000..e69de29bb2 diff --git a/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/utils.py b/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/utils.py index 3195597447..6d8ef6d222 100644 --- a/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/utils.py +++ b/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/utils.py @@ -1,6 +1,7 @@ from __future__ import annotations import asyncio +import contextlib import json from logging import getLogger from typing import ( @@ -14,6 +15,7 @@ Protocol, Sequence, Tuple, + cast, ) import aiokafka @@ -56,19 +58,18 @@ async def __call__( headers: HeadersT | None = None, ) -> asyncio.Future[RecordMetadata]: ... - ProduceHookT = Optional[ - Callable[[Span, Tuple[Any, ...], Dict[str, Any]], Awaitable[None]] + ProduceHookT = Callable[ + [Span, Tuple[Any, ...], Dict[str, Any]], Awaitable[None] ] - ConsumeHookT = Optional[ - Callable[ - [ - Span, - aiokafka.ConsumerRecord[object, object], - Tuple[aiokafka.TopicPartition, ...], - Dict[str, Any], - ], - Awaitable[None], - ] + + ConsumeHookT = Callable[ + [ + Span, + aiokafka.ConsumerRecord[object, object], + Tuple[aiokafka.TopicPartition, ...], + Dict[str, Any], + ], + Awaitable[None], ] HeadersT = Sequence[Tuple[str, Optional[bytes]]] @@ -89,7 +90,7 @@ def _extract_client_id(client: aiokafka.AIOKafkaClient) -> str: def _extract_consumer_group( consumer: aiokafka.AIOKafkaConsumer, ) -> str | None: - return consumer._group_id + return consumer._group_id # type: ignore[reportUnknownVariableType] def _extract_argument( @@ -139,6 +140,17 @@ def _move_headers_to_kwargs( return args[:5], kwargs +def _deserialize_key(key: object | None) -> str | None: + if key is None: + return None + + if isinstance(key, bytes): + with contextlib.suppress(UnicodeDecodeError): + return key.decode() + + return str(key) + + async def _extract_send_partition( instance: aiokafka.AIOKafkaProducer, args: tuple[Any, ...], @@ -150,7 +162,10 @@ async def _extract_send_partition( key = _extract_send_key(args, kwargs) value = _extract_send_value(args, kwargs) partition = _extract_argument("partition", 3, None, args, kwargs) - key_bytes, value_bytes = instance._serialize(topic, key, value) + key_bytes, value_bytes = cast( + "tuple[bytes | None, bytes | None]", + instance._serialize(topic, key, value), # type: ignore[reportUnknownMemberType] + ) valid_types = (bytes, bytearray, memoryview, type(None)) if ( type(key_bytes) not in valid_types @@ -158,9 +173,9 @@ async def _extract_send_partition( ): return None - await instance.client._wait_on_metadata(topic) + await instance.client._wait_on_metadata(topic) # type: ignore[reportUnknownMemberType] - return instance._partition( + return instance._partition( # type: ignore[reportUnknownMemberType] topic, partition, key, value, key_bytes, value_bytes ) except Exception as exception: # pylint: disable=W0703 @@ -170,9 +185,6 @@ async def _extract_send_partition( class AIOKafkaContextGetter(textmap.Getter["HeadersT"]): def get(self, carrier: HeadersT, key: str) -> list[str] | None: - if carrier is None: - return None - for item_key, value in carrier: if item_key == key: if value is not None: @@ -180,16 +192,14 @@ def get(self, carrier: HeadersT, key: str) -> list[str] | None: return None def keys(self, carrier: HeadersT) -> list[str]: - if carrier is None: - return [] - return [key for (key, value) in carrier] + return [key for (key, _) in carrier] class AIOKafkaContextSetter(textmap.Setter["HeadersT"]): def set( self, carrier: HeadersT, key: str | None, value: str | None ) -> None: - if carrier is None or key is None: + if key is None: return if not isinstance(carrier, MutableSequence): @@ -215,7 +225,7 @@ def _enrich_base_span( client_id: str, topic: str, partition: int | None, - key: object | None, + key: str | None, ) -> None: span.set_attribute( messaging_attributes.MESSAGING_SYSTEM, @@ -235,8 +245,7 @@ def _enrich_base_span( if key is not None: span.set_attribute( - messaging_attributes.MESSAGING_KAFKA_MESSAGE_KEY, - key, # FIXME: serialize key to str? + messaging_attributes.MESSAGING_KAFKA_MESSAGE_KEY, key ) @@ -247,7 +256,7 @@ def _enrich_send_span( client_id: str, topic: str, partition: int | None, - key: object | None, + key: str | None, ) -> None: if not span.is_recording(): return @@ -276,7 +285,7 @@ def _enrich_getone_span( consumer_group: str | None, topic: str, partition: int | None, - key: object | None, + key: str | None, offset: int, ) -> None: if not span.is_recording(): @@ -399,8 +408,8 @@ def _get_span_name(operation: str, topic: str): return f"{topic} {operation}" -def _wrap_send( - tracer: Tracer, async_produce_hook: ProduceHookT +def _wrap_send( # type: ignore[reportUnusedFunction] + tracer: Tracer, async_produce_hook: ProduceHookT | None ) -> Callable[..., Awaitable[asyncio.Future[RecordMetadata]]]: async def _traced_send( func: AIOKafkaSendProto, @@ -417,7 +426,7 @@ async def _traced_send( topic = _extract_send_topic(args, kwargs) bootstrap_servers = _extract_bootstrap_servers(instance.client) client_id = _extract_client_id(instance.client) - key = _extract_send_key(args, kwargs) + key = _deserialize_key(_extract_send_key(args, kwargs)) partition = await _extract_send_partition(instance, args, kwargs) span_name = _get_span_name("send", topic) with tracer.start_as_current_span( @@ -449,7 +458,7 @@ async def _traced_send( async def _create_consumer_span( tracer: Tracer, - async_consume_hook: ConsumeHookT, + async_consume_hook: ConsumeHookT | None, record: aiokafka.ConsumerRecord[object, object], extracted_context: Context, bootstrap_servers: str | list[str], @@ -473,7 +482,7 @@ async def _create_consumer_span( consumer_group=consumer_group, topic=record.topic, partition=record.partition, - key=record.key, + key=_deserialize_key(record.key), offset=record.offset, ) try: @@ -486,8 +495,8 @@ async def _create_consumer_span( return span -def _wrap_getone( - tracer: Tracer, async_consume_hook: ConsumeHookT +def _wrap_getone( # type: ignore[reportUnusedFunction] + tracer: Tracer, async_consume_hook: ConsumeHookT | None ) -> Callable[..., Awaitable[aiokafka.ConsumerRecord[object, object]]]: async def _traced_getone( func: AIOKafkaGetOneProto, @@ -521,8 +530,8 @@ async def _traced_getone( return _traced_getone -def _wrap_getmany( - tracer: Tracer, async_consume_hook: ConsumeHookT +def _wrap_getmany( # type: ignore[reportUnusedFunction] + tracer: Tracer, async_consume_hook: ConsumeHookT | None ) -> Callable[ ..., Awaitable[ diff --git a/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_utils.py b/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_utils.py index 91f53d967c..a7dcbe6dcf 100644 --- a/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_utils.py +++ b/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_utils.py @@ -360,7 +360,7 @@ async def test_create_consumer_span( consumer_group=consumer_group, topic=record.topic, partition=record.partition, - key=record.key, + key=str(record.key), offset=record.offset, ) consume_hook.assert_awaited_once_with( diff --git a/pyproject.toml b/pyproject.toml index f2e9cdf98d..05820c40a5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -196,6 +196,7 @@ pythonVersion = "3.8" reportPrivateUsage = false # Ignore private attributes added by instrumentation packages. # Add progressively instrumentation packages here. include = [ + "instrumentation/opentelemetry-instrumentation-aiokafka", "instrumentation/opentelemetry-instrumentation-asyncclick", "instrumentation/opentelemetry-instrumentation-threading", "instrumentation-genai/opentelemetry-instrumentation-vertexai", @@ -203,6 +204,7 @@ include = [ # We should also add type hints to the test suite - It helps on finding bugs. # We are excluding for now because it's easier, and more important to add to the instrumentation packages. exclude = [ + "instrumentation/opentelemetry-instrumentation-aiokafka/tests/**/*.py", "instrumentation/opentelemetry-instrumentation-asyncclick/tests/**/*.py", "instrumentation/opentelemetry-instrumentation-threading/tests/**", "instrumentation-genai/opentelemetry-instrumentation-vertexai/tests/**/*.py", diff --git a/tox.ini b/tox.ini index 68c59b4b12..333c8e9c08 100644 --- a/tox.ini +++ b/tox.ini @@ -1047,6 +1047,7 @@ deps = {toxinidir}/util/opentelemetry-util-http {toxinidir}/instrumentation-genai/opentelemetry-instrumentation-vertexai[instruments] {toxinidir}/instrumentation-genai/opentelemetry-instrumentation-google-genai[instruments] + {toxinidir}/instrumentation/opentelemetry-instrumentation-aiokafka[instruments] {toxinidir}/instrumentation/opentelemetry-instrumentation-asyncclick[instruments] commands = diff --git a/uv.lock b/uv.lock index e5ba8062b7..a30c6f7221 100644 --- a/uv.lock +++ b/uv.lock @@ -2829,6 +2829,7 @@ dependencies = [ { name = "opentelemetry-api" }, { name = "opentelemetry-instrumentation" }, { name = "opentelemetry-semantic-conventions" }, + { name = "typing-extensions" }, ] [package.optional-dependencies] @@ -2843,6 +2844,7 @@ requires-dist = [ { name = "opentelemetry-api", git = "https://github.com/open-telemetry/opentelemetry-python?subdirectory=opentelemetry-api&branch=main" }, { name = "opentelemetry-instrumentation", editable = "opentelemetry-instrumentation" }, { name = "opentelemetry-semantic-conventions", git = "https://github.com/open-telemetry/opentelemetry-python?subdirectory=opentelemetry-semantic-conventions&branch=main" }, + { name = "typing-extensions", specifier = "~=4.1" }, ] provides-extras = ["instruments"] From 0bc894bd80228b2c568198dfa42c95db3b8fb37e Mon Sep 17 00:00:00 2001 From: Dmitriy Date: Tue, 6 May 2025 20:06:25 +0500 Subject: [PATCH 14/14] Update CHANGELOG.md --- CHANGELOG.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index df6d1f13a2..a918f21458 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- `opentelemetry-instrumentation-aiokafka` Add instrumentation of `consumer.getmany` (batch) + ([#3257](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3257)) + ### Fixed - `opentelemetry-instrumentation` Catch `ModuleNotFoundError` when the library is not installed @@ -41,8 +44,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#3385](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3385)) - `opentelemetry-instrumentation` Make auto instrumentation use the same dependency resolver as manual instrumentation does ([#3202](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3202)) -- `opentelemetry-instrumentation-aiokafka` Add instrumentation of `consumer.getmany` (batch) - ([#3257](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3257)) ### Fixed