-
Notifications
You must be signed in to change notification settings - Fork 1.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
chore(gcp_stackdriver_metrics sink): rewrite to stream based sink (#1…
…8749) * Move to stream based sink Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com> * Added tests Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com> * Aggregate metrics Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com> * Remove aggregation Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com> * Feedback from Kyle Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com> --------- Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>
- Loading branch information
1 parent
a6d9bac
commit 4cdad00
Showing
9 changed files
with
703 additions
and
317 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,168 @@ | ||
use bytes::Bytes; | ||
use goauth::scopes::Scope; | ||
use http::{Request, Uri}; | ||
|
||
use crate::{ | ||
gcp::{GcpAuthConfig, GcpAuthenticator}, | ||
http::HttpClient, | ||
sinks::{ | ||
gcp, | ||
prelude::*, | ||
util::http::{http_response_retry_logic, HttpService, HttpServiceRequestBuilder}, | ||
}, | ||
}; | ||
|
||
use super::{ | ||
request_builder::{StackdriverMetricsEncoder, StackdriverMetricsRequestBuilder}, | ||
sink::StackdriverMetricsSink, | ||
}; | ||
|
||
/// Configuration for the `gcp_stackdriver_metrics` sink. | ||
#[configurable_component(sink( | ||
"gcp_stackdriver_metrics", | ||
"Deliver metrics to GCP's Cloud Monitoring system." | ||
))] | ||
#[derive(Clone, Debug, Default)] | ||
pub struct StackdriverConfig { | ||
#[serde(skip, default = "default_endpoint")] | ||
pub(super) endpoint: String, | ||
|
||
/// The project ID to which to publish metrics. | ||
/// | ||
/// See the [Google Cloud Platform project management documentation][project_docs] for more details. | ||
/// | ||
/// [project_docs]: https://cloud.google.com/resource-manager/docs/creating-managing-projects | ||
pub(super) project_id: String, | ||
|
||
/// The monitored resource to associate the metrics with. | ||
pub(super) resource: gcp::GcpTypedResource, | ||
|
||
#[serde(flatten)] | ||
pub(super) auth: GcpAuthConfig, | ||
|
||
/// The default namespace to use for metrics that do not have one. | ||
/// | ||
/// Metrics with the same name can only be differentiated by their namespace, and not all | ||
/// metrics have their own namespace. | ||
#[serde(default = "default_metric_namespace_value")] | ||
pub(super) default_namespace: String, | ||
|
||
#[configurable(derived)] | ||
#[serde(default)] | ||
pub(super) request: TowerRequestConfig, | ||
|
||
#[configurable(derived)] | ||
#[serde(default)] | ||
pub(super) batch: BatchConfig<StackdriverMetricsDefaultBatchSettings>, | ||
|
||
#[configurable(derived)] | ||
pub(super) tls: Option<TlsConfig>, | ||
|
||
#[configurable(derived)] | ||
#[serde( | ||
default, | ||
deserialize_with = "crate::serde::bool_or_struct", | ||
skip_serializing_if = "crate::serde::skip_serializing_if_default" | ||
)] | ||
pub(super) acknowledgements: AcknowledgementsConfig, | ||
} | ||
|
||
fn default_metric_namespace_value() -> String { | ||
"namespace".to_string() | ||
} | ||
|
||
fn default_endpoint() -> String { | ||
"https://monitoring.googleapis.com".to_string() | ||
} | ||
|
||
impl_generate_config_from_default!(StackdriverConfig); | ||
|
||
#[async_trait::async_trait] | ||
#[typetag::serde(name = "gcp_stackdriver_metrics")] | ||
impl SinkConfig for StackdriverConfig { | ||
async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { | ||
let auth = self.auth.build(Scope::MonitoringWrite).await?; | ||
|
||
let healthcheck = healthcheck().boxed(); | ||
let started = chrono::Utc::now(); | ||
let tls_settings = TlsSettings::from_options(&self.tls)?; | ||
let client = HttpClient::new(tls_settings, cx.proxy())?; | ||
|
||
let batch_settings = self.batch.validate()?.into_batcher_settings()?; | ||
|
||
let request_builder = StackdriverMetricsRequestBuilder { | ||
encoder: StackdriverMetricsEncoder { | ||
default_namespace: self.default_namespace.clone(), | ||
started, | ||
resource: self.resource.clone(), | ||
}, | ||
}; | ||
|
||
let request_limits = self.request.unwrap_with( | ||
&TowerRequestConfig::default() | ||
.rate_limit_duration_secs(1) | ||
.rate_limit_num(1000), | ||
); | ||
|
||
let uri: Uri = format!( | ||
"{}/v3/projects/{}/timeSeries", | ||
self.endpoint, self.project_id | ||
) | ||
.parse()?; | ||
|
||
auth.spawn_regenerate_token(); | ||
|
||
let stackdriver_metrics_service_request_builder = | ||
StackdriverMetricsServiceRequestBuilder { uri, auth }; | ||
|
||
let service = HttpService::new(client, stackdriver_metrics_service_request_builder); | ||
|
||
let service = ServiceBuilder::new() | ||
.settings(request_limits, http_response_retry_logic()) | ||
.service(service); | ||
|
||
let sink = StackdriverMetricsSink::new(service, batch_settings, request_builder); | ||
|
||
Ok((VectorSink::from_event_streamsink(sink), healthcheck)) | ||
} | ||
|
||
fn input(&self) -> Input { | ||
Input::metric() | ||
} | ||
|
||
fn acknowledgements(&self) -> &AcknowledgementsConfig { | ||
&self.acknowledgements | ||
} | ||
} | ||
|
||
#[derive(Clone, Copy, Debug, Default)] | ||
pub struct StackdriverMetricsDefaultBatchSettings; | ||
|
||
impl SinkBatchSettings for StackdriverMetricsDefaultBatchSettings { | ||
const MAX_EVENTS: Option<usize> = Some(1); | ||
const MAX_BYTES: Option<usize> = None; | ||
const TIMEOUT_SECS: f64 = 1.0; | ||
} | ||
|
||
#[derive(Debug, Clone)] | ||
pub(super) struct StackdriverMetricsServiceRequestBuilder { | ||
pub(super) uri: Uri, | ||
pub(super) auth: GcpAuthenticator, | ||
} | ||
|
||
impl HttpServiceRequestBuilder for StackdriverMetricsServiceRequestBuilder { | ||
fn build(&self, body: Bytes) -> Request<Bytes> { | ||
let mut request = Request::post(self.uri.clone()) | ||
.header("Content-Type", "application/json") | ||
.body(body) | ||
.unwrap(); | ||
|
||
self.auth.apply(&mut request); | ||
|
||
request | ||
} | ||
} | ||
|
||
async fn healthcheck() -> crate::Result<()> { | ||
Ok(()) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
//! GCP Cloud Monitoring (formerly Stackdriver Metrics) sink. | ||
//! Sends metrics to [GPC Cloud Monitoring][cloud monitoring]. | ||
//! | ||
//! [cloud monitoring]: https://cloud.google.com/monitoring/docs/monitoring-overview | ||
mod config; | ||
mod request_builder; | ||
mod sink; | ||
#[cfg(test)] | ||
mod tests; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,138 @@ | ||
use std::io; | ||
|
||
use bytes::Bytes; | ||
use chrono::Utc; | ||
use vector_core::event::{Metric, MetricValue}; | ||
|
||
use crate::sinks::{gcp, prelude::*, util::http::HttpRequest}; | ||
|
||
#[derive(Clone, Debug)] | ||
pub(super) struct StackdriverMetricsRequestBuilder { | ||
pub(super) encoder: StackdriverMetricsEncoder, | ||
} | ||
|
||
impl RequestBuilder<Vec<Metric>> for StackdriverMetricsRequestBuilder { | ||
type Metadata = EventFinalizers; | ||
type Events = Vec<Metric>; | ||
type Encoder = StackdriverMetricsEncoder; | ||
type Payload = Bytes; | ||
type Request = HttpRequest; | ||
type Error = io::Error; | ||
|
||
fn compression(&self) -> Compression { | ||
Compression::None | ||
} | ||
|
||
fn encoder(&self) -> &Self::Encoder { | ||
&self.encoder | ||
} | ||
|
||
fn split_input( | ||
&self, | ||
mut events: Vec<Metric>, | ||
) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) { | ||
let finalizers = events.take_finalizers(); | ||
let builder = RequestMetadataBuilder::from_events(&events); | ||
(finalizers, builder, events) | ||
} | ||
|
||
fn build_request( | ||
&self, | ||
metadata: Self::Metadata, | ||
request_metadata: RequestMetadata, | ||
payload: EncodeResult<Self::Payload>, | ||
) -> Self::Request { | ||
HttpRequest::new(payload.into_payload(), metadata, request_metadata) | ||
} | ||
} | ||
|
||
#[derive(Clone, Debug)] | ||
pub struct StackdriverMetricsEncoder { | ||
pub(super) default_namespace: String, | ||
pub(super) started: chrono::DateTime<Utc>, | ||
pub(super) resource: gcp::GcpTypedResource, | ||
} | ||
|
||
impl encoding::Encoder<Vec<Metric>> for StackdriverMetricsEncoder { | ||
/// Create the object defined [here][api_docs]. | ||
/// | ||
/// [api_docs]: https://cloud.google.com/monitoring/api/ref_v3/rest/v3/projects.timeSeries/create | ||
fn encode_input( | ||
&self, | ||
input: Vec<Metric>, | ||
writer: &mut dyn io::Write, | ||
) -> io::Result<(usize, GroupedCountByteSize)> { | ||
let mut byte_size = telemetry().create_request_count_byte_size(); | ||
let time_series = input | ||
.into_iter() | ||
.map(|metric| { | ||
byte_size.add_event(&metric, metric.estimated_json_encoded_size_of()); | ||
|
||
let (series, data, _metadata) = metric.into_parts(); | ||
let namespace = series | ||
.name | ||
.namespace | ||
.unwrap_or_else(|| self.default_namespace.clone()); | ||
let metric_type = format!( | ||
"custom.googleapis.com/{}/metrics/{}", | ||
namespace, series.name.name | ||
); | ||
|
||
let end_time = data.time.timestamp.unwrap_or_else(chrono::Utc::now); | ||
|
||
let (point_value, interval, metric_kind) = match &data.value { | ||
MetricValue::Counter { value } => { | ||
let interval = gcp::GcpInterval { | ||
start_time: Some(self.started), | ||
end_time, | ||
}; | ||
|
||
(*value, interval, gcp::GcpMetricKind::Cumulative) | ||
} | ||
MetricValue::Gauge { value } => { | ||
let interval = gcp::GcpInterval { | ||
start_time: None, | ||
end_time, | ||
}; | ||
|
||
(*value, interval, gcp::GcpMetricKind::Gauge) | ||
} | ||
_ => { | ||
unreachable!("sink has filtered out all metrics that aren't counter or gauge by this point") | ||
}, | ||
}; | ||
let metric_labels = series | ||
.tags | ||
.unwrap_or_default() | ||
.into_iter_single() | ||
.collect::<std::collections::HashMap<_, _>>(); | ||
|
||
gcp::GcpSerie { | ||
metric: gcp::GcpMetric { | ||
r#type: metric_type, | ||
labels: metric_labels, | ||
}, | ||
resource: gcp::GcpResource { | ||
r#type: self.resource.r#type.clone(), | ||
labels: self.resource.labels.clone(), | ||
}, | ||
metric_kind, | ||
value_type: gcp::GcpValueType::Int64, | ||
points: vec![gcp::GcpPoint { | ||
interval, | ||
value: gcp::GcpPointValue { | ||
int64_value: Some(point_value as i64), | ||
}, | ||
}], | ||
} | ||
}) | ||
.collect::<Vec<_>>(); | ||
|
||
let series = gcp::GcpSeries { | ||
time_series: &time_series, | ||
}; | ||
|
||
let body = crate::serde::json::to_bytes(&series).unwrap().freeze(); | ||
writer.write_all(&body).map(|()| (body.len(), byte_size)) | ||
} | ||
} |
Oops, something went wrong.