Skip to content

Commit

Permalink
Merge pull request PIVX-Project#2704 from codablock/pr_llmq_optimizat…
Browse files Browse the repository at this point in the history
…ions1

Optimize LLMQs sending of sig shares
  • Loading branch information
codablock authored and panleone committed Nov 10, 2024
1 parent 1d4b479 commit 5f391cd
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 11 deletions.
20 changes: 12 additions & 8 deletions src/llmq/quorums_signing_shares.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1038,13 +1038,13 @@ bool CSigSharesManager::SendMessages()
llmq::utils::BuildSignHash(sigSesAnn).ToString(), sigSesAnn.sessionId, pnode->GetId());
msgs.emplace_back(sigSesAnn);
if (msgs.size() == MAX_MSGS_CNT_QSIGSESANN) {
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSESANN, msgs));
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSESANN, msgs), false);
msgs.clear();
didSend = true;
}
}
if (!msgs.empty()) {
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSESANN, msgs));
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSESANN, msgs), false);
didSend = true;
}
}
Expand All @@ -1058,13 +1058,13 @@ bool CSigSharesManager::SendMessages()
p.first.ToString(), p.second.ToString(), pnode->GetId());
msgs.emplace_back(std::move(p.second));
if (msgs.size() == MAX_MSGS_CNT_QGETSIGSHARES) {
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QGETSIGSHARES, msgs));
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QGETSIGSHARES, msgs), false);
msgs.clear();
didSend = true;
}
}
if (!msgs.empty()) {
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QGETSIGSHARES, msgs));
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QGETSIGSHARES, msgs), false);
didSend = true;
}
}
Expand All @@ -1078,7 +1078,7 @@ bool CSigSharesManager::SendMessages()
LogPrint(BCLog::LLMQ, "CSigSharesManager::SendMessages -- QBSIGSHARES signHash=%s, inv={%s}, node=%d\n",
p.first.ToString(), p.second.ToInvString(), pnode->GetId());
if (totalSigsCount + p.second.sigShares.size() > MAX_MSGS_TOTAL_BATCHED_SIGS) {
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QBSIGSHARES, msgs));
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QBSIGSHARES, msgs), false);
msgs.clear();
totalSigsCount = 0;
didSend = true;
Expand All @@ -1087,7 +1087,7 @@ bool CSigSharesManager::SendMessages()
msgs.emplace_back(std::move(p.second));
}
if (!msgs.empty()) {
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QBSIGSHARES, std::move(msgs)));
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QBSIGSHARES, std::move(msgs)), false);
didSend = true;
}
}
Expand All @@ -1101,13 +1101,13 @@ bool CSigSharesManager::SendMessages()
p.first.ToString(), p.second.ToString(), pnode->GetId());
msgs.emplace_back(std::move(p.second));
if (msgs.size() == MAX_MSGS_CNT_QSIGSHARESINV) {
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARESINV, msgs));
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARESINV, msgs), false);
msgs.clear();
didSend = true;
}
}
if (!msgs.empty()) {
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARESINV, msgs));
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARESINV, msgs), false);
didSend = true;
}
}
Expand All @@ -1116,6 +1116,10 @@ bool CSigSharesManager::SendMessages()
// looped through all nodes, release them
g_connman->ReleaseNodeVector(vNodesCopy);

if (didSend) {
g_connman->WakeSelect();
}

return didSend;
}

Expand Down
63 changes: 61 additions & 2 deletions src/net.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1295,6 +1295,15 @@ bool CConnman::GenerateSelectSet(std::set<SOCKET>& recv_set, std::set<SOCKET>& s
}
}

#ifndef WIN32
// We add a pipe to the read set so that the select() call can be woken up from the outside
// This is done when data is added to send buffers (vSendMsg) or when new peers are added
// This is currently only implemented for POSIX compliant systems. This means that Windows will fall back to
// timing out after 50ms and then trying to send. This is ok as we assume that heavy-load daemons are usually
// run on Linux and friends.
recv_set.insert(wakeupPipe[0]);
#endif

return !recv_set.empty() || !send_set.empty() || !error_set.empty();
}

Expand Down Expand Up @@ -1420,6 +1429,20 @@ void CConnman::SocketHandler()
std::set<SOCKET> recv_set, send_set, error_set;
SocketEvents(recv_set, send_set, error_set);

#ifndef WIN32
// drain the wakeup pipe
if (recv_set.count(wakeupPipe[0])) {
LogPrint(BCLog::NET, "woke up select()\n");
char buf[128];
while (true) {
int r = read(wakeupPipe[0], buf, sizeof(buf));
if (r <= 0) {
break;
}
}
}
#endif

if (interruptNet) return;

//
Expand Down Expand Up @@ -1534,6 +1557,21 @@ void CConnman::WakeMessageHandler()
condMsgProc.notify_one();
}

void CConnman::WakeSelect()
{
#ifndef WIN32
if (wakeupPipe[1] == -1) {
return;
}

LogPrint(BCLog::NET, "waking up select()\n");

char buf[1];
if (write(wakeupPipe[1], buf, 1) != 1) {
LogPrint(BCLog::NET, "write to wakeupPipe failed\n");
}
#endif
}

static std::string GetDNSHost(const CDNSSeedData& data, ServiceFlags* requiredServiceBits)
{
Expand Down Expand Up @@ -2242,6 +2280,22 @@ bool CConnman::Start(CScheduler& scheduler, const Options& connOptions)
fMsgProcWake = false;
}

#ifndef WIN32
if (pipe(wakeupPipe) != 0) {
wakeupPipe[0] = wakeupPipe[1] = -1;
LogPrint(BCLog::NET, "pipe() for wakeupPipe failed\n");
} else {
int fFlags = fcntl(wakeupPipe[0], F_GETFL, 0);
if (fcntl(wakeupPipe[0], F_SETFL, fFlags | O_NONBLOCK) == -1) {
LogPrint(BCLog::NET, "fcntl for O_NONBLOCK on wakeupPipe failed\n");
}
fFlags = fcntl(wakeupPipe[1], F_GETFL, 0);
if (fcntl(wakeupPipe[1], F_SETFL, fFlags | O_NONBLOCK) == -1) {
LogPrint(BCLog::NET, "fcntl for O_NONBLOCK on wakeupPipe failed\n");
}
}
#endif

// Send and receive from sockets, accept connections
threadSocketHandler = std::thread(&TraceThread<std::function<void()> >, "net", std::function<void()>(std::bind(&CConnman::ThreadSocketHandler, this)));

Expand Down Expand Up @@ -2371,6 +2425,11 @@ void CConnman::Stop()
vhListenSocket.clear();
semOutbound.reset();
semAddnode.reset();
#ifndef WIN32
if (wakeupPipe[0] != -1) close(wakeupPipe[0]);
if (wakeupPipe[1] != -1) close(wakeupPipe[1]);
wakeupPipe[0] = wakeupPipe[1] = -1;
#endif
}

void CConnman::DeleteNode(CNode* pnode)
Expand Down Expand Up @@ -2683,7 +2742,7 @@ bool CConnman::NodeFullyConnected(const CNode* pnode)
return pnode && pnode->fSuccessfullyConnected && !pnode->fDisconnect;
}

void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg, bool allowOptimisticSend)
{
size_t nMessageSize = msg.data.size();
size_t nTotalSize = nMessageSize + CMessageHeader::HEADER_SIZE;
Expand All @@ -2700,7 +2759,7 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
size_t nBytesSent = 0;
{
LOCK(pnode->cs_vSend);
bool optimisticSend(pnode->vSendMsg.empty());
bool optimisticSend(allowOptimisticSend && pnode->vSendMsg.empty());

//log total amount of bytes per command
pnode->mapSendBytesPerMsgCmd[msg.command] += nTotalSize;
Expand Down
10 changes: 9 additions & 1 deletion src/net.h
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ class CConnman
bool ForNode(NodeId id, std::function<bool(CNode* pnode)> func);
bool ForNode(const CService& addr, const std::function<bool(const CNode* pnode)>& cond, const std::function<bool(CNode* pnode)>& func);

void PushMessage(CNode* pnode, CSerializedNetMsg&& msg);
void PushMessage(CNode* pnode, CSerializedNetMsg&& msg, bool allowOptimisticSend = true);

template<typename Callable>
bool ForEachNodeContinueIf(Callable&& func)
Expand Down Expand Up @@ -360,6 +360,9 @@ class CConnman
TierTwoConnMan* GetTierTwoConnMan() { return m_tiertwo_conn_man.get(); };
/** Update the node to be a iqr member if needed */
void UpdateQuorumRelayMemberIfNeeded(CNode* pnode);
/** Interrupt the select/poll system call **/
void WakeSelect();

private:
struct ListenSocket {
SOCKET socket;
Expand Down Expand Up @@ -479,6 +482,11 @@ class CConnman

CThreadInterrupt interruptNet;

#ifndef WIN32
/** a pipe which is added to select() calls to wakeup before the timeout */
int wakeupPipe[2]{-1, -1};
#endif

std::thread threadDNSAddressSeed;
std::thread threadSocketHandler;
std::thread threadOpenAddedConnections;
Expand Down

0 comments on commit 5f391cd

Please sign in to comment.