Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Migrate LogSchema::message_key to new lookup code #18024

Merged
merged 10 commits into from
Jul 21, 2023
2 changes: 1 addition & 1 deletion benches/codecs/character_delimited_bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ fn decoding(c: &mut Criterion) {
.map(|ml| CharacterDelimitedDecoder::new_with_max_length(b'a', ml))
.unwrap_or(CharacterDelimitedDecoder::new(b'a')),
);
let deserializer = Deserializer::Bytes(BytesDeserializer::new());
let deserializer = Deserializer::Bytes(BytesDeserializer);
let decoder = vector::codecs::Decoder::new(framer, deserializer);

(Box::new(decoder), param.input.clone())
Expand Down
2 changes: 1 addition & 1 deletion benches/codecs/newline_bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ fn decoding(c: &mut Criterion) {
.map(|ml| NewlineDelimitedDecoder::new_with_max_length(ml))
.unwrap_or(NewlineDelimitedDecoder::new()),
);
let deserializer = Deserializer::Bytes(BytesDeserializer::new());
let deserializer = Deserializer::Bytes(BytesDeserializer);
let decoder = vector::codecs::Decoder::new(framer, deserializer);

(Box::new(decoder), param.input.clone())
Expand Down
35 changes: 11 additions & 24 deletions lib/codecs/src/decoding/format/bytes.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use bytes::Bytes;
use lookup::lookup_v2::parse_value_path;
use lookup::OwnedTargetPath;
use serde::{Deserialize, Serialize};
use smallvec::{smallvec, SmallVec};
Expand All @@ -9,6 +8,7 @@ use vector_core::{
event::{Event, LogEvent},
schema,
};
use vrl::path::PathPrefix;
use vrl::value::Kind;

use super::Deserializer;
Expand All @@ -25,7 +25,7 @@ impl BytesDeserializerConfig {

/// Build the `BytesDeserializer` from this configuration.
pub fn build(&self) -> BytesDeserializer {
BytesDeserializer::new()
BytesDeserializer
}

/// Return the type of event build by this deserializer.
Expand All @@ -37,7 +37,7 @@ impl BytesDeserializerConfig {
pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
match log_namespace {
LogNamespace::Legacy => schema::Definition::empty_legacy_namespace().with_event_field(
&parse_value_path(log_schema().message_key()).expect("valid message key"),
log_schema().message_key().expect("valid message key"),
Kind::bytes(),
Some("message"),
),
Expand All @@ -54,32 +54,16 @@ impl BytesDeserializerConfig {
/// This deserializer can be considered as the no-op action for input where no
/// further decoding has been specified.
#[derive(Debug, Clone)]
pub struct BytesDeserializer {
// Only used with the "Legacy" namespace. The "Vector" namespace decodes the data at the root of the event.
log_schema_message_key: &'static str,
}

impl Default for BytesDeserializer {
fn default() -> Self {
Self::new()
}
}
pub struct BytesDeserializer;

impl BytesDeserializer {
/// Creates a new `BytesDeserializer`.
pub fn new() -> Self {
Self {
log_schema_message_key: log_schema().message_key(),
}
}

/// Deserializes the given bytes, which will always produce a single `LogEvent`.
pub fn parse_single(&self, bytes: Bytes, log_namespace: LogNamespace) -> LogEvent {
match log_namespace {
LogNamespace::Vector => log_namespace.new_log_from_data(bytes),
LogNamespace::Legacy => {
let mut log = LogEvent::default();
log.insert(self.log_schema_message_key, bytes);
log.maybe_insert(PathPrefix::Event, log_schema().message_key(), bytes);
log
}
}
Expand Down Expand Up @@ -107,15 +91,18 @@ mod tests {
#[test]
fn deserialize_bytes_legacy_namespace() {
let input = Bytes::from("foo");
let deserializer = BytesDeserializer::new();
let deserializer = BytesDeserializer;

let events = deserializer.parse(input, LogNamespace::Legacy).unwrap();
let mut events = events.into_iter();

{
let event = events.next().unwrap();
let log = event.as_log();
assert_eq!(log[log_schema().message_key()], "foo".into());
assert_eq!(
log[log_schema().message_key().unwrap().to_string()],
"foo".into()
);
}

assert_eq!(events.next(), None);
Expand All @@ -124,7 +111,7 @@ mod tests {
#[test]
fn deserialize_bytes_vector_namespace() {
let input = Bytes::from("foo");
let deserializer = BytesDeserializer::new();
let deserializer = BytesDeserializer;

let events = deserializer.parse(input, LogNamespace::Vector).unwrap();
assert_eq!(events.len(), 1);
Expand Down
4 changes: 2 additions & 2 deletions lib/codecs/src/decoding/format/gelf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ mod tests {
Some(&Value::Bytes(Bytes::from_static(b"example.org")))
);
assert_eq!(
log.get(log_schema().message_key()),
log.get((PathPrefix::Event, log_schema().message_key().unwrap())),
Some(&Value::Bytes(Bytes::from_static(
b"A short message that helps you identify what is going on"
)))
Expand Down Expand Up @@ -348,7 +348,7 @@ mod tests {
let events = deserialize_gelf_input(&input).unwrap();
assert_eq!(events.len(), 1);
let log = events[0].as_log();
assert!(log.contains(log_schema().message_key()));
assert!(log.contains((PathPrefix::Event, log_schema().message_key().unwrap())));
}

// filter out id
Expand Down
14 changes: 8 additions & 6 deletions lib/codecs/src/decoding/format/syslog.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use bytes::Bytes;
use chrono::{DateTime, Datelike, Utc};
use derivative::Derivative;
use lookup::lookup_v2::parse_value_path;
use lookup::{event_path, owned_value_path, OwnedTargetPath, OwnedValuePath, PathPrefix};
use smallvec::{smallvec, SmallVec};
use std::borrow::Cow;
Expand Down Expand Up @@ -71,7 +70,7 @@ impl SyslogDeserializerConfig {
// The `message` field is always defined. If parsing fails, the entire body becomes the
// message.
.with_event_field(
&parse_value_path(log_schema().message_key()).expect("valid message key"),
log_schema().message_key().expect("valid message key"),
Kind::bytes(),
Some("message"),
);
Expand Down Expand Up @@ -429,7 +428,7 @@ fn insert_fields_from_syslog(
) {
match log_namespace {
LogNamespace::Legacy => {
log.insert(event_path!(log_schema().message_key()), parsed.msg);
log.maybe_insert(PathPrefix::Event, log_schema().message_key(), parsed.msg);
}
LogNamespace::Vector => {
log.insert(event_path!("message"), parsed.msg);
Expand Down Expand Up @@ -500,7 +499,10 @@ mod tests {

let events = deserializer.parse(input, LogNamespace::Legacy).unwrap();
assert_eq!(events.len(), 1);
assert_eq!(events[0].as_log()[log_schema().message_key()], "MSG".into());
assert_eq!(
events[0].as_log()[log_schema().message_key().unwrap().to_string()],
"MSG".into()
);
assert!(
events[0].as_log()[log_schema().timestamp_key().unwrap().to_string()].is_timestamp()
);
Expand All @@ -522,8 +524,8 @@ mod tests {

fn init() {
let mut schema = LogSchema::default();
schema.set_message_key("legacy_message".to_string());
schema.set_message_key("legacy_timestamp".to_string());
schema.set_message_key(Some(owned_value_path!("legacy_message")));
schema.set_message_key(Some(owned_value_path!("legacy_timestamp")));
init_log_schema(schema, false);
}
}
12 changes: 12 additions & 0 deletions lib/codecs/src/encoding/format/common.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
use vector_core::config::log_schema;
use vector_core::schema;
use vrl::value::Kind;

/// Inspect the global log schema and create a schema requirement.
pub fn get_serializer_schema_requirement() -> schema::Requirement {
if let Some(message_key) = log_schema().message_key() {
schema::Requirement::empty().required_meaning(message_key.to_string(), Kind::any())
} else {
schema::Requirement::empty()
}
}
17 changes: 10 additions & 7 deletions lib/codecs/src/encoding/format/gelf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use vector_core::{
event::Value,
schema,
};
use vrl::path::PathPrefix;

/// On GELF encoding behavior:
/// Graylog has a relaxed parsing. They are much more lenient than the spec would
Expand Down Expand Up @@ -138,13 +139,15 @@ fn coerce_required_fields(mut log: LogEvent) -> vector_common::Result<LogEvent>
err_missing_field(HOST)?;
}

let message_key = log_schema().message_key();
if !log.contains(SHORT_MESSAGE) {
// rename the log_schema().message_key() to SHORT_MESSAGE
if log.contains(message_key) {
log.rename_key(message_key, SHORT_MESSAGE);
} else {
err_missing_field(SHORT_MESSAGE)?;
if let Some(message_key) = log_schema().message_key() {
// rename the log_schema().message_key() to SHORT_MESSAGE
let target_path = (PathPrefix::Event, message_key);
if log.contains(target_path) {
log.rename_key(target_path, SHORT_MESSAGE);
} else {
err_missing_field(SHORT_MESSAGE)?;
}
}
}
Ok(log)
Expand Down Expand Up @@ -329,7 +332,7 @@ mod tests {
let event_fields = btreemap! {
VERSION => "1.1",
HOST => "example.org",
log_schema().message_key() => "Some message",
log_schema().message_key().unwrap().to_string() => "Some message",
};

let jsn = do_serialize(true, event_fields).unwrap();
Expand Down
1 change: 1 addition & 0 deletions lib/codecs/src/encoding/format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#![deny(missing_docs)]

mod avro;
mod common;
mod csv;
mod gelf;
mod json;
Expand Down
20 changes: 4 additions & 16 deletions lib/codecs/src/encoding/format/raw_message.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
use crate::encoding::format::common::get_serializer_schema_requirement;
use bytes::{BufMut, BytesMut};
use serde::{Deserialize, Serialize};
use tokio_util::codec::Encoder;
use vector_core::{
config::{log_schema, DataType},
event::Event,
schema,
};
use vrl::value::Kind;
use vector_core::{config::DataType, event::Event, schema};

/// Config used to build a `RawMessageSerializer`.
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
Expand All @@ -30,7 +26,7 @@ impl RawMessageSerializerConfig {

/// The schema required by the serializer.
pub fn schema_requirement(&self) -> schema::Requirement {
schema::Requirement::empty().required_meaning(log_schema().message_key(), Kind::any())
get_serializer_schema_requirement()
}
}

Expand All @@ -49,18 +45,10 @@ impl Encoder<Event> for RawMessageSerializer {
type Error = vector_common::Error;

fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
let message_key = log_schema().message_key();

let log = event.as_log();

if let Some(bytes) = log
.get_by_meaning(message_key)
.or_else(|| log.get(message_key))
.map(|value| value.coerce_to_bytes())
{
if let Some(bytes) = log.get_message().map(|value| value.coerce_to_bytes()) {
buffer.put(bytes);
}

Ok(())
}
}
Expand Down
18 changes: 4 additions & 14 deletions lib/codecs/src/encoding/format/text.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
use crate::encoding::format::common::get_serializer_schema_requirement;
use bytes::{BufMut, BytesMut};
use tokio_util::codec::Encoder;
use vector_core::{
config::{log_schema, DataType},
event::Event,
schema,
};
use vrl::value::Kind;
use vector_core::{config::DataType, event::Event, schema};

use crate::MetricTagValues;

Expand Down Expand Up @@ -42,7 +38,7 @@ impl TextSerializerConfig {

/// The schema required by the serializer.
pub fn schema_requirement(&self) -> schema::Requirement {
schema::Requirement::empty().required_meaning(log_schema().message_key(), Kind::any())
get_serializer_schema_requirement()
}
}

Expand All @@ -67,15 +63,9 @@ impl Encoder<Event> for TextSerializer {
type Error = vector_common::Error;

fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
let message_key = log_schema().message_key();

match event {
Event::Log(log) => {
if let Some(bytes) = log
.get_by_meaning(message_key)
.or_else(|| log.get(message_key))
.map(|value| value.coerce_to_bytes())
{
if let Some(bytes) = log.get_message().map(|value| value.coerce_to_bytes()) {
buffer.put(bytes);
}
}
Expand Down
3 changes: 2 additions & 1 deletion lib/opentelemetry-proto/src/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use vector_core::{
config::{log_schema, LegacyKey, LogNamespace},
event::{Event, LogEvent},
};
use vrl::path::PathPrefix;
use vrl::value::Value;

use super::proto::{
Expand Down Expand Up @@ -94,7 +95,7 @@ impl ResourceLog {
LogNamespace::Legacy => {
let mut log = LogEvent::default();
if let Some(v) = self.log_record.body.and_then(|av| av.value) {
log.insert(log_schema().message_key(), v);
log.maybe_insert(PathPrefix::Event, log_schema().message_key(), v);
}
log
}
Expand Down
Loading
Loading