Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable QUIC client by default. Add arg to disable QUIC client. Take 2 #26927

Merged
merged 9 commits into from
Aug 17, 2022
6 changes: 3 additions & 3 deletions banking-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,10 +214,10 @@ fn main() {
.help("Number of threads to use in the banking stage"),
)
.arg(
Arg::new("tpu_use_quic")
.long("tpu-use-quic")
Arg::new("tpu_disable_quic")
.long("tpu-disable-quic")
.takes_value(false)
.help("Forward messages to TPU using QUIC"),
.help("Disable forwarding messages to TPU using QUIC"),
)
.get_matches();

Expand Down
10 changes: 5 additions & 5 deletions bench-tps/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,10 +290,10 @@ pub fn build_args<'a, 'b>(version: &'b str) -> App<'a, 'b> {
.help("Submit transactions with a TpuClient")
)
.arg(
Arg::with_name("tpu_use_quic")
.long("tpu-use-quic")
Arg::with_name("tpu_disable_quic")
.long("tpu-disable-quic")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering if we needed to not break the bench-tps api by removing --tpu-use-quic, but I guess v1.11.5 already did this breaking...

.takes_value(false)
.help("Submit transactions via QUIC; only affects ThinClient (default) \
.help("Do not submit transactions via QUIC; only affects ThinClient (default) \
or TpuClient sends"),
)
.arg(
Expand Down Expand Up @@ -348,8 +348,8 @@ pub fn extract_args(matches: &ArgMatches) -> Config {
args.external_client_type = ExternalClientType::RpcClient;
}

if matches.is_present("tpu_use_quic") {
args.use_quic = true;
if matches.is_present("tpu_disable_quic") {
args.use_quic = false;
}

if let Some(v) = matches.value_of("tpu_connection_pool_size") {
Expand Down
1 change: 1 addition & 0 deletions bench-tps/tests/bench_tps.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ fn test_bench_tps_test_validator(config: Config) {

#[test]
#[serial]
#[ignore]
fn test_bench_tps_local_cluster_solana() {
test_bench_tps_local_cluster(Config {
tx_count: 100,
Expand Down
30 changes: 23 additions & 7 deletions client/src/connection_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ static MAX_CONNECTIONS: usize = 1024;

/// Used to decide whether the TPU and underlying connection cache should use
/// QUIC connections.
pub const DEFAULT_TPU_USE_QUIC: bool = false;
pub const DEFAULT_TPU_USE_QUIC: bool = true;
CriesofCarrots marked this conversation as resolved.
Show resolved Hide resolved

/// Default TPU connection pool size per remote address
pub const DEFAULT_TPU_CONNECTION_POOL_SIZE: usize = 4;
Expand Down Expand Up @@ -683,6 +683,11 @@ mod tests {
// be lazy and not connect until first use or handle connection errors somehow
// (without crashing, as would be required in a real practical validator)
let connection_cache = ConnectionCache::default();
let port_offset = if connection_cache.use_quic() {
QUIC_PORT_OFFSET
} else {
0
};
let addrs = (0..MAX_CONNECTIONS)
.into_iter()
.map(|_| {
Expand All @@ -695,18 +700,29 @@ mod tests {
let map = connection_cache.map.read().unwrap();
assert!(map.len() == MAX_CONNECTIONS);
addrs.iter().for_each(|a| {
let conn = &map.get(a).expect("Address not found").connections[0];
let conn = conn.new_blocking_connection(*a, connection_cache.stats.clone());
assert!(a.ip() == conn.tpu_addr().ip());
let port = a
.port()
.checked_add(port_offset)
.unwrap_or_else(|| a.port());
let addr = &SocketAddr::new(a.ip(), port);

let conn = &map.get(addr).expect("Address not found").connections[0];
let conn = conn.new_blocking_connection(*addr, connection_cache.stats.clone());
assert!(addr.ip() == conn.tpu_addr().ip());
});
}

let addr = get_addr(&mut rng);
connection_cache.get_connection(&addr);
let addr = &get_addr(&mut rng);
connection_cache.get_connection(addr);

let port = addr
.port()
.checked_add(port_offset)
.unwrap_or_else(|| addr.port());
let addr_with_quic_port = SocketAddr::new(addr.ip(), port);
let map = connection_cache.map.read().unwrap();
assert!(map.len() == MAX_CONNECTIONS);
let _conn = map.get(&addr).expect("Address not found");
let _conn = map.get(&addr_with_quic_port).expect("Address not found");
}

#[test]
Expand Down
2 changes: 2 additions & 0 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4120,6 +4120,7 @@ mod tests {
}

#[test]
#[ignore]
fn test_forwarder_budget() {
solana_logger::setup();
// Create `PacketBatch` with 1 unprocessed packet
Expand Down Expand Up @@ -4207,6 +4208,7 @@ mod tests {
}

#[test]
#[ignore]
fn test_handle_forwarding() {
solana_logger::setup();
// packets are deserialized upon receiving, failed packets will not be
Expand Down
69 changes: 30 additions & 39 deletions core/src/tpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ pub struct Tpu {
banking_stage: BankingStage,
cluster_info_vote_listener: ClusterInfoVoteListener,
broadcast_stage: BroadcastStage,
tpu_quic_t: Option<thread::JoinHandle<()>>,
tpu_forwards_quic_t: Option<thread::JoinHandle<()>>,
tpu_quic_t: thread::JoinHandle<()>,
tpu_forwards_quic_t: thread::JoinHandle<()>,
find_packet_sender_stake_stage: FindPacketSenderStakeStage,
vote_find_packet_sender_stake_stage: FindPacketSenderStakeStage,
staked_nodes_updater_service: StakedNodesUpdaterService,
Expand Down Expand Up @@ -96,7 +96,6 @@ impl Tpu {
connection_cache: &Arc<ConnectionCache>,
keypair: &Keypair,
log_messages_bytes_limit: Option<usize>,
enable_quic_servers: bool,
staked_nodes: &Arc<RwLock<StakedNodes>>,
) -> Self {
let TpuSockets {
Expand Down Expand Up @@ -154,37 +153,33 @@ impl Tpu {
let (verified_sender, verified_receiver) = unbounded();

let stats = Arc::new(StreamStats::default());
let tpu_quic_t = enable_quic_servers.then(|| {
spawn_server(
transactions_quic_sockets,
keypair,
cluster_info.my_contact_info().tpu.ip(),
packet_sender,
exit.clone(),
MAX_QUIC_CONNECTIONS_PER_PEER,
staked_nodes.clone(),
MAX_STAKED_CONNECTIONS,
MAX_UNSTAKED_CONNECTIONS,
stats.clone(),
)
.unwrap()
});
let tpu_quic_t = spawn_server(
transactions_quic_sockets,
keypair,
cluster_info.my_contact_info().tpu.ip(),
packet_sender,
exit.clone(),
MAX_QUIC_CONNECTIONS_PER_PEER,
staked_nodes.clone(),
MAX_STAKED_CONNECTIONS,
MAX_UNSTAKED_CONNECTIONS,
stats.clone(),
)
.unwrap();

let tpu_forwards_quic_t = enable_quic_servers.then(|| {
spawn_server(
transactions_forwards_quic_sockets,
keypair,
cluster_info.my_contact_info().tpu_forwards.ip(),
forwarded_packet_sender,
exit.clone(),
MAX_QUIC_CONNECTIONS_PER_PEER,
staked_nodes.clone(),
MAX_STAKED_CONNECTIONS.saturating_add(MAX_UNSTAKED_CONNECTIONS),
0, // Prevent unstaked nodes from forwarding transactions
stats,
)
.unwrap()
});
let tpu_forwards_quic_t = spawn_server(
transactions_forwards_quic_sockets,
keypair,
cluster_info.my_contact_info().tpu_forwards.ip(),
forwarded_packet_sender,
exit.clone(),
MAX_QUIC_CONNECTIONS_PER_PEER,
staked_nodes.clone(),
MAX_STAKED_CONNECTIONS.saturating_add(MAX_UNSTAKED_CONNECTIONS),
0, // Prevent unstaked nodes from forwarding transactions
stats,
)
.unwrap();

let sigverify_stage = {
let verifier = TransactionSigVerifier::new(verified_sender);
Expand Down Expand Up @@ -271,13 +266,9 @@ impl Tpu {
self.find_packet_sender_stake_stage.join(),
self.vote_find_packet_sender_stake_stage.join(),
self.staked_nodes_updater_service.join(),
self.tpu_quic_t.join(),
self.tpu_forwards_quic_t.join(),
];
if let Some(tpu_quic_t) = self.tpu_quic_t {
tpu_quic_t.join()?;
}
if let Some(tpu_forwards_quic_t) = self.tpu_forwards_quic_t {
tpu_forwards_quic_t.join()?;
}
let broadcast_result = self.broadcast_stage.join();
for result in results {
result?;
Expand Down
3 changes: 0 additions & 3 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,6 @@ pub struct ValidatorConfig {
pub wait_to_vote_slot: Option<Slot>,
pub ledger_column_options: LedgerColumnOptions,
pub runtime_config: RuntimeConfig,
pub enable_quic_servers: bool,
}

impl Default for ValidatorConfig {
Expand Down Expand Up @@ -237,7 +236,6 @@ impl Default for ValidatorConfig {
wait_to_vote_slot: None,
ledger_column_options: LedgerColumnOptions::default(),
runtime_config: RuntimeConfig::default(),
enable_quic_servers: true,
}
}
}
Expand Down Expand Up @@ -1036,7 +1034,6 @@ impl Validator {
&connection_cache,
&identity_keypair,
config.runtime_config.log_messages_bytes_limit,
config.enable_quic_servers,
&staked_nodes,
);

Expand Down
2 changes: 2 additions & 0 deletions dos/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1185,11 +1185,13 @@ pub mod test {
}

#[test]
#[ignore]
fn test_dos_with_blockhash_and_payer() {
run_dos_with_blockhash_and_payer(/*tpu_use_quic*/ false)
}

#[test]
#[ignore]
fn test_dos_with_blockhash_and_payer_and_quic() {
run_dos_with_blockhash_and_payer(/*tpu_use_quic*/ true)
}
Expand Down
1 change: 0 additions & 1 deletion local-cluster/src/validator_configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig {
wait_to_vote_slot: config.wait_to_vote_slot,
ledger_column_options: config.ledger_column_options.clone(),
runtime_config: config.runtime_config.clone(),
enable_quic_servers: config.enable_quic_servers,
}
}

Expand Down
4 changes: 4 additions & 0 deletions local-cluster/tests/local_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ fn test_spend_and_verify_all_nodes_3() {

#[test]
#[serial]
#[ignore]
fn test_local_cluster_signature_subscribe() {
solana_logger::setup_with_default(RUST_LOG_FILTER);
let num_nodes = 2;
Expand Down Expand Up @@ -311,6 +312,7 @@ fn test_two_unbalanced_stakes() {

#[test]
#[serial]
#[ignore]
fn test_forwarding() {
solana_logger::setup_with_default(RUST_LOG_FILTER);
// Set up a cluster where one node is never the leader, so all txs sent to this node
Expand Down Expand Up @@ -1228,6 +1230,7 @@ fn test_incremental_snapshot_download_with_crossing_full_snapshot_interval_at_st
#[allow(unused_attributes)]
#[test]
#[serial]
#[ignore]
fn test_snapshot_restart_tower() {
solana_logger::setup_with_default(RUST_LOG_FILTER);
// First set up the cluster with 2 nodes
Expand Down Expand Up @@ -2520,6 +2523,7 @@ fn run_test_load_program_accounts_partition(scan_commitment: CommitmentConfig) {

#[test]
#[serial]
#[ignore]
fn test_votes_land_in_fork_during_long_partition() {
let total_stake = 3 * DEFAULT_NODE_STAKE;
// Make `lighter_stake` insufficient for switching threshold
Expand Down
2 changes: 2 additions & 0 deletions local-cluster/tests/local_cluster_slow_1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ mod common;

#[test]
#[serial]
#[ignore]
// Steps in this test:
// We want to create a situation like:
/*
Expand Down Expand Up @@ -587,6 +588,7 @@ fn test_duplicate_shreds_broadcast_leader() {

#[test]
#[serial]
#[ignore]
fn test_switch_threshold_uses_gossip_votes() {
solana_logger::setup_with_default(RUST_LOG_FILTER);
let total_stake = 100 * DEFAULT_NODE_STAKE;
Expand Down
1 change: 1 addition & 0 deletions local-cluster/tests/local_cluster_slow_2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ fn test_leader_failure_4() {

#[test]
#[serial]
#[ignore]
fn test_ledger_cleanup_service() {
solana_logger::setup_with_default(RUST_LOG_FILTER);
error!("test_ledger_cleanup_service");
Expand Down
2 changes: 1 addition & 1 deletion multinode-demo/bootstrap-validator.sh
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ while [[ -n $1 ]]; do
elif [[ $1 = --enable-rpc-bigtable-ledger-storage ]]; then
args+=("$1")
shift
elif [[ $1 = --tpu-use-quic ]]; then
elif [[ $1 = --tpu-disable-quic ]]; then
args+=("$1")
shift
elif [[ $1 = --rpc-send-batch-ms ]]; then
Expand Down
17 changes: 13 additions & 4 deletions validator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1213,13 +1213,21 @@ pub fn main() {
Arg::with_name("tpu_use_quic")
.long("tpu-use-quic")
.takes_value(false)
.hidden(true)
.conflicts_with("tpu_disable_quic")
.help("Use QUIC to send transactions."),
)
.arg(
Arg::with_name("tpu_disable_quic")
.long("tpu-disable-quic")
.takes_value(false)
.help("Do not use QUIC to send transactions."),
)
.arg(
Arg::with_name("disable_quic_servers")
.long("disable-quic-servers")
.takes_value(false)
.help("Disable QUIC TPU servers"),
.hidden(true)
)
.arg(
Arg::with_name("enable_quic_servers")
Expand Down Expand Up @@ -2314,8 +2322,7 @@ pub fn main() {
let restricted_repair_only_mode = matches.is_present("restricted_repair_only_mode");
let accounts_shrink_optimize_total_space =
value_t_or_exit!(matches, "accounts_shrink_optimize_total_space", bool);
let tpu_use_quic = matches.is_present("tpu_use_quic");
let enable_quic_servers = !matches.is_present("disable_quic_servers");
let tpu_use_quic = !matches.is_present("tpu_disable_quic");
let tpu_connection_pool_size = value_t_or_exit!(matches, "tpu_connection_pool_size", usize);

let shrink_ratio = value_t_or_exit!(matches, "accounts_shrink_ratio", f64);
Expand Down Expand Up @@ -2485,6 +2492,9 @@ pub fn main() {
if matches.is_present("enable_quic_servers") {
warn!("--enable-quic-servers is now the default behavior. This flag is deprecated and can be removed from the launch args");
}
if matches.is_present("disable_quic_servers") {
warn!("--disable-quic-servers is deprecated. The quic server cannot be disabled.");
}

let rpc_bigtable_config = if matches.is_present("enable_rpc_bigtable_ledger_storage")
|| matches.is_present("enable_bigtable_ledger_upload")
Expand Down Expand Up @@ -2664,7 +2674,6 @@ pub fn main() {
log_messages_bytes_limit: value_of(&matches, "log_messages_bytes_limit"),
..RuntimeConfig::default()
},
enable_quic_servers,
..ValidatorConfig::default()
};

Expand Down