Skip to content

Commit

Permalink
verifies retransmitter signature on chained Merkle shreds (#1735)
Browse files Browse the repository at this point in the history
(cherry picked from commit 6f94686)
  • Loading branch information
behzadnouri authored and AshwinSekar committed Jul 25, 2024
1 parent 4234b1b commit 5c143b8
Show file tree
Hide file tree
Showing 6 changed files with 163 additions and 26 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 19 additions & 6 deletions ledger/src/shred.rs
Original file line number Diff line number Diff line change
Expand Up @@ -757,11 +757,8 @@ pub mod layout {
.map(Hash::new)
}

pub(crate) fn set_retransmitter_signature(
shred: &mut [u8],
signature: &Signature,
) -> Result<(), Error> {
let offset = match get_shred_variant(shred)? {
fn get_retransmitter_signature_offset(shred: &[u8]) -> Result<usize, Error> {
match get_shred_variant(shred)? {
ShredVariant::LegacyCode | ShredVariant::LegacyData => Err(Error::InvalidShredVariant),
ShredVariant::MerkleCode {
proof_size,
Expand All @@ -777,7 +774,23 @@ pub mod layout {
} => {
merkle::ShredData::get_retransmitter_signature_offset(proof_size, chained, resigned)
}
}?;
}
}

pub fn get_retransmitter_signature(shred: &[u8]) -> Result<Signature, Error> {
let offset = get_retransmitter_signature_offset(shred)?;
shred
.get(offset..offset + SIZE_OF_SIGNATURE)
.map(|bytes| <[u8; SIZE_OF_SIGNATURE]>::try_from(bytes).unwrap())
.map(Signature::from)
.ok_or(Error::InvalidPayloadSize(shred.len()))
}

pub(crate) fn set_retransmitter_signature(
shred: &mut [u8],
signature: &Signature,
) -> Result<(), Error> {
let offset = get_retransmitter_signature_offset(shred)?;
let Some(buffer) = shred.get_mut(offset..offset + SIZE_OF_SIGNATURE) else {
return Err(Error::InvalidPayloadSize(shred.len()));
};
Expand Down
1 change: 1 addition & 0 deletions programs/sbf/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions turbine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ solana-rpc-client-api = { workspace = true }
solana-runtime = { workspace = true }
solana-sdk = { workspace = true }
solana-streamer = { workspace = true }
static_assertions = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }

Expand Down
3 changes: 1 addition & 2 deletions turbine/src/cluster_nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,7 @@ impl ClusterNodes<RetransmitStage> {

// Returns the parent node in the turbine broadcast tree.
// Returns None if the node is the root of the tree or if it is not staked.
#[allow(unused)]
fn get_retransmit_parent(
pub(crate) fn get_retransmit_parent(
&self,
leader: &Pubkey,
shred: &ShredId,
Expand Down
158 changes: 140 additions & 18 deletions turbine/src/sigverify_shreds.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use {
crate::{
cluster_nodes::{self, ClusterNodesCache},
retransmit_stage::RetransmitStage,
},
crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender},
rayon::{prelude::*, ThreadPool, ThreadPoolBuilder},
solana_gossip::cluster_info::ClusterInfo,
Expand All @@ -9,15 +13,22 @@ use {
},
solana_perf::{self, deduper::Deduper, packet::PacketBatch, recycler_cache::RecyclerCache},
solana_rayon_threadlimit::get_thread_count,
solana_runtime::{bank::Bank, bank_forks::BankForks},
solana_runtime::{
bank::{Bank, MAX_LEADER_SCHEDULE_STAKES},
bank_forks::BankForks,
},
solana_sdk::{
clock::Slot,
pubkey::Pubkey,
signature::{Keypair, Signer},
},
static_assertions::const_assert_eq,
std::{
collections::HashMap,
sync::{Arc, RwLock},
sync::{
atomic::{AtomicUsize, Ordering},
Arc, RwLock,
},
thread::{Builder, JoinHandle},
time::{Duration, Instant},
},
Expand All @@ -30,6 +41,16 @@ const DEDUPER_FALSE_POSITIVE_RATE: f64 = 0.001;
const DEDUPER_NUM_BITS: u64 = 637_534_199; // 76MB
const DEDUPER_RESET_CYCLE: Duration = Duration::from_secs(5 * 60);

// Num epochs capacity should be at least 2 because near the epoch boundary we
// may receive shreds from the other side of the epoch boundary. Because of the
// TTL based eviction it does not make sense to cache more than
// MAX_LEADER_SCHEDULE_STAKES epochs.
const_assert_eq!(CLUSTER_NODES_CACHE_NUM_EPOCH_CAP, 5);
const CLUSTER_NODES_CACHE_NUM_EPOCH_CAP: usize = MAX_LEADER_SCHEDULE_STAKES as usize;
// Because for ClusterNodes::get_retransmit_parent only pubkeys of staked nodes
// are needed, we can use longer durations for cache TTL.
const CLUSTER_NODES_CACHE_TTL: Duration = Duration::from_secs(30);

#[allow(clippy::enum_variant_names)]
enum Error {
RecvDisconnected,
Expand All @@ -48,6 +69,10 @@ pub fn spawn_shred_sigverify(
let recycler_cache = RecyclerCache::warmed();
let mut stats = ShredSigVerifyStats::new(Instant::now());
let cache = RwLock::new(LruCache::new(SIGVERIFY_LRU_CACHE_CAPACITY));
let cluster_nodes_cache = ClusterNodesCache::<RetransmitStage>::new(
CLUSTER_NODES_CACHE_NUM_EPOCH_CAP,
CLUSTER_NODES_CACHE_TTL,
);
let thread_pool = ThreadPoolBuilder::new()
.num_threads(get_thread_count())
.thread_name(|i| format!("solSvrfyShred{i:02}"))
Expand All @@ -66,13 +91,15 @@ pub fn spawn_shred_sigverify(
match run_shred_sigverify(
&thread_pool,
&keypair,
&cluster_info,
&bank_forks,
&leader_schedule_cache,
&recycler_cache,
&deduper,
&shred_fetch_receiver,
&retransmit_sender,
&verified_sender,
&cluster_nodes_cache,
&cache,
&mut stats,
) {
Expand All @@ -94,13 +121,15 @@ pub fn spawn_shred_sigverify(
fn run_shred_sigverify<const K: usize>(
thread_pool: &ThreadPool,
keypair: &Keypair,
cluster_info: &ClusterInfo,
bank_forks: &RwLock<BankForks>,
leader_schedule_cache: &LeaderScheduleCache,
recycler_cache: &RecyclerCache,
deduper: &Deduper<K, [u8]>,
shred_fetch_receiver: &Receiver<PacketBatch>,
retransmit_sender: &Sender<Vec</*shred:*/ Vec<u8>>>,
verified_sender: &Sender<Vec<PacketBatch>>,
cluster_nodes_cache: &ClusterNodesCache<RetransmitStage>,
cache: &RwLock<LruCache>,
stats: &mut ShredSigVerifyStats,
) -> Result<(), Error> {
Expand Down Expand Up @@ -128,34 +157,59 @@ fn run_shred_sigverify<const K: usize>(
.map(|packet| packet.meta_mut().set_discard(true))
.count()
});
let (working_bank, root_bank) = {
let bank_forks = bank_forks.read().unwrap();
(bank_forks.working_bank(), bank_forks.root_bank())
};
verify_packets(
thread_pool,
&keypair.pubkey(),
bank_forks,
&working_bank,
leader_schedule_cache,
recycler_cache,
&mut packets,
cache,
);
stats.num_discards_post += count_discards(&packets);
// Resign shreds Merkle root as the retransmitter node.
// Verify retransmitter's signature, and resign shreds
// Merkle root as the retransmitter node.
let resign_start = Instant::now();
thread_pool.install(|| {
packets
.par_iter_mut()
.flatten()
.filter(|packet| !packet.meta().discard())
.for_each(|packet| {
if let Some(shred) = shred::layout::get_shred_mut(packet) {
// We can ignore Error::InvalidShredVariant because that
// basically means that the shred is of a variant which
// cannot be signed by the retransmitter node.
if !matches!(
shred::layout::resign_shred(shred, keypair),
Ok(()) | Err(shred::Error::InvalidShredVariant)
) {
packet.meta_mut().set_discard(true);
}
let repair = packet.meta().repair();
let Some(shred) = shred::layout::get_shred_mut(packet) else {
packet.meta_mut().set_discard(true);
return;
};
// Repair packets do not follow turbine tree and
// are verified using the trailing nonce.
if !repair
&& !verify_retransmitter_signature(
shred,
&root_bank,
&working_bank,
cluster_info,
leader_schedule_cache,
cluster_nodes_cache,
stats,
)
{
stats
.num_invalid_retransmitter
.fetch_add(1, Ordering::Relaxed);
}
// We can ignore Error::InvalidShredVariant because that
// basically means that the shred is of a variant which
// cannot be signed by the retransmitter node.
if !matches!(
shred::layout::resign_shred(shred, keypair),
Ok(()) | Err(shred::Error::InvalidShredVariant)
) {
packet.meta_mut().set_discard(true);
}
})
});
Expand All @@ -175,18 +229,64 @@ fn run_shred_sigverify<const K: usize>(
Ok(())
}

#[must_use]
fn verify_retransmitter_signature(
shred: &[u8],
root_bank: &Bank,
working_bank: &Bank,
cluster_info: &ClusterInfo,
leader_schedule_cache: &LeaderScheduleCache,
cluster_nodes_cache: &ClusterNodesCache<RetransmitStage>,
stats: &ShredSigVerifyStats,
) -> bool {
let signature = match shred::layout::get_retransmitter_signature(shred) {
Ok(signature) => signature,
// If the shred is not of resigned variant,
// then there is nothing to verify.
Err(shred::Error::InvalidShredVariant) => return true,
Err(_) => return false,
};
let Some(merkle_root) = shred::layout::get_merkle_root(shred) else {
return false;
};
let Some(shred) = shred::layout::get_shred_id(shred) else {
return false;
};
let Some(leader) = leader_schedule_cache.slot_leader_at(shred.slot(), Some(working_bank))
else {
stats
.num_unknown_slot_leader
.fetch_add(1, Ordering::Relaxed);
return false;
};
let cluster_nodes =
cluster_nodes_cache.get(shred.slot(), root_bank, working_bank, cluster_info);
let data_plane_fanout = cluster_nodes::get_data_plane_fanout(shred.slot(), root_bank);
let parent = match cluster_nodes.get_retransmit_parent(&leader, &shred, data_plane_fanout) {
Ok(Some(parent)) => parent,
Ok(None) => return true,
Err(err) => {
error!("get_retransmit_parent: {err:?}");
stats
.num_unknown_turbine_parent
.fetch_add(1, Ordering::Relaxed);
return false;
}
};
signature.verify(parent.as_ref(), merkle_root.as_ref())
}

fn verify_packets(
thread_pool: &ThreadPool,
self_pubkey: &Pubkey,
bank_forks: &RwLock<BankForks>,
working_bank: &Bank,
leader_schedule_cache: &LeaderScheduleCache,
recycler_cache: &RecyclerCache,
packets: &mut [PacketBatch],
cache: &RwLock<LruCache>,
) {
let working_bank = bank_forks.read().unwrap().working_bank();
let leader_slots: HashMap<Slot, Pubkey> =
get_slot_leaders(self_pubkey, packets, leader_schedule_cache, &working_bank)
get_slot_leaders(self_pubkey, packets, leader_schedule_cache, working_bank)
.into_iter()
.filter_map(|(slot, pubkey)| Some((slot, pubkey?)))
.chain(std::iter::once((Slot::MAX, Pubkey::default())))
Expand Down Expand Up @@ -262,7 +362,10 @@ struct ShredSigVerifyStats {
num_discards_post: usize,
num_discards_pre: usize,
num_duplicates: usize,
num_invalid_retransmitter: AtomicUsize,
num_retransmit_shreds: usize,
num_unknown_slot_leader: AtomicUsize,
num_unknown_turbine_parent: AtomicUsize,
elapsed_micros: u64,
resign_micros: u64,
}
Expand All @@ -280,7 +383,10 @@ impl ShredSigVerifyStats {
num_deduper_saturations: 0usize,
num_discards_post: 0usize,
num_duplicates: 0usize,
num_invalid_retransmitter: AtomicUsize::default(),
num_retransmit_shreds: 0usize,
num_unknown_slot_leader: AtomicUsize::default(),
num_unknown_turbine_parent: AtomicUsize::default(),
elapsed_micros: 0u64,
resign_micros: 0u64,
}
Expand All @@ -299,7 +405,22 @@ impl ShredSigVerifyStats {
("num_deduper_saturations", self.num_deduper_saturations, i64),
("num_discards_post", self.num_discards_post, i64),
("num_duplicates", self.num_duplicates, i64),
(
"num_invalid_retransmitter",
self.num_invalid_retransmitter.load(Ordering::Relaxed),
i64
),
("num_retransmit_shreds", self.num_retransmit_shreds, i64),
(
"num_unknown_slot_leader",
self.num_unknown_slot_leader.load(Ordering::Relaxed),
i64
),
(
"num_unknown_turbine_parent",
self.num_unknown_turbine_parent.load(Ordering::Relaxed),
i64
),
("elapsed_micros", self.elapsed_micros, i64),
("resign_micros", self.resign_micros, i64),
);
Expand Down Expand Up @@ -365,10 +486,11 @@ mod tests {

let cache = RwLock::new(LruCache::new(/*capacity:*/ 128));
let thread_pool = ThreadPoolBuilder::new().num_threads(3).build().unwrap();
let working_bank = bank_forks.read().unwrap().working_bank();
verify_packets(
&thread_pool,
&Pubkey::new_unique(), // self_pubkey
&bank_forks,
&working_bank,
&leader_schedule_cache,
&RecyclerCache::warmed(),
&mut batches,
Expand Down

0 comments on commit 5c143b8

Please sign in to comment.