diff --git a/lib/codecs/src/common/mod.rs b/lib/codecs/src/common/mod.rs new file mode 100644 index 0000000000000..230f3b31d2f97 --- /dev/null +++ b/lib/codecs/src/common/mod.rs @@ -0,0 +1,3 @@ +//! A collection of common utility features used by both encoding and decoding logic. + +pub mod protobuf; diff --git a/lib/codecs/src/common/protobuf.rs b/lib/codecs/src/common/protobuf.rs new file mode 100644 index 0000000000000..c50321c211a23 --- /dev/null +++ b/lib/codecs/src/common/protobuf.rs @@ -0,0 +1,36 @@ +use prost_reflect::{DescriptorPool, MessageDescriptor}; +use std::path::Path; + +/// Load a `MessageDescriptor` from a specific message type from the given descriptor set file. +/// +/// The path should point to the output of `protoc -o ...` +pub fn get_message_descriptor( + descriptor_set_path: &Path, + message_type: &str, +) -> vector_common::Result { + let b = std::fs::read(descriptor_set_path).map_err(|e| { + format!("Failed to open protobuf desc file '{descriptor_set_path:?}': {e}",) + })?; + let pool = DescriptorPool::decode(b.as_slice()).map_err(|e| { + format!("Failed to parse protobuf desc file '{descriptor_set_path:?}': {e}") + })?; + pool.get_message_by_name(message_type).ok_or_else(|| { + format!("The message type '{message_type}' could not be found in '{descriptor_set_path:?}'") + .into() + }) +} + +#[cfg(test)] +mod tests { + use super::*; + use std::path::PathBuf; + + #[test] + fn test_get_message_descriptor() { + let path = PathBuf::from(std::env::var_os("CARGO_MANIFEST_DIR").unwrap()) + .join("tests/data/protobuf/test.desc"); + let message_descriptor = get_message_descriptor(&path, "test.Integers").unwrap(); + assert_eq!("Integers", message_descriptor.name()); + assert_eq!(4, message_descriptor.fields().count()); + } +} diff --git a/lib/codecs/src/decoding/format/mod.rs b/lib/codecs/src/decoding/format/mod.rs index 5172705448dc2..52c4557d7f57f 100644 --- a/lib/codecs/src/decoding/format/mod.rs +++ b/lib/codecs/src/decoding/format/mod.rs @@ -20,7 +20,7 @@ pub use native::{NativeDeserializer, NativeDeserializerConfig}; pub use native_json::{ NativeJsonDeserializer, NativeJsonDeserializerConfig, NativeJsonDeserializerOptions, }; -pub use protobuf::{ProtobufDeserializer, ProtobufDeserializerConfig}; +pub use protobuf::{ProtobufDeserializer, ProtobufDeserializerConfig, ProtobufDeserializerOptions}; use smallvec::SmallVec; #[cfg(feature = "syslog")] pub use syslog::{SyslogDeserializer, SyslogDeserializerConfig, SyslogDeserializerOptions}; diff --git a/lib/codecs/src/decoding/format/protobuf.rs b/lib/codecs/src/decoding/format/protobuf.rs index 71bd98c1567fa..65be4990b0905 100644 --- a/lib/codecs/src/decoding/format/protobuf.rs +++ b/lib/codecs/src/decoding/format/protobuf.rs @@ -1,12 +1,11 @@ use std::collections::BTreeMap; -use std::fs; use std::path::PathBuf; use bytes::Bytes; use chrono::Utc; use derivative::Derivative; use ordered_float::NotNan; -use prost_reflect::{DescriptorPool, DynamicMessage, MessageDescriptor, ReflectMessage}; +use prost_reflect::{DynamicMessage, MessageDescriptor, ReflectMessage}; use smallvec::{smallvec, SmallVec}; use vector_config::configurable_component; use vector_core::event::LogEvent; @@ -17,6 +16,8 @@ use vector_core::{ }; use vrl::value::Kind; +use crate::common::protobuf::get_message_descriptor; + use super::Deserializer; /// Config used to build a `ProtobufDeserializer`. @@ -73,10 +74,10 @@ impl ProtobufDeserializerConfig { #[derivative(Default)] pub struct ProtobufDeserializerOptions { /// Path to desc file - desc_file: PathBuf, + pub desc_file: PathBuf, /// message type. e.g package.message - message_type: String, + pub message_type: String, } /// Deserializer that builds `Event`s from a byte frame containing protobuf. @@ -90,19 +91,6 @@ impl ProtobufDeserializer { pub fn new(message_descriptor: MessageDescriptor) -> Self { Self { message_descriptor } } - - fn get_message_descriptor( - desc_file: &PathBuf, - message_type: String, - ) -> vector_common::Result { - let b = fs::read(desc_file) - .map_err(|e| format!("Failed to open protobuf desc file '{desc_file:?}': {e}",))?; - let pool = DescriptorPool::decode(b.as_slice()) - .map_err(|e| format!("Failed to parse protobuf desc file '{desc_file:?}': {e}"))?; - Ok(pool.get_message_by_name(&message_type).unwrap_or_else(|| { - panic!("The message type '{message_type}' could not be found in '{desc_file:?}'") - })) - } } impl Deserializer for ProtobufDeserializer { @@ -137,10 +125,8 @@ impl Deserializer for ProtobufDeserializer { impl TryFrom<&ProtobufDeserializerConfig> for ProtobufDeserializer { type Error = vector_common::Error; fn try_from(config: &ProtobufDeserializerConfig) -> vector_common::Result { - let message_descriptor = ProtobufDeserializer::get_message_descriptor( - &config.protobuf.desc_file, - config.protobuf.message_type.clone(), - )?; + let message_descriptor = + get_message_descriptor(&config.protobuf.desc_file, &config.protobuf.message_type)?; Ok(Self::new(message_descriptor)) } } @@ -245,8 +231,7 @@ mod tests { use super::*; fn test_data_dir() -> PathBuf { - PathBuf::from(env::var_os("CARGO_MANIFEST_DIR").unwrap()) - .join("tests/data/decoding/protobuf") + PathBuf::from(env::var_os("CARGO_MANIFEST_DIR").unwrap()).join("tests/data/protobuf") } fn parse_and_validate( @@ -256,11 +241,7 @@ mod tests { validate_log: fn(&LogEvent), ) { let input = Bytes::from(protobuf_bin_message); - let message_descriptor = ProtobufDeserializer::get_message_descriptor( - &protobuf_desc_path, - message_type.to_string(), - ) - .unwrap(); + let message_descriptor = get_message_descriptor(&protobuf_desc_path, message_type).unwrap(); let deserializer = ProtobufDeserializer::new(message_descriptor); for namespace in [LogNamespace::Legacy, LogNamespace::Vector] { @@ -352,9 +333,9 @@ mod tests { #[test] fn deserialize_error_invalid_protobuf() { let input = Bytes::from("{ foo"); - let message_descriptor = ProtobufDeserializer::get_message_descriptor( + let message_descriptor = get_message_descriptor( &test_data_dir().join("test_protobuf.desc"), - "test_protobuf.Person".to_string(), + "test_protobuf.Person", ) .unwrap(); let deserializer = ProtobufDeserializer::new(message_descriptor); diff --git a/lib/codecs/src/decoding/mod.rs b/lib/codecs/src/decoding/mod.rs index f4465fcacb434..0b9b451a55138 100644 --- a/lib/codecs/src/decoding/mod.rs +++ b/lib/codecs/src/decoding/mod.rs @@ -12,7 +12,7 @@ pub use format::{ GelfDeserializerConfig, GelfDeserializerOptions, JsonDeserializer, JsonDeserializerConfig, JsonDeserializerOptions, NativeDeserializer, NativeDeserializerConfig, NativeJsonDeserializer, NativeJsonDeserializerConfig, NativeJsonDeserializerOptions, ProtobufDeserializer, - ProtobufDeserializerConfig, + ProtobufDeserializerConfig, ProtobufDeserializerOptions, }; #[cfg(feature = "syslog")] pub use format::{SyslogDeserializer, SyslogDeserializerConfig, SyslogDeserializerOptions}; diff --git a/lib/codecs/src/encoding/format/mod.rs b/lib/codecs/src/encoding/format/mod.rs index efff723f65b46..e61f7cae0bb96 100644 --- a/lib/codecs/src/encoding/format/mod.rs +++ b/lib/codecs/src/encoding/format/mod.rs @@ -11,6 +11,7 @@ mod json; mod logfmt; mod native; mod native_json; +mod protobuf; mod raw_message; mod text; @@ -24,6 +25,7 @@ pub use json::{JsonSerializer, JsonSerializerConfig}; pub use logfmt::{LogfmtSerializer, LogfmtSerializerConfig}; pub use native::{NativeSerializer, NativeSerializerConfig}; pub use native_json::{NativeJsonSerializer, NativeJsonSerializerConfig}; +pub use protobuf::{ProtobufSerializer, ProtobufSerializerConfig, ProtobufSerializerOptions}; pub use raw_message::{RawMessageSerializer, RawMessageSerializerConfig}; pub use text::{TextSerializer, TextSerializerConfig}; use vector_core::event::Event; diff --git a/lib/codecs/src/encoding/format/protobuf.rs b/lib/codecs/src/encoding/format/protobuf.rs new file mode 100644 index 0000000000000..a67a631edf3eb --- /dev/null +++ b/lib/codecs/src/encoding/format/protobuf.rs @@ -0,0 +1,468 @@ +use crate::common::protobuf::get_message_descriptor; +use crate::encoding::BuildError; +use bytes::BytesMut; +use chrono::Timelike; +use prost::Message; +use prost_reflect::{DynamicMessage, FieldDescriptor, Kind, MapKey, MessageDescriptor}; +use std::collections::HashMap; +use std::path::PathBuf; +use tokio_util::codec::Encoder; +use vector_core::{ + config::DataType, + event::{Event, Value}, + schema, +}; + +/// Config used to build a `ProtobufSerializer`. +#[crate::configurable_component] +#[derive(Debug, Clone)] +pub struct ProtobufSerializerConfig { + /// Options for the Protobuf serializer. + pub protobuf: ProtobufSerializerOptions, +} + +impl ProtobufSerializerConfig { + /// Build the `ProtobufSerializer` from this configuration. + pub fn build(&self) -> Result { + let message_descriptor = + get_message_descriptor(&self.protobuf.desc_file, &self.protobuf.message_type)?; + Ok(ProtobufSerializer { message_descriptor }) + } + + /// The data type of events that are accepted by `ProtobufSerializer`. + pub fn input_type(&self) -> DataType { + DataType::Log.and(DataType::Trace) + } + + /// The schema required by the serializer. + pub fn schema_requirement(&self) -> schema::Requirement { + // While technically we support `Value` variants that can't be losslessly serialized to + // Protobuf, we don't want to enforce that limitation to users yet. + schema::Requirement::empty() + } +} + +/// Protobuf serializer options. +#[crate::configurable_component] +#[derive(Debug, Clone)] +pub struct ProtobufSerializerOptions { + /// The path to the protobuf descriptor set file. + /// + /// This file is the output of `protoc -o ...` + #[configurable(metadata(docs::examples = "/etc/vector/protobuf_descriptor_set.desc"))] + pub desc_file: PathBuf, + + /// The name of the message type to use for serializing. + #[configurable(metadata(docs::examples = "package.Message"))] + pub message_type: String, +} + +/// Serializer that converts an `Event` to bytes using the Protobuf format. +#[derive(Debug, Clone)] +pub struct ProtobufSerializer { + /// The protobuf message definition to use for serialization. + message_descriptor: MessageDescriptor, +} + +/// Convert a single raw vector `Value` into a protobuf `Value`. +/// +/// Unlike `convert_value`, this ignores any field metadata such as cardinality. +fn convert_value_raw( + value: Value, + kind: &prost_reflect::Kind, +) -> Result { + let kind_str = value.kind_str().to_owned(); + match (value, kind) { + (Value::Boolean(b), Kind::Bool) => Ok(prost_reflect::Value::Bool(b)), + (Value::Bytes(b), Kind::Bytes) => Ok(prost_reflect::Value::Bytes(b)), + (Value::Bytes(b), Kind::String) => Ok(prost_reflect::Value::String( + String::from_utf8_lossy(&b).into_owned(), + )), + (Value::Bytes(b), Kind::Enum(descriptor)) => { + let string = String::from_utf8_lossy(&b).into_owned(); + if let Some(d) = descriptor + .values() + .find(|v| v.name().eq_ignore_ascii_case(&string)) + { + Ok(prost_reflect::Value::EnumNumber(d.number())) + } else { + Err(format!( + "Enum `{}` has no value that matches string '{}'", + descriptor.full_name(), + string + ) + .into()) + } + } + (Value::Float(f), Kind::Double) => Ok(prost_reflect::Value::F64(f.into_inner())), + (Value::Float(f), Kind::Float) => Ok(prost_reflect::Value::F32(f.into_inner() as f32)), + (Value::Integer(i), Kind::Int32) => Ok(prost_reflect::Value::I32(i as i32)), + (Value::Integer(i), Kind::Int64) => Ok(prost_reflect::Value::I64(i)), + (Value::Integer(i), Kind::Sint32) => Ok(prost_reflect::Value::I32(i as i32)), + (Value::Integer(i), Kind::Sint64) => Ok(prost_reflect::Value::I64(i)), + (Value::Integer(i), Kind::Sfixed32) => Ok(prost_reflect::Value::I32(i as i32)), + (Value::Integer(i), Kind::Sfixed64) => Ok(prost_reflect::Value::I64(i)), + (Value::Integer(i), Kind::Uint32) => Ok(prost_reflect::Value::U32(i as u32)), + (Value::Integer(i), Kind::Uint64) => Ok(prost_reflect::Value::U64(i as u64)), + (Value::Integer(i), Kind::Fixed32) => Ok(prost_reflect::Value::U32(i as u32)), + (Value::Integer(i), Kind::Fixed64) => Ok(prost_reflect::Value::U64(i as u64)), + (Value::Integer(i), Kind::Enum(_)) => Ok(prost_reflect::Value::EnumNumber(i as i32)), + (Value::Object(o), Kind::Message(message_descriptor)) => { + if message_descriptor.is_map_entry() { + let value_field = message_descriptor + .get_field_by_name("value") + .ok_or("Internal error with proto map processing")?; + let mut map: HashMap = HashMap::new(); + for (key, val) in o.into_iter() { + match convert_value(&value_field, val) { + Ok(prost_val) => { + map.insert(MapKey::String(key), prost_val); + } + Err(e) => return Err(e), + } + } + Ok(prost_reflect::Value::Map(map)) + } else { + // if it's not a map, it's an actual message + Ok(prost_reflect::Value::Message(encode_message( + message_descriptor, + Value::Object(o), + )?)) + } + } + (Value::Regex(r), Kind::String) => Ok(prost_reflect::Value::String(r.as_str().to_owned())), + (Value::Regex(r), Kind::Bytes) => Ok(prost_reflect::Value::Bytes(r.as_bytes())), + (Value::Timestamp(t), Kind::Int64) => Ok(prost_reflect::Value::I64(t.timestamp_micros())), + (Value::Timestamp(t), Kind::Message(descriptor)) + if descriptor.full_name() == "google.protobuf.Timestamp" => + { + let mut message = DynamicMessage::new(descriptor.clone()); + message.try_set_field_by_name("seconds", prost_reflect::Value::I64(t.timestamp()))?; + message + .try_set_field_by_name("nanos", prost_reflect::Value::I32(t.nanosecond() as i32))?; + Ok(prost_reflect::Value::Message(message)) + } + _ => Err(format!("Cannot encode vector `{kind_str}` into protobuf `{kind:?}`",).into()), + } +} + +/// Convert a vector `Value` into a protobuf `Value`. +fn convert_value( + field_descriptor: &FieldDescriptor, + value: Value, +) -> Result { + if let Value::Array(a) = value { + if field_descriptor.cardinality() == prost_reflect::Cardinality::Repeated { + let repeated: Result, vector_common::Error> = a + .into_iter() + .map(|v| convert_value_raw(v, &field_descriptor.kind())) + .collect(); + Ok(prost_reflect::Value::List(repeated?)) + } else { + Err("Cannot encode vector array into a non-repeated protobuf field".into()) + } + } else { + convert_value_raw(value, &field_descriptor.kind()) + } +} + +/// Convert a vector object (`Value`) into a protobuf message. +/// +/// This function can only operate on `Value::Object`s, +/// since they are the only field-based vector Value +/// and protobuf messages are defined as a collection of fields and values. +fn encode_message( + message_descriptor: &MessageDescriptor, + value: Value, +) -> Result { + let mut message = DynamicMessage::new(message_descriptor.clone()); + if let Value::Object(map) = value { + for field in message_descriptor.fields() { + match map.get(field.name()) { + None | Some(Value::Null) => message.clear_field(&field), + Some(value) => { + message.try_set_field(&field, convert_value(&field, value.clone())?)? + } + } + } + Ok(message) + } else { + Err("ProtobufSerializer only supports serializing objects".into()) + } +} + +impl ProtobufSerializer { + /// Creates a new `ProtobufSerializer`. + pub fn new(message_descriptor: MessageDescriptor) -> Self { + Self { message_descriptor } + } + + /// Get a description of the message type used in serialization. + pub fn descriptor_proto(&self) -> &prost_reflect::prost_types::DescriptorProto { + self.message_descriptor.descriptor_proto() + } +} + +impl Encoder for ProtobufSerializer { + type Error = vector_common::Error; + + fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> { + let message = match event { + Event::Log(log) => encode_message(&self.message_descriptor, log.into_parts().0), + Event::Metric(_) => unimplemented!(), + Event::Trace(trace) => encode_message( + &self.message_descriptor, + Value::Object(trace.into_parts().0), + ), + }?; + message.encode(buffer).map_err(Into::into) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use bytes::Bytes; + use chrono::{DateTime, NaiveDateTime, Utc}; + use ordered_float::NotNan; + use prost_reflect::MapKey; + use std::collections::{BTreeMap, HashMap}; + + macro_rules! mfield { + ($m:expr, $f:expr) => { + $m.get_field_by_name($f).unwrap().into_owned() + }; + } + + fn test_message_descriptor(message_type: &str) -> MessageDescriptor { + let path = PathBuf::from(std::env::var_os("CARGO_MANIFEST_DIR").unwrap()) + .join("tests/data/protobuf/test.desc"); + get_message_descriptor(&path, &format!("test.{message_type}")).unwrap() + } + + #[test] + fn test_encode_integers() { + let message = encode_message( + &test_message_descriptor("Integers"), + Value::Object(BTreeMap::from([ + ("i32".into(), Value::Integer(-1234)), + ("i64".into(), Value::Integer(-9876)), + ("u32".into(), Value::Integer(1234)), + ("u64".into(), Value::Integer(9876)), + ])), + ) + .unwrap(); + assert_eq!(Some(-1234), mfield!(message, "i32").as_i32()); + assert_eq!(Some(-9876), mfield!(message, "i64").as_i64()); + assert_eq!(Some(1234), mfield!(message, "u32").as_u32()); + assert_eq!(Some(9876), mfield!(message, "u64").as_u64()); + } + + #[test] + fn test_encode_floats() { + let message = encode_message( + &test_message_descriptor("Floats"), + Value::Object(BTreeMap::from([ + ("d".into(), Value::Float(NotNan::new(11.0).unwrap())), + ("f".into(), Value::Float(NotNan::new(2.0).unwrap())), + ])), + ) + .unwrap(); + assert_eq!(Some(11.0), mfield!(message, "d").as_f64()); + assert_eq!(Some(2.0), mfield!(message, "f").as_f32()); + } + + #[test] + fn test_encode_bytes() { + let bytes = Bytes::from(vec![0, 1, 2, 3]); + let message = encode_message( + &test_message_descriptor("Bytes"), + Value::Object(BTreeMap::from([ + ("text".into(), Value::Bytes(Bytes::from("vector"))), + ("binary".into(), Value::Bytes(bytes.clone())), + ])), + ) + .unwrap(); + assert_eq!(Some("vector"), mfield!(message, "text").as_str()); + assert_eq!(Some(&bytes), mfield!(message, "binary").as_bytes()); + } + + #[test] + fn test_encode_map() { + let message = encode_message( + &test_message_descriptor("Map"), + Value::Object(BTreeMap::from([ + ( + "names".into(), + Value::Object(BTreeMap::from([ + ("forty-four".into(), Value::Integer(44)), + ("one".into(), Value::Integer(1)), + ])), + ), + ( + "people".into(), + Value::Object(BTreeMap::from([( + "mark".into(), + Value::Object(BTreeMap::from([ + ("nickname".into(), Value::Bytes(Bytes::from("jeff"))), + ("age".into(), Value::Integer(22)), + ])), + )])), + ), + ])), + ) + .unwrap(); + // the simpler string->primitive map + assert_eq!( + Some(&HashMap::from([ + ( + MapKey::String("forty-four".into()), + prost_reflect::Value::I32(44), + ), + (MapKey::String("one".into()), prost_reflect::Value::I32(1),), + ])), + mfield!(message, "names").as_map() + ); + // the not-simpler string->message map + let people = mfield!(message, "people").as_map().unwrap().to_owned(); + assert_eq!(1, people.len()); + assert_eq!( + Some("jeff"), + mfield!( + people[&MapKey::String("mark".into())].as_message().unwrap(), + "nickname" + ) + .as_str() + ); + assert_eq!( + Some(22), + mfield!( + people[&MapKey::String("mark".into())].as_message().unwrap(), + "age" + ) + .as_u32() + ); + } + + #[test] + fn test_encode_enum() { + let message = encode_message( + &test_message_descriptor("Enum"), + Value::Object(BTreeMap::from([ + ("breakfast".into(), Value::Bytes(Bytes::from("tomato"))), + ("dinner".into(), Value::Bytes(Bytes::from("OLIVE"))), + ("lunch".into(), Value::Integer(0)), + ])), + ) + .unwrap(); + assert_eq!(Some(2), mfield!(message, "breakfast").as_enum_number()); + assert_eq!(Some(0), mfield!(message, "lunch").as_enum_number()); + assert_eq!(Some(1), mfield!(message, "dinner").as_enum_number()); + } + + #[test] + fn test_encode_timestamp() { + let message = encode_message( + &test_message_descriptor("Timestamp"), + Value::Object(BTreeMap::from([( + "morning".into(), + Value::Timestamp(DateTime::from_naive_utc_and_offset( + NaiveDateTime::from_timestamp_opt(8675, 309).unwrap(), + Utc, + )), + )])), + ) + .unwrap(); + let timestamp = mfield!(message, "morning").as_message().unwrap().clone(); + assert_eq!(Some(8675), mfield!(timestamp, "seconds").as_i64()); + assert_eq!(Some(309), mfield!(timestamp, "nanos").as_i32()); + } + + #[test] + fn test_encode_repeated_primitive() { + let message = encode_message( + &test_message_descriptor("RepeatedPrimitive"), + Value::Object(BTreeMap::from([( + "numbers".into(), + Value::Array(vec![ + Value::Integer(8), + Value::Integer(6), + Value::Integer(4), + ]), + )])), + ) + .unwrap(); + let list = mfield!(message, "numbers").as_list().unwrap().to_vec(); + assert_eq!(3, list.len()); + assert_eq!(Some(8), list[0].as_i64()); + assert_eq!(Some(6), list[1].as_i64()); + assert_eq!(Some(4), list[2].as_i64()); + } + + #[test] + fn test_encode_repeated_message() { + let message = encode_message( + &test_message_descriptor("RepeatedMessage"), + Value::Object(BTreeMap::from([( + "messages".into(), + Value::Array(vec![ + Value::Object(BTreeMap::from([( + "text".into(), + Value::Bytes(Bytes::from("vector")), + )])), + Value::Object(BTreeMap::from([("index".into(), Value::Integer(4444))])), + Value::Object(BTreeMap::from([ + ("text".into(), Value::Bytes(Bytes::from("protobuf"))), + ("index".into(), Value::Integer(1)), + ])), + ]), + )])), + ) + .unwrap(); + let list = mfield!(message, "messages").as_list().unwrap().to_vec(); + assert_eq!(3, list.len()); + assert_eq!( + Some("vector"), + mfield!(list[0].as_message().unwrap(), "text").as_str() + ); + assert!(!list[0].as_message().unwrap().has_field_by_name("index")); + assert!(!list[1].as_message().unwrap().has_field_by_name("t4ext")); + assert_eq!( + Some(4444), + mfield!(list[1].as_message().unwrap(), "index").as_u32() + ); + assert_eq!( + Some("protobuf"), + mfield!(list[2].as_message().unwrap(), "text").as_str() + ); + assert_eq!( + Some(1), + mfield!(list[2].as_message().unwrap(), "index").as_u32() + ); + } + + fn run_encoding_on_decoding_test_data( + filename: &str, + message_type: &str, + ) -> Result { + let test_data_dir = PathBuf::from(std::env::var_os("CARGO_MANIFEST_DIR").unwrap()) + .join("tests/data/protobuf"); + let descriptor_set_path = test_data_dir.join(filename); + let message_descriptor = + get_message_descriptor(&descriptor_set_path, message_type).unwrap(); + encode_message( + &message_descriptor, + Value::Object(BTreeMap::from([ + ("name".into(), Value::Bytes(Bytes::from("rope"))), + ("id".into(), Value::Integer(9271)), + ])), + ) + } + + #[test] + fn test_encode_decoding_protobuf_test_data() { + // just check for the side-effect of success + run_encoding_on_decoding_test_data("test_protobuf.desc", "test_protobuf.Person").unwrap(); + run_encoding_on_decoding_test_data("test_protobuf3.desc", "test_protobuf3.Person").unwrap(); + } +} diff --git a/lib/codecs/src/encoding/mod.rs b/lib/codecs/src/encoding/mod.rs index f9516411720d1..098613c877cfb 100644 --- a/lib/codecs/src/encoding/mod.rs +++ b/lib/codecs/src/encoding/mod.rs @@ -11,7 +11,8 @@ pub use format::{ AvroSerializer, AvroSerializerConfig, AvroSerializerOptions, CsvSerializer, CsvSerializerConfig, GelfSerializer, GelfSerializerConfig, JsonSerializer, JsonSerializerConfig, LogfmtSerializer, LogfmtSerializerConfig, NativeJsonSerializer, - NativeJsonSerializerConfig, NativeSerializer, NativeSerializerConfig, RawMessageSerializer, + NativeJsonSerializerConfig, NativeSerializer, NativeSerializerConfig, ProtobufSerializer, + ProtobufSerializerConfig, ProtobufSerializerOptions, RawMessageSerializer, RawMessageSerializerConfig, TextSerializer, TextSerializerConfig, }; pub use framing::{ @@ -222,6 +223,11 @@ pub enum SerializerConfig { /// [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs NativeJson, + /// Encodes an event as a [Protobuf][protobuf] message. + /// + /// [protobuf]: https://protobuf.dev/ + Protobuf(ProtobufSerializerConfig), + /// No encoding. /// /// This encoding uses the `message` field of a log event. @@ -284,6 +290,12 @@ impl From for SerializerConfig { } } +impl From for SerializerConfig { + fn from(config: ProtobufSerializerConfig) -> Self { + Self::Protobuf(config) + } +} + impl From for SerializerConfig { fn from(_: RawMessageSerializerConfig) -> Self { Self::RawMessage @@ -311,6 +323,7 @@ impl SerializerConfig { SerializerConfig::NativeJson => { Ok(Serializer::NativeJson(NativeJsonSerializerConfig.build())) } + SerializerConfig::Protobuf(config) => Ok(Serializer::Protobuf(config.build()?)), SerializerConfig::RawMessage => { Ok(Serializer::RawMessage(RawMessageSerializerConfig.build())) } @@ -332,9 +345,9 @@ impl SerializerConfig { // we should do so accurately, even if practically it doesn't need to be. // // [1]: https://avro.apache.org/docs/1.11.1/specification/_print/#message-framing - SerializerConfig::Avro { .. } | SerializerConfig::Native => { - FramingConfig::LengthDelimited - } + SerializerConfig::Avro { .. } + | SerializerConfig::Native + | SerializerConfig::Protobuf(_) => FramingConfig::LengthDelimited, SerializerConfig::Csv(_) | SerializerConfig::Gelf | SerializerConfig::Json(_) @@ -357,6 +370,7 @@ impl SerializerConfig { SerializerConfig::Logfmt => LogfmtSerializerConfig.input_type(), SerializerConfig::Native => NativeSerializerConfig.input_type(), SerializerConfig::NativeJson => NativeJsonSerializerConfig.input_type(), + SerializerConfig::Protobuf(config) => config.input_type(), SerializerConfig::RawMessage => RawMessageSerializerConfig.input_type(), SerializerConfig::Text(config) => config.input_type(), } @@ -374,6 +388,7 @@ impl SerializerConfig { SerializerConfig::Logfmt => LogfmtSerializerConfig.schema_requirement(), SerializerConfig::Native => NativeSerializerConfig.schema_requirement(), SerializerConfig::NativeJson => NativeJsonSerializerConfig.schema_requirement(), + SerializerConfig::Protobuf(config) => config.schema_requirement(), SerializerConfig::RawMessage => RawMessageSerializerConfig.schema_requirement(), SerializerConfig::Text(config) => config.schema_requirement(), } @@ -397,6 +412,8 @@ pub enum Serializer { Native(NativeSerializer), /// Uses a `NativeJsonSerializer` for serialization. NativeJson(NativeJsonSerializer), + /// Uses a `ProtobufSerializer` for serialization. + Protobuf(ProtobufSerializer), /// Uses a `RawMessageSerializer` for serialization. RawMessage(RawMessageSerializer), /// Uses a `TextSerializer` for serialization. @@ -413,6 +430,7 @@ impl Serializer { | Serializer::Logfmt(_) | Serializer::Text(_) | Serializer::Native(_) + | Serializer::Protobuf(_) | Serializer::RawMessage(_) => false, } } @@ -433,6 +451,7 @@ impl Serializer { | Serializer::Logfmt(_) | Serializer::Text(_) | Serializer::Native(_) + | Serializer::Protobuf(_) | Serializer::RawMessage(_) => { panic!("Serializer does not support JSON") } @@ -482,6 +501,12 @@ impl From for Serializer { } } +impl From for Serializer { + fn from(serializer: ProtobufSerializer) -> Self { + Self::Protobuf(serializer) + } +} + impl From for Serializer { fn from(serializer: RawMessageSerializer) -> Self { Self::RawMessage(serializer) @@ -506,6 +531,7 @@ impl tokio_util::codec::Encoder for Serializer { Serializer::Logfmt(serializer) => serializer.encode(event, buffer), Serializer::Native(serializer) => serializer.encode(event, buffer), Serializer::NativeJson(serializer) => serializer.encode(event, buffer), + Serializer::Protobuf(serializer) => serializer.encode(event, buffer), Serializer::RawMessage(serializer) => serializer.encode(event, buffer), Serializer::Text(serializer) => serializer.encode(event, buffer), } diff --git a/lib/codecs/src/lib.rs b/lib/codecs/src/lib.rs index 0aa2c63fe1952..07adc18df67b3 100644 --- a/lib/codecs/src/lib.rs +++ b/lib/codecs/src/lib.rs @@ -4,6 +4,7 @@ #![deny(missing_docs)] #![deny(warnings)] +mod common; pub mod decoding; pub mod encoding; pub mod gelf; diff --git a/lib/codecs/tests/data/protobuf/google/protobuf/timestamp.proto b/lib/codecs/tests/data/protobuf/google/protobuf/timestamp.proto new file mode 100644 index 0000000000000..3b2df6d91168e --- /dev/null +++ b/lib/codecs/tests/data/protobuf/google/protobuf/timestamp.proto @@ -0,0 +1,147 @@ +// Protocol Buffers - Google's data interchange format +// Copyright 2008 Google Inc. All rights reserved. +// https://developers.google.com/protocol-buffers/ +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +syntax = "proto3"; + +package google.protobuf; + +option csharp_namespace = "Google.Protobuf.WellKnownTypes"; +option cc_enable_arenas = true; +option go_package = "google.golang.org/protobuf/types/known/timestamppb"; +option java_package = "com.google.protobuf"; +option java_outer_classname = "TimestampProto"; +option java_multiple_files = true; +option objc_class_prefix = "GPB"; + +// A Timestamp represents a point in time independent of any time zone or local +// calendar, encoded as a count of seconds and fractions of seconds at +// nanosecond resolution. The count is relative to an epoch at UTC midnight on +// January 1, 1970, in the proleptic Gregorian calendar which extends the +// Gregorian calendar backwards to year one. +// +// All minutes are 60 seconds long. Leap seconds are "smeared" so that no leap +// second table is needed for interpretation, using a [24-hour linear +// smear](https://developers.google.com/time/smear). +// +// The range is from 0001-01-01T00:00:00Z to 9999-12-31T23:59:59.999999999Z. By +// restricting to that range, we ensure that we can convert to and from [RFC +// 3339](https://www.ietf.org/rfc/rfc3339.txt) date strings. +// +// # Examples +// +// Example 1: Compute Timestamp from POSIX `time()`. +// +// Timestamp timestamp; +// timestamp.set_seconds(time(NULL)); +// timestamp.set_nanos(0); +// +// Example 2: Compute Timestamp from POSIX `gettimeofday()`. +// +// struct timeval tv; +// gettimeofday(&tv, NULL); +// +// Timestamp timestamp; +// timestamp.set_seconds(tv.tv_sec); +// timestamp.set_nanos(tv.tv_usec * 1000); +// +// Example 3: Compute Timestamp from Win32 `GetSystemTimeAsFileTime()`. +// +// FILETIME ft; +// GetSystemTimeAsFileTime(&ft); +// UINT64 ticks = (((UINT64)ft.dwHighDateTime) << 32) | ft.dwLowDateTime; +// +// // A Windows tick is 100 nanoseconds. Windows epoch 1601-01-01T00:00:00Z +// // is 11644473600 seconds before Unix epoch 1970-01-01T00:00:00Z. +// Timestamp timestamp; +// timestamp.set_seconds((INT64) ((ticks / 10000000) - 11644473600LL)); +// timestamp.set_nanos((INT32) ((ticks % 10000000) * 100)); +// +// Example 4: Compute Timestamp from Java `System.currentTimeMillis()`. +// +// long millis = System.currentTimeMillis(); +// +// Timestamp timestamp = Timestamp.newBuilder().setSeconds(millis / 1000) +// .setNanos((int) ((millis % 1000) * 1000000)).build(); +// +// +// Example 5: Compute Timestamp from Java `Instant.now()`. +// +// Instant now = Instant.now(); +// +// Timestamp timestamp = +// Timestamp.newBuilder().setSeconds(now.getEpochSecond()) +// .setNanos(now.getNano()).build(); +// +// +// Example 6: Compute Timestamp from current time in Python. +// +// timestamp = Timestamp() +// timestamp.GetCurrentTime() +// +// # JSON Mapping +// +// In JSON format, the Timestamp type is encoded as a string in the +// [RFC 3339](https://www.ietf.org/rfc/rfc3339.txt) format. That is, the +// format is "{year}-{month}-{day}T{hour}:{min}:{sec}[.{frac_sec}]Z" +// where {year} is always expressed using four digits while {month}, {day}, +// {hour}, {min}, and {sec} are zero-padded to two digits each. The fractional +// seconds, which can go up to 9 digits (i.e. up to 1 nanosecond resolution), +// are optional. The "Z" suffix indicates the timezone ("UTC"); the timezone +// is required. A proto3 JSON serializer should always use UTC (as indicated by +// "Z") when printing the Timestamp type and a proto3 JSON parser should be +// able to accept both UTC and other timezones (as indicated by an offset). +// +// For example, "2017-01-15T01:30:15.01Z" encodes 15.01 seconds past +// 01:30 UTC on January 15, 2017. +// +// In JavaScript, one can convert a Date object to this format using the +// standard +// [toISOString()](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Date/toISOString) +// method. In Python, a standard `datetime.datetime` object can be converted +// to this format using +// [`strftime`](https://docs.python.org/2/library/time.html#time.strftime) with +// the time format spec '%Y-%m-%dT%H:%M:%S.%fZ'. Likewise, in Java, one can use +// the Joda Time's [`ISODateTimeFormat.dateTime()`]( +// http://www.joda.org/joda-time/apidocs/org/joda/time/format/ISODateTimeFormat.html#dateTime%2D%2D +// ) to obtain a formatter capable of generating timestamps in this format. +// +// +message Timestamp { + // Represents seconds of UTC time since Unix epoch + // 1970-01-01T00:00:00Z. Must be from 0001-01-01T00:00:00Z to + // 9999-12-31T23:59:59Z inclusive. + int64 seconds = 1; + + // Non-negative fractions of a second at nanosecond resolution. Negative + // second values with fractions must still have non-negative nanos values + // that count forward in time. Must be from 0 to 999,999,999 + // inclusive. + int32 nanos = 2; +} diff --git a/lib/codecs/tests/data/decoding/protobuf/person_someone.pb b/lib/codecs/tests/data/protobuf/person_someone.pb similarity index 100% rename from lib/codecs/tests/data/decoding/protobuf/person_someone.pb rename to lib/codecs/tests/data/protobuf/person_someone.pb diff --git a/lib/codecs/tests/data/decoding/protobuf/person_someone.txt b/lib/codecs/tests/data/protobuf/person_someone.txt similarity index 100% rename from lib/codecs/tests/data/decoding/protobuf/person_someone.txt rename to lib/codecs/tests/data/protobuf/person_someone.txt diff --git a/lib/codecs/tests/data/decoding/protobuf/person_someone3.pb b/lib/codecs/tests/data/protobuf/person_someone3.pb similarity index 100% rename from lib/codecs/tests/data/decoding/protobuf/person_someone3.pb rename to lib/codecs/tests/data/protobuf/person_someone3.pb diff --git a/lib/codecs/tests/data/decoding/protobuf/person_someone3.txt b/lib/codecs/tests/data/protobuf/person_someone3.txt similarity index 100% rename from lib/codecs/tests/data/decoding/protobuf/person_someone3.txt rename to lib/codecs/tests/data/protobuf/person_someone3.txt diff --git a/lib/codecs/tests/data/protobuf/test.desc b/lib/codecs/tests/data/protobuf/test.desc new file mode 100644 index 0000000000000..f12bfa7d889b8 Binary files /dev/null and b/lib/codecs/tests/data/protobuf/test.desc differ diff --git a/lib/codecs/tests/data/protobuf/test.proto b/lib/codecs/tests/data/protobuf/test.proto new file mode 100644 index 0000000000000..8e3275b7e5394 --- /dev/null +++ b/lib/codecs/tests/data/protobuf/test.proto @@ -0,0 +1,61 @@ +// Remember to recompile `test.desc` when you update this file: +// protoc -I . -o test.desc test.proto google/protobuf/timestamp.proto + +syntax = "proto3"; + +package test; + +import "google/protobuf/timestamp.proto"; + +message Integers { + int32 i32 = 1; + int64 i64 = 2; + uint32 u32 = 3; + uint64 u64 = 4; +} + +message Floats { + double d = 1; + float f = 2; +} + +message Bytes { + string text = 1; + bytes binary = 2; +} + +message Map { + message Person { + string nickname = 1; + uint32 age = 2; + }; + map names = 1; + map people = 2; +} + +message Enum { + enum Fruit { + APPLE = 0; + OLIVE = 1; + TOMATO = 2; + } + Fruit breakfast = 1; + Fruit lunch = 2; + Fruit dinner = 3; +} + +message Timestamp { + google.protobuf.Timestamp morning = 1; +} + +message RepeatedPrimitive { + repeated int64 numbers = 1; +} + +message RepeatedMessage { + message EmbeddedMessage { + optional string text = 1; + optional uint32 index = 2; + } + repeated EmbeddedMessage messages = 1; +} diff --git a/lib/codecs/tests/data/decoding/protobuf/test_protobuf.desc b/lib/codecs/tests/data/protobuf/test_protobuf.desc similarity index 100% rename from lib/codecs/tests/data/decoding/protobuf/test_protobuf.desc rename to lib/codecs/tests/data/protobuf/test_protobuf.desc diff --git a/lib/codecs/tests/data/decoding/protobuf/test_protobuf.proto b/lib/codecs/tests/data/protobuf/test_protobuf.proto similarity index 100% rename from lib/codecs/tests/data/decoding/protobuf/test_protobuf.proto rename to lib/codecs/tests/data/protobuf/test_protobuf.proto diff --git a/lib/codecs/tests/data/decoding/protobuf/test_protobuf3.desc b/lib/codecs/tests/data/protobuf/test_protobuf3.desc similarity index 100% rename from lib/codecs/tests/data/decoding/protobuf/test_protobuf3.desc rename to lib/codecs/tests/data/protobuf/test_protobuf3.desc diff --git a/lib/codecs/tests/data/decoding/protobuf/test_protobuf3.proto b/lib/codecs/tests/data/protobuf/test_protobuf3.proto similarity index 100% rename from lib/codecs/tests/data/decoding/protobuf/test_protobuf3.proto rename to lib/codecs/tests/data/protobuf/test_protobuf3.proto diff --git a/lib/codecs/tests/protobuf.rs b/lib/codecs/tests/protobuf.rs new file mode 100644 index 0000000000000..92acbffd9c3a8 --- /dev/null +++ b/lib/codecs/tests/protobuf.rs @@ -0,0 +1,67 @@ +//! Tests for the behaviour of Protobuf serializer and deserializer (together). + +use bytes::{Bytes, BytesMut}; +use std::path::{Path, PathBuf}; +use tokio_util::codec::Encoder; +use vector_core::config::LogNamespace; + +use codecs::decoding::format::Deserializer; +use codecs::decoding::{ + ProtobufDeserializer, ProtobufDeserializerConfig, ProtobufDeserializerOptions, +}; +use codecs::encoding::{ProtobufSerializer, ProtobufSerializerConfig, ProtobufSerializerOptions}; + +fn test_data_dir() -> PathBuf { + PathBuf::from(std::env::var_os("CARGO_MANIFEST_DIR").unwrap()).join("tests/data/protobuf") +} + +fn read_protobuf_bin_message(path: &Path) -> Bytes { + let message_raw = std::fs::read(path).unwrap(); + Bytes::copy_from_slice(&message_raw) +} + +/// Build the serializer and deserializer from common settings +fn build_serializer_pair( + desc_file: PathBuf, + message_type: String, +) -> (ProtobufSerializer, ProtobufDeserializer) { + let serializer = ProtobufSerializerConfig { + protobuf: ProtobufSerializerOptions { + desc_file: desc_file.clone(), + message_type: message_type.clone(), + }, + } + .build() + .unwrap(); + let deserializer = ProtobufDeserializerConfig { + protobuf: ProtobufDeserializerOptions { + desc_file, + message_type, + }, + } + .build() + .unwrap(); + (serializer, deserializer) +} + +#[test] +fn roundtrip_coding() { + let protobuf_message = read_protobuf_bin_message(&test_data_dir().join("person_someone.pb")); + let desc_file = test_data_dir().join("test_protobuf.desc"); + let message_type: String = "test_protobuf.Person".into(); + let (mut serializer, deserializer) = build_serializer_pair(desc_file, message_type); + + let events_original = deserializer + .parse(protobuf_message, LogNamespace::Vector) + .unwrap(); + assert_eq!(1, events_original.len()); + let mut new_message = BytesMut::new(); + serializer + .encode(events_original[0].clone(), &mut new_message) + .unwrap(); + let protobuf_message: Bytes = new_message.into(); + let events_encoded = deserializer + .parse(protobuf_message, LogNamespace::Vector) + .unwrap(); + assert_eq!(events_original, events_encoded); +} diff --git a/src/codecs/encoding/config.rs b/src/codecs/encoding/config.rs index 0ae7b1648a026..c2e6b57106c4f 100644 --- a/src/codecs/encoding/config.rs +++ b/src/codecs/encoding/config.rs @@ -112,6 +112,11 @@ impl EncodingConfigWithFraming { // https://github.com/Graylog2/graylog2-server/issues/1240 CharacterDelimitedEncoder::new(0).into() } + (None, Serializer::Protobuf(_)) => { + // Protobuf uses length-delimited messages, see: + // https://developers.google.com/protocol-buffers/docs/techniques#streaming + LengthDelimitedEncoder::new().into() + } ( None, Serializer::Csv(_) diff --git a/src/codecs/encoding/encoder.rs b/src/codecs/encoding/encoder.rs index 3f9970fb961a2..f7bfe086e2085 100644 --- a/src/codecs/encoding/encoder.rs +++ b/src/codecs/encoding/encoder.rs @@ -113,7 +113,7 @@ impl Encoder { Serializer::Gelf(_) | Serializer::Json(_) | Serializer::NativeJson(_), Framer::CharacterDelimited(CharacterDelimitedEncoder { delimiter: b',' }), ) => "application/json", - (Serializer::Native(_), _) => "application/octet-stream", + (Serializer::Native(_), _) | (Serializer::Protobuf(_), _) => "application/octet-stream", ( Serializer::Avro(_) | Serializer::Csv(_) diff --git a/src/components/validation/resources/mod.rs b/src/components/validation/resources/mod.rs index 105503bf2f4ea..b66e9832c0f0f 100644 --- a/src/components/validation/resources/mod.rs +++ b/src/components/validation/resources/mod.rs @@ -142,7 +142,14 @@ fn deserializer_config_to_serializer(config: &DeserializerConfig) -> encoding::S // `message` field... but it's close enough for now. DeserializerConfig::Bytes => SerializerConfig::Text(TextSerializerConfig::default()), DeserializerConfig::Json { .. } => SerializerConfig::Json(JsonSerializerConfig::default()), - DeserializerConfig::Protobuf(_) => unimplemented!(), + DeserializerConfig::Protobuf(config) => { + SerializerConfig::Protobuf(codecs::encoding::ProtobufSerializerConfig { + protobuf: codecs::encoding::ProtobufSerializerOptions { + desc_file: config.protobuf.desc_file.clone(), + message_type: config.protobuf.message_type.clone(), + }, + }) + } // TODO: We need to create an Avro serializer because, certainly, for any source decoding // the data as Avro, we can't possibly send anything else without the source just // immediately barfing. @@ -189,6 +196,14 @@ fn serializer_config_to_deserializer( SerializerConfig::Logfmt => todo!(), SerializerConfig::Native => DeserializerConfig::Native, SerializerConfig::NativeJson => DeserializerConfig::NativeJson(Default::default()), + SerializerConfig::Protobuf(config) => { + DeserializerConfig::Protobuf(codecs::decoding::ProtobufDeserializerConfig { + protobuf: codecs::decoding::ProtobufDeserializerOptions { + desc_file: config.protobuf.desc_file.clone(), + message_type: config.protobuf.message_type.clone(), + }, + }) + } SerializerConfig::RawMessage | SerializerConfig::Text(_) => DeserializerConfig::Bytes, }; diff --git a/src/sinks/websocket/sink.rs b/src/sinks/websocket/sink.rs index 9cb00d2b53ea9..93590c6c4ba96 100644 --- a/src/sinks/websocket/sink.rs +++ b/src/sinks/websocket/sink.rs @@ -235,11 +235,11 @@ impl WebSocketSink { const fn should_encode_as_binary(&self) -> bool { use codecs::encoding::Serializer::{ - Avro, Csv, Gelf, Json, Logfmt, Native, NativeJson, RawMessage, Text, + Avro, Csv, Gelf, Json, Logfmt, Native, NativeJson, Protobuf, RawMessage, Text, }; match self.encoder.serializer() { - RawMessage(_) | Avro(_) | Native(_) => true, + RawMessage(_) | Avro(_) | Native(_) | Protobuf(_) => true, Csv(_) | Logfmt(_) | Gelf(_) | Json(_) | Text(_) | NativeJson(_) => false, } } diff --git a/website/cue/reference/components/sinks/base/amqp.cue b/website/cue/reference/components/sinks/base/amqp.cue index 366520be4c1a2..a592e402c6f5d 100644 --- a/website/cue/reference/components/sinks/base/amqp.cue +++ b/website/cue/reference/components/sinks/base/amqp.cue @@ -102,6 +102,11 @@ base: components: sinks: amqp: configuration: { [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs """ + protobuf: """ + Encodes an event as a [Protobuf][protobuf] message. + + [protobuf]: https://protobuf.dev/ + """ raw_message: """ No encoding. @@ -236,6 +241,27 @@ base: components: sinks: amqp: configuration: { required: false type: array: items: type: string: {} } + protobuf: { + description: "Options for the Protobuf serializer." + relevant_when: "codec = \"protobuf\"" + required: true + type: object: options: { + desc_file: { + description: """ + The path to the protobuf descriptor set file. + + This file is the output of `protoc -o ...` + """ + required: true + type: string: examples: ["/etc/vector/protobuf_descriptor_set.desc"] + } + message_type: { + description: "The name of the message type to use for serializing." + required: true + type: string: examples: ["package.Message"] + } + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/base/aws_cloudwatch_logs.cue b/website/cue/reference/components/sinks/base/aws_cloudwatch_logs.cue index cae0c301c8a7c..68e8b1614b1e4 100644 --- a/website/cue/reference/components/sinks/base/aws_cloudwatch_logs.cue +++ b/website/cue/reference/components/sinks/base/aws_cloudwatch_logs.cue @@ -273,6 +273,11 @@ base: components: sinks: aws_cloudwatch_logs: configuration: { [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs """ + protobuf: """ + Encodes an event as a [Protobuf][protobuf] message. + + [protobuf]: https://protobuf.dev/ + """ raw_message: """ No encoding. @@ -407,6 +412,27 @@ base: components: sinks: aws_cloudwatch_logs: configuration: { required: false type: array: items: type: string: {} } + protobuf: { + description: "Options for the Protobuf serializer." + relevant_when: "codec = \"protobuf\"" + required: true + type: object: options: { + desc_file: { + description: """ + The path to the protobuf descriptor set file. + + This file is the output of `protoc -o ...` + """ + required: true + type: string: examples: ["/etc/vector/protobuf_descriptor_set.desc"] + } + message_type: { + description: "The name of the message type to use for serializing." + required: true + type: string: examples: ["package.Message"] + } + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/base/aws_kinesis_firehose.cue b/website/cue/reference/components/sinks/base/aws_kinesis_firehose.cue index 6b59b70a3abfb..f1a647e602fd5 100644 --- a/website/cue/reference/components/sinks/base/aws_kinesis_firehose.cue +++ b/website/cue/reference/components/sinks/base/aws_kinesis_firehose.cue @@ -252,6 +252,11 @@ base: components: sinks: aws_kinesis_firehose: configuration: { [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs """ + protobuf: """ + Encodes an event as a [Protobuf][protobuf] message. + + [protobuf]: https://protobuf.dev/ + """ raw_message: """ No encoding. @@ -386,6 +391,27 @@ base: components: sinks: aws_kinesis_firehose: configuration: { required: false type: array: items: type: string: {} } + protobuf: { + description: "Options for the Protobuf serializer." + relevant_when: "codec = \"protobuf\"" + required: true + type: object: options: { + desc_file: { + description: """ + The path to the protobuf descriptor set file. + + This file is the output of `protoc -o ...` + """ + required: true + type: string: examples: ["/etc/vector/protobuf_descriptor_set.desc"] + } + message_type: { + description: "The name of the message type to use for serializing." + required: true + type: string: examples: ["package.Message"] + } + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/base/aws_kinesis_streams.cue b/website/cue/reference/components/sinks/base/aws_kinesis_streams.cue index b1d181751b0dc..f723f9e8ad5b2 100644 --- a/website/cue/reference/components/sinks/base/aws_kinesis_streams.cue +++ b/website/cue/reference/components/sinks/base/aws_kinesis_streams.cue @@ -252,6 +252,11 @@ base: components: sinks: aws_kinesis_streams: configuration: { [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs """ + protobuf: """ + Encodes an event as a [Protobuf][protobuf] message. + + [protobuf]: https://protobuf.dev/ + """ raw_message: """ No encoding. @@ -386,6 +391,27 @@ base: components: sinks: aws_kinesis_streams: configuration: { required: false type: array: items: type: string: {} } + protobuf: { + description: "Options for the Protobuf serializer." + relevant_when: "codec = \"protobuf\"" + required: true + type: object: options: { + desc_file: { + description: """ + The path to the protobuf descriptor set file. + + This file is the output of `protoc -o ...` + """ + required: true + type: string: examples: ["/etc/vector/protobuf_descriptor_set.desc"] + } + message_type: { + description: "The name of the message type to use for serializing." + required: true + type: string: examples: ["package.Message"] + } + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/base/aws_s3.cue b/website/cue/reference/components/sinks/base/aws_s3.cue index bc0c94eb51443..16b10bc9018bc 100644 --- a/website/cue/reference/components/sinks/base/aws_s3.cue +++ b/website/cue/reference/components/sinks/base/aws_s3.cue @@ -361,6 +361,11 @@ base: components: sinks: aws_s3: configuration: { [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs """ + protobuf: """ + Encodes an event as a [Protobuf][protobuf] message. + + [protobuf]: https://protobuf.dev/ + """ raw_message: """ No encoding. @@ -495,6 +500,27 @@ base: components: sinks: aws_s3: configuration: { required: false type: array: items: type: string: {} } + protobuf: { + description: "Options for the Protobuf serializer." + relevant_when: "codec = \"protobuf\"" + required: true + type: object: options: { + desc_file: { + description: """ + The path to the protobuf descriptor set file. + + This file is the output of `protoc -o ...` + """ + required: true + type: string: examples: ["/etc/vector/protobuf_descriptor_set.desc"] + } + message_type: { + description: "The name of the message type to use for serializing." + required: true + type: string: examples: ["package.Message"] + } + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/base/aws_sns.cue b/website/cue/reference/components/sinks/base/aws_sns.cue index 59d187f844953..1cf31e8956d8c 100644 --- a/website/cue/reference/components/sinks/base/aws_sns.cue +++ b/website/cue/reference/components/sinks/base/aws_sns.cue @@ -188,6 +188,11 @@ base: components: sinks: aws_sns: configuration: { [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs """ + protobuf: """ + Encodes an event as a [Protobuf][protobuf] message. + + [protobuf]: https://protobuf.dev/ + """ raw_message: """ No encoding. @@ -322,6 +327,27 @@ base: components: sinks: aws_sns: configuration: { required: false type: array: items: type: string: {} } + protobuf: { + description: "Options for the Protobuf serializer." + relevant_when: "codec = \"protobuf\"" + required: true + type: object: options: { + desc_file: { + description: """ + The path to the protobuf descriptor set file. + + This file is the output of `protoc -o ...` + """ + required: true + type: string: examples: ["/etc/vector/protobuf_descriptor_set.desc"] + } + message_type: { + description: "The name of the message type to use for serializing." + required: true + type: string: examples: ["package.Message"] + } + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/base/aws_sqs.cue b/website/cue/reference/components/sinks/base/aws_sqs.cue index 04a752fdf46a0..cb7b1dcd79a9e 100644 --- a/website/cue/reference/components/sinks/base/aws_sqs.cue +++ b/website/cue/reference/components/sinks/base/aws_sqs.cue @@ -188,6 +188,11 @@ base: components: sinks: aws_sqs: configuration: { [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs """ + protobuf: """ + Encodes an event as a [Protobuf][protobuf] message. + + [protobuf]: https://protobuf.dev/ + """ raw_message: """ No encoding. @@ -322,6 +327,27 @@ base: components: sinks: aws_sqs: configuration: { required: false type: array: items: type: string: {} } + protobuf: { + description: "Options for the Protobuf serializer." + relevant_when: "codec = \"protobuf\"" + required: true + type: object: options: { + desc_file: { + description: """ + The path to the protobuf descriptor set file. + + This file is the output of `protoc -o ...` + """ + required: true + type: string: examples: ["/etc/vector/protobuf_descriptor_set.desc"] + } + message_type: { + description: "The name of the message type to use for serializing." + required: true + type: string: examples: ["package.Message"] + } + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/base/azure_blob.cue b/website/cue/reference/components/sinks/base/azure_blob.cue index 33c66f48f3208..a56680098c0fc 100644 --- a/website/cue/reference/components/sinks/base/azure_blob.cue +++ b/website/cue/reference/components/sinks/base/azure_blob.cue @@ -215,6 +215,11 @@ base: components: sinks: azure_blob: configuration: { [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs """ + protobuf: """ + Encodes an event as a [Protobuf][protobuf] message. + + [protobuf]: https://protobuf.dev/ + """ raw_message: """ No encoding. @@ -349,6 +354,27 @@ base: components: sinks: azure_blob: configuration: { required: false type: array: items: type: string: {} } + protobuf: { + description: "Options for the Protobuf serializer." + relevant_when: "codec = \"protobuf\"" + required: true + type: object: options: { + desc_file: { + description: """ + The path to the protobuf descriptor set file. + + This file is the output of `protoc -o ...` + """ + required: true + type: string: examples: ["/etc/vector/protobuf_descriptor_set.desc"] + } + message_type: { + description: "The name of the message type to use for serializing." + required: true + type: string: examples: ["package.Message"] + } + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/base/console.cue b/website/cue/reference/components/sinks/base/console.cue index bfa8a11acbfb2..9ebd1a606fd5b 100644 --- a/website/cue/reference/components/sinks/base/console.cue +++ b/website/cue/reference/components/sinks/base/console.cue @@ -86,6 +86,11 @@ base: components: sinks: console: configuration: { [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs """ + protobuf: """ + Encodes an event as a [Protobuf][protobuf] message. + + [protobuf]: https://protobuf.dev/ + """ raw_message: """ No encoding. @@ -220,6 +225,27 @@ base: components: sinks: console: configuration: { required: false type: array: items: type: string: {} } + protobuf: { + description: "Options for the Protobuf serializer." + relevant_when: "codec = \"protobuf\"" + required: true + type: object: options: { + desc_file: { + description: """ + The path to the protobuf descriptor set file. + + This file is the output of `protoc -o ...` + """ + required: true + type: string: examples: ["/etc/vector/protobuf_descriptor_set.desc"] + } + message_type: { + description: "The name of the message type to use for serializing." + required: true + type: string: examples: ["package.Message"] + } + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/base/file.cue b/website/cue/reference/components/sinks/base/file.cue index b98956de02479..590a6fb945d22 100644 --- a/website/cue/reference/components/sinks/base/file.cue +++ b/website/cue/reference/components/sinks/base/file.cue @@ -106,6 +106,11 @@ base: components: sinks: file: configuration: { [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs """ + protobuf: """ + Encodes an event as a [Protobuf][protobuf] message. + + [protobuf]: https://protobuf.dev/ + """ raw_message: """ No encoding. @@ -240,6 +245,27 @@ base: components: sinks: file: configuration: { required: false type: array: items: type: string: {} } + protobuf: { + description: "Options for the Protobuf serializer." + relevant_when: "codec = \"protobuf\"" + required: true + type: object: options: { + desc_file: { + description: """ + The path to the protobuf descriptor set file. + + This file is the output of `protoc -o ...` + """ + required: true + type: string: examples: ["/etc/vector/protobuf_descriptor_set.desc"] + } + message_type: { + description: "The name of the message type to use for serializing." + required: true + type: string: examples: ["package.Message"] + } + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/base/gcp_chronicle_unstructured.cue b/website/cue/reference/components/sinks/base/gcp_chronicle_unstructured.cue index 03d5071ac300f..5a049cd6f6ab2 100644 --- a/website/cue/reference/components/sinks/base/gcp_chronicle_unstructured.cue +++ b/website/cue/reference/components/sinks/base/gcp_chronicle_unstructured.cue @@ -155,6 +155,11 @@ base: components: sinks: gcp_chronicle_unstructured: configuration: { [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs """ + protobuf: """ + Encodes an event as a [Protobuf][protobuf] message. + + [protobuf]: https://protobuf.dev/ + """ raw_message: """ No encoding. @@ -289,6 +294,27 @@ base: components: sinks: gcp_chronicle_unstructured: configuration: { required: false type: array: items: type: string: {} } + protobuf: { + description: "Options for the Protobuf serializer." + relevant_when: "codec = \"protobuf\"" + required: true + type: object: options: { + desc_file: { + description: """ + The path to the protobuf descriptor set file. + + This file is the output of `protoc -o ...` + """ + required: true + type: string: examples: ["/etc/vector/protobuf_descriptor_set.desc"] + } + message_type: { + description: "The name of the message type to use for serializing." + required: true + type: string: examples: ["package.Message"] + } + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/base/gcp_cloud_storage.cue b/website/cue/reference/components/sinks/base/gcp_cloud_storage.cue index 445fdfa244b5a..47f6bf3b03df7 100644 --- a/website/cue/reference/components/sinks/base/gcp_cloud_storage.cue +++ b/website/cue/reference/components/sinks/base/gcp_cloud_storage.cue @@ -239,6 +239,11 @@ base: components: sinks: gcp_cloud_storage: configuration: { [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs """ + protobuf: """ + Encodes an event as a [Protobuf][protobuf] message. + + [protobuf]: https://protobuf.dev/ + """ raw_message: """ No encoding. @@ -373,6 +378,27 @@ base: components: sinks: gcp_cloud_storage: configuration: { required: false type: array: items: type: string: {} } + protobuf: { + description: "Options for the Protobuf serializer." + relevant_when: "codec = \"protobuf\"" + required: true + type: object: options: { + desc_file: { + description: """ + The path to the protobuf descriptor set file. + + This file is the output of `protoc -o ...` + """ + required: true + type: string: examples: ["/etc/vector/protobuf_descriptor_set.desc"] + } + message_type: { + description: "The name of the message type to use for serializing." + required: true + type: string: examples: ["package.Message"] + } + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/base/gcp_pubsub.cue b/website/cue/reference/components/sinks/base/gcp_pubsub.cue index fd9d75f13f584..0b36642444576 100644 --- a/website/cue/reference/components/sinks/base/gcp_pubsub.cue +++ b/website/cue/reference/components/sinks/base/gcp_pubsub.cue @@ -153,6 +153,11 @@ base: components: sinks: gcp_pubsub: configuration: { [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs """ + protobuf: """ + Encodes an event as a [Protobuf][protobuf] message. + + [protobuf]: https://protobuf.dev/ + """ raw_message: """ No encoding. @@ -287,6 +292,27 @@ base: components: sinks: gcp_pubsub: configuration: { required: false type: array: items: type: string: {} } + protobuf: { + description: "Options for the Protobuf serializer." + relevant_when: "codec = \"protobuf\"" + required: true + type: object: options: { + desc_file: { + description: """ + The path to the protobuf descriptor set file. + + This file is the output of `protoc -o ...` + """ + required: true + type: string: examples: ["/etc/vector/protobuf_descriptor_set.desc"] + } + message_type: { + description: "The name of the message type to use for serializing." + required: true + type: string: examples: ["package.Message"] + } + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/base/http.cue b/website/cue/reference/components/sinks/base/http.cue index 84e11a38f9a5c..58fcd9c005c81 100644 --- a/website/cue/reference/components/sinks/base/http.cue +++ b/website/cue/reference/components/sinks/base/http.cue @@ -194,6 +194,11 @@ base: components: sinks: http: configuration: { [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs """ + protobuf: """ + Encodes an event as a [Protobuf][protobuf] message. + + [protobuf]: https://protobuf.dev/ + """ raw_message: """ No encoding. @@ -328,6 +333,27 @@ base: components: sinks: http: configuration: { required: false type: array: items: type: string: {} } + protobuf: { + description: "Options for the Protobuf serializer." + relevant_when: "codec = \"protobuf\"" + required: true + type: object: options: { + desc_file: { + description: """ + The path to the protobuf descriptor set file. + + This file is the output of `protoc -o ...` + """ + required: true + type: string: examples: ["/etc/vector/protobuf_descriptor_set.desc"] + } + message_type: { + description: "The name of the message type to use for serializing." + required: true + type: string: examples: ["package.Message"] + } + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/base/humio_logs.cue b/website/cue/reference/components/sinks/base/humio_logs.cue index abd851e5963b9..2892e356c6784 100644 --- a/website/cue/reference/components/sinks/base/humio_logs.cue +++ b/website/cue/reference/components/sinks/base/humio_logs.cue @@ -147,6 +147,11 @@ base: components: sinks: humio_logs: configuration: { [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs """ + protobuf: """ + Encodes an event as a [Protobuf][protobuf] message. + + [protobuf]: https://protobuf.dev/ + """ raw_message: """ No encoding. @@ -281,6 +286,27 @@ base: components: sinks: humio_logs: configuration: { required: false type: array: items: type: string: {} } + protobuf: { + description: "Options for the Protobuf serializer." + relevant_when: "codec = \"protobuf\"" + required: true + type: object: options: { + desc_file: { + description: """ + The path to the protobuf descriptor set file. + + This file is the output of `protoc -o ...` + """ + required: true + type: string: examples: ["/etc/vector/protobuf_descriptor_set.desc"] + } + message_type: { + description: "The name of the message type to use for serializing." + required: true + type: string: examples: ["package.Message"] + } + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/base/kafka.cue b/website/cue/reference/components/sinks/base/kafka.cue index 630a2a7a5cd2c..fd95ee383b816 100644 --- a/website/cue/reference/components/sinks/base/kafka.cue +++ b/website/cue/reference/components/sinks/base/kafka.cue @@ -141,6 +141,11 @@ base: components: sinks: kafka: configuration: { [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs """ + protobuf: """ + Encodes an event as a [Protobuf][protobuf] message. + + [protobuf]: https://protobuf.dev/ + """ raw_message: """ No encoding. @@ -275,6 +280,27 @@ base: components: sinks: kafka: configuration: { required: false type: array: items: type: string: {} } + protobuf: { + description: "Options for the Protobuf serializer." + relevant_when: "codec = \"protobuf\"" + required: true + type: object: options: { + desc_file: { + description: """ + The path to the protobuf descriptor set file. + + This file is the output of `protoc -o ...` + """ + required: true + type: string: examples: ["/etc/vector/protobuf_descriptor_set.desc"] + } + message_type: { + description: "The name of the message type to use for serializing." + required: true + type: string: examples: ["package.Message"] + } + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/base/loki.cue b/website/cue/reference/components/sinks/base/loki.cue index 3dff6f06e9ecf..fe7a9160bc88b 100644 --- a/website/cue/reference/components/sinks/base/loki.cue +++ b/website/cue/reference/components/sinks/base/loki.cue @@ -198,6 +198,11 @@ base: components: sinks: loki: configuration: { [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs """ + protobuf: """ + Encodes an event as a [Protobuf][protobuf] message. + + [protobuf]: https://protobuf.dev/ + """ raw_message: """ No encoding. @@ -332,6 +337,27 @@ base: components: sinks: loki: configuration: { required: false type: array: items: type: string: {} } + protobuf: { + description: "Options for the Protobuf serializer." + relevant_when: "codec = \"protobuf\"" + required: true + type: object: options: { + desc_file: { + description: """ + The path to the protobuf descriptor set file. + + This file is the output of `protoc -o ...` + """ + required: true + type: string: examples: ["/etc/vector/protobuf_descriptor_set.desc"] + } + message_type: { + description: "The name of the message type to use for serializing." + required: true + type: string: examples: ["package.Message"] + } + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/base/nats.cue b/website/cue/reference/components/sinks/base/nats.cue index b290de01c6105..11d3dc126fa7d 100644 --- a/website/cue/reference/components/sinks/base/nats.cue +++ b/website/cue/reference/components/sinks/base/nats.cue @@ -186,6 +186,11 @@ base: components: sinks: nats: configuration: { [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs """ + protobuf: """ + Encodes an event as a [Protobuf][protobuf] message. + + [protobuf]: https://protobuf.dev/ + """ raw_message: """ No encoding. @@ -320,6 +325,27 @@ base: components: sinks: nats: configuration: { required: false type: array: items: type: string: {} } + protobuf: { + description: "Options for the Protobuf serializer." + relevant_when: "codec = \"protobuf\"" + required: true + type: object: options: { + desc_file: { + description: """ + The path to the protobuf descriptor set file. + + This file is the output of `protoc -o ...` + """ + required: true + type: string: examples: ["/etc/vector/protobuf_descriptor_set.desc"] + } + message_type: { + description: "The name of the message type to use for serializing." + required: true + type: string: examples: ["package.Message"] + } + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/base/papertrail.cue b/website/cue/reference/components/sinks/base/papertrail.cue index 99a2794e5cfee..6f0e9d17eb439 100644 --- a/website/cue/reference/components/sinks/base/papertrail.cue +++ b/website/cue/reference/components/sinks/base/papertrail.cue @@ -86,6 +86,11 @@ base: components: sinks: papertrail: configuration: { [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs """ + protobuf: """ + Encodes an event as a [Protobuf][protobuf] message. + + [protobuf]: https://protobuf.dev/ + """ raw_message: """ No encoding. @@ -220,6 +225,27 @@ base: components: sinks: papertrail: configuration: { required: false type: array: items: type: string: {} } + protobuf: { + description: "Options for the Protobuf serializer." + relevant_when: "codec = \"protobuf\"" + required: true + type: object: options: { + desc_file: { + description: """ + The path to the protobuf descriptor set file. + + This file is the output of `protoc -o ...` + """ + required: true + type: string: examples: ["/etc/vector/protobuf_descriptor_set.desc"] + } + message_type: { + description: "The name of the message type to use for serializing." + required: true + type: string: examples: ["package.Message"] + } + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/base/pulsar.cue b/website/cue/reference/components/sinks/base/pulsar.cue index ddfd315be19a5..428c4f42791e0 100644 --- a/website/cue/reference/components/sinks/base/pulsar.cue +++ b/website/cue/reference/components/sinks/base/pulsar.cue @@ -180,6 +180,11 @@ base: components: sinks: pulsar: configuration: { [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs """ + protobuf: """ + Encodes an event as a [Protobuf][protobuf] message. + + [protobuf]: https://protobuf.dev/ + """ raw_message: """ No encoding. @@ -314,6 +319,27 @@ base: components: sinks: pulsar: configuration: { required: false type: array: items: type: string: {} } + protobuf: { + description: "Options for the Protobuf serializer." + relevant_when: "codec = \"protobuf\"" + required: true + type: object: options: { + desc_file: { + description: """ + The path to the protobuf descriptor set file. + + This file is the output of `protoc -o ...` + """ + required: true + type: string: examples: ["/etc/vector/protobuf_descriptor_set.desc"] + } + message_type: { + description: "The name of the message type to use for serializing." + required: true + type: string: examples: ["package.Message"] + } + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/base/redis.cue b/website/cue/reference/components/sinks/base/redis.cue index 211e84aa8b6ac..6d2e1964de935 100644 --- a/website/cue/reference/components/sinks/base/redis.cue +++ b/website/cue/reference/components/sinks/base/redis.cue @@ -139,6 +139,11 @@ base: components: sinks: redis: configuration: { [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs """ + protobuf: """ + Encodes an event as a [Protobuf][protobuf] message. + + [protobuf]: https://protobuf.dev/ + """ raw_message: """ No encoding. @@ -273,6 +278,27 @@ base: components: sinks: redis: configuration: { required: false type: array: items: type: string: {} } + protobuf: { + description: "Options for the Protobuf serializer." + relevant_when: "codec = \"protobuf\"" + required: true + type: object: options: { + desc_file: { + description: """ + The path to the protobuf descriptor set file. + + This file is the output of `protoc -o ...` + """ + required: true + type: string: examples: ["/etc/vector/protobuf_descriptor_set.desc"] + } + message_type: { + description: "The name of the message type to use for serializing." + required: true + type: string: examples: ["package.Message"] + } + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/base/socket.cue b/website/cue/reference/components/sinks/base/socket.cue index 6d08b5a959d80..99c353f604166 100644 --- a/website/cue/reference/components/sinks/base/socket.cue +++ b/website/cue/reference/components/sinks/base/socket.cue @@ -98,6 +98,11 @@ base: components: sinks: socket: configuration: { [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs """ + protobuf: """ + Encodes an event as a [Protobuf][protobuf] message. + + [protobuf]: https://protobuf.dev/ + """ raw_message: """ No encoding. @@ -232,6 +237,27 @@ base: components: sinks: socket: configuration: { required: false type: array: items: type: string: {} } + protobuf: { + description: "Options for the Protobuf serializer." + relevant_when: "codec = \"protobuf\"" + required: true + type: object: options: { + desc_file: { + description: """ + The path to the protobuf descriptor set file. + + This file is the output of `protoc -o ...` + """ + required: true + type: string: examples: ["/etc/vector/protobuf_descriptor_set.desc"] + } + message_type: { + description: "The name of the message type to use for serializing." + required: true + type: string: examples: ["package.Message"] + } + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/base/splunk_hec_logs.cue b/website/cue/reference/components/sinks/base/splunk_hec_logs.cue index a0f031235fcc5..e79d4cdb46261 100644 --- a/website/cue/reference/components/sinks/base/splunk_hec_logs.cue +++ b/website/cue/reference/components/sinks/base/splunk_hec_logs.cue @@ -197,6 +197,11 @@ base: components: sinks: splunk_hec_logs: configuration: { [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs """ + protobuf: """ + Encodes an event as a [Protobuf][protobuf] message. + + [protobuf]: https://protobuf.dev/ + """ raw_message: """ No encoding. @@ -331,6 +336,27 @@ base: components: sinks: splunk_hec_logs: configuration: { required: false type: array: items: type: string: {} } + protobuf: { + description: "Options for the Protobuf serializer." + relevant_when: "codec = \"protobuf\"" + required: true + type: object: options: { + desc_file: { + description: """ + The path to the protobuf descriptor set file. + + This file is the output of `protoc -o ...` + """ + required: true + type: string: examples: ["/etc/vector/protobuf_descriptor_set.desc"] + } + message_type: { + description: "The name of the message type to use for serializing." + required: true + type: string: examples: ["package.Message"] + } + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/base/webhdfs.cue b/website/cue/reference/components/sinks/base/webhdfs.cue index 9c13ce1bb894f..0dd49a0d683b8 100644 --- a/website/cue/reference/components/sinks/base/webhdfs.cue +++ b/website/cue/reference/components/sinks/base/webhdfs.cue @@ -147,6 +147,11 @@ base: components: sinks: webhdfs: configuration: { [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs """ + protobuf: """ + Encodes an event as a [Protobuf][protobuf] message. + + [protobuf]: https://protobuf.dev/ + """ raw_message: """ No encoding. @@ -281,6 +286,27 @@ base: components: sinks: webhdfs: configuration: { required: false type: array: items: type: string: {} } + protobuf: { + description: "Options for the Protobuf serializer." + relevant_when: "codec = \"protobuf\"" + required: true + type: object: options: { + desc_file: { + description: """ + The path to the protobuf descriptor set file. + + This file is the output of `protoc -o ...` + """ + required: true + type: string: examples: ["/etc/vector/protobuf_descriptor_set.desc"] + } + message_type: { + description: "The name of the message type to use for serializing." + required: true + type: string: examples: ["package.Message"] + } + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/base/websocket.cue b/website/cue/reference/components/sinks/base/websocket.cue index 3e1f2b6277baf..c9c30e5e170ec 100644 --- a/website/cue/reference/components/sinks/base/websocket.cue +++ b/website/cue/reference/components/sinks/base/websocket.cue @@ -133,6 +133,11 @@ base: components: sinks: websocket: configuration: { [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs """ + protobuf: """ + Encodes an event as a [Protobuf][protobuf] message. + + [protobuf]: https://protobuf.dev/ + """ raw_message: """ No encoding. @@ -267,6 +272,27 @@ base: components: sinks: websocket: configuration: { required: false type: array: items: type: string: {} } + protobuf: { + description: "Options for the Protobuf serializer." + relevant_when: "codec = \"protobuf\"" + required: true + type: object: options: { + desc_file: { + description: """ + The path to the protobuf descriptor set file. + + This file is the output of `protoc -o ...` + """ + required: true + type: string: examples: ["/etc/vector/protobuf_descriptor_set.desc"] + } + message_type: { + description: "The name of the message type to use for serializing." + required: true + type: string: examples: ["package.Message"] + } + } + } timestamp_format: { description: "Format used for timestamp fields." required: false