Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ref(actix): Update Store Actor #1397

Merged
merged 13 commits into from
Aug 10, 2022
Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
- Improve performance of Redis accesses by not running `PING` everytime a connection is reused. ([#1394](https://github.com/getsentry/relay/pull/1394))
- Distinguish between various discard reasons for profiles. ([#1395](https://github.com/getsentry/relay/pull/1395))
- Add missing fields to GPUContext ([#1391](https://github.com/getsentry/relay/pull/1391))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this accidentally moved to the wrong section?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i mean, the line below this :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it should be in the Internal section? (At least that is where the entry for the change of the Healthcheck actor is as well)

- Store actor now uses Tokio for message handling instead of Actix. ([#1397](https://github.com/getsentry/relay/pull/1397))

## 22.7.0

Expand Down
41 changes: 33 additions & 8 deletions relay-server/src/actors/envelopes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,14 @@ use crate::extractors::{PartialDsn, RequestMeta};
use crate::http::{HttpError, Request, RequestBuilder, Response};
use crate::service::ServerError;
use crate::statsd::{RelayCounters, RelayHistograms, RelaySets, RelayTimers};
use crate::utils::{self, EnvelopeContext, FutureExt, SendWithOutcome};
use crate::utils::{self, EnvelopeContext, FutureExt as _, SendWithOutcome};

#[cfg(feature = "processing")]
use crate::actors::store::{StoreEnvelope, StoreError, StoreForwarder};
use {
crate::actors::store::{StoreAddr, StoreEnvelope, StoreError, StoreForwarder},
futures::{FutureExt, TryFutureExt},
tokio::runtime::Runtime,
};

#[derive(Debug, Fail)]
pub enum QueueEnvelopeError {
Expand Down Expand Up @@ -153,18 +157,31 @@ pub struct EnvelopeManager {
captures: BTreeMap<EventId, CapturedEnvelope>,
processor: Addr<EnvelopeProcessor>,
#[cfg(feature = "processing")]
store_forwarder: Option<Addr<StoreForwarder>>,
store_forwarder: Option<StoreAddr<StoreEnvelope>>,
#[cfg(feature = "processing")]
_runtime: Runtime,
}

impl EnvelopeManager {
pub fn create(
config: Arc<Config>,
processor: Addr<EnvelopeProcessor>,
) -> Result<Self, ServerError> {
// Enter the tokio runtime so we can start spawning tasks from the outside.
#[cfg(feature = "processing")]
let runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(1)
.enable_all()
.build()
.unwrap();

#[cfg(feature = "processing")]
let _guard = runtime.enter();

#[cfg(feature = "processing")]
let store_forwarder = if config.processing_enabled() {
let actor = StoreForwarder::create(config.clone())?;
Some(Arbiter::start(move |_| actor))
Some(actor.start())
} else {
None
};
Expand All @@ -176,6 +193,8 @@ impl EnvelopeManager {
processor,
#[cfg(feature = "processing")]
store_forwarder,
#[cfg(feature = "processing")]
_runtime: runtime,
})
}

Expand All @@ -189,17 +208,23 @@ impl EnvelopeManager {
) -> ResponseFuture<(), SendEnvelopeError> {
#[cfg(feature = "processing")]
{
if let Some(ref store_forwarder) = self.store_forwarder {
if let Some(store_forwarder) = self.store_forwarder.clone() {
relay_log::trace!("sending envelope to kafka");
let future = store_forwarder
.send(StoreEnvelope {
let fut = async move {
let addr = store_forwarder.clone();
addr.send(StoreEnvelope {
envelope,
start_time,
scoping,
})
.await
tobias-wilfert marked this conversation as resolved.
Show resolved Hide resolved
};

let future = fut
.boxed_local()
.compat()
.map_err(|_| SendEnvelopeError::ScheduleFailed)
.and_then(|result| result.map_err(SendEnvelopeError::StoreFailed));

return Box::new(future);
}
}
Expand Down
Loading