Skip to content

Commit

Permalink
2014 network limit 1.2 +utils +toc -doc -drmonero
Browse files Browse the repository at this point in the history
new update of the pr with network limits

more debug options:
discarding downloaded blocks all or after given height.
trying to trigger the locking errors.

debug levels polished/tuned to sane values.
debug/logging improved.

warning: this pr should be correct code, but it could make
an existing (in master version) locking error appear more often.

it's a race on the list (map) of peers, e.g. between closing/deleting
them versus working on them in net-limit sleep in sending chunk.

the bug is not in this code/this pr, but in the master version.

the locking problem of master will be fixed in other pr.

problem is ub, and in practice is seems to usually cause program abort
(tested on debian stable with updated gcc). see --help for option
to add sleep to trigger the error faster.
  • Loading branch information
rfree2monero committed Feb 20, 2015
1 parent 0f06dca commit ae2a506
Show file tree
Hide file tree
Showing 23 changed files with 257 additions and 184 deletions.
21 changes: 10 additions & 11 deletions contrib/epee/include/net/abstract_tcp_server2.inl
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,9 @@ PRAGMA_WARNING_DISABLE_VS(4355)
bool connection<t_protocol_handler>::add_ref()
{
TRY_ENTRY();
//_info("[sock " << socket_.native_handle() << "] add_ref");
//_dbg3("[sock " << socket_.native_handle() << "] add_ref, m_peer_number=" << mI->m_peer_number);
CRITICAL_REGION_LOCAL(m_self_refs_lock);
//_dbg3("[sock " << socket_.native_handle() << "] add_ref 2, m_peer_number=" << mI->m_peer_number);

// Use safe_shared_from_this, because of this is public method and it can be called on the object being deleted
auto self = safe_shared_from_this();
Expand Down Expand Up @@ -364,8 +365,7 @@ PRAGMA_WARNING_DISABLE_VS(4355)
{ // LOCK: chunking
epee::critical_region_t<decltype(m_chunking_lock)> send_guard(m_chunking_lock); // *** critical ***

_mark_c("net/out/size", "do_send() will SPLIT into small chunks, from packet="<<cb<<" B for ptr="<<ptr);
_mark("do_send() will SPLIT into small chunks, from packet="<<cb<<" B for ptr="<<ptr);
_dbg3_c("net/out/size", "do_send() will SPLIT into small chunks, from packet="<<cb<<" B for ptr="<<ptr);
t_safe all = cb; // all bytes to send
t_safe pos = 0; // current sending position
// 01234567890
Expand Down Expand Up @@ -393,14 +393,13 @@ PRAGMA_WARNING_DISABLE_VS(4355)
ASRT(chunk_start >= ptr); // not wrapped around address?
//std::memcpy( (void*)buf, chunk_start, len);

_dbg1_c("net/out/size", "part of " << lenall << ": pos="<<pos << " len="<<len);
_dbg3_c("net/out/size", "part of " << lenall << ": pos="<<pos << " len="<<len);

bool ok = do_send_chunk(chunk_start, len); // <====== ***

all_ok = all_ok && ok;
if (!all_ok) {
_mark_c("net/out/size", "do_send() DONE ***FAILED*** from packet="<<cb<<" B for ptr="<<ptr);
_mark ( "do_send() DONE ***FAILED*** from packet="<<cb<<" B for ptr="<<ptr);
_dbg1_c("net/out/size", "do_send() DONE ***FAILED*** from packet="<<cb<<" B for ptr="<<ptr);
_dbg1("do_send() SEND was aborted in middle of big package - this is mostly harmless "
<< " (e.g. peer closed connection) but if it causes trouble tell us at #monero-dev. " << cb);
return false; // partial failure in sending
Expand All @@ -410,8 +409,8 @@ PRAGMA_WARNING_DISABLE_VS(4355)
// (in catch block, or uniq pointer) delete buf;
} // each chunk

_mark_c("net/out/size", "do_send() DONE SPLIT from packet="<<cb<<" B for ptr="<<ptr);
_mark ( "do_send() DONE SPLIT from packet="<<cb<<" B for ptr="<<ptr);
_dbg3_c("net/out/size", "do_send() DONE SPLIT from packet="<<cb<<" B for ptr="<<ptr);
_dbg3 ( "do_send() DONE SPLIT from packet="<<cb<<" B for ptr="<<ptr);

_info_c("net/sleepRPC", "do_send() m_connection_type = " << m_connection_type);

Expand Down Expand Up @@ -469,7 +468,7 @@ PRAGMA_WARNING_DISABLE_VS(4355)
if (retry > retry_limit) {
send_guard.unlock();
_erro("send que size is more than ABSTRACT_SERVER_SEND_QUE_MAX_COUNT(" << ABSTRACT_SERVER_SEND_QUE_MAX_COUNT << "), shutting down connection");
// _mark_c("net/sleep", "send que size is more than ABSTRACT_SERVER_SEND_QUE_MAX_COUNT(" << ABSTRACT_SERVER_SEND_QUE_MAX_COUNT << "), shutting down connection");
// _dbg1_c("net/sleep", "send que size is more than ABSTRACT_SERVER_SEND_QUE_MAX_COUNT(" << ABSTRACT_SERVER_SEND_QUE_MAX_COUNT << "), shutting down connection");
close();
return false;
}
Expand All @@ -496,7 +495,7 @@ PRAGMA_WARNING_DISABLE_VS(4355)
}

auto size_now = m_send_que.front().size();
_mark_c("net/out/size", "do_send() NOW SENSD: packet="<<size_now<<" B");
_dbg1_c("net/out/size", "do_send() NOW SENSD: packet="<<size_now<<" B");
do_send_handler_write( ptr , size_now ); // (((H)))

ASRT( size_now == m_send_que.front().size() );
Expand Down Expand Up @@ -580,7 +579,7 @@ PRAGMA_WARNING_DISABLE_VS(4355)
{
//have more data to send
auto size_now = m_send_que.front().size();
_mark_c("net/out/size", "handle_write() NOW SENDS: packet="<<size_now<<" B" <<", from queue size="<<m_send_que.size());
_dbg1_c("net/out/size", "handle_write() NOW SENDS: packet="<<size_now<<" B" <<", from queue size="<<m_send_que.size());
do_send_handler_write_from_queue(e, m_send_que.front().size() , m_send_que.size()); // (((H)))
ASRT( size_now == m_send_que.front().size() );
boost::asio::async_write(socket_, boost::asio::buffer(m_send_que.front().data(), size_now) ,
Expand Down
12 changes: 8 additions & 4 deletions contrib/epee/include/syncobj.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,14 @@
#include <boost/thread/locks.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/recursive_mutex.hpp>
#include <thread>
#include <chrono>

namespace epee
{

extern unsigned int g_test_dbg_lock_sleep;

struct simple_event
{
simple_event() : m_rised(false)
Expand Down Expand Up @@ -215,10 +219,10 @@ namespace epee
#define SHARED_CRITICAL_REGION_BEGIN(x) { shared_guard critical_region_var(x)
#define EXCLUSIVE_CRITICAL_REGION_BEGIN(x) { exclusive_guard critical_region_var(x)

#define CRITICAL_REGION_LOCAL(x) epee::critical_region_t<decltype(x)> critical_region_var(x)
#define CRITICAL_REGION_BEGIN(x) { epee::critical_region_t<decltype(x)> critical_region_var(x)
#define CRITICAL_REGION_LOCAL1(x) epee::critical_region_t<decltype(x)> critical_region_var1(x)
#define CRITICAL_REGION_BEGIN1(x) { epee::critical_region_t<decltype(x)> critical_region_var1(x)
#define CRITICAL_REGION_LOCAL(x) {std::this_thread::sleep_for(std::chrono::milliseconds(epee::g_test_dbg_lock_sleep));} epee::critical_region_t<decltype(x)> critical_region_var(x)
#define CRITICAL_REGION_BEGIN(x) { std::this_thread::sleep_for(std::chrono::milliseconds(epee::g_test_dbg_lock_sleep)); epee::critical_region_t<decltype(x)> critical_region_var(x)
#define CRITICAL_REGION_LOCAL1(x) {std::this_thread::sleep_for(std::chrono::milliseconds(epee::g_test_dbg_lock_sleep));} epee::critical_region_t<decltype(x)> critical_region_var1(x)
#define CRITICAL_REGION_BEGIN1(x) { std::this_thread::sleep_for(std::chrono::milliseconds(epee::g_test_dbg_lock_sleep)); epee::critical_region_t<decltype(x)> critical_region_var1(x)

#define CRITICAL_REGION_END() }

Expand Down
1 change: 1 addition & 0 deletions contrib/otshell_utils/lib_common1.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <functional>
#include <memory>
#include <thread>
#include <atomic>
#include <mutex>


Expand Down
17 changes: 16 additions & 1 deletion contrib/otshell_utils/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,22 @@ std::string get_current_time() {

cNullstream g_nullstream; // extern a stream that does nothing (eats/discards data)

std::mutex gLoggerGuard; // extern
std::recursive_mutex gLoggerGuard; // extern
std::atomic<int> gLoggerGuardDepth; // extern

std::atomic<int> & gLoggerGuardDepth_Get() {
// TODO std::once would be nicer here

static bool once=0;

if (!once) { // initialize it once
once=1;
gLoggerGuardDepth=0;
}

return gLoggerGuardDepth; // global, atomic counter
}


// ====================================================================

Expand Down
57 changes: 47 additions & 10 deletions contrib/otshell_utils/utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,18 +65,55 @@ extern cNullstream g_nullstream; // a stream that does nothing (eats/discards da

// TODO make _dbg_ignore thread-safe everywhere

extern std::mutex gLoggerGuard;


extern std::recursive_mutex gLoggerGuard; // the mutex guarding logging/debugging code e.g. protecting streams, files, etc

std::atomic<int> & gLoggerGuardDepth_Get(); // getter for the global singleton of counter (it guarantees initializing it to 0). This counter shows the current recursion (re-entrant) level of debug macros.

// TODO more debug of the debug system:
// detect lock() error e.g. recursive limit
// detect stream e.g. operator<< error

#define _debug_level(LEVEL,VAR) do { if (_dbg_ignore< LEVEL) { \
auto level=LEVEL; short int part=0; \
try { \
std::lock_guard<std::recursive_mutex> mutex_guard( nOT::nUtils::gLoggerGuard ); \
part=1; \
try { \
++nOT::nUtils::gLoggerGuardDepth_Get(); \
/* int counter = nOT::nUtils::gLoggerGuardDepth_Get(); if (counter!=1) gCurrentLogger.write_stream(100,"")<<"DEBUG-ERROR: recursion, counter="<<counter<<gCurrentLogger.endline(); */ \
gCurrentLogger.write_stream(LEVEL,"") << nOT::nUtils::get_current_time() << ' ' << OT_CODE_STAMP << ' ' << VAR << gCurrentLogger.endline() << std::flush; \
part=9; \
} catch(...) { \
gCurrentLogger.write_stream(std::max(level,90),"") << nOT::nUtils::get_current_time() << ' ' << OT_CODE_STAMP << ' ' << "(ERROR IN DEBUG)" << gCurrentLogger.endline(); \
--nOT::nUtils::gLoggerGuardDepth_Get(); throw ; \
} \
--nOT::nUtils::gLoggerGuardDepth_Get(); \
} catch(...) { if (part<8) gCurrentLogger.write_stream(100,"")<<"DEBUG-ERROR: problem in debug mechanism e.g. in locking." <<gCurrentLogger.endline(); throw ; } \
} } while(0)

// info for code below: oss object is normal stack variable, using it does not need lock protection
#define _debug_level_c(CHANNEL,LEVEL,VAR) do { if (_dbg_ignore< LEVEL) { \
nOT::nUtils::gLoggerGuard.try_lock(); \
gCurrentLogger.write_stream(LEVEL,CHANNEL) << nOT::nUtils::get_current_time() << ' ' << OT_CODE_STAMP << ' ' << VAR << gCurrentLogger.endline() << std::flush; \
nOT::nUtils::gLoggerGuard.unlock(); \
auto level=LEVEL; short int part=0; \
try { \
std::lock_guard<std::recursive_mutex> mutex_guard( nOT::nUtils::gLoggerGuard ); \
part=1; \
try { \
++nOT::nUtils::gLoggerGuardDepth_Get(); \
std::ostringstream oss; \
oss << nOT::nUtils::get_current_time() << ' ' << OT_CODE_STAMP << ' ' << VAR << gCurrentLogger.endline() << std::flush; \
std::string as_string = oss.str(); \
/* int counter = nOT::nUtils::gLoggerGuardDepth_Get(); if (counter!=1) gCurrentLogger.write_stream(100,"")<<"DEBUG-ERROR: recursion, counter="<<counter<<gCurrentLogger.endline(); */ \
gCurrentLogger.write_stream(LEVEL,"" ) << as_string << gCurrentLogger.endline() << std::flush; \
gCurrentLogger.write_stream(LEVEL,CHANNEL) << as_string << gCurrentLogger.endline() << std::flush; \
part=9; \
} catch(...) { \
gCurrentLogger.write_stream(std::max(level,90),CHANNEL) << nOT::nUtils::get_current_time() << ' ' << OT_CODE_STAMP << ' ' << "(ERROR IN DEBUG)" << gCurrentLogger.endline(); \
--nOT::nUtils::gLoggerGuardDepth_Get(); throw ; \
} \
--nOT::nUtils::gLoggerGuardDepth_Get(); \
} catch(...) { if (part<8) gCurrentLogger.write_stream(100,CHANNEL)<<"DEBUG-ERROR: problem in debug mechanism e.g. in locking." <<gCurrentLogger.endline(); throw ; } \
} } while(0)

#define _debug_level(LEVEL,VAR) _debug_level_c("",LEVEL,VAR)

#define _dbg3(VAR) _debug_level( 20,VAR)
#define _dbg2(VAR) _debug_level( 30,VAR)
#define _dbg1(VAR) _debug_level( 40,VAR) // details
Expand All @@ -98,10 +135,10 @@ extern std::mutex gLoggerGuard;
#define _warn_c(C,VAR) _debug_level_c(C, 90,VAR) // some problem
#define _erro_c(C,VAR) _debug_level_c(C,100,VAR) // error - report

// lock // because od VAR
// lock // because of VAR
#define _scope_debug_level_c(CHANNEL,LEVEL,VAR) \
std::ostringstream debug_detail_oss; \
nOT::nUtils::gLoggerGuard.try_lock(); \
nOT::nUtils::gLoggerGuard.lock(); \
debug_detail_oss << OT_CODE_STAMP << ' ' << VAR ; \
nOT::nUtils::nDetail::cDebugScopeGuard debugScopeGuard; \
if (_dbg_ignore<LEVEL) debugScopeGuard.Assign(CHANNEL,LEVEL, debug_detail_oss.str()); \
Expand Down
2 changes: 2 additions & 0 deletions src/connectivity_tool/conn_tool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ namespace po = boost::program_options;
using namespace cryptonote;
using namespace nodetool;

unsigned int epee::g_test_dbg_lock_sleep = 0;

namespace
{
const command_line::arg_descriptor<std::string, true> arg_ip = {"ip", "set ip"};
Expand Down
19 changes: 2 additions & 17 deletions src/cryptonote_core/blockchain_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
#include "cryptonote_core/checkpoints_create.h"
//#include "serialization/json_archive.h"
#include "../../contrib/otshell_utils/utils.hpp"
#include "../../src/p2p/data_logger.hpp"

using namespace cryptonote;

Expand Down Expand Up @@ -178,22 +179,6 @@ bool blockchain_storage::store_genesis_block(bool testnet) {
return true;
}
//------------------------------------------------------------------
void blockchain_storage::logger_handle(long int ms)
{
std::ofstream log_file;
log_file.open("log/dr-monero/blockchain_log.data", std::ofstream::out | std::ofstream::app);
log_file.precision(7);

using namespace boost::chrono;
auto point = steady_clock::now();
auto time_from_epoh = point.time_since_epoch();
auto m_ms = duration_cast< milliseconds >( time_from_epoh ).count();
double ms_f = m_ms;
ms_f /= 1000.;

log_file << ms_f << " " << ms << std::endl;
}
//------------------------------------------------------------------
bool blockchain_storage::store_blockchain()
{
m_is_blockchain_storing = true;
Expand Down Expand Up @@ -1776,7 +1761,7 @@ bool blockchain_storage::handle_block_to_main_chain(const block& bl, const crypt
<< "), coinbase_blob_size: " << coinbase_blob_size << ", cumulative size: " << cumulative_block_size
<< ", " << block_processing_time << "("<< target_calculating_time << "/" << longhash_calculating_time << ")ms");

logger_handle(block_processing_time);
epee::net_utils::data_logger::get_instance().add_data("blockchain_processing_time", block_processing_time);

bvc.m_added_to_main_chain = true;
/*if(!m_orphanes_reorganize_in_work)
Expand Down
1 change: 0 additions & 1 deletion src/cryptonote_core/blockchain_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,6 @@ namespace cryptonote
bool complete_timestamps_vector(uint64_t start_height, std::vector<uint64_t>& timestamps);
bool update_next_comulative_size_limit();
bool store_genesis_block(bool testnet);
void logger_handle(long int ms);
};


Expand Down
24 changes: 20 additions & 4 deletions src/cryptonote_core/cryptonote_core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,14 +206,30 @@ namespace cryptonote
return m_fast_exit;
}
//-----------------------------------------------------------------------------------------------
void core::no_check_blocks()
void core::test_drop_download()
{
m_check_blocks = false;
m_test_drop_download = false;
}
//-----------------------------------------------------------------------------------------------
bool core::get_check_blocks()
void core::test_drop_download_height(uint64_t height)
{
return m_check_blocks;
m_test_drop_download_height = height;
}
//-----------------------------------------------------------------------------------------------
bool core::get_test_drop_download()
{
return m_test_drop_download;
}
//-----------------------------------------------------------------------------------------------
bool core::get_test_drop_download_height()
{
if (m_test_drop_download_height == 0)
return true;

if (get_blockchain_storage().get_current_blockchain_height() <= m_test_drop_download_height)
return true;

return false;
}
//-----------------------------------------------------------------------------------------------
bool core::handle_incoming_tx(const blobdata& tx_blob, tx_verification_context& tvc, bool keeped_by_block)
Expand Down
10 changes: 6 additions & 4 deletions src/cryptonote_core/cryptonote_core.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,10 @@ namespace cryptonote
bool deinit();
static void set_fast_exit();
static bool get_fast_exit();
void no_check_blocks();
bool get_check_blocks();
void test_drop_download();
void test_drop_download_height(uint64_t height);
bool get_test_drop_download();
bool get_test_drop_download_height();
uint64_t get_current_blockchain_height();
bool get_blockchain_top(uint64_t& heeight, crypto::hash& top_id);
bool get_blocks(uint64_t start_offset, size_t count, std::list<block>& blocks, std::list<transaction>& txs);
Expand Down Expand Up @@ -151,8 +153,8 @@ namespace cryptonote
bool check_tx_inputs_keyimages_diff(const transaction& tx);
void graceful_exit();
static std::atomic<bool> m_fast_exit;
bool m_check_blocks = true;

bool m_test_drop_download = true;
uint64_t m_test_drop_download_height = 0;

tx_memory_pool m_mempool;
blockchain_storage m_blockchain_storage;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ void cryptonote_protocol_handler_base::handler_request_blocks_history(std::list<
// TODO
}

void cryptonote_protocol_handler_base::handler_response_blocks_now(size_t packet_size) { _scope_mark("");
void cryptonote_protocol_handler_base::handler_response_blocks_now(size_t packet_size) { _scope_dbg1("");
using namespace epee::net_utils;
double delay=0; // will be calculated
_dbg1("Packet size: " << packet_size);
Expand Down
3 changes: 3 additions & 0 deletions src/cryptonote_protocol/cryptonote_protocol_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,14 @@
#include "cryptonote_core/connection_context.h"
#include "cryptonote_core/cryptonote_stat_info.h"
#include "cryptonote_core/verification_context.h"
// #include <netinet/in.h>
#include <boost/circular_buffer.hpp>

PUSH_WARNINGS
DISABLE_VS_WARNINGS(4355)

#define LOCALHOST_INT 2130706433

namespace cryptonote
{

Expand Down
Loading

0 comments on commit ae2a506

Please sign in to comment.