
Commit

Migrate LogSchema::host_key to new lookup code.
pront committed Jul 14, 2023
1 parent af5f3d8 commit 1c7e1c3
Showing 33 changed files with 226 additions and 152 deletions.
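At the API level, the migration swaps the string-typed schema field for the lookup path types. Below is a minimal sketch of the before/after accessor shapes, summarized from the log_schema.rs hunks further down (written here as standalone traits purely for illustration; in the repository these are inherent methods on the LogSchema struct):

```rust
use lookup::OwnedValuePath;

// Before this commit: the host key was stored and exposed as a plain string.
trait HostKeyBefore {
    fn host_key(&self) -> &str;
    fn set_host_key(&mut self, v: String);
}

// After this commit: the host key is an OptionalValuePath, so callers receive
// a parsed path that may be absent and must handle the None case explicitly.
trait HostKeyAfter {
    fn host_key(&self) -> Option<&OwnedValuePath>;
    fn set_host_key(&mut self, path: Option<OwnedValuePath>);
}
```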
1 change: 0 additions & 1 deletion Cargo.toml
@@ -116,7 +116,6 @@ members = [

[workspace.dependencies]
vrl = { version = "0.5.0", features = ["cli", "test", "test_framework", "arbitrary"] }
#vrl = { path = "../vrl", features = ["cli", "test", "test_framework", "arbitrary"] }

[dependencies]
vrl.workspace = true
17 changes: 8 additions & 9 deletions lib/vector-core/src/config/log_schema.rs
@@ -1,7 +1,6 @@
use lookup::lookup_v2::OptionalValuePath;
use lookup::{OwnedTargetPath, OwnedValuePath};
use once_cell::sync::{Lazy, OnceCell};
use vrl::path::parse_target_path;
use vector_config::configurable_component;
use vrl::path::parse_target_path;

@@ -56,7 +55,7 @@ pub struct LogSchema {
/// This field will generally represent a real host, or container, that generated the message,
/// but is somewhat source-dependent.
#[serde(default = "LogSchema::default_host_key")]
host_key: String,
host_key: OptionalValuePath,

/// The name of the event field to set the source identifier in.
///
@@ -93,8 +92,8 @@ impl LogSchema {
OptionalValuePath::new("timestamp")
}

fn default_host_key() -> String {
String::from("host")
fn default_host_key() -> OptionalValuePath {
OptionalValuePath::new("host")
}

fn default_source_type_key() -> OptionalValuePath {
@@ -122,8 +121,8 @@ impl LogSchema {
self.timestamp_key.path.as_ref()
}

pub fn host_key(&self) -> &str {
&self.host_key
pub fn host_key(&self) -> Option<&OwnedValuePath> {
self.host_key.path.as_ref()
}

pub fn source_type_key(&self) -> Option<&OwnedValuePath> {
@@ -142,8 +141,8 @@ impl LogSchema {
self.timestamp_key = OptionalValuePath { path: v };
}

pub fn set_host_key(&mut self, v: String) {
self.host_key = v;
pub fn set_host_key(&mut self, path: Option<OwnedValuePath>) {
self.host_key = OptionalValuePath { path };
}

pub fn set_source_type_key(&mut self, path: Option<OwnedValuePath>) {
@@ -170,7 +169,7 @@ impl LogSchema {
{
errors.push("conflicting values for 'log_schema.host_key' found".to_owned());
} else {
self.set_host_key(other.host_key().to_string());
self.set_host_key(other.host_key().cloned());
}
if self.message_key() != LOG_SCHEMA_DEFAULT.message_key()
&& self.message_key() != other.message_key()
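Most of the remaining files are mechanical fallout from this signature change. A sketch of the two call-site patterns that recur below, assuming the crate paths shown elsewhere in this diff (the helper names here are illustrative, not repository code):

```rust
use vector_core::config::log_schema;
use vector_core::event::{LogEvent, Value};
use vrl::path::PathPrefix;

// Pattern 1: sinks that still key fields or tags by a plain string render the
// path back into a string. The expect() assumes a host key is configured,
// which holds for the default schema ("host") but is not guaranteed in general.
fn host_key_string() -> String {
    log_schema().host_key().expect("host_key is set").to_string()
}

// Pattern 2: path-based lookups thread the Option through and_then, so an
// unset host key yields no value instead of an empty-string field name.
fn host_value(event: &LogEvent) -> Option<&Value> {
    log_schema()
        .host_key()
        .and_then(|key| event.get((PathPrefix::Event, key)))
}
```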
6 changes: 4 additions & 2 deletions lib/vector-core/src/event/log_event.rs
@@ -458,7 +458,7 @@ impl LogEvent {
pub fn host_path(&self) -> Option<String> {
match self.namespace() {
LogNamespace::Vector => self.find_key_by_meaning("host"),
LogNamespace::Legacy => Some(log_schema().host_key().to_owned()),
LogNamespace::Legacy => log_schema().host_key().map(ToString::to_string),
}
}

@@ -505,7 +505,9 @@ impl LogEvent {
pub fn get_host(&self) -> Option<&Value> {
match self.namespace() {
LogNamespace::Vector => self.get_by_meaning("host"),
LogNamespace::Legacy => self.get((PathPrefix::Event, log_schema().host_key())),
LogNamespace::Legacy => log_schema()
.host_key()
.and_then(|key| self.get((PathPrefix::Event, key))),
}
}

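A small usage sketch of the accessors above (illustrative only): with either log namespace the caller now sees a uniform Option, and a legacy schema with no configured host key behaves like a Vector-namespace event without a "host" meaning.

```rust
use vector_core::event::LogEvent;

fn describe_host(log: &LogEvent) -> String {
    match log.get_host() {
        Some(host) => format!("host = {}", host),
        None => "no host field on this event".to_string(),
    }
}
```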
9 changes: 8 additions & 1 deletion lib/vector-lookup/src/lookup_v2/optional_path.rs
@@ -1,4 +1,3 @@
use vrl::owned_value_path;
use vector_config::configurable_component;
use vrl::owned_value_path;

@@ -92,3 +91,11 @@ impl From<OwnedValuePath> for OptionalValuePath {
Self { path: Some(path) }
}
}

impl From<Option<OwnedValuePath>> for OptionalValuePath {
fn from(value: Option<OwnedValuePath>) -> Self {
value.map_or(OptionalValuePath::none(), |path| {
OptionalValuePath::from(path)
})
}
}
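This conversion exists so that an Option<OwnedValuePath> (for example the result of host_key().cloned()) can be turned into the config type without matching on Some/None by hand. A minimal sketch, assuming the crate's re-exported macro:

```rust
use lookup::lookup_v2::OptionalValuePath;
use lookup::{owned_value_path, OwnedValuePath};

fn demo() {
    // A present path converts to path: Some(...), an absent one to path: None.
    let present: OptionalValuePath = Some(owned_value_path!("host")).into();
    let absent: OptionalValuePath = Option::<OwnedValuePath>::None.into();
    assert!(present.path.is_some());
    assert!(absent.path.is_none());
}
```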
10 changes: 8 additions & 2 deletions src/config/mod.rs
@@ -843,7 +843,10 @@ mod tests {
)
.unwrap();

assert_eq!("host", config.global.log_schema.host_key().to_string());
assert_eq!(
"host",
config.global.log_schema.host_key().unwrap().to_string()
);
assert_eq!(
"message",
config.global.log_schema.message_key().to_string()
@@ -879,7 +882,10 @@ mod tests {
)
.unwrap();

assert_eq!("this", config.global.log_schema.host_key().to_string());
assert_eq!(
"this",
config.global.log_schema.host_key().unwrap().to_string()
);
assert_eq!("that", config.global.log_schema.message_key().to_string());
assert_eq!(
"then",
9 changes: 7 additions & 2 deletions src/sinks/datadog/metrics/encoder.rs
@@ -385,7 +385,9 @@ fn sketch_to_proto_message(
let name = get_namespaced_name(metric, default_namespace);
let ts = encode_timestamp(metric.timestamp());
let mut tags = metric.tags().cloned().unwrap_or_default();
let host = tags.remove(log_schema.host_key()).unwrap_or_default();
let host = tags
.remove(log_schema.host_key().unwrap().to_string().as_str())
.unwrap_or_default();
let tags = encode_tags(&tags);

let cnt = ddsketch.count() as i64;
@@ -497,7 +499,10 @@ fn generate_series_metrics(
let name = get_namespaced_name(metric, default_namespace);

let mut tags = metric.tags().cloned().unwrap_or_default();
let host = tags.remove(log_schema.host_key());
let host = log_schema
.host_key()
.map(|key| tags.remove(key.to_string().as_str()).unwrap_or_default());

let source_type_name = tags.remove("source_type_name");
let device = tags.remove("device");
let ts = encode_timestamp(metric.timestamp());
13 changes: 9 additions & 4 deletions src/sinks/datadog/traces/sink.rs
@@ -7,6 +7,8 @@ use futures_util::{
};
use tokio::sync::oneshot::{channel, Sender};
use tower::Service;
use vrl::path::PathPrefix;

use vector_core::{
config::log_schema,
event::Event,
@@ -15,11 +17,13 @@ use vector_core::{
stream::{BatcherSettings, DriverResponse},
};

use super::service::TraceApiRequest;
use crate::{
internal_events::DatadogTracesEncodingError,
sinks::{datadog::traces::request_builder::DatadogTracesRequestBuilder, util::SinkBuilderExt},
};

use super::service::TraceApiRequest;

#[derive(Default)]
struct EventPartitioner;

@@ -51,9 +55,10 @@ impl Partitioner for EventPartitioner {
Event::Trace(t) => PartitionKey {
api_key: item.metadata().datadog_api_key(),
env: t.get("env").map(|s| s.to_string_lossy().into_owned()),
hostname: t
.get(log_schema().host_key())
.map(|s| s.to_string_lossy().into_owned()),
hostname: log_schema().host_key().and_then(|key| {
t.get((PathPrefix::Event, key))
.map(|s| s.to_string_lossy().into_owned())
}),
agent_version: t
.get("agent_version")
.map(|s| s.to_string_lossy().into_owned()),
2 changes: 1 addition & 1 deletion src/sinks/humio/logs.rs
@@ -387,7 +387,7 @@ mod integration_tests {
source: None,
encoding: JsonSerializerConfig::default().into(),
event_type: None,
host_key: log_schema().host_key().to_string(),
host_key: log_schema().host_key().unwrap().to_string(),
indexed_fields: vec![],
index: None,
compression: Compression::None,
2 changes: 1 addition & 1 deletion src/sinks/humio/mod.rs
@@ -2,5 +2,5 @@ pub mod logs;
pub mod metrics;

fn host_key() -> String {
crate::config::log_schema().host_key().to_string()
crate::config::log_schema().host_key().unwrap().to_string()
}
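This helper (like the splunk_hec one below) bridges back to a String with unwrap(), which assumes a host key is always configured; that holds for the default schema but would panic if log_schema.host_key were explicitly unset. A more defensive variant, illustrative only and not part of this commit, would fall back to the historical default:

```rust
fn host_key_or_default() -> String {
    crate::config::log_schema()
        .host_key()
        .map(ToString::to_string)
        .unwrap_or_else(|| "host".to_owned())
}
```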
24 changes: 14 additions & 10 deletions src/sinks/influxdb/logs.rs
@@ -4,12 +4,13 @@ use bytes::{Bytes, BytesMut};
use futures::SinkExt;
use http::{Request, Uri};
use indoc::indoc;
use vrl::value::Kind;

use lookup::lookup_v2::{parse_value_path, OptionalValuePath};
use lookup::{OwnedValuePath, PathPrefix};
use vector_config::configurable_component;
use vector_core::config::log_schema;
use vector_core::schema;
use vrl::value::Kind;

use crate::{
codecs::Transformer,
@@ -189,10 +190,8 @@ impl SinkConfig for InfluxDbLogsConfig {
.host_key
.clone()
.and_then(|k| k.path)
.unwrap_or_else(|| {
parse_value_path(log_schema().host_key())
.expect("global log_schema.host_key to be valid path")
});
.or(log_schema().host_key().cloned())
.unwrap();

let message_key = self
.message_key
@@ -409,10 +408,10 @@ mod tests {
use futures::{channel::mpsc, stream, StreamExt};
use http::{request::Parts, StatusCode};
use indoc::indoc;

use lookup::owned_value_path;
use vector_core::event::{BatchNotifier, BatchStatus, Event, LogEvent};

use super::*;
use crate::{
sinks::{
influxdb::test_util::{assert_fields, split_line_protocol, ts},
@@ -427,6 +426,8 @@
},
};

use super::*;

type Receiver = mpsc::Receiver<(Parts, bytes::Bytes)>;

#[test]
@@ -880,16 +881,17 @@
#[cfg(feature = "influxdb-integration-tests")]
#[cfg(test)]
mod integration_tests {
use std::sync::Arc;

use chrono::Utc;
use codecs::BytesDeserializerConfig;
use futures::stream;
use vrl::value;

use codecs::BytesDeserializerConfig;
use lookup::{owned_value_path, path};
use std::sync::Arc;
use vector_core::config::{LegacyKey, LogNamespace};
use vector_core::event::{BatchNotifier, BatchStatus, Event, LogEvent};
use vrl::value;

use super::*;
use crate::{
config::SinkContext,
sinks::influxdb::{
@@ -900,6 +902,8 @@ mod integration_tests {
test_util::components::{run_and_assert_sink_compliance, HTTP_SINK_TAGS},
};

use super::*;

#[tokio::test]
async fn influxdb2_logs_put_data() {
let endpoint = address_v2();
2 changes: 1 addition & 1 deletion src/sinks/splunk_hec/common/util.rs
@@ -133,7 +133,7 @@ pub fn build_uri(
}

pub fn host_key() -> String {
crate::config::log_schema().host_key().to_string()
crate::config::log_schema().host_key().unwrap().to_string()
}

pub fn config_timestamp_key() -> OptionalValuePath {
16 changes: 10 additions & 6 deletions src/sources/datadog_agent/metrics.rs
@@ -5,9 +5,10 @@ use chrono::{TimeZone, Utc};
use http::StatusCode;
use prost::Message;
use serde::{Deserialize, Serialize};
use warp::{filters::BoxedFilter, path, path::FullPath, reply::Response, Filter};

use vector_common::internal_event::{CountByteSize, InternalEventHandle as _, Registered};
use vector_core::{metrics::AgentDDSketch, EstimatedJsonEncodedSizeOf};
use warp::{filters::BoxedFilter, path, path::FullPath, reply::Response, Filter};

use crate::{
common::datadog::{DatadogMetricType, DatadogSeriesMetric},
@@ -243,7 +244,7 @@ pub(crate) fn decode_ddseries_v2(
// As per https://github.com/DataDog/datadog-agent/blob/a62ac9fb13e1e5060b89e731b8355b2b20a07c5b/pkg/serializer/internal/metrics/iterable_series.go#L180-L189
// the hostname can be found in MetricSeries::resources and that is the only value stored there.
if r.r#type.eq("host") {
tags.replace(log_schema().host_key().to_string(), r.name);
tags.replace(log_schema().host_key().unwrap().to_string(), r.name);
} else {
// But to avoid losing information if this situation changes, any other resource type/name will be saved in the tags map
tags.replace(format!("resource.{}", r.r#type), r.name);
@@ -385,9 +386,12 @@ fn into_vector_metric(
) -> Vec<Event> {
let mut tags = into_metric_tags(dd_metric.tags.unwrap_or_default());

dd_metric
.host
.and_then(|host| tags.replace(log_schema().host_key().to_owned(), host));
if let Some(key) = log_schema().host_key() {
dd_metric
.host
.and_then(|host| tags.replace(key.to_string(), host));
}

dd_metric
.source_type_name
.and_then(|source| tags.replace("source_type_name".into(), source));
@@ -498,7 +502,7 @@ pub(crate) fn decode_ddsketch(
// sketch_series.distributions is also always empty from payload coming from dd agents
let mut tags = into_metric_tags(sketch_series.tags);
tags.replace(
log_schema().host_key().to_string(),
log_schema().host_key().unwrap().to_string(),
sketch_series.host.clone(),
);
sketch_series.dogsketches.into_iter().map(move |sketch| {
6 changes: 4 additions & 2 deletions src/sources/datadog_agent/mod.rs
@@ -283,7 +283,7 @@ pub struct ApiKeyQueryParams {
#[derive(Clone)]
pub(crate) struct DatadogAgentSource {
pub(crate) api_key_extractor: ApiKeyExtractor,
pub(crate) log_schema_host_key: &'static str,
pub(crate) log_schema_host_key: String,
pub(crate) log_schema_source_type_key: String,
pub(crate) log_namespace: LogNamespace,
pub(crate) decoder: Decoder,
@@ -333,7 +333,9 @@ impl DatadogAgentSource {
matcher: Regex::new(r"^/v1/input/(?P<api_key>[[:alnum:]]{32})/??")
.expect("static regex always compiles"),
},
log_schema_host_key: log_schema().host_key(),
log_schema_host_key: log_schema()
.host_key()
.map_or("".to_string(), |key| key.to_string()),
log_schema_source_type_key: log_schema()
.source_type_key()
.map_or("".to_string(), |key| key.to_string()),
4 changes: 2 additions & 2 deletions src/sources/datadog_agent/traces.rs
@@ -146,7 +146,7 @@ fn handle_dd_trace_payload_v1(
Bytes::from("datadog_agent"),
);
trace_event.insert("payload_version", "v2".to_string());
trace_event.insert(source.log_schema_host_key, hostname.clone());
trace_event.insert(source.log_schema_host_key.as_str(), hostname.clone());
trace_event.insert("env", env.clone());
trace_event.insert("agent_version", agent_version.clone());
trace_event.insert("target_tps", target_tps);
@@ -259,7 +259,7 @@ fn handle_dd_trace_payload_v0(
Bytes::from("datadog_agent"),
);
trace_event.insert("payload_version", "v1".to_string());
trace_event.insert(source.log_schema_host_key, hostname.clone());
trace_event.insert(source.log_schema_host_key.as_str(), hostname.clone());
trace_event.insert("env", env.clone());
Event::Trace(trace_event)
})
