From 2c24dac2e3721034b5ef9dd1c599046060ea9a6d Mon Sep 17 00:00:00 2001 From: Stephen Wakely Date: Thu, 28 Sep 2023 11:06:32 +0100 Subject: [PATCH 1/5] Move to stream based sink Signed-off-by: Stephen Wakely --- src/sinks/gcp/chronicle_unstructured.rs | 8 +- src/sinks/gcp/mod.rs | 1 - src/sinks/gcp/stackdriver/metrics/config.rs | 160 +++++++++ src/sinks/gcp/stackdriver/metrics/mod.rs | 20 ++ .../stackdriver/metrics/request_builder.rs | 129 ++++++++ src/sinks/gcp/stackdriver/metrics/sink.rs | 83 +++++ src/sinks/gcp/stackdriver/metrics/tests.rs | 42 +++ src/sinks/gcp/stackdriver/mod.rs | 1 + src/sinks/gcp/stackdriver_metrics.rs | 308 ------------------ 9 files changed, 439 insertions(+), 313 deletions(-) create mode 100644 src/sinks/gcp/stackdriver/metrics/config.rs create mode 100644 src/sinks/gcp/stackdriver/metrics/mod.rs create mode 100644 src/sinks/gcp/stackdriver/metrics/request_builder.rs create mode 100644 src/sinks/gcp/stackdriver/metrics/sink.rs create mode 100644 src/sinks/gcp/stackdriver/metrics/tests.rs delete mode 100644 src/sinks/gcp/stackdriver_metrics.rs diff --git a/src/sinks/gcp/chronicle_unstructured.rs b/src/sinks/gcp/chronicle_unstructured.rs index 5b65089852ea0..3043cc2ba519c 100644 --- a/src/sinks/gcp/chronicle_unstructured.rs +++ b/src/sinks/gcp/chronicle_unstructured.rs @@ -248,7 +248,7 @@ impl ChronicleUnstructuredConfig { .settings(request, GcsRetryLogic) .service(ChronicleService::new(client, base_url, creds)); - let request_settings = RequestSettings::new(self)?; + let request_settings = ChronicleRequestBuilder::new(self)?; let sink = GcsSink::new(svc, request_settings, partitioner, batch_settings, "http"); @@ -362,7 +362,7 @@ impl Encoder<(String, Vec)> for ChronicleEncoder { // request. All possible values are pre-computed for direct use in // producing a request. 
#[derive(Clone, Debug)] -struct RequestSettings { +struct ChronicleRequestBuilder { encoder: ChronicleEncoder, } @@ -382,7 +382,7 @@ impl AsRef<[u8]> for ChronicleRequestPayload { } } -impl RequestBuilder<(String, Vec)> for RequestSettings { +impl RequestBuilder<(String, Vec)> for ChronicleRequestBuilder { type Metadata = EventFinalizers; type Events = (String, Vec); type Encoder = ChronicleEncoder; @@ -423,7 +423,7 @@ impl RequestBuilder<(String, Vec)> for RequestSettings { } } -impl RequestSettings { +impl ChronicleRequestBuilder { fn new(config: &ChronicleUnstructuredConfig) -> crate::Result { let transformer = config.encoding.transformer(); let serializer = config.encoding.config().build()?; diff --git a/src/sinks/gcp/mod.rs b/src/sinks/gcp/mod.rs index 5a7f64b84c076..93c312711784f 100644 --- a/src/sinks/gcp/mod.rs +++ b/src/sinks/gcp/mod.rs @@ -7,7 +7,6 @@ pub mod chronicle_unstructured; pub mod cloud_storage; pub mod pubsub; pub mod stackdriver; -pub mod stackdriver_metrics; /// A monitored resource. /// diff --git a/src/sinks/gcp/stackdriver/metrics/config.rs b/src/sinks/gcp/stackdriver/metrics/config.rs new file mode 100644 index 0000000000000..f5fdfef26014d --- /dev/null +++ b/src/sinks/gcp/stackdriver/metrics/config.rs @@ -0,0 +1,160 @@ +use bytes::Bytes; +use goauth::scopes::Scope; +use http::{Request, Uri}; + +use crate::{ + gcp::{GcpAuthConfig, GcpAuthenticator}, + http::HttpClient, + sinks::{ + gcp, + prelude::*, + util::http::{http_response_retry_logic, HttpService, HttpServiceRequestBuilder}, + }, +}; + +use super::{ + request_builder::{StackdriverMetricsEncoder, StackdriverMetricsRequestBuilder}, + sink::StackdriverMetricsSink, + StackdriverMetricsDefaultBatchSettings, +}; + +/// Configuration for the `gcp_stackdriver_metrics` sink. +#[configurable_component(sink( + "gcp_stackdriver_metrics", + "Deliver metrics to GCP's Cloud Monitoring system." 
+))] +#[derive(Clone, Debug, Default)] +pub struct StackdriverConfig { + #[serde(skip, default = "default_endpoint")] + pub(super) endpoint: String, + + /// The project ID to which to publish metrics. + /// + /// See the [Google Cloud Platform project management documentation][project_docs] for more details. + /// + /// [project_docs]: https://cloud.google.com/resource-manager/docs/creating-managing-projects + pub(super) project_id: String, + + /// The monitored resource to associate the metrics with. + pub(super) resource: gcp::GcpTypedResource, + + #[serde(flatten)] + pub(super) auth: GcpAuthConfig, + + /// The default namespace to use for metrics that do not have one. + /// + /// Metrics with the same name can only be differentiated by their namespace, and not all + /// metrics have their own namespace. + #[serde(default = "default_metric_namespace_value")] + pub(super) default_namespace: String, + + #[configurable(derived)] + #[serde(default)] + pub(super) request: TowerRequestConfig, + + #[configurable(derived)] + #[serde(default)] + pub(super) batch: BatchConfig, + + #[configurable(derived)] + pub(super) tls: Option, + + #[configurable(derived)] + #[serde( + default, + deserialize_with = "crate::serde::bool_or_struct", + skip_serializing_if = "crate::serde::skip_serializing_if_default" + )] + acknowledgements: AcknowledgementsConfig, +} + +fn default_metric_namespace_value() -> String { + "namespace".to_string() +} + +fn default_endpoint() -> String { + "https://monitoring.googleapis.com".to_string() +} + +impl_generate_config_from_default!(StackdriverConfig); + +#[async_trait::async_trait] +#[typetag::serde(name = "gcp_stackdriver_metrics")] +impl SinkConfig for StackdriverConfig { + async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { + let auth = self.auth.build(Scope::MonitoringWrite).await?; + + let healthcheck = healthcheck().boxed(); + let started = chrono::Utc::now(); + let tls_settings = 
TlsSettings::from_options(&self.tls)?; + let client = HttpClient::new(tls_settings, cx.proxy())?; + + let batch_settings = self.batch.validate()?.into_batcher_settings()?; + + let request_builder = StackdriverMetricsRequestBuilder { + encoder: StackdriverMetricsEncoder { + default_namespace: self.default_namespace.clone(), + started, + resource: self.resource.clone(), + }, + }; + + let request_limits = self.request.unwrap_with( + &TowerRequestConfig::default() + .rate_limit_duration_secs(1) + .rate_limit_num(1000), + ); + + let uri: Uri = format!( + "{}/v3/projects/{}/timeSeries", + self.endpoint, self.project_id + ) + .parse()?; + + auth.spawn_regenerate_token(); + + let stackdriver_logs_service_request_builder = + StackdriverMetricsServiceRequestBuilder { uri, auth }; + + let service = HttpService::new(client, stackdriver_logs_service_request_builder); + + let service = ServiceBuilder::new() + .settings(request_limits, http_response_retry_logic()) + .service(service); + + let sink = StackdriverMetricsSink::new(service, batch_settings, request_builder); + + Ok((VectorSink::from_event_streamsink(sink), healthcheck)) + } + + fn input(&self) -> Input { + Input::metric() + } + + fn acknowledgements(&self) -> &AcknowledgementsConfig { + &self.acknowledgements + } +} + +#[derive(Debug, Clone)] +pub(super) struct StackdriverMetricsServiceRequestBuilder { + pub(super) uri: Uri, + pub(super) auth: GcpAuthenticator, +} + +impl HttpServiceRequestBuilder for StackdriverMetricsServiceRequestBuilder { + fn build(&self, body: Bytes) -> Request { + let mut request = Request::post(self.uri.clone()) + .header("Content-Type", "application/json") + .body(body) + .unwrap(); + + self.auth.apply(&mut request); + + request + } +} + +async fn healthcheck() -> crate::Result<()> { + Ok(()) +} diff --git a/src/sinks/gcp/stackdriver/metrics/mod.rs b/src/sinks/gcp/stackdriver/metrics/mod.rs new file mode 100644 index 0000000000000..a73070b11e02f --- /dev/null +++ 
b/src/sinks/gcp/stackdriver/metrics/mod.rs @@ -0,0 +1,20 @@ +// TODO: In order to correctly assert component specification compliance, we would have to do some more advanced mocking +// off the endpoint, which would include also providing a mock OAuth2 endpoint to allow for generating a token from the +// mocked credentials. Let this TODO serve as a placeholder for doing that in the future. + +use crate::sinks::prelude::*; + +mod config; +mod request_builder; +mod sink; +#[cfg(test)] +mod tests; + +#[derive(Clone, Copy, Debug, Default)] +pub struct StackdriverMetricsDefaultBatchSettings; + +impl SinkBatchSettings for StackdriverMetricsDefaultBatchSettings { + const MAX_EVENTS: Option = Some(1); + const MAX_BYTES: Option = None; + const TIMEOUT_SECS: f64 = 1.0; +} diff --git a/src/sinks/gcp/stackdriver/metrics/request_builder.rs b/src/sinks/gcp/stackdriver/metrics/request_builder.rs new file mode 100644 index 0000000000000..cffc1b5eecdd6 --- /dev/null +++ b/src/sinks/gcp/stackdriver/metrics/request_builder.rs @@ -0,0 +1,129 @@ +use std::io; + +use bytes::Bytes; +use chrono::Utc; +use vector_core::event::{Metric, MetricValue}; + +use crate::sinks::{gcp, prelude::*, util::http::HttpRequest}; + +#[derive(Clone, Debug)] +pub(super) struct StackdriverMetricsRequestBuilder { + pub(super) encoder: StackdriverMetricsEncoder, +} + +impl RequestBuilder> for StackdriverMetricsRequestBuilder { + type Metadata = EventFinalizers; + type Events = Vec; + type Encoder = StackdriverMetricsEncoder; + type Payload = Bytes; + type Request = HttpRequest; + type Error = io::Error; + + fn compression(&self) -> Compression { + Compression::None + } + + fn encoder(&self) -> &Self::Encoder { + &self.encoder + } + + fn split_input( + &self, + mut events: Vec, + ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) { + let finalizers = events.take_finalizers(); + let builder = RequestMetadataBuilder::from_events(&events); + (finalizers, builder, events) + } + + fn build_request( + &self, 
+ metadata: Self::Metadata, + request_metadata: RequestMetadata, + payload: EncodeResult, + ) -> Self::Request { + HttpRequest::new(payload.into_payload(), metadata, request_metadata) + } +} + +#[derive(Clone, Debug)] +pub struct StackdriverMetricsEncoder { + pub(super) default_namespace: String, + pub(super) started: chrono::DateTime, + pub(super) resource: gcp::GcpTypedResource, +} + +impl encoding::Encoder> for StackdriverMetricsEncoder { + fn encode_input( + &self, + mut input: Vec, + writer: &mut dyn io::Write, + ) -> io::Result<(usize, GroupedCountByteSize)> { + // TODO: Are we really only encoding the last event here? + let metric = input.pop().expect("only one metric"); + + let mut byte_size = telemetry().create_request_count_byte_size(); + byte_size.add_event(&metric, metric.estimated_json_encoded_size_of()); + + let (series, data, _metadata) = metric.into_parts(); + let namespace = series + .name + .namespace + .unwrap_or_else(|| self.default_namespace.clone()); + let metric_type = format!( + "custom.googleapis.com/{}/metrics/{}", + namespace, series.name.name + ); + + let end_time = data.time.timestamp.unwrap_or_else(chrono::Utc::now); + + let (point_value, interval, metric_kind) = match &data.value { + MetricValue::Counter { value } => { + let interval = gcp::GcpInterval { + start_time: Some(self.started), + end_time, + }; + + (*value, interval, gcp::GcpMetricKind::Cumulative) + } + MetricValue::Gauge { value } => { + let interval = gcp::GcpInterval { + start_time: None, + end_time, + }; + + (*value, interval, gcp::GcpMetricKind::Gauge) + } + _ => unreachable!(), + }; + let metric_labels = series + .tags + .unwrap_or_default() + .into_iter_single() + .collect::>(); + + let series = gcp::GcpSeries { + time_series: &[gcp::GcpSerie { + metric: gcp::GcpMetric { + r#type: metric_type, + labels: metric_labels, + }, + resource: gcp::GcpResource { + r#type: self.resource.r#type.clone(), + labels: self.resource.labels.clone(), + }, + metric_kind, + value_type: 
gcp::GcpValueType::Int64, + points: &[gcp::GcpPoint { + interval, + value: gcp::GcpPointValue { + int64_value: Some(point_value as i64), + }, + }], + }], + }; + + let body = crate::serde::json::to_bytes(&series).unwrap().freeze(); + writer.write_all(&body).map(|()| (body.len(), byte_size)) + } +} diff --git a/src/sinks/gcp/stackdriver/metrics/sink.rs b/src/sinks/gcp/stackdriver/metrics/sink.rs new file mode 100644 index 0000000000000..79e81b23eac10 --- /dev/null +++ b/src/sinks/gcp/stackdriver/metrics/sink.rs @@ -0,0 +1,83 @@ +use vector_core::event::MetricValue; + +use crate::sinks::{prelude::*, util::http::HttpRequest}; + +use super::request_builder::StackdriverMetricsRequestBuilder; + +pub(super) struct StackdriverMetricsSink { + service: S, + batch_settings: BatcherSettings, + request_builder: StackdriverMetricsRequestBuilder, +} + +impl StackdriverMetricsSink +where + S: Service + Send + 'static, + S::Future: Send + 'static, + S::Response: DriverResponse + Send + 'static, + S::Error: std::fmt::Debug + Into + Send, +{ + /// Creates a new `StackdriverLogsSink`. + pub(super) const fn new( + service: S, + batch_settings: BatcherSettings, + request_builder: StackdriverMetricsRequestBuilder, + ) -> Self { + Self { + service, + batch_settings, + request_builder, + } + } + + async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { + input + .filter_map(|event| { + // Filter out anything that is not a Counter or a Gauge. + let metric = event.into_metric(); + + future::ready(match metric.value() { + &MetricValue::Counter { .. } => Some(metric), + &MetricValue::Gauge { .. 
} => Some(metric), + not_supported => { + warn!("Unsupported metric type: {:?}.", not_supported); + None + } + }) + }) + // TODO Add some kind of normalizer + .batched(self.batch_settings.into_byte_size_config()) + .request_builder( + default_request_builder_concurrency_limit(), + self.request_builder, + ) + .filter_map(|request| async move { + match request { + Err(error) => { + emit!(SinkRequestBuildError { error }); + None + } + Ok(req) => Some(req), + } + }) + .into_driver(self.service) + .run() + .await + } +} + +#[async_trait::async_trait] +impl StreamSink for StackdriverMetricsSink +where + S: Service + Send + 'static, + S::Future: Send + 'static, + S::Response: DriverResponse + Send + 'static, + S::Error: std::fmt::Debug + Into + Send, +{ + async fn run( + self: Box, + input: futures_util::stream::BoxStream<'_, Event>, + ) -> Result<(), ()> { + self.run_inner(input).await + } +} diff --git a/src/sinks/gcp/stackdriver/metrics/tests.rs b/src/sinks/gcp/stackdriver/metrics/tests.rs new file mode 100644 index 0000000000000..97ed13ae9f351 --- /dev/null +++ b/src/sinks/gcp/stackdriver/metrics/tests.rs @@ -0,0 +1,42 @@ +use futures::{future::ready, stream}; +use serde::Deserialize; +use vector_core::event::{Metric, MetricKind, MetricValue}; + +use super::{config::StackdriverConfig, *}; +use crate::{ + config::SinkContext, + test_util::{ + components::{run_and_assert_sink_compliance, SINK_TAGS}, + http::{always_200_response, spawn_blackhole_http_server}, + }, +}; + +#[test] +fn generate_config() { + crate::test_util::test_generate_config::(); +} + +#[tokio::test] +async fn component_spec_compliance() { + let mock_endpoint = spawn_blackhole_http_server(always_200_response).await; + + let config = StackdriverConfig::generate_config().to_string(); + let mut config = StackdriverConfig::deserialize(toml::de::ValueDeserializer::new(&config)) + .expect("config should be valid"); + + // If we don't override the credentials path/API key, it tries to directly call out to the 
Google Instance + // Metadata API, which we clearly don't have in unit tests. :) + config.auth.credentials_path = None; + config.auth.api_key = Some("fake".to_string().into()); + config.endpoint = mock_endpoint.to_string(); + + let context = SinkContext::default(); + let (sink, _healthcheck) = config.build(context).await.unwrap(); + + let event = Event::Metric(Metric::new( + "gauge-test", + MetricKind::Absolute, + MetricValue::Gauge { value: 1_f64 }, + )); + run_and_assert_sink_compliance(sink, stream::once(ready(event)), &SINK_TAGS).await; +} diff --git a/src/sinks/gcp/stackdriver/mod.rs b/src/sinks/gcp/stackdriver/mod.rs index dff3ee587ba14..ac7e73a24aeb0 100644 --- a/src/sinks/gcp/stackdriver/mod.rs +++ b/src/sinks/gcp/stackdriver/mod.rs @@ -1 +1,2 @@ mod logs; +mod metrics; diff --git a/src/sinks/gcp/stackdriver_metrics.rs b/src/sinks/gcp/stackdriver_metrics.rs deleted file mode 100644 index faa6989b80bd5..0000000000000 --- a/src/sinks/gcp/stackdriver_metrics.rs +++ /dev/null @@ -1,308 +0,0 @@ -// TODO: In order to correctly assert component specification compliance, we would have to do some more advanced mocking -// off the endpoint, which would include also providing a mock OAuth2 endpoint to allow for generating a token from the -// mocked credentials. Let this TODO serve as a placeholder for doing that in the future. 
- -use bytes::Bytes; -use chrono::{DateTime, Utc}; -use futures::{sink::SinkExt, FutureExt}; -use goauth::scopes::Scope; -use http::Uri; -use vector_config::configurable_component; - -use crate::{ - config::{AcknowledgementsConfig, Input, SinkConfig, SinkContext}, - event::{Event, Metric, MetricValue}, - gcp::{GcpAuthConfig, GcpAuthenticator}, - http::HttpClient, - sinks::{ - gcp, - util::{ - buffer::metrics::MetricsBuffer, - http::{BatchedHttpSink, HttpEventEncoder, HttpSink}, - BatchConfig, SinkBatchSettings, TowerRequestConfig, - }, - Healthcheck, VectorSink, - }, - tls::{TlsConfig, TlsSettings}, -}; - -#[derive(Clone, Copy, Debug, Default)] -pub struct StackdriverMetricsDefaultBatchSettings; - -impl SinkBatchSettings for StackdriverMetricsDefaultBatchSettings { - const MAX_EVENTS: Option = Some(1); - const MAX_BYTES: Option = None; - const TIMEOUT_SECS: f64 = 1.0; -} - -/// Configuration for the `gcp_stackdriver_metrics` sink. -#[configurable_component(sink( - "gcp_stackdriver_metrics", - "Deliver metrics to GCP's Cloud Monitoring system." -))] -#[derive(Clone, Debug, Default)] -pub struct StackdriverConfig { - #[serde(skip, default = "default_endpoint")] - endpoint: String, - - /// The project ID to which to publish metrics. - /// - /// See the [Google Cloud Platform project management documentation][project_docs] for more details. - /// - /// [project_docs]: https://cloud.google.com/resource-manager/docs/creating-managing-projects - pub project_id: String, - - /// The monitored resource to associate the metrics with. - pub resource: gcp::GcpTypedResource, - - #[serde(flatten)] - pub auth: GcpAuthConfig, - - /// The default namespace to use for metrics that do not have one. - /// - /// Metrics with the same name can only be differentiated by their namespace, and not all - /// metrics have their own namespace. 
- #[serde(default = "default_metric_namespace_value")] - pub default_namespace: String, - - #[configurable(derived)] - #[serde(default)] - pub request: TowerRequestConfig, - - #[configurable(derived)] - #[serde(default)] - pub batch: BatchConfig, - - #[configurable(derived)] - pub tls: Option, - - #[configurable(derived)] - #[serde( - default, - deserialize_with = "crate::serde::bool_or_struct", - skip_serializing_if = "crate::serde::skip_serializing_if_default" - )] - acknowledgements: AcknowledgementsConfig, -} - -fn default_metric_namespace_value() -> String { - "namespace".to_string() -} - -fn default_endpoint() -> String { - "https://monitoring.googleapis.com".to_string() -} - -impl_generate_config_from_default!(StackdriverConfig); - -#[async_trait::async_trait] -#[typetag::serde(name = "gcp_stackdriver_metrics")] -impl SinkConfig for StackdriverConfig { - async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { - let auth = self.auth.build(Scope::MonitoringWrite).await?; - - let healthcheck = healthcheck().boxed(); - let started = chrono::Utc::now(); - let request = self.request.unwrap_with( - &TowerRequestConfig::default() - .rate_limit_duration_secs(1) - .rate_limit_num(1000), - ); - let tls_settings = TlsSettings::from_options(&self.tls)?; - let client = HttpClient::new(tls_settings, cx.proxy())?; - let batch_settings = self.batch.into_batch_settings()?; - - auth.spawn_regenerate_token(); - let sink = HttpEventSink { - config: self.clone(), - started, - auth, - }; - - let sink = BatchedHttpSink::new( - sink, - MetricsBuffer::new(batch_settings.size), - request, - batch_settings.timeout, - client, - ) - .sink_map_err( - |error| error!(message = "Fatal gcp_stackdriver_metrics sink error.", %error), - ); - - #[allow(deprecated)] - Ok((VectorSink::from_event_sink(sink), healthcheck)) - } - - fn input(&self) -> Input { - Input::metric() - } - - fn acknowledgements(&self) -> &AcknowledgementsConfig { - &self.acknowledgements - } -} - 
-struct HttpEventSink { - config: StackdriverConfig, - started: DateTime, - auth: GcpAuthenticator, -} - -struct StackdriverMetricsEncoder; - -impl HttpEventEncoder for StackdriverMetricsEncoder { - fn encode_event(&mut self, event: Event) -> Option { - let metric = event.into_metric(); - - match metric.value() { - &MetricValue::Counter { .. } => Some(metric), - &MetricValue::Gauge { .. } => Some(metric), - not_supported => { - warn!("Unsupported metric type: {:?}.", not_supported); - None - } - } - } -} - -#[async_trait::async_trait] -impl HttpSink for HttpEventSink { - type Input = Metric; - type Output = Vec; - type Encoder = StackdriverMetricsEncoder; - - fn build_encoder(&self) -> Self::Encoder { - StackdriverMetricsEncoder - } - - async fn build_request( - &self, - mut metrics: Self::Output, - ) -> crate::Result> { - let metric = metrics.pop().expect("only one metric"); - let (series, data, _metadata) = metric.into_parts(); - let namespace = series - .name - .namespace - .unwrap_or_else(|| self.config.default_namespace.clone()); - let metric_type = format!( - "custom.googleapis.com/{}/metrics/{}", - namespace, series.name.name - ); - - let end_time = data.time.timestamp.unwrap_or_else(chrono::Utc::now); - - let (point_value, interval, metric_kind) = match &data.value { - MetricValue::Counter { value } => { - let interval = gcp::GcpInterval { - start_time: Some(self.started), - end_time, - }; - - (*value, interval, gcp::GcpMetricKind::Cumulative) - } - MetricValue::Gauge { value } => { - let interval = gcp::GcpInterval { - start_time: None, - end_time, - }; - - (*value, interval, gcp::GcpMetricKind::Gauge) - } - _ => unreachable!(), - }; - - let metric_labels = series - .tags - .unwrap_or_default() - .into_iter_single() - .collect::>(); - - let series = gcp::GcpSeries { - time_series: &[gcp::GcpSerie { - metric: gcp::GcpMetric { - r#type: metric_type, - labels: metric_labels, - }, - resource: gcp::GcpResource { - r#type: self.config.resource.r#type.clone(), - 
labels: self.config.resource.labels.clone(), - }, - metric_kind, - value_type: gcp::GcpValueType::Int64, - points: &[gcp::GcpPoint { - interval, - value: gcp::GcpPointValue { - int64_value: Some(point_value as i64), - }, - }], - }], - }; - - let body = crate::serde::json::to_bytes(&series).unwrap().freeze(); - - let uri: Uri = format!( - "{}/v3/projects/{}/timeSeries", - self.config.endpoint, self.config.project_id - ) - .parse()?; - - let mut request = hyper::Request::post(uri) - .header("content-type", "application/json") - .body(body)?; - self.auth.apply(&mut request); - - Ok(request) - } -} - -async fn healthcheck() -> crate::Result<()> { - Ok(()) -} - -#[cfg(test)] -mod tests { - use futures::{future::ready, stream}; - use serde::Deserialize; - use vector_core::event::{MetricKind, MetricValue}; - - use super::*; - use crate::{ - config::{GenerateConfig, SinkConfig, SinkContext}, - test_util::{ - components::{run_and_assert_sink_compliance, SINK_TAGS}, - http::{always_200_response, spawn_blackhole_http_server}, - }, - }; - - #[test] - fn generate_config() { - crate::test_util::test_generate_config::(); - } - - #[tokio::test] - async fn component_spec_compliance() { - let mock_endpoint = spawn_blackhole_http_server(always_200_response).await; - - let config = StackdriverConfig::generate_config().to_string(); - let mut config = StackdriverConfig::deserialize(toml::de::ValueDeserializer::new(&config)) - .expect("config should be valid"); - - // If we don't override the credentials path/API key, it tries to directly call out to the Google Instance - // Metadata API, which we clearly don't have in unit tests. 
:) - config.auth.credentials_path = None; - config.auth.api_key = Some("fake".to_string().into()); - config.endpoint = mock_endpoint.to_string(); - - let context = SinkContext::default(); - let (sink, _healthcheck) = config.build(context).await.unwrap(); - - let event = Event::Metric(Metric::new( - "gauge-test", - MetricKind::Absolute, - MetricValue::Gauge { value: 1_f64 }, - )); - run_and_assert_sink_compliance(sink, stream::once(ready(event)), &SINK_TAGS).await; - } -} From e471d076427a4783fabbc00f3eb0f90f9d2fe2a6 Mon Sep 17 00:00:00 2001 From: Stephen Wakely Date: Fri, 29 Sep 2023 14:16:28 +0100 Subject: [PATCH 2/5] Added tests Signed-off-by: Stephen Wakely --- src/sinks/gcp/mod.rs | 8 +- src/sinks/gcp/stackdriver/metrics/config.rs | 2 +- .../stackdriver/metrics/request_builder.rs | 125 +++++----- src/sinks/gcp/stackdriver/metrics/tests.rs | 222 ++++++++++++++++++ 4 files changed, 293 insertions(+), 64 deletions(-) diff --git a/src/sinks/gcp/mod.rs b/src/sinks/gcp/mod.rs index 93c312711784f..537ea336fc05b 100644 --- a/src/sinks/gcp/mod.rs +++ b/src/sinks/gcp/mod.rs @@ -102,18 +102,18 @@ pub struct GcpResource { #[derive(Serialize, Debug, Clone)] #[serde(rename_all = "camelCase")] -pub struct GcpSerie<'a> { +pub struct GcpSerie { pub metric: GcpMetric, pub resource: GcpResource, pub metric_kind: GcpMetricKind, pub value_type: GcpValueType, - pub points: &'a [GcpPoint], + pub points: Vec, } #[derive(Serialize, Debug, Clone)] #[serde(rename_all = "camelCase")] pub struct GcpSeries<'a> { - time_series: &'a [GcpSerie<'a>], + time_series: &'a [GcpSerie], } fn serialize_int64_value(value: &Option, serializer: S) -> Result @@ -179,7 +179,7 @@ mod tests { }, metric_kind: GcpMetricKind::Gauge, value_type: GcpValueType::Int64, - points: &[GcpPoint { + points: vec![GcpPoint { interval: GcpInterval { start_time: None, end_time, diff --git a/src/sinks/gcp/stackdriver/metrics/config.rs b/src/sinks/gcp/stackdriver/metrics/config.rs index f5fdfef26014d..057cff32c76c5 100644 --- 
a/src/sinks/gcp/stackdriver/metrics/config.rs +++ b/src/sinks/gcp/stackdriver/metrics/config.rs @@ -65,7 +65,7 @@ pub struct StackdriverConfig { deserialize_with = "crate::serde::bool_or_struct", skip_serializing_if = "crate::serde::skip_serializing_if_default" )] - acknowledgements: AcknowledgementsConfig, + pub(super) acknowledgements: AcknowledgementsConfig, } fn default_metric_namespace_value() -> String { diff --git a/src/sinks/gcp/stackdriver/metrics/request_builder.rs b/src/sinks/gcp/stackdriver/metrics/request_builder.rs index cffc1b5eecdd6..a52693c4c4e1e 100644 --- a/src/sinks/gcp/stackdriver/metrics/request_builder.rs +++ b/src/sinks/gcp/stackdriver/metrics/request_builder.rs @@ -54,73 +54,80 @@ pub struct StackdriverMetricsEncoder { } impl encoding::Encoder> for StackdriverMetricsEncoder { + /// Create the object defined [here][api_docs]. + /// + /// [api_docs]: https://cloud.google.com/monitoring/api/ref_v3/rest/v3/projects.timeSeries/create fn encode_input( &self, - mut input: Vec, + input: Vec, writer: &mut dyn io::Write, ) -> io::Result<(usize, GroupedCountByteSize)> { - // TODO: Are we really only encoding the last event here? 
- let metric = input.pop().expect("only one metric"); - let mut byte_size = telemetry().create_request_count_byte_size(); - byte_size.add_event(&metric, metric.estimated_json_encoded_size_of()); - - let (series, data, _metadata) = metric.into_parts(); - let namespace = series - .name - .namespace - .unwrap_or_else(|| self.default_namespace.clone()); - let metric_type = format!( - "custom.googleapis.com/{}/metrics/{}", - namespace, series.name.name - ); - - let end_time = data.time.timestamp.unwrap_or_else(chrono::Utc::now); - - let (point_value, interval, metric_kind) = match &data.value { - MetricValue::Counter { value } => { - let interval = gcp::GcpInterval { - start_time: Some(self.started), - end_time, - }; - - (*value, interval, gcp::GcpMetricKind::Cumulative) - } - MetricValue::Gauge { value } => { - let interval = gcp::GcpInterval { - start_time: None, - end_time, + let time_series = input + .into_iter() + .map(|metric| { + byte_size.add_event(&metric, metric.estimated_json_encoded_size_of()); + + let (series, data, _metadata) = metric.into_parts(); + let namespace = series + .name + .namespace + .unwrap_or_else(|| self.default_namespace.clone()); + let metric_type = format!( + "custom.googleapis.com/{}/metrics/{}", + namespace, series.name.name + ); + + let end_time = data.time.timestamp.unwrap_or_else(chrono::Utc::now); + + let (point_value, interval, metric_kind) = match &data.value { + MetricValue::Counter { value } => { + let interval = gcp::GcpInterval { + start_time: Some(self.started), + end_time, + }; + + (*value, interval, gcp::GcpMetricKind::Cumulative) + } + MetricValue::Gauge { value } => { + let interval = gcp::GcpInterval { + start_time: None, + end_time, + }; + + (*value, interval, gcp::GcpMetricKind::Gauge) + } + _ => unreachable!(), }; - - (*value, interval, gcp::GcpMetricKind::Gauge) - } - _ => unreachable!(), - }; - let metric_labels = series - .tags - .unwrap_or_default() - .into_iter_single() - .collect::>(); + let metric_labels = 
series + .tags + .unwrap_or_default() + .into_iter_single() + .collect::>(); + + gcp::GcpSerie { + metric: gcp::GcpMetric { + r#type: metric_type, + labels: metric_labels, + }, + resource: gcp::GcpResource { + r#type: self.resource.r#type.clone(), + labels: self.resource.labels.clone(), + }, + metric_kind, + value_type: gcp::GcpValueType::Int64, + points: vec![gcp::GcpPoint { + interval, + value: gcp::GcpPointValue { + int64_value: Some(point_value as i64), + }, + }], + } + }) + .collect::>(); let series = gcp::GcpSeries { - time_series: &[gcp::GcpSerie { - metric: gcp::GcpMetric { - r#type: metric_type, - labels: metric_labels, - }, - resource: gcp::GcpResource { - r#type: self.resource.r#type.clone(), - labels: self.resource.labels.clone(), - }, - metric_kind, - value_type: gcp::GcpValueType::Int64, - points: &[gcp::GcpPoint { - interval, - value: gcp::GcpPointValue { - int64_value: Some(point_value as i64), - }, - }], - }], + time_series: &time_series, }; let body = crate::serde::json::to_bytes(&series).unwrap().freeze(); diff --git a/src/sinks/gcp/stackdriver/metrics/tests.rs b/src/sinks/gcp/stackdriver/metrics/tests.rs index 97ed13ae9f351..0ca5a4cb796f6 100644 --- a/src/sinks/gcp/stackdriver/metrics/tests.rs +++ b/src/sinks/gcp/stackdriver/metrics/tests.rs @@ -1,3 +1,4 @@ +use chrono::Utc; use futures::{future::ready, stream}; use serde::Deserialize; use vector_core::event::{Metric, MetricKind, MetricValue}; @@ -5,9 +6,12 @@ use vector_core::event::{Metric, MetricKind, MetricValue}; use super::{config::StackdriverConfig, *}; use crate::{ config::SinkContext, + gcp::GcpAuthConfig, + sinks::util::test::build_test_server, test_util::{ components::{run_and_assert_sink_compliance, SINK_TAGS}, http::{always_200_response, spawn_blackhole_http_server}, + next_addr, }, }; @@ -40,3 +44,221 @@ async fn component_spec_compliance() { )); run_and_assert_sink_compliance(sink, stream::once(ready(event)), &SINK_TAGS).await; } + +#[tokio::test] +async fn sends_metric() { + let 
in_addr = next_addr(); + let config = StackdriverConfig { + endpoint: format!("http://{in_addr}"), + auth: GcpAuthConfig { + api_key: None, + credentials_path: None, + skip_authentication: true, + }, + ..Default::default() + }; + let (rx, trigger, server) = build_test_server(in_addr); + tokio::spawn(server); + + let context = SinkContext::default(); + let (sink, _healthcheck) = config.build(context).await.unwrap(); + let timestamp = Utc::now(); + + let event = Event::Metric( + Metric::new( + "gauge-test", + MetricKind::Absolute, + MetricValue::Gauge { value: 1_f64 }, + ) + .with_timestamp(Some(timestamp)), + ); + run_and_assert_sink_compliance(sink, stream::once(ready(event)), &SINK_TAGS).await; + + drop(trigger); + + let event = rx + .take(1) + .map(|(_, bytes)| serde_json::from_slice::(&bytes).unwrap()) + .collect::>() + .await; + + assert_eq!( + serde_json::json! ({ + "timeSeries":[{ + "metric":{ + "type":"custom.googleapis.com//metrics/gauge-test", + "labels":{} + }, + "resource":{"type":"","labels":{}}, + "metricKind":"GAUGE", + "valueType":"INT64", + "points":[{ + "interval":{"endTime":timestamp.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true)}, + "value":{"int64Value":"1"} + }] + }] + }), + event[0] + ); +} + +#[tokio::test] +async fn sends_multiple_metrics() { + let in_addr = next_addr(); + let mut batch = BatchConfig::default(); + batch.max_events = Some(5); + + let config = StackdriverConfig { + endpoint: format!("http://{in_addr}"), + auth: GcpAuthConfig { + api_key: None, + credentials_path: None, + skip_authentication: true, + }, + batch, + ..Default::default() + }; + let (rx, trigger, server) = build_test_server(in_addr); + tokio::spawn(server); + + let context = SinkContext::default(); + let (sink, _healthcheck) = config.build(context).await.unwrap(); + + let timestamp1 = Utc::now(); + let timestamp2 = Utc::now(); + + let event = vec![ + Event::Metric( + Metric::new( + "gauge1", + MetricKind::Absolute, + MetricValue::Gauge { value: 1_f64 }, + ) + 
.with_timestamp(Some(timestamp1)), + ), + Event::Metric( + Metric::new( + "gauge2", + MetricKind::Absolute, + MetricValue::Gauge { value: 5_f64 }, + ) + .with_timestamp(Some(timestamp2)), + ), + ]; + run_and_assert_sink_compliance(sink, stream::iter(event), &SINK_TAGS).await; + + drop(trigger); + + let event = rx + .take(1) + .map(|(_, bytes)| serde_json::from_slice::(&bytes).unwrap()) + .collect::>() + .await; + + assert_eq!( + serde_json::json! ({ + "timeSeries":[ + { + "metric":{ + "type":"custom.googleapis.com//metrics/gauge1", + "labels":{} + }, + "resource":{"type":"","labels":{}}, + "metricKind":"GAUGE", + "valueType":"INT64", + "points":[{ + "interval":{"endTime":timestamp1.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true)}, + "value":{"int64Value":"1"} + }] + }, + { + "metric":{ + "type":"custom.googleapis.com//metrics/gauge2", + "labels":{} + }, + "resource":{"type":"","labels":{}}, + "metricKind":"GAUGE", + "valueType":"INT64", + "points":[{ + "interval":{"endTime":timestamp2.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true)}, + "value":{"int64Value":"5"} + }] + }] + }), + event[0] + ); +} + +#[tokio::test] +async fn aggregates_metrics() { + let in_addr = next_addr(); + let mut batch = BatchConfig::default(); + batch.max_events = Some(5); + + let config = StackdriverConfig { + endpoint: format!("http://{in_addr}"), + auth: GcpAuthConfig { + api_key: None, + credentials_path: None, + skip_authentication: true, + }, + batch, + ..Default::default() + }; + let (rx, trigger, server) = build_test_server(in_addr); + tokio::spawn(server); + + let context = SinkContext::default(); + let (sink, _healthcheck) = config.build(context).await.unwrap(); + + let timestamp1 = Utc::now(); + let timestamp2 = Utc::now(); + + let event = vec![ + Event::Metric( + Metric::new( + "gauge", + MetricKind::Absolute, + MetricValue::Gauge { value: 1_f64 }, + ) + .with_timestamp(Some(timestamp1)), + ), + Event::Metric( + Metric::new( + "gauge", + MetricKind::Absolute, + 
MetricValue::Gauge { value: 5_f64 }, + ) + .with_timestamp(Some(timestamp2)), + ), + ]; + run_and_assert_sink_compliance(sink, stream::iter(event), &SINK_TAGS).await; + + drop(trigger); + + let event = rx + .take(1) + .map(|(_, bytes)| serde_json::from_slice::(&bytes).unwrap()) + .collect::>() + .await; + + assert_eq!( + serde_json::json! ({ + "timeSeries":[ + { + "metric":{ + "type":"custom.googleapis.com//metrics/gauge", + "labels":{} + }, + "resource":{"type":"","labels":{}}, + "metricKind":"GAUGE", + "valueType":"INT64", + "points":[{ + "interval":{"endTime":timestamp2.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true)}, + "value":{"int64Value":"5"} + }] + }] + }), + event[0] + ); +} From 22537a6ae8aa53513b4ba48e83c3e6f4405e2fcf Mon Sep 17 00:00:00 2001 From: Stephen Wakely Date: Tue, 3 Oct 2023 14:43:50 +0100 Subject: [PATCH 3/5] Aggregate metrics Signed-off-by: Stephen Wakely --- .../stackdriver/metrics/request_builder.rs | 9 +- src/sinks/gcp/stackdriver/metrics/sink.rs | 59 ++++++++- src/sinks/gcp/stackdriver/metrics/tests.rs | 117 ++++++++++++++++++ 3 files changed, 178 insertions(+), 7 deletions(-) diff --git a/src/sinks/gcp/stackdriver/metrics/request_builder.rs b/src/sinks/gcp/stackdriver/metrics/request_builder.rs index a52693c4c4e1e..d4800103979bf 100644 --- a/src/sinks/gcp/stackdriver/metrics/request_builder.rs +++ b/src/sinks/gcp/stackdriver/metrics/request_builder.rs @@ -6,12 +6,14 @@ use vector_core::event::{Metric, MetricValue}; use crate::sinks::{gcp, prelude::*, util::http::HttpRequest}; +use super::sink::EventCollection; + #[derive(Clone, Debug)] pub(super) struct StackdriverMetricsRequestBuilder { pub(super) encoder: StackdriverMetricsEncoder, } -impl RequestBuilder> for StackdriverMetricsRequestBuilder { +impl RequestBuilder for StackdriverMetricsRequestBuilder { type Metadata = EventFinalizers; type Events = Vec; type Encoder = StackdriverMetricsEncoder; @@ -29,9 +31,10 @@ impl RequestBuilder> for StackdriverMetricsRequestBuilder { fn 
split_input( &self, - mut events: Vec, + events: EventCollection, ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) { - let finalizers = events.take_finalizers(); + let finalizers = events.finalizers; + let events = events.events.into_metrics(); let builder = RequestMetadataBuilder::from_events(&events); (finalizers, builder, events) } diff --git a/src/sinks/gcp/stackdriver/metrics/sink.rs b/src/sinks/gcp/stackdriver/metrics/sink.rs index 79e81b23eac10..b1d2e785d31fa 100644 --- a/src/sinks/gcp/stackdriver/metrics/sink.rs +++ b/src/sinks/gcp/stackdriver/metrics/sink.rs @@ -1,9 +1,48 @@ -use vector_core::event::MetricValue; +use vector_core::event::{Metric, MetricValue}; -use crate::sinks::{prelude::*, util::http::HttpRequest}; +use crate::sinks::{ + prelude::*, + util::{ + buffer::metrics::{MetricNormalize, MetricSet}, + http::HttpRequest, + }, +}; use super::request_builder::StackdriverMetricsRequestBuilder; +#[derive(Clone, Debug, Default)] +struct StackdriverMetricsNormalize; + +impl MetricNormalize for StackdriverMetricsNormalize { + fn normalize(&mut self, state: &mut MetricSet, metric: Metric) -> Option { + match (metric.kind(), &metric.value()) { + (_, MetricValue::Counter { .. }) => state.make_absolute(metric), + (_, MetricValue::Gauge { .. 
}) => state.make_absolute(metric), + // All others are left as-is + _ => Some(metric), + } + } +} + +#[derive(Clone)] +pub(super) struct EventCollection { + pub(super) finalizers: EventFinalizers, + pub(super) events: MetricSet, + pub(super) events_byte_size: usize, + pub(super) events_json_byte_size: GroupedCountByteSize, +} + +impl Default for EventCollection { + fn default() -> Self { + Self { + finalizers: Default::default(), + events: Default::default(), + events_byte_size: Default::default(), + events_json_byte_size: telemetry().create_request_count_byte_size(), + } + } +} + pub(super) struct StackdriverMetricsSink { service: S, batch_settings: BatcherSettings, @@ -45,8 +84,20 @@ where } }) }) - // TODO Add some kind of normalizer - .batched(self.batch_settings.into_byte_size_config()) + .normalized_with_default::() + .batched(self.batch_settings.into_reducer_config( + |data: &Metric| data.size_of(), + |event_collection: &mut EventCollection, mut item: Metric| { + event_collection + .finalizers + .merge(item.metadata_mut().take_finalizers()); + event_collection.events_byte_size += item.size_of(); + event_collection + .events_json_byte_size + .add_event(&item, item.estimated_json_encoded_size_of()); + event_collection.events.insert_update(item); + }, + )) .request_builder( default_request_builder_concurrency_limit(), self.request_builder, diff --git a/src/sinks/gcp/stackdriver/metrics/tests.rs b/src/sinks/gcp/stackdriver/metrics/tests.rs index 0ca5a4cb796f6..efb57d894916b 100644 --- a/src/sinks/gcp/stackdriver/metrics/tests.rs +++ b/src/sinks/gcp/stackdriver/metrics/tests.rs @@ -262,3 +262,120 @@ async fn aggregates_metrics() { event[0] ); } + +#[tokio::test] +async fn aggregates_metrics_batch_size() { + let in_addr = next_addr(); + let mut batch = BatchConfig::default(); + + // We specify a batch size of 2. 
+ batch.max_events = Some(2); + + let config = StackdriverConfig { + endpoint: format!("http://{in_addr}"), + auth: GcpAuthConfig { + api_key: None, + credentials_path: None, + skip_authentication: true, + }, + batch, + ..Default::default() + }; + let (rx, trigger, server) = build_test_server(in_addr); + tokio::spawn(server); + + let context = SinkContext::default(); + let (sink, _healthcheck) = config.build(context).await.unwrap(); + + let timestamp1 = Utc::now(); + let timestamp2 = Utc::now(); + let timestamp3 = Utc::now(); + let timestamp4 = Utc::now(); + + let event = vec![ + Event::Metric( + Metric::new( + "gauge", + MetricKind::Absolute, + MetricValue::Gauge { value: 1_f64 }, + ) + .with_timestamp(Some(timestamp1)), + ), + Event::Metric( + Metric::new( + "gauge", + MetricKind::Absolute, + MetricValue::Gauge { value: 5_f64 }, + ) + .with_timestamp(Some(timestamp2)), + ), + Event::Metric( + Metric::new( + "gauge", + MetricKind::Absolute, + MetricValue::Gauge { value: 7_f64 }, + ) + .with_timestamp(Some(timestamp3)), + ), + Event::Metric( + Metric::new( + "gauge", + MetricKind::Absolute, + MetricValue::Gauge { value: 10_f64 }, + ) + .with_timestamp(Some(timestamp4)), + ), + ]; + run_and_assert_sink_compliance(sink, stream::iter(event), &SINK_TAGS).await; + + drop(trigger); + + let event = rx + .take(2) + .map(|(_, bytes)| serde_json::from_slice::(&bytes).unwrap()) + .collect::>() + .await; + + // Whilst the batch size was set to 2, we actually get two batches with a single metric in them. This is because + // batches are counted by the number of incoming events. In this case 2 metrics were aggregated down to one, hence + // each batch being output with a single event. + assert_eq!( + serde_json::json! 
({ + "timeSeries":[ + { + "metric":{ + "type":"custom.googleapis.com//metrics/gauge", + "labels":{} + }, + "resource":{"type":"","labels":{}}, + "metricKind":"GAUGE", + "valueType":"INT64", + "points":[{ + "interval":{"endTime":timestamp2.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true)}, + "value":{"int64Value":"5"} + }] + }] + }), + event[0] + ); + + assert_eq!( + serde_json::json! ({ + "timeSeries":[ + { + "metric":{ + "type":"custom.googleapis.com//metrics/gauge", + "labels":{} + }, + "resource":{"type":"","labels":{}}, + "metricKind":"GAUGE", + "valueType":"INT64", + "points":[{ + "interval":{"endTime":timestamp4.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true)}, + "value":{"int64Value":"10"} + }] + }] + }), + event[1] + ); +} From 3cba7bcb4dfd7d12d4bb84955d41f4a09c84ae3b Mon Sep 17 00:00:00 2001 From: Stephen Wakely Date: Mon, 9 Oct 2023 12:55:49 +0100 Subject: [PATCH 4/5] Remove aggregation Signed-off-by: Stephen Wakely --- .../stackdriver/metrics/request_builder.rs | 9 +- src/sinks/gcp/stackdriver/metrics/sink.rs | 33 +---- src/sinks/gcp/stackdriver/metrics/tests.rs | 113 +----------------- 3 files changed, 8 insertions(+), 147 deletions(-) diff --git a/src/sinks/gcp/stackdriver/metrics/request_builder.rs b/src/sinks/gcp/stackdriver/metrics/request_builder.rs index d4800103979bf..a52693c4c4e1e 100644 --- a/src/sinks/gcp/stackdriver/metrics/request_builder.rs +++ b/src/sinks/gcp/stackdriver/metrics/request_builder.rs @@ -6,14 +6,12 @@ use vector_core::event::{Metric, MetricValue}; use crate::sinks::{gcp, prelude::*, util::http::HttpRequest}; -use super::sink::EventCollection; - #[derive(Clone, Debug)] pub(super) struct StackdriverMetricsRequestBuilder { pub(super) encoder: StackdriverMetricsEncoder, } -impl RequestBuilder for StackdriverMetricsRequestBuilder { +impl RequestBuilder> for StackdriverMetricsRequestBuilder { type Metadata = EventFinalizers; type Events = Vec; type Encoder = StackdriverMetricsEncoder; @@ -31,10 +29,9 @@ impl RequestBuilder 
for StackdriverMetricsRequestBuilder { fn split_input( &self, - events: EventCollection, + mut events: Vec, ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) { - let finalizers = events.finalizers; - let events = events.events.into_metrics(); + let finalizers = events.take_finalizers(); let builder = RequestMetadataBuilder::from_events(&events); (finalizers, builder, events) } diff --git a/src/sinks/gcp/stackdriver/metrics/sink.rs b/src/sinks/gcp/stackdriver/metrics/sink.rs index b1d2e785d31fa..82f0dc1cb641d 100644 --- a/src/sinks/gcp/stackdriver/metrics/sink.rs +++ b/src/sinks/gcp/stackdriver/metrics/sink.rs @@ -24,25 +24,6 @@ impl MetricNormalize for StackdriverMetricsNormalize { } } -#[derive(Clone)] -pub(super) struct EventCollection { - pub(super) finalizers: EventFinalizers, - pub(super) events: MetricSet, - pub(super) events_byte_size: usize, - pub(super) events_json_byte_size: GroupedCountByteSize, -} - -impl Default for EventCollection { - fn default() -> Self { - Self { - finalizers: Default::default(), - events: Default::default(), - events_byte_size: Default::default(), - events_json_byte_size: telemetry().create_request_count_byte_size(), - } - } -} - pub(super) struct StackdriverMetricsSink { service: S, batch_settings: BatcherSettings, @@ -85,19 +66,7 @@ where }) }) .normalized_with_default::() - .batched(self.batch_settings.into_reducer_config( - |data: &Metric| data.size_of(), - |event_collection: &mut EventCollection, mut item: Metric| { - event_collection - .finalizers - .merge(item.metadata_mut().take_finalizers()); - event_collection.events_byte_size += item.size_of(); - event_collection - .events_json_byte_size - .add_event(&item, item.estimated_json_encoded_size_of()); - event_collection.events.insert_update(item); - }, - )) + .batched(self.batch_settings.into_byte_size_config()) .request_builder( default_request_builder_concurrency_limit(), self.request_builder, diff --git a/src/sinks/gcp/stackdriver/metrics/tests.rs 
b/src/sinks/gcp/stackdriver/metrics/tests.rs index efb57d894916b..5b25168e0ecf9 100644 --- a/src/sinks/gcp/stackdriver/metrics/tests.rs +++ b/src/sinks/gcp/stackdriver/metrics/tests.rs @@ -190,7 +190,7 @@ async fn sends_multiple_metrics() { } #[tokio::test] -async fn aggregates_metrics() { +async fn does_not_aggregate_metrics() { let in_addr = next_addr(); let mut batch = BatchConfig::default(); batch.max_events = Some(5); @@ -254,95 +254,10 @@ async fn aggregates_metrics() { "metricKind":"GAUGE", "valueType":"INT64", "points":[{ - "interval":{"endTime":timestamp2.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true)}, - "value":{"int64Value":"5"} + "interval":{"endTime":timestamp1.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true)}, + "value":{"int64Value":"1"} }] - }] - }), - event[0] - ); -} - -#[tokio::test] -async fn aggregates_metrics_batch_size() { - let in_addr = next_addr(); - let mut batch = BatchConfig::default(); - - // We specify a batch size of 2. - batch.max_events = Some(2); - - let config = StackdriverConfig { - endpoint: format!("http://{in_addr}"), - auth: GcpAuthConfig { - api_key: None, - credentials_path: None, - skip_authentication: true, - }, - batch, - ..Default::default() - }; - let (rx, trigger, server) = build_test_server(in_addr); - tokio::spawn(server); - - let context = SinkContext::default(); - let (sink, _healthcheck) = config.build(context).await.unwrap(); - - let timestamp1 = Utc::now(); - let timestamp2 = Utc::now(); - let timestamp3 = Utc::now(); - let timestamp4 = Utc::now(); - - let event = vec![ - Event::Metric( - Metric::new( - "gauge", - MetricKind::Absolute, - MetricValue::Gauge { value: 1_f64 }, - ) - .with_timestamp(Some(timestamp1)), - ), - Event::Metric( - Metric::new( - "gauge", - MetricKind::Absolute, - MetricValue::Gauge { value: 5_f64 }, - ) - .with_timestamp(Some(timestamp2)), - ), - Event::Metric( - Metric::new( - "gauge", - MetricKind::Absolute, - MetricValue::Gauge { value: 7_f64 }, - ) - 
.with_timestamp(Some(timestamp3)), - ), - Event::Metric( - Metric::new( - "gauge", - MetricKind::Absolute, - MetricValue::Gauge { value: 10_f64 }, - ) - .with_timestamp(Some(timestamp4)), - ), - ]; - run_and_assert_sink_compliance(sink, stream::iter(event), &SINK_TAGS).await; - - drop(trigger); - - let event = rx - .take(2) - .map(|(_, bytes)| serde_json::from_slice::(&bytes).unwrap()) - .collect::>() - .await; - - // Whilst the batch size was set to 2, we actually get two batches with a single metric in them. This is because - // batches are counted by the number of incoming events. In this case 2 metrics were aggregated down to one, hence - // each batch being output with a single event. - assert_eq!( - serde_json::json! ({ - "timeSeries":[ - { + }, { "metric":{ "type":"custom.googleapis.com//metrics/gauge", "labels":{} @@ -358,24 +273,4 @@ async fn aggregates_metrics_batch_size() { }), event[0] ); - - assert_eq!( - serde_json::json! ({ - "timeSeries":[ - { - "metric":{ - "type":"custom.googleapis.com//metrics/gauge", - "labels":{} - }, - "resource":{"type":"","labels":{}}, - "metricKind":"GAUGE", - "valueType":"INT64", - "points":[{ - "interval":{"endTime":timestamp4.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true)}, - "value":{"int64Value":"10"} - }] - }] - }), - event[1] - ); } From da9ef94f32bb49562cbb303f6f64a49f85ad2433 Mon Sep 17 00:00:00 2001 From: Stephen Wakely Date: Wed, 11 Oct 2023 12:08:15 +0100 Subject: [PATCH 5/5] Feedback from Kyle Signed-off-by: Stephen Wakely --- src/sinks/gcp/stackdriver/metrics/config.rs | 14 +++++++++++--- src/sinks/gcp/stackdriver/metrics/mod.rs | 19 ++++--------------- .../stackdriver/metrics/request_builder.rs | 4 +++- src/sinks/gcp/stackdriver/metrics/sink.rs | 2 +- src/sinks/gcp/stackdriver/metrics/tests.rs | 4 ++-- 5 files changed, 21 insertions(+), 22 deletions(-) diff --git a/src/sinks/gcp/stackdriver/metrics/config.rs b/src/sinks/gcp/stackdriver/metrics/config.rs index 057cff32c76c5..99d03d114fd29 100644 --- 
a/src/sinks/gcp/stackdriver/metrics/config.rs +++ b/src/sinks/gcp/stackdriver/metrics/config.rs @@ -15,7 +15,6 @@ use crate::{ use super::{ request_builder::{StackdriverMetricsEncoder, StackdriverMetricsRequestBuilder}, sink::StackdriverMetricsSink, - StackdriverMetricsDefaultBatchSettings, }; /// Configuration for the `gcp_stackdriver_metrics` sink. @@ -113,10 +112,10 @@ impl SinkConfig for StackdriverConfig { auth.spawn_regenerate_token(); - let stackdriver_logs_service_request_builder = + let stackdriver_metrics_service_request_builder = StackdriverMetricsServiceRequestBuilder { uri, auth }; - let service = HttpService::new(client, stackdriver_logs_service_request_builder); + let service = HttpService::new(client, stackdriver_metrics_service_request_builder); let service = ServiceBuilder::new() .settings(request_limits, http_response_retry_logic()) @@ -136,6 +135,15 @@ impl SinkConfig for StackdriverConfig { } } +#[derive(Clone, Copy, Debug, Default)] +pub struct StackdriverMetricsDefaultBatchSettings; + +impl SinkBatchSettings for StackdriverMetricsDefaultBatchSettings { + const MAX_EVENTS: Option = Some(1); + const MAX_BYTES: Option = None; + const TIMEOUT_SECS: f64 = 1.0; +} + #[derive(Debug, Clone)] pub(super) struct StackdriverMetricsServiceRequestBuilder { pub(super) uri: Uri, diff --git a/src/sinks/gcp/stackdriver/metrics/mod.rs b/src/sinks/gcp/stackdriver/metrics/mod.rs index a73070b11e02f..8c070ff20781a 100644 --- a/src/sinks/gcp/stackdriver/metrics/mod.rs +++ b/src/sinks/gcp/stackdriver/metrics/mod.rs @@ -1,20 +1,9 @@ -// TODO: In order to correctly assert component specification compliance, we would have to do some more advanced mocking -// off the endpoint, which would include also providing a mock OAuth2 endpoint to allow for generating a token from the -// mocked credentials. Let this TODO serve as a placeholder for doing that in the future. - -use crate::sinks::prelude::*; - +//! GCP Cloud Monitoring (formerly Stackdriver Metrics) sink. +//! 
Sends metrics to [GCP Cloud Monitoring][cloud monitoring]. +//! +//! [cloud monitoring]: https://cloud.google.com/monitoring/docs/monitoring-overview mod config; mod request_builder; mod sink; #[cfg(test)] mod tests; - -#[derive(Clone, Copy, Debug, Default)] -pub struct StackdriverMetricsDefaultBatchSettings; - -impl SinkBatchSettings for StackdriverMetricsDefaultBatchSettings { - const MAX_EVENTS: Option = Some(1); - const MAX_BYTES: Option = None; - const TIMEOUT_SECS: f64 = 1.0; -} diff --git a/src/sinks/gcp/stackdriver/metrics/request_builder.rs b/src/sinks/gcp/stackdriver/metrics/request_builder.rs index a52693c4c4e1e..12e4526f055f5 100644 --- a/src/sinks/gcp/stackdriver/metrics/request_builder.rs +++ b/src/sinks/gcp/stackdriver/metrics/request_builder.rs @@ -97,7 +97,9 @@ impl encoding::Encoder> for StackdriverMetricsEncoder { (*value, interval, gcp::GcpMetricKind::Gauge) } - _ => unreachable!(), + _ => { + unreachable!("sink has filtered out all metrics that aren't counter or gauge by this point") + }, }; let metric_labels = series .tags diff --git a/src/sinks/gcp/stackdriver/metrics/sink.rs b/src/sinks/gcp/stackdriver/metrics/sink.rs index 82f0dc1cb641d..538a666bdd91d 100644 --- a/src/sinks/gcp/stackdriver/metrics/sink.rs +++ b/src/sinks/gcp/stackdriver/metrics/sink.rs @@ -37,7 +37,7 @@ where S::Response: DriverResponse + Send + 'static, S::Error: std::fmt::Debug + Into + Send, { - /// Creates a new `StackdriverLogsSink`. + /// Creates a new `StackdriverMetricsSink`. 
pub(super) const fn new( service: S, batch_settings: BatcherSettings, diff --git a/src/sinks/gcp/stackdriver/metrics/tests.rs b/src/sinks/gcp/stackdriver/metrics/tests.rs index 5b25168e0ecf9..a27194f1900e4 100644 --- a/src/sinks/gcp/stackdriver/metrics/tests.rs +++ b/src/sinks/gcp/stackdriver/metrics/tests.rs @@ -3,11 +3,11 @@ use futures::{future::ready, stream}; use serde::Deserialize; use vector_core::event::{Metric, MetricKind, MetricValue}; -use super::{config::StackdriverConfig, *}; +use super::config::StackdriverConfig; use crate::{ config::SinkContext, gcp::GcpAuthConfig, - sinks::util::test::build_test_server, + sinks::{prelude::*, util::test::build_test_server}, test_util::{ components::{run_and_assert_sink_compliance, SINK_TAGS}, http::{always_200_response, spawn_blackhole_http_server},