diff --git a/api/stake/admin.py b/api/stake/admin.py index 33a4b5cf6..9d6e07770 100644 --- a/api/stake/admin.py +++ b/api/stake/admin.py @@ -1,6 +1,6 @@ from django.contrib import admin from scorer.scorer_admin import ScorerModelAdmin -from stake.models import Stake, StakeEvent +from stake.models import Stake, StakeEvent, ReindexRequest @admin.register(Stake) @@ -51,3 +51,20 @@ class StakeEventAdmin(ScorerModelAdmin): "tx_hash", ] search_help_text = "Search by: " + ", ".join(search_fields) + + +@admin.register(ReindexRequest) +class ReindexRequestAdmin(admin.ModelAdmin): + list_display = [ + "pending", + "chain", + "start_block_number", + "created_at", + ] + + list_filter = [ + "chain", + "pending", + ] + + readonly_fields = ("pending",) diff --git a/api/stake/migrations/0006_reindexrequest_alter_stakeevent_block_number_and_more.py b/api/stake/migrations/0006_reindexrequest_alter_stakeevent_block_number_and_more.py new file mode 100644 index 000000000..2563e1f20 --- /dev/null +++ b/api/stake/migrations/0006_reindexrequest_alter_stakeevent_block_number_and_more.py @@ -0,0 +1,56 @@ +# Generated by Django 4.2.6 on 2024-07-02 21:30 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("stake", "0005_alter_stake_current_amount_alter_stakeevent_amount"), + ] + + operations = [ + migrations.CreateModel( + name="ReindexRequest", + fields=[ + ( + "id", + models.BigAutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name="ID", + ), + ), + ("created_at", models.DateTimeField(auto_now_add=True)), + ( + "chain", + models.IntegerField( + db_index=True, + help_text="Decimal chain ID. Ethereum: 1, Optimism: 10, Arbitrum: 42161", + ), + ), + ( + "start_block_number", + models.DecimalField(decimal_places=0, max_digits=78), + ), + ("pending", models.BooleanField(db_index=True, default=True)), + ], + ), + migrations.AlterField( + model_name="stakeevent", + name="block_number", + field=models.DecimalField(db_index=True, decimal_places=0, max_digits=78), + ), + migrations.AlterUniqueTogether( + name="stakeevent", + unique_together={("tx_hash", "chain")}, + ), + migrations.AddConstraint( + model_name="reindexrequest", + constraint=models.UniqueConstraint( + condition=models.Q(("pending", True)), + fields=("chain",), + name="unique_only_one_pending_per_chain", + ), + ), + ] diff --git a/api/stake/models.py b/api/stake/models.py index 878fbb609..182222666 100644 --- a/api/stake/models.py +++ b/api/stake/models.py @@ -65,10 +65,40 @@ class StakeEventType(models.TextChoices): ) block_number = models.DecimalField( - decimal_places=0, null=False, blank=False, max_digits=78 + decimal_places=0, null=False, blank=False, max_digits=78, db_index=True ) tx_hash = models.CharField(max_length=66, null=False, blank=False) # Only applies to SelfStake and CommunityStake events unlock_time = models.DateTimeField(null=True, blank=True) + + class Meta: + unique_together = ["tx_hash", "chain"] + + +class ReindexRequest(models.Model): + created_at = models.DateTimeField(auto_now_add=True) + + chain = models.IntegerField( + null=False, + blank=False, + db_index=True, + help_text="Decimal chain ID. Ethereum: 1, Optimism: 10, Arbitrum: 42161", + ) + + start_block_number = models.DecimalField( + decimal_places=0, null=False, blank=False, max_digits=78 + ) + + pending = models.BooleanField(null=False, blank=False, default=True, db_index=True) + + class Meta: + # Only one reindex request can be pending at a time for a chain + constraints = [ + models.UniqueConstraint( + fields=["chain"], + name="unique_only_one_pending_per_chain", + condition=models.Q(pending=True), + ), + ] diff --git a/indexer/src/main.rs b/indexer/src/main.rs index ecc128ca7..1a0492b78 100644 --- a/indexer/src/main.rs +++ b/indexer/src/main.rs @@ -27,7 +27,7 @@ async fn main() -> Result<()> { let contract_address_op_mainnet = get_env("STAKING_CONTRACT_ADDRESS_OP_MAINNET") .parse::
() .unwrap(); - + let contract_address_op_sepolia = get_env("STAKING_CONTRACT_ADDRESS_OP_SEPOLIA") .parse::
() .unwrap(); diff --git a/indexer/src/postgres.rs b/indexer/src/postgres.rs index 40459a67e..a384e62bd 100644 --- a/indexer/src/postgres.rs +++ b/indexer/src/postgres.rs @@ -36,9 +36,7 @@ impl PostgresClient { let pool = Pool::builder(mgr).max_size(16).build().unwrap(); - Ok(Self { - pool, - }) + Ok(Self { pool }) } // This function is for legacy staking contract events @@ -56,7 +54,7 @@ impl PostgresClient { let client = self.pool.get().await.unwrap(); client.execute("INSERT INTO registry_gtcstakeevent (event_type, round_id, staker, amount, staked, block_number, tx_hash) VALUES ($1, $2, $3, $4, $5, $6, $7)",&[&"SelfStake", &round_id, &staker, &decimal_amount, &staked, &block_number, &tx_hash]).await?; println!( - "Row inserted into registry_gtcstakeevent with type SelfStake for block {}!", + "Row inserted into registry_gtcstakeevent with type SelfStake for block {} for legacy contract!", block_number ); Ok(()) @@ -78,7 +76,7 @@ impl PostgresClient { let client = self.pool.get().await.unwrap(); client.execute("INSERT INTO registry_gtcstakeevent (event_type, round_id, staker, address, amount, staked, block_number, tx_hash) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)", &[&"Xstake", &round_id, &staker, &user, &decimal_amount, &staked, &block_number, &tx_hash]).await?; println!( - "Row inserted into registry_gtcstakeevent with type Xstake for block {}!", + "Row inserted into registry_gtcstakeevent with type Xstake for block {} for legacy contract!", block_number ); Ok(()) @@ -111,28 +109,50 @@ impl PostgresClient { let client = self.pool.get().await.unwrap(); - // Log current stake state - client.execute( - concat!( - "INSERT INTO stake_stake as stake (chain, staker, stakee, unlock_time, lock_time, last_updated_in_block, current_amount)", - " VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (chain, staker, stakee) DO UPDATE", - " SET unlock_time = EXCLUDED.unlock_time,", - " lock_time = EXCLUDED.lock_time,", - " last_updated_in_block = EXCLUDED.last_updated_in_block,", - " current_amount = stake.current_amount + EXCLUDED.current_amount", - " WHERE EXCLUDED.last_updated_in_block >= stake.last_updated_in_block" - ), - &[&chain_id, &staker, &stakee, &unlock_time, &lock_time, &block_number, &increase_amount] - ).await?; - - // Log raw event - client.execute( - concat!( - "INSERT INTO stake_stakeevent (event_type, chain, staker, stakee, amount, unlock_time, block_number, tx_hash)", - " VALUES ($1, $2, $3, $4, $5, $6, $7, $8)" - ), - &[&get_code_for_stake_event_type(event_type), &chain_id, &staker, &stakee, &increase_amount, &unlock_time, &block_number, &tx_hash] - ).await?; + // begin transaction + client.execute("BEGIN", &[]).await?; + + let do_query = async { + // Log raw event + client.execute( + concat!( + "INSERT INTO stake_stakeevent (event_type, chain, staker, stakee, amount, unlock_time, block_number, tx_hash)", + " VALUES ($1, $2, $3, $4, $5, $6, $7, $8)" + ), + &[&get_code_for_stake_event_type(event_type), &chain_id, &staker, &stakee, &increase_amount, &unlock_time, &block_number, &tx_hash] + ).await?; + + // Log current stake state + client.execute( + concat!( + "INSERT INTO stake_stake as stake (chain, staker, stakee, unlock_time, lock_time, last_updated_in_block, current_amount)", + " VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (chain, staker, stakee) DO UPDATE", + " SET unlock_time = GREATEST(EXCLUDED.unlock_time, stake.unlock_time),", + " lock_time = GREATEST(EXCLUDED.lock_time, stake.lock_time),", + " last_updated_in_block = GREATEST(EXCLUDED.last_updated_in_block, stake.last_updated_in_block),", + " current_amount = stake.current_amount + EXCLUDED.current_amount", + ), + &[&chain_id, &staker, &stakee, &unlock_time, &lock_time, &block_number, &increase_amount] + ).await?; + + Ok::<(), Error>(()) + }; + + match do_query.await { + Ok(_) => { + // commit transaction + client.execute("COMMIT", &[]).await?; + } + Err(e) => { + // rollback transaction + client.execute("ROLLBACK", &[]).await?; + // continue if duplicate key error + if format!("{:?}", e).contains(&format!("Key (tx_hash, chain)=({}, {}) already exists.", tx_hash, chain_id)) { + return Ok(()); + } + return Err(e); + } + } println!( "Added or extended stake in block {} on chain {}!", @@ -166,55 +186,129 @@ impl PostgresClient { let client = self.pool.get().await.unwrap(); - // Log current stake state - client - .execute( - concat!( - "UPDATE stake_stake", - " SET current_amount = current_amount + $1", - " WHERE chain = $2 AND staker = $3 AND stakee = $4", - " AND last_updated_in_block <= $5" - ), - &[&amount, &chain_id, &staker, &stakee, &block_number], + // begin transaction + client.execute("BEGIN", &[]).await?; + + let do_query = async { + // Log raw event + client.execute( + concat!( + "INSERT INTO stake_stakeevent (event_type, chain, staker, stakee, amount, block_number, tx_hash)", + " VALUES ($1, $2, $3, $4, $5, $6, $7)" + ), + &[&get_code_for_stake_event_type(event_type), &chain_id, &staker, &stakee, &amount, &block_number, &tx_hash], + ) + .await?; + + // Log current stake state + client + .execute( + concat!( + "UPDATE stake_stake as stake", + " SET current_amount = current_amount + $1,", + " last_updated_in_block = GREATEST($5, stake.last_updated_in_block)", + " WHERE chain = $2 AND staker = $3 AND stakee = $4", + ), + &[&amount, &chain_id, &staker, &stakee, &block_number], + ) + .await?; + + Ok::<(), Error>(()) + }; + + match do_query.await { + Ok(_) => { + // commit transaction + client.execute("COMMIT", &[]).await?; + } + Err(e) => { + // rollback transaction + client.execute("ROLLBACK", &[]).await?; + // continue if duplicate key error + if format!("{:?}", e).contains(&format!("Key (tx_hash, chain)=({}, {}) already exists.", tx_hash, chain_id)) { + return Ok(()); + } + return Err(e); + } + } + + println!( + "Modified stake amount in block {} on chain {}!", + block_number, chain_id + ); + + Ok(()) + } + + pub async fn get_requested_start_block(&self, chain_id: u32) -> Result { + let chain_id: i32 = chain_id as i32; + let client = self.pool.get().await.unwrap(); + let start_block_rows = client + .query( + "SELECT start_block_number FROM stake_reindexrequest WHERE chain = $1 AND pending = true", + &[&chain_id], ) .await?; - // Log raw event + match start_block_rows.get(0) { + Some(row) => { + let start_block: Decimal = row.get("start_block_number"); + Ok(start_block.to_u64().unwrap()) + } + None => Ok(0), + } + } + + pub async fn acknowledge_requested_start_block(&self, chain_id: u32) -> Result<(), Error> { + let chain_id: i32 = chain_id as i32; + let client = self.pool.get().await.unwrap(); client .execute( - concat!( - "INSERT INTO stake_stakeevent (event_type, chain, staker, stakee, amount, block_number, tx_hash)", - " VALUES ($1, $2, $3, $4, $5, $6, $7)" - ), - &[&get_code_for_stake_event_type(event_type), &chain_id, &staker, &stakee, &amount, &block_number, &tx_hash], + "UPDATE stake_reindexrequest SET pending = false WHERE chain = $1 and pending = true", + &[&chain_id], ) .await?; + Ok(()) + } - println!( - "Modified stake amount in block {} on chain {}!", - block_number, chain_id - ); + pub async fn get_stake_event_count(&self, chain_id: u32) -> Result { + let chain_id: i32 = chain_id as i32; + let client = self.pool.get().await.unwrap(); + let count_rows = client + .query( + "SELECT COUNT(*) FROM stake_stakeevent WHERE chain = $1", + &[&chain_id], + ) + .await?; - Ok(()) + match count_rows.get(0) { + Some(row) => { + let count: i64 = row.get("count"); + Ok(count) + } + None => Ok(0), + } } - pub async fn get_latest_block(&self, chain_id: u32, fallback_start_block: u64) -> Result { + pub async fn get_latest_block( + &self, + chain_id: u32 + ) -> Result { let chain_id: i32 = chain_id as i32; let client = self.pool.get().await.unwrap(); let latest_block_rows = client .query( - "SELECT last_updated_in_block FROM stake_stake WHERE chain = $1 ORDER BY last_updated_in_block DESC LIMIT 1;", + "SELECT block_number FROM stake_stakeevent WHERE chain = $1 ORDER BY block_number DESC LIMIT 1;", &[&chain_id], ) .await?; - if let Some(row) = latest_block_rows.get(0) { - // Extract and return the block number - let latest_block: Decimal = row.get("last_updated_in_block"); - Ok(latest_block.to_u64().unwrap()) - } else { - // return contract start block - Ok(fallback_start_block) + match latest_block_rows.get(0) { + Some(row) => { + let latest_block: Decimal = row.get("block_number"); + Ok(latest_block.to_u64().unwrap()) + } + None => Ok(0), } } @@ -227,13 +321,13 @@ impl PostgresClient { ) .await?; - if let Some(row) = latest_block_rows.get(0) { - // Extract and return the block number - let latest_block: i32 = row.get("block_number"); - Ok(latest_block) - } else { - // return contract start block - Ok(LEGACY_CONTRACT_START_BLOCK) + match latest_block_rows.get(0) { + Some(row) => { + // Extract and return the block number + let latest_block: i32 = row.get("block_number"); + Ok(latest_block) + } + None => Ok(LEGACY_CONTRACT_START_BLOCK), } } } diff --git a/indexer/src/staking_indexer.rs b/indexer/src/staking_indexer.rs index 3b5d04ee6..04bda65b2 100644 --- a/indexer/src/staking_indexer.rs +++ b/indexer/src/staking_indexer.rs @@ -47,20 +47,41 @@ impl<'a> StakingIndexer<'a> { }) } + async fn get_query_start_block(&self) -> Result { + let requested_start_block = self + .postgres_client + .get_requested_start_block(self.chain_id) + .await?; + + if requested_start_block > 0 { + self.postgres_client + .acknowledge_requested_start_block(self.chain_id) + .await?; + return Ok(requested_start_block); + } else { + let latest_logged_block = self.postgres_client.get_latest_block(self.chain_id).await?; + + if latest_logged_block > 0 { + return Ok(latest_logged_block + 1); + } else { + return Ok(self.start_block); + } + } + } + pub async fn listen_with_timeout_reset(&self) -> Result<()> { loop { - let start_block = self - .postgres_client - .get_latest_block(self.chain_id, self.start_block) - .await?; + let query_start_block = self.get_query_start_block().await?; + println!( "Debug - Starting indexer for chain {} at block {}", - self.chain_id, start_block + self.chain_id, query_start_block ); match try_join!( - self.throw_when_no_events_logged(&start_block), - self.listen_for_stake_events(&start_block), + self.throw_when_no_events_logged(), + self.throw_when_reindex_requested(), + self.listen_for_stake_events(&query_start_block), ) { Ok(_) => { eprintln!( @@ -73,7 +94,12 @@ impl<'a> StakingIndexer<'a> { .to_string() .contains("No events logged in the last 15 minutes") { - eprintln!("Warning - resetting indexer due to no events logged in the last 15 minutes for chain {}", self.chain_id); + eprintln!("Debug - resetting indexer due to no events logged in the last 15 minutes for chain {}", self.chain_id); + } else if err.to_string().contains("Reindex requested for chain") { + eprintln!( + "Debug - resetting indexer due to reindex requested for chain {}", + self.chain_id + ); } else { eprintln!( "Warning - indexer timeout join ended with error for chain {}, {:?}", @@ -85,25 +111,49 @@ impl<'a> StakingIndexer<'a> { } } - async fn throw_when_no_events_logged(&self, starting_event_block: &u64) -> Result<()> { - let mut timer_begin_block = *starting_event_block; + async fn throw_when_no_events_logged(&self) -> Result<()> { + let mut timer_begin_event_count = self + .postgres_client + .get_stake_event_count(self.chain_id) + .await?; loop { // sleep for 15 minutes tokio::time::sleep(tokio::time::Duration::from_secs(900)).await; - let latest_logged_block = self + let event_count = self .postgres_client - .get_latest_block(self.chain_id, self.start_block) + .get_stake_event_count(self.chain_id) .await?; - if latest_logged_block == timer_begin_block { + if event_count <= timer_begin_event_count { return Err(eyre::eyre!( - "No events logged in the last 15 minutes for chain {}", - self.chain_id + "No events logged in the last 15 minutes for chain {}, event count is {}", + self.chain_id, + event_count )); } - timer_begin_block = latest_logged_block; + timer_begin_event_count = event_count; + } + } + + async fn throw_when_reindex_requested(&self) -> Result<()> { + loop { + // sleep for 1 minute + tokio::time::sleep(tokio::time::Duration::from_secs(60)).await; + + let requested_start_block = self + .postgres_client + .get_requested_start_block(self.chain_id) + .await?; + + if requested_start_block > 0 { + return Err(eyre::eyre!( + "Reindex requested for chain {} at block {}", + self.chain_id, + requested_start_block + )); + } } } @@ -115,7 +165,7 @@ impl<'a> StakingIndexer<'a> { Ok(block_number.as_u64()) } - async fn listen_for_stake_events(&self, query_start_block: &u64) -> Result<()> { + async fn listen_for_stake_events(&self, initial_query_start_block: &u64) -> Result<()> { let mut current_block: u64 = 2; if let Ok(block_number) = self.get_current_block().await { current_block = block_number; @@ -130,15 +180,19 @@ impl<'a> StakingIndexer<'a> { let id_staking_contract = IdentityStaking::new(*self.contract_address, client.clone()); - let mut last_queried_block: u64 = *query_start_block; + let mut query_start_block: u64 = *initial_query_start_block; // You can make eth_getLogs requests with up to a 2K block range and no limit on the response size // Reducing to 1k because of occasional timeout issues - while last_queried_block < current_block - 1 { - let query_end_block = min(last_queried_block + 1000, current_block - 1); + while query_start_block < current_block - 1 { + let query_end_block = min(query_start_block + 999, current_block - 1); + eprintln!( + "Debug - Querying past events for chain {} from block {} to block {}", + self.chain_id, query_start_block, query_end_block + ); let previous_events_query = id_staking_contract .events() - .from_block(last_queried_block + 1) + .from_block(query_start_block) .to_block(query_end_block) .query_with_meta() .await; @@ -148,12 +202,15 @@ impl<'a> StakingIndexer<'a> { for (event, meta) in previous_events.iter() { self.process_staking_event(&event, &meta, &client).await?; } - last_queried_block = query_end_block; + query_start_block = query_end_block + 1; } Err(err) => { return Err(eyre::eyre!( - "Error - Failed to query events: {}, {}, {:?}", - last_queried_block, query_end_block, err + "Error - Failed to query events for chain {}: {}, {}, {:?}", + self.chain_id, + query_start_block, + query_end_block, + err )); } } @@ -164,17 +221,17 @@ impl<'a> StakingIndexer<'a> { self.chain_id ); - let future_events = id_staking_contract - .events() - .from_block(max(last_queried_block + 1, current_block)); - - let mut stream = future_events.stream().await?.with_meta(); + let from_block = max(query_start_block, current_block); eprintln!( - "Debug - Listening for future events for chain {}", - self.chain_id + "Debug - Listening for future events for chain {} from block {}", + self.chain_id, from_block ); + let future_events = id_staking_contract.events().from_block(from_block); + + let mut stream = future_events.stream().await?.with_meta(); + while let Some(event_with_meta) = stream.next().await { let (event, meta) = match event_with_meta { Err(err) => {