diff --git a/lib/vector-core/src/config/log_schema.rs b/lib/vector-core/src/config/log_schema.rs index 9f9eab7cbc5191..8b23b1be7b1c69 100644 --- a/lib/vector-core/src/config/log_schema.rs +++ b/lib/vector-core/src/config/log_schema.rs @@ -55,7 +55,7 @@ pub struct LogSchema { /// This field will generally represent a real host, or container, that generated the message, /// but is somewhat source-dependent. #[serde(default = "LogSchema::default_host_key")] - host_key: String, + host_key: OptionalValuePath, /// The name of the event field to set the source identifier in. /// @@ -92,8 +92,8 @@ impl LogSchema { OptionalValuePath::new("timestamp") } - fn default_host_key() -> String { - String::from("host") + fn default_host_key() -> OptionalValuePath { + OptionalValuePath::new("host") } fn default_source_type_key() -> OptionalValuePath { @@ -121,8 +121,8 @@ impl LogSchema { self.timestamp_key.path.as_ref() } - pub fn host_key(&self) -> &str { - &self.host_key + pub fn host_key(&self) -> Option<&OwnedValuePath> { + self.host_key.path.as_ref() } pub fn source_type_key(&self) -> Option<&OwnedValuePath> { @@ -141,8 +141,8 @@ impl LogSchema { self.timestamp_key = OptionalValuePath { path: v }; } - pub fn set_host_key(&mut self, v: String) { - self.host_key = v; + pub fn set_host_key(&mut self, path: Option) { + self.host_key = OptionalValuePath { path }; } pub fn set_source_type_key(&mut self, path: Option) { @@ -169,7 +169,7 @@ impl LogSchema { { errors.push("conflicting values for 'log_schema.host_key' found".to_owned()); } else { - self.set_host_key(other.host_key().to_string()); + self.set_host_key(other.host_key().cloned()); } if self.message_key() != LOG_SCHEMA_DEFAULT.message_key() && self.message_key() != other.message_key() diff --git a/lib/vector-core/src/event/log_event.rs b/lib/vector-core/src/event/log_event.rs index e11423e122d13c..41633b32d93d53 100644 --- a/lib/vector-core/src/event/log_event.rs +++ b/lib/vector-core/src/event/log_event.rs @@ -458,7 +458,7 @@ impl LogEvent { pub fn host_path(&self) -> Option { match self.namespace() { LogNamespace::Vector => self.find_key_by_meaning("host"), - LogNamespace::Legacy => Some(log_schema().host_key().to_owned()), + LogNamespace::Legacy => log_schema().host_key().map(ToString::to_string), } } @@ -505,7 +505,9 @@ impl LogEvent { pub fn get_host(&self) -> Option<&Value> { match self.namespace() { LogNamespace::Vector => self.get_by_meaning("host"), - LogNamespace::Legacy => self.get((PathPrefix::Event, log_schema().host_key())), + LogNamespace::Legacy => log_schema() + .host_key() + .and_then(|key| self.get((PathPrefix::Event, key))), } } diff --git a/lib/vector-lookup/src/lookup_v2/optional_path.rs b/lib/vector-lookup/src/lookup_v2/optional_path.rs index ee15ed3509cf6c..5bbfe4ad082d43 100644 --- a/lib/vector-lookup/src/lookup_v2/optional_path.rs +++ b/lib/vector-lookup/src/lookup_v2/optional_path.rs @@ -91,3 +91,11 @@ impl From for OptionalValuePath { Self { path: Some(path) } } } + +impl From> for OptionalValuePath { + fn from(value: Option) -> Self { + value.map_or(OptionalValuePath::none(), |path| { + OptionalValuePath::from(path) + }) + } +} diff --git a/src/config/mod.rs b/src/config/mod.rs index 40d0733f15cb08..799e8f6a3c7c3b 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -843,7 +843,10 @@ mod tests { ) .unwrap(); - assert_eq!("host", config.global.log_schema.host_key().to_string()); + assert_eq!( + "host", + config.global.log_schema.host_key().unwrap().to_string() + ); assert_eq!( "message", config.global.log_schema.message_key().to_string() @@ -879,7 +882,10 @@ mod tests { ) .unwrap(); - assert_eq!("this", config.global.log_schema.host_key().to_string()); + assert_eq!( + "this", + config.global.log_schema.host_key().unwrap().to_string() + ); assert_eq!("that", config.global.log_schema.message_key().to_string()); assert_eq!( "then", diff --git a/src/sinks/datadog/metrics/encoder.rs b/src/sinks/datadog/metrics/encoder.rs index 0dd6c393e31b5b..49e0c6e61aacc3 100644 --- a/src/sinks/datadog/metrics/encoder.rs +++ b/src/sinks/datadog/metrics/encoder.rs @@ -385,7 +385,10 @@ fn sketch_to_proto_message( let name = get_namespaced_name(metric, default_namespace); let ts = encode_timestamp(metric.timestamp()); let mut tags = metric.tags().cloned().unwrap_or_default(); - let host = tags.remove(log_schema.host_key()).unwrap_or_default(); + let host = log_schema + .host_key() + .map(|key| tags.remove(key.to_string().as_str()).unwrap_or_default()) + .unwrap_or_default(); let tags = encode_tags(&tags); let cnt = ddsketch.count() as i64; @@ -497,7 +500,10 @@ fn generate_series_metrics( let name = get_namespaced_name(metric, default_namespace); let mut tags = metric.tags().cloned().unwrap_or_default(); - let host = tags.remove(log_schema.host_key()); + let host = log_schema + .host_key() + .map(|key| tags.remove(key.to_string().as_str()).unwrap_or_default()); + let source_type_name = tags.remove("source_type_name"); let device = tags.remove("device"); let ts = encode_timestamp(metric.timestamp()); diff --git a/src/sinks/datadog/traces/sink.rs b/src/sinks/datadog/traces/sink.rs index 910e108dfebe05..fa1e2cc3f22805 100644 --- a/src/sinks/datadog/traces/sink.rs +++ b/src/sinks/datadog/traces/sink.rs @@ -7,6 +7,8 @@ use futures_util::{ }; use tokio::sync::oneshot::{channel, Sender}; use tower::Service; +use vrl::path::PathPrefix; + use vector_core::{ config::log_schema, event::Event, @@ -15,11 +17,13 @@ use vector_core::{ stream::{BatcherSettings, DriverResponse}, }; -use super::service::TraceApiRequest; use crate::{ internal_events::DatadogTracesEncodingError, sinks::{datadog::traces::request_builder::DatadogTracesRequestBuilder, util::SinkBuilderExt}, }; + +use super::service::TraceApiRequest; + #[derive(Default)] struct EventPartitioner; @@ -51,9 +55,10 @@ impl Partitioner for EventPartitioner { Event::Trace(t) => PartitionKey { api_key: item.metadata().datadog_api_key(), env: t.get("env").map(|s| s.to_string_lossy().into_owned()), - hostname: t - .get(log_schema().host_key()) - .map(|s| s.to_string_lossy().into_owned()), + hostname: log_schema().host_key().and_then(|key| { + t.get((PathPrefix::Event, key)) + .map(|s| s.to_string_lossy().into_owned()) + }), agent_version: t .get("agent_version") .map(|s| s.to_string_lossy().into_owned()), diff --git a/src/sinks/humio/logs.rs b/src/sinks/humio/logs.rs index b87446ff015f74..fc76533a5830d7 100644 --- a/src/sinks/humio/logs.rs +++ b/src/sinks/humio/logs.rs @@ -3,7 +3,7 @@ use lookup::lookup_v2::OptionalValuePath; use vector_common::sensitive_string::SensitiveString; use vector_config::configurable_component; -use super::host_key; +use super::config_host_key; use crate::sinks::splunk_hec::common::config_timestamp_key; use crate::{ codecs::EncodingConfig, @@ -74,8 +74,8 @@ pub struct HumioLogsConfig { /// By default, the [global `log_schema.host_key` option][global_host_key] is used. /// /// [global_host_key]: https://vector.dev/docs/reference/configuration/global-options/#log_schema.host_key - #[serde(default = "host_key")] - pub(super) host_key: String, + #[serde(default = "config_host_key")] + pub(super) host_key: OptionalValuePath, /// Event fields to be added to Humio’s extra fields. /// @@ -154,7 +154,7 @@ impl GenerateConfig for HumioLogsConfig { event_type: None, indexed_fields: vec![], index: None, - host_key: host_key(), + host_key: config_host_key(), compression: Compression::default(), request: TowerRequestConfig::default(), batch: BatchConfig::default(), @@ -231,6 +231,7 @@ mod integration_tests { use serde::Deserialize; use serde_json::{json, Value as JsonValue}; use tokio::time::Duration; + use vrl::path::PathPrefix; use super::*; use crate::{ @@ -262,14 +263,14 @@ mod integration_tests { let message = random_string(100); let host = "192.168.1.1".to_string(); let mut event = LogEvent::from(message.clone()); - event.insert(log_schema().host_key(), host.clone()); + event.insert( + (PathPrefix::Event, log_schema().host_key().unwrap()), + host.clone(), + ); let ts = Utc.timestamp_nanos(Utc::now().timestamp_millis() * 1_000_000 + 132_456); event.insert( - ( - lookup::PathPrefix::Event, - log_schema().timestamp_key().unwrap(), - ), + (PathPrefix::Event, log_schema().timestamp_key().unwrap()), ts, ); @@ -387,7 +388,9 @@ mod integration_tests { source: None, encoding: JsonSerializerConfig::default().into(), event_type: None, - host_key: log_schema().host_key().to_string(), + host_key: OptionalValuePath { + path: log_schema().host_key().cloned(), + }, indexed_fields: vec![], index: None, compression: Compression::None, diff --git a/src/sinks/humio/metrics.rs b/src/sinks/humio/metrics.rs index a336f60590aa9e..d3ca01cbc33d54 100644 --- a/src/sinks/humio/metrics.rs +++ b/src/sinks/humio/metrics.rs @@ -9,7 +9,7 @@ use vector_config::configurable_component; use vector_core::sink::StreamSink; use super::{ - host_key, + config_host_key, logs::{HumioLogsConfig, HOST}, }; use crate::{ @@ -86,8 +86,8 @@ pub struct HumioMetricsConfig { /// By default, the [global `log_schema.host_key` option][global_host_key] is used. /// /// [global_host_key]: https://vector.dev/docs/reference/configuration/global-options/#log_schema.host_key - #[serde(default = "host_key")] - host_key: String, + #[serde(default = "config_host_key")] + host_key: OptionalValuePath, /// Event fields to be added to Humio’s extra fields. /// diff --git a/src/sinks/humio/mod.rs b/src/sinks/humio/mod.rs index 4601bbaef5aaec..5a4f4f88582f2e 100644 --- a/src/sinks/humio/mod.rs +++ b/src/sinks/humio/mod.rs @@ -1,6 +1,10 @@ +use lookup::lookup_v2::OptionalValuePath; + pub mod logs; pub mod metrics; -fn host_key() -> String { - crate::config::log_schema().host_key().to_string() +pub fn config_host_key() -> OptionalValuePath { + OptionalValuePath { + path: crate::config::log_schema().host_key().cloned(), + } } diff --git a/src/sinks/influxdb/logs.rs b/src/sinks/influxdb/logs.rs index ca12946f00badc..78ed0155367ba1 100644 --- a/src/sinks/influxdb/logs.rs +++ b/src/sinks/influxdb/logs.rs @@ -4,12 +4,13 @@ use bytes::{Bytes, BytesMut}; use futures::SinkExt; use http::{Request, Uri}; use indoc::indoc; +use vrl::value::Kind; + use lookup::lookup_v2::{parse_value_path, OptionalValuePath}; use lookup::{OwnedValuePath, PathPrefix}; use vector_config::configurable_component; use vector_core::config::log_schema; use vector_core::schema; -use vrl::value::Kind; use crate::{ codecs::Transformer, @@ -189,10 +190,8 @@ impl SinkConfig for InfluxDbLogsConfig { .host_key .clone() .and_then(|k| k.path) - .unwrap_or_else(|| { - parse_value_path(log_schema().host_key()) - .expect("global log_schema.host_key to be valid path") - }); + .or(log_schema().host_key().cloned()) + .expect("global log_schema.host_key to be valid path"); let message_key = self .message_key @@ -409,10 +408,10 @@ mod tests { use futures::{channel::mpsc, stream, StreamExt}; use http::{request::Parts, StatusCode}; use indoc::indoc; + use lookup::owned_value_path; use vector_core::event::{BatchNotifier, BatchStatus, Event, LogEvent}; - use super::*; use crate::{ sinks::{ influxdb::test_util::{assert_fields, split_line_protocol, ts}, @@ -427,6 +426,8 @@ mod tests { }, }; + use super::*; + type Receiver = mpsc::Receiver<(Parts, bytes::Bytes)>; #[test] @@ -880,16 +881,17 @@ mod tests { #[cfg(feature = "influxdb-integration-tests")] #[cfg(test)] mod integration_tests { + use std::sync::Arc; + use chrono::Utc; - use codecs::BytesDeserializerConfig; use futures::stream; + use vrl::value; + + use codecs::BytesDeserializerConfig; use lookup::{owned_value_path, path}; - use std::sync::Arc; use vector_core::config::{LegacyKey, LogNamespace}; use vector_core::event::{BatchNotifier, BatchStatus, Event, LogEvent}; - use vrl::value; - use super::*; use crate::{ config::SinkContext, sinks::influxdb::{ @@ -900,6 +902,8 @@ mod integration_tests { test_util::components::{run_and_assert_sink_compliance, HTTP_SINK_TAGS}, }; + use super::*; + #[tokio::test] async fn influxdb2_logs_put_data() { let endpoint = address_v2(); diff --git a/src/sinks/splunk_hec/common/util.rs b/src/sinks/splunk_hec/common/util.rs index 64d6bb2dae06b3..afdb097b493043 100644 --- a/src/sinks/splunk_hec/common/util.rs +++ b/src/sinks/splunk_hec/common/util.rs @@ -132,8 +132,10 @@ pub fn build_uri( uri.parse::() } -pub fn host_key() -> String { - crate::config::log_schema().host_key().to_string() +pub fn config_host_key() -> OptionalValuePath { + OptionalValuePath { + path: crate::config::log_schema().host_key().cloned(), + } } pub fn config_timestamp_key() -> OptionalValuePath { diff --git a/src/sinks/splunk_hec/logs/config.rs b/src/sinks/splunk_hec/logs/config.rs index 64299669e388b9..a205f1a0110de7 100644 --- a/src/sinks/splunk_hec/logs/config.rs +++ b/src/sinks/splunk_hec/logs/config.rs @@ -17,7 +17,7 @@ use crate::{ sinks::{ splunk_hec::common::{ acknowledgements::HecClientAcknowledgementsConfig, - build_healthcheck, build_http_batch_service, create_client, host_key, + build_healthcheck, build_http_batch_service, config_host_key, create_client, service::{HecService, HttpRequestBuilder}, EndpointTarget, SplunkHecDefaultBatchSettings, }, @@ -64,8 +64,8 @@ pub struct HecLogsSinkConfig { /// /// [global_host_key]: https://vector.dev/docs/reference/configuration/global-options/#log_schema.host_key #[configurable(metadata(docs::advanced))] - #[serde(default = "host_key")] - pub host_key: String, + #[serde(default = "config_host_key")] + pub host_key: OptionalValuePath, /// Fields to be [added to Splunk index][splunk_field_index_docs]. /// @@ -165,7 +165,7 @@ impl GenerateConfig for HecLogsSinkConfig { toml::Value::try_from(Self { default_token: "${VECTOR_SPLUNK_HEC_TOKEN}".to_owned().into(), endpoint: "endpoint".to_owned(), - host_key: host_key(), + host_key: config_host_key(), indexed_fields: vec![], index: None, sourcetype: None, @@ -273,7 +273,7 @@ impl HecLogsSinkConfig { source: self.source.clone(), index: self.index.clone(), indexed_fields: self.indexed_fields.clone(), - host: self.host_key.clone(), + host_key: self.host_key.path.clone(), timestamp_nanos_key: self.timestamp_nanos_key.clone(), timestamp_key: self.timestamp_key.path.clone(), endpoint_target: self.endpoint_target, diff --git a/src/sinks/splunk_hec/logs/integration_tests.rs b/src/sinks/splunk_hec/logs/integration_tests.rs index 0510d2dbe9721d..f686e9e74d4cc0 100644 --- a/src/sinks/splunk_hec/logs/integration_tests.rs +++ b/src/sinks/splunk_hec/logs/integration_tests.rs @@ -107,7 +107,7 @@ async fn config(encoding: EncodingConfig, indexed_fields: Vec) -> HecLog HecLogsSinkConfig { default_token: get_token().await.into(), endpoint: splunk_hec_address(), - host_key: "host".into(), + host_key: OptionalValuePath::new("host"), indexed_fields, index: None, sourcetype: None, @@ -327,7 +327,7 @@ async fn splunk_configure_hostname() { let cx = SinkContext::default(); let config = HecLogsSinkConfig { - host_key: "roast".into(), + host_key: OptionalValuePath::new("roast"), ..config( JsonSerializerConfig::default().into(), vec!["asdf".to_string()], diff --git a/src/sinks/splunk_hec/logs/sink.rs b/src/sinks/splunk_hec/logs/sink.rs index 11d3f260ed0222..595f8f2119dce9 100644 --- a/src/sinks/splunk_hec/logs/sink.rs +++ b/src/sinks/splunk_hec/logs/sink.rs @@ -39,7 +39,7 @@ pub struct HecLogsSink { pub source: Option