diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py
index 7f56a301721..ea26ea2638f 100644
--- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py
+++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py
@@ -12,7 +12,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import collections
 import logging
 import os
 import sys
@@ -37,6 +36,7 @@
     OTEL_BSP_SCHEDULE_DELAY,
 )
 from opentelemetry.sdk.trace import ReadableSpan, Span, SpanProcessor
+from opentelemetry.sdk.util import BatchAccumulator
 from opentelemetry.util._once import Once
 
 _DEFAULT_SCHEDULE_DELAY_MILLIS = 5000
@@ -124,16 +124,6 @@ def force_flush(self, timeout_millis: int = 30000) -> bool:
         return True
 
 
-class _FlushRequest:
-    """Represents a request for the BatchSpanProcessor to flush spans."""
-
-    __slots__ = ["event", "num_spans"]
-
-    def __init__(self):
-        self.event = threading.Event()
-        self.num_spans = 0
-
-
 _BSP_RESET_ONCE = Once()
 
 
@@ -182,27 +172,24 @@ def __init__(
             max_queue_size, schedule_delay_millis, max_export_batch_size
         )
 
+        self.accumulator = BatchAccumulator(max_export_batch_size)
+        self.flush_lock = threading.Lock()
+
         self.span_exporter = span_exporter
-        self.queue = collections.deque(
-            [], max_queue_size
-        )  # type: typing.Deque[Span]
-        self.worker_thread = threading.Thread(
-            name="OtelBatchSpanProcessor", target=self.worker, daemon=True
-        )
         self.condition = threading.Condition(threading.Lock())
-        self._flush_request = None  # type: typing.Optional[_FlushRequest]
         self.schedule_delay_millis = schedule_delay_millis
         self.max_export_batch_size = max_export_batch_size
        self.max_queue_size = max_queue_size
         self.export_timeout_millis = export_timeout_millis
-        self.done = False
+
+        self.worker_stopped: AtomicBool = AtomicBool(False)
+        self.processor_shutdown: AtomicBool = AtomicBool(False)
+
         # flag that indicates that spans are being dropped
         self._spans_dropped = False
-        # precallocated list to send spans to exporter
-        self.spans_list = [
-            None
-        ] * self.max_export_batch_size  # type: typing.List[typing.Optional[Span]]
-        self.worker_thread.start()
+
+        self._start_worker()
+
         # Only available in *nix since py37.
         if hasattr(os, "register_at_fork"):
             os.register_at_fork(
@@ -210,13 +197,26 @@ def __init__(
             )  # pylint: disable=protected-access
         self._pid = os.getpid()
 
+    def _start_worker(self):
+        self.worker_stopped.set(False)
+        self.worker_thread = threading.Thread(
+            name="OtelBatchSpanProcessor", target=self.worker, daemon=True
+        )
+        self.worker_thread.start()
+
+    def _stop_worker(self):
+        self.worker_stopped.set(True)
+        with self.condition:
+            self.condition.notify_all()
+        self.worker_thread.join()
+
     def on_start(
         self, span: Span, parent_context: typing.Optional[Context] = None
     ) -> None:
         pass
 
     def on_end(self, span: ReadableSpan) -> None:
-        if self.done:
+        if self.processor_shutdown.get():
             logger.warning("Already shutdown, dropping span.")
             return
         if not span.context.trace_flags.sampled:
@@ -224,20 +224,13 @@ def on_end(self, span: ReadableSpan) -> None:
             return
         if self._pid != os.getpid():
             _BSP_RESET_ONCE.do_once(self._at_fork_reinit)
 
-        if len(self.queue) == self.max_queue_size:
-            if not self._spans_dropped:
-                logger.warning("Queue is full, likely spans will be dropped.")
-                self._spans_dropped = True
-
-        self.queue.appendleft(span)
-
-        if len(self.queue) >= self.max_export_batch_size:
+        full = self.accumulator.push(span)
+        if full:
             with self.condition:
                 self.condition.notify()
 
     def _at_fork_reinit(self):
         self.condition = threading.Condition(threading.Lock())
-        self.queue.clear()
 
         # worker_thread is local to a process, only the thread that issued fork continues
         # to exist. A new worker thread must be started in child process.
@@ -249,169 +242,57 @@ def _at_fork_reinit(self):
 
     def worker(self):
         timeout = self.schedule_delay_millis / 1e3
-        flush_request = None  # type: typing.Optional[_FlushRequest]
-        while not self.done:
+        while not self.worker_stopped.get():
             with self.condition:
-                if self.done:
-                    # done flag may have changed, avoid waiting
+                if self.worker_stopped.get():
                     break
-                flush_request = self._get_and_unset_flush_request()
-                if (
-                    len(self.queue) < self.max_export_batch_size
-                    and flush_request is None
-                ):
-
-                    self.condition.wait(timeout)
-                    flush_request = self._get_and_unset_flush_request()
-                    if not self.queue:
-                        # spurious notification, let's wait again, reset timeout
-                        timeout = self.schedule_delay_millis / 1e3
-                        self._notify_flush_request_finished(flush_request)
-                        flush_request = None
-                        continue
-                    if self.done:
-                        # missing spans will be sent when calling flush
-                        break
-
-            # subtract the duration of this export call to the next timeout
-            start = time_ns()
-            self._export(flush_request)
-            end = time_ns()
-            duration = (end - start) / 1e9
-            timeout = self.schedule_delay_millis / 1e3 - duration
-
-            self._notify_flush_request_finished(flush_request)
-            flush_request = None
-
-        # there might have been a new flush request while export was running
-        # and before the done flag switched to true
-        with self.condition:
-            shutdown_flush_request = self._get_and_unset_flush_request()
-
-        # be sure that all spans are sent
-        self._drain_queue()
-        self._notify_flush_request_finished(flush_request)
-        self._notify_flush_request_finished(shutdown_flush_request)
-
-    def _get_and_unset_flush_request(
-        self,
-    ) -> typing.Optional[_FlushRequest]:
-        """Returns the current flush request and makes it invisible to the
-        worker thread for subsequent calls.
- """ - flush_request = self._flush_request - self._flush_request = None - if flush_request is not None: - flush_request.num_spans = len(self.queue) - return flush_request - - @staticmethod - def _notify_flush_request_finished( - flush_request: typing.Optional[_FlushRequest], - ): - """Notifies the flush initiator(s) waiting on the given request/event - that the flush operation was finished. - """ - if flush_request is not None: - flush_request.event.set() - - def _get_or_create_flush_request(self) -> _FlushRequest: - """Either returns the current active flush event or creates a new one. - - The flush event will be visible and read by the worker thread before an - export operation starts. Callers of a flush operation may wait on the - returned event to be notified when the flush/export operation was - finished. - - This method is not thread-safe, i.e. callers need to take care about - synchronization/locking. - """ - if self._flush_request is None: - self._flush_request = _FlushRequest() - return self._flush_request - - def _export(self, flush_request: typing.Optional[_FlushRequest]): - """Exports spans considering the given flush_request. - - In case of a given flush_requests spans are exported in batches until - the number of exported spans reached or exceeded the number of spans in - the flush request. - In no flush_request was given at most max_export_batch_size spans are - exported. - """ - if not flush_request: - self._export_batch() - return - - num_spans = flush_request.num_spans - while self.queue: - num_exported = self._export_batch() - num_spans -= num_exported - - if num_spans <= 0: - break + self.condition.wait(timeout) + while True: + batch = self.accumulator.batch() + if len(batch) == 0: + break + self._export(batch) - def _export_batch(self) -> int: - """Exports at most max_export_batch_size spans and returns the number of - exported spans. - """ - idx = 0 - # currently only a single thread acts as consumer, so queue.pop() will - # not raise an exception - while idx < self.max_export_batch_size and self.queue: - self.spans_list[idx] = self.queue.pop() - idx += 1 + def _export(self, batch): token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True)) + # noinspection PyBroadException try: - # Ignore type b/c the Optional[None]+slicing is too "clever" - # for mypy - self.span_exporter.export(self.spans_list[:idx]) # type: ignore + self.span_exporter.export(batch) except Exception: # pylint: disable=broad-except logger.exception("Exception while exporting Span batch.") detach(token) - # clean up list - for index in range(idx): - self.spans_list[index] = None - return idx - - def _drain_queue(self): - """Export all elements until queue is empty. - - Can only be called from the worker thread context because it invokes - `export` that is not thread safe. 
- """ - while self.queue: - self._export_batch() - def force_flush(self, timeout_millis: int = None) -> bool: - - if timeout_millis is None: - timeout_millis = self.export_timeout_millis - - if self.done: - logger.warning("Already shutdown, ignoring call to force_flush().") - return True - - with self.condition: - flush_request = self._get_or_create_flush_request() - # signal the worker thread to flush and wait for it to finish - self.condition.notify_all() - - # wait for token to be processed - ret = flush_request.event.wait(timeout_millis / 1e3) - if not ret: - logger.warning("Timeout was exceeded in force_flush().") - return ret + with self.flush_lock: + start = time_ns() + self._stop_worker() + out = True + while not self.accumulator.empty(): + batch = self.accumulator.batch() + self._export(batch) + if self._has_timed_out(start, timeout_millis): + logger.warning("Timeout was exceeded in force_flush().") + out = False + break + self._start_worker() + return out def shutdown(self) -> None: - # signal the worker thread to finish and then wait for it - self.done = True - with self.condition: - self.condition.notify_all() - self.worker_thread.join() + self.processor_shutdown.set(True) + self._stop_worker() self.span_exporter.shutdown() + @staticmethod + def _has_timed_out(start_time_ns, timeout_millis): + if timeout_millis is None: + return False + elapsed_millis = (time_ns() - start_time_ns) / 1e6 + return elapsed_millis > timeout_millis + @staticmethod def _default_max_queue_size(): try: @@ -496,6 +373,20 @@ def _validate_arguments( ) +class AtomicBool: + def __init__(self, v: bool): + self.lock = threading.Lock() + self.v = v + + def set(self, v: bool): + with self.lock: + self.v = v + + def get(self) -> bool: + with self.lock: + return self.v + + class ConsoleSpanExporter(SpanExporter): """Implementation of :class:`SpanExporter` that prints spans to the console. @@ -511,7 +402,7 @@ def __init__( formatter: typing.Callable[ [ReadableSpan], str ] = lambda span: span.to_json() - + linesep, + + linesep, ): self.out = out self.formatter = formatter diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/util/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/util/__init__.py index e1857d8e62d..b9d0ec219bd 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/util/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/util/__init__.py @@ -11,12 +11,12 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import collections import datetime import threading from collections import OrderedDict, deque from collections.abc import MutableMapping, Sequence -from typing import Optional +from typing import Generic, Optional, TypeVar from deprecated import deprecated @@ -148,3 +148,48 @@ def from_map(cls, maxlen, mapping): for key, value in mapping.items(): bounded_dict[key] = value return bounded_dict + + +_T = TypeVar("_T") + + +class BatchAccumulator(Generic[_T]): + """ + A thread-safe data structure to accumulate and manage items in batches. Designed to collect individual items up to a + specified batch size and then enqueue these batches in a FIFO queue. + """ + def __init__(self, batch_size): + self.batch_size = batch_size + self.items = [] + self.batches = collections.deque() + self.lock = threading.Lock() + + def empty(self): + """ + Returns True if the item list and the batch queue are empty, and False otherwise. 
+ """ + with self.lock: + return len(self.items) == 0 and len(self.batches) == 0 + + def push(self, item): + with self.lock: + self.items.append(item) + if len(self.items) < self.batch_size: + return False + self.batches.appendleft(self.items) + self.items = [] + return True + + def batch(self): + """ + Returns the earliest (first in line) batch of items from the FIFO queue. If the queue is empty, returns a batch + consisting of any remaining items that haven't been batched. + """ + try: + return self.batches.pop() + except IndexError: + # if there are no batches left, return the items that haven't been batched + with self.lock: + out = self.items + self.items = [] + return out diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/util/__init__.pyi b/opentelemetry-sdk/src/opentelemetry/sdk/util/__init__.pyi index d42e0f018fa..c289c9d5341 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/util/__init__.pyi +++ b/opentelemetry-sdk/src/opentelemetry/sdk/util/__init__.pyi @@ -11,10 +11,13 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import threading from typing import ( + Deque, + Generic, Iterable, Iterator, + List, Mapping, MutableMapping, Sequence, @@ -71,3 +74,14 @@ class BoundedDict(MutableMapping[_KT, _VT]): def from_map( cls, maxlen: int, mapping: Mapping[_KT, _VT] ) -> BoundedDict[_KT, _VT]: ... + +class BatchAccumulator(Generic[_T]): + batch_size: int + lock: threading.Lock + items: List[_T] + batches: Deque[_T] + + def __init__(self, batch_size: int): ... + def empty(self): ... + def push(self, span: _T): ... + def batch(self): ... diff --git a/opentelemetry-sdk/tests/test_util.py b/opentelemetry-sdk/tests/test_util.py index 00099090cdc..08a65ce488b 100644 --- a/opentelemetry-sdk/tests/test_util.py +++ b/opentelemetry-sdk/tests/test_util.py @@ -14,7 +14,7 @@ import unittest -from opentelemetry.sdk.util import BoundedList +from opentelemetry.sdk.util import BatchAccumulator, BoundedList class TestBoundedList(unittest.TestCase): @@ -141,3 +141,26 @@ def test_no_limit(self): for num in range(100): self.assertEqual(blist[num], num) + + +class TestBatchAccumulator(unittest.TestCase): + def test_push(self): + acc = BatchAccumulator[int](4) + self.assertTrue(acc.empty()) + self.assertFalse(acc.push(1)) + self.assertFalse(acc.push(2)) + self.assertFalse(acc.push(3)) + self.assertTrue(acc.push(4)) + self.assertListEqual([1, 2, 3, 4], acc.batch()) + self.assertEqual(0, len(acc.batch())) + self.assertTrue(acc.empty()) + + def test_batch(self): + acc = BatchAccumulator[int](4) + for i in range(10): + acc.push(i) + self.assertListEqual([0, 1, 2, 3], acc.batch()) + self.assertListEqual([4, 5, 6, 7], acc.batch()) + self.assertListEqual([8, 9], acc.batch()) + self.assertListEqual([], acc.batch()) + self.assertTrue(acc.empty()) diff --git a/opentelemetry-sdk/tests/trace/export/test_export.py b/opentelemetry-sdk/tests/trace/export/test_export.py index 7784ef4c9d4..7dfb25f6720 100644 --- a/opentelemetry-sdk/tests/trace/export/test_export.py +++ b/opentelemetry-sdk/tests/trace/export/test_export.py @@ -322,10 +322,11 @@ def test_flush_timeout(self): destination=spans_names_list, export_timeout_millis=500 ) span_processor = export.BatchSpanProcessor(my_exporter) + span_processor._stop_worker() _create_start_and_end_span("foo", span_processor) - # check that the timeout is not meet + # check that the timeout is not met with self.assertLogs(level=WARNING): 
diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/util/__init__.pyi b/opentelemetry-sdk/src/opentelemetry/sdk/util/__init__.pyi
index d42e0f018fa..c289c9d5341 100644
--- a/opentelemetry-sdk/src/opentelemetry/sdk/util/__init__.pyi
+++ b/opentelemetry-sdk/src/opentelemetry/sdk/util/__init__.pyi
@@ -11,10 +11,13 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
+import threading
 from typing import (
+    Deque,
+    Generic,
     Iterable,
     Iterator,
+    List,
     Mapping,
     MutableMapping,
     Sequence,
@@ -71,3 +74,14 @@ class BoundedDict(MutableMapping[_KT, _VT]):
     def from_map(
         cls, maxlen: int, mapping: Mapping[_KT, _VT]
     ) -> BoundedDict[_KT, _VT]: ...
+
+class BatchAccumulator(Generic[_T]):
+    batch_size: int
+    lock: threading.Lock
+    items: List[_T]
+    batches: Deque[List[_T]]
+
+    def __init__(self, batch_size: int) -> None: ...
+    def empty(self) -> bool: ...
+    def push(self, item: _T) -> bool: ...
+    def batch(self) -> List[_T]: ...
diff --git a/opentelemetry-sdk/tests/test_util.py b/opentelemetry-sdk/tests/test_util.py
index 00099090cdc..08a65ce488b 100644
--- a/opentelemetry-sdk/tests/test_util.py
+++ b/opentelemetry-sdk/tests/test_util.py
@@ -14,7 +14,7 @@
 
 import unittest
 
-from opentelemetry.sdk.util import BoundedList
+from opentelemetry.sdk.util import BatchAccumulator, BoundedList
 
 
 class TestBoundedList(unittest.TestCase):
@@ -141,3 +141,26 @@ def test_no_limit(self):
 
         for num in range(100):
             self.assertEqual(blist[num], num)
+
+
+class TestBatchAccumulator(unittest.TestCase):
+    def test_push(self):
+        acc = BatchAccumulator[int](4)
+        self.assertTrue(acc.empty())
+        self.assertFalse(acc.push(1))
+        self.assertFalse(acc.push(2))
+        self.assertFalse(acc.push(3))
+        self.assertTrue(acc.push(4))
+        self.assertListEqual([1, 2, 3, 4], acc.batch())
+        self.assertEqual(0, len(acc.batch()))
+        self.assertTrue(acc.empty())
+
+    def test_batch(self):
+        acc = BatchAccumulator[int](4)
+        for i in range(10):
+            acc.push(i)
+        self.assertListEqual([0, 1, 2, 3], acc.batch())
+        self.assertListEqual([4, 5, 6, 7], acc.batch())
+        self.assertListEqual([8, 9], acc.batch())
+        self.assertListEqual([], acc.batch())
+        self.assertTrue(acc.empty())
diff --git a/opentelemetry-sdk/tests/trace/export/test_export.py b/opentelemetry-sdk/tests/trace/export/test_export.py
index 7784ef4c9d4..7dfb25f6720 100644
--- a/opentelemetry-sdk/tests/trace/export/test_export.py
+++ b/opentelemetry-sdk/tests/trace/export/test_export.py
@@ -322,10 +322,11 @@ def test_flush_timeout(self):
             destination=spans_names_list, export_timeout_millis=500
         )
         span_processor = export.BatchSpanProcessor(my_exporter)
+        span_processor._stop_worker()
 
         _create_start_and_end_span("foo", span_processor)
 
-        # check that the timeout is not meet
+        # check that the timeout is not met
         with self.assertLogs(level=WARNING):
            self.assertFalse(span_processor.force_flush(100))
         span_processor.shutdown()
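The added `_stop_worker()` call is what makes this test deterministic: with the worker parked, nothing drains the accumulator, so force_flush() must export every pending batch itself within the deadline. A compressed sketch of the same failure mode; SleepyExporter is hypothetical and is not the exporter the test uses, and the span helpers mirror mk_span()/mk_ctx() from the integration tests below:

    import time

    from opentelemetry.sdk.trace import _Span
    from opentelemetry.sdk.trace.export import (
        BatchSpanProcessor,
        SpanExporter,
        SpanExportResult,
    )
    from opentelemetry.trace import SpanContext, TraceFlags

    class SleepyExporter(SpanExporter):
        """Hypothetical exporter whose every export call outlives a 100 ms deadline."""

        def export(self, spans):
            time.sleep(0.5)
            return SpanExportResult.SUCCESS

        def shutdown(self):
            pass

    processor = BatchSpanProcessor(SleepyExporter(), max_export_batch_size=1)
    processor._stop_worker()  # test-only: let spans pile up unexported

    # Queue one sampled span so the flush has work to do.
    ctx = SpanContext(1, 2, False, trace_flags=TraceFlags(TraceFlags.SAMPLED))
    processor.on_end(_Span(name="pending", context=ctx))

    print(processor.force_flush(timeout_millis=100))  # False: one batch alone overruns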
diff --git a/opentelemetry-sdk/tests/trace/export/test_integration.py b/opentelemetry-sdk/tests/trace/export/test_integration.py
new file mode 100644
index 00000000000..96c4c1d2fca
--- /dev/null
+++ b/opentelemetry-sdk/tests/trace/export/test_integration.py
@@ -0,0 +1,178 @@
+import threading
+import time
+import unittest
+from concurrent import futures
+from os import environ
+
+import grpc
+
+from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
+from opentelemetry.proto.collector.logs.v1 import logs_service_pb2, logs_service_pb2_grpc
+from opentelemetry.proto.collector.metrics.v1 import metrics_service_pb2, metrics_service_pb2_grpc
+from opentelemetry.proto.collector.trace.v1 import trace_service_pb2, trace_service_pb2_grpc
+from opentelemetry.sdk.trace import ReadableSpan, _Span
+from opentelemetry.sdk.trace import SpanProcessor
+from opentelemetry.sdk.trace.export import BatchSpanProcessor
+from opentelemetry.trace import SpanContext, TraceFlags
+
+
+@unittest.skipUnless(environ.get('RUN_LONG_TESTS', '').lower() == 'true', 'Skipping, RUN_LONG_TESTS not set')
+class TestBSPIntegration(unittest.TestCase):
+    """
+    These are longer-running tests (total ~1 minute) that start up a local python grpc server and send spans to
+    it, comparing the number of received spans against how many were sent.
+    """
+
+    def test_full_speed(self):
+        self.run_bsp_test(
+            num_threads=128,
+            max_interval_sec=4,
+            num_spans_per_firehose=1000,
+            firehose_sleep_sec=0,
+        )
+
+    def test_slower(self):
+        self.run_bsp_test(
+            num_threads=128,
+            max_interval_sec=4,
+            num_spans_per_firehose=1000,
+            firehose_sleep_sec=0.01,
+        )
+
+    def test_slow_enough_to_engage_timer(self):
+        self.run_bsp_test(
+            num_threads=1,
+            max_interval_sec=4,
+            num_spans_per_firehose=10,
+            firehose_sleep_sec=1,
+        )
+
+    def run_bsp_test(self, num_threads, max_interval_sec, num_spans_per_firehose, firehose_sleep_sec):
+        server = OTLPServer()
+        server.start()
+
+        bsp = BatchSpanProcessor(OTLPSpanExporter(), schedule_delay_millis=max_interval_sec * 1e3)
+
+        firehose = SpanFirehose(bsp, num_spans=num_spans_per_firehose, sleep_sec=firehose_sleep_sec)
+
+        threads = []
+        for _ in range(num_threads):
+            thread = threading.Thread(target=firehose.run)
+            thread.start()
+            threads.append(thread)
+
+        for thread in threads:
+            thread.join()
+
+        time.sleep(max_interval_sec * 2)
+
+        num_span_received = server.get_num_spans_received()
+        self.assertEqual(num_spans_per_firehose * num_threads, num_span_received)
+        server.stop()
+
+
+class SpanFirehose:
+
+    def __init__(self, sp: SpanProcessor, num_spans: int, sleep_sec: float):
+        self._sp = sp
+        self._num_spans = num_spans
+        self._sleep_sec = sleep_sec
+
+    def run(self) -> float:
+        start = time.time()
+        span = mk_span('test-span')
+        for _ in range(self._num_spans):
+            time.sleep(self._sleep_sec)
+            self._sp.on_end(span)
+        return time.time() - start
+
+
+class OTLPServer:
+
+    def __init__(self):
+        self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
+
+        self.trace_servicer = TraceServiceServicer()
+        trace_service_pb2_grpc.add_TraceServiceServicer_to_server(self.trace_servicer, self.server)
+
+        metrics_servicer = MetricsServiceServicer()
+        metrics_service_pb2_grpc.add_MetricsServiceServicer_to_server(metrics_servicer, self.server)
+
+        logs_servicer = LogsServiceServicer()
+        logs_service_pb2_grpc.add_LogsServiceServicer_to_server(logs_servicer, self.server)
+
+        self.server.add_insecure_port('0.0.0.0:4317')
+
+    def start(self):
+        self.server.start()
+
+    def stop(self):
+        self.server.stop(0)
+
+    def get_num_spans_received(self):
+        return self.trace_servicer.get_num_spans()
+
+
+class LogsServiceServicer(logs_service_pb2_grpc.LogsServiceServicer):
+
+    def __init__(self):
+        self.requests_received = []
+
+    def Export(self, request, context):
+        self.requests_received.append(request)
+        return logs_service_pb2.ExportLogsServiceResponse()
+
+
+class TraceServiceServicer(trace_service_pb2_grpc.TraceServiceServicer):
+
+    def __init__(self):
+        self.requests_received = []
+
+    def Export(self, request, context):
+        self.requests_received.append(request)
+        return trace_service_pb2.ExportTraceServiceResponse()
+
+    def get_num_spans(self):
+        out = 0
+        for req in self.requests_received:
+            for rs in req.resource_spans:
+                for ss in rs.scope_spans:
+                    out += len(ss.spans)
+        return out
+
+
+class MetricsServiceServicer(metrics_service_pb2_grpc.MetricsServiceServicer):
+
+    def __init__(self):
+        self.requests_received = []
+
+    def Export(self, request, context):
+        self.requests_received.append(request)
+        return metrics_service_pb2.ExportMetricsServiceResponse()
+
+
+def mk_readable_span():
+    ctx = SpanContext(0, 0, False)
+    return ReadableSpan(context=ctx, attributes={})
+
+
+def mk_spans(n):
+    span = mk_span('foo')
+    out = []
+    for _ in range(n):
+        out.append(span)
+    return out
+
+
+def create_start_and_end_span(name, span_processor):
+    span = _Span(name, mk_ctx(), span_processor=span_processor)
+    span.start()
+    span.end()
+
+
+def mk_span(name='foo'):
+    return _Span(name=name, context=mk_ctx())
+
+
+def mk_ctx():
+    return SpanContext(1, 2, False, trace_flags=TraceFlags(TraceFlags.SAMPLED))
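The suite gates itself on RUN_LONG_TESTS and binds the collector's default port, so it assumes 4317 is free locally and that opentelemetry-exporter-otlp-proto-grpc is installed. One possible opt-in runner (this helper is a sketch, not part of the diff; adjust the start directory to your checkout):

    # run_long_tests.py -- hypothetical helper for running the long tests above
    import os
    import unittest

    os.environ["RUN_LONG_TESTS"] = "true"  # must be set before the module is imported

    suite = unittest.TestLoader().discover(
        "opentelemetry-sdk/tests/trace/export", pattern="test_integration.py"
    )
    unittest.TextTestRunner(verbosity=2).run(suite)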