chore(observability): count byte_size after transforming event #17941

Merged
9 commits merged on Jul 21, 2023
16 changes: 12 additions & 4 deletions lib/vector-core/src/event/log_event.rs
@@ -292,11 +292,19 @@ impl LogEvent {
}
}

/// Retrieves the value of a field based on its meaning.
/// This will first check whether the value has previously been dropped. Note that if
/// the field has been dropped and then somehow re-added, we still fetch the dropped
/// value here.
pub fn get_by_meaning(&self, meaning: impl AsRef<str>) -> Option<&Value> {
self.metadata()
.schema_definition()
.meaning_path(meaning.as_ref())
.and_then(|path| self.get(path))
if let Some(dropped) = self.metadata().dropped_field(&meaning) {
Some(dropped)
} else {
self.metadata()
.schema_definition()
.meaning_path(meaning.as_ref())
.and_then(|path| self.get(path))
Comment on lines +300 to +306
Member:

Is it possible for these two paths to get out of sync? E.g. if you drop a field with a given meaning and then subsequently write a new field and assign it that same meaning? Or is that not a problem because we're only using this in the {only,except}_fields path?

Contributor Author:

It's a valid point. It's not a problem here, because dropping and retrieving the value both occur in the sink, so there is no opportunity for a new value to be added. However, there is a risk that a future change could stumble over this, so I have added a comment to warn about it.

Member:

If we do end up going with some kind of change like #18028 then it would be nice to undo this just to get rid of the oddity in the API and potential for mismatch. I'm not sure if that's just something like a TODO on the field or an issue, but maybe worth noting so that this doesn't stick around in the event that it becomes unneeded.
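
For concreteness, the mismatch under discussion can be sketched as follows (a hypothetical sequence; `add_dropped_field` and `get_by_meaning` are the APIs added in this PR, the scenario itself is contrived):

```rust
use vector_core::event::LogEvent;
use vrl::value::Value;

// A field carrying the "service" meaning is pruned and stashed, then a new
// value is written to the same path. Because `get_by_meaning` consults the
// dropped-fields store first, it keeps returning the stale stashed value.
fn stale_read(mut log: LogEvent) {
    // Simulate the sink pruning the field and stashing its value.
    log.metadata_mut()
        .add_dropped_field("service".to_string(), Value::from("carrot"));

    // A later (hypothetical) write re-adds the field with a new value...
    log.insert("service", "potato");

    // ...but the meaning still resolves to the stashed value.
    assert_eq!(log.get_by_meaning("service"), Some(&Value::from("carrot")));
}
```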

}
}

// TODO(Jean): Once the event API uses `Lookup`, the allocation here can be removed.
22 changes: 22 additions & 0 deletions lib/vector-core/src/event/metadata.rs
@@ -46,6 +46,14 @@ pub struct EventMetadata {
/// TODO(Jean): must not skip serialization to track schemas across restarts.
#[serde(default = "default_schema_definition", skip)]
schema_definition: Arc<schema::Definition>,

/// A store of values that may be dropped during encoding but are needed later on,
/// indexed by meaning.
/// Currently this is only used for `service`: if the service field is dropped by
/// `only_fields`, we need to ensure it is still available later for emitting metrics
/// tagged by the service. This map could almost be keyed by `&'static str`, but it
/// must be deserializable, so we use `String`.
dropped_fields: BTreeMap<String, Value>,
}

fn default_metadata_value() -> Value {
@@ -123,6 +131,19 @@ impl EventMetadata {
pub fn set_splunk_hec_token(&mut self, secret: Arc<str>) {
self.secrets.insert(SPLUNK_HEC_TOKEN, secret);
}

/// Adds the value to the dropped fields list.
/// There is currently no way to remove a field from this list, so if a field is
/// dropped and later re-added with a new value, the original dropped value will
/// still be retrieved.
pub fn add_dropped_field(&mut self, meaning: String, value: Value) {
self.dropped_fields.insert(meaning, value);
}

/// Fetches the dropped field by meaning.
pub fn dropped_field(&self, meaning: impl AsRef<str>) -> Option<&Value> {
self.dropped_fields.get(meaning.as_ref())
}
}

impl Default for EventMetadata {
@@ -134,6 +155,7 @@ impl Default for EventMetadata {
schema_definition: default_schema_definition(),
source_id: None,
upstream_id: None,
dropped_fields: BTreeMap::new(),
}
}
}
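Taken together, `add_dropped_field` and `dropped_field` form a stash-and-recover pair. A minimal round trip (a sketch; real call sites operate on the metadata of an in-flight event rather than a default-constructed value):

```rust
use vector_core::event::EventMetadata;
use vrl::value::Value;

// Stash a value under its meaning when the field is pruned, then recover it
// later, e.g. when emitting service-tagged telemetry.
fn round_trip() {
    let mut metadata = EventMetadata::default();
    metadata.add_dropped_field("service".to_string(), Value::from("api"));

    // `dropped_field` takes `impl AsRef<str>`, so a plain &str works here.
    assert_eq!(metadata.dropped_field("service"), Some(&Value::from("api")));
}
```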
17 changes: 17 additions & 0 deletions lib/vector-core/src/schema/meaning.rs
@@ -0,0 +1,17 @@
//! Constants for commonly used semantic meanings.

/// The service typically represents the application that generated the event.
pub const SERVICE: &str = "service";

/// The main text message of the event.
pub const MESSAGE: &str = "message";

/// The main timestamp of the event.
pub const TIMESTAMP: &str = "timestamp";

/// The hostname of the machine where the event was generated.
pub const HOST: &str = "host";

/// The source of the event, typically the name of the integration or file that emitted it.
pub const SOURCE: &str = "source";

/// The severity (or log level) of the event.
pub const SEVERITY: &str = "severity";

/// The trace ID associated with the event.
pub const TRACE_ID: &str = "trace_id";
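
These constants are meant to replace bare string literals at call sites, as the Datadog logs sink change further down shows. A compact sketch of the pattern (the `Requirement` builder methods appear in that diff; imports assumed):

```rust
use vector_core::schema::{meaning, Requirement};
use vrl::value::Kind;

// Using the shared constants keeps meaning names from drifting between the
// sinks that require them and the schema definitions that assign them.
fn input_requirement() -> Requirement {
    Requirement::empty()
        .required_meaning(meaning::MESSAGE, Kind::bytes())
        .optional_meaning(meaning::SERVICE, Kind::bytes())
}
```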
1 change: 1 addition & 0 deletions lib/vector-core/src/schema/mod.rs
@@ -1,4 +1,5 @@
mod definition;
pub mod meaning;
mod requirement;

pub use definition::Definition;
129 changes: 123 additions & 6 deletions src/codecs/encoding/transformer.rs
@@ -12,6 +12,7 @@ use lookup::{
use serde::{Deserialize, Deserializer};
use vector_config::configurable_component;
use vector_core::event::{LogEvent, MaybeAsLogMut};
use vector_core::schema::meaning;
use vrl::value::Value;

use crate::{event::Event, serde::skip_serializing_if_default};
@@ -128,20 +129,52 @@ impl Transformer {

fn apply_only_fields(&self, log: &mut LogEvent) {
if let Some(only_fields) = self.only_fields.as_ref() {
let old_value = std::mem::replace(log.value_mut(), Value::Object(BTreeMap::new()));
let mut old_value = std::mem::replace(log.value_mut(), Value::Object(BTreeMap::new()));

for field in only_fields {
if let Some(value) = old_value.get(field) {
log.insert((PathPrefix::Event, field), value.clone());
if let Some(value) = old_value.remove(field, true) {
log.insert((PathPrefix::Event, field), value);
}
}

// We may need the service field to tag emitted metrics after the log message has been
// pruned. If there is a service meaning, we move its value to `dropped_fields` in the
// metadata. If the field survives pruning, it has already been removed from `old_value`
// above, so it will not be stashed here.
let service_path = log
.metadata()
.schema_definition()
.meaning_path(meaning::SERVICE);
if let Some(service_path) = service_path {
let mut new_log = LogEvent::from(old_value);
if let Some(service) = new_log.remove(service_path) {
log.metadata_mut()
.add_dropped_field(meaning::SERVICE.to_string(), service);
}
}
}
}

fn apply_except_fields(&self, log: &mut LogEvent) {
use lookup::path::TargetPath;

if let Some(except_fields) = self.except_fields.as_ref() {
let service_path = log
.metadata()
.schema_definition()
.meaning_path(meaning::SERVICE)
.map(|path| path.value_path().to_string());

for field in except_fields {
log.remove(field.as_str());
let value = log.remove(field.as_str());

// If we are removing the service field, we need to store it in the `dropped_fields`
// list, as we may need to refer to it later when emitting metrics.
if let Some(v) = value {
if matches!(service_path.as_ref(), Some(path) if path == field) {
log.metadata_mut()
.add_dropped_field(meaning::SERVICE.to_string(), v);
}
}
}
}
}
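
The switch above from `get` plus `clone` to `remove` on `old_value` is what makes the service stash straightforward: once the copy loop finishes, whatever remains in `old_value` is exactly the set of pruned fields. A simplified sketch of the idea using plain maps (not the actual event types):

```rust
use std::collections::BTreeMap;

// Move the kept fields out of the old map; by construction, anything left
// behind afterwards was dropped and is safe to stash.
fn prune(
    mut old: BTreeMap<String, String>,
    keep: &[&str],
) -> (BTreeMap<String, String>, BTreeMap<String, String>) {
    let mut kept = BTreeMap::new();
    for field in keep {
        if let Some(value) = old.remove(*field) {
            kept.insert((*field).to_string(), value);
        }
    }
    (kept, old) // `old` now holds only the dropped fields
}
```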
@@ -213,10 +246,15 @@ pub enum TimestampFormat {
#[cfg(test)]
mod tests {
use indoc::indoc;
use vector_core::config::log_schema;
use lookup::path::parse_target_path;
use vector_common::btreemap;
use vector_core::config::{log_schema, LogNamespace};
use vrl::value::Kind;

use crate::config::schema;

use super::*;
use std::collections::BTreeMap;
use std::{collections::BTreeMap, sync::Arc};

#[test]
fn serialize() {
@@ -374,4 +412,83 @@ mod tests {
"#});
assert!(config.is_err())
}

#[test]
fn only_fields_with_service() {
let transformer: Transformer = toml::from_str(r#"only_fields = ["message"]"#).unwrap();
let mut log = LogEvent::default();
{
log.insert("message", 1);
log.insert("thing.service", "carrot");
}

let schema = schema::Definition::new_with_default_metadata(
Kind::object(btreemap! {
"thing" => Kind::object(btreemap! {
"service" => Kind::bytes(),
})
}),
[LogNamespace::Vector],
);

let schema = schema.with_meaning(parse_target_path("thing.service").unwrap(), "service");

let mut event = Event::from(log);

event
.metadata_mut()
.set_schema_definition(&Arc::new(schema));

transformer.transform(&mut event);
assert!(event.as_mut_log().contains("message"));

// Event no longer contains the service field.
assert!(!event.as_mut_log().contains("thing.service"));

// But we can still get the service by meaning.
assert_eq!(
&Value::from("carrot"),
event.as_log().get_by_meaning("service").unwrap()
);
}

#[test]
fn except_fields_with_service() {
let transformer: Transformer =
toml::from_str(r#"except_fields = ["thing.service"]"#).unwrap();
let mut log = LogEvent::default();
{
log.insert("message", 1);
log.insert("thing.service", "carrot");
}

let schema = schema::Definition::new_with_default_metadata(
Kind::object(btreemap! {
"thing" => Kind::object(btreemap! {
"service" => Kind::bytes(),
})
}),
[LogNamespace::Vector],
);

let schema = schema.with_meaning(parse_target_path("thing.service").unwrap(), "service");

let mut event = Event::from(log);

event
.metadata_mut()
.set_schema_definition(&Arc::new(schema));

transformer.transform(&mut event);
assert!(event.as_mut_log().contains("message"));

// Event no longer contains the service field.
assert!(!event.as_mut_log().contains("thing.service"));

// But we can still get the service by meaning.
assert_eq!(
&Value::from("carrot"),
event.as_log().get_by_meaning("service").unwrap()
);
}
}
13 changes: 11 additions & 2 deletions src/sinks/amqp/encoder.rs
@@ -3,6 +3,7 @@ use crate::sinks::prelude::*;
use bytes::BytesMut;
use std::io;
use tokio_util::codec::Encoder as _;
use vector_core::config::telemetry;

#[derive(Clone, Debug)]
pub(super) struct AmqpEncoder {
@@ -11,9 +12,17 @@ pub(super) struct AmqpEncoder {
}

impl encoding::Encoder<Event> for AmqpEncoder {
fn encode_input(&self, mut input: Event, writer: &mut dyn io::Write) -> io::Result<usize> {
fn encode_input(
&self,
mut input: Event,
writer: &mut dyn io::Write,
) -> io::Result<(usize, GroupedCountByteSize)> {
let mut body = BytesMut::new();
self.transformer.transform(&mut input);

let mut byte_size = telemetry().create_request_count_byte_size();
byte_size.add_event(&input, input.estimated_json_encoded_size_of());

let mut encoder = self.encoder.clone();
encoder
.encode(input, &mut body)
Comment on lines +24 to 28
Member:

This is a bit outside the scope of the PR, but I'm curious why we need to estimate the JSON size right before we actually encode the event. Don't we have the actual size here that we could use directly?

Member:

This was part of #17576. The intention was to use a measurement that can be taken anywhere in the processing pipeline (sources, transforms, sinks) so that the values are comparable.
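
Put differently, the estimated JSON size is a codec-independent measurement, while `body.len()` is the wire size for one particular codec. A sketch of the distinction (`EstimatedJsonEncodedSizeOf` is the trait imported elsewhere in this PR; the rest is illustrative):

```rust
use vector_core::event::LogEvent;
use vector_core::EstimatedJsonEncodedSizeOf;

fn compare_sizes() {
    let mut log = LogEvent::default();
    log.insert("message", "hello");

    // Logical size: the same wherever in the pipeline it is measured, so
    // per-component byte counts stay comparable.
    let logical = log.estimated_json_encoded_size_of();

    // The wire size would be the encoded body length (`body.len()` above),
    // which varies with the configured codec, framing, and compression.
    println!("estimated JSON size: {:?}", logical);
}
```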

@@ -22,6 +31,6 @@ impl encoding::Encoder<Event> for AmqpEncoder {
let body = body.freeze();
write_all(writer, 1, body.as_ref())?;

Ok(body.len())
Ok((body.len(), byte_size))
}
}
24 changes: 19 additions & 5 deletions src/sinks/azure_blob/test.rs
@@ -4,7 +4,8 @@ use codecs::{
encoding::{Framer, FramingConfig},
NewlineDelimitedEncoder, TextSerializerConfig,
};
use vector_core::partition::Partitioner;
use vector_common::request_metadata::GroupedCountByteSize;
use vector_core::{partition::Partitioner, EstimatedJsonEncodedSizeOf};

use super::config::AzureBlobSinkConfig;
use super::request_builder::AzureBlobRequestOptions;
@@ -68,10 +69,13 @@ fn azure_blob_build_request_without_compression() {
compression,
};

let mut byte_size = GroupedCountByteSize::new_untagged();
byte_size.add_event(&log, log.estimated_json_encoded_size_of());

let (metadata, request_metadata_builder, _events) =
request_options.split_input((key, vec![log]));

let payload = EncodeResult::uncompressed(Bytes::new());
let payload = EncodeResult::uncompressed(Bytes::new(), byte_size);
let request_metadata = request_metadata_builder.build(&payload);
let request = request_options.build_request(metadata, request_metadata, payload);

@@ -112,10 +116,14 @@ fn azure_blob_build_request_with_compression() {
),
compression,
};

let mut byte_size = GroupedCountByteSize::new_untagged();
byte_size.add_event(&log, log.estimated_json_encoded_size_of());

let (metadata, request_metadata_builder, _events) =
request_options.split_input((key, vec![log]));

let payload = EncodeResult::uncompressed(Bytes::new());
let payload = EncodeResult::uncompressed(Bytes::new(), byte_size);
let request_metadata = request_metadata_builder.build(&payload);
let request = request_options.build_request(metadata, request_metadata, payload);

@@ -157,10 +165,13 @@ fn azure_blob_build_request_with_time_format() {
compression,
};

let mut byte_size = GroupedCountByteSize::new_untagged();
byte_size.add_event(&log, log.estimated_json_encoded_size_of());

let (metadata, request_metadata_builder, _events) =
request_options.split_input((key, vec![log]));

let payload = EncodeResult::uncompressed(Bytes::new());
let payload = EncodeResult::uncompressed(Bytes::new(), byte_size);
let request_metadata = request_metadata_builder.build(&payload);
let request = request_options.build_request(metadata, request_metadata, payload);

@@ -205,10 +216,13 @@ fn azure_blob_build_request_with_uuid() {
compression,
};

let mut byte_size = GroupedCountByteSize::new_untagged();
byte_size.add_event(&log, log.estimated_json_encoded_size_of());

let (metadata, request_metadata_builder, _events) =
request_options.split_input((key, vec![log]));

let payload = EncodeResult::uncompressed(Bytes::new());
let payload = EncodeResult::uncompressed(Bytes::new(), byte_size);
let request_metadata = request_metadata_builder.build(&payload);
let request = request_options.build_request(metadata, request_metadata, payload);

16 changes: 8 additions & 8 deletions src/sinks/datadog/logs/config.rs
@@ -3,7 +3,7 @@ use std::{convert::TryFrom, sync::Arc};
use indoc::indoc;
use tower::ServiceBuilder;
use vector_config::configurable_component;
use vector_core::config::proxy::ProxyConfig;
use vector_core::{config::proxy::ProxyConfig, schema::meaning};
use vrl::value::Kind;

use super::{service::LogApiRetry, sink::LogSinkBuilder};
@@ -176,13 +176,13 @@ impl SinkConfig for DatadogLogsConfig {

fn input(&self) -> Input {
let requirement = schema::Requirement::empty()
.required_meaning("message", Kind::bytes())
.required_meaning("timestamp", Kind::timestamp())
.optional_meaning("host", Kind::bytes())
.optional_meaning("source", Kind::bytes())
.optional_meaning("severity", Kind::bytes())
.optional_meaning("service", Kind::bytes())
.optional_meaning("trace_id", Kind::bytes());
.required_meaning(meaning::MESSAGE, Kind::bytes())
.required_meaning(meaning::TIMESTAMP, Kind::timestamp())
.optional_meaning(meaning::HOST, Kind::bytes())
.optional_meaning(meaning::SOURCE, Kind::bytes())
.optional_meaning(meaning::SEVERITY, Kind::bytes())
.optional_meaning(meaning::SERVICE, Kind::bytes())
.optional_meaning(meaning::TRACE_ID, Kind::bytes());

Input::log().with_schema_requirement(requirement)
}