-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: subscribe to finalized and safe headers #9402
feat: subscribe to finalized and safe headers #9402
Conversation
|
||
/// A type that allows to register finalized block related event subscriptions | ||
/// and get notified when a new finalized block header is available. | ||
pub trait FinalizedBlockHeaderSubscription: Send + Sync { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's combine safe and finalized into one trait
maybe we can come up with a name that isn't also tied to headers, essentially these are derived from forkchoice, so perhaps Forkchoice(Block)Subscriptions
/// and get notified when a new finalized block header is available. | ||
pub trait FinalizedBlockHeaderSubscription: Send + Sync { | ||
/// Get notified when a new finalized block header is available. | ||
fn subscribe_to_finalized_header(&self) -> broadcast::Receiver<SealedHeader>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd like to roll a standalone wrapper type for this, because perhaps we want to put some more features into this,
I know we didn't do this for CanonStateNotifications
but we should have
7534153
to
f13dbbc
Compare
@@ -139,3 +139,44 @@ impl CanonStateNotification { | |||
receipts | |||
} | |||
} | |||
|
|||
/// Wrapper around a broadcast receiver that receives fork choice notifications. | |||
#[derive(Debug)] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we please derive derive_more::Deref
and derive_more::DerefMut
for wrapper types
Some(Err(err)) => { | ||
debug!(%err, "finalized header notification stream lagging behind"); | ||
continue | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some(Err(err)) => { | |
debug!(%err, "finalized header notification stream lagging behind"); | |
continue | |
} | |
Some(Err(err)) => { | |
debug!(target: "provider::fcu", | |
%err, | |
"finalized header notification stream lagging behind" | |
); | |
cx.waker().wake_by_ref(); | |
return Poll::Pending | |
} |
if we don't return the error here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should I also modify CanonStateNotificationStream
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pushed without for now, if needed I can make a follow up PR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, please modify CanonStateNotificationStream
in a new PR, you can ref this comment in the PR description
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks!
951b89a
to
278a19f
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
last nit
return match ready!(self.as_mut().project().st.poll_next(cx)) { | ||
Some(Ok(notification)) => Poll::Ready(Some(notification)), | ||
Some(Err(err)) => { | ||
debug!(target: "provider::fcu", | ||
%err, | ||
"finalized header notification stream lagging behind" | ||
); | ||
|
||
cx.waker().wake_by_ref(); | ||
Poll::Pending | ||
} | ||
None => Poll::Ready(None), | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should be in a loop
then we don't need the manual waker call
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
made this update from a comment from @emhane, afaiu it's better to remove the loop, call the waker and yield to the executor no? can't you otherwise end up with the stream taking up excessive resources if it keeps erroring?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
exactly, it can lead to starvation (for no need)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove the loop, call the waker and yield to the executor no?
this is effectively a loop with extra steps
return match ready!(self.as_mut().project().st.poll_next(cx)) { | ||
Some(Ok(notification)) => Poll::Ready(Some(notification)), | ||
Some(Err(err)) => { | ||
debug!(target: "provider::fcu", | ||
%err, | ||
"finalized header notification stream lagging behind" | ||
); | ||
|
||
cx.waker().wake_by_ref(); | ||
Poll::Pending | ||
} | ||
None => Poll::Ready(None), | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
exactly, it can lead to starvation (for no need)
@@ -139,3 +139,47 @@ impl CanonStateNotification { | |||
receipts | |||
} | |||
} | |||
|
|||
/// Wrapper around a broadcast receiver that receives fork choice notifications. | |||
#[derive(Debug, derive_more::Deref, derive_more::DerefMut)] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Deref
and DerefMut
can be brought into scope in the prelude
Resolves #9269.