Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: LogSchema metadata key refacoring #18099

Merged
merged 2 commits into from
Jul 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions lib/codecs/src/decoding/format/syslog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -483,9 +483,8 @@ fn insert_fields_from_syslog(

#[cfg(test)]
mod tests {
use vector_core::config::{init_log_schema, log_schema, LogSchema};

use super::*;
use vector_core::config::{init_log_schema, log_schema, LogSchema};

#[test]
fn deserialize_syslog_legacy_namespace() {
Expand Down Expand Up @@ -522,8 +521,12 @@ mod tests {

fn init() {
let mut schema = LogSchema::default();
schema.set_message_key(Some(owned_value_path!("legacy_message")));
schema.set_message_key(Some(owned_value_path!("legacy_timestamp")));
schema.set_message_key(Some(OwnedTargetPath::event(owned_value_path!(
"legacy_message"
))));
schema.set_message_key(Some(OwnedTargetPath::event(owned_value_path!(
"legacy_timestamp"
))));
init_log_schema(schema, false);
}
}
45 changes: 24 additions & 21 deletions lib/vector-core/src/config/log_schema.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use lookup::lookup_v2::{OptionalTargetPath, OptionalValuePath};
use lookup::lookup_v2::OptionalTargetPath;
use lookup::{OwnedTargetPath, OwnedValuePath};
use once_cell::sync::{Lazy, OnceCell};
use vector_config::configurable_component;
use vrl::path::PathPrefix;

static LOG_SCHEMA: OnceCell<LogSchema> = OnceCell::new();
static LOG_SCHEMA_DEFAULT: Lazy<LogSchema> = Lazy::new(LogSchema::default);
Expand Down Expand Up @@ -74,7 +73,7 @@ pub struct LogSchema {
/// Generally, this field will be set by Vector to hold event-specific metadata, such as
/// annotations by the `remap` transform when an error or abort is encountered.
#[serde(default = "LogSchema::default_metadata_key")]
metadata_key: OptionalValuePath,
metadata_key: OptionalTargetPath,
}

impl Default for LogSchema {
Expand Down Expand Up @@ -106,8 +105,8 @@ impl LogSchema {
OptionalTargetPath::event(SOURCE_TYPE)
}

fn default_metadata_key() -> OptionalValuePath {
OptionalValuePath::new(METADATA)
fn default_metadata_key() -> OptionalTargetPath {
OptionalTargetPath::event(METADATA)
}

pub fn message_key(&self) -> Option<&OwnedValuePath> {
Expand Down Expand Up @@ -140,7 +139,7 @@ impl LogSchema {
}

pub fn metadata_key(&self) -> Option<&OwnedValuePath> {
self.metadata_key.path.as_ref()
self.metadata_key.as_ref().map(|key| &key.path)
}

pub fn message_key_target_path(&self) -> Option<&OwnedTargetPath> {
Expand All @@ -159,24 +158,28 @@ impl LogSchema {
self.source_type_key.as_ref()
}

pub fn set_message_key(&mut self, path: Option<OwnedValuePath>) {
self.message_key = OptionalTargetPath::from(PathPrefix::Event, path);
pub fn metadata_key_target_path(&self) -> Option<&OwnedTargetPath> {
self.metadata_key.as_ref()
}

pub fn set_timestamp_key(&mut self, path: Option<OwnedValuePath>) {
self.timestamp_key = OptionalTargetPath::from(PathPrefix::Event, path);
pub fn set_message_key(&mut self, path: Option<OwnedTargetPath>) {
self.message_key = OptionalTargetPath { path };
}

pub fn set_host_key(&mut self, path: Option<OwnedValuePath>) {
self.host_key = OptionalTargetPath::from(PathPrefix::Event, path);
pub fn set_timestamp_key(&mut self, path: Option<OwnedTargetPath>) {
self.timestamp_key = OptionalTargetPath { path };
}

pub fn set_source_type_key(&mut self, path: Option<OwnedValuePath>) {
self.source_type_key = OptionalTargetPath::from(PathPrefix::Event, path);
pub fn set_host_key(&mut self, path: Option<OwnedTargetPath>) {
self.host_key = OptionalTargetPath { path };
}

pub fn set_metadata_key(&mut self, path: Option<OwnedValuePath>) {
self.metadata_key = OptionalValuePath { path };
pub fn set_source_type_key(&mut self, path: Option<OwnedTargetPath>) {
self.source_type_key = OptionalTargetPath { path };
}

pub fn set_metadata_key(&mut self, path: Option<OwnedTargetPath>) {
self.metadata_key = OptionalTargetPath { path };
}

/// Merge two `LogSchema` instances together.
Expand All @@ -195,35 +198,35 @@ impl LogSchema {
{
errors.push("conflicting values for 'log_schema.host_key' found".to_owned());
} else {
self.set_host_key(other.host_key().cloned());
self.set_host_key(other.host_key_target_path().cloned());
}
if self.message_key() != LOG_SCHEMA_DEFAULT.message_key()
&& self.message_key() != other.message_key()
{
errors.push("conflicting values for 'log_schema.message_key' found".to_owned());
} else {
self.set_message_key(other.message_key().cloned());
self.set_message_key(other.message_key_target_path().cloned());
}
if self.timestamp_key() != LOG_SCHEMA_DEFAULT.timestamp_key()
&& self.timestamp_key() != other.timestamp_key()
{
errors.push("conflicting values for 'log_schema.timestamp_key' found".to_owned());
} else {
self.set_timestamp_key(other.timestamp_key().cloned());
self.set_timestamp_key(other.timestamp_key_target_path().cloned());
}
if self.source_type_key() != LOG_SCHEMA_DEFAULT.source_type_key()
&& self.source_type_key() != other.source_type_key()
{
errors.push("conflicting values for 'log_schema.source_type_key' found".to_owned());
} else {
self.set_source_type_key(other.source_type_key().cloned());
self.set_source_type_key(other.source_type_key_target_path().cloned());
}
if self.metadata_key() != LOG_SCHEMA_DEFAULT.metadata_key()
&& self.metadata_key() != other.metadata_key()
{
errors.push("conflicting values for 'log_schema.metadata_key' found".to_owned());
} else {
self.set_metadata_key(other.metadata_key().cloned());
self.set_metadata_key(other.metadata_key_target_path().cloned());
}
}

Expand Down
7 changes: 3 additions & 4 deletions lib/vector-core/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -551,15 +551,14 @@ mod test {
use chrono::Utc;
use lookup::{event_path, owned_value_path, OwnedTargetPath};
use vector_common::btreemap;
use vrl::path::OwnedValuePath;
use vrl::value::Kind;

#[test]
fn test_insert_standard_vector_source_metadata() {
let nested_path = "a.b.c.d".to_string();

let mut schema = LogSchema::default();
schema.set_source_type_key(Some(OwnedValuePath::try_from(nested_path).unwrap()));
schema.set_source_type_key(Some(OwnedTargetPath::event(owned_value_path!(
"a", "b", "c", "d"
))));
init_log_schema(schema, false);

let namespace = LogNamespace::Legacy;
Expand Down
6 changes: 2 additions & 4 deletions lib/vector-core/src/event/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use vector_common::{
internal_event::TaggedEventsSent, json_size::JsonSize, request_metadata::GetEventCountTags,
EventDataEq,
};
use vrl::path::{PathPrefix, ValuePath};

use super::{
BatchNotifier, EstimatedJsonEncodedSizeOf, EventFinalizer, EventFinalizers, EventMetadata,
Expand Down Expand Up @@ -95,12 +94,11 @@ impl TraceEvent {

pub fn maybe_insert<'a, F: FnOnce() -> Value>(
&mut self,
prefix: PathPrefix,
path: Option<impl ValuePath<'a>>,
path: Option<impl TargetPath<'a>>,
value_callback: F,
) -> Option<Value> {
if let Some(path) = path {
return self.0.insert((prefix, path), value_callback());
return self.0.insert(path, value_callback());
}
None
}
Expand Down
6 changes: 3 additions & 3 deletions src/sources/http_client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,9 +350,9 @@ impl http_client::HttpClientContext for HttpClientContext {
}
}
Event::Trace(ref mut trace) => {
if let Some(source_type_key) = log_schema().source_type_key_target_path() {
trace.insert(source_type_key, Bytes::from(HttpClientConfig::NAME));
}
trace.maybe_insert(log_schema().source_type_key_target_path(), || {
Bytes::from(HttpClientConfig::NAME).into()
});
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/transforms/remap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ where
}
}
Event::Trace(ref mut trace) => {
trace.maybe_insert(PathPrefix::Event, log_schema().metadata_key(), || {
trace.maybe_insert(log_schema().metadata_key_target_path(), || {
self.dropped_data(reason, error).into()
});
}
Expand Down