From 7458356df1333a09c3b36c546dd08fccbb3ce134 Mon Sep 17 00:00:00 2001 From: Ana Hobden Date: Fri, 9 Oct 2020 09:03:33 -0700 Subject: [PATCH 01/19] Instrument eventlog Signed-off-by: Ana Hobden --- src/event/log_event.rs | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/src/event/log_event.rs b/src/event/log_event.rs index 48aae250a4b9b..5ff6590520c1a 100644 --- a/src/event/log_event.rs +++ b/src/event/log_event.rs @@ -4,6 +4,7 @@ use std::{ collections::{btree_map::Entry, BTreeMap, HashMap}, convert::{TryFrom, TryInto}, iter::FromIterator, + fmt::{Debug, Display}, }; use string_cache::DefaultAtom; @@ -13,70 +14,83 @@ pub struct LogEvent { } impl LogEvent { + #[instrument(skip(self, key), fields(key = %key))] pub fn get(&self, key: &DefaultAtom) -> Option<&Value> { util::log::get(&self.fields, key) } + #[instrument(skip(self, key), fields(key = %key.as_ref()))] pub fn get_flat(&self, key: impl AsRef) -> Option<&Value> { self.fields.get(key.as_ref()) } + #[instrument(skip(self, key), fields(key = %key))] pub fn get_mut(&mut self, key: &DefaultAtom) -> Option<&mut Value> { util::log::get_mut(&mut self.fields, key) } + #[instrument(skip(self, key), fields(key = %key.as_ref()))] pub fn contains(&self, key: impl AsRef) -> bool { util::log::contains(&self.fields, key.as_ref()) } + #[instrument(skip(self, key), fields(key = %key.as_ref()))] pub fn insert(&mut self, key: K, value: V) -> Option where K: AsRef, - V: Into, + V: Into + Debug, { util::log::insert(&mut self.fields, key.as_ref(), value.into()) } + #[instrument(skip(self, key), fields(key = ?key))] pub fn insert_path(&mut self, key: Vec, value: V) -> Option where - V: Into, + V: Into + Debug, { util::log::insert_path(&mut self.fields, key, value.into()) } + #[instrument(skip(self, key), fields(key = %key))] pub fn insert_flat(&mut self, key: K, value: V) where - K: Into, - V: Into, + K: Into + Display, + V: Into + Debug, { self.fields.insert(key.into(), value.into()); } + #[instrument(skip(self, key), fields(key = %key))] pub fn try_insert(&mut self, key: &DefaultAtom, value: V) where - V: Into, + V: Into + Debug, { if !self.contains(key) { self.insert(key.clone(), value); } } + #[instrument(skip(self, key), fields(key = %key))] pub fn remove(&mut self, key: &DefaultAtom) -> Option { util::log::remove(&mut self.fields, &key, false) } + #[instrument(skip(self, key), fields(key = %key.as_ref()))] pub fn remove_prune(&mut self, key: impl AsRef, prune: bool) -> Option { util::log::remove(&mut self.fields, key.as_ref(), prune) } + #[instrument(skip(self))] pub fn keys<'a>(&'a self) -> impl Iterator + 'a { util::log::keys(&self.fields) } + #[instrument(skip(self))] pub fn all_fields(&self) -> impl Iterator + Serialize { util::log::all_fields(&self.fields) } + #[instrument(skip(self))] pub fn is_empty(&self) -> bool { self.fields.is_empty() } From 80c2878970a99b3ab066aaa910b1810475bd688e Mon Sep 17 00:00:00 2001 From: Ana Hobden Date: Fri, 9 Oct 2020 10:32:55 -0700 Subject: [PATCH 02/19] Migrate get Signed-off-by: Ana Hobden --- benches/lua.rs | 6 +-- src/event/log_event.rs | 10 ++--- src/sources/docker.rs | 22 ++++----- src/sources/file.rs | 20 ++++----- src/sources/journald.rs | 6 +-- src/sources/splunk_hec.rs | 8 ++-- src/sources/stdin.rs | 6 +-- src/transforms/add_fields.rs | 16 +++---- src/transforms/aws_ec2_metadata.rs | 50 ++++++++++----------- src/transforms/coercer.rs | 8 ++-- src/transforms/concat.rs | 6 +-- src/transforms/dedupe.rs | 30 ++++++------- src/transforms/json_parser.rs | 16 +++---- src/transforms/logfmt_parser.rs | 62 +++++++++++++------------- src/transforms/lua/v1/mod.rs | 26 +++++------ src/transforms/lua/v2/interop/event.rs | 4 +- src/transforms/lua/v2/interop/log.rs | 10 ++--- src/transforms/lua/v2/mod.rs | 24 +++++----- src/transforms/reduce/mod.rs | 36 +++++++-------- src/transforms/regex_parser.rs | 58 ++++++++++++------------ src/transforms/remove_fields.rs | 6 +-- src/transforms/rename_fields.rs | 10 ++--- src/transforms/split.rs | 30 ++++++------- src/transforms/tokenizer.rs | 30 ++++++------- 24 files changed, 250 insertions(+), 250 deletions(-) diff --git a/benches/lua.rs b/benches/lua.rs index ec9636c80ac3a..1acfe23d45f5c 100644 --- a/benches/lua.rs +++ b/benches/lua.rs @@ -17,11 +17,11 @@ fn add_fields(c: &mut Criterion) { let key = "the key"; let value = "this is the value"; - let key_atom_native = key.into(); + let key_atom_native = key; let value_bytes_native = Bytes::from(value).into(); - let key_atom_v1 = key.into(); + let key_atom_v1 = key; let value_bytes_v1 = Bytes::from(value).into(); - let key_atom_v2 = key.into(); + let key_atom_v2 = key; let value_bytes_v2 = Bytes::from(value).into(); c.bench( diff --git a/src/event/log_event.rs b/src/event/log_event.rs index 5ff6590520c1a..53540858d23ea 100644 --- a/src/event/log_event.rs +++ b/src/event/log_event.rs @@ -14,9 +14,9 @@ pub struct LogEvent { } impl LogEvent { - #[instrument(skip(self, key), fields(key = %key))] - pub fn get(&self, key: &DefaultAtom) -> Option<&Value> { - util::log::get(&self.fields, key) + #[instrument(skip(self, key), fields(key = %key.as_ref()))] + pub fn get(&self, key: impl AsRef) -> Option<&Value> { + util::log::get(&self.fields, key.as_ref()) } #[instrument(skip(self, key), fields(key = %key.as_ref()))] @@ -189,10 +189,10 @@ impl TryInto for LogEvent { } } -impl std::ops::Index<&DefaultAtom> for LogEvent { +impl std::ops::Index<&str> for LogEvent { type Output = Value; - fn index(&self, key: &DefaultAtom) -> &Value { + fn index(&self, key: &str) -> &Value { self.get(key) .expect(&*format!("Key is not found: {:?}", key)) } diff --git a/src/sources/docker.rs b/src/sources/docker.rs index a39fb76a79575..0d05405edfac7 100644 --- a/src/sources/docker.rs +++ b/src/sources/docker.rs @@ -973,7 +973,7 @@ fn line_agg_adapter( .remove(&Atom::from(log_schema().message_key())) .expect("message must exist in the event"); let stream_value = log_event - .get(&STREAM) + .get(&*STREAM) .expect("stream must exist in the event"); let stream = stream_value.as_bytes(); @@ -1260,11 +1260,11 @@ mod integration_tests { container_remove(&id, &docker).await; let log = events[0].as_log(); - assert_eq!(log[&Atom::from(log_schema().message_key())], message.into()); - assert_eq!(log[&super::CONTAINER], id.into()); - assert!(log.get(&super::CREATED_AT).is_some()); - assert_eq!(log[&super::IMAGE], "busybox".into()); - assert!(log.get(&format!("label.{}", label).into()).is_some()); + assert_eq!(log[log_schema().message_key()], message.into()); + assert_eq!(log[&*super::CONTAINER], id.into()); + assert!(log.get(&*super::CREATED_AT).is_some()); + assert_eq!(log[&*super::IMAGE], "busybox".into()); + assert!(log.get(format!("label.{}", label)).is_some()); assert_eq!(events[0].as_log()[&super::NAME], name.into()); assert_eq!( events[0].as_log()[&Atom::from(log_schema().source_type_key())], @@ -1363,11 +1363,11 @@ mod integration_tests { container_remove(&id, &docker).await; let log = events[0].as_log(); - assert_eq!(log[&Atom::from(log_schema().message_key())], message.into()); - assert_eq!(log[&super::CONTAINER], id.into()); - assert!(log.get(&super::CREATED_AT).is_some()); - assert_eq!(log[&super::IMAGE], "busybox".into()); - assert!(log.get(&format!("label.{}", label).into()).is_some()); + assert_eq!(log[log_schema().message_key()], message.into()); + assert_eq!(log[&*super::CONTAINER], id.into()); + assert!(log.get(&*super::CREATED_AT).is_some()); + assert_eq!(log[&*super::IMAGE], "busybox".into()); + assert!(log.get(format!("label.{}", label)).is_some()); assert_eq!(events[0].as_log()[&super::NAME], name.into()); assert_eq!( events[0].as_log()[&Atom::from(log_schema().source_type_key())], diff --git a/src/sources/file.rs b/src/sources/file.rs index b4af5dd8cc882..9f009284f8a4a 100644 --- a/src/sources/file.rs +++ b/src/sources/file.rs @@ -447,8 +447,8 @@ mod tests { let event = create_event(line, file, &host_key, &hostname, &file_key); let log = event.into_log(); - assert_eq!(log[&"file".into()], "some_file.rs".into()); - assert_eq!(log[&"host".into()], "Some.Machine".into()); + assert_eq!(log["file"], "some_file.rs".into()); + assert_eq!(log["host"], "Some.Machine".into()); assert_eq!( log[&Atom::from(log_schema().message_key())], "hello world".into() @@ -500,14 +500,14 @@ mod tests { if line.starts_with("hello") { assert_eq!(line, format!("hello {}", hello_i)); assert_eq!( - event.as_log()[&"file".into()].to_string_lossy(), + event.as_log()["file"].to_string_lossy(), path1.to_str().unwrap() ); hello_i += 1; } else { assert_eq!(line, format!("goodbye {}", goodbye_i)); assert_eq!( - event.as_log()[&"file".into()].to_string_lossy(), + event.as_log()["file"].to_string_lossy(), path2.to_str().unwrap() ); goodbye_i += 1; @@ -562,7 +562,7 @@ mod tests { for event in received { assert_eq!( - event.as_log()[&"file".into()].to_string_lossy(), + event.as_log()["file"].to_string_lossy(), path.to_str().unwrap() ); @@ -628,7 +628,7 @@ mod tests { for event in received { assert_eq!( - event.as_log()[&"file".into()].to_string_lossy(), + event.as_log()["file"].to_string_lossy(), path.to_str().unwrap() ); @@ -737,7 +737,7 @@ mod tests { .0 .unwrap(); assert_eq!( - received.as_log()[&"file".into()].to_string_lossy(), + received.as_log()["file"].to_string_lossy(), path.to_str().unwrap() ); } @@ -774,7 +774,7 @@ mod tests { .0 .unwrap(); assert_eq!( - received.as_log()[&"source".into()].to_string_lossy(), + received.as_log()["source"].to_string_lossy(), path.to_str().unwrap() ); } @@ -1047,7 +1047,7 @@ mod tests { let before_lines = received .iter() .filter(|event| { - event.as_log()[&"file".into()] + event.as_log()["file"] .to_string_lossy() .ends_with("before") }) @@ -1056,7 +1056,7 @@ mod tests { let after_lines = received .iter() .filter(|event| { - event.as_log()[&"file".into()] + event.as_log()["file"] .to_string_lossy() .ends_with("after") }) diff --git a/src/sources/journald.rs b/src/sources/journald.rs index 681aaf4cf01e4..eb3bbc6cd8599 100644 --- a/src/sources/journald.rs +++ b/src/sources/journald.rs @@ -215,8 +215,8 @@ fn create_event(record: Record) -> Event { } // Translate the timestamp, and so leave both old and new names. if let Some(timestamp) = log - .get(&SOURCE_TIMESTAMP) - .or_else(|| log.get(&RECEIVED_TIMESTAMP)) + .get(&*SOURCE_TIMESTAMP) + .or_else(|| log.get(&*RECEIVED_TIMESTAMP)) { if let Value::Bytes(timestamp) = timestamp { if let Ok(timestamp) = String::from_utf8_lossy(timestamp).parse::() { @@ -766,6 +766,6 @@ mod tests { } fn priority(event: &Event) -> Value { - event.as_log()[&"PRIORITY".into()].clone() + event.as_log()["PRIORITY"].clone() } } diff --git a/src/sources/splunk_hec.rs b/src/sources/splunk_hec.rs index 531c8cf75d038..7f464ec8c0cb0 100644 --- a/src/sources/splunk_hec.rs +++ b/src/sources/splunk_hec.rs @@ -1006,14 +1006,14 @@ mod tests { sink.run(stream::once(future::ready(event))).await.unwrap(); let event = collect_n(source, 1).await.unwrap().remove(0); - assert_eq!(event.as_log()[&"greeting".into()], "hello".into()); - assert_eq!(event.as_log()[&"name".into()], "bob".into()); + assert_eq!(event.as_log()["greeting"], "hello".into()); + assert_eq!(event.as_log()["name"], "bob".into()); assert!(event .as_log() - .get(&Atom::from(log_schema().timestamp_key())) + .get(log_schema().timestamp_key()) .is_some()); assert_eq!( - event.as_log()[&Atom::from(log_schema().source_type_key())], + event.as_log()[log_schema().source_type_key()], "splunk_hec".into() ); } diff --git a/src/sources/stdin.rs b/src/sources/stdin.rs index 615f12eff7a84..c956ab04d3843 100644 --- a/src/sources/stdin.rs +++ b/src/sources/stdin.rs @@ -148,13 +148,13 @@ mod tests { let event = create_event(line, &host_key, &hostname); let log = event.into_log(); - assert_eq!(log[&"host".into()], "Some.Machine".into()); + assert_eq!(log["host"], "Some.Machine".into()); assert_eq!( - log[&Atom::from(log_schema().message_key())], + log[log_schema().message_key()], "hello world".into() ); assert_eq!( - log[&Atom::from(log_schema().source_type_key())], + log[log_schema().source_type_key()], "stdin".into() ); } diff --git a/src/transforms/add_fields.rs b/src/transforms/add_fields.rs index 2fd14d9909ea2..3591684aa76d5 100644 --- a/src/transforms/add_fields.rs +++ b/src/transforms/add_fields.rs @@ -217,13 +217,13 @@ mod tests { let event = transform.transform(event).unwrap().into_log(); tracing::error!(?event); - assert_eq!(event[&"float".into()], 4.5.into()); - assert_eq!(event[&"int".into()], 4.into()); - assert_eq!(event[&"string".into()], "thisisastring".into()); - assert_eq!(event[&"bool".into()], true.into()); - assert_eq!(event[&"array[0]".into()], 1.into()); - assert_eq!(event[&"array[1]".into()], 2.into()); - assert_eq!(event[&"array[2]".into()], 3.into()); - assert_eq!(event[&"table.key".into()], "value".into()); + assert_eq!(event["float"], 4.5.into()); + assert_eq!(event["int"], 4.into()); + assert_eq!(event["string"], "thisisastring".into()); + assert_eq!(event["bool"], true.into()); + assert_eq!(event["array[0]"], 1.into()); + assert_eq!(event["array[1]"], 2.into()); + assert_eq!(event["array[2]"], 3.into()); + assert_eq!(event["table.key"], "value".into()); } } diff --git a/src/transforms/aws_ec2_metadata.rs b/src/transforms/aws_ec2_metadata.rs index 0ae2f207c641c..0c4fdf9206a35 100644 --- a/src/transforms/aws_ec2_metadata.rs +++ b/src/transforms/aws_ec2_metadata.rs @@ -527,32 +527,32 @@ mod integration_tests { let log = event.as_log(); assert_eq!( - log.get(&"availability-zone".into()), + log.get("availability-zone"), Some(&"ww-region-1a".into()) ); - assert_eq!(log.get(&"public-ipv4".into()), Some(&"192.1.1.1".into())); + assert_eq!(log.get("public-ipv4"), Some(&"192.1.1.1".into())); assert_eq!( - log.get(&"public-hostname".into()), + log.get("public-hostname"), Some(&"mock-public-hostname".into()) ); - assert_eq!(log.get(&"local-ipv4".into()), Some(&"192.1.1.2".into())); + assert_eq!(log.get(&"local-ipv4"), Some(&"192.1.1.2".into())); assert_eq!( - log.get(&"local-hostname".into()), + log.get("local-hostname"), Some(&"mock-hostname".into()) ); assert_eq!( - log.get(&"instance-id".into()), + log.get("instance-id"), Some(&"i-096fba6d03d36d262".into()) ); assert_eq!( - log.get(&"ami-id".into()), + log.get("ami-id"), Some(&"ami-05f27d4d6770a43d2".into()) ); - assert_eq!(log.get(&"instance-type".into()), Some(&"t2.micro".into())); - assert_eq!(log.get(&"region".into()), Some(&"us-east-1".into())); - assert_eq!(log.get(&"vpc-id".into()), Some(&"mock-vpc-id".into())); - assert_eq!(log.get(&"subnet-id".into()), Some(&"mock-subnet-id".into())); - assert_eq!(log.get(&"role-name[0]".into()), Some(&"mock-user".into())); + assert_eq!(log.get("instance-type"), Some(&"t2.micro".into())); + assert_eq!(log.get("region"), Some(&"us-east-1".into())); + assert_eq!(log.get("vpc-id"), Some(&"mock-vpc-id".into())); + assert_eq!(log.get("subnet-id"), Some(&"mock-subnet-id".into())); + assert_eq!(log.get("role-name[0]"), Some(&"mock-user".into())); } #[tokio::test] @@ -572,15 +572,15 @@ mod integration_tests { let event = transform.transform(event).unwrap(); let log = event.as_log(); - assert_eq!(log.get(&"availability-zone".into()), None); - assert_eq!(log.get(&"public-ipv4".into()), Some(&"192.1.1.1".into())); - assert_eq!(log.get(&"public-hostname".into()), None); - assert_eq!(log.get(&"local-ipv4".into()), None); - assert_eq!(log.get(&"local-hostname".into()), None); - assert_eq!(log.get(&"instance-id".into()), None,); - assert_eq!(log.get(&"instance-type".into()), None,); - assert_eq!(log.get(&"ami-id".into()), None); - assert_eq!(log.get(&"region".into()), Some(&"us-east-1".into())); + assert_eq!(log.get("availability-zone"), None); + assert_eq!(log.get("public-ipv4"), Some(&"192.1.1.1".into())); + assert_eq!(log.get("public-hostname"), None); + assert_eq!(log.get("local-ipv4"), None); + assert_eq!(log.get("local-hostname"), None); + assert_eq!(log.get("instance-id"), None,); + assert_eq!(log.get("instance-type"), None,); + assert_eq!(log.get("ami-id"), None); + assert_eq!(log.get("region"), Some(&"us-east-1".into())); } #[tokio::test] @@ -601,11 +601,11 @@ mod integration_tests { let log = event.as_log(); assert_eq!( - log.get(&"ec2.metadata.availability-zone".into()), + log.get("ec2.metadata.availability-zone"), Some(&"ww-region-1a".into()) ); assert_eq!( - log.get(&"ec2.metadata.public-ipv4".into()), + log.get("ec2.metadata.public-ipv4"), Some(&"192.1.1.1".into()) ); @@ -626,9 +626,9 @@ mod integration_tests { let log = event.as_log(); assert_eq!( - log.get(&"availability-zone".into()), + log.get("availability-zone"), Some(&"ww-region-1a".into()) ); - assert_eq!(log.get(&"public-ipv4".into()), Some(&"192.1.1.1".into())); + assert_eq!(log.get("public-ipv4"), Some(&"192.1.1.1".into())); } } diff --git a/src/transforms/coercer.rs b/src/transforms/coercer.rs index 9881d78efafba..c4e67f9dbb32a 100644 --- a/src/transforms/coercer.rs +++ b/src/transforms/coercer.rs @@ -136,20 +136,20 @@ mod tests { #[tokio::test] async fn converts_valid_fields() { let log = parse_it("").await; - assert_eq!(log[&"number".into()], Value::Integer(1234)); - assert_eq!(log[&"bool".into()], Value::Boolean(true)); + assert_eq!(log["number"], Value::Integer(1234)); + assert_eq!(log["bool"], Value::Boolean(true)); } #[tokio::test] async fn leaves_unnamed_fields_as_is() { let log = parse_it("").await; - assert_eq!(log[&"other".into()], Value::Bytes("no".into())); + assert_eq!(log["other"], Value::Bytes("no".into())); } #[tokio::test] async fn drops_nonconvertible_fields() { let log = parse_it("").await; - assert!(log.get(&"float".into()).is_none()); + assert!(log.get("float").is_none()); } #[tokio::test] diff --git a/src/transforms/concat.rs b/src/transforms/concat.rs index 8f343b5c88c95..f456b836dac54 100644 --- a/src/transforms/concat.rs +++ b/src/transforms/concat.rs @@ -219,7 +219,7 @@ mod tests { ); let new_event = transform.transform(event).unwrap(); - assert_eq!(new_event.as_log()[&"out".into()], "Hello users".into()); + assert_eq!(new_event.as_log()["out"], "Hello users".into()); } #[test] @@ -238,7 +238,7 @@ mod tests { ); let new_event = transform.transform(event).unwrap(); - assert_eq!(new_event.as_log()[&"out".into()], "Hello World".into()); + assert_eq!(new_event.as_log()["out"], "Hello World".into()); } #[test] fn concat_mixed() { @@ -259,7 +259,7 @@ mod tests { ); let new_event = transform.transform(event).unwrap(); - assert_eq!(new_event.as_log()[&"out".into()], "W o r l d".into()); + assert_eq!(new_event.as_log()["out"], "W o r l d".into()); } #[test] diff --git a/src/transforms/dedupe.rs b/src/transforms/dedupe.rs index 9a25f98ae3813..55a656f039479 100644 --- a/src/transforms/dedupe.rs +++ b/src/transforms/dedupe.rs @@ -255,12 +255,12 @@ mod tests { // First event should always be passed through as-is. let new_event = transform.transform(event1).unwrap(); - assert_eq!(new_event.as_log()[&"matched".into()], "some value".into()); + assert_eq!(new_event.as_log()["matched"], "some value".into()); // Second event differs in matched field so should be outputted even though it // has the same value for unmatched field. let new_event = transform.transform(event2).unwrap(); - assert_eq!(new_event.as_log()[&"matched".into()], "some value2".into()); + assert_eq!(new_event.as_log()["matched"], "some value2".into()); // Third event has the same value for "matched" as first event, so it should be dropped. assert_eq!(None, transform.transform(event3)); @@ -287,12 +287,12 @@ mod tests { // First event should always be passed through as-is. let new_event = transform.transform(event1).unwrap(); - assert_eq!(new_event.as_log()[&"matched1".into()], "some value".into()); + assert_eq!(new_event.as_log()["matched1"], "some value".into()); // Second event has a different matched field name with the same value, so it should not be // considered a dupe let new_event = transform.transform(event2).unwrap(); - assert_eq!(new_event.as_log()[&"matched2".into()], "some value".into()); + assert_eq!(new_event.as_log()["matched2"], "some value".into()); } #[test] @@ -321,8 +321,8 @@ mod tests { // First event should always be passed through as-is. let new_event = transform.transform(event1).unwrap(); - assert_eq!(new_event.as_log()[&"matched1".into()], "value1".into()); - assert_eq!(new_event.as_log()[&"matched2".into()], "value2".into()); + assert_eq!(new_event.as_log()["matched1"], "value1".into()); + assert_eq!(new_event.as_log()["matched2"], "value2".into()); // Second event is the same just with different field order, so it shouldn't be outputted. assert_eq!(None, transform.transform(event2)); @@ -355,17 +355,17 @@ mod tests { // First event should always be passed through as-is. let new_event = transform.transform(event1).unwrap(); - assert_eq!(new_event.as_log()[&"matched".into()], "some value".into()); + assert_eq!(new_event.as_log()["matched"], "some value".into()); // Second event gets outputted because it's not a dupe. This causes the first // Event to be evicted from the cache. let new_event = transform.transform(event2).unwrap(); - assert_eq!(new_event.as_log()[&"matched".into()], "some value2".into()); + assert_eq!(new_event.as_log()["matched"], "some value2".into()); // Third event is a dupe but gets outputted anyway because the first event has aged // out of the cache. let new_event = transform.transform(event3).unwrap(); - assert_eq!(new_event.as_log()[&"matched".into()], "some value".into()); + assert_eq!(new_event.as_log()["matched"], "some value".into()); } #[test] @@ -391,12 +391,12 @@ mod tests { // First event should always be passed through as-is. let new_event = transform.transform(event1).unwrap(); - assert_eq!(new_event.as_log()[&"matched".into()], "123".into()); + assert_eq!(new_event.as_log()["matched"], "123".into()); // Second event should also get passed through even though the string representations of // "matched" are the same. let new_event = transform.transform(event2).unwrap(); - assert_eq!(new_event.as_log()[&"matched".into()], 123.into()); + assert_eq!(new_event.as_log()["matched"], 123.into()); } #[test] @@ -426,7 +426,7 @@ mod tests { // First event should always be passed through as-is. let new_event = transform.transform(event1).unwrap(); - let res_value = new_event.as_log()[&"matched".into()].clone(); + let res_value = new_event.as_log()["matched"].clone(); if let Value::Map(map) = res_value { assert_eq!(map.get("key").unwrap(), &Value::from("123")); } @@ -434,7 +434,7 @@ mod tests { // Second event should also get passed through even though the string representations of // "matched" are the same. let new_event = transform.transform(event2).unwrap(); - let res_value = new_event.as_log()[&"matched".into()].clone(); + let res_value = new_event.as_log()["matched"].clone(); if let Value::Map(map) = res_value { assert_eq!(map.get("key").unwrap(), &Value::from(123)); } @@ -461,10 +461,10 @@ mod tests { // First event should always be passed through as-is. let new_event = transform.transform(event1).unwrap(); - assert_eq!(new_event.as_log()[&"matched".into()], Value::Null); + assert_eq!(new_event.as_log()["matched"], Value::Null); // Second event should also get passed through as null is different than missing let new_event = transform.transform(event2).unwrap(); - assert_eq!(false, new_event.as_log().contains(&"matched")); + assert_eq!(false, new_event.as_log().contains("matched")); } } diff --git a/src/transforms/json_parser.rs b/src/transforms/json_parser.rs index d8fde65a5e67d..b6e36c183c461 100644 --- a/src/transforms/json_parser.rs +++ b/src/transforms/json_parser.rs @@ -496,8 +496,8 @@ mod test { let event = parser.transform(event).unwrap(); let event = event.as_log(); - assert_eq!(event[&Atom::from("that.greeting")], "hello".into()); - assert_eq!(event[&Atom::from("that.name")], "bob".into()); + assert_eq!(event["that.greeting"], "hello".into()); + assert_eq!(event["that.name"], "bob".into()); } #[test] @@ -513,9 +513,9 @@ mod test { let event = parser.transform(event).unwrap(); let event = event.as_log(); - assert_eq!(event[&"message".into()], message.into()); - assert_eq!(event.get(&"message.greeting".into()), None); - assert_eq!(event.get(&"message.name".into()), None); + assert_eq!(event["message"], message.into()); + assert_eq!(event.get("message.greeting"), None); + assert_eq!(event.get("message.name"), None); } #[test] @@ -532,11 +532,11 @@ mod test { let event = parser.transform(event).unwrap(); let event = event.as_log(); - match event.get(&"message".into()) { + match event.get("message") { Some(crate::event::Value::Map(_)) => (), _ => panic!("\"message\" is not a map"), } - assert_eq!(event[&Atom::from("message.greeting")], "hello".into()); - assert_eq!(event[&Atom::from("message.name")], "bob".into()); + assert_eq!(event["message.greeting"], "hello".into()); + assert_eq!(event["message.name"], "bob".into()); } } diff --git a/src/transforms/logfmt_parser.rs b/src/transforms/logfmt_parser.rs index fd2541df561b3..6768f3c212346 100644 --- a/src/transforms/logfmt_parser.rs +++ b/src/transforms/logfmt_parser.rs @@ -142,26 +142,26 @@ mod tests { async fn logfmt_adds_parsed_field_to_event() { let log = parse_log("status=1234 time=\"5678\"", false, &[]).await; - assert_eq!(log[&"status".into()], "1234".into()); - assert_eq!(log[&"time".into()], "5678".into()); - assert!(log.get(&"message".into()).is_some()); + assert_eq!(log["status"], "1234".into()); + assert_eq!(log["time"], "5678".into()); + assert!(log.get("message").is_some()); } #[tokio::test] async fn logfmt_does_drop_parsed_field() { let log = parse_log("status=1234 time=5678", true, &[]).await; - assert_eq!(log[&"status".into()], "1234".into()); - assert_eq!(log[&"time".into()], "5678".into()); - assert!(log.get(&"message".into()).is_none()); + assert_eq!(log["status"], "1234".into()); + assert_eq!(log["time"], "5678".into()); + assert!(log.get("message").is_none()); } #[tokio::test] async fn logfmt_does_not_drop_same_name_parsed_field() { let log = parse_log("status=1234 message=yes", true, &[]).await; - assert_eq!(log[&"status".into()], "1234".into()); - assert_eq!(log[&"message".into()], "yes".into()); + assert_eq!(log["status"], "1234".into()); + assert_eq!(log["message"], "yes".into()); } #[tokio::test] @@ -173,10 +173,10 @@ mod tests { ) .await; - assert_eq!(log[&"number".into()], Value::Float(42.3)); - assert_eq!(log[&"flag".into()], Value::Boolean(true)); - assert_eq!(log[&"code".into()], Value::Integer(1234)); - assert_eq!(log[&"rest".into()], Value::Bytes("word".into())); + assert_eq!(log["number"], Value::Float(42.3)); + assert_eq!(log["flag"], Value::Boolean(true)); + assert_eq!(log["code"], Value::Integer(1234)); + assert_eq!(log["rest"], Value::Bytes("word".into())); } #[tokio::test] @@ -187,36 +187,36 @@ mod tests { &[("status", "integer"), ("bytes", "integer")], ).await; - assert_eq!(log[&"at".into()], "info".into()); - assert_eq!(log[&"method".into()], "GET".into()); - assert_eq!(log[&"path".into()], "/cart_link".into()); + assert_eq!(log["at"], "info".into()); + assert_eq!(log["method"], "GET".into()); + assert_eq!(log["path"], "/cart_link".into()); assert_eq!( - log[&"request_id".into()], + log["request_id"], "05726858-c44e-4f94-9a20-37df73be9006".into(), ); - assert_eq!(log[&"fwd".into()], "73.75.38.87".into()); - assert_eq!(log[&"dyno".into()], "web.1".into()); - assert_eq!(log[&"connect".into()], "1ms".into()); - assert_eq!(log[&"service".into()], "22ms".into()); - assert_eq!(log[&"status".into()], Value::Integer(304)); - assert_eq!(log[&"bytes".into()], Value::Integer(656)); - assert_eq!(log[&"protocol".into()], "http".into()); + assert_eq!(log["fwd"], "73.75.38.87".into()); + assert_eq!(log["dyno"], "web.1".into()); + assert_eq!(log["connect"], "1ms".into()); + assert_eq!(log["service"], "22ms".into()); + assert_eq!(log["status"], Value::Integer(304)); + assert_eq!(log["bytes"], Value::Integer(656)); + assert_eq!(log["protocol"], "http".into()); } #[tokio::test] async fn logfmt_handles_herokus_weird_octothorpes() { let log = parse_log("source=web.1 dyno=heroku.2808254.d97d0ea7-cf3d-411b-b453-d2943a50b456 sample#memory_total=21.00MB sample#memory_rss=21.22MB sample#memory_cache=0.00MB sample#memory_swap=0.00MB sample#memory_pgpgin=348836pages sample#memory_pgpgout=343403pages", true, &[]).await; - assert_eq!(log[&"source".into()], "web.1".into()); + assert_eq!(log["source"], "web.1".into()); assert_eq!( - log[&"dyno".into()], + log["dyno"], "heroku.2808254.d97d0ea7-cf3d-411b-b453-d2943a50b456".into() ); - assert_eq!(log[&"sample#memory_total".into()], "21.00MB".into()); - assert_eq!(log[&"sample#memory_rss".into()], "21.22MB".into()); - assert_eq!(log[&"sample#memory_cache".into()], "0.00MB".into()); - assert_eq!(log[&"sample#memory_swap".into()], "0.00MB".into()); - assert_eq!(log[&"sample#memory_pgpgin".into()], "348836pages".into()); - assert_eq!(log[&"sample#memory_pgpgout".into()], "343403pages".into()); + assert_eq!(log["sample#memory_total"], "21.00MB".into()); + assert_eq!(log["sample#memory_rss"], "21.22MB".into()); + assert_eq!(log["sample#memory_cache"], "0.00MB".into()); + assert_eq!(log["sample#memory_swap"], "0.00MB".into()); + assert_eq!(log["sample#memory_pgpgin"], "348836pages".into()); + assert_eq!(log["sample#memory_pgpgout"], "343403pages".into()); } } diff --git a/src/transforms/lua/v1/mod.rs b/src/transforms/lua/v1/mod.rs index 150c0b57e2ae7..efb8e0429e1ee 100644 --- a/src/transforms/lua/v1/mod.rs +++ b/src/transforms/lua/v1/mod.rs @@ -180,7 +180,7 @@ impl rlua::UserData for LuaEvent { ); methods.add_meta_method(rlua::MetaMethod::Index, |ctx, this, key: String| { - if let Some(value) = this.inner.as_log().get(&key.into()) { + if let Some(value) = this.inner.as_log().get(key) { let string = ctx.create_string(&value.as_bytes())?; Ok(Some(string)) } else { @@ -203,7 +203,7 @@ impl rlua::UserData for LuaEvent { let key: Option = next.call((keys, prev))?; match key .clone() - .and_then(|k| event.inner.as_log().get(&k.into())) + .and_then(|k| event.inner.as_log().get(k)) { Some(value) => Ok((key, Some(ctx.create_string(&value.as_bytes())?))), None => Ok((None, None)), @@ -244,7 +244,7 @@ mod tests { let event = transform.transform(event).unwrap(); - assert_eq!(event.as_log()[&"hello".into()], "goodbye".into()); + assert_eq!(event.as_log()["hello"], "goodbye".into()); } #[test] @@ -263,7 +263,7 @@ mod tests { let event = transform.transform(event).unwrap(); - assert_eq!(event.as_log()[&"name".into()], "Bob".into()); + assert_eq!(event.as_log()["name"], "Bob".into()); } #[test] @@ -281,7 +281,7 @@ mod tests { event.as_mut_log().insert("name", "Bob"); let event = transform.transform(event).unwrap(); - assert!(event.as_log().get(&"name".into()).is_none()); + assert!(event.as_log().get("name").is_none()); } #[test] @@ -319,7 +319,7 @@ mod tests { let event = Event::new_empty_log(); let event = transform.transform(event).unwrap(); - assert_eq!(event.as_log()[&"result".into()], "empty".into()); + assert_eq!(event.as_log()["result"], "empty".into()); } #[test] @@ -334,7 +334,7 @@ mod tests { .unwrap(); let event = transform.transform(Event::new_empty_log()).unwrap(); - assert_eq!(event.as_log()[&"number".into()], Value::Integer(3)); + assert_eq!(event.as_log()["number"], Value::Integer(3)); } #[test] @@ -349,7 +349,7 @@ mod tests { .unwrap(); let event = transform.transform(Event::new_empty_log()).unwrap(); - assert_eq!(event.as_log()[&"number".into()], Value::Float(3.14159)); + assert_eq!(event.as_log()["number"], Value::Float(3.14159)); } #[test] @@ -364,7 +364,7 @@ mod tests { .unwrap(); let event = transform.transform(Event::new_empty_log()).unwrap(); - assert_eq!(event.as_log()[&"bool".into()], Value::Boolean(true)); + assert_eq!(event.as_log()["bool"], Value::Boolean(true)); } #[test] @@ -379,7 +379,7 @@ mod tests { .unwrap(); let event = transform.transform(Event::new_empty_log()).unwrap(); - assert_eq!(event.as_log().get(&"junk".into()), None); + assert_eq!(event.as_log().get("junk"), None); } #[test] @@ -480,7 +480,7 @@ mod tests { let event = Event::new_empty_log(); let event = transform.transform(event).unwrap(); - assert_eq!(event.as_log()[&"new field".into()], "new value".into()); + assert_eq!(event.as_log()["new field"], "new value".into()); } #[test] @@ -502,7 +502,7 @@ mod tests { let event = transform.transform(event).unwrap(); - assert_eq!(event.as_log()[&"name".into()], "nameBob".into()); - assert_eq!(event.as_log()[&"friend".into()], "friendAlice".into()); + assert_eq!(event.as_log()["name"], "nameBob".into()); + assert_eq!(event.as_log()["friend"], "friendAlice".into()); } } diff --git a/src/transforms/lua/v2/interop/event.rs b/src/transforms/lua/v2/interop/event.rs index 839f897c9839e..3fe55d5a8093c 100644 --- a/src/transforms/lua/v2/interop/event.rs +++ b/src/transforms/lua/v2/interop/event.rs @@ -115,9 +115,9 @@ mod test { Lua::new().context(|ctx| { let event = ctx.load(lua_event).eval::().unwrap(); let log = event.as_log(); - assert_eq!(log[&"field".into()], Value::Bytes("example".into())); + assert_eq!(log["field"], Value::Bytes("example".into())); assert_eq!( - log[&"nested.field".into()], + log["nested.field"], Value::Bytes("another example".into()) ); }); diff --git a/src/transforms/lua/v2/interop/log.rs b/src/transforms/lua/v2/interop/log.rs index 236730a287ff2..e152098c16d57 100644 --- a/src/transforms/lua/v2/interop/log.rs +++ b/src/transforms/lua/v2/interop/log.rs @@ -79,15 +79,15 @@ mod test { Lua::new().context(move |ctx| { let event: LogEvent = ctx.load(lua_event).eval().unwrap(); - assert_eq!(event[&"a".into()], Value::Integer(1)); - assert_eq!(event[&"nested.field".into()], Value::Bytes("2".into())); + assert_eq!(event["a"], Value::Integer(1)); + assert_eq!(event["nested.field"], Value::Bytes("2".into())); assert_eq!( - event[&"nested.array[0]".into()], + event["nested.array[0]"], Value::Bytes("example value".into()) ); - assert_eq!(event[&"nested.array[1]".into()], Value::Bytes("".into())); + assert_eq!(event["nested.array[1]"], Value::Bytes("".into())); assert_eq!( - event[&"nested.array[2]".into()], + event["nested.array[2]"], Value::Bytes("another value".into()) ); }); diff --git a/src/transforms/lua/v2/mod.rs b/src/transforms/lua/v2/mod.rs index 4906bfb858493..a6349c96b4ecb 100644 --- a/src/transforms/lua/v2/mod.rs +++ b/src/transforms/lua/v2/mod.rs @@ -378,7 +378,7 @@ mod tests { let event = transform.transform(event).unwrap(); - assert_eq!(event.as_log()[&"hello".into()], "goodbye".into()); + assert_eq!(event.as_log()["hello"], "goodbye".into()); } #[test] @@ -401,7 +401,7 @@ mod tests { let event = transform.transform(event).unwrap(); - assert_eq!(event.as_log()[&"name".into()], "Bob".into()); + assert_eq!(event.as_log()["name"], "Bob".into()); } #[test] @@ -423,7 +423,7 @@ mod tests { event.as_mut_log().insert("name", "Bob"); let event = transform.transform(event).unwrap(); - assert!(event.as_log().get(&"name".into()).is_none()); + assert!(event.as_log().get("name").is_none()); } #[test] @@ -492,7 +492,7 @@ mod tests { let event = Event::new_empty_log(); let event = transform.transform(event).unwrap(); - assert_eq!(event.as_log()[&"result".into()], "empty".into()); + assert_eq!(event.as_log()["result"], "empty".into()); } #[test] @@ -511,7 +511,7 @@ mod tests { .unwrap(); let event = transform.transform(Event::new_empty_log()).unwrap(); - assert_eq!(event.as_log()[&"number".into()], Value::Integer(3)); + assert_eq!(event.as_log()["number"], Value::Integer(3)); } #[test] @@ -530,7 +530,7 @@ mod tests { .unwrap(); let event = transform.transform(Event::new_empty_log()).unwrap(); - assert_eq!(event.as_log()[&"number".into()], Value::Float(3.14159)); + assert_eq!(event.as_log()["number"], Value::Float(3.14159)); } #[test] @@ -549,7 +549,7 @@ mod tests { .unwrap(); let event = transform.transform(Event::new_empty_log()).unwrap(); - assert_eq!(event.as_log()[&"bool".into()], Value::Boolean(true)); + assert_eq!(event.as_log()["bool"], Value::Boolean(true)); } #[test] @@ -568,7 +568,7 @@ mod tests { .unwrap(); let event = transform.transform(Event::new_empty_log()).unwrap(); - assert_eq!(event.as_log().get(&"junk".into()), None); + assert_eq!(event.as_log().get("junk"), None); } #[test] @@ -609,7 +609,7 @@ mod tests { .unwrap(); let event = transform.transform(Event::new_empty_log()).unwrap(); - assert_eq!(event.as_log().get(&"result".into()), None); + assert_eq!(event.as_log().get("result"), None); } #[test] @@ -692,7 +692,7 @@ mod tests { let event = Event::new_empty_log(); let event = transform.transform(event).unwrap(); - assert_eq!(event.as_log()[&"new field".into()], "new value".into()); + assert_eq!(event.as_log()["new field"], "new value".into()); } #[test] @@ -718,8 +718,8 @@ mod tests { let event = transform.transform(event).unwrap(); - assert_eq!(event.as_log()[&"name".into()], "nameBob".into()); - assert_eq!(event.as_log()[&"friend".into()], "friendAlice".into()); + assert_eq!(event.as_log()["name"], "nameBob".into()); + assert_eq!(event.as_log()["friend"], "friendAlice".into()); } #[test] diff --git a/src/transforms/reduce/mod.rs b/src/transforms/reduce/mod.rs index e05c76f735eb3..87b47e4c8433c 100644 --- a/src/transforms/reduce/mod.rs +++ b/src/transforms/reduce/mod.rs @@ -344,11 +344,11 @@ identifier_fields = [ "request_id" ] assert_eq!(outputs.len(), 1); assert_eq!( - outputs.first().unwrap().as_log()[&"message".into()], + outputs.first().unwrap().as_log()["message"], "test message 1".into() ); assert_eq!( - outputs.first().unwrap().as_log()[&"counter".into()], + outputs.first().unwrap().as_log()["counter"], Value::from(8) ); @@ -363,15 +363,15 @@ identifier_fields = [ "request_id" ] assert_eq!(outputs.len(), 1); assert_eq!( - outputs.first().unwrap().as_log()[&"message".into()], + outputs.first().unwrap().as_log()["message"], "test message 2".into() ); assert_eq!( - outputs.first().unwrap().as_log()[&"extra_field".into()], + outputs.first().unwrap().as_log()["extra_field"], "value1".into() ); assert_eq!( - outputs.first().unwrap().as_log()[&"counter".into()], + outputs.first().unwrap().as_log()["counter"], Value::from(7) ); } @@ -424,18 +424,18 @@ merge_strategies.baz = "max" assert_eq!(outputs.len(), 1); assert_eq!( - outputs.first().unwrap().as_log()[&"message".into()], + outputs.first().unwrap().as_log()["message"], "test message 1".into() ); assert_eq!( - outputs.first().unwrap().as_log()[&"foo".into()], + outputs.first().unwrap().as_log()["foo"], "first foo second foo".into() ); assert_eq!( - outputs.first().unwrap().as_log()[&"bar".into()], + outputs.first().unwrap().as_log()["bar"], Value::Array(vec!["first bar".into(), 2.into(), "third bar".into()]), ); - assert_eq!(outputs.first().unwrap().as_log()[&"baz".into()], 3.into(),); + assert_eq!(outputs.first().unwrap().as_log()["baz"], 3.into(),); } #[tokio::test] @@ -478,11 +478,11 @@ identifier_fields = [ "request_id" ] assert_eq!(outputs.len(), 1); assert_eq!( - outputs.first().unwrap().as_log()[&"message".into()], + outputs.first().unwrap().as_log()["message"], "test message 1".into() ); assert_eq!( - outputs.first().unwrap().as_log()[&"counter".into()], + outputs.first().unwrap().as_log()["counter"], Value::from(8) ); @@ -496,15 +496,15 @@ identifier_fields = [ "request_id" ] assert_eq!(outputs.len(), 1); assert_eq!( - outputs.first().unwrap().as_log()[&"message".into()], + outputs.first().unwrap().as_log()["message"], "test message 2".into() ); assert_eq!( - outputs.first().unwrap().as_log()[&"extra_field".into()], + outputs.first().unwrap().as_log()["extra_field"], "value1".into() ); assert_eq!( - outputs.first().unwrap().as_log()[&"counter".into()], + outputs.first().unwrap().as_log()["counter"], Value::from(7) ); } @@ -556,13 +556,13 @@ merge_strategies.bar = "concat" assert_eq!(outputs.len(), 1); assert_eq!( - outputs.first().unwrap().as_log()[&"foo".into()], + outputs.first().unwrap().as_log()["foo"], json!([[1, 3], [5, 7], "done"]).into() ); assert_eq!(outputs.len(), 1); assert_eq!( - outputs.first().unwrap().as_log()[&"bar".into()], + outputs.first().unwrap().as_log()["bar"], json!([1, 3, 5, 7, "done"]).into() ); @@ -583,11 +583,11 @@ merge_strategies.bar = "concat" assert_eq!(outputs.len(), 1); assert_eq!( - outputs.first().unwrap().as_log()[&"foo".into()], + outputs.first().unwrap().as_log()["foo"], json!([[2, 4], [6, 8], "done"]).into() ); assert_eq!( - outputs.first().unwrap().as_log()[&"bar".into()], + outputs.first().unwrap().as_log()["bar"], json!([2, 4, 6, 8, "done"]).into() ); } diff --git a/src/transforms/regex_parser.rs b/src/transforms/regex_parser.rs index fb05d48f2c6b5..da01fa1a2d7de 100644 --- a/src/transforms/regex_parser.rs +++ b/src/transforms/regex_parser.rs @@ -340,9 +340,9 @@ mod tests { .await .unwrap(); - assert_eq!(log[&"status".into()], "1234".into()); - assert_eq!(log[&"time".into()], "5678".into()); - assert!(log.get(&"message".into()).is_some()); + assert_eq!(log["status"], "1234".into()); + assert_eq!(log["time"], "5678".into()); + assert!(log.get("message").is_some()); } #[tokio::test] @@ -355,8 +355,8 @@ mod tests { .await .unwrap(); - assert_eq!(log.get(&"status".into()), None); - assert!(log.get(&"message".into()).is_some()); + assert_eq!(log.get("status"), None); + assert!(log.get("message").is_some()); } #[tokio::test] @@ -369,9 +369,9 @@ mod tests { .await .unwrap(); - assert_eq!(log[&"status".into()], "1234".into()); - assert_eq!(log[&"time".into()], "5678".into()); - assert!(log.get(&"message".into()).is_none()); + assert_eq!(log["status"], "1234".into()); + assert_eq!(log["time"], "5678".into()); + assert!(log.get("message").is_none()); } #[tokio::test] @@ -384,8 +384,8 @@ mod tests { .await .unwrap(); - assert_eq!(log[&"status".into()], "1234".into()); - assert_eq!(log[&"message".into()], "yes".into()); + assert_eq!(log["status"], "1234".into()); + assert_eq!(log["message"], "yes".into()); } #[tokio::test] @@ -398,7 +398,7 @@ mod tests { .await .unwrap(); - assert!(log.get(&"message".into()).is_some()); + assert!(log.get(&"message").is_some()); } #[tokio::test] @@ -441,9 +441,9 @@ mod tests { .await .unwrap(); - assert_eq!(log[&"message".into()], message.into()); - assert_eq!(log.get(&"message.status".into()), None); - assert_eq!(log.get(&"message.time".into()), None); + assert_eq!(log["message"], message.into()); + assert_eq!(log.get("message.status"), None); + assert_eq!(log.get("message.time"), None); } #[tokio::test] @@ -488,7 +488,7 @@ mod tests { let log = do_transform("1234", r#"['(?P\d+)?']"#, "") .await .unwrap(); - assert_eq!(log[&"status".into()], "1234".into()); + assert_eq!(log["status"], "1234".into()); } #[tokio::test] @@ -496,7 +496,7 @@ mod tests { let log = do_transform("none", r#"['(?P\d+)?']"#, "") .await .unwrap(); - assert!(log.get(&"status".into()).is_none()); + assert!(log.get("status").is_none()); } #[tokio::test] @@ -513,9 +513,9 @@ mod tests { ) .await .expect("Failed to parse log"); - assert_eq!(log[&"check".into()], Value::Boolean(false)); - assert_eq!(log[&"status".into()], Value::Integer(1234)); - assert_eq!(log[&"time".into()], Value::Float(6789.01)); + assert_eq!(log["check"], Value::Boolean(false)); + assert_eq!(log["status"], Value::Integer(1234)); + assert_eq!(log["time"], Value::Float(6789.01)); } #[tokio::test] @@ -538,11 +538,11 @@ mod tests { .await .unwrap(); - assert_eq!(log[&"id1".into()], Value::Integer(1234)); - assert_eq!(log.get(&"id2".into()), None); - assert_eq!(log.get(&"time".into()), None); - assert_eq!(log.get(&"check".into()), None); - assert!(log.get(&"message".into()).is_some()); + assert_eq!(log["id1"], Value::Integer(1234)); + assert_eq!(log.get("id2"), None); + assert_eq!(log.get("time"), None); + assert_eq!(log.get("check"), None); + assert!(log.get("message").is_some()); } #[tokio::test] @@ -566,10 +566,10 @@ mod tests { .await .unwrap(); - assert_eq!(log.get(&"id1".into()), None); - assert_eq!(log[&"id2".into()], Value::Integer(1234)); - assert_eq!(log[&"time".into()], Value::Float(235.42)); - assert_eq!(log[&"check".into()], Value::Boolean(true)); - assert!(log.get(&"message".into()).is_some()); + assert_eq!(log.get("id1"), None); + assert_eq!(log["id2"], Value::Integer(1234)); + assert_eq!(log["time"], Value::Float(235.42)); + assert_eq!(log["check"], Value::Boolean(true)); + assert!(log.get("message").is_some()); } } diff --git a/src/transforms/remove_fields.rs b/src/transforms/remove_fields.rs index e47c849b11efd..4c12e6897594b 100644 --- a/src/transforms/remove_fields.rs +++ b/src/transforms/remove_fields.rs @@ -99,10 +99,10 @@ mod tests { let new_event = transform.transform(event).unwrap(); - assert!(new_event.as_log().get(&"to_remove".into()).is_none()); - assert!(new_event.as_log().get(&"unknown".into()).is_none()); + assert!(new_event.as_log().get("to_remove").is_none()); + assert!(new_event.as_log().get("unknown").is_none()); assert_eq!( - new_event.as_log()[&"to_keep".into()], + new_event.as_log()["to_keep"], "another value".into() ); } diff --git a/src/transforms/rename_fields.rs b/src/transforms/rename_fields.rs index c2b5d2d08dd73..fca3ea1cd84d0 100644 --- a/src/transforms/rename_fields.rs +++ b/src/transforms/rename_fields.rs @@ -117,12 +117,12 @@ mod tests { let new_event = transform.transform(event).unwrap(); - assert!(new_event.as_log().get(&"to_move".into()).is_none()); - assert_eq!(new_event.as_log()[&"moved".into()], "some value".into()); - assert!(new_event.as_log().get(&"not_present".into()).is_none()); - assert!(new_event.as_log().get(&"should_not_exist".into()).is_none()); + assert!(new_event.as_log().get("to_move").is_none()); + assert_eq!(new_event.as_log()["moved"], "some value".into()); + assert!(new_event.as_log().get("not_present").is_none()); + assert!(new_event.as_log().get("should_not_exist").is_none()); assert_eq!( - new_event.as_log()[&"do_not_move".into()], + new_event.as_log()["do_not_move"], "not moved".into() ); } diff --git a/src/transforms/split.rs b/src/transforms/split.rs index 4f7232bd20e24..fd16a25286bd6 100644 --- a/src/transforms/split.rs +++ b/src/transforms/split.rs @@ -204,18 +204,18 @@ mod tests { async fn split_adds_parsed_field_to_event() { let log = parse_log("1234 5678", "status time", None, None, false, &[]).await; - assert_eq!(log[&"status".into()], "1234".into()); - assert_eq!(log[&"time".into()], "5678".into()); - assert!(log.get(&"message".into()).is_some()); + assert_eq!(log["status"], "1234".into()); + assert_eq!(log["time"], "5678".into()); + assert!(log.get("message").is_some()); } #[tokio::test] async fn split_does_drop_parsed_field() { let log = parse_log("1234 5678", "status time", None, Some("message"), true, &[]).await; - assert_eq!(log[&"status".into()], "1234".into()); - assert_eq!(log[&"time".into()], "5678".into()); - assert!(log.get(&"message".into()).is_none()); + assert_eq!(log["status"], "1234".into()); + assert_eq!(log["time"], "5678".into()); + assert!(log.get("message").is_none()); } #[tokio::test] @@ -230,8 +230,8 @@ mod tests { ) .await; - assert_eq!(log[&"status".into()], "1234".into()); - assert_eq!(log[&"message".into()], "yes".into()); + assert_eq!(log["status"], "1234".into()); + assert_eq!(log["message"], "yes".into()); } #[tokio::test] @@ -246,10 +246,10 @@ mod tests { ) .await; - assert_eq!(log[&"number".into()], Value::Float(42.3)); - assert_eq!(log[&"flag".into()], Value::Boolean(true)); - assert_eq!(log[&"code".into()], Value::Integer(1234)); - assert_eq!(log[&"rest".into()], Value::Bytes("word".into())); + assert_eq!(log["number"], Value::Float(42.3)); + assert_eq!(log["flag"], Value::Boolean(true)); + assert_eq!(log["code"], Value::Integer(1234)); + assert_eq!(log["rest"], Value::Bytes("word".into())); } #[tokio::test] @@ -264,8 +264,8 @@ mod tests { ) .await; - assert_eq!(log[&"code".into()], Value::Integer(1234)); - assert_eq!(log[&"who".into()], Value::Bytes("foo".into())); - assert_eq!(log[&"why".into()], Value::Bytes("bar".into())); + assert_eq!(log["code"], Value::Integer(1234)); + assert_eq!(log["who"], Value::Bytes("foo".into())); + assert_eq!(log["why"], Value::Bytes("bar".into())); } } diff --git a/src/transforms/tokenizer.rs b/src/transforms/tokenizer.rs index e28018ce3f628..c2df7bb994769 100644 --- a/src/transforms/tokenizer.rs +++ b/src/transforms/tokenizer.rs @@ -163,26 +163,26 @@ mod tests { async fn tokenizer_adds_parsed_field_to_event() { let log = parse_log("1234 5678", "status time", None, false, &[]).await; - assert_eq!(log[&"status".into()], "1234".into()); - assert_eq!(log[&"time".into()], "5678".into()); - assert!(log.get(&"message".into()).is_some()); + assert_eq!(log["status"], "1234".into()); + assert_eq!(log["time"], "5678".into()); + assert!(log.get("message").is_some()); } #[tokio::test] async fn tokenizer_does_drop_parsed_field() { let log = parse_log("1234 5678", "status time", Some("message"), true, &[]).await; - assert_eq!(log[&"status".into()], "1234".into()); - assert_eq!(log[&"time".into()], "5678".into()); - assert!(log.get(&"message".into()).is_none()); + assert_eq!(log["status"], "1234".into()); + assert_eq!(log["time"], "5678".into()); + assert!(log.get("message").is_none()); } #[tokio::test] async fn tokenizer_does_not_drop_same_name_parsed_field() { let log = parse_log("1234 yes", "status message", Some("message"), true, &[]).await; - assert_eq!(log[&"status".into()], "1234".into()); - assert_eq!(log[&"message".into()], "yes".into()); + assert_eq!(log["status"], "1234".into()); + assert_eq!(log["message"], "yes".into()); } #[tokio::test] @@ -196,10 +196,10 @@ mod tests { ) .await; - assert_eq!(log[&"number".into()], Value::Float(42.3)); - assert_eq!(log[&"flag".into()], Value::Boolean(true)); - assert_eq!(log[&"code".into()], Value::Integer(1234)); - assert_eq!(log[&"rest".into()], Value::Bytes("word".into())); + assert_eq!(log["number"], Value::Float(42.3)); + assert_eq!(log["flag"], Value::Boolean(true)); + assert_eq!(log["code"], Value::Integer(1234)); + assert_eq!(log["rest"], Value::Bytes("word".into())); } #[tokio::test] @@ -212,8 +212,8 @@ mod tests { &[("code", "integer"), ("who", "string"), ("why", "string")], ) .await; - assert_eq!(log[&"code".into()], Value::Integer(1234)); - assert_eq!(log[&"who".into()], Value::Bytes("-".into())); - assert_eq!(log[&"why".into()], Value::Bytes("foo".into())); + assert_eq!(log["code"], Value::Integer(1234)); + assert_eq!(log["who"], Value::Bytes("-".into())); + assert_eq!(log["why"], Value::Bytes("foo".into())); } } From 779f96ce80aaf98609fd5a8c3e6fdc03077f7bed Mon Sep 17 00:00:00 2001 From: Ana Hobden Date: Fri, 9 Oct 2020 10:37:57 -0700 Subject: [PATCH 03/19] Migrate get_mut Signed-off-by: Ana Hobden --- src/event/log_event.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/event/log_event.rs b/src/event/log_event.rs index 53540858d23ea..c99aa19fe6089 100644 --- a/src/event/log_event.rs +++ b/src/event/log_event.rs @@ -24,9 +24,9 @@ impl LogEvent { self.fields.get(key.as_ref()) } - #[instrument(skip(self, key), fields(key = %key))] - pub fn get_mut(&mut self, key: &DefaultAtom) -> Option<&mut Value> { - util::log::get_mut(&mut self.fields, key) + #[instrument(skip(self, key), fields(key = %key.as_ref()))] + pub fn get_mut(&mut self, key: impl AsRef) -> Option<&mut Value> { + util::log::get_mut(&mut self.fields, key.as_ref()) } #[instrument(skip(self, key), fields(key = %key.as_ref()))] From 5e247e6b75d86d59b045739494763075f027429e Mon Sep 17 00:00:00 2001 From: Ana Hobden Date: Fri, 9 Oct 2020 10:49:47 -0700 Subject: [PATCH 04/19] insert and try_insert migration Signed-off-by: Ana Hobden --- src/event/log_event.rs | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/src/event/log_event.rs b/src/event/log_event.rs index c99aa19fe6089..858ca844ac514 100644 --- a/src/event/log_event.rs +++ b/src/event/log_event.rs @@ -35,11 +35,7 @@ impl LogEvent { } #[instrument(skip(self, key), fields(key = %key.as_ref()))] - pub fn insert(&mut self, key: K, value: V) -> Option - where - K: AsRef, - V: Into + Debug, - { + pub fn insert(&mut self, key: impl AsRef, value: impl Into + Debug) -> Option { util::log::insert(&mut self.fields, key.as_ref(), value.into()) } @@ -60,13 +56,14 @@ impl LogEvent { self.fields.insert(key.into(), value.into()); } - #[instrument(skip(self, key), fields(key = %key))] - pub fn try_insert(&mut self, key: &DefaultAtom, value: V) + #[instrument(skip(self, key), fields(key = %key.as_ref()))] + pub fn try_insert(&mut self, key: impl AsRef, value: V) where V: Into + Debug, { + let key = key.as_ref(); if !self.contains(key) { - self.insert(key.clone(), value); + self.insert(key, value); } } From 78e893ba5e91d9e970a410e676962d58a57c1685 Mon Sep 17 00:00:00 2001 From: Ana Hobden Date: Fri, 9 Oct 2020 10:59:25 -0700 Subject: [PATCH 05/19] Migrate remove Signed-off-by: Ana Hobden --- src/event/log_event.rs | 11 ++++------- src/sinks/elasticsearch.rs | 2 +- src/sinks/logdna.rs | 4 ++-- src/sources/journald.rs | 4 ++-- src/sources/kubernetes_logs/parser/cri.rs | 2 +- src/sources/kubernetes_logs/parser/docker.rs | 4 ++-- src/transforms/lua/v1/mod.rs | 4 ++-- src/transforms/regex_parser.rs | 4 ++-- 8 files changed, 16 insertions(+), 19 deletions(-) diff --git a/src/event/log_event.rs b/src/event/log_event.rs index 858ca844ac514..c215ff10171f1 100644 --- a/src/event/log_event.rs +++ b/src/event/log_event.rs @@ -57,19 +57,16 @@ impl LogEvent { } #[instrument(skip(self, key), fields(key = %key.as_ref()))] - pub fn try_insert(&mut self, key: impl AsRef, value: V) - where - V: Into + Debug, - { + pub fn try_insert(&mut self, key: impl AsRef, value: impl Into + Debug) { let key = key.as_ref(); if !self.contains(key) { self.insert(key, value); } } - #[instrument(skip(self, key), fields(key = %key))] - pub fn remove(&mut self, key: &DefaultAtom) -> Option { - util::log::remove(&mut self.fields, &key, false) + #[instrument(skip(self, key), fields(key = %key.as_ref()))] + pub fn remove(&mut self, key: impl AsRef) -> Option { + util::log::remove(&mut self.fields, key.as_ref(), false) } #[instrument(skip(self, key), fields(key = %key.as_ref()))] diff --git a/src/sinks/elasticsearch.rs b/src/sinks/elasticsearch.rs index 638224c33cc9e..9e3596cbafaf2 100644 --- a/src/sinks/elasticsearch.rs +++ b/src/sinks/elasticsearch.rs @@ -482,7 +482,7 @@ async fn finish_signer( } fn maybe_set_id(key: Option>, doc: &mut serde_json::Value, event: &mut Event) { - if let Some(val) = key.and_then(|k| event.as_mut_log().remove(&k.as_ref().into())) { + if let Some(val) = key.and_then(|k| event.as_mut_log().remove(k)) { let val = val.to_string_lossy(); doc.as_object_mut() diff --git a/src/sinks/logdna.rs b/src/sinks/logdna.rs index 98e59e3f57d98..b349c1095ca1b 100644 --- a/src/sinks/logdna.rs +++ b/src/sinks/logdna.rs @@ -124,11 +124,11 @@ impl HttpSink for LogdnaConfig { map.insert("line".to_string(), json!(line)); map.insert("timestamp".to_string(), json!(timestamp)); - if let Some(app) = log.remove(&"app".into()) { + if let Some(app) = log.remove("app") { map.insert("app".to_string(), json!(app)); } - if let Some(file) = log.remove(&"file".into()) { + if let Some(file) = log.remove("file") { map.insert("file".to_string(), json!(file)); } diff --git a/src/sources/journald.rs b/src/sources/journald.rs index eb3bbc6cd8599..466cd1c59b240 100644 --- a/src/sources/journald.rs +++ b/src/sources/journald.rs @@ -207,10 +207,10 @@ impl JournaldConfig { fn create_event(record: Record) -> Event { let mut log = LogEvent::from_iter(record); // Convert some journald-specific field names into Vector standard ones. - if let Some(message) = log.remove(&MESSAGE) { + if let Some(message) = log.remove(&*MESSAGE) { log.insert(log_schema().message_key(), message); } - if let Some(host) = log.remove(&HOSTNAME) { + if let Some(host) = log.remove(&*HOSTNAME) { log.insert(log_schema().host_key(), host); } // Translate the timestamp, and so leave both old and new names. diff --git a/src/sources/kubernetes_logs/parser/cri.rs b/src/sources/kubernetes_logs/parser/cri.rs index 901538ed867c2..5518c8ee64f2d 100644 --- a/src/sources/kubernetes_logs/parser/cri.rs +++ b/src/sources/kubernetes_logs/parser/cri.rs @@ -60,7 +60,7 @@ impl Transform for Cri { fn normalize_event(log: &mut LogEvent) -> Result<(), NormalizationError> { // Detect if this is a partial event. let multiline_tag = log - .remove(&MULTILINE_TAG) + .remove(&*MULTILINE_TAG) .context(MultilineTagFieldMissing)?; let multiline_tag = match multiline_tag { Value::Bytes(val) => val, diff --git a/src/sources/kubernetes_logs/parser/docker.rs b/src/sources/kubernetes_logs/parser/docker.rs index afae32d0d9f06..9f23ff32a79fb 100644 --- a/src/sources/kubernetes_logs/parser/docker.rs +++ b/src/sources/kubernetes_logs/parser/docker.rs @@ -57,7 +57,7 @@ const DOCKER_MESSAGE_SPLIT_THRESHOLD: usize = 16 * 1024; // 16 Kib fn normalize_event(log: &mut LogEvent) -> Result<(), NormalizationError> { // Parse and rename timestamp. - let time = log.remove(&TIME).context(TimeFieldMissing)?; + let time = log.remove(&*TIME).context(TimeFieldMissing)?; let time = match time { Value::Bytes(val) => val, _ => return Err(NormalizationError::TimeValueUnexpectedType), @@ -67,7 +67,7 @@ fn normalize_event(log: &mut LogEvent) -> Result<(), NormalizationError> { log.insert(log_schema().timestamp_key(), time.with_timezone(&Utc)); // Parse message, remove trailing newline and detect if it's partial. - let message = log.remove(&LOG).context(LogFieldMissing)?; + let message = log.remove(&*LOG).context(LogFieldMissing)?; let mut message = match message { Value::Bytes(val) => val, _ => return Err(NormalizationError::LogValueUnexpectedType), diff --git a/src/transforms/lua/v1/mod.rs b/src/transforms/lua/v1/mod.rs index efb8e0429e1ee..84c01982d85f5 100644 --- a/src/transforms/lua/v1/mod.rs +++ b/src/transforms/lua/v1/mod.rs @@ -162,7 +162,7 @@ impl rlua::UserData for LuaEvent { this.inner.as_mut_log().insert(key, Value::Boolean(boolean)); } Some(rlua::Value::Nil) | None => { - this.inner.as_mut_log().remove(&key.into()); + this.inner.as_mut_log().remove(key); } _ => { info!( @@ -171,7 +171,7 @@ impl rlua::UserData for LuaEvent { field = key.as_str(), rate_limit_secs = 30 ); - this.inner.as_mut_log().remove(&key.into()); + this.inner.as_mut_log().remove(key); } } diff --git a/src/transforms/regex_parser.rs b/src/transforms/regex_parser.rs index da01fa1a2d7de..a988695608b08 100644 --- a/src/transforms/regex_parser.rs +++ b/src/transforms/regex_parser.rs @@ -415,7 +415,7 @@ mod tests { .unwrap(); // timestamp is unpredictable, don't compare it - log.remove(&"timestamp".into()); + log.remove("timestamp"); let log = serde_json::to_value(log.all_fields()).unwrap(); assert_eq!( log, @@ -460,7 +460,7 @@ mod tests { .unwrap(); // timestamp is unpredictable, don't compare it - log.remove(&"timestamp".into()); + log.remove("timestamp"); let log = serde_json::to_value(log.all_fields()).unwrap(); assert_eq!( log, From cdaee53c695d3de192d800c94061f39e25da518d Mon Sep 17 00:00:00 2001 From: Ana Hobden Date: Fri, 9 Oct 2020 15:29:57 -0700 Subject: [PATCH 06/19] wip Signed-off-by: Ana Hobden --- Cargo.lock | 72 -------- Cargo.toml | 1 - benches/bench.rs | 20 +-- src/conditions/check_fields.rs | 28 +-- src/event/discriminant.rs | 22 +-- src/event/log_event.rs | 7 +- src/event/lookup/mod.rs | 9 - src/event/merge.rs | 16 +- src/event/merge_state.rs | 12 +- src/event/mod.rs | 19 +- src/internal_events/ansi_stripper.rs | 8 +- src/internal_events/elasticsearch.rs | 4 +- src/internal_events/json_parser.rs | 6 +- src/internal_events/log_to_metric.rs | 9 +- src/internal_events/regex_parser.rs | 8 +- src/internal_events/split.rs | 6 +- src/internal_events/splunk_hec.rs | 6 +- src/internal_events/tokenizer.rs | 6 +- src/mapping/mod.rs | 72 ++++---- src/mapping/query/path.rs | 8 +- src/serde.rs | 6 +- src/sinks/aws_cloudwatch_logs/mod.rs | 20 +-- src/sinks/aws_kinesis_firehose.rs | 4 +- src/sinks/aws_kinesis_streams.rs | 10 +- src/sinks/aws_s3.rs | 4 +- src/sinks/azure_monitor_logs.rs | 4 +- src/sinks/blackhole.rs | 4 +- src/sinks/clickhouse.rs | 6 +- src/sinks/console.rs | 4 +- src/sinks/datadog/logs.rs | 8 +- src/sinks/elasticsearch.rs | 10 +- src/sinks/file/mod.rs | 20 +-- src/sinks/gcp/cloud_storage.rs | 4 +- src/sinks/gcp/stackdriver_logs.rs | 11 +- src/sinks/honeycomb.rs | 4 +- src/sinks/http.rs | 4 +- src/sinks/influxdb/logs.rs | 8 +- src/sinks/kafka.rs | 14 +- src/sinks/logdna.rs | 6 +- src/sinks/loki.rs | 8 +- src/sinks/papertrail.rs | 10 +- src/sinks/pulsar.rs | 8 +- src/sinks/sematext_logs.rs | 6 +- src/sinks/splunk_hec.rs | 20 +-- src/sinks/util/encoding/config.rs | 8 +- src/sinks/util/encoding/mod.rs | 36 ++-- src/sinks/util/encoding/with_default.rs | 8 +- src/sinks/util/mod.rs | 4 +- src/sources/docker.rs | 46 ++--- src/sources/file.rs | 44 ++--- src/sources/generator.rs | 6 +- src/sources/http.rs | 70 ++++---- src/sources/journald.rs | 28 +-- src/sources/kafka.rs | 10 +- src/sources/kubernetes_logs/parser/cri.rs | 10 +- src/sources/kubernetes_logs/parser/docker.rs | 12 +- src/sources/kubernetes_logs/parser/picker.rs | 4 +- .../kubernetes_logs/partial_events_merger.rs | 7 +- src/sources/logplex.rs | 36 ++-- src/sources/socket/mod.rs | 56 +++--- src/sources/socket/tcp.rs | 6 +- src/sources/socket/udp.rs | 6 +- src/sources/splunk_hec.rs | 69 ++++---- src/sources/stdin.rs | 6 +- src/template.rs | 26 +-- src/transforms/add_tags.rs | 18 +- src/transforms/ansi_stripper.rs | 12 +- src/transforms/aws_ec2_metadata.rs | 163 ++++++++---------- src/transforms/coercer.rs | 6 +- src/transforms/concat.rs | 40 ++--- src/transforms/dedupe.rs | 12 +- src/transforms/field_filter.rs | 4 +- src/transforms/geoip.rs | 36 ++-- src/transforms/grok_parser.rs | 22 +-- src/transforms/json_parser.rs | 86 ++++----- src/transforms/log_to_metric.rs | 46 ++--- src/transforms/logfmt_parser.rs | 16 +- src/transforms/merge.rs | 30 ++-- src/transforms/metric_to_log.rs | 18 +- src/transforms/reduce/merge_strategy.rs | 4 +- src/transforms/reduce/mod.rs | 5 +- src/transforms/regex_parser.rs | 31 ++-- src/transforms/remap.rs | 4 +- src/transforms/remove_fields.rs | 4 +- src/transforms/remove_tags.rs | 10 +- src/transforms/sampler.rs | 24 +-- src/transforms/split.rs | 29 ++-- src/transforms/tokenizer.rs | 24 +-- src/types.rs | 29 +--- src/wasm/mod.rs | 4 +- tests/support/mod.rs | 2 +- tests/topology.rs | 2 +- 92 files changed, 789 insertions(+), 932 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9713a61a1c59e..a4770bc8fecf7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3198,12 +3198,6 @@ dependencies = [ "winapi 0.3.9", ] -[[package]] -name = "new_debug_unreachable" -version = "1.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e4a24736216ec316047a1fc4252e27dabb04218aa4a3f37c6e7ddbf1f9782b54" - [[package]] name = "nix" version = "0.16.1" @@ -3630,25 +3624,6 @@ dependencies = [ "indexmap", ] -[[package]] -name = "phf_generator" -version = "0.7.24" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09364cc93c159b8b06b1f4dd8a4398984503483891b0c26b867cf431fb132662" -dependencies = [ - "phf_shared", - "rand 0.6.5", -] - -[[package]] -name = "phf_shared" -version = "0.7.24" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "234f71a15de2288bcb7e3b6515828d22af7ec8598ee6d24c3b526fa0a80b67a0" -dependencies = [ - "siphasher", -] - [[package]] name = "pin-project" version = "0.4.23" @@ -3720,12 +3695,6 @@ version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c36fa947111f5c62a733b652544dd0016a43ce89619538a8ef92724a6f501a20" -[[package]] -name = "precomputed-hash" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "925383efa346730478fb4838dbe9137d2a47675ad789c546d150a6e1dd4ab31c" - [[package]] name = "predicates" version = "1.0.5" @@ -4979,12 +4948,6 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a30f10c911c0355f80f1c2faa8096efc4a58cdf8590b954d5b395efa071c711" -[[package]] -name = "siphasher" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b8de496cf83d4ed58b6be86c3a275b8602f6ffe98d3024a869e124147a9a3ac" - [[package]] name = "slab" version = "0.4.2" @@ -5149,40 +5112,6 @@ dependencies = [ "futures 0.1.29", ] -[[package]] -name = "string_cache" -version = "0.7.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89c058a82f9fd69b1becf8c274f412281038877c553182f1d02eb027045a2d67" -dependencies = [ - "lazy_static", - "new_debug_unreachable", - "phf_shared", - "precomputed-hash", - "serde", - "string_cache_codegen", - "string_cache_shared", -] - -[[package]] -name = "string_cache_codegen" -version = "0.4.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0f45ed1b65bf9a4bf2f7b7dc59212d1926e9eaf00fa998988e420fd124467c6" -dependencies = [ - "phf_generator", - "phf_shared", - "proc-macro2 1.0.19", - "quote 1.0.7", - "string_cache_shared", -] - -[[package]] -name = "string_cache_shared" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1884d1bc09741d466d9b14e6d37ac89d6909cbcac41dd9ae982d4d063bbedfc" - [[package]] name = "strip-ansi-escapes" version = "0.1.0" @@ -6214,7 +6143,6 @@ dependencies = [ "smpl_jwt", "snafu", "stream-cancel", - "string_cache", "strip-ansi-escapes", "structopt", "syslog", diff --git a/Cargo.toml b/Cargo.toml index 7d640e7513ed8..3823a675d3be7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -116,7 +116,6 @@ hyper = "0.13" hyper-openssl = "0.8" openssl = "0.10.26" openssl-probe = "0.1.2" -string_cache = "0.7.3" flate2 = "1.0.6" async-compression = { version = "0.3.5", features = ["tokio-02", "gzip"] } structopt = "0.3.13" diff --git a/benches/bench.rs b/benches/bench.rs index c619bf0c60f68..5e8463153817c 100644 --- a/benches/bench.rs +++ b/benches/bench.rs @@ -9,7 +9,7 @@ use rand::{ prelude::*, }; use std::convert::TryFrom; -use string_cache::DefaultAtom as Atom; + use vector::transforms::{ add_fields::AddFields, coercer::CoercerConfig, @@ -697,7 +697,7 @@ fn benchmark_remap(c: &mut Criterion) { assert_eq!( result .as_log() - .get(&Atom::from("foo")) + .get("foo") .unwrap() .to_string_lossy(), "bar" @@ -705,7 +705,7 @@ fn benchmark_remap(c: &mut Criterion) { assert_eq!( result .as_log() - .get(&Atom::from("bar")) + .get("bar") .unwrap() .to_string_lossy(), "baz" @@ -713,7 +713,7 @@ fn benchmark_remap(c: &mut Criterion) { assert_eq!( result .as_log() - .get(&Atom::from("copy")) + .get("copy") .unwrap() .to_string_lossy(), "buz" @@ -758,7 +758,7 @@ fn benchmark_remap(c: &mut Criterion) { assert_eq!( result .as_log() - .get(&Atom::from("foo")) + .get("foo") .unwrap() .to_string_lossy(), r#"{"key": "value"}"# @@ -766,7 +766,7 @@ fn benchmark_remap(c: &mut Criterion) { assert_eq!( result .as_log() - .get(&Atom::from("bar")) + .get("bar") .unwrap() .to_string_lossy(), r#"{"key":"value"}"# @@ -785,7 +785,7 @@ fn benchmark_remap(c: &mut Criterion) { c.bench_function("remap: parse JSON with json_parser", |b| { let tform = JsonParser::from(JsonParserConfig { - field: Some(Atom::from("foo")), + field: Some("foo"), target_field: Some("bar".to_owned()), drop_field: false, drop_invalid: false, @@ -813,15 +813,15 @@ fn benchmark_remap(c: &mut Criterion) { move || { let result = tform.transform(event.clone()).unwrap(); assert_eq!( - result.as_log().get(&Atom::from("number")).unwrap(), + result.as_log().get("number").unwrap(), &Value::Integer(1234) ); assert_eq!( - result.as_log().get(&Atom::from("bool")).unwrap(), + result.as_log().get("bool").unwrap(), &Value::Boolean(true) ); assert_eq!( - result.as_log().get(&Atom::from("timestamp")).unwrap(), + result.as_log().get("timestamp").unwrap(), &Value::Timestamp(timestamp), ); } diff --git a/src/conditions/check_fields.rs b/src/conditions/check_fields.rs index 2e36f24600827..28b5be94df564 100644 --- a/src/conditions/check_fields.rs +++ b/src/conditions/check_fields.rs @@ -9,7 +9,7 @@ use regex::Regex; use serde::{Deserialize, Serialize}; use std::net::IpAddr; use std::str::FromStr; -use string_cache::DefaultAtom as Atom; + #[derive(Deserialize, Serialize, Clone, Derivative)] #[serde(untagged)] @@ -35,7 +35,7 @@ pub trait CheckFieldsPredicate: std::fmt::Debug + Send + Sync { #[derive(Debug, Clone)] struct EqualsPredicate { - target: Atom, + target: String, arg: CheckFieldsPredicateArg, } @@ -77,7 +77,7 @@ impl CheckFieldsPredicate for EqualsPredicate { Event::Metric(m) => m .tags .as_ref() - .and_then(|t| t.get(self.target.as_ref())) + .and_then(|t| t.get(&self.target)) .map_or(false, |v| match &self.arg { CheckFieldsPredicateArg::String(s) => s.as_bytes() == v.as_bytes(), _ => false, @@ -90,7 +90,7 @@ impl CheckFieldsPredicate for EqualsPredicate { #[derive(Debug, Clone)] struct ContainsPredicate { - target: Atom, + target: String, arg: Vec, } @@ -129,7 +129,7 @@ impl CheckFieldsPredicate for ContainsPredicate { #[derive(Debug, Clone)] struct StartsWithPredicate { - target: Atom, + target: String, arg: Vec, } @@ -170,7 +170,7 @@ impl CheckFieldsPredicate for StartsWithPredicate { #[derive(Debug, Clone)] struct EndsWithPredicate { - target: Atom, + target: String, arg: Vec, } @@ -209,7 +209,7 @@ impl CheckFieldsPredicate for EndsWithPredicate { #[derive(Debug, Clone)] struct NotEqualsPredicate { - target: Atom, + target: String, arg: Vec, } @@ -244,7 +244,7 @@ impl CheckFieldsPredicate for NotEqualsPredicate { Event::Metric(m) => m .tags .as_ref() - .and_then(|t| t.get(self.target.as_ref())) + .and_then(|t| t.get(&self.target)) .map_or(false, |v| { !self.arg.iter().any(|s| v.as_bytes() == s.as_bytes()) }), @@ -256,7 +256,7 @@ impl CheckFieldsPredicate for NotEqualsPredicate { #[derive(Debug, Clone)] struct RegexPredicate { - target: Atom, + target: String, regex: Regex, } @@ -286,7 +286,7 @@ impl CheckFieldsPredicate for RegexPredicate { Event::Metric(metric) => metric .tags .as_ref() - .and_then(|tags| tags.get(self.target.as_ref())) + .and_then(|tags| tags.get(&self.target)) .map_or(false, |field| self.regex.is_match(field)), } } @@ -296,7 +296,7 @@ impl CheckFieldsPredicate for RegexPredicate { #[derive(Debug, Clone)] struct ExistsPredicate { - target: Atom, + target: String, arg: bool, } @@ -322,7 +322,7 @@ impl CheckFieldsPredicate for ExistsPredicate { Event::Metric(m) => m .tags .as_ref() - .map_or(false, |t| t.contains_key(self.target.as_ref())), + .map_or(false, |t| t.contains_key(&self.target)), }) == self.arg } } @@ -331,7 +331,7 @@ impl CheckFieldsPredicate for ExistsPredicate { #[derive(Debug, Clone)] struct IpCidrPredicate { - target: Atom, + target: String, cidrs: Vec, } @@ -401,7 +401,7 @@ impl CheckFieldsPredicate for NegatePredicate { #[derive(Debug, Clone)] struct LengthEqualsPredicate { - target: Atom, + target: String, arg: i64, } diff --git a/src/event/discriminant.rs b/src/event/discriminant.rs index 8253f2782ee72..11f9827dd371a 100644 --- a/src/event/discriminant.rs +++ b/src/event/discriminant.rs @@ -3,7 +3,7 @@ use std::{ collections::BTreeMap, hash::{Hash, Hasher}, }; -use string_cache::DefaultAtom as Atom; + // TODO: if we had `Value` implement `Eq` and `Hash`, the implementation here // would be much easier. The issue is with `f64` type. We should consider using @@ -24,9 +24,9 @@ pub struct Discriminant { impl Discriminant { /// Create a new Discriminant from the `LogEvent` and an ordered slice of /// fields to include into a discriminant value. - pub fn from_log_event(event: &LogEvent, discriminant_fields: &[Atom]) -> Self { + pub fn from_log_event(event: &LogEvent, discriminant_fields: &Vec) -> Self { let values: Vec> = discriminant_fields - .iter() + .into_iter() .map(|discriminant_field| event.get(discriminant_field).cloned()) .collect(); Self { values } @@ -174,7 +174,7 @@ mod tests { let mut event_2 = event_1.clone(); event_2.insert("irrelevant", "does not matter if it's different"); - let discriminant_fields = vec![Atom::from("hostname"), Atom::from("container_id")]; + let discriminant_fields = vec!["hostname".to_string(), "container_id".to_string()]; let discriminant_1 = Discriminant::from_log_event(&event_1, &discriminant_fields); let discriminant_2 = Discriminant::from_log_event(&event_2, &discriminant_fields); @@ -191,7 +191,7 @@ mod tests { let mut event_2 = event_1.clone(); event_2.insert("container_id", "def"); - let discriminant_fields = vec![Atom::from("hostname"), Atom::from("container_id")]; + let discriminant_fields = vec!["hostname".to_string(), "container_id".to_string()]; let discriminant_1 = Discriminant::from_log_event(&event_1, &discriminant_fields); let discriminant_2 = Discriminant::from_log_event(&event_2, &discriminant_fields); @@ -209,7 +209,7 @@ mod tests { event_2.insert("b", "b"); event_2.insert("a", "a"); - let discriminant_fields = vec![Atom::from("a"), Atom::from("b")]; + let discriminant_fields = vec!["a".to_string(), "b".to_string()]; let discriminant_1 = Discriminant::from_log_event(&event_1, &discriminant_fields); let discriminant_2 = Discriminant::from_log_event(&event_2, &discriminant_fields); @@ -227,7 +227,7 @@ mod tests { event_2.insert("nested.b", "b"); event_2.insert("nested.a", "a"); - let discriminant_fields = vec![Atom::from("nested")]; + let discriminant_fields = vec!["nested".to_string()]; let discriminant_1 = Discriminant::from_log_event(&event_1, &discriminant_fields); let discriminant_2 = Discriminant::from_log_event(&event_2, &discriminant_fields); @@ -245,7 +245,7 @@ mod tests { event_2.insert("array[1]", "b"); event_2.insert("array[0]", "a"); - let discriminant_fields = vec![Atom::from("array")]; + let discriminant_fields = vec!["array".to_string()]; let discriminant_1 = Discriminant::from_log_event(&event_1, &discriminant_fields); let discriminant_2 = Discriminant::from_log_event(&event_2, &discriminant_fields); @@ -260,7 +260,7 @@ mod tests { event_1.insert("nested.a", "a"); // `nested` is a `Value::Map` let event_2 = LogEvent::default(); // empty event - let discriminant_fields = vec![Atom::from("nested")]; + let discriminant_fields = vec!["nested".to_string()]; let discriminant_1 = Discriminant::from_log_event(&event_1, &discriminant_fields); let discriminant_2 = Discriminant::from_log_event(&event_2, &discriminant_fields); @@ -276,7 +276,7 @@ mod tests { let mut event_2 = LogEvent::default(); event_2.insert("nested", "x"); // `nested` is a `Value::String` - let discriminant_fields = vec![Atom::from("nested")]; + let discriminant_fields = vec!["nested".to_string()]; let discriminant_1 = Discriminant::from_log_event(&event_1, &discriminant_fields); let discriminant_2 = Discriminant::from_log_event(&event_2, &discriminant_fields); @@ -308,7 +308,7 @@ mod tests { LogEvent::default() }; - let discriminant_fields = vec![Atom::from("hostname"), Atom::from("container_id")]; + let discriminant_fields = vec!["hostname".to_string(), "container_id".to_string()]; let mut process_event = |event| { let discriminant = Discriminant::from_log_event(&event, &discriminant_fields); diff --git a/src/event/log_event.rs b/src/event/log_event.rs index c215ff10171f1..10476c7105d08 100644 --- a/src/event/log_event.rs +++ b/src/event/log_event.rs @@ -6,7 +6,6 @@ use std::{ iter::FromIterator, fmt::{Debug, Display}, }; -use string_cache::DefaultAtom; #[derive(PartialEq, Debug, Clone, Default)] pub struct LogEvent { @@ -192,16 +191,16 @@ impl std::ops::Index<&str> for LogEvent { } } -impl, V: Into> Extend<(K, V)> for LogEvent { +impl, V: Into> Extend<(K, V)> for LogEvent { fn extend>(&mut self, iter: I) { for (k, v) in iter { - self.insert(k.into(), v.into()); + self.insert(k.as_ref(), v.into()); } } } // Allow converting any kind of appropriate key/value iterator directly into a LogEvent. -impl, V: Into> FromIterator<(K, V)> for LogEvent { +impl, V: Into> FromIterator<(K, V)> for LogEvent { fn from_iter>(iter: T) -> Self { let mut log_event = LogEvent::default(); log_event.extend(iter); diff --git a/src/event/lookup/mod.rs b/src/event/lookup/mod.rs index 2f34820eabbc9..f686cb499676e 100644 --- a/src/event/lookup/mod.rs +++ b/src/event/lookup/mod.rs @@ -217,15 +217,6 @@ impl From<&str> for Lookup { } } -impl From for Lookup { - fn from(input: string_cache::DefaultAtom) -> Self { - Self { - segments: vec![Segment::field(input.to_string())], - } - // We know this must be at least one segment. - } -} - impl Index for Lookup { type Output = Segment; diff --git a/src/event/merge.rs b/src/event/merge.rs index d4a64c69772ce..7905ff9531b92 100644 --- a/src/event/merge.rs +++ b/src/event/merge.rs @@ -1,9 +1,9 @@ use crate::event::{LogEvent, Value}; use bytes::BytesMut; -use string_cache::DefaultAtom as Atom; + /// Merges all fields specified at `fields` from `incoming` to `current`. -pub fn merge_log_event(current: &mut LogEvent, mut incoming: LogEvent, fields: &[Atom]) { +pub fn merge_log_event<'a>(current: &mut LogEvent, mut incoming: LogEvent, fields: Vec) { for field in fields { let incoming_val = match incoming.remove(field) { None => continue, @@ -65,11 +65,11 @@ mod test { // Specify the fields that will be merged. // Only the ones listed will be merged from the `incoming` event // to the `current`. - let fields_to_merge = [ - Atom::from("merge"), - Atom::from("merge_a"), - Atom::from("merge_b"), - Atom::from("merge_c"), + let fields_to_merge = vec![ + "merge".to_string(), + "merge_a".to_string(), + "merge_b".to_string(), + "merge_c".to_string(), ]; let current = { @@ -108,7 +108,7 @@ mod test { }; let mut merged = current; - merge_log_event(&mut merged, incoming, &fields_to_merge); + merge_log_event(&mut merged, incoming, fields_to_merge); let expected = { let mut log = LogEvent::default(); diff --git a/src/event/merge_state.rs b/src/event/merge_state.rs index d52c8165a5288..a08dd2ee69ac5 100644 --- a/src/event/merge_state.rs +++ b/src/event/merge_state.rs @@ -1,6 +1,6 @@ use crate::event::merge::merge_log_event; use crate::event::LogEvent; -use string_cache::DefaultAtom as Atom; + /// Encapsulates the inductive events merging algorithm. /// @@ -22,13 +22,13 @@ impl LogEventMergeState { } /// Merge the incoming (partial) event in. - pub fn merge_in_next_event(&mut self, incoming: LogEvent, fields: &[Atom]) { + pub fn merge_in_next_event<'a>(&mut self, incoming: LogEvent, fields: Vec) { merge_log_event(&mut self.intermediate_merged_event, incoming, fields); } /// Merge the final (non-partial) event in and return the resulting (merged) /// event. - pub fn merge_in_final_event(mut self, incoming: LogEvent, fields: &[Atom]) -> LogEvent { + pub fn merge_in_final_event<'a>(mut self, incoming: LogEvent, fields: Vec) -> LogEvent { self.merge_in_next_event(incoming, fields); self.intermediate_merged_event } @@ -38,7 +38,7 @@ impl LogEventMergeState { mod test { use super::LogEventMergeState; use crate::event::{Event, LogEvent}; - use string_cache::DefaultAtom as Atom; + fn log_event_with_message(message: &str) -> LogEvent { Event::from(message).into_log() @@ -46,7 +46,7 @@ mod test { #[test] fn log_event_merge_state_example() { - let fields = &[Atom::from("message")]; + let fields = vec!["message".to_string()]; let mut state = LogEventMergeState::new(log_event_with_message("hel")); state.merge_in_next_event(log_event_with_message("lo "), fields); @@ -54,7 +54,7 @@ mod test { assert_eq!( merged_event - .get(&Atom::from("message")) + .get("message") .unwrap() .as_bytes() .as_ref(), diff --git a/src/event/mod.rs b/src/event/mod.rs index 0e1c66dcb8ae6..6713a7d5d3cc0 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -2,9 +2,8 @@ use self::proto::{event_wrapper::Event as EventProto, metric::Value as MetricPro use crate::config::log_schema; use bytes::Bytes; use chrono::{DateTime, SecondsFormat, TimeZone, Utc}; -use lazy_static::lazy_static; use std::collections::{BTreeMap, HashMap}; -use string_cache::DefaultAtom as Atom; + pub mod discriminant; pub mod merge; @@ -28,11 +27,7 @@ pub mod proto { include!(concat!(env!("OUT_DIR"), "/event.proto.rs")); } -pub const PARTIAL_STR: &str = "_partial"; // TODO: clean up the _STR suffix after we get rid of atoms - -lazy_static! { - pub static ref PARTIAL: Atom = Atom::from(PARTIAL_STR); -} +pub const PARTIAL: &str = "_partial"; #[derive(PartialEq, Debug, Clone)] pub enum Event { @@ -388,7 +383,7 @@ impl From for Event { .insert(log_schema().message_key(), message); event .as_mut_log() - .insert(Atom::from(log_schema().timestamp_key()), Utc::now()); + .insert(log_schema().timestamp_key(), Utc::now()); event } @@ -434,7 +429,7 @@ mod test { "message": "raw log line", "foo": "bar", "bar": "baz", - "timestamp": event.as_log().get(&Atom::from(log_schema().timestamp_key())), + "timestamp": event.as_log().get(log_schema().timestamp_key()), }); let actual_all = serde_json::to_value(event.as_log().all_fields()).unwrap(); @@ -498,9 +493,9 @@ mod test { fn event_iteration_order() { let mut event = Event::new_empty_log(); let log = event.as_mut_log(); - log.insert(&Atom::from("lZDfzKIL"), Value::from("tOVrjveM")); - log.insert(&Atom::from("o9amkaRY"), Value::from("pGsfG7Nr")); - log.insert(&Atom::from("YRjhxXcg"), Value::from("nw8iM5Jr")); + log.insert("lZDfzKIL", Value::from("tOVrjveM")); + log.insert("o9amkaRY", Value::from("pGsfG7Nr")); + log.insert("YRjhxXcg", Value::from("nw8iM5Jr")); let collected: Vec<_> = log.all_fields().collect(); assert_eq!( diff --git a/src/internal_events/ansi_stripper.rs b/src/internal_events/ansi_stripper.rs index 8386f44990a5b..9ebf8e588c7a6 100644 --- a/src/internal_events/ansi_stripper.rs +++ b/src/internal_events/ansi_stripper.rs @@ -1,6 +1,6 @@ use super::InternalEvent; use metrics::counter; -use string_cache::DefaultAtom as Atom; + #[derive(Debug)] pub struct ANSIStripperEventProcessed; @@ -13,7 +13,7 @@ impl InternalEvent for ANSIStripperEventProcessed { #[derive(Debug)] pub struct ANSIStripperFieldMissing<'a> { - pub field: &'a Atom, + pub field: &'a str, } impl InternalEvent for ANSIStripperFieldMissing<'_> { @@ -32,7 +32,7 @@ impl InternalEvent for ANSIStripperFieldMissing<'_> { #[derive(Debug)] pub struct ANSIStripperFieldInvalid<'a> { - pub field: &'a Atom, + pub field: &'a str, } impl InternalEvent for ANSIStripperFieldInvalid<'_> { @@ -51,7 +51,7 @@ impl InternalEvent for ANSIStripperFieldInvalid<'_> { #[derive(Debug)] pub struct ANSIStripperFailed<'a> { - pub field: &'a Atom, + pub field: &'a str, pub error: std::io::Error, } diff --git a/src/internal_events/elasticsearch.rs b/src/internal_events/elasticsearch.rs index 20c9d406a0ba6..fd442ecf2049b 100644 --- a/src/internal_events/elasticsearch.rs +++ b/src/internal_events/elasticsearch.rs @@ -1,6 +1,6 @@ use super::InternalEvent; use metrics::counter; -use string_cache::DefaultAtom as Atom; + #[derive(Debug)] pub struct ElasticSearchEventReceived { @@ -21,7 +21,7 @@ impl InternalEvent for ElasticSearchEventReceived { #[derive(Debug)] pub struct ElasticSearchMissingKeys { - pub keys: Vec, + pub keys: Vec, } impl InternalEvent for ElasticSearchMissingKeys { diff --git a/src/internal_events/json_parser.rs b/src/internal_events/json_parser.rs index 0269b0df32470..63e57d6c03b1c 100644 --- a/src/internal_events/json_parser.rs +++ b/src/internal_events/json_parser.rs @@ -1,7 +1,7 @@ use super::InternalEvent; use metrics::counter; use serde_json::Error; -use string_cache::DefaultAtom as Atom; + #[derive(Debug)] pub(crate) struct JsonParserEventProcessed; @@ -18,7 +18,7 @@ impl InternalEvent for JsonParserEventProcessed { #[derive(Debug)] pub(crate) struct JsonParserFailedParse<'a> { - pub field: &'a Atom, + pub field: &'a str, pub value: &'a str, pub error: Error, } @@ -43,7 +43,7 @@ impl<'a> InternalEvent for JsonParserFailedParse<'a> { #[derive(Debug)] pub(crate) struct JsonParserTargetExists<'a> { - pub target_field: &'a Atom, + pub target_field: &'a str, } impl<'a> InternalEvent for JsonParserTargetExists<'a> { diff --git a/src/internal_events/log_to_metric.rs b/src/internal_events/log_to_metric.rs index a21dea8802a97..fd14437aa5ddb 100644 --- a/src/internal_events/log_to_metric.rs +++ b/src/internal_events/log_to_metric.rs @@ -1,7 +1,6 @@ use super::InternalEvent; use metrics::counter; use std::num::ParseFloatError; -use string_cache::DefaultAtom; pub(crate) struct LogToMetricEventProcessed; @@ -16,7 +15,7 @@ impl InternalEvent for LogToMetricEventProcessed { } pub(crate) struct LogToMetricFieldNotFound { - pub field: DefaultAtom, + pub field: String, } impl InternalEvent for LogToMetricFieldNotFound { @@ -35,12 +34,12 @@ impl InternalEvent for LogToMetricFieldNotFound { } } -pub(crate) struct LogToMetricParseFloatError { - pub field: DefaultAtom, +pub(crate) struct LogToMetricParseFloatError<'a> { + pub field: &'a str, pub error: ParseFloatError, } -impl InternalEvent for LogToMetricParseFloatError { +impl<'a> InternalEvent for LogToMetricParseFloatError<'a> { fn emit_logs(&self) { warn!( message = "Failed to parse field as float.", diff --git a/src/internal_events/regex_parser.rs b/src/internal_events/regex_parser.rs index e1606ff15e18b..261c755625bf1 100644 --- a/src/internal_events/regex_parser.rs +++ b/src/internal_events/regex_parser.rs @@ -1,6 +1,6 @@ use super::InternalEvent; use metrics::counter; -use string_cache::DefaultAtom as Atom; + #[derive(Debug)] pub(crate) struct RegexParserEventProcessed; @@ -36,7 +36,7 @@ impl InternalEvent for RegexParserFailedMatch<'_> { #[derive(Debug)] pub(crate) struct RegexParserMissingField<'a> { - pub field: &'a Atom, + pub field: &'a str, } impl InternalEvent for RegexParserMissingField<'_> { @@ -51,7 +51,7 @@ impl InternalEvent for RegexParserMissingField<'_> { #[derive(Debug)] pub(crate) struct RegexParserTargetExists<'a> { - pub target_field: &'a Atom, + pub target_field: &'a str, } impl<'a> InternalEvent for RegexParserTargetExists<'a> { @@ -70,7 +70,7 @@ impl<'a> InternalEvent for RegexParserTargetExists<'a> { #[derive(Debug)] pub(crate) struct RegexParserConversionFailed<'a> { - pub name: &'a Atom, + pub name: &'a str, pub error: crate::types::Error, } diff --git a/src/internal_events/split.rs b/src/internal_events/split.rs index f969b4c4546a7..127c37a2d36cb 100644 --- a/src/internal_events/split.rs +++ b/src/internal_events/split.rs @@ -1,6 +1,6 @@ use super::InternalEvent; use metrics::counter; -use string_cache::DefaultAtom as Atom; + #[derive(Debug)] pub struct SplitEventProcessed; @@ -13,7 +13,7 @@ impl InternalEvent for SplitEventProcessed { #[derive(Debug)] pub struct SplitFieldMissing<'a> { - pub field: &'a Atom, + pub field: &'a str, } impl<'a> InternalEvent for SplitFieldMissing<'a> { @@ -32,7 +32,7 @@ impl<'a> InternalEvent for SplitFieldMissing<'a> { #[derive(Debug)] pub struct SplitConvertFailed<'a> { - pub field: &'a Atom, + pub field: &'a str, pub error: crate::types::Error, } diff --git a/src/internal_events/splunk_hec.rs b/src/internal_events/splunk_hec.rs index 061f4485eb46a..50e5904e0844d 100644 --- a/src/internal_events/splunk_hec.rs +++ b/src/internal_events/splunk_hec.rs @@ -1,7 +1,7 @@ use super::InternalEvent; use metrics::counter; use serde_json::Error; -use string_cache::DefaultAtom as Atom; + #[cfg(feature = "sources-splunk_hec")] pub(crate) use self::source::*; @@ -39,7 +39,7 @@ impl InternalEvent for SplunkEventEncodeError { #[derive(Debug)] pub struct SplunkSourceTypeMissingKeys { - pub keys: Vec, + pub keys: Vec, } impl InternalEvent for SplunkSourceTypeMissingKeys { @@ -58,7 +58,7 @@ impl InternalEvent for SplunkSourceTypeMissingKeys { #[derive(Debug)] pub struct SplunkSourceMissingKeys { - pub keys: Vec, + pub keys: Vec, } impl InternalEvent for SplunkSourceMissingKeys { diff --git a/src/internal_events/tokenizer.rs b/src/internal_events/tokenizer.rs index 6117e7d109bc2..197aa4c939429 100644 --- a/src/internal_events/tokenizer.rs +++ b/src/internal_events/tokenizer.rs @@ -1,6 +1,6 @@ use super::InternalEvent; use metrics::counter; -use string_cache::DefaultAtom as Atom; + #[derive(Debug)] pub(crate) struct TokenizerEventProcessed; @@ -13,7 +13,7 @@ impl InternalEvent for TokenizerEventProcessed { #[derive(Debug)] pub(crate) struct TokenizerFieldMissing<'a> { - pub field: &'a Atom, + pub field: &'a str, } impl<'a> InternalEvent for TokenizerFieldMissing<'a> { @@ -32,7 +32,7 @@ impl<'a> InternalEvent for TokenizerFieldMissing<'a> { #[derive(Debug)] pub(crate) struct TokenizerConvertFailed<'a> { - pub field: &'a Atom, + pub field: &'a str, pub error: crate::types::Error, } diff --git a/src/mapping/mod.rs b/src/mapping/mod.rs index 549080fd4661f..fd8adae062901 100644 --- a/src/mapping/mod.rs +++ b/src/mapping/mod.rs @@ -4,7 +4,7 @@ use std::collections::BTreeMap; pub mod parser; pub mod query; -use string_cache::DefaultAtom as Atom; + pub type Result = std::result::Result; @@ -39,13 +39,13 @@ impl Function for Assignment { #[derive(Debug)] pub(self) struct Deletion { // TODO: Switch to String once Event API is cleaned up. - paths: Vec, + paths: Vec, } impl Deletion { pub(self) fn new(mut paths: Vec) -> Self { Self { - paths: paths.drain(..).map(Atom::from).collect(), + paths: paths.drain(..).collect(), } } } @@ -87,7 +87,7 @@ impl Function for OnlyFields { .collect(); for key in keys { - target_log.remove_prune(&Atom::from(key), true); + target_log.remove_prune(key, true); } Ok(()) @@ -196,14 +196,14 @@ where #[derive(Debug)] pub(in crate::mapping) struct MergeFn { - to_path: Atom, + to_path: String, from: Box, deep: Option>, } impl MergeFn { pub(in crate::mapping) fn new( - to_path: Atom, + to_path: String, from: Box, deep: Option>, ) -> Self { @@ -259,13 +259,13 @@ mod tests { ( { let mut event = Event::from("foo body"); - event.as_mut_log().remove(&Atom::from("timestamp")); + event.as_mut_log().remove("timestamp"); event }, { let mut event = Event::from("foo body"); event.as_mut_log().insert("foo", Value::from("bar")); - event.as_mut_log().remove(&Atom::from("timestamp")); + event.as_mut_log().remove("timestamp"); event }, Mapping::new(vec![Box::new(Assignment::new( @@ -277,7 +277,7 @@ mod tests { ( { let mut event = Event::from("foo body"); - event.as_mut_log().remove(&Atom::from("timestamp")); + event.as_mut_log().remove("timestamp"); event }, { @@ -285,7 +285,7 @@ mod tests { event .as_mut_log() .insert("foo bar\\.baz.buz", Value::from("quack")); - event.as_mut_log().remove(&Atom::from("timestamp")); + event.as_mut_log().remove("timestamp"); event }, Mapping::new(vec![Box::new(Assignment::new( @@ -297,13 +297,13 @@ mod tests { ( { let mut event = Event::from("foo body"); - event.as_mut_log().remove(&Atom::from("timestamp")); + event.as_mut_log().remove("timestamp"); event.as_mut_log().insert("foo", Value::from("bar")); event }, { let mut event = Event::from("foo body"); - event.as_mut_log().remove(&Atom::from("timestamp")); + event.as_mut_log().remove("timestamp"); event }, Mapping::new(vec![Box::new(Deletion::new(vec!["foo".to_string()]))]), @@ -313,13 +313,13 @@ mod tests { { let mut event = Event::from("foo body"); event.as_mut_log().insert("bar", Value::from("baz")); - event.as_mut_log().remove(&Atom::from("timestamp")); + event.as_mut_log().remove("timestamp"); event }, { let mut event = Event::from("foo body"); event.as_mut_log().insert("foo", Value::from("bar")); - event.as_mut_log().remove(&Atom::from("timestamp")); + event.as_mut_log().remove("timestamp"); event }, Mapping::new(vec![ @@ -335,14 +335,14 @@ mod tests { { let mut event = Event::from("foo body"); event.as_mut_log().insert("bar", Value::from("baz")); - event.as_mut_log().remove(&Atom::from("timestamp")); + event.as_mut_log().remove("timestamp"); event }, { let mut event = Event::from("foo body"); event.as_mut_log().insert("bar", Value::from("baz")); event.as_mut_log().insert("foo", Value::from("bar is baz")); - event.as_mut_log().remove(&Atom::from("timestamp")); + event.as_mut_log().remove("timestamp"); event }, Mapping::new(vec![Box::new(IfStatement::new( @@ -363,12 +363,12 @@ mod tests { { let mut event = Event::from("foo body"); event.as_mut_log().insert("bar", Value::from("buz")); - event.as_mut_log().remove(&Atom::from("timestamp")); + event.as_mut_log().remove("timestamp"); event }, { let mut event = Event::from("foo body"); - event.as_mut_log().remove(&Atom::from("timestamp")); + event.as_mut_log().remove("timestamp"); event }, Mapping::new(vec![Box::new(IfStatement::new( @@ -389,13 +389,13 @@ mod tests { { let mut event = Event::from("foo body"); event.as_mut_log().insert("bar", Value::from("buz")); - event.as_mut_log().remove(&Atom::from("timestamp")); + event.as_mut_log().remove("timestamp"); event }, { let mut event = Event::from("foo body"); event.as_mut_log().insert("bar", Value::from("buz")); - event.as_mut_log().remove(&Atom::from("timestamp")); + event.as_mut_log().remove("timestamp"); event }, Mapping::new(vec![Box::new(IfStatement::new( @@ -441,8 +441,8 @@ mod tests { event .as_mut_log() .insert("nested.and_here", Value::from("sixth")); - event.as_mut_log().remove(&Atom::from("timestamp")); - event.as_mut_log().remove(&Atom::from("message")); + event.as_mut_log().remove("timestamp"); + event.as_mut_log().remove("message"); event }, Mapping::new(vec![Box::new(OnlyFields::new(vec![ @@ -471,7 +471,7 @@ mod tests { event .as_mut_log() .insert("bar", serde_json::json!({ "key2": "val2" })); - event.as_mut_log().remove(&Atom::from("timestamp")); + event.as_mut_log().remove("timestamp"); event }, { @@ -480,7 +480,7 @@ mod tests { event .as_mut_log() .insert("bar", serde_json::json!({ "key2": "val2" })); - event.as_mut_log().remove(&Atom::from("timestamp")); + event.as_mut_log().remove("timestamp"); event }, Mapping::new(vec![Box::new(MergeFn::new( @@ -502,8 +502,8 @@ mod tests { event .as_mut_log() .insert("bar", serde_json::json!({ "key2": "val2" })); - event.as_mut_log().remove(&Atom::from("timestamp")); - event.as_mut_log().remove(&Atom::from("message")); + event.as_mut_log().remove("timestamp"); + event.as_mut_log().remove("message"); event }, { @@ -514,8 +514,8 @@ mod tests { event .as_mut_log() .insert("bar", serde_json::json!({ "key2": "val2" })); - event.as_mut_log().remove(&Atom::from("timestamp")); - event.as_mut_log().remove(&Atom::from("message")); + event.as_mut_log().remove("timestamp"); + event.as_mut_log().remove("message"); event }, Mapping::new(vec![Box::new(MergeFn::new( @@ -546,8 +546,8 @@ mod tests { } }), ); - event.as_mut_log().remove(&Atom::from("timestamp")); - event.as_mut_log().remove(&Atom::from("message")); + event.as_mut_log().remove("timestamp"); + event.as_mut_log().remove("message"); event }, { @@ -571,8 +571,8 @@ mod tests { } }), ); - event.as_mut_log().remove(&Atom::from("timestamp")); - event.as_mut_log().remove(&Atom::from("message")); + event.as_mut_log().remove("timestamp"); + event.as_mut_log().remove("message"); event }, Mapping::new(vec![Box::new(MergeFn::new( @@ -603,8 +603,8 @@ mod tests { } }), ); - event.as_mut_log().remove(&Atom::from("timestamp")); - event.as_mut_log().remove(&Atom::from("message")); + event.as_mut_log().remove("timestamp"); + event.as_mut_log().remove("message"); event }, { @@ -629,8 +629,8 @@ mod tests { } }), ); - event.as_mut_log().remove(&Atom::from("timestamp")); - event.as_mut_log().remove(&Atom::from("message")); + event.as_mut_log().remove("timestamp"); + event.as_mut_log().remove("message"); event }, Mapping::new(vec![Box::new(MergeFn::new( diff --git a/src/mapping/query/path.rs b/src/mapping/query/path.rs index f6296a8177c21..06041aec3f702 100644 --- a/src/mapping/query/path.rs +++ b/src/mapping/query/path.rs @@ -3,12 +3,12 @@ use crate::{ event::{util::log::get_value, Event, PathIter, Value}, mapping::Result, }; -use string_cache::DefaultAtom as Atom; + #[derive(Debug)] pub(in crate::mapping) struct Path { // TODO: Switch to String once Event API is cleaned up. - path: Vec>, + path: Vec>, } impl From<&str> for Path { @@ -27,7 +27,7 @@ impl From>> for Path { .iter() .map(|c| { c.iter() - .map(|p| Atom::from(p.replace(".", "\\."))) + .map(|p| p.replace(".", "\\.")) .collect() }) .collect(), @@ -43,7 +43,7 @@ impl From>> for Path { .iter() .map(|c| { c.iter() - .map(|p| Atom::from(p.replace(".", "\\."))) + .map(|p| p.replace(".", "\\.")) .collect() }) .collect(), diff --git a/src/serde.rs b/src/serde.rs index 266f7c11f52dc..883c5cf4ef916 100644 --- a/src/serde.rs +++ b/src/serde.rs @@ -1,6 +1,6 @@ use indexmap::map::IndexMap; use serde::{Deserialize, Serialize}; -use string_cache::DefaultAtom as Atom; + pub fn default_true() -> bool { true @@ -32,10 +32,10 @@ pub enum FieldsOrValue { pub struct Fields(IndexMap>); impl Fields { - pub fn all_fields(self) -> impl Iterator { + pub fn all_fields(self) -> impl Iterator { self.0 .into_iter() - .map(|(k, v)| -> Box> { + .map(|(k, v)| -> Box> { match v { // boxing is used as a way to avoid incompatible types of the match arms FieldsOrValue::Value(v) => Box::new(std::iter::once((k.into(), v))), diff --git a/src/sinks/aws_cloudwatch_logs/mod.rs b/src/sinks/aws_cloudwatch_logs/mod.rs index d3ec3402e3d97..2d4721409e707 100644 --- a/src/sinks/aws_cloudwatch_logs/mod.rs +++ b/src/sinks/aws_cloudwatch_logs/mod.rs @@ -34,7 +34,7 @@ use std::{ fmt, task::{Context, Poll}, }; -use string_cache::DefaultAtom as Atom; + use tokio::sync::oneshot; use tower::{ buffer::Buffer, @@ -418,7 +418,7 @@ fn encode_log( mut log: LogEvent, encoding: &EncodingConfig, ) -> Result { - let timestamp = match log.remove(&Atom::from(log_schema().timestamp_key())) { + let timestamp = match log.remove(log_schema().timestamp_key()) { Some(Value::Timestamp(ts)) => ts.timestamp_millis(), _ => Utc::now().timestamp_millis(), }; @@ -426,7 +426,7 @@ fn encode_log( let message = match encoding.codec() { Encoding::Json => serde_json::to_string(&log).unwrap(), Encoding::Text => log - .get(&Atom::from(log_schema().message_key())) + .get(log_schema().message_key()) .map(|v| v.to_string_lossy()) .unwrap_or_else(|| "".into()), }; @@ -677,7 +677,7 @@ mod tests { }; use std::collections::HashMap; use std::convert::{TryFrom, TryInto}; - use string_cache::DefaultAtom as Atom; + #[test] fn partition_static() { @@ -796,7 +796,7 @@ mod tests { event.insert("key", "value"); let encoded = encode_log(event.clone(), &Encoding::Json.into()).unwrap(); - let ts = if let Value::Timestamp(ts) = event[&Atom::from(log_schema().timestamp_key())] { + let ts = if let Value::Timestamp(ts) = event[log_schema().timestamp_key()] { ts.timestamp_millis() } else { panic!() @@ -810,8 +810,8 @@ mod tests { let mut event = Event::from("hello world").into_log(); event.insert("key", "value"); let encoded = encode_log(event, &Encoding::Json.into()).unwrap(); - let map: HashMap = serde_json::from_str(&encoded.message[..]).unwrap(); - assert!(map.get(&Atom::from(log_schema().timestamp_key())).is_none()); + let map: HashMap = serde_json::from_str(&encoded.message[..]).unwrap(); + assert!(map.get(log_schema().timestamp_key()).is_none()); } #[test] @@ -831,7 +831,7 @@ mod tests { let mut event = Event::new_empty_log(); event .as_mut_log() - .insert(&Atom::from(log_schema().timestamp_key()), timestamp); + .insert(log_schema().timestamp_key(), timestamp); encode_log(event.into_log(), &Encoding::Text.into()).unwrap() }) .collect(); @@ -942,7 +942,7 @@ mod integration_tests { let timestamp = chrono::Utc::now() - chrono::Duration::days(1); event.as_mut_log().insert( - Atom::from(log_schema().timestamp_key()), + log_schema().timestamp_key(), Value::Timestamp(timestamp), ); } @@ -1005,7 +1005,7 @@ mod integration_tests { let mut event = Event::from(line.clone()); event .as_mut_log() - .insert(Atom::from(log_schema().timestamp_key()), now + offset); + .insert(log_schema().timestamp_key(), now + offset); events.push(event); line }; diff --git a/src/sinks/aws_kinesis_firehose.rs b/src/sinks/aws_kinesis_firehose.rs index 9a531840e5433..50f1732823fd9 100644 --- a/src/sinks/aws_kinesis_firehose.rs +++ b/src/sinks/aws_kinesis_firehose.rs @@ -27,7 +27,7 @@ use std::{ fmt, task::{Context, Poll}, }; -use string_cache::DefaultAtom as Atom; + use tower::Service; use tracing_futures::Instrument; @@ -241,7 +241,7 @@ fn encode_event(mut event: Event, encoding: &EncodingConfig) -> Option Encoding::Json => serde_json::to_vec(&log).expect("Error encoding event as json."), Encoding::Text => log - .get(&Atom::from(crate::config::log_schema().message_key())) + .get(crate::config::log_schema().message_key()) .map(|v| v.as_bytes().to_vec()) .unwrap_or_default(), }; diff --git a/src/sinks/aws_kinesis_streams.rs b/src/sinks/aws_kinesis_streams.rs index 2353ace856f86..f584142f379bc 100644 --- a/src/sinks/aws_kinesis_streams.rs +++ b/src/sinks/aws_kinesis_streams.rs @@ -30,7 +30,7 @@ use std::{ sync::Arc, task::{Context, Poll}, }; -use string_cache::DefaultAtom as Atom; + use tower::Service; use tracing_futures::Instrument; @@ -44,7 +44,7 @@ pub struct KinesisService { #[serde(deny_unknown_fields)] pub struct KinesisSinkConfig { pub stream_name: String, - pub partition_key_field: Option, + pub partition_key_field: Option, #[serde(flatten)] pub region: RegionOrEndpoint, pub encoding: EncodingConfig, @@ -253,7 +253,7 @@ enum HealthcheckError { fn encode_event( mut event: Event, - partition_key_field: &Option, + partition_key_field: &Option, encoding: &EncodingConfig, ) -> Option { let partition_key = if let Some(partition_key_field) = partition_key_field { @@ -283,7 +283,7 @@ fn encode_event( let data = match encoding.codec() { Encoding::Json => serde_json::to_vec(&log).expect("Error encoding event as json."), Encoding::Text => log - .get(&Atom::from(log_schema().message_key())) + .get(log_schema().message_key()) .map(|v| v.as_bytes().to_vec()) .unwrap_or_default(), }; @@ -362,7 +362,7 @@ mod tests { event.as_mut_log().insert("key", "some_key"); let mut encoding: EncodingConfig<_> = Encoding::Json.into(); - encoding.except_fields = Some(vec![Atom::from("key")]); + encoding.except_fields = Some(vec!["key".into()]); let event = encode_event(event, &Some("key".into()), &encoding).unwrap(); let map: BTreeMap = serde_json::from_slice(&event.data[..]).unwrap(); diff --git a/src/sinks/aws_s3.rs b/src/sinks/aws_s3.rs index d234c12701f51..ebf8c69a4ac2c 100644 --- a/src/sinks/aws_s3.rs +++ b/src/sinks/aws_s3.rs @@ -30,7 +30,7 @@ use std::collections::BTreeMap; use std::convert::{TryFrom, TryInto}; use std::task::Context; use std::task::Poll; -use string_cache::DefaultAtom as Atom; + use tower::{Service, ServiceBuilder}; use tracing_futures::Instrument; use uuid::Uuid; @@ -402,7 +402,7 @@ fn encode_event( .expect("Failed to encode event as json, this is a bug!"), Encoding::Text => { let mut bytes = log - .get(&Atom::from(log_schema().message_key())) + .get(log_schema().message_key()) .map(|v| v.as_bytes().to_vec()) .unwrap_or_default(); bytes.push(b'\n'); diff --git a/src/sinks/azure_monitor_logs.rs b/src/sinks/azure_monitor_logs.rs index 3d393f3277ad2..76bde7e839514 100644 --- a/src/sinks/azure_monitor_logs.rs +++ b/src/sinks/azure_monitor_logs.rs @@ -23,7 +23,7 @@ use openssl::{base64, hash, pkey, sign}; use regex::Regex; use serde::{Deserialize, Serialize}; use serde_json::Value as JsonValue; -use string_cache::DefaultAtom as Atom; + #[derive(Deserialize, Serialize, Debug, Clone, Default)] #[serde(deny_unknown_fields)] @@ -156,7 +156,7 @@ impl HttpSink for AzureMonitorLogsSink { let mut log = event.into_log(); let timestamp_key = log_schema().timestamp_key(); - let timestamp = if let Some(Value::Timestamp(ts)) = log.remove(&Atom::from(timestamp_key)) { + let timestamp = if let Some(Value::Timestamp(ts)) = log.remove(timestamp_key) { ts } else { chrono::Utc::now() diff --git a/src/sinks/blackhole.rs b/src/sinks/blackhole.rs index 291bef2905891..dbad043460b4d 100644 --- a/src/sinks/blackhole.rs +++ b/src/sinks/blackhole.rs @@ -9,7 +9,7 @@ use crate::{ use async_trait::async_trait; use futures::{future, stream::BoxStream, FutureExt, StreamExt}; use serde::{Deserialize, Serialize}; -use string_cache::DefaultAtom as Atom; + pub struct BlackholeSink { total_events: usize, @@ -68,7 +68,7 @@ impl StreamSink for BlackholeSink { while let Some(event) = input.next().await { let message_len = match event { Event::Log(log) => log - .get(&Atom::from(crate::config::log_schema().message_key())) + .get(crate::config::log_schema().message_key()) .map(|v| v.as_bytes().len()) .unwrap_or(0), Event::Metric(metric) => { diff --git a/src/sinks/clickhouse.rs b/src/sinks/clickhouse.rs index 77f6eccb609c7..1dff4467a5f7c 100644 --- a/src/sinks/clickhouse.rs +++ b/src/sinks/clickhouse.rs @@ -255,7 +255,7 @@ mod integration_tests { }; use futures::{future, stream}; use serde_json::Value; - use string_cache::DefaultAtom as Atom; + use tokio::time::{timeout, Duration}; #[tokio::test] @@ -352,7 +352,7 @@ mod integration_tests { format!( "{}", exp_event - .get(&Atom::from(log_schema().timestamp_key())) + .get(log_schema().timestamp_key()) .unwrap() .as_timestamp() .unwrap() @@ -412,7 +412,7 @@ timestamp_format = "unix""#, format!( "{}", exp_event - .get(&Atom::from(log_schema().timestamp_key())) + .get(log_schema().timestamp_key()) .unwrap() .as_timestamp() .unwrap() diff --git a/src/sinks/console.rs b/src/sinks/console.rs index 4dc557848991f..1dffa025f6b4a 100644 --- a/src/sinks/console.rs +++ b/src/sinks/console.rs @@ -15,7 +15,7 @@ use futures::{ FutureExt, }; use serde::{Deserialize, Serialize}; -use string_cache::DefaultAtom as Atom; + use tokio::io::{self, AsyncWriteExt}; #[derive(Debug, Derivative, Deserialize, Serialize)] @@ -94,7 +94,7 @@ fn encode_event(mut event: Event, encoding: &EncodingConfig) -> Option .ok(), Encoding::Text => { let field = crate::config::log_schema().message_key(); - match log.get(&Atom::from(field)) { + match log.get(field) { Some(v) => Some(v.to_string_lossy()), None => { emit!(ConsoleFieldNotFound { diff --git a/src/sinks/datadog/logs.rs b/src/sinks/datadog/logs.rs index 831323e9973a3..bb00764b1ac35 100644 --- a/src/sinks/datadog/logs.rs +++ b/src/sinks/datadog/logs.rs @@ -10,7 +10,7 @@ use crate::{ use bytes::Bytes; use futures01::{stream::iter_ok, Sink}; use serde::{Deserialize, Serialize}; -use string_cache::DefaultAtom as Atom; + #[derive(Deserialize, Serialize, Debug)] #[serde(deny_unknown_fields)] @@ -77,15 +77,15 @@ fn encode_event( ) -> Option { let log = event.as_mut_log(); - if let Some(message) = log.remove(&Atom::from(log_schema().message_key())) { + if let Some(message) = log.remove(log_schema().message_key()) { log.insert("message", message); } - if let Some(timestamp) = log.remove(&Atom::from(log_schema().timestamp_key())) { + if let Some(timestamp) = log.remove(log_schema().timestamp_key()) { log.insert("date", timestamp); } - if let Some(host) = log.remove(&Atom::from(log_schema().host_key())) { + if let Some(host) = log.remove(log_schema().host_key()) { log.insert("host", host); } diff --git a/src/sinks/elasticsearch.rs b/src/sinks/elasticsearch.rs index 9e3596cbafaf2..833b9ea937d4e 100644 --- a/src/sinks/elasticsearch.rs +++ b/src/sinks/elasticsearch.rs @@ -498,7 +498,7 @@ mod tests { use http::{Response, StatusCode}; use pretty_assertions::assert_eq; use serde_json::json; - use string_cache::DefaultAtom as Atom; + #[test] fn generate_config() { @@ -515,7 +515,7 @@ mod tests { maybe_set_id(id_key, &mut action, &mut event); assert_eq!(json!({"_id": "bar"}), action); - assert_eq!(None, event.as_log().get(&Atom::from("foo"))); + assert_eq!(None, event.as_log().get("foo")); } #[test] @@ -562,7 +562,7 @@ mod tests { index: Some(String::from("{{ idx }}")), encoding: EncodingConfigWithDefault { codec: Encoding::Default, - except_fields: Some(vec![Atom::from("idx"), Atom::from("timestamp")]), + except_fields: Some(vec!["idx".to_string(), "timestamp".to_string()]), ..Default::default() }, endpoint: String::from("https://example.com"), @@ -600,7 +600,7 @@ mod integration_tests { use serde_json::{json, Value}; use std::fs::File; use std::io::Read; - use string_cache::DefaultAtom as Atom; + #[test] fn ensure_pipeline_in_params() { @@ -670,7 +670,7 @@ mod integration_tests { let expected = json!({ "message": "raw log line", "foo": "bar", - "timestamp": input_event.as_log()[&Atom::from(crate::config::log_schema().timestamp_key())], + "timestamp": input_event.as_log()[crate::config::log_schema().timestamp_key()], }); assert_eq!(expected, value); } diff --git a/src/sinks/file/mod.rs b/src/sinks/file/mod.rs index e23cc98012082..6799833458851 100644 --- a/src/sinks/file/mod.rs +++ b/src/sinks/file/mod.rs @@ -19,7 +19,7 @@ use futures::{ }; use serde::{Deserialize, Serialize}; use std::time::{Duration, Instant}; -use string_cache::DefaultAtom as Atom; + use tokio::{ fs::{self, File}, io::AsyncWriteExt, @@ -303,7 +303,7 @@ pub fn encode_event(encoding: &EncodingConfigWithDefault, mut event: E match encoding.codec() { Encoding::Ndjson => serde_json::to_vec(&log).expect("Unable to encode event as JSON."), Encoding::Text => log - .get(&Atom::from(log_schema().message_key())) + .get(log_schema().message_key()) .map(|v| v.to_string_lossy().into_bytes()) .unwrap_or_default(), } @@ -438,35 +438,35 @@ mod tests { ]; assert_eq!( - input[0].as_log()[&Atom::from(log_schema().message_key())], + input[0].as_log()[log_schema().message_key()], From::<&str>::from(&output[0][0]) ); assert_eq!( - input[1].as_log()[&Atom::from(log_schema().message_key())], + input[1].as_log()[log_schema().message_key()], From::<&str>::from(&output[1][0]) ); assert_eq!( - input[2].as_log()[&Atom::from(log_schema().message_key())], + input[2].as_log()[log_schema().message_key()], From::<&str>::from(&output[0][1]) ); assert_eq!( - input[3].as_log()[&Atom::from(log_schema().message_key())], + input[3].as_log()[log_schema().message_key()], From::<&str>::from(&output[3][0]) ); assert_eq!( - input[4].as_log()[&Atom::from(log_schema().message_key())], + input[4].as_log()[log_schema().message_key()], From::<&str>::from(&output[2][0]) ); assert_eq!( - input[5].as_log()[&Atom::from(log_schema().message_key())], + input[5].as_log()[log_schema().message_key()], From::<&str>::from(&output[2][1]) ); assert_eq!( - input[6].as_log()[&Atom::from(log_schema().message_key())], + input[6].as_log()[log_schema().message_key()], From::<&str>::from(&output[4][0]) ); assert_eq!( - input[7].as_log()[&Atom::from(log_schema().message_key())], + input[7].as_log()[log_schema().message_key()], From::<&str>::from(&output[5][0]) ); } diff --git a/src/sinks/gcp/cloud_storage.rs b/src/sinks/gcp/cloud_storage.rs index 40447fa27b9c6..c2d6599186787 100644 --- a/src/sinks/gcp/cloud_storage.rs +++ b/src/sinks/gcp/cloud_storage.rs @@ -31,7 +31,7 @@ use snafu::{ResultExt, Snafu}; use std::collections::HashMap; use std::convert::TryFrom; use std::task::Poll; -use string_cache::DefaultAtom as Atom; + use tower::{Service, ServiceBuilder}; use uuid::Uuid; @@ -417,7 +417,7 @@ fn encode_event( .expect("Failed to encode event as json, this is a bug!"), Encoding::Text => { let mut bytes = log - .get(&Atom::from(crate::config::log_schema().message_key())) + .get(crate::config::log_schema().message_key()) .map(|v| v.as_bytes().to_vec()) .unwrap_or_default(); bytes.push(b'\n'); diff --git a/src/sinks/gcp/stackdriver_logs.rs b/src/sinks/gcp/stackdriver_logs.rs index 693d4355a3559..9283ac09ecdd0 100644 --- a/src/sinks/gcp/stackdriver_logs.rs +++ b/src/sinks/gcp/stackdriver_logs.rs @@ -21,7 +21,7 @@ use serde::{Deserialize, Serialize}; use serde_json::{json, map}; use snafu::Snafu; use std::collections::HashMap; -use string_cache::DefaultAtom as Atom; + #[derive(Debug, Snafu)] enum HealthcheckError { @@ -59,7 +59,7 @@ pub struct StackdriverConfig { struct StackdriverSink { config: StackdriverConfig, creds: Option, - severity_key: Option, + severity_key: Option, } #[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Clone, Derivative)] @@ -126,10 +126,7 @@ impl SinkConfig for StackdriverConfig { let sink = StackdriverSink { config: self.clone(), creds, - severity_key: self - .severity_key - .as_ref() - .map(|key| Atom::from(key.as_str())), + severity_key: self.severity_key.clone(), }; let healthcheck = healthcheck(client.clone(), sink.clone()).boxed(); @@ -180,7 +177,7 @@ impl HttpSink for StackdriverSink { entry.insert("severity".into(), json!(severity)); // If the event contains a timestamp, send it in the main message so gcp can pick it up. - if let Some(timestamp) = log.get(&Atom::from(log_schema().timestamp_key())) { + if let Some(timestamp) = log.get(log_schema().timestamp_key()) { entry.insert("timestamp".into(), json!(timestamp)); } diff --git a/src/sinks/honeycomb.rs b/src/sinks/honeycomb.rs index 436d8527fcf2a..2e2819e92d37e 100644 --- a/src/sinks/honeycomb.rs +++ b/src/sinks/honeycomb.rs @@ -11,7 +11,7 @@ use futures01::Sink; use http::{Request, StatusCode, Uri}; use serde::{Deserialize, Serialize}; use serde_json::json; -use string_cache::DefaultAtom as Atom; + lazy_static::lazy_static! { static ref HOST: UriSerde = Uri::from_static("https://api.honeycomb.io/1/batch").into(); @@ -89,7 +89,7 @@ impl HttpSink for HoneycombConfig { let mut log = event.into_log(); let timestamp = if let Some(Value::Timestamp(ts)) = - log.remove(&Atom::from(log_schema().timestamp_key())) + log.remove(log_schema().timestamp_key()) { ts } else { diff --git a/src/sinks/http.rs b/src/sinks/http.rs index f8de919460478..ddcbd7bedb6c7 100644 --- a/src/sinks/http.rs +++ b/src/sinks/http.rs @@ -20,7 +20,7 @@ use indexmap::IndexMap; use lazy_static::lazy_static; use serde::{Deserialize, Serialize}; use snafu::{ResultExt, Snafu}; -use string_cache::DefaultAtom as Atom; + #[derive(Debug, Snafu)] enum BuildError { @@ -164,7 +164,7 @@ impl HttpSink for HttpSinkConfig { let body = match &self.encoding.codec() { Encoding::Text => { - if let Some(v) = event.get(&Atom::from(crate::config::log_schema().message_key())) { + if let Some(v) = event.get(crate::config::log_schema().message_key()) { let mut b = v.to_string_lossy().into_bytes(); b.push(b'\n'); b diff --git a/src/sinks/influxdb/logs.rs b/src/sinks/influxdb/logs.rs index 97d717d867a25..1ed8127c2aa1f 100644 --- a/src/sinks/influxdb/logs.rs +++ b/src/sinks/influxdb/logs.rs @@ -19,7 +19,7 @@ use http::{Request, Uri}; use lazy_static::lazy_static; use serde::{Deserialize, Serialize}; use std::collections::{BTreeMap, HashMap, HashSet}; -use string_cache::DefaultAtom as Atom; + #[derive(Deserialize, Serialize, Debug, Clone, Default)] #[serde(deny_unknown_fields)] @@ -152,7 +152,7 @@ impl HttpSink for InfluxDBLogsSink { // Timestamp let timestamp = encode_timestamp( - match event.remove(&Atom::from(log_schema().timestamp_key())) { + match event.remove(log_schema().timestamp_key()) { Some(Value::Timestamp(ts)) => Some(ts), _ => None, }, @@ -231,7 +231,7 @@ mod tests { }; use chrono::{offset::TimeZone, Utc}; use futures::{stream, StreamExt}; - use string_cache::DefaultAtom as Atom; + #[test] fn test_config_without_tags() { @@ -259,7 +259,7 @@ mod tests { "ns", vec![], ); - sink.encoding.except_fields = Some(vec![Atom::from("host")]); + sink.encoding.except_fields = Some(vec!["host".into()]); let bytes = sink.encode_event(event).unwrap(); let string = std::str::from_utf8(&bytes).unwrap(); diff --git a/src/sinks/kafka.rs b/src/sinks/kafka.rs index 886e35875a92a..cd9ceb14a200b 100644 --- a/src/sinks/kafka.rs +++ b/src/sinks/kafka.rs @@ -21,7 +21,7 @@ use snafu::{ResultExt, Snafu}; use std::collections::{HashMap, HashSet}; use std::convert::TryFrom; use std::time::Duration; -use string_cache::DefaultAtom as Atom; + type MetadataFuture = future01::Join::Error>>; @@ -37,7 +37,7 @@ enum BuildError { pub struct KafkaSinkConfig { bootstrap_servers: String, topic: String, - key_field: Option, + key_field: Option, encoding: EncodingConfigWithDefault, #[serde(default)] compression: KafkaCompression, @@ -70,7 +70,7 @@ pub enum Encoding { pub struct KafkaSink { producer: FutureProducer, topic: Template, - key_field: Option, + key_field: Option, encoding: EncodingConfig, in_flight: FuturesUnordered, usize>>, @@ -159,7 +159,7 @@ impl Sink for KafkaSink { let mut record = FutureRecord::to(&topic).key(&key).payload(&body[..]); if let Some(Value::Timestamp(timestamp)) = - item.as_log().get(&Atom::from(log_schema().timestamp_key())) + item.as_log().get(log_schema().timestamp_key()) { record = record.timestamp(timestamp.timestamp_millis()); } @@ -259,7 +259,7 @@ async fn healthcheck(config: KafkaSinkConfig) -> crate::Result<()> { fn encode_event( mut event: Event, - key_field: &Option, + key_field: &Option, encoding: &EncodingConfig, ) -> (Vec, Vec) { let key = key_field @@ -274,7 +274,7 @@ fn encode_event( Encoding::Json => serde_json::to_vec(&event.as_log()).unwrap(), Encoding::Text => event .as_log() - .get(&Atom::from(log_schema().message_key())) + .get(log_schema().message_key()) .map(|v| v.as_bytes().to_vec()) .unwrap_or_default(), }; @@ -333,7 +333,7 @@ mod tests { &Some("key".into()), &EncodingConfigWithDefault { codec: Encoding::Json, - except_fields: Some(vec![Atom::from("key")]), + except_fields: Some(vec!["key".into()]), ..Default::default() } .into(), diff --git a/src/sinks/logdna.rs b/src/sinks/logdna.rs index b349c1095ca1b..86d9a36d27257 100644 --- a/src/sinks/logdna.rs +++ b/src/sinks/logdna.rs @@ -13,7 +13,7 @@ use http::{Request, StatusCode, Uri}; use serde::{Deserialize, Serialize}; use serde_json::json; use std::time::SystemTime; -use string_cache::DefaultAtom as Atom; + lazy_static::lazy_static! { static ref HOST: UriSerde = Uri::from_static("https://logs.logdna.com").into(); @@ -113,10 +113,10 @@ impl HttpSink for LogdnaConfig { let mut log = event.into_log(); let line = log - .remove(&Atom::from(crate::config::log_schema().message_key())) + .remove(crate::config::log_schema().message_key()) .unwrap_or_else(|| String::from("").into()); let timestamp = log - .remove(&Atom::from(crate::config::log_schema().timestamp_key())) + .remove(crate::config::log_schema().timestamp_key()) .unwrap_or_else(|| chrono::Utc::now().into()); let mut map = serde_json::map::Map::new(); diff --git a/src/sinks/loki.rs b/src/sinks/loki.rs index 9bf3080e99b7b..a269d663fec40 100644 --- a/src/sinks/loki.rs +++ b/src/sinks/loki.rs @@ -29,7 +29,7 @@ use futures::FutureExt; use futures01::Sink; use serde::{Deserialize, Serialize}; use std::collections::HashMap; -use string_cache::DefaultAtom as Atom; + #[derive(Clone, Debug, Deserialize, Serialize)] #[serde(deny_unknown_fields)] @@ -143,7 +143,7 @@ impl HttpSink for LokiConfig { let timestamp = match event .as_log() - .get(&Atom::from(log_schema().timestamp_key())) + .get(log_schema().timestamp_key()) { Some(event::Value::Timestamp(ts)) => ts.timestamp_nanos(), _ => chrono::Utc::now().timestamp_nanos(), @@ -152,7 +152,7 @@ impl HttpSink for LokiConfig { if self.remove_timestamp { event .as_mut_log() - .remove(&Atom::from(log_schema().timestamp_key())); + .remove(log_schema().timestamp_key()); } self.encoding.apply_rules(&mut event); @@ -162,7 +162,7 @@ impl HttpSink for LokiConfig { Encoding::Text => event .as_log() - .get(&Atom::from(log_schema().message_key())) + .get(log_schema().message_key()) .map(Value::to_string_lossy) .unwrap_or_default(), }; diff --git a/src/sinks/papertrail.rs b/src/sinks/papertrail.rs index e546e4d2c8c05..f72fcf535f943 100644 --- a/src/sinks/papertrail.rs +++ b/src/sinks/papertrail.rs @@ -11,7 +11,7 @@ use crate::{ use bytes::Bytes; use futures01::{stream::iter_ok, Sink}; use serde::{Deserialize, Serialize}; -use string_cache::DefaultAtom as Atom; + use syslog::{Facility, Formatter3164, LogFormat, Severity}; #[derive(Deserialize, Serialize, Debug)] @@ -78,7 +78,7 @@ impl SinkConfig for PapertrailConfig { fn encode_event(mut event: Event, pid: u32, encoding: &EncodingConfig) -> Option { let host = if let Some(host) = event .as_mut_log() - .remove(&Atom::from(log_schema().host_key())) + .remove(log_schema().host_key()) { Some(host.to_string_lossy()) } else { @@ -100,7 +100,7 @@ fn encode_event(mut event: Event, pid: u32, encoding: &EncodingConfig) let message = match encoding.codec() { Encoding::Json => serde_json::to_string(&log).unwrap(), Encoding::Text => log - .get(&Atom::from(log_schema().message_key())) + .get(log_schema().message_key()) .map(|v| v.to_string_lossy()) .unwrap_or_default(), }; @@ -117,7 +117,7 @@ fn encode_event(mut event: Event, pid: u32, encoding: &EncodingConfig) #[cfg(test)] mod tests { use super::*; - use string_cache::DefaultAtom as Atom; + #[test] fn encode_event_apply_rules() { @@ -130,7 +130,7 @@ mod tests { &EncodingConfig { codec: Encoding::Json, only_fields: None, - except_fields: Some(vec![Atom::from("magic")]), + except_fields: Some(vec!["magic".into()]), timestamp_format: None, }, ) diff --git a/src/sinks/pulsar.rs b/src/sinks/pulsar.rs index 665597a95ead8..c783e4324070b 100644 --- a/src/sinks/pulsar.rs +++ b/src/sinks/pulsar.rs @@ -15,7 +15,7 @@ use pulsar::{ use serde::{Deserialize, Serialize}; use snafu::{ResultExt, Snafu}; use std::{collections::HashSet, sync::Arc}; -use string_cache::DefaultAtom as Atom; + type MetadataFuture = future::Join::Error>>; @@ -196,7 +196,7 @@ fn encode_event(mut item: Event, encoding: &EncodingConfig) -> crate:: Ok(match encoding.codec() { Encoding::Json => serde_json::to_vec(&log)?, Encoding::Text => log - .get(&Atom::from(log_schema().message_key())) + .get(log_schema().message_key()) .map(|v| v.as_bytes().to_vec()) .unwrap_or_default(), }) @@ -206,7 +206,7 @@ fn encode_event(mut item: Event, encoding: &EncodingConfig) -> crate:: mod tests { use super::*; use std::collections::HashMap; - use string_cache::DefaultAtom as Atom; + #[test] fn pulsar_event_json() { @@ -238,7 +238,7 @@ mod tests { evt, &EncodingConfigWithDefault { codec: Encoding::Json, - except_fields: Some(vec![Atom::from("key")]), + except_fields: Some(vec!["key".into()]), ..Default::default() } .into(), diff --git a/src/sinks/sematext_logs.rs b/src/sinks/sematext_logs.rs index 1f0485dae99a1..0ca3758cc0dd4 100644 --- a/src/sinks/sematext_logs.rs +++ b/src/sinks/sematext_logs.rs @@ -8,7 +8,7 @@ use crate::{ }; use futures01::{Future, Sink}; use serde::{Deserialize, Serialize}; -use string_cache::DefaultAtom as Atom; + #[derive(Clone, Debug, Deserialize, Serialize)] pub struct SematextLogsConfig { @@ -92,11 +92,11 @@ impl SinkConfig for SematextLogsConfig { fn map_timestamp(mut event: Event) -> impl Future { let log = event.as_mut_log(); - if let Some(ts) = log.remove(&Atom::from(crate::config::log_schema().timestamp_key())) { + if let Some(ts) = log.remove(crate::config::log_schema().timestamp_key()) { log.insert("@timestamp", ts); } - if let Some(host) = log.remove(&Atom::from(crate::config::log_schema().host_key())) { + if let Some(host) = log.remove(crate::config::log_schema().host_key()) { log.insert("os.host", host); } diff --git a/src/sinks/splunk_hec.rs b/src/sinks/splunk_hec.rs index 77474f598e2a8..1a8c1882bfa2a 100644 --- a/src/sinks/splunk_hec.rs +++ b/src/sinks/splunk_hec.rs @@ -22,7 +22,7 @@ use serde::{Deserialize, Serialize}; use serde_json::json; use snafu::{ResultExt, Snafu}; use std::convert::TryFrom; -use string_cache::DefaultAtom as Atom; + #[derive(Debug, Snafu)] pub enum BuildError { @@ -40,7 +40,7 @@ pub struct HecSinkConfig { #[serde(default = "default_host_key")] pub host_key: String, #[serde(default)] - pub indexed_fields: Vec, + pub indexed_fields: Vec, pub index: Option, pub sourcetype: Option