Skip to content

Commit

Permalink
feat: replace LogEvent 'String's with '&OwnedTargetPath's (vectordotd…
Browse files Browse the repository at this point in the history
…ev#18084)

* feat: replace LogEvent 'String's with '&OwnedTargetPath's

* update comment
  • Loading branch information
pront authored Jul 26, 2023
1 parent b70074c commit 065eecb
Show file tree
Hide file tree
Showing 9 changed files with 142 additions and 106 deletions.
69 changes: 45 additions & 24 deletions lib/vector-core/src/config/log_schema.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use lookup::lookup_v2::OptionalValuePath;
use lookup::lookup_v2::{OptionalTargetPath, OptionalValuePath};
use lookup::{OwnedTargetPath, OwnedValuePath};
use once_cell::sync::{Lazy, OnceCell};
use vector_config::configurable_component;
use vrl::path::PathPrefix;

static LOG_SCHEMA: OnceCell<LogSchema> = OnceCell::new();
static LOG_SCHEMA_DEFAULT: Lazy<LogSchema> = Lazy::new(LogSchema::default);
Expand Down Expand Up @@ -49,24 +50,24 @@ pub struct LogSchema {
///
/// This would be the field that holds the raw message, such as a raw log line.
#[serde(default = "LogSchema::default_message_key")]
message_key: OptionalValuePath,
message_key: OptionalTargetPath,

/// The name of the event field to treat as the event timestamp.
#[serde(default = "LogSchema::default_timestamp_key")]
timestamp_key: OptionalValuePath,
timestamp_key: OptionalTargetPath,

/// The name of the event field to treat as the host which sent the message.
///
/// This field will generally represent a real host, or container, that generated the message,
/// but is somewhat source-dependent.
#[serde(default = "LogSchema::default_host_key")]
host_key: OptionalValuePath,
host_key: OptionalTargetPath,

/// The name of the event field to set the source identifier in.
///
/// This field will be set by the Vector source that the event was created in.
#[serde(default = "LogSchema::default_source_type_key")]
source_type_key: OptionalValuePath,
source_type_key: OptionalTargetPath,

/// The name of the event field to set the event metadata in.
///
Expand All @@ -89,69 +90,89 @@ impl Default for LogSchema {
}

impl LogSchema {
fn default_message_key() -> OptionalValuePath {
OptionalValuePath::new(MESSAGE)
fn default_message_key() -> OptionalTargetPath {
OptionalTargetPath::event(MESSAGE)
}

fn default_timestamp_key() -> OptionalValuePath {
OptionalValuePath::new(TIMESTAMP)
fn default_timestamp_key() -> OptionalTargetPath {
OptionalTargetPath::event(TIMESTAMP)
}

fn default_host_key() -> OptionalValuePath {
OptionalValuePath::new(HOST)
fn default_host_key() -> OptionalTargetPath {
OptionalTargetPath::event(HOST)
}

fn default_source_type_key() -> OptionalValuePath {
OptionalValuePath::new(SOURCE_TYPE)
fn default_source_type_key() -> OptionalTargetPath {
OptionalTargetPath::event(SOURCE_TYPE)
}

fn default_metadata_key() -> OptionalValuePath {
OptionalValuePath::new(METADATA)
}

pub fn message_key(&self) -> Option<&OwnedValuePath> {
self.message_key.path.as_ref()
self.message_key.path.as_ref().map(|key| &key.path)
}

/// Returns an `OwnedTargetPath` of the message key.
/// This parses the path and will panic if it is invalid.
///
/// This should only be used where the result will either be cached,
/// or performance isn't critical, since this requires parsing / memory allocation.
/// or performance isn't critical, since this requires memory allocation.
pub fn owned_message_path(&self) -> OwnedTargetPath {
OwnedTargetPath::event(self.message_key.clone().path.expect("valid message key"))
self.message_key
.path
.as_ref()
.expect("valid message key")
.clone()
}

pub fn timestamp_key(&self) -> Option<&OwnedValuePath> {
self.timestamp_key.path.as_ref()
self.timestamp_key.as_ref().map(|key| &key.path)
}

pub fn host_key(&self) -> Option<&OwnedValuePath> {
self.host_key.path.as_ref()
self.host_key.as_ref().map(|key| &key.path)
}

pub fn source_type_key(&self) -> Option<&OwnedValuePath> {
self.source_type_key.path.as_ref()
self.source_type_key.as_ref().map(|key| &key.path)
}

pub fn metadata_key(&self) -> Option<&OwnedValuePath> {
self.metadata_key.path.as_ref()
}

pub fn message_key_target_path(&self) -> Option<&OwnedTargetPath> {
self.message_key.as_ref()
}

pub fn timestamp_key_target_path(&self) -> Option<&OwnedTargetPath> {
self.timestamp_key.as_ref()
}

pub fn host_key_target_path(&self) -> Option<&OwnedTargetPath> {
self.host_key.as_ref()
}

pub fn source_type_key_target_path(&self) -> Option<&OwnedTargetPath> {
self.source_type_key.as_ref()
}

pub fn set_message_key(&mut self, path: Option<OwnedValuePath>) {
self.message_key = OptionalValuePath { path };
self.message_key = OptionalTargetPath::from(PathPrefix::Event, path);
}

pub fn set_timestamp_key(&mut self, v: Option<OwnedValuePath>) {
self.timestamp_key = OptionalValuePath { path: v };
pub fn set_timestamp_key(&mut self, path: Option<OwnedValuePath>) {
self.timestamp_key = OptionalTargetPath::from(PathPrefix::Event, path);
}

pub fn set_host_key(&mut self, path: Option<OwnedValuePath>) {
self.host_key = OptionalValuePath { path };
self.host_key = OptionalTargetPath::from(PathPrefix::Event, path);
}

pub fn set_source_type_key(&mut self, path: Option<OwnedValuePath>) {
self.source_type_key = OptionalValuePath { path };
self.source_type_key = OptionalTargetPath::from(PathPrefix::Event, path);
}

pub fn set_metadata_key(&mut self, path: Option<OwnedValuePath>) {
Expand Down
47 changes: 24 additions & 23 deletions lib/vector-core/src/event/log_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use vector_common::{
request_metadata::GetEventCountTags,
EventDataEq,
};
use vrl::path::OwnedValuePath;
use vrl::path::{OwnedTargetPath, OwnedValuePath};

use super::{
estimated_json_encoded_size_of::EstimatedJsonEncodedSizeOf,
Expand All @@ -32,6 +32,15 @@ use crate::config::LogNamespace;
use crate::config::{log_schema, telemetry};
use crate::{event::MaybeAsLogMut, ByteSizeOf};
use lookup::{metadata_path, path};
use once_cell::sync::Lazy;
use vrl::owned_value_path;

static VECTOR_SOURCE_TYPE_PATH: Lazy<Option<OwnedTargetPath>> = Lazy::new(|| {
Some(OwnedTargetPath::metadata(owned_value_path!(
"vector",
"source_type"
)))
});

#[derive(Debug, Deserialize)]
struct Inner {
Expand Down Expand Up @@ -296,7 +305,7 @@ impl LogEvent {

/// Retrieves the value of a field based on it's meaning.
/// This will first check if the value has previously been dropped. It is worth being
/// aware that if the field has been dropped and then some how readded, we still fetch
/// aware that if the field has been dropped and then somehow re-added, we still fetch
/// the dropped value here.
pub fn get_by_meaning(&self, meaning: impl AsRef<str>) -> Option<&Value> {
if let Some(dropped) = self.metadata().dropped_field(&meaning) {
Expand All @@ -309,12 +318,11 @@ impl LogEvent {
}
}

// TODO(Jean): Once the event API uses `Lookup`, the allocation here can be removed.
pub fn find_key_by_meaning(&self, meaning: impl AsRef<str>) -> Option<String> {
/// Retrieves the target path of a field based on the specified `meaning`.
fn find_key_by_meaning(&self, meaning: impl AsRef<str>) -> Option<&OwnedTargetPath> {
self.metadata()
.schema_definition()
.meaning_path(meaning.as_ref())
.map(std::string::ToString::to_string)
}

#[allow(clippy::needless_pass_by_value)] // TargetPath is always a reference
Expand Down Expand Up @@ -452,45 +460,37 @@ impl LogEvent {
impl LogEvent {
/// Fetches the "message" path of the event. This is either from the "message" semantic meaning (Vector namespace)
/// or from the message key set on the "Global Log Schema" (Legacy namespace).
// TODO: This can eventually return a `&TargetOwnedPath` once Semantic meaning and the
// "Global Log Schema" are updated to the new path lookup code
pub fn message_path(&self) -> Option<String> {
pub fn message_path(&self) -> Option<&OwnedTargetPath> {
match self.namespace() {
LogNamespace::Vector => self.find_key_by_meaning("message"),
LogNamespace::Legacy => log_schema().message_key().map(ToString::to_string),
LogNamespace::Legacy => log_schema().message_key_target_path(),
}
}

/// Fetches the "timestamp" path of the event. This is either from the "timestamp" semantic meaning (Vector namespace)
/// or from the timestamp key set on the "Global Log Schema" (Legacy namespace).
// TODO: This can eventually return a `&TargetOwnedPath` once Semantic meaning and the
// "Global Log Schema" are updated to the new path lookup code
pub fn timestamp_path(&self) -> Option<String> {
pub fn timestamp_path(&self) -> Option<&OwnedTargetPath> {
match self.namespace() {
LogNamespace::Vector => self.find_key_by_meaning("timestamp"),
LogNamespace::Legacy => log_schema().timestamp_key().map(ToString::to_string),
LogNamespace::Legacy => log_schema().timestamp_key_target_path(),
}
}

/// Fetches the `host` path of the event. This is either from the "host" semantic meaning (Vector namespace)
/// or from the host key set on the "Global Log Schema" (Legacy namespace).
// TODO: This can eventually return a `&TargetOwnedPath` once Semantic meaning and the
// "Global Log Schema" are updated to the new path lookup code
pub fn host_path(&self) -> Option<String> {
pub fn host_path(&self) -> Option<&OwnedTargetPath> {
match self.namespace() {
LogNamespace::Vector => self.find_key_by_meaning("host"),
LogNamespace::Legacy => log_schema().host_key().map(ToString::to_string),
LogNamespace::Legacy => log_schema().host_key_target_path(),
}
}

/// Fetches the `source_type` path of the event. This is either from the `source_type` Vector metadata field (Vector namespace)
/// or from the `source_type` key set on the "Global Log Schema" (Legacy namespace).
// TODO: This can eventually return a `&TargetOwnedPath` once Semantic meaning and the
// "Global Log Schema" are updated to the new path lookup code
pub fn source_type_path(&self) -> Option<String> {
pub fn source_type_path(&self) -> Option<&OwnedTargetPath> {
match self.namespace() {
LogNamespace::Vector => Some("%vector.source_type".to_string()),
LogNamespace::Legacy => log_schema().source_type_key().map(ToString::to_string),
LogNamespace::Vector => VECTOR_SOURCE_TYPE_PATH.as_ref(),
LogNamespace::Legacy => log_schema().source_type_key_target_path(),
}
}

Expand Down Expand Up @@ -520,7 +520,8 @@ impl LogEvent {
/// or from the timestamp key set on the "Global Log Schema" (Legacy namespace).
pub fn remove_timestamp(&mut self) -> Option<Value> {
self.timestamp_path()
.and_then(|key| self.remove(key.as_str()))
.cloned()
.and_then(|key| self.remove(&key))
}

/// Fetches the `host` of the event. This is either from the "host" semantic meaning (Vector namespace)
Expand Down
20 changes: 20 additions & 0 deletions lib/vector-lookup/src/lookup_v2/optional_path.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use vector_config::configurable_component;
use vrl::owned_value_path;
use vrl::path::PathPrefix;

use crate::lookup_v2::PathParseError;
use crate::{OwnedTargetPath, OwnedValuePath};
Expand All @@ -16,6 +17,25 @@ impl OptionalTargetPath {
pub fn none() -> Self {
Self { path: None }
}

pub fn event(path: &str) -> Self {
Self {
path: Some(OwnedTargetPath {
prefix: PathPrefix::Event,
path: owned_value_path!(path),
}),
}
}

pub fn from(prefix: PathPrefix, path: Option<OwnedValuePath>) -> Self {
Self {
path: path.map(|path| OwnedTargetPath { prefix, path }),
}
}

pub fn as_ref(&self) -> Option<&OwnedTargetPath> {
self.path.as_ref()
}
}

impl TryFrom<String> for OptionalTargetPath {
Expand Down
17 changes: 9 additions & 8 deletions src/sinks/datadog/events/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,25 +58,26 @@ async fn ensure_required_fields(event: Event) -> Option<Event> {
if !log.contains("text") {
let message_path = log
.message_path()
.expect("message is required (make sure the \"message\" semantic meaning is set)");
log.rename_key(message_path.as_str(), event_path!("text"))
.expect("message is required (make sure the \"message\" semantic meaning is set)")
.clone();
log.rename_key(&message_path, event_path!("text"));
}

if !log.contains("host") {
if let Some(host_path) = log.host_path() {
log.rename_key(host_path.as_str(), event_path!("host"));
if let Some(host_path) = log.host_path().cloned().as_ref() {
log.rename_key(host_path, event_path!("host"));
}
}

if !log.contains("date_happened") {
if let Some(timestamp_path) = log.timestamp_path() {
log.rename_key(timestamp_path.as_str(), "date_happened");
if let Some(timestamp_path) = log.timestamp_path().cloned().as_ref() {
log.rename_key(timestamp_path, "date_happened");
}
}

if !log.contains("source_type_name") {
if let Some(source_type_path) = log.source_type_path() {
log.rename_key(source_type_path.as_str(), "source_type_name")
if let Some(source_type_path) = log.source_type_path().cloned().as_ref() {
log.rename_key(source_type_path, "source_type_name");
}
}

Expand Down
20 changes: 11 additions & 9 deletions src/sinks/datadog/logs/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,19 +134,21 @@ impl crate::sinks::util::encoding::Encoder<Vec<Event>> for JsonEncoding {
let log = event.as_mut_log();
let message_path = log
.message_path()
.expect("message is required (make sure the \"message\" semantic meaning is set)");
log.rename_key(message_path.as_str(), event_path!("message"));
.expect("message is required (make sure the \"message\" semantic meaning is set)")
.clone();
log.rename_key(&message_path, event_path!("message"));

if let Some(host_path) = log.host_path() {
log.rename_key(host_path.as_str(), event_path!("hostname"));
if let Some(host_path) = log.host_path().cloned().as_ref() {
log.rename_key(host_path, event_path!("hostname"));
}

if let Some(Value::Timestamp(ts)) = log.remove(
log
let message_path = log
.timestamp_path()
.expect("timestamp is required (make sure the \"timestamp\" semantic meaning is set)")
.as_str()
) {
.expect(
"timestamp is required (make sure the \"timestamp\" semantic meaning is set)",
)
.clone();
if let Some(Value::Timestamp(ts)) = log.remove(&message_path) {
log.insert(
event_path!("timestamp"),
Value::Integer(ts.timestamp_millis()),
Expand Down
9 changes: 3 additions & 6 deletions src/sinks/elasticsearch/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,15 +355,12 @@ impl DataStreamConfig {

/// If there is a `timestamp` field, rename it to the expected `@timestamp` for Elastic Common Schema.
pub fn remap_timestamp(&self, log: &mut LogEvent) {
if let Some(timestamp_key) = log.timestamp_path() {
if timestamp_key == DATA_STREAM_TIMESTAMP_KEY {
if let Some(timestamp_key) = log.timestamp_path().cloned() {
if timestamp_key.to_string() == DATA_STREAM_TIMESTAMP_KEY {
return;
}

log.rename_key(
timestamp_key.as_str(),
event_path!(DATA_STREAM_TIMESTAMP_KEY),
)
log.rename_key(&timestamp_key, event_path!(DATA_STREAM_TIMESTAMP_KEY));
}
}

Expand Down
Loading

0 comments on commit 065eecb

Please sign in to comment.