diff --git a/examples/metrics-advanced/src/main.rs b/examples/metrics-advanced/src/main.rs index ed1572ade6..d6b2b11e57 100644 --- a/examples/metrics-advanced/src/main.rs +++ b/examples/metrics-advanced/src/main.rs @@ -4,7 +4,7 @@ use opentelemetry::KeyValue; use opentelemetry_sdk::metrics::{ Aggregation, Instrument, PeriodicReader, SdkMeterProvider, Stream, Temporality, }; -use opentelemetry_sdk::{runtime, Resource}; +use opentelemetry_sdk::Resource; use std::error::Error; fn init_meter_provider() -> opentelemetry_sdk::metrics::SdkMeterProvider { @@ -49,7 +49,7 @@ fn init_meter_provider() -> opentelemetry_sdk::metrics::SdkMeterProvider { .with_temporality(Temporality::Delta) .build(); - let reader = PeriodicReader::builder(exporter, runtime::Tokio).build(); + let reader = PeriodicReader::builder(exporter).build(); let provider = SdkMeterProvider::builder() .with_reader(reader) .with_resource(Resource::new([KeyValue::new( diff --git a/examples/metrics-basic/src/main.rs b/examples/metrics-basic/src/main.rs index 084472dc28..d661bd601d 100644 --- a/examples/metrics-basic/src/main.rs +++ b/examples/metrics-basic/src/main.rs @@ -1,6 +1,6 @@ use opentelemetry::{global, KeyValue}; use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider}; -use opentelemetry_sdk::{runtime, Resource}; +use opentelemetry_sdk::Resource; use std::error::Error; use std::vec; @@ -9,7 +9,7 @@ fn init_meter_provider() -> opentelemetry_sdk::metrics::SdkMeterProvider { // Build exporter using Delta Temporality (Defaults to Temporality::Cumulative) // .with_temporality(opentelemetry_sdk::metrics::Temporality::Delta) .build(); - let reader = PeriodicReader::builder(exporter, runtime::Tokio).build(); + let reader = PeriodicReader::builder(exporter).build(); let provider = SdkMeterProvider::builder() .with_reader(reader) .with_resource(Resource::new([KeyValue::new( diff --git a/opentelemetry-otlp/examples/basic-otlp-http/Cargo.toml b/opentelemetry-otlp/examples/basic-otlp-http/Cargo.toml index 8e640ee1c9..7e855f3047 100644 --- a/opentelemetry-otlp/examples/basic-otlp-http/Cargo.toml +++ b/opentelemetry-otlp/examples/basic-otlp-http/Cargo.toml @@ -9,12 +9,13 @@ publish = false default = ["reqwest"] reqwest = ["opentelemetry-otlp/reqwest-client"] hyper = ["opentelemetry-otlp/hyper-client"] +experimental_metrics_periodicreader_with_async_runtime = ["opentelemetry_sdk/experimental_metrics_periodicreader_with_async_runtime"] [dependencies] once_cell = { workspace = true } opentelemetry = { path = "../../../opentelemetry" } -opentelemetry_sdk = { path = "../../../opentelemetry-sdk", features = ["rt-tokio", "metrics", "logs"]} +opentelemetry_sdk = { path = "../../../opentelemetry-sdk", features = ["rt-tokio", "metrics", "logs", "experimental_metrics_periodicreader_with_async_runtime"]} opentelemetry-http = { path = "../../../opentelemetry-http", optional = true, default-features = false} opentelemetry-otlp = { path = "../..", features = ["http-proto", "http-json", "logs"] , default-features = false} opentelemetry-appender-tracing = { path = "../../../opentelemetry-appender-tracing", default-features = false} diff --git a/opentelemetry-otlp/examples/basic-otlp-http/src/main.rs b/opentelemetry-otlp/examples/basic-otlp-http/src/main.rs index 3514c643f1..c01bb05df4 100644 --- a/opentelemetry-otlp/examples/basic-otlp-http/src/main.rs +++ b/opentelemetry-otlp/examples/basic-otlp-http/src/main.rs @@ -10,7 +10,7 @@ use opentelemetry_otlp::WithExportConfig; use opentelemetry_otlp::{LogExporter, MetricExporter, Protocol, SpanExporter}; use opentelemetry_sdk::{ logs::LoggerProvider, - metrics::{MetricError, PeriodicReader, SdkMeterProvider}, + metrics::{MetricError, SdkMeterProvider}, runtime, trace::{self as sdktrace, TracerProvider}, }; @@ -63,8 +63,19 @@ fn init_metrics() -> Result Result { fn init_metrics() -> Result { let exporter = MetricExporter::builder().with_tonic().build()?; - - let reader = PeriodicReader::builder(exporter, runtime::Tokio).build(); + let reader = PeriodicReader::builder(exporter).build(); Ok(SdkMeterProvider::builder() .with_reader(reader) diff --git a/opentelemetry-otlp/src/lib.rs b/opentelemetry-otlp/src/lib.rs index 42201f2672..1222f9e2ce 100644 --- a/opentelemetry-otlp/src/lib.rs +++ b/opentelemetry-otlp/src/lib.rs @@ -179,7 +179,7 @@ //! .build() //! .unwrap(); //! -//! let reader = opentelemetry_sdk::metrics::PeriodicReader::builder(exporter, opentelemetry_sdk::runtime::Tokio) +//! let reader = opentelemetry_sdk::metrics::PeriodicReader::builder(exporter) //! .with_interval(std::time::Duration::from_secs(3)) //! .with_timeout(Duration::from_secs(10)) //! .build(); diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index 7ceb9c0188..95714301aa 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -28,6 +28,52 @@ Migration Guidance: - These methods were intended for log appenders. Keep the clone of the provider handle, instead of depending on above methods. +- *Breaking* - `PeriodicReader` Updates + + `PeriodicReader` no longer requires an async runtime by default. Instead, it + now creates its own background thread for execution. This change allows + metrics to be used in environments without async runtimes. + + For users who prefer the previous behavior of relying on a specific + `Runtime`, they can do so by enabling the feature flag + **`experimental_metrics_periodicreader_with_async_runtime`**. + + Migration Guide: + + 1. *Default Implementation, requires no async runtime* (**Recommended**) The + new default implementation does not require a runtime argument. Replace the + builder method accordingly: + - *Before:* + ```rust + let reader = opentelemetry_sdk::metrics::PeriodicReader::builder(exporter, runtime::Tokio).build(); + ``` + - *After:* + ```rust + let reader = opentelemetry_sdk::metrics::PeriodicReader::builder(exporter).build(); + ``` + + 2. *Async Runtime Support* + If your application cannot spin up new threads or you prefer using async + runtimes, enable the + "experimental_metrics_periodicreader_with_async_runtime" feature flag and + adjust code as below. + + - *Before:* + ```rust + let reader = opentelemetry_sdk::metrics::PeriodicReader::builder(exporter, runtime::Tokio).build(); + ``` + + - *After:* + ```rust + let reader = opentelemetry_sdk::metrics::periodic_reader_with_async_runtime::PeriodicReader::builder(exporter, runtime::Tokio).build(); + ``` + + *Requirements:* + - Enable the feature flag: + `experimental_metrics_periodicreader_with_async_runtime`. + - Continue enabling one of the async runtime feature flags: `rt-tokio`, + `rt-tokio-current-thread`, or `rt-async-std`. + ## 0.27.1 Released 2024-Nov-27 diff --git a/opentelemetry-sdk/Cargo.toml b/opentelemetry-sdk/Cargo.toml index eb5a4862c2..a2a2cf180c 100644 --- a/opentelemetry-sdk/Cargo.toml +++ b/opentelemetry-sdk/Cargo.toml @@ -54,6 +54,7 @@ rt-tokio-current-thread = ["tokio", "tokio-stream"] rt-async-std = ["async-std"] internal-logs = ["tracing"] experimental_metrics_periodic_reader_no_runtime = ["metrics"] +experimental_metrics_periodicreader_with_async_runtime = ["metrics"] spec_unstable_metrics_views = ["metrics"] [[bench]] diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs index a83cd92fcb..f52f54edd2 100644 --- a/opentelemetry-sdk/src/metrics/mod.rs +++ b/opentelemetry-sdk/src/metrics/mod.rs @@ -50,8 +50,9 @@ pub(crate) mod meter; mod meter_provider; pub(crate) mod noop; pub(crate) mod periodic_reader; -#[cfg(feature = "experimental_metrics_periodic_reader_no_runtime")] -pub(crate) mod periodic_reader_with_own_thread; +#[cfg(feature = "experimental_metrics_periodicreader_with_async_runtime")] +/// Module for periodic reader with async runtime. +pub mod periodic_reader_with_async_runtime; pub(crate) mod pipeline; pub mod reader; pub(crate) mod view; @@ -61,8 +62,6 @@ pub use error::{MetricError, MetricResult}; pub use manual_reader::*; pub use meter_provider::*; pub use periodic_reader::*; -#[cfg(feature = "experimental_metrics_periodic_reader_no_runtime")] -pub use periodic_reader_with_own_thread::*; pub use pipeline::Pipeline; pub use instrument::InstrumentKind; @@ -107,8 +106,8 @@ mod tests { use self::data::{HistogramDataPoint, ScopeMetrics, SumDataPoint}; use super::*; use crate::metrics::data::ResourceMetrics; + use crate::testing::metrics::InMemoryMetricExporter; use crate::testing::metrics::InMemoryMetricExporterBuilder; - use crate::{runtime, testing::metrics::InMemoryMetricExporter}; use data::GaugeDataPoint; use opentelemetry::metrics::{Counter, Meter, UpDownCounter}; use opentelemetry::InstrumentationScope; @@ -513,7 +512,7 @@ mod tests { } let exporter = InMemoryMetricExporter::default(); - let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build(); + let reader = PeriodicReader::builder(exporter.clone()).build(); let meter_provider = SdkMeterProvider::builder().with_reader(reader).build(); // Test Meter creation in 2 ways, both with empty string as meter name @@ -529,7 +528,7 @@ mod tests { async fn counter_duplicate_instrument_merge() { // Arrange let exporter = InMemoryMetricExporter::default(); - let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build(); + let reader = PeriodicReader::builder(exporter.clone()).build(); let meter_provider = SdkMeterProvider::builder().with_reader(reader).build(); // Act @@ -580,7 +579,7 @@ mod tests { async fn counter_duplicate_instrument_different_meter_no_merge() { // Arrange let exporter = InMemoryMetricExporter::default(); - let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build(); + let reader = PeriodicReader::builder(exporter.clone()).build(); let meter_provider = SdkMeterProvider::builder().with_reader(reader).build(); // Act @@ -669,7 +668,7 @@ mod tests { async fn instrumentation_scope_identity_test() { // Arrange let exporter = InMemoryMetricExporter::default(); - let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build(); + let reader = PeriodicReader::builder(exporter.clone()).build(); let meter_provider = SdkMeterProvider::builder().with_reader(reader).build(); // Act @@ -753,7 +752,7 @@ mod tests { // Arrange let exporter = InMemoryMetricExporter::default(); - let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build(); + let reader = PeriodicReader::builder(exporter.clone()).build(); let criteria = Instrument::new().name("test_histogram"); let stream_invalid_aggregation = Stream::new() .aggregation(Aggregation::ExplicitBucketHistogram { @@ -803,7 +802,7 @@ mod tests { // Arrange let exporter = InMemoryMetricExporter::default(); - let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build(); + let reader = PeriodicReader::builder(exporter.clone()).build(); let criteria = Instrument::new().name("my_observable_counter"); // View drops all attributes. let stream_invalid_aggregation = Stream::new().allowed_attribute_keys(vec![]); @@ -878,7 +877,7 @@ mod tests { // Arrange let exporter = InMemoryMetricExporter::default(); - let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build(); + let reader = PeriodicReader::builder(exporter.clone()).build(); let criteria = Instrument::new().name("my_counter"); // View drops all attributes. let stream_invalid_aggregation = Stream::new().allowed_attribute_keys(vec![]); @@ -2433,7 +2432,7 @@ mod tests { let exporter = InMemoryMetricExporterBuilder::new().with_temporality(temporality); let exporter = exporter.build(); - let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build(); + let reader = PeriodicReader::builder(exporter.clone()).build(); let meter_provider = SdkMeterProvider::builder().with_reader(reader).build(); TestContext { diff --git a/opentelemetry-sdk/src/metrics/periodic_reader.rs b/opentelemetry-sdk/src/metrics/periodic_reader.rs index f8dffdcbd0..9e46d55fad 100644 --- a/opentelemetry-sdk/src/metrics/periodic_reader.rs +++ b/opentelemetry-sdk/src/metrics/periodic_reader.rs @@ -1,25 +1,24 @@ use std::{ - env, fmt, mem, - sync::{Arc, Mutex, Weak}, - time::Duration, + env, fmt, + sync::{ + atomic::AtomicBool, + mpsc::{self, Receiver, Sender}, + Arc, Mutex, Weak, + }, + thread, + time::{Duration, Instant}, }; -use futures_channel::{mpsc, oneshot}; -use futures_util::{ - future::{self, Either}, - pin_mut, - stream::{self, FusedStream}, - StreamExt, -}; -use opentelemetry::{otel_debug, otel_error}; +use opentelemetry::{otel_debug, otel_error, otel_info, otel_warn}; -use crate::runtime::Runtime; use crate::{ metrics::{exporter::PushMetricExporter, reader::SdkProducer, MetricError, MetricResult}, Resource, }; -use super::{data::ResourceMetrics, reader::MetricReader, InstrumentKind, Pipeline}; +use super::{ + data::ResourceMetrics, instrument::InstrumentKind, reader::MetricReader, Pipeline, Temporality, +}; const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30); const DEFAULT_INTERVAL: Duration = Duration::from_secs(60); @@ -33,8 +32,9 @@ const METRIC_EXPORT_TIMEOUT_NAME: &str = "OTEL_METRIC_EXPORT_TIMEOUT"; /// to the exporter at a defined interval. /// /// By default, the returned [MetricReader] will collect and export data every -/// 60 seconds, and will cancel export attempts that exceed 30 seconds. The -/// export time is not counted towards the interval between attempts. +/// 60 seconds. The export time is not counted towards the interval between +/// attempts. PeriodicReader itself does not enforce timeout. Instead timeout +/// is passed on to the exporter for each export attempt. /// /// The [collect] method of the returned [MetricReader] continues to gather and /// return metric data to the user. It will not automatically send that data to @@ -42,19 +42,17 @@ const METRIC_EXPORT_TIMEOUT_NAME: &str = "OTEL_METRIC_EXPORT_TIMEOUT"; /// /// [collect]: MetricReader::collect #[derive(Debug)] -pub struct PeriodicReaderBuilder { +pub struct PeriodicReaderBuilder { interval: Duration, timeout: Duration, exporter: E, - runtime: RT, } -impl PeriodicReaderBuilder +impl PeriodicReaderBuilder where E: PushMetricExporter, - RT: Runtime, { - fn new(exporter: E, runtime: RT) -> Self { + fn new(exporter: E) -> Self { let interval = env::var(METRIC_EXPORT_INTERVAL_NAME) .ok() .and_then(|v| v.parse().map(Duration::from_millis).ok()) @@ -68,7 +66,6 @@ where interval, timeout, exporter, - runtime, } } @@ -86,8 +83,9 @@ where self } - /// Configures the time a [PeriodicReader] waits for an export to complete - /// before canceling it. + /// Configures the timeout for an export to complete. PeriodicReader itself + /// does not enforce timeout. Instead timeout is passed on to the exporter + /// for each export attempt. /// /// This option overrides any value set for the `OTEL_METRIC_EXPORT_TIMEOUT` /// environment variable. @@ -103,100 +101,189 @@ where /// Create a [PeriodicReader] with the given config. pub fn build(self) -> PeriodicReader { - let (message_sender, message_receiver) = mpsc::channel(256); - - let worker = move |reader: &PeriodicReader| { - let runtime = self.runtime.clone(); - let reader = reader.clone(); - self.runtime.spawn(Box::pin(async move { - let ticker = runtime - .interval(self.interval) - .skip(1) // The ticker is fired immediately, so we should skip the first one to align with the interval. - .map(|_| Message::Export); - let messages = Box::pin(stream::select(message_receiver, ticker)); - PeriodicReaderWorker { - reader, - timeout: self.timeout, - runtime, - rm: ResourceMetrics { - resource: Resource::empty(), - scope_metrics: Vec::new(), - }, - } - .run(messages) - .await - })); - }; - - otel_debug!( - name: "PeriodicReader.BuildCompleted", - message = "Periodic reader built.", - interval_in_secs = self.interval.as_secs(), - temporality = format!("{:?}", self.exporter.temporality()), - ); - - PeriodicReader { - exporter: Arc::new(self.exporter), - inner: Arc::new(Mutex::new(PeriodicReaderInner { - message_sender, - is_shutdown: false, - sdk_producer_or_worker: ProducerOrWorker::Worker(Box::new(worker)), - })), - } + PeriodicReader::new(self.exporter, self.interval, self.timeout) } } /// A [MetricReader] that continuously collects and exports metric data at a set /// interval. /// -/// By default it will collect and export data every 60 seconds, and will cancel -/// export attempts that exceed 30 seconds. The export time is not counted -/// towards the interval between attempts. +/// By default, PeriodicReader will collect and export data every +/// 60 seconds. The export time is not counted towards the interval between +/// attempts. PeriodicReader itself does not enforce timeout. +/// Instead timeout is passed on to the exporter for each export attempt. /// /// The [collect] method of the returned continues to gather and /// return metric data to the user. It will not automatically send that data to /// the exporter outside of the predefined interval. /// -/// The [runtime] can be selected based on feature flags set for this crate. -/// -/// The exporter can be any exporter that implements [PushMetricExporter] such -/// as [opentelemetry-otlp]. +/// As this spuns up own background thread, this is recommended to be used with push exporters +/// that do not require any particular async runtime. As of now, this cannot be used with +/// OTLP exporters as they requires async runtime /// /// [collect]: MetricReader::collect -/// [runtime]: crate::runtime -/// [opentelemetry-otlp]: https://docs.rs/opentelemetry-otlp/latest/opentelemetry_otlp/ /// /// # Example /// /// ```no_run /// use opentelemetry_sdk::metrics::PeriodicReader; -/// # fn example(get_exporter: impl Fn() -> E, get_runtime: impl Fn() -> R) +/// # fn example(get_exporter: impl Fn() -> E) /// # where /// # E: opentelemetry_sdk::metrics::exporter::PushMetricExporter, -/// # R: opentelemetry_sdk::runtime::Runtime, /// # { /// -/// let exporter = get_exporter(); // set up a push exporter like OTLP -/// let runtime = get_runtime(); // select runtime: e.g. opentelemetry_sdk:runtime::Tokio +/// let exporter = get_exporter(); // set up a push exporter /// -/// let reader = PeriodicReader::builder(exporter, runtime).build(); +/// let reader = PeriodicReader::builder(exporter).build(); /// # drop(reader); /// # } /// ``` #[derive(Clone)] pub struct PeriodicReader { - exporter: Arc, - inner: Arc>, + inner: Arc, } impl PeriodicReader { - /// Configuration options for a periodic reader - pub fn builder(exporter: E, runtime: RT) -> PeriodicReaderBuilder + /// Configuration options for a periodic reader with own thread + pub fn builder(exporter: E) -> PeriodicReaderBuilder where E: PushMetricExporter, - RT: Runtime, { - PeriodicReaderBuilder::new(exporter, runtime) + PeriodicReaderBuilder::new(exporter) + } + + fn new(exporter: E, interval: Duration, timeout: Duration) -> Self + where + E: PushMetricExporter, + { + let (message_sender, message_receiver): (Sender, Receiver) = + mpsc::channel(); + let reader = PeriodicReader { + inner: Arc::new(PeriodicReaderInner { + message_sender: Arc::new(Mutex::new(message_sender)), + is_shutdown: AtomicBool::new(false), + producer: Mutex::new(None), + exporter: Arc::new(exporter), + }), + }; + let cloned_reader = reader.clone(); + + let result_thread_creation = thread::Builder::new() + .name("OpenTelemetry.Metrics.PeriodicReader".to_string()) + .spawn(move || { + let mut interval_start = Instant::now(); + let mut remaining_interval = interval; + otel_info!( + name: "PeriodReaderThreadStarted", + interval = interval.as_secs(), + timeout = timeout.as_secs() + ); + loop { + otel_debug!( + name: "PeriodReaderThreadLoopAlive", message = "Next export will happen after interval, unless flush or shutdown is triggered.", interval = remaining_interval.as_millis() + ); + match message_receiver.recv_timeout(remaining_interval) { + Ok(Message::Flush(response_sender)) => { + otel_debug!( + name: "PeriodReaderThreadExportingDueToFlush" + ); + if let Err(_e) = cloned_reader.collect_and_export(timeout) { + response_sender.send(false).unwrap(); + } else { + response_sender.send(true).unwrap(); + } + + // Adjust the remaining interval after the flush + let elapsed = interval_start.elapsed(); + if elapsed < interval { + remaining_interval = interval - elapsed; + otel_debug!( + name: "PeriodReaderThreadAdjustingRemainingIntervalAfterFlush", + remaining_interval = remaining_interval.as_secs() + ); + } else { + otel_debug!( + name: "PeriodReaderThreadAdjustingExportAfterFlush", + ); + // Reset the interval if the flush finishes after the expected export time + // effectively missing the normal export. + // Should we attempt to do the missed export immediately? + // Or do the next export at the next interval? + // Currently this attempts the next export immediately. + // i.e calling Flush can affect the regularity. + interval_start = Instant::now(); + remaining_interval = Duration::ZERO; + } + } + Ok(Message::Shutdown(response_sender)) => { + // Perform final export and break out of loop and exit the thread + otel_debug!(name: "PeriodReaderThreadExportingDueToShutdown"); + if let Err(_e) = cloned_reader.collect_and_export(timeout) { + response_sender.send(false).unwrap(); + } else { + response_sender.send(true).unwrap(); + } + break; + } + Err(mpsc::RecvTimeoutError::Timeout) => { + let export_start = Instant::now(); + otel_debug!( + name: "PeriodReaderThreadExportingDueToTimer", + event_name = "PeriodReaderThreadExportingDueToTimer" + ); + + if let Err(_e) = cloned_reader.collect_and_export(timeout) { + otel_debug!( + name: "PeriodReaderThreadExportingDueToTimerFailed", + event_name = "PeriodReaderThreadExportingDueToTimerFailed" + ); + } + + let time_taken_for_export = export_start.elapsed(); + if time_taken_for_export > interval { + otel_debug!( + name: "PeriodReaderThreadExportTookLongerThanInterval", + event_name = "PeriodReaderThreadExportTookLongerThanInterval" + ); + // if export took longer than interval, do the + // next export immediately. + // Alternatively, we could skip the next export + // and wait for the next interval. + // Or enforce that export timeout is less than interval. + // What is the desired behavior? + interval_start = Instant::now(); + remaining_interval = Duration::ZERO; + } else { + remaining_interval = interval - time_taken_for_export; + interval_start = Instant::now(); + } + } + Err(_) => { + // Some other error. Break out and exit the thread. + break; + } + } + } + otel_info!( + name: "PeriodReaderThreadStopped", + event_name = "PeriodReaderThreadStopped" + ); + }); + + // TODO: Should we fail-fast here and bubble up the error to user? + #[allow(unused_variables)] + if let Err(e) = result_thread_creation { + otel_error!( + name: "PeriodReaderThreadStartError", + event_name = "PeriodReaderThreadStartError", + error = format!("{:?}", e) + ); + } + reader + } + + fn collect_and_export(&self, timeout: Duration) -> MetricResult<()> { + self.inner.collect_and_export(timeout) } } @@ -207,195 +294,180 @@ impl fmt::Debug for PeriodicReader { } struct PeriodicReaderInner { - message_sender: mpsc::Sender, - is_shutdown: bool, - sdk_producer_or_worker: ProducerOrWorker, + exporter: Arc, + message_sender: Arc>>, + producer: Mutex>>, + is_shutdown: AtomicBool, } -#[derive(Debug)] -enum Message { - Export, - Flush(oneshot::Sender>), - Shutdown(oneshot::Sender>), -} +impl PeriodicReaderInner { + fn register_pipeline(&self, producer: Weak) { + let mut inner = self.producer.lock().expect("lock poisoned"); + *inner = Some(producer); + } -enum ProducerOrWorker { - Producer(Weak), - Worker(Box), -} + fn temporality(&self, _kind: InstrumentKind) -> Temporality { + self.exporter.temporality() + } -struct PeriodicReaderWorker { - reader: PeriodicReader, - timeout: Duration, - runtime: RT, - rm: ResourceMetrics, -} + fn collect(&self, rm: &mut ResourceMetrics) -> MetricResult<()> { + if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) { + return Err(MetricError::Other("reader is shut down".into())); + } + + let producer = self.producer.lock().expect("lock poisoned"); + if let Some(p) = producer.as_ref() { + p.upgrade() + .ok_or_else(|| MetricError::Other("pipeline is dropped".into()))? + .produce(rm)?; + Ok(()) + } else { + Err(MetricError::Other("pipeline is not registered".into())) + } + } + + fn collect_and_export(&self, _timeout: Duration) -> MetricResult<()> { + // TODO: Reuse the internal vectors. Or refactor to avoid needing any + // owned data structures to be passed to exporters. + let mut rm = ResourceMetrics { + resource: Resource::empty(), + scope_metrics: Vec::new(), + }; -impl PeriodicReaderWorker { - async fn collect_and_export(&mut self) -> MetricResult<()> { - self.reader.collect(&mut self.rm)?; - if self.rm.scope_metrics.is_empty() { - otel_debug!( - name: "PeriodicReaderWorker.NoMetricsToExport", + let collect_result = self.collect(&mut rm); + #[allow(clippy::question_mark)] + if let Err(e) = collect_result { + otel_warn!( + name: "PeriodReaderCollectError", + event_name = "PeriodReaderCollectError", + error = format!("{:?}", e) ); - // No metrics to export. + return Err(e); + } + + if rm.scope_metrics.is_empty() { + otel_debug!(name: "NoMetricsCollected"); return Ok(()); } - otel_debug!( - name: "PeriodicReaderWorker.InvokeExporter", - message = "Calling exporter's export method with collected metrics.", - count = self.rm.scope_metrics.len(), - ); - let export = self.reader.exporter.export(&mut self.rm); - let timeout = self.runtime.delay(self.timeout); - pin_mut!(export); - pin_mut!(timeout); - - match future::select(export, timeout).await { - Either::Left((res, _)) => { - res // return the status of export. - } - Either::Right(_) => Err(MetricError::Other("export timed out".into())), + // TODO: substract the time taken for collect from the timeout. collect + // involves observable callbacks too, which are user defined and can + // take arbitrary time. + // + // Relying on futures executor to execute async call. + // TODO: Add timeout and pass it to exporter or consider alternative + // design to enforce timeout here. + let exporter_result = futures_executor::block_on(self.exporter.export(&mut rm)); + #[allow(clippy::question_mark)] + if let Err(e) = exporter_result { + otel_warn!( + name: "PeriodReaderExportError", + event_name = "PeriodReaderExportError", + error = format!("{:?}", e) + ); + return Err(e); } + + Ok(()) } - async fn process_message(&mut self, message: Message) -> bool { - match message { - Message::Export => { - otel_debug!( - name: "PeriodicReader.ExportTriggered", - message = "Export message received.", - ); - if let Err(err) = self.collect_and_export().await { - otel_error!( - name: "PeriodicReader.ExportFailed", - message = "Failed to export metrics", - reason = format!("{}", err)); - } + fn force_flush(&self) -> MetricResult<()> { + if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) { + return Err(MetricError::Other("reader is shut down".into())); + } + let (response_tx, response_rx) = mpsc::channel(); + match self.message_sender.lock() { + Ok(sender) => { + sender + .send(Message::Flush(response_tx)) + .map_err(|e| MetricError::Other(e.to_string()))?; } - Message::Flush(ch) => { + Err(e) => { otel_debug!( - name: "PeriodicReader.ForceFlushCalled", - message = "Flush message received.", + name: "PeriodReaderForceFlushError", + event_name = "PeriodReaderForceFlushError", + error = format!("{:?}", e) ); - let res = self.collect_and_export().await; - if let Err(send_error) = ch.send(res) { - otel_debug!( - name: "PeriodicReader.Flush.SendResultError", - message = "Failed to send flush result.", - reason = format!("{:?}", send_error), - ); - } + return Err(MetricError::Other(e.to_string())); + } + } + + if let Ok(response) = response_rx.recv() { + // TODO: call exporter's force_flush method. + if response { + Ok(()) + } else { + Err(MetricError::Other("Failed to flush".into())) } - Message::Shutdown(ch) => { + } else { + Err(MetricError::Other("Failed to flush".into())) + } + } + + fn shutdown(&self) -> MetricResult<()> { + if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) { + return Err(MetricError::Other("Reader is already shut down".into())); + } + + // TODO: See if this is better to be created upfront. + let (response_tx, response_rx) = mpsc::channel(); + match self.message_sender.lock() { + Ok(sender) => { + sender + .send(Message::Shutdown(response_tx)) + .map_err(|e| MetricError::Other(e.to_string()))?; + } + Err(e) => { otel_debug!( - name: "PeriodicReader.ShutdownCalled", - message = "Shutdown message received", + name: "PeriodReaderShutdownError", + event_name = "PeriodReaderShutdownError", + error = format!("{:?}", e) ); - let res = self.collect_and_export().await; - let _ = self.reader.exporter.shutdown(); - if let Err(send_error) = ch.send(res) { - otel_debug!( - name: "PeriodicReader.Shutdown.SendResultError", - message = "Failed to send shutdown result", - reason = format!("{:?}", send_error), - ); - } - return false; + return Err(MetricError::Other(e.to_string())); } } - true - } - - async fn run(mut self, mut messages: impl FusedStream + Unpin) { - while let Some(message) = messages.next().await { - if !self.process_message(message).await { - break; + if let Ok(response) = response_rx.recv() { + self.is_shutdown + .store(true, std::sync::atomic::Ordering::Relaxed); + if response { + Ok(()) + } else { + Err(MetricError::Other("Failed to shutdown".into())) } + } else { + self.is_shutdown + .store(true, std::sync::atomic::Ordering::Relaxed); + Err(MetricError::Other("Failed to shutdown".into())) } } } +#[derive(Debug)] +enum Message { + Flush(Sender), + Shutdown(Sender), +} + impl MetricReader for PeriodicReader { fn register_pipeline(&self, pipeline: Weak) { - let mut inner = match self.inner.lock() { - Ok(guard) => guard, - Err(_) => return, - }; - - let worker = match &mut inner.sdk_producer_or_worker { - ProducerOrWorker::Producer(_) => { - // Only register once. If producer is already set, do nothing. - otel_debug!(name: "PeriodicReader.DuplicateRegistration", - message = "duplicate registration found, did not register periodic reader."); - return; - } - ProducerOrWorker::Worker(w) => mem::replace(w, Box::new(|_| {})), - }; - - inner.sdk_producer_or_worker = ProducerOrWorker::Producer(pipeline); - worker(self); + self.inner.register_pipeline(pipeline); } fn collect(&self, rm: &mut ResourceMetrics) -> MetricResult<()> { - let inner = self.inner.lock()?; - if inner.is_shutdown { - return Err(MetricError::Other("reader is shut down".into())); - } - - if let Some(producer) = match &inner.sdk_producer_or_worker { - ProducerOrWorker::Producer(sdk_producer) => sdk_producer.upgrade(), - ProducerOrWorker::Worker(_) => None, - } { - producer.produce(rm)?; - } else { - return Err(MetricError::Other("reader is not registered".into())); - } - - Ok(()) + self.inner.collect(rm) } fn force_flush(&self) -> MetricResult<()> { - let mut inner = self.inner.lock()?; - if inner.is_shutdown { - return Err(MetricError::Other("reader is shut down".into())); - } - let (sender, receiver) = oneshot::channel(); - inner - .message_sender - .try_send(Message::Flush(sender)) - .map_err(|e| MetricError::Other(e.to_string()))?; - - drop(inner); // don't hold lock when blocking on future - - futures_executor::block_on(receiver) - .map_err(|err| MetricError::Other(err.to_string())) - .and_then(|res| res) + self.inner.force_flush() } + // TODO: Offer an async version of shutdown so users can await the shutdown + // completion, and avoid blocking the thread. The default shutdown on drop + // can still use blocking call. If user already explicitly called shutdown, + // drop won't call shutdown again. fn shutdown(&self) -> MetricResult<()> { - let mut inner = self.inner.lock()?; - if inner.is_shutdown { - return Err(MetricError::Other("reader is already shut down".into())); - } - - let (sender, receiver) = oneshot::channel(); - inner - .message_sender - .try_send(Message::Shutdown(sender)) - .map_err(|e| MetricError::Other(e.to_string()))?; - drop(inner); // don't hold lock when blocking on future - - let shutdown_result = futures_executor::block_on(receiver) - .map_err(|err| MetricError::Other(err.to_string()))?; - - // Acquire the lock again to set the shutdown flag - let mut inner = self.inner.lock()?; - inner.is_shutdown = true; - - shutdown_result + self.inner.shutdown() } /// To construct a [MetricReader][metric-reader] when setting up an SDK, @@ -405,104 +477,322 @@ impl MetricReader for PeriodicReader { /// If not configured, the Cumulative temporality SHOULD be used. /// /// [metric-reader]: https://github.com/open-telemetry/opentelemetry-specification/blob/0a78571045ca1dca48621c9648ec3c832c3c541c/specification/metrics/sdk.md#metricreader - fn temporality(&self, kind: InstrumentKind) -> super::Temporality { - kind.temporality_preference(self.exporter.temporality()) + fn temporality(&self, kind: InstrumentKind) -> Temporality { + kind.temporality_preference(self.inner.temporality(kind)) } } #[cfg(all(test, feature = "testing"))] mod tests { use super::PeriodicReader; - use crate::metrics::reader::MetricReader; - use crate::metrics::MetricError; use crate::{ - metrics::data::ResourceMetrics, metrics::SdkMeterProvider, runtime, - testing::metrics::InMemoryMetricExporter, Resource, + metrics::{ + data::ResourceMetrics, exporter::PushMetricExporter, reader::MetricReader, MetricError, + MetricResult, SdkMeterProvider, Temporality, + }, + testing::metrics::InMemoryMetricExporter, + Resource, }; + use async_trait::async_trait; use opentelemetry::metrics::MeterProvider; - use std::sync::mpsc; + use std::{ + sync::{ + atomic::{AtomicUsize, Ordering}, + mpsc, Arc, + }, + time::Duration, + }; - #[test] - fn collection_triggered_by_interval_tokio_current() { - collection_triggered_by_interval_helper(runtime::TokioCurrentThread); + // use below command to run all tests + // cargo test metrics::periodic_reader_with_own_thread::tests --features=testing,experimental_metrics_periodic_reader_no_runtime -- --nocapture + + #[derive(Debug, Clone)] + struct MetricExporterThatFailsOnlyOnFirst { + count: Arc, } - #[tokio::test(flavor = "multi_thread", worker_threads = 1)] - async fn collection_triggered_by_interval_from_tokio_multi_one_thread_on_runtime_tokio() { - collection_triggered_by_interval_helper(runtime::Tokio); + impl Default for MetricExporterThatFailsOnlyOnFirst { + fn default() -> Self { + MetricExporterThatFailsOnlyOnFirst { + count: Arc::new(AtomicUsize::new(0)), + } + } } - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] - async fn collection_triggered_by_interval_from_tokio_multi_two_thread_on_runtime_tokio() { - collection_triggered_by_interval_helper(runtime::Tokio); + impl MetricExporterThatFailsOnlyOnFirst { + fn get_count(&self) -> usize { + self.count.load(Ordering::Relaxed) + } } - #[tokio::test(flavor = "multi_thread", worker_threads = 1)] - async fn collection_triggered_by_interval_from_tokio_multi_one_thread_on_runtime_tokio_current() - { - collection_triggered_by_interval_helper(runtime::TokioCurrentThread); + #[async_trait] + impl PushMetricExporter for MetricExporterThatFailsOnlyOnFirst { + async fn export(&self, _metrics: &mut ResourceMetrics) -> MetricResult<()> { + if self.count.fetch_add(1, Ordering::Relaxed) == 0 { + Err(MetricError::Other("export failed".into())) + } else { + Ok(()) + } + } + + async fn force_flush(&self) -> MetricResult<()> { + Ok(()) + } + + fn shutdown(&self) -> MetricResult<()> { + Ok(()) + } + + fn temporality(&self) -> Temporality { + Temporality::Cumulative + } } - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] - async fn collection_triggered_by_interval_from_tokio_multi_two_thread_on_runtime_tokio_current() - { - collection_triggered_by_interval_helper(runtime::TokioCurrentThread); + #[test] + fn collection_triggered_by_interval_multiple() { + // Arrange + let interval = std::time::Duration::from_millis(1); + let exporter = InMemoryMetricExporter::default(); + let reader = PeriodicReader::builder(exporter.clone()) + .with_interval(interval) + .build(); + let i = Arc::new(AtomicUsize::new(0)); + let i_clone = i.clone(); + + // Act + let meter_provider = SdkMeterProvider::builder().with_reader(reader).build(); + let meter = meter_provider.meter("test"); + let _counter = meter + .u64_observable_counter("testcounter") + .with_callback(move |_| { + i_clone.fetch_add(1, Ordering::Relaxed); + }) + .build(); + + // Sleep for a duration 5X (plus liberal buffer to account for potential + // CI slowness) the interval to ensure multiple collection. + // Not a fan of such tests, but this seems to be the only way to test + // if periodic reader is doing its job. + // TODO: Decide if this should be ignored in CI + std::thread::sleep(interval * 5 * 20); + + // Assert + assert!(i.load(Ordering::Relaxed) >= 5); } - #[tokio::test(flavor = "current_thread")] - #[ignore = "See issue https://github.com/open-telemetry/opentelemetry-rust/issues/2056"] - async fn collection_triggered_by_interval_from_tokio_current_on_runtime_tokio() { - collection_triggered_by_interval_helper(runtime::Tokio); + #[test] + fn shutdown_repeat() { + // Arrange + let interval = std::time::Duration::from_millis(1); + let exporter = InMemoryMetricExporter::default(); + let reader = PeriodicReader::builder(exporter.clone()) + .with_interval(interval) + .build(); + + let meter_provider = SdkMeterProvider::builder().with_reader(reader).build(); + let result = meter_provider.shutdown(); + assert!(result.is_ok()); + + // calling shutdown again should return Err + let result = meter_provider.shutdown(); + assert!(result.is_err()); + + // calling shutdown again should return Err + let result = meter_provider.shutdown(); + assert!(result.is_err()); } - #[tokio::test(flavor = "current_thread")] - async fn collection_triggered_by_interval_from_tokio_current_on_runtime_tokio_current() { - collection_triggered_by_interval_helper(runtime::TokioCurrentThread); + #[test] + fn flush_after_shutdown() { + // Arrange + let interval = std::time::Duration::from_millis(1); + let exporter = InMemoryMetricExporter::default(); + let reader = PeriodicReader::builder(exporter.clone()) + .with_interval(interval) + .build(); + + let meter_provider = SdkMeterProvider::builder().with_reader(reader).build(); + let result = meter_provider.force_flush(); + assert!(result.is_ok()); + + let result = meter_provider.shutdown(); + assert!(result.is_ok()); + + // calling force_flush after shutdown should return Err + let result = meter_provider.force_flush(); + assert!(result.is_err()); } #[test] - fn unregistered_collect() { + fn flush_repeat() { // Arrange + let interval = std::time::Duration::from_millis(1); let exporter = InMemoryMetricExporter::default(); - let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build(); - let mut rm = ResourceMetrics { + let reader = PeriodicReader::builder(exporter.clone()) + .with_interval(interval) + .build(); + + let meter_provider = SdkMeterProvider::builder().with_reader(reader).build(); + let result = meter_provider.force_flush(); + assert!(result.is_ok()); + + // calling force_flush again should return Ok + let result = meter_provider.force_flush(); + assert!(result.is_ok()); + } + + #[test] + fn periodic_reader_without_pipeline() { + // Arrange + let interval = std::time::Duration::from_millis(1); + let exporter = InMemoryMetricExporter::default(); + let reader = PeriodicReader::builder(exporter.clone()) + .with_interval(interval) + .build(); + + let rm = &mut ResourceMetrics { resource: Resource::empty(), scope_metrics: Vec::new(), }; + // Pipeline is not registered, so collect should return an error + let result = reader.collect(rm); + assert!(result.is_err()); + + // Pipeline is not registered, so flush should return an error + let result = reader.force_flush(); + assert!(result.is_err()); + + // Adding reader to meter provider should register the pipeline + // TODO: This part might benefit from a different design. + let meter_provider = SdkMeterProvider::builder() + .with_reader(reader.clone()) + .build(); - // Act - let result = reader.collect(&mut rm); + // Now collect and flush should succeed + let result = reader.collect(rm); + assert!(result.is_ok()); - // Assert - assert!( - matches!(result.unwrap_err(), MetricError::Other(err) if err == "reader is not registered") - ); + let result = meter_provider.force_flush(); + assert!(result.is_ok()); } - fn collection_triggered_by_interval_helper(runtime: RT) - where - RT: crate::runtime::Runtime, - { - let interval = std::time::Duration::from_millis(1); + #[test] + fn exporter_failures_are_handled() { + // create a mock exporter that fails 1st time and succeeds 2nd time + // Validate using this exporter that periodic reader can handle exporter failure + // and continue to export metrics. + // Arrange + let interval = std::time::Duration::from_millis(10); + let exporter = MetricExporterThatFailsOnlyOnFirst::default(); + let reader = PeriodicReader::builder(exporter.clone()) + .with_interval(interval) + .build(); + + let meter_provider = SdkMeterProvider::builder().with_reader(reader).build(); + let meter = meter_provider.meter("test"); + let counter = meter.u64_counter("sync_counter").build(); + counter.add(1, &[]); + let _obs_counter = meter + .u64_observable_counter("testcounter") + .with_callback(move |observer| { + observer.observe(1, &[]); + }) + .build(); + + // Sleep for a duration much longer than the interval to trigger + // multiple exports, including failures. + // Not a fan of such tests, but this seems to be the + // only way to test if periodic reader is doing its job. TODO: Decide if + // this should be ignored in CI + std::thread::sleep(Duration::from_millis(500)); + + // Assert that atleast 2 exports are attempted given the 1st one fails. + assert!(exporter.get_count() >= 2); + } + + #[test] + fn collection() { + collection_triggered_by_interval_helper(); + collection_triggered_by_flush_helper(); + collection_triggered_by_shutdown_helper(); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn collection_from_tokio_multi_with_one_worker() { + collection_triggered_by_interval_helper(); + collection_triggered_by_flush_helper(); + collection_triggered_by_shutdown_helper(); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn collection_from_tokio_with_two_worker() { + collection_triggered_by_interval_helper(); + collection_triggered_by_flush_helper(); + collection_triggered_by_shutdown_helper(); + } + + #[tokio::test(flavor = "current_thread")] + async fn collection_from_tokio_current() { + collection_triggered_by_interval_helper(); + collection_triggered_by_flush_helper(); + collection_triggered_by_shutdown_helper(); + } + + fn collection_triggered_by_interval_helper() { + collection_helper(|_| { + // Sleep for a duration longer than the interval to ensure at least one collection + // Not a fan of such tests, but this seems to be the only way to test + // if periodic reader is doing its job. + // TODO: Decide if this should be ignored in CI + std::thread::sleep(Duration::from_millis(500)); + }); + } + + fn collection_triggered_by_flush_helper() { + collection_helper(|meter_provider| { + meter_provider.force_flush().expect("flush should succeed"); + }); + } + + fn collection_triggered_by_shutdown_helper() { + collection_helper(|meter_provider| { + meter_provider.shutdown().expect("shutdown should succeed"); + }); + } + + fn collection_helper(trigger: fn(&SdkMeterProvider)) { + // Arrange + let interval = std::time::Duration::from_millis(10); let exporter = InMemoryMetricExporter::default(); - let reader = PeriodicReader::builder(exporter.clone(), runtime) + let reader = PeriodicReader::builder(exporter.clone()) .with_interval(interval) .build(); let (sender, receiver) = mpsc::channel(); - // Act let meter_provider = SdkMeterProvider::builder().with_reader(reader).build(); let meter = meter_provider.meter("test"); let _counter = meter .u64_observable_counter("testcounter") - .with_callback(move |_| { + .with_callback(move |observer| { + observer.observe(1, &[]); sender.send(()).expect("channel should still be open"); }) .build(); + // Act + trigger(&meter_provider); + // Assert receiver - .recv() - .expect("message should be available in channel, indicating a collection occurred"); + .recv_timeout(Duration::ZERO) + .expect("message should be available in channel, indicating a collection occurred, which should trigger observable callback"); + + let exported_metrics = exporter + .get_finished_metrics() + .expect("this should not fail"); + assert!( + !exported_metrics.is_empty(), + "Metrics should be available in exporter." + ); } } diff --git a/opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs b/opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs new file mode 100644 index 0000000000..33558b579b --- /dev/null +++ b/opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs @@ -0,0 +1,508 @@ +use std::{ + env, fmt, mem, + sync::{Arc, Mutex, Weak}, + time::Duration, +}; + +use futures_channel::{mpsc, oneshot}; +use futures_util::{ + future::{self, Either}, + pin_mut, + stream::{self, FusedStream}, + StreamExt, +}; +use opentelemetry::{otel_debug, otel_error}; + +use crate::runtime::Runtime; +use crate::{ + metrics::{exporter::PushMetricExporter, reader::SdkProducer, MetricError, MetricResult}, + Resource, +}; + +use super::{data::ResourceMetrics, reader::MetricReader, InstrumentKind, Pipeline}; + +const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30); +const DEFAULT_INTERVAL: Duration = Duration::from_secs(60); + +const METRIC_EXPORT_INTERVAL_NAME: &str = "OTEL_METRIC_EXPORT_INTERVAL"; +const METRIC_EXPORT_TIMEOUT_NAME: &str = "OTEL_METRIC_EXPORT_TIMEOUT"; + +/// Configuration options for [PeriodicReader]. +/// +/// A periodic reader is a [MetricReader] that collects and exports metric data +/// to the exporter at a defined interval. +/// +/// By default, the returned [MetricReader] will collect and export data every +/// 60 seconds, and will cancel export attempts that exceed 30 seconds. The +/// export time is not counted towards the interval between attempts. +/// +/// The [collect] method of the returned [MetricReader] continues to gather and +/// return metric data to the user. It will not automatically send that data to +/// the exporter outside of the predefined interval. +/// +/// [collect]: MetricReader::collect +#[derive(Debug)] +pub struct PeriodicReaderBuilder { + interval: Duration, + timeout: Duration, + exporter: E, + runtime: RT, +} + +impl PeriodicReaderBuilder +where + E: PushMetricExporter, + RT: Runtime, +{ + fn new(exporter: E, runtime: RT) -> Self { + let interval = env::var(METRIC_EXPORT_INTERVAL_NAME) + .ok() + .and_then(|v| v.parse().map(Duration::from_millis).ok()) + .unwrap_or(DEFAULT_INTERVAL); + let timeout = env::var(METRIC_EXPORT_TIMEOUT_NAME) + .ok() + .and_then(|v| v.parse().map(Duration::from_millis).ok()) + .unwrap_or(DEFAULT_TIMEOUT); + + PeriodicReaderBuilder { + interval, + timeout, + exporter, + runtime, + } + } + + /// Configures the intervening time between exports for a [PeriodicReader]. + /// + /// This option overrides any value set for the `OTEL_METRIC_EXPORT_INTERVAL` + /// environment variable. + /// + /// If this option is not used or `interval` is equal to zero, 60 seconds is + /// used as the default. + pub fn with_interval(mut self, interval: Duration) -> Self { + if !interval.is_zero() { + self.interval = interval; + } + self + } + + /// Configures the time a [PeriodicReader] waits for an export to complete + /// before canceling it. + /// + /// This option overrides any value set for the `OTEL_METRIC_EXPORT_TIMEOUT` + /// environment variable. + /// + /// If this option is not used or `timeout` is equal to zero, 30 seconds is used + /// as the default. + pub fn with_timeout(mut self, timeout: Duration) -> Self { + if !timeout.is_zero() { + self.timeout = timeout; + } + self + } + + /// Create a [PeriodicReader] with the given config. + pub fn build(self) -> PeriodicReader { + let (message_sender, message_receiver) = mpsc::channel(256); + + let worker = move |reader: &PeriodicReader| { + let runtime = self.runtime.clone(); + let reader = reader.clone(); + self.runtime.spawn(Box::pin(async move { + let ticker = runtime + .interval(self.interval) + .skip(1) // The ticker is fired immediately, so we should skip the first one to align with the interval. + .map(|_| Message::Export); + let messages = Box::pin(stream::select(message_receiver, ticker)); + PeriodicReaderWorker { + reader, + timeout: self.timeout, + runtime, + rm: ResourceMetrics { + resource: Resource::empty(), + scope_metrics: Vec::new(), + }, + } + .run(messages) + .await + })); + }; + + otel_debug!( + name: "PeriodicReader.BuildCompleted", + message = "Periodic reader built.", + interval_in_secs = self.interval.as_secs(), + temporality = format!("{:?}", self.exporter.temporality()), + ); + + PeriodicReader { + exporter: Arc::new(self.exporter), + inner: Arc::new(Mutex::new(PeriodicReaderInner { + message_sender, + is_shutdown: false, + sdk_producer_or_worker: ProducerOrWorker::Worker(Box::new(worker)), + })), + } + } +} + +/// A [MetricReader] that continuously collects and exports metric data at a set +/// interval. +/// +/// By default it will collect and export data every 60 seconds, and will cancel +/// export attempts that exceed 30 seconds. The export time is not counted +/// towards the interval between attempts. +/// +/// The [collect] method of the returned continues to gather and +/// return metric data to the user. It will not automatically send that data to +/// the exporter outside of the predefined interval. +/// +/// The [runtime] can be selected based on feature flags set for this crate. +/// +/// The exporter can be any exporter that implements [PushMetricExporter] such +/// as [opentelemetry-otlp]. +/// +/// [collect]: MetricReader::collect +/// [runtime]: crate::runtime +/// [opentelemetry-otlp]: https://docs.rs/opentelemetry-otlp/latest/opentelemetry_otlp/ +/// +/// # Example +/// +/// ```no_run +/// use opentelemetry_sdk::metrics::periodic_reader_with_async_runtime::PeriodicReader; +/// # fn example(get_exporter: impl Fn() -> E, get_runtime: impl Fn() -> R) +/// # where +/// # E: opentelemetry_sdk::metrics::exporter::PushMetricExporter, +/// # R: opentelemetry_sdk::runtime::Runtime, +/// # { +/// +/// let exporter = get_exporter(); // set up a push exporter like OTLP +/// let runtime = get_runtime(); // select runtime: e.g. opentelemetry_sdk:runtime::Tokio +/// +/// let reader = PeriodicReader::builder(exporter, runtime).build(); +/// # drop(reader); +/// # } +/// ``` +#[derive(Clone)] +pub struct PeriodicReader { + exporter: Arc, + inner: Arc>, +} + +impl PeriodicReader { + /// Configuration options for a periodic reader + pub fn builder(exporter: E, runtime: RT) -> PeriodicReaderBuilder + where + E: PushMetricExporter, + RT: Runtime, + { + PeriodicReaderBuilder::new(exporter, runtime) + } +} + +impl fmt::Debug for PeriodicReader { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("PeriodicReader").finish() + } +} + +struct PeriodicReaderInner { + message_sender: mpsc::Sender, + is_shutdown: bool, + sdk_producer_or_worker: ProducerOrWorker, +} + +#[derive(Debug)] +enum Message { + Export, + Flush(oneshot::Sender>), + Shutdown(oneshot::Sender>), +} + +enum ProducerOrWorker { + Producer(Weak), + Worker(Box), +} + +struct PeriodicReaderWorker { + reader: PeriodicReader, + timeout: Duration, + runtime: RT, + rm: ResourceMetrics, +} + +impl PeriodicReaderWorker { + async fn collect_and_export(&mut self) -> MetricResult<()> { + self.reader.collect(&mut self.rm)?; + if self.rm.scope_metrics.is_empty() { + otel_debug!( + name: "PeriodicReaderWorker.NoMetricsToExport", + ); + // No metrics to export. + return Ok(()); + } + + otel_debug!( + name: "PeriodicReaderWorker.InvokeExporter", + message = "Calling exporter's export method with collected metrics.", + count = self.rm.scope_metrics.len(), + ); + let export = self.reader.exporter.export(&mut self.rm); + let timeout = self.runtime.delay(self.timeout); + pin_mut!(export); + pin_mut!(timeout); + + match future::select(export, timeout).await { + Either::Left((res, _)) => { + res // return the status of export. + } + Either::Right(_) => Err(MetricError::Other("export timed out".into())), + } + } + + async fn process_message(&mut self, message: Message) -> bool { + match message { + Message::Export => { + otel_debug!( + name: "PeriodicReader.ExportTriggered", + message = "Export message received.", + ); + if let Err(err) = self.collect_and_export().await { + otel_error!( + name: "PeriodicReader.ExportFailed", + message = "Failed to export metrics", + reason = format!("{}", err)); + } + } + Message::Flush(ch) => { + otel_debug!( + name: "PeriodicReader.ForceFlushCalled", + message = "Flush message received.", + ); + let res = self.collect_and_export().await; + if let Err(send_error) = ch.send(res) { + otel_debug!( + name: "PeriodicReader.Flush.SendResultError", + message = "Failed to send flush result.", + reason = format!("{:?}", send_error), + ); + } + } + Message::Shutdown(ch) => { + otel_debug!( + name: "PeriodicReader.ShutdownCalled", + message = "Shutdown message received", + ); + let res = self.collect_and_export().await; + let _ = self.reader.exporter.shutdown(); + if let Err(send_error) = ch.send(res) { + otel_debug!( + name: "PeriodicReader.Shutdown.SendResultError", + message = "Failed to send shutdown result", + reason = format!("{:?}", send_error), + ); + } + return false; + } + } + + true + } + + async fn run(mut self, mut messages: impl FusedStream + Unpin) { + while let Some(message) = messages.next().await { + if !self.process_message(message).await { + break; + } + } + } +} + +impl MetricReader for PeriodicReader { + fn register_pipeline(&self, pipeline: Weak) { + let mut inner = match self.inner.lock() { + Ok(guard) => guard, + Err(_) => return, + }; + + let worker = match &mut inner.sdk_producer_or_worker { + ProducerOrWorker::Producer(_) => { + // Only register once. If producer is already set, do nothing. + otel_debug!(name: "PeriodicReader.DuplicateRegistration", + message = "duplicate registration found, did not register periodic reader."); + return; + } + ProducerOrWorker::Worker(w) => mem::replace(w, Box::new(|_| {})), + }; + + inner.sdk_producer_or_worker = ProducerOrWorker::Producer(pipeline); + worker(self); + } + + fn collect(&self, rm: &mut ResourceMetrics) -> MetricResult<()> { + let inner = self.inner.lock()?; + if inner.is_shutdown { + return Err(MetricError::Other("reader is shut down".into())); + } + + if let Some(producer) = match &inner.sdk_producer_or_worker { + ProducerOrWorker::Producer(sdk_producer) => sdk_producer.upgrade(), + ProducerOrWorker::Worker(_) => None, + } { + producer.produce(rm)?; + } else { + return Err(MetricError::Other("reader is not registered".into())); + } + + Ok(()) + } + + fn force_flush(&self) -> MetricResult<()> { + let mut inner = self.inner.lock()?; + if inner.is_shutdown { + return Err(MetricError::Other("reader is shut down".into())); + } + let (sender, receiver) = oneshot::channel(); + inner + .message_sender + .try_send(Message::Flush(sender)) + .map_err(|e| MetricError::Other(e.to_string()))?; + + drop(inner); // don't hold lock when blocking on future + + futures_executor::block_on(receiver) + .map_err(|err| MetricError::Other(err.to_string())) + .and_then(|res| res) + } + + fn shutdown(&self) -> MetricResult<()> { + let mut inner = self.inner.lock()?; + if inner.is_shutdown { + return Err(MetricError::Other("reader is already shut down".into())); + } + + let (sender, receiver) = oneshot::channel(); + inner + .message_sender + .try_send(Message::Shutdown(sender)) + .map_err(|e| MetricError::Other(e.to_string()))?; + drop(inner); // don't hold lock when blocking on future + + let shutdown_result = futures_executor::block_on(receiver) + .map_err(|err| MetricError::Other(err.to_string()))?; + + // Acquire the lock again to set the shutdown flag + let mut inner = self.inner.lock()?; + inner.is_shutdown = true; + + shutdown_result + } + + /// To construct a [MetricReader][metric-reader] when setting up an SDK, + /// The output temporality (optional), a function of instrument kind. + /// This function SHOULD be obtained from the exporter. + /// + /// If not configured, the Cumulative temporality SHOULD be used. + /// + /// [metric-reader]: https://github.com/open-telemetry/opentelemetry-specification/blob/0a78571045ca1dca48621c9648ec3c832c3c541c/specification/metrics/sdk.md#metricreader + fn temporality(&self, kind: InstrumentKind) -> super::Temporality { + kind.temporality_preference(self.exporter.temporality()) + } +} + +#[cfg(all(test, feature = "testing"))] +mod tests { + use super::PeriodicReader; + use crate::metrics::reader::MetricReader; + use crate::metrics::MetricError; + use crate::{ + metrics::data::ResourceMetrics, metrics::SdkMeterProvider, runtime, + testing::metrics::InMemoryMetricExporter, Resource, + }; + use opentelemetry::metrics::MeterProvider; + use std::sync::mpsc; + + #[test] + fn collection_triggered_by_interval_tokio_current() { + collection_triggered_by_interval_helper(runtime::TokioCurrentThread); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn collection_triggered_by_interval_from_tokio_multi_one_thread_on_runtime_tokio() { + collection_triggered_by_interval_helper(runtime::Tokio); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn collection_triggered_by_interval_from_tokio_multi_two_thread_on_runtime_tokio() { + collection_triggered_by_interval_helper(runtime::Tokio); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn collection_triggered_by_interval_from_tokio_multi_one_thread_on_runtime_tokio_current() + { + collection_triggered_by_interval_helper(runtime::TokioCurrentThread); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn collection_triggered_by_interval_from_tokio_multi_two_thread_on_runtime_tokio_current() + { + collection_triggered_by_interval_helper(runtime::TokioCurrentThread); + } + + #[tokio::test(flavor = "current_thread")] + #[ignore = "See issue https://github.com/open-telemetry/opentelemetry-rust/issues/2056"] + async fn collection_triggered_by_interval_from_tokio_current_on_runtime_tokio() { + collection_triggered_by_interval_helper(runtime::Tokio); + } + + #[tokio::test(flavor = "current_thread")] + async fn collection_triggered_by_interval_from_tokio_current_on_runtime_tokio_current() { + collection_triggered_by_interval_helper(runtime::TokioCurrentThread); + } + + #[test] + fn unregistered_collect() { + // Arrange + let exporter = InMemoryMetricExporter::default(); + let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build(); + let mut rm = ResourceMetrics { + resource: Resource::empty(), + scope_metrics: Vec::new(), + }; + + // Act + let result = reader.collect(&mut rm); + + // Assert + assert!( + matches!(result.unwrap_err(), MetricError::Other(err) if err == "reader is not registered") + ); + } + + fn collection_triggered_by_interval_helper(runtime: RT) + where + RT: crate::runtime::Runtime, + { + let interval = std::time::Duration::from_millis(1); + let exporter = InMemoryMetricExporter::default(); + let reader = PeriodicReader::builder(exporter.clone(), runtime) + .with_interval(interval) + .build(); + let (sender, receiver) = mpsc::channel(); + + // Act + let meter_provider = SdkMeterProvider::builder().with_reader(reader).build(); + let meter = meter_provider.meter("test"); + let _counter = meter + .u64_observable_counter("testcounter") + .with_callback(move |_| { + sender.send(()).expect("channel should still be open"); + }) + .build(); + + // Assert + receiver + .recv() + .expect("message should be available in channel, indicating a collection occurred"); + } +} diff --git a/opentelemetry-sdk/src/metrics/periodic_reader_with_own_thread.rs b/opentelemetry-sdk/src/metrics/periodic_reader_with_own_thread.rs deleted file mode 100644 index 28cf32f414..0000000000 --- a/opentelemetry-sdk/src/metrics/periodic_reader_with_own_thread.rs +++ /dev/null @@ -1,798 +0,0 @@ -use std::{ - env, fmt, - sync::{ - atomic::AtomicBool, - mpsc::{self, Receiver, Sender}, - Arc, Mutex, Weak, - }, - thread, - time::{Duration, Instant}, -}; - -use opentelemetry::{otel_debug, otel_error, otel_info, otel_warn}; - -use crate::{ - metrics::{exporter::PushMetricExporter, reader::SdkProducer, MetricError, MetricResult}, - Resource, -}; - -use super::{ - data::ResourceMetrics, instrument::InstrumentKind, reader::MetricReader, Pipeline, Temporality, -}; - -const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30); -const DEFAULT_INTERVAL: Duration = Duration::from_secs(60); - -const METRIC_EXPORT_INTERVAL_NAME: &str = "OTEL_METRIC_EXPORT_INTERVAL"; -const METRIC_EXPORT_TIMEOUT_NAME: &str = "OTEL_METRIC_EXPORT_TIMEOUT"; - -/// Configuration options for [PeriodicReaderWithOwnThread]. -/// -/// A periodic reader is a [MetricReader] that collects and exports metric data -/// to the exporter at a defined interval. -/// -/// By default, the returned [MetricReader] will collect and export data every -/// 60 seconds. The export time is not counted towards the interval between -/// attempts. PeriodicReaderWithOwnThread itself does not enforce timeout. Instead timeout -/// is passed on to the exporter for each export attempt. -/// -/// The [collect] method of the returned [MetricReader] continues to gather and -/// return metric data to the user. It will not automatically send that data to -/// the exporter outside of the predefined interval. -/// -/// [collect]: MetricReader::collect -#[derive(Debug)] -pub struct PeriodicReaderWithOwnThreadBuilder { - interval: Duration, - timeout: Duration, - exporter: E, -} - -impl PeriodicReaderWithOwnThreadBuilder -where - E: PushMetricExporter, -{ - fn new(exporter: E) -> Self { - let interval = env::var(METRIC_EXPORT_INTERVAL_NAME) - .ok() - .and_then(|v| v.parse().map(Duration::from_millis).ok()) - .unwrap_or(DEFAULT_INTERVAL); - let timeout = env::var(METRIC_EXPORT_TIMEOUT_NAME) - .ok() - .and_then(|v| v.parse().map(Duration::from_millis).ok()) - .unwrap_or(DEFAULT_TIMEOUT); - - PeriodicReaderWithOwnThreadBuilder { - interval, - timeout, - exporter, - } - } - - /// Configures the intervening time between exports for a [PeriodicReaderWithOwnThread]. - /// - /// This option overrides any value set for the `OTEL_METRIC_EXPORT_INTERVAL` - /// environment variable. - /// - /// If this option is not used or `interval` is equal to zero, 60 seconds is - /// used as the default. - pub fn with_interval(mut self, interval: Duration) -> Self { - if !interval.is_zero() { - self.interval = interval; - } - self - } - - /// Configures the timeout for an export to complete. PeriodicReaderWithOwnThread itself - /// does not enforce timeout. Instead timeout is passed on to the exporter - /// for each export attempt. - /// - /// This option overrides any value set for the `OTEL_METRIC_EXPORT_TIMEOUT` - /// environment variable. - /// - /// If this option is not used or `timeout` is equal to zero, 30 seconds is used - /// as the default. - pub fn with_timeout(mut self, timeout: Duration) -> Self { - if !timeout.is_zero() { - self.timeout = timeout; - } - self - } - - /// Create a [PeriodicReaderWithOwnThread] with the given config. - pub fn build(self) -> PeriodicReaderWithOwnThread { - PeriodicReaderWithOwnThread::new(self.exporter, self.interval, self.timeout) - } -} - -/// A [MetricReader] that continuously collects and exports metric data at a set -/// interval. -/// -/// By default, PeriodicReaderWithOwnThread will collect and export data every -/// 60 seconds. The export time is not counted towards the interval between -/// attempts. PeriodicReaderWithOwnThread itself does not enforce timeout. -/// Instead timeout is passed on to the exporter for each export attempt. -/// -/// The [collect] method of the returned continues to gather and -/// return metric data to the user. It will not automatically send that data to -/// the exporter outside of the predefined interval. -/// -/// As this spuns up own background thread, this is recommended to be used with push exporters -/// that do not require any particular async runtime. As of now, this cannot be used with -/// OTLP exporters as they requires async runtime -/// -/// [collect]: MetricReader::collect -/// -/// # Example -/// -/// ```no_run -/// use opentelemetry_sdk::metrics::PeriodicReaderWithOwnThread; -/// # fn example(get_exporter: impl Fn() -> E) -/// # where -/// # E: opentelemetry_sdk::metrics::exporter::PushMetricExporter, -/// # { -/// -/// let exporter = get_exporter(); // set up a push exporter -/// -/// let reader = PeriodicReaderWithOwnThread::builder(exporter).build(); -/// # drop(reader); -/// # } -/// ``` -#[derive(Clone)] -pub struct PeriodicReaderWithOwnThread { - inner: Arc, -} - -impl PeriodicReaderWithOwnThread { - /// Configuration options for a periodic reader with own thread - pub fn builder(exporter: E) -> PeriodicReaderWithOwnThreadBuilder - where - E: PushMetricExporter, - { - PeriodicReaderWithOwnThreadBuilder::new(exporter) - } - - fn new(exporter: E, interval: Duration, timeout: Duration) -> Self - where - E: PushMetricExporter, - { - let (message_sender, message_receiver): (Sender, Receiver) = - mpsc::channel(); - let reader = PeriodicReaderWithOwnThread { - inner: Arc::new(PeriodicReaderInner { - message_sender: Arc::new(Mutex::new(message_sender)), - is_shutdown: AtomicBool::new(false), - producer: Mutex::new(None), - exporter: Arc::new(exporter), - }), - }; - let cloned_reader = reader.clone(); - - let result_thread_creation = thread::Builder::new() - .name("OpenTelemetry.Metrics.PeriodicReaderWithOwnThread".to_string()) - .spawn(move || { - let mut interval_start = Instant::now(); - let mut remaining_interval = interval; - otel_info!( - name: "PeriodReaderThreadStarted", - interval = interval.as_secs(), - timeout = timeout.as_secs() - ); - loop { - otel_debug!( - name: "PeriodReaderThreadLoopAlive", message = "Next export will happen after interval, unless flush or shutdown is triggered.", interval = remaining_interval.as_millis() - ); - match message_receiver.recv_timeout(remaining_interval) { - Ok(Message::Flush(response_sender)) => { - otel_debug!( - name: "PeriodReaderThreadExportingDueToFlush" - ); - if let Err(_e) = cloned_reader.collect_and_export(timeout) { - response_sender.send(false).unwrap(); - } else { - response_sender.send(true).unwrap(); - } - - // Adjust the remaining interval after the flush - let elapsed = interval_start.elapsed(); - if elapsed < interval { - remaining_interval = interval - elapsed; - otel_debug!( - name: "PeriodReaderThreadAdjustingRemainingIntervalAfterFlush", - remaining_interval = remaining_interval.as_secs() - ); - } else { - otel_debug!( - name: "PeriodReaderThreadAdjustingExportAfterFlush", - ); - // Reset the interval if the flush finishes after the expected export time - // effectively missing the normal export. - // Should we attempt to do the missed export immediately? - // Or do the next export at the next interval? - // Currently this attempts the next export immediately. - // i.e calling Flush can affect the regularity. - interval_start = Instant::now(); - remaining_interval = Duration::ZERO; - } - } - Ok(Message::Shutdown(response_sender)) => { - // Perform final export and break out of loop and exit the thread - otel_debug!(name: "PeriodReaderThreadExportingDueToShutdown"); - if let Err(_e) = cloned_reader.collect_and_export(timeout) { - response_sender.send(false).unwrap(); - } else { - response_sender.send(true).unwrap(); - } - break; - } - Err(mpsc::RecvTimeoutError::Timeout) => { - let export_start = Instant::now(); - otel_debug!( - name: "PeriodReaderThreadExportingDueToTimer", - event_name = "PeriodReaderThreadExportingDueToTimer" - ); - - if let Err(_e) = cloned_reader.collect_and_export(timeout) { - otel_debug!( - name: "PeriodReaderThreadExportingDueToTimerFailed", - event_name = "PeriodReaderThreadExportingDueToTimerFailed" - ); - } - - let time_taken_for_export = export_start.elapsed(); - if time_taken_for_export > interval { - otel_debug!( - name: "PeriodReaderThreadExportTookLongerThanInterval", - event_name = "PeriodReaderThreadExportTookLongerThanInterval" - ); - // if export took longer than interval, do the - // next export immediately. - // Alternatively, we could skip the next export - // and wait for the next interval. - // Or enforce that export timeout is less than interval. - // What is the desired behavior? - interval_start = Instant::now(); - remaining_interval = Duration::ZERO; - } else { - remaining_interval = interval - time_taken_for_export; - interval_start = Instant::now(); - } - } - Err(_) => { - // Some other error. Break out and exit the thread. - break; - } - } - } - otel_info!( - name: "PeriodReaderThreadStopped", - event_name = "PeriodReaderThreadStopped" - ); - }); - - // TODO: Should we fail-fast here and bubble up the error to user? - #[allow(unused_variables)] - if let Err(e) = result_thread_creation { - otel_error!( - name: "PeriodReaderThreadStartError", - event_name = "PeriodReaderThreadStartError", - error = format!("{:?}", e) - ); - } - reader - } - - fn collect_and_export(&self, timeout: Duration) -> MetricResult<()> { - self.inner.collect_and_export(timeout) - } -} - -impl fmt::Debug for PeriodicReaderWithOwnThread { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("PeriodicReaderWithOwnThread").finish() - } -} - -struct PeriodicReaderInner { - exporter: Arc, - message_sender: Arc>>, - producer: Mutex>>, - is_shutdown: AtomicBool, -} - -impl PeriodicReaderInner { - fn register_pipeline(&self, producer: Weak) { - let mut inner = self.producer.lock().expect("lock poisoned"); - *inner = Some(producer); - } - - fn temporality(&self, _kind: InstrumentKind) -> Temporality { - self.exporter.temporality() - } - - fn collect(&self, rm: &mut ResourceMetrics) -> MetricResult<()> { - if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) { - return Err(MetricError::Other("reader is shut down".into())); - } - - let producer = self.producer.lock().expect("lock poisoned"); - if let Some(p) = producer.as_ref() { - p.upgrade() - .ok_or_else(|| MetricError::Other("pipeline is dropped".into()))? - .produce(rm)?; - Ok(()) - } else { - Err(MetricError::Other("pipeline is not registered".into())) - } - } - - fn collect_and_export(&self, _timeout: Duration) -> MetricResult<()> { - // TODO: Reuse the internal vectors. Or refactor to avoid needing any - // owned data structures to be passed to exporters. - let mut rm = ResourceMetrics { - resource: Resource::empty(), - scope_metrics: Vec::new(), - }; - - let collect_result = self.collect(&mut rm); - #[allow(clippy::question_mark)] - if let Err(e) = collect_result { - otel_warn!( - name: "PeriodReaderCollectError", - event_name = "PeriodReaderCollectError", - error = format!("{:?}", e) - ); - return Err(e); - } - - if rm.scope_metrics.is_empty() { - otel_debug!(name: "NoMetricsCollected"); - return Ok(()); - } - - // TODO: substract the time taken for collect from the timeout. collect - // involves observable callbacks too, which are user defined and can - // take arbitrary time. - // - // Relying on futures executor to execute async call. - // TODO: Add timeout and pass it to exporter or consider alternative - // design to enforce timeout here. - let exporter_result = futures_executor::block_on(self.exporter.export(&mut rm)); - #[allow(clippy::question_mark)] - if let Err(e) = exporter_result { - otel_warn!( - name: "PeriodReaderExportError", - event_name = "PeriodReaderExportError", - error = format!("{:?}", e) - ); - return Err(e); - } - - Ok(()) - } - - fn force_flush(&self) -> MetricResult<()> { - if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) { - return Err(MetricError::Other("reader is shut down".into())); - } - let (response_tx, response_rx) = mpsc::channel(); - match self.message_sender.lock() { - Ok(sender) => { - sender - .send(Message::Flush(response_tx)) - .map_err(|e| MetricError::Other(e.to_string()))?; - } - Err(e) => { - otel_debug!( - name: "PeriodReaderForceFlushError", - event_name = "PeriodReaderForceFlushError", - error = format!("{:?}", e) - ); - return Err(MetricError::Other(e.to_string())); - } - } - - if let Ok(response) = response_rx.recv() { - // TODO: call exporter's force_flush method. - if response { - Ok(()) - } else { - Err(MetricError::Other("Failed to flush".into())) - } - } else { - Err(MetricError::Other("Failed to flush".into())) - } - } - - fn shutdown(&self) -> MetricResult<()> { - if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) { - return Err(MetricError::Other("Reader is already shut down".into())); - } - - // TODO: See if this is better to be created upfront. - let (response_tx, response_rx) = mpsc::channel(); - match self.message_sender.lock() { - Ok(sender) => { - sender - .send(Message::Shutdown(response_tx)) - .map_err(|e| MetricError::Other(e.to_string()))?; - } - Err(e) => { - otel_debug!( - name: "PeriodReaderShutdownError", - event_name = "PeriodReaderShutdownError", - error = format!("{:?}", e) - ); - return Err(MetricError::Other(e.to_string())); - } - } - - if let Ok(response) = response_rx.recv() { - self.is_shutdown - .store(true, std::sync::atomic::Ordering::Relaxed); - if response { - Ok(()) - } else { - Err(MetricError::Other("Failed to shutdown".into())) - } - } else { - self.is_shutdown - .store(true, std::sync::atomic::Ordering::Relaxed); - Err(MetricError::Other("Failed to shutdown".into())) - } - } -} - -#[derive(Debug)] -enum Message { - Flush(Sender), - Shutdown(Sender), -} - -impl MetricReader for PeriodicReaderWithOwnThread { - fn register_pipeline(&self, pipeline: Weak) { - self.inner.register_pipeline(pipeline); - } - - fn collect(&self, rm: &mut ResourceMetrics) -> MetricResult<()> { - self.inner.collect(rm) - } - - fn force_flush(&self) -> MetricResult<()> { - self.inner.force_flush() - } - - // TODO: Offer an async version of shutdown so users can await the shutdown - // completion, and avoid blocking the thread. The default shutdown on drop - // can still use blocking call. If user already explicitly called shutdown, - // drop won't call shutdown again. - fn shutdown(&self) -> MetricResult<()> { - self.inner.shutdown() - } - - /// To construct a [MetricReader][metric-reader] when setting up an SDK, - /// The output temporality (optional), a function of instrument kind. - /// This function SHOULD be obtained from the exporter. - /// - /// If not configured, the Cumulative temporality SHOULD be used. - /// - /// [metric-reader]: https://github.com/open-telemetry/opentelemetry-specification/blob/0a78571045ca1dca48621c9648ec3c832c3c541c/specification/metrics/sdk.md#metricreader - fn temporality(&self, kind: InstrumentKind) -> Temporality { - kind.temporality_preference(self.inner.temporality(kind)) - } -} - -#[cfg(all(test, feature = "testing"))] -mod tests { - use super::PeriodicReaderWithOwnThread; - use crate::{ - metrics::{ - data::ResourceMetrics, exporter::PushMetricExporter, reader::MetricReader, MetricError, - MetricResult, SdkMeterProvider, Temporality, - }, - testing::metrics::InMemoryMetricExporter, - Resource, - }; - use async_trait::async_trait; - use opentelemetry::metrics::MeterProvider; - use std::{ - sync::{ - atomic::{AtomicUsize, Ordering}, - mpsc, Arc, - }, - time::Duration, - }; - - // use below command to run all tests - // cargo test metrics::periodic_reader_with_own_thread::tests --features=testing,experimental_metrics_periodic_reader_no_runtime -- --nocapture - - #[derive(Debug, Clone)] - struct MetricExporterThatFailsOnlyOnFirst { - count: Arc, - } - - impl Default for MetricExporterThatFailsOnlyOnFirst { - fn default() -> Self { - MetricExporterThatFailsOnlyOnFirst { - count: Arc::new(AtomicUsize::new(0)), - } - } - } - - impl MetricExporterThatFailsOnlyOnFirst { - fn get_count(&self) -> usize { - self.count.load(Ordering::Relaxed) - } - } - - #[async_trait] - impl PushMetricExporter for MetricExporterThatFailsOnlyOnFirst { - async fn export(&self, _metrics: &mut ResourceMetrics) -> MetricResult<()> { - if self.count.fetch_add(1, Ordering::Relaxed) == 0 { - Err(MetricError::Other("export failed".into())) - } else { - Ok(()) - } - } - - async fn force_flush(&self) -> MetricResult<()> { - Ok(()) - } - - fn shutdown(&self) -> MetricResult<()> { - Ok(()) - } - - fn temporality(&self) -> Temporality { - Temporality::Cumulative - } - } - - #[test] - fn collection_triggered_by_interval_multiple() { - // Arrange - let interval = std::time::Duration::from_millis(1); - let exporter = InMemoryMetricExporter::default(); - let reader = PeriodicReaderWithOwnThread::builder(exporter.clone()) - .with_interval(interval) - .build(); - let i = Arc::new(AtomicUsize::new(0)); - let i_clone = i.clone(); - - // Act - let meter_provider = SdkMeterProvider::builder().with_reader(reader).build(); - let meter = meter_provider.meter("test"); - let _counter = meter - .u64_observable_counter("testcounter") - .with_callback(move |_| { - i_clone.fetch_add(1, Ordering::Relaxed); - }) - .build(); - - // Sleep for a duration 5X (plus liberal buffer to account for potential - // CI slowness) the interval to ensure multiple collection. - // Not a fan of such tests, but this seems to be the only way to test - // if periodic reader is doing its job. - // TODO: Decide if this should be ignored in CI - std::thread::sleep(interval * 5 * 20); - - // Assert - assert!(i.load(Ordering::Relaxed) >= 5); - } - - #[test] - fn shutdown_repeat() { - // Arrange - let interval = std::time::Duration::from_millis(1); - let exporter = InMemoryMetricExporter::default(); - let reader = PeriodicReaderWithOwnThread::builder(exporter.clone()) - .with_interval(interval) - .build(); - - let meter_provider = SdkMeterProvider::builder().with_reader(reader).build(); - let result = meter_provider.shutdown(); - assert!(result.is_ok()); - - // calling shutdown again should return Err - let result = meter_provider.shutdown(); - assert!(result.is_err()); - - // calling shutdown again should return Err - let result = meter_provider.shutdown(); - assert!(result.is_err()); - } - - #[test] - fn flush_after_shutdown() { - // Arrange - let interval = std::time::Duration::from_millis(1); - let exporter = InMemoryMetricExporter::default(); - let reader = PeriodicReaderWithOwnThread::builder(exporter.clone()) - .with_interval(interval) - .build(); - - let meter_provider = SdkMeterProvider::builder().with_reader(reader).build(); - let result = meter_provider.force_flush(); - assert!(result.is_ok()); - - let result = meter_provider.shutdown(); - assert!(result.is_ok()); - - // calling force_flush after shutdown should return Err - let result = meter_provider.force_flush(); - assert!(result.is_err()); - } - - #[test] - fn flush_repeat() { - // Arrange - let interval = std::time::Duration::from_millis(1); - let exporter = InMemoryMetricExporter::default(); - let reader = PeriodicReaderWithOwnThread::builder(exporter.clone()) - .with_interval(interval) - .build(); - - let meter_provider = SdkMeterProvider::builder().with_reader(reader).build(); - let result = meter_provider.force_flush(); - assert!(result.is_ok()); - - // calling force_flush again should return Ok - let result = meter_provider.force_flush(); - assert!(result.is_ok()); - } - - #[test] - fn periodic_reader_without_pipeline() { - // Arrange - let interval = std::time::Duration::from_millis(1); - let exporter = InMemoryMetricExporter::default(); - let reader = PeriodicReaderWithOwnThread::builder(exporter.clone()) - .with_interval(interval) - .build(); - - let rm = &mut ResourceMetrics { - resource: Resource::empty(), - scope_metrics: Vec::new(), - }; - // Pipeline is not registered, so collect should return an error - let result = reader.collect(rm); - assert!(result.is_err()); - - // Pipeline is not registered, so flush should return an error - let result = reader.force_flush(); - assert!(result.is_err()); - - // Adding reader to meter provider should register the pipeline - // TODO: This part might benefit from a different design. - let meter_provider = SdkMeterProvider::builder() - .with_reader(reader.clone()) - .build(); - - // Now collect and flush should succeed - let result = reader.collect(rm); - assert!(result.is_ok()); - - let result = meter_provider.force_flush(); - assert!(result.is_ok()); - } - - #[test] - fn exporter_failures_are_handled() { - // create a mock exporter that fails 1st time and succeeds 2nd time - // Validate using this exporter that periodic reader can handle exporter failure - // and continue to export metrics. - // Arrange - let interval = std::time::Duration::from_millis(10); - let exporter = MetricExporterThatFailsOnlyOnFirst::default(); - let reader = PeriodicReaderWithOwnThread::builder(exporter.clone()) - .with_interval(interval) - .build(); - - let meter_provider = SdkMeterProvider::builder().with_reader(reader).build(); - let meter = meter_provider.meter("test"); - let counter = meter.u64_counter("sync_counter").build(); - counter.add(1, &[]); - let _obs_counter = meter - .u64_observable_counter("testcounter") - .with_callback(move |observer| { - observer.observe(1, &[]); - }) - .build(); - - // Sleep for a duration much longer than the interval to trigger - // multiple exports, including failures. - // Not a fan of such tests, but this seems to be the - // only way to test if periodic reader is doing its job. TODO: Decide if - // this should be ignored in CI - std::thread::sleep(Duration::from_millis(500)); - - // Assert that atleast 2 exports are attempted given the 1st one fails. - assert!(exporter.get_count() >= 2); - } - - #[test] - fn collection() { - collection_triggered_by_interval_helper(); - collection_triggered_by_flush_helper(); - collection_triggered_by_shutdown_helper(); - } - - #[tokio::test(flavor = "multi_thread", worker_threads = 1)] - async fn collection_from_tokio_multi_with_one_worker() { - collection_triggered_by_interval_helper(); - collection_triggered_by_flush_helper(); - collection_triggered_by_shutdown_helper(); - } - - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] - async fn collection_from_tokio_with_two_worker() { - collection_triggered_by_interval_helper(); - collection_triggered_by_flush_helper(); - collection_triggered_by_shutdown_helper(); - } - - #[tokio::test(flavor = "current_thread")] - async fn collection_from_tokio_current() { - collection_triggered_by_interval_helper(); - collection_triggered_by_flush_helper(); - collection_triggered_by_shutdown_helper(); - } - - fn collection_triggered_by_interval_helper() { - collection_helper(|_| { - // Sleep for a duration longer than the interval to ensure at least one collection - // Not a fan of such tests, but this seems to be the only way to test - // if periodic reader is doing its job. - // TODO: Decide if this should be ignored in CI - std::thread::sleep(Duration::from_millis(500)); - }); - } - - fn collection_triggered_by_flush_helper() { - collection_helper(|meter_provider| { - meter_provider.force_flush().expect("flush should succeed"); - }); - } - - fn collection_triggered_by_shutdown_helper() { - collection_helper(|meter_provider| { - meter_provider.shutdown().expect("shutdown should succeed"); - }); - } - - fn collection_helper(trigger: fn(&SdkMeterProvider)) { - // Arrange - let interval = std::time::Duration::from_millis(10); - let exporter = InMemoryMetricExporter::default(); - let reader = PeriodicReaderWithOwnThread::builder(exporter.clone()) - .with_interval(interval) - .build(); - let (sender, receiver) = mpsc::channel(); - - let meter_provider = SdkMeterProvider::builder().with_reader(reader).build(); - let meter = meter_provider.meter("test"); - let _counter = meter - .u64_observable_counter("testcounter") - .with_callback(move |observer| { - observer.observe(1, &[]); - sender.send(()).expect("channel should still be open"); - }) - .build(); - - // Act - trigger(&meter_provider); - - // Assert - receiver - .recv_timeout(Duration::ZERO) - .expect("message should be available in channel, indicating a collection occurred, which should trigger observable callback"); - - let exported_metrics = exporter - .get_finished_metrics() - .expect("this should not fail"); - assert!( - !exported_metrics.is_empty(), - "Metrics should be available in exporter." - ); - } -} diff --git a/opentelemetry-sdk/src/testing/metrics/in_memory_exporter.rs b/opentelemetry-sdk/src/testing/metrics/in_memory_exporter.rs index 631e444181..c8972ef9be 100644 --- a/opentelemetry-sdk/src/testing/metrics/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/testing/metrics/in_memory_exporter.rs @@ -24,7 +24,7 @@ use std::sync::{Arc, Mutex}; /// # Example /// /// ``` -///# use opentelemetry_sdk::{metrics, runtime}; +///# use opentelemetry_sdk::metrics; ///# use opentelemetry::{KeyValue}; ///# use opentelemetry::metrics::MeterProvider; ///# use opentelemetry_sdk::testing::metrics::InMemoryMetricExporter; @@ -37,7 +37,7 @@ use std::sync::{Arc, Mutex}; /// /// // Create a MeterProvider and register the exporter /// let meter_provider = metrics::SdkMeterProvider::builder() -/// .with_reader(PeriodicReader::builder(exporter.clone(), runtime::Tokio).build()) +/// .with_reader(PeriodicReader::builder(exporter.clone()).build()) /// .build(); /// /// // Create and record metrics using the MeterProvider diff --git a/opentelemetry-stdout/examples/basic.rs b/opentelemetry-stdout/examples/basic.rs index fd4e78e0d5..8fa2ae8132 100644 --- a/opentelemetry-stdout/examples/basic.rs +++ b/opentelemetry-stdout/examples/basic.rs @@ -6,9 +6,6 @@ use opentelemetry::{global, KeyValue}; #[cfg(feature = "trace")] use opentelemetry::trace::{Span, Tracer}; -#[cfg(feature = "metrics")] -use opentelemetry_sdk::runtime; - #[cfg(feature = "metrics")] use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider}; @@ -37,7 +34,7 @@ fn init_trace() -> TracerProvider { #[cfg(feature = "metrics")] fn init_metrics() -> opentelemetry_sdk::metrics::SdkMeterProvider { let exporter = opentelemetry_stdout::MetricExporter::default(); - let reader = PeriodicReader::builder(exporter, runtime::Tokio).build(); + let reader = PeriodicReader::builder(exporter).build(); let provider = SdkMeterProvider::builder() .with_reader(reader) .with_resource(RESOURCE.clone()) diff --git a/opentelemetry-stdout/src/lib.rs b/opentelemetry-stdout/src/lib.rs index 546dfb08bc..207eb1460c 100644 --- a/opentelemetry-stdout/src/lib.rs +++ b/opentelemetry-stdout/src/lib.rs @@ -25,7 +25,6 @@ //! use opentelemetry::{Context, KeyValue}; //! //! use opentelemetry_sdk::metrics::{SdkMeterProvider, PeriodicReader}; -//! use opentelemetry_sdk::runtime; //! use opentelemetry_sdk::trace::TracerProvider; //! //! use opentelemetry_sdk::logs::LoggerProvider; @@ -39,7 +38,7 @@ //! //! fn init_metrics() -> SdkMeterProvider { //! let exporter = opentelemetry_stdout::MetricExporter::default(); -//! let reader = PeriodicReader::builder(exporter, runtime::Tokio).build(); +//! let reader = PeriodicReader::builder(exporter).build(); //! SdkMeterProvider::builder().with_reader(reader).build() //! } //!