diff --git a/CHANGELOG.md b/CHANGELOG.md index 4331bb599c..e7cf1ee10f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -46,6 +46,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#1435](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1435)) - mongo db - fix db statement capturing ([#1512](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1512)) +- Fix confluent-kafka instrumentation by allowing Producer headers to be dict or list + ([#1655](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1655)) ## Version 1.15.0/0.36b0 (2022-12-10) diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py index ea304c81d3..89fad19c1b 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py @@ -60,7 +60,12 @@ def set(self, carrier: textmap.CarrierT, key: str, value: str) -> None: if value: value = value.encode() - carrier.append((key, value)) + + if isinstance(carrier, list): + carrier.append((key, value)) + + if isinstance(carrier, dict): + carrier[key] = value _kafka_getter = KafkaContextGetter() diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py index e9462d7898..ce661a1839 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py @@ -23,6 +23,9 @@ ProxiedConsumer, ProxiedProducer, ) +from opentelemetry.instrumentation.confluent_kafka.utils import ( + KafkaContextSetter, +) class TestConfluentKafka(TestCase): @@ -58,3 +61,14 @@ def test_instrument_api(self) -> None: consumer = instrumentation.uninstrument_consumer(consumer) self.assertEqual(consumer.__class__, Consumer) + + def test_context_setter(self) -> None: + context_setter = KafkaContextSetter() + + carrier_dict = {"key1": "val1"} + context_setter.set(carrier_dict, "key2", "val2") + self.assertGreaterEqual(carrier_dict.items(), {"key2": "val2".encode()}.items()) + + carrier_list = [("key1", "val1")] + context_setter.set(carrier_list, "key2", "val2") + self.assertTrue(("key2", "val2".encode()) in carrier_list)