Skip to content

Commit

Permalink
add more fields for rewards, refactoring, add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Kirill-K-1 committed Sep 27, 2024
1 parent ebfb84c commit b08db43
Show file tree
Hide file tree
Showing 19 changed files with 865 additions and 49 deletions.
File renamed without changes.
494 changes: 494 additions & 0 deletions artifacts/RewardDistributionContract.json

Large diffs are not rendered by default.

File renamed without changes.
12 changes: 10 additions & 2 deletions event-indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,14 @@ edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[lib]
name = "event_indexer"
path = "src/lib.rs"

[[bin]]
name = "airdao-event-indexer"
path = "src/main.rs"

[dependencies]
# Local
shared = { path = "../shared/" }
Expand Down Expand Up @@ -66,5 +74,5 @@ clap = { workspace = true }
assert_matches = { workspace = true }

[features]
default = []
enable-integration-tests = []
default = ["enable-integration-tests"]
enable-integration-tests = ["shared/enable-integration-tests"]
37 changes: 17 additions & 20 deletions event-indexer/src/event_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,32 +12,28 @@ use futures_util::StreamExt;
use std::{collections::HashMap, sync::Arc};
use tokio::sync::mpsc;

use crate::config::AppConfig;

const ERR_UNHANDLED_EVENT: &str = "Unhandled event";
const HUMAN_SBT_MINT_EVENT_SIGNATURE: ethereum_types::H256 =
shared::event_signature!("SBTMint(address,uint256,uint256)");
const NON_EXP_SBT_MINT_EVENT_SIGNATURE: ethereum_types::H256 =
shared::event_signature!("SBTMint(address)");
const SBT_BURN_EVENT_SIGNATURE: ethereum_types::H256 = shared::event_signature!("SBTBurn(address)");
const REWARD_EVENT_SIGNATURE: ethereum_types::H256 =
shared::event_signature!("Reward(address,amount,timestamp)");
shared::event_signature!("Reward(address,address,uint256,uint256,string,string,string,string)");
const CLAIM_REWARD_EVENT_SIGNATURE: ethereum_types::H256 =
shared::event_signature!("ClaimReward(address,id,amount)");
shared::event_signature!("ClaimReward(address,uint256,uint256)");
const REVERT_REWARD_EVENT_SIGNATURE: ethereum_types::H256 =
shared::event_signature!("RevertReward(id,total)");
shared::event_signature!("RevertReward(uint256,uint256)");

pub struct EventListener<T: PubsubClient> {
provider: Arc<Provider<T>>,
block_number: u64,
sbt_name_by_addr: HashMap<Address, String>,
config: Arc<AppConfig>,
contracts: HashMap<String, Address>,
}

#[derive(Clone)]
struct EventLoopContext {
#[allow(unused)]
config: Arc<AppConfig>,
tx: mpsc::UnboundedSender<GovEventNotification>,
block: u64,
}
Expand Down Expand Up @@ -134,10 +130,17 @@ impl std::fmt::Display for SBTBurnEvent {
#[derive(Debug, Clone, EthEvent)]
#[ethevent(name = "Reward")]
pub struct RewardEvent {
#[ethevent(indexed)]
pub grantor: Address,
#[ethevent(indexed)]
pub wallet: Address,
pub amount: U256,
pub timestamp: U256,
#[ethevent(name = "eventName")]
pub name: String,
pub region: String,
pub community: String,
pub pseudo: String,
}

impl std::fmt::Display for RewardEvent {
Expand Down Expand Up @@ -180,7 +183,7 @@ impl std::fmt::Display for RevertRewardEvent {
pub enum GovEvent {
SBTMint(Address),
SBTBurn(Address),
Reward(Address, U256, U256),
Reward(RewardEvent),
ClaimReward(Address, u64),
RevertReward(u64),
}
Expand Down Expand Up @@ -234,8 +237,7 @@ impl EthEvent for GovEvent {

// Reward distribution events
Some(topic) if topic.eq(&REWARD_EVENT_SIGNATURE) => {
<RewardEvent as EthLogDecode>::decode_log(log)
.map(|event| Self::Reward(event.wallet, event.amount, event.timestamp))
<RewardEvent as EthLogDecode>::decode_log(log).map(Self::Reward)
}
Some(topic) if topic.eq(&CLAIM_REWARD_EVENT_SIGNATURE) => {
<ClaimRewardEvent as EthLogDecode>::decode_log(log)
Expand Down Expand Up @@ -265,19 +267,15 @@ impl EthEvent for GovEvent {

impl<T: PubsubClient> EventListener<T> {
pub fn new(
config: AppConfig,
contracts: HashMap<String, Address>,
provider: Arc<Provider<T>>,
block_number: u64,
) -> anyhow::Result<Self> {
Ok(Self {
provider,
block_number,
sbt_name_by_addr: config
.contracts
.iter()
.map(|(k, v)| (*v, k.clone()))
.collect(),
config: Arc::new(config),
sbt_name_by_addr: contracts.iter().map(|(k, v)| (*v, k.clone())).collect(),
contracts,
})
}

Expand All @@ -286,7 +284,6 @@ impl<T: PubsubClient> EventListener<T> {
tx: mpsc::UnboundedSender<GovEventNotification>,
) -> anyhow::Result<()> {
let mut context = EventLoopContext {
config: self.config.clone(),
tx,
block: self.block_number,
};
Expand All @@ -306,7 +303,7 @@ impl<T: PubsubClient> EventListener<T> {
};

let mut event: Event<Arc<Provider<T>>, Provider<T>, _> = EthEvent::new(
Filter::new().address(self.config.contracts.values().copied().collect::<Vec<_>>()),
Filter::new().address(self.contracts.values().copied().collect::<Vec<_>>()),
self.provider.clone(),
)
.from_block(context.block);
Expand Down
6 changes: 6 additions & 0 deletions event-indexer/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#[allow(unused)]
mod auto_refresh_token;
pub mod config;
pub mod event_listener;
#[allow(unused)]
mod gov_db_provider;
31 changes: 21 additions & 10 deletions event-indexer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<GovEventNotification>();

let listener = EventListener::new(config, provider, indexer_state_redis_cache.block_number)?;
let listener = EventListener::new(
config.contracts,
provider,
indexer_state_redis_cache.block_number,
)?;

tokio::spawn(async move {
let idle_interval = std::time::Duration::from_secs(10);
Expand Down Expand Up @@ -66,34 +70,41 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
block_number = event_notification.block_number
);

let result = match event_notification.event {
let result = match &event_notification.event {
GovEvent::SBTMint(wallet) => {
let sbt = SBTInfo {
name: event_notification.contract_name.clone(),
address: event_notification.contract_address,
issued_at_block: event_notification.block_number,
};

gov_db_provider.upsert_user_sbt(wallet, sbt).await
gov_db_provider.upsert_user_sbt(*wallet, sbt).await
}
GovEvent::SBTBurn(wallet) => {
gov_db_provider
.remove_user_sbt(wallet, event_notification.contract_address)
.remove_user_sbt(*wallet, event_notification.contract_address)
.await
}
GovEvent::Reward(wallet, amount, timestamp) => {
GovEvent::Reward(event) => {
gov_db_provider
.insert_reward(RewardInfo {
wallet,
id: event_notification.block_number,
amount,
timestamp: timestamp.as_u64(),
grantor: event.grantor,
wallet: event.wallet,
amount: event.amount,
timestamp: event.timestamp.as_u64(),
event_name: event.name.clone(),
region: event.region.clone(),
community: Some(event.community.clone()).filter(|v| !v.is_empty()),
pseudo: Some(event.pseudo.clone()).filter(|v| !v.is_empty()),
status: RewardStatus::Granted,
})
.await
}
GovEvent::ClaimReward(wallet, id) => gov_db_provider.claim_reward(wallet, id).await,
GovEvent::RevertReward(id) => gov_db_provider.revert_reward(id).await,
GovEvent::ClaimReward(wallet, id) => {
gov_db_provider.claim_reward(*wallet, *id).await
}
GovEvent::RevertReward(id) => gov_db_provider.revert_reward(*id).await,
};

if let Ok(axum::Json(())) = result {
Expand Down
Loading

0 comments on commit b08db43

Please sign in to comment.