diff --git a/docs/tutorials/sinks/1_basic_sink.md b/docs/tutorials/sinks/1_basic_sink.md index 40553b8820992..fc188d5ac8e4d 100644 --- a/docs/tutorials/sinks/1_basic_sink.md +++ b/docs/tutorials/sinks/1_basic_sink.md @@ -33,7 +33,7 @@ is deserialized to the fields in this struct so the user can customise the sink's behaviour. ```rust -#[configurable_component(sink("basic", "Basic sink."))] +#[configurable_component(sink("basic"))] #[derive(Clone, Debug)] /// A basic sink that dumps its output to stdout. pub struct BasicConfig { diff --git a/docs/tutorials/sinks/2_http_sink.md b/docs/tutorials/sinks/2_http_sink.md index e92cb94bcac76..179ff4192c8cc 100644 --- a/docs/tutorials/sinks/2_http_sink.md +++ b/docs/tutorials/sinks/2_http_sink.md @@ -16,6 +16,7 @@ use crate::{ http::HttpClient, internal_events::SinkRequestBuildError, }; +use vector_core::config::telemetry; use bytes::Bytes; ``` @@ -81,12 +82,12 @@ struct BasicEncoder; The Encoder must implement the [`Encoder`][encoder] trait: ```rust -impl Encoder for BasicEncoder { +impl encoding::Encoder for BasicEncoder { fn encode_input( &self, input: Event, writer: &mut dyn std::io::Write, - ) -> std::io::Result { + ) -> std::io::Result<(usize, GroupedCountByteSize)> { } } ``` @@ -98,16 +99,25 @@ sending batches of events, or they may send a completely different type if each event is processed in some way prior to encoding. [`encode_input`][encoder_encode_input] serializes the event to a String and -writes these bytes: +writes these bytes. The function also creates a [`GroupedCountByteSize`] +[grouped_count_byte_size] object. This object tracks the size of the event +that is sent by the sink, optionally grouped by the source and service that +originated the event if Vector has been configured to do so. It is necessary to +calculate the sizes in this function since the encode function sometimes drops +fields from the event prior to encoding. We need the size to be calculated after +these fields have been dropped. ```rust fn encode_input( &self, input: Event, writer: &mut dyn std::io::Write, - ) -> std::io::Result { + ) -> std::io::Result<(usize, GroupedCountByteSize)> { + let mut byte_size = telemetry().create_request_count_byte_size(); + byte_size.add_event(&input, input.estimated_json_encoded_size_of()); + let event = serde_json::to_string(&input).unwrap(); - write_all(writer, 1, event.as_bytes()).map(|()| event.len()) + write_all(writer, 1, event.as_bytes()).map(|()| (event.len(), byte_size)) } ``` @@ -152,8 +162,12 @@ We need to implement a number of traits for the request to access these fields: ```rust impl MetaDescriptive for BasicRequest { - fn get_metadata(&self) -> RequestMetadata { - self.metadata + fn get_metadata(&self) -> &RequestMetadata { + &self.metadata + } + + fn metadata_mut(&mut self) -> &mut RequestMetadata { + &mut self.metadata } } @@ -249,7 +263,7 @@ when sending the event to an `amqp` server. mut input: Event, ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) { let finalizers = input.take_finalizers(); - let metadata_builder = RequestMetadataBuilder::from_events(&input); + let metadata_builder = RequestMetadataBuilder::from_event(&input); (finalizers, metadata_builder, input) } ``` @@ -338,7 +352,12 @@ that will be invoked to send the actual data. match client.call(req).await { Ok(response) => { if response.status().is_success() { - Ok(BasicResponse { byte_size }) + Ok(BasicResponse { + byte_size, + json_size: request + .metadata + .into_events_estimated_json_encoded_byte_size(), + }) } else { Err("received error response") } @@ -359,6 +378,7 @@ The return from our service must be an object that implements the ```rust struct BasicResponse { byte_size: usize, + json_size: GroupedCountByteSize, } impl DriverResponse for BasicResponse { @@ -366,11 +386,13 @@ impl DriverResponse for BasicResponse { EventStatus::Delivered } - fn events_sent(&self) -> RequestCountByteSize { - // (events count, byte size) - CountByteSize(1, self.byte_size).into() + fn events_sent(&self) -> &GroupedCountByteSize { + &self.json_size } -} + + fn bytes_sent(&self) -> Option { + Some(self.byte_size) + }} ``` Vector calls the methods in this trait to determine if the event was delivered successfully. @@ -492,3 +514,4 @@ BODY: [sinkbuilder_ext_into_driver]: https://rust-doc.vector.dev/vector/sinks/util/builder/trait.sinkbuilderext#method.into_driver [stream_filter_map]: https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html#method.filter_map [driver]: https://rust-doc.vector.dev/vector_core/stream/struct.driver +[grouped_count_byte_size]: https://rust-doc.vector.dev/vector_common/request_metadata/enum.groupedcountbytesize