Commit f8ca439

chore(external docs): update sink tutorials with Data Volume tag changes (#18148)

* Update tutorials

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* Made clearer grouping by source and service

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

---------

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>
StephenWakely authored and pront committed Aug 4, 2023
1 parent a7c95dd commit f8ca439
Showing 10 changed files with 66 additions and 43 deletions.
2 changes: 1 addition & 1 deletion docs/tutorials/sinks/1_basic_sink.md
@@ -33,7 +33,7 @@ is deserialized to the fields in this struct so the user can customise the
sink's behaviour.

```rust
-#[configurable_component(sink("basic", "Basic sink."))]
+#[configurable_component(sink("basic"))]
#[derive(Clone, Debug)]
/// A basic sink that dumps its output to stdout.
pub struct BasicConfig {
49 changes: 36 additions & 13 deletions docs/tutorials/sinks/2_http_sink.md
@@ -16,6 +16,7 @@ use crate::{
    http::HttpClient,
    internal_events::SinkRequestBuildError,
};
+use vector_core::config::telemetry;
use bytes::Bytes;
```

@@ -81,12 +82,12 @@ struct BasicEncoder;
The Encoder must implement the [`Encoder`][encoder] trait:

```rust
-impl Encoder<Event> for BasicEncoder {
+impl encoding::Encoder<Event> for BasicEncoder {
    fn encode_input(
        &self,
        input: Event,
        writer: &mut dyn std::io::Write,
-    ) -> std::io::Result<usize> {
+    ) -> std::io::Result<(usize, GroupedCountByteSize)> {
    }
}
```
@@ -98,16 +99,25 @@ sending batches of events, or they may send a completely different type if each
event is processed in some way prior to encoding.

[`encode_input`][encoder_encode_input] serializes the event to a String and
-writes these bytes:
+writes these bytes. The function also creates a [`GroupedCountByteSize`]
+[grouped_count_byte_size] object. This object tracks the size of the event
+sent by the sink, optionally grouped by the source and service that
+originated the event if Vector has been configured to do so. The sizes must
+be calculated in this function because encoding sometimes drops fields from
+the event, and the recorded size has to reflect the event after those fields
+have been dropped.

```rust
fn encode_input(
    &self,
    input: Event,
    writer: &mut dyn std::io::Write,
-) -> std::io::Result<usize> {
+) -> std::io::Result<(usize, GroupedCountByteSize)> {
+    let mut byte_size = telemetry().create_request_count_byte_size();
+    byte_size.add_event(&input, input.estimated_json_encoded_size_of());
+
    let event = serde_json::to_string(&input).unwrap();
-    write_all(writer, 1, event.as_bytes()).map(|()| event.len())
+    write_all(writer, 1, event.as_bytes()).map(|()| (event.len(), byte_size))
}
```
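
For intuition, here is a self-contained sketch of the kind of accounting `GroupedCountByteSize` performs. The types and field names below are illustrative assumptions, not Vector's implementation: when grouping is enabled, each (source, service) pair accumulates its own event count and byte size; otherwise everything lands in a single bucket.

```rust
use std::collections::HashMap;

// Toy stand-in for GroupedCountByteSize; not Vector's actual type.
enum ToyCountByteSize {
    // One (event count, byte size) bucket per (source, service) pair.
    Grouped(HashMap<(Option<String>, Option<String>), (usize, usize)>),
    // A single (event count, byte size) bucket when grouping is disabled.
    Untagged(usize, usize),
}

impl ToyCountByteSize {
    fn add_event(&mut self, source: Option<&str>, service: Option<&str>, json_size: usize) {
        match self {
            ToyCountByteSize::Grouped(map) => {
                let key = (source.map(String::from), service.map(String::from));
                let (count, bytes) = map.entry(key).or_insert((0, 0));
                *count += 1;
                *bytes += json_size;
            }
            ToyCountByteSize::Untagged(count, bytes) => {
                *count += 1;
                *bytes += json_size;
            }
        }
    }
}

fn main() {
    let mut sizes = ToyCountByteSize::Grouped(HashMap::new());
    // Two events originating from the same source and service
    // accumulate into the same bucket.
    sizes.add_event(Some("docker_logs"), Some("checkout"), 112);
    sizes.add_event(Some("docker_logs"), Some("checkout"), 98);
}
```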

@@ -152,8 +162,12 @@ We need to implement a number of traits for the request to access these fields:

```rust
impl MetaDescriptive for BasicRequest {
-    fn get_metadata(&self) -> RequestMetadata {
-        self.metadata
+    fn get_metadata(&self) -> &RequestMetadata {
+        &self.metadata
+    }
+
+    fn metadata_mut(&mut self) -> &mut RequestMetadata {
+        &mut self.metadata
    }
}
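
// A minimal sketch of the accessor pair required above, using a stand-in
// type rather than Vector's RequestMetadata: the shared getter lets callers
// read the metadata by reference instead of copying it, while `metadata_mut`
// lets later pipeline stages update it in place.
struct ToyMetadata {
    event_count: usize,
}

struct ToyRequest {
    metadata: ToyMetadata,
}

impl ToyRequest {
    fn get_metadata(&self) -> &ToyMetadata {
        &self.metadata // borrowed read: no clone
    }

    fn metadata_mut(&mut self) -> &mut ToyMetadata {
        &mut self.metadata // exclusive borrow: update in place
    }
}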

@@ -249,7 +263,7 @@ when sending the event to an `amqp` server.
    mut input: Event,
) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
    let finalizers = input.take_finalizers();
-    let metadata_builder = RequestMetadataBuilder::from_events(&input);
+    let metadata_builder = RequestMetadataBuilder::from_event(&input);
    (finalizers, metadata_builder, input)
}
```
@@ -338,7 +352,12 @@ that will be invoked to send the actual data.
match client.call(req).await {
    Ok(response) => {
        if response.status().is_success() {
-            Ok(BasicResponse { byte_size })
+            Ok(BasicResponse {
+                byte_size,
+                json_size: request
+                    .metadata
+                    .into_events_estimated_json_encoded_byte_size(),
+            })
        } else {
            Err("received error response")
        }
@@ -359,18 +378,21 @@ The return from our service must be an object that implements the
```rust
struct BasicResponse {
    byte_size: usize,
+    json_size: GroupedCountByteSize,
}

impl DriverResponse for BasicResponse {
    fn event_status(&self) -> EventStatus {
        EventStatus::Delivered
    }

-    fn events_sent(&self) -> RequestCountByteSize {
-        // (events count, byte size)
-        CountByteSize(1, self.byte_size).into()
+    fn events_sent(&self) -> &GroupedCountByteSize {
+        &self.json_size
+    }
+
+    fn bytes_sent(&self) -> Option<usize> {
+        Some(self.byte_size)
    }
}
```

Vector calls the methods in this trait to determine if the event was delivered successfully.
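
To see these methods in use, here is a toy driver loop under stated assumptions (all types below are illustrative stand-ins, not Vector's API): after a successful send, the driver reads the grouped sizes through `events_sent` and the raw request size through `bytes_sent`, emitting one telemetry update per (source, service) group.

```rust
use std::collections::HashMap;

// Stand-ins for GroupedCountByteSize and DriverResponse; illustrative only.
struct ToyGroupedSizes(HashMap<(String, String), (usize, usize)>);

trait ToyDriverResponse {
    fn events_sent(&self) -> &ToyGroupedSizes;
    fn bytes_sent(&self) -> Option<usize>;
}

// Emits one update per (source, service) group, then the wire size.
fn emit_telemetry(response: &dyn ToyDriverResponse) {
    for ((source, service), (events, bytes)) in &response.events_sent().0 {
        println!("sent {events} events ({bytes} bytes) for source={source} service={service}");
    }
    if let Some(raw) = response.bytes_sent() {
        println!("{raw} bytes sent on the wire");
    }
}
```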
@@ -492,3 +514,4 @@ BODY:
[sinkbuilder_ext_into_driver]: https://rust-doc.vector.dev/vector/sinks/util/builder/trait.sinkbuilderext#method.into_driver
[stream_filter_map]: https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html#method.filter_map
[driver]: https://rust-doc.vector.dev/vector_core/stream/struct.driver
+[grouped_count_byte_size]: https://rust-doc.vector.dev/vector_common/request_metadata/enum.groupedcountbytesize
2 changes: 1 addition & 1 deletion src/sinks/elasticsearch/config.rs
@@ -443,7 +443,7 @@ impl DataStreamConfig {
        let (dtype, dataset, namespace) = if !self.auto_routing {
            (self.dtype(log)?, self.dataset(log)?, self.namespace(log)?)
        } else {
-            let data_stream = log.get("data_stream").and_then(|ds| ds.as_object());
+            let data_stream = log.get(event_path!("data_stream")).and_then(|ds| ds.as_object());
            let dtype = data_stream
                .and_then(|ds| ds.get("type"))
                .map(|value| value.to_string_lossy().into_owned())
4 changes: 2 additions & 2 deletions src/sinks/new_relic/model.rs
@@ -117,7 +117,7 @@ impl TryFrom<Vec<Event>> for EventsApiModel {
                event_model.insert(k, v.clone());
            }

-            if let Some(message) = log.get("message") {
+            if let Some(message) = log.get(event_path!("message")) {
                let message = message.to_string_lossy().replace("\\\"", "\"");
                // If message contains a JSON string, parse it and insert all fields into self
                if let serde_json::Result::Ok(json_map) =
@@ -189,7 +189,7 @@ impl TryFrom<Vec<Event>> for LogsApiModel {
            for (k, v) in log.convert_to_fields() {
                log_model.insert(k, v.clone());
            }
-            if log.get("message").is_none() {
+            if log.get(event_path!("message")).is_none() {
                log_model.insert(
                    "message".to_owned(),
                    Value::from("log from vector".to_owned()),
2 changes: 1 addition & 1 deletion src/sources/docker_logs/tests.rs
@@ -382,7 +382,7 @@ mod integration_tests {

        let log = events[0].as_log();
        let meta = log.metadata().value();
-        assert_eq!(log.get(".").unwrap(), &value!(message));
+        assert_eq!(log.get(event_path!(".")).unwrap(), &value!(message));
        assert_eq!(
            meta.get(path!(DockerLogsConfig::NAME, CONTAINER)).unwrap(),
            &value!(id)
12 changes: 6 additions & 6 deletions src/sources/fluent/mod.rs
@@ -925,10 +925,10 @@ mod tests {

        assert_eq!(events.len(), 1);
        let log = events[0].as_log();
-        assert_eq!(log.get("field").unwrap(), &msg.into());
-        assert!(matches!(log.get("host").unwrap(), Value::Bytes(_)));
-        assert!(matches!(log.get("timestamp").unwrap(), Value::Timestamp(_)));
-        assert_eq!(log.get("tag").unwrap(), &tag.into());
+        assert_eq!(log.get(event_path!("field")).unwrap(), &msg.into());
+        assert!(matches!(log.get(event_path!("host")).unwrap(), Value::Bytes(_)));
+        assert!(matches!(log.get(event_path!("timestamp")).unwrap(), Value::Timestamp(_)));
+        assert_eq!(log.get(event_path!("tag")).unwrap(), &tag.into());

        (result, output.into())
    }
@@ -1142,8 +1142,8 @@ mod integration_tests {
        let log = events[0].as_log();
        assert_eq!(log["tag"], "http.0".into());
        assert_eq!(log["message"], msg.into());
-        assert!(log.get("timestamp").is_some());
-        assert!(log.get("host").is_some());
+        assert!(log.get(event_path!("timestamp")).is_some());
+        assert!(log.get(event_path!("host")).is_some());
    })
    .await;
}
10 changes: 5 additions & 5 deletions src/sources/gcp_pubsub.rs
@@ -1154,12 +1154,12 @@ mod integration_tests {
        assert_eq!(events.len(), lines.len());
        for (message, event) in lines.into_iter().zip(events) {
            let log = event.into_log();
-            assert_eq!(log.get("message"), Some(&message.into()));
-            assert_eq!(log.get("source_type"), Some(&"gcp_pubsub".into()));
-            assert!(log.get("timestamp").unwrap().as_timestamp().unwrap() >= &start);
-            assert!(log.get("timestamp").unwrap().as_timestamp().unwrap() <= &end);
+            assert_eq!(log.get(event_path!("message")), Some(&message.into()));
+            assert_eq!(log.get(event_path!("source_type")), Some(&"gcp_pubsub".into()));
+            assert!(log.get(event_path!("timestamp")).unwrap().as_timestamp().unwrap() >= &start);
+            assert!(log.get(event_path!("timestamp")).unwrap().as_timestamp().unwrap() <= &end);
            assert!(
-                message_ids.insert(log.get("message_id").unwrap().clone().to_string()),
+                message_ids.insert(log.get(event_path!("message_id")).unwrap().clone().to_string()),
                "Message contained duplicate message_id"
            );
            let logattr = log
2 changes: 1 addition & 1 deletion src/sources/http_client/tests.rs
@@ -172,7 +172,7 @@ async fn request_query_applied() {
    ]);

    for log in logs {
-        let query = log.get("data").expect("data must be available");
+        let query = log.get(event_path!("data")).expect("data must be available");
        let mut got: HashMap<String, Vec<String>> = HashMap::new();
        for (k, v) in
            url::form_urlencoded::parse(query.as_bytes().expect("byte conversion should succeed"))
8 changes: 4 additions & 4 deletions src/sources/internal_logs.rs
@@ -303,9 +303,9 @@ mod tests {
        assert_eq!(log["metadata.level"], "ERROR".into());
        // The first log event occurs outside our custom span
        if i == 0 {
-            assert!(log.get("vector.component_id").is_none());
-            assert!(log.get("vector.component_kind").is_none());
-            assert!(log.get("vector.component_type").is_none());
+            assert!(log.get(event_path!("vector.component_id")).is_none());
+            assert!(log.get(event_path!("vector.component_kind")).is_none());
+            assert!(log.get(event_path!("vector.component_type")).is_none());
        } else if i < 3 {
            assert_eq!(log["vector.component_id"], "foo".into());
            assert_eq!(log["vector.component_kind"], "source".into());
@@ -319,7 +319,7 @@
            assert_eq!(log["vector.component_type"], "internal_logs".into());
            assert_eq!(log["vector.component_new_field"], "baz".into());
            assert_eq!(log["vector.component_numerical_field"], 1.into());
-            assert!(log.get("vector.ignored_field").is_none());
+            assert!(log.get(event_path!("vector.ignored_field")).is_none());
        }
    }
}
18 changes: 9 additions & 9 deletions src/sources/logstash.rs
@@ -740,15 +740,15 @@ mod test {
        assert_eq!(events.len(), 1);
        let log = events[0].as_log();
        assert_eq!(
-            log.get("message").unwrap().to_string_lossy(),
+            log.get(event_path!("message")).unwrap().to_string_lossy(),
            "Hello, world!".to_string()
        );
        assert_eq!(
-            log.get("source_type").unwrap().to_string_lossy(),
+            log.get(event_path!("source_type")).unwrap().to_string_lossy(),
            "logstash".to_string()
        );
-        assert!(log.get("host").is_some());
-        assert!(log.get("timestamp").is_some());
+        assert!(log.get(event_path!("host")).is_some());
+        assert!(log.get(event_path!("timestamp")).is_some());
    }

    fn encode_req(seq: u32, pairs: &[(&str, &str)]) -> Bytes {
@@ -894,12 +894,12 @@ mod integration_tests {

        let log = events[0].as_log();
        assert_eq!(
-            log.get("@metadata.beat"),
+            log.get(event_path!("@metadata.beat")),
            Some(String::from("heartbeat").into()).as_ref()
        );
-        assert_eq!(log.get("summary.up"), Some(1.into()).as_ref());
-        assert!(log.get("timestamp").is_some());
-        assert!(log.get("host").is_some());
+        assert_eq!(log.get(event_path!("summary.up")), Some(1.into()).as_ref());
+        assert!(log.get(event_path!("timestamp")).is_some());
+        assert!(log.get(event_path!("host")).is_some());
    }

    fn logstash_address() -> String {
@@ -937,7 +937,7 @@
            .unwrap()
            .to_string_lossy()
            .contains("Hello World"));
-        assert!(log.get("host").is_some());
+        assert!(log.get(event_path!("host")).is_some());
    }

    async fn source(
