Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: replace various string paths with actual paths #18109

Merged
merged 8 commits into from
Aug 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 29 additions & 16 deletions benches/event.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use bytes::Bytes;
use criterion::{criterion_group, BatchSize, Criterion};
use vector::event::LogEvent;
use vrl::event_path;

fn benchmark_event_iterate(c: &mut Criterion) {
let mut group = c.benchmark_group("event/iterate");
Expand All @@ -9,9 +10,9 @@ fn benchmark_event_iterate(c: &mut Criterion) {
b.iter_batched_ref(
|| {
let mut log = LogEvent::default();
log.insert("key1", Bytes::from("value1"));
log.insert("key2", Bytes::from("value2"));
log.insert("key3", Bytes::from("value3"));
log.insert(event_path!("key1"), Bytes::from("value1"));
log.insert(event_path!("key2"), Bytes::from("value2"));
log.insert(event_path!("key3"), Bytes::from("value3"));
log
},
|e| e.all_fields().unwrap().count(),
Expand All @@ -23,9 +24,15 @@ fn benchmark_event_iterate(c: &mut Criterion) {
b.iter_batched_ref(
|| {
let mut log = LogEvent::default();
log.insert("key1.nested1.nested2", Bytes::from("value1"));
log.insert("key1.nested1.nested3", Bytes::from("value4"));
log.insert("key3", Bytes::from("value3"));
log.insert(
event_path!("key1", "nested1", "nested2"),
Bytes::from("value1"),
);
log.insert(
event_path!("key1", "nested1", "nested3"),
Bytes::from("value4"),
);
log.insert(event_path!("key3"), Bytes::from("value3"));
log
},
|e| e.all_fields().unwrap().count(),
Expand All @@ -37,8 +44,8 @@ fn benchmark_event_iterate(c: &mut Criterion) {
b.iter_batched_ref(
|| {
let mut log = LogEvent::default();
log.insert("key1.nested1[0]", Bytes::from("value1"));
log.insert("key1.nested1[1]", Bytes::from("value2"));
log.insert(event_path!("key1", "nested1", 0), Bytes::from("value1"));
log.insert(event_path!("key1", "nested1", 1), Bytes::from("value2"));
log
},
|e| e.all_fields().unwrap().count(),
Expand All @@ -53,25 +60,31 @@ fn benchmark_event_create(c: &mut Criterion) {
group.bench_function("single-level", |b| {
b.iter(|| {
let mut log = LogEvent::default();
log.insert("key1", Bytes::from("value1"));
log.insert("key2", Bytes::from("value2"));
log.insert("key3", Bytes::from("value3"));
log.insert(event_path!("key1"), Bytes::from("value1"));
log.insert(event_path!("key2"), Bytes::from("value2"));
log.insert(event_path!("key3"), Bytes::from("value3"));
})
});

group.bench_function("nested-keys", |b| {
b.iter(|| {
let mut log = LogEvent::default();
log.insert("key1.nested1.nested2", Bytes::from("value1"));
log.insert("key1.nested1.nested3", Bytes::from("value4"));
log.insert("key3", Bytes::from("value3"));
log.insert(
event_path!("key1", "nested1", "nested2"),
Bytes::from("value1"),
);
log.insert(
event_path!("key1", "nested1", "nested3"),
Bytes::from("value4"),
);
log.insert(event_path!("key3"), Bytes::from("value3"));
})
});
group.bench_function("array", |b| {
b.iter(|| {
let mut log = LogEvent::default();
log.insert("key1.nested1[0]", Bytes::from("value1"));
log.insert("key1.nested1[1]", Bytes::from("value2"));
log.insert(event_path!("key1", "nested1", 0), Bytes::from("value1"));
log.insert(event_path!("key1", "nested1", 1), Bytes::from("value2"));
})
});
}
Expand Down
3 changes: 2 additions & 1 deletion benches/lua.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use vector::{
test_util::collect_ready,
transforms::{self, OutputBuffer, Transform},
};
use vrl::event_path;

fn bench_add_fields(c: &mut Criterion) {
let event = Event::from(LogEvent::default());
Expand Down Expand Up @@ -87,7 +88,7 @@ fn bench_field_filter(c: &mut Criterion) {
let events = (0..num_events)
.map(|i| {
let mut event = LogEvent::default();
event.insert("the_field", (i % 10).to_string());
event.insert(event_path!("the_field"), (i % 10).to_string());
Event::from(event)
})
.collect::<Vec<_>>();
Expand Down
38 changes: 28 additions & 10 deletions benches/remap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use vector::{
},
};
use vector_common::TimeZone;
use vrl::event_path;
use vrl::prelude::*;

criterion_group!(
Expand All @@ -35,9 +36,18 @@ fn benchmark_remap(c: &mut Criterion) {
let result = outputs.take_primary();
let output_1 = result.first().unwrap().as_log();

debug_assert_eq!(output_1.get("foo").unwrap().to_string_lossy(), "bar");
debug_assert_eq!(output_1.get("bar").unwrap().to_string_lossy(), "baz");
debug_assert_eq!(output_1.get("copy").unwrap().to_string_lossy(), "buz");
debug_assert_eq!(
output_1.get(event_path!("foo")).unwrap().to_string_lossy(),
"bar"
);
debug_assert_eq!(
output_1.get(event_path!("bar")).unwrap().to_string_lossy(),
"baz"
);
debug_assert_eq!(
output_1.get(event_path!("copy")).unwrap().to_string_lossy(),
"buz"
);

result
};
Expand Down Expand Up @@ -67,7 +77,9 @@ fn benchmark_remap(c: &mut Criterion) {

let event = {
let mut event = Event::Log(LogEvent::from("augment me"));
event.as_mut_log().insert("copy_from", "buz".to_owned());
event
.as_mut_log()
.insert(event_path!("copy_from"), "buz".to_owned());
event
};

Expand All @@ -88,11 +100,11 @@ fn benchmark_remap(c: &mut Criterion) {
let output_1 = result.first().unwrap().as_log();

debug_assert_eq!(
output_1.get("foo").unwrap().to_string_lossy(),
output_1.get(event_path!("foo")).unwrap().to_string_lossy(),
r#"{"key": "value"}"#
);
debug_assert_eq!(
output_1.get("bar").unwrap().to_string_lossy(),
output_1.get(event_path!("bar")).unwrap().to_string_lossy(),
r#"{"key":"value"}"#
);

Expand Down Expand Up @@ -141,10 +153,16 @@ fn benchmark_remap(c: &mut Criterion) {
let result = outputs.take_primary();
let output_1 = result.first().unwrap().as_log();

debug_assert_eq!(output_1.get("number").unwrap(), &Value::Integer(1234));
debug_assert_eq!(output_1.get("bool").unwrap(), &Value::Boolean(true));
debug_assert_eq!(
output_1.get("timestamp").unwrap(),
output_1.get(event_path!("number")).unwrap(),
&Value::Integer(1234)
);
debug_assert_eq!(
output_1.get(event_path!("bool")).unwrap(),
&Value::Boolean(true)
);
debug_assert_eq!(
output_1.get(event_path!("timestamp")).unwrap(),
&Value::Timestamp(timestamp),
);

Expand Down Expand Up @@ -176,7 +194,7 @@ fn benchmark_remap(c: &mut Criterion) {
("bool", "yes"),
("timestamp", "19/06/2019:17:20:49 -0400"),
] {
event.as_mut_log().insert(key, value.to_owned());
event.as_mut_log().insert(event_path!(key), value.to_owned());
}

let timestamp =
Expand Down
15 changes: 8 additions & 7 deletions lib/codecs/src/decoding/format/gelf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use vrl::value::kind::Collection;
use vrl::value::{Kind, Value};

use super::{default_lossy, Deserializer};
use crate::gelf::GELF_TARGET_PATHS;
use crate::{gelf_fields::*, VALID_FIELD_REGEX};

/// On GELF decoding behavior:
Expand Down Expand Up @@ -123,11 +124,11 @@ impl GelfDeserializer {
.into());
}

log.insert(VERSION, parsed.version.to_string());
log.insert(HOST, parsed.host.to_string());
log.insert(&GELF_TARGET_PATHS.version, parsed.version.to_string());
log.insert(&GELF_TARGET_PATHS.host, parsed.host.to_string());

if let Some(full_message) = &parsed.full_message {
log.insert(FULL_MESSAGE, full_message.to_string());
log.insert(&GELF_TARGET_PATHS.full_message, full_message.to_string());
}

if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
Expand All @@ -145,19 +146,19 @@ impl GelfDeserializer {
}

if let Some(level) = parsed.level {
log.insert(LEVEL, level);
log.insert(&GELF_TARGET_PATHS.level, level);
}
if let Some(facility) = &parsed.facility {
log.insert(FACILITY, facility.to_string());
log.insert(&GELF_TARGET_PATHS.facility, facility.to_string());
}
if let Some(line) = parsed.line {
log.insert(
LINE,
&GELF_TARGET_PATHS.line,
Value::Float(ordered_float::NotNan::new(line).expect("JSON doesn't allow NaNs")),
);
}
if let Some(file) = &parsed.file {
log.insert(FILE, file.to_string());
log.insert(&GELF_TARGET_PATHS.file, file.to_string());
}

if let Some(add) = &parsed.additional_fields {
Expand Down
18 changes: 8 additions & 10 deletions lib/codecs/src/encoding/format/gelf.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::gelf::GELF_TARGET_PATHS;
use crate::{gelf_fields::*, VALID_FIELD_REGEX};
use bytes::{BufMut, BytesMut};
use lookup::event_path;
Expand All @@ -12,7 +13,6 @@ use vector_core::{
event::Value,
schema,
};
use vrl::path::PathPrefix;

/// On GELF encoding behavior:
/// Graylog has a relaxed parsing. They are much more lenient than the spec would
Expand Down Expand Up @@ -131,20 +131,18 @@ fn coerce_required_fields(mut log: LogEvent) -> vector_common::Result<LogEvent>
}

// add the VERSION if it does not exist
if !log.contains(VERSION) {
log.insert(VERSION, GELF_VERSION);
if !log.contains(&GELF_TARGET_PATHS.version) {
log.insert(&GELF_TARGET_PATHS.version, GELF_VERSION);
}

if !log.contains(HOST) {
if !log.contains(&GELF_TARGET_PATHS.host) {
err_missing_field(HOST)?;
}

if !log.contains(SHORT_MESSAGE) {
if let Some(message_key) = log_schema().message_key() {
// rename the log_schema().message_key() to SHORT_MESSAGE
let target_path = (PathPrefix::Event, message_key);
if log.contains(target_path) {
log.rename_key(target_path, SHORT_MESSAGE);
if !log.contains(&GELF_TARGET_PATHS.short_message) {
if let Some(message_key) = log_schema().message_key_target_path() {
if log.contains(message_key) {
log.rename_key(message_key, &GELF_TARGET_PATHS.short_message);
} else {
err_missing_field(SHORT_MESSAGE)?;
}
Expand Down
27 changes: 26 additions & 1 deletion lib/codecs/src/gelf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@

use once_cell::sync::Lazy;
use regex::Regex;
use vrl::owned_value_path;
use vrl::path::OwnedTargetPath;

/// GELF Message fields. Definitions from <https://docs.graylog.org/docs/gelf>.
pub mod gelf_fields {

/// (not a field) The latest version of the GELF specification.
pub const GELF_VERSION: &str = "1.1";

Expand Down Expand Up @@ -40,6 +41,30 @@ pub mod gelf_fields {
// < Every field with an underscore (_) prefix will be treated as an additional field. >
}

/// GELF owned target paths.
///
/// Groups one [`OwnedTargetPath`] per GELF field so that the GELF encoder and
/// decoder can address those fields with a real path value instead of a raw
/// field-name string. The values are populated by `GELF_TARGET_PATHS`.
pub(crate) struct GelfTargetPaths {
/// Path built from `gelf_fields::VERSION`.
pub version: OwnedTargetPath,
/// Path built from `gelf_fields::HOST`.
pub host: OwnedTargetPath,
/// Path built from `gelf_fields::FULL_MESSAGE`.
pub full_message: OwnedTargetPath,
/// Path built from `gelf_fields::LEVEL`.
pub level: OwnedTargetPath,
/// Path built from `gelf_fields::FACILITY`.
pub facility: OwnedTargetPath,
/// Path built from `gelf_fields::LINE`.
pub line: OwnedTargetPath,
/// Path built from `gelf_fields::FILE`.
pub file: OwnedTargetPath,
/// Path built from `gelf_fields::SHORT_MESSAGE`.
pub short_message: OwnedTargetPath,
}

/// Lazily initialized singleton holding the GELF target paths.
///
/// Each entry is an event-rooted [`OwnedTargetPath`] built from the matching
/// field-name constant in [`gelf_fields`], so callers can borrow a ready-made
/// path (e.g. `&GELF_TARGET_PATHS.host`) instead of passing the raw field-name
/// string.
pub(crate) static GELF_TARGET_PATHS: Lazy<GelfTargetPaths> = Lazy::new(|| GelfTargetPaths {
    version: OwnedTargetPath::event(owned_value_path!(gelf_fields::VERSION)),
    host: OwnedTargetPath::event(owned_value_path!(gelf_fields::HOST)),
    full_message: OwnedTargetPath::event(owned_value_path!(gelf_fields::FULL_MESSAGE)),
    level: OwnedTargetPath::event(owned_value_path!(gelf_fields::LEVEL)),
    facility: OwnedTargetPath::event(owned_value_path!(gelf_fields::FACILITY)),
    line: OwnedTargetPath::event(owned_value_path!(gelf_fields::LINE)),
    file: OwnedTargetPath::event(owned_value_path!(gelf_fields::FILE)),
    short_message: OwnedTargetPath::event(owned_value_path!(gelf_fields::SHORT_MESSAGE)),
});

/// Regex for matching valid field names. Must contain only word chars, periods and dashes.
/// Additional field names must also be prefixed with an `_` , however that is intentionally
/// omitted from this regex to be checked separately to create a specific error message.
Expand Down
24 changes: 10 additions & 14 deletions lib/vector-core/benches/event/log_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,22 @@ use criterion::{
use lookup::event_path;
use vector_core::event::LogEvent;

/// Builds the three-field log event used as input by the flat `rename_key`
/// benchmarks.
///
/// The returned event holds the fields `one`, `two` and `three` with the
/// integer values 1, 2 and 3, inserted in that order.
fn default_log_event() -> LogEvent {
    let mut event = LogEvent::default();
    for (key, value) in [("one", 1), ("two", 2), ("three", 3)] {
        event.insert(event_path!(key), value);
    }
    event
}

fn rename_key_flat(c: &mut Criterion) {
let mut group: BenchmarkGroup<WallTime> =
c.benchmark_group("vector_core::event::log_event::LogEvent::rename_key_flat");
group.sampling_mode(SamplingMode::Auto);

group.bench_function("rename_flat_key (key is present)", move |b| {
b.iter_batched(
|| {
let mut log_event = LogEvent::default();
log_event.insert("one", 1);
log_event.insert("two", 2);
log_event.insert("three", 3);
log_event
},
default_log_event,
|mut log_event| {
log_event.rename_key(event_path!("one"), event_path!("1"));
},
Expand All @@ -29,13 +31,7 @@ fn rename_key_flat(c: &mut Criterion) {

group.bench_function("rename_flat_key (key is NOT present)", move |b| {
b.iter_batched(
|| {
let mut log_event = LogEvent::default();
log_event.insert("one", 1);
log_event.insert("two", 2);
log_event.insert("three", 3);
log_event
},
default_log_event,
|mut log_event| {
log_event.rename_key(event_path!("four"), event_path!("4"));
},
Expand Down
Loading
Loading