Improve BatchExportSpanProcessor #1062
* It was possible for force flush calls to miss the flush-finished notification sent by the worker thread. If a flush token was added in the main thread, and the worker thread processed it and notified the flush condition before the main thread called wait on that condition, the wakeup was missed and the main thread had to wait for the full flush timeout.
* Calls to force flush were not really thread safe, since whether a flush operation was in progress was indicated by a single boolean flag, which was reset when the first force flush call finished.
* Instead of using a boolean flag to indicate a flush request, use an Event. When force flush is called, it looks up whether a flush request event is currently pending and creates a new one if not. The worker thread checks whether a flush request event exists, unsets it, and uses a local reference to signal once the export operation has finished. In the meantime, force flush calls wait on the flush request event until the worker thread signals it. This also makes calls to force flush thread safe, since multiple threads can wait on one event.
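The missed wakeup described in the first bullet can be reproduced in a few lines. This is a minimal single-threaded sketch using only the standard library, not code from this PR: the variable names `woke` and `signaled` are made up for illustration, and the "worker" notification is simulated by calling `notify_all()` / `set()` before the waiter starts waiting.

```python
import threading

# Condition: a notification sent before anyone waits is simply lost.
cond = threading.Condition()
with cond:
    cond.notify_all()  # no waiter yet, so this has no effect
with cond:
    # The waiter arrives late and blocks until the timeout expires.
    woke = cond.wait(timeout=0.05)  # False: timed out, wakeup was missed

# Event: set() is remembered, so a late waiter returns immediately.
event = threading.Event()
event.set()  # "worker" signals before the waiter waits
signaled = event.wait(timeout=0.05)  # True: the flag is already set

print(woke, signaled)
```

With a `Condition`, the late waiter pays the full timeout; with an `Event`, the order of `set()` and `wait()` does not matter.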
I believe this PR also solves the issue of the previous implementation of |
Does this PR implement enough tests to cover what PR #947 was trying to cover?
Change looks good, just a non-blocking question
if num_spans <= 0:
    break

def export_batch(self) -> int:
Would it make sense to make this method internal to the processor?
- def export_batch(self) -> int:
+ def _export_batch(self) -> int:
sounds good since it should only be called by the worker thread. same goes for the export method.
Description
Improve flushing in the `BatchExportSpanProcessor` by using a `threading.Event` for requesting a flush operation and signaling that a flush operation finished.

The idea is to have the thread which calls `force_flush` create a flush request (`threading.Event` + number of spans to flush) and wait on this request until it is signaled by the worker thread or the flush timeout expires.

The worker thread checks in each export interval whether a flush request exists, unsets it, and uses the local reference to signal the end of the flush/export operation.

By using a `threading.Event` for signaling, wakeup notifications are not missed, since the wait call returns immediately once the event has been signaled.

This also addresses the issue that calls to `force_flush` were not thread safe, since the indication of whether a flush operation was in progress relied on a boolean flag on the span processor instance, which was reset once the first `force_flush` call finished.

Addresses #949
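The flush-request handshake described above could look roughly like the following. This is a simplified sketch under stated assumptions, not the code in this PR: `SketchProcessor`, `_FlushRequest`, and the in-memory `exported` list are hypothetical names, and the real processor's exporter, batching limits, and error handling are left out.

```python
import collections
import threading
import typing


class _FlushRequest:
    """Hypothetical helper: an Event plus the number of spans to flush."""

    def __init__(self) -> None:
        self.event = threading.Event()
        self.num_spans = 0


class SketchProcessor:
    """Illustrative stand-in for a batching span processor."""

    def __init__(self, export_interval: float = 0.05) -> None:
        self.queue: typing.Deque[object] = collections.deque()
        self.exported: typing.List[object] = []  # stands in for an exporter
        self.condition = threading.Condition(threading.Lock())
        self._flush_request: typing.Optional[_FlushRequest] = None
        self.export_interval = export_interval
        self.done = False
        self.worker = threading.Thread(target=self._worker, daemon=True)
        self.worker.start()

    def on_end(self, span: object) -> None:
        self.queue.appendleft(span)

    def _worker(self) -> None:
        while not self.done:
            with self.condition:
                if self._flush_request is None:
                    # Sleep until the interval elapses or a flush is requested.
                    self.condition.wait(self.export_interval)
                # Unset the shared request and keep only a local reference.
                flush_request = self._flush_request
                self._flush_request = None
            if flush_request is not None:
                self._export(flush_request.num_spans)
                # Signal every thread waiting on this request.
                flush_request.event.set()
            else:
                self._export(len(self.queue))

    def _export(self, num_spans: int) -> None:
        for _ in range(num_spans):
            if not self.queue:
                break
            self.exported.append(self.queue.pop())

    def force_flush(self, timeout: float = 1.0) -> bool:
        with self.condition:
            # Reuse a pending request, or create a new one.
            if self._flush_request is None:
                self._flush_request = _FlushRequest()
            request = self._flush_request
            request.num_spans = len(self.queue)
            self.condition.notify_all()
        # Returns immediately if the worker has already set the event.
        return request.event.wait(timeout)

    def shutdown(self) -> None:
        self.done = True
        with self.condition:
            self.condition.notify_all()
        self.worker.join()


processor = SketchProcessor()
for span_id in range(5):
    processor.on_end(span_id)
flushed = processor.force_flush(timeout=2.0)
exported_after_flush = list(processor.exported)
processor.shutdown()
```

Because the worker takes the request under the lock and signals through its local reference, a `force_flush` call that arrives during an export creates a fresh request instead of being cleared by the one that is finishing.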