[Logs] Initial draft to handle batch export into background thread #2096
base: main
Conversation
See https://github.com/open-telemetry/opentelemetry-rust/compare/main...lalitb:thread-runtime?expand=1 as well, to get some ideas about solving the same/similar problems.
//     Either::Left((export_res, _)) => export_res,
//     Either::Right((_, _)) => ExportResult::Err(LogError::ExportTimedOut(time_out)),
// }
ExportResult::Ok(())
So, we are expecting the export method to complete within a definite time period, and we are not handling the timeout outside of the export method? Any reason why?
Yes. Just chatted with @ThomsonTan about this. If a rogue exporter takes forever, the BatchProcessor will get stuck behind it. For now, let's document this limitation and move on.
(Metrics has the same problem. It's slightly worse there, as observable callbacks can also go rogue and the entire Metrics SDK will stop functioning.)
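For reference, the commented-out lines quoted above were enforcing roughly the following pattern. This is a minimal sketch, assuming the `futures-util` and `futures-timer` crates and a plain `Result<(), String>` in place of the SDK's `ExportResult`; it is not the PR's actual code.

```rust
use std::time::Duration;

use futures_timer::Delay;
use futures_util::{
    future::{select, Either},
    pin_mut,
};

// Race the export future against a timer; whichever completes first wins.
async fn export_with_timeout<F>(export: F, time_out: Duration) -> Result<(), String>
where
    F: std::future::Future<Output = Result<(), String>>,
{
    let timeout = Delay::new(time_out);
    pin_mut!(export, timeout);
    match select(export, timeout).await {
        // The export finished first: propagate its result.
        Either::Left((export_res, _)) => export_res,
        // The timer fired first: report a timeout.
        Either::Right((_, _)) => Err(format!("export timed out after {time_out:?}")),
    }
}
```

In the draft's dedicated thread, this would be driven by `futures_executor::block_on`, as in the surrounding code.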
@ThomsonTan - can we move this out as a separate processor (say)
Also, can you please fix the merge conflicts? It would be good to review with those resolved :)
Also @ThomsonTan - can you test it with all OTLP scenarios - OTLP HTTP (hyper, reqwest, reqwest-blocking) and OTLP gRPC - and share the results? Thanks.
Also, adding this to the v0.28 milestone.
@@ -538,11 +585,13 @@ where
enum BatchMessage {
    /// Export logs, usually called when the log is emitted.
    ExportLog((LogRecord, InstrumentationScope)),
    /// ForceFlush flush the current buffer to the backend
    ForceFlush(mpsc::SyncSender<ExportResult>),
There is another Flush variant in the enum below; is that meant to be removed?
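A hypothetical sketch of how the message enum might look once the older Flush variant is removed, with Shutdown also carrying a reply channel as in the diff further down; the placeholder types stand in for the SDK's real `LogRecord`, `InstrumentationScope`, and `ExportResult`.

```rust
use std::sync::mpsc;

// Placeholder types standing in for the SDK's real ones.
struct LogRecord;
struct InstrumentationScope;
type ExportResult = Result<(), String>;

enum BatchMessage {
    /// Export a log, usually sent when the log is emitted.
    ExportLog((LogRecord, InstrumentationScope)),
    /// Flush the current buffer to the backend and report the result back.
    ForceFlush(mpsc::SyncSender<ExportResult>),
    /// Drain the buffer, stop the background thread, and report the result back.
    Shutdown(mpsc::SyncSender<ExportResult>),
}
```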
record.clone(),
instrumentation.clone(),
)));

if let Err(err) = result {
Please remove this logging, as we are tracking drops via counts.
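A minimal sketch of the counting approach being referred to: bump an atomic counter on each dropped record instead of logging every failure. The struct and field names are hypothetical, not from this PR.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

struct DropTracker {
    dropped_logs_count: AtomicUsize,
}

impl DropTracker {
    fn record_dropped(&self) {
        // Relaxed ordering is enough for a simple statistics counter;
        // the total can be reported once at shutdown.
        self.dropped_logs_count.fetch_add(1, Ordering::Relaxed);
    }
}
```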
futures_executor::block_on(res_receiver)
    .map_err(|err| LogError::Other(err.into()))
    .and_then(std::convert::identity)
let (sender, receiver) = mpsc::sync_channel(1);
There is a oneshot::channel and another try_send just above this; I guess they are leftovers and should be removed.
})??;

if let Some(handle) = self.handle.lock().unwrap().take() {
    handle.join().unwrap();
Emit an internal log when the thread is exiting?
}));
});

let forceflush_timeout = env::var(OTEL_LOGS_FORCEFLUSH_TIMEOUT_NAME)
I don't think we need a timeout for flush.
    .ok()
    .and_then(|v| v.parse().map(Duration::from_millis).ok())
    .unwrap_or(OTEL_LOGS_DEFAULT_FORCEFLUSH_TIMEOUT);
let shutdown_timeout = env::var(OTEL_LOGS_SHUTDOWN_TIMEOUT_NAME)
Agree with using a timeout for shutdown, but this env variable is not required. Let's accept a timeout in the shutdown() method itself and, if none is provided, use 5 seconds as the default. (5 seconds is not from any spec, just my guess at a good default.)
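A hedged sketch of that suggestion: `shutdown()` accepts an optional timeout and falls back to a 5 second default. All names and the simplified message type are illustrative only, not the PR's actual API.

```rust
use std::sync::mpsc;
use std::time::Duration;

const DEFAULT_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);

type ExportResult = Result<(), String>;

struct BatchProcessorHandle {
    // Stand-in for the channel that carries BatchMessage::Shutdown in the PR.
    sender: mpsc::SyncSender<mpsc::SyncSender<ExportResult>>,
}

impl BatchProcessorHandle {
    fn shutdown(&self, timeout: Option<Duration>) -> ExportResult {
        let timeout = timeout.unwrap_or(DEFAULT_SHUTDOWN_TIMEOUT);
        let (reply_tx, reply_rx) = mpsc::sync_channel(1);
        // Ask the background thread to drain and stop...
        self.sender
            .try_send(reply_tx)
            .map_err(|err| format!("failed to send shutdown message: {err}"))?;
        // ...and wait at most `timeout` for it to confirm.
        reply_rx
            .recv_timeout(timeout)
            .map_err(|err| format!("shutdown timed out or failed: {err}"))?
    }
}
```

Tying this back to the diff below, the reply channel would be carried inside the Shutdown message rather than sent raw.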
self.sender.try_send(BatchMessage::Shutdown(sender))
    .map_err(|err| LogError::Other(err.into()))?;

receiver.recv_timeout(self.shutdown_timeout).map_err(|err| {
Nice way of enforcing the shutdown timeout! Love it.
error = format!("{}", err)
);
}
logs.reserve(config.max_export_batch_size);
I think Vec has a constructor that takes a capacity, so you can use that instead of reserve.
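That constructor is `Vec::with_capacity`; a tiny illustration, with the element type as a stand-in for the batch's real item type:

```rust
// Allocate the batch buffer up front instead of calling reserve() later.
fn preallocated_batch(max_export_batch_size: usize) -> Vec<u64> {
    Vec::with_capacity(max_export_batch_size)
}
```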
};

match receiver.recv_timeout(remaining_time) {
    Ok(BatchMessage::ExportLog(data)) => {
Trying to understand the amount of allocation/copying overall...
When emit() is called, we clone the record and store it on the heap, and a pointer to the heap is passed to the channel. In our thread, we receive the pointer as a message and add it to the vec, so it's just the cost of copying a pointer, not the entire LogRecord again. Is this the right understanding?
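A standalone sketch of that cost model, assuming the record is heap-allocated (whether this PR actually boxes the record is not confirmed here): only the pointer moves through the channel and into the batch `Vec`.

```rust
use std::sync::mpsc;
use std::thread;

struct LogRecord {
    body: String,
}

fn main() {
    // Bounded channel carrying heap-allocated records.
    let (tx, rx) = mpsc::sync_channel::<Box<LogRecord>>(8);

    let worker = thread::spawn(move || {
        let mut batch: Vec<Box<LogRecord>> = Vec::new();
        while let Ok(record) = rx.recv() {
            // Pushing moves the Box (a pointer), not another copy of LogRecord.
            batch.push(record);
        }
        batch.len()
    });

    tx.send(Box::new(LogRecord { body: "hello".into() })).unwrap();
    drop(tx); // close the channel so the worker loop ends
    assert_eq!(worker.join().unwrap(), 1);
}
```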
match receiver.recv_timeout(remaining_time) {
    Ok(BatchMessage::ExportLog(data)) => {
        logs.push(data);
I think I mentioned this in the GitHub issue too, but it is something we need to discuss further. The channel has a capacity of 500 (for example), and the vec can also hold 500, so it is possible for around 1000 items to be in memory (channel + vector) at a time?
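Back-of-the-envelope for that concern, using the example figures from the comment (not SDK defaults): records can sit in the channel and in the batch vec at the same time, so the worst case is roughly the sum.

```rust
fn main() {
    let max_queue_size = 500; // bounded channel capacity (example figure)
    let max_export_batch_size = 500; // batch Vec capacity (example figure)
    let worst_case_in_memory = max_queue_size + max_export_batch_size;
    println!("up to ~{worst_case_in_memory} records may be buffered at once");
}
```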
Fixes #2066
Changes
This is an early draft that implements the proposal mentioned in #2066; looking for feedback.
Merge requirement checklist
CHANGELOG.md files updated for non-trivial, user-facing changes