Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Total utxo staked ada amount #325

Merged
merged 29 commits into from
Mar 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .config/dictionaries/project.dic
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Arbritrary
asyncio
asyncpg
auditability
bech
bkioshn
bluefireteam
BROTLI
Expand Down
14 changes: 0 additions & 14 deletions catalyst-gateway/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,14 @@ repository = "https://github.com/input-output-hk/catalyst-voices"
license = "MIT OR Apache-2.0"

[workspace.dependencies]

clap = "4"

tracing = "0.1.37"
tracing-subscriber = "0.3.16"

serde = "1.0"
serde_json = "1.0"

poem = "2.0.0"
poem-openapi = "4.0.0"
poem-extensions = "0.8.0"

prometheus = "0.13.0"
cryptoxide = "0.4.4"
uuid = "1"
Expand All @@ -37,25 +32,16 @@ panic-message = "0.3"
cpu-time = "1.0"
ulid = "1.0.1"
rust-embed = "8"

url = "2.4.1"

thiserror = "1.0"

chrono = "0.4"

async-trait = "0.1.64"

rust_decimal = "1.29"

bb8 = "0.8.1"
bb8-postgres = "0.8.1"
tokio-postgres = "0.7.10"

tokio = "1"

dotenvy = "0.15"

local-ip-address = "0.5.7"
gethostname = "0.4.3"

Expand Down
19 changes: 7 additions & 12 deletions catalyst-gateway/bin/src/event_db/follower.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
//! Follower Queries

use cardano_chain_follower::Network;
use chrono::TimeZone;

use crate::event_db::{Error, EventDB};

/// Block time
pub type BlockTime = i64;
pub type BlockTime = chrono::DateTime<chrono::offset::Utc>;
/// Slot
pub type SlotNumber = i64;
/// Epoch
Expand All @@ -15,8 +14,6 @@ pub type EpochNumber = i64;
pub type BlockHash = String;
/// Unique follower id
pub type MachineId = String;
/// Time when a follower last indexed
pub type LastUpdate = chrono::DateTime<chrono::offset::Utc>;

impl EventDB {
/// Index follower block stream
Expand All @@ -26,8 +23,6 @@ impl EventDB {
) -> Result<(), Error> {
let conn = self.pool.get().await?;

let timestamp: chrono::DateTime<chrono::Utc> = chrono::Utc.timestamp_nanos(block_time);

let network = match network {
Network::Mainnet => "mainnet".to_string(),
Network::Preview => "preview".to_string(),
Expand All @@ -42,7 +37,7 @@ impl EventDB {
&slot_no,
&network,
&epoch_no,
&timestamp,
&block_time,
&hex::decode(block_hash).map_err(|e| Error::DecodeHex(e.to_string()))?,
],
)
Expand All @@ -55,7 +50,7 @@ impl EventDB {
/// Start follower from where previous follower left off.
pub(crate) async fn last_updated_metadata(
&self, network: String,
) -> Result<(SlotNumber, BlockHash, LastUpdate), Error> {
) -> Result<(SlotNumber, BlockHash, BlockTime), Error> {
let conn = self.pool.get().await?;

let rows = conn
Expand All @@ -72,16 +67,16 @@ impl EventDB {
return Err(Error::NoLastUpdateMetadata("No metadata".to_string()));
};

let slot_no: SlotNumber = match row.try_get("slot_no") {
let slot_no = match row.try_get("slot_no") {
Ok(slot) => slot,
Err(e) => return Err(Error::NoLastUpdateMetadata(e.to_string())),
};

let block_hash: BlockHash = match row.try_get::<_, Vec<u8>>("block_hash") {
let block_hash = match row.try_get::<_, Vec<u8>>("block_hash") {
Ok(block_hash) => hex::encode(block_hash),
Err(e) => return Err(Error::NoLastUpdateMetadata(e.to_string())),
};
let last_updated: LastUpdate = match row.try_get("ended") {
let last_updated = match row.try_get("ended") {
Ok(last_updated) => last_updated,
Err(e) => return Err(Error::NoLastUpdateMetadata(e.to_string())),
};
Expand All @@ -92,7 +87,7 @@ impl EventDB {
/// Mark point in time where the last follower finished indexing in order for future
/// followers to pick up from this point
pub(crate) async fn refresh_last_updated(
&self, last_updated: LastUpdate, slot_no: SlotNumber, block_hash: BlockHash,
&self, last_updated: BlockTime, slot_no: SlotNumber, block_hash: BlockHash,
network: Network, machine_id: &MachineId,
) -> Result<(), Error> {
let conn = self.pool.get().await?;
Expand Down
52 changes: 45 additions & 7 deletions catalyst-gateway/bin/src/event_db/utxo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
use cardano_chain_follower::Network;
use pallas::ledger::traverse::MultiEraTx;

use super::follower::SlotNumber;
use super::{
follower::{BlockTime, SlotNumber},
voter_registration::StakeCredential,
};
use crate::{
event_db::{
Error::{self, SqlTypeConversionFailure},
Expand All @@ -15,9 +18,12 @@ use crate::{
},
};

/// Stake amount.
pub(crate) type StakeAmount = i64;

impl EventDB {
/// Index utxo data
pub async fn index_utxo_data(
pub(crate) async fn index_utxo_data(
&self, txs: Vec<MultiEraTx<'_>>, slot_no: SlotNumber, network: Network,
) -> Result<(), Error> {
let conn = self.pool.get().await?;
Expand Down Expand Up @@ -57,9 +63,7 @@ impl EventDB {

let _rows = conn
.query(
include_str!(
"../../../event-db/queries/follower/utxo_index_utxo_query.sql"
),
include_str!("../../../event-db/queries/utxo/insert_utxo.sql"),
&[
&i32::try_from(index).map_err(|e| {
Error::NotFound(
Expand Down Expand Up @@ -90,7 +94,7 @@ impl EventDB {
}

/// Index txn metadata
pub async fn index_txn_data(
pub(crate) async fn index_txn_data(
&self, tx_id: &[u8], slot_no: SlotNumber, network: Network,
) -> Result<(), Error> {
let conn = self.pool.get().await?;
Expand All @@ -104,11 +108,45 @@ impl EventDB {

let _rows = conn
.query(
include_str!("../../../event-db/queries/follower/utxo_txn_index.sql"),
include_str!("../../../event-db/queries/utxo/insert_txn_index.sql"),
&[&tx_id, &slot_no, &network],
)
.await?;

Ok(())
}

/// Get total utxo amount
#[allow(dead_code)]
pub(crate) async fn total_utxo_amount(
&self, stake_credential: StakeCredential<'_>, network: Network, date_time: BlockTime,
) -> Result<(StakeAmount, SlotNumber, BlockTime), Error> {
let conn = self.pool.get().await?;

let network = match network {
Network::Mainnet => "mainnet".to_string(),
Network::Preview => "preview".to_string(),
Network::Preprod => "preprod".to_string(),
Network::Testnet => "testnet".to_string(),
};

let row = conn
.query_one(
include_str!("../../../event-db/queries/utxo/select_total_utxo_amount.sql"),
&[&stake_credential, &network, &date_time],
)
.await?;

// Aggregate functions as SUM and MAX return NULL if there are no rows, so we need to
// check for it.
// https://www.postgresql.org/docs/8.2/functions-aggregate.html
if let Some(amount) = row.try_get("total_utxo_amount")? {
let slot_number = row.try_get("slot_no")?;
let block_time = row.try_get("block_time")?;

Ok((amount, slot_number, block_time))
} else {
Err(Error::NotFound("Cannot find total utxo amount".to_string()))
}
}
}
27 changes: 14 additions & 13 deletions catalyst-gateway/bin/src/event_db/voter_registration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,28 @@ use super::{Error, EventDB};
/// Transaction id
pub(crate) type TxId = String;
/// Stake credential
pub(crate) type StakeCredential = Box<[u8]>;
pub(crate) type StakeCredential<'a> = &'a [u8];
/// Public voting key
pub(crate) type PublicVotingKey = Box<[u8]>;
pub(crate) type PublicVotingKey<'a> = &'a [u8];
/// Payment address
pub(crate) type PaymentAddress = Box<[u8]>;
pub(crate) type PaymentAddress<'a> = &'a [u8];
/// Nonce
pub(crate) type Nonce = i64;
/// Metadata 61284
pub(crate) type Metadata61284 = Box<[u8]>;
pub(crate) type Metadata61284<'a> = &'a [u8];
/// Metadata 61285
pub(crate) type Metadata61285 = Box<[u8]>;
pub(crate) type Metadata61285<'a> = &'a [u8];
/// Stats
pub(crate) type Stats = Option<serde_json::Value>;

impl EventDB {
/// Inserts voter registration data, replacing any existing data.
#[allow(dead_code, clippy::too_many_arguments)]
async fn insert_voter_registration(
&self, tx_id: TxId, stake_credential: StakeCredential, public_voting_key: PublicVotingKey,
payment_address: PaymentAddress, nonce: Nonce, metadata_61284: Metadata61284,
metadata_61285: Metadata61285, valid: bool, stats: Stats,
&self, tx_id: TxId, stake_credential: StakeCredential<'_>,
public_voting_key: PublicVotingKey<'_>, payment_address: PaymentAddress<'_>, nonce: Nonce,
metadata_61284: Metadata61284<'_>, metadata_61285: Metadata61285<'_>, valid: bool,
stats: Stats,
) -> Result<(), Error> {
let conn = self.pool.get().await?;

Expand All @@ -36,12 +37,12 @@ impl EventDB {
),
&[
&hex::decode(tx_id).map_err(|e| Error::DecodeHex(e.to_string()))?,
&stake_credential.as_ref(),
&public_voting_key.as_ref(),
&payment_address.as_ref(),
&stake_credential,
&public_voting_key,
&payment_address,
&nonce,
&metadata_61284.as_ref(),
&metadata_61285.as_ref(),
&metadata_61284,
&metadata_61285,
&valid,
&stats,
],
Expand Down
7 changes: 4 additions & 3 deletions catalyst-gateway/bin/src/follower.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ use async_recursion::async_recursion;
use cardano_chain_follower::{
network_genesis_values, ChainUpdate, Follower, FollowerConfigBuilder, Network, Point,
};
use chrono::TimeZone;
use tokio::{task::JoinHandle, time};
use tracing::{error, info};

use crate::{
event_db::{
config::{FollowerMeta, NetworkMeta},
follower::{BlockHash, LastUpdate, MachineId, SlotNumber},
follower::{BlockHash, BlockTime, MachineId, SlotNumber},
EventDB,
},
util::valid_era,
Expand Down Expand Up @@ -150,7 +151,7 @@ async fn spawn_followers(
/// it left off. If there was no previous follower, start indexing from genesis point.
async fn find_last_update_point(
db: Arc<EventDB>, network: &String,
) -> Result<(Option<SlotNumber>, Option<BlockHash>, Option<LastUpdate>), Box<dyn Error>> {
) -> Result<(Option<SlotNumber>, Option<BlockHash>, Option<BlockTime>), Box<dyn Error>> {
let (slot_no, block_hash, last_updated) =
match db.last_updated_metadata(network.to_string()).await {
Ok((slot_no, block_hash, last_updated)) => {
Expand Down Expand Up @@ -214,7 +215,7 @@ async fn init_follower(
};

let wallclock = match block.wallclock(&genesis_values).try_into() {
Ok(time) => time,
Ok(time) => chrono::Utc.timestamp_nanos(time),
Err(err) => {
error!("Cannot parse wall time from block {:?} - skip..", err);
continue;
Expand Down
5 changes: 4 additions & 1 deletion catalyst-gateway/bin/src/service/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ use local_ip_address::list_afinet_netifas;
use poem_openapi::{ContactObject, LicenseObject, OpenApiService, ServerObject};
use test_endpoints::TestApi;

use self::utxo::UTXOApi;
use crate::settings::{DocsSettings, API_URL_PREFIX};

mod health;
mod legacy;
mod test_endpoints;
mod utxo;

/// The name of the API
const API_TITLE: &str = "Catalyst Gateway";
Expand Down Expand Up @@ -58,11 +60,12 @@ const TERMS_OF_SERVICE: &str =
/// Create the `OpenAPI` definition
pub(crate) fn mk_api(
hosts: Vec<String>, settings: &DocsSettings,
) -> OpenApiService<(TestApi, HealthApi, LegacyApi), ()> {
) -> OpenApiService<(TestApi, HealthApi, UTXOApi, LegacyApi), ()> {
let mut service = OpenApiService::new(
(
TestApi,
HealthApi,
UTXOApi,
(legacy::RegistrationApi, legacy::V0Api, legacy::V1Api),
),
API_TITLE,
Expand Down
Loading
Loading