Skip to content

Commit

Permalink
Use a subscription context in compounded::UtxosChangedSubscription
Browse files Browse the repository at this point in the history
…and in `Subscription` trait
  • Loading branch information
tiram88 committed Jan 21, 2024
1 parent d092158 commit d33808d
Show file tree
Hide file tree
Showing 7 changed files with 134 additions and 84 deletions.
51 changes: 40 additions & 11 deletions notify/src/address/tracker.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use indexmap::{map::Entry, IndexMap};
use kaspa_addresses::{Address, Prefix};
use kaspa_consensus_core::tx::ScriptPublicKey;
use kaspa_core::trace;
use kaspa_txscript::{extract_script_pub_key_address, pay_to_address_script};
use parking_lot::RwLock;
use std::{
Expand All @@ -12,6 +13,7 @@ pub trait Indexer {
fn contains(&self, index: Index) -> bool;
fn insert(&mut self, index: Index) -> bool;
fn remove(&mut self, index: Index) -> bool;
fn unlock(&mut self);
}

pub type Index = u32;
Expand All @@ -21,11 +23,24 @@ pub type RefCount = u16;
pub struct Counter {
pub index: Index,
pub count: RefCount,
pub locked: bool,
}

impl Counter {
pub fn new(index: Index, count: RefCount) -> Self {
Self { index, count }
Self { index, count, locked: false }
}

pub fn active(&self) -> bool {
self.count > 0
}

pub fn locked(&self) -> bool {
self.locked
}

pub fn unlock(&mut self) {
self.locked = false
}
}

Expand All @@ -47,6 +62,7 @@ impl Ord for Counter {
}
}

#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct Counters(Vec<Counter>);

impl Counters {
Expand Down Expand Up @@ -78,11 +94,15 @@ impl Indexer for Counters {
}

fn insert(&mut self, index: Index) -> bool {
let item = Counter::new(index, 1);
let item = Counter { index, count: 1, locked: true };
match self.0.binary_search(&item) {
Ok(rank) => {
self.0[rank].count += 1;
false
let counter = self.0.get_mut(rank).unwrap();
if !counter.locked {
counter.locked = true;
counter.count += 1;
}
counter.count == 1
}
Err(rank) => {
self.0.insert(rank, item);
Expand All @@ -94,17 +114,22 @@ impl Indexer for Counters {
fn remove(&mut self, index: Index) -> bool {
match self.0.binary_search(&Counter::new(index, 0)) {
Ok(rank) => {
if self.0[rank].count == 1 {
self.0.remove(rank);
true
let counter = self.0.get_mut(rank).unwrap();
if counter.count > 0 && !counter.locked {
counter.locked = true;
counter.count -= 1;
counter.count == 0
} else {
self.0[rank].count -= 1;
false
}
}
Err(_) => false,
}
}

fn unlock(&mut self) {
self.0.iter_mut().for_each(Counter::unlock);
}
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -157,6 +182,8 @@ impl Indexer for Indexes {
Err(_) => false,
}
}

fn unlock(&mut self) {}
}

impl From<Vec<Index>> for Indexes {
Expand Down Expand Up @@ -204,7 +231,7 @@ impl Inner {
Entry::Occupied(entry) => entry.index() as Index,
Entry::Vacant(entry) => {
let index = entry.index() as Index;
// trace!("AddressTracker insert #{} {}", index, extract_script_pub_key_address(entry.key(), Prefix::Mainnet).unwrap());
trace!("AddressTracker insert #{} {}", index, extract_script_pub_key_address(entry.key(), Prefix::Mainnet).unwrap());
let _ = *entry.insert(0);
index
}
Expand All @@ -214,7 +241,7 @@ impl Inner {
fn inc_count(&mut self, index: Index) {
if let Some((_, count)) = self.script_pub_keys.get_index_mut(index as usize) {
*count += 1;
// trace!("AddressTracker inc count #{} to {}", index, *count);
trace!("AddressTracker inc count #{} to {}", index, *count);
}
}

Expand All @@ -224,7 +251,7 @@ impl Inner {
panic!("Address tracker is trying to decrease an address counter that is already at zero");
}
*count -= 1;
// trace!("AddressTracker dec count #{} to {}", index, *count);
trace!("AddressTracker dec count #{} to {}", index, *count);
}
}

Expand Down Expand Up @@ -301,6 +328,7 @@ impl Tracker {

pub fn register<T: Indexer>(&self, indexes: &mut T, addresses: &[Address]) -> Vec<Address> {
let mut added = Vec::with_capacity(addresses.len());
indexes.unlock();
for chunk in addresses.chunks(Self::ADDRESS_CHUNK_SIZE) {
let mut inner = self.inner.write();
for address in chunk {
Expand All @@ -317,6 +345,7 @@ impl Tracker {

pub fn unregister<T: Indexer>(&self, indexes: &mut T, addresses: &[Address]) -> Vec<Address> {
let mut removed = Vec::with_capacity(addresses.len());
indexes.unlock();
for chunk in addresses.chunks(Self::ADDRESS_CHUNK_SIZE) {
let mut inner = self.inner.write();
for address in chunk {
Expand Down
15 changes: 11 additions & 4 deletions notify/src/notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ where
self.broadcasters.iter().try_for_each(|broadcaster| broadcaster.unregister(event, id))?;
}
}
self.apply_mutations(event, mutations)?;
self.apply_mutations(event, mutations, &self.subscription_context)?;
} else {
trace!("[Notifier {}] {command} notifying listener {id} about {scope:?} is ignored (no mutation)", self.name);
// In case we have a sync channel, report that the command was processed.
Expand All @@ -378,12 +378,12 @@ where
Ok(())
}

fn apply_mutations(&self, event: EventType, mutations: Vec<Mutation>) -> Result<()> {
fn apply_mutations(&self, event: EventType, mutations: Vec<Mutation>, context: &SubscriptionContext) -> Result<()> {
let mut subscriptions = self.subscriptions.lock();
// Compound mutations
let mut compound_result = None;
for mutation in mutations {
compound_result = subscriptions[event].compound(mutation);
compound_result = subscriptions[event].compound(mutation, context);
}
// Report to the parents
if let Some(mutation) = compound_result {
Expand All @@ -410,7 +410,7 @@ where
fn renew_subscriptions(&self) -> Result<()> {
let subscriptions = self.subscriptions.lock();
EVENT_TYPE_ARRAY.iter().copied().filter(|x| self.enabled_events[*x] && subscriptions[*x].active()).try_for_each(|x| {
let mutation = Mutation::new(Command::Start, subscriptions[x].scope());
let mutation = Mutation::new(Command::Start, subscriptions[x].scope(&self.subscription_context));
self.subscribers.iter().try_for_each(|subscriber| subscriber.mutate(mutation.clone()))?;
Ok(())
})
Expand Down Expand Up @@ -848,6 +848,13 @@ mod tests {
"{} - {}: the listener[{}] mutation {mutation:?} yielded the wrong subscription",
self.name, step.name, idx
);
assert!(
self.subscription_receiver.is_empty(),
"{} - {}: listener[{}] mutation {mutation:?} yielded an extra subscription but should not",
self.name,
step.name,
idx
);
} else {
assert!(
self.subscription_receiver.is_empty(),
Expand Down
Loading

0 comments on commit d33808d

Please sign in to comment.