Skip to content

Commit

Permalink
Merge pull request #39 from bringauto/end_on_ext_client_deadlock
Browse files Browse the repository at this point in the history
End the program when toExternalClient queue is too large
  • Loading branch information
MarioIvancik authored Jan 10, 2025
2 parents 7d0f41f + 1cd5f13 commit 3496061
Show file tree
Hide file tree
Showing 15 changed files with 651 additions and 116 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ CMAKE_MINIMUM_REQUIRED(VERSION 3.25 FATAL_ERROR)
PROJECT(ModuleGateway)
INCLUDE(GNUInstallDirs)

SET(BRINGAUTO_MODULE_GATEWAY_VERSION 1.3.0)
SET(BRINGAUTO_MODULE_GATEWAY_VERSION 1.3.1)
ADD_COMPILE_DEFINITIONS(MODULE_GATEWAY_VERSION="${BRINGAUTO_MODULE_GATEWAY_VERSION}")
SET(CMAKE_INSTALL_RPATH "$ORIGIN/../${CMAKE_INSTALL_LIBDIR}")
SET(CMAKE_CXX_STANDARD 20)
Expand Down
46 changes: 18 additions & 28 deletions include/bringauto/external_client/connection/ExternalConnection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

namespace bringauto::external_client::connection {
/**
* @brief Class representing connection to one external server endpoint
* @brief Class representing a connection to one external server endpoint
*/
class ExternalConnection {
public:
Expand All @@ -32,20 +32,19 @@ class ExternalConnection {
const std::shared_ptr <structures::AtomicQueue<structures::ReconnectQueueItem>>& reconnectQueue);

/**
* @brief Initialize external connection
* it has to be called after constructor
* @brief Initialize the external connection
* it has to be called after the constructor
*
* @param company name of the company
* @param vehicleName name of the vehicle
* @param communicationChannel communication channel to the external server
*/
void init(const std::string &company, const std::string &vehicleName);
void init(const std::shared_ptr <communication::ICommunicationChannel> &communicationChannel);

/**
* @brief Handles all etapes of connect sequence. If connect sequence is successful,
* infinite receive loop is started in new thread.
* @brief Handles all stages of the connect sequence. If the connect sequence is successful,
* an infinite receive loop is started in a new thread.
*
* @param connectedDevices devices that are connected to the internal server
* @return 0 if OK otherwise NOT_OK
* @return OK if successful, otherwise NOT_OK
*/
int initializeConnection(const std::vector<structures::DeviceIdentification>& connectedDevices);

Expand All @@ -59,7 +58,7 @@ class ExternalConnection {
void deinitializeConnection(bool completeDisconnect);

/**
* @brief Send status message to the external server
* @brief Send a status message to the external server
*
* @param status status message
* @param deviceState state of the device
Expand All @@ -70,15 +69,8 @@ class ExternalConnection {
const modules::Buffer& errorMessage = modules::Buffer {});

/**
* @brief Check if any device is connected to the external connection
*
* @return true if yes otherwise false
*/
bool hasAnyDeviceConnected();

/**
* @brief Force aggregation on all devices in all modules that connection service
* Is used before connect sequence to assure that every device has available status to be sent
* @brief Force aggregation on all devices in all modules that the connection services.
* Is used before the connect sequence to assure that every device has an available status to be sent
*
* @return number of devices
*/
Expand Down Expand Up @@ -123,7 +115,7 @@ class ExternalConnection {
/**
* @brief Generate and set the session id
*/
void setSessionId();
void generateSessionId();

[[nodiscard]] u_int32_t getNextStatusCounter();

Expand All @@ -132,7 +124,7 @@ class ExternalConnection {
int connectMessageHandle(const std::vector <structures::DeviceIdentification> &devices);

/**
* @brief Takes care of second etape of connect sequence - for all devices send their last status
* @brief Takes care of second stage of the connect sequence - send the last status of all devices
* @param devices
*/
int statusMessageHandle(const std::vector <structures::DeviceIdentification> &devices);
Expand All @@ -143,9 +135,9 @@ class ExternalConnection {
* @brief Check if command is in order and send commandResponse
*
* @param commandMessage
* @return 0 if OK
* @return -1 if command is out of order
* @return -2 if command has incorrect session ID
* @return OK if successful
* @return COMMAND_INVALID if command is out of order or has incorrect session ID
* @return NOT_OK otherwise
*/
int handleCommand(const ExternalProtocol::Command &commandMessage);

Expand All @@ -165,7 +157,7 @@ class ExternalConnection {
/// ID of the current external connection session, changes with every connect sequence
std::string sessionId_ {};
/// Communication channel to the external server
std::unique_ptr <communication::ICommunicationChannel> communicationChannel_ {};
std::shared_ptr <communication::ICommunicationChannel> communicationChannel_ {};
/// Thread for receiving loop
std::jthread listeningThread {};

Expand All @@ -180,14 +172,12 @@ class ExternalConnection {
/// Class handling sent messages - timers, not acknowledged statuses etc.
std::unique_ptr <messages::SentMessagesHandler> sentMessagesHandler_ {};
/// @brief Map of error aggregators, key is module number
std::map<unsigned int, ErrorAggregator> errorAggregators {};
std::map<unsigned int, ErrorAggregator> errorAggregators_ {};
/// Queue of commands received from external server, commands are processed by aggregator
std::shared_ptr <structures::AtomicQueue<InternalProtocol::DeviceCommand>> commandQueue_ {};

std::shared_ptr <structures::AtomicQueue<structures::ReconnectQueueItem>>
reconnectQueue_ {};
/// Unique id of the vehicle - car name + session id
std::string vehicleId_ {};
/// Name of the vehicle
std::string vehicleName_ {};
/// Name of the company
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,6 @@ class ICommunicationChannel {

virtual ~ICommunicationChannel() = default;

/**
* @brief Set the basic properties to the communication object
*
* @param company name of the company
* @param vehicleName name of the vehicle
*/
virtual void setProperties(const std::string &company, const std::string &vehicleName) = 0;

/**
* @brief Initialize connection with the server
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ namespace bringauto::external_client::connection::communication {

class MqttCommunication: public ICommunicationChannel {
public:
explicit MqttCommunication(const structures::ExternalConnectionSettings &settings);
explicit MqttCommunication(const structures::ExternalConnectionSettings &settings, const std::string &company,
const std::string &vehicleName);

~MqttCommunication() override;

void setProperties(const std::string &company, const std::string &vehicleName) override;
void setProperties(const std::string &company, const std::string &vehicleName);

void initializeConnection() override;

Expand Down
5 changes: 5 additions & 0 deletions include/bringauto/modules/ModuleHandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ class ModuleHandler {
*/
void handleStatus(const InternalProtocol::DeviceStatus &status);

/**
* @brief Throws an error if external queue size is too big
*/
void checkExternalQueueSize();

std::shared_ptr <structures::GlobalContext> context_ {};

structures::ModuleLibrary &moduleLibrary_;
Expand Down
5 changes: 5 additions & 0 deletions include/bringauto/settings/Constants.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ constexpr size_t buffer_length { 1024 };
*/
constexpr unsigned int max_external_commands { 3 };

/**
* @brief how many messages can be in the message queue sent to External Client before it is considered unresponsive
*/
constexpr unsigned int max_external_queue_size { 500 };

/**
* @brief Constants for Mqtt communication
*/
Expand Down
17 changes: 16 additions & 1 deletion source/bringauto/external_client/ExternalClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include <bringauto/settings/Constants.hpp>
#include <bringauto/common_utils/ProtobufUtils.hpp>
#include <bringauto/external_client/connection/ConnectionState.hpp>
#include <bringauto/external_client/connection/communication/MqttCommunication.hpp>

#include <bringauto/settings/LoggerId.hpp>

Expand Down Expand Up @@ -87,7 +88,21 @@ void ExternalClient::initConnections() {
externalConnectionsList_.emplace_back(context_, moduleLibrary_, connection, fromExternalQueue_,
reconnectQueue_);
auto &newConnection = externalConnectionsList_.back();
newConnection.init(context_->settings->company, context_->settings->vehicleName);
std::shared_ptr<connection::communication::ICommunicationChannel> communicationChannel;

switch(connection.protocolType) {
case structures::ProtocolType::MQTT:
communicationChannel = std::make_shared<connection::communication::MqttCommunication>(
connection, context_->settings->company, context_->settings->vehicleName
);
break;
case structures::ProtocolType::INVALID:
default:
settings::Logger::logError("Invalid external communication protocol type");
throw std::invalid_argument("Invalid external communication protocol type");
}

newConnection.init(communicationChannel);
for(auto const &moduleNumber: connection.modules) {
externalConnectionMap_.emplace(moduleNumber, newConnection);
}
Expand Down
Loading

0 comments on commit 3496061

Please sign in to comment.