Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Properly create L1 segment from sequencer's view of L1 #116

Merged
merged 27 commits into from
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
320b7f9
state: added start of `StateCache` type, tweaked chain state
delbonis Jul 11, 2024
a99b1d1
consensus-logic, state: laid out more of basic chain tsn verification…
delbonis Jul 12, 2024
e948a67
consensus-logic, state: fix some warnings
delbonis Jul 26, 2024
d2eabfe
meta: bump nightly version
delbonis Jul 29, 2024
b8dee7d
consensus-logic: reorganize duty modules
delbonis Jul 29, 2024
3a8b992
consensus-logic: added more context when assembling blocks
delbonis Jul 29, 2024
fde8d2a
consensus-logic: refactoring to compute state root during block assembly
delbonis Jul 29, 2024
8d69306
test-utils: fix `ArbitraryGenerator` randomness
delbonis Aug 1, 2024
45e9ec9
consensus-logic, state: mostly implemented L1 segment assembly
delbonis Aug 1, 2024
c9e8e1b
test/scripts: tweak logging verbosity
delbonis Aug 2, 2024
2cb5806
consensus-logic, state: reworking L1 block inclusion rules
delbonis Aug 2, 2024
e6da788
improved L1 reader bookkeeping to ingest blocks more robustly
delbonis Aug 2, 2024
259984e
consensus-logic: re-add burying L1 blocks in client state
delbonis Aug 2, 2024
9baafa8
test/scripts: tweak log params some more
delbonis Aug 2, 2024
6079bb3
consensus-logic: fix some final broken pieces in block assembly after…
delbonis Aug 2, 2024
d691cb0
primitives: removed unnecessary import
delbonis Aug 5, 2024
989335e
test: add more logging to some tests
delbonis Aug 5, 2024
15417ad
rpc/api, test: improved genesis sync test, added cur slot to client s…
delbonis Aug 5, 2024
5d2d1b4
btcio/reader: fixed messed up init bookkeeping
delbonis Aug 5, 2024
b5677bc
consensus-logic, state: fix some warnings
delbonis Aug 6, 2024
3a53144
state: basic `MatureL1Block` state op impl
delbonis Aug 6, 2024
983d9d7
consensus-logic, state: added impl for L1 block operations
delbonis Aug 7, 2024
347eecf
consensus-logic: fix clippy lint
delbonis Aug 7, 2024
8e4bec3
state: fix formatting error?
delbonis Aug 7, 2024
46e481a
ci: update .codespellrc
delbonis Aug 7, 2024
6b868f5
consensus-logic, state, test-utils: changed how we track the next exp…
delbonis Aug 7, 2024
c53fe9d
consensus-logic: partial fix for L1 bookkeeping moving blocks into th…
delbonis Aug 7, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .codespellrc
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[codespell]
skip = .git,target,Cargo.toml,Cargo.lock,mutants*
ignore-words-list = crate,ser
ignore-words-list = crate,ser,ment
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

110 changes: 61 additions & 49 deletions crates/btcio/src/reader/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,23 @@ fn filter_interesting_txs(block: &Block) -> Vec<u32> {
/// State we use in various parts of the reader.
#[derive(Debug)]
struct ReaderState {
/// The highest block in the chain, at `.back()` of queue.
cur_height: u64,
/// The highest block in the chain, at `.back()` of queue + 1.
next_height: u64,

/// The `.back()` of this should have the same height as cur_height.
recent_blocks: VecDeque<BlockHash>,

/// Depth at which we start pulling recent blocks out of the front of the queue.
max_depth: usize,
}

impl ReaderState {
fn new(cur_height: u64, max_depth: usize, recent_blocks: VecDeque<BlockHash>) -> Self {
/// Constructs a new reader state instance using some context about how we
/// want to manage it.
fn new(next_height: u64, max_depth: usize, recent_blocks: VecDeque<BlockHash>) -> Self {
assert!(!recent_blocks.is_empty());
Self {
cur_height,
next_height,
max_depth,
recent_blocks,
}
Expand All @@ -45,7 +48,17 @@ impl ReaderState {
self.recent_blocks.back().unwrap()
}

/// Accepts a new block and purges a buried one.
fn best_block_idx(&self) -> u64 {
self.next_height - 1
}

/// Returns the idx of the deepest block in the reader state.
#[allow(unused)]
fn deepest_block(&self) -> u64 {
self.best_block_idx() - self.recent_blocks.len() as u64 + 1
}

/// Accepts a new block and possibly purges a buried one.
fn accept_new_block(&mut self, blkhash: BlockHash) -> Option<BlockHash> {
let ret = if self.recent_blocks.len() > self.max_depth {
Some(self.recent_blocks.pop_front().unwrap())
Expand All @@ -54,46 +67,41 @@ impl ReaderState {
};

self.recent_blocks.push_back(blkhash);
self.cur_height += 1;
self.next_height += 1;
ret
}

#[allow(unused)]
/// Gets the blockhash of the given height, if we have it.
#[allow(unused)]
pub fn get_height_blkid(&self, height: u64) -> Option<&BlockHash> {
if height > self.cur_height {
if height >= self.next_height {
return None;
}

if height < self.deepest_block() {
return None;
}

let back_off = self.cur_height - height;
let idx = self.recent_blocks.len() as u64 - back_off - 1;
Some(&self.recent_blocks[idx as usize])
}
#[allow(unused)]
fn deepest_block(&self) -> u64 {
self.cur_height - self.recent_blocks.len() as u64 - 1
let off = height - self.deepest_block();
Some(&self.recent_blocks[off as usize])
}

fn revert_tip(&mut self) -> Option<BlockHash> {
if !self.recent_blocks.is_empty() {
let back = self.recent_blocks.pop_back().unwrap();
self.cur_height -= 1;
self.next_height -= 1;
Some(back)
} else {
None
}
}

fn rollback_to_height(&mut self, new_height: u64) -> Vec<BlockHash> {
if new_height > self.cur_height {
if new_height > self.next_height {
panic!("reader: new height greater than cur height");
}

let rollback_cnt = self.cur_height - new_height;
let rollback_cnt = self.best_block_idx() - new_height;
if rollback_cnt >= self.recent_blocks.len() as u64 {
panic!("reader: tried to rollback past deepest block");
}
Expand All @@ -106,34 +114,35 @@ impl ReaderState {

// More sanity checks.
assert!(!self.recent_blocks.is_empty());
assert_eq!(self.cur_height, new_height);
assert_eq!(self.best_block_idx(), new_height);

buf
}

/// Iterates over the blocks back from the tip, giving both the height and
/// the blockhash to compare against the chain.
fn iter_blocks_back(&self) -> impl Iterator<Item = (u64, &BlockHash)> {
let best_blk_idx = self.best_block_idx();
self.recent_blocks
.iter()
.rev()
.enumerate()
.map(|(i, b)| (self.cur_height - i as u64, b))
.map(move |(i, b)| (best_blk_idx - i as u64, b))
}
}

pub async fn bitcoin_data_reader_task(
client: impl L1Client,
event_tx: mpsc::Sender<L1Event>,
cur_block_height: u64,
target_next_block: u64,
config: Arc<ReaderConfig>,
l1_status: Arc<RwLock<L1Status>>,
) {
let mut status_updates = Vec::new();
if let Err(e) = do_reader_task(
&client,
&event_tx,
cur_block_height,
target_next_block,
config,
&mut status_updates,
l1_status.clone(),
Expand All @@ -147,17 +156,17 @@ pub async fn bitcoin_data_reader_task(
async fn do_reader_task(
client: &impl L1Client,
event_tx: &mpsc::Sender<L1Event>,
cur_block_height: u64,
target_next_block: u64,
config: Arc<ReaderConfig>,
status_updates: &mut Vec<StatusUpdate>,
l1_status: Arc<RwLock<L1Status>>,
) -> anyhow::Result<()> {
info!(%cur_block_height, "started L1 reader task!");
info!(%target_next_block, "started L1 reader task!");

let poll_dur = Duration::from_millis(config.client_poll_dur_ms as u64);

let mut state = init_reader_state(
cur_block_height,
target_next_block,
config.max_reorg_depth as usize * 2,
client,
)
Expand All @@ -168,14 +177,14 @@ async fn do_reader_task(
// FIXME This function will return when reorg happens when there are not
// enough elements in the vec deque, probably during startup.
loop {
let cur_height = state.cur_height;
let poll_span = debug_span!("l1poll", %cur_height);
let cur_best_height = state.best_block_idx();
let poll_span = debug_span!("l1poll", %cur_best_height);

if let Err(err) = poll_for_new_blocks(client, event_tx, &config, &mut state, status_updates)
.instrument(poll_span)
.await
{
warn!(%cur_height, err = %err, "failed to poll Bitcoin client");
warn!(%cur_best_height, err = %err, "failed to poll Bitcoin client");
status_updates.push(StatusUpdate::RpcError(err.to_string()));

if let Some(err) = err.downcast_ref::<reqwest::Error>() {
Expand Down Expand Up @@ -205,22 +214,23 @@ async fn do_reader_task(

/// Inits the reader state by trying to backfill blocks up to a target height.
async fn init_reader_state(
target_block: u64,
target_next_block: u64,
lookback: usize,
client: &impl L1Client,
) -> anyhow::Result<ReaderState> {
// Init the reader state using the blockid we were given, fill in a few blocks back.
debug!(%target_block, "initializing reader state");
debug!(%target_next_block, "initializing reader state");
let mut init_queue = VecDeque::new();

// Do some math to figure out where our start and end are.
// TODO something screwed up with bookkeeping here
let chain_info = client.get_blockchain_info().await?;
let start_height = i64::max(target_block as i64 - lookback as i64, 0) as u64;
let end_height = u64::min(target_block, chain_info.blocks);
let start_height = i64::max(target_next_block as i64 - lookback as i64, 0) as u64;
let end_height = u64::min(target_next_block - 1, chain_info.blocks);
debug!(%start_height, %end_height, "queried L1 client, have init range");
sapinb marked this conversation as resolved.
Show resolved Hide resolved

// Loop through the range we've determined to be okay and pull the blocks
// in.
// Loop through the range we've determined to be okay and pull the blocks we want to look back
// through in.
let mut real_cur_height = start_height;
for height in start_height..=end_height {
let blkid = client.get_block_hash(height).await?;
Expand All @@ -229,7 +239,7 @@ async fn init_reader_state(
real_cur_height = height;
}

let state = ReaderState::new(real_cur_height, lookback, init_queue);
let state = ReaderState::new(real_cur_height + 1, lookback, init_queue);
Ok(state)
}

Expand Down Expand Up @@ -257,7 +267,7 @@ async fn poll_for_new_blocks(

// First, check for a reorg if there is one.
if let Some((pivot_height, pivot_blkid)) = find_pivot_block(client, state).await? {
if pivot_height < state.cur_height {
if pivot_height < state.best_block_idx() {
info!(%pivot_height, %pivot_blkid, "found apparent reorg");
state.rollback_to_height(pivot_height);
let revert_ev = L1Event::RevertTo(pivot_height);
Expand All @@ -274,9 +284,9 @@ async fn poll_for_new_blocks(
debug!(%client_height, "have new blocks");

// Now process each block we missed.
let scan_start_height = state.cur_height + 1;
let scan_start_height = state.next_height;
for fetch_height in scan_start_height..=client_height {
let blkid =
let l1blkid =
match fetch_and_process_block(fetch_height, client, event_tx, state, status_updates)
.await
{
Expand All @@ -286,7 +296,7 @@ async fn poll_for_new_blocks(
break;
}
};
info!(%fetch_height, %blkid, "accepted new block");
info!(%fetch_height, %l1blkid, "accepted new block");
}

Ok(())
Expand All @@ -298,16 +308,16 @@ async fn find_pivot_block(
client: &impl L1Client,
state: &ReaderState,
) -> anyhow::Result<Option<(u64, BlockHash)>> {
for (height, blkid) in state.iter_blocks_back() {
for (height, l1blkid) in state.iter_blocks_back() {
// If at genesis, we can't reorg any farther.
if height == 0 {
return Ok(Some((height, *blkid)));
return Ok(Some((height, *l1blkid)));
}

let queried_blkid = client.get_block_hash(height).await?;
trace!(%height, %blkid, %queried_blkid, "comparing blocks to find pivot");
if queried_blkid == *blkid {
return Ok(Some((height, *blkid)));
let queried_l1blkid = client.get_block_hash(height).await?;
trace!(%height, %l1blkid, %queried_l1blkid, "comparing blocks to find pivot");
if queried_l1blkid == *l1blkid {
return Ok(Some((height, *l1blkid)));
}
}

Expand All @@ -322,20 +332,22 @@ async fn fetch_and_process_block(
status_updates: &mut Vec<StatusUpdate>,
) -> anyhow::Result<BlockHash> {
let block = client.get_block_at(height).await?;
let txs = block.txdata.len();

let filtered_txs = filter_interesting_txs(&block);
let block_data = BlockData::new(height, block, filtered_txs);
let blkid = block_data.block().block_hash();
let l1blkid = block_data.block().block_hash();
trace!(%l1blkid, %height, %txs, "fetched block from client");

status_updates.push(StatusUpdate::CurHeight(height));
status_updates.push(StatusUpdate::CurTip(blkid.to_string()));
status_updates.push(StatusUpdate::CurTip(l1blkid.to_string()));
if let Err(e) = event_tx.send(L1Event::BlockData(block_data)).await {
error!("failed to submit L1 block event, did the persistence task crash?");
return Err(e.into());
}

// Insert to new block, incrementing cur_height.
let _deep = state.accept_new_block(blkid);
let _deep = state.accept_new_block(l1blkid);

Ok(blkid)
Ok(l1blkid)
}
1 change: 0 additions & 1 deletion crates/consensus-logic/src/block_assembly.rs

This file was deleted.

Loading
Loading