Skip to content

Commit

Permalink
BLAZINGLY FAST!!!!!!!!!
Browse files Browse the repository at this point in the history
  • Loading branch information
frc4533-lincoln committed Oct 30, 2024
1 parent e1078c1 commit 932f4ed
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 84 deletions.
6 changes: 6 additions & 0 deletions plugins/ranking/engine.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
-- Ranking based on engine

for result in results do
local weight = 1.0

end
114 changes: 39 additions & 75 deletions src/lua_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,23 +143,23 @@ impl LuaUserData for ClientWrapper {
}

/// Lua wrapper for [scraper::Html]
struct Scraper(Html);
struct Scraper(Arc<Mutex<Html>>);
impl LuaUserData for Scraper {
fn add_methods<M: LuaUserDataMethods<Self>>(methods: &mut M) {
methods.add_function("new", |_, raw_html: String| {
let html = Html::parse_document(&raw_html);
Ok(Self(html))
Ok(Self(Arc::new(Mutex::new(html))))
});
methods.add_method("select", |lua, this, selector: String| {
async fn select(lua: Lua, this: LuaUserDataRef<Scraper>, selector: String) -> LuaResult<LuaTable> {
let sel = Selector::parse(&selector).unwrap();
Ok(lua
lua
.create_sequence_from(
this.0
this.0.lock().await
.select(&sel)
.map(|x| ElementWrapper(x.inner_html(), x.value().clone())),
.map(|x| ElementWrapper(x.inner_html().clone(), x.value().clone())),
)
.unwrap())
});
}
methods.add_async_method("select", select);
}
}

Expand Down Expand Up @@ -203,33 +203,14 @@ fn fend_eval(_: &Lua, input: String) -> LuaResult<String> {
/// A single-threaded plugin engine
#[derive(Clone)]
pub struct PluginEngine {
query_tx: watch::Sender<crate::Query>,
results_rx: watch::Receiver<Vec<crate::Result>>,
lua: Lua,
client: Client,
providers: ProvidersConfig,
}
impl PluginEngine {
/// Initialize a new engine for running plugins
pub async fn new(client: Client) -> Result<Self, Box<dyn Error>> {
let providers = ProvidersConfig::load("plugins/providers.toml");
let (query_tx, rx) = watch::channel(Default::default());
let (tx, results_rx) = watch::channel(Default::default());

spawn_local(async move {
Self::inner(client, providers, rx, tx).await.unwrap();
});

Ok(Self {
query_tx,
results_rx,
})
}

/// Actual Lua init and event loop
async fn inner(
client: Client,
providers: ProvidersConfig,
mut rx: watch::Receiver<crate::Query>,
tx: watch::Sender<Vec<crate::Result>>,
) -> Result<(), Box<dyn Error>> {
let lua = Lua::new();

// Add Lua global variables we need
Expand Down Expand Up @@ -267,15 +248,20 @@ impl PluginEngine {
}
}

// Event loop
while let Ok(()) = rx.changed().await {
let query = rx.borrow_and_update().clone();
Ok(Self {
lua,
client,
providers,
})
}

if let Some(provider) = providers.0.get(&query.provider) {
/// Use the given provider to process the given query
pub async fn search(&self, query: Query) -> Vec<crate::Result> {
if let Some(provider) = self.providers.0.get(&query.provider) {
let engine = provider.engine.clone().unwrap_or(query.provider.clone());

// Get engine implementation
let eng_impl = lua
let eng_impl = self.lua
.globals()
.get::<LuaTable>("__searched_engines__")
.unwrap()
Expand All @@ -284,9 +270,9 @@ impl PluginEngine {

let results = eng_impl
.call_async::<Vec<HashMap<String, LuaValue>>>((
ClientWrapper(client.clone()),
ClientWrapper(self.client.clone()),
query,
lua.to_value(&provider.clone().extra.unwrap_or_default()),
self.lua.to_value(&provider.clone().extra.unwrap_or_default()),
))
.await;

Expand All @@ -300,7 +286,6 @@ impl PluginEngine {

match results {
Ok(results) => {
tx.send(
results
.into_iter()
.map(|r| crate::Result {
Expand All @@ -314,38 +299,16 @@ impl PluginEngine {
}),
..Default::default()
})
.collect(),
)
.unwrap();
.collect()
}
Err(err) => {
error!("failed to get results from engine: {:?}", err);
tx.send(Vec::new()).unwrap();
Vec::new()
}
}
} else {
Vec::new()
}
}

Ok(())
}

/// Use the given provider to process the given query
pub async fn search(&mut self, query: Query) -> Vec<crate::Result> {
// Clean the last set of results
self.results_rx.mark_unchanged();

if self.query_tx.send(query).is_ok() {
if tokio::time::timeout(Duration::from_secs(3), self.results_rx.changed())
.await
.map(|ret| ret.is_ok())
.unwrap_or(false)
{
let results = self.results_rx.borrow_and_update().clone();
return results;
}
}

Vec::new()
}
}

Expand Down Expand Up @@ -390,14 +353,15 @@ impl PluginEnginePool {
let queue = queue.clone();
let client = client.clone();

std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
rt.block_on(async move {
LocalSet::new()
.run_until(async move {
//std::thread::spawn(move || {
// let rt = tokio::runtime::Builder::new_current_thread()
// .enable_all()
// .build()
// .unwrap();
// rt.block_on(async move {
// LocalSet::new()
// .run_until(async move {
tokio::spawn(async move {
let target = format!("searched::worker{i}");
let mut eng = PluginEngine::new(client).await.unwrap();

Expand All @@ -422,9 +386,9 @@ impl PluginEnginePool {
debug!(target: &target, "done in {:?}! awaiting a query...", st.elapsed());
}
}
})
.await;
});
//})
//.await;
//});
});
info!("started worker thread #{i}");
}
Expand Down
16 changes: 9 additions & 7 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use axum::{
};
use log::LevelFilter;
//use reqwest::Client;
use searched::{config::Config, lua_api::PluginEnginePool};
use searched::{config::Config, lua_api::{PluginEngine, PluginEnginePool}};
//use sled::Db;
use tokio::net::TcpListener;

Expand All @@ -28,7 +28,8 @@ pub struct AppState {
//client: Client,
//db: Db,
pool: PluginEnginePool,
config: Config,
//config: Config,
eng: PluginEngine,
}

// Need more worker threads if we do our own search index again:
Expand Down Expand Up @@ -59,10 +60,10 @@ async fn main() {
] {
headers.append(key, HeaderValue::from_str(val).unwrap());
}
//let client = reqwest::Client::builder()
// .default_headers(headers)
// .build()
// .unwrap();
let client = reqwest::Client::builder()
.default_headers(headers)
.build()
.unwrap();

env_logger::builder()
.filter_level(LevelFilter::Info)
Expand All @@ -74,6 +75,7 @@ async fn main() {
let pool = PluginEnginePool::new(4).await;

let config = Config::load("plugins/providers.toml");
let eng = PluginEngine::new(client).await.unwrap();

info!("initializing web");
let r = Router::new()
Expand All @@ -82,7 +84,7 @@ async fn main() {
.route("/settings", get(web::settings))
.route("/assets/logo.png", get(web::logo))
.route("/favicon.ico", get(web::icon))
.with_state(AppState { config, pool });
.with_state(AppState { pool, eng });

tokio::spawn(async {
axum::serve(
Expand Down
10 changes: 8 additions & 2 deletions src/web.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{process, sync::Arc};
use std::{process, sync::Arc, time::Instant};

use axum::{
body::Body,
Expand Down Expand Up @@ -106,8 +106,11 @@ pub async fn results(

let kind = params.k.unwrap_or_default();

#[cfg(debug_assertions)]
let sttm = Instant::now();

let results = st
.pool
.eng
.search(searched::Query {
provider: params.s.clone().unwrap_or("duckduckgo".to_string()),
query: q.clone(),
Expand All @@ -117,6 +120,9 @@ pub async fn results(
})
.await;

#[cfg(debug_assertions)]
debug!("done in {:?}! awaiting a query...", sttm.elapsed());

Html(
(*TEMPLATES.read().await)
.render(
Expand Down

0 comments on commit 932f4ed

Please sign in to comment.