don't go through envelope manager, default 100kib batch size processing
Dav1dde committed Nov 23, 2023
1 parent 78053ca commit cdbe965
Showing 5 changed files with 33 additions and 80 deletions.
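Overview: the commit removes the metrics hop through the EnvelopeManager. Project::flush_buckets now sends EncodeMetrics straight to the EnvelopeProcessor, which looks up partitioning and batch size from its own config. Below is a minimal, self-contained sketch of the before/after dispatch; the types are simplified stand-ins, not Relay's actual service code.

// Before: Project -> EnvelopeManager (SendMetrics) -> EnvelopeProcessor (EncodeMetrics)
// After:  Project -> EnvelopeProcessor (EncodeMetrics)

struct Bucket; // stand-in for relay_metrics::Bucket

// Batching parameters are no longer carried in the message; compare the
// EncodeMetrics hunk in processor.rs below.
struct EncodeMetrics {
    buckets: Vec<Bucket>,
}

struct EnvelopeProcessor;

impl EnvelopeProcessor {
    fn send(&self, message: EncodeMetrics) {
        // The processor derives partitions and max batch size from its own
        // config when handling the message.
        println!("encoding {} buckets", message.buckets.len());
    }
}

fn main() {
    // flush_buckets now talks to the processor directly:
    EnvelopeProcessor.send(EncodeMetrics { buckets: vec![Bucket, Bucket] });
}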
19 changes: 9 additions & 10 deletions relay-config/src/config.rs
@@ -928,6 +928,10 @@ fn default_metrics_max_batch_size() -> ByteSize {
     ByteSize::mebibytes(5)
 }
 
+fn default_metrics_max_batch_size_processing() -> ByteSize {
+    ByteSize::kibibytes(100)
+}
+
 /// Controls Sentry-internal event processing.
 #[derive(Serialize, Deserialize, Debug)]
 pub struct Processing {
@@ -997,12 +1001,9 @@ pub struct Processing {
     /// left to hard limits.
     #[serde(default = "default_metrics_max_batch_size")]
     pub metrics_max_batch_size: ByteSize,
-    /// The approximate maximum number of bytes submitted in one metrics batch for processing
-    /// relays.
-    ///
-    /// Overrides [`Self::metrics_max_batch_size`] when specified on processing relays.
-    #[serde(default)]
-    pub metrics_max_batch_size_processing: Option<ByteSize>,
+    /// The approximate maximum number of bytes submitted in one metrics batch on processing relays.
+    #[serde(default = "default_metrics_max_batch_size_processing")]
+    pub metrics_max_batch_size_processing: ByteSize,
 }
 
 impl Default for Processing {
@@ -1023,7 +1024,7 @@ impl Default for Processing {
             max_rate_limit: default_max_rate_limit(),
             metrics_partitions: None,
             metrics_max_batch_size: default_metrics_max_batch_size(),
-            metrics_max_batch_size_processing: None,
+            metrics_max_batch_size_processing: default_metrics_max_batch_size_processing(),
         }
     }
 }
@@ -2081,9 +2082,7 @@ impl Config {
         self.values
             .processing
             .metrics_max_batch_size_processing
-            .as_ref()
-            .map(|s| s.as_bytes())
-            .unwrap_or_else(|| self.metrics_max_batch_size_bytes())
+            .as_bytes()
     }
 
     /// Default prefix to use when looking up project configs in Redis. This is only done when
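For config consumers, the practical effect is that the processing batch size no longer falls back to the general 5 MiB value when unset; it always resolves, defaulting to 100 KiB. A sketch with hypothetical stand-in functions (not Relay's API) to illustrate the change in the getter above:

fn batch_size_before(processing: Option<usize>, general: usize) -> usize {
    // Old: Option<ByteSize> with a fallback to metrics_max_batch_size.
    processing.unwrap_or(general)
}

fn batch_size_after(processing: usize) -> usize {
    // New: always set, via the serde default of 100 KiB.
    processing
}

fn main() {
    let general = 5 * 1024 * 1024; // default_metrics_max_batch_size: 5 MiB
    assert_eq!(batch_size_before(None, general), general); // was: 5 MiB fallback
    assert_eq!(batch_size_after(100 * 1024), 102_400); // now: 100 KiB default
}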
60 changes: 1 addition & 59 deletions relay-server/src/actors/envelopes.rs
@@ -8,7 +8,6 @@ use chrono::Utc;
 use relay_base_schema::project::ProjectKey;
 use relay_config::{Config, HttpEncoding};
 use relay_event_schema::protocol::ClientReport;
-use relay_metrics::Bucket;
 use relay_quotas::Scoping;
 use relay_statsd::metric;
 use relay_system::{Addr, FromMessage, NoResponse};
@@ -27,9 +26,7 @@ use crate::envelope::{self, ContentType, Envelope, EnvelopeError, Item, ItemType
 use crate::extractors::{PartialDsn, RequestMeta};
 use crate::http::{HttpError, Request, RequestBuilder, Response};
 use crate::statsd::RelayHistograms;
-use crate::utils::{ExtractionMode, ManagedEnvelope};
-
-use super::processor::EncodeMetrics;
+use crate::utils::ManagedEnvelope;
 
 /// Error created while handling [`SendEnvelope`].
 #[derive(Debug, thiserror::Error)]
@@ -139,26 +136,11 @@ pub struct SendClientReports {
     pub scoping: Scoping,
 }
 
-/// Sends a batch of pre-aggregated metrics to the upstream or Kafka.
-///
-/// Responds with `Err` if there was an error sending some or all of the buckets, containing the
-/// failed buckets.
-#[derive(Debug)]
-pub struct SendMetrics {
-    /// The pre-aggregated metric buckets.
-    pub buckets: Vec<Bucket>,
-    /// Scoping information for the metrics.
-    pub scoping: Scoping,
-    /// Transaction extraction mode.
-    pub extraction_mode: ExtractionMode,
-}
-
 /// Dispatch service for generating and submitting Envelopes.
 #[derive(Debug)]
 pub enum EnvelopeManager {
     SubmitEnvelope(Box<SubmitEnvelope>),
     SendClientReports(SendClientReports),
-    SendMetrics(SendMetrics),
 }
 
 impl relay_system::Interface for EnvelopeManager {}
@@ -179,14 +161,6 @@ impl FromMessage<SendClientReports> for EnvelopeManager {
     }
 }
 
-impl FromMessage<SendMetrics> for EnvelopeManager {
-    type Response = NoResponse;
-
-    fn from_message(message: SendMetrics, _: ()) -> Self {
-        Self::SendMetrics(message)
-    }
-}
-
 /// Service implementing the [`EnvelopeManager`] interface.
 ///
 /// This service will produce envelopes to one the following backends:
@@ -330,35 +304,6 @@ impl EnvelopeManagerService {
         }
     }
 
-    async fn handle_send_metrics(&self, message: SendMetrics) {
-        let SendMetrics {
-            buckets,
-            scoping,
-            extraction_mode,
-        } = message;
-
-        #[allow(unused_mut)]
-        let mut partitions = self.config.metrics_partitions();
-        #[allow(unused_mut)]
-        let mut max_batch_size_bytes = self.config.metrics_max_batch_size_bytes();
-
-        #[cfg(feature = "processing")]
-        if self.store_forwarder.is_some() {
-            // Partitioning on processing relays does not make sense, they end up all
-            // in the same Kafka topic anyways and the partition key is ignored.
-            partitions = None;
-            max_batch_size_bytes = self.config.metrics_max_batch_size_bytes_processing();
-        }
-
-        self.enveloper_processor.send(EncodeMetrics {
-            buckets,
-            scoping,
-            extraction_mode,
-            max_batch_size_bytes,
-            partitions,
-        });
-    }
-
     async fn handle_send_client_reports(&self, message: SendClientReports) {
         let SendClientReports {
             client_reports,
@@ -398,9 +343,6 @@ impl EnvelopeManagerService {
             EnvelopeManager::SendClientReports(message) => {
                 self.handle_send_client_reports(message).await;
             }
-            EnvelopeManager::SendMetrics(message) => {
-                self.handle_send_metrics(message).await;
-            }
         }
     }
 }
23 changes: 17 additions & 6 deletions relay-server/src/actors/processor.rs
@@ -498,10 +498,6 @@ pub struct EncodeMetrics {
     pub scoping: Scoping,
     /// Transaction metrics extraction mode.
    pub extraction_mode: ExtractionMode,
-    /// Approximate size in bytes to batch buckets.
-    pub max_batch_size_bytes: usize,
-    /// Amount of logical partitions for the buckets.
-    pub partitions: Option<u64>,
 }
 
 /// Applies rate limits to metrics buckets and forwards them to the envelope manager.
@@ -3108,11 +3104,26 @@ impl EnvelopeProcessorService {
         let EncodeMetrics {
             buckets,
             scoping,
-            max_batch_size_bytes,
             extraction_mode,
-            partitions,
         } = message;
 
+        #[cfg(not(feature = "processing"))]
+        let (partitions, max_batch_size_bytes) = {
+            (
+                self.inner.config.metrics_partitions(),
+                self.inner.config.metrics_max_batch_size_bytes(),
+            )
+        };
+        #[cfg(feature = "processing")]
+        let (partitions, max_batch_size_bytes) = {
+            // Partitioning on processing relays does not make sense, they end up all
+            // in the same Kafka topic anyways and the partition key is ignored.
+            (
+                None,
+                self.inner.config.metrics_max_batch_size_bytes_processing(),
+            )
+        };
+
         let upstream = self.inner.config.upstream_descriptor();
         let dsn = PartialDsn {
             scheme: upstream.scheme(),
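Only one of the two cfg blocks above exists in a given build, so the binding is resolved at compile time. A standalone sketch of the pattern, assuming a crate that declares a "processing" feature; the concrete numbers are placeholders, Relay reads both values from its Config:

#[cfg(not(feature = "processing"))]
fn batching_params() -> (Option<u64>, usize) {
    // Non-processing relays keep logical partitioning and larger batches.
    (Some(16), 5 * 1024 * 1024)
}

#[cfg(feature = "processing")]
fn batching_params() -> (Option<u64>, usize) {
    // Partition keys are ignored on the single Kafka topic, so partitioning
    // is disabled and the smaller 100 KiB batch size applies.
    (None, 100 * 1024)
}

fn main() {
    let (partitions, max_batch_size_bytes) = batching_params();
    println!("partitions={partitions:?} max_batch_size_bytes={max_batch_size_bytes}");
}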
7 changes: 4 additions & 3 deletions relay-server/src/actors/project.rs
@@ -19,7 +19,6 @@ use smallvec::SmallVec;
 use tokio::time::Instant;
 use url::Url;
 
-use crate::actors::envelopes::{EnvelopeManager, SendMetrics};
 use crate::actors::outcome::{DiscardReason, Outcome, TrackOutcome};
 use crate::actors::processor::EnvelopeProcessor;
 #[cfg(feature = "processing")]
@@ -32,6 +31,8 @@ use crate::utils::{
     EnvelopeLimiter, ExtractionMode, ManagedEnvelope, MetricsLimiter, RetryBackoff,
 };
 
+use super::processor::EncodeMetrics;
+
 /// The expiry status of a project state. Return value of [`ProjectState::check_expiry`].
 #[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
 enum Expiry {
@@ -1020,7 +1021,7 @@ impl Project {
     pub fn flush_buckets(
         &mut self,
         project_cache: Addr<ProjectCache>,
-        envelope_manager: Addr<EnvelopeManager>,
+        envelope_processor: Addr<EnvelopeProcessor>,
         buckets: Vec<Bucket>,
     ) {
         let Some(project_state) = self.get_cached_state(project_cache, false) else {
@@ -1043,7 +1044,7 @@ impl Project {
         let extraction_mode = ExtractionMode::from_usage(usage);
 
         if !buckets.is_empty() {
-            envelope_manager.send(SendMetrics {
+            envelope_processor.send(EncodeMetrics {
                 buckets,
                 scoping,
                 extraction_mode,
4 changes: 2 additions & 2 deletions relay-server/src/actors/project_cache.rs
@@ -858,11 +858,11 @@ impl ProjectCacheBroker {
     }
 
     fn handle_flush_buckets(&mut self, message: FlushBuckets) {
-        let envelope_manager = self.services.envelope_manager.clone();
+        let envelope_processor = self.services.envelope_processor.clone();
         let project_cache = self.services.project_cache.clone();
 
         self.get_or_create_project(message.project_key)
-            .flush_buckets(project_cache, envelope_manager, message.buckets);
+            .flush_buckets(project_cache, envelope_processor, message.buckets);
     }
 
     fn handle_buffer_index(&mut self, message: UpdateBufferIndex) {
