Skip to content

Commit

Permalink
feat(indexer): adding logs and manual indexing - WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
lucianHymer committed Jul 1, 2024
1 parent a7a2f03 commit 5320025
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 13 deletions.
10 changes: 5 additions & 5 deletions indexer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ async fn main() -> Result<()> {
let contract_address_op_mainnet = get_env("STAKING_CONTRACT_ADDRESS_OP_MAINNET")
.parse::<Address>()
.unwrap();

let contract_address_op_sepolia = get_env("STAKING_CONTRACT_ADDRESS_OP_SEPOLIA")
.parse::<Address>()
.unwrap();
Expand Down Expand Up @@ -83,7 +83,7 @@ async fn run_ethereum_indexer(
contract_address,
)
.await?;
ethereum_staking_indexer.listen_with_timeout_reset().await
ethereum_staking_indexer.index_events().await
}

async fn run_optimism_indexer(
Expand All @@ -105,7 +105,7 @@ async fn run_optimism_indexer(
contract_address,
)
.await?;
optimism_staking_indexer.listen_with_timeout_reset().await
optimism_staking_indexer.index_events().await
}

async fn run_arbitrum_indexer(
Expand All @@ -127,7 +127,7 @@ async fn run_arbitrum_indexer(
contract_address,
)
.await?;
arbitrum_staking_indexer.listen_with_timeout_reset().await
arbitrum_staking_indexer.index_events().await
}


Expand All @@ -150,5 +150,5 @@ async fn run_optimism_sepolia_indexer(
contract_address,
)
.await?;
op_sepolia_staking_indexer.listen_with_timeout_reset().await
op_sepolia_staking_indexer.index_events().await
}
40 changes: 38 additions & 2 deletions indexer/src/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ pub struct PostgresClient {
pool: Pool,
}

pub struct ManualIndexRequest {
pub id: i32,
pub block_number: u64,
pub tx_hash: String,
}

impl PostgresClient {
pub async fn new() -> Result<Self, Error> {
let mut pg_config = tokio_postgres::Config::new();
Expand Down Expand Up @@ -56,7 +62,7 @@ impl PostgresClient {
let client = self.pool.get().await.unwrap();
client.execute("INSERT INTO registry_gtcstakeevent (event_type, round_id, staker, amount, staked, block_number, tx_hash) VALUES ($1, $2, $3, $4, $5, $6, $7)",&[&"SelfStake", &round_id, &staker, &decimal_amount, &staked, &block_number, &tx_hash]).await?;
println!(
"Row inserted into registry_gtcstakeevent with type SelfStake for block {}!",
"Row inserted into registry_gtcstakeevent with type SelfStake for block {} for legacy contract!",
block_number
);
Ok(())
Expand All @@ -78,7 +84,7 @@ impl PostgresClient {
let client = self.pool.get().await.unwrap();
client.execute("INSERT INTO registry_gtcstakeevent (event_type, round_id, staker, address, amount, staked, block_number, tx_hash) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)", &[&"Xstake", &round_id, &staker, &user, &decimal_amount, &staked, &block_number, &tx_hash]).await?;
println!(
"Row inserted into registry_gtcstakeevent with type Xstake for block {}!",
"Row inserted into registry_gtcstakeevent with type Xstake for block {} for legacy contract!",
block_number
);
Ok(())
Expand All @@ -88,6 +94,36 @@ impl PostgresClient {
DateTime::from_timestamp(*unix_time as i64, 0).unwrap()
}

pub async fn get_pending_manual_index_requests(&self, chain_id: u32) -> Result<Vec<ManualIndexRequest>, Error> {
let client = self.pool.get().await.unwrap();
let raw_rows = client
.query("SELECT id, block_number, tx_hash FROM stake_manualindexrequest WHERE status = 'pending' AND chain_id = $1", &[&(chain_id as i32)])
.await?;

let mut rows = Vec::new();

for row in raw_rows {
let id: i32 = row.get("id");
let decimal_block_number: Decimal = row.get("block_number");
let block_number: u64 = decimal_block_number.to_u64().unwrap();
let tx_hash: String = row.get("tx_hash");

rows.push(ManualIndexRequest {
id,
block_number,
tx_hash,
});
}

Ok(rows)
}

pub async fn update_manual_index_request_status(&self, id: i32, status: &str) -> Result<(), Error> {
let client = self.pool.get().await.unwrap();
client.execute("UPDATE stake_manualindexrequest SET status = $1 WHERE id = $2", &[&status, &id]).await?;
Ok(())
}

pub async fn add_or_extend_stake(
&self,
event_type: &StakeEventType,
Expand Down
103 changes: 97 additions & 6 deletions indexer/src/staking_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,34 @@ impl<'a> StakingIndexer<'a> {
})
}

pub async fn index_events(&self) -> Result<()> {
match try_join!(
self.listen_with_timeout_reset(),
self.run_index_manual_requests_loop(),
) {
Ok(_) => {
eprintln!(
"Warning - indexer join ended without error for chain {}. This should never happen.",
self.chain_id
);
}
Err(err) => {
eprintln!(
"Warning - indexer join ended with error for chain {}, {:?}. This should never happen.",
self.chain_id, err
);
}
}
Ok(())
}

pub async fn listen_with_timeout_reset(&self) -> Result<()> {
loop {
let start_block = self
.postgres_client
.get_latest_block(self.chain_id, self.start_block)
.await?;

println!(
"Debug - Starting indexer for chain {} at block {}",
self.chain_id, start_block
Expand Down Expand Up @@ -136,6 +158,12 @@ impl<'a> StakingIndexer<'a> {
// Reducing to 1k because of occasional timeout issues
while last_queried_block < current_block - 1 {
let query_end_block = min(last_queried_block + 1000, current_block - 1);
eprintln!(
"Debug - Querying past events for chain {} from block {} to block {}",
self.chain_id,
last_queried_block + 1,
query_end_block
);
let previous_events_query = id_staking_contract
.events()
.from_block(last_queried_block + 1)
Expand All @@ -153,7 +181,9 @@ impl<'a> StakingIndexer<'a> {
Err(err) => {
return Err(eyre::eyre!(
"Error - Failed to query events: {}, {}, {:?}",
last_queried_block, query_end_block, err
last_queried_block,
query_end_block,
err
));
}
}
Expand All @@ -164,15 +194,14 @@ impl<'a> StakingIndexer<'a> {
self.chain_id
);

let future_events = id_staking_contract
.events()
.from_block(max(last_queried_block + 1, current_block));
let from_block = max(last_queried_block + 1, current_block);
let future_events = id_staking_contract.events().from_block(from_block);

let mut stream = future_events.stream().await?.with_meta();

eprintln!(
"Debug - Listening for future events for chain {}",
self.chain_id
"Debug - Listening for future events for chain {} from block {}",
self.chain_id, from_block
);

while let Some(event_with_meta) = stream.next().await {
Expand Down Expand Up @@ -432,4 +461,66 @@ impl<'a> StakingIndexer<'a> {
}
Ok(())
}

async fn run_index_manual_requests_loop(&self) -> Result<()> {
loop {
if let Err(err) = self.index_manual_requests().await {
eprintln!(
"Error - Failed to index manual requests for chain {}: {:?}",
self.chain_id, err
);
}
tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
}
}

async fn index_manual_requests(&self) -> Result<()> {
let requests = self
.postgres_client
.get_pending_manual_index_requests(self.chain_id)
.await?;

for request in requests {
if let Err(err) = self.index_tx(request.block_number, &request.tx_hash).await {
eprintln!(
"Error - Failed to index tx {} for chain {}: {:?}",
request.tx_hash, self.chain_id, err
);
self.postgres_client
.update_manual_index_request_status(request.id, "failed")
.await?;
} else {
eprintln!(
"Debug - Successfully indexed tx {} for chain {}",
request.tx_hash, self.chain_id
);
self.postgres_client
.update_manual_index_request_status(request.id, "indexed")
.await?;
}
}
Ok(())
}

async fn index_tx(&self, block_number: u64, tx_hash: &String) -> Result<()> {
let client = Arc::new(create_rpc_connection(&self.rpc_url).await);
let id_staking_contract = IdentityStaking::new(*self.contract_address, client.clone());

let events = id_staking_contract
.events()
.from_block(block_number)
.to_block(block_number)
.query_with_meta()
.await?;

for (event, meta) in events.iter() {
let this_tx_hash = format!("{:?}", meta.transaction_hash);
if this_tx_hash == *tx_hash {
self.process_staking_event(&event, &meta, &client).await?;
}
}

Ok(())
}

}

0 comments on commit 5320025

Please sign in to comment.