Skip to content

Commit

Permalink
Move default value for --rpc-pubsub-notification-threads to CLI (#158)
Browse files Browse the repository at this point in the history
The default value was previously being determined down where the thread
pool is being created. Providing a default value at the CLI level is
consistent with other args, and gives an operator better visibility into
what the default will actually be
  • Loading branch information
steviez authored Mar 12, 2024
1 parent 8c446f2 commit f8bb98b
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 48 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions programs/sbf/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 5 additions & 3 deletions rpc/src/rpc_pubsub_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ use {
jsonrpc_core::IoHandler,
soketto::handshake::{server, Server},
solana_metrics::TokenCounter,
solana_rayon_threadlimit::get_thread_count,
solana_sdk::timing::AtomicInterval,
std::{
io,
net::SocketAddr,
num::NonZeroUsize,
str,
sync::{
atomic::{AtomicU64, AtomicUsize, Ordering},
Expand Down Expand Up @@ -43,7 +45,7 @@ pub struct PubSubConfig {
pub queue_capacity_items: usize,
pub queue_capacity_bytes: usize,
pub worker_threads: usize,
pub notification_threads: Option<usize>,
pub notification_threads: Option<NonZeroUsize>,
}

impl Default for PubSubConfig {
Expand All @@ -55,7 +57,7 @@ impl Default for PubSubConfig {
queue_capacity_items: DEFAULT_QUEUE_CAPACITY_ITEMS,
queue_capacity_bytes: DEFAULT_QUEUE_CAPACITY_BYTES,
worker_threads: DEFAULT_WORKER_THREADS,
notification_threads: None,
notification_threads: NonZeroUsize::new(get_thread_count()),
}
}
}
Expand All @@ -69,7 +71,7 @@ impl PubSubConfig {
queue_capacity_items: DEFAULT_TEST_QUEUE_CAPACITY_ITEMS,
queue_capacity_bytes: DEFAULT_QUEUE_CAPACITY_BYTES,
worker_threads: DEFAULT_WORKER_THREADS,
notification_threads: Some(2),
notification_threads: NonZeroUsize::new(2),
}
}
}
Expand Down
71 changes: 31 additions & 40 deletions rpc/src/rpc_subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use {
solana_account_decoder::{parse_token::is_known_spl_token_id, UiAccount, UiAccountEncoding},
solana_ledger::{blockstore::Blockstore, get_tmp_ledger_path},
solana_measure::measure::Measure,
solana_rayon_threadlimit::get_thread_count,
solana_rpc_client_api::response::{
ProcessedSignatureResult, ReceivedSignatureResult, Response as RpcResponse, RpcBlockUpdate,
RpcBlockUpdateError, RpcKeyedAccount, RpcLogsResponse, RpcResponseContext,
Expand Down Expand Up @@ -631,41 +630,37 @@ impl RpcSubscriptions {
config.queue_capacity_bytes,
)),
};
let notification_threads = config.notification_threads.unwrap_or_else(get_thread_count);
let t_cleanup = if notification_threads == 0 {
None
} else {

let t_cleanup = config.notification_threads.map(|notification_threads| {
let exit = exit.clone();
Some(
Builder::new()
.name("solRpcNotifier".to_string())
.spawn(move || {
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(notification_threads)
.thread_name(|i| format!("solRpcNotify{i:02}"))
.build()
.unwrap();
pool.install(|| {
if let Some(rpc_notifier_ready) = rpc_notifier_ready {
rpc_notifier_ready.fetch_or(true, Ordering::Relaxed);
}
Self::process_notifications(
exit,
max_complete_transaction_status_slot,
max_complete_rewards_slot,
blockstore,
notifier,
notification_receiver,
subscriptions,
bank_forks,
block_commitment_cache,
optimistically_confirmed_bank,
)
});
})
.unwrap(),
)
};
Builder::new()
.name("solRpcNotifier".to_string())
.spawn(move || {
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(notification_threads.get())
.thread_name(|i| format!("solRpcNotify{i:02}"))
.build()
.unwrap();
pool.install(|| {
if let Some(rpc_notifier_ready) = rpc_notifier_ready {
rpc_notifier_ready.fetch_or(true, Ordering::Relaxed);
}
Self::process_notifications(
exit,
max_complete_transaction_status_slot,
max_complete_rewards_slot,
blockstore,
notifier,
notification_receiver,
subscriptions,
bank_forks,
block_commitment_cache,
optimistically_confirmed_bank,
)
});
})
.unwrap()
});

let control = SubscriptionControl::new(
config.max_active_subscriptions,
Expand All @@ -674,11 +669,7 @@ impl RpcSubscriptions {
);

Self {
notification_sender: if notification_threads == 0 {
None
} else {
Some(notification_sender)
},
notification_sender: config.notification_threads.map(|_| notification_sender),
t_cleanup,
exit,
control,
Expand Down
1 change: 1 addition & 0 deletions validator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ solana-net-utils = { workspace = true }
solana-perf = { workspace = true }
solana-poh = { workspace = true }
solana-program-runtime = { workspace = true }
solana-rayon-threadlimit = { workspace = true }
solana-rpc = { workspace = true }
solana-rpc-client = { workspace = true }
solana-rpc-client-api = { workspace = true }
Expand Down
8 changes: 8 additions & 0 deletions validator/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use {
solana_faucet::faucet::{self, FAUCET_PORT},
solana_ledger::use_snapshot_archives_at_startup,
solana_net_utils::{MINIMUM_VALIDATOR_PORT_RANGE_WIDTH, VALIDATOR_PORT_RANGE},
solana_rayon_threadlimit::get_thread_count,
solana_rpc::{rpc::MAX_REQUEST_BODY_SIZE, rpc_pubsub_service::PubSubConfig},
solana_rpc_client_api::request::MAX_MULTIPLE_ACCOUNTS,
solana_runtime::{
Expand Down Expand Up @@ -1079,6 +1080,11 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> {
.takes_value(true)
.value_name("NUM_THREADS")
.validator(is_parsable::<usize>)
.default_value_if(
"full_rpc_api",
None,
&default_args.rpc_pubsub_notification_threads,
)
.help(
"The maximum number of threads that RPC PubSub will use for generating \
notifications. 0 will disable RPC PubSub notifications",
Expand Down Expand Up @@ -2138,6 +2144,7 @@ pub struct DefaultArgs {
pub rpc_bigtable_max_message_size: String,
pub rpc_max_request_body_size: String,
pub rpc_pubsub_worker_threads: String,
pub rpc_pubsub_notification_threads: String,

pub maximum_local_snapshot_age: String,
pub maximum_full_snapshot_archives_to_retain: String,
Expand Down Expand Up @@ -2225,6 +2232,7 @@ impl DefaultArgs {
rpc_bigtable_max_message_size: solana_storage_bigtable::DEFAULT_MAX_MESSAGE_SIZE
.to_string(),
rpc_pubsub_worker_threads: "4".to_string(),
rpc_pubsub_notification_threads: get_thread_count().to_string(),
maximum_full_snapshot_archives_to_retain: DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN
.to_string(),
maximum_incremental_snapshot_archives_to_retain:
Expand Down
8 changes: 3 additions & 5 deletions validator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1382,11 +1382,9 @@ pub fn main() {
usize
),
worker_threads: value_t_or_exit!(matches, "rpc_pubsub_worker_threads", usize),
notification_threads: if full_api {
value_of(&matches, "rpc_pubsub_notification_threads")
} else {
Some(0)
},
notification_threads: value_t!(matches, "rpc_pubsub_notification_threads", usize)
.ok()
.and_then(NonZeroUsize::new),
},
voting_disabled: matches.is_present("no_voting") || restricted_repair_only_mode,
wait_for_supermajority: value_t!(matches, "wait_for_supermajority", Slot).ok(),
Expand Down

0 comments on commit f8bb98b

Please sign in to comment.