
Commit

fix(metrics): Temporarily restore previous configuration keys for bucket splitting
Dav1dde committed Nov 28, 2023

Verified: This commit was created on GitHub.com and signed with GitHub's verified signature. The key has expired.
1 parent caac49f commit 52443d4
Showing 5 changed files with 26 additions and 53 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,8 @@
 
 ## Unreleased
 
 **Features**:
 
+- Partition and split metric buckets just before sending. Log outcomes for metrics. ([#2682](https://github.com/getsentry/relay/pull/2682))
+
 ## 23.11.2
42 changes: 4 additions & 38 deletions relay-config/src/config.rs
@@ -934,14 +934,6 @@ fn default_max_rate_limit() -> Option<u32> {
     Some(300) // 5 minutes
 }
 
-fn default_metrics_max_batch_size() -> ByteSize {
-    ByteSize::mebibytes(5)
-}
-
-fn default_metrics_max_batch_size_processing() -> ByteSize {
-    ByteSize::kibibytes(100)
-}
-
 /// Controls Sentry-internal event processing.
 #[derive(Serialize, Deserialize, Debug)]
 pub struct Processing {
@@ -997,23 +989,6 @@ pub struct Processing {
     /// Maximum rate limit to report to clients.
     #[serde(default = "default_max_rate_limit")]
     pub max_rate_limit: Option<u32>,
-    /// The number of logical partitions that can receive flushed buckets.
-    ///
-    /// If set, buckets are partitioned by (bucket key % partitions), and routed
-    /// by setting the header `X-Sentry-Relay-Shard`.
-    #[serde(default)]
-    pub metrics_partitions: Option<u64>,
-    /// The approximate maximum number of bytes submitted in one metrics batch.
-    ///
-    /// This controls how big flushed batches of buckets get, depending on the number of buckets,
-    /// the cumulative length of their keys, and the number of raw values. Since final serialization
-    /// adds some additional overhead, this number is approximate and some safety margin should be
-    /// left to hard limits.
-    #[serde(default = "default_metrics_max_batch_size")]
-    pub metrics_max_batch_size: ByteSize,
-    /// The approximate maximum number of bytes submitted in one metrics batch on processing relays.
-    #[serde(default = "default_metrics_max_batch_size_processing")]
-    pub metrics_max_batch_size_processing: ByteSize,
 }
 
 impl Default for Processing {
@@ -1032,9 +1007,6 @@ impl Default for Processing {
             attachment_chunk_size: default_chunk_size(),
             projectconfig_cache_prefix: default_projectconfig_cache_prefix(),
             max_rate_limit: default_max_rate_limit(),
-            metrics_partitions: None,
-            metrics_max_batch_size: default_metrics_max_batch_size(),
-            metrics_max_batch_size_processing: default_metrics_max_batch_size_processing(),
         }
     }
 }
@@ -2089,20 +2061,14 @@ impl Config {
 
     /// Amount of metric partitions.
     pub fn metrics_partitions(&self) -> Option<u64> {
-        self.values.processing.metrics_partitions
+        // TODO(dav1dde): move config to a better place
+        self.values.aggregator.flush_partitions
     }
 
     /// Maximum metrics batch size in bytes.
     pub fn metrics_max_batch_size_bytes(&self) -> usize {
-        self.values.processing.metrics_max_batch_size.as_bytes()
-    }
-
-    /// Maximum metrics batch size in bytes for processing relays.
-    pub fn metrics_max_batch_size_bytes_processing(&self) -> usize {
-        self.values
-            .processing
-            .metrics_max_batch_size_processing
-            .as_bytes()
+        // TODO(dav1dde): move config to a better place
+        self.values.aggregator.max_flush_bytes
     }
 
     /// Default prefix to use when looking up project configs in Redis. This is only done when
16 changes: 16 additions & 0 deletions relay-metrics/src/aggregatorservice.rs
@@ -90,6 +90,20 @@ pub struct AggregatorServiceConfig {
     /// This prevents flushing all buckets from a bucket interval at the same
     /// time by computing an offset from the hash of the given key.
     pub shift_key: ShiftKey,
+
+    // TODO(dav1dde): move these config values to a better spot
+    /// The approximate maximum number of bytes submitted within one flush cycle.
+    ///
+    /// This controls how big flushed batches of buckets get, depending on the number of buckets,
+    /// the cumulative length of their keys, and the number of raw values. Since final serialization
+    /// adds some additional overhead, this number is approximate and some safety margin should be
+    /// left to hard limits.
+    pub max_flush_bytes: usize,
+    /// The number of logical partitions that can receive flushed buckets.
+    ///
+    /// If set, buckets are partitioned by (bucket key % flush_partitions), and routed
+    /// by setting the header `X-Sentry-Relay-Shard`.
+    pub flush_partitions: Option<u64>,
 }
 
 impl Default for AggregatorServiceConfig {
@@ -106,6 +120,8 @@ impl Default for AggregatorServiceConfig {
             max_tag_value_length: 200,
             max_project_key_bucket_bytes: None,
             shift_key: ShiftKey::default(),
+            max_flush_bytes: 5_000_000, // 5 MB
+            flush_partitions: None,
         }
     }
 }
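
The doc comment added above spells out the routing scheme: the bucket key, taken modulo `flush_partitions`, selects a logical shard that is sent upstream in the `X-Sentry-Relay-Shard` header. A minimal sketch of that scheme follows; `partition_for` and the plain header map are hypothetical stand-ins, not Relay's actual types or functions:

```rust
use std::collections::HashMap;

/// Computes the logical partition for a bucket, if partitioning is enabled.
/// Mirrors the documented scheme: (bucket key % flush_partitions).
fn partition_for(bucket_key_hash: u64, flush_partitions: Option<u64>) -> Option<u64> {
    // With `flush_partitions: None`, buckets remain unpartitioned.
    flush_partitions.map(|n| bucket_key_hash % n.max(1))
}

fn main() {
    let mut headers: HashMap<&str, String> = HashMap::new();
    // Hypothetical hash of a bucket key (project key, metric name, tags, ...).
    let bucket_key_hash = 0xDEAD_BEEF_u64;

    if let Some(partition) = partition_for(bucket_key_hash, Some(8)) {
        // Route the batch by setting the shard header, as the doc comment states.
        headers.insert("X-Sentry-Relay-Shard", partition.to_string());
    }
    println!("{headers:?}"); // {"X-Sentry-Relay-Shard": "7"}
}
```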
15 changes: 2 additions & 13 deletions relay-server/src/actors/processor.rs
@@ -2449,19 +2449,8 @@ impl EnvelopeProcessorService {
             extraction_mode,
         } = message;
 
-        let (partitions, max_batch_size_bytes) = if self.inner.config.processing_enabled() {
-            // Partitioning on processing relays does not make sense, they end up all
-            // in the same Kafka topic anyways and the partition key is ignored.
-            (
-                None,
-                self.inner.config.metrics_max_batch_size_bytes_processing(),
-            )
-        } else {
-            (
-                self.inner.config.metrics_partitions(),
-                self.inner.config.metrics_max_batch_size_bytes(),
-            )
-        };
+        let partitions = self.inner.config.metrics_partitions();
+        let max_batch_size_bytes = self.inner.config.metrics_max_batch_size_bytes();
 
         let upstream = self.inner.config.upstream_descriptor();
         let dsn = PartialDsn {
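
With this change the processor always uses `metrics_partitions()` and `metrics_max_batch_size_bytes()`, which now resolve to the aggregator's `flush_partitions` and `max_flush_bytes`. Since `max_flush_bytes` is documented as an approximate budget with safety margin left for serialization overhead, a greedy splitter is the natural reading: accumulate buckets until the next one would exceed the limit, then start a new batch. A sketch under that assumption, with `bucket.len()` as a hypothetical stand-in for the real per-bucket size estimate:

```rust
// Greedily groups buckets into batches of at most ~`max_flush_bytes` each.
// A single oversized bucket still forms its own batch, which is why the
// limit is only approximate.
fn split_into_batches(buckets: Vec<String>, max_flush_bytes: usize) -> Vec<Vec<String>> {
    let mut batches = Vec::new();
    let mut current = Vec::new();
    let mut current_size = 0usize;

    for bucket in buckets {
        let size = bucket.len(); // hypothetical size estimate per bucket
        // Start a new batch if adding this bucket would exceed the budget.
        if !current.is_empty() && current_size + size > max_flush_bytes {
            batches.push(std::mem::take(&mut current));
            current_size = 0;
        }
        current_size += size;
        current.push(bucket);
    }
    if !current.is_empty() {
        batches.push(current);
    }
    batches
}

fn main() {
    let buckets = vec!["a".repeat(40), "b".repeat(40), "c".repeat(40)];
    // With a 64-byte budget, each 40-byte bucket lands in its own batch.
    assert_eq!(split_into_batches(buckets, 64).len(), 3);
}
```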
4 changes: 2 additions & 2 deletions tests/integration/test_metrics.py
@@ -130,14 +130,14 @@ def test_metrics_partition_key(mini_sentry, relay, metrics_partitions, expected_
     relay_config = {
         "processing": {
             "max_session_secs_in_past": forever,
-            "metrics_partitions": metrics_partitions,
         },
         "aggregator": {
             "bucket_interval": 1,
             "initial_delay": 0,
             "debounce_delay": 0,
             "max_secs_in_past": forever,
             "max_secs_in_future": forever,
+            "flush_partitions": metrics_partitions,
         },
     }
     relay = relay(mini_sentry, options=relay_config)
@@ -172,14 +172,14 @@ def test_metrics_max_batch_size(mini_sentry, relay, max_batch_size, expected_eve
     relay_config = {
         "processing": {
             "max_session_secs_in_past": forever,
-            "metrics_max_batch_size": max_batch_size,
         },
         "aggregator": {
             "bucket_interval": 1,
             "initial_delay": 0,
             "debounce_delay": 0,
             "max_secs_in_past": forever,
             "max_secs_in_future": forever,
+            "max_flush_bytes": max_batch_size,
         },
     }
     relay = relay(mini_sentry, options=relay_config)

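The tests now place the restored keys under the `aggregator` section instead of `processing`. A self-contained sketch of how such a section could deserialize, assuming serde and serde_yaml; the struct and its serde attributes mirror only the two fields touched by this commit and are illustrative, not Relay's actual definitions:

```rust
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct AggregatorSection {
    // Illustrative serde defaults; the values match the Default impl in the diff.
    #[serde(default = "default_max_flush_bytes")]
    max_flush_bytes: usize,
    #[serde(default)]
    flush_partitions: Option<u64>,
}

fn default_max_flush_bytes() -> usize {
    5_000_000 // 5 MB
}

fn main() {
    #[derive(Debug, Deserialize)]
    struct Root {
        aggregator: AggregatorSection,
    }

    // Keys now live under `aggregator`, as exercised by the updated tests.
    let yaml = "
aggregator:
  max_flush_bytes: 1000
  flush_partitions: 8
";
    let root: Root = serde_yaml::from_str(yaml).expect("valid config");
    assert_eq!(root.aggregator.max_flush_bytes, 1000);
    assert_eq!(root.aggregator.flush_partitions, Some(8));
}
```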