Skip to content

Commit

Permalink
add "null" queue
Browse files Browse the repository at this point in the history
At reception time, we've supported using the special queue name
"null" as a way to indicate that a message should be accepted
but then silently dropped.

Since that was built, we've added a couple of ways through which
a message can be assigned to a different queue after reception.

In a few situations we've had users that want to assign those
messages to the null queue to indicate that they should be dropped.

That has "worked" but not for the right reasons: setting the queue
name to "null" is equivalent to setting the routing domain to the
domain named "null", which doesn't resolve to a valid MX and that
will eventually cause messages that were queued there to expire
out of the system.

This commit introduces a special case for a queue named "null"
that configures it with a special "Null" delivery handler.

Similar to the maildir handler, this one will immediately
handle ready messages without instantiating a corresponding
ready queue for them.

In this case, handling them means simply deleting them with
no other logging.

We don't log here in order to be consistent with the reception-time
behavior of not logging.  It might be desirable to add some
configuration around this in the future, but for the moment, the primary
way that messages might end up assigned to a "null" queue is via a
rebind operation, and that will log an `AdminRebind` record that can be
used to capture that change in responsibility.

If logging a bounce is desired, it is recommended to use kumo.reject to
bounce the message during the rebind event instead of assigning it to
the null queue.
  • Loading branch information
wez committed Nov 21, 2024
1 parent 3b6ef4e commit c509d45
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 1 deletion.
24 changes: 23 additions & 1 deletion crates/kumod/src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,7 @@ pub enum DeliveryProto {
Maildir { maildir_path: std::path::PathBuf },
Lua { custom_lua: LuaDeliveryProtocol },
HttpInjectionGenerator,
Null,
}

impl DeliveryProto {
Expand All @@ -326,13 +327,14 @@ impl DeliveryProto {
Self::Maildir { .. } => "maildir",
Self::Lua { .. } => "lua",
Self::HttpInjectionGenerator { .. } => "httpinject",
Self::Null { .. } => "null",
}
}

pub fn ready_queue_name(&self) -> String {
let proto_name = self.metrics_protocol_name();
match self {
Self::Smtp { .. } => proto_name.to_string(),
Self::Smtp { .. } | Self::Null => proto_name.to_string(),
Self::Maildir { maildir_path } => format!("{proto_name}:{}", maildir_path.display()),
Self::Lua { custom_lua } => format!("{proto_name}:{}", custom_lua.constructor),
Self::HttpInjectionGenerator => format!("{proto_name}:generator"),
Expand Down Expand Up @@ -930,6 +932,15 @@ impl Queue {
return make_generate_queue_config();
}

if name == "null" {
return Ok(QueueConfig {
protocol: DeliveryProto::Null,
retry_interval: Duration::from_secs(10),
max_retry_interval: Some(Duration::from_secs(10)),
..QueueConfig::default()
});
}

let components = QueueNameComponents::parse(&name);

let queue_config: QueueConfig = config
Expand Down Expand Up @@ -1793,6 +1804,17 @@ impl Queue {
}
}
}
DeliveryProto::Null => {
// We don't log anything here; this is in alignment with
// our reception time behavior of not logging either.
// We shouldn't get here unless someone re-bound a message
// into the "null" queue, and there will be an AdminRebind
// log entry recording that
spawn("remove from spool", async move {
SpoolManager::remove_from_spool(*msg.id()).await
})?;
Ok(())
}
DeliveryProto::Maildir { maildir_path } => {
tracing::trace!(
"Deliver msg {} to maildir at {}",
Expand Down
6 changes: 6 additions & 0 deletions crates/kumod/src/ready_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1019,6 +1019,9 @@ impl Dispatcher {
DeliveryProto::Lua { .. } => "Lua".to_string(),
DeliveryProto::Maildir { .. } => "Maildir".to_string(),
DeliveryProto::HttpInjectionGenerator => "HttpInjectionGenerator".to_string(),
DeliveryProto::Null => {
anyhow::bail!("Should not have a ready_queue for the null queue")
}
};

let mut dispatcher = Self {
Expand Down Expand Up @@ -1060,6 +1063,9 @@ impl Dispatcher {
DeliveryProto::HttpInjectionGenerator => {
Box::new(HttpInjectionGeneratorDispatcher::new())
}
DeliveryProto::Null => {
anyhow::bail!("Should not have a ready_queue for the null queue")
}
};

// We get better throughput by being more aggressive with establishing
Expand Down

0 comments on commit c509d45

Please sign in to comment.