Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Add rpc support for partitioned rewards #34773

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 123 additions & 34 deletions rpc/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use {
base64::{prelude::BASE64_STANDARD, Engine},
bincode::{config::Options, serialize},
crossbeam_channel::{unbounded, Receiver, Sender},
jsonrpc_core::{futures::future, types::error, BoxFuture, Error, Metadata, Result},
jsonrpc_core::{futures::future, types::error, BoxFuture, Error, ErrorCode, Metadata, Result},
jsonrpc_derive::rpc,
solana_account_decoder::{
parse_token::{is_known_spl_token_id, token_amount_to_ui_amount, UiTokenAmount},
Expand Down Expand Up @@ -62,6 +62,10 @@ use {
clock::{Slot, UnixTimestamp, MAX_RECENT_BLOCKHASHES},
commitment_config::{CommitmentConfig, CommitmentLevel},
epoch_info::EpochInfo,
epoch_rewards_hasher::EpochRewardsHasher,
epoch_rewards_partition_data::{
get_epoch_rewards_partition_data_address, EpochRewardsPartitionDataVersion,
},
epoch_schedule::EpochSchedule,
exit::Exit,
feature_set,
Expand Down Expand Up @@ -519,6 +523,38 @@ impl JsonRpcRequestProcessor {
})
}

async fn get_reward_map<F>(
&self,
slot: Slot,
addresses: &[String],
reward_type_filter: &F,
config: &RpcEpochConfig,
) -> Result<HashMap<String, (Reward, Slot)>>
where
F: Fn(RewardType) -> bool,
{
let Ok(Some(block)) = self
.get_block(
slot,
Some(RpcBlockConfig::rewards_with_commitment(config.commitment).into()),
)
.await
else {
return Err(RpcCustomError::BlockNotAvailable { slot }.into());
};

Ok(block
.rewards
.unwrap_or_default()
.into_iter()
.filter(|reward| {
reward.reward_type.is_some_and(reward_type_filter)
&& addresses.contains(&reward.pubkey)
})
.map(|reward| (reward.clone().pubkey, (reward, slot)))
CriesofCarrots marked this conversation as resolved.
Show resolved Hide resolved
.collect())
}

pub async fn get_inflation_reward(
&self,
addresses: Vec<Pubkey>,
Expand All @@ -527,18 +563,20 @@ impl JsonRpcRequestProcessor {
let config = config.unwrap_or_default();
let epoch_schedule = self.get_epoch_schedule();
let first_available_block = self.get_first_available_block().await;
let slot_context = RpcContextConfig {
commitment: config.commitment,
min_context_slot: config.min_context_slot,
};
let epoch = match config.epoch {
Some(epoch) => epoch,
None => epoch_schedule
.get_epoch(self.get_slot(RpcContextConfig {
commitment: config.commitment,
min_context_slot: config.min_context_slot,
})?)
.get_epoch(self.get_slot(slot_context)?)
.saturating_sub(1),
};

// Rewards for this epoch are found in the first confirmed block of the next epoch
let first_slot_in_epoch = epoch_schedule.get_first_slot_in_epoch(epoch.saturating_add(1));
let rewards_epoch = epoch.saturating_add(1);
let first_slot_in_epoch = epoch_schedule.get_first_slot_in_epoch(rewards_epoch);

if first_slot_in_epoch < first_available_block {
if self.bigtable_ledger_storage.is_some() {
return Err(RpcCustomError::LongTermStorageSlotSkipped {
Expand All @@ -554,51 +592,103 @@ impl JsonRpcRequestProcessor {
}
}

let bank = self.get_bank_with_config(slot_context)?;

let first_confirmed_block_in_epoch = *self
.get_blocks_with_limit(first_slot_in_epoch, 1, config.commitment)
.await?
.first()
.ok_or(RpcCustomError::BlockNotAvailable {
slot: first_slot_in_epoch,
})?;
let partitioned_epoch_reward_enabled_slot = bank
.feature_set
.activated_slot(&feature_set::enable_partitioned_epoch_reward::id());
let partitioned_epoch_reward_enabled = partitioned_epoch_reward_enabled_slot
.map(|slot| slot <= first_confirmed_block_in_epoch)
.unwrap_or(false);

let Ok(Some(first_confirmed_block)) = self
.get_block(
let mut reward_map: HashMap<String, (Reward, Slot)> = {
let addresses: Vec<String> =
addresses.iter().map(|pubkey| pubkey.to_string()).collect();

self.get_reward_map(
first_confirmed_block_in_epoch,
Some(RpcBlockConfig::rewards_with_commitment(config.commitment).into()),
&addresses,
&|reward_type| -> bool {
reward_type == RewardType::Voting
|| (!partitioned_epoch_reward_enabled && reward_type == RewardType::Staking)
CriesofCarrots marked this conversation as resolved.
Show resolved Hide resolved
},
&config,
)
.await
else {
return Err(RpcCustomError::BlockNotAvailable {
slot: first_confirmed_block_in_epoch,
}
.into());
.await?
};

let addresses: Vec<String> = addresses
.into_iter()
.map(|pubkey| pubkey.to_string())
.collect();
if partitioned_epoch_reward_enabled {
let partition_data_address = get_epoch_rewards_partition_data_address(rewards_epoch);
let partition_data_account =
bank.get_account(&partition_data_address)
.ok_or_else(|| Error {
code: ErrorCode::InternalError,
message: format!(
"Partition data account not found for epoch {:?} at {:?}",
epoch, partition_data_address
),
data: None,
})?;
let EpochRewardsPartitionDataVersion::V0(partition_data) =
bincode::deserialize(partition_data_account.data())
.map_err(|_| Error::internal_error())?;
let hasher = EpochRewardsHasher::new(
partition_data.num_partitions,
&partition_data.parent_blockhash,
);
let mut partition_index_addresses: HashMap<usize, Vec<String>> = HashMap::new();
for address in addresses.iter() {
let address_string = address.to_string();
// Skip this address if (Voting) rewards were already found in
// the first block of the epoch
if !reward_map.contains_key(&address_string) {
let partition_index = hasher.clone().hash_address_to_partition(address);
partition_index_addresses
.entry(partition_index)
.and_modify(|list| list.push(address_string.clone()))
.or_insert(vec![address_string]);
}
}

let reward_hash: HashMap<String, Reward> = first_confirmed_block
.rewards
.unwrap_or_default()
.into_iter()
.filter_map(|reward| match reward.reward_type? {
RewardType::Staking | RewardType::Voting => addresses
.contains(&reward.pubkey)
.then(|| (reward.clone().pubkey, reward)),
_ => None,
})
.collect();
let block_list = self
.get_blocks_with_limit(
first_confirmed_block_in_epoch + 1,
partition_data.num_partitions,
config.commitment,
)
.await?;

for (partition_index, addresses) in partition_index_addresses.iter() {
let slot = *block_list
.get(*partition_index)
.ok_or_else(Error::internal_error)?;

let index_reward_map = self
.get_reward_map(
slot,
addresses,
&|reward_type| -> bool { reward_type == RewardType::Staking },
&config,
)
.await?;
reward_map.extend(index_reward_map);
}
}

let rewards = addresses
.iter()
.map(|address| {
if let Some(reward) = reward_hash.get(address) {
if let Some((reward, slot)) = reward_map.get(&address.to_string()) {
CriesofCarrots marked this conversation as resolved.
Show resolved Hide resolved
return Some(RpcInflationReward {
epoch,
effective_slot: first_confirmed_block_in_epoch,
effective_slot: *slot,
amount: reward.lamports.unsigned_abs(),
post_balance: reward.post_balance,
commission: reward.commission,
Expand All @@ -607,7 +697,6 @@ impl JsonRpcRequestProcessor {
None
})
.collect();

Ok(rewards)
}

Expand Down
5 changes: 1 addition & 4 deletions validator/src/bin/solana-test-validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use {
account::AccountSharedData,
clock::Slot,
epoch_schedule::EpochSchedule,
feature_set,
native_token::sol_to_lamports,
pubkey::Pubkey,
rent::Rent,
Expand Down Expand Up @@ -349,9 +348,7 @@ fn main() {
exit(1);
});

let mut features_to_deactivate = pubkeys_of(&matches, "deactivate_feature").unwrap_or_default();
// Remove this when client support is ready for the enable_partitioned_epoch_reward feature
features_to_deactivate.push(feature_set::enable_partitioned_epoch_reward::id());
let features_to_deactivate = pubkeys_of(&matches, "deactivate_feature").unwrap_or_default();

if TestValidatorGenesis::ledger_exists(&ledger_path) {
for (name, long) in &[
Expand Down
Loading