Skip to content

Commit

Permalink
Allow to specify per-subsystem signal channel size (#29)
Browse files Browse the repository at this point in the history
  • Loading branch information
vstakhov authored Feb 7, 2023
1 parent 4db6c12 commit ced7be6
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 19 deletions.
2 changes: 1 addition & 1 deletion orchestra/examples/duo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl<Context> Fortified {

#[orchestra(signal=SigSigSig, event=EvX, error=Yikes, gen=AllMessages)]
struct Duo<T, U, V, W> {
#[subsystem(consumes: MsgStrukt, sends: [Plinko], message_capacity: 32768)]
#[subsystem(consumes: MsgStrukt, sends: [Plinko], message_capacity: 32768, signal_capacity: 128)]
sub0: Awesome,

#[subsystem(blocking, consumes: Plinko, sends: [MsgStrukt])]
Expand Down
13 changes: 9 additions & 4 deletions orchestra/proc-macro/src/impl_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ pub(crate) fn impl_builder(info: &OrchestraInfo) -> proc_macro2::TokenStream {
let consumes = &info.consumes_without_wip();
let channel_name = &info.channel_names_without_wip(None);
let channel_name_unbounded = &info.channel_names_without_wip("_unbounded");
let channel_capacity = &info.channel_capacities_without_wip(info.message_channel_capacity);
let message_channel_capacity =
&info.message_channel_capacities_without_wip(info.message_channel_capacity);
let signal_channel_capacity =
&info.signal_channel_capacities_without_wip(info.signal_channel_capacity);

let channel_name_tx = &info.channel_names_without_wip("_tx");
let channel_name_unbounded_tx = &info.channel_names_without_wip("_unbounded_tx");
Expand Down Expand Up @@ -520,6 +523,8 @@ pub(crate) fn impl_builder(info: &OrchestraInfo) -> proc_macro2::TokenStream {
#spawner_where_clause,
{
/// Set the interconnecting signal channel capacity.
/// This will override both static overseer default, e.g. `overseer(signal_capacity=123,...)`,
/// **and** subsystem specific capacities, e.g. `subsystem(signal_capacity: 123,...)`.
pub fn signal_channel_capacity(mut self, capacity: usize) -> Self
{
self.signal_capacity = Some(capacity);
Expand All @@ -528,7 +533,7 @@ pub(crate) fn impl_builder(info: &OrchestraInfo) -> proc_macro2::TokenStream {

/// Set the interconnecting message channel capacities.
/// This will override both static overseer default, e.g. `overseer(message_capacity=123,...)`,
/// **and** subsystem specific capacities, e.g. `subsystem(message_capacity=123,...)`.
/// **and** subsystem specific capacities, e.g. `subsystem(message_capacity: 123,...)`.
pub fn message_channel_capacity(mut self, capacity: usize) -> Self
{
self.channel_capacity = Some(capacity);
Expand Down Expand Up @@ -581,7 +586,7 @@ pub(crate) fn impl_builder(info: &OrchestraInfo) -> proc_macro2::TokenStream {
#support_crate ::metered::channel::<
MessagePacket< #consumes >
>(
self.channel_capacity.unwrap_or(#channel_capacity)
self.channel_capacity.unwrap_or(#message_channel_capacity)
);
)*

Expand Down Expand Up @@ -624,7 +629,7 @@ pub(crate) fn impl_builder(info: &OrchestraInfo) -> proc_macro2::TokenStream {
#support_crate ::select_message_channel_strategy
);
let (signal_tx, signal_rx) = #support_crate ::metered::channel(
self.signal_capacity.unwrap_or(SIGNAL_CHANNEL_CAPACITY)
self.signal_capacity.unwrap_or(#signal_channel_capacity)
);

let ctx = #subsystem_ctx_name::< #consumes >::new(
Expand Down
70 changes: 56 additions & 14 deletions orchestra/proc-macro/src/parse/parse_orchestra_struct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use syn::{
parse::{Parse, ParseStream},
punctuated::Punctuated,
spanned::Spanned,
token,
token::Bracket,
AttrStyle, Error, Field, FieldsNamed, GenericParam, Ident, ItemStruct, LitInt, Path,
PathSegment, Result, Token, Type, Visibility,
Expand All @@ -34,6 +35,7 @@ mod kw {
syn::custom_keyword!(consumes);
syn::custom_keyword!(sends);
syn::custom_keyword!(message_capacity);
syn::custom_keyword!(signal_capacity);
}

#[derive(Clone, Debug)]
Expand All @@ -49,7 +51,9 @@ pub(crate) enum SubSysAttrItem {
/// Message to be consumed by this subsystem.
Consumes(Consumes),
/// Custom message channels capacity for this subsystem
MessageChannelCapacity(MessageCapacity),
MessageChannelCapacity(ChannelCapacity<kw::message_capacity>),
/// Custom signal channels capacity for this subsystem
SignalChannelCapacity(ChannelCapacity<kw::signal_capacity>),
}

impl Parse for SubSysAttrItem {
Expand All @@ -62,7 +66,9 @@ impl Parse for SubSysAttrItem {
} else if lookahead.peek(kw::sends) {
Self::Sends(input.parse::<Sends>()?)
} else if lookahead.peek(kw::message_capacity) {
Self::MessageChannelCapacity(input.parse::<MessageCapacity>()?)
Self::MessageChannelCapacity(input.parse::<ChannelCapacity<kw::message_capacity>>()?)
} else if lookahead.peek(kw::signal_capacity) {
Self::SignalChannelCapacity(input.parse::<ChannelCapacity<kw::signal_capacity>>()?)
} else {
Self::Consumes(input.parse::<Consumes>()?)
})
Expand All @@ -87,6 +93,9 @@ impl ToTokens for SubSysAttrItem {
Self::MessageChannelCapacity(_) => {
quote! {}
},
Self::SignalChannelCapacity(_) => {
quote! {}
},
};
tokens.extend(ts.into_iter());
}
Expand All @@ -113,8 +122,10 @@ pub(crate) struct SubSysField {
/// Avoids dispatching `Wrapper` type messages, but generates the variants.
/// Does not require the subsystem to be instantiated with the builder pattern.
pub(crate) wip: bool,
/// Custom message capacity
/// Custom message channel capacity
pub(crate) message_capacity: Option<usize>,
/// Custom signal channel capacity
pub(crate) signal_capacity: Option<usize>,
}

impl SubSysField {
Expand Down Expand Up @@ -300,18 +311,18 @@ impl Parse for Consumes {
}

#[derive(Debug, Clone)]
pub(crate) struct MessageCapacity {
pub(crate) struct ChannelCapacity<T: Parse> {
#[allow(dead_code)]
tag: kw::message_capacity,
tag: T,
#[allow(dead_code)]
colon_token: Token![:],
colon_token: token::Colon,
value: usize,
}

impl Parse for MessageCapacity {
impl<T: Parse> Parse for ChannelCapacity<T> {
fn parse(input: syn::parse::ParseStream) -> Result<Self> {
Ok(Self {
tag: input.parse::<kw::message_capacity>()?,
tag: input.parse::<T>()?,
colon_token: input.parse()?,
value: input.parse::<LitInt>()?.base10_parse::<usize>()?,
})
Expand All @@ -331,8 +342,10 @@ pub(crate) struct SubSystemAttrItems {
/// The message type being consumed by the subsystem.
pub(crate) consumes: Option<Consumes>,
pub(crate) sends: Option<Sends>,
/// Custom channel capacity
pub(crate) message_capacity: Option<MessageCapacity>,
/// Custom message channel capacity
pub(crate) message_capacity: Option<ChannelCapacity<kw::message_capacity>>,
/// Custom signal channel capacity
pub(crate) signal_capacity: Option<ChannelCapacity<kw::signal_capacity>>,
}

impl Parse for SubSystemAttrItems {
Expand Down Expand Up @@ -373,8 +386,9 @@ impl Parse for SubSystemAttrItems {
let blocking = extract_variant!(unique, Blocking; default = false);
let wip = extract_variant!(unique, Wip; default = false);
let message_capacity = extract_variant!(unique, MessageChannelCapacity take );
let signal_capacity = extract_variant!(unique, SignalChannelCapacity take );

Ok(Self { blocking, wip, sends, consumes, message_capacity })
Ok(Self { blocking, wip, sends, consumes, message_capacity, signal_capacity })
}
}

Expand Down Expand Up @@ -507,7 +521,10 @@ impl OrchestraInfo {
.collect::<Vec<_>>()
}

pub(crate) fn channel_capacities_without_wip(&self, default_capacity: usize) -> Vec<LitInt> {
pub(crate) fn message_channel_capacities_without_wip(
&self,
default_capacity: usize,
) -> Vec<LitInt> {
self.subsystems
.iter()
.filter(|ssf| !ssf.wip)
Expand All @@ -520,6 +537,22 @@ impl OrchestraInfo {
.collect::<Vec<_>>()
}

pub(crate) fn signal_channel_capacities_without_wip(
&self,
default_capacity: usize,
) -> Vec<LitInt> {
self.subsystems
.iter()
.filter(|ssf| !ssf.wip)
.map(|ssf| {
LitInt::new(
&(ssf.signal_capacity.unwrap_or(default_capacity).to_string()),
ssf.signal_capacity.span(),
)
})
.collect::<Vec<_>>()
}

pub(crate) fn consumes_without_wip(&self) -> Vec<Path> {
self.subsystems
.iter()
Expand Down Expand Up @@ -601,8 +634,15 @@ impl OrchestraGuts {
}
unique_subsystem_idents.insert(generic.clone());

let SubSystemAttrItems { wip, blocking, consumes, sends, message_capacity, .. } =
subsystem_attrs;
let SubSystemAttrItems {
wip,
blocking,
consumes,
sends,
message_capacity,
signal_capacity,
..
} = subsystem_attrs;

// messages to be sent
let sends = if let Some(sends) = sends {
Expand All @@ -612,6 +652,7 @@ impl OrchestraGuts {
};
let consumes = consumes.map(|consumes| consumes.consumes);
let message_capacity = message_capacity.map(|capacity| capacity.value);
let signal_capacity = signal_capacity.map(|capacity| capacity.value);

subsystems.push(SubSysField {
name: ident,
Expand All @@ -621,6 +662,7 @@ impl OrchestraGuts {
wip,
blocking,
message_capacity,
signal_capacity,
});
} else {
// collect the "baggage"
Expand Down

0 comments on commit ced7be6

Please sign in to comment.