Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Added boost lockfree queue and consum thread #172
Browse files Browse the repository at this point in the history
  • Loading branch information
heifner committed Oct 6, 2017
1 parent 637ae0d commit 63fb9f0
Showing 1 changed file with 46 additions and 12 deletions.
58 changes: 46 additions & 12 deletions plugins/db_plugin/db_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
#include <boost/range/adaptors.hpp>
#include <boost/range/algorithm.hpp>
#include <boost/range/algorithm_ext.hpp>
#include <boost/thread/thread.hpp>
#include <boost/lockfree/spsc_queue.hpp>

namespace fc { class variant; }

Expand All @@ -32,19 +34,15 @@ using namespace boost::multi_index;
class db_plugin_impl {
public:
void applied_irreversible_block(const signed_block&);
void process_irreversible_block(const signed_block&);

chain_plugin* chain_plug;
std::set<AccountName> filter_on;
std::unique_ptr<boost::lockfree::spsc_queue<signed_block>> queue;
boost::thread consum_thread;
boost::atomic<bool> startup{true};
boost::atomic<bool> done{false};

private:
struct block_comp
{
bool operator()(const block_id_type& a, const block_id_type& b) const
{
return chain::block_header::num_from_id(a) > chain::block_header::num_from_id(b);
}
};
typedef std::multimap<block_id_type, transaction_id_type, block_comp> block_transaction_id_map;
void consum_blocks();

bool is_scope_relevant(const eos::types::Vector<AccountName>& scope);
static void add(chainbase::database& db, const vector<types::KeyPermissionWeight>& keys, const AccountName& account_name, const PermissionName& permission);
Expand Down Expand Up @@ -77,7 +75,32 @@ const PermissionName db_plugin_impl::OWNER = "owner";
const PermissionName db_plugin_impl::ACTIVE = "active";
const PermissionName db_plugin_impl::RECOVERY = "recovery";

void db_plugin_impl::applied_irreversible_block(const signed_block& block)
void db_plugin_impl::applied_irreversible_block(const signed_block& block) {
if (startup) {
// on startup we don't want to queue, insteas push back on caller
process_irreversible_block(block);
} else {
if (!queue->push(block)) {
// TODO what to do if full
elog("queue is full!!!!!");
}
}
}

void db_plugin_impl::consum_blocks() {
signed_block block;
while (!done) {
while (queue->pop(block)) {
process_irreversible_block(block);
}
}
while (queue->pop(block)) {
process_irreversible_block(block);
}
ilog("db_plugin consum thread shutdown gracefully");
}

void db_plugin_impl::process_irreversible_block(const signed_block& block)
{
const auto block_id = block.id();
ilog("block ${bid}", ("bid", block_id));
Expand Down Expand Up @@ -193,6 +216,8 @@ void db_plugin::set_program_options(options_description& cli, options_descriptio
cfg.add_options()
("filter_on_accounts,f", bpo::value<vector<string>>()->composing(),
"Track only transactions whose scopes involve the listed accounts. Default is to track all transactions.")
("queue_size,q", bpo::value<uint>()->default_value(1024),
"The block queue size")
;
}

Expand All @@ -215,6 +240,9 @@ void db_plugin::plugin_initialize(const variables_map& options)
auto foa = options.at("filter_on_accounts").as<vector<string>>();
for(auto filter_account : foa)
my->filter_on.emplace(filter_account);
} else if (options.count("queue_size")) {
auto size = options.at("queue_size").as<uint>();
my->queue = std::make_unique<boost::lockfree::spsc_queue<signed_block>>(size);
}
}

Expand All @@ -223,17 +251,23 @@ void db_plugin::plugin_startup()
ilog("starting db plugin");
// TODO: during chain startup we want to pushback on apply so that chainbase has to wait for db.
// TODO: assert that last irreversible in db is one less than received (on startup only?, every so often?)
my->chain_plug = app().find_plugin<chain_plugin>();
// my->chain_plug = app().find_plugin<chain_plugin>();
// auto& db = my->chain_plug->chain().get_mutable_database();
// db.add_index<account_control_history_multi_index>();
// db.add_index<account_transaction_history_multi_index>();
// db.add_index<public_key_history_multi_index>();
// db.add_index<transaction_history_multi_index>();

my->consum_thread = boost::thread([this]{ my->consum_blocks(); });

// chain_controller is created and has resynced or replayed if needed
my->startup = false;
}

void db_plugin::plugin_shutdown()
{
my->done = true;
my->consum_thread.join();
}

} // namespace eos

0 comments on commit 63fb9f0

Please sign in to comment.