Skip to content

Commit

Permalink
add block processed data to persistence wip
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewvious committed Jun 12, 2024
1 parent bc57c0e commit b6b9719
Showing 1 changed file with 43 additions and 13 deletions.
56 changes: 43 additions & 13 deletions crates/node/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#![allow(unreachable_code)]
use std::{io::Read, path::PathBuf, str::FromStr, sync::Arc};
use std::{fmt::format, io::Read, path::PathBuf, str::FromStr, sync::Arc};

use eo_listener::{BlocksProcessed, EoServer as EoListener, EoServerError};
use futures::StreamExt;
Expand All @@ -25,7 +25,7 @@ use tokio::sync::{
mpsc::{Receiver, Sender},
Mutex,
};
use web3::types::BlockNumber;
use web3::types::{BlockNumber, U64};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
Expand Down Expand Up @@ -67,13 +67,19 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let eo_client = Arc::new(Mutex::new(
setup_eo_client(web3_instance.clone(), sk).await?,
));
let inner_eo_server =
setup_eo_server(web3_instance.clone(), &block_processed_path).map_err(Box::new)?;
log::error!("inner_eo_server established with blocks processed path");

const TIKV_CLIENT_PD_ENDPOINT: &str = "127.0.0.1:2379";
let tikv_client = TikvClient::new(vec![TIKV_CLIENT_PD_ENDPOINT]).await?;

let inner_eo_server = setup_eo_server(
web3_instance.clone(),
&block_processed_path,
tikv_client.clone(),
)
.await
.map_err(Box::new)?;
log::error!("inner_eo_server established with blocks processed path");

#[cfg(feature = "local")]
let bundler: OciBundler<String, String> = OciBundlerBuilder::default()
.runtime("/usr/local/bin/runsc".to_string())
Expand Down Expand Up @@ -509,9 +515,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
Ok(())
}

fn setup_eo_server(
async fn setup_eo_server(
web3_instance: web3::Web3<web3::transports::Http>,
path: &str,
tikv_client: TikvClient,
) -> Result<EoListener, EoServerError> {
// Initialize the ExecutableOracle Address
//0x5FbDB2315678afecb367f032d93F642f64180aa3
Expand All @@ -532,6 +539,7 @@ fn setup_eo_server(
let mut buf = Vec::new();
let mut file = std::fs::OpenOptions::new()
.read(true)
.create(true) // Creates `blocks_processed.dat` ONLY if not found.
.open(path)
.map_err(|e| EoServerError::Other(e.to_string()))?;

Expand All @@ -541,14 +549,36 @@ fn setup_eo_server(
let blocks_processed: BlocksProcessed =
bincode::deserialize(&buf).map_err(|e| EoServerError::Other(e.to_string()))?;

log::error!("blocks processed: {:?}", blocks_processed);

let bridge_from_block = if let Some(b) = blocks_processed.bridge {
b
let key = "bridge_from_block".to_string();
if let Some(value) = bincode::serialize(&b).ok() {
if tikv_client.put(key, value).await.is_ok() {
log::error!("Updated block_from_bridge {b} in persistence store")
}
Some(b)
} else {
log::error!("failed to serialize `bridge_from_block`");
Some(b)
}
} else {
web3::types::U64::from(0)
let key = "bridge_from_block".to_string();
tikv_client
.get(key)
.await
.typecast()
.log_err(|e| e.to_string())
.flatten()
.and_then(|returned_data| {
bincode::deserialize(&returned_data)
.typecast()
.log_err(|e| e)
.and_then(|b: U64| {
log::error!("retrieved `bridge_from_block` from persistence store: {b:?}");
Some(b)
})
})
};
log::error!("bridge from block: {}", bridge_from_block);
log::error!("bridge from block: {:?}", bridge_from_block);

let settle_from_block = if let Some(b) = blocks_processed.settle {
b
Expand All @@ -568,7 +598,7 @@ fn setup_eo_server(
.build();

let bridge_filter = web3::types::FilterBuilder::default()
.from_block(BlockNumber::Number(bridge_from_block))
.from_block(BlockNumber::Number(bridge_from_block.unwrap_or_default()))
.to_block(BlockNumber::Latest)
.address(vec![contract_address])
.topics(bridge_topic.clone(), None, None, None)
Expand Down Expand Up @@ -610,7 +640,7 @@ fn setup_eo_server(
.bridge_topic(bridge_topic)
.blob_settled_topic(blob_settled_topic)
.bridge_filter(bridge_filter)
.current_bridge_filter_block(bridge_from_block)
.current_bridge_filter_block(bridge_from_block.unwrap_or_default())
.current_blob_settlement_filter_block(settle_from_block)
.blob_settled_filter(blob_settled_filter)
.blob_settled_event(blob_settled_event)
Expand Down

0 comments on commit b6b9719

Please sign in to comment.