# sdk/trace/exporters: add batch span processor exporter #153
```diff
@@ -13,6 +13,8 @@
 # limitations under the License.

 import logging
+import queue
+import threading
 import typing
 from enum import Enum
```
`@@ -76,3 +78,100 @@`

```python
    def shutdown(self) -> None:
        self.span_exporter.shutdown()


class BatchExportSpanProcessor(SpanProcessor):
    """Batch span processor implementation.

    BatchExportSpanProcessor is an implementation of `SpanProcessor` that
    batches ended spans and pushes them to the configured `SpanExporter`.
    """

    def __init__(
        self,
        span_exporter: SpanExporter,
        max_queue_size: int = 2048,
        schedule_delay_millis: int = 5000,
        max_export_batch_size: int = 512,
    ):
        if max_queue_size <= 0:
            raise ValueError("max_queue_size must be a positive integer.")

        if schedule_delay_millis <= 0:
            raise ValueError("schedule_delay_millis must be positive.")

        if max_export_batch_size <= 0:
            raise ValueError("max_export_batch_size must be positive.")

        if max_export_batch_size > max_queue_size:
            raise ValueError(
                "max_export_batch_size must be less than or equal to max_queue_size."
            )
```

> **Reviewer:** Also, FWIW, the type annotations don't do anything at runtime; if you want to enforce int/float types here we need a type check too.
>
> **Author:** That check is not strictly needed. I just want a number; if the user passes something else, it'll fail at some point.
```python
        self.span_exporter = span_exporter
        self.queue = queue.Queue(max_queue_size)
```

> **Reviewer:** You never call [`queue.Queue.task_done`](https://docs.python.org/3/library/queue.html#queue.Queue.task_done) on the queue. Maybe a [`collections.deque`](https://docs.python.org/3/library/collections.html#collections.deque) would be the better (more lightweight) choice?
>
> **Author:** I'm not sure it'll work. It doesn't provide a way to access the number of elements in the queue, so an external counter would be needed (and I'm not sure that will work, because a deque drops elements without warning).
>
> **Reviewer:** It does, just use `len()`.
>
> **Author:** You're totally right, I need more coffee. I used it, two changes: …
>
> **Reviewer:** It sounds like we need to clarify that in the spec; I actually expected that we'd drop the oldest spans first. I think it is [safe] if we lock around adding spans to the deque, which we might need to do later anyway.
>
> **Reviewer:** If we only consider CPython with its GIL, a plain list with a lock (condition) might actually be the best solution after all. But I expect the deque without locking at every added span to perform significantly better in GIL-less Python (PyPy). By introducing a single lock that is taken on every `span.end()`, we would effectively reintroduce a sort of GIL (even though we only hold this lock for a short time at once, it would be highly contended).
>
> **Author:** I'd like to avoid as much as possible having a lock when adding an element, as it will contend on all span endings.
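The reviewers' point about `deque` can be illustrated with a small standard-library sketch (not part of the PR's code): a bounded deque reports its size via `len()` and, when full, silently evicts from the opposite end, i.e. the oldest entry goes first.

```python
from collections import deque

# A deque with maxlen discards elements once full: appending on the
# right evicts from the left, so the *oldest* entry is dropped first.
d = deque(maxlen=3)
for span_id in range(5):
    d.append(span_id)

print(len(d))   # 3
print(list(d))  # [2, 3, 4] -- entries 0 and 1 were silently evicted
```

This matches the reviewer's expectation of drop-oldest-first semantics, whereas `queue.Queue.put(block=False)` raises `queue.Full` and drops the *newest* span instead.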
||||||
self.worker_thread = threading.Thread(target=self.worker) | ||||||
Oberon00 marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
self.condition = threading.Condition() | ||||||
> **Reviewer (suggested change):** pass a plain `threading.Lock()` to the `Condition`. Otherwise an `RLock` is created by default, but we don't need one.
>
> **Author:** @Oberon00 what's wrong with the default?
>
> **Reviewer:** We just don't need the additional guarantees that a recursive lock offers. `Lock` is at least as efficient as `RLock` (after all, a `Lock` could be implemented as being the same as an `RLock`, but not the other way round).
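A small sketch of that suggestion (illustrative only, not the PR's code): the condition is built around a plain `Lock`, and `notify_all()` must be called while that lock is held.

```python
import threading

# Condition() with no argument creates a reentrant RLock; passing a
# plain Lock avoids the unneeded reentrancy bookkeeping.
cond = threading.Condition(threading.Lock())

# notify_all() requires the underlying lock to be held:
try:
    cond.notify_all()
except RuntimeError as err:
    print("unlocked notify:", err)

with cond:
    cond.notify_all()  # fine: the lock is held here
```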
||||||
self.schedule_delay_millis = schedule_delay_millis | ||||||
self.max_export_batch_size = max_export_batch_size | ||||||
self.half_max_queue_size = max_queue_size / 2 | ||||||
Oberon00 marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
self.done = False | ||||||
|
||||||
self.worker_thread.start() | ||||||
|
||||||
def on_start(self, span: Span) -> None: | ||||||
pass | ||||||
|
||||||
```python
    def on_end(self, span: Span) -> None:
        try:
            self.queue.put(span, block=False)
        except queue.Full:
            # TODO: dropped spans counter?
            pass
        if self.queue.qsize() >= self.half_max_queue_size:
            with self.condition:
                self.condition.notify_all()
```

> **Reviewer (on the `try`):** I wonder if we should also check …
>
> **Author:** I think the specification is not clear if …
>
> **Reviewer:** Another thing we should maybe do is check [`threading.Thread.is_alive`](https://docs.python.org/3/library/threading.html#threading.Thread.is_alive) for our worker and restart it (logging a warning) if it crashed.
>
> **Author:** I am not sure about this one. If that thread could crash, maybe we should implement a health check somewhere else instead. I don't want to make this … Although, about the slowness, we could maybe only check it in the cases where we notify the condition.

> **Reviewer (on the `queue.Full` handler):** Dropped span counter sounds like a plan. Or we could log a warning the first time a span is dropped.
>
> **Author:** Good idea, I'd just log the first time a span is dropped. A better approach could be a rate-limited logging system that actually prints the number of spans being dropped per second or so.

> **Reviewer (on the notify threshold, suggested change):** … I think we send too many notifications otherwise.
>
> **Author:** I created a variable to avoid this "notification storm"; an equality comparison would not work, because the check could miss the exact value (two spans can end at the same time).
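The "log only the first dropped span" idea from the review could look roughly like this hypothetical sketch (the `_DropTracker` name and its attributes are invented for illustration):

```python
import logging
import queue

logger = logging.getLogger(__name__)

class _DropTracker:
    """Bounded queue that warns once when it starts dropping items."""

    def __init__(self, maxsize: int) -> None:
        self.queue = queue.Queue(maxsize)
        self.dropped = 0
        self._warned = False

    def put(self, item) -> None:
        try:
            self.queue.put(item, block=False)
        except queue.Full:
            self.dropped += 1
            if not self._warned:
                # warn only on the first drop to avoid log spam
                logger.warning("Span queue is full, dropping spans")
                self._warned = True

tracker = _DropTracker(2)
for span_id in range(5):
    tracker.put(span_id)
print(tracker.dropped)  # 3
```

A rate-limited variant (e.g. drops per second) would replace the `_warned` flag with a timestamp check.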
```python
    def worker(self):
        while not self.done:
            if self.queue.qsize() < self.max_export_batch_size:
                with self.condition:
                    self.condition.wait(self.schedule_delay_millis / 1000)
                    if self.queue.empty():
                        # spurious notification, let's wait again
                        continue
                    if self.done:
                        # missing spans will be sent when calling flush
                        break

            self.export()

        # be sure that all spans are sent
        self.flush()
```
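The worker's loop relies on a detail of `Condition.wait(timeout)`: it returns `False` when the timeout elapses and may also wake spuriously, so the predicate (here, the queue state) must be re-checked after every wakeup. A minimal standalone sketch:

```python
import threading

# With no producer to call notify_all(), wait() simply times out and
# returns False -- the caller must re-check its condition, not assume
# that work has arrived.
cond = threading.Condition()

with cond:
    notified = cond.wait(timeout=0.05)

print(notified)  # False -> "re-check the queue, then continue"
```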
```python
    def export(self):
        """Exports at most max_export_batch_size spans."""
        idx = 0
        spans = []
        # currently only a single thread acts as consumer, so queue.get()
        # will never block
        while idx < self.max_export_batch_size and not self.queue.empty():
            spans.append(self.queue.get())
            idx += 1
        try:
            self.span_exporter.export(spans)
        # pylint: disable=broad-except
        except Exception as exc:
            logger.warning("Exception while exporting data: %s", exc)
```

> **Reviewer:** …
>
> **Author:** I am not sure how I could integrate it. It'd be a big redesign.
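The single-consumer drain logic can be sketched in isolation (the `drain` helper is hypothetical, not the PR's API): because only one thread consumes, an `empty()` check followed by `get()` cannot block, and at most `batch_size` items are taken per call.

```python
import queue

def drain(q: queue.Queue, batch_size: int) -> list:
    """Take up to batch_size items from q without blocking."""
    batch = []
    # safe only with a single consumer: empty() then get() won't block
    while len(batch) < batch_size and not q.empty():
        batch.append(q.get())
    return batch

q = queue.Queue()
for i in range(7):
    q.put(i)

print(drain(q, 5))  # [0, 1, 2, 3, 4]
print(drain(q, 5))  # [5, 6]
```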
```python
    def flush(self):
        while not self.queue.empty():
            self.export()
```

> **Reviewer:** Instead of checking `queue.empty()` here again, we could have `export` return a bool.
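That suggestion, sketched with hypothetical helpers (`export_batch` and `flush` here are stand-ins, not the PR's methods): the export step reports whether items remain, so the flush loop needs no separate `empty()` check.

```python
import queue

def export_batch(q: queue.Queue, batch_size: int) -> bool:
    """Drain up to batch_size items; return True if more remain."""
    for _ in range(batch_size):
        if q.empty():
            return False
        q.get()
    return not q.empty()

def flush(q: queue.Queue, batch_size: int) -> None:
    # loop until export_batch reports the queue is drained
    while export_batch(q, batch_size):
        pass

q = queue.Queue()
for i in range(12):
    q.put(i)
flush(q, 5)
print(q.empty())  # True
```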
```python
    def shutdown(self) -> None:
        # signal the worker thread to finish and then wait for it
        self.done = True
        with self.condition:
            self.condition.notify_all()
        self.worker_thread.join()
        self.span_exporter.shutdown()
```
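The shutdown handshake (set the flag, wake the worker, join it) can be sketched standalone; `done`, `cond`, and `finished` below are illustrative stand-ins for the processor's attributes:

```python
import threading

done = False
cond = threading.Condition()
finished = threading.Event()

def worker():
    # simplified worker: wait until told to stop
    while True:
        with cond:
            if done:
                break
            cond.wait(timeout=0.5)  # woken early by notify_all()
    finished.set()

t = threading.Thread(target=worker)
t.start()

done = True            # signal the worker to finish...
with cond:
    cond.notify_all()  # ...wake it if it is waiting...
t.join()               # ...and wait for it to exit
print(finished.is_set())  # True
```

Notifying under the lock guarantees the worker either sees `done` before waiting or is woken by the notification; the timeout is only a safety net.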
> **Reviewer (on `schedule_delay_millis: int`):** `int` -> `float`?
>
> **Author:** @reyang milliseconds aren't precise enough here?