sdk/trace/exporters: add batch span processor exporter (#153)
The exporters specification states that two built-in span processors should be
implemented: the simple span processor and the batch span processor.

This commit implements the latter; it is mainly based on the opentelemetry/java
implementation.

The algorithm implements the following logic (a brief usage sketch follows the
list):
- a condition variable is used to notify the worker thread when the queue is
half full, so that exporting can start before the queue fills up and spans get
dropped.
- export is called every schedule_delay_millis if there is at least one new
span to export.
- when the processor is shut down, all remaining spans are exported.
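
A minimal usage sketch (not part of this commit) that mirrors the test helper
added below; it assumes ConsoleSpanExporter can be constructed without
arguments and drives the span lifecycle directly instead of through a Tracer:

from unittest import mock

from opentelemetry import trace as trace_api
from opentelemetry.sdk import trace
from opentelemetry.sdk.trace import export

# Ended spans are queued and exported in batches by a background worker thread.
exporter = export.ConsoleSpanExporter()
span_processor = export.BatchExportSpanProcessor(
    exporter, max_queue_size=2048, schedule_delay_millis=5000
)

# Mirrors _create_start_and_end_span in the tests: ending the span triggers
# on_end(), which enqueues it for export.
span = trace.Span(
    "foo", mock.Mock(spec=trace_api.SpanContext), span_processor=span_processor
)
span.start()
span.end()

# shutdown() wakes the worker, exports any remaining spans, and shuts the
# exporter down.
span_processor.shutdown()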
mauriciovasquezbernal authored and Oberon00 committed Sep 27, 2019
1 parent 84f589b commit d0946cd
Showing 2 changed files with 281 additions and 8 deletions.
122 changes: 122 additions & 0 deletions opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py
@@ -12,10 +12,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import collections
import logging
import threading
import typing
from enum import Enum

from opentelemetry.sdk import util

from .. import Span, SpanProcessor

logger = logging.getLogger(__name__)
@@ -78,6 +82,124 @@ def shutdown(self) -> None:
        self.span_exporter.shutdown()


class BatchExportSpanProcessor(SpanProcessor):
    """Batch span processor implementation.

    BatchExportSpanProcessor is an implementation of `SpanProcessor` that
    batches ended spans and pushes them to the configured `SpanExporter`.
    """

    def __init__(
        self,
        span_exporter: SpanExporter,
        max_queue_size: int = 2048,
        schedule_delay_millis: float = 5000,
        max_export_batch_size: int = 512,
    ):
        if max_queue_size <= 0:
            raise ValueError("max_queue_size must be a positive integer.")

        if schedule_delay_millis <= 0:
            raise ValueError("schedule_delay_millis must be positive.")

        if max_export_batch_size <= 0:
            raise ValueError(
                "max_export_batch_size must be a positive integer."
            )

        if max_export_batch_size > max_queue_size:
            raise ValueError(
                "max_export_batch_size must be less than or equal to max_queue_size."
            )

        self.span_exporter = span_exporter
        self.queue = collections.deque([], max_queue_size)
        self.worker_thread = threading.Thread(target=self.worker, daemon=True)
        self.condition = threading.Condition(threading.Lock())
        self.schedule_delay_millis = schedule_delay_millis
        self.max_export_batch_size = max_export_batch_size
        self.max_queue_size = max_queue_size
        self.done = False
        # flag that indicates that spans are being dropped
        self._spans_dropped = False
        # preallocated list used to send spans to the exporter
        self.spans_list = [None] * self.max_export_batch_size
        self.worker_thread.start()

    def on_start(self, span: Span) -> None:
        pass

    def on_end(self, span: Span) -> None:
        if self.done:
            logger.warning("Already shutdown, dropping span.")
            return
        if len(self.queue) == self.max_queue_size:
            if not self._spans_dropped:
                logger.warning("Queue is full, likely spans will be dropped.")
                self._spans_dropped = True

        self.queue.appendleft(span)

        if len(self.queue) >= self.max_queue_size // 2:
            with self.condition:
                self.condition.notify()

    def worker(self):
        timeout = self.schedule_delay_millis / 1e3
        while not self.done:
            if len(self.queue) < self.max_export_batch_size:
                with self.condition:
                    self.condition.wait(timeout)
                    if not self.queue:
                        # spurious notification, let's wait again
                        continue
                    if self.done:
                        # missing spans will be sent when calling flush
                        break

            # subtract the duration of this export call from the next timeout
            start = util.time_ns()
            self.export()
            end = util.time_ns()
            duration = (end - start) / 1e9
            timeout = self.schedule_delay_millis / 1e3 - duration
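            # For example (illustrative numbers, not taken from this commit):
            # with schedule_delay_millis=5000 and an export() call that took
            # 1.2 s, the next wait is 5.0 - 1.2 = 3.8 s, so batches keep
            # starting roughly every schedule_delay_millis.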

        # be sure that all spans are sent
        self._flush()

    def export(self) -> bool:
        """Exports at most max_export_batch_size spans."""
        idx = 0

        # currently only a single thread acts as consumer, so queue.pop() will
        # not raise an exception
        while idx < self.max_export_batch_size and self.queue:
            self.spans_list[idx] = self.queue.pop()
            idx += 1
        try:
            self.span_exporter.export(self.spans_list[:idx])
        # pylint: disable=broad-except
        except Exception:
            logger.exception("Exception while exporting data.")

        # clean up list
        for index in range(idx):
            self.spans_list[index] = None

    def _flush(self):
        # export all elements until queue is empty
        while self.queue:
            self.export()

    def shutdown(self) -> None:
        # signal the worker thread to finish and then wait for it
        self.done = True
        with self.condition:
            self.condition.notify_all()
        self.worker_thread.join()
        self.span_exporter.shutdown()


class ConsoleSpanExporter(SpanExporter):
    """Implementation of :class:`SpanExporter` that prints spans to the
    console.
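
A side note on the on_end logic above: the bounded deque does the actual
dropping, which is why the processor only logs a warning when the queue is
full. appendleft on a deque created with a maxlen silently discards the element
at the opposite (oldest) end, as this small standard-library illustration
shows:

import collections

q = collections.deque([], 3)  # bounded like self.queue, here with maxlen=3
for i in range(5):
    q.appendleft(i)
print(q)  # deque([4, 3, 2], maxlen=3) -- the two oldest items were dropped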
167 changes: 159 additions & 8 deletions opentelemetry-sdk/tests/trace/export/test_export.py
@@ -12,22 +12,34 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import time
import unittest
from unittest import mock

from opentelemetry import trace as trace_api
from opentelemetry.sdk import trace
from opentelemetry.sdk.trace import export


class TestSimpleExportSpanProcessor(unittest.TestCase):
    def test_simple_span_processor(self):
        class MySpanExporter(export.SpanExporter):
            def __init__(self, destination):
                self.destination = destination
class MySpanExporter(export.SpanExporter):
    """Very simple span exporter used for testing."""

    def __init__(self, destination, max_export_batch_size=None):
        self.destination = destination
        self.max_export_batch_size = max_export_batch_size

            def export(self, spans: trace.Span) -> export.SpanExportResult:
                self.destination.extend(span.name for span in spans)
                return export.SpanExportResult.SUCCESS
    def export(self, spans: trace.Span) -> export.SpanExportResult:
        if (
            self.max_export_batch_size is not None
            and len(spans) > self.max_export_batch_size
        ):
            raise ValueError("Batch is too big")
        self.destination.extend(span.name for span in spans)
        return export.SpanExportResult.SUCCESS


class TestSimpleExportSpanProcessor(unittest.TestCase):
    def test_simple_span_processor(self):
        tracer = trace.Tracer()

        spans_names_list = []
@@ -42,3 +54,142 @@ def export(self, spans: trace.Span) -> export.SpanExportResult:
                    pass

        self.assertListEqual(["xxx", "bar", "foo"], spans_names_list)


def _create_start_and_end_span(name, span_processor):
    span = trace.Span(
        name,
        mock.Mock(spec=trace_api.SpanContext),
        span_processor=span_processor,
    )
    span.start()
    span.end()


class TestBatchExportSpanProcessor(unittest.TestCase):
    def test_batch_span_processor(self):
        spans_names_list = []

        my_exporter = MySpanExporter(destination=spans_names_list)
        span_processor = export.BatchExportSpanProcessor(my_exporter)

        span_names = ["xxx", "bar", "foo"]

        for name in span_names:
            _create_start_and_end_span(name, span_processor)

        span_processor.shutdown()
        self.assertListEqual(span_names, spans_names_list)

    def test_batch_span_processor_lossless(self):
        """Test that no spans are lost when sending max_queue_size spans"""
        spans_names_list = []

        my_exporter = MySpanExporter(
            destination=spans_names_list, max_export_batch_size=128
        )
        span_processor = export.BatchExportSpanProcessor(
            my_exporter, max_queue_size=512, max_export_batch_size=128
        )

        for _ in range(512):
            _create_start_and_end_span("foo", span_processor)

        span_processor.shutdown()
        self.assertEqual(len(spans_names_list), 512)

    def test_batch_span_processor_many_spans(self):
        """Test that no spans are lost when sending many spans"""
        spans_names_list = []

        my_exporter = MySpanExporter(
            destination=spans_names_list, max_export_batch_size=128
        )
        span_processor = export.BatchExportSpanProcessor(
            my_exporter,
            max_queue_size=256,
            max_export_batch_size=64,
            schedule_delay_millis=100,
        )

        for _ in range(4):
            for _ in range(256):
                _create_start_and_end_span("foo", span_processor)

            time.sleep(0.05)  # give some time for the exporter to upload spans

        span_processor.shutdown()
        self.assertEqual(len(spans_names_list), 1024)

    def test_batch_span_processor_scheduled_delay(self):
        """Test that spans are exported each schedule_delay_millis"""
        spans_names_list = []

        my_exporter = MySpanExporter(destination=spans_names_list)
        span_processor = export.BatchExportSpanProcessor(
            my_exporter, schedule_delay_millis=50
        )

        # create single span
        _create_start_and_end_span("foo", span_processor)

        time.sleep(0.05 + 0.02)
        # span should be already exported
        self.assertEqual(len(spans_names_list), 1)

        span_processor.shutdown()

    def test_batch_span_processor_parameters(self):
        # zero max_queue_size
        self.assertRaises(
            ValueError, export.BatchExportSpanProcessor, None, max_queue_size=0
        )

        # negative max_queue_size
        self.assertRaises(
            ValueError,
            export.BatchExportSpanProcessor,
            None,
            max_queue_size=-500,
        )

        # zero schedule_delay_millis
        self.assertRaises(
            ValueError,
            export.BatchExportSpanProcessor,
            None,
            schedule_delay_millis=0,
        )

        # negative schedule_delay_millis
        self.assertRaises(
            ValueError,
            export.BatchExportSpanProcessor,
            None,
            schedule_delay_millis=-500,
        )

        # zero max_export_batch_size
        self.assertRaises(
            ValueError,
            export.BatchExportSpanProcessor,
            None,
            max_export_batch_size=0,
        )

        # negative max_export_batch_size
        self.assertRaises(
            ValueError,
            export.BatchExportSpanProcessor,
            None,
            max_export_batch_size=-500,
        )

        # max_export_batch_size > max_queue_size:
        self.assertRaises(
            ValueError,
            export.BatchExportSpanProcessor,
            None,
            max_queue_size=256,
            max_export_batch_size=512,
        )
