Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor StorageImpl #123

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ iroha.conf
peers.list
cmake-build*

cmake-build*
.gtm
/.gtm/

Expand Down
21 changes: 21 additions & 0 deletions irohad/ametsuchi/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,26 @@
# SPDX-License-Identifier: Apache-2.0
#

add_library(failover_callback
impl/failover_callback.cpp
impl/failover_callback_holder.cpp
)

target_link_libraries(failover_callback
logger
SOCI::postgresql
SOCI::core
)

add_library(pool_wrapper
impl/pool_wrapper.cpp
)

target_link_libraries(pool_wrapper
failover_callback
SOCI::core
)

add_library(ametsuchi
impl/flat_file/flat_file.cpp
impl/command_executor.cpp
Expand Down Expand Up @@ -35,6 +55,7 @@ target_link_libraries(ametsuchi
common
shared_model_interfaces
shared_model_stateless_validation
failover_callback
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
failover_callback

Because it is not used directly by ametsuchi and transitively linked by pool_wrapper.

SOCI::postgresql
SOCI::core
)
Expand Down
149 changes: 149 additions & 0 deletions irohad/ametsuchi/impl/failover_callback.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/**
* Copyright Soramitsu Co., Ltd. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

#include "ametsuchi/impl/failover_callback.hpp"

#include <soci/postgresql/soci-postgresql.h>

#include "logger/logger.hpp"

using namespace iroha::ametsuchi;

FailoverCallback::FailoverCallback(
soci::session &connection,
InitFunctionType init,
std::string connection_options,
std::unique_ptr<ReconnectionStrategy> reconnection_strategy,
logger::LoggerPtr log)
: connection_(connection),
init_session_(std::move(init)),
connection_options_(std::move(connection_options)),
reconnection_strategy_(std::move(reconnection_strategy)),
log_(std::move(log)) {}

void FailoverCallback::started() {
reconnection_strategy_->reset();
log_->debug("Reconnection process is initiated");
}

void FailoverCallback::finished(soci::session &) {}

void FailoverCallback::failed(bool &should_reconnect, std::string &) {
// don't rely on reconnection in soci because we are going to conduct
// our own reconnection process
should_reconnect = false;
log_->warn(
"failed to connect to the database. The system will try to "
"reconnect");
auto is_reconnected = reconnectionLoop();
log_->info("re-established: {}", is_reconnected);
}

void FailoverCallback::aborted() {
log_->error("has invoked aborted method of FailoverCallback");
}

bool FailoverCallback::reconnectionLoop() {
bool successful_reconnection = false;
while (reconnection_strategy_->canReconnect()
and not successful_reconnection) {
try {
soci::connection_parameters parameters(*soci::factory_postgresql(),
connection_options_);
auto *pg_connection = static_cast<soci::postgresql_session_backend *>(
connection_.get_backend());
auto &conn_ = pg_connection->conn_;

auto clean_up = [](auto &conn_) {
if (0 != conn_) {
PQfinish(conn_);
conn_ = 0;
}
};

auto check_for_data = [](auto &conn, auto *result, auto *errMsg) {
std::string msg(errMsg);

ExecStatusType const status = PQresultStatus(result);
switch (status) {
case PGRES_EMPTY_QUERY:
case PGRES_COMMAND_OK:
// No data but don't throw neither.
return false;

case PGRES_TUPLES_OK:
return true;

case PGRES_FATAL_ERROR:
msg += " Fatal error.";

if (PQstatus(conn) == CONNECTION_BAD) {
msg += " Connection failed.";
}

break;

default:
// Some of the other status codes are not really errors
// but we're not prepared to handle them right now and
// shouldn't ever receive them so throw nevertheless

break;
}

const char *const pqError = PQresultErrorMessage(result);
if (pqError && *pqError) {
msg += " ";
msg += pqError;
}

const char *sqlstate = PQresultErrorField(result, PG_DIAG_SQLSTATE);
const char *const blank_sql_state = " ";
if (!sqlstate) {
sqlstate = blank_sql_state;
}

throw std::runtime_error(msg);
};

auto connect = [check_for_data](auto &conn, auto &parameters) {
PGconn *new_conn = PQconnectdb(parameters.get_connect_string().c_str());
if (0 == new_conn || CONNECTION_OK != PQstatus(new_conn)) {
std::string msg = "Cannot establish connection to the database.";
if (0 != new_conn) {
msg += '\n';
msg += PQerrorMessage(new_conn);
PQfinish(new_conn);
}

throw std::runtime_error(msg);
}

// Increase the number of digits used for floating point values to
// ensure that the conversions to/from text round trip correctly,
// which is not the case with the default value of 0. Use the
// maximal supported value, which was 2 until 9.x and is 3 since
// it.
int const version = PQserverVersion(new_conn);
check_for_data(new_conn,
PQexec(new_conn,
version >= 90000 ? "SET extra_float_digits = 3"
: "SET extra_float_digits = 2"),
"Cannot set extra_float_digits parameter");

conn = new_conn;
};

clean_up(conn_);
connect(conn_, parameters);

init_session_(connection_);
successful_reconnection = true;
} catch (const std::exception &e) {
log_->warn("attempt to reconnect has failed: {}", e.what());
}
}
return successful_reconnection;
}
58 changes: 58 additions & 0 deletions irohad/ametsuchi/impl/failover_callback.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/**
* Copyright Soramitsu Co., Ltd. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

#ifndef IROHA_FAILOVER_CALLBACK_HPP
#define IROHA_FAILOVER_CALLBACK_HPP

#include <memory>
lebdron marked this conversation as resolved.
Show resolved Hide resolved

#include <soci/soci.h>

#include <soci/callbacks.h>

#include "ametsuchi/reconnection_strategy.hpp"
#include "logger/logger_fwd.hpp"

namespace iroha {
namespace ametsuchi {
/**
* Class provides reconnection callback for postgresql session
* Note: the class is a workaround for SOCI 4.0, support in future versions
* is not guaranteed
*/
class FailoverCallback final : public soci::failover_callback {
public:
using InitFunctionType = std::function<void(soci::session &)>;
FailoverCallback(
soci::session &connection,
InitFunctionType init,
std::string connection_options,
std::unique_ptr<ReconnectionStrategy> reconnection_strategy,
logger::LoggerPtr log);

FailoverCallback(const FailoverCallback &) = delete;
FailoverCallback &operator=(const FailoverCallback &) = delete;

void started() override;

void finished(soci::session &) override;

void failed(bool &should_reconnect, std::string &) override;

void aborted() override;

private:
bool reconnectionLoop();

soci::session &connection_;
InitFunctionType init_session_;
const std::string connection_options_;
std::unique_ptr<ReconnectionStrategy> reconnection_strategy_;
logger::LoggerPtr log_;
};
} // namespace ametsuchi
} // namespace iroha

#endif // IROHA_FAILOVER_CALLBACK_HPP
23 changes: 23 additions & 0 deletions irohad/ametsuchi/impl/failover_callback_holder.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/**
* Copyright Soramitsu Co., Ltd. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

#include "ametsuchi/impl/failover_callback_holder.hpp"

using namespace iroha::ametsuchi;

FailoverCallback &FailoverCallbackHolder::makeFailoverCallback(
soci::session &connection,
FailoverCallback::InitFunctionType init,
std::string connection_options,
std::unique_ptr<ReconnectionStrategy> reconnection_strategy,
logger::LoggerPtr log) {
callbacks_.push_back(
std::make_unique<FailoverCallback>(connection,
std::move(init),
std::move(connection_options),
std::move(reconnection_strategy),
std::move(log)));
return *callbacks_.back();
}
28 changes: 28 additions & 0 deletions irohad/ametsuchi/impl/failover_callback_holder.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/**
* Copyright Soramitsu Co., Ltd. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

#ifndef IROHA_FAILOVER_CALLBACK_HOLDER_HPP
#define IROHA_FAILOVER_CALLBACK_HOLDER_HPP

#include "ametsuchi/impl/failover_callback.hpp"

namespace iroha {
namespace ametsuchi {
class FailoverCallbackHolder {
public:
FailoverCallback &makeFailoverCallback(
soci::session &connection,
FailoverCallback::InitFunctionType init,
std::string connection_options,
std::unique_ptr<ReconnectionStrategy> reconnection_strategy,
logger::LoggerPtr log);

private:
std::vector<std::unique_ptr<FailoverCallback>> callbacks_;
};
} // namespace ametsuchi
} // namespace iroha

#endif // IROHA_FAILOVER_CALLBACK_HOLDER_HPP
19 changes: 19 additions & 0 deletions irohad/ametsuchi/impl/pool_wrapper.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/**
* Copyright Soramitsu Co., Ltd. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

#include "ametsuchi/impl/pool_wrapper.hpp"

#include <soci/soci.h>
#include "ametsuchi/impl/failover_callback_holder.hpp"

using namespace iroha::ametsuchi;

PoolWrapper::PoolWrapper(
std::shared_ptr<soci::connection_pool> connection_pool,
std::unique_ptr<FailoverCallbackHolder> failover_callback_holder,
bool enable_prepared_transactions)
: connection_pool_(std::move(connection_pool)),
failover_callback_holder_(std::move(failover_callback_holder)),
enable_prepared_transactions_(enable_prepared_transactions) {}
33 changes: 33 additions & 0 deletions irohad/ametsuchi/impl/pool_wrapper.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/**
* Copyright Soramitsu Co., Ltd. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

#ifndef IROHA_POOL_WRAPPER_HPP
#define IROHA_POOL_WRAPPER_HPP

#include <memory>

namespace soci {
class connection_pool;
}

namespace iroha {
namespace ametsuchi {
class FailoverCallbackHolder;

struct PoolWrapper {
PoolWrapper(
std::shared_ptr<soci::connection_pool> connection_pool,
std::unique_ptr<FailoverCallbackHolder> failover_callback_holder,
bool enable_prepared_transactions);

std::shared_ptr<soci::connection_pool> connection_pool_;
std::unique_ptr<FailoverCallbackHolder> failover_callback_holder_;
bool enable_prepared_transactions_;
};

} // namespace ametsuchi
} // namespace iroha

#endif // IROHA_POOL_WRAPPER_HPP
Loading