Skip to content

Commit

Permalink
fuzz: make it possible to mock (fuzz) CThreadInterrupt
Browse files Browse the repository at this point in the history
* Make the methods of `CThreadInterrupt` virtual and store a pointer to
  it in `CConnman`, thus making it possible to override with a mocked
  instance.
* Initialize `CConnman::m_interrupt_net` from the constructor, making it
  possible for callers to supply mocked version.
* Introduce `FuzzedThreadInterrupt` and `ConsumeThreadInterrupt()` and
  use them in `src/test/fuzz/connman.cpp` and `src/test/fuzz/i2p.cpp`.

This improves the CPU utilization of the `connman` fuzz test.

As a nice side effect, the `std::shared_ptr` used for
`CConnman::m_interrupt_net` resolves the possible lifetime issues with
it (see the removed comment for that variable).
  • Loading branch information
vasild committed Nov 28, 2024
1 parent b73961f commit 687a9af
Show file tree
Hide file tree
Showing 12 changed files with 154 additions and 65 deletions.
8 changes: 4 additions & 4 deletions src/i2p.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,15 +118,15 @@ namespace sam {

Session::Session(const fs::path& private_key_file,
const Proxy& control_host,
CThreadInterrupt* interrupt)
std::shared_ptr<CThreadInterrupt> interrupt)
: m_private_key_file{private_key_file},
m_control_host{control_host},
m_interrupt{interrupt},
m_transient{false}
{
}

Session::Session(const Proxy& control_host, CThreadInterrupt* interrupt)
Session::Session(const Proxy& control_host, std::shared_ptr<CThreadInterrupt> interrupt)
: m_control_host{control_host},
m_interrupt{interrupt},
m_transient{true}
Expand Down Expand Up @@ -161,7 +161,7 @@ bool Session::Accept(Connection& conn)
std::string errmsg;
bool disconnect{false};

while (!*m_interrupt) {
while (!m_interrupt->interrupted()) {
Sock::Event occurred;
if (!conn.sock->Wait(MAX_WAIT_FOR_IO, Sock::RECV, &occurred)) {
errmsg = "wait on socket failed";
Expand Down Expand Up @@ -204,7 +204,7 @@ bool Session::Accept(Connection& conn)
return true;
}

if (*m_interrupt) {
if (m_interrupt->interrupted()) {
LogPrintLevel(BCLog::I2P, BCLog::Level::Debug, "Accept was interrupted\n");
} else {
LogPrintLevel(BCLog::I2P, BCLog::Level::Debug, "Error accepting%s: %s\n", disconnect ? " (will close the session)" : "", errmsg);
Expand Down
14 changes: 5 additions & 9 deletions src/i2p.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,11 @@ class Session
* private key will be generated and saved into the file.
* @param[in] control_host Location of the SAM proxy.
* @param[in,out] interrupt If this is signaled then all operations are canceled as soon as
* possible and executing methods throw an exception. Notice: only a pointer to the
* `CThreadInterrupt` object is saved, so it must not be destroyed earlier than this
* `Session` object.
* possible and executing methods throw an exception.
*/
Session(const fs::path& private_key_file,
const Proxy& control_host,
CThreadInterrupt* interrupt);
std::shared_ptr<CThreadInterrupt> interrupt);

/**
* Construct a transient session which will generate its own I2P private key
Expand All @@ -78,11 +76,9 @@ class Session
* the session will be lazily created later when first used.
* @param[in] control_host Location of the SAM proxy.
* @param[in,out] interrupt If this is signaled then all operations are canceled as soon as
* possible and executing methods throw an exception. Notice: only a pointer to the
* `CThreadInterrupt` object is saved, so it must not be destroyed earlier than this
* `Session` object.
* possible and executing methods throw an exception.
*/
Session(const Proxy& control_host, CThreadInterrupt* interrupt);
Session(const Proxy& control_host, std::shared_ptr<CThreadInterrupt> interrupt);

/**
* Destroy the session, closing the internally used sockets. The sockets that have been
Expand Down Expand Up @@ -235,7 +231,7 @@ class Session
/**
* Cease network activity when this is signaled.
*/
CThreadInterrupt* const m_interrupt;
const std::shared_ptr<CThreadInterrupt> m_interrupt;

/**
* Mutex protecting the members that can be concurrently accessed.
Expand Down
70 changes: 40 additions & 30 deletions src/net.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo
LOCK(m_unused_i2p_sessions_mutex);
if (m_unused_i2p_sessions.empty()) {
i2p_transient_session =
std::make_unique<i2p::sam::Session>(proxy, &interruptNet);
std::make_unique<i2p::sam::Session>(proxy, m_interrupt_net);
} else {
i2p_transient_session.swap(m_unused_i2p_sessions.front());
m_unused_i2p_sessions.pop();
Expand Down Expand Up @@ -2042,7 +2042,7 @@ void CConnman::SocketHandler()
// empty sets.
events_per_sock = GenerateWaitSockets(snap.Nodes());
if (events_per_sock.empty() || !events_per_sock.begin()->first->WaitMany(timeout, events_per_sock)) {
interruptNet.sleep_for(timeout);
m_interrupt_net->sleep_for(timeout);
}

// Service (send/receive) each of the already connected nodes.
Expand All @@ -2059,8 +2059,9 @@ void CConnman::SocketHandlerConnected(const std::vector<CNode*>& nodes,
AssertLockNotHeld(m_total_bytes_sent_mutex);

for (CNode* pnode : nodes) {
if (interruptNet)
if (m_interrupt_net->interrupted()) {
return;
}

//
// Receive
Expand Down Expand Up @@ -2151,7 +2152,7 @@ void CConnman::SocketHandlerConnected(const std::vector<CNode*>& nodes,
void CConnman::SocketHandlerListening(const Sock::EventsPerSock& events_per_sock)
{
for (const ListenSocket& listen_socket : vhListenSocket) {
if (interruptNet) {
if (m_interrupt_net->interrupted()) {
return;
}
const auto it = events_per_sock.find(listen_socket.sock);
Expand All @@ -2165,8 +2166,7 @@ void CConnman::ThreadSocketHandler()
{
AssertLockNotHeld(m_total_bytes_sent_mutex);

while (!interruptNet)
{
while (!m_interrupt_net->interrupted()) {
DisconnectNodes();
NotifyNumConnectionsChanged();
SocketHandler();
Expand All @@ -2190,9 +2190,10 @@ void CConnman::ThreadDNSAddressSeed()
auto start = NodeClock::now();
constexpr std::chrono::seconds SEEDNODE_TIMEOUT = 30s;
LogPrintf("-seednode enabled. Trying the provided seeds for %d seconds before defaulting to the dnsseeds.\n", SEEDNODE_TIMEOUT.count());
while (!interruptNet) {
if (!interruptNet.sleep_for(std::chrono::milliseconds(500)))
while (!m_interrupt_net->interrupted()) {
if (!m_interrupt_net->sleep_for(500ms)) {
return;
}

// Abort if we have spent enough time without reaching our target.
// Giving seed nodes 30 seconds so this does not become a race against fixedseeds (which triggers after 1 min)
Expand Down Expand Up @@ -2253,7 +2254,7 @@ void CConnman::ThreadDNSAddressSeed()
// early to see if we have enough peers and can stop
// this thread entirely freeing up its resources
std::chrono::seconds w = std::min(DNSSEEDS_DELAY_FEW_PEERS, to_wait);
if (!interruptNet.sleep_for(w)) return;
if (!m_interrupt_net->sleep_for(w)) return;
to_wait -= w;

if (GetFullOutboundConnCount() >= SEED_OUTBOUND_CONNECTION_THRESHOLD) {
Expand All @@ -2269,13 +2270,13 @@ void CConnman::ThreadDNSAddressSeed()
}
}

if (interruptNet) return;
if (m_interrupt_net->interrupted()) return;

// hold off on querying seeds if P2P network deactivated
if (!fNetworkActive) {
LogPrintf("Waiting for network to be reactivated before querying DNS seeds.\n");
do {
if (!interruptNet.sleep_for(std::chrono::seconds{1})) return;
if (!m_interrupt_net->sleep_for(1s)) return;
} while (!fNetworkActive);
}

Expand Down Expand Up @@ -2470,12 +2471,14 @@ void CConnman::ThreadOpenConnections(const std::vector<std::string> connect, Spa
OpenNetworkConnection(addr, false, {}, strAddr.c_str(), ConnectionType::MANUAL, /*use_v2transport=*/use_v2transport);
for (int i = 0; i < 10 && i < nLoop; i++)
{
if (!interruptNet.sleep_for(std::chrono::milliseconds(500)))
if (!m_interrupt_net->sleep_for(500ms)) {
return;
}
}
}
if (!interruptNet.sleep_for(std::chrono::milliseconds(500)))
if (!m_interrupt_net->sleep_for(500ms)) {
return;
}
PerformReconnections();
}
}
Expand All @@ -2499,8 +2502,7 @@ void CConnman::ThreadOpenConnections(const std::vector<std::string> connect, Spa
LogPrintf("Fixed seeds are disabled\n");
}

while (!interruptNet)
{
while (!m_interrupt_net->interrupted()) {
if (add_addr_fetch) {
add_addr_fetch = false;
const auto& seed{SpanPopBack(seed_nodes)};
Expand All @@ -2515,14 +2517,16 @@ void CConnman::ThreadOpenConnections(const std::vector<std::string> connect, Spa

ProcessAddrFetch();

if (!interruptNet.sleep_for(std::chrono::milliseconds(500)))
if (!m_interrupt_net->sleep_for(500ms)) {
return;
}

PerformReconnections();

CSemaphoreGrant grant(*semOutbound);
if (interruptNet)
if (m_interrupt_net->interrupted()) {
return;
}

const std::unordered_set<Network> fixed_seed_networks{GetReachableEmptyNetworks()};
if (add_fixed_seeds && !fixed_seed_networks.empty()) {
Expand Down Expand Up @@ -2696,8 +2700,7 @@ void CConnman::ThreadOpenConnections(const std::vector<std::string> connect, Spa
int nTries = 0;
const auto reachable_nets{g_reachable_nets.All()};

while (!interruptNet)
{
while (!m_interrupt_net->interrupted()) {
if (anchor && !m_anchors.empty()) {
const CAddress addr = m_anchors.back();
m_anchors.pop_back();
Expand Down Expand Up @@ -2799,7 +2802,7 @@ void CConnman::ThreadOpenConnections(const std::vector<std::string> connect, Spa
if (addrConnect.IsValid()) {
if (fFeeler) {
// Add small amount of random noise before connection to avoid synchronization.
if (!interruptNet.sleep_for(rng.rand_uniform_duration<CThreadInterrupt::Clock>(FEELER_SLEEP_WINDOW))) {
if (!m_interrupt_net->sleep_for(rng.rand_uniform_duration<CThreadInterrupt::Clock>(FEELER_SLEEP_WINDOW))) {
return;
}
LogDebug(BCLog::NET, "Making feeler connection to %s\n", addrConnect.ToStringAddrPort());
Expand Down Expand Up @@ -2910,14 +2913,15 @@ void CConnman::ThreadOpenAddedConnections()
tried = true;
CAddress addr(CService(), NODE_NONE);
OpenNetworkConnection(addr, false, std::move(grant), info.m_params.m_added_node.c_str(), ConnectionType::MANUAL, info.m_params.m_use_v2transport);
if (!interruptNet.sleep_for(std::chrono::milliseconds(500))) return;
if (!m_interrupt_net->sleep_for(500ms)) return;
grant = CSemaphoreGrant(*semAddnode, /*fTry=*/true);
}
// See if any reconnections are desired.
PerformReconnections();
// Retry every 60 seconds if a connection was attempted, otherwise two seconds
if (!interruptNet.sleep_for(std::chrono::seconds(tried ? 60 : 2)))
if (!m_interrupt_net->sleep_for(tried ? 60s : 2s)) {
return;
}
}
}

Expand All @@ -2930,7 +2934,7 @@ void CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai
//
// Initiate outbound network connection
//
if (interruptNet) {
if (m_interrupt_net->interrupted()) {
return;
}
if (!fNetworkActive) {
Expand Down Expand Up @@ -3011,13 +3015,13 @@ void CConnman::ThreadI2PAcceptIncoming()
i2p::Connection conn;

auto SleepOnFailure = [&]() {
interruptNet.sleep_for(err_wait);
m_interrupt_net->sleep_for(err_wait);
if (err_wait < err_wait_cap) {
err_wait += 1s;
}
};

while (!interruptNet) {
while (!m_interrupt_net->interrupted()) {

if (!m_i2p_sam_session->Listen(conn)) {
if (advertising_listen_addr && conn.me.IsValid()) {
Expand Down Expand Up @@ -3140,12 +3144,18 @@ void CConnman::SetNetworkActive(bool active)
}
}

CConnman::CConnman(uint64_t nSeed0In, uint64_t nSeed1In, AddrMan& addrman_in,
const NetGroupManager& netgroupman, const CChainParams& params, bool network_active)
CConnman::CConnman(uint64_t nSeed0In,
uint64_t nSeed1In,
AddrMan& addrman_in,
const NetGroupManager& netgroupman,
const CChainParams& params,
bool network_active,
std::shared_ptr<CThreadInterrupt> interrupt_net)
: addrman(addrman_in)
, m_netgroupman{netgroupman}
, nSeed0(nSeed0In)
, nSeed1(nSeed1In)
, m_interrupt_net{interrupt_net}
, m_params(params)
{
SetTryNewOutboundPeer(false);
Expand Down Expand Up @@ -3241,7 +3251,7 @@ bool CConnman::Start(CScheduler& scheduler, const Options& connOptions)
Proxy i2p_sam;
if (GetProxy(NET_I2P, i2p_sam) && connOptions.m_i2p_accept_incoming) {
m_i2p_sam_session = std::make_unique<i2p::sam::Session>(gArgs.GetDataDirNet() / "i2p_private_key",
i2p_sam, &interruptNet);
i2p_sam, m_interrupt_net);
}

// Randomize the order in which we may query seednode to potentially prevent connecting to the same one every restart (and signal that we have restarted)
Expand Down Expand Up @@ -3278,7 +3288,7 @@ bool CConnman::Start(CScheduler& scheduler, const Options& connOptions)
// Start threads
//
assert(m_msgproc);
interruptNet.reset();
m_interrupt_net->reset();
flagInterruptMsgProc = false;

{
Expand Down Expand Up @@ -3354,7 +3364,7 @@ void CConnman::Interrupt()
}
condMsgProc.notify_all();

interruptNet();
(*m_interrupt_net)();
g_socks5_interrupt();

if (semOutbound) {
Expand Down
15 changes: 9 additions & 6 deletions src/net.h
Original file line number Diff line number Diff line change
Expand Up @@ -1098,8 +1098,13 @@ class CConnman
whitelist_relay = connOptions.whitelist_relay;
}

CConnman(uint64_t seed0, uint64_t seed1, AddrMan& addrman, const NetGroupManager& netgroupman,
const CChainParams& params, bool network_active = true);
CConnman(uint64_t seed0,
uint64_t seed1,
AddrMan& addrman,
const NetGroupManager& netgroupman,
const CChainParams& params,
bool network_active = true,
std::shared_ptr<CThreadInterrupt> interrupt_net = std::make_shared<CThreadInterrupt>());

~CConnman();

Expand Down Expand Up @@ -1523,11 +1528,9 @@ class CConnman

/**
* This is signaled when network activity should cease.
* A pointer to it is saved in `m_i2p_sam_session`, so make sure that
* the lifetime of `interruptNet` is not shorter than
* the lifetime of `m_i2p_sam_session`.
* A copy of this is saved in `m_i2p_sam_session`.
*/
CThreadInterrupt interruptNet;
const std::shared_ptr<CThreadInterrupt> m_interrupt_net;

/**
* I2P SAM session.
Expand Down
4 changes: 3 additions & 1 deletion src/test/fuzz/connman.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <test/fuzz/fuzz.h>
#include <test/fuzz/util.h>
#include <test/fuzz/util/net.h>
#include <test/fuzz/util/threadinterrupt.h>
#include <test/util/setup_common.h>
#include <util/translation.h>

Expand Down Expand Up @@ -69,7 +70,8 @@ FUZZ_TARGET(connman, .init = initialize_connman)
addr_man,
netgroupman,
Params(),
fuzzed_data_provider.ConsumeBool()};
fuzzed_data_provider.ConsumeBool(),
ConsumeThreadInterrupt(fuzzed_data_provider)};

const uint64_t max_outbound_limit{fuzzed_data_provider.ConsumeIntegral<uint64_t>()};
CConnman::Options options;
Expand Down
Loading

0 comments on commit 687a9af

Please sign in to comment.