From 89821df81203942a874e7647e3f6467685372f1d Mon Sep 17 00:00:00 2001 From: Tiram <18632023+tiram88@users.noreply.github.com> Date: Tue, 9 Apr 2024 12:21:24 +0200 Subject: [PATCH] Address Tracker for subscriptions to changed UTXOs notifications (#427) * Add an integration test covering UTXOs propagation * Refactor daemon_utxos_propagation_test() * Add heap profiling feature * Cover VirtualDaaScoreChanged notifications in test * Merge branch 'master' into rpc-memory-benchmark * Assert all changed UTXOs * Use UtxosChangedScope ctor * Make active non-blanket UtxosChanged subscription unique * Refactor broadcaster subscription unregistering * Add Display to Connection trait * Save creation of UtxosChanged address vec on listener unregistration * Reduce UtxosChangedSubscription memory footprint on mutation * Add gRPC client timeout to every request & remove tcp keep alive * Reconnect gRPC client on broken pipe * Disable gRPC server http2 keepalive * Add a test benchmarking UtxosChanged notifications memory footprint * Fix log * Merge branch 'master' into rpc-memory-benchmark * Remove tokio dependency in crate notify * Add a UtxosChanged mutation policy to the notification system * Share subscriptions between listeners and broadcasters * Replace UtxoAddress with Address * Refactor Single::mutate * Refactor `UtxosChangedSubscription` internals * Make `UtxosChangedSubscription::hash()` cheaper to compute * mimalloc * mimalloc disable purge_decommits * Add to simpa and integration tests * remove redundant unix and replace with win/linux/mac * Add comment * Add comment * Sort out features * Remove request timeout from `heap` feature * Enhance `Broadcaster` memory release and logs * Track global `UtxosChangedSubscription` count * Fix heap profile * Let the number of broadcasters in gRPC server be configurable * Identify an active and filtering UtxosChangedSubscription with a ListenerId * Give address.rs its folder * Address tracker, indexing and counting registrations * Add 
a sharable subscription context to the notification system * Use a subscription context in `Notification` trait and in `single::UtxosChangedSubscription` * Add an index counter and use short names * Use a subscription context in `compounded::UtxosChangedSubscription` and in `Subscription` trait * Rely on hash sets & maps instead of sorted vectors in single and compounded UtxosChanged subscriptions * fix lint * Add an optional maximum capacity to the address tracker * Introduce a mutation outcome * Remove unneeded CompoundedClone::clone_arc * Provide inner mutability to `Indexes` and `Counters` * Restore the filtering of UtxosChanged notifications based on compounded subscriptions in RPC core, gRPC server and wRPC server * Measure memory footprint of CounterMap * Extend `UtxosChangedSubscription` inner mutability to its state * Group all wildcard `UtxosChangedSubscription` in broadcaster plan * Have kaspad use a single `SubscriptionContext` * Add event_type() to Scope * Reduce the number of mutation clones * Log some memory stats in a file * Consume the address vector of UtxosChangedScope * Retain the original address vector of a UtxosChanged mutation along the full chain of notifiers up to the IndexProcessor * Enhance the termination of all gRPC server connections * Put `UtxosChangedSubscription` state and indexes under the same lock * Some Tracker lock fairness enhancements * Preallocate static UtxosChanged subscriptions * Address new lint rules * Move memory_monitor * Silent on simnet * Add a shutdown listener to `Core` * Add a `Task` trait and implement some tasks * New daemon memory benchmark running in its own child process * Refactor `ClientPool` with a start fn returning a vector of `JoinHandle` * Add start and shutdown signaling to `ClientPool` * Add full miner, tx sender, subscriber tasks and all their sub-tasks * Use the tasks in fn utxos_changed_subscriptions_client * Cleaning * Fix a rare case potentially preventing subscriber tasks to exit * Fill the 
mempool up to the target * Run actual memory benchmarks * Add a main task to `TasksRunner` * Move tasks * Move tasks (2) * Rename full to group * Rename full to group (2) * Fix cargo syntax error * Add a stopper status to some tasks * Let the main task run before adding sub tasks that need it alive * Mempool benchmark based on tasks * Small adjustments on the utxos changed subscribe benchmark * Prevent a race condition * Refactor * Move the core shutdown request signaling into `RpcCoreService` * Add a signal indicating the gRPC server has started * Recycle emptied tracker entries * Add `max-tracked-addresses` argument * Rename `UtxosChangedMutationPolicy` `AllOrNothing` to `Wildcard` * Cleaning: remove R&D code * Merge branch 'master' into address-tracker-subscriptions * Some comments and documentation * Use a preset listener id in direct mode * Add lower and upper bounds to the tracker max address count & change the default value * For each event type the notifier can have at most one subscriber * Add and document `GrpcClient::connect_with_args` * Some doc * Complete `UtxosChangedMutationPolicy` description * Validate --max-tracked-addresses argument * remove unused AddressesHash * fix minor warnings under `devnet-prealloc` feature code --- indexes/core/src/notification.rs | 128 +++++++++++++++++++++++++++++++ indexes/processor/src/service.rs | 2 +- kaspad/src/args.rs | 2 +- kaspad/src/daemon.rs | 3 + rpc/wrpc/client/src/wasm.rs | 6 +- 5 files changed, 136 insertions(+), 5 deletions(-) create mode 100644 indexes/core/src/notification.rs diff --git a/indexes/core/src/notification.rs b/indexes/core/src/notification.rs new file mode 100644 index 000000000..409e7670e --- /dev/null +++ b/indexes/core/src/notification.rs @@ -0,0 +1,128 @@ +use crate::indexed_utxos::{UtxoChanges, UtxoSetByScriptPublicKey}; +use derive_more::Display; +use kaspa_notify::{ + events::EventType, + full_featured, + notification::Notification as NotificationTrait, + subscription::{ + 
context::SubscriptionContext, + single::{OverallSubscription, UtxosChangedSubscription, VirtualChainChangedSubscription}, + Subscription, + }, +}; +use std::{collections::HashMap, sync::Arc}; + +full_featured! { +#[derive(Clone, Debug, Display)] +pub enum Notification { + #[display(fmt = "UtxosChanged notification")] + UtxosChanged(UtxosChangedNotification), + + #[display(fmt = "PruningPointUtxoSetOverride notification")] + PruningPointUtxoSetOverride(PruningPointUtxoSetOverrideNotification), +} +} + +impl NotificationTrait for Notification { + fn apply_overall_subscription(&self, subscription: &OverallSubscription, _context: &SubscriptionContext) -> Option<Self> { + match subscription.active() { + true => Some(self.clone()), + false => None, + } + } + + fn apply_virtual_chain_changed_subscription( + &self, + _subscription: &VirtualChainChangedSubscription, + _context: &SubscriptionContext, + ) -> Option<Self> { + Some(self.clone()) + } + + fn apply_utxos_changed_subscription( + &self, + subscription: &UtxosChangedSubscription, + context: &SubscriptionContext, + ) -> Option<Self> { + match subscription.active() { + true => { + let Self::UtxosChanged(notification) = self else { return None }; + notification.apply_utxos_changed_subscription(subscription, context).map(Self::UtxosChanged) + } + false => None, + } + } + + fn event_type(&self) -> EventType { + self.into() + } +} + +#[derive(Debug, Clone, Default)] +pub struct PruningPointUtxoSetOverrideNotification {} + +#[derive(Debug, Clone)] +pub struct UtxosChangedNotification { + pub added: Arc<UtxoSetByScriptPublicKey>, + pub removed: Arc<UtxoSetByScriptPublicKey>, +} + +impl From<UtxoChanges> for UtxosChangedNotification { + fn from(item: UtxoChanges) -> Self { + Self { added: Arc::new(item.added), removed: Arc::new(item.removed) } + } +} + +impl UtxosChangedNotification { + pub fn from_utxos_changed(utxos_changed: UtxoChanges) -> Self { + Self { added: Arc::new(utxos_changed.added), removed: Arc::new(utxos_changed.removed) } + } + + pub(crate) fn apply_utxos_changed_subscription( + &self, + 
subscription: &UtxosChangedSubscription, + context: &SubscriptionContext, + ) -> Option<Self> { + if subscription.to_all() { + Some(self.clone()) + } else { + let added = Self::filter_utxo_set(&self.added, subscription, context); + let removed = Self::filter_utxo_set(&self.removed, subscription, context); + if added.is_empty() && removed.is_empty() { + None + } else { + Some(Self { added: Arc::new(added), removed: Arc::new(removed) }) + } + } + } + + fn filter_utxo_set( + utxo_set: &UtxoSetByScriptPublicKey, + subscription: &UtxosChangedSubscription, + context: &SubscriptionContext, + ) -> UtxoSetByScriptPublicKey { + // As an optimization, we iterate over the smaller set (O(n)) among the two below + // and check existence over the larger set (O(1)) + let mut result = HashMap::default(); + let subscription_data = subscription.data(); + if utxo_set.len() < subscription_data.len() { + { + utxo_set.iter().for_each(|(script_public_key, collection)| { + if subscription_data.contains(script_public_key, context) { + result.insert(script_public_key.clone(), collection.clone()); + } + }); + } + } else { + let tracker_data = context.address_tracker.data(); + subscription_data.iter().for_each(|index| { + if let Some(script_public_key) = tracker_data.get_index(*index) { + if let Some(collection) = utxo_set.get(script_public_key) { + result.insert(script_public_key.clone(), collection.clone()); + } + } + }); + } + result + } +} diff --git a/indexes/processor/src/service.rs b/indexes/processor/src/service.rs index 36e62f7bd..66a150849 100644 --- a/indexes/processor/src/service.rs +++ b/indexes/processor/src/service.rs @@ -11,7 +11,7 @@ use kaspa_notify::{ connection::ChannelType, events::{EventSwitches, EventType}, listener::ListenerLifespan, - scope::{ChainAcceptanceDataPrunedScope, PruningPointUtxoSetOverrideScope, UtxosChangedScope, VirtualChainChangedScope, PruningPointAdvancementScope}, + scope::{PruningPointUtxoSetOverrideScope, UtxosChangedScope, VirtualChainChangedScope, 
PruningPointAdvancementScope}, subscription::{context::SubscriptionContext, MutationPolicies, UtxosChangedMutationPolicy}, }; use kaspa_txindex::api::TxIndexProxy; diff --git a/kaspad/src/args.rs b/kaspad/src/args.rs index b0960f0ef..3ff349f40 100644 --- a/kaspad/src/args.rs +++ b/kaspad/src/args.rs @@ -110,7 +110,7 @@ impl Default for Args { outbound_target: 8, inbound_limit: 128, rpc_max_clients: 128, - max_tracked_addresses: 0, + max_tracked_addresses: Tracker::DEFAULT_MAX_ADDRESSES, enable_unsynced_mining: false, enable_mainnet_mining: true, testnet: false, diff --git a/kaspad/src/daemon.rs b/kaspad/src/daemon.rs index 64dee93ec..94fab3290 100644 --- a/kaspad/src/daemon.rs +++ b/kaspad/src/daemon.rs @@ -97,3 +97,6 @@ pub fn validate_args(args: &Args) -> ConsensusConfigResult<()> { + if args.max_tracked_addresses > Tracker::MAX_ADDRESS_UPPER_BOUND { + return Err(ConfigError::MaxTrackedAddressesTooHigh(Tracker::MAX_ADDRESS_UPPER_BOUND)); + } Ok(()) } diff --git a/rpc/wrpc/client/src/wasm.rs b/rpc/wrpc/client/src/wasm.rs index 112cffdaa..7128d2bd6 100644 --- a/rpc/wrpc/client/src/wasm.rs +++ b/rpc/wrpc/client/src/wasm.rs @@ -41,7 +41,7 @@ impl RpcClient { let url = if let Some(network_type) = network_type { Self::parse_url(url, encoding, network_type)? 
} else { url.to_string() }; let rpc_client = RpcClient { - client: Arc::new(KaspaRpcClient::new(encoding, url.as_str()).unwrap_or_else(|err| panic!("{err}"))), + client: Arc::new(KaspaRpcClient::new(encoding, url.as_str(), None).unwrap_or_else(|err| panic!("{err}"))), inner: Arc::new(Inner { notification_task: AtomicBool::new(false), notification_ctl: DuplexChannel::oneshot(), @@ -218,7 +218,7 @@ impl RpcClient { .into_iter() .map(|jsv| from_value(jsv).map_err(|err| JsError::new(&err.to_string()))) .collect::<Result<Vec<Address>, JsError>>()?; - self.client.start_notify(ListenerId::default(), Scope::UtxosChanged(UtxosChangedScope { addresses })).await?; + self.client.start_notify(ListenerId::default(), UtxosChangedScope::new(addresses).into()).await?; Ok(()) } @@ -230,7 +230,7 @@ impl RpcClient { .into_iter() .map(|jsv| from_value(jsv).map_err(|err| JsError::new(&err.to_string()))) .collect::<Result<Vec<Address>, JsError>>()?; - self.client.stop_notify(ListenerId::default(), Scope::UtxosChanged(UtxosChangedScope { addresses })).await?; + self.client.stop_notify(ListenerId::default(), UtxosChangedScope::new(addresses).into()).await?; Ok(()) }