diff --git a/.codespellrc b/.codespellrc index 3811b3c25..aa65dde4e 100644 --- a/.codespellrc +++ b/.codespellrc @@ -1,3 +1,3 @@ [codespell] skip = .git,target,Cargo.toml,Cargo.lock,mutants* -ignore-words-list = crate,ser +ignore-words-list = crate,ser,ment diff --git a/Cargo.lock b/Cargo.lock index 6b56499dd..1337e1f83 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -886,6 +886,7 @@ dependencies = [ "digest 0.10.7", "num_enum", "sha2 0.10.8", + "tracing", ] [[package]] diff --git a/crates/btcio/src/reader/query.rs b/crates/btcio/src/reader/query.rs index 0b6ecabcb..23fcdfa82 100644 --- a/crates/btcio/src/reader/query.rs +++ b/crates/btcio/src/reader/query.rs @@ -22,20 +22,23 @@ fn filter_interesting_txs(block: &Block) -> Vec { /// State we use in various parts of the reader. #[derive(Debug)] struct ReaderState { - /// The highest block in the chain, at `.back()` of queue. - cur_height: u64, + /// The highest block in the chain, at `.back()` of queue + 1. + next_height: u64, /// The `.back()` of this should have the same height as cur_height. recent_blocks: VecDeque, + /// Depth at which we start pulling recent blocks out of the front of the queue. max_depth: usize, } impl ReaderState { - fn new(cur_height: u64, max_depth: usize, recent_blocks: VecDeque) -> Self { + /// Constructs a new reader state instance using some context about how we + /// want to manage it. + fn new(next_height: u64, max_depth: usize, recent_blocks: VecDeque) -> Self { assert!(!recent_blocks.is_empty()); Self { - cur_height, + next_height, max_depth, recent_blocks, } @@ -45,7 +48,17 @@ impl ReaderState { self.recent_blocks.back().unwrap() } - /// Accepts a new block and purges a buried one. + fn best_block_idx(&self) -> u64 { + self.next_height - 1 + } + + /// Returns the idx of the deepest block in the reader state. + #[allow(unused)] + fn deepest_block(&self) -> u64 { + self.best_block_idx() - self.recent_blocks.len() as u64 + 1 + } + + /// Accepts a new block and possibly purges a buried one. fn accept_new_block(&mut self, blkhash: BlockHash) -> Option { let ret = if self.recent_blocks.len() > self.max_depth { Some(self.recent_blocks.pop_front().unwrap()) @@ -54,14 +67,14 @@ impl ReaderState { }; self.recent_blocks.push_back(blkhash); - self.cur_height += 1; + self.next_height += 1; ret } - #[allow(unused)] /// Gets the blockhash of the given height, if we have it. 
+ #[allow(unused)] pub fn get_height_blkid(&self, height: u64) -> Option<&BlockHash> { - if height > self.cur_height { + if height >= self.next_height { return None; } @@ -69,19 +82,14 @@ impl ReaderState { return None; } - let back_off = self.cur_height - height; - let idx = self.recent_blocks.len() as u64 - back_off - 1; - Some(&self.recent_blocks[idx as usize]) - } - #[allow(unused)] - fn deepest_block(&self) -> u64 { - self.cur_height - self.recent_blocks.len() as u64 - 1 + let off = height - self.deepest_block(); + Some(&self.recent_blocks[off as usize]) } fn revert_tip(&mut self) -> Option { if !self.recent_blocks.is_empty() { let back = self.recent_blocks.pop_back().unwrap(); - self.cur_height -= 1; + self.next_height -= 1; Some(back) } else { None @@ -89,11 +97,11 @@ impl ReaderState { } fn rollback_to_height(&mut self, new_height: u64) -> Vec { - if new_height > self.cur_height { + if new_height > self.next_height { panic!("reader: new height greater than cur height"); } - let rollback_cnt = self.cur_height - new_height; + let rollback_cnt = self.best_block_idx() - new_height; if rollback_cnt >= self.recent_blocks.len() as u64 { panic!("reader: tried to rollback past deepest block"); } @@ -106,7 +114,7 @@ impl ReaderState { // More sanity checks. assert!(!self.recent_blocks.is_empty()); - assert_eq!(self.cur_height, new_height); + assert_eq!(self.best_block_idx(), new_height); buf } @@ -114,18 +122,19 @@ impl ReaderState { /// Iterates over the blocks back from the tip, giving both the height and /// the blockhash to compare against the chain. fn iter_blocks_back(&self) -> impl Iterator { + let best_blk_idx = self.best_block_idx(); self.recent_blocks .iter() .rev() .enumerate() - .map(|(i, b)| (self.cur_height - i as u64, b)) + .map(move |(i, b)| (best_blk_idx - i as u64, b)) } } pub async fn bitcoin_data_reader_task( client: impl L1Client, event_tx: mpsc::Sender, - cur_block_height: u64, + target_next_block: u64, config: Arc, l1_status: Arc>, ) { @@ -133,7 +142,7 @@ pub async fn bitcoin_data_reader_task( if let Err(e) = do_reader_task( &client, &event_tx, - cur_block_height, + target_next_block, config, &mut status_updates, l1_status.clone(), @@ -147,17 +156,17 @@ pub async fn bitcoin_data_reader_task( async fn do_reader_task( client: &impl L1Client, event_tx: &mpsc::Sender, - cur_block_height: u64, + target_next_block: u64, config: Arc, status_updates: &mut Vec, l1_status: Arc>, ) -> anyhow::Result<()> { - info!(%cur_block_height, "started L1 reader task!"); + info!(%target_next_block, "started L1 reader task!"); let poll_dur = Duration::from_millis(config.client_poll_dur_ms as u64); let mut state = init_reader_state( - cur_block_height, + target_next_block, config.max_reorg_depth as usize * 2, client, ) @@ -168,14 +177,14 @@ async fn do_reader_task( // FIXME This function will return when reorg happens when there are not // enough elements in the vec deque, probably during startup. 
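The switch from `cur_height` to `next_height` concentrates the off-by-one in one place: the tip sits at `next_height - 1` and the deepest retained block at `next_height - recent_blocks.len()`. A minimal standalone sketch of the same arithmetic (plain `u64` values standing in for `BlockHash`, not the real crate types), useful for convincing yourself the queue offsets line up:

```rust
use std::collections::VecDeque;

/// Standalone mirror of the new `ReaderState` bookkeeping, with plain u64
/// values standing in for block hashes.
struct Heights {
    /// Height of the next block we expect to fetch.
    next_height: u64,
    /// Hashes for heights `next_height - len ..= next_height - 1`.
    recent_blocks: VecDeque<u64>,
}

impl Heights {
    fn best_block_idx(&self) -> u64 {
        self.next_height - 1
    }

    fn deepest_block(&self) -> u64 {
        self.best_block_idx() - self.recent_blocks.len() as u64 + 1
    }

    fn get_height_blkid(&self, height: u64) -> Option<&u64> {
        if height >= self.next_height || height < self.deepest_block() {
            return None;
        }
        let off = height - self.deepest_block();
        Some(&self.recent_blocks[off as usize])
    }
}

fn main() {
    // Queue holds heights 5..=9, so the next block to fetch is 10.
    let hs = Heights {
        next_height: 10,
        recent_blocks: (5..=9).collect(),
    };
    assert_eq!(hs.best_block_idx(), 9);
    assert_eq!(hs.deepest_block(), 5);
    assert_eq!(hs.get_height_blkid(7), Some(&7)); // offset 2 into the queue
    assert_eq!(hs.get_height_blkid(10), None); // not fetched yet
    assert_eq!(hs.get_height_blkid(4), None); // already purged
}
```

`iter_blocks_back` follows the same convention: the `i`-th element from the back of the queue is at height `best_block_idx() - i`.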
loop { - let cur_height = state.cur_height; - let poll_span = debug_span!("l1poll", %cur_height); + let cur_best_height = state.best_block_idx(); + let poll_span = debug_span!("l1poll", %cur_best_height); if let Err(err) = poll_for_new_blocks(client, event_tx, &config, &mut state, status_updates) .instrument(poll_span) .await { - warn!(%cur_height, err = %err, "failed to poll Bitcoin client"); + warn!(%cur_best_height, err = %err, "failed to poll Bitcoin client"); status_updates.push(StatusUpdate::RpcError(err.to_string())); if let Some(err) = err.downcast_ref::() { @@ -205,22 +214,23 @@ async fn do_reader_task( /// Inits the reader state by trying to backfill blocks up to a target height. async fn init_reader_state( - target_block: u64, + target_next_block: u64, lookback: usize, client: &impl L1Client, ) -> anyhow::Result { // Init the reader state using the blockid we were given, fill in a few blocks back. - debug!(%target_block, "initializing reader state"); + debug!(%target_next_block, "initializing reader state"); let mut init_queue = VecDeque::new(); // Do some math to figure out where our start and end are. + // TODO something screwed up with bookkeeping here let chain_info = client.get_blockchain_info().await?; - let start_height = i64::max(target_block as i64 - lookback as i64, 0) as u64; - let end_height = u64::min(target_block, chain_info.blocks); + let start_height = i64::max(target_next_block as i64 - lookback as i64, 0) as u64; + let end_height = u64::min(target_next_block - 1, chain_info.blocks); debug!(%start_height, %end_height, "queried L1 client, have init range"); - // Loop through the range we've determined to be okay and pull the blocks - // in. + // Loop through the range we've determined to be okay and pull the blocks we want to look back + // through in. let mut real_cur_height = start_height; for height in start_height..=end_height { let blkid = client.get_block_hash(height).await?; @@ -229,7 +239,7 @@ async fn init_reader_state( real_cur_height = height; } - let state = ReaderState::new(real_cur_height, lookback, init_queue); + let state = ReaderState::new(real_cur_height + 1, lookback, init_queue); Ok(state) } @@ -257,7 +267,7 @@ async fn poll_for_new_blocks( // First, check for a reorg if there is one. if let Some((pivot_height, pivot_blkid)) = find_pivot_block(client, state).await? { - if pivot_height < state.cur_height { + if pivot_height < state.best_block_idx() { info!(%pivot_height, %pivot_blkid, "found apparent reorg"); state.rollback_to_height(pivot_height); let revert_ev = L1Event::RevertTo(pivot_height); @@ -274,9 +284,9 @@ async fn poll_for_new_blocks( debug!(%client_height, "have new blocks"); // Now process each block we missed. - let scan_start_height = state.cur_height + 1; + let scan_start_height = state.next_height; for fetch_height in scan_start_height..=client_height { - let blkid = + let l1blkid = match fetch_and_process_block(fetch_height, client, event_tx, state, status_updates) .await { @@ -286,7 +296,7 @@ async fn poll_for_new_blocks( break; } }; - info!(%fetch_height, %blkid, "accepted new block"); + info!(%fetch_height, %l1blkid, "accepted new block"); } Ok(()) @@ -298,16 +308,16 @@ async fn find_pivot_block( client: &impl L1Client, state: &ReaderState, ) -> anyhow::Result> { - for (height, blkid) in state.iter_blocks_back() { + for (height, l1blkid) in state.iter_blocks_back() { // If at genesis, we can't reorg any farther. 
         if height == 0 {
-            return Ok(Some((height, *blkid)));
+            return Ok(Some((height, *l1blkid)));
         }
 
-        let queried_blkid = client.get_block_hash(height).await?;
-        trace!(%height, %blkid, %queried_blkid, "comparing blocks to find pivot");
-        if queried_blkid == *blkid {
-            return Ok(Some((height, *blkid)));
+        let queried_l1blkid = client.get_block_hash(height).await?;
+        trace!(%height, %l1blkid, %queried_l1blkid, "comparing blocks to find pivot");
+        if queried_l1blkid == *l1blkid {
+            return Ok(Some((height, *l1blkid)));
         }
     }
@@ -322,20 +332,22 @@ async fn fetch_and_process_block(
     status_updates: &mut Vec<StatusUpdate>,
 ) -> anyhow::Result<BlockHash> {
     let block = client.get_block_at(height).await?;
+    let txs = block.txdata.len();
     let filtered_txs = filter_interesting_txs(&block);
     let block_data = BlockData::new(height, block, filtered_txs);
-    let blkid = block_data.block().block_hash();
+    let l1blkid = block_data.block().block_hash();
+    trace!(%l1blkid, %height, %txs, "fetched block from client");
     status_updates.push(StatusUpdate::CurHeight(height));
-    status_updates.push(StatusUpdate::CurTip(blkid.to_string()));
+    status_updates.push(StatusUpdate::CurTip(l1blkid.to_string()));
     if let Err(e) = event_tx.send(L1Event::BlockData(block_data)).await {
         error!("failed to submit L1 block event, did the persistence task crash?");
         return Err(e.into());
     }
 
-    // Insert to new block, incrementing cur_height.
-    let _deep = state.accept_new_block(blkid);
+    // Insert the new block, incrementing next_height.
+    let _deep = state.accept_new_block(l1blkid);
 
-    Ok(blkid)
+    Ok(l1blkid)
 }
diff --git a/crates/consensus-logic/src/block_assembly.rs b/crates/consensus-logic/src/block_assembly.rs
deleted file mode 100644
index 8b1378917..000000000
--- a/crates/consensus-logic/src/block_assembly.rs
+++ /dev/null
@@ -1 +0,0 @@
-
diff --git a/crates/consensus-logic/src/chain_transition.rs b/crates/consensus-logic/src/chain_transition.rs
index a24ba274d..b0f573795 100644
--- a/crates/consensus-logic/src/chain_transition.rs
+++ b/crates/consensus-logic/src/chain_transition.rs
@@ -1,13 +1,157 @@
 //! Top-level CL state transition logic. This is largely stubbed off now, but
 //! we'll replace components with real implementations as we go along.
+#![allow(unused)]
 
-use alpen_express_state::{block::L2Block, chain_state::ChainState, state_op::WriteBatch};
+use tracing::*;
+
+use alpen_express_primitives::params::RollupParams;
+use alpen_express_state::{
+    block::L1Segment,
+    exec_update,
+    l1::{self, L1MaturationEntry},
+    prelude::*,
+    state_op::StateCache,
+    state_queue,
+};
 
 use crate::errors::TsnError;
 
-/// Processes a block, producing a write batch for the block to produce a new
-/// chainstate.
-pub fn process_block(_state: &ChainState, _block: &L2Block) -> Result<WriteBatch, TsnError> {
-    // TODO
-    Ok(WriteBatch::new_empty())
+/// Processes a block, making writes into the provided state cache that will
+/// then be written to disk. This does not check the block's credentials; it
+/// plays out all the updates a block makes to the chain, but it will abort if
+/// there are any semantic inconsistencies.
+///
+/// This operates on a state cache that's expected to be empty, and panics
+/// otherwise. Does not check the `state_root` in the header for correctness,
+/// so it can be left unset during block assembly.
+pub fn process_block(
+    state: &mut StateCache,
+    header: &impl L2Header,
+    body: &L2BlockBody,
+    params: &RollupParams,
+) -> Result<(), TsnError> {
+    // We want to fail quickly here because otherwise we don't know what's
+    // happening.
+    if !state.is_empty() {
+        panic!("transition: state cache not fresh");
+    }
+
+    // Update basic bookkeeping.
+    state.set_slot(header.blockidx());
+
+    // Go through each stage and play out the operations it has.
+    process_l1_view_update(state, body.l1_segment(), params)?;
+    process_pending_withdrawals(state)?;
+    process_execution_update(state, body.exec_segment().update())?;
+
+    Ok(())
+}
+
+fn process_pending_withdrawals(state: &mut StateCache) -> Result<(), TsnError> {
+    // TODO if there's enough to fill a batch, package it up and pick a deposit
+    // to dispatch it to
+    Ok(())
+}
+
+/// Processes an execution update, to change an exec env state.
+///
+/// This is meant to be fairly generic so we can reuse it across multiple exec
+/// envs if we decide to go in that direction.
+fn process_execution_update(
+    state: &mut StateCache,
+    update: &exec_update::ExecUpdate,
+) -> Result<(), TsnError> {
+    // TODO release anything that we need to
+    Ok(())
+}
+
+/// Update our view of the L1 state, playing out downstream changes from that.
+fn process_l1_view_update(
+    state: &mut StateCache,
+    l1seg: &L1Segment,
+    params: &RollupParams,
+) -> Result<(), TsnError> {
+    let l1v = state.state().l1_view();
+
+    // Accept new blocks, comparing the tip against the current to figure out if
+    // we need to do a reorg.
+    // FIXME this should actually check PoW, it just does it based on block heights
+    if !l1seg.new_payloads().is_empty() {
+        trace!(payloads = ?l1seg.new_payloads(), "processing new L1 payloads");
+
+        // Validate that the new blocks actually extend the tip. This is the
+        // logic we'll have to extend to check the PoW properly.
+        let new_tip_block = l1seg.new_payloads().last().unwrap();
+        let new_tip_height = new_tip_block.idx();
+        let first_new_block_height = new_tip_height - l1seg.new_payloads().len() as u64 + 1;
+        let implied_pivot_height = new_tip_height - l1seg.new_payloads().len() as u64;
+        let cur_tip_height = l1v.tip_height();
+
+        // Check that the new chain is actually longer; if it's shorter, then we don't do anything.
+        // TODO This probably needs to be adjusted for PoW.
+        if new_tip_block.idx() <= cur_tip_height {
+            return Err(TsnError::L1SegNotExtend);
+        }
+
+        // Now make sure that the block hashes all connect up sensibly.
+        let pivot_idx = implied_pivot_height;
+        let pivot_blkid = l1v
+            .maturation_queue()
+            .get_absolute(pivot_idx)
+            .map(|b| b.blkid())
+            .unwrap_or_else(|| l1v.safe_block().blkid());
+        check_chain_integrity(pivot_idx, pivot_blkid, l1seg.new_payloads())?;
+
+        // Okay, now that we've figured that out, let's actually play out the reorg.
+        if pivot_idx > params.horizon_l1_height && pivot_idx < cur_tip_height {
+            state.revert_l1_view_to(pivot_idx);
+        }
+
+        for e in l1seg.new_payloads() {
+            let ment = L1MaturationEntry::from(e.clone());
+            state.apply_l1_block_entry(ment);
+        }
+    }
+
+    // TODO accept sufficiently buried blocks (triggering deposit creation or whatever), etc.
+
+    Ok(())
+}
+
+/// Checks the attested block IDs and parent blkid connections in new blocks.
+// TODO unit tests
+fn check_chain_integrity(
+    pivot_idx: u64,
+    pivot_blkid: &L1BlockId,
+    new_blocks: &[l1::L1HeaderPayload],
+) -> Result<(), TsnError> {
+    // Iterate over all the blocks in the new list and make sure they match.
+    for (i, e) in new_blocks.iter().enumerate() {
+        let h = e.idx();
+        assert_eq!(pivot_idx + 1 + i as u64, h);
+
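The payload-height arithmetic above is a classic off-by-one trap, so here is a small self-contained check of the same formulas with made-up heights (the concrete numbers are illustrative only, not from the codebase):

```rust
fn main() {
    // Suppose the segment carries payloads for heights 105..=109 while our
    // current tip is 107 (i.e. a short reorg plus two new blocks).
    let payload_heights: Vec<u64> = (105..=109).collect();
    let cur_tip_height: u64 = 107;

    let new_tip_height = *payload_heights.last().unwrap();
    let n = payload_heights.len() as u64;
    let first_new_block_height = new_tip_height - n + 1;
    let implied_pivot_height = new_tip_height - n;

    assert_eq!(first_new_block_height, 105);
    // The pivot is the common ancestor immediately below the first payload.
    assert_eq!(implied_pivot_height, 104);
    // Longer than the current chain, so the segment is accepted.
    assert!(new_tip_height > cur_tip_height);
}
```

+        // Make sure the hash matches.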
+ let computed_id = L1BlockId::compute_from_header_buf(e.header_buf()); + let attested_id = e.record().blkid(); + if computed_id != *attested_id { + return Err(TsnError::L1BlockIdMismatch(h, *attested_id, computed_id)); + } + + // Make sure matches parent. + // TODO FIXME I think my impl for parent_blkid is incorrect, fix this later + /*let blk_parent = e.record().parent_blkid(); + if i == 0 { + if blk_parent != *pivot_blkid { + return Err(TsnError::L1BlockParentMismatch(h, blk_parent, *pivot_blkid)); + } + } else { + let parent_payload = &new_blocks[i - 1]; + let parent_id = parent_payload.record().blkid(); + if blk_parent != *parent_id { + return Err(TsnError::L1BlockParentMismatch(h, blk_parent, *parent_id)); + } + }*/ + } + + Ok(()) } diff --git a/crates/consensus-logic/src/client_transition.rs b/crates/consensus-logic/src/client_transition.rs index d86749bc8..45201acf7 100644 --- a/crates/consensus-logic/src/client_transition.rs +++ b/crates/consensus-logic/src/client_transition.rs @@ -24,21 +24,50 @@ pub fn process_event( match ev { SyncEvent::L1Block(height, l1blkid) => { + // If the block is before the horizon we don't care about it. + if *height < params.rollup().horizon_l1_height { + #[cfg(test)] + eprintln!("early L1 block at h={height}, you may have set up the test env wrong"); + + warn!(%height, "ignoring unexpected L1Block event before horizon"); + return Ok(ClientUpdateOutput::new(writes, actions)); + } + // FIXME this doesn't do any SPV checks to make sure we only go to // a longer chain, it just does it unconditionally let l1prov = database.l1_provider(); - let _blkmf = l1prov.get_block_manifest(*height)?; + let new_block_mf = l1prov.get_block_manifest(*height)?; + + let l1v = state.l1_view(); // TODO do the consensus checks - writes.push(ClientStateWrite::AcceptL1Block(*l1blkid)); + // Only accept the block if it's the next block in the chain we expect to accept. + let cur_seen_tip_height = l1v.tip_height(); + let next_exp_height = l1v.next_expected_block(); + if next_exp_height > params.rollup().horizon_l1_height { + // TODO check that the new block we're trying to add has the same parent as the tip + // block + let cur_tip_block = l1prov + .get_block_manifest(cur_seen_tip_height)? + .ok_or(Error::MissingL1BlockHeight(cur_seen_tip_height))?; + } - // If we have some number of L1 blocks finalized, also emit an `UpdateBuried` write - if *height >= params.rollup().l1_reorg_safe_depth as u64 + state.buried_l1_height() { - writes.push(ClientStateWrite::UpdateBuried(state.buried_l1_height() + 1)); + if *height == next_exp_height { + writes.push(ClientStateWrite::AcceptL1Block(*l1blkid)); + } else { + #[cfg(test)] + eprintln!("not sure what to do here h={height} exp={next_exp_height}"); + return Err(Error::OutOfOrderL1Block(next_exp_height, *height, *l1blkid)); } - let l1v = state.l1_view(); + // If we have some number of L1 blocks finalized, also emit an `UpdateBuried` write. + // TODO clean up this bookkeeping slightly + let keep_window = params.rollup().l1_reorg_safe_depth as u64; + let maturable_height = next_exp_height.saturating_sub(keep_window); + if maturable_height > state.next_exp_l1_block() { + writes.push(ClientStateWrite::UpdateBuried(maturable_height)); + } if let Some(ss) = state.sync() { // TODO figure out what to do here @@ -59,11 +88,14 @@ pub fn process_event( SyncEvent::L1Revert(to_height) => { let l1prov = database.l1_provider(); - let blkmf = l1prov - .get_block_manifest(*to_height)? 
- .ok_or(Error::MissingL1BlockHeight(*to_height))?; - let blkid = blkmf.block_hash().into(); - writes.push(ClientStateWrite::RollbackL1BlocksTo(blkid)); + + let buried = state.l1_view().buried_l1_height(); + if *to_height < buried { + error!(%to_height, %buried, "got L1 revert below buried height"); + return Err(Error::ReorgTooDeep(*to_height, buried)); + } + + writes.push(ClientStateWrite::RollbackL1BlocksTo(*to_height)); } SyncEvent::L1DABatch(blkids) => { @@ -86,7 +118,7 @@ pub fn process_event( let last = blkids.last().unwrap(); writes.push(ClientStateWrite::UpdateFinalized(*last)); - actions.push(SyncAction::FinalizeBlock(*last)) + actions.push(SyncAction::FinalizeBlock(*last)); } else { // TODO we can expand this later to make more sense return Err(Error::MissingClientSyncState); @@ -129,7 +161,10 @@ mod tests { use super::*; #[test] - fn handle_l1_block() { + fn test_genesis() { + // TODO there's a ton of duplication in this test, we could wrap it up so we just run + // through a table and call a function with it + let database = get_common_db(); let params = gen_params(); let mut state = gen_client_state(Some(¶ms)); @@ -137,9 +172,20 @@ mod tests { assert!(!state.is_chain_active()); let l1_chain = gen_l1_chain(15); - // before the genesis of L2 is reached + for (i, b) in l1_chain.iter().enumerate() { + let l1_store = database.l1_store(); + l1_store + .put_block_data(i as u64, b.clone(), Vec::new()) + .expect("test: insert blocks"); + } + + let blkids: Vec = l1_chain.iter().map(|b| b.block_hash().into()).collect(); + let horizon = params.rollup().horizon_l1_height; + let genesis = params.rollup().genesis_l1_height; + + // at horizon block { - let height = 1; + let height = horizon; let l1_block_id = l1_chain[height as usize].block_hash().into(); let event = SyncEvent::L1Block(height, l1_block_id); @@ -148,82 +194,157 @@ mod tests { let writes = output.writes(); let actions = output.actions(); - let expection_writes = [ClientStateWrite::AcceptL1Block(l1_block_id)]; + let expected_writes = [ClientStateWrite::AcceptL1Block(l1_block_id)]; let expected_actions = []; - assert_eq!(writes, expection_writes); + assert_eq!(writes, expected_writes); assert_eq!(actions, expected_actions); operation::apply_writes_to_state(&mut state, writes.iter().cloned()); assert!(!state.is_chain_active()); - assert_eq!(state.recent_l1_block(), Some(&l1_block_id)); - assert_eq!(state.buried_l1_height(), params.rollup.genesis_l1_height); - assert_eq!(state.l1_view().local_unaccepted_blocks(), [l1_block_id]); + assert_eq!(state.most_recent_l1_block(), Some(&l1_block_id)); + assert_eq!(state.next_exp_l1_block(), horizon + 1); + assert_eq!( + state.l1_view().local_unaccepted_blocks(), + &blkids[height as usize..height as usize + 1] + ); } - // after the genesis of L2 is reached + // at horizon block + 1 { - let height = params.rollup.genesis_l1_height + 3; + let height = params.rollup().horizon_l1_height + 1; let l1_block_id = l1_chain[height as usize].block_hash().into(); let event = SyncEvent::L1Block(height, l1_block_id); let output = process_event(&state, &event, database.as_ref(), ¶ms).unwrap(); - let expection_writes = [ - ClientStateWrite::AcceptL1Block(l1_block_id), - ClientStateWrite::ActivateChain, - ]; + let writes = output.writes(); + let actions = output.actions(); + + let expected_writes = [ClientStateWrite::AcceptL1Block(l1_block_id)]; + let expected_actions = []; + + assert_eq!(writes, expected_writes); + assert_eq!(actions, expected_actions); + + operation::apply_writes_to_state(&mut state, 
writes.iter().cloned()); + + assert!(!state.is_chain_active()); + assert_eq!(state.most_recent_l1_block(), Some(&l1_block_id)); + assert_eq!(state.next_exp_l1_block(), genesis); + assert_eq!( + state.l1_view().local_unaccepted_blocks(), + &blkids[horizon as usize..horizon as usize + 2] + ); + } + + // as the genesis of L2 is reached, but not locked in yet + { + let height = genesis; + let l1_block_id = l1_chain[height as usize].block_hash().into(); + let event = SyncEvent::L1Block(height, l1_block_id); + + let output = process_event(&state, &event, database.as_ref(), ¶ms).unwrap(); + + let expected_writes = [ClientStateWrite::AcceptL1Block(l1_block_id)]; let expected_actions = []; - assert_eq!(output.writes(), expection_writes); + assert_eq!(output.writes(), expected_writes); assert_eq!(output.actions(), expected_actions); operation::apply_writes_to_state(&mut state, output.writes().iter().cloned()); - assert!(state.is_chain_active()); - assert_eq!(state.recent_l1_block(), Some(&l1_block_id)); - assert_eq!(state.buried_l1_height(), params.rollup.genesis_l1_height); + assert!(!state.is_chain_active()); + assert_eq!(state.most_recent_l1_block(), Some(&l1_block_id)); + assert_eq!(state.next_exp_l1_block(), height + 1); + assert_eq!( + state.l1_view().local_unaccepted_blocks(), + &blkids[horizon as usize..height as usize + 1] + ); + } + + // genesis + 1 + { + let height = genesis + 1; + let l1_block_id = l1_chain[height as usize].block_hash().into(); + let event = SyncEvent::L1Block(height, l1_block_id); + + let output = process_event(&state, &event, database.as_ref(), ¶ms).unwrap(); + + let expected_writes = [ClientStateWrite::AcceptL1Block(l1_block_id)]; + let expected_actions = []; + + assert_eq!(output.writes(), expected_writes); + assert_eq!(output.actions(), expected_actions); + + operation::apply_writes_to_state(&mut state, output.writes().iter().cloned()); + + assert!(!state.is_chain_active()); + assert_eq!(state.most_recent_l1_block(), Some(&l1_block_id)); + assert_eq!(state.next_exp_l1_block(), height + 1); + assert_eq!( + state.l1_view().local_unaccepted_blocks(), + &blkids[horizon as usize..height as usize + 1] + ); + } + + // genesis + 2 + { + let height = genesis + 2; + let l1_block_id = l1_chain[height as usize].block_hash().into(); + let event = SyncEvent::L1Block(height, l1_block_id); + + let output = process_event(&state, &event, database.as_ref(), ¶ms).unwrap(); + + let expected_writes = [ClientStateWrite::AcceptL1Block(l1_block_id)]; + let expected_actions = []; + + assert_eq!(output.writes(), expected_writes); + assert_eq!(output.actions(), expected_actions); + + operation::apply_writes_to_state(&mut state, output.writes().iter().cloned()); + + assert!(!state.is_chain_active()); + assert_eq!(state.most_recent_l1_block(), Some(&l1_block_id)); + assert_eq!(state.next_exp_l1_block(), height + 1); assert_eq!( state.l1_view().local_unaccepted_blocks(), - [l1_chain[1].block_hash().into(), l1_block_id,] + &blkids[horizon as usize..height as usize + 1] ); } - // after l1_reorg_depth is reached + // genesis + 3, where we should lock in genesis { - let height = params.rollup.genesis_l1_height + params.rollup.l1_reorg_safe_depth as u64; + let height = genesis + 3; let l1_block_id = l1_chain[height as usize].block_hash().into(); let event = SyncEvent::L1Block(height, l1_block_id); let output = process_event(&state, &event, database.as_ref(), ¶ms).unwrap(); - let expection_writes = [ + let expected_writes = [ ClientStateWrite::AcceptL1Block(l1_block_id), - 
ClientStateWrite::UpdateBuried(params.rollup.genesis_l1_height + 1), + ClientStateWrite::ActivateChain, ]; let expected_actions = []; - assert_eq!(output.writes(), expection_writes); + assert_eq!(output.writes(), expected_writes); assert_eq!(output.actions(), expected_actions); operation::apply_writes_to_state(&mut state, output.writes().iter().cloned()); assert!(state.is_chain_active()); - assert_eq!(state.recent_l1_block(), Some(&l1_block_id)); - assert_eq!( - state.buried_l1_height(), - params.rollup.genesis_l1_height + 1 - ); + assert_eq!(state.most_recent_l1_block(), Some(&l1_block_id)); + assert_eq!(state.next_exp_l1_block(), height + 1); assert_eq!( state.l1_view().local_unaccepted_blocks(), - [l1_chain[8].block_hash().into(), l1_block_id,] + &blkids[horizon as usize..height as usize + 1] ); } } #[test] - fn handle_l1_revert() { + fn test_l1_reorg() { let database = get_common_db(); let params = gen_params(); let mut state = gen_client_state(Some(¶ms)); @@ -231,22 +352,18 @@ mod tests { let height = 5; let event = SyncEvent::L1Revert(height); - let output = process_event(&state, &event, database.as_ref(), ¶ms); - assert!(output.is_err_and(|x| matches!(x, Error::MissingL1BlockHeight(height)))); - let l1_block: L1BlockManifest = ArbitraryGenerator::new().generate(); database .l1_store() .put_block_data(height, l1_block.clone(), vec![]) .unwrap(); - let output = process_event(&state, &event, database.as_ref(), ¶ms).unwrap(); - let expectation_writes = [ClientStateWrite::RollbackL1BlocksTo( - l1_block.block_hash().into(), - )]; + let res = process_event(&state, &event, database.as_ref(), ¶ms).unwrap(); + eprintln!("process_event on {event:?} -> {res:?}"); + let expected_writes = [ClientStateWrite::RollbackL1BlocksTo(5)]; let expected_actions = []; - assert_eq!(output.actions(), expected_actions); - assert_eq!(output.writes(), expectation_writes); + assert_eq!(res.actions(), expected_actions); + assert_eq!(res.writes(), expected_writes); } } diff --git a/crates/consensus-logic/src/ctl.rs b/crates/consensus-logic/src/ctl.rs index 087b582ba..742a935df 100644 --- a/crates/consensus-logic/src/ctl.rs +++ b/crates/consensus-logic/src/ctl.rs @@ -31,6 +31,7 @@ impl CsmController { /// Writes a sync event to the database and updates the watch channel to /// trigger the CSM executor to process the event. pub fn submit_event(&self, sync_event: SyncEvent) -> anyhow::Result<()> { + trace!(?sync_event, "submitting sync event"); let ev_idx = self.submit_event_shim.submit_event_blocking(sync_event)?; let msg = CsmMessage::EventInput(ev_idx); if self.csm_tx.blocking_send(msg).is_err() { diff --git a/crates/consensus-logic/src/duty/block_assembly.rs b/crates/consensus-logic/src/duty/block_assembly.rs new file mode 100644 index 000000000..5bd42f020 --- /dev/null +++ b/crates/consensus-logic/src/duty/block_assembly.rs @@ -0,0 +1,580 @@ +//! Impl logic for the block assembly duties. 
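Stepping back to the client-transition tests above: the burial bookkeeping is driven by `maturable_height = next_exp_height.saturating_sub(l1_reorg_safe_depth)`. A small sketch of that arithmetic with an assumed depth (the real value comes from the rollup params built by `gen_params()`):

```rust
fn main() {
    // Assumed for illustration only; the real value comes from the rollup
    // params used by `gen_params()` in the tests.
    let l1_reorg_safe_depth: u64 = 4;

    for accepted_height in 8..=12u64 {
        let next_exp_height = accepted_height + 1;
        let maturable_height = next_exp_height.saturating_sub(l1_reorg_safe_depth);
        println!(
            "accepted h={accepted_height}: next_exp={next_exp_height} maturable={maturable_height}"
        );
    }
    // An `UpdateBuried(maturable_height)` write is only emitted once the
    // maturable height climbs past the previously buried watermark.
}
```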
+#![allow(unused)]
+
+use std::sync::Arc;
+use std::thread;
+use std::time;
+
+use alpen_express_db::traits::L1DataProvider;
+use alpen_express_state::l1::L1HeaderPayload;
+use alpen_express_state::l1::L1HeaderRecord;
+use tracing::*;
+
+use alpen_express_db::traits::{
+    ChainstateProvider, ClientStateProvider, Database, L2DataProvider, L2DataStore,
+};
+use alpen_express_evmctl::engine::{ExecEngineCtl, PayloadStatus};
+use alpen_express_evmctl::errors::EngineError;
+use alpen_express_evmctl::messages::{ExecPayloadData, PayloadEnv};
+use alpen_express_primitives::buf::{Buf32, Buf64};
+use alpen_express_primitives::params::Params;
+use alpen_express_state::block::L2BlockAccessory;
+use alpen_express_state::block::L2BlockBundle;
+use alpen_express_state::block::{ExecSegment, L1Segment};
+use alpen_express_state::chain_state::ChainState;
+use alpen_express_state::client_state::ClientState;
+use alpen_express_state::exec_update::{ExecUpdate, UpdateOutput};
+use alpen_express_state::header::L2BlockHeader;
+use alpen_express_state::prelude::*;
+use alpen_express_state::state_op::*;
+
+use super::types::IdentityKey;
+use crate::chain_transition;
+use crate::credential::sign_schnorr_sig;
+use crate::errors::Error;
+
+/// Signs and stores a block in the database. Does not submit it to the
+/// forkchoice manager.
+// TODO pass in the CSM state we're using to assemble this
+pub(super) fn sign_and_store_block<D: Database, E: ExecEngineCtl>(
+    slot: u64,
+    ik: &IdentityKey,
+    database: &D,
+    engine: &E,
+    params: &Arc<Params>,
+) -> Result<Option<(L2BlockId, L2Block)>, Error> {
+    debug!("preparing block");
+    let l1_prov = database.l1_provider();
+    let l2_prov = database.l2_provider();
+    let cs_prov = database.client_state_provider();
+    let chs_prov = database.chainstate_provider();
+
+    // Check that the block we were supposed to build isn't already in the
+    // database; if so, then just republish that. This just checks whether we
+    // have a block at that height, which for now is the same thing.
+    let blocks_at_slot = l2_prov.get_blocks_at_height(slot)?;
+    if !blocks_at_slot.is_empty() {
+        // FIXME Should we be more verbose about this?
+        warn!(%slot, "was turn to propose block, but found block in database already");
+        return Ok(None);
+    }
+
+    // TODO get the consensus state this duty was created in response to and
+    // pull out the current tip block from it
+    // XXX this is really bad as-is, see comment in worker.rs in `perform_duty`
+    // FIXME this isn't what this is for, it only works because we're
+    // checkpointing on every state right now
+    let ckpt_idx = cs_prov.get_last_checkpoint_idx()?;
+    let last_cstate = cs_prov
+        .get_state_checkpoint(ckpt_idx)?
+        .expect("dutyexec: get state checkpoint");
+
+    // Figure out some general data about the slot and the previous state.
+    let ts = now_millis();
+
+    let Some(last_ss) = last_cstate.sync() else {
+        warn!(%slot, %ckpt_idx, "tried to produce block but no sync state available");
+        return Ok(None);
+    };
+
+    // By "prev" here we mean the block we're building on top of.
+    let prev_block_id = *last_ss.chain_tip_blkid();
+    let prev_block = l2_prov
+        .get_block_data(prev_block_id)?
+        .ok_or(Error::MissingL2Block(prev_block_id))?;
+
+    let block_time = params.rollup().block_time;
+    let target_ts = prev_block.block().header().timestamp() + block_time;
+    let current_ts = now_millis();
+
+    // If it's too early to produce the block, wait a bit.
+    if current_ts < target_ts {
+        let sleep_dur = target_ts - current_ts;
+        trace!(%current_ts, %target_ts, sleep_dur, "too early, waiting to produce block");
+        thread::sleep(time::Duration::from_millis(sleep_dur));
+    }
+
+    let prev_global_sr = *prev_block.header().state_root();
+
+    // Get the previous block's state.
+    // TODO make this get the prev block slot from somewhere more reliable in
+    // case we skip slots
+    let prev_slot = prev_block.header().blockidx();
+    let prev_chstate = chs_prov
+        .get_toplevel_state(prev_slot)?
+        .ok_or(Error::MissingBlockChainstate(prev_block_id))?;
+
+    // Figure out the safe L1 blkid.
+    // FIXME this is somewhat janky, should get it from the MMR
+    let safe_l1_block_rec = prev_chstate.l1_view().safe_block();
+    let safe_l1_blkid = alpen_express_primitives::hash::sha256d(safe_l1_block_rec.buf());
+
+    // TODO Pull data from CSM state that we've observed from L1, including new
+    // headers or any headers needed to perform a reorg if necessary.
+    let l1_seg = prepare_l1_segment(&last_cstate, &prev_chstate, l1_prov.as_ref())?;
+
+    // Prepare the execution segment, which right now is just talking to the EVM
+    // but will be more advanced later.
+    let (exec_seg, block_acc) = prepare_exec_data(
+        slot,
+        ts,
+        prev_block_id,
+        prev_global_sr,
+        safe_l1_blkid,
+        engine,
+    )?;
+
+    // Assemble the body and fake header.
+    let body = L2BlockBody::new(l1_seg, exec_seg);
+    let fake_header = L2BlockHeader::new(slot, ts, prev_block_id, &body, Buf32::zero());
+
+    // Execute the block to compute the new state root, then assemble the real header.
+    // TODO do something with the write batch? to prepare it in the database?
+    let (post_state, _wb) = compute_post_state(prev_chstate, &fake_header, &body, params)?;
+    let new_state_root = post_state.compute_state_root();
+
+    let header = L2BlockHeader::new(slot, ts, prev_block_id, &body, new_state_root);
+    let header_sig = sign_header(&header, ik);
+    let signed_header = SignedL2BlockHeader::new(header, header_sig);
+
+    let blkid = signed_header.get_blockid();
+    let final_block = L2Block::new(signed_header, body);
+    let final_bundle = L2BlockBundle::new(final_block.clone(), block_acc);
+    info!(?blkid, "finished building new block");
+
+    // Store the block in the database.
+    let l2store = database.l2_store();
+    l2store.put_block_data(final_bundle)?;
+    debug!(?blkid, "wrote block to datastore");
+
+    // TODO should we actually return the bundle here?
+    Ok(Some((blkid, final_block)))
+}
+
+fn prepare_l1_segment(
+    cstate: &ClientState,
+    prev_chstate: &ChainState,
+    l1_prov: &impl L1DataProvider,
+) -> Result<L1Segment, Error> {
+    let l1v = cstate.l1_view();
+
+    let unacc_blocks = l1v.unacc_blocks_iter().collect::<Vec<_>>();
+    trace!(unacc_blocks = %unacc_blocks.len(), "figuring out which blocks to include in L1 segment");
+
+    // Check to see if there are actually no blocks in the queue. In that case
+    // we can just give everything we know about.
+    let maturation_queue_size = prev_chstate.l1_view().maturation_queue().len();
+    if maturation_queue_size == 0 {
+        let mut payloads = Vec::new();
+        for (h, _b) in unacc_blocks {
+            let rec = load_header_record(h, l1_prov)?;
+            payloads.push(L1HeaderPayload::new_bare(h, rec));
+        }
+
+        debug!(n = %payloads.len(), "filling in empty queue with fresh L1 payloads");
+        return Ok(L1Segment::new(payloads));
+    }
+
+    let maturing_blocks = prev_chstate
+        .l1_view()
+        .maturation_queue()
+        .iter_entries()
+        .map(|(h, e)| (h, e.blkid()))
+        .collect::<Vec<_>>();
+
+    // FIXME this is not comparing the proof of work, it's just looking at the
+    // chain lengths; this is almost the same thing, but it might break if a
+    // difficulty adjustment takes place at a reorg exactly on the transition
+    // block
+    trace!("computing pivot");
+    let Some((pivot_h, _pivot_id)) = find_pivot_block_height(&unacc_blocks, &maturing_blocks)
+    else {
+        // Then we're really screwed.
+        warn!("can't determine shared block to insert new maturing blocks");
+        return Ok(L1Segment::new_empty());
+    };
+
+    // Compute the offset in the unaccepted list for the blocks we want to use.
+    let unacc_fresh_offset = (pivot_h - l1v.next_expected_block() - 1) as usize;
+    let fresh_blocks = &unacc_blocks[unacc_fresh_offset..];
+
+    // Load the blocks.
+    let mut payloads = Vec::new();
+    for (h, _b) in fresh_blocks {
+        let rec = load_header_record(*h, l1_prov)?;
+        payloads.push(L1HeaderPayload::new_bare(*h, rec));
+    }
+
+    if !payloads.is_empty() {
+        debug!(n = %payloads.len(), "have new L1 blocks to provide");
+    }
+
+    Ok(L1Segment::new(payloads))
+}
+
+fn load_header_record(h: u64, l1_prov: &impl L1DataProvider) -> Result<L1HeaderRecord, Error> {
+    let mf = l1_prov
+        .get_block_manifest(h)?
+        .ok_or(Error::MissingL1BlockHeight(h))?;
+    // TODO need to include tx root proof we can verify
+    Ok(L1HeaderRecord::new(mf.header().to_vec(), mf.txs_root()))
+}
+
+/// Takes two partially-overlapping lists of block indexes and IDs and returns a
+/// ref to the last index and ID they share, if any.
+fn find_pivot_block_height<'c>(
+    a: &'c [(u64, &'c L1BlockId)],
+    b: &'c [(u64, &'c L1BlockId)],
+) -> Option<(u64, &'c L1BlockId)> {
+    if a.is_empty() || b.is_empty() {
+        return None;
+    }
+
+    let a_start = a[0].0 as i64;
+    let b_start = b[0].0 as i64;
+    let a_end = a_start + (a.len() as i64);
+    let b_end = b_start + (b.len() as i64);
+
+    #[cfg(test)]
+    eprintln!("ranges {a_start}..{a_end}, {b_start}..{b_end}");
+
+    // Check if they're actually overlapping.
+    if !(a_start < b_end && b_start < a_end) {
+        return None;
+    }
+
+    // Depending on how the windows overlap at the start we figure out the
+    // offsets for how we're iterating here.
+    let (a_off, b_off) = match b_start - a_start {
+        0 => (0, 0),
+        n if n > 0 => (n, 0),
+        n if n < 0 => (0, -n),
+        _ => unreachable!(),
+    };
+
+    #[cfg(test)]
+    eprintln!("offsets {a_off} {b_off}");
+
+    // Sanity checks.
+    assert!(a_off >= 0);
+    assert!(b_off >= 0);
+
+    let overlap_len = i64::min(a_end, b_end) - i64::max(a_start, b_start);
+
+    #[cfg(test)]
+    eprintln!("overlap {overlap_len}");
+
+    // Now iterate over the overlap range and return if it makes sense.
+    let mut last = None;
+    for i in 0..overlap_len {
+        let (ai, aid) = a[(i + a_off) as usize];
+        let (bi, bid) = b[(i + b_off) as usize];
+
+        #[cfg(test)]
+        {
+            eprintln!("Ai {ai} {aid}");
+            eprintln!("Bi {bi} {bid}");
+        }
+
+        assert_eq!(ai, bi); // Sanity check.
+
+        if aid != bid {
+            break;
+        }
+
+        last = Some((ai, aid));
+    }
+
+    last
+}
+
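`find_pivot_block_height` lines up the overlapping window of the two lists and scans it once for the last height where the IDs agree. A simpler quadratic restatement of the same result, with `u32` ids standing in for `L1BlockId`, just to pin down the expected behavior:

```rust
/// Naive restatement of `find_pivot_block_height`: the last (height, id)
/// pair present in both lists, with u32 ids standing in for `L1BlockId`.
fn find_pivot(a: &[(u64, u32)], b: &[(u64, u32)]) -> Option<(u64, u32)> {
    a.iter()
        .copied()
        .filter(|&(h, id)| b.iter().any(|&(bh, bid)| bh == h && bid == id))
        .last()
}

fn main() {
    // Heights 8..=12; the chains agree up to height 10, then diverge.
    let ours = [(8, 100), (9, 101), (10, 102), (11, 103), (12, 104)];
    let theirs = [(8, 100), (9, 101), (10, 102), (11, 203), (12, 204)];
    assert_eq!(find_pivot(&ours, &theirs), Some((10, 102)));

    // Disjoint windows share nothing, so there's no pivot.
    let far = [(20, 900u32), (21, 901)];
    assert_eq!(find_pivot(&ours, &far), None);
}
```

Unlike the real function, this version doesn't stop at the first mismatch, which only matters if two chains were to re-converge after diverging.

+/// Prepares the execution segment for the block.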
+fn prepare_exec_data( + slot: u64, + timestamp: u64, + prev_l2_blkid: L2BlockId, + prev_global_sr: Buf32, + safe_l1_block: Buf32, + engine: &E, +) -> Result<(ExecSegment, L2BlockAccessory), Error> { + trace!("preparing exec payload"); + + // Start preparing the EL payload. + let payload_env = PayloadEnv::new(timestamp, prev_l2_blkid, safe_l1_block, Vec::new()); + let key = engine.prepare_payload(payload_env)?; + trace!("submitted EL payload job, waiting for completion"); + + // Wait 2 seconds for the block to be finished. + // TODO Pull data from state about the new safe L1 hash, prev state roots, + // etc. to assemble the payload env for this block. + let wait = time::Duration::from_millis(100); + let timeout = time::Duration::from_millis(3000); + let Some(payload_data) = poll_status_loop(key, engine, wait, timeout)? else { + // TODO better error message + return Err(Error::Other("EL block assembly timed out".to_owned())); + }; + trace!("finished EL payload job"); + + // Reassemble it into an exec update. + let eui = payload_data.update_input().clone(); + let exec_update = ExecUpdate::new(eui, UpdateOutput::new_from_state(Buf32::zero())); + let exec_seg = ExecSegment::new(exec_update); + + // And the accessory. + let acc = L2BlockAccessory::new(payload_data.accessory_data().to_vec()); + + Ok((exec_seg, acc)) +} + +fn poll_status_loop( + job: u64, + engine: &E, + wait: time::Duration, + timeout: time::Duration, +) -> Result, EngineError> { + let start = time::Instant::now(); + loop { + // Sleep at the beginning since the first iter isn't likely to have it + // ready. + thread::sleep(wait); + + // Check the payload for the result. + trace!(%job, "polling engine for completed payload"); + let payload = engine.get_payload_status(job)?; + if let PayloadStatus::Ready(pl) = payload { + return Ok(Some(pl)); + } + + // If we've waited too long now. + if time::Instant::now() - start > timeout { + warn!(%job, "payload build job timed out"); + break; + } + } + + Ok(None) +} + +// TODO do we want to do this here? +fn compute_post_state( + prev_chstate: ChainState, + header: &impl L2Header, + body: &L2BlockBody, + params: &Arc, +) -> Result<(ChainState, WriteBatch), Error> { + let mut state_cache = StateCache::new(prev_chstate); + chain_transition::process_block(&mut state_cache, header, body, params.rollup())?; + let (post_state, wb) = state_cache.finalize(); + Ok((post_state, wb)) +} + +/// Signs the L2BlockHeader and returns the signature +fn sign_header(header: &L2BlockHeader, ik: &IdentityKey) -> Buf64 { + let msg = header.get_sighash(); + match ik { + IdentityKey::Sequencer(sk) => sign_schnorr_sig(&msg, sk), + } +} + +/// Returns the current unix time as milliseconds. +// TODO maybe we should use a time source that is possibly more consistent with +// the rest of the network for this? 
+fn now_millis() -> u64 { + time::UNIX_EPOCH.elapsed().unwrap().as_millis() as u64 +} + +#[cfg(test)] +mod tests { + // TODO to improve these tests, they could use a bit more randomization of lengths and offsets + + use alpen_express_state::l1::L1BlockId; + use alpen_test_utils::ArbitraryGenerator; + + use super::find_pivot_block_height; + + #[test] + fn test_find_pivot_noop() { + let ag = ArbitraryGenerator::new_with_size(1 << 12); + + let blkids: [L1BlockId; 10] = ag.generate(); + eprintln!("{blkids:#?}"); + + let blocks = blkids + .iter() + .enumerate() + .map(|(i, b)| (i as u64 + 8, b)) + .collect::>(); + + let max_height = blocks.last().unwrap().0; + let max_id = blocks.last().unwrap().1; + + let (shared_h, shared_id) = + find_pivot_block_height(&blocks, &blocks).expect("test: find pivot"); + + assert_eq!(shared_h, max_height); + assert_eq!(shared_id, max_id); + } + + #[test] + fn test_find_pivot_noop_offset() { + let ag = ArbitraryGenerator::new_with_size(1 << 12); + + let blkids: [L1BlockId; 10] = ag.generate(); + eprintln!("{blkids:#?}"); + + let blocks = blkids + .iter() + .enumerate() + .map(|(i, b)| (i as u64 + 8, b)) + .collect::>(); + + let max_height = blocks[7].0; + let max_id = blocks[7].1; + + let (shared_h, shared_id) = + find_pivot_block_height(&blocks[..8], &blocks[2..]).expect("test: find pivot"); + + assert_eq!(shared_h, max_height); + assert_eq!(shared_id, max_id); + } + + #[test] + fn test_find_pivot_simple_extend() { + let ag = ArbitraryGenerator::new_with_size(1 << 12); + + let blkids1: [L1BlockId; 10] = ag.generate(); + let mut blkids2 = Vec::from(blkids1); + blkids2.push(ag.generate()); + blkids2.push(ag.generate()); + eprintln!("{blkids1:#?}"); + + let blocks1 = blkids1 + .iter() + .enumerate() + .map(|(i, b)| (i as u64 + 8, b)) + .collect::>(); + + let blocks2 = blkids2 + .iter() + .enumerate() + .map(|(i, b)| (i as u64 + 8, b)) + .collect::>(); + + let max_height = blocks1.last().unwrap().0; + let max_id = blocks1.last().unwrap().1; + + let (shared_h, shared_id) = + find_pivot_block_height(&blocks1, &blocks2).expect("test: find pivot"); + + assert_eq!(shared_h, max_height); + assert_eq!(shared_id, max_id); + } + + #[test] + fn test_find_pivot_typical_reorg() { + let ag = ArbitraryGenerator::new_with_size(1 << 16); + + let mut blkids1: Vec = Vec::new(); + for _ in 0..10 { + blkids1.push(ag.generate()); + } + + let mut blkids2 = blkids1.clone(); + blkids2.pop(); + blkids2.push(ag.generate()); + blkids2.push(ag.generate()); + + let blocks1 = blkids1 + .iter() + .enumerate() + .map(|(i, b)| (i as u64 + 8, b)) + .collect::>(); + + let blocks2 = blkids2 + .iter() + .enumerate() + .map(|(i, b)| (i as u64 + 8, b)) + .collect::>(); + + let max_height = blocks1[blocks1.len() - 2].0; + let max_id = blocks1[blocks1.len() - 2].1; + + // Also using a pretty deep offset here. 
+ let (shared_h, shared_id) = + find_pivot_block_height(&blocks1, &blocks2[5..]).expect("test: find pivot"); + + assert_eq!(shared_h, max_height); + assert_eq!(shared_id, max_id); + } + + #[test] + fn test_find_pivot_cur_shorter_reorg() { + let ag = ArbitraryGenerator::new_with_size(1 << 16); + + let mut blkids1: Vec = Vec::new(); + for _ in 0..10 { + blkids1.push(ag.generate()); + } + + let mut blkids2 = blkids1.clone(); + blkids2.pop(); + blkids2.pop(); + blkids2.pop(); + blkids2.pop(); + let len = blkids2.len(); + blkids2.push(ag.generate()); + blkids2.push(ag.generate()); + + let blocks1 = blkids1 + .iter() + .enumerate() + .map(|(i, b)| (i as u64 + 8, b)) + .collect::>(); + + let blocks2 = blkids2 + .iter() + .enumerate() + .map(|(i, b)| (i as u64 + 8, b)) + .collect::>(); + + let max_height = blocks2[len - 1].0; + let max_id = blocks2[len - 1].1; + + // Also using a pretty deep offset here. + let (shared_h, shared_id) = + find_pivot_block_height(&blocks1, &blocks2[5..]).expect("test: find pivot"); + + assert_eq!(shared_h, max_height); + assert_eq!(shared_id, max_id); + } + + #[test] + fn test_find_pivot_disjoint() { + let ag = ArbitraryGenerator::new_with_size(1 << 16); + + let mut blkids1: Vec = Vec::new(); + for _ in 0..10 { + blkids1.push(ag.generate()); + } + + let mut blkids2: Vec = Vec::new(); + for _ in 0..10 { + blkids2.push(ag.generate()); + } + + let blocks1 = blkids1 + .iter() + .enumerate() + .map(|(i, b)| (i as u64 + 8, b)) + .collect::>(); + + let blocks2 = blkids2 + .iter() + .enumerate() + .map(|(i, b)| (i as u64 + 8, b)) + .collect::>(); + + let res = find_pivot_block_height(&blocks1[2..], &blocks2[..3]); + assert!(res.is_none()); + } +} diff --git a/crates/consensus-logic/src/duty_extractor.rs b/crates/consensus-logic/src/duty/extractor.rs similarity index 83% rename from crates/consensus-logic/src/duty_extractor.rs rename to crates/consensus-logic/src/duty/extractor.rs index f3ba7861e..d3e1d5f1e 100644 --- a/crates/consensus-logic/src/duty_extractor.rs +++ b/crates/consensus-logic/src/duty/extractor.rs @@ -1,17 +1,15 @@ use alpen_express_db::traits::{Database, L2DataProvider}; use alpen_express_state::{client_state::ClientState, header::L2Header}; -use crate::{ - duties::{self, BlockSigningDuty}, - errors::Error, -}; +use super::types::{self, BlockSigningDuty}; +use crate::errors::Error; /// Extracts new duties given a consensus state and a identity. pub fn extract_duties( state: &ClientState, - _ident: &duties::Identity, + _ident: &types::Identity, database: &D, -) -> Result, Error> { +) -> Result, Error> { // If a sync state isn't present then we probably don't have anything we // want to do. We might change this later. let Some(ss) = state.sync() else { @@ -31,5 +29,5 @@ pub fn extract_duties( // Since we're not rotating sequencers, for now we just *always* produce a // new block. 
let duty_data = BlockSigningDuty::new_simple(block_idx + 1); - Ok(vec![duties::Duty::SignBlock(duty_data)]) + Ok(vec![types::Duty::SignBlock(duty_data)]) } diff --git a/crates/consensus-logic/src/duty/mod.rs b/crates/consensus-logic/src/duty/mod.rs new file mode 100644 index 000000000..73dd2ba58 --- /dev/null +++ b/crates/consensus-logic/src/duty/mod.rs @@ -0,0 +1,4 @@ +pub mod block_assembly; +pub mod extractor; +pub mod types; +pub mod worker; diff --git a/crates/consensus-logic/src/duties.rs b/crates/consensus-logic/src/duty/types.rs similarity index 90% rename from crates/consensus-logic/src/duties.rs rename to crates/consensus-logic/src/duty/types.rs index eb8480c83..8629bd866 100644 --- a/crates/consensus-logic/src/duties.rs +++ b/crates/consensus-logic/src/duty/types.rs @@ -238,3 +238,24 @@ impl DutyBatch { &self.duties } } + +/// Sequencer key used for signing-related duties. +#[derive(Clone, Debug, BorshDeserialize, BorshSerialize)] +pub enum IdentityKey { + Sequencer(Buf32), +} + +/// Contains both the identity key used for signing and the identity used for +/// verifying signatures. This is really just a stub that we should replace +/// with real cryptographic signatures and putting keys in the rollup params. +#[derive(Clone, Debug)] +pub struct IdentityData { + pub ident: Identity, + pub key: IdentityKey, +} + +impl IdentityData { + pub fn new(ident: Identity, key: IdentityKey) -> Self { + Self { ident, key } + } +} diff --git a/crates/consensus-logic/src/duty/worker.rs b/crates/consensus-logic/src/duty/worker.rs new file mode 100644 index 000000000..490f177f7 --- /dev/null +++ b/crates/consensus-logic/src/duty/worker.rs @@ -0,0 +1,273 @@ +//! Executes duties. + +use std::collections::HashMap; +use std::sync::Arc; +use std::time; + +use tokio::sync::broadcast; +use tracing::*; + +use alpen_express_db::traits::*; +use alpen_express_evmctl::engine::ExecEngineCtl; +use alpen_express_primitives::params::Params; +use alpen_express_state::client_state::ClientState; +use alpen_express_state::prelude::*; + +use super::types::{self, Duty, DutyBatch, Identity, IdentityKey}; +use super::{block_assembly, extractor}; +use crate::errors::Error; +use crate::message::{ClientUpdateNotif, ForkChoiceMessage}; +use crate::sync_manager::SyncManager; + +pub fn duty_tracker_task( + cupdate_rx: broadcast::Receiver>, + batch_queue: broadcast::Sender, + ident: Identity, + database: Arc, +) { + let db = database.as_ref(); + if let Err(e) = duty_tracker_task_inner(cupdate_rx, batch_queue, ident, db) { + error!(err = %e, "tracker task exited"); + } +} + +fn duty_tracker_task_inner( + mut cupdate_rx: broadcast::Receiver>, + batch_queue: broadcast::Sender, + ident: Identity, + database: &impl Database, +) -> Result<(), Error> { + let mut duties_tracker = types::DutyTracker::new_empty(); + + let idx = database.client_state_provider().get_last_checkpoint_idx()?; + let last_checkpoint_state = database.client_state_provider().get_state_checkpoint(idx)?; + let last_finalized_blk = match last_checkpoint_state { + Some(state) => state.sync().map(|sync| *sync.finalized_blkid()), + None => None, + }; + duties_tracker.set_finalized_block(last_finalized_blk); + + loop { + let update = match cupdate_rx.blocking_recv() { + Ok(u) => u, + Err(broadcast::error::RecvError::Closed) => { + break; + } + Err(broadcast::error::RecvError::Lagged(skipped)) => { + // TODO maybe check the things we missed, but this is fine for now + warn!(%skipped, "overloaded, skipping indexing some duties"); + continue; + } + }; + + let ev_idx = 
update.sync_event_idx(); + let new_state = update.new_state(); + trace!(%ev_idx, "new consensus state, updating duties"); + trace!("STATE: {new_state:#?}"); + + if let Err(e) = update_tracker(&mut duties_tracker, new_state, &ident, database) { + error!(err = %e, "failed to update duties tracker"); + } + + // Publish the new batch. + let batch = DutyBatch::new(ev_idx, duties_tracker.duties().to_vec()); + if batch_queue.send(batch).is_err() { + warn!("failed to publish new duties batch"); + } + } + + info!("duty extractor task exiting"); + + Ok(()) +} + +fn update_tracker( + tracker: &mut types::DutyTracker, + state: &ClientState, + ident: &Identity, + database: &D, +) -> Result<(), Error> { + let Some(ss) = state.sync() else { + return Ok(()); + }; + + let new_duties = extractor::extract_duties(state, ident, database)?; + + // Figure out the block slot from the tip blockid. + // TODO include the block slot in the consensus state + let tip_blkid = *ss.chain_tip_blkid(); + let l2prov = database.l2_provider(); + let block = l2prov + .get_block_data(tip_blkid)? + .ok_or(Error::MissingL2Block(tip_blkid))?; + let block_idx = block.header().blockidx(); + let ts = time::Instant::now(); // FIXME XXX use .timestamp()!!! + + // Figure out which blocks were finalized + let new_finalized = state.sync().map(|sync| *sync.finalized_blkid()); + let newly_finalized_blocks: Vec = get_finalized_blocks( + tracker.get_finalized_block(), + l2prov.as_ref(), + new_finalized, + )?; + + let tracker_update = types::StateUpdate::new(block_idx, ts, newly_finalized_blocks); + let n_evicted = tracker.update(&tracker_update); + trace!(%n_evicted, "evicted old duties from new consensus state"); + + // Now actually insert the new duties. + tracker.add_duties(tip_blkid, block_idx, new_duties.into_iter()); + + Ok(()) +} + +fn get_finalized_blocks( + last_finalized_block: Option, + l2prov: &impl L2DataProvider, + finalized: Option, +) -> Result, Error> { + // Figure out which blocks were finalized + let mut newly_finalized_blocks: Vec = Vec::new(); + let mut new_finalized = finalized; + + while let Some(finalized) = new_finalized { + // If the last finalized block is equal to the new finalized block, + // it means that no new blocks are finalized + if last_finalized_block == Some(finalized) { + break; + } + + // else loop till we reach to the last finalized block or go all the way + // as long as we get some block data + match l2prov.get_block_data(finalized)? { + Some(block) => new_finalized = Some(*block.header().parent()), + None => break, + } + + newly_finalized_blocks.push(finalized); + } + + Ok(newly_finalized_blocks) +} + +pub fn duty_dispatch_task< + D: Database + Sync + Send + 'static, + E: ExecEngineCtl + Sync + Send + 'static, +>( + mut updates: broadcast::Receiver, + ident_key: IdentityKey, + sync_man: Arc, + database: Arc, + engine: Arc, + pool: Arc, + params: Arc, +) { + // TODO make this actually work + let mut pending_duties: HashMap = HashMap::new(); + + // TODO still need some stuff here to decide if we're fully synced and + // *should* dispatch duties + + loop { + let update = match updates.blocking_recv() { + Ok(u) => u, + Err(broadcast::error::RecvError::Closed) => { + break; + } + Err(broadcast::error::RecvError::Lagged(skipped)) => { + warn!(%skipped, "overloaded, skipping dispatching some duties"); + continue; + } + }; + + // TODO check pending_duties to remove any completed duties + + for duty in update.duties() { + let id = duty.id(); + + // Skip any duties we've already dispatched. 
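On the `TODO check pending_duties to remove any completed duties` above: one hedged sketch of what completion tracking could look like, using bare `std::thread` handles for self-containedness even though the real dispatcher hands work to a thread pool (where a `JoinHandle` isn't directly available):

```rust
use std::collections::HashMap;
use std::thread;
use std::time::Duration;

fn main() {
    // Hypothetical in-flight table; the real code currently uses HashMap<u64, ()>.
    let mut pending: HashMap<u64, thread::JoinHandle<()>> = HashMap::new();

    for id in 0..4u64 {
        // Mirrors the `contains_key` dedup check below.
        if pending.contains_key(&id) {
            continue;
        }
        let handle = thread::spawn(move || thread::sleep(Duration::from_millis(10 * id)));
        pending.insert(id, handle);
    }

    // Periodically reap finished duties so the table doesn't grow forever.
    thread::sleep(Duration::from_millis(100));
    pending.retain(|_, handle| !handle.is_finished());
    println!("duties still in flight: {}", pending.len());
}
```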
+            if pending_duties.contains_key(&id) {
+                continue;
+            }
+
+            // Clone some things, spawn the task, then remember the join handle.
+            // TODO make this use a thread pool
+            let d = duty.duty().clone();
+            let ik = ident_key.clone();
+            let sm = sync_man.clone();
+            let db = database.clone();
+            let e = engine.clone();
+            let params = params.clone();
+            pool.execute(move || duty_exec_task(d, ik, sm, db, e, params));
+            trace!(%id, "dispatched duty exec task");
+            pending_duties.insert(id, ());
+        }
+    }
+
+    info!("duty dispatcher task exiting");
+}
+
+/// Toplevel function that actually performs a job. This is spawned on a
+/// thread pool so we don't have to worry about it blocking *too* much other
+/// work.
+fn duty_exec_task<D: Database, E: ExecEngineCtl>(
+    duty: Duty,
+    ik: IdentityKey,
+    sync_man: Arc<SyncManager>,
+    database: Arc<D>,
+    engine: Arc<E>,
+    params: Arc<Params>,
+) {
+    if let Err(e) = perform_duty(
+        &duty,
+        &ik,
+        &sync_man,
+        database.as_ref(),
+        engine.as_ref(),
+        &params,
+    ) {
+        error!(err = %e, "error performing duty");
+    } else {
+        debug!("completed duty successfully");
+    }
+}
+
+fn perform_duty<D: Database, E: ExecEngineCtl>(
+    duty: &Duty,
+    ik: &IdentityKey,
+    sync_man: &SyncManager,
+    database: &D,
+    engine: &E,
+    params: &Arc<Params>,
+) -> Result<(), Error> {
+    match duty {
+        Duty::SignBlock(data) => {
+            let target_slot = data.target_slot();
+
+            // TODO get the cur client state from the sync manager, the one used
+            // to initiate this duty, and pass it into `sign_and_store_block`
+
+            let asm_span = info_span!("blockasm", %target_slot);
+            let _span = asm_span.enter();
+
+            let Some((blkid, _block)) =
+                block_assembly::sign_and_store_block(target_slot, ik, database, engine, params)?
+            else {
+                return Ok(());
+            };
+
+            // Submit it to the fork choice manager to update the consensus state
+            // with it.
+            let ctm = ForkChoiceMessage::NewBlock(blkid);
+            if !sync_man.submit_chain_tip_msg(ctm) {
+                error!(?blkid, "failed to submit new block to fork choice manager");
+            }
+
+            // TODO do we have to do something with _block right now?
+
+            // TODO eventually, send the block out to peers
+
+            Ok(())
+        }
+    }
+}
diff --git a/crates/consensus-logic/src/duty_executor.rs b/crates/consensus-logic/src/duty_executor.rs
deleted file mode 100644
index f4a653710..000000000
--- a/crates/consensus-logic/src/duty_executor.rs
+++ /dev/null
@@ -1,495 +0,0 @@
-//! Executes duties.
- -use std::collections::HashMap; -use std::sync::Arc; -use std::thread::sleep; -use std::time::Duration; -use std::{thread, time}; - -use borsh::{BorshDeserialize, BorshSerialize}; -use tokio::sync::broadcast; -use tracing::*; - -use alpen_express_db::traits::{ClientStateProvider, Database, L2DataProvider, L2DataStore}; -use alpen_express_evmctl::engine::{ExecEngineCtl, PayloadStatus}; -use alpen_express_evmctl::errors::EngineError; -use alpen_express_evmctl::messages::{ExecPayloadData, PayloadEnv}; -use alpen_express_primitives::buf::{Buf32, Buf64}; -use alpen_express_primitives::params::RollupParams; -use alpen_express_state::block::{ExecSegment, L1Segment, L2BlockAccessory, L2BlockBundle}; -use alpen_express_state::client_state::ClientState; -use alpen_express_state::exec_update::{ExecUpdate, UpdateOutput}; -use alpen_express_state::header::L2BlockHeader; -use alpen_express_state::prelude::*; - -use crate::credential::sign_schnorr_sig; -use crate::duties::{self, Duty, DutyBatch, Identity}; -use crate::duty_extractor; -use crate::errors::Error; -use crate::message::{ClientUpdateNotif, ForkChoiceMessage}; -use crate::sync_manager::SyncManager; - -#[derive(Clone, Debug, BorshDeserialize, BorshSerialize)] -pub enum IdentityKey { - Sequencer(Buf32), -} - -/// Contains both the identity key used for signing and the identity used for -/// verifying signatures. This is really just a stub that we should replace -/// with real cryptographic signatures and putting keys in the rollup params. -#[derive(Clone, Debug)] -pub struct IdentityData { - pub ident: Identity, - pub key: IdentityKey, -} - -impl IdentityData { - pub fn new(ident: Identity, key: IdentityKey) -> Self { - Self { ident, key } - } -} - -pub fn duty_tracker_task( - mut cupdate_rx: broadcast::Receiver>, - batch_queue: broadcast::Sender, - ident: Identity, - database: Arc, -) -> Result<(), Error> { - let mut duties_tracker = duties::DutyTracker::new_empty(); - - let idx = database.client_state_provider().get_last_checkpoint_idx()?; - let last_checkpoint_state = database.client_state_provider().get_state_checkpoint(idx)?; - let last_finalized_blk = match last_checkpoint_state { - Some(state) => state.sync().map(|sync| *sync.finalized_blkid()), - None => None, - }; - duties_tracker.set_finalized_block(last_finalized_blk); - - loop { - let update = match cupdate_rx.blocking_recv() { - Ok(u) => u, - Err(broadcast::error::RecvError::Closed) => { - break; - } - Err(broadcast::error::RecvError::Lagged(skipped)) => { - // TODO maybe check the things we missed, but this is fine for now - warn!(%skipped, "overloaded, skipping indexing some duties"); - continue; - } - }; - - let ev_idx = update.sync_event_idx(); - let new_state = update.new_state(); - trace!(%ev_idx, "new consensus state, updating duties"); - trace!("STATE: {new_state:#?}"); - - if let Err(e) = update_tracker(&mut duties_tracker, new_state, &ident, database.as_ref()) { - error!(err = %e, "failed to update duties tracker"); - } - - // Publish the new batch. 
- let batch = DutyBatch::new(ev_idx, duties_tracker.duties().to_vec()); - if batch_queue.send(batch).is_err() { - warn!("failed to publish new duties batch"); - } - } - - info!("duty extractor task exiting"); - - Ok(()) -} - -fn update_tracker( - tracker: &mut duties::DutyTracker, - state: &ClientState, - ident: &Identity, - database: &D, -) -> Result<(), Error> { - let Some(ss) = state.sync() else { - return Ok(()); - }; - - let new_duties = duty_extractor::extract_duties(state, ident, database)?; - - // Figure out the block slot from the tip blockid. - // TODO include the block slot in the consensus state - let tip_blkid = *ss.chain_tip_blkid(); - let l2prov = database.l2_provider(); - let block = l2prov - .get_block_data(tip_blkid)? - .ok_or(Error::MissingL2Block(tip_blkid))?; - let block_idx = block.header().blockidx(); - let ts = time::Instant::now(); // FIXME XXX use .timestamp()!!! - - // Figure out which blocks were finalized - let new_finalized = state.sync().map(|sync| *sync.finalized_blkid()); - let newly_finalized_blocks: Vec = get_finalized_blocks( - tracker.get_finalized_block(), - l2prov.as_ref(), - new_finalized, - )?; - - let tracker_update = duties::StateUpdate::new(block_idx, ts, newly_finalized_blocks); - let n_evicted = tracker.update(&tracker_update); - trace!(%n_evicted, "evicted old duties from new consensus state"); - - // Now actually insert the new duties. - tracker.add_duties(tip_blkid, block_idx, new_duties.into_iter()); - - Ok(()) -} - -fn get_finalized_blocks( - last_finalized_block: Option, - l2prov: &impl L2DataProvider, - finalized: Option, -) -> Result, Error> { - // Figure out which blocks were finalized - let mut newly_finalized_blocks: Vec = Vec::new(); - let mut new_finalized = finalized; - - while let Some(finalized) = new_finalized { - // If the last finalized block is equal to the new finalized block, - // it means that no new blocks are finalized - if last_finalized_block == Some(finalized) { - break; - } - - // else loop till we reach to the last finalized block or go all the way - // as long as we get some block data - match l2prov.get_block_data(finalized)? { - Some(block) => new_finalized = Some(*block.header().parent()), - None => break, - } - - newly_finalized_blocks.push(finalized); - } - - Ok(newly_finalized_blocks) -} - -pub fn duty_dispatch_task< - D: Database + Sync + Send + 'static, - E: ExecEngineCtl + Sync + Send + 'static, ->( - mut updates: broadcast::Receiver, - ident_key: IdentityKey, - sync_man: Arc, - database: Arc, - engine: Arc, - pool: Arc, - params: &RollupParams, -) { - // TODO make this actually work - let mut pending_duties: HashMap = HashMap::new(); - - // TODO still need some stuff here to decide if we're fully synced and - // *should* dispatch duties - - loop { - let update = match updates.blocking_recv() { - Ok(u) => u, - Err(broadcast::error::RecvError::Closed) => { - break; - } - Err(broadcast::error::RecvError::Lagged(skipped)) => { - warn!(%skipped, "overloaded, skipping dispatching some duties"); - continue; - } - }; - - // TODO check pending_duties to remove any completed duties - - for duty in update.duties() { - let id = duty.id(); - - // Skip any duties we've already dispatched. - if pending_duties.contains_key(&id) { - continue; - } - - // Clone some things, spawn the task, then remember the join handle. 
- // TODO make this use a thread pool - let d = duty.duty().clone(); - let ik = ident_key.clone(); - let sm = sync_man.clone(); - let db = database.clone(); - let e = engine.clone(); - let params = params.clone(); - pool.execute(move || duty_exec_task(d, ik, sm, db, e, params)); - trace!(%id, "dispatched duty exec task"); - pending_duties.insert(id, ()); - } - } - - info!("duty dispatcher task exiting"); -} - -/// Toplevel function that's actually performs a job. This is spawned on a/ -/// thread pool so we don't have to worry about it blocking *too* much other -/// work. -fn duty_exec_task( - duty: Duty, - ik: IdentityKey, - sync_man: Arc, - database: Arc, - engine: Arc, - params: RollupParams, -) { - if let Err(e) = perform_duty( - &duty, - &ik, - &sync_man, - database.as_ref(), - engine.as_ref(), - params, - ) { - error!(err = %e, "error performing duty"); - } else { - debug!("completed duty successfully"); - } -} - -fn perform_duty( - duty: &Duty, - ik: &IdentityKey, - sync_man: &SyncManager, - database: &D, - engine: &E, - params: RollupParams, -) -> Result<(), Error> { - match duty { - Duty::SignBlock(data) => { - let target = data.target_slot(); - let Some((blkid, _block)) = sign_and_store_block(target, ik, database, engine, params)? - else { - return Ok(()); - }; - - // Submit it to the fork choice manager to update the consensus state - // with it. - let ctm = ForkChoiceMessage::NewBlock(blkid); - if !sync_man.submit_chain_tip_msg(ctm) { - error!(?blkid, "failed to submit new block to fork choice manager"); - } - - // TODO do we have to do something with _block right now? - - // TODO eventually, send the block out to peers - - Ok(()) - } - } -} - -fn sign_and_store_block( - slot: u64, - ik: &IdentityKey, - database: &D, - engine: &E, - params: RollupParams, -) -> Result, Error> { - debug!(%slot, "prepating to publish block"); - - // Check the block we were supposed to build isn't already in the database, - // if so then just republish that. This checks that there just if we have a - // block at that height, which for now is the same thing. - let l2prov = database.l2_provider(); - let blocks_at_slot = l2prov.get_blocks_at_height(slot)?; - if !blocks_at_slot.is_empty() { - // FIXME Should we be more verbose about this? - warn!(%slot, "was turn to propose block, but found block in database already"); - return Ok(None); - } - - // TODO get the consensus state this duty was created in response to and - // pull out the current tip block from it - // XXX this is really bad as-is - let cs_prov = database.client_state_provider(); - let ckpt_idx = cs_prov.get_last_checkpoint_idx()?; // FIXME: this isn't what this is for, it only works because we're checkpointing on every state - // right now - let last_cstate = cs_prov - .get_state_checkpoint(ckpt_idx)? - .expect("dutyexec: get state checkpoint"); - - let Some(last_ss) = last_cstate.sync() else { - return Ok(None); - }; - - let prev_block_id = *last_ss.chain_tip_blkid(); - - let prev_block = database - .l2_provider() - .get_block_data(prev_block_id)? - .expect("prev block must exist"); - - // TODO: get from rollup config - let block_time = params.block_time; - let target_ts = prev_block.block().header().timestamp() + block_time; - let current_ts = now_millis(); - - if current_ts < target_ts { - sleep(Duration::from_millis(target_ts - current_ts)); - } - - // Start preparing the EL payload. 
- let ts = now_millis(); - let safe_l1_block = Buf32::zero(); // TODO - let payload_env = PayloadEnv::new(ts, prev_block_id, safe_l1_block, Vec::new()); - let key = engine.prepare_payload(payload_env)?; - trace!(%slot, "submitted EL payload job, waiting for completion"); - - // TODO Pull data from CSM state that we've observed from L1, including new - // headers or any headers needed to perform a reorg if necessary. - let l1_seg = L1Segment::new(Vec::new()); - - // Wait 2 seconds for the block to be finished. - // TODO Pull data from state about the new safe L1 hash, prev state roots, - // etc. to assemble the payload env for this block. - let wait = time::Duration::from_millis(100); - let timeout = time::Duration::from_millis(3000); - let Some(payload_data) = poll_status_loop(key, engine, wait, timeout)? else { - // TODO better error message - return Err(Error::Other("EL block assembly timed out".to_owned())); - }; - trace!(%slot, "finished EL payload job"); - - // TODO correctly assemble the exec segment, since this is bodging out how - // the inputs/outputs should be structured - let eui = payload_data.update_input().clone(); - let exec_update = ExecUpdate::new(eui, UpdateOutput::new_from_state(Buf32::zero())); - let exec_seg = ExecSegment::new(exec_update); - - // Assemble the body and the header core. - let body = L2BlockBody::new(l1_seg, exec_seg); - let state_root = Buf32::zero(); // TODO compute this from the different parts - let header = L2BlockHeader::new(slot, ts, prev_block_id, &body, state_root); - let header_sig = sign_header(&header, ik); - let signed_header = SignedL2BlockHeader::new(header, header_sig); - let blkid = signed_header.get_blockid(); - let block_accessory = L2BlockAccessory::new(payload_data.accessory_data().to_vec()); - let final_block = L2Block::new(signed_header, body); - let final_block_bundle = L2BlockBundle::new(final_block.clone(), block_accessory); - info!(%slot, ?blkid, "finished building new block"); - - // Store the block in the database. - let l2store = database.l2_store(); - l2store.put_block_data(final_block_bundle)?; - debug!(?blkid, "wrote block to datastore"); - - Ok(Some((blkid, final_block))) -} - -/// Signs the L2BlockHeader and returns the signature -fn sign_header(header: &L2BlockHeader, ik: &IdentityKey) -> Buf64 { - let msg = header.get_sighash(); - match ik { - IdentityKey::Sequencer(sk) => sign_schnorr_sig(&msg, sk), - } -} - -/// Returns the current unix time as milliseconds. -// TODO maybe we should use a time source that is possibly more consistent with -// the rest of the network for this? -fn now_millis() -> u64 { - time::UNIX_EPOCH.elapsed().unwrap().as_millis() as u64 -} - -fn poll_status_loop( - job: u64, - engine: &E, - wait: time::Duration, - timeout: time::Duration, -) -> Result, EngineError> { - let start = time::Instant::now(); - loop { - // Sleep at the beginning since the first iter isn't likely to have it - // ready. - thread::sleep(wait); - - // Check the payload for the result. - let payload = engine.get_payload_status(job)?; - if let PayloadStatus::Ready(pl) = payload { - return Ok(Some(pl)); - } - - // If we've waited too long now. 
- if time::Instant::now() - start > timeout { - warn!(%job, "payload build job timed out"); - break; - } - } - - Ok(None) -} - -#[cfg(test)] -mod tests { - use alpen_express_db::traits::{Database, L2DataStore}; - use alpen_express_state::header::L2Header; - use alpen_test_utils::l2::gen_l2_chain; - - use super::get_finalized_blocks; - - #[test] - fn test_get_finalized_blocks() { - let db = alpen_test_utils::get_common_db(); - let chain = gen_l2_chain(None, 5); - - for block in chain.clone() { - db.as_ref() - .l2_store() - .as_ref() - .put_block_data(block) - .unwrap(); - } - - let block_ids: Vec<_> = chain.iter().map(|b| b.header().get_blockid()).collect(); - - { - let last_finalized_block = Some(block_ids[0]); - let new_finalized = Some(block_ids[4]); - let finalized_blocks = get_finalized_blocks( - last_finalized_block, - db.l2_provider().as_ref(), - new_finalized, - ) - .unwrap(); - - let expected_finalized_blocks: Vec<_> = - block_ids[1..=4].iter().rev().cloned().collect(); - - assert_eq!(finalized_blocks, expected_finalized_blocks); - } - - { - let last_finalized_block = None; - let new_finalized = Some(block_ids[4]); - let finalized_blocks = get_finalized_blocks( - last_finalized_block, - db.l2_provider().as_ref(), - new_finalized, - ) - .unwrap(); - - let expected_finalized_blocks: Vec<_> = - block_ids[0..=4].iter().rev().cloned().collect(); - - assert_eq!(finalized_blocks, expected_finalized_blocks); - } - - { - let last_finalized_block = None; - let new_finalized = None; - let finalized_blocks = get_finalized_blocks( - last_finalized_block, - db.l2_provider().as_ref(), - new_finalized, - ) - .unwrap(); - - let expected_finalized_blocks: Vec<_> = vec![]; - assert_eq!(finalized_blocks, expected_finalized_blocks); - } - } -} diff --git a/crates/consensus-logic/src/errors.rs b/crates/consensus-logic/src/errors.rs index 6beca4dfe..19f42da2c 100644 --- a/crates/consensus-logic/src/errors.rs +++ b/crates/consensus-logic/src/errors.rs @@ -44,9 +44,19 @@ pub enum Error { #[error("client sync state unset")] MissingClientSyncState, + /// Used when assembling blocks and we don't have an actual block ID to use. 
+ #[error("invalid state transition: {0}")] + InvalidStateTsnImm(#[from] TsnError), + #[error("csm dropped")] CsmDropped, + #[error("tried to reorg too deep (target {0} vs buried {1})")] + ReorgTooDeep(u64, u64), + + #[error("out of order L1 block {2} (exp next height {0}, block {1})")] + OutOfOrderL1Block(u64, u64, L1BlockId), + #[error("chaintip: {0}")] ChainTip(#[from] ChainTipError), @@ -83,4 +93,13 @@ pub enum TsnError { #[error("mismatch parent (head {0:?}, parent {1:?}")] MismatchParent(L2BlockId, L2BlockId), + + #[error("attested mismatched ID for {0} (set {1}, computed {2})")] + L1BlockIdMismatch(u64, L1BlockId, L1BlockId), + + #[error("parent link at L1 block {0} incorrect (set parent {1}, found block {2})")] + L1BlockParentMismatch(u64, L1BlockId, L1BlockId), + + #[error("L1 segment block did not extend the chain tip")] + L1SegNotExtend, } diff --git a/crates/consensus-logic/src/fork_choice_manager.rs b/crates/consensus-logic/src/fork_choice_manager.rs index f83801411..9f6b71fef 100644 --- a/crates/consensus-logic/src/fork_choice_manager.rs +++ b/crates/consensus-logic/src/fork_choice_manager.rs @@ -2,7 +2,6 @@ use std::sync::Arc; -use alpen_express_state::state_op; use tokio::sync::mpsc; use tracing::*; @@ -16,6 +15,7 @@ use alpen_express_primitives::params::Params; use alpen_express_state::client_state::ClientState; use alpen_express_state::operation::SyncAction; use alpen_express_state::prelude::*; +use alpen_express_state::state_op::StateCache; use alpen_express_state::sync_event::SyncEvent; use crate::ctl::CsmController; @@ -563,11 +563,16 @@ fn apply_tip_update( .ok_or(Error::MissingL2Block(*blkid))?; let block_idx = block.header().blockidx(); + let header = block.header(); + let body = block.body(); + // Compute the transition write batch, then compute the new state // locally and update our going state. 
- let wb = chain_transition::process_block(&pre_state, &block) + let rparams = state.params.rollup(); + let mut prestate_cache = StateCache::new(pre_state); + chain_transition::process_block(&mut prestate_cache, header, body, rparams) .map_err(|e| Error::InvalidStateTsn(*blkid, e))?; - let post_state = state_op::apply_write_batch_to_chainstate(pre_state, &wb); + let (post_state, wb) = prestate_cache.finalize(); pre_state = post_state; // After each application we update the fork choice tip data in case we fail diff --git a/crates/consensus-logic/src/genesis.rs b/crates/consensus-logic/src/genesis.rs index 1c2d28044..c5e199107 100644 --- a/crates/consensus-logic/src/genesis.rs +++ b/crates/consensus-logic/src/genesis.rs @@ -75,8 +75,8 @@ pub fn init_genesis_chainstate( let genesis_blkid = gblock.header().get_blockid(); info!(?genesis_blkid, "created genesis block"); - let horizon_blk_rec = L1HeaderRecord::from(pregenesis_mfs.last().unwrap()); - let l1vs = L1ViewState::new_at_horizon(horizon_blk_height, horizon_blk_rec); + let genesis_blk_rec = L1HeaderRecord::from(pregenesis_mfs.last().unwrap()); + let l1vs = L1ViewState::new_at_genesis(horizon_blk_height, genesis_blk_height, genesis_blk_rec); let gchstate = ChainState::from_genesis(genesis_blkid, l1vs, gees); diff --git a/crates/consensus-logic/src/l1_handler.rs b/crates/consensus-logic/src/l1_handler.rs index d34ee2fdd..28a95307a 100644 --- a/crates/consensus-logic/src/l1_handler.rs +++ b/crates/consensus-logic/src/l1_handler.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use alpen_express_primitives::params::Params; use bitcoin::consensus::serialize; use bitcoin::hashes::Hash; use bitcoin::Block; @@ -19,12 +20,13 @@ pub fn bitcoin_data_handler_task( l1db: Arc, csm_ctl: Arc, mut event_rx: mpsc::Receiver, + params: Arc, ) -> anyhow::Result<()> where L1D: L1DataStore + Sync + Send + 'static, { while let Some(event) = event_rx.blocking_recv() { - if let Err(e) = handle_event(event, l1db.as_ref(), csm_ctl.as_ref()) { + if let Err(e) = handle_event(event, l1db.as_ref(), csm_ctl.as_ref(), &params) { error!(err = %e, "failed to handle L1 event"); } } @@ -33,7 +35,12 @@ where Ok(()) } -fn handle_event(event: L1Event, l1db: &L1D, csm_ctl: &CsmController) -> anyhow::Result<()> +fn handle_event( + event: L1Event, + l1db: &L1D, + csm_ctl: &CsmController, + params: &Arc, +) -> anyhow::Result<()> where L1D: L1DataStore + Sync + Send + 'static, { @@ -52,6 +59,15 @@ where } L1Event::BlockData(blockdata) => { + let height = blockdata.block_num(); + + // Bail out fast if we don't have to care. + let horizon = params.rollup().horizon_l1_height; + if height < horizon { + warn!(%height, %horizon, "ignoring BlockData for block before horizon"); + return Ok(()); + } + + let l1blkid = blockdata.block().block_hash(); let manifest = generate_block_manifest(blockdata.block()); @@ -65,9 +81,9 @@ where let l1txs = Vec::new(); let num_txs = l1txs.len(); l1db.put_block_data(blockdata.block_num(), manifest, l1txs)?; - info!(%l1blkid, txs = %num_txs, "wrote L1 block manifest"); + info!(%height, %l1blkid, txs = %num_txs, "wrote L1 block manifest"); - // Write to sync event db. + // Write to sync event db if it's something we care about.
let blkid: Buf32 = blockdata.block().block_hash().into(); let ev = SyncEvent::L1Block(blockdata.block_num(), blkid.into()); csm_ctl.submit_event(ev)?; diff --git a/crates/consensus-logic/src/lib.rs b/crates/consensus-logic/src/lib.rs index a1b2b626d..90315642a 100644 --- a/crates/consensus-logic/src/lib.rs +++ b/crates/consensus-logic/src/lib.rs @@ -1,14 +1,11 @@ #![allow(dead_code)] // TODO: remove this once `finalized_tip` fn is used in `ForkChoiceManager`. //! Consensus validation logic and core state machine -pub mod block_assembly; pub mod chain_transition; pub mod client_transition; pub mod credential; pub mod ctl; -pub mod duties; -pub mod duty_executor; -pub mod duty_extractor; +pub mod duty; pub mod fork_choice_manager; pub mod genesis; pub mod l1_handler; diff --git a/crates/consensus-logic/src/state_tracker.rs b/crates/consensus-logic/src/state_tracker.rs index 585992df6..1ff10b3e4 100644 --- a/crates/consensus-logic/src/state_tracker.rs +++ b/crates/consensus-logic/src/state_tracker.rs @@ -53,8 +53,9 @@ impl StateTracker { &mut self, ev_idx: u64, ) -> anyhow::Result<(ClientUpdateOutput, Arc)> { - if ev_idx != self.cur_state_idx + 1 { - return Err(Error::SkippedEventIdx(ev_idx, self.cur_state_idx).into()); + let prev_ev_idx = ev_idx - 1; + if prev_ev_idx != self.cur_state_idx { + return Err(Error::SkippedEventIdx(prev_ev_idx, self.cur_state_idx).into()); } // Load the event from the database. diff --git a/crates/consensus-logic/src/worker.rs b/crates/consensus-logic/src/worker.rs index be5a6c505..2a6aeb052 100644 --- a/crates/consensus-logic/src/worker.rs +++ b/crates/consensus-logic/src/worker.rs @@ -98,7 +98,7 @@ pub fn client_worker_task( &csm_status_tx, &fcm_msg_tx, ) { - error!(err = %e, "failed to process sync message, skipping"); + error!(err = %e, ?msg, "failed to process sync message, skipping"); } } diff --git a/crates/primitives/src/hash.rs b/crates/primitives/src/hash.rs index 689f255b0..c88ba1174 100644 --- a/crates/primitives/src/hash.rs +++ b/crates/primitives/src/hash.rs @@ -1,5 +1,6 @@ //! Common wrapper around whatever we choose our native hash function to be. +use bitcoin::hashes::Hash; use borsh::BorshSerialize; use digest::Digest; use sha2::Sha256; @@ -18,3 +19,9 @@ pub fn compute_borsh_hash(v: &T) -> Buf32 { let arr: [u8; 32] = result.into(); Buf32::from(arr) } + +/// Computes a Bitcoin-style double-SHA-256. +pub fn sha256d(buf: &[u8]) -> Buf32 { + let h = bitcoin::hashes::sha256d::Hash::hash(buf); + h.to_byte_array().into() +} diff --git a/crates/rpc/api/src/lib.rs b/crates/rpc/api/src/lib.rs index eb88ccb01..6d4d88c39 100644 --- a/crates/rpc/api/src/lib.rs +++ b/crates/rpc/api/src/lib.rs @@ -26,10 +26,13 @@ pub struct L1Status { #[derive(Clone, Debug, Deserialize, Serialize)] pub struct ClientStatus { - /// Blockchain tip. + /// L2 blockchain tip. #[serde(with = "hex::serde")] pub chain_tip: [u8; 32], + /// L2 chain tip slot. + pub chain_tip_slot: u64, + /// L2 block that's been finalized and proven on L1.
#[serde(with = "hex::serde")] pub finalized_blkid: [u8; 32], diff --git a/crates/state/Cargo.toml b/crates/state/Cargo.toml index acd78709e..474963890 100644 --- a/crates/state/Cargo.toml +++ b/crates/state/Cargo.toml @@ -14,3 +14,4 @@ borsh = { workspace = true } digest = { workspace = true } num_enum = { workspace = true } sha2 = { workspace = true } +tracing = { workspace = true } # ideally this shouldn't be in this trait diff --git a/crates/state/src/block.rs b/crates/state/src/block.rs index b2feb9e33..045f98c39 100644 --- a/crates/state/src/block.rs +++ b/crates/state/src/block.rs @@ -32,6 +32,10 @@ impl L2Block { &self.header } + pub fn body(&self) -> &L2BlockBody { + &self.body + } + pub fn l1_segment(&self) -> &L1Segment { &self.body.l1_segment } @@ -97,6 +101,10 @@ impl L1Segment { pub fn new_empty() -> Self { Self::new(Vec::new()) } + + pub fn new_payloads(&self) -> &[l1::L1HeaderPayload] { + &self.new_payloads + } } /// Information relating to how to update the execution layer. diff --git a/crates/state/src/bridge_state.rs b/crates/state/src/bridge_state.rs index ca4d3cab9..3aabd7553 100644 --- a/crates/state/src/bridge_state.rs +++ b/crates/state/src/bridge_state.rs @@ -58,6 +58,20 @@ impl OperatorTable { } } + /// Inserts a new operator entry. + pub fn insert(&mut self, signing_pk: Buf32, wallet_pk: Buf32) { + let entry = OperatorEntry { + idx: { + let idx = self.next_idx; + self.next_idx += 1; + idx + }, + signing_pk, + wallet_pk, + }; + self.operators.push(entry); + } + /// Gets an operator from the table by its idx. /// /// Does a binary search. diff --git a/crates/state/src/chain_state.rs b/crates/state/src/chain_state.rs index 18e3fec48..e20b81b1b 100644 --- a/crates/state/src/chain_state.rs +++ b/crates/state/src/chain_state.rs @@ -4,8 +4,9 @@ use borsh::{BorshDeserialize, BorshSerialize}; use alpen_express_primitives::buf::Buf32; use alpen_express_primitives::hash::compute_borsh_hash; +use crate::l1::L1ViewState; +use crate::prelude::*; use crate::{bridge_ops, bridge_state, exec_env, l1}; -use crate::{id::L2BlockId, state_queue::StateQueue}; /// L2 blockchain state. This is the state computed as a function of a /// pre-state and a block. @@ -16,6 +17,16 @@ pub struct ChainState { /// Most recent seen block. pub(crate) last_block: L2BlockId, + /// The slot of the last produced block. + pub(crate) slot: u64, + + /// The index of the checkpoint period we're in, and so the index we expect + /// the next checkpoint to be. + /// + /// Immediately after genesis, this is 0, so the first checkpoint batch is + /// checkpoint 0, moving us into checkpoint period 1. + pub(crate) checkpoint_period: u64, + /// Rollup's view of L1 state. 
pub(crate) l1_state: l1::L1ViewState, @@ -39,6 +50,9 @@ pub struct ChainState { // which defines all of this more rigorously #[derive(BorshSerialize)] struct HashedChainState { + last_block: Buf32, + slot: u64, + checkpoint_period: u64, l1_state_hash: Buf32, pending_withdraws_hash: Buf32, exec_env_hash: Buf32, @@ -47,6 +61,7 @@ struct HashedChainState { } impl ChainState { + // TODO remove genesis blkid since apparently we don't need it anymore pub fn from_genesis( genesis_blkid: L2BlockId, l1_state: l1::L1ViewState, @@ -54,6 +69,8 @@ impl ChainState { ) -> Self { Self { last_block: genesis_blkid, + slot: 0, + checkpoint_period: 0, l1_state, pending_withdraws: StateQueue::new_empty(), exec_env_state: exec_state, @@ -66,8 +83,17 @@ impl ChainState { self.last_block } - pub fn state_root(&self) -> Buf32 { + pub fn l1_view(&self) -> &L1ViewState { + &self.l1_state + } + + /// Computes a commitment to the chainstate. This is super expensive + /// because it does a bunch of hashing. + pub fn compute_state_root(&self) -> Buf32 { let hashed_state = HashedChainState { + last_block: self.last_block.into(), + slot: self.slot, + checkpoint_period: self.checkpoint_period, l1_state_hash: compute_borsh_hash(&self.l1_state), pending_withdraws_hash: compute_borsh_hash(&self.pending_withdraws), exec_env_hash: compute_borsh_hash(&self.exec_env_state), diff --git a/crates/state/src/client_state.rs b/crates/state/src/client_state.rs index 41bdf0c75..40eb50221 100644 --- a/crates/state/src/client_state.rs +++ b/crates/state/src/client_state.rs @@ -42,7 +42,7 @@ impl ClientState { Self { chain_active: false, sync_state: None, - local_l1_view: LocalL1State::new(genesis_l1_height), + local_l1_view: LocalL1State::new(horizon_l1_height), horizon_l1_height, genesis_l1_height, } @@ -81,12 +81,12 @@ impl ClientState { .expect("clientstate: missing sync state") } - pub fn recent_l1_block(&self) -> Option<&L1BlockId> { + pub fn most_recent_l1_block(&self) -> Option<&L1BlockId> { self.local_l1_view.local_unaccepted_blocks.last() } - pub fn buried_l1_height(&self) -> u64 { - self.local_l1_view.buried_l1_height + pub fn next_exp_l1_block(&self) -> u64 { + self.local_l1_view.next_expected_block } pub fn genesis_l1_height(&self) -> u64 { @@ -132,22 +132,58 @@ pub struct LocalL1State { pub(super) local_unaccepted_blocks: Vec, /// L1 block index we treat as being "buried" and won't reorg. - pub(super) buried_l1_height: u64, + pub(super) next_expected_block: u64, } impl LocalL1State { - pub fn new(buried: u64) -> Self { + /// Constructs a new instance of the local L1 state bookkeeping. + /// + /// # Panics + /// + /// If we try to construct it in a way that implies we don't have the L1 genesis block. + pub fn new(next_expected_block: u64) -> Self { + if next_expected_block == 0 { + panic!("clientstate: tried to construct without known L1 genesis block"); + } + Self { local_unaccepted_blocks: Vec::new(), - buried_l1_height: buried, + next_expected_block, } } + /// Returns a slice of the unaccepted blocks. pub fn local_unaccepted_blocks(&self) -> &[L1BlockId] { &self.local_unaccepted_blocks } + /// Returns the height of the next block we expect to receive. + pub fn next_expected_block(&self) -> u64 { + self.next_expected_block + } + + /// Returns the height of the buried L1 block, which we can't reorg past. pub fn buried_l1_height(&self) -> u64 { - self.buried_l1_height + self.next_expected_block - self.local_unaccepted_blocks.len() as u64 + } + + /// Returns an iterator over the unaccepted L1 blocks, from the lowest up.
+ pub fn unacc_blocks_iter(&self) -> impl Iterator { + self.local_unaccepted_blocks() + .iter() + .enumerate() + .map(|(i, b)| (self.buried_l1_height() + i as u64, b)) + } + + pub fn tip_height(&self) -> u64 { + if self.next_expected_block == 0 { + panic!("clientstate: started without L1 genesis block somehow"); + } + + self.next_expected_block - 1 + } + + pub fn tip_blkid(&self) -> Option<&L1BlockId> { + self.local_unaccepted_blocks().last() } } diff --git a/crates/state/src/exec_env.rs b/crates/state/src/exec_env.rs index 839c615ce..11970df21 100644 --- a/crates/state/src/exec_env.rs +++ b/crates/state/src/exec_env.rs @@ -25,7 +25,8 @@ pub struct ExecEnvState { /// Deposits that have been queued by something but haven't been accepted in /// an update yet. The sequencer should be processing these as soon as /// possible. - pending_deposits: StateQueue, + // TODO make this not pub + pub pending_deposits: StateQueue, /// Forced inclusions that have been accepted by the CL but not processed by /// a CL payload yet. @@ -54,6 +55,14 @@ impl ExecEnvState { pub fn cur_state_root(&self) -> &Buf32 { &self.cur_state } + + pub fn pending_deposits(&self) -> &StateQueue { + &self.pending_deposits + } + + pub fn pending_deposits_mut(&mut self) -> &mut StateQueue { + &mut self.pending_deposits + } } impl<'a> Arbitrary<'a> for ExecEnvState { diff --git a/crates/state/src/l1.rs b/crates/state/src/l1.rs index e543a77ee..15a859f22 100644 --- a/crates/state/src/l1.rs +++ b/crates/state/src/l1.rs @@ -13,6 +13,14 @@ use crate::state_queue::StateQueue; )] pub struct L1BlockId(Buf32); +impl L1BlockId { + /// Computes the blkid from the header buf. This is expensive in proofs and + /// should only be done when necessary. + pub fn compute_from_header_buf(buf: &[u8]) -> L1BlockId { + Self::from(alpen_express_primitives::hash::sha256d(buf)) + } +} + impl From for L1BlockId { fn from(value: Buf32) -> Self { Self(value) @@ -45,6 +53,10 @@ impl fmt::Display for L1BlockId { /// something. #[derive(Clone, Debug, Eq, PartialEq, BorshSerialize, BorshDeserialize)] pub struct L1HeaderRecord { + /// L1 block ID here so that we don't have to recompute it too much, which + /// is expensive in proofs. + blkid: L1BlockId, + /// Serialized header. For Bitcoin this is always 80 bytes. buf: Vec, @@ -57,7 +69,17 @@ pub struct L1HeaderRecord { impl L1HeaderRecord { pub fn new(buf: Vec, wtxs_root: Buf32) -> Self { - Self { buf, wtxs_root } + // TODO move this hash outside + let blkid = alpen_express_primitives::hash::sha256d(&buf).into(); + Self { + blkid, + buf, + wtxs_root, + } + } + + pub fn blkid(&self) -> &L1BlockId { + &self.blkid } pub fn buf(&self) -> &[u8] { @@ -67,11 +89,20 @@ impl L1HeaderRecord { pub fn wtxs_root(&self) -> &Buf32 { &self.wtxs_root } + + /// Extracts the parent block ID from the header record.
+ pub fn parent_blkid(&self) -> L1BlockId { + assert_eq!(self.buf.len(), 80, "l1: header record not 80 bytes"); + let mut buf = [0; 32]; + buf.copy_from_slice(&self.buf()[4..36]); // range of parent field bytes + L1BlockId::from(Buf32::from(buf)) + } } impl From<&alpen_express_primitives::l1::L1BlockManifest> for L1HeaderRecord { fn from(value: &alpen_express_primitives::l1::L1BlockManifest) -> Self { Self { + blkid: value.block_hash().into(), buf: value.header().to_vec(), wtxs_root: value.txs_root(), } @@ -136,6 +167,16 @@ impl L1HeaderPayload { } } +impl From for L1MaturationEntry { + fn from(value: L1HeaderPayload) -> Self { + Self { + record: value.record, + deposit_update_txs: value.deposit_update_txs, + da_txs: value.da_txs, + } + } +} + /// Describes state relating to the CL's view of L1. Updated by entries in the /// L1 segment of CL blocks. #[derive(Clone, Debug, Eq, PartialEq, BorshDeserialize, BorshSerialize)] @@ -161,6 +202,18 @@ impl L1ViewState { } } + pub fn new_at_genesis( + horizon_height: u64, + genesis_height: u64, + genesis_trigger_block: L1HeaderRecord, + ) -> Self { + Self { + horizon_height, + safe_block: genesis_trigger_block, + maturation_queue: StateQueue::new_at_index(genesis_height), + } + } + pub fn safe_block(&self) -> &L1HeaderRecord { &self.safe_block } @@ -219,6 +272,11 @@ impl L1MaturationEntry { } } + /// Computes the L1 blockid from the stored block. + pub fn blkid(&self) -> &L1BlockId { + self.record.blkid() + } + pub fn into_parts(self) -> (L1HeaderRecord, Vec, Vec) { (self.record, self.deposit_update_txs, self.da_txs) } diff --git a/crates/state/src/operation.rs b/crates/state/src/operation.rs index 16181be16..24c1e3d40 100644 --- a/crates/state/src/operation.rs +++ b/crates/state/src/operation.rs @@ -3,6 +3,7 @@ use arbitrary::Arbitrary; use borsh::{BorshDeserialize, BorshSerialize}; +use tracing::*; use crate::client_state::{ClientState, SyncState}; use crate::id::L2BlockId; @@ -52,8 +53,8 @@ pub enum ClientStateWrite { /// Accept an L2 block and update tip state. AcceptL2Block(L2BlockId), - /// Rolls back L1 blocks to this block ID. - RollbackL1BlocksTo(L1BlockId), + /// Rolls back L1 blocks to this block height. + RollbackL1BlocksTo(u64), /// Insert L1 blocks into the pending queue. AcceptL1Block(L1BlockId), @@ -77,9 +78,6 @@ pub enum SyncAction { /// Marks an L2 blockid as invalid and we won't follow any chain that has /// it, and will reject it from our peers. - // TODO possibly we should have some way of marking a block invalid through - // preliminary checks before writing a sync event we then have to check, - // this should be investigated more MarkInvalid(L2BlockId), /// Finalizes a block, indicating that it won't be reverted. 
@@ -111,23 +109,24 @@ pub fn apply_writes_to_state( state.chain_active = true; } - RollbackL1BlocksTo(l1blkid) => { + RollbackL1BlocksTo(block_height) => { let l1v = state.l1_view_mut(); - let pos = l1v - .local_unaccepted_blocks - .iter() - .position(|b| *b == l1blkid); - let Some(pos) = pos else { - // TODO better logging, maybe make this an actual error + let buried_height = l1v.buried_l1_height(); + + if block_height < buried_height { + error!(%block_height, %buried_height, "unable to roll back past buried height"); panic!("operation: emitted invalid write"); - }; - l1v.local_unaccepted_blocks.truncate(pos); + } + + let new_unacc_len = block_height - buried_height; + l1v.local_unaccepted_blocks.truncate(new_unacc_len as usize); } AcceptL1Block(l1blkid) => { // TODO make this also do shit let l1v = state.l1_view_mut(); l1v.local_unaccepted_blocks.push(l1blkid); + l1v.next_expected_block += 1; } AcceptL2Block(blkid) => { @@ -137,26 +136,27 @@ pub fn apply_writes_to_state( } UpdateBuried(new_idx) => { + let l1v = state.l1_view_mut(); + // Check that it's increasing. - let old_idx = state.buried_l1_height(); - if old_idx >= new_idx { + let old_idx = l1v.buried_l1_height(); + + if new_idx < old_idx { panic!("operation: emitted non-greater buried height"); } - let l1v = state.l1_view_mut(); - // Check that it's not higher than what we know about. - let diff = (new_idx - old_idx) as usize; - if diff > l1v.local_unaccepted_blocks.len() { + if new_idx >= l1v.next_expected_block() { panic!("operation: new buried height above known L1 tip"); } // If everything checks out we can just remove them. + let diff = (new_idx - old_idx) as usize; let _blocks = l1v .local_unaccepted_blocks .drain(..diff) .collect::>(); - l1v.buried_l1_height = new_idx; + l1v.next_expected_block = new_idx; // TODO merge these blocks into the L1 MMR in the client state if // we haven't already diff --git a/crates/state/src/prelude.rs b/crates/state/src/prelude.rs index 182e35431..c14e90eb3 100644 --- a/crates/state/src/prelude.rs +++ b/crates/state/src/prelude.rs @@ -1,4 +1,5 @@ pub use crate::block::{L2Block, L2BlockBody}; -pub use crate::header::{L2Header, SignedL2BlockHeader}; +pub use crate::header::{L2BlockHeader, L2Header, SignedL2BlockHeader}; pub use crate::id::L2BlockId; pub use crate::l1::L1BlockId; +pub use crate::state_queue::StateQueue; diff --git a/crates/state/src/state_op.rs b/crates/state/src/state_op.rs index 2c1510bc4..25b7a1b93 100644 --- a/crates/state/src/state_op.rs +++ b/crates/state/src/state_op.rs @@ -3,16 +3,21 @@ //! decide to expand the chain state in the future such that we can't keep it //! entire in memory. +use alpen_express_primitives::buf::Buf32; use borsh::{BorshDeserialize, BorshSerialize}; use crate::chain_state::ChainState; -use crate::l1; +use crate::l1::L1MaturationEntry; +use crate::{bridge_ops, l1}; #[derive(Clone, Debug, PartialEq, BorshDeserialize, BorshSerialize)] pub enum StateOp { /// Replace the chain state with something completely different. Replace(Box), + /// Sets the current slot. + SetSlot(u64), + /// Reverts L1 accepted height back to a previous height, rolling back any /// blocks that were there. RevertL1Height(u64), @@ -23,6 +28,12 @@ pub enum StateOp { /// Matures the next L1 block, whose idx must match the one specified here /// as a sanity check. MatureL1Block(u64), + + /// Inserts a deposit intent into the pending deposits queue. 
+ EnqueueDepositIntent(bridge_ops::DepositIntent), + + /// Creates an operator + CreateOperator(Buf32, Buf32), } /// Collection of writes we're making to the state. @@ -55,22 +66,140 @@ pub fn apply_write_batch_to_chainstate( batch: &WriteBatch, ) -> ChainState { for op in &batch.ops { - match op { - StateOp::Replace(new_state) => chainstate = new_state.as_ref().clone(), + apply_op_to_chainstate(op, &mut chainstate); + } + + chainstate +} - StateOp::RevertL1Height(_to_height) => { - // TODO +fn apply_op_to_chainstate(op: &StateOp, state: &mut ChainState) { + match op { + StateOp::Replace(new_state) => *state = new_state.as_ref().clone(), + + StateOp::SetSlot(slot) => state.slot = *slot, + + StateOp::RevertL1Height(to_height) => { + let mqueue = &mut state.l1_state.maturation_queue; + let back_idx = mqueue.back_idx().expect("stateop: maturation queue empty"); + + // Do some bookkeeping to make sure it's safe to do this. + if *to_height > back_idx { + panic!("stateop: revert to above tip block"); } - StateOp::AcceptL1Block(_new_blkid) => { - // TODO + let n_drop = back_idx - to_height; + if n_drop > mqueue.len() as u64 { + panic!("stateop: revert matured block"); } - StateOp::MatureL1Block(_maturing_idx) => { - // TODO + // Now that it's safe to do the revert, we can just do it. + for _ in 0..n_drop { + // This expect should never trigger. + mqueue.pop_back().expect("stateop: unable to revert more"); } } + + StateOp::AcceptL1Block(entry) => { + state.l1_state.maturation_queue.push_back(entry.clone()); + } + + StateOp::MatureL1Block(maturing_idx) => { + let mqueue = &mut state.l1_state.maturation_queue; + + // Checks. + assert!(mqueue.len() > 1); // make sure we'll still have blocks in the queue + let front_idx = mqueue.front_idx().unwrap(); + assert_eq!(front_idx, *maturing_idx); + + // Actually take the block out so we can do something with it. + let _matured_block = mqueue.pop_front(); + + // TODO add it to the MMR so we can reference it in the future + // TODO handle the DA txs and the deposit update txs, maybe in other ops + } + + StateOp::EnqueueDepositIntent(intent) => { + let deposits = state.exec_env_state.pending_deposits_mut(); + deposits.push_back(intent.clone()); + } + + StateOp::CreateOperator(spk, wpk) => { + state.operator_table.insert(*spk, *wpk); + } } +} - chainstate +/// Cache that writes to state and remembers the series of operations made to it +/// so they can be persisted to disk without saving the chainstate. +/// +/// If we ever have a large state that's persisted to disk, this will eventually +/// be made generic over a state provider that exposes access to that and then +/// the `WriteBatch` will include writes that can be made to that. +pub struct StateCache { + state: ChainState, + write_ops: Vec, +} + +impl StateCache { + pub fn new(state: ChainState) -> Self { + Self { + state, + write_ops: Vec::new(), + } + } + + pub fn state(&self) -> &ChainState { + &self.state + } + + /// Finalizes the changes made to the state, exporting it and a write batch + /// that can be applied to the previous state to produce it. + pub fn finalize(self) -> (ChainState, WriteBatch) { + (self.state, WriteBatch::new(self.write_ops)) + } + + /// Returns if the state cache is empty, meaning that no writes have been + /// performed. + pub fn is_empty(&self) -> bool { + self.write_ops.is_empty() + } + + /// Applies some operations to the state, including them in the write ops + /// list. 
+ fn merge_ops(&mut self, ops: impl Iterator) { + for op in ops { + apply_op_to_chainstate(&op, &mut self.state); + self.write_ops.push(op); + } + } + + /// Like `merge_ops`, but only for a single op, for convenience. + fn merge_op(&mut self, op: StateOp) { + self.merge_ops([op].into_iter()); + } + + /// Sets the current slot in the state. + pub fn set_slot(&mut self, slot: u64) { + self.merge_op(StateOp::SetSlot(slot)); + } + + /// Enqueues a deposit intent into the pending deposits queue. + pub fn enqueue_deposit_intent(&mut self, intent: bridge_ops::DepositIntent) { + self.merge_op(StateOp::EnqueueDepositIntent(intent)); + } + + /// Inserts a new operator with the specified pubkeys into the operator table. + pub fn insert_operator(&mut self, signing_pk: Buf32, wallet_pk: Buf32) { + self.merge_op(StateOp::CreateOperator(signing_pk, wallet_pk)); + } + + pub fn revert_l1_view_to(&mut self, height: u64) { + self.merge_op(StateOp::RevertL1Height(height)); + } + + pub fn apply_l1_block_entry(&mut self, ent: L1MaturationEntry) { + self.merge_op(StateOp::AcceptL1Block(ent)); + } + + // TODO add more manipulator functions } diff --git a/crates/state/src/transition.rs b/crates/state/src/transition.rs deleted file mode 100644 index 8b1378917..000000000 --- a/crates/state/src/transition.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/crates/test-utils/src/bitcoin.rs b/crates/test-utils/src/bitcoin.rs index 9779b0c6a..ede3518b0 100644 --- a/crates/test-utils/src/bitcoin.rs +++ b/crates/test-utils/src/bitcoin.rs @@ -16,6 +16,7 @@ pub fn get_test_bitcoin_txns() -> Vec { } pub fn gen_l1_chain(len: usize) -> Vec { + // FIXME this is bad, the blocks generated are nonsensical let mut blocks = vec![]; for _ in 0..len { let block: L1BlockManifest = ArbitraryGenerator::new().generate(); diff --git a/crates/test-utils/src/l2.rs b/crates/test-utils/src/l2.rs index 60ad9264d..30ef02a07 100644 --- a/crates/test-utils/src/l2.rs +++ b/crates/test-utils/src/l2.rs @@ -93,7 +93,7 @@ pub fn gen_client_state(params: Option<&Params>) -> ClientState { None => &gen_params(), }; ClientState::from_genesis_params( - params.rollup.genesis_l1_height, + params.rollup.horizon_l1_height, params.rollup.genesis_l1_height, ) } diff --git a/crates/test-utils/src/lib.rs b/crates/test-utils/src/lib.rs index f54fb695f..ce07a08be 100644 --- a/crates/test-utils/src/lib.rs +++ b/crates/test-utils/src/lib.rs @@ -1,3 +1,4 @@ +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use alpen_express_db::database::CommonDatabase; @@ -6,15 +7,18 @@ use alpen_express_db::stubs::{chain_state::StubChainstateDb, l2::StubL2Db}; use alpen_express_rocksdb::{ClientStateDb, L1Db, SyncEventDb}; use arbitrary::{Arbitrary, Unstructured}; -use rand::Rng; +use rand::RngCore; use rockbound::rocksdb; use tempfile::TempDir; pub mod bitcoin; pub mod l2; +const ARB_GEN_LEN: usize = 1 << 24; // 16 MiB + pub struct ArbitraryGenerator { - buffer: Vec, + buf: Vec, + off: AtomicUsize, } impl Default for ArbitraryGenerator { @@ -25,15 +29,26 @@ impl Default for ArbitraryGenerator { impl ArbitraryGenerator { pub fn new() -> Self { + Self::new_with_size(ARB_GEN_LEN) + } + + pub fn new_with_size(n: usize) -> Self { let mut rng = rand::thread_rng(); - // NOTE: 128 should be enough for testing purposes. 
Change to 256 as needed - let buffer: Vec = (0..128).map(|_| rng.gen()).collect(); - ArbitraryGenerator { buffer } + let mut buf = vec![0; n]; + rng.fill_bytes(&mut buf); // 128 wasn't enough + let off = AtomicUsize::new(0); + ArbitraryGenerator { buf, off } } pub fn generate<'a, T: Arbitrary<'a> + Clone>(&'a self) -> T { - let mut u = Unstructured::new(&self.buffer); - T::arbitrary(&mut u).expect("failed to generate arbitrary instance") + // Doing hacky atomics to make this actually be reusable, this is pretty bad. + let off = self.off.load(Ordering::Relaxed); + let mut u = Unstructured::new(&self.buf[off..]); + let prev_off = u.len(); + let inst = T::arbitrary(&mut u).expect("failed to generate arbitrary instance"); + let additional_off = prev_off - u.len(); + self.off.store(off + additional_off, Ordering::Relaxed); + inst } } diff --git a/functional-tests/entry.py b/functional-tests/entry.py index 3b745032a..279fc3bb5 100755 --- a/functional-tests/entry.py +++ b/functional-tests/entry.py @@ -44,7 +44,7 @@ def generate_task(rpc: BitcoindClient, wait_dur, addr): time.sleep(wait_dur) try: blk = rpc.proxy.generatetoaddress(1, addr) - print("made block", blk) + #print("made block", blk) except Exception as ex: log.warning(f"{ex} while generating address") return diff --git a/functional-tests/env.bash b/functional-tests/env.bash index 76f76929c..e0c5ad8d6 100644 --- a/functional-tests/env.bash +++ b/functional-tests/env.bash @@ -1,5 +1,5 @@ # export RUST_BACKTRACE=1 -export RUST_LOG=debug,hyper=warn,soketto=warn,jsonrpsee-server=warn +export RUST_LOG=trace,hyper=warn,soketto=warn,jsonrpsee-server=warn,mio=warn export NO_COLOR=1 export PATH=$PATH:$(realpath ../target/release) export RUST_BACKTRACE=1 diff --git a/functional-tests/fn_btcio_connect.py b/functional-tests/fn_btcio_connect.py index b79c5ad11..2ecbec135 100644 --- a/functional-tests/fn_btcio_connect.py +++ b/functional-tests/fn_btcio_connect.py @@ -10,5 +10,6 @@ def main(self, ctx: flexitest.RunContext): seq = ctx.get_service("sequencer") seqrpc = seq.create_rpc() + print("checking connectivity") l1stat = seqrpc.alp_l1connected() assert l1stat, "Error connecting to Bitcoin Rpc client" diff --git a/functional-tests/fn_btcio_read.py b/functional-tests/fn_btcio_read.py index 6a0fb3f42..ac460bb47 100644 --- a/functional-tests/fn_btcio_read.py +++ b/functional-tests/fn_btcio_read.py @@ -24,16 +24,22 @@ def main(self, ctx: flexitest.RunContext): time.sleep(interval) received_block = btcrpc.getblock(btcrpc.proxy.getbestblockhash()) l1stat = seqrpc.alp_l1status() + # Time is in millis - curr_time = l1stat['last_update'] // 1000 + cur_time = l1stat['last_update'] // 1000 + print("cur time", cur_time) + # ensure that the l1reader task has started within few seconds of test being run - assert((curr_time - start_time) <= interval) + assert cur_time - start_time <= interval, "time not flowing properly" # check if height on bitcoin is same as, it is seen in sequencer assert ( l1stat["cur_height"] == received_block["height"] - ), "Height seen by Sequencer doesn't match the Height on the bitcoin node" + ), "sequencer height doesn't match the bitcoin node height" + time.sleep(MAX_HORIZON_POLL_INTERVAL_SECS * 2) l1stat = seqrpc.alp_l1status() elapsed_time = l1stat['last_update'] // 1000 + print("elapsed", elapsed_time) + # check if L1 reader is seeing new L1 activity - assert((elapsed_time - curr_time) >= MAX_HORIZON_POLL_INTERVAL_SECS * 2) + assert elapsed_time - cur_time >= MAX_HORIZON_POLL_INTERVAL_SECS * 2, "time not flowing properly" diff --git 
a/functional-tests/fn_btcio_read_reorg.py b/functional-tests/fn_btcio_read_reorg.py index 256f32158..2fab379bd 100644 --- a/functional-tests/fn_btcio_read_reorg.py +++ b/functional-tests/fn_btcio_read_reorg.py @@ -6,10 +6,11 @@ from constants import BLOCK_GENERATION_INTERVAL_SECS, SEQ_SLACK_TIME_SECS from entry import BasicEnvConfig +REORG_DEPTH = 3 + @flexitest.register class L1ReadReorgTest(flexitest.Test): - REORG_DEPTH = 3 def __init__(self, ctx: flexitest.InitContext): # standalone env for this test as it involves mutating the blockchain via invalidation @@ -24,17 +25,20 @@ def main(self, ctx: flexitest.RunContext): # We need at least `REORG_DEPTH` + 1 or more blocks # to invalidate `REORG_DEPTH` blocks at the end. - wait_time = BLOCK_GENERATION_INTERVAL_SECS * (self.REORG_DEPTH + 1) + SEQ_SLACK_TIME_SECS + wait_time = BLOCK_GENERATION_INTERVAL_SECS * (REORG_DEPTH + 1) + SEQ_SLACK_TIME_SECS time.sleep(wait_time) l1stat = seqrpc.alp_l1status() - height_to_invalidate_from = int(l1stat["cur_height"]) - self.REORG_DEPTH + height_to_invalidate_from = int(l1stat["cur_height"]) - REORG_DEPTH + print("height to invalidate from", height_to_invalidate_from) block_to_invalidate_from = btcrpc.proxy.getblockhash(height_to_invalidate_from) to_be_invalid_block = seqrpc.alp_getL1blockHash(height_to_invalidate_from + 1) + print("invalidating block", to_be_invalid_block) btcrpc.proxy.invalidateblock(block_to_invalidate_from) # Wait for at least 1 block to be added after invalidating `REORG_DEPTH` blocks. time.sleep(BLOCK_GENERATION_INTERVAL_SECS * 1 + SEQ_SLACK_TIME_SECS) block_from_invalidated_height = seqrpc.alp_getL1blockHash(height_to_invalidate_from + 1) + print("now have block", block_from_invalidated_height) - assert to_be_invalid_block != block_from_invalidated_height, "Expected reorg from 3rd Block" + assert to_be_invalid_block != block_from_invalidated_height, "Expected reorg from block 3" diff --git a/functional-tests/fn_sync_genesis.py b/functional-tests/fn_sync_genesis.py index a5da436ef..3c5083dc9 100644 --- a/functional-tests/fn_sync_genesis.py +++ b/functional-tests/fn_sync_genesis.py @@ -4,7 +4,7 @@ import flexitest UNSET_ID = "0000000000000000000000000000000000000000000000000000000000000000" - +MAX_GENESIS_TRIES = 10 @flexitest.register class SyncGenesisTest(flexitest.Test): @@ -18,12 +18,37 @@ def main(self, ctx: flexitest.RunContext): # create both btc and sequencer RPC seqrpc = seq.create_rpc() - time.sleep(1) + time.sleep(3) - stat = None - for _ in range(10): + # Wait until genesis. This might need to be tweaked if we change how + # long we wait for genesis in tests. + tries = 0 + last_slot = None + while True: + if tries > MAX_GENESIS_TRIES: + assert False, "did not observe genesis before timeout" + + print("waiting for genesis") stat = seqrpc.alp_clientStatus() print(stat) - time.sleep(1) + if stat["finalized_blkid"] != UNSET_ID: + last_slot = stat["chain_tip_slot"] + print("observed genesis, now at slot", last_slot) + break + + time.sleep(0.5) + print("waiting for genesis... -- tries", tries) + tries += 1 - assert stat["finalized_blkid"] != UNSET_ID, "did not notice genesis" + assert last_slot is not None, "last slot never set" + + # Make sure we're making progress.
+ stat = None + for _ in range(5): + time.sleep(3) + stat = seqrpc.alp_clientStatus() + tip_slot = stat["chain_tip_slot"] + tip_blkid = stat["chain_tip"] + print("cur tip slot", tip_slot, "blkid", tip_blkid) + assert tip_slot >= last_slot, "cur slot went backwards" + assert tip_slot > last_slot, "seem to not be making progress" + last_slot = tip_slot diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 8ee25bed9..b7e6eae06 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,10 +1,11 @@ [toolchain] -channel = "nightly-2024-07-09" +channel = "nightly-2024-07-27" components = [ "cargo", "clippy", "rustc", "rustfmt", + "rust-analyzer", "rust-docs", "rust-src", "rust-std", diff --git a/sequencer/src/l1_reader.rs b/sequencer/src/l1_reader.rs index eb45211fa..3cc34a9ea 100644 --- a/sequencer/src/l1_reader.rs +++ b/sequencer/src/l1_reader.rs @@ -16,7 +16,7 @@ use alpen_express_primitives::params::Params; use crate::config::Config; pub async fn start_reader_tasks( - params: &Params, + params: Arc, config: &Config, rpc_client: impl L1Client, db: Arc, @@ -32,9 +32,10 @@ where // TODO switch to checking the L1 tip in the consensus/client state let l1prov = db.l1_provider().clone(); - let current_block_height = l1prov + let target_next_block = l1prov .get_chain_tip()? - .unwrap_or(params.rollup().horizon_l1_height - 1); + .map(|i| i + 1) + .unwrap_or(params.rollup().horizon_l1_height); let config = Arc::new(ReaderConfig { max_reorg_depth: config.sync.max_reorg_depth, @@ -45,13 +46,14 @@ where let _reader_handle = tokio::spawn(bitcoin_data_reader_task( rpc_client, ev_tx, - current_block_height, + target_next_block, config.clone(), l1_status.clone(), )); let l1db = db.l1_store().clone(); let _sedb = db.sync_event_store().clone(); - let _handler_handle = thread::spawn(move || bitcoin_data_handler_task(l1db, csm_ctl, ev_rx)); + let _handler_handle = + thread::spawn(move || bitcoin_data_handler_task(l1db, csm_ctl, ev_rx, params)); Ok(()) } diff --git a/sequencer/src/main.rs b/sequencer/src/main.rs index 9976a732d..719c6adc8 100644 --- a/sequencer/src/main.rs +++ b/sequencer/src/main.rs @@ -21,8 +21,8 @@ use tracing::*; use alpen_express_btcio::rpc::traits::L1Client; use alpen_express_common::logging; -use alpen_express_consensus_logic::duties::{DutyBatch, Identity}; -use alpen_express_consensus_logic::duty_executor::{self, IdentityData, IdentityKey}; +use alpen_express_consensus_logic::duty::types::{DutyBatch, Identity, IdentityData, IdentityKey}; +use alpen_express_consensus_logic::duty::worker::{self as duty_worker}; use alpen_express_consensus_logic::sync_manager; use alpen_express_consensus_logic::sync_manager::SyncManager; use alpen_express_db::traits::Database; @@ -199,17 +199,13 @@ fn main_inner(args: Args) -> anyhow::Result<()> { // Spawn the two tasks. thread::spawn(move || { // FIXME figure out why this can't infer the type, it's like *right there* - duty_executor::duty_tracker_task::<_>(cu_rx, duties_tx, idata.ident, db) + duty_worker::duty_tracker_task::<_>(cu_rx, duties_tx, idata.ident, db) }); + + let d_params = params.clone(); thread::spawn(move || { - duty_executor::duty_dispatch_task( - duties_rx, - idata.key, - sm, - db2, - eng_ctl_de, - pool, - params.rollup(), + duty_worker::duty_dispatch_task( + duties_rx, idata.key, sm, db2, eng_ctl_de, pool, d_params, ) }); } @@ -240,7 +236,7 @@ where // Start the L1 tasks to get that going. 
let csm_ctl = sync_man.get_csm_ctl(); l1_reader::start_reader_tasks( - sync_man.params(), + sync_man.get_params(), config, l1_rpc_client, database.clone(), diff --git a/sequencer/src/rpc_server.rs b/sequencer/src/rpc_server.rs index 6dfbf4586..cd66e3d57 100644 --- a/sequencer/src/rpc_server.rs +++ b/sequencer/src/rpc_server.rs @@ -1,8 +1,9 @@ #![allow(unused)] -use std::sync::Arc; +use std::{borrow::BorrowMut, sync::Arc}; use alpen_express_consensus_logic::sync_manager::SyncManager; +use alpen_express_primitives::buf::Buf32; use async_trait::async_trait; use jsonrpsee::{ core::RpcResult, @@ -23,6 +24,7 @@ use alpen_express_db::traits::{ChainstateProvider, Database, L2DataProvider}; use alpen_express_rpc_api::{AlpenApiServer, ClientStatus, L1Status}; use alpen_express_state::{ chain_state::ChainState, client_state::ClientState, header::L2Header, id::L2BlockId, + l1::L1BlockId, }; use tracing::*; @@ -189,23 +191,23 @@ where } async fn get_l1_block_hash(&self, height: u64) -> RpcResult { - let block_manifest = self - .database - .l1_provider() - .get_block_manifest(height) - .unwrap() - .unwrap(); - - Ok(format!("{:?}", block_manifest.block_hash())) + // FIXME this used to panic and take down core services making the test + // hang, but now it just returns the wrong data without crashing + match self.database.l1_provider().get_block_manifest(height) { + Ok(Some(mf)) => Ok(mf.block_hash().to_string()), + Ok(None) => Ok("".to_string()), + Err(e) => Ok(e.to_string()), + } } async fn get_client_status(&self) -> RpcResult { let state = self.get_client_state().await; - let Some(last_l1) = state.recent_l1_block() else { - warn!("last L1 block not set in client state, returning still not started"); - return Err(Error::ClientNotStarted.into()); - }; + let last_l1 = state.most_recent_l1_block().copied().unwrap_or_else(|| { + // TODO figure out a better way to do this + warn!("last L1 block not set in client state, returning zero"); + L1BlockId::from(Buf32::zero()) + }); // Copy these out of the sync state, if they're there. let (chain_tip, finalized_blkid) = state @@ -213,11 +215,25 @@ .map(|ss| (*ss.chain_tip_blkid(), *ss.finalized_blkid())) .unwrap_or_default(); + // FIXME make this load from cache, and put the data we actually want + // here in the client state + // FIXME error handling + let db = self.database.clone(); + let slot: u64 = wait_blocking("load_cur_block", move || { + let l2_prov = db.l2_provider(); + l2_prov + .get_block_data(chain_tip) + .map(|b| b.map(|b| b.header().blockidx()).unwrap_or(u64::MAX)) + .map_err(Error::from) + }) + .await?; + Ok(ClientStatus { chain_tip: *chain_tip.as_ref(), + chain_tip_slot: slot, finalized_blkid: *finalized_blkid.as_ref(), last_l1_block: *last_l1.as_ref(), - buried_l1_height: state.buried_l1_height(), + buried_l1_height: state.l1_view().buried_l1_height(), }) } }
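
Note on the finalized-block walk in `get_finalized_blocks` (crates/consensus-logic/src/duty/worker.rs): it follows parent links from the newly finalized block back toward the previously finalized one, or stops as soon as block data runs out, collecting newest-first. Below is a toy sketch of the same loop, not the real code; a `HashMap` of hypothetical `u32` block numbers stands in for the L2 data provider, with map membership meaning "block data exists".

use std::collections::HashMap;

fn walk_back(
    last_finalized: Option<u32>,
    parents: &HashMap<u32, u32>, // block -> parent; lookup failure = no block data
    new_finalized: Option<u32>,
) -> Vec<u32> {
    let mut out = Vec::new();
    let mut cur = new_finalized;
    while let Some(blk) = cur {
        if last_finalized == Some(blk) {
            break; // reached what we had already finalized
        }
        match parents.get(&blk) {
            Some(&parent) => {
                // Record this block as newly finalized and keep walking back.
                out.push(blk);
                cur = Some(parent);
            }
            None => break, // no block data, stop the walk
        }
    }
    out
}

fn main() {
    // Chain 0 <- 1 <- 2 <- 3 <- 4 <- 5, where block 0's data is absent.
    let parents: HashMap<u32, u32> = [(1, 0), (2, 1), (3, 2), (4, 3), (5, 4)].into();
    // Previously finalized 2, now 5: blocks 5, 4, 3 are newly finalized.
    assert_eq!(walk_back(Some(2), &parents, Some(5)), vec![5, 4, 3]);
    // With no previously finalized block, walk until data runs out.
    assert_eq!(walk_back(None, &parents, Some(3)), vec![3, 2, 1]);
}

This mirrors the shape of the unit tests in the deleted duty_executor.rs: the result is ordered from the new finalized block back down the chain.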
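
Note on the reworked `LocalL1State` (crates/state/src/client_state.rs): the struct now stores `next_expected_block` plus the unaccepted-block window and derives the buried height from them, instead of storing it separately. A minimal standalone sketch of that arithmetic, using toy `u64` block IDs rather than the real `L1BlockId` (the real accept path is the `AcceptL1Block` write in operation.rs):

#[derive(Debug)]
struct L1View {
    // Stand-in for local_unaccepted_blocks; element i sits at height
    // buried_height() + i, mirroring unacc_blocks_iter().
    unaccepted: Vec<u64>,
    // Height of the next block we expect to receive.
    next_expected: u64,
}

impl L1View {
    fn buried_height(&self) -> u64 {
        // Derived instead of stored: everything below the window is buried.
        self.next_expected - self.unaccepted.len() as u64
    }

    fn tip_height(&self) -> u64 {
        self.next_expected - 1
    }

    fn accept(&mut self, blkid: u64) {
        // Mirrors AcceptL1Block: push the block and bump the expected height.
        self.unaccepted.push(blkid);
        self.next_expected += 1;
    }
}

fn main() {
    // Start expecting block 101 next, with nothing unaccepted yet.
    let mut v = L1View { unaccepted: Vec::new(), next_expected: 101 };
    assert_eq!(v.buried_height(), 101);
    v.accept(0xa1); // height 101
    v.accept(0xa2); // height 102
    assert_eq!(v.tip_height(), 102);
    assert_eq!(v.buried_height(), 101); // window covers heights 101..=102
    println!("{v:?}");
}

The derived form also explains the new panic in `LocalL1State::new`: with `next_expected_block == 0` the subtraction in `tip_height` would underflow.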
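
Note on `StateCache` (crates/state/src/state_op.rs): each `StateOp` is applied eagerly while being recorded, so `finalize()` returns both the post-state and a `WriteBatch` that replays over the pre-state to the same result. A reduced sketch of that record-and-replay property with a toy state, standing in for the real `ChainState` and ops like `SetSlot` and `EnqueueDepositIntent`:

#[derive(Clone, Debug, PartialEq)]
struct ToyState {
    slot: u64,
    deposits: Vec<u32>,
}

#[derive(Clone, Debug)]
enum Op {
    SetSlot(u64),
    EnqueueDeposit(u32),
}

fn apply(op: &Op, s: &mut ToyState) {
    match op {
        Op::SetSlot(v) => s.slot = *v,
        Op::EnqueueDeposit(d) => s.deposits.push(*d),
    }
}

struct Cache {
    state: ToyState,
    ops: Vec<Op>,
}

impl Cache {
    fn new(state: ToyState) -> Self {
        Self { state, ops: Vec::new() }
    }

    fn merge_op(&mut self, op: Op) {
        // Apply to the live state and remember the op, like StateCache::merge_op.
        apply(&op, &mut self.state);
        self.ops.push(op);
    }

    fn finalize(self) -> (ToyState, Vec<Op>) {
        (self.state, self.ops)
    }
}

fn main() {
    let pre = ToyState { slot: 0, deposits: vec![] };
    let mut cache = Cache::new(pre.clone());
    cache.merge_op(Op::SetSlot(5));
    cache.merge_op(Op::EnqueueDeposit(42));
    let (post, batch) = cache.finalize();

    // Replaying the write batch over the pre-state reproduces the post-state,
    // which is what lets the node persist just the ops instead of the state.
    let mut replayed = pre;
    for op in &batch {
        apply(op, &mut replayed);
    }
    assert_eq!(replayed, post);
}

This is why fork_choice_manager.rs can drop the separate `apply_write_batch_to_chainstate` call: the cache already holds the post-state when `finalize()` runs.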
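
Note on `sha256d` and `L1HeaderRecord::parent_blkid`: both lean on the fixed 80-byte Bitcoin header layout (version 4B, prev hash 32B, merkle root 32B, time 4B, bits 4B, nonce 4B), where the block ID is the double SHA-256 of the whole header and bytes 4..36 hold the parent hash. A self-contained sketch using the `sha2` crate directly, whereas the real code goes through `bitcoin::hashes` and `Buf32`:

use sha2::{Digest, Sha256};

// Bitcoin-style double SHA-256 of a byte buffer.
fn sha256d(buf: &[u8]) -> [u8; 32] {
    Sha256::digest(Sha256::digest(buf)).into()
}

// Pull the parent hash out of a serialized 80-byte header.
fn parent_blkid(header: &[u8]) -> [u8; 32] {
    assert_eq!(header.len(), 80, "header record must be 80 bytes");
    let mut out = [0u8; 32];
    out.copy_from_slice(&header[4..36]); // range of the prev-hash field
    out
}

fn main() {
    let header = [0u8; 80]; // dummy all-zero header, just for illustration
    let blkid = sha256d(&header);
    let parent = parent_blkid(&header);
    println!("blkid = {blkid:02x?}, parent = {parent:02x?}");
}

Caching the blkid in the record, as the diff does, trades 32 bytes of storage for skipping this hash, which matters inside proofs.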
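
Note on the reworked `ArbitraryGenerator` (crates/test-utils): it keeps one large random buffer plus an atomic cursor so repeated `generate()` calls through `&self` consume disjoint slices. A toy version of just the cursor trick, without the `arbitrary` crate and with a hypothetical `take` method in place of `generate`:

use std::sync::atomic::{AtomicUsize, Ordering};

struct Gen {
    buf: Vec<u8>,
    off: AtomicUsize,
}

impl Gen {
    fn take(&self, n: usize) -> &[u8] {
        // Load-then-store is not one atomic step, so this isn't safe under
        // real concurrency, which is why the diff calls it "hacky"; it's
        // fine for single-threaded test setup.
        let off = self.off.load(Ordering::Relaxed);
        let end = (off + n).min(self.buf.len());
        self.off.store(end, Ordering::Relaxed);
        &self.buf[off..end]
    }
}

fn main() {
    let g = Gen { buf: (0u8..16).collect(), off: AtomicUsize::new(0) };
    assert_eq!(g.take(4), &[0, 1, 2, 3]);
    assert_eq!(g.take(4), &[4, 5, 6, 7]); // second call resumes past the first
}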