diff --git a/indexes/core/src/notification.rs b/indexes/core/src/notification.rs new file mode 100644 index 000000000..409e7670e --- /dev/null +++ b/indexes/core/src/notification.rs @@ -0,0 +1,128 @@ +use crate::indexed_utxos::{UtxoChanges, UtxoSetByScriptPublicKey}; +use derive_more::Display; +use kaspa_notify::{ + events::EventType, + full_featured, + notification::Notification as NotificationTrait, + subscription::{ + context::SubscriptionContext, + single::{OverallSubscription, UtxosChangedSubscription, VirtualChainChangedSubscription}, + Subscription, + }, +}; +use std::{collections::HashMap, sync::Arc}; + +full_featured! { +#[derive(Clone, Debug, Display)] +pub enum Notification { + #[display(fmt = "UtxosChanged notification")] + UtxosChanged(UtxosChangedNotification), + + #[display(fmt = "PruningPointUtxoSetOverride notification")] + PruningPointUtxoSetOverride(PruningPointUtxoSetOverrideNotification), +} +} + +impl NotificationTrait for Notification { + fn apply_overall_subscription(&self, subscription: &OverallSubscription, _context: &SubscriptionContext) -> Option { + match subscription.active() { + true => Some(self.clone()), + false => None, + } + } + + fn apply_virtual_chain_changed_subscription( + &self, + _subscription: &VirtualChainChangedSubscription, + _context: &SubscriptionContext, + ) -> Option { + Some(self.clone()) + } + + fn apply_utxos_changed_subscription( + &self, + subscription: &UtxosChangedSubscription, + context: &SubscriptionContext, + ) -> Option { + match subscription.active() { + true => { + let Self::UtxosChanged(notification) = self else { return None }; + notification.apply_utxos_changed_subscription(subscription, context).map(Self::UtxosChanged) + } + false => None, + } + } + + fn event_type(&self) -> EventType { + self.into() + } +} + +#[derive(Debug, Clone, Default)] +pub struct PruningPointUtxoSetOverrideNotification {} + +#[derive(Debug, Clone)] +pub struct UtxosChangedNotification { + pub added: Arc, + pub removed: Arc, +} + +impl From for UtxosChangedNotification { + fn from(item: UtxoChanges) -> Self { + Self { added: Arc::new(item.added), removed: Arc::new(item.removed) } + } +} + +impl UtxosChangedNotification { + pub fn from_utxos_changed(utxos_changed: UtxoChanges) -> Self { + Self { added: Arc::new(utxos_changed.added), removed: Arc::new(utxos_changed.removed) } + } + + pub(crate) fn apply_utxos_changed_subscription( + &self, + subscription: &UtxosChangedSubscription, + context: &SubscriptionContext, + ) -> Option { + if subscription.to_all() { + Some(self.clone()) + } else { + let added = Self::filter_utxo_set(&self.added, subscription, context); + let removed = Self::filter_utxo_set(&self.removed, subscription, context); + if added.is_empty() && removed.is_empty() { + None + } else { + Some(Self { added: Arc::new(added), removed: Arc::new(removed) }) + } + } + } + + fn filter_utxo_set( + utxo_set: &UtxoSetByScriptPublicKey, + subscription: &UtxosChangedSubscription, + context: &SubscriptionContext, + ) -> UtxoSetByScriptPublicKey { + // As an optimization, we iterate over the smaller set (O(n)) among the two below + // and check existence over the larger set (O(1)) + let mut result = HashMap::default(); + let subscription_data = subscription.data(); + if utxo_set.len() < subscription_data.len() { + { + utxo_set.iter().for_each(|(script_public_key, collection)| { + if subscription_data.contains(script_public_key, context) { + result.insert(script_public_key.clone(), collection.clone()); + } + }); + } + } else { + let tracker_data = context.address_tracker.data(); + subscription_data.iter().for_each(|index| { + if let Some(script_public_key) = tracker_data.get_index(*index) { + if let Some(collection) = utxo_set.get(script_public_key) { + result.insert(script_public_key.clone(), collection.clone()); + } + } + }); + } + result + } +} diff --git a/indexes/processor/src/service.rs b/indexes/processor/src/service.rs index 36e62f7bd..66a150849 100644 --- a/indexes/processor/src/service.rs +++ b/indexes/processor/src/service.rs @@ -11,7 +11,7 @@ use kaspa_notify::{ connection::ChannelType, events::{EventSwitches, EventType}, listener::ListenerLifespan, - scope::{ChainAcceptanceDataPrunedScope, PruningPointUtxoSetOverrideScope, UtxosChangedScope, VirtualChainChangedScope, PruningPointAdvancementScope}, + scope::{PruningPointUtxoSetOverrideScope, UtxosChangedScope, VirtualChainChangedScope, PruningPointAdvancementScope}, subscription::{context::SubscriptionContext, MutationPolicies, UtxosChangedMutationPolicy}, }; use kaspa_txindex::api::TxIndexProxy; diff --git a/kaspad/src/args.rs b/kaspad/src/args.rs index b0960f0ef..3ff349f40 100644 --- a/kaspad/src/args.rs +++ b/kaspad/src/args.rs @@ -110,7 +110,7 @@ impl Default for Args { outbound_target: 8, inbound_limit: 128, rpc_max_clients: 128, - max_tracked_addresses: 0, + max_tracked_addresses: Tracker::DEFAULT_MAX_ADDRESSES, enable_unsynced_mining: false, enable_mainnet_mining: true, testnet: false, diff --git a/kaspad/src/daemon.rs b/kaspad/src/daemon.rs index 64dee93ec..94fab3290 100644 --- a/kaspad/src/daemon.rs +++ b/kaspad/src/daemon.rs @@ -97,6 +97,9 @@ pub fn validate_args(args: &Args) -> ConsensusConfigResult<()> { if args.max_tracked_addresses > Tracker::MAX_ADDRESS_UPPER_BOUND { return Err(ConfigError::MaxTrackedAddressesTooHigh(Tracker::MAX_ADDRESS_UPPER_BOUND)); } + if args.max_tracked_addresses > Tracker::MAX_ADDRESS_UPPER_BOUND { + return Err(ConfigError::MaxTrackedAddressesTooHigh(Tracker::MAX_ADDRESS_UPPER_BOUND)); + } Ok(()) } diff --git a/rpc/wrpc/client/src/wasm.rs b/rpc/wrpc/client/src/wasm.rs index 112cffdaa..7128d2bd6 100644 --- a/rpc/wrpc/client/src/wasm.rs +++ b/rpc/wrpc/client/src/wasm.rs @@ -41,7 +41,7 @@ impl RpcClient { let url = if let Some(network_type) = network_type { Self::parse_url(url, encoding, network_type)? } else { url.to_string() }; let rpc_client = RpcClient { - client: Arc::new(KaspaRpcClient::new(encoding, url.as_str()).unwrap_or_else(|err| panic!("{err}"))), + client: Arc::new(KaspaRpcClient::new(encoding, url.as_str(), None).unwrap_or_else(|err| panic!("{err}"))), inner: Arc::new(Inner { notification_task: AtomicBool::new(false), notification_ctl: DuplexChannel::oneshot(), @@ -218,7 +218,7 @@ impl RpcClient { .into_iter() .map(|jsv| from_value(jsv).map_err(|err| JsError::new(&err.to_string()))) .collect::, JsError>>()?; - self.client.start_notify(ListenerId::default(), Scope::UtxosChanged(UtxosChangedScope { addresses })).await?; + self.client.start_notify(ListenerId::default(), UtxosChangedScope::new(addresses).into()).await?; Ok(()) } @@ -230,7 +230,7 @@ impl RpcClient { .into_iter() .map(|jsv| from_value(jsv).map_err(|err| JsError::new(&err.to_string()))) .collect::, JsError>>()?; - self.client.stop_notify(ListenerId::default(), Scope::UtxosChanged(UtxosChangedScope { addresses })).await?; + self.client.stop_notify(ListenerId::default(), UtxosChangedScope::new(addresses).into()).await?; Ok(()) }