diff --git a/Cargo.toml b/Cargo.toml index 786d0a7..6076814 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ keywords = ["actor", "compute", "graph", "pipeline"] license = "Apache-2.0" readme = "README.md" repository = "https://github.com/farm-ng/hollywood/" -version = "0.5.1" +version = "0.6.0" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html @@ -23,7 +23,7 @@ drawille = "0.3" eframe = {version = ">= 0.27, <1.0", features = ["wgpu"], optional = true} env_logger = {version = "0.11", optional = true} grid = "0.13" -hollywood_macros = {version = "0.5", path = "hollywood_macros"} +hollywood_macros = {version = "0.6.0", path = "hollywood_macros"} # hollywood intends to use only very basic features of nalgebra, hence # future versions of nalgebra before the major < 1.0 release are likely to work nalgebra = ">= 0.32, <1.0" @@ -37,3 +37,6 @@ tokio-stream = "0.1" [features] default = ["egui"] egui = ["dep:eframe", "dep:env_logger"] + +[profile.release] +panic = 'abort' diff --git a/README.md b/README.md index d53dcb2..01e8c48 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@ # hollywood -Hollywood is an actor framework, with focus on representing actors with heterogeneous +Hollywood is an actor framework, with focus on representing actors with heterogeneous inputs and outputs which are arranged in a non-cyclic compute graph/pipeline. The design intend is simplicity and minimal boilerplate code. diff --git a/examples/egui.rs b/examples/egui.rs index c6604cc..dc8ddec 100644 --- a/examples/egui.rs +++ b/examples/egui.rs @@ -9,9 +9,50 @@ use hollywood::prelude::*; #[derive(Clone, Debug)] #[actor_inputs( ContentGeneratorInbound, - {NullProp, ContentGeneratorState, ContentGeneratorOutbound, NullRequest})] + {NullProp, ContentGeneratorState, ContentGeneratorOutbound, ContentGeneratorOutRequest, + ContentGeneratorInRequestMessage + })] pub enum ContentGeneratorInboundMessage { Tick(f64), + Reply(ReplyMessage), +} + +/// Request to reset the content generator. +#[derive(Debug)] +pub enum ContentGeneratorInRequestMessage { + /// Request + Reset(RequestWithReplyChannel<(), f64>), +} + +impl IsInRequestMessage for ContentGeneratorInRequestMessage { + type Prop = NullProp; + + type State = ContentGeneratorState; + + type OutboundHub = ContentGeneratorOutbound; + + type OutRequestHub = ContentGeneratorOutRequest; + + fn in_request_channel(&self) -> String { + "reset".to_owned() + } +} + +impl HasOnRequestMessage for ContentGeneratorInRequestMessage { + fn on_message( + self, + _prop: &Self::Prop, + state: &mut Self::State, + _outbound: &Self::OutboundHub, + _request: &Self::OutRequestHub, + ) { + match self { + ContentGeneratorInRequestMessage::Reset(msg) => { + state.offset = -state.last_x; + msg.reply(|_| state.last_x); + } + } + } } impl HasOnMessage for ContentGeneratorInboundMessage { @@ -21,13 +62,11 @@ impl HasOnMessage for ContentGeneratorInboundMessage { _prop: &Self::Prop, state: &mut Self::State, outbound: &Self::OutboundHub, - _request: &Self::RequestHub, + request: &Self::OutRequestHub, ) { match &self { ContentGeneratorInboundMessage::Tick(new_value) => { - if state.reset_request.try_recv().is_ok() { - state.offset = -*new_value; - } + state.last_x = *new_value; let x = *new_value + state.offset; @@ -39,6 +78,13 @@ impl HasOnMessage for ContentGeneratorInboundMessage { msg: PlotMessage::SinPlot((x, x.cos())), }; outbound.plot_message.send(c); + + if x > 2.0 && x < 2.1 { + request.example_request.send_request("foo:".to_owned()); + } + } + ContentGeneratorInboundMessage::Reply(r) => { + println!("Reply received {}", r.reply); } } } @@ -50,27 +96,86 @@ impl IsInboundMessageNew for ContentGeneratorInboundMessage { } } +impl IsInboundMessageNew> for ContentGeneratorInboundMessage { + fn new(_inbound_name: String, msg: ReplyMessage) -> Self { + ContentGeneratorInboundMessage::Reply(msg) + } +} + +impl IsInRequestMessageNew> for ContentGeneratorInRequestMessage { + fn new(_inbound_name: String, msg: RequestWithReplyChannel<(), f64>) -> Self { + ContentGeneratorInRequestMessage::Reset(msg) + } +} + #[derive(Debug)] pub struct ContentGeneratorState { - pub reset_request: tokio::sync::broadcast::Receiver<()>, + pub last_x: f64, pub offset: f64, } -#[actor(ContentGeneratorInboundMessage)] +/// Out request channels for the content generator actor. +#[actor_out_requests] +pub struct ContentGeneratorOutRequest { + pub example_request: OutRequestChannel, +} + +/// The content generator actor. +#[actor(ContentGeneratorInboundMessage, ContentGeneratorInRequestMessage)] type ContentGenerator = Actor< NullProp, ContentGeneratorInbound, + ContentGeneratorInRequest, ContentGeneratorState, ContentGeneratorOutbound, - NullRequest, + ContentGeneratorOutRequest, >; -/// OutboundChannel channels for the filter actor. +/// Outbound channels for the content generator actor. #[actor_outputs] pub struct ContentGeneratorOutbound { pub plot_message: OutboundChannel>, } +pub struct ContentGeneratorInRequest { + pub reset: InRequestChannel, ContentGeneratorInRequestMessage>, +} + +impl + IsInRequestHub< + NullProp, + ContentGeneratorState, + ContentGeneratorOutbound, + ContentGeneratorOutRequest, + ContentGeneratorInboundMessage, + ContentGeneratorInRequestMessage, + > for ContentGeneratorInRequest +{ + fn from_builder( + builder: &mut ActorBuilder< + NullProp, + ContentGeneratorState, + ContentGeneratorOutbound, + ContentGeneratorOutRequest, + ContentGeneratorInboundMessage, + ContentGeneratorInRequestMessage, + >, + actor_name: &str, + ) -> Self { + let reset = InRequestChannel::new( + builder.context, + actor_name, + &builder.request_sender.clone(), + "reset".to_owned(), + ); + builder + .forward_request + .insert(reset.name.clone(), Box::new(reset.clone())); + + Self { reset } + } +} + #[derive(Clone, Debug)] pub enum PlotMessage { SinPlot((f64, f64)), @@ -83,17 +188,23 @@ impl Default for PlotMessage { } } -struct EguiAppExampleAppConfig { - reset_side_channel_tx: tokio::sync::broadcast::Sender<()>, -} +struct EguiAppExampleAppConfig {} -type EguiAppExampleBuilder = - GenericEguiBuilder, EguiAppExampleAppConfig>; +type EguiAppExampleBuilder = GenericEguiBuilder< + PlotMessage, + RequestWithReplyChannel, + (), + f64, + EguiAppExampleAppConfig, +>; pub struct EguiAppExample { - pub message_recv: std::sync::mpsc::Receiver>, - pub request_recv: std::sync::mpsc::Receiver>, - pub reset_side_channel_tx: tokio::sync::broadcast::Sender<()>, + pub message_recv: tokio::sync::mpsc::UnboundedReceiver>, + pub in_request_recv: + tokio::sync::mpsc::UnboundedReceiver>, + pub out_reply_recv: tokio::sync::mpsc::UnboundedReceiver>, + pub out_request_sender: tokio::sync::mpsc::UnboundedSender<()>, + pub cancel_request_sender: tokio::sync::mpsc::UnboundedSender, pub x: f64, pub sin_value: f64, @@ -103,9 +214,11 @@ pub struct EguiAppExample { impl EguiAppFromBuilder for EguiAppExample { fn new(builder: EguiAppExampleBuilder, _dummy_example_state: String) -> Box { Box::new(EguiAppExample { - message_recv: builder.message_recv, - request_recv: builder.request_recv, - reset_side_channel_tx: builder.config.reset_side_channel_tx, + message_recv: builder.message_from_actor_recv, + out_reply_recv: builder.out_reply_from_actor_recv, + in_request_recv: builder.in_request_from_actor_recv, + out_request_sender: builder.out_request_to_actor_sender, + cancel_request_sender: builder.cancel_request_sender.unwrap(), x: 0.0, sin_value: 0.0, cos_value: 0.0, @@ -117,7 +230,9 @@ impl EguiAppFromBuilder for EguiAppExample { type State = String; } use eframe::egui; -use hollywood::RequestMessage; +use hollywood::OutRequestChannel; +use hollywood::ReplyMessage; +use hollywood::RequestWithReplyChannel; impl eframe::App for EguiAppExample { fn update(&mut self, ctx: &egui::Context, _frame: &mut eframe::Frame) { while let Ok(value) = self.message_recv.try_recv() { @@ -132,6 +247,14 @@ impl eframe::App for EguiAppExample { } } } + while let Ok(value) = self.out_reply_recv.try_recv() { + println!("Reply: {:?}", value); + } + while let Ok(value) = self.in_request_recv.try_recv() { + //println!("Request: {:?}", value); + + value.reply(|_| "reply".to_owned()); + } egui::CentralPanel::default().show(ctx, |ui| { ui.heading("Hello, egui!"); @@ -140,25 +263,22 @@ impl eframe::App for EguiAppExample { ui.label(format!("cos(y): {}", self.cos_value)); if ui.button("Reset").clicked() { - self.reset_side_channel_tx.send(()).unwrap(); - // let (reply_channel_sender, reply_receiver) = tokio::sync::oneshot::channel(); - - // tokio::spawn(async move { - // let reply = reply_receiver.await.unwrap(); - // println!("Reply: {}", reply.reply); - // }); + self.out_request_sender.send(()).unwrap(); } }); ctx.request_repaint_after(Duration::from_secs_f64(0.1)); } + + fn on_exit(&mut self, _gl: Option<&eframe::glow::Context>) { + self.cancel_request_sender + .send(hollywood::CancelRequest::Cancel(())) + .unwrap(); + } } pub async fn run_viewer_example() { - let (reset_side_channel_tx, _) = tokio::sync::broadcast::channel(1); - let mut builder = EguiAppExampleBuilder::from_config(EguiAppExampleAppConfig { - reset_side_channel_tx: reset_side_channel_tx.clone(), - }); + let mut builder = EguiAppExampleBuilder::from_config(EguiAppExampleAppConfig {}); // Pipeline configuration let pipeline = Hollywood::configure(&mut |context| { @@ -170,15 +290,15 @@ pub async fn run_viewer_example() { context, NullProp::default(), ContentGeneratorState { - reset_request: reset_side_channel_tx.subscribe(), + last_x: 0.0, offset: 0.0, }, ); // 3. The egui actor let mut egui_actor = - EguiActor::::from_builder(context, &builder); + EguiActor::::from_builder(context, &builder); - // // Pipeline connections: + // Pipeline connections: timer .outbound .time_stamp @@ -187,6 +307,16 @@ pub async fn run_viewer_example() { .outbound .plot_message .connect(context, &mut egui_actor.inbound.stream); + + egui_actor + .out_requests + .request + .connect(context, &mut content_generator.in_requests.reset); + + content_generator + .out_requests + .example_request + .connect(context, &mut egui_actor.in_requests.request); }); // The cancel_requester is used to cancel the pipeline. diff --git a/examples/moving_average.rs b/examples/moving_average.rs index 3f6be88..6997773 100644 --- a/examples/moving_average.rs +++ b/examples/moving_average.rs @@ -14,7 +14,7 @@ pub async fn run_moving_average_example() { context, MovingAverageProp { alpha: 0.3, - ..Default::default() + timeout: 5.0, }, MovingAverageState { moving_average: 0.0, diff --git a/examples/one_dim_robot.rs b/examples/one_dim_robot.rs index e1eb66d..3b6c336 100644 --- a/examples/one_dim_robot.rs +++ b/examples/one_dim_robot.rs @@ -82,9 +82,9 @@ async fn run_robot_example() { .true_robot .connect(context, &mut truth_printer.inbound.printable); - sim.request + sim.out_requests .ping_pong - .connect(context, &mut filter.inbound.ping_pong_request); + .connect(context, &mut filter.in_requests.ping_pong_request); context.register_cancel_requester(&mut sim.outbound.cancel_request); filter diff --git a/hollywood_macros/Cargo.toml b/hollywood_macros/Cargo.toml index 865b5b4..0683174 100644 --- a/hollywood_macros/Cargo.toml +++ b/hollywood_macros/Cargo.toml @@ -11,7 +11,7 @@ keywords = ["actor", "compute", "graph", "pipeline"] license = "Apache-2.0" readme = "../README.md" repository = "https://github.com/farm-ng/hollywood/tree/main/hollywood_macros" -version = "0.5.1" +version = "0.6.0" [lib] proc-macro = true diff --git a/hollywood_macros/src/actors/zip.rs b/hollywood_macros/src/actors/zip.rs index 322df76..376b888 100644 --- a/hollywood_macros/src/actors/zip.rs +++ b/hollywood_macros/src/actors/zip.rs @@ -267,7 +267,7 @@ pub(crate) fn zip_inbound_message_n_impl(input: TokenStream) -> TokenStream { type Prop = NullProp; type State = #state_struct; type OutboundHub = #outbound_struct; - type RequestHub = NullRequest; + type OutRequestHub = NullOutRequests; fn inbound_channel(&self) -> String { match self { @@ -319,9 +319,10 @@ pub(crate) fn zip_n_impl(input: TokenStream) -> TokenStream { pub type #zip_struct = Actor< NullProp, #inbound_struct, + NullInRequests, #state_struct, #outbound_struct, - NullRequest, + NullOutRequests, >; impl< @@ -332,16 +333,19 @@ pub(crate) fn zip_n_impl(input: TokenStream) -> TokenStream { HasFromPropState< NullProp, #inbound_struct, + NullInRequests, #state_struct, #outbound_struct, #inbound_message_enum, - NullRequest, + NullInRequestMessage, + NullOutRequests, DefaultRunner< NullProp, #inbound_struct, + NullInRequests, #state_struct, #outbound_struct, - NullRequest, + NullOutRequests, >, > for #zip_struct { @@ -426,8 +430,9 @@ pub(crate) fn zip_inbound_n_impl(input: TokenStream) -> TokenStream { NullProp, #state_struct, #outbound_struct, - NullRequest, + NullOutRequests, #inbound_message_enum, + NullInRequestMessage, > for #inbound_struct { fn from_builder( @@ -435,8 +440,9 @@ pub(crate) fn zip_inbound_n_impl(input: TokenStream) -> TokenStream { NullProp, #state_struct, #outbound_struct, - NullRequest, + NullOutRequests, #inbound_message_enum, + NullInRequestMessage, >, actor_name: &str, ) -> Self { @@ -528,7 +534,7 @@ pub(crate) fn zip_onmessage_n_impl(input: TokenStream) -> TokenStream { _prop: &Self::Prop, state: &mut Self::State, outbound: &Self::OutboundHub, - _request: &Self::RequestHub) + _request: &Self::OutRequestHub) { let check_and_send = |s: &mut Self::State| { #( diff --git a/hollywood_macros/src/core.rs b/hollywood_macros/src/core.rs index e7cf4f5..d7d37ac 100644 --- a/hollywood_macros/src/core.rs +++ b/hollywood_macros/src/core.rs @@ -131,7 +131,7 @@ pub(crate) fn actor_requests_impl(_attr: TokenStream, item: TokenStream) -> Toke let request_assignments = fields.iter().map(|field| { let field_name = &field.ident; quote! { - #field_name: RequestChannel::new( + #field_name: OutRequestChannel::new( stringify!(#field_name).to_owned(), actor_name, sender, @@ -161,9 +161,9 @@ pub(crate) fn actor_requests_impl(_attr: TokenStream, item: TokenStream) -> Toke let m_type = is_request_type(&field0.ty).unwrap()[2]; let gen = quote! { - impl #impl_generics IsRequestHub<#m_type> for #struct_name #ty_generics #where_clause { + impl #impl_generics IsOutRequestHub<#m_type> for #struct_name #ty_generics #where_clause { fn from_parent_and_sender( - actor_name: &str, sender: &tokio::sync::mpsc::Sender<#m_type> + actor_name: &str, sender: &tokio::sync::mpsc::UnboundedSender<#m_type> ) -> Self { Self { #(#request_assignments),* @@ -189,14 +189,14 @@ pub(crate) fn actor_requests_impl(_attr: TokenStream, item: TokenStream) -> Toke gen.into() } -// This function checks if the field's type is RequestChannel +// This function checks if the field's type is OutRequestChannel fn is_request_type(ty: &Type) -> Option<[&Type; 3]> { if let Type::Path(TypePath { path: Path { segments, .. }, .. }) = ty { - if segments.len() == 1 && segments[0].ident == "RequestChannel" { + if segments.len() == 1 && segments[0].ident == "OutRequestChannel" { if let PathArguments::AngleBracketed(args) = &segments[0].arguments { if args.args.len() == 3 { let mut pop_iter = args.args.iter(); @@ -222,6 +222,7 @@ pub fn actor_inputs_impl(args: TokenStream, inbound: TokenStream) -> TokenStream state_type, output_type, request_type, + request_message_type, } = match parse2::(args) { Ok(args) => args, Err(err) => return err.to_compile_error(), @@ -328,7 +329,7 @@ pub fn actor_inputs_impl(args: TokenStream, inbound: TokenStream) -> TokenStream type Prop = #prop_type; type State = #state_type; type OutboundHub = #output_type; - type RequestHub = #request_type; + type OutRequestHub = #request_type; fn inbound_channel(&self) -> String { match self { @@ -342,7 +343,8 @@ pub fn actor_inputs_impl(args: TokenStream, inbound: TokenStream) -> TokenStream #state_type, #output_type, #request_type, - #name #ty_generics> for #struct_name #ty_generics #where_clause + #name #ty_generics, + #request_message_type> for #struct_name #ty_generics #where_clause { fn from_builder( builder: &mut ActorBuilder< @@ -350,8 +352,8 @@ pub fn actor_inputs_impl(args: TokenStream, inbound: TokenStream) -> TokenStream #state_type, #output_type, #request_type, - #name - #ty_generics + #name #ty_generics, + #request_message_type >, actor_name: &str) -> Self { @@ -370,10 +372,11 @@ pub fn actor_inputs_impl(args: TokenStream, inbound: TokenStream) -> TokenStream struct ActorInbound { struct_name: Ident, - prop_type: Ident, - state_type: Ident, - output_type: Ident, - request_type: Ident, + prop_type: Type, + state_type: Type, + output_type: Type, + request_type: Type, + request_message_type: Type, } impl Parse for ActorInbound { @@ -383,38 +386,51 @@ impl Parse for ActorInbound { let _: Token![,] = inbound.parse()?; let content; syn::braced!(content in inbound); - let prop_type: Ident = content.parse()?; + let prop_type: Type = content.parse()?; let _: Token![,] = content.parse()?; - let state_type: Ident = content.parse()?; + let state_type: Type = content.parse()?; let _: Token![,] = content.parse()?; - let output_type: Ident = content.parse()?; + let output_type: Type = content.parse()?; let _: Token![,] = content.parse()?; - let request_type: Ident = content.parse()?; + let request_type: Type = content.parse()?; + let _: Token![,] = content.parse()?; + let request_message_type: Type = content.parse()?; Ok(ActorInbound { struct_name, prop_type, state_type, output_type, request_type, + request_message_type, }) } } struct ActorArgs { - message_type: Ident, + request_message_type: Type, + message_type: Type, } impl Parse for ActorArgs { fn parse(inbound_hub: ParseStream) -> Result { - let message_type: Ident = inbound_hub.parse()?; - Ok(ActorArgs { message_type }) + let message_type: Type = inbound_hub.parse()?; + let _: Token![,] = inbound_hub.parse()?; + let request_message_type: Type = inbound_hub.parse()?; + + Ok(ActorArgs { + message_type, + request_message_type, + }) } } /// Documentation is in the hollywood crate. pub fn actor_impl(attr: TokenStream, item: TokenStream) -> TokenStream { // parse inbound - let ActorArgs { message_type } = match parse2::(attr) { + let ActorArgs { + message_type, + request_message_type, + } = match parse2::(attr) { Ok(args) => args, Err(err) => return err.to_compile_error(), }; @@ -439,6 +455,7 @@ pub fn actor_impl(attr: TokenStream, item: TokenStream) -> TokenStream { let mut maybe_prop = None; let mut maybe_inbounds = None; + let mut maybe_in_request = None; let mut maybe_state = None; let mut maybe_outputs = None; let mut maybe_requests = None; @@ -452,12 +469,12 @@ pub fn actor_impl(attr: TokenStream, item: TokenStream) -> TokenStream { } for segment in type_path.path.segments { if let PathArguments::AngleBracketed(angle_bracketed_args) = segment.arguments { - if angle_bracketed_args.args.len() != 5 { + if angle_bracketed_args.args.len() != 6 { return Error::new_spanned( &angle_bracketed_args, concat!( - "Expected 5 type arguments:", - "Actor" + "Expected 6 type arguments:", + "Actor" ), ) .to_compile_error() @@ -465,9 +482,10 @@ pub fn actor_impl(attr: TokenStream, item: TokenStream) -> TokenStream { } maybe_prop = Some(angle_bracketed_args.args[0].clone()); maybe_inbounds = Some(angle_bracketed_args.args[1].clone()); - maybe_state = Some(angle_bracketed_args.args[2].clone()); - maybe_outputs = Some(angle_bracketed_args.args[3].clone()); - maybe_requests = Some(angle_bracketed_args.args[4].clone()); + maybe_in_request = Some(angle_bracketed_args.args[2].clone()); + maybe_state = Some(angle_bracketed_args.args[3].clone()); + maybe_outputs = Some(angle_bracketed_args.args[4].clone()); + maybe_requests = Some(angle_bracketed_args.args[5].clone()); } } } else { @@ -481,20 +499,33 @@ pub fn actor_impl(attr: TokenStream, item: TokenStream) -> TokenStream { let prop = maybe_prop.unwrap(); let inbound = maybe_inbounds.unwrap(); + let in_request = maybe_in_request.unwrap(); let state_type = maybe_state.unwrap(); let out = maybe_outputs.unwrap(); - let requests = maybe_requests.unwrap(); + let out_requests = maybe_requests.unwrap(); - let runner_type = quote! { DefaultRunner<#prop, #inbound, #state_type, #out, #requests> }; + let runner_type = quote! { DefaultRunner<#prop, #inbound, + #in_request, + #state_type, #out, #out_requests> }; let gen = quote! { /// #( #attrs )* - pub type #actor_name = Actor<#prop, #inbound, #state_type, #out, #requests>; + pub type #actor_name = Actor<#prop, #inbound, + #in_request, + #state_type, #out, #out_requests>; impl HasFromPropState< - #prop, #inbound, #state_type, #out, #message_type, #requests, #runner_type + #prop, + #inbound, + #in_request, + #state_type, + #out, + #message_type, + #request_message_type, + #out_requests, + #runner_type > for #actor_name { fn name_hint(prop: &#prop) -> String { diff --git a/hollywood_macros/src/lib.rs b/hollywood_macros/src/lib.rs index d0f669e..06e707c 100644 --- a/hollywood_macros/src/lib.rs +++ b/hollywood_macros/src/lib.rs @@ -10,7 +10,30 @@ extern crate proc_macro; use proc_macro::TokenStream; use quote::quote; -/// Documented in the root-level hollywood crate. +/// This macro generates the boilerplate for the outbound hub struct it is applied to. +/// +/// Macro template: +/// +/// ``` text +/// #[actor_outputs] +/// pub struct OUTBOUND { +/// pub CHANNEL0: OutboundChannel, +/// pub CHANNEL1: OutboundChannel, +/// ... +/// } +/// ``` +/// +/// Here, OUTBOUND is the user-specified name of the struct. The struct shall be defined right +/// after the macro invocation. (Indeed, these types of macros are called "attribute macros". +/// They are applied to the item directly following them, in this case a struct.) The outbound +/// struct consists of a zero, one or more outbound channels. Each outbound channel has a +/// user-specified name CHANNEL* and a user specified type TYPE*. +/// +/// Effect: The macro generates the [IsOutboundHub](crate::IsOutboundHub) and +/// [HasActivate](crate::HasActivate) implementations for the provided struct OUTBOUND. +/// +/// This is the first of three macros to define an actor. The other two are [macro@actor_inputs] +/// and [macro@actor]. #[proc_macro_attribute] pub fn actor_outputs(attr: TokenStream, item: TokenStream) -> TokenStream { core::actor_outputs_impl( @@ -20,9 +43,28 @@ pub fn actor_outputs(attr: TokenStream, item: TokenStream) -> TokenStream { .into() } -/// Documented in the root-level hollywood crate. +/// This macro generates the boilerplate for the request hub struct it is applied to. +/// +/// Macro template: +/// +/// ``` text +/// #[actor_out_requests] +/// pub struct REQUEST { +/// pub CHANNEL0: OutRequestChannel, +/// pub CHANNEL1: OutRequestChannel, +/// ... +/// } +/// ``` +/// +/// Here, REQUEST is the user-specified name of the struct. The struct shall be defined right +/// after the macro invocation. The request struct consists of one or more request channels. +/// Each request channel has name CHANNEL*, a request type REQ_TYPE*, a reply type REPL_TYPE*, +/// and a message type M*. +/// +/// Effect: The macro generates the [IsRequestHub](crate::IsRequestHub) and +/// [HasActivate](crate::HasActivate) implementations for the provided struct REQUEST. #[proc_macro_attribute] -pub fn actor_requests(attr: TokenStream, item: TokenStream) -> TokenStream { +pub fn actor_out_requests(attr: TokenStream, item: TokenStream) -> TokenStream { core::actor_requests_impl( proc_macro2::TokenStream::from(attr), proc_macro2::TokenStream::from(item), @@ -30,7 +72,46 @@ pub fn actor_requests(attr: TokenStream, item: TokenStream) -> TokenStream { .into() } -/// Documented in the root-level hollywood crate. +/// This macro generates the boilerplate for the inbound hub of an actor. +/// +/// Macro template: +/// +/// ``` text +/// #[derive(Clone, Debug)] +/// #[actor_inputs( +/// INBOUND, +/// { +/// PROP, +/// STATE, +/// OUTBOUND, +/// OUT_REQUESTS, +/// IN_REQUEST_MESSAGE, +/// })] +/// pub enum INBOUND_MESSAGE { +/// VARIANT0(TYPE0), +/// VARIANT1(TYPE1), +/// ... +/// } +/// ``` +/// +/// INBOUND_MESSAGE is the user-specified name of an enum which shall be defined right below the +/// macro invocation. The enum shall consist of a zero, one or more message variants. Each +/// variant has a user-specified name VARIANT* and type TYPE*. +/// +/// Prerequisite: +/// - The OUTBOUND struct is defined and implements [IsOutboundHub](crate::IsOutboundHub) +/// and [HasActivate](crate::HasActivate), typically using the [macro@actor_outputs] macro. +/// - The OUT_REQUESTS struct is defined and implements [IsRequestHub](crate::IsRequestHub) and +/// [HasActivate](crate::HasActivate), e.g. using the [actor_out_requests] macro. +/// - The IN_REQUEST_MESSAGE struct is defined and implements +/// [IsInRequestMessage](crate::IsInRequestMessage). +/// - The PROP and STATE structs are defined. +/// +/// Effects: +/// - The macro defines the struct INBOUND that contains an inbound channel field for each +/// variant of the INBOUND_MESSAGE enum, and implements the +/// [IsInboundHub](crate::IsInboundHub) trait for it. +/// - Implements the [IsInboundMessage](crate::IsInboundMessage) trait for INBOUND_MESSAGE. #[proc_macro_attribute] pub fn actor_inputs(args: TokenStream, inbound: TokenStream) -> TokenStream { core::actor_inputs_impl( @@ -40,7 +121,33 @@ pub fn actor_inputs(args: TokenStream, inbound: TokenStream) -> TokenStream { .into() } -/// Documented in the root-level hollywood crate. +/// This macro generates the boilerplate to define an new actor type. +/// +/// Macro template: +/// +/// ``` text +/// #[actor(INBOUND_MESSAGE, IN_REQUEST_MESSAGE)] +/// type ACTOR = Actor; +/// ``` +/// +/// Here, ACTOR is the user-specified name of the actor type. The actor type shall be defined +/// right after the macro invocation as an alias of [Actor](crate::Actor). +/// +/// Prerequisites: +/// - The OUTBOUND struct is defined and implements [IsOutboundHub](crate::IsOutboundHub) and +/// [HasActivate](crate::HasActivate), e.g. using the [actor_outputs] macro. +/// - The OUT_REQUEST struct is defined and implements [IsOutRequestHub](crate::IsOutRequestHub) +/// and [HasActivate](crate::HasActivate), e.g. using the [actor_out_requests] macro. +/// - The INBOUND_MESSAGE enum is defined and implements +/// [IsInboundMessage](crate::IsInboundMessage), as well as the INBOUND struct is defined +/// and implements the [IsInboundHub](crate::IsInboundHub) trait, e.g through the +/// [actor_inputs] macro. +/// - The PROP and STATE structs are defined. +/// +/// Effect: +/// - This macro implements the [HasFromPropState](crate::HasFromPropState) trait for the ACTOR +/// type. +/// #[proc_macro_attribute] pub fn actor(attr: TokenStream, item: TokenStream) -> TokenStream { core::actor_impl( diff --git a/src/actors/egui.rs b/src/actors/egui.rs index ad802b3..9dec159 100644 --- a/src/actors/egui.rs +++ b/src/actors/egui.rs @@ -1,17 +1,49 @@ -use crate::core::request::IsRequestMessage; use crate::prelude::*; use crate::CancelRequest; -use crate::RequestMessage; +use crate::OutRequestChannel; +use crate::ReplyMessage; +use crate::RequestWithReplyChannel; use std::fmt::Debug; +use std::marker::PhantomData; +use std::sync::Arc; +use std::sync::Mutex; /// The inbound message for the egui actor. -#[derive(Clone, Debug, Default)] -pub struct EguiState -{ +#[derive(Debug)] +pub struct EguiState< + T: Default + Debug + Clone + Send + Sync + 'static, + InReqMsg: IsRequestWithReplyChannel, + OutRequest, + OutReply, +> { /// Forwards messages to the egui app. - pub forward_message: Option>>, + pub forward_message_to_egui_app: Option>>, /// Forwards an incoming request to the egui app. - pub forward_request: Option>, + pub forward_in_request_to_egui_app: Option>, + /// Forwards an outbound reply to the egui app. + pub forward_out_reply_to_egui_app: + Option>>, + + /// Forwards an outbound request from the egui app. + pub forward_out_request_from_egui_app: + Option>>>, +} + +impl< + T: Default + Debug + Clone + Send + Sync + 'static, + InReqMsg: IsRequestWithReplyChannel, + OutRequest, + OutReply, + > Clone for EguiState +{ + fn clone(&self) -> Self { + Self { + forward_message_to_egui_app: self.forward_message_to_egui_app.clone(), + forward_in_request_to_egui_app: self.forward_in_request_to_egui_app.clone(), + forward_out_reply_to_egui_app: self.forward_out_reply_to_egui_app.clone(), + forward_out_request_from_egui_app: self.forward_out_request_from_egui_app.clone(), + } + } } /// The inbound message stream. @@ -22,93 +54,175 @@ pub struct Stream { } /// The inbound message for the egui actor. -#[derive(Clone, Debug)] +#[derive(Debug)] pub enum EguiInboundMessage< T: Default + Debug + Clone + Send + Sync + 'static, - Request: Default + Debug + Clone + Send + Sync + 'static, - Reply: Default + Debug + Clone + Send + Sync + 'static, + InRequest: Debug + Send + Clone + Sync + 'static, + InReply: Debug + Send + Clone + Sync + 'static, + OutRequest: Debug + Send + Clone + Sync + 'static, + OutReply: Debug + Send + Clone + Sync + 'static, > { /// A egui message of generic type T. Stream(Stream), /// A generic request message. - Request(RequestMessage), + Dummy(PhantomData<(InRequest, InReply, OutRequest)>), + /// A generic request message. + OutReply(ReplyMessage), +} + +/// The inbound message for the egui actor. +#[derive(Debug)] +pub enum EguiInRequestMessage< + T: Default + Debug + Clone + Send + Sync + 'static, + InRequest: Debug + Send + Clone + Sync + 'static, + InReply: Debug + Send + Clone + Sync + 'static, + OutRequest: Debug + Send + Clone + Sync + 'static, + OutReply: Debug + Send + Clone + Sync + 'static, +> { + /// A generic request message. + InRequest(RequestWithReplyChannel), + /// A dummy message. + Dummy(PhantomData<(T, OutRequest, OutReply)>), } impl< T: Default + Debug + Clone + Send + Sync + 'static, - Request: Default + Debug + Clone + Send + Sync + 'static, - Reply: Default + Debug + Clone + Send + Sync + 'static, - > IsInboundMessageNew> for EguiInboundMessage + InRequest: Debug + Send + Clone + Sync + 'static, + InReply: Debug + Send + Clone + Sync + 'static, + OutRequest: Debug + Send + Clone + Sync + 'static, + OutReply: Debug + Send + Clone + Sync + 'static, + > Clone for EguiInboundMessage { - fn new(_inbound_name: String, p: Stream) -> Self { - EguiInboundMessage::::Stream(p) + fn clone(&self) -> Self { + match self { + EguiInboundMessage::Stream(msg) => EguiInboundMessage::Stream(msg.clone()), + EguiInboundMessage::Dummy(_) => EguiInboundMessage::Dummy(PhantomData), + EguiInboundMessage::OutReply(reply) => EguiInboundMessage::OutReply(reply.clone()), + } } } impl< T: Default + Debug + Clone + Send + Sync + 'static, - Request: Default + Debug + Clone + Send + Sync + 'static, - Reply: Default + Debug + Clone + Send + Sync + 'static, - > IsInboundMessageNew> - for EguiInboundMessage + InRequest: Debug + Send + Clone + Sync + 'static, + InReply: Debug + Send + Clone + Sync + 'static, + OutRequest: Debug + Send + Clone + Sync + 'static, + OutReply: Debug + Send + Clone + Sync + 'static, + > IsInboundMessage for EguiInboundMessage { - fn new(_inbound_name: String, p: RequestMessage) -> Self { - EguiInboundMessage::::Request(p) + type Prop = NullProp; + + type State = EguiState, OutRequest, OutReply>; + + type OutboundHub = NullOutbound; + + type OutRequestHub = EguiOutRequest; + + fn inbound_channel(&self) -> String { + "stream".to_owned() } } impl< T: Default + Debug + Clone + Send + Sync + 'static, - Request: Default + Debug + Clone + Send + Sync + 'static, - Reply: Default + Debug + Clone + Send + Sync + 'static, - > IsInboundMessage for EguiInboundMessage + InRequest: Debug + Send + Clone + Sync + 'static, + InReply: Debug + Send + Clone + Sync + 'static, + OutRequest: Debug + Send + Clone + Sync + 'static, + OutReply: Debug + Send + Clone + Sync + 'static, + > IsInRequestMessage for EguiInRequestMessage { type Prop = NullProp; - type State = EguiState>; + type State = EguiState, OutRequest, OutReply>; type OutboundHub = NullOutbound; - type RequestHub = NullRequest; + type OutRequestHub = EguiOutRequest; - fn inbound_channel(&self) -> String { - "stream".to_owned() + fn in_request_channel(&self) -> String { + "in_request".to_owned() + } +} + +impl< + T: Default + Debug + Clone + Send + Sync + 'static, + InRequest: Debug + Send + Clone + Sync + 'static, + InReply: Debug + Send + Clone + Sync + 'static, + OutRequest: Debug + Send + Clone + Sync + 'static, + OutReply: Debug + Send + Clone + Sync + 'static, + > IsInboundMessageNew> + for EguiInboundMessage +{ + fn new(_inbound_name: String, p: Stream) -> Self { + EguiInboundMessage::::Stream(p) + } +} + +impl< + T: Default + Debug + Clone + Send + Sync + 'static, + InRequest: Debug + Send + Clone + Sync + 'static, + InReply: Debug + Send + Clone + Sync + 'static, + OutRequest: Debug + Send + Clone + Sync + 'static, + OutReply: Debug + Send + Clone + Sync + 'static, + > IsInboundMessageNew> + for EguiInboundMessage +{ + fn new(_inbound_name: String, p: ReplyMessage) -> Self { + EguiInboundMessage::::OutReply(p) + } +} + +impl< + T: Default + Debug + Clone + Send + Sync + 'static, + InRequest: Debug + Send + Clone + Sync + 'static, + InReply: Debug + Send + Clone + Sync + 'static, + OutRequest: Debug + Send + Clone + Sync + 'static, + OutReply: Debug + Send + Clone + Sync + 'static, + > IsInRequestMessageNew> + for EguiInRequestMessage +{ + fn new(_inbound_name: String, p: RequestWithReplyChannel) -> Self { + EguiInRequestMessage::::InRequest(p) } } /// The inbound hub for the egui actor. pub struct ViewerInbound< T: Default + Debug + Clone + Send + Sync + 'static, - Request: Default + Debug + Clone + Send + Sync + 'static, - Reply: Default + Debug + Clone + Send + Sync + 'static, + InRequest: Debug + Send + Clone + Sync + 'static, + InReply: Debug + Send + Clone + Sync + 'static, + OutRequest: Debug + Send + Clone + Sync + 'static, + OutReply: Debug + Send + Clone + Sync + 'static, > { /// The message stream inbound channel - pub stream: InboundChannel, EguiInboundMessage>, - /// The request inbound channel - pub request: - InboundChannel, EguiInboundMessage>, + pub stream: + InboundChannel, EguiInboundMessage>, } impl< T: Default + Debug + Clone + Send + Sync + 'static, - Request: Default + Debug + Clone + Send + Sync + 'static, - Reply: Default + Debug + Clone + Send + Sync + 'static, + InRequest: Debug + Send + Clone + Sync + 'static, + InReply: Debug + Send + Clone + Sync + 'static, + OutRequest: Debug + Send + Clone + Sync + 'static, + OutReply: Debug + Send + Clone + Sync + 'static, > IsInboundHub< NullProp, - EguiState>, + EguiState, OutRequest, OutReply>, NullOutbound, - NullRequest, - EguiInboundMessage, - > for ViewerInbound + EguiOutRequest, + EguiInboundMessage, + EguiInRequestMessage, + > for ViewerInbound { fn from_builder( builder: &mut ActorBuilder< NullProp, - EguiState>, + EguiState, OutRequest, OutReply>, NullOutbound, - NullRequest, - EguiInboundMessage, + EguiOutRequest, + EguiInboundMessage, + EguiInRequestMessage, >, actor_name: &str, ) -> Self { @@ -122,25 +236,74 @@ impl< .forward .insert(stream.name.clone(), Box::new(stream.clone())); - let request = InboundChannel::new( + Self { stream } + } +} + +/// The inbound hub for the egui actor. +pub struct ViewerInRequest< + T: Default + Debug + Clone + Send + Sync + 'static, + InRequest: Debug + Send + Clone + Sync + 'static, + InReply: Debug + Send + Clone + Sync + 'static, + OutRequest: Debug + Send + Clone + Sync + 'static, + OutReply: Debug + Send + Clone + Sync + 'static, +> { + /// The request inbound channel + #[allow(clippy::type_complexity)] + pub request: InRequestChannel< + RequestWithReplyChannel, + EguiInRequestMessage, + >, +} + +impl< + T: Default + Debug + Clone + Send + Sync + 'static, + InRequest: Debug + Send + Clone + Sync + 'static, + InReply: Debug + Send + Clone + Sync + 'static, + OutRequest: Debug + Send + Clone + Sync + 'static, + OutReply: Debug + Send + Clone + Sync + 'static, + > + IsInRequestHub< + NullProp, + EguiState, OutRequest, OutReply>, + NullOutbound, + EguiOutRequest, + EguiInboundMessage, + EguiInRequestMessage, + > for ViewerInRequest +{ + fn from_builder( + builder: &mut ActorBuilder< + NullProp, + EguiState, OutRequest, OutReply>, + NullOutbound, + EguiOutRequest, + EguiInboundMessage, + EguiInRequestMessage, + >, + actor_name: &str, + ) -> Self { + let request = InRequestChannel::new( builder.context, actor_name, - &builder.sender, - "in_rquest".to_owned(), + &builder.request_sender, + "in_request".to_owned(), ); builder - .forward + .forward_request .insert(request.name.clone(), Box::new(request.clone())); - Self { stream, request } + Self { request } } } impl< T: Default + Debug + Clone + Send + Sync + 'static, - Request: Default + Debug + Clone + Send + Sync + 'static, - Reply: Default + Debug + Clone + Send + Sync + 'static, - > HasOnMessage for EguiInboundMessage + InRequest: Debug + Send + Clone + Sync + 'static, + InReply: Debug + Send + Clone + Sync + 'static, + OutRequest: Debug + Send + Clone + Sync + 'static, + OutReply: Debug + Send + Clone + Sync + 'static, + > HasOnMessage for EguiInboundMessage { /// Forward the message to the egui app. fn on_message( @@ -148,19 +311,129 @@ impl< _prop: &Self::Prop, state: &mut Self::State, _outbound: &Self::OutboundHub, - _request: &Self::RequestHub, + request: &Self::OutRequestHub, ) { + { + let mut recv = state + .forward_out_request_from_egui_app + .as_ref() + .unwrap() + .lock() + .unwrap(); + let x = recv.try_recv(); + if let Ok(r) = x { + request.request.send_request(r); + } + } + match &self { EguiInboundMessage::Stream(new_value) => { - if let Some(sender) = &state.forward_message { + if let Some(sender) = &state.forward_message_to_egui_app { sender.send(new_value.clone()).unwrap(); } } - EguiInboundMessage::Request(request) => { - if let Some(sender) = &state.forward_request { - sender.send(request.clone()).unwrap(); + EguiInboundMessage::Dummy(_) => {} + EguiInboundMessage::OutReply(reply) => { + if let Some(sender) = &state.forward_out_reply_to_egui_app { + sender.send(reply.clone()).unwrap(); + } + } + } + } +} + +impl< + T: Default + Debug + Clone + Send + Sync + 'static, + InRequest: Debug + Send + Clone + Sync + 'static, + InReply: Debug + Send + Clone + Sync + 'static, + OutRequest: Debug + Send + Clone + Sync + 'static, + OutReply: Debug + Send + Clone + Sync + 'static, + > HasOnRequestMessage for EguiInRequestMessage +{ + /// Forward the message to the egui app. + fn on_message( + self, + _prop: &Self::Prop, + state: &mut Self::State, + _outbound: &Self::OutboundHub, + request: &Self::OutRequestHub, + ) { + { + let mut recv = state + .forward_out_request_from_egui_app + .as_ref() + .unwrap() + .lock() + .unwrap(); + let x = recv.try_recv(); + if let Ok(r) = x { + request.request.send_request(r); + } + } + match self { + EguiInRequestMessage::InRequest(request) => { + println!("EguiInRequestMessage::InRequest"); + if let Some(sender) = &state.forward_in_request_to_egui_app { + sender.send(request).unwrap(); } } + EguiInRequestMessage::Dummy(_) => {} + } + } +} + +/// OutboundChannel channels for the filter actor. +pub struct EguiOutRequest< + T: Default + Debug + Clone + Send + Sync + 'static, + InRequest: Debug + Send + Clone + Sync + 'static, + InReply: Debug + Send + Clone + Sync + 'static, + OutRequest: Debug + Send + Clone + Sync + 'static, + OutReply: Debug + Send + Clone + Sync + 'static, +> { + /// The outbound request channel + pub request: OutRequestChannel< + OutRequest, + OutReply, + EguiInboundMessage, + >, +} + +impl< + T: Default + Debug + Clone + Send + Sync + 'static, + InRequest: Debug + Send + Clone + Sync + 'static, + InReply: Debug + Send + Clone + Sync + 'static, + OutRequest: Debug + Send + Clone + Sync + 'static, + OutReply: Debug + Send + Clone + Sync + 'static, + > HasActivate for EguiOutRequest +{ + fn extract(&mut self) -> Self { + Self { + request: self.request.extract(), + } + } + + fn activate(&mut self) { + self.request.activate() + } +} + +impl< + T: Default + Debug + Clone + Send + Sync + 'static, + InRequest: Debug + Send + Clone + Sync + 'static, + InReply: Debug + Send + Clone + Sync + 'static, + OutRequest: Debug + Send + Clone + Sync + 'static, + OutReply: Debug + Send + Clone + Sync + 'static, + > IsOutRequestHub> + for EguiOutRequest +{ + fn from_parent_and_sender( + actor_name: &str, + sender: &tokio::sync::mpsc::UnboundedSender< + EguiInboundMessage, + >, + ) -> Self { + Self { + request: OutRequestChannel::new("request".to_owned(), actor_name, sender), } } } @@ -168,34 +441,40 @@ impl< /// The egui actor. /// /// This is a generic proxy which receives messages and forwards them to the egui app. -pub type EguiActor = Actor< +pub type EguiActor = Actor< NullProp, - ViewerInbound, - EguiState>, + ViewerInbound, + ViewerInRequest, + EguiState, OutRequest, OutReply>, NullOutbound, - NullRequest, + EguiOutRequest, >; impl< T: Default + Debug + Clone + Send + Sync + 'static, - Request: Default + Debug + Clone + Send + Sync + 'static, - Reply: Default + Debug + Clone + Send + Sync + 'static, + InRequest: Debug + Send + Clone + Sync + 'static, + InReply: Debug + Send + Clone + Sync + 'static, + OutRequest: Debug + Send + Clone + Sync + 'static, + OutReply: Debug + Send + Clone + Sync + 'static, > HasFromPropState< NullProp, - ViewerInbound, - EguiState>, + ViewerInbound, + ViewerInRequest, + EguiState, OutRequest, OutReply>, NullOutbound, - EguiInboundMessage, - NullRequest, + EguiInboundMessage, + EguiInRequestMessage, + EguiOutRequest, DefaultRunner< NullProp, - ViewerInbound, - EguiState>, + ViewerInbound, + ViewerInRequest, + EguiState, OutRequest, OutReply>, NullOutbound, - NullRequest, + EguiOutRequest, >, - > for EguiActor + > for EguiActor { fn name_hint(_prop: &NullProp) -> String { "Egui".to_owned() @@ -204,21 +483,27 @@ impl< impl< T: Default + Debug + Clone + Send + Sync + 'static, - Request: Default + Debug + Clone + Send + Sync + 'static, - Reply: Default + Debug + Clone + Send + Sync + 'static, - > EguiActor + InRequest: Default + Debug + Clone + Send + Sync + 'static, + InReply: Default + Debug + Clone + Send + Sync + 'static, + OutRequest: Debug + Send + Clone + Sync + 'static, + OutReply: Debug + Send + Clone + Sync + 'static, + > EguiActor { /// Create a new egui actor from the builder. - pub fn from_builder>>( + pub fn from_builder< + Builder: EguiActorBuilder, OutRequest, OutReply>, + >( context: &mut Hollywood, builder: &Builder, ) -> Self { Self::from_prop_and_state( context, NullProp {}, - EguiState::> { - forward_message: Some(builder.message_sender()), - forward_request: Some(builder.request_sender()), + EguiState::, OutRequest, OutReply> { + forward_message_to_egui_app: Some(builder.message_to_egui_app_sender()), + forward_in_request_to_egui_app: Some(builder.in_request_to_egui_app_sender()), + forward_out_reply_to_egui_app: Some(builder.out_reply_to_egui_app_sender()), + forward_out_request_from_egui_app: Some(builder.out_request_from_egui_app_recv()), }, ) } @@ -227,64 +512,122 @@ impl< /// The egui actor builder. pub trait EguiActorBuilder< T: Default + Debug + Clone + Send + Sync + 'static, - InReqMsg: IsRequestMessage, + InReqMsg: IsRequestWithReplyChannel, + OutRequest, + OutReply, > { /// Returns message sender. - fn message_sender(&self) -> std::sync::mpsc::Sender>; + fn message_to_egui_app_sender(&self) -> tokio::sync::mpsc::UnboundedSender>; /// Returns in request sender. - fn request_sender(&self) -> std::sync::mpsc::Sender; + fn in_request_to_egui_app_sender(&self) -> tokio::sync::mpsc::UnboundedSender; + /// Returns out reply sender. + fn out_reply_to_egui_app_sender( + &self, + ) -> tokio::sync::mpsc::UnboundedSender>; + /// Returns out request receiver. + fn out_request_from_egui_app_recv( + &self, + ) -> Arc>>; } /// A generic builder for the egui actor and app. pub struct GenericEguiBuilder< T: Default + Debug + Clone + Send + Sync + 'static, - InReqMsg: IsRequestMessage, + InReqMsg: IsRequestWithReplyChannel, + OutRequest, + OutReply, Config, > { - /// The message sender for the egui actor to forward messages to the egui app. - pub message_sender: std::sync::mpsc::Sender>, - /// The message recv for the egui app. - pub message_recv: std::sync::mpsc::Receiver>, - /// To forward incoming requests to the egui app. - pub request_sender: std::sync::mpsc::Sender, - /// The receiver for incoming requests. - pub request_recv: std::sync::mpsc::Receiver, + /// To forward messages from actor to the egui app. + pub message_to_egui_app_sender: tokio::sync::mpsc::UnboundedSender>, + /// To receive messages from the actor. + pub message_from_actor_recv: tokio::sync::mpsc::UnboundedReceiver>, + + /// To forward incoming requests to the egui app. + pub in_request_to_egui_app_sender: tokio::sync::mpsc::UnboundedSender, + /// To receive incoming requests from actor. + pub in_request_from_actor_recv: tokio::sync::mpsc::UnboundedReceiver, + + /// To forward outgoing requests from actor to the egui app. + pub out_reply_to_egui_app_sender: tokio::sync::mpsc::UnboundedSender>, + /// To receive outgoing requests from the actor. + pub out_reply_from_actor_recv: tokio::sync::mpsc::UnboundedReceiver>, + + /// To forward outgoing requests from the egui app to the actor. + pub out_request_to_actor_sender: tokio::sync::mpsc::UnboundedSender, + /// To receive outgoing requests from the egui app. + pub out_request_from_egui_app_recv: + Arc>>, + /// Pipeline cancel request sender - pub cancel_request_sender: Option>, + pub cancel_request_sender: Option>, /// The config for the egui app. pub config: Config, } -impl - GenericEguiBuilder +impl< + T: Default + Debug + Clone + Send + Sync + 'static, + InReqMsg: IsRequestWithReplyChannel, + OutRequest, + OutReply, + Config, + > GenericEguiBuilder { /// Create a new viewer builder. pub fn from_config(config: Config) -> Self { - let (sender, recv) = std::sync::mpsc::channel(); - let (request_sender, request_recv) = std::sync::mpsc::channel(); + let (message_to_egui_app_sender, message_from_actor_recv) = + tokio::sync::mpsc::unbounded_channel(); + let (in_request_to_egui_app_sender, in_request_from_actor_recv) = + tokio::sync::mpsc::unbounded_channel(); + let (out_request_to_actor_sender, out_request_from_egui_app_recv) = + tokio::sync::mpsc::unbounded_channel(); + let (out_reply_to_egui_app_sender, out_reply_from_actor_recv) = + tokio::sync::mpsc::unbounded_channel(); Self { - message_sender: sender, - message_recv: recv, - request_sender, - request_recv, + message_to_egui_app_sender, + message_from_actor_recv, + in_request_to_egui_app_sender, + in_request_from_actor_recv, + out_request_to_actor_sender, + out_request_from_egui_app_recv: Arc::new(Mutex::new(out_request_from_egui_app_recv)), + out_reply_to_egui_app_sender, + out_reply_from_actor_recv, cancel_request_sender: None, config, } } } -impl - EguiActorBuilder for GenericEguiBuilder +impl< + T: Default + Debug + Clone + Send + Sync + 'static, + InReqMsg: IsRequestWithReplyChannel, + OutRequest, + OutReply, + Config, + > EguiActorBuilder + for GenericEguiBuilder { - fn message_sender(&self) -> std::sync::mpsc::Sender> { - self.message_sender.clone() + fn message_to_egui_app_sender(&self) -> tokio::sync::mpsc::UnboundedSender> { + self.message_to_egui_app_sender.clone() + } + + fn in_request_to_egui_app_sender(&self) -> tokio::sync::mpsc::UnboundedSender { + self.in_request_to_egui_app_sender.clone() + } + + fn out_reply_to_egui_app_sender( + &self, + ) -> tokio::sync::mpsc::UnboundedSender> { + self.out_reply_to_egui_app_sender.clone() } - fn request_sender(&self) -> std::sync::mpsc::Sender { - self.request_sender.clone() + fn out_request_from_egui_app_recv( + &self, + ) -> Arc>> { + self.out_request_from_egui_app_recv.clone() } } diff --git a/src/actors/nudge.rs b/src/actors/nudge.rs index 5a4087a..48f9d49 100644 --- a/src/actors/nudge.rs +++ b/src/actors/nudge.rs @@ -18,9 +18,10 @@ pub struct NudgeProp { pub type Nudge = GenericActor< NudgeProp, NullInbound, + NullInRequests, NullState, NudgeOutbound, - NullRequest, + NullOutRequests, NudgeRunner, >; @@ -35,10 +36,12 @@ impl HasFromPropState< NudgeProp, NullInbound, + NullInRequests, NullState, NudgeOutbound, - NullMessage, NullState, NudgeOutbound, NullRequest>, - NullRequest, + NullMessage, + NullInRequestMessage, + NullOutRequests, NudgeRunner, > for Nudge { @@ -61,10 +64,12 @@ impl Runner< NudgeProp, NullInbound, + NullInRequests, NullState, NudgeOutbound, - NullRequest, - NullMessage, NullState, NudgeOutbound, NullRequest>, + NullOutRequests, + NullMessage, + NullInRequestMessage, > for NudgeRunner { /// Create a new actor node. @@ -72,31 +77,47 @@ impl name: String, prop: NudgeProp, state: NullState, - _receiver: tokio::sync::mpsc::Receiver< - NullMessage, NullState, NudgeOutbound, NullRequest>, - >, - _forward: std::collections::HashMap< - String, - Box< - dyn HasForwardMessage< - NudgeProp, - NullState, - NudgeOutbound, - NullRequest, - NullMessage, NullState, NudgeOutbound, NullRequest>, - > + Send - + Sync, + forward_receiver_outbound: ( + std::collections::HashMap< + String, + Box< + dyn HasForwardMessage< + NudgeProp, + NullState, + NudgeOutbound, + NullOutRequests, + NullMessage, + > + Send + + Sync, + >, >, - >, - outbound: NudgeOutbound, - _request: NullRequest, + tokio::sync::mpsc::UnboundedReceiver, + NudgeOutbound, + ), + _forward_receiver_request: ( + std::collections::HashMap< + String, + Box< + dyn HasForwardRequestMessage< + NudgeProp, + NullState, + NudgeOutbound, + NullOutRequests, + NullInRequestMessage, + > + Send + + Sync, + >, + >, + tokio::sync::mpsc::UnboundedReceiver, + NullOutRequests, + ), ) -> Box { Box::new(NudgeActor:: { name: name.clone(), prop, init_state: state.clone(), state: None, - outbound: Some(outbound), + outbound: Some(forward_receiver_outbound.2), }) } } diff --git a/src/actors/periodic.rs b/src/actors/periodic.rs index e896299..7706b85 100644 --- a/src/actors/periodic.rs +++ b/src/actors/periodic.rs @@ -11,9 +11,10 @@ use std::sync::Arc; pub type Periodic = GenericActor< PeriodicProp, NullInbound, + NullInRequests, PeriodicState, PeriodicOutbound, - NullRequest, + NullOutRequests, PeriodicRunner, >; @@ -31,6 +32,8 @@ impl Periodic { time_elapsed: 0.0, }, ) + + // todo!() } } @@ -38,10 +41,12 @@ impl HasFromPropState< PeriodicProp, NullInbound, + NullInRequests, PeriodicState, PeriodicOutbound, - NullMessage, - NullRequest, + NullMessage, + NullInRequestMessage, + NullOutRequests, PeriodicRunner, > for Periodic { @@ -116,10 +121,12 @@ impl Runner< PeriodicProp, NullInbound, + NullInRequests, PeriodicState, PeriodicOutbound, - NullRequest, - NullMessage, + NullOutRequests, + NullMessage, + NullInRequestMessage, > for PeriodicRunner { /// Create a new actor node. @@ -127,31 +134,47 @@ impl name: String, prop: PeriodicProp, state: PeriodicState, - _receiver: tokio::sync::mpsc::Receiver< - NullMessage, - >, - _forward: std::collections::HashMap< - String, - Box< - dyn HasForwardMessage< - PeriodicProp, - PeriodicState, - PeriodicOutbound, - NullRequest, - NullMessage, - > + Send - + Sync, + forward_receiver_outbound: ( + std::collections::HashMap< + String, + Box< + dyn HasForwardMessage< + PeriodicProp, + PeriodicState, + PeriodicOutbound, + NullOutRequests, + NullMessage, + > + Send + + Sync, + >, + >, + tokio::sync::mpsc::UnboundedReceiver, + PeriodicOutbound, + ), + _forward_receiver_request: ( + std::collections::HashMap< + String, + Box< + dyn HasForwardRequestMessage< + PeriodicProp, + PeriodicState, + PeriodicOutbound, + NullOutRequests, + NullInRequestMessage, + > + Send + + Sync, + >, >, - >, - outbound: PeriodicOutbound, - _request: NullRequest, + tokio::sync::mpsc::UnboundedReceiver, + NullOutRequests, + ), ) -> Box { Box::new(PeriodicActor { name: name.clone(), prop, init_state: state.clone(), state: None, - outbound: Some(outbound), + outbound: Some(forward_receiver_outbound.2), }) } } diff --git a/src/actors/printer.rs b/src/actors/printer.rs index d6856c7..dac95d2 100644 --- a/src/actors/printer.rs +++ b/src/actors/printer.rs @@ -19,7 +19,15 @@ impl Default for PrinterProp { /// Inbound message for the printer actor. #[derive(Clone, Debug)] -#[actor_inputs(PrinterInbound, {PrinterProp, NullState, NullOutbound, NullRequest})] +#[actor_inputs( + PrinterInbound, + { + PrinterProp, + NullState, + NullOutbound, + NullOutRequests, + NullInRequestMessage + })] pub enum PrinterInboundMessage { /// Printable message. Printable(T), @@ -33,7 +41,7 @@ impl HasOnMessage prop: &PrinterProp, _state: &mut Self::State, _outputs: &Self::OutboundHub, - _request: &Self::RequestHub, + _request: &Self::OutRequestHub, ) { match self { PrinterInboundMessage::Printable(printable) => { @@ -52,17 +60,27 @@ impl IsInboundMess } /// Printer actor. -pub type Printer = Actor, NullState, NullOutbound, NullRequest>; +pub type Printer = + Actor, NullInRequests, NullState, NullOutbound, NullOutRequests>; impl HasFromPropState< PrinterProp, PrinterInbound, + NullInRequests, NullState, NullOutbound, PrinterInboundMessage, - NullRequest, - DefaultRunner, NullState, NullOutbound, NullRequest>, + NullInRequestMessage, + NullOutRequests, + DefaultRunner< + PrinterProp, + PrinterInbound, + NullInRequests, + NullState, + NullOutbound, + NullOutRequests, + >, > for Printer { fn name_hint(prop: &PrinterProp) -> String { diff --git a/src/compute/context.rs b/src/compute/context.rs index 1a52553..8e2c7cc 100644 --- a/src/compute/context.rs +++ b/src/compute/context.rs @@ -12,8 +12,8 @@ use std::sync::Arc; pub struct Hollywood { pub(crate) actors: Vec>, pub(crate) topology: Topology, - pub(crate) cancel_request_sender_template: tokio::sync::mpsc::Sender, - pub(crate) cancel_request_receiver: tokio::sync::mpsc::Receiver, + pub(crate) cancel_request_sender_template: tokio::sync::mpsc::UnboundedSender, + pub(crate) cancel_request_receiver: tokio::sync::mpsc::UnboundedReceiver, } impl Hollywood { @@ -31,7 +31,9 @@ impl Hollywood { /// /// Upon receiving a cancel request the registered outbound channel, the execution of the /// pipeline will be stopped. - pub fn get_cancel_request_sender(&mut self) -> tokio::sync::mpsc::Sender { + pub fn get_cancel_request_sender( + &mut self, + ) -> tokio::sync::mpsc::UnboundedSender { self.cancel_request_sender_template.clone() } @@ -51,7 +53,7 @@ impl Hollywood { fn new() -> Self { let (cancel_request_sender_template, cancel_request_receiver) = - tokio::sync::mpsc::channel(1); + tokio::sync::mpsc::unbounded_channel(); Self { actors: vec![], topology: Topology::new(), diff --git a/src/compute/pipeline.rs b/src/compute/pipeline.rs index 924ed91..390c50f 100644 --- a/src/compute/pipeline.rs +++ b/src/compute/pipeline.rs @@ -19,7 +19,7 @@ impl IsInboundMessage for CancelRequest { type Prop = NullProp; type State = NullState; type OutboundHub = NullOutbound; - type RequestHub = NullOutbound; + type OutRequestHub = NullOutbound; /// This messages is only meant to use for the cancel request inbound channel of the pipeline. /// Hence, the inbound name is the constant [CancelRequest::CANCEL_REQUEST_INBOUND_CHANNEL]. @@ -40,8 +40,8 @@ pub struct Pipeline { actors: Vec>, topology: Topology, /// We have this here to keep receiver alive - pub cancel_request_sender_template: Option>, - cancel_request_receiver: Option>, + pub cancel_request_sender_template: Option>, + cancel_request_receiver: Option>, } impl Pipeline { diff --git a/src/core.rs b/src/core.rs index 6b8a2fa..be78cad 100644 --- a/src/core.rs +++ b/src/core.rs @@ -9,11 +9,14 @@ pub mod actor_builder; /// Inbound pub mod inbound; +/// Inbound requests +pub mod in_request; + /// Outbound pub mod outbound; -/// Request -pub mod request; +/// Outbound requests +pub mod out_request; /// Connection pub mod connection; diff --git a/src/core/actor.rs b/src/core/actor.rs index 7537fc1..d33a5c3 100644 --- a/src/core/actor.rs +++ b/src/core/actor.rs @@ -11,38 +11,43 @@ use tokio::select; /// /// The generic actor struct is merely a user-facing facade to configure network connections. Actual /// properties, state and inbound routing is stored in the [IsActorNode] structs. -pub struct GenericActor { +pub struct GenericActor { /// unique identifier of the actor pub actor_name: String, /// a collection of inbound channels pub inbound: Inbound, + /// a collection of inbound channels + pub in_requests: InRequest, /// a collection of outbound channels pub outbound: Outbound, /// a collection of request channels - pub request: Request, + pub out_requests: OutRequest, pub(crate) phantom: std::marker::PhantomData<(Prop, State, Run)>, } /// An actor of the default runner type, but otherwise generic over its, prop, state, inbound /// and outbound channel types. -pub type Actor = GenericActor< +pub type Actor = GenericActor< Prop, Inbound, + InRequest, State, - IsOutboundHub, - Request, - DefaultRunner, + Outbound, + OutRequest, + DefaultRunner, >; /// New actor from properties and state. pub trait HasFromPropState< Prop, - Inbound: IsInboundHub, + Inbound: IsInboundHub, + InRequest: IsInRequestHub, State, Outbound: IsOutboundHub, M: IsInboundMessage, - Request: IsRequestHub, - Run: Runner, + R: IsInRequestMessage, + OutRequest: IsOutRequestHub, + Run: Runner, > { /// Produces a hint for the actor. The name_hint is used as a base to @@ -56,16 +61,22 @@ pub trait HasFromPropState< context: &mut Hollywood, prop: Prop, initial_state: State, - ) -> GenericActor { + ) -> GenericActor { let actor_name = context.add_new_unique_name(Self::name_hint(&prop).to_string()); let out = Outbound::from_context_and_parent(context, &actor_name); - let mut builder = ActorBuilder::new(context, &actor_name, prop, initial_state); + let mut builder = ActorBuilder::::new( + context, + &actor_name, + prop, + initial_state, + ); - let request = Request::from_parent_and_sender(&actor_name, &builder.sender); + let out_request = OutRequest::from_parent_and_sender(&actor_name, &builder.sender); let inbound = Inbound::from_builder(&mut builder, &actor_name); - builder.build::(inbound, out, request) + let in_request = InRequest::from_builder(&mut builder, &actor_name); + builder.build::(inbound, in_request, out, out_request) } } @@ -92,23 +103,29 @@ pub trait IsActorNode { } /// A table to forward outbound messages to message handlers of downstream actors. -pub type ForwardTable = HashMap< +pub type ForwardTable = + HashMap + Send + Sync>>; + +/// A table to forward outbound messages to message handlers of downstream actors. +pub type ForwardRequestTable = HashMap< String, - Box + Send + Sync>, + Box + Send + Sync>, >; -pub(crate) struct IsActorNodeImpl { +pub(crate) struct ActorNodeImpl { pub(crate) name: String, pub(crate) prop: Prop, pub(crate) state: Option, - pub(crate) receiver: Option>, - pub(crate) outbound: IsOutboundHub, - pub(crate) request: Request, - pub(crate) forward: ForwardTable, + pub(crate) forward: ForwardTable, + pub(crate) receiver: Option>, + pub(crate) outbound: OutboundHub, + pub(crate) forward_request: ForwardRequestTable, + pub(crate) request_receiver: Option>, + pub(crate) out_request: OutRequestHub, } -impl - IsActorNodeImpl +impl + ActorNodeImpl { } @@ -117,9 +134,10 @@ impl< Prop: std::marker::Send + std::marker::Sync + 'static, State: std::marker::Send + std::marker::Sync + 'static, Outbound: IsOutboundHub, - Request: IsRequestHub, + Request: IsOutRequestHub, + R: IsInRequestMessage, M: IsInboundMessage, - > IsActorNode for IsActorNodeImpl + > IsActorNode for ActorNodeImpl { fn name(&self) -> &String { &self.name @@ -127,7 +145,7 @@ impl< async fn run(&mut self, kill: tokio::sync::broadcast::Receiver<()>) { self.outbound.activate(); - self.request.activate(); + self.out_request.activate(); let (state, recv) = on_message( self.name.clone(), @@ -135,11 +153,13 @@ impl< OnMessageMutValues { state: self.state.take().unwrap(), receiver: self.receiver.take().unwrap(), + request_receiver: self.request_receiver.take().unwrap(), kill, }, &self.forward, + &self.forward_request, &self.outbound, - &self.request, + &self.out_request, ) .await; self.state = Some(state); @@ -147,9 +167,10 @@ impl< } } -pub(crate) struct OnMessageMutValues { +pub(crate) struct OnMessageMutValues { state: State, - receiver: tokio::sync::mpsc::Receiver, + receiver: tokio::sync::mpsc::UnboundedReceiver, + request_receiver: tokio::sync::mpsc::UnboundedReceiver, kill: tokio::sync::broadcast::Receiver<()>, } @@ -159,14 +180,16 @@ pub(crate) async fn on_message< Outbound: Sync + Send, Request: Sync + Send, M: IsInboundMessage, + R: IsInRequestMessage, >( _actor_name: String, prop: &Prop, - mut values: OnMessageMutValues, + mut values: OnMessageMutValues, forward: &ForwardTable, + forward_request: &ForwardRequestTable, outbound: &Outbound, request: &Request, -) -> (State, tokio::sync::mpsc::Receiver) { +) -> (State, tokio::sync::mpsc::UnboundedReceiver) { loop { select! { _ = values.kill.recv() => { @@ -186,6 +209,18 @@ pub(crate) async fn on_message< continue; } t.unwrap().forward_message(prop, &mut values.state, outbound, request, m); + }, + m = values.request_receiver.recv() => { + if m.is_some() { + let m = m.unwrap(); + let t = forward_request.get(&m.in_request_channel()); + if t.is_none() { + continue; + } + t.unwrap().forward_message(prop, &mut values.state, outbound, request, m); + } else{ + tokio::task::yield_now().await; + } } } } diff --git a/src/core/actor_builder.rs b/src/core/actor_builder.rs index 1d9faa5..ffb53b8 100644 --- a/src/core/actor_builder.rs +++ b/src/core/actor_builder.rs @@ -1,3 +1,4 @@ +use crate::core::actor::ForwardRequestTable; use crate::core::runner::Runner; use crate::prelude::*; use crate::ForwardTable; @@ -11,8 +12,9 @@ pub struct ActorBuilder< Prop, State, IsOutboundHub, - Request: IsRequestHub, + OutRequest: IsOutRequestHub, M: IsInboundMessage, + R: IsInRequestMessage, > { /// unique identifier of the actor pub actor_name: String, @@ -21,14 +23,26 @@ pub struct ActorBuilder< /// execution context pub context: &'a mut Hollywood, /// a channel for sending messages to the actor - pub sender: tokio::sync::mpsc::Sender, - pub(crate) receiver: tokio::sync::mpsc::Receiver, + pub sender: tokio::sync::mpsc::UnboundedSender, + pub(crate) receiver: tokio::sync::mpsc::UnboundedReceiver, + /// a channel for sending requests to the actor + pub request_sender: tokio::sync::mpsc::UnboundedSender, + pub(crate) request_receiver: tokio::sync::mpsc::UnboundedReceiver, /// a collection of inbound channels - pub forward: ForwardTable, + pub forward: ForwardTable, + /// a collection of inbound channels + pub forward_request: ForwardRequestTable, } -impl<'a, Prop, State, Outbound: IsOutboundHub, Request: IsRequestHub, M: IsInboundMessage> - ActorBuilder<'a, Prop, State, Outbound, Request, M> +impl< + 'a, + Prop, + State, + Outbound: IsOutboundHub, + OutRequest: IsOutRequestHub, + R: IsInRequestMessage, + M: IsInboundMessage, + > ActorBuilder<'a, Prop, State, Outbound, OutRequest, M, R> { pub(crate) fn new( context: &'a mut Hollywood, @@ -36,7 +50,8 @@ impl<'a, Prop, State, Outbound: IsOutboundHub, Request: IsRequestHub, M: IsIn prop: Prop, initial_state: State, ) -> Self { - let (sender, receiver) = tokio::sync::mpsc::channel(8); + let (sender, receiver) = tokio::sync::mpsc::unbounded_channel(); + let (request_sender, request_receiver) = tokio::sync::mpsc::unbounded_channel(); Self { actor_name: actor_name.to_owned(), @@ -45,34 +60,42 @@ impl<'a, Prop, State, Outbound: IsOutboundHub, Request: IsRequestHub, M: IsIn context, sender: sender.clone(), receiver, + request_sender, + request_receiver, forward: ForwardTable::new(), + forward_request: ForwardRequestTable::new(), } } pub(crate) fn build< - Inbound: IsInboundHub, - Run: Runner, + Inbound: IsInboundHub, + InRequest: IsInRequestHub, + Run: Runner, >( self, inbound: Inbound, + in_requests: InRequest, outbound: Outbound, - request: Request, - ) -> GenericActor { + out_requests: OutRequest, + ) -> GenericActor { let mut actor = GenericActor { actor_name: self.actor_name.clone(), inbound, + in_requests, outbound, - request, + out_requests, phantom: std::marker::PhantomData {}, }; self.context.actors.push(Run::new_actor_node( self.actor_name, self.prop, self.state, - self.receiver, - self.forward, - actor.outbound.extract(), - actor.request.extract(), + (self.forward, self.receiver, actor.outbound.extract()), + ( + self.forward_request, + self.request_receiver, + actor.out_requests.extract(), + ), )); actor } diff --git a/src/core/connection/request_connection.rs b/src/core/connection/request_connection.rs index 0275ec2..22362b6 100644 --- a/src/core/connection/request_connection.rs +++ b/src/core/connection/request_connection.rs @@ -10,20 +10,20 @@ pub(crate) trait GenericRequestConnection: Send + Sync { } #[derive(Debug, Clone)] -pub(crate) struct RequestConnection { - pub(crate) sender: tokio::sync::mpsc::Sender, +pub(crate) struct RequestConnection { + pub(crate) sender: tokio::sync::mpsc::UnboundedSender, pub(crate) inbound_channel: String, pub(crate) phantom: PhantomData, } -impl> GenericRequestConnection +impl> GenericRequestConnection for RequestConnection { fn send_impl(&self, msg: T) { let msg = M::new(self.inbound_channel.clone(), msg); let c = self.sender.clone(); let handler = tokio::spawn(async move { - match c.send(msg).await { + match c.send(msg) { Ok(_) => {} Err(SendError(_)) => { println!("SendError"); diff --git a/src/core/in_request.rs b/src/core/in_request.rs new file mode 100644 index 0000000..6270ca0 --- /dev/null +++ b/src/core/in_request.rs @@ -0,0 +1,200 @@ +use crate::prelude::*; +use std::sync::Arc; + +/// The inbound hub is a collection of inbound channels. +pub trait IsInRequestHub< + Prop, + State, + IsOutboundHub, + Request: IsOutRequestHub, + M: IsInboundMessage, + R: IsInRequestMessage, +>: Send + Sync +{ + /// Create a new inbound hub for an actor. + fn from_builder( + builder: &mut ActorBuilder, + actor_name: &str, + ) -> Self; +} + +/// An empty inbound hub - for actors with no inbound channels. +#[derive(Debug, Clone)] +pub struct NullInRequests {} + +impl< + Prop, + State, + IsOutboundHub, + NullInRequestMessage: IsInRequestMessage, + NullMessage: IsInboundMessage, + Request: IsOutRequestHub, + > IsInRequestHub + for NullInRequests +{ + fn from_builder( + _builder: &mut ActorBuilder< + Prop, + State, + IsOutboundHub, + Request, + NullMessage, + NullInRequestMessage, + >, + _actor_name: &str, + ) -> Self { + Self {} + } +} + +/// InRequest channel to receive messages of a specific type `T`. +/// +/// InRequest channels can be connected to one or more outbound channels of upstream actors. +#[derive(Debug)] +pub struct InRequestChannel { + /// Unique identifier of the inbound channel. + pub name: String, + /// Name of the actor that the inbound messages are for. + pub actor_name: String, + pub(crate) sender: Arc>, + pub(crate) phantom: std::marker::PhantomData, +} + +impl Clone + for InRequestChannel +{ + fn clone(&self) -> Self { + Self { + name: self.name.clone(), + actor_name: self.actor_name.clone(), + sender: self.sender.clone(), + phantom: std::marker::PhantomData {}, + } + } +} + +impl InRequestChannel { + /// Creates a new inbound channel. + pub fn new( + context: &mut Hollywood, + actor_name: &str, + sender: &tokio::sync::mpsc::UnboundedSender, + name: String, + ) -> Self { + context.assert_unique_inbound_name(name.clone(), actor_name); + Self { + name, + actor_name: actor_name.to_owned(), + sender: Arc::new(sender.clone()), + phantom: std::marker::PhantomData {}, + } + } +} + +/// InRequest messages to be received by the actor. +pub trait IsInRequestMessage: Send + Sync + 'static { + /// Prop type of the receiving actor. + type Prop; + + /// State type of the receiving actor. + type State; + + /// IsOutboundHub type of the receiving actor, to produce outbound messages downstream. + type OutboundHub: Send + Sync + 'static; + + /// IsRequestHub type of the receiving actor, to send requests upstream. + type OutRequestHub: Send + Sync + 'static; + + /// Name of the inbound channel that this message is for. + fn in_request_channel(&self) -> String; +} + +/// Customization point for processing inbound messages. +pub trait HasOnRequestMessage: IsInRequestMessage { + /// Process the inbound message - user code with main business logic goes here. + fn on_message( + self, + prop: &Self::Prop, + state: &mut Self::State, + outbound: &Self::OutboundHub, + request: &Self::OutRequestHub, + ); +} + +/// Trait for creating in-request messages of compatible types `T`. +pub trait IsInRequestMessageNew: + std::fmt::Debug + Send + Sync + 'static + IsInRequestMessage +{ + /// Create a new inbound message from the inbound channel name and the message value of type `T`. + fn new(inbound_channel: String, value: T) -> Self; +} + +/// Message forwarder. +pub trait HasForwardRequestMessage +{ + /// Forward the message to the HasOnMessage customization point. + fn forward_message( + &self, + prop: &Prop, + state: &mut State, + outbound: &IsOutboundHub, + request: &IsRequestHub, + msg: M, + ); +} + +impl< + T: Send + Sync + std::fmt::Debug + 'static, + Prop, + State, + OutboundHub, + OutRequestHub, + M: HasOnRequestMessage< + Prop = Prop, + State = State, + OutboundHub = OutboundHub, + OutRequestHub = OutRequestHub, + >, + > HasForwardRequestMessage + for InRequestChannel +{ + fn forward_message( + &self, + prop: &Prop, + state: &mut State, + outbound: &OutboundHub, + request: &OutRequestHub, + msg: M, + ) { + msg.on_message(prop, state, outbound, request); + } +} + +/// Null message is a marker type for actors with no inbound channels. +#[derive(Debug)] +pub enum NullInRequestMessage { + /// Null message. + Null, +} + +impl IsInRequestMessage for NullInRequestMessage { + type Prop = NullProp; + type State = NullState; + type OutboundHub = NullOutbound; + type OutRequestHub = NullOutRequests; + + fn in_request_channel(&self) -> String { + "".to_owned() + } +} + +impl HasOnRequestMessage for NullInRequestMessage { + fn on_message( + self, + _prop: &Self::Prop, + _state: &mut Self::State, + _outputs: &Self::OutboundHub, + _request: &Self::OutRequestHub, + ) { + } +} diff --git a/src/core/inbound.rs b/src/core/inbound.rs index b285984..4b26d64 100644 --- a/src/core/inbound.rs +++ b/src/core/inbound.rs @@ -1,12 +1,18 @@ use crate::prelude::*; /// The inbound hub is a collection of inbound channels. -pub trait IsInboundHub, M: IsInboundMessage>: - Send + Sync +pub trait IsInboundHub< + Prop, + State, + IsOutboundHub, + Request: IsOutRequestHub, + M: IsInboundMessage, + R: IsInRequestMessage, +>: Send + Sync { /// Create a new inbound hub for an actor. fn from_builder( - builder: &mut ActorBuilder, + builder: &mut ActorBuilder, actor_name: &str, ) -> Self; } @@ -19,12 +25,13 @@ impl< Prop, State, IsOutboundHub, - NullMessage: IsInboundMessage, - Request: IsRequestHub, - > IsInboundHub for NullInbound + InRequestMessage: IsInRequestMessage, + Message: IsInboundMessage, + Request: IsOutRequestHub, + > IsInboundHub for NullInbound { fn from_builder( - _builder: &mut ActorBuilder, + _builder: &mut ActorBuilder, _actor_name: &str, ) -> Self { Self {} @@ -40,7 +47,7 @@ pub struct InboundChannel { pub name: String, /// Name of the actor that the inbound messages are for. pub actor_name: String, - pub(crate) sender: tokio::sync::mpsc::Sender, + pub(crate) sender: tokio::sync::mpsc::UnboundedSender, pub(crate) phantom: std::marker::PhantomData, } @@ -49,7 +56,7 @@ impl In pub fn new( context: &mut Hollywood, actor_name: &str, - sender: &tokio::sync::mpsc::Sender, + sender: &tokio::sync::mpsc::UnboundedSender, name: String, ) -> Self { context.assert_unique_inbound_name(name.clone(), actor_name); @@ -74,7 +81,7 @@ pub trait IsInboundMessage: Send + Sync + Clone + 'static { type OutboundHub: Send + Sync + 'static; /// IsRequestHub type of the receiving actor, to send requests upstream. - type RequestHub: Send + Sync + 'static; + type OutRequestHub: Send + Sync + 'static; /// Name of the inbound channel that this message is for. fn inbound_channel(&self) -> String; @@ -88,7 +95,7 @@ pub trait HasOnMessage: IsInboundMessage { prop: &Self::Prop, state: &mut Self::State, outbound: &Self::OutboundHub, - request: &Self::RequestHub, + request: &Self::OutRequestHub, ); } @@ -118,21 +125,21 @@ impl< Prop, State, OutboundHub, - RequestHub, + OutRequestHub, M: HasOnMessage< Prop = Prop, State = State, OutboundHub = OutboundHub, - RequestHub = RequestHub, + OutRequestHub = OutRequestHub, >, - > HasForwardMessage for InboundChannel + > HasForwardMessage for InboundChannel { fn forward_message( &self, prop: &Prop, state: &mut State, outbound: &OutboundHub, - request: &RequestHub, + request: &OutRequestHub, msg: M, ) { msg.on_message(prop, state, outbound, request); @@ -141,58 +148,35 @@ impl< /// Null message is a marker type for actors with no inbound channels. #[derive(Debug)] -pub enum NullMessage { +pub enum NullMessage { /// Null message. - NullMessage(std::marker::PhantomData<(P, S, O, NullRequest)>), + Null, } -impl Default for NullMessage { - fn default() -> Self { - Self::new() - } -} - -impl NullMessage { - /// Creates a new null message. - pub fn new() -> Self { - NullMessage::NullMessage(std::marker::PhantomData {}) - } -} - -impl Clone for NullMessage { +impl Clone for NullMessage { fn clone(&self) -> Self { - Self::new() + Self::Null } } -impl< - P: std::marker::Send + std::marker::Sync + 'static, - S: std::marker::Send + std::marker::Sync + 'static, - O: IsOutboundHub, - > IsInboundMessage for NullMessage -{ - type Prop = P; - type State = S; - type OutboundHub = O; - type RequestHub = NullRequest; +impl IsInboundMessage for NullMessage { + type Prop = NullProp; + type State = NullState; + type OutboundHub = NullOutbound; + type OutRequestHub = NullOutRequests; fn inbound_channel(&self) -> String { "".to_owned() } } -impl< - P: std::marker::Send + std::marker::Sync + 'static, - S: std::marker::Send + std::marker::Sync + 'static, - O: IsOutboundHub, - > HasOnMessage for NullMessage -{ +impl HasOnMessage for NullMessage { fn on_message( self, - _prop: &P, + _prop: &Self::Prop, _state: &mut Self::State, _outputs: &Self::OutboundHub, - _request: &Self::RequestHub, + _request: &Self::OutRequestHub, ) { } } diff --git a/src/core/request.rs b/src/core/out_request.rs similarity index 62% rename from src/core/request.rs rename to src/core/out_request.rs index 2b79487..bd74ff2 100644 --- a/src/core/request.rs +++ b/src/core/out_request.rs @@ -6,22 +6,25 @@ use std::marker::PhantomData; use std::sync::Arc; /// A request hub is used to send requests to other actors which will reply later. -pub trait IsRequestHub: Send + Sync + 'static + HasActivate { +pub trait IsOutRequestHub: Send + Sync + 'static + HasActivate { /// Create a new request hub for an actor. - fn from_parent_and_sender(actor_name: &str, sender: &tokio::sync::mpsc::Sender) -> Self; + fn from_parent_and_sender( + actor_name: &str, + sender: &tokio::sync::mpsc::UnboundedSender, + ) -> Self; } /// A request message with a reply channel. -#[derive(Debug, Clone, Default)] -pub struct RequestMessage { +#[derive(Debug)] +pub struct RequestWithReplyChannel { /// The request. pub request: Request, /// The reply channel. - pub reply_channel: Option>>>, + pub reply_channel: tokio::sync::oneshot::Sender>, } /// A trait for request messages. -pub trait IsRequestMessage: Send + Sync + 'static + Clone + Debug + Default { +pub trait IsRequestWithReplyChannel: Send + Sync + 'static + Debug { /// The request type. type Request; /// The reply type. @@ -29,15 +32,15 @@ pub trait IsRequestMessage: Send + Sync + 'static + Clone + Debug + Default { } impl< - Request: Send + Sync + 'static + Clone + Debug + Default, - Reply: Send + Sync + 'static + Clone + Debug + Default, - > IsRequestMessage for RequestMessage + Request: Send + Sync + 'static + Clone + Debug, + Reply: Send + Sync + 'static + Clone + Debug, + > IsRequestWithReplyChannel for RequestWithReplyChannel { type Request = Reply; type Reply = Reply; } -impl RequestMessage { +impl RequestWithReplyChannel { /// Reply to the request immediately. pub fn reply(self, func: F) where @@ -53,14 +56,9 @@ impl RequestMessage { /// Reply to the request later using the provided reply channel. pub fn reply_later(self) -> ReplyLater { - let reply_channel = Arc::into_inner( - self.reply_channel - .expect("self.reply must not be None. This is a bug in the hollywood crate."), - ) - .expect("self.reply must have a ref count of 1. This is a bug in the hollywood crate."); ReplyLater:: { request: self.request, - reply_channel, + reply_channel: self.reply_channel, } } } @@ -87,18 +85,18 @@ pub struct ReplyMessage { pub reply: Reply, } -/// RequestChannel is a connections for messages which are sent to a downstream actor. -pub struct RequestChannel { +/// OutRequestChannel is a connections for messages which are sent to a downstream actor. +pub struct OutRequestChannel { /// Unique name of the request channel. pub name: String, /// Name of the actor that sends the request messages. pub actor_name: String, - pub(crate) connection_register: RequestConnectionEnum>, - pub(crate) sender: tokio::sync::mpsc::Sender, + pub(crate) connection_register: RequestConnectionEnum>, + pub(crate) sender: tokio::sync::mpsc::UnboundedSender, } -impl HasActivate for RequestChannel { +impl HasActivate for OutRequestChannel { fn extract(&mut self) -> Self { Self { name: self.name.clone(), @@ -117,10 +115,14 @@ impl< Request: Clone + Send + Sync + std::fmt::Debug + 'static, Reply: Clone + Send + Sync + std::fmt::Debug + 'static, M: IsInboundMessageNew>, - > RequestChannel + > OutRequestChannel { /// Create a new request channel for actor in provided context. - pub fn new(name: String, actor_name: &str, sender: &tokio::sync::mpsc::Sender) -> Self { + pub fn new( + name: String, + actor_name: &str, + sender: &tokio::sync::mpsc::UnboundedSender, + ) -> Self { Self { name: name.clone(), actor_name: actor_name.to_owned(), @@ -130,13 +132,13 @@ impl< } /// Connect the request channel from this actor to the inbound channel of another actor. - pub fn connect>>( + pub fn connect>>( &mut self, _ctx: &mut Hollywood, - inbound: &mut InboundChannel, Me>, + inbound: &mut InRequestChannel, Me>, ) { self.connection_register.push(Arc::new(RequestConnection { - sender: inbound.sender.clone(), + sender: inbound.sender.as_ref().clone(), inbound_channel: inbound.name.clone(), phantom: PhantomData {}, })); @@ -144,10 +146,10 @@ impl< /// Send a message to the connected inbound channels of other actors. pub fn send_request(&self, msg: Request) { - let (sender, receiver) = tokio::sync::oneshot::channel(); - let msg = RequestMessage { + let (reply_sender, reply_receiver) = tokio::sync::oneshot::channel(); + let msg = RequestWithReplyChannel { request: msg, - reply_channel: Some(Arc::new(sender)), + reply_channel: reply_sender, }; self.connection_register.send(msg); @@ -155,23 +157,36 @@ impl< let name = self.name.clone(); tokio::spawn(async move { - let r = receiver.await.unwrap(); - sender.send(M::new(name, r)).await + let r = match reply_receiver.await { + Ok(r) => r, + Err(e) => { + panic!("Error: {:?}", e); + } + }; + match sender.send(M::new(name, r)) { + Ok(_) => {} + Err(e) => { + panic!("Error: {:?}", e); + } + } }); } } /// An empty request hub - used for actors that do not have any request channels. #[derive(Debug, Clone, Default)] -pub struct NullRequest {} +pub struct NullOutRequests {} -impl IsRequestHub for NullRequest { - fn from_parent_and_sender(_actor_name: &str, _sender: &tokio::sync::mpsc::Sender) -> Self { +impl IsOutRequestHub for NullOutRequests { + fn from_parent_and_sender( + _actor_name: &str, + _sender: &tokio::sync::mpsc::UnboundedSender, + ) -> Self { Self {} } } -impl HasActivate for NullRequest { +impl HasActivate for NullOutRequests { fn extract(&mut self) -> Self { Self {} } diff --git a/src/core/outbound.rs b/src/core/outbound.rs index e8b83c0..0041749 100644 --- a/src/core/outbound.rs +++ b/src/core/outbound.rs @@ -118,14 +118,14 @@ impl HasActivate for OutboundChannel { #[derive(Clone, Debug)] pub(crate) struct OutboundConnection { - pub(crate) sender: tokio::sync::mpsc::Sender, + pub(crate) sender: tokio::sync::mpsc::UnboundedSender, pub(crate) inbound_channel: String, pub(crate) phantom: std::marker::PhantomData, } #[derive(Clone)] pub(crate) struct OutboundConnectionWithAdapter { - pub(crate) sender: tokio::sync::mpsc::Sender, + pub(crate) sender: tokio::sync::mpsc::UnboundedSender, pub(crate) inbound_channel: String, pub(crate) adapter: fn(Out) -> InT, } @@ -151,7 +151,7 @@ impl> IsGenericConnection let msg = M::new(self.inbound_channel.clone(), msg); let c = self.sender.clone(); let handler = tokio::spawn(async move { - match c.send(msg).await { + match c.send(msg) { Ok(_) => {} Err(SendError(_)) => { println!("SendError"); @@ -169,7 +169,7 @@ impl> IsGenericConnection {} Err(SendError(_)) => { println!("SendError"); diff --git a/src/core/runner.rs b/src/core/runner.rs index c12d691..3a528a1 100644 --- a/src/core/runner.rs +++ b/src/core/runner.rs @@ -1,26 +1,35 @@ -use crate::core::actor::IsActorNodeImpl; +use crate::core::actor::ActorNodeImpl; +use crate::core::actor::ForwardRequestTable; use crate::prelude::*; use crate::ForwardTable; /// Runner executes the pipeline. pub trait Runner< Prop, - Inbound: IsInboundHub, + Inbound: IsInboundHub, + InRequest, State, Outbound: IsOutboundHub, - Request: IsRequestHub, + OutRequest: IsOutRequestHub, M: IsInboundMessage, + R: IsInRequestMessage, > { /// Create a new actor to be stored by the context. fn new_actor_node( name: String, prop: Prop, - state: State, - receiver: tokio::sync::mpsc::Receiver, - forward: ForwardTable, - outbound: Outbound, - request: Request, + init_state: State, + forward_receiver_outbound: ( + ForwardTable, + tokio::sync::mpsc::UnboundedReceiver, + Outbound, + ), + forward_receiver_request: ( + ForwardRequestTable, + tokio::sync::mpsc::UnboundedReceiver, + OutRequest, + ), ) -> Box; } @@ -28,20 +37,22 @@ pub trait Runner< pub struct DefaultRunner< Prop, Inbound: Send + Sync, + InRequest, State, Outbound: Send + Sync + 'static, Request: Send + Sync + 'static, > { - phantom: std::marker::PhantomData<(Prop, Inbound, State, Outbound, Request)>, + phantom: std::marker::PhantomData<(Prop, Inbound, InRequest, State, Outbound, Request)>, } impl< Prop, State, Inbound: Send + Sync, + InRequest, Outbound: Send + Sync + 'static, Request: Send + Sync + 'static, - > Default for DefaultRunner + > Default for DefaultRunner { fn default() -> Self { Self::new() @@ -52,9 +63,10 @@ impl< Prop, State, Inbound: Send + Sync, + InRequest, Outbound: Send + Sync + 'static, Request: Send + Sync + 'static, - > DefaultRunner + > DefaultRunner { /// Create a new default runner. pub fn new() -> Self { @@ -66,31 +78,41 @@ impl< impl< Prop: std::marker::Send + std::marker::Sync + 'static, - Inbound: IsInboundHub, + Inbound: IsInboundHub, + InRequest, State: std::marker::Send + std::marker::Sync + 'static, Outbound: IsOutboundHub, + R: IsInRequestMessage, M: IsInboundMessage, - Request: IsRequestHub, - > Runner - for DefaultRunner + OutRequest: IsOutRequestHub, + > Runner + for DefaultRunner { fn new_actor_node( name: String, prop: Prop, init_state: State, - receiver: tokio::sync::mpsc::Receiver, - forward: ForwardTable, - outbound: Outbound, - request: Request, + forward_receiver_outbound: ( + ForwardTable, + tokio::sync::mpsc::UnboundedReceiver, + Outbound, + ), + forward_receiver_request: ( + ForwardRequestTable, + tokio::sync::mpsc::UnboundedReceiver, + OutRequest, + ), ) -> Box { - Box::new(IsActorNodeImpl:: { + Box::new(ActorNodeImpl:: { name, prop, state: Some(init_state), - receiver: Some(receiver), - outbound, - forward, - request, + forward: forward_receiver_outbound.0, + receiver: Some(forward_receiver_outbound.1), + outbound: forward_receiver_outbound.2, + forward_request: forward_receiver_request.0, + request_receiver: Some(forward_receiver_request.1), + out_request: forward_receiver_request.2, }) } } diff --git a/src/example_actors/moving_average.rs b/src/example_actors/moving_average.rs index d3408ba..08ff98b 100644 --- a/src/example_actors/moving_average.rs +++ b/src/example_actors/moving_average.rs @@ -19,15 +19,6 @@ pub struct MovingAverageProp { pub timeout: f64, } -impl Default for MovingAverageProp { - fn default() -> Self { - MovingAverageProp { - alpha: 0.5, - timeout: 10.0, - } - } -} - /// State of the MovingAverage actor. #[derive(Clone, Debug, Default)] pub struct MovingAverageState { @@ -38,7 +29,15 @@ pub struct MovingAverageState { /// Inbound message for the MovingAverage actor. /// #[derive(Clone, Debug)] -#[actor_inputs(MovingAverageInbound, {MovingAverageProp, MovingAverageState, MovingAverageOutbound, NullRequest})] +#[actor_inputs( + MovingAverageInbound, + { + MovingAverageProp, + MovingAverageState, + MovingAverageOutbound, + NullOutRequests, + NullInRequestMessage + })] pub enum MovingAverageMessage { /// a float value Value(f64), @@ -51,7 +50,7 @@ impl HasOnMessage for MovingAverageMessage { prop: &Self::Prop, state: &mut Self::State, outbound: &Self::OutboundHub, - _request: &Self::RequestHub, + _request: &Self::OutRequestHub, ) { match &self { MovingAverageMessage::Value(new_value) => { @@ -74,14 +73,12 @@ impl IsInboundMessageNew for MovingAverageMessage { /// The MovingAverage actor. /// -#[actor(MovingAverageMessage)] +#[actor(MovingAverageMessage, NullInRequestMessage)] type MovingAverage = Actor< MovingAverageProp, MovingAverageInbound, + NullInRequests, MovingAverageState, MovingAverageOutbound, - NullRequest, + NullOutRequests, >; - -/// Manual implementation of the moving average actor -pub mod manual; diff --git a/src/example_actors/moving_average/manual.rs b/src/example_actors/moving_average/manual.rs deleted file mode 100644 index 70b786d..0000000 --- a/src/example_actors/moving_average/manual.rs +++ /dev/null @@ -1 +0,0 @@ -// TODO diff --git a/src/example_actors/one_dim_robot/draw.rs b/src/example_actors/one_dim_robot/draw.rs index 3fe0643..0fd09d3 100644 --- a/src/example_actors/one_dim_robot/draw.rs +++ b/src/example_actors/one_dim_robot/draw.rs @@ -7,15 +7,23 @@ use drawille::Canvas; /// Inbound channels for the draw actor #[derive(Clone, Debug)] -#[actor_inputs(DrawInbound, {NullProp, DrawState, NullOutbound, NullRequest})] +#[actor_inputs(DrawInbound, + { + NullProp, + DrawState, + NullOutbound, + NullOutRequests, + NullInRequestMessage + })] pub enum DrawInboundMessage { /// Tuple of true pos, true range and filter state Zipped(Tuple3, Stamped, NamedFilterState>), } /// Draw actor for one-dim-robot example. -#[actor(DrawInboundMessage)] -pub type DrawActor = Actor; +#[actor(DrawInboundMessage, NullInRequestMessage)] +pub type DrawActor = + Actor; impl HasOnMessage for DrawInboundMessage { /// Forward the message to the correct handler method of [DrawState]. @@ -24,7 +32,7 @@ impl HasOnMessage for DrawInboundMessage { _prop: &NullProp, state: &mut Self::State, _outbound: &Self::OutboundHub, - _request: &Self::RequestHub, + _request: &Self::OutRequestHub, ) { match self { DrawInboundMessage::Zipped(msg) => { diff --git a/src/example_actors/one_dim_robot/filter.rs b/src/example_actors/one_dim_robot/filter.rs index 762cbc7..b4acf3f 100644 --- a/src/example_actors/one_dim_robot/filter.rs +++ b/src/example_actors/one_dim_robot/filter.rs @@ -1,7 +1,7 @@ use crate::example_actors::one_dim_robot::RangeMeasurementModel; use crate::example_actors::one_dim_robot::Stamped; use crate::prelude::*; -use crate::RequestMessage; +use crate::RequestWithReplyChannel; use hollywood_macros::actor; use hollywood_macros::actor_inputs; use hollywood_macros::actor_outputs; @@ -10,20 +10,66 @@ use std::fmt::Display; use super::sim::PingPong; +#[derive(Debug)] +/// Measurement model for the range measurement. +pub enum FilterInRequestMessage { + /// Request + PingPongRequest(RequestWithReplyChannel), +} + +impl HasOnRequestMessage for FilterInRequestMessage { + fn on_message( + self, + _prop: &Self::Prop, + state: &mut Self::State, + _outbound: &Self::OutboundHub, + _request: &Self::OutRequestHub, + ) { + match self { + FilterInRequestMessage::PingPongRequest(request) => { + request.reply(|ping| PingPong { + ping, + pong: state.time, + }); + } + } + } +} + +impl IsInRequestMessageNew> for FilterInRequestMessage { + fn new(_inbound_channel: String, request: RequestWithReplyChannel) -> Self { + FilterInRequestMessage::PingPongRequest(request) + } +} + +impl IsInRequestMessage for FilterInRequestMessage { + type Prop = NullProp; + + type State = FilterState; + + type OutboundHub = FilterOutbound; + + type OutRequestHub = NullOutRequests; + + fn in_request_channel(&self) -> String { + "ping_pong_request".to_owned() + } +} + /// Inbound channels for the filter actor. #[derive(Clone, Debug)] -#[actor_inputs(FilterInbound,{NullProp, FilterState, FilterOutbound, NullRequest})] +#[actor_inputs(FilterInbound,{NullProp, FilterState, FilterOutbound, NullOutRequests, + FilterInRequestMessage})] pub enum FilterInboundMessage { /// noisy velocity measurements NoisyVelocity(Stamped), /// noisy range measurements NoisyRange(Stamped), - /// Request - PingPongRequest(RequestMessage), } -#[actor(FilterInboundMessage)] -type Filter = Actor; +#[actor(FilterInboundMessage, FilterInRequestMessage)] +type Filter = + Actor; impl HasOnMessage for FilterInboundMessage { /// Process the inbound message NoisyVelocity or NoisyRange. @@ -35,7 +81,7 @@ impl HasOnMessage for FilterInboundMessage { _prop: &Self::Prop, state: &mut Self::State, outbound: &Self::OutboundHub, - _request: &Self::RequestHub, + _request: &Self::OutRequestHub, ) { match self { FilterInboundMessage::NoisyVelocity(v) => { @@ -44,12 +90,6 @@ impl HasOnMessage for FilterInboundMessage { FilterInboundMessage::NoisyRange(r) => { state.update(&r, outbound); } - FilterInboundMessage::PingPongRequest(request) => { - request.reply(|ping| PingPong { - ping, - pong: state.time, - }); - } } } } @@ -64,12 +104,6 @@ impl IsInboundMessageNew> for FilterInboundMessage { } } -impl IsInboundMessageNew> for FilterInboundMessage { - fn new(_inbound_channel: String, request: RequestMessage) -> Self { - FilterInboundMessage::PingPongRequest(request) - } -} - /// Filter state #[derive(Clone, Debug, Default)] pub struct FilterState { @@ -221,3 +255,47 @@ pub struct FilterOutbound { /// Publishes the updated state of the filter. pub updated_state: OutboundChannel, } + +/// Inbound channels for the filter actor. +pub struct FilterInRequest { + /// Request channel for the ping_pong_request. + pub ping_pong_request: + InRequestChannel, FilterInRequestMessage>, +} + +impl + IsInRequestHub< + NullProp, + FilterState, + FilterOutbound, + NullOutRequests, + FilterInboundMessage, + FilterInRequestMessage, + > for FilterInRequest +{ + fn from_builder( + builder: &mut ActorBuilder< + NullProp, + FilterState, + FilterOutbound, + NullOutRequests, + FilterInboundMessage, + FilterInRequestMessage, + >, + actor_name: &str, + ) -> Self { + let ping_pong_request = InRequestChannel::new( + builder.context, + actor_name, + &builder.request_sender, + "ping_pong_request".to_owned(), + ); + + builder.forward_request.insert( + "ping_pong_request".to_owned(), + Box::new(ping_pong_request.clone()), + ); + + Self { ping_pong_request } + } +} diff --git a/src/example_actors/one_dim_robot/sim.rs b/src/example_actors/one_dim_robot/sim.rs index d928483..d95e87b 100644 --- a/src/example_actors/one_dim_robot/sim.rs +++ b/src/example_actors/one_dim_robot/sim.rs @@ -2,8 +2,8 @@ use crate::example_actors::one_dim_robot::RangeMeasurementModel; use crate::example_actors::one_dim_robot::Robot; use crate::example_actors::one_dim_robot::Stamped; use crate::prelude::*; +use crate::OutRequestChannel; use crate::ReplyMessage; -use crate::RequestChannel; use rand_distr::Distribution; use rand_distr::Normal; use std::fmt::Debug; @@ -19,7 +19,15 @@ pub struct PingPong { /// Inbound channels for the simulation actor. #[derive(Clone, Debug)] -#[actor_inputs(SimInbound, {NullProp, SimState, SimOutbound, SimRequest})] +#[actor_inputs( + SimInbound, + { + NullProp, + SimState, + SimOutbound, + SimRequest, + NullInRequestMessage + })] pub enum SimInboundMessage { /// Time-stamp message to drive the simulation. TimeStamp(f64), @@ -28,8 +36,8 @@ pub enum SimInboundMessage { } /// Simulation for the one-dimensional Robot. -#[actor(SimInboundMessage)] -pub type Sim = Actor; +#[actor(SimInboundMessage, NullInRequestMessage)] +pub type Sim = Actor; impl HasOnMessage for SimInboundMessage { /// Invokes [SimState::process_time_stamp()] on TimeStamp. @@ -38,7 +46,7 @@ impl HasOnMessage for SimInboundMessage { _prop: &Self::Prop, state: &mut Self::State, outbound: &Self::OutboundHub, - request: &Self::RequestHub, + request: &Self::OutRequestHub, ) { match self { SimInboundMessage::TimeStamp(time) => { @@ -139,7 +147,7 @@ impl SimState { self.seq += 1; - if time == 5.0 { + if time == 0.1 { request.ping_pong.send_request(time); } } @@ -163,8 +171,8 @@ pub struct SimOutbound { } /// Request of the simulation actor. -#[actor_requests] +#[actor_out_requests] pub struct SimRequest { /// Check time-stamp of receiver - pub ping_pong: RequestChannel, + pub ping_pong: OutRequestChannel, } diff --git a/src/lib.rs b/src/lib.rs index 1b0f384..5ffb064 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -79,14 +79,6 @@ //! pub alpha: f64, //! } //! -//! impl Default for MovingAverageProp { -//! fn default() -> Self { -//! MovingAverageProp { -//! alpha: 0.5, -//! } -//! } -//! } -//! //! /// State of the MovingAverage actor. //! #[derive(Clone, Debug, Default)] //! pub struct MovingAverageState { @@ -112,10 +104,15 @@ //! ```ignore //! /// Inbound message for the MovingAverage actor. //! #[derive(Clone, Debug)] -//! #[actor_inputs(MovingAverageInbound, {MovingAverageProp, -//! MovingAverageState, -//! MovingAverageOutbound, -//! NullRequest})] +//! #[actor_inputs( +//! MovingAverageInbound, +//! { +//! MovingAverageProp, +//! MovingAverageState, +//! MovingAverageOutbound, +//! NullOutRequests, +//! NullInRequestMessage +//! })] //! pub enum MovingAverageMessage { //! /// a float value //! Value(f64), @@ -128,7 +125,7 @@ //! prop: &Self::Prop, //! state: &mut Self::State, //! outbound: &Self::Outbound, -//! _request: &Self::RequestHub) +//! _request: &Self::OutRequestHub) //! { //! match &self { //! MovingAverageMessage::Value(new_value) => { @@ -164,9 +161,10 @@ //! type MovingAverage = Actor< //! MovingAverageProp, //! MovingAverageInbound, +//! NullInRequests, //! MovingAverageOutbound, //! MovingAverageState, -//! NullRequest>; +//! NullOutRequests>; //! ``` //! //! ### Configure and execute the pipeline @@ -193,7 +191,7 @@ //! context, //! MovingAverageProp { //! alpha: 0.3, -//! ..Default::default() +//! timeout: 5.0, //! }, //! MovingAverageState::default(), //! ); @@ -290,30 +288,39 @@ pub use crate::core::actor::ForwardTable; pub use crate::core::actor::GenericActor; pub use crate::core::actor::HasFromPropState; pub use crate::core::actor::IsActorNode; +pub use crate::core::actor_builder::ActorBuilder; pub use crate::core::connection::ConnectionEnum; +pub use crate::core::in_request::HasForwardRequestMessage; +pub use crate::core::in_request::HasOnRequestMessage; +pub use crate::core::in_request::InRequestChannel; +pub use crate::core::in_request::IsInRequestHub; +pub use crate::core::in_request::IsInRequestMessage; +pub use crate::core::in_request::IsInRequestMessageNew; +pub use crate::core::in_request::NullInRequestMessage; +pub use crate::core::in_request::NullInRequests; pub use crate::core::inbound::HasForwardMessage; +pub use crate::core::inbound::HasOnMessage; +pub use crate::core::inbound::InboundChannel; +pub use crate::core::inbound::IsInboundHub; +pub use crate::core::inbound::IsInboundMessage; +pub use crate::core::inbound::IsInboundMessageNew; +pub use crate::core::inbound::NullInbound; +pub use crate::core::inbound::NullMessage; +pub use crate::core::out_request::IsOutRequestHub; +pub use crate::core::out_request::IsRequestWithReplyChannel; +pub use crate::core::out_request::NullOutRequests; +pub use crate::core::out_request::OutRequestChannel; +pub use crate::core::out_request::ReplyMessage; +pub use crate::core::out_request::RequestWithReplyChannel; +pub use crate::core::outbound::HasActivate; pub use crate::core::outbound::IsGenericConnection; pub use crate::core::outbound::IsOutboundHub; -pub use crate::core::request::IsRequestHub; -pub use crate::core::request::ReplyMessage; -pub use crate::core::request::RequestChannel; -pub use crate::core::request::RequestMessage; +pub use crate::core::outbound::NullOutbound; +pub use crate::core::outbound::OutboundChannel; +pub use crate::core::runner::DefaultRunner; pub use crate::core::runner::Runner; -pub use core::actor_builder::ActorBuilder; -pub use core::inbound::HasOnMessage; -pub use core::inbound::InboundChannel; -pub use core::inbound::IsInboundHub; -pub use core::inbound::IsInboundMessage; -pub use core::inbound::IsInboundMessageNew; -pub use core::inbound::NullInbound; -pub use core::inbound::NullMessage; -pub use core::outbound::HasActivate; -pub use core::outbound::NullOutbound; -pub use core::outbound::OutboundChannel; -pub use core::request::NullRequest; -pub use core::runner::DefaultRunner; -pub use core::value::NullProp; -pub use core::value::NullState; +pub use crate::core::value::NullProp; +pub use crate::core::value::NullState; /// The compute context and compute graph. pub mod compute; @@ -336,7 +343,7 @@ pub mod example_actors; /// following order: /// /// 1. [actor_outputs](macros::actor_outputs) -/// 2. [actor_requests](macros::actor_requests) +/// 2. [actor_out_requests](macros::actor_out_requests) /// 3. [actor_inputs](macros::actor_inputs) which depends on 1. and 2. /// 4. [actor](macros::actor) which depends on 1., 2. and 3. /// @@ -344,116 +351,12 @@ pub mod example_actors; /// refer to the examples in the root of the [crate](crate#example-moving-average). pub mod macros { - /// This macro generates the boilerplate for the outbound hub struct it is applied to. - /// - /// Macro template: - /// - /// ``` text - /// #[actor_outputs] - /// pub struct OUTBOUND { - /// pub CHANNEL0: OutboundChannel, - /// pub CHANNEL1: OutboundChannel, - /// ... - /// } - /// ``` - /// - /// Here, OUTBOUND is the user-specified name of the struct. The struct shall be defined right - /// after the macro invocation. (Indeed, these types of macros are called "attribute macros". - /// They are applied to the item directly following them, in this case a struct.) The outbound - /// struct consists of a zero, one or more outbound channels. Each outbound channel has a - /// user-specified name CHANNEL* and a user specified type TYPE*. - /// - /// Effect: The macro generates the [IsOutboundHub](crate::IsOutboundHub) and - /// [HasActivate](crate::HasActivate) implementations for the provided struct OUTBOUND. - /// - /// This is the first of three macros to define an actor. The other two are [macro@actor_inputs] - /// and [macro@actor]. - /// pub use hollywood_macros::actor_outputs; - /// This macro generates the boilerplate for the request hub struct it is applied to. - /// - /// Macro template: - /// - /// ``` text - /// #[actor_requests] - /// pub struct REQUEST { - /// pub CHANNEL0: RequestChannel, - /// pub CHANNEL1: RequestChannel, - /// ... - /// } - /// ``` - /// - /// Here, REQUEST is the user-specified name of the struct. The struct shall be defined right - /// after the macro invocation. The request struct consists of one or more request channels. - /// Each request channel has name CHANNEL*, a request type REQ_TYPE*, a reply type REPL_TYPE*, - /// and a message type M*. - /// - /// Effect: The macro generates the [IsRequestHub](crate::IsRequestHub) and - /// [HasActivate](crate::HasActivate) implementations for the provided struct REQUEST. - /// - pub use hollywood_macros::actor_requests; + pub use hollywood_macros::actor_out_requests; - /// This macro generates the boilerplate for the inbound hub of an actor. - /// - /// Macro template: - /// - /// ``` text - /// #[derive(Clone, Debug)] - /// actor_inputs(INBOUND,(PROP, STATE, OUTBOUND)); - /// pub enum INBOUND_MESSAGE { - /// VARIANT0(TYPE0), - /// VARIANT1(TYPE1), - /// ... - /// } - /// ``` - /// - /// INBOUND_MESSAGE is the user-specified name of an enum which shall be defined right below the - /// macro invocation. The enum shall consist of a zero, one or more message variants. Each - /// variant has a user-specified name VARIENT* and type TYPE*. - /// - /// Prerequisite: - /// - The OUTBOUND struct is defined and implements [IsOutboundHub](crate::IsOutboundHub) - /// and [HasActivate](crate::HasActivate), typically using the [macro@actor_outputs] macro. - /// - The REQUEST struct is defined and implements [IsRequestHub](crate::IsRequestHub) and - /// [HasActivate](crate::HasActivate), e.g. using the [actor_requests] macro. - /// - The PROP and STATE structs are defined. - /// - /// Effects: - /// - The macro defines the struct INBOUND that contains an inbound channel field for each - /// variant of the INBOUND_MESSAGE enum, and implements the - /// [IsInboundHub](crate::IsInboundHub) trait for it. - /// - Implements the [IsInboundMessage](crate::IsInboundMessage) trait for INBOUND_MESSAGE. - /// pub use hollywood_macros::actor_inputs; - /// This macro generates the boilerplate to define an new actor type. - /// - /// Macro template: - /// - /// ``` text - /// #[actor(INBOUND_MESSAGE)] - /// type ACTOR = Actor; - /// ``` - /// - /// Here, ACTOR is the user-specified name of the actor type. The actor type shall be defined - /// right after the macro invocation as an alias of [Actor](crate::Actor). - /// - /// Prerequisites: - /// - The OUTBOUND struct is defined and implements [IsOutboundHub](crate::IsOutboundHub) and - /// [HasActivate](crate::HasActivate), e.g. using the [actor_outputs] macro. - /// - The REQUEST struct is defined and implements [IsRequestHub](crate::IsRequestHub) and - /// [HasActivate](crate::HasActivate), e.g. using the [actor_requests] macro. - /// - The INBOUND_MESSAGE enum is defined and implements - /// [IsInboundMessage](crate::IsInboundMessage), as well as the INBOUND struct is defined - /// and implements the [IsInboundHub](crate::IsInboundHub) trait, e.g through the - /// [actor_inputs] macro. - /// - The PROP and STATE structs are defined. - /// - /// Effect: - /// - This macro implements the [HasFromPropState](crate::HasFromPropState) trait for the ACTOR - /// type. - /// pub use hollywood_macros::actor; pub use hollywood_macros::zip_n; @@ -467,22 +370,31 @@ pub mod prelude { pub use crate::DefaultRunner; pub use crate::HasActivate; pub use crate::HasForwardMessage; + pub use crate::HasForwardRequestMessage; pub use crate::HasFromPropState; pub use crate::HasOnMessage; + pub use crate::HasOnRequestMessage; pub use crate::Hollywood; + pub use crate::InRequestChannel; pub use crate::InboundChannel; pub use crate::IsActorNode; pub use crate::IsGenericConnection; + pub use crate::IsInRequestHub; + pub use crate::IsInRequestMessage; + pub use crate::IsInRequestMessageNew; pub use crate::IsInboundHub; pub use crate::IsInboundMessage; pub use crate::IsInboundMessageNew; + pub use crate::IsOutRequestHub; pub use crate::IsOutboundHub; - pub use crate::IsRequestHub; + pub use crate::IsRequestWithReplyChannel; + pub use crate::NullInRequestMessage; + pub use crate::NullInRequests; pub use crate::NullInbound; pub use crate::NullMessage; + pub use crate::NullOutRequests; pub use crate::NullOutbound; pub use crate::NullProp; - pub use crate::NullRequest; pub use crate::NullState; pub use crate::OutboundChannel; }