From fd52bf2ea907f99298947b5a1705ec99602774ca Mon Sep 17 00:00:00 2001 From: Doug Smith Date: Mon, 30 Oct 2023 14:04:01 -0400 Subject: [PATCH 1/2] OPW-94 remove peer_addr tag --- src/internal_events/tcp.rs | 3 +- src/sources/socket/mod.rs | 54 +++++++++---------- src/sources/statsd/mod.rs | 8 +-- src/sources/syslog.rs | 4 +- src/test_util/components.rs | 7 +-- .../2023-11-07-0-34-0-upgrade-guide.md | 4 ++ 6 files changed, 39 insertions(+), 41 deletions(-) diff --git a/src/internal_events/tcp.rs b/src/internal_events/tcp.rs index 6b7607cbb6a4e..1f6a3f74db06b 100644 --- a/src/internal_events/tcp.rs +++ b/src/internal_events/tcp.rs @@ -144,8 +144,7 @@ impl InternalEvent for TcpBytesReceived { ); counter!( "component_received_bytes_total", self.byte_size as u64, - "protocol" => "tcp", - "peer_addr" => self.peer_addr.to_string() + "protocol" => "tcp" ); } } diff --git a/src/sources/socket/mod.rs b/src/sources/socket/mod.rs index 64fdcd99b225b..62f044714e2c8 100644 --- a/src/sources/socket/mod.rs +++ b/src/sources/socket/mod.rs @@ -376,7 +376,7 @@ mod test { sources::util::net::SocketListenAddr, test_util::{ collect_n, collect_n_limited, - components::{assert_source_compliance, SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS}, + components::{assert_source_compliance, SOCKET_PUSH_SOURCE_TAGS}, next_addr, random_string, send_lines, send_lines_tls, wait_for_tcp, }, tls::{self, TlsConfig, TlsEnableableConfig, TlsSourceConfig}, @@ -391,7 +391,7 @@ mod test { //////// TCP TESTS //////// #[tokio::test] async fn tcp_it_includes_host() { - assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async { + assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async { let (tx, mut rx) = SourceSender::new_test(); let addr = next_addr(); @@ -416,7 +416,7 @@ mod test { #[tokio::test] async fn tcp_it_includes_vector_namespaced_fields() { - assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async { + assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async { let (tx, mut rx) = SourceSender::new_test(); let addr = next_addr(); let mut conf = TcpConfig::from_address(addr.into()); @@ -456,7 +456,7 @@ mod test { #[tokio::test] async fn tcp_splits_on_newline() { - assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async { + assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async { let (tx, rx) = SourceSender::new_test(); let addr = next_addr(); @@ -488,7 +488,7 @@ mod test { #[tokio::test] async fn tcp_it_includes_source_type() { - assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async { + assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async { let (tx, mut rx) = SourceSender::new_test(); let addr = next_addr(); @@ -514,7 +514,7 @@ mod test { #[tokio::test] async fn tcp_continue_after_long_line() { - assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async { + assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async { let (tx, mut rx) = SourceSender::new_test(); let addr = next_addr(); @@ -555,7 +555,7 @@ mod test { #[tokio::test] async fn tcp_with_tls() { - assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async { + assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async { let (tx, mut rx) = SourceSender::new_test(); let addr = next_addr(); @@ -619,7 +619,7 @@ mod test { #[tokio::test] async fn tcp_with_tls_vector_namespace() { - assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async { + assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async { let (tx, mut rx) = SourceSender::new_test(); let addr = next_addr(); @@ -694,7 +694,7 @@ mod test { #[tokio::test] async fn tcp_shutdown_simple() { - assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async { + assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async { let source_id = ComponentKey::from("tcp_shutdown_simple"); let (tx, mut rx) = SourceSender::new_test(); let addr = next_addr(); @@ -962,7 +962,7 @@ mod test { #[tokio::test] async fn udp_message() { - assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async { + assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async { let (tx, rx) = SourceSender::new_test(); let address = init_udp(tx, false).await; @@ -979,7 +979,7 @@ mod test { #[tokio::test] async fn udp_message_preserves_newline() { - assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async { + assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async { let (tx, rx) = SourceSender::new_test(); let address = init_udp(tx, false).await; @@ -996,7 +996,7 @@ mod test { #[tokio::test] async fn udp_multiple_packets() { - assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async { + assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async { let (tx, rx) = SourceSender::new_test(); let address = init_udp(tx, false).await; @@ -1017,7 +1017,7 @@ mod test { #[tokio::test] async fn udp_max_length() { - assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async { + assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async { let (tx, rx) = SourceSender::new_test(); let address = next_addr(); let mut config = UdpConfig::from_address(address.into()); @@ -1053,7 +1053,7 @@ mod test { /// Windows will drop the entire packet if we exceed the max_length so we are unable to /// extract anything. async fn udp_max_length_delimited() { - assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async { + assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async { let (tx, rx) = SourceSender::new_test(); let address = next_addr(); let mut config = UdpConfig::from_address(address.into()); @@ -1084,7 +1084,7 @@ mod test { #[tokio::test] async fn udp_it_includes_host() { - assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async { + assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async { let (tx, rx) = SourceSender::new_test(); let address = init_udp(tx, false).await; @@ -1099,7 +1099,7 @@ mod test { #[tokio::test] async fn udp_it_includes_vector_namespaced_fields() { - assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async { + assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async { let (tx, rx) = SourceSender::new_test(); let address = init_udp(tx, true).await; @@ -1127,7 +1127,7 @@ mod test { #[tokio::test] async fn udp_it_includes_source_type() { - assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async { + assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async { let (tx, rx) = SourceSender::new_test(); let address = init_udp(tx, false).await; @@ -1144,7 +1144,7 @@ mod test { #[tokio::test] async fn udp_shutdown_simple() { - assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async { + assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async { let (tx, rx) = SourceSender::new_test(); let source_id = ComponentKey::from("udp_shutdown_simple"); @@ -1174,7 +1174,7 @@ mod test { #[tokio::test] async fn udp_shutdown_infinite_stream() { - assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async { + assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async { let (tx, rx) = SourceSender::new_test(); let source_id = ComponentKey::from("udp_shutdown_infinite_stream"); @@ -1334,7 +1334,7 @@ mod test { #[cfg(unix)] #[tokio::test] async fn unix_datagram_message() { - assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async { + assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async { let (_, rx) = unix_message("test", false, false).await; let events = collect_n(rx, 1).await; @@ -1401,7 +1401,7 @@ mod test { #[cfg(unix)] #[tokio::test] async fn unix_datagram_message_with_vector_namespace() { - assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async { + assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async { let (_, rx) = unix_message("test", false, true).await; let events = collect_n(rx, 1).await; let log = events[0].as_log(); @@ -1426,7 +1426,7 @@ mod test { #[cfg(unix)] #[tokio::test] async fn unix_datagram_message_preserves_newline() { - assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async { + assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async { let (_, rx) = unix_message("foo\nbar", false, false).await; let events = collect_n(rx, 1).await; @@ -1446,7 +1446,7 @@ mod test { #[cfg(unix)] #[tokio::test] async fn unix_datagram_multiple_packets() { - assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async { + assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async { unix_multiple_packets(false).await }) .await; @@ -1513,7 +1513,7 @@ mod test { #[cfg(unix)] #[tokio::test] async fn unix_stream_message() { - assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async { + assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async { let (_, rx) = unix_message("test", true, false).await; let events = collect_n(rx, 1).await; @@ -1533,7 +1533,7 @@ mod test { #[cfg(unix)] #[tokio::test] async fn unix_stream_message_with_vector_namespace() { - assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async { + assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async { let (_, rx) = unix_message("test", true, true).await; let events = collect_n(rx, 1).await; let log = events[0].as_log(); @@ -1556,7 +1556,7 @@ mod test { #[cfg(unix)] #[tokio::test] async fn unix_stream_message_splits_on_newline() { - assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async { + assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async { let (_, rx) = unix_message("foo\nbar", true, false).await; let events = collect_n(rx, 2).await; @@ -1584,7 +1584,7 @@ mod test { #[cfg(unix)] #[tokio::test] async fn unix_stream_multiple_packets() { - assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async { + assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async { unix_multiple_packets(true).await }) .await; diff --git a/src/sources/statsd/mod.rs b/src/sources/statsd/mod.rs index 6e7139495c103..70b72eb941f79 100644 --- a/src/sources/statsd/mod.rs +++ b/src/sources/statsd/mod.rs @@ -351,7 +351,7 @@ mod test { collect_limited, components::{ assert_source_compliance, assert_source_error, COMPONENT_ERROR_TAGS, - SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, + SOCKET_PUSH_SOURCE_TAGS, }, metrics::{assert_counter, assert_distribution, assert_gauge, assert_set}, next_addr, @@ -365,7 +365,7 @@ mod test { #[tokio::test] async fn test_statsd_udp() { - assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async move { + assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async move { let in_addr = next_addr(); let config = StatsdConfig::Udp(UdpConfig::from_address(in_addr.into())); let (sender, mut receiver) = mpsc::channel(200); @@ -384,7 +384,7 @@ mod test { #[tokio::test] async fn test_statsd_tcp() { - assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async move { + assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async move { let in_addr = next_addr(); let config = StatsdConfig::Tcp(TcpConfig::from_address(in_addr.into())); let (sender, mut receiver) = mpsc::channel(200); @@ -427,7 +427,7 @@ mod test { #[cfg(unix)] #[tokio::test] async fn test_statsd_unix() { - assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async move { + assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async move { let in_path = tempfile::tempdir().unwrap().into_path().join("unix_test"); let config = StatsdConfig::Unix(UnixConfig { path: in_path.clone(), diff --git a/src/sources/syslog.rs b/src/sources/syslog.rs index cb1dd777775b9..1b051de08c80b 100644 --- a/src/sources/syslog.rs +++ b/src/sources/syslog.rs @@ -1190,14 +1190,14 @@ mod test { #[cfg(unix)] #[tokio::test] async fn test_unix_stream_syslog() { - use crate::test_util::components::SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS; + use crate::test_util::components::SOCKET_PUSH_SOURCE_TAGS; use futures_util::{stream, SinkExt}; use std::os::unix::net::UnixStream as StdUnixStream; use tokio::io::AsyncWriteExt; use tokio::net::UnixStream; use tokio_util::codec::{FramedWrite, LinesCodec}; - assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async { + assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async { let num_messages: usize = 1; let in_path = tempfile::tempdir().unwrap().into_path().join("stream_test"); diff --git a/src/test_util/components.rs b/src/test_util/components.rs index 8e08275a9bcb5..8b4cdf46158a6 100644 --- a/src/test_util/components.rs +++ b/src/test_util/components.rs @@ -42,12 +42,7 @@ pub const HTTP_PULL_SOURCE_TAGS: [&str; 2] = ["endpoint", "protocol"]; pub const HTTP_PUSH_SOURCE_TAGS: [&str; 2] = ["http_path", "protocol"]; /// The standard set of tags for all generic socket-based sources that accept connections i.e. `TcpSource`. -pub const SOCKET_PUSH_SOURCE_TAGS: [&str; 2] = ["peer_addr", "protocol"]; - -/// The standard set of tags for all generic socket-based sources that accept connections i.e. `TcpSource`, but -/// specifically sources that experience high cardinality i.e. many many clients, where emitting metrics with the peer -/// address as a tag would represent too high of a cost to pay. -pub const SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS: [&str; 1] = ["protocol"]; +pub const SOCKET_PUSH_SOURCE_TAGS: [&str; 1] = ["protocol"]; /// The standard set of tags for all generic socket-based sources that poll connections i.e. Redis. pub const SOCKET_PULL_SOURCE_TAGS: [&str; 2] = ["remote_addr", "protocol"]; diff --git a/website/content/en/highlights/2023-11-07-0-34-0-upgrade-guide.md b/website/content/en/highlights/2023-11-07-0-34-0-upgrade-guide.md index 242ecabeb6ec1..e4fd0686723a9 100644 --- a/website/content/en/highlights/2023-11-07-0-34-0-upgrade-guide.md +++ b/website/content/en/highlights/2023-11-07-0-34-0-upgrade-guide.md @@ -13,6 +13,7 @@ Vector's 0.34.0 release includes **breaking changes**: 1. [Removal of Deprecated Datadog Component Config Options](#datadog-deprecated-config-options) 1. [Removal of Deprecated `component_name` Metric Tag](#deprecated-component-name) +1. [Removal of `peer_addr` Metric Tag](#remove-peer-addr) 1. [Blackhole sink no longer reports by default](#blackhole-sink-reporting) We cover them below to help you upgrade quickly: @@ -33,6 +34,9 @@ been removed from the Enterprise configuration. Instead of `region`, `site` shou The deprecated `component_name` tag has been removed from all internal metrics. Instead the `component_id` tag should be used. +#### Removal of `peer_addr` Metric Tag {#remove-peer-addr} +The `peer_addr` tag has been removed from the `component_received_bytes_total` internal metric for TCP-based sources due to its unbounded cardinality. + #### Blackhole sink no longer reports by default {#blackhole-sink-reporting} The `blackhole` sink no longer reports events processed every second by default. Instead this From 0d32982c157a532d48641da30b6e15aa065d8ffb Mon Sep 17 00:00:00 2001 From: Doug Smith Date: Mon, 30 Oct 2023 14:39:51 -0400 Subject: [PATCH 2/2] fmt --- website/content/en/highlights/2023-11-07-0-34-0-upgrade-guide.md | 1 + 1 file changed, 1 insertion(+) diff --git a/website/content/en/highlights/2023-11-07-0-34-0-upgrade-guide.md b/website/content/en/highlights/2023-11-07-0-34-0-upgrade-guide.md index e4fd0686723a9..ba61b2bbd0ffc 100644 --- a/website/content/en/highlights/2023-11-07-0-34-0-upgrade-guide.md +++ b/website/content/en/highlights/2023-11-07-0-34-0-upgrade-guide.md @@ -35,6 +35,7 @@ been removed from the Enterprise configuration. Instead of `region`, `site` shou The deprecated `component_name` tag has been removed from all internal metrics. Instead the `component_id` tag should be used. #### Removal of `peer_addr` Metric Tag {#remove-peer-addr} + The `peer_addr` tag has been removed from the `component_received_bytes_total` internal metric for TCP-based sources due to its unbounded cardinality. #### Blackhole sink no longer reports by default {#blackhole-sink-reporting}