Skip to content

Commit

Permalink
Fix Confluent Kafka Producer Arguments (#699)
Browse files Browse the repository at this point in the history
* Add confluentkafka test for posargs/kwargs

* Fix confluent kafka topic argument bug

* More sensible producer arguments
  • Loading branch information
TimPansino authored Nov 24, 2022
1 parent eb28b52 commit eead7a7
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 13 deletions.
2 changes: 1 addition & 1 deletion newrelic/hooks/messagebroker_confluentkafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def wrap_Producer_produce(wrapped, instance, args, kwargs):
topic = args[0]
args = args[1:]
else:
topic = kwargs.get("topic", None)
topic = kwargs.pop("topic", None)

transaction.add_messagebroker_info("Confluent-Kafka", get_package_version("confluent-kafka"))

Expand Down
54 changes: 42 additions & 12 deletions tests/messagebroker_confluentkafka/test_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import time
import threading

import pytest
Expand All @@ -36,34 +37,63 @@
)
@background_task()
def test_produce_arguments(topic, producer, client_type, serialize, headers):
callback_called = threading.Event()
callback1_called = threading.Event()
callback2_called = threading.Event()
ts = int(time.time())

def producer_callback(err, msg):
callback_called.set()
def producer_callback1(err, msg):
callback1_called.set()

def producer_callback2(err, msg):
callback2_called.set()

if client_type == "cimpl":
# Keyword Args
producer.produce(
topic,
topic=topic,
value=serialize({"foo": 1}),
key=serialize("my-key"),
callback=producer_callback,
partition=1,
timestamp=1,
partition=0,
callback=producer_callback2,
timestamp=ts,
headers=headers,
)
else:
# Positional Args
producer.produce(
topic,
serialize({"foo": 1}),
serialize("my-key"),
0,
producer_callback1,
None,
ts,
headers,
)
else:
# Keyword Args
producer.produce(
topic=topic,
value=serialize({"foo": 1}),
key=serialize("my-key"),
partition=1,
on_delivery=producer_callback,
timestamp=1,
partition=0,
on_delivery=producer_callback2,
timestamp=ts,
headers=headers,
)
# Positional Args
producer.produce(
topic,
serialize("my-key"),
serialize({"foo": 1}),
0,
producer_callback1,
ts,
headers,
)
producer.flush()

assert callback_called.wait(5), "Callback never called."
assert callback1_called.wait(5), "Callback never called."
assert callback2_called.wait(5), "Callback never called."


def test_trace_metrics(topic, send_producer_message):
Expand Down

0 comments on commit eead7a7

Please sign in to comment.