Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1515 limit transaction queue in bytes #1533

Merged
merged 3 commits into from
Jun 1, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion libethashseal/EthashClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ class EthashClient : public Client {
std::shared_ptr< InstanceMonitor > _instanceMonitor,
boost::filesystem::path const& _dbPath = {},
WithExisting _forceAction = WithExisting::Trust,
TransactionQueue::Limits const& _l = TransactionQueue::Limits{ 1024, 1024 } );
TransactionQueue::Limits const& _l = TransactionQueue::Limits{
1024, 1024, 12322916, 24645833 } );
~EthashClient();

Ethash* ethash() const;
Expand Down
3 changes: 2 additions & 1 deletion libethereum/Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ class Client : public ClientBase, protected Worker {
std::shared_ptr< InstanceMonitor > _instanceMonitor,
boost::filesystem::path const& _dbPath = boost::filesystem::path(),
WithExisting _forceAction = WithExisting::Trust,
TransactionQueue::Limits const& _l = TransactionQueue::Limits{ 1024, 1024 } );
TransactionQueue::Limits const& _l = TransactionQueue::Limits{
1024, 1024, 12322916, 24645833 } );
/// Destructor.
virtual ~Client();

Expand Down
3 changes: 2 additions & 1 deletion libethereum/ClientTest.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ class ClientTest : public Client {
std::shared_ptr< InstanceMonitor > _instanceMonitor,
boost::filesystem::path const& _dbPath = boost::filesystem::path(),
WithExisting _forceAction = WithExisting::Trust,
TransactionQueue::Limits const& _l = TransactionQueue::Limits{ 1024, 1024 } );
TransactionQueue::Limits const& _l = TransactionQueue::Limits{
1024, 1024, 12322916, 24645833 } );
~ClientTest();

bool mineBlocks( unsigned _count ) noexcept;
Expand Down
17 changes: 14 additions & 3 deletions libethereum/TransactionQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,14 @@ constexpr size_t c_maxVerificationQueueSize = 8192;
constexpr size_t c_maxDroppedTransactionCount = 1024;
} // namespace

TransactionQueue::TransactionQueue( unsigned _limit, unsigned _futureLimit )
TransactionQueue::TransactionQueue( unsigned _limit, unsigned _futureLimit,
unsigned _currentLimitBytes, unsigned _futureLimitBytes )
: m_dropped{ c_maxDroppedTransactionCount },
m_current( PriorityCompare{ *this } ),
m_limit( _limit ),
m_futureLimit( _futureLimit ),
m_currentSizeBytesLimit( _currentLimitBytes ),
m_futureSizeBytesLimit( _futureLimitBytes ),
m_aborting( false ) {
m_readyCondNotifier = this->onReady( [this]() {
this->m_cond.notify_all();
Expand Down Expand Up @@ -131,6 +134,7 @@ ImportResult TransactionQueue::import(
// if( t == fs->second.begin() ){
UpgradeGuard ul( l );
--m_futureSize;
m_futureSizeBytes -= t->second.transaction.rlp().size();
auto erasedHash = t->second.transaction.sha3();
LOG( m_loggerDetail ) << "Re-inserting future transaction " << erasedHash;
m_known.erase( erasedHash );
Expand Down Expand Up @@ -242,7 +246,7 @@ ImportResult TransactionQueue::manageImport_WITH_LOCK(
insertCurrent_WITH_LOCK( make_pair( _h, _transaction ) );
LOG( m_loggerDetail ) << "Queued vaguely legit-looking transaction " << _h;

while ( m_current.size() > m_limit ) {
while ( m_current.size() > m_limit || m_currentSizeBytes > m_currentSizeBytesLimit ) {
LOG( m_loggerDetail ) << "Dropping out of bounds transaction " << _h;
remove_WITH_LOCK( m_current.rbegin()->transaction.sha3() );
}
Expand Down Expand Up @@ -304,6 +308,7 @@ void TransactionQueue::insertCurrent_WITH_LOCK( std::pair< h256, Transaction > c
inserted.first->second = handle;
m_currentByHash[_p.first] = handle;
#pragma GCC diagnostic pop
m_currentSizeBytes += t.rlp().size();

// Move following transactions from future to current
makeCurrent_WITH_LOCK( t );
Expand All @@ -321,6 +326,7 @@ bool TransactionQueue::remove_WITH_LOCK( h256 const& _txHash ) {
auto it = m_currentByAddressAndNonce.find( from );
assert( it != m_currentByAddressAndNonce.end() );
it->second.erase( ( *t->second ).transaction.nonce() );
m_currentSizeBytes -= ( *t->second ).transaction.rlp().size();
m_current.erase( t->second );
m_currentByHash.erase( t );
if ( it->second.empty() )
Expand Down Expand Up @@ -357,6 +363,8 @@ void TransactionQueue::setFuture_WITH_LOCK( h256 const& _txHash ) {
*( m->second ) ); // set has only const iterators. Since we are moving out of container
// that's fine
m_currentByHash.erase( t.transaction.sha3() );
m_currentSizeBytes -= t.transaction.rlp().size();
m_futureSizeBytes += t.transaction.rlp().size();
target.emplace( t.transaction.nonce(), move( t ) );
m_current.erase( m->second );
++m_futureSize;
Expand All @@ -365,10 +373,11 @@ void TransactionQueue::setFuture_WITH_LOCK( h256 const& _txHash ) {
if ( queue.empty() )
m_currentByAddressAndNonce.erase( from );

while ( m_futureSize > m_futureLimit ) {
while ( m_futureSize > m_futureLimit || m_futureSizeBytes > m_futureSizeBytesLimit ) {
// TODO: priority queue for future transactions
// For now just drop random chain end
--m_futureSize;
m_futureSizeBytes -= m_future.begin()->second.rbegin()->second.transaction.rlp().size();
auto erasedHash = m_future.begin()->second.rbegin()->second.transaction.sha3();
LOG( m_loggerDetail ) << "Dropping out of bounds future transaction " << erasedHash;
m_known.erase( erasedHash );
Expand Down Expand Up @@ -402,6 +411,8 @@ void TransactionQueue::makeCurrent_WITH_LOCK( Transaction const& _t ) {
inserted.first->second = handle;
m_currentByHash[( *handle ).transaction.sha3()] = handle;
#pragma GCC diagnostic pop
m_futureSizeBytes -= ( *handle ).transaction.rlp().size();
m_currentSizeBytes += ( *handle ).transaction.rlp().size();
--m_futureSize;
++ft;
++nonce;
Expand Down
26 changes: 20 additions & 6 deletions libethereum/TransactionQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,22 @@ namespace eth {
class TransactionQueue {
public:
struct Limits {
size_t current;
size_t future;
size_t currentLimit;
size_t futureLimit;
size_t currentLimitBytes = 12322916;
size_t futureLimitBytes = 24645833;
};

/// @brief TransactionQueue
/// @param _limit Maximum number of pending transactions in the queue.
/// @param _futureLimit Maximum number of future nonce transactions.
TransactionQueue( unsigned _limit = 1024, unsigned _futureLimit = 1024 );
TransactionQueue( Limits const& _l ) : TransactionQueue( _l.current, _l.future ) {}
/// @param _currentLimitBytes Maximum size of pending transactions in the queue in bytes.
/// @param _futureLimitBytes Maximum size of future nonce transactions in bytes.
TransactionQueue( unsigned _limit = 1024, unsigned _futureLimit = 1024,
unsigned _currentLimitBytes = 12322916, unsigned _futureLimitBytes = 24645833 );
TransactionQueue( Limits const& _l )
: TransactionQueue(
_l.currentLimit, _l.futureLimit, _l.currentLimitBytes, _l.futureLimitBytes ) {}
~TransactionQueue();
void HandleDestruction();
/// Add transaction to the queue to be verified and imported.
Expand Down Expand Up @@ -151,12 +158,14 @@ class TransactionQueue {
ReadGuard l( m_lock );
ret.dropped = m_dropped.size();
ret.current = m_currentByHash.size();
ret.future = m_future.size();
ret.future = m_futureSize;
return ret;
}

/// @returns the transaction limits on current/future.
Limits limits() const { return Limits{ m_limit, m_futureLimit }; }
Limits limits() const {
return Limits{ m_limit, m_futureLimit, m_currentSizeBytes, m_futureSizeBytes };
}

/// @returns the number of tx in future queue.
size_t futureSize() const { return m_futureSize; }
Expand Down Expand Up @@ -323,6 +332,11 @@ class TransactionQueue {
unsigned m_futureLimit; ///< Max number of future transactions
unsigned m_futureSize = 0; ///< Current number of future transactions

unsigned m_currentSizeBytesLimit = 0; // max pending queue size in bytes
unsigned m_currentSizeBytes = 0; // current pending queue size in bytes
unsigned m_futureSizeBytesLimit = 0; // max future queue size in bytes
unsigned m_futureSizeBytes = 0; // current future queue size in bytes

std::condition_variable m_queueReady; ///< Signaled when m_unverified has a new entry.
std::vector< std::thread > m_verifiers;
std::deque< UnverifiedTransaction > m_unverified; ///< Pending verification queue
Expand Down
26 changes: 22 additions & 4 deletions skaled/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1583,6 +1583,8 @@ int main( int argc, char** argv ) try {

unsigned c_transactionQueueSize = 100000;
unsigned c_futureTransactionQueueSize = 16000;
unsigned c_transactionQueueSizeBytes = 12322916;
unsigned c_futureTransactionQueueSizeBytes = 24645833;

if ( chainConfigParsed ) {
try {
Expand Down Expand Up @@ -1628,6 +1630,22 @@ int main( int argc, char** argv ) try {
} catch ( ... ) {
}

try {
if ( joConfig["skaleConfig"]["nodeInfo"].count( "transactionQueueSizeBytes" ) )
c_transactionQueueSizeBytes =
joConfig["skaleConfig"]["nodeInfo"]["transactionQueueSizeBytes"]
.get< unsigned >();
} catch ( ... ) {
}

try {
if ( joConfig["skaleConfig"]["nodeInfo"].count( "futureTransactionQueueBytes" ) )
c_futureTransactionQueueSizeBytes =
joConfig["skaleConfig"]["nodeInfo"]["futureTransactionQueueBytes"]
.get< unsigned >();
} catch ( ... ) {
}

try {
if ( joConfig["skaleConfig"]["nodeInfo"].count( "maxOpenLeveldbFiles" ) )
dev::db::c_maxOpenLeveldbFiles =
Expand Down Expand Up @@ -1912,14 +1930,14 @@ int main( int argc, char** argv ) try {
g_client.reset( new eth::EthashClient( chainParams, ( int ) chainParams.networkID,
shared_ptr< GasPricer >(), snapshotManager, instanceMonitor, getDataDir(),
withExisting,
TransactionQueue::Limits{
c_transactionQueueSize, c_futureTransactionQueueSize } ) );
TransactionQueue::Limits{ c_transactionQueueSize, c_futureTransactionQueueSize,
c_transactionQueueSizeBytes, c_futureTransactionQueueSizeBytes } ) );
} else if ( chainParams.sealEngineName == NoProof::name() ) {
g_client.reset( new eth::Client( chainParams, ( int ) chainParams.networkID,
shared_ptr< GasPricer >(), snapshotManager, instanceMonitor, getDataDir(),
withExisting,
TransactionQueue::Limits{
c_transactionQueueSize, c_futureTransactionQueueSize } ) );
TransactionQueue::Limits{ c_transactionQueueSize, c_futureTransactionQueueSize,
c_transactionQueueSizeBytes, c_futureTransactionQueueSizeBytes } ) );
} else
BOOST_THROW_EXCEPTION( ChainParamsInvalid() << errinfo_comment(
"Unknown seal engine: " + chainParams.sealEngineName ) );
Expand Down
37 changes: 37 additions & 0 deletions test/unittests/libethereum/TransactionQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,43 @@ BOOST_AUTO_TEST_CASE( tqLimit ) {
BOOST_REQUIRE( topTr.size() == 1 ); // 1 imported transaction
}

BOOST_AUTO_TEST_CASE( tqLimitBytes ) {
TransactionQueue tq( 100, 100, 250, 250 );

unsigned maxTxCount = 250 / TestTransaction::defaultTransaction( 1 ).transaction().rlp().size();

TestTransaction testTransaction = TestTransaction::defaultTransaction( 2 );
ImportResult res = tq.import( testTransaction.transaction(), IfDropped::Ignore, true );
BOOST_REQUIRE( res == ImportResult::Success );

testTransaction = TestTransaction::defaultTransaction( 3 );
res = tq.import( testTransaction.transaction(), IfDropped::Ignore, true );
BOOST_REQUIRE( res == ImportResult::Success );

BOOST_REQUIRE( tq.status().current == 0 );

BOOST_REQUIRE( tq.status().future == maxTxCount );

testTransaction = TestTransaction::defaultTransaction( 4 );
res = tq.import( testTransaction.transaction(), IfDropped::Ignore, true );
BOOST_REQUIRE( res == ImportResult::Success );

BOOST_REQUIRE( tq.status().current == 0 );

BOOST_REQUIRE( tq.status().future == maxTxCount );

for ( size_t i = 1; i < 10; i++ ) {
if (i == 2 || i == 3)
continue;
testTransaction = TestTransaction::defaultTransaction( i );
res = tq.import( testTransaction.transaction() );
BOOST_REQUIRE( res == ImportResult::Success );
}

BOOST_REQUIRE( tq.status().current == maxTxCount );
BOOST_REQUIRE( tq.status().future == 0 );
}

BOOST_AUTO_TEST_CASE( tqEqueue ) {
TransactionQueue tq;
TestTransaction testTransaction = TestTransaction::defaultTransaction();
Expand Down