Skip to content

Commit

Permalink
Add option to specify start block (#3171)
Browse files Browse the repository at this point in the history
* add start block

* improve signal handling
  • Loading branch information
kziemianek authored Nov 7, 2024
1 parent 0fd2f74 commit 1f9ae11
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 53 deletions.
1 change: 1 addition & 0 deletions tee-worker/omni-executor/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

99 changes: 54 additions & 45 deletions tee-worker/omni-executor/executor-core/src/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ impl<
log::debug!("Starting sync from {:?}", block_number_to_sync);

'main: loop {
log::info!("Syncing block: {}", block_number_to_sync);
if self.stop_signal.try_recv().is_ok() {
break;
}
Expand Down Expand Up @@ -135,24 +134,53 @@ impl<
None => false,
};

let mut sync_error = false;

if last_finalized_block >= block_number_to_sync {
if let Ok(events) =
self.handle.block_on(self.fetcher.get_block_events(block_number_to_sync))
{
for event in events {
let event_id = event.get_event_id().clone();
if let Some(ref checkpoint) =
self.checkpoint_repository.get().expect("Could not read checkpoint")
{
if checkpoint.lt(&event.get_event_id().clone().into()) {
log::info!("Syncing block: {}", block_number_to_sync);
match self.handle.block_on(self.fetcher.get_block_events(block_number_to_sync)) {
Ok(events) => {
for event in events {
let event_id = event.get_event_id().clone();
if let Some(ref checkpoint) =
self.checkpoint_repository.get().expect("Could not read checkpoint")
{
if checkpoint.lt(&event.get_event_id().clone().into()) {
log::info!("Handling event: {:?}", event_id);
if let Err(e) = self
.handle
.block_on(self.intent_event_handler.handle(event))
{
log::error!("Could not handle event: {:?}", e);
match e {
Error::NonRecoverableError => {
error!("Non-recoverable intent handling error, event: {:?}", event_id);
break 'main;
},
Error::RecoverableError => {
error!(
"Recoverable intent handling error, event: {:?}",
event_id
);
continue 'main;
},
}
}
} else {
log::debug!("Skipping event");
}
} else {
log::info!("Handling event: {:?}", event_id);
if let Err(e) =
self.handle.block_on(self.intent_event_handler.handle(event))
{
log::error!("Could not handle event: {:?}", e);
match e {
Error::NonRecoverableError => {
error!("Non-recoverable intent handling error, event: {:?}", event_id);
error!(
"Non-recoverable intent handling error, event: {:?}",
event_id
);
break 'main;
},
Error::RecoverableError => {
Expand All @@ -164,47 +192,28 @@ impl<
},
}
}
} else {
log::debug!("Skipping event");
}
} else {
log::info!("Handling event: {:?}", event_id);
if let Err(e) =
self.handle.block_on(self.intent_event_handler.handle(event))
{
log::error!("Could not handle event: {:?}", e);
match e {
Error::NonRecoverableError => {
error!(
"Non-recoverable intent handling error, event: {:?}",
event_id
);
break 'main;
},
Error::RecoverableError => {
error!(
"Recoverable intent handling error, event: {:?}",
event_id
);
continue 'main;
},
}
}
self.checkpoint_repository
.save(event_id.into())
.expect("Could not save checkpoint");
}
// we processed block completely so store new checkpoint
self.checkpoint_repository
.save(event_id.into())
.save(CheckpointT::from(block_number_to_sync))
.expect("Could not save checkpoint");
}
// we processed block completely so store new checkpoint
self.checkpoint_repository
.save(CheckpointT::from(block_number_to_sync))
.expect("Could not save checkpoint");
log::info!("Finished syncing block: {}", block_number_to_sync);
block_number_to_sync += 1;
log::info!("Finished syncing block: {}", block_number_to_sync);
block_number_to_sync += 1;
},
Err(e) => {
log::error!("Could not get block {} events: {:?}", block_number_to_sync, e);
sync_error = true;
},
}
} else {
log::trace!("Block: {} not yet finalized", block_number_to_sync);
}

if !fast {
if !fast || sync_error {
sleep(Duration::from_secs(1))
} else {
log::trace!("Fast sync skipping 1s wait");
Expand Down
2 changes: 1 addition & 1 deletion tee-worker/omni-executor/executor-worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ log = { workspace = true }
parentchain-listener = { path = "../parentchain/listener" }
scale-encode = { workspace = true }
serde_json = "1.0.127"
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
tokio = { workspace = true, features = ["macros", "rt-multi-thread", "signal"] }

[lints]
workspace = true
1 change: 1 addition & 0 deletions tee-worker/omni-executor/executor-worker/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ use clap::Parser;
pub struct Cli {
pub parentchain_url: String,
pub ethereum_url: String,
pub start_block: u64,
}
17 changes: 13 additions & 4 deletions tee-worker/omni-executor/executor-worker/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::io::Write;
use std::thread::JoinHandle;
use std::{fs, thread};
use tokio::runtime::Handle;
use tokio::signal;
use tokio::sync::oneshot;

mod cli;
Expand All @@ -48,17 +49,25 @@ async fn main() -> Result<(), ()> {
error!("Could not create data dir: {:?}", e);
})?;

listen_to_parentchain(cli.parentchain_url, cli.ethereum_url)
listen_to_parentchain(cli.parentchain_url, cli.ethereum_url, cli.start_block)
.await
.unwrap()
.join()
.unwrap();

match signal::ctrl_c().await {
Ok(()) => {},
Err(err) => {
eprintln!("Unable to listen for shutdown signal: {}", err);
// we also shut down in case of error
},
}

Ok(())
}

async fn listen_to_parentchain(
parentchain_url: String,
ethereum_url: String,
start_block: u64,
) -> Result<JoinHandle<()>, ()> {
let (_sub_stop_sender, sub_stop_receiver) = oneshot::channel();
let ethereum_intent_executor =
Expand All @@ -75,6 +84,6 @@ async fn listen_to_parentchain(

Ok(thread::Builder::new()
.name("litentry_rococo_sync".to_string())
.spawn(move || parentchain_listener.sync(0))
.spawn(move || parentchain_listener.sync(start_block))
.unwrap())
}
11 changes: 8 additions & 3 deletions tee-worker/omni-executor/parentchain/listener/src/rpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

use crate::primitives::{BlockEvent, EventId};
use async_trait::async_trait;
use log::error;
use log::{error, info};
use parity_scale_codec::Encode;
use std::marker::PhantomData;
use std::ops::Deref;
Expand Down Expand Up @@ -67,9 +67,14 @@ impl<ChainConfig: Config<AccountId = AccountId32>> SubstrateRpcClient<ChainConfi
}
}
async fn get_block_events(&mut self, block_num: u64) -> Result<Vec<BlockEvent>, ()> {
match self.legacy.chain_get_block_hash(Some(block_num.into())).await.map_err(|_| ())? {
info!("Getting block {} events", block_num);
match self.legacy.chain_get_block_hash(Some(block_num.into())).await.map_err(|e| {
error!("Error getting block {} hash: {:?}", block_num, e);
})? {
Some(hash) => {
let events = self.events.at(BlockRef::from_hash(hash)).await.map_err(|_| ())?;
let events = self.events.at(BlockRef::from_hash(hash)).await.map_err(|e| {
error!("Error getting block {} events: {:?}", block_num, e);
})?;
Ok(events
.iter()
.enumerate()
Expand Down

0 comments on commit 1f9ae11

Please sign in to comment.