Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
259 changes: 75 additions & 184 deletions opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import collections
import logging
import os
import sys
Expand All @@ -37,6 +36,7 @@
OTEL_BSP_SCHEDULE_DELAY,
)
from opentelemetry.sdk.trace import ReadableSpan, Span, SpanProcessor
from opentelemetry.sdk.util import BatchAccumulator
from opentelemetry.util._once import Once

_DEFAULT_SCHEDULE_DELAY_MILLIS = 5000
Expand Down Expand Up @@ -124,16 +124,6 @@ def force_flush(self, timeout_millis: int = 30000) -> bool:
return True


class _FlushRequest:
"""Represents a request for the BatchSpanProcessor to flush spans."""

__slots__ = ["event", "num_spans"]

def __init__(self):
self.event = threading.Event()
self.num_spans = 0


_BSP_RESET_ONCE = Once()


Expand Down Expand Up @@ -182,62 +172,65 @@ def __init__(
max_queue_size, schedule_delay_millis, max_export_batch_size
)

self.accumulator = BatchAccumulator(max_export_batch_size)
self.flush_lock = threading.Lock()

self.span_exporter = span_exporter
self.queue = collections.deque(
[], max_queue_size
) # type: typing.Deque[Span]
self.worker_thread = threading.Thread(
name="OtelBatchSpanProcessor", target=self.worker, daemon=True
)
self.condition = threading.Condition(threading.Lock())
self._flush_request = None # type: typing.Optional[_FlushRequest]
self.schedule_delay_millis = schedule_delay_millis
self.max_export_batch_size = max_export_batch_size
self.max_queue_size = max_queue_size
self.export_timeout_millis = export_timeout_millis
self.done = False

self.worker_stopped: AtomicBool = AtomicBool(False)
self.processor_shutdown: AtomicBool = AtomicBool(False)

# flag that indicates that spans are being dropped
self._spans_dropped = False
# precallocated list to send spans to exporter
self.spans_list = [
None
] * self.max_export_batch_size # type: typing.List[typing.Optional[Span]]
self.worker_thread.start()

self._start_worker()

# Only available in *nix since py37.
if hasattr(os, "register_at_fork"):
os.register_at_fork(
after_in_child=self._at_fork_reinit
) # pylint: disable=protected-access
self._pid = os.getpid()

def _start_worker(self):
self.worker_stopped.set(False)
self.worker_thread = threading.Thread(
name="OtelBatchSpanProcessor", target=self.worker, daemon=True
)
self.worker_thread.start()

def _stop_worker(self):
self.worker_stopped.set(True)
with self.condition:
self.condition.notify_all()
self.worker_thread.join()

def on_start(
self, span: Span, parent_context: typing.Optional[Context] = None
) -> None:
pass

def on_end(self, span: ReadableSpan) -> None:
if self.done:
if self.processor_shutdown.get():
logger.warning("Already shutdown, dropping span.")
return
if not span.context.trace_flags.sampled:
return
if self._pid != os.getpid():
_BSP_RESET_ONCE.do_once(self._at_fork_reinit)

if len(self.queue) == self.max_queue_size:
if not self._spans_dropped:
logger.warning("Queue is full, likely spans will be dropped.")
self._spans_dropped = True

self.queue.appendleft(span)

if len(self.queue) >= self.max_export_batch_size:
full = self.accumulator.push(span)
if full:
with self.condition:
self.condition.notify()

def _at_fork_reinit(self):
self.condition = threading.Condition(threading.Lock())
self.queue.clear()

# worker_thread is local to a process, only the thread that issued fork continues
# to exist. A new worker thread must be started in child process.
Expand All @@ -249,169 +242,53 @@ def _at_fork_reinit(self):

def worker(self):
timeout = self.schedule_delay_millis / 1e3
flush_request = None # type: typing.Optional[_FlushRequest]
while not self.done:
while not self.worker_stopped.get():
with self.condition:
if self.done:
# done flag may have changed, avoid waiting
if self.worker_stopped.get():
break
flush_request = self._get_and_unset_flush_request()
if (
len(self.queue) < self.max_export_batch_size
and flush_request is None
):

self.condition.wait(timeout)
flush_request = self._get_and_unset_flush_request()
if not self.queue:
# spurious notification, let's wait again, reset timeout
timeout = self.schedule_delay_millis / 1e3
self._notify_flush_request_finished(flush_request)
flush_request = None
continue
if self.done:
# missing spans will be sent when calling flush
break

# subtract the duration of this export call to the next timeout
start = time_ns()
self._export(flush_request)
end = time_ns()
duration = (end - start) / 1e9
timeout = self.schedule_delay_millis / 1e3 - duration

self._notify_flush_request_finished(flush_request)
flush_request = None

# there might have been a new flush request while export was running
# and before the done flag switched to true
with self.condition:
shutdown_flush_request = self._get_and_unset_flush_request()

# be sure that all spans are sent
self._drain_queue()
self._notify_flush_request_finished(flush_request)
self._notify_flush_request_finished(shutdown_flush_request)

def _get_and_unset_flush_request(
self,
) -> typing.Optional[_FlushRequest]:
"""Returns the current flush request and makes it invisible to the
worker thread for subsequent calls.
"""
flush_request = self._flush_request
self._flush_request = None
if flush_request is not None:
flush_request.num_spans = len(self.queue)
return flush_request

@staticmethod
def _notify_flush_request_finished(
flush_request: typing.Optional[_FlushRequest],
):
"""Notifies the flush initiator(s) waiting on the given request/event
that the flush operation was finished.
"""
if flush_request is not None:
flush_request.event.set()

def _get_or_create_flush_request(self) -> _FlushRequest:
"""Either returns the current active flush event or creates a new one.

The flush event will be visible and read by the worker thread before an
export operation starts. Callers of a flush operation may wait on the
returned event to be notified when the flush/export operation was
finished.

This method is not thread-safe, i.e. callers need to take care about
synchronization/locking.
"""
if self._flush_request is None:
self._flush_request = _FlushRequest()
return self._flush_request

def _export(self, flush_request: typing.Optional[_FlushRequest]):
"""Exports spans considering the given flush_request.

In case of a given flush_requests spans are exported in batches until
the number of exported spans reached or exceeded the number of spans in
the flush request.
In no flush_request was given at most max_export_batch_size spans are
exported.
"""
if not flush_request:
self._export_batch()
return

num_spans = flush_request.num_spans
while self.queue:
num_exported = self._export_batch()
num_spans -= num_exported

if num_spans <= 0:
break
self.condition.wait(timeout)
while True:
batch = self.accumulator.batch()
if len(batch) == 0:
break
self._export(batch)

def _export_batch(self) -> int:
"""Exports at most max_export_batch_size spans and returns the number of
exported spans.
"""
idx = 0
# currently only a single thread acts as consumer, so queue.pop() will
# not raise an exception
while idx < self.max_export_batch_size and self.queue:
self.spans_list[idx] = self.queue.pop()
idx += 1
def _export(self, batch):
token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
# noinspection PyBroadException
try:
# Ignore type b/c the Optional[None]+slicing is too "clever"
# for mypy
self.span_exporter.export(self.spans_list[:idx]) # type: ignore
self.span_exporter.export(batch)
except Exception: # pylint: disable=broad-except
logger.exception("Exception while exporting Span batch.")
detach(token)

# clean up list
for index in range(idx):
self.spans_list[index] = None
return idx

def _drain_queue(self):
"""Export all elements until queue is empty.

Can only be called from the worker thread context because it invokes
`export` that is not thread safe.
"""
while self.queue:
self._export_batch()

def force_flush(self, timeout_millis: int = None) -> bool:

if timeout_millis is None:
timeout_millis = self.export_timeout_millis

if self.done:
logger.warning("Already shutdown, ignoring call to force_flush().")
return True

with self.condition:
flush_request = self._get_or_create_flush_request()
# signal the worker thread to flush and wait for it to finish
self.condition.notify_all()

# wait for token to be processed
ret = flush_request.event.wait(timeout_millis / 1e3)
if not ret:
logger.warning("Timeout was exceeded in force_flush().")
return ret
with self.flush_lock:
start = time_ns()
self._stop_worker()
out = True
while not self.accumulator.empty():
batch = self.accumulator.batch()
self._export(batch)
if self._has_timed_out(start, timeout_millis):
logger.warning("Timeout was exceeded in force_flush().")
out = False
break
self._start_worker()
return out

def shutdown(self) -> None:
# signal the worker thread to finish and then wait for it
self.done = True
with self.condition:
self.condition.notify_all()
self.worker_thread.join()
self.processor_shutdown.set(True)
self._stop_worker()
self.span_exporter.shutdown()

@staticmethod
def _has_timed_out(start_time_ns, timeout_millis):
if timeout_millis is None:
return False
elapsed_millis = (time_ns() - start_time_ns) / 1e6
return elapsed_millis > timeout_millis

@staticmethod
def _default_max_queue_size():
try:
Expand Down Expand Up @@ -496,6 +373,20 @@ def _validate_arguments(
)


class AtomicBool:
def __init__(self, v: bool):
self.lock = threading.Lock()
self.v = v

def set(self, v: bool):
with self.lock:
self.v = v

def get(self) -> bool:
with self.lock:
return self.v


class ConsoleSpanExporter(SpanExporter):
"""Implementation of :class:`SpanExporter` that prints spans to the
console.
Expand All @@ -511,7 +402,7 @@ def __init__(
formatter: typing.Callable[
[ReadableSpan], str
] = lambda span: span.to_json()
+ linesep,
+ linesep,
):
self.out = out
self.formatter = formatter
Expand Down
Loading