Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

deps: bump Reth and use new notifications API #28

Merged
merged 2 commits into from
Oct 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
253 changes: 128 additions & 125 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion backfill/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl<Node: FullNodeComponents> BackfillExEx<Node> {
loop {
tokio::select! {
Some(notification) = self.ctx.notifications.next() => {
self.handle_notification(notification).await?;
self.handle_notification(notification?).await?;
}
Some(message) = self.backfill_rx.recv() => {
self.handle_backfill_message(message).await;
Expand Down
4 changes: 2 additions & 2 deletions discv5/src/exex/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use eyre::Result;
use futures::{Future, FutureExt, StreamExt};
use futures::{Future, FutureExt, TryStreamExt};
use reth_exex::{ExExContext, ExExEvent, ExExNotification};
use reth_node_api::FullNodeComponents;
use reth_tracing::tracing::{error, info};
Expand Down Expand Up @@ -45,7 +45,7 @@ impl<Node: FullNodeComponents> Future for ExEx<Node> {

// Continuously poll the ExExContext notifications
loop {
if let Some(notification) = ready!(self.exex.notifications.poll_next_unpin(cx)) {
if let Some(notification) = ready!(self.exex.notifications.try_next().poll_unpin(cx))? {
match &notification {
ExExNotification::ChainCommitted { new } => {
info!(committed_chain = ?new.range(), "Received commit");
Expand Down
4 changes: 2 additions & 2 deletions in-memory-state/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![warn(unused_crate_dependencies)]

use futures_util::StreamExt;
use futures_util::{FutureExt, TryStreamExt};
use reth_execution_types::ExecutionOutcome;
use reth_exex::{ExExContext, ExExEvent, ExExNotification};
use reth_node_api::FullNodeComponents;
Expand Down Expand Up @@ -33,7 +33,7 @@ impl<Node: FullNodeComponents + Unpin> Future for InMemoryStateExEx<Node> {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();

while let Some(notification) = ready!(this.ctx.notifications.poll_next_unpin(cx)) {
while let Some(notification) = ready!(this.ctx.notifications.try_next().poll_unpin(cx))? {
match &notification {
ExExNotification::ChainCommitted { new } => {
info!(committed_chain = ?new.range(), "Received commit");
Expand Down
4 changes: 2 additions & 2 deletions op-bridge/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use alloy_primitives::{address, Address};
use alloy_sol_types::{sol, SolEventInterface};
use futures::{Future, FutureExt, StreamExt};
use futures::{Future, FutureExt, TryStreamExt};
use reth_execution_types::Chain;
use reth_exex::{ExExContext, ExExEvent};
use reth_node_api::FullNodeComponents;
Expand Down Expand Up @@ -103,7 +103,7 @@ async fn op_bridge_exex<Node: FullNodeComponents>(
connection: Connection,
) -> eyre::Result<()> {
// Process all new chain state notifications
while let Some(notification) = ctx.notifications.next().await {
while let Some(notification) = ctx.notifications.try_next().await? {
// Revert all deposits and withdrawals
if let Some(reverted_chain) = notification.reverted_chain() {
let events = decode_chain_into_events(&reverted_chain);
Expand Down
4 changes: 2 additions & 2 deletions oracle/src/exex.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use eyre::Result;
use futures::{Future, StreamExt};
use futures::{Future, FutureExt, TryStreamExt};
use reth::providers::ExecutionOutcome;
use reth_exex::{ExExContext, ExExEvent, ExExNotification};
use reth_node_api::FullNodeComponents;
Expand Down Expand Up @@ -28,7 +28,7 @@ impl<Node: FullNodeComponents> Future for ExEx<Node> {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Continuously poll the ExExContext notifications
let this = self.get_mut();
while let Some(notification) = ready!(this.ctx.notifications.poll_next_unpin(cx)) {
while let Some(notification) = ready!(this.ctx.notifications.try_next().poll_unpin(cx))? {
match &notification {
ExExNotification::ChainCommitted { new } => {
info!(committed_chain = ?new.range(), "Received commit");
Expand Down
2 changes: 1 addition & 1 deletion remote/bin/exex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ async fn exex<Node: FullNodeComponents>(
mut ctx: ExExContext<Node>,
notifications: broadcast::Sender<ExExNotification>,
) -> eyre::Result<()> {
while let Some(notification) = ctx.notifications.next().await {
while let Some(notification) = ctx.notifications.try_next().await? {
if let Some(committed_chain) = notification.committed_chain() {
ctx.events.send(ExExEvent::FinishedHeight(committed_chain.tip().num_hash()))?;
}
Expand Down
4 changes: 2 additions & 2 deletions rollup/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use alloy_primitives::{address, Address, U256};
use alloy_sol_types::{sol, SolEventInterface, SolInterface};
use db::Database;
use execution::execute_block;
use futures_util::StreamExt;
use futures_util::TryStreamExt;
use once_cell::sync::Lazy;
use reth_chainspec::{ChainSpec, ChainSpecBuilder};
use reth_execution_types::Chain;
Expand Down Expand Up @@ -54,7 +54,7 @@ impl<Node: FullNodeComponents> Rollup<Node> {

async fn start(mut self) -> eyre::Result<()> {
// Process all new chain state notifications
while let Some(notification) = self.ctx.notifications.next().await {
while let Some(notification) = self.ctx.notifications.try_next().await? {
if let Some(reverted_chain) = notification.reverted_chain() {
self.revert(&reverted_chain)?;
}
Expand Down
Loading