Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow to specify per-subsystem signal channel size #29

Merged
merged 7 commits into from
Feb 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion orchestra/examples/duo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl<Context> Fortified {

#[orchestra(signal=SigSigSig, event=EvX, error=Yikes, gen=AllMessages)]
struct Duo<T, U, V, W> {
#[subsystem(consumes: MsgStrukt, sends: [Plinko], message_capacity: 32768)]
#[subsystem(consumes: MsgStrukt, sends: [Plinko], message_capacity: 32768, signal_capacity: 128)]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would add test for the overrides(ensure we block when channel full) to ensure it doesn't break over time.

sub0: Awesome,

#[subsystem(blocking, consumes: Plinko, sends: [MsgStrukt])]
Expand Down
13 changes: 9 additions & 4 deletions orchestra/proc-macro/src/impl_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ pub(crate) fn impl_builder(info: &OrchestraInfo) -> proc_macro2::TokenStream {
let consumes = &info.consumes_without_wip();
let channel_name = &info.channel_names_without_wip(None);
let channel_name_unbounded = &info.channel_names_without_wip("_unbounded");
let channel_capacity = &info.channel_capacities_without_wip(info.message_channel_capacity);
let message_channel_capacity =
&info.message_channel_capacities_without_wip(info.message_channel_capacity);
let signal_channel_capacity =
&info.signal_channel_capacities_without_wip(info.signal_channel_capacity);

let channel_name_tx = &info.channel_names_without_wip("_tx");
let channel_name_unbounded_tx = &info.channel_names_without_wip("_unbounded_tx");
Expand Down Expand Up @@ -520,6 +523,8 @@ pub(crate) fn impl_builder(info: &OrchestraInfo) -> proc_macro2::TokenStream {
#spawner_where_clause,
{
/// Set the interconnecting signal channel capacity.
/// This will override both static overseer default, e.g. `overseer(signal_capacity=123,...)`,
/// **and** subsystem specific capacities, e.g. `subsystem(signal_capacity: 123,...)`.
pub fn signal_channel_capacity(mut self, capacity: usize) -> Self
{
self.signal_capacity = Some(capacity);
Expand All @@ -528,7 +533,7 @@ pub(crate) fn impl_builder(info: &OrchestraInfo) -> proc_macro2::TokenStream {

/// Set the interconnecting message channel capacities.
/// This will override both static overseer default, e.g. `overseer(message_capacity=123,...)`,
/// **and** subsystem specific capacities, e.g. `subsystem(message_capacity=123,...)`.
/// **and** subsystem specific capacities, e.g. `subsystem(message_capacity: 123,...)`.
pub fn message_channel_capacity(mut self, capacity: usize) -> Self
{
self.channel_capacity = Some(capacity);
Expand Down Expand Up @@ -581,7 +586,7 @@ pub(crate) fn impl_builder(info: &OrchestraInfo) -> proc_macro2::TokenStream {
#support_crate ::metered::channel::<
MessagePacket< #consumes >
>(
self.channel_capacity.unwrap_or(#channel_capacity)
self.channel_capacity.unwrap_or(#message_channel_capacity)
);
)*

Expand Down Expand Up @@ -624,7 +629,7 @@ pub(crate) fn impl_builder(info: &OrchestraInfo) -> proc_macro2::TokenStream {
#support_crate ::select_message_channel_strategy
);
let (signal_tx, signal_rx) = #support_crate ::metered::channel(
self.signal_capacity.unwrap_or(SIGNAL_CHANNEL_CAPACITY)
self.signal_capacity.unwrap_or(#signal_channel_capacity)
);

let ctx = #subsystem_ctx_name::< #consumes >::new(
Expand Down
70 changes: 56 additions & 14 deletions orchestra/proc-macro/src/parse/parse_orchestra_struct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use syn::{
parse::{Parse, ParseStream},
punctuated::Punctuated,
spanned::Spanned,
token,
token::Bracket,
AttrStyle, Error, Field, FieldsNamed, GenericParam, Ident, ItemStruct, LitInt, Path,
PathSegment, Result, Token, Type, Visibility,
Expand All @@ -34,6 +35,7 @@ mod kw {
syn::custom_keyword!(consumes);
syn::custom_keyword!(sends);
syn::custom_keyword!(message_capacity);
syn::custom_keyword!(signal_capacity);
}

#[derive(Clone, Debug)]
Expand All @@ -49,7 +51,9 @@ pub(crate) enum SubSysAttrItem {
/// Message to be consumed by this subsystem.
Consumes(Consumes),
/// Custom message channels capacity for this subsystem
MessageChannelCapacity(MessageCapacity),
MessageChannelCapacity(ChannelCapacity<kw::message_capacity>),
/// Custom signal channels capacity for this subsystem
SignalChannelCapacity(ChannelCapacity<kw::signal_capacity>),
}

impl Parse for SubSysAttrItem {
Expand All @@ -62,7 +66,9 @@ impl Parse for SubSysAttrItem {
} else if lookahead.peek(kw::sends) {
Self::Sends(input.parse::<Sends>()?)
} else if lookahead.peek(kw::message_capacity) {
Self::MessageChannelCapacity(input.parse::<MessageCapacity>()?)
Self::MessageChannelCapacity(input.parse::<ChannelCapacity<kw::message_capacity>>()?)
} else if lookahead.peek(kw::signal_capacity) {
Self::SignalChannelCapacity(input.parse::<ChannelCapacity<kw::signal_capacity>>()?)
} else {
Self::Consumes(input.parse::<Consumes>()?)
})
Expand All @@ -87,6 +93,9 @@ impl ToTokens for SubSysAttrItem {
Self::MessageChannelCapacity(_) => {
quote! {}
},
Self::SignalChannelCapacity(_) => {
quote! {}
},
};
tokens.extend(ts.into_iter());
}
Expand All @@ -113,8 +122,10 @@ pub(crate) struct SubSysField {
/// Avoids dispatching `Wrapper` type messages, but generates the variants.
/// Does not require the subsystem to be instantiated with the builder pattern.
pub(crate) wip: bool,
/// Custom message capacity
/// Custom message channel capacity
pub(crate) message_capacity: Option<usize>,
/// Custom signal channel capacity
pub(crate) signal_capacity: Option<usize>,
}

impl SubSysField {
Expand Down Expand Up @@ -300,18 +311,18 @@ impl Parse for Consumes {
}

#[derive(Debug, Clone)]
pub(crate) struct MessageCapacity {
pub(crate) struct ChannelCapacity<T: Parse> {
#[allow(dead_code)]
tag: kw::message_capacity,
tag: T,
#[allow(dead_code)]
colon_token: Token![:],
colon_token: token::Colon,
value: usize,
}

impl Parse for MessageCapacity {
impl<T: Parse> Parse for ChannelCapacity<T> {
fn parse(input: syn::parse::ParseStream) -> Result<Self> {
Ok(Self {
tag: input.parse::<kw::message_capacity>()?,
tag: input.parse::<T>()?,
colon_token: input.parse()?,
value: input.parse::<LitInt>()?.base10_parse::<usize>()?,
})
Expand All @@ -331,8 +342,10 @@ pub(crate) struct SubSystemAttrItems {
/// The message type being consumed by the subsystem.
pub(crate) consumes: Option<Consumes>,
pub(crate) sends: Option<Sends>,
/// Custom channel capacity
pub(crate) message_capacity: Option<MessageCapacity>,
/// Custom message channel capacity
pub(crate) message_capacity: Option<ChannelCapacity<kw::message_capacity>>,
/// Custom signal channel capacity
pub(crate) signal_capacity: Option<ChannelCapacity<kw::signal_capacity>>,
}

impl Parse for SubSystemAttrItems {
Expand Down Expand Up @@ -373,8 +386,9 @@ impl Parse for SubSystemAttrItems {
let blocking = extract_variant!(unique, Blocking; default = false);
let wip = extract_variant!(unique, Wip; default = false);
let message_capacity = extract_variant!(unique, MessageChannelCapacity take );
let signal_capacity = extract_variant!(unique, SignalChannelCapacity take );

Ok(Self { blocking, wip, sends, consumes, message_capacity })
Ok(Self { blocking, wip, sends, consumes, message_capacity, signal_capacity })
}
}

Expand Down Expand Up @@ -507,7 +521,10 @@ impl OrchestraInfo {
.collect::<Vec<_>>()
}

pub(crate) fn channel_capacities_without_wip(&self, default_capacity: usize) -> Vec<LitInt> {
pub(crate) fn message_channel_capacities_without_wip(
&self,
default_capacity: usize,
) -> Vec<LitInt> {
self.subsystems
.iter()
.filter(|ssf| !ssf.wip)
Expand All @@ -520,6 +537,22 @@ impl OrchestraInfo {
.collect::<Vec<_>>()
}

pub(crate) fn signal_channel_capacities_without_wip(
&self,
default_capacity: usize,
) -> Vec<LitInt> {
self.subsystems
.iter()
.filter(|ssf| !ssf.wip)
.map(|ssf| {
LitInt::new(
&(ssf.signal_capacity.unwrap_or(default_capacity).to_string()),
ssf.signal_capacity.span(),
)
})
.collect::<Vec<_>>()
}

pub(crate) fn consumes_without_wip(&self) -> Vec<Path> {
self.subsystems
.iter()
Expand Down Expand Up @@ -601,8 +634,15 @@ impl OrchestraGuts {
}
unique_subsystem_idents.insert(generic.clone());

let SubSystemAttrItems { wip, blocking, consumes, sends, message_capacity, .. } =
subsystem_attrs;
let SubSystemAttrItems {
wip,
blocking,
consumes,
sends,
message_capacity,
signal_capacity,
..
} = subsystem_attrs;

// messages to be sent
let sends = if let Some(sends) = sends {
Expand All @@ -612,6 +652,7 @@ impl OrchestraGuts {
};
let consumes = consumes.map(|consumes| consumes.consumes);
let message_capacity = message_capacity.map(|capacity| capacity.value);
let signal_capacity = signal_capacity.map(|capacity| capacity.value);

subsystems.push(SubSysField {
name: ident,
Expand All @@ -621,6 +662,7 @@ impl OrchestraGuts {
wip,
blocking,
message_capacity,
signal_capacity,
});
} else {
// collect the "baggage"
Expand Down