diff --git a/Cargo.lock b/Cargo.lock index 565320cae..3123944e1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3243,6 +3243,7 @@ dependencies = [ "kaspa-hashes", "kaspa-index-processor", "kaspa-mining", + "kaspa-notify", "kaspa-p2p-flows", "kaspa-perf-monitor", "kaspa-rpc-core", @@ -4510,6 +4511,7 @@ dependencies = [ "kaspa-consensus-core", "kaspa-core", "kaspa-grpc-client", + "kaspa-notify", "kaspa-rpc-core", "kaspa-txscript", "kaspa-utils", diff --git a/consensus/notify/src/service.rs b/consensus/notify/src/service.rs index 128563c94..d3e219d60 100644 --- a/consensus/notify/src/service.rs +++ b/consensus/notify/src/service.rs @@ -12,7 +12,7 @@ use kaspa_core::{ use kaspa_notify::{ events::{EventSwitches, EVENT_TYPE_ARRAY}, subscriber::Subscriber, - subscription::{MutationPolicies, UtxosChangedMutationPolicy}, + subscription::{context::SubscriptionContext, MutationPolicies, UtxosChangedMutationPolicy}, }; use kaspa_utils::triggers::SingleTrigger; use std::sync::Arc; @@ -25,12 +25,24 @@ pub struct NotifyService { } impl NotifyService { - pub fn new(root: Arc, notification_receiver: Receiver) -> Self { + pub fn new( + root: Arc, + notification_receiver: Receiver, + subscription_context: SubscriptionContext, + ) -> Self { let root_events: EventSwitches = EVENT_TYPE_ARRAY[..].into(); let collector = Arc::new(ConsensusCollector::new(NOTIFY_SERVICE, notification_receiver, Arc::new(ConsensusConverter::new()))); let subscriber = Arc::new(Subscriber::new(NOTIFY_SERVICE, root_events, root, 0)); let policies = MutationPolicies::new(UtxosChangedMutationPolicy::AllOrNothing); - let notifier = Arc::new(ConsensusNotifier::new(NOTIFY_SERVICE, root_events, vec![collector], vec![subscriber], 1, policies)); + let notifier = Arc::new(ConsensusNotifier::new( + NOTIFY_SERVICE, + root_events, + vec![collector], + vec![subscriber], + subscription_context, + 1, + policies, + )); Self { notifier, shutdown: SingleTrigger::default() } } diff --git a/indexes/processor/src/service.rs b/indexes/processor/src/service.rs index 5c9eeeb88..e574edf27 100644 --- a/indexes/processor/src/service.rs +++ b/indexes/processor/src/service.rs @@ -11,7 +11,7 @@ use kaspa_notify::{ connection::ChannelType, events::{EventSwitches, EventType}, scope::{PruningPointUtxoSetOverrideScope, UtxosChangedScope}, - subscription::{MutationPolicies, UtxosChangedMutationPolicy}, + subscription::{context::SubscriptionContext, MutationPolicies, UtxosChangedMutationPolicy}, }; use kaspa_utils::{channel::Channel, triggers::SingleTrigger}; use kaspa_utxoindex::api::UtxoIndexProxy; @@ -26,7 +26,11 @@ pub struct IndexService { } impl IndexService { - pub fn new(consensus_notifier: &Arc, utxoindex: Option) -> Self { + pub fn new( + consensus_notifier: &Arc, + subscription_context: SubscriptionContext, + utxoindex: Option, + ) -> Self { // Prepare consensus-notify objects let consensus_notify_channel = Channel::::default(); let consensus_notify_listener_id = consensus_notifier.register_new_listener(ConsensusChannelConnection::new( @@ -40,7 +44,7 @@ impl IndexService { let events: EventSwitches = [EventType::UtxosChanged, EventType::PruningPointUtxoSetOverride].as_ref().into(); let collector = Arc::new(Processor::new(utxoindex.clone(), consensus_notify_channel.receiver())); let policies = MutationPolicies::new(UtxosChangedMutationPolicy::AllOrNothing); - let notifier = Arc::new(IndexNotifier::new(INDEX_SERVICE, events, vec![collector], vec![], 1, policies)); + let notifier = Arc::new(IndexNotifier::new(INDEX_SERVICE, events, vec![collector], vec![], subscription_context, 1, policies)); // Manually subscribe to index-processor related event types consensus_notifier diff --git a/kaspad/Cargo.toml b/kaspad/Cargo.toml index 377485065..575696e50 100644 --- a/kaspad/Cargo.toml +++ b/kaspad/Cargo.toml @@ -27,6 +27,7 @@ kaspa-grpc-server.workspace = true kaspa-hashes.workspace = true kaspa-index-processor.workspace = true kaspa-mining.workspace = true +kaspa-notify.workspace = true kaspa-p2p-flows.workspace = true kaspa-perf-monitor.workspace = true kaspa-rpc-core.workspace = true diff --git a/kaspad/src/daemon.rs b/kaspad/src/daemon.rs index 896cc9d22..79d976329 100644 --- a/kaspad/src/daemon.rs +++ b/kaspad/src/daemon.rs @@ -10,6 +10,7 @@ use kaspa_core::{core::Core, info, trace}; use kaspa_core::{kaspad_env::version, task::tick::TickService}; use kaspa_database::prelude::CachePolicy; use kaspa_grpc_server::service::GrpcService; +use kaspa_notify::subscription::context::SubscriptionContext; use kaspa_rpc_service::service::RpcCoreService; use kaspa_txscript::caches::TxScriptCacheCounters; use kaspa_utils::networking::ContextualNetAddress; @@ -395,7 +396,8 @@ do you confirm? (answer y/n or pass --yes to the Kaspad command line to confirm Arc::new(perf_monitor_builder.build()) }; - let notify_service = Arc::new(NotifyService::new(notification_root.clone(), notification_recv)); + let subscription_context = SubscriptionContext::new(); + let notify_service = Arc::new(NotifyService::new(notification_root.clone(), notification_recv, subscription_context.clone())); let index_service: Option> = if args.utxoindex { // Use only a single thread for none-consensus databases let utxoindex_db = kaspa_database::prelude::ConnBuilder::default() @@ -404,7 +406,7 @@ do you confirm? (answer y/n or pass --yes to the Kaspad command line to confirm .build() .unwrap(); let utxoindex = UtxoIndexProxy::new(UtxoIndex::new(consensus_manager.clone(), utxoindex_db).unwrap()); - let index_service = Arc::new(IndexService::new(¬ify_service.notifier(), Some(utxoindex))); + let index_service = Arc::new(IndexService::new(¬ify_service.notifier(), subscription_context.clone(), Some(utxoindex))); Some(index_service) } else { None @@ -448,6 +450,7 @@ do you confirm? (answer y/n or pass --yes to the Kaspad command line to confirm index_service.as_ref().map(|x| x.notifier()), mining_manager, flow_context, + subscription_context, index_service.as_ref().map(|x| x.utxoindex().unwrap()), config.clone(), core.clone(), diff --git a/notify/src/address/mod.rs b/notify/src/address/mod.rs index 100a125da..ff56e49b2 100644 --- a/notify/src/address/mod.rs +++ b/notify/src/address/mod.rs @@ -5,11 +5,13 @@ pub mod test_helpers { use kaspa_addresses::Address; use kaspa_addresses::{Prefix, Version}; + pub const ADDRESS_PREFIX: Prefix = Prefix::Mainnet; + pub fn get_3_addresses(sorted: bool) -> Vec
{ let mut addresses = vec![ - Address::new(Prefix::Mainnet, Version::PubKey, &[1u8; 32]), - Address::new(Prefix::Mainnet, Version::PubKey, &[2u8; 32]), - Address::new(Prefix::Mainnet, Version::PubKey, &[0u8; 32]), + Address::new(ADDRESS_PREFIX, Version::PubKey, &[1u8; 32]), + Address::new(ADDRESS_PREFIX, Version::PubKey, &[2u8; 32]), + Address::new(ADDRESS_PREFIX, Version::PubKey, &[0u8; 32]), ]; if sorted { addresses.sort() diff --git a/notify/src/address/tracker.rs b/notify/src/address/tracker.rs index f73af340a..4a48b5c45 100644 --- a/notify/src/address/tracker.rs +++ b/notify/src/address/tracker.rs @@ -68,7 +68,8 @@ impl From> for AddressIndexes { } } -pub struct Inner { +#[derive(Debug)] +struct Inner { script_pub_keys: IndexMap, } @@ -152,6 +153,7 @@ impl Inner { } /// Tracker of multiple [`Address`](kaspa_addresses::Address), indexing and counting registrations +#[derive(Debug)] pub struct AddressTracker { inner: RwLock, } diff --git a/notify/src/broadcaster.rs b/notify/src/broadcaster.rs index 1d0607d89..457453ce0 100644 --- a/notify/src/broadcaster.rs +++ b/notify/src/broadcaster.rs @@ -246,6 +246,7 @@ mod tests { notifier::test_helpers::{ overall_test_steps, utxos_changed_test_steps, virtual_chain_changed_test_steps, Step, TestConnection, }, + subscription::context::SubscriptionContext, }; use async_channel::{unbounded, Sender}; @@ -256,6 +257,7 @@ mod tests { broadcaster: Arc, /// Listeners, vector index = ListenerId listeners: Vec>, + subscription_context: SubscriptionContext, ctl_sender: Sender>, sync_receiver: Receiver<()>, notification_sender: Sender, @@ -270,6 +272,7 @@ mod tests { let (notification_sender, notification_receiver) = unbounded(); let broadcaster = Arc::new(TestBroadcaster::new(IDENT, 0, notification_receiver, Some(sync_sender))); let mut listeners = Vec::with_capacity(listener_count); + let subscription_context = SubscriptionContext::new(); let mut notification_receivers = Vec::with_capacity(listener_count); for i in 0..listener_count { let (sender, receiver) = unbounded(); @@ -282,6 +285,7 @@ mod tests { name, broadcaster: broadcaster.clone(), listeners, + subscription_context, ctl_sender: broadcaster.ctl.sender.clone(), sync_receiver, notification_sender, @@ -299,7 +303,7 @@ mod tests { for (idx, mutation) in step.mutations.iter().enumerate() { if let Some(ref mutation) = mutation { let event = mutation.event_type(); - if self.listeners[idx].mutate(mutation.clone(), Default::default()).is_some() { + if self.listeners[idx].mutate(mutation.clone(), Default::default(), &self.subscription_context).is_some() { let ctl = match mutation.active() { true => Ctl::Register( self.listeners[idx].subscriptions[event].clone(), diff --git a/notify/src/listener.rs b/notify/src/listener.rs index 485f9ac85..60cf632af 100644 --- a/notify/src/listener.rs +++ b/notify/src/listener.rs @@ -1,6 +1,6 @@ use std::fmt::Debug; extern crate derive_more; -use crate::subscription::{DynSubscription, MutateSingle, MutationPolicies}; +use crate::subscription::{context::SubscriptionContext, DynSubscription, MutateSingle, MutationPolicies}; use super::{ connection::Connection, @@ -37,9 +37,9 @@ where /// /// Return Some mutations to be applied to a compounded state if any change occurred /// in the subscription state and None otherwise. - pub fn mutate(&mut self, mutation: Mutation, policies: MutationPolicies) -> Option> { + pub fn mutate(&mut self, mutation: Mutation, policies: MutationPolicies, context: &SubscriptionContext) -> Option> { let event_type = mutation.event_type(); - self.subscriptions[event_type].mutate(mutation, policies, self.id) + self.subscriptions[event_type].mutate(mutation, policies, context, self.id) } pub fn close(&self) { diff --git a/notify/src/notifier.rs b/notify/src/notifier.rs index 4dfa884fd..658f9d4c3 100644 --- a/notify/src/notifier.rs +++ b/notify/src/notifier.rs @@ -1,4 +1,7 @@ -use crate::{events::EVENT_TYPE_ARRAY, subscription::MutationPolicies}; +use crate::{ + events::EVENT_TYPE_ARRAY, + subscription::{context::SubscriptionContext, MutationPolicies}, +}; use super::{ broadcaster::Broadcaster, @@ -80,10 +83,11 @@ where enabled_events: EventSwitches, collectors: Vec>, subscribers: Vec>, + subscription_context: SubscriptionContext, broadcasters: usize, policies: MutationPolicies, ) -> Self { - Self::with_sync(name, enabled_events, collectors, subscribers, broadcasters, policies, None) + Self::with_sync(name, enabled_events, collectors, subscribers, subscription_context, broadcasters, policies, None) } pub fn with_sync( @@ -91,11 +95,27 @@ where enabled_events: EventSwitches, collectors: Vec>, subscribers: Vec>, + subscription_context: SubscriptionContext, broadcasters: usize, policies: MutationPolicies, _sync: Option>, ) -> Self { - Self { inner: Arc::new(Inner::new(name, enabled_events, collectors, subscribers, broadcasters, policies, _sync)) } + Self { + inner: Arc::new(Inner::new( + name, + enabled_events, + collectors, + subscribers, + subscription_context, + broadcasters, + policies, + _sync, + )), + } + } + + pub fn subscription_context(&self) -> &SubscriptionContext { + &self.inner.subscription_context } pub fn start(self: Arc) { @@ -193,6 +213,9 @@ where /// Subscribers subscribers: Vec>, + /// Subscription context + subscription_context: SubscriptionContext, + /// Mutation policies policies: MutationPolicies, @@ -213,6 +236,7 @@ where enabled_events: EventSwitches, collectors: Vec>, subscribers: Vec>, + subscription_context: SubscriptionContext, broadcasters: usize, policies: MutationPolicies, _sync: Option>, @@ -231,6 +255,7 @@ where broadcasters, collectors, subscribers, + subscription_context, policies, name, _sync, @@ -317,7 +342,9 @@ where ) -> Result<()> { let event: EventType = (&scope).into(); debug!("[Notifier {}] {command} notifying about {scope} to listener {id} - {}", self.name, listener.connection()); - if let Some(mutations) = listener.mutate(Mutation::new(command, scope.clone()), self.policies.clone()) { + if let Some(mutations) = + listener.mutate(Mutation::new(command, scope.clone()), self.policies.clone(), &self.subscription_context) + { trace!("[Notifier {}] {command} notifying listener {id} about {scope:?} involves mutations {mutations:?}", self.name); // Update broadcasters match listener.subscriptions[event].active() { @@ -746,6 +773,7 @@ mod tests { let (subscription_sender, subscription_receiver) = unbounded(); let collector = Arc::new(TestCollector::new(IDENT, notification_receiver, Arc::new(TestConverter::new()))); let subscription_manager = Arc::new(SubscriptionManagerMock::new(subscription_sender)); + let subscription_context = SubscriptionContext::new(); let subscriber = Arc::new(Subscriber::new("test", EVENT_TYPE_ARRAY[..].into(), subscription_manager, SUBSCRIPTION_MANAGER_ID)); let notifier = Arc::new(TestNotifier::with_sync( @@ -753,6 +781,7 @@ mod tests { EVENT_TYPE_ARRAY[..].into(), vec![collector], vec![subscriber], + subscription_context, 1, Default::default(), Some(sync_sender), diff --git a/notify/src/root.rs b/notify/src/root.rs index b3936d2c4..017d01228 100644 --- a/notify/src/root.rs +++ b/notify/src/root.rs @@ -7,7 +7,8 @@ use crate::{ scope::Scope, subscriber::SubscriptionManager, subscription::{ - array::ArrayBuilder, Command, DynSubscription, MutateSingle, Mutation, MutationPolicies, UtxosChangedMutationPolicy, + array::ArrayBuilder, context::SubscriptionContext, Command, DynSubscription, MutateSingle, Mutation, MutationPolicies, + UtxosChangedMutationPolicy, }, }; use async_channel::Sender; @@ -36,7 +37,12 @@ where N: Notification, { pub fn new(sender: Sender) -> Self { - let inner = Arc::new(Inner::new(sender)); + let subscription_context = SubscriptionContext::new(); + Self::with_context(sender, subscription_context) + } + + pub fn with_context(sender: Sender, subscription_context: SubscriptionContext) -> Self { + let inner = Arc::new(Inner::new(sender, subscription_context)); Self { inner } } @@ -92,6 +98,7 @@ where { sender: Sender, subscriptions: RwLock>, + subscription_context: SubscriptionContext, policies: MutationPolicies, } @@ -101,10 +108,10 @@ where { const ROOT_LISTENER_ID: ListenerId = 1; - fn new(sender: Sender) -> Self { + fn new(sender: Sender, subscription_context: SubscriptionContext) -> Self { let subscriptions = RwLock::new(ArrayBuilder::single()); let policies = MutationPolicies::new(UtxosChangedMutationPolicy::AllOrNothing); - Self { sender, subscriptions, policies } + Self { sender, subscriptions, subscription_context, policies } } fn send(&self, notification: N) -> Result<()> { @@ -119,7 +126,12 @@ where pub fn execute_subscribe_command(&self, scope: Scope, command: Command) -> Result<()> { let mutation = Mutation::new(command, scope); let mut subscriptions = self.subscriptions.write(); - subscriptions[mutation.event_type()].mutate(mutation, self.policies.clone(), Self::ROOT_LISTENER_ID); + subscriptions[mutation.event_type()].mutate( + mutation, + self.policies.clone(), + &self.subscription_context, + Self::ROOT_LISTENER_ID, + ); Ok(()) } diff --git a/notify/src/subscription/context.rs b/notify/src/subscription/context.rs new file mode 100644 index 000000000..f5fa3ffea --- /dev/null +++ b/notify/src/subscription/context.rs @@ -0,0 +1,34 @@ +use crate::address::tracker::AddressTracker; +use std::{ops::Deref, sync::Arc}; + +#[derive(Debug, Default)] +pub struct SubscriptionContextInner { + pub address_tracker: AddressTracker, +} + +impl SubscriptionContextInner { + pub fn new() -> Self { + let address_tracker = AddressTracker::new(); + Self { address_tracker } + } +} + +#[derive(Clone, Debug, Default)] +pub struct SubscriptionContext { + inner: Arc, +} + +impl SubscriptionContext { + pub fn new() -> Self { + let inner = Arc::new(SubscriptionContextInner::new()); + Self { inner } + } +} + +impl Deref for SubscriptionContext { + type Target = SubscriptionContextInner; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} diff --git a/notify/src/subscription/mod.rs b/notify/src/subscription/mod.rs index 3cf3f52f8..e00fe7981 100644 --- a/notify/src/subscription/mod.rs +++ b/notify/src/subscription/mod.rs @@ -1,6 +1,6 @@ -use crate::listener::ListenerId; - -use super::{events::EventType, notification::Notification, scope::Scope}; +use crate::{ + events::EventType, listener::ListenerId, notification::Notification, scope::Scope, subscription::context::SubscriptionContext, +}; use borsh::{BorshDeserialize, BorshSchema, BorshSerialize}; use serde::{Deserialize, Serialize}; use std::fmt::Display; @@ -14,6 +14,7 @@ use std::{ pub mod array; pub mod compounded; +pub mod context; pub mod single; #[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize, BorshSerialize, BorshDeserialize, BorshSchema)] @@ -118,17 +119,30 @@ pub trait Single: Subscription + AsAny + DynHash + DynEq + Debug + Send + Sync { &self, mutation: Mutation, policies: MutationPolicies, + context: &SubscriptionContext, listener_id: ListenerId, ) -> Option<(DynSubscription, Vec)>; } pub trait MutateSingle: Deref { - fn mutate(&mut self, mutation: Mutation, policies: MutationPolicies, listener_id: ListenerId) -> Option>; + fn mutate( + &mut self, + mutation: Mutation, + policies: MutationPolicies, + context: &SubscriptionContext, + listener_id: ListenerId, + ) -> Option>; } impl MutateSingle for Arc { - fn mutate(&mut self, mutation: Mutation, policies: MutationPolicies, listener_id: ListenerId) -> Option> { - self.mutated_and_mutations(mutation, policies, listener_id).map(|(mutated, mutations)| { + fn mutate( + &mut self, + mutation: Mutation, + policies: MutationPolicies, + context: &SubscriptionContext, + listener_id: ListenerId, + ) -> Option> { + self.mutated_and_mutations(mutation, policies, context, listener_id).map(|(mutated, mutations)| { *self = mutated; mutations }) diff --git a/notify/src/subscription/single.rs b/notify/src/subscription/single.rs index 491467a6d..66ededa1d 100644 --- a/notify/src/subscription/single.rs +++ b/notify/src/subscription/single.rs @@ -2,7 +2,10 @@ use crate::{ events::EventType, listener::ListenerId, scope::{Scope, UtxosChangedScope, VirtualChainChangedScope}, - subscription::{Command, DynSubscription, Mutation, MutationPolicies, Single, Subscription, UtxosChangedMutationPolicy}, + subscription::{ + context::SubscriptionContext, Command, DynSubscription, Mutation, MutationPolicies, Single, Subscription, + UtxosChangedMutationPolicy, + }, }; use itertools::Itertools; use kaspa_addresses::Address; @@ -40,6 +43,7 @@ impl Single for OverallSubscription { &self, mutation: Mutation, _: MutationPolicies, + _: &SubscriptionContext, _: ListenerId, ) -> Option<(DynSubscription, Vec)> { assert_eq!(self.event_type(), mutation.event_type()); @@ -89,6 +93,7 @@ impl Single for VirtualChainChangedSubscription { &self, mutation: Mutation, _: MutationPolicies, + _: &SubscriptionContext, _: ListenerId, ) -> Option<(DynSubscription, Vec)> { assert_eq!(self.event_type(), mutation.event_type()); @@ -246,6 +251,7 @@ impl Single for UtxosChangedSubscription { &self, mutation: Mutation, policies: MutationPolicies, + _: &SubscriptionContext, listener_id: ListenerId, ) -> Option<(DynSubscription, Vec)> { assert_eq!(self.event_type(), mutation.event_type()); @@ -546,19 +552,21 @@ mod tests { struct MutationTests { tests: Vec, + context: SubscriptionContext, } impl MutationTests { pub const LISTENER_ID: ListenerId = 1; fn new(tests: Vec) -> Self { - Self { tests } + let context = SubscriptionContext::new(); + Self { tests, context } } fn run(&self) { for test in self.tests.iter() { let mut new_state = test.state.clone(); - let result = new_state.mutate(test.mutation.clone(), Default::default(), Self::LISTENER_ID); + let result = new_state.mutate(test.mutation.clone(), Default::default(), &self.context, Self::LISTENER_ID); assert_eq!(test.new_state.active(), new_state.active(), "Testing '{}': wrong new state activity", test.name); assert_eq!(*test.new_state, *new_state, "Testing '{}': wrong new state", test.name); assert_eq!(test.result, result, "Testing '{}': wrong result", test.name); diff --git a/rothschild/Cargo.toml b/rothschild/Cargo.toml index 1a64be73f..cec2bf56a 100644 --- a/rothschild/Cargo.toml +++ b/rothschild/Cargo.toml @@ -8,20 +8,21 @@ include.workspace = true license.workspace = true [dependencies] -kaspa-core.workspace = true +kaspa-addresses.workspace = true kaspa-consensus-core.workspace = true +kaspa-core.workspace = true kaspa-grpc-client.workspace = true +kaspa-notify.workspace = true kaspa-rpc-core.workspace = true -kaspa-addresses.workspace = true kaspa-txscript.workspace = true kaspa-utils.workspace = true -async-channel.workspace = true -parking_lot.workspace = true +async-channel.workspace = true clap.workspace = true faster-hex.workspace = true itertools.workspace = true log.workspace = true +parking_lot.workspace = true rayon.workspace = true secp256k1 = { workspace = true, features = ["global-context", "rand-std"] } tokio = { workspace = true, features = ["rt", "macros", "rt-multi-thread"] } diff --git a/rothschild/src/main.rs b/rothschild/src/main.rs index 3137ad9bc..185916fe9 100644 --- a/rothschild/src/main.rs +++ b/rothschild/src/main.rs @@ -2,7 +2,7 @@ use std::{collections::HashMap, sync::Arc, time::Duration}; use clap::{Arg, ArgAction, Command}; use itertools::Itertools; -use kaspa_addresses::Address; +use kaspa_addresses::{Address, Prefix, Version}; use kaspa_consensus_core::{ config::params::{TESTNET11_PARAMS, TESTNET_PARAMS}, constants::{SOMPI_PER_KASPA, TX_VERSION}, @@ -12,6 +12,7 @@ use kaspa_consensus_core::{ }; use kaspa_core::{info, kaspad_env::version, time::unix_now, warn}; use kaspa_grpc_client::{ClientPool, GrpcClient}; +use kaspa_notify::subscription::context::SubscriptionContext; use kaspa_rpc_core::{api::rpc::RpcApi, notify::mode::NotificationMode}; use kaspa_txscript::pay_to_address_script; use parking_lot::Mutex; @@ -22,6 +23,8 @@ use tokio::time::{interval, MissedTickBehavior}; const DEFAULT_SEND_AMOUNT: u64 = 10 * SOMPI_PER_KASPA; const FEE_PER_MASS: u64 = 10; const MILLIS_PER_TICK: u64 = 10; +const ADDRESS_PREFIX: Prefix = Prefix::Testnet; +const ADDRESS_VERSION: Version = Version::PubKey; struct Stats { num_txs: usize, @@ -84,10 +87,19 @@ pub fn cli() -> Command { .arg(Arg::new("unleashed").long("unleashed").action(ArgAction::SetTrue).hide(true).help("Allow higher TPS")) } -async fn new_rpc_client(address: &str) -> GrpcClient { - GrpcClient::connect(NotificationMode::Direct, format!("grpc://{}", address), true, None, false, Some(500_000), Default::default()) - .await - .unwrap() +async fn new_rpc_client(subscription_context: &SubscriptionContext, address: &str) -> GrpcClient { + GrpcClient::connect( + NotificationMode::Direct, + format!("grpc://{}", address), + Some(subscription_context.clone()), + true, + None, + false, + Some(500_000), + Default::default(), + ) + .await + .unwrap() } struct ClientPoolArg { @@ -104,9 +116,11 @@ async fn main() { kaspa_core::log::init_logger(None, ""); let args = Args::parse(); let stats = Arc::new(Mutex::new(Stats { num_txs: 0, since: unix_now(), num_utxos: 0, utxos_amount: 0, num_outs: 0 })); + let subscription_context = SubscriptionContext::new(); let rpc_client = GrpcClient::connect( NotificationMode::Direct, format!("grpc://{}", args.rpc_server), + Some(subscription_context.clone()), true, None, false, @@ -124,8 +138,7 @@ async fn main() { secp256k1::KeyPair::from_seckey_slice(secp256k1::SECP256K1, &private_key_bytes).unwrap() } else { let (sk, pk) = &secp256k1::generate_keypair(&mut thread_rng()); - let kaspa_addr = - Address::new(kaspa_addresses::Prefix::Testnet, kaspa_addresses::Version::PubKey, &pk.x_only_public_key().0.serialize()); + let kaspa_addr = Address::new(ADDRESS_PREFIX, ADDRESS_VERSION, &pk.x_only_public_key().0.serialize()); info!( "Generated private key {} and address {}. Send some funds to this address and rerun rothschild with `--private-key {}`", sk.display_secret(), @@ -135,11 +148,7 @@ async fn main() { return; }; - let kaspa_addr = Address::new( - kaspa_addresses::Prefix::Testnet, - kaspa_addresses::Version::PubKey, - &schnorr_key.x_only_public_key().0.serialize(), - ); + let kaspa_addr = Address::new(ADDRESS_PREFIX, ADDRESS_VERSION, &schnorr_key.x_only_public_key().0.serialize()); rayon::ThreadPoolBuilder::new().num_threads(args.threads as usize).build_global().unwrap(); @@ -168,7 +177,7 @@ async fn main() { const CLIENT_POOL_SIZE: usize = 8; let mut rpc_clients = Vec::with_capacity(CLIENT_POOL_SIZE); for _ in 0..CLIENT_POOL_SIZE { - rpc_clients.push(Arc::new(new_rpc_client(&args.rpc_server).await)); + rpc_clients.push(Arc::new(new_rpc_client(&subscription_context, &args.rpc_server).await)); } let submit_tx_pool = ClientPool::new(rpc_clients, 1000, |c, arg: ClientPoolArg| async move { diff --git a/rpc/grpc/client/src/lib.rs b/rpc/grpc/client/src/lib.rs index 96deba969..0dc985ae5 100644 --- a/rpc/grpc/client/src/lib.rs +++ b/rpc/grpc/client/src/lib.rs @@ -23,7 +23,8 @@ use kaspa_notify::{ scope::Scope, subscriber::{Subscriber, SubscriptionManager}, subscription::{ - array::ArrayBuilder, Command, DynSubscription, MutateSingle, Mutation, MutationPolicies, UtxosChangedMutationPolicy, + array::ArrayBuilder, context::SubscriptionContext, Command, DynSubscription, MutateSingle, Mutation, MutationPolicies, + UtxosChangedMutationPolicy, }, }; use kaspa_rpc_core::{ @@ -74,6 +75,7 @@ pub struct GrpcClient { /// In direct mode, a Collector relaying incoming notifications to any provided DynNotify collector: Option>, subscriptions: Option>, + subscription_context: SubscriptionContext, policies: MutationPolicies, notification_mode: NotificationMode, client_id: ListenerId, @@ -85,6 +87,7 @@ impl GrpcClient { pub async fn connect( notification_mode: NotificationMode, url: String, + subscription_context: Option, reconnect: bool, connection_event_sender: Option>, override_handle_stop_notify: bool, @@ -105,13 +108,21 @@ impl GrpcClient { .await?; let converter = Arc::new(RpcCoreConverter::new()); let policies = MutationPolicies::new(UtxosChangedMutationPolicy::AddressSet); + let subscription_context = subscription_context.unwrap_or_default(); let (notifier, collector, subscriptions) = match notification_mode { NotificationMode::MultiListeners => { let enabled_events = EVENT_TYPE_ARRAY[..].into(); let collector = Arc::new(GrpcClientCollector::new(GRPC_CLIENT, inner.notification_channel_receiver(), converter)); let subscriber = Arc::new(Subscriber::new(GRPC_CLIENT, enabled_events, inner.clone(), 0)); - let notifier: GrpcClientNotifier = - Notifier::new(GRPC_CLIENT, enabled_events, vec![collector], vec![subscriber], 3, policies.clone()); + let notifier: GrpcClientNotifier = Notifier::new( + GRPC_CLIENT, + enabled_events, + vec![collector], + vec![subscriber], + subscription_context.clone(), + 3, + policies.clone(), + ); (Some(Arc::new(notifier)), None, None) } NotificationMode::Direct => { @@ -127,7 +138,7 @@ impl GrpcClient { } let client_id = u64::from_le_bytes(rand::random::<[u8; 8]>()); - Ok(Self { inner, notifier, collector, subscriptions, policies, notification_mode, client_id }) + Ok(Self { inner, notifier, collector, subscriptions, subscription_context, policies, notification_mode, client_id }) } #[inline(always)] @@ -268,6 +279,7 @@ impl RpcApi for GrpcClient { self.subscriptions.as_ref().unwrap().lock().await[event].mutate( Mutation::new(Command::Start, scope.clone()), self.policies.clone(), + &self.subscription_context, self.client_id, ); self.inner.start_notify_to_client(scope).await?; @@ -288,6 +300,7 @@ impl RpcApi for GrpcClient { self.subscriptions.as_ref().unwrap().lock().await[event].mutate( Mutation::new(Command::Stop, scope.clone()), self.policies.clone(), + &self.subscription_context, self.client_id, ); self.inner.stop_notify_to_client(scope).await?; @@ -513,10 +526,6 @@ impl Inner { .accept_compressed(CompressionEncoding::Gzip) .max_decoding_message_size(RPC_MAX_MESSAGE_SIZE); - // Force the opening of the stream when connected to a go kaspad server. - // This is also needed for querying server capabilities. - request_sender.send(GetInfoRequestMessage {}.into()).await?; - // Prepare a request receiver stream let stream_receiver = request_receiver.clone(); let request_stream = async_stream::stream! { @@ -530,6 +539,7 @@ impl Inner { // Collect server capabilities as stated in GetInfoResponse let mut server_features = ServerFeatures::default(); + request_sender.send(GetInfoRequestMessage {}.into()).await?; match stream.message().await? { Some(ref msg) => { trace!("GRPC client: try_connect - GetInfo got a response"); diff --git a/rpc/grpc/server/src/adaptor.rs b/rpc/grpc/server/src/adaptor.rs index efc01394c..620f3be6b 100644 --- a/rpc/grpc/server/src/adaptor.rs +++ b/rpc/grpc/server/src/adaptor.rs @@ -1,6 +1,6 @@ use crate::{connection_handler::ConnectionHandler, manager::Manager}; use kaspa_core::debug; -use kaspa_notify::notifier::Notifier; +use kaspa_notify::{notifier::Notifier, subscription::context::SubscriptionContext}; use kaspa_rpc_core::{api::rpc::DynRpcService, notify::connection::ChannelConnection, Notification, RpcResult}; use kaspa_utils::networking::NetAddress; use kaspa_utils_tower::counters::TowerConnectionCounters; @@ -37,12 +37,20 @@ impl Adaptor { manager: Manager, core_service: DynRpcService, core_notifier: Arc>, + subscription_context: SubscriptionContext, broadcasters: usize, counters: Arc, ) -> Arc { let (manager_sender, manager_receiver) = mpsc_channel(Self::manager_channel_size()); - let connection_handler = - ConnectionHandler::new(network_bps, manager_sender, core_service.clone(), core_notifier, broadcasters, counters); + let connection_handler = ConnectionHandler::new( + network_bps, + manager_sender, + core_service.clone(), + core_notifier, + subscription_context, + broadcasters, + counters, + ); let server_termination = connection_handler.serve(serve_address); let adaptor = Arc::new(Adaptor::new(Some(server_termination), connection_handler, manager, serve_address)); adaptor.manager.clone().start_event_loop(manager_receiver); diff --git a/rpc/grpc/server/src/connection_handler.rs b/rpc/grpc/server/src/connection_handler.rs index 6946d2e0a..020d57725 100644 --- a/rpc/grpc/server/src/connection_handler.rs +++ b/rpc/grpc/server/src/connection_handler.rs @@ -18,7 +18,7 @@ use kaspa_notify::{ events::EVENT_TYPE_ARRAY, notifier::Notifier, subscriber::Subscriber, - subscription::{MutationPolicies, UtxosChangedMutationPolicy}, + subscription::{context::SubscriptionContext, MutationPolicies, UtxosChangedMutationPolicy}, }; use kaspa_rpc_core::{ api::rpc::DynRpcService, @@ -85,6 +85,7 @@ impl ConnectionHandler { manager_sender: MpscSender, core_service: DynRpcService, core_notifier: Arc>, + subscription_context: SubscriptionContext, broadcasters: usize, counters: Arc, ) -> Self { @@ -99,8 +100,15 @@ impl ConnectionHandler { let collector = Arc::new(GrpcServiceCollector::new(GRPC_SERVER, core_channel.receiver(), converter)); let subscriber = Arc::new(Subscriber::new(GRPC_SERVER, core_events, core_notifier, core_listener_id)); let policies = MutationPolicies::new(UtxosChangedMutationPolicy::AllOrNothing); - let notifier: Arc> = - Arc::new(Notifier::new(GRPC_SERVER, core_events, vec![collector], vec![subscriber], broadcasters, policies)); + let notifier: Arc> = Arc::new(Notifier::new( + GRPC_SERVER, + core_events, + vec![collector], + vec![subscriber], + subscription_context, + broadcasters, + policies, + )); let server_context = ServerContext::new(core_service, notifier); let interface = Arc::new(Factory::new_interface(server_context.clone(), network_bps)); let running = Default::default(); diff --git a/rpc/grpc/server/src/service.rs b/rpc/grpc/server/src/service.rs index 68f70a6d6..1fb52078b 100644 --- a/rpc/grpc/server/src/service.rs +++ b/rpc/grpc/server/src/service.rs @@ -53,6 +53,7 @@ impl AsyncService for GrpcService { manager, self.core_service.clone(), self.core_service.notifier(), + self.core_service.subscription_context(), self.broadcasters, self.counters.clone(), ); diff --git a/rpc/grpc/server/src/tests/client_server.rs b/rpc/grpc/server/src/tests/client_server.rs index 046d5edce..59b560bc9 100644 --- a/rpc/grpc/server/src/tests/client_server.rs +++ b/rpc/grpc/server/src/tests/client_server.rs @@ -21,6 +21,7 @@ async fn test_client_server_sanity_check() { let client = create_client(server.serve_address()).await; assert_eq!(server.active_connections().len(), 1, "the client failed to connect to the server"); + assert!(client.handle_message_id() && client.handle_stop_notify(), "the client failed to collect server features"); // Stop the fake service rpc_core_service.join().await; @@ -189,12 +190,21 @@ async fn test_client_server_notifications() { fn create_server(core_service: Arc) -> Arc { let manager = Manager::new(128); - Adaptor::server(get_free_net_address(), 1, manager, core_service.clone(), core_service.core_notifier(), 3, Default::default()) + Adaptor::server( + get_free_net_address(), + 1, + manager, + core_service.clone(), + core_service.core_notifier(), + core_service.subscription_context(), + 3, + Default::default(), + ) } async fn create_client(server_address: NetAddress) -> GrpcClient { let server_url = format!("grpc://localhost:{}", server_address.port); - GrpcClient::connect(NotificationMode::Direct, server_url, false, None, false, None, Default::default()).await.unwrap() + GrpcClient::connect(NotificationMode::Direct, server_url, None, false, None, false, None, Default::default()).await.unwrap() } fn get_free_net_address() -> NetAddress { diff --git a/rpc/grpc/server/src/tests/rpc_core_mock.rs b/rpc/grpc/server/src/tests/rpc_core_mock.rs index 0cae422a2..bad1dfc25 100644 --- a/rpc/grpc/server/src/tests/rpc_core_mock.rs +++ b/rpc/grpc/server/src/tests/rpc_core_mock.rs @@ -4,6 +4,7 @@ use kaspa_notify::events::EVENT_TYPE_ARRAY; use kaspa_notify::listener::ListenerId; use kaspa_notify::notifier::{Notifier, Notify}; use kaspa_notify::scope::Scope; +use kaspa_notify::subscription::context::SubscriptionContext; use kaspa_notify::subscription::{MutationPolicies, UtxosChangedMutationPolicy}; use kaspa_rpc_core::{api::rpc::RpcApi, *}; use kaspa_rpc_core::{notify::connection::ChannelConnection, RpcResult}; @@ -18,13 +19,30 @@ pub(super) struct RpcCoreMock { impl RpcCoreMock { pub(super) fn new() -> Self { - Self::default() + let (sync_sender, sync_receiver) = unbounded(); + let policies = MutationPolicies::new(UtxosChangedMutationPolicy::AddressSet); + let subscription_context = SubscriptionContext::new(); + let core_notifier: Arc = Arc::new(Notifier::with_sync( + "rpc-core", + EVENT_TYPE_ARRAY[..].into(), + vec![], + vec![], + subscription_context, + 10, + policies, + Some(sync_sender), + )); + Self { core_notifier, _sync_receiver: sync_receiver } } pub(super) fn core_notifier(&self) -> Arc { self.core_notifier.clone() } + pub(super) fn subscription_context(&self) -> SubscriptionContext { + self.core_notifier.subscription_context().clone() + } + #[allow(dead_code)] pub(super) fn notify_new_block_template(&self) -> kaspa_notify::error::Result<()> { let notification = Notification::NewBlockTemplate(NewBlockTemplateNotification {}); @@ -45,16 +63,6 @@ impl RpcCoreMock { } } -impl Default for RpcCoreMock { - fn default() -> Self { - let (sync_sender, sync_receiver) = unbounded(); - let policies = MutationPolicies::new(UtxosChangedMutationPolicy::AddressSet); - let core_notifier: Arc = - Arc::new(Notifier::with_sync("rpc-core", EVENT_TYPE_ARRAY[..].into(), vec![], vec![], 10, policies, Some(sync_sender))); - Self { core_notifier, _sync_receiver: sync_receiver } - } -} - #[async_trait] impl RpcApi for RpcCoreMock { // This fn needs to succeed while the client connects @@ -65,8 +73,8 @@ impl RpcApi for RpcCoreMock { server_version: "mock".to_string(), is_utxo_indexed: false, is_synced: false, - has_notify_command: false, - has_message_id: false, + has_notify_command: true, + has_message_id: true, }) } diff --git a/rpc/service/src/service.rs b/rpc/service/src/service.rs index 11eaa39e4..826e3c467 100644 --- a/rpc/service/src/service.rs +++ b/rpc/service/src/service.rs @@ -36,6 +36,7 @@ use kaspa_index_core::{ }; use kaspa_mining::model::tx_query::TransactionQuery; use kaspa_mining::{manager::MiningManagerProxy, mempool::tx::Orphan}; +use kaspa_notify::subscription::context::SubscriptionContext; use kaspa_notify::subscription::{MutationPolicies, UtxosChangedMutationPolicy}; use kaspa_notify::{ collector::DynCollector, @@ -117,6 +118,7 @@ impl RpcCoreService { index_notifier: Option>, mining_manager: MiningManagerProxy, flow_context: Arc, + subscription_context: SubscriptionContext, utxoindex: Option, config: Arc, core: Arc, @@ -176,7 +178,8 @@ impl RpcCoreService { // Create the rcp-core notifier let policies = MutationPolicies::new(UtxosChangedMutationPolicy::AllOrNothing); - let notifier = Arc::new(Notifier::new(RPC_CORE, EVENT_TYPE_ARRAY[..].into(), collectors, subscribers, 1, policies)); + let notifier = + Arc::new(Notifier::new(RPC_CORE, EVENT_TYPE_ARRAY[..].into(), collectors, subscribers, subscription_context, 1, policies)); Self { consensus_manager, @@ -214,6 +217,11 @@ impl RpcCoreService { self.notifier.clone() } + #[inline(always)] + pub fn subscription_context(&self) -> SubscriptionContext { + self.notifier.subscription_context().clone() + } + async fn get_utxo_set_by_script_public_key<'a>( &self, addresses: impl Iterator, diff --git a/rpc/wrpc/client/src/client.rs b/rpc/wrpc/client/src/client.rs index 14d0fa8e0..4bb0d342e 100644 --- a/rpc/wrpc/client/src/client.rs +++ b/rpc/wrpc/client/src/client.rs @@ -2,7 +2,7 @@ use crate::error::Error; use crate::imports::*; use crate::parse::parse_host; use kaspa_consensus_core::network::NetworkType; -use kaspa_notify::subscription::{MutationPolicies, UtxosChangedMutationPolicy}; +use kaspa_notify::subscription::{context::SubscriptionContext, MutationPolicies, UtxosChangedMutationPolicy}; use kaspa_rpc_core::{ api::ctl::RpcCtl, notify::collector::{RpcCoreCollector, RpcCoreConverter}, @@ -173,12 +173,18 @@ pub struct KaspaRpcClient { impl KaspaRpcClient { /// Create a new `KaspaRpcClient` with the given Encoding and URL - pub fn new(encoding: Encoding, url: &str) -> Result { - Self::new_with_args(encoding, NotificationMode::Direct, url) + // FIXME + pub fn new(encoding: Encoding, url: &str, subscription_context: Option) -> Result { + Self::new_with_args(encoding, NotificationMode::Direct, url, subscription_context) } /// Extended constructor that accepts [`NotificationMode`] argument. - pub fn new_with_args(encoding: Encoding, notification_mode: NotificationMode, url: &str) -> Result { + pub fn new_with_args( + encoding: Encoding, + notification_mode: NotificationMode, + url: &str, + subscription_context: Option, + ) -> Result { let inner = Arc::new(Inner::new(encoding, url)?); let notifier = if matches!(notification_mode, NotificationMode::MultiListeners) { let enabled_events = EVENT_TYPE_ARRAY[..].into(); @@ -186,7 +192,15 @@ impl KaspaRpcClient { let collector = Arc::new(RpcCoreCollector::new(WRPC_CLIENT, inner.notification_channel_receiver(), converter)); let subscriber = Arc::new(Subscriber::new(WRPC_CLIENT, enabled_events, inner.clone(), 0)); let policies = MutationPolicies::new(UtxosChangedMutationPolicy::AddressSet); - Some(Arc::new(Notifier::new(WRPC_CLIENT, enabled_events, vec![collector], vec![subscriber], 3, policies))) + Some(Arc::new(Notifier::new( + WRPC_CLIENT, + enabled_events, + vec![collector], + vec![subscriber], + subscription_context.unwrap_or_default(), + 3, + policies, + ))) } else { None }; diff --git a/rpc/wrpc/client/src/wasm.rs b/rpc/wrpc/client/src/wasm.rs index b20eeae5c..9718ee5a4 100644 --- a/rpc/wrpc/client/src/wasm.rs +++ b/rpc/wrpc/client/src/wasm.rs @@ -41,7 +41,7 @@ impl RpcClient { let url = if let Some(network_type) = network_type { Self::parse_url(url, encoding, network_type)? } else { url.to_string() }; let rpc_client = RpcClient { - client: Arc::new(KaspaRpcClient::new(encoding, url.as_str()).unwrap_or_else(|err| panic!("{err}"))), + client: Arc::new(KaspaRpcClient::new(encoding, url.as_str(), None).unwrap_or_else(|err| panic!("{err}"))), inner: Arc::new(Inner { notification_task: AtomicBool::new(false), notification_ctl: DuplexChannel::oneshot(), diff --git a/rpc/wrpc/server/src/server.rs b/rpc/wrpc/server/src/server.rs index 12f67c86f..e49ed5ecc 100644 --- a/rpc/wrpc/server/src/server.rs +++ b/rpc/wrpc/server/src/server.rs @@ -75,8 +75,15 @@ impl Server { let collector = Arc::new(WrpcServiceCollector::new(WRPC_SERVER, notification_channel.receiver(), converter)); let subscriber = Arc::new(Subscriber::new(WRPC_SERVER, enabled_events, service.notifier(), listener_id)); let policies = MutationPolicies::new(UtxosChangedMutationPolicy::AllOrNothing); - let wrpc_notifier = - Arc::new(Notifier::new(WRPC_SERVER, enabled_events, vec![collector], vec![subscriber], tasks, policies)); + let wrpc_notifier = Arc::new(Notifier::new( + WRPC_SERVER, + enabled_events, + vec![collector], + vec![subscriber], + service.subscription_context(), + tasks, + policies, + )); Some(RpcCore { service, wrpc_notifier }) } else { None @@ -111,6 +118,7 @@ impl Server { let grpc_client = GrpcClient::connect( NotificationMode::Direct, grpc_proxy_address.to_owned(), + None, false, None, true, diff --git a/testing/integration/src/common/daemon.rs b/testing/integration/src/common/daemon.rs index b6a5e8dab..952b8e1f0 100644 --- a/testing/integration/src/common/daemon.rs +++ b/testing/integration/src/common/daemon.rs @@ -3,6 +3,7 @@ use kaspa_consensus_core::network::NetworkId; use kaspa_core::{core::Core, signals::Shutdown}; use kaspa_database::utils::get_kaspa_tempdir; use kaspa_grpc_client::GrpcClient; +use kaspa_notify::subscription::context::SubscriptionContext; use kaspa_rpc_core::notify::mode::NotificationMode; use kaspad_lib::{args::Args, daemon::create_core_with_runtime}; use std::{sync::Arc, time::Duration}; @@ -13,6 +14,7 @@ use kaspa_grpc_client::ClientPool; pub struct Daemon { // Type and suffix of the daemon network pub network: NetworkId, + pub context: SubscriptionContext, // Daemon ports pub rpc_port: u16, @@ -58,8 +60,9 @@ impl Daemon { args.appdir = Some(appdir_tempdir.path().to_str().unwrap().to_owned()); let network = args.network(); + let context = SubscriptionContext::new(); let (core, _) = create_core_with_runtime(&Default::default(), &args, fd_total_budget); - Daemon { network, rpc_port, p2p_port, core, workers: None, _appdir_tempdir: appdir_tempdir } + Daemon { network, context, rpc_port, p2p_port, core, workers: None, _appdir_tempdir: appdir_tempdir } } pub async fn start(&mut self) -> GrpcClient { @@ -80,6 +83,7 @@ impl Daemon { GrpcClient::connect( NotificationMode::Direct, format!("grpc://localhost:{}", self.rpc_port), + Some(self.context.clone()), true, None, false, @@ -102,6 +106,7 @@ impl Daemon { GrpcClient::connect( NotificationMode::MultiListeners, format!("grpc://localhost:{}", self.rpc_port), + Some(self.context.clone()), true, None, false, diff --git a/testing/integration/src/consensus_integration_tests.rs b/testing/integration/src/consensus_integration_tests.rs index 819951883..8c7daf126 100644 --- a/testing/integration/src/consensus_integration_tests.rs +++ b/testing/integration/src/consensus_integration_tests.rs @@ -53,6 +53,7 @@ use kaspa_database::prelude::{CachePolicy, ConnBuilder}; use kaspa_index_processor::service::IndexService; use kaspa_math::Uint256; use kaspa_muhash::MuHash; +use kaspa_notify::subscription::context::SubscriptionContext; use kaspa_txscript::caches::TxScriptCacheCounters; use kaspa_utxoindex::api::{UtxoIndexApi, UtxoIndexProxy}; use kaspa_utxoindex::UtxoIndex; @@ -940,7 +941,8 @@ async fn json_test(file_path: &str, concurrency: bool) { let tick_service = Arc::new(TickService::default()); let (notification_send, notification_recv) = unbounded(); let tc = Arc::new(TestConsensus::with_notifier(&config, notification_send)); - let notify_service = Arc::new(NotifyService::new(tc.notification_root(), notification_recv)); + let subscription_context = SubscriptionContext::new(); + let notify_service = Arc::new(NotifyService::new(tc.notification_root(), notification_recv, subscription_context.clone())); // External storage for storing block bodies. This allows separating header and body processing phases let (_external_db_lifetime, external_storage) = create_temp_db!(ConnBuilder::default().with_files_limit(10)); @@ -948,7 +950,11 @@ async fn json_test(file_path: &str, concurrency: bool) { let (_utxoindex_db_lifetime, utxoindex_db) = create_temp_db!(ConnBuilder::default().with_files_limit(10)); let consensus_manager = Arc::new(ConsensusManager::new(Arc::new(TestConsensusFactory::new(tc.clone())))); let utxoindex = UtxoIndex::new(consensus_manager.clone(), utxoindex_db).unwrap(); - let index_service = Arc::new(IndexService::new(¬ify_service.notifier(), Some(UtxoIndexProxy::new(utxoindex.clone())))); + let index_service = Arc::new(IndexService::new( + ¬ify_service.notifier(), + subscription_context.clone(), + Some(UtxoIndexProxy::new(utxoindex.clone())), + )); let async_runtime = Arc::new(AsyncRuntime::new(2)); async_runtime.register(tick_service.clone()); diff --git a/testing/integration/src/daemon_integration_tests.rs b/testing/integration/src/daemon_integration_tests.rs index ea6e5a697..9c0e11718 100644 --- a/testing/integration/src/daemon_integration_tests.rs +++ b/testing/integration/src/daemon_integration_tests.rs @@ -26,9 +26,11 @@ async fn daemon_sanity_test() { let total_fd_limit = 10; let mut kaspad1 = Daemon::new_random(total_fd_limit); let rpc_client1 = kaspad1.start().await; + assert!(rpc_client1.handle_message_id() && rpc_client1.handle_stop_notify(), "the client failed to collect server features"); let mut kaspad2 = Daemon::new_random(total_fd_limit); let rpc_client2 = kaspad2.start().await; + assert!(rpc_client2.handle_message_id() && rpc_client2.handle_stop_notify(), "the client failed to collect server features"); tokio::time::sleep(Duration::from_secs(1)).await; rpc_client1.disconnect().await.unwrap(); diff --git a/wallet/core/src/tests/rpc_core_mock.rs b/wallet/core/src/tests/rpc_core_mock.rs index c43436767..31f1367bf 100644 --- a/wallet/core/src/tests/rpc_core_mock.rs +++ b/wallet/core/src/tests/rpc_core_mock.rs @@ -6,6 +6,7 @@ use kaspa_notify::events::EVENT_TYPE_ARRAY; use kaspa_notify::listener::ListenerId; use kaspa_notify::notifier::{Notifier, Notify}; use kaspa_notify::scope::Scope; +use kaspa_notify::subscription::context::SubscriptionContext; use kaspa_notify::subscription::{MutationPolicies, UtxosChangedMutationPolicy}; use kaspa_rpc_core::api::ctl::RpcCtl; use kaspa_rpc_core::{api::rpc::RpcApi, *}; @@ -28,7 +29,19 @@ pub struct RpcCoreMock { impl RpcCoreMock { pub fn new() -> Self { - Self::default() + let (sync_sender, sync_receiver) = unbounded(); + let policies = MutationPolicies::new(UtxosChangedMutationPolicy::AddressSet); + let core_notifier: Arc = Arc::new(Notifier::with_sync( + "rpc-core", + EVENT_TYPE_ARRAY[..].into(), + vec![], + vec![], + SubscriptionContext::new(), + 10, + policies, + Some(sync_sender), + )); + Self { core_notifier, _sync_receiver: sync_receiver, ctl: RpcCtl::new() } } pub fn core_notifier(&self) -> Arc { @@ -63,11 +76,7 @@ impl RpcCoreMock { impl Default for RpcCoreMock { fn default() -> Self { - let (sync_sender, sync_receiver) = unbounded(); - let policies = MutationPolicies::new(UtxosChangedMutationPolicy::AddressSet); - let core_notifier: Arc = - Arc::new(Notifier::with_sync("rpc-core", EVENT_TYPE_ARRAY[..].into(), vec![], vec![], 10, policies, Some(sync_sender))); - Self { core_notifier, _sync_receiver: sync_receiver, ctl: RpcCtl::new() } + Self::new() } } diff --git a/wallet/core/src/wallet/mod.rs b/wallet/core/src/wallet/mod.rs index 206c97504..0640e80bc 100644 --- a/wallet/core/src/wallet/mod.rs +++ b/wallet/core/src/wallet/mod.rs @@ -64,8 +64,12 @@ impl Wallet { } pub fn try_with_wrpc(store: Arc, network_id: Option) -> Result { - let rpc_client = - Arc::new(KaspaRpcClient::new_with_args(WrpcEncoding::Borsh, NotificationMode::MultiListeners, "wrpc://127.0.0.1:17110")?); + let rpc_client = Arc::new(KaspaRpcClient::new_with_args( + WrpcEncoding::Borsh, + NotificationMode::MultiListeners, + "wrpc://127.0.0.1:17110", + None, + )?); let rpc_ctl = rpc_client.ctl().clone(); let rpc_api: Arc = rpc_client; let rpc = Rpc::new(rpc_api, rpc_ctl);