Commit

It works
frc4533-lincoln committed Jun 13, 2024
1 parent 04e377b commit d1fdd1d
Showing 8 changed files with 535 additions and 202 deletions.
3 changes: 3 additions & 0 deletions Cargo.toml
@@ -9,12 +9,15 @@ edition = "2021"
axum = "0.7.5"
env_logger = "0.11.3"
log = "0.4.21"
lru = "0.12.3"
once_cell = "1.19.0"
reqwest = "0.12.4"
scraper = "0.19.0"
serde = { version = "1.0.203", features = ["derive"] }
sled = "0.34.7"
tantivy = { version = "0.22.0", default-features = false, features = ["zstd-compression", "mmap", "stopwords"] }
tera = "1.20.0"
texting_robots = "0.2.2"
tokio = { version = "1.38.0", features = ["rt-multi-thread", "macros", "signal"] }
url = "2.5.0"

209 changes: 121 additions & 88 deletions src/crawler.rs
@@ -1,88 +1,121 @@
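// Previous contents of src/crawler.rs (the 88 lines removed by this commit):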
use std::{
collections::HashSet,
error::Error,
future::IntoFuture,
sync::Arc,
time::{Duration, Instant},
};

use reqwest::{header::CONTENT_TYPE, Client};
use sled::Db;
use tokio::sync::mpsc;
use url::Url;

use crate::page::Page;

const CRAWL_WORKERS: usize = 100;
const CRAWL_DELAY: Duration = Duration::from_millis(20);

/// A web crawler
#[derive(Clone)]
pub struct Crawler {
db: Db,
chan: mpsc::UnboundedSender<Page>,
}
impl Crawler {
pub async fn new(chan: mpsc::UnboundedSender<Page>) -> Self {
let db = sled::open("searched-db").unwrap();

let queue = db.open_tree("queue").unwrap();
if queue.is_empty() {
queue
.insert("https://www.apple.com".as_bytes().to_vec(), vec![])
.unwrap();
}

Self { db, chan }
}
pub async fn run(&self) -> Result<(), Box<dyn Error>> {
for i in 0..CRAWL_WORKERS {
let cr = self.clone();
tokio::spawn(async move {
info!("starting worker {i}");
cr.crawl().await.unwrap();
});
}

Ok(())
}
/// Run a web crawling worker
pub async fn crawl(&self) -> Result<(), Box<dyn Error>> {
let client = Client::new();
let links = self.db.open_tree("links").unwrap();
let queue = self.db.open_tree("queue").unwrap();

loop {
let url = queue.pop_min()?.clone();
if let Some((url, _)) = url {
// If we've already indexed this url, skip it
if links.contains_key(url.clone())? {
continue;
}

let st = Instant::now();

let url = Url::parse(core::str::from_utf8(&url)?)?;
let res = client.get(url.as_str()).send().await?;
if res.headers().get(CONTENT_TYPE).unwrap().to_str().unwrap().contains("text/html") {
let html = res.text().await?;

let page = Page::new(url.clone(), html);

for l in page.links() {
let link = l.as_bytes().to_vec();
queue.insert(link, vec![]).unwrap();
}

self.chan.send(page).unwrap();
}

links.insert(url.as_str().as_bytes(), vec![]).unwrap();

debug!("page took {:?} to crawl: {url}", st.elapsed());
}

tokio::time::sleep(CRAWL_DELAY).await;
}
}
}
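
// New contents of src/crawler.rs (the 121 lines added by this commit):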
use std::{
collections::HashSet,
error::Error,
future::IntoFuture,
sync::Arc,
time::{Duration, Instant},
};

use axum::http::{HeaderMap, HeaderValue};
use reqwest::{
header::{self, CONTENT_TYPE},
Client,
};
use sled::Db;
use texting_robots::{get_robots_url, Robot};
use tokio::sync::mpsc;
use url::Url;

use crate::page::Page;

const CRAWL_WORKERS: usize = 5;
const CRAWL_DELAY: Duration = Duration::from_millis(100);

/// A web crawler
#[derive(Clone)]
pub struct Crawler {
db: Db,
chan: mpsc::UnboundedSender<Page>,
}
impl Crawler {
pub async fn new(chan: mpsc::UnboundedSender<Page>) -> Self {
let db = sled::open("searched-db").unwrap();

let queue = db.open_tree("queue").unwrap();
if queue.is_empty() {
queue
.insert("https://www.apple.com".as_bytes().to_vec(), vec![])
.unwrap();
}

Self { db, chan }
}
pub async fn run(&self) -> Result<(), Box<dyn Error>> {
for i in 0..CRAWL_WORKERS {
let cr = self.clone();
tokio::spawn(async move {
info!("starting worker {i}");
cr.crawl().await.unwrap();
});
}

Ok(())
}
/// Run a web crawling worker
pub async fn crawl(&self) -> Result<(), Box<dyn Error>> {
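        // Send a fixed User-Agent ("Crawled") with every request; the same token is matched against robots.txt below.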
let mut headers = HeaderMap::new();
headers.insert(header::USER_AGENT, HeaderValue::from_str("Crawled")?);

let client = Client::builder().default_headers(headers).build()?;

let links = self.db.open_tree("links").unwrap();
let queue = self.db.open_tree("queue").unwrap();

loop {
let url = queue.pop_min()?.clone();
if let Some((url, _)) = url {
// If we've already indexed this url, skip it
if links.contains_key(url.clone())? {
continue;
}

let url = Url::parse(core::str::from_utf8(&url)?)?;
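                // Consult this host's robots.txt before fetching; disallowed URLs are skipped.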

let robots_url = get_robots_url(url.as_str()).unwrap();

let robots_res = client.get(robots_url).send().await.unwrap();
let robot = Robot::new("Crawled", &robots_res.bytes().await.unwrap()).unwrap();

if !robot.allowed(url.as_str()) {
debug!("skipped url due to robots.txt: {url}");
continue;
}

let st = Instant::now();
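                // Fetch the page; a request error simply skips this URL rather than crashing the worker.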

match client.get(url.as_str()).send().await {
Ok(res) => {
if res
.headers()
.get(CONTENT_TYPE)
.unwrap()
.to_str()
.unwrap()
.contains("text/html")
{
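                            // HTML only: enqueue every outgoing link, then hand the page off on the channel.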
let html = res.text().await?;

let page = Page::new(url.clone(), html);

for l in page.links() {
let link = l.as_bytes().to_vec();
queue.insert(link, vec![]).unwrap();
}

self.chan.send(page).unwrap();
}

links.insert(url.as_str().as_bytes(), vec![]).unwrap();

debug!("page took {:?} to crawl: {url}", st.elapsed());
}
Err(_) => {
continue;
}
}

tokio::time::sleep(CRAWL_DELAY).await;
}
}
}
}
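
For context, here is a minimal sketch of how the new Crawler could be driven. This wiring is an assumption for illustration, not code from the commit (the crate's actual entry point is not shown here): Crawler::new takes the sending half of an unbounded channel, run spawns the crawl workers, and each crawled Page arrives on the receiving half.

// Hypothetical crate root, assuming `crawler` and `page` are sibling modules
// (src/crawler.rs and src/page.rs); everything here is illustrative.
//
// crawler.rs calls the info!/debug! macros without importing them, so this
// sketch assumes the log macros are made visible crate-wide:
#[macro_use]
extern crate log;

mod crawler;
mod page;

use crawler::Crawler;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    env_logger::init();

    // The crawler sends every successfully crawled Page over this channel.
    let (tx, mut rx) = mpsc::unbounded_channel();

    let crawler = Crawler::new(tx).await;
    // `run` spawns CRAWL_WORKERS background tasks and returns immediately.
    crawler.run().await.unwrap();

    // Consume crawled pages here, e.g. feed them to the index.
    while let Some(page) = rx.recv().await {
        let _ = page;
    }
}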