Skip to content

Commit

Permalink
[WIP] fix: ensure tests pass
Browse files Browse the repository at this point in the history
In particular, several of the commits in this set assume their
`blocking_queue`s will be empty by the time the destructor is called.
However, this is not guaranteeable, and causes segfaults and/or
indefinite hangs when encountered. This commit predominantly ensures
that the queues are all `clear()`d appropriately.

Signed-off-by: Sam Stuewe <stuewe@mit.edu>
  • Loading branch information
HalosGhost committed Apr 30, 2024
1 parent e31a309 commit a6fddab
Show file tree
Hide file tree
Showing 11 changed files with 80 additions and 48 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ if(DEFINED CMAKE_PREFIX_PATH)
endif()

if(CMAKE_BUILD_TYPE STREQUAL "Debug")
add_compile_options(-fprofile-arcs -ftest-coverage)
add_compile_options(-fprofile-arcs -ftest-coverage -Og -ggdb3)
endif()

if(CMAKE_BUILD_TYPE STREQUAL "Debug")
Expand Down
2 changes: 2 additions & 0 deletions benchmarks/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ target_link_libraries(run_benchmarks ${GTEST_LIBRARY}
shard
watchtower
locking_shard
raft
transaction
rpc
network
Expand All @@ -26,4 +27,5 @@ target_link_libraries(run_benchmarks ${GTEST_LIBRARY}
crypto
secp256k1
${LEVELDB_LIBRARY}
${NURAFT_LIBRARY}
${CMAKE_THREAD_LIBS_INIT})
12 changes: 6 additions & 6 deletions src/uhs/twophase/coordinator/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,7 @@ namespace cbdc::coordinator {
m_start_thread.join();
}

m_attestation_check_queue.clear();
for(auto& t : m_attestation_check_threads) {
if(t.joinable()) {
t.join();
Expand Down Expand Up @@ -710,15 +711,15 @@ namespace cbdc::coordinator {
tx,
m_opts.m_sentinel_public_keys,
m_opts.m_attestation_threshold);
cb(std::move(tx), valid);
cb(tx, valid);
}
}
}

auto controller::check_tx_attestation(const transaction::compact_tx& tx,
attestation_check_callback cb)
-> bool {
m_attestation_check_queue.push({std::move(tx), std::move(cb)});
m_attestation_check_queue.push({tx, std::move(cb)});
return true;
}

Expand All @@ -731,7 +732,7 @@ namespace cbdc::coordinator {
}

return check_tx_attestation(
std::move(tx),
tx,
[&,
res_cb = std::move(result_callback)](transaction::compact_tx tx2,
bool result) {
Expand Down Expand Up @@ -760,9 +761,8 @@ namespace cbdc::coordinator {
auto idx = m_current_batch->add_tx(tx2);
// Map the index of the tx to the transaction ID and
// sentinel ID
m_current_txs->emplace(
tx2.m_id,
std::make_pair(std::move(res_cb), idx));
m_current_txs->emplace(tx2.m_id,
std::make_pair(res_cb, idx));
return true;
}();
if(added) {
Expand Down
30 changes: 21 additions & 9 deletions src/uhs/twophase/locking_shard/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
namespace cbdc::locking_shard {
controller::controller(size_t shard_id,
size_t node_id,
const cbdc::config::options& opts,
cbdc::config::options opts,
std::shared_ptr<logging::log> logger)
: m_opts(std::move(opts)),
m_logger(std::move(logger)),
Expand Down Expand Up @@ -112,6 +112,18 @@ namespace cbdc::locking_shard {
return true;
}

controller::~controller() {
m_running = false;
m_validation_queue.clear();
for(auto& t : m_validation_threads) {
if(t.joinable()) {
t.join();
}
}
m_validation_threads.clear();
m_server.reset();
}

auto controller::raft_callback(nuraft::cb_func::Type type,
nuraft::cb_func::Param* /* param */)
-> nuraft::cb_func::ReturnCode {
Expand Down Expand Up @@ -158,24 +170,24 @@ namespace cbdc::locking_shard {
auto v = validation_request();
if(m_validation_queue.pop(v)) {
auto [req, cb] = v;
validate_request(std::move(req), std::move(cb));
validate_request(std::move(req), cb);
}
}
}

auto
controller::enqueue_validation(cbdc::buffer buf,
controller::enqueue_validation(cbdc::buffer request,
cbdc::raft::rpc::validation_callback cb)
-> bool {
m_validation_queue.push({std::move(buf), std::move(cb)});
m_validation_queue.push({std::move(request), std::move(cb)});
return true;
}

auto controller::validate_request(cbdc::buffer buf,
cbdc::raft::rpc::validation_callback cb)
-> bool {
auto controller::validate_request(
cbdc::buffer request,
const cbdc::raft::rpc::validation_callback& cb) -> bool {
auto maybe_req
= cbdc::from_buffer<cbdc::rpc::request<rpc::request>>(buf);
= cbdc::from_buffer<cbdc::rpc::request<rpc::request>>(request);
auto valid = true;
if(maybe_req) {
valid = std::visit(
Expand Down Expand Up @@ -208,7 +220,7 @@ namespace cbdc::locking_shard {
valid = false;
}

cb(std::move(buf), valid);
cb(std::move(request), valid);
return true;
}
}
6 changes: 4 additions & 2 deletions src/uhs/twophase/locking_shard/controller.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ namespace cbdc::locking_shard {
size_t node_id,
const cbdc::config::options& opts,
std::shared_ptr<logging::log> logger);
~controller() = default;

~controller();

controller() = delete;
controller(const controller&) = delete;
Expand All @@ -48,7 +49,8 @@ namespace cbdc::locking_shard {
nuraft::cb_func::Param* param)
-> nuraft::cb_func::ReturnCode;
auto validate_request(cbdc::buffer request,
cbdc::raft::rpc::validation_callback cb) -> bool;
const cbdc::raft::rpc::validation_callback& cb)
-> bool;

auto enqueue_validation(cbdc::buffer request,
cbdc::raft::rpc::validation_callback cb)
Expand Down
36 changes: 22 additions & 14 deletions src/uhs/twophase/sentinel_2pc/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,19 +111,23 @@ namespace cbdc::sentinel_2pc {
return true;
}

controller::~controller() {
stop();
}

void controller::validation_worker() {
while(m_running) {
auto v = queued_validation();
if(m_validation_queue.pop(v)) {
auto [tx, cb] = v;
cb(std::move(tx), transaction::validation::check_tx(tx));
cb(tx, transaction::validation::check_tx(tx));
}
}
}

auto controller::validate_tx(const transaction::full_tx& tx,
validation_callback cb) -> bool {
m_validation_queue.push({std::move(tx), std::move(cb)});
m_validation_queue.push({tx, std::move(cb)});
return true;
}

Expand All @@ -133,22 +137,22 @@ namespace cbdc::sentinel_2pc {
if(m_attestation_queue.pop(v)) {
auto [tx, cb] = v;
auto compact_tx = cbdc::transaction::compact_tx(tx);
cb(std::move(tx), compact_tx.sign(m_secp.get(), m_privkey));
cb(tx, compact_tx.sign(m_secp.get(), m_privkey));
}
}
}

auto controller::attest_tx(const transaction::full_tx& tx,
attestation_callback cb) -> bool {
m_attestation_queue.push({std::move(tx), std::move(cb)});
m_attestation_queue.push({tx, std::move(cb)});
return true;
}

auto controller::execute_transaction(
transaction::full_tx tx,
execute_result_callback_type result_callback) -> bool {
return controller::validate_tx(
std::move(tx),
tx,
[&, result_callback](
const transaction::full_tx& tx2,
std::optional<cbdc::transaction::validation::tx_error> err) {
Expand All @@ -166,10 +170,7 @@ namespace cbdc::sentinel_2pc {
}

auto compact_tx = cbdc::transaction::compact_tx(tx2);
gather_attestations(std::move(tx2),
std::move(result_callback),
compact_tx,
{});
gather_attestations(tx2, result_callback, compact_tx, {});
return;
});
}
Expand All @@ -194,7 +195,7 @@ namespace cbdc::sentinel_2pc {
transaction::full_tx tx,
validate_result_callback_type result_callback) -> bool {
return controller::validate_tx(
std::move(tx),
tx,
[&, result_callback](
const transaction::full_tx& tx2,
std::optional<cbdc::transaction::validation::tx_error> err) {
Expand All @@ -203,7 +204,7 @@ namespace cbdc::sentinel_2pc {
return;
}
controller::attest_tx(
std::move(tx2),
tx2,
[&, result_callback](
const transaction::full_tx& /* tx3 */,
std::optional<cbdc::sentinel::validate_response> res) {
Expand Down Expand Up @@ -233,17 +234,24 @@ namespace cbdc::sentinel_2pc {

void controller::stop() {
m_running = false;
m_rpc_server.reset();

m_validation_queue.clear();
m_attestation_queue.clear();

for(auto& t : m_validation_threads) {
if(t.joinable()) {
t.join();
}
}
m_validation_threads.clear();

for(auto& t : m_attestation_threads) {
if(t.joinable()) {
t.join();
}
}
m_attestation_threads.clear();
}

void controller::gather_attestations(
Expand All @@ -252,10 +260,10 @@ namespace cbdc::sentinel_2pc {
const transaction::compact_tx& ctx,
std::unordered_set<size_t> requested) {
if(ctx.m_attestations.size() < m_opts.m_attestation_threshold) {
if(ctx.m_attestations.size() == 0) {
if(ctx.m_attestations.empty()) {
// Self-attest first
controller::attest_tx(
std::move(tx),
tx,
[&, ctx, result_callback](const transaction::full_tx& tx2,
validate_result res) {
validate_result_handler(res,
Expand Down Expand Up @@ -297,7 +305,7 @@ namespace cbdc::sentinel_2pc {
void
controller::send_compact_tx(const transaction::compact_tx& ctx,
execute_result_callback_type result_callback) {
auto cb = [&, this, ctx, res_cb = std::move(result_callback)](
auto cb = [&, ctx, res_cb = std::move(result_callback)](
std::optional<bool> res) {
result_handler(res, res_cb);
};
Expand Down
2 changes: 1 addition & 1 deletion src/uhs/twophase/sentinel_2pc/controller.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ namespace cbdc::sentinel_2pc {
const config::options& opts,
std::shared_ptr<logging::log> logger);

~controller() override = default;
~controller() override;

/// Initializes the controller. Connects to the shard coordinator
/// network and launches a server thread for external clients.
Expand Down
6 changes: 4 additions & 2 deletions src/uhs/twophase/sentinel_2pc/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,20 @@ namespace cbdc::sentinel::rpc {
m_srv->register_handler_callback(
[&](const request& req,
async_interface::result_callback_type callback) {
auto req_item = request_queue_t{req, callback};
auto req_item = request_queue_t{req, std::move(callback)};
m_request_queue.push(req_item);
return true;
});
}
bool operator<(const request_queue_t& a, const request_queue_t& b) {
auto operator<(const request_queue_t& a, const request_queue_t& b)
-> bool {
// Prioritize validate requests over execute requests
return (std::holds_alternative<validate_request>(a.m_req)
&& std::holds_alternative<execute_request>(b.m_req));
}
async_server::~async_server() {
m_running = false;
m_request_queue.clear();
if(m_processing_thread.joinable()) {
m_processing_thread.join();
}
Expand Down
6 changes: 5 additions & 1 deletion src/uhs/twophase/sentinel_2pc/server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ namespace cbdc::sentinel::rpc {
async_interface::result_callback_type m_cb;
};

bool operator<(const request_queue_t& a, const request_queue_t& b);
auto operator<(const request_queue_t& a, const request_queue_t& b) -> bool;
/// Asynchronous RPC server for a sentinel.
class async_server {
public:
Expand All @@ -33,6 +33,10 @@ namespace cbdc::sentinel::rpc {
std::unique_ptr<cbdc::rpc::async_server<request, response>> srv);

~async_server();
async_server(async_server&&) noexcept = default;
auto operator=(async_server&&) noexcept -> async_server& = default;
async_server(const async_server&) = default;
auto operator=(const async_server&) -> async_server& = default;

private:
void process();
Expand Down
13 changes: 7 additions & 6 deletions src/util/raft/rpc_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ namespace cbdc::raft::rpc {
/// \param impl pointer to the raft node.
/// \see cbdc::rpc::server
void register_raft_node(std::shared_ptr<node> impl) {
register_raft_node(impl, std::nullopt);
register_raft_node(std::move(impl), std::nullopt);
}

/// Registers the raft node whose state machine handles RPC requests
Expand All @@ -42,10 +42,11 @@ namespace cbdc::raft::rpc {
if(validate.has_value()) {
m_validate_func = std::move(validate.value());
} else {
m_validate_func = [&](buffer b, validation_callback cb) {
cb(std::move(b), true);
return true;
};
m_validate_func
= [&](buffer b, const validation_callback& cb) {
cb(std::move(b), true);
return true;
};
}
cbdc::rpc::raw_async_server::register_handler_callback(
[&](buffer req, response_callback_type resp_cb) {
Expand Down Expand Up @@ -84,7 +85,7 @@ namespace cbdc::raft::rpc {

auto success = m_impl->replicate(
new_log,
[&, resp_cb = std::move(res_cb), req_buf = new_log](
[&, resp_cb = res_cb, req_buf = new_log](
result_type& r,
nuraft::ptr<std::exception>& err) {
if(err) {
Expand Down
Loading

0 comments on commit a6fddab

Please sign in to comment.