Skip to content

Commit

Permalink
Expose ready queue states via api, and in kcli queue-summary
Browse files Browse the repository at this point in the history
This adds connection limit/throttle states to the readyq rows
in `kcli queue-summary`, alongside where we would show the
suspension state.

This makes it easier to understand when a given egress path
might be hitting connection limits.
  • Loading branch information
wez committed Dec 21, 2024
1 parent 02fc845 commit ff8c0cc
Show file tree
Hide file tree
Showing 7 changed files with 248 additions and 10 deletions.
29 changes: 23 additions & 6 deletions crates/kcli/src/queue_summary.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use clap::Parser;
use dns_resolver::MailExchanger;
use futures::StreamExt;
use kumo_api_types::{BounceV1ListEntry, SuspendReadyQueueV1ListEntry, SuspendV1ListEntry};
use kumo_api_types::{
BounceV1ListEntry, ReadyQueueStateResponse, SuspendReadyQueueV1ListEntry, SuspendV1ListEntry,
};
use kumo_prometheus::parser::Metric;
use lexicmp::natural_lexical_cmp;
use message::message::QueueNameComponents;
Expand Down Expand Up @@ -280,6 +282,13 @@ impl QueueSummaryCommand {
)
.await?;

let ready_queue_states: ReadyQueueStateResponse = crate::request_with_json_response(
reqwest::Method::GET,
endpoint.join("/api/admin/ready-q-states/v1")?,
&(),
)
.await?;

let mut metrics = QueueMetrics::obtain(
endpoint,
QueueMetricsParams {
Expand Down Expand Up @@ -361,11 +370,19 @@ impl QueueSummaryCommand {

let mut ready_rows = vec![];
for m in &metrics.ready {
let status = if let Some(s) = suspended_sites.iter().find(|s| s.name == m.name) {
format!("🛑suspend: {}", s.reason)
} else {
String::new()
};
let mut status = vec![];

if let Some(s) = suspended_sites.iter().find(|s| s.name == m.name) {
status.push(format!("🛑suspend: {}", s.reason));
}

if let Some(states) = ready_queue_states.states_by_ready_queue.get(&m.name) {
for (key, state) in states {
status.push(format!("{key}: {} @ {}", state.context, state.since));
}
}

let status = status.join(", ");

ready_rows.push(vec![
m.site_name().to_string(),
Expand Down
18 changes: 18 additions & 0 deletions crates/kumo-api-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,3 +443,21 @@ pub struct TraceSmtpClientV1Request {
#[serde(default)]
pub mx_addr: Option<CidrSet>,
}

/// Request parameters for the ready queue states endpoint
/// (`/api/admin/ready-q-states/v1`), selecting which ready queues
/// to report state information for.
#[derive(Serialize, Deserialize, Debug, ToSchema, IntoParams)]
pub struct ReadyQueueStateRequest {
/// Which queues to request. If empty, request all queue states.
#[serde(default)]
pub queues: Vec<String>,
}

/// A single state reported for a ready queue.
#[derive(Serialize, Deserialize, Debug, ToResponse, ToSchema)]
pub struct QueueState {
/// Text describing the state. `kcli queue-summary` displays it
/// alongside the state name.
pub context: String,
/// Timestamp reported together with the state.
// NOTE(review): presumably the time at which the state took effect;
// confirm against the code that populates the ready queue states.
pub since: DateTime<Utc>,
}

/// Response body of the ready queue states endpoint.
#[derive(Serialize, Deserialize, Debug, ToResponse, ToSchema)]
pub struct ReadyQueueStateResponse {
/// Maps a ready queue name to the states reported for that queue;
/// the inner map is keyed by state name.
pub states_by_ready_queue: HashMap<String, HashMap<String, QueueState>>,
}
78 changes: 78 additions & 0 deletions crates/kumod/src/http_server/admin_ready_queue_states.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
use crate::ready_queue::ReadyQueueManager;
use axum::extract::{Json, Query};
use kumo_api_types::{QueueState, ReadyQueueStateRequest, ReadyQueueStateResponse};
use kumo_server_common::http_server::auth::TrustedIpRequired;
use kumo_server_common::http_server::AppError;
use std::collections::HashMap;

/// Retrieve the states that apply to a set of ready queues.
///
/// If no specific queues were named in the request, all ready queues
/// are considered. Only queues that currently have at least one state
/// set appear in the response; the states reported are
/// `connection_limited` and `connection_rate_throttled`.
#[utoipa::path(
    get,
    tag="inspect",
    path="/api/admin/ready-q-states/v1",
    params(ReadyQueueStateRequest),
    responses(
        (status = 200, description = "Obtained state information", body=ReadyQueueStateResponse),
    ),
)]
pub async fn readyq_states(
    _: TrustedIpRequired,
    Query(request): Query<ReadyQueueStateRequest>,
) -> Result<Json<ReadyQueueStateResponse>, AppError> {
    /// Records `state` under `state_name` for the queue named `queue_name`,
    /// creating the per-queue map on first use.
    fn add_state(
        states_by_ready_queue: &mut HashMap<String, HashMap<String, QueueState>>,
        queue_name: &str,
        state_name: &str,
        state: QueueState,
    ) {
        states_by_ready_queue
            .entry(queue_name.to_string())
            .or_default()
            .insert(state_name.to_string(), state);
    }

    let mut states_by_ready_queue = HashMap::new();

    for queue in ReadyQueueManager::all_queues() {
        // An empty filter means "report on every queue"; otherwise the
        // queue name must exactly match one of the requested names.
        let selected = request.queues.is_empty()
            || request
                .queues
                .iter()
                .any(|name| name.as_str() == queue.name());
        if !selected {
            continue;
        }

        // The guard is dropped at the end of each iteration and is never
        // held across an await point.
        let states = queue.states.lock();

        if let Some(s) = &states.connection_limited {
            add_state(
                &mut states_by_ready_queue,
                queue.name(),
                "connection_limited",
                QueueState {
                    context: s.context.clone(),
                    since: s.since,
                },
            );
        }

        if let Some(s) = &states.connection_rate_throttled {
            add_state(
                &mut states_by_ready_queue,
                queue.name(),
                "connection_rate_throttled",
                QueueState {
                    context: s.context.clone(),
                    since: s.since,
                },
            );
        }
    }

    Ok(Json(ReadyQueueStateResponse {
        states_by_ready_queue,
    }))
}
16 changes: 15 additions & 1 deletion crates/kumod/src/http_server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use utoipa::OpenApi;

pub mod admin_bounce_v1;
pub mod admin_inspect_message;
pub mod admin_ready_queue_states;
pub mod admin_rebind_v1;
pub mod admin_suspend_ready_q_v1;
pub mod admin_suspend_v1;
Expand All @@ -27,6 +28,7 @@ pub mod inject_v1;
admin_bounce_v1::bounce_v1_list,
admin_bounce_v1::bounce_v1_delete,
admin_inspect_message::inspect_v1,
admin_ready_queue_states::readyq_states,
admin_rebind_v1::rebind_v1,
admin_suspend_ready_q_v1::suspend,
admin_suspend_ready_q_v1::list,
Expand All @@ -52,6 +54,9 @@ pub mod inject_v1;
BounceV1CancelRequest,
InspectMessageV1Response,
MessageInformation,
ReadyQueueStateRequest,
ReadyQueueStateResponse,
QueueState,
RebindV1Request,
RebindV1Response,
SuspendReadyQueueV1Request,
Expand All @@ -62,7 +67,12 @@ pub mod inject_v1;
SuspendV1Request,
TraceHeaders,
),
responses(InjectV1Response, BounceV1Response, InspectMessageV1Response),
responses(
InjectV1Response,
BounceV1Response,
InspectMessageV1Response,
ReadyQueueStateResponse
),
)
)]
struct ApiDoc;
Expand All @@ -81,6 +91,10 @@ pub fn make_router() -> RouterAndDocs {
"/api/admin/bounce/v1",
delete(admin_bounce_v1::bounce_v1_delete),
)
.route(
"/api/admin/ready-q-states/v1",
get(admin_ready_queue_states::readyq_states),
)
.route("/api/admin/rebind/v1", post(admin_rebind_v1::rebind_v1))
.route("/api/admin/suspend/v1", post(admin_suspend_v1::suspend))
.route("/api/admin/suspend/v1", get(admin_suspend_v1::list))
Expand Down
3 changes: 1 addition & 2 deletions crates/kumod/src/ready_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -546,11 +546,10 @@ pub struct ReadyQueue {
egress_source: EgressSource,
next_config_refresh: StdMutex<Instant>,
config_epoch: StdMutex<ConfigEpoch>,
states: Arc<StdMutex<ReadyQueueStates>>,
pub states: Arc<StdMutex<ReadyQueueStates>>,
}

impl ReadyQueue {
/// Returns the name of this ready queue.
pub fn name(&self) -> &str {
    &self.name
}
Expand Down
8 changes: 8 additions & 0 deletions docs/changelog/main.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,14 @@
* Provider match rules now also support exactly matching MX hostnames via the
new `HostName` entry.

* `kcli queue-summary` will now show connection limit and connection rate throttled
status effects as part of the ready queue information, making it easier to
determine when a limit (potentially shared across multiple nodes) might be
responsible for messages remaining in the ready queue. There is a corresponding
[ready-q-states](../reference/rapidoc.md/#get-/api/admin/ready-q-states/v1) API
endpoint that can be used to retrieve this same information.


## Fixes

* When `enable_tls` is set to `Required` or `RequiredInsecure`, ignore the
Expand Down
106 changes: 105 additions & 1 deletion docs/reference/kumod.openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"license": {
"name": "Apache-2.0"
},
"version": "2024.12.09-a316f7a9"
"version": "2024.12.21-02fc8458"
},
"paths": {
"/api/admin/bounce/v1": {
Expand Down Expand Up @@ -147,6 +147,42 @@
}
}
},
"/api/admin/ready-q-states/v1": {
"get": {
"tags": [
"inspect"
],
"summary": "Retrieve information about the states that apply to a set of",
"description": "ready queues, or all queues if no specific queues were named\nin the request.",
"operationId": "readyq_states",
"parameters": [
{
"name": "queues",
"in": "query",
"description": "Which queues to request. If empty, request all queue states.",
"required": false,
"schema": {
"type": "array",
"items": {
"type": "string"
}
}
}
],
"responses": {
"200": {
"description": "Obtained state information",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ReadyQueueStateResponse"
}
}
}
}
}
}
},
"/api/admin/rebind/v1": {
"post": {
"tags": [
Expand Down Expand Up @@ -834,6 +870,50 @@
}
}
},
"QueueState": {
"type": "object",
"required": [
"context",
"since"
],
"properties": {
"context": {
"type": "string"
},
"since": {
"$ref": "#/components/schemas/DateTime"
}
}
},
"ReadyQueueStateRequest": {
"type": "object",
"properties": {
"queues": {
"type": "array",
"items": {
"type": "string"
},
"description": "Which queues to request. If empty, request all queue states."
}
}
},
"ReadyQueueStateResponse": {
"type": "object",
"required": [
"states_by_ready_queue"
],
"properties": {
"states_by_ready_queue": {
"type": "object",
"additionalProperties": {
"type": "object",
"additionalProperties": {
"$ref": "#/components/schemas/QueueState"
}
}
}
}
},
"RebindV1Request": {
"type": "object",
"description": "Describes which messages should be rebound.\nThe criteria apply to the scheduled queue associated\nwith a given message.",
Expand Down Expand Up @@ -1242,6 +1322,30 @@
}
}
}
},
"ReadyQueueStateResponse": {
"description": "",
"content": {
"application/json": {
"schema": {
"type": "object",
"required": [
"states_by_ready_queue"
],
"properties": {
"states_by_ready_queue": {
"type": "object",
"additionalProperties": {
"type": "object",
"additionalProperties": {
"$ref": "#/components/schemas/QueueState"
}
}
}
}
}
}
}
}
},
"securitySchemes": {
Expand Down

0 comments on commit ff8c0cc

Please sign in to comment.