Skip to content

Commit

Permalink
feat: subscribe to finalized and safe headers (#9402)
Browse files Browse the repository at this point in the history
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
  • Loading branch information
greged93 and mattsse authored Jul 15, 2024
1 parent b1da0f7 commit e140421
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 3 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/storage/provider/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ metrics.workspace = true

# misc
auto_impl.workspace = true
derive_more.workspace = true
itertools.workspace = true
pin-project.workspace = true
parking_lot.workspace = true
Expand Down
46 changes: 44 additions & 2 deletions crates/storage/provider/src/traits/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

use crate::{BlockReceipts, Chain};
use auto_impl::auto_impl;
use reth_primitives::SealedBlockWithSenders;
use derive_more::{Deref, DerefMut};
use reth_primitives::{SealedBlockWithSenders, SealedHeader};
use std::{
pin::Pin,
sync::Arc,
Expand Down Expand Up @@ -34,7 +35,7 @@ pub trait CanonStateSubscriptions: Send + Sync {
}
}

/// A Stream of [CanonStateNotification].
/// A Stream of [`CanonStateNotification`].
#[derive(Debug)]
#[pin_project::pin_project]
pub struct CanonStateNotificationStream {
Expand Down Expand Up @@ -139,3 +140,44 @@ impl CanonStateNotification {
receipts
}
}

/// Wrapper around a broadcast receiver that receives fork choice notifications.
#[derive(Debug, Deref, DerefMut)]
pub struct ForkChoiceNotifications(broadcast::Receiver<SealedHeader>);

/// A trait that allows to register to fork choice related events
/// and get notified when a new fork choice is available.
pub trait ForkChoiceSubscriptions: Send + Sync {
/// Get notified when a new head of the chain is selected.
fn subscribe_to_fork_choice(&self) -> ForkChoiceNotifications;

/// Convenience method to get a stream of the new head of the chain.
fn fork_choice_stream(&self) -> ForkChoiceStream {
ForkChoiceStream { st: BroadcastStream::new(self.subscribe_to_fork_choice().0) }
}
}

/// A stream of the fork choices in the form of [`SealedHeader`].
#[derive(Debug)]
#[pin_project::pin_project]
pub struct ForkChoiceStream {
#[pin]
st: BroadcastStream<SealedHeader>,
}

impl Stream for ForkChoiceStream {
type Item = SealedHeader;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
return match ready!(self.as_mut().project().st.poll_next(cx)) {
Some(Ok(notification)) => Poll::Ready(Some(notification)),
Some(Err(err)) => {
debug!(%err, "finalized header notification stream lagging behind");
continue
}
None => Poll::Ready(None),
};
}
}
}
3 changes: 2 additions & 1 deletion crates/storage/provider/src/traits/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ pub use state::StateWriter;
mod chain;
pub use chain::{
CanonStateNotification, CanonStateNotificationSender, CanonStateNotificationStream,
CanonStateNotifications, CanonStateSubscriptions,
CanonStateNotifications, CanonStateSubscriptions, ForkChoiceNotifications, ForkChoiceStream,
ForkChoiceSubscriptions,
};

mod spec;
Expand Down

0 comments on commit e140421

Please sign in to comment.