Skip to content

Commit

Permalink
Add tests for periodic reader from various RT combinations (open-tele…
Browse files Browse the repository at this point in the history
…metry#2147)

These tests should help with testing open-telemetry#2142
  • Loading branch information
cijothomas committed Oct 4, 2024
1 parent 966c9a6 commit 8f71c7b
Showing 1 changed file with 62 additions and 38 deletions.
100 changes: 62 additions & 38 deletions opentelemetry-sdk/src/metrics/periodic_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -421,32 +421,41 @@ mod tests {
// cargo test metrics::periodic_reader::tests --features=testing -- --nocapture

#[test]
fn collection_triggered_by_interval() {
// Arrange
let interval = std::time::Duration::from_millis(1000);
let exporter = InMemoryMetricsExporter::default();
let reader = PeriodicReader::builder(exporter.clone())
.with_interval(interval)
.build();
let (sender, receiver) = mpsc::channel();
fn collection_triggered_by_interval_tokio_current() {
collection_triggered_by_interval_helper(runtime::TokioCurrentThread);
}

// Act
let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
let meter = meter_provider.meter("test");
let _counter = meter
.u64_observable_counter("testcounter")
.with_callback(move |_| {
sender.send(()).expect("channel should still be open");
})
.init();
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn collection_triggered_by_interval_from_tokio_multi_one_thread_on_runtime_tokio() {
collection_triggered_by_interval_helper(runtime::Tokio);
}

// Sleep for a duration longer than the interval to ensure at least one collection
std::thread::sleep(interval * 2);
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn collection_triggered_by_interval_from_tokio_multi_two_thread_on_runtime_tokio() {
collection_triggered_by_interval_helper(runtime::Tokio);
}

// Assert
receiver
.recv_timeout(Duration::ZERO)
.expect("message should be available in channel, indicating a collection occurred");
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn collection_triggered_by_interval_from_tokio_multi_one_thread_on_runtime_tokio_current()
{
collection_triggered_by_interval_helper(runtime::TokioCurrentThread);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn collection_triggered_by_interval_from_tokio_multi_two_thread_on_runtime_tokio_current()
{
collection_triggered_by_interval_helper(runtime::TokioCurrentThread);
}

#[tokio::test(flavor = "current_thread")]
#[ignore = "See issue https://github.com/open-telemetry/opentelemetry-rust/issues/2056"]
async fn collection_triggered_by_interval_from_tokio_current_on_runtime_tokio() {
collection_triggered_by_interval_helper(runtime::Tokio);
}

#[tokio::test(flavor = "current_thread")]
async fn collection_triggered_by_interval_from_tokio_current_on_runtime_tokio_current() {
collection_triggered_by_interval_helper(runtime::TokioCurrentThread);
}

#[test]
Expand Down Expand Up @@ -551,25 +560,40 @@ mod tests {
resource: Resource::empty(),
scope_metrics: Vec::new(),
};
// Pipeline is not registered, so collect should return an error
let result = reader.collect(rm);
assert!(result.is_err());

// Pipeline is not registered, so flush should return an error
let result = reader.force_flush();
assert!(result.is_err());
// Act
let result = reader.collect(&mut rm);

// Assert
assert!(
matches!(result.unwrap_err(), MetricsError::Other(err) if err == "reader is not registered")
);
}

// Adding reader to meter provider should register the pipeline
// TODO: This part might benefit from a different design.
let meter_provider = SdkMeterProvider::builder()
.with_reader(reader.clone())
fn collection_triggered_by_interval_helper<RT>(runtime: RT)
where
RT: crate::runtime::Runtime,
{
let interval = std::time::Duration::from_millis(1);
let exporter = InMemoryMetricsExporter::default();
let reader = PeriodicReader::builder(exporter.clone(), runtime)
.with_interval(interval)
.build();
let (sender, receiver) = mpsc::channel();

// Now collect and flush should succeed
let result = reader.collect(rm);
assert!(result.is_ok());
// Act
let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
let meter = meter_provider.meter("test");
let _counter = meter
.u64_observable_counter("testcounter")
.with_callback(move |_| {
sender.send(()).expect("channel should still be open");
})
.init();

let result = meter_provider.force_flush();
assert!(result.is_ok());
// Assert
receiver
.recv()
.expect("message should be available in channel, indicating a collection occurred");
}
}

0 comments on commit 8f71c7b

Please sign in to comment.