Skip to content

Commit

Permalink
feat: change pinging logic with arc semaphore
Browse files Browse the repository at this point in the history
  • Loading branch information
Chleba committed Nov 5, 2024
1 parent b0b9add commit dd700da
Show file tree
Hide file tree
Showing 4 changed files with 162 additions and 44 deletions.
74 changes: 68 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ csv = "1.3.0"
derive_deref = "1.1.1"
directories = "5.0.1"
dns-lookup = "2.0.4"
fastping-rs = "0.2.4"
futures = "0.3.30"
human-panic = "2.0.1"
ipnetwork = "0.20.0"
Expand Down
4 changes: 2 additions & 2 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ impl App {
let (action_tx, action_rx) = mpsc::unbounded_channel();

Ok(Self {
tick_rate: 10.0,
frame_rate,
tick_rate: 1.0,
frame_rate: 10.0,
components: vec![
Box::new(title),
Box::new(interfaces),
Expand Down
127 changes: 91 additions & 36 deletions src/components/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@ use pnet::packet::{
MutablePacket, Packet,
};
use pnet::util::MacAddr;
use tokio::sync::Semaphore;

use core::str;
use ratatui::layout::Position;
use ratatui::{prelude::*, widgets::*};
use std::net::{IpAddr, Ipv4Addr};
use std::string;
use std::sync::Arc;
use std::time::{Duration, Instant};
use surge_ping::{Client, Config, IcmpPacket, PingIdentifier, PingSequence, ICMP};
use tokio::{
Expand Down Expand Up @@ -114,7 +116,7 @@ impl Discovery {
}
Err(e) => {
if let Some(tx) = &self.action_tx {
tx.send(Action::CidrError).unwrap();
tx.clone().send(Action::CidrError).unwrap();
}
}
}
Expand All @@ -137,6 +139,7 @@ impl Discovery {
Ok(_) => {
if let Some(tx_action) = &self.action_tx {
tx_action
.clone()
.send(Action::Error(
"Unknown or unsupported channel type".into(),
))
Expand All @@ -147,6 +150,7 @@ impl Discovery {
Err(e) => {
if let Some(tx_action) = &self.action_tx {
tx_action
.clone()
.send(Action::Error(format!(
"Unable to create datalink channel: {e}"
)))
Expand Down Expand Up @@ -186,49 +190,100 @@ impl Discovery {
}
}

// fn scan(&mut self) {
// self.reset_scan();

// if let Some(cidr) = self.cidr {
// self.is_scanning = true;
// let tx = self.action_tx.as_ref().unwrap().clone();
// self.task = tokio::spawn(async move {
// let ips = get_ips4_from_cidr(cidr);
// let chunks: Vec<_> = ips.chunks(POOL_SIZE).collect();
// for chunk in chunks {
// let tasks: Vec<_> = chunk
// .iter()
// .map(|&ip| {
// let tx = tx.clone();
// let closure = || async move {
// let client =
// Client::new(&Config::default()).expect("Cannot create client");
// let payload = [0; 56];
// let mut pinger = client
// .pinger(IpAddr::V4(ip), PingIdentifier(random()))
// .await;
// pinger.timeout(Duration::from_secs(2));

// match pinger.ping(PingSequence(2), &payload).await {
// Ok((IcmpPacket::V4(packet), dur)) => {
// tx.send(Action::PingIp(packet.get_real_dest().to_string()))
// .unwrap_or_default();
// tx.send(Action::CountIp).unwrap_or_default();
// }
// Ok(_) => {
// tx.send(Action::CountIp).unwrap_or_default();
// }
// Err(_) => {
// tx.send(Action::CountIp).unwrap_or_default();
// }
// }
// };
// task::spawn(closure())
// })
// .collect();

// let _ = join_all(tasks).await;
// }
// });
// };
// }

fn scan(&mut self) {
self.reset_scan();

if let Some(cidr) = self.cidr {
self.is_scanning = true;
let tx = self.action_tx.as_ref().unwrap().clone();

let tx = self.action_tx.clone().unwrap();
let semaphore = Arc::new(Semaphore::new(POOL_SIZE));

self.task = tokio::spawn(async move {
let ips = get_ips4_from_cidr(cidr);
let chunks: Vec<_> = ips.chunks(POOL_SIZE).collect();
for chunk in chunks {
let tasks: Vec<_> = chunk
.iter()
.map(|&ip| {
let tx = tx.clone();
let closure = || async move {
let client =
Client::new(&Config::default()).expect("Cannot create client");
let payload = [0; 56];
let mut pinger = client
.pinger(IpAddr::V4(ip), PingIdentifier(random()))
.await;
pinger.timeout(Duration::from_secs(2));

match pinger.ping(PingSequence(2), &payload).await {
Ok((IcmpPacket::V4(packet), dur)) => {
tx.send(Action::PingIp(packet.get_real_dest().to_string()))
.unwrap_or_default();
tx.send(Action::CountIp).unwrap_or_default();
}
Ok(_) => {
tx.send(Action::CountIp).unwrap_or_default();
}
Err(_) => {
tx.send(Action::CountIp).unwrap_or_default();
}
let tasks: Vec<_> = ips
.iter()
.map(|&ip| {
let s = semaphore.clone();
let tx = tx.clone();
let c = || async move {
let _permit = s.acquire().await.unwrap();
let client =
Client::new(&Config::default()).expect("Cannot create client");
let payload = [0; 56];
let mut pinger = client
.pinger(IpAddr::V4(ip), PingIdentifier(random()))
.await;
pinger.timeout(Duration::from_secs(2));

match pinger.ping(PingSequence(2), &payload).await {
Ok((IcmpPacket::V4(packet), dur)) => {
tx.send(Action::PingIp(packet.get_real_dest().to_string()))
.unwrap_or_default();
tx.send(Action::CountIp).unwrap_or_default();
}
};
task::spawn(closure())
})
.collect();

let _ = join_all(tasks).await;
Ok(_) => {
tx.send(Action::CountIp).unwrap_or_default();
}
Err(_) => {
tx.send(Action::CountIp).unwrap_or_default();
}
}
};
tokio::spawn(c())
})
.collect();
for t in tasks {
let _ = t.await;
}
// let _ = join_all(tasks).await;
});
};
}
Expand Down Expand Up @@ -256,7 +311,7 @@ impl Discovery {
fn process_ip(&mut self, ip: &str) {
let tx = self.action_tx.as_ref().unwrap();
let ipv4: Ipv4Addr = ip.parse().unwrap();
self.send_arp(ipv4);
// self.send_arp(ipv4);

if let Some(n) = self.scanned_ips.iter_mut().find(|item| item.ip == ip) {
let hip: IpAddr = ip.parse().unwrap();
Expand Down

0 comments on commit dd700da

Please sign in to comment.