From af6a8ab72de41f45861101bd5e21a847f8a8bb31 Mon Sep 17 00:00:00 2001
From: Tiram <18632023+tiram88@users.noreply.github.com>
Date: Thu, 22 Jun 2023 11:36:35 +0200
Subject: [PATCH 1/5] Enhance broad DAG block interconnections (#208)

* Ensure block diversity between nodes

* Use the depth store to read the merge depth root
---
 .../pipeline/virtual_processor/processor.rs | 32 +++++++++++++------
 1 file changed, 23 insertions(+), 9 deletions(-)

diff --git a/consensus/src/pipeline/virtual_processor/processor.rs b/consensus/src/pipeline/virtual_processor/processor.rs
index 7a282aec9..d6a3caf24 100644
--- a/consensus/src/pipeline/virtual_processor/processor.rs
+++ b/consensus/src/pipeline/virtual_processor/processor.rs
@@ -17,6 +17,7 @@ use crate::{
         acceptance_data::{AcceptanceDataStoreReader, DbAcceptanceDataStore},
         block_transactions::{BlockTransactionsStoreReader, DbBlockTransactionsStore},
         daa::DbDaaStore,
+        depth::{DbDepthStore, DepthStoreReader},
         ghostdag::{DbGhostdagStore, GhostdagData, GhostdagStoreReader},
         headers::{DbHeadersStore, HeaderStoreReader},
         past_pruning_points::DbPastPruningPointsStore,
@@ -77,6 +78,7 @@ use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender
 use itertools::Itertools;
 use kaspa_utils::binary_heap::BinaryHeapExtensions;
 use parking_lot::{RwLock, RwLockUpgradableReadGuard};
+use rand::seq::SliceRandom;
 use rayon::ThreadPool;
 use rocksdb::WriteBatch;
 use std::{
@@ -115,6 +117,7 @@ pub struct VirtualStateProcessor {
     pub(super) pruning_point_store: Arc<RwLock<DbPruningStore>>,
     pub(super) past_pruning_points_store: Arc<DbPastPruningPointsStore>,
     pub(super) body_tips_store: Arc<RwLock<DbTipsStore>>,
+    pub(super) depth_store: Arc<DbDepthStore>,

     // Utxo-related stores
     pub(super) utxo_diffs_store: Arc<DbUtxoDiffsStore>,
@@ -180,6 +183,7 @@ impl VirtualStateProcessor {
             pruning_point_store: storage.pruning_point_store.clone(),
             past_pruning_points_store: storage.past_pruning_points_store.clone(),
             body_tips_store: storage.body_tips_store.clone(),
+            depth_store: storage.depth_store.clone(),
             utxo_diffs_store: storage.utxo_diffs_store.clone(),
             utxo_multisets_store: storage.utxo_multisets_store.clone(),
             acceptance_data_store: storage.acceptance_data_store.clone(),
@@ -500,7 +504,21 @@ impl VirtualStateProcessor {
             diff_point = self.calculate_utxo_state_relatively(stores, diff, diff_point, candidate);
             if diff_point == candidate {
                 // This indicates that candidate has valid UTXO state and that `diff` represents its diff from virtual
-                return (candidate, heap.into_sorted_iter().take(self.max_virtual_parent_candidates()).map(|s| s.hash).collect());
+
+                // All blocks with lower blue work than filtering_root are:
+                // 1. not in its future (because blue work is monotonic),
+                // 2. will eventually be removed by the bounded merge check.
+                // So we prefer doing it in advance to allow better tips to be considered.
+                let filtering_root = self.depth_store.merge_depth_root(candidate).unwrap();
+                let filtering_blue_work = self.ghostdag_primary_store.get_blue_work(filtering_root).unwrap_or_default();
+                return (
+                    candidate,
+                    heap.into_sorted_iter()
+                        .take(self.max_virtual_parent_candidates())
+                        .take_while(|s| s.blue_work >= filtering_blue_work)
+                        .map(|s| s.hash)
+                        .collect(),
+                );
             } else {
                 debug!("Block candidate {} has invalid UTXO state and is ignored from Virtual chain.", candidate)
             }
@@ -530,14 +548,10 @@ impl VirtualStateProcessor {
         // TODO: tests
         let max_block_parents = self.max_block_parents as usize;

-        // Prioritize half the blocks with highest blue work and half with lowest, so the network will merge splits faster.
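(Aside, not part of the patch: a minimal sketch of the filtering step added above. Types here are hypothetical simplifications; the real code drains a binary heap of GHOSTDAG sorting data, already ordered by descending blue work.)

```rust
// Hypothetical simplified candidate type standing in for the heap entries.
struct Candidate {
    hash: u64,
    blue_work: u128,
}

fn filter_virtual_parent_candidates(
    sorted_desc: Vec<Candidate>, // sorted by blue work, highest first
    max_candidates: usize,
    filtering_blue_work: u128,
) -> Vec<u64> {
    sorted_desc
        .into_iter()
        .take(max_candidates)
        // Since the input is sorted by descending blue work, once one candidate
        // falls below the threshold every remaining candidate does too, so
        // take_while can cut the tail in a single pass.
        .take_while(|c| c.blue_work >= filtering_blue_work)
        .map(|c| c.hash)
        .collect()
}
```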
- if candidates.len() >= max_block_parents { - let max_additional_parents = max_block_parents - 1; // We already have the selected parent - let mut j = candidates.len() - 1; - for i in max_additional_parents / 2..max_additional_parents { - candidates.swap(i, j); - j -= 1; - } + // Prioritize half the blocks with highest blue work and pick the rest randomly to ensure diversity between nodes + if candidates.len() > max_block_parents / 2 { + // `make_contiguous` should be a no op since the deque was just built + candidates.make_contiguous()[max_block_parents / 2..].shuffle(&mut rand::thread_rng()); } let mut virtual_parents = Vec::with_capacity(min(max_block_parents, candidates.len() + 1)); From 3b5fc261f0af3fd543807393e1f9f17c65ef21f2 Mon Sep 17 00:00:00 2001 From: Michael Sutton Date: Thu, 22 Jun 2023 16:55:39 +0300 Subject: [PATCH 2/5] Add P2P reject message support (#209) * send and receive a reject message on logical protocol errors * automate flow name --- protocol/flows/src/flow_context.rs | 2 +- protocol/flows/src/flow_trait.rs | 10 ++- protocol/flows/src/v5/address.rs | 8 --- protocol/flows/src/v5/blockrelay/flow.rs | 4 -- .../src/v5/blockrelay/handle_requests.rs | 4 -- protocol/flows/src/v5/ibd/flow.rs | 4 -- protocol/flows/src/v5/mod.rs | 68 ++----------------- protocol/flows/src/v5/ping.rs | 8 --- protocol/flows/src/v5/request_anticone.rs | 4 -- .../flows/src/v5/request_block_locator.rs | 4 -- protocol/flows/src/v5/request_headers.rs | 4 -- protocol/flows/src/v5/request_ibd_blocks.rs | 4 -- .../src/v5/request_ibd_chain_block_locator.rs | 4 -- protocol/flows/src/v5/request_pp_proof.rs | 4 -- .../v5/request_pruning_point_and_anticone.rs | 4 -- .../src/v5/request_pruning_point_utxo_set.rs | 4 -- protocol/flows/src/v5/txrelay/flow.rs | 8 --- protocol/p2p/src/common.rs | 34 +++++++++- protocol/p2p/src/core/connection_handler.rs | 3 +- protocol/p2p/src/core/hub.rs | 5 +- protocol/p2p/src/core/router.rs | 60 ++++++++++++++-- utils/src/any.rs | 30 ++++++++ utils/src/lib.rs | 1 + 23 files changed, 137 insertions(+), 144 deletions(-) create mode 100644 utils/src/any.rs diff --git a/protocol/flows/src/flow_context.rs b/protocol/flows/src/flow_context.rs index f9656f199..a398dd05d 100644 --- a/protocol/flows/src/flow_context.rs +++ b/protocol/flows/src/flow_context.rs @@ -450,7 +450,6 @@ impl ConnectionInitializer for FlowContext { // Subnets are not currently supported let mut self_version_message = Version::new(None, self.node_id, network_name.clone(), None, PROTOCOL_VERSION); self_version_message.add_user_agent(name(), version(), &self.config.user_agent_comments); - // TODO: full and accurate version info // TODO: get number of live services // TODO: disable_relay_tx from config/cmd @@ -465,6 +464,7 @@ impl ConnectionInitializer for FlowContext { if self.hub.has_peer(router.key()) { return Err(ProtocolError::PeerAlreadyExists(router.key())); } + // And loopback connections... 
if self.node_id == router.identity() { return Err(ProtocolError::LoopbackConnection(router.key())); } diff --git a/protocol/flows/src/flow_trait.rs b/protocol/flows/src/flow_trait.rs index d5b85deae..7d05e4741 100644 --- a/protocol/flows/src/flow_trait.rs +++ b/protocol/flows/src/flow_trait.rs @@ -1,5 +1,6 @@ use kaspa_core::warn; use kaspa_p2p_lib::{common::ProtocolError, Router}; +use kaspa_utils::any::type_name_short; use std::sync::Arc; #[async_trait::async_trait] @@ -7,18 +8,21 @@ pub trait Flow where Self: 'static + Send + Sync, { - fn name(&self) -> &'static str; + fn name(&self) -> &'static str { + type_name_short::() + } + fn router(&self) -> Option>; async fn start(&mut self) -> Result<(), ProtocolError>; + fn launch(mut self: Box) { tokio::spawn(async move { let res = self.start().await; if let Err(err) = res { - // TODO: imp complete error handler (what happens in go?) if let Some(router) = self.router() { + router.try_sending_reject_message(&err).await; if router.close().await || !err.is_connection_closed_error() { - // TODO: send and receive an explicit reject message for easier tracing of bugs causing disconnections warn!("{} flow error: {}, disconnecting from peer {}.", self.name(), err, router); } } diff --git a/protocol/flows/src/v5/address.rs b/protocol/flows/src/v5/address.rs index d2d67bbd4..dc7d21208 100644 --- a/protocol/flows/src/v5/address.rs +++ b/protocol/flows/src/v5/address.rs @@ -26,10 +26,6 @@ pub struct ReceiveAddressesFlow { #[async_trait::async_trait] impl Flow for ReceiveAddressesFlow { - fn name(&self) -> &'static str { - "Receive addresses" - } - fn router(&self) -> Option> { Some(self.router.clone()) } @@ -74,10 +70,6 @@ pub struct SendAddressesFlow { #[async_trait::async_trait] impl Flow for SendAddressesFlow { - fn name(&self) -> &'static str { - "Send addresses" - } - fn router(&self) -> Option> { Some(self.router.clone()) } diff --git a/protocol/flows/src/v5/blockrelay/flow.rs b/protocol/flows/src/v5/blockrelay/flow.rs index 5cdeeae70..5cc7fc667 100644 --- a/protocol/flows/src/v5/blockrelay/flow.rs +++ b/protocol/flows/src/v5/blockrelay/flow.rs @@ -59,10 +59,6 @@ pub struct HandleRelayInvsFlow { #[async_trait::async_trait] impl Flow for HandleRelayInvsFlow { - fn name(&self) -> &'static str { - "HANDLE_RELAY_INVS" - } - fn router(&self) -> Option> { Some(self.router.clone()) } diff --git a/protocol/flows/src/v5/blockrelay/handle_requests.rs b/protocol/flows/src/v5/blockrelay/handle_requests.rs index bde025823..87ad806bb 100644 --- a/protocol/flows/src/v5/blockrelay/handle_requests.rs +++ b/protocol/flows/src/v5/blockrelay/handle_requests.rs @@ -16,10 +16,6 @@ pub struct HandleRelayBlockRequests { #[async_trait::async_trait] impl Flow for HandleRelayBlockRequests { - fn name(&self) -> &'static str { - "HANDLE_RELAY_BLOCK_REQUESTS" - } - fn router(&self) -> Option> { Some(self.router.clone()) } diff --git a/protocol/flows/src/v5/ibd/flow.rs b/protocol/flows/src/v5/ibd/flow.rs index 5f5b0c66c..42f5d3bd3 100644 --- a/protocol/flows/src/v5/ibd/flow.rs +++ b/protocol/flows/src/v5/ibd/flow.rs @@ -48,10 +48,6 @@ pub struct IbdFlow { #[async_trait::async_trait] impl Flow for IbdFlow { - fn name(&self) -> &'static str { - "IBD" - } - fn router(&self) -> Option> { Some(self.router.clone()) } diff --git a/protocol/flows/src/v5/mod.rs b/protocol/flows/src/v5/mod.rs index e33b26633..91afb6fe3 100644 --- a/protocol/flows/src/v5/mod.rs +++ b/protocol/flows/src/v5/mod.rs @@ -15,8 +15,7 @@ use self::{ }; use crate::{flow_context::FlowContext, flow_trait::Flow}; -use 
kaspa_p2p_lib::{pb::kaspad_message::Payload as KaspadMessagePayload, KaspadMessagePayloadType, Router}; -use log::{debug, warn}; +use kaspa_p2p_lib::{KaspadMessagePayloadType, Router}; use std::sync::Arc; mod address; @@ -141,67 +140,12 @@ pub fn register(ctx: FlowContext, router: Arc) -> Vec> { )), ]; - // TEMP: subscribe to remaining messages and ignore them - // NOTE: as flows are implemented, the below types should be all commented out - let mut unimplemented_messages_route = router.subscribe(vec![ - // KaspadMessagePayloadType::Addresses, - // KaspadMessagePayloadType::Block, - // KaspadMessagePayloadType::Transaction, - // KaspadMessagePayloadType::BlockLocator, - // KaspadMessagePayloadType::RequestAddresses, - // KaspadMessagePayloadType::RequestRelayBlocks, - // KaspadMessagePayloadType::RequestTransactions, - // KaspadMessagePayloadType::IbdBlock, - // KaspadMessagePayloadType::InvRelayBlock, - // KaspadMessagePayloadType::InvTransactions, - // KaspadMessagePayloadType::Ping, - // KaspadMessagePayloadType::Pong, - // KaspadMessagePayloadType::Verack, - // KaspadMessagePayloadType::Version, - // KaspadMessagePayloadType::Ready, - // KaspadMessagePayloadType::TransactionNotFound, - KaspadMessagePayloadType::Reject, - // KaspadMessagePayloadType::PruningPointUtxoSetChunk, - // KaspadMessagePayloadType::RequestIbdBlocks, - // KaspadMessagePayloadType::UnexpectedPruningPoint, - // KaspadMessagePayloadType::IbdBlockLocatorHighestHash, - // KaspadMessagePayloadType::RequestNextPruningPointUtxoSetChunk, - // KaspadMessagePayloadType::DonePruningPointUtxoSetChunks, - // KaspadMessagePayloadType::IbdBlockLocatorHighestHashNotFound, + // The reject message is handled as a special case by the router + // KaspadMessagePayloadType::Reject, - // We do not register the below two messages since they are deprecated also in go-kaspa - // KaspadMessagePayloadType::BlockWithTrustedData, - // KaspadMessagePayloadType::IbdBlockLocator, - - // KaspadMessagePayloadType::DoneBlocksWithTrustedData, - // KaspadMessagePayloadType::RequestPruningPointAndItsAnticone, - // KaspadMessagePayloadType::BlockHeaders, - // KaspadMessagePayloadType::RequestNextHeaders, - // KaspadMessagePayloadType::DoneHeaders, - // KaspadMessagePayloadType::RequestPruningPointUtxoSet, - // KaspadMessagePayloadType::RequestHeaders, - // KaspadMessagePayloadType::RequestBlockLocator, - // KaspadMessagePayloadType::PruningPoints, - // KaspadMessagePayloadType::RequestPruningPointProof, - // KaspadMessagePayloadType::PruningPointProof, - // KaspadMessagePayloadType::BlockWithTrustedDataV4, - // KaspadMessagePayloadType::TrustedData, - // KaspadMessagePayloadType::RequestIbdChainBlockLocator, - // KaspadMessagePayloadType::IbdChainBlockLocator, - // KaspadMessagePayloadType::RequestAnticone, - // KaspadMessagePayloadType::RequestNextPruningPointAndItsAnticoneBlocks, - ]); - - tokio::spawn(async move { - while let Some(msg) = unimplemented_messages_route.recv().await { - match msg.payload { - Some(KaspadMessagePayload::Reject(reject_msg)) => { - warn!("Got a reject message {} from peer {}", reject_msg.reason, router); - } - _ => debug!("P2P unimplemented routes message: {:?}", msg), - } - } - }); + // We do not register the below two messages since they are deprecated also in go-kaspa + // KaspadMessagePayloadType::BlockWithTrustedData, + // KaspadMessagePayloadType::IbdBlockLocator, flows } diff --git a/protocol/flows/src/v5/ping.rs b/protocol/flows/src/v5/ping.rs index 12ab94278..6a14bb22a 100644 --- a/protocol/flows/src/v5/ping.rs +++ 
b/protocol/flows/src/v5/ping.rs @@ -21,10 +21,6 @@ pub struct ReceivePingsFlow { #[async_trait::async_trait] impl Flow for ReceivePingsFlow { - fn name(&self) -> &'static str { - "Receive pings" - } - fn router(&self) -> Option> { Some(self.router.clone()) } @@ -64,10 +60,6 @@ pub struct SendPingsFlow { #[async_trait::async_trait] impl Flow for SendPingsFlow { - fn name(&self) -> &'static str { - "Send pings" - } - fn router(&self) -> Option> { self.router.upgrade() } diff --git a/protocol/flows/src/v5/request_anticone.rs b/protocol/flows/src/v5/request_anticone.rs index e59b8a2ef..13beda5ce 100644 --- a/protocol/flows/src/v5/request_anticone.rs +++ b/protocol/flows/src/v5/request_anticone.rs @@ -18,10 +18,6 @@ pub struct HandleAnticoneRequests { #[async_trait::async_trait] impl Flow for HandleAnticoneRequests { - fn name(&self) -> &'static str { - "HANDLE_ANTICONE_REQUESTS" - } - fn router(&self) -> Option> { Some(self.router.clone()) } diff --git a/protocol/flows/src/v5/request_block_locator.rs b/protocol/flows/src/v5/request_block_locator.rs index 9d2a2e688..f800727b1 100644 --- a/protocol/flows/src/v5/request_block_locator.rs +++ b/protocol/flows/src/v5/request_block_locator.rs @@ -17,10 +17,6 @@ pub struct RequestBlockLocatorFlow { #[async_trait::async_trait] impl Flow for RequestBlockLocatorFlow { - fn name(&self) -> &'static str { - "BLOCK_LOCATOR" - } - fn router(&self) -> Option> { Some(self.router.clone()) } diff --git a/protocol/flows/src/v5/request_headers.rs b/protocol/flows/src/v5/request_headers.rs index 2de4d6033..a23f4568a 100644 --- a/protocol/flows/src/v5/request_headers.rs +++ b/protocol/flows/src/v5/request_headers.rs @@ -20,10 +20,6 @@ pub struct RequestHeadersFlow { #[async_trait::async_trait] impl Flow for RequestHeadersFlow { - fn name(&self) -> &'static str { - "REQUEST_HEADERS" - } - fn router(&self) -> Option> { Some(self.router.clone()) } diff --git a/protocol/flows/src/v5/request_ibd_blocks.rs b/protocol/flows/src/v5/request_ibd_blocks.rs index ac3f1d124..1d16fe171 100644 --- a/protocol/flows/src/v5/request_ibd_blocks.rs +++ b/protocol/flows/src/v5/request_ibd_blocks.rs @@ -11,10 +11,6 @@ pub struct HandleIbdBlockRequests { #[async_trait::async_trait] impl Flow for HandleIbdBlockRequests { - fn name(&self) -> &'static str { - "HANDLE_IBD_BLOCK_REQUESTS" - } - fn router(&self) -> Option> { Some(self.router.clone()) } diff --git a/protocol/flows/src/v5/request_ibd_chain_block_locator.rs b/protocol/flows/src/v5/request_ibd_chain_block_locator.rs index 45a90faf8..af0e637ce 100644 --- a/protocol/flows/src/v5/request_ibd_chain_block_locator.rs +++ b/protocol/flows/src/v5/request_ibd_chain_block_locator.rs @@ -18,10 +18,6 @@ pub struct RequestIbdChainBlockLocatorFlow { #[async_trait::async_trait] impl Flow for RequestIbdChainBlockLocatorFlow { - fn name(&self) -> &'static str { - "IBD_CHAIN_BLOCK_LOCATOR" - } - fn router(&self) -> Option> { Some(self.router.clone()) } diff --git a/protocol/flows/src/v5/request_pp_proof.rs b/protocol/flows/src/v5/request_pp_proof.rs index 593af8d17..1a9209991 100644 --- a/protocol/flows/src/v5/request_pp_proof.rs +++ b/protocol/flows/src/v5/request_pp_proof.rs @@ -18,10 +18,6 @@ pub struct RequestPruningPointProofFlow { #[async_trait::async_trait] impl Flow for RequestPruningPointProofFlow { - fn name(&self) -> &'static str { - "REQUEST_PROOF" - } - fn router(&self) -> Option> { Some(self.router.clone()) } diff --git a/protocol/flows/src/v5/request_pruning_point_and_anticone.rs 
b/protocol/flows/src/v5/request_pruning_point_and_anticone.rs index 5d86e067f..145e6e9fe 100644 --- a/protocol/flows/src/v5/request_pruning_point_and_anticone.rs +++ b/protocol/flows/src/v5/request_pruning_point_and_anticone.rs @@ -23,10 +23,6 @@ pub struct PruningPointAndItsAnticoneRequestsFlow { #[async_trait::async_trait] impl Flow for PruningPointAndItsAnticoneRequestsFlow { - fn name(&self) -> &'static str { - "PP_ANTICONE" - } - fn router(&self) -> Option> { Some(self.router.clone()) } diff --git a/protocol/flows/src/v5/request_pruning_point_utxo_set.rs b/protocol/flows/src/v5/request_pruning_point_utxo_set.rs index dc3443cfb..e3eb5cc15 100644 --- a/protocol/flows/src/v5/request_pruning_point_utxo_set.rs +++ b/protocol/flows/src/v5/request_pruning_point_utxo_set.rs @@ -21,10 +21,6 @@ pub struct RequestPruningPointUtxoSetFlow { #[async_trait::async_trait] impl Flow for RequestPruningPointUtxoSetFlow { - fn name(&self) -> &'static str { - "PP_UTXOS" - } - fn router(&self) -> Option> { Some(self.router.clone()) } diff --git a/protocol/flows/src/v5/txrelay/flow.rs b/protocol/flows/src/v5/txrelay/flow.rs index 375017cf3..1c75e5710 100644 --- a/protocol/flows/src/v5/txrelay/flow.rs +++ b/protocol/flows/src/v5/txrelay/flow.rs @@ -48,10 +48,6 @@ pub struct RelayTransactionsFlow { #[async_trait::async_trait] impl Flow for RelayTransactionsFlow { - fn name(&self) -> &'static str { - "RELAY_TXS" - } - fn router(&self) -> Option> { Some(self.router.clone()) } @@ -217,10 +213,6 @@ pub struct RequestTransactionsFlow { #[async_trait::async_trait] impl Flow for RequestTransactionsFlow { - fn name(&self) -> &'static str { - "REQUEST_TXS" - } - fn router(&self) -> Option> { Some(self.router.clone()) } diff --git a/protocol/p2p/src/common.rs b/protocol/p2p/src/common.rs index 2efd6fe39..983770019 100644 --- a/protocol/p2p/src/common.rs +++ b/protocol/p2p/src/common.rs @@ -52,7 +52,7 @@ pub enum ProtocolError { #[error("peer connection is closed")] ConnectionClosed, - #[error("incoming route capacity of flow {0:?} has been reached (peer: {1})")] + #[error("incoming route capacity for message type {0:?} has been reached (peer: {1})")] IncomingRouteCapacityReached(KaspadMessagePayloadType, String), #[error("outgoing route capacity has been reached (peer: {0})")] @@ -66,12 +66,44 @@ pub enum ProtocolError { #[error("loopback connection - node is connecting to itself")] LoopbackConnection(PeerKey), + + #[error("got reject message: {0}")] + Rejected(String), + + #[error("got reject message: {0}")] + IgnorableReject(String), } +/// String used as a P2P convention to signal connection is rejected because we are connecting to ourselves +const LOOPBACK_CONNECTION_MESSAGE: &str = "LOOPBACK_CONNECTION"; + +/// String used as a P2P convention to signal connection is rejected because the peer already exists +const DUPLICATE_CONNECTION_MESSAGE: &str = "DUPLICATE_CONNECTION"; + impl ProtocolError { pub fn is_connection_closed_error(&self) -> bool { matches!(self, Self::ConnectionClosed) } + + pub fn can_send_outgoing_message(&self) -> bool { + !matches!(self, Self::ConnectionClosed | Self::OutgoingRouteCapacityReached(_)) + } + + pub fn to_reject_message(&self) -> String { + match self { + Self::LoopbackConnection(_) => LOOPBACK_CONNECTION_MESSAGE.to_owned(), + Self::PeerAlreadyExists(_) => DUPLICATE_CONNECTION_MESSAGE.to_owned(), + err => err.to_string(), + } + } + + pub fn from_reject_message(reason: String) -> Self { + if reason == LOOPBACK_CONNECTION_MESSAGE || reason == DUPLICATE_CONNECTION_MESSAGE { + 
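+            // These conventional reasons are expected during normal operation (self-connects and duplicate connects), so the receiving side logs them at debug level only.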
ProtocolError::IgnorableReject(reason) + } else { + ProtocolError::Rejected(reason) + } + } } /// Wraps an inner payload message into a valid `KaspadMessage`. diff --git a/protocol/p2p/src/core/connection_handler.rs b/protocol/p2p/src/core/connection_handler.rs index 838d67f49..1eaf9ceda 100644 --- a/protocol/p2p/src/core/connection_handler.rs +++ b/protocol/p2p/src/core/connection_handler.rs @@ -108,7 +108,8 @@ impl ConnectionHandler { } Err(err) => { - // Ignoring the router + router.try_sending_reject_message(&err).await; + // Ignoring the new router router.close().await; debug!("P2P, handshake failed for outbound peer {}: {}", router, err); return Err(ConnectionError::ProtocolError(err)); diff --git a/protocol/p2p/src/core/hub.rs b/protocol/p2p/src/core/hub.rs index 3c377442d..d9ccddea4 100644 --- a/protocol/p2p/src/core/hub.rs +++ b/protocol/p2p/src/core/hub.rs @@ -47,7 +47,8 @@ impl Hub { self.insert_new_router(new_router).await; } Err(err) => { - // Ignoring the router + new_router.try_sending_reject_message(&err).await; + // Ignoring the new router new_router.close().await; if matches!(err, ProtocolError::LoopbackConnection(_) | ProtocolError::PeerAlreadyExists(_)) { debug!("P2P, handshake failed for inbound peer {}: {}", new_router, err); @@ -60,6 +61,8 @@ impl Hub { } HubEvent::PeerClosing(router) => { if let Occupied(entry) = self.peers.write().entry(router.key()) { + // We search for the router by identity, but make sure to delete it only if it's actually the same object. + // This is extremely important in cases of duplicate connection rejection etc. if Arc::ptr_eq(entry.get(), &router) { entry.remove_entry(); debug!("P2P, Hub event loop, removing peer, router-id: {}", router.identity()); diff --git a/protocol/p2p/src/core/router.rs b/protocol/p2p/src/core/router.rs index 824e778f4..1a7c769d1 100644 --- a/protocol/p2p/src/core/router.rs +++ b/protocol/p2p/src/core/router.rs @@ -1,8 +1,9 @@ use crate::core::hub::HubEvent; -use crate::pb::KaspadMessage; -use crate::Peer; +use crate::pb::RejectMessage; +use crate::pb::{kaspad_message::Payload as KaspadMessagePayload, KaspadMessage}; use crate::{common::ProtocolError, KaspadMessagePayloadType}; -use kaspa_core::{debug, error, info, trace}; +use crate::{make_message, Peer}; +use kaspa_core::{debug, error, info, trace, warn}; use kaspa_utils::networking::PeerId; use parking_lot::{Mutex, RwLock}; use seqlock::SeqLock; @@ -162,17 +163,25 @@ impl Router { match router.route_to_flow(msg) { Ok(()) => {}, Err(e) => { - info!("P2P, Router receive loop - route error {} for peer: {}", e, router); + match e { + ProtocolError::IgnorableReject(reason) => debug!("P2P, got reject message: {} from peer: {}", reason, router), + ProtocolError::Rejected(reason) => warn!("P2P, got reject message: {} from peer: {}", reason, router), + e => warn!("P2P, route error: {} for peer: {}", e, router), + } break; }, } } Ok(None) => { - info!("P2P, Router receive loop - incoming stream ended from peer {}", router); + info!("P2P, incoming stream ended from peer {}", router); break; } - Err(err) => { - info!("P2P, Router receive loop - network error: {} from peer {}", err, router); + Err(status) => { + if let Some(err) = match_for_io_error(&status) { + info!("P2P, network error: {} from peer {}", err, router); + } else { + info!("P2P, network error: {} from peer {}", status, router); + } break; } } @@ -283,6 +292,11 @@ impl Router { return Err(ProtocolError::Other("received kaspad p2p message with empty payload")); } let msg_type: KaspadMessagePayloadType = 
msg.payload.as_ref().expect("payload was just verified").into();
+        // Handle the special case of a reject message ending the connection
+        if msg_type == KaspadMessagePayloadType::Reject {
+            let Some(KaspadMessagePayload::Reject(reject)) = msg.payload else { unreachable!() };
+            return Err(ProtocolError::from_reject_message(reject.reason));
+        }
         let op = self.routing_map.read().get(&msg_type).cloned();
         if let Some(sender) = op {
             match sender.try_send(msg) {
@@ -313,6 +327,15 @@
         }
     }

+    /// Based on the type of the protocol error, tries sending a reject message before shutting down the connection
+    pub async fn try_sending_reject_message(&self, err: &ProtocolError) {
+        if err.can_send_outgoing_message() {
+            // Send an explicit reject message for easier tracing of logical bugs causing protocol errors.
+            // No need to handle errors since we are closing anyway
+            let _ = self.enqueue(make_message!(KaspadMessagePayload::Reject, RejectMessage { reason: err.to_reject_message() })).await;
+        }
+    }
+
     /// Closes the router, signals exit, and cleans up all resources so that underlying connections will be aborted correctly.
     /// Returns true if this is the first call to close
     pub async fn close(self: &Arc<Self>) -> bool {
@@ -344,3 +367,26 @@
         true
     }
 }
+
+fn match_for_io_error(err_status: &tonic::Status) -> Option<&std::io::Error> {
+    let mut err: &(dyn std::error::Error + 'static) = err_status;
+
+    loop {
+        if let Some(io_err) = err.downcast_ref::<std::io::Error>() {
+            return Some(io_err);
+        }
+
+        // h2::Error does not expose std::io::Error with `source()`
+        // https://github.com/hyperium/h2/pull/462
+        if let Some(h2_err) = err.downcast_ref::<h2::Error>() {
+            if let Some(io_err) = h2_err.get_io() {
+                return Some(io_err);
+            }
+        }
+
+        err = match err.source() {
+            Some(err) => err,
+            None => return None,
+        };
+    }
+}
diff --git a/utils/src/any.rs b/utils/src/any.rs
new file mode 100644
index 000000000..c45f97813
--- /dev/null
+++ b/utils/src/any.rs
@@ -0,0 +1,30 @@
+/// Provides a best-effort short type name
+pub fn type_name_short<T>() -> &'static str {
+    let s = std::any::type_name::<T>();
+    const SEP: &str = "::";
+    if s.find('<').is_some() {
+        // T is generic so would require more sophisticated processing
+        return s;
+    }
+    match s.rfind(SEP) {
+        Some(i) => s.split_at(i + SEP.len()).1,
+        None => s,
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::net::IpAddr;
+
+    #[test]
+    fn test_type_name_short() {
+        assert_eq!("u64", type_name_short::<u64>());
+        assert_eq!("IpAddr", type_name_short::<IpAddr>());
+        assert_eq!(
+            std::any::type_name::<Option<IpAddr>>(),
+            type_name_short::<Option<IpAddr>>(),
+            "generic types are expected to remain long"
+        );
+    }
+}
diff --git a/utils/src/lib.rs b/utils/src/lib.rs
index addeb2510..e159365ef 100644
--- a/utils/src/lib.rs
+++ b/utils/src/lib.rs
@@ -1,3 +1,4 @@
+pub mod any;
 pub mod arc;
 pub mod binary_heap;
 pub mod channel;

From e4770c7ab8ba93461ea4a73d8df6557d76a5c3f4 Mon Sep 17 00:00:00 2001
From: Michael Sutton
Date: Fri, 23 Jun 2023 13:34:43 +0300
Subject: [PATCH 3/5] Fixes a bug with deleting level relations during pruning (#210)

* compile-time safety for detecting batch writer

* reproduce zero cache delete bug with a simple test

* refactor and use staging relations store for delete_level_relations algo

* comments

* remove trait duplication
---
 consensus/src/model/stores/relations.rs | 33 +++++-----
 .../pipeline/pruning_processor/processor.rs | 9 ++-
 .../pipeline/virtual_processor/processor.rs | 2 +-
 consensus/src/processes/block_depth.rs | 3 +-
 consensus/src/processes/pruning_proof/mod.rs | 5 +-
.../src/processes/reachability/inquirer.rs | 13 ++-- .../src/processes/reachability/tests/mod.rs | 4 +- consensus/src/processes/relations.rs | 63 +++++++++++++++++-- database/src/lib.rs | 2 +- database/src/writer.rs | 20 +++--- 10 files changed, 106 insertions(+), 48 deletions(-) diff --git a/consensus/src/model/stores/relations.rs b/consensus/src/model/stores/relations.rs index f0854a68d..8ac3dcb55 100644 --- a/consensus/src/model/stores/relations.rs +++ b/consensus/src/model/stores/relations.rs @@ -1,14 +1,13 @@ use itertools::Itertools; use kaspa_consensus_core::BlockHashSet; use kaspa_consensus_core::{blockhash::BlockHashes, BlockHashMap, BlockHasher, BlockLevel, HashMapCustomHasher}; -use kaspa_database::prelude::MemoryWriter; use kaspa_database::prelude::StoreError; use kaspa_database::prelude::DB; use kaspa_database::prelude::{BatchDbWriter, DbWriter}; use kaspa_database::prelude::{CachedDbAccess, DbKey, DirectDbWriter}; +use kaspa_database::prelude::{DirectWriter, MemoryWriter}; use kaspa_database::registry::{DatabaseStorePrefixes, SEPARATOR}; use kaspa_hashes::Hash; -use parking_lot::{RwLockUpgradableReadGuard, RwLockWriteGuard}; use rocksdb::WriteBatch; use std::sync::Arc; @@ -24,7 +23,7 @@ pub trait RelationsStoreReader { /// Low-level write API for `RelationsStore` pub trait RelationsStore: RelationsStoreReader { - type DefaultWriter: DbWriter; + type DefaultWriter: DirectWriter; fn default_writer(&self) -> Self::DefaultWriter; fn set_parents(&mut self, writer: impl DbWriter, hash: Hash, parents: BlockHashes) -> Result<(), StoreError>; @@ -109,34 +108,32 @@ impl RelationsStore for DbRelationsStore { } pub struct StagingRelationsStore<'a> { - store_read: RwLockUpgradableReadGuard<'a, DbRelationsStore>, + store: &'a mut DbRelationsStore, staging_parents_writes: BlockHashMap, staging_children_writes: BlockHashMap, staging_deletions: BlockHashSet, } impl<'a> StagingRelationsStore<'a> { - pub fn new(store_read: RwLockUpgradableReadGuard<'a, DbRelationsStore>) -> Self { + pub fn new(store: &'a mut DbRelationsStore) -> Self { Self { - store_read, + store, staging_parents_writes: Default::default(), staging_children_writes: Default::default(), staging_deletions: Default::default(), } } - pub fn commit(self, batch: &mut WriteBatch) -> Result, StoreError> { - let store_write = RwLockUpgradableReadGuard::upgrade(self.store_read); + pub fn commit(self, batch: &mut WriteBatch) -> Result<(), StoreError> { for (k, v) in self.staging_parents_writes { - store_write.parents_access.write(BatchDbWriter::new(batch), k, v)? + self.store.parents_access.write(BatchDbWriter::new(batch), k, v)? } for (k, v) in self.staging_children_writes { - store_write.children_access.write(BatchDbWriter::new(batch), k, v)? + self.store.children_access.write(BatchDbWriter::new(batch), k, v)? 
} // Deletions always come after mutations - store_write.parents_access.delete_many(BatchDbWriter::new(batch), &mut self.staging_deletions.iter().copied())?; - store_write.children_access.delete_many(BatchDbWriter::new(batch), &mut self.staging_deletions.iter().copied())?; - Ok(store_write) + self.store.parents_access.delete_many(BatchDbWriter::new(batch), &mut self.staging_deletions.iter().copied())?; + self.store.children_access.delete_many(BatchDbWriter::new(batch), &mut self.staging_deletions.iter().copied()) } fn check_not_in_deletions(&self, hash: Hash) -> Result<(), StoreError> { @@ -179,7 +176,7 @@ impl RelationsStoreReader for StagingRelationsStore<'_> { if let Some(data) = self.staging_parents_writes.get(&hash) { Ok(BlockHashes::clone(data)) } else { - self.store_read.get_parents(hash) + self.store.get_parents(hash) } } @@ -188,7 +185,7 @@ impl RelationsStoreReader for StagingRelationsStore<'_> { if let Some(data) = self.staging_children_writes.get(&hash) { Ok(BlockHashes::clone(data)) } else { - self.store_read.get_children(hash) + self.store.get_children(hash) } } @@ -196,12 +193,12 @@ impl RelationsStoreReader for StagingRelationsStore<'_> { if self.staging_deletions.contains(&hash) { return Ok(false); } - Ok(self.staging_parents_writes.contains_key(&hash) || self.store_read.has(hash)?) + Ok(self.staging_parents_writes.contains_key(&hash) || self.store.has(hash)?) } fn counts(&self) -> Result<(usize, usize), StoreError> { Ok(( - self.store_read + self.store .parents_access .iterator() .map(|r| r.unwrap().0) @@ -211,7 +208,7 @@ impl RelationsStoreReader for StagingRelationsStore<'_> { .collect::() .difference(&self.staging_deletions) .count(), - self.store_read + self.store .children_access .iterator() .map(|r| r.unwrap().0) diff --git a/consensus/src/pipeline/pruning_processor/processor.rs b/consensus/src/pipeline/pruning_processor/processor.rs index 247aa5c84..738015354 100644 --- a/consensus/src/pipeline/pruning_processor/processor.rs +++ b/consensus/src/pipeline/pruning_processor/processor.rs @@ -344,7 +344,8 @@ impl PruningProcessor { if !keep_blocks.contains(¤t) { let mut batch = WriteBatch::default(); let mut level_relations_write = self.relations_stores.write(); - let mut staging_relations = StagingRelationsStore::new(self.reachability_relations_store.upgradable_read()); + let mut reachability_relations_write = self.reachability_relations_store.write(); + let mut staging_relations = StagingRelationsStore::new(&mut reachability_relations_write); let mut staging_reachability = StagingReachabilityStore::new(reachability_read); let mut statuses_write = self.statuses_store.write(); @@ -370,8 +371,10 @@ impl PruningProcessor { // TODO: consider adding block level to compact header data let block_level = self.headers_store.get_header_with_block_level(current).unwrap().block_level; (0..=block_level as usize).for_each(|level| { - relations::delete_level_relations(BatchDbWriter::new(&mut batch), &mut level_relations_write[level], current) + let mut staging_level_relations = StagingRelationsStore::new(&mut level_relations_write[level]); + relations::delete_level_relations(MemoryWriter::default(), &mut staging_level_relations, current) .unwrap_option(); + staging_level_relations.commit(&mut batch).unwrap(); self.ghostdag_stores[level].delete_batch(&mut batch, current).unwrap_option(); }); @@ -388,7 +391,7 @@ impl PruningProcessor { } let reachability_write = staging_reachability.commit(&mut batch).unwrap(); - let reachability_relations_write = staging_relations.commit(&mut 
batch).unwrap();
+        staging_relations.commit(&mut batch).unwrap();

         // Flush the batch to the DB
         self.db.write(batch).unwrap();
diff --git a/consensus/src/pipeline/virtual_processor/processor.rs b/consensus/src/pipeline/virtual_processor/processor.rs
index d6a3caf24..e36d7a4e9 100644
--- a/consensus/src/pipeline/virtual_processor/processor.rs
+++ b/consensus/src/pipeline/virtual_processor/processor.rs
@@ -508,7 +508,7 @@ impl VirtualStateProcessor {
                 // All blocks with lower blue work than filtering_root are:
                 // 1. not in its future (because blue work is monotonic),
                 // 2. will eventually be removed by the bounded merge check.
-                // So we prefer doing it in advance to allow better tips to be considered.
+                // Hence as an optimization we prefer removing such blocks in advance to allow valid tips to be considered.
                 let filtering_root = self.depth_store.merge_depth_root(candidate).unwrap();
                 let filtering_blue_work = self.ghostdag_primary_store.get_blue_work(filtering_root).unwrap_or_default();
                 return (
diff --git a/consensus/src/processes/block_depth.rs b/consensus/src/processes/block_depth.rs
index 828d24b83..24d948f70 100644
--- a/consensus/src/processes/block_depth.rs
+++ b/consensus/src/processes/block_depth.rs
@@ -64,7 +64,8 @@ impl Bl
         };

         // In this case we expect the pruning point or a block above it to be the block at depth.
-        // Note that above we already verified the chain and distance conditions for this
+        // Note that above we already verified the chain and distance conditions for this.
+        // Additionally observe that if `current` is a valid hash it must not be pruned for the same reason.
         if current == ORIGIN {
             current = pruning_point;
         }
diff --git a/consensus/src/processes/pruning_proof/mod.rs b/consensus/src/processes/pruning_proof/mod.rs
index 01a58f439..28a006338 100644
--- a/consensus/src/processes/pruning_proof/mod.rs
+++ b/consensus/src/processes/pruning_proof/mod.rs
@@ -325,8 +325,9 @@ impl PruningProofManager {

         // Prepare batch
         let mut batch = WriteBatch::default();
+        let mut reachability_relations_write = self.reachability_relations_store.write();
         let mut staging_reachability = StagingReachabilityStore::new(reachability_read);
-        let mut staging_reachability_relations = StagingRelationsStore::new(self.reachability_relations_store.upgradable_read());
+        let mut staging_reachability_relations = StagingRelationsStore::new(&mut reachability_relations_write);

         // Stage
         staging_reachability_relations.insert(hash, reachability_parents_hashes.clone()).unwrap();
@@ -340,7 +341,7 @@ impl PruningProofManager {

         // Commit
         let reachability_write = staging_reachability.commit(&mut batch).unwrap();
-        let reachability_relations_write = staging_reachability_relations.commit(&mut batch).unwrap();
+        staging_reachability_relations.commit(&mut batch).unwrap();

         // Write
         self.db.write(batch).unwrap();
diff --git a/consensus/src/processes/reachability/inquirer.rs b/consensus/src/processes/reachability/inquirer.rs
index 310541018..6364bc9fd 100644
--- a/consensus/src/processes/reachability/inquirer.rs
+++ b/consensus/src/processes/reachability/inquirer.rs
@@ -391,8 +391,9 @@ mod tests {

         // Add blocks via a staging store
         {
+            let mut relations_write = relations.write();
             let mut staging_reachability = StagingReachabilityStore::new(reachability.upgradable_read());
-            let mut staging_relations = StagingRelationsStore::new(relations.upgradable_read());
+            let mut staging_relations = StagingRelationsStore::new(&mut relations_write);
             let mut builder = DagBuilder::new(&mut staging_reachability, &mut staging_relations);
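            // The builder stages all reachability/relations mutations in memory; nothing reaches the DB until the staging stores are committed into a write batch below.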
builder.init(); builder.add_block(DagBlock::new(test.genesis.into(), vec![ORIGIN])); @@ -404,7 +405,7 @@ mod tests { { let mut batch = WriteBatch::default(); let reachability_write = staging_reachability.commit(&mut batch).unwrap(); - let relations_write = staging_relations.commit(&mut batch).unwrap(); + staging_relations.commit(&mut batch).unwrap(); db.write(batch).unwrap(); drop(reachability_write); drop(relations_write); @@ -443,8 +444,9 @@ mod tests { drop(relations_read); let mut batch = WriteBatch::default(); + let mut relations_write = relations.write(); let mut staging_reachability = StagingReachabilityStore::new(reachability.upgradable_read()); - let mut staging_relations = StagingRelationsStore::new(relations.upgradable_read()); + let mut staging_relations = StagingRelationsStore::new(&mut relations_write); for (i, block) in test.ids().choose_multiple(&mut rand::thread_rng(), test.blocks.len()).into_iter().chain(once(test.genesis)).enumerate() @@ -460,7 +462,7 @@ mod tests { // Commit the staging changes { let reachability_write = staging_reachability.commit(&mut batch).unwrap(); - let relations_write = staging_relations.commit(&mut batch).unwrap(); + staging_relations.commit(&mut batch).unwrap(); db.write(batch).unwrap(); drop(reachability_write); drop(relations_write); @@ -483,8 +485,9 @@ mod tests { // Recapture staging stores batch = WriteBatch::default(); + relations_write = relations.write(); staging_reachability = StagingReachabilityStore::new(reachability.upgradable_read()); - staging_relations = StagingRelationsStore::new(relations.upgradable_read()); + staging_relations = StagingRelationsStore::new(&mut relations_write); } } } diff --git a/consensus/src/processes/reachability/tests/mod.rs b/consensus/src/processes/reachability/tests/mod.rs index dc1c71964..e2317ed17 100644 --- a/consensus/src/processes/reachability/tests/mod.rs +++ b/consensus/src/processes/reachability/tests/mod.rs @@ -18,7 +18,7 @@ use kaspa_consensus_core::{ blockhash::{BlockHashExtensions, BlockHashes, ORIGIN}, BlockHashMap, BlockHashSet, }; -use kaspa_database::prelude::{DbWriter, StoreError}; +use kaspa_database::prelude::{DirectWriter, StoreError}; use kaspa_hashes::Hash; use std::collections::{ hash_map::Entry::{Occupied, Vacant}, @@ -120,7 +120,7 @@ impl<'a, T: ReachabilityStore + ?Sized, S: RelationsStore + ?Sized> DagBuilder<' self.delete_block_with_writer(self.relations.default_writer(), hash) } - pub fn delete_block_with_writer(&mut self, writer: impl DbWriter, hash: Hash) -> &mut Self { + pub fn delete_block_with_writer(&mut self, writer: impl DirectWriter, hash: Hash) -> &mut Self { let mergeset = delete_reachability_relations(writer, self.relations, self.reachability, hash); delete_block(self.reachability, hash, &mut mergeset.iter().cloned()).unwrap(); self diff --git a/consensus/src/processes/relations.rs b/consensus/src/processes/relations.rs index b2d864132..662af3ada 100644 --- a/consensus/src/processes/relations.rs +++ b/consensus/src/processes/relations.rs @@ -5,7 +5,7 @@ use kaspa_consensus_core::{ blockhash::{BlockHashIteratorExtensions, BlockHashes, ORIGIN}, BlockHashSet, }; -use kaspa_database::prelude::{BatchDbWriter, DbWriter, StoreError}; +use kaspa_database::prelude::{BatchDbWriter, DbWriter, DirectWriter, StoreError}; use kaspa_hashes::Hash; use rocksdb::WriteBatch; @@ -21,9 +21,12 @@ pub fn init(relations: &mut S) { /// kept topologically continuous. If any child of this `hash` will remain with no parent, we make /// sure to connect it to `origin`. 
Note that apart from the special case of `origin`, these relations
/// are always a subset of the original header relations for this level.
+///
+/// NOTE: this algorithm does not support a batch writer because it might write to the same entry multiple times
+/// (and writes will not accumulate if the entry gets out of the cache in between the calls)
 pub fn delete_level_relations<W, S>(mut writer: W, relations: &mut S, hash: Hash) -> Result<(), StoreError>
 where
-    W: DbWriter,
+    W: DirectWriter,
     S: RelationsStore + ?Sized,
 {
     let children = relations.get_children(hash)?; // if the first entry was found, we expect all others as well, hence we unwrap below
@@ -46,12 +49,10 @@ where
 /// (and writes will not accumulate if the entry gets out of the cache in between the calls)
 pub fn delete_reachability_relations<W, S, U>(mut writer: W, relations: &mut S, reachability: &U, hash: Hash) -> BlockHashSet
 where
-    W: DbWriter,
+    W: DirectWriter,
     S: RelationsStore + ?Sized,
     U: ReachabilityService + ?Sized,
 {
-    assert!(!W::IS_BATCH, "batch writes are not supported for this algo, see doc.");
-
     let selected_parent = reachability.get_chain_parent(hash);
     let parents = relations.get_parents(hash).unwrap();
     let children = relations.get_children(hash).unwrap();
@@ -157,3 +158,55 @@ pub trait RelationsStoreExtensions: RelationsStore {
 }

 impl<S: RelationsStore> RelationsStoreExtensions for S {}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::model::stores::relations::{DbRelationsStore, RelationsStoreReader, StagingRelationsStore};
+    use kaspa_core::assert_match;
+    use kaspa_database::{prelude::MemoryWriter, utils::create_temp_db};
+    use std::sync::Arc;
+
+    #[test]
+    fn test_delete_level_relations_zero_cache() {
+        let (_lifetime, db) = create_temp_db();
+        let cache_size = 0;
+        let mut relations = DbRelationsStore::new(db.clone(), 0, cache_size);
+        relations.insert(ORIGIN, Default::default()).unwrap();
+        relations.insert(1.into(), Arc::new(vec![ORIGIN])).unwrap();
+        relations.insert(2.into(), Arc::new(vec![1.into()])).unwrap();
+
+        assert_eq!(relations.get_parents(ORIGIN).unwrap().as_slice(), []);
+        assert_eq!(relations.get_children(ORIGIN).unwrap().as_slice(), [1.into()]);
+        assert_eq!(relations.get_parents(1.into()).unwrap().as_slice(), [ORIGIN]);
+        assert_eq!(relations.get_children(1.into()).unwrap().as_slice(), [2.into()]);
+        assert_eq!(relations.get_parents(2.into()).unwrap().as_slice(), [1.into()]);
+        assert_eq!(relations.get_children(2.into()).unwrap().as_slice(), []);
+
+        let mut batch = WriteBatch::default();
+        let mut staging_relations = StagingRelationsStore::new(&mut relations);
+        delete_level_relations(MemoryWriter::default(), &mut staging_relations, 1.into()).unwrap();
+        staging_relations.commit(&mut batch).unwrap();
+        db.write(batch).unwrap();
+
+        assert_match!(relations.get_parents(1.into()), Err(StoreError::KeyNotFound(_)));
+        assert_match!(relations.get_children(1.into()), Err(StoreError::KeyNotFound(_)));
+
+        assert_eq!(relations.get_parents(ORIGIN).unwrap().as_slice(), []);
+        assert_eq!(relations.get_children(ORIGIN).unwrap().as_slice(), [2.into()]);
+        assert_eq!(relations.get_parents(2.into()).unwrap().as_slice(), [ORIGIN]);
+        assert_eq!(relations.get_children(2.into()).unwrap().as_slice(), []);
+
+        let mut batch = WriteBatch::default();
+        let mut staging_relations = StagingRelationsStore::new(&mut relations);
+        delete_level_relations(MemoryWriter::default(), &mut staging_relations, 2.into()).unwrap();
+        staging_relations.commit(&mut batch).unwrap();
+        db.write(batch).unwrap();
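+        // After this second deletion only ORIGIN should remain, and it should have no children.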
assert_match!(relations.get_parents(2.into()), Err(StoreError::KeyNotFound(_))); + assert_match!(relations.get_children(2.into()), Err(StoreError::KeyNotFound(_))); + + assert_eq!(relations.get_parents(ORIGIN).unwrap().as_slice(), []); + assert_eq!(relations.get_children(ORIGIN).unwrap().as_slice(), []); + } +} diff --git a/database/src/lib.rs b/database/src/lib.rs index a398e021a..c079ece21 100644 --- a/database/src/lib.rs +++ b/database/src/lib.rs @@ -16,7 +16,7 @@ pub mod prelude { pub use super::cache::Cache; pub use super::item::CachedDbItem; pub use super::key::DbKey; - pub use super::writer::{BatchDbWriter, DbWriter, DirectDbWriter, MemoryWriter}; + pub use super::writer::{BatchDbWriter, DbWriter, DirectDbWriter, DirectWriter, MemoryWriter}; pub use db::{delete_db, open_db, DB}; pub use errors::{StoreError, StoreResult, StoreResultEmptyTuple, StoreResultExtensions}; } diff --git a/database/src/writer.rs b/database/src/writer.rs index bb2954b12..3d64f0458 100644 --- a/database/src/writer.rs +++ b/database/src/writer.rs @@ -5,8 +5,6 @@ use crate::prelude::DB; /// Abstraction over direct/batched DB writing pub trait DbWriter { - const IS_BATCH: bool; - fn put(&mut self, key: K, value: V) -> Result<(), rocksdb::Error> where K: AsRef<[u8]>, @@ -14,6 +12,10 @@ pub trait DbWriter { fn delete>(&mut self, key: K) -> Result<(), rocksdb::Error>; } +/// A trait which is intentionally not implemented for the batch writer. +/// Aimed for compile-time safety of operations which do not support batch writing semantics +pub trait DirectWriter: DbWriter {} + pub struct DirectDbWriter<'a> { db: Refs<'a, DB>, } @@ -29,8 +31,6 @@ impl<'a> DirectDbWriter<'a> { } impl DbWriter for DirectDbWriter<'_> { - const IS_BATCH: bool = false; - fn put(&mut self, key: K, value: V) -> Result<(), rocksdb::Error> where K: AsRef<[u8]>, @@ -44,6 +44,8 @@ impl DbWriter for DirectDbWriter<'_> { } } +impl DirectWriter for DirectDbWriter<'_> {} + pub struct BatchDbWriter<'a> { batch: &'a mut WriteBatch, } @@ -55,8 +57,6 @@ impl<'a> BatchDbWriter<'a> { } impl DbWriter for BatchDbWriter<'_> { - const IS_BATCH: bool = true; - fn put(&mut self, key: K, value: V) -> Result<(), rocksdb::Error> where K: AsRef<[u8]>, @@ -73,8 +73,6 @@ impl DbWriter for BatchDbWriter<'_> { } impl DbWriter for &mut T { - const IS_BATCH: bool = T::IS_BATCH; - #[inline] fn put(&mut self, key: K, value: V) -> Result<(), rocksdb::Error> where @@ -90,13 +88,13 @@ impl DbWriter for &mut T { } } +impl DirectWriter for &mut T {} + /// A writer for memory stores which writes nothing to the DB #[derive(Default)] pub struct MemoryWriter; impl DbWriter for MemoryWriter { - const IS_BATCH: bool = false; - fn put(&mut self, _key: K, _value: V) -> Result<(), rocksdb::Error> where K: AsRef<[u8]>, @@ -109,3 +107,5 @@ impl DbWriter for MemoryWriter { Ok(()) } } + +impl DirectWriter for MemoryWriter {} From 3d2f0009adce7a3690b4f8bee6c23d006032803d Mon Sep 17 00:00:00 2001 From: Tiram <18632023+tiram88@users.noreply.github.com> Date: Fri, 23 Jun 2023 13:04:27 +0200 Subject: [PATCH 4/5] Last High-BPS testnet requirements (for the coming kaspa-testnet-11) (#205) * Add NetworkId::iter() * In `Params`, make `deflationary_phase_daa_score` and `pre_deflationary_phase_base_subsidy` BPS dependent * Add a DNS seeder to testnet-11 * Make `CoinbaseManager` BPS dependent * Test pre-deflationary phase subsidy * Address review * Bump version to 0.1.1 --- Cargo.lock | 90 +++++----- Cargo.toml | 52 +++--- consensus/core/src/config/bps.rs | 16 ++ consensus/core/src/config/params.rs | 16 
+- consensus/core/src/networktype.rs | 11 ++ consensus/pow/Cargo.toml | 7 +- consensus/src/consensus/services.rs | 1 + consensus/src/processes/coinbase.rs | 256 +++++++++++++++++++--------- protocol/flows/Cargo.toml | 7 +- rpc/wrpc/client/Cargo.toml | 7 +- rpc/wrpc/proxy/Cargo.toml | 7 +- rpc/wrpc/server/Cargo.toml | 7 +- wallet/bip32/Cargo.toml | 7 +- wallet/cli/Cargo.toml | 7 +- wallet/core/Cargo.toml | 7 +- wallet/native/Cargo.toml | 7 +- wallet/wasm/Cargo.toml | 7 +- 17 files changed, 336 insertions(+), 176 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index de12047c6..aef9811ae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1540,7 +1540,7 @@ dependencies = [ [[package]] name = "kaspa-addresses" -version = "0.1.0" +version = "0.1.1" dependencies = [ "borsh", "criterion", @@ -1551,7 +1551,7 @@ dependencies = [ [[package]] name = "kaspa-addressmanager" -version = "0.1.0" +version = "0.1.1" dependencies = [ "borsh", "itertools 0.10.5", @@ -1567,7 +1567,7 @@ dependencies = [ [[package]] name = "kaspa-bip32" -version = "0.1.0" +version = "0.1.1" dependencies = [ "bs58", "faster-hex", @@ -1586,7 +1586,7 @@ dependencies = [ [[package]] name = "kaspa-connectionmanager" -version = "0.1.0" +version = "0.1.1" dependencies = [ "duration-string", "futures-util", @@ -1603,7 +1603,7 @@ dependencies = [ [[package]] name = "kaspa-consensus" -version = "0.1.0" +version = "0.1.1" dependencies = [ "async-channel", "bincode", @@ -1645,7 +1645,7 @@ dependencies = [ [[package]] name = "kaspa-consensus-core" -version = "0.1.0" +version = "0.1.1" dependencies = [ "borsh", "cfg-if", @@ -1667,7 +1667,7 @@ dependencies = [ [[package]] name = "kaspa-consensus-notify" -version = "0.1.0" +version = "0.1.1" dependencies = [ "async-channel", "cfg-if", @@ -1686,7 +1686,7 @@ dependencies = [ [[package]] name = "kaspa-consensusmanager" -version = "0.1.0" +version = "0.1.1" dependencies = [ "duration-string", "futures", @@ -1704,7 +1704,7 @@ dependencies = [ [[package]] name = "kaspa-core" -version = "0.1.0" +version = "0.1.1" dependencies = [ "cfg-if", "ctrlc", @@ -1722,7 +1722,7 @@ dependencies = [ [[package]] name = "kaspa-database" -version = "0.1.0" +version = "0.1.1" dependencies = [ "bincode", "enum-primitive-derive", @@ -1744,7 +1744,7 @@ dependencies = [ [[package]] name = "kaspa-grpc-client" -version = "0.1.0" +version = "0.1.1" dependencies = [ "async-channel", "async-stream", @@ -1771,7 +1771,7 @@ dependencies = [ [[package]] name = "kaspa-grpc-core" -version = "0.1.0" +version = "0.1.1" dependencies = [ "async-channel", "async-stream", @@ -1798,7 +1798,7 @@ dependencies = [ [[package]] name = "kaspa-grpc-server" -version = "0.1.0" +version = "0.1.1" dependencies = [ "async-channel", "async-stream", @@ -1828,7 +1828,7 @@ dependencies = [ [[package]] name = "kaspa-hashes" -version = "0.1.0" +version = "0.1.1" dependencies = [ "blake2b_simd", "borsh", @@ -1845,7 +1845,7 @@ dependencies = [ [[package]] name = "kaspa-index-core" -version = "0.1.0" +version = "0.1.1" dependencies = [ "async-channel", "async-trait", @@ -1864,7 +1864,7 @@ dependencies = [ [[package]] name = "kaspa-index-processor" -version = "0.1.0" +version = "0.1.1" dependencies = [ "async-channel", "async-trait", @@ -1892,7 +1892,7 @@ dependencies = [ [[package]] name = "kaspa-math" -version = "0.1.0" +version = "0.1.1" dependencies = [ "borsh", "criterion", @@ -1905,14 +1905,14 @@ dependencies = [ [[package]] name = "kaspa-merkle" -version = "0.1.0" +version = "0.1.1" dependencies = [ "kaspa-hashes", ] [[package]] name = "kaspa-mining" -version 
= "0.1.0" +version = "0.1.1" dependencies = [ "criterion", "futures-util", @@ -1935,7 +1935,7 @@ dependencies = [ [[package]] name = "kaspa-mining-errors" -version = "0.1.0" +version = "0.1.1" dependencies = [ "kaspa-consensus-core", "thiserror", @@ -1943,7 +1943,7 @@ dependencies = [ [[package]] name = "kaspa-muhash" -version = "0.1.0" +version = "0.1.1" dependencies = [ "criterion", "kaspa-hashes", @@ -1956,7 +1956,7 @@ dependencies = [ [[package]] name = "kaspa-notify" -version = "0.1.0" +version = "0.1.1" dependencies = [ "async-channel", "async-trait", @@ -1983,7 +1983,7 @@ dependencies = [ [[package]] name = "kaspa-p2p-flows" -version = "0.1.0" +version = "0.1.1" dependencies = [ "async-trait", "futures", @@ -2012,7 +2012,7 @@ dependencies = [ [[package]] name = "kaspa-p2p-lib" -version = "0.1.0" +version = "0.1.1" dependencies = [ "borsh", "ctrlc", @@ -2041,7 +2041,7 @@ dependencies = [ [[package]] name = "kaspa-pow" -version = "0.1.0" +version = "0.1.1" dependencies = [ "criterion", "kaspa-consensus-core", @@ -2051,7 +2051,7 @@ dependencies = [ [[package]] name = "kaspa-rpc-core" -version = "0.1.0" +version = "0.1.1" dependencies = [ "async-channel", "async-trait", @@ -2080,7 +2080,7 @@ dependencies = [ [[package]] name = "kaspa-rpc-macros" -version = "0.1.0" +version = "0.1.1" dependencies = [ "convert_case 0.5.0", "proc-macro-error", @@ -2092,7 +2092,7 @@ dependencies = [ [[package]] name = "kaspa-rpc-service" -version = "0.1.0" +version = "0.1.1" dependencies = [ "async-trait", "kaspa-addresses", @@ -2116,7 +2116,7 @@ dependencies = [ [[package]] name = "kaspa-testing-integration" -version = "0.1.0" +version = "0.1.1" dependencies = [ "async-channel", "bincode", @@ -2161,7 +2161,7 @@ dependencies = [ [[package]] name = "kaspa-txscript" -version = "0.1.0" +version = "0.1.1" dependencies = [ "blake2b_simd", "borsh", @@ -2185,7 +2185,7 @@ dependencies = [ [[package]] name = "kaspa-txscript-errors" -version = "0.1.0" +version = "0.1.1" dependencies = [ "secp256k1", "thiserror", @@ -2193,7 +2193,7 @@ dependencies = [ [[package]] name = "kaspa-utils" -version = "0.1.0" +version = "0.1.1" dependencies = [ "async-channel", "async-trait", @@ -2211,7 +2211,7 @@ dependencies = [ [[package]] name = "kaspa-utxoindex" -version = "0.1.0" +version = "0.1.1" dependencies = [ "futures", "kaspa-consensus", @@ -2232,7 +2232,7 @@ dependencies = [ [[package]] name = "kaspa-wallet-cli" -version = "0.1.0" +version = "0.1.1" dependencies = [ "async-trait", "convert_case 0.5.0", @@ -2249,7 +2249,7 @@ dependencies = [ [[package]] name = "kaspa-wallet-cli-native" -version = "0.1.0" +version = "0.1.1" dependencies = [ "async-std", "async-trait", @@ -2261,7 +2261,7 @@ dependencies = [ [[package]] name = "kaspa-wallet-cli-wasm" -version = "0.1.0" +version = "0.1.1" dependencies = [ "async-trait", "js-sys", @@ -2275,7 +2275,7 @@ dependencies = [ [[package]] name = "kaspa-wallet-core" -version = "0.1.0" +version = "0.1.1" dependencies = [ "async-trait", "faster-hex", @@ -2299,7 +2299,7 @@ dependencies = [ [[package]] name = "kaspa-wasm" -version = "0.1.0" +version = "0.1.1" dependencies = [ "kaspa-addresses", "kaspa-core", @@ -2312,7 +2312,7 @@ dependencies = [ [[package]] name = "kaspa-wrpc-client" -version = "0.1.0" +version = "0.1.1" dependencies = [ "async-std", "async-trait", @@ -2337,7 +2337,7 @@ dependencies = [ [[package]] name = "kaspa-wrpc-proxy" -version = "0.1.0" +version = "0.1.1" dependencies = [ "async-trait", "clap 4.1.8", @@ -2356,7 +2356,7 @@ dependencies = [ [[package]] name = 
"kaspa-wrpc-server" -version = "0.1.0" +version = "0.1.1" dependencies = [ "async-trait", "borsh", @@ -2382,14 +2382,14 @@ dependencies = [ [[package]] name = "kaspa-wrpc-wasm" -version = "0.1.0" +version = "0.1.1" dependencies = [ "kaspa-wrpc-client", ] [[package]] name = "kaspad" -version = "0.1.0" +version = "0.1.1" dependencies = [ "async-channel", "clap 4.1.8", @@ -3223,7 +3223,7 @@ dependencies = [ [[package]] name = "rothschild" -version = "0.1.0" +version = "0.1.1" dependencies = [ "clap 4.1.8", "faster-hex", @@ -3536,7 +3536,7 @@ dependencies = [ [[package]] name = "simpa" -version = "0.1.0" +version = "0.1.1" dependencies = [ "async-channel", "clap 4.1.8", diff --git a/Cargo.toml b/Cargo.toml index 69ef4ec71..a364b7d95 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,7 +47,7 @@ members = [ ] [workspace.package] -version = "0.1.0" +version = "0.1.1" authors = ["Kaspa developers"] license = "MIT/Apache-2.0" edition = "2021" @@ -65,33 +65,33 @@ kaspa-p2p-flows = { path = "protocol/flows" } kaspa-p2p-lib = { path = "protocol/p2p" } kaspa-testing-integration = { path = "testing/integration" } kaspa-utxoindex = { path = "indexes/utxoindex" } -kaspa-rpc-service = { version = "0.1.0", path = "rpc/service" } +kaspa-rpc-service = { version = "0.1.1", path = "rpc/service" } # published -kaspa-addresses = { version = "0.1.0", path = "crypto/addresses" } -kaspa-bip32 = { version = "0.1.0", path = "wallet/bip32" } -kaspa-consensus = { version = "0.1.0", path = "consensus" } -kaspa-consensus-core = { version = "0.1.0", path = "consensus/core" } -kaspa-consensus-notify = { version = "0.1.0", path = "consensus/notify" } -kaspa-core = { version = "0.1.0", path = "core" } -kaspa-database = { version = "0.1.0", path = "database" } -kaspa-grpc-client = { version = "0.1.0", path = "rpc/grpc/client" } -kaspa-grpc-core = { version = "0.1.0", path = "rpc/grpc/core" } -kaspa-hashes = { version = "0.1.0", path = "crypto/hashes" } -kaspa-math = { version = "0.1.0", path = "math" } -kaspa-merkle = { version = "0.1.0", path = "crypto/merkle" } -kaspa-muhash = { version = "0.1.0", path = "crypto/muhash" } -kaspa-notify = { version = "0.1.0", path = "notify" } -kaspa-pow = { version = "0.1.0", path = "consensus/pow" } -kaspa-rpc-core = { version = "0.1.0", path = "rpc/core" } -kaspa-rpc-macros = { version = "0.1.0", path = "rpc/macros" } -kaspa-txscript = { version = "0.1.0", path = "crypto/txscript" } -kaspa-txscript-errors = { version = "0.1.0", path = "crypto/txscript/errors" } -kaspa-utils = { version = "0.1.0", path = "utils" } -kaspa-wallet-core = { version = "0.1.0", path = "wallet/core" } -kaspa-wasm = { version = "0.1.0", path = "wasm" } -kaspa-wrpc-client = { version = "0.1.0", path = "rpc/wrpc/client" } -kaspa-wrpc-wasm = { version = "0.1.0", path = "rpc/wrpc/wasm" } +kaspa-addresses = { version = "0.1.1", path = "crypto/addresses" } +kaspa-bip32 = { version = "0.1.1", path = "wallet/bip32" } +kaspa-consensus = { version = "0.1.1", path = "consensus" } +kaspa-consensus-core = { version = "0.1.1", path = "consensus/core" } +kaspa-consensus-notify = { version = "0.1.1", path = "consensus/notify" } +kaspa-core = { version = "0.1.1", path = "core" } +kaspa-database = { version = "0.1.1", path = "database" } +kaspa-grpc-client = { version = "0.1.1", path = "rpc/grpc/client" } +kaspa-grpc-core = { version = "0.1.1", path = "rpc/grpc/core" } +kaspa-hashes = { version = "0.1.1", path = "crypto/hashes" } +kaspa-math = { version = "0.1.1", path = "math" } +kaspa-merkle = { version = "0.1.1", path = 
"crypto/merkle" } +kaspa-muhash = { version = "0.1.1", path = "crypto/muhash" } +kaspa-notify = { version = "0.1.1", path = "notify" } +kaspa-pow = { version = "0.1.1", path = "consensus/pow" } +kaspa-rpc-core = { version = "0.1.1", path = "rpc/core" } +kaspa-rpc-macros = { version = "0.1.1", path = "rpc/macros" } +kaspa-txscript = { version = "0.1.1", path = "crypto/txscript" } +kaspa-txscript-errors = { version = "0.1.1", path = "crypto/txscript/errors" } +kaspa-utils = { version = "0.1.1", path = "utils" } +kaspa-wallet-core = { version = "0.1.1", path = "wallet/core" } +kaspa-wasm = { version = "0.1.1", path = "wasm" } +kaspa-wrpc-client = { version = "0.1.1", path = "rpc/wrpc/client" } +kaspa-wrpc-wasm = { version = "0.1.1", path = "rpc/wrpc/wasm" } # not published kaspa-grpc-server = { path = "rpc/grpc/server" } diff --git a/consensus/core/src/config/bps.rs b/consensus/core/src/config/bps.rs index 087fd03aa..84cf5be5b 100644 --- a/consensus/core/src/config/bps.rs +++ b/consensus/core/src/config/bps.rs @@ -105,6 +105,22 @@ impl Bps { BPS * LEGACY_COINBASE_MATURITY } + /// DAA score after which the pre-deflationary period switches to the deflationary period. + /// + /// This number is calculated as follows: + /// + /// - We define a year as 365.25 days + /// - Half a year in seconds = 365.25 / 2 * 24 * 60 * 60 = 15778800 + /// - The network was down for three days shortly after launch + /// - Three days in seconds = 3 * 24 * 60 * 60 = 259200 + pub const fn deflationary_phase_daa_score() -> u64 { + BPS * (15778800 - 259200) + } + + pub const fn pre_deflationary_phase_base_subsidy() -> u64 { + 50000000000 / BPS + } + // TODO: we might need to increase max_block_level (at least for mainnet) as a function of BPS // since higher BPS means easier difficulty puzzles -> less zeros in pow hash // pub const fn max_block_level() -> u64 { } diff --git a/consensus/core/src/config/params.rs b/consensus/core/src/config/params.rs index 164687139..d8c4170c2 100644 --- a/consensus/core/src/config/params.rs +++ b/consensus/core/src/config/params.rs @@ -75,7 +75,10 @@ pub struct Params { pub mass_per_script_pub_key_byte: u64, pub mass_per_sig_op: u64, pub max_block_mass: u64, + + /// DAA score after which the pre-deflationary period switches to the deflationary period pub deflationary_phase_daa_score: u64, + pub pre_deflationary_phase_base_subsidy: u64, pub coinbase_maturity: u64, pub skip_proof_of_work: bool, @@ -394,7 +397,8 @@ pub const TESTNET_PARAMS: Params = Params { pub const TESTNET11_PARAMS: Params = Params { dns_seeders: &[ - // No seeders yet + // This DNS seeder is run by Tiram + "seeder1-testnet-11.kaspad.net", ], net: NetworkId::with_suffix(NetworkType::Testnet, 11), genesis: TESTNET11_GENESIS, @@ -421,6 +425,8 @@ pub const TESTNET11_PARAMS: Params = Params { finality_depth: Testnet11Bps::finality_depth(), pruning_depth: Testnet11Bps::pruning_depth(), pruning_proof_m: Testnet11Bps::pruning_proof_m(), + deflationary_phase_daa_score: Testnet11Bps::deflationary_phase_daa_score(), + pre_deflationary_phase_base_subsidy: Testnet11Bps::pre_deflationary_phase_base_subsidy(), coinbase_maturity: Testnet11Bps::coinbase_maturity(), coinbase_payload_script_public_key_max_len: 150, @@ -440,14 +446,6 @@ pub const TESTNET11_PARAMS: Params = Params { mass_per_sig_op: 1000, max_block_mass: 500_000, - // deflationary_phase_daa_score is the DAA score after which the pre-deflationary period - // switches to the deflationary period. 
diff --git a/consensus/core/src/config/params.rs b/consensus/core/src/config/params.rs
index 164687139..d8c4170c2 100644
--- a/consensus/core/src/config/params.rs
+++ b/consensus/core/src/config/params.rs
@@ -75,7 +75,10 @@ pub struct Params {
     pub mass_per_script_pub_key_byte: u64,
     pub mass_per_sig_op: u64,
     pub max_block_mass: u64,
+
+    /// DAA score after which the pre-deflationary period switches to the deflationary period
     pub deflationary_phase_daa_score: u64,
+
     pub pre_deflationary_phase_base_subsidy: u64,
     pub coinbase_maturity: u64,
     pub skip_proof_of_work: bool,
@@ -394,7 +397,8 @@ pub const TESTNET_PARAMS: Params = Params {
 
 pub const TESTNET11_PARAMS: Params = Params {
     dns_seeders: &[
-        // No seeders yet
+        // This DNS seeder is run by Tiram
+        "seeder1-testnet-11.kaspad.net",
     ],
     net: NetworkId::with_suffix(NetworkType::Testnet, 11),
     genesis: TESTNET11_GENESIS,
@@ -421,6 +425,8 @@ pub const TESTNET11_PARAMS: Params = Params {
     finality_depth: Testnet11Bps::finality_depth(),
     pruning_depth: Testnet11Bps::pruning_depth(),
     pruning_proof_m: Testnet11Bps::pruning_proof_m(),
+    deflationary_phase_daa_score: Testnet11Bps::deflationary_phase_daa_score(),
+    pre_deflationary_phase_base_subsidy: Testnet11Bps::pre_deflationary_phase_base_subsidy(),
     coinbase_maturity: Testnet11Bps::coinbase_maturity(),
 
     coinbase_payload_script_public_key_max_len: 150,
@@ -440,14 +446,6 @@ pub const TESTNET11_PARAMS: Params = Params {
     mass_per_sig_op: 1000,
     max_block_mass: 500_000,
 
-    // deflationary_phase_daa_score is the DAA score after which the pre-deflationary period
-    // switches to the deflationary period. This number is calculated as follows:
-    // We define a year as 365.25 days
-    // Half a year in seconds = 365.25 / 2 * 24 * 60 * 60 = 15778800
-    // The network was down for three days shortly after launch
-    // Three days in seconds = 3 * 24 * 60 * 60 = 259200
-    deflationary_phase_daa_score: 15778800 - 259200,
-    pre_deflationary_phase_base_subsidy: 50000000000,
     skip_proof_of_work: false,
     max_block_level: 250,
 };
diff --git a/consensus/core/src/networktype.rs b/consensus/core/src/networktype.rs
index ce63a5ed4..ab06ef8de 100644
--- a/consensus/core/src/networktype.rs
+++ b/consensus/core/src/networktype.rs
@@ -140,6 +140,17 @@ impl NetworkId {
             NetworkType::Devnet => 16611,
         }
     }
+
+    pub fn iter() -> impl Iterator<Item = Self> {
+        static NETWORK_IDS: [NetworkId; 5] = [
+            NetworkId::new(NetworkType::Mainnet),
+            NetworkId::with_suffix(NetworkType::Testnet, 10),
+            NetworkId::with_suffix(NetworkType::Testnet, 11),
+            NetworkId::new(NetworkType::Devnet),
+            NetworkId::new(NetworkType::Simnet),
+        ];
+        NETWORK_IDS.iter().copied()
+    }
 }
 
 impl Deref for NetworkId {
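A hypothetical sketch of how the new `NetworkId::iter()` is meant to be consumed (the `default_p2p_port` call assumes the port helper visible in the surrounding context, and `Display` for `NetworkId`, which the tests further below also rely on):

    use kaspa_consensus_core::networktype::NetworkId;

    fn main() {
        // Walk all known networks, e.g. to run a check once per network
        for network_id in NetworkId::iter() {
            println!("{}: default p2p port {}", network_id, network_id.default_p2p_port());
        }
    }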
diff --git a/consensus/pow/Cargo.toml b/consensus/pow/Cargo.toml
index 4cb499dfc..4427c235c 100644
--- a/consensus/pow/Cargo.toml
+++ b/consensus/pow/Cargo.toml
@@ -1,7 +1,10 @@
 [package]
 name = "kaspa-pow"
-version = "0.1.0"
-edition = "2021"
+version.workspace = true
+edition.workspace = true
+authors.workspace = true
+include.workspace = true
+license.workspace = true
 
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
diff --git a/consensus/src/consensus/services.rs b/consensus/src/consensus/services.rs
index c1dfe3f1e..347a96593 100644
--- a/consensus/src/consensus/services.rs
+++ b/consensus/src/consensus/services.rs
@@ -129,6 +129,7 @@ impl ConsensusServices {
             params.max_coinbase_payload_len,
             params.deflationary_phase_daa_score,
             params.pre_deflationary_phase_base_subsidy,
+            params.target_time_per_block,
         );
 
         let mass_calculator =
diff --git a/consensus/src/processes/coinbase.rs b/consensus/src/processes/coinbase.rs
index 30c3c86af..7e72b8453 100644
--- a/consensus/src/processes/coinbase.rs
+++ b/consensus/src/processes/coinbase.rs
@@ -17,12 +17,26 @@ const LENGTH_OF_SCRIPT_PUB_KEY_LENGTH: usize = size_of::<u8>();
 
 const MIN_PAYLOAD_LENGTH: usize =
     LENGTH_OF_BLUE_SCORE + LENGTH_OF_SUBSIDY + LENGTH_OF_SCRIPT_PUB_KEY_VERSION + LENGTH_OF_SCRIPT_PUB_KEY_LENGTH;
 
+// We define a year as 365.25 days and a month as 365.25 / 12 = 30.4375
+// SECONDS_PER_MONTH = 30.4375 * 24 * 60 * 60
+const SECONDS_PER_MONTH: u64 = 2629800;
+
+pub const SUBSIDY_BY_MONTH_TABLE_SIZE: usize = 426;
+pub type SubsidyByMonthTable = [u64; SUBSIDY_BY_MONTH_TABLE_SIZE];
+
 #[derive(Clone)]
 pub struct CoinbaseManager {
     coinbase_payload_script_public_key_max_len: u8,
     max_coinbase_payload_len: usize,
     deflationary_phase_daa_score: u64,
     pre_deflationary_phase_base_subsidy: u64,
+    target_time_per_block: u64,
+
+    /// Precomputed number of blocks per month
+    blocks_per_month: u64,
+
+    /// Precomputed subsidy by month table
+    subsidy_by_month_table: SubsidyByMonthTable,
 }
 
 /// Struct used to streamline payload parsing
@@ -49,15 +63,33 @@ impl CoinbaseManager {
         max_coinbase_payload_len: usize,
         deflationary_phase_daa_score: u64,
         pre_deflationary_phase_base_subsidy: u64,
+        target_time_per_block: u64,
     ) -> Self {
+        assert!(1000 % target_time_per_block == 0);
+        let bps = 1000 / target_time_per_block;
+        let blocks_per_month = SECONDS_PER_MONTH * bps;
+
+        // Precomputed subsidy by month table for the actual block per second rate
+        // Here values are rounded up so that we keep the same number of rewarding months as in the original 1 BPS table.
+        // In a 10 BPS network, the induced increase in total rewards is 51 KAS (see tests::calc_high_bps_total_rewards_delta())
+        let subsidy_by_month_table: SubsidyByMonthTable = core::array::from_fn(|i| (SUBSIDY_BY_MONTH_TABLE[i] + bps - 1) / bps);
         Self {
             coinbase_payload_script_public_key_max_len,
             max_coinbase_payload_len,
             deflationary_phase_daa_score,
             pre_deflationary_phase_base_subsidy,
+            target_time_per_block,
+            blocks_per_month,
+            subsidy_by_month_table,
         }
     }
 
+    #[cfg(test)]
+    #[inline]
+    pub fn bps(&self) -> u64 {
+        1000 / self.target_time_per_block
+    }
+
     pub fn expected_coinbase_transaction<T: AsRef<[u8]>>(
         &self,
         daa_score: u64,
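The `(SUBSIDY_BY_MONTH_TABLE[i] + bps - 1) / bps` expression above is integer ceiling division. A small self-contained sketch (illustrative values) of why rounding up, rather than truncating, is the right choice here:

    fn main() {
        let bps: u64 = 10;
        for x in [44_000_000_000u64, 999, 1] {
            // Integer ceiling division: the smallest q with q * bps >= x
            let q = (x + bps - 1) / bps;
            assert!(q * bps >= x && (q - 1) * bps < x);
            println!("1 BPS subsidy {x} -> {q} per block at {bps} BPS");
        }
        // Truncating instead would turn a final 1-sompi month into 0 and shorten
        // the rewarding period; rounding up preserves the month count.
        assert_eq!((1u64 + bps - 1) / bps, 1);
        assert_eq!(1u64 / bps, 0);
    }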
@@ -182,9 +214,20 @@ impl CoinbaseManager {
             return self.pre_deflationary_phase_base_subsidy;
         }
 
-        // We define a year as 365.25 days and a month as 365.25 / 12 = 30.4375
-        // SECONDS_PER_MONTH = 30.4375 * 24 * 60 * 60
-        const SECONDS_PER_MONTH: u64 = 2629800;
+        let months_since_deflationary_phase_started =
+            ((daa_score - self.deflationary_phase_daa_score) / self.blocks_per_month) as usize;
+        if months_since_deflationary_phase_started >= self.subsidy_by_month_table.len() {
+            *(self.subsidy_by_month_table).last().unwrap()
+        } else {
+            self.subsidy_by_month_table[months_since_deflationary_phase_started]
+        }
+    }
+
+    #[cfg(test)]
+    pub fn legacy_calc_block_subsidy(&self, daa_score: u64) -> u64 {
+        if daa_score < self.deflationary_phase_daa_score {
+            return self.pre_deflationary_phase_base_subsidy;
+        }
 
         // Note that this calculation implicitly assumes that block per second = 1 (by assuming daa score diff is in second units).
         let months_since_deflationary_phase_started = (daa_score - self.deflationary_phase_daa_score) / SECONDS_PER_MONTH;
@@ -200,7 +243,8 @@ impl CoinbaseManager {
 
 /*
     This table was pre-calculated by calling `calcDeflationaryPeriodBlockSubsidyFloatCalc` (in kaspad-go) for all months until reaching 0 subsidy.
-    To regenerate this table, run `TestBuildSubsidyTable` in coinbasemanager_test.go (note the `deflationaryPhaseBaseSubsidy` therein)
+    To regenerate this table, run `TestBuildSubsidyTable` in coinbasemanager_test.go (note the `deflationaryPhaseBaseSubsidy` therein).
+    These values apply to 1 block per second.
 */
 #[rustfmt::skip]
 const SUBSIDY_BY_MONTH_TABLE: [u64; 426] = [
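The new `calc_block_subsidy` above reduces to a month-indexed table lookup, with the last entry acting as a sink. A minimal standalone sketch of that logic, assuming a toy 3-entry table (the real one has 426 entries ending in a 0 subsidy):

    // Sketch of the month-indexed lookup; names and numbers are local to this example.
    fn subsidy(daa_score: u64, deflationary_start: u64, blocks_per_month: u64, table: &[u64]) -> u64 {
        let month = ((daa_score - deflationary_start) / blocks_per_month) as usize;
        // Beyond the last month the subsidy stays at the table's final value
        table[month.min(table.len() - 1)]
    }

    fn main() {
        let table = [1000, 500, 0];
        let (start, bpm) = (1_000_000u64, 100u64);
        assert_eq!(subsidy(start, start, bpm, &table), 1000); // first deflationary month
        assert_eq!(subsidy(start + bpm, start, bpm, &table), 500); // one month later
        assert_eq!(subsidy(start + 50 * bpm, start, bpm, &table), 0); // depleted
    }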
halvings", - daa_score: params.deflationary_phase_daa_score + 2 * SECONDS_PER_HALVING, - expected: DEFLATIONARY_PHASE_INITIAL_SUBSIDY / 4, - }, - Test { - name: "after 5 halvings", - daa_score: params.deflationary_phase_daa_score + 5 * SECONDS_PER_HALVING, - expected: DEFLATIONARY_PHASE_INITIAL_SUBSIDY / 32, - }, - Test { - name: "after 32 halvings", - daa_score: params.deflationary_phase_daa_score + 32 * SECONDS_PER_HALVING, - expected: DEFLATIONARY_PHASE_INITIAL_SUBSIDY / 4294967296, - }, - Test { - name: "just before subsidy depleted", - daa_score: params.deflationary_phase_daa_score + 35 * SECONDS_PER_HALVING, - expected: 1, - }, - Test { - name: "after subsidy depleted", - daa_score: params.deflationary_phase_daa_score + 36 * SECONDS_PER_HALVING, - expected: 0, - }, - ]; + let pre_deflationary_phase_base_subsidy = PRE_DEFLATIONARY_PHASE_BASE_SUBSIDY / params.bps(); + let deflationary_phase_initial_subsidy = DEFLATIONARY_PHASE_INITIAL_SUBSIDY / params.bps(); + let blocks_per_halving = SECONDS_PER_HALVING * params.bps(); - for t in tests { - assert_eq!(cbm.calc_block_subsidy(t.daa_score), t.expected, "test '{}' failed", t.name); + struct Test { + name: &'static str, + daa_score: u64, + expected: u64, + } + + let tests = vec![ + Test { name: "first mined block", daa_score: 1, expected: pre_deflationary_phase_base_subsidy }, + Test { + name: "before deflationary phase", + daa_score: params.deflationary_phase_daa_score - 1, + expected: pre_deflationary_phase_base_subsidy, + }, + Test { + name: "start of deflationary phase", + daa_score: params.deflationary_phase_daa_score, + expected: deflationary_phase_initial_subsidy, + }, + Test { + name: "after one halving", + daa_score: params.deflationary_phase_daa_score + blocks_per_halving, + expected: deflationary_phase_initial_subsidy / 2, + }, + Test { + name: "after 2 halvings", + daa_score: params.deflationary_phase_daa_score + 2 * blocks_per_halving, + expected: deflationary_phase_initial_subsidy / 4, + }, + Test { + name: "after 5 halvings", + daa_score: params.deflationary_phase_daa_score + 5 * blocks_per_halving, + expected: deflationary_phase_initial_subsidy / 32, + }, + Test { + name: "after 32 halvings", + daa_score: params.deflationary_phase_daa_score + 32 * blocks_per_halving, + expected: ((DEFLATIONARY_PHASE_INITIAL_SUBSIDY / 2_u64.pow(32)) + cbm.bps() - 1) / cbm.bps(), + }, + Test { + name: "just before subsidy depleted", + daa_score: params.deflationary_phase_daa_score + 35 * blocks_per_halving, + expected: 1, + }, + Test { + name: "after subsidy depleted", + daa_score: params.deflationary_phase_daa_score + 36 * blocks_per_halving, + expected: 0, + }, + ]; + + for t in tests { + assert_eq!(cbm.calc_block_subsidy(t.daa_score), t.expected, "{} test '{}' failed", network_id, t.name); + if params.bps() == 1 { + assert_eq!(cbm.legacy_calc_block_subsidy(t.daa_score), t.expected, "{} test '{}' failed", network_id, t.name); + } + } } } #[test] fn payload_serialization_test() { - let params = &MAINNET_PARAMS; - let cbm = CoinbaseManager::new( - params.coinbase_payload_script_public_key_max_len, - params.max_coinbase_payload_len, - params.deflationary_phase_daa_score, - params.pre_deflationary_phase_base_subsidy, - ); + let cbm = create_manager(&MAINNET_PARAMS); let script_data = [33u8, 255]; let extra_data = [2u8, 3]; @@ -350,13 +445,7 @@ mod tests { #[test] fn modify_payload_test() { - let params = &MAINNET_PARAMS; - let cbm = CoinbaseManager::new( - params.coinbase_payload_script_public_key_max_len, - params.max_coinbase_payload_len, - 
-            params.deflationary_phase_daa_score,
-            params.pre_deflationary_phase_base_subsidy,
-        );
+        let cbm = create_manager(&MAINNET_PARAMS);
 
         let script_data = [33u8, 255];
         let extra_data = [2u8, 3, 23, 98];
@@ -385,4 +474,19 @@ mod tests {
 
         assert_eq!(data2, deserialized_data);
     }
+
+    fn create_manager(params: &Params) -> CoinbaseManager {
+        CoinbaseManager::new(
+            params.coinbase_payload_script_public_key_max_len,
+            params.max_coinbase_payload_len,
+            params.deflationary_phase_daa_score,
+            params.pre_deflationary_phase_base_subsidy,
+            params.target_time_per_block,
+        )
+    }
+
+    /// Return a CoinbaseManager with legacy golang 1 BPS properties
+    fn create_legacy_manager() -> CoinbaseManager {
+        CoinbaseManager::new(150, 204, 15778800 - 259200, 50000000000, 1000)
+    }
 }
diff --git a/protocol/flows/Cargo.toml b/protocol/flows/Cargo.toml
index 587080575..2d328677e 100644
--- a/protocol/flows/Cargo.toml
+++ b/protocol/flows/Cargo.toml
@@ -1,7 +1,10 @@
 [package]
 name = "kaspa-p2p-flows"
-version = "0.1.0"
-edition = "2021"
+version.workspace = true
+edition.workspace = true
+authors.workspace = true
+include.workspace = true
+license.workspace = true
 
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
diff --git a/rpc/wrpc/client/Cargo.toml b/rpc/wrpc/client/Cargo.toml
index e7857f304..f2339a909 100644
--- a/rpc/wrpc/client/Cargo.toml
+++ b/rpc/wrpc/client/Cargo.toml
@@ -1,7 +1,10 @@
 [package]
 name = "kaspa-wrpc-client"
-version = "0.1.0"
-edition = "2021"
+version.workspace = true
+edition.workspace = true
+authors.workspace = true
+include.workspace = true
+license.workspace = true
 
 [lib]
 crate-type = ["cdylib", "lib"]
diff --git a/rpc/wrpc/proxy/Cargo.toml b/rpc/wrpc/proxy/Cargo.toml
index cec273b74..3e7da496d 100644
--- a/rpc/wrpc/proxy/Cargo.toml
+++ b/rpc/wrpc/proxy/Cargo.toml
@@ -1,7 +1,10 @@
 [package]
 name = "kaspa-wrpc-proxy"
-version = "0.1.0"
-edition = "2021"
+version.workspace = true
+edition.workspace = true
+authors.workspace = true
+include.workspace = true
+license.workspace = true
 
 [dependencies]
 kaspa-consensus-core.workspace = true
diff --git a/rpc/wrpc/server/Cargo.toml b/rpc/wrpc/server/Cargo.toml
index 22704f630..f898d5d2b 100644
--- a/rpc/wrpc/server/Cargo.toml
+++ b/rpc/wrpc/server/Cargo.toml
@@ -1,7 +1,10 @@
 [package]
 name = "kaspa-wrpc-server"
-version = "0.1.0"
-edition = "2021"
+version.workspace = true
+edition.workspace = true
+authors.workspace = true
+include.workspace = true
+license.workspace = true
 
 [lib]
 crate-type = ["cdylib", "lib"]
diff --git a/wallet/bip32/Cargo.toml b/wallet/bip32/Cargo.toml
index a1087d6fd..741822e81 100644
--- a/wallet/bip32/Cargo.toml
+++ b/wallet/bip32/Cargo.toml
@@ -1,7 +1,10 @@
 [package]
 name = "kaspa-bip32"
-version = "0.1.0"
-edition = "2021"
+version.workspace = true
+edition.workspace = true
+authors.workspace = true
+include.workspace = true
+license.workspace = true
 
 [dependencies]
 rand_core.workspace = true
diff --git a/wallet/cli/Cargo.toml b/wallet/cli/Cargo.toml
index c4ea55493..c775454d6 100644
--- a/wallet/cli/Cargo.toml
+++ b/wallet/cli/Cargo.toml
@@ -1,7 +1,10 @@
 [package]
 name = "kaspa-wallet-cli"
-version = "0.1.0"
-edition = "2021"
+version.workspace = true
+edition.workspace = true
+authors.workspace = true
+include.workspace = true
+license.workspace = true
 
 [lib]
 crate-type = ["cdylib", "lib"]
"kaspa-wallet-core" -version = "0.1.0" -edition = "2021" +version.workspace = true +edition.workspace = true +authors.workspace = true +include.workspace = true +license.workspace = true [lib] crate-type = ["cdylib", "lib"] diff --git a/wallet/native/Cargo.toml b/wallet/native/Cargo.toml index b502da54e..8266d000f 100644 --- a/wallet/native/Cargo.toml +++ b/wallet/native/Cargo.toml @@ -1,7 +1,10 @@ [package] name = "kaspa-wallet-cli-native" -version = "0.1.0" -edition = "2021" +version.workspace = true +edition.workspace = true +authors.workspace = true +include.workspace = true +license.workspace = true [dependencies] kaspa-wallet-cli.workspace = true diff --git a/wallet/wasm/Cargo.toml b/wallet/wasm/Cargo.toml index 837f5d213..eb205db1d 100644 --- a/wallet/wasm/Cargo.toml +++ b/wallet/wasm/Cargo.toml @@ -1,7 +1,10 @@ [package] name = "kaspa-wallet-cli-wasm" -version = "0.1.0" -edition = "2021" +version.workspace = true +edition.workspace = true +authors.workspace = true +include.workspace = true +license.workspace = true [lib] crate-type = ["cdylib", "lib"] From cca338f2de64df8fdfcba8f5bd9143cebae7512c Mon Sep 17 00:00:00 2001 From: Michael Sutton Date: Sun, 25 Jun 2023 12:02:30 +0300 Subject: [PATCH 5/5] A few small mempool fixes (#212) * use `request.allow_orphan` when passing the rpc tx to the mempool * minor * include the original accepted transaction as well * for now keep the original timestamp deviation tolerance --- Cargo.lock | 3 ++- consensus/core/src/config/bps.rs | 8 +------- consensus/core/src/config/constants.rs | 4 ++-- mining/Cargo.toml | 1 + mining/src/manager_tests.rs | 10 +++++----- .../mempool/validate_and_insert_transaction.rs | 17 ++++++++--------- protocol/flows/src/flow_context.rs | 7 ++++++- protocol/p2p/Cargo.toml | 2 +- protocol/p2p/src/common.rs | 2 +- rpc/service/src/service.rs | 6 +++++- utils/src/vec.rs | 9 +++++++++ 11 files changed, 41 insertions(+), 28 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index aef9811ae..0636c1677 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1924,6 +1924,7 @@ dependencies = [ "kaspa-mining-errors", "kaspa-muhash", "kaspa-txscript", + "kaspa-utils", "log", "parking_lot", "rand 0.8.5", @@ -2024,7 +2025,7 @@ dependencies = [ "kaspa-core", "kaspa-hashes", "kaspa-math", - "kaspa-mining", + "kaspa-mining-errors", "kaspa-utils", "log", "parking_lot", diff --git a/consensus/core/src/config/bps.rs b/consensus/core/src/config/bps.rs index 84cf5be5b..95b2ca8cf 100644 --- a/consensus/core/src/config/bps.rs +++ b/consensus/core/src/config/bps.rs @@ -85,9 +85,7 @@ impl Bps { } pub const fn pruning_proof_m() -> u64 { - // Since the important levels remain logarithmically long, it seems that this - // constant does not need to scale with BPS. 
-        // TODO: finalize this
+        // No need to scale this constant with BPS since the important (higher) block levels remain logarithmically long
         PRUNING_PROOF_M
     }
 
@@ -120,10 +118,6 @@ impl Bps {
     pub const fn pre_deflationary_phase_base_subsidy() -> u64 {
         50000000000 / BPS
     }
-
-    // TODO: we might need to increase max_block_level (at least for mainnet) as a function of BPS
-    // since higher BPS means easier difficulty puzzles -> less zeros in pow hash
-    // pub const fn max_block_level() -> u64 { }
 }
 
 #[cfg(test)]
diff --git a/consensus/core/src/config/constants.rs b/consensus/core/src/config/constants.rs
index ea61887ca..1e6d68a44 100644
--- a/consensus/core/src/config/constants.rs
+++ b/consensus/core/src/config/constants.rs
@@ -27,8 +27,8 @@ pub mod consensus {
     pub const LEGACY_TIMESTAMP_DEVIATION_TOLERANCE: u64 = 132;
 
     /// **New** timestamp deviation tolerance (seconds).
-    /// KIP-0004: 605 (~10 minutes)
-    pub const NEW_TIMESTAMP_DEVIATION_TOLERANCE: u64 = 605;
+    /// TODO: KIP-0004: 605 (~10 minutes)
+    pub const NEW_TIMESTAMP_DEVIATION_TOLERANCE: u64 = 132;
 
     /// The desired interval between samples of the median time window (seconds).
     /// KIP-0004: 10 seconds
diff --git a/mining/Cargo.toml b/mining/Cargo.toml
index 1c96c6963..8cf9d3f52 100644
--- a/mining/Cargo.toml
+++ b/mining/Cargo.toml
@@ -15,6 +15,7 @@ kaspa-txscript.workspace = true
 kaspa-core.workspace = true
 kaspa-mining-errors.workspace = true
 kaspa-consensusmanager.workspace = true
+kaspa-utils.workspace = true
 thiserror.workspace = true
 serde.workspace = true
 log.workspace = true
diff --git a/mining/src/manager_tests.rs b/mining/src/manager_tests.rs
index 95070a22a..b06d1539c 100644
--- a/mining/src/manager_tests.rs
+++ b/mining/src/manager_tests.rs
@@ -476,9 +476,9 @@ mod tests {
         let unorphaned_txs = result.unwrap();
         let (populated_txs, orphans) = mining_manager.get_all_transactions(true, true);
         assert_eq!(
-            unorphaned_txs.len(), SKIPPED_TXS,
+            unorphaned_txs.len(), SKIPPED_TXS + 1,
            "the mempool is expected to have unorphaned the remaining child transaction after the matching parent transaction was inserted into the mempool: expected: {}, got: {}",
-            unorphaned_txs.len(), SKIPPED_TXS
+            SKIPPED_TXS + 1, unorphaned_txs.len()
         );
         assert_eq!(
             SKIPPED_TXS + SKIPPED_TXS,
@@ -632,13 +632,13 @@ mod tests {
             let unorphaned_txs = result.as_ref().unwrap();
             assert_eq!(
                 test.should_unorphan,
-                !unorphaned_txs.is_empty(),
+                unorphaned_txs.len() > 1,
                 "{}: child transaction should have been {} the orphan pool",
                 test.name,
                 test.parent_insert_result()
             );
-            if !unorphaned_txs.is_empty() {
-                assert_eq!(unorphaned_txs[0].id(), child_txs[i].id(), "the unorphaned transaction should match the inserted parent");
+            if unorphaned_txs.len() > 1 {
+                assert_eq!(unorphaned_txs[1].id(), child_txs[i].id(), "the unorphaned transaction should match the inserted parent");
             }
         }
     }
diff --git a/mining/src/mempool/validate_and_insert_transaction.rs b/mining/src/mempool/validate_and_insert_transaction.rs
index 4978f224f..f4e915de6 100644
--- a/mining/src/mempool/validate_and_insert_transaction.rs
+++ b/mining/src/mempool/validate_and_insert_transaction.rs
@@ -11,6 +11,7 @@ use kaspa_consensus_core::{
     tx::{MutableTransaction, Transaction, TransactionId, TransactionOutpoint, UtxoEntry},
 };
 use kaspa_core::info;
+use kaspa_utils::vec::VecExtensions;
 
 use super::tx::{Orphan, Priority};
 
@@ -60,8 +61,10 @@ impl Mempool {
         // transaction reference and mutably for the call to process_orphans_after_accepted_transaction
         let accepted_transaction =
             self.transaction_pool.add_transaction(transaction, consensus.get_virtual_daa_score(), priority)?.mtx.tx.clone();
-        let accepted_orphans = self.process_orphans_after_accepted_transaction(consensus, &accepted_transaction)?;
-        Ok(accepted_orphans)
+        let mut accepted_transactions = self.process_orphans_after_accepted_transaction(consensus, &accepted_transaction)?;
+        // We include the original accepted transaction as well
+        accepted_transactions.swap_insert(0, accepted_transaction);
+        Ok(accepted_transactions)
     }
 
     fn validate_transaction_pre_utxo_entry(&self, transaction: &MutableTransaction) -> RuleResult<()> {
@@ -98,16 +101,12 @@ impl Mempool {
     ) -> RuleResult<Vec<Arc<Transaction>>> {
         // Rust rewrite:
         // - The function is relocated from OrphanPool into Mempool
-        let mut added_transactions = Vec::new();
-        let mut unorphaned_transactions =
-            self.get_unorphaned_transactions_after_accepted_transaction(consensus, accepted_transaction)?;
-        while !unorphaned_transactions.is_empty() {
-            let transaction = unorphaned_transactions.pop().unwrap();
-
+        let unorphaned_transactions = self.get_unorphaned_transactions_after_accepted_transaction(consensus, accepted_transaction)?;
+        let mut added_transactions = Vec::with_capacity(unorphaned_transactions.len() + 1); // +1 since some callers add the accepted tx itself
+        for transaction in unorphaned_transactions {
             // The returned transactions are leaving the mempool but must also be added to
             // the transaction pool so we clone.
             added_transactions.push(transaction.mtx.tx.clone());
-
             self.transaction_pool.add_mempool_transaction(transaction)?;
         }
         Ok(added_transactions)
diff --git a/protocol/flows/src/flow_context.rs b/protocol/flows/src/flow_context.rs
index a398dd05d..871f0a022 100644
--- a/protocol/flows/src/flow_context.rs
+++ b/protocol/flows/src/flow_context.rs
@@ -404,7 +404,12 @@ impl FlowContext {
         // TODO: call a handler function or a predefined registered service
     }
 
-    pub async fn add_transaction(
+    /// Adds the rpc-submitted transaction to the mempool and propagates it to peers.
+    ///
+    /// Transactions submitted through rpc are considered high priority. This definition does not affect the tx selection algorithm
+    /// but only changes how we manage the lifetime of the tx. A high-priority tx does not expire and is repeatedly rebroadcast to
A high-priority tx does not expire and is repeatedly rebroadcasted to + /// peers + pub async fn submit_rpc_transaction( &self, consensus: &ConsensusProxy, transaction: Transaction, diff --git a/protocol/p2p/Cargo.toml b/protocol/p2p/Cargo.toml index f4043b73f..2eb3e91de 100644 --- a/protocol/p2p/Cargo.toml +++ b/protocol/p2p/Cargo.toml @@ -20,7 +20,7 @@ path = "./src/bin/server.rs" [dependencies] kaspa-core.workspace = true kaspa-consensus-core.workspace = true -kaspa-mining.workspace = true +kaspa-mining-errors.workspace = true kaspa-hashes.workspace = true kaspa-math.workspace = true kaspa-utils.workspace = true diff --git a/protocol/p2p/src/common.rs b/protocol/p2p/src/common.rs index 983770019..a7ff65e33 100644 --- a/protocol/p2p/src/common.rs +++ b/protocol/p2p/src/common.rs @@ -1,6 +1,6 @@ use crate::{convert::error::ConversionError, core::peer::PeerKey, KaspadMessagePayloadType}; use kaspa_consensus_core::errors::{block::RuleError, consensus::ConsensusError, pruning::PruningImportError}; -use kaspa_mining::errors::MiningManagerError; +use kaspa_mining_errors::manager::MiningManagerError; use std::time::Duration; use thiserror::Error; diff --git a/rpc/service/src/service.rs b/rpc/service/src/service.rs index 0a2f9b3b8..2297ffbe8 100644 --- a/rpc/service/src/service.rs +++ b/rpc/service/src/service.rs @@ -384,7 +384,11 @@ impl RpcApi for RpcCoreService { let transaction: Transaction = (&request.transaction).try_into()?; let transaction_id = transaction.id(); let session = self.consensus_manager.consensus().session().await; - self.flow_context.add_transaction(&session, transaction, Orphan::Allowed).await.map_err(|err| { + let orphan = match request.allow_orphan { + true => Orphan::Allowed, + false => Orphan::Forbidden, + }; + self.flow_context.submit_rpc_transaction(&session, transaction, orphan).await.map_err(|err| { let err = RpcError::RejectedTransaction(transaction_id, err.to_string()); debug!("{err}"); err diff --git a/utils/src/vec.rs b/utils/src/vec.rs index 909cf123a..01bd59b9e 100644 --- a/utils/src/vec.rs +++ b/utils/src/vec.rs @@ -1,6 +1,9 @@ pub trait VecExtensions { /// Pushes the provided value to the container if the container is empty fn push_if_empty(self, value: T) -> Self; + + /// Inserts the provided `value` at `index` while swapping the item at index to the end of the container + fn swap_insert(&mut self, index: usize, value: T); } impl VecExtensions for Vec { @@ -10,4 +13,10 @@ impl VecExtensions for Vec { } self } + + fn swap_insert(&mut self, index: usize, value: T) { + self.push(value); + let loc = self.len() - 1; + self.swap(index, loc); + } }