Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(codecs): add support for protobuf encoding #18598

Merged
merged 17 commits into from
Sep 29, 2023
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions lib/codecs/src/common/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
//! A collection of common utility features used by both encoding and decoding logic.

pub mod protobuf;
36 changes: 36 additions & 0 deletions lib/codecs/src/common/protobuf.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use prost_reflect::{DescriptorPool, MessageDescriptor};
use std::path::Path;

/// Looks up the `MessageDescriptor` for `message_type` in a compiled descriptor set file.
///
/// `descriptor_set_path` should point to the output of `protoc -o <path> ...`.
///
/// # Errors
///
/// Returns an error if the file cannot be read, if its contents cannot be decoded as a
/// descriptor pool, or if the pool contains no message named `message_type`.
pub fn get_message_descriptor(
    descriptor_set_path: &Path,
    message_type: &str,
) -> vector_common::Result<MessageDescriptor> {
    // Read the whole descriptor set into memory; it is decoded in one pass below.
    let raw = match std::fs::read(descriptor_set_path) {
        Ok(bytes) => bytes,
        Err(e) => {
            return Err(
                format!("Failed to open protobuf desc file '{descriptor_set_path:?}': {e}").into(),
            )
        }
    };

    let descriptors = match DescriptorPool::decode(raw.as_slice()) {
        Ok(pool) => pool,
        Err(e) => {
            return Err(
                format!("Failed to parse protobuf desc file '{descriptor_set_path:?}': {e}").into(),
            )
        }
    };

    // A missing message type is reported as an error to the caller rather than a panic.
    match descriptors.get_message_by_name(message_type) {
        Some(descriptor) => Ok(descriptor),
        None => Err(format!(
            "The message type '{message_type}' could not be found in '{descriptor_set_path:?}'"
        )
        .into()),
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// Loads `test.Integers` from the checked-in descriptor set and checks the
    /// descriptor's short name and its number of fields.
    #[test]
    fn test_get_message_descriptor() {
        // The fixture path is resolved relative to the crate root that Cargo exports.
        let manifest_dir = std::env::var_os("CARGO_MANIFEST_DIR").unwrap();
        let mut desc_path = PathBuf::from(manifest_dir);
        desc_path.push("tests/data/protobuf/test.desc");

        let descriptor = get_message_descriptor(&desc_path, "test.Integers").unwrap();

        assert_eq!("Integers", descriptor.name());
        assert_eq!(4, descriptor.fields().count());
    }
}
2 changes: 1 addition & 1 deletion lib/codecs/src/decoding/format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub use native::{NativeDeserializer, NativeDeserializerConfig};
pub use native_json::{
NativeJsonDeserializer, NativeJsonDeserializerConfig, NativeJsonDeserializerOptions,
};
pub use protobuf::{ProtobufDeserializer, ProtobufDeserializerConfig};
pub use protobuf::{ProtobufDeserializer, ProtobufDeserializerConfig, ProtobufDeserializerOptions};
use smallvec::SmallVec;
#[cfg(feature = "syslog")]
pub use syslog::{SyslogDeserializer, SyslogDeserializerConfig, SyslogDeserializerOptions};
Expand Down
41 changes: 11 additions & 30 deletions lib/codecs/src/decoding/format/protobuf.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use std::collections::BTreeMap;
use std::fs;
use std::path::PathBuf;

use bytes::Bytes;
use chrono::Utc;
use derivative::Derivative;
use ordered_float::NotNan;
use prost_reflect::{DescriptorPool, DynamicMessage, MessageDescriptor, ReflectMessage};
use prost_reflect::{DynamicMessage, MessageDescriptor, ReflectMessage};
use smallvec::{smallvec, SmallVec};
use vector_config::configurable_component;
use vector_core::event::LogEvent;
Expand All @@ -17,6 +16,8 @@ use vector_core::{
};
use vrl::value::Kind;

use crate::common::protobuf::get_message_descriptor;

use super::Deserializer;

/// Config used to build a `ProtobufDeserializer`.
Expand Down Expand Up @@ -73,10 +74,10 @@ impl ProtobufDeserializerConfig {
#[derivative(Default)]
pub struct ProtobufDeserializerOptions {
/// Path to desc file
desc_file: PathBuf,
pub desc_file: PathBuf,

/// Message type, e.g. `package.message`.
message_type: String,
pub message_type: String,
}

/// Deserializer that builds `Event`s from a byte frame containing protobuf.
Expand All @@ -90,19 +91,6 @@ impl ProtobufDeserializer {
pub fn new(message_descriptor: MessageDescriptor) -> Self {
Self { message_descriptor }
}

fn get_message_descriptor(
desc_file: &PathBuf,
message_type: String,
) -> vector_common::Result<MessageDescriptor> {
let b = fs::read(desc_file)
.map_err(|e| format!("Failed to open protobuf desc file '{desc_file:?}': {e}",))?;
let pool = DescriptorPool::decode(b.as_slice())
.map_err(|e| format!("Failed to parse protobuf desc file '{desc_file:?}': {e}"))?;
Ok(pool.get_message_by_name(&message_type).unwrap_or_else(|| {
panic!("The message type '{message_type}' could not be found in '{desc_file:?}'")
}))
}
}

impl Deserializer for ProtobufDeserializer {
Expand Down Expand Up @@ -137,10 +125,8 @@ impl Deserializer for ProtobufDeserializer {
impl TryFrom<&ProtobufDeserializerConfig> for ProtobufDeserializer {
type Error = vector_common::Error;
fn try_from(config: &ProtobufDeserializerConfig) -> vector_common::Result<Self> {
let message_descriptor = ProtobufDeserializer::get_message_descriptor(
&config.protobuf.desc_file,
config.protobuf.message_type.clone(),
)?;
let message_descriptor =
get_message_descriptor(&config.protobuf.desc_file, &config.protobuf.message_type)?;
Ok(Self::new(message_descriptor))
}
}
Expand Down Expand Up @@ -245,8 +231,7 @@ mod tests {
use super::*;

fn test_data_dir() -> PathBuf {
PathBuf::from(env::var_os("CARGO_MANIFEST_DIR").unwrap())
.join("tests/data/decoding/protobuf")
PathBuf::from(env::var_os("CARGO_MANIFEST_DIR").unwrap()).join("tests/data/protobuf")
}

fn parse_and_validate(
Expand All @@ -256,11 +241,7 @@ mod tests {
validate_log: fn(&LogEvent),
) {
let input = Bytes::from(protobuf_bin_message);
let message_descriptor = ProtobufDeserializer::get_message_descriptor(
&protobuf_desc_path,
message_type.to_string(),
)
.unwrap();
let message_descriptor = get_message_descriptor(&protobuf_desc_path, message_type).unwrap();
let deserializer = ProtobufDeserializer::new(message_descriptor);

for namespace in [LogNamespace::Legacy, LogNamespace::Vector] {
Expand Down Expand Up @@ -352,9 +333,9 @@ mod tests {
#[test]
fn deserialize_error_invalid_protobuf() {
let input = Bytes::from("{ foo");
let message_descriptor = ProtobufDeserializer::get_message_descriptor(
let message_descriptor = get_message_descriptor(
&test_data_dir().join("test_protobuf.desc"),
"test_protobuf.Person".to_string(),
"test_protobuf.Person",
)
.unwrap();
let deserializer = ProtobufDeserializer::new(message_descriptor);
Expand Down
2 changes: 1 addition & 1 deletion lib/codecs/src/decoding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub use format::{
GelfDeserializerConfig, GelfDeserializerOptions, JsonDeserializer, JsonDeserializerConfig,
JsonDeserializerOptions, NativeDeserializer, NativeDeserializerConfig, NativeJsonDeserializer,
NativeJsonDeserializerConfig, NativeJsonDeserializerOptions, ProtobufDeserializer,
ProtobufDeserializerConfig,
ProtobufDeserializerConfig, ProtobufDeserializerOptions,
};
#[cfg(feature = "syslog")]
pub use format::{SyslogDeserializer, SyslogDeserializerConfig, SyslogDeserializerOptions};
Expand Down
2 changes: 2 additions & 0 deletions lib/codecs/src/encoding/format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ mod json;
mod logfmt;
mod native;
mod native_json;
mod protobuf;
mod raw_message;
mod text;

Expand All @@ -24,6 +25,7 @@ pub use json::{JsonSerializer, JsonSerializerConfig};
pub use logfmt::{LogfmtSerializer, LogfmtSerializerConfig};
pub use native::{NativeSerializer, NativeSerializerConfig};
pub use native_json::{NativeJsonSerializer, NativeJsonSerializerConfig};
pub use protobuf::{ProtobufSerializer, ProtobufSerializerConfig, ProtobufSerializerOptions};
pub use raw_message::{RawMessageSerializer, RawMessageSerializerConfig};
pub use text::{TextSerializer, TextSerializerConfig};
use vector_core::event::Event;
Expand Down
Loading