
sdk/trace/exporters: add batch span processor exporter #153

Merged
125 changes: 125 additions & 0 deletions opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py
@@ -12,10 +12,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import collections
import logging
import sys
import threading
import typing
from enum import Enum

from opentelemetry.sdk import util

from .. import Span, SpanProcessor

logger = logging.getLogger(__name__)
@@ -78,6 +83,126 @@ def shutdown(self) -> None:
self.span_exporter.shutdown()


class BatchExportSpanProcessor(SpanProcessor):
"""Batch span processor implementation.

BatchExportSpanProcessor is an implementation of `SpanProcessor` that
batches ended spans and pushes them to the configured `SpanExporter`.
"""

def __init__(
self,
span_exporter: SpanExporter,
max_queue_size: int = 2048,
schedule_delay_millis: float = 5000,
max_export_batch_size: int = 512,
):
if max_queue_size <= 0:
raise ValueError("max_queue_size must be a positive integer.")

if schedule_delay_millis <= 0:
raise ValueError("schedule_delay_millis must be positive.")

if max_export_batch_size <= 0:
raise ValueError(
"max_export_batch_size must be a positive integer."
)

if max_export_batch_size > max_queue_size:
raise ValueError(
"max_export_batch_size must be less and equal to max_export_batch_size."
Member:
Suggested change:
-        "max_export_batch_size must be less and equal to max_export_batch_size."
+        "max_export_batch_size must be less than or equal to max_export_batch_size."

Also, FWIW, the type annotations don't do anything at runtime; if you want to enforce int/float types here we need a type check too.

Member Author:
That check is not strictly needed. I just want a number; if the user passes something else it will fail at some point.

)
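A tiny sketch of what the runtime type check mentioned in the thread above could look like (the author intentionally leaves it out of this PR; shown only as an illustration):

    if not isinstance(max_queue_size, int):
        raise TypeError("max_queue_size must be an integer.")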

self.span_exporter = span_exporter
self.queue = collections.deque([], max_queue_size)
self.worker_thread = threading.Thread(target=self.worker, daemon=True)
self.condition = threading.Condition(threading.Lock())
self.schedule_delay_millis = schedule_delay_millis
self.max_export_batch_size = max_export_batch_size
self.max_queue_size = max_queue_size
self.done = False
# flag that indicates that spans are being dropped
self._spans_dropped = False
# used to avoid sending too many notifications to the worker
self.notified = False
self.worker_thread.start()

def on_start(self, span: Span) -> None:
pass

def on_end(self, span: Span) -> None:
if self.done:
logging.warning("Already shutdown, dropping span.")
return
if len(self.queue) == self.max_queue_size:
if not self._spans_dropped:
logging.warning("Queue is full, likely spans will be dropped.")
Member:
This is a consequence of adding items outside the condition, because the worker might wake up and consume some spans between here and the appendleft below?

I bet we'll want exact stats on dropped spans in the future; we'll probably need to change this behavior.

Member:
I think if we want exact stats we can either:

  • Go back to using queue.Queue
  • Use a lock and a plain list
  • Estimate the number with n_ended_spans - (n_exported_or_exporting_spans + len(self.queue)); this becomes exact after shutdown, or whenever no spans are currently being added or processed (sketched after this thread)
  • Maybe use the queue implementation that @reyang suggested; I haven't looked at it, though, so I don't know whether it would solve the problem.

Member Author:
@c24t yes, you're right. There is no way to actually know whether the queue will be full when appendleft is called. I agree with both of you: if we need to support that use case we'll need another approach.
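A minimal sketch of the estimation option above (illustrative only; the counter names are assumptions and this is not code from the PR):

    class DroppedSpanEstimate:
        """Estimates dropped spans from ended/exported counters."""

        def __init__(self, queue):
            self.queue = queue
            self.ended_spans = 0     # incremented every time a span ends
            self.exported_spans = 0  # incremented after each export call

        def dropped(self):
            # exact after shutdown, approximate while spans are in flight
            return self.ended_spans - (self.exported_spans + len(self.queue))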

self._spans_dropped = True

self.queue.appendleft(span)

if len(self.queue) >= self.max_queue_size // 2:
with self.condition:
if not self.notified:
self.notified = True
Member:
If I understand this correctly, it's to prevent calling notify_all when the worker isn't actually waiting, which could happen if we're adding spans to the queue fast enough that we never drain it more than halfway.

My intuition is that notify_all is effectively a no-op if nothing is waiting on the cv, so I'm surprised you think it's worth the check to prevent the extra calls. In any case this is fine, just curious whether I'm missing something here.

Member:
Yeah, my original suggestion that triggered this was to use == instead of >=, but @mauriciovasquezbernal rightly rejected that because with multiple threads adding spans the condition might never be hit. But you are right that notify_all is semantically a no-op if no one is waiting, so I'd say the self.notified variable is overkill.

Member Author:
I got confused; the notified variable I introduced is a horrible and wrong hack. notify() is a no-op when nobody is waiting:

"This method wakes up at most n of the threads waiting for the condition variable; it is a no-op if no threads are waiting."

self.condition.notify_all()
Member:
Also, out of curiosity, why this instead of notify() when there's only one worker?

Member Author:
Changed to notify().
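A small, self-contained sketch of the producer/worker handshake discussed above (illustrative only, not code from this PR), showing that notify() simply does nothing when the worker is not waiting:

    import threading

    condition = threading.Condition(threading.Lock())
    items = []

    def producer(item):
        items.append(item)
        with condition:
            condition.notify()  # wakes the worker if it is waiting; otherwise a no-op

    def worker():
        while True:
            with condition:
                condition.wait(timeout=5.0)  # also wakes up on its own after the timeout
            while items:
                print("processing", items.pop())

    threading.Thread(target=worker, daemon=True).start()
    producer("span")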


def worker(self):
timeout = self.schedule_delay_millis / 1e3
while not self.done:
if len(self.queue) < self.max_export_batch_size:
with self.condition:
self.notified = False
self.condition.wait(timeout)
if not self.queue:
# spurious notification, let's wait again
continue
if self.done:
# missing spans will be sent when calling flush
break

# subtract the duration of this export call from the next timeout
start = util.time_ns()
self.export()
end = util.time_ns()
duration = (end - start) / 1e9
timeout = self.schedule_delay_millis / 1e3 - duration

# be sure that all spans are sent
self._flush()

def export(self) -> bool:
"""Exports at most max_export_batch_size spans."""
idx = 0
spans = [None] * self.max_export_batch_size
Member (@Oberon00, Sep 26, 2019):
Continuing from #153 (comment):

(Just an optimization, maybe for another PR:) I suggest we either use some more clever preallocation size than the max, or we create the list once at thread startup in the worker function and reuse it every time in export.

Member:
@Oberon00 one clever preallocation strategy is letting Python handle this under the hood and just appending to an empty list. :P

Member:
I think if we reuse the same list and empty it every time, that would actually be good. 😉

The thing with the batch exporter is that it is all about performance (otherwise we could use the simple exporter). I understand that moving the bulk of the work to another thread is already the most important part, though.

Member Author:
I think @Oberon00's suggestion could be a good approach, so I implemented it. However, if we really want to optimize this we should do some benchmarking first.
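A standalone sketch of the reuse-one-buffer idea from this thread (illustrative only, not the merged code; a plain deque of integers stands in for the span queue):

    import collections

    MAX_BATCH = 512
    queue = collections.deque(range(2048), maxlen=2048)  # pretend these are spans
    batch = [None] * MAX_BATCH  # allocated once, reused for every export call

    def export_batch(export_fn):
        idx = 0
        while idx < MAX_BATCH and queue:
            batch[idx] = queue.pop()
            idx += 1
        export_fn(batch[:idx])
        for i in range(idx):
            batch[i] = None  # drop references so exported items can be collected

    export_batch(print)  # "export" one batch by printing it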

# currently only a single thread acts as consumer, so queue.pop() will
Member:
will raise or will not raise?

Member Author:
The "not" got lost while wrapping the line.

# not raise an exception
while idx < self.max_export_batch_size and self.queue:
spans[idx] = self.queue.pop()
idx += 1
try:
self.span_exporter.export(spans[:idx])
# pylint: disable=broad-except
except Exception:
logger.warning(
Member:
You don't think logger.exception is justified here?

"Exception while exporting data.", exc_info=sys.exc_info()
)

return self.queue
Member (@c24t, Sep 26, 2019):
Why return the queue?

I see @Oberon00's comment now; maybe do this instead:

Suggested change:
-        return self.queue
+        return bool(self.queue)

But I think this is a surprising return type for export.

Member Author:
I'll just go back to my original approach: export() returns nothing and _flush() checks the size of the queue. It's clearer after all.


def _flush(self):
while self.export():
Member:
Maybe a problem for another PR, but we probably want a timeout on flush too.

Member Author (@mauriciovasquezbernal, Sep 27, 2019):
Yes, let's keep an eye on it.
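A hypothetical sketch of what a deadline on flush could look like (not part of this PR; the timeout parameter and its default are assumptions, and util.time_ns is reused from this module):

    def _flush(self, timeout_millis: float = 30000):
        deadline = util.time_ns() + timeout_millis * 1e6
        while self.queue and util.time_ns() < deadline:
            self.export()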

pass

def shutdown(self) -> None:
# signal the worker thread to finish and then wait for it
self.done = True
with self.condition:
self.condition.notify_all()
self.worker_thread.join()
self.span_exporter.shutdown()


class ConsoleSpanExporter(SpanExporter):
"""Implementation of :class:`SpanExporter` that prints spans to the
console.
147 changes: 140 additions & 7 deletions opentelemetry-sdk/tests/trace/export/test_export.py
@@ -12,33 +12,166 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import time
import unittest

from opentelemetry.sdk import trace
from opentelemetry.sdk.trace import export


class MySpanExporter(export.SpanExporter):
"""Very simple span exporter used for testing."""

def __init__(self, destination, max_export_batch_size=None):
self.destination = destination
self.max_export_batch_size = max_export_batch_size

def export(self, spans: trace.Span) -> export.SpanExportResult:
if (
self.max_export_batch_size is not None
and len(spans) > self.max_export_batch_size
):
raise ValueError("Batch is too big")
self.destination.extend(span.name for span in spans)
return export.SpanExportResult.SUCCESS


class TestSimpleExportSpanProcessor(unittest.TestCase):
def test_simple_span_processor(self):
class MySpanExporter(export.SpanExporter):
def __init__(self, destination):
self.destination = destination
tracer = trace.Tracer()
Member:
I'd vote to leave the tracer out of these tests and call on_end directly with some mock spans instead.

Member Author:
I like this idea; it makes them look more like unit tests than integration tests. I updated it.
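A minimal sketch of that mock-span approach, written as a method of TestSimpleExportSpanProcessor (illustrative only; it reuses MySpanExporter from this file and builds the span with unittest.mock):

    from unittest import mock

    def test_on_end_directly(self):
        spans_names_list = []
        my_exporter = MySpanExporter(destination=spans_names_list)
        span_processor = export.SimpleExportSpanProcessor(my_exporter)

        span = mock.Mock()
        span.name = "foo"
        span_processor.on_end(span)

        self.assertListEqual(["foo"], spans_names_list)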


spans_names_list = []

my_exporter = MySpanExporter(destination=spans_names_list)
span_processor = export.SimpleExportSpanProcessor(my_exporter)
tracer.add_span_processor(span_processor)

def export(self, spans: trace.Span) -> export.SpanExportResult:
self.destination.extend(span.name for span in spans)
return export.SpanExportResult.SUCCESS
with tracer.start_span("foo"):
with tracer.start_span("bar"):
with tracer.start_span("xxx"):
pass

self.assertListEqual(["xxx", "bar", "foo"], spans_names_list)


class TestBatchExportSpanProcessor(unittest.TestCase):
def test_batch_span_processor(self):
tracer = trace.Tracer()

spans_names_list = []

my_exporter = MySpanExporter(destination=spans_names_list)
span_processor = export.SimpleExportSpanProcessor(my_exporter)
span_processor = export.BatchExportSpanProcessor(my_exporter)
tracer.add_span_processor(span_processor)

with tracer.start_span("foo"):
with tracer.start_span("bar"):
with tracer.start_span("xxx"):
pass

# call shutdown on specific span processor
# TODO: this call is missing in the tracer
span_processor.shutdown()
self.assertListEqual(["xxx", "bar", "foo"], spans_names_list)

def test_batch_span_processor_lossless(self):
"""Test that no spans are lost when sending max_queue_size spans"""
tracer = trace.Tracer()

spans_names_list = []

my_exporter = MySpanExporter(
destination=spans_names_list, max_export_batch_size=128
)
span_processor = export.BatchExportSpanProcessor(
my_exporter, max_queue_size=512, max_export_batch_size=128
)
tracer.add_span_processor(span_processor)

for idx in range(512):
with tracer.start_span("foo{}".format(idx)):
pass

# call shutdown on specific span processor
# TODO: this call is missing in the tracer
span_processor.shutdown()
Member:
Since this test and the one above just seem to check _flush, it's probably worth adding a separate check that we only export max_export_batch_size many spans at a time during normal operation.

Member Author:
I'll add it.
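One possible shape for that check (a sketch only; the recording exporter, the mock spans, and the test name are illustrative and assume unittest.mock is imported):

    def test_batch_span_processor_batch_size(self):
        batch_sizes = []

        class RecordingExporter(export.SpanExporter):
            def export(self, spans):
                batch_sizes.append(len(spans))
                return export.SpanExportResult.SUCCESS

        span_processor = export.BatchExportSpanProcessor(
            RecordingExporter(), max_queue_size=512, max_export_batch_size=128
        )
        for _ in range(512):
            span_processor.on_end(mock.Mock())
        span_processor.shutdown()

        self.assertTrue(all(size <= 128 for size in batch_sizes))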

self.assertEqual(len(spans_names_list), 512)

def test_batch_span_processor_scheduled_delay(self):
"""Test that spans are exported each schedule_delay_millis"""
tracer = trace.Tracer()

spans_names_list = []

my_exporter = MySpanExporter(destination=spans_names_list)
span_processor = export.BatchExportSpanProcessor(
my_exporter, schedule_delay_millis=50
)
tracer.add_span_processor(span_processor)

# start single span
with tracer.start_span("foo1"):
pass

time.sleep(0.05 + 0.02)
# span should be already exported
self.assertEqual(len(spans_names_list), 1)

# call shutdown on specific span processor
# TODO: this call is missing in the tracer
span_processor.shutdown()

def test_batch_span_processor_parameters(self):
# zero max_queue_size
self.assertRaises(
ValueError, export.BatchExportSpanProcessor, None, max_queue_size=0
)

# negative max_queue_size
self.assertRaises(
ValueError,
export.BatchExportSpanProcessor,
None,
max_queue_size=-500,
)

# zero schedule_delay_millis
self.assertRaises(
ValueError,
export.BatchExportSpanProcessor,
None,
schedule_delay_millis=0,
)

# negative schedule_delay_millis
self.assertRaises(
ValueError,
export.BatchExportSpanProcessor,
None,
schedule_delay_millis=-500,
)

# zero max_export_batch_size
self.assertRaises(
ValueError,
export.BatchExportSpanProcessor,
None,
max_export_batch_size=0,
)

# negative max_export_batch_size
self.assertRaises(
ValueError,
export.BatchExportSpanProcessor,
None,
max_export_batch_size=-500,
)

# max_export_batch_size > max_queue_size:
self.assertRaises(
ValueError,
export.BatchExportSpanProcessor,
None,
max_queue_size=256,
max_export_batch_size=512,
)