Skip to content

Commit

Permalink
Address Tracker for subscriptions to changed UTXOs notifications (kas…
Browse files Browse the repository at this point in the history
…panet#427)

* Add an integration test covering UTXOs propagation

* Refactor daemon_utxos_propagation_test()

* Add heap profiling feature

* Cover VirtualDaaScoreChanged notifications in test

* Merge branch 'master' into rpc-memory-benchmark

* Assert all changed UTXOs

* Use UtxosChangedScope ctor

* Make active non-blanket UtxosChanged subscription unique

* Refactor broadcaster subscription unregistering

* Add Display to Connection trait

* Save creation of UtxosChanged address vec on listener unregistration

* Reduce UtxosChangedSubscription memory footprint on mutation

* Add gRPC client timeout to every request & remove tpc keep alive

* Reconnect gRPC client on broken pipe

* Disable gRPC server http2 keepalive

* Add a test benchmarking UtxosChanged notifications memory footprint

* Fix log

* Merge branch 'master' into rpc-memory-benchmark

* Remove tokio dependency in crate notify

* Add a UtxosChanged mutation policy to the notification system

* Share subscriptions between listeners and broadcasters

* Replace UtxoAddress with Address

* Refactor Single::mutate

* Refactor `UtxosChangedSubscription` internals

* Make `UtxosChangedSubscription::hash()` cheaper to compute

* mimalloc

* mimalloc disable purge_decommits

* Add to simpa and integration tests

* remove redundant unix and replace with win/linux/mac

* Add comment

* Add comment

* Sort out features

* Remove request timeout from `heap` feature

* Enhance `Broadcaster` memory release and logs

* Track global `UtxosChangedSubscription` count

* Fix heap profile

* Let the number of broadcasters in gRPC server be configurable

* Identify an active and filtering UtxosChangedSubscription with a ListenerId

* Give address.rs its folder

* Address tracker, indexing and counting registrations

* Add a sharable subscription context to the notification system

* Use a subscription context in `Notification` trait and in `single::UtxosChangedSubscription`

* Add an index counter and use short names

* Use a subscription context in `compounded::UtxosChangedSubscription` and in `Subscription` trait

* Rely on hash sets & maps instead of sorted vectors in single and compounded UtxosChanged subscriptions

* fix lint

* Add an optional maximum capacity to the address tracker

* Introduce a mutation outcome

* Remove unneeded CompoundedClone::clone_arc

* Provide inner mutability to `Indexes` and `Counters`

* Restore the filtering of UtxosChanged notifications based on compounded subscriptions in RPC core,  gRPC server and wRPC server

* Measure memory footprint of CounterMap

* Extend `UtxosChangedSubscription` inner mutability to its state

* Group all wildcard `UtxosChangedSubscription` in broadcaster plan

* Have kaspad use a single `SubscriptionContext`

* Add event_type() to Scope

* Reduce the number of mutation clones

* Log some memory stats in a file

* Consume the address vector of UtxosChangedScope

* Retain the original address vector of a UtxosChanged mutation along the full chain of notifiers up to the IndexProcessor

* Enhance the termination of all gRPC server connections

* Put `UtxosChangedSubscription` state and indexes under the same lock

* Some Tracker lock fairness enhancements

* Preallocate static UtxosChanged subscriptions

* Address new lint rules

* Move memory_monitor

* Silent on simnet

* Add a shutdown listener to `Core`

* Add a `Task` trait and implement some tasks

* New daemon memory benchmark running in its own child process

* Refactor `ClientPool` with a start fn returning a vector of `JoinHandle`

* Add start and shutdown signaling to `ClientPool`

* Add full miner, tx sender, subscriber tasks and all their sub-tasks

* Use the tasks in fn utxos_changed_subscriptions_client

* Cleaning

* Fix a rare case potentially preventing subscriber tasks to exit

* Fill the mempool up to the target

* Run actual memory benchmarks

* Add a main task to `TasksRunner`

* Move tasks

* Move tasks (2)

* Rename full to group

* Rename full to group (2)

* Fix cargo syntax error

* Add a stopper status to some tasks

* Let the main task run before adding sub tasks that need it alive

* Mempool benchmark based on tasks

* Small adjustments on the utxos changed subscribe benchmark

* Prevent a race condition

* Refactor

* Move the core shutdown request signaling into `RpcCoreService`

* Add a signal indicating the gRPC server has started

* Recycle emptied tracker entries

* Add `max-tracked-addresses` argument

* Rename `UtxosChangedMutationPolicy` `AllOrNothing` to `Wildcard`

* Cleaning: remove R&D code

* Merge branch 'master' into address-tracker-subscriptions

* Some comments and documentation

* Use a preset listener id in direct mode

* Add lower and upper bounds to the tracker max address count & change the default value

* For each event type the notifier can have at most one subscriber

* Add and document `GrpcClient::connect_with_args`

* Some doc

* Complete `UtxosChangedMutationPolicy` description

* Validate --max-tracked-addresses argument

* remove unused AddressesHash

* fix minor warnings under `devnet-prealloc` feature code
  • Loading branch information
tiram88 authored Apr 9, 2024
1 parent 292d6cc commit 5c437cb
Show file tree
Hide file tree
Showing 96 changed files with 6,021 additions and 842 deletions.
18 changes: 18 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ members = [
"utils",
"utils/tower",
"rothschild",
"metrics/perf_monitor",
"metrics/core",
"metrics/perf_monitor",
"utils/alloc",
]

Expand Down
3 changes: 3 additions & 0 deletions consensus/core/src/errors/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ pub enum ConfigError {
#[error("Configuration: --ram-scale cannot be set above 10.0")]
RamScaleTooHigh,

#[error("Configuration: --max-tracked-addresses cannot be set above {0}")]
MaxTrackedAddressesTooHigh(usize),

#[cfg(feature = "devnet-prealloc")]
#[error("Cannot preallocate UTXOs on any network except devnet")]
PreallocUtxosOnNonDevnet,
Expand Down
15 changes: 12 additions & 3 deletions consensus/notify/src/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use kaspa_notify::{
full_featured,
notification::Notification as NotificationTrait,
subscription::{
context::SubscriptionContext,
single::{OverallSubscription, UtxosChangedSubscription, VirtualChainChangedSubscription},
Subscription,
},
Expand Down Expand Up @@ -45,14 +46,18 @@ pub enum Notification {
}

impl NotificationTrait for Notification {
fn apply_overall_subscription(&self, subscription: &OverallSubscription) -> Option<Self> {
fn apply_overall_subscription(&self, subscription: &OverallSubscription, _context: &SubscriptionContext) -> Option<Self> {
match subscription.active() {
true => Some(self.clone()),
false => None,
}
}

fn apply_virtual_chain_changed_subscription(&self, subscription: &VirtualChainChangedSubscription) -> Option<Self> {
fn apply_virtual_chain_changed_subscription(
&self,
subscription: &VirtualChainChangedSubscription,
_context: &SubscriptionContext,
) -> Option<Self> {
match subscription.active() {
true => {
// If the subscription excludes accepted transaction ids and the notification includes some
Expand All @@ -72,7 +77,11 @@ impl NotificationTrait for Notification {
}
}

fn apply_utxos_changed_subscription(&self, _subscription: &UtxosChangedSubscription) -> Option<Self> {
fn apply_utxos_changed_subscription(
&self,
_subscription: &UtxosChangedSubscription,
_context: &SubscriptionContext,
) -> Option<Self> {
// No effort is made here to apply the subscription addresses.
// This will be achieved farther along the notification backbone.
Some(self.clone())
Expand Down
18 changes: 16 additions & 2 deletions consensus/notify/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use kaspa_core::{
use kaspa_notify::{
events::{EventSwitches, EVENT_TYPE_ARRAY},
subscriber::Subscriber,
subscription::{context::SubscriptionContext, MutationPolicies, UtxosChangedMutationPolicy},
};
use kaspa_utils::triggers::SingleTrigger;
use std::sync::Arc;
Expand All @@ -24,11 +25,24 @@ pub struct NotifyService {
}

impl NotifyService {
pub fn new(root: Arc<ConsensusNotificationRoot>, notification_receiver: Receiver<Notification>) -> Self {
pub fn new(
root: Arc<ConsensusNotificationRoot>,
notification_receiver: Receiver<Notification>,
subscription_context: SubscriptionContext,
) -> Self {
let root_events: EventSwitches = EVENT_TYPE_ARRAY[..].into();
let collector = Arc::new(ConsensusCollector::new(NOTIFY_SERVICE, notification_receiver, Arc::new(ConsensusConverter::new())));
let subscriber = Arc::new(Subscriber::new(NOTIFY_SERVICE, root_events, root, 0));
let notifier = Arc::new(ConsensusNotifier::new(NOTIFY_SERVICE, root_events, vec![collector], vec![subscriber], 1));
let policies = MutationPolicies::new(UtxosChangedMutationPolicy::Wildcard);
let notifier = Arc::new(ConsensusNotifier::new(
NOTIFY_SERVICE,
root_events,
vec![collector],
vec![subscriber],
subscription_context,
1,
policies,
));
Self { notifier, shutdown: SingleTrigger::default() }
}

Expand Down
5 changes: 3 additions & 2 deletions consensus/src/consensus/test_consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use kaspa_consensusmanager::{ConsensusFactory, ConsensusInstance, DynConsensusCt
use kaspa_core::{core::Core, service::Service};
use kaspa_database::utils::DbLifetime;
use kaspa_hashes::Hash;
use kaspa_notify::subscription::context::SubscriptionContext;
use parking_lot::RwLock;

use kaspa_database::create_temp_db;
Expand Down Expand Up @@ -66,9 +67,9 @@ impl TestConsensus {
}

/// Creates a test consensus instance based on `config` with a temp DB and the provided `notification_sender`
pub fn with_notifier(config: &Config, notification_sender: Sender<Notification>) -> Self {
pub fn with_notifier(config: &Config, notification_sender: Sender<Notification>, context: SubscriptionContext) -> Self {
let (db_lifetime, db) = create_temp_db!(ConnBuilder::default().with_files_limit(10));
let notification_root = Arc::new(ConsensusNotificationRoot::new(notification_sender));
let notification_root = Arc::new(ConsensusNotificationRoot::with_context(notification_sender, context));
let counters = Default::default();
let tx_script_cache_counters = Default::default();
let consensus = Arc::new(Consensus::new(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ impl TransactionValidator {
// Storage mass hardfork was activated
self.check_mass_commitment(tx)?;

if pov_daa_score < self.storage_mass_activation_daa_score + 10 {
if pov_daa_score < self.storage_mass_activation_daa_score + 10 && self.storage_mass_activation_daa_score > 0 {
warn!("--------- Storage mass hardfork was activated successfully!!! --------- (DAA score: {})", pov_daa_score);
}
}
Expand Down
4 changes: 1 addition & 3 deletions core/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,11 @@ impl Core {

impl Shutdown for Core {
fn shutdown(self: &Arc<Core>) {
let keep_running = self.keep_running.load(Ordering::SeqCst);
if !keep_running {
if self.keep_running.compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst).is_err() {
return;
}

trace!("signaling core shutdown...");
self.keep_running.store(false, Ordering::SeqCst);

{
for service in self.services.lock().unwrap().iter() {
Expand Down
4 changes: 4 additions & 0 deletions core/src/task/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ impl AsyncRuntime {
self.services.lock().unwrap().push(service);
}

pub fn find(&self, ident: &'static str) -> Option<Arc<dyn AsyncService>> {
self.services.lock().unwrap().iter().find(|s| (*s).clone().ident() == ident).cloned()
}

pub fn init(self: Arc<AsyncRuntime>, core: Arc<Core>) -> Vec<ThreadJoinHandle<()>> {
trace!("initializing async-runtime service");
vec![thread::Builder::new().name(Self::IDENT.to_string()).spawn(move || self.worker(core)).unwrap()]
Expand Down
57 changes: 39 additions & 18 deletions indexes/core/src/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use kaspa_notify::{
full_featured,
notification::Notification as NotificationTrait,
subscription::{
context::SubscriptionContext,
single::{OverallSubscription, UtxosChangedSubscription, VirtualChainChangedSubscription},
Subscription,
},
Expand All @@ -23,22 +24,30 @@ pub enum Notification {
}

impl NotificationTrait for Notification {
fn apply_overall_subscription(&self, subscription: &OverallSubscription) -> Option<Self> {
fn apply_overall_subscription(&self, subscription: &OverallSubscription, _context: &SubscriptionContext) -> Option<Self> {
match subscription.active() {
true => Some(self.clone()),
false => None,
}
}

fn apply_virtual_chain_changed_subscription(&self, _subscription: &VirtualChainChangedSubscription) -> Option<Self> {
fn apply_virtual_chain_changed_subscription(
&self,
_subscription: &VirtualChainChangedSubscription,
_context: &SubscriptionContext,
) -> Option<Self> {
Some(self.clone())
}

fn apply_utxos_changed_subscription(&self, subscription: &UtxosChangedSubscription) -> Option<Self> {
fn apply_utxos_changed_subscription(
&self,
subscription: &UtxosChangedSubscription,
context: &SubscriptionContext,
) -> Option<Self> {
match subscription.active() {
true => {
let Self::UtxosChanged(notification) = self else { return None };
notification.apply_utxos_changed_subscription(subscription).map(Self::UtxosChanged)
notification.apply_utxos_changed_subscription(subscription, context).map(Self::UtxosChanged)
}
false => None,
}
Expand Down Expand Up @@ -69,12 +78,16 @@ impl UtxosChangedNotification {
Self { added: Arc::new(utxos_changed.added), removed: Arc::new(utxos_changed.removed) }
}

pub(crate) fn apply_utxos_changed_subscription(&self, subscription: &UtxosChangedSubscription) -> Option<Self> {
pub(crate) fn apply_utxos_changed_subscription(
&self,
subscription: &UtxosChangedSubscription,
context: &SubscriptionContext,
) -> Option<Self> {
if subscription.to_all() {
Some(self.clone())
} else {
let added = Self::filter_utxo_set(&self.added, subscription);
let removed = Self::filter_utxo_set(&self.removed, subscription);
let added = Self::filter_utxo_set(&self.added, subscription, context);
let removed = Self::filter_utxo_set(&self.removed, subscription, context);
if added.is_empty() && removed.is_empty() {
None
} else {
Expand All @@ -83,24 +96,32 @@ impl UtxosChangedNotification {
}
}

fn filter_utxo_set(utxo_set: &UtxoSetByScriptPublicKey, subscription: &UtxosChangedSubscription) -> UtxoSetByScriptPublicKey {
fn filter_utxo_set(
utxo_set: &UtxoSetByScriptPublicKey,
subscription: &UtxosChangedSubscription,
context: &SubscriptionContext,
) -> UtxoSetByScriptPublicKey {
// As an optimization, we iterate over the smaller set (O(n)) among the two below
// and check existence over the larger set (O(1))
let mut result = HashMap::default();
if utxo_set.len() < subscription.addresses().len() {
utxo_set.iter().for_each(|(script_public_key, collection)| {
if subscription.addresses().contains_key(script_public_key) {
result.insert(script_public_key.clone(), collection.clone());
}
});
let subscription_data = subscription.data();
if utxo_set.len() < subscription_data.len() {
{
utxo_set.iter().for_each(|(script_public_key, collection)| {
if subscription_data.contains(script_public_key, context) {
result.insert(script_public_key.clone(), collection.clone());
}
});
}
} else {
subscription.addresses().iter().filter(|(script_public_key, _)| utxo_set.contains_key(script_public_key)).for_each(
|(script_public_key, _)| {
let tracker_data = context.address_tracker.data();
subscription_data.iter().for_each(|index| {
if let Some(script_public_key) = tracker_data.get_index(*index) {
if let Some(collection) = utxo_set.get(script_public_key) {
result.insert(script_public_key.clone(), collection.clone());
}
},
);
}
});
}
result
}
Expand Down
Loading

0 comments on commit 5c437cb

Please sign in to comment.