diff --git a/client/src/geyser_consumer.rs b/client/src/geyser_consumer.rs index 6bbd012..0492e53 100644 --- a/client/src/geyser_consumer.rs +++ b/client/src/geyser_consumer.rs @@ -19,7 +19,8 @@ use std::{ use jito_geyser_protos::solana::geyser::{ geyser_client::GeyserClient, maybe_partial_account_update, EmptyRequest, MaybePartialAccountUpdate, SubscribeAccountUpdatesRequest, - SubscribePartialAccountUpdatesRequest, SubscribeSlotUpdateRequest, TimestampedAccountUpdate, + SubscribePartialAccountUpdatesRequest, SubscribeSlotEntryUpdateRequest, + SubscribeSlotUpdateRequest, TimestampedAccountUpdate, }; use log::*; use lru::LruCache; @@ -32,7 +33,9 @@ use tonic::{codegen::InterceptedService, transport::Channel, Response, Status}; use crate::{ geyser_consumer::GeyserConsumerError::{MissedHeartbeat, StreamClosed}, - types::{AccountUpdate, AccountUpdateNotification, PartialAccountUpdate, SlotUpdate}, + types::{ + AccountUpdate, AccountUpdateNotification, PartialAccountUpdate, SlotEntryUpdate, SlotUpdate, + }, GrpcInterceptor, Pubkey, Slot, }; @@ -242,6 +245,35 @@ impl GeyserConsumer { Ok(()) } + pub async fn consume_slot_entry_updates( + &self, + slot_updates_tx: UnboundedSender, + ) -> Result<()> { + let mut c = self.client.clone(); + + let resp = c + .subscribe_slot_entry_updates(SubscribeSlotEntryUpdateRequest {}) + .await?; + let mut stream = resp.into_inner(); + + while !self.exit.load(Ordering::Relaxed) { + match stream.message().await { + Ok(Some(slot_update)) => { + if slot_updates_tx + .send(SlotEntryUpdate::from(slot_update.entry_update.unwrap())) + .is_err() + { + return Err(GeyserConsumerError::ConsumerChannelDisconnected); + }; + } + Ok(None) => return Err(StreamClosed), + Err(e) => return Err(e.into()), + } + } + + Ok(()) + } + #[allow(clippy::too_many_arguments)] fn process_account_update( maybe_message: std::result::Result, Status>, diff --git a/client/src/types.rs b/client/src/types.rs index 3dd670f..701df64 100644 --- a/client/src/types.rs +++ b/client/src/types.rs @@ -136,3 +136,17 @@ impl From for SlotUpdate { } } } + +pub struct SlotEntryUpdate { + pub slot: Slot, + pub index: u64, +} + +impl From for SlotEntryUpdate { + fn from(proto: geyser::SlotEntryUpdate) -> Self { + Self { + slot: proto.slot, + index: proto.index, + } + } +} diff --git a/proto/proto/geyser.proto b/proto/proto/geyser.proto index 0287a2e..d20bf2b 100644 --- a/proto/proto/geyser.proto +++ b/proto/proto/geyser.proto @@ -157,6 +157,25 @@ message GetHeartbeatIntervalResponse { uint64 heartbeat_interval_ms = 1; } +message SlotEntryUpdate { + // The slot number of the block containing this Entry + uint64 slot = 1; + // The Entry's index in the block + uint64 index = 2; + // The number of executed transactions in the Entry + // If this number is zero, we can assume its a tick entry + uint64 executed_transaction_count = 3; +} + +message TimestampedSlotEntryUpdate { + // Time at which the message was generated + google.protobuf.Timestamp ts = 1; + // SlotEntryUpdate update + SlotEntryUpdate entry_update = 2; +} + +message SubscribeSlotEntryUpdateRequest {} + // The following __must__ be assumed: // - Clients may receive data for slots out of order. // - Clients may receive account updates for a given slot out of order. @@ -186,4 +205,8 @@ service Geyser { // Subscribes to block updates. rpc SubscribeBlockUpdates(SubscribeBlockUpdatesRequest) returns (stream TimestampedBlockUpdate) {} + + // Subscribes to entry updates. + // Returns the highest slot seen thus far and the entry index corresponding to the tick + rpc SubscribeSlotEntryUpdates(SubscribeSlotEntryUpdateRequest) returns (stream TimestampedSlotEntryUpdate) {} } diff --git a/proto/src/convert.rs b/proto/src/convert.rs index 72da2d0..e4dfaa9 100644 --- a/proto/src/convert.rs +++ b/proto/src/convert.rs @@ -23,8 +23,10 @@ use solana_transaction_status::{ TransactionWithStatusMeta, VersionedConfirmedBlock, VersionedTransactionWithStatusMeta, }; -use crate::solana::{entries, tx_by_addr}; -use crate::{solana::storage::confirmed_block, StoredExtendedRewards, StoredTransactionStatusMeta}; +use crate::{ + solana::{entries, storage::confirmed_block, tx_by_addr}, + StoredExtendedRewards, StoredTransactionStatusMeta, +}; impl From> for confirmed_block::Rewards { fn from(rewards: Vec) -> Self { diff --git a/server/src/geyser_grpc_plugin.rs b/server/src/geyser_grpc_plugin.rs index e18f1f7..c64c9ff 100644 --- a/server/src/geyser_grpc_plugin.rs +++ b/server/src/geyser_grpc_plugin.rs @@ -13,15 +13,15 @@ use std::{ use agave_geyser_plugin_interface::geyser_plugin_interface::{ GeyserPlugin, GeyserPluginError, ReplicaAccountInfoVersions, ReplicaBlockInfoVersions, - ReplicaTransactionInfoVersions, Result as PluginResult, SlotStatus, + ReplicaEntryInfoVersions, ReplicaTransactionInfoVersions, Result as PluginResult, SlotStatus, }; use bs58; use crossbeam_channel::{bounded, Sender, TrySendError}; use jito_geyser_protos::solana::{ geyser::{ geyser_server::GeyserServer, AccountUpdate, BlockUpdate, SlotUpdate, SlotUpdateStatus, - TimestampedAccountUpdate, TimestampedBlockUpdate, TimestampedSlotUpdate, - TimestampedTransactionUpdate, TransactionUpdate, + TimestampedAccountUpdate, TimestampedBlockUpdate, TimestampedSlotEntryUpdate, + TimestampedSlotUpdate, TimestampedTransactionUpdate, TransactionUpdate, }, storage::confirmed_block::ConfirmedTransaction, }; @@ -44,6 +44,7 @@ pub struct PluginData { /// Where updates are piped thru to the grpc service. account_update_sender: Sender, slot_update_sender: Sender, + slot_entry_update_sender: Sender, block_update_sender: Sender, transaction_update_sender: Sender, @@ -72,6 +73,7 @@ pub struct PluginConfig { pub bind_address: String, pub account_update_buffer_size: usize, pub slot_update_buffer_size: usize, + pub slot_entry_update_buffer_size: usize, pub block_update_buffer_size: usize, pub transaction_update_buffer_size: usize, pub skip_startup_stream: Option, @@ -112,6 +114,8 @@ impl GeyserPlugin for GeyserGrpcPlugin { let highest_write_slot = Arc::new(AtomicU64::new(0)); let (account_update_sender, account_update_rx) = bounded(config.account_update_buffer_size); let (slot_update_sender, slot_update_rx) = bounded(config.slot_update_buffer_size); + let (slot_entry_update_sender, slot_entry_update_rx) = + bounded(config.slot_entry_update_buffer_size); let (block_update_sender, block_update_receiver) = bounded(config.block_update_buffer_size); let (transaction_update_sender, transaction_update_receiver) = @@ -121,6 +125,7 @@ impl GeyserPlugin for GeyserGrpcPlugin { config.geyser_service_config.clone(), account_update_rx, slot_update_rx, + slot_entry_update_rx, block_update_receiver, transaction_update_receiver, highest_write_slot.clone(), @@ -155,6 +160,7 @@ impl GeyserPlugin for GeyserGrpcPlugin { server_exit_sender: server_exit_tx, account_update_sender, slot_update_sender, + slot_entry_update_sender, block_update_sender, transaction_update_sender, highest_write_slot, @@ -488,6 +494,66 @@ impl GeyserPlugin for GeyserGrpcPlugin { fn transaction_notifications_enabled(&self) -> bool { true } + + fn entry_notifications_enabled(&self) -> bool { + true + } + + fn notify_entry(&self, entry: ReplicaEntryInfoVersions) -> PluginResult<()> { + let data = self.data.as_ref().expect("plugin must be initialized"); + + if data.ignore_startup_updates && !data.is_startup_completed.load(Ordering::Relaxed) { + return Ok(()); + } + + let slot_entry = utils::get_slot_and_index_from_replica_entry_info_versions(&entry); + + debug!( + "Updating slot entry {} at index {}", + slot_entry.slot, slot_entry.index + ); + + match data + .slot_entry_update_sender + .try_send(TimestampedSlotEntryUpdate { + ts: Some(prost_types::Timestamp::from(SystemTime::now())), + entry_update: Some(slot_entry), + }) { + Ok(_) => Ok(()), + Err(TrySendError::Full(_)) => { + warn!("slot_entry_update channel full, skipping"); + Ok(()) + } + Err(TrySendError::Disconnected(_)) => { + error!("slot entry info send error"); + Err(GeyserPluginError::SlotStatusUpdateError { + msg: "slot_entry_update channel disconnected, exiting".to_string(), + }) + } + } + } +} + +mod utils { + use agave_geyser_plugin_interface::geyser_plugin_interface::ReplicaEntryInfoVersions; + use jito_geyser_protos::solana::geyser::SlotEntryUpdate; + + pub fn get_slot_and_index_from_replica_entry_info_versions( + entry: &ReplicaEntryInfoVersions, + ) -> SlotEntryUpdate { + match entry { + ReplicaEntryInfoVersions::V0_0_1(entry_info) => SlotEntryUpdate { + slot: entry_info.slot, + index: entry_info.index as u64, + executed_transaction_count: entry_info.executed_transaction_count, + }, + ReplicaEntryInfoVersions::V0_0_2(entry_info) => SlotEntryUpdate { + slot: entry_info.slot, + index: entry_info.index as u64, + executed_transaction_count: entry_info.executed_transaction_count, + }, + } + } } #[no_mangle] diff --git a/server/src/server.rs b/server/src/server.rs index 6465e85..2fa4c79 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -15,8 +15,9 @@ use jito_geyser_protos::solana::geyser::{ GetHeartbeatIntervalResponse, Heartbeat, MaybePartialAccountUpdate, PartialAccountUpdate, SubscribeAccountUpdatesRequest, SubscribeBlockUpdatesRequest, SubscribePartialAccountUpdatesRequest, SubscribeProgramsUpdatesRequest, - SubscribeSlotUpdateRequest, SubscribeTransactionUpdatesRequest, TimestampedAccountUpdate, - TimestampedBlockUpdate, TimestampedSlotUpdate, TimestampedTransactionUpdate, + SubscribeSlotEntryUpdateRequest, SubscribeSlotUpdateRequest, + SubscribeTransactionUpdatesRequest, TimestampedAccountUpdate, TimestampedBlockUpdate, + TimestampedSlotEntryUpdate, TimestampedSlotUpdate, TimestampedTransactionUpdate, }; use log::*; use once_cell::sync::OnceCell; @@ -48,6 +49,7 @@ impl StreamClosedSender for SubscriptionClosedSender { type AccountUpdateSender = TokioSender>; type PartialAccountUpdateSender = TokioSender>; type SlotUpdateSender = TokioSender>; +type SlotEntryUpdateSender = TokioSender>; type TransactionUpdateSender = TokioSender>; type BlockUpdateSender = TokioSender>; @@ -144,6 +146,10 @@ struct SlotUpdateSubscription { subscription_tx: SlotUpdateSender, } +struct SlotEntryUpdateSubscription { + subscription_tx: SlotEntryUpdateSender, +} + impl ErrorStatusStreamer for SlotUpdateSubscription { fn stream_error(&self, status: Status) -> GeyserServiceResult<()> { self.subscription_tx @@ -157,6 +163,19 @@ impl ErrorStatusStreamer for SlotUpdateSubscription { } } +impl ErrorStatusStreamer for SlotEntryUpdateSubscription { + fn stream_error(&self, status: Status) -> GeyserServiceResult<()> { + self.subscription_tx + .try_send(Err(status)) + .map_err(|e| match e { + TokioTrySendError::Full(_) => GeyserServiceError::NotificationReceiverFull, + TokioTrySendError::Closed(_) => { + GeyserServiceError::NotificationReceiverDisconnected + } + }) + } +} + struct BlockUpdateSubscription { notification_sender: BlockUpdateSender, } @@ -212,6 +231,10 @@ enum SubscriptionAddedEvent { uuid: Uuid, notification_sender: SlotUpdateSender, }, + SlotEntryUpdateSubscription { + uuid: Uuid, + notification_sender: SlotEntryUpdateSender, + }, TransactionUpdateSubscription { uuid: Uuid, notification_sender: TransactionUpdateSender, @@ -234,6 +257,9 @@ impl Debug for SubscriptionAddedEvent { SubscriptionAddedEvent::SlotUpdateSubscription { uuid, .. } => { ("subscribe_slot_update".to_string(), uuid) } + SubscriptionAddedEvent::SlotEntryUpdateSubscription { uuid, .. } => { + ("subscribe_slot_entry_update".to_string(), uuid) + } SubscriptionAddedEvent::ProgramUpdateSubscription { uuid, .. } => { ("program_update_subscribe".to_string(), uuid) } @@ -264,6 +290,7 @@ enum SubscriptionClosedEvent { ProgramUpdateSubscription(Uuid), PartialAccountUpdateSubscription(Uuid), SlotUpdateSubscription(Uuid), + SlotEntryUpdateSubscription(Uuid), TransactionUpdateSubscription(Uuid), BlockUpdateSubscription(Uuid), } @@ -325,6 +352,8 @@ impl GeyserService { account_update_rx: Receiver, // Slot updates streamed from the validator. slot_update_rx: Receiver, + // Slot updates streamed from the validator. + slot_entry_update_rx: Receiver, // Block metadata receiver block_update_receiver: Receiver, // Transaction updates @@ -339,6 +368,7 @@ impl GeyserService { let t_hdl = Self::event_loop( account_update_rx, slot_update_rx, + slot_entry_update_rx, block_update_receiver, transaction_update_receiver, subscription_added_rx, @@ -365,9 +395,11 @@ impl GeyserService { /// 1. Add new subscriptions. /// 2. Cleanup closed subscriptions. /// 3. Receive geyser events and stream them to subscribers. + #[allow(clippy::too_many_arguments)] fn event_loop( account_update_rx: Receiver, slot_update_rx: Receiver, + slot_entry_update_rx: Receiver, block_update_receiver: Receiver, transaction_update_receiver: Receiver, subscription_added_rx: Receiver, @@ -387,6 +419,7 @@ impl GeyserService { PartialAccountUpdateSubscription, > = HashMap::new(); let mut slot_update_subscriptions: HashMap = HashMap::new(); + let mut slot_entry_update_subscriptions: HashMap = HashMap::new(); let mut transaction_update_subscriptions: HashMap = HashMap::new(); let mut block_update_subscriptions: HashMap = HashMap::new(); @@ -400,14 +433,14 @@ impl GeyserService { } recv(subscription_added_rx) -> maybe_subscription_added => { info!("received new subscription"); - if let Err(e) = Self::handle_subscription_added(maybe_subscription_added, &mut account_update_subscriptions, &mut partial_account_update_subscriptions, &mut slot_update_subscriptions, &mut program_update_subscriptions, &mut transaction_update_subscriptions, &mut block_update_subscriptions) { + if let Err(e) = Self::handle_subscription_added(maybe_subscription_added, &mut account_update_subscriptions, &mut partial_account_update_subscriptions, &mut slot_update_subscriptions,&mut slot_entry_update_subscriptions, &mut program_update_subscriptions, &mut transaction_update_subscriptions, &mut block_update_subscriptions) { error!("error adding new subscription: {}", e); return; } }, recv(subscription_closed_rx) -> maybe_subscription_closed => { info!("closing subscription"); - if let Err(e) = Self::handle_subscription_closed(maybe_subscription_closed, &mut account_update_subscriptions, &mut partial_account_update_subscriptions, &mut slot_update_subscriptions, &mut program_update_subscriptions, &mut transaction_update_subscriptions, &mut block_update_subscriptions) { + if let Err(e) = Self::handle_subscription_closed(maybe_subscription_closed, &mut account_update_subscriptions, &mut partial_account_update_subscriptions, &mut slot_update_subscriptions, &mut slot_entry_update_subscriptions, &mut program_update_subscriptions, &mut transaction_update_subscriptions, &mut block_update_subscriptions) { error!("error closing existing subscription: {}", e); return; } @@ -438,6 +471,18 @@ impl GeyserService { }, } }, + recv(slot_entry_update_rx) -> maybe_slot_entry_update => { + debug!("received slot entry update"); + match Self::handle_slot_entry_update_event(maybe_slot_entry_update, &slot_entry_update_subscriptions) { + Err(e) => { + error!("error handling a slot entry update event: {}", e); + return; + }, + Ok(failed_subscription_ids) => { + Self::drop_subscriptions(&failed_subscription_ids, &mut slot_entry_update_subscriptions); + }, + } + }, recv(block_update_receiver) -> maybe_block_update => { debug!("received block update"); match Self::handle_block_update_event(maybe_block_update, &block_update_subscriptions) { @@ -510,11 +555,13 @@ impl GeyserService { } /// Handles adding new subscriptions. + #[allow(clippy::too_many_arguments)] fn handle_subscription_added( maybe_subscription_added: Result, account_update_subscriptions: &mut HashMap, partial_account_update_subscriptions: &mut HashMap, slot_update_subscriptions: &mut HashMap, + slot_entry_update_subscriptions: &mut HashMap, program_update_subscriptions: &mut HashMap, transaction_update_subscriptions: &mut HashMap, block_update_subscriptions: &mut HashMap, @@ -555,6 +602,13 @@ impl GeyserService { } => { slot_update_subscriptions.insert(uuid, SlotUpdateSubscription { subscription_tx }); } + SubscriptionAddedEvent::SlotEntryUpdateSubscription { + uuid, + notification_sender: subscription_tx, + } => { + slot_entry_update_subscriptions + .insert(uuid, SlotEntryUpdateSubscription { subscription_tx }); + } SubscriptionAddedEvent::ProgramUpdateSubscription { uuid, notification_sender, @@ -596,11 +650,13 @@ impl GeyserService { } /// Handles closing existing subscriptions. + #[allow(clippy::too_many_arguments)] fn handle_subscription_closed( maybe_subscription_closed: Result, account_update_subscriptions: &mut HashMap, partial_account_update_subscriptions: &mut HashMap, slot_update_subscriptions: &mut HashMap, + slot_entry_update_subscriptions: &mut HashMap, program_update_subscriptions: &mut HashMap, transaction_update_subscriptions: &mut HashMap, block_update_subscriptions: &mut HashMap, @@ -618,6 +674,9 @@ impl GeyserService { SubscriptionClosedEvent::SlotUpdateSubscription(subscription_id) => { let _ = slot_update_subscriptions.remove(&subscription_id); } + SubscriptionClosedEvent::SlotEntryUpdateSubscription(subscription_id) => { + let _ = slot_entry_update_subscriptions.remove(&subscription_id); + } SubscriptionClosedEvent::ProgramUpdateSubscription(subscription_id) => { let _ = program_update_subscriptions.remove(&subscription_id); } @@ -740,6 +799,30 @@ impl GeyserService { Ok(failed_subscription_ids) } + /// Streams slot updates to subscribers + /// Returns a vector of UUIDs that failed to send to due to the subscription being closed + fn handle_slot_entry_update_event( + maybe_slot_entry_update: Result, + slot_entry_update_subscriptions: &HashMap, + ) -> GeyserServiceResult> { + let slot_entry_update = maybe_slot_entry_update?; + let failed_subscription_ids = slot_entry_update_subscriptions + .iter() + .filter_map(|(uuid, sub)| { + if matches!( + sub.subscription_tx.try_send(Ok(slot_entry_update.clone())), + Err(TokioTrySendError::Closed(_)) + ) { + Some(*uuid) + } else { + None + } + }) + .collect(); + + Ok(failed_subscription_ids) + } + /// Drop broken connections. fn drop_subscriptions( subscription_ids: &[Uuid], @@ -941,6 +1024,46 @@ impl Geyser for GeyserService { Ok(resp) } + type SubscribeSlotEntryUpdatesStream = SubscriptionStream; + async fn subscribe_slot_entry_updates( + &self, + _request: Request, + ) -> Result, Status> { + let (subscription_tx, subscription_rx) = + channel(self.service_config.subscriber_buffer_size); + + let uuid = Uuid::new_v4(); + self.subscription_added_tx + .try_send(SubscriptionAddedEvent::SlotEntryUpdateSubscription { + uuid, + notification_sender: subscription_tx, + }) + .map_err(|e| { + error!( + "failed to add subscribe_slot_entry_updates subscription: {}", + e + ); + Status::internal("error adding subscription") + })?; + + let stream = SubscriptionStream::new( + subscription_rx, + uuid, + ( + self.subscription_closed_sender.clone(), + SubscriptionClosedEvent::SlotEntryUpdateSubscription(uuid), + ), + "subscribe_slot_entry_updates", + ); + let mut resp = Response::new(stream); + resp.metadata_mut().insert( + HIGHEST_WRITE_SLOT_HEADER, + MetadataValue::from(self.highest_write_slot.load(Ordering::Relaxed)), + ); + + Ok(resp) + } + type SubscribeTransactionUpdatesStream = SubscriptionStream; async fn subscribe_transaction_updates(