Skip to content

Commit

Permalink
feat: LogSchema metadata key refacoring (vectordotdev#18099)
Browse files Browse the repository at this point in the history
* feat: LogSchema metadata key refacoring

* fix failing test
  • Loading branch information
pront authored Jul 27, 2023
1 parent f015b29 commit a8bb9f4
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 37 deletions.
11 changes: 7 additions & 4 deletions lib/codecs/src/decoding/format/syslog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -483,9 +483,8 @@ fn insert_fields_from_syslog(

#[cfg(test)]
mod tests {
use vector_core::config::{init_log_schema, log_schema, LogSchema};

use super::*;
use vector_core::config::{init_log_schema, log_schema, LogSchema};

#[test]
fn deserialize_syslog_legacy_namespace() {
Expand Down Expand Up @@ -522,8 +521,12 @@ mod tests {

fn init() {
let mut schema = LogSchema::default();
schema.set_message_key(Some(owned_value_path!("legacy_message")));
schema.set_message_key(Some(owned_value_path!("legacy_timestamp")));
schema.set_message_key(Some(OwnedTargetPath::event(owned_value_path!(
"legacy_message"
))));
schema.set_message_key(Some(OwnedTargetPath::event(owned_value_path!(
"legacy_timestamp"
))));
init_log_schema(schema, false);
}
}
45 changes: 24 additions & 21 deletions lib/vector-core/src/config/log_schema.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use lookup::lookup_v2::{OptionalTargetPath, OptionalValuePath};
use lookup::lookup_v2::OptionalTargetPath;
use lookup::{OwnedTargetPath, OwnedValuePath};
use once_cell::sync::{Lazy, OnceCell};
use vector_config::configurable_component;
use vrl::path::PathPrefix;

static LOG_SCHEMA: OnceCell<LogSchema> = OnceCell::new();
static LOG_SCHEMA_DEFAULT: Lazy<LogSchema> = Lazy::new(LogSchema::default);
Expand Down Expand Up @@ -74,7 +73,7 @@ pub struct LogSchema {
/// Generally, this field will be set by Vector to hold event-specific metadata, such as
/// annotations by the `remap` transform when an error or abort is encountered.
#[serde(default = "LogSchema::default_metadata_key")]
metadata_key: OptionalValuePath,
metadata_key: OptionalTargetPath,
}

impl Default for LogSchema {
Expand Down Expand Up @@ -106,8 +105,8 @@ impl LogSchema {
OptionalTargetPath::event(SOURCE_TYPE)
}

fn default_metadata_key() -> OptionalValuePath {
OptionalValuePath::new(METADATA)
fn default_metadata_key() -> OptionalTargetPath {
OptionalTargetPath::event(METADATA)
}

pub fn message_key(&self) -> Option<&OwnedValuePath> {
Expand Down Expand Up @@ -140,7 +139,7 @@ impl LogSchema {
}

pub fn metadata_key(&self) -> Option<&OwnedValuePath> {
self.metadata_key.path.as_ref()
self.metadata_key.as_ref().map(|key| &key.path)
}

pub fn message_key_target_path(&self) -> Option<&OwnedTargetPath> {
Expand All @@ -159,24 +158,28 @@ impl LogSchema {
self.source_type_key.as_ref()
}

pub fn set_message_key(&mut self, path: Option<OwnedValuePath>) {
self.message_key = OptionalTargetPath::from(PathPrefix::Event, path);
pub fn metadata_key_target_path(&self) -> Option<&OwnedTargetPath> {
self.metadata_key.as_ref()
}

pub fn set_timestamp_key(&mut self, path: Option<OwnedValuePath>) {
self.timestamp_key = OptionalTargetPath::from(PathPrefix::Event, path);
pub fn set_message_key(&mut self, path: Option<OwnedTargetPath>) {
self.message_key = OptionalTargetPath { path };
}

pub fn set_host_key(&mut self, path: Option<OwnedValuePath>) {
self.host_key = OptionalTargetPath::from(PathPrefix::Event, path);
pub fn set_timestamp_key(&mut self, path: Option<OwnedTargetPath>) {
self.timestamp_key = OptionalTargetPath { path };
}

pub fn set_source_type_key(&mut self, path: Option<OwnedValuePath>) {
self.source_type_key = OptionalTargetPath::from(PathPrefix::Event, path);
pub fn set_host_key(&mut self, path: Option<OwnedTargetPath>) {
self.host_key = OptionalTargetPath { path };
}

pub fn set_metadata_key(&mut self, path: Option<OwnedValuePath>) {
self.metadata_key = OptionalValuePath { path };
pub fn set_source_type_key(&mut self, path: Option<OwnedTargetPath>) {
self.source_type_key = OptionalTargetPath { path };
}

pub fn set_metadata_key(&mut self, path: Option<OwnedTargetPath>) {
self.metadata_key = OptionalTargetPath { path };
}

/// Merge two `LogSchema` instances together.
Expand All @@ -195,35 +198,35 @@ impl LogSchema {
{
errors.push("conflicting values for 'log_schema.host_key' found".to_owned());
} else {
self.set_host_key(other.host_key().cloned());
self.set_host_key(other.host_key_target_path().cloned());
}
if self.message_key() != LOG_SCHEMA_DEFAULT.message_key()
&& self.message_key() != other.message_key()
{
errors.push("conflicting values for 'log_schema.message_key' found".to_owned());
} else {
self.set_message_key(other.message_key().cloned());
self.set_message_key(other.message_key_target_path().cloned());
}
if self.timestamp_key() != LOG_SCHEMA_DEFAULT.timestamp_key()
&& self.timestamp_key() != other.timestamp_key()
{
errors.push("conflicting values for 'log_schema.timestamp_key' found".to_owned());
} else {
self.set_timestamp_key(other.timestamp_key().cloned());
self.set_timestamp_key(other.timestamp_key_target_path().cloned());
}
if self.source_type_key() != LOG_SCHEMA_DEFAULT.source_type_key()
&& self.source_type_key() != other.source_type_key()
{
errors.push("conflicting values for 'log_schema.source_type_key' found".to_owned());
} else {
self.set_source_type_key(other.source_type_key().cloned());
self.set_source_type_key(other.source_type_key_target_path().cloned());
}
if self.metadata_key() != LOG_SCHEMA_DEFAULT.metadata_key()
&& self.metadata_key() != other.metadata_key()
{
errors.push("conflicting values for 'log_schema.metadata_key' found".to_owned());
} else {
self.set_metadata_key(other.metadata_key().cloned());
self.set_metadata_key(other.metadata_key_target_path().cloned());
}
}

Expand Down
7 changes: 3 additions & 4 deletions lib/vector-core/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -551,15 +551,14 @@ mod test {
use chrono::Utc;
use lookup::{event_path, owned_value_path, OwnedTargetPath};
use vector_common::btreemap;
use vrl::path::OwnedValuePath;
use vrl::value::Kind;

#[test]
fn test_insert_standard_vector_source_metadata() {
let nested_path = "a.b.c.d".to_string();

let mut schema = LogSchema::default();
schema.set_source_type_key(Some(OwnedValuePath::try_from(nested_path).unwrap()));
schema.set_source_type_key(Some(OwnedTargetPath::event(owned_value_path!(
"a", "b", "c", "d"
))));
init_log_schema(schema, false);

let namespace = LogNamespace::Legacy;
Expand Down
6 changes: 2 additions & 4 deletions lib/vector-core/src/event/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use vector_common::{
internal_event::TaggedEventsSent, json_size::JsonSize, request_metadata::GetEventCountTags,
EventDataEq,
};
use vrl::path::{PathPrefix, ValuePath};

use super::{
BatchNotifier, EstimatedJsonEncodedSizeOf, EventFinalizer, EventFinalizers, EventMetadata,
Expand Down Expand Up @@ -95,12 +94,11 @@ impl TraceEvent {

pub fn maybe_insert<'a, F: FnOnce() -> Value>(
&mut self,
prefix: PathPrefix,
path: Option<impl ValuePath<'a>>,
path: Option<impl TargetPath<'a>>,
value_callback: F,
) -> Option<Value> {
if let Some(path) = path {
return self.0.insert((prefix, path), value_callback());
return self.0.insert(path, value_callback());
}
None
}
Expand Down
6 changes: 3 additions & 3 deletions src/sources/http_client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,9 +350,9 @@ impl http_client::HttpClientContext for HttpClientContext {
}
}
Event::Trace(ref mut trace) => {
if let Some(source_type_key) = log_schema().source_type_key_target_path() {
trace.insert(source_type_key, Bytes::from(HttpClientConfig::NAME));
}
trace.maybe_insert(log_schema().source_type_key_target_path(), || {
Bytes::from(HttpClientConfig::NAME).into()
});
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/transforms/remap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ where
}
}
Event::Trace(ref mut trace) => {
trace.maybe_insert(PathPrefix::Event, log_schema().metadata_key(), || {
trace.maybe_insert(log_schema().metadata_key_target_path(), || {
self.dropped_data(reason, error).into()
});
}
Expand Down

0 comments on commit a8bb9f4

Please sign in to comment.