Skip to content

Commit

Permalink
Add a sharable subscription context to the notification system
Browse files Browse the repository at this point in the history
  • Loading branch information
tiram88 committed Jan 16, 2024
1 parent b8e5dad commit 3362171
Show file tree
Hide file tree
Showing 31 changed files with 338 additions and 100 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 15 additions & 3 deletions consensus/notify/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use kaspa_core::{
use kaspa_notify::{
events::{EventSwitches, EVENT_TYPE_ARRAY},
subscriber::Subscriber,
subscription::{MutationPolicies, UtxosChangedMutationPolicy},
subscription::{context::SubscriptionContext, MutationPolicies, UtxosChangedMutationPolicy},
};
use kaspa_utils::triggers::SingleTrigger;
use std::sync::Arc;
Expand All @@ -25,12 +25,24 @@ pub struct NotifyService {
}

impl NotifyService {
pub fn new(root: Arc<ConsensusNotificationRoot>, notification_receiver: Receiver<Notification>) -> Self {
pub fn new(
root: Arc<ConsensusNotificationRoot>,
notification_receiver: Receiver<Notification>,
subscription_context: SubscriptionContext,
) -> Self {
let root_events: EventSwitches = EVENT_TYPE_ARRAY[..].into();
let collector = Arc::new(ConsensusCollector::new(NOTIFY_SERVICE, notification_receiver, Arc::new(ConsensusConverter::new())));
let subscriber = Arc::new(Subscriber::new(NOTIFY_SERVICE, root_events, root, 0));
let policies = MutationPolicies::new(UtxosChangedMutationPolicy::AllOrNothing);
let notifier = Arc::new(ConsensusNotifier::new(NOTIFY_SERVICE, root_events, vec![collector], vec![subscriber], 1, policies));
let notifier = Arc::new(ConsensusNotifier::new(
NOTIFY_SERVICE,
root_events,
vec![collector],
vec![subscriber],
subscription_context,
1,
policies,
));
Self { notifier, shutdown: SingleTrigger::default() }
}

Expand Down
10 changes: 7 additions & 3 deletions indexes/processor/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use kaspa_notify::{
connection::ChannelType,
events::{EventSwitches, EventType},
scope::{PruningPointUtxoSetOverrideScope, UtxosChangedScope},
subscription::{MutationPolicies, UtxosChangedMutationPolicy},
subscription::{context::SubscriptionContext, MutationPolicies, UtxosChangedMutationPolicy},
};
use kaspa_utils::{channel::Channel, triggers::SingleTrigger};
use kaspa_utxoindex::api::UtxoIndexProxy;
Expand All @@ -26,7 +26,11 @@ pub struct IndexService {
}

impl IndexService {
pub fn new(consensus_notifier: &Arc<ConsensusNotifier>, utxoindex: Option<UtxoIndexProxy>) -> Self {
pub fn new(
consensus_notifier: &Arc<ConsensusNotifier>,
subscription_context: SubscriptionContext,
utxoindex: Option<UtxoIndexProxy>,
) -> Self {
// Prepare consensus-notify objects
let consensus_notify_channel = Channel::<ConsensusNotification>::default();
let consensus_notify_listener_id = consensus_notifier.register_new_listener(ConsensusChannelConnection::new(
Expand All @@ -40,7 +44,7 @@ impl IndexService {
let events: EventSwitches = [EventType::UtxosChanged, EventType::PruningPointUtxoSetOverride].as_ref().into();
let collector = Arc::new(Processor::new(utxoindex.clone(), consensus_notify_channel.receiver()));
let policies = MutationPolicies::new(UtxosChangedMutationPolicy::AllOrNothing);
let notifier = Arc::new(IndexNotifier::new(INDEX_SERVICE, events, vec![collector], vec![], 1, policies));
let notifier = Arc::new(IndexNotifier::new(INDEX_SERVICE, events, vec![collector], vec![], subscription_context, 1, policies));

// Manually subscribe to index-processor related event types
consensus_notifier
Expand Down
1 change: 1 addition & 0 deletions kaspad/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ kaspa-grpc-server.workspace = true
kaspa-hashes.workspace = true
kaspa-index-processor.workspace = true
kaspa-mining.workspace = true
kaspa-notify.workspace = true
kaspa-p2p-flows.workspace = true
kaspa-perf-monitor.workspace = true
kaspa-rpc-core.workspace = true
Expand Down
7 changes: 5 additions & 2 deletions kaspad/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use kaspa_core::{core::Core, info, trace};
use kaspa_core::{kaspad_env::version, task::tick::TickService};
use kaspa_database::prelude::CachePolicy;
use kaspa_grpc_server::service::GrpcService;
use kaspa_notify::subscription::context::SubscriptionContext;
use kaspa_rpc_service::service::RpcCoreService;
use kaspa_txscript::caches::TxScriptCacheCounters;
use kaspa_utils::networking::ContextualNetAddress;
Expand Down Expand Up @@ -395,7 +396,8 @@ do you confirm? (answer y/n or pass --yes to the Kaspad command line to confirm
Arc::new(perf_monitor_builder.build())
};

let notify_service = Arc::new(NotifyService::new(notification_root.clone(), notification_recv));
let subscription_context = SubscriptionContext::new();
let notify_service = Arc::new(NotifyService::new(notification_root.clone(), notification_recv, subscription_context.clone()));
let index_service: Option<Arc<IndexService>> = if args.utxoindex {
// Use only a single thread for none-consensus databases
let utxoindex_db = kaspa_database::prelude::ConnBuilder::default()
Expand All @@ -404,7 +406,7 @@ do you confirm? (answer y/n or pass --yes to the Kaspad command line to confirm
.build()
.unwrap();
let utxoindex = UtxoIndexProxy::new(UtxoIndex::new(consensus_manager.clone(), utxoindex_db).unwrap());
let index_service = Arc::new(IndexService::new(&notify_service.notifier(), Some(utxoindex)));
let index_service = Arc::new(IndexService::new(&notify_service.notifier(), subscription_context.clone(), Some(utxoindex)));
Some(index_service)
} else {
None
Expand Down Expand Up @@ -448,6 +450,7 @@ do you confirm? (answer y/n or pass --yes to the Kaspad command line to confirm
index_service.as_ref().map(|x| x.notifier()),
mining_manager,
flow_context,
subscription_context,
index_service.as_ref().map(|x| x.utxoindex().unwrap()),
config.clone(),
core.clone(),
Expand Down
8 changes: 5 additions & 3 deletions notify/src/address/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ pub mod test_helpers {
use kaspa_addresses::Address;
use kaspa_addresses::{Prefix, Version};

pub const ADDRESS_PREFIX: Prefix = Prefix::Mainnet;

pub fn get_3_addresses(sorted: bool) -> Vec<Address> {
let mut addresses = vec![
Address::new(Prefix::Mainnet, Version::PubKey, &[1u8; 32]),
Address::new(Prefix::Mainnet, Version::PubKey, &[2u8; 32]),
Address::new(Prefix::Mainnet, Version::PubKey, &[0u8; 32]),
Address::new(ADDRESS_PREFIX, Version::PubKey, &[1u8; 32]),
Address::new(ADDRESS_PREFIX, Version::PubKey, &[2u8; 32]),
Address::new(ADDRESS_PREFIX, Version::PubKey, &[0u8; 32]),
];
if sorted {
addresses.sort()
Expand Down
4 changes: 3 additions & 1 deletion notify/src/address/tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ impl From<Vec<AddressIndex>> for AddressIndexes {
}
}

pub struct Inner {
#[derive(Debug)]
struct Inner {
script_pub_keys: IndexMap<ScriptPublicKey, RefCount>,
}

Expand Down Expand Up @@ -152,6 +153,7 @@ impl Inner {
}

/// Tracker of multiple [`Address`](kaspa_addresses::Address), indexing and counting registrations
#[derive(Debug)]
pub struct AddressTracker {
inner: RwLock<Inner>,
}
Expand Down
6 changes: 5 additions & 1 deletion notify/src/broadcaster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ mod tests {
notifier::test_helpers::{
overall_test_steps, utxos_changed_test_steps, virtual_chain_changed_test_steps, Step, TestConnection,
},
subscription::context::SubscriptionContext,
};
use async_channel::{unbounded, Sender};

Expand All @@ -256,6 +257,7 @@ mod tests {
broadcaster: Arc<TestBroadcaster>,
/// Listeners, vector index = ListenerId
listeners: Vec<Listener<TestConnection>>,
subscription_context: SubscriptionContext,
ctl_sender: Sender<Ctl<TestConnection>>,
sync_receiver: Receiver<()>,
notification_sender: Sender<TestNotification>,
Expand All @@ -270,6 +272,7 @@ mod tests {
let (notification_sender, notification_receiver) = unbounded();
let broadcaster = Arc::new(TestBroadcaster::new(IDENT, 0, notification_receiver, Some(sync_sender)));
let mut listeners = Vec::with_capacity(listener_count);
let subscription_context = SubscriptionContext::new();
let mut notification_receivers = Vec::with_capacity(listener_count);
for i in 0..listener_count {
let (sender, receiver) = unbounded();
Expand All @@ -282,6 +285,7 @@ mod tests {
name,
broadcaster: broadcaster.clone(),
listeners,
subscription_context,
ctl_sender: broadcaster.ctl.sender.clone(),
sync_receiver,
notification_sender,
Expand All @@ -299,7 +303,7 @@ mod tests {
for (idx, mutation) in step.mutations.iter().enumerate() {
if let Some(ref mutation) = mutation {
let event = mutation.event_type();
if self.listeners[idx].mutate(mutation.clone(), Default::default()).is_some() {
if self.listeners[idx].mutate(mutation.clone(), Default::default(), &self.subscription_context).is_some() {
let ctl = match mutation.active() {
true => Ctl::Register(
self.listeners[idx].subscriptions[event].clone(),
Expand Down
6 changes: 3 additions & 3 deletions notify/src/listener.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::fmt::Debug;
extern crate derive_more;
use crate::subscription::{DynSubscription, MutateSingle, MutationPolicies};
use crate::subscription::{context::SubscriptionContext, DynSubscription, MutateSingle, MutationPolicies};

use super::{
connection::Connection,
Expand Down Expand Up @@ -37,9 +37,9 @@ where
///
/// Return Some mutations to be applied to a compounded state if any change occurred
/// in the subscription state and None otherwise.
pub fn mutate(&mut self, mutation: Mutation, policies: MutationPolicies) -> Option<Vec<Mutation>> {
pub fn mutate(&mut self, mutation: Mutation, policies: MutationPolicies, context: &SubscriptionContext) -> Option<Vec<Mutation>> {
let event_type = mutation.event_type();
self.subscriptions[event_type].mutate(mutation, policies, self.id)
self.subscriptions[event_type].mutate(mutation, policies, context, self.id)
}

pub fn close(&self) {
Expand Down
37 changes: 33 additions & 4 deletions notify/src/notifier.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use crate::{events::EVENT_TYPE_ARRAY, subscription::MutationPolicies};
use crate::{
events::EVENT_TYPE_ARRAY,
subscription::{context::SubscriptionContext, MutationPolicies},
};

use super::{
broadcaster::Broadcaster,
Expand Down Expand Up @@ -80,22 +83,39 @@ where
enabled_events: EventSwitches,
collectors: Vec<DynCollector<N>>,
subscribers: Vec<Arc<Subscriber>>,
subscription_context: SubscriptionContext,
broadcasters: usize,
policies: MutationPolicies,
) -> Self {
Self::with_sync(name, enabled_events, collectors, subscribers, broadcasters, policies, None)
Self::with_sync(name, enabled_events, collectors, subscribers, subscription_context, broadcasters, policies, None)
}

pub fn with_sync(
name: &'static str,
enabled_events: EventSwitches,
collectors: Vec<DynCollector<N>>,
subscribers: Vec<Arc<Subscriber>>,
subscription_context: SubscriptionContext,
broadcasters: usize,
policies: MutationPolicies,
_sync: Option<Sender<()>>,
) -> Self {
Self { inner: Arc::new(Inner::new(name, enabled_events, collectors, subscribers, broadcasters, policies, _sync)) }
Self {
inner: Arc::new(Inner::new(
name,
enabled_events,
collectors,
subscribers,
subscription_context,
broadcasters,
policies,
_sync,
)),
}
}

pub fn subscription_context(&self) -> &SubscriptionContext {
&self.inner.subscription_context
}

pub fn start(self: Arc<Self>) {
Expand Down Expand Up @@ -193,6 +213,9 @@ where
/// Subscribers
subscribers: Vec<Arc<Subscriber>>,

/// Subscription context
subscription_context: SubscriptionContext,

/// Mutation policies
policies: MutationPolicies,

Expand All @@ -213,6 +236,7 @@ where
enabled_events: EventSwitches,
collectors: Vec<DynCollector<N>>,
subscribers: Vec<Arc<Subscriber>>,
subscription_context: SubscriptionContext,
broadcasters: usize,
policies: MutationPolicies,
_sync: Option<Sender<()>>,
Expand All @@ -231,6 +255,7 @@ where
broadcasters,
collectors,
subscribers,
subscription_context,
policies,
name,
_sync,
Expand Down Expand Up @@ -317,7 +342,9 @@ where
) -> Result<()> {
let event: EventType = (&scope).into();
debug!("[Notifier {}] {command} notifying about {scope} to listener {id} - {}", self.name, listener.connection());
if let Some(mutations) = listener.mutate(Mutation::new(command, scope.clone()), self.policies.clone()) {
if let Some(mutations) =
listener.mutate(Mutation::new(command, scope.clone()), self.policies.clone(), &self.subscription_context)
{
trace!("[Notifier {}] {command} notifying listener {id} about {scope:?} involves mutations {mutations:?}", self.name);
// Update broadcasters
match listener.subscriptions[event].active() {
Expand Down Expand Up @@ -746,13 +773,15 @@ mod tests {
let (subscription_sender, subscription_receiver) = unbounded();
let collector = Arc::new(TestCollector::new(IDENT, notification_receiver, Arc::new(TestConverter::new())));
let subscription_manager = Arc::new(SubscriptionManagerMock::new(subscription_sender));
let subscription_context = SubscriptionContext::new();
let subscriber =
Arc::new(Subscriber::new("test", EVENT_TYPE_ARRAY[..].into(), subscription_manager, SUBSCRIPTION_MANAGER_ID));
let notifier = Arc::new(TestNotifier::with_sync(
"test",
EVENT_TYPE_ARRAY[..].into(),
vec![collector],
vec![subscriber],
subscription_context,
1,
Default::default(),
Some(sync_sender),
Expand Down
22 changes: 17 additions & 5 deletions notify/src/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use crate::{
scope::Scope,
subscriber::SubscriptionManager,
subscription::{
array::ArrayBuilder, Command, DynSubscription, MutateSingle, Mutation, MutationPolicies, UtxosChangedMutationPolicy,
array::ArrayBuilder, context::SubscriptionContext, Command, DynSubscription, MutateSingle, Mutation, MutationPolicies,
UtxosChangedMutationPolicy,
},
};
use async_channel::Sender;
Expand Down Expand Up @@ -36,7 +37,12 @@ where
N: Notification,
{
pub fn new(sender: Sender<N>) -> Self {
let inner = Arc::new(Inner::new(sender));
let subscription_context = SubscriptionContext::new();
Self::with_context(sender, subscription_context)
}

pub fn with_context(sender: Sender<N>, subscription_context: SubscriptionContext) -> Self {
let inner = Arc::new(Inner::new(sender, subscription_context));
Self { inner }
}

Expand Down Expand Up @@ -92,6 +98,7 @@ where
{
sender: Sender<N>,
subscriptions: RwLock<EventArray<DynSubscription>>,
subscription_context: SubscriptionContext,
policies: MutationPolicies,
}

Expand All @@ -101,10 +108,10 @@ where
{
const ROOT_LISTENER_ID: ListenerId = 1;

fn new(sender: Sender<N>) -> Self {
fn new(sender: Sender<N>, subscription_context: SubscriptionContext) -> Self {
let subscriptions = RwLock::new(ArrayBuilder::single());
let policies = MutationPolicies::new(UtxosChangedMutationPolicy::AllOrNothing);
Self { sender, subscriptions, policies }
Self { sender, subscriptions, subscription_context, policies }
}

fn send(&self, notification: N) -> Result<()> {
Expand All @@ -119,7 +126,12 @@ where
pub fn execute_subscribe_command(&self, scope: Scope, command: Command) -> Result<()> {
let mutation = Mutation::new(command, scope);
let mut subscriptions = self.subscriptions.write();
subscriptions[mutation.event_type()].mutate(mutation, self.policies.clone(), Self::ROOT_LISTENER_ID);
subscriptions[mutation.event_type()].mutate(
mutation,
self.policies.clone(),
&self.subscription_context,
Self::ROOT_LISTENER_ID,
);
Ok(())
}

Expand Down
Loading

0 comments on commit 3362171

Please sign in to comment.