Introduce get_leader_tpus
KirillLykov committed Nov 8, 2024
1 parent 97c32c8 commit d234341
Showing 3 changed files with 40 additions and 70 deletions.
67 changes: 18 additions & 49 deletions rpc/src/cluster_tpu_info.rs
@@ -1,10 +1,7 @@
 use {
     solana_gossip::{cluster_info::ClusterInfo, contact_info::Protocol},
     solana_poh::poh_recorder::PohRecorder,
-    solana_sdk::{
-        clock::{Slot, NUM_CONSECUTIVE_LEADER_SLOTS},
-        pubkey::Pubkey,
-    },
+    solana_sdk::{clock::NUM_CONSECUTIVE_LEADER_SLOTS, pubkey::Pubkey},
     solana_send_transaction_service::tpu_info::TpuInfo,
     std::{
         collections::HashMap,
@@ -70,37 +67,23 @@ impl TpuInfo for ClusterTpuInfo {
         unique_leaders
     }

-    fn get_leader_tpus_with_slots(
-        &self,
-        max_count: u64,
-        protocol: Protocol,
-    ) -> Vec<(&SocketAddr, Slot)> {
+    fn get_leader_tpus(&self, max_count: u64, protocol: Protocol) -> Vec<&SocketAddr> {
         let recorder = self.poh_recorder.read().unwrap();
-        let leaders: Vec<_> = (0..max_count)
-            .rev()
-            .filter_map(|future_slot| {
-                NUM_CONSECUTIVE_LEADER_SLOTS
-                    .checked_mul(future_slot)
-                    .and_then(|slots_in_the_future| {
-                        recorder.leader_and_slot_after_n_slots(slots_in_the_future)
-                    })
-            })
+        let leader_pubkeys: Vec<_> = (0..max_count)
+            .filter_map(|i| recorder.leader_after_n_slots(i * NUM_CONSECUTIVE_LEADER_SLOTS))
             .collect();
         drop(recorder);
-        let addrs_to_slots = leaders
-            .into_iter()
-            .filter_map(|(leader_id, leader_slot)| {
+        leader_pubkeys
+            .iter()
+            .filter_map(|leader_pubkey| {
                 self.recent_peers
-                    .get(&leader_id)
-                    .map(|(udp_tpu, quic_tpu)| match protocol {
-                        Protocol::UDP => (udp_tpu, leader_slot),
-                        Protocol::QUIC => (quic_tpu, leader_slot),
+                    .get(leader_pubkey)
+                    .and_then(|addr| match protocol {
+                        Protocol::UDP => Some(&addr.0),
+                        Protocol::QUIC => Some(&addr.1),
                     })
             })
-            .collect::<HashMap<_, _>>();
-        let mut unique_leaders = Vec::from_iter(addrs_to_slots);
-        unique_leaders.sort_by_key(|(_addr, slot)| *slot);
-        unique_leaders
+            .collect()
     }
 }

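The new get_leader_tpus walks the upcoming schedule one leader rotation at a time (slot offsets 0, NUM_CONSECUTIVE_LEADER_SLOTS, 2 * NUM_CONSECUTIVE_LEADER_SLOTS, ...) and maps each leader pubkey to a TPU socket address without deduplicating. Below is a minimal sketch of that sampling logic, not code from the commit: a plain slice stands in for the PohRecorder's leader schedule, a HashMap stands in for the gossip-backed recent_peers cache, and the toy Pubkey type and leader_tpus helper are invented for illustration.

use std::{collections::HashMap, net::SocketAddr};

// Mirrors solana_sdk::clock::NUM_CONSECUTIVE_LEADER_SLOTS for the purposes of this sketch.
const NUM_CONSECUTIVE_LEADER_SLOTS: u64 = 4;

// Toy stand-in for solana_sdk::pubkey::Pubkey.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct Pubkey(u8);

/// One TPU address per upcoming leader rotation: slot offsets 0, 4, 8, ...
/// Duplicates are kept; leaders with no known TPU address are silently skipped.
fn leader_tpus<'a>(
    schedule: &[Pubkey],                    // leader assigned to each upcoming slot
    peers: &'a HashMap<Pubkey, SocketAddr>, // leader pubkey -> TPU socket address
    max_count: u64,
) -> Vec<&'a SocketAddr> {
    (0..max_count)
        .filter_map(|i| schedule.get((i * NUM_CONSECUTIVE_LEADER_SLOTS) as usize))
        .filter_map(|leader| peers.get(leader))
        .collect()
}

fn main() {
    let a: SocketAddr = "10.0.0.1:8001".parse().unwrap();
    let b: SocketAddr = "10.0.0.2:8001".parse().unwrap();
    let peers = HashMap::from([(Pubkey(1), a), (Pubkey(2), b)]);
    // Leader 1 owns slots 0..4 and 8..12, leader 2 owns slots 4..8.
    let schedule = [1, 1, 1, 1, 2, 2, 2, 2, 1, 1, 1, 1].map(Pubkey);
    // Three rotations requested: addresses for slots 0, 4 and 8, duplicates included.
    assert_eq!(leader_tpus(&schedule, &peers, 3), vec![&a, &b, &a]);
}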
@@ -279,8 +262,8 @@ mod test {
             vec![&recent_peers.get(&first_leader).unwrap().0]
         );
         assert_eq!(
-            leader_info.get_leader_tpus_with_slots(1, Protocol::UDP),
-            vec![(&recent_peers.get(&first_leader).unwrap().0, 0)]
+            leader_info.get_leader_tpus(1, Protocol::UDP),
+            vec![&recent_peers.get(&first_leader).unwrap().0]
         );

         let second_leader = solana_ledger::leader_schedule_utils::slot_leader_at(
@@ -298,11 +281,8 @@ mod test {
             expected_leader_sockets
         );
         assert_eq!(
-            leader_info.get_leader_tpus_with_slots(2, Protocol::UDP),
+            leader_info.get_leader_tpus(2, Protocol::UDP),
             expected_leader_sockets
-                .into_iter()
-                .zip([0, 4])
-                .collect::<Vec<_>>()
         );

         let third_leader = solana_ledger::leader_schedule_utils::slot_leader_at(
@@ -320,25 +300,14 @@ mod test {
             leader_info.get_unique_leader_tpus(3, Protocol::UDP),
             expected_leader_sockets
         );
-        // Only 2 leader tpus are returned always... so [0, 4, 8] isn't right here.
-        // This assumption is safe. After all, leader schedule generation must be deterministic.
-        assert_eq!(
-            leader_info.get_leader_tpus_with_slots(3, Protocol::UDP),
-            expected_leader_sockets
-                .into_iter()
-                .zip([0, 4])
-                .collect::<Vec<_>>()
-        );

         for x in 4..8 {
             assert!(
                 leader_info.get_unique_leader_tpus(x, Protocol::UDP).len() <= recent_peers.len()
             );
-            assert!(
-                leader_info
-                    .get_leader_tpus_with_slots(x, Protocol::UDP)
-                    .len()
-                    <= recent_peers.len()
+            assert_eq!(
+                leader_info.get_leader_tpus(x, Protocol::UDP).len(),
+                x as usize
             );
         }
     }
32 changes: 18 additions & 14 deletions send-transaction-service/src/tpu_info.rs
@@ -1,16 +1,24 @@
-use {
-    solana_connection_cache::connection_cache::Protocol, solana_sdk::clock::Slot,
-    std::net::SocketAddr,
-};
+use {solana_connection_cache::connection_cache::Protocol, std::net::SocketAddr};

 /// A trait to abstract out the leader estimation for the SendTransactionService.
 pub trait TpuInfo {
     fn refresh_recent_peers(&mut self);
+
+    /// Takes `max_count` which specifies how many leaders per
+    /// `NUM_CONSECUTIVE_LEADER_SLOTS` we want to receive and returns *unique*
+    /// TPU socket addresses for these leaders.
+    ///
+    /// For example, if leader schedule was `[L1, L1, L1, L1, L2, L2, L2, L2,
+    /// L1, ...]` it will return `[L1, L2]` (the last L1 will be not added to
+    /// the result).
     fn get_unique_leader_tpus(&self, max_count: u64, protocol: Protocol) -> Vec<&SocketAddr>;
-    fn get_leader_tpus_with_slots(
-        &self,
-        max_count: u64,
-        protocol: Protocol,
-    ) -> Vec<(&SocketAddr, Slot)>;
+
+    /// Takes `max_count` which specifies how many leaders per
+    /// `NUM_CONSECUTIVE_LEADER_SLOTS` we want to receive and returns TPU socket
+    /// addresses for these leaders.
+    ///
+    /// For example, if leader schedule was `[L1, L1, L1, L1, L2, ...]` it will
+    /// return `[L1, L2]`.
+    fn get_leader_tpus(&self, max_count: u64, protocol: Protocol) -> Vec<&SocketAddr>;
 }

@@ -21,11 +29,7 @@ impl TpuInfo for NullTpuInfo {
     fn get_unique_leader_tpus(&self, _max_count: u64, _protocol: Protocol) -> Vec<&SocketAddr> {
         vec![]
     }
-    fn get_leader_tpus_with_slots(
-        &self,
-        _max_count: u64,
-        _protocol: Protocol,
-    ) -> Vec<(&SocketAddr, Slot)> {
+    fn get_leader_tpus(&self, _max_count: u64, _protocol: Protocol) -> Vec<&SocketAddr> {
         vec![]
     }
 }
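After this change the trait offers two flavors of leader lookup: get_unique_leader_tpus drops repeated leaders, while get_leader_tpus returns one address per upcoming rotation, so its result length tracks max_count. The sketch below only illustrates that documented contract on the doc comment's example schedule; dedup_first_seen and the string addresses are invented for illustration, and the real implementation may order its unique results differently.

// Illustration of the documented contract (not code from the commit).
// For the schedule [L1, L1, L1, L1, L2, L2, L2, L2, L1, ...] and max_count = 3:
//   get_leader_tpus        -> one address per rotation: [L1, L2, L1]
//   get_unique_leader_tpus -> repeated leaders dropped:  [L1, L2]
fn dedup_first_seen<T: PartialEq + Clone>(per_rotation: &[T]) -> Vec<T> {
    let mut unique = Vec::new();
    for item in per_rotation {
        if !unique.contains(item) {
            unique.push(item.clone()); // keep only the first occurrence, in order
        }
    }
    unique
}

fn main() {
    // Addresses returned per rotation, as get_leader_tpus would hand them out.
    let per_rotation = vec!["l1.example:8001", "l2.example:8001", "l1.example:8001"];
    assert_eq!(
        dedup_first_seen(&per_rotation),
        vec!["l1.example:8001", "l2.example:8001"]
    );
}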
11 changes: 4 additions & 7 deletions send-transaction-service/src/transaction_client.rs
@@ -87,10 +87,7 @@ where
         }
     }

-    fn get_tpu_addresses_with_slots<'a>(
-        &'a self,
-        leader_info: Option<&'a T>,
-    ) -> Vec<&'a SocketAddr> {
+    fn get_unique_tpu_addresses<'a>(&'a self, leader_info: Option<&'a T>) -> Vec<&'a SocketAddr> {
         leader_info
             .map(|leader_info| {
                 leader_info.get_unique_leader_tpus(
@@ -143,7 +140,7 @@
             .unwrap_or_default();
         let mut leader_info_provider = self.leader_info_provider.lock().unwrap();
         let leader_info = leader_info_provider.get_leader_info();
-        let leader_addresses = self.get_tpu_addresses_with_slots(leader_info);
+        let leader_addresses = self.get_unique_tpu_addresses(leader_info);
         addresses.extend(leader_addresses);

         for address in &addresses {
@@ -174,12 +171,12 @@ impl<T> LeaderUpdater for SendTransactionServiceLeaderUpdater<T>
 where
     T: TpuInfoWithSendStatic,
 {
-    fn next_leaders(&mut self, lookahead_slots: usize) -> Vec<SocketAddr> {
+    fn next_leaders(&mut self, lookahead_leaders: usize) -> Vec<SocketAddr> {
         let discovered_peers = self
             .leader_info_provider
             .get_leader_info()
             .map(|leader_info| {
-                leader_info.get_unique_leader_tpus(lookahead_slots as u64, Protocol::QUIC)
+                leader_info.get_leader_tpus(lookahead_leaders as u64, Protocol::QUIC)
             })
             .filter(|addresses| !addresses.is_empty())
             .unwrap_or_else(|| vec![&self.my_tpu_address]);
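In next_leaders the parameter is renamed from lookahead_slots to lookahead_leaders and is now forwarded to get_leader_tpus, making explicit that the value counts leader rotations rather than individual slots. A caller that still reasons in slots could convert as sketched below; this helper is illustrative only and is not part of the commit, and the constant mirrors solana_sdk::clock::NUM_CONSECUTIVE_LEADER_SLOTS rather than importing it.

// Hedged sketch (not from the commit): turning a slot-based lookahead into
// the leader count that next_leaders now expects.
const NUM_CONSECUTIVE_LEADER_SLOTS: u64 = 4; // mirrors solana_sdk::clock

fn lookahead_leaders_for_slots(lookahead_slots: u64) -> u64 {
    // Round up so a partial rotation still contributes one leader.
    lookahead_slots.div_ceil(NUM_CONSECUTIVE_LEADER_SLOTS)
}

fn main() {
    assert_eq!(lookahead_leaders_for_slots(1), 1); // stays within one rotation
    assert_eq!(lookahead_leaders_for_slots(4), 1);
    assert_eq!(lookahead_leaders_for_slots(5), 2); // spills into the next leader's rotation
}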
