From 04c94621c75221f2a579cd74914caaf015892afd Mon Sep 17 00:00:00 2001 From: j-mendez Date: Thu, 7 Nov 2024 16:00:18 -0500 Subject: [PATCH] perf(page): add streaming chunks rewrite --- Cargo.lock | 12 +- spider/Cargo.toml | 2 +- spider/src/page.rs | 846 +++++++++++++++++++++--------- spider/src/utils/mod.rs | 237 ++++++++- spider/src/website.rs | 191 +++---- spider_chrome/Cargo.toml | 2 +- spider_cli/Cargo.toml | 2 +- spider_transformations/Cargo.toml | 2 +- spider_utils/Cargo.toml | 2 +- spider_worker/Cargo.toml | 2 +- spider_worker/src/main.rs | 56 +- 11 files changed, 961 insertions(+), 393 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 14d2ffbfa..4f99c7ec1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3917,7 +3917,7 @@ dependencies = [ [[package]] name = "spider" -version = "2.12.12" +version = "2.13.0" dependencies = [ "ahash", "aho-corasick", @@ -3979,7 +3979,7 @@ dependencies = [ [[package]] name = "spider_chrome" -version = "2.12.12" +version = "2.13.0" dependencies = [ "adblock", "async-tungstenite", @@ -4014,7 +4014,7 @@ dependencies = [ [[package]] name = "spider_cli" -version = "2.12.12" +version = "2.13.0" dependencies = [ "clap", "env_logger", @@ -4038,7 +4038,7 @@ dependencies = [ [[package]] name = "spider_transformations" -version = "2.12.12" +version = "2.13.0" dependencies = [ "aho-corasick", "fast_html2md", @@ -4060,7 +4060,7 @@ dependencies = [ [[package]] name = "spider_utils" -version = "2.12.12" +version = "2.13.0" dependencies = [ "indexmap 1.9.3", "serde", @@ -4072,7 +4072,7 @@ dependencies = [ [[package]] name = "spider_worker" -version = "2.12.12" +version = "2.13.0" dependencies = [ "env_logger", "lazy_static", diff --git a/spider/Cargo.toml b/spider/Cargo.toml index 890d1d36f..1b782c5bb 100644 --- a/spider/Cargo.toml +++ b/spider/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "spider" -version = "2.12.12" +version = "2.13.0" authors = [ "j-mendez " ] diff --git a/spider/src/page.rs b/spider/src/page.rs index 8e66313b7..e593e70bb 100644 --- a/spider/src/page.rs +++ b/spider/src/page.rs @@ -2,12 +2,12 @@ use crate::compact_str::CompactString; #[cfg(all(feature = "chrome", not(feature = "decentralized")))] use crate::configuration::{AutomationScripts, ExecutionScripts}; - use crate::utils::log; use crate::utils::PageResponse; use crate::CaseInsensitiveString; use crate::Client; use crate::RelativeSelectors; +use auto_encoder::auto_encode_bytes; use bytes::Bytes; use hashbrown::HashSet; use lol_html::Settings; @@ -204,6 +204,62 @@ pub struct Page { pub waf_check: bool, } +/// Validate link and push into the map +pub fn push_link>( + base: &Option, + href: &str, + map: &mut HashSet, + base_domain: &CompactString, + parent_host: &CompactString, + parent_host_scheme: &CompactString, + base_input_domain: &CompactString, + sub_matcher: &CompactString, + external_domains_caseless: &Box>, +) { + if let Some(b) = base { + let mut abs = convert_abs_path(&b, href); + let scheme = abs.scheme(); + + if scheme == "https" || scheme == "http" { + let host_name = abs.host_str(); + let mut can_process = parent_host_match( + host_name, + base_domain, + parent_host, + base_input_domain, + sub_matcher, + ); + + if !can_process && host_name.is_some() && !external_domains_caseless.is_empty() { + can_process = external_domains_caseless + .contains::(&host_name.unwrap_or_default().into()) + || external_domains_caseless + .contains::(&CASELESS_WILD_CARD); + } + + if can_process { + if abs.scheme() != parent_host_scheme.as_str() { + let _ = abs.set_scheme(parent_host_scheme.as_str()); 
+ } + + let hchars = abs.path(); + + if let Some(position) = hchars.rfind('.') { + let resource_ext = &hchars[position + 1..hchars.len()]; + + if !ONLY_RESOURCES.contains::(&resource_ext.into()) { + can_process = false; + } + } + + if can_process { + map.insert(abs.as_str().to_string().into()); + } + } + } + } +} + /// get the clean domain name pub fn domain_name(domain: &Url) -> &str { match domain.host_str() { @@ -338,7 +394,6 @@ pub fn validate_empty(content: &Option>, is_success: bool) -> bool { pub fn build(url: &str, res: PageResponse) -> Page { let success = res.status_code.is_success(); let resource_found = validate_empty(&res.content, success); - let mut should_retry = resource_found && !success || res.status_code.is_server_error() || res.status_code == StatusCode::TOO_MANY_REQUESTS @@ -351,9 +406,13 @@ pub fn build(url: &str, res: PageResponse) -> Page { headers: res.headers, #[cfg(feature = "cookies")] cookies: res.cookies, - base: match Url::parse(url) { - Ok(u) => Some(u), - _ => None, + base: if !url.is_empty() { + match Url::parse(url) { + Ok(u) => Some(u), + _ => None, + } + } else { + None }, url: url.into(), #[cfg(feature = "time")] @@ -417,13 +476,266 @@ pub fn build(_: &str, res: PageResponse) -> Page { } } +/// Settings for streaming rewriter +#[derive(Debug, Default, Clone, Copy)] +pub struct PageLinkBuildSettings { + /// If the SSG build is in progress. + pub ssg_build: bool, + /// If full resources should be included. + pub full_resources: bool, + /// TLD handling resources. + pub tld: bool, + /// Subdomain handling resources. + pub subdomains: bool, +} + +impl PageLinkBuildSettings { + /// New build link settings. + pub fn new(ssg_build: bool, full_resources: bool) -> Self { + Self { + ssg_build, + full_resources, + ..Default::default() + } + } + + /// New build full link settings. + pub fn new_full(ssg_build: bool, full_resources: bool, subdomains: bool, tld: bool) -> Self { + Self { + ssg_build, + full_resources, + subdomains, + tld, + } + } +} + impl Page { /// Instantiate a new page and gather the html repro of standard fetch_page_html. 
pub async fn new_page(url: &str, client: &Client) -> Self { - let page_resource = crate::utils::fetch_page_html_raw(url, client).await; + let page_resource: PageResponse = crate::utils::fetch_page_html_raw(url, client).await; build(url, page_resource) } + /// New page with rewriter + pub async fn new_page_streaming< + A: PartialEq + Eq + Sync + Send + Clone + Default + std::hash::Hash + From, + >( + url: &str, + client: &Client, + only_html: bool, + mut selectors: &mut RelativeSelectors, + external_domains_caseless: &Box>, + r_settings: &PageLinkBuildSettings, + mut map: &mut hashbrown::HashSet, + ssg_map: Option<&mut hashbrown::HashSet>, + prior_domain: &Option>, + mut domain_parsed: &mut Option>, + ) -> Self { + use crate::utils::{ + handle_response_bytes_writer, modify_selectors, setup_default_response, + AllowedDomainTypes, + }; + + let page_response = match client.get(url).send().await { + Ok(res) if res.status().is_success() => { + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + let mut senders: Option<( + tokio::sync::mpsc::UnboundedSender, + tokio::sync::mpsc::UnboundedReceiver, + )> = None; + + let base = match Url::parse(url) { + Ok(u) => Some(u), + _ => None, + }; + + if url != res.url().as_str() { + let domain = res.url().as_str(); + let mut url = Box::new(CaseInsensitiveString::new(&url)); + + modify_selectors( + &prior_domain, + domain, + &mut domain_parsed, + &mut url, + &mut selectors, + AllowedDomainTypes::new(r_settings.subdomains, r_settings.tld), + ); + }; + + let parent_host = &selectors.1[0]; + // the host schemes + let parent_host_scheme = &selectors.1[1]; + let base_input_domain = &selectors.2; // the domain after redirects + let sub_matcher = &selectors.0; + + // let prior_domain = self.domain_parsed.take(); + + let external_domains_caseless = external_domains_caseless.clone(); + + let base_links_settings = if r_settings.full_resources { + lol_html::element!("a[href],script[src],link[href]", |el| { + let attribute = if el.tag_name() == "script" { + "src" + } else { + "href" + }; + if let Some(href) = el.get_attribute(attribute) { + push_link( + &base, + &href, + &mut map, + &selectors.0, + parent_host, + parent_host_scheme, + base_input_domain, + sub_matcher, + &external_domains_caseless, + ); + } + Ok(()) + }) + } else { + lol_html::element!("a[href]", |el| { + if let Some(href) = el.get_attribute("href") { + push_link( + &base, + &href, + &mut map, + &selectors.0, + parent_host, + parent_host_scheme, + base_input_domain, + sub_matcher, + &external_domains_caseless, + ); + } + Ok(()) + }) + }; + + let mut element_content_handlers = vec![base_links_settings]; + + if r_settings.ssg_build { + let c = tokio::sync::mpsc::unbounded_channel(); + let ctx = c.0.clone(); + + element_content_handlers.push(lol_html::element!("script", move |el| { + if let Some(source) = el.get_attribute("src") { + if source.starts_with("/_next/static/") + && source.ends_with("/_ssgManifest.js") + { + let _ = ctx.send(source); + } + } + Ok(()) + })); + + senders.replace(c); + } + + let settings = lol_html::send::Settings { + element_content_handlers, + adjust_charset_on_meta_tag: true, + ..lol_html::send::Settings::new_for_handler_types() + }; + + let mut rewriter = + lol_html::send::HtmlRewriter::new(settings.into(), move |c: &[u8]| { + let _ = tx.send(c.to_vec()); + }); + + let mut response = + handle_response_bytes_writer(res, url, only_html, &mut rewriter).await; + + let rewrite_error = response.1; + + if !rewrite_error { + let _ = rewriter.end(); + } + + let mut 
collected_bytes: Vec = Vec::new(); + + while let Some(c) = rx.recv().await { + collected_bytes.extend_from_slice(&c); + } + + response.0.content.replace(Box::new(collected_bytes.into())); + + drop(rx); + + if let Some(ctx) = senders { + let mut rtx = ctx.1; + drop(ctx.0); + + if r_settings.ssg_build { + if let Some(mut ssg_map) = ssg_map { + let rc = rtx.recv().await; + + if let Some(source) = rc { + if let Some(ref url_base) = base { + let build_ssg_path = convert_abs_path(&url_base, &source); + let build_page = + Page::new_page(build_ssg_path.as_str(), &client).await; + + for cap in + SSG_CAPTURE.captures_iter(build_page.get_html_bytes_u8()) + { + if let Some(matched) = cap.get(1) { + let href = auto_encode_bytes(&matched.as_bytes()) + .replace(r#"\u002F"#, "/"); + + fn get_last_segment(path: &str) -> &str { + if let Some(pos) = path.rfind('/') { + &path[pos + 1..] + } else { + path + } + } + + let last_segment = get_last_segment(&href); + + // we can pass in a static map of the dynamic SSG routes pre-hand, custom API endpoint to seed, or etc later. + if !(last_segment.starts_with("[") + && last_segment.ends_with("]")) + { + push_link( + &base, + &href, + &mut ssg_map, + &selectors.0, + parent_host, + parent_host_scheme, + base_input_domain, + sub_matcher, + &external_domains_caseless, + ); + } + } + } + } + } + } + } + } + + response.0 + } + Ok(res) => setup_default_response(&res), + Err(_) => { + log("- error parsing html text {}", url); + let mut page_response = PageResponse::default(); + if let Ok(status_code) = StatusCode::from_u16(599) { + page_response.status_code = status_code; + } + page_response + } + }; + + build(url, page_response) + } + /// Instantiate a new page and gather the html repro of standard fetch_page_html only gathering resources to crawl. 
pub async fn new_page_only_html(url: &str, client: &Client) -> Self { let page_resource = crate::utils::fetch_page_html_raw_only_html(url, client).await; @@ -839,75 +1151,9 @@ impl Page { self.duration.elapsed() } - /// Validate link and push into the map - pub fn push_link>( - &self, - href: &str, - map: &mut HashSet, - base_domain: &CompactString, - parent_host: &CompactString, - parent_host_scheme: &CompactString, - base_input_domain: &CompactString, - sub_matcher: &CompactString, - ) { - match self.abs_path(href) { - Some(mut abs) => { - let scheme = abs.scheme(); - - if scheme == "https" || scheme == "http" { - let host_name = abs.host_str(); - let mut can_process = parent_host_match( - host_name, - base_domain, - parent_host, - base_input_domain, - sub_matcher, - ); - - if !can_process - && host_name.is_some() - && !self.external_domains_caseless.is_empty() - { - can_process = self - .external_domains_caseless - .contains::( - &host_name.unwrap_or_default().into(), - ) - || self - .external_domains_caseless - .contains::(&CASELESS_WILD_CARD); - } - - if can_process { - if abs.scheme() != parent_host_scheme.as_str() { - let _ = abs.set_scheme(parent_host_scheme.as_str()); - } - - let hchars = abs.path(); - - if let Some(position) = hchars.rfind('.') { - let resource_ext = &hchars[position + 1..hchars.len()]; - - if !ONLY_RESOURCES - .contains::(&resource_ext.into()) - { - can_process = false; - } - } - - if can_process { - map.insert(abs.as_str().to_string().into()); - } - } - } - } - _ => (), - } - } - /// Find the links as a stream using string resource validation for XML files pub async fn links_stream_xml_links_stream_base< - A: PartialEq + Eq + std::hash::Hash + From, + A: PartialEq + Eq + Sync + Send + Clone + Default + std::hash::Hash + From, >( &self, selectors: &RelativeSelectors, @@ -946,7 +1192,8 @@ impl Page { if is_link_tag { match e.unescape() { Ok(v) => { - self.push_link( + push_link( + &self.base, &v, map, &selectors.0, @@ -954,6 +1201,7 @@ impl Page { parent_host_scheme, base_input_domain, sub_matcher, + &self.external_domains_caseless, ); } _ => (), @@ -983,7 +1231,9 @@ impl Page { /// Find the links as a stream using string resource validation #[inline(always)] #[cfg(all(not(feature = "decentralized")))] - pub async fn links_stream_base>( + pub async fn links_stream_base< + A: PartialEq + Eq + Sync + Send + Clone + Default + std::hash::Hash + From, + >( &self, selectors: &RelativeSelectors, html: &str, @@ -1001,27 +1251,50 @@ impl Page { let base_input_domain = &selectors.2; // the domain after redirects let sub_matcher = &selectors.0; - let _ = rewrite_str_empty( - &html, - Settings { - element_content_handlers: vec![lol_html::element!("a", |el| { - if let Some(href) = el.get_attribute("href") { - self.push_link( - &href, - &mut map, - &selectors.0, - parent_host, - parent_host_scheme, - base_input_domain, - sub_matcher, - ); - } - Ok(()) - })], - adjust_charset_on_meta_tag: true, - ..Settings::default() - }, - ); + let rewriter_settings = Settings { + element_content_handlers: vec![lol_html::element!("a", |el| { + if let Some(href) = el.get_attribute("href") { + push_link( + &self.base, + &href, + &mut map, + &selectors.0, + parent_host, + parent_host_scheme, + base_input_domain, + sub_matcher, + &self.external_domains_caseless, + ); + } + Ok(()) + })], + adjust_charset_on_meta_tag: true, + ..lol_html::send::Settings::new_for_handler_types() + }; + + let mut wrote_error = false; + + let mut rewriter = + lol_html::send::HtmlRewriter::new(rewriter_settings.into(), 
|_c: &[u8]| {}); + + let html_bytes = html.as_bytes(); + let chunk_size = 8192; + let chunks = html_bytes.chunks(chunk_size); + + let mut stream = tokio_stream::iter(chunks).map(Ok::<&[u8], A>); + + while let Some(chunk) = stream.next().await { + if let Ok(chunk) = chunk { + if let Err(_) = rewriter.write(chunk) { + wrote_error = true; + break; + } + } + } + + if !wrote_error { + let _ = rewriter.end(); + } } } @@ -1030,7 +1303,9 @@ impl Page { /// Find the links as a stream using string resource validation #[inline(always)] - pub async fn links_stream_base_ssg>( + pub async fn links_stream_base_ssg< + A: PartialEq + Eq + Sync + Send + Clone + Default + std::hash::Hash + From, + >( &self, selectors: &RelativeSelectors, html: &str, @@ -1039,12 +1314,15 @@ impl Page { use auto_encoder::auto_encode_bytes; let mut map = HashSet::new(); + let mut map_ssg = HashSet::new(); if !html.is_empty() { if html.starts_with(" &str { - if let Some(pos) = path.rfind('/') { - &path[pos + 1..] - } else { - path + let txx = c.0.clone(); + + let rewriter_settings = Settings { + element_content_handlers: vec![ + lol_html::element!("a", |el| { + if let Some(href) = el.get_attribute("href") { + push_link( + &self.base, + &href, + &mut map, + &selectors.0, + parent_host, + parent_host_scheme, + base_input_domain, + sub_matcher, + &self.external_domains_caseless, + ); + } + Ok(()) + }), + lol_html::element!("script", move |el| { + if let Some(source) = el.get_attribute("src") { + if source.starts_with("/_next/static/") + && source.ends_with("/_ssgManifest.js") + { + if let Some(build_path) = self.abs_path(&source) { + let _ = txx.send(build_path.as_str().to_string()); } } + } + Ok(()) + }), + ], + adjust_charset_on_meta_tag: true, + ..lol_html::send::Settings::new_for_handler_types() + }; - let last_segment = get_last_segment(&href); + let mut rewriter = + lol_html::send::HtmlRewriter::new(rewriter_settings.into(), |_c: &[u8]| {}); - // we can pass in a static map of the dynamic SSG routes pre-hand, custom API endpoint to seed, or etc later. - if !(last_segment.starts_with("[") && last_segment.ends_with("]")) { - self.push_link( - &href, - &mut map, - &selectors.0, - parent_host, - parent_host_scheme, - base_input_domain, - sub_matcher, - ); + let html_bytes = html.as_bytes(); + let chunk_size = 8192; + let chunks = html_bytes.chunks(chunk_size); + let mut wrote_error = false; + + let mut stream = tokio_stream::iter(chunks).map(Ok::<&[u8], A>); + + while let Some(chunk) = stream.next().await { + if let Ok(chunk) = chunk { + if let Err(_) = rewriter.write(chunk) { + wrote_error = true; + break; + } + } + } + + if !wrote_error { + let _ = rewriter.end(); + } + + drop(c.0); + let mut rx = c.1; + + if let Some(build_ssg_path) = rx.recv().await { + let build_page = Page::new_page(&build_ssg_path, &client).await; + + for cap in SSG_CAPTURE.captures_iter(build_page.get_html_bytes_u8()) { + if let Some(matched) = cap.get(1) { + let href = + auto_encode_bytes(&matched.as_bytes()).replace(r#"\u002F"#, "/"); + + fn get_last_segment(path: &str) -> &str { + if let Some(pos) = path.rfind('/') { + &path[pos + 1..] + } else { + path } } + + let last_segment = get_last_segment(&href); + + // we can pass in a static map of the dynamic SSG routes pre-hand, custom API endpoint to seed, or etc later. 
+ if !(last_segment.starts_with("[") && last_segment.ends_with("]")) { + push_link( + &self.base, + &href, + &mut map_ssg, + &selectors.0, + parent_host, + parent_host_scheme, + base_input_domain, + sub_matcher, + &self.external_domains_caseless, + ); + } } } } } } + map.extend(map_ssg); + map } /// Find the links as a stream using string resource validation and parsing the script for nextjs initial SSG paths. - pub async fn links_stream_ssg>( + pub async fn links_stream_ssg< + A: PartialEq + Eq + Sync + Send + Clone + Default + std::hash::Hash + From, + >( &self, selectors: &RelativeSelectors, client: &Client, @@ -1145,10 +1452,28 @@ impl Page { } } + /// Find all href links and return them using CSS selectors. + #[inline(always)] + pub async fn links_ssg( + &self, + selectors: &RelativeSelectors, + client: &Client, + ) -> HashSet { + match self.html.is_some() { + false => Default::default(), + true => { + self.links_stream_ssg::(selectors, client) + .await + } + } + } + /// Find the links as a stream using string resource validation #[inline(always)] #[cfg(all(not(feature = "decentralized"), not(feature = "full_resources")))] - pub async fn links_stream>( + pub async fn links_stream< + A: PartialEq + Eq + Sync + Send + Clone + Default + std::hash::Hash + From, + >( &self, selectors: &RelativeSelectors, ) -> HashSet { @@ -1168,7 +1493,7 @@ impl Page { ))] #[inline(always)] pub async fn links_stream_smart< - A: PartialEq + std::fmt::Debug + Eq + std::hash::Hash + From, + A: PartialEq + Eq + Sync + Send + Clone + Default + std::hash::Hash + From, >( &self, selectors: &RelativeSelectors, @@ -1177,8 +1502,10 @@ impl Page { context_id: &Option, ) -> HashSet { use auto_encoder::auto_encode_bytes; + use lol_html::{doc_comments, element}; let mut map = HashSet::new(); + let mut inner_map: HashSet = map.clone(); if !self.is_empty() { let html_resource = Box::new(self.get_html()); @@ -1187,88 +1514,128 @@ impl Page { self.links_stream_xml_links_stream_base(selectors, &html_resource, &mut map) .await; } else { - use lol_html::{doc_comments, element}; - let (tx, rx) = tokio::sync::oneshot::channel(); + let (txx, mut rxx) = tokio::sync::mpsc::unbounded_channel(); + let (txxx, mut rxxx) = tokio::sync::mpsc::unbounded_channel(); + let base_input_domain = &selectors.2; let parent_frags = &selectors.1; // todo: allow mix match tpt let parent_host = &parent_frags[0]; let parent_host_scheme = &parent_frags[1]; let sub_matcher = &selectors.0; - let mut rerender = false; + let external_domains_caseless = self.external_domains_caseless.clone(); + + let base = self.base.clone(); + let base1 = base.clone(); + + let txxx2 = txxx.clone(); + let mut static_app = false; - let rewrited_bytes = match rewrite_str_as_bytes( - &html_resource, - Settings { - element_content_handlers: vec![ - element!("script", |element| { - if !static_app { - if let Some(src) = element.get_attribute("src") { - if src.starts_with("/") { - if src.starts_with("/_next/static/chunks/pages/") - || src.starts_with("/webpack-runtime-") - || element.get_attribute("id").eq(&*GATSBY) - { - static_app = true; - } + let rewriter_settings = Settings { + element_content_handlers: vec![ + element!("script", move |element| { + if !static_app { + if let Some(src) = element.get_attribute("src") { + if src.starts_with("/") { + if src.starts_with("/_next/static/chunks/pages/") + || src.starts_with("/webpack-runtime-") + || element.get_attribute("id").eq(&*GATSBY) + { + static_app = true; + } - if let Some(abs) = self.abs_path(&src) { - if let Ok(mut paths) = 
abs - .path_segments() - .ok_or_else(|| "cannot be base") - { - while let Some(p) = paths.next() { - // todo: get the path last before None instead of checking for ends_with - if p.ends_with(".js") - && JS_FRAMEWORK_ASSETS.contains(&p) - { - rerender = true; - } + if let Some(ref base) = base1 { + let abs = convert_abs_path(&base, &src); + + if let Ok(mut paths) = + abs.path_segments().ok_or_else(|| "cannot be base") + { + while let Some(p) = paths.next() { + // todo: get the path last before None instead of checking for ends_with + if p.ends_with(".js") + && JS_FRAMEWORK_ASSETS.contains(&p) + { + let _ = txxx2.send(true); } } } } } } - Ok(()) - }), - element!("a", |el| { - if let Some(href) = el.get_attribute("href") { - self.push_link( - &href, - &mut map, - &selectors.0, - parent_host, - parent_host_scheme, - base_input_domain, - sub_matcher, - ); - } + } + Ok(()) + }), + element!("a", |el| { + if let Some(href) = el.get_attribute("href") { + push_link( + &base, + &href, + &mut inner_map, + &selectors.0, + parent_host, + parent_host_scheme, + base_input_domain, + sub_matcher, + &external_domains_caseless, + ); + } + + el.remove(); - el.remove(); - - Ok(()) - }), - element!("*:not(script):not(a):not(body):not(head):not(html)", |el| { - el.remove(); - Ok(()) - }), - ], - document_content_handlers: vec![doc_comments!(|c| { - c.remove(); Ok(()) - })], - adjust_charset_on_meta_tag: true, - ..Settings::default() - }, - ) { - Ok(s) => s, - _ => html_resource.as_bytes().to_vec(), + }), + element!("*:not(script):not(a):not(body):not(head):not(html)", |el| { + el.remove(); + Ok(()) + }), + ], + document_content_handlers: vec![doc_comments!(|c| { + c.remove(); + Ok(()) + })], + adjust_charset_on_meta_tag: true, + ..lol_html::send::Settings::new_for_handler_types() }; + let mut rewriter = + lol_html::send::HtmlRewriter::new(rewriter_settings.into(), |c: &[u8]| { + let _ = txx.send(c.to_vec()); + }); + + let html_bytes = html_resource.as_bytes(); + let chunk_size = 8192; + let chunks = html_bytes.chunks(chunk_size); + let mut wrote_error = false; + + let mut stream = tokio_stream::iter(chunks).map(Ok::<&[u8], A>); + + while let Some(chunk) = stream.next().await { + if let Ok(chunk) = chunk { + if let Err(_) = rewriter.write(chunk) { + wrote_error = true; + break; + } + } + } + + if !wrote_error { + let _ = rewriter.end(); + } + + drop(txxx); + drop(txx); + + let mut rewrited_bytes: Vec = Vec::new(); + + while let Some(c) = rxx.recv().await { + rewrited_bytes.extend_from_slice(&c); + } + + let mut rerender = rxxx.recv().await.unwrap_or_default(); + if !rerender { if let Some(_) = DOM_WATCH_METHODS.find(&rewrited_bytes) { rerender = true; @@ -1366,6 +1733,7 @@ impl Page { }; } } + map.extend(inner_map); } map @@ -1373,7 +1741,9 @@ impl Page { /// Find the links as a stream using string resource validation #[inline(always)] - pub async fn links_stream_full_resource>( + pub async fn links_stream_full_resource< + A: PartialEq + Eq + Sync + Send + Clone + Default + std::hash::Hash + From, + >( &self, selectors: &RelativeSelectors, ) -> HashSet { @@ -1461,7 +1831,9 @@ impl Page { /// Find the links as a stream using string resource validation #[inline(always)] #[cfg(all(not(feature = "decentralized"), feature = "full_resources"))] - pub async fn links_stream>( + pub async fn links_stream< + A: PartialEq + Eq + Sync + Send + Clone + Default + std::hash::Hash + From, + >( &self, selectors: &RelativeSelectors, ) -> HashSet { @@ -1475,7 +1847,9 @@ impl Page { #[inline(always)] #[cfg(feature = "decentralized")] 
/// Find the links as a stream using string resource validation - pub async fn links_stream>( + pub async fn links_stream< + A: PartialEq + Eq + Sync + Send + Clone + Default + std::hash::Hash + From, + >( &self, _: &RelativeSelectors, ) -> HashSet { @@ -1492,22 +1866,6 @@ impl Page { } } - /// Find all href links and return them using CSS selectors. - #[inline(always)] - pub async fn links_ssg( - &self, - selectors: &RelativeSelectors, - client: &Client, - ) -> HashSet { - match self.html.is_some() { - false => Default::default(), - true => { - self.links_stream_ssg::(selectors, client) - .await - } - } - } - /// Find all href links and return them using CSS selectors gathering all resources. #[inline(always)] pub async fn links_full( diff --git a/spider/src/utils/mod.rs b/spider/src/utils/mod.rs index 1c814dc72..1204a085c 100644 --- a/spider/src/utils/mod.rs +++ b/spider/src/utils/mod.rs @@ -7,10 +7,15 @@ pub mod trie; use std::str::FromStr; -use crate::bytes::BufMut; +use crate::{bytes::BufMut, RelativeSelectors}; use auto_encoder::is_binary_file; use bytes::BytesMut; +use case_insensitive_string::CaseInsensitiveString; +use lol_html::send::HtmlRewriter; +use lol_html::OutputSink; + use phf::phf_set; +use url::Url; #[cfg(feature = "chrome")] use crate::features::chrome_common::{AutomationScripts, ExecutionScripts}; @@ -1403,6 +1408,22 @@ pub fn get_cookies(res: &Response) -> Option { None } +/// Block streaming +fn block_streaming(res: &Response, only_html: bool) -> bool { + let mut block_streaming = false; + + if only_html { + if let Some(content_type) = res.headers().get(reqwest::header::CONTENT_TYPE) { + if let Ok(content_type_str) = content_type.to_str() { + if IGNORE_CONTENT_TYPES.contains(content_type_str) { + block_streaming = true; + } + } + } + } + block_streaming +} + /// Handle the response bytes pub async fn handle_response_bytes( res: Response, @@ -1416,26 +1437,15 @@ pub async fn handle_response_bytes( } else { None }; + let status_code: StatusCode = res.status(); #[cfg(feature = "headers")] let headers = res.headers().clone(); let cookies = get_cookies(&res); - let mut block_streaming = false; - - if only_html { - if let Some(content_type) = res.headers().get(reqwest::header::CONTENT_TYPE) { - if let Ok(content_type_str) = content_type.to_str() { - if IGNORE_CONTENT_TYPES.contains(content_type_str) { - block_streaming = true; - } - } - } - } - let mut content: Option> = None; - if !block_streaming { + if !block_streaming(&res, only_html) { let mut stream = res.bytes_stream(); let mut data: BytesMut = BytesMut::new(); let mut first_bytes = true; @@ -1479,6 +1489,96 @@ pub async fn handle_response_bytes( } } +/// Handle the response bytes writing links while crawling +pub async fn handle_response_bytes_writer<'h, O>( + res: Response, + target_url: &str, + only_html: bool, + rewriter: &mut HtmlRewriter<'h, O>, +) -> (PageResponse, bool) +where + O: OutputSink + Send + 'static, +{ + let u = res.url().as_str(); + + let final_url = if target_url != u { + Some(u.into()) + } else { + None + }; + + let status_code: StatusCode = res.status(); + #[cfg(feature = "headers")] + let headers = res.headers().clone(); + let cookies = get_cookies(&res); + + // let mut content: Option> = None; + let mut rewrite_error = false; + + if !block_streaming(&res, only_html) { + let mut stream = res.bytes_stream(); + // let mut data: BytesMut = BytesMut::new(); + let mut first_bytes = true; + let mut data_len = 0; + + while let Some(item) = stream.next().await { + match item { + Ok(text) => { + if 
only_html && first_bytes { + first_bytes = false; + if is_binary_file(&text) { + break; + } + } + let limit = *MAX_SIZE_BYTES; + + if limit > 0 && data_len + text.len() > limit { + break; + } + + if !rewrite_error { + if let Err(_) = rewriter.write(&text) { + rewrite_error = true; + } + } + + data_len += text.len(); + } + Err(e) => { + log::error!("{e} in {}", target_url); + break; + } + } + } + } + + ( + PageResponse { + #[cfg(feature = "headers")] + headers: Some(headers), + #[cfg(feature = "cookies")] + cookies, + // content, + final_url, + status_code, + ..Default::default() + }, + rewrite_error, + ) +} + +/// Setup default response +pub(crate) fn setup_default_response(res: &Response) -> PageResponse { + PageResponse { + #[cfg(feature = "headers")] + headers: Some(res.headers().clone()), + #[cfg(feature = "cookies")] + cookies: get_cookies(&res), + status_code: res.status(), + ..Default::default() + } +} + /// Perform a network request to a resource extracting all content streaming. async fn fetch_page_html_raw_base( target_url: &str, @@ -1489,14 +1589,37 @@ async fn fetch_page_html_raw_base( Ok(res) if res.status().is_success() => { handle_response_bytes(res, target_url, only_html).await } - Ok(res) => PageResponse { - #[cfg(feature = "headers")] - headers: Some(res.headers().clone()), - #[cfg(feature = "cookies")] - cookies: get_cookies(&res), - status_code: res.status(), - ..Default::default() - }, + Ok(res) => setup_default_response(&res), + Err(_) => { + log("- error parsing html text {}", target_url); + let mut page_response = PageResponse::default(); + if let Ok(status_code) = StatusCode::from_u16(599) { + page_response.status_code = status_code; + } + page_response + } + } +} + +/// Perform a network request to a resource extracting all content streaming. +async fn fetch_page_html_raw_base_handler<'h, O>( + target_url: &str, + client: &Client, + only_html: bool, + rewriter: &mut HtmlRewriter<'h, O>, + rewrite_error: &mut bool, +) -> PageResponse +where + O: OutputSink + Send + 'static, +{ + match client.get(target_url).send().await { + Ok(res) if res.status().is_success() => { + // todo: add the rewriter here. + let response = handle_response_bytes_writer(res, target_url, only_html, rewriter).await; + *rewrite_error = response.1; + response.0 + } + Ok(res) => setup_default_response(&res), Err(_) => { log("- error parsing html text {}", target_url); let mut page_response = PageResponse::default(); @@ -1508,6 +1631,20 @@ async fn fetch_page_html_raw_base( } } +/// Perform a network request to a resource extracting all content streaming. +pub async fn fetch_page_html_raw_rewriter<'h, O>( + target_url: &str, + client: &Client, + only_html: bool, + rewriter: &mut HtmlRewriter<'h, O>, + rewrite_error: &mut bool, +) -> PageResponse +where + O: OutputSink + Send + 'static, +{ + fetch_page_html_raw_base_handler(target_url, client, only_html, rewriter, rewrite_error).await +} + /// Perform a network request to a resource extracting all content streaming. pub async fn fetch_page_html_raw(target_url: &str, client: &Client) -> PageResponse { fetch_page_html_raw_base(target_url, client, false).await @@ -2700,3 +2837,59 @@ pub async fn reset(target: &str) { _ => (), }; } + +/// Setup selectors for handling link targets. 
+pub(crate) fn setup_website_selectors( + domain_parsed: &Option>, + url: &str, + allowed: AllowedDomainTypes, +) -> Option { + use crate::page::{get_page_selectors, get_page_selectors_base}; + let subdomains = allowed.subdomains; + let tld = allowed.tld; + match domain_parsed { + Some(u) => get_page_selectors_base(&u, subdomains, tld), + _ => get_page_selectors(url, subdomains, tld), + } +} + +/// Allow subdomains or tlds. +#[derive(Debug, Default, Clone, Copy)] +pub struct AllowedDomainTypes { + /// Subdomains + pub subdomains: bool, + /// Tlds + pub tld: bool, +} + +impl AllowedDomainTypes { + /// A new domain type. + pub fn new(subdomains: bool, tld: bool) -> Self { + Self { subdomains, tld } + } +} + +/// Modify the selectors for targetting a website. +pub(crate) fn modify_selectors( + prior_domain: &Option>, + domain: &str, + domain_parsed: &mut Option>, + url: &mut Box, + base: &mut RelativeSelectors, + allowed: AllowedDomainTypes, +) { + *domain_parsed = match url::Url::parse(domain) { + Ok(u) => Some(Box::new(crate::page::convert_abs_path(&u, "/"))), + _ => None, + }; + *url = Box::new(domain.into()); + if let Some(s) = setup_website_selectors(domain_parsed, url.inner(), allowed) { + base.0 = s.0; + base.1 = s.1; + if let Some(prior_domain) = prior_domain { + if let Some(dname) = prior_domain.host_str() { + base.2 = dname.into(); + } + } + } +} diff --git a/spider/src/website.rs b/spider/src/website.rs index bc2a9320b..dd58bd1ac 100644 --- a/spider/src/website.rs +++ b/spider/src/website.rs @@ -5,8 +5,9 @@ use crate::configuration::{ }; use crate::features::chrome_common::RequestInterceptConfiguration; use crate::packages::robotparser::parser::RobotFileParser; -use crate::page::{get_page_selectors, get_page_selectors_base, Page}; +use crate::page::{Page, PageLinkBuildSettings}; use crate::utils::{interner::ListBucket, log}; +use crate::utils::{setup_website_selectors, AllowedDomainTypes}; use crate::CaseInsensitiveString; use crate::Client; use crate::RelativeSelectors; @@ -1181,16 +1182,11 @@ impl Website { /// Setup selectors for handling link targets. fn setup_selectors(&self) -> Option { - match self.get_url_parsed() { - Some(u) => { - get_page_selectors_base(u, self.configuration.subdomains, self.configuration.tld) - } - _ => get_page_selectors( - self.get_url().inner(), - self.configuration.subdomains, - self.configuration.tld, - ), - } + setup_website_selectors( + &self.get_url_parsed(), + self.get_url().inner(), + AllowedDomainTypes::new(self.configuration.subdomains, self.configuration.tld), + ) } /// Setup config for crawl. 
@@ -1272,9 +1268,32 @@ impl Website { .is_allowed_default(self.get_base_link()) .eq(&ProcessLinkStatus::Allowed) { + let mut links: HashSet = HashSet::new(); + let mut links_ssg = links.clone(); + let url = self.url.inner(); + let mut page_links_settings = + PageLinkBuildSettings::new(true, self.configuration.full_resources); + + page_links_settings.subdomains = self.configuration.subdomains; + page_links_settings.tld = self.configuration.tld; + + let mut domain_parsed = self.domain_parsed.take(); + + let mut page = Page::new_page_streaming( + url, + client, + false, + base, + &self.configuration.external_domains_caseless, + &page_links_settings, + &mut links, + Some(&mut links_ssg), + &mut domain_parsed, + &mut self.domain_parsed, + ) + .await; - let mut page = Page::new_page(url, client).await; let mut retry_count = self.configuration.retry; while page.should_retry && retry_count > 0 { @@ -1292,46 +1311,40 @@ impl Website { if let Some(timeout) = page.get_timeout() { tokio::time::sleep(timeout).await; } - page.clone_from(&Page::new_page(url, client).await); + page.clone_from( + &Page::new_page_streaming( + url, + client, + false, + base, + &self.configuration.external_domains_caseless, + &page_links_settings, + &mut links, + Some(&mut links_ssg), + &mut domain_parsed, + &mut self.domain_parsed, + ) + .await, + ); } retry_count -= 1; } log::info!("fetch {}", &url); - // allow initial page mutation - if let Some(domain) = page.final_redirect_destination.as_deref() { - let prior_domain = self.domain_parsed.take(); - self.domain_parsed = match url::Url::parse(domain) { - Ok(u) => Some(Box::new(crate::page::convert_abs_path(&u, "/"))), - _ => None, - }; - self.url = Box::new(domain.into()); - if let Some(s) = self.setup_selectors() { - base.0 = s.0; - base.1 = s.1; - - if let Some(prior_domain) = prior_domain { - if let Some(dname) = prior_domain.host_str() { - base.2 = dname.into(); - } - } + self.links_visited.insert(match self.on_link_find_callback { + Some(cb) => { + let c = cb(*self.url.clone(), None); + c.0 } - } + _ => *self.url.clone(), + }); - let links = if !page.is_empty() { - self.links_visited.insert(match self.on_link_find_callback { - Some(cb) => { - let c = cb(*self.url.clone(), None); - c.0 - } - _ => *self.url.clone(), - }); - page.links_ssg(base, client).await - } else { + if page.is_empty() { self.status = CrawlStatus::Empty; - Default::default() - }; + } + + links.extend(links_ssg); self.initial_status_code = page.status_code; @@ -1530,7 +1543,7 @@ impl Website { async fn crawl_establish_smart( &mut self, client: &Client, - base: &mut RelativeSelectors, + mut base: &mut RelativeSelectors, _: bool, browser: &Arc, context_id: &Option, @@ -1540,7 +1553,9 @@ impl Website { .eq(&ProcessLinkStatus::Allowed) { let url = self.url.inner(); + let mut page = Page::new_page(&url, &client).await; + let mut retry_count = self.configuration.retry; while page.should_retry && retry_count > 0 { @@ -1593,35 +1608,16 @@ impl Website { .smart_links(&base, &browser, &self.configuration, &context_id) .await; - match page.final_redirect_destination { - Some(ref domain) => { - let domain: Box = - CaseInsensitiveString::new(&domain).into(); - let prior_domain = self.domain_parsed.take(); - - self.domain_parsed = match url::Url::parse(&domain.inner()) { - Ok(u) => Some(Box::new(crate::page::convert_abs_path(&u, "/"))), - _ => None, - }; - self.url = domain; - match self.setup_selectors() { - Some(s) => { - base.0 = s.0; - base.1 = s.1; - match prior_domain { - Some(prior_domain) => match 
prior_domain.host_str() { - Some(dname) => { - base.2 = dname.into(); - } - _ => (), - }, - _ => (), - } - } - _ => (), - } - } - _ => (), + if let Some(ref domain) = page.final_redirect_destination { + let prior_domain = self.domain_parsed.take(); + crate::utils::modify_selectors( + &prior_domain, + domain, + &mut self.domain_parsed, + &mut self.url, + &mut base, + AllowedDomainTypes::new(self.configuration.subdomains, self.configuration.tld), + ); } let links = if !page_links.is_empty() { @@ -2192,6 +2188,13 @@ impl Website { self.configuration.external_domains_caseless.clone(), self.channel_guard.clone(), self.configuration.retry, + self.configuration.full_resources, + PageLinkBuildSettings::new_full( + false, + self.configuration.full_resources, + self.configuration.subdomains, + self.configuration.tld, + ), )); let mut set: JoinSet> = JoinSet::new(); @@ -2236,11 +2239,19 @@ impl Website { _ => (link, None), }; - let mut page = if !only_html { - Page::new_page(link_result.0.as_ref(), &shared.0).await - } else { - Page::new_page_only_html(link_result.0.as_ref(), &shared.0).await - }; + let mut links: HashSet = HashSet::new(); + let mut relative_selectors = shared.1.clone(); + let mut r_settings = shared.7; + r_settings.ssg_build = true; + let target_url = link_result.0.as_ref(); + let external_domains_caseless = &shared.3; + let client = &shared.0; + + let mut domain_parsed = None; + + let page_stream = Page::new_page_streaming(target_url, client, only_html, &mut relative_selectors, external_domains_caseless, &r_settings, &mut links, None, &None, &mut domain_parsed); + + let mut page = page_stream.await; let mut retry_count = shared.5; @@ -2249,35 +2260,26 @@ impl Website { let next_page = backoff::future::retry( ExponentialBackoff::default(), || async { - let p = if full_resources { - Page::new_page(link_result.0.as_ref(), &shared.0).await - } else { - Page::new_page_only_html(link_result.0.as_ref(), &shared.0).await - }; + let mut links: HashSet = HashSet::new(); + let mut domain_parsed = None; + let p = Page::new_page_streaming(target_url, client, only_html, &mut relative_selectors.clone(), &external_domains_caseless, &r_settings, &mut links, None, &None, &mut domain_parsed).await; - Ok::>(p) + Ok::<(Page, HashSet), backoff::Error>((p, links)) }, ); if let Ok(next_page) = next_page.await { - page.clone_from(&next_page); + page.clone_from(&next_page.0); + links.extend(next_page.1); }; } else { if let Some(timeout) = page.get_timeout() { tokio::time::sleep(timeout).await; } - page.clone_from(&Page::new_page(link_result.0.as_ref(), &shared.0).await); + page.clone_from(&Page::new_page_streaming(target_url, &client, only_html, &mut relative_selectors.clone(), external_domains_caseless, &r_settings, &mut links, None, &None, &mut domain_parsed).await); } retry_count -= 1; } - page.set_external(shared.3.to_owned()); - - let links = if full_resources { - page.links_full(&shared.1).await - } else { - page.links(&shared.1).await - }; - if return_page_links { page.page_links = if links.is_empty() { None @@ -2287,6 +2289,7 @@ impl Website { } channel_send_page(&shared.2, page, &shared.4); + links }), &chandle, diff --git a/spider_chrome/Cargo.toml b/spider_chrome/Cargo.toml index 92c06c860..4b4e78765 100644 --- a/spider_chrome/Cargo.toml +++ b/spider_chrome/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "spider_chrome" -version = "2.12.12" +version = "2.13.0" rust-version = "1.70" authors = [ "j-mendez " diff --git a/spider_cli/Cargo.toml b/spider_cli/Cargo.toml index 06e4cf994..0caed9bf5 100644 --- 
a/spider_cli/Cargo.toml +++ b/spider_cli/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "spider_cli" -version = "2.12.12" +version = "2.13.0" authors = [ "j-mendez " ] diff --git a/spider_transformations/Cargo.toml b/spider_transformations/Cargo.toml index f47789180..dabac97f4 100644 --- a/spider_transformations/Cargo.toml +++ b/spider_transformations/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "spider_transformations" -version = "2.12.12" +version = "2.13.0" authors = [ "j-mendez " ] diff --git a/spider_utils/Cargo.toml b/spider_utils/Cargo.toml index 5e6107ade..71f473378 100644 --- a/spider_utils/Cargo.toml +++ b/spider_utils/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "spider_utils" -version = "2.12.12" +version = "2.13.0" authors = [ "j-mendez " ] diff --git a/spider_worker/Cargo.toml b/spider_worker/Cargo.toml index ada18a917..e37209783 100644 --- a/spider_worker/Cargo.toml +++ b/spider_worker/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "spider_worker" -version = "2.12.12" +version = "2.13.0" authors = [ "j-mendez " ] diff --git a/spider_worker/src/main.rs b/spider_worker/src/main.rs index 3bf672700..3c063d783 100644 --- a/spider_worker/src/main.rs +++ b/spider_worker/src/main.rs @@ -23,6 +23,7 @@ async fn forward( ) -> Result { use spider::{ flexbuffers, + page::{build, Page}, serde::Serialize, string_concat::{string_concat, string_concat_impl}, }; @@ -42,30 +43,43 @@ async fn forward( ) }; - let page = spider::page::Page::new_page(&url_path, &CLIENT).await; - - let extracted = if !page.get_html().is_empty() { - let (subdomains, tld) = match referer { - Some(r) => (r == "3" || r == "1", r == "3" || r == "2"), - _ => (false, false), - }; - - match spider::page::get_page_selectors(&url_path, subdomains, tld) { - Some(selectors) => { - let links = page.links_stream::(&selectors).await; - - let mut s = flexbuffers::FlexbufferSerializer::new(); + let (subdomains, tld) = match referer { + Some(r) => (r == "3" || r == "1", r == "3" || r == "2"), + _ => (false, false), + }; - match links.serialize(&mut s) { - _ => (), - }; + let mut page = build(&"", Default::default()); + + let extracted = match spider::page::get_page_selectors(&url_path, subdomains, tld) { + Some(mut selectors) => { + let mut links: spider::hashbrown::HashSet = + spider::hashbrown::HashSet::new(); + + page.clone_from( + &spider::page::Page::new_page_streaming( + &url_path, + &CLIENT, + false, + &mut selectors, + &Default::default(), + &Default::default(), + &mut links, + None, + &None, + &mut None, + ) + .await, + ); + + let mut s = flexbuffers::FlexbufferSerializer::new(); + + match links.serialize(&mut s) { + _ => (), + }; - s.take_buffer() - } - _ => Default::default(), + s.take_buffer() } - } else { - Default::default() + _ => Default::default(), }; #[cfg(feature = "headers")]
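
For reference, a minimal usage sketch (not part of the patch) of the new streaming entry point this change introduces. It mirrors the spider_worker call site above: `Page::new_page_streaming` rewrites the response body chunk-by-chunk through lol_html, collecting links into the caller-supplied set instead of parsing the full HTML afterwards. Assumes the `spider` crate at this patch level; `crawl_one` and `url_path` are placeholder names, and the retry/redirect/ssg plumbing shown in `website.rs` is omitted.

    use spider::hashbrown::HashSet;
    use spider::CaseInsensitiveString;

    async fn crawl_one(url_path: &str, client: &spider::Client) {
        let (subdomains, tld) = (false, false);

        if let Some(mut selectors) = spider::page::get_page_selectors(url_path, subdomains, tld) {
            let mut links: HashSet<CaseInsensitiveString> = HashSet::new();

            // ssg_build, full_resources, subdomains, tld
            let settings =
                spider::page::PageLinkBuildSettings::new_full(false, false, subdomains, tld);

            let page = spider::page::Page::new_page_streaming(
                url_path,
                client,
                false,               // only_html: also stream non-HTML resources
                &mut selectors,
                &Default::default(), // external_domains_caseless
                &settings,
                &mut links,
                None,                // no ssg_map: skip the _ssgManifest.js pass
                &None,               // prior domain before redirects
                &mut None,           // domain parsed after redirects
            )
            .await;

            // `links` was filled while the body streamed through the rewriter.
            println!("{} -> {} links, status {}", url_path, links.len(), page.status_code);
        }
    }

Passing `Some(&mut ssg_links)` instead of `None` would additionally fetch the Next.js `_ssgManifest.js` referenced by the page and seed its static routes, as done in `crawl_establish` above.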