diff --git a/src/sinks/gcp/chronicle_unstructured.rs b/src/sinks/gcp/chronicle_unstructured.rs
index 5b65089852ea09..3043cc2ba519c2 100644
--- a/src/sinks/gcp/chronicle_unstructured.rs
+++ b/src/sinks/gcp/chronicle_unstructured.rs
@@ -248,7 +248,7 @@ impl ChronicleUnstructuredConfig {
             .settings(request, GcsRetryLogic)
             .service(ChronicleService::new(client, base_url, creds));
 
-        let request_settings = RequestSettings::new(self)?;
+        let request_settings = ChronicleRequestBuilder::new(self)?;
 
         let sink = GcsSink::new(svc, request_settings, partitioner, batch_settings, "http");
 
@@ -362,7 +362,7 @@ impl Encoder<(String, Vec<Event>)> for ChronicleEncoder {
 // request. All possible values are pre-computed for direct use in
 // producing a request.
 #[derive(Clone, Debug)]
-struct RequestSettings {
+struct ChronicleRequestBuilder {
     encoder: ChronicleEncoder,
 }
 
@@ -382,7 +382,7 @@ impl AsRef<[u8]> for ChronicleRequestPayload {
     }
 }
 
-impl RequestBuilder<(String, Vec<Event>)> for RequestSettings {
+impl RequestBuilder<(String, Vec<Event>)> for ChronicleRequestBuilder {
     type Metadata = EventFinalizers;
     type Events = (String, Vec<Event>);
     type Encoder = ChronicleEncoder;
@@ -423,7 +423,7 @@ impl RequestBuilder<(String, Vec<Event>)> for RequestSettings {
     }
 }
 
-impl RequestSettings {
+impl ChronicleRequestBuilder {
     fn new(config: &ChronicleUnstructuredConfig) -> crate::Result<Self> {
         let transformer = config.encoding.transformer();
         let serializer = config.encoding.config().build()?;
diff --git a/src/sinks/gcp/mod.rs b/src/sinks/gcp/mod.rs
index 5a7f64b84c076d..537ea336fc05bb 100644
--- a/src/sinks/gcp/mod.rs
+++ b/src/sinks/gcp/mod.rs
@@ -7,7 +7,6 @@ pub mod chronicle_unstructured;
 pub mod cloud_storage;
 pub mod pubsub;
 pub mod stackdriver;
-pub mod stackdriver_metrics;
 
 /// A monitored resource.
 ///
@@ -103,18 +102,18 @@ pub struct GcpResource {
 
 #[derive(Serialize, Debug, Clone)]
 #[serde(rename_all = "camelCase")]
-pub struct GcpSerie<'a> {
+pub struct GcpSerie {
     pub metric: GcpMetric,
     pub resource: GcpResource,
     pub metric_kind: GcpMetricKind,
     pub value_type: GcpValueType,
-    pub points: &'a [GcpPoint],
+    pub points: Vec<GcpPoint>,
 }
 
 #[derive(Serialize, Debug, Clone)]
 #[serde(rename_all = "camelCase")]
 pub struct GcpSeries<'a> {
-    time_series: &'a [GcpSerie<'a>],
+    time_series: &'a [GcpSerie],
 }
 
 fn serialize_int64_value<S>(value: &Option<i64>, serializer: S) -> Result<S::Ok, S::Error>
@@ -180,7 +179,7 @@ mod tests {
                 },
                 metric_kind: GcpMetricKind::Gauge,
                 value_type: GcpValueType::Int64,
-                points: &[GcpPoint {
+                points: vec![GcpPoint {
                     interval: GcpInterval {
                         start_time: None,
                         end_time,
diff --git a/src/sinks/gcp/stackdriver/metrics/config.rs b/src/sinks/gcp/stackdriver/metrics/config.rs
new file mode 100644
index 00000000000000..99d03d114fd293
--- /dev/null
+++ b/src/sinks/gcp/stackdriver/metrics/config.rs
@@ -0,0 +1,168 @@
+use bytes::Bytes;
+use goauth::scopes::Scope;
+use http::{Request, Uri};
+
+use crate::{
+    gcp::{GcpAuthConfig, GcpAuthenticator},
+    http::HttpClient,
+    sinks::{
+        gcp,
+        prelude::*,
+        util::http::{http_response_retry_logic, HttpService, HttpServiceRequestBuilder},
+    },
+};
+
+use super::{
+    request_builder::{StackdriverMetricsEncoder, StackdriverMetricsRequestBuilder},
+    sink::StackdriverMetricsSink,
+};
+
+/// Configuration for the `gcp_stackdriver_metrics` sink.
+#[configurable_component(sink(
+    "gcp_stackdriver_metrics",
+    "Deliver metrics to GCP's Cloud Monitoring system."
+))]
+#[derive(Clone, Debug, Default)]
+pub struct StackdriverConfig {
+    #[serde(skip, default = "default_endpoint")]
+    pub(super) endpoint: String,
+
+    /// The project ID to which to publish metrics.
+    ///
+    /// See the [Google Cloud Platform project management documentation][project_docs] for more details.
+    ///
+    /// [project_docs]: https://cloud.google.com/resource-manager/docs/creating-managing-projects
+    pub(super) project_id: String,
+
+    /// The monitored resource to associate the metrics with.
+    pub(super) resource: gcp::GcpTypedResource,
+
+    #[serde(flatten)]
+    pub(super) auth: GcpAuthConfig,
+
+    /// The default namespace to use for metrics that do not have one.
+    ///
+    /// Metrics with the same name can only be differentiated by their namespace, and not all
+    /// metrics have their own namespace.
+    #[serde(default = "default_metric_namespace_value")]
+    pub(super) default_namespace: String,
+
+    #[configurable(derived)]
+    #[serde(default)]
+    pub(super) request: TowerRequestConfig,
+
+    #[configurable(derived)]
+    #[serde(default)]
+    pub(super) batch: BatchConfig<StackdriverMetricsDefaultBatchSettings>,
+
+    #[configurable(derived)]
+    pub(super) tls: Option<TlsConfig>,
+
+    #[configurable(derived)]
+    #[serde(
+        default,
+        deserialize_with = "crate::serde::bool_or_struct",
+        skip_serializing_if = "crate::serde::skip_serializing_if_default"
+    )]
+    pub(super) acknowledgements: AcknowledgementsConfig,
+}
+
+fn default_metric_namespace_value() -> String {
+    "namespace".to_string()
+}
+
+fn default_endpoint() -> String {
+    "https://monitoring.googleapis.com".to_string()
+}
+
+impl_generate_config_from_default!(StackdriverConfig);
+
+#[async_trait::async_trait]
+#[typetag::serde(name = "gcp_stackdriver_metrics")]
+impl SinkConfig for StackdriverConfig {
+    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
+        let auth = self.auth.build(Scope::MonitoringWrite).await?;
+
+        let healthcheck = healthcheck().boxed();
+        let started = chrono::Utc::now();
+        let tls_settings = TlsSettings::from_options(&self.tls)?;
+        let client = HttpClient::new(tls_settings, cx.proxy())?;
+
+        let batch_settings = self.batch.validate()?.into_batcher_settings()?;
+
+        let request_builder = StackdriverMetricsRequestBuilder {
+            encoder: StackdriverMetricsEncoder {
+                default_namespace: self.default_namespace.clone(),
+                started,
+                resource: self.resource.clone(),
+            },
+        };
+
+        let request_limits = self.request.unwrap_with(
+            &TowerRequestConfig::default()
+                .rate_limit_duration_secs(1)
+                .rate_limit_num(1000),
+        );
+
+        let uri: Uri = format!(
+            "{}/v3/projects/{}/timeSeries",
+            self.endpoint, self.project_id
+        )
+        .parse()?;
+
+        auth.spawn_regenerate_token();
+
+        let stackdriver_metrics_service_request_builder =
+            StackdriverMetricsServiceRequestBuilder { uri, auth };
+
+        let service = HttpService::new(client, stackdriver_metrics_service_request_builder);
+
+        let service = ServiceBuilder::new()
+            .settings(request_limits, http_response_retry_logic())
+            .service(service);
+
+        let sink = StackdriverMetricsSink::new(service, batch_settings, request_builder);
+
+        Ok((VectorSink::from_event_streamsink(sink), healthcheck))
+    }
+
+    fn input(&self) -> Input {
+        Input::metric()
+    }
+
+    fn acknowledgements(&self) -> &AcknowledgementsConfig {
+        &self.acknowledgements
+    }
+}
+
+#[derive(Clone, Copy, Debug, Default)]
+pub struct StackdriverMetricsDefaultBatchSettings;
+
+impl SinkBatchSettings for StackdriverMetricsDefaultBatchSettings {
+    const MAX_EVENTS: Option<usize> = Some(1);
+    const MAX_BYTES: Option<usize> = None;
+    const TIMEOUT_SECS: f64 = 1.0;
+}
+
+#[derive(Debug, Clone)]
+pub(super) struct StackdriverMetricsServiceRequestBuilder {
+    pub(super) uri: Uri,
+    pub(super) auth: GcpAuthenticator,
+}
+
+impl HttpServiceRequestBuilder for StackdriverMetricsServiceRequestBuilder {
+    fn build(&self, body: Bytes) -> Request<Bytes> {
+        let mut request = Request::post(self.uri.clone())
+            .header("Content-Type", "application/json")
+            .body(body)
+            .unwrap();
+
+        self.auth.apply(&mut request);
+
+        request
+    }
+}
+
+async fn healthcheck() -> crate::Result<()> {
+    Ok(())
+}
diff --git a/src/sinks/gcp/stackdriver/metrics/mod.rs b/src/sinks/gcp/stackdriver/metrics/mod.rs
new file mode 100644
index 00000000000000..8c070ff20781ab
--- /dev/null
+++ b/src/sinks/gcp/stackdriver/metrics/mod.rs
@@ -0,0 +1,9 @@
+//! GCP Cloud Monitoring (formerly Stackdriver Metrics) sink.
+//! Sends metrics to [GCP Cloud Monitoring][cloud monitoring].
+//!
+//! [cloud monitoring]: https://cloud.google.com/monitoring/docs/monitoring-overview
+mod config;
+mod request_builder;
+mod sink;
+#[cfg(test)]
+mod tests;
diff --git a/src/sinks/gcp/stackdriver/metrics/request_builder.rs b/src/sinks/gcp/stackdriver/metrics/request_builder.rs
new file mode 100644
index 00000000000000..12e4526f055f58
--- /dev/null
+++ b/src/sinks/gcp/stackdriver/metrics/request_builder.rs
@@ -0,0 +1,138 @@
+use std::io;
+
+use bytes::Bytes;
+use chrono::Utc;
+use vector_core::event::{Metric, MetricValue};
+
+use crate::sinks::{gcp, prelude::*, util::http::HttpRequest};
+
+#[derive(Clone, Debug)]
+pub(super) struct StackdriverMetricsRequestBuilder {
+    pub(super) encoder: StackdriverMetricsEncoder,
+}
+
+impl RequestBuilder<Vec<Metric>> for StackdriverMetricsRequestBuilder {
+    type Metadata = EventFinalizers;
+    type Events = Vec<Metric>;
+    type Encoder = StackdriverMetricsEncoder;
+    type Payload = Bytes;
+    type Request = HttpRequest;
+    type Error = io::Error;
+
+    fn compression(&self) -> Compression {
+        Compression::None
+    }
+
+    fn encoder(&self) -> &Self::Encoder {
+        &self.encoder
+    }
+
+    fn split_input(
+        &self,
+        mut events: Vec<Metric>,
+    ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
+        let finalizers = events.take_finalizers();
+        let builder = RequestMetadataBuilder::from_events(&events);
+        (finalizers, builder, events)
+    }
+
+    fn build_request(
+        &self,
+        metadata: Self::Metadata,
+        request_metadata: RequestMetadata,
+        payload: EncodeResult<Self::Payload>,
+    ) -> Self::Request {
+        HttpRequest::new(payload.into_payload(), metadata, request_metadata)
+    }
+}
+
+#[derive(Clone, Debug)]
+pub struct StackdriverMetricsEncoder {
+    pub(super) default_namespace: String,
+    pub(super) started: chrono::DateTime<Utc>,
+    pub(super) resource: gcp::GcpTypedResource,
+}
+
+impl encoding::Encoder<Vec<Metric>> for StackdriverMetricsEncoder {
+    /// Create the object defined [here][api_docs].
+    ///
+    /// [api_docs]: https://cloud.google.com/monitoring/api/ref_v3/rest/v3/projects.timeSeries/create
+    fn encode_input(
+        &self,
+        input: Vec<Metric>,
+        writer: &mut dyn io::Write,
+    ) -> io::Result<(usize, GroupedCountByteSize)> {
+        let mut byte_size = telemetry().create_request_count_byte_size();
+        let time_series = input
+            .into_iter()
+            .map(|metric| {
+                byte_size.add_event(&metric, metric.estimated_json_encoded_size_of());
+
+                let (series, data, _metadata) = metric.into_parts();
+                let namespace = series
+                    .name
+                    .namespace
+                    .unwrap_or_else(|| self.default_namespace.clone());
+                let metric_type = format!(
+                    "custom.googleapis.com/{}/metrics/{}",
+                    namespace, series.name.name
+                );
+
+                let end_time = data.time.timestamp.unwrap_or_else(chrono::Utc::now);
+
+                let (point_value, interval, metric_kind) = match &data.value {
+                    MetricValue::Counter { value } => {
+                        let interval = gcp::GcpInterval {
+                            start_time: Some(self.started),
+                            end_time,
+                        };
+
+                        (*value, interval, gcp::GcpMetricKind::Cumulative)
+                    }
+                    MetricValue::Gauge { value } => {
+                        let interval = gcp::GcpInterval {
+                            start_time: None,
+                            end_time,
+                        };
+
+                        (*value, interval, gcp::GcpMetricKind::Gauge)
+                    }
+                    _ => {
+                        unreachable!("sink has filtered out all metrics that aren't counter or gauge by this point")
+                    },
+                };
+                let metric_labels = series
+                    .tags
+                    .unwrap_or_default()
+                    .into_iter_single()
+                    .collect::<std::collections::HashMap<_, _>>();
+
+                gcp::GcpSerie {
+                    metric: gcp::GcpMetric {
+                        r#type: metric_type,
+                        labels: metric_labels,
+                    },
+                    resource: gcp::GcpResource {
+                        r#type: self.resource.r#type.clone(),
+                        labels: self.resource.labels.clone(),
+                    },
+                    metric_kind,
+                    value_type: gcp::GcpValueType::Int64,
+                    points: vec![gcp::GcpPoint {
+                        interval,
+                        value: gcp::GcpPointValue {
+                            int64_value: Some(point_value as i64),
+                        },
+                    }],
+                }
+            })
+            .collect::<Vec<_>>();
+
+        let series = gcp::GcpSeries {
+            time_series: &time_series,
+        };
+
+        let body = crate::serde::json::to_bytes(&series).unwrap().freeze();
+        writer.write_all(&body).map(|()| (body.len(), byte_size))
+    }
+}
diff --git a/src/sinks/gcp/stackdriver/metrics/sink.rs b/src/sinks/gcp/stackdriver/metrics/sink.rs
new file mode 100644
index 00000000000000..538a666bdd91db
--- /dev/null
+++ b/src/sinks/gcp/stackdriver/metrics/sink.rs
@@ -0,0 +1,103 @@
+use vector_core::event::{Metric, MetricValue};
+
+use crate::sinks::{
+    prelude::*,
+    util::{
+        buffer::metrics::{MetricNormalize, MetricSet},
+        http::HttpRequest,
+    },
+};
+
+use super::request_builder::StackdriverMetricsRequestBuilder;
+
+#[derive(Clone, Debug, Default)]
+struct StackdriverMetricsNormalize;
+
+impl MetricNormalize for StackdriverMetricsNormalize {
+    fn normalize(&mut self, state: &mut MetricSet, metric: Metric) -> Option<Metric> {
+        match (metric.kind(), &metric.value()) {
+            (_, MetricValue::Counter { .. }) => state.make_absolute(metric),
+            (_, MetricValue::Gauge { .. }) => state.make_absolute(metric),
+            // All others are left as-is
+            _ => Some(metric),
+        }
+    }
+}
+
+pub(super) struct StackdriverMetricsSink<S> {
+    service: S,
+    batch_settings: BatcherSettings,
+    request_builder: StackdriverMetricsRequestBuilder,
+}
+
+impl<S> StackdriverMetricsSink<S>
+where
+    S: Service<HttpRequest> + Send + 'static,
+    S::Future: Send + 'static,
+    S::Response: DriverResponse + Send + 'static,
+    S::Error: std::fmt::Debug + Into<crate::Error> + Send,
+{
+    /// Creates a new `StackdriverMetricsSink`.
+    pub(super) const fn new(
+        service: S,
+        batch_settings: BatcherSettings,
+        request_builder: StackdriverMetricsRequestBuilder,
+    ) -> Self {
+        Self {
+            service,
+            batch_settings,
+            request_builder,
+        }
+    }
+
+    async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
+        input
+            .filter_map(|event| {
+                // Filter out anything that is not a Counter or a Gauge.
+                let metric = event.into_metric();
+
+                future::ready(match metric.value() {
+                    &MetricValue::Counter { .. } => Some(metric),
+                    &MetricValue::Gauge { .. } => Some(metric),
+                    not_supported => {
+                        warn!("Unsupported metric type: {:?}.", not_supported);
+                        None
+                    }
+                })
+            })
+            .normalized_with_default::<StackdriverMetricsNormalize>()
+            .batched(self.batch_settings.into_byte_size_config())
+            .request_builder(
+                default_request_builder_concurrency_limit(),
+                self.request_builder,
+            )
+            .filter_map(|request| async move {
+                match request {
+                    Err(error) => {
+                        emit!(SinkRequestBuildError { error });
+                        None
+                    }
+                    Ok(req) => Some(req),
+                }
+            })
+            .into_driver(self.service)
+            .run()
+            .await
+    }
+}
+
+#[async_trait::async_trait]
+impl<S> StreamSink<Event> for StackdriverMetricsSink<S>
+where
+    S: Service<HttpRequest> + Send + 'static,
+    S::Future: Send + 'static,
+    S::Response: DriverResponse + Send + 'static,
+    S::Error: std::fmt::Debug + Into<crate::Error> + Send,
+{
+    async fn run(
+        self: Box<Self>,
+        input: futures_util::stream::BoxStream<'_, Event>,
+    ) -> Result<(), ()> {
+        self.run_inner(input).await
+    }
+}
diff --git a/src/sinks/gcp/stackdriver/metrics/tests.rs b/src/sinks/gcp/stackdriver/metrics/tests.rs
new file mode 100644
index 00000000000000..a27194f1900e4a
--- /dev/null
+++ b/src/sinks/gcp/stackdriver/metrics/tests.rs
@@ -0,0 +1,276 @@
+use chrono::Utc;
+use futures::{future::ready, stream};
+use serde::Deserialize;
+use vector_core::event::{Metric, MetricKind, MetricValue};
+
+use super::config::StackdriverConfig;
+use crate::{
+    config::SinkContext,
+    gcp::GcpAuthConfig,
+    sinks::{prelude::*, util::test::build_test_server},
+    test_util::{
+        components::{run_and_assert_sink_compliance, SINK_TAGS},
+        http::{always_200_response, spawn_blackhole_http_server},
+        next_addr,
+    },
+};
+
+#[test]
+fn generate_config() {
+    crate::test_util::test_generate_config::<StackdriverConfig>();
+}
+
+#[tokio::test]
+async fn component_spec_compliance() {
+    let mock_endpoint = spawn_blackhole_http_server(always_200_response).await;
+
+    let config = StackdriverConfig::generate_config().to_string();
+    let mut config = StackdriverConfig::deserialize(toml::de::ValueDeserializer::new(&config))
+        .expect("config should be valid");
+
+    // If we don't override the credentials path/API key, it tries to directly call out to the Google Instance
+    // Metadata API, which we clearly don't have in unit tests. :)
+    config.auth.credentials_path = None;
+    config.auth.api_key = Some("fake".to_string().into());
+    config.endpoint = mock_endpoint.to_string();
+
+    let context = SinkContext::default();
+    let (sink, _healthcheck) = config.build(context).await.unwrap();
+
+    let event = Event::Metric(Metric::new(
+        "gauge-test",
+        MetricKind::Absolute,
+        MetricValue::Gauge { value: 1_f64 },
+    ));
+    run_and_assert_sink_compliance(sink, stream::once(ready(event)), &SINK_TAGS).await;
+}
+
+#[tokio::test]
+async fn sends_metric() {
+    let in_addr = next_addr();
+    let config = StackdriverConfig {
+        endpoint: format!("http://{in_addr}"),
+        auth: GcpAuthConfig {
+            api_key: None,
+            credentials_path: None,
+            skip_authentication: true,
+        },
+        ..Default::default()
+    };
+    let (rx, trigger, server) = build_test_server(in_addr);
+    tokio::spawn(server);
+
+    let context = SinkContext::default();
+    let (sink, _healthcheck) = config.build(context).await.unwrap();
+    let timestamp = Utc::now();
+
+    let event = Event::Metric(
+        Metric::new(
+            "gauge-test",
+            MetricKind::Absolute,
+            MetricValue::Gauge { value: 1_f64 },
+        )
+        .with_timestamp(Some(timestamp)),
+    );
+    run_and_assert_sink_compliance(sink, stream::once(ready(event)), &SINK_TAGS).await;
+
+    drop(trigger);
+
+    let event = rx
+        .take(1)
+        .map(|(_, bytes)| serde_json::from_slice::<serde_json::Value>(&bytes).unwrap())
+        .collect::<Vec<_>>()
+        .await;
+
+    assert_eq!(
+        serde_json::json!({
+            "timeSeries":[{
+                "metric":{
+                    "type":"custom.googleapis.com//metrics/gauge-test",
+                    "labels":{}
+                },
+                "resource":{"type":"","labels":{}},
+                "metricKind":"GAUGE",
+                "valueType":"INT64",
+                "points":[{
+                    "interval":{"endTime":timestamp.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true)},
+                    "value":{"int64Value":"1"}
+                }]
+            }]
+        }),
+        event[0]
+    );
+}
+
+#[tokio::test]
+async fn sends_multiple_metrics() {
+    let in_addr = next_addr();
+    let mut batch = BatchConfig::default();
+    batch.max_events = Some(5);
+
+    let config = StackdriverConfig {
+        endpoint: format!("http://{in_addr}"),
+        auth: GcpAuthConfig {
+            api_key: None,
+            credentials_path: None,
+            skip_authentication: true,
+        },
+        batch,
+        ..Default::default()
+    };
+    let (rx, trigger, server) = build_test_server(in_addr);
+    tokio::spawn(server);
+
+    let context = SinkContext::default();
+    let (sink, _healthcheck) = config.build(context).await.unwrap();
+
+    let timestamp1 = Utc::now();
+    let timestamp2 = Utc::now();
+
+    let event = vec![
+        Event::Metric(
+            Metric::new(
+                "gauge1",
+                MetricKind::Absolute,
+                MetricValue::Gauge { value: 1_f64 },
+            )
+            .with_timestamp(Some(timestamp1)),
+        ),
+        Event::Metric(
+            Metric::new(
+                "gauge2",
+                MetricKind::Absolute,
+                MetricValue::Gauge { value: 5_f64 },
+            )
+            .with_timestamp(Some(timestamp2)),
+        ),
+    ];
+    run_and_assert_sink_compliance(sink, stream::iter(event), &SINK_TAGS).await;
+
+    drop(trigger);
+
+    let event = rx
+        .take(1)
+        .map(|(_, bytes)| serde_json::from_slice::<serde_json::Value>(&bytes).unwrap())
+        .collect::<Vec<_>>()
+        .await;
+
+    assert_eq!(
+        serde_json::json!({
+            "timeSeries":[
+                {
+                    "metric":{
+                        "type":"custom.googleapis.com//metrics/gauge1",
+                        "labels":{}
+                    },
+                    "resource":{"type":"","labels":{}},
+                    "metricKind":"GAUGE",
+                    "valueType":"INT64",
+                    "points":[{
+                        "interval":{"endTime":timestamp1.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true)},
+                        "value":{"int64Value":"1"}
+                    }]
+                },
+                {
+                    "metric":{
+                        "type":"custom.googleapis.com//metrics/gauge2",
+                        "labels":{}
+                    },
+                    "resource":{"type":"","labels":{}},
+                    "metricKind":"GAUGE",
+                    "valueType":"INT64",
+                    "points":[{
+                        "interval":{"endTime":timestamp2.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true)},
+                        "value":{"int64Value":"5"}
+                    }]
+                }]
+        }),
+        event[0]
+    );
+}
+
+#[tokio::test]
+async fn does_not_aggregate_metrics() {
+    let in_addr = next_addr();
+    let mut batch = BatchConfig::default();
+    batch.max_events = Some(5);
+
+    let config = StackdriverConfig {
+        endpoint: format!("http://{in_addr}"),
+        auth: GcpAuthConfig {
+            api_key: None,
+            credentials_path: None,
+            skip_authentication: true,
+        },
+        batch,
+        ..Default::default()
+    };
+    let (rx, trigger, server) = build_test_server(in_addr);
+    tokio::spawn(server);
+
+    let context = SinkContext::default();
+    let (sink, _healthcheck) = config.build(context).await.unwrap();
+
+    let timestamp1 = Utc::now();
+    let timestamp2 = Utc::now();
+
+    let event = vec![
+        Event::Metric(
+            Metric::new(
+                "gauge",
+                MetricKind::Absolute,
+                MetricValue::Gauge { value: 1_f64 },
+            )
+            .with_timestamp(Some(timestamp1)),
+        ),
+        Event::Metric(
+            Metric::new(
+                "gauge",
+                MetricKind::Absolute,
+                MetricValue::Gauge { value: 5_f64 },
+            )
+            .with_timestamp(Some(timestamp2)),
+        ),
+    ];
+    run_and_assert_sink_compliance(sink, stream::iter(event), &SINK_TAGS).await;
+
+    drop(trigger);
+
+    let event = rx
+        .take(1)
+        .map(|(_, bytes)| serde_json::from_slice::<serde_json::Value>(&bytes).unwrap())
+        .collect::<Vec<_>>()
+        .await;
+
+    assert_eq!(
+        serde_json::json!({
+            "timeSeries":[
+                {
+                    "metric":{
+                        "type":"custom.googleapis.com//metrics/gauge",
+                        "labels":{}
+                    },
+                    "resource":{"type":"","labels":{}},
+                    "metricKind":"GAUGE",
+                    "valueType":"INT64",
+                    "points":[{
+                        "interval":{"endTime":timestamp1.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true)},
+                        "value":{"int64Value":"1"}
+                    }]
+                }, {
+                    "metric":{
+                        "type":"custom.googleapis.com//metrics/gauge",
+                        "labels":{}
+                    },
+                    "resource":{"type":"","labels":{}},
+                    "metricKind":"GAUGE",
+                    "valueType":"INT64",
+                    "points":[{
+                        "interval":{"endTime":timestamp2.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true)},
+                        "value":{"int64Value":"5"}
+                    }]
+                }]
+        }),
+        event[0]
+    );
+}
diff --git a/src/sinks/gcp/stackdriver/mod.rs b/src/sinks/gcp/stackdriver/mod.rs
index dff3ee587ba143..ac7e73a24aeb03 100644
--- a/src/sinks/gcp/stackdriver/mod.rs
+++ b/src/sinks/gcp/stackdriver/mod.rs
@@ -1 +1,2 @@
 mod logs;
+mod metrics;
diff --git a/src/sinks/gcp/stackdriver_metrics.rs b/src/sinks/gcp/stackdriver_metrics.rs
deleted file mode 100644
index faa6989b80bd58..00000000000000
--- a/src/sinks/gcp/stackdriver_metrics.rs
+++ /dev/null
@@ -1,308 +0,0 @@
-// TODO: In order to correctly assert component specification compliance, we would have to do some more advanced mocking
-// off the endpoint, which would include also providing a mock OAuth2 endpoint to allow for generating a token from the
-// mocked credentials. Let this TODO serve as a placeholder for doing that in the future.
-
-use bytes::Bytes;
-use chrono::{DateTime, Utc};
-use futures::{sink::SinkExt, FutureExt};
-use goauth::scopes::Scope;
-use http::Uri;
-use vector_config::configurable_component;
-
-use crate::{
-    config::{AcknowledgementsConfig, Input, SinkConfig, SinkContext},
-    event::{Event, Metric, MetricValue},
-    gcp::{GcpAuthConfig, GcpAuthenticator},
-    http::HttpClient,
-    sinks::{
-        gcp,
-        util::{
-            buffer::metrics::MetricsBuffer,
-            http::{BatchedHttpSink, HttpEventEncoder, HttpSink},
-            BatchConfig, SinkBatchSettings, TowerRequestConfig,
-        },
-        Healthcheck, VectorSink,
-    },
-    tls::{TlsConfig, TlsSettings},
-};
-
-#[derive(Clone, Copy, Debug, Default)]
-pub struct StackdriverMetricsDefaultBatchSettings;
-
-impl SinkBatchSettings for StackdriverMetricsDefaultBatchSettings {
-    const MAX_EVENTS: Option<usize> = Some(1);
-    const MAX_BYTES: Option<usize> = None;
-    const TIMEOUT_SECS: f64 = 1.0;
-}
-
-/// Configuration for the `gcp_stackdriver_metrics` sink.
-#[configurable_component(sink(
-    "gcp_stackdriver_metrics",
-    "Deliver metrics to GCP's Cloud Monitoring system."
-))]
-#[derive(Clone, Debug, Default)]
-pub struct StackdriverConfig {
-    #[serde(skip, default = "default_endpoint")]
-    endpoint: String,
-
-    /// The project ID to which to publish metrics.
-    ///
-    /// See the [Google Cloud Platform project management documentation][project_docs] for more details.
-    ///
-    /// [project_docs]: https://cloud.google.com/resource-manager/docs/creating-managing-projects
-    pub project_id: String,
-
-    /// The monitored resource to associate the metrics with.
-    pub resource: gcp::GcpTypedResource,
-
-    #[serde(flatten)]
-    pub auth: GcpAuthConfig,
-
-    /// The default namespace to use for metrics that do not have one.
-    ///
-    /// Metrics with the same name can only be differentiated by their namespace, and not all
-    /// metrics have their own namespace.
-    #[serde(default = "default_metric_namespace_value")]
-    pub default_namespace: String,
-
-    #[configurable(derived)]
-    #[serde(default)]
-    pub request: TowerRequestConfig,
-
-    #[configurable(derived)]
-    #[serde(default)]
-    pub batch: BatchConfig<StackdriverMetricsDefaultBatchSettings>,
-
-    #[configurable(derived)]
-    pub tls: Option<TlsConfig>,
-
-    #[configurable(derived)]
-    #[serde(
-        default,
-        deserialize_with = "crate::serde::bool_or_struct",
-        skip_serializing_if = "crate::serde::skip_serializing_if_default"
-    )]
-    acknowledgements: AcknowledgementsConfig,
-}
-
-fn default_metric_namespace_value() -> String {
-    "namespace".to_string()
-}
-
-fn default_endpoint() -> String {
-    "https://monitoring.googleapis.com".to_string()
-}
-
-impl_generate_config_from_default!(StackdriverConfig);
-
-#[async_trait::async_trait]
-#[typetag::serde(name = "gcp_stackdriver_metrics")]
-impl SinkConfig for StackdriverConfig {
-    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
-        let auth = self.auth.build(Scope::MonitoringWrite).await?;
-
-        let healthcheck = healthcheck().boxed();
-        let started = chrono::Utc::now();
-        let request = self.request.unwrap_with(
-            &TowerRequestConfig::default()
-                .rate_limit_duration_secs(1)
-                .rate_limit_num(1000),
-        );
-        let tls_settings = TlsSettings::from_options(&self.tls)?;
-        let client = HttpClient::new(tls_settings, cx.proxy())?;
-        let batch_settings = self.batch.into_batch_settings()?;
-
-        auth.spawn_regenerate_token();
-        let sink = HttpEventSink {
-            config: self.clone(),
-            started,
-            auth,
-        };
-
-        let sink = BatchedHttpSink::new(
-            sink,
-            MetricsBuffer::new(batch_settings.size),
-            request,
-            batch_settings.timeout,
-            client,
-        )
-        .sink_map_err(
-            |error| error!(message = "Fatal gcp_stackdriver_metrics sink error.", %error),
-        );
-
-        #[allow(deprecated)]
-        Ok((VectorSink::from_event_sink(sink), healthcheck))
-    }
-
-    fn input(&self) -> Input {
-        Input::metric()
-    }
-
-    fn acknowledgements(&self) -> &AcknowledgementsConfig {
-        &self.acknowledgements
-    }
-}
-
-struct HttpEventSink {
-    config: StackdriverConfig,
-    started: DateTime<Utc>,
-    auth: GcpAuthenticator,
-}
-
-struct StackdriverMetricsEncoder;
-
-impl HttpEventEncoder<Metric> for StackdriverMetricsEncoder {
-    fn encode_event(&mut self, event: Event) -> Option<Metric> {
-        let metric = event.into_metric();
-
-        match metric.value() {
-            &MetricValue::Counter { .. } => Some(metric),
-            &MetricValue::Gauge { .. } => Some(metric),
-            not_supported => {
-                warn!("Unsupported metric type: {:?}.", not_supported);
-                None
-            }
-        }
-    }
-}
-
-#[async_trait::async_trait]
-impl HttpSink for HttpEventSink {
-    type Input = Metric;
-    type Output = Vec<Metric>;
-    type Encoder = StackdriverMetricsEncoder;
-
-    fn build_encoder(&self) -> Self::Encoder {
-        StackdriverMetricsEncoder
-    }
-
-    async fn build_request(
-        &self,
-        mut metrics: Self::Output,
-    ) -> crate::Result<http::Request<Bytes>> {
-        let metric = metrics.pop().expect("only one metric");
-        let (series, data, _metadata) = metric.into_parts();
-        let namespace = series
-            .name
-            .namespace
-            .unwrap_or_else(|| self.config.default_namespace.clone());
-        let metric_type = format!(
-            "custom.googleapis.com/{}/metrics/{}",
-            namespace, series.name.name
-        );
-
-        let end_time = data.time.timestamp.unwrap_or_else(chrono::Utc::now);
-
-        let (point_value, interval, metric_kind) = match &data.value {
-            MetricValue::Counter { value } => {
-                let interval = gcp::GcpInterval {
-                    start_time: Some(self.started),
-                    end_time,
-                };
-
-                (*value, interval, gcp::GcpMetricKind::Cumulative)
-            }
-            MetricValue::Gauge { value } => {
-                let interval = gcp::GcpInterval {
-                    start_time: None,
-                    end_time,
-                };
-
-                (*value, interval, gcp::GcpMetricKind::Gauge)
-            }
-            _ => unreachable!(),
-        };
-
-        let metric_labels = series
-            .tags
-            .unwrap_or_default()
-            .into_iter_single()
-            .collect::<std::collections::HashMap<_, _>>();
-
-        let series = gcp::GcpSeries {
-            time_series: &[gcp::GcpSerie {
-                metric: gcp::GcpMetric {
-                    r#type: metric_type,
-                    labels: metric_labels,
-                },
-                resource: gcp::GcpResource {
-                    r#type: self.config.resource.r#type.clone(),
-                    labels: self.config.resource.labels.clone(),
-                },
-                metric_kind,
-                value_type: gcp::GcpValueType::Int64,
-                points: &[gcp::GcpPoint {
-                    interval,
-                    value: gcp::GcpPointValue {
-                        int64_value: Some(point_value as i64),
-                    },
-                }],
-            }],
-        };
-
-        let body = crate::serde::json::to_bytes(&series).unwrap().freeze();
-
-        let uri: Uri = format!(
-            "{}/v3/projects/{}/timeSeries",
-            self.config.endpoint, self.config.project_id
-        )
-        .parse()?;
-
-        let mut request = hyper::Request::post(uri)
-            .header("content-type", "application/json")
-            .body(body)?;
-        self.auth.apply(&mut request);
-
-        Ok(request)
-    }
-}
-
-async fn healthcheck() -> crate::Result<()> {
-    Ok(())
-}
-
-#[cfg(test)]
-mod tests {
-    use futures::{future::ready, stream};
-    use serde::Deserialize;
-    use vector_core::event::{MetricKind, MetricValue};
-
-    use super::*;
-    use crate::{
-        config::{GenerateConfig, SinkConfig, SinkContext},
-        test_util::{
-            components::{run_and_assert_sink_compliance, SINK_TAGS},
-            http::{always_200_response, spawn_blackhole_http_server},
-        },
-    };
-
-    #[test]
-    fn generate_config() {
-        crate::test_util::test_generate_config::<StackdriverConfig>();
-    }
-
-    #[tokio::test]
-    async fn component_spec_compliance() {
-        let mock_endpoint = spawn_blackhole_http_server(always_200_response).await;
-
-        let config = StackdriverConfig::generate_config().to_string();
-        let mut config = StackdriverConfig::deserialize(toml::de::ValueDeserializer::new(&config))
-            .expect("config should be valid");
-
-        // If we don't override the credentials path/API key, it tries to directly call out to the Google Instance
-        // Metadata API, which we clearly don't have in unit tests. :)
-        config.auth.credentials_path = None;
-        config.auth.api_key = Some("fake".to_string().into());
-        config.endpoint = mock_endpoint.to_string();
-
-        let context = SinkContext::default();
-        let (sink, _healthcheck) = config.build(context).await.unwrap();
-
-        let event = Event::Metric(Metric::new(
-            "gauge-test",
-            MetricKind::Absolute,
-            MetricValue::Gauge { value: 1_f64 },
-        ));
-        run_and_assert_sink_compliance(sink, stream::once(ready(event)), &SINK_TAGS).await;
-    }
-}
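
Note on the payload format: the "int64Value":"1" strings asserted in the new tests come from Cloud Monitoring's JSON mapping, where 64-bit integer point values are encoded as JSON strings rather than numbers (this is what `serialize_int64_value` in `src/sinks/gcp/mod.rs` handles). A minimal standalone sketch of that encoding, assuming serde (with the derive feature) and serde_json are available, and using stand-in names (`Point`, `int64_as_string`) rather than the crate's `GcpPointValue`/`serialize_int64_value`:

use serde::{Serialize, Serializer};

// Stand-in for the crate's point-value type; only the int64 field is modeled here.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct Point {
    #[serde(serialize_with = "int64_as_string", skip_serializing_if = "Option::is_none")]
    int64_value: Option<i64>,
}

// Encode an optional i64 as a JSON string, matching the API's Int64Value mapping.
fn int64_as_string<S: Serializer>(value: &Option<i64>, serializer: S) -> Result<S::Ok, S::Error> {
    match value {
        Some(v) => serializer.serialize_str(&v.to_string()),
        None => serializer.serialize_none(),
    }
}

fn main() {
    let point = Point {
        int64_value: Some(1),
    };
    // Prints: {"int64Value":"1"}
    println!("{}", serde_json::to_string(&point).unwrap());
}

Serializing the value as a string is also why the test assertions compare against "1" and "5" instead of numeric literals, even though the sink casts the metric value to i64 before encoding.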