Skip to content

Commit

Permalink
fix: egress pool change wouldn't take effect until age out
Browse files Browse the repository at this point in the history
I noticed this while reading through the code; we were not updating the
round robin state which we maintain based on the configured egress pool
if the config updates and changes the pool.

We resolve this here by using an arcswap to keep a read-only reference
to the state; since has some async portions it is important to allow it
to continue its management of interior mutability so that it doesn't
acquire a mutex while awaiting on async state.
  • Loading branch information
wez committed Jan 9, 2025
1 parent 8824ca7 commit ebd9a00
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 5 deletions.
25 changes: 20 additions & 5 deletions crates/kumod/src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::smtp_dispatcher::SmtpProtocol;
use crate::smtp_server::{make_deferred_queue_config, RejectError, DEFERRED_QUEUE_NAME};
use crate::spool::SpoolManager;
use anyhow::Context;
use arc_swap::ArcSwap;
use chrono::{DateTime, Utc};
use config::epoch::{get_current_epoch, ConfigEpoch};
use config::{load_config, CallbackSignature, LuaConfig};
Expand Down Expand Up @@ -942,7 +943,7 @@ pub struct Queue {
queue_config: ConfigHandle<QueueConfig>,
metrics: OnceLock<ScheduledMetrics>,
activity: Activity,
rr: EgressPoolRoundRobin,
rr: ArcSwap<EgressPoolRoundRobin>,
next_config_refresh: StdMutex<Instant>,
warned_strategy_change: AtomicBool,
config_epoch: StdMutex<ConfigEpoch>,
Expand Down Expand Up @@ -994,7 +995,7 @@ impl Queue {
let queue_config = Self::call_get_queue_config(&name, &mut config).await?;

let pool = EgressPool::resolve(queue_config.egress_pool.as_deref(), &mut config).await?;
let rr = EgressPoolRoundRobin::new(&pool);
let rr = ArcSwap::new(EgressPoolRoundRobin::new(&pool).into());

let activity = Activity::get(format!("Queue {name}"))?;
let strategy = queue_config.strategy;
Expand Down Expand Up @@ -1191,6 +1192,20 @@ impl Queue {
async fn perform_config_refresh(&self, epoch: &ConfigEpoch) {
if let Ok(mut config) = load_config().await {
if let Ok(queue_config) = Queue::call_get_queue_config(&self.name, &mut config).await {
match EgressPool::resolve(queue_config.egress_pool.as_deref(), &mut config).await {
Ok(pool) => {
if self.rr.load().name != pool.name {
self.rr.store(EgressPoolRoundRobin::new(&pool).into());
}
}
Err(err) => {
tracing::error!(
"error while processing queue config update for {}: {err:#}",
self.name
);
}
}

let strategy = queue_config.strategy;

self.queue_config.update(queue_config);
Expand Down Expand Up @@ -1774,8 +1789,8 @@ impl Queue {
| DeliveryProto::Lua { .. }
| DeliveryProto::DeferredSmtpInjection
| DeliveryProto::HttpInjectionGenerator => {
let (egress_source, ready_name) = match self
.rr
let rr = self.rr.load();
let (egress_source, ready_name) = match rr
.next(&self.name, &self.queue_config)
.await
{
Expand Down Expand Up @@ -1860,7 +1875,7 @@ impl Queue {
&self.name,
&self.queue_config,
&egress_source,
&self.rr.name,
&rr.name,
self.get_config_epoch(),
)
.await
Expand Down
3 changes: 3 additions & 0 deletions docs/changelog/main.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,6 @@
* When using the HTTP injection API to construct a subject header with a non-space
UTF-8 sequence containing the byte 0x20, the quoted printable encoder would
confuse that sequence with a space and produce an invalid output sequence.

* Changing the egress pool associated with a scheduled queue would not actually
take effect until the scheduled queue aged out (eg: was idle for 10 minutes).

0 comments on commit ebd9a00

Please sign in to comment.