ref(actix): Update the Outcome services #1441
Conversation
@@ -2276,6 +2276,8 @@ mod tests {
}

#[test]
#[ignore = "The current Register panics if the Addr of an Actor (that is not yet started) is
Leaving a comment here to make this visible. These tests must be fixed in a follow-up when Addrs allow mocking services.
relay-server/src/service.rs
Outdated
let outcome_producer = OutcomeProducer::create(config.clone())?;
let outcome_producer = Arbiter::start(|_| outcome_producer);
registry.set(outcome_producer.clone());
// TODO(tobias): Check if this is good enough or if we want this to have its own tokio runtime?
In the past, the Kafka producer consumed a fair amount of CPU and introduced delays for other services sharing the same runtime. We can benchmark this again at a later point. For now, let's create a dedicated runtime for the outcome producer. Feel free to add the OutcomeAggregator into that same runtime, though.
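For illustration, a dedicated runtime here would mean spawning the producer's loop on its own thread with its own (likely single-threaded) tokio runtime, roughly like the sketch below. run_producer_loop is a placeholder, not the actual Relay code:

```rust
use std::thread;

// Placeholder for the producer's message loop; not the actual Relay API.
async fn run_producer_loop() {
    // consume messages and produce outcomes to Kafka here
}

fn spawn_outcome_runtime() -> std::io::Result<thread::JoinHandle<()>> {
    thread::Builder::new()
        .name("outcome-producer".into())
        .spawn(|| {
            // A current-thread runtime keeps the Kafka producer's CPU usage
            // off the worker threads of the main runtime.
            let runtime = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .expect("failed to build outcome runtime");
            runtime.block_on(run_producer_loop());
        })
}
```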
This actor doesn't need to handle the shutdown message? I'm a bit surprised by this, as I would expect it to try to flush everything on shutdown. But I guess we only have a timeout somewhere and hope things happen in time?
(Pressing submit now on an incomplete review as the refresh button has appeared.)
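If flushing were desired, one way to wire it into a select-based loop would be to listen for a shutdown signal next to the message channel and flush before breaking out. This is only a sketch with hypothetical names (TrackOutcome, flush_pending, shutdown_rx), not the current implementation:

```rust
use tokio::sync::{mpsc, watch};

// Hypothetical message type for the sketch.
struct TrackOutcome;

async fn flush_pending() {
    // send any still-buffered outcomes upstream before exiting
}

async fn message_loop(
    mut messages: mpsc::Receiver<TrackOutcome>,
    mut shutdown_rx: watch::Receiver<bool>,
) {
    loop {
        tokio::select! {
            Some(_outcome) = messages.recv() => {
                // buffer or forward the outcome as usual
            }
            _ = shutdown_rx.changed() => {
                // flush whatever is still buffered instead of relying on a timeout
                flush_pending().await;
                break;
            }
        }
    }
}
```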
match compat::send(UpstreamRelay::from_registry(), SendQuery(request)).await {
    Ok(_) => relay_log::trace!("outcome batch sent."),
    Err(error) => {
        relay_log::error!("outcome batch sending failed with: {}", LogError(&error))
Not this PR, but I'm rather surprised to find we just drop the outcomes onto the floor here.
Just noticed two more points to solve, therefore retracting my approval:
- We're still implementing actix::Message on some messages.
- Through your refactor it became apparent that we're swallowing errors when producing outcomes to Kafka. Thank you for uncovering that! We should log this error in the OutcomeProducer instead.
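As a rough illustration of the suggested fix, logging at the production site could look like the sketch below. The producer and error type are stand-ins; only relay_log::error! is taken from the code above:

```rust
// Stand-in error type for the sketch; not the actual Relay/Kafka error.
struct ProduceError(String);

fn produce_to_kafka(_payload: &[u8]) -> Result<(), ProduceError> {
    // hand the payload to the Kafka producer here
    Ok(())
}

fn send_outcome(payload: &[u8]) {
    if let Err(ProduceError(message)) = produce_to_kafka(payload) {
        // Log the failure in the OutcomeProducer instead of silently dropping it.
        relay_log::error!("failed to produce outcome to Kafka: {}", message);
    }
}
```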
General
This PR updates the services contained in outcome.rs, namely HttpOutcomeProducer, ClientReportOutcomeProducer, and OutcomeProducer, as well as the OutcomeAggregator service in outcome_aggregator.rs, since it is heavily intertwined with the others.

Design choices
Services that previously relied on the run_later mechanic now use tokio::time::sleep in combination with future::pending. This allows for the use of tokio::select! inside the main message handling loop of the services. Other options of doing this with internal channels were considered but deemed undesirable since they are less performant.
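To make that concrete, the pattern looks roughly like the self-contained sketch below; the message type, buffer handling, and flush interval are hypothetical, not the actual service code:

```rust
use std::time::Duration;

use tokio::sync::mpsc;
use tokio::time::Instant;

// Hypothetical message type standing in for the real service messages.
struct TrackOutcome;

async fn run(mut rx: mpsc::Receiver<TrackOutcome>) {
    // `None` means no flush is scheduled, `Some(deadline)` schedules one.
    let mut flush_at: Option<Instant> = None;

    loop {
        // Rebuild the timer each iteration from a copy of the deadline: a real
        // sleep when a flush is scheduled, a never-resolving future otherwise.
        // This replaces what run_later used to do.
        let deadline = flush_at;
        let flush = async move {
            match deadline {
                Some(deadline) => tokio::time::sleep_until(deadline).await,
                None => std::future::pending().await,
            }
        };

        tokio::select! {
            message = rx.recv() => match message {
                Some(_outcome) => {
                    // Handle the message and schedule a flush if none is pending yet.
                    flush_at.get_or_insert(Instant::now() + Duration::from_millis(100));
                }
                None => break, // channel closed, shut the service down
            },
            _ = flush => {
                // Flush the buffered outcomes and clear the timer.
                flush_at = None;
            }
        }
    }
}
```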
Future Steps
Some tests are currently ignored because the Register panics when requesting an address if the Service isn't created/started. This will need to be addressed by either emulating the behaviour of the old Actix Register or by mocking the Services necessary in the tests. The latter approach might require a significant change of the Registry we currently have.
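As one possible direction (purely a sketch, not the current Registry API), a lookup that returns an Option instead of panicking would let tests detect missing services or fall back to a mock:

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;

// Sketch of a registry keyed by type; names and layout are hypothetical.
#[derive(Default)]
struct Registry {
    services: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
}

impl Registry {
    fn set<T: Any + Send + Sync>(&mut self, addr: T) {
        self.services.insert(TypeId::of::<T>(), Box::new(addr));
    }

    // Returns None instead of panicking when the service was never started,
    // so tests can substitute a mock instead.
    fn try_get<T: Any + Send + Sync>(&self) -> Option<&T> {
        self.services
            .get(&TypeId::of::<T>())
            .and_then(|boxed| boxed.downcast_ref::<T>())
    }
}
```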
#skip-changelog