Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(prometheus): add more compression algorithms to Prometheus Remote Write #17334

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 55 additions & 6 deletions src/sinks/prometheus/remote_write.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::io::Read;
use std::sync::Arc;
use std::task;

Expand Down Expand Up @@ -123,10 +124,40 @@ pub struct RemoteWriteConfig {
skip_serializing_if = "crate::serde::skip_serializing_if_default"
)]
pub acknowledgements: AcknowledgementsConfig,

#[configurable(derived)]
#[configurable(metadata(docs::advanced))]
#[serde(default)]
pub compression: Compression,
}

impl_generate_config_from_default!(RemoteWriteConfig);

/// Supported compression types for Prometheus Remote Write.
#[configurable_component]
#[derive(Clone, Copy, Debug, Derivative)]
#[derivative(Default)]
#[serde(rename_all = "lowercase")]
pub enum Compression {
/// Snappy.
#[derivative(Default)]
Snappy,

/// Gzip.
Gzip,

/// Zstandard.
Zstd,
}

const fn convert_compression_to_content_encoding(compression: Compression) -> &'static str {
match compression {
Compression::Snappy => "snappy",
Compression::Gzip => "gzip",
Compression::Zstd => "zstd",
}
}

#[async_trait::async_trait]
impl SinkConfig for RemoteWriteConfig {
async fn build(
Expand Down Expand Up @@ -181,6 +212,7 @@ impl SinkConfig for RemoteWriteConfig {
aws_region,
credentials_provider,
http_auth,
compression: self.compression,
});

let healthcheck = healthcheck(client.clone(), Arc::clone(&http_request_builder)).boxed();
Expand All @@ -190,6 +222,7 @@ impl SinkConfig for RemoteWriteConfig {
buckets,
quantiles,
http_request_builder,
compression: self.compression,
};

let sink = {
Expand Down Expand Up @@ -274,6 +307,7 @@ struct RemoteWriteService {
buckets: Vec<f64>,
quantiles: Vec<f64>,
http_request_builder: Arc<HttpRequestBuilder>,
compression: Compression,
}

impl RemoteWriteService {
Expand Down Expand Up @@ -309,7 +343,7 @@ impl Service<PartitionInnerBuffer<Vec<Metric>, PartitionKey>> for RemoteWriteSer
fn call(&mut self, buffer: PartitionInnerBuffer<Vec<Metric>, PartitionKey>) -> Self::Future {
let (events, key) = buffer.into_parts();
let body = self.encode_events(events);
let body = snap_block(body);
let body = compress_block(self.compression, body);

let client = self.client.clone();
let request_builder = Arc::clone(&self.http_request_builder);
Expand Down Expand Up @@ -341,6 +375,7 @@ pub struct HttpRequestBuilder {
pub aws_region: Option<Region>,
pub http_auth: Option<Auth>,
pub credentials_provider: Option<SharedCredentialsProvider>,
pub compression: Compression,
}

impl HttpRequestBuilder {
Expand All @@ -350,11 +385,13 @@ impl HttpRequestBuilder {
body: Vec<u8>,
tenant_id: Option<String>,
) -> Result<Request<hyper::Body>, crate::Error> {
let content_encoding = convert_compression_to_content_encoding(self.compression);

let mut builder = http::Request::builder()
.method(method)
.uri(self.endpoint.clone())
.header("X-Prometheus-Remote-Write-Version", "0.1.0")
.header("Content-Encoding", "snappy")
.header("Content-Encoding", content_encoding)
.header("Content-Type", "application/x-protobuf");

if let Some(tenant_id) = &tenant_id {
Expand All @@ -377,10 +414,22 @@ impl HttpRequestBuilder {
}
}

fn snap_block(data: Bytes) -> Vec<u8> {
snap::raw::Encoder::new()
.compress_vec(&data)
.expect("Out of memory")
fn compress_block(compression: Compression, data: Bytes) -> Vec<u8> {
match compression {
Compression::Snappy => snap::raw::Encoder::new()
.compress_vec(&data)
.expect("snap compression failed, please report"),
Compression::Gzip => {
let mut buf = Vec::new();
flate2::read::GzEncoder::new(data.as_ref(), flate2::Compression::default())
.read_to_end(&mut buf)
.expect("gzip compression failed, please report");
buf
}
Compression::Zstd => {
zstd::encode_all(data.as_ref(), 0).expect("zstd compression failed, please report")
}
}
}

async fn sign_request(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,18 @@ base: components: sinks: prometheus_remote_write: configuration: {
items: type: float: {}
}
}
compression: {
description: "Supported compression types for Prometheus Remote Write."
required: false
type: string: {
default: "snappy"
enum: {
gzip: "Gzip."
snappy: "Snappy."
zstd: "Zstandard."
}
}
}
default_namespace: {
description: """
The default namespace for any metrics sent.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,15 @@ components: sinks: prometheus_remote_write: {
values for each name, Vector will only send the last value specified.
"""
}
compression_schemes: {
title: "Compression schemes"
body: """
Officially according to the [Prometheus Remote-Write specification](\(urls.prometheus_remote_write_spec)),
the only supported compression scheme is [Snappy](\(urls.snappy)). However,
there are a number of other implementations that do support other schemes. Thus
Vector also supports using Gzip and Zstd.
"""
}
}

telemetry: metrics: {
Expand Down
1 change: 1 addition & 0 deletions website/cue/reference/urls.cue
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,7 @@ urls: {
prometheus_remote_integrations: "https://prometheus.io/docs/operating/integrations/#remote-endpoints-and-storage"
prometheus_remote_write: "https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write"
prometheus_remote_write_protocol: "https://docs.google.com/document/d/1LPhVRSFkGNSuU1fBd81ulhsCPR4hkSZyyBj1SZ8fWOM/edit#heading=h.n0d0vphea3fe"
prometheus_remote_write_spec: "https://prometheus.io/docs/concepts/remote_write_spec/#protocol"
protobuf: "https://developers.google.com/protocol-buffers"
pulsar: "https://pulsar.apache.org/"
pulsar_protocol: "https://pulsar.apache.org/docs/en/develop-binary-protocol/"
Expand Down