Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(gcp_stackdriver_metrics sink): rewrite to stream based sink #18749

Merged
merged 5 commits into from
Oct 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/sinks/gcp/chronicle_unstructured.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ impl ChronicleUnstructuredConfig {
.settings(request, GcsRetryLogic)
.service(ChronicleService::new(client, base_url, creds));

let request_settings = RequestSettings::new(self)?;
let request_settings = ChronicleRequestBuilder::new(self)?;

let sink = GcsSink::new(svc, request_settings, partitioner, batch_settings, "http");

Expand Down Expand Up @@ -362,7 +362,7 @@ impl Encoder<(String, Vec<Event>)> for ChronicleEncoder {
// request. All possible values are pre-computed for direct use in
// producing a request.
#[derive(Clone, Debug)]
struct RequestSettings {
struct ChronicleRequestBuilder {
encoder: ChronicleEncoder,
}

Expand All @@ -382,7 +382,7 @@ impl AsRef<[u8]> for ChronicleRequestPayload {
}
}

impl RequestBuilder<(String, Vec<Event>)> for RequestSettings {
impl RequestBuilder<(String, Vec<Event>)> for ChronicleRequestBuilder {
type Metadata = EventFinalizers;
type Events = (String, Vec<Event>);
type Encoder = ChronicleEncoder;
Expand Down Expand Up @@ -423,7 +423,7 @@ impl RequestBuilder<(String, Vec<Event>)> for RequestSettings {
}
}

impl RequestSettings {
impl ChronicleRequestBuilder {
fn new(config: &ChronicleUnstructuredConfig) -> crate::Result<Self> {
let transformer = config.encoding.transformer();
let serializer = config.encoding.config().build()?;
Expand Down
9 changes: 4 additions & 5 deletions src/sinks/gcp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ pub mod chronicle_unstructured;
pub mod cloud_storage;
pub mod pubsub;
pub mod stackdriver;
pub mod stackdriver_metrics;

/// A monitored resource.
///
Expand Down Expand Up @@ -103,18 +102,18 @@ pub struct GcpResource {

#[derive(Serialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct GcpSerie<'a> {
pub struct GcpSerie {
pub metric: GcpMetric,
pub resource: GcpResource,
pub metric_kind: GcpMetricKind,
pub value_type: GcpValueType,
pub points: &'a [GcpPoint],
pub points: Vec<GcpPoint>,
}

#[derive(Serialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct GcpSeries<'a> {
time_series: &'a [GcpSerie<'a>],
time_series: &'a [GcpSerie],
}

fn serialize_int64_value<S>(value: &Option<i64>, serializer: S) -> Result<S::Ok, S::Error>
Expand Down Expand Up @@ -180,7 +179,7 @@ mod tests {
},
metric_kind: GcpMetricKind::Gauge,
value_type: GcpValueType::Int64,
points: &[GcpPoint {
points: vec![GcpPoint {
interval: GcpInterval {
start_time: None,
end_time,
Expand Down
168 changes: 168 additions & 0 deletions src/sinks/gcp/stackdriver/metrics/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
use bytes::Bytes;
use goauth::scopes::Scope;
use http::{Request, Uri};

use crate::{
gcp::{GcpAuthConfig, GcpAuthenticator},
http::HttpClient,
sinks::{
gcp,
prelude::*,
util::http::{http_response_retry_logic, HttpService, HttpServiceRequestBuilder},
},
};

use super::{
request_builder::{StackdriverMetricsEncoder, StackdriverMetricsRequestBuilder},
sink::StackdriverMetricsSink,
};

/// Configuration for the `gcp_stackdriver_metrics` sink.
#[configurable_component(sink(
"gcp_stackdriver_metrics",
"Deliver metrics to GCP's Cloud Monitoring system."
))]
#[derive(Clone, Debug, Default)]
pub struct StackdriverConfig {
#[serde(skip, default = "default_endpoint")]
pub(super) endpoint: String,

/// The project ID to which to publish metrics.
///
/// See the [Google Cloud Platform project management documentation][project_docs] for more details.
///
/// [project_docs]: https://cloud.google.com/resource-manager/docs/creating-managing-projects
pub(super) project_id: String,

/// The monitored resource to associate the metrics with.
pub(super) resource: gcp::GcpTypedResource,

#[serde(flatten)]
pub(super) auth: GcpAuthConfig,

/// The default namespace to use for metrics that do not have one.
///
/// Metrics with the same name can only be differentiated by their namespace, and not all
/// metrics have their own namespace.
#[serde(default = "default_metric_namespace_value")]
pub(super) default_namespace: String,

#[configurable(derived)]
#[serde(default)]
pub(super) request: TowerRequestConfig,

#[configurable(derived)]
#[serde(default)]
pub(super) batch: BatchConfig<StackdriverMetricsDefaultBatchSettings>,

#[configurable(derived)]
pub(super) tls: Option<TlsConfig>,

#[configurable(derived)]
#[serde(
default,
deserialize_with = "crate::serde::bool_or_struct",
skip_serializing_if = "crate::serde::skip_serializing_if_default"
)]
pub(super) acknowledgements: AcknowledgementsConfig,
}

fn default_metric_namespace_value() -> String {
"namespace".to_string()
}

fn default_endpoint() -> String {
"https://monitoring.googleapis.com".to_string()
}

impl_generate_config_from_default!(StackdriverConfig);

#[async_trait::async_trait]
#[typetag::serde(name = "gcp_stackdriver_metrics")]
impl SinkConfig for StackdriverConfig {
async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
let auth = self.auth.build(Scope::MonitoringWrite).await?;

let healthcheck = healthcheck().boxed();
let started = chrono::Utc::now();
let tls_settings = TlsSettings::from_options(&self.tls)?;
let client = HttpClient::new(tls_settings, cx.proxy())?;

let batch_settings = self.batch.validate()?.into_batcher_settings()?;

let request_builder = StackdriverMetricsRequestBuilder {
encoder: StackdriverMetricsEncoder {
default_namespace: self.default_namespace.clone(),
started,
resource: self.resource.clone(),
},
};

let request_limits = self.request.unwrap_with(
&TowerRequestConfig::default()
.rate_limit_duration_secs(1)
.rate_limit_num(1000),
);

let uri: Uri = format!(
"{}/v3/projects/{}/timeSeries",
self.endpoint, self.project_id
)
.parse()?;

auth.spawn_regenerate_token();

let stackdriver_metrics_service_request_builder =
StackdriverMetricsServiceRequestBuilder { uri, auth };

let service = HttpService::new(client, stackdriver_metrics_service_request_builder);

let service = ServiceBuilder::new()
.settings(request_limits, http_response_retry_logic())
.service(service);

let sink = StackdriverMetricsSink::new(service, batch_settings, request_builder);

Ok((VectorSink::from_event_streamsink(sink), healthcheck))
}

fn input(&self) -> Input {
Input::metric()
}

fn acknowledgements(&self) -> &AcknowledgementsConfig {
&self.acknowledgements
}
}

#[derive(Clone, Copy, Debug, Default)]
pub struct StackdriverMetricsDefaultBatchSettings;

impl SinkBatchSettings for StackdriverMetricsDefaultBatchSettings {
const MAX_EVENTS: Option<usize> = Some(1);
const MAX_BYTES: Option<usize> = None;
const TIMEOUT_SECS: f64 = 1.0;
}

#[derive(Debug, Clone)]
pub(super) struct StackdriverMetricsServiceRequestBuilder {
pub(super) uri: Uri,
pub(super) auth: GcpAuthenticator,
}

impl HttpServiceRequestBuilder for StackdriverMetricsServiceRequestBuilder {
fn build(&self, body: Bytes) -> Request<Bytes> {
let mut request = Request::post(self.uri.clone())
.header("Content-Type", "application/json")
.body(body)
.unwrap();

self.auth.apply(&mut request);

request
}
}

async fn healthcheck() -> crate::Result<()> {
Ok(())
}
9 changes: 9 additions & 0 deletions src/sinks/gcp/stackdriver/metrics/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
//! GCP Cloud Monitoring (formerly Stackdriver Metrics) sink.
//! Sends metrics to [GPC Cloud Monitoring][cloud monitoring].
//!
//! [cloud monitoring]: https://cloud.google.com/monitoring/docs/monitoring-overview
mod config;
mod request_builder;
mod sink;
#[cfg(test)]
mod tests;
138 changes: 138 additions & 0 deletions src/sinks/gcp/stackdriver/metrics/request_builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
use std::io;

use bytes::Bytes;
use chrono::Utc;
use vector_core::event::{Metric, MetricValue};

use crate::sinks::{gcp, prelude::*, util::http::HttpRequest};

#[derive(Clone, Debug)]
pub(super) struct StackdriverMetricsRequestBuilder {
pub(super) encoder: StackdriverMetricsEncoder,
}

impl RequestBuilder<Vec<Metric>> for StackdriverMetricsRequestBuilder {
type Metadata = EventFinalizers;
type Events = Vec<Metric>;
type Encoder = StackdriverMetricsEncoder;
type Payload = Bytes;
type Request = HttpRequest;
type Error = io::Error;

fn compression(&self) -> Compression {
Compression::None
}

fn encoder(&self) -> &Self::Encoder {
&self.encoder
}

fn split_input(
&self,
mut events: Vec<Metric>,
) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
let finalizers = events.take_finalizers();
let builder = RequestMetadataBuilder::from_events(&events);
(finalizers, builder, events)
}

fn build_request(
&self,
metadata: Self::Metadata,
request_metadata: RequestMetadata,
payload: EncodeResult<Self::Payload>,
) -> Self::Request {
HttpRequest::new(payload.into_payload(), metadata, request_metadata)
}
}

#[derive(Clone, Debug)]
pub struct StackdriverMetricsEncoder {
pub(super) default_namespace: String,
pub(super) started: chrono::DateTime<Utc>,
pub(super) resource: gcp::GcpTypedResource,
}

impl encoding::Encoder<Vec<Metric>> for StackdriverMetricsEncoder {
/// Create the object defined [here][api_docs].
///
/// [api_docs]: https://cloud.google.com/monitoring/api/ref_v3/rest/v3/projects.timeSeries/create
fn encode_input(
&self,
input: Vec<Metric>,
writer: &mut dyn io::Write,
) -> io::Result<(usize, GroupedCountByteSize)> {
let mut byte_size = telemetry().create_request_count_byte_size();
let time_series = input
.into_iter()
.map(|metric| {
byte_size.add_event(&metric, metric.estimated_json_encoded_size_of());

let (series, data, _metadata) = metric.into_parts();
let namespace = series
.name
.namespace
.unwrap_or_else(|| self.default_namespace.clone());
let metric_type = format!(
"custom.googleapis.com/{}/metrics/{}",
namespace, series.name.name
);

let end_time = data.time.timestamp.unwrap_or_else(chrono::Utc::now);

let (point_value, interval, metric_kind) = match &data.value {
MetricValue::Counter { value } => {
let interval = gcp::GcpInterval {
start_time: Some(self.started),
end_time,
};

(*value, interval, gcp::GcpMetricKind::Cumulative)
}
MetricValue::Gauge { value } => {
let interval = gcp::GcpInterval {
start_time: None,
end_time,
};

(*value, interval, gcp::GcpMetricKind::Gauge)
}
_ => {
unreachable!("sink has filtered out all metrics that aren't counter or gauge by this point")
},
};
let metric_labels = series
.tags
.unwrap_or_default()
.into_iter_single()
.collect::<std::collections::HashMap<_, _>>();

gcp::GcpSerie {
metric: gcp::GcpMetric {
r#type: metric_type,
labels: metric_labels,
},
resource: gcp::GcpResource {
r#type: self.resource.r#type.clone(),
labels: self.resource.labels.clone(),
},
metric_kind,
value_type: gcp::GcpValueType::Int64,
points: vec![gcp::GcpPoint {
interval,
value: gcp::GcpPointValue {
int64_value: Some(point_value as i64),
},
}],
}
})
.collect::<Vec<_>>();

let series = gcp::GcpSeries {
time_series: &time_series,
};

let body = crate::serde::json::to_bytes(&series).unwrap().freeze();
writer.write_all(&body).map(|()| (body.len(), byte_size))
}
}
Loading
Loading