Skip to content

Commit

Permalink
feat: change dedupe config paths to ConfigTargetPath (vectordotdev#…
Browse files Browse the repository at this point in the history
…18241)

* feat: change dedupe config paths to `ConfigTargetPath`

* add test-utils feature and update test code

* Update src/transforms/dedupe.rs

Co-authored-by: Nathan Fox <fuchsnj@gmail.com>

* provide a new all_metadata_fields to iterate over the metadata paths

* fix comments

* fix metadata_keys_simple test

---------

Co-authored-by: Nathan Fox <fuchsnj@gmail.com>
  • Loading branch information
pront and fuchsnj authored Aug 16, 2023
1 parent c495939 commit 4ec6c11
Show file tree
Hide file tree
Showing 16 changed files with 182 additions and 54 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -362,11 +362,12 @@ libc = "0.2.147"
similar-asserts = "1.4.2"
proptest = "1.2"
quickcheck = "1.0.3"
lookup = { package = "vector-lookup", path = "lib/vector-lookup", features = ["test"] }
reqwest = { version = "0.11", features = ["json"] }
tempfile = "3.6.0"
test-generator = "0.3.1"
tokio-test = "0.4.2"
tokio = { version = "1.30.0", features = ["test-util"] }
tokio-test = "0.4.2"
tower-test = "0.4.0"
vector-core = { path = "lib/vector-core", default-features = false, features = ["vrl", "test"] }
wiremock = "0.5.19"
Expand Down
6 changes: 3 additions & 3 deletions benches/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ fn benchmark_event_iterate(c: &mut Criterion) {
log.insert(event_path!("key3"), Bytes::from("value3"));
log
},
|e| e.all_fields().unwrap().count(),
|e| e.all_event_fields().unwrap().count(),
BatchSize::SmallInput,
)
});
Expand All @@ -35,7 +35,7 @@ fn benchmark_event_iterate(c: &mut Criterion) {
log.insert(event_path!("key3"), Bytes::from("value3"));
log
},
|e| e.all_fields().unwrap().count(),
|e| e.all_event_fields().unwrap().count(),
BatchSize::SmallInput,
)
});
Expand All @@ -48,7 +48,7 @@ fn benchmark_event_iterate(c: &mut Criterion) {
log.insert(event_path!("key1", "nested1", 1), Bytes::from("value2"));
log
},
|e| e.all_fields().unwrap().count(),
|e| e.all_event_fields().unwrap().count(),
BatchSize::SmallInput,
)
});
Expand Down
52 changes: 50 additions & 2 deletions lib/vector-core/src/event/log_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use super::{
};
use crate::config::LogNamespace;
use crate::config::{log_schema, telemetry};
use crate::event::util::log::{all_fields, all_metadata_fields};
use crate::{event::MaybeAsLogMut, ByteSizeOf};
use lookup::{metadata_path, path};
use once_cell::sync::Lazy;
Expand Down Expand Up @@ -420,8 +421,21 @@ impl LogEvent {
}
}

pub fn all_fields(&self) -> Option<impl Iterator<Item = (String, &Value)> + Serialize> {
self.as_map().map(util::log::all_fields)
/// If the event root value is a map, build and return an iterator to event field and value pairs.
/// TODO: Ideally this should return target paths to be consistent with other `LogEvent` methods.
pub fn all_event_fields(&self) -> Option<impl Iterator<Item = (String, &Value)> + Serialize> {
self.as_map().map(all_fields)
}

/// Builds an iterator over the metadata fields, yielding `(path, value)` pairs.
///
/// Returns `None` when the metadata root value is not an object.
/// TODO: Ideally this should return target paths to be consistent with other `LogEvent` methods.
pub fn all_metadata_fields(
    &self,
) -> Option<impl Iterator<Item = (String, &Value)> + Serialize> {
    match self.metadata.value() {
        // Call the free function directly; wrapping in `Some` just to `map` over it is redundant.
        Value::Object(metadata_map) => Some(all_metadata_fields(metadata_map)),
        _ => None,
    }
}

/// Returns an iterator of all fields if the value is an Object. Otherwise,
Expand Down Expand Up @@ -1091,4 +1105,38 @@ mod test {

vector_common::assert_event_data_eq!(merged, expected);
}

/// `all_event_fields` should report leaf paths only: the expectation lists
/// `a.b` and `c`, with no bare `a` entry.
#[test]
fn event_fields_iter() {
    let mut event = LogEvent::default();
    for (path, value) in [("a", 0), ("a.b", 1), ("c", 2)] {
        event.insert(path, value);
    }

    let expected: Vec<(String, Value)> = vec![
        (String::from("a.b"), Value::from(1)),
        (String::from("c"), Value::from(2)),
    ];

    let fields = event.all_event_fields().unwrap();
    let actual: Vec<(String, Value)> = fields
        .map(|(path, value)| (path, value.clone()))
        .collect();

    assert_eq!(actual, expected);
}

/// `all_metadata_fields` should report leaf paths only, each carrying the
/// `%` prefix: the expectation lists `%a.b` and `%c`, with no bare `%a` entry.
#[test]
fn metadata_fields_iter() {
    let mut event = LogEvent::default();
    for (path, value) in [("%a", 0), ("%a.b", 1), ("%c", 2)] {
        event.insert(path, value);
    }

    let expected: Vec<(String, Value)> = vec![
        (String::from("%a.b"), Value::from(1)),
        (String::from("%c"), Value::from(2)),
    ];

    let fields = event.all_metadata_fields().unwrap();
    let actual: Vec<(String, Value)> = fields
        .map(|(path, value)| (path, value.clone()))
        .collect();

    assert_eq!(actual, expected);
}
}
4 changes: 2 additions & 2 deletions lib/vector-core/src/event/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ fn event_iteration() {
log.insert("Pitbull", "The bigger they are, the harder they fall");

let all = log
.all_fields()
.all_event_fields()
.unwrap()
.map(|(k, v)| (k, v.to_string_lossy()))
.collect::<HashSet<_>>();
Expand All @@ -39,7 +39,7 @@ fn event_iteration_order() {
log.insert("o9amkaRY", Value::from("pGsfG7Nr"));
log.insert("YRjhxXcg", Value::from("nw8iM5Jr"));

let collected: Vec<_> = log.all_fields().unwrap().collect();
let collected: Vec<_> = log.all_event_fields().unwrap().collect();
assert_eq!(
collected,
vec![
Expand Down
4 changes: 2 additions & 2 deletions lib/vector-core/src/event/test/serialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ fn serialization() {
"timestamp": event.get(log_schema().timestamp_key().unwrap().to_string().as_str()),
});

let actual_all = serde_json::to_value(event.all_fields().unwrap()).unwrap();
let actual_all = serde_json::to_value(event.all_event_fields().unwrap()).unwrap();
assert_eq!(expected_all, actual_all);

let rfc3339_re = Regex::new(r"\A\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z\z").unwrap();
Expand All @@ -90,7 +90,7 @@ fn type_serialization() {
event.insert("bool", true);
event.insert("string", "thisisastring");

let map = serde_json::to_value(event.all_fields().unwrap()).unwrap();
let map = serde_json::to_value(event.all_event_fields().unwrap()).unwrap();
assert_eq!(map["float"], json!(5.5));
assert_eq!(map["int"], json!(4));
assert_eq!(map["bool"], json!(true));
Expand Down
51 changes: 50 additions & 1 deletion lib/vector-core/src/event/util/log/all_fields.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::{
};

use serde::{Serialize, Serializer};
use vrl::path::PathPrefix;

use super::Value;

Expand All @@ -14,6 +15,12 @@ pub fn all_fields(fields: &BTreeMap<String, Value>) -> FieldsIter {
FieldsIter::new(fields)
}

/// Like `all_fields`, except that every yielded path starts with the
/// character denoting the metadata path type (`%`).
pub fn all_metadata_fields(fields: &BTreeMap<String, Value>) -> FieldsIter {
    let prefix = PathPrefix::Metadata;
    FieldsIter::new_with_prefix(prefix, fields)
}

/// An iterator with a single "message" element
pub fn all_fields_non_object_root(value: &Value) -> FieldsIter {
FieldsIter::non_object(value)
Expand All @@ -37,15 +44,30 @@ enum PathComponent<'a> {
/// If a key maps to an empty collection, the key and the empty collection will be returned.
#[derive(Clone)]
pub struct FieldsIter<'a> {
/// Optional path prefix. When set, `make_path` starts every generated path with the
/// matching character: `.` for `PathPrefix::Event`, `%` for `PathPrefix::Metadata`.
/// When `None`, paths are emitted without any leading prefix character.
path_prefix: Option<PathPrefix>,
/// Stack of iterators used for the depth-first traversal.
stack: Vec<LeafIter<'a>>,
/// Path components from the root up to the top of the stack.
path: Vec<PathComponent<'a>>,
}

impl<'a> FieldsIter<'a> {
// Creates an iterator over `fields` whose paths carry no prefix character.
// TODO deprecate this in favor of `new_with_prefix`.
fn new(fields: &'a BTreeMap<String, Value>) -> FieldsIter<'a> {
    // The traversal starts from the top-level map's own iterator.
    let root = LeafIter::Map(fields.iter());
    FieldsIter {
        path_prefix: None,
        stack: vec![root],
        path: Vec::new(),
    }
}

fn new_with_prefix(
path_prefix: PathPrefix,
fields: &'a BTreeMap<String, Value>,
) -> FieldsIter<'a> {
FieldsIter {
path_prefix: Some(path_prefix),
stack: vec![LeafIter::Map(fields.iter())],
path: vec![],
}
Expand All @@ -55,6 +77,7 @@ impl<'a> FieldsIter<'a> {
/// will be treated as an object with a single "message" key
fn non_object(value: &'a Value) -> FieldsIter<'a> {
FieldsIter {
path_prefix: None,
stack: vec![LeafIter::Root((value, false))],
path: vec![],
}
Expand Down Expand Up @@ -82,7 +105,13 @@ impl<'a> FieldsIter<'a> {
}

fn make_path(&mut self, component: PathComponent<'a>) -> String {
let mut res = String::new();
let mut res = match self.path_prefix {
None => String::new(),
Some(prefix) => match prefix {
PathPrefix::Event => String::from("."),
PathPrefix::Metadata => String::from("%"),
},
};
let mut path_iter = self.path.iter().chain(iter::once(&component)).peekable();
loop {
match path_iter.next() {
Expand Down Expand Up @@ -177,6 +206,26 @@ mod test {
assert_eq!(collected, expected);
}

/// Flat metadata keys should come back in sorted key order, each path
/// starting with the `%` metadata prefix.
#[test]
fn metadata_keys_simple() {
    let fields = fields_from_json(json!({
        "field_1": 1,
        "field_0": 0,
        "field_2": 2
    }));

    let (zero, one, two) = (Value::Integer(0), Value::Integer(1), Value::Integer(2));
    let expected: Vec<(String, &Value)> = vec![
        (String::from("%field_0"), &zero),
        (String::from("%field_1"), &one),
        (String::from("%field_2"), &two),
    ];

    let collected: Vec<_> = all_metadata_fields(&fields).collect();
    assert_eq!(collected, expected);
}

#[test]
fn keys_nested() {
let fields = fields_from_json(json!({
Expand Down
2 changes: 1 addition & 1 deletion lib/vector-core/src/event/util/log/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
mod all_fields;
mod keys;

pub use all_fields::{all_fields, all_fields_non_object_root};
pub use all_fields::{all_fields, all_fields_non_object_root, all_metadata_fields};
pub use keys::keys;

pub(self) use super::Value;
Expand Down
3 changes: 3 additions & 0 deletions lib/vector-lookup/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,6 @@ serde = { version = "1.0.183", default-features = false, features = ["derive", "
vector-config = { path = "../vector-config" }
vector-config-macros = { path = "../vector-config-macros" }
vrl.workspace = true

[features]
test = []
7 changes: 7 additions & 0 deletions lib/vector-lookup/src/lookup_v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,10 @@ impl<'a> TargetPath<'a> for &'a ConfigTargetPath {
&self.0.path
}
}

/// Convenience conversion so test code can build a `ConfigTargetPath` from a string literal.
///
/// Only compiled for tests or when the `test` feature is enabled.
///
/// # Panics
///
/// Panics if `path` cannot be converted into a `ConfigTargetPath`.
#[cfg(any(test, feature = "test"))]
impl From<&str> for ConfigTargetPath {
    fn from(path: &str) -> Self {
        // Name the offending input so a bad literal in a test is easy to locate.
        ConfigTargetPath::try_from(path.to_string())
            .unwrap_or_else(|err| panic!("invalid target path {:?}: {:?}", path, err))
    }
}
2 changes: 1 addition & 1 deletion src/codecs/encoding/transformer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ impl Transformer {
TimestampFormat::Unix => {
if log.value().is_object() {
let mut unix_timestamps = Vec::new();
for (k, v) in log.all_fields().expect("must be an object") {
for (k, v) in log.all_event_fields().expect("must be an object") {
if let Value::Timestamp(ts) = v {
unix_timestamps.push((k.clone(), Value::Integer(ts.timestamp())));
}
Expand Down
4 changes: 2 additions & 2 deletions src/sinks/azure_blob/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ async fn azure_blob_insert_json_into_blob() {
);
let expected = events
.iter()
.map(|event| serde_json::to_string(&event.as_log().all_fields().unwrap()).unwrap())
.map(|event| serde_json::to_string(&event.as_log().all_event_fields().unwrap()).unwrap())
.collect::<Vec<_>>();
assert_eq!(expected, blob_lines);
}
Expand Down Expand Up @@ -179,7 +179,7 @@ async fn azure_blob_insert_json_into_blob_gzip() {
);
let expected = events
.iter()
.map(|event| serde_json::to_string(&event.as_log().all_fields().unwrap()).unwrap())
.map(|event| serde_json::to_string(&event.as_log().all_event_fields().unwrap()).unwrap())
.collect::<Vec<_>>();
assert_eq!(expected, blob_lines);
}
Expand Down
3 changes: 2 additions & 1 deletion src/sinks/gcp/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,8 @@ mod integration_tests {
for i in 0..input.len() {
let data = messages[i].message.decode_data();
let data = serde_json::to_value(data).unwrap();
let expected = serde_json::to_value(input[i].as_log().all_fields().unwrap()).unwrap();
let expected =
serde_json::to_value(input[i].as_log().all_event_fields().unwrap()).unwrap();
assert_eq!(data, expected);
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/sources/dnstap/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@ mod integration_tests {
}

for event in events {
let json = serde_json::to_value(event.as_log().all_fields().unwrap()).unwrap();
let json = serde_json::to_value(event.as_log().all_event_fields().unwrap()).unwrap();
match query_event {
"query" => {
if json["messageType"] == json!("ClientQuery") {
Expand Down
2 changes: 1 addition & 1 deletion src/sources/exec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1135,7 +1135,7 @@ mod tests {
assert!(log.get(PID_KEY).is_some());
assert!(log.get_timestamp().is_some());

assert_eq!(8, log.all_fields().unwrap().count());
assert_eq!(8, log.all_event_fields().unwrap().count());
} else {
panic!("Expected to receive a linux event");
}
Expand Down
Loading

0 comments on commit 4ec6c11

Please sign in to comment.