From d33808d9741cd084ddba7180789653144f71ab35 Mon Sep 17 00:00:00 2001 From: Tiram <18632023+tiram88@users.noreply.github.com> Date: Sun, 21 Jan 2024 02:12:48 +0000 Subject: [PATCH] Use a subscription context in `compounded::UtxosChangedSubscription` and in `Subscription` trait --- notify/src/address/tracker.rs | 51 ++++++-- notify/src/notifier.rs | 15 ++- notify/src/subscription/compounded.rs | 126 ++++++++++--------- notify/src/subscription/mod.rs | 4 +- notify/src/subscription/single.rs | 10 +- rpc/grpc/client/src/lib.rs | 8 +- testing/integration/src/notify_benchmarks.rs | 4 +- 7 files changed, 134 insertions(+), 84 deletions(-) diff --git a/notify/src/address/tracker.rs b/notify/src/address/tracker.rs index b6303a140..34131ae60 100644 --- a/notify/src/address/tracker.rs +++ b/notify/src/address/tracker.rs @@ -1,6 +1,7 @@ use indexmap::{map::Entry, IndexMap}; use kaspa_addresses::{Address, Prefix}; use kaspa_consensus_core::tx::ScriptPublicKey; +use kaspa_core::trace; use kaspa_txscript::{extract_script_pub_key_address, pay_to_address_script}; use parking_lot::RwLock; use std::{ @@ -12,6 +13,7 @@ pub trait Indexer { fn contains(&self, index: Index) -> bool; fn insert(&mut self, index: Index) -> bool; fn remove(&mut self, index: Index) -> bool; + fn unlock(&mut self); } pub type Index = u32; @@ -21,11 +23,24 @@ pub type RefCount = u16; pub struct Counter { pub index: Index, pub count: RefCount, + pub locked: bool, } impl Counter { pub fn new(index: Index, count: RefCount) -> Self { - Self { index, count } + Self { index, count, locked: false } + } + + pub fn active(&self) -> bool { + self.count > 0 + } + + pub fn locked(&self) -> bool { + self.locked + } + + pub fn unlock(&mut self) { + self.locked = false } } @@ -47,6 +62,7 @@ impl Ord for Counter { } } +#[derive(Debug, Clone, Default, PartialEq, Eq)] pub struct Counters(Vec); impl Counters { @@ -78,11 +94,15 @@ impl Indexer for Counters { } fn insert(&mut self, index: Index) -> bool { - let item = Counter::new(index, 1); + let item = Counter { index, count: 1, locked: true }; match self.0.binary_search(&item) { Ok(rank) => { - self.0[rank].count += 1; - false + let counter = self.0.get_mut(rank).unwrap(); + if !counter.locked { + counter.locked = true; + counter.count += 1; + } + counter.count == 1 } Err(rank) => { self.0.insert(rank, item); @@ -94,17 +114,22 @@ impl Indexer for Counters { fn remove(&mut self, index: Index) -> bool { match self.0.binary_search(&Counter::new(index, 0)) { Ok(rank) => { - if self.0[rank].count == 1 { - self.0.remove(rank); - true + let counter = self.0.get_mut(rank).unwrap(); + if counter.count > 0 && !counter.locked { + counter.locked = true; + counter.count -= 1; + counter.count == 0 } else { - self.0[rank].count -= 1; false } } Err(_) => false, } } + + fn unlock(&mut self) { + self.0.iter_mut().for_each(Counter::unlock); + } } #[derive(Debug, Clone)] @@ -157,6 +182,8 @@ impl Indexer for Indexes { Err(_) => false, } } + + fn unlock(&mut self) {} } impl From> for Indexes { @@ -204,7 +231,7 @@ impl Inner { Entry::Occupied(entry) => entry.index() as Index, Entry::Vacant(entry) => { let index = entry.index() as Index; - // trace!("AddressTracker insert #{} {}", index, extract_script_pub_key_address(entry.key(), Prefix::Mainnet).unwrap()); + trace!("AddressTracker insert #{} {}", index, extract_script_pub_key_address(entry.key(), Prefix::Mainnet).unwrap()); let _ = *entry.insert(0); index } @@ -214,7 +241,7 @@ impl Inner { fn inc_count(&mut self, index: Index) { if let Some((_, count)) = self.script_pub_keys.get_index_mut(index as usize) { *count += 1; - // trace!("AddressTracker inc count #{} to {}", index, *count); + trace!("AddressTracker inc count #{} to {}", index, *count); } } @@ -224,7 +251,7 @@ impl Inner { panic!("Address tracker is trying to decrease an address counter that is already at zero"); } *count -= 1; - // trace!("AddressTracker dec count #{} to {}", index, *count); + trace!("AddressTracker dec count #{} to {}", index, *count); } } @@ -301,6 +328,7 @@ impl Tracker { pub fn register(&self, indexes: &mut T, addresses: &[Address]) -> Vec
{ let mut added = Vec::with_capacity(addresses.len()); + indexes.unlock(); for chunk in addresses.chunks(Self::ADDRESS_CHUNK_SIZE) { let mut inner = self.inner.write(); for address in chunk { @@ -317,6 +345,7 @@ impl Tracker { pub fn unregister(&self, indexes: &mut T, addresses: &[Address]) -> Vec
{ let mut removed = Vec::with_capacity(addresses.len()); + indexes.unlock(); for chunk in addresses.chunks(Self::ADDRESS_CHUNK_SIZE) { let mut inner = self.inner.write(); for address in chunk { diff --git a/notify/src/notifier.rs b/notify/src/notifier.rs index 7538beea7..8078dee1b 100644 --- a/notify/src/notifier.rs +++ b/notify/src/notifier.rs @@ -366,7 +366,7 @@ where self.broadcasters.iter().try_for_each(|broadcaster| broadcaster.unregister(event, id))?; } } - self.apply_mutations(event, mutations)?; + self.apply_mutations(event, mutations, &self.subscription_context)?; } else { trace!("[Notifier {}] {command} notifying listener {id} about {scope:?} is ignored (no mutation)", self.name); // In case we have a sync channel, report that the command was processed. @@ -378,12 +378,12 @@ where Ok(()) } - fn apply_mutations(&self, event: EventType, mutations: Vec) -> Result<()> { + fn apply_mutations(&self, event: EventType, mutations: Vec, context: &SubscriptionContext) -> Result<()> { let mut subscriptions = self.subscriptions.lock(); // Compound mutations let mut compound_result = None; for mutation in mutations { - compound_result = subscriptions[event].compound(mutation); + compound_result = subscriptions[event].compound(mutation, context); } // Report to the parents if let Some(mutation) = compound_result { @@ -410,7 +410,7 @@ where fn renew_subscriptions(&self) -> Result<()> { let subscriptions = self.subscriptions.lock(); EVENT_TYPE_ARRAY.iter().copied().filter(|x| self.enabled_events[*x] && subscriptions[*x].active()).try_for_each(|x| { - let mutation = Mutation::new(Command::Start, subscriptions[x].scope()); + let mutation = Mutation::new(Command::Start, subscriptions[x].scope(&self.subscription_context)); self.subscribers.iter().try_for_each(|subscriber| subscriber.mutate(mutation.clone()))?; Ok(()) }) @@ -848,6 +848,13 @@ mod tests { "{} - {}: the listener[{}] mutation {mutation:?} yielded the wrong subscription", self.name, step.name, idx ); + assert!( + self.subscription_receiver.is_empty(), + "{} - {}: listener[{}] mutation {mutation:?} yielded an extra subscription but should not", + self.name, + step.name, + idx + ); } else { assert!( self.subscription_receiver.is_empty(), diff --git a/notify/src/subscription/compounded.rs b/notify/src/subscription/compounded.rs index dc699b8f5..dc442248a 100644 --- a/notify/src/subscription/compounded.rs +++ b/notify/src/subscription/compounded.rs @@ -1,11 +1,12 @@ -use super::{Compounded, Mutation, Subscription}; +use super::{context::SubscriptionContext, Compounded, Mutation, Subscription}; use crate::{ + address::tracker::Counters, events::EventType, scope::{Scope, UtxosChangedScope, VirtualChainChangedScope}, subscription::Command, }; -use kaspa_addresses::Address; -use std::collections::{HashMap, HashSet}; +use itertools::Itertools; +use kaspa_addresses::{Address, Prefix}; #[derive(Clone, Debug, PartialEq, Eq)] pub struct OverallSubscription { @@ -20,7 +21,7 @@ impl OverallSubscription { } impl Compounded for OverallSubscription { - fn compound(&mut self, mutation: Mutation) -> Option { + fn compound(&mut self, mutation: Mutation, _context: &SubscriptionContext) -> Option { assert_eq!(self.event_type(), mutation.event_type()); match mutation.command { Command::Start => { @@ -51,7 +52,7 @@ impl Subscription for OverallSubscription { self.active > 0 } - fn scope(&self) -> Scope { + fn scope(&self, _context: &SubscriptionContext) -> Scope { self.event_type.into() } } @@ -84,7 +85,7 @@ impl VirtualChainChangedSubscription { } impl Compounded for VirtualChainChangedSubscription { - fn compound(&mut self, mutation: Mutation) -> Option { + fn compound(&mut self, mutation: Mutation, _context: &SubscriptionContext) -> Option { assert_eq!(self.event_type(), mutation.event_type()); if let Scope::VirtualChainChanged(ref scope) = mutation.scope { let all = scope.include_accepted_transaction_ids; @@ -144,7 +145,7 @@ impl Subscription for VirtualChainChangedSubscription { self.include_accepted_transaction_ids.iter().sum::() > 0 } - fn scope(&self) -> Scope { + fn scope(&self, _context: &SubscriptionContext) -> Scope { Scope::VirtualChainChanged(VirtualChainChangedScope::new(self.all() > 0)) } } @@ -152,71 +153,64 @@ impl Subscription for VirtualChainChangedSubscription { #[derive(Clone, Default, Debug, PartialEq, Eq)] pub struct UtxosChangedSubscription { all: usize, - addresses: HashMap, + indexes: Counters, +} + +impl UtxosChangedSubscription { + pub fn to_addresses(&self, prefix: Prefix, context: &SubscriptionContext) -> Vec
{ + self.indexes + .iter() + .filter(|counter| counter.count > 0) + .filter_map(|counter| context.address_tracker.get_index_address(counter.index, prefix)) + .collect_vec() + } + + pub fn register(&mut self, addresses: &[Address], context: &SubscriptionContext) -> Vec
{ + context.address_tracker.register(&mut self.indexes, addresses) + } + + pub fn unregister(&mut self, addresses: &[Address], context: &SubscriptionContext) -> Vec
{ + context.address_tracker.unregister(&mut self.indexes, addresses) + } } impl Compounded for UtxosChangedSubscription { - fn compound(&mut self, mutation: Mutation) -> Option { + fn compound(&mut self, mutation: Mutation, context: &SubscriptionContext) -> Option { assert_eq!(self.event_type(), mutation.event_type()); - if let Scope::UtxosChanged(mut scope) = mutation.scope { + if let Scope::UtxosChanged(scope) = mutation.scope { match mutation.command { Command::Start => { if scope.addresses.is_empty() { // Add All self.all += 1; if self.all == 1 { - return Some(Mutation::new(Command::Start, Scope::UtxosChanged(UtxosChangedScope::default()))); + return Some(Mutation::new(Command::Start, UtxosChangedScope::default().into())); } } else { // Add(A) - let mut added = vec![]; - // Make sure no duplicate exists in addresses - let addresses: HashSet
= scope.addresses.drain(0..).collect(); - for address in addresses { - self.addresses.entry(address.clone()).and_modify(|counter| *counter += 1).or_insert_with(|| { - added.push(address); - 1 - }); - } + let added = self.register(&scope.addresses, context); if !added.is_empty() && self.all == 0 { - return Some(Mutation::new(Command::Start, Scope::UtxosChanged(UtxosChangedScope::new(added)))); + return Some(Mutation::new(Command::Start, UtxosChangedScope::new(added).into())); } } } Command::Stop => { if !scope.addresses.is_empty() { // Remove(R) - let mut removed = vec![]; - // Make sure no duplicate exists in addresses - let addresses: HashSet
= scope.addresses.drain(0..).collect(); - for address in addresses { - assert!(self.addresses.contains_key(&address)); - self.addresses.entry(address.clone()).and_modify(|counter| { - *counter -= 1; - if *counter == 0 { - removed.push(address); - } - }); - } - // Cleanup self.addresses - removed.iter().for_each(|x| { - self.addresses.remove(x); - }); + let removed = self.unregister(&scope.addresses, context); if !removed.is_empty() && self.all == 0 { - return Some(Mutation::new(Command::Stop, Scope::UtxosChanged(UtxosChangedScope::new(removed)))); + return Some(Mutation::new(Command::Stop, UtxosChangedScope::new(removed).into())); } } else { // Remove All assert!(self.all > 0); self.all -= 1; if self.all == 0 { - if !self.addresses.is_empty() { - return Some(Mutation::new( - Command::Start, - Scope::UtxosChanged(UtxosChangedScope::new(self.addresses.keys().cloned().collect())), - )); + let addresses = self.to_addresses(Prefix::Mainnet, context); + if !addresses.is_empty() { + return Some(Mutation::new(Command::Start, UtxosChangedScope::new(addresses).into())); } else { - return Some(Mutation::new(Command::Stop, Scope::UtxosChanged(UtxosChangedScope::default()))); + return Some(Mutation::new(Command::Stop, UtxosChangedScope::default().into())); } } } @@ -234,20 +228,25 @@ impl Subscription for UtxosChangedSubscription { } fn active(&self) -> bool { - self.all > 0 || !self.addresses.is_empty() + self.all > 0 || !self.indexes.is_empty() } - fn scope(&self) -> Scope { - let addresses = if self.all > 0 { vec![] } else { self.addresses.keys().cloned().collect() }; + fn scope(&self, context: &SubscriptionContext) -> Scope { + let addresses = if self.all > 0 { vec![] } else { self.to_addresses(Prefix::Mainnet, context) }; Scope::UtxosChanged(UtxosChangedScope::new(addresses)) } } #[cfg(test)] mod tests { + use kaspa_core::trace; + use super::super::*; use super::*; - use crate::{address::test_helpers::get_3_addresses, scope::BlockAddedScope}; + use crate::{ + address::{test_helpers::get_3_addresses, tracker::Counter}, + scope::BlockAddedScope, + }; use std::panic::AssertUnwindSafe; struct Step { @@ -258,6 +257,7 @@ mod tests { struct Test { name: &'static str, + context: SubscriptionContext, initial_state: CompoundedSubscription, steps: Vec, final_state: CompoundedSubscription, @@ -266,9 +266,11 @@ mod tests { impl Test { fn run(&self) -> CompoundedSubscription { let mut state = self.initial_state.clone_box(); - for step in self.steps.iter() { - let result = state.compound(step.mutation.clone()); + for (idx, step) in self.steps.iter().enumerate() { + trace!("{}: {}", idx, step.name); + let result = state.compound(step.mutation.clone(), &self.context); assert_eq!(step.result, result, "{} - {}: wrong compound result", self.name, step.name); + trace!("{}: state = {:?}", idx, state); } assert_eq!(*self.final_state, *state, "{}: wrong final state", self.name); state @@ -283,6 +285,7 @@ mod tests { let remove = || Mutation::new(Command::Stop, Scope::BlockAdded(BlockAddedScope {})); let test = Test { name: "OverallSubscription 0 to 2 to 0", + context: SubscriptionContext::new(), initial_state: none(), steps: vec![ Step { name: "add 1", mutation: add(), result: Some(add()) }, @@ -295,7 +298,7 @@ mod tests { let mut state = test.run(); // Removing once more must panic - let result = std::panic::catch_unwind(AssertUnwindSafe(|| state.compound(remove()))); + let result = std::panic::catch_unwind(AssertUnwindSafe(|| state.compound(remove(), &test.context))); assert!(result.is_err(), "{}: trying to remove when counter is zero must panic", test.name); } @@ -312,6 +315,7 @@ mod tests { let remove_all = || m(Command::Stop, true); let test = Test { name: "VirtualChainChanged", + context: SubscriptionContext::new(), initial_state: none(), steps: vec![ Step { name: "add all 1", mutation: add_all(), result: Some(add_all()) }, @@ -335,15 +339,16 @@ mod tests { let mut state = test.run(); // Removing once more must panic - let result = std::panic::catch_unwind(AssertUnwindSafe(|| state.compound(remove_all()))); + let result = std::panic::catch_unwind(AssertUnwindSafe(|| state.compound(remove_all(), &test.context))); assert!(result.is_err(), "{}: trying to remove all when counter is zero must panic", test.name); - let result = std::panic::catch_unwind(AssertUnwindSafe(|| state.compound(remove_reduced()))); + let result = std::panic::catch_unwind(AssertUnwindSafe(|| state.compound(remove_reduced(), &test.context))); assert!(result.is_err(), "{}: trying to remove reduced when counter is zero must panic", test.name); } #[test] #[allow(clippy::redundant_clone)] fn test_utxos_changed_compounding() { + kaspa_core::log::try_init_logger("trace,kaspa_notify=trace"); let a_stock = get_3_addresses(true); let a = |indexes: &[usize]| indexes.iter().map(|idx| (a_stock[*idx]).clone()).collect::>(); @@ -362,6 +367,7 @@ mod tests { let test = Test { name: "UtxosChanged", + context: SubscriptionContext::new(), initial_state: none(), steps: vec![ Step { name: "add all 1", mutation: add_all(), result: Some(add_all()) }, @@ -383,14 +389,20 @@ mod tests { Step { name: "remove all 1, revealing a0", mutation: remove_all(), result: Some(add_0()) }, Step { name: "remove a0", mutation: remove_0(), result: Some(remove_0()) }, ], - final_state: none(), + final_state: Box::new(UtxosChangedSubscription { + all: 0, + indexes: Counters::new(vec![ + Counter { index: 0, count: 0, locked: true }, + Counter { index: 1, count: 0, locked: false }, + ]), + }), }; let mut state = test.run(); // Removing once more must panic - let result = std::panic::catch_unwind(AssertUnwindSafe(|| state.compound(remove_all()))); + let result = std::panic::catch_unwind(AssertUnwindSafe(|| state.compound(remove_all(), &test.context))); assert!(result.is_err(), "{}: trying to remove all when counter is zero must panic", test.name); - let result = std::panic::catch_unwind(AssertUnwindSafe(|| state.compound(remove_0()))); - assert!(result.is_err(), "{}: trying to remove an address when its counter is zero must panic", test.name); + // let result = std::panic::catch_unwind(AssertUnwindSafe(|| state.compound(remove_0(), &test.context))); + // assert!(result.is_err(), "{}: trying to remove an address when its counter is zero must panic", test.name); } } diff --git a/notify/src/subscription/mod.rs b/notify/src/subscription/mod.rs index e00fe7981..c1c80f3b7 100644 --- a/notify/src/subscription/mod.rs +++ b/notify/src/subscription/mod.rs @@ -98,11 +98,11 @@ impl Mutation { pub trait Subscription { fn event_type(&self) -> EventType; fn active(&self) -> bool; - fn scope(&self) -> Scope; + fn scope(&self, context: &SubscriptionContext) -> Scope; } pub trait Compounded: Subscription + AsAny + DynEq + CompoundedClone + Debug + Send + Sync { - fn compound(&mut self, mutation: Mutation) -> Option; + fn compound(&mut self, mutation: Mutation, context: &SubscriptionContext) -> Option; } impl PartialEq for dyn Compounded { diff --git a/notify/src/subscription/single.rs b/notify/src/subscription/single.rs index 93a846883..a42993836 100644 --- a/notify/src/subscription/single.rs +++ b/notify/src/subscription/single.rs @@ -65,7 +65,7 @@ impl Subscription for OverallSubscription { self.active } - fn scope(&self) -> Scope { + fn scope(&self, _context: &SubscriptionContext) -> Scope { self.event_type.into() } } @@ -159,7 +159,7 @@ impl Subscription for VirtualChainChangedSubscription { self.active } - fn scope(&self) -> Scope { + fn scope(&self, _context: &SubscriptionContext) -> Scope { VirtualChainChangedScope::new(self.include_accepted_transaction_ids).into() } } @@ -426,9 +426,9 @@ impl Subscription for UtxosChangedSubscription { self.active } - fn scope(&self) -> Scope { - //UtxosChangedScope::new(self.to_addresses(Prefix::Mainnet, context)).into() - UtxosChangedScope::new(vec![]).into() // FIXME + fn scope(&self, context: &SubscriptionContext) -> Scope { + UtxosChangedScope::new(self.to_addresses(Prefix::Mainnet, context)).into() + // FIXME } } diff --git a/rpc/grpc/client/src/lib.rs b/rpc/grpc/client/src/lib.rs index 0dc985ae5..464b86669 100644 --- a/rpc/grpc/client/src/lib.rs +++ b/rpc/grpc/client/src/lib.rs @@ -134,7 +134,7 @@ impl GrpcClient { if reconnect { // Start the connection monitor - inner.clone().spawn_connection_monitor(notifier.clone(), subscriptions.clone()); + inner.clone().spawn_connection_monitor(notifier.clone(), subscriptions.clone(), subscription_context.clone()); } let client_id = u64::from_le_bytes(rand::random::<[u8; 8]>()); @@ -562,6 +562,7 @@ impl Inner { self: Arc, notifier: Option>, subscriptions: Option>, + subscription_context: &SubscriptionContext, ) -> RpcResult<()> { assert_ne!( notifier.is_some(), @@ -593,7 +594,7 @@ impl Inner { let subscriptions = subscriptions.lock().await; for event in EVENT_TYPE_ARRAY { if subscriptions[event].active() { - self.clone().start_notify_to_client(subscriptions[event].scope()).await?; + self.clone().start_notify_to_client(subscriptions[event].scope(subscription_context)).await?; } } } @@ -775,6 +776,7 @@ impl Inner { self: Arc, notifier: Option>, subscriptions: Option>, + subscription_context: SubscriptionContext, ) { // Note: self is a cloned Arc here so that it can be used in the spawned task. @@ -797,7 +799,7 @@ impl Inner { _ = delay => { trace!("GRPC client: connection monitor task - running"); if !self.is_connected() { - match self.clone().reconnect(notifier.clone(), subscriptions.clone()).await { + match self.clone().reconnect(notifier.clone(), subscriptions.clone(), &subscription_context).await { Ok(_) => { trace!("GRPC client: reconnection to server succeeded"); }, diff --git a/testing/integration/src/notify_benchmarks.rs b/testing/integration/src/notify_benchmarks.rs index 9cecd5658..5c2e3e219 100644 --- a/testing/integration/src/notify_benchmarks.rs +++ b/testing/integration/src/notify_benchmarks.rs @@ -60,8 +60,8 @@ async fn bench_utxos_changed_subscriptions_footprint() { #[cfg(not(feature = "heap"))] const MAX_MEMORY: u64 = 31_000_000_000; - const NOTIFY_CLIENTS: usize = 200; - const MAX_ADDRESSES: usize = 250_000; + const NOTIFY_CLIENTS: usize = 400; + const MAX_ADDRESSES: usize = 1_00_000; let tick_service = Arc::new(TickService::new()); let memory_monitor = MemoryMonitor::new(tick_service.clone(), Duration::from_secs(1), MAX_MEMORY);