Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(exex): write ahead log #10995

Merged
merged 39 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
213bcea
feat(exex): write ahead log
shekhirin Sep 18, 2024
2381ea8
more comments, minor improvements
shekhirin Sep 18, 2024
aaa11a4
do not destroy old file on finalization until the very end
shekhirin Sep 19, 2024
d0a666d
more comments
shekhirin Sep 19, 2024
d537c4d
deserialize into notification
shekhirin Sep 19, 2024
b0965a0
include serde feature
shekhirin Sep 19, 2024
c809d89
spell
shekhirin Sep 19, 2024
d45da30
clarify why cache
shekhirin Sep 19, 2024
9d4a450
logggggggs
shekhirin Sep 19, 2024
75f2572
more comments (i need to start writing tests)
shekhirin Sep 19, 2024
117afef
oof a lot of changes but i fixed the format and added tests
shekhirin Sep 19, 2024
b7c71d9
add dedicated functions for read/write
shekhirin Sep 19, 2024
992d5ce
reorganize tests
shekhirin Sep 19, 2024
a7f3173
finalization test
shekhirin Sep 19, 2024
7a1603c
dedup file creation
shekhirin Sep 19, 2024
857ccc1
fill the block cache on rollback, also test rollback
shekhirin Sep 19, 2024
8272b85
remove unused deps
shekhirin Sep 19, 2024
ce63845
add comments to tests
shekhirin Sep 19, 2024
bee7beb
adjust the default tx count for random block
shekhirin Sep 19, 2024
b05bb87
separate wal module
shekhirin Sep 20, 2024
68ce3b4
revert two unrelated changes
shekhirin Sep 20, 2024
d80c196
add a test for eviction
shekhirin Sep 20, 2024
fce46d1
fix read/write in finalize
shekhirin Sep 20, 2024
2f390f0
move storage to a separate module
shekhirin Sep 20, 2024
fa17c65
comments
shekhirin Sep 20, 2024
23ca02c
WAL file -> storage
shekhirin Sep 20, 2024
1b79e58
oops forgot to add storage.rs
shekhirin Sep 20, 2024
108981f
return blocks on revert
shekhirin Sep 20, 2024
10c8a11
wip rework
shekhirin Sep 20, 2024
3f082c7
okay tests pass
shekhirin Sep 23, 2024
165ebbc
improve comments
shekhirin Sep 23, 2024
35b44fc
more comments improvements, renaming of offset to file id
shekhirin Sep 23, 2024
93cfa09
selector -> range
shekhirin Sep 23, 2024
92bee6c
delete notification from cache in one go
shekhirin Sep 23, 2024
6afd34e
explain why cache is needed
shekhirin Sep 23, 2024
fc72477
do not read and decode notifications on finalization
shekhirin Sep 23, 2024
34f49c6
adjust doc comment
shekhirin Sep 23, 2024
ba279f3
remove min/max id from storage
shekhirin Sep 23, 2024
bf5cd2b
use serde_json
shekhirin Sep 24, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion crates/exex/exex/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ workspace = true
reth-chainspec.workspace = true
reth-config.workspace = true
reth-evm.workspace = true
reth-exex-types.workspace = true
reth-exex-types = { workspace = true, features = ["serde"] }
reth-fs-util.workspace = true
reth-metrics.workspace = true
reth-node-api.workspace = true
reth-node-core.workspace = true
Expand All @@ -38,6 +39,8 @@ tokio.workspace = true
## misc
eyre.workspace = true
metrics.workspace = true
rmp-serde = "1.3"
tracing.workspace = true

[dev-dependencies]
reth-blockchain-tree.workspace = true
Expand All @@ -48,8 +51,10 @@ reth-node-api.workspace = true
reth-primitives-traits = { workspace = true, features = ["test-utils"] }
reth-provider = { workspace = true, features = ["test-utils"] }
reth-testing-utils.workspace = true
reth-trie.workspace = true

secp256k1.workspace = true
tempfile.workspace = true

[features]
default = []
Expand Down
2 changes: 2 additions & 0 deletions crates/exex/exex/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ pub use event::*;
mod manager;
pub use manager::*;

mod wal;

// Re-export exex types
#[doc(inline)]
pub use reth_exex_types::*;
34 changes: 16 additions & 18 deletions crates/exex/exex/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use tokio::sync::{
mpsc::{self, error::SendError, Receiver, UnboundedReceiver, UnboundedSender},
watch,
};
use tokio_util::sync::{PollSendError, PollSender, ReusableBoxFuture};
use tokio_util::sync::{PollSender, ReusableBoxFuture};

/// Metrics for an `ExEx`.
#[derive(Metrics)]
Expand Down Expand Up @@ -99,7 +99,7 @@ impl ExExHandle {
&mut self,
cx: &mut Context<'_>,
(notification_id, notification): &(usize, ExExNotification),
) -> Poll<Result<(), PollSendError<ExExNotification>>> {
) -> Poll<eyre::Result<()>> {
if let Some(finished_height) = self.finished_height {
match notification {
ExExNotification::ChainCommitted { new } => {
Expand Down Expand Up @@ -132,24 +132,25 @@ impl ExExHandle {
%notification_id,
"Reserving slot for notification"
);
match self.sender.poll_reserve(cx) {
Poll::Ready(Ok(())) => (),
other => return other,
}
ready!(self.sender.poll_reserve(cx)?);

// debug!(
// exex_id = %self.id,
// %notification_id,
// "Appending notification to WAL"
// );
// self.wal.commit(notification)?;

debug!(
exex_id = %self.id,
%notification_id,
"Sending notification"
);
match self.sender.send_item(notification.clone()) {
Ok(()) => {
self.next_notification_id = notification_id + 1;
self.metrics.notifications_sent_total.increment(1);
Poll::Ready(Ok(()))
}
Err(err) => Poll::Ready(Err(err)),
}
self.sender.send_item(notification.clone())?;
self.next_notification_id = notification_id + 1;
self.metrics.notifications_sent_total.increment(1);

Poll::Ready(Ok(()))
}
}

Expand Down Expand Up @@ -647,10 +648,7 @@ impl Future for ExExManager {
.checked_sub(self.min_id)
.expect("exex expected notification ID outside the manager's range");
if let Some(notification) = self.buffer.get(notification_index) {
if let Poll::Ready(Err(err)) = exex.send(cx, notification) {
// the channel was closed, which is irrecoverable for the manager
return Poll::Ready(Err(err.into()))
}
let _ = exex.send(cx, notification)?;
}
min_id = min_id.min(exex.next_notification_id);
self.exex_handles.push(exex);
Expand Down
Loading
Loading