From 32b5dc02e0de9a594667d33f63d0dd16d839d091 Mon Sep 17 00:00:00 2001 From: Lucian Hymer Date: Tue, 19 Mar 2024 11:51:14 -0600 Subject: [PATCH] feat(indexer): storing decimal chain ID as internal chain identifier --- api/stake/api.py | 2 +- ...lter_stake_chain_alter_stakeevent_chain.py | 23 ++++++ api/stake/models.py | 12 +-- api/stake/schema.py | 2 +- api/stake/test/test_get_stake.py | 16 ++-- indexer/src/main.rs | 57 ++++++------- indexer/src/postgres.rs | 34 ++++---- indexer/src/staking_indexer.rs | 79 +++++++++++-------- indexer/src/utils.rs | 7 -- 9 files changed, 127 insertions(+), 105 deletions(-) create mode 100644 api/stake/migrations/0003_alter_stake_chain_alter_stakeevent_chain.py diff --git a/api/stake/api.py b/api/stake/api.py index bb991ae4f..3eb22638f 100644 --- a/api/stake/api.py +++ b/api/stake/api.py @@ -42,7 +42,7 @@ def handle_get_gtc_stake(address: str) -> List[StakeSchema]: try: return [ StakeSchema( - chain=Stake.Chain.names[stake.chain], + chain=stake.chain, staker=stake.staker, stakee=stake.stakee, amount=stake.current_amount, diff --git a/api/stake/migrations/0003_alter_stake_chain_alter_stakeevent_chain.py b/api/stake/migrations/0003_alter_stake_chain_alter_stakeevent_chain.py new file mode 100644 index 000000000..410345d43 --- /dev/null +++ b/api/stake/migrations/0003_alter_stake_chain_alter_stakeevent_chain.py @@ -0,0 +1,23 @@ +# Generated by Django 4.2.6 on 2024-03-19 17:33 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("stake", "0002_alter_stake_chain_alter_stakeevent_chain"), + ] + + operations = [ + migrations.AlterField( + model_name="stake", + name="chain", + field=models.IntegerField(db_index=True, help_text="Decimal chain ID"), + ), + migrations.AlterField( + model_name="stakeevent", + name="chain", + field=models.IntegerField(db_index=True, help_text="Decimal chain ID"), + ), + ] diff --git a/api/stake/models.py b/api/stake/models.py index 876caa5c2..f657e587f 100644 --- a/api/stake/models.py +++ b/api/stake/models.py @@ -4,12 +4,8 @@ # Stores the current summary for each (chain, staker, stakee) combo class Stake(models.Model): - class Chain(models.IntegerChoices): - ETHEREUM = 0, "Ethereum" - OPTIMISM = 1, "Optimism" - - chain = models.SmallIntegerField( - choices=Chain.choices, default=Chain.ETHEREUM, db_index=True + chain = models.IntegerField( + null=False, blank=False, db_index=True, help_text="Decimal chain ID" ) lock_time = models.DateTimeField(null=False, blank=False) unlock_time = models.DateTimeField(null=False, blank=False) @@ -56,8 +52,8 @@ class StakeEventType(models.TextChoices): db_index=True, ) - chain = models.SmallIntegerField( - choices=Stake.Chain.choices, null=False, blank=False, db_index=True + chain = models.IntegerField( + null=False, blank=False, db_index=True, help_text="Decimal chain ID" ) # For self-stake, staker and stakee are the same diff --git a/api/stake/schema.py b/api/stake/schema.py index 65a6f4f7a..933a00652 100644 --- a/api/stake/schema.py +++ b/api/stake/schema.py @@ -5,7 +5,7 @@ class StakeSchema(Schema): - chain: str + chain: int staker: str stakee: str amount: str diff --git a/api/stake/test/test_get_stake.py b/api/stake/test/test_get_stake.py index 0902018c0..51dcf481e 100644 --- a/api/stake/test/test_get_stake.py +++ b/api/stake/test/test_get_stake.py @@ -15,7 +15,7 @@ @pytest.fixture def mock_stakes(sample_address): Stake.objects.create( - chain=Stake.Chain.ETHEREUM, + chain="11155420", staker=sample_address, stakee=sample_address, current_amount=Decimal("100000000000000000000"), @@ -24,7 +24,7 @@ def mock_stakes(sample_address): lock_time=datetime.now() - timedelta(days=90), ) Stake.objects.create( - chain=Stake.Chain.ETHEREUM, + chain="11155420", staker=sample_address, stakee=other_user_address, current_amount=Decimal("200000000000000000000"), @@ -33,7 +33,7 @@ def mock_stakes(sample_address): lock_time=datetime.now() - timedelta(days=90), ) Stake.objects.create( - chain=Stake.Chain.ETHEREUM, + chain="11155420", staker=other_user_address, stakee=sample_address, current_amount=Decimal("350000000000000000000"), @@ -42,7 +42,7 @@ def mock_stakes(sample_address): lock_time=datetime.now() - timedelta(days=90), ) Stake.objects.create( - chain=Stake.Chain.OPTIMISM, + chain="1", staker=other_user_address, stakee=sample_address, current_amount=Decimal("475000000000000000000"), @@ -67,9 +67,9 @@ def test_successful_get_staking_results(self, mock_stakes, sample_address): # an extra stake event was added that is below the filtered amount, hence the minus 1 assert len(response_data) == 4 assert response_data[0]["staker"] == sample_address.lower() - assert response_data[0]["chain"] == "ETHEREUM" + assert response_data[0]["chain"] == 11155420 assert response_data[3]["staker"] == other_user_address.lower() - assert response_data[3]["chain"] == "OPTIMISM" + assert response_data[3]["chain"] == 1 def test_failed_auth(self, mock_stakes, sample_address): client = Client() @@ -92,9 +92,9 @@ def test_ceramic_cache_endpoint_success( # an extra stake event was added that is below the filtered amount, hence the minus 1 assert len(response_data) == 4 assert response_data[0]["staker"] == sample_address.lower() - assert response_data[0]["chain"] == "ETHEREUM" + assert response_data[0]["chain"] == 11155420 assert response_data[3]["staker"] == other_user_address.lower() - assert response_data[3]["chain"] == "OPTIMISM" + assert response_data[3]["chain"] == 1 def test_ceramic_cache_endpoint_failed_auth(self, mock_stakes): client = Client() diff --git a/indexer/src/main.rs b/indexer/src/main.rs index 1b0e957a3..482fbe7b8 100644 --- a/indexer/src/main.rs +++ b/indexer/src/main.rs @@ -3,7 +3,6 @@ mod postgres; mod staking_indexer; mod utils; -use std::collections::HashMap; use dotenv::dotenv; use ethers::core::types::Address; use eyre::Result; @@ -11,20 +10,17 @@ use futures::try_join; use legacy_staking_indexer::LegacyStakingIndexer; use postgres::PostgresClient; use staking_indexer::StakingIndexer; -use utils::{get_env, Chain}; +use utils::get_env; pub const LEGACY_CONTRACT_START_BLOCK: i32 = 16403024; pub const LEGACY_CONTRACT_ADDRESS: &str = "0x0E3efD5BE54CC0f4C64e0D186b0af4b7F2A0e95F"; -pub const CONTRACT_START_BLOCK_MAP: &[(Chain, u64)] = - &[(Chain::Ethereum, 16403024), (Chain::Optimism, 0)]; - #[tokio::main] async fn main() -> Result<()> { dotenv().ok(); loop { - let postgres_client = initialize_postgres_client().await?; + let postgres_client = PostgresClient::new().await?; let contract_address = get_env("STAKING_CONTRACT_ADDRESS") .parse::
() .unwrap(); @@ -46,21 +42,6 @@ async fn main() -> Result<()> { } } -async fn initialize_postgres_client() -> Result { - let mut start_block_map = HashMap::new(); - start_block_map.insert( - Chain::Ethereum, - get_env("INDEXER_ETHEREUM_START_BLOCK").parse::().unwrap(), - ); - start_block_map.insert( - Chain::Optimism, - get_env("INDEXER_OPTIMISM_START_BLOCK").parse::().unwrap(), - ); - - let postgres_client = PostgresClient::new(start_block_map).await?; - Ok(postgres_client) -} - async fn run_legacy_indexer(postgres_client: PostgresClient) -> Result<()> { if get_env("INDEXER_LEGACY_ENABLED") != "true" { return Ok(()); @@ -71,24 +52,46 @@ async fn run_legacy_indexer(postgres_client: PostgresClient) -> Result<()> { legacy_staking_indexer.listen_with_timeout_reset().await } -async fn run_ethereum_indexer(postgres_client: PostgresClient, contract_address: &Address) -> Result<()> { +async fn run_ethereum_indexer( + postgres_client: PostgresClient, + contract_address: &Address, +) -> Result<()> { if get_env("INDEXER_ETHEREUM_ENABLED") != "true" { return Ok(()); } let ethereum_rpc_url = get_env("INDEXER_ETHEREUM_RPC_URL"); - let ethereum_staking_indexer = - StakingIndexer::new(postgres_client, ðereum_rpc_url, Chain::Ethereum, contract_address); + let ethereum_start_block = get_env("INDEXER_ETHEREUM_START_BLOCK") + .parse::() + .unwrap(); + let ethereum_staking_indexer = StakingIndexer::new( + postgres_client, + ðereum_rpc_url, + ethereum_start_block, + contract_address, + ) + .await?; ethereum_staking_indexer.listen_with_timeout_reset().await } -async fn run_optimism_indexer(postgres_client: PostgresClient, contract_address: &Address) -> Result<()> { +async fn run_optimism_indexer( + postgres_client: PostgresClient, + contract_address: &Address, +) -> Result<()> { if get_env("INDEXER_OPTIMISM_ENABLED") != "true" { return Ok(()); } let optimism_rpc_url = get_env("INDEXER_OPTIMISM_RPC_URL"); - let optimism_staking_indexer = - StakingIndexer::new(postgres_client, &optimism_rpc_url, Chain::Optimism, contract_address); + let optimism_start_block = get_env("INDEXER_OPTIMISM_START_BLOCK") + .parse::() + .unwrap(); + let optimism_staking_indexer = StakingIndexer::new( + postgres_client, + &optimism_rpc_url, + optimism_start_block, + contract_address, + ) + .await?; optimism_staking_indexer.listen_with_timeout_reset().await } diff --git a/indexer/src/postgres.rs b/indexer/src/postgres.rs index c76cda7ff..2a89e1461 100644 --- a/indexer/src/postgres.rs +++ b/indexer/src/postgres.rs @@ -5,23 +5,21 @@ use deadpool_postgres::{Manager, ManagerConfig, Pool, RecyclingMethod}; use ethers::types::H160; use rust_decimal::prelude::*; use rust_decimal::Decimal; -use std::collections::HashMap; use std::str::FromStr; use tokio_postgres::{Error, NoTls}; use crate::{ - utils::{get_code_for_stake_event_type, get_env, Chain, StakeAmountOperation, StakeEventType}, + utils::{get_code_for_stake_event_type, get_env, StakeAmountOperation, StakeEventType}, LEGACY_CONTRACT_START_BLOCK, }; #[derive(Debug, Clone)] pub struct PostgresClient { pool: Pool, - start_block_map: HashMap, } impl PostgresClient { - pub async fn new(start_block_map: HashMap) -> Result { + pub async fn new() -> Result { let mut pg_config = tokio_postgres::Config::new(); pg_config @@ -40,7 +38,6 @@ impl PostgresClient { Ok(Self { pool, - start_block_map, }) } @@ -94,7 +91,7 @@ impl PostgresClient { pub async fn add_or_extend_stake( &self, event_type: &StakeEventType, - chain: &Chain, + chain_id: u32, staker: &H160, stakee: &H160, increase_amount: &u128, @@ -103,7 +100,7 @@ impl PostgresClient { block_number: &u64, tx_hash: &str, ) -> Result<(), Error> { - let chain = *chain as i16; + let chain_id: i32 = chain_id as i32; let staker = format!("{:#x}", staker); let stakee = format!("{:#x}", stakee); let increase_amount = Decimal::from_u128(*increase_amount).unwrap(); @@ -124,7 +121,7 @@ impl PostgresClient { " current_amount = stake.current_amount + EXCLUDED.current_amount", " WHERE EXCLUDED.last_updated_in_block >= stake.last_updated_in_block" ), - &[&chain, &staker, &stakee, &unlock_time, &lock_time, &block_number, &increase_amount] + &[&chain_id, &staker, &stakee, &unlock_time, &lock_time, &block_number, &increase_amount] ).await?; // Log raw event @@ -133,12 +130,12 @@ impl PostgresClient { "INSERT INTO stake_stakeevent (event_type, chain, staker, stakee, amount, unlock_time, block_number, tx_hash)", " VALUES ($1, $2, $3, $4, $5, $6, $7, $8)" ), - &[&get_code_for_stake_event_type(event_type), &chain, &staker, &stakee, &increase_amount, &unlock_time, &block_number, &tx_hash] + &[&get_code_for_stake_event_type(event_type), &chain_id, &staker, &stakee, &increase_amount, &unlock_time, &block_number, &tx_hash] ).await?; println!( "Added or extended stake in block {} on chain {}!", - block_number, chain + block_number, chain_id ); Ok(()) @@ -147,7 +144,7 @@ impl PostgresClient { pub async fn update_stake_amount( &self, event_type: &StakeEventType, - chain: &Chain, + chain_id: u32, staker: &H160, stakee: &H160, change_amount: &u128, @@ -155,7 +152,7 @@ impl PostgresClient { block_number: &u64, tx_hash: &str, ) -> Result<(), Error> { - let chain = *chain as i16; + let chain_id: i32 = chain_id as i32; let staker = format!("{:#x}", staker); let stakee = format!("{:#x}", stakee); let change_amount = Decimal::from_u128(*change_amount).unwrap(); @@ -176,7 +173,7 @@ impl PostgresClient { " WHERE chain = $2 AND staker = $3 AND stakee = $4", " AND last_updated_in_block <= $5" ), - &[&amount, &chain, &staker, &stakee, &block_number], + &[&amount, &chain_id, &staker, &stakee, &block_number], ) .await?; @@ -187,24 +184,25 @@ impl PostgresClient { "INSERT INTO stake_stakeevent (event_type, chain, staker, stakee, amount, block_number, tx_hash)", " VALUES ($1, $2, $3, $4, $5, $6, $7)" ), - &[&get_code_for_stake_event_type(event_type), &chain, &staker, &stakee, &amount, &block_number, &tx_hash], + &[&get_code_for_stake_event_type(event_type), &chain_id, &staker, &stakee, &amount, &block_number, &tx_hash], ) .await?; println!( "Modified stake amount in block {} on chain {}!", - block_number, chain + block_number, chain_id ); Ok(()) } - pub async fn get_latest_block(&self, chain: &Chain) -> Result { + pub async fn get_latest_block(&self, chain_id: u32, fallback_start_block: u64) -> Result { + let chain_id: i32 = chain_id as i32; let client = self.pool.get().await.unwrap(); let latest_block_rows = client .query( "SELECT last_updated_in_block FROM stake_stake WHERE chain = $1 ORDER BY last_updated_in_block DESC LIMIT 1;", - &[&(*chain as i16)], + &[&chain_id], ) .await?; @@ -214,7 +212,7 @@ impl PostgresClient { Ok(latest_block.to_u64().unwrap()) } else { // return contract start block - Ok(*self.start_block_map.get(chain).unwrap()) + Ok(fallback_start_block) } } diff --git a/indexer/src/staking_indexer.rs b/indexer/src/staking_indexer.rs index 2eee28800..026761a92 100644 --- a/indexer/src/staking_indexer.rs +++ b/indexer/src/staking_indexer.rs @@ -1,6 +1,6 @@ use crate::{ postgres::PostgresClient, - utils::{create_rpc_connection, Chain, StakeAmountOperation, StakeEventType}, + utils::{create_rpc_connection, StakeAmountOperation, StakeEventType}, }; use ethers::{ contract::{abigen, LogMeta}, @@ -19,31 +19,40 @@ abigen!(IdentityStaking, "./src/IdentityStaking.json",); pub struct StakingIndexer<'a> { postgres_client: PostgresClient, rpc_url: &'a String, - chain: Chain, + chain_id: u32, + start_block: u64, contract_address: &'a Address, } +async fn get_chain_id(rpc_url: &String) -> Result { + let client = create_rpc_connection(rpc_url).await; + let chain_id = client.get_chainid().await?; + Ok(chain_id.as_u32()) +} + impl<'a> StakingIndexer<'a> { - pub fn new( + pub async fn new( postgres_client: PostgresClient, rpc_url: &'a String, - chain: Chain, + start_block: u64, contract_address: &'a Address, - ) -> Self { - Self { + ) -> Result { + let chain_id = get_chain_id(rpc_url).await.unwrap(); + Ok(Self { postgres_client, rpc_url, - chain, + chain_id, + start_block, contract_address, - } + }) } pub async fn listen_with_timeout_reset(&self) -> Result<()> { loop { - let start_block = self.postgres_client.get_latest_block(&self.chain).await?; + let start_block = self.postgres_client.get_latest_block(self.chain_id, self.start_block).await?; println!( "Debug - Starting indexer for chain {} at block {}", - self.chain as u8, start_block + self.chain_id, start_block ); match try_join!( @@ -53,7 +62,7 @@ impl<'a> StakingIndexer<'a> { Ok(_) => { eprintln!( "Warning - indexer timeout join ended without error for chain {}", - self.chain as u8 + self.chain_id ); } Err(err) => { @@ -61,11 +70,11 @@ impl<'a> StakingIndexer<'a> { .to_string() .contains("No events logged in the last 15 minutes") { - eprintln!("Warning - resetting indexer due to no events logged in the last 15 minutes for chain {}", self.chain as u8); + eprintln!("Warning - resetting indexer due to no events logged in the last 15 minutes for chain {}", self.chain_id); } else { eprintln!( "Warning - indexer timeout join ended with error for chain {}, {:?}", - self.chain as u8, err + self.chain_id, err ); } } @@ -74,21 +83,21 @@ impl<'a> StakingIndexer<'a> { } async fn throw_when_no_events_logged(&self, starting_event_block: &u64) -> Result<()> { - let mut start_block = *starting_event_block; + let mut timer_begin_block = *starting_event_block; loop { // sleep for 15 minutes tokio::time::sleep(tokio::time::Duration::from_secs(900)).await; - let latest_logged_block = self.postgres_client.get_latest_block(&self.chain).await?; + let latest_logged_block = self.postgres_client.get_latest_block(self.chain_id, self.start_block).await?; - if latest_logged_block == start_block { + if latest_logged_block == timer_begin_block { return Err(eyre::eyre!( "No events logged in the last 15 minutes for chain {}", - self.chain as u8 + self.chain_id )); } - start_block = latest_logged_block; + timer_begin_block = latest_logged_block; } } @@ -107,7 +116,7 @@ impl<'a> StakingIndexer<'a> { } else { eprintln!( "Warning - Failed to fetch current block number for chain {}", - self.chain as u8 + self.chain_id ); } @@ -145,7 +154,7 @@ impl<'a> StakingIndexer<'a> { eprintln!( "Debug - Finished querying past events for chain {}", - self.chain as u8 + self.chain_id ); let future_events = id_staking_contract @@ -156,7 +165,7 @@ impl<'a> StakingIndexer<'a> { eprintln!( "Debug - Listening for future events for chain {}", - self.chain as u8 + self.chain_id ); while let Some(event_with_meta) = stream.next().await { @@ -164,7 +173,7 @@ impl<'a> StakingIndexer<'a> { Err(err) => { eprintln!( "Error - Failed to fetch IdentityStaking events for chain {}: {:?}", - self.chain as u8, err + self.chain_id, err ); break; } @@ -214,7 +223,7 @@ impl<'a> StakingIndexer<'a> { _ => { eprintln!( "Debug - Unhandled event in tx {} for chain {}", - tx_hash, self.chain as u8 + tx_hash, self.chain_id ); Ok(()) } @@ -250,7 +259,7 @@ impl<'a> StakingIndexer<'a> { .postgres_client .add_or_extend_stake( &StakeEventType::SelfStake, - &self.chain, + self.chain_id, &event.staker, &event.staker, &event.amount, @@ -263,7 +272,7 @@ impl<'a> StakingIndexer<'a> { { eprintln!( "Error - Failed to process self stake event for chain {}: {:?}", - self.chain as u8, err + self.chain_id, err ); } Ok(()) @@ -282,7 +291,7 @@ impl<'a> StakingIndexer<'a> { .postgres_client .add_or_extend_stake( &StakeEventType::CommunityStake, - &self.chain, + self.chain_id, &event.staker, &event.stakee, &event.amount, @@ -295,7 +304,7 @@ impl<'a> StakingIndexer<'a> { { eprintln!( "Error - Failed to process community stake event for chain {}: {:?}", - self.chain as u8, err + self.chain_id, err ); } Ok(()) @@ -311,7 +320,7 @@ impl<'a> StakingIndexer<'a> { .postgres_client .update_stake_amount( &StakeEventType::SelfStakeWithdraw, - &self.chain, + self.chain_id, &event.staker, &event.staker, &event.amount, @@ -323,7 +332,7 @@ impl<'a> StakingIndexer<'a> { { eprintln!( "Error - Failed to process self stake event for chain {}: {:?}", - self.chain as u8, err + self.chain_id, err ); } Ok(()) @@ -339,7 +348,7 @@ impl<'a> StakingIndexer<'a> { .postgres_client .update_stake_amount( &StakeEventType::CommunityStakeWithdraw, - &self.chain, + self.chain_id, &event.staker, &event.stakee, &event.amount, @@ -351,7 +360,7 @@ impl<'a> StakingIndexer<'a> { { eprintln!( "Error - Failed to process community stake event for chain {}: {:?}", - self.chain as u8, err + self.chain_id, err ); } Ok(()) @@ -367,7 +376,7 @@ impl<'a> StakingIndexer<'a> { .postgres_client .update_stake_amount( &StakeEventType::Slash, - &self.chain, + self.chain_id, &event.staker, &event.stakee, &event.amount, @@ -379,7 +388,7 @@ impl<'a> StakingIndexer<'a> { { eprintln!( "Error - Failed to process slash event for chain {}: {:?}", - self.chain as u8, err + self.chain_id, err ); } Ok(()) @@ -395,7 +404,7 @@ impl<'a> StakingIndexer<'a> { .postgres_client .update_stake_amount( &StakeEventType::Release, - &self.chain, + self.chain_id, &event.staker, &event.stakee, &event.amount, @@ -407,7 +416,7 @@ impl<'a> StakingIndexer<'a> { { eprintln!( "Error - Failed to process release event for chain {}: {:?}", - self.chain as u8, err + self.chain_id, err ); } Ok(()) diff --git a/indexer/src/utils.rs b/indexer/src/utils.rs index caaac19f3..8d2c23c7c 100644 --- a/indexer/src/utils.rs +++ b/indexer/src/utils.rs @@ -40,13 +40,6 @@ async fn connect_with_reconnects(rpc_url: &String) -> Option> { } } -#[repr(u8)] -#[derive(Copy, Clone, PartialEq, Debug, Eq, Hash)] -pub enum Chain { - Ethereum = 0, - Optimism = 1, -} - #[derive(Copy, Clone)] pub enum StakeAmountOperation { Add,