Skip to content

Commit

Permalink
Geyser: add starting entry to ReplicaEntryInfo(V2) (solana-labs#33963)
Browse files Browse the repository at this point in the history
* Add ReplicaEntryInfoV2

* Add starting_transaction_index field to EntryNotification

* Populate starting_transaction_index in replay stage

* Cache and populate starting_transaction_index in banking stage

* Build ReplicaEntryInfoV2
  • Loading branch information
Tyera authored Nov 14, 2023
1 parent ad65c82 commit 0e91e96
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 10 deletions.
6 changes: 6 additions & 0 deletions core/src/tpu_entry_notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ impl TpuEntryNotifier {
.spawn(move || {
let mut current_slot = 0;
let mut current_index = 0;
let mut current_transaction_index = 0;
loop {
if exit.load(Ordering::Relaxed) {
break;
Expand All @@ -41,6 +42,7 @@ impl TpuEntryNotifier {
&broadcast_entry_sender,
&mut current_slot,
&mut current_index,
&mut current_transaction_index,
) {
break;
}
Expand All @@ -57,11 +59,13 @@ impl TpuEntryNotifier {
broadcast_entry_sender: &Sender<WorkingBankEntry>,
current_slot: &mut u64,
current_index: &mut usize,
current_transaction_index: &mut usize,
) -> Result<(), RecvTimeoutError> {
let (bank, (entry, tick_height)) = entry_receiver.recv_timeout(Duration::from_secs(1))?;
let slot = bank.slot();
let index = if slot != *current_slot {
*current_index = 0;
*current_transaction_index = 0;
*current_slot = slot;
0
} else {
Expand All @@ -78,11 +82,13 @@ impl TpuEntryNotifier {
slot,
index,
entry: entry_summary,
starting_transaction_index: *current_transaction_index,
}) {
warn!(
"Failed to send slot {slot:?} entry {index:?} from Tpu to EntryNotifierService, error {err:?}",
);
}
*current_transaction_index += entry.transactions.len();

if let Err(err) = broadcast_entry_sender.send((bank, (entry, tick_height))) {
warn!(
Expand Down
19 changes: 19 additions & 0 deletions geyser-plugin-interface/src/geyser_plugin_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,12 +185,31 @@ pub struct ReplicaEntryInfo<'a> {
pub executed_transaction_count: u64,
}

#[derive(Clone, Debug)]
#[repr(C)]
pub struct ReplicaEntryInfoV2<'a> {
/// The slot number of the block containing this Entry
pub slot: Slot,
/// The Entry's index in the block
pub index: usize,
/// The number of hashes since the previous Entry
pub num_hashes: u64,
/// The Entry's SHA-256 hash, generated from the previous Entry's hash with
/// `solana_entry::entry::next_hash()`
pub hash: &'a [u8],
/// The number of executed transactions in the Entry
pub executed_transaction_count: u64,
/// The index-in-block of the first executed transaction in this Entry
pub starting_transaction_index: usize,
}

/// A wrapper to future-proof ReplicaEntryInfo handling. To make a change to the structure of
/// ReplicaEntryInfo, add an new enum variant wrapping a newer version, which will force plugin
/// implementations to handle the change.
#[repr(u32)]
pub enum ReplicaEntryInfoVersions<'a> {
V0_0_1(&'a ReplicaEntryInfo<'a>),
V0_0_2(&'a ReplicaEntryInfoV2<'a>),
}

#[derive(Clone, Debug)]
Expand Down
21 changes: 15 additions & 6 deletions geyser-plugin-manager/src/entry_notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use {
log::*,
solana_entry::entry::EntrySummary,
solana_geyser_plugin_interface::geyser_plugin_interface::{
ReplicaEntryInfo, ReplicaEntryInfoVersions,
ReplicaEntryInfoV2, ReplicaEntryInfoVersions,
},
solana_ledger::entry_notifier_interface::EntryNotifier,
solana_measure::measure::Measure,
Expand All @@ -18,21 +18,28 @@ pub(crate) struct EntryNotifierImpl {
}

impl EntryNotifier for EntryNotifierImpl {
fn notify_entry<'a>(&'a self, slot: Slot, index: usize, entry: &'a EntrySummary) {
fn notify_entry<'a>(
&'a self,
slot: Slot,
index: usize,
entry: &'a EntrySummary,
starting_transaction_index: usize,
) {
let mut measure = Measure::start("geyser-plugin-notify_plugins_of_entry_info");

let plugin_manager = self.plugin_manager.read().unwrap();
if plugin_manager.plugins.is_empty() {
return;
}

let entry_info = Self::build_replica_entry_info(slot, index, entry);
let entry_info =
Self::build_replica_entry_info(slot, index, entry, starting_transaction_index);

for plugin in plugin_manager.plugins.iter() {
if !plugin.entry_notifications_enabled() {
continue;
}
match plugin.notify_entry(ReplicaEntryInfoVersions::V0_0_1(&entry_info)) {
match plugin.notify_entry(ReplicaEntryInfoVersions::V0_0_2(&entry_info)) {
Err(err) => {
error!(
"Failed to notify entry, error: ({}) to plugin {}",
Expand Down Expand Up @@ -64,13 +71,15 @@ impl EntryNotifierImpl {
slot: Slot,
index: usize,
entry: &'_ EntrySummary,
) -> ReplicaEntryInfo<'_> {
ReplicaEntryInfo {
starting_transaction_index: usize,
) -> ReplicaEntryInfoV2<'_> {
ReplicaEntryInfoV2 {
slot,
index,
num_hashes: entry.num_hashes,
hash: entry.hash.as_ref(),
executed_transaction_count: entry.num_transactions,
starting_transaction_index,
}
}
}
1 change: 1 addition & 0 deletions ledger/src/blockstore_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1221,6 +1221,7 @@ fn confirm_slot_entries(
slot,
index: entry_index,
entry: entry.into(),
starting_transaction_index: entry_tx_starting_index,
}) {
warn!(
"Slot {}, entry {} entry_notification_sender send failed: {:?}",
Expand Down
8 changes: 7 additions & 1 deletion ledger/src/entry_notifier_interface.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
use {solana_entry::entry::EntrySummary, solana_sdk::clock::Slot, std::sync::Arc};

pub trait EntryNotifier {
fn notify_entry(&self, slot: Slot, index: usize, entry: &EntrySummary);
fn notify_entry(
&self,
slot: Slot,
index: usize,
entry: &EntrySummary,
starting_transaction_index: usize,
);
}

pub type EntryNotifierArc = Arc<dyn EntryNotifier + Sync + Send>;
11 changes: 8 additions & 3 deletions ledger/src/entry_notifier_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub struct EntryNotification {
pub slot: Slot,
pub index: usize,
pub entry: EntrySummary,
pub starting_transaction_index: usize,
}

pub type EntryNotifierSender = Sender<EntryNotification>;
Expand Down Expand Up @@ -54,9 +55,13 @@ impl EntryNotifierService {
entry_notification_receiver: &EntryNotifierReceiver,
entry_notifier: EntryNotifierArc,
) -> Result<(), RecvTimeoutError> {
let EntryNotification { slot, index, entry } =
entry_notification_receiver.recv_timeout(Duration::from_secs(1))?;
entry_notifier.notify_entry(slot, index, &entry);
let EntryNotification {
slot,
index,
entry,
starting_transaction_index,
} = entry_notification_receiver.recv_timeout(Duration::from_secs(1))?;
entry_notifier.notify_entry(slot, index, &entry, starting_transaction_index);
Ok(())
}

Expand Down

0 comments on commit 0e91e96

Please sign in to comment.