diff --git a/Cargo.lock b/Cargo.lock index 590371a..fb42a2f 100755 --- a/Cargo.lock +++ b/Cargo.lock @@ -342,12 +342,95 @@ dependencies = [ "libc", ] +[[package]] +name = "futures" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" +dependencies = [ + "futures-core", + "futures-sink", +] + [[package]] name = "futures-core" version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" +[[package]] +name = "futures-executor" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" + +[[package]] +name = "futures-macro" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" + +[[package]] +name = "futures-task" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" + +[[package]] +name = "futures-util" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project-lite", + "pin-utils", + "slab", +] + [[package]] name = "generic-array" version = "0.14.7" @@ -692,6 +775,12 @@ version = "0.2.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bda66fc9667c18cb2758a2ac84d1167245054bcf85d5d1aaa6923f45801bdd02" +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + [[package]] name = "powerfmt" version = "0.2.0" @@ -766,6 +855,7 @@ name = "qb-daemon" version = "0.1.0" dependencies = [ "bitcode", + "futures", "interprocess", "qb-control", "qb-core", @@ -991,6 +1081,15 @@ version = "0.3.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "38b58827f4464d87d377d175e90bf58eb00fd8716ff0a62f80356b5e61555d0d" +[[package]] +name = "slab" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67" +dependencies = [ + "autocfg", +] + [[package]] name = "smallvec" version = "1.13.2" diff --git a/qb-control/src/lib.rs b/qb-control/src/lib.rs index 30d3252..41cda26 100644 --- a/qb-control/src/lib.rs +++ b/qb-control/src/lib.rs @@ -1,5 +1,7 @@ use std::fmt; +// no to happy with this one. It kinda sucks + use bitcode::{Decode, Encode}; use qb_core::interface::QBIId; diff --git a/qb-daemon/Cargo.toml b/qb-daemon/Cargo.toml index 2a73598..c46f425 100644 --- a/qb-daemon/Cargo.toml +++ b/qb-daemon/Cargo.toml @@ -14,3 +14,4 @@ thiserror = "1.0.63" qb-core = { path = "../qb-core" } qb-proto = { path = "../qb-proto" } qb-control = { path = "../qb-control" } +futures = "0.3.30" diff --git a/qb-daemon/src/main.rs b/qb-daemon/src/main.rs index 07ebba2..3158a46 100644 --- a/qb-daemon/src/main.rs +++ b/qb-daemon/src/main.rs @@ -1,7 +1,8 @@ use core::panic; -use std::{collections::HashMap, fs::File, sync::Arc, time::Duration}; +use std::{collections::HashMap, fs::File, future::Future, sync::Arc}; use bitcode::DecodeOwned; +use futures::future::BoxFuture; use interprocess::local_socket::{ tokio::Stream, traits::tokio::Listener, GenericNamespaced, ListenerNonblockingMode, ListenerOptions, ToNsName, @@ -29,13 +30,15 @@ pub enum Error { NotFound, #[error("this type of QBI is not supported")] NotSupported, + #[error("the given content is malformed")] + Malformed, } pub type Result = std::result::Result; // TODO: asyncify these -pub type StartFn = Box; -pub type SetupFn = Box (QBIId, Vec)>; +pub type StartFn<'a> = Box) -> BoxFuture<'a, ()>>; +pub type SetupFn<'a> = Box BoxFuture<'a, Result<(QBIId, Vec)>>>; pub struct QBIDescriptior { name: String, @@ -49,7 +52,10 @@ pub struct Handle { impl Handle { /// Send a message to this handle pub async fn send(&self, msg: impl Into) { - self.tx.send(msg.into()).await.unwrap(); + match self.tx.send(msg.into()).await { + Err(err) => warn!("could not send message to handle: {0}", err), + Ok(_) => {} + }; } } @@ -60,18 +66,18 @@ pub struct HandleInit { rx: mpsc::Receiver, } -pub struct QBDaemon { +pub struct QBDaemon<'a> { qb: QB, qbis: HashMap, - start_fns: HashMap, - setup_fns: HashMap, + start_fns: HashMap>, + setup_fns: HashMap>, req_tx: mpsc::Sender<(QBId, QBControlRequest, Option)>, req_rx: mpsc::Receiver<(QBId, QBControlRequest, Option)>, handles: HashMap, } -impl QBDaemon { +impl<'a> QBDaemon<'a> { /// Build the daemon pub fn init(qb: QB) -> Self { let (req_tx, req_rx) = mpsc::channel(10); @@ -91,7 +97,7 @@ impl QBDaemon { let descriptor = self.qbis.get(&id).ok_or(Error::NotFound)?; let name = &descriptor.name; let start = self.start_fns.get(name).ok_or(Error::NotSupported)?; - start(&mut self.qb, id, &descriptor.data); + start(&mut self.qb, id, descriptor.data.clone()); Ok(()) } @@ -101,37 +107,36 @@ impl QBDaemon { Ok(()) } - pub fn setup(&mut self, name: String, blob: QBPBlob) { - let setup = self.setup_fns.get(&name).unwrap(); - let (id, data) = setup(blob); + pub async fn setup(&mut self, name: String, blob: QBPBlob) -> Result<()> { + let setup = self.setup_fns.get(&name).ok_or(Error::NotSupported)?; + let (id, data) = setup(blob).await?; self.qbis.insert(id, QBIDescriptior { name, data }); + Ok(()) } /// Register a QBI kind. pub fn register(&mut self, name: impl Into) where - for<'a> T: QBIContext + QBISetup<'a> + DecodeOwned, + for<'b> T: QBIContext + QBISetup<'b> + DecodeOwned, { let name = name.into(); self.start_fns.insert( name.clone(), - Box::new(|qb, id, data| { - let runtime = tokio::runtime::Builder::new_current_thread() - .build() - .unwrap(); - runtime.block_on(qb.attach(id, bitcode::decode::(data).unwrap())); + Box::new(move |qb, id, data| { + Box::pin(async move { + qb.attach(id, bitcode::decode::(&data).unwrap()).await; + }) }), ); self.setup_fns.insert( name, Box::new(move |blob| { - let runtime = tokio::runtime::Builder::new_current_thread() - .build() - .unwrap(); - let cx = blob.deserialize::().unwrap(); - let data = bitcode::encode(&cx); - let id = runtime.block_on(cx.setup()); - (id, data) + Box::pin(async move { + let cx = blob.deserialize::()?; + let data = bitcode::encode(&cx); + let id = cx.setup().await; + Ok((id, data)) + }) }), ); } @@ -159,7 +164,7 @@ impl QBDaemon { QBControlRequest::Stop { id } => self.stop(id).await?, QBControlRequest::Setup { name, .. } => { let blob = blob.unwrap(); - self.setup(name, blob); + self.setup(name, blob).await?; } QBControlRequest::Bridge { id, msg } => { self.qb.send(&id, QBIBridgeMessage { caller, msg }).await; @@ -183,12 +188,6 @@ impl QBDaemon { tokio::spawn(handle_run(init)); } - - /// Register the default QBI kinds. - pub fn register_default(&mut self) { - self.register::("local"); - // self.register::("gdrive"); - } } #[tokio::main] @@ -222,7 +221,10 @@ async fn main() { // Initialize the core library let qb = QB::init("./local").await; + + // Setup the daemon let mut daemon = QBDaemon::init(qb); + daemon.register::("local"); // Process loop { @@ -255,8 +257,6 @@ async fn handle_run(mut init: HandleInit) { Err(err) => span.in_scope(|| warn!("handle finished with error: {:?}", err)), Ok(_) => {} } - - tokio::time::sleep(Duration::from_secs(1)).await; } async fn _handle_run(init: &mut HandleInit) -> Result<()> {