From ced7be6b9eb3af65d88618dc1dc7593dd8133872 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Tue, 7 Feb 2023 08:16:14 +0000 Subject: [PATCH] Allow to specify per-subsystem signal channel size (#29) --- orchestra/examples/duo.rs | 2 +- orchestra/proc-macro/src/impl_builder.rs | 13 ++-- .../src/parse/parse_orchestra_struct.rs | 70 +++++++++++++++---- 3 files changed, 66 insertions(+), 19 deletions(-) diff --git a/orchestra/examples/duo.rs b/orchestra/examples/duo.rs index 4dfb3eb..ba11721 100644 --- a/orchestra/examples/duo.rs +++ b/orchestra/examples/duo.rs @@ -63,7 +63,7 @@ impl Fortified { #[orchestra(signal=SigSigSig, event=EvX, error=Yikes, gen=AllMessages)] struct Duo { - #[subsystem(consumes: MsgStrukt, sends: [Plinko], message_capacity: 32768)] + #[subsystem(consumes: MsgStrukt, sends: [Plinko], message_capacity: 32768, signal_capacity: 128)] sub0: Awesome, #[subsystem(blocking, consumes: Plinko, sends: [MsgStrukt])] diff --git a/orchestra/proc-macro/src/impl_builder.rs b/orchestra/proc-macro/src/impl_builder.rs index b1d8fa0..dbde9a3 100644 --- a/orchestra/proc-macro/src/impl_builder.rs +++ b/orchestra/proc-macro/src/impl_builder.rs @@ -42,7 +42,10 @@ pub(crate) fn impl_builder(info: &OrchestraInfo) -> proc_macro2::TokenStream { let consumes = &info.consumes_without_wip(); let channel_name = &info.channel_names_without_wip(None); let channel_name_unbounded = &info.channel_names_without_wip("_unbounded"); - let channel_capacity = &info.channel_capacities_without_wip(info.message_channel_capacity); + let message_channel_capacity = + &info.message_channel_capacities_without_wip(info.message_channel_capacity); + let signal_channel_capacity = + &info.signal_channel_capacities_without_wip(info.signal_channel_capacity); let channel_name_tx = &info.channel_names_without_wip("_tx"); let channel_name_unbounded_tx = &info.channel_names_without_wip("_unbounded_tx"); @@ -520,6 +523,8 @@ pub(crate) fn impl_builder(info: &OrchestraInfo) -> proc_macro2::TokenStream { #spawner_where_clause, { /// Set the interconnecting signal channel capacity. + /// This will override both static overseer default, e.g. `overseer(signal_capacity=123,...)`, + /// **and** subsystem specific capacities, e.g. `subsystem(signal_capacity: 123,...)`. pub fn signal_channel_capacity(mut self, capacity: usize) -> Self { self.signal_capacity = Some(capacity); @@ -528,7 +533,7 @@ pub(crate) fn impl_builder(info: &OrchestraInfo) -> proc_macro2::TokenStream { /// Set the interconnecting message channel capacities. /// This will override both static overseer default, e.g. `overseer(message_capacity=123,...)`, - /// **and** subsystem specific capacities, e.g. `subsystem(message_capacity=123,...)`. + /// **and** subsystem specific capacities, e.g. `subsystem(message_capacity: 123,...)`. pub fn message_channel_capacity(mut self, capacity: usize) -> Self { self.channel_capacity = Some(capacity); @@ -581,7 +586,7 @@ pub(crate) fn impl_builder(info: &OrchestraInfo) -> proc_macro2::TokenStream { #support_crate ::metered::channel::< MessagePacket< #consumes > >( - self.channel_capacity.unwrap_or(#channel_capacity) + self.channel_capacity.unwrap_or(#message_channel_capacity) ); )* @@ -624,7 +629,7 @@ pub(crate) fn impl_builder(info: &OrchestraInfo) -> proc_macro2::TokenStream { #support_crate ::select_message_channel_strategy ); let (signal_tx, signal_rx) = #support_crate ::metered::channel( - self.signal_capacity.unwrap_or(SIGNAL_CHANNEL_CAPACITY) + self.signal_capacity.unwrap_or(#signal_channel_capacity) ); let ctx = #subsystem_ctx_name::< #consumes >::new( diff --git a/orchestra/proc-macro/src/parse/parse_orchestra_struct.rs b/orchestra/proc-macro/src/parse/parse_orchestra_struct.rs index 80e3055..b80eca7 100644 --- a/orchestra/proc-macro/src/parse/parse_orchestra_struct.rs +++ b/orchestra/proc-macro/src/parse/parse_orchestra_struct.rs @@ -21,6 +21,7 @@ use syn::{ parse::{Parse, ParseStream}, punctuated::Punctuated, spanned::Spanned, + token, token::Bracket, AttrStyle, Error, Field, FieldsNamed, GenericParam, Ident, ItemStruct, LitInt, Path, PathSegment, Result, Token, Type, Visibility, @@ -34,6 +35,7 @@ mod kw { syn::custom_keyword!(consumes); syn::custom_keyword!(sends); syn::custom_keyword!(message_capacity); + syn::custom_keyword!(signal_capacity); } #[derive(Clone, Debug)] @@ -49,7 +51,9 @@ pub(crate) enum SubSysAttrItem { /// Message to be consumed by this subsystem. Consumes(Consumes), /// Custom message channels capacity for this subsystem - MessageChannelCapacity(MessageCapacity), + MessageChannelCapacity(ChannelCapacity), + /// Custom signal channels capacity for this subsystem + SignalChannelCapacity(ChannelCapacity), } impl Parse for SubSysAttrItem { @@ -62,7 +66,9 @@ impl Parse for SubSysAttrItem { } else if lookahead.peek(kw::sends) { Self::Sends(input.parse::()?) } else if lookahead.peek(kw::message_capacity) { - Self::MessageChannelCapacity(input.parse::()?) + Self::MessageChannelCapacity(input.parse::>()?) + } else if lookahead.peek(kw::signal_capacity) { + Self::SignalChannelCapacity(input.parse::>()?) } else { Self::Consumes(input.parse::()?) }) @@ -87,6 +93,9 @@ impl ToTokens for SubSysAttrItem { Self::MessageChannelCapacity(_) => { quote! {} }, + Self::SignalChannelCapacity(_) => { + quote! {} + }, }; tokens.extend(ts.into_iter()); } @@ -113,8 +122,10 @@ pub(crate) struct SubSysField { /// Avoids dispatching `Wrapper` type messages, but generates the variants. /// Does not require the subsystem to be instantiated with the builder pattern. pub(crate) wip: bool, - /// Custom message capacity + /// Custom message channel capacity pub(crate) message_capacity: Option, + /// Custom signal channel capacity + pub(crate) signal_capacity: Option, } impl SubSysField { @@ -300,18 +311,18 @@ impl Parse for Consumes { } #[derive(Debug, Clone)] -pub(crate) struct MessageCapacity { +pub(crate) struct ChannelCapacity { #[allow(dead_code)] - tag: kw::message_capacity, + tag: T, #[allow(dead_code)] - colon_token: Token![:], + colon_token: token::Colon, value: usize, } -impl Parse for MessageCapacity { +impl Parse for ChannelCapacity { fn parse(input: syn::parse::ParseStream) -> Result { Ok(Self { - tag: input.parse::()?, + tag: input.parse::()?, colon_token: input.parse()?, value: input.parse::()?.base10_parse::()?, }) @@ -331,8 +342,10 @@ pub(crate) struct SubSystemAttrItems { /// The message type being consumed by the subsystem. pub(crate) consumes: Option, pub(crate) sends: Option, - /// Custom channel capacity - pub(crate) message_capacity: Option, + /// Custom message channel capacity + pub(crate) message_capacity: Option>, + /// Custom signal channel capacity + pub(crate) signal_capacity: Option>, } impl Parse for SubSystemAttrItems { @@ -373,8 +386,9 @@ impl Parse for SubSystemAttrItems { let blocking = extract_variant!(unique, Blocking; default = false); let wip = extract_variant!(unique, Wip; default = false); let message_capacity = extract_variant!(unique, MessageChannelCapacity take ); + let signal_capacity = extract_variant!(unique, SignalChannelCapacity take ); - Ok(Self { blocking, wip, sends, consumes, message_capacity }) + Ok(Self { blocking, wip, sends, consumes, message_capacity, signal_capacity }) } } @@ -507,7 +521,10 @@ impl OrchestraInfo { .collect::>() } - pub(crate) fn channel_capacities_without_wip(&self, default_capacity: usize) -> Vec { + pub(crate) fn message_channel_capacities_without_wip( + &self, + default_capacity: usize, + ) -> Vec { self.subsystems .iter() .filter(|ssf| !ssf.wip) @@ -520,6 +537,22 @@ impl OrchestraInfo { .collect::>() } + pub(crate) fn signal_channel_capacities_without_wip( + &self, + default_capacity: usize, + ) -> Vec { + self.subsystems + .iter() + .filter(|ssf| !ssf.wip) + .map(|ssf| { + LitInt::new( + &(ssf.signal_capacity.unwrap_or(default_capacity).to_string()), + ssf.signal_capacity.span(), + ) + }) + .collect::>() + } + pub(crate) fn consumes_without_wip(&self) -> Vec { self.subsystems .iter() @@ -601,8 +634,15 @@ impl OrchestraGuts { } unique_subsystem_idents.insert(generic.clone()); - let SubSystemAttrItems { wip, blocking, consumes, sends, message_capacity, .. } = - subsystem_attrs; + let SubSystemAttrItems { + wip, + blocking, + consumes, + sends, + message_capacity, + signal_capacity, + .. + } = subsystem_attrs; // messages to be sent let sends = if let Some(sends) = sends { @@ -612,6 +652,7 @@ impl OrchestraGuts { }; let consumes = consumes.map(|consumes| consumes.consumes); let message_capacity = message_capacity.map(|capacity| capacity.value); + let signal_capacity = signal_capacity.map(|capacity| capacity.value); subsystems.push(SubSysField { name: ident, @@ -621,6 +662,7 @@ impl OrchestraGuts { wip, blocking, message_capacity, + signal_capacity, }); } else { // collect the "baggage"