From 8f4b0ebbe9cf4012c9cd46c4c258cc8cd60cec6f Mon Sep 17 00:00:00 2001 From: Lalit Date: Sun, 22 Oct 2023 22:47:53 -0700 Subject: [PATCH 1/5] draft --- opentelemetry-otlp/src/exporter/http/logs.rs | 2 ++ opentelemetry-sdk/src/lib.rs | 1 + opentelemetry-sdk/src/logs/log_processor.rs | 5 +-- opentelemetry-sdk/src/suppression.rs | 33 ++++++++++++++++++++ 4 files changed, 39 insertions(+), 2 deletions(-) create mode 100644 opentelemetry-sdk/src/suppression.rs diff --git a/opentelemetry-otlp/src/exporter/http/logs.rs b/opentelemetry-otlp/src/exporter/http/logs.rs index 8f358c977d..15091438b8 100644 --- a/opentelemetry-otlp/src/exporter/http/logs.rs +++ b/opentelemetry-otlp/src/exporter/http/logs.rs @@ -4,12 +4,14 @@ use async_trait::async_trait; use http::{header::CONTENT_TYPE, Method}; use opentelemetry::logs::{LogError, LogResult}; use opentelemetry_sdk::export::logs::{LogData, LogExporter}; +use opentelemetry_sdk::suppression::Supression; use super::OtlpHttpClient; #[async_trait] impl LogExporter for OtlpHttpClient { async fn export(&mut self, batch: Vec) -> LogResult<()> { + let _guard = Suppression::new(); let client = self .client .lock() diff --git a/opentelemetry-sdk/src/lib.rs b/opentelemetry-sdk/src/lib.rs index b021ea5bcd..9ad70ba0fe 100644 --- a/opentelemetry-sdk/src/lib.rs +++ b/opentelemetry-sdk/src/lib.rs @@ -125,6 +125,7 @@ pub mod metrics; pub mod propagation; pub mod resource; pub mod runtime; +pub mod suppression; #[cfg(any(feature = "testing", test))] #[doc(hidden)] pub mod testing; diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 6bac467a5b..6e3ffd97d4 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -1,6 +1,7 @@ use crate::{ export::logs::{ExportResult, LogData, LogExporter}, runtime::{RuntimeChannel, TrySend}, + suppression::Suppression, }; use futures_channel::oneshot; use futures_util::{ @@ -103,7 +104,7 @@ impl LogProcessor for SimpleLogProcessor { #[cfg(feature = "logs_level_enabled")] fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool { - true + !Suppression::is_logging_suppressed() } } @@ -132,7 +133,7 @@ impl> LogProcessor for BatchLogProcessor { #[cfg(feature = "logs_level_enabled")] fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool { - true + !Suppression::is_logging_suppressed() } fn force_flush(&self) -> LogResult<()> { diff --git a/opentelemetry-sdk/src/suppression.rs b/opentelemetry-sdk/src/suppression.rs new file mode 100644 index 0000000000..f88134110f --- /dev/null +++ b/opentelemetry-sdk/src/suppression.rs @@ -0,0 +1,33 @@ + +use opentelemetry::Context; +use std::any::{Any, TypeId}; +use std::sync::Arc; + +#[derive(Debug, PartialEq, Clone, Copy)] +struct SuppressionKey(bool); // true means logging is suppressed + +pub struct Suppression; + +impl Suppression { + pub fn new() -> Self { + // Suppress logging when a new Suppression instance is created + let mut new_context = Context::current(); + new_context.entries.insert(TypeId::of::(), Arc::new(SuppressionKey(true))); + new_context.attach(); + + Suppression + } + + pub fn is_logging_suppressed() -> bool { + Context::current().get::().cloned().unwrap_or(SuppressionKey(false)).0 + } +} + +impl Drop for Suppression { + fn drop(&mut self) { + // Resume logging when the Suppression instance is dropped + let mut new_context = Context::current(); + new_context.entries.insert(TypeId::of::(), Arc::new(SuppressionKey(false))); + new_context.attach(); + } +} \ No newline at end of file From bba8c03385cd347830a9acd01f2415298a7d0694 Mon Sep 17 00:00:00 2001 From: Lalit Date: Mon, 23 Oct 2023 22:57:21 -0700 Subject: [PATCH 2/5] use task local --- opentelemetry-otlp/src/exporter/http/logs.rs | 9 ++- opentelemetry-sdk/src/lib.rs | 1 + opentelemetry-sdk/src/logs/log_processor.rs | 7 ++- opentelemetry-sdk/src/suppression.rs | 64 ++++++++++---------- 4 files changed, 42 insertions(+), 39 deletions(-) diff --git a/opentelemetry-otlp/src/exporter/http/logs.rs b/opentelemetry-otlp/src/exporter/http/logs.rs index 15091438b8..720335f581 100644 --- a/opentelemetry-otlp/src/exporter/http/logs.rs +++ b/opentelemetry-otlp/src/exporter/http/logs.rs @@ -3,15 +3,18 @@ use std::sync::Arc; use async_trait::async_trait; use http::{header::CONTENT_TYPE, Method}; use opentelemetry::logs::{LogError, LogResult}; -use opentelemetry_sdk::export::logs::{LogData, LogExporter}; -use opentelemetry_sdk::suppression::Supression; +use opentelemetry_sdk::{ + export::logs::{LogData, LogExporter}, + suppression::SuppressionGuard, +}; +//use opentelemetry_sdk::suppression::Supression; use super::OtlpHttpClient; #[async_trait] impl LogExporter for OtlpHttpClient { async fn export(&mut self, batch: Vec) -> LogResult<()> { - let _guard = Suppression::new(); + let _guard = SuppressionGuard::new(); let client = self .client .lock() diff --git a/opentelemetry-sdk/src/lib.rs b/opentelemetry-sdk/src/lib.rs index 9ad70ba0fe..e57421a763 100644 --- a/opentelemetry-sdk/src/lib.rs +++ b/opentelemetry-sdk/src/lib.rs @@ -125,6 +125,7 @@ pub mod metrics; pub mod propagation; pub mod resource; pub mod runtime; +/// doc pub mod suppression; #[cfg(any(feature = "testing", test))] #[doc(hidden)] diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 6e3ffd97d4..ff130f34cc 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -1,7 +1,7 @@ use crate::{ export::logs::{ExportResult, LogData, LogExporter}, runtime::{RuntimeChannel, TrySend}, - suppression::Suppression, + suppression::SuppressionGuard, }; use futures_channel::oneshot; use futures_util::{ @@ -104,7 +104,7 @@ impl LogProcessor for SimpleLogProcessor { #[cfg(feature = "logs_level_enabled")] fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool { - !Suppression::is_logging_suppressed() + !SuppressionGuard::is_logging_suppressed() } } @@ -133,7 +133,8 @@ impl> LogProcessor for BatchLogProcessor { #[cfg(feature = "logs_level_enabled")] fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool { - !Suppression::is_logging_suppressed() + let suppress = SuppressionGuard::is_logging_suppressed(); + !suppress } fn force_flush(&self) -> LogResult<()> { diff --git a/opentelemetry-sdk/src/suppression.rs b/opentelemetry-sdk/src/suppression.rs index f88134110f..ff54678b35 100644 --- a/opentelemetry-sdk/src/suppression.rs +++ b/opentelemetry-sdk/src/suppression.rs @@ -1,33 +1,31 @@ - -use opentelemetry::Context; -use std::any::{Any, TypeId}; -use std::sync::Arc; - -#[derive(Debug, PartialEq, Clone, Copy)] -struct SuppressionKey(bool); // true means logging is suppressed - -pub struct Suppression; - -impl Suppression { - pub fn new() -> Self { - // Suppress logging when a new Suppression instance is created - let mut new_context = Context::current(); - new_context.entries.insert(TypeId::of::(), Arc::new(SuppressionKey(true))); - new_context.attach(); - - Suppression - } - - pub fn is_logging_suppressed() -> bool { - Context::current().get::().cloned().unwrap_or(SuppressionKey(false)).0 - } -} - -impl Drop for Suppression { - fn drop(&mut self) { - // Resume logging when the Suppression instance is dropped - let mut new_context = Context::current(); - new_context.entries.insert(TypeId::of::(), Arc::new(SuppressionKey(false))); - new_context.attach(); - } -} \ No newline at end of file +use tokio::task_local; + +// Define the async local storage for suppression. +task_local! { + static SUPPRESSION_FLAG: bool; +} + +/// Represents a scope within which logging is suppressed. +/// Logging is suppressed for the duration of the guard's lifetime. +#[derive(Debug)] +pub struct SuppressionGuard(bool); // Capture the original state + +impl SuppressionGuard { + /// doc1 + pub fn new() -> Self { + let original_state = SUPPRESSION_FLAG.try_with(|&flag| flag).unwrap_or(false); + SUPPRESSION_FLAG.scope(true, async {}); + SuppressionGuard(original_state) + } + + /// doc2 + pub fn is_logging_suppressed() -> bool { + SUPPRESSION_FLAG.try_with(|&flag| flag).unwrap_or(false) + } +} + +impl Drop for SuppressionGuard { + fn drop(&mut self) { + SUPPRESSION_FLAG.scope(self.0, async {}); // Restore the original state + } +} From f98bf85a9d7c920cdb93175366c7e8abdc581488 Mon Sep 17 00:00:00 2001 From: Lalit Date: Mon, 23 Oct 2023 22:58:42 -0700 Subject: [PATCH 3/5] fix --- opentelemetry-sdk/src/logs/log_processor.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index ff130f34cc..6288b9684b 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -133,8 +133,7 @@ impl> LogProcessor for BatchLogProcessor { #[cfg(feature = "logs_level_enabled")] fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool { - let suppress = SuppressionGuard::is_logging_suppressed(); - !suppress + !SuppressionGuard::is_logging_suppressed() } fn force_flush(&self) -> LogResult<()> { From a65650cdb3815e9f579e474ceb4a17bbc22a0f5d Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Mon, 30 Oct 2023 14:42:56 -0700 Subject: [PATCH 4/5] review comment --- .../examples/basic-otlp-http/Cargo.toml | 4 +- opentelemetry-otlp/src/exporter/http/logs.rs | 46 ++++++----- opentelemetry-sdk/Cargo.toml | 1 + opentelemetry-sdk/src/logs/log_processor.rs | 6 +- opentelemetry-sdk/src/suppression.rs | 81 ++++++++++++++----- 5 files changed, 92 insertions(+), 46 deletions(-) diff --git a/opentelemetry-otlp/examples/basic-otlp-http/Cargo.toml b/opentelemetry-otlp/examples/basic-otlp-http/Cargo.toml index 50f84549b3..785656ace2 100644 --- a/opentelemetry-otlp/examples/basic-otlp-http/Cargo.toml +++ b/opentelemetry-otlp/examples/basic-otlp-http/Cargo.toml @@ -8,9 +8,9 @@ publish = false [dependencies] once_cell = "1.17" opentelemetry = { path = "../../../opentelemetry" } -opentelemetry_sdk = { path = "../../../opentelemetry-sdk", features = ["rt-tokio", "metrics", "logs"] } +opentelemetry_sdk = { path = "../../../opentelemetry-sdk", features = ["rt-tokio", "metrics", "logs", "logs_level_enabled"] } opentelemetry-otlp = { path = "../..", features = ["http-proto", "reqwest-client", "logs"] } -opentelemetry-appender-tracing = { path = "../../../opentelemetry-appender-tracing", default-features = false} +opentelemetry-appender-tracing = { path = "../../../opentelemetry-appender-tracing"} opentelemetry-semantic-conventions = { path = "../../../opentelemetry-semantic-conventions" } tokio = { version = "1.0", features = ["full"] } diff --git a/opentelemetry-otlp/src/exporter/http/logs.rs b/opentelemetry-otlp/src/exporter/http/logs.rs index 720335f581..2bea9668e6 100644 --- a/opentelemetry-otlp/src/exporter/http/logs.rs +++ b/opentelemetry-otlp/src/exporter/http/logs.rs @@ -5,7 +5,7 @@ use http::{header::CONTENT_TYPE, Method}; use opentelemetry::logs::{LogError, LogResult}; use opentelemetry_sdk::{ export::logs::{LogData, LogExporter}, - suppression::SuppressionGuard, + suppression::with_suppression, }; //use opentelemetry_sdk::suppression::Supression; @@ -14,31 +14,33 @@ use super::OtlpHttpClient; #[async_trait] impl LogExporter for OtlpHttpClient { async fn export(&mut self, batch: Vec) -> LogResult<()> { - let _guard = SuppressionGuard::new(); - let client = self - .client - .lock() - .map_err(|e| LogError::Other(e.to_string().into())) - .and_then(|g| match &*g { - Some(client) => Ok(Arc::clone(client)), - _ => Err(LogError::Other("exporter is already shut down".into())), - })?; + with_suppression(async { + let client = self + .client + .lock() + .map_err(|e| LogError::Other(e.to_string().into())) + .and_then(|g| match &*g { + Some(client) => Ok(Arc::clone(client)), + _ => Err(LogError::Other("exporter is already shut down".into())), + })?; - let (body, content_type) = build_body(batch)?; - let mut request = http::Request::builder() - .method(Method::POST) - .uri(&self.collector_endpoint) - .header(CONTENT_TYPE, content_type) - .body(body) - .map_err(|e| crate::Error::RequestFailed(Box::new(e)))?; + let (body, content_type) = build_body(batch)?; + let mut request = http::Request::builder() + .method(Method::POST) + .uri(&self.collector_endpoint) + .header(CONTENT_TYPE, content_type) + .body(body) + .map_err(|e| crate::Error::RequestFailed(Box::new(e)))?; - for (k, v) in &self.headers { - request.headers_mut().insert(k.clone(), v.clone()); - } + for (k, v) in &self.headers { + request.headers_mut().insert(k.clone(), v.clone()); + } - client.send(request).await?; + client.send(request).await?; - Ok(()) + Ok(()) + }) + .await } fn shutdown(&mut self) { diff --git a/opentelemetry-sdk/Cargo.toml b/opentelemetry-sdk/Cargo.toml index e8bf9375ee..d82d53bdee 100644 --- a/opentelemetry-sdk/Cargo.toml +++ b/opentelemetry-sdk/Cargo.toml @@ -30,6 +30,7 @@ url = { version = "2.2", optional = true } tokio = { version = "1.0", default-features = false, features = ["rt", "time"], optional = true } tokio-stream = { version = "0.1.1", optional = true } http = { version = "0.2", optional = true } +pin-project-lite = { version = "0.2" } [package.metadata.docs.rs] all-features = true diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 6288b9684b..7f5d62b6bb 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -1,7 +1,7 @@ use crate::{ export::logs::{ExportResult, LogData, LogExporter}, runtime::{RuntimeChannel, TrySend}, - suppression::SuppressionGuard, + suppression::is_suppressed, }; use futures_channel::oneshot; use futures_util::{ @@ -104,7 +104,7 @@ impl LogProcessor for SimpleLogProcessor { #[cfg(feature = "logs_level_enabled")] fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool { - !SuppressionGuard::is_logging_suppressed() + !is_suppressed() } } @@ -133,7 +133,7 @@ impl> LogProcessor for BatchLogProcessor { #[cfg(feature = "logs_level_enabled")] fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool { - !SuppressionGuard::is_logging_suppressed() + !is_suppressed() } fn force_flush(&self) -> LogResult<()> { diff --git a/opentelemetry-sdk/src/suppression.rs b/opentelemetry-sdk/src/suppression.rs index ff54678b35..90e796b6a5 100644 --- a/opentelemetry-sdk/src/suppression.rs +++ b/opentelemetry-sdk/src/suppression.rs @@ -1,31 +1,74 @@ -use tokio::task_local; +use pin_project_lite::pin_project; +use std::cell::RefCell; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context as TaskContext, Poll}; +use std::thread; +use tokio::task; -// Define the async local storage for suppression. -task_local! { - static SUPPRESSION_FLAG: bool; +thread_local! { + static SUPPRESSION_ENABLED: RefCell = RefCell::new(false) } -/// Represents a scope within which logging is suppressed. -/// Logging is suppressed for the duration of the guard's lifetime. -#[derive(Debug)] -pub struct SuppressionGuard(bool); // Capture the original state +#[derive(Clone, Copy, Debug)] +pub struct SuppressionContext; -impl SuppressionGuard { - /// doc1 - pub fn new() -> Self { - let original_state = SUPPRESSION_FLAG.try_with(|&flag| flag).unwrap_or(false); - SUPPRESSION_FLAG.scope(true, async {}); - SuppressionGuard(original_state) +impl SuppressionContext { + pub fn current() -> Self { + SuppressionContext } - /// doc2 - pub fn is_logging_suppressed() -> bool { - SUPPRESSION_FLAG.try_with(|&flag| flag).unwrap_or(false) + pub fn attach(self) -> Guard { + let was_suppressed = SUPPRESSION_ENABLED.with(|suppressed| { + let was_suppressed = *suppressed.borrow(); + *suppressed.borrow_mut() = true; + was_suppressed + }); + Guard { was_suppressed } } + + pub fn is_suppressed() -> bool { + let is_suppressed = SUPPRESSION_ENABLED.with(|suppressed| *suppressed.borrow()); + is_suppressed + } +} + +pub struct Guard { + was_suppressed: bool, } -impl Drop for SuppressionGuard { +impl Drop for Guard { fn drop(&mut self) { - SUPPRESSION_FLAG.scope(self.0, async {}); // Restore the original state + SUPPRESSION_ENABLED.with(|suppressed| *suppressed.borrow_mut() = self.was_suppressed); + } +} + +pin_project! { + #[derive(Clone, Debug)] + pub struct WithSuppContext { + #[pin] + inner: T, + supp_cx: SuppressionContext, + } +} + +impl Future for WithSuppContext { + type Output = T::Output; + + fn poll(self: Pin<&mut Self>, task_cx: &mut TaskContext<'_>) -> Poll { + let this = self.project(); + let _guard = this.supp_cx.attach(); + this.inner.poll(task_cx) + } +} + +pub fn with_suppression(inner: T) -> WithSuppContext { + WithSuppContext { + inner, + supp_cx: SuppressionContext::current(), } } + +pub fn is_suppressed() -> bool { + SuppressionContext::is_suppressed() +} From bc9da9b8d30fabb1fcb7a4514a884fd9e0a57b0e Mon Sep 17 00:00:00 2001 From: Lalit Date: Mon, 30 Oct 2023 23:26:09 -0700 Subject: [PATCH 5/5] use task-local storage --- opentelemetry-sdk/src/suppression.rs | 56 +++++++++++++++++++++------- 1 file changed, 43 insertions(+), 13 deletions(-) diff --git a/opentelemetry-sdk/src/suppression.rs b/opentelemetry-sdk/src/suppression.rs index 90e796b6a5..f783d8ebcc 100644 --- a/opentelemetry-sdk/src/suppression.rs +++ b/opentelemetry-sdk/src/suppression.rs @@ -3,47 +3,62 @@ use std::cell::RefCell; use std::future::Future; use std::pin::Pin; use std::task::{Context as TaskContext, Poll}; -use std::thread; -use tokio::task; +use tokio::task_local; -thread_local! { - static SUPPRESSION_ENABLED: RefCell = RefCell::new(false) +task_local! { + static SUPPRESSION_ENABLED: RefCell } #[derive(Clone, Copy, Debug)] +///suppression context pub struct SuppressionContext; impl SuppressionContext { + ///current pub fn current() -> Self { SuppressionContext } + ///attach pub fn attach(self) -> Guard { - let was_suppressed = SUPPRESSION_ENABLED.with(|suppressed| { - let was_suppressed = *suppressed.borrow(); - *suppressed.borrow_mut() = true; - was_suppressed - }); + let was_suppressed = SUPPRESSION_ENABLED + .try_with(|suppressed| { + let was_suppressed = *suppressed.borrow(); + *suppressed.borrow_mut() = true; + was_suppressed + }) + .unwrap_or_else(|_| { + SUPPRESSION_ENABLED.with(|suppressed| { + *suppressed.borrow_mut() = true; + false // was_suppressed is false since it was not previously initialized + }) + }); Guard { was_suppressed } } - + ///is_suppressed pub fn is_suppressed() -> bool { - let is_suppressed = SUPPRESSION_ENABLED.with(|suppressed| *suppressed.borrow()); - is_suppressed + SUPPRESSION_ENABLED + .try_with(|suppressed| *suppressed.borrow()) + .unwrap_or(false) } } +/// Guard +#[derive(Debug)] pub struct Guard { was_suppressed: bool, } impl Drop for Guard { fn drop(&mut self) { - SUPPRESSION_ENABLED.with(|suppressed| *suppressed.borrow_mut() = self.was_suppressed); + let _ = SUPPRESSION_ENABLED.try_with(|suppressed| { + *suppressed.borrow_mut() = self.was_suppressed; + }); } } pin_project! { + /// WithSuppContext #[derive(Clone, Debug)] pub struct WithSuppContext { #[pin] @@ -62,6 +77,20 @@ impl Future for WithSuppContext { } } +/// with_init_suppression +pub async fn with_init_suppression(func: F) +where + F: FnOnce() -> Fut, + Fut: Future, +{ + SUPPRESSION_ENABLED + .scope(RefCell::new(true), async { + func().await; + }) + .await; +} + +/// with_suppression pub fn with_suppression(inner: T) -> WithSuppContext { WithSuppContext { inner, @@ -69,6 +98,7 @@ pub fn with_suppression(inner: T) -> WithSuppContext { } } +/// is_suppressed pub fn is_suppressed() -> bool { SuppressionContext::is_suppressed() }