Skip to content

Commit

Permalink
tests mostly passing
Browse files Browse the repository at this point in the history
  • Loading branch information
alecmocatta committed Aug 15, 2020
1 parent 01f8bd6 commit 502df3e
Show file tree
Hide file tree
Showing 16 changed files with 65 additions and 82 deletions.
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ wasm-bindgen-test = "0.3"
[build-dependencies]
rustversion = "1.0"

[patch.crates-io]
tokio = {git = "https://github.com/tokio-rs/tokio", branch = "v0.2.x"}

[profile.bench]
codegen-units = 1
debug = 2
Expand Down
2 changes: 0 additions & 2 deletions amadeus-serde/src/csv.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
#![allow(clippy::unsafe_derive_deserialize)] // https://github.com/rust-lang/rust-clippy/issues/5789

use csv::Error as InternalCsvError;
use educe::Educe;
use futures::{pin_mut, stream, AsyncReadExt, FutureExt, Stream, StreamExt};
Expand Down
2 changes: 0 additions & 2 deletions amadeus-serde/src/json.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
#![allow(clippy::unsafe_derive_deserialize)] // https://github.com/rust-lang/rust-clippy/issues/5789

use educe::Educe;
use futures::{pin_mut, stream, AsyncReadExt, FutureExt, Stream, StreamExt};
use serde::{Deserialize, Serialize};
Expand Down
1 change: 1 addition & 0 deletions benches/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ fn run<F>(b: &mut Bencher, bytes: u64, mut task: impl FnMut() -> F)
where
F: Future<Output = ()>,
{
let _ = rayon::ThreadPoolBuilder::new().build_global();
RT.enter(|| {
let _ = Lazy::force(&POOL);
b.bytes = bytes;
Expand Down
116 changes: 50 additions & 66 deletions src/pool/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,12 @@ impl ThreadPool {
T: Send + 'a,
{
#[cfg(not(target_arch = "wasm32"))]
return Guard::new(
self.0
.pool
.spawn_pinned_unchecked(task)
.map_err(JoinError::into_panic)
.map_err(Panicked::from),
);
return self
.0
.pool
.spawn_pinned_unchecked(task)
.map_err(JoinError::into_panic)
.map_err(Panicked::from);
#[cfg(target_arch = "wasm32")]
{
let _self = self;
Expand All @@ -104,10 +103,10 @@ impl ThreadPool {
.map_err(Into::into)
.remote_handle();
wasm_bindgen_futures::spawn_local(remote);
Guard::new(remote_handle.map_ok(|t| {
remote_handle.map_ok(|t| {
let t: *mut dyn Send = Box::into_raw(t);
*Box::from_raw(t as *mut T)
}))
})
}
}
}
Expand All @@ -125,39 +124,6 @@ impl Clone for ThreadPool {
impl UnwindSafe for ThreadPool {}
impl RefUnwindSafe for ThreadPool {}

#[pin_project(PinnedDrop)]
struct Guard<F>(#[pin] Option<F>);
impl<F> Guard<F> {
fn new(f: F) -> Self {
Self(Some(f))
}
}
impl<F> Future for Guard<F>
where
F: Future,
{
type Output = F::Output;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
match self.as_mut().project().0.as_pin_mut() {
Some(fut) => {
let output = ready!(fut.poll(cx));
self.project().0.set(None);
Poll::Ready(output)
}
None => Poll::Pending,
}
}
}
#[pinned_drop]
impl<F> PinnedDrop for Guard<F> {
fn drop(self: Pin<&mut Self>) {
if self.project().0.is_some() {
panic!("dropped before finished polling!");
}
}
}

fn _assert() {
let _ = assert_sync_and_send::<ThreadPool>;
}
Expand All @@ -166,41 +132,47 @@ fn _assert() {
#[cfg(not(target_arch = "wasm32"))]
mod pool {
use async_channel::{bounded, Sender};
use futures::{future::RemoteHandle, FutureExt};
use futures::{
future::{join_all, RemoteHandle}, FutureExt
};
use std::{any::Any, future::Future, mem, panic::AssertUnwindSafe, pin::Pin};
use tokio::{
runtime::Handle, task::{JoinError, LocalSet}
runtime::Handle, task, task::{JoinError, JoinHandle, LocalSet}
};

type Request = Box<dyn FnOnce() -> Box<dyn Future<Output = Response>> + Send>;
type Response = Result<Box<dyn Any + Send>, Box<dyn Any + Send>>;

#[derive(Debug)]
pub(super) struct Pool {
sender: Sender<(Request, Sender<RemoteHandle<Response>>)>,
sender: Option<Sender<(Request, Sender<RemoteHandle<Response>>)>>,
threads: Vec<JoinHandle<()>>,
}
impl Pool {
pub(super) fn new(threads: usize) -> Self {
let handle = Handle::current();
let handle1 = handle.clone();
let (sender, receiver) = bounded::<(Request, Sender<RemoteHandle<Response>>)>(1);
for _ in 0..threads {
let receiver = receiver.clone();
let handle = handle.clone();
let _ = handle1.spawn_blocking(move || {
let local = LocalSet::new();
handle.block_on(local.run_until(async {
while let Ok((task, sender)) = receiver.recv().await {
let _ = local.spawn_local(async move {
let (remote, remote_handle) = Pin::from(task()).remote_handle();
let _ = sender.send(remote_handle).await;
remote.await;
});
}
}))
});
}
Self { sender }
let threads = (0..threads)
.map(|_| {
let receiver = receiver.clone();
let handle = handle.clone();
handle1.spawn_blocking(move || {
let local = LocalSet::new();
handle.block_on(local.run_until(async {
while let Ok((task, sender)) = receiver.recv().await {
let _ = local.spawn_local(async move {
let (remote, remote_handle) = Pin::from(task()).remote_handle();
let _ = sender.send(remote_handle).await;
remote.await;
});
}
}))
})
})
.collect();
let sender = Some(sender);
Self { sender, threads }
}
pub(super) fn spawn_pinned<F, Fut, T>(
&self, task: F,
Expand All @@ -210,7 +182,7 @@ mod pool {
Fut: Future<Output = T> + 'static,
T: Send + 'static,
{
let sender = self.sender.clone();
let sender = self.sender.as_ref().unwrap().clone();
async move {
let task: Request = Box::new(|| {
Box::new(
Expand All @@ -236,7 +208,7 @@ mod pool {
Fut: Future<Output = T> + 'a,
T: Send + 'a,
{
let sender = self.sender.clone();
let sender = self.sender.as_ref().unwrap().clone();
async move {
let task: Box<dyn FnOnce() -> Box<dyn Future<Output = Response>> + Send> =
Box::new(|| {
Expand Down Expand Up @@ -264,6 +236,18 @@ mod pool {
}
}
}
impl Drop for Pool {
fn drop(&mut self) {
let _ = self.sender.take().unwrap();
task::block_in_place(|| {
let handle = Handle::current();
handle.block_on(join_all(mem::take(&mut self.threads)))
})
.into_iter()
.collect::<Result<(), _>>()
.unwrap();
}
}

#[cfg(test)]
mod tests {
Expand All @@ -274,8 +258,8 @@ mod pool {
atomic::{AtomicUsize, Ordering}, Arc
};

#[tokio::test]
async fn spawn_pinned_() {
#[tokio::test(threaded_scheduler)]
async fn spawn_pinned() {
const TASKS: usize = 1000;
const ITERS: usize = 1000;
const THREADS: usize = 4;
Expand Down
3 changes: 1 addition & 2 deletions src/source.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
#![allow(clippy::unsafe_derive_deserialize)]

use ::serde::{Deserialize, Serialize};
use derive_new::new;
use futures::Stream;
Expand Down Expand Up @@ -206,6 +204,7 @@ where
}
}

#[allow(clippy::unsafe_derive_deserialize)]
#[pin_project]
#[derive(Serialize, Deserialize)]
#[serde(transparent)]
Expand Down
2 changes: 1 addition & 1 deletion tests/cloudfront.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use amadeus::prelude::*;
use std::time::SystemTime;

#[tokio::test]
#[tokio::test(threaded_scheduler)]
async fn cloudfront() {
let pool = &ThreadPool::new(None).unwrap();

Expand Down
2 changes: 1 addition & 1 deletion tests/commoncrawl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use std::time::{Duration, SystemTime};

use amadeus::{data::Webpage, prelude::*};

#[tokio::test]
#[tokio::test(threaded_scheduler)]
async fn commoncrawl() {
let start = SystemTime::now();

Expand Down
2 changes: 1 addition & 1 deletion tests/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{path::PathBuf, time::SystemTime};

use amadeus::prelude::*;

#[tokio::test]
#[tokio::test(threaded_scheduler)]
async fn csv() {
let start = SystemTime::now();

Expand Down
2 changes: 1 addition & 1 deletion tests/into_par_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use either::Either;

use amadeus::prelude::*;

#[tokio::test]
#[tokio::test(threaded_scheduler)]
async fn into_par_stream() {
let pool = &ThreadPool::new(None).unwrap();

Expand Down
2 changes: 1 addition & 1 deletion tests/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{path::PathBuf, time::SystemTime};

use amadeus::prelude::*;

#[tokio::test]
#[tokio::test(threaded_scheduler)]
async fn json() {
let start = SystemTime::now();

Expand Down
2 changes: 1 addition & 1 deletion tests/panic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{panic, panic::AssertUnwindSafe, time::SystemTime};

use amadeus::prelude::*;

#[tokio::test]
#[tokio::test(threaded_scheduler)]
async fn panic() {
let start = SystemTime::now();

Expand Down
2 changes: 1 addition & 1 deletion tests/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{collections::HashMap, path::PathBuf, time::SystemTime};

use amadeus::prelude::*;

#[tokio::test]
#[tokio::test(threaded_scheduler)]
async fn parquet() {
let start = SystemTime::now();

Expand Down
2 changes: 1 addition & 1 deletion tests/parquet_dist.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#![type_length_limit = "1572864"]
#![type_length_limit = "2073124"]
#![allow(clippy::cognitive_complexity, clippy::type_complexity)]

#[cfg(feature = "constellation")]
Expand Down
2 changes: 1 addition & 1 deletion tests/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::time::SystemTime;

use amadeus::prelude::*;

#[tokio::test]
#[tokio::test(threaded_scheduler)]
async fn postgres() {
let start = SystemTime::now();

Expand Down
2 changes: 1 addition & 1 deletion tests/threads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use tokio::time::delay_for as sleep;

use amadeus::dist::prelude::*;

#[tokio::test]
#[tokio::test(threaded_scheduler)]
async fn threads() {
let start = SystemTime::now();

Expand Down

0 comments on commit 502df3e

Please sign in to comment.