diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py index 52d6d3bd22..66122985c0 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py @@ -12,10 +12,14 @@ # See the License for the specific language governing permissions and # limitations under the License. +import collections import logging +import threading import typing from enum import Enum +from opentelemetry.sdk import util + from .. import Span, SpanProcessor logger = logging.getLogger(__name__) @@ -78,6 +82,124 @@ def shutdown(self) -> None: self.span_exporter.shutdown() +class BatchExportSpanProcessor(SpanProcessor): + """Batch span processor implementation. + + BatchExportSpanProcessor is an implementation of `SpanProcessor` that + batches ended spans and pushes them to the configured `SpanExporter`. + """ + + def __init__( + self, + span_exporter: SpanExporter, + max_queue_size: int = 2048, + schedule_delay_millis: float = 5000, + max_export_batch_size: int = 512, + ): + if max_queue_size <= 0: + raise ValueError("max_queue_size must be a positive integer.") + + if schedule_delay_millis <= 0: + raise ValueError("schedule_delay_millis must be positive.") + + if max_export_batch_size <= 0: + raise ValueError( + "max_export_batch_size must be a positive integer." + ) + + if max_export_batch_size > max_queue_size: + raise ValueError( + "max_export_batch_size must be less than and equal to max_export_batch_size." + ) + + self.span_exporter = span_exporter + self.queue = collections.deque([], max_queue_size) + self.worker_thread = threading.Thread(target=self.worker, daemon=True) + self.condition = threading.Condition(threading.Lock()) + self.schedule_delay_millis = schedule_delay_millis + self.max_export_batch_size = max_export_batch_size + self.max_queue_size = max_queue_size + self.done = False + # flag that indicates that spans are being dropped + self._spans_dropped = False + # precallocated list to send spans to exporter + self.spans_list = [None] * self.max_export_batch_size + self.worker_thread.start() + + def on_start(self, span: Span) -> None: + pass + + def on_end(self, span: Span) -> None: + if self.done: + logging.warning("Already shutdown, dropping span.") + return + if len(self.queue) == self.max_queue_size: + if not self._spans_dropped: + logging.warning("Queue is full, likely spans will be dropped.") + self._spans_dropped = True + + self.queue.appendleft(span) + + if len(self.queue) >= self.max_queue_size // 2: + with self.condition: + self.condition.notify() + + def worker(self): + timeout = self.schedule_delay_millis / 1e3 + while not self.done: + if len(self.queue) < self.max_export_batch_size: + with self.condition: + self.condition.wait(timeout) + if not self.queue: + # spurious notification, let's wait again + continue + if self.done: + # missing spans will be sent when calling flush + break + + # substract the duration of this export call to the next timeout + start = util.time_ns() + self.export() + end = util.time_ns() + duration = (end - start) / 1e9 + timeout = self.schedule_delay_millis / 1e3 - duration + + # be sure that all spans are sent + self._flush() + + def export(self) -> bool: + """Exports at most max_export_batch_size spans.""" + idx = 0 + + # currently only a single thread acts as consumer, so queue.pop() will + # not raise an exception + while idx < self.max_export_batch_size and self.queue: + self.spans_list[idx] = self.queue.pop() + idx += 1 + try: + self.span_exporter.export(self.spans_list[:idx]) + # pylint: disable=broad-except + except Exception: + logger.exception("Exception while exporting data.") + + # clean up list + for index in range(idx): + self.spans_list[index] = None + + def _flush(self): + # export all elements until queue is empty + while self.queue: + self.export() + + def shutdown(self) -> None: + # signal the worker thread to finish and then wait for it + self.done = True + with self.condition: + self.condition.notify_all() + self.worker_thread.join() + self.span_exporter.shutdown() + + class ConsoleSpanExporter(SpanExporter): """Implementation of :class:`SpanExporter` that prints spans to the console. diff --git a/opentelemetry-sdk/tests/trace/export/test_export.py b/opentelemetry-sdk/tests/trace/export/test_export.py index ef9786ca63..de7a5cd9d7 100644 --- a/opentelemetry-sdk/tests/trace/export/test_export.py +++ b/opentelemetry-sdk/tests/trace/export/test_export.py @@ -12,22 +12,34 @@ # See the License for the specific language governing permissions and # limitations under the License. +import time import unittest +from unittest import mock +from opentelemetry import trace as trace_api from opentelemetry.sdk import trace from opentelemetry.sdk.trace import export -class TestSimpleExportSpanProcessor(unittest.TestCase): - def test_simple_span_processor(self): - class MySpanExporter(export.SpanExporter): - def __init__(self, destination): - self.destination = destination +class MySpanExporter(export.SpanExporter): + """Very simple span exporter used for testing.""" + + def __init__(self, destination, max_export_batch_size=None): + self.destination = destination + self.max_export_batch_size = max_export_batch_size - def export(self, spans: trace.Span) -> export.SpanExportResult: - self.destination.extend(span.name for span in spans) - return export.SpanExportResult.SUCCESS + def export(self, spans: trace.Span) -> export.SpanExportResult: + if ( + self.max_export_batch_size is not None + and len(spans) > self.max_export_batch_size + ): + raise ValueError("Batch is too big") + self.destination.extend(span.name for span in spans) + return export.SpanExportResult.SUCCESS + +class TestSimpleExportSpanProcessor(unittest.TestCase): + def test_simple_span_processor(self): tracer = trace.Tracer() spans_names_list = [] @@ -42,3 +54,142 @@ def export(self, spans: trace.Span) -> export.SpanExportResult: pass self.assertListEqual(["xxx", "bar", "foo"], spans_names_list) + + +def _create_start_and_end_span(name, span_processor): + span = trace.Span( + name, + mock.Mock(spec=trace_api.SpanContext), + span_processor=span_processor, + ) + span.start() + span.end() + + +class TestBatchExportSpanProcessor(unittest.TestCase): + def test_batch_span_processor(self): + spans_names_list = [] + + my_exporter = MySpanExporter(destination=spans_names_list) + span_processor = export.BatchExportSpanProcessor(my_exporter) + + span_names = ["xxx", "bar", "foo"] + + for name in span_names: + _create_start_and_end_span(name, span_processor) + + span_processor.shutdown() + self.assertListEqual(span_names, spans_names_list) + + def test_batch_span_processor_lossless(self): + """Test that no spans are lost when sending max_queue_size spans""" + spans_names_list = [] + + my_exporter = MySpanExporter( + destination=spans_names_list, max_export_batch_size=128 + ) + span_processor = export.BatchExportSpanProcessor( + my_exporter, max_queue_size=512, max_export_batch_size=128 + ) + + for _ in range(512): + _create_start_and_end_span("foo", span_processor) + + span_processor.shutdown() + self.assertEqual(len(spans_names_list), 512) + + def test_batch_span_processor_many_spans(self): + """Test that no spans are lost when sending many spans""" + spans_names_list = [] + + my_exporter = MySpanExporter( + destination=spans_names_list, max_export_batch_size=128 + ) + span_processor = export.BatchExportSpanProcessor( + my_exporter, + max_queue_size=256, + max_export_batch_size=64, + schedule_delay_millis=100, + ) + + for _ in range(4): + for _ in range(256): + _create_start_and_end_span("foo", span_processor) + + time.sleep(0.05) # give some time for the exporter to upload spans + + span_processor.shutdown() + self.assertEqual(len(spans_names_list), 1024) + + def test_batch_span_processor_scheduled_delay(self): + """Test that spans are exported each schedule_delay_millis""" + spans_names_list = [] + + my_exporter = MySpanExporter(destination=spans_names_list) + span_processor = export.BatchExportSpanProcessor( + my_exporter, schedule_delay_millis=50 + ) + + # create single span + _create_start_and_end_span("foo", span_processor) + + time.sleep(0.05 + 0.02) + # span should be already exported + self.assertEqual(len(spans_names_list), 1) + + span_processor.shutdown() + + def test_batch_span_processor_parameters(self): + # zero max_queue_size + self.assertRaises( + ValueError, export.BatchExportSpanProcessor, None, max_queue_size=0 + ) + + # negative max_queue_size + self.assertRaises( + ValueError, + export.BatchExportSpanProcessor, + None, + max_queue_size=-500, + ) + + # zero schedule_delay_millis + self.assertRaises( + ValueError, + export.BatchExportSpanProcessor, + None, + schedule_delay_millis=0, + ) + + # negative schedule_delay_millis + self.assertRaises( + ValueError, + export.BatchExportSpanProcessor, + None, + schedule_delay_millis=-500, + ) + + # zero max_export_batch_size + self.assertRaises( + ValueError, + export.BatchExportSpanProcessor, + None, + max_export_batch_size=0, + ) + + # negative max_export_batch_size + self.assertRaises( + ValueError, + export.BatchExportSpanProcessor, + None, + max_export_batch_size=-500, + ) + + # max_export_batch_size > max_queue_size: + self.assertRaises( + ValueError, + export.BatchExportSpanProcessor, + None, + max_queue_size=256, + max_export_batch_size=512, + )