Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(datadog_metrics sink): the integration tests weren't actually validating anything #18754

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions .github/workflows/changes.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,14 @@ on:
value: ${{ jobs.int_tests.outputs.clickhouse }}
databend:
value: ${{ jobs.int_tests.outputs.databend }}
datadog:
value: ${{ jobs.int_tests.outputs.datadog }}
datadog-agent:
value: ${{ jobs.int_tests.outputs.datadog-agent }}
datadog-logs:
value: ${{ jobs.int_tests.outputs.datadog-logs }}
datadog-metrics:
value: ${{ jobs.int_tests.outputs.datadog-metrics }}
datadog-traces:
value: ${{ jobs.int_tests.outputs.datadog-traces }}
dnstap:
value: ${{ jobs.int_tests.outputs.dnstap }}
docker-logs:
Expand Down Expand Up @@ -189,7 +195,10 @@ jobs:
azure: ${{ steps.filter.outputs.azure }}
clickhouse: ${{ steps.filter.outputs.clickhouse }}
databend: ${{ steps.filter.outputs.databend }}
datadog: ${{ steps.filter.outputs.datadog }}
datadog-agent: ${{ steps.filter.outputs.datadog-agent }}
datadog-logs: ${{ steps.filter.outputs.datadog-logs }}
datadog-metrics: ${{ steps.filter.outputs.datadog-metrics }}
datadog-traces: ${{ steps.filter.outputs.datadog-traces }}
dnstap: ${{ steps.filter.outputs.dnstap }}
docker-logs: ${{ steps.filter.outputs.docker-logs }}
elasticsearch: ${{ steps.filter.outputs.elasticsearch }}
Expand Down
13 changes: 8 additions & 5 deletions .github/workflows/integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,10 @@ jobs:
|| needs.changes.outputs.azure == 'true'
|| needs.changes.outputs.clickhouse == 'true'
|| needs.changes.outputs.databend == 'true'
|| needs.changes.outputs.datadog == 'true'
|| needs.changes.outputs.datadog-agent == 'true'
|| needs.changes.outputs.datadog-logs == 'true'
|| needs.changes.outputs.datadog-metrics == 'true'
|| needs.changes.outputs.datadog-traces == 'true'
|| needs.changes.outputs.dnstap == 'true'
|| needs.changes.outputs.docker-logs == 'true'
|| needs.changes.outputs.elasticsearch == 'true'
Expand Down Expand Up @@ -166,7 +169,7 @@ jobs:
max_attempts: 3
command: bash scripts/ci-integration-test.sh databend

- if: (github.event_name == 'merge_group' || needs.changes.outputs.all-int == 'true' || needs.changes.outputs.datadog == 'true') &&
- if: (github.event_name == 'merge_group' || needs.changes.outputs.all-int == 'true' || needs.changes.outputs.datadog-agent == 'true') &&
(github.event_name != 'pull_request' || env.PR_HAS_ACCESS_TO_SECRETS == 'true')
name: datadog-agent
uses: nick-fields/retry@v2
Expand All @@ -175,7 +178,7 @@ jobs:
max_attempts: 3
command: bash scripts/ci-integration-test.sh datadog-agent

- if: (github.event_name == 'merge_group' || needs.changes.outputs.all-int == 'true' || needs.changes.outputs.datadog == 'true') &&
- if: (github.event_name == 'merge_group' || needs.changes.outputs.all-int == 'true' || needs.changes.outputs.datadog-logs == 'true') &&
(github.event_name != 'pull_request' || env.PR_HAS_ACCESS_TO_SECRETS == 'true')
name: datadog-logs
uses: nick-fields/retry@v2
Expand All @@ -184,7 +187,7 @@ jobs:
max_attempts: 3
command: bash scripts/ci-integration-test.sh datadog-logs

- if: (github.event_name == 'merge_group' || needs.changes.outputs.all-int == 'true' || needs.changes.outputs.datadog == 'true') &&
- if: (github.event_name == 'merge_group' || needs.changes.outputs.all-int == 'true' || needs.changes.outputs.datadog-metrics == 'true') &&
(github.event_name != 'pull_request' || env.PR_HAS_ACCESS_TO_SECRETS == 'true')
name: datadog-metrics
uses: nick-fields/retry@v2
Expand All @@ -193,7 +196,7 @@ jobs:
max_attempts: 3
command: bash scripts/ci-integration-test.sh datadog-metrics

- if: (github.event_name == 'merge_group' || needs.changes.outputs.all-int == 'true' || needs.changes.outputs.datadog == 'true') &&
- if: (github.event_name == 'merge_group' || needs.changes.outputs.all-int == 'true' || needs.changes.outputs.datadog-traces == 'true') &&
(github.event_name != 'pull_request' || env.PR_HAS_ACCESS_TO_SECRETS == 'true')
name: datadog-traces
uses: nick-fields/retry@v2
Expand Down
2 changes: 1 addition & 1 deletion scripts/integration/datadog-metrics/test.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
features:
- datadog-metrics-integration-tests

test_filter: '::datadog::metrics::'
test_filter: '::datadog::metrics::integration_tests'

runner:
env:
Expand Down
2 changes: 1 addition & 1 deletion scripts/integration/datadog-traces/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,6 @@ matrix:
paths:
- "src/common/datadog.rs"
- "src/internal_events/datadog_*"
- "src/sinks/datadog/**"
- "src/sinks/datadog/traces/**"
- "src/sinks/util/**"
- "scripts/integration/datadog-traces/**"
226 changes: 117 additions & 109 deletions src/sinks/datadog/metrics/integration_tests.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use bytes::Bytes;
use chrono::{SubsecRound, Utc};
use flate2::read::ZlibDecoder;
use futures::{channel::mpsc::Receiver, stream, StreamExt};
use hyper::StatusCode;
Expand All @@ -22,28 +23,25 @@ use crate::{
},
};

enum ApiStatus {
OK,
// Forbidden,
}
fn generate_metric_events() -> Vec<Event> {
let timestamp = Utc::now().trunc_subsecs(3);
let events: Vec<_> = (0..10)
.map(|index| {
let ts = timestamp + (std::time::Duration::from_secs(2) * index);
Event::Metric(
Metric::new(
format!("counter_{}", thread_rng().gen::<u32>()),
MetricKind::Incremental,
MetricValue::Counter {
value: index as f64,
},
)
.with_timestamp(Some(ts)),
)
})
.collect();

fn test_server(
addr: std::net::SocketAddr,
api_status: ApiStatus,
) -> (
futures::channel::mpsc::Receiver<(http::request::Parts, Bytes)>,
stream_cancel::Trigger,
impl std::future::Future<Output = Result<(), ()>>,
) {
let status = match api_status {
ApiStatus::OK => StatusCode::OK,
// ApiStatus::Forbidden => StatusCode::FORBIDDEN,
};

// NOTE: we pass `Trigger` out to the caller even though this suite never
// uses it as it's being dropped cancels the stream machinery here,
// indicating failures that might not be valid.
build_test_server_status(addr, status)
events
}

/// Starts a test sink with random metrics running into it
Expand All @@ -55,10 +53,7 @@ fn test_server(
/// Testers may set `http_status` and `batch_status`. The first controls what
/// status code faked HTTP responses will have, the second acts as a check on
/// the `Receiver`'s status before being returned to the caller.
async fn start_test(
api_status: ApiStatus,
batch_status: BatchStatus,
) -> (Vec<Event>, Receiver<(http::request::Parts, Bytes)>) {
async fn start_test() -> (Vec<Event>, Receiver<(http::request::Parts, Bytes)>) {
let config = indoc! {r#"
default_api_key = "atoken"
default_namespace = "foo"
Expand All @@ -73,25 +68,18 @@ async fn start_test(

let (sink, _) = config.build(cx).await.unwrap();

let (rx, _trigger, server) = test_server(addr, api_status);
let (rx, _trigger, server) = build_test_server_status(addr, StatusCode::OK);
tokio::spawn(server);

let (batch, receiver) = BatchNotifier::new_with_receiver();
let events: Vec<_> = (0..10)
.map(|index| {
Event::Metric(Metric::new(
format!("counter_{}", thread_rng().gen::<u32>()),
MetricKind::Absolute,
MetricValue::Counter {
value: index as f64,
},
))
})
.collect();
let (batch, mut receiver) = BatchNotifier::new_with_receiver();

let events = generate_metric_events();

let stream = map_event_batch_stream(stream::iter(events.clone()), Some(batch));

sink.run(stream).await.unwrap();
assert_eq!(receiver.await, batch_status);

assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));

(events, rx)
}
Expand All @@ -110,68 +98,96 @@ fn decompress_payload(payload: Vec<u8>) -> std::io::Result<Vec<u8>> {
/// were delivered and then asserts that every message is able to be
/// deserialized.
async fn smoke() {
let (expected, rx) = start_test(ApiStatus::OK, BatchStatus::Delivered).await;
let (expected, rx) = start_test().await;

let output = rx.take(expected.len()).collect::<Vec<_>>().await;

for val in output.iter() {
assert_eq!(
val.0.headers.get("Content-Type").unwrap(),
"application/json"
);
assert_eq!(val.0.headers.get("DD-API-KEY").unwrap(), "atoken");
assert!(val.0.headers.contains_key("DD-Agent-Payload"));

let compressed_payload = val.1.to_vec();
let payload = decompress_payload(compressed_payload).unwrap();
let payload = std::str::from_utf8(&payload).unwrap();
let payload: serde_json::Value = serde_json::from_str(payload).unwrap();

let series = payload
.as_object()
.unwrap()
.get("series")
.unwrap()
.as_array()
.unwrap();
assert!(!series.is_empty());

// check metrics are sorted by name, which helps HTTP compression
let metric_names: Vec<String> = series
.iter()
.map(|value| {
value
.as_object()
.unwrap()
.get("metric")
.unwrap()
.as_str()
.unwrap()
.to_string()
})
.collect();
let mut sorted_names = metric_names.clone();
sorted_names.sort();
assert_eq!(metric_names, sorted_names);

let entry = series.first().unwrap().as_object().unwrap();
assert_eq!(
entry.get("metric").unwrap().as_str().unwrap(),
"foo.counter"
);
assert_eq!(entry.get("type").unwrap().as_str().unwrap(), "count");
let points = entry
.get("points")
.unwrap()
.as_array()
.unwrap()
.first()
.unwrap()
.as_array()
.unwrap();
assert_eq!(points.len(), 2);
assert_eq!(points.get(1).unwrap().as_f64().unwrap(), 1.0);
}
assert!(output.len() == 1, "Should have received a response");

let val = output.first().unwrap();

assert_eq!(
val.0.headers.get("Content-Type").unwrap(),
"application/json"
);
assert_eq!(val.0.headers.get("DD-API-KEY").unwrap(), "atoken");
assert!(val.0.headers.contains_key("DD-Agent-Payload"));

let compressed_payload = val.1.to_vec();
let payload = decompress_payload(compressed_payload).unwrap();
let payload = std::str::from_utf8(&payload).unwrap();
let payload: serde_json::Value = serde_json::from_str(payload).unwrap();

let series = payload
.as_object()
.unwrap()
.get("series")
.unwrap()
.as_array()
.unwrap();
assert!(!series.is_empty());

// check metrics are sorted by name, which helps HTTP compression
let metric_names: Vec<String> = series
.iter()
.map(|value| {
value
.as_object()
.unwrap()
.get("metric")
.unwrap()
.as_str()
.unwrap()
.to_string()
})
.collect();
let mut sorted_names = metric_names.clone();
sorted_names.sort();
assert_eq!(metric_names, sorted_names);

let entry = series.first().unwrap().as_object().unwrap();
assert!(entry
.get("metric")
.unwrap()
.as_str()
.unwrap()
.starts_with("foo.counter_"),);
assert_eq!(entry.get("type").unwrap().as_str().unwrap(), "count");
let points = entry
.get("points")
.unwrap()
.as_array()
.unwrap()
.first()
.unwrap()
.as_array()
.unwrap();
assert_eq!(points.len(), 2);

// validate that all values were received
let all_values: f64 = series
.iter()
.map(|entry| {
entry
.as_object()
.unwrap()
.get("points")
.unwrap()
.as_array()
.unwrap()
.first()
.unwrap()
.as_array()
.unwrap()
.get(1)
.unwrap()
.as_f64()
.unwrap()
})
.sum();

// the input values are [0..10)
assert_eq!(all_values, 45.0);
}

async fn run_sink() {
Expand All @@ -186,17 +202,9 @@ async fn run_sink() {

let (sink, _) = config.build(cx).await.unwrap();
let (batch, receiver) = BatchNotifier::new_with_receiver();
let events: Vec<_> = (0..10)
.map(|index| {
Event::Metric(Metric::new(
"counter",
MetricKind::Absolute,
MetricValue::Counter {
value: index as f64,
},
))
})
.collect();

let events = generate_metric_events();

let stream = map_event_batch_stream(stream::iter(events.clone()), Some(batch));

sink.run(stream).await.unwrap();
Expand Down