diff --git a/examples/metrics-advanced/src/main.rs b/examples/metrics-advanced/src/main.rs index 268bd8f747..11467f6236 100644 --- a/examples/metrics-advanced/src/main.rs +++ b/examples/metrics-advanced/src/main.rs @@ -4,7 +4,7 @@ use opentelemetry::KeyValue; use opentelemetry_sdk::metrics::{ Aggregation, Instrument, PeriodicReader, SdkMeterProvider, Stream, }; -use opentelemetry_sdk::{runtime, Resource}; +use opentelemetry_sdk::Resource; use std::error::Error; fn init_meter_provider() -> opentelemetry_sdk::metrics::SdkMeterProvider { @@ -45,7 +45,7 @@ fn init_meter_provider() -> opentelemetry_sdk::metrics::SdkMeterProvider { }; let exporter = opentelemetry_stdout::MetricsExporterBuilder::default().build(); - let reader = PeriodicReader::builder(exporter, runtime::Tokio).build(); + let reader = PeriodicReader::builder(exporter).build(); let provider = SdkMeterProvider::builder() .with_reader(reader) .with_resource(Resource::new([KeyValue::new( diff --git a/examples/metrics-basic/Cargo.toml b/examples/metrics-basic/Cargo.toml index 37b79da140..f75d51818d 100644 --- a/examples/metrics-basic/Cargo.toml +++ b/examples/metrics-basic/Cargo.toml @@ -11,6 +11,7 @@ opentelemetry_sdk = { path = "../../opentelemetry-sdk", features = ["metrics", " opentelemetry-stdout = { path = "../../opentelemetry-stdout", features = ["metrics"]} tokio = { workspace = true, features = ["full"] } serde_json = { workspace = true } +tracing-subscriber = { version = "0.3.18", features = ["env-filter","registry", "std"]} [features] default = ["otel_unstable"] diff --git a/examples/metrics-basic/src/main.rs b/examples/metrics-basic/src/main.rs index 0bd5d4191f..c8d9f82bf7 100644 --- a/examples/metrics-basic/src/main.rs +++ b/examples/metrics-basic/src/main.rs @@ -1,13 +1,18 @@ use opentelemetry::global; use opentelemetry::KeyValue; use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider}; -use opentelemetry_sdk::{runtime, Resource}; +use opentelemetry_sdk::Resource; use std::error::Error; use std::vec; +use tracing_subscriber::filter::LevelFilter; +use tracing_subscriber::fmt; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::util::SubscriberInitExt; +use tracing_subscriber::Layer; fn init_meter_provider() -> opentelemetry_sdk::metrics::SdkMeterProvider { let exporter = opentelemetry_stdout::MetricsExporterBuilder::default().build(); - let reader = PeriodicReader::builder(exporter, runtime::Tokio).build(); + let reader = PeriodicReader::builder(exporter).build(); let provider = SdkMeterProvider::builder() .with_reader(reader) .with_resource(Resource::new([KeyValue::new( @@ -21,6 +26,14 @@ fn init_meter_provider() -> opentelemetry_sdk::metrics::SdkMeterProvider { #[tokio::main] async fn main() -> Result<(), Box> { + tracing_subscriber::registry() + .with( + fmt::layer() + .with_thread_names(true) + .with_filter(LevelFilter::DEBUG), + ) + .init(); + // Initialize the MeterProvider with the stdout Exporter. 
let meter_provider = init_meter_provider(); diff --git a/examples/self-diagnostics/src/main.rs b/examples/self-diagnostics/src/main.rs index 68716794aa..2393362def 100644 --- a/examples/self-diagnostics/src/main.rs +++ b/examples/self-diagnostics/src/main.rs @@ -108,7 +108,7 @@ fn init_logger_provider() -> opentelemetry_sdk::logs::LoggerProvider { fn init_meter_provider() -> opentelemetry_sdk::metrics::SdkMeterProvider { let provider = opentelemetry_otlp::new_pipeline() - .metrics(opentelemetry_sdk::runtime::Tokio) + .metrics() .with_period(std::time::Duration::from_secs(1)) .with_exporter( opentelemetry_otlp::new_exporter() diff --git a/opentelemetry-otlp/examples/basic-otlp-http/src/main.rs b/opentelemetry-otlp/examples/basic-otlp-http/src/main.rs index 006d8e4e2e..678129935e 100644 --- a/opentelemetry-otlp/examples/basic-otlp-http/src/main.rs +++ b/opentelemetry-otlp/examples/basic-otlp-http/src/main.rs @@ -61,7 +61,7 @@ fn init_tracer_provider() -> Result { fn init_metrics() -> Result { opentelemetry_otlp::new_pipeline() - .metrics(opentelemetry_sdk::runtime::Tokio) + .metrics() .with_exporter( http_exporter() .with_protocol(Protocol::HttpBinary) //can be changed to `Protocol::HttpJson` to export in JSON format diff --git a/opentelemetry-otlp/examples/basic-otlp/src/main.rs b/opentelemetry-otlp/examples/basic-otlp/src/main.rs index f931e592e2..15d5eea767 100644 --- a/opentelemetry-otlp/examples/basic-otlp/src/main.rs +++ b/opentelemetry-otlp/examples/basic-otlp/src/main.rs @@ -41,7 +41,7 @@ fn init_metrics() -> Result(self, rt: RT) -> OtlpMetricPipeline - where - RT: Runtime, - { + pub fn metrics(self) -> OtlpMetricPipeline { OtlpMetricPipeline { - rt, temporality_selector: None, exporter_pipeline: NoExporterConfig(()), resource: None, @@ -117,8 +115,7 @@ impl From for MetricsExporterBuilder { /// /// Note that currently the OTLP metrics exporter only supports tonic as it's grpc layer and tokio as /// runtime. -pub struct OtlpMetricPipeline { - rt: RT, +pub struct OtlpMetricPipeline { temporality_selector: Option>, exporter_pipeline: EB, resource: Option, @@ -126,10 +123,7 @@ pub struct OtlpMetricPipeline { timeout: Option, } -impl OtlpMetricPipeline -where - RT: Runtime, -{ +impl OtlpMetricPipeline { /// Build with resource key value pairs. 
pub fn with_resource(self, resource: Resource) -> Self { OtlpMetricPipeline { @@ -173,18 +167,14 @@ where } } -impl OtlpMetricPipeline -where - RT: Runtime, -{ +impl OtlpMetricPipeline { /// Build with the exporter pub fn with_exporter>( self, pipeline: B, - ) -> OtlpMetricPipeline { + ) -> OtlpMetricPipeline { OtlpMetricPipeline { exporter_pipeline: pipeline.into(), - rt: self.rt, temporality_selector: self.temporality_selector, resource: self.resource, period: self.period, @@ -193,10 +183,7 @@ where } } -impl OtlpMetricPipeline -where - RT: Runtime, -{ +impl OtlpMetricPipeline { /// Build MeterProvider pub fn build(self) -> Result { let exporter = self.exporter_pipeline.build_metrics_exporter( @@ -204,7 +191,7 @@ where .unwrap_or_else(|| Box::new(DefaultTemporalitySelector::new())), )?; - let mut builder = PeriodicReader::builder(exporter, self.rt); + let mut builder = PeriodicReader::builder(exporter); if let Some(period) = self.period { builder = builder.with_interval(period); @@ -226,7 +213,7 @@ where } } -impl Debug for OtlpMetricPipeline { +impl Debug for OtlpMetricPipeline { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { f.debug_struct("OtlpMetricPipeline") .field("exporter_pipeline", &self.exporter_pipeline) @@ -293,7 +280,8 @@ impl TemporalitySelector for MetricsExporter { #[async_trait] impl PushMetricsExporter for MetricsExporter { - async fn export(&self, metrics: &mut ResourceMetrics) -> Result<()> { + async fn export(&self, metrics: &mut ResourceMetrics, _timeout: Duration) -> Result<()> { + //TODO: Pass timeout to client self.client.export(metrics).await } diff --git a/opentelemetry-sdk/src/metrics/exporter.rs b/opentelemetry-sdk/src/metrics/exporter.rs index c49aaa75dd..3b55a3ce73 100644 --- a/opentelemetry-sdk/src/metrics/exporter.rs +++ b/opentelemetry-sdk/src/metrics/exporter.rs @@ -1,4 +1,6 @@ //! Interfaces for exporting metrics +use std::time::Duration; + use async_trait::async_trait; use opentelemetry::metrics::Result; @@ -16,7 +18,7 @@ pub trait PushMetricsExporter: TemporalitySelector + Send + Sync + 'static { /// implement any retry logic. All errors returned by this function are /// considered unrecoverable and will be reported to a configured error /// Handler. - async fn export(&self, metrics: &mut ResourceMetrics) -> Result<()>; + async fn export(&self, metrics: &mut ResourceMetrics, timeout: Duration) -> Result<()>; /// Flushes any metric data held by an exporter. 
async fn force_flush(&self) -> Result<()>; diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs index 8b1f33370a..4828b2d940 100644 --- a/opentelemetry-sdk/src/metrics/mod.rs +++ b/opentelemetry-sdk/src/metrics/mod.rs @@ -131,8 +131,8 @@ mod tests { use super::*; use crate::metrics::data::{ResourceMetrics, Temporality}; use crate::metrics::reader::TemporalitySelector; + use crate::testing::metrics::InMemoryMetricsExporter; use crate::testing::metrics::InMemoryMetricsExporterBuilder; - use crate::{runtime, testing::metrics::InMemoryMetricsExporter}; use opentelemetry::metrics::{Counter, Meter, UpDownCounter}; use opentelemetry::{metrics::MeterProvider as _, KeyValue}; use rand::{rngs, Rng, SeedableRng}; @@ -491,7 +491,7 @@ mod tests { async fn counter_duplicate_instrument_merge() { // Arrange let exporter = InMemoryMetricsExporter::default(); - let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build(); + let reader = PeriodicReader::builder(exporter.clone()).build(); let meter_provider = SdkMeterProvider::builder().with_reader(reader).build(); // Act @@ -542,7 +542,7 @@ mod tests { async fn counter_duplicate_instrument_different_meter_no_merge() { // Arrange let exporter = InMemoryMetricsExporter::default(); - let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build(); + let reader = PeriodicReader::builder(exporter.clone()).build(); let meter_provider = SdkMeterProvider::builder().with_reader(reader).build(); // Act @@ -631,7 +631,7 @@ mod tests { async fn instrumentation_scope_identity_test() { // Arrange let exporter = InMemoryMetricsExporter::default(); - let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build(); + let reader = PeriodicReader::builder(exporter.clone()).build(); let meter_provider = SdkMeterProvider::builder().with_reader(reader).build(); // Act @@ -714,7 +714,7 @@ mod tests { // Arrange let exporter = InMemoryMetricsExporter::default(); - let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build(); + let reader = PeriodicReader::builder(exporter.clone()).build(); let criteria = Instrument::new().name("test_histogram"); let stream_invalid_aggregation = Stream::new() .aggregation(Aggregation::ExplicitBucketHistogram { @@ -764,7 +764,7 @@ mod tests { // Arrange let exporter = InMemoryMetricsExporter::default(); - let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build(); + let reader = PeriodicReader::builder(exporter.clone()).build(); let criteria = Instrument::new().name("my_observable_counter"); // View drops all attributes. let stream_invalid_aggregation = Stream::new().allowed_attribute_keys(vec![]); @@ -839,7 +839,7 @@ mod tests { // Arrange let exporter = InMemoryMetricsExporter::default(); - let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build(); + let reader = PeriodicReader::builder(exporter.clone()).build(); let criteria = Instrument::new().name("my_counter"); // View drops all attributes. let stream_invalid_aggregation = Stream::new().allowed_attribute_keys(vec![]); @@ -1250,6 +1250,58 @@ mod tests { asynchronous_instruments_cumulative_with_gap_in_measurements_helper("gauge"); } + async fn some_async_function() -> u64 { + std::thread::sleep(std::time::Duration::from_millis(10)); + 1 + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn async_inside_observable_callback_from_tokio_multi_with_one_worker() { + // Run this test with stdout enabled to see output. 
+ // cargo test async_inside_observable_callbacks --features=testing -- --nocapture + // Arrange + async_inside_observable_callback_helper(); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn async_inside_observable_callback_from_tokio_multi_with_two_worker() { + // Run this test with stdout enabled to see output. + // cargo test async_inside_observable_callbacks --features=testing -- --nocapture + // Arrange + async_inside_observable_callback_helper(); + } + + #[tokio::test(flavor = "current_thread")] + async fn async_inside_observable_callback_from_tokio_current_thread() { + // Run this test with stdout enabled to see output. + // cargo test async_inside_observable_callbacks --features=testing -- --nocapture + // Arrange + async_inside_observable_callback_helper(); + } + + fn async_inside_observable_callback_helper() { + // Run this test with stdout enabled to see output. + // cargo test async_inside_observable_callbacks --features=testing -- --nocapture + // Arrange + let mut test_context = TestContext::new(Temporality::Delta); + let _obs_gauge = test_context + .meter() + .u64_observable_gauge("my_observable_gauge") + .with_callback(|observer| { + // call async function from here + let value = futures_executor::block_on(some_async_function()); + observer.observe(value, &[]); + }) + .init(); + + test_context.flush_metrics(); + let last_value = + test_context.get_aggregation::>("my_observable_gauge", None); + assert_eq!(last_value.data_points.len(), 1); + let data_point = &last_value.data_points[0]; + assert_eq!(data_point.value, 1); + } + fn asynchronous_instruments_cumulative_with_gap_in_measurements_helper( instrument_name: &'static str, ) { @@ -2373,7 +2425,7 @@ mod tests { exporter = exporter.with_temporality_selector(TestTemporalitySelector(temporality)); let exporter = exporter.build(); - let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build(); + let reader = PeriodicReader::builder(exporter.clone()).build(); let meter_provider = SdkMeterProvider::builder().with_reader(reader).build(); TestContext { diff --git a/opentelemetry-sdk/src/metrics/periodic_reader.rs b/opentelemetry-sdk/src/metrics/periodic_reader.rs index 30604acc48..033433d7fb 100644 --- a/opentelemetry-sdk/src/metrics/periodic_reader.rs +++ b/opentelemetry-sdk/src/metrics/periodic_reader.rs @@ -1,23 +1,17 @@ use std::{ - env, fmt, mem, - sync::{Arc, Mutex, Weak}, - time::Duration, + env, fmt, + sync::{ + atomic::AtomicBool, + mpsc::{self, Receiver, Sender}, + Arc, Mutex, Weak, + }, + thread, + time::{Duration, Instant}, }; -use futures_channel::{mpsc, oneshot}; -use futures_util::{ - future::{self, Either}, - pin_mut, - stream::{self, FusedStream}, - StreamExt, -}; -use opentelemetry::{ - global, - metrics::{MetricsError, Result}, - otel_error, -}; +use opentelemetry::metrics::{MetricsError, Result}; +use opentelemetry::{otel_debug, otel_error, otel_info, otel_warn}; -use crate::runtime::Runtime; use crate::{ metrics::{exporter::PushMetricsExporter, reader::SdkProducer}, Resource, @@ -42,8 +36,9 @@ const METRIC_EXPORT_TIMEOUT_NAME: &str = "OTEL_METRIC_EXPORT_TIMEOUT"; /// to the exporter at a defined interval. /// /// By default, the returned [MetricReader] will collect and export data every -/// 60 seconds, and will cancel export attempts that exceed 30 seconds. The -/// export time is not counted towards the interval between attempts. +/// 60 seconds. The export time is not counted towards the interval between +/// attempts. 
PeriodicReader itself does not enforce timeout. Instead timeout +/// is passed on to the exporter for each export attempt. /// /// The [collect] method of the returned [MetricReader] continues to gather and /// return metric data to the user. It will not automatically send that data to @@ -51,19 +46,17 @@ const METRIC_EXPORT_TIMEOUT_NAME: &str = "OTEL_METRIC_EXPORT_TIMEOUT"; /// /// [collect]: MetricReader::collect #[derive(Debug)] -pub struct PeriodicReaderBuilder { +pub struct PeriodicReaderBuilder { interval: Duration, timeout: Duration, exporter: E, - runtime: RT, } -impl PeriodicReaderBuilder +impl PeriodicReaderBuilder where E: PushMetricsExporter, - RT: Runtime, { - fn new(exporter: E, runtime: RT) -> Self { + fn new(exporter: E) -> Self { let interval = env::var(METRIC_EXPORT_INTERVAL_NAME) .ok() .and_then(|v| v.parse().map(Duration::from_millis).ok()) @@ -77,7 +70,6 @@ where interval, timeout, exporter, - runtime, } } @@ -95,8 +87,9 @@ where self } - /// Configures the time a [PeriodicReader] waits for an export to complete - /// before canceling it. + /// Configures the timeout for an export to complete. PeriodicReader itself + /// does not enforce timeout. Instead timeout is passed on to the exporter + /// for each export attempt. /// /// This option overrides any value set for the `OTEL_METRIC_EXPORT_TIMEOUT` /// environment variable. @@ -112,93 +105,194 @@ where /// Create a [PeriodicReader] with the given config. pub fn build(self) -> PeriodicReader { - let (message_sender, message_receiver) = mpsc::channel(256); - - let worker = move |reader: &PeriodicReader| { - let runtime = self.runtime.clone(); - let reader = reader.clone(); - self.runtime.spawn(Box::pin(async move { - let ticker = runtime - .interval(self.interval) - .skip(1) // The ticker is fired immediately, so we should skip the first one to align with the interval. - .map(|_| Message::Export); - let messages = Box::pin(stream::select(message_receiver, ticker)); - PeriodicReaderWorker { - reader, - timeout: self.timeout, - runtime, - rm: ResourceMetrics { - resource: Resource::empty(), - scope_metrics: Vec::new(), - }, - } - .run(messages) - .await - })); - }; - - PeriodicReader { - exporter: Arc::new(self.exporter), - inner: Arc::new(Mutex::new(PeriodicReaderInner { - message_sender, - is_shutdown: false, - sdk_producer_or_worker: ProducerOrWorker::Worker(Box::new(worker)), - })), - } + PeriodicReader::new(self.exporter, self.interval, self.timeout) } } /// A [MetricReader] that continuously collects and exports metric data at a set /// interval. /// -/// By default it will collect and export data every 60 seconds, and will cancel -/// export attempts that exceed 30 seconds. The export time is not counted -/// towards the interval between attempts. +/// By default, PeriodicReader will collect and export data every +/// 60 seconds. The export time is not counted towards the interval between +/// attempts. PeriodicReader itself does not enforce timeout. Instead timeout +/// is passed on to the exporter for each export attempt. /// /// The [collect] method of the returned continues to gather and /// return metric data to the user. It will not automatically send that data to /// the exporter outside of the predefined interval. /// -/// The [runtime] can be selected based on feature flags set for this crate. -/// /// The exporter can be any exporter that implements [PushMetricsExporter] such /// as [opentelemetry-otlp]. 
/// /// [collect]: MetricReader::collect -/// [runtime]: crate::runtime /// [opentelemetry-otlp]: https://docs.rs/opentelemetry-otlp/latest/opentelemetry_otlp/ /// /// # Example /// /// ```no_run /// use opentelemetry_sdk::metrics::PeriodicReader; -/// # fn example(get_exporter: impl Fn() -> E, get_runtime: impl Fn() -> R) +/// # fn example(get_exporter: impl Fn() -> E) /// # where /// # E: opentelemetry_sdk::metrics::exporter::PushMetricsExporter, -/// # R: opentelemetry_sdk::runtime::Runtime, /// # { /// /// let exporter = get_exporter(); // set up a push exporter like OTLP -/// let runtime = get_runtime(); // select runtime: e.g. opentelemetry_sdk:runtime::Tokio /// -/// let reader = PeriodicReader::builder(exporter, runtime).build(); +/// let reader = PeriodicReader::builder(exporter).build(); /// # drop(reader); /// # } /// ``` #[derive(Clone)] pub struct PeriodicReader { - exporter: Arc, - inner: Arc>, + inner: Arc, } impl PeriodicReader { /// Configuration options for a periodic reader - pub fn builder(exporter: E, runtime: RT) -> PeriodicReaderBuilder + pub fn builder(exporter: E) -> PeriodicReaderBuilder where E: PushMetricsExporter, - RT: Runtime, { - PeriodicReaderBuilder::new(exporter, runtime) + PeriodicReaderBuilder::new(exporter) + } + + fn new(exporter: E, interval: Duration, timeout: Duration) -> Self + where + E: PushMetricsExporter, + { + let (message_sender, message_receiver): (Sender, Receiver) = + mpsc::channel(); + let reader = PeriodicReader { + inner: Arc::new(PeriodicReaderInner { + message_sender: Arc::new(Mutex::new(message_sender)), + is_shutdown: AtomicBool::new(false), + producer: Mutex::new(None), + exporter: Arc::new(exporter), + }), + }; + let cloned_reader = reader.clone(); + + let result_thread_creation = thread::Builder::new() + .name("OpenTelemetry.Metrics.PeriodicReader".to_string()) + .spawn(move || { + let mut interval_start = Instant::now(); + let mut remaining_interval = interval; + otel_info!( + name: "PeriodReaderThreadStarted", + event_name = "PeriodReaderThreadStarted", + interval = interval.as_secs(), + timeout = timeout.as_secs() + ); + loop { + otel_debug!( + name: "PeriodReaderThreadLoopAlive", event_name = "PeriodReaderThreadLoopAlive", message = "Next export will happen after interval, unless flush or shutdown is triggered.", interval = remaining_interval.as_millis() + ); + match message_receiver.recv_timeout(remaining_interval) { + Ok(Message::Flush(response_sender)) => { + otel_debug!( + name: "PeriodReaderThreadExportingDueToFlush" + ); + if let Err(_e) = cloned_reader.collect_and_export(timeout) { + response_sender.send(false).unwrap(); + } else { + response_sender.send(true).unwrap(); + } + + // Adjust the remaining interval after the flush + let elapsed = interval_start.elapsed(); + if elapsed < interval { + remaining_interval = interval - elapsed; + otel_debug!( + name: "PeriodReaderThreadAdjustingRemainingIntervalAfterFlush", + event_name = "PeriodReaderThreadAdjustingRemainingIntervalAfterFlush", + remaining_interval = remaining_interval.as_secs() + ); + } else { + otel_debug!( + name: "PeriodReaderThreadAdjustingExportAfterFlush", + event_name = "PeriodReaderThreadAdjustingExportAfterFlush", + ); + // Reset the interval if the flush finishes after the expected export time + // effectively missing the normal export. + // Should we attempt to do the missed export immediately? + // Or do the next export at the next interval? + // Currently this attempts the next export immediately. + // i.e calling Flush can affect the regularity. 
+ interval_start = Instant::now(); + remaining_interval = Duration::ZERO; + } + } + Ok(Message::Shutdown(response_sender)) => { + // Perform final export and break out of loop and exit the thread + otel_debug!( + name: "PeriodReaderThreadExportingDueToShutdown", event_name = "PeriodReaderThreadExportingDueToShutdown" + ); + if let Err(_e) = cloned_reader.collect_and_export(timeout) { + response_sender.send(false).unwrap(); + } else { + response_sender.send(true).unwrap(); + } + break; + } + Err(mpsc::RecvTimeoutError::Timeout) => { + let export_start = Instant::now(); + otel_debug!( + name: "PeriodReaderThreadExportingDueToTimer", + event_name = "PeriodReaderThreadExportingDueToTimer" + ); + + if let Err(_e) = cloned_reader.collect_and_export(timeout) { + otel_debug!( + name: "PeriodReaderThreadExportingDueToTimerFailed", + event_name = "PeriodReaderThreadExportingDueToTimerFailed" + ); + } + + let time_taken_for_export = export_start.elapsed(); + if time_taken_for_export > interval { + otel_debug!( + name: "PeriodReaderThreadExportTookLongerThanInterval", + event_name = "PeriodReaderThreadExportTookLongerThanInterval" + ); + // if export took longer than interval, do the + // next export immediately. + // Alternatively, we could skip the next export + // and wait for the next interval. + // Or enforce that export timeout is less than interval. + // What is the desired behavior? + interval_start = Instant::now(); + remaining_interval = Duration::ZERO; + } else { + remaining_interval = interval - time_taken_for_export; + interval_start = Instant::now(); + } + } + Err(_) => { + // Some other error. Break out and exit the thread. + break; + } + } + } + otel_info!( + name: "PeriodReaderThreadStopped", + event_name = "PeriodReaderThreadStopped" + ); + }); + + // TODO: Should we fail-fast here and bubble up the error to user? + #[allow(unused_variables)] + if let Err(e) = result_thread_creation { + otel_error!( + name: "PeriodReaderThreadStartError", + event_name = "PeriodReaderThreadStartError", + error = format!("{:?}", e) + ); + } + reader + } + + fn collect_and_export(&self, timeout: Duration) -> Result<()> { + self.inner.collect_and_export(timeout) } } @@ -209,177 +303,183 @@ impl fmt::Debug for PeriodicReader { } struct PeriodicReaderInner { - message_sender: mpsc::Sender, - is_shutdown: bool, - sdk_producer_or_worker: ProducerOrWorker, + exporter: Arc, + message_sender: Arc>>, + producer: Mutex>>, + is_shutdown: AtomicBool, } -#[derive(Debug)] -enum Message { - Export, - Flush(oneshot::Sender>), - Shutdown(oneshot::Sender>), -} +impl PeriodicReaderInner { + fn register_pipeline(&self, producer: Weak) { + let mut inner = self.producer.lock().expect("lock poisoned"); + *inner = Some(producer); + } -enum ProducerOrWorker { - Producer(Weak), - Worker(Box), -} + fn temporality(&self, kind: InstrumentKind) -> Temporality { + self.exporter.temporality(kind) + } -struct PeriodicReaderWorker { - reader: PeriodicReader, - timeout: Duration, - runtime: RT, - rm: ResourceMetrics, -} + fn collect(&self, rm: &mut ResourceMetrics) -> Result<()> { + if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) { + return Err(MetricsError::Other("reader is shut down".into())); + } + + let producer = self.producer.lock().expect("lock poisoned"); + if let Some(p) = producer.as_ref() { + p.upgrade() + .ok_or_else(|| MetricsError::Other("pipeline is dropped".into()))? 
+ .produce(rm)?; + Ok(()) + } else { + Err(MetricsError::Other("pipeline is not registered".into())) + } + } + + fn collect_and_export(&self, timeout: Duration) -> Result<()> { + // TODO: Reuse the internal vectors. Or refactor to avoid needing any + // owned data structures to be passed to exporters. + let mut rm = ResourceMetrics { + resource: Resource::empty(), + scope_metrics: Vec::new(), + }; + + let collect_result = self.collect(&mut rm); + #[allow(clippy::question_mark)] + if let Err(e) = collect_result { + otel_warn!( + name: "PeriodReaderCollectError", + event_name = "PeriodReaderCollectError", + error = format!("{:?}", e) + ); + return Err(e); + } -impl PeriodicReaderWorker { - async fn collect_and_export(&mut self) -> Result<()> { - self.reader.collect(&mut self.rm)?; - if self.rm.scope_metrics.is_empty() { - // No metrics to export. + if rm.scope_metrics.is_empty() { + otel_debug!(name: "NoMetricsCollected"); return Ok(()); } - let export = self.reader.exporter.export(&mut self.rm); - let timeout = self.runtime.delay(self.timeout); - pin_mut!(export); - pin_mut!(timeout); + // TODO: subtract the time taken for collect from the timeout. collect + // involves observable callbacks too, which are user defined and can + // take arbitrary time. + // + // Relying on the futures executor to execute the async call. No timeout is + // enforced here. The exporter is responsible for enforcing the timeout. + let exporter_result = futures_executor::block_on(self.exporter.export(&mut rm, timeout)); + #[allow(clippy::question_mark)] + if let Err(e) = exporter_result { + otel_warn!( + name: "PeriodReaderExportError", + event_name = "PeriodReaderExportError", + error = format!("{:?}", e) + ); + return Err(e); + } + + Ok(()) + } - match future::select(export, timeout).await { - Either::Left((res, _)) => { - res // return the status of export.
+ fn force_flush(&self) -> Result<()> { + if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) { + return Err(MetricsError::Other("reader is shut down".into())); + } + let (response_tx, response_rx) = mpsc::channel(); + match self.message_sender.lock() { + Ok(sender) => { + sender + .send(Message::Flush(response_tx)) + .map_err(|e| MetricsError::Other(e.to_string()))?; } - Either::Right(_) => { + Err(e) => { otel_error!( - name: "collect_and_export", - status = "timed_out" + name: "PeriodReaderForceFlushError", + event_name = "PeriodReaderForceFlushError", + error = format!("{:?}", e) ); - Err(MetricsError::Other("export timed out".into())) + return Err(MetricsError::Other(e.to_string())); } } - } - async fn process_message(&mut self, message: Message) -> bool { - match message { - Message::Export => { - if let Err(err) = self.collect_and_export().await { - global::handle_error(err) - } + if let Ok(response) = response_rx.recv() { + if response { + Ok(()) + } else { + Err(MetricsError::Other("Failed to flush".into())) } - Message::Flush(ch) => { - let res = self.collect_and_export().await; - if ch.send(res).is_err() { - global::handle_error(MetricsError::Other("flush channel closed".into())) - } + } else { + Err(MetricsError::Other("Failed to flush".into())) + } + } + + fn shutdown(&self) -> Result<()> { + if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) { + return Err(MetricsError::Other("reader is already shut down".into())); + } + + let (response_tx, response_rx) = mpsc::channel(); + match self.message_sender.lock() { + Ok(sender) => { + sender + .send(Message::Shutdown(response_tx)) + .map_err(|e| MetricsError::Other(e.to_string()))?; } - Message::Shutdown(ch) => { - let res = self.collect_and_export().await; - let _ = self.reader.exporter.shutdown(); - if ch.send(res).is_err() { - global::handle_error(MetricsError::Other("shutdown channel closed".into())) - } - return false; + Err(e) => { + otel_error!( + name: "PeriodReaderShutdownError", + event_name = "PeriodReaderShutdownError", + error = format!("{:?}", e) + ); + return Err(MetricsError::Other(e.to_string())); } } - true - } - - async fn run(mut self, mut messages: impl FusedStream + Unpin) { - while let Some(message) = messages.next().await { - if !self.process_message(message).await { - break; + if let Ok(response) = response_rx.recv() { + self.is_shutdown + .store(true, std::sync::atomic::Ordering::Relaxed); + if response { + Ok(()) + } else { + Err(MetricsError::Other("Failed to shutdown".into())) } + } else { + self.is_shutdown + .store(true, std::sync::atomic::Ordering::Relaxed); + Err(MetricsError::Other("Failed to shutdown".into())) } } } +#[derive(Debug)] +enum Message { + Flush(Sender), + Shutdown(Sender), +} + impl TemporalitySelector for PeriodicReader { fn temporality(&self, kind: InstrumentKind) -> Temporality { - self.exporter.temporality(kind) + self.inner.temporality(kind) } } impl MetricReader for PeriodicReader { fn register_pipeline(&self, pipeline: Weak) { - let mut inner = match self.inner.lock() { - Ok(guard) => guard, - Err(_) => return, - }; - - let worker = match &mut inner.sdk_producer_or_worker { - ProducerOrWorker::Producer(_) => { - // Only register once. If producer is already set, do nothing. 
- global::handle_error(MetricsError::Other( - "duplicate meter registration, did not register manual reader".into(), - )); - return; - } - ProducerOrWorker::Worker(w) => mem::replace(w, Box::new(|_| {})), - }; - - inner.sdk_producer_or_worker = ProducerOrWorker::Producer(pipeline); - worker(self); + self.inner.register_pipeline(pipeline); } fn collect(&self, rm: &mut ResourceMetrics) -> Result<()> { - let inner = self.inner.lock()?; - if inner.is_shutdown { - return Err(MetricsError::Other("reader is shut down".into())); - } - - if let Some(producer) = match &inner.sdk_producer_or_worker { - ProducerOrWorker::Producer(sdk_producer) => sdk_producer.upgrade(), - ProducerOrWorker::Worker(_) => None, - } { - producer.produce(rm)?; - } else { - return Err(MetricsError::Other("reader is not registered".into())); - } - - Ok(()) + self.inner.collect(rm) } fn force_flush(&self) -> Result<()> { - let mut inner = self.inner.lock()?; - if inner.is_shutdown { - return Err(MetricsError::Other("reader is shut down".into())); - } - let (sender, receiver) = oneshot::channel(); - inner - .message_sender - .try_send(Message::Flush(sender)) - .map_err(|e| MetricsError::Other(e.to_string()))?; - - drop(inner); // don't hold lock when blocking on future - - futures_executor::block_on(receiver) - .map_err(|err| MetricsError::Other(err.to_string())) - .and_then(|res| res) + self.inner.force_flush() } + // TODO: Offer an async version of shutdown so users can await the shutdown + // completion, and avoid blocking the thread. The default shutdown on drop + // can still use blocking call. If user already explicitly called shutdown, + // drop won't call shutdown again. fn shutdown(&self) -> Result<()> { - let mut inner = self.inner.lock()?; - if inner.is_shutdown { - return Err(MetricsError::Other("reader is already shut down".into())); - } - - let (sender, receiver) = oneshot::channel(); - inner - .message_sender - .try_send(Message::Shutdown(sender)) - .map_err(|e| MetricsError::Other(e.to_string()))?; - drop(inner); // don't hold lock when blocking on future - - let shutdown_result = futures_executor::block_on(receiver) - .map_err(|err| MetricsError::Other(err.to_string()))?; - - // Acquire the lock again to set the shutdown flag - let mut inner = self.inner.lock()?; - inner.is_shutdown = true; - - shutdown_result + self.inner.shutdown() } } @@ -387,93 +487,318 @@ impl MetricReader for PeriodicReader { mod tests { use super::PeriodicReader; use crate::{ - metrics::data::ResourceMetrics, metrics::reader::MetricReader, metrics::SdkMeterProvider, - runtime, testing::metrics::InMemoryMetricsExporter, Resource, + metrics::{ + data::{ResourceMetrics, Temporality}, + exporter::PushMetricsExporter, + reader::{MetricReader, TemporalitySelector}, + InstrumentKind, SdkMeterProvider, + }, + testing::metrics::InMemoryMetricsExporter, + Resource, }; + use async_trait::async_trait; + use opentelemetry::metrics::Result; use opentelemetry::metrics::{MeterProvider, MetricsError}; - use std::sync::mpsc; + use std::{ + sync::{ + atomic::{AtomicUsize, Ordering}, + mpsc, Arc, + }, + time::Duration, + }; - #[test] - fn collection_triggered_by_interval_tokio_current() { - collection_triggered_by_interval_helper(runtime::TokioCurrentThread); + // use below command to run all tests + // cargo test metrics::periodic_reader::tests --features=testing -- --nocapture + + #[derive(Debug, Clone)] + struct MetricExporterThatFailsOnlyOnFirst { + count: Arc, } - #[tokio::test(flavor = "multi_thread", worker_threads = 1)] - async fn 
collection_triggered_by_interval_from_tokio_multi_one_thread_on_runtime_tokio() { - collection_triggered_by_interval_helper(runtime::Tokio); + impl Default for MetricExporterThatFailsOnlyOnFirst { + fn default() -> Self { + MetricExporterThatFailsOnlyOnFirst { + count: Arc::new(AtomicUsize::new(0)), + } + } } - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] - async fn collection_triggered_by_interval_from_tokio_multi_two_thread_on_runtime_tokio() { - collection_triggered_by_interval_helper(runtime::Tokio); + impl MetricExporterThatFailsOnlyOnFirst { + fn get_count(&self) -> usize { + self.count.load(Ordering::Relaxed) + } } - #[tokio::test(flavor = "multi_thread", worker_threads = 1)] - async fn collection_triggered_by_interval_from_tokio_multi_one_thread_on_runtime_tokio_current() - { - collection_triggered_by_interval_helper(runtime::TokioCurrentThread); + impl TemporalitySelector for MetricExporterThatFailsOnlyOnFirst { + fn temporality(&self, _kind: InstrumentKind) -> Temporality { + Temporality::Cumulative + } } - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] - async fn collection_triggered_by_interval_from_tokio_multi_two_thread_on_runtime_tokio_current() - { - collection_triggered_by_interval_helper(runtime::TokioCurrentThread); + #[async_trait] + impl PushMetricsExporter for MetricExporterThatFailsOnlyOnFirst { + async fn export(&self, _metrics: &mut ResourceMetrics, _timeout: Duration) -> Result<()> { + if self.count.fetch_add(1, Ordering::Relaxed) == 0 { + Err(MetricsError::Other("export failed".into())) + } else { + Ok(()) + } + } + + async fn force_flush(&self) -> Result<()> { + Ok(()) + } + + fn shutdown(&self) -> Result<()> { + Ok(()) + } } - #[tokio::test(flavor = "current_thread")] - #[ignore = "See issue https://github.com/open-telemetry/opentelemetry-rust/issues/2056"] - async fn collection_triggered_by_interval_from_tokio_current_on_runtime_tokio() { - collection_triggered_by_interval_helper(runtime::Tokio); + #[test] + fn collection_triggered_by_interval_multiple() { + // Arrange + let interval = std::time::Duration::from_millis(1); + let exporter = InMemoryMetricsExporter::default(); + let reader = PeriodicReader::builder(exporter.clone()) + .with_interval(interval) + .build(); + let i = Arc::new(AtomicUsize::new(0)); + let i_clone = i.clone(); + + // Act + let meter_provider = SdkMeterProvider::builder().with_reader(reader).build(); + let meter = meter_provider.meter("test"); + let _counter = meter + .u64_observable_counter("testcounter") + .with_callback(move |_| { + i_clone.fetch_add(1, Ordering::Relaxed); + }) + .init(); + + // Sleep for a duration 5X (plus liberal buffer to account for potential + // CI slowness) the interval to ensure multiple collection. + // Not a fan of such tests, but this seems to be the only way to test + // if periodic reader is doing its job. 
+ // TODO: Decide if this should be ignored in CI + std::thread::sleep(interval * 5 * 20); + + // Assert + assert!(i.load(Ordering::Relaxed) >= 5); } - #[tokio::test(flavor = "current_thread")] - async fn collection_triggered_by_interval_from_tokio_current_on_runtime_tokio_current() { - collection_triggered_by_interval_helper(runtime::TokioCurrentThread); + #[test] + fn shutdown_repeat() { + // Arrange + let interval = std::time::Duration::from_millis(1); + let exporter = InMemoryMetricsExporter::default(); + let reader = PeriodicReader::builder(exporter.clone()) + .with_interval(interval) + .build(); + + let meter_provider = SdkMeterProvider::builder().with_reader(reader).build(); + let result = meter_provider.shutdown(); + assert!(result.is_ok()); + + // calling shutdown again should return Err + let result = meter_provider.shutdown(); + assert!(result.is_err()); + + // calling shutdown again should return Err + let result = meter_provider.shutdown(); + assert!(result.is_err()); } #[test] - fn unregistered_collect() { + fn flush_after_shutdown() { // Arrange + let interval = std::time::Duration::from_millis(1); let exporter = InMemoryMetricsExporter::default(); - let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build(); - let mut rm = ResourceMetrics { + let reader = PeriodicReader::builder(exporter.clone()) + .with_interval(interval) + .build(); + + let meter_provider = SdkMeterProvider::builder().with_reader(reader).build(); + let result = meter_provider.force_flush(); + assert!(result.is_ok()); + + let result = meter_provider.shutdown(); + assert!(result.is_ok()); + + // calling force_flush after shutdown should return Err + let result = meter_provider.force_flush(); + assert!(result.is_err()); + } + + #[test] + fn flush_repeat() { + // Arrange + let interval = std::time::Duration::from_millis(1); + let exporter = InMemoryMetricsExporter::default(); + let reader = PeriodicReader::builder(exporter.clone()) + .with_interval(interval) + .build(); + + let meter_provider = SdkMeterProvider::builder().with_reader(reader).build(); + let result = meter_provider.force_flush(); + assert!(result.is_ok()); + + // calling force_flush again should return Ok + let result = meter_provider.force_flush(); + assert!(result.is_ok()); + } + + #[test] + fn periodic_reader_without_pipeline() { + // Arrange + let interval = std::time::Duration::from_millis(1); + let exporter = InMemoryMetricsExporter::default(); + let reader = PeriodicReader::builder(exporter.clone()) + .with_interval(interval) + .build(); + + let rm = &mut ResourceMetrics { resource: Resource::empty(), scope_metrics: Vec::new(), }; + // Pipeline is not registered, so collect should return an error + let result = reader.collect(rm); + assert!(result.is_err()); + + // Pipeline is not registered, so flush should return an error + let result = reader.force_flush(); + assert!(result.is_err()); + + // Adding reader to meter provider should register the pipeline + // TODO: This part might benefit from a different design. 
+ let meter_provider = SdkMeterProvider::builder() + .with_reader(reader.clone()) + .build(); - // Act - let result = reader.collect(&mut rm); + // Now collect and flush should succeed + let result = reader.collect(rm); + assert!(result.is_ok()); - // Assert - assert!( - matches!(result.unwrap_err(), MetricsError::Other(err) if err == "reader is not registered") - ); + let result = meter_provider.force_flush(); + assert!(result.is_ok()); } - fn collection_triggered_by_interval_helper(runtime: RT) - where - RT: crate::runtime::Runtime, - { - let interval = std::time::Duration::from_millis(1); + #[test] + fn exporter_failures_are_handled() { + // create a mock exporter that fails 1st time and succeeds 2nd time + // Validate using this exporter that periodic reader can handle exporter failure + // and continue to export metrics. + // Arrange + let interval = std::time::Duration::from_millis(10); + let exporter = MetricExporterThatFailsOnlyOnFirst::default(); + let reader = PeriodicReader::builder(exporter.clone()) + .with_interval(interval) + .build(); + + let meter_provider = SdkMeterProvider::builder().with_reader(reader).build(); + let meter = meter_provider.meter("test"); + let counter = meter.u64_counter("sync_counter").init(); + counter.add(1, &[]); + let _obs_counter = meter + .u64_observable_counter("testcounter") + .with_callback(move |observer| { + observer.observe(1, &[]); + }) + .init(); + + // Sleep for a duration much longer than the interval to trigger + // multiple exports, including failures. + // Not a fan of such tests, but this seems to be the + // only way to test if periodic reader is doing its job. TODO: Decide if + // this should be ignored in CI + std::thread::sleep(Duration::from_millis(500)); + + // Assert that at least 2 exports are attempted given the 1st one fails. + assert!(exporter.get_count() >= 2); + } + + #[test] + fn collection() { + collection_triggered_by_interval_helper(); + collection_triggered_by_flush_helper(); + collection_triggered_by_shutdown_helper(); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn collection_from_tokio_multi_with_one_worker() { + collection_triggered_by_interval_helper(); + collection_triggered_by_flush_helper(); + collection_triggered_by_shutdown_helper(); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn collection_from_tokio_with_two_worker() { + collection_triggered_by_interval_helper(); + collection_triggered_by_flush_helper(); + collection_triggered_by_shutdown_helper(); + } + + #[tokio::test(flavor = "current_thread")] + async fn collection_from_tokio_current() { + collection_triggered_by_interval_helper(); + collection_triggered_by_flush_helper(); + collection_triggered_by_shutdown_helper(); + } + + fn collection_triggered_by_interval_helper() { + collection_helper(|_| { + // Sleep for a duration longer than the interval to ensure at least one collection + // Not a fan of such tests, but this seems to be the only way to test + // if periodic reader is doing its job.
+ // TODO: Decide if this should be ignored in CI + std::thread::sleep(Duration::from_millis(500)); + }); + } + + fn collection_triggered_by_flush_helper() { + collection_helper(|meter_provider| { + meter_provider.force_flush().expect("flush should succeed"); + }); + } + + fn collection_triggered_by_shutdown_helper() { + collection_helper(|meter_provider| { + meter_provider.shutdown().expect("shutdown should succeed"); + }); + } + + fn collection_helper(trigger: fn(&SdkMeterProvider)) { + // Arrange + let interval = std::time::Duration::from_millis(10); let exporter = InMemoryMetricsExporter::default(); - let reader = PeriodicReader::builder(exporter.clone(), runtime) + let reader = PeriodicReader::builder(exporter.clone()) .with_interval(interval) .build(); let (sender, receiver) = mpsc::channel(); - // Act let meter_provider = SdkMeterProvider::builder().with_reader(reader).build(); let meter = meter_provider.meter("test"); let _counter = meter .u64_observable_counter("testcounter") - .with_callback(move |_| { + .with_callback(move |observer| { + observer.observe(1, &[]); sender.send(()).expect("channel should still be open"); }) .init(); + // Act + trigger(&meter_provider); + // Assert receiver - .recv() - .expect("message should be available in channel, indicating a collection occurred"); + .recv_timeout(Duration::ZERO) + .expect("message should be available in channel, indicating a collection occurred, which should trigger observable callback"); + + let exported_metrics = exporter + .get_finished_metrics() + .expect("this should not fail"); + assert!( + !exported_metrics.is_empty(), + "Metrics should be available in exporter." + ); } } diff --git a/opentelemetry-sdk/src/testing/metrics/in_memory_exporter.rs b/opentelemetry-sdk/src/testing/metrics/in_memory_exporter.rs index 89eec6e6b7..70cb7b6d18 100644 --- a/opentelemetry-sdk/src/testing/metrics/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/testing/metrics/in_memory_exporter.rs @@ -8,6 +8,7 @@ use opentelemetry::metrics::Result; use std::collections::VecDeque; use std::fmt; use std::sync::{Arc, Mutex}; +use std::time::Duration; /// An in-memory metrics exporter that stores metrics data in memory. 
/// @@ -24,7 +25,7 @@ use std::sync::{Arc, Mutex}; /// # Example /// /// ``` -///# use opentelemetry_sdk::{metrics, runtime}; +///# use opentelemetry_sdk::metrics; ///# use opentelemetry::{KeyValue}; ///# use opentelemetry::metrics::MeterProvider; ///# use opentelemetry_sdk::testing::metrics::InMemoryMetricsExporter; @@ -37,7 +38,7 @@ use std::sync::{Arc, Mutex}; /// /// // Create a MeterProvider and register the exporter /// let meter_provider = metrics::SdkMeterProvider::builder() -/// .with_reader(PeriodicReader::builder(exporter.clone(), runtime::Tokio).build()) +/// .with_reader(PeriodicReader::builder(exporter.clone()).build()) /// .build(); /// /// // Create and record metrics using the MeterProvider @@ -259,7 +260,7 @@ impl TemporalitySelector for InMemoryMetricsExporter { #[async_trait] impl PushMetricsExporter for InMemoryMetricsExporter { - async fn export(&self, metrics: &mut ResourceMetrics) -> Result<()> { + async fn export(&self, metrics: &mut ResourceMetrics, _timeout: Duration) -> Result<()> { self.metrics .lock() .map(|mut metrics_guard| { diff --git a/opentelemetry-stdout/examples/basic.rs b/opentelemetry-stdout/examples/basic.rs index 4289c74ec4..6191b10b32 100644 --- a/opentelemetry-stdout/examples/basic.rs +++ b/opentelemetry-stdout/examples/basic.rs @@ -6,9 +6,6 @@ use opentelemetry::{global, KeyValue}; #[cfg(feature = "trace")] use opentelemetry::trace::{Span, Tracer}; -#[cfg(feature = "metrics")] -use opentelemetry_sdk::runtime; - #[cfg(feature = "metrics")] use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider}; @@ -37,7 +34,7 @@ fn init_trace() { #[cfg(feature = "metrics")] fn init_metrics() -> opentelemetry_sdk::metrics::SdkMeterProvider { let exporter = opentelemetry_stdout::MetricsExporter::default(); - let reader = PeriodicReader::builder(exporter, runtime::Tokio).build(); + let reader = PeriodicReader::builder(exporter).build(); let provider = SdkMeterProvider::builder() .with_reader(reader) .with_resource(RESOURCE.clone()) diff --git a/opentelemetry-stdout/src/lib.rs b/opentelemetry-stdout/src/lib.rs index deab6ac355..93f1de9e75 100644 --- a/opentelemetry-stdout/src/lib.rs +++ b/opentelemetry-stdout/src/lib.rs @@ -25,7 +25,6 @@ //! use opentelemetry::{Context, KeyValue}; //! //! use opentelemetry_sdk::metrics::{SdkMeterProvider, PeriodicReader}; -//! use opentelemetry_sdk::runtime; //! use opentelemetry_sdk::trace::TracerProvider; //! //! use opentelemetry_sdk::logs::LoggerProvider; @@ -39,7 +38,7 @@ //! //! fn init_metrics() -> SdkMeterProvider { //! let exporter = opentelemetry_stdout::MetricsExporter::default(); -//! let reader = PeriodicReader::builder(exporter, runtime::Tokio).build(); +//! let reader = PeriodicReader::builder(exporter).build(); //! SdkMeterProvider::builder().with_reader(reader).build() //! } //! diff --git a/opentelemetry-stdout/src/metrics/exporter.rs b/opentelemetry-stdout/src/metrics/exporter.rs index fd39f8919e..bd7f1a9be5 100644 --- a/opentelemetry-stdout/src/metrics/exporter.rs +++ b/opentelemetry-stdout/src/metrics/exporter.rs @@ -8,8 +8,8 @@ use opentelemetry_sdk::metrics::{ reader::{DefaultTemporalitySelector, TemporalitySelector}, InstrumentKind, }; -use std::fmt::Debug; use std::sync::atomic; +use std::{fmt::Debug, time::Duration}; /// An OpenTelemetry exporter that writes to stdout on export. 
pub struct MetricsExporter { @@ -44,7 +44,7 @@ impl TemporalitySelector for MetricsExporter { #[async_trait] impl PushMetricsExporter for MetricsExporter { /// Write Metrics to stdout - async fn export(&self, metrics: &mut data::ResourceMetrics) -> Result<()> { + async fn export(&self, metrics: &mut data::ResourceMetrics, _timeout: Duration) -> Result<()> { if self.is_shutdown.load(atomic::Ordering::SeqCst) { Err(MetricsError::Other("exporter is shut down".into())) } else {
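
Usage sketch (illustrative, not part of the diff): the snippet below shows the two API changes in this change set working together. PeriodicReader::builder no longer takes a runtime argument because the reader drives exports from its own background thread, and PushMetricsExporter::export now receives the configured timeout, which the reader forwards to the exporter instead of enforcing it itself. The exporter name DiscardingExporter is hypothetical and exists only for illustration; its trait impl mirrors the shapes shown in the diff's own test exporter and, like that exporter, assumes the async-trait crate is available.

use std::time::Duration;

use async_trait::async_trait;
use opentelemetry::metrics::{MeterProvider as _, Result};
use opentelemetry_sdk::metrics::data::{ResourceMetrics, Temporality};
use opentelemetry_sdk::metrics::exporter::PushMetricsExporter;
use opentelemetry_sdk::metrics::reader::TemporalitySelector;
use opentelemetry_sdk::metrics::{InstrumentKind, PeriodicReader, SdkMeterProvider};

// Hypothetical exporter used only for illustration; it discards all metrics.
#[derive(Debug)]
struct DiscardingExporter;

impl TemporalitySelector for DiscardingExporter {
    fn temporality(&self, _kind: InstrumentKind) -> Temporality {
        Temporality::Cumulative
    }
}

#[async_trait]
impl PushMetricsExporter for DiscardingExporter {
    // The reader no longer cancels slow exports; the timeout is handed to the
    // exporter, which is expected to enforce it on its own I/O.
    async fn export(&self, _metrics: &mut ResourceMetrics, _timeout: Duration) -> Result<()> {
        Ok(())
    }

    async fn force_flush(&self) -> Result<()> {
        Ok(())
    }

    fn shutdown(&self) -> Result<()> {
        Ok(())
    }
}

fn main() {
    // No async runtime required: exports are driven by the reader's dedicated
    // "OpenTelemetry.Metrics.PeriodicReader" thread.
    let reader = PeriodicReader::builder(DiscardingExporter)
        .with_interval(Duration::from_secs(30))
        .build();
    let provider = SdkMeterProvider::builder().with_reader(reader).build();

    let meter = provider.meter("example");
    let counter = meter.u64_counter("example_counter").init();
    counter.add(1, &[]);

    // Shutdown triggers one final collect-and-export before the thread exits.
    provider.shutdown().expect("shutdown should succeed");
}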