Skip to content

Commit

Permalink
Use a subscription context in Notification trait and in `single::Ut…
Browse files Browse the repository at this point in the history
…xosChangedSubscription`
  • Loading branch information
tiram88 committed Jan 19, 2024
1 parent 59307d1 commit 57f22fe
Show file tree
Hide file tree
Showing 11 changed files with 315 additions and 151 deletions.
15 changes: 12 additions & 3 deletions consensus/notify/src/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use kaspa_notify::{
full_featured,
notification::Notification as NotificationTrait,
subscription::{
context::SubscriptionContext,
single::{OverallSubscription, UtxosChangedSubscription, VirtualChainChangedSubscription},
Subscription,
},
Expand Down Expand Up @@ -45,14 +46,18 @@ pub enum Notification {
}

impl NotificationTrait for Notification {
fn apply_overall_subscription(&self, subscription: &OverallSubscription) -> Option<Self> {
fn apply_overall_subscription(&self, subscription: &OverallSubscription, _context: &SubscriptionContext) -> Option<Self> {
match subscription.active() {
true => Some(self.clone()),
false => None,
}
}

fn apply_virtual_chain_changed_subscription(&self, subscription: &VirtualChainChangedSubscription) -> Option<Self> {
fn apply_virtual_chain_changed_subscription(
&self,
subscription: &VirtualChainChangedSubscription,
_context: &SubscriptionContext,
) -> Option<Self> {
match subscription.active() {
true => {
// If the subscription excludes accepted transaction ids and the notification includes some
Expand All @@ -72,7 +77,11 @@ impl NotificationTrait for Notification {
}
}

fn apply_utxos_changed_subscription(&self, _subscription: &UtxosChangedSubscription) -> Option<Self> {
fn apply_utxos_changed_subscription(
&self,
_subscription: &UtxosChangedSubscription,
_context: &SubscriptionContext,
) -> Option<Self> {
// No effort is made here to apply the subscription addresses.
// This will be achieved farther along the notification backbone.
Some(self.clone())
Expand Down
60 changes: 39 additions & 21 deletions indexes/core/src/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use kaspa_notify::{
full_featured,
notification::Notification as NotificationTrait,
subscription::{
context::SubscriptionContext,
single::{OverallSubscription, UtxosChangedSubscription, VirtualChainChangedSubscription},
Subscription,
},
Expand All @@ -23,22 +24,30 @@ pub enum Notification {
}

impl NotificationTrait for Notification {
fn apply_overall_subscription(&self, subscription: &OverallSubscription) -> Option<Self> {
fn apply_overall_subscription(&self, subscription: &OverallSubscription, _context: &SubscriptionContext) -> Option<Self> {
match subscription.active() {
true => Some(self.clone()),
false => None,
}
}

fn apply_virtual_chain_changed_subscription(&self, _subscription: &VirtualChainChangedSubscription) -> Option<Self> {
fn apply_virtual_chain_changed_subscription(
&self,
_subscription: &VirtualChainChangedSubscription,
_context: &SubscriptionContext,
) -> Option<Self> {
Some(self.clone())
}

fn apply_utxos_changed_subscription(&self, subscription: &UtxosChangedSubscription) -> Option<Self> {
fn apply_utxos_changed_subscription(
&self,
subscription: &UtxosChangedSubscription,
context: &SubscriptionContext,
) -> Option<Self> {
match subscription.active() {
true => {
let Self::UtxosChanged(notification) = self else { return None };
notification.apply_utxos_changed_subscription(subscription).map(Self::UtxosChanged)
notification.apply_utxos_changed_subscription(subscription, context).map(Self::UtxosChanged)
}
false => None,
}
Expand Down Expand Up @@ -69,12 +78,16 @@ impl UtxosChangedNotification {
Self { added: Arc::new(utxos_changed.added), removed: Arc::new(utxos_changed.removed) }
}

pub(crate) fn apply_utxos_changed_subscription(&self, subscription: &UtxosChangedSubscription) -> Option<Self> {
pub(crate) fn apply_utxos_changed_subscription(
&self,
subscription: &UtxosChangedSubscription,
context: &SubscriptionContext,
) -> Option<Self> {
if subscription.to_all() {
Some(self.clone())
} else {
let added = Self::filter_utxo_set(&self.added, subscription);
let removed = Self::filter_utxo_set(&self.removed, subscription);
let added = Self::filter_utxo_set(&self.added, subscription, context);
let removed = Self::filter_utxo_set(&self.removed, subscription, context);
if added.is_empty() && removed.is_empty() {
None
} else {
Expand All @@ -83,23 +96,28 @@ impl UtxosChangedNotification {
}
}

fn filter_utxo_set(utxo_set: &UtxoSetByScriptPublicKey, subscription: &UtxosChangedSubscription) -> UtxoSetByScriptPublicKey {
fn filter_utxo_set(
utxo_set: &UtxoSetByScriptPublicKey,
subscription: &UtxosChangedSubscription,
context: &SubscriptionContext,
) -> UtxoSetByScriptPublicKey {
// As an optimization, we iterate over the smaller set (O(n)) among the two below
// and check existence over the larger set (O(1))
let mut result = HashMap::default();
if utxo_set.len() < subscription.len() {
utxo_set.iter().for_each(|(script_public_key, collection)| {
if subscription.contains(script_public_key) {
result.insert(script_public_key.clone(), collection.clone());
}
});
} else {
subscription.iter().filter(|script_public_key| utxo_set.contains_key(script_public_key)).for_each(|script_public_key| {
if let Some(collection) = utxo_set.get(script_public_key) {
result.insert(script_public_key.clone(), collection.clone());
}
});
}
// FIXME
// if utxo_set.len() < subscription.len() {
utxo_set.iter().for_each(|(script_public_key, collection)| {
if subscription.contains(script_public_key, context) {
result.insert(script_public_key.clone(), collection.clone());
}
});
// } else {
// subscription.iter().filter(|script_public_key| utxo_set.contains_key(script_public_key)).for_each(|script_public_key| {
// if let Some(collection) = utxo_set.get(script_public_key) {
// result.insert(script_public_key.clone(), collection.clone());
// }
// });
// }
result
}
}
47 changes: 31 additions & 16 deletions notify/src/address/tracker.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
use std::{ops::Deref, slice::Iter};

use indexmap::{map::Entry, IndexMap};
use kaspa_addresses::{Address, Prefix};
use kaspa_consensus_core::tx::ScriptPublicKey;
use kaspa_txscript::{extract_script_pub_key_address, pay_to_address_script};
use parking_lot::RwLock;
use std::{
fmt::Display,
slice::{Chunks, Iter},
};

pub type AddressIndex = u32;
pub type RefCount = u16;

#[derive(Debug, Clone)]
pub struct AddressIndexes(Vec<AddressIndex>);

impl AddressIndexes {
Expand Down Expand Up @@ -52,13 +55,9 @@ impl AddressIndexes {
pub fn iter(&self) -> Iter<'_, AddressIndex> {
self.0.iter()
}
}

impl Deref for AddressIndexes {
type Target = [AddressIndex];

fn deref(&self) -> &Self::Target {
&self.0
pub fn chunks(&self, chunk_size: usize) -> Chunks<'_, AddressIndex> {
self.0.chunks(chunk_size)
}
}

Expand Down Expand Up @@ -107,6 +106,7 @@ impl Inner {
Entry::Occupied(entry) => entry.index() as AddressIndex,
Entry::Vacant(entry) => {
let index = entry.index() as AddressIndex;
// trace!("AddressTracker insert #{} {}", index, extract_script_pub_key_address(entry.key(), Prefix::Mainnet).unwrap());
let _ = *entry.insert(0);
index
}
Expand All @@ -116,6 +116,7 @@ impl Inner {
fn inc_count(&mut self, address_idx: AddressIndex) {
if let Some((_, count)) = self.script_pub_keys.get_index_mut(address_idx as usize) {
*count += 1;
// trace!("AddressTracker inc count #{} to {}", address_idx, *count);
}
}

Expand All @@ -124,7 +125,8 @@ impl Inner {
if *count == 0 {
panic!("Address tracker is trying to decrease an address counter that is already at zero");
}
*count -= 1
*count -= 1;
// trace!("AddressTracker dec count #{} to {}", address_idx, *count);
}
}

Expand Down Expand Up @@ -158,6 +160,12 @@ pub struct AddressTracker {
inner: RwLock<Inner>,
}

impl Display for AddressTracker {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{} addresses", self.inner.read().script_pub_keys.len())
}
}

impl AddressTracker {
const ADDRESS_CHUNK_SIZE: usize = 256;

Expand Down Expand Up @@ -186,14 +194,14 @@ impl AddressTracker {
}

pub fn contains(&self, indexes: &AddressIndexes, spk: &ScriptPublicKey) -> bool {
if let Some((address_idx, _)) = self.inner.read().get(spk) {
indexes.contains(address_idx)
} else {
false
}
self.get(spk).is_some_and(|(address_idx, _)| indexes.contains(address_idx))
}

pub fn register(&mut self, indexes: &mut AddressIndexes, addresses: &[Address]) -> Vec<Address> {
pub fn contains_address(&self, indexes: &AddressIndexes, address: &Address) -> bool {
self.contains(indexes, &pay_to_address_script(address))
}

pub fn register(&self, indexes: &mut AddressIndexes, addresses: &[Address]) -> Vec<Address> {
let mut added = Vec::with_capacity(addresses.len());
for chunk in addresses.chunks(Self::ADDRESS_CHUNK_SIZE) {
let mut inner = self.inner.write();
Expand All @@ -209,7 +217,7 @@ impl AddressTracker {
added
}

pub fn unregister(&mut self, indexes: &mut AddressIndexes, addresses: &[Address]) -> Vec<Address> {
pub fn unregister(&self, indexes: &mut AddressIndexes, addresses: &[Address]) -> Vec<Address> {
let mut removed = Vec::with_capacity(addresses.len());
for chunk in addresses.chunks(Self::ADDRESS_CHUNK_SIZE) {
let mut inner = self.inner.write();
Expand All @@ -226,6 +234,13 @@ impl AddressTracker {
removed
}

pub fn unregister_indexes(&self, indexes: &AddressIndexes) {
for chunk in indexes.chunks(Self::ADDRESS_CHUNK_SIZE) {
let mut inner = self.inner.write();
chunk.iter().for_each(|address_idx| inner.dec_count(*address_idx));
}
}

pub fn to_addresses(&self, indexes: &[AddressIndex], prefix: Prefix) -> Vec<Address> {
let mut addresses = Vec::with_capacity(indexes.len());
for chunk in indexes.chunks(Self::ADDRESS_CHUNK_SIZE) {
Expand Down
Loading

0 comments on commit 57f22fe

Please sign in to comment.