Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Refactor `BatchLogRecordProcessor` to simplify code and make the control flow more
clear ([#4562](https://github.com/open-telemetry/opentelemetry-python/pull/4562/)
and [#4535](https://github.com/open-telemetry/opentelemetry-python/pull/4535)).
- Remove log messages from `BatchLogRecordProcessor.emit`, this caused the program
to crash at shutdown with a max recursion error ([#4586](https://github.com/open-telemetry/opentelemetry-python/pull/4586)).

## Version 1.33.0/0.54b0 (2025-05-09)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,15 +167,15 @@ def _export(self, batch_strategy: BatchExportStrategy) -> None:
)
detach(token)

# Do not add any logging.log statements to this function, they can be being routed back to this `emit` function,
# resulting in endless recursive calls that crash the program.
# See https://github.com/open-telemetry/opentelemetry-python/issues/4261
def emit(self, data: Telemetry) -> None:
if self._shutdown:
self._logger.info("Shutdown called, ignoring %s.", self._exporting)
return
if self._pid != os.getpid():
self._bsp_reset_once.do_once(self._at_fork_reinit)

if len(self._queue) == self._max_queue_size:
self._logger.warning("Queue full, dropping %s.", self._exporting)
# This will drop a log from the right side if the queue is at _max_queue_length.
self._queue.appendleft(data)
if len(self._queue) >= self._max_export_batch_size:
self._worker_awaken.set()
Expand Down
36 changes: 36 additions & 0 deletions opentelemetry-sdk/tests/logs/test_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@
import os
import time
import unittest
from sys import version_info
from unittest.mock import Mock, patch

from pytest import mark

from opentelemetry._logs import SeverityNumber
from opentelemetry.sdk import trace
from opentelemetry.sdk._logs import (
Expand Down Expand Up @@ -342,6 +345,39 @@ def test_emit_call_log_record(self):
logger.error("error")
self.assertEqual(log_record_processor.emit.call_count, 1)

@mark.skipif(
version_info < (3, 10),
reason="assertNoLogs only exists in python 3.10+.",
)
def test_logging_lib_not_invoked_in_batch_log_record_emit(self): # pylint: disable=no-self-use
# See https://github.com/open-telemetry/opentelemetry-python/issues/4261
exporter = Mock()
processor = BatchLogRecordProcessor(exporter)
logger_provider = LoggerProvider(
resource=SDKResource.create(
{
"service.name": "shoppingcart",
"service.instance.id": "instance-12",
}
),
)
logger_provider.add_log_record_processor(processor)
handler = LoggingHandler(
level=logging.INFO, logger_provider=logger_provider
)
sdk_logger = logging.getLogger("opentelemetry.sdk")
# Attach OTLP handler to SDK logger
sdk_logger.addHandler(handler)
# If `emit` calls logging.log then this test will throw a maximum recursion depth exceeded exception and fail.
try:
with self.assertNoLogs(sdk_logger, logging.NOTSET):
processor.emit(EMPTY_LOG)
processor.shutdown()
with self.assertNoLogs(sdk_logger, logging.NOTSET):
processor.emit(EMPTY_LOG)
finally:
sdk_logger.removeHandler(handler)

def test_args(self):
exporter = InMemoryLogExporter()
log_record_processor = BatchLogRecordProcessor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def test_telemetry_exported_once_schedule_delay_reached(
exporter.export.assert_called_once_with([telemetry])

def test_telemetry_flushed_before_shutdown_and_dropped_after_shutdown(
self, batch_processor_class, telemetry, caplog
self, batch_processor_class, telemetry
):
exporter = Mock()
batch_processor = batch_processor_class(
Expand All @@ -107,8 +107,6 @@ def test_telemetry_flushed_before_shutdown_and_dropped_after_shutdown(

# This should not be flushed.
batch_processor.emit(telemetry)
assert len(caplog.records) == 1
assert "Shutdown called, ignoring" in caplog.text
exporter.export.assert_called_once()

# pylint: disable=no-self-use
Expand Down