Skip to content

Commit d3fa315

Browse files
Merge #6952: feat: rework message processing in CNode to prioritize quorum messages
627a8a5 fix: whitepace (pasta) d0d42c7 refactor: prefix add (PastaPastaPasta) 1d16451 fix: ensure that flooding quorum messages cannot starve the normal queue of processing (pasta) 6c955b7 fix: whitespace (pasta) df6cc76 feat: rework message processing in CNode to prioritize quorum messages (pasta) Pull request description: ## Issue being fixed or feature implemented This update introduces a new queue for quorum-priority messages, allowing for better handling of critical network messages. The `MarkReceivedMsgsForProcessing` function now classifies incoming messages into quorum and normal queues, while the `PollMessage` function prioritizes messages from the quorum queue when available. This change enhances the efficiency of message processing and ensures that important messages are handled promptly. ## What was done? _Describe your changes in detail_ ## How Has This Been Tested? _Please describe in detail how you tested your changes._ _Include details of your testing environment, and the tests you ran to see how your change affects other areas of the code, etc._ ## Breaking Changes _Please describe any breaking changes your code introduces_ ## Checklist: _Go over all the following points, and put an `x` in all the boxes that apply._ - [ ] I have performed a self-review of my own code - [ ] I have commented my code, particularly in hard-to-understand areas - [ ] I have added or updated relevant unit/integration/functional/e2e tests - [ ] I have made corresponding changes to the documentation - [x] I have assigned this pull request to a milestone _(for repository code-owners and collaborators only)_ ACKs for top commit: UdjinM6: utACK 627a8a5 kwvg: utACK 627a8a5 Tree-SHA512: d8eac0934c377daa36e12ed75a7d944ec0025f021c5d36a609a2967bdc41464e66ba8395abc4559a949f41c136ca496bda6276b032ec9011213563b0134764d4
2 parents c4ca959 + 627a8a5 commit d3fa315

File tree

2 files changed

+79
-8
lines changed

2 files changed

+79
-8
lines changed

src/net.cpp

Lines changed: 56 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4652,31 +4652,79 @@ void CNode::MarkReceivedMsgsForProcessing()
46524652
{
46534653
AssertLockNotHeld(m_msg_process_queue_mutex);
46544654

4655-
size_t nSizeAdded = 0;
4656-
for (const auto& msg : vRecvMsg) {
4655+
size_t nQuorumSizeAdded = 0;
4656+
size_t nNormalSizeAdded = 0;
4657+
std::list<CNetMessage> quorumMsgs;
4658+
std::list<CNetMessage> normalMsgs;
4659+
4660+
// Classify messages into quorum-priority and normal queues
4661+
for (auto it = vRecvMsg.begin(); it != vRecvMsg.end();) {
4662+
auto& msg = *it;
46574663
// vRecvMsg contains only completed CNetMessage
46584664
// the single possible partially deserialized message are held by TransportDeserializer
4659-
nSizeAdded += msg.m_raw_message_size;
4665+
if (IsQuorumPriorityMessage(msg.m_type)) {
4666+
quorumMsgs.splice(quorumMsgs.end(), vRecvMsg, it++);
4667+
nQuorumSizeAdded += msg.m_raw_message_size;
4668+
} else {
4669+
normalMsgs.splice(normalMsgs.end(), vRecvMsg, it++);
4670+
nNormalSizeAdded += msg.m_raw_message_size;
4671+
}
46604672
}
46614673

46624674
LOCK(m_msg_process_queue_mutex);
4663-
m_msg_process_queue.splice(m_msg_process_queue.end(), vRecvMsg);
4664-
m_msg_process_queue_size += nSizeAdded;
4665-
fPauseRecv = m_msg_process_queue_size > m_recv_flood_size;
4675+
// Splice classified messages into appropriate queues
4676+
m_msg_quorum_queue.splice(m_msg_quorum_queue.end(), quorumMsgs);
4677+
m_msg_quorum_queue_size += nQuorumSizeAdded;
4678+
m_msg_process_queue.splice(m_msg_process_queue.end(), normalMsgs);
4679+
m_msg_process_queue_size += nNormalSizeAdded;
4680+
// Compute backpressure over combined size of both queues
4681+
fPauseRecv = (m_msg_quorum_queue_size + m_msg_process_queue_size) > m_recv_flood_size;
46664682
}
46674683

46684684
std::optional<std::pair<CNetMessage, bool>> CNode::PollMessage()
46694685
{
46704686
LOCK(m_msg_process_queue_mutex);
4687+
4688+
// Ratio-based processing: process N quorum messages for every 1 normal message
4689+
// This ensures forward progress for both queues while strongly prioritizing quorum messages
4690+
// However, if normal queue is empty, process quorum messages in bursts (like old algorithm)
4691+
constexpr size_t QUORUM_TO_NORMAL_RATIO = 100;
4692+
4693+
// Check if we should process normal queue for forward progress
4694+
// Only apply ratio when both queues have messages to allow burst processing when normal queue is empty
4695+
bool skip_quorum_processing = !m_msg_process_queue.empty() &&
4696+
m_quorum_msg_count_since_normal >= QUORUM_TO_NORMAL_RATIO;
4697+
4698+
// Prioritize quorum queue: pop from it first if non-empty and ratio not reached
4699+
// If normal queue is empty, process quorum messages without ratio limit (burst mode)
4700+
if (!m_msg_quorum_queue.empty() && !skip_quorum_processing) {
4701+
std::list<CNetMessage> msgs;
4702+
// Just take one message from quorum queue
4703+
msgs.splice(msgs.begin(), m_msg_quorum_queue, m_msg_quorum_queue.begin());
4704+
m_msg_quorum_queue_size -= msgs.front().m_raw_message_size;
4705+
// Only increment counter if normal queue has messages (to track ratio)
4706+
// If normal queue is empty, don't increment so we can process bursts quickly
4707+
if (!m_msg_process_queue.empty()) {
4708+
++m_quorum_msg_count_since_normal;
4709+
}
4710+
// Compute backpressure over combined size of both queues
4711+
fPauseRecv = (m_msg_quorum_queue_size + m_msg_process_queue_size) > m_recv_flood_size;
4712+
// Return true for 'more' if either queue has remaining messages
4713+
return std::make_pair(std::move(msgs.front()), !m_msg_quorum_queue.empty() || !m_msg_process_queue.empty());
4714+
}
4715+
4716+
// Process normal queue (either because quorum queue is empty or ratio reached)
46714717
if (m_msg_process_queue.empty()) return std::nullopt;
46724718

46734719
std::list<CNetMessage> msgs;
46744720
// Just take one message
46754721
msgs.splice(msgs.begin(), m_msg_process_queue, m_msg_process_queue.begin());
46764722
m_msg_process_queue_size -= msgs.front().m_raw_message_size;
4677-
fPauseRecv = m_msg_process_queue_size > m_recv_flood_size;
4723+
m_quorum_msg_count_since_normal = 0; // Reset counter after processing normal message
4724+
// Compute backpressure over combined size of both queues
4725+
fPauseRecv = (m_msg_quorum_queue_size + m_msg_process_queue_size) > m_recv_flood_size;
46784726

4679-
return std::make_pair(std::move(msgs.front()), !m_msg_process_queue.empty());
4727+
return std::make_pair(std::move(msgs.front()), !m_msg_quorum_queue.empty() || !m_msg_process_queue.empty());
46804728
}
46814729

46824730
bool CConnman::NodeFullyConnected(const CNode* pnode)

src/net.h

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1094,6 +1094,9 @@ class CNode
10941094
Mutex m_msg_process_queue_mutex;
10951095
std::list<CNetMessage> m_msg_process_queue GUARDED_BY(m_msg_process_queue_mutex);
10961096
size_t m_msg_process_queue_size GUARDED_BY(m_msg_process_queue_mutex){0};
1097+
std::list<CNetMessage> m_msg_quorum_queue GUARDED_BY(m_msg_process_queue_mutex);
1098+
size_t m_msg_quorum_queue_size GUARDED_BY(m_msg_process_queue_mutex){0};
1099+
size_t m_quorum_msg_count_since_normal GUARDED_BY(m_msg_process_queue_mutex){0};
10971100

10981101
// Our address, as reported by the peer
10991102
CService addrLocal GUARDED_BY(m_addr_local_mutex);
@@ -2002,4 +2005,24 @@ class CExplicitNetCleanup
20022005
static void callCleanup();
20032006
};
20042007

2008+
// Helper function to determine if a message type should be prioritized in the quorum queue
2009+
inline bool IsQuorumPriorityMessage(const std::string& msg_type)
2010+
{
2011+
// LLMQ signing/data messages
2012+
if (msg_type == NetMsgType::QSIGSHARE ||
2013+
msg_type == NetMsgType::QBSIGSHARES ||
2014+
msg_type == NetMsgType::QSIGSHARESINV ||
2015+
msg_type == NetMsgType::QGETSIGSHARES ||
2016+
msg_type == NetMsgType::QSIGSESANN ||
2017+
msg_type == NetMsgType::QSIGREC) {
2018+
return true;
2019+
}
2020+
// High-level lock messages (ChainLocks, InstantSend locks)
2021+
if (msg_type == NetMsgType::CLSIG ||
2022+
msg_type == NetMsgType::ISDLOCK) {
2023+
return true;
2024+
}
2025+
return false;
2026+
}
2027+
20052028
#endif // BITCOIN_NET_H

0 commit comments

Comments
 (0)