Skip to content

Commit

Permalink
Fiddle around a lot more (no success)
Browse files Browse the repository at this point in the history
  • Loading branch information
lbirkert committed Aug 22, 2024
1 parent f40901e commit 48bcef5
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 44 deletions.
9 changes: 5 additions & 4 deletions qb-daemon/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use bitcode::{Decode, Encode};
use qb_ext::{
control::{QBCId, QBCRequest, QBCResponse},
hook::QBHContext,
interface::QBIContext,
interface::QBIContextBoxed,
QBExtId, QBExtSetup,
};
use qb_proto::{QBPBlob, QBPDeserialize, QBP};
Expand Down Expand Up @@ -290,14 +290,15 @@ impl QBDaemon {
pub fn register_qbi<S, I>(&mut self, name: impl Into<String>)
where
S: QBExtSetup<I> + QBPDeserialize,
I: QBIContext + Encode + for<'a> Decode<'a> + 'static,
I: QBIContextBoxed + Encode + for<'a> Decode<'a> + 'static,
{
let name = name.into();
self.start_fns.insert(
name.clone(),
Box::new(move |qb, id, data| {
Box::pin(async move {
qb.attach(id, bitcode::decode::<I>(data).unwrap()).await?;
qb.attach(id, Box::new(bitcode::decode::<I>(data).unwrap()))
.await?;
Ok(())
})
}),
Expand Down Expand Up @@ -325,7 +326,7 @@ impl QBDaemon {
where
S: QBExtSetup<H> + QBPDeserialize,
H: QBHContext<I> + Encode + for<'a> Decode<'a> + 'static,
I: QBIContext + Any + Send,
I: QBIContextBoxed + Any + Send,
{
let name = name.into();
self.start_fns.insert(
Expand Down
43 changes: 12 additions & 31 deletions qb-daemon/src/master.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//! which handles interfaces and their communication.
//! It owns a device table and a changelog to allow syncing.
use std::{collections::HashMap, future::Future, pin::Pin, rc::Rc};
use std::collections::HashMap;

use qb_core::{
change::QBChangeMap,
Expand All @@ -14,7 +14,7 @@ use qb_core::{
};
use qb_ext::{
hook::{QBHChannel, QBHContext, QBHHostMessage, QBHSlaveMessage},
interface::{QBIChannel, QBIContext, QBIHostMessage, QBIMessage, QBISlaveMessage},
interface::{QBIChannel, QBIContextBoxed, QBIHostMessage, QBIMessage, QBISlaveMessage},
QBExtId,
};
use thiserror::Error;
Expand Down Expand Up @@ -69,19 +69,9 @@ pub struct QBIHandle {
/// A handle to a hook.
pub struct QBHHandle {
join_handle: JoinHandle<()>,
handler_fn: QBHHandlerFn,
tx: mpsc::Sender<QBHHostMessage>,
}

/// A hook handler function. This is needed, because the type of the
/// interface context we send over the mpsc is Any and therefore the
/// context must be downcast individually.
pub type QBHHandlerFn = Rc<
Box<
dyn for<'a> Fn(&'a mut QBMaster, QBHSlaveMessage) -> Pin<Box<dyn Future<Output = ()> + 'a>>,
>,
>;

/// The master, that is, the struct that houses connection
/// to the individual interfaces and manages communication.
pub struct QBMaster {
Expand Down Expand Up @@ -142,10 +132,13 @@ impl QBMaster {
///
/// # Cancelation Safety
/// This method is not cancelation safe.
pub async fn hprocess(&mut self, (id, msg): (QBExtId, QBHSlaveMessage)) {
let handle = self.qbh_handles.get(&id).unwrap();
let handler_fn = handle.handler_fn.clone();
handler_fn(self, msg).await;
pub async fn hprocess(&mut self, (_, msg): (QBExtId, QBHSlaveMessage)) {
match msg {
QBHSlaveMessage::Attach { context } => {
self.attach(QBExtId::generate(), context).await.unwrap()
}
_ => unimplemented!(),
}
}

/// Remove unused handles [from interfaces that have finished]
Expand Down Expand Up @@ -277,7 +270,7 @@ impl QBMaster {
}

/// Try to hook a hook to the master. Returns error if already hooked.
pub async fn hook<T: QBIContext + 'static>(
pub async fn hook<T: QBIContextBoxed + 'static>(
&mut self,
id: QBExtId,
cx: impl QBHContext<T>,
Expand All @@ -293,18 +286,6 @@ impl QBMaster {

// create the handle
let handle = QBHHandle {
handler_fn: Rc::new(Box::new(move |master, msg| {
Box::pin(async move {
match msg {
QBHSlaveMessage::Attach { context } => {
// downcast the context
let context = *context.downcast::<T>().unwrap();
master.attach(QBExtId::generate(), context).await.unwrap();
}
_ => unimplemented!(),
}
})
})),
join_handle: tokio::spawn(
cx.run(QBHChannel::new(id.clone(), self.qbh_tx.clone(), master_rx).into())
.instrument(span),
Expand All @@ -318,7 +299,7 @@ impl QBMaster {
}

/// Try to attach an interface to the master. Returns error if already attached.
pub async fn attach(&mut self, id: QBExtId, cx: impl QBIContext) -> Result<()> {
pub async fn attach(&mut self, id: QBExtId, cx: Box<dyn QBIContextBoxed>) -> Result<()> {
let span = info_span!("qb-interface", id = id.to_hex());

// make sure we do not attach an interface twice
Expand All @@ -331,7 +312,7 @@ impl QBMaster {
// create the handle
let handle = QBIHandle {
join_handle: tokio::spawn(
cx.run(
cx.run_boxed(
self.devices.host_id.clone(),
QBIChannel::new(id.clone(), self.qbi_tx.clone(), master_rx),
)
Expand Down
2 changes: 1 addition & 1 deletion qb-ext-tcp/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub struct QBITCPClient {
}

impl QBIContext for QBITCPClient {
async fn run(self, host_id: QBDeviceId, com: QBIChannel) {
async fn run(&mut self, host_id: QBDeviceId, com: QBIChannel) {
debug!("initializing socket: {}", self.addr);

let socket = TcpSocket::new_v4().unwrap();
Expand Down
17 changes: 10 additions & 7 deletions qb-ext/src/hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,23 @@
//!
//! TODO: switch to mutex instead of using messaging
use std::{any::Any, future::Future, marker::PhantomData};
use std::{future::Future, marker::PhantomData};

use crate::interface::QBIContextBoxed;
use crate::QBExtId;

use crate::{interface::QBIContext, QBExtChannel};
use crate::QBExtChannel;

/// Communicate from the interface to the master
pub type QBHChannel = QBExtChannel<QBExtId, QBHSlaveMessage, QBHHostMessage>;

/// TODO: figure out what to call this
pub struct QBHInit<T: QBIContext + Any + Send> {
pub struct QBHInit<T: QBIContextBoxed + Send + 'static> {
pub channel: QBHChannel,
_t: PhantomData<T>,
}

impl<T: QBIContext + Any + Send> QBHInit<T> {
impl<T: QBIContextBoxed + Send + 'static> QBHInit<T> {
pub async fn attach(&self, context: T) {
self.channel
.send(QBHSlaveMessage::Attach {
Expand All @@ -31,7 +32,7 @@ impl<T: QBIContext + Any + Send> QBHInit<T> {
}
}

impl<T: QBIContext + Any + Send> From<QBHChannel> for QBHInit<T> {
impl<T: QBIContextBoxed + Send> From<QBHChannel> for QBHInit<T> {
fn from(value: QBHChannel) -> Self {
Self {
channel: value,
Expand All @@ -47,10 +48,12 @@ pub enum QBHHostMessage {

#[non_exhaustive]
pub enum QBHSlaveMessage {
Attach { context: Box<dyn Any + Send> },
Attach {
context: Box<dyn QBIContextBoxed + Send>,
},
}

/// A context which yields interfaces.
pub trait QBHContext<I: QBIContext + Any + Send> {
pub trait QBHContext<I: QBIContextBoxed + Send + 'static> {
fn run(self, init: QBHInit<I>) -> impl Future<Output = ()> + Send + 'static;
}
20 changes: 19 additions & 1 deletion qb-ext/src/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
use bitcode::{Decode, Encode};
use serde::{Deserialize, Serialize};
use std::fmt;
use std::future::Future;
use std::{fmt, pin::Pin};

use crate::QBExtId;
use qb_core::{change::QBChangeMap, device::QBDeviceId, time::QBTimeStampUnique};
Expand Down Expand Up @@ -127,3 +127,21 @@ pub trait QBIContext: Send + Sync {
fn run(self, host_id: QBDeviceId, com: QBIChannel)
-> impl Future<Output = ()> + Send + 'static;
}

pub trait QBIContextBoxed: Send + Sync {
fn run_boxed(
self,
host_id: QBDeviceId,
com: QBIChannel,
) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
}

impl<T: QBIContext> QBIContextBoxed for T {
fn run_boxed(
self,
host_id: QBDeviceId,
com: QBIChannel,
) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
Box::pin(self.run(host_id, com))
}
}

0 comments on commit 48bcef5

Please sign in to comment.