Skip to content

Commit

Permalink
fix: use futures::sync::mpsc to schedule futures
Browse files Browse the repository at this point in the history
  • Loading branch information
phra committed Jul 2, 2019
1 parent 42876be commit c855f76
Showing 1 changed file with 23 additions and 10 deletions.
33 changes: 23 additions & 10 deletions src/tildebuster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ use native_tls;
use std::sync::mpsc::channel;
use std::sync::mpsc::Sender;
use std::thread;
use std::boxed::Box;

use futures::sync::mpsc;

pub mod result_processor;

Expand Down Expand Up @@ -108,6 +111,15 @@ impl TildeBuster {
Ok(())
} else {
let mut spanwed_futures = chars.len();
let (tx_futures, rx_futures) = mpsc::unbounded::<Box<dyn Future<Item = (), Error = ()> + Send + 'static>>();
let stream_of_futures = rx_futures
.map(|fut| {
rt::spawn(fut);
Ok(())
})
.buffer_unordered(self.n_threads)
.for_each(Ok)
.map_err(|err| eprintln!("Err {:?}", err));
let stream = futures::stream::iter_ok(chars)
.map(move |c| {
let request = TildeRequest {
Expand All @@ -133,6 +145,7 @@ impl TildeBuster {
.map_err(|err| eprintln!("Err {:?}", err));

rt::spawn(stream);
rt::spawn(stream_of_futures);

while spanwed_futures > 0 {
debug!("spawned_futures: {}", spanwed_futures);
Expand Down Expand Up @@ -228,11 +241,11 @@ impl TildeBuster {
for c in chars_duplicate.iter() {
let mut request = msg.request.clone();
request.duplicate_index = c.clone();
rt::spawn(TildeBuster::_brute_duplicate(
tx_futures.unbounded_send(Box::new(TildeBuster::_brute_duplicate(
tx1.clone(),
client1.clone(),
request,
));
))).unwrap();
spanwed_futures = spanwed_futures + 1;
}

Expand All @@ -251,11 +264,11 @@ impl TildeBuster {
for c in chars_duplicate.iter() {
let mut request = msg.request.clone();
request.duplicate_index = c.clone();
rt::spawn(TildeBuster::_brute_duplicate(
tx_futures.unbounded_send(Box::new(TildeBuster::_brute_duplicate(
tx1.clone(),
client1.clone(),
request,
));
))).unwrap();
spanwed_futures = spanwed_futures + 1;
}

Expand All @@ -266,11 +279,11 @@ impl TildeBuster {
let mut request = msg.request.clone();
request.extension =
format!("{}{}", request.extension, c);
rt::spawn(TildeBuster::_brute_extension(
tx_futures.unbounded_send(Box::new(TildeBuster::_brute_extension(
tx1.clone(),
client1.clone(),
request,
));
))).unwrap();
spanwed_futures = spanwed_futures + 1;
}
}
Expand All @@ -279,20 +292,20 @@ impl TildeBuster {
let mut request = msg.request.clone();
request.filename =
format!("{}{}", request.filename, c);
rt::spawn(TildeBuster::_brute_filename(
tx_futures.unbounded_send(Box::new(TildeBuster::_brute_filename(
tx1.clone(),
client1.clone(),
request,
));
))).unwrap();
spanwed_futures = spanwed_futures + 1;
}
}
FSObject::CHECK_IF_DIRECTORY => {
rt::spawn(TildeBuster::_check_if_directory(
tx_futures.unbounded_send(Box::new(TildeBuster::_check_if_directory(
tx1.clone(),
client1.clone(),
msg.request,
));
))).unwrap();
spanwed_futures = spanwed_futures + 1;
}
},
Expand Down

0 comments on commit c855f76

Please sign in to comment.