provide a new all_metadata_fields to iterate over the metadata paths
pront committed Aug 15, 2023
1 parent f947957 commit d05eeab
Showing 13 changed files with 127 additions and 36 deletions.
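In short: `all_fields` is renamed to `all_event_fields`, and a new `all_metadata_fields` iterates the metadata tree, rendering each path with a leading `%`. A minimal usage sketch, based on the tests added below (not part of the commit):

use vector_core::event::LogEvent;

fn print_all_paths(log: &LogEvent) {
    // Event paths are rendered without a prefix, e.g. ("a.b", 1).
    if let Some(fields) = log.all_event_fields() {
        for (path, value) in fields {
            println!("{path} = {value:?}");
        }
    }
    // Metadata paths keep the `%` prefix, e.g. ("%c", 2).
    if let Some(fields) = log.all_metadata_fields() {
        for (path, value) in fields {
            println!("{path} = {value:?}");
        }
    }
}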
6 changes: 3 additions & 3 deletions benches/event.rs
@@ -15,7 +15,7 @@ fn benchmark_event_iterate(c: &mut Criterion) {
log.insert(event_path!("key3"), Bytes::from("value3"));
log
},
- |e| e.all_fields().unwrap().count(),
+ |e| e.all_event_fields().unwrap().count(),
BatchSize::SmallInput,
)
});
@@ -35,7 +35,7 @@ fn benchmark_event_iterate(c: &mut Criterion) {
log.insert(event_path!("key3"), Bytes::from("value3"));
log
},
- |e| e.all_fields().unwrap().count(),
+ |e| e.all_event_fields().unwrap().count(),
BatchSize::SmallInput,
)
});
@@ -48,7 +48,7 @@ fn benchmark_event_iterate(c: &mut Criterion) {
log.insert(event_path!("key1", "nested1", 1), Bytes::from("value2"));
log
},
- |e| e.all_fields().unwrap().count(),
+ |e| e.all_event_fields().unwrap().count(),
BatchSize::SmallInput,
)
});
48 changes: 47 additions & 1 deletion lib/vector-core/src/event/log_event.rs
@@ -420,10 +420,22 @@ impl LogEvent {
}
}

- pub fn all_fields(&self) -> Option<impl Iterator<Item = (String, &Value)> + Serialize> {
+ /// Iterate over the event root value and return an iterator over field and value pairs.
+ /// Note that this covers only the event value; metadata paths are exposed via `all_metadata_fields`.
+ pub fn all_event_fields(&self) -> Option<impl Iterator<Item = (String, &Value)> + Serialize> {
self.as_map().map(util::log::all_fields)
}

+ pub fn all_metadata_fields(
+ &self,
+ ) -> Option<impl Iterator<Item = (String, &Value)> + Serialize> {
+ match self.metadata.value() {
+ Value::Object(metadata_map) => Some(metadata_map)
+ .map(|tree| util::log::all_fields_with_prefix(PathPrefix::Metadata, tree)),
+ _ => None,
+ }
+ }

/// Returns an iterator of all fields if the value is an Object. Otherwise,
/// a single field is returned with a "message" key
pub fn convert_to_fields(&self) -> impl Iterator<Item = (String, &Value)> + Serialize {
Expand Down Expand Up @@ -1091,4 +1103,38 @@ mod test {

vector_common::assert_event_data_eq!(merged, expected);
}

+ #[test]
+ fn event_fields_iter() {
+ let mut log = LogEvent::default();
+ log.insert("a", 0);
+ log.insert("a.b", 1);
+ log.insert("c", 2);
+ let actual: Vec<(String, Value)> = log
+ .all_event_fields()
+ .unwrap()
+ .map(|(s, v)| (s, v.clone()))
+ .collect();
+ assert_eq!(
+ actual,
+ vec![("a.b".to_string(), 1.into()), ("c".to_string(), 2.into())]
+ );
+ }

+ #[test]
+ fn metadata_fields_iter() {
+ let mut log = LogEvent::default();
+ log.insert("%a", 0);
+ log.insert("%a.b", 1);
+ log.insert("%c", 2);
+ let actual: Vec<(String, Value)> = log
+ .all_metadata_fields()
+ .unwrap()
+ .map(|(s, v)| (s, v.clone()))
+ .collect();
+ assert_eq!(
+ actual,
+ vec![("%a.b".to_string(), 1.into()), ("%c".to_string(), 2.into())]
+ );
+ }
}
4 changes: 2 additions & 2 deletions lib/vector-core/src/event/test/mod.rs
@@ -14,7 +14,7 @@ fn event_iteration() {
log.insert("Pitbull", "The bigger they are, the harder they fall");

let all = log
- .all_fields()
+ .all_event_fields()
.unwrap()
.map(|(k, v)| (k, v.to_string_lossy()))
.collect::<HashSet<_>>();
@@ -39,7 +39,7 @@ fn event_iteration_order() {
log.insert("o9amkaRY", Value::from("pGsfG7Nr"));
log.insert("YRjhxXcg", Value::from("nw8iM5Jr"));

- let collected: Vec<_> = log.all_fields().unwrap().collect();
+ let collected: Vec<_> = log.all_event_fields().unwrap().collect();
assert_eq!(
collected,
vec![
4 changes: 2 additions & 2 deletions lib/vector-core/src/event/test/serialization.rs
@@ -73,7 +73,7 @@ fn serialization() {
"timestamp": event.get(log_schema().timestamp_key().unwrap().to_string().as_str()),
});

- let actual_all = serde_json::to_value(event.all_fields().unwrap()).unwrap();
+ let actual_all = serde_json::to_value(event.all_event_fields().unwrap()).unwrap();
assert_eq!(expected_all, actual_all);

let rfc3339_re = Regex::new(r"\A\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z\z").unwrap();
@@ -90,7 +90,7 @@ fn type_serialization() {
event.insert("bool", true);
event.insert("string", "thisisastring");

- let map = serde_json::to_value(event.all_fields().unwrap()).unwrap();
+ let map = serde_json::to_value(event.all_event_fields().unwrap()).unwrap();
assert_eq!(map["float"], json!(5.5));
assert_eq!(map["int"], json!(4));
assert_eq!(map["bool"], json!(true));
33 changes: 32 additions & 1 deletion lib/vector-core/src/event/util/log/all_fields.rs
@@ -5,6 +5,7 @@ use std::{
};

use serde::{Serialize, Serializer};
+ use vrl::path::PathPrefix;

use super::Value;

@@ -14,6 +15,15 @@ pub fn all_fields(fields: &BTreeMap<String, Value>) -> FieldsIter {
FieldsIter::new(fields)
}

+ /// Same functionality as `all_fields` but it prepends a character that denotes the
+ /// path type.
+ pub fn all_fields_with_prefix(
+ path_prefix: PathPrefix,
+ fields: &BTreeMap<String, Value>,
+ ) -> FieldsIter {
+ FieldsIter::new_with_prefix(path_prefix, fields)
+ }

/// An iterator with a single "message" element
pub fn all_fields_non_object_root(value: &Value) -> FieldsIter {
FieldsIter::non_object(value)
@@ -37,6 +47,8 @@ enum PathComponent<'a> {
/// If a key maps to an empty collection, the key and the empty collection will be returned.
#[derive(Clone)]
pub struct FieldsIter<'a> {
+ /// If specified, this will be prepended to each path.
+ path_prefix: Option<PathPrefix>,
/// Stack of iterators used for the depth-first traversal.
stack: Vec<LeafIter<'a>>,
/// Path components from the root up to the top of the stack.
@@ -46,6 +58,18 @@ pub struct FieldsIter<'a> {
impl<'a> FieldsIter<'a> {
fn new(fields: &'a BTreeMap<String, Value>) -> FieldsIter<'a> {
FieldsIter {
+ path_prefix: None,
stack: vec![LeafIter::Map(fields.iter())],
path: vec![],
}
}

+ fn new_with_prefix(
+ path_prefix: PathPrefix,
+ fields: &'a BTreeMap<String, Value>,
+ ) -> FieldsIter<'a> {
+ FieldsIter {
+ path_prefix: Some(path_prefix),
+ stack: vec![LeafIter::Map(fields.iter())],
+ path: vec![],
+ }
@@ -55,6 +79,7 @@ impl<'a> FieldsIter<'a> {
/// will be treated as an object with a single "message" key
fn non_object(value: &'a Value) -> FieldsIter<'a> {
FieldsIter {
+ path_prefix: None,
stack: vec![LeafIter::Root((value, false))],
path: vec![],
}
@@ -82,7 +107,13 @@ impl<'a> FieldsIter<'a> {
}

fn make_path(&mut self, component: PathComponent<'a>) -> String {
- let mut res = String::new();
+ let mut res = match self.path_prefix {
+ None => String::new(),
+ Some(prefix) => match prefix {
+ PathPrefix::Event => String::from("."),
+ PathPrefix::Metadata => String::from("%"),
+ },
+ };
let mut path_iter = self.path.iter().chain(iter::once(&component)).peekable();
loop {
match path_iter.next() {
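For reference on `make_path` above: the optional prefix is rendered before the joined components, so metadata iteration yields paths like `%a.b` (see the `metadata_fields_iter` test), while the currently unused `PathPrefix::Event` arm would yield `.a.b`; plain `all_fields` keeps the unprefixed `a.b` form. A sketch of the rendering, written as it might appear in this module's tests (`Value` being the module's own re-export; not part of the commit):

#[test]
fn prefix_rendering_sketch() {
    use std::collections::BTreeMap;
    use vrl::path::PathPrefix;

    let mut fields: BTreeMap<String, Value> = BTreeMap::new();
    fields.insert("a".to_string(), Value::from(1));

    // Without a prefix the path is rendered bare.
    let plain: Vec<String> = all_fields(&fields).map(|(k, _)| k).collect();
    assert_eq!(plain, vec!["a".to_string()]);

    // With the Metadata prefix the same path gains a leading `%`.
    let prefixed: Vec<String> = all_fields_with_prefix(PathPrefix::Metadata, &fields)
        .map(|(k, _)| k)
        .collect();
    assert_eq!(prefixed, vec!["%a".to_string()]);
}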
2 changes: 1 addition & 1 deletion lib/vector-core/src/event/util/log/mod.rs
@@ -1,7 +1,7 @@
mod all_fields;
mod keys;

- pub use all_fields::{all_fields, all_fields_non_object_root};
+ pub use all_fields::{all_fields, all_fields_non_object_root, all_fields_with_prefix};
pub use keys::keys;

pub(self) use super::Value;
2 changes: 1 addition & 1 deletion src/codecs/encoding/transformer.rs
@@ -185,7 +185,7 @@ impl Transformer {
TimestampFormat::Unix => {
if log.value().is_object() {
let mut unix_timestamps = Vec::new();
- for (k, v) in log.all_fields().expect("must be an object") {
+ for (k, v) in log.all_event_fields().expect("must be an object") {
if let Value::Timestamp(ts) = v {
unix_timestamps.push((k.clone(), Value::Integer(ts.timestamp())));
}
4 changes: 2 additions & 2 deletions src/sinks/azure_blob/integration_tests.rs
@@ -118,7 +118,7 @@ async fn azure_blob_insert_json_into_blob() {
);
let expected = events
.iter()
- .map(|event| serde_json::to_string(&event.as_log().all_fields().unwrap()).unwrap())
+ .map(|event| serde_json::to_string(&event.as_log().all_event_fields().unwrap()).unwrap())
.collect::<Vec<_>>();
assert_eq!(expected, blob_lines);
}
@@ -179,7 +179,7 @@ async fn azure_blob_insert_json_into_blob_gzip() {
);
let expected = events
.iter()
- .map(|event| serde_json::to_string(&event.as_log().all_fields().unwrap()).unwrap())
+ .map(|event| serde_json::to_string(&event.as_log().all_event_fields().unwrap()).unwrap())
.collect::<Vec<_>>();
assert_eq!(expected, blob_lines);
}
3 changes: 2 additions & 1 deletion src/sinks/gcp/pubsub.rs
@@ -329,7 +329,8 @@ mod integration_tests {
for i in 0..input.len() {
let data = messages[i].message.decode_data();
let data = serde_json::to_value(data).unwrap();
- let expected = serde_json::to_value(input[i].as_log().all_fields().unwrap()).unwrap();
+ let expected =
+ serde_json::to_value(input[i].as_log().all_event_fields().unwrap()).unwrap();
assert_eq!(data, expected);
}
}
2 changes: 1 addition & 1 deletion src/sources/dnstap/mod.rs
@@ -558,7 +558,7 @@ mod integration_tests {
}

for event in events {
- let json = serde_json::to_value(event.as_log().all_fields().unwrap()).unwrap();
+ let json = serde_json::to_value(event.as_log().all_event_fields().unwrap()).unwrap();
match query_event {
"query" => {
if json["messageType"] == json!("ClientQuery") {
2 changes: 1 addition & 1 deletion src/sources/exec/mod.rs
@@ -1137,7 +1137,7 @@ mod tests {
assert!(log.get(PID_KEY).is_some());
assert!(log.get_timestamp().is_some());

- assert_eq!(8, log.all_fields().unwrap().count());
+ assert_eq!(8, log.all_event_fields().unwrap().count());
} else {
panic!("Expected to receive a linux event");
}
41 changes: 27 additions & 14 deletions src/transforms/dedupe.rs
@@ -6,6 +6,7 @@ use lookup::lookup_v2::ConfigTargetPath;
use lru::LruCache;
use vector_config::configurable_component;
use vector_core::config::{clone_input_definitions, LogNamespace};
+ use vrl::path::OwnedTargetPath;

use crate::{
config::{
@@ -256,11 +257,17 @@ fn build_cache_entry(event: &Event, fields: &FieldMatchConfig) -> CacheEntry {
FieldMatchConfig::IgnoreFields(fields) => {
let mut entry = Vec::new();

- if let Some(all_fields) = event.as_log().all_fields() {
- for (field_name, value) in all_fields {
- if let Ok(path) = ConfigTargetPath::try_from(field_name) {
- if !fields.contains(&path) {
- entry.push((path, type_id_for_value(value), value.coerce_to_bytes()));
+ if let Some(event_fields) = event.as_log().all_event_fields() {
+ if let Some(metadata_fields) = event.as_log().all_metadata_fields() {
+ for (field_name, value) in event_fields.chain(metadata_fields) {
+ if let Ok(path) = ConfigTargetPath::try_from(field_name) {
+ if !fields.contains(&path) {
+ entry.push((
+ path.0,
+ type_id_for_value(value),
+ value.coerce_to_bytes(),
+ ));
}
}
}
}
@@ -340,34 +347,40 @@ mod tests {
#[tokio::test]
async fn dedupe_match_basic() {
let transform_config = make_match_transform_config(5, vec!["matched".into()]);
- basic(transform_config).await;
+ basic(transform_config, "matched", "unmatched").await;
}

#[tokio::test]
async fn dedupe_ignore_basic() {
let transform_config = make_ignore_transform_config(5, vec!["unmatched".into()]);
- basic(transform_config).await;
+ basic(transform_config, "matched", "unmatched").await;
}

- async fn basic(transform_config: DedupeConfig) {
+ #[tokio::test]
+ async fn dedupe_ignore_with_metadata_field() {
+ let transform_config = make_ignore_transform_config(5, vec!["%ignored".into()]);
+ basic(transform_config, "matched", "%ignored").await;
+ }

+ async fn basic(transform_config: DedupeConfig, first_path: &str, second_path: &str) {
assert_transform_compliance(async {
let (tx, rx) = mpsc::channel(1);
let (topology, mut out) =
create_topology(ReceiverStream::new(rx), transform_config).await;

let mut event1 = Event::Log(LogEvent::from("message"));
- event1.as_mut_log().insert("matched", "some value");
- event1.as_mut_log().insert("unmatched", "another value");
+ event1.as_mut_log().insert(first_path, "some value");
+ event1.as_mut_log().insert(second_path, "another value");

// Test that unmatched field isn't considered
let mut event2 = Event::Log(LogEvent::from("message"));
- event2.as_mut_log().insert("matched", "some value2");
- event2.as_mut_log().insert("unmatched", "another value");
+ event2.as_mut_log().insert(first_path, "some value2");
+ event2.as_mut_log().insert(second_path, "another value");

// Test that matched field is considered
let mut event3 = Event::Log(LogEvent::from("message"));
- event3.as_mut_log().insert("matched", "some value");
- event3.as_mut_log().insert("unmatched", "another value2");
+ event3.as_mut_log().insert(first_path, "some value");
+ event3.as_mut_log().insert(second_path, "another value2");

// First event should always be passed through as-is.
tx.send(event1.clone()).await.unwrap();
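Note that `build_cache_entry` above only collects fields when both the event root and the metadata root are objects (both iterators return `Some`), and the chained traversal visits event paths before metadata paths. A standalone sketch of the same traversal, with a hypothetical helper name and assumed import paths (not part of the commit):

use vector_core::event::{LogEvent, Value};

// Hypothetical helper mirroring build_cache_entry's traversal:
// every (path, value) pair, event fields first, then metadata fields.
fn all_paths(log: &LogEvent) -> Vec<(String, Value)> {
    let mut out = Vec::new();
    if let (Some(event_fields), Some(metadata_fields)) =
        (log.all_event_fields(), log.all_metadata_fields())
    {
        for (path, value) in event_fields.chain(metadata_fields) {
            out.push((path, value.clone()));
        }
    }
    out
}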
12 changes: 6 additions & 6 deletions src/transforms/metric_to_log.rs
@@ -423,7 +423,7 @@ mod tests {
metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy)));

let log = do_transform(counter).await.unwrap();
- let collected: Vec<_> = log.all_fields().unwrap().collect();
+ let collected: Vec<_> = log.all_event_fields().unwrap().collect();

assert_eq!(
collected,
@@ -453,7 +453,7 @@ mod tests {
metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy)));

let log = do_transform(gauge).await.unwrap();
- let collected: Vec<_> = log.all_fields().unwrap().collect();
+ let collected: Vec<_> = log.all_event_fields().unwrap().collect();

assert_eq!(
collected,
@@ -483,7 +483,7 @@ mod tests {
metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy)));

let log = do_transform(set).await.unwrap();
- let collected: Vec<_> = log.all_fields().unwrap().collect();
+ let collected: Vec<_> = log.all_event_fields().unwrap().collect();

assert_eq!(
collected,
@@ -515,7 +515,7 @@ mod tests {
metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy)));

let log = do_transform(distro).await.unwrap();
- let collected: Vec<_> = log.all_fields().unwrap().collect();
+ let collected: Vec<_> = log.all_event_fields().unwrap().collect();

assert_eq!(
collected,
@@ -566,7 +566,7 @@ mod tests {
metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy)));

let log = do_transform(histo).await.unwrap();
- let collected: Vec<_> = log.all_fields().unwrap().collect();
+ let collected: Vec<_> = log.all_event_fields().unwrap().collect();

assert_eq!(
collected,
@@ -615,7 +615,7 @@ mod tests {
metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy)));

let log = do_transform(summary).await.unwrap();
- let collected: Vec<_> = log.all_fields().unwrap().collect();
+ let collected: Vec<_> = log.all_event_fields().unwrap().collect();

assert_eq!(
collected,
