diff --git a/src/sources/http_server.rs b/src/sources/http_server.rs index 92507c9b1998a..8b5cfefcf57ae 100644 --- a/src/sources/http_server.rs +++ b/src/sources/http_server.rs @@ -2,23 +2,23 @@ use std::{collections::HashMap, net::SocketAddr}; use bytes::{Bytes, BytesMut}; use chrono::Utc; +use http::{StatusCode, Uri}; +use http_serde; +use tokio_util::codec::Decoder as _; +use vrl::value::{kind::Collection, Kind}; +use warp::http::{HeaderMap, HeaderValue}; + use codecs::{ decoding::{DeserializerConfig, FramingConfig}, BytesDecoderConfig, BytesDeserializerConfig, JsonDeserializerConfig, NewlineDelimitedDecoderConfig, }; - -use http::{StatusCode, Uri}; -use http_serde; use lookup::{lookup_v2::OptionalValuePath, owned_value_path, path}; -use tokio_util::codec::Decoder as _; use vector_config::configurable_component; use vector_core::{ config::{DataType, LegacyKey, LogNamespace}, schema::Definition, }; -use vrl::value::{kind::Collection, Kind}; -use warp::http::{HeaderMap, HeaderValue}; use crate::{ codecs::{Decoder, DecodingConfig}, @@ -385,7 +385,8 @@ struct SimpleHttpSource { } impl HttpSource for SimpleHttpSource { - /// Enriches the passed in events with metadata for the `request_path` and for each of the headers. + /// Enriches the log events with metadata for the `request_path` and for each of the headers. + /// Non-log events are skipped. fn enrich_events( &self, events: &mut [Event], @@ -393,29 +394,41 @@ impl HttpSource for SimpleHttpSource { headers_config: &HeaderMap, query_parameters: &HashMap, ) { + let now = Utc::now(); for event in events.iter_mut() { - let log = event.as_mut_log(); - - // add request_path to each event - self.log_namespace.insert_source_metadata( - SimpleHttpConfig::NAME, - log, - self.path_key.path.as_ref().map(LegacyKey::InsertIfEmpty), - path!("path"), - request_path.to_owned(), - ); - - // add each header to each event - for header_name in &self.headers { - let value = headers_config.get(header_name).map(HeaderValue::as_bytes); + match event { + Event::Log(log) => { + // add request_path to each event + self.log_namespace.insert_source_metadata( + SimpleHttpConfig::NAME, + log, + self.path_key.path.as_ref().map(LegacyKey::InsertIfEmpty), + path!("path"), + request_path.to_owned(), + ); - self.log_namespace.insert_source_metadata( - SimpleHttpConfig::NAME, - log, - Some(LegacyKey::InsertIfEmpty(path!(header_name))), - path!("headers", header_name), - Value::from(value.map(Bytes::copy_from_slice)), - ); + // add each header to each event + for header_name in &self.headers { + let value = headers_config.get(header_name).map(HeaderValue::as_bytes); + + self.log_namespace.insert_source_metadata( + SimpleHttpConfig::NAME, + log, + Some(LegacyKey::InsertIfEmpty(path!(header_name))), + path!("headers", header_name), + Value::from(value.map(Bytes::copy_from_slice)), + ); + } + + self.log_namespace.insert_standard_vector_source_metadata( + log, + SimpleHttpConfig::NAME, + now, + ); + } + _ => { + continue; + } } } @@ -426,17 +439,6 @@ impl HttpSource for SimpleHttpSource { self.log_namespace, SimpleHttpConfig::NAME, ); - - let now = Utc::now(); - for event in events { - let log = event.as_mut_log(); - - self.log_namespace.insert_standard_vector_source_metadata( - log, - SimpleHttpConfig::NAME, - now, - ); - } } fn build_events( @@ -474,29 +476,29 @@ impl HttpSource for SimpleHttpSource { #[cfg(test)] mod tests { - use lookup::{event_path, owned_value_path, OwnedTargetPath}; use std::str::FromStr; use std::{collections::BTreeMap, io::Write, net::SocketAddr}; - use vector_core::config::LogNamespace; - use vector_core::event::LogEvent; - use vector_core::schema::Definition; - use vrl::value::kind::Collection; - use vrl::value::Kind; - use codecs::{ - decoding::{DeserializerConfig, FramingConfig}, - BytesDecoderConfig, JsonDeserializerConfig, - }; use flate2::{ write::{GzEncoder, ZlibEncoder}, Compression, }; use futures::Stream; use http::{HeaderMap, Method, StatusCode}; - use lookup::lookup_v2::OptionalValuePath; use similar_asserts::assert_eq; + use vrl::value::kind::Collection; + use vrl::value::Kind; + + use codecs::{ + decoding::{DeserializerConfig, FramingConfig}, + BytesDecoderConfig, JsonDeserializerConfig, + }; + use lookup::lookup_v2::OptionalValuePath; + use lookup::{event_path, owned_value_path, OwnedTargetPath}; + use vector_core::config::LogNamespace; + use vector_core::event::LogEvent; + use vector_core::schema::Definition; - use super::{remove_duplicates, SimpleHttpConfig}; use crate::sources::http_server::HttpMethod; use crate::{ config::{log_schema, SourceConfig, SourceContext}, @@ -508,6 +510,8 @@ mod tests { SourceSender, }; + use super::{remove_duplicates, SimpleHttpConfig}; + #[test] fn generate_config() { crate::test_util::test_generate_config::(); diff --git a/src/sources/util/http/query.rs b/src/sources/util/http/query.rs index 5e064df8fb51d..59a0bcb02c117 100644 --- a/src/sources/util/http/query.rs +++ b/src/sources/util/http/query.rs @@ -16,13 +16,15 @@ pub fn add_query_parameters( for query_parameter_name in query_parameters_config { let value = query_parameters.get(query_parameter_name); for event in events.iter_mut() { - log_namespace.insert_source_metadata( - source_name, - event.as_mut_log(), - Some(LegacyKey::Overwrite(path!(query_parameter_name))), - path!("query_parameters"), - crate::event::Value::from(value.map(String::to_owned)), - ); + if let Event::Log(log) = event { + log_namespace.insert_source_metadata( + source_name, + log, + Some(LegacyKey::Overwrite(path!(query_parameter_name))), + path!("query_parameters"), + crate::event::Value::from(value.map(String::to_owned)), + ); + } } } }