Skip to content

Commit

Permalink
Simplify LogExporter::Export interface (#2041)
Browse files Browse the repository at this point in the history
Co-authored-by: Cijo Thomas <cijo.thomas@gmail.com>
  • Loading branch information
lalitb and cijothomas authored Aug 21, 2024
1 parent 056d2ae commit 3193320
Show file tree
Hide file tree
Showing 18 changed files with 167 additions and 138 deletions.
8 changes: 4 additions & 4 deletions opentelemetry-appender-tracing/benches/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
use async_trait::async_trait;
use criterion::{criterion_group, criterion_main, Criterion};
use opentelemetry::logs::LogResult;
use opentelemetry::KeyValue;
use opentelemetry::{InstrumentationLibrary, KeyValue};
use opentelemetry_appender_tracing::layer as tracing_layer;
use opentelemetry_sdk::export::logs::{LogData, LogExporter};
use opentelemetry_sdk::logs::{LogProcessor, LoggerProvider};
use opentelemetry_sdk::export::logs::LogExporter;
use opentelemetry_sdk::logs::{LogData, LogProcessor, LogRecord, LoggerProvider};
use opentelemetry_sdk::Resource;
use pprof::criterion::{Output, PProfProfiler};
use tracing::error;
Expand All @@ -34,7 +34,7 @@ struct NoopExporter {

#[async_trait]
impl LogExporter for NoopExporter {
async fn export<'a>(&mut self, _: Vec<std::borrow::Cow<'a, LogData>>) -> LogResult<()> {
async fn export(&mut self, _: Vec<(&LogRecord, &InstrumentationLibrary)>) -> LogResult<()> {
LogResult::Ok(())
}

Expand Down
14 changes: 5 additions & 9 deletions opentelemetry-otlp/src/exporter/http/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ use std::sync::Arc;
use async_trait::async_trait;
use http::{header::CONTENT_TYPE, Method};
use opentelemetry::logs::{LogError, LogResult};
use opentelemetry_sdk::export::logs::{LogData, LogExporter};
use opentelemetry::InstrumentationLibrary;
use opentelemetry_sdk::export::logs::LogExporter;
use opentelemetry_sdk::logs::LogRecord;

use super::OtlpHttpClient;

#[async_trait]
impl LogExporter for OtlpHttpClient {
async fn export<'a>(&mut self, batch: Vec<std::borrow::Cow<'a, LogData>>) -> LogResult<()> {
async fn export(&mut self, batch: Vec<(&LogRecord, &InstrumentationLibrary)>) -> LogResult<()> {
let client = self
.client
.lock()
Expand All @@ -19,13 +21,7 @@ impl LogExporter for OtlpHttpClient {
_ => Err(LogError::Other("exporter is already shut down".into())),
})?;

//TODO: avoid cloning here.
let owned_batch = batch
.into_iter()
.map(|cow_log_data| cow_log_data.into_owned()) // Converts Cow to owned LogData
.collect::<Vec<LogData>>();

let (body, content_type) = { self.build_logs_export_body(owned_batch)? };
let (body, content_type) = { self.build_logs_export_body(batch)? };
let mut request = http::Request::builder()
.method(Method::POST)
.uri(&self.collector_endpoint)
Expand Down
8 changes: 5 additions & 3 deletions opentelemetry-otlp/src/exporter/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,18 @@ use crate::{
OTEL_EXPORTER_OTLP_TIMEOUT,
};
use http::{HeaderName, HeaderValue, Uri};
#[cfg(feature = "logs")]
use opentelemetry::InstrumentationLibrary;
use opentelemetry_http::HttpClient;
use opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema;
#[cfg(feature = "logs")]
use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope;
#[cfg(feature = "trace")]
use opentelemetry_proto::transform::trace::tonic::group_spans_by_resource_and_scope;
#[cfg(feature = "logs")]
use opentelemetry_sdk::export::logs::LogData;
#[cfg(feature = "trace")]
use opentelemetry_sdk::export::trace::SpanData;
#[cfg(feature = "logs")]
use opentelemetry_sdk::logs::LogRecord;
#[cfg(feature = "metrics")]
use opentelemetry_sdk::metrics::data::ResourceMetrics;
use prost::Message;
Expand Down Expand Up @@ -328,7 +330,7 @@ impl OtlpHttpClient {
#[cfg(feature = "logs")]
fn build_logs_export_body(
&self,
logs: Vec<LogData>,
logs: Vec<(&LogRecord, &InstrumentationLibrary)>,
) -> opentelemetry::logs::LogResult<(Vec<u8>, &'static str)> {
use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
let resource_logs = group_logs_by_resource_and_scope(logs, &self.resource);
Expand Down
14 changes: 5 additions & 9 deletions opentelemetry-otlp/src/exporter/tonic/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ use opentelemetry::logs::{LogError, LogResult};
use opentelemetry_proto::tonic::collector::logs::v1::{
logs_service_client::LogsServiceClient, ExportLogsServiceRequest,
};
use opentelemetry_sdk::export::logs::{LogData, LogExporter};
use opentelemetry_sdk::export::logs::LogExporter;
use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request};

use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope;

use super::BoxInterceptor;
use opentelemetry::InstrumentationLibrary;
use opentelemetry_sdk::logs::LogRecord;

pub(crate) struct TonicLogsClient {
inner: Option<ClientInner>,
Expand Down Expand Up @@ -54,7 +56,7 @@ impl TonicLogsClient {

#[async_trait]
impl LogExporter for TonicLogsClient {
async fn export<'a>(&mut self, batch: Vec<std::borrow::Cow<'a, LogData>>) -> LogResult<()> {
async fn export(&mut self, batch: Vec<(&LogRecord, &InstrumentationLibrary)>) -> LogResult<()> {
let (mut client, metadata, extensions) = match &mut self.inner {
Some(inner) => {
let (m, e, _) = inner
Expand All @@ -67,13 +69,7 @@ impl LogExporter for TonicLogsClient {
None => return Err(LogError::Other("exporter is already shut down".into())),
};

//TODO: avoid cloning here.
let owned_batch = batch
.into_iter()
.map(|cow_log_data| cow_log_data.into_owned()) // Converts Cow to owned LogData
.collect::<Vec<LogData>>();

let resource_logs = group_logs_by_resource_and_scope(owned_batch, &self.resource);
let resource_logs = group_logs_by_resource_and_scope(batch, &self.resource);

client
.export(Request::from_parts(
Expand Down
10 changes: 4 additions & 6 deletions opentelemetry-otlp/src/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ use crate::{NoExporterConfig, OtlpPipeline};
use async_trait::async_trait;
use std::fmt::Debug;

use opentelemetry::logs::LogError;
use opentelemetry::logs::{LogError, LogResult};
use opentelemetry::InstrumentationLibrary;

use opentelemetry_sdk::{export::logs::LogData, runtime::RuntimeChannel, Resource};
use opentelemetry_sdk::{logs::LogRecord, runtime::RuntimeChannel, Resource};

/// Compression algorithm to use, defaults to none.
pub const OTEL_EXPORTER_OTLP_LOGS_COMPRESSION: &str = "OTEL_EXPORTER_OTLP_LOGS_COMPRESSION";
Expand Down Expand Up @@ -98,10 +99,7 @@ impl LogExporter {

#[async_trait]
impl opentelemetry_sdk::export::logs::LogExporter for LogExporter {
async fn export<'a>(
&mut self,
batch: Vec<std::borrow::Cow<'a, LogData>>,
) -> opentelemetry::logs::LogResult<()> {
async fn export(&mut self, batch: Vec<(&LogRecord, &InstrumentationLibrary)>) -> LogResult<()> {
self.client.export(batch).await
}

Expand Down
82 changes: 47 additions & 35 deletions opentelemetry-proto/src/transform/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ pub mod tonic {
}
}

impl From<opentelemetry_sdk::logs::LogRecord> for LogRecord {
fn from(log_record: opentelemetry_sdk::logs::LogRecord) -> Self {
impl From<&opentelemetry_sdk::logs::LogRecord> for LogRecord {
fn from(log_record: &opentelemetry_sdk::logs::LogRecord) -> Self {
let trace_context = log_record.trace_context.as_ref();
let severity_number = match log_record.severity_number {
Some(Severity::Trace) => SeverityNumber::Trace,
Expand Down Expand Up @@ -118,7 +118,7 @@ pub mod tonic {
},
severity_number: severity_number.into(),
severity_text: log_record.severity_text.map(Into::into).unwrap_or_default(),
body: log_record.body.map(Into::into),
body: log_record.body.clone().map(Into::into),
dropped_attributes_count: 0,
flags: trace_context
.map(|ctx| {
Expand All @@ -139,17 +139,23 @@ pub mod tonic {

impl
From<(
opentelemetry_sdk::export::logs::LogData,
(
&opentelemetry_sdk::logs::LogRecord,
&opentelemetry::InstrumentationLibrary,
),
&ResourceAttributesWithSchema,
)> for ResourceLogs
{
fn from(
data: (
opentelemetry_sdk::export::logs::LogData,
(
&opentelemetry_sdk::logs::LogRecord,
&opentelemetry::InstrumentationLibrary,
),
&ResourceAttributesWithSchema,
),
) -> Self {
let (log_data, resource) = data;
let ((log_record, instrumentation), resource) = data;

ResourceLogs {
resource: Some(Resource {
Expand All @@ -158,37 +164,44 @@ pub mod tonic {
}),
schema_url: resource.schema_url.clone().unwrap_or_default(),
scope_logs: vec![ScopeLogs {
schema_url: log_data
.instrumentation
schema_url: instrumentation
.schema_url
.clone()
.map(Into::into)
.unwrap_or_default(),
scope: Some((log_data.instrumentation, log_data.record.target.clone()).into()),
log_records: vec![log_data.record.into()],
scope: Some((instrumentation, log_record.target.clone()).into()),
log_records: vec![log_record.into()],
}],
}
}
}

pub fn group_logs_by_resource_and_scope(
logs: Vec<opentelemetry_sdk::export::logs::LogData>,
logs: Vec<(
&opentelemetry_sdk::logs::LogRecord,
&opentelemetry::InstrumentationLibrary,
)>,
resource: &ResourceAttributesWithSchema,
) -> Vec<ResourceLogs> {
// Group logs by target or instrumentation name
let scope_map = logs.iter().fold(
HashMap::new(),
|mut scope_map: HashMap<
Cow<'static, str>,
Vec<&opentelemetry_sdk::export::logs::LogData>,
Vec<(
&opentelemetry_sdk::logs::LogRecord,
&opentelemetry::InstrumentationLibrary,
)>,
>,
log| {
let key = log
.record
(log_record, instrumentation)| {
let key = log_record
.target
.clone()
.unwrap_or_else(|| log.instrumentation.name.clone());
scope_map.entry(key).or_default().push(log);
.unwrap_or_else(|| Cow::Owned(instrumentation.name.clone().into_owned()));
scope_map
.entry(key)
.or_default()
.push((log_record, instrumentation));
scope_map
},
);
Expand All @@ -197,13 +210,13 @@ pub mod tonic {
.into_iter()
.map(|(key, log_data)| ScopeLogs {
scope: Some(InstrumentationScope::from((
&log_data.first().unwrap().instrumentation,
Some(key),
log_data.first().unwrap().1,
Some(key.into_owned().into()),
))),
schema_url: resource.schema_url.clone().unwrap_or_default(),
log_records: log_data
.into_iter()
.map(|log_data| log_data.record.clone().into())
.map(|(log_record, _)| log_record.into())
.collect(),
})
.collect();
Expand All @@ -223,30 +236,29 @@ pub mod tonic {
mod tests {
use crate::transform::common::tonic::ResourceAttributesWithSchema;
use opentelemetry::logs::LogRecord as _;
use opentelemetry_sdk::export::logs::LogData;
use opentelemetry::InstrumentationLibrary;
use opentelemetry_sdk::{logs::LogRecord, Resource};
use std::time::SystemTime;

fn create_test_log_data(instrumentation_name: &str, _message: &str) -> LogData {
fn create_test_log_data(
instrumentation_name: &str,
_message: &str,
) -> (LogRecord, InstrumentationLibrary) {
let mut logrecord = LogRecord::default();
logrecord.set_timestamp(SystemTime::now());
logrecord.set_observed_timestamp(SystemTime::now());
LogData {
instrumentation: opentelemetry_sdk::InstrumentationLibrary::builder(
instrumentation_name.to_string(),
)
.build(),
record: logrecord,
}
let instrumentation =
InstrumentationLibrary::builder(instrumentation_name.to_string()).build();
(logrecord, instrumentation)
}

#[test]
fn test_group_logs_by_resource_and_scope_single_scope() {
let resource = Resource::default();
let log1 = create_test_log_data("test-lib", "Log 1");
let log2 = create_test_log_data("test-lib", "Log 2");
let (log_record1, instrum_lib1) = create_test_log_data("test-lib", "Log 1");
let (log_record2, instrum_lib2) = create_test_log_data("test-lib", "Log 2");

let logs = vec![log1, log2];
let logs = vec![(&log_record1, &instrum_lib1), (&log_record2, &instrum_lib2)];
let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema

let grouped_logs =
Expand All @@ -263,10 +275,10 @@ mod tests {
#[test]
fn test_group_logs_by_resource_and_scope_multiple_scopes() {
let resource = Resource::default();
let log1 = create_test_log_data("lib1", "Log 1");
let log2 = create_test_log_data("lib2", "Log 2");
let (log_record1, instrum_lib1) = create_test_log_data("lib1", "Log 1");
let (log_record2, instrum_lib2) = create_test_log_data("lib2", "Log 2");

let logs = vec![log1, log2];
let logs = vec![(&log_record1, &instrum_lib1), (&log_record2, &instrum_lib2)];
let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema
let grouped_logs =
crate::transform::logs::tonic::group_logs_by_resource_and_scope(logs, &resource);
Expand Down
12 changes: 12 additions & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,18 @@
[#2021](https://github.com/open-telemetry/opentelemetry-rust/pull/2021)
- Provide default implementation for `event_enabled` method in `LogProcessor`
trait that returns `true` always.
- **Breaking** [#2041](https://github.com/open-telemetry/opentelemetry-rust/pull/2041)
- The Exporter::export() interface is modified as below:
Previous Signature:
```rust
async fn export<'a>(&mut self, batch: Vec<Cow<'a, LogData>>) -> LogResult<()>;
```

Updated Signature:
```rust
async fn export(&mut self, batch: Vec<(&LogRecord, &InstrumentationLibrary)>) -> LogResult<()>;
```
This change simplifies the processing required by exporters. Exporters no longer need to determine if the LogData is borrowed or owned, as they now work directly with references. As a result, exporters must explicitly create a copy of LogRecord and/or InstrumentationLibrary when needed, as the new interface only provides references to these structures.

## v0.24.1

Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/benches/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use opentelemetry::logs::{
use opentelemetry::trace::Tracer;
use opentelemetry::trace::TracerProvider as _;
use opentelemetry::Key;
use opentelemetry_sdk::export::logs::LogData;
use opentelemetry_sdk::logs::LogData;
use opentelemetry_sdk::logs::LogProcessor;
use opentelemetry_sdk::logs::{Logger, LoggerProvider};
use opentelemetry_sdk::trace;
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/benches/log_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use criterion::{criterion_group, criterion_main, Criterion};

use opentelemetry::logs::{LogRecord as _, LogResult, Logger as _, LoggerProvider as _, Severity};

use opentelemetry_sdk::export::logs::LogData;
use opentelemetry_sdk::logs::LogData;
use opentelemetry_sdk::logs::LogProcessor;
use opentelemetry_sdk::logs::LoggerProvider;
use pprof::criterion::{Output, PProfProfiler};
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/benches/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::{
use criterion::{criterion_group, criterion_main, Criterion};
use opentelemetry::logs::{LogRecord as _, LogResult, Logger as _, LoggerProvider as _, Severity};
use opentelemetry_sdk::{
export::logs::LogData,
logs::LogData,
logs::{LogProcessor, LogRecord, Logger, LoggerProvider},
};

Expand Down
Loading

0 comments on commit 3193320

Please sign in to comment.