fix(metrics): Temporarily restore previous configuration keys for bucket splitting (#2780)

Temporarily restore the old config behaviour to unblock production deployments, until we figure out a better way to structure our configs.
Dav1dde authored Nov 28, 2023
1 parent a8a66ba commit afe3e1b
Showing 5 changed files with 26 additions and 53 deletions.
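Net effect: the partitioning and batch-size knobs move back from the `processing` section to the `aggregator` section of the config. A minimal sketch of a Relay YAML config using the restored key locations (values are illustrative; the key names and the 5 MB default come from the diffs below):

```yaml
# Illustrative values; key names match the restored aggregator config below.
aggregator:
  flush_partitions: 4       # buckets are routed by (bucket key % 4)
  max_flush_bytes: 5000000  # ~5 MB per flushed batch (the restored default)
```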
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,8 @@

## Unreleased

**Features**:

- Partition and split metric buckets just before sending. Log outcomes for metrics. ([#2682](https://github.com/getsentry/relay/pull/2682))

**Internal**:
42 changes: 4 additions & 38 deletions relay-config/src/config.rs
@@ -934,14 +934,6 @@ fn default_max_rate_limit() -> Option<u32> {
Some(300) // 5 minutes
}

fn default_metrics_max_batch_size() -> ByteSize {
ByteSize::mebibytes(5)
}

fn default_metrics_max_batch_size_processing() -> ByteSize {
ByteSize::kibibytes(100)
}

/// Controls Sentry-internal event processing.
#[derive(Serialize, Deserialize, Debug)]
pub struct Processing {
@@ -997,23 +989,6 @@ pub struct Processing {
/// Maximum rate limit to report to clients.
#[serde(default = "default_max_rate_limit")]
pub max_rate_limit: Option<u32>,
/// The number of logical partitions that can receive flushed buckets.
///
/// If set, buckets are partitioned by (bucket key % partitions), and routed
/// by setting the header `X-Sentry-Relay-Shard`.
#[serde(default)]
pub metrics_partitions: Option<u64>,
/// The approximate maximum number of bytes submitted in one metrics batch.
///
/// This controls how big flushed batches of buckets get, depending on the number of buckets,
/// the cumulative length of their keys, and the number of raw values. Since final serialization
/// adds some additional overhead, this number is approximate and some safety margin should be
/// left to hard limits.
#[serde(default = "default_metrics_max_batch_size")]
pub metrics_max_batch_size: ByteSize,
/// The approximate maximum number of bytes submitted in one metrics batch on processing relays.
#[serde(default = "default_metrics_max_batch_size_processing")]
pub metrics_max_batch_size_processing: ByteSize,
}

impl Default for Processing {
@@ -1032,9 +1007,6 @@
attachment_chunk_size: default_chunk_size(),
projectconfig_cache_prefix: default_projectconfig_cache_prefix(),
max_rate_limit: default_max_rate_limit(),
metrics_partitions: None,
metrics_max_batch_size: default_metrics_max_batch_size(),
metrics_max_batch_size_processing: default_metrics_max_batch_size_processing(),
}
}
}
@@ -2089,20 +2061,14 @@ impl Config {

/// Amount of metric partitions.
pub fn metrics_partitions(&self) -> Option<u64> {
self.values.processing.metrics_partitions
// TODO(dav1dde): move config to a better place
self.values.aggregator.flush_partitions
}

/// Maximum metrics batch size in bytes.
pub fn metrics_max_batch_size_bytes(&self) -> usize {
self.values.processing.metrics_max_batch_size.as_bytes()
}

/// Maximum metrics batch size in bytes for processing relays.
pub fn metrics_max_batch_size_bytes_processing(&self) -> usize {
self.values
.processing
.metrics_max_batch_size_processing
.as_bytes()
// TODO(dav1dde): move config to a better place
self.values.aggregator.max_flush_bytes
}

/// Default prefix to use when looking up project configs in Redis. This is only done when
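The doc comments above describe partition routing as `(bucket key % partitions)`, with the result sent in the `X-Sentry-Relay-Shard` header. A hedged sketch of that computation, assuming the bucket key is hashed first; the helper below is illustrative, not Relay's actual implementation:

```rust
// Illustrative only: map a bucket key to a logical partition, as described
// by the `flush_partitions` doc comment.
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Returns the shard a bucket key maps to, if partitioning is enabled.
fn partition_for(bucket_key: &str, flush_partitions: Option<u64>) -> Option<u64> {
    let partitions = flush_partitions?.max(1);
    let mut hasher = DefaultHasher::new();
    bucket_key.hash(&mut hasher);
    Some(hasher.finish() % partitions)
}

fn main() {
    // The resulting shard would be sent as the `X-Sentry-Relay-Shard` header.
    let shard = partition_for("c:transactions/count@none", Some(4));
    println!("X-Sentry-Relay-Shard: {shard:?}");
}
```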
16 changes: 16 additions & 0 deletions relay-metrics/src/aggregatorservice.rs
@@ -90,6 +90,20 @@ pub struct AggregatorServiceConfig {
/// This prevents flushing all buckets from a bucket interval at the same
/// time by computing an offset from the hash of the given key.
pub shift_key: ShiftKey,

// TODO(dav1dde): move these config values to a better spot
/// The approximate maximum number of bytes submitted within one flush cycle.
///
/// This controls how big flushed batches of buckets get, depending on the number of buckets,
/// the cumulative length of their keys, and the number of raw values. Since final serialization
/// adds some additional overhead, this number is approximate and some safety margin should be
/// left to hard limits.
pub max_flush_bytes: usize,
/// The number of logical partitions that can receive flushed buckets.
///
/// If set, buckets are partitioned by (bucket key % flush_partitions), and routed
/// by setting the header `X-Sentry-Relay-Shard`.
pub flush_partitions: Option<u64>,
}

impl Default for AggregatorServiceConfig {
@@ -106,6 +120,8 @@ impl Default for AggregatorServiceConfig {
max_tag_value_length: 200,
max_project_key_bucket_bytes: None,
shift_key: ShiftKey::default(),
max_flush_bytes: 5_000_000, // 5 MB
flush_partitions: None,
}
}
}
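For intuition, `max_flush_bytes` caps the approximate serialized size of each flushed batch. A minimal sketch of a size-capped batching loop, assuming a per-item size estimate is available; per the doc comment, the real limit is approximate, so an implementation leaves a safety margin for serialization overhead:

```rust
// Illustrative sketch: split items into batches whose estimated size stays
// under `max_flush_bytes`; oversized single items still get their own batch.
fn split_into_batches<T>(
    items: Vec<T>,
    max_flush_bytes: usize,
    size_of: impl Fn(&T) -> usize,
) -> Vec<Vec<T>> {
    let mut batches = Vec::new();
    let mut current: Vec<T> = Vec::new();
    let mut current_bytes = 0;
    for item in items {
        let size = size_of(&item);
        // Close the current batch once adding this item would exceed the budget.
        if !current.is_empty() && current_bytes + size > max_flush_bytes {
            batches.push(std::mem::take(&mut current));
            current_bytes = 0;
        }
        current_bytes += size;
        current.push(item);
    }
    if !current.is_empty() {
        batches.push(current);
    }
    batches
}
```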
15 changes: 2 additions & 13 deletions relay-server/src/actors/processor.rs
@@ -2449,19 +2449,8 @@ impl EnvelopeProcessorService {
extraction_mode,
} = message;

let (partitions, max_batch_size_bytes) = if self.inner.config.processing_enabled() {
// Partitioning on processing relays does not make sense, they end up all
// in the same Kafka topic anyways and the partition key is ignored.
(
None,
self.inner.config.metrics_max_batch_size_bytes_processing(),
)
} else {
(
self.inner.config.metrics_partitions(),
self.inner.config.metrics_max_batch_size_bytes(),
)
};
let partitions = self.inner.config.metrics_partitions();
let max_batch_size_bytes = self.inner.config.metrics_max_batch_size_bytes();

let upstream = self.inner.config.upstream_descriptor();
let dsn = PartialDsn {
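One behavioral nuance of this simplification: processing Relays previously forced partitioning off and used the smaller `metrics_max_batch_size_processing` default (100 KiB), whereas they now read the shared aggregator values. A hypothetical config for a processing Relay that wants to keep the old, smaller limit:

```yaml
# Hypothetical: pins the 100 KiB limit that the removed
# `metrics_max_batch_size_processing` default provided.
aggregator:
  max_flush_bytes: 102400  # 100 KiB
```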
4 changes: 2 additions & 2 deletions tests/integration/test_metrics.py
@@ -130,14 +130,14 @@ def test_metrics_partition_key(mini_sentry, relay, metrics_partitions, expected_
relay_config = {
"processing": {
"max_session_secs_in_past": forever,
"metrics_partitions": metrics_partitions,
},
"aggregator": {
"bucket_interval": 1,
"initial_delay": 0,
"debounce_delay": 0,
"max_secs_in_past": forever,
"max_secs_in_future": forever,
"flush_partitions": metrics_partitions,
},
}
relay = relay(mini_sentry, options=relay_config)
@@ -172,14 +172,14 @@ def test_metrics_max_batch_size(mini_sentry, relay, max_batch_size, expected_eve
relay_config = {
"processing": {
"max_session_secs_in_past": forever,
"metrics_max_batch_size": max_batch_size,
},
"aggregator": {
"bucket_interval": 1,
"initial_delay": 0,
"debounce_delay": 0,
"max_secs_in_past": forever,
"max_secs_in_future": forever,
"max_flush_bytes": max_batch_size,
},
}
relay = relay(mini_sentry, options=relay_config)
