diff --git a/Cargo.lock b/Cargo.lock
index a1fee9a897355..a34102b1ccdfc 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6321,6 +6321,17 @@ dependencies = [
  "syn 1.0.109",
 ]
 
+[[package]]
+name = "prost-reflect"
+version = "0.11.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "000e1e05ebf7b26e1eba298e66fe4eee6eb19c567d0ffb35e0dd34231cdac4c8"
+dependencies = [
+ "once_cell",
+ "prost",
+ "prost-types",
+]
+
 [[package]]
 name = "prost-types"
 version = "0.11.9"
@@ -9246,6 +9257,7 @@ dependencies = [
  "proptest",
  "prost",
  "prost-build",
+ "prost-reflect",
  "prost-types",
  "pulsar",
  "quickcheck",
diff --git a/Cargo.toml b/Cargo.toml
index 4ec26aa26bf2d..2880fba3d4dfc 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -200,8 +200,9 @@ serde_yaml = { version = "0.9.22", default-features = false }
 rmp-serde = { version = "1.1.1", default-features = false, optional = true }
 rmpv = { version = "1.0.0", default-features = false, features = ["with-serde"], optional = true }
 
-# Prost
+# Prost / Protocol Buffers
 prost = { version = "0.11", default-features = false, features = ["std"] }
+prost-reflect = { version = "0.11", default-features = false, optional = true }
 prost-types = { version = "0.11", default-features = false, optional = true }
 
 # GCP
@@ -673,7 +674,7 @@ sinks-databend = []
 sinks-datadog_archives = ["sinks-aws_s3", "sinks-azure_blob", "sinks-gcp"]
 sinks-datadog_events = []
 sinks-datadog_logs = []
-sinks-datadog_metrics = ["protobuf-build"]
+sinks-datadog_metrics = ["protobuf-build", "dep:prost-reflect"]
 sinks-datadog_traces = ["protobuf-build", "dep:rmpv", "dep:rmp-serde", "dep:serde_bytes"]
 sinks-elasticsearch = ["aws-core", "transforms-metric_to_log"]
 sinks-file = ["dep:async-compression"]
diff --git a/LICENSE-3rdparty.csv b/LICENSE-3rdparty.csv
index c137f340d73e9..c956f00af7ab0 100644
--- a/LICENSE-3rdparty.csv
+++ b/LICENSE-3rdparty.csv
@@ -394,6 +394,7 @@ proc-macro2,https://github.com/dtolnay/proc-macro2,MIT OR Apache-2.0,"David Tolnay"
 proptest,https://github.com/proptest-rs/proptest,MIT OR Apache-2.0,Jason Lingle
 prost,https://github.com/tokio-rs/prost,Apache-2.0,"Dan Burkert, Lucio Franco"
 prost-derive,https://github.com/tokio-rs/prost,Apache-2.0,"Dan Burkert, Lucio Franco, Tokio Contributors"
+prost-reflect,https://github.com/andrewhickman/prost-reflect,MIT OR Apache-2.0,Andrew Hickman
 ptr_meta,https://github.com/djkoloski/ptr_meta,MIT,David Koloski
 pulsar,https://github.com/streamnative/pulsar-rs,MIT OR Apache-2.0,"Colin Stearns, Kevin Stenerson, Geoffroy Couprie"
 quad-rand,https://github.com/not-fl3/quad-rand,MIT,not-fl3
diff --git a/build.rs b/build.rs
index 20cfd23d52a88..31c151f4b61d7 100644
--- a/build.rs
+++ b/build.rs
@@ -1,4 +1,11 @@
-use std::{collections::HashSet, env, fs::File, io::Write, path::Path, process::Command};
+use std::{
+    collections::HashSet,
+    env,
+    fs::File,
+    io::Write,
+    path::{Path, PathBuf},
+    process::Command,
+};
 
 struct TrackedEnv {
     tracked: HashSet<String>,
@@ -124,8 +131,19 @@ fn main() {
     println!("cargo:rerun-if-changed=proto/google/rpc/status.proto");
     println!("cargo:rerun-if-changed=proto/vector.proto");
 
+    // Create and store the "file descriptor set" from the compiled Protocol Buffers packages.
+    //
+    // This allows us to use runtime reflection to manually build Protocol Buffers payloads
+    // in a type-safe way, which is necessary for incrementally building certain payloads, like
+    // the ones generated in the `datadog_metrics` sink.
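+    //
+    // (The runtime half of this lives in `src/proto/mod.rs` below, where the generated
+    // `protobuf-fds.bin` file is decoded into a `prost_reflect::DescriptorPool` once and cached.)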
+    let protobuf_fds_path =
+        PathBuf::from(std::env::var("OUT_DIR").expect("OUT_DIR environment variable not set"))
+            .join("protobuf-fds.bin");
+
     let mut prost_build = prost_build::Config::new();
-    prost_build.btree_map(["."]);
+    prost_build
+        .btree_map(["."])
+        .file_descriptor_set_path(protobuf_fds_path);
 
     tonic_build::configure()
         .protoc_arg("--experimental_allow_proto3_optional")
diff --git a/src/internal_events/datadog_metrics.rs b/src/internal_events/datadog_metrics.rs
index 792d8496f041d..c4daf1d3ce7f8 100644
--- a/src/internal_events/datadog_metrics.rs
+++ b/src/internal_events/datadog_metrics.rs
@@ -7,19 +7,17 @@ use vector_common::internal_event::{
 };
 
 #[derive(Debug)]
-pub struct DatadogMetricsEncodingError {
-    pub error_message: &'static str,
+pub struct DatadogMetricsEncodingError<'a> {
+    pub reason: &'a str,
     pub error_code: &'static str,
     pub dropped_events: usize,
 }
 
-impl InternalEvent for DatadogMetricsEncodingError {
+impl<'a> InternalEvent for DatadogMetricsEncodingError<'a> {
     fn emit(self) {
-        let reason = "Failed to encode Datadog metrics.";
         error!(
-            message = reason,
-            error = %self.error_message,
-            error_code = %self.error_code,
+            message = self.reason,
+            error_code = self.error_code,
             error_type = error_type::ENCODER_FAILED,
             intentional = "false",
             stage = error_stage::PROCESSING,
@@ -35,7 +33,7 @@ impl InternalEvent for DatadogMetricsEncodingError {
         if self.dropped_events > 0 {
             emit!(ComponentEventsDropped::<UNINTENTIONAL> {
                 count: self.dropped_events,
-                reason,
+                reason: self.reason,
             });
         }
     }
diff --git a/src/proto.rs b/src/proto.rs
deleted file mode 100644
index b77e94c30f793..0000000000000
--- a/src/proto.rs
+++ /dev/null
@@ -1,5 +0,0 @@
-#[cfg(any(feature = "sources-vector", feature = "sinks-vector"))]
-use crate::event::proto as event;
-
-#[cfg(any(feature = "sources-vector", feature = "sinks-vector"))]
-pub mod vector;
diff --git a/src/proto/mod.rs b/src/proto/mod.rs
new file mode 100644
index 0000000000000..efa1728fb6988
--- /dev/null
+++ b/src/proto/mod.rs
@@ -0,0 +1,19 @@
+#[cfg(any(feature = "sources-vector", feature = "sinks-vector"))]
+use crate::event::proto as event;
+
+#[cfg(any(feature = "sources-vector", feature = "sinks-vector"))]
+pub mod vector;
+
+#[cfg(feature = "sinks-datadog_metrics")]
+pub mod fds {
+    use once_cell::sync::OnceCell;
+    use prost_reflect::DescriptorPool;
+
+    pub fn protobuf_descriptors() -> &'static DescriptorPool {
+        static PROTOBUF_FDS: OnceCell<DescriptorPool> = OnceCell::new();
+        PROTOBUF_FDS.get_or_init(|| {
+            DescriptorPool::decode(include_bytes!(concat!(env!("OUT_DIR"), "/protobuf-fds.bin")).as_ref())
+                .expect("should not fail to decode protobuf file descriptor set generated from build script")
+        })
+    }
+}
diff --git a/src/sinks/datadog/metrics/config.rs b/src/sinks/datadog/metrics/config.rs
index 1acedc003c079..9fb8c4cd48137 100644
--- a/src/sinks/datadog/metrics/config.rs
+++ b/src/sinks/datadog/metrics/config.rs
@@ -59,6 +59,11 @@ impl DatadogMetricsEndpoint {
             DatadogMetricsEndpoint::Sketches => "application/x-protobuf",
         }
     }
+
+    /// Gets whether this is a series endpoint.
+    pub const fn is_series(self) -> bool {
+        matches!(self, Self::Series)
+    }
 }
 
 /// Maps Datadog metric endpoints to their actual URI.
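As a quick aside before the encoder changes: the descriptor pool introduced in `src/proto/mod.rs` above is what makes the "runtime reflection" mentioned in `build.rs` concrete. A minimal sketch of the lookup pattern follows -- the helper name is ours, not from this diff, but the calls are exactly the `prost-reflect` APIs the encoder uses further down to resolve the `sketches` field number at runtime instead of hardcoding it:

use prost_reflect::DescriptorPool;

/// Illustrative helper (not part of this diff): resolve the field number of
/// `SketchPayload.sketches` from a descriptor pool at runtime.
fn sketches_field_number(pool: &DescriptorPool) -> u32 {
    let message = pool
        .get_message_by_name("datadog.agentpayload.SketchPayload")
        .expect("`SketchPayload` should exist in the descriptor set");

    message
        .get_field_by_name("sketches")
        .map(|field| field.number())
        .expect("`sketches` field should exist on `SketchPayload`")
}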
diff --git a/src/sinks/datadog/metrics/encoder.rs b/src/sinks/datadog/metrics/encoder.rs
index a2bd8330c5f35..0dd6c393e31b5 100644
--- a/src/sinks/datadog/metrics/encoder.rs
+++ b/src/sinks/datadog/metrics/encoder.rs
@@ -7,11 +7,13 @@ use std::{
 
 use bytes::{BufMut, Bytes};
 use chrono::{DateTime, Utc};
+use once_cell::sync::OnceCell;
 use prost::Message;
 use snafu::{ResultExt, Snafu};
 use vector_core::{
     config::{log_schema, LogSchema},
     event::{metric::MetricSketch, Metric, MetricTags, MetricValue},
+    metrics::AgentDDSketch,
 };
 
 use super::config::{
@@ -19,7 +21,8 @@ use super::config::{
 };
 use crate::{
     common::datadog::{DatadogMetricType, DatadogPoint, DatadogSeriesMetric},
-    sinks::util::{encode_namespace, Compression, Compressor},
+    proto::fds::protobuf_descriptors,
+    sinks::util::{encode_namespace, request_builder::EncodeResult, Compression, Compressor},
 };
 
 const SERIES_PAYLOAD_HEADER: &[u8] = b"{\"series\":[";
@@ -37,6 +40,17 @@ pub enum CreateError {
     InvalidLimits,
 }
 
+impl CreateError {
+    /// Gets the telemetry-friendly string version of this error.
+    ///
+    /// The value will be a short string with only lowercase letters and underscores.
+    pub const fn as_error_type(&self) -> &'static str {
+        match self {
+            Self::InvalidLimits => "invalid_payload_limits",
+        }
+    }
+}
+
 #[derive(Debug, Snafu)]
 pub enum EncoderError {
     #[snafu(display(
@@ -49,11 +63,31 @@ pub enum EncoderError {
         metric_value: &'static str,
     },
 
-    #[snafu(display("Failed to encode series metrics to JSON: {}", source))]
+    #[snafu(
+        context(false),
+        display("Failed to encode series metric to JSON: {source}")
+    )]
     JsonEncodingFailed { source: serde_json::Error },
 
-    #[snafu(display("Failed to encode sketch metrics to Protocol Buffers: {}", source))]
-    ProtoEncodingFailed { source: prost::EncodeError },
+    // Currently, the only time `prost` ever emits `EncodeError` is when there is insufficient
+    // buffer capacity, so we don't need to hold on to the error, and we can just hardcode this.
+    #[snafu(display(
+        "Failed to encode sketch metric to Protocol Buffers: insufficient buffer capacity."
+    ))]
+    ProtoEncodingFailed,
+}
+
+impl EncoderError {
+    /// Gets the telemetry-friendly string version of this error.
+    ///
+    /// The value will be a short string with only lowercase letters and underscores.
+    pub const fn as_error_type(&self) -> &'static str {
+        match self {
+            Self::InvalidMetric { .. } => "invalid_metric",
+            Self::JsonEncodingFailed { .. } => "failed_to_encode_series",
+            Self::ProtoEncodingFailed => "failed_to_encode_sketch",
+        }
+    }
 }
 
 #[derive(Debug, Snafu)]
@@ -64,9 +98,6 @@ pub enum FinishError {
     ))]
     CompressionFailed { source: io::Error },
 
-    #[snafu(display("Failed to encode pending metrics: {}", source))]
-    PendingEncodeFailed { source: EncoderError },
-
     #[snafu(display("Finished payload exceeded the (un)compressed size limits"))]
     TooLarge {
         metrics: Vec<Metric>,
@@ -81,7 +112,6 @@ impl FinishError {
     pub const fn as_error_type(&self) -> &'static str {
         match self {
             Self::CompressionFailed { .. } => "compression_failed",
-            Self::PendingEncodeFailed { .. } => "pending_encode_failed",
             Self::TooLarge { .. } => "too_large",
         }
     }
@@ -91,21 +121,15 @@ struct EncoderState {
     writer: Compressor,
     written: usize,
     buf: Vec<u8>,
-
-    pending: Vec<Metric>,
     processed: Vec<Metric>,
 }
 
 impl Default for EncoderState {
     fn default() -> Self {
-        EncoderState {
-            // We use the "zlib default" compressor because it's all Datadog supports, and adding it
-            // generically to `Compression` would make things a little weird because of the
-            // conversion trait implementations that are also only none vs gzip.
+        Self {
             writer: get_compressor(),
             written: 0,
             buf: Vec::with_capacity(1024),
-            pending: Vec::new(),
             processed: Vec::new(),
         }
     }
@@ -145,7 +169,7 @@ impl DatadogMetricsEncoder {
         compressed_limit: usize,
     ) -> Result<Self, CreateError> {
         let (uncompressed_limit, compressed_limit) =
-            validate_payload_size_limits(uncompressed_limit, compressed_limit)
+            validate_payload_size_limits(endpoint, uncompressed_limit, compressed_limit)
                 .ok_or(CreateError::InvalidLimits)?;
 
         Ok(Self {
@@ -195,15 +219,23 @@ impl DatadogMetricsEncoder {
                 {
                     return Ok(Some(metric));
                 }
-                serde_json::to_writer(&mut self.state.buf, series)
-                    .context(JsonEncodingFailedSnafu)?;
+                serde_json::to_writer(&mut self.state.buf, series)?;
                 }
             }
-            // We can't encode sketches incrementally (yet), so we don't do any encoding here. We
-            // simply store it for later, and in `try_encode_pending`, any such pending metrics will be
-            // encoded in a single operation.
+            // Sketches are encoded to Protocol Buffers, also in an incremental fashion.
             DatadogMetricsEndpoint::Sketches => match metric.value() {
-                MetricValue::Sketch { .. } => {}
+                MetricValue::Sketch { sketch } => match sketch {
+                    MetricSketch::AgentDDSketch(ddsketch) => {
+                        encode_sketch_incremental(
+                            &metric,
+                            ddsketch,
+                            &self.default_namespace,
+                            self.log_schema,
+                            &mut self.state.buf,
+                        )
+                        .map_err(|_| EncoderError::ProtoEncodingFailed)?;
+                    }
+                },
                 value => {
                     return Err(EncoderError::InvalidMetric {
                         expected: "sketches",
@@ -213,21 +245,14 @@ impl DatadogMetricsEncoder {
             },
         }
 
-        // If we actually encoded a metric, we try to see if our temporary buffer can be compressed
-        // and added to the overall payload. Otherwise, it means we're deferring the metric for
-        // later encoding, so we store it off to the side.
-        if !self.state.buf.is_empty() {
-            match self.try_compress_buffer() {
-                Err(_) | Ok(false) => return Ok(Some(metric)),
-                Ok(true) => {}
+        // Try to see if our temporary buffer can be written to the compressor.
+        match self.try_compress_buffer() {
+            Err(_) | Ok(false) => Ok(Some(metric)),
+            Ok(true) => {
+                self.state.processed.push(metric);
+                Ok(None)
             }
-
-            self.state.processed.push(metric);
-        } else {
-            self.state.pending.push(metric);
         }
-
-        Ok(None)
     }
 
     fn try_compress_buffer(&mut self) -> io::Result<bool> {
@@ -254,7 +279,8 @@ impl DatadogMetricsEncoder {
         // assume the worst case while our limits assume the worst case _overhead_. Maybe our
         // numbers are technically off in the end, but `finish` catches that for us, too.
         let compressed_len = self.state.writer.get_ref().len();
-        if compressed_len + n > self.compressed_limit {
+        let max_compressed_metric_len = n + max_compressed_overhead_len(n);
+        if compressed_len + max_compressed_metric_len > self.compressed_limit {
             return Ok(false);
         }
@@ -292,56 +318,7 @@ impl DatadogMetricsEncoder {
         self.encode_single_metric(metric)
     }
 
-    fn try_encode_pending(&mut self) -> Result<(), FinishError> {
-        // The Datadog Agent uses a particular Protocol Buffers library to incrementally encode the
-        // DDSketch structures into a payload, similar to how we incrementally encode the series
-        // metrics. Unfortunately, there's no existing Rust crate that allows writing out Protocol
-        // Buffers payloads by hand, so we have to cheat a little and buffer up the metrics until
-        // the very end.
-        //
-        // `try_encode`, and thus `encode_single_metric`, specifically store sketch-oriented metrics
-        // off to the side for this very purpose, letting us gather them all here, encoding them
-        // into a single Protocol Buffers payload.
-        //
-        // Naturally, this means we might actually generate a payload that's too big. This is a
-        // problem for the caller to figure out. Presently, the only usage of this encoder will
-        // naively attempt to split the batch into two and try again.
-
-        // Only go through this if we're targeting the sketch endpoint.
-        if !(matches!(self.endpoint, DatadogMetricsEndpoint::Sketches)) {
-            return Ok(());
-        }
-
-        // Consume of all of the "pending" metrics and try to write them out as sketches.
-        let pending = mem::take(&mut self.state.pending);
-        write_sketches(
-            &pending,
-            &self.default_namespace,
-            self.log_schema,
-            &mut self.state.buf,
-        )
-        .context(PendingEncodeFailedSnafu)?;
-
-        if self.try_compress_buffer().context(CompressionFailedSnafu)? {
-            // Since we encoded and compressed them successfully, add them to the "processed" list.
-            self.state.processed.extend(pending);
-            Ok(())
-        } else {
-            // The payload was too big overall, which we can't do anything about. Up to the caller
-            // now to try to encode them again after splitting the batch.
-            Err(FinishError::TooLarge {
-                metrics: pending,
-                // TODO: Hard-coded split code for now because we need to hoist up the logic for
-                // calculating the recommended splits to an instance method or something.
-                recommended_splits: 2,
-            })
-        }
-    }
-
-    pub fn finish(&mut self) -> Result<(Bytes, Vec<Metric>, usize), FinishError> {
-        // Try to encode any pending metrics we had stored up.
-        self.try_encode_pending()?;
-
+    pub fn finish(&mut self) -> Result<(EncodeResult<Bytes>, Vec<Metric>), FinishError> {
         // Write any payload footer necessary for the configured endpoint.
         let n = write_payload_footer(self.endpoint, &mut self.state.writer)
             .context(CompressionFailedSnafu)?;
@@ -371,7 +348,10 @@ impl DatadogMetricsEncoder {
         if recommended_splits == 1 {
             // "One" split means no splits needed: our payload didn't exceed either of the limits.
-            Ok((payload, processed, raw_bytes_written))
+            Ok((
+                EncodeResult::compressed(payload, raw_bytes_written),
+                processed,
+            ))
         } else {
             Err(FinishError::TooLarge {
                 metrics: processed,
@@ -381,6 +361,104 @@ impl DatadogMetricsEncoder {
     }
 }
 
+fn get_sketch_payload_sketches_field_number() -> u32 {
+    static SKETCH_PAYLOAD_SKETCHES_FIELD_NUM: OnceCell<u32> = OnceCell::new();
+    *SKETCH_PAYLOAD_SKETCHES_FIELD_NUM.get_or_init(|| {
+        let descriptors = protobuf_descriptors();
+        let descriptor = descriptors
+            .get_message_by_name("datadog.agentpayload.SketchPayload")
+            .expect("should not fail to find `SketchPayload` message in descriptor pool");
+
+        descriptor
+            .get_field_by_name("sketches")
+            .map(|field| field.number())
+            .expect("`sketches` field must exist in `SketchPayload` message")
+    })
+}
+
+fn sketch_to_proto_message(
+    metric: &Metric,
+    ddsketch: &AgentDDSketch,
+    default_namespace: &Option<Arc<str>>,
+    log_schema: &'static LogSchema,
+) -> ddmetric_proto::sketch_payload::Sketch {
+    let name = get_namespaced_name(metric, default_namespace);
+    let ts = encode_timestamp(metric.timestamp());
+    let mut tags = metric.tags().cloned().unwrap_or_default();
+    let host = tags.remove(log_schema.host_key()).unwrap_or_default();
+    let tags = encode_tags(&tags);
+
+    let cnt = ddsketch.count() as i64;
+    let min = ddsketch
+        .min()
+        .expect("min should be present for non-empty sketch");
+    let max = ddsketch
+        .max()
+        .expect("max should be present for non-empty sketch");
+    let avg = ddsketch
+        .avg()
+        .expect("avg should be present for non-empty sketch");
+    let sum = ddsketch
+        .sum()
+        .expect("sum should be present for non-empty sketch");
+
+    let (bins, counts) = ddsketch.bin_map().into_parts();
+    let k = bins.into_iter().map(Into::into).collect();
+    let n = counts.into_iter().map(Into::into).collect();
+
+    ddmetric_proto::sketch_payload::Sketch {
+        metric: name,
+        tags,
+        host,
+        distributions: Vec::new(),
+        dogsketches: vec![ddmetric_proto::sketch_payload::sketch::Dogsketch {
+            ts,
+            cnt,
+            min,
+            max,
+            avg,
+            sum,
+            k,
+            n,
+        }],
+    }
+}
+
+fn encode_sketch_incremental<B>(
+    metric: &Metric,
+    ddsketch: &AgentDDSketch,
+    default_namespace: &Option<Arc<str>>,
+    log_schema: &'static LogSchema,
+    buf: &mut B,
+) -> Result<(), prost::EncodeError>
+where
+    B: BufMut,
+{
+    // This encodes a single sketch metric incrementally, which means that we specifically write it
+    // as if we were writing a single field entry in the overall `SketchPayload` message
+    // type.
+    //
+    // By doing so, we can encode multiple sketches and concatenate all the buffers, and have the
+    // resulting buffer appear as if it's a normal `SketchPayload` message with a bunch of repeats
+    // of the `sketches` field.
+    //
+    // Crucially, this code works because `SketchPayload` has two fields -- metadata and sketches --
+    // and we never actually set the metadata field... so the resulting message generated overall
+    // for `SketchPayload` with a single sketch looks just as if we literally wrote out a single
+    // value for the given field.
+
+    let sketch_proto = sketch_to_proto_message(metric, ddsketch, default_namespace, log_schema);
+
+    // Manually write the field tag for `sketches` and then encode the sketch payload directly as a
+    // length-delimited message.
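+    //
+    // As a concrete illustration of the wire format at play here (the actual field number comes
+    // from the descriptor pool at runtime, so the value below is an assumption for the example):
+    // if `sketches` were field number 1, the key varint would be (1 << 3) | 2 = 0x0A, since the
+    // length-delimited wire type is 2. Each encoded sketch then contributes
+    // `0x0A <varint length> <message bytes>` -- the same byte pattern `prost` itself produces for
+    // each entry of a `repeated` message field.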
+    prost::encoding::encode_key(
+        get_sketch_payload_sketches_field_number(),
+        prost::encoding::WireType::LengthDelimited,
+        buf,
+    );
+    sketch_proto.encode_length_delimited(buf)
+}
+
 fn get_namespaced_name(metric: &Metric, default_namespace: &Option<Arc<str>>) -> String {
     encode_namespace(
         metric
@@ -481,89 +559,10 @@ fn generate_series_metrics(
     Ok(results)
 }
 
-fn write_sketches<B>(
-    metrics: &[Metric],
-    default_namespace: &Option<Arc<str>>,
-    log_schema: &'static LogSchema,
-    buf: &mut B,
-) -> Result<(), EncoderError>
-where
-    B: BufMut,
-{
-    let mut sketches = Vec::new();
-    for metric in metrics {
-        match metric.value() {
-            MetricValue::Sketch { sketch } => match sketch {
-                MetricSketch::AgentDDSketch(ddsketch) => {
-                    // Don't encode any empty sketches.
-                    if ddsketch.is_empty() {
-                        continue;
-                    }
-
-                    let name = get_namespaced_name(metric, default_namespace);
-                    let ts = encode_timestamp(metric.timestamp());
-                    let mut tags = metric.tags().cloned().unwrap_or_default();
-                    let host = tags.remove(log_schema.host_key()).unwrap_or_default();
-                    let tags = encode_tags(&tags);
-
-                    let cnt = ddsketch.count() as i64;
-                    let min = ddsketch
-                        .min()
-                        .expect("min should be present for non-empty sketch");
-                    let max = ddsketch
-                        .max()
-                        .expect("max should be present for non-empty sketch");
-                    let avg = ddsketch
-                        .avg()
-                        .expect("avg should be present for non-empty sketch");
-                    let sum = ddsketch
-                        .sum()
-                        .expect("sum should be present for non-empty sketch");
-
-                    let (bins, counts) = ddsketch.bin_map().into_parts();
-                    let k = bins.into_iter().map(Into::into).collect();
-                    let n = counts.into_iter().map(Into::into).collect();
-
-                    let sketch = ddmetric_proto::sketch_payload::Sketch {
-                        metric: name,
-                        tags,
-                        host,
-                        distributions: Vec::new(),
-                        dogsketches: vec![ddmetric_proto::sketch_payload::sketch::Dogsketch {
-                            ts,
-                            cnt,
-                            min,
-                            max,
-                            avg,
-                            sum,
-                            k,
-                            n,
-                        }],
-                    };
-
-                    sketches.push(sketch);
-                }
-            },
-            // We filter out non-sketch metrics during `encode_single_metric` if we're targeting
-            // the sketches endpoint.
-            _ => unreachable!(),
-        }
-    }
-
-    let sketch_payload = ddmetric_proto::SketchPayload {
-        // TODO: The "common metadata" fields are things that only very loosely apply to Vector, or
-        // are hard to characterize -- for example, what's the API key for a sketch that didn't originate
-        // from the Datadog Agent? -- so we're just omitting it here in the hopes it doesn't
-        // actually matter.
-        metadata: None,
-        sketches,
-    };
-
-    // Now try encoding this sketch payload, and then try to compress it.
-    sketch_payload.encode(buf).context(ProtoEncodingFailedSnafu)
-}
-
 fn get_compressor() -> Compressor {
+    // We use the "zlib default" compressor because it's all Datadog supports, and adding it
+    // generically to `Compression` would make things a little weird because of the conversion trait
+    // implementations that are also only none vs gzip.
     Compression::zlib_default().into()
 }
 
@@ -571,39 +570,52 @@ const fn max_uncompressed_header_len() -> usize {
     SERIES_PAYLOAD_HEADER.len() + SERIES_PAYLOAD_FOOTER.len()
 }
 
+// Datadog ingest APIs accept zlib, which is what we're accounting for here. By default, zlib
+// has a 2 byte header and 4 byte CRC trailer. [1]
+//
+// [1] https://www.zlib.net/zlib_tech.html
+const ZLIB_HEADER_TRAILER: usize = 6;
+
 const fn max_compression_overhead_len(compressed_limit: usize) -> usize {
-    // Datadog ingest APIs accept zlib, which is what we're accounting for here. By default, zlib
-    // has a 2 byte header and 4 byte CRC trailer. Additionally, Deflate, the underlying
-    // compression algorithm, has a technique to ensure that input data can't be encoded in such a
-    // way where it's expanded by a meaningful amount.
+    // We calculate the overhead as the zlib header/trailer plus the worst case overhead of
+    // compressing `compressed_limit` bytes, assuming the worst case where none of the data we
+    // write can be compressed at all.
+    ZLIB_HEADER_TRAILER + max_compressed_overhead_len(compressed_limit)
+}
+
+const fn max_compressed_overhead_len(len: usize) -> usize {
+    // Datadog ingest APIs accept zlib, which is what we're accounting for here.
     //
-    // This technique allows storing blocks of uncompressed data with only 5 bytes of overhead per
-    // block. Technically, the blocks can be up to 65KB in Deflate, but modern zlib implementations
-    // use block sizes of 16KB. [1][2]
+    // Deflate, the underlying compression algorithm, has a technique to ensure that input data
+    // can't be encoded in such a way where it's expanded by a meaningful amount. This technique
+    // allows storing blocks of uncompressed data with only 5 bytes of overhead per block.
+    // Technically, the blocks can be up to 65KB in Deflate, but modern zlib implementations use
+    // block sizes of 16KB. [1][2]
     //
-    // With all of that said, we calculate the overhead as the header plus trailer plus the given
-    // compressed size limit, minus the known overhead, multiplied such that it accounts for the
-    // worse case of entirely uncompressed data.
+    // We calculate the overhead of compressing a given `len` bytes as the worst case of that many
+    // bytes being written to the compressor and not being compressible at all.
     //
     // [1] https://www.zlib.net/zlib_tech.html
     // [2] https://www.bolet.org/~pornin/deflate-flush-fr.html
-    const HEADER_TRAILER: usize = 6;
     const STORED_BLOCK_SIZE: usize = 16384;
-    HEADER_TRAILER + (1 + compressed_limit.saturating_sub(HEADER_TRAILER) / STORED_BLOCK_SIZE) * 5
+    (1 + len.saturating_sub(ZLIB_HEADER_TRAILER) / STORED_BLOCK_SIZE) * 5
 }
 
 const fn validate_payload_size_limits(
+    endpoint: DatadogMetricsEndpoint,
     uncompressed_limit: usize,
     compressed_limit: usize,
 ) -> Option<(usize, usize)> {
-    // Get the maximum possible length of the header/footer combined.
-    //
-    // This only matters for series metrics at the moment, since sketches are encoded in a single
-    // shot to their Protocol Buffers representation. We're "wasting" `header_len` bytes in the
-    // case of sketches, but we're also talking about like 10 bytes: not enough to care about.
-    let header_len = max_uncompressed_header_len();
-    if uncompressed_limit <= header_len {
-        return None;
+    if endpoint.is_series() {
+        // For series, we need to make sure the uncompressed limit can account for the header/footer
+        // we would add that wraps the encoded metrics up in the expected JSON object. This does
+        // imply that a limit of `header_len + 1` would be allowed, even though we obviously can't
+        // encode a series metric in a single byte, but this is just a simple sanity check, not an
+        // exhaustive search for the absolute bare minimum size.
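+        //
+        // For a sense of scale, based on the constants at the top of this file: the header is
+        // `{"series":[` (11 bytes) and the footer is `]}` (2 bytes), so `header_len` works out to
+        // 13 bytes, and any uncompressed limit of 14 or more passes this check.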
+        let header_len = max_uncompressed_header_len();
+        if uncompressed_limit <= header_len {
+            return None;
+        }
     }
 
     // Get the maximum possible overhead of the compression container, based on the incoming
@@ -659,6 +671,7 @@ mod tests {
     use std::{
         io::{self, copy},
         num::NonZeroU32,
+        sync::Arc,
     };
 
     use bytes::{BufMut, Bytes, BytesMut};
     use proptest::{
         arbitrary::any, collection::btree_map, num::f64::POSITIVE as ARB_POSITIVE_F64,
         prop_assert, proptest, strategy::Strategy, string::string_regex,
     };
+    use prost::Message;
     use vector_core::{
-        config::log_schema,
-        event::{metric::TagValue, Metric, MetricKind, MetricTags, MetricValue},
+        config::{log_schema, LogSchema},
+        event::{
+            metric::{MetricSketch, TagValue},
+            Metric, MetricKind, MetricTags, MetricValue,
+        },
         metric_tags,
         metrics::AgentDDSketch,
     };
 
     use super::{
-        encode_tags, encode_timestamp, generate_series_metrics, get_compressor,
-        max_compression_overhead_len, max_uncompressed_header_len, validate_payload_size_limits,
+        ddmetric_proto, encode_sketch_incremental, encode_tags, encode_timestamp,
+        generate_series_metrics, get_compressor, max_compression_overhead_len,
+        max_uncompressed_header_len, sketch_to_proto_message, validate_payload_size_limits,
         write_payload_footer, write_payload_header, DatadogMetricsEncoder, EncoderError,
     };
     use crate::{
@@ -714,6 +732,10 @@ mod tests {
         compressor.finish().expect("should not fail").freeze()
     }
 
+    fn get_compressed_empty_sketches_payload() -> Bytes {
+        get_compressor().finish().expect("should not fail").freeze()
+    }
+
     fn decompress_payload(payload: Bytes) -> io::Result<Bytes> {
         let mut decompressor = ZlibDecoder::new(&payload[..]);
         let mut decompressed = BytesMut::new().writer();
@@ -738,6 +760,41 @@ mod tests {
         }
     }
 
+    fn encode_sketches_normal<B>(
+        metrics: &[Metric],
+        default_namespace: &Option<Arc<str>>,
+        log_schema: &'static LogSchema,
+        buf: &mut B,
+    ) where
+        B: BufMut,
+    {
+        let mut sketches = Vec::new();
+        for metric in metrics {
+            let MetricValue::Sketch { sketch } = metric.value() else { panic!("must be sketch") };
+            match sketch {
+                MetricSketch::AgentDDSketch(ddsketch) => {
+                    // Don't encode any empty sketches.
+                    if ddsketch.is_empty() {
+                        continue;
+                    }
+
+                    let sketch =
+                        sketch_to_proto_message(metric, ddsketch, default_namespace, log_schema);
+
+                    sketches.push(sketch);
+                }
+            }
+        }
+
+        let sketch_payload = ddmetric_proto::SketchPayload {
+            metadata: None,
+            sketches,
+        };
+
+        // Now encode this sketch payload.
+        sketch_payload.encode(buf).unwrap()
+    }
+
     #[test]
     fn test_encode_tags() {
         assert_eq!(
@@ -825,16 +882,9 @@ mod tests {
         let result = encoder.finish();
         assert!(result.is_ok());
 
-        let (payload, mut processed, raw_bytes) = result.unwrap();
+        let (_payload, mut processed) = result.unwrap();
         assert_eq!(processed.len(), 1);
         assert_eq!(expected, processed.pop().unwrap());
-        assert_eq!(100, payload.len());
-
-        // The payload is:
-        // {"series":[{"metric":"basic_counter","type":"count","interval":null,"points":[[1651664333,3.14]],"tags":[]}]}
-        // which comes to a total of 98 bytes.
-        // There are extra bytes that make up the header and footer. These should not be included in the raw bytes.
-        assert_eq!(109, raw_bytes);
     }
 
     #[test]
@@ -855,25 +905,60 @@ mod tests {
         let result = encoder.finish();
         assert!(result.is_ok());
 
-        let (payload, mut processed, raw_bytes) = result.unwrap();
+        let (_payload, mut processed) = result.unwrap();
         assert_eq!(processed.len(), 1);
         assert_eq!(expected, processed.pop().unwrap());
+    }
 
-        assert_eq!(81, payload.len());
-        assert_eq!(70, raw_bytes);
+    #[test]
+    fn encode_multiple_sketch_metrics_normal_vs_incremental() {
+        // This tests our incremental sketch encoding against the more straightforward approach of
+        // just building/encoding a full `SketchPayload` message.
+        let metrics = vec![
+            get_simple_sketch(),
+            get_simple_sketch(),
+            get_simple_sketch(),
+        ];
+
+        let mut normal_buf = Vec::new();
+        encode_sketches_normal(&metrics, &None, log_schema(), &mut normal_buf);
+
+        let mut incremental_buf = Vec::new();
+        for metric in &metrics {
+            match metric.value() {
+                MetricValue::Sketch { sketch } => match sketch {
+                    MetricSketch::AgentDDSketch(ddsketch) => encode_sketch_incremental(
+                        metric,
+                        ddsketch,
+                        &None,
+                        log_schema(),
+                        &mut incremental_buf,
+                    )
+                    .unwrap(),
+                },
+                _ => panic!("should be a sketch"),
+            }
+        }
+
+        assert_eq!(normal_buf, incremental_buf);
     }
 
     #[test]
-    fn payload_size_limits() {
+    fn payload_size_limits_series() {
         // Get the maximum length of the header/trailer data.
         let header_len = max_uncompressed_header_len();
 
         // This is too small.
-        let result = validate_payload_size_limits(header_len, usize::MAX);
+        let result =
+            validate_payload_size_limits(DatadogMetricsEndpoint::Series, header_len, usize::MAX);
         assert_eq!(result, None);
 
         // This is just right.
-        let result = validate_payload_size_limits(header_len + 1, usize::MAX);
+        let result = validate_payload_size_limits(
+            DatadogMetricsEndpoint::Series,
+            header_len + 1,
+            usize::MAX,
+        );
         assert_eq!(result, Some((header_len + 1, usize::MAX)));
 
         // Get the maximum compressed overhead length, based on our input uncompressed size. This
@@ -882,16 +967,52 @@ mod tests {
         let compression_overhead_len = max_compression_overhead_len(usize::MAX);
 
         // This is too small.
-        let result = validate_payload_size_limits(usize::MAX, compression_overhead_len);
+        let result = validate_payload_size_limits(
+            DatadogMetricsEndpoint::Series,
+            usize::MAX,
+            compression_overhead_len,
+        );
+        assert_eq!(result, None);
+
+        // This is just right.
+        let result = validate_payload_size_limits(
+            DatadogMetricsEndpoint::Series,
+            usize::MAX,
+            compression_overhead_len + 1,
+        );
+        assert_eq!(result, Some((usize::MAX, compression_overhead_len + 1)));
+    }
+
+    #[test]
+    fn payload_size_limits_sketches() {
+        // There's no lower bound on uncompressed size for the sketches payload.
+        let result = validate_payload_size_limits(DatadogMetricsEndpoint::Sketches, 0, usize::MAX);
+        assert_eq!(result, Some((0, usize::MAX)));
+
+        // Get the maximum compressed overhead length, based on our input uncompressed size. This
+        // represents the worst case overhead based on the input data (of length usize::MAX, in this
+        // case) being entirely incompressible.
+        let compression_overhead_len = max_compression_overhead_len(usize::MAX);
+
+        // This is too small.
+        let result = validate_payload_size_limits(
+            DatadogMetricsEndpoint::Sketches,
+            usize::MAX,
+            compression_overhead_len,
+        );
         assert_eq!(result, None);
 
         // This is just right.
-        let result = validate_payload_size_limits(usize::MAX, compression_overhead_len + 1);
+        let result = validate_payload_size_limits(
+            DatadogMetricsEndpoint::Sketches,
+            usize::MAX,
+            compression_overhead_len + 1,
+        );
         assert_eq!(result, Some((usize::MAX, compression_overhead_len + 1)));
     }
 
     #[test]
-    fn encode_breaks_out_when_limit_reached_uncompressed() {
+    fn encode_series_breaks_out_when_limit_reached_uncompressed() {
         // We manually create the encoder with an arbitrarily low "uncompressed" limit but high
         // "compressed" limit to exercise the codepath that should avoid encoding a metric when the
         // uncompressed payload would exceed the limit.
@@ -905,7 +1026,8 @@ mod tests {
         .expect("payload size limits should be valid");
 
         // Trying to encode a metric that would cause us to exceed our uncompressed limits will
-        // _not_ return an error from `try_encode`.
+        // _not_ return an error from `try_encode`, but instead will simply return the metric
+        // back, as it could not be added.
         let counter = get_simple_counter();
         let result = encoder.try_encode(counter.clone());
         assert!(result.is_ok());
@@ -917,17 +1039,55 @@ mod tests {
         let result = encoder.finish();
         assert!(result.is_ok());
 
-        let (payload, processed, raw_bytes) = result.unwrap();
-        let empty_payload = get_compressed_empty_series_payload();
-        assert_eq!(payload, empty_payload);
+        let (payload, processed) = result.unwrap();
+        assert_eq!(
+            payload.uncompressed_byte_size,
+            max_uncompressed_header_len()
+        );
+        assert_eq!(
+            payload.into_payload(),
+            get_compressed_empty_series_payload()
+        );
         assert_eq!(processed.len(), 0);
+    }
+
+    #[test]
+    fn encode_sketches_breaks_out_when_limit_reached_uncompressed() {
+        // We manually create the encoder with an arbitrarily low "uncompressed" limit but high
+        // "compressed" limit to exercise the codepath that should avoid encoding a metric when the
+        // uncompressed payload would exceed the limit.
+        let mut encoder = DatadogMetricsEncoder::with_payload_limits(
+            DatadogMetricsEndpoint::Sketches,
+            None,
+            1,
+            usize::MAX,
+        )
+        .expect("payload size limits should be valid");
+
+        // Trying to encode a metric that would cause us to exceed our uncompressed limits will
+        // _not_ return an error from `try_encode`, but instead will simply return the metric
+        // back, as it could not be added.
+        let sketch = get_simple_sketch();
+        let result = encoder.try_encode(sketch.clone());
+        assert!(result.is_ok());
+        assert_eq!(result.unwrap(), Some(sketch));
 
-        // Just the header and footer.
-        assert_eq!(13, raw_bytes);
+        // And similarly, since we didn't actually encode a metric, we _should_ be able to finish
+        // this payload, but it will be empty and no processed metrics should be returned.
+        let result = encoder.finish();
+        assert!(result.is_ok());
+
+        let (payload, processed) = result.unwrap();
+        assert_eq!(payload.uncompressed_byte_size, 0);
+        assert_eq!(
+            payload.into_payload(),
+            get_compressed_empty_sketches_payload()
+        );
+        assert_eq!(processed.len(), 0);
     }
 
     #[test]
-    fn encode_breaks_out_when_limit_reached_compressed() {
+    fn encode_series_breaks_out_when_limit_reached_compressed() {
         // We manually create the encoder with an arbitrarily low "compressed" limit but high
         // "uncompressed" limit to exercise the codepath that should avoid encoding a metric when the
         // compressed payload would exceed the limit.
@@ -942,7 +1102,8 @@ mod tests {
        .expect("payload size limits should be valid");
 
         // Trying to encode a metric that would cause us to exceed our compressed limits will
-        // _not_ return an error from `try_encode`.
+        // _not_ return an error from `try_encode`, but instead will simply return the metric
+        // back, as it could not be added.
         let counter = get_simple_counter();
         let result = encoder.try_encode(counter.clone());
         assert!(result.is_ok());
@@ -954,13 +1115,54 @@ mod tests {
         let result = encoder.finish();
         assert!(result.is_ok());
 
-        let (payload, processed, raw_bytes) = result.unwrap();
-        let empty_payload = get_compressed_empty_series_payload();
-        assert_eq!(payload, empty_payload);
+        let (payload, processed) = result.unwrap();
+        assert_eq!(
+            payload.uncompressed_byte_size,
+            max_uncompressed_header_len()
+        );
+        assert_eq!(
+            payload.into_payload(),
+            get_compressed_empty_series_payload()
+        );
         assert_eq!(processed.len(), 0);
+    }
 
-        // Just the header and footer.
-        assert_eq!(13, raw_bytes);
+    #[test]
+    fn encode_sketches_breaks_out_when_limit_reached_compressed() {
+        // We manually create the encoder with an arbitrarily low "compressed" limit but high
+        // "uncompressed" limit to exercise the codepath that should avoid encoding a metric when the
+        // compressed payload would exceed the limit.
+        let uncompressed_limit = 128;
+        let compressed_limit = 16;
+        let mut encoder = DatadogMetricsEncoder::with_payload_limits(
+            DatadogMetricsEndpoint::Sketches,
+            None,
+            uncompressed_limit,
+            compressed_limit,
+        )
+        .expect("payload size limits should be valid");
+
+        // Trying to encode a metric that would cause us to exceed our compressed limits will
+        // _not_ return an error from `try_encode`, but instead will simply return the metric
+        // back, as it could not be added.
+        let sketch = get_simple_sketch();
+        let result = encoder.try_encode(sketch.clone());
+        assert!(result.is_ok());
+        assert_eq!(result.unwrap(), Some(sketch));
+
+        // And similarly, since we didn't actually encode a metric, we _should_ be able to finish
+        // this payload, but it will be empty (effectively, the header/footer will exist) and no
+        // processed metrics should be returned.
+        let result = encoder.finish();
+        assert!(result.is_ok());
+
+        let (payload, processed) = result.unwrap();
+        assert_eq!(payload.uncompressed_byte_size, 0);
+        assert_eq!(
+            payload.into_payload(),
+            get_compressed_empty_sketches_payload()
+        );
+        assert_eq!(processed.len(), 0);
     }
 
     fn arb_counter_metric() -> impl Strategy<Value = Metric> {
@@ -1003,7 +1205,8 @@ mod tests {
         if let Ok(mut encoder) = result {
             _ = encoder.try_encode(metric);
 
-            if let Ok((payload, _processed, _raw_bytes)) = encoder.finish() {
+            if let Ok((payload, _processed)) = encoder.finish() {
+                let payload = payload.into_payload();
                 prop_assert!(payload.len() <= compressed_limit);
 
                 let result = decompress_payload(payload);
diff --git a/src/sinks/datadog/metrics/request_builder.rs b/src/sinks/datadog/metrics/request_builder.rs
index 64b1226b661bf..d217986d6f520 100644
--- a/src/sinks/datadog/metrics/request_builder.rs
+++ b/src/sinks/datadog/metrics/request_builder.rs
@@ -1,12 +1,8 @@
 use bytes::Bytes;
-use serde_json::error::Category;
 use snafu::Snafu;
-use std::{num::NonZeroUsize, sync::Arc};
+use std::sync::Arc;
 use vector_common::request_metadata::RequestMetadata;
-use vector_core::{
-    event::{EventFinalizers, Finalizable, Metric},
-    EstimatedJsonEncodedSizeOf,
-};
+use vector_core::event::{EventFinalizers, Finalizable, Metric};
 
 use super::{
     config::{DatadogMetricsEndpoint, DatadogMetricsEndpointConfiguration},
@@ -17,19 +13,19 @@ use crate::sinks::util::{metadata::RequestMetadataBuilder, IncrementalRequestBuilder};
 
 #[derive(Debug, Snafu)]
 pub enum RequestBuilderError {
-    #[snafu(display("Failed to build the request builder: {}", error_type))]
-    FailedToBuild { error_type: &'static str },
+    #[snafu(
+        context(false),
+        display("Failed to build the request builder: {source}")
+    )]
+    FailedToBuild { source: CreateError },
 
-    #[snafu(display("Encoding of a metric failed ({})", reason))]
-    FailedToEncode {
-        reason: &'static str,
-        dropped_events: u64,
-    },
+    #[snafu(context(false), display("Failed to encode metric: {source}"))]
+    FailedToEncode { source: EncoderError },
 
-    #[snafu(display("A split payload was still too big to encode/compress within size limits"))]
+    #[snafu(display("A split payload was still too big to encode/compress within size limits."))]
     FailedToSplit { dropped_events: u64 },
 
-    #[snafu(display("An unexpected error occurred"))]
+    #[snafu(display("An unexpected error occurred: {error_type}"))]
     Unexpected {
         error_type: &'static str,
         dropped_events: u64,
@@ -37,78 +33,28 @@ pub enum RequestBuilderError {
 }
 
 impl RequestBuilderError {
-    /// Converts this error into its constituent parts: the error reason, and how many events were
-    /// dropped as a result.
-    pub const fn into_parts(self) -> (&'static str, &'static str, u64) {
+    /// Converts this error into its constituent parts: the error reason, the error type, and how
+    /// many events were dropped as a result.
+    pub fn into_parts(self) -> (String, &'static str, u64) {
         match self {
-            Self::FailedToBuild { error_type } => {
-                ("Failed to build the request builder.", error_type, 0)
-            }
-            Self::FailedToEncode {
-                reason,
-                dropped_events,
-            } => ("Encoding of a metric failed.", reason, dropped_events),
+            Self::FailedToBuild { source } => (source.to_string(), source.as_error_type(), 0),
+            // Encoding errors always happen at the per-metric level, so we could only ever drop a
+            // single metric/event at a time.
+            Self::FailedToEncode { source } => (source.to_string(), source.as_error_type(), 1),
             Self::FailedToSplit { dropped_events } => (
-                "A split payload was still too big to encode/compress withing size limits.",
+                "A split payload was still too big to encode/compress within size limits."
+                    .to_string(),
                 "split_failed",
                 dropped_events,
             ),
             Self::Unexpected {
                 error_type,
                 dropped_events,
-            } => ("An unexpected error occurred.", error_type, dropped_events),
-        }
-    }
-}
-
-impl From<CreateError> for RequestBuilderError {
-    fn from(e: CreateError) -> Self {
-        match e {
-            CreateError::InvalidLimits => Self::FailedToBuild {
-                error_type: "invalid_payload_limits",
-            },
-        }
-    }
-}
-
-impl From<EncoderError> for RequestBuilderError {
-    fn from(e: EncoderError) -> Self {
-        match e {
-            // Series metrics (JSON) are encoded incrementally, so we can only ever lose a single
-            // metric for a JSON encoding failure.
-            EncoderError::JsonEncodingFailed { source } => Self::FailedToEncode {
-                reason: match source.classify() {
-                    Category::Io => "json_io",
-                    Category::Syntax => "json_syntax",
-                    Category::Data => "json_data",
-                    Category::Eof => "json_eof",
-                },
-                dropped_events: 1,
-            },
-            // Sketch metrics (Protocol Buffers) are encoded in a single shot, so naturally we would
-            // expect `dropped_events` to be 1-N, instead of always 1. We should never emit this
-            // metric when calling `try_encode`, which is where we'd see the JSON variant of it.
-            // This is because sketch encoding happens at the end.
-            //
-            // Thus, we default `dropped_events` to 1, and if we actually hit this error when
-            // finishing up a payload, we'll fix up the true number of dropped events at that point.
-            EncoderError::ProtoEncodingFailed { .. } => Self::FailedToEncode {
-                // `prost` states that for an encoding error specifically, it can only ever fail due
-                // to insufficient capacity in the encoding buffer.
-                reason: "protobuf_insufficient_buf_capacity",
-                dropped_events: 1,
-            },
-            // Not all metric types for valid depending on the configured endpoint of the encoder.
-            EncoderError::InvalidMetric { metric_value, .. } => Self::FailedToEncode {
-                // TODO: At some point, it would be nice to use `const_format` to build the reason
-                // to better understand in what context
-                // metric X is being considered as invalid. Practically it's not a huge issue,
-                // because the number of metric types are fixed and we should be able to inspect the
-                // code for issues, or if it became a big problem, we could just go ahead and do the
-                // `const_format` work... but it'd be nice to be ahead of curve when trivially possible.
-                reason: metric_value,
-                dropped_events: 1,
-            },
+            } => (
+                "An unexpected error occurred.".to_string(),
+                error_type,
+                dropped_events,
+            ),
         }
     }
 }
@@ -118,7 +64,6 @@ pub struct DDMetricsMetadata {
     api_key: Option<Arc<str>>,
     endpoint: DatadogMetricsEndpoint,
     finalizers: EventFinalizers,
-    raw_bytes: usize,
 }
 
 /// Incremental request builder specific to Datadog metrics.
@@ -211,24 +156,21 @@ impl IncrementalRequestBuilder<((Option<Arc<str>>, DatadogMetricsEndpoint), Vec<Metric>)>
         // If we encoded one or more metrics this pass, finalize the payload.
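         // Note that `encoder.finish()` now returns an `EncodeResult` rather than raw payload bytes
         // plus a separate byte count: it bundles the compressed payload with the uncompressed byte
         // size that the request metadata below is derived from.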
         if n > 0 {
             match encoder.finish() {
-                Ok((payload, mut metrics, raw_bytes_written)) => {
-                    let json_size = metrics.estimated_json_encoded_size_of();
+                Ok((encode_result, mut metrics)) => {
                     let finalizers = metrics.take_finalizers();
                     let metadata = DDMetricsMetadata {
                         api_key: api_key.as_ref().map(Arc::clone),
                         endpoint,
                         finalizers,
-                        raw_bytes: raw_bytes_written,
                     };
-                    let builder = RequestMetadataBuilder::new(
-                        metrics.len(),
-                        raw_bytes_written,
-                        json_size,
-                    );
-                    let bytes_len = NonZeroUsize::new(payload.len())
-                        .expect("payload should never be zero length");
-                    let request_metadata = builder.with_request_size(bytes_len);
-                    results.push(Ok(((metadata, request_metadata), payload)));
+
+                    let request_metadata =
+                        RequestMetadataBuilder::from_events(&metrics).build(&encode_result);
+
+                    results.push(Ok((
+                        (metadata, request_metadata),
+                        encode_result.into_payload(),
+                    )));
                 }
                 Err(err) => match err {
                     // The encoder informed us that the resulting payload was too big, so we're
@@ -299,7 +241,6 @@ impl IncrementalRequestBuilder<((Option<Arc<str>>, DatadogMetricsEndpoint), Vec<Metric>)>
             uri,
             content_type: ddmetrics_metadata.endpoint.content_type(),
             finalizers: ddmetrics_metadata.finalizers,
-            raw_bytes: ddmetrics_metadata.raw_bytes,
             metadata: request_metadata,
         }
     }
@@ -332,21 +273,21 @@ fn encode_now_or_never(
     encoder
         .finish()
-        .map(|(payload, mut processed, raw_bytes_written)| {
-            let json_size = processed.estimated_json_encoded_size_of();
+        .map(|(encode_result, mut processed)| {
             let finalizers = processed.take_finalizers();
             let ddmetrics_metadata = DDMetricsMetadata {
                 api_key,
                 endpoint,
                 finalizers,
-                raw_bytes: raw_bytes_written,
             };
-            let builder = RequestMetadataBuilder::new(metrics_len, raw_bytes_written, json_size);
-            let bytes_len =
-                NonZeroUsize::new(payload.len()).expect("payload should never be zero length");
-            let request_metadata = builder.with_request_size(bytes_len);
-            ((ddmetrics_metadata, request_metadata), payload)
+            let request_metadata =
+                RequestMetadataBuilder::from_events(&processed).build(&encode_result);
+
+            (
+                (ddmetrics_metadata, request_metadata),
+                encode_result.into_payload(),
+            )
         })
         .map_err(|_| RequestBuilderError::FailedToSplit {
             dropped_events: metrics_len as u64,
diff --git a/src/sinks/datadog/metrics/service.rs b/src/sinks/datadog/metrics/service.rs
index 6abacfcc739b7..7f62e6ddaefd5 100644
--- a/src/sinks/datadog/metrics/service.rs
+++ b/src/sinks/datadog/metrics/service.rs
@@ -60,7 +60,6 @@ pub struct DatadogMetricsRequest {
     pub uri: Uri,
     pub content_type: &'static str,
     pub finalizers: EventFinalizers,
-    pub raw_bytes: usize,
     pub metadata: RequestMetadata,
 }
 
@@ -125,8 +124,7 @@ impl MetaDescriptive for DatadogMetricsRequest {
 pub struct DatadogMetricsResponse {
     status_code: StatusCode,
     body: Bytes,
-    byte_size: GroupedCountByteSize,
-    raw_byte_size: usize,
+    request_metadata: RequestMetadata,
 }
 
 impl DriverResponse for DatadogMetricsResponse {
@@ -141,11 +139,12 @@ impl DriverResponse for DatadogMetricsResponse {
     }
 
     fn events_sent(&self) -> &GroupedCountByteSize {
-        &self.byte_size
+        self.request_metadata
+            .events_estimated_json_encoded_byte_size()
     }
 
     fn bytes_sent(&self) -> Option<usize> {
-        Some(self.raw_byte_size)
+        Some(self.request_metadata.request_wire_size())
     }
 }
 
@@ -184,9 +183,7 @@ impl Service<DatadogMetricsRequest> for DatadogMetricsService {
         let api_key = self.api_key.clone();
 
         Box::pin(async move {
-            let metadata = std::mem::take(request.metadata_mut());
-            let byte_size = metadata.into_events_estimated_json_encoded_byte_size();
-            let raw_byte_size = request.raw_bytes;
+            let request_metadata = std::mem::take(request.metadata_mut());
 
             let request = request
                 .into_http_request(api_key)
@@ -205,8 +202,7 @@ impl Service<DatadogMetricsRequest> for DatadogMetricsService {
             Ok(DatadogMetricsResponse {
                 status_code: parts.status,
                 body,
-                byte_size,
-                raw_byte_size,
+                request_metadata,
             })
         })
     }
diff --git a/src/sinks/datadog/metrics/sink.rs b/src/sinks/datadog/metrics/sink.rs
index a85eaaf7a3a11..5ceefc3c487d2 100644
--- a/src/sinks/datadog/metrics/sink.rs
+++ b/src/sinks/datadog/metrics/sink.rs
@@ -123,9 +123,9 @@ where
                 .filter_map(|request| async move {
                     match request {
                         Err(e) => {
-                            let (error_message, error_code, dropped_events) = e.into_parts();
+                            let (reason, error_code, dropped_events) = e.into_parts();
                             emit!(DatadogMetricsEncodingError {
-                                error_message,
+                                reason: reason.as_str(),
                                 error_code,
                                 dropped_events: dropped_events as usize,
                             });
diff --git a/src/sinks/util/metadata.rs b/src/sinks/util/metadata.rs
index 521f1c080995c..e6f4e7739e4d2 100644
--- a/src/sinks/util/metadata.rs
+++ b/src/sinks/util/metadata.rs
@@ -1,6 +1,5 @@
 use std::num::NonZeroUsize;
 
-use vector_buffers::EventCount;
 use vector_core::{config, ByteSizeOf, EstimatedJsonEncodedSizeOf};
 
 use vector_common::{
@@ -21,21 +20,19 @@ pub struct RequestMetadataBuilder {
 impl RequestMetadataBuilder {
     pub fn from_events<E>(events: &[E]) -> Self
     where
-        E: ByteSizeOf + EventCount + GetEventCountTags + EstimatedJsonEncodedSizeOf,
+        E: ByteSizeOf + GetEventCountTags + EstimatedJsonEncodedSizeOf,
     {
         let mut size = config::telemetry().create_request_count_byte_size();
-        let mut event_count = 0;
         let mut events_byte_size = 0;
 
         for event in events {
-            event_count += 1;
             events_byte_size += event.size_of();
             size.add_event(event, event.estimated_json_encoded_size_of());
         }
 
         Self {
-            event_count,
+            event_count: events.len(),
             events_byte_size,
             events_estimated_json_encoded_byte_size: size,
         }
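Stepping back from the diff itself: the core property this change relies on is that Protocol Buffers messages are concatenable -- encoding each element of a `repeated` message field as its own field key plus length-delimited value, then concatenating the buffers, yields the same bytes as encoding the parent message in one shot. The PR's `encode_multiple_sketch_metrics_normal_vs_incremental` test verifies this for `SketchPayload`; below is a minimal, self-contained sketch of the same property using toy message types (the `Item`/`Batch` types are hypothetical stand-ins, not types from this PR):

use prost::Message;

#[derive(Clone, PartialEq, Message)]
struct Item {
    #[prost(string, tag = "1")]
    name: String,
}

#[derive(Clone, PartialEq, Message)]
struct Batch {
    #[prost(message, repeated, tag = "2")]
    items: Vec<Item>,
}

fn main() {
    let items = vec![Item { name: "a".into() }, Item { name: "b".into() }];

    // One-shot encoding of the parent message.
    let full = Batch { items: items.clone() }.encode_to_vec();

    // Incremental encoding: write the field key (field 2, length-delimited wire type),
    // then the length-delimited element, once per element, into a shared buffer.
    let mut incremental = Vec::new();
    for item in &items {
        prost::encoding::encode_key(2, prost::encoding::WireType::LengthDelimited, &mut incremental);
        item.encode_length_delimited(&mut incremental).unwrap();
    }

    // The two byte sequences are identical, which is what lets the encoder above emit
    // sketches one at a time while still producing a valid `SketchPayload`.
    assert_eq!(full, incremental);
}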