Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(exex): commit notifications to WAL before sending to ExExes #11354

Merged
merged 38 commits into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from 35 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
ba0c655
feat(exex): commit notifications to WAL before sending to ExExes
shekhirin Sep 30, 2024
4d4e172
Merge remote-tracking branch 'origin/main' into alexey/exex-wal-commit
shekhirin Oct 1, 2024
806d133
report file path when reading fails
shekhirin Oct 1, 2024
c1fe123
add path to notification decode error
shekhirin Oct 1, 2024
989700b
revertme: use serde json again
shekhirin Oct 1, 2024
b311dd7
feat(trie): deserialize trie updates with serde as hex
shekhirin Oct 1, 2024
0444a48
Merge remote-tracking branch 'origin/alexey/trie-updates-serde' into …
shekhirin Oct 1, 2024
1d1f00f
increment next file id counter
shekhirin Oct 1, 2024
3da5cb3
add test
shekhirin Oct 1, 2024
4513124
Merge remote-tracking branch 'origin/alexey/exex-wal-storage-next-id'…
shekhirin Oct 1, 2024
9c3a2a4
use u32
shekhirin Oct 1, 2024
169235d
Merge remote-tracking branch 'origin/alexey/exex-wal-storage-next-id'…
shekhirin Oct 1, 2024
168a4a1
fix tracing targets
shekhirin Oct 1, 2024
bb08f5f
format with to string
shekhirin Oct 1, 2024
c62c64d
account for finalized header when finalizing the wal
shekhirin Oct 1, 2024
e028b70
feat(chain-state): notify about new safe/finalized only if modified
shekhirin Oct 1, 2024
54e57d3
Merge remote-tracking branch 'origin/alexey/chain-info-tracker-send-i…
shekhirin Oct 1, 2024
14ef5d4
do not crash when can't find canonical notification in WAL
shekhirin Oct 1, 2024
e3741f3
Revert "do not crash when can't find canonical notification in WAL"
shekhirin Oct 1, 2024
1ae103b
fix(exex): check exex head against node head to determine canonical
shekhirin Oct 2, 2024
3cfd0ba
Merge remote-tracking branch 'origin/alexey/exex-notifications-check-…
shekhirin Oct 2, 2024
71d2322
Merge remote-tracking branch 'origin/main' into alexey/exex-wal-commit
shekhirin Oct 2, 2024
f7c1be1
Revert "revertme: use serde json again"
shekhirin Oct 2, 2024
3b3e098
report notification id in not found error
shekhirin Oct 2, 2024
59c8da3
Merge remote-tracking branch 'origin/main' into alexey/exex-wal-commit
shekhirin Oct 2, 2024
0933f5e
Merge remote-tracking branch 'origin/main' into alexey/exex-wal-commit
shekhirin Oct 2, 2024
f2569dc
test that we commit to wal
shekhirin Oct 2, 2024
5eaaeb3
return instrument macro to remove notification
shekhirin Oct 2, 2024
6b191aa
no format_with pls
shekhirin Oct 2, 2024
35ac702
Revert "no format_with pls"
shekhirin Oct 2, 2024
cab49b9
explain format_with
shekhirin Oct 2, 2024
fbc4b43
remove unneeded log
shekhirin Oct 2, 2024
a271501
feat(exex): WAL metrics
shekhirin Oct 2, 2024
72f767e
Merge remote-tracking branch 'origin/alexey/exex-wal-metrics' into al…
shekhirin Oct 2, 2024
44e70e2
fix lowest/height metric
shekhirin Oct 2, 2024
cf19ae1
fix lowest/height metric
shekhirin Oct 2, 2024
9420e87
Merge remote-tracking branch 'origin/alexey/exex-wal-metrics' into al…
shekhirin Oct 2, 2024
743b0ac
Merge remote-tracking branch 'origin/main' into alexey/exex-wal-commit
shekhirin Oct 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 25 additions & 11 deletions crates/exex/exex/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ impl ExExHandle {
// I.e., the ExEx has already processed the notification.
if finished_height.number >= new.tip().number {
debug!(
target: "exex::manager",
exex_id = %self.id,
%notification_id,
?finished_height,
Expand All @@ -135,6 +136,7 @@ impl ExExHandle {
}

debug!(
target: "exex::manager",
exex_id = %self.id,
%notification_id,
"Reserving slot for notification"
Expand All @@ -145,6 +147,7 @@ impl ExExHandle {
}

debug!(
target: "exex::manager",
exex_id = %self.id,
%notification_id,
"Sending notification"
Expand All @@ -162,7 +165,7 @@ impl ExExHandle {

/// Metrics for the `ExEx` manager.
#[derive(Metrics)]
#[metrics(scope = "exex_manager")]
#[metrics(scope = "exex.manager")]
pub struct ExExManagerMetrics {
/// Max size of the internal state notifications buffer.
max_capacity: Gauge,
Expand Down Expand Up @@ -327,7 +330,7 @@ where
/// This function checks if all ExExes are on the canonical chain and finalizes the WAL if
/// necessary.
fn finalize_wal(&self, finalized_header: SealedHeader) -> eyre::Result<()> {
debug!(header = ?finalized_header.num_hash(), "Received finalized header");
debug!(target: "exex::manager", header = ?finalized_header.num_hash(), "Received finalized header");

// Check if all ExExes are on the canonical chain
let exex_finished_heights = self
Expand Down Expand Up @@ -368,9 +371,13 @@ where
is_canonical.not().then_some((exex_id, num_hash))
})
.format_with(", ", |(exex_id, num_hash), f| {
f(&format_args!("{exex_id:?} = {num_hash:?}"))
});
f(&format_args!("{exex_id} = {num_hash:?}"))
})
// We need this because `debug!` uses the argument twice when formatting the final
// log message, but the result of `format_with` can only be used once
.to_string();
debug!(
target: "exex::manager",
%unfinalized_exexes,
"Not all ExExes are on the canonical chain, can't finalize the WAL"
);
Expand Down Expand Up @@ -403,7 +410,7 @@ where
// Handle incoming ExEx events
for exex in &mut this.exex_handles {
while let Poll::Ready(Some(event)) = exex.receiver.poll_recv(cx) {
debug!(exex_id = %exex.id, ?event, "Received event from ExEx");
debug!(target: "exex::manager", exex_id = %exex.id, ?event, "Received event from ExEx");
exex.metrics.events_sent_total.increment(1);
match event {
ExExEvent::FinishedHeight(height) => exex.finished_height = Some(height),
Expand All @@ -424,10 +431,12 @@ where
while this.buffer.len() < this.max_capacity {
if let Poll::Ready(Some(notification)) = this.handle_rx.poll_recv(cx) {
debug!(
target: "exex::manager",
committed_tip = ?notification.committed_chain().map(|chain| chain.tip().number),
reverted_tip = ?notification.reverted_chain().map(|chain| chain.tip().number),
"Received new notification"
);
this.wal.commit(&notification)?;
this.push_notification(notification);
continue
}
Expand Down Expand Up @@ -459,7 +468,7 @@ where
}

// Remove processed buffered notifications
debug!(%min_id, "Updating lowest notification id in buffer");
debug!(target: "exex::manager", %min_id, "Updating lowest notification id in buffer");
this.buffer.retain(|&(id, _)| id >= min_id);
this.min_id = min_id;

Expand Down Expand Up @@ -602,7 +611,7 @@ mod tests {
use super::*;
use alloy_primitives::B256;
use eyre::OptionExt;
use futures::StreamExt;
use futures::{FutureExt, StreamExt};
use rand::Rng;
use reth_primitives::SealedBlockWithSenders;
use reth_provider::{test_utils::create_test_provider_factory, BlockWriter, Chain};
Expand Down Expand Up @@ -1121,7 +1130,7 @@ mod tests {
}

#[tokio::test]
async fn test_exex_wal_finalize() -> eyre::Result<()> {
async fn test_exex_wal() -> eyre::Result<()> {
reth_tracing::init_test_tracing();

let mut rng = generators::rng();
Expand All @@ -1141,12 +1150,11 @@ mod tests {
let notification = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(vec![block.clone()], Default::default(), None)),
};
wal.commit(&notification)?;

let (finalized_headers_tx, rx) = watch::channel(None);
let finalized_header_stream = ForkChoiceStream::new(rx);

let (exex_handle, events_tx, _) =
let (exex_handle, events_tx, mut notifications) =
ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle());

let mut exex_manager = std::pin::pin!(ExExManager::new(
Expand All @@ -1159,7 +1167,13 @@ mod tests {

let mut cx = Context::from_waker(futures::task::noop_waker_ref());

assert!(exex_manager.as_mut().poll(&mut cx).is_pending());
exex_manager.handle().send(notification.clone())?;

assert!(exex_manager.as_mut().poll(&mut cx)?.is_pending());
assert_eq!(
notifications.next().poll_unpin(&mut cx),
Poll::Ready(Some(notification.clone()))
);
assert_eq!(
exex_manager.wal.iter_notifications()?.collect::<eyre::Result<Vec<_>>>()?,
[notification.clone()]
Expand Down
6 changes: 2 additions & 4 deletions crates/exex/exex/src/notifications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,21 +223,19 @@ where
/// - ExEx is at the same block number as the node head (`node_head.number ==
/// exex_head.number`). Nothing to do.
fn check_backfill(&mut self) -> eyre::Result<()> {
debug!(target: "exex::manager", "Synchronizing ExEx head");

let backfill_job_factory =
BackfillJobFactory::new(self.executor.clone(), self.provider.clone());
match self.exex_head.block.number.cmp(&self.node_head.number) {
std::cmp::Ordering::Less => {
// ExEx is behind the node head, start backfill
debug!(target: "exex::manager", "ExEx is behind the node head and on the canonical chain, starting backfill");
debug!(target: "exex::notifications", "ExEx is behind the node head and on the canonical chain, starting backfill");
let backfill = backfill_job_factory
.backfill(self.exex_head.block.number + 1..=self.node_head.number)
.into_stream();
self.backfill_job = Some(backfill);
}
std::cmp::Ordering::Equal => {
debug!(target: "exex::manager", "ExEx is at the node head");
debug!(target: "exex::notifications", "ExEx is at the node head");
}
std::cmp::Ordering::Greater => {
return Err(eyre::eyre!("ExEx is ahead of the node head"))
Expand Down
48 changes: 41 additions & 7 deletions crates/exex/exex/src/wal/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,25 @@ use reth_exex_types::ExExNotification;
#[derive(Debug, Default)]
pub struct BlockCache {
/// A min heap of `(Block Number, File ID)` tuples.
pub(super) blocks: BinaryHeap<Reverse<(BlockNumber, u32)>>,
///
/// Contains one highest block in notification. In a notification with both committed and
/// reverted chain, the highest block is chosen between both chains.
pub(super) notification_max_blocks: BinaryHeap<Reverse<(BlockNumber, u32)>>,
/// A mapping of committed blocks `Block Hash -> Block`.
///
/// For each [`ExExNotification::ChainCommitted`] notification, there will be an entry per
/// block.
pub(super) committed_blocks: FbHashMap<32, (u32, CachedBlock)>,
/// Block height of the lowest committed block currently in the cache.
pub(super) lowest_committed_block_height: Option<BlockNumber>,
/// Block height of the highest committed block currently in the cache.
pub(super) highest_committed_block_height: Option<BlockNumber>,
}

impl BlockCache {
/// Returns `true` if the cache is empty.
pub(super) fn is_empty(&self) -> bool {
self.blocks.is_empty()
self.notification_max_blocks.is_empty()
}

/// Removes all files from the cache that has notifications with a tip block less than or equal
Expand All @@ -37,17 +44,37 @@ impl BlockCache {
pub(super) fn remove_before(&mut self, block_number: BlockNumber) -> HashSet<u32> {
let mut file_ids = HashSet::default();

while let Some(block @ Reverse((max_block, file_id))) = self.blocks.peek().copied() {
while let Some(block @ Reverse((max_block, file_id))) =
self.notification_max_blocks.peek().copied()
{
if max_block <= block_number {
let popped_block = self.blocks.pop().unwrap();
let popped_block = self.notification_max_blocks.pop().unwrap();
debug_assert_eq!(popped_block, block);
file_ids.insert(file_id);
} else {
break
}
}

self.committed_blocks.retain(|_, (file_id, _)| !file_ids.contains(file_id));
let (mut lowest_committed_block_height, mut highest_committed_block_height) = (None, None);
self.committed_blocks.retain(|_, (file_id, block)| {
let retain = !file_ids.contains(file_id);

if retain {
lowest_committed_block_height = Some(
lowest_committed_block_height
.map_or(block.block.number, |lowest| block.block.number.min(lowest)),
);
highest_committed_block_height = Some(
highest_committed_block_height
.map_or(block.block.number, |highest| block.block.number.max(highest)),
);
}

retain
});
self.lowest_committed_block_height = lowest_committed_block_height;
self.highest_committed_block_height = highest_committed_block_height;

file_ids
}
Expand All @@ -70,7 +97,7 @@ impl BlockCache {
let max_block =
reverted_chain.iter().chain(&committed_chain).map(|chain| chain.tip().number).max();
if let Some(max_block) = max_block {
self.blocks.push(Reverse((max_block, file_id)));
self.notification_max_blocks.push(Reverse((max_block, file_id)));
}

if let Some(committed_chain) = &committed_chain {
Expand All @@ -81,12 +108,19 @@ impl BlockCache {
};
self.committed_blocks.insert(block.hash(), (file_id, cached_block));
}

self.highest_committed_block_height = Some(committed_chain.tip().number);
}
}

#[cfg(test)]
pub(super) fn blocks_sorted(&self) -> Vec<(BlockNumber, u32)> {
self.blocks.clone().into_sorted_vec().into_iter().map(|entry| entry.0).collect()
self.notification_max_blocks
.clone()
.into_sorted_vec()
.into_iter()
.map(|entry| entry.0)
.collect()
}

#[cfg(test)]
Expand Down
18 changes: 18 additions & 0 deletions crates/exex/exex/src/wal/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
use metrics::Gauge;
use reth_metrics::Metrics;

/// Metrics for the [WAL](`super::Wal`)
#[derive(Metrics)]
#[metrics(scope = "exex.wal")]
pub(super) struct Metrics {
/// Size of all notifications in WAL in bytes
pub size_bytes: Gauge,
/// Total number of notifications in WAL
pub notifications_total: Gauge,
/// Total number of committed blocks in WAL
pub committed_blocks_total: Gauge,
/// Lowest committed block height in WAL
pub lowest_committed_block_height: Gauge,
/// Highest committed block height in WAL
pub highest_committed_block_height: Gauge,
}
Loading
Loading