From 4ec6c11a72a1312c5a79135aa649c0e26cda5da9 Mon Sep 17 00:00:00 2001
From: Pavlos Rontidis
Date: Wed, 16 Aug 2023 12:25:44 -0400
Subject: [PATCH] feat: change dedupe config paths to `ConfigTargetPath` (#18241)

* feat: change dedupe config paths to `ConfigTargetPath`

* add test-utils feature and update test code

* Update src/transforms/dedupe.rs

Co-authored-by: Nathan Fox

* provide a new all_metadata_fields to iterate over the metadata paths

* fix comments

* fix metadata_keys_simple test

---------

Co-authored-by: Nathan Fox
---
 Cargo.toml                                 |  3 +-
 benches/event.rs                           |  6 +-
 lib/vector-core/src/event/log_event.rs     | 52 +++++++++++-
 lib/vector-core/src/event/test/mod.rs      |  4 +-
 .../src/event/test/serialization.rs        |  4 +-
 .../src/event/util/log/all_fields.rs       | 51 +++++++++++-
 lib/vector-core/src/event/util/log/mod.rs  |  2 +-
 lib/vector-lookup/Cargo.toml               |  3 +
 lib/vector-lookup/src/lookup_v2/mod.rs     |  7 ++
 src/codecs/encoding/transformer.rs         |  2 +-
 src/sinks/azure_blob/integration_tests.rs  |  4 +-
 src/sinks/gcp/pubsub.rs                    |  3 +-
 src/sources/dnstap/mod.rs                  |  2 +-
 src/sources/exec/mod.rs                    |  2 +-
 src/transforms/dedupe.rs                   | 79 ++++++++++++-------
 src/transforms/metric_to_log.rs            | 12 +--
 16 files changed, 182 insertions(+), 54 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index f2aa359b9ceb4..83babb5661e22 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -362,11 +362,12 @@ libc = "0.2.147"
 similar-asserts = "1.4.2"
 proptest = "1.2"
 quickcheck = "1.0.3"
+lookup = { package = "vector-lookup", path = "lib/vector-lookup", features = ["test"] }
 reqwest = { version = "0.11", features = ["json"] }
 tempfile = "3.6.0"
 test-generator = "0.3.1"
-tokio-test = "0.4.2"
 tokio = { version = "1.30.0", features = ["test-util"] }
+tokio-test = "0.4.2"
 tower-test = "0.4.0"
 vector-core = { path = "lib/vector-core", default-features = false, features = ["vrl", "test"] }
 wiremock = "0.5.19"
diff --git a/benches/event.rs b/benches/event.rs
index 0f8322da087fd..cb48f6101add3 100644
--- a/benches/event.rs
+++ b/benches/event.rs
@@ -15,7 +15,7 @@ fn benchmark_event_iterate(c: &mut Criterion) {
                 log.insert(event_path!("key3"), Bytes::from("value3"));
                 log
             },
-            |e| e.all_fields().unwrap().count(),
+            |e| e.all_event_fields().unwrap().count(),
             BatchSize::SmallInput,
         )
     });
@@ -35,7 +35,7 @@ fn benchmark_event_iterate(c: &mut Criterion) {
                 log.insert(event_path!("key3"), Bytes::from("value3"));
                 log
             },
-            |e| e.all_fields().unwrap().count(),
+            |e| e.all_event_fields().unwrap().count(),
             BatchSize::SmallInput,
         )
     });
@@ -48,7 +48,7 @@ fn benchmark_event_iterate(c: &mut Criterion) {
                 log.insert(event_path!("key1", "nested1", 1), Bytes::from("value2"));
                 log
             },
-            |e| e.all_fields().unwrap().count(),
+            |e| e.all_event_fields().unwrap().count(),
             BatchSize::SmallInput,
         )
     });
diff --git a/lib/vector-core/src/event/log_event.rs b/lib/vector-core/src/event/log_event.rs
index 112764eaf8d43..d249e5133c0d8 100644
--- a/lib/vector-core/src/event/log_event.rs
+++ b/lib/vector-core/src/event/log_event.rs
@@ -30,6 +30,7 @@ use super::{
 };
 use crate::config::LogNamespace;
 use crate::config::{log_schema, telemetry};
+use crate::event::util::log::{all_fields, all_metadata_fields};
 use crate::{event::MaybeAsLogMut, ByteSizeOf};
 use lookup::{metadata_path, path};
 use once_cell::sync::Lazy;
@@ -420,8 +421,21 @@ impl LogEvent {
         }
     }
 
-    pub fn all_fields(&self) -> Option<impl Iterator<Item = (String, &Value)> + Serialize> {
-        self.as_map().map(util::log::all_fields)
+    /// If the event root value is a map, build and return an iterator to event field and value pairs.
+    /// TODO: Ideally this should return target paths to be consistent with other `LogEvent` methods.
+    pub fn all_event_fields(&self) -> Option<impl Iterator<Item = (String, &Value)> + Serialize> {
+        self.as_map().map(all_fields)
+    }
+
+    /// If the metadata root value is a map, build and return an iterator to metadata field and value pairs.
+    /// TODO: Ideally this should return target paths to be consistent with other `LogEvent` methods.
+    pub fn all_metadata_fields(
+        &self,
+    ) -> Option<impl Iterator<Item = (String, &Value)> + Serialize> {
+        match self.metadata.value() {
+            Value::Object(metadata_map) => Some(metadata_map).map(all_metadata_fields),
+            _ => None,
+        }
     }
 
     /// Returns an iterator of all fields if the value is an Object. Otherwise,
@@ -1091,4 +1105,38 @@ mod test {
 
         vector_common::assert_event_data_eq!(merged, expected);
     }
+
+    #[test]
+    fn event_fields_iter() {
+        let mut log = LogEvent::default();
+        log.insert("a", 0);
+        log.insert("a.b", 1);
+        log.insert("c", 2);
+        let actual: Vec<(String, Value)> = log
+            .all_event_fields()
+            .unwrap()
+            .map(|(s, v)| (s, v.clone()))
+            .collect();
+        assert_eq!(
+            actual,
+            vec![("a.b".to_string(), 1.into()), ("c".to_string(), 2.into())]
+        );
+    }
+
+    #[test]
+    fn metadata_fields_iter() {
+        let mut log = LogEvent::default();
+        log.insert("%a", 0);
+        log.insert("%a.b", 1);
+        log.insert("%c", 2);
+        let actual: Vec<(String, Value)> = log
+            .all_metadata_fields()
+            .unwrap()
+            .map(|(s, v)| (s, v.clone()))
+            .collect();
+        assert_eq!(
+            actual,
+            vec![("%a.b".to_string(), 1.into()), ("%c".to_string(), 2.into())]
+        );
+    }
 }
diff --git a/lib/vector-core/src/event/test/mod.rs b/lib/vector-core/src/event/test/mod.rs
index aeda651c6399d..d1dda8523f6ab 100644
--- a/lib/vector-core/src/event/test/mod.rs
+++ b/lib/vector-core/src/event/test/mod.rs
@@ -14,7 +14,7 @@ fn event_iteration() {
     log.insert("Pitbull", "The bigger they are, the harder they fall");
 
     let all = log
-        .all_fields()
+        .all_event_fields()
         .unwrap()
         .map(|(k, v)| (k, v.to_string_lossy()))
         .collect::<HashSet<_>>();
@@ -39,7 +39,7 @@ fn event_iteration_order() {
     log.insert("o9amkaRY", Value::from("pGsfG7Nr"));
    log.insert("YRjhxXcg", Value::from("nw8iM5Jr"));
 
-    let collected: Vec<_> = log.all_fields().unwrap().collect();
+    let collected: Vec<_> = log.all_event_fields().unwrap().collect();
     assert_eq!(
         collected,
         vec![
diff --git a/lib/vector-core/src/event/test/serialization.rs b/lib/vector-core/src/event/test/serialization.rs
index 07f92b5c39720..aaab559da3184 100644
--- a/lib/vector-core/src/event/test/serialization.rs
+++ b/lib/vector-core/src/event/test/serialization.rs
@@ -73,7 +73,7 @@ fn serialization() {
         "timestamp": event.get(log_schema().timestamp_key().unwrap().to_string().as_str()),
     });
 
-    let actual_all = serde_json::to_value(event.all_fields().unwrap()).unwrap();
+    let actual_all = serde_json::to_value(event.all_event_fields().unwrap()).unwrap();
     assert_eq!(expected_all, actual_all);
 
     let rfc3339_re = Regex::new(r"\A\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z\z").unwrap();
@@ -90,7 +90,7 @@ fn type_serialization() {
     event.insert("bool", true);
     event.insert("string", "thisisastring");
 
-    let map = serde_json::to_value(event.all_fields().unwrap()).unwrap();
+    let map = serde_json::to_value(event.all_event_fields().unwrap()).unwrap();
     assert_eq!(map["float"], json!(5.5));
     assert_eq!(map["int"], json!(4));
     assert_eq!(map["bool"], json!(true));
diff --git a/lib/vector-core/src/event/util/log/all_fields.rs b/lib/vector-core/src/event/util/log/all_fields.rs
index c7d4cadbbb98a..9cc9cc2043b31 100644
--- a/lib/vector-core/src/event/util/log/all_fields.rs
+++ b/lib/vector-core/src/event/util/log/all_fields.rs
@@ -5,6 +5,7 @@ use std::{
 };
 
 use serde::{Serialize, Serializer};
+use vrl::path::PathPrefix;
 
 use super::Value;
 
@@ -14,6 +15,12 @@ pub fn all_fields(fields: &BTreeMap<String, Value>) -> FieldsIter {
     FieldsIter::new(fields)
 }
 
+/// Same functionality as `all_fields` but it prepends a character that denotes the
+/// path type.
+pub fn all_metadata_fields(fields: &BTreeMap<String, Value>) -> FieldsIter {
+    FieldsIter::new_with_prefix(PathPrefix::Metadata, fields)
+}
+
 /// An iterator with a single "message" element
 pub fn all_fields_non_object_root(value: &Value) -> FieldsIter {
     FieldsIter::non_object(value)
@@ -37,6 +44,8 @@ enum PathComponent<'a> {
 /// If a key maps to an empty collection, the key and the empty collection will be returned.
 #[derive(Clone)]
 pub struct FieldsIter<'a> {
+    /// If specified, this will be prepended to each path.
+    path_prefix: Option<PathPrefix>,
     /// Stack of iterators used for the depth-first traversal.
     stack: Vec<LeafIter<'a>>,
     /// Path components from the root up to the top of the stack.
@@ -44,8 +53,21 @@ pub struct FieldsIter<'a> {
 }
 
 impl<'a> FieldsIter<'a> {
+    // TODO deprecate this in favor of `new_with_prefix`.
     fn new(fields: &'a BTreeMap<String, Value>) -> FieldsIter<'a> {
         FieldsIter {
+            path_prefix: None,
+            stack: vec![LeafIter::Map(fields.iter())],
+            path: vec![],
+        }
+    }
+
+    fn new_with_prefix(
+        path_prefix: PathPrefix,
+        fields: &'a BTreeMap<String, Value>,
+    ) -> FieldsIter<'a> {
+        FieldsIter {
+            path_prefix: Some(path_prefix),
             stack: vec![LeafIter::Map(fields.iter())],
             path: vec![],
         }
     }
@@ -55,6 +77,7 @@ impl<'a> FieldsIter<'a> {
     /// will be treated as an object with a single "message" key
     fn non_object(value: &'a Value) -> FieldsIter<'a> {
         FieldsIter {
+            path_prefix: None,
             stack: vec![LeafIter::Root((value, false))],
             path: vec![],
         }
@@ -82,7 +105,13 @@ impl<'a> FieldsIter<'a> {
     }
 
     fn make_path(&mut self, component: PathComponent<'a>) -> String {
-        let mut res = String::new();
+        let mut res = match self.path_prefix {
+            None => String::new(),
+            Some(prefix) => match prefix {
+                PathPrefix::Event => String::from("."),
+                PathPrefix::Metadata => String::from("%"),
+            },
+        };
         let mut path_iter = self.path.iter().chain(iter::once(&component)).peekable();
         loop {
             match path_iter.next() {
@@ -177,6 +206,26 @@ mod test {
         assert_eq!(collected, expected);
     }
 
+    #[test]
+    fn metadata_keys_simple() {
+        let fields = fields_from_json(json!({
+            "field_1": 1,
+            "field_0": 0,
+            "field_2": 2
+        }));
+        let expected: Vec<_> = vec![
+            ("%field_0", &Value::Integer(0)),
+            ("%field_1", &Value::Integer(1)),
+            ("%field_2", &Value::Integer(2)),
+        ]
+        .into_iter()
+        .map(|(k, v)| (k.into(), v))
+        .collect();
+
+        let collected: Vec<_> = all_metadata_fields(&fields).collect();
+        assert_eq!(collected, expected);
+    }
+
     #[test]
     fn keys_nested() {
         let fields = fields_from_json(json!({
diff --git a/lib/vector-core/src/event/util/log/mod.rs b/lib/vector-core/src/event/util/log/mod.rs
index 4aeffc3c67041..87bfbc7231fe8 100644
--- a/lib/vector-core/src/event/util/log/mod.rs
+++ b/lib/vector-core/src/event/util/log/mod.rs
@@ -1,7 +1,7 @@
 mod all_fields;
 mod keys;
 
-pub use all_fields::{all_fields, all_fields_non_object_root};
+pub use all_fields::{all_fields, all_fields_non_object_root, all_metadata_fields};
 pub use keys::keys;
 
 pub(self) use super::Value;
diff --git a/lib/vector-lookup/Cargo.toml b/lib/vector-lookup/Cargo.toml
index 986f0317a31d6..9d4d0db3965a2 100644
--- a/lib/vector-lookup/Cargo.toml
+++ b/lib/vector-lookup/Cargo.toml
@@ -11,3 +11,6 @@ serde = { version = "1.0.183", default-features = false, features = ["derive", "
 vector-config = { path = "../vector-config" }
 vector-config-macros = { path = "../vector-config-macros" }
 vrl.workspace = true
+
+[features]
+test = []
diff --git a/lib/vector-lookup/src/lookup_v2/mod.rs b/lib/vector-lookup/src/lookup_v2/mod.rs
index b8c61d83fc137..d066eb596e3e2 100644
--- a/lib/vector-lookup/src/lookup_v2/mod.rs
+++ b/lib/vector-lookup/src/lookup_v2/mod.rs
@@ -70,3 +70,10 @@ impl<'a> TargetPath<'a> for &'a ConfigTargetPath {
         &self.0.path
     }
 }
+
+#[cfg(any(test, feature = "test"))]
+impl From<&str> for ConfigTargetPath {
+    fn from(path: &str) -> Self {
+        ConfigTargetPath::try_from(path.to_string()).unwrap()
+    }
+}
diff --git a/src/codecs/encoding/transformer.rs b/src/codecs/encoding/transformer.rs
index 425a0e22980d3..7a7be839d904f 100644
--- a/src/codecs/encoding/transformer.rs
+++ b/src/codecs/encoding/transformer.rs
@@ -185,7 +185,7 @@ impl Transformer {
             TimestampFormat::Unix => {
                 if log.value().is_object() {
                     let mut unix_timestamps = Vec::new();
-                    for (k, v) in log.all_fields().expect("must be an object") {
+                    for (k, v) in log.all_event_fields().expect("must be an object") {
                         if let Value::Timestamp(ts) = v {
                             unix_timestamps.push((k.clone(), Value::Integer(ts.timestamp())));
                         }
diff --git a/src/sinks/azure_blob/integration_tests.rs b/src/sinks/azure_blob/integration_tests.rs
index 9cddbef79f949..352e6ce313bdb 100644
--- a/src/sinks/azure_blob/integration_tests.rs
+++ b/src/sinks/azure_blob/integration_tests.rs
@@ -118,7 +118,7 @@ async fn azure_blob_insert_json_into_blob() {
     );
     let expected = events
         .iter()
-        .map(|event| serde_json::to_string(&event.as_log().all_fields().unwrap()).unwrap())
+        .map(|event| serde_json::to_string(&event.as_log().all_event_fields().unwrap()).unwrap())
         .collect::<Vec<_>>();
     assert_eq!(expected, blob_lines);
 }
@@ -179,7 +179,7 @@ async fn azure_blob_insert_json_into_blob_gzip() {
     );
     let expected = events
         .iter()
-        .map(|event| serde_json::to_string(&event.as_log().all_fields().unwrap()).unwrap())
+        .map(|event| serde_json::to_string(&event.as_log().all_event_fields().unwrap()).unwrap())
         .collect::<Vec<_>>();
     assert_eq!(expected, blob_lines);
 }
diff --git a/src/sinks/gcp/pubsub.rs b/src/sinks/gcp/pubsub.rs
index 5d2dc458d499a..c7c562999d1e9 100644
--- a/src/sinks/gcp/pubsub.rs
+++ b/src/sinks/gcp/pubsub.rs
@@ -329,7 +329,8 @@ mod integration_tests {
         for i in 0..input.len() {
             let data = messages[i].message.decode_data();
             let data = serde_json::to_value(data).unwrap();
-            let expected = serde_json::to_value(input[i].as_log().all_fields().unwrap()).unwrap();
+            let expected =
+                serde_json::to_value(input[i].as_log().all_event_fields().unwrap()).unwrap();
             assert_eq!(data, expected);
         }
     }
diff --git a/src/sources/dnstap/mod.rs b/src/sources/dnstap/mod.rs
index dbfde6c95a9cd..c76fbc6f01450 100644
--- a/src/sources/dnstap/mod.rs
+++ b/src/sources/dnstap/mod.rs
@@ -558,7 +558,7 @@ mod integration_tests {
         }
 
         for event in events {
-            let json = serde_json::to_value(event.as_log().all_fields().unwrap()).unwrap();
+            let json = serde_json::to_value(event.as_log().all_event_fields().unwrap()).unwrap();
             match query_event {
                 "query" => {
                     if json["messageType"] == json!("ClientQuery") {
diff --git a/src/sources/exec/mod.rs b/src/sources/exec/mod.rs
index 33d51180c781f..0797752bf2049 100644
--- a/src/sources/exec/mod.rs
+++ b/src/sources/exec/mod.rs
@@ -1135,7 +1135,7 @@ mod tests {
             assert!(log.get(PID_KEY).is_some());
             assert!(log.get_timestamp().is_some());
 
-            assert_eq!(8, log.all_fields().unwrap().count());
+            assert_eq!(8, log.all_event_fields().unwrap().count());
         } else {
             panic!("Expected to receive a linux event");
         }
diff --git a/src/transforms/dedupe.rs b/src/transforms/dedupe.rs
index 0e88626cb2b25..f0e31cd77f5b1 100644
--- a/src/transforms/dedupe.rs
+++ b/src/transforms/dedupe.rs
@@ -2,9 +2,11 @@ use std::{future::ready, num::NonZeroUsize, pin::Pin};
 
 use bytes::Bytes;
 use futures::{Stream, StreamExt};
+use lookup::lookup_v2::ConfigTargetPath;
 use lru::LruCache;
 use vector_config::configurable_component;
 use vector_core::config::{clone_input_definitions, LogNamespace};
+use vrl::path::OwnedTargetPath;
 
 use crate::{
     config::{
@@ -43,7 +45,7 @@ pub enum FieldMatchConfig {
             docs::examples = "field1",
             docs::examples = "parent.child_field"
         ))]
-        Vec<String>,
+        Vec<ConfigTargetPath>,
     ),
 
     /// Matches events using all fields except for the ignored ones.
@@ -55,7 +57,7 @@ pub enum FieldMatchConfig {
             docs::examples = "host",
             docs::examples = "hostname"
         ))]
-        Vec<String>,
+        Vec<ConfigTargetPath>,
     ),
 }
 
@@ -102,16 +104,16 @@ fn default_cache_config() -> CacheConfig {
 // These aren't great defaults in that case, but hard-coding isn't much better since the
 // structure can vary significantly. This should probably either become a required field
 // in the future, or maybe the "semantic meaning" can be utilized here.
-fn default_match_fields() -> Vec<String> {
+fn default_match_fields() -> Vec<ConfigTargetPath> {
     let mut fields = Vec::new();
-    if let Some(message_key) = log_schema().message_key() {
-        fields.push(message_key.to_string());
+    if let Some(message_key) = log_schema().message_key_target_path() {
+        fields.push(ConfigTargetPath(message_key.clone()));
     }
-    if let Some(host_key) = log_schema().host_key() {
-        fields.push(host_key.to_string());
+    if let Some(host_key) = log_schema().host_key_target_path() {
+        fields.push(ConfigTargetPath(host_key.clone()));
     }
-    if let Some(timestamp_key) = log_schema().timestamp_key() {
-        fields.push(timestamp_key.to_string());
+    if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
+        fields.push(ConfigTargetPath(timestamp_key.clone()));
     }
     fields
 }
@@ -197,7 +199,7 @@ type TypeId = u8;
 #[derive(PartialEq, Eq, Hash)]
 enum CacheEntry {
     Match(Vec<Option<(TypeId, Bytes)>>),
-    Ignore(Vec<(String, TypeId, Bytes)>),
+    Ignore(Vec<(OwnedTargetPath, TypeId, Bytes)>),
 }
 
 /// Assigns a unique number to each of the types supported by Event::Value.
@@ -244,7 +246,7 @@ fn build_cache_entry(event: &Event, fields: &FieldMatchConfig) -> CacheEntry {
         FieldMatchConfig::MatchFields(fields) => {
            let mut entry = Vec::new();
             for field_name in fields.iter() {
-                if let Some(value) = event.as_log().get(field_name.as_str()) {
+                if let Some(value) = event.as_log().get(field_name) {
                     entry.push(Some((type_id_for_value(value), value.coerce_to_bytes())));
                 } else {
                     entry.push(None);
@@ -255,14 +257,18 @@ fn build_cache_entry(event: &Event, fields: &FieldMatchConfig) -> CacheEntry {
         FieldMatchConfig::IgnoreFields(fields) => {
             let mut entry = Vec::new();
 
-            if let Some(all_fields) = event.as_log().all_fields() {
-                for (field_name, value) in all_fields {
-                    if !fields.contains(&field_name) {
-                        entry.push((
-                            field_name,
-                            type_id_for_value(value),
-                            value.coerce_to_bytes(),
-                        ));
+            if let Some(event_fields) = event.as_log().all_event_fields() {
+                if let Some(metadata_fields) = event.as_log().all_metadata_fields() {
+                    for (field_name, value) in event_fields.chain(metadata_fields) {
+                        if let Ok(path) = ConfigTargetPath::try_from(field_name) {
+                            if !fields.contains(&path) {
+                                entry.push((
+                                    path.0,
+                                    type_id_for_value(value),
+                                    value.coerce_to_bytes(),
+                                ));
+                            }
+                        }
                     }
                 }
             }
@@ -289,6 +295,7 @@ impl TaskTransform<Event> for Dedupe {
 mod tests {
     use std::{collections::BTreeMap, sync::Arc};
 
+    use lookup::lookup_v2::ConfigTargetPath;
     use tokio::sync::mpsc;
     use tokio_stream::wrappers::ReceiverStream;
     use vector_common::config::ComponentKey;
@@ -309,7 +316,10 @@ mod tests {
         crate::test_util::test_generate_config::<DedupeConfig>();
     }
 
-    fn make_match_transform_config(num_events: usize, fields: Vec<String>) -> DedupeConfig {
+    fn make_match_transform_config(
+        num_events: usize,
+        fields: Vec<ConfigTargetPath>,
+    ) -> DedupeConfig {
         DedupeConfig {
             cache: CacheConfig {
                 num_events: std::num::NonZeroUsize::new(num_events).expect("non-zero num_events"),
@@ -318,7 +328,10 @@ mod tests {
         }
     }
 
-    fn make_ignore_transform_config(num_events: usize, given_fields: Vec<String>) -> DedupeConfig {
+    fn make_ignore_transform_config(
+        num_events: usize,
+        given_fields: Vec<ConfigTargetPath>,
+    ) -> DedupeConfig {
         // "message" and "timestamp" are added automatically to all Events
         let mut fields = vec!["message".into(), "timestamp".into()];
         fields.extend(given_fields);
@@ -334,34 +347,40 @@ mod tests {
     #[tokio::test]
     async fn dedupe_match_basic() {
         let transform_config = make_match_transform_config(5, vec!["matched".into()]);
-        basic(transform_config).await;
+        basic(transform_config, "matched", "unmatched").await;
     }
 
     #[tokio::test]
     async fn dedupe_ignore_basic() {
         let transform_config = make_ignore_transform_config(5, vec!["unmatched".into()]);
-        basic(transform_config).await;
+        basic(transform_config, "matched", "unmatched").await;
     }
 
-    async fn basic(transform_config: DedupeConfig) {
+    #[tokio::test]
+    async fn dedupe_ignore_with_metadata_field() {
+        let transform_config = make_ignore_transform_config(5, vec!["%ignored".into()]);
+        basic(transform_config, "matched", "%ignored").await;
+    }
+
+    async fn basic(transform_config: DedupeConfig, first_path: &str, second_path: &str) {
         assert_transform_compliance(async {
             let (tx, rx) = mpsc::channel(1);
             let (topology, mut out) =
                 create_topology(ReceiverStream::new(rx), transform_config).await;
 
             let mut event1 = Event::Log(LogEvent::from("message"));
-            event1.as_mut_log().insert("matched", "some value");
-            event1.as_mut_log().insert("unmatched", "another value");
+            event1.as_mut_log().insert(first_path, "some value");
+            event1.as_mut_log().insert(second_path, "another value");
 
             // Test that unmatched field isn't considered
             let mut event2 = Event::Log(LogEvent::from("message"));
-            event2.as_mut_log().insert("matched", "some value2");
-            event2.as_mut_log().insert("unmatched", "another value");
+            event2.as_mut_log().insert(first_path, "some value2");
+            event2.as_mut_log().insert(second_path, "another value");
 
             // Test that matched field is considered
             let mut event3 = Event::Log(LogEvent::from("message"));
-            event3.as_mut_log().insert("matched", "some value");
-            event3.as_mut_log().insert("unmatched", "another value2");
+            event3.as_mut_log().insert(first_path, "some value");
+            event3.as_mut_log().insert(second_path, "another value2");
 
             // First event should always be passed through as-is.
             tx.send(event1.clone()).await.unwrap();
diff --git a/src/transforms/metric_to_log.rs b/src/transforms/metric_to_log.rs
index ed6b6d8c8abcd..3748f6bac5c48 100644
--- a/src/transforms/metric_to_log.rs
+++ b/src/transforms/metric_to_log.rs
@@ -423,7 +423,7 @@ mod tests {
         metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy)));
 
         let log = do_transform(counter).await.unwrap();
-        let collected: Vec<_> = log.all_fields().unwrap().collect();
+        let collected: Vec<_> = log.all_event_fields().unwrap().collect();
 
         assert_eq!(
             collected,
@@ -453,7 +453,7 @@ mod tests {
         metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy)));
 
         let log = do_transform(gauge).await.unwrap();
-        let collected: Vec<_> = log.all_fields().unwrap().collect();
+        let collected: Vec<_> = log.all_event_fields().unwrap().collect();
 
         assert_eq!(
             collected,
@@ -483,7 +483,7 @@ mod tests {
         metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy)));
 
         let log = do_transform(set).await.unwrap();
-        let collected: Vec<_> = log.all_fields().unwrap().collect();
+        let collected: Vec<_> = log.all_event_fields().unwrap().collect();
 
         assert_eq!(
             collected,
@@ -515,7 +515,7 @@ mod tests {
         metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy)));
 
         let log = do_transform(distro).await.unwrap();
-        let collected: Vec<_> = log.all_fields().unwrap().collect();
+        let collected: Vec<_> = log.all_event_fields().unwrap().collect();
 
         assert_eq!(
             collected,
@@ -566,7 +566,7 @@ mod tests {
         metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy)));
 
         let log = do_transform(histo).await.unwrap();
-        let collected: Vec<_> = log.all_fields().unwrap().collect();
+        let collected: Vec<_> = log.all_event_fields().unwrap().collect();
 
         assert_eq!(
             collected,
@@ -615,7 +615,7 @@ mod tests {
         metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy)));
 
         let log = do_transform(summary).await.unwrap();
-        let collected: Vec<_> = log.all_fields().unwrap().collect();
+        let collected: Vec<_> = log.all_event_fields().unwrap().collect();
 
         assert_eq!(
             collected,
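
The TOML snippet below is not part of the patch; it is a hypothetical sketch of the user-facing effect, assuming the documented `dedupe` options (`cache.num_events`, `fields.match`/`fields.ignore`). Because the field lists are now parsed as `ConfigTargetPath`, an entry can reference event metadata with a leading `%`, as exercised by the new `dedupe_ignore_with_metadata_field` test. The transform, source, and metadata field names are made up for illustration.

```toml
# Hypothetical example only -- not included in this patch.
# Field entries are parsed as target paths, so metadata references such as
# "%kafka.offset" and nested paths like "parent.child_field" are both valid.
[transforms.dedupe_example]
type = "dedupe"
inputs = ["example_source"]   # made-up source id
cache.num_events = 5000
fields.ignore = ["host", "hostname", "%kafka.offset"]
```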