Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(observability)!: remove peer_addr internal metric tag #18982

Merged
merged 2 commits into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/internal_events/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,7 @@ impl InternalEvent for TcpBytesReceived {
);
counter!(
"component_received_bytes_total", self.byte_size as u64,
"protocol" => "tcp",
"peer_addr" => self.peer_addr.to_string()
"protocol" => "tcp"
);
}
}
54 changes: 27 additions & 27 deletions src/sources/socket/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ mod test {
sources::util::net::SocketListenAddr,
test_util::{
collect_n, collect_n_limited,
components::{assert_source_compliance, SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS},
components::{assert_source_compliance, SOCKET_PUSH_SOURCE_TAGS},
next_addr, random_string, send_lines, send_lines_tls, wait_for_tcp,
},
tls::{self, TlsConfig, TlsEnableableConfig, TlsSourceConfig},
Expand All @@ -391,7 +391,7 @@ mod test {
//////// TCP TESTS ////////
#[tokio::test]
async fn tcp_it_includes_host() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, mut rx) = SourceSender::new_test();
let addr = next_addr();

Expand All @@ -416,7 +416,7 @@ mod test {

#[tokio::test]
async fn tcp_it_includes_vector_namespaced_fields() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, mut rx) = SourceSender::new_test();
let addr = next_addr();
let mut conf = TcpConfig::from_address(addr.into());
Expand Down Expand Up @@ -456,7 +456,7 @@ mod test {

#[tokio::test]
async fn tcp_splits_on_newline() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, rx) = SourceSender::new_test();
let addr = next_addr();

Expand Down Expand Up @@ -488,7 +488,7 @@ mod test {

#[tokio::test]
async fn tcp_it_includes_source_type() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, mut rx) = SourceSender::new_test();
let addr = next_addr();

Expand All @@ -514,7 +514,7 @@ mod test {

#[tokio::test]
async fn tcp_continue_after_long_line() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, mut rx) = SourceSender::new_test();
let addr = next_addr();

Expand Down Expand Up @@ -555,7 +555,7 @@ mod test {

#[tokio::test]
async fn tcp_with_tls() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, mut rx) = SourceSender::new_test();
let addr = next_addr();

Expand Down Expand Up @@ -619,7 +619,7 @@ mod test {

#[tokio::test]
async fn tcp_with_tls_vector_namespace() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, mut rx) = SourceSender::new_test();
let addr = next_addr();

Expand Down Expand Up @@ -694,7 +694,7 @@ mod test {

#[tokio::test]
async fn tcp_shutdown_simple() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let source_id = ComponentKey::from("tcp_shutdown_simple");
let (tx, mut rx) = SourceSender::new_test();
let addr = next_addr();
Expand Down Expand Up @@ -962,7 +962,7 @@ mod test {

#[tokio::test]
async fn udp_message() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, rx) = SourceSender::new_test();
let address = init_udp(tx, false).await;

Expand All @@ -979,7 +979,7 @@ mod test {

#[tokio::test]
async fn udp_message_preserves_newline() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, rx) = SourceSender::new_test();
let address = init_udp(tx, false).await;

Expand All @@ -996,7 +996,7 @@ mod test {

#[tokio::test]
async fn udp_multiple_packets() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, rx) = SourceSender::new_test();
let address = init_udp(tx, false).await;

Expand All @@ -1017,7 +1017,7 @@ mod test {

#[tokio::test]
async fn udp_max_length() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, rx) = SourceSender::new_test();
let address = next_addr();
let mut config = UdpConfig::from_address(address.into());
Expand Down Expand Up @@ -1053,7 +1053,7 @@ mod test {
/// Windows will drop the entire packet if we exceed the max_length so we are unable to
/// extract anything.
async fn udp_max_length_delimited() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, rx) = SourceSender::new_test();
let address = next_addr();
let mut config = UdpConfig::from_address(address.into());
Expand Down Expand Up @@ -1084,7 +1084,7 @@ mod test {

#[tokio::test]
async fn udp_it_includes_host() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, rx) = SourceSender::new_test();
let address = init_udp(tx, false).await;

Expand All @@ -1099,7 +1099,7 @@ mod test {

#[tokio::test]
async fn udp_it_includes_vector_namespaced_fields() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, rx) = SourceSender::new_test();
let address = init_udp(tx, true).await;

Expand Down Expand Up @@ -1127,7 +1127,7 @@ mod test {

#[tokio::test]
async fn udp_it_includes_source_type() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, rx) = SourceSender::new_test();
let address = init_udp(tx, false).await;

Expand All @@ -1144,7 +1144,7 @@ mod test {

#[tokio::test]
async fn udp_shutdown_simple() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, rx) = SourceSender::new_test();
let source_id = ComponentKey::from("udp_shutdown_simple");

Expand Down Expand Up @@ -1174,7 +1174,7 @@ mod test {

#[tokio::test]
async fn udp_shutdown_infinite_stream() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, rx) = SourceSender::new_test();
let source_id = ComponentKey::from("udp_shutdown_infinite_stream");

Expand Down Expand Up @@ -1334,7 +1334,7 @@ mod test {
#[cfg(unix)]
#[tokio::test]
async fn unix_datagram_message() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (_, rx) = unix_message("test", false, false).await;
let events = collect_n(rx, 1).await;

Expand Down Expand Up @@ -1401,7 +1401,7 @@ mod test {
#[cfg(unix)]
#[tokio::test]
async fn unix_datagram_message_with_vector_namespace() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (_, rx) = unix_message("test", false, true).await;
let events = collect_n(rx, 1).await;
let log = events[0].as_log();
Expand All @@ -1426,7 +1426,7 @@ mod test {
#[cfg(unix)]
#[tokio::test]
async fn unix_datagram_message_preserves_newline() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (_, rx) = unix_message("foo\nbar", false, false).await;
let events = collect_n(rx, 1).await;

Expand All @@ -1446,7 +1446,7 @@ mod test {
#[cfg(unix)]
#[tokio::test]
async fn unix_datagram_multiple_packets() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
unix_multiple_packets(false).await
})
.await;
Expand Down Expand Up @@ -1513,7 +1513,7 @@ mod test {
#[cfg(unix)]
#[tokio::test]
async fn unix_stream_message() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (_, rx) = unix_message("test", true, false).await;
let events = collect_n(rx, 1).await;

Expand All @@ -1533,7 +1533,7 @@ mod test {
#[cfg(unix)]
#[tokio::test]
async fn unix_stream_message_with_vector_namespace() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (_, rx) = unix_message("test", true, true).await;
let events = collect_n(rx, 1).await;
let log = events[0].as_log();
Expand All @@ -1556,7 +1556,7 @@ mod test {
#[cfg(unix)]
#[tokio::test]
async fn unix_stream_message_splits_on_newline() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (_, rx) = unix_message("foo\nbar", true, false).await;
let events = collect_n(rx, 2).await;

Expand Down Expand Up @@ -1584,7 +1584,7 @@ mod test {
#[cfg(unix)]
#[tokio::test]
async fn unix_stream_multiple_packets() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
unix_multiple_packets(true).await
})
.await;
Expand Down
8 changes: 4 additions & 4 deletions src/sources/statsd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ mod test {
collect_limited,
components::{
assert_source_compliance, assert_source_error, COMPONENT_ERROR_TAGS,
SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS,
SOCKET_PUSH_SOURCE_TAGS,
},
metrics::{assert_counter, assert_distribution, assert_gauge, assert_set},
next_addr,
Expand All @@ -365,7 +365,7 @@ mod test {

#[tokio::test]
async fn test_statsd_udp() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async move {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async move {
let in_addr = next_addr();
let config = StatsdConfig::Udp(UdpConfig::from_address(in_addr.into()));
let (sender, mut receiver) = mpsc::channel(200);
Expand All @@ -384,7 +384,7 @@ mod test {

#[tokio::test]
async fn test_statsd_tcp() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async move {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async move {
let in_addr = next_addr();
let config = StatsdConfig::Tcp(TcpConfig::from_address(in_addr.into()));
let (sender, mut receiver) = mpsc::channel(200);
Expand Down Expand Up @@ -427,7 +427,7 @@ mod test {
#[cfg(unix)]
#[tokio::test]
async fn test_statsd_unix() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async move {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async move {
let in_path = tempfile::tempdir().unwrap().into_path().join("unix_test");
let config = StatsdConfig::Unix(UnixConfig {
path: in_path.clone(),
Expand Down
4 changes: 2 additions & 2 deletions src/sources/syslog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1190,14 +1190,14 @@ mod test {
#[cfg(unix)]
#[tokio::test]
async fn test_unix_stream_syslog() {
use crate::test_util::components::SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS;
use crate::test_util::components::SOCKET_PUSH_SOURCE_TAGS;
use futures_util::{stream, SinkExt};
use std::os::unix::net::UnixStream as StdUnixStream;
use tokio::io::AsyncWriteExt;
use tokio::net::UnixStream;
use tokio_util::codec::{FramedWrite, LinesCodec};

assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let num_messages: usize = 1;
let in_path = tempfile::tempdir().unwrap().into_path().join("stream_test");

Expand Down
7 changes: 1 addition & 6 deletions src/test_util/components.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,7 @@ pub const HTTP_PULL_SOURCE_TAGS: [&str; 2] = ["endpoint", "protocol"];
pub const HTTP_PUSH_SOURCE_TAGS: [&str; 2] = ["http_path", "protocol"];

/// The standard set of tags for all generic socket-based sources that accept connections i.e. `TcpSource`.
pub const SOCKET_PUSH_SOURCE_TAGS: [&str; 2] = ["peer_addr", "protocol"];

/// The standard set of tags for all generic socket-based sources that accept connections i.e. `TcpSource`, but
/// specifically sources that experience high cardinality i.e. many many clients, where emitting metrics with the peer
/// address as a tag would represent too high of a cost to pay.
pub const SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS: [&str; 1] = ["protocol"];
pub const SOCKET_PUSH_SOURCE_TAGS: [&str; 1] = ["protocol"];

/// The standard set of tags for all generic socket-based sources that poll connections i.e. Redis.
pub const SOCKET_PULL_SOURCE_TAGS: [&str; 2] = ["remote_addr", "protocol"];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Vector's 0.34.0 release includes **breaking changes**:

1. [Removal of Deprecated Datadog Component Config Options](#datadog-deprecated-config-options)
1. [Removal of Deprecated `component_name` Metric Tag](#deprecated-component-name)
1. [Removal of `peer_addr` Metric Tag](#remove-peer-addr)
1. [Blackhole sink no longer reports by default](#blackhole-sink-reporting)

We cover them below to help you upgrade quickly:
Expand All @@ -33,6 +34,10 @@ been removed from the Enterprise configuration. Instead of `region`, `site` shou

The deprecated `component_name` tag has been removed from all internal metrics. Instead the `component_id` tag should be used.

#### Removal of `peer_addr` Metric Tag {#remove-peer-addr}

The `peer_addr` tag has been removed from the `component_received_bytes_total` internal metric for TCP-based sources due to its unbounded cardinality.

#### Blackhole sink no longer reports by default {#blackhole-sink-reporting}

The `blackhole` sink no longer reports events processed every second by default. Instead this
Expand Down
Loading