From f5177b4f0dd803200775d0712611813d4631110e Mon Sep 17 00:00:00 2001 From: "Andrew J. Stone" Date: Tue, 28 Oct 2025 18:22:31 +0000 Subject: [PATCH 1/5] TQ: Integrate protocol with `NodeTask` `NodeTask` now uses the `trust_quorum_protocol::Node` and `trust_quorum_protocol::NodeCtx` to send and receive trust quorum messages. An API to drive this was added to the `NodeTaskHandle`. The majority of code in this PR is tests using the API. A follow up will deal with saving persistent state to a Ledger. --- Cargo.lock | 1 + trust-quorum/Cargo.toml | 2 + .../protocol/src/coordinator_state.rs | 4 + trust-quorum/protocol/src/crypto.rs | 2 +- trust-quorum/protocol/src/lib.rs | 7 +- trust-quorum/src/connection_manager.rs | 45 +- trust-quorum/src/task.rs | 1045 ++++++++++++++++- 7 files changed, 1046 insertions(+), 60 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3848089b025..38813d7bd65 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -14838,6 +14838,7 @@ dependencies = [ "serde_with", "sha3", "sled-agent-types", + "sled-hardware-types", "slog", "slog-error-chain", "sprockets-tls", diff --git a/trust-quorum/Cargo.toml b/trust-quorum/Cargo.toml index 5f5ad0e88a8..306764dc6f1 100644 --- a/trust-quorum/Cargo.toml +++ b/trust-quorum/Cargo.toml @@ -48,6 +48,8 @@ dropshot.workspace = true omicron-test-utils.workspace = true proptest.workspace = true serde_json.workspace = true +sled-hardware-types.workspace = true test-strategy.workspace = true +trust-quorum-protocol = { workspace = true, features = ["testing"] } trust-quorum-test-utils.workspace = true sprockets-tls-test-utils.workspace = true diff --git a/trust-quorum/protocol/src/coordinator_state.rs b/trust-quorum/protocol/src/coordinator_state.rs index 5d8d394c32c..725cdfb8397 100644 --- a/trust-quorum/protocol/src/coordinator_state.rs +++ b/trust-quorum/protocol/src/coordinator_state.rs @@ -236,6 +236,10 @@ impl CoordinatorState { &self.op } + pub fn config(&self) -> &Configuration { + &self.configuration + } + /// Send any required messages as a reconfiguration coordinator /// /// This varies depending upon the current `CoordinatorState`. diff --git a/trust-quorum/protocol/src/crypto.rs b/trust-quorum/protocol/src/crypto.rs index 8227bdef5b8..84ba89c4691 100644 --- a/trust-quorum/protocol/src/crypto.rs +++ b/trust-quorum/protocol/src/crypto.rs @@ -130,7 +130,7 @@ impl Clone for ReconstructedRackSecret { } } -#[cfg(test)] +#[cfg(any(test, feature = "testing"))] impl PartialEq for ReconstructedRackSecret { fn eq(&self, other: &Self) -> bool { self.expose_secret().ct_eq(other.expose_secret()).into() diff --git a/trust-quorum/protocol/src/lib.rs b/trust-quorum/protocol/src/lib.rs index 0d5c522b2d4..44f0d75379c 100644 --- a/trust-quorum/protocol/src/lib.rs +++ b/trust-quorum/protocol/src/lib.rs @@ -37,13 +37,14 @@ pub use coordinator_state::{ }; pub use rack_secret_loader::{LoadRackSecretError, RackSecretLoaderDiff}; pub use validators::{ - ValidatedLrtqUpgradeMsgDiff, ValidatedReconfigureMsgDiff, + LrtqUpgradeError, ReconfigurationError, ValidatedLrtqUpgradeMsgDiff, + ValidatedReconfigureMsgDiff, }; pub use alarm::Alarm; -pub use crypto::RackSecret; +pub use crypto::{RackSecret, ReconstructedRackSecret}; pub use messages::*; -pub use node::{Node, NodeDiff}; +pub use node::{CommitError, Node, NodeDiff, PrepareAndCommitError}; // public only for docs. 
pub use node_ctx::NodeHandlerCtx; pub use node_ctx::{NodeCallerCtx, NodeCommonCtx, NodeCtx, NodeCtxDiff}; diff --git a/trust-quorum/src/connection_manager.rs b/trust-quorum/src/connection_manager.rs index d2b427fd6eb..bcc6bbe4914 100644 --- a/trust-quorum/src/connection_manager.rs +++ b/trust-quorum/src/connection_manager.rs @@ -5,7 +5,7 @@ //! A mechanism for maintaining a full mesh of trust quorum node connections use crate::established_conn::EstablishedConn; -use trust_quorum_protocol::{BaseboardId, PeerMsg}; +use trust_quorum_protocol::{BaseboardId, Envelope, PeerMsg}; // TODO: Move to this crate // https://github.com/oxidecomputer/omicron/issues/9311 @@ -50,7 +50,6 @@ pub enum AcceptError { /// Messages sent from the main task to the connection managing tasks #[derive(Debug)] pub enum MainToConnMsg { - #[expect(unused)] Msg(WireMsg), } @@ -103,7 +102,6 @@ pub enum ConnToMainMsgInner { addr: SocketAddrV6, peer_id: BaseboardId, }, - #[expect(unused)] Received { from: BaseboardId, msg: PeerMsg, @@ -120,7 +118,6 @@ pub enum ConnToMainMsgInner { pub struct TaskHandle { pub abort_handle: AbortHandle, - #[expect(unused)] pub tx: mpsc::Sender, pub conn_type: ConnectionType, } @@ -137,6 +134,10 @@ impl TaskHandle { pub fn abort(&self) { self.abort_handle.abort() } + + pub async fn send(&self, msg: PeerMsg) { + let _ = self.tx.send(MainToConnMsg::Msg(WireMsg::Tq(msg))).await; + } } impl BiHashItem for TaskHandle { @@ -178,6 +179,10 @@ impl EstablishedTaskHandle { pub fn abort(&self) { self.task_handle.abort(); } + + pub async fn send(&self, msg: PeerMsg) { + let _ = self.task_handle.send(msg).await; + } } impl TriHashItem for EstablishedTaskHandle { @@ -375,6 +380,14 @@ impl ConnMgr { self.listen_addr } + pub async fn send(&self, envelope: Envelope) { + let Envelope { to, msg, .. } = envelope; + info!(self.log, "Sending {msg:?}"; "peer_id" => %to); + if let Some(handle) = self.established.get1(&to) { + handle.send(msg).await; + } + } + /// Perform any polling related operations that the connection /// manager must perform concurrently. pub async fn step( @@ -576,13 +589,15 @@ impl ConnMgr { /// easiest way to achieve this is to only connect to peers with addresses /// that sort less than our own and tear down any connections that no longer /// exist in `addrs`. + /// + /// Return the `BaseboardId` of all peers that have been disconnected. pub async fn update_bootstrap_connections( &mut self, addrs: BTreeSet, corpus: Vec, - ) { + ) -> BTreeSet { if self.bootstrap_addrs == addrs { - return; + return BTreeSet::new(); } // We don't try to compare addresses from accepted nodes. If DDMD @@ -610,9 +625,13 @@ impl ConnMgr { self.connect_client(corpus.clone(), addr).await; } + let mut disconnected_peers = BTreeSet::new(); for addr in to_disconnect { - self.disconnect_client(addr).await; + if let Some(peer_id) = self.disconnect_client(addr).await { + disconnected_peers.insert(peer_id); + } } + disconnected_peers } /// Spawn a task to estalbish a sprockets connection for the given address @@ -691,7 +710,13 @@ impl ConnMgr { /// /// We don't tear down server connections this way as we don't know their /// listen port, just the ephemeral port. - async fn disconnect_client(&mut self, addr: SocketAddrV6) { + /// + /// Return the `BaseboardId` of the peer if an established connection is + // torn down. 
+ async fn disconnect_client( + &mut self, + addr: SocketAddrV6, + ) -> Option { if let Some(handle) = self.connecting.remove2(&addr) { // The connection has not yet completed its handshake info!( @@ -700,6 +725,7 @@ impl ConnMgr { "remote_addr" => %addr ); handle.abort(); + None } else { if let Some(handle) = self.established.remove3(&addr) { info!( @@ -709,6 +735,9 @@ impl ConnMgr { "peer_id" => %handle.baseboard_id ); handle.abort(); + Some(handle.baseboard_id) + } else { + None } } } diff --git a/trust-quorum/src/task.rs b/trust-quorum/src/task.rs index 6e821f197db..0bbae4ac142 100644 --- a/trust-quorum/src/task.rs +++ b/trust-quorum/src/task.rs @@ -8,15 +8,29 @@ use crate::connection_manager::{ ConnMgr, ConnMgrStatus, ConnToMainMsg, ConnToMainMsgInner, }; +use omicron_uuid_kinds::RackUuid; +use serde::{Deserialize, Serialize}; use slog::{Logger, debug, error, info, o}; use sprockets_tls::keys::SprocketsConfig; use std::collections::BTreeSet; use std::net::SocketAddrV6; +use std::time::Duration; use thiserror::Error; use tokio::sync::mpsc::error::SendError; use tokio::sync::oneshot::error::RecvError; use tokio::sync::{mpsc, oneshot}; -use trust_quorum_protocol::{BaseboardId, Node, NodeCtx}; +use tokio::time::sleep; +use trust_quorum_protocol::{ + Alarm, BaseboardId, CommitError, Configuration, Epoch, ExpungedMetadata, + LoadRackSecretError, LrtqUpgradeError, LrtqUpgradeMsg, Node, NodeCallerCtx, + NodeCommonCtx, NodeCtx, PersistentState, PrepareAndCommitError, + ReconfigurationError, ReconfigureMsg, ReconstructedRackSecret, +}; + +#[cfg(not(test))] +const LOAD_RACK_SECRET_RETRY_TIMEOUT: Duration = Duration::from_millis(500); +#[cfg(test)] +const LOAD_RACK_SECRET_RETRY_TIMEOUT: Duration = Duration::from_millis(5); /// We only expect a handful of messages at a time. const API_CHANNEL_BOUND: usize = 32; @@ -39,6 +53,45 @@ pub struct Config { pub sprockets: SprocketsConfig, } +/// Status of the node coordinating the `Prepare` phase of a reconfiguration or +/// LRTQ upgrade. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CoordinatorStatus { + config: Configuration, + acked_prepares: BTreeSet, +} + +// Details about a given node's status +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct NodeStatus { + connected_peers: BTreeSet, + alarms: BTreeSet, + persistent_state: NodePersistentStateSummary, +} + +/// A summary of a node's persistent state, leaving out things like key shares +/// and hashes. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct NodePersistentStateSummary { + has_lrtq_share: bool, + configs: BTreeSet, + shares: BTreeSet, + commits: BTreeSet, + expunged: Option, +} + +impl From<&PersistentState> for NodePersistentStateSummary { + fn from(value: &PersistentState) -> Self { + Self { + has_lrtq_share: value.lrtq.is_some(), + configs: value.configs.iter().map(|c| c.epoch).collect(), + shares: value.shares.keys().cloned().collect(), + commits: value.commits.clone(), + expunged: value.expunged.clone(), + } + } +} + /// A request sent to the `NodeTask` from the `NodeTaskHandle` pub enum NodeApiRequest { /// Inform the `Node` of currently known IP addresses on the bootstrap network @@ -46,9 +99,51 @@ pub enum NodeApiRequest { /// These are generated from DDM prefixes learned by the bootstrap agent. 
BootstrapAddresses(BTreeSet), + /// Remove any secrets cached in memory at this node + ClearSecrets, + /// Retrieve connectivity status via the `ConnMgr` ConnMgrStatus { responder: oneshot::Sender }, + /// Return the status of this node if it is a coordinator + CoordinatorStatus { responder: oneshot::Sender> }, + + /// Load a rack secret for the given epoch + LoadRackSecret { + epoch: Epoch, + responder: oneshot::Sender< + Result, LoadRackSecretError>, + >, + }, + + /// Coordinate an upgrade from LRTQ at this node + LrtqUpgrade { + msg: LrtqUpgradeMsg, + responder: oneshot::Sender>, + }, + + /// Get the overall status of the node + NodeStatus { responder: oneshot::Sender }, + + /// `PrepareAndCommit` a configuration at this node + PrepareAndCommit { + config: Configuration, + responder: oneshot::Sender>, + }, + + /// `Commit` a configuration at this node + Commit { + rack_id: RackUuid, + epoch: Epoch, + responder: oneshot::Sender>, + }, + + /// Coordinate a reconfiguration at this node + Reconfigure { + msg: ReconfigureMsg, + responder: oneshot::Sender>, + }, + /// Shutdown the node's tokio tasks Shutdown, } @@ -56,10 +151,20 @@ pub enum NodeApiRequest { /// An error response from a `NodeApiRequest` #[derive(Error, Debug, PartialEq)] pub enum NodeApiError { - #[error("Failed to send request to node task")] + #[error("failed to send request to node task")] Send, - #[error("Failed to receive response from node task")] + #[error("failed to receive response from node task")] Recv, + #[error("failed to reconfigure trust quorum")] + Reconfigure(#[from] ReconfigurationError), + #[error("failed to load rack secret")] + LoadRackSecret(#[from] LoadRackSecretError), + #[error("failed to upgrade from LRTQ")] + LrtqUpgrade(#[from] LrtqUpgradeError), + #[error("failed to prepare and commit")] + PrepareAndCommit(#[from] PrepareAndCommitError), + #[error("failed to commit")] + Commit(#[from] CommitError), } impl From> for NodeApiError { @@ -82,7 +187,7 @@ pub struct NodeTaskHandle { } impl NodeTaskHandle { - /// Return the actual port being listened on + /// Return the actual ip and port being listened on /// /// This is useful when the port passed in was `0`. pub fn listen_addr(&self) -> SocketAddrV6 { @@ -93,6 +198,114 @@ impl NodeTaskHandle { &self.baseboard_id } + /// Initiate a trust quorum reconfiguration at this node + pub async fn reconfigure( + &self, + msg: ReconfigureMsg, + ) -> Result<(), NodeApiError> { + let (tx, rx) = oneshot::channel(); + self.tx + .send(NodeApiRequest::Reconfigure { msg, responder: tx }) + .await?; + rx.await??; + Ok(()) + } + + /// Initiate an LRTQ upgrade at this node + pub async fn upgrade_from_lrtq( + &self, + msg: LrtqUpgradeMsg, + ) -> Result<(), NodeApiError> { + let (tx, rx) = oneshot::channel(); + self.tx + .send(NodeApiRequest::LrtqUpgrade { msg, responder: tx }) + .await?; + rx.await??; + Ok(()) + } + + /// Return the status of this node if it is coordinating the `Prepare` phase + /// of a reconfiguration or LRTQ upgrade. Return `Ok(None)` or an error + /// otherwise. + pub async fn coordinator_status( + &self, + ) -> Result, NodeApiError> { + let (tx, rx) = oneshot::channel(); + self.tx + .send(NodeApiRequest::CoordinatorStatus { responder: tx }) + .await?; + let res = rx.await?; + Ok(res) + } + + /// Load the rack secret for the given epoch + /// + /// This can block for an indefinite period of time before returning + /// and depends on availability of the trust quorum. 
+ pub async fn load_rack_secret( + &self, + epoch: Epoch, + ) -> Result { + loop { + let (tx, rx) = oneshot::channel(); + self.tx + .send(NodeApiRequest::LoadRackSecret { epoch, responder: tx }) + .await?; + if let Some(rack_secret) = rx.await?? { + return Ok(rack_secret); + }; + + // The task returns immediately with `None` if the secret is still + // being loaded. We must therefore retry. + sleep(LOAD_RACK_SECRET_RETRY_TIMEOUT).await; + } + } + + /// Return `Ok(true)` if the configuration has committed, `Ok(false)` if + /// it hasn't committed yet, or an error otherwise. + /// + /// Nexus will retry this operation and so we should only try once here. + /// This is in contrast to operations like `load_rack_secret` that are + /// called directly from sled agent. + pub async fn prepare_and_commit( + &self, + config: Configuration, + ) -> Result { + let (tx, rx) = oneshot::channel(); + self.tx + .send(NodeApiRequest::PrepareAndCommit { config, responder: tx }) + .await?; + let res = rx.await??; + Ok(res) + } + + /// Return `Ok(true)` if the configuration has committed, `Ok(false)` if + /// it hasn't committed yet, or an error otherwise. + /// + /// Nexus will retry this operation and so we should only try once here. + /// This is in contrast to operations like `load_rack_secret` that are + /// called directly from sled agent. + pub async fn commit( + &self, + rack_id: RackUuid, + epoch: Epoch, + ) -> Result { + let (tx, rx) = oneshot::channel(); + self.tx + .send(NodeApiRequest::Commit { rack_id, epoch, responder: tx }) + .await?; + let res = rx.await??; + Ok(res) + } + + /// Clear all secrets loaded in memory at this node + /// + /// Rack secrets are cached after loading and must be manually cleared. + pub async fn clear_secrets(&self) -> Result<(), NodeApiError> { + self.tx.send(NodeApiRequest::ClearSecrets).await?; + Ok(()) + } + /// Inform the node of currently known IP addresses on the bootstrap network /// /// These are generated from DDM prefixes learned by the bootstrap agent. 
@@ -111,6 +324,13 @@ impl NodeTaskHandle { Ok(res) } + pub async fn status(&self) -> Result { + let (tx, rx) = oneshot::channel(); + self.tx.send(NodeApiRequest::NodeStatus { responder: tx }).await?; + let res = rx.await?; + Ok(res) + } + pub async fn shutdown(&self) -> Result<(), NodeApiError> { self.tx.send(NodeApiRequest::Shutdown).await?; Ok(()) @@ -122,9 +342,7 @@ pub struct NodeTask { log: Logger, #[expect(unused)] config: Config, - #[expect(unused)] node: Node, - #[expect(unused)] ctx: NodeCtx, conn_mgr: ConnMgr, conn_mgr_rx: mpsc::Receiver, @@ -196,7 +414,10 @@ impl NodeTask { Some(msg) = self.conn_mgr_rx.recv() => { self.on_conn_msg(msg).await } + } + for envelope in self.ctx.drain_envelopes() { + self.conn_mgr.send(envelope).await; } } } @@ -207,19 +428,22 @@ impl NodeTask { match msg.msg { ConnToMainMsgInner::Accepted { addr, peer_id } => { self.conn_mgr - .server_handshake_completed(task_id, addr, peer_id) + .server_handshake_completed(task_id, addr, peer_id.clone()) .await; + self.node.on_connect(&mut self.ctx, peer_id); } ConnToMainMsgInner::Connected { addr, peer_id } => { self.conn_mgr - .client_handshake_completed(task_id, addr, peer_id) + .client_handshake_completed(task_id, addr, peer_id.clone()) .await; + self.node.on_connect(&mut self.ctx, peer_id); } ConnToMainMsgInner::Disconnected { peer_id } => { - self.conn_mgr.on_disconnected(task_id, peer_id).await; + self.conn_mgr.on_disconnected(task_id, peer_id.clone()).await; + self.node.on_disconnect(&mut self.ctx, peer_id); } - ConnToMainMsgInner::Received { from: _, msg: _ } => { - todo!(); + ConnToMainMsgInner::Received { from, msg } => { + self.node.handle(&mut self.ctx, from, msg); } ConnToMainMsgInner::ReceivedNetworkConfig { from: _, @@ -230,18 +454,77 @@ impl NodeTask { } } + // TODO: Process `ctx`: save persistent state async fn on_api_request(&mut self, request: NodeApiRequest) { match request { NodeApiRequest::BootstrapAddresses(addrs) => { info!(self.log, "Updated Peer Addresses: {addrs:?}"); // TODO: real corpus let corpus = vec![]; - self.conn_mgr.update_bootstrap_connections(addrs, corpus).await; + let disconnected = self + .conn_mgr + .update_bootstrap_connections(addrs, corpus) + .await; + for peer_id in disconnected { + self.node.on_disconnect(&mut self.ctx, peer_id); + } + } + NodeApiRequest::ClearSecrets => { + self.node.clear_secrets(); + } + NodeApiRequest::Commit { rack_id, epoch, responder } => { + let res = self + .node + .commit_configuration(&mut self.ctx, rack_id, epoch) + .map(|_| { + self.ctx.persistent_state().commits.contains(&epoch) + }); + let _ = responder.send(res); } NodeApiRequest::ConnMgrStatus { responder } => { debug!(self.log, "Received Request for ConnMgrStatus"); let _ = responder.send(self.conn_mgr.status()); } + NodeApiRequest::CoordinatorStatus { responder } => { + let status = self.node.get_coordinator_state().map(|cs| { + CoordinatorStatus { + config: cs.config().clone(), + acked_prepares: cs.op().acked_prepares(), + } + }); + let _ = responder.send(status); + } + NodeApiRequest::LoadRackSecret { epoch, responder } => { + let res = self.node.load_rack_secret(&mut self.ctx, epoch); + let _ = responder.send(res); + } + NodeApiRequest::LrtqUpgrade { msg, responder } => { + let res = + self.node.coordinate_upgrade_from_lrtq(&mut self.ctx, msg); + let _ = responder.send(res); + } + NodeApiRequest::NodeStatus { responder } => { + let _ = responder.send(NodeStatus { + connected_peers: self.ctx.connected().clone(), + alarms: self.ctx.alarms().clone(), + persistent_state: 
self.ctx.persistent_state().into(), + }); + } + NodeApiRequest::PrepareAndCommit { config, responder } => { + let epoch = config.epoch; + let res = self + .node + .prepare_and_commit(&mut self.ctx, config) + .map(|_| { + self.ctx.persistent_state().commits.contains(&epoch) + }); + let _ = responder.send(res); + } + NodeApiRequest::Reconfigure { msg, responder } => { + let res = + self.node.coordinate_reconfiguration(&mut self.ctx, msg); + let _ = responder.send(res); + } NodeApiRequest::Shutdown => { info!(self.log, "Shutting down Node tokio tasks"); self.shutdown = true; @@ -257,15 +540,20 @@ mod tests { ConnState, RECONNECT_TIME, platform_id_to_baseboard_id, }; use camino::Utf8PathBuf; - use dropshot::test_util::log_prefix_for_test; + use dropshot::test_util::{LogContext, log_prefix_for_test}; use omicron_test_utils::dev::poll::{CondCheckError, wait_for_condition}; use omicron_test_utils::dev::test_setup_log; + use omicron_uuid_kinds::GenericUuid; + use secrecy::ExposeSecretMut; + use sled_hardware_types::Baseboard; use sprockets_tls::keys::ResolveSetting; use sprockets_tls_test_utils::{ alias_prefix, cert_path, certlist_path, private_key_path, root_prefix, sprockets_auth_prefix, }; use std::time::Duration; + use tokio::task::JoinHandle; + use trust_quorum_protocol::NodeHandlerCtx; fn pki_doc_to_node_configs(dir: Utf8PathBuf, n: usize) -> Vec { (1..=n) @@ -304,17 +592,7 @@ mod tests { .collect() } - /// Test that all nodes can connect to each other when given each the full - /// set of "bootstrap addresses". - #[tokio::test] - async fn full_mesh_connectivity() { - let logctx = test_setup_log("full_mesh_connectivity"); - let (mut dir, s) = log_prefix_for_test("full_mesh_connectivity"); - dir.push(&s); - std::fs::create_dir(&dir).unwrap(); - println!("Writing keys and certs to {dir}"); - let num_nodes = 4; - + fn write_keys_and_measurements(dir: Utf8PathBuf, num_nodes: usize) { let file_behavior = sprockets_tls_test_utils::OutputFileExistsBehavior::Overwrite; @@ -339,22 +617,144 @@ mod tests { // Write out the log document to the filesystem let out = attest_mock::log::mock(attest_log_doc).unwrap(); std::fs::write(dir.join("log.bin"), &out).unwrap(); + } - let configs = pki_doc_to_node_configs(dir.clone(), num_nodes); + struct TestSetup { + pub logctx: LogContext, + pub dir: Utf8PathBuf, + pub configs: Vec, + pub node_handles: Vec, + pub join_handles: Vec>, + pub listen_addrs: Vec, + } - let mut node_handles = vec![]; - let mut join_handles = vec![]; - for config in configs.clone() { - let (mut task, handle) = NodeTask::new(config, &logctx.log).await; - node_handles.push(handle); - join_handles.push(tokio::spawn(async move { task.run().await })); + impl TestSetup { + pub async fn spawn_nodes( + name: &'static str, + num_nodes: usize, + ) -> TestSetup { + let logctx = test_setup_log(name); + let (mut dir, s) = log_prefix_for_test(name); + dir.push(&s); + std::fs::create_dir(&dir).unwrap(); + println!("Writing keys and certs to {dir}"); + write_keys_and_measurements(dir.clone(), num_nodes); + let configs = pki_doc_to_node_configs(dir.clone(), num_nodes); + + let mut node_handles = vec![]; + let mut join_handles = vec![]; + for config in configs.clone() { + let (mut task, handle) = + NodeTask::new(config, &logctx.log).await; + node_handles.push(handle); + join_handles + .push(tokio::spawn(async move { task.run().await })); + } + + let listen_addrs: Vec<_> = + node_handles.iter().map(|h| h.listen_addr()).collect(); + TestSetup { + logctx, + dir, + configs, + node_handles, + join_handles, + 
listen_addrs, + } } - let listen_addrs: BTreeSet<_> = - node_handles.iter().map(|h| h.listen_addr()).collect(); + pub async fn spawn_nodes_with_lrtq_shares( + name: &'static str, + num_nodes: usize, + ) -> (TestSetup, RackUuid) { + let logctx = test_setup_log(name); + let (mut dir, s) = log_prefix_for_test(name); + dir.push(&s); + std::fs::create_dir(&dir).unwrap(); + println!("Writing keys and certs to {dir}"); + write_keys_and_measurements(dir.clone(), num_nodes); + let configs = pki_doc_to_node_configs(dir.clone(), num_nodes); - for h in &node_handles { - h.load_peer_addresses(listen_addrs.clone()).await.unwrap(); + let rack_id = RackUuid::new_v4(); + + // Translate `BaseboardId`s to `Baseboard`s for LRTQ membership + let baseboards: BTreeSet<_> = configs + .iter() + .map(|c| { + Baseboard::new_pc( + c.baseboard_id.serial_number.clone(), + c.baseboard_id.part_number.clone(), + ) + }) + .collect(); + + // Create the LRTQ key share packages and take only the common data, + // which is what we use for trust quorum upgrade. + let share_pkgs: Vec<_> = bootstore::schemes::v0::create_pkgs( + rack_id.into_untyped_uuid(), + baseboards.clone(), + ) + .unwrap() + .expose_secret_mut() + .iter() + .map(|pkg| pkg.common.clone()) + .collect(); + + let mut node_handles = vec![]; + let mut join_handles = vec![]; + for (config, share_pkg) in + configs.clone().into_iter().zip(share_pkgs) + { + let (mut task, handle) = + NodeTask::new(config, &logctx.log).await; + task.ctx.update_persistent_state(|ps| { + ps.lrtq = Some(share_pkg); + // We are modifying the persistent state, but not in a way + // we want the test to recognize. + false + }); + node_handles.push(handle); + join_handles + .push(tokio::spawn(async move { task.run().await })); + } + + let listen_addrs: Vec<_> = + node_handles.iter().map(|h| h.listen_addr()).collect(); + ( + TestSetup { + logctx, + dir, + configs, + node_handles, + join_handles, + listen_addrs, + }, + rack_id, + ) + } + + pub fn members(&self) -> impl Iterator { + self.configs.iter().map(|c| &c.baseboard_id) + } + + pub fn cleanup_successful(self) { + self.logctx.cleanup_successful(); + std::fs::remove_dir_all(self.dir).unwrap(); + } + } + + /// Test that all nodes can connect to each other when given each the full + /// set of "bootstrap addresses". + #[tokio::test] + async fn full_mesh_connectivity() { + let num_nodes = 4; + let mut setup = + TestSetup::spawn_nodes("full_mesh_connectivity", num_nodes).await; + + for h in &setup.node_handles { + h.load_peer_addresses(setup.listen_addrs.iter().cloned().collect()) + .await + .unwrap(); } let poll_interval = Duration::from_millis(1); @@ -364,7 +764,7 @@ mod tests { wait_for_condition( async || { let mut count = 0; - for h in &node_handles { + for h in &setup.node_handles { let status = h.conn_mgr_status().await.unwrap(); if status .connections @@ -395,9 +795,9 @@ mod tests { // reconnecting. This should cause the task id counter to start // incrementing at all nodes and for their to be one fewer established // connection. 
- let h = node_handles.pop().unwrap(); + let h = setup.node_handles.pop().unwrap(); h.shutdown().await.unwrap(); - join_handles.pop().unwrap(); + setup.join_handles.pop().unwrap(); let stopped_addr = h.listen_addr; // Speed up reconnection in the test @@ -407,7 +807,7 @@ mod tests { wait_for_condition( async || { let mut valid = 0; - for h in &node_handles { + for h in &setup.node_handles { let status = h.conn_mgr_status().await.unwrap(); let established_count = status .connections @@ -443,16 +843,19 @@ mod tests { .unwrap(); // Now let's bring back up the old node and ensure full connectivity again - let (mut task, handle) = - NodeTask::new(configs.last().unwrap().clone(), &logctx.log).await; - node_handles.push(handle.clone()); - join_handles.push(tokio::spawn(async move { task.run().await })); + let (mut task, handle) = NodeTask::new( + setup.configs.last().unwrap().clone(), + &setup.logctx.log, + ) + .await; + setup.node_handles.push(handle.clone()); + setup.join_handles.push(tokio::spawn(async move { task.run().await })); // The port likely changed, so we must refresh everyone's set of addresses let listen_addrs: BTreeSet<_> = - node_handles.iter().map(|h| h.listen_addr()).collect(); + setup.node_handles.iter().map(|h| h.listen_addr()).collect(); - for h in &node_handles { + for h in &setup.node_handles { h.load_peer_addresses(listen_addrs.clone()).await.unwrap(); } @@ -460,7 +863,7 @@ mod tests { wait_for_condition( async || { let mut count = 0; - for h in &node_handles { + for h in &setup.node_handles { let status = h.conn_mgr_status().await.unwrap(); if status .connections @@ -483,7 +886,553 @@ mod tests { .await .unwrap(); - logctx.cleanup_successful(); - std::fs::remove_dir_all(dir).unwrap(); + setup.cleanup_successful(); + } + + /// Commit an initial configuration at all nodes + #[tokio::test] + pub async fn tq_initial_config() { + let num_nodes = 4; + let setup = + TestSetup::spawn_nodes("tq_initial_config", num_nodes).await; + let rack_id = RackUuid::new_v4(); + + // Trigger an initial configuration by using the first node as a + // coordinator. We're pretending to be the sled-agent with instruction from + // Nexus here. + let initial_config = ReconfigureMsg { + rack_id, + epoch: Epoch(1), + last_committed_epoch: None, + members: setup.members().cloned().collect(), + threshold: trust_quorum_protocol::Threshold(3), + }; + + // Tell nodes how to reach each other + for h in &setup.node_handles { + h.load_peer_addresses(setup.listen_addrs.iter().cloned().collect()) + .await + .unwrap(); + } + + let coordinator = setup.node_handles.first().unwrap(); + coordinator.reconfigure(initial_config).await.unwrap(); + + let poll_interval = Duration::from_millis(10); + let poll_max = Duration::from_secs(10); + + // Wait for the coordinator to see `PrepareAck`s from all nodes + wait_for_condition( + async || { + let Ok(Some(s)) = coordinator.coordinator_status().await else { + return Err(CondCheckError::<()>::NotYet); + }; + if s.acked_prepares.len() == num_nodes { + Ok(()) + } else { + Err(CondCheckError::<()>::NotYet) + } + }, + &poll_interval, + &poll_max, + ) + .await + .unwrap(); + + // Commit at each node + // + // Nexus retries this idempotent command until each node acks. So we + // simulate that here. 
+ wait_for_condition( + async || { + let mut acked = 0; + for h in &setup.node_handles { + if h.commit(rack_id, Epoch(1)).await.unwrap() { + acked += 1; + } + } + if acked == num_nodes { + Ok(()) + } else { + Err(CondCheckError::<()>::NotYet) + } + }, + &poll_interval, + &poll_max, + ) + .await + .unwrap(); + + // Now load the rack secret at all nodes + let mut secret = None; + for h in &setup.node_handles { + let rs = h.load_rack_secret(Epoch(1)).await.unwrap(); + if secret.is_none() { + secret = Some(rs.clone()); + } + assert_eq!(&rs, secret.as_ref().unwrap()); + } + + setup.cleanup_successful(); + } + + /// Eventually Commit an initial configuration at all nodes + /// + /// We leave one node out of the bootstrap network info, trigger a commit + /// at the first 3 nodes. Then we go and issue a `PrepareAndCommit` to the last + /// node and ensure it commits. + #[tokio::test] + pub async fn tq_initial_config_prepare_and_commit() { + let num_nodes = 4; + let setup = TestSetup::spawn_nodes( + "tq_initial_config_prepare_and_commit", + num_nodes, + ) + .await; + let rack_id = RackUuid::new_v4(); + + // Trigger an initial configuration by using the first node as a + // coordinator. We're pretending to be the sled-agent with instruction from + // Nexus here. + let initial_config = ReconfigureMsg { + rack_id, + epoch: Epoch(1), + last_committed_epoch: None, + members: setup.members().cloned().collect(), + threshold: trust_quorum_protocol::Threshold(3), + }; + + // Tell all but the last node how to reach each other + for h in &setup.node_handles[0..num_nodes - 1] { + h.load_peer_addresses( + setup + .listen_addrs + .iter() + .take(num_nodes - 1) + .cloned() + .collect(), + ) + .await + .unwrap(); + } + + let coordinator = setup.node_handles.first().unwrap(); + coordinator.reconfigure(initial_config).await.unwrap(); + + let poll_interval = Duration::from_millis(10); + let poll_max = Duration::from_secs(10); + + // Wait for the coordinator to see `PrepareAck`s from all but the last + // node + wait_for_condition( + async || { + let Ok(Some(s)) = coordinator.coordinator_status().await else { + return Err(CondCheckError::<()>::NotYet); + }; + if s.acked_prepares.len() == num_nodes - 1 { + Ok(()) + } else { + Err(CondCheckError::<()>::NotYet) + } + }, + &poll_interval, + &poll_max, + ) + .await + .unwrap(); + + // Save the configuration as if we were nexus + let config = + coordinator.coordinator_status().await.unwrap().unwrap().config; + + // Commit at each node + // + // Nexus retries this idempotent command until each node acks. So we + // simulate that here. + wait_for_condition( + async || { + let mut acked = 0; + for h in &setup.node_handles[0..num_nodes - 1] { + if h.commit(rack_id, Epoch(1)).await.unwrap() { + acked += 1; + } + } + if acked == num_nodes - 1 { + Ok(()) + } else { + Err(CondCheckError::<()>::NotYet) + } + }, + &poll_interval, + &poll_max, + ) + .await + .unwrap(); + + // Now ensure that the last node still hasn't prepared or committed for + // epoch 1, and isn't connected to any other node. 
+ let status = setup.node_handles.last().unwrap().status().await.unwrap(); + assert!(status.connected_peers.is_empty()); + assert!(status.persistent_state.configs.is_empty()); + assert!(status.persistent_state.shares.is_empty()); + assert!(status.persistent_state.commits.is_empty()); + + // Update connectivity at all nodes + for h in &setup.node_handles { + h.load_peer_addresses(setup.listen_addrs.iter().cloned().collect()) + .await + .unwrap(); + } + + // Now issue a `PrepareAndCommit` to the last node and wait for it to + // commit + wait_for_condition( + async || { + let h = &setup.node_handles.last().unwrap(); + if h.prepare_and_commit(config.clone()).await.unwrap() { + Ok(()) + } else { + Err(CondCheckError::<()>::NotYet) + } + }, + &poll_interval, + &poll_max, + ) + .await + .unwrap(); + + // The last node should now have all the info we expect + let status = setup.node_handles.last().unwrap().status().await.unwrap(); + assert_eq!(status.connected_peers.len(), num_nodes - 1); + assert!(status.persistent_state.configs.contains(&Epoch(1))); + assert!(status.persistent_state.shares.contains(&Epoch(1))); + assert!(status.persistent_state.commits.contains(&Epoch(1))); + + // Now load the rack secret at all nodes + let mut secret = None; + for h in &setup.node_handles { + let rs = h.load_rack_secret(Epoch(1)).await.unwrap(); + if secret.is_none() { + secret = Some(rs.clone()); + } + assert_eq!(&rs, secret.as_ref().unwrap()); + } + + setup.cleanup_successful(); + } + + /// Perform an initial config, followed by a reconfiguration. Leave one + /// node out of the reconfiguration, then connect it and attempt to load + /// the configuration for the prior epoch. This should result in commit + /// advancing to the latest epoch. + #[tokio::test] + pub async fn tq_reconfig_with_commit_advance() { + let num_nodes = 4; + let setup = TestSetup::spawn_nodes( + "tq_recofnig_with_commit_advance", + num_nodes, + ) + .await; + let rack_id = RackUuid::new_v4(); + + // Trigger an initial configuration by using the first node as a + // coordinator. We're pretending to be the sled-agent with instruction from + // Nexus here. + let initial_config = ReconfigureMsg { + rack_id, + epoch: Epoch(1), + last_committed_epoch: None, + members: setup.members().cloned().collect(), + threshold: trust_quorum_protocol::Threshold(3), + }; + + // Tell all but the last node how to reach each other + for h in &setup.node_handles { + h.load_peer_addresses(setup.listen_addrs.iter().cloned().collect()) + .await + .unwrap(); + } + + let coordinator = setup.node_handles.first().unwrap(); + coordinator.reconfigure(initial_config.clone()).await.unwrap(); + + let poll_interval = Duration::from_millis(10); + let poll_max = Duration::from_secs(10); + + // Wait for the coordinator to see `PrepareAck`s from all nodes + wait_for_condition( + async || { + let Ok(Some(s)) = coordinator.coordinator_status().await else { + return Err(CondCheckError::<()>::NotYet); + }; + if s.acked_prepares.len() == num_nodes { + Ok(()) + } else { + Err(CondCheckError::<()>::NotYet) + } + }, + &poll_interval, + &poll_max, + ) + .await + .unwrap(); + + // Commit at each node + // + // Nexus retries this idempotent command until each node acks. So we + // simulate that here. 
+ wait_for_condition( + async || { + let mut acked = 0; + for h in &setup.node_handles { + if h.commit(rack_id, Epoch(1)).await.unwrap() { + acked += 1; + } + } + if acked == num_nodes { + Ok(()) + } else { + Err(CondCheckError::<()>::NotYet) + } + }, + &poll_interval, + &poll_max, + ) + .await + .unwrap(); + + // Now load the rack secret at all nodes + let mut secret = None; + for h in &setup.node_handles { + let rs = h.load_rack_secret(Epoch(1)).await.unwrap(); + if secret.is_none() { + secret = Some(rs.clone()); + } + assert_eq!(&rs, secret.as_ref().unwrap()); + } + + // Tell all but the last node how to reach each other + // This should disconnect the last node from everybody + for h in &setup.node_handles[0..num_nodes - 1] { + h.load_peer_addresses( + setup.listen_addrs.iter().take(3).cloned().collect(), + ) + .await + .unwrap(); + } + setup + .node_handles + .last() + .unwrap() + .load_peer_addresses(BTreeSet::new()) + .await + .unwrap(); + + // Wait for peers to disconnect + wait_for_condition( + async || { + let mut acked = 0; + for h in &setup.node_handles[0..num_nodes - 1] { + let status = h.status().await.unwrap(); + if status.connected_peers.len() == num_nodes - 2 { + acked += 1; + } + } + let status = + setup.node_handles.last().unwrap().status().await.unwrap(); + if status.connected_peers.is_empty() { + acked += 1; + } + + if acked == num_nodes { + Ok(()) + } else { + Err(CondCheckError::<()>::NotYet) + } + }, + &poll_interval, + &poll_max, + ) + .await + .unwrap(); + + // Just stick to the same set of nodes for simplicity + let mut new_config = initial_config; + new_config.epoch = Epoch(2); + new_config.last_committed_epoch = Some(Epoch(1)); + + // Pick a different coordinator for the hell of it + let coordinator = setup.node_handles.get(1).unwrap(); + coordinator.reconfigure(new_config).await.unwrap(); + + // Wait for the coordinator to see `PrepareAck`s from all but the last + // node + wait_for_condition( + async || { + let Ok(Some(s)) = coordinator.coordinator_status().await else { + return Err(CondCheckError::<()>::NotYet); + }; + if s.acked_prepares.len() == num_nodes - 1 { + Ok(()) + } else { + Err(CondCheckError::<()>::NotYet) + } + }, + &poll_interval, + &poll_max, + ) + .await + .unwrap(); + + // Commit at each node + // + // Nexus retries this idempotent command until each node acks. So we + // simulate that here. + wait_for_condition( + async || { + let mut acked = 0; + for h in &setup.node_handles[0..num_nodes - 1] { + if h.commit(rack_id, Epoch(2)).await.unwrap() { + acked += 1; + } + } + if acked == num_nodes - 1 { + Ok(()) + } else { + Err(CondCheckError::<()>::NotYet) + } + }, + &poll_interval, + &poll_max, + ) + .await + .unwrap(); + + // Now ensure that the last node still hasn't prepared or committed for epoch 2, + // and isn't connected to any other node. + let status = setup.node_handles.last().unwrap().status().await.unwrap(); + assert!(status.connected_peers.is_empty()); + assert!(status.persistent_state.configs.contains(&Epoch(1))); + assert!(status.persistent_state.shares.contains(&Epoch(1))); + assert!(status.persistent_state.commits.contains(&Epoch(1))); + assert!(!status.persistent_state.configs.contains(&Epoch(2))); + assert!(!status.persistent_state.shares.contains(&Epoch(2))); + assert!(!status.persistent_state.commits.contains(&Epoch(2))); + + // Now reconnect the last node. 
+ for h in &setup.node_handles { + h.load_peer_addresses(setup.listen_addrs.iter().cloned().collect()) + .await + .unwrap(); + } + + // Clear the rack secrets at the last node to force a request for shares. + let last_node = setup.node_handles.last().unwrap(); + last_node.clear_secrets().await.unwrap(); + + // Load the secret at epoch 1. This should trigger a `CommitAdvance` + // response from nodes that committed at epoch 2. + let res = + setup.node_handles.last().unwrap().load_rack_secret(Epoch(1)).await; + + println!("res = {res:#?}"); + + let rs = last_node.load_rack_secret(Epoch(2)).await.unwrap(); + + // Ensure the rack secret is the same as at another node + let expected = setup + .node_handles + .first() + .unwrap() + .load_rack_secret(Epoch(2)) + .await + .unwrap(); + assert_eq!(rs, expected); + + setup.cleanup_successful(); + } + + #[tokio::test] + pub async fn tq_upgrade_from_lrtq() { + let num_nodes = 4; + let (setup, rack_id) = TestSetup::spawn_nodes_with_lrtq_shares( + "tq_upgrade_from_lrtq", + num_nodes, + ) + .await; + + let msg = LrtqUpgradeMsg { + rack_id, + epoch: Epoch(2), + members: setup.members().cloned().collect(), + threshold: trust_quorum_protocol::Threshold(3), + }; + + // Tell nodes how to reach each other + for h in &setup.node_handles { + h.load_peer_addresses(setup.listen_addrs.iter().cloned().collect()) + .await + .unwrap(); + } + + let coordinator = setup.node_handles.first().unwrap(); + coordinator.upgrade_from_lrtq(msg).await.unwrap(); + + let poll_interval = Duration::from_millis(10); + let poll_max = Duration::from_secs(10); + + // Wait for the coordinator to see `PrepareAck`s from all nodes + wait_for_condition( + async || { + let Ok(Some(s)) = coordinator.coordinator_status().await else { + return Err(CondCheckError::<()>::NotYet); + }; + if s.acked_prepares.len() == num_nodes { + Ok(()) + } else { + Err(CondCheckError::<()>::NotYet) + } + }, + &poll_interval, + &poll_max, + ) + .await + .unwrap(); + + // Commit at each node + // + // Nexus retries this idempotent command until each node acks. So we + // simulate that here. + wait_for_condition( + async || { + let mut acked = 0; + for h in &setup.node_handles { + if h.commit(rack_id, Epoch(2)).await.unwrap() { + acked += 1; + } + } + if acked == num_nodes { + Ok(()) + } else { + Err(CondCheckError::<()>::NotYet) + } + }, + &poll_interval, + &poll_max, + ) + .await + .unwrap(); + + // Now load the rack secret at all nodes + let mut secret = None; + for h in &setup.node_handles { + let rs = h.load_rack_secret(Epoch(1)).await.unwrap(); + if secret.is_none() { + secret = Some(rs.clone()); + } + assert_eq!(&rs, secret.as_ref().unwrap()); + } + + setup.cleanup_successful(); } } From 24c3691ef68b540646e807fb0d2e49f1ed94c783 Mon Sep 17 00:00:00 2001 From: "Andrew J. 
Stone" Date: Fri, 31 Oct 2025 19:34:19 +0000 Subject: [PATCH 2/5] Do not internally retry for `load_rack_secret` --- trust-quorum/src/task.rs | 146 +++++++++++++++++++++------------------ 1 file changed, 79 insertions(+), 67 deletions(-) diff --git a/trust-quorum/src/task.rs b/trust-quorum/src/task.rs index 0bbae4ac142..ee547c39ff3 100644 --- a/trust-quorum/src/task.rs +++ b/trust-quorum/src/task.rs @@ -14,12 +14,10 @@ use slog::{Logger, debug, error, info, o}; use sprockets_tls::keys::SprocketsConfig; use std::collections::BTreeSet; use std::net::SocketAddrV6; -use std::time::Duration; use thiserror::Error; use tokio::sync::mpsc::error::SendError; use tokio::sync::oneshot::error::RecvError; use tokio::sync::{mpsc, oneshot}; -use tokio::time::sleep; use trust_quorum_protocol::{ Alarm, BaseboardId, CommitError, Configuration, Epoch, ExpungedMetadata, LoadRackSecretError, LrtqUpgradeError, LrtqUpgradeMsg, Node, NodeCallerCtx, @@ -27,11 +25,6 @@ use trust_quorum_protocol::{ ReconfigurationError, ReconfigureMsg, ReconstructedRackSecret, }; -#[cfg(not(test))] -const LOAD_RACK_SECRET_RETRY_TIMEOUT: Duration = Duration::from_millis(500); -#[cfg(test)] -const LOAD_RACK_SECRET_RETRY_TIMEOUT: Duration = Duration::from_millis(5); - /// We only expect a handful of messages at a time. const API_CHANNEL_BOUND: usize = 32; @@ -245,20 +238,13 @@ impl NodeTaskHandle { pub async fn load_rack_secret( &self, epoch: Epoch, - ) -> Result { - loop { - let (tx, rx) = oneshot::channel(); - self.tx - .send(NodeApiRequest::LoadRackSecret { epoch, responder: tx }) - .await?; - if let Some(rack_secret) = rx.await?? { - return Ok(rack_secret); - }; - - // The task returns immediately with `None` if the secret is still - // being loaded. We must therefore retry. - sleep(LOAD_RACK_SECRET_RETRY_TIMEOUT).await; - } + ) -> Result, NodeApiError> { + let (tx, rx) = oneshot::channel(); + self.tx + .send(NodeApiRequest::LoadRackSecret { epoch, responder: tx }) + .await?; + let rs = rx.await??; + Ok(rs) } /// Return `Ok(true)` if the configuration has committed, `Ok(false)` if @@ -542,7 +528,7 @@ mod tests { use camino::Utf8PathBuf; use dropshot::test_util::{LogContext, log_prefix_for_test}; use omicron_test_utils::dev::poll::{CondCheckError, wait_for_condition}; - use omicron_test_utils::dev::test_setup_log; + use omicron_test_utils::dev::{self, test_setup_log}; use omicron_uuid_kinds::GenericUuid; use secrecy::ExposeSecretMut; use sled_hardware_types::Baseboard; @@ -741,6 +727,36 @@ mod tests { self.logctx.cleanup_successful(); std::fs::remove_dir_all(self.dir).unwrap(); } + + pub async fn wait_for_rack_secrets_and_assert_equality( + &self, + node_indexes: BTreeSet, + epoch: Epoch, + ) -> Result<(), dev::poll::Error> { + let poll_interval = Duration::from_millis(10); + let poll_max = Duration::from_secs(10); + wait_for_condition( + async || { + let mut secret = None; + for (i, h) in self.node_handles.iter().enumerate() { + if node_indexes.contains(&i) { + let Some(rs) = h.load_rack_secret(epoch).await? 
+ else { + return Err(CondCheckError::NotYet); + }; + if secret.is_none() { + secret = Some(rs.clone()); + } + assert_eq!(&rs, secret.as_ref().unwrap()); + } + } + Ok(()) + }, + &poll_interval, + &poll_max, + ) + .await + } } /// Test that all nodes can connect to each other when given each the full @@ -964,14 +980,13 @@ mod tests { .unwrap(); // Now load the rack secret at all nodes - let mut secret = None; - for h in &setup.node_handles { - let rs = h.load_rack_secret(Epoch(1)).await.unwrap(); - if secret.is_none() { - secret = Some(rs.clone()); - } - assert_eq!(&rs, secret.as_ref().unwrap()); - } + setup + .wait_for_rack_secrets_and_assert_equality( + (0..num_nodes).collect(), + Epoch(1), + ) + .await + .unwrap(); setup.cleanup_successful(); } @@ -1109,14 +1124,13 @@ mod tests { assert!(status.persistent_state.commits.contains(&Epoch(1))); // Now load the rack secret at all nodes - let mut secret = None; - for h in &setup.node_handles { - let rs = h.load_rack_secret(Epoch(1)).await.unwrap(); - if secret.is_none() { - secret = Some(rs.clone()); - } - assert_eq!(&rs, secret.as_ref().unwrap()); - } + setup + .wait_for_rack_secrets_and_assert_equality( + (0..num_nodes).collect(), + Epoch(1), + ) + .await + .unwrap(); setup.cleanup_successful(); } @@ -1202,14 +1216,13 @@ mod tests { .unwrap(); // Now load the rack secret at all nodes - let mut secret = None; - for h in &setup.node_handles { - let rs = h.load_rack_secret(Epoch(1)).await.unwrap(); - if secret.is_none() { - secret = Some(rs.clone()); - } - assert_eq!(&rs, secret.as_ref().unwrap()); - } + setup + .wait_for_rack_secrets_and_assert_equality( + (0..num_nodes).collect(), + Epoch(1), + ) + .await + .unwrap(); // Tell all but the last node how to reach each other // This should disconnect the last node from everybody @@ -1332,22 +1345,22 @@ mod tests { // Load the secret at epoch 1. This should trigger a `CommitAdvance` // response from nodes that committed at epoch 2. - let res = - setup.node_handles.last().unwrap().load_rack_secret(Epoch(1)).await; - - println!("res = {res:#?}"); - - let rs = last_node.load_rack_secret(Epoch(2)).await.unwrap(); + setup + .wait_for_rack_secrets_and_assert_equality( + BTreeSet::from([num_nodes - 1]), + Epoch(1), + ) + .await + .unwrap(); - // Ensure the rack secret is the same as at another node - let expected = setup - .node_handles - .first() - .unwrap() - .load_rack_secret(Epoch(2)) + // Ensure the rack secret at epoch 2 is the same as at another node + setup + .wait_for_rack_secrets_and_assert_equality( + BTreeSet::from([0, num_nodes - 1]), + Epoch(2), + ) .await .unwrap(); - assert_eq!(rs, expected); setup.cleanup_successful(); } @@ -1424,14 +1437,13 @@ mod tests { .unwrap(); // Now load the rack secret at all nodes - let mut secret = None; - for h in &setup.node_handles { - let rs = h.load_rack_secret(Epoch(1)).await.unwrap(); - if secret.is_none() { - secret = Some(rs.clone()); - } - assert_eq!(&rs, secret.as_ref().unwrap()); - } + setup + .wait_for_rack_secrets_and_assert_equality( + (0..num_nodes).collect(), + Epoch(1), + ) + .await + .unwrap(); setup.cleanup_successful(); } From b68a144e6b9e0a8673b6c478cb482cf9f2e46f3a Mon Sep 17 00:00:00 2001 From: "Andrew J. 
Stone" Date: Fri, 31 Oct 2025 19:56:12 +0000 Subject: [PATCH 3/5] more review fixes for @sunshowers --- trust-quorum/src/task.rs | 131 +++++++++++++++++++++++---------------- 1 file changed, 79 insertions(+), 52 deletions(-) diff --git a/trust-quorum/src/task.rs b/trust-quorum/src/task.rs index ee547c39ff3..24962eb6251 100644 --- a/trust-quorum/src/task.rs +++ b/trust-quorum/src/task.rs @@ -25,6 +25,12 @@ use trust_quorum_protocol::{ ReconfigurationError, ReconfigureMsg, ReconstructedRackSecret, }; +/// Whether or not a configuration has committed or is still underway. +pub enum CommitStatus { + Committed, + Pending, +} + /// We only expect a handful of messages at a time. const API_CHANNEL_BOUND: usize = 32; @@ -96,15 +102,15 @@ pub enum NodeApiRequest { ClearSecrets, /// Retrieve connectivity status via the `ConnMgr` - ConnMgrStatus { responder: oneshot::Sender }, + ConnMgrStatus { tx: oneshot::Sender }, /// Return the status of this node if it is a coordinator - CoordinatorStatus { responder: oneshot::Sender> }, + CoordinatorStatus { tx: oneshot::Sender> }, /// Load a rack secret for the given epoch LoadRackSecret { epoch: Epoch, - responder: oneshot::Sender< + tx: oneshot::Sender< Result, LoadRackSecretError>, >, }, @@ -112,29 +118,29 @@ pub enum NodeApiRequest { /// Coordinate an upgrade from LRTQ at this node LrtqUpgrade { msg: LrtqUpgradeMsg, - responder: oneshot::Sender>, + tx: oneshot::Sender>, }, /// Get the overall status of the node - NodeStatus { responder: oneshot::Sender }, + NodeStatus { tx: oneshot::Sender }, /// `PrepareAndCommit` a configuration at this node PrepareAndCommit { config: Configuration, - responder: oneshot::Sender>, + tx: oneshot::Sender>, }, /// `Commit` a configuration at this node Commit { rack_id: RackUuid, epoch: Epoch, - responder: oneshot::Sender>, + tx: oneshot::Sender>, }, /// Coordinate a reconfiguration at this node Reconfigure { msg: ReconfigureMsg, - responder: oneshot::Sender>, + tx: oneshot::Sender>, }, /// Shutdown the node's tokio tasks @@ -197,9 +203,7 @@ impl NodeTaskHandle { msg: ReconfigureMsg, ) -> Result<(), NodeApiError> { let (tx, rx) = oneshot::channel(); - self.tx - .send(NodeApiRequest::Reconfigure { msg, responder: tx }) - .await?; + self.tx.send(NodeApiRequest::Reconfigure { msg, tx: tx }).await?; rx.await??; Ok(()) } @@ -210,9 +214,7 @@ impl NodeTaskHandle { msg: LrtqUpgradeMsg, ) -> Result<(), NodeApiError> { let (tx, rx) = oneshot::channel(); - self.tx - .send(NodeApiRequest::LrtqUpgrade { msg, responder: tx }) - .await?; + self.tx.send(NodeApiRequest::LrtqUpgrade { msg, tx: tx }).await?; rx.await??; Ok(()) } @@ -224,9 +226,7 @@ impl NodeTaskHandle { &self, ) -> Result, NodeApiError> { let (tx, rx) = oneshot::channel(); - self.tx - .send(NodeApiRequest::CoordinatorStatus { responder: tx }) - .await?; + self.tx.send(NodeApiRequest::CoordinatorStatus { tx: tx }).await?; let res = rx.await?; Ok(res) } @@ -240,9 +240,7 @@ impl NodeTaskHandle { epoch: Epoch, ) -> Result, NodeApiError> { let (tx, rx) = oneshot::channel(); - self.tx - .send(NodeApiRequest::LoadRackSecret { epoch, responder: tx }) - .await?; + self.tx.send(NodeApiRequest::LoadRackSecret { epoch, tx: tx }).await?; let rs = rx.await??; Ok(rs) } @@ -256,10 +254,10 @@ impl NodeTaskHandle { pub async fn prepare_and_commit( &self, config: Configuration, - ) -> Result { + ) -> Result { let (tx, rx) = oneshot::channel(); self.tx - .send(NodeApiRequest::PrepareAndCommit { config, responder: tx }) + .send(NodeApiRequest::PrepareAndCommit { config, tx: tx }) .await?; let res = 
rx.await??; Ok(res) @@ -275,11 +273,9 @@ impl NodeTaskHandle { &self, rack_id: RackUuid, epoch: Epoch, - ) -> Result { + ) -> Result { let (tx, rx) = oneshot::channel(); - self.tx - .send(NodeApiRequest::Commit { rack_id, epoch, responder: tx }) - .await?; + self.tx.send(NodeApiRequest::Commit { rack_id, epoch, tx: tx }).await?; let res = rx.await??; Ok(res) } @@ -303,20 +299,23 @@ impl NodeTaskHandle { Ok(()) } + /// Return information about connectivity to other peers pub async fn conn_mgr_status(&self) -> Result { let (tx, rx) = oneshot::channel(); - self.tx.send(NodeApiRequest::ConnMgrStatus { responder: tx }).await?; + self.tx.send(NodeApiRequest::ConnMgrStatus { tx: tx }).await?; let res = rx.await?; Ok(res) } + /// Return internal information for the [`Node`] pub async fn status(&self) -> Result { let (tx, rx) = oneshot::channel(); - self.tx.send(NodeApiRequest::NodeStatus { responder: tx }).await?; + self.tx.send(NodeApiRequest::NodeStatus { tx: tx }).await?; let res = rx.await?; Ok(res) } + /// Shutdown this [`NodeTask`] and all its child tasks pub async fn shutdown(&self) -> Result<(), NodeApiError> { self.tx.send(NodeApiRequest::Shutdown).await?; Ok(()) @@ -458,58 +457,68 @@ impl NodeTask { NodeApiRequest::ClearSecrets => { self.node.clear_secrets(); } - NodeApiRequest::Commit { rack_id, epoch, responder } => { + NodeApiRequest::Commit { rack_id, epoch, tx } => { let res = self .node .commit_configuration(&mut self.ctx, rack_id, epoch) .map(|_| { - self.ctx.persistent_state().commits.contains(&epoch) + if self.ctx.persistent_state().commits.contains(&epoch) + { + CommitStatus::Committed + } else { + CommitStatus::Pending + } }); - let _ = responder.send(res); + let _ = tx.send(res); } - NodeApiRequest::ConnMgrStatus { responder } => { + NodeApiRequest::ConnMgrStatus { tx } => { debug!(self.log, "Received Request for ConnMgrStatus"); - let _ = responder.send(self.conn_mgr.status()); + let _ = tx.send(self.conn_mgr.status()); } - NodeApiRequest::CoordinatorStatus { responder } => { + NodeApiRequest::CoordinatorStatus { tx } => { let status = self.node.get_coordinator_state().map(|cs| { CoordinatorStatus { config: cs.config().clone(), acked_prepares: cs.op().acked_prepares(), } }); - let _ = responder.send(status); + let _ = tx.send(status); } - NodeApiRequest::LoadRackSecret { epoch, responder } => { + NodeApiRequest::LoadRackSecret { epoch, tx } => { let res = self.node.load_rack_secret(&mut self.ctx, epoch); - let _ = responder.send(res); + let _ = tx.send(res); } - NodeApiRequest::LrtqUpgrade { msg, responder } => { + NodeApiRequest::LrtqUpgrade { msg, tx } => { let res = self.node.coordinate_upgrade_from_lrtq(&mut self.ctx, msg); - let _ = responder.send(res); + let _ = tx.send(res); } - NodeApiRequest::NodeStatus { responder } => { - let _ = responder.send(NodeStatus { + NodeApiRequest::NodeStatus { tx } => { + let _ = tx.send(NodeStatus { connected_peers: self.ctx.connected().clone(), alarms: self.ctx.alarms().clone(), persistent_state: self.ctx.persistent_state().into(), }); } - NodeApiRequest::PrepareAndCommit { config, responder } => { + NodeApiRequest::PrepareAndCommit { config, tx } => { let epoch = config.epoch; let res = self .node .prepare_and_commit(&mut self.ctx, config) .map(|_| { - self.ctx.persistent_state().commits.contains(&epoch) + if self.ctx.persistent_state().commits.contains(&epoch) + { + CommitStatus::Committed + } else { + CommitStatus::Pending + } }); - let _ = responder.send(res); + let _ = tx.send(res); } - NodeApiRequest::Reconfigure { msg, 
responder } => { + NodeApiRequest::Reconfigure { msg, tx } => { let res = self.node.coordinate_reconfiguration(&mut self.ctx, msg); - let _ = responder.send(res); + let _ = tx.send(res); } NodeApiRequest::Shutdown => { info!(self.log, "Shutting down Node tokio tasks"); @@ -963,7 +972,10 @@ mod tests { async || { let mut acked = 0; for h in &setup.node_handles { - if h.commit(rack_id, Epoch(1)).await.unwrap() { + if matches!( + h.commit(rack_id, Epoch(1)).await.unwrap(), + CommitStatus::Committed + ) { acked += 1; } } @@ -1068,7 +1080,10 @@ mod tests { async || { let mut acked = 0; for h in &setup.node_handles[0..num_nodes - 1] { - if h.commit(rack_id, Epoch(1)).await.unwrap() { + if matches!( + h.commit(rack_id, Epoch(1)).await.unwrap(), + CommitStatus::Committed, + ) { acked += 1; } } @@ -1104,7 +1119,10 @@ mod tests { wait_for_condition( async || { let h = &setup.node_handles.last().unwrap(); - if h.prepare_and_commit(config.clone()).await.unwrap() { + if matches!( + h.prepare_and_commit(config.clone()).await.unwrap(), + CommitStatus::Committed + ) { Ok(()) } else { Err(CondCheckError::<()>::NotYet) @@ -1199,7 +1217,10 @@ mod tests { async || { let mut acked = 0; for h in &setup.node_handles { - if h.commit(rack_id, Epoch(1)).await.unwrap() { + if matches!( + h.commit(rack_id, Epoch(1)).await.unwrap(), + CommitStatus::Committed + ) { acked += 1; } } @@ -1305,7 +1326,10 @@ mod tests { async || { let mut acked = 0; for h in &setup.node_handles[0..num_nodes - 1] { - if h.commit(rack_id, Epoch(2)).await.unwrap() { + if matches!( + h.commit(rack_id, Epoch(2)).await.unwrap(), + CommitStatus::Committed + ) { acked += 1; } } @@ -1420,7 +1444,10 @@ mod tests { async || { let mut acked = 0; for h in &setup.node_handles { - if h.commit(rack_id, Epoch(2)).await.unwrap() { + if matches!( + h.commit(rack_id, Epoch(2)).await.unwrap(), + CommitStatus::Committed + ) { acked += 1; } } From d2f7d245c8fe7d3ed0267efafdddf68e5961bb44 Mon Sep 17 00:00:00 2001 From: "Andrew J. Stone" Date: Fri, 31 Oct 2025 20:20:56 +0000 Subject: [PATCH 4/5] fix typo --- trust-quorum/src/connection_manager.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/trust-quorum/src/connection_manager.rs b/trust-quorum/src/connection_manager.rs index bcc6bbe4914..5adcc44d747 100644 --- a/trust-quorum/src/connection_manager.rs +++ b/trust-quorum/src/connection_manager.rs @@ -712,7 +712,7 @@ impl ConnMgr { /// listen port, just the ephemeral port. /// /// Return the `BaseboardId` of the peer if an established connection is - // torn down. + /// torn down. async fn disconnect_client( &mut self, addr: SocketAddrV6, From 0ef9d1e8e9ec6f07d343718e7bdf07006bd4492c Mon Sep 17 00:00:00 2001 From: "Andrew J. 
Stone" Date: Fri, 31 Oct 2025 20:30:18 +0000 Subject: [PATCH 5/5] This is what I get for doing a quick search/replace --- trust-quorum/src/task.rs | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/trust-quorum/src/task.rs b/trust-quorum/src/task.rs index 24962eb6251..179caf26a22 100644 --- a/trust-quorum/src/task.rs +++ b/trust-quorum/src/task.rs @@ -203,7 +203,7 @@ impl NodeTaskHandle { msg: ReconfigureMsg, ) -> Result<(), NodeApiError> { let (tx, rx) = oneshot::channel(); - self.tx.send(NodeApiRequest::Reconfigure { msg, tx: tx }).await?; + self.tx.send(NodeApiRequest::Reconfigure { msg, tx }).await?; rx.await??; Ok(()) } @@ -214,7 +214,7 @@ impl NodeTaskHandle { msg: LrtqUpgradeMsg, ) -> Result<(), NodeApiError> { let (tx, rx) = oneshot::channel(); - self.tx.send(NodeApiRequest::LrtqUpgrade { msg, tx: tx }).await?; + self.tx.send(NodeApiRequest::LrtqUpgrade { msg, tx }).await?; rx.await??; Ok(()) } @@ -226,7 +226,7 @@ impl NodeTaskHandle { &self, ) -> Result, NodeApiError> { let (tx, rx) = oneshot::channel(); - self.tx.send(NodeApiRequest::CoordinatorStatus { tx: tx }).await?; + self.tx.send(NodeApiRequest::CoordinatorStatus { tx }).await?; let res = rx.await?; Ok(res) } @@ -240,7 +240,7 @@ impl NodeTaskHandle { epoch: Epoch, ) -> Result, NodeApiError> { let (tx, rx) = oneshot::channel(); - self.tx.send(NodeApiRequest::LoadRackSecret { epoch, tx: tx }).await?; + self.tx.send(NodeApiRequest::LoadRackSecret { epoch, tx }).await?; let rs = rx.await??; Ok(rs) } @@ -256,9 +256,7 @@ impl NodeTaskHandle { config: Configuration, ) -> Result { let (tx, rx) = oneshot::channel(); - self.tx - .send(NodeApiRequest::PrepareAndCommit { config, tx: tx }) - .await?; + self.tx.send(NodeApiRequest::PrepareAndCommit { config, tx }).await?; let res = rx.await??; Ok(res) } @@ -275,7 +273,7 @@ impl NodeTaskHandle { epoch: Epoch, ) -> Result { let (tx, rx) = oneshot::channel(); - self.tx.send(NodeApiRequest::Commit { rack_id, epoch, tx: tx }).await?; + self.tx.send(NodeApiRequest::Commit { rack_id, epoch, tx }).await?; let res = rx.await??; Ok(res) } @@ -302,7 +300,7 @@ impl NodeTaskHandle { /// Return information about connectivity to other peers pub async fn conn_mgr_status(&self) -> Result { let (tx, rx) = oneshot::channel(); - self.tx.send(NodeApiRequest::ConnMgrStatus { tx: tx }).await?; + self.tx.send(NodeApiRequest::ConnMgrStatus { tx }).await?; let res = rx.await?; Ok(res) } @@ -310,7 +308,7 @@ impl NodeTaskHandle { /// Return internal information for the [`Node`] pub async fn status(&self) -> Result { let (tx, rx) = oneshot::channel(); - self.tx.send(NodeApiRequest::NodeStatus { tx: tx }).await?; + self.tx.send(NodeApiRequest::NodeStatus { tx }).await?; let res = rx.await?; Ok(res) }