Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: replace LogEvent 'String's with '&OwnedTargetPath's #18084

Merged
merged 2 commits into from
Jul 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 45 additions & 24 deletions lib/vector-core/src/config/log_schema.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use lookup::lookup_v2::OptionalValuePath;
use lookup::lookup_v2::{OptionalTargetPath, OptionalValuePath};
use lookup::{OwnedTargetPath, OwnedValuePath};
use once_cell::sync::{Lazy, OnceCell};
use vector_config::configurable_component;
use vrl::path::PathPrefix;

// Global slot for a `LogSchema`; it starts out empty (`OnceCell::new()`).
// NOTE(review): the code that fills and reads this cell is not visible in this view.
static LOG_SCHEMA: OnceCell<LogSchema> = OnceCell::new();
// Default schema, built via `LogSchema::default` the first time it is dereferenced.
// NOTE(review): presumably the fallback when `LOG_SCHEMA` was never set — confirm against the accessor.
static LOG_SCHEMA_DEFAULT: Lazy<LogSchema> = Lazy::new(LogSchema::default);
Expand Down Expand Up @@ -49,24 +50,24 @@ pub struct LogSchema {
///
/// This would be the field that holds the raw message, such as a raw log line.
#[serde(default = "LogSchema::default_message_key")]
message_key: OptionalValuePath,
message_key: OptionalTargetPath,

/// The name of the event field to treat as the event timestamp.
#[serde(default = "LogSchema::default_timestamp_key")]
timestamp_key: OptionalValuePath,
timestamp_key: OptionalTargetPath,

/// The name of the event field to treat as the host which sent the message.
///
/// This field will generally represent a real host, or container, that generated the message,
/// but is somewhat source-dependent.
#[serde(default = "LogSchema::default_host_key")]
host_key: OptionalValuePath,
host_key: OptionalTargetPath,

/// The name of the event field to set the source identifier in.
///
/// This field will be set by the Vector source that the event was created in.
#[serde(default = "LogSchema::default_source_type_key")]
source_type_key: OptionalValuePath,
source_type_key: OptionalTargetPath,

/// The name of the event field to set the event metadata in.
///
Expand All @@ -89,69 +90,89 @@ impl Default for LogSchema {
}

impl LogSchema {
fn default_message_key() -> OptionalValuePath {
OptionalValuePath::new(MESSAGE)
fn default_message_key() -> OptionalTargetPath {
OptionalTargetPath::event(MESSAGE)
}

fn default_timestamp_key() -> OptionalValuePath {
OptionalValuePath::new(TIMESTAMP)
fn default_timestamp_key() -> OptionalTargetPath {
OptionalTargetPath::event(TIMESTAMP)
}

fn default_host_key() -> OptionalValuePath {
OptionalValuePath::new(HOST)
fn default_host_key() -> OptionalTargetPath {
OptionalTargetPath::event(HOST)
}

fn default_source_type_key() -> OptionalValuePath {
OptionalValuePath::new(SOURCE_TYPE)
fn default_source_type_key() -> OptionalTargetPath {
OptionalTargetPath::event(SOURCE_TYPE)
}

/// Builds the default metadata key from the `METADATA` constant.
///
/// Unlike the message/timestamp/host/source-type defaults, this one is still an
/// `OptionalValuePath` rather than an `OptionalTargetPath`.
fn default_metadata_key() -> OptionalValuePath {
OptionalValuePath::new(METADATA)
}

pub fn message_key(&self) -> Option<&OwnedValuePath> {
self.message_key.path.as_ref()
self.message_key.path.as_ref().map(|key| &key.path)
}

/// Returns an `OwnedTargetPath` of the message key.
/// This clones the stored path and will panic if no message key is set.
///
/// This should only be used where the result will either be cached,
/// or performance isn't critical, since this requires parsing / memory allocation.
/// or performance isn't critical, since this requires memory allocation.
pub fn owned_message_path(&self) -> OwnedTargetPath {
OwnedTargetPath::event(self.message_key.clone().path.expect("valid message key"))
self.message_key
.path
.as_ref()
.expect("valid message key")
.clone()
}

pub fn timestamp_key(&self) -> Option<&OwnedValuePath> {
self.timestamp_key.path.as_ref()
self.timestamp_key.as_ref().map(|key| &key.path)
}

pub fn host_key(&self) -> Option<&OwnedValuePath> {
self.host_key.path.as_ref()
self.host_key.as_ref().map(|key| &key.path)
}

pub fn source_type_key(&self) -> Option<&OwnedValuePath> {
self.source_type_key.path.as_ref()
self.source_type_key.as_ref().map(|key| &key.path)
}

/// Returns a reference to the configured metadata key path,
/// or `None` when no metadata key is set.
pub fn metadata_key(&self) -> Option<&OwnedValuePath> {
self.metadata_key.path.as_ref()
}

/// Returns the message key as a full target path (prefix and value path),
/// or `None` when no message key is configured.
///
/// The path is borrowed from the schema, so no allocation takes place.
pub fn message_key_target_path(&self) -> Option<&OwnedTargetPath> {
    let message_key = &self.message_key;
    message_key.path.as_ref()
}

/// Returns the timestamp key as a full target path (prefix and value path),
/// or `None` when no timestamp key is configured.
///
/// The path is borrowed from the schema, so no allocation takes place.
pub fn timestamp_key_target_path(&self) -> Option<&OwnedTargetPath> {
    let timestamp_key = &self.timestamp_key;
    timestamp_key.path.as_ref()
}

/// Returns the host key as a full target path (prefix and value path),
/// or `None` when no host key is configured.
///
/// The path is borrowed from the schema, so no allocation takes place.
pub fn host_key_target_path(&self) -> Option<&OwnedTargetPath> {
    let host_key = &self.host_key;
    host_key.path.as_ref()
}

/// Returns the source type key as a full target path (prefix and value path),
/// or `None` when no source type key is configured.
///
/// The path is borrowed from the schema, so no allocation takes place.
pub fn source_type_key_target_path(&self) -> Option<&OwnedTargetPath> {
    let source_type_key = &self.source_type_key;
    source_type_key.path.as_ref()
}

pub fn set_message_key(&mut self, path: Option<OwnedValuePath>) {
self.message_key = OptionalValuePath { path };
self.message_key = OptionalTargetPath::from(PathPrefix::Event, path);
}

pub fn set_timestamp_key(&mut self, v: Option<OwnedValuePath>) {
self.timestamp_key = OptionalValuePath { path: v };
pub fn set_timestamp_key(&mut self, path: Option<OwnedValuePath>) {
self.timestamp_key = OptionalTargetPath::from(PathPrefix::Event, path);
}

pub fn set_host_key(&mut self, path: Option<OwnedValuePath>) {
self.host_key = OptionalValuePath { path };
self.host_key = OptionalTargetPath::from(PathPrefix::Event, path);
}

pub fn set_source_type_key(&mut self, path: Option<OwnedValuePath>) {
self.source_type_key = OptionalValuePath { path };
self.source_type_key = OptionalTargetPath::from(PathPrefix::Event, path);
}

pub fn set_metadata_key(&mut self, path: Option<OwnedValuePath>) {
Expand Down
47 changes: 24 additions & 23 deletions lib/vector-core/src/event/log_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use vector_common::{
request_metadata::GetEventCountTags,
EventDataEq,
};
use vrl::path::OwnedValuePath;
use vrl::path::{OwnedTargetPath, OwnedValuePath};

use super::{
estimated_json_encoded_size_of::EstimatedJsonEncodedSizeOf,
Expand All @@ -32,6 +32,15 @@ use crate::config::LogNamespace;
use crate::config::{log_schema, telemetry};
use crate::{event::MaybeAsLogMut, ByteSizeOf};
use lookup::{metadata_path, path};
use once_cell::sync::Lazy;
use vrl::owned_value_path;

/// Metadata target path made of the segments `vector` and `source_type`, built once on first use.
///
/// It is stored as an `Option` so that `source_type_path` can return
/// `Option<&OwnedTargetPath>` straight from `as_ref()` without allocating.
static VECTOR_SOURCE_TYPE_PATH: Lazy<Option<OwnedTargetPath>> = Lazy::new(|| {
    let value_path = owned_value_path!("vector", "source_type");
    Some(OwnedTargetPath::metadata(value_path))
});

#[derive(Debug, Deserialize)]
struct Inner {
Expand Down Expand Up @@ -296,7 +305,7 @@ impl LogEvent {

/// Retrieves the value of a field based on its meaning.
/// This will first check if the value has previously been dropped. It is worth being
/// aware that if the field has been dropped and then some how readded, we still fetch
/// aware that if the field has been dropped and then somehow re-added, we still fetch
/// the dropped value here.
pub fn get_by_meaning(&self, meaning: impl AsRef<str>) -> Option<&Value> {
if let Some(dropped) = self.metadata().dropped_field(&meaning) {
Expand All @@ -309,12 +318,11 @@ impl LogEvent {
}
}

// TODO(Jean): Once the event API uses `Lookup`, the allocation here can be removed.
pub fn find_key_by_meaning(&self, meaning: impl AsRef<str>) -> Option<String> {
/// Retrieves the target path of a field based on the specified `meaning`.
fn find_key_by_meaning(&self, meaning: impl AsRef<str>) -> Option<&OwnedTargetPath> {
self.metadata()
.schema_definition()
.meaning_path(meaning.as_ref())
.map(std::string::ToString::to_string)
}

#[allow(clippy::needless_pass_by_value)] // TargetPath is always a reference
Expand Down Expand Up @@ -452,45 +460,37 @@ impl LogEvent {
impl LogEvent {
/// Fetches the "message" path of the event. This is either from the "message" semantic meaning (Vector namespace)
/// or from the message key set on the "Global Log Schema" (Legacy namespace).
// TODO: This can eventually return a `&TargetOwnedPath` once Semantic meaning and the
// "Global Log Schema" are updated to the new path lookup code
pub fn message_path(&self) -> Option<String> {
pub fn message_path(&self) -> Option<&OwnedTargetPath> {
match self.namespace() {
LogNamespace::Vector => self.find_key_by_meaning("message"),
LogNamespace::Legacy => log_schema().message_key().map(ToString::to_string),
LogNamespace::Legacy => log_schema().message_key_target_path(),
}
}

/// Fetches the "timestamp" path of the event. This is either from the "timestamp" semantic meaning (Vector namespace)
/// or from the timestamp key set on the "Global Log Schema" (Legacy namespace).
// TODO: This can eventually return a `&TargetOwnedPath` once Semantic meaning and the
// "Global Log Schema" are updated to the new path lookup code
pub fn timestamp_path(&self) -> Option<String> {
pub fn timestamp_path(&self) -> Option<&OwnedTargetPath> {
match self.namespace() {
LogNamespace::Vector => self.find_key_by_meaning("timestamp"),
LogNamespace::Legacy => log_schema().timestamp_key().map(ToString::to_string),
LogNamespace::Legacy => log_schema().timestamp_key_target_path(),
}
}

/// Fetches the `host` path of the event. This is either from the "host" semantic meaning (Vector namespace)
/// or from the host key set on the "Global Log Schema" (Legacy namespace).
// TODO: This can eventually return a `&TargetOwnedPath` once Semantic meaning and the
// "Global Log Schema" are updated to the new path lookup code
pub fn host_path(&self) -> Option<String> {
pub fn host_path(&self) -> Option<&OwnedTargetPath> {
match self.namespace() {
LogNamespace::Vector => self.find_key_by_meaning("host"),
LogNamespace::Legacy => log_schema().host_key().map(ToString::to_string),
LogNamespace::Legacy => log_schema().host_key_target_path(),
}
}

/// Fetches the `source_type` path of the event. This is either from the `source_type` Vector metadata field (Vector namespace)
/// or from the `source_type` key set on the "Global Log Schema" (Legacy namespace).
// TODO: This can eventually return a `&TargetOwnedPath` once Semantic meaning and the
// "Global Log Schema" are updated to the new path lookup code
pub fn source_type_path(&self) -> Option<String> {
pub fn source_type_path(&self) -> Option<&OwnedTargetPath> {
match self.namespace() {
LogNamespace::Vector => Some("%vector.source_type".to_string()),
LogNamespace::Legacy => log_schema().source_type_key().map(ToString::to_string),
LogNamespace::Vector => VECTOR_SOURCE_TYPE_PATH.as_ref(),
LogNamespace::Legacy => log_schema().source_type_key_target_path(),
}
}

Expand Down Expand Up @@ -520,7 +520,8 @@ impl LogEvent {
/// or from the timestamp key set on the "Global Log Schema" (Legacy namespace).
pub fn remove_timestamp(&mut self) -> Option<Value> {
self.timestamp_path()
.and_then(|key| self.remove(key.as_str()))
.cloned()
.and_then(|key| self.remove(&key))
Comment on lines +523 to +524
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the clone? It looks like we only use it as a reference. The same goes for most of the other clones in the diff.

Copy link
Member Author

@pront pront Jul 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because `timestamp_path` takes an immutable reference to `self`, and then, inside the closure, `remove` requires a mutable reference to the same `self`. The same applies to the other clones in the diff.

Note that currently, we map/convert all paths to Strings, see here and here. So this PR is at the very least avoiding the conversions. I didn't think about further optimizations at this point because my immediate goal is to get rid of String types in favor of proper path types. Always happy to hear more ideas for further optimizations.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, that makes sense. In this case it is actually safe, because `timestamp_path` really only needs an immutable reference to `self.metadata` while `remove` needs mutable access to `self.inner`, but the compiler can't see that through the function interfaces. We certainly don't need to do it as part of this change, but at some point it may be worth separating those borrows so we can drop the clone.

}

/// Fetches the `host` of the event. This is either from the "host" semantic meaning (Vector namespace)
Expand Down
20 changes: 20 additions & 0 deletions lib/vector-lookup/src/lookup_v2/optional_path.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use vector_config::configurable_component;
use vrl::owned_value_path;
use vrl::path::PathPrefix;

use crate::lookup_v2::PathParseError;
use crate::{OwnedTargetPath, OwnedValuePath};
Expand All @@ -16,6 +17,25 @@ impl OptionalTargetPath {
/// Creates an `OptionalTargetPath` with no path set.
pub fn none() -> Self {
Self { path: None }
}

/// Creates an `OptionalTargetPath` that points into the event
/// (`PathPrefix::Event`), with the value path built from `path`.
///
/// NOTE(review): `owned_value_path!` appears to treat `path` as a single
/// field segment rather than parsing it — confirm before passing dotted paths.
pub fn event(path: &str) -> Self {
    let target = OwnedTargetPath {
        prefix: PathPrefix::Event,
        path: owned_value_path!(path),
    };
    Self { path: Some(target) }
}

/// Combines `prefix` with an optional value path.
///
/// A `Some` value path becomes a target path carrying the given prefix;
/// `None` yields an `OptionalTargetPath` with no path set.
pub fn from(prefix: PathPrefix, path: Option<OwnedValuePath>) -> Self {
    let target = path.map(|value_path| OwnedTargetPath {
        prefix,
        path: value_path,
    });
    Self { path: target }
}

/// Borrows the wrapped target path, or returns `None` when no path is set.
pub fn as_ref(&self) -> Option<&OwnedTargetPath> {
self.path.as_ref()
}
}

impl TryFrom<String> for OptionalTargetPath {
Expand Down
17 changes: 9 additions & 8 deletions src/sinks/datadog/events/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,25 +58,26 @@ async fn ensure_required_fields(event: Event) -> Option<Event> {
if !log.contains("text") {
let message_path = log
.message_path()
.expect("message is required (make sure the \"message\" semantic meaning is set)");
log.rename_key(message_path.as_str(), event_path!("text"))
.expect("message is required (make sure the \"message\" semantic meaning is set)")
.clone();
log.rename_key(&message_path, event_path!("text"));
}

if !log.contains("host") {
if let Some(host_path) = log.host_path() {
log.rename_key(host_path.as_str(), event_path!("host"));
if let Some(host_path) = log.host_path().cloned().as_ref() {
log.rename_key(host_path, event_path!("host"));
}
}

if !log.contains("date_happened") {
if let Some(timestamp_path) = log.timestamp_path() {
log.rename_key(timestamp_path.as_str(), "date_happened");
if let Some(timestamp_path) = log.timestamp_path().cloned().as_ref() {
log.rename_key(timestamp_path, "date_happened");
}
}

if !log.contains("source_type_name") {
if let Some(source_type_path) = log.source_type_path() {
log.rename_key(source_type_path.as_str(), "source_type_name")
if let Some(source_type_path) = log.source_type_path().cloned().as_ref() {
log.rename_key(source_type_path, "source_type_name");
}
}

Expand Down
20 changes: 11 additions & 9 deletions src/sinks/datadog/logs/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,19 +134,21 @@ impl crate::sinks::util::encoding::Encoder<Vec<Event>> for JsonEncoding {
let log = event.as_mut_log();
let message_path = log
.message_path()
.expect("message is required (make sure the \"message\" semantic meaning is set)");
log.rename_key(message_path.as_str(), event_path!("message"));
.expect("message is required (make sure the \"message\" semantic meaning is set)")
.clone();
log.rename_key(&message_path, event_path!("message"));

if let Some(host_path) = log.host_path() {
log.rename_key(host_path.as_str(), event_path!("hostname"));
if let Some(host_path) = log.host_path().cloned().as_ref() {
log.rename_key(host_path, event_path!("hostname"));
}

if let Some(Value::Timestamp(ts)) = log.remove(
log
let message_path = log
.timestamp_path()
.expect("timestamp is required (make sure the \"timestamp\" semantic meaning is set)")
.as_str()
) {
.expect(
"timestamp is required (make sure the \"timestamp\" semantic meaning is set)",
)
.clone();
if let Some(Value::Timestamp(ts)) = log.remove(&message_path) {
log.insert(
event_path!("timestamp"),
Value::Integer(ts.timestamp_millis()),
Expand Down
9 changes: 3 additions & 6 deletions src/sinks/elasticsearch/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,15 +355,12 @@ impl DataStreamConfig {

/// If there is a `timestamp` field, rename it to the expected `@timestamp` for Elastic Common Schema.
pub fn remap_timestamp(&self, log: &mut LogEvent) {
if let Some(timestamp_key) = log.timestamp_path() {
if timestamp_key == DATA_STREAM_TIMESTAMP_KEY {
if let Some(timestamp_key) = log.timestamp_path().cloned() {
if timestamp_key.to_string() == DATA_STREAM_TIMESTAMP_KEY {
return;
}

log.rename_key(
timestamp_key.as_str(),
event_path!(DATA_STREAM_TIMESTAMP_KEY),
)
log.rename_key(&timestamp_key, event_path!(DATA_STREAM_TIMESTAMP_KEY));
}
}

Expand Down
Loading