Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(metrics): Temporarily restore previous configuration keys for bucket splitting #2780

Merged
merged 1 commit into from
Nov 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## Unreleased

**Features**:

- Partition and split metric buckets just before sending. Log outcomes for metrics. ([#2682](https://github.com/getsentry/relay/pull/2682))

## 23.11.2
Expand Down
42 changes: 4 additions & 38 deletions relay-config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -934,14 +934,6 @@ fn default_max_rate_limit() -> Option<u32> {
Some(300) // 5 minutes
}

fn default_metrics_max_batch_size() -> ByteSize {
ByteSize::mebibytes(5)
}

fn default_metrics_max_batch_size_processing() -> ByteSize {
ByteSize::kibibytes(100)
}

/// Controls Sentry-internal event processing.
#[derive(Serialize, Deserialize, Debug)]
pub struct Processing {
Expand Down Expand Up @@ -997,23 +989,6 @@ pub struct Processing {
/// Maximum rate limit to report to clients.
#[serde(default = "default_max_rate_limit")]
pub max_rate_limit: Option<u32>,
/// The number of logical partitions that can receive flushed buckets.
///
/// If set, buckets are partitioned by (bucket key % partitions), and routed
/// by setting the header `X-Sentry-Relay-Shard`.
#[serde(default)]
pub metrics_partitions: Option<u64>,
/// The approximate maximum number of bytes submitted in one metrics batch.
///
/// This controls how big flushed batches of buckets get, depending on the number of buckets,
/// the cumulative length of their keys, and the number of raw values. Since final serialization
adds some additional overhead, this number is approximate and some safety margin should be
/// left to hard limits.
#[serde(default = "default_metrics_max_batch_size")]
pub metrics_max_batch_size: ByteSize,
/// The approximate maximum number of bytes submitted in one metrics batch on processing relays.
#[serde(default = "default_metrics_max_batch_size_processing")]
pub metrics_max_batch_size_processing: ByteSize,
}

impl Default for Processing {
Expand All @@ -1032,9 +1007,6 @@ impl Default for Processing {
attachment_chunk_size: default_chunk_size(),
projectconfig_cache_prefix: default_projectconfig_cache_prefix(),
max_rate_limit: default_max_rate_limit(),
metrics_partitions: None,
metrics_max_batch_size: default_metrics_max_batch_size(),
metrics_max_batch_size_processing: default_metrics_max_batch_size_processing(),
}
}
}
Expand Down Expand Up @@ -2089,20 +2061,14 @@ impl Config {

/// Amount of metric partitions.
pub fn metrics_partitions(&self) -> Option<u64> {
self.values.processing.metrics_partitions
// TODO(dav1dde): move config to a better place
self.values.aggregator.flush_partitions
}

/// Maximum metrics batch size in bytes.
pub fn metrics_max_batch_size_bytes(&self) -> usize {
self.values.processing.metrics_max_batch_size.as_bytes()
}

/// Maximum metrics batch size in bytes for processing relays.
pub fn metrics_max_batch_size_bytes_processing(&self) -> usize {
self.values
.processing
.metrics_max_batch_size_processing
.as_bytes()
// TODO(dav1dde): move config to a better place
self.values.aggregator.max_flush_bytes
}

/// Default prefix to use when looking up project configs in Redis. This is only done when
Expand Down
16 changes: 16 additions & 0 deletions relay-metrics/src/aggregatorservice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,20 @@ pub struct AggregatorServiceConfig {
/// This prevents flushing all buckets from a bucket interval at the same
/// time by computing an offset from the hash of the given key.
pub shift_key: ShiftKey,

// TODO(dav1dde): move these config values to a better spot
/// The approximate maximum number of bytes submitted within one flush cycle.
///
/// This controls how big flushed batches of buckets get, depending on the number of buckets,
/// the cumulative length of their keys, and the number of raw values. Since final serialization
adds some additional overhead, this number is approximate and some safety margin should be
/// left to hard limits.
pub max_flush_bytes: usize,
/// The number of logical partitions that can receive flushed buckets.
///
/// If set, buckets are partitioned by (bucket key % flush_partitions), and routed
/// by setting the header `X-Sentry-Relay-Shard`.
pub flush_partitions: Option<u64>,
}

impl Default for AggregatorServiceConfig {
Expand All @@ -106,6 +120,8 @@ impl Default for AggregatorServiceConfig {
max_tag_value_length: 200,
max_project_key_bucket_bytes: None,
shift_key: ShiftKey::default(),
max_flush_bytes: 5_000_000, // 5 MB
flush_partitions: None,
}
}
}
Expand Down
15 changes: 2 additions & 13 deletions relay-server/src/actors/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2449,19 +2449,8 @@ impl EnvelopeProcessorService {
extraction_mode,
} = message;

let (partitions, max_batch_size_bytes) = if self.inner.config.processing_enabled() {
// Partitioning on processing relays does not make sense, they end up all
// in the same Kafka topic anyways and the partition key is ignored.
(
None,
self.inner.config.metrics_max_batch_size_bytes_processing(),
)
} else {
(
self.inner.config.metrics_partitions(),
self.inner.config.metrics_max_batch_size_bytes(),
)
};
let partitions = self.inner.config.metrics_partitions();
let max_batch_size_bytes = self.inner.config.metrics_max_batch_size_bytes();

let upstream = self.inner.config.upstream_descriptor();
let dsn = PartialDsn {
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/test_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,14 +130,14 @@ def test_metrics_partition_key(mini_sentry, relay, metrics_partitions, expected_
relay_config = {
"processing": {
"max_session_secs_in_past": forever,
"metrics_partitions": metrics_partitions,
},
"aggregator": {
"bucket_interval": 1,
"initial_delay": 0,
"debounce_delay": 0,
"max_secs_in_past": forever,
"max_secs_in_future": forever,
"flush_partitions": metrics_partitions,
},
}
relay = relay(mini_sentry, options=relay_config)
Expand Down Expand Up @@ -172,14 +172,14 @@ def test_metrics_max_batch_size(mini_sentry, relay, max_batch_size, expected_eve
relay_config = {
"processing": {
"max_session_secs_in_past": forever,
"metrics_max_batch_size": max_batch_size,
},
"aggregator": {
"bucket_interval": 1,
"initial_delay": 0,
"debounce_delay": 0,
"max_secs_in_past": forever,
"max_secs_in_future": forever,
"max_flush_bytes": max_batch_size,
},
}
relay = relay(mini_sentry, options=relay_config)
Expand Down
Loading