Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Commit

Permalink
refactor(fetch) : light use only one DNS thread
Browse files Browse the repository at this point in the history
  • Loading branch information
niklasad1 committed Sep 25, 2018
1 parent 375ecd4 commit 2bb4b46
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 14 deletions.
4 changes: 2 additions & 2 deletions parity/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ fn execute_light_impl(cmd: RunCmd, logger: Arc<RotatingLogger>) -> Result<Runnin
let cpu_pool = CpuPool::new(4);

// fetch service
let fetch = fetch::Client::new().map_err(|e| format!("Error starting fetch client: {:?}", e))?;
let fetch = fetch::Client::new(cmd.light).map_err(|e| format!("Error starting fetch client: {:?}", e))?;
let passwords = passwords_from_files(&cmd.acc_conf.password_files)?;

// prepare account provider
Expand Down Expand Up @@ -477,7 +477,7 @@ fn execute_impl<Cr, Rr>(cmd: RunCmd, logger: Arc<RotatingLogger>, on_client_rq:
let event_loop = EventLoop::spawn();

// fetch service
let fetch = fetch::Client::new().map_err(|e| format!("Error starting fetch client: {:?}", e))?;
let fetch = fetch::Client::new(cmd.light).map_err(|e| format!("Error starting fetch client: {:?}", e))?;

let txpool_size = cmd.miner_options.pool_limits.max_count;
// create miner
Expand Down
36 changes: 24 additions & 12 deletions util/fetch/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,11 +170,11 @@ impl Drop for Client {

impl Client {
/// Create a new fetch client.
pub fn new() -> Result<Self, Error> {
pub fn new(is_light: bool) -> Result<Self, Error> {
let (tx_start, rx_start) = std::sync::mpsc::sync_channel(1);
let (tx_proto, rx_proto) = mpsc::channel(64);

Client::background_thread(tx_start, rx_proto)?;
Client::background_thread(tx_start, rx_proto, is_light)?;

match rx_start.recv_timeout(Duration::from_secs(10)) {
Err(RecvTimeoutError::Timeout) => {
Expand All @@ -199,16 +199,17 @@ impl Client {
})
}

fn background_thread(tx_start: TxStartup, rx_proto: mpsc::Receiver<ChanItem>) -> io::Result<thread::JoinHandle<()>> {
fn background_thread(tx_start: TxStartup, rx_proto: mpsc::Receiver<ChanItem>, is_light: bool) -> io::Result<thread::JoinHandle<()>> {
thread::Builder::new().name("fetch".into()).spawn(move || {
let mut core = match reactor::Core::new() {
Ok(c) => c,
Err(e) => return tx_start.send(Err(e)).unwrap_or(())
};

let handle = core.handle();
let number_dns_threads = if is_light { 1 } else { 4 };
let hyper = hyper::Client::configure()
.connector(hyper_rustls::HttpsConnector::new(4, &core.handle()))
.connector(hyper_rustls::HttpsConnector::new(number_dns_threads, &core.handle()))
.build(&core.handle());

let future = rx_proto.take_while(|item| Ok(item.is_some()))
Expand Down Expand Up @@ -640,7 +641,18 @@ mod test {
#[test]
fn it_should_fetch() {
let server = TestServer::run();
let client = Client::new().unwrap();
let client = Client::new(false).unwrap();
let future = client.get(&format!("http://{}?123", server.addr()), Default::default());
let resp = future.wait().unwrap();
assert!(resp.is_success());
let body = resp.concat2().wait().unwrap();
assert_eq!(&body[..], b"123")
}

#[test]
fn it_should_fetch_in_light_mode() {
let server = TestServer::run();
let client = Client::new(true).unwrap();
let future = client.get(&format!("http://{}?123", server.addr()), Default::default());
let resp = future.wait().unwrap();
assert!(resp.is_success());
Expand All @@ -651,7 +663,7 @@ mod test {
#[test]
fn it_should_timeout() {
let server = TestServer::run();
let client = Client::new().unwrap();
let client = Client::new(false).unwrap();
let abort = Abort::default().with_max_duration(Duration::from_secs(1));
match client.get(&format!("http://{}/delay?3", server.addr()), abort).wait() {
Err(Error::Timeout) => {}
Expand All @@ -662,7 +674,7 @@ mod test {
#[test]
fn it_should_follow_redirects() {
let server = TestServer::run();
let client = Client::new().unwrap();
let client = Client::new(false).unwrap();
let abort = Abort::default();
let future = client.get(&format!("http://{}/redirect?http://{}/", server.addr(), server.addr()), abort);
assert!(future.wait().unwrap().is_success())
Expand All @@ -671,7 +683,7 @@ mod test {
#[test]
fn it_should_follow_relative_redirects() {
let server = TestServer::run();
let client = Client::new().unwrap();
let client = Client::new(false).unwrap();
let abort = Abort::default().with_max_redirects(4);
let future = client.get(&format!("http://{}/redirect?/", server.addr()), abort);
assert!(future.wait().unwrap().is_success())
Expand All @@ -680,7 +692,7 @@ mod test {
#[test]
fn it_should_not_follow_too_many_redirects() {
let server = TestServer::run();
let client = Client::new().unwrap();
let client = Client::new(false).unwrap();
let abort = Abort::default().with_max_redirects(3);
match client.get(&format!("http://{}/loop", server.addr()), abort).wait() {
Err(Error::TooManyRedirects) => {}
Expand All @@ -691,7 +703,7 @@ mod test {
#[test]
fn it_should_read_data() {
let server = TestServer::run();
let client = Client::new().unwrap();
let client = Client::new(false).unwrap();
let abort = Abort::default();
let future = client.get(&format!("http://{}?abcdefghijklmnopqrstuvwxyz", server.addr()), abort);
let resp = future.wait().unwrap();
Expand All @@ -702,7 +714,7 @@ mod test {
#[test]
fn it_should_not_read_too_much_data() {
let server = TestServer::run();
let client = Client::new().unwrap();
let client = Client::new(false).unwrap();
let abort = Abort::default().with_max_size(3);
let resp = client.get(&format!("http://{}/?1234", server.addr()), abort).wait().unwrap();
assert!(resp.is_success());
Expand All @@ -715,7 +727,7 @@ mod test {
#[test]
fn it_should_not_read_too_much_data_sync() {
let server = TestServer::run();
let client = Client::new().unwrap();
let client = Client::new(false).unwrap();
let abort = Abort::default().with_max_size(3);
let resp = client.get(&format!("http://{}/?1234", server.addr()), abort).wait().unwrap();
assert!(resp.is_success());
Expand Down

0 comments on commit 2bb4b46

Please sign in to comment.