From 5bcd89c6e760ee19275d554ce08c9dc2d5c42a43 Mon Sep 17 00:00:00 2001 From: quake Date: Wed, 14 Jul 2021 17:51:49 +0900 Subject: [PATCH] feat: add long fork detection --- src/indexer.rs | 45 +++++++++++++++++++++++++++++++++++++++++++++ src/service.rs | 37 ++++++++++++++++++++++++++++++++++--- 2 files changed, 79 insertions(+), 3 deletions(-) diff --git a/src/indexer.rs b/src/indexer.rs index 75382eb..5987944 100644 --- a/src/indexer.rs +++ b/src/indexer.rs @@ -560,6 +560,23 @@ where })) } + pub fn get_block_hash(&self, block_number: BlockNumber) -> Result, Error> { + let mut key_prefix_header = vec![KeyPrefix::Header as u8]; + key_prefix_header.extend_from_slice(&block_number.to_be_bytes()); + Ok( + match self + .store + .iter(&key_prefix_header, IteratorDirection::Forward)? + .next() + { + Some((key, _v)) if key.starts_with(&key_prefix_header) => { + Some(Byte32::from_slice(&key[9..]).expect("stored block key")) + } + _ => None, + }, + ) + } + pub fn prune(&self) -> Result<(), Error> { let (tip_number, _tip_hash) = self.tip()?.expect("stored tip"); if tip_number > self.keep_num { @@ -1475,4 +1492,32 @@ mod tests { .len() ); } + + #[test] + fn get_block_hash() { + let indexer = new_indexer::("get_block_hash"); + + let block_hashes: Vec = (0..10) + .map(|i| { + let cellbase = TransactionBuilder::default() + .input(CellInput::new_cellbase_input(i)) + .build(); + let block = BlockBuilder::default() + .transaction(cellbase) + .header(HeaderBuilder::default().number(i.pack()).build()) + .build(); + indexer.append(&block).unwrap(); + block.hash() + }) + .collect(); + + block_hashes.into_iter().enumerate().for_each(|(i, hash)| { + assert_eq!( + hash, + indexer.get_block_hash(i as BlockNumber).unwrap().unwrap() + ) + }); + + assert!(indexer.get_block_hash(10).unwrap().is_none()); + } } diff --git a/src/service.rs b/src/service.rs index 7775c80..b79bae8 100644 --- a/src/service.rs +++ b/src/service.rs @@ -72,7 +72,9 @@ impl Service { } pub async fn poll(&self, rpc_client: gen_client::Client) { - let indexer = Indexer::new(self.store.clone(), 100, 1000); + // assume that long fork will not happen >= 100 blocks. + let keep_num = 100; + let indexer = Indexer::new(self.store.clone(), keep_num, 1000); // 0.37.0 and above supports hex format let use_hex_format = loop { match rpc_client.local_node_info().await { @@ -98,8 +100,37 @@ impl Service { info!("append {}, {}", block.number(), block.hash()); indexer.append(&block).expect("append block should be OK"); } else { - info!("rollback {}, {}", tip_number, tip_hash); - indexer.rollback().expect("rollback block should be OK"); + // Long fork detection + let longest_fork_number = tip_number.saturating_sub(keep_num); + match get_block_by_number( + &rpc_client, + longest_fork_number, + use_hex_format, + ) + .await + { + Ok(Some(block)) => { + let stored_block_hash = indexer + .get_block_hash(longest_fork_number) + .expect("get block hash should be OK") + .expect("stored block header"); + if block.hash() != stored_block_hash { + error!("long fork detected, ckb-indexer stored block {} => {:#x}, ckb node returns block {} => {:#x}, please check if ckb-indexer is connected to the same network ckb node.", longest_fork_number, stored_block_hash, longest_fork_number, block.hash()); + thread::sleep(self.poll_interval); + } else { + info!("rollback {}, {}", tip_number, tip_hash); + indexer.rollback().expect("rollback block should be OK"); + } + } + Ok(None) => { + error!("long fork detected, ckb-indexer stored block {}, ckb node returns none, please check if ckb-indexer is connected to the same network ckb node.", longest_fork_number); + thread::sleep(self.poll_interval); + } + Err(err) => { + error!("cannot get block from ckb node, error: {}", err); + thread::sleep(self.poll_interval); + } + } } } Ok(None) => {