Skip to content

Commit

Permalink
instrumentation/kafka: fix handling consumer iteration if transaction…
Browse files Browse the repository at this point in the history
… not sampled (#2075)

Handle the case where if the transaction is not sampled capture_span
will return None instead of span.
While at it fix handling of checking for KAFKA_HOST in tests.

Fix #2073
  • Loading branch information
xrmx committed Aug 19, 2024
1 parent 91a53fc commit 787a77a
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 4 deletions.
3 changes: 2 additions & 1 deletion elasticapm/instrumentation/packages/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ def call(self, module, method, wrapped, instance, args, kwargs):
try:
result = wrapped(*args, **kwargs)
except StopIteration:
span.cancel()
if span:
span.cancel()
raise
if span and not isinstance(span, DroppedSpan):
topic = result[0]
Expand Down
24 changes: 21 additions & 3 deletions tests/instrumentation/kafka_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,10 @@

pytestmark = [pytest.mark.kafka]

if "KAFKA_HOST" not in os.environ:
KAFKA_HOST = os.environ.get("KAFKA_HOST")
if not KAFKA_HOST:
pytestmark.append(pytest.mark.skip("Skipping kafka tests, no KAFKA_HOST environment variable set"))

KAFKA_HOST = os.environ["KAFKA_HOST"]


@pytest.fixture(scope="function")
def topics():
Expand Down Expand Up @@ -233,3 +232,22 @@ def test_kafka_poll_unsampled_transaction(instrument, elasticapm_client, consume
elasticapm_client.end_transaction("foo")
spans = elasticapm_client.events[SPAN]
assert len(spans) == 0


def test_kafka_consumer_unsampled_transaction_handles_stop_iteration(
instrument, elasticapm_client, producer, consumer, topics
):
def delayed_send():
time.sleep(0.2)
producer.send("test", key=b"foo", value=b"bar")

thread = threading.Thread(target=delayed_send)
thread.start()
transaction = elasticapm_client.begin_transaction("foo")
transaction.is_sampled = False
for item in consumer:
pass
thread.join()
elasticapm_client.end_transaction("foo")
spans = elasticapm_client.events[SPAN]
assert len(spans) == 0

0 comments on commit 787a77a

Please sign in to comment.