Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Quic update identity #33865

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
0471a4e
Initial work
ryleung-solana Jul 25, 2023
f583dc1
Fix problems from bad merge
ryleung-solana Oct 12, 2023
573b53d
Wire some stuff up
ryleung-solana Oct 13, 2023
b2e6b70
Wire more stuff up
ryleung-solana Oct 16, 2023
2e179ab
Wire more stuff up
ryleung-solana Oct 18, 2023
4a9d1f8
Clear connection cache when updating the self-signed key
ryleung-solana Oct 18, 2023
8588396
Testing changes to confirm the key is updated
ryleung-solana Oct 23, 2023
6bcf8a3
Using info for the debug logs
ryleung-solana Oct 24, 2023
5eb1855
Fix clippy and formatting issues
ryleung-solana Oct 25, 2023
a5fe8b9
Fix test compilation
ryleung-solana Oct 26, 2023
ec3dd44
Fix more tests
ryleung-solana Oct 26, 2023
d49902d
Attempt to fix sbf compilation error
ryleung-solana Oct 26, 2023
9da6c41
Try to fix CI
ryleung-solana Nov 6, 2023
fa08f91
Revert "Using info for the debug logs"
ryleung-solana Nov 6, 2023
dc8edb6
Revert "Testing changes to confirm the key is updated"
ryleung-solana Nov 6, 2023
e5e60c7
Re-delete ConnectionMap
ryleung-solana Nov 6, 2023
12ba595
Revert "Fix more tests"
ryleung-solana Nov 7, 2023
e6f2991
Revert "Fix test compilation"
ryleung-solana Nov 7, 2023
0f69b45
Re-fix tests
ryleung-solana Nov 7, 2023
3d716b0
Fix more tests
ryleung-solana Nov 7, 2023
5b56526
Log errors updating the Quic client key
ryleung-solana Nov 8, 2023
ca0d922
Bubble up errors from updating keypair
ryleung-solana Nov 10, 2023
48c9cf3
Fix dumb merge error
ryleung-solana Nov 10, 2023
95104f3
Use struct rather than tuple
ryleung-solana Nov 10, 2023
d9fe30e
Add comment
ryleung-solana Nov 13, 2023
2afb6de
Remove misc unnecessary changes
ryleung-solana Nov 13, 2023
c7b7197
Re-fix formatting
ryleung-solana Nov 14, 2023
d253d3a
Misc cleanup
ryleung-solana Nov 15, 2023
32312bc
Add missing implementation
ryleung-solana Nov 15, 2023
0093a69
Use separate method to update just the keypair
ryleung-solana Nov 20, 2023
bf8a2ca
Remove a lock from ConnectionCache
ryleung-solana Nov 21, 2023
f2a0d74
Move notify pointers for NotifyKeyUpdate to simplify Validator::new i…
ryleung-solana Dec 6, 2023
ddd1efb
Remove unnecessary clones
ryleung-solana Dec 7, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 20 additions & 3 deletions client/src/connection_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ use {
},
},
solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool},
solana_sdk::{pubkey::Pubkey, signature::Keypair, transport::Result as TransportResult},
solana_sdk::{
pubkey::Pubkey, quic::NotifyKeyUpdate, signature::Keypair,
transport::Result as TransportResult,
},
solana_streamer::streamer::StakedNodes,
solana_udp_client::{UdpConfig, UdpConnectionManager, UdpPool},
std::{
Expand Down Expand Up @@ -43,6 +46,15 @@ pub enum NonblockingClientConnection {
Udp(Arc<<UdpBaseClientConnection as BaseClientConnection>::NonblockingClientConnection>),
}

impl NotifyKeyUpdate for ConnectionCache {
fn update_key(&self, key: &Keypair) -> Result<(), Box<dyn std::error::Error>> {
match self {
Self::Udp(_) => Ok(()),
Self::Quic(backend) => backend.update_key(key),
}
}
}

impl ConnectionCache {
pub fn new(name: &'static str) -> Self {
if DEFAULT_CONNECTION_CACHE_USE_QUIC {
Expand Down Expand Up @@ -217,7 +229,8 @@ mod tests {
crossbeam_channel::unbounded,
solana_sdk::{net::DEFAULT_TPU_COALESCE, signature::Keypair},
solana_streamer::{
nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, streamer::StakedNodes,
nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, quic::SpawnServerResult,
streamer::StakedNodes,
},
std::{
net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
Expand Down Expand Up @@ -245,7 +258,11 @@ mod tests {

let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));

let (response_recv_endpoint, response_recv_thread) = solana_streamer::quic::spawn_server(
let SpawnServerResult {
endpoint: response_recv_endpoint,
thread: response_recv_thread,
key_updater: _,
} = solana_streamer::quic::spawn_server(
"quic_streamer_test",
response_recv_socket,
&keypair2,
Expand Down
12 changes: 11 additions & 1 deletion connection-cache/src/connection_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use {
log::*,
rand::{thread_rng, Rng},
solana_measure::measure::Measure,
solana_sdk::timing::AtomicInterval,
solana_sdk::{signature::Keypair, timing::AtomicInterval},
std::{
net::SocketAddr,
sync::{atomic::Ordering, Arc, RwLock},
Expand Down Expand Up @@ -38,6 +38,7 @@ pub trait ConnectionManager: Send + Sync + 'static {

fn new_connection_pool(&self) -> Self::ConnectionPool;
fn new_connection_config(&self) -> Self::NewConnectionConfig;
fn update_key(&self, _key: &Keypair) -> Result<(), Box<dyn std::error::Error>>;
}

pub struct ConnectionCache<
Expand Down Expand Up @@ -137,6 +138,11 @@ where
.unwrap()
}

pub fn update_key(&self, key: &Keypair) -> Result<(), Box<dyn std::error::Error>> {
let mut map = self.map.write().unwrap();
map.clear();
self.connection_manager.update_key(key)
}
/// Create a lazy connection object under the exclusive lock of the cache map if there is not
/// enough used connections in the connection pool for the specified address.
/// Returns CreateConnectionResult.
Expand Down Expand Up @@ -636,6 +642,10 @@ mod tests {
fn new_connection_config(&self) -> Self::NewConnectionConfig {
MockUdpConfig::new().unwrap()
}

fn update_key(&self, _key: &Keypair) -> Result<(), Box<dyn std::error::Error>> {
Ok(())
}
}

impl BlockingClientConnection for MockUdpConnection {
Expand Down
3 changes: 2 additions & 1 deletion core/src/admin_rpc_post_init.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use {
solana_gossip::cluster_info::ClusterInfo,
solana_runtime::bank_forks::BankForks,
solana_sdk::pubkey::Pubkey,
solana_sdk::{pubkey::Pubkey, quic::NotifyKeyUpdate},
std::{
collections::HashSet,
sync::{Arc, RwLock},
Expand All @@ -14,4 +14,5 @@ pub struct AdminRpcRequestMetadataPostInit {
pub bank_forks: Arc<RwLock<BankForks>>,
pub vote_account: Pubkey,
pub repair_whitelist: Arc<RwLock<HashSet<Pubkey>>>,
pub notifies: Vec<Arc<dyn NotifyKeyUpdate + Sync + Send>>,
}
47 changes: 29 additions & 18 deletions core/src/tpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ use {
rpc_subscriptions::RpcSubscriptions,
},
solana_runtime::{bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache},
solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Keypair},
solana_sdk::{clock::Slot, pubkey::Pubkey, quic::NotifyKeyUpdate, signature::Keypair},
solana_streamer::{
nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
quic::{spawn_server, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS},
quic::{spawn_server, SpawnServerResult, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS},
streamer::StakedNodes,
},
solana_turbine::broadcast_stage::{BroadcastStage, BroadcastStageType},
Expand Down Expand Up @@ -111,7 +111,7 @@ impl Tpu {
prioritization_fee_cache: &Arc<PrioritizationFeeCache>,
block_production_method: BlockProductionMethod,
_generator_config: Option<GeneratorConfig>, /* vestigial code for replay invalidator */
) -> Self {
) -> (Self, Vec<Arc<dyn NotifyKeyUpdate + Sync + Send>>) {
let TpuSockets {
transactions: transactions_sockets,
transaction_forwards: tpu_forwards_sockets,
Expand Down Expand Up @@ -148,7 +148,11 @@ impl Tpu {

let (non_vote_sender, non_vote_receiver) = banking_tracer.create_channel_non_vote();

let (_, tpu_quic_t) = spawn_server(
let SpawnServerResult {
endpoint: _,
thread: tpu_quic_t,
key_updater,
} = spawn_server(
"quic_streamer_tpu",
transactions_quic_sockets,
keypair,
Expand All @@ -168,7 +172,11 @@ impl Tpu {
)
.unwrap();

let (_, tpu_forwards_quic_t) = spawn_server(
let SpawnServerResult {
endpoint: _,
thread: tpu_forwards_quic_t,
key_updater: forwards_key_updater,
} = spawn_server(
"quic_streamer_tpu_forwards",
transactions_forwards_quic_sockets,
keypair,
Expand Down Expand Up @@ -259,19 +267,22 @@ impl Tpu {
turbine_quic_endpoint_sender,
);

Self {
fetch_stage,
sigverify_stage,
vote_sigverify_stage,
banking_stage,
cluster_info_vote_listener,
broadcast_stage,
tpu_quic_t,
tpu_forwards_quic_t,
tpu_entry_notifier,
staked_nodes_updater_service,
tracer_thread_hdl,
}
(
Self {
fetch_stage,
sigverify_stage,
vote_sigverify_stage,
banking_stage,
cluster_info_vote_listener,
broadcast_stage,
tpu_quic_t,
tpu_forwards_quic_t,
tpu_entry_notifier,
staked_nodes_updater_service,
tracer_thread_hdl,
},
vec![key_updater, forwards_key_updater],
)
}

pub fn join(self) -> thread::Result<()> {
Expand Down
19 changes: 11 additions & 8 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1080,13 +1080,6 @@ impl Validator {
exit.clone(),
);

*admin_rpc_service_post_init.write().unwrap() = Some(AdminRpcRequestMetadataPostInit {
bank_forks: bank_forks.clone(),
cluster_info: cluster_info.clone(),
vote_account: *vote_account,
repair_whitelist: config.repair_whitelist.clone(),
});

let waited_for_supermajority = wait_for_supermajority(
config,
Some(&mut process_blockstore),
Expand Down Expand Up @@ -1295,7 +1288,7 @@ impl Validator {
};
}

let tpu = Tpu::new(
let (tpu, mut key_notifies) = Tpu::new(
&cluster_info,
&poh_recorder,
entry_receiver,
Expand Down Expand Up @@ -1346,6 +1339,16 @@ impl Validator {
);

*start_progress.write().unwrap() = ValidatorStartProgress::Running;
key_notifies.push(connection_cache);

*admin_rpc_service_post_init.write().unwrap() = Some(AdminRpcRequestMetadataPostInit {
bank_forks: bank_forks.clone(),
cluster_info: cluster_info.clone(),
vote_account: *vote_account,
repair_whitelist: config.repair_whitelist.clone(),
notifies: key_notifies,
});

Ok(Self {
stats_reporter_service,
gossip_service,
Expand Down
56 changes: 45 additions & 11 deletions quic-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,39 +84,52 @@ impl ConnectionPool for QuicPool {
}
}

#[derive(Clone)]
pub struct QuicConfig {
client_certificate: Arc<QuicClientCertificate>,
// Arc to prevent having to copy the struct
client_certificate: RwLock<Arc<QuicClientCertificate>>,
maybe_staked_nodes: Option<Arc<RwLock<StakedNodes>>>,
maybe_client_pubkey: Option<Pubkey>,

// The optional specified endpoint for the quic based client connections
// If not specified, the connection cache will create as needed.
client_endpoint: Option<Endpoint>,
addr: IpAddr,
}

impl Clone for QuicConfig {
fn clone(&self) -> Self {
let cert_guard = self.client_certificate.read().unwrap();
QuicConfig {
client_certificate: RwLock::new(cert_guard.clone()),
maybe_staked_nodes: self.maybe_staked_nodes.clone(),
maybe_client_pubkey: self.maybe_client_pubkey,
client_endpoint: self.client_endpoint.clone(),
addr: self.addr,
}
}
}

impl NewConnectionConfig for QuicConfig {
fn new() -> Result<Self, ClientError> {
let (cert, priv_key) =
new_self_signed_tls_certificate(&Keypair::new(), IpAddr::V4(Ipv4Addr::UNSPECIFIED))?;
let addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
let (cert, priv_key) = new_self_signed_tls_certificate(&Keypair::new(), addr)?;
Ok(Self {
client_certificate: Arc::new(QuicClientCertificate {
client_certificate: RwLock::new(Arc::new(QuicClientCertificate {
certificate: cert,
key: priv_key,
}),
})),
maybe_staked_nodes: None,
maybe_client_pubkey: None,
client_endpoint: None,
addr,
})
}
}

impl QuicConfig {
fn create_endpoint(&self) -> QuicLazyInitializedEndpoint {
QuicLazyInitializedEndpoint::new(
self.client_certificate.clone(),
self.client_endpoint.as_ref().cloned(),
)
let cert_guard = self.client_certificate.read().unwrap();
QuicLazyInitializedEndpoint::new(cert_guard.clone(), self.client_endpoint.as_ref().cloned())
}

fn compute_max_parallel_streams(&self) -> usize {
Expand All @@ -143,7 +156,23 @@ impl QuicConfig {
ipaddr: IpAddr,
) -> Result<(), RcgenError> {
let (cert, priv_key) = new_self_signed_tls_certificate(keypair, ipaddr)?;
self.client_certificate = Arc::new(QuicClientCertificate {
self.addr = ipaddr;

let mut cert_guard = self.client_certificate.write().unwrap();

*cert_guard = Arc::new(QuicClientCertificate {
certificate: cert,
key: priv_key,
});
Ok(())
}

pub fn update_keypair(&self, keypair: &Keypair) -> Result<(), RcgenError> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This update works only when a new connection pool is created. How do we have handle existing connection cached?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See connection-cache/src/connection_cache.rs line 141

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand that, but I think it won't fix the existing connection's endpoints. We would similar things you did to the server side: like

    let (config, _) = configure_server(key, self.gossip_host)?;
    self.endpoint.set_server_config(Some(config));
    Ok(())

let (cert, priv_key) = new_self_signed_tls_certificate(keypair, self.addr)?;

let mut cert_guard = self.client_certificate.write().unwrap();

*cert_guard = Arc::new(QuicClientCertificate {
certificate: cert,
key: priv_key,
});
Expand Down Expand Up @@ -212,6 +241,11 @@ impl ConnectionManager for QuicConnectionManager {
fn new_connection_config(&self) -> QuicConfig {
self.connection_config.clone()
}

fn update_key(&self, key: &Keypair) -> Result<(), Box<dyn std::error::Error>> {
self.connection_config.update_keypair(key)?;
Ok(())
}
}

impl QuicConnectionManager {
Expand Down
22 changes: 17 additions & 5 deletions quic-client/tests/quic_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ mod tests {
},
solana_sdk::{net::DEFAULT_TPU_COALESCE, packet::PACKET_DATA_SIZE, signature::Keypair},
solana_streamer::{
nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, streamer::StakedNodes,
tls_certificates::new_self_signed_tls_certificate,
nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, quic::SpawnServerResult,
streamer::StakedNodes, tls_certificates::new_self_signed_tls_certificate,
},
std::{
net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
Expand Down Expand Up @@ -68,7 +68,11 @@ mod tests {
let (sender, receiver) = unbounded();
let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
let (s, exit, keypair, ip) = server_args();
let (_, t) = solana_streamer::quic::spawn_server(
let SpawnServerResult {
endpoint: _,
thread: t,
key_updater: _,
} = solana_streamer::quic::spawn_server(
"quic_streamer_test",
s.try_clone().unwrap(),
&keypair,
Expand Down Expand Up @@ -204,7 +208,11 @@ mod tests {
let (sender, receiver) = unbounded();
let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
let (request_recv_socket, request_recv_exit, keypair, request_recv_ip) = server_args();
let (request_recv_endpoint, request_recv_thread) = solana_streamer::quic::spawn_server(
let SpawnServerResult {
endpoint: request_recv_endpoint,
thread: request_recv_thread,
key_updater: _,
} = solana_streamer::quic::spawn_server(
"quic_streamer_test",
request_recv_socket.try_clone().unwrap(),
&keypair,
Expand All @@ -228,7 +236,11 @@ mod tests {
let addr = response_recv_socket.local_addr().unwrap().ip();
let port = response_recv_socket.local_addr().unwrap().port();
let server_addr = SocketAddr::new(addr, port);
let (response_recv_endpoint, response_recv_thread) = solana_streamer::quic::spawn_server(
let SpawnServerResult {
endpoint: response_recv_endpoint,
thread: response_recv_thread,
key_updater: _,
} = solana_streamer::quic::spawn_server(
"quic_streamer_test",
response_recv_socket,
&keypair2,
Expand Down
7 changes: 6 additions & 1 deletion sdk/src/quic.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#![cfg(feature = "full")]
//! Definitions related to Solana over QUIC.
use std::time::Duration;
use {crate::signer::keypair::Keypair, std::time::Duration};

pub const QUIC_PORT_OFFSET: u16 = 6;
// Empirically found max number of concurrent streams
Expand Down Expand Up @@ -35,3 +36,7 @@ pub const QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO: u64 = 128;
/// The receive window for QUIC connection from maximum staked nodes is
/// set to this ratio times [`solana_sdk::packet::PACKET_DATA_SIZE`]
pub const QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO: u64 = 512;

pub trait NotifyKeyUpdate {
fn update_key(&self, key: &Keypair) -> Result<(), Box<dyn std::error::Error>>;
}
Loading