Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions trust-quorum/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ dropshot.workspace = true
omicron-test-utils.workspace = true
proptest.workspace = true
serde_json.workspace = true
sled-hardware-types.workspace = true
test-strategy.workspace = true
trust-quorum-protocol = { workspace = true, features = ["testing"] }
trust-quorum-test-utils.workspace = true
sprockets-tls-test-utils.workspace = true
4 changes: 4 additions & 0 deletions trust-quorum/protocol/src/coordinator_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,10 @@ impl CoordinatorState {
&self.op
}

pub fn config(&self) -> &Configuration {
&self.configuration
}

/// Send any required messages as a reconfiguration coordinator
///
/// This varies depending upon the current `CoordinatorState`.
Expand Down
2 changes: 1 addition & 1 deletion trust-quorum/protocol/src/crypto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ impl Clone for ReconstructedRackSecret {
}
}

#[cfg(test)]
#[cfg(any(test, feature = "testing"))]
impl PartialEq for ReconstructedRackSecret {
fn eq(&self, other: &Self) -> bool {
self.expose_secret().ct_eq(other.expose_secret()).into()
Expand Down
7 changes: 4 additions & 3 deletions trust-quorum/protocol/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,14 @@ pub use coordinator_state::{
};
pub use rack_secret_loader::{LoadRackSecretError, RackSecretLoaderDiff};
pub use validators::{
ValidatedLrtqUpgradeMsgDiff, ValidatedReconfigureMsgDiff,
LrtqUpgradeError, ReconfigurationError, ValidatedLrtqUpgradeMsgDiff,
ValidatedReconfigureMsgDiff,
};

pub use alarm::Alarm;
pub use crypto::RackSecret;
pub use crypto::{RackSecret, ReconstructedRackSecret};
pub use messages::*;
pub use node::{Node, NodeDiff};
pub use node::{CommitError, Node, NodeDiff, PrepareAndCommitError};
// public only for docs.
pub use node_ctx::NodeHandlerCtx;
pub use node_ctx::{NodeCallerCtx, NodeCommonCtx, NodeCtx, NodeCtxDiff};
Expand Down
45 changes: 37 additions & 8 deletions trust-quorum/src/connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
//! A mechanism for maintaining a full mesh of trust quorum node connections

use crate::established_conn::EstablishedConn;
use trust_quorum_protocol::{BaseboardId, PeerMsg};
use trust_quorum_protocol::{BaseboardId, Envelope, PeerMsg};

// TODO: Move to this crate
// https://github.com/oxidecomputer/omicron/issues/9311
Expand Down Expand Up @@ -50,7 +50,6 @@ pub enum AcceptError {
/// Messages sent from the main task to the connection managing tasks
#[derive(Debug)]
pub enum MainToConnMsg {
#[expect(unused)]
Msg(WireMsg),
}

Expand Down Expand Up @@ -103,7 +102,6 @@ pub enum ConnToMainMsgInner {
addr: SocketAddrV6,
peer_id: BaseboardId,
},
#[expect(unused)]
Received {
from: BaseboardId,
msg: PeerMsg,
Expand All @@ -120,7 +118,6 @@ pub enum ConnToMainMsgInner {

pub struct TaskHandle {
pub abort_handle: AbortHandle,
#[expect(unused)]
pub tx: mpsc::Sender<MainToConnMsg>,
pub conn_type: ConnectionType,
}
Expand All @@ -137,6 +134,10 @@ impl TaskHandle {
pub fn abort(&self) {
self.abort_handle.abort()
}

pub async fn send(&self, msg: PeerMsg) {
let _ = self.tx.send(MainToConnMsg::Msg(WireMsg::Tq(msg))).await;
}
}

impl BiHashItem for TaskHandle {
Expand Down Expand Up @@ -178,6 +179,10 @@ impl EstablishedTaskHandle {
pub fn abort(&self) {
self.task_handle.abort();
}

pub async fn send(&self, msg: PeerMsg) {
let _ = self.task_handle.send(msg).await;
}
}

impl TriHashItem for EstablishedTaskHandle {
Expand Down Expand Up @@ -375,6 +380,14 @@ impl ConnMgr {
self.listen_addr
}

pub async fn send(&self, envelope: Envelope) {
let Envelope { to, msg, .. } = envelope;
info!(self.log, "Sending {msg:?}"; "peer_id" => %to);
if let Some(handle) = self.established.get1(&to) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was quite confused when I first saw this, since it silently discards the message if a connection with the recipient is not established.

Originally I was going to suggest to rename the function to try_send or similar, but poking at the rest of the code I learned errors are discarded everywhere (EstablishedConn::run only logs the error message and kills the connection, without reporting the failure down the stack).

This makes sense, as in general RFD 238 is designed to be resilient to nodes disappearing at any point in time. I'm not sure if I would do anything in response to this comment. Just leaving this as a note for future me.

handle.send(msg).await;
}
}

/// Perform any polling related operations that the connection
/// manager must perform concurrently.
pub async fn step(
Expand Down Expand Up @@ -576,13 +589,15 @@ impl ConnMgr {
/// easiest way to achieve this is to only connect to peers with addresses
/// that sort less than our own and tear down any connections that no longer
/// exist in `addrs`.
///
/// Return the `BaseboardId` of all peers that have been disconnected.
pub async fn update_bootstrap_connections(
&mut self,
addrs: BTreeSet<SocketAddrV6>,
corpus: Vec<Utf8PathBuf>,
) {
) -> BTreeSet<BaseboardId> {
if self.bootstrap_addrs == addrs {
return;
return BTreeSet::new();
}

// We don't try to compare addresses from accepted nodes. If DDMD
Expand Down Expand Up @@ -610,9 +625,13 @@ impl ConnMgr {
self.connect_client(corpus.clone(), addr).await;
}

let mut disconnected_peers = BTreeSet::new();
for addr in to_disconnect {
self.disconnect_client(addr).await;
if let Some(peer_id) = self.disconnect_client(addr).await {
disconnected_peers.insert(peer_id);
}
}
disconnected_peers
}

/// Spawn a task to estalbish a sprockets connection for the given address
Expand Down Expand Up @@ -691,7 +710,13 @@ impl ConnMgr {
///
/// We don't tear down server connections this way as we don't know their
/// listen port, just the ephemeral port.
async fn disconnect_client(&mut self, addr: SocketAddrV6) {
///
/// Return the `BaseboardId` of the peer if an established connection is
/// torn down.
async fn disconnect_client(
&mut self,
addr: SocketAddrV6,
) -> Option<BaseboardId> {
if let Some(handle) = self.connecting.remove2(&addr) {
// The connection has not yet completed its handshake
info!(
Expand All @@ -700,6 +725,7 @@ impl ConnMgr {
"remote_addr" => %addr
);
handle.abort();
None
} else {
if let Some(handle) = self.established.remove3(&addr) {
info!(
Expand All @@ -709,6 +735,9 @@ impl ConnMgr {
"peer_id" => %handle.baseboard_id
);
handle.abort();
Some(handle.baseboard_id)
} else {
None
}
}
}
Expand Down
Loading
Loading