Skip to content

Commit

Permalink
SDLCORE-459:Make refactoring in mb_controller.cc (#3381)
Browse files Browse the repository at this point in the history
SDLCORE-459

Fix race condition ragarding Helgrind issue.
Error 9 : Possible race condition in case of using assignment operator
with atomic variable 'shutdown_', fix via using 'atomic_exchange'.
Error 10: Incorrect sequence of closing boost asio objects.
Add member function into CMessageBrokerController for correct close sequence.

Co-authored-by: Maksym Shvaiko <MShvaiko@luxoft.com>
  • Loading branch information
LuxoftAKutsan and Maksym Shvaiko authored Jul 13, 2020
1 parent 648a510 commit 81fb32e
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ class CMessageBrokerController
int getNextControllerId();

private:
void CloseConnection();

boost::asio::io_context ioc_;
const std::string& address_;
uint16_t port_;
Expand Down Expand Up @@ -182,4 +184,4 @@ class CMessageBrokerController

} // namespace hmi_message_handler

#endif /* MB_CONTROLLER_H */
#endif /* MB_CONTROLLER_H */
64 changes: 34 additions & 30 deletions src/components/hmi_message_handler/src/mb_controller.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,8 @@ CMessageBrokerController::CMessageBrokerController(const std::string& address,
}

CMessageBrokerController::~CMessageBrokerController() {
boost::system::error_code ec;
socket_.close();
acceptor_.close(ec);
if (ec) {
std::string str_err = "ErrorMessage Close: " + ec.message();
LOG4CXX_ERROR(mb_logger_, str_err);
}
shutdown_ = true;
ioc_.stop();
shutdown_.exchange(true);
CloseConnection();
}

bool CMessageBrokerController::StartListener() {
Expand Down Expand Up @@ -114,9 +107,8 @@ void CMessageBrokerController::WaitForConnection() {

void CMessageBrokerController::StartSession(boost::system::error_code ec) {
if (ec) {
std::string str_err = "ErrorMessage: " + ec.message();
LOG4CXX_ERROR(mb_logger_, str_err);
ioc_.stop();
LOG4CXX_ERROR(mb_logger_, "ErrorMessage: " << ec.message());
CloseConnection();
return;
}
if (shutdown_) {
Expand Down Expand Up @@ -147,7 +139,7 @@ void CMessageBrokerController::sendNotification(Json::Value& message) {
int subscribersCount = getSubscribersFd(methodName, result);
if (0 < subscribersCount) {
std::vector<WebsocketSession*>::iterator it;
for (it = result.begin(); it != result.end(); it++) {
for (it = result.begin(); it != result.end(); ++it) {
(*it)->sendJsonMessage(message);
}
} else {
Expand Down Expand Up @@ -216,7 +208,8 @@ bool CMessageBrokerController::Connect() {
}

void CMessageBrokerController::exitReceivingThread() {
shutdown_ = true;
shutdown_.exchange(true);

mConnectionListLock.Acquire();
std::vector<std::shared_ptr<hmi_message_handler::WebsocketSession> >::iterator
it;
Expand All @@ -225,19 +218,7 @@ void CMessageBrokerController::exitReceivingThread() {
it = mConnectionList.erase(it);
}
mConnectionListLock.Release();

boost::system::error_code ec;
socket_.close();
acceptor_.cancel(ec);
if (ec) {
std::string str_err = "ErrorMessage Cancel: " + ec.message();
LOG4CXX_ERROR(mb_logger_, str_err);
}
acceptor_.close(ec);
if (ec) {
std::string str_err = "ErrorMessage Close: " + ec.message();
}
ioc_.stop();
CloseConnection();
}

std::string CMessageBrokerController::getMethodName(std::string& method) {
Expand Down Expand Up @@ -287,7 +268,7 @@ void CMessageBrokerController::deleteController(WebsocketSession* ws_session) {
if (it->second == ws_session) {
mControllersList.erase(it++);
} else {
it++;
++it;
}
}
}
Expand Down Expand Up @@ -341,7 +322,7 @@ bool CMessageBrokerController::addSubscriber(WebsocketSession* ws_session,
p = mSubscribersList.equal_range(name);
if (p.first != p.second) {
std::multimap<std::string, WebsocketSession*>::iterator itr;
for (itr = p.first; itr != p.second; itr++) {
for (itr = p.first; itr != p.second; ++itr) {
if (ws_session == itr->second) {
result = false;
LOG4CXX_ERROR(mb_logger_, ("Subscriber already exists!\n"));
Expand Down Expand Up @@ -384,7 +365,7 @@ int CMessageBrokerController::getSubscribersFd(
p = mSubscribersList.equal_range(name);
if (p.first != p.second) {
std::multimap<std::string, WebsocketSession*>::iterator itr;
for (itr = p.first; itr != p.second; itr++) {
for (itr = p.first; itr != p.second; ++itr) {
result.push_back(itr->second);
}
}
Expand Down Expand Up @@ -501,4 +482,27 @@ void CMessageBrokerController::processInternalRequest(
int CMessageBrokerController::getNextControllerId() {
return 1000 * mControllersIdCounter++;
}

void CMessageBrokerController::CloseConnection() {
if (!ioc_.stopped()) {
boost::system::error_code ec;

acceptor_.cancel(ec);
if (ec) {
LOG4CXX_ERROR(mb_logger_, "Acceptor cancel error: " << ec.message());
}

acceptor_.close(ec);
if (ec) {
LOG4CXX_ERROR(mb_logger_, "Acceptor close error: " << ec.message());
}

socket_.close(ec);
if (ec) {
LOG4CXX_ERROR(mb_logger_, "Socket close error : " << ec.message());
}

ioc_.stop();
}
}
} // namespace hmi_message_handler

0 comments on commit 81fb32e

Please sign in to comment.