Skip to content

Commit

Permalink
[relayer] Refactor relayer to resolve the sync process block issue (#…
Browse files Browse the repository at this point in the history
…2667)

* [relayer] Refactor relayer to resolve the sync process block issue

* fixup

* add btc-sync-block-interval to deploy testnet script

* fixup

* fixup
  • Loading branch information
jolestar authored Sep 22, 2024
1 parent 3ed1227 commit 8921cd6
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 11 deletions.
25 changes: 21 additions & 4 deletions crates/rooch-relayer/src/actor/bitcoin_relayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,12 @@ impl BitcoinRelayer {
sync_block_interval,
latest_sync_timestamp: 0u64,
sync_to_latest: false,
batch_size: 10,
batch_size: 5,
})
}

async fn sync_block(&mut self) -> Result<()> {
if !self.buffer.is_empty() {
if self.buffer.len() > self.batch_size {
return Ok(());
}
if self.sync_to_latest
Expand All @@ -76,7 +76,17 @@ impl BitcoinRelayer {
self.latest_sync_timestamp = chrono::Utc::now().timestamp() as u64;

let pending_block_module = self.move_caller.as_module_binding::<PendingBlockModule>();
let best_block_in_rooch = pending_block_module.get_best_block()?;
let best_block_in_rooch = if self.buffer.is_empty() {
pending_block_module.get_best_block()?
} else {
let last_block = self.buffer.last().unwrap();
let last_block_hash = last_block.header_info.hash;
let last_block_height = last_block.header_info.height;
Some(BlockHeightHash {
block_hash: last_block_hash.into_address(),
block_height: last_block_height as u64,
})
};
let best_block_hash_in_bitcoin = self.rpc_client.get_best_block_hash().await?;

//The start block is included
Expand Down Expand Up @@ -118,6 +128,7 @@ impl BitcoinRelayer {

let mut next_block_hash = start_block_hash;

let mut batch_count = 0;
while let Some(next_hash) = next_block_hash {
let header_info = self.rpc_client.get_block_header_info(next_hash).await?;
let block = self.rpc_client.get_block(next_hash).await?;
Expand All @@ -132,11 +143,17 @@ impl BitcoinRelayer {
);
break;
}
info!(
"BitcoinRelayer buffer block, height: {}, hash: {}",
next_block_height, header_info.hash
);
self.buffer.push(BlockResult { header_info, block });
if self.buffer.len() > self.batch_size {
if batch_count > self.batch_size {
break;
}
batch_count += 1;
}

Ok(())
}

Expand Down
68 changes: 61 additions & 7 deletions crates/rooch-relayer/src/actor/relayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@ use async_trait::async_trait;
use coerce::actor::{context::ActorContext, message::Handler, Actor, LocalActorRef};
use move_core_types::vm_status::KeptVMStatus;
use moveos_eventbus::bus::EventData;
use moveos_types::module_binding::MoveFunctionCaller;
use rooch_config::{BitcoinRelayerConfig, EthereumRelayerConfig};
use rooch_event::actor::{EventActor, EventActorSubscribeMessage};
use rooch_event::event::ServiceStatusEvent;
use rooch_executor::proxy::ExecutorProxy;
use rooch_pipeline_processor::proxy::PipelineProcessorProxy;
use rooch_types::bitcoin::pending_block::PendingBlockModule;
use rooch_types::multichain_id::RoochMultiChainID;
use rooch_types::service_status::ServiceStatus;
use rooch_types::transaction::{L1BlockWithBody, L1Transaction};
use std::ops::Deref;
Expand Down Expand Up @@ -138,6 +141,45 @@ impl RelayerActor {
Ok(())
}

//We migrate this function from Relayer to here
//Becase the relayer actor will blocked when sync block
//TODO refactor the relayer, put the sync task in a separate actor
fn get_ready_l1_txs(&self, relayer: &RelayerProxy) -> Result<Vec<L1Transaction>> {
if relayer.is_bitcoin() {
self.get_ready_l1_txs_bitcoin()
} else {
Ok(vec![])
}
}

fn get_ready_l1_txs_bitcoin(&self) -> Result<Vec<L1Transaction>> {
let pending_block_module = self.executor.as_module_binding::<PendingBlockModule>();
let pending_txs = pending_block_module.get_ready_pending_txs()?;
match pending_txs {
Some(pending_txs) => {
let block_hash = pending_txs.block_hash;
let mut txs = pending_txs.txs;
if txs.len() > 1 {
// move coinbase tx to the end
let coinbase_tx = txs.remove(0);
txs.push(coinbase_tx);
}
let l1_txs = txs
.into_iter()
.map(|txid| {
L1Transaction::new(
RoochMultiChainID::Bitcoin.multichain_id(),
block_hash.to_vec(),
txid.to_vec(),
)
})
.collect();
Ok(l1_txs)
}
None => Ok(vec![]),
}
}

async fn sync(&mut self) {
let relayers = self.relayers.clone();
for relayer in relayers {
Expand All @@ -146,12 +188,19 @@ impl RelayerActor {
continue;
}
let relayer_name = relayer.name();
if let Err(e) = relayer.sync().await {
warn!("Relayer {} sync error: {:?}", relayer_name, e);
}

loop {
match relayer.get_ready_l1_txs().await {
// Notify the relayer to sync the latest block
// The sync task will block the relayer actor, but call sync() will not block this actor
// It a notify call.
if let Err(e) = relayer.sync().await {
warn!("Relayer {} sync error: {:?}", relayer_name, e);
}

let mut break_flag = false;

// Execute all ready l1 txs
match self.get_ready_l1_txs(&relayer) {
Ok(txs) => {
for tx in txs {
if let Err(err) = self.handle_l1_tx(tx).await {
Expand All @@ -161,9 +210,10 @@ impl RelayerActor {
}
Err(err) => {
warn!("Relayer {} error: {:?}", relayer_name, err);
break;
break_flag = true;
}
}

match relayer.get_ready_l1_block().await {
Ok(Some(l1_block)) => {
if let Err(err) = self.handle_l1_block(l1_block).await {
Expand All @@ -172,13 +222,17 @@ impl RelayerActor {
}
Ok(None) => {
//skip
break;
break_flag = true;
}
Err(err) => {
warn!("Relayer {} error: {:?}", relayer_name, err);
break;
break_flag = true;
}
}

if break_flag {
break;
}
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions crates/rooch-relayer/src/actor/relayer_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,8 @@ impl RelayerProxy {
Self::Ethereum(actor) => actor.send(GetReadyL1TxsMessage {}).await?,
}
}

pub fn is_bitcoin(&self) -> bool {
matches!(self, Self::Bitcoin(_))
}
}
1 change: 1 addition & 0 deletions scripts/deploy_rooch_testnet.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ docker ps -a | grep rooch | grep -v faucet | awk '{print $1}' | xargs -r docker
docker pull "ghcr.io/rooch-network/rooch:$REF"
docker run -d --name rooch-testnet --restart unless-stopped -v /data:/root -p 6767:6767 -p 9184:9184 -e RUST_BACKTRACE=full "ghcr.io/rooch-network/rooch:$REF" \
server start -n test \
--btc-sync-block-interval 3 \
--btc-rpc-url "$BTC_TEST_RPC_URL" \
--btc-rpc-username rooch-test \
--btc-rpc-password "$BTC_TEST_RPC_PWD" \
Expand Down

0 comments on commit 8921cd6

Please sign in to comment.