From 3193320fa6dc17e89a7bed6090000aef781ac29c Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Wed, 21 Aug 2024 20:13:26 +0530 Subject: [PATCH] Simplify LogExporter::Export interface (#2041) Co-authored-by: Cijo Thomas --- .../benches/logs.rs | 8 +- opentelemetry-otlp/src/exporter/http/logs.rs | 14 ++-- opentelemetry-otlp/src/exporter/http/mod.rs | 8 +- opentelemetry-otlp/src/exporter/tonic/logs.rs | 14 ++-- opentelemetry-otlp/src/logs.rs | 10 +-- opentelemetry-proto/src/transform/logs.rs | 82 +++++++++++-------- opentelemetry-sdk/CHANGELOG.md | 12 +++ opentelemetry-sdk/benches/log.rs | 2 +- opentelemetry-sdk/benches/log_exporter.rs | 2 +- opentelemetry-sdk/benches/log_processor.rs | 2 +- opentelemetry-sdk/src/export/logs/mod.rs | 14 +--- opentelemetry-sdk/src/logs/log_emitter.rs | 6 +- opentelemetry-sdk/src/logs/log_processor.rs | 27 ++++-- opentelemetry-sdk/src/logs/mod.rs | 10 +++ .../src/testing/logs/in_memory_exporter.rs | 23 ++++-- opentelemetry-stdout/src/logs/exporter.rs | 13 +-- opentelemetry-stdout/src/logs/transform.rs | 56 ++++++------- stress/src/logs.rs | 2 +- 18 files changed, 167 insertions(+), 138 deletions(-) diff --git a/opentelemetry-appender-tracing/benches/logs.rs b/opentelemetry-appender-tracing/benches/logs.rs index ba229419d4..aef5e7bdf2 100644 --- a/opentelemetry-appender-tracing/benches/logs.rs +++ b/opentelemetry-appender-tracing/benches/logs.rs @@ -16,10 +16,10 @@ use async_trait::async_trait; use criterion::{criterion_group, criterion_main, Criterion}; use opentelemetry::logs::LogResult; -use opentelemetry::KeyValue; +use opentelemetry::{InstrumentationLibrary, KeyValue}; use opentelemetry_appender_tracing::layer as tracing_layer; -use opentelemetry_sdk::export::logs::{LogData, LogExporter}; -use opentelemetry_sdk::logs::{LogProcessor, LoggerProvider}; +use opentelemetry_sdk::export::logs::LogExporter; +use opentelemetry_sdk::logs::{LogData, LogProcessor, LogRecord, LoggerProvider}; use opentelemetry_sdk::Resource; use pprof::criterion::{Output, PProfProfiler}; use tracing::error; @@ -34,7 +34,7 @@ struct NoopExporter { #[async_trait] impl LogExporter for NoopExporter { - async fn export<'a>(&mut self, _: Vec>) -> LogResult<()> { + async fn export(&mut self, _: Vec<(&LogRecord, &InstrumentationLibrary)>) -> LogResult<()> { LogResult::Ok(()) } diff --git a/opentelemetry-otlp/src/exporter/http/logs.rs b/opentelemetry-otlp/src/exporter/http/logs.rs index 396dec680d..83f25c1f9f 100644 --- a/opentelemetry-otlp/src/exporter/http/logs.rs +++ b/opentelemetry-otlp/src/exporter/http/logs.rs @@ -3,13 +3,15 @@ use std::sync::Arc; use async_trait::async_trait; use http::{header::CONTENT_TYPE, Method}; use opentelemetry::logs::{LogError, LogResult}; -use opentelemetry_sdk::export::logs::{LogData, LogExporter}; +use opentelemetry::InstrumentationLibrary; +use opentelemetry_sdk::export::logs::LogExporter; +use opentelemetry_sdk::logs::LogRecord; use super::OtlpHttpClient; #[async_trait] impl LogExporter for OtlpHttpClient { - async fn export<'a>(&mut self, batch: Vec>) -> LogResult<()> { + async fn export(&mut self, batch: Vec<(&LogRecord, &InstrumentationLibrary)>) -> LogResult<()> { let client = self .client .lock() @@ -19,13 +21,7 @@ impl LogExporter for OtlpHttpClient { _ => Err(LogError::Other("exporter is already shut down".into())), })?; - //TODO: avoid cloning here. - let owned_batch = batch - .into_iter() - .map(|cow_log_data| cow_log_data.into_owned()) // Converts Cow to owned LogData - .collect::>(); - - let (body, content_type) = { self.build_logs_export_body(owned_batch)? }; + let (body, content_type) = { self.build_logs_export_body(batch)? }; let mut request = http::Request::builder() .method(Method::POST) .uri(&self.collector_endpoint) diff --git a/opentelemetry-otlp/src/exporter/http/mod.rs b/opentelemetry-otlp/src/exporter/http/mod.rs index 2fa3ff851b..1b60971d76 100644 --- a/opentelemetry-otlp/src/exporter/http/mod.rs +++ b/opentelemetry-otlp/src/exporter/http/mod.rs @@ -7,16 +7,18 @@ use crate::{ OTEL_EXPORTER_OTLP_TIMEOUT, }; use http::{HeaderName, HeaderValue, Uri}; +#[cfg(feature = "logs")] +use opentelemetry::InstrumentationLibrary; use opentelemetry_http::HttpClient; use opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema; #[cfg(feature = "logs")] use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope; #[cfg(feature = "trace")] use opentelemetry_proto::transform::trace::tonic::group_spans_by_resource_and_scope; -#[cfg(feature = "logs")] -use opentelemetry_sdk::export::logs::LogData; #[cfg(feature = "trace")] use opentelemetry_sdk::export::trace::SpanData; +#[cfg(feature = "logs")] +use opentelemetry_sdk::logs::LogRecord; #[cfg(feature = "metrics")] use opentelemetry_sdk::metrics::data::ResourceMetrics; use prost::Message; @@ -328,7 +330,7 @@ impl OtlpHttpClient { #[cfg(feature = "logs")] fn build_logs_export_body( &self, - logs: Vec, + logs: Vec<(&LogRecord, &InstrumentationLibrary)>, ) -> opentelemetry::logs::LogResult<(Vec, &'static str)> { use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; let resource_logs = group_logs_by_resource_and_scope(logs, &self.resource); diff --git a/opentelemetry-otlp/src/exporter/tonic/logs.rs b/opentelemetry-otlp/src/exporter/tonic/logs.rs index b529eda511..e6d9661b8d 100644 --- a/opentelemetry-otlp/src/exporter/tonic/logs.rs +++ b/opentelemetry-otlp/src/exporter/tonic/logs.rs @@ -4,12 +4,14 @@ use opentelemetry::logs::{LogError, LogResult}; use opentelemetry_proto::tonic::collector::logs::v1::{ logs_service_client::LogsServiceClient, ExportLogsServiceRequest, }; -use opentelemetry_sdk::export::logs::{LogData, LogExporter}; +use opentelemetry_sdk::export::logs::LogExporter; use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request}; use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope; use super::BoxInterceptor; +use opentelemetry::InstrumentationLibrary; +use opentelemetry_sdk::logs::LogRecord; pub(crate) struct TonicLogsClient { inner: Option, @@ -54,7 +56,7 @@ impl TonicLogsClient { #[async_trait] impl LogExporter for TonicLogsClient { - async fn export<'a>(&mut self, batch: Vec>) -> LogResult<()> { + async fn export(&mut self, batch: Vec<(&LogRecord, &InstrumentationLibrary)>) -> LogResult<()> { let (mut client, metadata, extensions) = match &mut self.inner { Some(inner) => { let (m, e, _) = inner @@ -67,13 +69,7 @@ impl LogExporter for TonicLogsClient { None => return Err(LogError::Other("exporter is already shut down".into())), }; - //TODO: avoid cloning here. - let owned_batch = batch - .into_iter() - .map(|cow_log_data| cow_log_data.into_owned()) // Converts Cow to owned LogData - .collect::>(); - - let resource_logs = group_logs_by_resource_and_scope(owned_batch, &self.resource); + let resource_logs = group_logs_by_resource_and_scope(batch, &self.resource); client .export(Request::from_parts( diff --git a/opentelemetry-otlp/src/logs.rs b/opentelemetry-otlp/src/logs.rs index 3f21697fb0..65b913d11b 100644 --- a/opentelemetry-otlp/src/logs.rs +++ b/opentelemetry-otlp/src/logs.rs @@ -12,9 +12,10 @@ use crate::{NoExporterConfig, OtlpPipeline}; use async_trait::async_trait; use std::fmt::Debug; -use opentelemetry::logs::LogError; +use opentelemetry::logs::{LogError, LogResult}; +use opentelemetry::InstrumentationLibrary; -use opentelemetry_sdk::{export::logs::LogData, runtime::RuntimeChannel, Resource}; +use opentelemetry_sdk::{logs::LogRecord, runtime::RuntimeChannel, Resource}; /// Compression algorithm to use, defaults to none. pub const OTEL_EXPORTER_OTLP_LOGS_COMPRESSION: &str = "OTEL_EXPORTER_OTLP_LOGS_COMPRESSION"; @@ -98,10 +99,7 @@ impl LogExporter { #[async_trait] impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { - async fn export<'a>( - &mut self, - batch: Vec>, - ) -> opentelemetry::logs::LogResult<()> { + async fn export(&mut self, batch: Vec<(&LogRecord, &InstrumentationLibrary)>) -> LogResult<()> { self.client.export(batch).await } diff --git a/opentelemetry-proto/src/transform/logs.rs b/opentelemetry-proto/src/transform/logs.rs index dfd845c5d8..b3b4b895fd 100644 --- a/opentelemetry-proto/src/transform/logs.rs +++ b/opentelemetry-proto/src/transform/logs.rs @@ -54,8 +54,8 @@ pub mod tonic { } } - impl From for LogRecord { - fn from(log_record: opentelemetry_sdk::logs::LogRecord) -> Self { + impl From<&opentelemetry_sdk::logs::LogRecord> for LogRecord { + fn from(log_record: &opentelemetry_sdk::logs::LogRecord) -> Self { let trace_context = log_record.trace_context.as_ref(); let severity_number = match log_record.severity_number { Some(Severity::Trace) => SeverityNumber::Trace, @@ -118,7 +118,7 @@ pub mod tonic { }, severity_number: severity_number.into(), severity_text: log_record.severity_text.map(Into::into).unwrap_or_default(), - body: log_record.body.map(Into::into), + body: log_record.body.clone().map(Into::into), dropped_attributes_count: 0, flags: trace_context .map(|ctx| { @@ -139,17 +139,23 @@ pub mod tonic { impl From<( - opentelemetry_sdk::export::logs::LogData, + ( + &opentelemetry_sdk::logs::LogRecord, + &opentelemetry::InstrumentationLibrary, + ), &ResourceAttributesWithSchema, )> for ResourceLogs { fn from( data: ( - opentelemetry_sdk::export::logs::LogData, + ( + &opentelemetry_sdk::logs::LogRecord, + &opentelemetry::InstrumentationLibrary, + ), &ResourceAttributesWithSchema, ), ) -> Self { - let (log_data, resource) = data; + let ((log_record, instrumentation), resource) = data; ResourceLogs { resource: Some(Resource { @@ -158,21 +164,23 @@ pub mod tonic { }), schema_url: resource.schema_url.clone().unwrap_or_default(), scope_logs: vec![ScopeLogs { - schema_url: log_data - .instrumentation + schema_url: instrumentation .schema_url .clone() .map(Into::into) .unwrap_or_default(), - scope: Some((log_data.instrumentation, log_data.record.target.clone()).into()), - log_records: vec![log_data.record.into()], + scope: Some((instrumentation, log_record.target.clone()).into()), + log_records: vec![log_record.into()], }], } } } pub fn group_logs_by_resource_and_scope( - logs: Vec, + logs: Vec<( + &opentelemetry_sdk::logs::LogRecord, + &opentelemetry::InstrumentationLibrary, + )>, resource: &ResourceAttributesWithSchema, ) -> Vec { // Group logs by target or instrumentation name @@ -180,15 +188,20 @@ pub mod tonic { HashMap::new(), |mut scope_map: HashMap< Cow<'static, str>, - Vec<&opentelemetry_sdk::export::logs::LogData>, + Vec<( + &opentelemetry_sdk::logs::LogRecord, + &opentelemetry::InstrumentationLibrary, + )>, >, - log| { - let key = log - .record + (log_record, instrumentation)| { + let key = log_record .target .clone() - .unwrap_or_else(|| log.instrumentation.name.clone()); - scope_map.entry(key).or_default().push(log); + .unwrap_or_else(|| Cow::Owned(instrumentation.name.clone().into_owned())); + scope_map + .entry(key) + .or_default() + .push((log_record, instrumentation)); scope_map }, ); @@ -197,13 +210,13 @@ pub mod tonic { .into_iter() .map(|(key, log_data)| ScopeLogs { scope: Some(InstrumentationScope::from(( - &log_data.first().unwrap().instrumentation, - Some(key), + log_data.first().unwrap().1, + Some(key.into_owned().into()), ))), schema_url: resource.schema_url.clone().unwrap_or_default(), log_records: log_data .into_iter() - .map(|log_data| log_data.record.clone().into()) + .map(|(log_record, _)| log_record.into()) .collect(), }) .collect(); @@ -223,30 +236,29 @@ pub mod tonic { mod tests { use crate::transform::common::tonic::ResourceAttributesWithSchema; use opentelemetry::logs::LogRecord as _; - use opentelemetry_sdk::export::logs::LogData; + use opentelemetry::InstrumentationLibrary; use opentelemetry_sdk::{logs::LogRecord, Resource}; use std::time::SystemTime; - fn create_test_log_data(instrumentation_name: &str, _message: &str) -> LogData { + fn create_test_log_data( + instrumentation_name: &str, + _message: &str, + ) -> (LogRecord, InstrumentationLibrary) { let mut logrecord = LogRecord::default(); logrecord.set_timestamp(SystemTime::now()); logrecord.set_observed_timestamp(SystemTime::now()); - LogData { - instrumentation: opentelemetry_sdk::InstrumentationLibrary::builder( - instrumentation_name.to_string(), - ) - .build(), - record: logrecord, - } + let instrumentation = + InstrumentationLibrary::builder(instrumentation_name.to_string()).build(); + (logrecord, instrumentation) } #[test] fn test_group_logs_by_resource_and_scope_single_scope() { let resource = Resource::default(); - let log1 = create_test_log_data("test-lib", "Log 1"); - let log2 = create_test_log_data("test-lib", "Log 2"); + let (log_record1, instrum_lib1) = create_test_log_data("test-lib", "Log 1"); + let (log_record2, instrum_lib2) = create_test_log_data("test-lib", "Log 2"); - let logs = vec![log1, log2]; + let logs = vec![(&log_record1, &instrum_lib1), (&log_record2, &instrum_lib2)]; let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema let grouped_logs = @@ -263,10 +275,10 @@ mod tests { #[test] fn test_group_logs_by_resource_and_scope_multiple_scopes() { let resource = Resource::default(); - let log1 = create_test_log_data("lib1", "Log 1"); - let log2 = create_test_log_data("lib2", "Log 2"); + let (log_record1, instrum_lib1) = create_test_log_data("lib1", "Log 1"); + let (log_record2, instrum_lib2) = create_test_log_data("lib2", "Log 2"); - let logs = vec![log1, log2]; + let logs = vec![(&log_record1, &instrum_lib1), (&log_record2, &instrum_lib2)]; let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema let grouped_logs = crate::transform::logs::tonic::group_logs_by_resource_and_scope(logs, &resource); diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index 496cc3543f..283db572c3 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -25,6 +25,18 @@ [#2021](https://github.com/open-telemetry/opentelemetry-rust/pull/2021) - Provide default implementation for `event_enabled` method in `LogProcessor` trait that returns `true` always. +- **Breaking** [#2041](https://github.com/open-telemetry/opentelemetry-rust/pull/2041) + - The Exporter::export() interface is modified as below: + Previous Signature: + ```rust + async fn export<'a>(&mut self, batch: Vec>) -> LogResult<()>; + ``` + + Updated Signature: + ```rust + async fn export(&mut self, batch: Vec<(&LogRecord, &InstrumentationLibrary)>) -> LogResult<()>; + ``` + This change simplifies the processing required by exporters. Exporters no longer need to determine if the LogData is borrowed or owned, as they now work directly with references. As a result, exporters must explicitly create a copy of LogRecord and/or InstrumentationLibrary when needed, as the new interface only provides references to these structures. ## v0.24.1 diff --git a/opentelemetry-sdk/benches/log.rs b/opentelemetry-sdk/benches/log.rs index 71d5fc699f..840560a1f4 100644 --- a/opentelemetry-sdk/benches/log.rs +++ b/opentelemetry-sdk/benches/log.rs @@ -26,7 +26,7 @@ use opentelemetry::logs::{ use opentelemetry::trace::Tracer; use opentelemetry::trace::TracerProvider as _; use opentelemetry::Key; -use opentelemetry_sdk::export::logs::LogData; +use opentelemetry_sdk::logs::LogData; use opentelemetry_sdk::logs::LogProcessor; use opentelemetry_sdk::logs::{Logger, LoggerProvider}; use opentelemetry_sdk::trace; diff --git a/opentelemetry-sdk/benches/log_exporter.rs b/opentelemetry-sdk/benches/log_exporter.rs index 97069db21c..295f554940 100644 --- a/opentelemetry-sdk/benches/log_exporter.rs +++ b/opentelemetry-sdk/benches/log_exporter.rs @@ -18,7 +18,7 @@ use criterion::{criterion_group, criterion_main, Criterion}; use opentelemetry::logs::{LogRecord as _, LogResult, Logger as _, LoggerProvider as _, Severity}; -use opentelemetry_sdk::export::logs::LogData; +use opentelemetry_sdk::logs::LogData; use opentelemetry_sdk::logs::LogProcessor; use opentelemetry_sdk::logs::LoggerProvider; use pprof::criterion::{Output, PProfProfiler}; diff --git a/opentelemetry-sdk/benches/log_processor.rs b/opentelemetry-sdk/benches/log_processor.rs index c75dee65c1..7bf279c219 100644 --- a/opentelemetry-sdk/benches/log_processor.rs +++ b/opentelemetry-sdk/benches/log_processor.rs @@ -20,7 +20,7 @@ use std::{ use criterion::{criterion_group, criterion_main, Criterion}; use opentelemetry::logs::{LogRecord as _, LogResult, Logger as _, LoggerProvider as _, Severity}; use opentelemetry_sdk::{ - export::logs::LogData, + logs::LogData, logs::{LogProcessor, LogRecord, Logger, LoggerProvider}, }; diff --git a/opentelemetry-sdk/src/export/logs/mod.rs b/opentelemetry-sdk/src/export/logs/mod.rs index e1426553a1..afa6df0ee2 100644 --- a/opentelemetry-sdk/src/export/logs/mod.rs +++ b/opentelemetry-sdk/src/export/logs/mod.rs @@ -8,14 +8,13 @@ use opentelemetry::{ logs::{LogError, LogResult}, InstrumentationLibrary, }; -use std::borrow::Cow; use std::fmt::Debug; /// `LogExporter` defines the interface that log exporters should implement. #[async_trait] pub trait LogExporter: Send + Sync + Debug { - /// Exports a batch of [`LogData`]. - async fn export<'a>(&mut self, batch: Vec>) -> LogResult<()>; + /// Exports a batch of [`LogRecord`, `InstrumentationLibrary`]. + async fn export(&mut self, batch: Vec<(&LogRecord, &InstrumentationLibrary)>) -> LogResult<()>; /// Shuts down the exporter. fn shutdown(&mut self) {} #[cfg(feature = "logs_level_enabled")] @@ -28,14 +27,5 @@ pub trait LogExporter: Send + Sync + Debug { fn set_resource(&mut self, _resource: &Resource) {} } -/// `LogData` represents a single log event without resource context. -#[derive(Clone, Debug)] -pub struct LogData { - /// Log record - pub record: LogRecord, - /// Instrumentation details for the emitter who produced this `LogEvent`. - pub instrumentation: InstrumentationLibrary, -} - /// Describes the result of an export. pub type ExportResult = Result<(), LogError>; diff --git a/opentelemetry-sdk/src/logs/log_emitter.rs b/opentelemetry-sdk/src/logs/log_emitter.rs index c9d3e5a828..3af118cc98 100644 --- a/opentelemetry-sdk/src/logs/log_emitter.rs +++ b/opentelemetry-sdk/src/logs/log_emitter.rs @@ -1,9 +1,5 @@ use super::{BatchLogProcessor, LogProcessor, LogRecord, SimpleLogProcessor, TraceContext}; -use crate::{ - export::logs::{LogData, LogExporter}, - runtime::RuntimeChannel, - Resource, -}; +use crate::{export::logs::LogExporter, logs::LogData, runtime::RuntimeChannel, Resource}; use opentelemetry::{ global, logs::{LogError, LogResult}, diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 7366f19791..16e15c5f1c 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -1,5 +1,6 @@ use crate::{ - export::logs::{ExportResult, LogData, LogExporter}, + export::logs::{ExportResult, LogExporter}, + logs::LogData, runtime::{RuntimeChannel, TrySend}, Resource, }; @@ -104,7 +105,9 @@ impl LogProcessor for SimpleLogProcessor { .lock() .map_err(|_| LogError::Other("simple logprocessor mutex poison".into())) .and_then(|mut exporter| { - futures_executor::block_on(exporter.export(vec![Cow::Borrowed(data)])) + futures_executor::block_on( + exporter.export(vec![(&data.record, &data.instrumentation)]), + ) }); if let Err(err) = result { global::handle_error(err); @@ -309,8 +312,13 @@ where if batch.is_empty() { return Ok(()); } + // Convert the Vec<&LogData> to Vec<(&LogRecord, &InstrumentationLibrary)> + let export_batch = batch + .iter() + .map(|log_data| (&log_data.record, &log_data.instrumentation)) + .collect(); - let export = exporter.export(batch); + let export = exporter.export(export_batch); let timeout = runtime.delay(time_out); pin_mut!(export); pin_mut!(timeout); @@ -506,15 +514,17 @@ mod tests { BatchLogProcessor, OTEL_BLRP_EXPORT_TIMEOUT, OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, OTEL_BLRP_MAX_QUEUE_SIZE, OTEL_BLRP_SCHEDULE_DELAY, }; + use crate::export::logs::LogExporter; + use crate::logs::LogRecord; use crate::testing::logs::InMemoryLogsExporterBuilder; use crate::{ - export::logs::{LogData, LogExporter}, logs::{ log_processor::{ OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT, OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT, OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT, OTEL_BLRP_SCHEDULE_DELAY_DEFAULT, }, - BatchConfig, BatchConfigBuilder, LogProcessor, LoggerProvider, SimpleLogProcessor, + BatchConfig, BatchConfigBuilder, LogData, LogProcessor, LoggerProvider, + SimpleLogProcessor, }, runtime, testing::logs::InMemoryLogsExporter, @@ -523,9 +533,9 @@ mod tests { use async_trait::async_trait; use opentelemetry::logs::AnyValue; use opentelemetry::logs::{Logger, LoggerProvider as _}; + use opentelemetry::InstrumentationLibrary; use opentelemetry::Key; use opentelemetry::{logs::LogResult, KeyValue}; - use std::borrow::Cow; use std::sync::{Arc, Mutex}; use std::time::Duration; @@ -536,7 +546,10 @@ mod tests { #[async_trait] impl LogExporter for MockLogExporter { - async fn export<'a>(&mut self, _batch: Vec>) -> LogResult<()> { + async fn export( + &mut self, + _batch: Vec<(&LogRecord, &InstrumentationLibrary)>, + ) -> LogResult<()> { Ok(()) } diff --git a/opentelemetry-sdk/src/logs/mod.rs b/opentelemetry-sdk/src/logs/mod.rs index 5d2e72719b..64e441d026 100644 --- a/opentelemetry-sdk/src/logs/mod.rs +++ b/opentelemetry-sdk/src/logs/mod.rs @@ -11,6 +11,16 @@ pub use log_processor::{ }; pub use record::{LogRecord, TraceContext}; +use opentelemetry::InstrumentationLibrary; +/// `LogData` represents a single log event without resource context. +#[derive(Clone, Debug)] +pub struct LogData { + /// Log record + pub record: LogRecord, + /// Instrumentation details for the emitter who produced this `LogEvent`. + pub instrumentation: InstrumentationLibrary, +} + #[cfg(all(test, feature = "testing"))] mod tests { use super::*; diff --git a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs index 8068fafaec..170139bf9d 100644 --- a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs @@ -1,4 +1,4 @@ -use crate::export::logs::{LogData, LogExporter}; +use crate::export::logs::LogExporter; use crate::logs::LogRecord; use crate::Resource; use async_trait::async_trait; @@ -39,7 +39,7 @@ use std::sync::{Arc, Mutex}; /// #[derive(Clone, Debug)] pub struct InMemoryLogsExporter { - logs: Arc>>, + logs: Arc>>, resource: Arc>, should_reset_on_shutdown: bool, } @@ -50,6 +50,15 @@ impl Default for InMemoryLogsExporter { } } +/// `OwnedLogData` represents a single log event without resource context. +#[derive(Debug, Clone)] +pub struct OwnedLogData { + /// Log record, which can be borrowed or owned. + pub record: LogRecord, + /// Instrumentation details for the emitter who produced this `LogEvent`. + pub instrumentation: InstrumentationLibrary, +} + /// `LogDataWithResource` associates a [`LogRecord`] with a [`Resource`] and /// [`InstrumentationLibrary`]. #[derive(Clone, Debug)] @@ -175,10 +184,14 @@ impl InMemoryLogsExporter { #[async_trait] impl LogExporter for InMemoryLogsExporter { - async fn export<'a>(&mut self, batch: Vec>) -> LogResult<()> { + async fn export(&mut self, batch: Vec<(&LogRecord, &InstrumentationLibrary)>) -> LogResult<()> { let mut logs_guard = self.logs.lock().map_err(LogError::from)?; - for log in batch.into_iter() { - logs_guard.push(log.into_owned()); + for (log_record, instrumentation) in batch.into_iter() { + let owned_log = OwnedLogData { + record: log_record.clone(), + instrumentation: instrumentation.clone(), + }; + logs_guard.push(owned_log); } Ok(()) } diff --git a/opentelemetry-stdout/src/logs/exporter.rs b/opentelemetry-stdout/src/logs/exporter.rs index dacefa3d8b..fd59701c0b 100644 --- a/opentelemetry-stdout/src/logs/exporter.rs +++ b/opentelemetry-stdout/src/logs/exporter.rs @@ -1,12 +1,12 @@ use async_trait::async_trait; use core::fmt; +use opentelemetry::InstrumentationLibrary; use opentelemetry::{ logs::{LogError, LogResult}, ExportError, }; -use opentelemetry_sdk::export::logs::{ExportResult, LogData}; +use opentelemetry_sdk::logs::LogRecord; use opentelemetry_sdk::Resource; -use std::borrow::Cow; use std::io::{stdout, Write}; type Encoder = @@ -45,14 +45,9 @@ impl fmt::Debug for LogExporter { #[async_trait] impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { /// Export spans to stdout - async fn export<'a>(&mut self, batch: Vec>) -> ExportResult { + async fn export(&mut self, batch: Vec<(&LogRecord, &InstrumentationLibrary)>) -> LogResult<()> { if let Some(writer) = &mut self.writer { - // TODO - Avoid cloning logdata if it is borrowed. - let log_data = crate::logs::transform::LogData::from(( - batch.into_iter().map(Cow::into_owned).collect(), - &self.resource, - )); - let result = (self.encoder)(writer, log_data) as LogResult<()>; + let result = (self.encoder)(writer, (batch, &self.resource).into()) as LogResult<()>; result.and_then(|_| writer.write_all(b"\n").map_err(|e| Error(e).into())) } else { Err("exporter is shut down".into()) diff --git a/opentelemetry-stdout/src/logs/transform.rs b/opentelemetry-stdout/src/logs/transform.rs index 0560e0c064..84e864f469 100644 --- a/opentelemetry-stdout/src/logs/transform.rs +++ b/opentelemetry-stdout/src/logs/transform.rs @@ -16,13 +16,19 @@ pub struct LogData { impl From<( - Vec, + Vec<( + &opentelemetry_sdk::logs::LogRecord, + &opentelemetry::InstrumentationLibrary, + )>, &opentelemetry_sdk::Resource, )> for LogData { fn from( (sdk_logs, sdk_resource): ( - Vec, + Vec<( + &opentelemetry_sdk::logs::LogRecord, + &opentelemetry::InstrumentationLibrary, + )>, &opentelemetry_sdk::Resource, ), ) -> Self { @@ -30,8 +36,8 @@ impl for sdk_log in sdk_logs { let resource_schema_url = sdk_resource.schema_url().map(|s| s.to_string().into()); - let schema_url = sdk_log.instrumentation.schema_url.clone(); - let scope: Scope = sdk_log.instrumentation.clone().into(); + let schema_url = sdk_log.1.schema_url.clone(); + let scope: Scope = sdk_log.1.clone().into(); let resource: Resource = sdk_resource.into(); let rl = resource_logs @@ -43,10 +49,10 @@ impl }); match rl.scope_logs.iter_mut().find(|sl| sl.scope == scope) { - Some(sl) => sl.log_records.push(sdk_log.into()), + Some(sl) => sl.log_records.push(sdk_log.0.into()), None => rl.scope_logs.push(ScopeLogs { scope, - log_records: vec![sdk_log.into()], + log_records: vec![sdk_log.0.into()], schema_url, }), } @@ -104,18 +110,17 @@ struct LogRecord { trace_id: Option, } -impl From for LogRecord { - fn from(value: opentelemetry_sdk::export::logs::LogData) -> Self { +impl From<&opentelemetry_sdk::logs::LogRecord> for LogRecord { + fn from(record: &opentelemetry_sdk::logs::LogRecord) -> Self { LogRecord { attributes: { - let attributes = value - .record + let attributes = record .attributes_iter() .map(|(k, v)| KeyValue::from((k.clone(), v.clone()))) // Map each pair to a KeyValue .collect::>(); // Collect into a Vecs #[cfg(feature = "populate-logs-event-name")] - if let Some(event_name) = value.record.event_name { + if let Some(event_name) = record.event_name { let mut attributes_with_name = attributes; attributes_with_name.push(KeyValue::from(( "name".into(), @@ -129,33 +134,24 @@ impl From for LogRecord { #[cfg(not(feature = "populate-logs-event-name"))] attributes }, - trace_id: value - .record + trace_id: record .trace_context .as_ref() .map(|c| c.trace_id.to_string()), - span_id: value - .record + span_id: record.trace_context.as_ref().map(|c| c.span_id.to_string()), + flags: record .trace_context .as_ref() - .map(|c| c.span_id.to_string()), - flags: value - .record - .trace_context .map(|c| c.trace_flags.map(|f| f.to_u8())) .unwrap_or_default(), - time_unix_nano: value.record.timestamp, - time: value.record.timestamp, - observed_time_unix_nano: value.record.observed_timestamp.unwrap(), - observed_time: value.record.observed_timestamp.unwrap(), - severity_number: value - .record - .severity_number - .map(|u| u as u32) - .unwrap_or_default(), + time_unix_nano: record.timestamp, + time: record.timestamp, + observed_time_unix_nano: record.observed_timestamp.unwrap(), + observed_time: record.observed_timestamp.unwrap(), + severity_number: record.severity_number.map(|u| u as u32).unwrap_or_default(), dropped_attributes_count: 0, - severity_text: value.record.severity_text, - body: value.record.body.map(|a| a.into()), + severity_text: record.severity_text, + body: record.body.clone().map(|a| a.into()), } } } diff --git a/stress/src/logs.rs b/stress/src/logs.rs index 6cec97463c..1798401e32 100644 --- a/stress/src/logs.rs +++ b/stress/src/logs.rs @@ -20,7 +20,7 @@ mod throughput; pub struct NoOpLogProcessor; impl LogProcessor for NoOpLogProcessor { - fn emit(&self, _data: &mut opentelemetry_sdk::export::logs::LogData) {} + fn emit(&self, _data: &mut opentelemetry_sdk::logs::LogData) {} fn force_flush(&self) -> opentelemetry::logs::LogResult<()> { Ok(())