From f8ca439f2b642b5eaecebaf472b99b0ad090e9c6 Mon Sep 17 00:00:00 2001
From: Stephen Wakely
Date: Fri, 4 Aug 2023 11:53:13 +0100
Subject: [PATCH] chore(external docs): update sink tutorials with Data Volume
 tag changes (#18148)

* Update tutorials

Signed-off-by: Stephen Wakely

* Made the grouping by source and service clearer

Signed-off-by: Stephen Wakely

---------

Signed-off-by: Stephen Wakely
---
 docs/tutorials/sinks/1_basic_sink.md |  2 +-
 docs/tutorials/sinks/2_http_sink.md  | 49 ++++++++++++++++++++--------
 src/sinks/elasticsearch/config.rs    |  2 +-
 src/sinks/new_relic/model.rs         |  4 +--
 src/sources/docker_logs/tests.rs     |  2 +-
 src/sources/fluent/mod.rs            | 12 +++----
 src/sources/gcp_pubsub.rs            | 10 +++---
 src/sources/http_client/tests.rs     |  2 +-
 src/sources/internal_logs.rs         |  8 ++---
 src/sources/logstash.rs              | 18 +++++-----
 10 files changed, 66 insertions(+), 43 deletions(-)

diff --git a/docs/tutorials/sinks/1_basic_sink.md b/docs/tutorials/sinks/1_basic_sink.md
index 40553b8820992f..fc188d5ac8e4d2 100644
--- a/docs/tutorials/sinks/1_basic_sink.md
+++ b/docs/tutorials/sinks/1_basic_sink.md
@@ -33,7 +33,7 @@ is deserialized to the fields in this struct so the user can customise the
 sink's behaviour.
 
 ```rust
-#[configurable_component(sink("basic", "Basic sink."))]
+#[configurable_component(sink("basic"))]
 #[derive(Clone, Debug)]
 /// A basic sink that dumps its output to stdout.
 pub struct BasicConfig {
diff --git a/docs/tutorials/sinks/2_http_sink.md b/docs/tutorials/sinks/2_http_sink.md
index e92cb94bcac765..179ff4192c8cc0 100644
--- a/docs/tutorials/sinks/2_http_sink.md
+++ b/docs/tutorials/sinks/2_http_sink.md
@@ -16,6 +16,7 @@ use crate::{
     http::HttpClient,
     internal_events::SinkRequestBuildError,
 };
+use vector_core::config::telemetry;
 use bytes::Bytes;
 ```
@@ -81,12 +82,12 @@ struct BasicEncoder;
 The Encoder must implement the [`Encoder`][encoder] trait:
 
 ```rust
-impl Encoder<Event> for BasicEncoder {
+impl encoding::Encoder<Event> for BasicEncoder {
     fn encode_input(
         &self,
         input: Event,
         writer: &mut dyn std::io::Write,
-    ) -> std::io::Result<usize> {
+    ) -> std::io::Result<(usize, GroupedCountByteSize)> {
     }
 }
 ```
@@ -98,16 +99,25 @@ sending batches of events, or they may send a completely different type if
 each event is processed in some way prior to encoding.
 
 [`encode_input`][encoder_encode_input] serializes the event to a String and
-writes these bytes:
+writes these bytes. The function also creates a [`GroupedCountByteSize`]
+[grouped_count_byte_size] object. This object tracks the size of the event
+that is sent by the sink, optionally grouped by the source and service that
+originated the event if Vector has been configured to do so. The sizes must
+be calculated in this function because the encode function can drop fields
+from the event prior to encoding, and we need the size as measured after
+those fields have been dropped.
 
 ```rust
     fn encode_input(
        &self,
         input: Event,
         writer: &mut dyn std::io::Write,
-    ) -> std::io::Result<usize> {
+    ) -> std::io::Result<(usize, GroupedCountByteSize)> {
+        let mut byte_size = telemetry().create_request_count_byte_size();
+        byte_size.add_event(&input, input.estimated_json_encoded_size_of());
+
         let event = serde_json::to_string(&input).unwrap();
-        write_all(writer, 1, event.as_bytes()).map(|()| event.len())
+        write_all(writer, 1, event.as_bytes()).map(|()| (event.len(), byte_size))
     }
 ```
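The hunk above shows the single-event case. As a rough sketch of how the same bookkeeping extends to a batch encoder (the `Vec<Event>` input shape is an assumption here, not something this patch adds), a single `GroupedCountByteSize` accumulates every event in the request, each measured after any field-dropping has taken place:

```rust
// Hypothetical batch variant of the encode body above: one grouped
// size tracker accumulates all events before they are written out.
let mut byte_size = telemetry().create_request_count_byte_size();
for event in &events {
    // Record this event's estimated JSON size, grouped by its source and
    // service when Vector is configured to tag telemetry that way.
    byte_size.add_event(event, event.estimated_json_encoded_size_of());
}
```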
@@ -152,8 +162,12 @@ We need to implement a number of traits for the request to access these fields:
 
 ```rust
 impl MetaDescriptive for BasicRequest {
-    fn get_metadata(&self) -> RequestMetadata {
-        self.metadata
+    fn get_metadata(&self) -> &RequestMetadata {
+        &self.metadata
+    }
+
+    fn metadata_mut(&mut self) -> &mut RequestMetadata {
+        &mut self.metadata
     }
 }
 ```
@@ -249,7 +263,7 @@ when sending the event to an `amqp` server.
         mut input: Event,
     ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
         let finalizers = input.take_finalizers();
-        let metadata_builder = RequestMetadataBuilder::from_events(&input);
+        let metadata_builder = RequestMetadataBuilder::from_event(&input);
 
         (finalizers, metadata_builder, input)
     }
@@ -338,7 +352,12 @@ that will be invoked to send the actual data.
         match client.call(req).await {
             Ok(response) => {
                 if response.status().is_success() {
-                    Ok(BasicResponse { byte_size })
+                    Ok(BasicResponse {
+                        byte_size,
+                        json_size: request
+                            .metadata
+                            .into_events_estimated_json_encoded_byte_size(),
+                    })
                 } else {
                     Err("received error response")
                 }
@@ -359,6 +378,7 @@ The return from our service must be an object that implements the
 ```rust
 struct BasicResponse {
     byte_size: usize,
+    json_size: GroupedCountByteSize,
 }
 
 impl DriverResponse for BasicResponse {
@@ -366,11 +386,13 @@
         EventStatus::Delivered
     }
 
-    fn events_sent(&self) -> RequestCountByteSize {
-        // (events count, byte size)
-        CountByteSize(1, self.byte_size).into()
+    fn events_sent(&self) -> &GroupedCountByteSize {
+        &self.json_size
     }
-}
+
+    fn bytes_sent(&self) -> Option<usize> {
+        Some(self.byte_size)
+    }
+}
 ```
 
 Vector calls the methods in this trait to determine if the event was delivered
 successfully.
@@ -492,3 +514,4 @@ BODY:
 [sinkbuilder_ext_into_driver]: https://rust-doc.vector.dev/vector/sinks/util/builder/trait.sinkbuilderext#method.into_driver
 [stream_filter_map]: https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html#method.filter_map
 [driver]: https://rust-doc.vector.dev/vector_core/stream/struct.driver
+[grouped_count_byte_size]: https://rust-doc.vector.dev/vector_common/request_metadata/enum.groupedcountbytesize
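For reference, this is how the tutorial's response type reads once the hunks above are applied (assembled here from the patch for easier reading):

```rust
struct BasicResponse {
    byte_size: usize,
    json_size: GroupedCountByteSize,
}

impl DriverResponse for BasicResponse {
    fn event_status(&self) -> EventStatus {
        EventStatus::Delivered
    }

    // Grouped event counts and estimated JSON sizes; the driver reads
    // these to emit its per-source/per-service "events sent" telemetry.
    fn events_sent(&self) -> &GroupedCountByteSize {
        &self.json_size
    }

    // Raw bytes written to the network, reported separately from the
    // estimated JSON size above.
    fn bytes_sent(&self) -> Option<usize> {
        Some(self.byte_size)
    }
}
```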
diff --git a/src/sinks/elasticsearch/config.rs b/src/sinks/elasticsearch/config.rs
index 9f50d79e44a2d5..0c1cfdf7f6b7f6 100644
--- a/src/sinks/elasticsearch/config.rs
+++ b/src/sinks/elasticsearch/config.rs
@@ -443,7 +443,7 @@ impl DataStreamConfig {
         let (dtype, dataset, namespace) = if !self.auto_routing {
             (self.dtype(log)?, self.dataset(log)?, self.namespace(log)?)
         } else {
-            let data_stream = log.get("data_stream").and_then(|ds| ds.as_object());
+            let data_stream = log.get(event_path!("data_stream")).and_then(|ds| ds.as_object());
             let dtype = data_stream
                 .and_then(|ds| ds.get("type"))
                 .map(|value| value.to_string_lossy().into_owned())
diff --git a/src/sinks/new_relic/model.rs b/src/sinks/new_relic/model.rs
index c49337a13e3e42..26c15095f91036 100644
--- a/src/sinks/new_relic/model.rs
+++ b/src/sinks/new_relic/model.rs
@@ -117,7 +117,7 @@ impl TryFrom<Vec<Event>> for EventsApiModel {
             event_model.insert(k, v.clone());
         }
 
-        if let Some(message) = log.get("message") {
+        if let Some(message) = log.get(event_path!("message")) {
             let message = message.to_string_lossy().replace("\\\"", "\"");
             // If message contains a JSON string, parse it and insert all fields into self
             if let serde_json::Result::Ok(json_map) =
@@ -189,7 +189,7 @@ impl TryFrom<Vec<Event>> for LogsApiModel {
         for (k, v) in log.convert_to_fields() {
             log_model.insert(k, v.clone());
         }
-        if log.get("message").is_none() {
+        if log.get(event_path!("message")).is_none() {
             log_model.insert(
                 "message".to_owned(),
                 Value::from("log from vector".to_owned()),
diff --git a/src/sources/docker_logs/tests.rs b/src/sources/docker_logs/tests.rs
index f68e3580920c52..fbd727e22efa29 100644
--- a/src/sources/docker_logs/tests.rs
+++ b/src/sources/docker_logs/tests.rs
@@ -382,7 +382,7 @@ mod integration_tests {
         let log = events[0].as_log();
         let meta = log.metadata().value();
 
-        assert_eq!(log.get(".").unwrap(), &value!(message));
+        assert_eq!(log.get(event_path!(".")).unwrap(), &value!(message));
         assert_eq!(
             meta.get(path!(DockerLogsConfig::NAME, CONTAINER)).unwrap(),
             &value!(id)
diff --git a/src/sources/fluent/mod.rs b/src/sources/fluent/mod.rs
index f6ae0955d15eb6..2032094a736e75 100644
--- a/src/sources/fluent/mod.rs
+++ b/src/sources/fluent/mod.rs
@@ -925,10 +925,10 @@ mod tests {
         assert_eq!(events.len(), 1);
         let log = events[0].as_log();
-        assert_eq!(log.get("field").unwrap(), &msg.into());
-        assert!(matches!(log.get("host").unwrap(), Value::Bytes(_)));
-        assert!(matches!(log.get("timestamp").unwrap(), Value::Timestamp(_)));
-        assert_eq!(log.get("tag").unwrap(), &tag.into());
+        assert_eq!(log.get(event_path!("field")).unwrap(), &msg.into());
+        assert!(matches!(log.get(event_path!("host")).unwrap(), Value::Bytes(_)));
+        assert!(matches!(log.get(event_path!("timestamp")).unwrap(), Value::Timestamp(_)));
+        assert_eq!(log.get(event_path!("tag")).unwrap(), &tag.into());
 
         (result, output.into())
     }
@@ -1142,8 +1142,8 @@ mod integration_tests {
         let log = events[0].as_log();
         assert_eq!(log["tag"], "http.0".into());
         assert_eq!(log["message"], msg.into());
-        assert!(log.get("timestamp").is_some());
-        assert!(log.get("host").is_some());
+        assert!(log.get(event_path!("timestamp")).is_some());
+        assert!(log.get(event_path!("host")).is_some());
     })
     .await;
 }
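The remaining hunks in this patch all follow one pattern: `log.get(...)` call sites move from bare strings to paths built with the `event_path!` macro, whose arguments are literal segments rather than strings parsed for path syntax. Roughly (the field names below are illustrative only, not from this patch):

```rust
// One argument is one literal segment, so a dot is simply part of the
// field name rather than a nesting separator.
log.get(event_path!("some.field"));     // the single top-level key "some.field"

// Nesting is spelled as separate segments instead.
log.get(event_path!("outer", "inner")); // the "inner" key inside "outer"
```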
assert_eq!(log.get(event_path!("source_type")), Some(&"gcp_pubsub".into())); + assert!(log.get(event_path!("timestamp")).unwrap().as_timestamp().unwrap() >= &start); + assert!(log.get(event_path!("timestamp")).unwrap().as_timestamp().unwrap() <= &end); assert!( - message_ids.insert(log.get("message_id").unwrap().clone().to_string()), + message_ids.insert(log.get(event_path!("message_id")).unwrap().clone().to_string()), "Message contained duplicate message_id" ); let logattr = log diff --git a/src/sources/http_client/tests.rs b/src/sources/http_client/tests.rs index bb97289807fbda..8db3e922896a36 100644 --- a/src/sources/http_client/tests.rs +++ b/src/sources/http_client/tests.rs @@ -172,7 +172,7 @@ async fn request_query_applied() { ]); for log in logs { - let query = log.get("data").expect("data must be available"); + let query = log.get(event_path!("data")).expect("data must be available"); let mut got: HashMap> = HashMap::new(); for (k, v) in url::form_urlencoded::parse(query.as_bytes().expect("byte conversion should succeed")) diff --git a/src/sources/internal_logs.rs b/src/sources/internal_logs.rs index 1034a37b508ef1..ce9cb59dd85b39 100644 --- a/src/sources/internal_logs.rs +++ b/src/sources/internal_logs.rs @@ -303,9 +303,9 @@ mod tests { assert_eq!(log["metadata.level"], "ERROR".into()); // The first log event occurs outside our custom span if i == 0 { - assert!(log.get("vector.component_id").is_none()); - assert!(log.get("vector.component_kind").is_none()); - assert!(log.get("vector.component_type").is_none()); + assert!(log.get(event_path!("vector.component_id")).is_none()); + assert!(log.get(event_path!("vector.component_kind")).is_none()); + assert!(log.get(event_path!("vector.component_type")).is_none()); } else if i < 3 { assert_eq!(log["vector.component_id"], "foo".into()); assert_eq!(log["vector.component_kind"], "source".into()); @@ -319,7 +319,7 @@ mod tests { assert_eq!(log["vector.component_type"], "internal_logs".into()); assert_eq!(log["vector.component_new_field"], "baz".into()); assert_eq!(log["vector.component_numerical_field"], 1.into()); - assert!(log.get("vector.ignored_field").is_none()); + assert!(log.get(event_path!("vector.ignored_field")).is_none()); } } } diff --git a/src/sources/logstash.rs b/src/sources/logstash.rs index 1230884a4d9510..1c2cee88dd6ef7 100644 --- a/src/sources/logstash.rs +++ b/src/sources/logstash.rs @@ -740,15 +740,15 @@ mod test { assert_eq!(events.len(), 1); let log = events[0].as_log(); assert_eq!( - log.get("message").unwrap().to_string_lossy(), + log.get(event_path!("message")).unwrap().to_string_lossy(), "Hello, world!".to_string() ); assert_eq!( - log.get("source_type").unwrap().to_string_lossy(), + log.get(event_path!("source_type")).unwrap().to_string_lossy(), "logstash".to_string() ); - assert!(log.get("host").is_some()); - assert!(log.get("timestamp").is_some()); + assert!(log.get(event_path!("host")).is_some()); + assert!(log.get(event_path!("timestamp")).is_some()); } fn encode_req(seq: u32, pairs: &[(&str, &str)]) -> Bytes { @@ -894,12 +894,12 @@ mod integration_tests { let log = events[0].as_log(); assert_eq!( - log.get("@metadata.beat"), + log.get(event_path!("@metadata.beat")), Some(String::from("heartbeat").into()).as_ref() ); - assert_eq!(log.get("summary.up"), Some(1.into()).as_ref()); - assert!(log.get("timestamp").is_some()); - assert!(log.get("host").is_some()); + assert_eq!(log.get(event_path!("summary.up")), Some(1.into()).as_ref()); + assert!(log.get(event_path!("timestamp")).is_some()); + 
assert!(log.get(event_path!("host")).is_some()); } fn logstash_address() -> String { @@ -937,7 +937,7 @@ mod integration_tests { .unwrap() .to_string_lossy() .contains("Hello World")); - assert!(log.get("host").is_some()); + assert!(log.get(event_path!("host")).is_some()); } async fn source(