Skip to content

Commit

Permalink
SKALE-5059 update uninstall watch
Browse files Browse the repository at this point in the history
  • Loading branch information
olehnikolaiev committed Apr 5, 2022
1 parent c6db184 commit fd32775
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 26 deletions.
18 changes: 14 additions & 4 deletions libethereum/Client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1320,19 +1320,29 @@ void Client::updateIMABLSPublicKey() {

// new block watch
unsigned Client::installNewBlockWatch(
std::function< void( const unsigned&, const Block& ) >& fn ) {
std::function< void( const unsigned&, const Block& ) >& fn, const std::string& strOrigin ) {
if ( m_filtersByIp[strOrigin] == MAX_FILTERS_PER_IP_COUNT )
throw std::invalid_argument( "Too many filters created from " + strOrigin );
m_filtersByIp[strOrigin] += 1;
return m_new_block_watch.install( fn );
}
bool Client::uninstallNewBlockWatch( const unsigned& k ) {
bool Client::uninstallNewBlockWatch( const unsigned& k, const std::string& strOrigin ) {
m_filtersByIp[strOrigin] -= 1;
return m_new_block_watch.uninstall( k );
}

// new pending transation watch
unsigned Client::installNewPendingTransactionWatch(
std::function< void( const unsigned&, const Transaction& ) >& fn ) {
std::function< void( const unsigned&, const Transaction& ) >& fn,
const std::string& strOrigin ) {
if ( m_filtersByIp[strOrigin] == MAX_FILTERS_PER_IP_COUNT )
throw std::invalid_argument( "Too many filters created from " + strOrigin );
m_filtersByIp[strOrigin] += 1;
return m_new_pending_transaction_watch.install( fn );
}
bool Client::uninstallNewPendingTransactionWatch( const unsigned& k ) {
bool Client::uninstallNewPendingTransactionWatch(
const unsigned& k, const std::string& strOrigin ) {
m_filtersByIp[strOrigin] -= 1;
return m_new_pending_transaction_watch.uninstall( k );
}

Expand Down
10 changes: 6 additions & 4 deletions libethereum/Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -599,13 +599,15 @@ class Client : public ClientBase, protected Worker {
public:
// new block watch
virtual unsigned installNewBlockWatch(
std::function< void( const unsigned&, const Block& ) >& ) override;
virtual bool uninstallNewBlockWatch( const unsigned& ) override;
std::function< void( const unsigned&, const Block& ) >&, const std::string& ) override;
virtual bool uninstallNewBlockWatch( const unsigned&, const std::string& ) override;

// new pending transation watch
virtual unsigned installNewPendingTransactionWatch(
std::function< void( const unsigned&, const Transaction& ) >& ) override;
virtual bool uninstallNewPendingTransactionWatch( const unsigned& ) override;
std::function< void( const unsigned&, const Transaction& ) >&,
const std::string& ) override;
virtual bool uninstallNewPendingTransactionWatch(
const unsigned&, const std::string& ) override;
};

} // namespace eth
Expand Down
1 change: 1 addition & 0 deletions libethereum/ClientBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ bool ClientBase::uninstallWatch( unsigned _i ) {
LOG( m_loggerWatch ) << "*X*" << fit->first << ":" << fit->second.filter;
m_filters.erase( fit );
}
m_filtersByIp[""] -= 1;
return true;
}

Expand Down
14 changes: 8 additions & 6 deletions libethereum/Interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -264,21 +264,23 @@ class Interface {

public:
// new block watch
virtual unsigned installNewBlockWatch(
std::function< void( const unsigned&, const Block& ) >& ) { // not implemented here
virtual unsigned installNewBlockWatch( std::function< void( const unsigned&, const Block& ) >&,
const std::string& ) { // not implemented here
return unsigned( -1 );
}
virtual bool uninstallNewBlockWatch( const unsigned& ) { // not implemented here
virtual bool uninstallNewBlockWatch( const unsigned&, const std::string& ) { // not implemented
// here
return false;
}

// new pending transation watch
virtual unsigned installNewPendingTransactionWatch( // not implemented here
std::function< void( const unsigned&, const Transaction& ) >& ) {
std::function< void( const unsigned&, const Transaction& ) >&, const std::string& ) {
return unsigned( -1 );
}
virtual bool uninstallNewPendingTransactionWatch( const unsigned& ) { // not implemented
// here
virtual bool uninstallNewPendingTransactionWatch(
const unsigned&, const std::string& ) { // not implemented
// here
return false;
}
};
Expand Down
26 changes: 14 additions & 12 deletions libskale/httpserveroverride.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1106,7 +1106,7 @@ void SkaleWsPeer::uninstallAllWatches() {
setInstalledWatchesNewPendingTransactions_.clear();
for ( auto iw : sw ) {
try {
pEthereum->uninstallNewPendingTransactionWatch( iw );
pEthereum->uninstallNewPendingTransactionWatch( iw, m_strUnDdosOrigin );
} catch ( ... ) {
}
}
Expand All @@ -1115,7 +1115,7 @@ void SkaleWsPeer::uninstallAllWatches() {
setInstalledWatchesNewBlocks_.clear();
for ( auto iw : sw ) {
try {
pEthereum->uninstallNewBlockWatch( iw );
pEthereum->uninstallNewBlockWatch( iw, m_strUnDdosOrigin );
} catch ( ... ) {
}
}
Expand Down Expand Up @@ -1467,12 +1467,14 @@ void SkaleWsPeer::eth_subscribe_newPendingTransactions(
( std::string( "RPC/" ) + pThis->getRelay().nfoGetSchemeUC() ).c_str(),
"eth_subscription/newPendingTransactions" );
stats::register_stats_error( "RPC", "eth_subscription/newPendingTransactions" );
pThis->ethereum()->uninstallNewPendingTransactionWatch( iw );
pThis->ethereum()->uninstallNewPendingTransactionWatch(
iw, pThis->m_strUnDdosOrigin );
}
//} );
} );
};
unsigned iw = ethereum()->installNewPendingTransactionWatch( fnOnSunscriptionEvent );
unsigned iw = ethereum()->installNewPendingTransactionWatch(
fnOnSunscriptionEvent, m_strUnDdosOrigin );
setInstalledWatchesNewPendingTransactions_.insert( iw );
iw |= SKALED_WS_SUBSCRIPTION_TYPE_NEW_PENDING_TRANSACTION;
std::string strIW = dev::toJS( iw );
Expand Down Expand Up @@ -1593,12 +1595,12 @@ void SkaleWsPeer::eth_subscribe_newHeads( e_server_mode_t /*esm*/,
( std::string( "RPC/" ) + pThis->getRelay().nfoGetSchemeUC() ).c_str(),
"eth_subscription/newHeads" );
stats::register_stats_error( "RPC", "eth_subscription/newHeads" );
pThis->ethereum()->uninstallNewBlockWatch( iw );
pThis->ethereum()->uninstallNewBlockWatch( iw, pThis->m_strUnDdosOrigin );
}
//} );
} );
};
unsigned iw = ethereum()->installNewBlockWatch( fnOnSunscriptionEvent );
unsigned iw = ethereum()->installNewBlockWatch( fnOnSunscriptionEvent, m_strUnDdosOrigin );
setInstalledWatchesNewBlocks_.insert( iw );
iw |= SKALED_WS_SUBSCRIPTION_TYPE_NEW_BLOCK;
std::string strIW = dev::toJS( iw );
Expand Down Expand Up @@ -1747,7 +1749,7 @@ void SkaleWsPeer::eth_unsubscribe(
joResponse["error"] = joError;
return;
}
ethereum()->uninstallNewPendingTransactionWatch( iw );
ethereum()->uninstallNewPendingTransactionWatch( iw, m_strUnDdosOrigin );
setInstalledWatchesNewPendingTransactions_.erase(
iw & ( ~( SKALED_WS_SUBSCRIPTION_TYPE_MASK ) ) );
} else if ( x == SKALED_WS_SUBSCRIPTION_TYPE_NEW_BLOCK ) {
Expand All @@ -1770,7 +1772,7 @@ void SkaleWsPeer::eth_unsubscribe(
joResponse["error"] = joError;
return;
}
ethereum()->uninstallNewBlockWatch( iw );
ethereum()->uninstallNewBlockWatch( iw, m_strUnDdosOrigin );
setInstalledWatchesNewBlocks_.erase( iw & ( ~( SKALED_WS_SUBSCRIPTION_TYPE_MASK ) ) );
} else if ( x == SKALED_WS_SUBSCRIPTION_TYPE_SKALE_STATS ) {
SkaleStatsSubscriptionManager::subscription_id_t idSubscription =
Expand Down Expand Up @@ -2100,7 +2102,7 @@ SkaleServerOverride::SkaleServerOverride(
statsTransactions_.event_queue_add( "transactions",
0 // stats::g_nSizeDefaultOnQueueAdd
);
iwBlockStats_ = ethereum()->installNewBlockWatch( fnOnSunscriptionEvent );
iwBlockStats_ = ethereum()->installNewBlockWatch( fnOnSunscriptionEvent, "0.0.0.0" );
} // block
{ // block
std::function< void( const unsigned& iw, const dev::eth::Transaction& tx ) >
Expand All @@ -2113,17 +2115,17 @@ SkaleServerOverride::SkaleServerOverride(
0 // stats::g_nSizeDefaultOnQueueAdd
);
iwPendingTransactionStats_ =
ethereum()->installNewPendingTransactionWatch( fnOnSunscriptionEvent );
ethereum()->installNewPendingTransactionWatch( fnOnSunscriptionEvent, "0.0.0.0" );
} // block
}

SkaleServerOverride::~SkaleServerOverride() {
if ( iwBlockStats_ != unsigned( -1 ) ) {
ethereum()->uninstallNewBlockWatch( iwBlockStats_ );
ethereum()->uninstallNewBlockWatch( iwBlockStats_, "0.0.0.0" );
iwBlockStats_ = unsigned( -1 );
}
if ( iwPendingTransactionStats_ != unsigned( -1 ) ) {
ethereum()->uninstallNewPendingTransactionWatch( iwPendingTransactionStats_ );
ethereum()->uninstallNewPendingTransactionWatch( iwPendingTransactionStats_, "0.0.0.0" );
iwPendingTransactionStats_ = unsigned( -1 );
}
StopListening();
Expand Down

0 comments on commit fd32775

Please sign in to comment.