diff --git a/ecal/core/src/pubsub/ecal_pubgate.cpp b/ecal/core/src/pubsub/ecal_pubgate.cpp index 3c696ea8bf..84e92f1d98 100644 --- a/ecal/core/src/pubsub/ecal_pubgate.cpp +++ b/ecal/core/src/pubsub/ecal_pubgate.cpp @@ -194,16 +194,15 @@ namespace eCAL } } - void CPubGate::RefreshRegistrations() + void CPubGate::GetRegistrations(Registration::SampleList& reg_sample_list_) { if (!m_created) return; - // refresh publisher registrations + // read reader registrations const std::shared_lock lock(m_topic_name_datawriter_sync); for (const auto& iter : m_topic_name_datawriter_map) { - // force data writer to (re)register itself on registration provider - iter.second->RefreshRegistration(); + reg_sample_list_.samples.emplace_back(iter.second->GetRegistration()); } } } diff --git a/ecal/core/src/pubsub/ecal_pubgate.h b/ecal/core/src/pubsub/ecal_pubgate.h index 2714bbeca0..0db416cc06 100644 --- a/ecal/core/src/pubsub/ecal_pubgate.h +++ b/ecal/core/src/pubsub/ecal_pubgate.h @@ -56,7 +56,7 @@ namespace eCAL void ApplySubRegistration(const Registration::Sample& ecal_sample_); void ApplySubUnregistration(const Registration::Sample& ecal_sample_); - void RefreshRegistrations(); + void GetRegistrations(Registration::SampleList& reg_sample_list_); protected: static std::atomic m_created; diff --git a/ecal/core/src/pubsub/ecal_subgate.cpp b/ecal/core/src/pubsub/ecal_subgate.cpp index aabe5af905..61c254a37c 100644 --- a/ecal/core/src/pubsub/ecal_subgate.cpp +++ b/ecal/core/src/pubsub/ecal_subgate.cpp @@ -300,16 +300,15 @@ namespace eCAL } } - void CSubGate::RefreshRegistrations() + void CSubGate::GetRegistrations(Registration::SampleList& reg_sample_list_) { if (!m_created) return; - // refresh reader registrations + // read reader registrations const std::shared_lock lock(m_topic_name_datareader_sync); for (const auto& iter : m_topic_name_datareader_map) { - // force data reader to (re)register itself on registration provider - iter.second->RefreshRegistration(); + reg_sample_list_.samples.emplace_back(iter.second->GetRegistration()); } } } diff --git a/ecal/core/src/pubsub/ecal_subgate.h b/ecal/core/src/pubsub/ecal_subgate.h index 1e78578a71..cc1b589c76 100644 --- a/ecal/core/src/pubsub/ecal_subgate.h +++ b/ecal/core/src/pubsub/ecal_subgate.h @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -54,7 +54,7 @@ namespace eCAL void ApplyPubRegistration(const Registration::Sample& ecal_sample_); void ApplyPubUnregistration(const Registration::Sample& ecal_sample_); - void RefreshRegistrations(); + void GetRegistrations(Registration::SampleList& reg_sample_list_); protected: static std::atomic m_created; diff --git a/ecal/core/src/readwrite/ecal_reader.cpp b/ecal/core/src/readwrite/ecal_reader.cpp index cb19b57201..a2a450206d 100644 --- a/ecal/core/src/readwrite/ecal_reader.cpp +++ b/ecal/core/src/readwrite/ecal_reader.cpp @@ -351,28 +351,6 @@ namespace eCAL } } - void CDataReader::RefreshRegistration() - { - if (!m_created) return; - - // ensure that registration is not called within zero nanoseconds - // normally it will be called from registration logic every second - - // register without send - Register(false); - - // check connection timeouts - { - const std::lock_guard lock(m_pub_map_mtx); - m_pub_map.erase_expired(); - - if (m_pub_map.empty()) - { - FireDisconnectEvent(); - } - } - } - void CDataReader::InitializeLayers() { // initialize udp layer @@ -555,15 +533,56 @@ namespace eCAL return(out.str()); } - bool CDataReader::Register(const bool force_) + void CDataReader::Register(bool force_) + { +#if ECAL_CORE_REGISTRATION + if (g_registration_provider() != nullptr) g_registration_provider()->ApplySample(GetRegistrationSample(), force_); + +#ifndef NDEBUG + // log it + Logging::Log(log_level_debug4, m_topic_name + "::CDataReader::Register"); +#endif +#endif // ECAL_CORE_REGISTRATION + } + + void CDataReader::Unregister() { #if ECAL_CORE_REGISTRATION - if (!m_created) return(false); - if(m_topic_name.empty()) return(false); + if (g_registration_provider() != nullptr) g_registration_provider()->ApplySample(GetUnregistrationSample(), false); + +#ifndef NDEBUG + // log it + Logging::Log(log_level_debug4, m_topic_name + "::CDataReader::Unregister"); +#endif +#endif // ECAL_CORE_REGISTRATION + } + + void CDataReader::CheckConnections() + { + const std::lock_guard lock(m_pub_map_mtx); + m_pub_map.erase_expired(); - // create command parameter + if (m_pub_map.empty()) + { + FireDisconnectEvent(); + } + } + + Registration::Sample CDataReader::GetRegistration() + { + // check connection timeouts + CheckConnections(); + + // return registration + return GetRegistrationSample(); + } + + Registration::Sample CDataReader::GetRegistrationSample() + { + // create registration sample Registration::Sample ecal_reg_sample; ecal_reg_sample.cmd_type = bct_reg_subscriber; + auto& ecal_reg_sample_topic = ecal_reg_sample.topic; ecal_reg_sample_topic.hname = m_host_name; ecal_reg_sample_topic.hgname = m_host_group_name; @@ -632,28 +651,15 @@ namespace eCAL ecal_reg_sample_topic.connections_loc = 0; ecal_reg_sample_topic.connections_ext = 0; - // register subscriber - if(g_registration_provider() != nullptr) g_registration_provider()->ApplySample(ecal_reg_sample, force_); -#ifndef NDEBUG - // log it - Logging::Log(log_level_debug4, m_topic_name + "::CDataReader::DoRegister"); -#endif - - return(true); -#else // ECAL_CORE_REGISTRATION - (void)force_; - return(false); -#endif // ECAL_CORE_REGISTRATION + return ecal_reg_sample; } - bool CDataReader::Unregister() + Registration::Sample CDataReader::GetUnregistrationSample() { -#if ECAL_CORE_REGISTRATION - if (m_topic_name.empty()) return(false); - - // create command parameter + // create unregistration sample Registration::Sample ecal_unreg_sample; ecal_unreg_sample.cmd_type = bct_unreg_subscriber; + auto& ecal_reg_sample_topic = ecal_unreg_sample.topic; ecal_reg_sample_topic.hname = m_host_name; ecal_reg_sample_topic.hgname = m_host_group_name; @@ -663,19 +669,9 @@ namespace eCAL ecal_reg_sample_topic.tid = m_topic_id; ecal_reg_sample_topic.uname = Process::GetUnitName(); - // unregister subscriber - if (g_registration_provider() != nullptr) g_registration_provider()->ApplySample(ecal_unreg_sample, false); -#ifndef NDEBUG - // log it - Logging::Log(log_level_debug4, m_topic_name + "::CDataReader::Unregister"); -#endif - - return(true); -#else // ECAL_CORE_REGISTRATION - return(false); -#endif // ECAL_CORE_REGISTRATION + return ecal_unreg_sample; } - + void CDataReader::StartTransportLayer() { #if ECAL_CORE_TRANSPORT_UDP diff --git a/ecal/core/src/readwrite/ecal_reader.h b/ecal/core/src/readwrite/ecal_reader.h index 13f2a50291..5716610dd6 100644 --- a/ecal/core/src/readwrite/ecal_reader.h +++ b/ecal/core/src/readwrite/ecal_reader.h @@ -100,8 +100,7 @@ namespace eCAL void ApplyLayerParameter(const SPublicationInfo& publication_info_, eTLayerType type_, const Registration::ConnectionPar& parameter_); - void RefreshRegistration(); - + Registration::Sample GetRegistration(); bool IsCreated() const { return(m_created); } bool IsPublished() const @@ -126,8 +125,13 @@ namespace eCAL std::string Dump(const std::string& indent_ = ""); protected: - bool Register(bool force_); - bool Unregister(); + void Register(bool force_); + void Unregister(); + + void CheckConnections(); + + Registration::Sample GetRegistrationSample(); + Registration::Sample GetUnregistrationSample(); void StartTransportLayer(); void StopTransportLayer(); diff --git a/ecal/core/src/readwrite/ecal_writer.cpp b/ecal/core/src/readwrite/ecal_writer.cpp index 24b81d5aed..c2616ec054 100644 --- a/ecal/core/src/readwrite/ecal_writer.cpp +++ b/ecal/core/src/readwrite/ecal_writer.cpp @@ -547,25 +547,6 @@ namespace eCAL #endif } - void CDataWriter::RefreshRegistration() - { - if (!m_created) return; - - // register without send - Register(false); - - // check connection timeouts - { - const std::lock_guard lock(m_sub_map_mtx); - m_sub_map.erase_expired(); - - if (m_sub_map.empty()) - { - FireDisconnectEvent(); - } - } - } - void CDataWriter::RefreshSendCounter() { // increase write clock @@ -605,13 +586,53 @@ namespace eCAL return(out.str()); } - bool CDataWriter::Register(bool force_) + void CDataWriter::Register(bool force_) + { +#if ECAL_CORE_REGISTRATION + if (g_registration_provider() != nullptr) g_registration_provider()->ApplySample(GetRegistrationSample(), force_); + +#ifndef NDEBUG + // log it + Logging::Log(log_level_debug4, m_topic_name + "::CDataWriter::Register"); +#endif +#endif // ECAL_CORE_REGISTRATION + } + + void CDataWriter::Unregister() { #if ECAL_CORE_REGISTRATION - if (!m_created) return(false); - if (m_topic_name.empty()) return(false); + if (g_registration_provider() != nullptr) g_registration_provider()->ApplySample(GetUnregistrationSample(), false); - // create command parameter +#ifndef NDEBUG + // log it + Logging::Log(log_level_debug4, m_topic_name + "::CDataWriter::Unregister"); +#endif +#endif // ECAL_CORE_REGISTRATION + } + + void CDataWriter::CheckConnections() + { + const std::lock_guard lock(m_sub_map_mtx); + m_sub_map.erase_expired(); + + if (m_sub_map.empty()) + { + FireDisconnectEvent(); + } + } + + Registration::Sample CDataWriter::GetRegistration() + { + // check connection timeouts + CheckConnections(); + + if (m_created) return GetRegistrationSample(); + else return GetUnregistrationSample(); + } + + Registration::Sample CDataWriter::GetRegistrationSample() + { + // create registration sample Registration::Sample ecal_reg_sample; ecal_reg_sample.cmd_type = bct_reg_publisher; @@ -701,27 +722,12 @@ namespace eCAL ecal_reg_sample_topic.connections_loc = static_cast(loc_connections); ecal_reg_sample_topic.connections_ext = static_cast(ext_connections); - // register publisher - if (g_registration_provider() != nullptr) g_registration_provider()->ApplySample(ecal_reg_sample, force_); - -#ifndef NDEBUG - // log it - Logging::Log(log_level_debug4, m_topic_name + "::CDataWriter::Register"); -#endif - - return(true); -#else // ECAL_CORE_REGISTRATION -(void)force_; -return(false); -#endif // ECAL_CORE_REGISTRATION + return ecal_reg_sample; } - bool CDataWriter::Unregister() + Registration::Sample CDataWriter::GetUnregistrationSample() { -#if ECAL_CORE_REGISTRATION - if (m_topic_name.empty()) return(false); - - // create command parameter + // create unregistration sample Registration::Sample ecal_unreg_sample; ecal_unreg_sample.cmd_type = bct_unreg_publisher; @@ -734,18 +740,7 @@ return(false); ecal_reg_sample_topic.tid = m_topic_id; ecal_reg_sample_topic.uname = Process::GetUnitName(); - // unregister publisher - if (g_registration_provider() != nullptr) g_registration_provider()->ApplySample(ecal_unreg_sample, false); - -#ifndef NDEBUG - // log it - Logging::Log(log_level_debug4, m_topic_name + "::CDataWriter::UnRegister"); -#endif - - return(true); -#else // ECAL_CORE_REGISTRATION - return(false); -#endif // ECAL_CORE_REGISTRATION + return ecal_unreg_sample; } void CDataWriter::FireConnectEvent(const std::string& tid_, const SDataTypeInformation& tinfo_) diff --git a/ecal/core/src/readwrite/ecal_writer.h b/ecal/core/src/readwrite/ecal_writer.h index e473490633..c1108de17e 100644 --- a/ecal/core/src/readwrite/ecal_writer.h +++ b/ecal/core/src/readwrite/ecal_writer.h @@ -29,6 +29,7 @@ #include #include +#include "serialization/ecal_serialize_sample_registration.h" #include "util/ecal_expmap.h" #include "util/frequency_calculator.h" @@ -104,7 +105,7 @@ namespace eCAL void ApplySubscription(const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& data_type_info_, const SLayerStates& sub_layer_states_, const std::string& reader_par_); void RemoveSubscription(const SSubscriptionInfo& subscription_info_); - void RefreshRegistration(); + Registration::Sample GetRegistration(); void RefreshSendCounter(); bool IsCreated() const { return(m_created); } @@ -127,8 +128,13 @@ namespace eCAL std::string Dump(const std::string& indent_ = ""); protected: - bool Register(bool force_); - bool Unregister(); + void Register(bool force_); + void Unregister(); + + void CheckConnections(); + + Registration::Sample GetRegistrationSample(); + Registration::Sample GetUnregistrationSample(); bool StartUdpLayer(); bool StartShmLayer(); diff --git a/ecal/core/src/registration/ecal_registration_provider.cpp b/ecal/core/src/registration/ecal_registration_provider.cpp index 93d3916313..49f75c6041 100644 --- a/ecal/core/src/registration/ecal_registration_provider.cpp +++ b/ecal/core/src/registration/ecal_registration_provider.cpp @@ -96,10 +96,8 @@ namespace eCAL // stop cyclic registration thread m_reg_sample_snd_thread->stop(); - // add process unregistration sample - AddSample2SampleList(Registration::GetProcessUnregisterSample()); - - SendSampleList(); + // send process unregistration sample + SendSample(Registration::GetProcessUnregisterSample()); m_reg_sender.reset(); @@ -119,21 +117,15 @@ namespace eCAL } } - // update sample list - AddSample2SampleList(sample_); - - // if registration is forced if (force_) { - SendSampleList(); - - // send single registration sample over udp - //SendSample2UDP(sample_); - -#if ECAL_CORE_REGISTRATION_SHM - // broadcast (updated) sample list over shm - //SendSampleList2SHM(); -#endif + // send sample + SendSample(sample_); + } + else + { + // add sample to sample list and send it later + AddSample2SampleList(sample_); } return(true); @@ -161,46 +153,49 @@ namespace eCAL m_sample_list.samples.push_back(sample_); } - void CRegistrationProvider::ClearSampleList() - { - // lock sample list - const std::lock_guard lock(m_sample_list_mtx); - // clear sample list - m_sample_list.samples.clear(); - } - - void CRegistrationProvider::SendSampleList() + void CRegistrationProvider::SendSample(const Registration::Sample& sample_) { - std::lock_guard lock(m_sample_list_mtx); - m_reg_sender->SendSampleList(m_sample_list); + Registration::SampleList sample_list; + sample_list.samples.push_back(sample_); + m_reg_sender->SendSampleList(sample_list); } void CRegistrationProvider::RegisterSendThread() { + // collect all registrations and send them out + // the internal list already contain elements here: + // one process registration sample + // one or more registration/unregistration samples added by AddSample2SampleList + { + // lock sample list + std::lock_guard lock(m_sample_list_mtx); + #if ECAL_CORE_SUBSCRIBER - // refresh subscriber registration - if (g_subgate() != nullptr) g_subgate()->RefreshRegistrations(); + // add subscriber registrations + if (g_subgate() != nullptr) g_subgate()->GetRegistrations(m_sample_list); #endif #if ECAL_CORE_PUBLISHER - // refresh publisher registration - if (g_pubgate() != nullptr) g_pubgate()->RefreshRegistrations(); + // add publisher registrations + if (g_pubgate() != nullptr) g_pubgate()->GetRegistrations(m_sample_list); #endif #if ECAL_CORE_SERVICE - // refresh server registration - if (g_servicegate() != nullptr) g_servicegate()->RefreshRegistrations(); + // add server registrations + if (g_servicegate() != nullptr) g_servicegate()->GetRegistrations(m_sample_list); - // refresh client registration - if (g_clientgate() != nullptr) g_clientgate()->RefreshRegistrations(); + // add client registrations + if (g_clientgate() != nullptr) g_clientgate()->GetRegistrations(m_sample_list); #endif - SendSampleList(); + // send registration sample list + m_reg_sender->SendSampleList(m_sample_list); - // clear registration sample list - ClearSampleList(); + // clear it + m_sample_list.samples.clear(); - // add process registration sample to internal sample list as first sample (for next registration loop) - AddSample2SampleList(Registration::GetProcessRegisterSample()); + // and add process registration sample to internal sample list as first sample (for next registration loop) + m_sample_list.samples.push_back(Registration::GetProcessRegisterSample()); + } } } diff --git a/ecal/core/src/registration/ecal_registration_provider.h b/ecal/core/src/registration/ecal_registration_provider.h index 422e65277e..587be88dec 100644 --- a/ecal/core/src/registration/ecal_registration_provider.h +++ b/ecal/core/src/registration/ecal_registration_provider.h @@ -58,22 +58,21 @@ namespace eCAL protected: void AddSample2SampleList(const Registration::Sample& sample_); - void ClearSampleList(); - void SendSampleList(); + void SendSample(const Registration::Sample& sample_); void RegisterSendThread(); - static std::atomic m_created; + static std::atomic m_created; - std::unique_ptr m_reg_sender; + std::unique_ptr m_reg_sender; - std::shared_ptr m_reg_sample_snd_thread; + std::shared_ptr m_reg_sample_snd_thread; - std::mutex m_sample_list_mtx; - Registration::SampleList m_sample_list; + std::mutex m_sample_list_mtx; + Registration::SampleList m_sample_list; - bool m_use_registration_udp; - bool m_use_registration_shm; + bool m_use_registration_udp; + bool m_use_registration_shm; std::mutex m_callback_custom_apply_sample_map_mtx; std::map m_callback_custom_apply_sample_map; diff --git a/ecal/core/src/service/ecal_clientgate.cpp b/ecal/core/src/service/ecal_clientgate.cpp index 47f7402f6c..698d96d457 100644 --- a/ecal/core/src/service/ecal_clientgate.cpp +++ b/ecal/core/src/service/ecal_clientgate.cpp @@ -156,16 +156,15 @@ namespace eCAL return(ret_vec); } - void CClientGate::RefreshRegistrations() + void CClientGate::GetRegistrations(Registration::SampleList& reg_sample_list_) { if (!m_created) return; - // refresh client registrations - const std::shared_lock lock(m_client_set_sync); - for (auto *iter : m_client_set) + // read service registrations + std::shared_lock const lock(m_client_set_sync); + for (const auto& service_client_impl : m_client_set) { - // force client to (re)register itself on registration provider - iter->RefreshRegistration(); + reg_sample_list_.samples.emplace_back(service_client_impl->GetRegistration()); } } } diff --git a/ecal/core/src/service/ecal_clientgate.h b/ecal/core/src/service/ecal_clientgate.h index d3930051fe..910c0c0b77 100644 --- a/ecal/core/src/service/ecal_clientgate.h +++ b/ecal/core/src/service/ecal_clientgate.h @@ -55,7 +55,7 @@ namespace eCAL std::vector GetServiceAttr(const std::string& service_name_); - void RefreshRegistrations(); + void GetRegistrations(Registration::SampleList& reg_sample_list_); protected: static std::atomic m_created; diff --git a/ecal/core/src/service/ecal_service_client_impl.cpp b/ecal/core/src/service/ecal_service_client_impl.cpp index 90e2c37579..e0da287045 100644 --- a/ecal/core/src/service/ecal_service_client_impl.cpp +++ b/ecal/core/src/service/ecal_service_client_impl.cpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -427,10 +427,15 @@ namespace eCAL } // called by eCAL:CClientGate every second to update registration layer - void CServiceClientImpl::RefreshRegistration() + Registration::Sample CServiceClientImpl::GetRegistration() { - if (!m_created) return; - Register(false); + // refresh connected services map + CheckForNewServices(); + + // check for disconnected services + CheckForDisconnectedServices(); + + return GetRegistrationSample(); } std::shared_ptr>> @@ -629,14 +634,12 @@ namespace eCAL response_.response = std::string(response_struct_.response.data(), response_struct_.response.size()); } - void CServiceClientImpl::Register(const bool force_) + Registration::Sample CServiceClientImpl::GetRegistrationSample() { - if (!m_created) return; - if (m_service_name.empty()) return; + Registration::Sample ecal_reg_sample; + ecal_reg_sample.cmd_type = bct_reg_client; - Registration::Sample sample; - sample.cmd_type = bct_reg_client; - auto& service_client = sample.client; + auto& service_client = ecal_reg_sample.client; service_client.version = m_client_version; service_client.hname = Process::GetHostName(); service_client.pname = Process::GetProcessName(); @@ -645,7 +648,6 @@ namespace eCAL service_client.sname = m_service_name; service_client.sid = m_service_id; - { const std::lock_guard lock(m_method_sync); @@ -655,61 +657,25 @@ namespace eCAL const auto& method_information = method_information_pair.second; Service::Method method; - method.mname = method_name; - method.req_type = method_information.request_type.name; - method.req_desc = method_information.request_type.descriptor; - method.resp_type = method_information.response_type.name; - method.resp_desc = method_information.response_type.descriptor; + method.mname = method_name; + method.req_type = method_information.request_type.name; + method.req_desc = method_information.request_type.descriptor; + method.resp_type = method_information.response_type.name; + method.resp_desc = method_information.response_type.descriptor; method.call_count = m_method_call_count_map.at(method_name); service_client.methods.push_back(method); } } - // register entity - if (g_registration_provider() != nullptr) g_registration_provider()->ApplySample(sample, force_); - - // refresh connected services map - CheckForNewServices(); - - // check for disconnected services - { - std::lock_guard const lock(m_client_map_sync); - for (auto& client : m_client_map) - { - if (client.second->get_state() == eCAL::service::State::FAILED) - { - std::string const service_key = client.first; - - // is the service still in the connecting map ? - auto iter = m_connected_services_map.find(service_key); - if (iter != m_connected_services_map.end()) - { - // call disconnect event - std::lock_guard const lock_cb(m_event_callback_map_sync); - auto e_iter = m_event_callback_map.find(client_event_disconnected); - if (e_iter != m_event_callback_map.end()) - { - SClientEventCallbackData sdata; - sdata.type = client_event_disconnected; - sdata.time = std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); - sdata.attr = iter->second; - (e_iter->second)(m_service_name.c_str(), &sdata); - } - // remove service - m_connected_services_map.erase(iter); - } - } - } - } + return ecal_reg_sample; } - void CServiceClientImpl::Unregister() + Registration::Sample CServiceClientImpl::GetUnregistrationSample() { - if (m_service_name.empty()) return; + Registration::Sample ecal_reg_sample; + ecal_reg_sample.cmd_type = bct_unreg_client; - Registration::Sample sample; - sample.cmd_type = bct_unreg_client; - auto& service_client = sample.client; + auto& service_client = ecal_reg_sample.client; service_client.hname = Process::GetHostName(); service_client.pname = Process::GetProcessName(); service_client.uname = Process::GetUnitName(); @@ -718,9 +684,24 @@ namespace eCAL service_client.sid = m_service_id; service_client.version = m_client_version; + return ecal_reg_sample; + } + + void CServiceClientImpl::Register(const bool force_) + { + if (!m_created) return; + if (m_service_name.empty()) return; + + // register entity + if (g_registration_provider() != nullptr) g_registration_provider()->ApplySample(GetRegistrationSample(), force_); + } + + void CServiceClientImpl::Unregister() + { + if (m_service_name.empty()) return; // unregister entity - if (g_registration_provider() != nullptr) g_registration_provider()->ApplySample(sample, false); + if (g_registration_provider() != nullptr) g_registration_provider()->ApplySample(GetUnregistrationSample(), false); } void CServiceClientImpl::CheckForNewServices() @@ -763,6 +744,37 @@ namespace eCAL } } + void CServiceClientImpl::CheckForDisconnectedServices() + { + std::lock_guard const lock(m_client_map_sync); + for (auto& client : m_client_map) + { + if (client.second->get_state() == eCAL::service::State::FAILED) + { + std::string const service_key = client.first; + + // is the service still in the connecting map ? + auto iter = m_connected_services_map.find(service_key); + if (iter != m_connected_services_map.end()) + { + // call disconnect event + std::lock_guard const lock_cb(m_event_callback_map_sync); + auto e_iter = m_event_callback_map.find(client_event_disconnected); + if (e_iter != m_event_callback_map.end()) + { + SClientEventCallbackData sdata; + sdata.type = client_event_disconnected; + sdata.time = std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); + sdata.attr = iter->second; + (e_iter->second)(m_service_name.c_str(), &sdata); + } + // remove service + m_connected_services_map.erase(iter); + } + } + } + } + void CServiceClientImpl::ErrorCallback(const std::string& method_name_, const std::string& error_message_) { std::lock_guard const lock(m_response_callback_sync); diff --git a/ecal/core/src/service/ecal_service_client_impl.h b/ecal/core/src/service/ecal_service_client_impl.h index c1368c10f6..1c75b75ab3 100644 --- a/ecal/core/src/service/ecal_service_client_impl.h +++ b/ecal/core/src/service/ecal_service_client_impl.h @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,9 +25,11 @@ #include #include #include - #include +#include "serialization/ecal_serialize_sample_registration.h" +#include "serialization/ecal_struct_service.h" + #include #include #include @@ -78,7 +80,7 @@ namespace eCAL void RegisterService(const std::string& key_, const SServiceAttr& service_); // called by eCAL:CClientGate every second to update registration layer - void RefreshRegistration(); + Registration::Sample GetRegistration(); std::string GetServiceName() { return m_service_name; }; @@ -94,10 +96,14 @@ namespace eCAL static void fromSerializedProtobuf(const std::string& response_pb_, eCAL::SServiceResponse& response_); static void fromStruct(const Service::Response& response_struct_, eCAL::SServiceResponse& response_); + Registration::Sample GetRegistrationSample(); + Registration::Sample GetUnregistrationSample(); + void Register(bool force_); void Unregister(); void CheckForNewServices(); + void CheckForDisconnectedServices(); void ErrorCallback(const std::string &method_name_, const std::string &error_message_); @@ -128,7 +134,7 @@ namespace eCAL ServiceMethodInformationMapT m_method_information_map; using MethodCallCountMapT = std::map; - MethodCallCountMapT m_method_call_count_map; + MethodCallCountMapT m_method_call_count_map; std::atomic m_created; }; diff --git a/ecal/core/src/service/ecal_service_server_impl.cpp b/ecal/core/src/service/ecal_service_server_impl.cpp index bc35617942..7aab9c31f4 100644 --- a/ecal/core/src/service/ecal_service_server_impl.cpp +++ b/ecal/core/src/service/ecal_service_server_impl.cpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -304,33 +304,32 @@ namespace eCAL } // called by the eCAL::CServiceGate to register a client - void CServiceServerImpl::RegisterClient(const std::string& /*key_*/, const SClientAttr& /*client_*/) // TODO: This function is empty, why does it exist???? + void CServiceServerImpl::RegisterClient(const std::string& /*key_*/, const SClientAttr& /*client_*/) { + // this function is just a placeholder to implement logic if a new client connects + // currently there is no need to do so } // called by eCAL:CServiceGate every second to update registration layer - void CServiceServerImpl::RefreshRegistration() + Registration::Sample CServiceServerImpl::GetRegistration() { - if (!m_created) return; - Register(false); + return GetRegistrationSample(); } - void CServiceServerImpl::Register(const bool force_) + Registration::Sample CServiceServerImpl::GetRegistrationSample() { - if (!m_created) return; - if (m_service_name.empty()) return; + // create registration sample + Registration::Sample ecal_reg_sample; + ecal_reg_sample.cmd_type = bct_reg_service; // might be zero in contruction phase unsigned short const server_tcp_port_v0(m_tcp_server_v0 ? m_tcp_server_v0->get_port() : 0); - if ((Config::IsServiceProtocolV0Enabled()) && (server_tcp_port_v0 == 0)) return; + if ((Config::IsServiceProtocolV0Enabled()) && (server_tcp_port_v0 == 0)) return ecal_reg_sample; unsigned short const server_tcp_port_v1(m_tcp_server_v1 ? m_tcp_server_v1->get_port() : 0); - if ((Config::IsServiceProtocolV1Enabled()) && (server_tcp_port_v1 == 0)) return; + if ((Config::IsServiceProtocolV1Enabled()) && (server_tcp_port_v1 == 0)) return ecal_reg_sample; - // create service registration sample - Registration::Sample sample; - sample.cmd_type = bct_reg_service; - auto& service = sample.service; + auto& service = ecal_reg_sample.service; service.version = m_server_version; service.hname = Process::GetHostName(); service.pname = Process::GetProcessName(); @@ -357,28 +356,49 @@ namespace eCAL } } - // register entity - if (g_registration_provider() != nullptr) g_registration_provider()->ApplySample(sample, force_); + return ecal_reg_sample; } - void CServiceServerImpl::Unregister() + Registration::Sample CServiceServerImpl::GetUnregistrationSample() { - if (m_service_name.empty()) return; + // create registration sample + Registration::Sample ecal_reg_sample; + ecal_reg_sample.cmd_type = bct_unreg_service; + + auto& service = ecal_reg_sample.service; + service.version = m_server_version; + service.hname = Process::GetHostName(); + service.pname = Process::GetProcessName(); + service.uname = Process::GetUnitName(); + service.pid = Process::GetProcessID(); + service.sname = m_service_name; + service.sid = m_service_id; + + return ecal_reg_sample; + } - // create service registration sample - Registration::Sample sample; - sample.cmd_type = bct_unreg_service; - auto& service = sample.service; - service.version = m_server_version; - service.hname = Process::GetHostName(); - service.pname = Process::GetProcessName(); - service.uname = Process::GetUnitName(); - service.pid = Process::GetProcessID(); - service.sname = m_service_name; - service.sid = m_service_id; + void CServiceServerImpl::Register(bool force_) + { +#if ECAL_CORE_REGISTRATION + if (g_registration_provider() != nullptr) g_registration_provider()->ApplySample(GetRegistrationSample(), force_); + +#ifndef NDEBUG + // log it + Logging::Log(log_level_debug4, m_service_name + "::CServiceServerImpl::Register"); +#endif +#endif // ECAL_CORE_REGISTRATION + } - // unregister entity - if (g_registration_provider() != nullptr) g_registration_provider()->ApplySample(sample, false); + void CServiceServerImpl::Unregister() + { +#if ECAL_CORE_REGISTRATION + if (g_registration_provider() != nullptr) g_registration_provider()->ApplySample(GetUnregistrationSample(), false); + +#ifndef NDEBUG + // log it + Logging::Log(log_level_debug4, m_service_name + "::CServiceServerImpl::Unregister"); +#endif +#endif // ECAL_CORE_REGISTRATION } int CServiceServerImpl::RequestCallback(const std::string& request_pb_, std::string& response_pb_) diff --git a/ecal/core/src/service/ecal_service_server_impl.h b/ecal/core/src/service/ecal_service_server_impl.h index fec56d9071..22eef8a9ec 100644 --- a/ecal/core/src/service/ecal_service_server_impl.h +++ b/ecal/core/src/service/ecal_service_server_impl.h @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,17 +26,17 @@ #include #include #include +#include + +#include "serialization/ecal_serialize_sample_registration.h" +#include "serialization/ecal_struct_service.h" #include #include #include #include - -#include #include -#include "serialization/ecal_struct_service.h" - namespace eCAL { /** @@ -80,7 +80,7 @@ namespace eCAL void RegisterClient(const std::string& key_, const SClientAttr& client_); // called by eCAL:CServiceGate every second to update registration layer - void RefreshRegistration(); + Registration::Sample GetRegistration(); std::string GetServiceName() { return m_service_name; }; @@ -88,6 +88,9 @@ namespace eCAL void Register(bool force_); void Unregister(); + Registration::Sample GetRegistrationSample(); + Registration::Sample GetUnregistrationSample(); + /** * @brief Calls the request callback based on the request and fills the response * diff --git a/ecal/core/src/service/ecal_servicegate.cpp b/ecal/core/src/service/ecal_servicegate.cpp index fe587ea929..1b275a5d50 100644 --- a/ecal/core/src/service/ecal_servicegate.cpp +++ b/ecal/core/src/service/ecal_servicegate.cpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -94,15 +94,15 @@ namespace eCAL return(ret_state); } - void CServiceGate::RefreshRegistrations() + void CServiceGate::GetRegistrations(Registration::SampleList& reg_sample_list_) { if (!m_created) return; - // refresh service registrations + // read service registrations std::shared_lock const lock(m_service_set_sync); for (const auto& service_server_impl : m_service_set) { - service_server_impl->RefreshRegistration(); + reg_sample_list_.samples.emplace_back(service_server_impl->GetRegistration()); } } } diff --git a/ecal/core/src/service/ecal_servicegate.h b/ecal/core/src/service/ecal_servicegate.h index da3047f6a1..687c527e3a 100644 --- a/ecal/core/src/service/ecal_servicegate.h +++ b/ecal/core/src/service/ecal_servicegate.h @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,6 +24,7 @@ #pragma once #include "ecal_def.h" +#include "serialization/ecal_struct_sample_registration.h" #include #include @@ -45,7 +46,7 @@ namespace eCAL bool Register (CServiceServerImpl* service_); bool Unregister(CServiceServerImpl* service_); - void RefreshRegistrations(); + void GetRegistrations(Registration::SampleList& reg_sample_list_); protected: static std::atomic m_created; diff --git a/ecal/tests/cpp/util_test/src/util_gettopics.cpp b/ecal/tests/cpp/util_test/src/util_gettopics.cpp index 3b3199e9da..495a9dcfea 100644 --- a/ecal/tests/cpp/util_test/src/util_gettopics.cpp +++ b/ecal/tests/cpp/util_test/src/util_gettopics.cpp @@ -36,6 +36,9 @@ TEST(core_cpp_util, GetTopics) // initialize eCAL API eCAL::Initialize(0, nullptr, "core_cpp_util"); + // enable loop back communication in the same process + eCAL::Util::EnableLoopback(true); + std::map topic_info_map; // create and check a few pub/sub entities @@ -135,6 +138,9 @@ TEST(core_cpp_util, GetTopicsParallel) // initialize eCAL API eCAL::Initialize(0, nullptr, "core_cpp_util"); + // enable loop back communication in the same process + eCAL::Util::EnableLoopback(true); + auto create_publishers = [&]() { std::string topic_name = "Test.ParallelUtilFunctions"; std::atomic call_back_count{ 0 };