Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(merkle-tree): Fix connection timeouts during tree pruning #2372

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/node/consensus/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl PayloadQueue {

/// Advances the cursor by converting the block into actions and pushing them
/// to the actions queue.
/// Does nothing and returns Ok() if the block has been already processed.
/// Does nothing and returns `Ok(())` if the block has been already processed.
/// Returns an error if a block with an earlier block number was expected.
pub(crate) async fn send(&mut self, block: FetchedBlock) -> anyhow::Result<()> {
let want = self.inner.next_l2_block;
Expand All @@ -53,7 +53,7 @@ impl PayloadQueue {
if block.number < want {
return Ok(());
}
self.actions.push_actions(self.inner.advance(block)).await;
self.actions.push_actions(self.inner.advance(block)).await?;
Ok(())
}
}
4 changes: 2 additions & 2 deletions core/node/consensus/src/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,15 +260,15 @@ impl StateKeeper {
actions.push(FetchedTransaction::new(tx).into());
}
actions.push(SyncAction::SealL2Block);
self.actions_sender.push_actions(actions).await;
self.actions_sender.push_actions(actions).await.unwrap();
}

/// Pushes `SealBatch` command to the `StateKeeper`.
pub async fn seal_batch(&mut self) {
// Each batch ends with an empty block (aka fictive block).
let mut actions = vec![self.open_block()];
actions.push(SyncAction::SealBatch);
self.actions_sender.push_actions(actions).await;
self.actions_sender.push_actions(actions).await.unwrap();
self.batch_sealed = true;
}

Expand Down
49 changes: 29 additions & 20 deletions core/node/metadata_calculator/src/updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,26 +88,30 @@ impl TreeUpdater {
/// is slow for whatever reason.
async fn process_multiple_batches(
&mut self,
storage: &mut Connection<'_, Core>,
pool: &ConnectionPool<Core>,
l1_batch_numbers: ops::RangeInclusive<u32>,
) -> anyhow::Result<L1BatchNumber> {
let tree_mode = self.tree.mode();
let start = Instant::now();
tracing::info!("Processing L1 batches #{l1_batch_numbers:?} in {tree_mode:?} mode");
let mut storage = pool.connection_tagged("metadata_calculator").await?;
let first_l1_batch_number = L1BatchNumber(*l1_batch_numbers.start());
let last_l1_batch_number = L1BatchNumber(*l1_batch_numbers.end());
let mut l1_batch_data = L1BatchWithLogs::new(storage, first_l1_batch_number, tree_mode)
.await
.with_context(|| {
format!("failed fetching tree input for L1 batch #{first_l1_batch_number}")
})?;
let mut l1_batch_data =
L1BatchWithLogs::new(&mut storage, first_l1_batch_number, tree_mode)
.await
.with_context(|| {
format!("failed fetching tree input for L1 batch #{first_l1_batch_number}")
})?;
drop(storage);

let mut total_logs = 0;
let mut updated_headers = vec![];
for l1_batch_number in l1_batch_numbers {
let mut storage = pool.connection_tagged("metadata_calculator").await?;
let l1_batch_number = L1BatchNumber(l1_batch_number);
let Some(current_l1_batch_data) = l1_batch_data else {
Self::ensure_not_pruned(storage, l1_batch_number).await?;
Self::ensure_not_pruned(&mut storage, l1_batch_number).await?;
return Ok(l1_batch_number);
};
total_logs += current_l1_batch_data.storage_logs.len();
Expand All @@ -116,13 +120,14 @@ impl TreeUpdater {
let load_next_l1_batch_task = async {
if l1_batch_number < last_l1_batch_number {
let next_l1_batch_number = l1_batch_number + 1;
L1BatchWithLogs::new(storage, next_l1_batch_number, tree_mode)
.await
.with_context(|| {
format!(
"failed fetching tree input for L1 batch #{next_l1_batch_number}"
)
})
let batch_result =
L1BatchWithLogs::new(&mut storage, next_l1_batch_number, tree_mode).await;
// Drop storage at the earliest possible moment so that it doesn't block logic running concurrently,
// such as tree pruning.
drop(storage);
batch_result.with_context(|| {
format!("failed fetching tree input for L1 batch #{next_l1_batch_number}")
})
} else {
Ok(None) // Don't need to load the next L1 batch after the last one we're processing.
}
Expand All @@ -135,11 +140,12 @@ impl TreeUpdater {
hash: metadata.root_hash,
rollup_last_leaf_index: metadata.rollup_last_leaf_index,
};

let mut storage = pool.connection_tagged("metadata_calculator").await?;
storage
.blocks_dal()
.save_l1_batch_tree_data(l1_batch_number, &tree_data)
.await
.context("failed saving tree data")?;
.await?;
// ^ Note that `save_l1_batch_tree_data()` will not blindly overwrite changes if L1 batch
// metadata already exists; instead, it'll check that the old and new metadata match.
// That is, if we run multiple tree instances, we'll get metadata correspondence
Expand All @@ -156,6 +162,7 @@ impl TreeUpdater {
.insert_proof_generation_details(l1_batch_number, object_key)
.await?;
}
drop(storage);
save_postgres_latency.observe();
tracing::info!("Updated metadata for L1 batch #{l1_batch_number} in Postgres");

Expand Down Expand Up @@ -187,9 +194,10 @@ impl TreeUpdater {

async fn step(
&mut self,
mut storage: Connection<'_, Core>,
pool: &ConnectionPool<Core>,
next_l1_batch_to_process: &mut L1BatchNumber,
) -> anyhow::Result<()> {
let mut storage = pool.connection_tagged("metadata_calculator").await?;
let last_l1_batch_with_protective_reads = if self.tree.mode() == MerkleTreeMode::Lightweight
|| self.sealed_batches_have_protective_reads
{
Expand All @@ -210,6 +218,8 @@ impl TreeUpdater {
.await
.context("failed loading latest L1 batch number with protective reads")?
};
drop(storage);

let last_requested_l1_batch =
next_l1_batch_to_process.0 + self.max_l1_batches_per_iter as u32 - 1;
let last_requested_l1_batch =
Expand All @@ -222,7 +232,7 @@ impl TreeUpdater {
} else {
tracing::info!("Updating Merkle tree with L1 batches #{l1_batch_numbers:?}");
*next_l1_batch_to_process = self
.process_multiple_batches(&mut storage, l1_batch_numbers)
.process_multiple_batches(pool, l1_batch_numbers)
.await?;
}
Ok(())
Expand All @@ -248,10 +258,9 @@ impl TreeUpdater {
tracing::info!("Stop signal received, metadata_calculator is shutting down");
break;
}
let storage = pool.connection_tagged("metadata_calculator").await?;

let snapshot = *next_l1_batch_to_process;
self.step(storage, &mut next_l1_batch_to_process).await?;
self.step(pool, &mut next_l1_batch_to_process).await?;
let delay = if snapshot == *next_l1_batch_to_process {
tracing::trace!(
"Metadata calculator (next L1 batch: #{next_l1_batch_to_process}) \
Expand Down
39 changes: 22 additions & 17 deletions core/node/node_sync/src/sync_action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,30 @@ impl ActionQueueSender {
/// Requires that the actions are in the correct order: starts with a new open L1 batch / L2 block,
/// followed by 0 or more transactions, have mandatory `SealL2Block` and optional `SealBatch` at the end.
/// Would panic if the order is incorrect.
pub async fn push_actions(&self, actions: Vec<SyncAction>) {
Self::check_action_sequence(&actions).unwrap();
///
/// # Errors
///
/// Errors correspond to incorrect action order, or to `ExternalIO` instance that the queue is connected to shutting down.
/// Hence, returned errors must be treated as unrecoverable by the caller; it is unsound to continue
/// operating a node if some of the `actions` may be lost.
pub async fn push_actions(&self, actions: Vec<SyncAction>) -> anyhow::Result<()> {
Self::check_action_sequence(&actions)?;
for action in actions {
self.0.send(action).await.expect("EN sync logic panicked");
self.0
.send(action)
.await
.map_err(|_| anyhow::anyhow!("node action processor stopped"))?;
QUEUE_METRICS
.action_queue_size
.set(self.0.max_capacity() - self.0.capacity());
}
Ok(())
}

/// Checks whether the action sequence is valid.
/// Returned error is meant to be used as a panic message, since an invalid sequence represents an unrecoverable
/// error. This function itself does not panic for the ease of testing.
fn check_action_sequence(actions: &[SyncAction]) -> Result<(), String> {
fn check_action_sequence(actions: &[SyncAction]) -> anyhow::Result<()> {
// Rules for the sequence:
// 1. Must start with either `OpenBatch` or `L2Block`, both of which may be met only once.
// 2. Followed by a sequence of `Tx` actions which consists of 0 or more elements.
Expand All @@ -38,27 +48,22 @@ impl ActionQueueSender {
for action in actions {
match action {
SyncAction::OpenBatch { .. } | SyncAction::L2Block { .. } => {
if opened {
return Err(format!("Unexpected OpenBatch / L2Block: {actions:?}"));
}
anyhow::ensure!(!opened, "Unexpected OpenBatch / L2Block: {actions:?}");
opened = true;
}
SyncAction::Tx(_) => {
if !opened || l2_block_sealed {
return Err(format!("Unexpected Tx: {actions:?}"));
}
anyhow::ensure!(opened && !l2_block_sealed, "Unexpected Tx: {actions:?}");
}
SyncAction::SealL2Block | SyncAction::SealBatch => {
if !opened || l2_block_sealed {
return Err(format!("Unexpected SealL2Block / SealBatch: {actions:?}"));
}
anyhow::ensure!(
opened && !l2_block_sealed,
"Unexpected SealL2Block / SealBatch: {actions:?}"
);
l2_block_sealed = true;
}
}
}
if !l2_block_sealed {
return Err(format!("Incomplete sequence: {actions:?}"));
}
anyhow::ensure!(l2_block_sealed, "Incomplete sequence: {actions:?}");
Ok(())
}
}
Expand Down Expand Up @@ -287,7 +292,7 @@ mod tests {
panic!("Invalid sequence passed the test. Sequence #{idx}, expected error: {expected_err}");
};
assert!(
err.starts_with(expected_err),
err.to_string().contains(expected_err),
"Sequence #{idx} failed. Expected error: {expected_err}, got: {err}"
);
}
Expand Down
31 changes: 23 additions & 8 deletions core/node/node_sync/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ async fn external_io_basics(snapshot_recovery: bool) {
&[&extract_tx_hashes(&actions)],
)
.await;
actions_sender.push_actions(actions).await;
actions_sender.push_actions(actions).await.unwrap();
// Wait until the L2 block is sealed.
state_keeper
.wait_for_local_block(snapshot.l2_block_number + 1)
Expand Down Expand Up @@ -316,7 +316,7 @@ async fn external_io_works_without_local_protocol_version(snapshot_recovery: boo
&[&extract_tx_hashes(&actions)],
)
.await;
actions_sender.push_actions(actions).await;
actions_sender.push_actions(actions).await.unwrap();
// Wait until the L2 block is sealed.
state_keeper
.wait_for_local_block(snapshot.l2_block_number + 1)
Expand Down Expand Up @@ -407,8 +407,14 @@ pub(super) async fn run_state_keeper_with_multiple_l2_blocks(
let (actions_sender, action_queue) = ActionQueue::new();
let client = MockMainNodeClient::default();
let state_keeper = StateKeeperHandles::new(pool, client, action_queue, &[&tx_hashes]).await;
actions_sender.push_actions(first_l2_block_actions).await;
actions_sender.push_actions(second_l2_block_actions).await;
actions_sender
.push_actions(first_l2_block_actions)
.await
.unwrap();
actions_sender
.push_actions(second_l2_block_actions)
.await
.unwrap();
// Wait until both L2 blocks are sealed.
state_keeper
.wait_for_local_block(snapshot.l2_block_number + 2)
Expand Down Expand Up @@ -490,7 +496,7 @@ async fn test_external_io_recovery(
number: snapshot.l2_block_number + 3,
};
let actions = vec![open_l2_block, new_tx.into(), SyncAction::SealL2Block];
actions_sender.push_actions(actions).await;
actions_sender.push_actions(actions).await.unwrap();
state_keeper
.wait_for_local_block(snapshot.l2_block_number + 3)
.await;
Expand Down Expand Up @@ -580,9 +586,18 @@ pub(super) async fn run_state_keeper_with_multiple_l1_batches(
&[&[first_tx_hash], &[second_tx_hash]],
)
.await;
actions_sender.push_actions(first_l1_batch_actions).await;
actions_sender.push_actions(fictive_l2_block_actions).await;
actions_sender.push_actions(second_l1_batch_actions).await;
actions_sender
.push_actions(first_l1_batch_actions)
.await
.unwrap();
actions_sender
.push_actions(fictive_l2_block_actions)
.await
.unwrap();
actions_sender
.push_actions(second_l1_batch_actions)
.await
.unwrap();

let hash_task = tokio::spawn(mock_l1_batch_hash_computation(
pool.clone(),
Expand Down
Loading