Skip to content

Commit

Permalink
Try
Browse files Browse the repository at this point in the history
  • Loading branch information
lbirkert committed Aug 6, 2024
1 parent bdf6127 commit db24131
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 34 deletions.
99 changes: 99 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions qb-control/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::fmt;

// no to happy with this one. It kinda sucks

use bitcode::{Decode, Encode};
use qb_core::interface::QBIId;

Expand Down
1 change: 1 addition & 0 deletions qb-daemon/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ thiserror = "1.0.63"
qb-core = { path = "../qb-core" }
qb-proto = { path = "../qb-proto" }
qb-control = { path = "../qb-control" }
futures = "0.3.30"
68 changes: 34 additions & 34 deletions qb-daemon/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use core::panic;
use std::{collections::HashMap, fs::File, sync::Arc, time::Duration};
use std::{collections::HashMap, fs::File, future::Future, sync::Arc};

use bitcode::DecodeOwned;
use futures::future::BoxFuture;
use interprocess::local_socket::{
tokio::Stream, traits::tokio::Listener, GenericNamespaced, ListenerNonblockingMode,
ListenerOptions, ToNsName,
Expand Down Expand Up @@ -29,13 +30,15 @@ pub enum Error {
NotFound,
#[error("this type of QBI is not supported")]
NotSupported,
#[error("the given content is malformed")]
Malformed,
}

pub type Result<T> = std::result::Result<T, Error>;

// TODO: asyncify these
pub type StartFn = Box<dyn Fn(&mut QB, QBIId, &[u8])>;
pub type SetupFn = Box<dyn Fn(QBPBlob) -> (QBIId, Vec<u8>)>;
pub type StartFn<'a> = Box<dyn Fn(&mut QB, QBIId, Vec<u8>) -> BoxFuture<'a, ()>>;
pub type SetupFn<'a> = Box<dyn Fn(QBPBlob) -> BoxFuture<'a, Result<(QBIId, Vec<u8>)>>>;

pub struct QBIDescriptior {
name: String,
Expand All @@ -49,7 +52,10 @@ pub struct Handle {
impl Handle {
/// Send a message to this handle
pub async fn send(&self, msg: impl Into<QBControlResponse>) {
self.tx.send(msg.into()).await.unwrap();
match self.tx.send(msg.into()).await {
Err(err) => warn!("could not send message to handle: {0}", err),
Ok(_) => {}
};
}
}

Expand All @@ -60,18 +66,18 @@ pub struct HandleInit {
rx: mpsc::Receiver<QBControlResponse>,
}

pub struct QBDaemon {
pub struct QBDaemon<'a> {
qb: QB,
qbis: HashMap<QBIId, QBIDescriptior>,
start_fns: HashMap<String, StartFn>,
setup_fns: HashMap<String, SetupFn>,
start_fns: HashMap<String, StartFn<'a>>,
setup_fns: HashMap<String, SetupFn<'a>>,

req_tx: mpsc::Sender<(QBId, QBControlRequest, Option<QBPBlob>)>,
req_rx: mpsc::Receiver<(QBId, QBControlRequest, Option<QBPBlob>)>,
handles: HashMap<QBId, Handle>,
}

impl QBDaemon {
impl<'a> QBDaemon<'a> {
/// Build the daemon
pub fn init(qb: QB) -> Self {
let (req_tx, req_rx) = mpsc::channel(10);
Expand All @@ -91,7 +97,7 @@ impl QBDaemon {
let descriptor = self.qbis.get(&id).ok_or(Error::NotFound)?;
let name = &descriptor.name;
let start = self.start_fns.get(name).ok_or(Error::NotSupported)?;
start(&mut self.qb, id, &descriptor.data);
start(&mut self.qb, id, descriptor.data.clone());
Ok(())
}

Expand All @@ -101,37 +107,36 @@ impl QBDaemon {
Ok(())
}

pub fn setup(&mut self, name: String, blob: QBPBlob) {
let setup = self.setup_fns.get(&name).unwrap();
let (id, data) = setup(blob);
pub async fn setup(&mut self, name: String, blob: QBPBlob) -> Result<()> {
let setup = self.setup_fns.get(&name).ok_or(Error::NotSupported)?;
let (id, data) = setup(blob).await?;
self.qbis.insert(id, QBIDescriptior { name, data });
Ok(())
}

/// Register a QBI kind.
pub fn register<T>(&mut self, name: impl Into<String>)
where
for<'a> T: QBIContext + QBISetup<'a> + DecodeOwned,
for<'b> T: QBIContext + QBISetup<'b> + DecodeOwned,
{
let name = name.into();
self.start_fns.insert(
name.clone(),
Box::new(|qb, id, data| {
let runtime = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();
runtime.block_on(qb.attach(id, bitcode::decode::<T>(data).unwrap()));
Box::new(move |qb, id, data| {
Box::pin(async move {
qb.attach(id, bitcode::decode::<T>(&data).unwrap()).await;
})
}),
);
self.setup_fns.insert(
name,
Box::new(move |blob| {
let runtime = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();
let cx = blob.deserialize::<T>().unwrap();
let data = bitcode::encode(&cx);
let id = runtime.block_on(cx.setup());
(id, data)
Box::pin(async move {
let cx = blob.deserialize::<T>()?;
let data = bitcode::encode(&cx);
let id = cx.setup().await;
Ok((id, data))
})
}),
);
}
Expand Down Expand Up @@ -159,7 +164,7 @@ impl QBDaemon {
QBControlRequest::Stop { id } => self.stop(id).await?,
QBControlRequest::Setup { name, .. } => {
let blob = blob.unwrap();
self.setup(name, blob);
self.setup(name, blob).await?;
}
QBControlRequest::Bridge { id, msg } => {
self.qb.send(&id, QBIBridgeMessage { caller, msg }).await;
Expand All @@ -183,12 +188,6 @@ impl QBDaemon {

tokio::spawn(handle_run(init));
}

/// Register the default QBI kinds.
pub fn register_default(&mut self) {
self.register::<QBILocal>("local");
// self.register::<QBIGDrive>("gdrive");
}
}

#[tokio::main]
Expand Down Expand Up @@ -222,7 +221,10 @@ async fn main() {

// Initialize the core library
let qb = QB::init("./local").await;

// Setup the daemon
let mut daemon = QBDaemon::init(qb);
daemon.register::<QBILocal>("local");

// Process
loop {
Expand Down Expand Up @@ -255,8 +257,6 @@ async fn handle_run(mut init: HandleInit) {
Err(err) => span.in_scope(|| warn!("handle finished with error: {:?}", err)),
Ok(_) => {}
}

tokio::time::sleep(Duration::from_secs(1)).await;
}

async fn _handle_run(init: &mut HandleInit) -> Result<()> {
Expand Down

0 comments on commit db24131

Please sign in to comment.