Skip to content

Commit

Permalink
Merge pull request #129 from robinbernon/threadpool_to_take_num_cores…
Browse files Browse the repository at this point in the history
…_input

Adding input param of threads to ThreadPool::new()
  • Loading branch information
alecmocatta committed Mar 13, 2021
2 parents fb9ee3b + cbdaec0 commit 7f071aa
Show file tree
Hide file tree
Showing 34 changed files with 96 additions and 63 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ struct LogLine {

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let pool = ThreadPool::new(None)?;
let pool = ThreadPool::new(None, None)?;

let rows = Parquet::new(ParquetDirectory::new(S3Directory::new_with(
AwsRegion::UsEast1,
Expand Down Expand Up @@ -133,7 +133,7 @@ use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let pool = ThreadPool::new(None)?;
let pool = ThreadPool::new(None, None)?;

let rows = Parquet::new(ParquetDirectory::new(S3Directory::new_with(
AwsRegion::UsEast1,
Expand Down Expand Up @@ -169,7 +169,7 @@ use std::error::Error;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let pool = ThreadPool::new(None)?;
let pool = ThreadPool::new(None, None)?;
let rows = Parquet::new(ParquetDirectory::new(S3Directory::new_with(
AwsRegion::UsEast1,
Expand Down Expand Up @@ -215,7 +215,7 @@ fn main() -> Result<(), Box<dyn Error>> {
.build()
.unwrap()
.block_on(async {
let pool = ProcessPool::new(None, None, Resources::default())?;
let pool = ProcessPool::new(None, None, None, Resources::default())?;

let rows = Parquet::new(ParquetDirectory::new(S3Directory::new_with(
AwsRegion::UsEast1,
Expand Down
4 changes: 2 additions & 2 deletions amadeus-postgres/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,9 +213,9 @@ where
}

#[cfg(not(nightly))]
type Output<Row> = Pin<Box<dyn Stream<Item = Result<Row, PostgresError>> + Send>>;
type Output<Row: PostgresData> = Pin<Box<dyn Stream<Item = Result<Row, PostgresError>> + Send>>;
#[cfg(nightly)]
type Output<Row> = impl Stream<Item = Result<Row, PostgresError>> + Send;
type Output<Row: PostgresData> = impl Stream<Item = Result<Row, PostgresError>> + Send;

FnMutNamed! {
pub type Closure<Row> = |self|(config, tables)=> (ConnectParams, Vec<PostgresSelect>)| -> Output<Row>
Expand Down
4 changes: 2 additions & 2 deletions amadeus-serde/src/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,9 @@ where

type Error<P, E> = CsvError<E, <P as Partition>::Error, <<P as Partition>::Page as Page>::Error>;
#[cfg(not(nightly))]
type Output<P, Row, E> = std::pin::Pin<Box<dyn Stream<Item = Result<Row, Error<P, E>>>>>;
type Output<P, Row: SerdeData, E> = std::pin::Pin<Box<dyn Stream<Item = Result<Row, Error<P, E>>>>>;
#[cfg(nightly)]
type Output<P: Partition, Row, E> = impl Stream<Item = Result<Row, Error<P, E>>>;
type Output<P: Partition, Row: SerdeData, E> = impl Stream<Item = Result<Row, Error<P, E>>>;

FnMutNamed! {
pub type Closure<P, Row, E> = |self|partition=> P| -> Output<P, Row, E>
Expand Down
4 changes: 2 additions & 2 deletions amadeus-serde/src/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ where

type Error<P, E> = JsonError<E, <P as Partition>::Error, <<P as Partition>::Page as Page>::Error>;
#[cfg(not(nightly))]
type Output<P, Row, E> = std::pin::Pin<Box<dyn Stream<Item = Result<Row, Error<P, E>>>>>;
type Output<P, Row: SerdeData, E> = std::pin::Pin<Box<dyn Stream<Item = Result<Row, Error<P, E>>>>>;
#[cfg(nightly)]
type Output<P: Partition, Row, E> = impl Stream<Item = Result<Row, Error<P, E>>>;
type Output<P: Partition, Row: SerdeData, E> = impl Stream<Item = Result<Row, Error<P, E>>>;

FnMutNamed! {
pub type Closure<P, Row, E> = |self|partition=> P| -> Output<P, Row, E>
Expand Down
2 changes: 1 addition & 1 deletion amadeus-types/src/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ impl Timezone {
_ => None,
},
}
.map(|tz| tz.name())
.map(Tz::name)
}
/// Makes a new Timezone for the Eastern Hemisphere with given timezone difference. The negative seconds means the Western Hemisphere.
pub fn from_offset(seconds: i32) -> Option<Self> {
Expand Down
2 changes: 1 addition & 1 deletion benches/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ static RT: Lazy<Runtime> = Lazy::new(|| {
.build()
.unwrap()
});
static POOL: Lazy<ThreadPool> = Lazy::new(|| ThreadPool::new(None).unwrap());
static POOL: Lazy<ThreadPool> = Lazy::new(|| ThreadPool::new(None, None).unwrap());

#[derive(Data, Clone, Deserialize, PartialEq, PartialOrd, Debug)]
struct GameDerived {
Expand Down
2 changes: 1 addition & 1 deletion benches/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ static RT: Lazy<Runtime> = Lazy::new(|| {
.build()
.unwrap()
});
static POOL: Lazy<ThreadPool> = Lazy::new(|| ThreadPool::new(None).unwrap());
static POOL: Lazy<ThreadPool> = Lazy::new(|| ThreadPool::new(None, None).unwrap());

#[bench]
fn vec(b: &mut Bencher) {
Expand Down
2 changes: 1 addition & 1 deletion benches/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ static RT: Lazy<Runtime> = Lazy::new(|| {
.build()
.unwrap()
});
static POOL: Lazy<ThreadPool> = Lazy::new(|| ThreadPool::new(None).unwrap());
static POOL: Lazy<ThreadPool> = Lazy::new(|| ThreadPool::new(None, None).unwrap());

#[derive(Data, Clone, PartialEq, Debug)]
struct TenKayVeeTwo {
Expand Down
2 changes: 1 addition & 1 deletion examples/cloudfront_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use amadeus::prelude::*;
#[allow(unreachable_code)]
#[tokio::main]
async fn main() {
let pool = &ThreadPool::new(None).unwrap();
let pool = &ThreadPool::new(None, None).unwrap();

let rows = Cloudfront::new_with(
AwsRegion::UsEast1,
Expand Down
2 changes: 1 addition & 1 deletion examples/columnar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use amadeus::prelude::*;

#[tokio::main]
async fn main() {
let pool = &ThreadPool::new(None).unwrap();
let pool = &ThreadPool::new(None, None).unwrap();

let rows = Cloudfront::new_with(
AwsRegion::UsEast1,
Expand Down
2 changes: 1 addition & 1 deletion examples/commoncrawl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use amadeus::{data::Webpage, dist::prelude::*};
async fn main() {
return; // TODO: runs for a long time

let pool = ThreadPool::new(None).unwrap();
let pool = ThreadPool::new(None, None).unwrap();

let webpages = CommonCrawl::new("CC-MAIN-2020-24").await.unwrap();

Expand Down
2 changes: 1 addition & 1 deletion examples/commoncrawl_dist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ fn main() {
// Accept the number of processes at the command line, defaulting to the maximum available
let processes = env::args().nth(1).and_then(|arg| arg.parse::<usize>().ok());

let pool = ProcessPool::new(processes, None, Resources::default()).unwrap();
let pool = ProcessPool::new(processes, None, None, Resources::default()).unwrap();

let webpages = CommonCrawl::new("CC-MAIN-2020-24").await.unwrap();

Expand Down
12 changes: 6 additions & 6 deletions src/pool/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ struct ProcessPoolInner {
impl ProcessPoolInner {
#[allow(clippy::double_parens)] // TODO: work out what's triggering this
fn new(
processes: Option<usize>, tasks_per_core: Option<usize>, resources: Resources,
processes: Option<usize>, threads: Option<usize>, tasks: Option<usize>,
resources: Resources,
) -> Result<Self, SpawnError> {
let processes = processes.unwrap_or(3); // TODO!
let mut processes_vec = Vec::with_capacity(processes);
Expand All @@ -130,7 +131,7 @@ impl ProcessPoolInner {
let receiver = Receiver::<Option<Request>>::new(parent);
let sender = Sender::<Result<Response, Panicked>>::new(parent);

let thread_pool = ThreadPool::new(tasks_per_core).unwrap();
let thread_pool = ThreadPool::new(threads, tasks).unwrap();

while let Some(work) = receiver.recv().await.unwrap() {
let ret = panic::catch_unwind(panic::AssertUnwindSafe(|| {
Expand Down Expand Up @@ -334,12 +335,11 @@ pub struct ProcessPool(Arc<ProcessPoolInner>);
#[cfg_attr(not(nightly), serde_closure::desugar)]
impl ProcessPool {
pub fn new(
processes: Option<usize>, tasks_per_core: Option<usize>, resources: Resources,
processes: Option<usize>, threads: Option<usize>, tasks: Option<usize>,
resources: Resources,
) -> Result<Self, SpawnError> {
Ok(Self(Arc::new(ProcessPoolInner::new(
processes,
tasks_per_core,
resources,
processes, threads, tasks, resources,
)?)))
}
pub fn processes(&self) -> usize {
Expand Down
21 changes: 12 additions & 9 deletions src/pool/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,33 +17,36 @@ const DEFAULT_TASKS_PER_CORE: usize = 100;

#[derive(Debug)]
struct ThreadPoolInner {
logical_cores: usize,
tasks_per_core: usize,
threads: usize,
tasks: usize,
#[cfg(not(target_arch = "wasm32"))]
pool: Pool,
}

#[derive(Debug)]
pub struct ThreadPool(Arc<ThreadPoolInner>);
impl ThreadPool {
pub fn new(tasks_per_core: Option<usize>) -> io::Result<Self> {
let logical_cores = if !cfg!(target_arch = "wasm32") {
pub fn new(threads: Option<usize>, tasks: Option<usize>) -> io::Result<Self> {
let threads = if let Some(threads) = threads {
threads
} else if !cfg!(target_arch = "wasm32") {
num_cpus::get()
} else {
1
};
let tasks_per_core = tasks_per_core.unwrap_or(DEFAULT_TASKS_PER_CORE);

let tasks = tasks.unwrap_or(DEFAULT_TASKS_PER_CORE);
#[cfg(not(target_arch = "wasm32"))]
let pool = Pool::new(logical_cores);
let pool = Pool::new(threads);
Ok(ThreadPool(Arc::new(ThreadPoolInner {
logical_cores,
tasks_per_core,
threads,
tasks,
#[cfg(not(target_arch = "wasm32"))]
pool,
})))
}
pub fn threads(&self) -> usize {
self.0.logical_cores * self.0.tasks_per_core
self.0.threads * self.0.tasks
}
pub fn spawn<F, Fut, T>(&self, task: F) -> impl Future<Output = Result<T, Panicked>> + Send
where
Expand Down
2 changes: 1 addition & 1 deletion tests/cloudfront.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::time::SystemTime;
#[tokio::test(threaded_scheduler)]
#[cfg_attr(miri, ignore)]
async fn cloudfront() {
let pool = &ThreadPool::new(None).unwrap();
let pool = &ThreadPool::new(None, None).unwrap();

let start = SystemTime::now();

Expand Down
5 changes: 3 additions & 2 deletions tests/cloudfront_dist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ fn main() {
.unwrap()
.block_on(async {
let thread_pool_time = {
let thread_pool = ThreadPool::new(None).unwrap();
let thread_pool = ThreadPool::new(None, None).unwrap();
run(&thread_pool).await
};
#[cfg(feature = "constellation")]
let process_pool_time = {
let process_pool = ProcessPool::new(None, None, Resources::default()).unwrap();
let process_pool =
ProcessPool::new(None, None, None, Resources::default()).unwrap();
run(&process_pool).await
};
#[cfg(not(feature = "constellation"))]
Expand Down
2 changes: 1 addition & 1 deletion tests/commoncrawl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use amadeus::{data::Webpage, prelude::*};
async fn commoncrawl() {
let start = SystemTime::now();

let pool = &ThreadPool::new(None).unwrap();
let pool = &ThreadPool::new(None, None).unwrap();

let webpages = CommonCrawl::new("CC-MAIN-2020-24").await.unwrap();
let _ = webpages
Expand Down
5 changes: 3 additions & 2 deletions tests/commoncrawl_dist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,13 @@ fn main() {
.unwrap()
.block_on(async {
let thread_pool_time = {
let thread_pool = ThreadPool::new(None).unwrap();
let thread_pool = ThreadPool::new(None, None).unwrap();
run(&thread_pool).await
};
#[cfg(feature = "constellation")]
let process_pool_time = {
let process_pool = ProcessPool::new(None, None, Resources::default()).unwrap();
let process_pool =
ProcessPool::new(None, None, None, Resources::default()).unwrap();
run(&process_pool).await
};
#[cfg(not(feature = "constellation"))]
Expand Down
2 changes: 1 addition & 1 deletion tests/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use amadeus::prelude::*;
async fn csv() {
let start = SystemTime::now();

let pool = &ThreadPool::new(None).unwrap();
let pool = &ThreadPool::new(None, None).unwrap();

#[derive(Data, Clone, PartialEq, PartialOrd, Debug)]
struct GameDerived {
Expand Down
5 changes: 3 additions & 2 deletions tests/csv_dist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ fn main() {
.unwrap()
.block_on(async {
let thread_pool_time = {
let thread_pool = ThreadPool::new(None).unwrap();
let thread_pool = ThreadPool::new(None, None).unwrap();
run(&thread_pool).await
};
#[cfg(feature = "constellation")]
let process_pool_time = {
let process_pool = ProcessPool::new(None, None, Resources::default()).unwrap();
let process_pool =
ProcessPool::new(None, None, None, Resources::default()).unwrap();
run(&process_pool).await
};
#[cfg(not(feature = "constellation"))]
Expand Down
2 changes: 1 addition & 1 deletion tests/csv_wasm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ async fn csv() {
let timer = web_sys::window().unwrap().performance().unwrap();
let start = timer.now();

let pool = &ThreadPool::new(None).unwrap();
let pool = &ThreadPool::new(None, None).unwrap();

#[derive(Data, Clone, PartialEq, PartialOrd, Debug)]
struct GameDerived {
Expand Down
2 changes: 1 addition & 1 deletion tests/into_par_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use amadeus::prelude::*;
#[tokio::test(threaded_scheduler)]
#[cfg_attr(miri, ignore)]
async fn into_par_stream() {
let pool = &ThreadPool::new(None).unwrap();
let pool = &ThreadPool::new(None, None).unwrap();

<&[usize] as IntoParallelStream>::into_par_stream(&[1, 2, 3])
.map(|a: usize| a)
Expand Down
5 changes: 3 additions & 2 deletions tests/into_par_stream_dist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ fn main() {
.unwrap()
.block_on(async {
let thread_pool_time = {
let thread_pool = ThreadPool::new(None).unwrap();
let thread_pool = ThreadPool::new(None, None).unwrap();
run(&thread_pool).await
};
#[cfg(feature = "constellation")]
let process_pool_time = {
let process_pool = ProcessPool::new(None, None, Resources::default()).unwrap();
let process_pool =
ProcessPool::new(None, None, None, Resources::default()).unwrap();
run(&process_pool).await
};
#[cfg(not(feature = "constellation"))]
Expand Down
2 changes: 1 addition & 1 deletion tests/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use amadeus::prelude::*;
async fn json() {
let start = SystemTime::now();

let pool = &ThreadPool::new(None).unwrap();
let pool = &ThreadPool::new(None, None).unwrap();
let tasks = 100;

#[derive(Data, Clone, PartialEq, PartialOrd, Debug)]
Expand Down
5 changes: 3 additions & 2 deletions tests/json_dist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ fn main() {
.unwrap()
.block_on(async {
let thread_pool_time = {
let thread_pool = ThreadPool::new(None).unwrap();
let thread_pool = ThreadPool::new(None, None).unwrap();
run(&thread_pool, 100).await
};
#[cfg(feature = "constellation")]
let process_pool_time = {
let process_pool = ProcessPool::new(None, None, Resources::default()).unwrap();
let process_pool =
ProcessPool::new(None, None, None, Resources::default()).unwrap();
run(&process_pool, 100).await
};
#[cfg(not(feature = "constellation"))]
Expand Down
2 changes: 1 addition & 1 deletion tests/panic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use amadeus::prelude::*;
async fn panic() {
let start = SystemTime::now();

let pool = &ThreadPool::new(None).unwrap();
let pool = &ThreadPool::new(None, None).unwrap();

let res = AssertUnwindSafe((0i32..1_000).into_par_stream().for_each(pool, |i| {
if i == 500 {
Expand Down
5 changes: 3 additions & 2 deletions tests/panic_dist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ fn main() {
.unwrap()
.block_on(async {
let thread_pool_time = {
let thread_pool = ThreadPool::new(None).unwrap();
let thread_pool = ThreadPool::new(None, None).unwrap();
run(&thread_pool).await
};
#[cfg(feature = "constellation")]
let process_pool_time = {
let process_pool = ProcessPool::new(None, None, Resources::default()).unwrap();
let process_pool =
ProcessPool::new(None, None, None, Resources::default()).unwrap();
run(&process_pool).await
};
#[cfg(not(feature = "constellation"))]
Expand Down
2 changes: 1 addition & 1 deletion tests/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use amadeus::prelude::*;
async fn parquet() {
let start = SystemTime::now();

let pool = &ThreadPool::new(None).unwrap();
let pool = &ThreadPool::new(None, None).unwrap();

let rows = Parquet::<_, Value>::new(vec![
PathBuf::from("amadeus-testing/parquet/cf-accesslogs/year=2018/month=11/day=02/part-00176-17868f39-cd99-4b60-bb48-8daf9072122e.c000.snappy.parquet"),
Expand Down
Loading

0 comments on commit 7f071aa

Please sign in to comment.