From 35205ed962f7361320b45a4488e3309a032ec25c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mauricio=20V=C3=A1squez?= Date: Thu, 19 Sep 2019 12:56:35 +0200 Subject: [PATCH 01/10] sdk/trace/exporters: add batch span processor exporter The exporters specification states that two built-in span processors should be implemented, the simple span processor and the batch span processor. This commit implements the latter; it is mainly based on the opentelemetry/java one. The algorithm implements the following logic: - a condition variable is used to notify the worker thread in case the queue is half full, so that exporting can start before the queue gets full and spans are dropped. - export is called every schedule_delay_millis if there is at least one new span to export. - when the processor is shut down all remaining spans are exported. --- .../sdk/trace/export/__init__.py | 99 +++++++++++++++++++ .../tests/trace/export/test_export.py | 84 ++++++++++++++++ 2 files changed, 183 insertions(+) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py index 0c011f9976..cdc38ca247 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py @@ -13,6 +13,8 @@ # limitations under the License. import logging +import queue +import threading import typing from enum import Enum @@ -76,3 +78,100 @@ def on_end(self, span: Span) -> None: def shutdown(self) -> None: self.span_exporter.shutdown() + + +class BatchExportSpanProcessor(SpanProcessor): + """Batch span processor implementation. + + BatchExportSpanProcessor is an implementation of `SpanProcessor` that + batches ended spans and pushes them to the configured `SpanExporter`.
+ """ + + def __init__( + self, + span_exporter: SpanExporter, + max_queue_size: int = 2048, + schedule_delay_millis: int = 5000, + max_export_batch_size: int = 512, + ): + if max_queue_size <= 0: + raise ValueError("max_queue_size must be a positive integer.") + + if schedule_delay_millis <= 0: + raise ValueError("schedule_delay_millis must be positive.") + + if max_export_batch_size <= 0: + raise ValueError("max_export_batch_size must be positive.") + + if max_export_batch_size > max_queue_size: + raise ValueError( + "max_export_batch_size must be less and equal to max_export_batch_size." + ) + + self.span_exporter = span_exporter + self.queue = queue.Queue(max_queue_size) + self.worker_thread = threading.Thread(target=self.worker) + self.condition = threading.Condition() + self.schedule_delay_millis = schedule_delay_millis + self.max_export_batch_size = max_export_batch_size + self.half_max_queue_size = max_queue_size / 2 + self.done = False + + self.worker_thread.start() + + def on_start(self, span: Span) -> None: + pass + + def on_end(self, span: Span) -> None: + try: + self.queue.put(span, block=False) + except queue.Full: + # TODO: dropped spans counter? 
+ pass + if self.queue.qsize() >= self.half_max_queue_size: + with self.condition: + self.condition.notify_all() + + def worker(self): + while not self.done: + if self.queue.qsize() < self.max_export_batch_size: + with self.condition: + self.condition.wait(self.schedule_delay_millis / 1000) + if self.queue.empty(): + # spurious notification, let's wait again + continue + if self.done: + # missing spans will be sent when calling flush + break + + self.export() + + # be sure that all spans are sent + self.flush() + + def export(self): + """Exports at most max_export_batch_size spans.""" + idx = 0 + spans = [] + # currently only a single thread acts as consumer, so queue.get() will + # never block + while idx < self.max_export_batch_size and not self.queue.empty(): + spans.append(self.queue.get()) + idx += 1 + try: + self.span_exporter.export(spans) + # pylint: disable=broad-except + except Exception as exc: + logger.warning("Exception while exporting data: %s", exc) + + def flush(self): + while not self.queue.empty(): + self.export() + + def shutdown(self) -> None: + # signal the worker thread to finish and then wait for it + self.done = True + with self.condition: + self.condition.notify_all() + self.worker_thread.join() + self.span_exporter.shutdown() diff --git a/opentelemetry-sdk/tests/trace/export/test_export.py b/opentelemetry-sdk/tests/trace/export/test_export.py index ef9786ca63..8be126842b 100644 --- a/opentelemetry-sdk/tests/trace/export/test_export.py +++ b/opentelemetry-sdk/tests/trace/export/test_export.py @@ -42,3 +42,87 @@ def export(self, spans: trace.Span) -> export.SpanExportResult: pass self.assertListEqual(["xxx", "bar", "foo"], spans_names_list) + + +class TestBatchExportSpanProcessor(unittest.TestCase): + def test_batch_span_processor(self): + class MySpanExporter(export.SpanExporter): + def __init__(self, destination): + self.destination = destination + + def export(self, spans: trace.Span) -> export.SpanExportResult: + 
self.destination.extend(span.name for span in spans) + return export.SpanExportResult.SUCCESS + + tracer = trace.Tracer() + + spans_names_list = [] + + my_exporter = MySpanExporter(destination=spans_names_list) + span_processor = export.BatchExportSpanProcessor(my_exporter) + tracer.add_span_processor(span_processor) + + with tracer.start_span("foo"): + with tracer.start_span("bar"): + with tracer.start_span("xxx"): + pass + + # call shutdown on specific span processor + # TODO: this call is missing in the tracer + span_processor.shutdown() + self.assertListEqual(["xxx", "bar", "foo"], spans_names_list) + + def test_batch_span_processor_parameters(self): + # zero max_queue_size + self.assertRaises( + ValueError, export.BatchExportSpanProcessor, None, max_queue_size=0 + ) + + # negative max_queue_size + self.assertRaises( + ValueError, + export.BatchExportSpanProcessor, + None, + max_queue_size=-500, + ) + + # zero schedule_delay_millis + self.assertRaises( + ValueError, + export.BatchExportSpanProcessor, + None, + schedule_delay_millis=0, + ) + + # negative schedule_delay_millis + self.assertRaises( + ValueError, + export.BatchExportSpanProcessor, + None, + schedule_delay_millis=-500, + ) + + # zero max_export_batch_size + self.assertRaises( + ValueError, + export.BatchExportSpanProcessor, + None, + max_export_batch_size=0, + ) + + # negative max_export_batch_size + self.assertRaises( + ValueError, + export.BatchExportSpanProcessor, + None, + max_export_batch_size=-500, + ) + + # max_export_batch_size > max_queue_size: + self.assertRaises( + ValueError, + export.BatchExportSpanProcessor, + None, + max_queue_size=256, + max_export_batch_size=512, + ) From ffe33fc056280f6f3ee6bc8fc598e8d861f0bc08 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mauricio=20V=C3=A1squez?= Date: Mon, 23 Sep 2019 10:45:21 +0200 Subject: [PATCH 02/10] address feedback 1 - use // instead of / - set daemon=true - fix error message for max_batch_size - change schedule_delay_millis' type to float - 
make flush internal method --- .../src/opentelemetry/sdk/trace/export/__init__.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py index cdc38ca247..7149b898b8 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py @@ -91,7 +91,7 @@ def __init__( self, span_exporter: SpanExporter, max_queue_size: int = 2048, - schedule_delay_millis: int = 5000, + schedule_delay_millis: float = 5000, max_export_batch_size: int = 512, ): if max_queue_size <= 0: @@ -101,7 +101,9 @@ def __init__( raise ValueError("schedule_delay_millis must be positive.") if max_export_batch_size <= 0: - raise ValueError("max_export_batch_size must be positive.") + raise ValueError( + "max_export_batch_size must be a positive integer." + ) if max_export_batch_size > max_queue_size: raise ValueError( @@ -110,11 +112,11 @@ def __init__( self.span_exporter = span_exporter self.queue = queue.Queue(max_queue_size) - self.worker_thread = threading.Thread(target=self.worker) + self.worker_thread = threading.Thread(target=self.worker, daemon=True) self.condition = threading.Condition() self.schedule_delay_millis = schedule_delay_millis self.max_export_batch_size = max_export_batch_size - self.half_max_queue_size = max_queue_size / 2 + self.half_max_queue_size = max_queue_size // 2 self.done = False self.worker_thread.start() @@ -147,7 +149,7 @@ def worker(self): self.export() # be sure that all spans are sent - self.flush() + self._flush() def export(self): """Exports at most max_export_batch_size spans.""" @@ -164,7 +166,7 @@ def export(self): except Exception as exc: logger.warning("Exception while exporting data: %s", exc) - def flush(self): + def _flush(self): while not self.queue.empty(): self.export() From 3e69336962301999b283daeeb97ac61f279129e6 Mon 
Sep 17 00:00:00 2001 From: =?UTF-8?q?Mauricio=20V=C3=A1squez?= Date: Mon, 23 Sep 2019 11:47:43 +0200 Subject: [PATCH 03/10] subtract the time the exporter takes from the timeout --- .../src/opentelemetry/sdk/trace/export/__init__.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py index 6216ea876e..0433270bd5 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py @@ -18,6 +18,8 @@ import typing from enum import Enum +from opentelemetry.sdk import util + from .. import Span, SpanProcessor logger = logging.getLogger(__name__) @@ -135,10 +137,11 @@ def on_end(self, span: Span) -> None: self.condition.notify_all() def worker(self): + timeout = self.schedule_delay_millis / 1e3 while not self.done: if self.queue.qsize() < self.max_export_batch_size: with self.condition: - self.condition.wait(self.schedule_delay_millis / 1000) + self.condition.wait(timeout) if self.queue.empty(): # spurious notification, let's wait again continue @@ -146,7 +149,12 @@ def worker(self): # missing spans will be sent when calling flush break + # substract the duration of this export call to the next timeout + start = util.time_ns() self.export() + end = util.time_ns() + duration = (end - start) / 1e9 + timeout = self.schedule_delay_millis / 1e3 - duration # be sure that all spans are sent self._flush() From f4256b8f6c68cc37dafbaebb17fc5405073bcd08 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mauricio=20V=C3=A1squez?= Date: Mon, 23 Sep 2019 11:54:59 +0200 Subject: [PATCH 04/10] add some more tests --- .../tests/trace/export/test_export.py | 81 +++++++++++++++---- 1 file changed, 65 insertions(+), 16 deletions(-) diff --git a/opentelemetry-sdk/tests/trace/export/test_export.py b/opentelemetry-sdk/tests/trace/export/test_export.py index
8be126842b..6fafbe366d 100644 --- a/opentelemetry-sdk/tests/trace/export/test_export.py +++ b/opentelemetry-sdk/tests/trace/export/test_export.py @@ -12,22 +12,32 @@ # See the License for the specific language governing permissions and # limitations under the License. +import time import unittest from opentelemetry.sdk import trace from opentelemetry.sdk.trace import export -class TestSimpleExportSpanProcessor(unittest.TestCase): - def test_simple_span_processor(self): - class MySpanExporter(export.SpanExporter): - def __init__(self, destination): - self.destination = destination +class MySpanExporter(export.SpanExporter): + """Very simple span exporter used for testing.""" + + def __init__(self, destination, max_export_batch_size=None): + self.destination = destination + self.max_export_batch_size = max_export_batch_size - def export(self, spans: trace.Span) -> export.SpanExportResult: - self.destination.extend(span.name for span in spans) - return export.SpanExportResult.SUCCESS + def export(self, spans: trace.Span) -> export.SpanExportResult: + if ( + self.max_export_batch_size is not None + and len(spans) > self.max_export_batch_size + ): + raise ValueError("Batch is too big") + self.destination.extend(span.name for span in spans) + return export.SpanExportResult.SUCCESS + +class TestSimpleExportSpanProcessor(unittest.TestCase): + def test_simple_span_processor(self): tracer = trace.Tracer() spans_names_list = [] @@ -46,14 +56,6 @@ def export(self, spans: trace.Span) -> export.SpanExportResult: class TestBatchExportSpanProcessor(unittest.TestCase): def test_batch_span_processor(self): - class MySpanExporter(export.SpanExporter): - def __init__(self, destination): - self.destination = destination - - def export(self, spans: trace.Span) -> export.SpanExportResult: - self.destination.extend(span.name for span in spans) - return export.SpanExportResult.SUCCESS - tracer = trace.Tracer() spans_names_list = [] @@ -72,6 +74,53 @@ def export(self, spans: trace.Span) -> 
export.SpanExportResult: span_processor.shutdown() self.assertListEqual(["xxx", "bar", "foo"], spans_names_list) + def test_batch_span_processor_lossless(self): + """Test that no spans are lost when sending max_queue_size spans""" + tracer = trace.Tracer() + + spans_names_list = [] + + my_exporter = MySpanExporter( + destination=spans_names_list, max_export_batch_size=128 + ) + span_processor = export.BatchExportSpanProcessor( + my_exporter, max_queue_size=512, max_export_batch_size=128 + ) + tracer.add_span_processor(span_processor) + + for idx in range(512): + with tracer.start_span("foo{}".format(idx)): + pass + + # call shutdown on specific span processor + # TODO: this call is missing in the tracer + span_processor.shutdown() + self.assertEqual(len(spans_names_list), 512) + + def test_batch_span_processor_scheduled_delay(self): + """Test that spans are exported each schedule_delay_millis""" + tracer = trace.Tracer() + + spans_names_list = [] + + my_exporter = MySpanExporter(destination=spans_names_list) + span_processor = export.BatchExportSpanProcessor( + my_exporter, schedule_delay_millis=50 + ) + tracer.add_span_processor(span_processor) + + # start single span + with tracer.start_span("foo1"): + pass + + time.sleep(0.05) + # span should be already exported + self.assertEqual(len(spans_names_list), 1) + + # call shutdown on specific span processor + # TODO: this call is missing in the tracer + span_processor.shutdown() + def test_batch_span_processor_parameters(self): # zero max_queue_size self.assertRaises( From 7559ee41fe423ac8bd4a28a4366703fbe523a677 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mauricio=20V=C3=A1squez?= Date: Tue, 24 Sep 2019 08:15:59 +0200 Subject: [PATCH 05/10] add a little bit of room to timeout test it doesn't work sometimes when using the exact same value of schedule_delay_millis, add some room --- opentelemetry-sdk/tests/trace/export/test_export.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/opentelemetry-sdk/tests/trace/export/test_export.py b/opentelemetry-sdk/tests/trace/export/test_export.py index 6fafbe366d..5f7ad7239d 100644 --- a/opentelemetry-sdk/tests/trace/export/test_export.py +++ b/opentelemetry-sdk/tests/trace/export/test_export.py @@ -113,7 +113,7 @@ def test_batch_span_processor_scheduled_delay(self): with tracer.start_span("foo1"): pass - time.sleep(0.05) + time.sleep(0.05 + 0.02) # span should be already exported self.assertEqual(len(spans_names_list), 1) From 75a32dd8d1cf3d9364b4a972227322f783cb4369 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mauricio=20V=C3=A1squez?= Date: Wed, 25 Sep 2019 16:46:22 +0200 Subject: [PATCH 06/10] handle feedback: - log when dropping spans - check if processor is already shutdown - avoid using RLock - prereserve span array - introduce variable to avoid sending notification storm --- .../sdk/trace/export/__init__.py | 42 +++++++++++++------ 1 file changed, 29 insertions(+), 13 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py index 0433270bd5..c1836f6e01 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py @@ -14,6 +14,7 @@ import logging import queue +import sys import threading import typing from enum import Enum @@ -115,32 +116,43 @@ def __init__( self.span_exporter = span_exporter self.queue = queue.Queue(max_queue_size) self.worker_thread = threading.Thread(target=self.worker, daemon=True) - self.condition = threading.Condition() + self.condition = threading.Condition(threading.Lock()) self.schedule_delay_millis = schedule_delay_millis self.max_export_batch_size = max_export_batch_size self.half_max_queue_size = max_queue_size // 2 self.done = False - + # flag that indicates that spans are being dropped + self._spans_dropped = False + # used to avoid sending too much notifications to worker + 
self.notified = False self.worker_thread.start() def on_start(self, span: Span) -> None: pass def on_end(self, span: Span) -> None: + if self.done: + logging.warning("Already shutdown, dropping span.") + return try: - self.queue.put(span, block=False) + self.queue.put_nowait(span) except queue.Full: # TODO: dropped spans counter? - pass + if not self._spans_dropped: + logging.warning("Dropping spans.") + self._spans_dropped = True if self.queue.qsize() >= self.half_max_queue_size: with self.condition: - self.condition.notify_all() + if not self.notified: + self.notified = True + self.condition.notify_all() def worker(self): timeout = self.schedule_delay_millis / 1e3 while not self.done: if self.queue.qsize() < self.max_export_batch_size: with self.condition: + self.notified = False self.condition.wait(timeout) if self.queue.empty(): # spurious notification, let's wait again @@ -159,24 +171,28 @@ def worker(self): # be sure that all spans are sent self._flush() - def export(self): + def export(self) -> bool: """Exports at most max_export_batch_size spans.""" idx = 0 - spans = [] + spans = [None] * self.max_export_batch_size # currently only a single thread acts as consumer, so queue.get() will # never block while idx < self.max_export_batch_size and not self.queue.empty(): - spans.append(self.queue.get()) + spans[idx] = self.queue.get() idx += 1 try: - self.span_exporter.export(spans) + self.span_exporter.export(spans[:idx]) # pylint: disable=broad-except - except Exception as exc: - logger.warning("Exception while exporting data: %s", exc) + except Exception: + logger.warning( + "Exception while exporting data.", exc_info=sys.exc_info() + ) + + return not self.queue.empty() def _flush(self): - while not self.queue.empty(): - self.export() + while self.export(): + pass def shutdown(self) -> None: # signal the worker thread to finish and then wait for it From cf023edfdb1b7d106227a85b826563d48018b05a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mauricio=20V=C3=A1squez?= 
Date: Wed, 25 Sep 2019 16:46:22 +0200 Subject: [PATCH 07/10] use collections.deque --- .../sdk/trace/export/__init__.py | 32 +++++++++---------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py index c1836f6e01..9d340c2d26 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py @@ -12,8 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import collections import logging -import queue import sys import threading import typing @@ -114,12 +114,12 @@ def __init__( ) self.span_exporter = span_exporter - self.queue = queue.Queue(max_queue_size) + self.queue = collections.deque([], max_queue_size) self.worker_thread = threading.Thread(target=self.worker, daemon=True) self.condition = threading.Condition(threading.Lock()) self.schedule_delay_millis = schedule_delay_millis self.max_export_batch_size = max_export_batch_size - self.half_max_queue_size = max_queue_size // 2 + self.max_queue_size = max_queue_size self.done = False # flag that indicates that spans are being dropped self._spans_dropped = False @@ -134,14 +134,14 @@ def on_end(self, span: Span) -> None: if self.done: logging.warning("Already shutdown, dropping span.") return - try: - self.queue.put_nowait(span) - except queue.Full: - # TODO: dropped spans counter? 
+ if len(self.queue) == self.max_queue_size: if not self._spans_dropped: - logging.warning("Dropping spans.") + logging.warning("Queue is full, likely spans will be dropped.") self._spans_dropped = True - if self.queue.qsize() >= self.half_max_queue_size: + + self.queue.appendleft(span) + + if len(self.queue) >= self.max_queue_size // 2: with self.condition: if not self.notified: self.notified = True @@ -150,11 +150,11 @@ def on_end(self, span: Span) -> None: def worker(self): timeout = self.schedule_delay_millis / 1e3 while not self.done: - if self.queue.qsize() < self.max_export_batch_size: + if len(self.queue) < self.max_export_batch_size: with self.condition: self.notified = False self.condition.wait(timeout) - if self.queue.empty(): + if not self.queue: # spurious notification, let's wait again continue if self.done: @@ -175,10 +175,10 @@ def export(self) -> bool: """Exports at most max_export_batch_size spans.""" idx = 0 spans = [None] * self.max_export_batch_size - # currently only a single thread acts as consumer, so queue.get() will - # never block - while idx < self.max_export_batch_size and not self.queue.empty(): - spans[idx] = self.queue.get() + # currently only a single thread acts as consumer, so queue.pop() will + # raise an exception + while idx < self.max_export_batch_size and self.queue: + spans[idx] = self.queue.pop() idx += 1 try: self.span_exporter.export(spans[:idx]) @@ -188,7 +188,7 @@ def export(self) -> bool: "Exception while exporting data.", exc_info=sys.exc_info() ) - return not self.queue.empty() + return self.queue def _flush(self): while self.export(): From 5366af1f64ad6032e5d9933c7b590a2c76c8e138 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mauricio=20V=C3=A1squez?= Date: Fri, 27 Sep 2019 10:59:32 +0200 Subject: [PATCH 08/10] further changes: - export returns nothing - added another test that sends more than max_queue_size elements - removed notified extra variable - move preallocated list to class --- .../sdk/trace/export/__init__.py | 
33 +++++++++---------- .../tests/trace/export/test_export.py | 25 ++++++++++++++ 2 files changed, 40 insertions(+), 18 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py index 9d340c2d26..768294a95a 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py @@ -14,7 +14,6 @@ import collections import logging -import sys import threading import typing from enum import Enum @@ -110,7 +109,7 @@ def __init__( if max_export_batch_size > max_queue_size: raise ValueError( - "max_export_batch_size must be less and equal to max_export_batch_size." + "max_export_batch_size must be less than and equal to max_export_batch_size." ) self.span_exporter = span_exporter @@ -123,8 +122,8 @@ def __init__( self.done = False # flag that indicates that spans are being dropped self._spans_dropped = False - # used to avoid sending too much notifications to worker - self.notified = False + # precallocated list to send spans to exporter + self.spans_list = [None] * self.max_export_batch_size self.worker_thread.start() def on_start(self, span: Span) -> None: @@ -143,16 +142,13 @@ def on_end(self, span: Span) -> None: if len(self.queue) >= self.max_queue_size // 2: with self.condition: - if not self.notified: - self.notified = True - self.condition.notify_all() + self.condition.notify() def worker(self): timeout = self.schedule_delay_millis / 1e3 while not self.done: if len(self.queue) < self.max_export_batch_size: with self.condition: - self.notified = False self.condition.wait(timeout) if not self.queue: # spurious notification, let's wait again @@ -174,25 +170,26 @@ def worker(self): def export(self) -> bool: """Exports at most max_export_batch_size spans.""" idx = 0 - spans = [None] * self.max_export_batch_size + # currently only a single thread acts as consumer, so queue.pop() will - # raise an 
exception + # not raise an exception while idx < self.max_export_batch_size and self.queue: - spans[idx] = self.queue.pop() + self.spans_list[idx] = self.queue.pop() idx += 1 try: - self.span_exporter.export(spans[:idx]) + self.span_exporter.export(self.spans_list[:idx]) # pylint: disable=broad-except except Exception: - logger.warning( - "Exception while exporting data.", exc_info=sys.exc_info() - ) + logger.exception("Exception while exporting data.") - return self.queue + # clean up list + for index in range(idx): + self.spans_list[index] = None def _flush(self): - while self.export(): - pass + # export all elements until queue is empty + while self.queue: + self.export() def shutdown(self) -> None: # signal the worker thread to finish and then wait for it diff --git a/opentelemetry-sdk/tests/trace/export/test_export.py b/opentelemetry-sdk/tests/trace/export/test_export.py index 5f7ad7239d..0ef75e04bd 100644 --- a/opentelemetry-sdk/tests/trace/export/test_export.py +++ b/opentelemetry-sdk/tests/trace/export/test_export.py @@ -97,6 +97,31 @@ def test_batch_span_processor_lossless(self): span_processor.shutdown() self.assertEqual(len(spans_names_list), 512) + def test_batch_span_processor_many_spans(self): + """Test that no spans are lost when sending max_queue_size spans""" + tracer = trace.Tracer() + + spans_names_list = [] + + my_exporter = MySpanExporter( + destination=spans_names_list, max_export_batch_size=128 + ) + span_processor = export.BatchExportSpanProcessor( + my_exporter, max_queue_size=256, max_export_batch_size=64, schedule_delay_millis=100 + ) + tracer.add_span_processor(span_processor) + + for iteration in range(4): + for idx in range(256): + with tracer.start_span("foo{}".format(idx)): + pass + time.sleep(0.05) # give some time for the exporter to upload spans + + # call shutdown on specific span processor + # TODO: this call is missing in the tracer + span_processor.shutdown() + self.assertEqual(len(spans_names_list), 1024) + def 
test_batch_span_processor_scheduled_delay(self): """Test that spans are exported each schedule_delay_millis""" tracer = trace.Tracer() From 0346b197088e66725f8fe3cf8e44701db66b7044 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mauricio=20V=C3=A1squez?= Date: Fri, 27 Sep 2019 11:52:05 +0200 Subject: [PATCH 09/10] improve tests by removing tracer and using mock spans --- .../tests/trace/export/test_export.py | 55 ++++++++----------- 1 file changed, 22 insertions(+), 33 deletions(-) diff --git a/opentelemetry-sdk/tests/trace/export/test_export.py b/opentelemetry-sdk/tests/trace/export/test_export.py index 0ef75e04bd..94f4e92a1e 100644 --- a/opentelemetry-sdk/tests/trace/export/test_export.py +++ b/opentelemetry-sdk/tests/trace/export/test_export.py @@ -15,6 +15,8 @@ import time import unittest +from unittest import mock +from opentelemetry import trace as trace_api from opentelemetry.sdk import trace from opentelemetry.sdk.trace import export @@ -55,29 +57,31 @@ def test_simple_span_processor(self): class TestBatchExportSpanProcessor(unittest.TestCase): - def test_batch_span_processor(self): - tracer = trace.Tracer() + def _create_start_and_end_span(self, name, span_processor): + span = trace.Span( + name, + mock.Mock(spec=trace_api.SpanContext), + span_processor=span_processor + ) + span.start() + span.end() + def test_batch_span_processor(self): spans_names_list = [] my_exporter = MySpanExporter(destination=spans_names_list) span_processor = export.BatchExportSpanProcessor(my_exporter) - tracer.add_span_processor(span_processor) - with tracer.start_span("foo"): - with tracer.start_span("bar"): - with tracer.start_span("xxx"): - pass + span_names = ["xxx", "bar", "foo"] + + for name in span_names: + self._create_start_and_end_span(name, span_processor) - # call shutdown on specific span processor - # TODO: this call is missing in the tracer span_processor.shutdown() - self.assertListEqual(["xxx", "bar", "foo"], spans_names_list) + self.assertListEqual(span_names, 
spans_names_list) def test_batch_span_processor_lossless(self): """Test that no spans are lost when sending max_queue_size spans""" - tracer = trace.Tracer() - spans_names_list = [] my_exporter = MySpanExporter( @@ -86,21 +90,15 @@ def test_batch_span_processor_lossless(self): span_processor = export.BatchExportSpanProcessor( my_exporter, max_queue_size=512, max_export_batch_size=128 ) - tracer.add_span_processor(span_processor) for idx in range(512): - with tracer.start_span("foo{}".format(idx)): - pass + self._create_start_and_end_span("foo", span_processor) - # call shutdown on specific span processor - # TODO: this call is missing in the tracer span_processor.shutdown() self.assertEqual(len(spans_names_list), 512) def test_batch_span_processor_many_spans(self): - """Test that no spans are lost when sending max_queue_size spans""" - tracer = trace.Tracer() - + """Test that no spans are lost when sending many spans""" spans_names_list = [] my_exporter = MySpanExporter( @@ -109,41 +107,32 @@ def test_batch_span_processor_many_spans(self): span_processor = export.BatchExportSpanProcessor( my_exporter, max_queue_size=256, max_export_batch_size=64, schedule_delay_millis=100 ) - tracer.add_span_processor(span_processor) for iteration in range(4): for idx in range(256): - with tracer.start_span("foo{}".format(idx)): - pass + self._create_start_and_end_span("foo", span_processor) + time.sleep(0.05) # give some time for the exporter to upload spans - # call shutdown on specific span processor - # TODO: this call is missing in the tracer span_processor.shutdown() self.assertEqual(len(spans_names_list), 1024) def test_batch_span_processor_scheduled_delay(self): """Test that spans are exported each schedule_delay_millis""" - tracer = trace.Tracer() - spans_names_list = [] my_exporter = MySpanExporter(destination=spans_names_list) span_processor = export.BatchExportSpanProcessor( my_exporter, schedule_delay_millis=50 ) - tracer.add_span_processor(span_processor) - # start 
single span - with tracer.start_span("foo1"): - pass + # create single span + self._create_start_and_end_span("foo", span_processor) time.sleep(0.05 + 0.02) # span should be already exported self.assertEqual(len(spans_names_list), 1) - # call shutdown on specific span processor - # TODO: this call is missing in the tracer span_processor.shutdown() def test_batch_span_processor_parameters(self): From 66c5191583fa93c803427bfb29e79279ed6d1e85 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mauricio=20V=C3=A1squez?= Date: Fri, 27 Sep 2019 14:07:55 +0200 Subject: [PATCH 10/10] make lint happy --- .../sdk/trace/export/__init__.py | 2 +- .../tests/trace/export/test_export.py | 40 ++++++++++--------- 2 files changed, 23 insertions(+), 19 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py index 768294a95a..66122985c0 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py @@ -142,7 +142,7 @@ def on_end(self, span: Span) -> None: if len(self.queue) >= self.max_queue_size // 2: with self.condition: - self.condition.notify() + self.condition.notify() def worker(self): timeout = self.schedule_delay_millis / 1e3 diff --git a/opentelemetry-sdk/tests/trace/export/test_export.py b/opentelemetry-sdk/tests/trace/export/test_export.py index 94f4e92a1e..de7a5cd9d7 100644 --- a/opentelemetry-sdk/tests/trace/export/test_export.py +++ b/opentelemetry-sdk/tests/trace/export/test_export.py @@ -14,8 +14,8 @@ import time import unittest - from unittest import mock + from opentelemetry import trace as trace_api from opentelemetry.sdk import trace from opentelemetry.sdk.trace import export @@ -56,16 +56,17 @@ def test_simple_span_processor(self): self.assertListEqual(["xxx", "bar", "foo"], spans_names_list) -class TestBatchExportSpanProcessor(unittest.TestCase): - def _create_start_and_end_span(self, 
name, span_processor): - span = trace.Span( - name, - mock.Mock(spec=trace_api.SpanContext), - span_processor=span_processor - ) - span.start() - span.end() +def _create_start_and_end_span(name, span_processor): + span = trace.Span( + name, + mock.Mock(spec=trace_api.SpanContext), + span_processor=span_processor, + ) + span.start() + span.end() + +class TestBatchExportSpanProcessor(unittest.TestCase): def test_batch_span_processor(self): spans_names_list = [] @@ -75,7 +76,7 @@ def test_batch_span_processor(self): span_names = ["xxx", "bar", "foo"] for name in span_names: - self._create_start_and_end_span(name, span_processor) + _create_start_and_end_span(name, span_processor) span_processor.shutdown() self.assertListEqual(span_names, spans_names_list) @@ -91,8 +92,8 @@ def test_batch_span_processor_lossless(self): my_exporter, max_queue_size=512, max_export_batch_size=128 ) - for idx in range(512): - self._create_start_and_end_span("foo", span_processor) + for _ in range(512): + _create_start_and_end_span("foo", span_processor) span_processor.shutdown() self.assertEqual(len(spans_names_list), 512) @@ -105,12 +106,15 @@ def test_batch_span_processor_many_spans(self): destination=spans_names_list, max_export_batch_size=128 ) span_processor = export.BatchExportSpanProcessor( - my_exporter, max_queue_size=256, max_export_batch_size=64, schedule_delay_millis=100 + my_exporter, + max_queue_size=256, + max_export_batch_size=64, + schedule_delay_millis=100, ) - for iteration in range(4): - for idx in range(256): - self._create_start_and_end_span("foo", span_processor) + for _ in range(4): + for _ in range(256): + _create_start_and_end_span("foo", span_processor) time.sleep(0.05) # give some time for the exporter to upload spans @@ -127,7 +131,7 @@ def test_batch_span_processor_scheduled_delay(self): ) # create single span - self._create_start_and_end_span("foo", span_processor) + _create_start_and_end_span("foo", span_processor) time.sleep(0.05 + 0.02) # span should be 
already exported