diff --git a/relay-server/src/actors/project_cache.rs b/relay-server/src/actors/project_cache.rs index 63be96a219..4232833517 100644 --- a/relay-server/src/actors/project_cache.rs +++ b/relay-server/src/actors/project_cache.rs @@ -21,6 +21,7 @@ use crate::actors::project_local::{LocalProjectSource, LocalProjectSourceService #[cfg(feature = "processing")] use crate::actors::project_redis::RedisProjectSource; use crate::actors::project_upstream::{UpstreamProjectSource, UpstreamProjectSourceService}; +use crate::actors::upstream::UpstreamRelay; use crate::envelope::Envelope; use crate::service::REGISTRY; use crate::statsd::{RelayCounters, RelayGauges, RelayHistograms, RelayTimers}; @@ -294,9 +295,14 @@ struct ProjectSource { impl ProjectSource { /// Starts all project source services in the current runtime. - pub fn start(config: Arc, _redis: Option) -> Self { + pub fn start( + config: Arc, + upstream_relay: Addr, + _redis: Option, + ) -> Self { let local_source = LocalProjectSourceService::new(config.clone()).start(); - let upstream_source = UpstreamProjectSourceService::new(config.clone()).start(); + let upstream_source = + UpstreamProjectSourceService::new(config.clone(), upstream_relay).start(); #[cfg(feature = "processing")] let redis_source = _redis.map(|pool| RedisProjectSource::new(config.clone(), pool)); @@ -380,6 +386,7 @@ struct UpdateProjectState { #[derive(Debug)] struct ProjectCacheBroker { config: Arc, + envelope_processor: Addr, // Need hashbrown because drain_filter is not stable in std yet. projects: hashbrown::HashMap, garbage_disposal: GarbageDisposal, @@ -616,7 +623,7 @@ impl ProjectCacheBroker { } } - EnvelopeProcessor::from_registry().send(process); + self.envelope_processor.send(process); } } @@ -706,13 +713,25 @@ impl ProjectCacheBroker { #[derive(Debug)] pub struct ProjectCacheService { config: Arc, + envelope_processor: Addr, + upstream_relay: Addr, redis: Option, } impl ProjectCacheService { /// Creates a new `ProjectCacheService`. - pub fn new(config: Arc, redis: Option) -> Self { - Self { config, redis } + pub fn new( + config: Arc, + envelope_processor: Addr, + upstream_relay: Addr, + redis: Option, + ) -> Self { + Self { + config, + envelope_processor, + upstream_relay, + redis, + } } } @@ -720,7 +739,12 @@ impl Service for ProjectCacheService { type Interface = ProjectCache; fn spawn_handler(self, mut rx: relay_system::Receiver) { - let Self { config, redis } = self; + let Self { + config, + redis, + envelope_processor, + upstream_relay, + } = self; tokio::spawn(async move { let mut ticker = tokio::time::interval(config.cache_eviction_interval()); @@ -736,9 +760,10 @@ impl Service for ProjectCacheService { // fetches via the project source. let mut broker = ProjectCacheBroker { config: config.clone(), + envelope_processor, projects: hashbrown::HashMap::new(), garbage_disposal: GarbageDisposal::new(), - source: ProjectSource::start(config, redis), + source: ProjectSource::start(config, upstream_relay, redis), state_tx, buffer_tx, index: Default::default(), diff --git a/relay-server/src/actors/project_upstream.rs b/relay-server/src/actors/project_upstream.rs index 2e5d5d9143..d3944d96fe 100644 --- a/relay-server/src/actors/project_upstream.rs +++ b/relay-server/src/actors/project_upstream.rs @@ -12,7 +12,7 @@ use relay_dynamic_config::ErrorBoundary; use relay_log::LogError; use relay_statsd::metric; use relay_system::{ - BroadcastChannel, BroadcastResponse, BroadcastSender, FromMessage, Interface, Service, + Addr, BroadcastChannel, BroadcastResponse, BroadcastSender, FromMessage, Interface, Service, }; use serde::{Deserialize, Serialize}; use tokio::sync::mpsc; @@ -157,6 +157,7 @@ struct UpstreamResponse { pub struct UpstreamProjectSourceService { backoff: RetryBackoff, config: Arc, + upstream_relay: Addr, state_channels: ProjectStateChannels, inner_tx: mpsc::UnboundedSender>>, inner_rx: mpsc::UnboundedReceiver>>, @@ -165,13 +166,14 @@ pub struct UpstreamProjectSourceService { impl UpstreamProjectSourceService { /// Creates a new [`UpstreamProjectSourceService`] instance. - pub fn new(config: Arc) -> Self { + pub fn new(config: Arc, upstream_relay: Addr) -> Self { let (inner_tx, inner_rx) = mpsc::unbounded_channel(); Self { backoff: RetryBackoff::new(config.http_max_retry_interval()), state_channels: HashMap::new(), fetch_handle: SleepHandle::idle(), + upstream_relay, config, inner_tx, inner_rx, @@ -247,6 +249,7 @@ impl UpstreamProjectSourceService { /// channels are pushed in the meanwhile, this will reschedule automatically. async fn fetch_states( config: Arc, + upstream_relay: Addr, channels: ChannelsBatch, ) -> Vec> { let request_start = Instant::now(); @@ -275,8 +278,9 @@ impl UpstreamProjectSourceService { // count number of http requests for project states metric!(counter(RelayCounters::ProjectStateRequest) += 1); + let upstream_relay = upstream_relay.clone(); requests.push(async move { - match UpstreamRelay::from_registry().send(SendQuery(query)).await { + match upstream_relay.send(SendQuery(query)).await { Ok(response) => Some(UpstreamResponse { channels_batch, response, @@ -407,9 +411,10 @@ impl UpstreamProjectSourceService { let config = self.config.clone(); let inner_tx = self.inner_tx.clone(); let channels = self.prepare_batches(); + let upstream_relay = self.upstream_relay.clone(); tokio::spawn(async move { - let responses = Self::fetch_states(config, channels).await; + let responses = Self::fetch_states(config, upstream_relay, channels).await; // Send back all resolved responses and also unused channels. // These responses will be handled by `handle_responses` function. if let Err(err) = inner_tx.send(responses) { diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index 9a2bbdf91c..cb1490eba3 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -140,8 +140,13 @@ impl ServiceState { let envelope_manager = envelope_manager.start(); let test_store = TestStoreService::new(config.clone()).start(); - let project_cache = - ProjectCacheService::new(config.clone(), redis_pool).start_in(&project_runtime); + let project_cache = ProjectCacheService::new( + config.clone(), + processor.clone(), + upstream_relay.clone(), + redis_pool, + ) + .start_in(&project_runtime); let health_check = HealthCheckService::new(config.clone()).start(); let relay_cache = RelayCacheService::new(config.clone()).start();