Skip to content

Commit

Permalink
refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
rex-schilasky committed Nov 29, 2024
1 parent 18f1da3 commit c204c78
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 190 deletions.
277 changes: 125 additions & 152 deletions ecal/core/src/service/ecal_service_client_id_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,11 @@
#include "registration/ecal_registration_provider.h"
#include "serialization/ecal_serialize_service.h"

#include <atomic>
#include <chrono>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

namespace
{
Expand Down Expand Up @@ -124,57 +121,50 @@ namespace eCAL
const std::string& service_name_, const ServiceMethodInformationMapT& method_information_map_, const ClientEventIDCallbackT& event_callback_)
: m_service_name(service_name_), m_method_information_map(method_information_map_)
{
InitializeMethodCallCounts();
GenerateClientID();
AddEventCallback(event_callback_);
Register();
}

// Destructor: Resets callbacks, unregisters the client, and clears data
CServiceClientIDImpl::~CServiceClientIDImpl()
{
ResetAllCallbacks();
Unregister();
}

// Initializes the method call count map to track the number of calls for each method
void CServiceClientIDImpl::InitializeMethodCallCounts()
{
// initialize method call counts
for (const auto& method_information_pair : m_method_information_map)
{
m_method_call_count_map[method_information_pair.first] = 0;
}
}

// Generates a unique client ID based on the current time
void CServiceClientIDImpl::GenerateClientID()
{
// create unique client ID
std::stringstream counter;
counter << std::chrono::steady_clock::now().time_since_epoch().count();
m_client_id = counter.str();
}

// Adds a callback function for a client events
bool CServiceClientIDImpl::AddEventCallback(ClientEventIDCallbackT callback_)
{
const std::lock_guard<std::mutex> lock(m_event_callback_sync);
m_event_callback = std::move(callback_);
return true;
// add event callback
{
const std::lock_guard<std::mutex> lock(m_event_callback_sync);
m_event_callback = event_callback_;
}

// register client
if (!m_service_name.empty() && g_registration_provider() != nullptr)
{
g_registration_provider()->RegisterSample(GetRegistrationSample());
}
}

// Resets all callbacks and clears stored client information
void CServiceClientIDImpl::ResetAllCallbacks()
// Destructor: Resets callbacks, unregisters the client, and clears data
CServiceClientIDImpl::~CServiceClientIDImpl()
{
// reset client map
{
const std::lock_guard<std::mutex> lock(m_client_session_map_sync);
m_client_session_map.clear();
}

// reset event callback
{
const std::lock_guard<std::mutex> lock(m_event_callback_sync);
m_event_callback = nullptr;
}
m_service_name.clear();
m_client_id.clear();

// unregister client
if (g_registration_provider() != nullptr)
{
g_registration_provider()->UnregisterSample(GetUnregistrationSample());
}
}

// Retrieve service IDs of all matching services
Expand All @@ -199,10 +189,10 @@ namespace eCAL
const std::string& request_, int timeout_ms_, const ResponseIDCallbackT& response_callback_)
{
SClient client;
if (!TryGetClient(entity_id_, client))
if (!GetClientByEntity(entity_id_, client))
return { false, SServiceResponse() };

auto response = CallBlocking(entity_id_, client, method_name_, request_, std::chrono::milliseconds(timeout_ms_));
auto response = CallMethodWithTimeout(entity_id_, client, method_name_, request_, std::chrono::milliseconds(timeout_ms_));

// If a callback is provided and the call was successful, invoke the callback
if (response_callback_ && response.first)
Expand All @@ -219,56 +209,12 @@ namespace eCAL
return response;
}

// Blocking call to a service with a specified timeout
std::pair<bool, SServiceResponse> CServiceClientIDImpl::CallBlocking(
const Registration::SEntityId& entity_id_, SClient& client_, const std::string& method_name_,
const std::string& request_, std::chrono::nanoseconds timeout_)
{
if (method_name_.empty())
return { false, SServiceResponse() };

// Serialize the request
auto request_shared_ptr = SerializeRequest(method_name_, request_);
if (!request_shared_ptr)
return { false, SServiceResponse() };

// Prepare response data
auto response_data = PrepareInitialResponse(client_, method_name_);
auto response_callback = CreateResponseCallback(response_data);

// Send the service call
const bool call_success = client_.client_session->async_call_service(request_shared_ptr, response_callback);
if (!call_success)
return { false, CreateErrorResponse(entity_id_, m_service_name, method_name_, "Call failed") };

// Wait for the response or timeout
std::unique_lock<std::mutex> lock(*response_data->mutex);
if (timeout_ > std::chrono::nanoseconds::zero())
{
if (!response_data->condition_variable->wait_for(lock, timeout_, [&] { return *response_data->finished; }))
{
response_data->response->first = false;
response_data->response->second.error_msg = "Timeout";
response_data->response->second.call_state = eCallState::call_state_timeouted;
}
}
else
{
response_data->condition_variable->wait(lock, [&] { return *response_data->finished; });
}

// Increment method call count
IncrementMethodCallCount(method_name_);

return *response_data->response;
}

// Asynchronous call to a service with a specified timeout
bool CServiceClientIDImpl::CallWithCallbackAsync(const Registration::SEntityId& entity_id_, const std::string& method_name_, const std::string& request_, const ResponseIDCallbackT& response_callback_)
{
// Retrieve the client
SClient client;
if (!TryGetClient(entity_id_, client))
if (!GetClientByEntity(entity_id_, client))
return false;

// Validate service and method names
Expand Down Expand Up @@ -323,61 +269,6 @@ namespace eCAL
return true;
}

// Attempts to retrieve a client session for a given entity ID
bool CServiceClientIDImpl::TryGetClient(const Registration::SEntityId& entity_id_, SClient& client_)
{
const std::lock_guard<std::mutex> lock(m_client_session_map_sync);
auto iter = m_client_session_map.find(entity_id_);
if (iter == m_client_session_map.end())
return false;

client_ = iter->second;
return true;
}

std::shared_ptr<CServiceClientIDImpl::SResponseData> CServiceClientIDImpl::PrepareInitialResponse(SClient& client_, const std::string& method_name_)
{
auto data = std::make_shared<SResponseData>();
data->response->first = false;
data->response->second.host_name = client_.service_attr.hname;
data->response->second.service_name = client_.service_attr.sname;
data->response->second.service_id = client_.service_attr.key;
data->response->second.method_name = method_name_;
data->response->second.call_state = eCallState::call_state_none;
return data;
}

eCAL::service::ClientResponseCallbackT CServiceClientIDImpl::CreateResponseCallback(const std::shared_ptr<SResponseData>& response_data_)
{
return [response_data_](const eCAL::service::Error& error, const std::shared_ptr<std::string>& response_)
{
const std::lock_guard<std::mutex> lock(*response_data_->mutex);
if (!*response_data_->block_modifying_response)
{
if (error)
{
response_data_->response->first = false;
response_data_->response->second.error_msg = error.ToString();
response_data_->response->second.call_state = eCallState::call_state_failed;
response_data_->response->second.ret_state = 0;
}
else
{
response_data_->response->first = true;
response_data_->response->second = DeserializedResponse(*response_);
}
}
*response_data_->finished = true;
response_data_->condition_variable->notify_all();
};
}

void CServiceClientIDImpl::IncrementMethodCallCount(const std::string& method_name_)
{
const std::lock_guard<std::mutex> lock(m_method_information_map_sync);
m_method_call_count_map[method_name_]++;
}

// Check if a specific service is connected
bool CServiceClientIDImpl::IsConnected(const Registration::SEntityId& entity_id_)
{
Expand All @@ -398,7 +289,7 @@ namespace eCAL
if (client_manager == nullptr || client_manager->is_stopped()) return;

// Event callback (unused)
const eCAL::service::ClientSession::EventCallbackT event_callback = [] (eCAL::service::ClientEventType /*event*/, const std::string& /*message*/) -> void
const eCAL::service::ClientSession::EventCallbackT event_callback = [](eCAL::service::ClientEventType /*event*/, const std::string& /*message*/) -> void
{
// TODO: Replace current connect/disconnect state logic with this client event callback logic
};
Expand Down Expand Up @@ -486,22 +377,60 @@ namespace eCAL
return ecal_reg_sample;
}

// Register client information
void CServiceClientIDImpl::Register()
// Attempts to retrieve a client session for a given entity ID
bool CServiceClientIDImpl::GetClientByEntity(const Registration::SEntityId& entity_id_, SClient& client_)
{
if (!m_service_name.empty() && g_registration_provider() != nullptr)
{
g_registration_provider()->RegisterSample(GetRegistrationSample());
}
const std::lock_guard<std::mutex> lock(m_client_session_map_sync);
auto iter = m_client_session_map.find(entity_id_);
if (iter == m_client_session_map.end())
return false;

client_ = iter->second;
return true;
}

// Unregister client information
void CServiceClientIDImpl::Unregister()
// Blocking call to a service with a specified timeout
std::pair<bool, SServiceResponse> CServiceClientIDImpl::CallMethodWithTimeout(
const Registration::SEntityId& entity_id_, SClient& client_, const std::string& method_name_,
const std::string& request_, std::chrono::nanoseconds timeout_)
{
if (!m_service_name.empty() && g_registration_provider() != nullptr)
if (method_name_.empty())
return { false, SServiceResponse() };

// Serialize the request
auto request_shared_ptr = SerializeRequest(method_name_, request_);
if (!request_shared_ptr)
return { false, SServiceResponse() };

// Prepare response data
auto response_data = PrepareInitialResponse(client_, method_name_);
auto response_callback = CreateResponseCallback(response_data);

// Send the service call
const bool call_success = client_.client_session->async_call_service(request_shared_ptr, response_callback);
if (!call_success)
return { false, CreateErrorResponse(entity_id_, m_service_name, method_name_, "Call failed") };

// Wait for the response or timeout
std::unique_lock<std::mutex> lock(*response_data->mutex);
if (timeout_ > std::chrono::nanoseconds::zero())
{
g_registration_provider()->UnregisterSample(GetUnregistrationSample());
if (!response_data->condition_variable->wait_for(lock, timeout_, [&] { return *response_data->finished; }))
{
response_data->response->first = false;
response_data->response->second.error_msg = "Timeout";
response_data->response->second.call_state = eCallState::call_state_timeouted;
}
}
else
{
response_data->condition_variable->wait(lock, [&] { return *response_data->finished; });
}

// Increment method call count
IncrementMethodCallCount(method_name_);

return *response_data->response;
}

// Updates the connection states for the client sessions
Expand All @@ -515,9 +444,9 @@ namespace eCAL
auto state = client_data.client_session->get_state();

Registration::SEntityId entity_id;
entity_id.entity_id = client_data.service_attr.sid;
entity_id.entity_id = client_data.service_attr.sid;
entity_id.process_id = client_data.service_attr.pid;
entity_id.host_name = client_data.service_attr.hname;
entity_id.host_name = client_data.service_attr.hname;

if (!client_data.connected && state == eCAL::service::State::CONNECTED)
{
Expand All @@ -538,6 +467,13 @@ namespace eCAL
}
}

void CServiceClientIDImpl::IncrementMethodCallCount(const std::string& method_name_)
{
const std::lock_guard<std::mutex> lock(m_method_information_map_sync);
m_method_call_count_map[method_name_]++;
}


// Helper function to notify event callback
void CServiceClientIDImpl::NotifyEventCallback(const Registration::SEntityId& entity_id_, eCAL_Client_Event event_type_, const SServiceAttr& service_attr_)
{
Expand All @@ -547,14 +483,51 @@ namespace eCAL
SClientEventCallbackData callback_data;
callback_data.type = event_type_;
callback_data.time = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now().time_since_epoch()).count();
std::chrono::steady_clock::now().time_since_epoch()).count();
callback_data.attr = service_attr_;

Registration::SServiceId service_id;
service_id.service_id = entity_id_;
service_id.service_id = entity_id_;
service_id.service_name = m_service_name;
service_id.method_name = "";
service_id.method_name = "";

m_event_callback(service_id, &callback_data);
}

std::shared_ptr<CServiceClientIDImpl::SResponseData> CServiceClientIDImpl::PrepareInitialResponse(SClient& client_, const std::string& method_name_)
{
auto data = std::make_shared<SResponseData>();
data->response->first = false;
data->response->second.host_name = client_.service_attr.hname;
data->response->second.service_name = client_.service_attr.sname;
data->response->second.service_id = client_.service_attr.key;
data->response->second.method_name = method_name_;
data->response->second.call_state = eCallState::call_state_none;
return data;
}

eCAL::service::ClientResponseCallbackT CServiceClientIDImpl::CreateResponseCallback(const std::shared_ptr<SResponseData>& response_data_)
{
return [response_data_](const eCAL::service::Error& error, const std::shared_ptr<std::string>& response_)
{
const std::lock_guard<std::mutex> lock(*response_data_->mutex);
if (!*response_data_->block_modifying_response)
{
if (error)
{
response_data_->response->first = false;
response_data_->response->second.error_msg = error.ToString();
response_data_->response->second.call_state = eCallState::call_state_failed;
response_data_->response->second.ret_state = 0;
}
else
{
response_data_->response->first = true;
response_data_->response->second = DeserializedResponse(*response_);
}
}
*response_data_->finished = true;
response_data_->condition_variable->notify_all();
};
}
}
Loading

0 comments on commit c204c78

Please sign in to comment.