Skip to content

Commit

Permalink
feat(codecs): add support for protobuf encoding (#18598)
Browse files Browse the repository at this point in the history
* feat(codecs): add support for protobuf encoding

* rename descriptor_set_path to desc_file

* fix spelling errors

* describe encode_message

* factor out common test code

* move protobuf test data to one directory

* create common protobuf module

* add map encoding and test

* add enum encoding and test

* add timestamp encoding and test

* fix spelling again

* assert -> assert_eq

* remove simple comments

* enums are not case-sensitive

* add round trip test

* spelling (maybe?)

* fix clippy errors
  • Loading branch information
goakley authored Sep 29, 2023
1 parent decaaeb commit 737f5c3
Show file tree
Hide file tree
Showing 50 changed files with 1,502 additions and 40 deletions.
3 changes: 3 additions & 0 deletions lib/codecs/src/common/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
//! A collection of common utility features used by both encoding and decoding logic.
pub mod protobuf;
36 changes: 36 additions & 0 deletions lib/codecs/src/common/protobuf.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use prost_reflect::{DescriptorPool, MessageDescriptor};
use std::path::Path;

/// Reads a protobuf descriptor set file and looks up a single message type in it.
///
/// `descriptor_set_path` should point to the output of `protoc -o <path> ...`, and
/// `message_type` is the name of the message to look up in it (e.g. `test.Integers`).
///
/// # Errors
///
/// Returns an error when the file cannot be read, when its contents cannot be decoded
/// into a `DescriptorPool`, or when the pool has no message named `message_type`.
pub fn get_message_descriptor(
    descriptor_set_path: &Path,
    message_type: &str,
) -> vector_common::Result<MessageDescriptor> {
    // Pull the entire descriptor set into memory.
    let raw = match std::fs::read(descriptor_set_path) {
        Ok(raw) => raw,
        Err(e) => {
            return Err(
                format!("Failed to open protobuf desc file '{descriptor_set_path:?}': {e}",).into(),
            )
        }
    };

    // Decode the raw bytes into a pool of descriptors.
    let descriptors = match DescriptorPool::decode(raw.as_slice()) {
        Ok(descriptors) => descriptors,
        Err(e) => {
            return Err(
                format!("Failed to parse protobuf desc file '{descriptor_set_path:?}': {e}").into(),
            )
        }
    };

    // A missing message type is reported as an error rather than a panic.
    match descriptors.get_message_by_name(message_type) {
        Some(descriptor) => Ok(descriptor),
        None => Err(format!(
            "The message type '{message_type}' could not be found in '{descriptor_set_path:?}'"
        )
        .into()),
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// Loads `test.Integers` from the test descriptor set and checks its name and field count.
    #[test]
    fn test_get_message_descriptor() {
        // The fixture path is resolved relative to the crate's manifest directory.
        let manifest_dir = std::env::var_os("CARGO_MANIFEST_DIR").unwrap();
        let mut desc_path = PathBuf::from(manifest_dir);
        desc_path.push("tests/data/protobuf/test.desc");

        let descriptor = get_message_descriptor(&desc_path, "test.Integers").unwrap();

        assert_eq!("Integers", descriptor.name());
        assert_eq!(4, descriptor.fields().count());
    }
}
2 changes: 1 addition & 1 deletion lib/codecs/src/decoding/format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub use native::{NativeDeserializer, NativeDeserializerConfig};
pub use native_json::{
NativeJsonDeserializer, NativeJsonDeserializerConfig, NativeJsonDeserializerOptions,
};
pub use protobuf::{ProtobufDeserializer, ProtobufDeserializerConfig};
pub use protobuf::{ProtobufDeserializer, ProtobufDeserializerConfig, ProtobufDeserializerOptions};
use smallvec::SmallVec;
#[cfg(feature = "syslog")]
pub use syslog::{SyslogDeserializer, SyslogDeserializerConfig, SyslogDeserializerOptions};
Expand Down
41 changes: 11 additions & 30 deletions lib/codecs/src/decoding/format/protobuf.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use std::collections::BTreeMap;
use std::fs;
use std::path::PathBuf;

use bytes::Bytes;
use chrono::Utc;
use derivative::Derivative;
use ordered_float::NotNan;
use prost_reflect::{DescriptorPool, DynamicMessage, MessageDescriptor, ReflectMessage};
use prost_reflect::{DynamicMessage, MessageDescriptor, ReflectMessage};
use smallvec::{smallvec, SmallVec};
use vector_config::configurable_component;
use vector_core::event::LogEvent;
Expand All @@ -17,6 +16,8 @@ use vector_core::{
};
use vrl::value::Kind;

use crate::common::protobuf::get_message_descriptor;

use super::Deserializer;

/// Config used to build a `ProtobufDeserializer`.
Expand Down Expand Up @@ -73,10 +74,10 @@ impl ProtobufDeserializerConfig {
#[derivative(Default)]
pub struct ProtobufDeserializerOptions {
/// Path to desc file
desc_file: PathBuf,
pub desc_file: PathBuf,

/// message type. e.g package.message
message_type: String,
pub message_type: String,
}

/// Deserializer that builds `Event`s from a byte frame containing protobuf.
Expand All @@ -90,19 +91,6 @@ impl ProtobufDeserializer {
pub fn new(message_descriptor: MessageDescriptor) -> Self {
Self { message_descriptor }
}

fn get_message_descriptor(
desc_file: &PathBuf,
message_type: String,
) -> vector_common::Result<MessageDescriptor> {
let b = fs::read(desc_file)
.map_err(|e| format!("Failed to open protobuf desc file '{desc_file:?}': {e}",))?;
let pool = DescriptorPool::decode(b.as_slice())
.map_err(|e| format!("Failed to parse protobuf desc file '{desc_file:?}': {e}"))?;
Ok(pool.get_message_by_name(&message_type).unwrap_or_else(|| {
panic!("The message type '{message_type}' could not be found in '{desc_file:?}'")
}))
}
}

impl Deserializer for ProtobufDeserializer {
Expand Down Expand Up @@ -137,10 +125,8 @@ impl Deserializer for ProtobufDeserializer {
impl TryFrom<&ProtobufDeserializerConfig> for ProtobufDeserializer {
type Error = vector_common::Error;
fn try_from(config: &ProtobufDeserializerConfig) -> vector_common::Result<Self> {
let message_descriptor = ProtobufDeserializer::get_message_descriptor(
&config.protobuf.desc_file,
config.protobuf.message_type.clone(),
)?;
let message_descriptor =
get_message_descriptor(&config.protobuf.desc_file, &config.protobuf.message_type)?;
Ok(Self::new(message_descriptor))
}
}
Expand Down Expand Up @@ -245,8 +231,7 @@ mod tests {
use super::*;

fn test_data_dir() -> PathBuf {
PathBuf::from(env::var_os("CARGO_MANIFEST_DIR").unwrap())
.join("tests/data/decoding/protobuf")
PathBuf::from(env::var_os("CARGO_MANIFEST_DIR").unwrap()).join("tests/data/protobuf")
}

fn parse_and_validate(
Expand All @@ -256,11 +241,7 @@ mod tests {
validate_log: fn(&LogEvent),
) {
let input = Bytes::from(protobuf_bin_message);
let message_descriptor = ProtobufDeserializer::get_message_descriptor(
&protobuf_desc_path,
message_type.to_string(),
)
.unwrap();
let message_descriptor = get_message_descriptor(&protobuf_desc_path, message_type).unwrap();
let deserializer = ProtobufDeserializer::new(message_descriptor);

for namespace in [LogNamespace::Legacy, LogNamespace::Vector] {
Expand Down Expand Up @@ -352,9 +333,9 @@ mod tests {
#[test]
fn deserialize_error_invalid_protobuf() {
let input = Bytes::from("{ foo");
let message_descriptor = ProtobufDeserializer::get_message_descriptor(
let message_descriptor = get_message_descriptor(
&test_data_dir().join("test_protobuf.desc"),
"test_protobuf.Person".to_string(),
"test_protobuf.Person",
)
.unwrap();
let deserializer = ProtobufDeserializer::new(message_descriptor);
Expand Down
2 changes: 1 addition & 1 deletion lib/codecs/src/decoding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub use format::{
GelfDeserializerConfig, GelfDeserializerOptions, JsonDeserializer, JsonDeserializerConfig,
JsonDeserializerOptions, NativeDeserializer, NativeDeserializerConfig, NativeJsonDeserializer,
NativeJsonDeserializerConfig, NativeJsonDeserializerOptions, ProtobufDeserializer,
ProtobufDeserializerConfig,
ProtobufDeserializerConfig, ProtobufDeserializerOptions,
};
#[cfg(feature = "syslog")]
pub use format::{SyslogDeserializer, SyslogDeserializerConfig, SyslogDeserializerOptions};
Expand Down
2 changes: 2 additions & 0 deletions lib/codecs/src/encoding/format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ mod json;
mod logfmt;
mod native;
mod native_json;
mod protobuf;
mod raw_message;
mod text;

Expand All @@ -24,6 +25,7 @@ pub use json::{JsonSerializer, JsonSerializerConfig};
pub use logfmt::{LogfmtSerializer, LogfmtSerializerConfig};
pub use native::{NativeSerializer, NativeSerializerConfig};
pub use native_json::{NativeJsonSerializer, NativeJsonSerializerConfig};
pub use protobuf::{ProtobufSerializer, ProtobufSerializerConfig, ProtobufSerializerOptions};
pub use raw_message::{RawMessageSerializer, RawMessageSerializerConfig};
pub use text::{TextSerializer, TextSerializerConfig};
use vector_core::event::Event;
Expand Down
Loading

0 comments on commit 737f5c3

Please sign in to comment.