Skip to content

Commit

Permalink
chore: string paths should not be used outside test code
Browse files Browse the repository at this point in the history
  • Loading branch information
pront committed Jul 31, 2023
1 parent 8a2f8f6 commit 292113f
Show file tree
Hide file tree
Showing 21 changed files with 115 additions and 120 deletions.
15 changes: 8 additions & 7 deletions lib/codecs/src/decoding/format/gelf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use vrl::value::kind::Collection;
use vrl::value::{Kind, Value};

use super::{default_lossy, Deserializer};
use crate::gelf::GELF_TARGET_PATHS;
use crate::{gelf_fields::*, VALID_FIELD_REGEX};

/// On GELF decoding behavior:
Expand Down Expand Up @@ -123,11 +124,11 @@ impl GelfDeserializer {
.into());
}

log.insert(VERSION, parsed.version.to_string());
log.insert(HOST, parsed.host.to_string());
log.insert(&GELF_TARGET_PATHS.version, parsed.version.to_string());
log.insert(&GELF_TARGET_PATHS.host, parsed.host.to_string());

if let Some(full_message) = &parsed.full_message {
log.insert(FULL_MESSAGE, full_message.to_string());
log.insert(&GELF_TARGET_PATHS.full_message, full_message.to_string());
}

if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
Expand All @@ -145,19 +146,19 @@ impl GelfDeserializer {
}

if let Some(level) = parsed.level {
log.insert(LEVEL, level);
log.insert(&GELF_TARGET_PATHS.level, level);
}
if let Some(facility) = &parsed.facility {
log.insert(FACILITY, facility.to_string());
log.insert(&GELF_TARGET_PATHS.facility, facility.to_string());
}
if let Some(line) = parsed.line {
log.insert(
LINE,
&GELF_TARGET_PATHS.line,
Value::Float(ordered_float::NotNan::new(line).expect("JSON doesn't allow NaNs")),
);
}
if let Some(file) = &parsed.file {
log.insert(FILE, file.to_string());
log.insert(&GELF_TARGET_PATHS.file, file.to_string());
}

if let Some(add) = &parsed.additional_fields {
Expand Down
18 changes: 8 additions & 10 deletions lib/codecs/src/encoding/format/gelf.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::gelf::GELF_TARGET_PATHS;
use crate::{gelf_fields::*, VALID_FIELD_REGEX};
use bytes::{BufMut, BytesMut};
use lookup::event_path;
Expand All @@ -12,7 +13,6 @@ use vector_core::{
event::Value,
schema,
};
use vrl::path::PathPrefix;

/// On GELF encoding behavior:
/// Graylog has a relaxed parsing. They are much more lenient than the spec would
Expand Down Expand Up @@ -131,20 +131,18 @@ fn coerce_required_fields(mut log: LogEvent) -> vector_common::Result<LogEvent>
}

// add the VERSION if it does not exist
if !log.contains(VERSION) {
log.insert(VERSION, GELF_VERSION);
if !log.contains(&GELF_TARGET_PATHS.version) {
log.insert(&GELF_TARGET_PATHS.version, GELF_VERSION);
}

if !log.contains(HOST) {
if !log.contains(&GELF_TARGET_PATHS.host) {
err_missing_field(HOST)?;
}

if !log.contains(SHORT_MESSAGE) {
if let Some(message_key) = log_schema().message_key() {
// rename the log_schema().message_key() to SHORT_MESSAGE
let target_path = (PathPrefix::Event, message_key);
if log.contains(target_path) {
log.rename_key(target_path, SHORT_MESSAGE);
if !log.contains(&GELF_TARGET_PATHS.short_message) {
if let Some(message_key) = log_schema().message_key_target_path() {
if log.contains(message_key) {
log.rename_key(message_key, &GELF_TARGET_PATHS.short_message);
} else {
err_missing_field(SHORT_MESSAGE)?;
}
Expand Down
27 changes: 26 additions & 1 deletion lib/codecs/src/gelf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@

use once_cell::sync::Lazy;
use regex::Regex;
use vrl::owned_value_path;
use vrl::path::OwnedTargetPath;

/// GELF Message fields. Definitions from <https://docs.graylog.org/docs/gelf>.
pub mod gelf_fields {

/// (not a field) The latest version of the GELF specification.
pub const GELF_VERSION: &str = "1.1";

Expand Down Expand Up @@ -40,6 +41,30 @@ pub mod gelf_fields {
// < Every field with an underscore (_) prefix will be treated as an additional field. >
}

/// GELF event paths.
pub(crate) struct GelfTargetPaths {
pub version: OwnedTargetPath,
pub host: OwnedTargetPath,
pub full_message: OwnedTargetPath,
pub level: OwnedTargetPath,
pub facility: OwnedTargetPath,
pub line: OwnedTargetPath,
pub file: OwnedTargetPath,
pub short_message: OwnedTargetPath,
}

/// GELF target paths.
pub(crate) static GELF_TARGET_PATHS: Lazy<GelfTargetPaths> = Lazy::new(|| GelfTargetPaths {
version: OwnedTargetPath::event(owned_value_path!(gelf_fields::VERSION)),
host: OwnedTargetPath::event(owned_value_path!(gelf_fields::HOST)),
full_message: OwnedTargetPath::event(owned_value_path!(gelf_fields::FULL_MESSAGE)),
level: OwnedTargetPath::event(owned_value_path!(gelf_fields::LEVEL)),
facility: OwnedTargetPath::event(owned_value_path!(gelf_fields::FACILITY)),
line: OwnedTargetPath::event(owned_value_path!(gelf_fields::LINE)),
file: OwnedTargetPath::event(owned_value_path!(gelf_fields::FILE)),
short_message: OwnedTargetPath::event(owned_value_path!(gelf_fields::SHORT_MESSAGE)),
});

/// Regex for matching valid field names. Must contain only word chars, periods and dashes.
/// Additional field names must also be prefixed with an `_` , however that is intentionally
/// omitted from this regex to be checked separately to create a specific error message.
Expand Down
24 changes: 10 additions & 14 deletions lib/vector-core/benches/event/log_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,22 @@ use criterion::{
use lookup::event_path;
use vector_core::event::LogEvent;

fn default_log_event() -> LogEvent {
let mut log_event = LogEvent::default();
log_event.insert(event_path!("one"), 1);
log_event.insert(event_path!("two"), 2);
log_event.insert(event_path!("three"), 3);
log_event
}

fn rename_key_flat(c: &mut Criterion) {
let mut group: BenchmarkGroup<WallTime> =
c.benchmark_group("vector_core::event::log_event::LogEvent::rename_key_flat");
group.sampling_mode(SamplingMode::Auto);

group.bench_function("rename_flat_key (key is present)", move |b| {
b.iter_batched(
|| {
let mut log_event = LogEvent::default();
log_event.insert("one", 1);
log_event.insert("two", 2);
log_event.insert("three", 3);
log_event
},
default_log_event,
|mut log_event| {
log_event.rename_key(event_path!("one"), event_path!("1"));
},
Expand All @@ -29,13 +31,7 @@ fn rename_key_flat(c: &mut Criterion) {

group.bench_function("rename_flat_key (key is NOT present)", move |b| {
b.iter_batched(
|| {
let mut log_event = LogEvent::default();
log_event.insert("one", 1);
log_event.insert("two", 2);
log_event.insert("three", 3);
log_event
},
default_log_event,
|mut log_event| {
log_event.rename_key(event_path!("four"), event_path!("4"));
},
Expand Down
41 changes: 21 additions & 20 deletions lib/vector-core/src/event/log_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::config::{log_schema, telemetry};
use crate::{event::MaybeAsLogMut, ByteSizeOf};
use lookup::{metadata_path, path};
use once_cell::sync::Lazy;
use vrl::owned_value_path;
use vrl::{event_path, owned_value_path};

static VECTOR_SOURCE_TYPE_PATH: Lazy<Option<OwnedTargetPath>> = Lazy::new(|| {
Some(OwnedTargetPath::metadata(owned_value_path!(
Expand Down Expand Up @@ -162,8 +162,8 @@ impl LogEvent {
let mut log = LogEvent::default();
log.maybe_insert(log_schema().message_key_target_path(), msg.into());

if let Some(timestamp_key) = log_schema().timestamp_key() {
log.insert((PathPrefix::Event, timestamp_key), Utc::now());
if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
log.insert(timestamp_key, Utc::now());
}

log
Expand Down Expand Up @@ -436,12 +436,13 @@ impl LogEvent {
/// Merge all fields specified at `fields` from `incoming` to `current`.
pub fn merge(&mut self, mut incoming: LogEvent, fields: &[impl AsRef<str>]) {
for field in fields {
let Some(incoming_val) = incoming.remove(field.as_ref()) else {
let field_path = event_path!(field.as_ref());
let Some(incoming_val) = incoming.remove(field_path) else {
continue
};
match self.get_mut(field.as_ref()) {
match self.get_mut(field_path) {
None => {
self.insert(field.as_ref(), incoming_val);
self.insert(field_path, incoming_val);
}
Some(current_val) => current_val.merge(incoming_val),
}
Expand Down Expand Up @@ -568,8 +569,8 @@ mod test_utils {
fn from(message: Bytes) -> Self {
let mut log = LogEvent::default();
log.maybe_insert(log_schema().message_key_target_path(), message);
if let Some(timestamp_key) = log_schema().timestamp_key() {
log.insert((PathPrefix::Event, timestamp_key), Utc::now());
if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
log.insert(timestamp_key, Utc::now());
}
log
}
Expand Down Expand Up @@ -684,11 +685,11 @@ impl From<&tracing::Event<'_>> for LogEvent {
event.record(&mut maker);

let mut log = maker;
log.insert("timestamp", now);
log.insert(event_path!("timestamp"), now);

let meta = event.metadata();
log.insert(
"metadata.kind",
event_path!("metadata", "kind"),
if meta.is_event() {
Value::Bytes("event".to_string().into())
} else if meta.is_span() {
Expand All @@ -697,42 +698,42 @@ impl From<&tracing::Event<'_>> for LogEvent {
Value::Null
},
);
log.insert("metadata.level", meta.level().to_string());
log.insert(event_path!("metadata", "level"), meta.level().to_string());
log.insert(
"metadata.module_path",
event_path!("metadata", "module_path"),
meta.module_path()
.map_or(Value::Null, |mp| Value::Bytes(mp.to_string().into())),
);
log.insert("metadata.target", meta.target().to_string());
log.insert(event_path!("metadata", "target"), meta.target().to_string());

log
}
}

impl tracing::field::Visit for LogEvent {
fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
self.insert(field.name(), value.to_string());
self.insert(event_path!(field.name()), value.to_string());
}

fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn Debug) {
self.insert(field.name(), format!("{value:?}"));
self.insert(event_path!(field.name()), format!("{value:?}"));
}

fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
self.insert(field.name(), value);
self.insert(event_path!(field.name()), value);
}

fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
let field = field.name();
let field_path = event_path!(field.name());
let converted: Result<i64, _> = value.try_into();
match converted {
Ok(value) => self.insert(field, value),
Err(_) => self.insert(field, value.to_string()),
Ok(value) => self.insert(field_path, value),
Err(_) => self.insert(field_path, value.to_string()),
};
}

fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
self.insert(field.name(), value);
self.insert(event_path!(field.name()), value);
}
}

Expand Down
7 changes: 3 additions & 4 deletions lib/vector-core/src/event/test/size_of.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,12 @@ fn log_operation_maintains_size() {
match action {
Action::InsertFlat { key, value } => {
let new_value_sz = value.size_of();
let old_value_sz = log_event
.get((PathPrefix::Event, path!(key.as_str())))
.map_or(0, ByteSizeOf::size_of);
let target_path = (PathPrefix::Event, path!(key.as_str()));
let old_value_sz = log_event.get(target_path).map_or(0, ByteSizeOf::size_of);
if !log_event.contains(key.as_str()) {
current_size += key.size_of();
}
log_event.insert((PathPrefix::Event, path!(&key)), value);
log_event.insert(target_path, value);
current_size -= old_value_sz;
current_size += new_value_sz;
}
Expand Down
4 changes: 2 additions & 2 deletions lib/vector-core/src/event/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ impl TraceEvent {
self.0.get_mut(key.as_ref())
}

pub fn contains(&self, key: impl AsRef<str>) -> bool {
self.0.contains(key.as_ref())
pub fn contains<'a>(&self, key: impl TargetPath<'a>) -> bool {
self.0.contains(key)
}

pub fn insert<'a>(
Expand Down
2 changes: 1 addition & 1 deletion src/codecs/encoding/transformer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ impl Transformer {
}
}
for (k, v) in unix_timestamps {
log.insert(k.as_str(), v);
log.insert(event_path!(k.as_str()), v);
}
} else {
// root is not an object
Expand Down
3 changes: 2 additions & 1 deletion src/config/unit_test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use tokio::sync::{
Mutex,
};
use uuid::Uuid;
use vrl::event_path;

pub use self::unit_test_components::{
UnitTestSinkCheck, UnitTestSinkConfig, UnitTestSinkResult, UnitTestSourceConfig,
Expand Down Expand Up @@ -572,7 +573,7 @@ fn build_input_event(input: &TestInput) -> Result<Event, String> {
NotNan::new(*f).map_err(|_| "NaN value not supported".to_string())?,
),
};
event.insert(path.as_str(), value);
event.insert(event_path!(path.as_str()), value);
}
Ok(event.into())
} else {
Expand Down
5 changes: 1 addition & 4 deletions src/sinks/aws_cloudwatch_logs/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,10 +188,7 @@ async fn cloudwatch_insert_out_of_range_timestamp() {
let line = input_lines.next().unwrap();
let mut event = LogEvent::from(line.clone());
event.insert(
(
lookup::PathPrefix::Event,
log_schema().timestamp_key().unwrap(),
),
log_schema().timestamp_key_target_path().unwrap(),
now + offset,
);
events.push(Event::Log(event));
Expand Down
8 changes: 1 addition & 7 deletions src/sinks/aws_cloudwatch_logs/request_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,13 +154,7 @@ mod tests {
let timestamp = Utc::now();
let message = "event message";
let mut event = LogEvent::from(message);
event.insert(
(
lookup::PathPrefix::Event,
log_schema().timestamp_key().unwrap(),
),
timestamp,
);
event.insert(log_schema().timestamp_key_target_path().unwrap(), timestamp);

let request = request_builder.build(event.into()).unwrap();
assert_eq!(request.timestamp, timestamp.timestamp_millis());
Expand Down
8 changes: 5 additions & 3 deletions src/sinks/azure_monitor_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,11 +488,13 @@ mod tests {
fn insert_timestamp_kv(log: &mut LogEvent) -> (String, String) {
let now = chrono::Utc::now();

let timestamp_key = log_schema().timestamp_key().unwrap();
let timestamp_value = now.to_rfc3339_opts(chrono::SecondsFormat::Millis, true);
log.insert((PathPrefix::Event, timestamp_key), now);
log.insert(log_schema().timestamp_key_target_path().unwrap(), now);

(timestamp_key.to_string(), timestamp_value)
(
log_schema().timestamp_key().unwrap().to_string(),
timestamp_value,
)
}

#[test]
Expand Down
Loading

0 comments on commit 292113f

Please sign in to comment.