Address Tracker for subscriptions to changed UTXOs notifications #427
Still reviewing but I thought I should get some of these out.
Just a few initial comments on inner tracker logic. Still reviewing otherwise.
Lol, I had comments here but it looks like I never submitted them.
Approving this, but there are a few technical debts to address in following PRs:
- Change `UtxosChangedScope` to an enum and allow a variant with a vector of indexes. This way internal mutations can avoid the translation back and forth to an address. This is both an optimization and a way to avoid the hacked usage of the mainnet address prefix in the translation
- In `GrpcClient`, obtain the network from the server on start-up and use it to maintain the correct address prefix (important for re-registration on client re-connection). This too will avoid the usage of mainnet address prefix during re-connection
- In `AddressTracker`, use a Guard-style mechanism to manage registrations. The guard will allow modifying the set of registered indexes, but most importantly, will unregister on `Drop`, which makes it safer and will avoid future errors
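The guard idea in the last point could look roughly like the sketch below. It is only an illustration: `Tracker`, `RegistrationGuard` and their methods are hypothetical names, and the per-index counters stand in for whatever `AddressTracker` actually keeps internally.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for the address tracker: one reference count per address index.
#[derive(Default)]
struct Tracker {
    counts: Mutex<HashMap<u32, usize>>,
}

impl Tracker {
    /// Number of registrations currently held for `index`.
    fn count(&self, index: u32) -> usize {
        self.counts.lock().unwrap().get(&index).copied().unwrap_or(0)
    }
}

/// Hypothetical guard owning the registrations made through it.
struct RegistrationGuard {
    tracker: Arc<Tracker>,
    indexes: Vec<u32>,
}

impl RegistrationGuard {
    fn new(tracker: Arc<Tracker>) -> Self {
        Self { tracker, indexes: Vec::new() }
    }

    /// Registers one more index and remembers it for later release.
    fn register(&mut self, index: u32) {
        *self.tracker.counts.lock().unwrap().entry(index).or_insert(0) += 1;
        self.indexes.push(index);
    }
}

impl Drop for RegistrationGuard {
    /// Releases every registration still held, so a forgotten unregister cannot leak a count.
    fn drop(&mut self) {
        let mut counts = self.tracker.counts.lock().unwrap();
        for index in self.indexes.drain(..) {
            let remaining = counts.get_mut(&index).map(|count| {
                *count -= 1;
                *count
            });
            if remaining == Some(0) {
                counts.remove(&index);
            }
        }
    }
}

fn main() {
    let tracker = Arc::new(Tracker::default());
    {
        let mut guard = RegistrationGuard::new(tracker.clone());
        guard.register(7);
        assert_eq!(tracker.count(7), 1);
    } // the guard is dropped here and unregisters index 7
    assert_eq!(tracker.count(7), 0);
}
```

The point of the pattern is that unregistration is tied to the guard's lifetime rather than to an explicit call that every code path has to remember.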
…panet#427) * Add an integration test covering UTXOs propagation * Refactor daemon_utxos_propagation_test() * Add heap profiling feature * Cover VirtualDaaScoreChanged notifications in test * Merge branch 'master' into rpc-memory-benchmark * Assert all changed UTXOs * Use UtxosChangedScope ctor * Make active non-blanket UtxosChanged subscription unique * Refactor broadcaster subscription unregistering * Add Display to Connection trait * Save creation of UtxosChanged address vec on listener unregistration * Reduce UtxosChangedSubscription memory footprint on mutation * Add gRPC client timeout to every request & remove tpc keep alive * Reconnect gRPC client on broken pipe * Disable gRPC server http2 keepalive * Add a test benchmarking UtxosChanged notifications memory footprint * Fix log * Merge branch 'master' into rpc-memory-benchmark * Remove tokio dependency in crate notify * Add a UtxosChanged mutation policy to the notification system * Share subscriptions between listeners and broadcasters * Replace UtxoAddress with Address * Refactor Single::mutate * Refactor `UtxosChangedSubscription` internals * Make `UtxosChangedSubscription::hash()` cheaper to compute * mimalloc * mimalloc disable purge_decommits * Add to simpa and integration tests * remove redundant unix and replace with win/linux/mac * Add comment * Add comment * Sort out features * Remove request timeout from `heap` feature * Enhance `Broadcaster` memory release and logs * Track global `UtxosChangedSubscription` count * Fix heap profile * Let the number of broadcasters in gRPC server be configurable * Identify an active and filtering UtxosChangedSubscription with a ListenerId * Give address.rs its folder * Address tracker, indexing and counting registrations * Add a sharable subscription context to the notification system * Use a subscription context in `Notification` trait and in `single::UtxosChangedSubscription` * Add an index counter and use short names * Use a subscription context in 
`compounded::UtxosChangedSubscription` and in `Subscription` trait * Rely on hash sets & maps instead of sorted vectors in single and compounded UtxosChanged subscriptions * fix lint * Add an optional maximum capacity to the address tracker * Introduce a mutation outcome * Remove unneeded CompoundedClone::clone_arc * Provide inner mutability to `Indexes` and `Counters` * Restore the filtering of UtxosChanged notifications based on compounded subscriptions in RPC core, gRPC server and wRPC server * Measure memory footprint of CounterMap * Extend `UtxosChangedSubscription` inner mutability to its state * Group all wildcard `UtxosChangedSubscription` in broadcaster plan * Have kaspad use a single `SubscriptionContext` * Add event_type() to Scope * Reduce the number of mutation clones * Log some memory stats in a file * Consume the address vector of UtxosChangedScope * Retain the original address vector of a UtxosChanged mutation along the full chain of notifiers up to the IndexProcessor * Enhance the termination of all gRPC server connections * Put `UtxosChangedSubscription` state and indexes under the same lock * Some Tracker lock fairness enhancements * Preallocate static UtxosChanged subscriptions * Address new lint rules * Move memory_monitor * Silent on simnet * Add a shutdown listener to `Core` * Add a `Task` trait and implement some tasks * New daemon memory benchmark running in its own child process * Refactor `ClientPool` with a start fn returning a vector of `JoinHandle` * Add start and shutdown signaling to `ClientPool` * Add full miner, tx sender, subscriber tasks and all their sub-tasks * Use the tasks in fn utxos_changed_subscriptions_client * Cleaning * Fix a rare case potentially preventing subscriber tasks to exit * Fill the mempool up to the target * Run actual memory benchmarks * Add a main task to `TasksRunner` * Move tasks * Move tasks (2) * Rename full to group * Rename full to group (2) * Fix cargo syntax error * Add a stopper status to some 
tasks * Let the main task run before adding sub tasks that need it alive * Mempool benchmark based on tasks * Small adjustments on the utxos changed subscribe benchmark * Prevent a race condition * Refactor * Move the core shutdown request signaling into `RpcCoreService` * Add a signal indicating the gRPC server has started * Recycle emptied tracker entries * Add `max-tracked-addresses` argument * Rename `UtxosChangedMutationPolicy` `AllOrNothing` to `Wildcard` * Cleaning: remove R&D code * Merge branch 'master' into address-tracker-subscriptions * Some comments and documentation * Use a preset listener id in direct mode * Add lower and upper bounds to the tracker max address count & change the default value * For each event type the notifier can have at most one subscriber * Add and document `GrpcClient::connect_with_args` * Some doc * Complete `UtxosChangedMutationPolicy` description * Validate --max-tracked-addresses argument * remove unused AddressesHash * fix minor warnings under `devnet-prealloc` feature code
Subscriptions to `UtxosChanged` notifications are getting a deep revamp, optimizing memory consumption (highly) and execution speed (moderately).
Address Tracker
The subscription sub-system uses a new ubiquitous `SubscriptionContext`, containing essentially an address `Tracker` whose role is to track addresses involved in `UtxosChangedSubscription`s, indexing them and counting references. All subscriptions now use address indexes instead of `ScriptPublicKey`s, reducing memory consumption significantly.
The `Tracker` features an optional maximum address count. When provided, it allows to
Kaspad Argument
The maximum number of tracked addresses can be set in the kaspad command with the argument `--max-tracked-addresses`, whose default value is 1,835,007.
The component used internally (an `IndexMap`) allocates maps with a capacity computed with the following formula:
The value passed by argument always gets expanded to the actual usable allocated capacity. For a target of 1M addresses the expanded capacity is 1,835,008. The tracker reserves the last entry for address recycling, so the actual maximum is 1,835,007.
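As an illustration only, the figures quoted above can be reproduced under the assumption that the map follows the common hashbrown-style allocation rule: a power-of-two number of buckets, filled at most to 7/8. The function names below are hypothetical and the rule is an assumption, not a quote of the tracker's code.

```rust
/// Expands a requested address count to the usable capacity of a table that keeps a
/// power-of-two number of buckets loaded at most to 7/8 (assumed allocation rule).
fn expanded_capacity(requested: usize) -> usize {
    let buckets = (requested * 8 / 7).next_power_of_two();
    buckets / 8 * 7
}

/// One entry is reserved for address recycling, so the usable maximum is one less.
fn max_tracked_addresses(requested: usize) -> usize {
    expanded_capacity(requested).saturating_sub(1)
}

fn main() {
    // Target of 1M addresses: 2^21 buckets, 7/8 of which are usable.
    println!("{}", expanded_capacity(1_000_000)); // prints 1835008
    println!("{}", max_tracked_addresses(1_000_000)); // prints 1835007
}
```

Under this assumption, any requested value between 917,505 and 1,835,008 expands to the same capacity, which is consistent with the default of 1,835,007 being the largest value reachable for a 1M target.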
Subscription propagation and memory optimizations
Every `Notifier` is provided a new `UtxosChangedMutationPolicy` that defines how incoming `UtxosChangedScope` mutations are propagated to its parents. The policy supports two modes: fine-grained `AddressSet` or all-or-nothing `Wildcard`.
The notification system is configured to propagate address sets upstream from all RPC servers up to the index processor. This appears to be the optimal balance in memory consumption and processing effort between upstream subscriptions and downstream notifications.
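A much simplified sketch of the two policy modes follows. Only the names `UtxosChangedMutationPolicy`, `AddressSet` and `Wildcard` come from the description; the `Upstream` type and the `propagate` function are hypothetical, and the sketch ignores the reference counting a real notifier needs when several listeners share addresses.

```rust
/// The two propagation modes named in the description.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum UtxosChangedMutationPolicy {
    AddressSet,
    Wildcard,
}

/// Hypothetical shape of what a notifier forwards to its parent.
#[derive(Debug, PartialEq, Eq)]
enum Upstream {
    /// Fine-grained: the parent only learns about these addresses.
    Addresses(Vec<String>),
    /// All or nothing: the parent is asked for every UTXO change.
    All,
}

/// Decides what to forward upstream for an incoming set of addresses.
/// The vector is moved, not cloned, when it is forwarded as-is.
fn propagate(policy: UtxosChangedMutationPolicy, addresses: Vec<String>) -> Upstream {
    match policy {
        UtxosChangedMutationPolicy::AddressSet => Upstream::Addresses(addresses),
        UtxosChangedMutationPolicy::Wildcard => Upstream::All,
    }
}

fn main() {
    let addresses = vec!["addr-a".to_string(), "addr-b".to_string()];
    println!("{:?}", propagate(UtxosChangedMutationPolicy::AddressSet, addresses.clone()));
    println!("{:?}", propagate(UtxosChangedMutationPolicy::Wildcard, addresses));
}
```

With `AddressSet` the parent filters notifications on its side, at the cost of holding the addresses; with `Wildcard` the parent holds nothing but sends every change downstream.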
The processing of an incoming `UtxosChangedScope` mutation with its embedded address vector is refactored in a way that retains the same vector all the way up from the RPC server to the index processor, helping reduce memory fragmentation.
A `Notifier` and its `Broadcaster`s now share the same subscription instances, significantly reducing the allocated memory for large address sets.
UtxosChangedSubscription
The struct is refactored so that it gets inner mutability of its data field (containing state & address index set) and an immutable listener ID (used to identify the instance, notably for equality).
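A minimal sketch of that shape is given below, assuming an `RwLock` for the inner mutability. The field names, the `State` and `Data` types and the methods are hypothetical; the state names are the ones listed just below.

```rust
use std::collections::HashSet;
use std::sync::RwLock;

/// Hypothetical alias; the real listener ID type may differ.
type ListenerId = u64;

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum State {
    None,
    Selected,
    All,
}

/// Mutable part: the state and the set of address indexes, kept under one lock.
struct Data {
    state: State,
    indexes: HashSet<u32>,
}

struct UtxosChangedSubscription {
    data: RwLock<Data>,
    listener_id: ListenerId, // never changes after construction
}

impl UtxosChangedSubscription {
    fn new(listener_id: ListenerId) -> Self {
        let data = Data { state: State::None, indexes: HashSet::new() };
        Self { data: RwLock::new(data), listener_id }
    }

    /// Adds indexes through a shared reference, thanks to the inner lock.
    fn select(&self, indexes: Vec<u32>) {
        let mut data = self.data.write().unwrap();
        data.indexes.extend(indexes);
        data.state = State::Selected;
    }

    fn state(&self) -> State {
        self.data.read().unwrap().state
    }
}

/// Equality looks at the listener ID only, never at the address set.
impl PartialEq for UtxosChangedSubscription {
    fn eq(&self, other: &Self) -> bool {
        self.listener_id == other.listener_id
    }
}

impl Eq for UtxosChangedSubscription {}

fn main() {
    let a = UtxosChangedSubscription::new(1);
    let b = UtxosChangedSubscription::new(1);
    a.select(vec![10, 11]);
    assert_eq!(a.state(), State::Selected);
    assert!(a == b); // same listener ID, different data
}
```

Keying equality on the listener ID keeps comparisons constant-time regardless of how many addresses a subscription holds.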
A subscription has 3 states: `None`, `Selected` and `All`. The last 2 are indicative of an active subscription. `Selected` indicates an address set.
In a `Broadcaster`, the subscriptions with `Selected` mode are used as such in the broadcasting `Plan`, meaning that 2 listeners in this state are always considered distinct. The rationale for not comparing their respective address sets for possible equality is that this outcome has a probability close to zero while the computing cost of the comparison is high.
In a `Broadcaster`, all subscriptions with `All` state are grouped under a unique ubiquitous instance provided by the `SubscriptionContext`, allowing the state of a subscription to be mutated without affecting the broadcasting `Plan`.
Benchmarking
New framework
A new test framework allows the quick and idiomatic setup of an integration test featuring e.g. a node, a miner and any additional task the test needs, like submitting large sets of transactions (mempool benchmarking) or cycles of subscriptions/unsubscriptions for many clients at once.
As an example, the mempool benchmark setup gets refactored as:
Memory benchmarking of UTXOs changed subscriptions
A set of benchmarks is added, dedicated to measuring the memory consumption in various UTXOs changed subscription setups. In all cases, 500 clients connect to a mining node processing ~100 TPS. 495 clients are considered "wallets" and each subscribes to 800 unique addresses. The remaining 5 clients are "monitoring services" that subscribe to address sets of 200K to 1M addresses in 200K increments, sets mainly overlapping with the wallets' addresses.
Clients run subscription cycles of a predefined duration. During 2/3 of the cycle, the client is subscribed to its addresses; for the remaining 1/3, it is fully unsubscribed.
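The split of a cycle into its two phases can be sketched as follows. `cycle_phases` is a hypothetical helper and the 90-second cycle is an arbitrary example value, not one taken from the benchmarks.

```rust
use std::time::Duration;

/// Splits a cycle into its (subscribed, unsubscribed) phases: 2/3 and the remaining 1/3.
fn cycle_phases(cycle: Duration) -> (Duration, Duration) {
    let subscribed = cycle * 2 / 3;
    (subscribed, cycle - subscribed)
}

fn main() {
    // Example cycle length, chosen only for illustration.
    let (subscribed, unsubscribed) = cycle_phases(Duration::from_secs(90));
    println!("subscribed for {:?}, unsubscribed for {:?}", subscribed, unsubscribed);
}
```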
The benchmark runs until 1.5M transactions are mined. The cycle duration appears to have a moderate impact on the overall benchmark duration. Sending and processing the 500 subscriptions all in a row slows down the node TPS a bit for ~30 seconds, probably because the network bandwidth and the gRPC server are briefly saturated.
The benchmark is structured into a client process spawning a child server process. The parent process runs all client activities: mining, submitting transactions and the 500 gRPC clients subscribing to UtxosChanged notifications cyclically. The server runs a node. This architecture has the desired property of having the client and the server each running in isolation, in particular each managing its own memory.
Memory consumption of the server process is sampled every 5 seconds throughout the benchmark run. An average value is then computed on the data, excluding the first 60 minutes, which are considered warm-up.
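The averaging step can be sketched as below. `average_after_warmup` is a hypothetical helper, and the sample values in `main` are made up for illustration; with the 5-second interval and 60-minute warm-up described above, the first 720 samples would be discarded.

```rust
/// Averages the samples taken after the warm-up period.
/// `interval_secs` is the sampling period and `warmup_secs` the span to discard.
/// Returns `None` when the interval is zero or no sample is left after the warm-up.
fn average_after_warmup(samples: &[u64], interval_secs: u64, warmup_secs: u64) -> Option<f64> {
    if interval_secs == 0 {
        return None;
    }
    let skip = (warmup_secs / interval_secs) as usize;
    let kept = samples.get(skip..)?;
    if kept.is_empty() {
        return None;
    }
    Some(kept.iter().sum::<u64>() as f64 / kept.len() as f64)
}

fn main() {
    // Made-up samples: one every 5 seconds, the first 10 seconds treated as warm-up.
    let samples = [100, 200, 300, 500];
    println!("{:?}", average_after_warmup(&samples, 5, 10)); // prints Some(400.0)
}
```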
The various benchmark setups are: