Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit Rayon threadpool threads #5871

Merged
merged 23 commits into from
Sep 12, 2019
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ members = [
"fixed-buf",
"vote-signer",
"cli",
"rayon-threadlimit",
]

exclude = [
Expand Down
2 changes: 1 addition & 1 deletion ci/buildkite.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ steps:
- wait
- command: "ci/test-stable-perf.sh"
name: "stable-perf"
timeout_in_minutes: 30
timeout_in_minutes: 40
artifact_paths: "log-*.txt"
agents:
- "queue=cuda"
Expand Down
2 changes: 1 addition & 1 deletion ci/test-stable.sh
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ test-stable-perf)

# Run root package library tests
_ cargo +"$rust_stable" build --all --tests --bins ${V:+--verbose} --features="$ROOT_FEATURES"
_ cargo +"$rust_stable" test --manifest-path=core/Cargo.toml ${V:+--verbose} --features="$ROOT_FEATURES" -- --nocapture
_ cargo +"$rust_stable" test --manifest-path=core/Cargo.toml ${V:+--verbose} --features="$ROOT_FEATURES" -- --nocapture
sagar-solana marked this conversation as resolved.
Show resolved Hide resolved
;;
*)
echo "Error: Unknown test: $testName"
Expand Down
2 changes: 2 additions & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ tokio-codec = "0.1"
tokio-fs = "0.1"
tokio-io = "0.1"
untrusted = "0.7.0"
lazy_static = "1.4.0"
sagar-solana marked this conversation as resolved.
Show resolved Hide resolved
solana-rayon-threadlimit = { path = "../rayon-threadlimit", version = "0.19.0-pre0" }

# reed-solomon-erasure's simd_c feature fails to build for x86_64-pc-windows-msvc, use pure-rust
[target.'cfg(windows)'.dependencies]
Expand Down
11 changes: 6 additions & 5 deletions core/src/blocktree_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use crate::bank_forks::BankForks;
use crate::blocktree::{Blocktree, SlotMeta};
use crate::entry::{Entry, EntrySlice};
use crate::leader_schedule_cache::LeaderScheduleCache;
use rand::seq::SliceRandom;
use rand::thread_rng;
use rayon::prelude::*;
use rayon::ThreadPool;
use solana_metrics::{datapoint, datapoint_error, inc_new_counter_debug};
Expand All @@ -16,16 +18,15 @@ use std::result;
use std::sync::Arc;
use std::time::{Duration, Instant};

use rand::seq::SliceRandom;
use rand::thread_rng;

pub const NUM_THREADS: u32 = 10;
use solana_rayon_threadlimit::thread_count::get_thread_count;
sagar-solana marked this conversation as resolved.
Show resolved Hide resolved
use std::cell::RefCell;

thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new()
.num_threads(sys_info::cpu_num().unwrap_or(NUM_THREADS) as usize)
.num_threads(get_thread_count())
.build()
.unwrap()));
.unwrap())
);

fn first_err(results: &[Result<()>]) -> Result<()> {
for r in results {
Expand Down
5 changes: 3 additions & 2 deletions core/src/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use solana_merkle_tree::MerkleTree;
use solana_metrics::inc_new_counter_warn;
use solana_sdk::hash::Hash;
use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::timing;
use solana_sdk::transaction::Transaction;
use std::borrow::Borrow;
use std::cell::RefCell;
Expand All @@ -22,7 +23,7 @@ use std::sync::{Arc, RwLock};

#[cfg(feature = "cuda")]
use crate::sigverify::poh_verify_many;
use solana_sdk::timing;
use solana_rayon_threadlimit::thread_count::get_thread_count;
#[cfg(feature = "cuda")]
use std::sync::Mutex;
#[cfg(feature = "cuda")]
Expand All @@ -32,7 +33,7 @@ use std::time::Instant;
pub const NUM_THREADS: u32 = 10;

thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new()
.num_threads(sys_info::cpu_num().unwrap_or(NUM_THREADS) as usize)
.num_threads(get_thread_count())
.build()
.unwrap()));

Expand Down
9 changes: 4 additions & 5 deletions core/src/sigverify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,16 @@ use solana_sdk::signature::Signature;
use solana_sdk::transaction::Transaction;
use std::mem::size_of;

#[cfg(feature = "cuda")]
use std::os::raw::{c_int, c_uint};

#[cfg(feature = "cuda")]
use core::ffi::c_void;

use solana_rayon_threadlimit::thread_count::get_thread_count;
#[cfg(feature = "cuda")]
use std::os::raw::{c_int, c_uint};
pub const NUM_THREADS: u32 = 10;
use std::cell::RefCell;

thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new()
.num_threads(sys_info::cpu_num().unwrap_or(NUM_THREADS) as usize)
.num_threads(get_thread_count())
.build()
.unwrap()));

Expand Down
3 changes: 2 additions & 1 deletion core/src/window_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::streamer::{PacketReceiver, PacketSender};
use rayon::iter::{IndexedParallelIterator, IntoParallelRefMutIterator, ParallelIterator};
use rayon::ThreadPool;
use solana_metrics::{inc_new_counter_debug, inc_new_counter_error};
use solana_rayon_threadlimit::thread_count::get_thread_count;
use solana_runtime::bank::Bank;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::timing::duration_as_ms;
Expand Down Expand Up @@ -205,7 +206,7 @@ impl WindowService {
trace!("{}: RECV_WINDOW started", id);
let mut now = Instant::now();
let thread_pool = rayon::ThreadPoolBuilder::new()
.num_threads(sys_info::cpu_num().unwrap_or(NUM_THREADS) as usize)
.num_threads(get_thread_count())
.build()
.unwrap();
loop {
Expand Down
1 change: 1 addition & 0 deletions local_cluster/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ solana-storage-program = { path = "../programs/storage_program", version = "0.19
solana-vote-api = { path = "../programs/vote_api", version = "0.19.0-pre0" }
symlink = "0.1.0"
tempfile = "3.1.0"
solana-rayon-threadlimit = { path = "../rayon-threadlimit", version = "0.19.0-pre0" }

[dev-dependencies]
serial_test = "0.2.0"
Expand Down
3 changes: 3 additions & 0 deletions local_cluster/src/local_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use solana_core::{
service::Service,
validator::{Validator, ValidatorConfig},
};
use solana_rayon_threadlimit::thread_count::init_test_thread_count;
use solana_sdk::{
client::SyncClient,
clock::DEFAULT_TICKS_PER_SLOT,
Expand Down Expand Up @@ -116,6 +117,8 @@ impl LocalCluster {
}

pub fn new(config: &ClusterConfig) -> Self {
init_test_thread_count();

assert_eq!(config.validator_configs.len(), config.node_stakes.len());
let leader_keypair = Arc::new(Keypair::new());
let leader_pubkey = leader_keypair.pubkey();
Expand Down
18 changes: 4 additions & 14 deletions local_cluster/tests/local_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ use tempfile::TempDir;

#[test]
#[serial]
#[allow(unused_attributes)]
#[ignore]
fn test_ledger_cleanup_service() {
solana_logger::setup();
error!("test_ledger_cleanup_service");
Expand Down Expand Up @@ -111,9 +109,8 @@ fn test_spend_and_verify_all_nodes_3() {
);
}

#[allow(unused_attributes)]
#[test]
#[serial]
#[allow(unused_attributes)]
#[ignore]
fn test_spend_and_verify_all_nodes_env_num_nodes() {
solana_logger::setup();
Expand All @@ -132,7 +129,6 @@ fn test_spend_and_verify_all_nodes_env_num_nodes() {

#[allow(unused_attributes)]
#[test]
#[serial]
#[should_panic]
fn test_fullnode_exit_default_config_should_panic() {
solana_logger::setup();
Expand Down Expand Up @@ -161,10 +157,8 @@ fn test_fullnode_exit_2() {
}

// Cluster needs a supermajority to remain, so the minimum size for this test is 4
#[allow(unused_attributes)]
#[test]
#[serial]
#[ignore]
fn test_leader_failure_4() {
solana_logger::setup();
error!("test_leader_failure_4");
Expand Down Expand Up @@ -222,14 +216,14 @@ fn test_two_unbalanced_stakes() {
}

#[test]
#[ignore]
#[serial]
fn test_forwarding() {
// Set up a cluster where one node is never the leader, so all txs sent to this node
// will be have to be forwarded in order to be confirmed
let config = ClusterConfig {
node_stakes: vec![999_990, 3],
cluster_lamports: 2_000_000,
validator_configs: vec![ValidatorConfig::default(); 3],
validator_configs: vec![ValidatorConfig::default(); 2],
..ClusterConfig::default()
};
let cluster = LocalCluster::new(&config);
Expand Down Expand Up @@ -363,7 +357,6 @@ fn test_snapshot_restart_locktower() {
);
}

#[allow(unused_attributes)]
#[test]
#[serial]
fn test_snapshots_blocktree_floor() {
Expand Down Expand Up @@ -524,15 +517,14 @@ fn test_snapshots_restart_validity() {
}
}

#[allow(unused_attributes)]
#[test]
#[serial]
#[ignore]
fn test_fail_entry_verification_leader() {
test_faulty_node(BroadcastStageType::FailEntryVerification);
}

#[test]
#[allow(unused_attributes)]
#[ignore]
fn test_fake_blobs_broadcast_leader() {
test_faulty_node(BroadcastStageType::BroadcastFakeBlobs);
Expand Down Expand Up @@ -591,9 +583,7 @@ fn test_faulty_node(faulty_node_type: BroadcastStageType) {
);
}

#[allow(unused_attributes)]
#[test]
#[serial]
#[ignore]
fn test_repairman_catchup() {
solana_logger::setup();
Expand Down
2 changes: 2 additions & 0 deletions rayon-threadlimit/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
/target/
/farf/
13 changes: 13 additions & 0 deletions rayon-threadlimit/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[package]
name = "solana-rayon-threadlimit"
version = "0.19.0-pre0"
homepage = "https://solana.com/"
readme = "../README.md"
repository = "https://github.com/solana-labs/solana"
authors = ["Solana Maintainers <maintainers@solana.com>"]
license = "Apache-2.0"
edition = "2018"

[dependencies]
lazy_static = "1.4.0"
sys-info = "0.5.7"
4 changes: 4 additions & 0 deletions rayon-threadlimit/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
pub mod thread_count;
sagar-solana marked this conversation as resolved.
Show resolved Hide resolved

#[macro_use]
extern crate lazy_static;
15 changes: 15 additions & 0 deletions rayon-threadlimit/src/thread_count.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
use std::sync::RwLock;

//TODO remove this hack when rayon fixes itself
lazy_static! {
static ref MAX_RAYON_THREADS: RwLock<usize> =
RwLock::new(sys_info::cpu_num().unwrap() as usize);
}

pub fn get_thread_count() -> usize {
*MAX_RAYON_THREADS.read().unwrap()
}

pub fn init_test_thread_count() {
sagar-solana marked this conversation as resolved.
Show resolved Hide resolved
*MAX_RAYON_THREADS.write().unwrap() = 1;
}
1 change: 1 addition & 0 deletions runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ solana-vote-api = { path = "../programs/vote_api", version = "0.19.0-pre0" }
solana-vote-program = { path = "../programs/vote_program", version = "0.19.0-pre0" }
sys-info = "0.5.7"
tempfile = "3.1.0"
solana-rayon-threadlimit = { path = "../rayon-threadlimit", version = "0.19.0-pre0" }

[lib]
crate-type = ["lib"]
Expand Down
4 changes: 2 additions & 2 deletions runtime/src/accounts_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use serde::de::{MapAccess, Visitor};
use serde::ser::{SerializeMap, Serializer};
use serde::{Deserialize, Serialize};
use solana_measure::measure::Measure;
use solana_rayon_threadlimit::thread_count::get_thread_count;
use solana_sdk::account::Account;
use solana_sdk::pubkey::Pubkey;
use std::collections::{HashMap, HashSet};
Expand All @@ -39,7 +40,6 @@ use std::path::Path;
use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use sys_info;
use tempfile::TempDir;

pub const DEFAULT_FILE_SIZE: u64 = 4 * 1024 * 1024;
Expand Down Expand Up @@ -370,7 +370,7 @@ pub struct AccountsDB {

impl Default for AccountsDB {
fn default() -> Self {
let num_threads = sys_info::cpu_num().unwrap_or(DEFAULT_NUM_THREADS) as usize;
let num_threads = get_thread_count();

AccountsDB {
accounts_index: RwLock::new(AccountsIndex::default()),
Expand Down