 from __future__ import annotations
 
 import abc
-import collections
 import enum
 import logging
-import os
 import sys
-import threading
-import weakref
 from os import environ, linesep
-from typing import IO, Callable, Deque, Optional, Sequence
+from typing import IO, Callable, Optional, Sequence
 
 from opentelemetry.context import (
     _SUPPRESS_INSTRUMENTATION_KEY,
     set_value,
 )
 from opentelemetry.sdk._logs import LogData, LogRecord, LogRecordProcessor
+from opentelemetry.sdk._shared_internal import BatchProcessor
 from opentelemetry.sdk.environment_variables import (
     OTEL_BLRP_EXPORT_TIMEOUT,
     OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
     OTEL_BLRP_MAX_QUEUE_SIZE,
     OTEL_BLRP_SCHEDULE_DELAY,
 )
-from opentelemetry.util._once import Once
 
 _DEFAULT_SCHEDULE_DELAY_MILLIS = 5000
 _DEFAULT_MAX_EXPORT_BATCH_SIZE = 512
 _ENV_VAR_INT_VALUE_ERROR_MESSAGE = (
     "Unable to parse value for %s as integer. Defaulting to %s."
 )
-
 _logger = logging.getLogger(__name__)
 
 
@@ -55,29 +50,19 @@ class LogExportResult(enum.Enum):
     FAILURE = 1
 
 
-class BatchLogExportStrategy(enum.Enum):
-    EXPORT_ALL = 0
-    EXPORT_WHILE_BATCH_EXCEEDS_THRESHOLD = 1
-    EXPORT_AT_LEAST_ONE_BATCH = 2
-
-
 class LogExporter(abc.ABC):
     """Interface for exporting logs.
-
     Interface to be implemented by services that want to export logs received
     in their own format.
-
     To export data this MUST be registered to the :class`opentelemetry.sdk._logs.Logger` using a
     log processor.
     """
 
     @abc.abstractmethod
     def export(self, batch: Sequence[LogData]):
         """Exports a batch of logs.
-
         Args:
             batch: The list of `LogData` objects to be exported
-
         Returns:
             The result of the export
         """
@@ -146,9 +131,6 @@ def force_flush(self, timeout_millis: int = 30000) -> bool: # pylint: disable=n
         return True
 
 
-_BSP_RESET_ONCE = Once()
-
-
 class BatchLogRecordProcessor(LogRecordProcessor):
     """This is an implementation of LogRecordProcessor which creates batches of
     received logs in the export-friendly LogData representation and
@@ -161,9 +143,9 @@ class BatchLogRecordProcessor(LogRecordProcessor):
     - :envvar:`OTEL_BLRP_MAX_QUEUE_SIZE`
     - :envvar:`OTEL_BLRP_MAX_EXPORT_BATCH_SIZE`
     - :envvar:`OTEL_BLRP_EXPORT_TIMEOUT`
-    """
 
-    _queue: Deque[LogData]
+    All the logic for emitting logs, shutting down etc. resides in the BatchProcessor class.
+    """
 
     def __init__(
         self,
@@ -194,127 +176,24 @@ def __init__(
         BatchLogRecordProcessor._validate_arguments(
             max_queue_size, schedule_delay_millis, max_export_batch_size
         )
-
-        self._exporter = exporter
-        self._max_queue_size = max_queue_size
-        self._schedule_delay = schedule_delay_millis / 1e3
-        self._max_export_batch_size = max_export_batch_size
-        # Not used. No way currently to pass timeout to export.
-        # TODO(https://github.com/open-telemetry/opentelemetry-python/issues/4555): figure out what this should do.
-        self._export_timeout_millis = export_timeout_millis
-        # Deque is thread safe.
-        self._queue = collections.deque([], max_queue_size)
-        self._worker_thread = threading.Thread(
-            name="OtelBatchLogRecordProcessor",
-            target=self.worker,
-            daemon=True,
+        # Initializes BatchProcessor
+        self._batch_processor = BatchProcessor(
+            exporter,
+            schedule_delay_millis,
+            max_export_batch_size,
+            export_timeout_millis,
+            max_queue_size,
+            "Log",
         )
 
-        self._shutdown = False
-        self._export_lock = threading.Lock()
-        self._worker_awaken = threading.Event()
-        self._worker_thread.start()
-        if hasattr(os, "register_at_fork"):
-            weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
-            os.register_at_fork(after_in_child=lambda: weak_reinit()())  # pylint: disable=unnecessary-lambda
-        self._pid = os.getpid()
-
-    def _should_export_batch(
-        self, batch_strategy: BatchLogExportStrategy, num_iterations: int
-    ) -> bool:
-        if not self._queue:
-            return False
-        # Always continue to export while queue length exceeds max batch size.
-        if len(self._queue) >= self._max_export_batch_size:
-            return True
-        if batch_strategy is BatchLogExportStrategy.EXPORT_ALL:
-            return True
-        if batch_strategy is BatchLogExportStrategy.EXPORT_AT_LEAST_ONE_BATCH:
-            return num_iterations == 0
-        return False
-
-    def _at_fork_reinit(self):
-        self._export_lock = threading.Lock()
-        self._worker_awaken = threading.Event()
-        self._queue.clear()
-        self._worker_thread = threading.Thread(
-            name="OtelBatchLogRecordProcessor",
-            target=self.worker,
-            daemon=True,
-        )
-        self._worker_thread.start()
-        self._pid = os.getpid()
-
-    def worker(self):
-        while not self._shutdown:
-            # Lots of strategies in the spec for setting next timeout.
-            # https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/sdk.md#batching-processor.
-            # Shutdown will interrupt this sleep. Emit will interrupt this sleep only if the queue is bigger then threshold.
-            sleep_interrupted = self._worker_awaken.wait(self._schedule_delay)
-            if self._shutdown:
-                break
-            self._export(
-                BatchLogExportStrategy.EXPORT_WHILE_BATCH_EXCEEDS_THRESHOLD
-                if sleep_interrupted
-                else BatchLogExportStrategy.EXPORT_AT_LEAST_ONE_BATCH
-            )
-            self._worker_awaken.clear()
-        self._export(BatchLogExportStrategy.EXPORT_ALL)
-
-    def _export(self, batch_strategy: BatchLogExportStrategy) -> None:
-        with self._export_lock:
-            iteration = 0
-            # We could see concurrent export calls from worker and force_flush. We call _should_export_batch
-            # once the lock is obtained to see if we still need to make the requested export.
-            while self._should_export_batch(batch_strategy, iteration):
-                iteration += 1
-                token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
-                try:
-                    self._exporter.export(
-                        [
-                            # Oldest records are at the back, so pop from there.
-                            self._queue.pop()
-                            for _ in range(
-                                min(
-                                    self._max_export_batch_size,
-                                    len(self._queue),
-                                )
-                            )
-                        ]
-                    )
-                except Exception:  # pylint: disable=broad-exception-caught
-                    _logger.exception("Exception while exporting logs.")
-                detach(token)
-
     def emit(self, log_data: LogData) -> None:
-        if self._shutdown:
-            _logger.info("Shutdown called, ignoring log.")
-            return
-        if self._pid != os.getpid():
-            _BSP_RESET_ONCE.do_once(self._at_fork_reinit)
-
-        if len(self._queue) == self._max_queue_size:
-            _logger.warning("Queue full, dropping log.")
-        self._queue.appendleft(log_data)
-        if len(self._queue) >= self._max_export_batch_size:
-            self._worker_awaken.set()
+        return self._batch_processor.emit(log_data)
 
     def shutdown(self):
-        if self._shutdown:
-            return
-        # Prevents emit and force_flush from further calling export.
-        self._shutdown = True
-        # Interrupts sleep in the worker, if it's sleeping.
-        self._worker_awaken.set()
-        # Main worker loop should exit after one final export call with flush all strategy.
-        self._worker_thread.join()
-        self._exporter.shutdown()
+        return self._batch_processor.shutdown()
 
-    def force_flush(self, timeout_millis: Optional[int] = None) -> bool:
-        if self._shutdown:
-            return
-        # Blocking call to export.
-        self._export(BatchLogExportStrategy.EXPORT_ALL)
+    def force_flush(self, timeout_millis: Optional[int] = None):
+        return self._batch_processor.force_flush(timeout_millis)
 
     @staticmethod
     def _default_max_queue_size():
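
For reviewers who want the unchanged public surface in context, a small usage sketch follows. It is not part of this diff; it assumes the public `LoggerProvider`/`LoggingHandler` API from `opentelemetry.sdk._logs`, which this PR does not touch, and after the refactor the processor methods shown above simply forward to the shared `BatchProcessor`.

```python
import logging

from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import (
    BatchLogRecordProcessor,
    ConsoleLogExporter,
)

# Wire the batching processor to an exporter; queueing, the worker thread,
# and flushing now live in the shared BatchProcessor, but the public
# constructor and methods are unchanged.
provider = LoggerProvider()
provider.add_log_record_processor(
    BatchLogRecordProcessor(
        ConsoleLogExporter(),
        # Explicit arguments still take precedence over the OTEL_BLRP_*
        # environment variables listed in the class docstring.
        schedule_delay_millis=5000,
        max_export_batch_size=512,
    )
)

# Emit a record through the stdlib logging bridge, then flush and shut down;
# both calls forward to BatchProcessor.force_flush()/shutdown().
logging.getLogger(__name__).addHandler(LoggingHandler(logger_provider=provider))
logging.getLogger(__name__).warning("hello from the batch processor")
provider.force_flush()
provider.shutdown()
```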