Skip to content

Commit

Permalink
feat: Refactor 'event.get()' to use path types (#18160)
Browse files Browse the repository at this point in the history
* feat: Refactor 'get()' to use path types

* vdev fmt
  • Loading branch information
pront authored Aug 8, 2023
1 parent 3b53bcd commit e476e12
Show file tree
Hide file tree
Showing 13 changed files with 124 additions and 59 deletions.
8 changes: 7 additions & 1 deletion lib/vector-core/src/event/discriminant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,13 @@ impl Discriminant {
pub fn from_log_event(event: &LogEvent, discriminant_fields: &[impl AsRef<str>]) -> Self {
let values: Vec<Option<Value>> = discriminant_fields
.iter()
.map(|discriminant_field| event.get(discriminant_field.as_ref()).cloned())
.map(|discriminant_field| {
event
.parse_path_and_get_value(discriminant_field.as_ref())
.ok()
.flatten()
.cloned()
})
.collect();
Self { values }
}
Expand Down
31 changes: 24 additions & 7 deletions lib/vector-core/src/event/log_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use vector_common::{
request_metadata::GetEventCountTags,
EventDataEq,
};
use vrl::path::OwnedTargetPath;
use vrl::path::{parse_target_path, OwnedTargetPath, PathParseError};

use super::{
estimated_json_encoded_size_of::EstimatedJsonEncodedSizeOf,
Expand Down Expand Up @@ -295,6 +295,16 @@ impl LogEvent {
self.metadata.add_finalizer(finalizer);
}

/// Parse the specified `path` and if there are no parsing errors, attempt to get a reference to a value.
/// # Errors
/// Will return an error if path parsing failed.
pub fn parse_path_and_get_value(
&self,
path: impl AsRef<str>,
) -> Result<Option<&Value>, PathParseError> {
parse_target_path(path.as_ref()).map(|path| self.get(&path))
}

#[allow(clippy::needless_pass_by_value)] // TargetPath is always a reference
pub fn get<'a>(&self, key: impl TargetPath<'a>) -> Option<&Value> {
match key.prefix() {
Expand Down Expand Up @@ -439,11 +449,14 @@ impl LogEvent {
let Some(incoming_val) = incoming.remove(field.as_ref()) else {
continue
};
match self.get_mut(field.as_ref()) {
None => {
self.insert(field.as_ref(), incoming_val);

if let Ok(path) = parse_target_path(field.as_ref()) {
match self.get_mut(&path) {
None => {
self.insert(&path, incoming_val);
}
Some(current_val) => current_val.merge(incoming_val),
}
Some(current_val) => current_val.merge(incoming_val),
}
}
self.metadata.merge(incoming.metadata);
Expand Down Expand Up @@ -642,7 +655,9 @@ where
type Output = Value;

fn index(&self, key: T) -> &Value {
self.get(key.as_ref())
self.parse_path_and_get_value(key.as_ref())
.ok()
.flatten()
.unwrap_or_else(|| panic!("Key is not found: {:?}", key.as_ref()))
}
}
Expand All @@ -654,7 +669,9 @@ where
{
fn extend<I: IntoIterator<Item = (K, V)>>(&mut self, iter: I) {
for (k, v) in iter {
self.insert(k.as_ref(), v.into());
if let Ok(path) = parse_target_path(k.as_ref()) {
self.insert(&path, v.into());
}
}
}
}
Expand Down
15 changes: 13 additions & 2 deletions lib/vector-core/src/event/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use vector_common::{
internal_event::TaggedEventsSent, json_size::JsonSize, request_metadata::GetEventCountTags,
EventDataEq,
};
use vrl::path::PathParseError;

use super::{
BatchNotifier, EstimatedJsonEncodedSizeOf, EventFinalizer, EventFinalizers, EventMetadata,
Expand Down Expand Up @@ -71,13 +72,23 @@ impl TraceEvent {
self.0.as_map().expect("inner value must be a map")
}

/// Parse the specified `path` and if there are no parsing errors, attempt to get a reference to a value.
/// # Errors
/// Will return an error if path parsing failed.
pub fn parse_path_and_get_value(
&self,
path: impl AsRef<str>,
) -> Result<Option<&Value>, PathParseError> {
self.0.parse_path_and_get_value(path)
}

#[allow(clippy::needless_pass_by_value)] // TargetPath is always a reference
pub fn get<'a>(&self, key: impl TargetPath<'a>) -> Option<&Value> {
self.0.get(key)
}

pub fn get_mut(&mut self, key: impl AsRef<str>) -> Option<&mut Value> {
self.0.get_mut(key.as_ref())
pub fn get_mut<'a>(&mut self, key: impl TargetPath<'a>) -> Option<&mut Value> {
self.0.get_mut(key)
}

pub fn contains(&self, key: impl AsRef<str>) -> bool {
Expand Down
3 changes: 2 additions & 1 deletion src/api/schema/events/trace.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use async_graphql::Object;
use vector_common::encode_logfmt;
use vrl::event_path;

use super::EventEncodingType;
use crate::{event, topology::TapOutput};
Expand Down Expand Up @@ -48,7 +49,7 @@ impl Trace {

/// Get JSON field data on the trace event, by field name
async fn json(&self, field: String) -> Option<String> {
self.event.get(field.as_str()).map(|field| {
self.event.get(event_path!(field.as_str())).map(|field| {
serde_json::to_string(field)
.expect("JSON serialization of log event field failed. Please report.")
})
Expand Down
34 changes: 23 additions & 11 deletions src/conditions/datadog_search.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,12 @@ impl Filter<LogEvent> for EventFilter {
any_string_match("tags", move |value| value == field)
}
Field::Default(f) | Field::Facet(f) | Field::Reserved(f) => {
Run::boxed(move |log: &LogEvent| log.get(f.as_str()).is_some())
Run::boxed(move |log: &LogEvent| {
log.parse_path_and_get_value(f.as_str())
.ok()
.flatten()
.is_some()
})
}
}
}
Expand Down Expand Up @@ -165,8 +170,11 @@ impl Filter<LogEvent> for EventFilter {
match field {
// Facets are compared numerically if the value is numeric, or as strings otherwise.
Field::Facet(f) => {
Run::boxed(
move |log: &LogEvent| match (log.get(f.as_str()), &comparison_value) {
Run::boxed(move |log: &LogEvent| {
match (
log.parse_path_and_get_value(f.as_str()).ok().flatten(),
&comparison_value,
) {
// Integers.
(Some(Value::Integer(lhs)), ComparisonValue::Integer(rhs)) => {
match comparator {
Expand Down Expand Up @@ -227,8 +235,8 @@ impl Filter<LogEvent> for EventFilter {
}
}
_ => false,
},
)
}
})
}
// Tag values need extracting by "key:value" to be compared.
Field::Tag(tag) => any_string_match("tags", move |value| match value.split_once(':') {
Expand Down Expand Up @@ -266,9 +274,11 @@ where
{
let field = field.into();

Run::boxed(move |log: &LogEvent| match log.get(field.as_str()) {
Some(Value::Bytes(v)) => func(String::from_utf8_lossy(v)),
_ => false,
Run::boxed(move |log: &LogEvent| {
match log.parse_path_and_get_value(field.as_str()).ok().flatten() {
Some(Value::Bytes(v)) => func(String::from_utf8_lossy(v)),
_ => false,
}
})
}

Expand All @@ -281,9 +291,11 @@ where
{
let field = field.into();

Run::boxed(move |log: &LogEvent| match log.get(field.as_str()) {
Some(Value::Array(values)) => func(values),
_ => false,
Run::boxed(move |log: &LogEvent| {
match log.parse_path_and_get_value(field.as_str()).ok().flatten() {
Some(Value::Array(values)) => func(values),
_ => false,
}
})
}

Expand Down
9 changes: 5 additions & 4 deletions src/sinks/datadog/traces/apm_stats/aggregation.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{collections::BTreeMap, sync::Arc};

use chrono::Utc;
use vrl::event_path;

use super::{
bucket::Bucket, ClientStatsBucket, ClientStatsPayload, PartitionKey,
Expand Down Expand Up @@ -179,7 +180,7 @@ impl Aggregator {
pub(crate) fn handle_trace(&mut self, partition_key: &PartitionKey, trace: &TraceEvent) {
// Based on https://github.com/DataDog/datadog-agent/blob/cfa750c7412faa98e87a015f8ee670e5828bbe7f/pkg/trace/stats/concentrator.go#L148-L184

let spans = match trace.get("spans") {
let spans = match trace.get(event_path!("spans")) {
Some(Value::Array(v)) => v.iter().filter_map(|s| s.as_object()).collect(),
_ => vec![],
};
Expand All @@ -189,16 +190,16 @@ impl Aggregator {
env: partition_key.env.clone().unwrap_or_default(),
hostname: partition_key.hostname.clone().unwrap_or_default(),
version: trace
.get("app_version")
.get(event_path!("app_version"))
.map(|v| v.to_string_lossy().into_owned())
.unwrap_or_default(),
container_id: trace
.get("container_id")
.get(event_path!("container_id"))
.map(|v| v.to_string_lossy().into_owned())
.unwrap_or_default(),
};
let synthetics = trace
.get("origin")
.get(event_path!("origin"))
.map(|v| v.to_string_lossy().starts_with(TAG_SYNTHETICS))
.unwrap_or(false);

Expand Down
27 changes: 14 additions & 13 deletions src/sinks/datadog/traces/request_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use prost::Message;
use snafu::Snafu;
use vector_common::request_metadata::RequestMetadata;
use vector_core::event::{EventFinalizers, Finalizable};
use vrl::event_path;

use super::{
apm_stats::{compute_apm_stats, Aggregator},
Expand Down Expand Up @@ -283,7 +284,7 @@ impl DatadogTracesEncoder {

fn vector_trace_into_dd_tracer_payload(trace: &TraceEvent) -> dd_proto::TracerPayload {
let tags = trace
.get("tags")
.get(event_path!("tags"))
.and_then(|m| m.as_object())
.map(|m| {
m.iter()
Expand All @@ -292,7 +293,7 @@ impl DatadogTracesEncoder {
})
.unwrap_or_default();

let spans = match trace.get("spans") {
let spans = match trace.get(event_path!("spans")) {
Some(Value::Array(v)) => v
.iter()
.filter_map(|s| s.as_object().map(DatadogTracesEncoder::convert_span))
Expand All @@ -302,19 +303,19 @@ impl DatadogTracesEncoder {

let chunk = dd_proto::TraceChunk {
priority: trace
.get("priority")
.get(event_path!("priority"))
.and_then(|v| v.as_integer().map(|v| v as i32))
// This should not happen for Datadog originated traces, but in case this field is not populated
// we default to 1 (https://github.com/DataDog/datadog-agent/blob/eac2327/pkg/trace/sampler/sampler.go#L54-L55),
// which is what the Datadog trace-agent is doing for OTLP originated traces, as per
// https://github.com/DataDog/datadog-agent/blob/3ea2eb4/pkg/trace/api/otlp.go#L309.
.unwrap_or(1i32),
origin: trace
.get("origin")
.get(event_path!("origin"))
.map(|v| v.to_string_lossy().into_owned())
.unwrap_or_default(),
dropped_trace: trace
.get("dropped")
.get(event_path!("dropped"))
.and_then(|v| v.as_boolean())
.unwrap_or(false),
spans,
Expand All @@ -323,37 +324,37 @@ impl DatadogTracesEncoder {

dd_proto::TracerPayload {
container_id: trace
.get("container_id")
.get(event_path!("container_id"))
.map(|v| v.to_string_lossy().into_owned())
.unwrap_or_default(),
language_name: trace
.get("language_name")
.get(event_path!("language_name"))
.map(|v| v.to_string_lossy().into_owned())
.unwrap_or_default(),
language_version: trace
.get("language_version")
.get(event_path!("language_version"))
.map(|v| v.to_string_lossy().into_owned())
.unwrap_or_default(),
tracer_version: trace
.get("tracer_version")
.get(event_path!("tracer_version"))
.map(|v| v.to_string_lossy().into_owned())
.unwrap_or_default(),
runtime_id: trace
.get("runtime_id")
.get(event_path!("runtime_id"))
.map(|v| v.to_string_lossy().into_owned())
.unwrap_or_default(),
chunks: vec![chunk],
tags,
env: trace
.get("env")
.get(event_path!("env"))
.map(|v| v.to_string_lossy().into_owned())
.unwrap_or_default(),
hostname: trace
.get("hostname")
.get(event_path!("hostname"))
.map(|v| v.to_string_lossy().into_owned())
.unwrap_or_default(),
app_version: trace
.get("app_version")
.get(event_path!("app_version"))
.map(|v| v.to_string_lossy().into_owned())
.unwrap_or_default(),
}
Expand Down
11 changes: 7 additions & 4 deletions src/sinks/datadog/traces/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use futures_util::{
};
use tokio::sync::oneshot::{channel, Sender};
use tower::Service;
use vrl::event_path;
use vrl::path::PathPrefix;

use vector_core::{
Expand Down Expand Up @@ -54,19 +55,21 @@ impl Partitioner for EventPartitioner {
}
Event::Trace(t) => PartitionKey {
api_key: item.metadata().datadog_api_key(),
env: t.get("env").map(|s| s.to_string_lossy().into_owned()),
env: t
.get(event_path!("env"))
.map(|s| s.to_string_lossy().into_owned()),
hostname: log_schema().host_key().and_then(|key| {
t.get((PathPrefix::Event, key))
.map(|s| s.to_string_lossy().into_owned())
}),
agent_version: t
.get("agent_version")
.get(event_path!("agent_version"))
.map(|s| s.to_string_lossy().into_owned()),
target_tps: t
.get("target_tps")
.get(event_path!("target_tps"))
.and_then(|tps| tps.as_integer().map(Into::into)),
error_tps: t
.get("error_tps")
.get(event_path!("error_tps"))
.and_then(|tps| tps.as_integer().map(Into::into)),
},
}
Expand Down
4 changes: 3 additions & 1 deletion src/sinks/elasticsearch/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,9 @@ impl DataStreamConfig {
let (dtype, dataset, namespace) = if !self.auto_routing {
(self.dtype(log)?, self.dataset(log)?, self.namespace(log)?)
} else {
let data_stream = log.get("data_stream").and_then(|ds| ds.as_object());
let data_stream = log
.get(event_path!("data_stream"))
.and_then(|ds| ds.as_object());
let dtype = data_stream
.and_then(|ds| ds.get("type"))
.map(|value| value.to_string_lossy().into_owned())
Expand Down
Loading

0 comments on commit e476e12

Please sign in to comment.