Skip to content

Commit

Permalink
chore: tidy encode_input function (vectordotdev#18300)
Browse files Browse the repository at this point in the history
* Use itertools to determine the last item in the list

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* itertools needed in core now

Signed-off-by: Jesse Szwedko <jesse.szwedko@datadoghq.com>

---------

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>
Signed-off-by: Jesse Szwedko <jesse.szwedko@datadoghq.com>
Co-authored-by: Jesse Szwedko <jesse.szwedko@datadoghq.com>
  • Loading branch information
StephenWakely and jszwedko authored Aug 18, 2023
1 parent 7b2bddc commit 2a45722
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 23 deletions.
3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ lapin = { version = "2.3.1", default-features = false, features = ["native-tls"]
# API
async-graphql = { version = "6.0.0", default-features = false, optional = true, features = ["chrono", "playground"] }
async-graphql-warp = { version = "6.0.0", default-features = false, optional = true }
itertools = { version = "0.11.0", default-features = false, optional = true }
itertools = { version = "0.11.0", default-features = false }

# API client
crossterm = { version = "0.26.1", default-features = false, features = ["event-stream"], optional = true }
Expand Down Expand Up @@ -440,7 +440,6 @@ api = [
"dep:async-graphql",
"dep:async-graphql-warp",
"dep:base64",
"dep:itertools",
"vector-core/api",
]

Expand Down
37 changes: 16 additions & 21 deletions src/sinks/util/encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::io;

use bytes::BytesMut;
use codecs::encoding::Framer;
use itertools::{Itertools, Position};
use tokio_util::codec::Encoder as _;
use vector_common::request_metadata::GroupedCountByteSize;
use vector_core::{config::telemetry, EstimatedJsonEncodedSizeOf};
Expand All @@ -24,7 +25,7 @@ pub trait Encoder<T> {
impl Encoder<Vec<Event>> for (Transformer, crate::codecs::Encoder<Framer>) {
fn encode_input(
&self,
mut events: Vec<Event>,
events: Vec<Event>,
writer: &mut dyn io::Write,
) -> io::Result<(usize, GroupedCountByteSize)> {
let mut encoder = self.1.clone();
Expand All @@ -36,37 +37,31 @@ impl Encoder<Vec<Event>> for (Transformer, crate::codecs::Encoder<Framer>) {

let mut byte_size = telemetry().create_request_count_byte_size();

if let Some(last) = events.pop() {
for mut event in events {
self.0.transform(&mut event);

// Ensure the json size is calculated after any fields have been removed
// by the transformer.
byte_size.add_event(&event, event.estimated_json_encoded_size_of());

let mut bytes = BytesMut::new();
encoder
.encode(event, &mut bytes)
.map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
write_all(writer, n_events_pending, &bytes)?;
bytes_written += bytes.len();
n_events_pending -= 1;
}
let mut event = last;
for (position, mut event) in events.into_iter().with_position() {
self.0.transform(&mut event);

// Ensure the json size is calculated after any fields have been removed
// by the transformer.
byte_size.add_event(&event, event.estimated_json_encoded_size_of());

let mut bytes = BytesMut::new();
encoder
.serialize(event, &mut bytes)
.map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
match position {
Position::Last | Position::Only => {
encoder
.serialize(event, &mut bytes)
.map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
}
_ => {
encoder
.encode(event, &mut bytes)
.map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
}
}
write_all(writer, n_events_pending, &bytes)?;
bytes_written += bytes.len();
n_events_pending -= 1;
}

let batch_suffix = encoder.batch_suffix();
assert!(n_events_pending == 0);
write_all(writer, 0, batch_suffix)?;
Expand Down

0 comments on commit 2a45722

Please sign in to comment.