Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(external docs): update sink tutorials with Data Volume tag changes #18148

Merged
merged 2 commits into from
Aug 4, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/tutorials/sinks/1_basic_sink.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ is deserialized to the fields in this struct so the user can customise the
sink's behaviour.

```rust
#[configurable_component(sink("basic", "Basic sink."))]
#[configurable_component(sink("basic"))]
#[derive(Clone, Debug)]
/// A basic sink that dumps its output to stdout.
pub struct BasicConfig {
Expand Down
49 changes: 36 additions & 13 deletions docs/tutorials/sinks/2_http_sink.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::{
http::HttpClient,
internal_events::SinkRequestBuildError,
};
use vector_core::config::telemetry;
use bytes::Bytes;
```

Expand Down Expand Up @@ -81,12 +82,12 @@ struct BasicEncoder;
The Encoder must implement the [`Encoder`][encoder] trait:

```rust
impl Encoder<Event> for BasicEncoder {
impl encoding::Encoder<Event> for BasicEncoder {
fn encode_input(
&self,
input: Event,
writer: &mut dyn std::io::Write,
) -> std::io::Result<usize> {
) -> std::io::Result<(usize, GroupedCountByteSize)> {
}
}
```
Expand All @@ -98,16 +99,25 @@ sending batches of events, or they may send a completely different type if each
event is processed in some way prior to encoding.

[`encode_input`][encoder_encode_input] serializes the event to a String and
writes these bytes:
writes these bytes. The function also creates a
[`GroupedCountByteSize`][grouped_count_byte_size] object. This object tracks the
size of the event that is sent by the sink. This object can also track the size
by the source and service that originated the event if Vector has been configured
neuronull marked this conversation as resolved.
Show resolved Hide resolved
to do so. It is necessary to calculate the sizes in this function since the encode
function sometimes drops fields from the event prior to encoding. We need the size
to be calculated after these fields have been dropped.

```rust
fn encode_input(
&self,
input: Event,
writer: &mut dyn std::io::Write,
) -> std::io::Result<usize> {
) -> std::io::Result<(usize, GroupedCountByteSize)> {
let mut byte_size = telemetry().create_request_count_byte_size();
byte_size.add_event(&input, input.estimated_json_encoded_size_of());

let event = serde_json::to_string(&input).unwrap();
write_all(writer, 1, event.as_bytes()).map(|()| event.len())
write_all(writer, 1, event.as_bytes()).map(|()| (event.len(), byte_size))
}
```

Expand Down Expand Up @@ -152,8 +162,12 @@ We need to implement a number of traits for the request to access these fields:

```rust
impl MetaDescriptive for BasicRequest {
fn get_metadata(&self) -> RequestMetadata {
self.metadata
fn get_metadata(&self) -> &RequestMetadata {
&self.metadata
}

fn metadata_mut(&mut self) -> &mut RequestMetadata {
&mut self.metadata
}
}

Expand Down Expand Up @@ -249,7 +263,7 @@ when sending the event to an `amqp` server.
mut input: Event,
) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
let finalizers = input.take_finalizers();
let metadata_builder = RequestMetadataBuilder::from_events(&input);
let metadata_builder = RequestMetadataBuilder::from_event(&input);
(finalizers, metadata_builder, input)
}
```
Expand Down Expand Up @@ -338,7 +352,12 @@ that will be invoked to send the actual data.
match client.call(req).await {
Ok(response) => {
if response.status().is_success() {
Ok(BasicResponse { byte_size })
Ok(BasicResponse {
byte_size,
json_size: request
.metadata
.into_events_estimated_json_encoded_byte_size(),
})
} else {
Err("received error response")
}
Expand All @@ -359,18 +378,21 @@ The return from our service must be an object that implements the
```rust
struct BasicResponse {
byte_size: usize,
json_size: GroupedCountByteSize,
}

impl DriverResponse for BasicResponse {
fn event_status(&self) -> EventStatus {
EventStatus::Delivered
}

fn events_sent(&self) -> RequestCountByteSize {
// (events count, byte size)
CountByteSize(1, self.byte_size).into()
fn events_sent(&self) -> &GroupedCountByteSize {
&self.json_size
}
}

fn bytes_sent(&self) -> Option<usize> {
Some(self.byte_size)
}}
```

Vector calls the methods in this trait to determine if the event was delivered successfully.
Expand Down Expand Up @@ -492,3 +514,4 @@ BODY:
[sinkbuilder_ext_into_driver]: https://rust-doc.vector.dev/vector/sinks/util/builder/trait.sinkbuilderext#method.into_driver
[stream_filter_map]: https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html#method.filter_map
[driver]: https://rust-doc.vector.dev/vector_core/stream/struct.driver
[grouped_count_byte_size]: https://rust-doc.vector.dev/vector_common/request_metadata/enum.groupedcountbytesize