Skip to content

Commit

Permalink
Add a subscription for slot entry updates
Browse files Browse the repository at this point in the history
  • Loading branch information
ebin-mathews committed Aug 28, 2024
1 parent 2241435 commit 61cc6e4
Show file tree
Hide file tree
Showing 6 changed files with 271 additions and 11 deletions.
36 changes: 34 additions & 2 deletions client/src/geyser_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ use std::{
use jito_geyser_protos::solana::geyser::{
geyser_client::GeyserClient, maybe_partial_account_update, EmptyRequest,
MaybePartialAccountUpdate, SubscribeAccountUpdatesRequest,
SubscribePartialAccountUpdatesRequest, SubscribeSlotUpdateRequest, TimestampedAccountUpdate,
SubscribePartialAccountUpdatesRequest, SubscribeSlotEntryUpdateRequest,
SubscribeSlotUpdateRequest, TimestampedAccountUpdate,
};
use log::*;
use lru::LruCache;
Expand All @@ -32,7 +33,9 @@ use tonic::{codegen::InterceptedService, transport::Channel, Response, Status};

use crate::{
geyser_consumer::GeyserConsumerError::{MissedHeartbeat, StreamClosed},
types::{AccountUpdate, AccountUpdateNotification, PartialAccountUpdate, SlotUpdate},
types::{
AccountUpdate, AccountUpdateNotification, PartialAccountUpdate, SlotEntryUpdate, SlotUpdate,
},
GrpcInterceptor, Pubkey, Slot,
};

Expand Down Expand Up @@ -242,6 +245,35 @@ impl GeyserConsumer {
Ok(())
}

pub async fn consume_slot_entry_updates(
&self,
slot_updates_tx: UnboundedSender<SlotEntryUpdate>,
) -> Result<()> {
let mut c = self.client.clone();

let resp = c
.subscribe_slot_entry_updates(SubscribeSlotEntryUpdateRequest {})
.await?;
let mut stream = resp.into_inner();

while !self.exit.load(Ordering::Relaxed) {
match stream.message().await {
Ok(Some(slot_update)) => {
if slot_updates_tx
.send(SlotEntryUpdate::from(slot_update.entry_update.unwrap()))
.is_err()
{
return Err(GeyserConsumerError::ConsumerChannelDisconnected);
};
}
Ok(None) => return Err(StreamClosed),
Err(e) => return Err(e.into()),
}
}

Ok(())
}

#[allow(clippy::too_many_arguments)]
fn process_account_update(
maybe_message: std::result::Result<Option<TimestampedAccountUpdate>, Status>,
Expand Down
14 changes: 14 additions & 0 deletions client/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,3 +136,17 @@ impl From<geyser::SlotUpdate> for SlotUpdate {
}
}
}

pub struct SlotEntryUpdate {
pub slot: Slot,
pub index: u64,
}

impl From<geyser::SlotEntryUpdate> for SlotEntryUpdate {
fn from(proto: geyser::SlotEntryUpdate) -> Self {
Self {
slot: proto.slot,
index: proto.index,
}
}
}
23 changes: 23 additions & 0 deletions proto/proto/geyser.proto
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,25 @@ message GetHeartbeatIntervalResponse {
uint64 heartbeat_interval_ms = 1;
}

message SlotEntryUpdate {
// The slot number of the block containing this Entry
uint64 slot = 1;
// The Entry's index in the block
uint64 index = 2;
// The number of executed transactions in the Entry
// If this number is zero, we can assume its a tick entry
uint64 executed_transaction_count = 3;
}

message TimestampedSlotEntryUpdate {
// Time at which the message was generated
google.protobuf.Timestamp ts = 1;
// SlotEntryUpdate update
SlotEntryUpdate entry_update = 2;
}

message SubscribeSlotEntryUpdateRequest {}

// The following __must__ be assumed:
// - Clients may receive data for slots out of order.
// - Clients may receive account updates for a given slot out of order.
Expand Down Expand Up @@ -186,4 +205,8 @@ service Geyser {

// Subscribes to block updates.
rpc SubscribeBlockUpdates(SubscribeBlockUpdatesRequest) returns (stream TimestampedBlockUpdate) {}

// Subscribes to entry updates.
// Returns the highest slot seen thus far and the entry index corresponding to the tick
rpc SubscribeSlotEntryUpdates(SubscribeSlotEntryUpdateRequest) returns (stream TimestampedSlotEntryUpdate) {}
}
6 changes: 4 additions & 2 deletions proto/src/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ use solana_transaction_status::{
TransactionWithStatusMeta, VersionedConfirmedBlock, VersionedTransactionWithStatusMeta,
};

use crate::solana::{entries, tx_by_addr};
use crate::{solana::storage::confirmed_block, StoredExtendedRewards, StoredTransactionStatusMeta};
use crate::{
solana::{entries, storage::confirmed_block, tx_by_addr},
StoredExtendedRewards, StoredTransactionStatusMeta,
};

impl From<Vec<Reward>> for confirmed_block::Rewards {
fn from(rewards: Vec<Reward>) -> Self {
Expand Down
72 changes: 69 additions & 3 deletions server/src/geyser_grpc_plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ use std::{

use agave_geyser_plugin_interface::geyser_plugin_interface::{
GeyserPlugin, GeyserPluginError, ReplicaAccountInfoVersions, ReplicaBlockInfoVersions,
ReplicaTransactionInfoVersions, Result as PluginResult, SlotStatus,
ReplicaEntryInfoVersions, ReplicaTransactionInfoVersions, Result as PluginResult, SlotStatus,
};
use bs58;
use crossbeam_channel::{bounded, Sender, TrySendError};
use jito_geyser_protos::solana::{
geyser::{
geyser_server::GeyserServer, AccountUpdate, BlockUpdate, SlotUpdate, SlotUpdateStatus,
TimestampedAccountUpdate, TimestampedBlockUpdate, TimestampedSlotUpdate,
TimestampedTransactionUpdate, TransactionUpdate,
TimestampedAccountUpdate, TimestampedBlockUpdate, TimestampedSlotEntryUpdate,
TimestampedSlotUpdate, TimestampedTransactionUpdate, TransactionUpdate,
},
storage::confirmed_block::ConfirmedTransaction,
};
Expand All @@ -44,6 +44,7 @@ pub struct PluginData {
/// Where updates are piped thru to the grpc service.
account_update_sender: Sender<TimestampedAccountUpdate>,
slot_update_sender: Sender<TimestampedSlotUpdate>,
slot_entry_update_sender: Sender<TimestampedSlotEntryUpdate>,
block_update_sender: Sender<TimestampedBlockUpdate>,
transaction_update_sender: Sender<TimestampedTransactionUpdate>,

Expand Down Expand Up @@ -72,6 +73,7 @@ pub struct PluginConfig {
pub bind_address: String,
pub account_update_buffer_size: usize,
pub slot_update_buffer_size: usize,
pub slot_entry_update_buffer_size: usize,
pub block_update_buffer_size: usize,
pub transaction_update_buffer_size: usize,
pub skip_startup_stream: Option<bool>,
Expand Down Expand Up @@ -112,6 +114,8 @@ impl GeyserPlugin for GeyserGrpcPlugin {
let highest_write_slot = Arc::new(AtomicU64::new(0));
let (account_update_sender, account_update_rx) = bounded(config.account_update_buffer_size);
let (slot_update_sender, slot_update_rx) = bounded(config.slot_update_buffer_size);
let (slot_entry_update_sender, slot_entry_update_rx) =
bounded(config.slot_entry_update_buffer_size);

let (block_update_sender, block_update_receiver) = bounded(config.block_update_buffer_size);
let (transaction_update_sender, transaction_update_receiver) =
Expand All @@ -121,6 +125,7 @@ impl GeyserPlugin for GeyserGrpcPlugin {
config.geyser_service_config.clone(),
account_update_rx,
slot_update_rx,
slot_entry_update_rx,
block_update_receiver,
transaction_update_receiver,
highest_write_slot.clone(),
Expand Down Expand Up @@ -155,6 +160,7 @@ impl GeyserPlugin for GeyserGrpcPlugin {
server_exit_sender: server_exit_tx,
account_update_sender,
slot_update_sender,
slot_entry_update_sender,
block_update_sender,
transaction_update_sender,
highest_write_slot,
Expand Down Expand Up @@ -488,6 +494,66 @@ impl GeyserPlugin for GeyserGrpcPlugin {
fn transaction_notifications_enabled(&self) -> bool {
true
}

fn entry_notifications_enabled(&self) -> bool {
true
}

fn notify_entry(&self, entry: ReplicaEntryInfoVersions) -> PluginResult<()> {
let data = self.data.as_ref().expect("plugin must be initialized");

if data.ignore_startup_updates && !data.is_startup_completed.load(Ordering::Relaxed) {
return Ok(());
}

let slot_entry = utils::get_slot_and_index_from_replica_entry_info_versions(&entry);

debug!(
"Updating slot entry {} at index {}",
slot_entry.slot, slot_entry.index
);

match data
.slot_entry_update_sender
.try_send(TimestampedSlotEntryUpdate {
ts: Some(prost_types::Timestamp::from(SystemTime::now())),
entry_update: Some(slot_entry),
}) {
Ok(_) => Ok(()),
Err(TrySendError::Full(_)) => {
warn!("slot_entry_update channel full, skipping");
Ok(())
}
Err(TrySendError::Disconnected(_)) => {
error!("slot entry info send error");
Err(GeyserPluginError::SlotStatusUpdateError {
msg: "slot_entry_update channel disconnected, exiting".to_string(),
})
}
}
}
}

mod utils {
use agave_geyser_plugin_interface::geyser_plugin_interface::ReplicaEntryInfoVersions;
use jito_geyser_protos::solana::geyser::SlotEntryUpdate;

pub fn get_slot_and_index_from_replica_entry_info_versions(
entry: &ReplicaEntryInfoVersions,
) -> SlotEntryUpdate {
match entry {
ReplicaEntryInfoVersions::V0_0_1(entry_info) => SlotEntryUpdate {
slot: entry_info.slot,
index: entry_info.index as u64,
executed_transaction_count: entry_info.executed_transaction_count,
},
ReplicaEntryInfoVersions::V0_0_2(entry_info) => SlotEntryUpdate {
slot: entry_info.slot,
index: entry_info.index as u64,
executed_transaction_count: entry_info.executed_transaction_count,
},
}
}
}

#[no_mangle]
Expand Down
Loading

0 comments on commit 61cc6e4

Please sign in to comment.