@@ -4652,31 +4652,62 @@ void CNode::MarkReceivedMsgsForProcessing()
46524652{
46534653 AssertLockNotHeld (m_msg_process_queue_mutex);
46544654
4655- size_t nSizeAdded = 0 ;
4656- for (const auto & msg : vRecvMsg) {
4655+ size_t nQuorumSizeAdded = 0 ;
4656+ size_t nNormalSizeAdded = 0 ;
4657+ std::list<CNetMessage> quorumMsgs;
4658+ std::list<CNetMessage> normalMsgs;
4659+
4660+ // Classify messages into quorum-priority and normal queues
4661+ for (auto it = vRecvMsg.begin (); it != vRecvMsg.end ();) {
4662+ auto & msg = *it;
46574663 // vRecvMsg contains only completed CNetMessage
46584664 // the single possible partially deserialized message are held by TransportDeserializer
4659- nSizeAdded += msg.m_raw_message_size ;
4665+ if (IsQuorumPriorityMessage (msg.m_type )) {
4666+ quorumMsgs.splice (quorumMsgs.end (), vRecvMsg, it++);
4667+ nQuorumSizeAdded += msg.m_raw_message_size ;
4668+ } else {
4669+ normalMsgs.splice (normalMsgs.end (), vRecvMsg, it++);
4670+ nNormalSizeAdded += msg.m_raw_message_size ;
4671+ }
46604672 }
46614673
46624674 LOCK (m_msg_process_queue_mutex);
4663- m_msg_process_queue.splice (m_msg_process_queue.end (), vRecvMsg);
4664- m_msg_process_queue_size += nSizeAdded;
4665- fPauseRecv = m_msg_process_queue_size > m_recv_flood_size;
4675+ // Splice classified messages into appropriate queues
4676+ m_msg_quorum_queue.splice (m_msg_quorum_queue.end (), quorumMsgs);
4677+ m_msg_quorum_queue_size += nQuorumSizeAdded;
4678+ m_msg_process_queue.splice (m_msg_process_queue.end (), normalMsgs);
4679+ m_msg_process_queue_size += nNormalSizeAdded;
4680+ // Compute backpressure over combined size of both queues
4681+ fPauseRecv = (m_msg_quorum_queue_size + m_msg_process_queue_size) > m_recv_flood_size;
46664682}
46674683
46684684std::optional<std::pair<CNetMessage, bool >> CNode::PollMessage ()
46694685{
46704686 LOCK (m_msg_process_queue_mutex);
4687+
4688+ // Prioritize quorum queue: pop from it first if non-empty
4689+ if (!m_msg_quorum_queue.empty ()) {
4690+ std::list<CNetMessage> msgs;
4691+ // Just take one message from quorum queue
4692+ msgs.splice (msgs.begin (), m_msg_quorum_queue, m_msg_quorum_queue.begin ());
4693+ m_msg_quorum_queue_size -= msgs.front ().m_raw_message_size ;
4694+ // Compute backpressure over combined size of both queues
4695+ fPauseRecv = (m_msg_quorum_queue_size + m_msg_process_queue_size) > m_recv_flood_size;
4696+ // Return true for 'more' if either queue has remaining messages
4697+ return std::make_pair (std::move (msgs.front ()), !m_msg_quorum_queue.empty () || !m_msg_process_queue.empty ());
4698+ }
4699+
4700+ // Fall back to normal queue if quorum queue is empty
46714701 if (m_msg_process_queue.empty ()) return std::nullopt ;
46724702
46734703 std::list<CNetMessage> msgs;
46744704 // Just take one message
46754705 msgs.splice (msgs.begin (), m_msg_process_queue, m_msg_process_queue.begin ());
46764706 m_msg_process_queue_size -= msgs.front ().m_raw_message_size ;
4677- fPauseRecv = m_msg_process_queue_size > m_recv_flood_size;
4707+ // Compute backpressure over combined size of both queues
4708+ fPauseRecv = (m_msg_quorum_queue_size + m_msg_process_queue_size) > m_recv_flood_size;
46784709
4679- return std::make_pair (std::move (msgs.front ()), !m_msg_process_queue.empty ());
4710+ return std::make_pair (std::move (msgs.front ()), !m_msg_quorum_queue. empty () || ! m_msg_process_queue.empty ());
46804711}
46814712
46824713bool CConnman::NodeFullyConnected (const CNode* pnode)
0 commit comments