Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement stake snapshot local state query #394

Merged
merged 7 commits into from
Feb 6, 2024
8 changes: 8 additions & 0 deletions examples/n2c-miniprotocols/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,14 @@ async fn do_localstate_query(client: &mut NodeClient) {
let result = queries_v16::get_current_pparams(client, era).await.unwrap();
println!("result: {:?}", result);

// Stake pool ID/verification key hash (either Bech32-decoded or hex-decoded).
// Empty Vec means all pools.
let pools = vec![];
let result = queries_v16::get_stake_snapshots(client, era, pools)
.await
.unwrap();
println!("result: {:?}", result);

client.send_release().await.unwrap();
}

Expand Down
12 changes: 9 additions & 3 deletions pallas-network/src/miniprotocols/localstate/queries_v16/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,16 @@ impl Encode<()> for BlockQuery {
e.u16(19)?;
e.encode(x)?;
}
BlockQuery::GetStakeSnapshots(x) => {
BlockQuery::GetStakeSnapshots(pools) => {
e.array(2)?;
e.u16(20)?;
e.encode(x)?;

if !pools.is_empty() {
e.array(Vec::len(pools) as u64)?;
e.tag(Tag::Unassigned(258))?;
}

e.encode(pools)?;
}
BlockQuery::GetPoolDistr(x) => {
e.array(2)?;
Expand Down Expand Up @@ -143,7 +149,7 @@ impl<'b> Decode<'b, ()> for BlockQuery {
// 17 => Ok(Self::GetStakePoolParams(())),
18 => Ok(Self::GetRewardInfoPools),
// 19 => Ok(Self::GetPoolState(())),
// 20 => Ok(Self::GetStakeSnapshots(())),
20 => Ok(Self::GetStakeSnapshots(d.decode()?)),
// 21 => Ok(Self::GetPoolDistr(())),
// 22 => Ok(Self::GetStakeDelegDeposits(())),
// 23 => Ok(Self::GetConstitutionHash),
Expand Down
61 changes: 60 additions & 1 deletion pallas-network/src/miniprotocols/localstate/queries_v16/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub enum BlockQuery {
GetStakePoolParams(AnyCbor),
GetRewardInfoPools,
GetPoolState(AnyCbor),
GetStakeSnapshots(AnyCbor),
GetStakeSnapshots(Pools),
GetPoolDistr(AnyCbor),
GetStakeDelegDeposits(AnyCbor),
GetConstitutionHash,
Expand Down Expand Up @@ -210,6 +210,8 @@ pub type Addr = Bytes;

pub type Addrs = Vec<Addr>;

pub type Pools = Vec<Option<Bytes>>;

pub type Coin = AnyUInt;

pub type PolicyId = Hash<28>;
Expand Down Expand Up @@ -270,13 +272,48 @@ pub struct UTxO {
pub index: AnyUInt,
}

#[derive(Debug, Encode, Decode, PartialEq)]
pub struct StakeSnapshot {
#[n(0)]
pub snapshots: Snapshots,
}

#[derive(Debug, Encode, Decode, PartialEq, Clone)]
pub struct Snapshots {
#[n(0)]
pub stake_snapshots: KeyValuePairs<Bytes, Stakes>,

#[n(1)]
pub snapshot_stake_mark_total: u64,

#[n(2)]
pub snapshot_stake_set_total: u64,

#[n(3)]
pub snapshot_stake_go_total: u64,
}

#[derive(Debug, Encode, Decode, PartialEq, Clone)]
pub struct Stakes {
#[n(0)]
pub snapshot_mark_pool: u64,

#[n(1)]
pub snapshot_set_pool: u64,

#[n(2)]
pub snapshot_go_pool: u64,
}

/// Get the current tip of the ledger.
pub async fn get_chain_point(client: &mut Client) -> Result<Point, ClientError> {
let query = Request::GetChainPoint;
let result = client.query(query).await?;

Ok(result)
}

/// Get the current era.
pub async fn get_current_era(client: &mut Client) -> Result<Era, ClientError> {
let query = HardForkQuery::GetCurrentEra;
let query = LedgerQuery::HardForkQuery(query);
Expand All @@ -286,13 +323,15 @@ pub async fn get_current_era(client: &mut Client) -> Result<Era, ClientError> {
Ok(result)
}

/// Get the system start time.
pub async fn get_system_start(client: &mut Client) -> Result<SystemStart, ClientError> {
let query = Request::GetSystemStart;
let result = client.query(query).await?;

Ok(result)
}

/// Get the current protocol parameters.
pub async fn get_current_pparams(
client: &mut Client,
era: u16,
Expand All @@ -305,6 +344,7 @@ pub async fn get_current_pparams(
Ok(result)
}

/// Get the block number for the current tip.
pub async fn get_block_epoch_number(client: &mut Client, era: u16) -> Result<u32, ClientError> {
let query = BlockQuery::GetEpochNo;
let query = LedgerQuery::BlockQuery(era, query);
Expand All @@ -314,6 +354,7 @@ pub async fn get_block_epoch_number(client: &mut Client, era: u16) -> Result<u32
Ok(result)
}

/// Get the current stake distribution for the given era.
pub async fn get_stake_distribution(
client: &mut Client,
era: u16,
Expand All @@ -326,6 +367,7 @@ pub async fn get_stake_distribution(
Ok(result)
}

/// Get the UTxO set for the given era.
pub async fn get_utxo_by_address(
client: &mut Client,
era: u16,
Expand All @@ -338,3 +380,20 @@ pub async fn get_utxo_by_address(

Ok(result)
}

/// Get stake snapshots for the given era and stake pools.
/// If `pools` are empty, all pools are queried.
/// Otherwise, only the specified pool is queried.
/// Note: This Query is limited by 1 pool per request.
pub async fn get_stake_snapshots(
client: &mut Client,
era: u16,
pools: Vec<Option<Bytes>>,
) -> Result<StakeSnapshot, ClientError> {
let query = BlockQuery::GetStakeSnapshots(pools);
let query = LedgerQuery::BlockQuery(era, query);
let query = Request::LedgerQuery(query);
let result = client.query(query).await?;

Ok(result)
}
90 changes: 88 additions & 2 deletions pallas-network/tests/protocols.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ use std::fs;
use std::net::{Ipv4Addr, SocketAddrV4};
use std::time::Duration;

use pallas_codec::utils::{AnyCbor, AnyUInt, KeyValuePairs, TagWrap};
use pallas_codec::utils::{AnyCbor, AnyUInt, Bytes, KeyValuePairs, TagWrap};
use pallas_crypto::hash::Hash;
use pallas_network::facades::{NodeClient, PeerClient, PeerServer};
use pallas_network::miniprotocols::blockfetch::BlockRequest;
use pallas_network::miniprotocols::chainsync::{ClientRequest, HeaderContent, Tip};
use pallas_network::miniprotocols::handshake::n2n::VersionData;
use pallas_network::miniprotocols::localstate::queries_v16::{Addr, Addrs, UnitInterval, Value};
use pallas_network::miniprotocols::localstate::queries_v16::{
Addr, Addrs, Snapshots, Stakes, UnitInterval, Value,
};
use pallas_network::miniprotocols::localstate::ClientQueryRequest;
use pallas_network::miniprotocols::txsubmission::{EraTxBody, TxIdAndSize};
use pallas_network::miniprotocols::{
Expand Down Expand Up @@ -658,6 +660,50 @@ pub async fn local_state_query_server_and_client_happy_path() {

server.statequery().send_result(result).await.unwrap();

// server receives query from client

let query: localstate::queries_v16::Request =
match server.statequery().recv_while_acquired().await.unwrap() {
ClientQueryRequest::Query(q) => q.into_decode().unwrap(),
x => panic!("unexpected message from client: {x:?}"),
};

assert_eq!(
query,
localstate::queries_v16::Request::LedgerQuery(
localstate::queries_v16::LedgerQuery::BlockQuery(
5,
localstate::queries_v16::BlockQuery::GetStakeSnapshots(vec![]),
),
)
);

assert_eq!(*server.statequery().state(), localstate::State::Querying);

let pool_id: Bytes =
hex::decode("fdb5834ba06eb4baafd50550d2dc9b3742d2c52cc5ee65bf8673823b")
.unwrap()
.into();

let stake_snapshots = KeyValuePairs::from(vec![(
pool_id,
Stakes {
snapshot_mark_pool: 0,
snapshot_set_pool: 0,
snapshot_go_pool: 0,
},
)]);

let snapshots = Snapshots {
stake_snapshots,
snapshot_stake_mark_total: 0,
snapshot_stake_set_total: 0,
snapshot_stake_go_total: 0,
};

let result = AnyCbor::from_encode(localstate::queries_v16::StakeSnapshot { snapshots });
server.statequery().send_result(result).await.unwrap();

assert_eq!(*server.statequery().state(), localstate::State::Acquired);

// server receives re-acquire from the client
Expand Down Expand Up @@ -874,6 +920,46 @@ pub async fn local_state_query_server_and_client_happy_path() {
}]
);

let request = AnyCbor::from_encode(localstate::queries_v16::Request::LedgerQuery(
localstate::queries_v16::LedgerQuery::BlockQuery(
5,
localstate::queries_v16::BlockQuery::GetStakeSnapshots(vec![]),
),
));

client.statequery().send_query(request).await.unwrap();

let result: localstate::queries_v16::StakeSnapshot = client
.statequery()
.recv_while_querying()
.await
.unwrap()
.into_decode()
.unwrap();

let pool_id: Bytes =
hex::decode("fdb5834ba06eb4baafd50550d2dc9b3742d2c52cc5ee65bf8673823b")
.unwrap()
.into();

let stake_snapshots = KeyValuePairs::from(vec![(
pool_id,
Stakes {
snapshot_mark_pool: 0,
snapshot_set_pool: 0,
snapshot_go_pool: 0,
},
)]);

let snapshots = Snapshots {
stake_snapshots,
snapshot_stake_mark_total: 0,
snapshot_stake_set_total: 0,
snapshot_stake_go_total: 0,
};

assert_eq!(result, localstate::queries_v16::StakeSnapshot { snapshots });

// client sends a ReAquire
client
.statequery()
Expand Down