diff --git a/ecal/core/src/service/ecal_service_client_id_impl.cpp b/ecal/core/src/service/ecal_service_client_id_impl.cpp index 327d0e5a2b..f4232538bf 100644 --- a/ecal/core/src/service/ecal_service_client_id_impl.cpp +++ b/ecal/core/src/service/ecal_service_client_id_impl.cpp @@ -28,14 +28,11 @@ #include "registration/ecal_registration_provider.h" #include "serialization/ecal_serialize_service.h" -#include #include #include #include -#include #include #include -#include namespace { @@ -124,57 +121,50 @@ namespace eCAL const std::string& service_name_, const ServiceMethodInformationMapT& method_information_map_, const ClientEventIDCallbackT& event_callback_) : m_service_name(service_name_), m_method_information_map(method_information_map_) { - InitializeMethodCallCounts(); - GenerateClientID(); - AddEventCallback(event_callback_); - Register(); - } - - // Destructor: Resets callbacks, unregisters the client, and clears data - CServiceClientIDImpl::~CServiceClientIDImpl() - { - ResetAllCallbacks(); - Unregister(); - } - - // Initializes the method call count map to track the number of calls for each method - void CServiceClientIDImpl::InitializeMethodCallCounts() - { + // initialize method call counts for (const auto& method_information_pair : m_method_information_map) { m_method_call_count_map[method_information_pair.first] = 0; } - } - // Generates a unique client ID based on the current time - void CServiceClientIDImpl::GenerateClientID() - { + // create unique client ID std::stringstream counter; counter << std::chrono::steady_clock::now().time_since_epoch().count(); m_client_id = counter.str(); - } - // Adds a callback function for a client events - bool CServiceClientIDImpl::AddEventCallback(ClientEventIDCallbackT callback_) - { - const std::lock_guard lock(m_event_callback_sync); - m_event_callback = std::move(callback_); - return true; + // add event callback + { + const std::lock_guard lock(m_event_callback_sync); + m_event_callback = event_callback_; + } + + // register client + if (!m_service_name.empty() && g_registration_provider() != nullptr) + { + g_registration_provider()->RegisterSample(GetRegistrationSample()); + } } - // Resets all callbacks and clears stored client information - void CServiceClientIDImpl::ResetAllCallbacks() + // Destructor: Resets callbacks, unregisters the client, and clears data + CServiceClientIDImpl::~CServiceClientIDImpl() { + // reset client map { const std::lock_guard lock(m_client_session_map_sync); m_client_session_map.clear(); } + + // reset event callback { const std::lock_guard lock(m_event_callback_sync); m_event_callback = nullptr; } - m_service_name.clear(); - m_client_id.clear(); + + // unregister client + if (g_registration_provider() != nullptr) + { + g_registration_provider()->UnregisterSample(GetUnregistrationSample()); + } } // Retrieve service IDs of all matching services @@ -199,10 +189,10 @@ namespace eCAL const std::string& request_, int timeout_ms_, const ResponseIDCallbackT& response_callback_) { SClient client; - if (!TryGetClient(entity_id_, client)) + if (!GetClientByEntity(entity_id_, client)) return { false, SServiceResponse() }; - auto response = CallBlocking(entity_id_, client, method_name_, request_, std::chrono::milliseconds(timeout_ms_)); + auto response = CallMethodWithTimeout(entity_id_, client, method_name_, request_, std::chrono::milliseconds(timeout_ms_)); // If a callback is provided and the call was successful, invoke the callback if (response_callback_ && response.first) @@ -219,56 +209,12 @@ namespace eCAL return response; } - // Blocking call to a service with a specified timeout - std::pair CServiceClientIDImpl::CallBlocking( - const Registration::SEntityId& entity_id_, SClient& client_, const std::string& method_name_, - const std::string& request_, std::chrono::nanoseconds timeout_) - { - if (method_name_.empty()) - return { false, SServiceResponse() }; - - // Serialize the request - auto request_shared_ptr = SerializeRequest(method_name_, request_); - if (!request_shared_ptr) - return { false, SServiceResponse() }; - - // Prepare response data - auto response_data = PrepareInitialResponse(client_, method_name_); - auto response_callback = CreateResponseCallback(response_data); - - // Send the service call - const bool call_success = client_.client_session->async_call_service(request_shared_ptr, response_callback); - if (!call_success) - return { false, CreateErrorResponse(entity_id_, m_service_name, method_name_, "Call failed") }; - - // Wait for the response or timeout - std::unique_lock lock(*response_data->mutex); - if (timeout_ > std::chrono::nanoseconds::zero()) - { - if (!response_data->condition_variable->wait_for(lock, timeout_, [&] { return *response_data->finished; })) - { - response_data->response->first = false; - response_data->response->second.error_msg = "Timeout"; - response_data->response->second.call_state = eCallState::call_state_timeouted; - } - } - else - { - response_data->condition_variable->wait(lock, [&] { return *response_data->finished; }); - } - - // Increment method call count - IncrementMethodCallCount(method_name_); - - return *response_data->response; - } - // Asynchronous call to a service with a specified timeout bool CServiceClientIDImpl::CallWithCallbackAsync(const Registration::SEntityId& entity_id_, const std::string& method_name_, const std::string& request_, const ResponseIDCallbackT& response_callback_) { // Retrieve the client SClient client; - if (!TryGetClient(entity_id_, client)) + if (!GetClientByEntity(entity_id_, client)) return false; // Validate service and method names @@ -323,61 +269,6 @@ namespace eCAL return true; } - // Attempts to retrieve a client session for a given entity ID - bool CServiceClientIDImpl::TryGetClient(const Registration::SEntityId& entity_id_, SClient& client_) - { - const std::lock_guard lock(m_client_session_map_sync); - auto iter = m_client_session_map.find(entity_id_); - if (iter == m_client_session_map.end()) - return false; - - client_ = iter->second; - return true; - } - - std::shared_ptr CServiceClientIDImpl::PrepareInitialResponse(SClient& client_, const std::string& method_name_) - { - auto data = std::make_shared(); - data->response->first = false; - data->response->second.host_name = client_.service_attr.hname; - data->response->second.service_name = client_.service_attr.sname; - data->response->second.service_id = client_.service_attr.key; - data->response->second.method_name = method_name_; - data->response->second.call_state = eCallState::call_state_none; - return data; - } - - eCAL::service::ClientResponseCallbackT CServiceClientIDImpl::CreateResponseCallback(const std::shared_ptr& response_data_) - { - return [response_data_](const eCAL::service::Error& error, const std::shared_ptr& response_) - { - const std::lock_guard lock(*response_data_->mutex); - if (!*response_data_->block_modifying_response) - { - if (error) - { - response_data_->response->first = false; - response_data_->response->second.error_msg = error.ToString(); - response_data_->response->second.call_state = eCallState::call_state_failed; - response_data_->response->second.ret_state = 0; - } - else - { - response_data_->response->first = true; - response_data_->response->second = DeserializedResponse(*response_); - } - } - *response_data_->finished = true; - response_data_->condition_variable->notify_all(); - }; - } - - void CServiceClientIDImpl::IncrementMethodCallCount(const std::string& method_name_) - { - const std::lock_guard lock(m_method_information_map_sync); - m_method_call_count_map[method_name_]++; - } - // Check if a specific service is connected bool CServiceClientIDImpl::IsConnected(const Registration::SEntityId& entity_id_) { @@ -398,7 +289,7 @@ namespace eCAL if (client_manager == nullptr || client_manager->is_stopped()) return; // Event callback (unused) - const eCAL::service::ClientSession::EventCallbackT event_callback = [] (eCAL::service::ClientEventType /*event*/, const std::string& /*message*/) -> void + const eCAL::service::ClientSession::EventCallbackT event_callback = [](eCAL::service::ClientEventType /*event*/, const std::string& /*message*/) -> void { // TODO: Replace current connect/disconnect state logic with this client event callback logic }; @@ -486,22 +377,60 @@ namespace eCAL return ecal_reg_sample; } - // Register client information - void CServiceClientIDImpl::Register() + // Attempts to retrieve a client session for a given entity ID + bool CServiceClientIDImpl::GetClientByEntity(const Registration::SEntityId& entity_id_, SClient& client_) { - if (!m_service_name.empty() && g_registration_provider() != nullptr) - { - g_registration_provider()->RegisterSample(GetRegistrationSample()); - } + const std::lock_guard lock(m_client_session_map_sync); + auto iter = m_client_session_map.find(entity_id_); + if (iter == m_client_session_map.end()) + return false; + + client_ = iter->second; + return true; } - // Unregister client information - void CServiceClientIDImpl::Unregister() + // Blocking call to a service with a specified timeout + std::pair CServiceClientIDImpl::CallMethodWithTimeout( + const Registration::SEntityId& entity_id_, SClient& client_, const std::string& method_name_, + const std::string& request_, std::chrono::nanoseconds timeout_) { - if (!m_service_name.empty() && g_registration_provider() != nullptr) + if (method_name_.empty()) + return { false, SServiceResponse() }; + + // Serialize the request + auto request_shared_ptr = SerializeRequest(method_name_, request_); + if (!request_shared_ptr) + return { false, SServiceResponse() }; + + // Prepare response data + auto response_data = PrepareInitialResponse(client_, method_name_); + auto response_callback = CreateResponseCallback(response_data); + + // Send the service call + const bool call_success = client_.client_session->async_call_service(request_shared_ptr, response_callback); + if (!call_success) + return { false, CreateErrorResponse(entity_id_, m_service_name, method_name_, "Call failed") }; + + // Wait for the response or timeout + std::unique_lock lock(*response_data->mutex); + if (timeout_ > std::chrono::nanoseconds::zero()) { - g_registration_provider()->UnregisterSample(GetUnregistrationSample()); + if (!response_data->condition_variable->wait_for(lock, timeout_, [&] { return *response_data->finished; })) + { + response_data->response->first = false; + response_data->response->second.error_msg = "Timeout"; + response_data->response->second.call_state = eCallState::call_state_timeouted; + } } + else + { + response_data->condition_variable->wait(lock, [&] { return *response_data->finished; }); + } + + // Increment method call count + IncrementMethodCallCount(method_name_); + + return *response_data->response; } // Updates the connection states for the client sessions @@ -515,9 +444,9 @@ namespace eCAL auto state = client_data.client_session->get_state(); Registration::SEntityId entity_id; - entity_id.entity_id = client_data.service_attr.sid; + entity_id.entity_id = client_data.service_attr.sid; entity_id.process_id = client_data.service_attr.pid; - entity_id.host_name = client_data.service_attr.hname; + entity_id.host_name = client_data.service_attr.hname; if (!client_data.connected && state == eCAL::service::State::CONNECTED) { @@ -538,6 +467,13 @@ namespace eCAL } } + void CServiceClientIDImpl::IncrementMethodCallCount(const std::string& method_name_) + { + const std::lock_guard lock(m_method_information_map_sync); + m_method_call_count_map[method_name_]++; + } + + // Helper function to notify event callback void CServiceClientIDImpl::NotifyEventCallback(const Registration::SEntityId& entity_id_, eCAL_Client_Event event_type_, const SServiceAttr& service_attr_) { @@ -547,14 +483,51 @@ namespace eCAL SClientEventCallbackData callback_data; callback_data.type = event_type_; callback_data.time = std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()).count(); + std::chrono::steady_clock::now().time_since_epoch()).count(); callback_data.attr = service_attr_; Registration::SServiceId service_id; - service_id.service_id = entity_id_; + service_id.service_id = entity_id_; service_id.service_name = m_service_name; - service_id.method_name = ""; + service_id.method_name = ""; m_event_callback(service_id, &callback_data); } + + std::shared_ptr CServiceClientIDImpl::PrepareInitialResponse(SClient& client_, const std::string& method_name_) + { + auto data = std::make_shared(); + data->response->first = false; + data->response->second.host_name = client_.service_attr.hname; + data->response->second.service_name = client_.service_attr.sname; + data->response->second.service_id = client_.service_attr.key; + data->response->second.method_name = method_name_; + data->response->second.call_state = eCallState::call_state_none; + return data; + } + + eCAL::service::ClientResponseCallbackT CServiceClientIDImpl::CreateResponseCallback(const std::shared_ptr& response_data_) + { + return [response_data_](const eCAL::service::Error& error, const std::shared_ptr& response_) + { + const std::lock_guard lock(*response_data_->mutex); + if (!*response_data_->block_modifying_response) + { + if (error) + { + response_data_->response->first = false; + response_data_->response->second.error_msg = error.ToString(); + response_data_->response->second.call_state = eCallState::call_state_failed; + response_data_->response->second.ret_state = 0; + } + else + { + response_data_->response->first = true; + response_data_->response->second = DeserializedResponse(*response_); + } + } + *response_data_->finished = true; + response_data_->condition_variable->notify_all(); + }; + } } diff --git a/ecal/core/src/service/ecal_service_client_id_impl.h b/ecal/core/src/service/ecal_service_client_id_impl.h index 0cab312fee..6aca912d80 100644 --- a/ecal/core/src/service/ecal_service_client_id_impl.h +++ b/ecal/core/src/service/ecal_service_client_id_impl.h @@ -34,6 +34,7 @@ #include #include #include +#include namespace eCAL { @@ -57,7 +58,8 @@ namespace eCAL // Retrieve service IDs of all matching services std::vector GetServiceIDs(); - // Blocking call to a specific service; calls callback if provided and returns response as pair + // Blocking call to a specific service; returns response as pair + // if a callback is provided call the callback as well std::pair CallWithCallback( const Registration::SEntityId& entity_id_, const std::string& method_name_, const std::string& request_, int timeout_ms_, const ResponseIDCallbackT& response_callback_ = nullptr); @@ -73,7 +75,7 @@ namespace eCAL // Called by the registration receiver to process a service registration void RegisterService(const Registration::SEntityId& entity_id_, const SServiceAttr& service_); - // Called by the registration layer to get a registration sample + // Called by the registration provider to get a registration sample Registration::Sample GetRegistration(); // Prevent copy and move operations @@ -83,6 +85,10 @@ namespace eCAL CServiceClientIDImpl& operator=(CServiceClientIDImpl&&) = delete; private: + // Prepare and retrieve registration and unregistration samples + Registration::Sample GetRegistrationSample(); + Registration::Sample GetUnregistrationSample(); + // SClient struct representing a client session and its connection state struct SClient { @@ -91,6 +97,22 @@ namespace eCAL bool connected = false; }; + // Get client for specific entity id + bool GetClientByEntity(const Registration::SEntityId& entity_id_, SClient& client_); + + // Blocking call to a specific service method with timeout + std::pair CallMethodWithTimeout(const Registration::SEntityId& entity_id_, SClient& client_, + const std::string& method_name_, const std::string& request_, std::chrono::nanoseconds timeout_); + + // Update the connection states for client sessions + void UpdateConnectionStates(); + + // Increment method call count for tracking + void IncrementMethodCallCount(const std::string& method_name_); + + // Notify specific event callback + void NotifyEventCallback(const Registration::SEntityId& entity_id_, eCAL_Client_Event event_type_, const SServiceAttr& service_attr_); + // SResponseData struct for handling response callbacks struct SResponseData { @@ -109,46 +131,11 @@ namespace eCAL {} }; - // Initializes the method call count map - void InitializeMethodCallCounts(); - - // Generates a unique client ID - void GenerateClientID(); - - // Add and remove callback function for client events - bool AddEventCallback(ClientEventIDCallbackT callback_); - - // Resets all callbacks and clears stored client information - void ResetAllCallbacks(); - - // Blocking call to a specific service with timeout - std::pair CallBlocking(const Registration::SEntityId& entity_id_, SClient& client_, - const std::string& method_name_, const std::string& request_, std::chrono::nanoseconds timeout_); - - // Prepare and retrieve registration and unregistration samples - Registration::Sample GetRegistrationSample(); - Registration::Sample GetUnregistrationSample(); - - // Register and unregister client information - void Register(); - void Unregister(); - - // Update the connection states for client sessions - void UpdateConnectionStates(); - - // Increment method call count for tracking - void IncrementMethodCallCount(const std::string& method_name_); - - // Helper methods for client session handling and request serialization - bool TryGetClient(const Registration::SEntityId& entity_id_, SClient& client_); static std::shared_ptr PrepareInitialResponse(SClient& client_, const std::string& method_name_); static eCAL::service::ClientResponseCallbackT CreateResponseCallback(const std::shared_ptr& response_data_); - // Notify specific event callback - void NotifyEventCallback(const Registration::SEntityId& entity_id_, eCAL_Client_Event event_type_, const SServiceAttr& service_attr_); - // Client version (incremented for protocol or functionality changes) - static constexpr int m_client_version = 1; + static constexpr int m_client_version = 1; // Service attributes std::string m_service_name;