Skip to content

Commit

Permalink
Address Tracker for subscriptions to changed UTXOs notifications (kas…
Browse files Browse the repository at this point in the history
…panet#427)

* Add an integration test covering UTXOs propagation

* Refactor daemon_utxos_propagation_test()

* Add heap profiling feature

* Cover VirtualDaaScoreChanged notifications in test

* Merge branch 'master' into rpc-memory-benchmark

* Assert all changed UTXOs

* Use UtxosChangedScope ctor

* Make active non-blanket UtxosChanged subscription unique

* Refactor broadcaster subscription unregistering

* Add Display to Connection trait

* Save creation of UtxosChanged address vec on listener unregistration

* Reduce UtxosChangedSubscription memory footprint on mutation

* Add gRPC client timeout to every request & remove tpc keep alive

* Reconnect gRPC client on broken pipe

* Disable gRPC server http2 keepalive

* Add a test benchmarking UtxosChanged notifications memory footprint

* Fix log

* Merge branch 'master' into rpc-memory-benchmark

* Remove tokio dependency in crate notify

* Add a UtxosChanged mutation policy to the notification system

* Share subscriptions between listeners and broadcasters

* Replace UtxoAddress with Address

* Refactor Single::mutate

* Refactor `UtxosChangedSubscription` internals

* Make `UtxosChangedSubscription::hash()` cheaper to compute

* mimalloc

* mimalloc disable purge_decommits

* Add to simpa and integration tests

* remove redundant unix and replace with win/linux/mac

* Add comment

* Add comment

* Sort out features

* Remove request timeout from `heap` feature

* Enhance `Broadcaster` memory release and logs

* Track global `UtxosChangedSubscription` count

* Fix heap profile

* Let the number of broadcasters in gRPC server be configurable

* Identify an active and filtering UtxosChangedSubscription with a ListenerId

* Give address.rs its folder

* Address tracker, indexing and counting registrations

* Add a sharable subscription context to the notification system

* Use a subscription context in `Notification` trait and in `single::UtxosChangedSubscription`

* Add an index counter and use short names

* Use a subscription context in `compounded::UtxosChangedSubscription` and in `Subscription` trait

* Rely on hash sets & maps instead of sorted vectors in single and compounded UtxosChanged subscriptions

* fix lint

* Add an optional maximum capacity to the address tracker

* Introduce a mutation outcome

* Remove unneeded CompoundedClone::clone_arc

* Provide inner mutability to `Indexes` and `Counters`

* Restore the filtering of UtxosChanged notifications based on compounded subscriptions in RPC core,  gRPC server and wRPC server

* Measure memory footprint of CounterMap

* Extend `UtxosChangedSubscription` inner mutability to its state

* Group all wildcard `UtxosChangedSubscription` in broadcaster plan

* Have kaspad use a single `SubscriptionContext`

* Add event_type() to Scope

* Reduce the number of mutation clones

* Log some memory stats in a file

* Consume the address vector of UtxosChangedScope

* Retain the original address vector of a UtxosChanged mutation along the full chain of notifiers up to the IndexProcessor

* Enhance the termination of all gRPC server connections

* Put `UtxosChangedSubscription` state and indexes under the same lock

* Some Tracker lock fairness enhancements

* Preallocate static UtxosChanged subscriptions

* Address new lint rules

* Move memory_monitor

* Silent on simnet

* Add a shutdown listener to `Core`

* Add a `Task` trait and implement some tasks

* New daemon memory benchmark running in its own child process

* Refactor `ClientPool` with a start fn returning a vector of `JoinHandle`

* Add start and shutdown signaling to `ClientPool`

* Add full miner, tx sender, subscriber tasks and all their sub-tasks

* Use the tasks in fn utxos_changed_subscriptions_client

* Cleaning

* Fix a rare case potentially preventing subscriber tasks to exit

* Fill the mempool up to the target

* Run actual memory benchmarks

* Add a main task to `TasksRunner`

* Move tasks

* Move tasks (2)

* Rename full to group

* Rename full to group (2)

* Fix cargo syntax error

* Add a stopper status to some tasks

* Let the main task run before adding sub tasks that need it alive

* Mempool benchmark based on tasks

* Small adjustments on the utxos changed subscribe benchmark

* Prevent a race condition

* Refactor

* Move the core shutdown request signaling into `RpcCoreService`

* Add a signal indicating the gRPC server has started

* Recycle emptied tracker entries

* Add `max-tracked-addresses` argument

* Rename `UtxosChangedMutationPolicy` `AllOrNothing` to `Wildcard`

* Cleaning: remove R&D code

* Merge branch 'master' into address-tracker-subscriptions

* Some comments and documentation

* Use a preset listener id in direct mode

* Add lower and upper bounds to the tracker max address count & change the default value

* For each event type the notifier can have at most one subscriber

* Add and document `GrpcClient::connect_with_args`

* Some doc

* Complete `UtxosChangedMutationPolicy` description

* Validate --max-tracked-addresses argument

* remove unused AddressesHash

* fix minor warnings under `devnet-prealloc` feature code
  • Loading branch information
tiram88 authored and D-Stacks committed Jul 12, 2024
1 parent d856545 commit 89821df
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 5 deletions.
128 changes: 128 additions & 0 deletions indexes/core/src/notification.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
use crate::indexed_utxos::{UtxoChanges, UtxoSetByScriptPublicKey};
use derive_more::Display;
use kaspa_notify::{
events::EventType,
full_featured,
notification::Notification as NotificationTrait,
subscription::{
context::SubscriptionContext,
single::{OverallSubscription, UtxosChangedSubscription, VirtualChainChangedSubscription},
Subscription,
},
};
use std::{collections::HashMap, sync::Arc};

full_featured! {
#[derive(Clone, Debug, Display)]
pub enum Notification {
#[display(fmt = "UtxosChanged notification")]
UtxosChanged(UtxosChangedNotification),

#[display(fmt = "PruningPointUtxoSetOverride notification")]
PruningPointUtxoSetOverride(PruningPointUtxoSetOverrideNotification),
}
}

impl NotificationTrait for Notification {
fn apply_overall_subscription(&self, subscription: &OverallSubscription, _context: &SubscriptionContext) -> Option<Self> {
match subscription.active() {
true => Some(self.clone()),
false => None,
}
}

fn apply_virtual_chain_changed_subscription(
&self,
_subscription: &VirtualChainChangedSubscription,
_context: &SubscriptionContext,
) -> Option<Self> {
Some(self.clone())
}

fn apply_utxos_changed_subscription(
&self,
subscription: &UtxosChangedSubscription,
context: &SubscriptionContext,
) -> Option<Self> {
match subscription.active() {
true => {
let Self::UtxosChanged(notification) = self else { return None };
notification.apply_utxos_changed_subscription(subscription, context).map(Self::UtxosChanged)
}
false => None,
}
}

fn event_type(&self) -> EventType {
self.into()
}
}

#[derive(Debug, Clone, Default)]
pub struct PruningPointUtxoSetOverrideNotification {}

#[derive(Debug, Clone)]
pub struct UtxosChangedNotification {
pub added: Arc<UtxoSetByScriptPublicKey>,
pub removed: Arc<UtxoSetByScriptPublicKey>,
}

impl From<UtxoChanges> for UtxosChangedNotification {
fn from(item: UtxoChanges) -> Self {
Self { added: Arc::new(item.added), removed: Arc::new(item.removed) }
}
}

impl UtxosChangedNotification {
pub fn from_utxos_changed(utxos_changed: UtxoChanges) -> Self {
Self { added: Arc::new(utxos_changed.added), removed: Arc::new(utxos_changed.removed) }
}

pub(crate) fn apply_utxos_changed_subscription(
&self,
subscription: &UtxosChangedSubscription,
context: &SubscriptionContext,
) -> Option<Self> {
if subscription.to_all() {
Some(self.clone())
} else {
let added = Self::filter_utxo_set(&self.added, subscription, context);
let removed = Self::filter_utxo_set(&self.removed, subscription, context);
if added.is_empty() && removed.is_empty() {
None
} else {
Some(Self { added: Arc::new(added), removed: Arc::new(removed) })
}
}
}

fn filter_utxo_set(
utxo_set: &UtxoSetByScriptPublicKey,
subscription: &UtxosChangedSubscription,
context: &SubscriptionContext,
) -> UtxoSetByScriptPublicKey {
// As an optimization, we iterate over the smaller set (O(n)) among the two below
// and check existence over the larger set (O(1))
let mut result = HashMap::default();
let subscription_data = subscription.data();
if utxo_set.len() < subscription_data.len() {
{
utxo_set.iter().for_each(|(script_public_key, collection)| {
if subscription_data.contains(script_public_key, context) {
result.insert(script_public_key.clone(), collection.clone());
}
});
}
} else {
let tracker_data = context.address_tracker.data();
subscription_data.iter().for_each(|index| {
if let Some(script_public_key) = tracker_data.get_index(*index) {
if let Some(collection) = utxo_set.get(script_public_key) {
result.insert(script_public_key.clone(), collection.clone());
}
}
});
}
result
}
}
2 changes: 1 addition & 1 deletion indexes/processor/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use kaspa_notify::{
connection::ChannelType,
events::{EventSwitches, EventType},
listener::ListenerLifespan,
scope::{ChainAcceptanceDataPrunedScope, PruningPointUtxoSetOverrideScope, UtxosChangedScope, VirtualChainChangedScope, PruningPointAdvancementScope},
scope::{PruningPointUtxoSetOverrideScope, UtxosChangedScope, VirtualChainChangedScope, PruningPointAdvancementScope},
subscription::{context::SubscriptionContext, MutationPolicies, UtxosChangedMutationPolicy},
};
use kaspa_txindex::api::TxIndexProxy;
Expand Down
2 changes: 1 addition & 1 deletion kaspad/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ impl Default for Args {
outbound_target: 8,
inbound_limit: 128,
rpc_max_clients: 128,
max_tracked_addresses: 0,
max_tracked_addresses: Tracker::DEFAULT_MAX_ADDRESSES,
enable_unsynced_mining: false,
enable_mainnet_mining: true,
testnet: false,
Expand Down
3 changes: 3 additions & 0 deletions kaspad/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ pub fn validate_args(args: &Args) -> ConsensusConfigResult<()> {
if args.max_tracked_addresses > Tracker::MAX_ADDRESS_UPPER_BOUND {
return Err(ConfigError::MaxTrackedAddressesTooHigh(Tracker::MAX_ADDRESS_UPPER_BOUND));
}
if args.max_tracked_addresses > Tracker::MAX_ADDRESS_UPPER_BOUND {
return Err(ConfigError::MaxTrackedAddressesTooHigh(Tracker::MAX_ADDRESS_UPPER_BOUND));
}
Ok(())
}

Expand Down
6 changes: 3 additions & 3 deletions rpc/wrpc/client/src/wasm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl RpcClient {
let url = if let Some(network_type) = network_type { Self::parse_url(url, encoding, network_type)? } else { url.to_string() };

let rpc_client = RpcClient {
client: Arc::new(KaspaRpcClient::new(encoding, url.as_str()).unwrap_or_else(|err| panic!("{err}"))),
client: Arc::new(KaspaRpcClient::new(encoding, url.as_str(), None).unwrap_or_else(|err| panic!("{err}"))),
inner: Arc::new(Inner {
notification_task: AtomicBool::new(false),
notification_ctl: DuplexChannel::oneshot(),
Expand Down Expand Up @@ -218,7 +218,7 @@ impl RpcClient {
.into_iter()
.map(|jsv| from_value(jsv).map_err(|err| JsError::new(&err.to_string())))
.collect::<std::result::Result<Vec<Address>, JsError>>()?;
self.client.start_notify(ListenerId::default(), Scope::UtxosChanged(UtxosChangedScope { addresses })).await?;
self.client.start_notify(ListenerId::default(), UtxosChangedScope::new(addresses).into()).await?;
Ok(())
}

Expand All @@ -230,7 +230,7 @@ impl RpcClient {
.into_iter()
.map(|jsv| from_value(jsv).map_err(|err| JsError::new(&err.to_string())))
.collect::<std::result::Result<Vec<Address>, JsError>>()?;
self.client.stop_notify(ListenerId::default(), Scope::UtxosChanged(UtxosChangedScope { addresses })).await?;
self.client.stop_notify(ListenerId::default(), UtxosChangedScope::new(addresses).into()).await?;
Ok(())
}

Expand Down

0 comments on commit 89821df

Please sign in to comment.